SNS Event Publishing Pattern¶
Purpose¶
Standardize how modules publish domain events to SNS, ensuring consistent message structure, naming, and filtering across the platform.
Event Type Enum¶
Every module defines its events as an Enum. This is the source of truth for all event types the module can publish or consume.
# shell/utils/sns_ops.py
from enum import Enum
class EventType(Enum):
# Lifecycle events
JOB_STARTED = "JOB_STARTED"
JOB_FINISHED = "JOB_FINISHED"
# Processing events
DOCUMENT_PROCESSED = "DOCUMENT_PROCESSED"
DOCUMENT_LOADED = "DOCUMENT_LOADED"
# Batch events
BATCH_END_START = "BATCH_END_START"
BATCH_END_FINISHED = "BATCH_END_FINISHED"
# Completion events
LOADER_FINISHED = "LOADER_FINISHED"
LOADER_CANCELLED = "LOADER_CANCELLED"
# Cancellation
IMPORT_CANCELLED = "IMPORT_CANCELLED"
Standard Message Structure¶
Every SNS message follows this structure:
message_json = {
"source": "loader", # Module name — identifies the publisher
"jobId": str, # Job identifier
"caseId": int, # Case (tenant) identifier
"batchId": int, # Batch identifier
"documentId": str, # Document being processed
"exhibitIds": list[int], # Created exhibit IDs
"eventType": str, # EventType enum value
"status": str, # "SUCCESS", "FAILED", "CANCELLED"
"eventDetail": dict | str, # Event-specific payload
"timestamp": str, # ISO 8601 timestamp
}
SNS Publisher Class¶
class SNS:
"""Stateful publisher — initialized with base message, updated per event."""
message_json: ClassVar[dict] = {}
topic_arn: ClassVar[str] = ""
def __init__(self, message: dict):
"""Initialize with base message from SQS record."""
SNS.message_json = {
"source": MODULE_NAME,
"jobId": message.get("jobId"),
"caseId": message.get("caseId"),
"batchId": message.get("batchId"),
"eventType": message.get("eventType"),
"status": "SUCCESS",
"timestamp": datetime.now(timezone.utc).isoformat(),
}
SNS.topic_arn = os.environ["SNS_TOPIC_ARN"]
@classmethod
def publish_sns_message(cls, updates: dict) -> None:
"""Publish event with updates merged into base message."""
merged = {**cls.message_json, **updates}
cls._publish(merged)
@classmethod
def update_event_details(cls, updates: dict) -> None:
"""Update base message with additional event details."""
cls.message_json.update(updates)
@classmethod
def _publish(cls, message: dict) -> None:
"""Publish to SNS with message attributes for filtering."""
client = boto3.client("sns")
client.publish(
TopicArn=cls.topic_arn,
Message=json.dumps(message),
MessageAttributes={
"eventType": {
"DataType": "String",
"StringValue": message["eventType"],
},
"caseId": {
"DataType": "Number",
"StringValue": str(message["caseId"]),
},
"batchId": {
"DataType": "Number",
"StringValue": str(message["batchId"]),
},
},
)
SNS Filter Policies¶
Subscribers use filter policies to receive only relevant events:
filter_policy = {
"eventType": [
"DOCUMENT_PROCESSED",
"BATCH_DOCUMENT_RELATIONS",
"BATCH_ATTACHMENTS",
],
"caseId": [{"numeric": ["=", case_id]}],
"batchId": [{"numeric": ["=", batch_id]}],
}
# Only messages matching ALL conditions are delivered to this subscriber's queue
Naming Convention¶
- Events are past tense facts:
DOCUMENT_LOADED,JOB_FINISHED - NOT commands:
,LOAD_DOCUMENTFINISH_JOB - Module prefix when needed for clarity:
LOADER_FINISHED,LOADER_CANCELLED - Use SCREAMING_SNAKE_CASE for enum values
Key Rules¶
- Always publish with MessageAttributes — filter policies depend on them
- Include caseId and batchId in every event — they're the primary routing keys
- eventDetail is free-form — module-specific payload, but always a dict or JSON string
- status is required — "SUCCESS", "FAILED", or "CANCELLED"
- timestamp in UTC ISO 8601 — for ordering and debugging
Ask the Architecture
×
Ask questions about Nextpoint architecture, patterns, rules, or any module. Powered by Claude Opus 4.6.