Reference Implementation: workers (Legacy)¶
Overview¶
The workers repo is the Legacy document processing engine — a collection of 29 specialized Ruby worker classes orchestrated by a custom polling daemon. Workers handle document ingestion, file conversion, image processing, video transcoding, OCR, email parsing, and export generation.
This is the Legacy counterpart to NGE's documentloader, documentextractor, and documentuploader modules. The NGE modules were built to replace specific worker pipelines with event-driven Lambda/ECS architecture.
Architecture¶
workers/
├── job_daemon.rb # Entry point — CLI option parsing, daemon startup
├── console.rb # IRB console with API/S3 loaded
├── Gemfile # Ruby 3.1.4, processing dependencies
├── workers/
│ ├── job_worker.rb # Base class — handles/run pattern, auto-registration
│ ├── preprocess_worker.rb # Router — identifies file type, delegates to specialist
│ ├── container_worker.rb # PST/MBOX/ZIP archive extraction
│ ├── conversion_worker.rb # Office/HTML/email → PDF via LibreOffice/wkhtmltopdf
│ ├── page_worker.rb # Image generation, OCR (Tesseract), S3 upload
│ ├── case_folder_import_worker.rb # S3 case folder imports with load file support
│ ├── batch_split_worker.rb # Splits large imports (>2000) for parallelism
│ ├── document_pdf_create_worker.rb # PDF creation for document viewer
│ ├── native_pdf_ocr_worker.rb # OCR via ocrmypdf + Nutrient integration
│ ├── treatment_worker.rb # Litigation presentation images (callout/highlight)
│ ├── image_manipulation_worker.rb # Markups, rotations, redactions
│ ├── transcode_worker.rb # Video transcoding via FFmpeg
│ ├── video_stitch_worker.rb # Multi-segment video stitching
│ ├── parse_transcript_worker.rb # Deposition transcript parsing (LEF/PTX/CMS)
│ ├── exhibit_zip_volume_worker.rb # Export ZIP volume creation
│ ├── exhibit_loadfile_worker.rb # Export load file generation
│ ├── s3_case_delete_worker.rb # Case S3 cleanup (10-day timeout, safety checks)
│ ├── spreadsheet_conversion_worker.rb # XLS/XLSB/CSV → XLSX
│ ├── native_placeholder_worker.rb # Bates-stamped placeholder images
│ └── aux/ # Auxiliary workers (loaded separately)
├── lib/
│ ├── job_daemon.rb # Core daemon: poll loop, worker dispatch, error handling
│ ├── forking_memory_watcher.rb # Fork-based memory management (400MB limit)
│ ├── nextpoint_importer.rb # Import framework (ZIP, case folder, load files)
│ ├── nextpoint_email_parser.rb # Email parsing (MSG, EML, MBOX)
│ ├── email_processor.rb # Email → exhibit with deduplication
│ ├── s3_uploader.rb # S3 upload mixin with expansion metrics
│ ├── document_pdf.rb # PDF generation and merging
│ ├── stamper.rb # Bates number stamp generation
│ ├── openoffice_converter.rb # LibreOffice document conversion
│ ├── ffmpeg_converter.rb # Video transcoding
│ ├── tesseract_ocr.rb # OCR via Tesseract
│ ├── tika.rb # Apache Tika content extraction
│ ├── job_metrics.rb # Job timing/size/expansion metrics
│ ├── import_support/ # Load file format processors
│ ├── export_support/ # Export template system
│ ├── image_manipulation/ # Image markup services
│ └── shared/ # Symlink to shared_libs repo
├── system/
│ ├── auto_shutdown_checker.rb # EC2 idle instance auto-shutdown (5 min)
│ ├── upload_job_daemon_logs_at_shutdown.rb # S3 log upload on shutdown
│ └── tika-server.jar # Apache Tika server
└── test/unit/ # Test::Unit tests with mocha mocking
Pattern Mapping¶
| Pattern | workers Implementation | NGE Equivalent |
|---|---|---|
| Job dispatch | Custom polling daemon (`JobDaemon`) polls Rails API every 30s via `get_next_job()` | SNS/SQS event-driven with Lambda triggers |
| Worker registration | Ruby `inherited` hook auto-registers subclasses; `handles :job_type` macro | Lambda handler routing by event type |
| Job state machine | API-managed: processing → complete/error/retry via `update_job()` | Checkpoint pipeline (11-step state machine) |
| Multi-tenancy | One daemon instance per case (`npcase_id`); buffer box for dynamic assignment | Per-case database schema switching |
| Memory management | `ForkingMemoryWatcher` — fork/exec per job, kill at 400MB RSS | Lambda memory limits (configurable) |
| Error handling | Worker returns `[result, error_message]` tuple; retry status for transient failures | Exception hierarchy (Recoverable/Permanent/Silent) |
| File processing pipeline | Cascading delegation: Preprocess → Container/Conversion → Page | Checkpoint pipeline steps |
| External tool orchestration | LibreOffice, Ghostscript, Tesseract, FFmpeg, Tika, wkhtmltopdf, pdftk | Hyland Filters (v11/v23), Apache PDFBox, Nutrient |
| S3 operations | `NextPointS3` from shared_libs (caching, multipart, MD5 dedup) | `shell/utils/s3_ops.py` |
| API communication | HTTP/XML via `NextPointAPI` with HMAC-SHA1 auth | Direct database access + SNS events |
| Progress tracking | `update_job(id, 'processing', progress: percent)` via API | Checkpoint updates + SNS events |
| Batch splitting | `BatchSplitWorker` splits >2000 items into parallel batch parts | Chunk dispatch pattern |
| Retry logic | Worker returns 'retry'; API reschedules (no backoff in worker) | SQS visibility timeout with exponential backoff |
| Deployment | EC2 instances with auto-shutdown checker; systemd services | Lambda (auto-scaling) or ECS Fargate tasks |
| Logging | File-based logs uploaded to S3 on shutdown | CloudWatch Logs with structured JSON |
Key Design Decisions¶
Custom Polling Daemon (Not Sidekiq/Resque)¶
The workers use a custom JobDaemon that polls the Rails API for jobs rather than
using a message queue. This design predates modern job frameworks and provides:
- Central job management — the Rails API controls job priority, assignment, and retry scheduling. Workers are stateless consumers.
- Single-tenant isolation — each daemon instance is locked to one `npcase_id`, preventing cross-case data leakage. The "buffer box" pattern allows dynamic case assignment before the first job.
- Priority support — workers can restrict to `high`, `medium`, or `low` priority jobs via CLI flags or EC2 user data.
NGE replacement: SNS/SQS event-driven architecture eliminates polling overhead, provides automatic scaling, and supports per-batch infrastructure lifecycle (create/teardown queues dynamically).
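The dispatch cycle described above can be sketched as follows. Class and method names other than `get_next_job()` / `update_job()` are illustrative; the real loop lives in `lib/job_daemon.rb`:

```ruby
# Hypothetical sketch of one JobDaemon poll iteration: fetch a job from the
# Rails API, dispatch to the registered worker class, report the result.
class PollLoopSketch
  POLL_INTERVAL = 30 # seconds between polls, per the pattern table

  def initialize(api, registry)
    @api = api           # NextPointAPI-style client
    @registry = registry # job_type => worker class
  end

  # Runs one poll iteration; returns the final job status, or nil if idle.
  def poll_once(npcase_id, priority = nil)
    job = @api.get_next_job(npcase_id, priority)
    return nil unless job # no work: sleep POLL_INTERVAL and try again

    worker = @registry.fetch(job[:job_type])
    _result, error = worker.new(@api).run(job) # [result, error_message] tuple
    status = error ? 'error' : 'complete'
    @api.update_job(job[:id], status, include_next_job: true)
    GC.start # reclaim memory between jobs, as the daemon does
    status
  end
end
```

The `include_next_job: true` flag reflects the lifecycle step where job completion and the next fetch are combined into one API round trip.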
Fork-Based Memory Safety¶
ForkingMemoryWatcher forks a child process for the actual JobDaemon. It monitors
RSS memory every few seconds and kills/re-forks the child if it exceeds 400MB. This
handles memory leaks from external tools (LibreOffice, Ghostscript, ImageMagick)
without restarting the entire daemon.
NGE replacement: Lambda has built-in memory limits (configurable up to 10GB). ECS Fargate tasks use container resource limits.
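A minimal sketch of the watcher idea, assuming ps-based RSS sampling (the real implementation is `lib/forking_memory_watcher.rb` and may read /proc instead):

```ruby
# Illustrative fork-and-watch cycle: run the daemon in a forked child,
# sample its RSS every few seconds, and kill it past the 400MB limit so a
# fresh child can be forked.
class MemoryWatcherSketch
  LIMIT_KB = 400 * 1024 # 400MB RSS ceiling, per the description above

  def over_limit?(rss_kb)
    rss_kb > LIMIT_KB
  end

  # RSS of a pid in KB via ps (works on Linux/macOS).
  def rss_kb_for(pid)
    `ps -o rss= -p #{pid}`.to_i
  end

  # One supervision cycle: fork the work, watch it, kill on breach.
  def supervise(&work)
    child = fork(&work)
    loop do
      break unless Process.waitpid(child, Process::WNOHANG).nil? # child exited
      if over_limit?(rss_kb_for(child))
        Process.kill('KILL', child) # leaking external tool: kill, then re-fork
        Process.wait(child)
        break
      end
      sleep 5 # sample interval ("every few seconds")
    end
  end
end
```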
Cascading Job Type Delegation¶
PreprocessWorker acts as a router that identifies the file type and delegates to
the appropriate specialist worker:
PreprocessWorker (identifies file type via Nextpoint::FileIdentifier)
├── ContainerWorker (PST/MBOX/ZIP → extract items → spawn child jobs)
├── ConversionWorker (Office/HTML/email → PDF via LibreOffice/wkhtmltopdf)
└── PageWorker (single page → images, OCR, S3 upload)
Each worker creates child jobs via @api.create_job(), forming a processing tree.
The run_as_job_type(:other_type) method allows a worker to delegate to another
worker within the same process (avoids creating a new API job).
PreprocessWorker routing logic:
- Downloads attachment from S3, checks for MacBinary encoding
- Identifies true file type, renames if extension is wrong
- Routes: containers (PST/MBX/MBOX/ZIP), convertible formats (Office/email/HTML), images (direct to PageWorker), unsupported types (AVI, EXE, DWG, etc. → marked unsupported)
- Special detection: password-protected PDFs, PDF packages, animated GIFs
PageWorker OCR auto-detection: If attachment has <20 words of search text, OCR is performed automatically via Tesseract. Can be forced on/off via job details. Max image dimensions: 5100x6600 pixels.
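The auto-detection rule above reduces to a small predicate. The method name and the `force:` keyword are illustrative stand-ins for the job-details override, not the actual `page_worker.rb` API:

```ruby
# Sketch of the OCR auto-detection rule: OCR when the attachment has fewer
# than 20 words of search text, unless the job details force it on or off.
MIN_SEARCHABLE_WORDS = 20
MAX_PAGE_DIMENSIONS  = [5100, 6600] # max page image width x height, pixels

def ocr_needed?(search_text, force: nil)
  return force unless force.nil? # job details can force OCR on/off
  search_text.to_s.split.size < MIN_SEARCHABLE_WORDS
end
```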
NGE replacement: documentloader uses a linear checkpoint pipeline (11 steps) rather than a tree of delegated jobs. documentextractor handles file format conversion. documentuploader handles page image generation.
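The auto-registration and in-process delegation described above combine Ruby's `inherited` hook with a `handles` class macro. Class names here are illustrative; the real base class is `workers/job_worker.rb`:

```ruby
# Sketch of the handles/run pattern: subclasses auto-register with the
# dispatcher, `handles` maps a job type to a worker class, and
# run_as_job_type delegates to another worker without creating an API job.
class WorkerBaseSketch
  WORKERS  = [] # every known worker subclass
  REGISTRY = {} # job_type => worker class

  def self.inherited(subclass)
    super
    WORKERS << subclass # auto-register each subclass with the dispatcher
  end

  def self.handles(job_type)
    REGISTRY[job_type] = self
  end

  def self.worker_for(job_type)
    REGISTRY.fetch(job_type)
  end

  # Delegate to another worker within the same process.
  def run_as_job_type(job_type, job)
    self.class.worker_for(job_type).new.run(job)
  end

  def run(_job)
    ['ok', nil] # subclasses override; [result, error_message] tuple
  end
end

class PageWorkerSketch < WorkerBaseSketch
  handles :page
end
```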
API-Driven State (No Direct Database Access)¶
Workers use the API for most state reads/writes. Exception: the export
pipeline (ExhibitBroker in lib/export_support/) executes raw SQL against
per-case MySQL databases via 12 prepared statements for performance-critical
exhibit data retrieval. All other workers go through NextPointAPI HTTP calls:
- `get_next_job()` / `update_job()` — job lifecycle
- `create_exhibit()` / `update_attachment()` — document creation
- `register_as_worker()` / `ping()` — worker lifecycle
- `create_job()` — spawn child jobs
This design isolates workers from database schema details, but it creates a chatty HTTP protocol and makes the Rails API a single point of failure.
NGE replacement: Lambda functions access databases directly via SQLAlchemy
sessions (writer_session() / reader_session()), eliminating the API bottleneck.
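For context on the HMAC-SHA1 auth the pattern table attributes to `NextPointAPI`, a generic signing sketch looks like this. The canonical-string layout and the helper name are assumptions — the real scheme lives in the shared_libs API client:

```ruby
require 'openssl'
require 'base64'

# Illustrative HMAC-SHA1 request signing: hash a canonical string of the
# request with a shared secret, send the Base64 digest alongside the request.
def sign_request(secret, http_method, path, timestamp)
  canonical = [http_method.upcase, path, timestamp.to_s].join("\n")
  digest = OpenSSL::HMAC.digest(OpenSSL::Digest.new('SHA1'), secret, canonical)
  Base64.strict_encode64(digest)
end
```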
External Tool Dependencies¶
Workers depend on 10+ external tools installed on EC2:
| Tool | Purpose | NGE Replacement |
|---|---|---|
| LibreOffice (unoconv) | Office → PDF conversion | Hyland Filters v23 |
| Ghostscript | PDF → TIFF/PNG rendering | Apache PDFBox |
| Tesseract | OCR | Hyland Filters OCR |
| FFmpeg | Video transcoding | Still in Legacy |
| wkhtmltopdf | HTML → PDF | Hyland Filters |
| Apache Tika | Content/metadata extraction | Hyland Filters |
| xpdf | PDF text extraction | Hyland Filters |
| pdftk | PDF manipulation | Apache PDFBox |
| libpst | PST email archive extraction | Hyland Filters |
| ImageMagick/GD2 | Image processing | Nutrient (PSPDFKit) |
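All of these tools are driven the same way: shell out, capture output, and fold failures into the worker `[result, error_message]` convention. A generic wrapper might look like this — the helper name and timeout value are illustrative, not from the repo:

```ruby
require 'open3'
require 'timeout'

# Generic external-tool wrapper: run the command, capture stderr, and
# translate a non-zero exit into the [result, error_message] tuple.
def run_tool(*cmd, timeout_s: 300)
  stdout = stderr = status = nil
  Timeout.timeout(timeout_s) do
    stdout, stderr, status = Open3.capture3(*cmd)
  end
  status.success? ? [stdout, nil] : [nil, "#{cmd.first} failed: #{stderr.strip}"]
end
```

A worker would call, say, `run_tool('tesseract', input_path, output_base)` and return `'retry'` or `'error'` depending on whether the failure looks transient.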
Worker Lifecycle¶
1. EC2 instance boots
2. job_daemon.rb parses CLI options (--environment, --npcase_id, --priority)
3. ForkingMemoryWatcher.new.start — forks child process
4. JobDaemon registers with Rails API: register_as_worker(ip, npcase_id, instance_id)
5. Poll loop (every 30s):
a. api.get_next_job(npcase_id, priority_restriction)
b. If job found → dispatch to registered worker class
c. Worker.run() processes job, calls api.update_job() with progress
d. On completion: update_job(id, 'complete', include_next_job: true)
e. GC.start after each job
6. If memory > 400MB → ForkingMemoryWatcher kills and re-forks child
7. On shutdown (TERM signal):
a. Graceful: finish current job
b. Second TERM: force kill child process
c. Upload logs to S3
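The two-stage TERM handling in step 7 can be sketched as a tiny state machine. Method names are illustrative:

```ruby
# First TERM asks the daemon to finish its current job and stop; a second
# TERM force-kills the child process.
class ShutdownSketch
  attr_reader :stop_requested

  def initialize
    @stop_requested = false
  end

  # Returns :graceful on the first TERM, :force_exit on the second.
  def handle_term
    if @stop_requested
      :force_exit            # second TERM: caller does exit!(1)
    else
      @stop_requested = true # first TERM: finish current job, then stop
      :graceful
    end
  end

  def install_traps
    Signal.trap('TERM') { exit!(1) if handle_term == :force_exit }
  end
end
```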
EC2 Instance Lifecycle¶
Workers run on dedicated EC2 instances with auto-scaling:
- Buffer box: Starts without `npcase_id`, polls `buffer_box_work_request` until assigned
- Case-locked: Once assigned a case, the instance is locked to that case ID via file-based state (`/mnt/npcase_id.dat`)
- Auto-shutdown: `auto_shutdown_checker.rb` (cron every 5 min) shuts down idle instances after 5 minutes of no jobs. Creates `auto_shutdown_check.lock` during evaluation — `JobDaemon` checks this lock to avoid fetching new jobs during shutdown evaluation. Respects `.no_auto_shutdown` file to disable.
- Priority partitioning: Multiple daemons per instance with different priority restrictions (configured via EC2 user data)
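The idle-shutdown decision reduces to a small predicate. The `opt_out:` keyword stands in for the `.no_auto_shutdown` file check; names are illustrative, not from `auto_shutdown_checker.rb`:

```ruby
# Shut down when no job has run for 5 minutes, unless opted out.
IDLE_LIMIT_SECONDS = 5 * 60

def should_shutdown?(last_job_at, now: Time.now, opt_out: false)
  return false if opt_out # .no_auto_shutdown file disables shutdown
  (now - last_job_at) > IDLE_LIMIT_SECONDS
end
```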
Integration Points with NGE¶
What NGE Replaced¶
| Legacy Worker | NGE Module | Status |
|---|---|---|
| `PreprocessWorker` + `ContainerWorker` | documentloader | Replaced (NGE) |
| `ConversionWorker` (file extraction) | documentextractor | Replaced (NGE) |
| `PageWorker` (image generation) | documentuploader | Replaced (NGE) |
| `document_pdf_create_worker` (PDF pages) | documentpageservice | Replaced (NGE) |
| `ZipWorker` / `UnzipZipFromS3Worker` | unzipservice | Replaced (NGE) |
| `ExhibitZipVolumeWorker` + `ExhibitLoadfileWorker` | documentexporter | Replaced (NGE) |
What Remains in Legacy¶
| Legacy Worker | Purpose | NGE Status |
|---|---|---|
| `TranscodeWorker` / `VideoStitchWorker` | Video processing | No NGE equivalent |
| `ParseTranscriptWorker` | Deposition transcript parsing | Hybrid — internal `nge_enabled` branching |
| `TreatmentWorker` | Litigation presentation images | Hybrid — uses Nutrient for image source when `nge_enabled` |
| `DownloadExhibitPdfWorker` | PDF download/export | Hybrid — uses Nutrient API for NGE PDFs, S3 for Legacy |
| `DepositionZipWorker` | Deposition export packaging | Hybrid — skips batch creation for NGE batches |
| `ImageManipulationWorker` | Markup/redaction application | Partially in Nutrient |
| `SpreadsheetConversionWorker` | Spreadsheet conversion | No NGE equivalent |
| `S3CaseDeleteWorker` | Case cleanup | No NGE equivalent |
| `NativePdfOcrWorker` | PDF OCR with Nutrient | Hybrid Legacy+NGE |
Patterns to Preserve vs Deprecate¶
Preserve¶
- Cascading job delegation — the router pattern (`PreprocessWorker` → specialist) is sound; NGE implements it differently via checkpoint steps
- Memory watchdog — fork-based memory management is robust for external tool usage
- Buffer box pattern — dynamic case assignment is useful for resource pooling
- Progress tracking — percentage-based progress reporting to the API
Deprecate¶
- HTTP polling for jobs — replaced by SNS/SQS event-driven triggers
- XML-based API protocol — replaced by direct database access in NGE
- Single-tenant EC2 instances — replaced by multi-tenant Lambda/ECS
- File-based case locking — replaced by per-case database schema naming
- Manual EC2 lifecycle — replaced by Lambda auto-scaling and ECS Fargate
Key File Locations¶
| File | Purpose |
|---|---|
| `job_daemon.rb` (root) | Entry point with CLI parsing |
| `lib/job_daemon.rb` | Core daemon loop and worker dispatch |
| `lib/forking_memory_watcher.rb` | Fork-based memory management |
| `workers/job_worker.rb` | Base worker class with `handles` macro |
| `workers/preprocess_worker.rb` | File type router |
| `workers/container_worker.rb` | Archive extraction (PST/MBOX/ZIP) |
| `workers/conversion_worker.rb` | Document → PDF conversion |
| `workers/page_worker.rb` | Page image generation + OCR |
| `lib/nextpoint_importer.rb` | Import framework |
| `lib/nextpoint_email_parser.rb` | Email parsing |
| `system/auto_shutdown_checker.rb` | EC2 idle shutdown |