dataloader/tests/integration_tests/test_queue_repository.py

690 lines
20 KiB
Python
Raw Permalink Normal View History

2025-11-05 12:41:56 +01:00
from __future__ import annotations
2025-11-05 18:45:13 +01:00
from datetime import datetime, timedelta, timezone
2025-11-05 16:49:12 +01:00
from uuid import uuid4
2025-11-05 12:41:56 +01:00
import pytest
2025-11-05 16:49:12 +01:00
from sqlalchemy import select
2025-11-05 12:41:56 +01:00
from sqlalchemy.ext.asyncio import AsyncSession
2025-11-05 16:49:12 +01:00
from dataloader.storage.models import DLJob
2025-11-05 12:41:56 +01:00
from dataloader.storage.repositories import QueueRepository
2025-11-05 18:45:13 +01:00
from dataloader.storage.schemas import CreateJobRequest
2025-11-05 12:41:56 +01:00
def _make_request(
    job_id: str,
    queue: str,
    task: str,
    *,
    args: dict | None = None,
    idempotency_key: str | None = None,
    lock_key: str = "lock",
    partition_key: str = "",
    priority: int = 100,
    max_attempts: int = 5,
    lease_ttl_sec: int = 60,
    producer: str | None = None,
    consumer_group: str | None = None,
) -> CreateJobRequest:
    """
    Build a CreateJobRequest with the defaults shared by most tests.

    The job is always made available immediately (``available_at`` is the
    current UTC time). Tests override only the fields they care about.
    """
    return CreateJobRequest(
        job_id=job_id,
        queue=queue,
        task=task,
        args={} if args is None else args,
        idempotency_key=idempotency_key,
        lock_key=lock_key,
        partition_key=partition_key,
        priority=priority,
        available_at=datetime.now(timezone.utc),
        max_attempts=max_attempts,
        lease_ttl_sec=lease_ttl_sec,
        producer=producer,
        consumer_group=consumer_group,
    )


@pytest.mark.integration
class TestQueueRepository:
    """
    Integration tests for QueueRepository.

    Relies on the ``db_session``, ``clean_queue_tables``, ``job_id``,
    ``queue_name`` and ``task_name`` fixtures defined outside this module
    (presumably in conftest.py).
    """

    async def test_create_or_get_creates_new_job(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Creating a new job returns its id with status "queued".
        """
        repo = QueueRepository(db_session)
        req = _make_request(
            job_id,
            queue_name,
            task_name,
            args={"param": "value"},
            idempotency_key="test_key_1",
            lock_key="test_lock_1",
            partition_key="part1",
            producer="test_producer",
            consumer_group="test_group",
        )

        created_id, status = await repo.create_or_get(req)

        assert created_id == job_id
        assert status == "queued"

    async def test_create_or_get_returns_existing_job(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Idempotency: a second call with the same idempotency_key returns the
        already existing job instead of creating a new one.
        """
        repo = QueueRepository(db_session)
        req = _make_request(
            job_id,
            queue_name,
            task_name,
            idempotency_key="idempotent_key_1",
            lock_key="lock1",
        )
        created_id_1, status_1 = await repo.create_or_get(req)

        # Every field except the idempotency key differs. The job id is a
        # real UUID so the call cannot fail on id parsing before reaching
        # the idempotency lookup.
        req_2 = _make_request(
            str(uuid4()),
            "different_queue",
            "different_task",
            idempotency_key="idempotent_key_1",
            lock_key="lock2",
            priority=200,
            max_attempts=3,
            lease_ttl_sec=30,
        )
        created_id_2, status_2 = await repo.create_or_get(req_2)

        assert created_id_1 == created_id_2 == job_id
        assert status_1 == status_2 == "queued"

    async def test_get_status_returns_job_status(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        get_status reports a freshly created job as queued with no attempts.
        """
        repo = QueueRepository(db_session)
        await repo.create_or_get(
            _make_request(job_id, queue_name, task_name, args={"key": "val"})
        )

        status = await repo.get_status(job_id)

        assert status is not None
        assert status.job_id == job_id
        assert status.status == "queued"
        assert status.attempt == 0
        assert status.error is None

    async def test_get_status_returns_none_for_nonexistent_job(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
    ):
        """
        get_status returns None for an unknown job id.
        """
        repo = QueueRepository(db_session)

        status = await repo.get_status("00000000-0000-0000-0000-000000000000")

        assert status is None

    async def test_cancel_sets_cancel_requested_flag(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        cancel() on an existing job reports success.
        """
        repo = QueueRepository(db_session)
        await repo.create_or_get(_make_request(job_id, queue_name, task_name))

        result = await repo.cancel(job_id)

        assert result is True
        status = await repo.get_status(job_id)
        # NOTE(review): this only checks that the job still exists; it does
        # not verify that the cancel-requested flag was persisted. Consider
        # asserting on the stored flag once its field name is confirmed.
        assert status is not None

    async def test_claim_one_returns_job_for_processing(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        claim_one hands out the queued job and moves it to "running".
        """
        repo = QueueRepository(db_session)
        await repo.create_or_get(
            _make_request(
                job_id,
                queue_name,
                task_name,
                args={"data": "test"},
                lock_key="lock_claim",
                partition_key="partition1",
                priority=50,
                lease_ttl_sec=120,
            )
        )

        claimed = await repo.claim_one(queue_name, claim_backoff_sec=15)

        assert claimed is not None
        assert claimed["job_id"] == job_id
        assert claimed["queue"] == queue_name
        assert claimed["task"] == task_name
        assert claimed["args"] == {"data": "test"}
        assert claimed["lock_key"] == "lock_claim"
        assert claimed["attempt"] == 1

        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "running"
        assert status.attempt == 1

    async def test_claim_one_returns_none_when_no_jobs(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        queue_name: str,
    ):
        """
        claim_one returns None when the queue is empty.
        """
        repo = QueueRepository(db_session)

        claimed = await repo.claim_one(queue_name, claim_backoff_sec=15)

        assert claimed is None

    async def test_heartbeat_updates_lease(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        heartbeat on a running job succeeds and reports no cancellation.
        """
        repo = QueueRepository(db_session)
        await repo.create_or_get(
            _make_request(job_id, queue_name, task_name, lock_key="lock_hb")
        )
        assert await repo.claim_one(queue_name, claim_backoff_sec=15) is not None

        success, cancel_requested = await repo.heartbeat(job_id, ttl_sec=90)

        assert success is True
        assert cancel_requested is False

    async def test_finish_ok_marks_job_succeeded(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        finish_ok moves a running job to "succeeded" and stamps finished_at.
        """
        repo = QueueRepository(db_session)
        await repo.create_or_get(
            _make_request(job_id, queue_name, task_name, lock_key="lock_finish")
        )
        assert await repo.claim_one(queue_name, claim_backoff_sec=15) is not None

        await repo.finish_ok(job_id)

        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "succeeded"
        assert status.finished_at is not None

    async def test_finish_fail_or_retry_requeues_on_retry(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        A failure with attempts remaining puts the job back to "queued" and
        records the error.
        """
        repo = QueueRepository(db_session)
        await repo.create_or_get(
            _make_request(
                job_id, queue_name, task_name, lock_key="lock_retry", max_attempts=3
            )
        )
        assert await repo.claim_one(queue_name, claim_backoff_sec=15) is not None

        await repo.finish_fail_or_retry(job_id, err="Test error")

        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "queued"
        assert status.error == "Test error"
        assert status.attempt == 1

    async def test_finish_fail_or_retry_marks_failed_when_max_attempts_reached(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        A failure on the last allowed attempt marks the job "failed" for good.
        """
        repo = QueueRepository(db_session)
        await repo.create_or_get(
            _make_request(
                job_id, queue_name, task_name, lock_key="lock_fail", max_attempts=1
            )
        )
        assert await repo.claim_one(queue_name, claim_backoff_sec=15) is not None

        await repo.finish_fail_or_retry(job_id, err="Final error")

        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "failed"
        assert status.error == "Final error"
        assert status.finished_at is not None

    async def test_requeue_lost_returns_expired_jobs(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Reaper: a running job whose lease has expired is returned to the queue.
        """
        repo = QueueRepository(db_session)
        await repo.create_or_get(
            _make_request(
                job_id, queue_name, task_name, lock_key="lock_lost", lease_ttl_sec=1
            )
        )
        assert await repo.claim_one(queue_name, claim_backoff_sec=15) is not None

        # Evaluate expiry at a moment well past the 1-second lease instead of
        # sleeping. This keeps the test fast and avoids timing flakiness.
        later = datetime.now(timezone.utc) + timedelta(minutes=5)
        requeued = await repo.requeue_lost(now=later)

        assert job_id in requeued
        status = await repo.get_status(job_id)
        assert status is not None
        assert status.status == "queued"

    async def test_claim_one_fails_on_advisory_lock_and_sets_backoff(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Advisory-lock failure branch: the job goes back to "queued" with
        available_at pushed forward by the claim backoff.
        """
        repo = QueueRepository(db_session)
        await repo.create_or_get(
            _make_request(
                job_id,
                queue_name,
                task_name,
                args={"k": "v"},
                lock_key="lock-fail-adv",
                priority=10,
                lease_ttl_sec=30,
            )
        )

        async def _false_lock(_: str) -> bool:
            return False

        # Force the advisory lock to be "taken" so the backoff branch runs.
        repo._try_advisory_lock = _false_lock  # type: ignore[method-assign]

        before = datetime.now(timezone.utc)
        claimed = await repo.claim_one(queue_name, claim_backoff_sec=15)
        after = datetime.now(timezone.utc)

        assert claimed is None
        st = await repo.get_status(job_id)
        assert st is not None
        assert st.status == "queued"

        # populate_existing makes sure we read the row as stored now rather
        # than a DLJob instance possibly cached in the session identity map.
        row = (
            await db_session.execute(
                select(DLJob)
                .where(DLJob.job_id == job_id)
                .execution_options(populate_existing=True)
            )
        ).scalar_one()
        assert row.available_at >= before + timedelta(seconds=15)
        assert row.available_at <= after + timedelta(seconds=60)

    async def test_heartbeat_when_not_running_returns_false(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        heartbeat on a job that is not running returns (False, False).
        """
        repo = QueueRepository(db_session)
        await repo.create_or_get(
            _make_request(
                job_id, queue_name, task_name, lock_key="lock-hb-not-running"
            )
        )

        ok, cancel = await repo.heartbeat(job_id, ttl_sec=30)

        assert ok is False
        assert cancel is False

    async def test_finish_fail_or_retry_marks_canceled_branch(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        With is_canceled=True the job is marked "canceled" and finished.
        """
        repo = QueueRepository(db_session)
        await repo.create_or_get(
            _make_request(job_id, queue_name, task_name, lock_key="lock-cancel")
        )
        assert await repo.claim_one(queue_name, claim_backoff_sec=5) is not None

        await repo.finish_fail_or_retry(
            job_id, err="Canceled by test", is_canceled=True
        )

        st = await repo.get_status(job_id)
        assert st is not None
        assert st.status == "canceled"
        assert st.error == "Canceled by test"
        assert st.finished_at is not None

    async def test_requeue_lost_no_expired_returns_empty(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        requeue_lost returns an empty list when no lease has expired.
        """
        repo = QueueRepository(db_session)
        await repo.create_or_get(
            _make_request(
                job_id,
                queue_name,
                task_name,
                lock_key="lock-none-expired",
                lease_ttl_sec=120,
            )
        )
        assert await repo.claim_one(queue_name, claim_backoff_sec=5) is not None

        res = await repo.requeue_lost(now=datetime.now(timezone.utc))

        assert res == []
        st = await repo.get_status(job_id)
        assert st is not None
        assert st.status == "running"

    async def test_private_helpers_resolve_queue_and_advisory_unlock_are_executable(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
        job_id: str,
        queue_name: str,
        task_name: str,
    ):
        """
        Call private helpers directly to cover rarely hit branches.
        """
        repo = QueueRepository(db_session)
        await repo.create_or_get(
            _make_request(
                job_id,
                queue_name,
                task_name,
                lock_key="lock-direct-unlock",
                priority=1,
                max_attempts=1,
                lease_ttl_sec=5,
            )
        )

        missing_uuid = str(uuid4())
        qname = await repo._resolve_queue(missing_uuid)  # type: ignore[attr-defined]
        assert qname == ""

        await repo._advisory_unlock("lock-direct-unlock")  # type: ignore[attr-defined]

    async def test_cancel_returns_false_for_nonexistent_job(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
    ):
        """
        cancel() returns False for an unknown job.
        """
        repo = QueueRepository(db_session)

        assert await repo.cancel(str(uuid4())) is False

    async def test_finish_ok_silent_when_job_absent(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
    ):
        """
        finish_ok completes without raising when the job does not exist.
        """
        repo = QueueRepository(db_session)

        await repo.finish_ok(str(uuid4()))

    async def test_finish_fail_or_retry_noop_when_job_absent(
        self,
        db_session: AsyncSession,
        clean_queue_tables,
    ):
        """
        finish_fail_or_retry returns quietly when the job does not exist.
        """
        repo = QueueRepository(db_session)

        await repo.finish_fail_or_retry(str(uuid4()), err="no-op")