# S3 Operations Pattern

## Purpose
Standardize S3 file storage, retrieval, and lifecycle management across all modules. S3 is used for document files, intermediate processing artifacts, export assemblies, and inter-module data transfer.
## S3 Path Conventions

### Document Storage

All document files follow a case-scoped prefix structure:

```
{env}/cases/{case_id}/documents/...
```
### Export Artifacts

Export outputs (PDFs, ZIPs, images) use a parallel path:

```
{env}/cases/{case_id}/exports/...
```
### Intermediate Processing

Temporary files used during processing (e.g., chunk dispatch payloads, extracted text) use a `tmp/` prefix:

```
{env}/tmp/{case_id}/{batch_id}/...
```
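These prefixes can be captured in small helper functions. The helper names, and the assumption that files sit directly under each prefix, are illustrative; the path shapes come from the cleanup code and lifecycle table later on this page.

```python
def document_key(env: str, case_id: int, filename: str) -> str:
    """Key for a stored document, under the case-scoped prefix."""
    return f"{env}/cases/{case_id}/documents/{filename}"


def export_key(env: str, case_id: int, filename: str) -> str:
    """Key for an export artifact, parallel to the documents prefix."""
    return f"{env}/cases/{case_id}/exports/{filename}"


def tmp_key(env: str, case_id: int, batch_id: int, filename: str) -> str:
    """Key for an intermediate artifact, deleted after batch completion."""
    return f"{env}/tmp/{case_id}/{batch_id}/{filename}"
```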
### Cross-Module Data Transfer

When modules exchange large payloads that exceed the SQS message limit (256 KB), data is staged in S3:

```
{env}/staging/...
```
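A minimal sketch of the staging decision, assuming a JSON message body; the 256 KB limit is SQS's, but the message shape and function names here are illustrative, not the module's actual protocol.

```python
import json

SQS_MAX_BYTES = 256 * 1024  # hard SQS message-size limit


def needs_s3_staging(payload: bytes) -> bool:
    """True when a payload is too large to send inline in an SQS message."""
    return len(payload) > SQS_MAX_BYTES


def pointer_message(bucket: str, key: str) -> str:
    """SQS body that references a staged S3 object instead of inlining the data."""
    return json.dumps({"s3_bucket": bucket, "s3_key": key})
```

The producer uploads the oversized payload under the staging prefix and sends only the pointer; the consumer reads the key from the message and fetches the object from S3.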
## File Operations

### Upload with Content Type Detection
```python
# shell/utils/s3_ops.py
import mimetypes
from typing import Optional

import boto3

s3_client = boto3.client("s3")  # see "Retry on Transient Errors" for the retry configuration


def upload_file(
    bucket: str,
    key: str,
    body: bytes,
    content_type: Optional[str] = None,
) -> None:
    """Upload file to S3 with content type."""
    if not content_type:
        content_type, _ = mimetypes.guess_type(key)
        content_type = content_type or "application/octet-stream"
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=body,
        ContentType=content_type,
    )
```
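The fallback logic can be exercised on its own. `detect_content_type` below is a stand-alone mirror of what `upload_file` does, added here for illustration; it is not part of the module above.

```python
import mimetypes


def detect_content_type(key: str) -> str:
    """Guess a content type from the key's extension, else octet-stream."""
    content_type, _ = mimetypes.guess_type(key)
    return content_type or "application/octet-stream"
```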
### Download with Streaming
For large files, stream instead of loading into memory:
```python
def download_file_streaming(bucket: str, key: str, local_path: str) -> None:
    """Stream large file from S3 to local disk."""
    s3_client.download_file(bucket, key, local_path)


def get_file_bytes(bucket: str, key: str) -> bytes:
    """Download small file into memory."""
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return response["Body"].read()
```
### Multipart Upload for Large Files
Files exceeding 100 MB should use multipart upload:
```python
from boto3.s3.transfer import TransferConfig

MULTIPART_THRESHOLD = 100 * 1024 * 1024  # 100 MB
MULTIPART_CHUNKSIZE = 50 * 1024 * 1024   # 50 MB

transfer_config = TransferConfig(
    multipart_threshold=MULTIPART_THRESHOLD,
    multipart_chunksize=MULTIPART_CHUNKSIZE,
    max_concurrency=4,
)


def upload_large_file(bucket: str, key: str, local_path: str) -> None:
    """Upload large file using multipart transfer."""
    s3_client.upload_file(
        local_path, bucket, key,
        Config=transfer_config,
    )
```
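A quick check on the chunking math: S3 caps a multipart upload at 10,000 parts, so 50 MB parts cover files up to roughly 488 GB. The helper below is illustrative, not part of the module.

```python
import math

MULTIPART_CHUNKSIZE = 50 * 1024 * 1024  # 50 MB, matching the TransferConfig above


def part_count(file_size: int, chunk_size: int = MULTIPART_CHUNKSIZE) -> int:
    """Number of parts a multipart upload of file_size bytes will use."""
    return math.ceil(file_size / chunk_size)
```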
### Presigned URLs
For time-limited access without IAM credentials (e.g., frontend downloads):
```python
def generate_presigned_url(
    bucket: str, key: str, expiration: int = 3600
) -> str:
    """Generate presigned URL for temporary access."""
    return s3_client.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=expiration,
    )
```
## Lifecycle and Cleanup

### Temporary File Cleanup

Intermediate processing files under `tmp/` should be deleted after batch completion:
```python
def cleanup_tmp_files(bucket: str, case_id: int, batch_id: int) -> int:
    """Delete all temporary files for a completed batch."""
    prefix = f"{Config.ENV}/tmp/{case_id}/{batch_id}/"
    deleted = 0
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        objects = page.get("Contents", [])
        if objects:
            # Each page holds at most 1,000 keys, which is also
            # delete_objects' per-request limit, so no re-batching is needed.
            s3_client.delete_objects(
                Bucket=bucket,
                Delete={"Objects": [{"Key": obj["Key"]} for obj in objects]},
            )
            deleted += len(objects)
    return deleted
```
### S3 Lifecycle Rules
Configure at the bucket level for automated cleanup:
| Prefix | Rule | Retention |
|---|---|---|
| `{env}/tmp/` | Auto-expire | 7 days |
| `{env}/staging/` | Auto-expire | 3 days |
| `{env}/cases/*/exports/` | Transition to Glacier | 90 days |
| `{env}/cases/*/documents/` | No expiration | Permanent |
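As a sketch, the first two rows might map onto an S3 lifecycle configuration like the following (rule IDs are illustrative, and `prod` stands in for `{env}`). Note that lifecycle prefix filters do not support wildcards, so the `cases/*/...` rows would need tag-based filters or per-case rules:

```json
{
  "Rules": [
    {
      "ID": "expire-tmp",
      "Filter": { "Prefix": "prod/tmp/" },
      "Status": "Enabled",
      "Expiration": { "Days": 7 }
    },
    {
      "ID": "expire-staging",
      "Filter": { "Prefix": "prod/staging/" },
      "Status": "Enabled",
      "Expiration": { "Days": 3 }
    }
  ]
}
```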
### Batch Completion Cleanup

After `BATCH_END_FINISHED`, the job processor triggers cleanup:
```python
def on_batch_complete(case_id: int, batch_id: int) -> None:
    """Clean up temporary artifacts after batch completion."""
    cleanup_tmp_files(Config.S3_BUCKET, case_id, batch_id)
    log_message("info", f"Cleaned up tmp files for case={case_id} batch={batch_id}")
```
## Error Handling

### Retry on Transient Errors
S3 operations may fail with transient errors (503 SlowDown, 500 InternalError). Use boto3's built-in retry configuration:
```python
import boto3
from botocore.config import Config as BotoConfig

s3_client = boto3.client(
    "s3",
    config=BotoConfig(
        retries={"max_attempts": 3, "mode": "adaptive"},
    ),
)
```
### Missing File Handling
```python
from botocore.exceptions import ClientError


def file_exists(bucket: str, key: str) -> bool:
    """Check if S3 object exists without downloading."""
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as e:
        if e.response["Error"]["Code"] == "404":
            return False
        raise
```
## Key Rules

- **Case-scoped prefixes** — all document files live under `cases/{case_id}/`
- **Content type on upload** — always set `ContentType`; don't rely on S3 defaults
- **Multipart for large files** — use `TransferConfig` for files > 100 MB
- **Stream large downloads** — use `download_file()`, not `get_object().read()`, for large files
- **Clean up `tmp/` on batch completion** — don't leave intermediate artifacts
- **S3 lifecycle rules for staging** — automated expiration prevents storage cost creep
- **Retry with adaptive mode** — boto3 handles 503 SlowDown automatically
- **Presigned URLs for frontend** — time-limited, no IAM credentials needed
- **Never store secrets in S3** — use Secrets Manager
- **Environment prefix** — all paths include `{env}/` to isolate dev/staging/prod