@@ -27,10 +27,11 @@ Roboflow is a distributed data transformation pipeline that converts robotics ba
 |-------|---------|-----------|
 | `roboflow-core` | Foundation types, error handling, registry | `RoboflowError`, `CodecValue`, `TypeRegistry` |
 | `roboflow-storage` | Storage abstraction layer | `Storage`, `LocalStorage`, `S3Storage`, `StorageFactory` |
-| `roboflow-dataset` | Dataset format writers | `LerobotWriter`, `DatasetWriter`, `ImageData` |
-| `roboflow-distributed` | Distributed coordination via TiKV | `TiKVClient`, `BatchController`, `Worker`, `Catalog` |
-| `roboflow-sources` | Data source implementations | `BagSource`, `McapSource`, `RrdSource` |
-| `roboflow-sinks` | Data sink implementations | `LerobotSink`, `ZarrSink`, `DatasetFrame` |
+| `roboflow-executor` | Stage-based task executor | `StageExecutor`, `Pipeline`, `ExecutionPolicy`, `SlotPool` |
+| `roboflow-media` | Image and video encoding/decoding | `ImageData`, `VideoEncoder`, `ConcurrentVideoEncoder` |
+| `roboflow-dataset` | Dataset format writers and sources | `LerobotWriter`, `DatasetWriter`, `Source`, `BagSource`, `McapSource` |
+| `roboflow-pipeline` | Pipeline execution and stages | `DatasetPipelineExecutor`, `DiscoverStage`, `ConvertStage`, `MergeStage` |
+| `roboflow-distributed` | Distributed coordination via TiKV | `TiKVClient`, `BatchController`, `Worker`, `Scanner`, `Finalizer` |
 
 ## Core Abstractions
 
@@ -55,21 +56,33 @@ trait SeekableStorage: Storage {
 - **S3**: AWS S3-compatible storage
 - **OSS**: Alibaba Cloud Object Storage
 
-### Pipeline Stages
+### Source/Sink Pattern
 
 ```rust
 trait Source: Send + Sync {
     async fn initialize(&mut self, config: &SourceConfig) -> SourceResult<SourceMetadata>;
     async fn read_batch(&mut self, size: usize) -> SourceResult<Option<Vec<TimestampedMessage>>>;
     async fn finalize(&mut self) -> SourceResult<SourceStats>;
 }
+```
+
+**Supported sources:**
+- **MCAP**: Streaming and memory-mapped reads
+- **ROS1 Bag**: Legacy bag format support
+- **RRD**: Rerun data format
+
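The `Source` contract above can be exercised with a toy in-memory implementation. This is a hedged sketch only: the async signatures are collapsed to synchronous ones, and `TimestampedMessage` and `VecSource` below are local stand-ins, not roboflow's actual types.

```rust
// Simplified, synchronous sketch of the Source batching contract.
// All types here are local stand-ins, not roboflow definitions.
#[derive(Clone)]
struct TimestampedMessage {
    timestamp_ns: u64,
    payload: Vec<u8>,
}

struct VecSource {
    messages: Vec<TimestampedMessage>,
    cursor: usize,
}

impl VecSource {
    /// Mirrors `read_batch`: up to `size` messages, `None` once exhausted.
    fn read_batch(&mut self, size: usize) -> Option<Vec<TimestampedMessage>> {
        if self.cursor >= self.messages.len() {
            return None;
        }
        let end = (self.cursor + size).min(self.messages.len());
        let batch = self.messages[self.cursor..end].to_vec();
        self.cursor = end;
        Some(batch)
    }
}

fn main() {
    let msgs = (0..3)
        .map(|i| TimestampedMessage { timestamp_ns: i, payload: Vec::new() })
        .collect();
    let mut src = VecSource { messages: msgs, cursor: 0 };
    assert_eq!(src.read_batch(2).unwrap().len(), 2); // first batch
    assert_eq!(src.read_batch(2).unwrap().len(), 1); // remainder
    assert!(src.read_batch(2).is_none()); // exhausted
}
```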
+### Pipeline Stages
 
-trait Sink: Send + Sync {
-    async fn initialize(&mut self, config: &SinkConfig) -> SinkResult<()>;
-    async fn write_frame(&mut self, frame: DatasetFrame) -> SinkResult<()>;
-    async fn flush(&mut self) -> SinkResult<()>;
-    async fn finalize(&mut self) -> SinkResult<SinkStats>;
-    fn supports_checkpointing(&self) -> bool;
+```rust
+// Stage-based execution inspired by Spark
+pub struct DiscoverStage;
+pub struct ConvertStage;
+pub struct MergeStage;
+
+// Pipeline executor
+pub struct DatasetPipelineExecutor {
+    writer: Box<dyn DatasetWriter>,
+    config: DatasetPipelineConfig,
 }
 ```
 
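The Discover → Convert → Merge flow can be sketched as a fold over boxed stages. Everything below (the `Stage` trait, string-based work items) is invented for illustration; roboflow's real `StageExecutor`/`DatasetPipelineExecutor` API is not shown by this diff.

```rust
// Illustrative stage-based execution: each stage maps a list of work
// items to the next stage's input. Not roboflow's actual executor.
trait Stage {
    fn run(&self, input: Vec<String>) -> Vec<String>;
}

struct DiscoverStage; // discovery: forward the found inputs
struct ConvertStage;  // conversion: one output per input
struct MergeStage;    // merge: collapse outputs into one artifact

impl Stage for DiscoverStage {
    fn run(&self, input: Vec<String>) -> Vec<String> {
        input // in this sketch, discovery just passes the file list through
    }
}

impl Stage for ConvertStage {
    fn run(&self, input: Vec<String>) -> Vec<String> {
        input.into_iter().map(|f| format!("{f}.converted")).collect()
    }
}

impl Stage for MergeStage {
    fn run(&self, input: Vec<String>) -> Vec<String> {
        vec![input.join("+")] // single merged artifact
    }
}

/// Run stages in order, feeding each stage's output to the next.
fn run_pipeline(stages: &[Box<dyn Stage>], input: Vec<String>) -> Vec<String> {
    stages.iter().fold(input, |acc, stage| stage.run(acc))
}

fn main() {
    let stages: Vec<Box<dyn Stage>> =
        vec![Box::new(DiscoverStage), Box::new(ConvertStage), Box::new(MergeStage)];
    let out = run_pipeline(&stages, vec!["a.bag".to_string(), "b.bag".to_string()]);
    assert_eq!(out, vec!["a.bag.converted+b.bag.converted".to_string()]);
}
```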
@@ -117,18 +130,19 @@ The distributed system uses a Kubernetes-inspired design with TiKV as the contro
 | kubelet heartbeat | HeartbeatManager | Worker liveness |
 | Finalizers | Finalizer controller | Cleanup handling |
 | Job/CronJob | BatchSpec, WorkUnit | Work scheduling |
+| Scheduler | Scanner | File discovery and job creation |
 
 ### Batch State Machine
 
 ```
 ┌──────────┐    ┌─────────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
 │ Pending  │───▶│ Discovering │───▶│ Running  │───▶│ Merging  │───▶│ Complete │
 └──────────┘    └─────────────┘    └──────────┘    └──────────┘    └──────────┘
-     │
-     ▼
-┌──────────┐
-│  Failed  │
-└──────────┘
+                                         │
+                                         ▼
+                                    ┌──────────┐
+                                    │  Failed  │
+                                    └──────────┘
 ```
 
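The diagram maps naturally onto an enum plus a transition predicate. The sketch below is an inference from the diagram alone: the `Failed` edge is assumed to hang off `Running` as drawn, which roboflow's source may generalize.

```rust
// Sketch of the batch state machine diagrammed above; the Failed edge's
// origin (Running) is assumed from the diagram, not confirmed by source.
#[derive(Clone, Copy)]
enum BatchState {
    Pending,
    Discovering,
    Running,
    Merging,
    Complete,
    Failed,
}

/// True when `from -> to` is an edge in the diagram.
fn can_transition(from: BatchState, to: BatchState) -> bool {
    use BatchState::*;
    matches!(
        (from, to),
        (Pending, Discovering)
            | (Discovering, Running)
            | (Running, Merging)
            | (Running, Failed)
            | (Merging, Complete)
    )
}

fn main() {
    use BatchState::*;
    assert!(can_transition(Pending, Discovering));
    assert!(can_transition(Running, Failed)); // assumed failure edge
    assert!(!can_transition(Complete, Failed)); // terminal states have no exits
}
```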
 ### TiKV Key Structure
@@ -142,6 +156,27 @@ roboflow/worker/{pod_id}/lock → LockRecord
 roboflow/worker/{pod_id}/checkpoint → CheckpointState
 ```
 
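The key layout above lends itself to small formatting helpers. The key templates come from the listing; the function names below are invented for this sketch.

```rust
// Hypothetical helpers that render the worker keys listed above.
fn worker_lock_key(pod_id: &str) -> String {
    format!("roboflow/worker/{pod_id}/lock")
}

fn worker_checkpoint_key(pod_id: &str) -> String {
    format!("roboflow/worker/{pod_id}/checkpoint")
}

fn main() {
    assert_eq!(worker_lock_key("pod-0"), "roboflow/worker/pod-0/lock");
    assert_eq!(
        worker_checkpoint_key("pod-0"),
        "roboflow/worker/pod-0/checkpoint"
    );
}
```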
+## CLI Commands
+
+The unified `roboflow` binary provides all operations:
+
+```bash
+# Run unified service (default: all roles)
+roboflow run
+
+# Run specific roles
+roboflow run --role worker
+roboflow run --role finalizer
+
+# Job management
+roboflow submit s3://bucket/file.bag --output s3://bucket/out/
+roboflow jobs list
+roboflow batch list
+
+# Health check
+roboflow health
+```
+
 ## Dataset Writing
 
 ### LeRobot Format
@@ -151,23 +186,30 @@ struct LerobotConfig {
     pub dataset: DatasetConfig,
     pub mappings: Vec<Mapping>,
     pub video: VideoConfig,
-    pub flushing: FlushingConfig,  // Incremental flushing
+    pub flushing: FlushingConfig,
+    pub streaming: StreamingConfig,
 }
 
-struct FlushingConfig {
-    pub max_frames_per_chunk: usize,       // Default: 1000
-    pub max_memory_bytes: usize,           // Default: 2GB
-    pub incremental_video_encoding: bool,
+struct StreamingConfig {
+    pub finalize_metadata_in_coordinator: bool,
 }
 ```
 
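For orientation, `LerobotConfig` might be driven by a TOML file shaped like the sketch below. Only `finalize_metadata_in_coordinator` appears in the diff; the section names follow the struct fields, and every other key is a placeholder assumption.

```toml
# Hypothetical layout; section names mirror LerobotConfig's fields.
[dataset]
# ... dataset identity and layout options ...

[video]
# ... codec and quality options ...

[flushing]
# ... flush thresholds ...

[streaming]
finalize_metadata_in_coordinator = true
```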
-### Incremental Flushing
+### Video Encoding
 
-To prevent OOM on long recordings, the writer processes data in chunks:
+```rust
+// Concurrent video encoder for parallel chunk encoding
+pub struct ConcurrentVideoEncoder {
+    config: ConcurrentEncoderConfig,
+}
 
-1. **Frame-based**: Flush after N frames (configurable, default 1000)
-2. **Memory-based**: Flush when memory exceeds threshold (default 2GB)
-3. **Output structure**: `data/chunk-000/`, `data/chunk-001/`, etc.
+pub struct ConcurrentEncoderConfig {
+    pub storage: Arc<dyn Storage>,
+    pub key_prefix: String,
+    pub codec: VideoCodec,
+    pub crf: u8,
+}
+```
 
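The idea behind concurrent chunk encoding can be sketched with plain threads: split the frame stream into fixed-size chunks and encode each in parallel, collecting results in chunk order. The chunking scheme and the stand-in `encode_chunk` below are illustrative, not roboflow's `ConcurrentVideoEncoder` internals.

```rust
// Parallel-chunk sketch: fixed-size chunks, one thread per chunk.
use std::thread;

/// Stand-in for real video encoding; returns the chunk's frame count.
fn encode_chunk(frames: &[u32]) -> usize {
    frames.len()
}

/// Encode chunks in parallel and collect results in chunk order.
fn encode_concurrent(frames: Vec<u32>, chunk_size: usize) -> Vec<usize> {
    let mut handles = Vec::new();
    for chunk in frames.chunks(chunk_size) {
        let chunk = chunk.to_vec(); // move an owned copy into the thread
        handles.push(thread::spawn(move || encode_chunk(&chunk)));
    }
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    // 10 frames in chunks of 4 -> chunk sizes 4, 4, 2
    assert_eq!(encode_concurrent((0..10).collect(), 4), vec![4, 4, 2]);
}
```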
 ### Upload Coordinator
 
@@ -176,7 +218,6 @@ struct EpisodeUploadCoordinator {
     pub storage: Arc<dyn Storage>,
     pub config: UploadConfig,
     pub progress: Option<UploadProgress>,
-    // Worker pool for parallel uploads
 }
 
 struct UploadConfig {
@@ -213,7 +254,7 @@ let data = arena.alloc_vec::<u8>(size);
 
 ```toml
 [source]
-type = "mcap"  # or "bag", "rrd", "hdf5"
+type = "mcap"  # or "bag", "rrd"
 path = "s3://bucket/path/to/data.mcap"
 
 # Optional: topic filtering
@@ -325,17 +366,15 @@ enum CircuitState {
 
 | Flag | Purpose |
 |------|---------|
-| `distributed` | TiKV distributed coordination (always enabled) |
-| `dataset-hdf5` | HDF5 dataset format support |
-| `dataset-parquet` | Parquet dataset format support |
-| `cloud-storage` | S3/OSS cloud storage support |
-| `gpu` | GPU compression (Linux only) |
 | `jemalloc` | jemalloc allocator (Linux only) |
 | `cli` | CLI support for binaries |
+| `profiling` | Profiling support for profiler binary |
+| `cpuid` | CPU-aware WindowLog detection (x86_64 only) |
+| `io-uring-io` | io_uring support for Linux 5.6+ |
 
 ## See Also
 
 - `CLAUDE.md` - Developer guidelines and conventions
-- `tests/s3_pipeline_tests.rs` - Integration tests
-- `crates/roboflow-dataset/src/lerobot/` - Dataset writer implementation
+- `tests/` - Integration and E2E tests
+- `crates/roboflow-dataset/src/` - Dataset writer and source implementations
 - `crates/roboflow-distributed/src/` - Distributed coordination