# itcloud/backend/src/app/services/asset_service.py
# 326 lines, 9.4 KiB, Python
# history: 2025-12-30 13:35:19 +01:00
"""Asset management service."""
import os
# history: 2025-12-30 15:53:16 +01:00
from typing import AsyncIterator, Optional, Tuple
# history: 2025-12-30 13:35:19 +01:00
from fastapi import HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.models import Asset, AssetStatus, AssetType
from app.infra.s3_client import S3Client
from app.repositories.asset_repository import AssetRepository
class AssetService:
    """Service for asset management operations.

    Ties together the asset repository (database records) and the S3 client
    (object storage) for upload, listing, download/streaming and the
    soft-delete / restore / purge lifecycle. Every lookup by ID is scoped to
    the requesting user; an asset owned by someone else is reported as
    "not found".
    """

    def __init__(self, session: AsyncSession, s3_client: S3Client):
        """
        Initialize asset service.

        Args:
            session: Database session
            s3_client: S3 client instance
        """
        self.asset_repo = AssetRepository(session)
        self.s3_client = s3_client

    def _get_asset_type(self, content_type: str) -> AssetType:
        """
        Determine asset type from content type.

        Args:
            content_type: MIME type, e.g. "image/png" or "video/mp4"

        Returns:
            AssetType.PHOTO for image/*, AssetType.VIDEO for video/*

        Raises:
            HTTPException: 400 if the content type is neither image nor video
        """
        # Media type tokens are case-insensitive, so "Image/PNG" must be
        # classified the same way as "image/png".
        normalized = content_type.lower()
        if normalized.startswith("image/"):
            return AssetType.PHOTO
        if normalized.startswith("video/"):
            return AssetType.VIDEO
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Unsupported content type",
        )

    async def _get_owned_asset(self, user_id: str, asset_id: str) -> Asset:
        """
        Load an asset and verify that it belongs to the given user.

        Args:
            user_id: User ID the asset must belong to
            asset_id: Asset ID

        Returns:
            Asset instance

        Raises:
            HTTPException: 404 if the asset does not exist or is owned by
                another user (both cases give the same response)
        """
        asset = await self.asset_repo.get_by_id(asset_id)
        if not asset or asset.user_id != user_id:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Asset not found",
            )
        return asset

    def _resolve_storage_key(self, asset: Asset, kind: str) -> str:
        """
        Pick the storage key for the requested rendition of an asset.

        Args:
            asset: Asset to read the keys from
            kind: 'thumb' selects the thumbnail; any other value selects
                the original

        Returns:
            Storage key of the requested rendition

        Raises:
            HTTPException: 404 if the thumbnail is requested but the asset
                has no thumbnail key
        """
        if kind != "thumb":
            return asset.storage_key_original
        if not asset.storage_key_thumb:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Thumbnail not available",
            )
        return asset.storage_key_thumb

    async def create_upload(
        self,
        user_id: str,
        original_filename: str,
        content_type: str,
        size_bytes: int,
    ) -> tuple[Asset, dict]:
        """
        Create an asset and generate pre-signed upload URL.

        Args:
            user_id: Owner user ID
            original_filename: Original filename
            content_type: MIME type
            size_bytes: File size in bytes

        Returns:
            Tuple of (asset, presigned_post_data)

        Raises:
            HTTPException: 400 if the content type is not supported
        """
        asset_type = self._get_asset_type(content_type)
        _, ext = os.path.splitext(original_filename)

        # The storage key is derived from the asset ID, so the record is
        # created first with an empty placeholder key.
        asset = await self.asset_repo.create(
            user_id=user_id,
            asset_type=asset_type,
            original_filename=original_filename,
            content_type=content_type,
            size_bytes=size_bytes,
            storage_key_original="",  # Will be set right below
        )

        storage_key = self.s3_client.generate_storage_key(
            user_id=user_id,
            asset_id=asset.id,
            prefix="o",
            extension=ext,
        )

        # Persist the real key on the record.
        asset.storage_key_original = storage_key
        await self.asset_repo.update(asset)

        # The declared file size is passed as the upload's maximum size.
        presigned_post = self.s3_client.generate_presigned_post(
            storage_key=storage_key,
            content_type=content_type,
            max_size=size_bytes,
        )
        return asset, presigned_post

    async def finalize_upload(
        self,
        user_id: str,
        asset_id: str,
        etag: Optional[str] = None,
        sha256: Optional[str] = None,
    ) -> Asset:
        """
        Finalize upload and mark asset as ready.

        Args:
            user_id: User ID
            asset_id: Asset ID
            etag: Optional S3 ETag (accepted for interface compatibility;
                currently neither stored nor verified)
            sha256: Optional file SHA256 hash; stored on the asset when given

        Returns:
            Updated asset

        Raises:
            HTTPException: 404 if asset not found or not authorized,
                400 if the uploaded object is missing from storage
        """
        asset = await self._get_owned_asset(user_id, asset_id)

        # Verify file was uploaded
        if not self.s3_client.object_exists(asset.storage_key_original):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="File not found in storage",
            )

        asset.status = AssetStatus.READY
        if sha256:
            asset.sha256 = sha256
        await self.asset_repo.update(asset)
        return asset

    async def list_assets(
        self,
        user_id: str,
        limit: int = 50,
        cursor: Optional[str] = None,
        asset_type: Optional[AssetType] = None,
    ) -> tuple[list[Asset], Optional[str], bool]:
        """
        List user's assets.

        Args:
            user_id: User ID
            limit: Maximum number of results
            cursor: Pagination cursor
            asset_type: Filter by asset type

        Returns:
            Tuple of (assets, next_cursor, has_more); next_cursor is the ID
            of the last returned asset when more results exist, else None
        """
        # Ask for one extra row: its presence tells us another page exists
        # without a separate count query.
        assets = await self.asset_repo.list_by_user(
            user_id=user_id,
            limit=limit + 1,
            cursor=cursor,
            asset_type=asset_type,
        )
        has_more = len(assets) > limit
        if has_more:
            assets = assets[:limit]
        next_cursor = assets[-1].id if has_more and assets else None
        return assets, next_cursor, has_more

    async def get_asset(self, user_id: str, asset_id: str) -> Asset:
        """
        Get asset by ID.

        Args:
            user_id: User ID
            asset_id: Asset ID

        Returns:
            Asset instance

        Raises:
            HTTPException: If asset not found or not authorized
        """
        return await self._get_owned_asset(user_id, asset_id)

    async def get_download_url(
        self, user_id: str, asset_id: str, kind: str = "original"
    ) -> str:
        """
        Get pre-signed download URL for an asset.

        Args:
            user_id: User ID
            asset_id: Asset ID
            kind: 'original' or 'thumb'

        Returns:
            Pre-signed download URL

        Raises:
            HTTPException: If asset not found or not authorized, or if the
                thumbnail is requested but not available
        """
        asset = await self.get_asset(user_id, asset_id)
        storage_key = self._resolve_storage_key(asset, kind)
        return self.s3_client.generate_presigned_url(storage_key)

    async def stream_media(
        self, user_id: str, asset_id: str, kind: str = "original"
    ) -> Tuple[AsyncIterator[bytes], str, int]:
        """
        Stream media file content from S3.

        Args:
            user_id: User ID
            asset_id: Asset ID
            kind: 'original' or 'thumb'

        Returns:
            Tuple of (file_stream, content_type, content_length)

        Raises:
            HTTPException: If asset not found or not authorized, or if the
                thumbnail is requested but not available
        """
        asset = await self.get_asset(user_id, asset_id)
        storage_key = self._resolve_storage_key(asset, kind)
        # Thumbnails are always JPEG; originals keep their recorded type.
        content_type = "image/jpeg" if kind == "thumb" else asset.content_type

        # Stream file from S3
        file_stream, content_length = await self.s3_client.stream_object(storage_key)
        return file_stream, content_type, content_length

    async def delete_asset(self, user_id: str, asset_id: str) -> Asset:
        """
        Soft delete an asset.

        Args:
            user_id: User ID
            asset_id: Asset ID

        Returns:
            Updated asset

        Raises:
            HTTPException: If asset not found or not authorized
        """
        asset = await self.get_asset(user_id, asset_id)
        return await self.asset_repo.soft_delete(asset)

    async def restore_asset(self, user_id: str, asset_id: str) -> Asset:
        """
        Restore a soft-deleted asset.

        Args:
            user_id: User ID
            asset_id: Asset ID

        Returns:
            Updated asset

        Raises:
            HTTPException: 404 if asset not found or not authorized,
                400 if the asset is not currently soft-deleted
        """
        asset = await self._get_owned_asset(user_id, asset_id)
        if not asset.deleted_at:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Asset is not deleted",
            )
        return await self.asset_repo.restore(asset)

    async def purge_asset(self, user_id: str, asset_id: str) -> None:
        """
        Permanently delete an asset.

        Only assets that were soft-deleted first can be purged. Stored
        objects are removed before the database record.

        Args:
            user_id: User ID
            asset_id: Asset ID

        Raises:
            HTTPException: 404 if asset not found or not authorized,
                400 if the asset has not been soft-deleted
        """
        asset = await self._get_owned_asset(user_id, asset_id)
        if not asset.deleted_at:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Asset must be deleted before purging",
            )

        # Delete from S3. create_upload stores "" as the original key until
        # the real one is assigned, so a record whose creation was
        # interrupted has no object to remove; skip the storage call instead
        # of sending an empty key, so the record can still be purged.
        if asset.storage_key_original:
            self.s3_client.delete_object(asset.storage_key_original)
        if asset.storage_key_thumb:
            self.s3_client.delete_object(asset.storage_key_thumb)

        # Delete from database
        await self.asset_repo.delete(asset)