
Predictive Backpressure via Adaptive Window-Sizing Scheduler for Multi-Stage Ingestion Pipeline #46

Description

@elizabetheonoja-art

Problem Statement / Feature Objective

The telemetry ingestion pipeline comprises five serial stages: TCP accept → reassembly → parsing → tariff evaluation → storage write. Each stage hands work to the next through a bounded channel. When a stage's channel fills, backpressure propagates upstream one stage at a time and eventually causes TCP connections to be dropped. The current fixed channel capacity (e.g., 10,000 slots) is brittle: too small, and load bursts cause premature drops; too large, and the buffers risk exhausting memory. An adaptive window-sizing scheduler must be implemented that monitors each stage's processing latency, throughput, and buffer occupancy, then dynamically adjusts each stage's credit-based window size using AIMD (Additive Increase, Multiplicative Decrease) congestion control. Only the decrease factor (0.7) is taken from TCP CUBIC; CUBIC itself grows its window along a cubic function of time rather than additively, so the scheme is closer to Reno-style AIMD with a CUBIC-style backoff.
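A minimal sketch of the per-stage AIMD rule, assuming the parameters listed under Technical Invariants & Bounds (64-slot additive step, 0.7 decrease factor, 0.85 occupancy and 2× p99 thresholds, 1,024 to 131,072 slot bounds). StageSample, is_congested, and aimd_next_window are hypothetical names, not existing code in this repository. The decrease uses integer arithmetic (window * 7 / 10) to avoid floating-point rounding, and treating a zero baseline as "no latency signal yet" is likewise an assumption.

```rust
/// Per-stage window bounds, in channel slots (from the issue).
pub const MIN_WINDOW: u64 = 1_024;
pub const MAX_WINDOW: u64 = 131_072;
/// Additive increase applied once per 1 s scheduling interval.
pub const ADDITIVE_STEP: u64 = 64;
/// Congestion thresholds: occupancy above 0.85 or p99 above 2x baseline.
pub const OCCUPANCY_HIGH: f64 = 0.85;
pub const LATENCY_RATIO_HIGH: f64 = 2.0;

/// One stage's metrics for the last scheduling interval (hypothetical type).
#[derive(Debug, Clone, Copy)]
pub struct StageSample {
    pub occupancy: f64,       // buffer occupancy, 0.0..=1.0
    pub p99_latency_ns: u64,  // p99 processing latency this interval
    pub baseline_p99_ns: u64, // running baseline; 0 = not yet established (assumption)
}

/// True when the stage should back off.
pub fn is_congested(s: &StageSample) -> bool {
    let latency_spike = s.baseline_p99_ns > 0
        && s.p99_latency_ns as f64 > LATENCY_RATIO_HIGH * s.baseline_p99_ns as f64;
    s.occupancy > OCCUPANCY_HIGH || latency_spike
}

/// One AIMD step: multiply by 0.7 on congestion, otherwise add 64 slots.
/// For inputs already in [MIN_WINDOW, MAX_WINDOW], the result stays in range.
pub fn aimd_next_window(window: u64, s: &StageSample) -> u64 {
    if is_congested(s) {
        // window * 7 / 10 is exact integer math; no float rounding.
        (window * 7 / 10).max(MIN_WINDOW)
    } else {
        (window + ADDITIVE_STEP).min(MAX_WINDOW)
    }
}
```

Because the rule is a pure function of the previous window and one sample, it can be unit-tested without a running pipeline.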

Technical Invariants & Bounds

  • Pipeline stages enumerated: Accept, Reassemble, Parse, Evaluate, Store (5 stages).
  • Measurement period: every 100 ms, each stage records its processing_latency_p50/p99 (measured via Histogram from hdrhistogram), throughput_events_per_sec, and buffer_occupancy (0.0–1.0).
  • AIMD parameters: additive increase of 64 slots per scheduling interval (1 second, which plays the role of TCP's RTT); multiplicative decrease by a factor of 0.7 when buffer occupancy exceeds 0.85 or p99 latency exceeds 2× its running baseline.
  • Minimum window size: 1,024 slots per stage; maximum: 131,072 slots per stage.
  • Total pipeline memory budget: 512 MB across all channel buffers; scheduler must enforce this global cap.
  • Window adjustment order: downstream windows are adjusted first, then upstream ones, which prevents head-of-line blocking from being amplified.
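One way to enforce the 512 MB global cap, sketched under assumptions: "MB" is read as MiB, every slot has the same fixed size in bytes, and this pass runs after the per-stage AIMD updates (Store first, Accept last) have been applied. budget_slots and enforce_budget are hypothetical helpers. Proportional scaling uses integer floor division, so the scaled sum never exceeds the budget unless the MIN_WINDOW floor lifts some small windows back up.

```rust
/// Global channel-buffer budget from the issue. "512 MB" is read as MiB (assumption).
pub const BUDGET_BYTES: u64 = 512 * 1024 * 1024;
pub const MIN_WINDOW: u64 = 1_024;

/// BUDGET_MAX from the issue: total slots that fit in the budget,
/// assuming every slot occupies `slot_size_bytes` bytes.
pub fn budget_slots(slot_size_bytes: u64) -> u64 {
    BUDGET_BYTES / slot_size_bytes
}

/// Scales all windows down proportionally when their sum exceeds `budget`.
/// Returns true if scaling happened, so the caller can log a warning.
/// Floor division keeps the scaled sum <= budget, but the MIN_WINDOW floor
/// may lift small windows back up and overshoot slightly.
pub fn enforce_budget(windows: &mut [u64], budget: u64) -> bool {
    let total: u64 = windows.iter().sum();
    if total <= budget {
        return false;
    }
    for w in windows.iter_mut() {
        // Widen to u128 so w * budget cannot overflow.
        let scaled = (*w as u128 * budget as u128 / total as u128) as u64;
        *w = scaled.max(MIN_WINDOW);
    }
    true
}
```

With an assumed 1 KiB slot, the budget is 524,288 slots, while five stages at 131,072 slots would need 655,360 slots (640 MiB), so the global cap can bind even when every stage is individually within its bounds.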

Codebase Navigation Guide

  • src/ingestion/scheduler.rs — new module: AdaptiveScheduler holding per-stage state, AIMD logic, and global budget enforcement.
  • src/ingestion/pipeline.rs — pipeline definition; replace fixed mpsc::channel(capacity) with WindowedChannel that supports dynamic resizing via fn resize(new_cap: usize).
  • src/ingestion/stages/accept.rs, reassemble.rs, parse.rs, evaluate.rs, store.rs — each stage reports metrics to the scheduler.
  • src/telemetry/metrics.rs — add scheduler_window_size{stage}, scheduler_credits_granted, scheduler_backpressure_events.
  • src/ingestion/lib.rs — create the scheduler at startup and pass Arc<AdaptiveScheduler> to each stage.
  • tests/ingestion/adaptive_backpressure_test.rs — stress test generating bursty load patterns (square wave, sawtooth, random) and verifying zero drops with bounded memory.
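A bounded tokio mpsc channel fixes its capacity at creation, which is why a WindowedChannel wrapper is needed. The sketch below uses std::sync::mpsc::sync_channel instead of tokio so that it is self-contained, and it swaps the drain-task bridge for a receiver-side handoff: producers move to the new channel immediately, and the consumer finishes the old channel (detected as disconnection) before reading the new one, so per-producer FIFO order holds without copying messages. windowed_channel, WindowedSender, and WindowedReceiver are hypothetical names. One limitation: a producer already blocked on the old, full channel keeps waiting for space in that channel.

```rust
use std::sync::mpsc::{self, Receiver, RecvError, SendError, SyncSender};
use std::sync::{Arc, Mutex};

/// State shared by all producer handles (hypothetical design for this sketch).
struct Shared<T> {
    tx: SyncSender<T>,                  // sender for the current bounded window
    handoff: mpsc::Sender<Receiver<T>>, // passes each replacement receiver to the consumer
}

pub struct WindowedSender<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

impl<T> Clone for WindowedSender<T> {
    fn clone(&self) -> Self {
        WindowedSender { shared: Arc::clone(&self.shared) }
    }
}

pub struct WindowedReceiver<T> {
    current: Receiver<T>,
    handoff: Receiver<Receiver<T>>,
}

pub fn windowed_channel<T>(cap: usize) -> (WindowedSender<T>, WindowedReceiver<T>) {
    let (tx, rx) = mpsc::sync_channel(cap);
    let (handoff_tx, handoff_rx) = mpsc::channel();
    let shared = Arc::new(Mutex::new(Shared { tx, handoff: handoff_tx }));
    (WindowedSender { shared }, WindowedReceiver { current: rx, handoff: handoff_rx })
}

impl<T> WindowedSender<T> {
    /// Blocks while the current window is full; this is the backpressure signal.
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        // Clone the sender and release the lock before a possibly blocking send,
        // so resize() never waits behind a producer that is out of credit.
        let tx = self.shared.lock().unwrap().tx.clone();
        tx.send(value)
    }

    /// Switches producers to a fresh channel of capacity `new_cap`.
    pub fn resize(&self, new_cap: usize) {
        let (tx, rx) = mpsc::sync_channel(new_cap);
        let mut shared = self.shared.lock().unwrap();
        // Publish the new receiver before dropping the old sender, so the
        // consumer always finds it once the old channel reports disconnection.
        let _ = shared.handoff.send(rx);
        shared.tx = tx; // drops the old SyncSender
    }
}

impl<T> WindowedReceiver<T> {
    /// Returns None once all senders are gone and every channel is drained.
    pub fn recv(&mut self) -> Option<T> {
        loop {
            match self.current.recv() {
                Ok(value) => return Some(value),
                // Old channel drained and disconnected: move to the next one, in order.
                Err(RecvError) => match self.handoff.recv() {
                    Ok(next) => self.current = next,
                    Err(RecvError) => return None,
                },
            }
        }
    }
}
```

For memory accounting, note that both the old and the new buffer exist until the old one drains, so a resize can briefly hold more than the target window.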

Implementation Blueprint

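  1. In scheduler.rs, define struct StageState { window_size: AtomicU64, occupancy: AtomicU64, p99_latency_ns: AtomicU64, baseline_p99: AtomicU64, last_adjustment: Mutex<Instant> }. The Rust standard library has no AtomicF64, so occupancy is stored as the bit pattern of an f64 (f64::to_bits on write, f64::from_bits on read). The scheduler runs a tokio interval timer that ticks every 1 second.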
  2. On each tick, the scheduler collects metrics from all stages. Starting from the terminal stage (Store) and working backward to Accept, it computes each stage's new window size: if occupancy > 0.85 or p99 > 2.0 × baseline_p99, new_window = max(window * 0.7, MIN_WINDOW); otherwise, new_window = min(window + 64, MAX_WINDOW). Finally, the sum of all windows is clamped to the global budget (step 6).
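  3. Implement WindowedChannel<T>, which wraps a tokio mpsc::Sender/Receiver pair but allows resizing. Resizing creates a new channel with the target capacity and bridges the two with a drain task that forwards messages from the old channel to the new one, so no data is lost. Because producers switch to the new sender immediately, forwarded messages can arrive after newer ones; if ordering matters, the consumer should finish draining the old channel before reading from the new one.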
  4. In pipeline.rs, instantiate WindowedChannel at each stage boundary with an initial capacity of MIN_WINDOW. On a scheduler adjustment, call channel.resize(new_window).
  5. Each stage exposes a StageMetrics struct that is updated atomically on every processed event via record_latency(duration) and record_throughput(), and read via current_occupancy(). On each tick, the stage publishes a snapshot to the scheduler through a shared Arc<StageState>.
  6. Implement total budget enforcement: the scheduler tracks the sum of all window sizes; if the AIMD results would push that sum above BUDGET_MAX (512 MB / slot_size), it scales all windows down proportionally and logs a warning.
  7. Write tests using proptest to generate synthetic latency distributions and verify that under high-variance load the scheduler converges to stable window sizes within 30 seconds and keeps occupancy between 0.5 and 0.85 for at least 90% of the time. Also test the pathological case where one stage blocks entirely: verify that upstream stages drain to the minimum window within 5 seconds.
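A quick arithmetic check of the 5-second requirement, assuming one multiplicative decrease per 1 s tick: a window at 131,072 slots needs 14 decreases to reach 1,024, since 0.7^13 ≈ 0.0097 is still above 1,024 / 131,072 = 0.0078 while 0.7^14 ≈ 0.0068 is below it. The 5-second bound therefore seems reachable only for windows starting at roughly 6,000 slots or fewer, unless the scheduler ticks faster under congestion or the bound is relaxed. ticks_to_floor is a hypothetical helper, and it ignores the time upstream buffers need to fill past 0.85 occupancy.

```rust
pub const MIN_WINDOW: u64 = 1_024;
pub const MAX_WINDOW: u64 = 131_072;

/// Counts consecutive multiplicative decreases (x0.7, integer math, floored
/// at MIN_WINDOW) needed to bring a window from `start` down to MIN_WINDOW.
/// With one decrease per 1 s tick, this approximates the drain time in seconds.
pub fn ticks_to_floor(start: u64) -> u32 {
    let mut window = start;
    let mut ticks = 0;
    while window > MIN_WINDOW {
        window = (window * 7 / 10).max(MIN_WINDOW);
        ticks += 1;
    }
    ticks
}
```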

Metadata

Labels

  • Complexity: Hardcore (issues requiring deep systems-level engineering rigor)
  • GrantFox OSS (issue tracked in GrantFox OSS)
  • Layer: Core-Engine (core engine layer)
  • Maybe Rewarded (issue may be eligible for a GrantFox reward)
  • Official Campaign (Campaign: Official Campaign)
  • Type: Core-Architecture (core architecture concerns, invariants, and structural design)