love-bot/bot/middlewares/auth_middleware.py

78 lines
3.2 KiB
Python
Raw Normal View History

2025-04-28 14:52:32 +02:00
import hmac
import logging
from typing import Callable, Dict, Any, Awaitable, Set, Optional

from aiogram import BaseMiddleware
from aiogram.types import Message, TelegramObject, User as AiogramUser

from bot.config import settings
from bot.database.db import get_all_users, add_user, get_user_by_telegram_id
from bot.database.models import User as DbUser
# In-memory set of Telegram user IDs that AuthMiddleware lets through.
# Filled from the database by load_initial_users() and extended by
# AuthMiddleware when a new user sends the secret password.
authenticated_user_ids: Set[int] = set()
# Flipped to True by load_initial_users() once the database has been read,
# so the load is skipped on later calls.
initial_users_loaded: bool = False
async def load_initial_users():
    """Seed the in-memory set of authenticated IDs from the database.

    The database is read only while ``initial_users_loaded`` is False; once a
    load has completed the flag is set and later calls return immediately.
    IDs are copied into ``authenticated_user_ids`` one by one and the copy
    stops as soon as the set holds two entries.
    """
    global initial_users_loaded

    # Already seeded: nothing to do.
    if initial_users_loaded:
        return

    logging.info("Loading initial users from database...")
    stored_users = await get_all_users()
    for stored_user in stored_users:
        authenticated_user_ids.add(stored_user.telegram_id)
        # The bot serves at most two users, so stop once both slots are taken.
        if len(authenticated_user_ids) >= 2:
            break

    initial_users_loaded = True
    logging.info(f"Loaded {len(authenticated_user_ids)} users: {authenticated_user_ids}")
class AuthMiddleware(BaseMiddleware):
    """Let only password-authenticated users reach message handlers.

    A user whose Telegram ID is in ``authenticated_user_ids`` has their
    database record attached to ``data['db_user']`` and the message is passed
    on to the handler. Any other user must send the secret password from
    ``settings.secret_password``; at most two users can be registered.

    Events that are not ``Message`` instances, and messages without an
    ``event_from_user`` entry in ``data``, are passed to the handler unchecked.
    NOTE(review): if this middleware is also attached to other event types
    (e.g. callback queries), those are not gated here - confirm registration.
    """

    @staticmethod
    def _is_secret_password(candidate: Optional[str]) -> bool:
        """Return True if *candidate* is non-empty and equals the secret password."""
        if not candidate:
            return False
        expected = settings.secret_password.get_secret_value()
        # Constant-time comparison so the check does not leak how much of the
        # password matched. Encoded to bytes because compare_digest() rejects
        # non-ASCII str arguments.
        return hmac.compare_digest(
            candidate.encode("utf-8"), expected.encode("utf-8")
        )

    async def __call__(
        self,
        handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: Dict[str, Any]
    ) -> Any:
        """Authenticate the sender of *event* before invoking *handler*.

        Args:
            handler: The next callable in the middleware chain.
            event: The incoming Telegram object.
            data: Context dictionary; ``'event_from_user'`` is read from it and
                ``'db_user'`` is written to it for authenticated users.

        Returns:
            The handler's result when the event is passed on, otherwise None.
        """
        if not initial_users_loaded:
            await load_initial_users()

        # Only messages are subject to the password gate.
        if not isinstance(event, Message):
            return await handler(event, data)

        aiogram_user: Optional[AiogramUser] = data.get('event_from_user')
        if not aiogram_user:
            return await handler(event, data)

        telegram_id = aiogram_user.id

        # Known user: attach the DB record and continue down the chain.
        if telegram_id in authenticated_user_ids:
            db_user = await get_user_by_telegram_id(telegram_id)
            if db_user:
                data['db_user'] = db_user
                return await handler(event, data)
            # The in-memory set and the database disagree; drop the message.
            logging.warning(f"User {telegram_id} is in authenticated_user_ids but not found in DB.")
            return None

        # Unknown user: anything other than the password gets the prompt.
        if not self._is_secret_password(event.text):
            await event.answer("Для начала работы введите секретный пароль.")
            return None

        # Correct password, but both slots are already taken.
        # NOTE(review): this check and the insert below are separated by an
        # await, so two simultaneous registrations could both pass it -
        # confirm whether that matters for this deployment.
        if len(authenticated_user_ids) >= 2:
            logging.info(f"Authentication attempt blocked for user {telegram_id}. Limit of 2 users reached.")
            await event.answer("Извините, бот уже используется двумя пользователями.")
            return None

        db_user = await add_user(telegram_id, aiogram_user.username)
        if not db_user:
            logging.error(f"Failed to add user {telegram_id} to DB after password check.")
            await event.answer("Произошла ошибка при регистрации. Попробуйте позже.")
            return None

        authenticated_user_ids.add(telegram_id)
        data['db_user'] = db_user
        logging.info(f"User {telegram_id} ({aiogram_user.username}) authenticated successfully. Total users: {len(authenticated_user_ids)}")
        # The password message itself is not forwarded to the handler.
        # NOTE(review): no reply is sent on success, so the user gets no
        # confirmation - verify this is intended.
        return None