
Structured Logging Pattern

Purpose

Produce consistent, machine-parseable JSON logs with distributed context (caseId, batchId, documentId) for CloudWatch querying and alerting.

Architecture

Lambda invocation
configure_log_context(caseId=..., batchId=..., ...)  ← Set once per record
log_message("info", "Processing document")  ← Context auto-included
JSONFormatter → stdout → CloudWatch

Log Context (Module-Level Dict)

Context is set once per SQS record and persists for all log calls during that record's processing:

# core/utils/helpers.py
import json
import logging
import sys
from datetime import datetime, timezone

# All values are stored as strings (exhibitIds is stringified on write)
_log_context: dict[str, str] = {
    "caseId": "nil",
    "batchId": "nil",
    "jobId": "nil",
    "documentId": "nil",
    "exhibitIds": "[]",
}
MODULE_NAME = "docLoader"  # Module identifier in all log entries


def configure_log_context(
    *,
    caseId: str | None = None,
    batchId: str | None = None,
    jobId: str | None = None,
    documentId: str | None = None,
    exhibitIds: list[int] | None = None,
) -> None:
    """Set context fields. Only provided fields are updated."""
    if caseId is not None:
        _log_context["caseId"] = caseId
    if batchId is not None:
        _log_context["batchId"] = batchId
    if jobId is not None:
        _log_context["jobId"] = jobId
    if documentId is not None:
        _log_context["documentId"] = documentId
    if exhibitIds is not None:
        _log_context["exhibitIds"] = str(exhibitIds)

JSON Formatter

Custom formatter that produces CloudWatch-friendly JSON with level normalization and null handling:

def create_json_formatter():
    class JSONFormatter(logging.Formatter):
        def format(self, record):
            ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

            # Normalize log levels for consistency
            lvl = record.levelname
            if lvl == "WARNING":
                lvl = "WARN"
            elif lvl == "CRITICAL":
                lvl = "FATAL"

            # Normalize None/null to "nil" for consistent querying
            def norm(val):
                if val is None or (isinstance(val, str) and val.lower() == "none"):
                    return "nil"
                return val

            log_entry = {
                "timestamp": ts,
                "level": lvl,
                "message": record.getMessage() or "",
                "caseId": norm(getattr(record, "caseId", _log_context.get("caseId"))),
                "batchId": norm(getattr(record, "batchId", _log_context.get("batchId"))),
                "jobId": norm(getattr(record, "jobId", _log_context.get("jobId"))),
                "documentId": norm(getattr(record, "documentId", _log_context.get("documentId"))),
                "moduleName": MODULE_NAME,
                "exhibitIds": str(
                    getattr(record, "exhibitIds", _log_context.get("exhibitIds", "[]"))
                ),
            }

            # Append any extra fields from the log call, excluding standard LogRecord noise
            standard_fields = {
                "name", "msg", "args", "levelname", "levelno", "pathname",
                "filename", "module", "exc_info", "exc_text", "stack_info",
                "lineno", "funcName", "created", "msecs", "relativeCreated",
                "thread", "threadName", "processName", "process", "taskName",
            }
            for key, val in record.__dict__.items():
                if key not in log_entry and key not in standard_fields:
                    log_entry[key] = val

            # Include the stack trace when exc_info=True was passed
            if record.exc_info:
                log_entry["stackTrace"] = self.formatException(record.exc_info)

            # default=str guards against non-JSON-serializable extra values
            return json.dumps(log_entry, default=str)

    return JSONFormatter()
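The two normalization rules can be exercised in isolation. A self-contained sketch of the same logic (standalone functions, not part of helpers.py):

```python
def normalize_level(levelname: str) -> str:
    # WARNING → WARN, CRITICAL → FATAL; everything else passes through
    return {"WARNING": "WARN", "CRITICAL": "FATAL"}.get(levelname, levelname)


def norm(val):
    # None and the string "None" (any casing) both collapse to "nil"
    if val is None or (isinstance(val, str) and val.lower() == "none"):
        return "nil"
    return val


print(normalize_level("WARNING"))  # WARN
print(norm("none"))                # nil
print(norm("1001"))                # 1001
```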

Logger Setup

Global logger per Lambda instance — efficient because Lambda handles one request at a time:

_logger: logging.Logger | None = None

def get_logger():
    global _logger
    if _logger:
        return _logger

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG if Config.DEBUGGER else logging.INFO)

    # Silence noisy AWS SDK loggers
    for noisy in ("boto3", "botocore", "s3transfer", "urllib3"):
        logging.getLogger(noisy).setLevel(logging.WARNING)

    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(create_json_formatter())
    logger.handlers.clear()
    logger.addHandler(handler)

    _logger = logger
    return logger

Log Message API

Single function for all logging — context auto-included:

def log_message(level, message, extra=None, exc_info=False):
    """Structured log with context and optional exception trace."""
    logger = get_logger()
    payload = _log_context.copy()
    if extra:
        payload.update(extra)

    methods = {
        "debug": logger.debug,
        "info": logger.info,
        "warn": logger.warning,
        "warning": logger.warning,
        "error": logger.error,
        "exception": logger.exception,
        "critical": logger.critical,
        "fatal": logger.critical,
    }
    fn = methods.get(level.lower(), logger.info)
    fn(message, extra=payload, exc_info=exc_info)
    sys.stdout.flush()  # Ensure CloudWatch gets the log immediately
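Typical call sites look like this. The snippet uses trimmed stand-in definitions so it runs on its own; the real functions live in core/utils/helpers.py:

```python
import json

# Stand-ins mirroring the helpers above (illustrative, trimmed to two fields)
_log_context = {"caseId": "nil", "batchId": "nil"}


def configure_log_context(**kwargs):
    _log_context.update({k: v for k, v in kwargs.items() if v is not None})


def log_message(level, message, extra=None, exc_info=False):
    entry = {"level": level.upper(), "message": message, **_log_context}
    if extra:
        entry.update(extra)
    print(json.dumps(entry))


configure_log_context(caseId="1001", batchId="42")
log_message("info", "Processing document", extra={"pageCount": 12})
```

Note that per-call extras ride alongside the persistent context rather than replacing it.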

Output Format

{
    "timestamp": "2024-03-15T14:30:22Z",
    "level": "INFO",
    "message": "Processing document",
    "caseId": "1001",
    "batchId": "42",
    "jobId": "job-abc-123",
    "documentId": "doc-xyz-456",
    "moduleName": "docLoader",
    "exhibitIds": "[101, 102]"
}

CloudWatch Insights Queries

# Find all errors for a specific case
fields @timestamp, level, message, documentId
| filter caseId = "1001" and level = "ERROR"
| sort @timestamp desc

# Track a document through its pipeline
fields @timestamp, level, message, moduleName
| filter documentId = "doc-xyz-456"
| sort @timestamp asc

# Count errors by module
filter level = "ERROR"
| stats count(*) as errors by moduleName

Key Rules

  1. Use log_message() everywhere — never use print() or logger.info() directly
  2. Call configure_log_context() once per record — at the start of handler processing
  3. Reset context in handler finally block — prevent leaking between records
  4. Use exc_info=True for unexpected errors — captures full stack trace
  5. Silence noisy loggers — boto3, botocore, urllib3 at WARNING level
  6. Flush stdout — Lambda may buffer, sys.stdout.flush() ensures delivery
  7. "nil" for missing values — consistent querying, not null/None/empty string