"""Batch operations service for bulk asset management.""" import io import os import tempfile import zipfile from contextlib import contextmanager from pathlib import Path from typing import AsyncIterator, Optional from fastapi import HTTPException, status from loguru import logger from sqlalchemy.ext.asyncio import AsyncSession from app.domain.models import Asset from app.infra.s3_client import S3Client from app.repositories.asset_repository import AssetRepository from app.repositories.folder_repository import FolderRepository @contextmanager def temp_file_manager(): """ Context manager for automatic temp file cleanup (DRY principle). Yields: List to store temp file paths Usage: with temp_file_manager() as temp_files: temp_files.append(path) # Files automatically deleted on exit """ temp_files = [] try: yield temp_files finally: for file_path in temp_files: try: Path(file_path).unlink(missing_ok=True) logger.debug(f"Cleaned up temp file: {file_path}") except Exception as e: logger.warning(f"Failed to cleanup temp file {file_path}: {e}") class BatchOperationsService: """ Service for batch asset operations (SOLID: Single Responsibility). Handles bulk delete, move, and download operations with streaming support. """ def __init__(self, session: AsyncSession, s3_client: S3Client): """ Initialize batch operations service. Args: session: Database session s3_client: S3 client instance """ self.asset_repo = AssetRepository(session) self.folder_repo = FolderRepository(session) self.s3_client = s3_client async def delete_assets_batch( self, user_id: str, asset_ids: list[str], ) -> dict: """ Delete multiple assets (move to trash bucket, delete from DB). Args: user_id: User ID asset_ids: List of asset IDs to delete Returns: Dict with deletion statistics Raises: HTTPException: If no assets found or permission denied """ if not asset_ids: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="No asset IDs provided", ) # Get assets with ownership check assets = await self.asset_repo.get_by_ids(user_id, asset_ids) if not assets: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="No assets found or permission denied", ) deleted_count = 0 failed_count = 0 for asset in assets: try: # Move files to trash bucket self.s3_client.move_to_trash(asset.storage_key_original) if asset.storage_key_thumb: self.s3_client.move_to_trash(asset.storage_key_thumb) # Delete from database await self.asset_repo.delete(asset) deleted_count += 1 except Exception as e: logger.error(f"Failed to delete asset {asset.id}: {e}") failed_count += 1 return { "deleted": deleted_count, "failed": failed_count, "total": len(asset_ids), } async def move_assets_batch( self, user_id: str, asset_ids: list[str], target_folder_id: Optional[str], ) -> dict: """ Move multiple assets to a folder. Args: user_id: User ID asset_ids: List of asset IDs to move target_folder_id: Target folder ID (None for root) Returns: Dict with move statistics Raises: HTTPException: If no assets found or permission denied """ if not asset_ids: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="No asset IDs provided", ) # Validate target folder if specified if target_folder_id: folder = await self.folder_repo.get_by_id(target_folder_id) if not folder or folder.user_id != user_id: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Target folder not found", ) # Update folder_id for all assets updated_count = await self.asset_repo.update_folder_batch( user_id=user_id, asset_ids=asset_ids, folder_id=target_folder_id, ) return { "moved": updated_count, "requested": len(asset_ids), } async def download_assets_batch( self, user_id: str, asset_ids: list[str], ) -> tuple[bytes, str]: """ Download multiple assets as a ZIP archive. Uses streaming to avoid loading entire archive in memory. Args: user_id: User ID asset_ids: List of asset IDs to download Returns: Tuple of (zip_data, filename) Raises: HTTPException: If no assets found or permission denied """ if not asset_ids: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="No asset IDs provided", ) # Get assets with ownership check assets = await self.asset_repo.get_by_ids(user_id, asset_ids) if not assets: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="No assets found or permission denied", ) # Create ZIP archive in memory zip_buffer = io.BytesIO() with temp_file_manager() as temp_files: try: with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: # Track filenames to avoid duplicates used_names = set() for asset in assets: try: # Download file from S3 response = self.s3_client.client.get_object( Bucket=self.s3_client.bucket, Key=asset.storage_key_original, ) file_data = response["Body"].read() # Generate unique filename base_name = asset.original_filename unique_name = base_name counter = 1 while unique_name in used_names: name, ext = os.path.splitext(base_name) unique_name = f"{name}_{counter}{ext}" counter += 1 used_names.add(unique_name) # Add to ZIP zip_file.writestr(unique_name, file_data) logger.debug(f"Added {unique_name} to ZIP archive") except Exception as e: logger.error(f"Failed to add asset {asset.id} to ZIP: {e}") # Continue with other files # Get ZIP data zip_data = zip_buffer.getvalue() # Generate filename filename = f"download_{len(assets)}_files.zip" return zip_data, filename except Exception as e: logger.exception(f"Failed to create ZIP archive: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create archive", ) async def download_folder( self, user_id: str, folder_id: str, ) -> tuple[bytes, str]: """ Download all assets in a folder as a ZIP archive. Args: user_id: User ID folder_id: Folder ID Returns: Tuple of (zip_data, filename) Raises: HTTPException: If folder not found or permission denied """ # Verify folder ownership folder = await self.folder_repo.get_by_id(folder_id) if not folder or folder.user_id != user_id: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Folder not found", ) # Get all assets in folder (with reasonable limit) assets = await self.asset_repo.list_by_folder( user_id=user_id, folder_id=folder_id, limit=1000, # Reasonable limit to prevent memory issues ) if not assets: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Folder is empty", ) # Get asset IDs and use existing download method asset_ids = [asset.id for asset in assets] zip_data, _ = await self.download_assets_batch(user_id, asset_ids) # Use folder name in filename filename = f"{folder.name}.zip" return zip_data, filename