# ECS Long-Running Workloads
## Problem
Some processing tasks exceed Lambda's constraints:

- 15-minute timeout — large document sets need hours
- 10 GB memory / 6 vCPU — image conversion and OCR are resource-intensive
- 512 MB default ephemeral storage (10 GB max) — multi-gigabyte exports need working disk
- No sidecar support — some tasks need co-located services (document engines, caches)
## Solution
Use ECS Fargate tasks orchestrated by Step Functions for compute-heavy, long-running workloads. Reserve Lambda + SQS for event-driven, short-lived processing.
## When to Use
| Characteristic | Lambda + SQS | ECS (Step Functions or Persistent Workers) |
|---|---|---|
| Duration | < 15 minutes | Minutes to hours |
| Memory | < 10 GB | Up to 30 GB per task |
| Storage | < 10 GB (ephemeral + /tmp) | EFS (elastic) + 200 GB ephemeral |
| Sidecars needed | No | Yes (document engines, caches) |
| Concurrency model | Per-message | Per-volume / per-batch / per-job |
| Error handling | Exception hierarchy → SQS | Exit codes or task status codes |
## Two ECS Orchestration Models
Choose based on workload characteristics:
| | Step Functions + ECS Tasks | Persistent ECS Workers |
|---|---|---|
| Lifecycle | Task starts, runs, exits | Long-running service, polls for work |
| Best for | Batch jobs with defined stages | Continuous stream of heterogeneous tasks |
| Scaling | Map state concurrency | Auto Scaling + CloudWatch metrics |
| State | Step Functions manages flow | DynamoDB or external state store |
| Multi-job | One execution per job | Service rotates between jobs |
| Example | documentexporter (export volumes → zip → complete) | documentextractor (extraction workers polling task queues) |
## Architecture
```
Orchestrator Lambda (triggered by backend or SNS)
│
├─ Upload task input to S3 (avoid container override size limits)
├─ Create Step Functions state machine (or use static definition)
└─ Start execution
   │
   ├─ Map State (parallel tasks, configurable concurrency)
   │   └─ ECS Fargate Task
   │       ├─ Main container (processing logic)
   │       └─ Sidecar containers (optional: engines, caches)
   │
   ├─ Aggregation Step (collect results)
   │
   ├─ Completion Handler (update status, notify)
   │
   └─ Error Handler (map exit codes to user messages)
```
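Step Functions caps execution input at 256 KB, which is why the orchestrator uploads large task input to S3 and passes only a pointer. A minimal sketch of that decision; the function and field names (`buildExecutionInput`, `inputLocation`) are illustrative, not from the source:

```typescript
// Step Functions limits execution input to 256 KB (UTF-8), so large
// payloads go to S3 and only a pointer is passed to the state machine.
const MAX_INLINE_BYTES = 256 * 1024;

interface ExecutionInput {
  inline?: unknown;                                 // small payloads pass through
  inputLocation?: { bucket: string; key: string };  // large payloads via S3
}

function buildExecutionInput(
  payload: unknown,
  bucket: string,
  jobId: string
): { input: ExecutionInput; uploadBody?: string } {
  const body = JSON.stringify(payload);
  if (new TextEncoder().encode(body).length < MAX_INLINE_BYTES) {
    return { input: { inline: payload } };
  }
  // Caller uploads uploadBody to s3://bucket/key before StartExecution
  const key = `jobs/${jobId}/input.json`;
  return { input: { inputLocation: { bucket, key } }, uploadBody: body };
}
```

The orchestrator would then `PutObject` the `uploadBody` (when present) and call `StartExecution` with `JSON.stringify(input)`.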
## Task Resource Sizing
```typescript
// CDK — ECS task definition with appropriate resources
const taskDef = new ecs.FargateTaskDefinition(this, 'ProcessorTask', {
  memoryLimitMiB: 8192,     // Size for workload
  cpu: 4096,                // 4 vCPU
  ephemeralStorageGiB: 200, // Large scratch space
});

// EFS for shared/persistent working storage
const fileSystem = new efs.FileSystem(this, 'WorkingStorage', {
  vpc,
  performanceMode: efs.PerformanceMode.GENERAL_PURPOSE,
  throughputMode: efs.ThroughputMode.BURSTING,
  lifecyclePolicy: efs.LifecyclePolicy.AFTER_7_DAYS,
});

taskDef.addVolume({
  name: 'working-storage',
  efsVolumeConfiguration: { fileSystemId: fileSystem.fileSystemId },
});
// Each container that needs the volume must also mount it, e.g.:
// container.addMountPoints({
//   containerPath: '/mnt/work', sourceVolume: 'working-storage', readOnly: false,
// });
```
## Sidecar Pattern
When processing requires co-located services (e.g., document rendering engine), use ECS container dependencies with health checks:
```typescript
// Main container waits for sidecar to be healthy before starting
const mainContainer = taskDef.addContainer('processor', {
  image: ecs.ContainerImage.fromEcrRepository(processorRepo),
  essential: true,
  logging: ecs.LogDrivers.awsLogs({ streamPrefix: 'processor' }),
});

const sidecarContainer = taskDef.addContainer('document-engine', {
  image: ecs.ContainerImage.fromEcrRepository(engineRepo),
  essential: false, // Task can survive sidecar failure
  healthCheck: {
    command: ['CMD-SHELL', 'curl -f http://localhost:5000/health || exit 1'],
    interval: Duration.seconds(10),
    timeout: Duration.seconds(5),
    retries: 3,
  },
});

mainContainer.addContainerDependencies({
  container: sidecarContainer,
  condition: ecs.ContainerDependencyCondition.HEALTHY,
});
```
## Parallel Processing with Map State
Step Functions Map state provides bounded parallelism:
```typescript
// State machine definition — process volumes in parallel
const mapState = new sfn.Map(this, 'ProcessVolumes', {
  maxConcurrency: 20,     // Bound parallel tasks
  itemsPath: '$.volumes', // Array of work items
});

// itemProcessor takes a chainable state (e.g. an EcsRunTask step),
// not a separate StateMachine construct — each item runs the ECS task
mapState.itemProcessor(runEcsTask);
```
## Exit Code Error Propagation
ECS containers communicate errors via process exit codes. The completion handler maps codes to user-facing messages:
```python
# Container — exit with specific code on error
import sys

EXIT_CODES = {
    20: "Document not found in external API",
    21: "External API error (4xx/5xx)",
    22: "Network error calling external API",
    30: "Compression failed",
    31: "S3 upload failed",
    1: "Processing failed (generic)",
}

try:
    process_documents()
except DocumentNotFound:
    log_error("Document not found")
    sys.exit(20)
except APIError as e:
    log_error(f"API error: {e}")
    sys.exit(21)
```
```typescript
// Completion handler — map exit codes to messages
const ERROR_MAP: Record<number, string> = {
  20: 'Document not found in external API',
  21: 'External API error',
  22: 'Network error calling external API',
  30: 'Compression failed',
  31: 'S3 upload failed',
  1: 'Processing failed',
};

function getErrorMessage(exitCode: number): string {
  return ERROR_MAP[exitCode] || `Unknown error (exit code ${exitCode})`;
}
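When the task runs through the Step Functions optimized ECS integration (`ecs:runTask.sync`), the task result is the ECS `Task` object, which carries each container's exit code under `Containers[].ExitCode`. A sketch of extracting the main container's code from that payload; the exact shape is an assumption based on that integration:

```typescript
// Extract a container's exit code from the Step Functions ECS task result
// (the ecs:runTask.sync integration returns the ECS Task object).
interface ContainerResult { Name: string; ExitCode?: number }
interface EcsTaskResult { Containers: ContainerResult[] }

function getExitCode(result: EcsTaskResult, containerName: string): number {
  const container = result.Containers.find(c => c.Name === containerName);
  // A missing exit code (e.g. container never started) maps to generic failure
  return container?.ExitCode ?? 1;
}
```

The completion handler would then call `getErrorMessage(getExitCode(taskResult, 'processor'))` to produce the user-facing message.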
## S3 for Task Coordination
Use S3 to pass data between ECS tasks (avoids Step Functions payload size limits):
```python
import json

import boto3

s3 = boto3.client('s3')

# Task 1: Write results to S3
s3.put_object(
    Bucket=bucket,
    Key=f"{export_key}/results/{task_id}.json",
    Body=json.dumps(results),
)

# Task 2: Read results from S3
response = s3.get_object(Bucket=bucket, Key=f"{export_key}/results/{task_id}.json")
results = json.loads(response['Body'].read())
```
## Persistent Worker Pool (Alternative to Step Functions)
For workloads with continuous task streams and heterogeneous task types, use persistent ECS services with a worker pool registry instead of Step Functions:
```typescript
// DynamoDB table as worker service registry
const workerTable = new dynamodb.Table(this, 'WorkerState', {
  partitionKey: { name: 'ServiceId', type: dynamodb.AttributeType.NUMBER },
});
workerTable.addGlobalSecondaryIndex({
  indexName: 'StatusIndex',
  partitionKey: { name: 'Status', type: dynamodb.AttributeType.STRING },
});
workerTable.addGlobalSecondaryIndex({
  indexName: 'CaseIdIndex',
  partitionKey: { name: 'CaseId', type: dynamodb.AttributeType.STRING },
});
```
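Finding a claimable service means querying the `StatusIndex` GSI for `Status = 'FREE'` and picking one of the results. The query itself is a standard `QueryCommand`; the sketch below shows only the selection logic over the returned DynamoDB items, and preferring the lowest `ServiceId` is a design choice assumed here, not something the source specifies:

```typescript
// Pick a claimable service from a StatusIndex query result.
// Items arrive in DynamoDB AttributeValue form ({ N: '…' }, { S: '…' }).
type AttributeValue = { N?: string; S?: string };
type ServiceItem = Record<string, AttributeValue>;

function pickFreeService(items: ServiceItem[]): number | undefined {
  const ids = items
    .filter(item => item.Status?.S === 'FREE' && item.ServiceId?.N !== undefined)
    .map(item => Number(item.ServiceId!.N));
  // Lowest ServiceId first, so services are packed deterministically
  return ids.length ? Math.min(...ids) : undefined;
}
```

The result feeds the conditional claim below; because the GSI is eventually consistent, the claim's `ConditionExpression` is what actually guarantees only one caller wins.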
Service claim with conditional update (prevents race conditions):
```typescript
// Claim a free service — conditional update ensures only one caller wins
async function claimService(serviceId: number, caseId: string, jobId: string): Promise<boolean> {
  try {
    await dynamoClient.send(new UpdateItemCommand({
      TableName: WORKER_TABLE,
      Key: { ServiceId: { N: String(serviceId) } },
      UpdateExpression: 'SET #status = :busy, CaseId = :caseId, CurrentJobId = :jobId',
      ConditionExpression: '#status = :free',
      ExpressionAttributeNames: { '#status': 'Status' },
      ExpressionAttributeValues: {
        ':busy': { S: 'BUSY' },
        ':free': { S: 'FREE' },
        ':caseId': { S: caseId },
        ':jobId': { S: jobId },
      },
    }));
    return true;
  } catch (e: any) {
    if (e.name === 'ConditionalCheckFailedException') return false; // Lost race
    throw e;
  }
}
```
Service claim with exponential backoff (multiple imports competing):
```typescript
// Retry service claim with exponential backoff + jitter
const MAX_ATTEMPTS = 5;
const BASE_DELAY_MS = 250;
const JITTER_MS = 200;

async function acquireService(caseId: string, jobId: string) {
  for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
    // First check: reuse existing service for same case
    const existingService = await findServiceByCaseId(caseId);
    if (existingService) {
      await addJobToService(existingService.serviceId, jobId);
      return existingService;
    }
    // Second check: claim a free service
    const freeService = await findFreeService();
    if (freeService && await claimService(freeService.serviceId, caseId, jobId)) {
      return freeService;
    }
    // Backoff before retry
    const delay = BASE_DELAY_MS * Math.pow(2, attempt - 1) + Math.random() * JITTER_MS;
    await sleep(delay);
  }
  throw new Error('No free service available after retries');
}
```
Priority-based job rotation — when a service finishes one job, it picks up the next by priority:
```typescript
// Composite sort key: priority in upper bits, timestamp in lower bits
// Lower priority number = higher priority. Timestamp prevents starvation.
function jobOrder(priority: number, createdTimestamp: number): bigint {
  return (BigInt(priority) << BigInt(56)) | BigInt(createdTimestamp);
}
```
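The composite key's ordering properties can be checked directly: any higher-priority job (lower number) sorts before any lower-priority job regardless of age, and within a priority, older jobs come first. A small self-contained check (`jobOrder` repeated from above so the snippet runs on its own):

```typescript
// Ordering properties of the composite job key (jobOrder as defined above)
function jobOrder(priority: number, createdTimestamp: number): bigint {
  return (BigInt(priority) << BigInt(56)) | BigInt(createdTimestamp);
}

const older = Date.UTC(2024, 0, 1);
const newer = Date.UTC(2024, 0, 2);

// A higher-priority job (lower number) sorts first regardless of age…
console.assert(jobOrder(1, newer) < jobOrder(2, older));
// …and within a priority, older jobs come first, so no job starves
console.assert(jobOrder(2, older) < jobOrder(2, newer));
```

This works because millisecond timestamps fit comfortably below 2^56, so the priority bits always dominate the comparison.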
## Scheduled Cleanup
Long-running state machines and ECS tasks accumulate. Schedule cleanup:
```typescript
// EventBridge rule — daily cleanup of old state machines
const cleanupRule = new events.Rule(this, 'CleanupSchedule', {
  schedule: events.Schedule.cron({ minute: '0', hour: '11' }), // 11:00 UTC daily
});
cleanupRule.addTarget(new targets.LambdaFunction(cleanupLambda));

// Cleanup Lambda — delete state machines older than retention period
const RETENTION_DAYS = 14;
const cutoff = new Date(Date.now() - RETENTION_DAYS * 24 * 60 * 60 * 1000);
for (const sm of stateMachines) {
  if (sm.creationDate < cutoff) {
    await sfnClient.send(new DeleteStateMachineCommand({
      stateMachineArn: sm.stateMachineArn,
    }));
  }
}
```
## Source
Extracted from documentexporter (Step Functions + ECS tasks with sidecars) and documentextractor (persistent ECS worker pool with DynamoDB state, priority scheduling, and dual-queue architecture).