# dataloader/tests/unit/test_api_service.py
from __future__ import annotations

from datetime import datetime, timezone
from unittest.mock import AsyncMock, Mock, patch
from uuid import UUID

import pytest

from dataloader.api.v1.schemas import TriggerJobRequest
from dataloader.api.v1.service import JobsService
from dataloader.storage.schemas import JobStatus


@pytest.mark.unit
class TestJobsService:
    """Unit tests for JobsService.

    Each test passes an ``AsyncMock`` as the session and replaces names looked
    up in ``dataloader.api.v1.service`` with mocks, then checks what the
    service returns and how it calls the repository mock.
    """

    # Identifier shared by the mocked repository answers and the assertions.
    JOB_ID_STR = "12345678-1234-5678-1234-567812345678"
    JOB_ID = UUID(JOB_ID_STR)
    # Identifier used by the "job does not exist" scenarios.
    MISSING_JOB_ID = UUID("00000000-0000-0000-0000-000000000000")

    @classmethod
    def _patch_service(cls, repo):
        """Return a context manager that patches the service module.

        While active, ``get_logger`` returns a ``Mock``, ``QueueRepository``
        returns ``repo`` and ``new_job_id`` returns ``JOB_ID``.  ``new_job_id``
        is patched for every test for simplicity, including the status/cancel
        tests that only need the first two.
        """
        return patch.multiple(
            "dataloader.api.v1.service",
            get_logger=Mock(return_value=Mock()),
            QueueRepository=Mock(return_value=repo),
            new_job_id=Mock(return_value=cls.JOB_ID),
        )

    @classmethod
    def _queued_repo(cls) -> Mock:
        """Build a repository mock whose ``create_or_get`` reports a queued job."""
        repo = Mock()
        repo.create_or_get = AsyncMock(return_value=(cls.JOB_ID_STR, "queued"))
        return repo

    @staticmethod
    def _make_request(**overrides) -> TriggerJobRequest:
        """Build a ``TriggerJobRequest`` from common defaults plus ``overrides``."""
        fields = {
            "queue": "test_queue",
            "task": "test.task",
            "args": {},
            "lock_key": "lock_1",
            "priority": 100,
            "max_attempts": 5,
            "lease_ttl_sec": 60,
        }
        fields.update(overrides)
        return TriggerJobRequest(**fields)

    @classmethod
    def _make_status(cls, **overrides) -> JobStatus:
        """Build a ``JobStatus`` for a running job, adjusted by ``overrides``."""
        fields = {
            "job_id": cls.JOB_ID_STR,
            "status": "running",
            "attempt": 1,
            "started_at": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
            "finished_at": None,
            "heartbeat_at": datetime(2025, 1, 1, 12, 5, 0, tzinfo=timezone.utc),
            "error": None,
            "progress": {},
        }
        fields.update(overrides)
        return JobStatus(**fields)

    def test_init_creates_service_with_session(self):
        """The constructor keeps the session and sets up a repository."""
        mock_session = AsyncMock()

        # Only the logger is patched here; QueueRepository is left as is.
        with patch("dataloader.api.v1.service.get_logger") as mock_get_logger:
            mock_get_logger.return_value = Mock()

            service = JobsService(mock_session)

            assert service._s == mock_session
            assert service._repo is not None

    @pytest.mark.asyncio
    async def test_trigger_creates_new_job(self):
        """trigger() returns the job id and status reported by the repository."""
        repo = self._queued_repo()

        with self._patch_service(repo):
            service = JobsService(AsyncMock())
            req = self._make_request(args={"key": "value"})

            response = await service.trigger(req)

            assert response.job_id == self.JOB_ID
            assert response.status == "queued"
            repo.create_or_get.assert_called_once()

    @pytest.mark.asyncio
    async def test_trigger_with_idempotency_key(self):
        """trigger() forwards the idempotency key to the repository."""
        repo = self._queued_repo()

        with self._patch_service(repo):
            service = JobsService(AsyncMock())
            req = self._make_request(idempotency_key="unique_key_123")

            response = await service.trigger(req)

            assert response.status == "queued"
            # First positional argument of create_or_get is the create request.
            create_req = repo.create_or_get.call_args[0][0]
            assert create_req.idempotency_key == "unique_key_123"

    @pytest.mark.asyncio
    async def test_trigger_with_available_at(self):
        """trigger() forwards a delayed start time to the repository."""
        repo = self._queued_repo()

        with self._patch_service(repo):
            service = JobsService(AsyncMock())
            future_time = datetime(2025, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
            req = self._make_request(available_at=future_time)

            await service.trigger(req)

            create_req = repo.create_or_get.call_args[0][0]
            assert create_req.available_at == future_time

    @pytest.mark.asyncio
    async def test_trigger_with_optional_fields(self):
        """trigger() forwards the optional routing fields to the repository."""
        repo = self._queued_repo()

        with self._patch_service(repo):
            service = JobsService(AsyncMock())
            req = self._make_request(
                partition_key="partition_1",
                producer="test_producer",
                consumer_group="test_group",
            )

            await service.trigger(req)

            create_req = repo.create_or_get.call_args[0][0]
            assert create_req.partition_key == "partition_1"
            assert create_req.producer == "test_producer"
            assert create_req.consumer_group == "test_group"

    @pytest.mark.asyncio
    async def test_status_returns_job_status(self):
        """status() maps the repository's status record for an existing job."""
        repo = Mock()
        repo.get_status = AsyncMock(
            return_value=self._make_status(progress={"step": 1})
        )

        with self._patch_service(repo):
            service = JobsService(AsyncMock())

            response = await service.status(self.JOB_ID)

            assert response is not None
            assert response.job_id == self.JOB_ID
            assert response.status == "running"
            assert response.attempt == 1
            assert response.progress == {"step": 1}

    @pytest.mark.asyncio
    async def test_status_returns_none_for_nonexistent_job(self):
        """status() returns None when the repository has no such job."""
        repo = Mock()
        repo.get_status = AsyncMock(return_value=None)

        with self._patch_service(repo):
            service = JobsService(AsyncMock())

            response = await service.status(self.MISSING_JOB_ID)

            assert response is None

    @pytest.mark.asyncio
    async def test_cancel_cancels_job_and_returns_status(self):
        """cancel() calls the repository's cancel and returns the job status."""
        repo = Mock()
        repo.cancel = AsyncMock()
        repo.get_status = AsyncMock(return_value=self._make_status())

        with self._patch_service(repo):
            service = JobsService(AsyncMock())

            response = await service.cancel(self.JOB_ID)

            assert response is not None
            assert response.job_id == self.JOB_ID
            assert response.status == "running"
            repo.cancel.assert_called_once_with(str(self.JOB_ID))

    @pytest.mark.asyncio
    async def test_cancel_returns_none_for_nonexistent_job(self):
        """cancel() returns None when the repository has no such job."""
        repo = Mock()
        repo.cancel = AsyncMock()
        repo.get_status = AsyncMock(return_value=None)

        with self._patch_service(repo):
            service = JobsService(AsyncMock())

            response = await service.cancel(self.MISSING_JOB_ID)

            assert response is None
            # Same argument check as the existing-job test, so a cancel issued
            # for the wrong id cannot pass unnoticed.
            repo.cancel.assert_called_once_with(str(self.MISSING_JOB_ID))

    @pytest.mark.asyncio
    async def test_status_handles_empty_progress(self):
        """status() reports a progress of None as an empty dict."""
        repo = Mock()
        repo.get_status = AsyncMock(
            return_value=self._make_status(
                status="queued",
                attempt=0,
                started_at=None,
                heartbeat_at=None,
                progress=None,
            )
        )

        with self._patch_service(repo):
            service = JobsService(AsyncMock())

            response = await service.status(self.JOB_ID)

            assert response is not None
            assert response.progress == {}