Minigun is a high-performance data processing pipeline framework for Ruby with support for threads, COW forks, and IPC forks. This document outlines a comprehensive enhancement plan based on the current codebase state and TODOS.md.
- Cross-Boundary Routing ✓ (Completed 2025-01-04)
- Remove "skip" from hanging example tests
- IPC fork getting input via
tofrom various sources (IPC, COW, threads, master) - IPC fork doing output routing
- IPC to COW, COW to IPC, IPC to master routing
- IPC/COW fan-out/fan-in patterns
- Routing patterns
- output.to of IpcQueues - implemented via IpcRoutedOutputQueue
- cow_fork getting IPC input via to from IPC
- cow_fork getting IPC input via to from COW
- cow_fork getting IPC input via to from threads
- cow_fork getting IPC input via to from master
- cow_fork doing IPC output - COW now uses IpcOutputQueue
- ipc 2 cow, cow to ipc, ipc to master - all working
- ipc/cow fan-out/fan-in - examples 80, 81, 82, 84 working
- routing to inner stages of pipelines
- routing to inner stages of cow and ipc fork via an ingress delegator
- Additional scenarios
- test reroute with IPC/COW complex scenarios, inner routing, etc. - all tests passing
- producers inside IPC/COW forks
- routing with multiple forked processes - round-robin via IPC workers
- start of IPC/COW stage should not require await - added await: true option
- :worker_finished event seems like it should not work like it does. It resends back into the master... hmmm
- cleanup pipeline, etc constructor args
- wait_for_first_item implmentation look wonky
- make StageContext and actual class
- Transmit stats across forks
- Transmit logs across forks--look at Puma
- Support MINIGUN_LOG_LEVEL var
-
Initial HUD work:
- make a HUD inspired by htop to run as part of CLI
- two columns:
- LHS: flow diagram (ascii) on the other side, with ascii flow animations. make it inspired by cyberpunk (blade-runner/matrix/hackers)
- RHS: list of processes on one side
- use keys to navigate the hud.
- use ascii colors
- Before doing anything, plan it all out.
-
Running hud
- task.hud to run in IRB/Rails console
- Test task.hud to run in rake
-
Add hud to all examples when running
- Add idiomatic representation for each example
-
HUD UI improvement
- Introduce Hud::DiagramStage and DiagramPipeline/Executor
- Re-add throughput and bottleneck icons to stages
- Improve animations, use 24-frame counter (or just int counter which rolls over?)
- Arrows on lines?
- fix up/down of stages (not clearing lines)
- auto-size width of stage columns
- p95 rather than p99?
- HUD IPC support
- Process tree, forked routing
- process wrappers
-
HUD QoL
- % completion metrics
- CPU / MEM / disk / processes / queues
- tab menu?
- Error/log stream at bottom
-
Graceful shutdown
- signal trapping, child state management/killing
- Kill child threads/forks/ractors
- Ctrl+C once to start graceful shutdown (send end signals from all producers)
- Press Ctrl+C again to force quit.
-
to_mermaid
-
child culling (look at puma)
-
supervision tree of processes
-
htop-like monitoring dashboard (CLI)
-
Interactive examples in web docs
-
ASCII art drawing of tree
-
IPC batches?
-
Acks on queued items, guaranteed delivery?
-
Hooks (fork, stage, nesting)
-
Add process supervision tree?
-
IPC Reliability
- Address potential reliability issues with IPC
- Add timeout handling
- Handle pipe failures gracefully
- Stats reporting back to parent process via IPC
-
Execution Context Improvements
- Implement proposed DSL:
threads(10) do ... end processes(10) do ... end ractors(10) do ... end thread_per_batch(max: 10) do ... end process_per_batch(max: 10) do ... end ractor_per_batch(max: 10) do ... end
- Implement proposed DSL:
Priority: HIGH
-
Comprehensive Error Handling
- Define error handling strategy for each executor type
- Implement retry mechanisms with backoff
- Add circuit breaker patterns for failing stages
- Handle errors in hooks properly
-
Process Management
- Signal trapping for graceful shutdown
- Child process state management
- Child process culling (reference Puma's implementation)
- Supervision tree for processes
- Wait for last forked process to finish properly
Priority: MEDIUM
-
Batch Operators
batch- create batches from streamdebatch- flatten batches back to streamrebatch- change batch size
-
Flush Timers
- Time-based batch flushing
- Consolidate accumulator and batch logic
Priority: MEDIUM
-
Fiber Support
- Add Fiber-based executor
- Implement fiber pool
- Add
fibers(n)DSL method - Test fiber interop with other executors
-
Ractor Support (Ruby 3.0+)
- Complete ractor executor implementation
- Add examples for ractor usage
- Test ractor limitations and workarounds
Priority: MEDIUM
- Advanced Routing
- Weighted routing / load balancing
- Better name resolution (local -> up the chain)
- Routing to inner stages of pipelines (double nested)
- Ambiguous routing error handling (error unless immediate neighbor)
- Aliases:
produce_to_stage,consume_from_stage
Priority: MEDIUM
-
Configuration Infrastructure
- Global configuration object
- Pipeline-level scoped limits
- Stage-level configuration overrides
- Configurable queue lengths per stage
- Validation: warn if stage explicit value > global limit
-
Configuration Options
min_threads,max_threadsmax_processes,max_ractorsmax_retries,batch_sizekeepalive_seconds(nil, 0, 5, infinity)- Mark & sweep GC for keepalive management
Priority: MEDIUM
- Hook System
- Implement comprehensive hook DSL
before_fork,after_fork- currently exist, test coverageafter_each_fork- per-fork executionafter_each- generic lifecycle hookrescue_error- error handling hook (accepts error class)- Hook prepending mechanism
- Test that pipeline-level hooks apply to ALL children
- Hooks for nesting scenarios
Priority: MEDIUM
-
README Updates
- Document sequential routing vs explicit routing (
to/from) - Document parallel execution behavior
- Document fan-out/fan-in architecture:
- Every consumer has an input queue
- Fan-out adds producer output queues + router (load balancer)
- Fan-in connects producers directly to consumer queue
- Document
emit_to_stagedirect routing - Document yield syntax (classes use
yieldinstead of|output|) - String/symbol name handling (always convert to string)
- Document sequential routing vs explicit routing (
-
Architecture Documentation
- Multi-parent scenarios - end of queue detection
- Threading model vs fork model
- Concurrent execution abstractions (ractor, thread, fork)
- Pipeline to stage, stage to pipeline routing patterns
- Make pipeline and DAG accessible/inspectable
-
API Documentation
- Document all DSL methods
- Document stage lifecycle
- Document executor selection criteria
- Add more real-world examples
Priority: MEDIUM
-
Stats & Monitoring
- Move stats tracking from Runner to Task
- Make per-item latency tracking optional
- Make stats collection optional for performance
- IPC stats reporting back to parent
- Verbose logging of fork events
- Custom logging levels per stage
-
Monitoring Dashboard
- htop-like CLI dashboard
- Real-time pipeline visualization
- Stage throughput metrics
- Queue depth monitoring
- Worker pool utilization
-
Visualization
- Generate Mermaid diagrams from pipeline definition
- DAG visualization
- Execution flow diagrams
Priority: LOW
-
Dynamic Scaling
- Auto-scale thread/process pools based on queue depth
- Adaptive pipeline behavior
- Dynamic stage addition/removal
-
Named Stage Keepalive
- Keep named stages alive for reuse
- Keepalive timeout configurations
- Mark & sweep GC for stage lifecycle
Priority: LOW
-
Control Flow Keywords
parallelblocks for explicit parallel executionsequential/sequence/seriesblocks- Influence DAG building based on keywords
-
Custom Stage Types
- Better support for custom stage classes
- Stage class registry
- Plugin system for third-party stages
Priority: LOW
-
Pipeline Coordination
- Lock DAG/pipelines/task when running (prevent modification)
- Auto-start triggers:
auto_start = false,trigger(:stage) - Pipeline/stage introspection methods:
task.pipeline(:foo) task.stages(:bar) task.minigun.dag task.pipeline(:foo).stage(:bar) task.pipelines task.stages
-
Better ID Generation
- Replace current ID generation with better system
- Consider format like
_jf02ASj3 - Ensure uniqueness across distributed scenarios
-
Input/Output Streams
- Harden with
InputOutputStreamabstraction - Unified interface for queue operations
- Better separation of concerns
- Harden with
-
Unit Tests
- Stats tracking for all stage types
- All executor types
- Hook execution and propagation
- Signal handling
- Error propagation
-
Integration Tests
- Cross-pipeline routing scenarios
- Complex DAG topologies
- All executor combinations
- Graceful shutdown scenarios
-
Example Tests
- All examples in
/examplesshould have specs - Examples should demonstrate best practices
- Extract more real-world use cases
- All examples in
- Stage Names: Already completed (name as first arg)
- Executor Naming: Consider renaming for clarity
threaded→spawn_threads?fork_accumulate→spawn_forks?ractor_accumulate→spawn_ractors?
- Signal System: New signal classes may change APIs
- Hook System: New hook names may conflict
- Stage to Worker refactoring
- Signal system cleanup
- Error handling improvements
- Process management
- Batch operators
- Fiber support
- Configuration system
- Hook system completion
- Documentation updates
- Monitoring dashboard
- Mermaid diagram generation
- More examples
- Dynamic scaling
- Advanced routing
- Performance optimizations
- API stabilization for 1.0
-
Naming: What should the gem be called?
- Current: minigun
- Alternatives: nordstream, permian (from todos)
-
Execution Context: Should execution contexts be implicit or explicit?
- Risk of conflicts with base context
-
Pipeline Locking: Should running pipelines be immutable?
- Pro: Safety, no race conditions
- Con: Less flexibility for dynamic scenarios
-
Stats Overhead: Should stats be opt-in or opt-out?
- Consider performance-critical scenarios
-
Cross-Executor Routing: How to handle IPC/COW/thread boundaries?
- Serialization requirements
- Performance implications
# From TODOS.md - marked as suspicious
def execute(context, item: nil, _input_queue: nil, output_queue: nil)
return unless @block
context.instance_exec(item, output_queue, &@block)
endThis method should be reviewed for:
- Safety of instance_exec
- Parameter naming consistency
- Error handling
# From TODOS.md - registry registration pattern
pipeline_name = @pipeline.is_a?(Pipeline) ? @pipeline.name : nil
task.registry.register(self, pipeline_name: pipeline_name)This pattern is already implemented but may need review.
- All tests passing on Ruby 3.0+, 3.1+, 3.2+, 3.3+
- JRuby compatibility maintained
- Documentation completeness >90%
- Example coverage for all major features
- Performance benchmarks established
- Memory leak testing automated
- Production usage in at least 3 projects
- CONTRIBUTING.md with development setup
- Code style guide (RuboCop configuration)
- PR template
- Issue templates
- Changelog maintenance process
- Release process documentation