Description
Currently, all `TableProvider` implementations (System, User, Shared, Stream) use an eager-loading strategy: when a query is executed, the `scan()` method fetches every matching row from RocksDB into a Rust `Vec`, converts the rows into a single large Arrow `RecordBatch`, and wraps that batch in a `MemoryExec`.
The Problem:
- High Latency (TTFB): a query cannot start processing until the entire dataset has been loaded from RocksDB and deserialized; this adds roughly 5 ms of latency even for small result sets due to allocation overhead.
- Unbounded Memory: querying a large table (e.g., 1 million audit logs) loads all 1 million structs into RAM at once, risking out-of-memory (OOM) crashes.
- No Pipelining: DataFusion cannot process data in chunks; it must wait for the full read to complete.
The Goal:
Refactor the storage layer to implement Streaming Execution. The `scan()` method should return a custom `ExecutionPlan` that yields `RecordBatch`es lazily (e.g., in chunks of 1024 rows) as the query engine requests them.
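The intended difference can be sketched with plain iterators, using std types as stand-ins for RocksDB and Arrow (`counted_source`, `scan_eager`, and `scan_streaming` are all illustrative; `counted_source` tracks how many rows the consumer has actually pulled, which is what drives time-to-first-batch):

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Toy row source standing in for a RocksDB iterator; counts how many
/// rows the consumer has actually pulled so far.
fn counted_source(total: u64, pulled: Rc<Cell<u64>>) -> impl Iterator<Item = u64> {
    (0..total).inspect(move |_| pulled.set(pulled.get() + 1))
}

/// Eager strategy (current behavior): every row is materialized into a
/// Vec before the first batch can be handed to the engine.
fn scan_eager(rows: impl Iterator<Item = u64>) -> Vec<Vec<u64>> {
    let all: Vec<u64> = rows.collect(); // full table resident in RAM
    all.chunks(1024).map(|c| c.to_vec()).collect()
}

/// Streaming strategy (the goal): a lazy iterator that reads only
/// enough rows to fill the next 1024-row batch.
fn scan_streaming(mut rows: impl Iterator<Item = u64>) -> impl Iterator<Item = Vec<u64>> {
    std::iter::from_fn(move || {
        let batch: Vec<u64> = rows.by_ref().take(1024).collect();
        if batch.is_empty() { None } else { Some(batch) }
    })
}
```

With the streaming shape, producing the first batch of a million-row table touches only 1024 rows; the eager shape pulls all of them first.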
Technical Implementation Plan
1. Update EntityStore Trait (kalamdb-store)
The current `scan_all` method returns `Result<Vec<T>>`. Add a streaming equivalent, `scan_iter(...) -> Result<impl Iterator<Item = Result<T>>>`, to the `EntityStore` trait to allow row-by-row retrieval from RocksDB without allocating a vector for the full result set.
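A possible shape for the trait change, sketched with an in-memory `BTreeMap` standing in for RocksDB (`StoreResult` and `MockStore` are illustrative; the iterator is boxed here for compatibility, though the `impl Iterator` return from the plan also works on Rust 1.75+):

```rust
use std::collections::BTreeMap;

/// Stand-in error type; the real crate presumably has its own.
type StoreResult<T> = Result<T, String>;

trait EntityStore {
    type Entity;

    /// Existing API: materializes every row into a Vec.
    fn scan_all(&self) -> StoreResult<Vec<Self::Entity>>;

    /// New API: row-by-row retrieval without an up-front allocation.
    fn scan_iter(&self) -> StoreResult<Box<dyn Iterator<Item = StoreResult<Self::Entity>> + '_>>;
}

/// Toy store backed by a BTreeMap, standing in for a RocksDB column family.
struct MockStore {
    rows: BTreeMap<u64, String>,
}

impl EntityStore for MockStore {
    type Entity = String;

    fn scan_all(&self) -> StoreResult<Vec<String>> {
        Ok(self.rows.values().cloned().collect())
    }

    fn scan_iter(&self) -> StoreResult<Box<dyn Iterator<Item = StoreResult<String>> + '_>> {
        // Each item is itself a Result so a per-row deserialization
        // failure can surface without aborting the whole scan.
        Ok(Box::new(self.rows.values().map(|v| Ok(v.clone()))))
    }
}
```

The nested `Result` on each item is the important design point: RocksDB reads and row decoding can fail mid-scan, and the iterator lets callers decide whether to abort or skip.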
2. Implement Custom Execution Plan (kalamdb-core)
Create a reusable DataFusion adapter for RocksDB iteration:
- `struct RocksDbScanExec<T>`, which implements `datafusion::physical_plan::ExecutionPlan`.
- `struct RocksDbStream<T>`, which implements `datafusion::physical_plan::RecordBatchStream`. On `poll_next()`, it should fetch the next chunk of rows, build a `RecordBatch`, and yield it.
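The real `RocksDbStream<T>` would be an async `Stream` polled by DataFusion, but the batching work that each `poll_next()` performs can be sketched synchronously with std types (`Row` stands in for a deserialized entity and `Vec<Row>` for an Arrow `RecordBatch`; all names here are illustrative):

```rust
/// Toy row type standing in for a deserialized RocksDB entity.
#[derive(Clone, Debug, PartialEq)]
struct Row(u64);

/// Sketch of the batching core of a RocksDbStream<T>: wraps a row
/// iterator and hands out fixed-size batches on demand, which is the
/// work poll_next() would do per wake-up in the async version.
struct ChunkedScan<I: Iterator<Item = Row>> {
    rows: I,
    batch_size: usize,
}

impl<I: Iterator<Item = Row>> Iterator for ChunkedScan<I> {
    /// Stand-in for an Arrow RecordBatch.
    type Item = Vec<Row>;

    fn next(&mut self) -> Option<Vec<Row>> {
        // Pull up to batch_size rows; a short final batch is fine,
        // and an empty one signals end-of-stream.
        let batch: Vec<Row> = self.rows.by_ref().take(self.batch_size).collect();
        if batch.is_empty() { None } else { Some(batch) }
    }
}
```

Only one batch is ever buffered inside the adapter, which is what makes memory usage independent of table size.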
3. Refactor System Tables
Update `kalamdb-system` providers to use the new streaming executor:
- `AuditLogsTableProvider`
- `JobsTableProvider`
- `UsersTableProvider`
- `NamespacesTableProvider`
- `SystemTablesProvider` (Registry)
4. Refactor User Data Tables
Update `kalamdb-core/src/tables` to support streaming for user data:
- `UserTableProvider`: ensure it streams data for `SELECT *` queries instead of loading the full partition.
- `SharedTableProvider`
- `StreamTableProvider`
Acceptance Criteria
- `scan()` methods no longer call `scan_all()` or allocate `Vec<T>` for the full result set.
- Providers return a `RecordBatchStream` that yields batches of a configurable size (default 1024).
- Memory usage for `SELECT * FROM large_table` remains constant (bounded by the buffer size) rather than linear in table size.
- Latency for `SELECT * FROM system.tables` drops to < 1 ms in release mode.
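The memory criterion can be checked mechanically: with a streaming scan, the number of rows resident at once is bounded by the batch size no matter how large the table is. A std-only sketch (both functions are illustrative stand-ins for the real scan path):

```rust
/// Drain a chunked scan, tracking the largest number of rows ever
/// buffered at once; for a streaming scan this is at most the batch
/// size, while an eager scan would buffer the whole table.
fn peak_buffered(batches: impl Iterator<Item = Vec<u64>>) -> usize {
    batches.map(|b| b.len()).max().unwrap_or(0)
}

/// Streaming scan over `n` rows in batches of `batch_size`.
fn stream_scan(n: u64, batch_size: usize) -> impl Iterator<Item = Vec<u64>> {
    let mut rows = 0..n;
    std::iter::from_fn(move || {
        let b: Vec<u64> = rows.by_ref().take(batch_size).collect();
        if b.is_empty() { None } else { Some(b) }
    })
}
```

A test along these lines (peak buffer of 1024 rows at one million rows as at ten thousand) would pin down the "constant, not linear" criterion in CI.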