itcloud/SECURITY_AND_IMPROVEMENTS_A...

2361 lines
72 KiB
Markdown
Raw Permalink Normal View History

2026-01-05 18:20:34 +01:00
# ITCloud - Полный Аудит Безопасности и Рекомендации
**Дата:** 2026-01-05
**Аудитор:** Claude Code (Senior Security & Architecture Review)
**Версия:** 1.0
---
## 📋 EXECUTIVE SUMMARY
Проведен комплексный аудит проекта ITCloud (облачное хранилище фото/видео). Проект демонстрирует хорошую архитектуру (Clean Architecture) и некоторые правильные практики безопасности, но имеет **критические уязвимости** и **отсутствуют ключевые функции**.
### Статус готовности к продакшену: ❌ НЕ ГОТОВ
**Критические блокеры:**
1. ❌ Отсутствует rate limiting (защита от brute force)
2. ❌ Нет soft delete/корзины (указано в требованиях)
3. ❌ Отсутствует управление квотами хранилища
4. ❌ Нет механизма отзыва токенов
5. ❌ Отсутствует стратегия бэкапов БД
6. ❌ Минимальное покрытие тестами (<10%)
**Оценка времени до готовности к продакшену: 6-8 недель**
---
## 🔴 1. КРИТИЧЕСКИЕ УЯЗВИМОСТИ БЕЗОПАСНОСТИ
### 1.1 Отсутствует Rate Limiting
**Серьезность: КРИТИЧЕСКАЯ** 🔴
**Расположение:** Все API endpoints
**Файлы:** `backend/src/app/main.py`
**Проблема:**
- Нет ограничений на количество запросов
- Атакующий может делать неограниченное количество попыток логина
- Возможна DoS атака через перегрузку API
**Риски:**
- Brute force атаки на пароли пользователей
- Credential stuffing атаки
- Исчерпание ресурсов сервера
- Спам загрузок файлов
**Решение:**
```python
# Установить: pip install slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Для каждого endpoint:
@router.post("/login")
@limiter.limit("5/minute") # 5 попыток в минуту
async def login(...):
...
@router.post("/uploads/create")
@limiter.limit("100/hour") # 100 загрузок в час
async def create_upload(...):
...
```
**Рекомендуемые лимиты:**
- `/auth/login`: 5 запросов/минуту
- `/auth/register`: 3 запроса/час
- `/uploads/create`: 100 запросов/час
- Остальные endpoints: 1000 запросов/час
---
### 1.2 JWT Secret по умолчанию слабый
**Серьезность: КРИТИЧЕСКАЯ** 🔴
**Расположение:** `backend/.env.example:18`
**Проблема:**
```env
JWT_SECRET=your-secret-key-change-this-in-production
```
Если пользователь задеплоит с дефолтным секретом, токены можно подделать.
**Решение:**
1. Добавить валидацию при старте:
```python
# В config.py
if settings.APP_ENV == "prod" and settings.JWT_SECRET in [
"your-secret-key-change-this-in-production",
"changeme",
"secret",
]:
raise ValueError("КРИТИЧЕСКАЯ ОШИБКА: Используется слабый JWT_SECRET в продакшене!")
```
2. Генерировать случайный секрет при деплое:
```python
import secrets
print(secrets.token_urlsafe(64))
```
---
### 1.3 Отсутствует ограничение размера при чтении файла в память
**Серьезность: ВЫСОКАЯ** 🔴
**Расположение:** `backend/src/app/services/asset_service.py:138-182`
**Проблема:**
```python
# Строка 151
file_data = await file.read() # Читает ВЕСЬ файл в память!
```
Атакующий может загрузить огромный файл и вызвать OOM (Out Of Memory).
**Решение:**
Стримить файл чанками в S3:
```python
async def upload_file_to_s3(
self,
user_id: str,
asset_id: str,
file: UploadFile,
) -> Asset:
asset = await self.asset_repo.get_by_id(asset_id)
if not asset or asset.user_id != user_id:
raise HTTPException(status_code=404, detail="Asset not found")
# Проверить размер ПЕРЕД чтением
if file.size and file.size > self.config.max_upload_size_bytes:
raise HTTPException(status_code=413, detail="File too large")
# Стрим в S3 чанками
try:
await self.s3_client.upload_fileobj_streaming(
file.file, # file-like object
self.config.media_bucket,
asset.storage_key_original,
content_type=asset.content_type,
)
except Exception as e:
logger.error(f"S3 upload failed: {e}")
raise HTTPException(status_code=500, detail="Upload failed")
```
Добавить в `S3Client`:
```python
async def upload_fileobj_streaming(
self,
fileobj,
bucket: str,
key: str,
content_type: str,
chunk_size: int = 8 * 1024 * 1024, # 8MB chunks
):
"""Stream file to S3 in chunks"""
try:
await asyncio.to_thread(
self.client.upload_fileobj,
fileobj,
bucket,
key,
ExtraArgs={
'ContentType': content_type,
'ServerSideEncryption': 'AES256', # Важно!
},
Config=boto3.s3.transfer.TransferConfig(
multipart_threshold=chunk_size,
multipart_chunksize=chunk_size,
),
)
except ClientError as e:
logger.error(f"S3 upload error: {e}")
raise
```
---
### 1.4 Отсутствует CSRF защита
**Серьезность: ВЫСОКАЯ** 🟡 (частично смягчено Bearer токенами)
**Расположение:** `backend/src/app/main.py`
**Текущее состояние:**
- Используются Bearer токены в заголовках → CSRF не критичен
- НО: если в будущем перейдете на cookie-based auth → станет критично
**Рекомендация на будущее:**
```python
from fastapi_csrf_protect import CsrfProtect
@app.post("/api/v1/assets/{asset_id}/delete")
async def delete_asset(
asset_id: str,
csrf_protect: CsrfProtect = Depends(),
):
await csrf_protect.validate_csrf(request)
# ...
```
---
### 1.5 Небезопасная генерация share токенов
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/repositories/share_repository.py:25-27`
**Проблема:**
```python
token = secrets.token_urlsafe(32) # Хорошо!
# НО нет проверки на коллизии
```
**Решение:**
```python
async def create(
self,
owner_user_id: str,
asset_id: Optional[str] = None,
album_id: Optional[str] = None,
expires_at: Optional[datetime] = None,
password_hash: Optional[str] = None,
) -> Share:
# Генерировать с проверкой уникальности
max_attempts = 5
for attempt in range(max_attempts):
token = secrets.token_urlsafe(32)
# Проверить, что токен не существует
existing = await self.session.execute(
select(Share).where(Share.token == token)
)
if not existing.scalar_one_or_none():
break
else:
raise RuntimeError("Failed to generate unique share token")
share = Share(
owner_user_id=owner_user_id,
asset_id=asset_id,
album_id=album_id,
token=token,
expires_at=expires_at,
password_hash=password_hash,
)
self.session.add(share)
await self.session.flush()
await self.session.refresh(share)
return share
```
---
## 🟡 2. ПРОБЛЕМЫ АУТЕНТИФИКАЦИИ И АВТОРИЗАЦИИ
### 2.1 Отсутствует механизм отзыва токенов
**Серьезность: ВЫСОКАЯ** 🔴
**Расположение:** `backend/src/app/infra/security.py`
**Проблема:**
- JWT токены включают `jti` (token ID), но нет механизма отзыва
- Если токен скомпрометирован, он остается валидным до истечения срока (15 минут для access, 14 дней для refresh)
- Пользователь не может "выйти со всех устройств"
**Решение с Redis:**
```python
# В infra/redis_client.py
from redis.asyncio import Redis
class TokenBlacklist:
def __init__(self, redis: Redis):
self.redis = redis
async def revoke_token(self, jti: str, ttl_seconds: int):
"""Добавить токен в blacklist"""
await self.redis.setex(f"blacklist:{jti}", ttl_seconds, "1")
async def is_revoked(self, jti: str) -> bool:
"""Проверить, отозван ли токен"""
return await self.redis.exists(f"blacklist:{jti}") > 0
# В security.py
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
blacklist: TokenBlacklist = Depends(get_token_blacklist),
) -> User:
try:
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
jti: str = payload.get("jti")
# Проверить blacklist
if await blacklist.is_revoked(jti):
raise HTTPException(status_code=401, detail="Token has been revoked")
# ... остальная логика
```
**Добавить endpoint:**
```python
@router.post("/auth/logout")
async def logout(
current_user: CurrentUser,
token: str = Depends(oauth2_scheme),
blacklist: TokenBlacklist = Depends(get_token_blacklist),
):
"""Отозвать текущий токен"""
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
jti = payload.get("jti")
exp = payload.get("exp")
ttl = exp - int(datetime.utcnow().timestamp())
await blacklist.revoke_token(jti, ttl)
return {"message": "Logged out successfully"}
```
---
### 2.2 Нет refresh token rotation
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/api/v1/auth.py`
**Проблема:**
- Нет endpoint для обновления access token через refresh token
- Пользователи должны логиниться заново каждые 15 минут
**Решение:**
```python
@router.post("/auth/refresh", response_model=AuthTokens)
async def refresh_token(
refresh_token: str = Body(..., embed=True),
db: AsyncSession = Depends(get_db),
blacklist: TokenBlacklist = Depends(get_token_blacklist),
):
"""Обновить access token используя refresh token"""
try:
# Декодировать refresh token
payload = jwt.decode(
refresh_token,
settings.JWT_SECRET,
algorithms=["HS256"]
)
# Проверить тип токена
if payload.get("type") != "refresh":
raise HTTPException(status_code=401, detail="Invalid token type")
# Проверить blacklist
jti = payload.get("jti")
if await blacklist.is_revoked(jti):
raise HTTPException(status_code=401, detail="Token has been revoked")
user_id = payload.get("sub")
# Получить пользователя
user_repo = UserRepository(db)
user = await user_repo.get_by_id(user_id)
if not user or not user.is_active:
raise HTTPException(status_code=401, detail="User not found")
# ВАЖНО: Отозвать старый refresh token (rotation)
exp = payload.get("exp")
ttl = exp - int(datetime.utcnow().timestamp())
await blacklist.revoke_token(jti, ttl)
# Сгенерировать новые токены
auth_service = AuthService(db)
tokens = auth_service.create_tokens(user)
return tokens
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
```
**Frontend:**
```typescript
// В api.ts
async refreshAccessToken(): Promise<AuthTokens> {
const refreshToken = localStorage.getItem('refresh_token');
if (!refreshToken) {
throw new Error('No refresh token');
}
const { data } = await this.client.post('/auth/refresh', {
refresh_token: refreshToken,
});
localStorage.setItem('access_token', data.access_token);
localStorage.setItem('refresh_token', data.refresh_token);
return data;
}
// Добавить в interceptor
this.client.interceptors.response.use(
(response) => response,
async (error) => {
const originalRequest = error.config;
if (error.response?.status === 401 && !originalRequest._retry) {
originalRequest._retry = true;
try {
await this.refreshAccessToken();
return this.client(originalRequest);
} catch (refreshError) {
// Refresh failed, redirect to login
localStorage.removeItem('access_token');
localStorage.removeItem('refresh_token');
if (!['/login', '/register'].includes(window.location.pathname)) {
window.location.href = '/login';
}
return Promise.reject(refreshError);
}
}
return Promise.reject(error);
}
);
```
---
### 2.3 Слабая валидация паролей
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/api/schemas.py:16`
**Текущее состояние:**
```python
password: str = Field(min_length=8)
```
Можно установить пароль "12345678" ❌
**Решение:**
```python
import re
from pydantic import field_validator
class UserRegisterRequest(BaseModel):
email: EmailStr
password: str = Field(min_length=8, max_length=128)
@field_validator("password")
@classmethod
def validate_password_strength(cls, v: str) -> str:
"""Проверка сложности пароля"""
if len(v) < 8:
raise ValueError("Password must be at least 8 characters")
# Проверки
has_upper = re.search(r'[A-Z]', v)
has_lower = re.search(r'[a-z]', v)
has_digit = re.search(r'\d', v)
has_special = re.search(r'[!@#$%^&*(),.?":{}|<>]', v)
checks_passed = sum([
bool(has_upper),
bool(has_lower),
bool(has_digit),
bool(has_special),
])
if checks_passed < 3:
raise ValueError(
"Password must contain at least 3 of: "
"uppercase letter, lowercase letter, digit, special character"
)
# Проверка на распространенные пароли
common_passwords = [
"password", "12345678", "qwerty123", "admin123"
]
if v.lower() in common_passwords:
raise ValueError("This password is too common")
return v
```
---
### 2.4 Нет блокировки аккаунта после неудачных попыток
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/services/auth_service.py:55-84`
**Проблема:**
Можно делать бесконечные попытки логина (если нет rate limiting)
**Решение с Redis:**
```python
class LoginAttemptTracker:
def __init__(self, redis: Redis):
self.redis = redis
self.max_attempts = 5
self.lockout_duration = 900 # 15 минут
async def record_failed_attempt(self, email: str):
"""Записать неудачную попытку"""
key = f"login_attempts:{email}"
attempts = await self.redis.incr(key)
if attempts == 1:
# Установить TTL на первую попытку
await self.redis.expire(key, 3600) # 1 час
if attempts >= self.max_attempts:
# Заблокировать аккаунт
await self.redis.setex(
f"account_locked:{email}",
self.lockout_duration,
"1"
)
async def clear_attempts(self, email: str):
"""Очистить счетчик после успешного логина"""
await self.redis.delete(f"login_attempts:{email}")
async def is_locked(self, email: str) -> bool:
"""Проверить, заблокирован ли аккаунт"""
return await self.redis.exists(f"account_locked:{email}") > 0
async def get_lockout_remaining(self, email: str) -> int:
"""Получить оставшееся время блокировки (секунды)"""
return await self.redis.ttl(f"account_locked:{email}")
# В auth.py
@router.post("/login", response_model=AuthResponse)
async def login(
data: UserLoginRequest,
session: DatabaseSession,
tracker: LoginAttemptTracker = Depends(get_login_tracker),
):
# Проверить блокировку
if await tracker.is_locked(data.email):
remaining = await tracker.get_lockout_remaining(data.email)
raise HTTPException(
status_code=429,
detail=f"Account locked due to too many failed attempts. "
f"Try again in {remaining // 60} minutes."
)
auth_service = AuthService(session)
try:
user, tokens = await auth_service.login(data.email, data.password)
# Успешный логин - очистить счетчик
await tracker.clear_attempts(data.email)
return AuthResponse(user=user, tokens=tokens)
except HTTPException as e:
if e.status_code == 401:
# Неудачная попытка
await tracker.record_failed_attempt(data.email)
raise
```
---
## 🔍 3. ПРОБЛЕМЫ ВАЛИДАЦИИ ВХОДНЫХ ДАННЫХ
### 3.1 Недостаточная валидация Content-Type
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/api/schemas.py:85-91`
**Текущее состояние:**
```python
@field_validator("content_type")
@classmethod
def validate_content_type(cls, v: str) -> str:
if not (v.startswith("image/") or v.startswith("video/")):
raise ValueError("Only image/* and video/* content types are supported")
return v
```
Принимает ЛЮБОЙ image/* или video/* тип, включая опасные (SVG с JS, вредоносные кодеки)
**Решение - whitelist:**
```python
ALLOWED_IMAGE_TYPES = {
"image/jpeg",
"image/jpg",
"image/png",
"image/gif",
"image/webp",
"image/heic",
"image/heif",
}
ALLOWED_VIDEO_TYPES = {
"video/mp4",
"video/mpeg",
"video/quicktime", # .mov
"video/x-msvideo", # .avi
"video/x-matroska", # .mkv
"video/webm",
}
@field_validator("content_type")
@classmethod
def validate_content_type(cls, v: str) -> str:
v = v.lower().strip()
if v not in ALLOWED_IMAGE_TYPES and v not in ALLOWED_VIDEO_TYPES:
raise ValueError(
f"Content type '{v}' not supported. "
f"Allowed: {', '.join(ALLOWED_IMAGE_TYPES | ALLOWED_VIDEO_TYPES)}"
)
return v
```
---
### 3.2 Отсутствует проверка "магических байтов"
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** Upload flow
**Проблема:**
Полагаемся на content-type от клиента. Вредоносный файл может притвориться изображением.
**Решение с python-magic:**
```bash
pip install python-magic-bin # Windows
pip install python-magic # Linux/Mac
```
```python
import magic
async def verify_file_type(file: UploadFile, expected_type: str) -> bool:
"""Проверить реальный тип файла по магическим байтам"""
# Прочитать первые 2048 байт
header = await file.read(2048)
await file.seek(0) # Вернуться в начало
# Определить MIME type
mime = magic.from_buffer(header, mime=True)
# Проверить соответствие
if expected_type.startswith("image/"):
return mime in ALLOWED_IMAGE_TYPES
elif expected_type.startswith("video/"):
return mime in ALLOWED_VIDEO_TYPES
return False
# В asset_service.py
async def create_upload(
self,
user_id: str,
original_filename: str,
content_type: str,
size_bytes: int,
folder_id: Optional[str] = None,
) -> tuple[Asset, dict]:
# ... существующий код ...
# Добавить в метаданные для проверки при finalize
asset.metadata = {
"expected_content_type": content_type,
"needs_verification": True,
}
async def finalize_upload(
self,
user_id: str,
asset_id: str,
etag: Optional[str] = None,
sha256: Optional[str] = None,
) -> Asset:
# ... существующий код ...
# Проверить магические байты если требуется
if asset.metadata.get("needs_verification"):
# Загрузить первые байты из S3
try:
response = await self.s3_client.get_object_range(
self.config.media_bucket,
asset.storage_key_original,
bytes_range=(0, 2047),
)
header = response['Body'].read()
mime = magic.from_buffer(header, mime=True)
expected = asset.metadata.get("expected_content_type")
# Проверить соответствие
if (expected.startswith("image/") and mime not in ALLOWED_IMAGE_TYPES) or \
(expected.startswith("video/") and mime not in ALLOWED_VIDEO_TYPES):
# Удалить файл из S3
await self.s3_client.delete_object(
self.config.media_bucket,
asset.storage_key_original
)
asset.status = AssetStatus.FAILED
await self.asset_repo.update(asset)
raise HTTPException(
status_code=400,
detail="File type verification failed"
)
except Exception as e:
logger.warning(f"File verification failed: {e}")
```
---
### 3.3 Защита от Path Traversal реализована ✅
**Серьезность: N/A (ХОРОШО)** ✅
**Расположение:** `backend/src/app/services/asset_service.py:23-50`
**Статус:** Правильно реализовано:
- Используется `os.path.basename()`
- Удаляются path separators и null bytes
- Ограничение длины имени файла
Это **правильная** реализация. Оставить как есть.
---
## 🗄️ 4. БЕЗОПАСНОСТЬ S3 ХРАНИЛИЩА
### 4.1 Отсутствует шифрование S3 объектов
**Серьезность: СРЕДНЯЯ-ВЫСОКАЯ** 🟡
**Расположение:** `backend/src/app/infra/s3_client.py`
**Проблема:**
Файлы хранятся в S3 без шифрования на стороне сервера.
**Решение:**
```python
# Во ВСЕХ методах загрузки добавить ServerSideEncryption
async def upload_object(
self,
bucket: str,
key: str,
data: bytes,
content_type: str,
) -> None:
try:
await asyncio.to_thread(
self.client.put_object,
Bucket=bucket,
Key=key,
Body=data,
ContentType=content_type,
ServerSideEncryption='AES256', # ← ДОБАВИТЬ
)
except ClientError as e:
logger.error(f"S3 upload error: {e}")
raise
# Для presigned URLs тоже нужно добавить
def generate_presigned_post(
self,
bucket: str,
key: str,
content_type: str,
max_size_bytes: int,
expires_in: int = 600,
) -> dict:
try:
return self.client.generate_presigned_post(
Bucket=bucket,
Key=key,
Fields={
"Content-Type": content_type,
"x-amz-server-side-encryption": "AES256", # ← ДОБАВИТЬ
},
Conditions=[
{"Content-Type": content_type},
["content-length-range", 1, max_size_bytes],
{"x-amz-server-side-encryption": "AES256"}, # ← ДОБАВИТЬ
],
ExpiresIn=expires_in,
)
except ClientError as e:
logger.error(f"Presigned POST generation error: {e}")
raise
```
**Важно:** Также настроить bucket policy:
```json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DenyUnencryptedObjectUploads",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::your-bucket-name/*",
"Condition": {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": "AES256"
}
}
}
]
}
```
---
### 4.2 Hardcoded bucket name для корзины
**Серьезность: НИЗКАЯ** 🟢
**Расположение:** `backend/src/app/infra/s3_client.py:28, :152`
**Проблема:**
```python
TRASH_BUCKET = "itcloud-trash" # Hardcoded
```
**Решение:**
```python
# В config.py
class Settings(BaseSettings):
# ... existing ...
MEDIA_BUCKET: str
TRASH_BUCKET: str = "itcloud-trash" # default, но можно переопределить
# В s3_client.py
def __init__(self, config: Settings):
self.config = config
# ... existing ...
self.trash_bucket = config.TRASH_BUCKET # Использовать из конфига
```
---
### 4.3 Слишком длинный TTL для pre-signed URLs
**Серьезность: НИЗКАЯ** 🟢
**Расположение:** `backend/src/app/infra/config.py:43`
**Текущее:**
```python
SIGNED_URL_TTL_SECONDS: int = 600 # 10 минут
```
**Рекомендация:**
Для продакшена уменьшить до 300 секунд (5 минут) для чувствительного контента:
```python
SIGNED_URL_TTL_SECONDS: int = Field(default=300, ge=60, le=3600)
```
---
## 🔐 5. ДОПОЛНИТЕЛЬНЫЕ РЕКОМЕНДАЦИИ ПО БЕЗОПАСНОСТИ
### 5.1 CORS Headers слишком permissive
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/main.py:34-37`
**Проблема:**
```python
allow_headers=["*"], # Разрешены ВСЕ заголовки
expose_headers=["*"], # Экспортируются ВСЕ заголовки
```
**Решение:**
```python
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS.split(","),
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
# Whitelist конкретных заголовков
allow_headers=[
"Authorization",
"Content-Type",
"X-Requested-With",
"Accept",
],
# Экспортировать только нужные
expose_headers=[
"Content-Length",
"Content-Type",
"X-Total-Count",
],
max_age=3600,
)
```
---
### 5.2 Добавить Security Headers
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/main.py`
**Добавить middleware для security headers:**
```python
from starlette.middleware.base import BaseHTTPMiddleware
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
response = await call_next(request)
# Защита от clickjacking
response.headers["X-Frame-Options"] = "DENY"
# Защита от XSS
response.headers["X-Content-Type-Options"] = "nosniff"
# Защита от XSS для старых браузеров
response.headers["X-XSS-Protection"] = "1; mode=block"
# Content Security Policy
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"img-src 'self' data: https:; "
"script-src 'self'; "
"style-src 'self' 'unsafe-inline';"
)
# HSTS (если используется HTTPS)
if request.url.scheme == "https":
response.headers["Strict-Transport-Security"] = (
"max-age=31536000; includeSubDomains"
)
# Referrer Policy
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Permissions Policy
response.headers["Permissions-Policy"] = (
"geolocation=(), microphone=(), camera=()"
)
return response
app.add_middleware(SecurityHeadersMiddleware)
```
---
### 5.3 Токены в localStorage (Frontend)
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `frontend/src/services/api.ts:30-34, :62-64`
**Проблема:**
localStorage доступен для XSS атак.
**Альтернативы:**
1. **httpOnly cookies** для refresh token (лучше всего)
2. **Memory storage** для access token (но теряется при refresh страницы)
**Компромиссное решение:**
```typescript
// Хранить refresh token в httpOnly cookie (настроить на бэкенде)
// Access token держать в памяти
class TokenManager {
private accessToken: string | null = null;
setAccessToken(token: string) {
this.accessToken = token;
// Также можно в sessionStorage (лучше чем localStorage)
sessionStorage.setItem('access_token', token);
}
getAccessToken(): string | null {
if (this.accessToken) return this.accessToken;
return sessionStorage.getItem('access_token');
}
clearTokens() {
this.accessToken = null;
sessionStorage.removeItem('access_token');
// Refresh token будет удален через httpOnly cookie с бэкенда
}
}
```
---
## 🏗️ 6. АРХИТЕКТУРНЫЕ УЛУЧШЕНИЯ
### 6.1 Отсутствует Soft Delete для Assets ❌
**Серьезность: КРИТИЧЕСКАЯ** 🔴
**Расположение:** `backend/src/app/domain/models.py`
**Проблема:**
В спецификации (CLAUDE.md) указано:
> "Soft delete to trash with restore capability"
Но в модели Asset нет поля `deleted_at`:
```python
class Asset(Base):
__tablename__ = "assets"
# ... fields ...
# НЕТ: deleted_at: Mapped[Optional[datetime]] = mapped_column(DateTime)
```
**Решение:**
1. **Создать миграцию:**
```bash
cd backend
alembic revision -m "add_soft_delete_to_assets"
```
2. **Добавить в миграцию:**
```python
def upgrade() -> None:
op.add_column('assets', sa.Column('deleted_at', sa.DateTime(), nullable=True))
op.create_index('ix_assets_deleted_at', 'assets', ['deleted_at'])
def downgrade() -> None:
op.drop_index('ix_assets_deleted_at', table_name='assets')
op.drop_column('assets', 'deleted_at')
```
3. **Обновить модель:**
```python
from typing import Optional
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import DateTime
class Asset(Base):
__tablename__ = "assets"
# ... existing fields ...
deleted_at: Mapped[Optional[datetime]] = mapped_column(
DateTime, nullable=True, index=True, default=None
)
```
4. **Добавить методы в AssetRepository:**
```python
async def soft_delete(self, asset: Asset) -> Asset:
"""Мягкое удаление (в корзину)"""
asset.deleted_at = datetime.utcnow()
await self.session.flush()
await self.session.refresh(asset)
return asset
async def restore(self, asset: Asset) -> Asset:
"""Восстановить из корзины"""
asset.deleted_at = None
await self.session.flush()
await self.session.refresh(asset)
return asset
async def list_trash(
self,
user_id: str,
limit: int = 50,
cursor: Optional[str] = None,
) -> list[Asset]:
"""Список удаленных файлов"""
query = (
select(Asset)
.where(Asset.user_id == user_id)
.where(Asset.deleted_at.isnot(None))
.order_by(Asset.deleted_at.desc())
)
if cursor:
query = query.where(Asset.id < cursor)
query = query.limit(limit + 1)
result = await self.session.execute(query)
return list(result.scalars().all())
async def hard_delete(self, asset: Asset) -> None:
"""Полное удаление (безвозвратно)"""
await self.session.delete(asset)
await self.session.flush()
```
5. **Добавить endpoints:**
```python
# В assets.py
@router.get("/trash", response_model=AssetListResponse)
async def list_trash(
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
limit: int = Query(50, ge=1, le=100),
cursor: Optional[str] = Query(None),
):
"""Список файлов в корзине"""
asset_service = AssetService(session, s3_client)
assets, next_cursor, has_more = await asset_service.list_trash(
user_id=current_user.id,
limit=limit,
cursor=cursor,
)
return AssetListResponse(
items=assets,
next_cursor=next_cursor,
has_more=has_more,
)
@router.post("/{asset_id}/restore", response_model=AssetResponse)
async def restore_asset(
asset_id: str,
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
):
"""Восстановить файл из корзины"""
asset_service = AssetService(session, s3_client)
asset = await asset_service.restore_asset(
user_id=current_user.id,
asset_id=asset_id,
)
return asset
@router.delete("/{asset_id}/purge")
async def purge_asset(
asset_id: str,
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
):
"""Удалить файл безвозвратно из корзины"""
asset_service = AssetService(session, s3_client)
await asset_service.purge_asset(
user_id=current_user.id,
asset_id=asset_id,
)
return {"message": "Asset permanently deleted"}
```
6. **Обновить существующий DELETE endpoint:**
```python
@router.delete("/{asset_id}")
async def delete_asset(
asset_id: str,
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
):
"""Переместить файл в корзину (мягкое удаление)"""
asset_service = AssetService(session, s3_client)
await asset_service.soft_delete_asset(
user_id=current_user.id,
asset_id=asset_id,
)
return {"message": "Asset moved to trash"}
```
7. **Обновить list_assets чтобы НЕ показывать удаленные:**
```python
# В asset_repository.py
async def list_by_user(
self,
user_id: str,
folder_id: Optional[str] = None,
limit: int = 50,
cursor: Optional[str] = None,
) -> list[Asset]:
query = (
select(Asset)
.where(Asset.user_id == user_id)
.where(Asset.deleted_at.is_(None)) # ← ДОБАВИТЬ
.where(Asset.status == AssetStatus.READY)
)
# ... rest of logic
```
8. **Background job для очистки старых файлов:**
```python
# В tasks/cleanup_tasks.py
from datetime import datetime, timedelta
async def cleanup_old_trash():
"""Удалить файлы, которые в корзине >30 дней"""
async with get_db_session() as session:
thirty_days_ago = datetime.utcnow() - timedelta(days=30)
assets = await session.execute(
select(Asset).where(
Asset.deleted_at < thirty_days_ago
)
)
for asset in assets.scalars():
# Удалить из S3
await s3_client.delete_object(bucket, asset.storage_key_original)
if asset.storage_key_thumb:
await s3_client.delete_object(bucket, asset.storage_key_thumb)
# Удалить из БД
await session.delete(asset)
await session.commit()
```
---
### 6.2 Отсутствуют индексы в БД
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/domain/models.py`
**Проблема:**
Медленные запросы при больших объемах данных.
**Решение - добавить составные индексы:**
```python
# В models.py
from sqlalchemy import Index
class Asset(Base):
__tablename__ = "assets"
# ... fields ...
# Составные индексы для часто используемых запросов
__table_args__ = (
Index(
'ix_assets_user_folder_created',
'user_id', 'folder_id', 'created_at'
),
Index(
'ix_assets_user_status',
'user_id', 'status'
),
Index(
'ix_assets_user_deleted',
'user_id', 'deleted_at'
),
)
class Share(Base):
__tablename__ = "shares"
# ... fields ...
__table_args__ = (
Index(
'ix_shares_token_expires',
'token', 'expires_at'
),
Index(
'ix_shares_owner_created',
'owner_user_id', 'created_at'
),
)
class Folder(Base):
__tablename__ = "folders"
# ... fields ...
__table_args__ = (
Index(
'ix_folders_user_parent',
'user_id', 'parent_folder_id'
),
)
```
**Создать миграцию:**
```bash
alembic revision -m "add_composite_indexes"
```
---
### 6.3 ZIP создается в памяти
**Серьезность: ВЫСОКАЯ** 🔴
**Расположение:** `backend/src/app/services/batch_operations_service.py:206`
**Проблема:**
```python
zip_buffer = io.BytesIO() # Весь ZIP в памяти!
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
# Добавление файлов...
```
При скачивании 10GB файлов → OOM.
**Решение - стримить через temp file:**
```python
from app.services.batch_operations_service import temp_file_manager
async def download_assets_batch(
self,
user_id: str,
asset_ids: list[str],
) -> tuple[str, bytes]:
"""Скачать несколько файлов как ZIP архив"""
# Использовать временный файл вместо памяти
async with temp_file_manager(suffix='.zip') as temp_path:
# Создать ZIP в temp файле
with zipfile.ZipFile(temp_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for asset_id in asset_ids:
asset = await self.asset_repo.get_by_id(asset_id)
if not asset or asset.user_id != user_id:
continue
# Стримить файл из S3
try:
response = await asyncio.to_thread(
self.s3_client.client.get_object,
Bucket=self.config.media_bucket,
Key=asset.storage_key_original,
)
# Читать чанками
with response['Body'] as stream:
# Генерировать уникальное имя файла
base_name = sanitize_filename(asset.original_filename)
unique_name = self._get_unique_filename(
zip_file,
base_name
)
# Записать в ZIP chunk by chunk
with zip_file.open(unique_name, 'w') as dest:
while True:
chunk = stream.read(8 * 1024 * 1024) # 8MB
if not chunk:
break
dest.write(chunk)
except Exception as e:
logger.warning(f"Failed to add asset {asset_id}: {e}")
continue
# Прочитать готовый ZIP
with open(temp_path, 'rb') as f:
zip_data = f.read()
filename = f"assets_batch_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.zip"
return filename, zip_data
# Но ЛУЧШЕ стримить ответ напрямую без чтения в память:
from starlette.responses import FileResponse
@router.post("/batch/download")
async def download_batch(
data: BatchDownloadRequest,
current_user: CurrentUser,
session: DatabaseSession,
):
"""Скачать файлы как ZIP (streaming response)"""
# Создать temp ZIP
async with temp_file_manager(suffix='.zip') as temp_path:
batch_service = BatchOperationsService(session, s3_client)
# Создать ZIP в temp файле (не загружая в память)
await batch_service.create_zip_file(
user_id=current_user.id,
asset_ids=data.asset_ids,
output_path=temp_path,
)
# Вернуть как streaming response
filename = f"assets_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.zip"
return FileResponse(
path=temp_path,
media_type='application/zip',
filename=filename,
background=BackgroundTask(os.unlink, temp_path), # Удалить после отправки
)
```
---
### 6.4 Смешанные обязанности в FolderService
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/services/folder_service.py:195-223`
**Проблема:**
```python
# Строка 207
# TODO: Should use AssetService methods here
asset.folder_id = None
await self.asset_repo.update(asset)
```
FolderService напрямую работает с AssetRepository, нарушая SRP.
**Решение:**
```python
# Создать Orchestration Service
class FolderManagementService:
"""Оркестрирует операции над папками и связанными ресурсами"""
def __init__(
self,
session: AsyncSession,
s3_client: S3Client,
):
self.folder_service = FolderService(session)
self.asset_service = AssetService(session, s3_client)
async def delete_folder_with_contents(
self,
user_id: str,
folder_id: str,
delete_assets: bool = False,
) -> None:
"""
Удалить папку и обработать связанные assets.
Args:
user_id: ID пользователя
folder_id: ID папки
delete_assets: True - удалить assets, False - переместить в root
"""
# Получить все assets в папке и подпапках
assets = await self.asset_service.list_assets_in_folder_recursive(
user_id=user_id,
folder_id=folder_id,
)
if delete_assets:
# Удалить все assets (soft delete)
for asset in assets:
await self.asset_service.soft_delete_asset(
user_id=user_id,
asset_id=asset.id,
)
else:
# Переместить в root
for asset in assets:
await self.asset_service.move_asset(
user_id=user_id,
asset_id=asset.id,
target_folder_id=None,
)
# Удалить папку
await self.folder_service.delete_folder(
user_id=user_id,
folder_id=folder_id,
)
```
---
### 6.5 Отсутствуют Foreign Keys в моделях
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/domain/models.py`
**Проблема:**
Нет явных relationship и foreign key constraints.
**Решение:**
```python
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class Asset(Base):
__tablename__ = "assets"
# ... existing fields ...
user_id: Mapped[str] = mapped_column(
String(36),
ForeignKey("users.id", ondelete="CASCADE"),
index=True
)
folder_id: Mapped[Optional[str]] = mapped_column(
String(36),
ForeignKey("folders.id", ondelete="SET NULL"),
nullable=True,
index=True
)
# Relationships
user: Mapped["User"] = relationship("User", back_populates="assets")
folder: Mapped[Optional["Folder"]] = relationship("Folder", back_populates="assets")
class Folder(Base):
__tablename__ = "folders"
# ... existing fields ...
user_id: Mapped[str] = mapped_column(
String(36),
ForeignKey("users.id", ondelete="CASCADE"),
index=True
)
parent_folder_id: Mapped[Optional[str]] = mapped_column(
String(36),
ForeignKey("folders.id", ondelete="CASCADE"),
nullable=True,
index=True
)
# Relationships
user: Mapped["User"] = relationship("User", back_populates="folders")
parent: Mapped[Optional["Folder"]] = relationship(
"Folder",
remote_side="Folder.id",
back_populates="children"
)
children: Mapped[list["Folder"]] = relationship(
"Folder",
back_populates="parent",
cascade="all, delete-orphan"
)
assets: Mapped[list["Asset"]] = relationship(
"Asset",
back_populates="folder",
cascade="all, delete-orphan"
)
class User(Base):
__tablename__ = "users"
# ... existing fields ...
# Relationships
assets: Mapped[list["Asset"]] = relationship(
"Asset",
back_populates="user",
cascade="all, delete-orphan"
)
folders: Mapped[list["Folder"]] = relationship(
"Folder",
back_populates="user",
cascade="all, delete-orphan"
)
```
**Миграция:**
```bash
alembic revision -m "add_foreign_keys_and_relationships"
```
---
## 📦 7. ОТСУТСТВУЮЩИЕ КЛЮЧЕВЫЕ ФУНКЦИИ
### 7.1 Password Reset Flow
**Приоритет: ВЫСОКИЙ** 🔴
**Что нужно:**
1. Endpoint `/auth/forgot-password` - отправить email с токеном
2. Endpoint `/auth/reset-password` - сбросить пароль по токену
3. Email сервис (SMTP)
4. Токены сброса пароля с expiration
**Пример реализации:**
```python
# В models.py
class PasswordResetToken(Base):
__tablename__ = "password_reset_tokens"
id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid4()))
user_id: Mapped[str] = mapped_column(String(36), ForeignKey("users.id"), index=True)
token: Mapped[str] = mapped_column(String(64), unique=True, index=True)
expires_at: Mapped[datetime] = mapped_column(DateTime)
used_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())
# В auth_service.py
async def request_password_reset(self, email: str) -> None:
"""Создать токен сброса пароля и отправить email"""
user = await self.user_repo.get_by_email(email)
if not user:
# НЕ раскрывать что пользователь не найден (безопасность)
logger.info(f"Password reset requested for non-existent email: {email}")
return
# Создать токен
token = secrets.token_urlsafe(32)
expires_at = datetime.utcnow() + timedelta(hours=1)
reset_token = PasswordResetToken(
user_id=user.id,
token=token,
expires_at=expires_at,
)
self.session.add(reset_token)
await self.session.commit()
# Отправить email
reset_url = f"{settings.FRONTEND_URL}/reset-password?token={token}"
await email_service.send_password_reset(user.email, reset_url)
async def reset_password(self, token: str, new_password: str) -> None:
"""Сбросить пароль по токену"""
# Найти токен
result = await self.session.execute(
select(PasswordResetToken).where(
PasswordResetToken.token == token,
PasswordResetToken.used_at.is_(None),
PasswordResetToken.expires_at > datetime.utcnow(),
)
)
reset_token = result.scalar_one_or_none()
if not reset_token:
raise HTTPException(status_code=400, detail="Invalid or expired token")
# Обновить пароль
user = await self.user_repo.get_by_id(reset_token.user_id)
password_hash = self.password_context.hash(new_password)
user.password_hash = password_hash
# Пометить токен как использованный
reset_token.used_at = datetime.utcnow()
await self.session.commit()
# Endpoints
@router.post("/auth/forgot-password")
async def forgot_password(
data: ForgotPasswordRequest,
session: DatabaseSession,
):
"""Запросить сброс пароля"""
auth_service = AuthService(session)
await auth_service.request_password_reset(data.email)
# Всегда возвращать успех (не раскрывать существование email)
return {"message": "If email exists, reset link has been sent"}
@router.post("/auth/reset-password")
async def reset_password(
data: ResetPasswordRequest,
session: DatabaseSession,
):
"""Сбросить пароль"""
auth_service = AuthService(session)
await auth_service.reset_password(data.token, data.new_password)
return {"message": "Password reset successful"}
```
---
### 7.2 Storage Quota Management
**Приоритет: ВЫСОКИЙ** 🔴
**Что нужно:**
1. Поле `storage_quota_bytes` и `storage_used_bytes` в User
2. Проверка квоты перед загрузкой
3. Endpoint для получения статистики использования
4. Background job для пересчета использования
**Реализация:**
```python
# В models.py
class User(Base):
__tablename__ = "users"
# ... existing fields ...
storage_quota_bytes: Mapped[int] = mapped_column(
BigInteger,
default=10 * 1024 * 1024 * 1024 # 10GB по умолчанию
)
storage_used_bytes: Mapped[int] = mapped_column(
BigInteger,
default=0,
index=True
)
# В asset_service.py
async def create_upload(
self,
user_id: str,
original_filename: str,
content_type: str,
size_bytes: int,
folder_id: Optional[str] = None,
) -> tuple[Asset, dict]:
# Проверить квоту ДО создания upload
user = await self.user_repo.get_by_id(user_id)
if user.storage_used_bytes + size_bytes > user.storage_quota_bytes:
remaining = user.storage_quota_bytes - user.storage_used_bytes
raise HTTPException(
status_code=413,
detail=f"Storage quota exceeded. "
f"Available: {remaining / 1024 / 1024:.2f} MB, "
f"Required: {size_bytes / 1024 / 1024:.2f} MB"
)
# ... existing code ...
async def finalize_upload(
self,
user_id: str,
asset_id: str,
etag: Optional[str] = None,
sha256: Optional[str] = None,
) -> Asset:
# ... existing code ...
# Обновить использованное место
user = await self.user_repo.get_by_id(user_id)
user.storage_used_bytes += asset.size_bytes
await self.session.commit()
return asset
async def soft_delete_asset(
self,
user_id: str,
asset_id: str,
) -> Asset:
# ... existing code ...
# НЕ освобождать место при soft delete
# Место освободится при purge
return asset
async def purge_asset(
self,
user_id: str,
asset_id: str,
) -> None:
"""Безвозвратно удалить asset"""
asset = await self.asset_repo.get_by_id(asset_id)
if not asset or asset.user_id != user_id:
raise HTTPException(status_code=404, detail="Asset not found")
# Удалить из S3
await self.s3_client.delete_object(
self.config.media_bucket,
asset.storage_key_original
)
if asset.storage_key_thumb:
await self.s3_client.delete_object(
self.config.media_bucket,
asset.storage_key_thumb
)
# Освободить место в квоте
user = await self.user_repo.get_by_id(user_id)
user.storage_used_bytes = max(0, user.storage_used_bytes - asset.size_bytes)
# Удалить из БД
await self.asset_repo.hard_delete(asset)
await self.session.commit()
# Endpoint для статистики
@router.get("/users/me/storage", response_model=StorageStatsResponse)
async def get_storage_stats(
current_user: CurrentUser,
):
"""Получить статистику использования хранилища"""
return StorageStatsResponse(
quota_bytes=current_user.storage_quota_bytes,
used_bytes=current_user.storage_used_bytes,
available_bytes=current_user.storage_quota_bytes - current_user.storage_used_bytes,
percentage_used=round(
(current_user.storage_used_bytes / current_user.storage_quota_bytes) * 100,
2
),
)
```
---
### 7.3 EXIF Metadata Extraction
**Приоритет: СРЕДНЯЯ** 🟡
**Реализация:**
```bash
pip install pillow pillow-heif # Для изображений
pip install ffmpeg-python # Для видео
```
```python
# В tasks/thumbnail_tasks.py
from PIL import Image
from PIL.ExifTags import TAGS
import ffmpeg
def extract_image_metadata(file_path: str) -> dict:
"""Извлечь EXIF из изображения"""
try:
with Image.open(file_path) as img:
exif_data = img._getexif()
if not exif_data:
return {}
metadata = {}
for tag_id, value in exif_data.items():
tag = TAGS.get(tag_id, tag_id)
metadata[tag] = value
return {
"width": img.width,
"height": img.height,
"captured_at": metadata.get("DateTimeOriginal"),
"camera_make": metadata.get("Make"),
"camera_model": metadata.get("Model"),
"gps_latitude": metadata.get("GPSLatitude"),
"gps_longitude": metadata.get("GPSLongitude"),
}
except Exception as e:
logger.error(f"EXIF extraction failed: {e}")
return {}
def extract_video_metadata(file_path: str) -> dict:
"""Извлечь метаданные из видео"""
try:
probe = ffmpeg.probe(file_path)
video_stream = next(
s for s in probe['streams'] if s['codec_type'] == 'video'
)
return {
"width": int(video_stream.get('width', 0)),
"height": int(video_stream.get('height', 0)),
"duration_sec": float(probe['format'].get('duration', 0)),
"codec": video_stream.get('codec_name'),
"bitrate": int(probe['format'].get('bit_rate', 0)),
}
except Exception as e:
logger.error(f"Video metadata extraction failed: {e}")
return {}
@celery_app.task
def generate_thumbnail_and_extract_metadata(asset_id: str):
"""Генерировать thumbnail И извлечь metadata"""
# ... existing thumbnail generation ...
# Извлечь metadata
if asset.type == AssetType.PHOTO:
metadata = extract_image_metadata(temp_file_path)
elif asset.type == AssetType.VIDEO:
metadata = extract_video_metadata(temp_file_path)
# Обновить asset
if metadata.get("width"):
asset.width = metadata["width"]
if metadata.get("height"):
asset.height = metadata["height"]
if metadata.get("captured_at"):
asset.captured_at = parse_datetime(metadata["captured_at"])
if metadata.get("duration_sec"):
asset.duration_sec = metadata["duration_sec"]
db.commit()
```
---
### 7.4 Asset Search
**Приоритет: ВЫСОКИЙ** 🔴
**Простая реализация с PostgreSQL full-text search:**
```python
# В asset_repository.py
async def search_assets(
self,
user_id: str,
query: str,
limit: int = 50,
) -> list[Asset]:
"""Поиск assets по имени файла"""
search_query = (
select(Asset)
.where(Asset.user_id == user_id)
.where(Asset.deleted_at.is_(None))
.where(Asset.status == AssetStatus.READY)
.where(Asset.original_filename.ilike(f"%{query}%"))
.order_by(Asset.created_at.desc())
.limit(limit)
)
result = await self.session.execute(search_query)
return list(result.scalars().all())
# Endpoint
@router.get("/assets/search", response_model=AssetListResponse)
async def search_assets(
q: str = Query(..., min_length=1),
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
limit: int = Query(50, ge=1, le=100),
):
"""Поиск файлов по имени"""
asset_service = AssetService(session, s3_client)
assets = await asset_service.search_assets(
user_id=current_user.id,
query=q,
limit=limit,
)
return AssetListResponse(
items=assets,
next_cursor=None,
has_more=False,
)
```
---
### 7.5 Database Backup Strategy
**Приоритет: КРИТИЧЕСКИЙ** 🔴
**Для SQLite:**
```bash
#!/bin/bash
# backup_db.sh
BACKUP_DIR="/backups"
DB_PATH="/app/data/app.db"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="$BACKUP_DIR/backup_$TIMESTAMP.db"
# Создать backup
sqlite3 $DB_PATH ".backup '$BACKUP_FILE'"
# Сжать
gzip $BACKUP_FILE
# Загрузить в S3
aws s3 cp "${BACKUP_FILE}.gz" "s3://your-backup-bucket/database/"
# Удалить старые локальные backups (старше 7 дней)
find $BACKUP_DIR -name "backup_*.db.gz" -mtime +7 -delete
# Удалить старые S3 backups (старше 30 дней)
aws s3 ls "s3://your-backup-bucket/database/" | \
awk '{print $4}' | \
while read file; do
# ... deletion logic
done
```
**Cron job:**
```cron
# Каждые 6 часов
0 */6 * * * /app/scripts/backup_db.sh >> /var/log/backup.log 2>&1
```
**Для PostgreSQL:**
```bash
#!/bin/bash
# pg_backup.sh
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="backup_$TIMESTAMP.sql.gz"
# pg_dump с сжатием
pg_dump -h $DB_HOST -U $DB_USER -d $DB_NAME | gzip > "/tmp/$BACKUP_FILE"
# Загрузить в S3
aws s3 cp "/tmp/$BACKUP_FILE" "s3://your-backup-bucket/database/"
# Cleanup
rm "/tmp/$BACKUP_FILE"
```
---
## 🧪 8. ТЕСТИРОВАНИЕ
### 8.1 Текущее покрытие: <10% ❌
**Найдено только:** `backend/tests/test_security.py`
**Что КРИТИЧЕСКИ нужно:**
1. **Unit тесты для services:**
```python
# tests/unit/services/test_asset_service.py
import pytest
from unittest.mock import AsyncMock, MagicMock
@pytest.mark.asyncio
async def test_create_upload_exceeds_quota():
"""Тест: загрузка превышает квоту"""
user = User(
id="user1",
storage_quota_bytes=1000,
storage_used_bytes=900,
)
asset_service = AssetService(mock_session, mock_s3)
with pytest.raises(HTTPException) as exc:
await asset_service.create_upload(
user_id=user.id,
original_filename="large.jpg",
content_type="image/jpeg",
size_bytes=200, # 900 + 200 > 1000
)
assert exc.value.status_code == 413
assert "quota exceeded" in exc.value.detail.lower()
```
2. **Integration тесты для API:**
```python
# tests/integration/test_auth_api.py
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_login_rate_limiting(client: AsyncClient):
"""Тест: rate limiting блокирует brute force"""
# 5 неудачных попыток
for i in range(5):
response = await client.post("/api/v1/auth/login", json={
"email": "test@example.com",
"password": "wrong"
})
assert response.status_code == 401
# 6-я попытка должна быть заблокирована
response = await client.post("/api/v1/auth/login", json={
"email": "test@example.com",
"password": "wrong"
})
assert response.status_code == 429
```
3. **Security тесты:**
```python
# tests/security/test_path_traversal.py
@pytest.mark.asyncio
async def test_filename_sanitization():
"""Тест: path traversal защита"""
malicious_filenames = [
"../../../etc/passwd",
"..\\..\\..\\windows\\system32",
"test\x00.jpg.exe",
"folder/../../../secret.txt",
]
for filename in malicious_filenames:
sanitized = sanitize_filename(filename)
# Проверить, что путь не содержит separators
assert "/" not in sanitized
assert "\\" not in sanitized
assert "\x00" not in sanitized
assert ".." not in sanitized
```
4. **Load тесты:**
```python
# tests/load/test_upload_performance.py
from locust import HttpUser, task, between
class UploadUser(HttpUser):
wait_time = between(1, 3)
@task
def upload_file(self):
# Симулировать загрузку файла
files = {'file': ('test.jpg', b'fake image data', 'image/jpeg')}
self.client.post("/api/v1/uploads/create", files=files)
```
**Цель:** Достичь минимум 70% code coverage
---
## 📊 9. ДОПОЛНИТЕЛЬНЫЕ РЕКОМЕНДАЦИИ
### 9.1 Frontend Type Safety
**Проблема:** Много `any` типов в `frontend/src/services/api.ts`
**Решение:**
```typescript
// Заменить все any на конкретные типы
export interface Folder {
id: string;
user_id: string;
name: string;
parent_folder_id: string | null;
created_at: string;
updated_at: string;
}
export interface FolderListResponse {
items: Folder[];
}
export interface FolderBreadcrumb {
id: string;
name: string;
}
// В api.ts
async listFolders(
parentFolderId?: string | null,
all: boolean = false
): Promise<FolderListResponse> {
// ...
}
async getFolder(folderId: string): Promise<Folder> {
// ...
}
async getFolderBreadcrumbs(folderId: string): Promise<FolderBreadcrumb[]> {
// ...
}
```
---
### 9.2 Logging Strategy
**Добавить structured logging:**
```python
# В config.py
import sys
from loguru import logger
# Настроить loguru
logger.remove() # Удалить дефолтный handler
if settings.APP_ENV == "prod":
# Production: JSON формат для парсинга
logger.add(
sys.stdout,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} | {message}",
level="INFO",
serialize=True, # JSON output
)
else:
# Development: красивый формат
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="DEBUG",
colorize=True,
)
# Логировать в файл
logger.add(
"logs/app.log",
rotation="100 MB",
retention="30 days",
compression="zip",
level="INFO",
)
# Middleware для логирования всех requests
from starlette.middleware.base import BaseHTTPMiddleware
import time
class RequestLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start_time = time.time()
# Логировать request
logger.info(
f"Request started",
extra={
"method": request.method,
"path": request.url.path,
"client": request.client.host if request.client else None,
}
)
try:
response = await call_next(request)
# Логировать response
duration = time.time() - start_time
logger.info(
f"Request completed",
extra={
"method": request.method,
"path": request.url.path,
"status_code": response.status_code,
"duration_ms": round(duration * 1000, 2),
}
)
return response
except Exception as e:
duration = time.time() - start_time
logger.error(
f"Request failed",
extra={
"method": request.method,
"path": request.url.path,
"error": str(e),
"duration_ms": round(duration * 1000, 2),
}
)
raise
app.add_middleware(RequestLoggingMiddleware)
```
---
### 9.3 Environment Validation
**Проверять критичные env vars при старте:**
```python
# В main.py
from app.infra.config import settings
def validate_production_config():
"""Проверить конфигурацию для продакшена"""
errors = []
if settings.APP_ENV == "prod":
# Проверить JWT secret
if settings.JWT_SECRET in ["your-secret-key-change-this-in-production", "changeme"]:
errors.append("JWT_SECRET must be changed in production")
# Проверить S3 конфигурацию
if not settings.S3_ENDPOINT_URL or "localhost" in settings.S3_ENDPOINT_URL:
errors.append("S3_ENDPOINT_URL must be set to production S3")
# Проверить database
if "sqlite" in settings.DATABASE_URL.lower():
logger.warning("Using SQLite in production - consider PostgreSQL")
# Проверить CORS
if "*" in settings.CORS_ORIGINS:
errors.append("CORS_ORIGINS should not contain '*' in production")
if errors:
logger.error("Production configuration errors:")
for error in errors:
logger.error(f" - {error}")
raise ValueError("Invalid production configuration")
@app.on_event("startup")
async def startup():
validate_production_config()
logger.info(f"Application started in {settings.APP_ENV} mode")
```
---
## 🎯 10. ROADMAP ПО ПРИОРИТЕТАМ
### ФАЗА 1: Критическая безопасность (1-2 недели) 🔴
**Блокирует продакшен**
1. ✅ Implement rate limiting (slowapi)
2. ✅ Add token revocation mechanism (Redis blacklist)
3. ✅ Implement refresh token rotation
4. ✅ Add storage quota management
5. ✅ Enable S3 server-side encryption
6. ✅ Fix memory exhaustion in file uploads (streaming)
7. ✅ Add strong password validation
8. ✅ Implement account lockout after failed attempts
9. ✅ Reduce error message verbosity
10. ✅ Add security headers middleware
**Результат:** Приложение защищено от базовых атак
---
### ФАЗА 2: Core Features (2-3 недели) 🟡
**Необходимо для MVP**
1. ✅ Implement soft delete / trash functionality (SPEC REQUIREMENT)
2. ✅ Add password reset flow
3. ✅ Implement multipart upload для больших файлов
4. ✅ Add asset search by filename
5. ✅ Extract EXIF metadata (captured_at, dimensions)
6. ✅ Add database indexes для производительности
7. ✅ Fix ZIP creation (stream instead of memory)
8. ✅ Add foreign key constraints
**Результат:** Полнофункциональное MVP
---
### ФАЗА 3: Production Readiness (2-3 недели) 🟢
**DevOps и мониторинг**
1. ✅ Comprehensive test suite (70%+ coverage)
2. ✅ Database backup automation
3. ✅ Monitoring & alerting setup (Prometheus + Grafana)
4. ✅ Error tracking (Sentry integration)
5. ✅ CI/CD pipeline (GitHub Actions)
6. ✅ Environment configuration validation
7. ✅ Structured logging (loguru + JSON)
8. ✅ Documentation completion
9. ✅ Load testing (Locust)
10. ✅ Security penetration testing
**Результат:** Готово к продакшену
---
### ФАЗА 4: Enhancements (постоянно) 💚
**Улучшения UX**
1. Video transcoding & adaptive streaming (HLS)
2. Albums feature
3. Tags system
4. Advanced search & filters (date range, type, size)
5. Share analytics & permissions
6. Image optimization (WebP conversion)
7. Duplicate detection (SHA256 deduplication)
8. Two-factor authentication (TOTP)
9. Email verification
10. Shared albums (collaborative)
**Результат:** Feature-rich продукт
---
## 📋 11. SUMMARY: КРИТИЧЕСКИЕ ДЕЙСТВИЯ
### Что сделать ПРЯМО СЕЙЧАС перед продакшеном:
1. **Установить rate limiting** - защита от brute force
2. **Реализовать soft delete** - требование спецификации
3. **Добавить storage quota** - предотвратить злоупотребление
4. **Включить S3 encryption** - защита данных в покое
5. **Стримить файлы** - предотвратить OOM
6. **Добавить тесты** - минимум 50% coverage
7. **Настроить backups** - защита от потери данных
8. **Валидировать env vars** - предотвратить misconfig
### Оценка времени:
- **Минимум для продакшена:** 4 недели (Фаза 1 + критичное из Фазы 2)
- **Полное MVP:** 6-8 недель (Фазы 1-3)
- **Production-grade:** 10-12 недель (все фазы)
---
## 🎓 ЗАКЛЮЧЕНИЕ
Проект **ITCloud** демонстрирует хорошую архитектурную основу и правильные практики в некоторых областях:
**Что сделано хорошо:**
- Clean Architecture с четким разделением слоев
- Безопасное хеширование паролей (Argon2)
- Защита от path traversal
- JWT аутентификация
- Pre-signed URLs для S3
- Async/await паттерны
- Docker setup
**Критические проблемы:**
- Отсутствует rate limiting (BRUTE FORCE VULNERABLE)
- Нет soft delete (SPEC VIOLATION)
- Нет storage quota (ABUSE VULNERABLE)
- Memory exhaustion риски
- Минимальное тестирование
- Нет token revocation
**ИТОГОВАЯ ОЦЕНКА:** 6/10 (хорошая база, но не готов к продакшену)
**РЕКОМЕНДАЦИЯ:** Выполнить Фазу 1 и критичные элементы Фазы 2 перед любым публичным запуском.
---
**Дата аудита:** 2026-01-05
**Версия документа:** 1.0
**Следующий аудит:** После внедрения рекомендаций Фазы 1