1202 lines
40 KiB
Markdown
1202 lines
40 KiB
Markdown
|
|
Это вырезка файлов из ETL сервиса старого. Оно работало, но местами было не совсем оптимально.
|
|||
|
|
нужно сделать из этого файла полноценный ETL задачу для нашего сервиса, раскидать куда надо интерфейс, модели и т.п и зарегистрировать задачу.
|
|||
|
|
|
|||
|
|
src\dataloader\interfaces\tenera\interface.py -
|
|||
|
|
"""Интерфейс для взаимодействия с сервисом SuperTenera.
|
|||
|
|
|
|||
|
|
Позволяет запросить актуальные данные по котировкам с SuperTenera в json.
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import json
|
|||
|
|
from pathlib import Path
|
|||
|
|
import ssl
|
|||
|
|
import uuid
|
|||
|
|
from asyncio import TimeoutError
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import Any, Literal, Self
|
|||
|
|
|
|||
|
|
import aiohttp
|
|||
|
|
|
|||
|
|
from dataloader.config import APP_CONFIG
|
|||
|
|
from dataloader.interfaces.tenera.schemas import MainData
|
|||
|
|
from dataloader.interfaces.utils import log_method_call
|
|||
|
|
|
|||
|
|
|
|||
|
|
class SuperTeneraConnectionError(Exception):
|
|||
|
|
"""Ошибка подключения к SuperTenera API."""
|
|||
|
|
|
|||
|
|
|
|||
|
|
class SuperTeneraInterface:
|
|||
|
|
"""Интерфейс для взаимодействия с сервисом SuperTenera по http."""
|
|||
|
|
|
|||
|
|
def __init__(
|
|||
|
|
self,
|
|||
|
|
logger,
|
|||
|
|
base_url: str,
|
|||
|
|
*,
|
|||
|
|
timezone=None,
|
|||
|
|
) -> None:
|
|||
|
|
"""
|
|||
|
|
Constructor.
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
logger: Logger instance.
|
|||
|
|
base_url: base service url.
|
|||
|
|
timezone: timezone for datetime objects.
|
|||
|
|
"""
|
|||
|
|
self.logger = logger
|
|||
|
|
self.base_url = base_url
|
|||
|
|
self.timezone = timezone
|
|||
|
|
|
|||
|
|
self._session: aiohttp.ClientSession | None = None
|
|||
|
|
|
|||
|
|
self._ssl_context = None
|
|||
|
|
if APP_CONFIG.app.local:
|
|||
|
|
self._ssl_context = ssl.create_default_context(cafile=APP_CONFIG.certs.ca_bundle_file)
|
|||
|
|
self._ssl_context.load_cert_chain(certfile=APP_CONFIG.certs.cert_file, keyfile=APP_CONFIG.certs.key_file)
|
|||
|
|
|
|||
|
|
def form_base_headers(self) -> dict:
|
|||
|
|
"""Form metadata for call."""
|
|||
|
|
metadata_pairs = {
|
|||
|
|
"request-id": str(uuid.uuid4()),
|
|||
|
|
"request-time": str(datetime.now(tz=self.timezone).isoformat()),
|
|||
|
|
"system-id": APP_CONFIG.app.system_id,
|
|||
|
|
}
|
|||
|
|
return {metakey: metavalue for metakey, metavalue in metadata_pairs.items() if metavalue}
|
|||
|
|
|
|||
|
|
async def __aenter__(self) -> Self:
|
|||
|
|
"""Async context manager enter."""
|
|||
|
|
self._session = aiohttp.ClientSession(
|
|||
|
|
base_url=self.base_url,
|
|||
|
|
connector=aiohttp.TCPConnector(limit=100),
|
|||
|
|
headers=self.form_base_headers(),
|
|||
|
|
)
|
|||
|
|
return self
|
|||
|
|
|
|||
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
|
|||
|
|
"""Async context manager exit."""
|
|||
|
|
if exc_val is not None:
|
|||
|
|
self.logger.error(f"{exc_type}: {exc_val}")
|
|||
|
|
await self._session.close()
|
|||
|
|
|
|||
|
|
async def _get_request(
|
|||
|
|
self,
|
|||
|
|
url: str,
|
|||
|
|
encoding: str | None = None,
|
|||
|
|
content_type: str | None = "application/json",
|
|||
|
|
**kwargs,
|
|||
|
|
) -> Any:
|
|||
|
|
"""Get realization."""
|
|||
|
|
|
|||
|
|
kwargs["ssl"] = self._ssl_context
|
|||
|
|
|
|||
|
|
async with self._session.get(url, **kwargs) as response:
|
|||
|
|
if APP_CONFIG.app.debug:
|
|||
|
|
self.logger.debug(f"Response: {(await response.text(errors='ignore'))[:100]}")
|
|||
|
|
return await response.json(encoding=encoding, content_type=content_type)
|
|||
|
|
|
|||
|
|
@log_method_call
|
|||
|
|
async def get_quotes_data(self) -> MainData:
|
|||
|
|
"""Получить данные котировок от SuperTenera"""
|
|||
|
|
data = await self._get_request(APP_CONFIG.supertenera.quotes_endpoint)
|
|||
|
|
|
|||
|
|
# mock_path = Path(__file__).parent / "supertenera_response.json"
|
|||
|
|
# with open(mock_path, "r", encoding="utf-8") as f:
|
|||
|
|
# data = json.load(f)
|
|||
|
|
|
|||
|
|
return MainData.model_validate(data)
|
|||
|
|
|
|||
|
|
@log_method_call
|
|||
|
|
async def ping(self, **kwargs) -> Literal[True]:
|
|||
|
|
"""
|
|||
|
|
Быстрая проверка доступности SuperTenera API.
|
|||
|
|
|
|||
|
|
True - если ответ < 400, иначе SuperTeneraConnectionError.
|
|||
|
|
"""
|
|||
|
|
kwargs["ssl"] = self._ssl_context
|
|||
|
|
try:
|
|||
|
|
async with self._session.get(
|
|||
|
|
APP_CONFIG.supertenera.quotes_endpoint, timeout=APP_CONFIG.supertenera.timeout, **kwargs
|
|||
|
|
) as resp:
|
|||
|
|
resp.raise_for_status()
|
|||
|
|
return True
|
|||
|
|
except aiohttp.ClientResponseError as e:
|
|||
|
|
raise SuperTeneraConnectionError(
|
|||
|
|
f"Ошибка подключения к SuperTenera API при проверке системы - {e.status}."
|
|||
|
|
) from e
|
|||
|
|
except TimeoutError as e:
|
|||
|
|
raise SuperTeneraConnectionError(
|
|||
|
|
f"Ошибка Timeout подключения к SuperTenera API при проверке системы."
|
|||
|
|
) from e
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_async_tenera_interface() -> SuperTeneraInterface:
|
|||
|
|
"""Get SuperTenera instance."""
|
|||
|
|
from dataloader.context import APP_CTX
|
|||
|
|
|
|||
|
|
return SuperTeneraInterface(
|
|||
|
|
logger=APP_CTX.get_logger(),
|
|||
|
|
base_url=APP_CTX.get_tenera_base_url(),
|
|||
|
|
timezone=APP_CTX.get_pytz_timezone(),
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
src\dataloader\interfaces\tenera\schemas.py -
|
|||
|
|
"""Схемы ответов"""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import re
|
|||
|
|
from typing import Literal
|
|||
|
|
|
|||
|
|
from pydantic import BaseModel, ConfigDict, Field, RootModel, field_validator
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TeneraBaseModel(BaseModel):
|
|||
|
|
"""Базовая модель для всех схем SuperTenera с настройкой populate_by_name."""
|
|||
|
|
|
|||
|
|
model_config = ConfigDict(
|
|||
|
|
populate_by_name=True,
|
|||
|
|
extra="ignore",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# --- TimePoint models ---
|
|||
|
|
|
|||
|
|
|
|||
|
|
class EmptyTimePoint(TeneraBaseModel):
|
|||
|
|
"""
|
|||
|
|
Модель заглушка для полностью пустых значений в точке времени.
|
|||
|
|
Позволяет корректно валидировать случаи, когда JSON поле {} без содержимого.
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
pass # pylint: disable=unnecessary-pass
|
|||
|
|
|
|||
|
|
|
|||
|
|
class CbrTimePoint(TeneraBaseModel):
|
|||
|
|
"""
|
|||
|
|
Структура данных точки времени для источника Центрального банка России (ЦБР).
|
|||
|
|
|
|||
|
|
Поля:
|
|||
|
|
- value: Строка с числовым значением ("80,32")
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
value: str
|
|||
|
|
|
|||
|
|
|
|||
|
|
class InvestingNumeric(TeneraBaseModel):
|
|||
|
|
"""
|
|||
|
|
Структура данных точки времени для источника Investing.com в формате по странам.
|
|||
|
|
|
|||
|
|
Поля (alias на русском):
|
|||
|
|
- profit: Доходность
|
|||
|
|
- base_value: Базовое
|
|||
|
|
- max_value: Максимальное
|
|||
|
|
- min_value: Минимальное
|
|||
|
|
- change: Изменение
|
|||
|
|
- change_ptc: Процент изменений
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
profit: str = Field(alias="Доходность")
|
|||
|
|
base_value: str = Field(alias="Осн.")
|
|||
|
|
max_value: str = Field(alias="Макс.")
|
|||
|
|
min_value: str = Field(alias="Мин.")
|
|||
|
|
change: str = Field(alias="Изм.")
|
|||
|
|
change_ptc: str = Field(alias="Изм. %")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class InvestingCandlestick(TeneraBaseModel):
|
|||
|
|
"""
|
|||
|
|
Структура данных точки времени для источника Investing.com в формате свечи.
|
|||
|
|
|
|||
|
|
Поля (alias латинскими заглавными буквами):
|
|||
|
|
- open_: "O"
|
|||
|
|
- high: "H"
|
|||
|
|
- low: "L"
|
|||
|
|
- close: "C"
|
|||
|
|
- interest: "I" | None
|
|||
|
|
- value: "V"
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
open_: str = Field(alias="O")
|
|||
|
|
high: str = Field(alias="H")
|
|||
|
|
low: str = Field(alias="L")
|
|||
|
|
close: str = Field(alias="C")
|
|||
|
|
interest: str | None = Field(alias="I")
|
|||
|
|
value: str = Field(alias="V")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class InvestingTimePoint(RootModel[EmptyTimePoint | InvestingNumeric | InvestingCandlestick]):
|
|||
|
|
"""
|
|||
|
|
Union-модель точки времени для источника Investing.com.
|
|||
|
|
|
|||
|
|
1) {} -> EmptyTImePoint
|
|||
|
|
2) numeric -> InvestingNumeric
|
|||
|
|
3) свечной -> InvestingCandlestick
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
pass # pylint: disable=unnecessary-pass
|
|||
|
|
|
|||
|
|
|
|||
|
|
class SgxTimePoint(TeneraBaseModel):
|
|||
|
|
"""
|
|||
|
|
Структура данных точки времени для источника Сингапурской биржи (SGX).
|
|||
|
|
|
|||
|
|
Поля (alias латинскими заглавными буквами):
|
|||
|
|
- open_: "O"
|
|||
|
|
- high: "H"
|
|||
|
|
- low: "L"
|
|||
|
|
- close: "C"
|
|||
|
|
- interest: "I"
|
|||
|
|
- value: "V"
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
open_: str | None = Field(alias="O")
|
|||
|
|
high: str | None = Field(alias="H")
|
|||
|
|
low: str | None = Field(alias="L")
|
|||
|
|
close: str | None = Field(alias="C")
|
|||
|
|
interest: str | None = Field(alias="I")
|
|||
|
|
value: str | None = Field(alias="V")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TradingEconomicsEmptyString(RootModel[str]):
|
|||
|
|
"""
|
|||
|
|
Валидирует точно пустую строку ("").
|
|||
|
|
|
|||
|
|
Используется для точек данных TradingEconomics, содержащих пустые строковые значения.
|
|||
|
|
|
|||
|
|
Поля:
|
|||
|
|
- root: Строка, которая должна быть точно пустой ("")
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
root: str
|
|||
|
|
|
|||
|
|
@field_validator("root", mode="before")
|
|||
|
|
@classmethod
|
|||
|
|
def _must_be_empty(cls, v) -> Literal[""]:
|
|||
|
|
if v == "":
|
|||
|
|
return v
|
|||
|
|
raise ValueError("not an empty string")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TradingEconomicsStringPercent(RootModel[str]):
|
|||
|
|
"""
|
|||
|
|
Валидирует строки-проценты вида "3.1%" или "-0,14%".
|
|||
|
|
|
|||
|
|
Принимает как запятую, так и точку в качестве десятичного разделителя.
|
|||
|
|
Шаблон: опциональный минус, цифры, опциональная десятичная часть, знак процента.
|
|||
|
|
|
|||
|
|
Поля:
|
|||
|
|
root: Строка с процентным значением (например: "3.1%", "-0,14%", "15%")
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
root: str
|
|||
|
|
|
|||
|
|
@field_validator("root")
|
|||
|
|
@classmethod
|
|||
|
|
def _check_percent(cls, v) -> str:
|
|||
|
|
if isinstance(v, str) and re.match(r"^-?\d+(?:[.,]\d+)?%$", v):
|
|||
|
|
return v
|
|||
|
|
raise ValueError(f"invalid percent string: {v!r}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TradingEconomicsStringTime(RootModel[str]):
|
|||
|
|
"""
|
|||
|
|
Валидирует строки времени в формате "h:mm AM/PM".
|
|||
|
|
|
|||
|
|
Примеры: "01:15 AM", "12:30 PM", "9:45 AM"
|
|||
|
|
|
|||
|
|
Поля:
|
|||
|
|
root: Строка времени в 12-часовом формате с AM/PM
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
root: str
|
|||
|
|
|
|||
|
|
@field_validator("root")
|
|||
|
|
@classmethod
|
|||
|
|
def _check_time(cls, v) -> str:
|
|||
|
|
if isinstance(v, str) and re.match(r"^(0?[1-9]|1[0-2]):[0-5]\d\s[AP]M$", v):
|
|||
|
|
return v
|
|||
|
|
raise ValueError(f"invalid time string: {v!r}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TradingEconomicsNumeric(TeneraBaseModel):
|
|||
|
|
"""
|
|||
|
|
Полный числовой формат данных от TradingEconomics.
|
|||
|
|
|
|||
|
|
Содержит полную рыночную информацию с ценой, дневным изменением, процентами
|
|||
|
|
и различными периодическими изменениями (недельными, месячными, с начала года, год к году).
|
|||
|
|
|
|||
|
|
Поля:
|
|||
|
|
price: Текущая цена инструмента (алиас: "Price")
|
|||
|
|
day: Дневное изменение в абсолютных значениях (алиас: "Day")
|
|||
|
|
percent: Дневное изменение в процентах (алиас: "%")
|
|||
|
|
weekly: Недельное изменение (алиас: "Weekly")
|
|||
|
|
monthly: Месячное изменение (алиас: "Monthly")
|
|||
|
|
ytd: Изменение с начала года (алиас: "YTD")
|
|||
|
|
yoy: Изменение год к году (алиас: "YoY")
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
price: str = Field(alias="Price")
|
|||
|
|
day: str = Field(alias="Day")
|
|||
|
|
percent: str = Field(alias="%")
|
|||
|
|
weekly: str = Field(alias="Weekly")
|
|||
|
|
monthly: str = Field(alias="Monthly")
|
|||
|
|
ytd: str = Field(alias="YTD")
|
|||
|
|
yoy: str = Field(alias="YoY")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TradingEconomicsLastPrev(TeneraBaseModel):
|
|||
|
|
"""
|
|||
|
|
Формат Last/Previous/Unit от TradingEconomics.
|
|||
|
|
|
|||
|
|
Содержит текущее значение, предыдущее значение и единицу измерения.
|
|||
|
|
Обычно используется для экономических индикаторов и статистики.
|
|||
|
|
|
|||
|
|
Поля:
|
|||
|
|
last: Последнее (текущее) значение показателя (алиас: "Last")
|
|||
|
|
previous: Предыдущее значение показателя (алиас: "Previous")
|
|||
|
|
unit: Единица измерения показателя (алиас: "Unit")
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
last: str = Field(alias="Last")
|
|||
|
|
previous: str = Field(alias="Previous")
|
|||
|
|
unit: str = Field(alias="Unit")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TradingEconomicsTimePoint(
|
|||
|
|
RootModel[
|
|||
|
|
EmptyTimePoint
|
|||
|
|
| TradingEconomicsEmptyString
|
|||
|
|
| TradingEconomicsStringPercent
|
|||
|
|
| TradingEconomicsStringTime
|
|||
|
|
| TradingEconomicsNumeric
|
|||
|
|
| TradingEconomicsLastPrev
|
|||
|
|
]
|
|||
|
|
):
|
|||
|
|
"""
|
|||
|
|
Объединение всех возможных форматов точек времени TradingEconomics.
|
|||
|
|
|
|||
|
|
Поддерживает:
|
|||
|
|
- Пустые объекты ({})
|
|||
|
|
- Пустые строки ("")
|
|||
|
|
- Строки-проценты ("3.1%", "-0,14%")
|
|||
|
|
- Строки времени ("01:15 AM")
|
|||
|
|
- Полные числовые объекты с полями цена/день/%
|
|||
|
|
- Объекты Last/Previous/Unit для экономических индикаторов
|
|||
|
|
|
|||
|
|
Поля:
|
|||
|
|
root: Один из поддерживаемых типов точек времени TradingEconomics
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
pass # pylint: disable=unnecessary-pass
|
|||
|
|
|
|||
|
|
|
|||
|
|
class BloombergTimePoint(TeneraBaseModel):
|
|||
|
|
"""
|
|||
|
|
Структура данных точки времени для источника Bloomberg.
|
|||
|
|
|
|||
|
|
Поля:
|
|||
|
|
- value: Строка с числовым значением ("80,32")
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
value: str
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TradingViewTimePoint(TeneraBaseModel):
|
|||
|
|
"""
|
|||
|
|
Структура данных точки времени для источника TradingView.
|
|||
|
|
|
|||
|
|
Поля (alias латинскими заглавными буквами):
|
|||
|
|
- open_: "O"
|
|||
|
|
- high: "H"
|
|||
|
|
- low: "L"
|
|||
|
|
- close: "C"
|
|||
|
|
- volume: "Vol"
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
open_: str | None = Field(alias="O")
|
|||
|
|
high: str | None = Field(alias="H")
|
|||
|
|
low: str | None = Field(alias="L")
|
|||
|
|
close: str | None = Field(alias="C")
|
|||
|
|
volume: str | None = Field(alias="Vol")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TimePointUnion(
|
|||
|
|
RootModel[
|
|||
|
|
EmptyTimePoint
|
|||
|
|
| CbrTimePoint
|
|||
|
|
| InvestingTimePoint
|
|||
|
|
| SgxTimePoint
|
|||
|
|
| TradingEconomicsTimePoint
|
|||
|
|
| BloombergTimePoint
|
|||
|
|
| TradingViewTimePoint
|
|||
|
|
]
|
|||
|
|
):
|
|||
|
|
"""
|
|||
|
|
Универсальное объединение для точек времени от всех поддерживаемых источников данных.
|
|||
|
|
|
|||
|
|
Обрабатывает структуры данных от:
|
|||
|
|
- ЦБР (Центральный банк России)
|
|||
|
|
- Investing.com
|
|||
|
|
- SGX (Сингапурская биржа)
|
|||
|
|
- TradingEconomics
|
|||
|
|
- Bloomberg
|
|||
|
|
- TradingView
|
|||
|
|
|
|||
|
|
Поля:
|
|||
|
|
root: Точка времени от любого из поддерживаемых источников данных
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
pass # pylint: disable=unnecessary-pass
|
|||
|
|
|
|||
|
|
|
|||
|
|
InstrumentData = dict[str | int, TimePointUnion]
|
|||
|
|
"""Тип: Отображение timestamp -> TimePointUnion."""
|
|||
|
|
|
|||
|
|
SourceData = dict[str, InstrumentData]
|
|||
|
|
"""Тип: Отображение имени инструмента -> InstrumentData."""
|
|||
|
|
|
|||
|
|
|
|||
|
|
class MainData(TeneraBaseModel):
|
|||
|
|
"""
|
|||
|
|
Основной контейнер данных для всех источников финансовых данных от SuperTenera.
|
|||
|
|
|
|||
|
|
Содержит опциональные данные от нескольких поставщиков финансовых данных,
|
|||
|
|
структурированные по источникам, а затем по инструментам.
|
|||
|
|
|
|||
|
|
Поля:
|
|||
|
|
cbr: Данные от Центрального банка России (опционально)
|
|||
|
|
investing: Данные от Investing.com (опционально)
|
|||
|
|
sgx: Данные от Сингапурской биржи (опционально)
|
|||
|
|
tradingeconomics: Данные от TradingEconomics (опционально)
|
|||
|
|
bloomberg: Данные от Bloomberg (опционально)
|
|||
|
|
trading_view: Данные от TradingView (опционально, алиас: "trading_view")
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
cbr: SourceData | None = None
|
|||
|
|
investing: SourceData | None = None
|
|||
|
|
sgx: SourceData | None = None
|
|||
|
|
tradingeconomics: SourceData | None = None
|
|||
|
|
bloomberg: SourceData | None = None
|
|||
|
|
trading_view: SourceData | None = Field(default=None, alias="trading_view")
|
|||
|
|
|
|||
|
|
@field_validator("investing", mode="before")
|
|||
|
|
@classmethod
|
|||
|
|
def _filter_investing(cls, v) -> SourceData | None:
|
|||
|
|
"""
|
|||
|
|
Фильтрация данных от Investing.com.
|
|||
|
|
|
|||
|
|
Убираем:
|
|||
|
|
- все ключи, у которых значение null
|
|||
|
|
- все ключи, которые выглядят как чистые числа (timestamps)
|
|||
|
|
|
|||
|
|
:param v: Объект с данными от Investing.com
|
|||
|
|
:return: Отфильтрованный объект
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
if isinstance(v, dict):
|
|||
|
|
return {key: value for key, value in v.items() if value is not None and not str(key).isdigit()}
|
|||
|
|
return v
|
|||
|
|
|
|||
|
|
src\dataloader\interfaces\tenera\__init__.py -
|
|||
|
|
from . import schemas
|
|||
|
|
from .interface import SuperTeneraInterface, get_async_tenera_interface
|
|||
|
|
|
|||
|
|
__all__ = [
|
|||
|
|
"schemas",
|
|||
|
|
"SuperTeneraInterface",
|
|||
|
|
"get_async_tenera_interface",
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
src\dataloader\models\quote.py -
|
|||
|
|
"""Quote модель."""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import TYPE_CHECKING
|
|||
|
|
|
|||
|
|
from sqlalchemy import JSON, TIMESTAMP, BigInteger, ForeignKey, String, UniqueConstraint, func
|
|||
|
|
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
|||
|
|
|
|||
|
|
from dataloader.base import Base
|
|||
|
|
|
|||
|
|
if TYPE_CHECKING:
|
|||
|
|
from .quote_section import QuoteSection
|
|||
|
|
from .quote_value import QuoteValue
|
|||
|
|
|
|||
|
|
|
|||
|
|
class Quote(Base):
|
|||
|
|
"""Представляет custom_cib_quotes.quotes."""
|
|||
|
|
|
|||
|
|
__tablename__ = "quotes"
|
|||
|
|
__table_args__ = (UniqueConstraint("quote_sect_id", "name", name="ak_uq_quote_name_and_quotes"),)
|
|||
|
|
|
|||
|
|
quote_id: Mapped[int] = mapped_column(BigInteger(), primary_key=True)
|
|||
|
|
name: Mapped[str] = mapped_column(String, nullable=False)
|
|||
|
|
params: Mapped[dict | None] = mapped_column(JSON)
|
|||
|
|
srce: Mapped[str | None] = mapped_column(String)
|
|||
|
|
ticker: Mapped[str | None] = mapped_column(String)
|
|||
|
|
quote_sect_id: Mapped[int] = mapped_column(
|
|||
|
|
ForeignKey("quotes_sect.quote_sect_id", ondelete="CASCADE", onupdate="CASCADE"),
|
|||
|
|
nullable=False,
|
|||
|
|
)
|
|||
|
|
last_update_dttm: Mapped[datetime | None] = mapped_column(TIMESTAMP(timezone=True))
|
|||
|
|
|
|||
|
|
load_dttm: Mapped[datetime] = mapped_column(
|
|||
|
|
TIMESTAMP(timezone=False),
|
|||
|
|
nullable=False,
|
|||
|
|
server_default=func.current_timestamp(),
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
section: Mapped[QuoteSection] = relationship(back_populates="quotes")
|
|||
|
|
values: Mapped[list[QuoteValue]] = relationship(
|
|||
|
|
back_populates="quote",
|
|||
|
|
cascade="all, delete-orphan",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
def __repr__(self) -> str:
|
|||
|
|
return f"<Quote id={self.quote_id} name=‘{self.name}’>"
|
|||
|
|
|
|||
|
|
src\dataloader\models\quote_value.py -
|
|||
|
|
"""Quote-value модель."""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import TYPE_CHECKING
|
|||
|
|
|
|||
|
|
from sqlalchemy import TIMESTAMP, BigInteger, DateTime, Float, ForeignKey, String, UniqueConstraint, func
|
|||
|
|
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
|||
|
|
|
|||
|
|
from dataloader.base import Base
|
|||
|
|
|
|||
|
|
if TYPE_CHECKING:
|
|||
|
|
from .quote import Quote
|
|||
|
|
|
|||
|
|
|
|||
|
|
class QuoteValue(Base):
|
|||
|
|
"""Представляет custom_cib_quotes.quotes_values."""
|
|||
|
|
|
|||
|
|
__tablename__ = "quotes_values"
|
|||
|
|
__table_args__ = (UniqueConstraint("quote_id", "dt", name="ak_uq_quote_and_date_quotes"),)
|
|||
|
|
|
|||
|
|
quotes_values_id: Mapped[int] = mapped_column(
|
|||
|
|
BigInteger(),
|
|||
|
|
primary_key=True,
|
|||
|
|
autoincrement=True,
|
|||
|
|
)
|
|||
|
|
quote_id: Mapped[int] = mapped_column(
|
|||
|
|
ForeignKey("quotes.quote_id", ondelete="RESTRICT", onupdate="RESTRICT"),
|
|||
|
|
nullable=False,
|
|||
|
|
)
|
|||
|
|
dt: Mapped[datetime] = mapped_column(DateTime, nullable=False)
|
|||
|
|
|
|||
|
|
price_o: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
price_c: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
price_h: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
price_l: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
volume: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
|
|||
|
|
load_dttm: Mapped[datetime] = mapped_column(
|
|||
|
|
TIMESTAMP(timezone=False),
|
|||
|
|
nullable=False,
|
|||
|
|
server_default=func.current_timestamp(),
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
unit: Mapped[str | None] = mapped_column(String)
|
|||
|
|
key: Mapped[int | None] = mapped_column(BigInteger())
|
|||
|
|
|
|||
|
|
value_profit: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
value_base: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
value_max: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
value_min: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
value_chng: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
value_chng_prc: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
|
|||
|
|
price_i: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
price: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
value_day: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
value_prc: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
value_weekly_prc: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
value_monthly_prc: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
value_ytd_prc: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
value_yoy_prc: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
value_last: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
value_previous: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
|
|||
|
|
is_empty_str_flg: Mapped[bool | None] = mapped_column()
|
|||
|
|
interest: Mapped[float | None] = mapped_column(Float)
|
|||
|
|
|
|||
|
|
quote: Mapped[Quote] = relationship(back_populates="values")
|
|||
|
|
|
|||
|
|
def __repr__(self) -> str:
|
|||
|
|
return f"<QuoteValue id={self.quotes_values_id} dt={self.dt}>"
|
|||
|
|
|
|||
|
|
src\dataloader\models\quote_section.py -
|
|||
|
|
"""Quote-section модель."""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import TYPE_CHECKING
|
|||
|
|
|
|||
|
|
from sqlalchemy import JSON, TIMESTAMP, Integer, Sequence, String, func
|
|||
|
|
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
|||
|
|
|
|||
|
|
from dataloader.base import Base
|
|||
|
|
|
|||
|
|
if TYPE_CHECKING:
|
|||
|
|
from .quote import Quote
|
|||
|
|
|
|||
|
|
|
|||
|
|
class QuoteSection(Base):
|
|||
|
|
"""Представляет custom_cib_quotes.quotes_sect."""
|
|||
|
|
|
|||
|
|
__tablename__ = "quotes_sect"
|
|||
|
|
|
|||
|
|
quote_sect_id: Mapped[int] = mapped_column(Integer(), Sequence("quotes_section_id_seq"), primary_key=True)
|
|||
|
|
name: Mapped[str] = mapped_column(String, nullable=False)
|
|||
|
|
params: Mapped[dict | None] = mapped_column(JSON)
|
|||
|
|
load_dttm: Mapped[datetime] = mapped_column(
|
|||
|
|
TIMESTAMP(timezone=False),
|
|||
|
|
nullable=False,
|
|||
|
|
server_default=func.current_timestamp(),
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
quotes: Mapped[list[Quote]] = relationship(
|
|||
|
|
back_populates="section",
|
|||
|
|
cascade="all, delete-orphan",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
def __repr__(self) -> str:
|
|||
|
|
return f"<QuoteSection id={self.quote_sect_id} name=‘{self.name}’>"
|
|||
|
|
|
|||
|
|
src\dataloader\cruds\base.py -
|
|||
|
|
"""Базовый класс для CRUD"""
|
|||
|
|
|
|||
|
|
from typing import Generic, Protocol, TypeVar
|
|||
|
|
|
|||
|
|
import sqlalchemy as sa
|
|||
|
|
from sqlalchemy import exc
|
|||
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|||
|
|
|
|||
|
|
from dataloader.cruds.exceptions import InvalidDataException
|
|||
|
|
|
|||
|
|
|
|||
|
|
class BaseProtocol(Protocol): # pylint: disable=too-few-public-methods
|
|||
|
|
"""Протокол для таблиц с id"""
|
|||
|
|
|
|||
|
|
id: int
|
|||
|
|
|
|||
|
|
|
|||
|
|
ID = TypeVar("ID", bound=BaseProtocol)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class BaseCRUD(Generic[ID]):
|
|||
|
|
"""Базовый класс для CRUD"""
|
|||
|
|
|
|||
|
|
_table: type[ID]
|
|||
|
|
|
|||
|
|
def __init__(self, table: type[ID], session: AsyncSession, logger) -> None:
|
|||
|
|
"""
|
|||
|
|
Инициализация класса для взаимодействия с таблицей
|
|||
|
|
|
|||
|
|
:param table: Таблица
|
|||
|
|
:param session: Сессия
|
|||
|
|
:param logger: Логгер
|
|||
|
|
"""
|
|||
|
|
self._table = table
|
|||
|
|
self._session = session
|
|||
|
|
self._logger = logger
|
|||
|
|
|
|||
|
|
async def get(self, pk_id: ID) -> ID:
|
|||
|
|
"""
|
|||
|
|
Получение объекта по id
|
|||
|
|
|
|||
|
|
:param pk_id: Значение primary key таблицы
|
|||
|
|
:return: Объект
|
|||
|
|
"""
|
|||
|
|
return await self._session.get(self._table, pk_id)
|
|||
|
|
|
|||
|
|
async def get_all(self) -> list[ID]:
|
|||
|
|
"""
|
|||
|
|
Получение всех объектов из таблицы. Возвращает список объектов.
|
|||
|
|
|
|||
|
|
:return: Список объектов
|
|||
|
|
"""
|
|||
|
|
result = await self._session.scalars(sa.select(self._table))
|
|||
|
|
return list(result)
|
|||
|
|
|
|||
|
|
async def create(self, obj: ID) -> ID:
|
|||
|
|
"""
|
|||
|
|
Добавление объекта в таблицу.
|
|||
|
|
|
|||
|
|
Возвращает добавленный объект.
|
|||
|
|
|
|||
|
|
:raise
|
|||
|
|
InvalidDataException: Если в таблице есть объект с таким id или неверно указаны данные
|
|||
|
|
:param obj:
|
|||
|
|
:return:
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
self._session.add(obj)
|
|||
|
|
await self._session.commit()
|
|||
|
|
except exc.IntegrityError as e:
|
|||
|
|
self._logger.error(f"IntegrityError: {e}")
|
|||
|
|
await self._session.rollback()
|
|||
|
|
raise InvalidDataException from e
|
|||
|
|
await self._session.refresh(obj)
|
|||
|
|
return obj
|
|||
|
|
|
|||
|
|
async def update(self, obj: ID) -> None:
|
|||
|
|
"""Обновление объекта в таблице"""
|
|||
|
|
|
|||
|
|
async def delete(self, obj: ID) -> None:
|
|||
|
|
"""Удаление объекта из таблицы"""
|
|||
|
|
|
|||
|
|
src\dataloader\cruds\quotes\schemas.py -
|
|||
|
|
"""Pydantic DTO for incoming SuperTenera payloads."""
|
|||
|
|
|
|||
|
|
from datetime import date
|
|||
|
|
from typing import Any
|
|||
|
|
|
|||
|
|
from pydantic import BaseModel, Field, field_validator
|
|||
|
|
|
|||
|
|
|
|||
|
|
class QuoteValueIn(BaseModel):
|
|||
|
|
dt: date
|
|||
|
|
price_o: float | None = Field(None, alias="price_o")
|
|||
|
|
price_c: float | None = Field(None, alias="price_c")
|
|||
|
|
price_h: float | None = Field(None, alias="price_h")
|
|||
|
|
price_l: float | None = Field(None, alias="price_l")
|
|||
|
|
volume: float | None = None
|
|||
|
|
unit: str | None = None
|
|||
|
|
key: int | None = None
|
|||
|
|
value_profit: float | None = None
|
|||
|
|
value_base: float | None = None
|
|||
|
|
value_max: float | None = None
|
|||
|
|
value_min: float | None = None
|
|||
|
|
value_chng: float | None = None
|
|||
|
|
value_chng_prc: float | None = None
|
|||
|
|
price_i: float | None = None
|
|||
|
|
value_day: float | None = None
|
|||
|
|
value_prc: float | None = None
|
|||
|
|
value_weekly_prc: float | None = None
|
|||
|
|
value_monthly_prc: float | None = None
|
|||
|
|
value_ytd_prc: float | None = None
|
|||
|
|
value_yoy_prc: float | None = None
|
|||
|
|
value_last: float | None = None
|
|||
|
|
value_previous: float | None = None
|
|||
|
|
|
|||
|
|
|
|||
|
|
class QuoteIn(BaseModel):
|
|||
|
|
section_name: str
|
|||
|
|
name: str
|
|||
|
|
params: dict[str, Any] | None = None
|
|||
|
|
srce: str | None = None
|
|||
|
|
ticker: str | None = None
|
|||
|
|
update_func: str | None = None
|
|||
|
|
values: list[QuoteValueIn]
|
|||
|
|
|
|||
|
|
@field_validator("values")
|
|||
|
|
def non_empty(cls, v) -> Any:
|
|||
|
|
if not v:
|
|||
|
|
raise ValueError("values list must not be empty")
|
|||
|
|
return v
|
|||
|
|
|
|||
|
|
src\dataloader\cruds\quotes\crud.py -
|
|||
|
|
"""CRUD-helpers for quotes-related tables."""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
from collections.abc import Sequence
|
|||
|
|
from datetime import datetime
|
|||
|
|
|
|||
|
|
import sqlalchemy as sa
|
|||
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|||
|
|
|
|||
|
|
from dataloader.context import APP_CTX
|
|||
|
|
from dataloader.cruds import BaseCRUD, InvalidDataException
|
|||
|
|
from dataloader.models import Quote, QuoteSection, QuoteValue
|
|||
|
|
|
|||
|
|
logger = APP_CTX.get_logger()
|
|||
|
|
|
|||
|
|
|
|||
|
|
class QuoteSectionCRUD(BaseCRUD[QuoteSection]):
|
|||
|
|
"""CRUD for QuoteSection."""
|
|||
|
|
|
|||
|
|
async def get_by_name(self, name: str) -> QuoteSection | None:
|
|||
|
|
stmt = sa.select(QuoteSection).where(QuoteSection.name == name)
|
|||
|
|
res = await self._session.scalar(stmt)
|
|||
|
|
return res
|
|||
|
|
|
|||
|
|
async def get_or_create(self, name: str, params: dict | None = None) -> QuoteSection:
|
|||
|
|
existing = await self.get_by_name(name)
|
|||
|
|
if existing:
|
|||
|
|
return existing
|
|||
|
|
|
|||
|
|
obj = QuoteSection(name=name, params=params or {})
|
|||
|
|
return await self.create(obj)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class QuoteCRUD(BaseCRUD[Quote]):
|
|||
|
|
"""CRUD for Quote with UPSERT-semantics."""
|
|||
|
|
|
|||
|
|
async def get_by_name(
|
|||
|
|
self,
|
|||
|
|
section_id: int,
|
|||
|
|
name: str,
|
|||
|
|
) -> Quote | None:
|
|||
|
|
stmt = sa.select(Quote).where(
|
|||
|
|
Quote.quote_sect_id == section_id,
|
|||
|
|
Quote.name == name,
|
|||
|
|
)
|
|||
|
|
res = await self._session.scalar(stmt)
|
|||
|
|
return res
|
|||
|
|
|
|||
|
|
async def upsert(
|
|||
|
|
self,
|
|||
|
|
section: QuoteSection,
|
|||
|
|
*,
|
|||
|
|
name: str,
|
|||
|
|
params: dict | None = None,
|
|||
|
|
srce: str | None = None,
|
|||
|
|
ticker: str | None = None,
|
|||
|
|
) -> Quote:
|
|||
|
|
"""Insert or update a quote and return the resulting row."""
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
now = datetime.now(tz=APP_CTX.get_pytz_timezone()).replace(tzinfo=None)
|
|||
|
|
|
|||
|
|
stmt = (
|
|||
|
|
pg_insert(Quote)
|
|||
|
|
.values(
|
|||
|
|
quote_sect_id=section.quote_sect_id,
|
|||
|
|
name=name,
|
|||
|
|
params=params,
|
|||
|
|
srce=srce,
|
|||
|
|
ticker=ticker,
|
|||
|
|
last_update_dttm=now,
|
|||
|
|
)
|
|||
|
|
.on_conflict_do_update(
|
|||
|
|
index_elements=["quote_sect_id", "name"],
|
|||
|
|
set_={
|
|||
|
|
"params": pg_insert(Quote).excluded.params,
|
|||
|
|
"srce": pg_insert(Quote).excluded.srce,
|
|||
|
|
"ticker": pg_insert(Quote).excluded.ticker,
|
|||
|
|
"last_update_dttm": now,
|
|||
|
|
},
|
|||
|
|
)
|
|||
|
|
.returning(Quote)
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
res = await self._session.scalar(stmt)
|
|||
|
|
|
|||
|
|
if not res:
|
|||
|
|
raise InvalidDataException("Failed to upsert quote")
|
|||
|
|
await self._session.commit()
|
|||
|
|
return res
|
|||
|
|
except Exception as e:
|
|||
|
|
await self._session.rollback()
|
|||
|
|
raise e
|
|||
|
|
|
|||
|
|
|
|||
|
|
class QuoteValueCRUD(BaseCRUD[QuoteValue]):
|
|||
|
|
"""CRUD for QuoteValue with bulk UPSERT."""
|
|||
|
|
|
|||
|
|
async def bulk_upsert(
|
|||
|
|
self,
|
|||
|
|
quote: Quote,
|
|||
|
|
values: Sequence[dict],
|
|||
|
|
) -> None:
|
|||
|
|
"""Bulk insert / update values for a quote."""
|
|||
|
|
if not values:
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
now = datetime.now(tz=APP_CTX.get_pytz_timezone()).replace(tzinfo=None)
|
|||
|
|
quote_id = quote.quote_id
|
|||
|
|
|
|||
|
|
update_columns = {
|
|||
|
|
c.name
|
|||
|
|
for c in QuoteValue.__table__.columns
|
|||
|
|
if c.name not in {"quotes_values_id", "quote_id", "dt", "load_dttm"}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
payload = [
|
|||
|
|
{
|
|||
|
|
"dt": item["dt"],
|
|||
|
|
"quote_id": quote_id,
|
|||
|
|
"load_dttm": now,
|
|||
|
|
**{col: item.get(col) for col in update_columns},
|
|||
|
|
}
|
|||
|
|
for item in values
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
insert_stmt = pg_insert(QuoteValue).values(payload)
|
|||
|
|
|
|||
|
|
update_cols = {col: insert_stmt.excluded[col] for col in update_columns}
|
|||
|
|
|
|||
|
|
stmt = insert_stmt.on_conflict_do_update(
|
|||
|
|
index_elements=["quote_id", "dt"],
|
|||
|
|
set_=update_cols,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
await self._session.execute(stmt)
|
|||
|
|
await self._session.commit()
|
|||
|
|
|
|||
|
|
async def list_for_period(
|
|||
|
|
self,
|
|||
|
|
quote_id: int,
|
|||
|
|
dt_from,
|
|||
|
|
dt_to,
|
|||
|
|
) -> list[QuoteValue]:
|
|||
|
|
stmt = (
|
|||
|
|
sa.select(QuoteValue)
|
|||
|
|
.where(
|
|||
|
|
QuoteValue.quote_id == quote_id,
|
|||
|
|
QuoteValue.dt.between(dt_from, dt_to),
|
|||
|
|
)
|
|||
|
|
.order_by(QuoteValue.dt)
|
|||
|
|
)
|
|||
|
|
res = await self._session.scalars(stmt)
|
|||
|
|
return list(res)
|
|||
|
|
|
|||
|
|
src\dataloader\api\v1\service.py -
|
|||
|
|
"""Бизнес-логика загрузки котировок из SuperTenera и сохранения в БД."""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
from datetime import date, datetime
|
|||
|
|
from typing import Any
|
|||
|
|
|
|||
|
|
from dataloader.context import APP_CTX
|
|||
|
|
from dataloader.cruds.quotes.crud import QuoteCRUD, QuoteSectionCRUD, QuoteValueCRUD
|
|||
|
|
from dataloader.interfaces.tenera.interface import get_async_tenera_interface
|
|||
|
|
from dataloader.interfaces.tenera.schemas import (
|
|||
|
|
BloombergTimePoint,
|
|||
|
|
CbrTimePoint,
|
|||
|
|
InvestingCandlestick,
|
|||
|
|
InvestingNumeric,
|
|||
|
|
InvestingTimePoint,
|
|||
|
|
SgxTimePoint,
|
|||
|
|
TimePointUnion,
|
|||
|
|
TradingEconomicsEmptyString,
|
|||
|
|
TradingEconomicsLastPrev,
|
|||
|
|
TradingEconomicsNumeric,
|
|||
|
|
TradingEconomicsStringPercent,
|
|||
|
|
TradingEconomicsStringTime,
|
|||
|
|
TradingEconomicsTimePoint,
|
|||
|
|
TradingViewTimePoint,
|
|||
|
|
)
|
|||
|
|
from dataloader.models import Quote, QuoteSection, QuoteValue
|
|||
|
|
|
|||
|
|
logger = APP_CTX.get_logger()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _to_float(value: str | int | float | None) -> float | None:
|
|||
|
|
"""Преобразует строковые числа с запятыми/процентами к float."""
|
|||
|
|
if value is None:
|
|||
|
|
return None
|
|||
|
|
if isinstance(value, int | float):
|
|||
|
|
return float(value)
|
|||
|
|
s = str(value).strip().replace(" ", "").replace("%", "").replace(",", ".")
|
|||
|
|
if s == "":
|
|||
|
|
return None
|
|||
|
|
try:
|
|||
|
|
return float(s)
|
|||
|
|
except ValueError:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _parse_ts_to_datetime(ts: str) -> datetime | None:
|
|||
|
|
"""Преобразует строку с Unix timestamp в datetime без таймзоны, но в таймзоне приложения."""
|
|||
|
|
if not ts or not ts.strip().isdigit():
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
timestamp = int(ts.strip())
|
|||
|
|
dt_aware = datetime.fromtimestamp(timestamp, tz=APP_CTX.get_pytz_timezone())
|
|||
|
|
return dt_aware.replace(tzinfo=None)
|
|||
|
|
except (ValueError, OSError, OverflowError):
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _build_value_row(source: str, dt: date, point: Any) -> dict[str, Any] | None: # noqa: C901
|
|||
|
|
"""Строит строку для `quotes_values` по источнику и типу точки."""
|
|||
|
|
if isinstance(point, int):
|
|||
|
|
return {"dt": dt, "key": point}
|
|||
|
|
|
|||
|
|
if isinstance(point, TimePointUnion):
|
|||
|
|
inner = point.root
|
|||
|
|
if isinstance(inner, InvestingTimePoint):
|
|||
|
|
deep_inner = inner.root
|
|||
|
|
if isinstance(deep_inner, InvestingNumeric):
|
|||
|
|
return {
|
|||
|
|
"dt": dt,
|
|||
|
|
"value_profit": _to_float(deep_inner.profit),
|
|||
|
|
"value_base": _to_float(deep_inner.base_value),
|
|||
|
|
"value_max": _to_float(deep_inner.max_value),
|
|||
|
|
"value_min": _to_float(deep_inner.min_value),
|
|||
|
|
"value_chng": _to_float(deep_inner.change),
|
|||
|
|
"value_chng_prc": _to_float(deep_inner.change_ptc),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if isinstance(deep_inner, InvestingCandlestick):
|
|||
|
|
return {
|
|||
|
|
"dt": dt,
|
|||
|
|
"price_o": _to_float(getattr(deep_inner, "open_", None) or getattr(deep_inner, "open", None)),
|
|||
|
|
"price_h": _to_float(deep_inner.high),
|
|||
|
|
"price_l": _to_float(deep_inner.low),
|
|||
|
|
"price_c": _to_float(deep_inner.close),
|
|||
|
|
"volume": _to_float(deep_inner.value),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if isinstance(inner, TradingViewTimePoint | SgxTimePoint):
|
|||
|
|
return {
|
|||
|
|
"dt": dt,
|
|||
|
|
"price_o": _to_float(getattr(inner, "open_", None) or getattr(inner, "open", None)),
|
|||
|
|
"price_h": _to_float(inner.high),
|
|||
|
|
"price_l": _to_float(inner.low),
|
|||
|
|
"price_c": _to_float(inner.close),
|
|||
|
|
"volume": _to_float(
|
|||
|
|
getattr(inner, "volume", None) or getattr(inner, "interest", None) or getattr(inner, "value", None)
|
|||
|
|
),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if isinstance(inner, BloombergTimePoint):
|
|||
|
|
return {
|
|||
|
|
"dt": dt,
|
|||
|
|
"value_base": _to_float(inner.value),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if isinstance(inner, CbrTimePoint):
|
|||
|
|
return {
|
|||
|
|
"dt": dt,
|
|||
|
|
"value_base": _to_float(inner.value),
|
|||
|
|
}
|
|||
|
|
if isinstance(inner, TradingEconomicsTimePoint):
|
|||
|
|
deep_inner = inner.root
|
|||
|
|
|
|||
|
|
if isinstance(deep_inner, TradingEconomicsNumeric):
|
|||
|
|
return {
|
|||
|
|
"dt": dt,
|
|||
|
|
"price_i": _to_float(deep_inner.price),
|
|||
|
|
"value_day": _to_float(deep_inner.day),
|
|||
|
|
"value_prc": _to_float(deep_inner.percent),
|
|||
|
|
"value_weekly_prc": _to_float(deep_inner.weekly),
|
|||
|
|
"value_monthly_prc": _to_float(deep_inner.monthly),
|
|||
|
|
"value_ytd_prc": _to_float(deep_inner.ytd),
|
|||
|
|
"value_yoy_prc": _to_float(deep_inner.yoy),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if isinstance(deep_inner, TradingEconomicsLastPrev):
|
|||
|
|
return {
|
|||
|
|
"dt": dt,
|
|||
|
|
"value_last": _to_float(deep_inner.last),
|
|||
|
|
"value_previous": _to_float(deep_inner.previous),
|
|||
|
|
"unit": str(deep_inner.unit) if deep_inner.unit is not None else None,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if isinstance(deep_inner, TradingEconomicsStringPercent):
|
|||
|
|
return {
|
|||
|
|
"dt": dt,
|
|||
|
|
"value_prc": _to_float(deep_inner.root),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if isinstance(deep_inner, TradingEconomicsStringTime):
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
if isinstance(deep_inner, TradingEconomicsEmptyString):
|
|||
|
|
return {
|
|||
|
|
"dt": dt,
|
|||
|
|
"is_empty_str_flg": True,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def fetch_and_save_data() -> None:
|
|||
|
|
"""Загружает данные из SuperTenera и апсертом сохраняет их в БД."""
|
|||
|
|
logger.info("ETL start")
|
|||
|
|
|
|||
|
|
async with get_async_tenera_interface() as tenera:
|
|||
|
|
data = await tenera.get_quotes_data()
|
|||
|
|
|
|||
|
|
async with APP_CTX.async_session_maker() as session:
|
|||
|
|
section_crud = QuoteSectionCRUD(QuoteSection, session, logger)
|
|||
|
|
quote_crud = QuoteCRUD(Quote, session, logger)
|
|||
|
|
value_crud = QuoteValueCRUD(QuoteValue, session, logger)
|
|||
|
|
|
|||
|
|
for source_name in ("cbr", "investing", "sgx", "tradingeconomics", "bloomberg", "trading_view"):
|
|||
|
|
source_data = getattr(data, source_name)
|
|||
|
|
if not source_data:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
section = await section_crud.get_by_name(source_name)
|
|||
|
|
if section is None:
|
|||
|
|
logger.warning(f"Section ‘{source_name}’ not found. Skipping source.")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
for instrument_name, instrument_data in source_data.items():
|
|||
|
|
quote = await quote_crud.upsert(
|
|||
|
|
section=section,
|
|||
|
|
name=instrument_name,
|
|||
|
|
)
|
|||
|
|
rows: list[dict[str, Any]] = []
|
|||
|
|
for ts, tp in instrument_data.items():
|
|||
|
|
dt = _parse_ts_to_datetime(str(ts))
|
|||
|
|
if not dt:
|
|||
|
|
continue
|
|||
|
|
row = _build_value_row(source_name, dt, tp)
|
|||
|
|
if row is None:
|
|||
|
|
continue
|
|||
|
|
rows.append(row)
|
|||
|
|
|
|||
|
|
await value_crud.bulk_upsert(quote, rows)
|
|||
|
|
|
|||
|
|
logger.info("ETL complete")
|
|||
|
|
|
|||
|
|
src\dataloader\api\v1\router.py -
|
|||
|
|
"""
|
|||
|
|
Основное ядро API co всеми endpoint-ми.
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from fastapi import APIRouter, BackgroundTasks
|
|||
|
|
|
|||
|
|
from dataloader.context import APP_CTX
|
|||
|
|
|
|||
|
|
from .service import fetch_and_save_data
|
|||
|
|
|
|||
|
|
router = APIRouter()
|
|||
|
|
logger = APP_CTX.get_logger()
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.post("/trigger_data")
|
|||
|
|
async def trigger_data(background_tasks: BackgroundTasks) -> dict[str, str]:
|
|||
|
|
background_tasks.add_task(fetch_and_save_data)
|
|||
|
|
return {"message": "ETL started"}
|
|||
|
|
|
|||
|
|
и в конфиге -
|
|||
|
|
|
|||
|
|
class SuperTenera(BaseAppSettings):
|
|||
|
|
"""
|
|||
|
|
Настройки интеграции с другими сервисами.
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
host: Annotated[str, BeforeValidator(strip_slashes)] = Field(
|
|||
|
|
validation_alias="SUPERTENERA_HOST", default="ci03801737-ift-tenera-giga.delta.sbrf.ru/atlant360bc/"
|
|||
|
|
)
|
|||
|
|
port: str = Field(validation_alias="SUPERTENERA_PORT", default="443")
|
|||
|
|
|
|||
|
|
quotes_endpoint: Annotated[str, BeforeValidator(strip_slashes)] = Field(
|
|||
|
|
validation_alias="SUPERTENERA_QUOTES_ENDPOINT", default="/get_gigaparser_quotes/"
|
|||
|
|
)
|
|||
|
|
timeout: int = 20
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def base_url(self) -> str:
|
|||
|
|
"""Возвращает абсолютный URL"""
|
|||
|
|
domain, raw_path = self.host.split("/", 1) if "/" in self.host else (self.host, "")
|
|||
|
|
return build_url(self.protocol, domain, self.port, raw_path)
|