
Reference Implementation: workers (Legacy)

Overview

The workers repo is the Legacy document processing engine — a collection of 29 specialized Ruby worker classes orchestrated by a custom polling daemon. Workers handle document ingestion, file conversion, image processing, video transcoding, OCR, email parsing, and export generation.

This is the Legacy counterpart to NGE's documentloader, documentextractor, and documentuploader modules. The NGE modules were built to replace specific worker pipelines with event-driven Lambda/ECS architecture.

Architecture

workers/
├── job_daemon.rb                    # Entry point — CLI option parsing, daemon startup
├── console.rb                       # IRB console with API/S3 loaded
├── Gemfile                          # Ruby 3.1.4, processing dependencies
├── workers/
│   ├── job_worker.rb                # Base class — handles/run pattern, auto-registration
│   ├── preprocess_worker.rb         # Router — identifies file type, delegates to specialist
│   ├── container_worker.rb          # PST/MBOX/ZIP archive extraction
│   ├── conversion_worker.rb         # Office/HTML/email → PDF via LibreOffice/wkhtmltopdf
│   ├── page_worker.rb               # Image generation, OCR (Tesseract), S3 upload
│   ├── case_folder_import_worker.rb # S3 case folder imports with load file support
│   ├── batch_split_worker.rb        # Splits large imports (>2000) for parallelism
│   ├── document_pdf_create_worker.rb # PDF creation for document viewer
│   ├── native_pdf_ocr_worker.rb     # OCR via ocrmypdf + Nutrient integration
│   ├── treatment_worker.rb          # Litigation presentation images (callout/highlight)
│   ├── image_manipulation_worker.rb # Markups, rotations, redactions
│   ├── transcode_worker.rb          # Video transcoding via FFmpeg
│   ├── video_stitch_worker.rb       # Multi-segment video stitching
│   ├── parse_transcript_worker.rb   # Deposition transcript parsing (LEF/PTX/CMS)
│   ├── exhibit_zip_volume_worker.rb # Export ZIP volume creation
│   ├── exhibit_loadfile_worker.rb   # Export load file generation
│   ├── s3_case_delete_worker.rb     # Case S3 cleanup (10-day timeout, safety checks)
│   ├── spreadsheet_conversion_worker.rb # XLS/XLSB/CSV → XLSX
│   ├── native_placeholder_worker.rb # Bates-stamped placeholder images
│   └── aux/                         # Auxiliary workers (loaded separately)
├── lib/
│   ├── job_daemon.rb                # Core daemon: poll loop, worker dispatch, error handling
│   ├── forking_memory_watcher.rb    # Fork-based memory management (400MB limit)
│   ├── nextpoint_importer.rb        # Import framework (ZIP, case folder, load files)
│   ├── nextpoint_email_parser.rb    # Email parsing (MSG, EML, MBOX)
│   ├── email_processor.rb           # Email → exhibit with deduplication
│   ├── s3_uploader.rb               # S3 upload mixin with expansion metrics
│   ├── document_pdf.rb              # PDF generation and merging
│   ├── stamper.rb                   # Bates number stamp generation
│   ├── openoffice_converter.rb      # LibreOffice document conversion
│   ├── ffmpeg_converter.rb          # Video transcoding
│   ├── tesseract_ocr.rb             # OCR via Tesseract
│   ├── tika.rb                      # Apache Tika content extraction
│   ├── job_metrics.rb               # Job timing/size/expansion metrics
│   ├── import_support/              # Load file format processors
│   ├── export_support/              # Export template system
│   ├── image_manipulation/          # Image markup services
│   └── shared/                      # Symlink to shared_libs repo
├── system/
│   ├── auto_shutdown_checker.rb     # EC2 idle instance auto-shutdown (5 min)
│   ├── upload_job_daemon_logs_at_shutdown.rb # S3 log upload on shutdown
│   └── tika-server.jar              # Apache Tika server
└── test/unit/                       # Test::Unit tests with mocha mocking

Pattern Mapping

| Pattern | workers (Legacy) implementation | NGE equivalent |
|---|---|---|
| Job dispatch | Custom polling daemon (JobDaemon) polls the Rails API every 30s via get_next_job() | SNS/SQS event-driven, with Lambda triggers |
| Worker registration | Ruby inherited hook auto-registers subclasses; handles :job_type macro | Lambda handler routing by event type |
| Job state machine | API-managed: processing → complete/error/retry via update_job() | Checkpoint pipeline (11-step state machine) |
| Multi-tenancy | One daemon instance per case (npcase_id); buffer box for dynamic assignment | Per-case database schema switching |
| Memory management | ForkingMemoryWatcher: runs the daemon in a forked child and kills/re-forks it above 400MB RSS | Lambda memory limits (configurable) |
| Error handling | Worker returns a [result, error_message] tuple; retry status for transient failures | Exception hierarchy (Recoverable/Permanent/Silent) |
| File processing pipeline | Cascading delegation: Preprocess → Container/Conversion → Page | Checkpoint pipeline steps |
| External tool orchestration | LibreOffice, Ghostscript, Tesseract, FFmpeg, Tika, wkhtmltopdf, pdftk | Hyland Filters (v11/v23), Apache PDFBox, Nutrient |
| S3 operations | NextPointS3 from shared_libs (caching, multipart, MD5 dedup) | shell/utils/s3_ops.py |
| API communication | HTTP/XML via NextPointAPI with HMAC-SHA1 auth | Direct database access + SNS events |
| Progress tracking | update_job(id, 'processing', progress: percent) via API | Checkpoint updates + SNS events |
| Batch splitting | BatchSplitWorker splits imports of >2000 items into parallel batch parts | Chunk dispatch pattern |
| Retry logic | Worker returns 'retry'; the API reschedules (no backoff in the worker) | SQS visibility timeout with exponential backoff |
| Deployment | EC2 instances with auto-shutdown checker; systemd services | Lambda (auto-scaling) or ECS Fargate tasks |
| Logging | File-based logs uploaded to S3 on shutdown | CloudWatch Logs with structured JSON |
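
To make the batch-splitting row concrete, here is a minimal sketch of dividing a large import into parallel parts. Only the 2000-item threshold comes from this page; the method name `split_into_batch_parts` and the part hash (`part`, `total_parts`, `item_ids`) are hypothetical, and the real BatchSplitWorker may divide work differently.

```ruby
# Hypothetical sketch of splitting a large import into parallel batch parts.
# Only the 2000-item threshold comes from the docs; names and shapes are illustrative.
BATCH_SPLIT_THRESHOLD = 2000

# Returns one descriptor per part; each part would become its own child job.
# Imports at or below the threshold stay as a single part.
def split_into_batch_parts(item_ids, max_per_part: BATCH_SPLIT_THRESHOLD)
  return [{ part: 1, total_parts: 1, item_ids: item_ids }] if item_ids.size <= max_per_part

  slices = item_ids.each_slice(max_per_part).to_a
  slices.each_with_index.map do |slice, index|
    { part: index + 1, total_parts: slices.size, item_ids: slice }
  end
end

split_into_batch_parts((1..4500).to_a).each do |part|
  puts "part #{part[:part]}/#{part[:total_parts]}: #{part[:item_ids].size} items"
end
# part 1/3: 2000 items
# part 2/3: 2000 items
# part 3/3: 500 items
```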

Key Design Decisions

Custom Polling Daemon (Not Sidekiq/Resque)

The workers use a custom JobDaemon that polls the Rails API for jobs rather than using a message queue. This design predates modern job frameworks and provides:

  • Central job management — the Rails API controls job priority, assignment, and retry scheduling. Workers are stateless consumers.
  • Single-tenant isolation — each daemon instance is locked to one npcase_id, preventing cross-case data leakage. The "buffer box" pattern allows dynamic case assignment before the first job.
  • Priority support — workers can restrict to high, medium, or low priority jobs via CLI flags or EC2 user data.
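
Priority restriction and case locking both begin at the daemon's command line. A minimal sketch using Ruby's standard `OptionParser`, assuming the flags listed in the worker lifecycle below (`--environment`, `--npcase_id`, `--priority`); `parse_daemon_options` and the option hash are hypothetical, and the EC2 user-data path is not shown.

```ruby
require "optparse"

# Hypothetical sketch of job_daemon.rb-style flag parsing. Flag names come from the
# docs; defaults, the returned hash, and validation are assumptions.
PRIORITIES = %w[high medium low].freeze

def parse_daemon_options(argv)
  # npcase_id left nil means the daemon starts as a buffer box.
  options = { environment: nil, npcase_id: nil, priority: nil }
  OptionParser.new do |opts|
    opts.banner = "Usage: job_daemon.rb [options]"
    opts.on("--environment ENV", "Rails API environment") { |v| options[:environment] = v }
    opts.on("--npcase_id ID", Integer, "Lock this daemon to one case") { |v| options[:npcase_id] = v }
    # Passing an Array restricts the accepted values; anything else raises a ParseError.
    opts.on("--priority LEVEL", PRIORITIES, "Only take high, medium or low priority jobs") do |v|
      options[:priority] = v
    end
  end.parse!(argv.dup)
  options
end

p parse_daemon_options(%w[--environment production --npcase_id 42 --priority high])
```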

NGE replacement: SNS/SQS event-driven architecture eliminates polling overhead, provides automatic scaling, and supports per-batch infrastructure lifecycle (create/teardown queues dynamically).

Fork-Based Memory Safety

ForkingMemoryWatcher forks a child process for the actual JobDaemon. It monitors RSS memory every few seconds and kills/re-forks the child if it exceeds 400MB. This handles memory leaks from external tools (LibreOffice, Ghostscript, ImageMagick) without restarting the entire daemon.
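
The watchdog idea can be sketched with `Process.fork` and Linux's `/proc/<pid>/status`. This is an illustrative sketch, not the actual ForkingMemoryWatcher: the class name, the 5-second check interval, using `KILL`, and ending supervision when the child exits on its own are all assumptions; only the 400MB limit and the kill-and-re-fork behavior come from this page.

```ruby
# Hypothetical sketch of a fork-based memory watchdog in the spirit of
# ForkingMemoryWatcher. Linux-only: RSS is read from /proc/<pid>/status.
class ForkingMemoryWatcherSketch
  LIMIT_BYTES = 400 * 1024 * 1024 # 400MB, per the docs

  # Extracts VmRSS (reported in kB) from the text of /proc/<pid>/status.
  def self.parse_rss_bytes(status_text)
    line = status_text.each_line.find { |l| l.start_with?("VmRSS:") }
    line && line.split[1].to_i * 1024
  end

  def self.rss_bytes(pid)
    parse_rss_bytes(File.read("/proc/#{pid}/status"))
  rescue Errno::ENOENT, Errno::ESRCH
    nil # process already gone
  end

  def initialize(limit_bytes: LIMIT_BYTES, interval: 5, &child_body)
    @limit_bytes = limit_bytes
    @interval = interval # assumed check interval in seconds
    @child_body = child_body
  end

  # Forks the child (e.g. the daemon loop) and re-forks it whenever its RSS exceeds
  # the limit. In this sketch, a child that exits on its own ends supervision.
  def start
    loop do
      pid = Process.fork(&@child_body)
      loop do
        sleep @interval
        return if Process.wait(pid, Process::WNOHANG) # child finished by itself

        rss = self.class.rss_bytes(pid)
        next unless rss && rss > @limit_bytes

        Process.kill("KILL", pid)
        Process.wait(pid) # reap it before re-forking
        break # outer loop forks a fresh child
      end
    end
  end
end

# Usage (not executed here; run_job_daemon is a hypothetical entry point):
#   ForkingMemoryWatcherSketch.new { run_job_daemon }.start
```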

NGE replacement: Lambda has built-in memory limits (configurable up to 10GB). ECS Fargate tasks use container resource limits.

Cascading Job Type Delegation

PreprocessWorker acts as a router that identifies the file type and delegates to the appropriate specialist worker:

PreprocessWorker (identifies file type via Nextpoint::FileIdentifier)
  ├── ContainerWorker (PST/MBOX/ZIP → extract items → spawn child jobs)
  ├── ConversionWorker (Office/HTML/email → PDF via LibreOffice/wkhtmltopdf)
  └── PageWorker (single page → images, OCR, S3 upload)

Each worker creates child jobs via @api.create_job(), forming a processing tree. The run_as_job_type(:other_type) method allows a worker to delegate to another worker within the same process (avoids creating a new API job).
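
The registration, delegation, and child-job mechanics fit in a few lines of plain Ruby. All class names here (`JobWorkerSketch`, `PreprocessSketch`, `PageSketch`, `RecordingApi`) are hypothetical stand-ins; only the `inherited` hook, the `handles` macro, `run_as_job_type`, `create_job`, and the `[result, error_message]` return convention come from this page, and their real signatures may differ.

```ruby
# Hypothetical sketch of the JobWorker pattern: subclasses are recorded by Ruby's
# `inherited` hook, claim a job type with a `handles` macro, and either delegate in
# the same process (run_as_job_type) or spawn a child job through the API.
class JobWorkerSketch
  WORKERS = []   # every subclass, recorded by the inherited hook
  JOB_TYPES = {} # job type => worker class, filled in by `handles`

  def self.inherited(subclass)
    super
    WORKERS << subclass
  end

  def self.handles(job_type)
    JOB_TYPES[job_type] = self
  end

  def self.for(job_type)
    JOB_TYPES.fetch(job_type) { raise ArgumentError, "no worker handles #{job_type.inspect}" }
  end

  def initialize(api, job)
    @api = api
    @job = job
  end

  # Runs another worker on the same job in this process; no new API job is created.
  def run_as_job_type(job_type)
    JobWorkerSketch.for(job_type).new(@api, @job).run
  end
end

class PageSketch < JobWorkerSketch
  handles :page

  def run
    ["rendered pages for job #{@job[:id]}", nil] # [result, error_message]
  end
end

class PreprocessSketch < JobWorkerSketch
  handles :preprocess

  def run
    return run_as_job_type(:page) if @job[:file_type] == :image

    # Anything else becomes a child job, extending the processing tree.
    @api.create_job(:conversion, parent_id: @job[:id])
    ["spawned conversion job", nil]
  end
end

# Stand-in for NextPointAPI that only records create_job calls.
class RecordingApi
  attr_reader :created

  def initialize
    @created = []
  end

  def create_job(job_type, **details)
    @created << [job_type, details]
  end
end

api = RecordingApi.new
p JobWorkerSketch.for(:preprocess).new(api, { id: 7, file_type: :image }).run
# => ["rendered pages for job 7", nil]
```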

PreprocessWorker routing logic:

  • Downloads the attachment from S3 and checks for MacBinary encoding
  • Identifies the true file type and renames the file if its extension is wrong
  • Routes by type: containers (PST/MBX/MBOX/ZIP) to ContainerWorker, convertible formats (Office/email/HTML) to ConversionWorker, images directly to PageWorker; unsupported types (AVI, EXE, DWG, etc.) are marked unsupported
  • Special detection: password-protected PDFs, PDF packages, animated GIFs
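
A hypothetical sketch of the routing step, assuming the true type has already been identified (the real worker uses Nextpoint::FileIdentifier on file content) and reduced to a lowercase label. The type lists are abridged examples of the documented categories, not the production lists; PDFs and the special-detection cases are deliberately left out and fall through to `:unclassified`.

```ruby
# Hypothetical sketch of PreprocessWorker-style routing. The lists below are abridged
# illustrations of the documented categories, not the production lists.
CONTAINER_TYPES   = %w[pst mbx mbox zip].freeze
CONVERTIBLE_TYPES = %w[doc docx ppt pptx msg eml html htm].freeze # Office/email/HTML
IMAGE_TYPES       = %w[tif tiff png jpg jpeg bmp].freeze
UNSUPPORTED_TYPES = %w[avi exe dwg].freeze

# Returns the specialist a file would be handed to.
def route_for(identified_type)
  type = identified_type.to_s.downcase
  return :container   if CONTAINER_TYPES.include?(type)   # ContainerWorker
  return :conversion  if CONVERTIBLE_TYPES.include?(type) # ConversionWorker
  return :page        if IMAGE_TYPES.include?(type)       # PageWorker, no conversion
  return :unsupported if UNSUPPORTED_TYPES.include?(type) # marked unsupported

  :unclassified # e.g. PDFs and special cases, not modeled in this sketch
end

# Renames a file whose extension disagrees with its identified type.
def corrected_filename(filename, identified_type)
  ext = File.extname(filename).delete_prefix(".").downcase
  return filename if ext == identified_type.to_s.downcase

  "#{File.basename(filename, '.*')}.#{identified_type}"
end

p route_for("PST")                         # => :container
p corrected_filename("invoice.txt", "pdf") # => "invoice.pdf"
```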

PageWorker OCR auto-detection: if an attachment has fewer than 20 words of search text, OCR is performed automatically via Tesseract. OCR can also be forced on or off via job details. Maximum image dimensions: 5100x6600 pixels.
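
The OCR rule and the size cap reduce to two small functions. In this sketch, `force_ocr` (true, false, or nil) is an assumed shape for the job-details override, and oversized images are assumed to be scaled down proportionally; this page states only the 20-word threshold and the 5100x6600 maximum.

```ruby
# Hypothetical sketch of PageWorker's OCR decision and image size cap.
# The threshold and maximum dimensions come from the docs; the rest is assumed.
OCR_WORD_THRESHOLD = 20
MAX_WIDTH = 5100
MAX_HEIGHT = 6600

def ocr_needed?(search_text, force_ocr: nil)
  return force_ocr unless force_ocr.nil? # explicit on/off from job details wins
  search_text.to_s.split.size < OCR_WORD_THRESHOLD
end

# Scales (width, height) down proportionally so neither exceeds the maximum;
# images already within bounds are left unchanged.
def fit_within_limits(width, height)
  scale = [MAX_WIDTH.fdiv(width), MAX_HEIGHT.fdiv(height), 1.0].min
  [(width * scale).floor, (height * scale).floor]
end

p ocr_needed?("Scanned page")                   # => true
p ocr_needed?("Scanned page", force_ocr: false) # => false
p fit_within_limits(10_200, 13_200)             # => [5100, 6600]
```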

NGE replacement: documentloader uses a linear checkpoint pipeline (11 steps) rather than a tree of delegated jobs. documentextractor handles file format conversion. documentuploader handles page image generation.

API-Driven State (Almost No Direct Database Access)

Workers use the API for most state reads/writes. Exception: the export pipeline (ExhibitBroker in lib/export_support/) executes raw SQL against per-case MySQL databases via 12 prepared statements for performance-critical exhibit data retrieval. All other workers go through NextPointAPI HTTP calls:

  • get_next_job() / update_job() — job lifecycle
  • create_exhibit() / update_attachment() — document creation
  • register_as_worker() / ping() — worker lifecycle
  • create_job() — spawn child jobs

This design isolates workers from database schema details but creates a chatty HTTP protocol and single point of failure (Rails API).
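
The pattern mapping lists HMAC-SHA1 as NextPointAPI's authentication scheme. A hypothetical sketch of signing such a request with Ruby's `OpenSSL::HMAC`; the string-to-sign layout and the `X-Api-Key`/`X-Timestamp`/`X-Signature` header names are invented for illustration and do not describe the real protocol.

```ruby
require "openssl"
require "time"

# Hypothetical sketch of HMAC-SHA1 request signing for an API client like NextPointAPI.
# Only "HMAC-SHA1" comes from the docs; the string-to-sign and header names are invented.
def sign_request(secret, http_method, path, body, timestamp)
  string_to_sign = [http_method.upcase, path, timestamp, body].join("\n")
  OpenSSL::HMAC.hexdigest("SHA1", secret, string_to_sign)
end

# Builds the headers a signed request might carry; a server holding the same shared
# secret would recompute the signature and compare.
def signed_headers(secret, key_id, http_method, path, body, now: Time.now.utc)
  timestamp = now.utc.iso8601
  {
    "X-Api-Key" => key_id,
    "X-Timestamp" => timestamp,
    "X-Signature" => sign_request(secret, http_method, path, body, timestamp)
  }
end

p signed_headers("shared-secret", "worker-1", "get", "/jobs/next", "")
```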

NGE replacement: Lambda functions access databases directly via SQLAlchemy sessions (writer_session() / reader_session()), eliminating the API bottleneck.

External Tool Dependencies

Workers depend on 10+ external tools installed on EC2:

| Tool | Purpose | NGE replacement |
|---|---|---|
| LibreOffice (unoconv) | Office → PDF conversion | Hyland Filters v23 |
| Ghostscript | PDF → TIFF/PNG rendering | Apache PDFBox |
| Tesseract | OCR | Hyland Filters OCR |
| FFmpeg | Video transcoding | None (still in Legacy) |
| wkhtmltopdf | HTML → PDF | Hyland Filters |
| Apache Tika | Content/metadata extraction | Hyland Filters |
| xpdf | PDF text extraction | Hyland Filters |
| pdftk | PDF manipulation | Apache PDFBox |
| libpst | PST email archive extraction | Hyland Filters |
| ImageMagick/GD2 | Image processing | Nutrient (PSPDFKit) |
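
Each of these tools runs as an external process. A hypothetical sketch of invoking LibreOffice headlessly with `Open3.capture3`, passing arguments as an array so no shell is involved. `--headless`, `--convert-to`, and `--outdir` are standard `soffice` options, but the legacy OpenOfficeConverter may call unoconv instead, and the helper names are illustrative.

```ruby
require "open3"

# Hypothetical sketch of driving LibreOffice as an external process.
# The soffice flags are standard; helper names and return shapes are illustrative.
def libreoffice_pdf_command(input_path, out_dir, binary: "soffice")
  [binary, "--headless", "--convert-to", "pdf", "--outdir", out_dir, input_path]
end

# Returns [pdf_path, nil] on success or [nil, error_message] on failure, mirroring
# the workers' [result, error_message] convention. Array arguments mean no shell
# ever parses the file name.
def convert_to_pdf(input_path, out_dir, binary: "soffice")
  command = libreoffice_pdf_command(input_path, out_dir, binary: binary)
  stdout, stderr, status = Open3.capture3(*command)
  pdf_path = File.join(out_dir, "#{File.basename(input_path, '.*')}.pdf")
  return [pdf_path, nil] if status.success? && File.exist?(pdf_path)

  detail = stderr.strip.empty? ? stdout.strip : stderr.strip
  [nil, "conversion failed (exit #{status.exitstatus}): #{detail}"]
rescue Errno::ENOENT
  [nil, "converter binary not installed"]
end

p libreoffice_pdf_command("/tmp/in/report.docx", "/tmp/out")
```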

Worker Lifecycle

1. EC2 instance boots
2. job_daemon.rb parses CLI options (--environment, --npcase_id, --priority)
3. ForkingMemoryWatcher.new.start — forks child process
4. JobDaemon registers with Rails API: register_as_worker(ip, npcase_id, instance_id)
5. Poll loop (every 30s):
   a. api.get_next_job(npcase_id, priority_restriction)
   b. If job found → dispatch to registered worker class
   c. Worker.run() processes job, calls api.update_job() with progress
   d. On completion: update_job(id, 'complete', include_next_job: true)
   e. GC.start after each job
6. If memory > 400MB → ForkingMemoryWatcher kills and re-forks child
7. On shutdown (TERM signal):
   a. Graceful: finish current job
   b. Second TERM: force kill child process
   c. Upload logs to S3
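
The poll loop in step 5, together with the graceful first-TERM stop from step 7, can be sketched as follows. This is a hypothetical sketch: `get_next_job` and `update_job` mirror the names above, but their argument shapes, the job hash layout, the worker table, and treating a `"retry"` result as a retry request are assumptions. An in-memory `QueueApi` stands in for the Rails API so the sketch runs on its own.

```ruby
# Hypothetical sketch of the JobDaemon poll loop. The real daemon passes
# include_next_job: true on completion; this sketch simply polls again.
class PollLoopSketch
  def initialize(api, workers, npcase_id:, priority: nil, poll_interval: 30)
    @api = api
    @workers = workers # job type => callable(job, progress) returning [result, error_message]
    @npcase_id = npcase_id
    @priority = priority
    @poll_interval = poll_interval
    @stopping = false
  end

  # Safe to call from a TERM trap: the current job finishes, then the loop exits.
  def request_stop
    @stopping = true
  end

  def run(max_polls: nil)
    polls = 0
    until @stopping || (max_polls && polls >= max_polls)
      polls += 1
      job = @api.get_next_job(@npcase_id, @priority)
      if job
        process(job)
        GC.start # release memory between jobs
      else
        sleep @poll_interval
      end
    end
  end

  private

  def process(job)
    progress = ->(percent) { @api.update_job(job[:id], "processing", progress: percent) }
    result, error = @workers.fetch(job[:type]).call(job, progress)
    if error.nil?
      @api.update_job(job[:id], "complete")
    elsif result == "retry"
      @api.update_job(job[:id], "retry", message: error) # API reschedules; no backoff here
    else
      @api.update_job(job[:id], "error", message: error)
    end
  rescue StandardError => e
    @api.update_job(job[:id], "error", message: e.message)
  end
end

# In-memory stand-in for the Rails API.
class QueueApi
  attr_reader :updates

  def initialize(jobs)
    @jobs = jobs
    @updates = []
  end

  def get_next_job(_npcase_id, _priority)
    @jobs.shift
  end

  def update_job(id, status, **details)
    @updates << [id, status, details]
  end
end

page = lambda do |job, progress|
  progress.call(50)
  job[:fail] ? [nil, "render failed"] : ["ok", nil]
end

api = QueueApi.new([{ id: 1, type: :page }, { id: 2, type: :page, fail: true }])
daemon = PollLoopSketch.new(api, { page: page }, npcase_id: 42, poll_interval: 0)
Signal.trap("TERM") { daemon.request_stop }
daemon.run(max_polls: 3)
api.updates.each { |u| p u }
```

Because `request_stop` only sets a flag, a job already in progress completes before the loop exits; force-killing the child on a second TERM belongs to the watchdog layer and is not shown.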

EC2 Instance Lifecycle

Workers run on dedicated EC2 instances with auto-scaling:

  • Buffer box: Starts without npcase_id, polls buffer_box_work_request until assigned
  • Case-locked: Once assigned a case, the instance is locked to that case ID via file-based state (/mnt/npcase_id.dat)
  • Auto-shutdown: auto_shutdown_checker.rb (cron every 5 min) shuts down idle instances after 5 minutes of no jobs. Creates auto_shutdown_check.lock during evaluation — JobDaemon checks this lock to avoid fetching new jobs during shutdown evaluation. Respects .no_auto_shutdown file to disable.
  • Priority partitioning: Multiple daemons per instance with different priority restrictions (configured via EC2 user data)
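
The case lock and the idle check amount to a few file tests. In this hypothetical sketch, the file names and the 5-minute window come from this page; keeping all three files in one configurable directory (the docs place npcase_id.dat under /mnt), passing the last-job time as an argument, and the method names are assumptions made so the sketch runs against a temporary directory.

```ruby
require "tmpdir"

# Hypothetical sketch of the file-based case lock and the idle auto-shutdown check.
IDLE_LIMIT_SECONDS = 5 * 60

def case_file(base_dir)
  File.join(base_dir, "npcase_id.dat")
end

# nil means the instance is still a buffer box waiting for assignment.
def locked_case_id(base_dir)
  File.exist?(case_file(base_dir)) ? Integer(File.read(case_file(base_dir)).strip) : nil
end

# Locks the instance to a case; re-locking to the same case is a no-op.
def lock_to_case(base_dir, npcase_id)
  existing = locked_case_id(base_dir)
  raise "instance already locked to case #{existing}" if existing && existing != npcase_id

  File.write(case_file(base_dir), npcase_id.to_s)
end

# The daemon skips fetching jobs while the shutdown checker holds its lock file.
def may_fetch_job?(base_dir)
  !File.exist?(File.join(base_dir, "auto_shutdown_check.lock"))
end

# Idle for at least 5 minutes and not opted out via .no_auto_shutdown.
def should_shut_down?(base_dir, last_job_at, now: Time.now)
  return false if File.exist?(File.join(base_dir, ".no_auto_shutdown"))

  now - last_job_at >= IDLE_LIMIT_SECONDS
end

Dir.mktmpdir do |dir|
  p locked_case_id(dir) # => nil (buffer box)
  lock_to_case(dir, 42)
  p locked_case_id(dir) # => 42
end
```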

Integration Points with NGE

What NGE Replaced

| Legacy worker | NGE module | Status |
|---|---|---|
| PreprocessWorker + ContainerWorker | documentloader | Replaced (NGE) |
| ConversionWorker (file extraction) | documentextractor | Replaced (NGE) |
| PageWorker (image generation) | documentuploader | Replaced (NGE) |
| document_pdf_create_worker (PDF pages) | documentpageservice | Replaced (NGE) |
| ZipWorker / UnzipZipFromS3Worker | unzipservice | Replaced (NGE) |
| ExhibitZipVolumeWorker + ExhibitLoadfileWorker | documentexporter | Replaced (NGE) |

What Remains in Legacy

| Legacy worker | Purpose | NGE status |
|---|---|---|
| TranscodeWorker / VideoStitchWorker | Video processing | No NGE equivalent |
| ParseTranscriptWorker | Deposition transcript parsing | Hybrid: internal nge_enabled branching |
| TreatmentWorker | Litigation presentation images | Hybrid: uses Nutrient as the image source when nge_enabled |
| DownloadExhibitPdfWorker | PDF download/export | Hybrid: Nutrient API for NGE PDFs, S3 for Legacy |
| DepositionZipWorker | Deposition export packaging | Hybrid: skips batch creation for NGE batches |
| ImageManipulationWorker | Markup/redaction application | Partially in Nutrient |
| SpreadsheetConversionWorker | Spreadsheet conversion | No NGE equivalent |
| S3CaseDeleteWorker | Case cleanup | No NGE equivalent |
| NativePdfOcrWorker | PDF OCR with Nutrient | Hybrid Legacy+NGE |

Patterns to Preserve vs Deprecate

Preserve

  • Cascading job delegation — the router pattern (PreprocessWorker → specialist) is sound; NGE implements it differently via checkpoint steps
  • Memory watchdog — fork-based memory management is robust for external tool usage
  • Buffer box pattern — dynamic case assignment is useful for resource pooling
  • Progress tracking — percentage-based progress reporting to the API

Deprecate

  • HTTP polling for jobs — replaced by SNS/SQS event-driven triggers
  • XML-based API protocol — replaced by direct database access in NGE
  • Single-tenant EC2 instances — replaced by multi-tenant Lambda/ECS
  • File-based case locking — replaced by per-case database schema naming
  • Manual EC2 lifecycle — replaced by Lambda auto-scaling and ECS Fargate

Key File Locations

| File | Purpose |
|---|---|
| job_daemon.rb (root) | Entry point with CLI parsing |
| lib/job_daemon.rb | Core daemon loop and worker dispatch |
| lib/forking_memory_watcher.rb | Fork-based memory management |
| workers/job_worker.rb | Base worker class with handles macro |
| workers/preprocess_worker.rb | File type router |
| workers/container_worker.rb | Archive extraction (PST/MBOX/ZIP) |
| workers/conversion_worker.rb | Document → PDF conversion |
| workers/page_worker.rb | Page image generation + OCR |
| lib/nextpoint_importer.rb | Import framework |
| lib/nextpoint_email_parser.rb | Email parsing |
| system/auto_shutdown_checker.rb | EC2 idle shutdown |