import asyncio
import signal
import time
from types import FrameType
from typing import Any

import structlog

from .models import AppConfig, ProcessingResult, ProcessingStats, SimplifyCommand
from .services import SimplifyService
from .sources import FileSource

logger = structlog.get_logger()


class AsyncRunner:
    """Processes SimplifyCommand tasks from a queue with a pool of async worker coroutines."""

    def __init__(
        self,
        config: AppConfig,
        simplify_service: SimplifyService,
        max_workers: int = 10,
    ) -> None:
        self.config = config
        self.simplify_service = simplify_service
        self.max_workers = max_workers

        self._task_queue: asyncio.Queue[SimplifyCommand] = asyncio.Queue()
        self._workers: list[asyncio.Task[None]] = []
        self._shutdown_event = asyncio.Event()

        self.stats = ProcessingStats()
        self._start_time: float | None = None

        self.logger = structlog.get_logger().bind(service="runner")

    async def run_from_file(
        self,
        input_file: str,
        force_reprocess: bool = False,
        max_articles: int | None = None,
    ) -> ProcessingStats:
        """Read URLs from a file, process them with worker coroutines, and return the stats."""
        self.logger.info(
            "Starting article processing from file",
            input_file=input_file,
            force_reprocess=force_reprocess,
            max_workers=self.max_workers,
            max_articles=max_articles,
        )

        self._setup_signal_handlers()

        try:
            source = FileSource(input_file)
            await self._load_tasks_from_source(source, force_reprocess, max_articles)

            await self._run_processing()

        except Exception as e:
            self.logger.error("Runner execution failed", error=str(e))
            raise
        finally:
            await self._cleanup()

        return self.stats

    async def _load_tasks_from_source(
        self,
        source: FileSource,
        force_reprocess: bool,
        max_articles: int | None,
    ) -> None:
        loaded_count = 0

        async for command in source.read_urls(force_reprocess):
            if max_articles and loaded_count >= max_articles:
                break

            await self._task_queue.put(command)
            loaded_count += 1

        self.logger.info("Tasks loaded into queue", count=loaded_count)

    async def _run_processing(self) -> None:
        self._start_time = time.time()

        self.logger.info("Starting worker coroutines", count=self.max_workers)

        for i in range(self.max_workers):
            worker = asyncio.create_task(self._worker_loop(worker_id=i))
            self._workers.append(worker)

        # Wait until every queued task has been marked done, then signal the workers to stop.
        await self._task_queue.join()

        self._shutdown_event.set()

        if self._workers:
            await asyncio.gather(*self._workers, return_exceptions=True)

    async def _worker_loop(self, worker_id: int) -> None:
        worker_logger = self.logger.bind(worker_id=worker_id)
        worker_logger.info("Worker started")

        processed_count = 0

        while not self._shutdown_event.is_set():
            try:
                try:
                    # Poll with a timeout so the loop can notice a shutdown request.
                    command = await asyncio.wait_for(
                        self._task_queue.get(),
                        timeout=1.0,
                    )
                except asyncio.TimeoutError:
                    continue

                try:
                    result = await self.simplify_service.process_command(command)

                    self.stats.add_result(result)
                    processed_count += 1

                    if result.success:
                        worker_logger.info(
                            "Article processed successfully",
                            url=command.url,
                            title=result.title,
                            tokens_in=result.token_count_raw,
                            tokens_out=result.token_count_simplified,
                        )
                    else:
                        worker_logger.warning(
                            "Failed to process article",
                            url=command.url,
                            error=result.error_message,
                        )

                except Exception as e:
                    worker_logger.error(
                        "Unexpected error in worker",
                        url=command.url,
                        error=str(e),
                    )

                    error_result = ProcessingResult.failure_result(
                        command.url,
                        f"Unexpected error: {e!s}",
                    )
                    self.stats.add_result(error_result)

                finally:
                    self._task_queue.task_done()

            except Exception as e:
                worker_logger.error("Critical error in worker loop", error=str(e))
                break

        worker_logger.info("Worker finished", processed_articles=processed_count)

    def _setup_signal_handlers(self) -> None:
        def signal_handler(signum: int, frame: FrameType | None) -> None:
            signal_name = signal.Signals(signum).name
            self.logger.info(f"Received signal {signal_name}, starting graceful shutdown")
            self._shutdown_event.set()

        try:
            signal.signal(signal.SIGINT, signal_handler)
            signal.signal(signal.SIGTERM, signal_handler)
        except ValueError:
            # signal.signal() raises ValueError when called outside the main thread.
            self.logger.warning("Failed to set up signal handlers")

    async def _cleanup(self) -> None:
        self.logger.info("Starting resource cleanup")

        for worker in self._workers:
            if not worker.done():
                worker.cancel()

        if self._workers:
            results = await asyncio.gather(*self._workers, return_exceptions=True)
            cancelled_count = sum(1 for r in results if isinstance(r, asyncio.CancelledError))
            if cancelled_count > 0:
                self.logger.info("Workers cancelled", count=cancelled_count)

        self._workers.clear()

    def get_progress_info(self) -> dict[str, Any]:
        """Return a snapshot of processing progress and throughput."""
        elapsed_time = time.time() - self._start_time if self._start_time else 0.0

        articles_per_minute = 0.0
        if elapsed_time > 0:
            articles_per_minute = (self.stats.successful * 60) / elapsed_time

        return {
            "total_processed": self.stats.total_processed,
            "successful": self.stats.successful,
            "failed": self.stats.failed,
            "success_rate": self.stats.success_rate,
            "elapsed_time": elapsed_time,
            "articles_per_minute": articles_per_minute,
            "queue_size": self._task_queue.qsize(),
            "active_workers": len([w for w in self._workers if not w.done()]),
        }

    async def health_check(self) -> dict[str, Any]:
        """Combine service-level health checks with runner-level status."""
        checks = await self.simplify_service.health_check()

        checks.update(
            {
                "runner_active": bool(self._workers and not self._shutdown_event.is_set()),
                "queue_size": self._task_queue.qsize(),
                "workers_count": len(self._workers),
            }
        )

        return checks
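

# Example wiring (a minimal sketch, not part of the runner itself): how AsyncRunner might be
# driven from an async entry point. The AppConfig() and SimplifyService(config) constructor
# calls below are assumptions for illustration only; the real signatures live in .models and
# .services.
#
#     async def main() -> None:
#         config = AppConfig()                    # assumed default-constructible
#         service = SimplifyService(config)       # assumed constructor signature
#         runner = AsyncRunner(config, service, max_workers=5)
#         stats = await runner.run_from_file("urls.txt", max_articles=100)
#         print(runner.get_progress_info())
#
#     if __name__ == "__main__":
#         asyncio.run(main())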