Telemetry Stream Ingestion Memory Backpressure with Zero-Copy Binary Frames
Problem Statement
The telemetry stream handler in src/ingestion/stream_handler.rs receives binary-framed telemetry over TCP using a custom protocol: a 4-byte length prefix followed by a CBOR payload. The handle_stream() function at line 140 uses tokio::io::AsyncBufReadExt::read_until() to delimit frames by newline (\n), but the binary protocol has no newline delimiters. As a result, read_until() keeps appending incoming bytes to its buffer until a \n byte (0x0A) happens to appear, which for binary CBOR payloads may never happen, or may happen in the middle of a frame (for example inside a CBOR byte string). Under a sustained 80 Mbps telemetry stream, this unbounded buffer grows to 2+ GB before the process runs out of memory. Even when a \n byte is eventually found, the buffer holds thousands of concatenated frames that must be split manually, which defeats the purpose of read_until(). Additionally, each frame is allocated as a new Vec<u8>; zero-copy slicing is not used.
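The misframing can be reproduced in isolation. The sketch below is not the handler code: it uses the synchronous std::io::BufRead::read_until() as a stand-in for the tokio method (both read up to and including the delimiter byte), and the helper names encode_frame, read_newline_delimited and misframing_demo are hypothetical. Big-endian byte order for the length prefix is an assumption, since the byte order is not stated in this issue.

```rust
use std::io::{BufRead, Cursor};

/// Encodes one frame as a 4-byte length prefix followed by the payload.
/// Big-endian byte order is an assumption made for this sketch.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = (payload.len() as u32).to_be_bytes().to_vec();
    frame.extend_from_slice(payload);
    frame
}

/// Reads one "frame" the way a newline-delimited reader would: everything up
/// to and including the next 0x0A byte, or to end of stream if none appears.
fn read_newline_delimited(stream: &mut impl BufRead) -> std::io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    stream.read_until(b'\n', &mut buf)?;
    Ok(buf)
}

/// Builds a two-frame stream and returns (total bytes on the wire,
/// bytes returned by a single newline-delimited read).
fn misframing_demo() -> (usize, Vec<u8>) {
    // Frame 1: CBOR map {1: 2}, which contains no 0x0A byte.
    let mut wire = encode_frame(&[0xA1, 0x01, 0x02]);
    // Frame 2: CBOR byte string of length 2 whose first byte is 0x0A.
    wire.extend(encode_frame(&[0x42, 0x0A, 0xFF]));
    let total = wire.len();
    let chunk = read_newline_delimited(&mut Cursor::new(wire)).expect("in-memory read");
    (total, chunk)
}
```

With these two frames, the single read returns all of frame 1 plus the first part of frame 2, ending at the 0x0A inside the byte string, so the returned chunk does not line up with any frame boundary.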
State Invariants & Parameters
- Telemetry stream rate: 80 Mbps (10,000 frames/s × ~1KB each)
- TCP receive buffer: 64KB (kernel default)
- read_until() internal buffer: unbounded (grows until delimiter found)
- MAX_FRAME_SIZE: 64KB (protocol spec)
- Current allocation: 1 Vec<u8> per frame = 10,000 allocs/s
- Invariant: resident_memory <= 512MB and alloc_rate <= 1000/s
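As a rough check on these figures, the arithmetic can be written out. This is a back-of-the-envelope sketch, not a measurement: it assumes 1 KB = 1000 bytes for the frame size, treats 512MB and 2 GB as binary units, and models the worst case in which no delimiter is ever found so nothing is drained from the buffer. The constant and function names are hypothetical.

```rust
/// Figures taken from the parameter list; 1 KB is treated as 1000 bytes.
const FRAMES_PER_SEC: u64 = 10_000;
const FRAME_BYTES: u64 = 1_000;
/// 512MB invariant, interpreted here as 512 * 1024 * 1024 bytes (an assumption).
const MEMORY_BUDGET_BYTES: u64 = 512 * 1024 * 1024;

/// Ingest rate in bytes per second.
const fn bytes_per_sec() -> u64 {
    FRAMES_PER_SEC * FRAME_BYTES
}

/// Ingest rate in bits per second.
const fn bits_per_sec() -> u64 {
    bytes_per_sec() * 8
}

/// Whole seconds until `limit_bytes` of un-drained input has accumulated.
const fn seconds_to_fill(limit_bytes: u64) -> u64 {
    limit_bytes / bytes_per_sec()
}
```

Under those assumptions, 10,000 frames/s × 1000 bytes is 10 MB/s, which matches the stated 80 Mbps. An un-drained buffer would cross the 512MB invariant after roughly 53 seconds and reach 2 GB after roughly 214 seconds.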
Affected Code Paths
- src/ingestion/stream_handler.rs:130-180: read_until on binary stream
- src/ingestion/frame_parser.rs:30-70: CBOR frame parser allocating per frame
- src/ingestion/buffer_pool.rs: Buffer pool needed but not implemented
- src/ingestion/tests/stream_handler_bench.rs: No throughput benchmark
Resolution Blueprint
- Replace read_until() with tokio::io::AsyncReadExt::read_exact() to read the 4-byte length prefix, then read_exact() to read exactly length bytes into a pre-allocated buffer from a pool. This eliminates the delimiter issue and bounds memory per frame to MAX_FRAME_SIZE.
- Implement a buffer pool (BufferPool in src/ingestion/buffer_pool.rs) using tokio::sync::Semaphore-gated recycling: allocate 1024 buffers of MAX_FRAME_SIZE (64KB × 1024 = 64MB total). On frame read, acquire() a buffer from the pool, read into it, parse CBOR via ciborium::from_reader(&buffer[..length]), then release() the buffer back.
- Use zero-copy CBOR deserialization where possible: ciborium::de::from_reader() accepts a &[u8] slice as its reader, so the pooled buffer slice can be passed directly to the deserializer without first copying the frame into a separate Vec<u8>.
- Add a frame size validator: before reading the payload, validate length <= MAX_FRAME_SIZE. If exceeded, drain the connection and log a security event (potential malformed frame attack).
- Add a benchmark in tests/benchmarks/stream_throughput.rs using criterion that measures throughput, allocation count, and peak memory at 80 Mbps sustained, compared against the current read_until() approach.
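The first, second and fourth items above can be sketched together as follows. This is a minimal illustration under stated assumptions, not the proposed implementation:

- It uses the synchronous std::io::Read::read_exact() as a stand-in for tokio::io::AsyncReadExt::read_exact().
- It replaces the Semaphore gating with a pool whose acquire() returns None when empty instead of waiting.
- It assumes a big-endian length prefix, since the byte order is not given in this issue.
- It interprets 64KB as 64 * 1024 bytes.
- The body of BufferPool and the read_frame function are hypothetical.
- CBOR decoding, connection draining and security logging are left to the caller.

```rust
use std::io::{self, Read};
use std::sync::Mutex;

/// Protocol limit from the parameter list (64KB, taken here as 64 * 1024 bytes).
const MAX_FRAME_SIZE: usize = 64 * 1024;

/// Hypothetical fixed-size pool of reusable frame buffers.
struct BufferPool {
    free: Mutex<Vec<Box<[u8]>>>,
}

impl BufferPool {
    /// Pre-allocates `count` buffers of MAX_FRAME_SIZE bytes each.
    fn new(count: usize) -> Self {
        let free = (0..count)
            .map(|_| vec![0u8; MAX_FRAME_SIZE].into_boxed_slice())
            .collect();
        Self { free: Mutex::new(free) }
    }

    /// Takes a buffer, or returns None when the pool is exhausted.
    /// A semaphore-gated pool would make the caller wait here instead.
    fn acquire(&self) -> Option<Box<[u8]>> {
        self.free.lock().unwrap().pop()
    }

    /// Returns a buffer to the pool for reuse.
    fn release(&self, buf: Box<[u8]>) {
        self.free.lock().unwrap().push(buf);
    }

    /// Number of buffers currently available.
    fn available(&self) -> usize {
        self.free.lock().unwrap().len()
    }
}

/// Reads one frame into `buf` and returns the payload length.
/// Assumes a big-endian length prefix (not specified in this issue).
fn read_frame(stream: &mut impl Read, buf: &mut [u8]) -> io::Result<usize> {
    let mut prefix = [0u8; 4];
    stream.read_exact(&mut prefix)?;
    let length = u32::from_be_bytes(prefix) as usize;
    // Validate before touching the payload, so an oversized length never
    // drives a read past the buffer or a new allocation.
    if length > MAX_FRAME_SIZE || length > buf.len() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "frame exceeds MAX_FRAME_SIZE",
        ));
    }
    stream.read_exact(&mut buf[..length])?;
    Ok(length)
}
```

A caller would acquire() a buffer, call read_frame() in a loop, hand &buf[..length] to the CBOR deserializer, and release() the buffer when done. Because the payload is read by length rather than by delimiter, a 0x0A byte inside a frame has no effect on framing, and no per-frame allocation occurs once the pool is built.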
Labels
Complexity: Hardcore
Layer: Core-Engine
Type: Race-Condition