Concurrency Patterns Across the Nextpoint Platform
Overview
Nextpoint uses four distinct concurrency models across its platform, each chosen for the specific workload characteristics of that component. This document maps theoretical concurrency concepts to their real implementations.
The Four Concurrency Models
1. Lambda Process Isolation (NGE Modules)
Model: Each Lambda invocation is its own process — own memory space, no shared state, no threads, no mutexes needed. Concurrency is managed by SQS (message-level parallelism) and Lambda's execution environment.
Where used: documentloader, documentextractor orchestration, documentexporter trigger, nextpoint-ai (orchestrator + processor), search-hit-report-backend
Why it works:
- No race conditions possible — each invocation is isolated
- SQS provides at-least-once delivery with a visibility timeout (FIFO queues add ordering)
- Lambda auto-scales horizontally (up to the configured concurrency limit)
- Idempotent handlers protect against duplicate processing
Trade-off: No shared in-memory state between invocations. All state must go through MySQL, S3, or DynamoDB. This adds latency but eliminates concurrency bugs.
```python
import json

# NGE Lambda handler — no concurrency concerns at the application level.
# SQS guarantees one message = one Lambda invocation.
def handler(event, context):
    for record in event["Records"]:
        message = json.loads(record["body"])
        with writer_session(case_id=message["caseId"]) as session:
            process_document(session, message)  # No mutex needed
```
2. Fork-Based Process Parallelism (Legacy Workers)
Model: The parent process forks child processes for job execution. Each child gets a copy-on-write memory space. The parent monitors RSS and kills children that exceed memory limits.
Where used: Workers repo (ForkingMemoryWatcher), Rails ES indexing (BulkIndexable)
Why it works:
- Bypasses Ruby's GVL (Global VM Lock) for CPU-bound work
- Memory isolation — a child crash doesn't kill the parent
- Memory ceiling enforcement — children are killed at 400MB RSS
- Each child handles one job at a time — no shared state within the job
Key implementation — ForkingMemoryWatcher:
```
Parent (watcher)                          Child (worker)
    │                                          │
    ├── fork() ──────────────────────────────> Start
    │                                          │
    ├── Check RSS every 5s                     ├── Poll API for jobs
    │                                          ├── Execute job
    │   RSS > 400MB?                           │
    │   ├── Yes: TERM → child                  │
    │   │   Wait for job to finish             │
    │   │   Re-fork new child ───────────────> New child
    │   └── No: continue monitoring            │
    │                                          │
    ├── Pass SIGTERM/SIGINT to child           ├── GC.start after each job
    └── Store watcher PID in pidfile           └── Write status to .dat file
```
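The monitor loop above can be sketched in a few lines. This is a minimal illustration, not the production code: the 50ms poll interval and the `rss_kb` helper are stand-ins, and the real watcher polls every 5s, waits for the in-flight job, and re-forks a replacement child.

```ruby
MEMORY_LIMIT_KB = 400 * 1024  # 400MB RSS ceiling, as in the real watcher

# Resident set size of a process in KB, via ps (Linux/macOS)
def rss_kb(pid)
  `ps -o rss= -p #{pid}`.to_i
end

def monitored_fork
  child = fork do
    sleep 0.2   # stand-in for polling the API and executing one job
    GC.start    # the real child runs GC after each job
  end

  loop do
    break if Process.waitpid(child, Process::WNOHANG)  # child exited?
    if rss_kb(child) > MEMORY_LIMIT_KB
      Process.kill("TERM", child)  # over the ceiling: ask the child to stop
    end
    sleep 0.05  # the real watcher checks every 5s
  end
  $?.exitstatus
end

puts monitored_fork  # => 0
```

Note that `fork` requires a Unix-like platform (CRuby on Linux/macOS), which matches where these workers run.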
Key implementation — BulkIndexable (ES fork-based parallel import):
```ruby
# Monkeypatches elasticsearch-model to fork processes for parallel bulk indexing.
# One process per batch, up to Concurrent.processor_count.
# Each process gets its own DB connection and ES client.
def __find_in_batches(options)
  pid = fork do
    # Child process — own memory space, own DB connection
    batch.each { |record| yield record.as_indexed_json }
  end
  Process.wait(pid)
end
```
Trade-off: fork() is expensive (it copies page tables), so it is not suitable for short-lived tasks. The ForkingMemoryWatcher re-fork cycle adds ~1s of overhead per restart.
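The fan-out shape itself reduces to a few lines. This is an illustration with toy data and a hypothetical `index_in_parallel` name; the real monkeypatch opens a fresh DB/ES connection per child and streams bulk requests.

```ruby
# Fork-per-batch fan-out in the BulkIndexable style: each batch gets its own
# child process, so sibling batches cannot race on shared memory.
def index_in_parallel(batches)
  pids = batches.map do |batch|
    fork do
      # Child process — own memory space; would open its own DB/ES client here
      batch.each { |record| record.to_s }  # stand-in for as_indexed_json
    end
  end
  pids.each { |pid| Process.waitpid(pid) }  # parent reaps every child
  pids.size
end

puts "indexed #{index_in_parallel([[1, 2], [3, 4], [5, 6]])} batches"
```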
3. Thread-Based Concurrency (Sidekiq / Puma)
Model: Multiple threads share a process, with Ruby's GVL limiting CPU-bound parallelism but allowing concurrent I/O-bound work.
Where used: Rails Sidekiq jobs (70+ jobs), Puma web server, NextPointAPI connection pooling
Why it works for Sidekiq:
- Background jobs are I/O-bound (DB queries, S3 uploads, API calls, email)
- The GVL is released during I/O operations, letting other threads proceed
- Threads are lightweight — Sidekiq runs 10-25 threads per process
- The Redis-backed queue provides job isolation
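The GVL point can be demonstrated directly. With I/O simulated by Kernel#sleep (a stand-in for real DB or S3 calls), four threads finish in roughly the time of one call, not four:

```ruby
require "benchmark"

# Ruby's GVL is released during blocking I/O, so I/O-bound threads overlap
# even though Ruby threads cannot run Ruby code in parallel.
def io_bound_work
  sleep 0.2  # stands in for a DB query, S3 upload, or API call
end

serial   = Benchmark.realtime { 4.times { io_bound_work } }
threaded = Benchmark.realtime do
  4.times.map { Thread.new { io_bound_work } }.each(&:join)
end

puts format("serial: %.2fs, threaded: %.2fs", serial, threaded)
```

The threaded run takes roughly one sleep interval instead of four, which is why Sidekiq's 10-25 I/O-bound threads per process pay off despite the GVL.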
Thread-safety patterns in the codebase:
a) Thread-local storage (PerCaseModel):
```ruby
# Avoids shared state entirely — each thread tracks its own case
class PerCaseModel < ApplicationRecord
  # Indexed by Thread.current.object_id
  @@case_id    = {}
  @@db_name    = {}
  @@use_reader = {}

  def self.set_case(case_id, use_reader: false)
    @@case_id[Thread.current.object_id]    = case_id
    @@db_name[Thread.current.object_id]    = database_name(case_id)
    @@use_reader[Thread.current.object_id] = use_reader
    establish_connection(connection_config)
  end
end
```
b) Mutex-protected resource pool (ResourcePool):
```ruby
# Thread-safe connection pool — mutex only protects bookkeeping
class ResourcePool
  def retrieve(non_blocking = false)
    @mutex.synchronize do  # Critical section: pool state only
      if resource = @available.pop
        @in_use << resource
        return resource
      elsif @in_use.size < @limit
        resource = @constructor.call  # Construction inside mutex (fast)
        @in_use << resource
        return resource
      end
    end
    # I/O happens OUTSIDE the mutex — correct pattern
    sleep 0.1 and retry unless non_blocking
  end
end
```
c) File-based cross-process mutex (GlobalNpcaseIdHandler):
```ruby
# EC2 workers — prevents two daemons from claiming different cases
def set(npcase_id, options = {})
  File.open(LOCK_FILE) do |f|
    f.flock(File::LOCK_EX)  # OS-level exclusive lock
    current = read_from_file
    raise if current && current != npcase_id  # Can't switch without reboot
    write_to_file(npcase_id)
  end  # Lock released on close
end
```
d) Thread-safe singleton:
```ruby
# Ensures only one instance is created even with concurrent threads
module ThreadSafeSingleton
  def instance
    @mutex.synchronize do  # @mutex is set up when the module is extended (elided)
      @instance ||= new
    end
  end
end
```
4. ECS Task Isolation (Long-Running Workloads)
Model: Each ECS Fargate task is its own container with dedicated CPU and memory. Tasks are triggered by Lambda or API Gateway and run to completion.
Where used: documentexporter, documentextractor workers, documentuploader (Ruby Poller + DocEngine + Redirection sidecar), documentpageservice (Java PDFBox), unzipservice
Why it works:
- No memory sharing between tasks
- Dedicated CPU allocation (no noisy neighbors)
- Natural scaling — more tasks = more parallelism
- A container crash doesn't affect other tasks
Concurrency within a task:
- documentextractor: Java/Kotlin multi-threaded Hyland Filters (the JVM has no GIL)
- documentuploader: multi-container sidecar (Ruby Poller, DocEngine, Redirection) communicating via localhost HTTP — process isolation within the task
Concurrency Anti-Patterns Found in the Codebase
1. Class variables as thread-local storage (PerCaseModel)
@@case_id indexed by Thread.current.object_id works but is fragile.
If a thread is recycled (Sidekiq thread pool), the old object_id entry leaks.
Thread-local variables (Thread.current[:case_id]) would be cleaner.
Status: Works in production. Not worth changing unless refactoring PerCaseModel.
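A sketch of that cleaner shape (hypothetical `PerCaseContext` name, not the production class): Thread.current storage dies with its thread, so recycled pool threads cannot leak stale object_id entries.

```ruby
# Per-thread state via Thread.current instead of class variables keyed
# by Thread.current.object_id.
class PerCaseContext
  def self.set_case(case_id, use_reader: false)
    Thread.current[:case_id]    = case_id
    Thread.current[:use_reader] = use_reader
  end

  def self.case_id
    Thread.current[:case_id]
  end
end

values = [101, 202].map do |id|
  Thread.new do
    PerCaseContext.set_case(id)
    sleep 0.01                 # let both threads interleave
    PerCaseContext.case_id     # each thread sees only its own value
  end
end.map(&:value)

puts values.inspect  # => [101, 202]
```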
2. Sleep-based polling in ResourcePool
sleep 0.1 and retry when the pool is full creates a busy-wait loop.
A ConditionVariable with wait/signal would be more efficient.
Status: Works in production. Pool contention is rare because connection limits are generous relative to thread counts.
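A sketch of that ConditionVariable alternative (hypothetical `BlockingPool` name; the real ResourcePool's non_blocking path and error handling are omitted): waiters sleep on the condition variable instead of busy-waiting, and release wakes exactly one of them.

```ruby
class BlockingPool
  def initialize(limit, &constructor)
    @limit = limit
    @constructor = constructor
    @available = []
    @in_use = []
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  def retrieve
    @mutex.synchronize do
      loop do
        if (resource = @available.pop)
          @in_use << resource
          return resource
        elsif @in_use.size < @limit
          resource = @constructor.call
          @in_use << resource
          return resource
        end
        @cond.wait(@mutex)  # releases the mutex while asleep; no busy-wait
      end
    end
  end

  def release(resource)
    @mutex.synchronize do
      @in_use.delete(resource)
      @available << resource
      @cond.signal  # wake exactly one waiter
    end
  end
end

pool = BlockingPool.new(1) { Object.new }
conn = pool.retrieve
waiter = Thread.new { pool.retrieve }  # blocks on the condition variable
sleep 0.05
pool.release(conn)
puts waiter.value.equal?(conn)  # => true
```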
3. Raw SQL without mutex in BulkActionJob
BulkActionJob uses raw SQL INSERT INTO exh_designations for performance.
Thread safety relies on MySQL's row-level locking, not Ruby-level synchronization.
This is correct — the database is the synchronization point.
Status: Correct pattern. Database handles concurrency via InnoDB row locks.
4. GC.start after every job (workers daemon)
The workers daemon calls GC.start after each job completes. This is a
concurrency-safe pattern (only one job runs per forked child at a time)
but adds latency. In Sidekiq (multi-threaded), this would be problematic
because GC.start pauses ALL threads.
Status: Correct for fork-based workers. Would be an anti-pattern in Sidekiq.
Choosing the Right Model
| Workload Type | Model | Why |
|---|---|---|
| Event-driven document processing | Lambda (process isolation) | No shared state, auto-scaling, at-least-once delivery via SQS |
| CPU-bound file conversion | ECS Fargate (task isolation) | Dedicated CPU/memory, no time limit, container dependencies (FFmpeg, etc.) |
| I/O-bound background jobs | Sidekiq threads | Lightweight, GVL releases during I/O, shared Redis queue |
| CPU-bound parallel indexing | Fork (process parallelism) | Bypasses GVL, memory isolation, crash protection |
| Long-running daemons with memory limits | Fork + memory watcher | Memory ceiling enforcement, automatic restart |
| Cross-process coordination | File locks (flock) | Simple, OS-native, no external dependency |
| In-process shared state | Mutex | Simplest correct solution for Ruby thread safety |
How NGE Eliminates Concurrency Complexity
The fundamental architectural insight of the NGE migration is that message-driven process isolation eliminates application-level concurrency concerns entirely:
Legacy:
Thread safety → Mutex, thread-local storage, file locks, fork
Race conditions → Manual retry on MySQL deadlocks (3 retries, 1s sleep)
Memory management → ForkingMemoryWatcher kills at 400MB
Coordination → BatesStampCompletionJob polls ProcessingJobs every 15s
NGE:
Thread safety → Not needed (Lambda = one process per invocation)
Race conditions → Idempotent handlers + SQS visibility timeout
Memory management → Lambda configured memory limit (auto-kill)
Coordination → SNS events + Athena polling (PSM)
The trade-off is explicit state management (every state transition goes through MySQL or S3), but the elimination of concurrency bugs is worth it for document processing pipelines where correctness matters more than latency.
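The idempotent-handler half of that trade can be sketched in a few lines. An in-memory Set stands in for the MySQL/DynamoDB dedup state a real NGE handler would check; the `handle` name is illustrative.

```ruby
require "set"

# Processing is keyed by a message ID, so an SQS at-least-once redelivery
# becomes a harmless no-op instead of a double-processed document.
PROCESSED = Set.new  # stand-in for a persistent "processed" table

def handle(message)
  # Set#add? returns nil when the ID was already present
  return :skipped unless PROCESSED.add?(message[:id])
  # ... real work runs exactly once per message ID ...
  :processed
end

msg = { id: "doc-123" }
p handle(msg)  # => :processed
p handle(msg)  # => :skipped (duplicate delivery is harmless)
```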
References
- concurrent-write-safety.md — Pattern for handling concurrent DB writes
- retry-and-resilience.md — Retry patterns including MySQL deadlock handling
- idempotent-handlers.md — How idempotent handlers replace thread-safety
- exception-hierarchy.md — How exception types control SQS retry behavior
- reference-implementations/workers.md — ForkingMemoryWatcher details
- reference-implementations/shared-libs.md — ResourcePool, ThreadSafeSingleton
- reference-implementations/rails-monolith.md — PerCaseModel thread-safety