Problem Statement / Feature Objective
Meter events from the same source can arrive at different collector nodes with wall-clock timestamps that are not causally consistent: two events may carry the same millisecond timestamp even though one causally precedes the other. For example, events A (meter M, seq 1) and B (meter M, seq 2) may both receive wall-clock time T because of coarse clock resolution. The system currently relies solely on the meter's sequence number for ordering, so cross-meter causality (e.g., meter M1 reads 100 kWh, then meter M2 reads a value derived from it) is invisible. A hybrid logical clock (HLC) component must be deployed that extends each event with an HLC timestamp capturing both physical time and Lamport-style causal order, enabling causally consistent read-back and deterministic replay across all collectors.
Technical Invariants & Bounds
- HLC format: 48-bit physical component (milliseconds since Unix epoch) + 16-bit logical component = 64 bits total, stored as u64 with the physical component in the upper 48 bits.
- HLC update rule on event receipt:
new_physical = max(hlc.physical, wall_clock_ms); if new_physical == hlc.physical (the wall clock has not moved past the HLC) then hlc.logical++ else hlc.logical = 0; then hlc.physical = new_physical.
- HLC propagation: when a collector forwards an event to another collector (e.g., for cross-region settlement), the recipient applies hlc = max(hlc, event.hlc) before processing.
- Maximum logical counter wrap: the 16-bit logical counter holds values 0 through 65,535 within a single millisecond; if an increment would exceed 65,535 (should never happen under normal load), the physical component is incremented by 1 and logical resets to 0.
- Eventual consistency target: after partition healing, all collectors must agree on a total order of events within 5 seconds (measured via HLC comparison).
- Backward compatibility: existing timestamp_ms: i64 fields in protobuf are retained as the wall-clock component; a new hlc_timestamp: u64 field is added alongside.
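The format, update rule, propagation rule, and overflow handling above can be sketched in Rust as follows. This is a minimal illustration rather than the final module: the helper name advance, the method name observe, and the memory orderings are assumptions of the sketch. It treats overflow as "the increment would exceed 65,535" and uses a compare-and-swap loop on an AtomicU64, as the Implementation Blueprint proposes.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const LOGICAL_BITS: u32 = 16;
const LOGICAL_MASK: u64 = (1u64 << LOGICAL_BITS) - 1;

/// Packed HLC: upper 48 bits = physical milliseconds, lower 16 bits = logical counter.
/// Deriving `Ord` on the raw u64 compares physical first, then logical.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct HlcTimestamp(pub u64);

impl HlcTimestamp {
    pub fn new(physical: u64, logical: u16) -> Self {
        // Sketch assumption: the caller guarantees `physical` fits in 48 bits.
        debug_assert!(physical < (1u64 << 48));
        HlcTimestamp((physical << LOGICAL_BITS) | u64::from(logical))
    }
    pub fn physical(self) -> u64 {
        self.0 >> LOGICAL_BITS
    }
    pub fn logical(self) -> u16 {
        (self.0 & LOGICAL_MASK) as u16
    }
}

/// Pure update rule (hypothetical helper): the successor of `old` given a wall clock reading.
pub fn advance(old: HlcTimestamp, wall_clock_ms: u64) -> HlcTimestamp {
    let physical = old.physical().max(wall_clock_ms);
    if physical > old.physical() {
        // Wall clock moved past the HLC: adopt it and reset the counter.
        HlcTimestamp::new(physical, 0)
    } else {
        match old.logical().checked_add(1) {
            Some(logical) => HlcTimestamp::new(physical, logical),
            // Counter exhausted: bump the physical component and reset logical.
            None => HlcTimestamp::new(physical + 1, 0),
        }
    }
}

pub struct HybridLogicalClock {
    current: AtomicU64,
}

impl HybridLogicalClock {
    pub fn new() -> Self {
        Self { current: AtomicU64::new(0) }
    }

    /// Lock-free tick: recompute and retry the CAS until no other thread has raced us.
    pub fn tick(&self, wall_clock_ms: u64) -> HlcTimestamp {
        let mut old = self.current.load(Ordering::Acquire);
        loop {
            let next = advance(HlcTimestamp(old), wall_clock_ms);
            match self.current.compare_exchange_weak(
                old,
                next.0,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return next,
                Err(actual) => old = actual,
            }
        }
    }

    /// Propagation rule: hlc = max(hlc, event.hlc) for an HLC carried by a forwarded event.
    pub fn observe(&self, remote: HlcTimestamp) {
        self.current.fetch_max(remote.0, Ordering::AcqRel);
    }
}
```

Because the physical component occupies the high bits, comparing two packed values as plain u64 already yields the required "physical first, then logical" order, which is why the sketch can derive Ord and use fetch_max for propagation.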
Codebase Navigation Guide
src/ingestion/hlc.rs — new module: HybridLogicalClock struct and HlcTimestamp(u64) newtype.
src/ingestion/collector.rs — each collector holds an Arc<Mutex<HybridLogicalClock>>; on each event receipt, call hlc.tick(wall_clock_ms) and assign the result to the event.
src/ingestion/watermark.rs — extend watermarks to include HLC timestamps alongside offsets; use HLC for merge conflict resolution.
src/ingestion/ordering.rs — new module: CausalOrderer that buffers events by source and emits them in HLC order.
src/types/meter_event.rs — add hlc_timestamp: u64 field; update serialization.
src/rpc/proto/telemetry.proto — add optional uint64 hlc_timestamp = 15 to MeterEvent message.
tests/ingestion/hlc_causal_test.rs — tests for causality preservation under concurrent access.
Implementation Blueprint
- In hlc.rs, define HlcTimestamp(u64) with methods: fn physical(self) -> u64, fn logical(self) -> u16, fn new(physical: u64, logical: u16) -> Self. Implement Ord such that physical is compared first, then logical.
- Define HybridLogicalClock { current: AtomicU64 }. Implement fn tick(&self, wall_clock_ms: u64) -> HlcTimestamp: atomically load current, extract physical (upper 48 bits) and logical (lower 16 bits). Compute candidate physical = max(old_physical, wall_clock_ms). If candidate == old_physical, new_logical = old_logical + 1; else new_logical = 0. If the increment would overflow 16 bits (old_logical == 0xFFFF), increment candidate by 1 and set new_logical = 0 instead. Store (candidate << 16) | new_logical via CAS; retry on failure.
- In collector.rs, on each event receipt (before any processing), call hlc.tick(event.wall_clock_ms) and store the result in event.hlc_timestamp. If the event already carries an HLC from an upstream meter (rare), apply hlc = max(hlc, event.hlc) first.
- In ordering.rs, implement CausalOrderer { buffer: BTreeMap<MeterSourceId, BinaryHeap<OrderedEvent>> } where OrderedEvent is ordered by HlcTimestamp. Because std's BinaryHeap is a max-heap, reverse the ordering (e.g., with std::cmp::Reverse) so that the smallest HLC is popped first. When a new event arrives, push it into the source's heap. A background flush task pops events from each source's heap in HLC order and forwards them to the next pipeline stage. Emit an event only once its HLC physical component is at most hlc.physical - MAX_CLOCK_SKEW_MS, where hlc is the collector's current clock; this gives stragglers a bounded window to arrive without waiting for them indefinitely. Default MAX_CLOCK_SKEW_MS = 200.
- Update watermark CRDT merge logic: when two watermarks report different offsets for the same source, resolve not by max offset but by max HLC. If HLCs are equal, use max offset.
- Update protobuf definitions and all serialization paths to include hlc_timestamp. Maintain backward compatibility by populating the new field from timestamp_ms (as the physical component, with logical = 0) on deserialization of old events.
- Write a test simulating the classic "causal delivery" scenario: collector 1 receives event A (seq 1) and forwards it to collector 2 with a delay; collector 2 receives event B (seq 2) directly from the meter first; verify that the output stream delivers A before B (HLC ordering preserves causality). Simulate the network delay via tokio::time::advance, which requires tokio's test-util feature and a paused clock.
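The CausalOrderer step above could look roughly like the sketch below. Several things here are assumptions for illustration only: MeterSourceId is aliased to u32, OrderedEvent carries just a packed HLC and a sequence number, and the flush is a synchronous method rather than a background task. The emit condition is read as "release an event once its physical component is at least MAX_CLOCK_SKEW_MS behind the collector's current HLC physical time"; the final sort across sources is likewise an addition of the sketch.

```rust
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap};

// Hypothetical stand-ins for the real types in src/types.
type MeterSourceId = u32;
const LOGICAL_BITS: u32 = 16;
const MAX_CLOCK_SKEW_MS: u64 = 200;

/// Minimal event: `hlc` is the first field, so the derived `Ord` compares it first.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct OrderedEvent {
    pub hlc: u64, // packed HLC: physical << 16 | logical
    pub seq: u64,
}

#[derive(Default)]
pub struct CausalOrderer {
    // `Reverse` turns std's max-heap into a min-heap keyed on HLC.
    buffer: BTreeMap<MeterSourceId, BinaryHeap<Reverse<OrderedEvent>>>,
}

impl CausalOrderer {
    /// Buffer an incoming event in its source's heap.
    pub fn push(&mut self, source: MeterSourceId, event: OrderedEvent) {
        self.buffer.entry(source).or_default().push(Reverse(event));
    }

    /// Release every buffered event whose physical time is at most
    /// `now_physical_ms - MAX_CLOCK_SKEW_MS`, in ascending HLC order.
    pub fn flush(&mut self, now_physical_ms: u64) -> Vec<(MeterSourceId, OrderedEvent)> {
        let cutoff = now_physical_ms.saturating_sub(MAX_CLOCK_SKEW_MS);
        let mut out = Vec::new();
        for (source, heap) in self.buffer.iter_mut() {
            while let Some(Reverse(head)) = heap.peek() {
                let physical = head.hlc >> LOGICAL_BITS;
                if physical > cutoff {
                    break; // still inside the skew window; a straggler may yet arrive
                }
                let Reverse(event) = heap.pop().expect("peeked element exists");
                out.push((*source, event));
            }
        }
        // Stable sort: merges sources by HLC while keeping per-source order for ties.
        out.sort_by_key(|(_, e)| e.hlc);
        out
    }
}
```

A caller would invoke flush periodically with the collector's current HLC physical component; events newer than the cutoff stay buffered until a later flush.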