
SNS Event Publishing Pattern

Purpose

Standardize how modules publish domain events to SNS, ensuring consistent message structure, naming, and filtering across the platform.

Event Type Enum

Every module defines its events as an Enum. This is the source of truth for all event types the module can publish or consume.

# shell/utils/sns_ops.py

from enum import Enum

class EventType(Enum):
    # Lifecycle events
    JOB_STARTED = "JOB_STARTED"
    JOB_FINISHED = "JOB_FINISHED"

    # Processing events
    DOCUMENT_PROCESSED = "DOCUMENT_PROCESSED"
    DOCUMENT_LOADED = "DOCUMENT_LOADED"

    # Batch events
    BATCH_END_START = "BATCH_END_START"
    BATCH_END_FINISHED = "BATCH_END_FINISHED"

    # Completion events
    LOADER_FINISHED = "LOADER_FINISHED"
    LOADER_CANCELLED = "LOADER_CANCELLED"

    # Cancellation
    IMPORT_CANCELLED = "IMPORT_CANCELLED"
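
As a minimal sketch of how the enum can act as the source of truth on both the publishing and consuming side: the wire format carries the member's string value, and a consumer maps an incoming string back to a member. The enum here is a trimmed copy so the example runs on its own, and `parse_event_type` is a hypothetical helper, not part of the module.

```python
from enum import Enum
from typing import Optional


class EventType(Enum):
    # Trimmed copy of the enum above, so the sketch is self-contained.
    JOB_STARTED = "JOB_STARTED"
    DOCUMENT_PROCESSED = "DOCUMENT_PROCESSED"
    LOADER_FINISHED = "LOADER_FINISHED"


def parse_event_type(raw: str) -> Optional[EventType]:
    """Hypothetical helper: map a wire string to an EventType, or None if unknown."""
    try:
        return EventType(raw)
    except ValueError:
        return None


# Publishing side: put the member's string value on the wire, not the member itself.
outgoing = {"eventType": EventType.DOCUMENT_PROCESSED.value}

# Consuming side: unknown strings come back as None instead of raising.
known = parse_event_type(outgoing["eventType"])
unknown = parse_event_type("SOMETHING_ELSE")
```

Returning `None` for unknown values is one possible choice; a consumer that should fail loudly on unexpected events could let the `ValueError` propagate instead.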

Standard Message Structure

Every SNS message follows this structure:

message_json = {
    "source": "loader",              # Module name — identifies the publisher
    "jobId": str,                    # Job identifier
    "caseId": int,                   # Case (tenant) identifier
    "batchId": int,                  # Batch identifier
    "documentId": str,               # Document being processed
    "exhibitIds": list[int],         # Created exhibit IDs
    "eventType": str,                # EventType enum value
    "status": str,                   # "SUCCESS", "FAILED", "CANCELLED"
    "eventDetail": dict | str,       # Event-specific payload
    "timestamp": str,                # ISO 8601 timestamp
}
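
The structure above lists field types rather than values. One way to produce a concrete message is sketched below. `build_message` and `ALLOWED_STATUSES` are hypothetical names introduced for illustration, and the defaults chosen for optional fields (`None`, empty list, empty dict) are assumptions rather than documented behavior.

```python
from datetime import datetime, timezone

ALLOWED_STATUSES = {"SUCCESS", "FAILED", "CANCELLED"}


def build_message(job_id, case_id, batch_id, event_type, *, status="SUCCESS",
                  document_id=None, exhibit_ids=None, event_detail=None,
                  source="loader"):
    """Hypothetical helper: assemble one message in the standard structure."""
    if status not in ALLOWED_STATUSES:
        raise ValueError(f"unsupported status: {status!r}")
    return {
        "source": source,
        "jobId": job_id,
        "caseId": case_id,
        "batchId": batch_id,
        "documentId": document_id,                 # assumed to be None when absent
        "exhibitIds": list(exhibit_ids or []),     # assumed to default to an empty list
        "eventType": event_type,
        "status": status,
        "eventDetail": event_detail if event_detail is not None else {},
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


example = build_message(
    "job-1", 42, 7, "DOCUMENT_PROCESSED",
    document_id="doc-9", exhibit_ids=[101, 102], event_detail={"pages": 3},
)
```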

SNS Publisher Class

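import json
import os
from datetime import datetime, timezone
from typing import ClassVar

import boto3

MODULE_NAME = "loader"  # Assumed value; matches the "source" field shown above


class SNS:
    """Stateful publisher: holds a class-level base message, shared by all callers and updated per event."""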

    message_json: ClassVar[dict] = {}
    topic_arn: ClassVar[str] = ""

    def __init__(self, message: dict):
        """Initialize with base message from SQS record."""
        SNS.message_json = {
            "source": MODULE_NAME,
            "jobId": message.get("jobId"),
            "caseId": message.get("caseId"),
            "batchId": message.get("batchId"),
            "eventType": message.get("eventType"),
            "status": "SUCCESS",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        SNS.topic_arn = os.environ["SNS_TOPIC_ARN"]

    @classmethod
    def publish_sns_message(cls, updates: dict) -> None:
        """Publish event with updates merged into base message."""
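        # Refresh the timestamp on each publish so events from one job stay orderable;
        # callers can still override it through `updates`.
        now = datetime.now(timezone.utc).isoformat()
        merged = {**cls.message_json, "timestamp": now, **updates}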
        cls._publish(merged)

    @classmethod
    def update_event_details(cls, updates: dict) -> None:
        """Update base message with additional event details."""
        cls.message_json.update(updates)

    @classmethod
    def _publish(cls, message: dict) -> None:
        """Publish to SNS with message attributes for filtering."""
        client = boto3.client("sns")
        client.publish(
            TopicArn=cls.topic_arn,
            Message=json.dumps(message),
            MessageAttributes={
                "eventType": {
                    "DataType": "String",
                    "StringValue": message["eventType"],
                },
                "caseId": {
                    "DataType": "Number",
                    "StringValue": str(message["caseId"]),
                },
                "batchId": {
                    "DataType": "Number",
                    "StringValue": str(message["batchId"]),
                },
            },
        )
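
Because the base message is filled with `message.get(...)`, a record without `caseId` or `batchId` would reach `_publish` as `None`, and `str(None)` yields `"None"`, which is not a numeric string. The sketch below shows one way to guard against that before calling SNS. `build_message_attributes` is a hypothetical helper, not part of the class above.

```python
def build_message_attributes(message: dict) -> dict:
    """Hypothetical helper: derive the MessageAttributes used for filtering.

    Raises ValueError when a routing key is missing, instead of sending
    the string "None" as a Number attribute.
    """
    required = ("eventType", "caseId", "batchId")
    missing = [key for key in required if message.get(key) is None]
    if missing:
        raise ValueError(f"cannot publish without: {', '.join(missing)}")
    return {
        "eventType": {"DataType": "String", "StringValue": str(message["eventType"])},
        "caseId": {"DataType": "Number", "StringValue": str(message["caseId"])},
        "batchId": {"DataType": "Number", "StringValue": str(message["batchId"])},
    }


attrs = build_message_attributes(
    {"eventType": "DOCUMENT_PROCESSED", "caseId": 42, "batchId": 7}
)
```

The returned dict has the same shape as the `MessageAttributes` argument in `_publish`, so it could be passed to `client.publish(...)` unchanged.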

SNS Filter Policies

Subscribers use filter policies to receive only relevant events:

filter_policy = {
    "eventType": [
        "DOCUMENT_PROCESSED",
        "BATCH_DOCUMENT_RELATIONS",
        "BATCH_ATTACHMENTS",
    ],
    "caseId": [{"numeric": ["=", case_id]}],
    "batchId": [{"numeric": ["=", batch_id]}],
}

# A message is delivered to this subscriber's queue only if every key matches;
# within a key, matching any one of the listed values is enough.
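
To make the matching rule concrete, here is a simplified local model of the policy above. It is a teaching sketch, not the SNS implementation: it supports only exact string matches and `{"numeric": ["=", n]}` conditions, and `matches` and `_condition_matches` are hypothetical names.

```python
def _condition_matches(condition, value) -> bool:
    """Check one policy condition against one attribute value."""
    if isinstance(condition, dict):
        # Only the {"numeric": ["=", n]} form is modeled here.
        operator, operand = condition["numeric"]
        return operator == "=" and isinstance(value, (int, float)) and value == operand
    return condition == value


def matches(policy: dict, attributes: dict) -> bool:
    """Return True if every policy key is satisfied by the attributes."""
    for key, conditions in policy.items():
        if key not in attributes:
            return False
        # Within one key, any listed condition may match (OR).
        if not any(_condition_matches(c, attributes[key]) for c in conditions):
            return False
    # Across keys, all must match (AND).
    return True


policy = {
    "eventType": ["DOCUMENT_PROCESSED", "BATCH_DOCUMENT_RELATIONS", "BATCH_ATTACHMENTS"],
    "caseId": [{"numeric": ["=", 42]}],
    "batchId": [{"numeric": ["=", 7]}],
}

delivered = matches(policy, {"eventType": "BATCH_ATTACHMENTS", "caseId": 42, "batchId": 7})
wrong_batch = matches(policy, {"eventType": "BATCH_ATTACHMENTS", "caseId": 42, "batchId": 8})
wrong_event = matches(policy, {"eventType": "JOB_STARTED", "caseId": 42, "batchId": 7})
```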

Naming Convention

  • Events are past-tense facts: DOCUMENT_LOADED, JOB_FINISHED
  • NOT commands: LOAD_DOCUMENT, FINISH_JOB
  • Module prefix when needed for clarity: LOADER_FINISHED, LOADER_CANCELLED
  • Use SCREAMING_SNAKE_CASE for enum values
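
Part of this convention can be checked mechanically. The sketch below flags enum values that are not SCREAMING_SNAKE_CASE and members whose name differs from their value; the second check is an assumption based on the enum shown earlier, where every name equals its value. Tense is not checked, since that needs human review. `naming_violations` is a hypothetical helper.

```python
import re
from enum import Enum

# Uppercase letters and digits, with single underscores between segments.
SCREAMING_SNAKE = re.compile(r"^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$")


def naming_violations(enum_cls) -> list:
    """Hypothetical helper: list naming problems found in an event enum."""
    problems = []
    for member in enum_cls:
        if not SCREAMING_SNAKE.match(str(member.value)):
            problems.append(f"{member.name}: value is not SCREAMING_SNAKE_CASE")
        if member.name != member.value:
            problems.append(f"{member.name}: name and value differ")
    return problems


class GoodEvents(Enum):
    DOCUMENT_LOADED = "DOCUMENT_LOADED"
    JOB_FINISHED = "JOB_FINISHED"


class BadEvents(Enum):
    documentLoaded = "documentLoaded"   # wrong case
    JOB_DONE = "JOB_FINISHED"           # name and value differ


good_report = naming_violations(GoodEvents)
bad_report = naming_violations(BadEvents)
```

A check like this could run in a unit test for each module's enum.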

Key Rules

  1. Always publish with MessageAttributes — filter policies depend on them
  2. Include caseId and batchId in every event — they're the primary routing keys
  3. eventDetail is free-form — module-specific payload, but always a dict or JSON string
  4. status is required — "SUCCESS", "FAILED", or "CANCELLED"
  5. timestamp in UTC ISO 8601 — for ordering and debugging
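
Rules 2 through 5 can be expressed as a pre-publish check, sketched below under a few assumptions: `check_key_rules` is a hypothetical helper, a missing `eventDetail` is treated as acceptable, and timestamps are expected in the form produced by `datetime.isoformat()` with an explicit UTC offset.

```python
import json
from datetime import datetime, timedelta

ALLOWED_STATUSES = {"SUCCESS", "FAILED", "CANCELLED"}


def check_key_rules(message: dict) -> list:
    """Hypothetical helper: return a list of rule violations (empty if none)."""
    errors = []
    for key in ("caseId", "batchId"):
        if message.get(key) is None:
            errors.append(f"{key} is missing")
    if message.get("status") not in ALLOWED_STATUSES:
        errors.append("status must be SUCCESS, FAILED, or CANCELLED")
    detail = message.get("eventDetail")  # assumed optional when absent
    if isinstance(detail, str):
        try:
            json.loads(detail)
        except ValueError:
            errors.append("eventDetail string is not valid JSON")
    elif detail is not None and not isinstance(detail, dict):
        errors.append("eventDetail must be a dict or JSON string")
    try:
        parsed = datetime.fromisoformat(message.get("timestamp", ""))
        if parsed.utcoffset() != timedelta(0):
            errors.append("timestamp is not UTC")
    except (TypeError, ValueError):
        errors.append("timestamp is not ISO 8601")
    return errors


ok = check_key_rules({
    "caseId": 42, "batchId": 7, "status": "SUCCESS",
    "eventDetail": {"pages": 3}, "timestamp": "2024-01-01T00:00:00+00:00",
})
bad = check_key_rules({
    "batchId": 7, "status": "DONE",
    "eventDetail": "{not json", "timestamp": "2024-01-01T00:00:00",
})
```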