A Ruby framework for building high-performance parallel data pipelines with multiple execution strategies (threads, fork, IPC).
```
┌─────────────────────────────────────────────────────────────────┐
│ Task (top-level coordinator)                                    │
│ └─ root_pipeline: Pipeline                                      │
│    ├─ stages: [Producer, Consumer, Router, ...]                 │
│    ├─ dag: Directed Acyclic Graph for routing                   │
│    └─ stats: Per-stage and pipeline-level metrics               │
└─────────────────────────────────────────────────────────────────┘
        ↓
        ├─→ Runner: Manages job lifecycle, signals, statistics
        │
        └─→ Pipeline.run() creates Workers for each stage
              │
              ├─→ Producer Worker (autonomous, no input)
              │     └─→ emit(item) → OutputQueue
              │
              ├─→ Transformer Worker (streaming, executor-managed)
              │     ├─→ InputQueue.pop() consumes items
              │     ├─→ execute(item) via Executor
              │     └─→ emit(result) → OutputQueue
              │
              └─→ Consumer Worker (streaming, executor-managed)
                    ├─→ InputQueue.pop() consumes items
                    └─→ execute(item) processes final data
```
```
1. Worker Thread Created (one per stage)
       ↓
2. StageContext Created (tracks sources, queues, stats)
       ↓
3. Executor Created (determines concurrency: thread, fork, etc.)
       ↓
4. Stage.run_stage(ctx) called
   ├─→ for Producer: emit all data, send EndOfSource
   ├─→ for Consumer: loop on InputQueue until EndOfStage
   └─→ End signals cascade to downstream stages
```
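The lifecycle above can be sketched with plain Ruby threads and a bounded queue per edge. This is a simplified model, not Minigun's implementation; `END_OF_SOURCE` stands in for the EndOfSource signal object:

```ruby
END_OF_SOURCE = Object.new  # stand-in for Minigun's EndOfSource signal

q1 = SizedQueue.new(1000)   # producer → transformer
q2 = SizedQueue.new(1000)   # transformer → consumer
results = []

producer = Thread.new do
  5.times { |i| q1 << i }   # emit(item) → OutputQueue
  q1 << END_OF_SOURCE       # send_end_signals()
end

transformer = Thread.new do
  loop do
    item = q1.pop
    if item.equal?(END_OF_SOURCE)
      q2 << END_OF_SOURCE   # cascade the end signal downstream
      break
    end
    q2 << item * 2          # execute(item), then emit(result)
  end
end

consumer = Thread.new do
  loop do
    item = q2.pop
    break if item.equal?(END_OF_SOURCE)
    results << item         # final processing
  end
end

[producer, transformer, consumer].each(&:join)
results  # => [0, 2, 4, 6, 8]
```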
| Type | Run Mode | Input | Output | Executor | Example |
|---|---|---|---|---|---|
| Producer | `:autonomous` | ✗ | ✓ | None | `producer { 10.times { \|i\| emit(i) } }` |
| Processor | `:streaming` | ✓ | ✓ | Thread/Fork | `processor { \|x\| emit(x*2) }` |
| Consumer | `:streaming` | ✓ | ✗ | Thread/Fork | `consumer { \|x\| puts x }` |
| Accumulator | `:streaming` | ✓ | ✓ | Thread/Fork | Batches N items before emit |
| Router | `:streaming` | ✓ | ✓ | None | Auto-inserted for fan-out |
| Pipeline | `:composite` | ✓ | ✓ | Special | Wraps nested pipeline |
| | Thread Pool (`:thread`) | COW Fork (`:cow_fork`) | IPC Fork (`:ipc_fork`) |
|---|---|---|---|
| Concurrency | Threads | Processes | Processes |
| Memory | Shared | Copy-on-write | IPC pipes |
| Constraint | GVL-limited | No GVL | Data serialized |
| Best for | I/O-bound | CPU-bound w/ large data | CPU-bound, long-running |
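The `:thread` strategy's shape can be sketched as a small pool draining one input queue until a sentinel arrives. Illustrative only; `ThreadPoolExecutor` and its `execute_stage` signature are invented for this sketch and need not match Minigun's `Executor`:

```ruby
# Toy thread-pool executor: N worker threads drain one shared input queue.
class ThreadPoolExecutor
  def initialize(threads: 4)
    @threads = threads
  end

  # Runs `block` for every item; each worker stops on its sentinel.
  def execute_stage(input_queue, sentinel, &block)
    workers = Array.new(@threads) do
      Thread.new do
        loop do
          item = input_queue.pop
          break if item.equal?(sentinel)
          block.call(item)
        end
      end
    end
    workers.each(&:join)
  end
end

DONE = Object.new
q = Queue.new
10.times { |i| q << i }
4.times { q << DONE }        # one sentinel per worker thread

sum = 0
mutex = Mutex.new
ThreadPoolExecutor.new(threads: 4).execute_stage(q, DONE) do |x|
  mutex.synchronize { sum += x }
end
sum  # => 45
```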
```ruby
dag.add_edge(stage_a, stage_b)  # Producer → Consumer
dag.upstream(stage)             # All sources
dag.downstream(stage)           # All sinks
dag.validate!                   # Cycle detection
```

```ruby
stats = pipeline.stats
stats.for_stage(stage).items_produced
stats.for_stage(stage).items_consumed
stats.bottleneck                # Slowest stage
stats.throughput                # items/second
```

```
Queue → SizedQueue (1000 default) → InputQueue → ConsumerStage
  ↑                                     ↓
  └─ emit(item) from Producer      pop() loops until EndOfStage
```

```
Producer.execute() completes
  ↓ send_end_signals()
  ↓ EndOfSource to all downstreams
InputQueue.pop() receives EndOfSource from ALL upstreams
  ↓ returns EndOfStage sentinel
ConsumerStage.execute() breaks from loop
  ↓ send_end_signals()
  ↓ cascade continues downstream
```
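The "EndOfSource from ALL upstreams" rule amounts to counting sentinels before surfacing EndOfStage. A minimal sketch, with `CountingInputQueue` as a hypothetical stand-in for Minigun's InputQueue:

```ruby
END_OF_SOURCE = Object.new
END_OF_STAGE  = Object.new

# Wraps a shared queue; returns END_OF_STAGE only after every upstream
# stage has delivered its END_OF_SOURCE signal.
class CountingInputQueue
  def initialize(queue, upstream_count)
    @queue = queue
    @remaining = upstream_count
  end

  def pop
    loop do
      item = @queue.pop
      return item unless item.equal?(END_OF_SOURCE)
      @remaining -= 1
      return END_OF_STAGE if @remaining.zero?
      # other upstreams are still live; keep consuming
    end
  end
end

q = Queue.new
input = CountingInputQueue.new(q, 2)  # two upstream stages

q << 1
q << END_OF_SOURCE   # upstream A finishes
q << 2
q << END_OF_SOURCE   # upstream B finishes

seen = []
loop do
  item = input.pop
  break if item.equal?(END_OF_STAGE)
  seen << item
end
seen  # => [1, 2]
```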
```
emit(item)
  ↓
OutputQueue → downstream_queues[*]
  ↓
SizedQueue (bounded, provides backpressure)
  ↓
InputQueue.pop() → item
  ↓
executor → Thread.new { execute(item) }
  ↓
emit(result) → next stage's queues
```
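The `downstream_queues[*]` step means one `emit` pushes the item to every downstream queue. A toy sketch (the `FanOutOutput` class is invented for illustration):

```ruby
# Toy OutputQueue: a single emit fans the item out to all downstream queues.
class FanOutOutput
  def initialize(downstream_queues)
    @downstream_queues = downstream_queues
  end

  def emit(item)
    @downstream_queues.each { |q| q << item }
  end
end

branch_a = Queue.new
branch_b = Queue.new
out = FanOutOutput.new([branch_a, branch_b])

out.emit(:job)
a = branch_a.pop  # => :job
b = branch_b.pop  # => :job
```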
```ruby
stats = pipeline.stats
puts "Throughput: #{stats.throughput} items/s"
puts "Bottleneck: #{stats.bottleneck.stage_name}"
```

```ruby
# Add hooks to Runner/Pipeline
on_statistics_update { |stats| hud.update(stats) }
on_stage_complete { |stage, stats| hud.mark_done(stage) }
```

```ruby
result = runner.run
stats = task.root_pipeline.stats
# Access metrics after execution
```

Per-stage:

- `items_produced` - total items emitted
- `items_consumed` - total items processed
- `throughput` - items/second (derived)
- `latency_samples` - p50, p95, p99 latency
- `start_time`, `end_time`

Pipeline-level:

- `total_produced` - total items from all stages
- `total_consumed` - total items processed
- `bottleneck()` - slowest stage
- `throughput` - overall items/second
- `runtime` - `end_time - start_time`
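One way the p50/p95/p99 figures in `latency_samples` could be computed is the nearest-rank method; a sketch under that assumption, not necessarily how Minigun derives them:

```ruby
# Nearest-rank percentile over collected latency samples (in seconds).
def percentile(samples, pct)
  sorted = samples.sort
  rank = (pct / 100.0 * sorted.length).ceil - 1
  sorted[[rank, 0].max]
end

samples = (1..100).map { |ms| ms / 1000.0 }  # 1ms..100ms, hypothetical data
percentile(samples, 50)  # => 0.05
percentile(samples, 95)  # => 0.095
percentile(samples, 99)  # => 0.099
```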
```
lib/minigun/
├── minigun.rb          # Module entry, Platform.fork check
├── pipeline.rb         # DAG management, stage orchestration
├── stage.rb            # Stage hierarchy (Producer/Consumer/Router)
├── worker.rb           # Thread wrapper, lifecycle
├── runner.rb           # Job lifecycle, signal handling
├── task.rb             # Top-level coordinator
├── dag.rb              # Graph w/ TSort
├── stats.rb            # Per-stage metrics (atomic counters, latency)
├── queue_wrappers.rb   # InputQueue/OutputQueue abstractions
├── dsl.rb              # User-facing DSL
├── execution/
│   └── executor.rb     # Concurrency strategies
└── signal.rb           # Signal objects (EndOfSource, EndOfStage)

examples/               # 100+ example pipelines
spec/                   # Tests
```
The stage with minimum throughput limits overall performance:
```
Pipeline throughput = min(stage.throughput for all stages)
```
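For example, with hypothetical per-stage throughput figures (stage names are illustrative):

```ruby
# Hypothetical per-stage throughputs (items/second).
throughputs = { extract: 5_000, transform: 1_200, load: 3_000 }

# The pipeline can only move items as fast as its slowest stage.
bottleneck_name, pipeline_throughput = throughputs.min_by { |_, tput| tput }
bottleneck_name      # => :transform
pipeline_throughput  # => 1200
```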
Automatic flow control via bounded queues:
```
Producer fast → OutputQueue fills (SizedQueue max=1000)
             → emit() blocks waiting for consumer
             → Consumer catches up → queue drains
```
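`SizedQueue#push` blocking at capacity is exactly this mechanism; a small demonstration with the capacity shrunk to 2 for illustration:

```ruby
q = SizedQueue.new(2)        # bounded queue: capacity 2 instead of 1000

producer = Thread.new do
  5.times { |i| q << i }     # the 3rd push blocks until the consumer pops
  q << :done
end

sleep 0.05                   # let the producer run ahead and fill the queue
filled = q.size              # queue at capacity; producer blocked on 3rd push

consumed = []
loop do
  item = q.pop               # draining the queue unblocks the producer
  break if item == :done
  consumed << item
end
producer.join

consumed  # => [0, 1, 2, 3, 4]
```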
Route items to stages without a DAG connection:

```ruby
output.to(:specific_stage, item)  # Send to a specific stage
output.to(:other_stage) << item   # Alternative syntax
```

Run mode determines stage behavior:

- `:autonomous` - generates data (Producer)
- `:streaming` - processes queue items (Consumer/Router)
- `:composite` - manages a nested pipeline (PipelineStage)
Basic ETL:

```ruby
producer(:extract)    { data.each { emit(_1) } }
processor(:transform) { |x| emit(transform(x)) }
consumer(:load)       { |x| database.insert(x) }
```

Per-stage concurrency:

```ruby
producer(:source) { data.each { emit(_1) } }
processor(:heavy, threads: 8) { |x| emit(expensive(x)) }
consumer(:store)  { |x| cache.set(x) }
```

Fan-out to parallel branches:

```ruby
producer(:source, to: [:branch_a, :branch_b]) { data.each { emit(_1) } }
processor(:branch_a) { |x| emit(path_a(x)) }
processor(:branch_b) { |x| emit(path_b(x)) }
```

Batching with an accumulator:

```ruby
producer(:source) { items.each { emit(_1) } }
accumulator(:batch, max_size: 100) { |item| emit(item) }
consumer(:process) { |batch| process_batch(batch) }
```

Pipeline:

- `add_stage(type, name, options, &block)` - Register a stage
- `run(context, job_id: nil)` - Execute the pipeline
- `build_dag_routing!()` - Validate the DAG
- `find_stage(name)` - Look up a stage
Stage:

- `run_mode()` - Returns `:autonomous`, `:streaming`, or `:composite`
- `execute(context, input_queue, output_queue, stats)` - Process items
- `emit(item)` - Send an item downstream (via output_queue)
- `queue_size()` - Get the configured queue size

Worker:

- `start()` - Create the execution thread
- `join()` - Wait for completion

Stats:

- `for_stage(stage)` - Get stage stats
- `throughput()` - items/second
- `bottleneck()` - Slowest stage
- `increment_produced(count)` - Atomic counter

Executor:

- `execute_stage(stage, context, input_queue, output_queue)` - Execute concurrently
- `shutdown()` - Clean up resources
Architecture Files (5 critical files):

- `/lib/minigun.rb` - Module setup
- `/lib/minigun/pipeline.rb` - DAG & orchestration
- `/lib/minigun/stage.rb` - Stage types
- `/lib/minigun/worker.rb` - Thread lifecycle
- `/lib/minigun/execution/executor.rb` - Concurrency strategies

Supporting Files:

- `dag.rb` - Routing graph
- `stats.rb` - Metrics
- `runner.rb` - Job lifecycle
- `task.rb` - Top-level coordinator
- `queue_wrappers.rb` - Queue abstractions
- `dsl.rb` - User DSL