dataloader/tests/integration_tests/test_pipeline_load_tenera_i...

190 lines
6.7 KiB
Python
Raw Permalink Normal View History

2025-11-06 11:33:04 +01:00
"""
Интеграционные тесты для пайплайна load_tenera.
ВНИМАНИЕ: Эти тесты требуют работающего SuperTenera API и настоящего соединения с БД.
По умолчанию они исключены из запуска через pytest.mark.skip.
Для запуска используйте: pytest tests/integration_tests/test_pipeline_load_tenera_integration.py --no-skip
"""
from __future__ import annotations
import pytest
from dataloader.context import APP_CTX
from dataloader.interfaces.tenera.interface import get_async_tenera_interface
from dataloader.storage.repositories.quotes import QuotesRepository
from dataloader.workers.pipelines.load_tenera import load_tenera
@pytest.mark.integration
@pytest.mark.skip(reason="Requires working SuperTenera API - run manually when service is available")
class TestLoadTeneraIntegration:
"""Интеграционные тесты для пайплайна load_tenera."""
@pytest.mark.asyncio
async def test_full_tenera_pipeline_with_real_api(self, db_session):
"""
Тест полного пайплайна TENERA с реальным API.
Требования:
- SuperTenera API должен быть доступен
- База данных должна быть настроена
- Схема quotes должна существовать
- Таблицы quote_section, quote, quote_value должны существовать
"""
try:
async with get_async_tenera_interface() as tenera:
data = await tenera.get_quotes_data()
assert data is not None
except Exception as e:
pytest.skip(f"SuperTenera API not available: {e}")
steps = 0
async for _ in load_tenera({}):
steps += 1
assert steps > 0
async with APP_CTX.sessionmaker() as session:
result = await session.execute("SELECT COUNT(*) FROM quotes.quote_value")
count = result.scalar()
assert count > 0
@pytest.mark.asyncio
async def test_tenera_interface_get_quotes_data(self):
"""
Тест получения данных котировок из SuperTenera API.
Требование: SuperTenera API должен быть доступен.
"""
try:
async with get_async_tenera_interface() as tenera:
data = await tenera.get_quotes_data()
assert data is not None
assert hasattr(data, "cbr")
assert hasattr(data, "investing")
assert hasattr(data, "sgx")
assert hasattr(data, "tradingeconomics")
assert hasattr(data, "bloomberg")
assert hasattr(data, "trading_view")
except Exception as e:
pytest.skip(f"SuperTenera API not available: {e}")
@pytest.mark.asyncio
async def test_quotes_repository_get_section_by_name(self, db_session):
"""
Тест получения секции по имени.
Требование: схема quotes и таблица quote_section должны существовать в БД.
"""
repo = QuotesRepository(db_session)
section = await repo.get_section_by_name("cbr")
if section is not None:
assert section.section_nm == "cbr"
assert section.section_id is not None
else:
pytest.skip("Section 'cbr' not found in database - seed data required")
@pytest.mark.asyncio
async def test_quotes_repository_upsert_quote(self, db_session):
"""
Тест upsert котировки.
Требование: схема quotes должна существовать в БД.
"""
repo = QuotesRepository(db_session)
section = await repo.get_section_by_name("cbr")
if section is None:
pytest.skip("Section 'cbr' not found - cannot test upsert")
from datetime import datetime
quote = await repo.upsert_quote(
section=section,
name="TEST_USD",
last_update_dttm=datetime.now(),
)
assert quote is not None
assert quote.quote_nm == "TEST_USD"
assert quote.section_id == section.section_id
quote2 = await repo.upsert_quote(
section=section,
name="TEST_USD",
last_update_dttm=datetime.now(),
)
assert quote2.quote_id == quote.quote_id
@pytest.mark.asyncio
async def test_quotes_repository_bulk_upsert_quote_values(self, db_session):
"""
Тест массового upsert значений котировок.
Требование: схема quotes должна существовать в БД.
"""
repo = QuotesRepository(db_session)
section = await repo.get_section_by_name("cbr")
if section is None:
pytest.skip("Section 'cbr' not found - cannot test bulk upsert")
from datetime import datetime
quote = await repo.upsert_quote(
section=section,
name="TEST_BULK_USD",
last_update_dttm=datetime.now(),
)
test_rows = [
{
"dt": datetime(2025, 1, 15, i, 0, 0),
"value_base": 75.0 + i,
}
for i in range(5)
]
await repo.bulk_upsert_quote_values(quote, test_rows)
await db_session.commit()
result = await db_session.execute(
"SELECT COUNT(*) FROM quotes.quote_value WHERE quote_id = :quote_id",
{"quote_id": quote.quote_id},
)
count = result.scalar()
assert count == 5
@pytest.mark.asyncio
async def test_tenera_pipeline_processes_all_sources(self, db_session):
"""
Тест что пайплайн обрабатывает все источники.
Требования:
- SuperTenera API должен быть доступен
- Все секции должны существовать в БД
"""
try:
async with get_async_tenera_interface() as tenera:
data = await tenera.get_quotes_data()
sources_with_data = []
for source_name in ["cbr", "investing", "sgx", "tradingeconomics", "bloomberg", "trading_view"]:
source_data = getattr(data, source_name, None)
if source_data:
sources_with_data.append(source_name)
assert len(sources_with_data) > 0
except Exception as e:
pytest.skip(f"SuperTenera API not available: {e}")