"""Storage operations for DynamoDB and S3""" import os import time import boto3 from config import DYNAMODB_TTL_SECONDS dynamodb = boto3.resource('dynamodb') s3 = boto3.client('s3') TABLE = os.environ['DYNAMODB_TABLE'] ENV = os.environ.get('ENVIRONMENT', 'prod') def write_metadata(filename: str, processed_filename: str, result: dict) -> None: """Write processing metadata to DynamoDB""" dynamodb.Table(TABLE).put_item(Item={ 'filename': filename, 'processed_filename': processed_filename, 'timestamp': result['timestamp'], 'processing_type': result['processing_type'], 'status': result['status'], 'original_size': str(result['original_size']), 'processed_size': str(result['processed_size']), 'hash': result.get('hash', ''), 'ttl': int(time.time()) + DYNAMODB_TTL_SECONDS, 'environment': ENV }) def upload_processed(bucket: str, key: str, data: bytes, content_type: str, metadata: dict) -> None: """Upload processed image to S3""" s3.put_object( Bucket=bucket, Key=key, Body=data, ContentType=content_type, Metadata=metadata ) def get_object(bucket: str, key: str) -> tuple[bytes, int]: """Get object from S3""" obj = s3.get_object(Bucket=bucket, Key=key) return obj['Body'].read(), obj['ContentLength']