Skip to content

Testing Patterns

Purpose

Standardize test structure, fixture patterns, and mocking strategies across all NGE service modules. Tests mock at the shell boundary and exercise core logic directly.

Test Directory Structure

lib/lambda/tests/
├── conftest.py              # Auto-use fixtures: mock AWS, DB sessions, secrets, env vars
├── test_handler.py          # Handler routing, exception dispatch, idempotency
├── test_process.py          # Core business logic (document processing pipeline)
├── test_checkpoints.py      # Checkpoint state machine, resume logic
├── test_db_models.py        # ORM model creation, validation, constraints
├── test_sns_ops.py          # SNS publishing (mocked client)
├── test_sqs_ops.py          # SQS operations (requeue, DLQ, redrive)
├── test_es_ops.py           # Elasticsearch operations (mocked client)
└── test_batch_processor.py  # Batch-level orchestration, chunk dispatch

conftest.py Foundation

Every module's conftest.py establishes the mocking baseline. All fixtures are autouse=True — they apply to every test automatically.

Environment Variables (MUST be first)

# conftest.py — set env vars BEFORE any module imports

import os
import sys

# Module-level config is read at import time, so these must be in place
# before any service module is imported.
_TEST_ENV = {
    "ENV": "test",
    "RDS_DBNAME": "test_db",
    "WRITER_PROXY_ENDPOINT": "localhost",
    "READER_PROXY_ENDPOINT": "localhost",
    "RDS_SECRET_ARN": "arn:aws:secretsmanager:us-east-1:123456789:secret:test",
    "RDS_SECRET_NAME": "test-secret",
    "SNS_TOPIC_ARN": "arn:aws:sns:us-east-1:123456789:test-topic",
    "SNS_STATUS_MONITOR_ARN": "arn:aws:sns:us-east-1:123456789:test-monitor",
    "DEPLOY_REGION": "us-east-1",
    "STACK_NAME_PREFIX": "test",
    "S3_BUCKET": "test-bucket",
}
os.environ.update(_TEST_ENV)

# Make the module-under-test importable from the tests directory.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src", "module_name"))

AWS Service Mocks

import pytest
from unittest.mock import MagicMock, patch


@pytest.fixture(autouse=True)
def mock_aws_secrets():
    """Stub Secrets Manager lookups so no test touches real AWS."""
    fake_secret = {"username": "test_user", "password": "test_pass"}
    patcher = patch("shell.utils.aws_secrets.get_secret", return_value=fake_secret)
    with patcher:
        yield


@pytest.fixture(autouse=True)
def mock_boto3_clients():
    """Replace boto3.client so SNS/SQS/S3 calls hit MagicMocks."""
    mocks = {name: MagicMock() for name in ("sns", "sqs", "s3")}

    def _fake_client(service, **kwargs):
        # Services we don't track get a throwaway mock rather than a real client.
        return mocks.get(service, MagicMock())

    with patch("boto3.client", side_effect=_fake_client):
        yield mocks

Database Session Mocks

@pytest.fixture(autouse=True)
def mock_db_sessions():
    """Patch writer/reader session factories with a chainable mock session."""
    session = MagicMock()
    # Chainable query API: session.query(...).filter(...).first() -> None,
    # .all() -> [] — the "nothing in the database yet" baseline.
    session.query.return_value = session
    session.filter.return_value = session
    session.first.return_value = None
    session.all.return_value = []

    def _make_cm():
        # writer_session/reader_session are context managers yielding a session.
        cm = MagicMock()
        cm.__enter__ = MagicMock(return_value=session)
        cm.__exit__ = MagicMock(return_value=False)
        return cm

    writer_patch = patch("shell.db.database.writer_session", return_value=_make_cm())
    reader_patch = patch("shell.db.database.reader_session", return_value=_make_cm())
    with writer_patch, reader_patch:
        yield session

Testing Core Logic

Test core functions directly — pass mock sessions, assert behavior

# test_process.py

def test_create_exhibit_data():
    """Core logic receives a session, returns exhibit data."""
    session = MagicMock()
    attributes = {
        "npcase_id": 1001,
        "batch_id": 1,
        "document_id": "doc-123",
        "filename": "test.pdf",
    }

    exhibit = create_exhibit_data(session, attributes)

    assert exhibit["exhibit_id"] is not None
    session.add.assert_called_once()
    session.flush.assert_called_once()


def test_create_exhibit_data_with_existing():
    """Verify idempotency — skip if exhibit already exists."""
    mock_session = MagicMock()
    mock_session.query.return_value.filter.return_value.first.return_value = (
        MagicMock(id=42)  # Existing exhibit
    )
    # FIX: import_attributes was referenced but never defined — the original
    # test raised NameError before exercising the idempotency path.
    import_attributes = {
        "npcase_id": 1001,
        "batch_id": 1,
        "document_id": "doc-123",
        "filename": "test.pdf",
    }

    result = create_exhibit_data(mock_session, import_attributes)

    assert result["exhibit_id"] == 42
    mock_session.add.assert_not_called()  # No duplicate creation

Testing Handler Routing

Verify each exception type triggers the correct SQS behavior

# test_handler.py

def test_handler_recoverable_exception(mock_db_sessions):
    """RecoverableException triggers requeue with backoff."""
    event = make_sqs_event({"caseId": 1001, "documentId": "doc-123"})

    with patch("core.process.load_document", side_effect=RecoverableException("doc-123", "retry")):
        with patch("shell.utils.sqs_ops.handle_requeue_exception") as mock_requeue:
            # Return value intentionally not asserted — this test only pins
            # the requeue side effect. (Removed the unused `result` binding.)
            lambda_handler(event, None)
            mock_requeue.assert_called_once()


def test_handler_permanent_failure(mock_db_sessions):
    """PermanentFailureException sends to DLQ."""
    event = make_sqs_event({"caseId": 1001, "documentId": "doc-123"})

    with patch("core.process.load_document", side_effect=PermanentFailureException("bad data")):
        with patch("shell.utils.sqs_ops.send_to_dlq") as mock_dlq:
            # Return value intentionally not asserted — this test only pins
            # the DLQ side effect. (Removed the unused `result` binding.)
            lambda_handler(event, None)
            mock_dlq.assert_called_once()


def test_handler_silent_success(mock_db_sessions):
    """SilentSuccessException deletes message as success."""
    event = make_sqs_event({"caseId": 1001, "documentId": "doc-123"})

    silent = SilentSuccessException("already done")
    with patch("core.process.load_document", side_effect=silent):
        response = lambda_handler(event, None)
    # No batch item failures — the message is treated as successfully handled.
    assert response["batchItemFailures"] == []

Testing Idempotency

Every handler must produce the same result on duplicate delivery

def test_idempotent_duplicate_delivery(mock_db_sessions):
    """Processing the same message twice produces the same result."""
    event = make_sqs_event({"caseId": 1001, "documentId": "doc-123"})

    # First invocation — processes normally
    with patch("core.process.load_document") as mock_load:
        lambda_handler(event, None)
        mock_load.assert_called_once()

    # Second invocation — checkpoint exists, skips processing
    mock_db_sessions.query.return_value.filter.return_value.first.return_value = (
        MagicMock(checkpoint_id=Checkpoints.PROCESS_COMPLETE.value)
    )

    with patch("core.process.load_document") as mock_load:
        lambda_handler(event, None)
        # FIX: the original test asserted nothing on the duplicate delivery,
        # so it passed vacuously. load_document is still invoked; it detects
        # the PROCESS_COMPLETE checkpoint and returns early without redoing work.
        mock_load.assert_called_once()

Testing Checkpoint Resume

def test_resume_from_exhibit_created(mock_db_sessions):
    """Pipeline resumes from EXHIBIT_CREATED, skips earlier steps."""
    checkpoint = MagicMock(checkpoint_id=Checkpoints.EXHIBIT_CREATED.value)
    # FIX: import_attributes was referenced but never defined — the original
    # test raised NameError before constructing the processor.
    import_attributes = {
        "npcase_id": 1001,
        "batch_id": 1,
        "document_id": "doc-123",
        "filename": "test.pdf",
    }

    processor = DocumentProcessor(checkpoint, import_attributes, initial=False)

    with patch.object(processor, "_handle_duplication") as mock_dedup:
        with patch.object(processor, "_create_exhibit") as mock_exhibit:
            with patch.object(processor, "_create_attachments") as mock_attach:
                processor.run()

                mock_dedup.assert_not_called()   # Skipped — already past
                mock_exhibit.assert_not_called()  # Skipped — already past
                mock_attach.assert_called_once()  # Resumes here

Testing Database Retry

def test_retry_on_deadlock(mock_db_sessions):
    """@retry_on_db_conflict retries on MySQL deadlock."""
    from sqlalchemy.exc import OperationalError

    # Simulate deadlock on first call, success on second
    deadlock = OperationalError("deadlock", None, None)
    deadlock.orig = MagicMock(args=(1213,))  # 1213 = MySQL deadlock code

    attempts = []

    @retry_on_db_conflict(max_retries=3, base_delay=0.01)
    def flaky_write(session):
        attempts.append(1)
        if len(attempts) == 1:
            raise deadlock
        return "success"

    assert flaky_write(mock_db_sessions) == "success"
    assert len(attempts) == 2

Test Event Factory

Helper to create properly structured SQS events:

# conftest.py or test_helpers.py

def make_sqs_event(message: dict) -> dict:
    """Create a properly nested SQS event with SNS envelope."""
    # Envelope fields and their test defaults, in canonical order.
    defaults = (
        ("jobId", "job-1"),
        ("caseId", 1001),
        ("batchId", 1),
        ("documentId", "doc-1"),
        ("eventType", "DOCUMENT_PROCESSED"),
        ("status", "SUCCESS"),
    )

    envelope = {"source": "test"}
    for field, fallback in defaults:
        envelope[field] = message.get(field, fallback)
    envelope["timestamp"] = "2024-01-01T00:00:00Z"
    # eventDetail is itself JSON-encoded inside the envelope (double encoding).
    envelope["eventDetail"] = json.dumps(message.get("eventDetail", {}))

    record = {
        "messageId": "test-msg-1",
        "receiptHandle": "test-receipt",
        "body": json.dumps({"Message": json.dumps(envelope)}),
    }
    return {"Records": [record]}

Key Rules

  1. Mock at the shell boundary — test core logic directly with mock sessions
  2. autouse fixtures for all AWS services — no test should make real AWS calls
  3. Set env vars before imports — module-level config reads happen at import time
  4. Test every exception type — verify RecoverableException, PermanentFailure, SilentSuccess routing
  5. Test idempotency explicitly — process the same message twice, verify no side effects
  6. Test checkpoint resume — verify pipeline skips completed steps
  7. Test deadlock retry — verify @retry_on_db_conflict retries on MySQL 1213/1205
  8. Use make_sqs_event() — factory maintains the double-encoded SNS envelope format
  9. Never use real database connections — mock sessions in all tests
  10. Test both success and failure paths — every function should have at least one of each
Ask the Architecture ×

Ask questions about Nextpoint architecture, patterns, rules, or any module. Powered by Claude Opus 4.6.