dataloader/TZ.md

307 lines
14 KiB
Markdown
Raw Permalink Normal View History

2025-11-04 23:11:41 +01:00
# ТЗ: `dataloader` (один пакет, async, PG-очередь, LISTEN/NOTIFY)
## 1) Назначение и рамки
2025-11-05 14:01:37 +01:00
`dataloader` - сервис постановки и исполнения долгих ETL-задач через одну общую очередь в Postgres. Сервис предоставляет HTTP-ручки для триггера задач, мониторинга статуса и отмены; внутри процесса запускает N асинхронных воркеров, которые конкурируют за задачи через `SELECT … FOR UPDATE SKIP LOCKED`, держат lease/heartbeat, делают идемпотентные записи в целевые БД и корректно обрабатывают повторы.
2025-11-04 23:11:41 +01:00
2025-11-05 14:01:37 +01:00
Архитектura и инфраструктурные части соответствуют шаблону `rest_template.md`: единый пакет, `os_router.py` с `/health` и `/status`, middleware логирования, структура каталогов и конфиг-классы - **как в шаблоне**.
2025-11-04 23:11:41 +01:00
---
## 2) Архитектура (одно приложение, async)
* **FastAPI-приложение**: HTTP API v1, инфраструктурные роуты (`/health`, `/status`) из шаблона, middleware и логирование из шаблона.
2025-11-05 14:01:37 +01:00
* **WorkerManager**: на `startup` читает конфиг (`WORKERS_JSON`) и поднимает M асинхронных воркер-циклов (по очередям и уровням параллелизма). На `shutdown` - мягкая остановка.
2025-11-04 23:11:41 +01:00
* **PG Queue**: одна таблица `dl_jobs` на все очереди и сервисы; журнал `dl_job_events`; триггеры LISTEN/NOTIFY для пробуждения воркеров без активного поллинга.
---
## 3) Структура репозитория (один пакет, как в шаблоне)
```
dataloader/
├── src/
│ └── dataloader/
│ ├── __main__.py # точка входа FastAPI + запуск WorkerManager (по шаблону)
│ ├── config.py # Pydantic Settings: DSN, тайминги, WORKERS_JSON
│ ├── base.py
│ ├── context.py # AppContext: engine/sessionmaker, DI
│ ├── exceptions.py
│ ├── logger/ # не менять тип и контракты
│ │ ├── __init__.py
│ │ ├── context_vars.py
│ │ ├── logger.py
│ │ ├── models.py
│ │ ├── utils.py
│ │ └── uvicorn_logging_config.py
│ ├── api/
│ │ ├── __init__.py # регистрация роутов (v1, os_router, metric_router)
│ │ ├── middleware.py
│ │ ├── os_router.py # /health, /status
│ │ ├── metric_router.py
│ │ └── v1/
│ │ ├── router.py # POST /jobs/trigger, GET /jobs/{id}/status, POST /jobs/{id}/cancel
│ │ ├── schemas.py # pydantic запросы/ответы
│ │ ├── service.py # бизнес-логика
│ │ ├── models.py
│ │ ├── exceptions.py
│ │ └── utils.py
│ ├── storage/
│ │ ├── db.py # async engine + sessionmaker
│ │ └── repositories.py # SQL-операции по очереди и событиям
│ └── workers/
│ ├── manager.py # создание asyncio Tasks воркеров по конфигу
│ ├── base.py # общий PG-воркер: claim/lease/heartbeat/retry
│ └── pipelines/
│ ├── __init__.py
│ └── registry.py # реестр обработчиков по task
├── tests/
│ └── integration_tests/
│ ├── conftest.py
│ └── v1_api/
│ ├── constants.py
│ └── test_service.py
├── pyproject.toml
├── Dockerfile
├── .env
└── .gitignore
```
Структура, ролевые файлы и подход соответствуют `rest_template.md`.
---
## 4) DDL очереди (общая для всех сервисов)
> Таблицы уже созданы и доступны приложению.
```sql
CREATE TYPE dl_status AS ENUM ('queued','running','succeeded','failed','canceled','lost');
CREATE TABLE dl_jobs (
job_id uuid PRIMARY KEY,
queue text NOT NULL,
task text NOT NULL,
args jsonb NOT NULL DEFAULT '{}'::jsonb,
idempotency_key text UNIQUE,
lock_key text NOT NULL,
partition_key text NOT NULL DEFAULT '',
priority int NOT NULL DEFAULT 100,
available_at timestamptz NOT NULL DEFAULT now(),
status dl_status NOT NULL DEFAULT 'queued',
attempt int NOT NULL DEFAULT 0,
max_attempts int NOT NULL DEFAULT 5,
lease_ttl_sec int NOT NULL DEFAULT 60,
lease_expires_at timestamptz,
heartbeat_at timestamptz,
cancel_requested boolean NOT NULL DEFAULT false,
progress jsonb NOT NULL DEFAULT '{}'::jsonb,
error text,
producer text,
consumer_group text,
created_at timestamptz NOT NULL DEFAULT now(),
started_at timestamptz,
finished_at timestamptz,
CONSTRAINT dl_jobs_chk_positive CHECK (priority >= 0 AND attempt >= 0 AND max_attempts >= 0 AND lease_ttl_sec > 0)
);
CREATE INDEX ix_dl_jobs_claim ON dl_jobs(queue, available_at, priority, created_at)
WHERE status = 'queued';
CREATE INDEX ix_dl_jobs_running_lease ON dl_jobs(lease_expires_at)
WHERE status = 'running';
CREATE INDEX ix_dl_jobs_status_queue ON dl_jobs(status, queue);
CREATE TABLE dl_job_events (
event_id bigserial PRIMARY KEY,
job_id uuid NOT NULL REFERENCES dl_jobs(job_id) ON DELETE CASCADE,
queue text NOT NULL,
ts timestamptz NOT NULL DEFAULT now(),
kind text NOT NULL,
payload jsonb
);
CREATE OR REPLACE FUNCTION notify_job_ready() RETURNS trigger AS $$
BEGIN
IF (TG_OP = 'INSERT') THEN
PERFORM pg_notify('dl_jobs', NEW.queue);
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
IF NEW.status = 'queued' AND NEW.available_at <= now()
AND (OLD.status IS DISTINCT FROM NEW.status OR OLD.available_at IS DISTINCT FROM NEW.available_at) THEN
PERFORM pg_notify('dl_jobs', NEW.queue);
END IF;
RETURN NEW;
END IF;
RETURN NEW;
END $$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS dl_jobs_notify_ins ON dl_jobs;
CREATE TRIGGER dl_jobs_notify_ins
AFTER INSERT ON dl_jobs
FOR EACH ROW EXECUTE FUNCTION notify_job_ready();
DROP TRIGGER IF EXISTS dl_jobs_notify_upd ON dl_jobs;
CREATE TRIGGER dl_jobs_notify_upd
AFTER UPDATE OF status, available_at ON dl_jobs
FOR EACH ROW EXECUTE FUNCTION notify_job_ready();
```
---
## 5) Контракты API (v1)
* `POST /api/v1/jobs/trigger`
Вход: `{queue: str, task: str, args?: dict, idempotency_key?: str, lock_key: str, partition_key?: str, priority?: int, available_at?: RFC3339}`
Выход: `{job_id: UUID, status: str}`
Поведение: идемпотентная постановка; триггер LISTEN/NOTIFY срабатывает за счёт триггера в БД.
* `GET /api/v1/jobs/{job_id}/status`
Выход: `{job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress: {}}`
* `POST /api/v1/jobs/{job_id}/cancel`
Выход: `{… как status …}`
Поведение: устанавливает `cancel_requested = true`. Воркер кооперативно завершает задачу между чанками.
2025-11-05 14:01:37 +01:00
Инфраструктурные эндпоинты `/health`, `/status`, мидлвар и регистрация роутов - **как в шаблоне**.
2025-11-04 23:11:41 +01:00
---
## 6) Протокол выполнения (воркер)
1. **Claim** одной задачи:
```sql
WITH cte AS (
SELECT job_id
FROM dl_jobs
WHERE status='queued' AND queue=:queue AND available_at <= now()
ORDER BY priority ASC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE dl_jobs j
SET status='running',
started_at = COALESCE(started_at, now()),
attempt = attempt + 1,
lease_expires_at = now() + make_interval(secs => j.lease_ttl_sec),
heartbeat_at = now()
FROM cte
WHERE j.job_id = cte.job_id
RETURNING j.job_id, j.task, j.args, j.lock_key, j.partition_key, j.lease_ttl_sec;
```
2025-11-05 14:01:37 +01:00
Затем `SELECT pg_try_advisory_lock(hashtext(:lock_key))`. Если `false` - `backoff`:
2025-11-04 23:11:41 +01:00
```sql
UPDATE dl_jobs
SET status='queued', available_at = now() + make_interval(secs => :sec)
WHERE job_id=:jid;
```
2. **Heartbeat** раз в `DL_HEARTBEAT_SEC`:
```sql
UPDATE dl_jobs
SET heartbeat_at = now(),
lease_expires_at = now() + make_interval(secs => :ttl)
WHERE job_id = :jid AND status='running';
```
3. **Завершение**:
* Успех:
```sql
UPDATE dl_jobs
SET status='succeeded', finished_at=now(), lease_expires_at=NULL
WHERE job_id=:jid;
```
* Ошибка/ретрай:
```sql
UPDATE dl_jobs
SET status = CASE WHEN attempt < max_attempts THEN 'queued' ELSE 'failed' END,
available_at = CASE WHEN attempt < max_attempts THEN now() + make_interval(secs => 30 * attempt) ELSE now() END,
error = :err,
lease_expires_at = NULL,
finished_at = CASE WHEN attempt >= max_attempts THEN now() ELSE NULL END
WHERE job_id=:jid;
```
Всегда выставлять/снимать advisory-lock на `lock_key`.
2025-11-05 14:01:37 +01:00
4. **Отмена**: воркер проверяет `cancel_requested` между чанками; при `true` завершает пайплайн (обычно как `canceled` либо как `failed` без ретраев - политика проекта).
2025-11-04 23:11:41 +01:00
5. **Reaper** (фон у приложения): раз в `DL_REAPER_PERIOD_SEC` возвращает «потерянные» задачи в очередь.
```sql
UPDATE dl_jobs
SET status='queued', available_at=now(), lease_expires_at=NULL
WHERE status='running'
AND lease_expires_at IS NOT NULL
AND lease_expires_at < now()
RETURNING job_id;
```
---
## 7) Оптимизация и SLA
2025-11-05 14:01:37 +01:00
* Claim - O(log N) благодаря частичному индексу `ix_dl_jobs_claim`.
* Reaper - O(log N) по индексу `ix_dl_jobs_running_lease`.
* `/health` - без БД; время ответа ≤ 20 мс. `/jobs/*` - не держат долгих транзакций.
* Гарантия доставки: **at-least-once**; операции записи в целевые таблицы - идемпотентны (реализуется в конкретных пайплайнах).
2025-11-04 23:11:41 +01:00
* Конкуренция: один `lock_key` одновременно исполняется одним воркером; параллелизм достигается независимыми `partition_key`.
---
## 8) Конфигурация (ENV)
2025-11-05 14:01:37 +01:00
* `DL_DB_DSN` - DSN Postgres (async).
* `WORKERS_JSON` - JSON-список конфигураций воркеров, напр.: `[{"queue":"load.cbr","concurrency":2},{"queue":"load.sgx","concurrency":1}]`.
2025-11-04 23:11:41 +01:00
* `DL_HEARTBEAT_SEC` (деф. 10), `DL_DEFAULT_LEASE_TTL_SEC` (деф. 60), `DL_REAPER_PERIOD_SEC` (деф. 10), `DL_CLAIM_BACKOFF_SEC` (деф. 15).
2025-11-05 14:01:37 +01:00
* Логирование, middleware, `uvicorn_logging_config` - **из шаблона без изменения контрактов**.
2025-11-04 23:11:41 +01:00
---
## 9) Эксплуатация и деплой
* Один контейнер, один Pod, **несколько async-воркеров** внутри процесса (через `WorkerManager`).
2025-11-05 14:01:37 +01:00
* Масштабирование - количеством реплик Deployment: очередь в БД, `FOR UPDATE SKIP LOCKED` и advisory-lock обеспечат корректность в гонке.
2025-11-04 23:11:41 +01:00
* Пробы: `readiness/liveness` на `/health` из `os_router.py`.
2025-11-05 14:01:37 +01:00
* Завершение: на SIGTERM - остановить reaper, подать сигнал воркерам для мягкой остановки, дождаться тасков с таймаутом.
2025-11-04 23:11:41 +01:00
---
## 10) Безопасность, аудит, наблюдаемость
2025-11-05 14:01:37 +01:00
* Структурные логи через `logger/*` шаблона; маскирование чувствительных полей - как в `logger/utils.py`.
2025-11-04 23:11:41 +01:00
* Журнал жизненного цикла в `dl_job_events` (queued/picked/heartbeat/requeue/done/failed/canceled).
2025-11-05 14:01:37 +01:00
* Метрики (BETA) - через `metric_router.py` из шаблона при необходимости.
2025-11-04 23:11:41 +01:00
---
## 11) Тест-план
* Интеграционные тесты `v1`: постановка → статус → отмена.
* E2E: постановка → claim мок-воркером → heartbeat → done → статус `succeeded`.
* Конкуренция: два воркера на один `lock_key` → один backoff, один исполняет.
* Reaper: просроченный lease → возврат в `queued`.
---
## 12) TODO
* [ ] `context.py`: инициализация engine/sessionmaker, AppContext (как в шаблоне).
* [ ] `api/v1/router.py`: `trigger`, `status`, `cancel`.
* [ ] `api/v1/service.py`: бизнес-логика поверх репозитория.
* [ ] `storage/repositories.py`: SQL для create_or_get, get_status, cancel, requeue_lost.
* [ ] `workers/base.py`: claim/heartbeat/finish/retry/cancel/advisory-lock.
* [ ] `workers/manager.py`: парсинг `WORKERS_JSON`, создание тасков, graceful shutdown.
* [ ] Тесты `tests/integration_tests/v1_api/test_service.py` по стилю шаблона.
* [ ] Документация `.env` и примеры `WORKERS_JSON`.
* [x] **БД-таблицы уже созданы** (DDL применён).