25 lines
650 B
Python
25 lines
650 B
Python
"""Notification operations for SNS"""
|
|
import os
|
|
import json
|
|
from datetime import datetime
|
|
import boto3
|
|
|
|
# One SNS client per process, built at import time and shared by every call.
sns = boto3.client('sns')

# Destination topic ARN. Required: a missing variable raises KeyError on import.
TOPIC = os.environ['SNS_TOPIC_ARN']

# Deployment label copied into each notification; 'prod' when the variable is unset.
ENV = os.getenv('ENVIRONMENT', 'prod')
|
|
|
|
|
|
def send_notification(filename: str, result: dict, status: str) -> None:
    """Publish an image-processing notification to the configured SNS topic.

    The message body is a JSON object (indented by two spaces) with the keys
    ``filename``, ``status``, ``details``, ``timestamp`` and ``environment``.
    The subject line is ``'Image Processing '`` followed by the title-cased
    ``status``.

    Args:
        filename: Name of the file, copied into the payload as ``filename``.
        result: Details embedded under ``details``; must be serializable by
            ``json.dumps``.
        status: Outcome label; title-cased for the subject and copied
            verbatim into the payload as ``status``.

    Raises:
        TypeError: If ``result`` holds a value ``json.dumps`` cannot encode.
        Exception: Anything raised by ``sns.publish`` is propagated unchanged.
    """
    # Imported locally because the module header only imports ``datetime``.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated since Python 3.12. Read the clock as
    # an aware UTC datetime, then drop tzinfo so the serialized timestamp
    # keeps the exact naive ISO-8601 form existing consumers already receive.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    payload = {
        'filename': filename,
        'status': status,
        'details': result,
        'timestamp': timestamp,
        'environment': ENV,
    }

    sns.publish(
        TopicArn=TOPIC,
        Subject=f'Image Processing {status.title()}',
        Message=json.dumps(payload, indent=2),
    )
|