From 601cb8588c5af8d7d31eba7444f27b0e28b186dd Mon Sep 17 00:00:00 2001 From: Andrew McConnell Date: Tue, 28 Apr 2026 16:14:14 -0500 Subject: [PATCH] dob: scale wire qty fields to per-instrument qty_exponent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The DoB emitter was writing OrderAdd.quantity, OrderExecute.exec_quantity, and SnapshotOrder.quantity at the publisher's internal 10^8 fixed-point (Sz::value()) while the accompanying InstrumentDefinition reported the venue's per-instrument qty_exponent. Subscribers decode display = raw * 10^qty_exponent, so the on-wire error was 10^(8 + qty_exponent)x for every instrument with qty_exponent != -8 — i.e. every HL instrument. TOB scaled correctly because it parsed level.sz() as a string with qty_to_fixed(level.sz(), inst.qty_exponent). DoB emitted Sz::value() directly. Fix scales DoB qty fields the same way. - order_book::sz_to_fixed: helper that converts an internal Sz (qty * 1e8) to the wire u64 for a given qty_exponent. Lossless for HL: venue qty is always a multiple of 10^qty_exponent so the divisor 10^(8+qty_exponent) divides exactly. - dob_tap: CoinResolver widened to Fn(&Coin) -> Option<(u32, i8)> so the apply tap can scale per-instrument. Applied at OrderAdd.quantity and OrderExecute.exec_quantity emission. - multicast/dob: emit_snapshot takes qty_exponent and applies the same scaling at SnapshotOrder.quantity. run_dob_snapshot_task threads qty_exponent through the priority and round-robin paths from registry. - websocket_server: construction-site resolver returns qty_exponent alongside instrument_id from the registry's active map. Tests: - dob_tap unit tests: scaling assertions at qty_exponent = -3 and 0. - multicast/dob: snapshot_qty_scaling_tests verify SnapshotOrder.quantity on the wire at qty_exponent = -3 and 0. - parity_tests: TOB-DoB end-state parity at qty_exponent = -8, -3, and 0. Without the fix the -3 and 0 variants fail with dob_qty / tob_qty == 10^(8 + qty_exponent), which is the exact symptom from the field. Refs: hyperliquid#10 --- server/src/listeners/order_book/dob_tap.rs | 118 ++++++++++- .../src/listeners/order_book/parity_tests.rs | 66 +++++-- server/src/multicast/dob.rs | 184 ++++++++++++++++-- server/src/order_book/mod.rs | 2 +- server/src/order_book/types.rs | 51 +++++ server/src/servers/websocket_server.rs | 10 +- 6 files changed, 384 insertions(+), 47 deletions(-) diff --git a/server/src/listeners/order_book/dob_tap.rs b/server/src/listeners/order_book/dob_tap.rs index ac8fc8f..082befb 100644 --- a/server/src/listeners/order_book/dob_tap.rs +++ b/server/src/listeners/order_book/dob_tap.rs @@ -10,7 +10,7 @@ use crate::{ multicast::dob::{DobEvent, DobEventSender}, order_book::{ - Coin, Oid, Px, Side, Sz, + Coin, Oid, Px, Side, Sz, sz_to_fixed, per_instrument_seq::PerInstrumentSeqCounter, }, protocol::dob::{ @@ -34,12 +34,18 @@ use tokio::sync::mpsc::error::TrySendError; /// an `.await`, so a sync mutex is the correct primitive. pub(crate) type SharedSeqCounter = Arc>; +/// Resolver returning the on-wire `(instrument_id, qty_exponent)` for a coin, +/// or `None` if the coin is not in the active registry. The `qty_exponent` is +/// required so emitted quantities can be scaled to the venue's per-instrument +/// fixed-point representation (the publisher's internal `Sz` is at 10^8). +pub(crate) type CoinResolver = Box Option<(u32, i8)> + Send + Sync>; + pub(crate) struct DobApplyTap { sender: DobEventSender, seq: SharedSeqCounter, source_id: u16, channel_id: u8, - coin_resolver: Box Option + Send + Sync>, + coin_resolver: CoinResolver, } impl DobApplyTap { @@ -53,14 +59,14 @@ impl DobApplyTap { source_id: u16, channel_id: u8, seq: SharedSeqCounter, - coin_resolver: Box Option + Send + Sync>, + coin_resolver: CoinResolver, ) -> Self { Self { sender, seq, source_id, channel_id, coin_resolver } } /// Emits an `OrderAdd` event for a newly resting order. pub(crate) fn emit_order_add(&mut self, coin: &Coin, inner_order: &InnerL4Order, timestamp_ns: u64) { - let Some(instrument_id) = (self.coin_resolver)(coin) else { + let Some((instrument_id, qty_exponent)) = (self.coin_resolver)(coin) else { log::warn!("dob_tap: unknown coin '{}' — skipping OrderAdd", coin.value()); return; }; @@ -78,14 +84,14 @@ impl DobApplyTap { order_id: inner_order.oid, enter_timestamp_ns: timestamp_ns, price: inner_order.limit_px.value() as i64, - quantity: inner_order.sz.value(), + quantity: sz_to_fixed(inner_order.sz, qty_exponent), }); self.try_send(event, "OrderAdd"); } /// Emits an `OrderCancel` event when an order is removed from the book. pub(crate) fn emit_order_cancel(&mut self, coin: &Coin, oid: Oid, timestamp_ns: u64) { - let Some(instrument_id) = (self.coin_resolver)(coin) else { + let Some((instrument_id, _qty_exponent)) = (self.coin_resolver)(coin) else { log::warn!("dob_tap: unknown coin '{}' — skipping OrderCancel", coin.value()); return; }; @@ -110,7 +116,7 @@ impl DobApplyTap { exec_quantity: Sz, timestamp_ns: u64, ) { - let Some(instrument_id) = (self.coin_resolver)(coin) else { + let Some((instrument_id, qty_exponent)) = (self.coin_resolver)(coin) else { log::warn!("dob_tap: unknown coin '{}' — skipping OrderExecute", coin.value()); return; }; @@ -125,7 +131,7 @@ impl DobApplyTap { trade_id: 0, // Phase 1: no trade_id available from diff alone timestamp_ns, exec_price: exec_price.value() as i64, - exec_quantity: exec_quantity.value(), + exec_quantity: sz_to_fixed(exec_quantity, qty_exponent), }); self.try_send(event, "OrderExecute"); } @@ -163,8 +169,16 @@ mod tests { Coin::new("BTC") } - fn btc_resolver() -> Box Option + Send + Sync> { - Box::new(|coin: &Coin| if coin.value() == "BTC" { Some(0) } else { None }) + /// Default resolver: BTC -> (id=0, qty_exponent=-8). The -8 case is the + /// no-op for `sz_to_fixed`, so existing tests that assert on `Sz::value()` + /// (e.g. 50_000_000 from a parsed `Sz`) continue to hold. + fn btc_resolver() -> CoinResolver { + Box::new(|coin: &Coin| if coin.value() == "BTC" { Some((0, -8)) } else { None }) + } + + /// Resolver with a non-trivial qty_exponent for testing wire scaling. + fn btc_resolver_with_qty_exponent(qty_exponent: i8) -> CoinResolver { + Box::new(move |coin: &Coin| if coin.value() == "BTC" { Some((0, qty_exponent)) } else { None }) } fn make_inner_order(oid: u64, side: Side, limit_px: u64, sz: u64) -> InnerL4Order { @@ -326,4 +340,88 @@ mod tests { other => panic!("expected BatchBoundary, got {:?}", other), } } + + /// `Sz::parse_from_str("0.5")` is `0.5 * 1e8 = 50_000_000` internally. + /// With `qty_exponent = -3`, the wire representation is `0.5 * 10^3 = 500`. + #[tokio::test] + async fn emit_order_add_scales_quantity_to_qty_exponent() { + let (tx, mut rx) = channel(4); + let seq = Arc::new(Mutex::new(PerInstrumentSeqCounter::new())); + let mut tap = DobApplyTap::new(tx, 1, 0, seq, btc_resolver_with_qty_exponent(-3)); + let order = make_inner_order( + 42, + Side::Bid, + Px::parse_from_str("100").unwrap().value(), + Sz::parse_from_str("0.5").unwrap().value(), + ); + + tap.emit_order_add(&btc_coin(), &order, 1_000); + + match rx.recv().await.unwrap() { + DobEvent::OrderAdd(msg) => { + assert_eq!( + msg.quantity, 500, + "quantity must be scaled to qty_exponent=-3 (expected 500, got {})", + msg.quantity, + ); + } + other => panic!("expected OrderAdd, got {other:?}"), + } + } + + /// `qty_exponent = 0` (e.g. instrument `2Z` from issue #10) — wire qty must + /// be the integer count (`Sz::value() / 1e8`), not the raw 10^8 internal. + #[tokio::test] + async fn emit_order_add_scales_quantity_at_qty_exponent_zero() { + let (tx, mut rx) = channel(4); + let seq = Arc::new(Mutex::new(PerInstrumentSeqCounter::new())); + let mut tap = DobApplyTap::new(tx, 1, 0, seq, btc_resolver_with_qty_exponent(0)); + let order = make_inner_order( + 42, + Side::Bid, + Px::parse_from_str("100").unwrap().value(), + Sz::parse_from_str("2921").unwrap().value(), + ); + + tap.emit_order_add(&btc_coin(), &order, 1_000); + + match rx.recv().await.unwrap() { + DobEvent::OrderAdd(msg) => { + assert_eq!( + msg.quantity, 2921, + "quantity must be scaled to qty_exponent=0 (expected 2921, got {})", + msg.quantity, + ); + } + other => panic!("expected OrderAdd, got {other:?}"), + } + } + + /// `OrderExecute.exec_quantity` must follow the same scaling rule. + #[tokio::test] + async fn emit_order_execute_scales_exec_quantity_to_qty_exponent() { + let (tx, mut rx) = channel(4); + let seq = Arc::new(Mutex::new(PerInstrumentSeqCounter::new())); + let mut tap = DobApplyTap::new(tx, 1, 0, seq, btc_resolver_with_qty_exponent(-3)); + let exec_quantity = Sz::parse_from_str("1.234").unwrap(); + + tap.emit_order_execute( + &btc_coin(), + Oid::new(77), + Px::parse_from_str("100").unwrap(), + exec_quantity, + 2_000, + ); + + match rx.recv().await.unwrap() { + DobEvent::OrderExecute(msg) => { + assert_eq!( + msg.exec_quantity, 1234, + "exec_quantity must be scaled to qty_exponent=-3 (expected 1234, got {})", + msg.exec_quantity, + ); + } + other => panic!("expected OrderExecute, got {other:?}"), + } + } } diff --git a/server/src/listeners/order_book/parity_tests.rs b/server/src/listeners/order_book/parity_tests.rs index 8af3579..e01a84d 100644 --- a/server/src/listeners/order_book/parity_tests.rs +++ b/server/src/listeners/order_book/parity_tests.rs @@ -66,13 +66,11 @@ use crate::types::inner::InnerL4Order; use crate::types::node_data::{Batch, NodeDataOrderDiff, NodeDataOrderStatus}; use crate::types::{L4Order, OrderDiff}; -/// Use exponent -8 so that fixed-point scales of TOB and DoB align with the -/// internal `Px`/`Sz` representation (which is `value * 1e8`). With -8, -/// `price_to_fixed("100", -8) == 10_000_000_000 == Px("100").value()`, so a -/// price of 100 in human form survives both encoding paths as the same -/// `i64` on the wire. +/// Price exponent stays at -8 across all parity-test scenarios so that the +/// internal `Px` representation (`value * 1e8`) aligns with both TOB and DoB +/// price encodings. (Price-side scaling for non-trivial `price_exponent` is a +/// separate concern and not covered by this test family.) const PRICE_EXPONENT: i8 = -8; -const QTY_EXPONENT: i8 = -8; const TEST_INSTRUMENT_ID: u32 = 0; const TEST_COIN: &str = "BTC"; @@ -191,9 +189,9 @@ fn cancel_diff(oid: u64, px: &str) -> NodeDataOrderDiff { } /// Build a registry containing a single instrument matching `TEST_COIN` / -/// `TEST_INSTRUMENT_ID`, with the price/qty exponents the parity test -/// requires. -fn build_test_registry() -> RegistryState { +/// `TEST_INSTRUMENT_ID`, with the given `qty_exponent`. The `price_exponent` +/// is fixed at `PRICE_EXPONENT`. +fn build_test_registry(qty_exponent: i8) -> RegistryState { RegistryState::new(vec![UniverseEntry { instrument_id: TEST_INSTRUMENT_ID, coin: TEST_COIN.to_string(), @@ -201,7 +199,7 @@ fn build_test_registry() -> RegistryState { info: InstrumentInfo { instrument_id: TEST_INSTRUMENT_ID, price_exponent: PRICE_EXPONENT, - qty_exponent: QTY_EXPONENT, + qty_exponent, symbol: make_symbol(TEST_COIN), }, }]) @@ -258,8 +256,12 @@ async fn drain_collector(sock: &UdpSocket, timeout: Duration) -> Vec> { frames } -#[tokio::test] -async fn tob_dob_best_bid_ask_parity_at_end_state() { +/// Runs the full TOB/DoB parity scenario at a given `qty_exponent`. The +/// scenario applies the same fixed sequence of L4 events; only the registry's +/// `qty_exponent` (and therefore the on-wire qty scaling) varies. End-state +/// quantities are bid=2 and ask=7 (decimal), which the assertions verify +/// after applying the wire scaling rule `wire = decimal * 10^-qty_exponent`. +async fn run_tob_dob_parity_scenario(qty_exponent: i8) { // 1. Bind a UDP collector that the DoB emitter will multicast-loop into. let dob_collector = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)) .await @@ -304,8 +306,8 @@ async fn tob_dob_best_bid_ask_parity_at_end_state() { /* source_id = */ 1, /* channel_id = */ 0, seq_counter.clone(), - Box::new(|c: &Coin| { - if c.value() == TEST_COIN { Some(TEST_INSTRUMENT_ID) } else { None } + Box::new(move |c: &Coin| { + if c.value() == TEST_COIN { Some((TEST_INSTRUMENT_ID, qty_exponent)) } else { None } }), ); listener.set_dob_tap(tap); @@ -420,7 +422,7 @@ async fn tob_dob_best_bid_ask_parity_at_end_state() { // does. The actual websocket-server publisher loop is intentionally // NOT spun up: the conversion is the load-bearing step, and decoupling // keeps the test fast and deterministic. - let registry = build_test_registry(); + let registry = build_test_registry(qty_exponent); let inst = registry.active.get(TEST_COIN).expect("instrument registered").clone(); let (_time, l2_snapshots) = listener.l2_snapshots_for_test().expect("snapshot ready"); let (tob_bid_price, tob_bid_qty, tob_ask_price, tob_ask_qty) = @@ -479,9 +481,37 @@ async fn tob_dob_best_bid_ask_parity_at_end_state() { ); // Sanity: the test sequence ends with bid=100 (qty=2), ask=109 (qty=7). - // With PRICE_EXPONENT=QTY_EXPONENT=-8 these become _ × 1e8. + // PRICE_EXPONENT=-8 fixes the price scale to × 1e8; qty scale follows + // `qty_exponent`: wire = decimal × 10^-qty_exponent. + let qty_scale = 10u64.pow(((-qty_exponent) as i32).max(0) as u32); assert_eq!(tob_bid_price, 100 * 100_000_000); - assert_eq!(tob_bid_qty, 2 * 100_000_000); + assert_eq!(tob_bid_qty, 2 * qty_scale); assert_eq!(tob_ask_price, 109 * 100_000_000); - assert_eq!(tob_ask_qty, 7 * 100_000_000); + assert_eq!(tob_ask_qty, 7 * qty_scale); +} + +/// Original parity test at `qty_exponent = -8`: this is the no-op case for +/// `sz_to_fixed` (publisher's internal `Sz` already at 10^8). Confirms TOB +/// and DoB still agree under the trivial scaling. +#[tokio::test] +async fn tob_dob_best_bid_ask_parity_at_qty_exponent_neg8() { + run_tob_dob_parity_scenario(-8).await; +} + +/// Parity test at `qty_exponent = -3` — the regression test for issue #10. +/// Before the fix, DoB emitted `OrderAdd.quantity` and +/// `OrderExecute.exec_quantity` at the publisher's raw 10^8 scale while TOB +/// emitted the venue's per-instrument scale, so this assertion would fail +/// with `dob_qty = tob_qty * 10^(8 + qty_exponent)`. +#[tokio::test] +async fn tob_dob_best_bid_ask_parity_at_qty_exponent_neg3() { + run_tob_dob_parity_scenario(-3).await; +} + +/// Parity test at `qty_exponent = 0` — exercises the integer-only case (e.g. +/// instrument `2Z` from issue #10). With the bug present, the failure is +/// most extreme here: `dob_qty / tob_qty == 10^8`. +#[tokio::test] +async fn tob_dob_best_bid_ask_parity_at_qty_exponent_zero() { + run_tob_dob_parity_scenario(0).await; } diff --git a/server/src/multicast/dob.rs b/server/src/multicast/dob.rs index a16081a..bd11729 100644 --- a/server/src/multicast/dob.rs +++ b/server/src/multicast/dob.rs @@ -350,6 +350,8 @@ impl DobSnapshotEmitter { /// snapshot was anchored; subscribers reconcile the snapshot stream with /// the delta stream against this value. `last_instrument_seq` is the most /// recently emitted per-instrument seq for `instrument_id` (or 0 if none). + /// `qty_exponent` is the venue's per-instrument exponent used to scale + /// each `SnapshotOrder.quantity` from internal `Sz` (10^8 fixed) to wire. /// The empty-orders case is supported and emits Begin + End with no /// `SnapshotOrder` frames in between. pub(crate) async fn emit_snapshot( @@ -357,6 +359,7 @@ impl DobSnapshotEmitter { instrument_id: u32, anchor_seq: u64, last_instrument_seq: u32, + qty_exponent: i8, orders: Vec, ) -> std::io::Result<()> { let snapshot_id = self.next_snapshot_id(); @@ -428,7 +431,7 @@ impl DobSnapshotEmitter { order_flags: 0, // Phase 1 leaves these at 0; mirrors OrderAdd in dob_tap. enter_timestamp_ns, price: order.limit_px.value() as i64, - quantity: order.sz.value(), + quantity: crate::order_book::sz_to_fixed(order.sz, qty_exponent), }; encode_snapshot_order(buf, &msg); fb.commit_message(); @@ -512,7 +515,9 @@ pub(crate) async fn run_dob_snapshot_task( if let Some(DobSnapshotRequest::Priority { instrument_id, anchor_seq }) = priority_queue.pop_front() { - let Some(coin) = lookup_coin_for_instrument(®istry, instrument_id).await else { + let Some((coin, qty_exponent)) = + lookup_coin_for_instrument(®istry, instrument_id).await + else { log::warn!( "dob_snapshot: priority request for unknown instrument_id {instrument_id}, skipping" ); @@ -533,7 +538,7 @@ pub(crate) async fn run_dob_snapshot_task( .clone_coin_orders(&coin) .unwrap_or_default(); emitter - .emit_snapshot(instrument_id, anchor_seq, last_instrument_seq, orders) + .emit_snapshot(instrument_id, anchor_seq, last_instrument_seq, qty_exponent, orders) .await?; continue; } @@ -548,7 +553,7 @@ pub(crate) async fn run_dob_snapshot_task( let active_len = u32::try_from(active.len()).unwrap_or(u32::MAX).max(1); let slot_budget = emitter.config.round_duration / active_len; - for (instrument_id, coin) in active { + for (instrument_id, coin, qty_exponent) in active { // Re-check the priority queue between slots so a recovery snapshot // does not have to wait a full cycle. loop { @@ -581,7 +586,7 @@ pub(crate) async fn run_dob_snapshot_task( let emit_start = std::time::Instant::now(); emitter - .emit_snapshot(instrument_id, anchor_seq, last_instrument_seq, orders) + .emit_snapshot(instrument_id, anchor_seq, last_instrument_seq, qty_exponent, orders) .await?; let elapsed = emit_start.elapsed(); if elapsed < slot_budget { @@ -591,32 +596,32 @@ pub(crate) async fn run_dob_snapshot_task( } } -/// Linear-scan lookup of `instrument_id -> Coin` against the active instrument -/// set. Used by the priority path; cost is fine because priority requests are -/// rare (only on validation mismatch via `apply_recovery`). +/// Linear-scan lookup of `instrument_id -> (Coin, qty_exponent)` against the +/// active instrument set. Used by the priority path; cost is fine because +/// priority requests are rare (only on validation mismatch via `apply_recovery`). async fn lookup_coin_for_instrument( registry: &Arc>, instrument_id: u32, -) -> Option { +) -> Option<(crate::order_book::Coin, i8)> { let state = registry.read().await; state .active .iter() .find(|(_coin, info)| info.instrument_id == instrument_id) - .map(|(coin, _info)| crate::order_book::Coin::new(coin)) + .map(|(coin, info)| (crate::order_book::Coin::new(coin), info.qty_exponent)) } -/// Snapshot of the active instrument set as `(instrument_id, Coin)` pairs. -/// The scheduler iterates this once per round; if the set changes mid-cycle -/// we'll pick up the change on the next round. +/// Snapshot of the active instrument set as `(instrument_id, Coin, qty_exponent)` +/// triples. The scheduler iterates this once per round; if the set changes +/// mid-cycle we'll pick up the change on the next round. async fn list_active_instruments( registry: &Arc>, -) -> Vec<(u32, crate::order_book::Coin)> { +) -> Vec<(u32, crate::order_book::Coin, i8)> { let state = registry.read().await; state .active .iter() - .map(|(coin, info)| (info.instrument_id, crate::order_book::Coin::new(coin))) + .map(|(coin, info)| (info.instrument_id, crate::order_book::Coin::new(coin), info.qty_exponent)) .collect() } @@ -1088,3 +1093,152 @@ mod snapshot_anchor_tests { handle.abort(); } } + +#[cfg(test)] +mod snapshot_qty_scaling_tests { + //! Verifies `SnapshotOrder.quantity` is scaled to the venue's + //! `qty_exponent` (issue #10). Internal `Sz` carries quantity at the + //! publisher's fixed 10^8 scale; the wire field must be + //! `qty * 10^-qty_exponent` (i.e. `Sz::value() / 10^(8 + qty_exponent)`). + use super::*; + use crate::order_book::{Coin, Px, Side, Sz}; + use crate::protocol::dob::constants::{ + DEFAULT_MTU, FRAME_HEADER_SIZE, MSG_TYPE_SNAPSHOT_ORDER, SNAPSHOT_ORDER_SIZE, + }; + use alloy::primitives::Address; + use tokio::net::UdpSocket; + + fn make_inner_order(coin: &Coin, oid: u64, side: Side, px: &str, sz: &str) -> crate::types::inner::InnerL4Order { + crate::types::inner::InnerL4Order { + user: Address::new([0; 20]), + coin: coin.clone(), + side, + limit_px: Px::parse_from_str(px).unwrap(), + sz: Sz::parse_from_str(sz).unwrap(), + oid, + timestamp: 0, + trigger_condition: String::new(), + is_trigger: false, + trigger_px: String::new(), + is_position_tpsl: false, + reduce_only: false, + order_type: String::new(), + tif: None, + cloid: None, + } + } + + /// Walks the captured datagrams and returns the `quantity` field of the + /// first `SnapshotOrder` found. Each datagram is one DoB frame; a single + /// frame may pack multiple SnapshotOrder messages, but the first message + /// in the frame is sufficient to verify scaling. + fn first_snapshot_order_quantity(frames: &[Vec]) -> Option { + for frame in frames { + if frame.len() < FRAME_HEADER_SIZE + SNAPSHOT_ORDER_SIZE { + continue; + } + let body = &frame[FRAME_HEADER_SIZE..]; + if body[0] != MSG_TYPE_SNAPSHOT_ORDER { + continue; + } + // SnapshotOrder body offsets: quantity at 36..44 (see encode_snapshot_order). + return Some(u64::from_le_bytes(body[36..44].try_into().ok()?)); + } + None + } + + #[tokio::test] + async fn snapshot_order_quantity_scales_to_qty_exponent() { + // 1. Bind collector and the snapshot emitter pointing at it. + let collector = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)).await.unwrap(); + let collector_addr = match collector.local_addr().unwrap() { + std::net::SocketAddr::V4(a) => a, + _ => unreachable!(), + }; + let mut emitter = DobSnapshotEmitter::bind(DobSnapshotConfig { + group_addr: *collector_addr.ip(), + port: collector_addr.port(), + bind_addr: Ipv4Addr::LOCALHOST, + channel_id: 0, + mtu: DEFAULT_MTU, + round_duration: Duration::from_secs(60), + }) + .await + .unwrap(); + + // 2. One order with sz="1.234" → Sz::value() == 1.234 * 1e8 == 123_400_000. + // With qty_exponent=-3, wire quantity must be 1234. + let coin = Coin::new("BTC"); + let orders = vec![make_inner_order(&coin, 42, Side::Bid, "100", "1.234")]; + + // 3. Emit one snapshot triad. + emitter + .emit_snapshot( + /* instrument_id = */ 0, + /* anchor_seq = */ 0, + /* last_instrument_seq = */ 0, + /* qty_exponent = */ -3, + orders, + ) + .await + .unwrap(); + + // 4. Drain frames (Begin, Orders, End all emit immediately above). + let mut frames: Vec> = Vec::new(); + loop { + let mut buf = [0u8; 4096]; + match tokio::time::timeout(Duration::from_millis(200), collector.recv_from(&mut buf)).await { + Ok(Ok((n, _))) => frames.push(buf[..n].to_vec()), + _ => break, + } + } + + // 5. Verify the SnapshotOrder.quantity was scaled to qty_exponent. + let qty = first_snapshot_order_quantity(&frames) + .expect("captured at least one SnapshotOrder frame"); + assert_eq!( + qty, 1234, + "SnapshotOrder.quantity must equal 1234 for sz=1.234 at qty_exponent=-3 (got {})", + qty, + ); + } + + #[tokio::test] + async fn snapshot_order_quantity_at_qty_exponent_zero() { + // qty_exponent=0 case (e.g. instrument 2Z from issue #10): wire must + // be the integer count, not Sz::value() (which is qty * 1e8). + let collector = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)).await.unwrap(); + let collector_addr = match collector.local_addr().unwrap() { + std::net::SocketAddr::V4(a) => a, + _ => unreachable!(), + }; + let mut emitter = DobSnapshotEmitter::bind(DobSnapshotConfig { + group_addr: *collector_addr.ip(), + port: collector_addr.port(), + bind_addr: Ipv4Addr::LOCALHOST, + channel_id: 0, + mtu: DEFAULT_MTU, + round_duration: Duration::from_secs(60), + }) + .await + .unwrap(); + + let coin = Coin::new("2Z"); + let orders = vec![make_inner_order(&coin, 42, Side::Bid, "100", "2921")]; + + emitter.emit_snapshot(0, 0, 0, 0, orders).await.unwrap(); + + let mut frames: Vec> = Vec::new(); + loop { + let mut buf = [0u8; 4096]; + match tokio::time::timeout(Duration::from_millis(200), collector.recv_from(&mut buf)).await { + Ok(Ok((n, _))) => frames.push(buf[..n].to_vec()), + _ => break, + } + } + + let qty = first_snapshot_order_quantity(&frames) + .expect("captured at least one SnapshotOrder frame"); + assert_eq!(qty, 2921); + } +} diff --git a/server/src/order_book/mod.rs b/server/src/order_book/mod.rs index ce5a835..b3f152f 100644 --- a/server/src/order_book/mod.rs +++ b/server/src/order_book/mod.rs @@ -9,7 +9,7 @@ pub(crate) mod multi_book; pub mod per_instrument_seq; pub(crate) mod types; -pub(crate) use types::{Coin, InnerOrder, Oid, Px, Side, Sz}; +pub(crate) use types::{Coin, InnerOrder, Oid, Px, Side, Sz, sz_to_fixed}; pub use per_instrument_seq::PerInstrumentSeqCounter; #[derive(Clone, Default)] diff --git a/server/src/order_book/types.rs b/server/src/order_book/types.rs index 2ab7c13..06aee21 100644 --- a/server/src/order_book/types.rs +++ b/server/src/order_book/types.rs @@ -147,3 +147,54 @@ impl Sz { s.trim_end_matches('.').to_string() } } + +/// Converts an internal `Sz` (which carries a quantity at the publisher's +/// fixed 10^8 scale) to the wire encoding for an instrument whose +/// `qty_exponent` is `qty_exponent`. The wire encoding is `qty * 10^-qty_exponent`, +/// and `Sz::value() == qty * 10^8`, so the divisor is `10^(8 + qty_exponent)`. +/// +/// HL's `qty_exponent` range is `-8..=0` in practice (today: `-5..=0`); for any +/// value in that range the divisor is `>= 1` and the division is exact for any +/// quantity emitted by the venue (which is always an integer multiple of +/// `10^qty_exponent`). +#[must_use] +pub(crate) fn sz_to_fixed(sz: Sz, qty_exponent: i8) -> u64 { + let div_pow = 8i32 + i32::from(qty_exponent); + if div_pow <= 0 { + sz.value().saturating_mul(10u64.pow((-div_pow) as u32)) + } else { + sz.value() / 10u64.pow(div_pow as u32) + } +} + +#[cfg(test)] +mod sz_to_fixed_tests { + use super::*; + + #[test] + fn qty_exponent_zero_divides_by_1e8() { + // Sz::parse_from_str("2921") -> 2921 * 10^8. qty_exponent=0 wants 2921. + let sz = Sz::parse_from_str("2921").unwrap(); + assert_eq!(sz_to_fixed(sz, 0), 2921); + } + + #[test] + fn qty_exponent_negative_three() { + // "1.234" with qty_exponent=-3 should yield 1234. + let sz = Sz::parse_from_str("1.234").unwrap(); + assert_eq!(sz_to_fixed(sz, -3), 1234); + } + + #[test] + fn qty_exponent_negative_eight_is_identity() { + // qty_exponent=-8 means the wire representation already matches Sz::value(). + let sz = Sz::parse_from_str("0.00000017").unwrap(); + assert_eq!(sz_to_fixed(sz, -8), sz.value()); + } + + #[test] + fn zero_quantity_is_zero() { + assert_eq!(sz_to_fixed(Sz::new(0), -3), 0); + assert_eq!(sz_to_fixed(Sz::new(0), 0), 0); + } +} diff --git a/server/src/servers/websocket_server.rs b/server/src/servers/websocket_server.rs index f09413c..682a8d6 100644 --- a/server/src/servers/websocket_server.rs +++ b/server/src/servers/websocket_server.rs @@ -150,10 +150,14 @@ pub async fn run_websocket_server( // Coin resolver for the apply tap: non-blocking try_read() so it is // safe to call from the sync apply_updates path without risk of - // blocking a tokio worker. - let coin_resolver: Box Option + Send + Sync> = { + // blocking a tokio worker. Returns `(instrument_id, qty_exponent)` so + // the tap can scale internal `Sz` (10^8 fixed) to the venue's wire + // representation per instrument. + let coin_resolver: crate::listeners::order_book::dob_tap::CoinResolver = { let reg = Arc::clone(®istry); - Box::new(move |coin: &Coin| reg.try_read().ok()?.active.get(&coin.value()).map(|info| info.instrument_id)) + Box::new(move |coin: &Coin| { + reg.try_read().ok()?.active.get(&coin.value()).map(|info| (info.instrument_id, info.qty_exponent)) + }) }; // Shared per-instrument seq counter: bumped by the apply tap, read by // the snapshot emitter when populating SnapshotBegin.last_instrument_seq.