dataloader/tests/unit/test_workers_manager.py

295 lines
10 KiB
Python
Raw Permalink Normal View History

2025-11-05 13:00:41 +01:00
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock, Mock, patch
2025-11-05 18:45:13 +01:00
2025-11-05 13:00:41 +01:00
import pytest
from dataloader.workers.manager import WorkerManager, WorkerSpec, build_manager_from_env
@pytest.mark.unit
class TestWorkerManager:
    """
    Unit tests for WorkerManager.

    Each test patches names inside ``dataloader.workers.manager`` (``APP_CTX``,
    ``APP_CONFIG``, ``PGWorker`` and, for the reaper tests, ``requeue_lost``)
    and then inspects the manager's private state or the recorded mock calls.
    """

    @staticmethod
    def _configure_mocks(mock_ctx, mock_cfg, mock_worker_cls, reaper_period_sec=None):
        """
        Apply the mock wiring shared by the start/stop/reaper tests.

        Args:
            mock_ctx: the patched ``APP_CTX`` object.
            mock_cfg: the patched ``APP_CONFIG`` object.
            mock_worker_cls: the patched ``PGWorker`` class.
            reaper_period_sec: when given, also sets
                ``APP_CONFIG.worker.reaper_period_sec`` and installs a
                ``sessionmaker`` whose async context manager yields an
                ``AsyncMock`` session. When omitted, both are left as the
                attributes the patch auto-creates.

        Returns:
            The ``Mock`` returned by ``APP_CTX.get_logger``.
        """
        logger = Mock()
        mock_ctx.get_logger.return_value = logger

        mock_cfg.worker.heartbeat_sec = 10
        mock_cfg.worker.claim_backoff_sec = 5

        if reaper_period_sec is not None:
            mock_cfg.worker.reaper_period_sec = reaper_period_sec
            session = AsyncMock()
            sessionmaker = MagicMock()
            sessionmaker.return_value.__aenter__.return_value = session
            mock_ctx.sessionmaker = sessionmaker

        worker = Mock()
        worker.run = AsyncMock()
        mock_worker_cls.return_value = worker

        return logger

    def test_init_creates_manager_with_specs(self):
        """
        A new manager keeps the given specs and has no tasks, no reaper task
        and an unset stop event.
        """
        specs = [WorkerSpec(queue="test_queue", concurrency=2)]

        with patch("dataloader.workers.manager.APP_CTX") as mock_ctx:
            mock_ctx.get_logger.return_value = Mock()

            manager = WorkerManager(specs)

            assert manager._specs == specs
            assert manager._stop.is_set() is False
            assert manager._tasks == []
            assert manager._reaper_task is None

    @pytest.mark.asyncio
    async def test_start_creates_worker_tasks(self):
        """
        start() creates one task and one PGWorker per unit of concurrency
        (2 + 1 = 3 here) and sets the reaper task.
        """
        specs = [
            WorkerSpec(queue="queue1", concurrency=2),
            WorkerSpec(queue="queue2", concurrency=1),
        ]

        with (
            patch("dataloader.workers.manager.APP_CTX") as mock_ctx,
            patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg,
            patch("dataloader.workers.manager.PGWorker") as mock_worker_cls,
        ):
            self._configure_mocks(mock_ctx, mock_cfg, mock_worker_cls)

            manager = WorkerManager(specs)
            await manager.start()
            try:
                assert len(manager._tasks) == 3
                assert manager._reaper_task is not None
                assert mock_worker_cls.call_count == 3
            finally:
                # Always stop, so a failed assertion does not leave the
                # started tasks pending on the event loop.
                await manager.stop()

    @pytest.mark.asyncio
    async def test_start_with_zero_concurrency_creates_one_worker(self):
        """
        A spec with concurrency=0 still results in exactly one worker task.
        """
        specs = [WorkerSpec(queue="test", concurrency=0)]

        with (
            patch("dataloader.workers.manager.APP_CTX") as mock_ctx,
            patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg,
            patch("dataloader.workers.manager.PGWorker") as mock_worker_cls,
        ):
            self._configure_mocks(mock_ctx, mock_cfg, mock_worker_cls)

            manager = WorkerManager(specs)
            await manager.start()
            try:
                assert len(manager._tasks) == 1
            finally:
                # Same cleanup guarantee as in the test above.
                await manager.stop()

    @pytest.mark.asyncio
    async def test_stop_cancels_all_tasks(self):
        """
        After stop() the stop event is set, the task list is empty and the
        reaper task is cleared.
        """
        specs = [WorkerSpec(queue="test", concurrency=2)]

        with (
            patch("dataloader.workers.manager.APP_CTX") as mock_ctx,
            patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg,
            patch("dataloader.workers.manager.PGWorker") as mock_worker_cls,
        ):
            self._configure_mocks(mock_ctx, mock_cfg, mock_worker_cls)

            manager = WorkerManager(specs)
            await manager.start()
            await manager.stop()

            assert manager._stop.is_set()
            assert len(manager._tasks) == 0
            assert manager._reaper_task is None

    @pytest.mark.asyncio
    async def test_reaper_loop_calls_requeue_lost(self):
        """
        While the manager is running, requeue_lost is called at least once.
        """
        specs = [WorkerSpec(queue="test", concurrency=1)]

        with (
            patch("dataloader.workers.manager.APP_CTX") as mock_ctx,
            patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg,
            patch("dataloader.workers.manager.PGWorker") as mock_worker_cls,
            patch("dataloader.workers.manager.requeue_lost") as mock_requeue,
        ):
            self._configure_mocks(
                mock_ctx, mock_cfg, mock_worker_cls, reaper_period_sec=1
            )
            mock_requeue.return_value = ["job1", "job2"]

            manager = WorkerManager(specs)
            await manager.start()
            # Run longer than the configured 1 s reaper period.
            await asyncio.sleep(1.5)
            await manager.stop()

            assert mock_requeue.call_count >= 1

    @pytest.mark.asyncio
    async def test_reaper_loop_handles_exceptions(self):
        """
        An exception raised by requeue_lost is reported via logger.exception.

        NOTE(review): only the logging is asserted; that the loop keeps
        polling after the failure is not checked here.
        """
        specs = [WorkerSpec(queue="test", concurrency=1)]

        def requeue_outcomes():
            # Fail on the first call, then succeed with an empty result for
            # every later call. A finite list would run out if the reaper
            # polled more than twice and raise an unrelated
            # StopIteration/StopAsyncIteration from the mock.
            yield Exception("DB error")
            while True:
                yield []

        with (
            patch("dataloader.workers.manager.APP_CTX") as mock_ctx,
            patch("dataloader.workers.manager.APP_CONFIG") as mock_cfg,
            patch("dataloader.workers.manager.PGWorker") as mock_worker_cls,
            patch("dataloader.workers.manager.requeue_lost") as mock_requeue,
        ):
            mock_logger = self._configure_mocks(
                mock_ctx, mock_cfg, mock_worker_cls, reaper_period_sec=0.5
            )
            mock_requeue.side_effect = requeue_outcomes()

            manager = WorkerManager(specs)
            await manager.start()
            # Run longer than two of the configured 0.5 s reaper periods.
            await asyncio.sleep(1.2)
            await manager.stop()

            assert mock_logger.exception.call_count >= 1
@pytest.mark.unit
class TestBuildManagerFromEnv:
    """
    Unit tests for build_manager_from_env.
    """

    @staticmethod
    def _manager_for(worker_entries):
        """
        Call build_manager_from_env with ``APP_CTX`` and ``APP_CONFIG`` patched
        so that ``APP_CONFIG.worker.parsed_workers()`` returns *worker_entries*.
        """
        with (
            patch("dataloader.workers.manager.APP_CTX") as ctx,
            patch("dataloader.workers.manager.APP_CONFIG") as cfg,
        ):
            ctx.get_logger.return_value = Mock()
            cfg.worker.parsed_workers.return_value = worker_entries
            return build_manager_from_env()

    def test_builds_manager_from_config(self):
        """
        Each configured entry becomes a spec with the same queue and
        concurrency, in the same order.
        """
        manager = self._manager_for(
            [
                {"queue": "queue1", "concurrency": 2},
                {"queue": "queue2", "concurrency": 3},
            ]
        )

        built = [(spec.queue, spec.concurrency) for spec in manager._specs]
        assert built == [("queue1", 2), ("queue2", 3)]

    def test_skips_empty_queue_names(self):
        """
        Entries whose queue name is empty or only whitespace are dropped.
        """
        manager = self._manager_for(
            [
                {"queue": "", "concurrency": 2},
                {"queue": "valid_queue", "concurrency": 1},
                {"queue": "   ", "concurrency": 3},
            ]
        )

        queue_names = [spec.queue for spec in manager._specs]
        assert queue_names == ["valid_queue"]

    def test_handles_missing_fields_with_defaults(self):
        """
        A missing concurrency, as well as concurrency=0, yields concurrency 1.
        """
        manager = self._manager_for(
            [
                {"queue": "test"},
                {"queue": "test2", "concurrency": 0},
            ]
        )

        concurrencies = [spec.concurrency for spec in manager._specs]
        assert concurrencies == [1, 1]

    def test_ensures_minimum_concurrency_of_one(self):
        """
        Zero and negative concurrency values are both raised to 1.
        """
        manager = self._manager_for(
            [
                {"queue": "test1", "concurrency": 0},
                {"queue": "test2", "concurrency": -5},
            ]
        )

        concurrencies = [spec.concurrency for spec in manager._specs]
        assert concurrencies == [1, 1]