Structured Logging Pattern¶
Purpose¶
Produce consistent, machine-parseable JSON logs with distributed context (caseId, batchId, documentId) for CloudWatch querying and alerting.
Architecture¶
Lambda invocation
│
▼
configure_log_context(caseId=..., batchId=..., ...) ← Set once per record
│
▼
log_message("info", "Processing document") ← Context auto-included
│
▼
JSONFormatter → stdout → CloudWatch
Log Context (Module-Level Dict)¶
Context is set once per SQS record and persists for all log calls during that record's processing:
# core/utils/helpers.py
import json
import logging
import sys
from datetime import datetime, timezone

_log_context: dict[str, str] = {  # all values stored as strings (exhibitIds is stringified)
"caseId": "nil",
"batchId": "nil",
"jobId": "nil",
"documentId": "nil",
"exhibitIds": "[]",
}
MODULE_NAME = "docLoader" # Module identifier in all log entries
def configure_log_context(
*,
caseId: str | None = None,
batchId: str | None = None,
jobId: str | None = None,
documentId: str | None = None,
exhibitIds: list[int] | None = None,
) -> None:
"""Set context fields. Only provided fields are updated."""
if caseId is not None:
_log_context["caseId"] = caseId
if batchId is not None:
_log_context["batchId"] = batchId
if jobId is not None:
_log_context["jobId"] = jobId
if documentId is not None:
_log_context["documentId"] = documentId
if exhibitIds is not None:
_log_context["exhibitIds"] = str(exhibitIds)
JSON Formatter¶
Custom formatter that produces CloudWatch-friendly JSON with level normalization and null handling:
def create_json_formatter():
class JSONFormatter(logging.Formatter):
def format(self, record):
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# Normalize log levels for consistency
lvl = record.levelname
if lvl == "WARNING":
lvl = "WARN"
elif lvl == "CRITICAL":
lvl = "FATAL"
# Normalize None/null to "nil" for consistent querying
def norm(val):
if val is None or (isinstance(val, str) and val.lower() == "none"):
return "nil"
return val
log_entry = {
"timestamp": ts,
"level": lvl,
"message": record.getMessage() or "",
"caseId": norm(getattr(record, "caseId", _log_context.get("caseId"))),
"batchId": norm(getattr(record, "batchId", _log_context.get("batchId"))),
"jobId": norm(getattr(record, "jobId", _log_context.get("jobId"))),
"documentId": norm(getattr(record, "documentId", _log_context.get("documentId"))),
"moduleName": MODULE_NAME,
"exhibitIds": str(
getattr(record, "exhibitIds", _log_context.get("exhibitIds", "[]"))
),
}
# Append any extra fields from the log call, excluding standard LogRecord noise
standard_fields = {
"name", "msg", "args", "levelname", "levelno", "pathname",
"filename", "module", "exc_info", "exc_text", "stack_info",
"lineno", "funcName", "created", "msecs", "relativeCreated",
"thread", "threadName", "processName", "process", "taskName",
}
for key, val in record.__dict__.items():
if key not in log_entry and key not in standard_fields:
log_entry[key] = val
return json.dumps(log_entry)
return JSONFormatter()
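The two normalizations can be verified with a condensed, runnable version of the formatter (trimmed to three context fields; the "None" string in documentId is deliberate, to show string-null handling):

```python
import json
import logging
from datetime import datetime, timezone

MODULE_NAME = "docLoader"
_log_context = {"caseId": "1001", "documentId": "None"}

class JSONFormatter(logging.Formatter):
    def format(self, record):
        ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        # WARNING -> WARN, CRITICAL -> FATAL, everything else unchanged
        lvl = {"WARNING": "WARN", "CRITICAL": "FATAL"}.get(
            record.levelname, record.levelname)
        def norm(val):
            # None and the string "none"/"None" both collapse to "nil"
            if val is None or (isinstance(val, str) and val.lower() == "none"):
                return "nil"
            return val
        return json.dumps({
            "timestamp": ts,
            "level": lvl,
            "message": record.getMessage() or "",
            "caseId": norm(getattr(record, "caseId", _log_context.get("caseId"))),
            "documentId": norm(getattr(record, "documentId", _log_context.get("documentId"))),
            "moduleName": MODULE_NAME,
        })

rec = logging.LogRecord("root", logging.WARNING, __file__, 0,
                        "disk low", None, None)
entry = json.loads(JSONFormatter().format(rec))
print(entry["level"])       # WARN
print(entry["documentId"])  # nil ("None" string normalized)
```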
Logger Setup¶
Global logger per Lambda instance — efficient because Lambda handles one request at a time:
_logger: logging.Logger | None = None
def get_logger():
global _logger
if _logger:
return _logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG if Config.DEBUGGER else logging.INFO)
# Silence noisy AWS SDK loggers
for noisy in ("boto3", "botocore", "s3transfer", "urllib3"):
logging.getLogger(noisy).setLevel(logging.WARNING)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(create_json_formatter())
logger.handlers.clear()
logger.addHandler(handler)
_logger = logger
return logger
Log Message API¶
Single function for all logging — context auto-included:
def log_message(level, message, extra=None, exc_info=False):
"""Structured log with context and optional exception trace."""
logger = get_logger()
payload = _log_context.copy()
if extra:
payload.update(extra)
methods = {
"debug": logger.debug,
"info": logger.info,
"warning": logger.warning,
"error": logger.error,
"exception": logger.exception,
}
fn = methods.get(level.lower(), logger.info)
fn(message, extra=payload, exc_info=exc_info)
sys.stdout.flush() # Ensure CloudWatch gets the log immediately
Output Format¶
{
"timestamp": "2024-03-15T14:30:22Z",
"level": "INFO",
"message": "Processing document",
"caseId": "1001",
"batchId": "42",
"jobId": "job-abc-123",
"documentId": "doc-xyz-456",
"moduleName": "docLoader",
"exhibitIds": "[101, 102]"
}
CloudWatch Insights Queries¶
-- Find all errors for a specific case
fields @timestamp, level, message, documentId
| filter caseId = "1001" and level = "ERROR"
| sort @timestamp desc
-- Track a document through its pipeline
fields @timestamp, level, message, moduleName
| filter documentId = "doc-xyz-456"
| sort @timestamp asc
-- Count errors by module (filter must come before stats,
-- since level is no longer available after aggregation)
filter level = "ERROR"
| stats count(*) as errors by moduleName
Key Rules¶
- Use log_message() everywhere; never use print() or logger.info() directly
- Call configure_log_context() once per record, at the start of handler processing
- Reset context in the handler's finally block to prevent leaking between records
- Use exc_info=True for unexpected errors to capture the full stack trace
- Silence noisy loggers: boto3, botocore, s3transfer, and urllib3 at WARNING level
- Flush stdout: Lambda may buffer output, so sys.stdout.flush() ensures delivery
- Use "nil" for missing values, not null/None/empty string, for consistent querying