dataloader/TODO.md

81 lines
4.4 KiB
Markdown
Raw Permalink Normal View History

2025-11-04 23:34:42 +01:00
## План внедрения
1. **Шаблон + каркас пакета** - сделано
* Создать структуру из ТЗ (один пакет `dataloader/` по src-layout).
* Подтянуть `rest_template.md` артефакты: `os_router.py`, `middleware.py`, `logger/*`.
* `pyproject.toml`: `fastapi`, `uvicorn`, `pydantic-settings`, `sqlalchemy>=2, async`, `psycopg[binary,pool]` или `asyncpg`, `httpx`, `pytest`, `pytest-asyncio`, `httpx[cli]`.
* **Критерий:** `uvicorn dataloader.__main__:app` поднимается, `/health` отдаёт 200.
2. **Конфиг и контекст** - сделано
* `config.py`: `AppSettings` (DSN, тайминги, WORKERS_JSON).
* `context.py`: `AppContext`, создание `AsyncEngine` и `async_sessionmaker`, DI.
* **Критерий:** `/status` возвращает версию/uptime, движок создаётся на старте без попыток коннекта в `/health`.
3. **Хранилище очереди** - в работе
* `storage/db.py`: фабрики engine/sessionmaker.
* `storage/repositories.py`: методы
* `create_or_get(req)`,
* `get_status(job_id)`,
* `cancel(job_id)`,
* `requeue_lost(now)`,
* вспомогательные `claim_one(queue)`, `heartbeat(job_id, ttl)`, `finish_ok(job_id)`, `finish_fail_or_retry(job_id, err)`.
* Только чистый SQL (как в ТЗ), транзакция на операцию.
* **Критерий:** unit-интегра тест «поставил-прочитал-отменил» проходит.
4. **API v1**
* `api/v1/schemas.py`: `TriggerJobRequest/Response`, `JobStatusResponse`.
* `api/v1/service.py`: бизнес-слой над репозиторием.
* `api/v1/router.py`: `POST /jobs/trigger`, `GET /jobs/{id}/status`, `POST /jobs/{id}/cancel`.
* **Критерий:** ручки соответствуют контрактам, идемпотентность по `idempotency_key` работает.
5. **Базовый воркер**
* `workers/base.py`: класс `PGWorker` с циклами `listen_or_sleep → claim → advisory_lock → _pipeline → heartbeat → finish`.
* Идём строго по SQL из ТЗ: `FOR UPDATE SKIP LOCKED`, lease/heartbeat, backoff при lock.
* **Критерий:** локальный мок-пайплайн выполняется, статус `succeeded`.
6. **Менеджер воркеров**
* `workers/manager.py`: парсинг `WORKERS_JSON`, создание `asyncio.Task` на воркеры; мягкая остановка на shutdown.
* Подключение в `__main__.py` через FastAPI `on_startup/on_shutdown`.
* **Критерий:** при старте создаются нужные таски, при SIGTERM корректно гасим.
7. **Реестр пайплайнов**
* `workers/pipelines/registry.py`: `@register(task)`, `resolve(task)`.
* Пустой эталонный пайплайн (no-op, имитирует 23 чанка).
* **Критерий:** задача с `task="noop"` исполняется через реестр.
8. **Reaper**
* Фоновая async-задача в приложении: `requeue_lost` раз в `DL_REAPER_PERIOD_SEC`.
* **Критерий:** задачи с протухшим `lease_expires_at` возвращаются в `queued`.
9. **Интеграционные тесты**
* `tests/integration_tests/v1_api/test_service.py`:
* trigger → status (queued),
* воркер подхватил → status (running),
* done → status (succeeded),
* cancel во время пайплайна → корректная реакция.
* **Критерий:** тесты зелёные в CI.
10. **Dockerfile и запуск**
* Slim образ на Python 3.11/3.12, `uvicorn` entrypoint.
* ENV-пример `.env`, README с запуском.
* **Критерий:** контейнер стартует, воркеры работают, API доступно.
11. **Наблюдаемость**
* Логи в формате шаблона (структурные, маскирование).
* Простая сводка в `/status` (кол-во активных воркеров, конфиг таймингов).
* **Критерий:** видно ключевые переходы статусов и ошибки пайплайнов.