2025-12-30 13:35:19 +01:00
|
|
|
"""Security utilities for authentication and authorization."""
|
|
|
|
|
|
2025-12-30 14:51:56 +01:00
|
|
|
import hashlib
|
|
|
|
|
from datetime import datetime, timedelta, timezone
|
2025-12-30 13:35:19 +01:00
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
|
|
from jose import JWTError, jwt
|
|
|
|
|
from passlib.context import CryptContext
|
|
|
|
|
|
|
|
|
|
from app.infra.config import get_settings
|
|
|
|
|
|
|
|
|
|
settings = get_settings()
|
|
|
|
|
|
|
|
|
|
# Password hashing context
|
|
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def hash_password(password: str) -> str:
|
|
|
|
|
"""
|
2025-12-30 14:51:56 +01:00
|
|
|
Hash a password using bcrypt with SHA256 pre-hashing.
|
2025-12-30 13:35:19 +01:00
|
|
|
|
|
|
|
|
Args:
|
2025-12-30 14:51:56 +01:00
|
|
|
password: Plain text password (any length supported)
|
2025-12-30 13:35:19 +01:00
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Hashed password
|
2025-12-30 14:51:56 +01:00
|
|
|
|
|
|
|
|
Note:
|
|
|
|
|
Uses SHA256 pre-hashing to support passwords of any length,
|
|
|
|
|
avoiding bcrypt's 72-byte limitation.
|
2025-12-30 13:35:19 +01:00
|
|
|
"""
|
2025-12-30 14:51:56 +01:00
|
|
|
# Pre-hash with SHA256 to support unlimited password length
|
|
|
|
|
# This is a common technique to work around bcrypt's 72-byte limit
|
|
|
|
|
password_hash = hashlib.sha256(password.encode('utf-8')).hexdigest()
|
|
|
|
|
return pwd_context.hash(password_hash)
|
2025-12-30 13:35:19 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
|
|
|
|
"""
|
|
|
|
|
Verify a password against its hash.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
plain_password: Plain text password
|
|
|
|
|
hashed_password: Hashed password to verify against
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
True if password matches, False otherwise
|
|
|
|
|
"""
|
2025-12-30 14:51:56 +01:00
|
|
|
# Apply same SHA256 pre-hashing as hash_password
|
|
|
|
|
password_hash = hashlib.sha256(plain_password.encode('utf-8')).hexdigest()
|
|
|
|
|
return pwd_context.verify(password_hash, hashed_password)
|
2025-12-30 13:35:19 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
|
|
|
|
|
"""
|
|
|
|
|
Create a JWT access token.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
data: Data to encode in the token
|
|
|
|
|
expires_delta: Optional expiration time delta
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Encoded JWT token
|
|
|
|
|
"""
|
|
|
|
|
to_encode = data.copy()
|
|
|
|
|
if expires_delta:
|
2025-12-30 14:51:56 +01:00
|
|
|
expire = datetime.now(timezone.utc) + expires_delta
|
2025-12-30 13:35:19 +01:00
|
|
|
else:
|
2025-12-30 14:51:56 +01:00
|
|
|
expire = datetime.now(timezone.utc) + timedelta(seconds=settings.jwt_access_ttl_seconds)
|
2025-12-30 13:35:19 +01:00
|
|
|
|
|
|
|
|
to_encode.update({"exp": expire, "type": "access"})
|
|
|
|
|
encoded_jwt = jwt.encode(to_encode, settings.jwt_secret, algorithm=settings.jwt_algorithm)
|
|
|
|
|
return encoded_jwt
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_refresh_token(data: dict) -> str:
|
|
|
|
|
"""
|
|
|
|
|
Create a JWT refresh token.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
data: Data to encode in the token
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Encoded JWT token
|
|
|
|
|
"""
|
|
|
|
|
to_encode = data.copy()
|
2025-12-30 14:51:56 +01:00
|
|
|
expire = datetime.now(timezone.utc) + timedelta(seconds=settings.jwt_refresh_ttl_seconds)
|
2025-12-30 13:35:19 +01:00
|
|
|
to_encode.update({"exp": expire, "type": "refresh"})
|
|
|
|
|
encoded_jwt = jwt.encode(to_encode, settings.jwt_secret, algorithm=settings.jwt_algorithm)
|
|
|
|
|
return encoded_jwt
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def decode_token(token: str) -> Optional[dict]:
|
|
|
|
|
"""
|
|
|
|
|
Decode and verify a JWT token.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
token: JWT token to decode
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Decoded token payload or None if invalid
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
payload = jwt.decode(token, settings.jwt_secret, algorithms=[settings.jwt_algorithm])
|
|
|
|
|
return payload
|
|
|
|
|
except JWTError:
|
|
|
|
|
return None
|