S3 Operations Pattern

Purpose

Standardize S3 file storage, retrieval, and lifecycle management across all modules. S3 is used for document files, intermediate processing artifacts, export assemblies, and inter-module data transfer.

S3 Path Conventions

Document Storage

All document files follow a case-scoped prefix structure:

s3://{bucket}/{env}/cases/{case_id}/batches/{batch_id}/documents/{document_id}/{filename}
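A key builder keeps the template in one place instead of scattering f-strings across modules. The following is a minimal sketch, not the project's actual helper: the function names `document_key` and `case_prefix` are hypothetical, and the integer ID types are an assumption carried over from the cleanup function later in this document.

```python
import re

# Hypothetical helper. A filename must be non-empty and must not contain "/",
# otherwise it would add extra path segments to the key.
_SAFE_FILENAME = re.compile(r"[^/]+")


def document_key(env: str, case_id: int, batch_id: int, document_id: int, filename: str) -> str:
    """Build the object key (the part after s3://{bucket}/) for a document file."""
    if not _SAFE_FILENAME.fullmatch(filename):
        raise ValueError(f"filename must be non-empty and contain no '/': {filename!r}")
    return f"{env}/cases/{case_id}/batches/{batch_id}/documents/{document_id}/{filename}"


def case_prefix(env: str, case_id: int) -> str:
    """Prefix that covers every object stored for one case."""
    return f"{env}/cases/{case_id}/"
```

For example, `document_key("prod", 12, 3, 456, "scan.pdf")` yields `prod/cases/12/batches/3/documents/456/scan.pdf`, which falls under `case_prefix("prod", 12)`.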

Export Artifacts

Export outputs (PDFs, ZIPs, images) use a parallel path:

s3://{bucket}/{env}/cases/{case_id}/exports/{export_id}/{filename}

Intermediate Processing

Temporary files used during processing (e.g., chunk dispatch payloads, extracted text) use a tmp/ prefix:

s3://{bucket}/{env}/tmp/{case_id}/{batch_id}/{job_id}/{filename}

Cross-Module Data Transfer

When modules exchange large payloads that exceed SQS message limits (256 KB), data is staged in S3:

s3://{bucket}/{env}/staging/{source_module}/{target_module}/{job_id}/{filename}
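One way to apply the staging path is to send the payload inline when it fits and otherwise upload it and send only a pointer. The sketch below assumes this approach; the function name `build_message`, the pointer fields `s3_bucket` and `s3_key`, and the filename `payload.json` are all hypothetical. It checks the body size only, whereas SQS also counts message attributes toward the limit.

```python
import json
from typing import Any, Dict

SQS_MAX_BYTES = 256 * 1024  # limit quoted in the text above


def build_message(
    s3_client: Any,
    bucket: str,
    env: str,
    source_module: str,
    target_module: str,
    job_id: str,
    payload: Dict[str, Any],
) -> str:
    """Return an SQS message body: the payload itself, or a pointer to a staged copy."""
    body = json.dumps(payload)
    encoded = body.encode("utf-8")
    if len(encoded) <= SQS_MAX_BYTES:
        return body  # small enough to travel inline

    # Too large: stage under the cross-module prefix and send only the location.
    key = f"{env}/staging/{source_module}/{target_module}/{job_id}/payload.json"
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=encoded,
        ContentType="application/json",
    )
    return json.dumps({"s3_bucket": bucket, "s3_key": key})
```

The receiving module would check for the pointer fields and fetch the object before processing. Passing the client in as a parameter keeps the function easy to exercise with a stub.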

File Operations

Upload with Content Type Detection

# shell/utils/s3_ops.py

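import mimetypes
from typing import Optional

# s3_client is the shared boto3 S3 client; its configuration is shown
# under "Retry on Transient Errors".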

def upload_file(
    bucket: str,
    key: str,
    body: bytes,
    content_type: Optional[str] = None,
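) -> None:
    """Upload bytes to S3, guessing the content type from the key if none is given."""
    if not content_type:
        # guess_type returns (type, encoding); fall back to a generic binary type.
        content_type, _ = mimetypes.guess_type(key)
        content_type = content_type or "application/octet-stream"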

    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=body,
        ContentType=content_type,
    )
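The content type fallback used by the upload function can be isolated and checked without touching S3. This is a small sketch of that logic; the name `resolve_content_type` is hypothetical.

```python
import mimetypes
from typing import Optional


def resolve_content_type(key: str, explicit: Optional[str] = None) -> str:
    """Pick the ContentType for an upload: explicit value, guess from key, or generic binary."""
    if explicit:
        return explicit
    # guess_type looks only at the extension; the second element is the
    # content encoding (e.g. "gzip") and is ignored here.
    guessed, _encoding = mimetypes.guess_type(key)
    return guessed or "application/octet-stream"
```

A key ending in `.pdf` resolves to `application/pdf`, and a key with no extension falls back to `application/octet-stream`. Because the encoding is discarded, a compressed file such as `data.json.gz` is typed by its inner extension, so callers that upload compressed artifacts may prefer to pass `content_type` explicitly.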

Download with Streaming

For large files, stream instead of loading into memory:

def download_file_streaming(bucket: str, key: str, local_path: str) -> None:
    """Stream large file from S3 to local disk."""
    s3_client.download_file(bucket, key, local_path)


def get_file_bytes(bucket: str, key: str) -> bytes:
    """Download small file into memory."""
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return response["Body"].read()
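When a large object has to be processed rather than saved to disk, the response body can be consumed in chunks. The sketch below computes a SHA-256 digest this way, as one illustration; the function name `sha256_of_object` is hypothetical, and the client is passed in as a parameter so the logic can be tried against a stub.

```python
import hashlib
from typing import Any


def sha256_of_object(
    s3_client: Any, bucket: str, key: str, chunk_size: int = 1024 * 1024
) -> str:
    """Hash an S3 object while holding at most one chunk in memory."""
    response = s3_client.get_object(Bucket=bucket, Key=key)
    digest = hashlib.sha256()
    # The botocore StreamingBody yields the payload piece by piece.
    for chunk in response["Body"].iter_chunks(chunk_size=chunk_size):
        digest.update(chunk)
    return digest.hexdigest()
```

Memory use is bounded by `chunk_size` rather than by the object size, which is the property that `get_object(...)["Body"].read()` lacks.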

Multipart Upload for Large Files

Files exceeding 100 MB should use multipart upload:

from boto3.s3.transfer import TransferConfig

MULTIPART_THRESHOLD = 100 * 1024 * 1024  # 100 MB
MULTIPART_CHUNKSIZE = 50 * 1024 * 1024   # 50 MB

transfer_config = TransferConfig(
    multipart_threshold=MULTIPART_THRESHOLD,
    multipart_chunksize=MULTIPART_CHUNKSIZE,
    max_concurrency=4,
)

def upload_large_file(bucket: str, key: str, local_path: str) -> None:
    """Upload large file using multipart transfer."""
    s3_client.upload_file(
        local_path, bucket, key,
        Config=transfer_config,
    )
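The chunk size determines how many parts a multipart upload produces, and S3 caps an upload at 10,000 parts. The arithmetic can be sketched as follows; the helper names are hypothetical and the constants repeat the values configured above.

```python
MULTIPART_CHUNKSIZE = 50 * 1024 * 1024  # same value as the TransferConfig above
S3_MAX_PARTS = 10_000                   # S3 limit on parts per multipart upload


def part_count(file_size: int, chunk_size: int = MULTIPART_CHUNKSIZE) -> int:
    """Number of parts needed for a file of file_size bytes (ceiling division)."""
    if file_size <= 0:
        raise ValueError("file_size must be positive")
    return (file_size + chunk_size - 1) // chunk_size


def max_size_for_chunk(chunk_size: int = MULTIPART_CHUNKSIZE) -> int:
    """Largest file, in bytes, that fits in S3_MAX_PARTS parts of chunk_size."""
    return chunk_size * S3_MAX_PARTS
```

With 50 MB chunks, a 250 MB file uploads as 5 parts, and the part limit is reached at 524,288,000,000 bytes (about 488 GiB). Files approaching that size would need a larger chunk size.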

Presigned URLs

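For time-limited access by clients that hold no IAM credentials of their own (e.g., frontend downloads). The URL is signed with the credentials of the service that generates it and grants only the named operation on the named key: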

def generate_presigned_url(
    bucket: str, key: str, expiration: int = 3600
) -> str:
    """Generate presigned URL for temporary access."""
    return s3_client.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=expiration,
    )
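For frontend downloads it is often useful to control the filename the browser saves and to reject expirations that S3 would not honor. The following is a sketch under those assumptions; `presigned_download_url` and its `download_name` parameter are hypothetical, while `ResponseContentDisposition` is a standard `get_object` parameter.

```python
from typing import Any

# Presigned URLs signed with Signature Version 4 are valid for at most 7 days.
MAX_PRESIGN_SECONDS = 7 * 24 * 3600


def presigned_download_url(
    s3_client: Any,
    bucket: str,
    key: str,
    download_name: str,
    expiration: int = 3600,
) -> str:
    """Presigned GET URL that makes the browser save the file as download_name."""
    if not 1 <= expiration <= MAX_PRESIGN_SECONDS:
        raise ValueError(f"expiration must be between 1 and {MAX_PRESIGN_SECONDS} seconds")
    # Drop double quotes so the header value stays well-formed.
    safe_name = download_name.replace('"', "")
    return s3_client.generate_presigned_url(
        "get_object",
        Params={
            "Bucket": bucket,
            "Key": key,
            "ResponseContentDisposition": f'attachment; filename="{safe_name}"',
        },
        ExpiresIn=expiration,
    )
```

If the generating service runs with temporary credentials, the URL stops working when those credentials expire, even if `ExpiresIn` is longer.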

Lifecycle and Cleanup

Temporary File Cleanup

Intermediate processing files under tmp/ should be deleted after batch completion:

def cleanup_tmp_files(bucket: str, case_id: int, batch_id: int) -> int:
    """Delete all temporary files for a completed batch."""
    prefix = f"{Config.ENV}/tmp/{case_id}/{batch_id}/"
    deleted = 0

    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        objects = page.get("Contents", [])
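        if objects:
            # Each page holds at most 1,000 keys, which matches the delete_objects limit.
            response = s3_client.delete_objects(
                Bucket=bucket,
                Delete={"Objects": [{"Key": obj["Key"]} for obj in objects]},
            )
            # Count only keys S3 reports as deleted; failures are listed under "Errors".
            deleted += len(response.get("Deleted", []))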

    return deleted
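When the keys to delete come from somewhere other than a listing (for example, a job manifest), they have to be batched manually, because `delete_objects` accepts at most 1,000 keys per request and reports per-key failures in its response instead of raising. The sketch below shows one way to handle both; the name `delete_keys` and its return shape are hypothetical.

```python
from typing import Any, Dict, List, Tuple

DELETE_BATCH_LIMIT = 1000  # maximum keys per delete_objects request


def delete_keys(
    s3_client: Any, bucket: str, keys: List[str]
) -> Tuple[int, List[Dict[str, Any]]]:
    """Delete keys in batches; return (number deleted, per-key error entries)."""
    deleted = 0
    errors: List[Dict[str, Any]] = []
    for start in range(0, len(keys), DELETE_BATCH_LIMIT):
        batch = keys[start:start + DELETE_BATCH_LIMIT]
        response = s3_client.delete_objects(
            Bucket=bucket,
            Delete={"Objects": [{"Key": key} for key in batch]},
        )
        deleted += len(response.get("Deleted", []))
        errors.extend(response.get("Errors", []))
    return deleted, errors
```

A caller can log the returned error entries and decide whether to retry them, rather than assuming every listed key was removed.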

S3 Lifecycle Rules

Configure at the bucket level for automated cleanup:

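Prefix | Rule | Retention
--- | --- | ---
{env}/tmp/ | Auto-expire | 7 days
{env}/staging/ | Auto-expire | 3 days
{env}/cases/*/exports/ | Transition to Glacier | 90 days
{env}/cases/*/documents/ | No expiration | Permanent

Note: lifecycle rule filters match literal key prefixes; `*` is not expanded. The two cases/*/ rows therefore cannot be written as a single prefix rule and need another selector, such as object tags applied at upload.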
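The two expiration rows can be expressed as a lifecycle configuration for `put_bucket_lifecycle_configuration`. This is a sketch only: the rule IDs and function names are hypothetical, and the cases/*/ rows are left out because lifecycle prefix filters are literal and do not expand `*`.

```python
from typing import Any, Dict, List


def expiration_rule(rule_id: str, prefix: str, days: int) -> Dict[str, Any]:
    """One lifecycle rule that expires objects under prefix after the given number of days."""
    return {
        "ID": rule_id,
        "Filter": {"Prefix": prefix},
        "Status": "Enabled",
        "Expiration": {"Days": days},
    }


def lifecycle_configuration(env: str) -> Dict[str, List[Dict[str, Any]]]:
    """Rules for the tmp/ and staging/ prefixes, using the retention values from the table."""
    return {
        "Rules": [
            expiration_rule(f"{env}-tmp-expire", f"{env}/tmp/", 7),
            expiration_rule(f"{env}-staging-expire", f"{env}/staging/", 3),
        ]
    }


def apply_lifecycle(s3_client: Any, bucket: str, env: str) -> None:
    """Write the rules to the bucket. This call replaces any existing lifecycle configuration."""
    s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration=lifecycle_configuration(env),
    )
```

Because the call replaces the whole configuration, a bucket shared by several environments needs the rules for all of them sent in a single request.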

Batch Completion Cleanup

After BATCH_END_FINISHED, the job processor triggers cleanup:

def on_batch_complete(case_id: int, batch_id: int) -> None:
    """Clean up temporary artifacts after batch completion."""
    cleanup_tmp_files(Config.S3_BUCKET, case_id, batch_id)
    log_message("info", f"Cleaned up tmp files for case={case_id} batch={batch_id}")

Error Handling

Retry on Transient Errors

S3 operations may fail with transient errors (503 SlowDown, 500 InternalError). Use boto3's built-in retry configuration:

import boto3
from botocore.config import Config as BotoConfig

s3_client = boto3.client(
    "s3",
    config=BotoConfig(
        retries={"max_attempts": 3, "mode": "adaptive"},
    ),
)

Missing File Handling

from botocore.exceptions import ClientError

def file_exists(bucket: str, key: str) -> bool:
    """Check if S3 object exists without downloading."""
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
        return True
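    except ClientError as e:
        # head_object reports a missing key with the error code "404".
        if e.response["Error"]["Code"] == "404":
            return False
        # Anything else (e.g., 403 access denied) is a real failure; re-raise it.
        raise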

Key Rules

  1. Case-scoped prefixes — all document files live under cases/{case_id}/
  2. Content type on upload — always set ContentType, don't rely on S3 defaults
  3. Multipart for large files — use TransferConfig for files > 100 MB
  4. Stream large downloads — use download_file(), not get_object().read() for large files
  5. Clean up tmp/ on batch completion — don't leave intermediate artifacts
  6. S3 lifecycle rules for staging — automated expiration prevents storage cost creep
  7. Retry with adaptive mode — boto3 handles 503 SlowDown automatically
  8. Presigned URLs for frontend — time-limited, no IAM credentials needed
  9. Never store secrets in S3 — use Secrets Manager
  10. Environment prefix — all paths include {env}/ to isolate dev/staging/prod