Skip to content

server: add streaming ingest and e2e coverage#8

Merged
packethog merged 20 commits into
mainfrom
ss/streaming-ingest-parity
May 15, 2026
Merged

server: add streaming ingest and e2e coverage#8
packethog merged 20 commits into
mainfrom
ss/streaming-ingest-parity

Conversation

@packethog
Copy link
Copy Markdown
Owner

@packethog packethog commented May 7, 2026

Summary

This PR adds opt-in Hyperliquid streaming ingest while preserving the existing block-by-block ingest path. It keeps block mode as the compatibility baseline, adds live-shape fixture coverage for streaming mode, and expands multicast E2E coverage for TOB and DOB payloads.

Major Changes

  • Added --ingest-mode block|stream and --hl-data-root configuration.
  • Added streaming ingest for node_raw_book_diffs_streaming, node_order_statuses_streaming, and node_fills_streaming.
  • Preserved block-mode ingest and existing block-mode multicast E2E behavior.
  • Added streaming accumulation/watermarking for raw diffs and statuses, including out-of-order row handling and late-row observability.
  • Changed raw-book New diffs to insert resting orders directly instead of passing through local matching, matching validator-decided book state.
  • Split TOB fanout from high-volume L4 updates so TOB multicast is not starved by DOB/L4 traffic.
  • Fixed TOB heartbeat behavior so suppressed or unmapped source messages do not reset marketdata activity.
  • Added TOB fill-pair accumulation for live streaming fill rows, including split-row fill pairing and separate accounting for expected one-sided #... token-alias fills.
  • Added multicast publisher Prometheus metrics on a dedicated /metrics listener.
  • Added more actionable TOB suppression, catch-up, fill-pairing, ingest, and latency logs.
  • Added and updated README and ARCHITECTURE documentation for streaming/block hot paths and metric meanings.

E2E And Fixture Coverage

  • Added dual-validator block-vs-stream fixtures and goldens.
  • Added normalized TOB/DOB packet payload comparisons.
  • Added downstream TOB parser E2E coverage using edge-multicast-ref in Docker.
  • Updated streaming fill fixtures to model live validator shape where fill sides can arrive in separate rows.
  • Added assertions for streaming TOB trade emission and semantic equality against block mode where appropriate.

Canary Notes

Validated on aws-tyo-hl-mainnet and tyo-hl-node during streaming-mode canaries. The canaries helped identify and fix:

  • raw-diff New orders being incorrectly matched away locally;
  • late streaming rows being dropped after premature finalization;
  • TOB starvation from shared L4 fanout;
  • stale-source TOB suppression and heartbeat starvation;
  • missing TOB trades from split-row streaming fills;
  • misleading fill orphan metrics caused by expected one-sided #... token-alias fills.

Latest canary confirmed TOB fill-pair metrics split expected one-sided token-alias fills from true orphan drops, while trade packets continue to publish.

Tests Run

  • cargo test -p server multicast::publisher
  • cargo test -p server fill_pair_accumulator -- --nocapture
  • cargo test -p server listeners::order_book::block_mode_multicast_e2e -- --nocapture
  • cargo test -p server dual_validator_fixture_matches_block_and_stream_goldens -- --nocapture
  • cargo test -p server
  • cargo clippy --workspace --all-targets

Clippy exits successfully with pre-existing warning noise.

@packethog packethog changed the title server: add streaming ingest parity tests server: add streaming ingest and e2e coverage May 7, 2026
packethog and others added 3 commits May 7, 2026 13:13
run_websocket_server's `ingest_mode` argument was reaching `hl_listen`
(so the file watcher pointed at the right node_*_streaming directories)
but never reaching the OrderBookListener itself, which was constructed
via `OrderBookListener::new(...)` — defaulted to IngestMode::Block.

Net effect for `--ingest-mode stream` deploys:
  - File watcher correctly read node_*_streaming.
  - process_data → receive_batch → match self.ingest_mode → Block →
    receive_block_batch, populating order_status_cache / order_diff_cache.
  - First snapshot fetch → init_from_snapshot at height H → while-pop_cache
    loop pops a cached batch at height H+N (N >> 1), apply_updates strict
    +1 monotonic check fires, "Failed to apply updates to this book" log,
    retry=true, order_book_state stays None.
  - drain_streaming_blocks (the streaming-mode replay path) is gated
    behind retry==false, so it never runs.
  - Result: snapshot + refdata multicast keeps flowing (those don't
    depend on order_book_state), but DoB / TOB delta emit pipelines
    stay silent.

Reproduced live on aws-tyo-hl-mainnet running this branch with
`--ingest-mode stream`: tcpdump showed 17.2 pkt/s on dob-snapshot-port
(10003) and 0.4 pkt/s combined on the two mktdata ports (9001 + 10001).

The fix is to use `new_with_ingest_mode` so the listener's ingest_mode
matches the file watcher's. With this, in stream mode:
  - receive_stream_batch is invoked → events go into streaming_state.blocks.
  - init_from_snapshot's while-pop_cache loop is a no-op (block-mode
    caches stay empty), retry stays false, order_book_state is set,
    drain_streaming_blocks runs and replays the streaming events
    against the snapshot via apply_stream_diff.

cargo test --workspace: 129 passed, 0 failed.
@armcconnell
Copy link
Copy Markdown
Collaborator

armcconnell commented May 7, 2026

Tested this branch on aws-tyo-hl-mainnet with --ingest-mode stream. DoB/TOB delta multicast was silent (~0.4 pkts/sec on 9001+10001 vs ~17 pkts/sec on snapshot 10003) while file-watcher and snapshot fetch worked.

run_websocket_server threads ingest_mode to hl_listen (so the file watcher resolves node_*_streaming dirs), but builds the listener with OrderBookListener::new(...) which defaults to IngestMode::Block. Result: receive_batch routes to receive_block_batch, populating block-mode caches. init_from_snapshot's pop_cache loop then fails apply_updates' strict +1 check, retry=true, order_book_state stays None, drain_streaming_blocks is gated behind retry==false so it never runs. Snapshot/refdata still emit because they don't depend on order_book_state.

Fix in 43e467f: use new_with_ingest_mode so the listener matches the file watcher. Workspace tests pass.

After 43e467f, init_from_snapshot's streaming-mode replay path
(drain_streaming_blocks) was reachable, but immediately failed with
  Failed to apply cached streaming updates after snapshot:
  Received finalized streaming block 987725382, current height 987725455
because streaming_state.blocks accumulated events while waiting for the
first snapshot fetch (~5-15s of clock drift). Those events have block
heights <= snapshot height — already reflected in the snapshot — so
apply_stream_diff correctly rejects them via its `block_number <
self.height` check.

drain_streaming_blocks bails on the first error, so each snapshot fetch
cycle only clears one stale block. With ~70 stale blocks queued, that
would have taken ~70 minutes of restart-cycles to clear, during which
DoB/TOB delta multicast stays silent.

Fix: in init_from_snapshot, before draining, prune
streaming_state.blocks to drop any block whose height <= snapshot
height. Logs the dropped count for observability.

Reproduced live on aws-tyo-hl-mainnet:
  18:57:26 ERROR ... Received finalized streaming block 987725382, current height 987725455
  18:58:26 ERROR ... Received finalized streaming block 987725383, current height 987726329
  (mktdata: 1 packet / 5s on 9001+10001 throughout)

cargo test --workspace: 129 passed, 0 failed.
…en ctor

Three follow-on fixes after the listener constructed-with-wrong-mode and
prune-stale-blocks fixes from 43e467f and d49d15b. Surfaced when the
streaming pipeline started actually running on aws-tyo-hl-mainnet.

1. Cross-file finalization race (bug)
   hl-node writes streaming statuses, diffs, and fills concurrently;
   notify delivers them through one channel but cross-file ordering
   lags. drain_streaming_blocks may finalize block N (because we saw
   N+1's diffs) when N's statuses then arrive, causing
   ensure_stream_block_not_finalized to return Err and tear down the
   listener. Soft-fix: drop late events with a warn log in
   receive_stream_statuses / receive_stream_diffs. Removes the
   now-dead helper. Test renamed to *_is_dropped and asserts Ok().

2. Per-diff L2 snapshot publish (perf)
   drain_streaming_blocks called publish_l2_snapshot inside the per-
   diff loop. Each call recomputes the L2 snapshot for every active
   instrument and spawns a tokio task — on a busy mainnet feed this
   was the dominant cost and stream mode could not keep pace with
   real-time hl-node throughput. process_data already calls
   l2_snapshots(true) at the end of every file-read chunk (same
   cadence block mode uses), so the per-diff call was redundant
   beyond being expensive. Removed.

3. Remove OrderBookListener::new convenience constructor
   The convenience constructor defaulted to IngestMode::Block, which
   silently created Block-mode listeners in the streaming code path
   (43e467f's bug). Removing it forces every call site to specify
   the mode and makes that bug-class structurally impossible. Test
   fixtures updated to use new_with_ingest_mode explicitly.

New test: init_from_snapshot_prunes_stale_pre_snapshot_streaming_blocks
covers d49d15b directly (no test asserted that prune behavior before).
The existing wire-parity test
dual_validator_capture_matches_block_and_stream_payloads (line 552)
still passes — block-vs-stream multicast frame byte parity is
maintained through all of these changes.

cargo test --workspace: 130 passed, 0 failed.
Both apply_updates (block mode) and apply_stream_diff (stream mode) used
to return Err if a Remove or Update diff referenced an oid that wasn't
on our internal book. That error propagated up to hl_listen, tore down
the listener task, and systemd restarted the process. On a busy mainnet
feed in stream mode this triggered every ~30s — the cancel reached us
before the corresponding snapshot reflected the new order.

Block mode has the same strict check; it just hits this case rarely
enough in production not to crash. The two modes need symmetric
behavior — Steve specifically called out parity as the concern — and
the snapshot validator already reconciles the book against hl-node every
60s with surgical per-coin recovery. So both modes can safely log+skip
the failed op:

  - apply_updates Update/Remove (state.rs:198-231)
  - apply_stream_diff Update/Remove (state.rs:290-310)

The previous batch_boundary close on the failure path is no longer
needed because we don't bail mid-batch — the regular boundary close at
the end of the iteration handles it.

New e2e test missing_order_remove_is_skipped_in_both_modes asserts
both block and stream listeners stay ready after a phantom Remove,
catching regressions in the parity Steve flagged.

Reproduced live on aws-tyo-hl-mainnet:
  19:27:01 ERROR ... OrderDiffs processing error: Unable to find order
           on the book NodeDataOrderDiff { oid: 415431370878, coin: BTC,
           raw_book_diff: Remove }
  followed by NRestarts climbing.

cargo test --workspace: 131 passed, 0 failed.
@armcconnell
Copy link
Copy Markdown
Collaborator

Pushed four follow-on fixes after deploying surfaced each one. All preserve block-vs-stream wire parity (existing dual_validator_capture_matches_block_and_stream_payloads still passes); each fix has an e2e test.

  • d49d15binit_from_snapshot prunes streaming_state.blocks at heights ≤ snapshot height before draining; without it apply_stream_diff rejects pre-snapshot events and order_book_state stays None forever.
  • b16d985 — (a) receive_stream_statuses/diffs drop late events for finalized blocks instead of crashing (cross-file ordering race between the three streaming files); (b) removed per-diff publish_l2_snapshot from drain_streaming_blocks — was the dominant cost and kept stream mode from keeping pace with mainnet throughput; (c) removed OrderBookListener::new convenience ctor that defaulted to Block, so future call sites can't silently misroute.
  • 7caf715apply_updates and apply_stream_diff log+skip Update/Remove for missing orders instead of Err; the snapshot validator's per-coin recovery reconciles within 60 s, so this turns a hard restart cycle into a recoverable transient. Applied symmetrically in both modes for parity.

cargo test --workspace: 131 passed, 0 failed.

@armcconnell
Copy link
Copy Markdown
Collaborator

Update: deployed all fixes to a real HL mainnet node. Stream mode runs but produces ~10 missing-order Update/Remove diffs per second where block mode produces ~0. The soft-tolerance in 7caf715 silently skips them, but the book degrades to an inconsistent state quickly: after 17 min uptime the publisher's self.height froze at 987768488 while hl-node advanced ~6,000 blocks. The snapshot validator's "our height < snapshot height" early-return path fires every minute, so surgical recovery never kicks in either. 14,218 missing-order warns total over that window.

Root cause is the snapshot/streaming-event boundary alignment — 7caf715 treated the symptom, not the cause. Plausible suspects: from_snapshot::remove_triggers dropping orders that streaming events later reference, or the fetched snapshot lagging hl-node's actual on-disk state by some number of blocks.

Switched the mainnet host back to block mode (still on this branch — confirms all fixes preserve block-mode behavior). Stream mode root cause still needs investigation before this can be merged.

armcconnell and others added 3 commits May 8, 2026 09:17
The earlier soft-tolerance fix (7caf715) covered Update/Remove for orders
missing from the book in both apply_updates and apply_stream_diff, but
NOT the third strict failure mode in those functions: a New diff whose
opening status isn't available.

Block mode (apply_updates): the order_statuses batch is filtered by
`is_inserted_into_book()` into an order_map. If a New diff's oid isn't
in that map (status didn't end up in the batch — e.g. transient order),
strict path returned `Unable to find order opening status` and tore the
listener down. Reproduced live on aws-tyo-hl-mainnet running this branch
in block mode (~1 occurrence per 4 hours, recovers in ~10s but generates
restart noise).

Stream mode (apply_stream_diff): drain_streaming_blocks normally defers
New diffs without status (BREAKs the inner loop). Defensive fix here
preserves parity with block mode for any pathway that might still call
apply_stream_diff with order_status=None.

Both modes now log+skip and advance height/time so the diff isn't
replayed; snapshot validation reconciles the coin's book within 60s
via apply_recovery, same recovery pattern Update/Remove already use.

New e2e test new_diff_without_opening_status_is_skipped_in_block_mode
asserts the listener stays healthy after a phantom New, mirroring the
existing missing_order_remove_is_skipped_in_both_modes coverage.

Note: soft-tolerance does NOT cause crossed books — OrderBook::add_order
runs match_order on every add, so the internal book is structurally
non-crossing even when state lingers. The actual cost of soft-tolerance
is up to 60s of phantom executions/wrong events on the wire per affected
coin until snapshot validation detects divergence and emits
InstrumentReset.

cargo test --workspace: 132 passed, 0 failed.
Trade::from_fills used unwrap() to extract the Ask and Bid fills from a
HashMap<Side, NodeDataFill>, and assert_eq! to check coin/tid agreement
between the pair. The pair-up upstream in coin_to_trades is best-effort:
it pops two adjacent fills from the batch and inserts them keyed by Side.
If both happen to share a Side, the second insert overwrites the first
and the HashMap has only one entry, so unwrap() panics. Same shape if
the pair has mismatched coin or tid.

Failure mode in production: this panic kills the tokio worker running
MulticastPublisher::run, which silently terminates the TOB Quote/Trade
emit pipeline. The publisher process itself stays up (other tokio
tasks unaffected), so systemd doesn't restart, NRestarts stays 0,
and the operator only sees TOB ports go quiet on the wire while DoB
keeps working. Reproduced live on aws-tyo-hl-mainnet running the
streaming-ingest-parity branch in block mode, after the upstream
soft-tolerance fixes (43e467f, d49d15b, b16d985, 7caf715, 89a113d)
let the publisher stay up long enough to encounter a malformed pair.

This is malbeclabs/hyperliquid#4 — same bug, surfaced harder by the
absence of restarts.

Fix:
  - Trade::from_fills returns Option<Self>, uses ? on the HashMap
    removes for both sides, and log+returns None on coin/tid mismatch
    (replacing the assert_eq!s).
  - coin_to_trades handles None by logging a warn and continuing.
    The trade pair is dropped on the floor; the rest of the batch is
    still processed normally.

New unit tests in types::trade_from_fills_tests cover all four cases:
matched pair (ok), missing side (None), coin mismatch (None), tid
mismatch (None). cargo test --workspace: 136 passed, 0 failed.
@packethog
Copy link
Copy Markdown
Owner Author

Update on the streaming ingest canary/debugging:

I found two streaming-mode ordering issues and pushed a fix in 1501394.

  1. Raw-book New diffs now insert the resulting resting order directly instead of going through local matching. This fixes cases where raw validator diffs represent already-decided book state, but our local matcher could decide the order crossed and remove it immediately, causing later Update/Remove diffs for the same OID to appear missing.

  2. Streaming block finalization now has a small grace window before marking a block finalized. Live logs showed the missing update/remove warnings were preceded by bursts of:
    dropping late streaming diffs for finalized block ...

    Root cause: streaming files can deliver multiple JSONL rows for the same block, and cross-file / inotify ordering can lag. We were finalizing block N as soon as any later block appeared and the currently buffered rows for N were drained. If more raw-diff rows for N arrived shortly afterward, we dropped them, and later removes for those dropped orders became missing-order warnings.

Validation:

  • Before fix, one 5-minute canary sample showed:
    • late_diffs=17891
    • late_statuses=1884
    • missing_update=74
    • missing_remove=4271
  • Deployed the fixed canary to tyo-hl-node:
    • sha256 facf0b81accad31fd086f720d42e032a87e47ce04c08ac045b3aee4b57b3dcae
    • PID 158748
  • Post-restart samples:
    • 65s: late_diffs=0 late_statuses=0 missing_update=0 missing_remove=0
    • 125s: late_diffs=0 late_statuses=0 missing_update=0 missing_remove=0
    • also validation_behind=0, new_did_not_rest=0

Tests run:

  • cargo test -p server listeners::order_book::block_mode_multicast_e2e -- --nocapture
  • cargo test -p server
  • cargo clippy --workspace --all-targets

Clippy exits 0, with the same pre-existing warning noise.

@packethog
Copy link
Copy Markdown
Owner Author

Update on the DOB unknown coin warnings from tyo-hl-node:

I traced this to instrument metadata resolution rather than the DOB tap itself. The live hl-node files were emitting coins such as xyz:NATGAS, flx:COPPER, hyna:ADA, and spot-token aliases like #250, but our registry did not resolve them correctly.

Root causes fixed in 46830b2:

  1. Builder DEX metadata from Hyperliquid now appears to return already-prefixed names, e.g. xyz:NATGAS. We were always prepending the DEX name, creating registry keys like xyz:xyz:NATGAS, while the validator data uses xyz:NATGAS.

  2. Spot token aliases like #250/#251 appear in raw book files, but the registry only included spot universe entries like @...; it now also registers #<token_index> aliases from spotMeta.tokens.

Downstream impact before the fix:

  • DOB OrderAdd, OrderCancel, and OrderExecute events for affected instruments were skipped because the tap could not map coin -> instrument_id.
  • That means downstream DOB consumers could see incomplete books for affected builder DEX / token-alias instruments.

Validation:

  • Added focused unit tests for prefixed builder DEX names, backwards-compatible unprefixed builder names, and #<token_index> aliases.
  • Ran cargo fmt --check
  • Ran cargo test -p server
  • Ran cargo clippy --workspace --all-targets — exits 0 with the existing warning noise.

Live canary:

  • Built and deployed candidate to tyo-hl-node
  • sha256: c2aab020dced518c594c39be3ed312f44184edfdb72d492904d4f86644183362
  • service PID: 169722
  • after startup catch-up aged out, dob_tap: unknown coin was 0 over the latest 60s window.

@packethog packethog requested a review from armcconnell May 13, 2026 15:53
@packethog packethog self-assigned this May 13, 2026
Copy link
Copy Markdown
Collaborator

@armcconnell armcconnell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-ups (non-blocking):

Important — observability gap. The soft-tolerance branches in server/src/listeners/order_book/state.rs (apply_updates / apply_stream_diff, ~6 places) and the drain-skip branches in server/src/listeners/order_book/mod.rs:1141-1173 log+skip without bumping a metric. Snapshot validation reconciles every 60s so it's recoverable, but a slow-burn validator/publisher divergence is invisible on dashboards. Suggest orderbook_listener_apply_skipped_total{source, kind} with bounded labels.

Minor:

  • server/src/listeners/order_book/progress.rs::now_epoch_ms uses SystemTime for a monotonic report-cadence gate; should be Instant to be NTP-jump-resilient.
  • FillPairAccumulator::evict_to_capacity runs post-batch in server/src/multicast/publisher.rs:119-134; a single batch >MAX_PENDING=100_000 can technically breach the cap mid-batch. Not real at current HL volume but worth doing inline.
  • The block_mode_multicast_e2e module now holds many stream-mode tests too — name is stale. Rename when convenient.

Copy link
Copy Markdown
Collaborator

@armcconnell armcconnell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a comment for some follow up items but I think we should merge this now.

@packethog packethog merged commit 6a8a6e2 into main May 15, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants