Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 108 additions & 10 deletions server/src/listeners/order_book/dob_tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use crate::{
multicast::dob::{DobEvent, DobEventSender},
order_book::{
Coin, Oid, Px, Side, Sz,
Coin, Oid, Px, Side, Sz, sz_to_fixed,
per_instrument_seq::PerInstrumentSeqCounter,
},
protocol::dob::{
Expand All @@ -34,12 +34,18 @@ use tokio::sync::mpsc::error::TrySendError;
/// an `.await`, so a sync mutex is the correct primitive.
pub(crate) type SharedSeqCounter = Arc<Mutex<PerInstrumentSeqCounter>>;

/// Resolver returning the on-wire `(instrument_id, qty_exponent)` for a coin,
/// or `None` if the coin is not in the active registry. The `qty_exponent` is
/// required so emitted quantities can be scaled to the venue's per-instrument
/// fixed-point representation (the publisher's internal `Sz` is at 10^8).
pub(crate) type CoinResolver = Box<dyn Fn(&Coin) -> Option<(u32, i8)> + Send + Sync>;

pub(crate) struct DobApplyTap {
sender: DobEventSender,
seq: SharedSeqCounter,
source_id: u16,
channel_id: u8,
coin_resolver: Box<dyn Fn(&Coin) -> Option<u32> + Send + Sync>,
coin_resolver: CoinResolver,
}

impl DobApplyTap {
Expand All @@ -53,14 +59,14 @@ impl DobApplyTap {
source_id: u16,
channel_id: u8,
seq: SharedSeqCounter,
coin_resolver: Box<dyn Fn(&Coin) -> Option<u32> + Send + Sync>,
coin_resolver: CoinResolver,
) -> Self {
Self { sender, seq, source_id, channel_id, coin_resolver }
}

/// Emits an `OrderAdd` event for a newly resting order.
pub(crate) fn emit_order_add(&mut self, coin: &Coin, inner_order: &InnerL4Order, timestamp_ns: u64) {
let Some(instrument_id) = (self.coin_resolver)(coin) else {
let Some((instrument_id, qty_exponent)) = (self.coin_resolver)(coin) else {
log::warn!("dob_tap: unknown coin '{}' — skipping OrderAdd", coin.value());
return;
};
Expand All @@ -78,14 +84,14 @@ impl DobApplyTap {
order_id: inner_order.oid,
enter_timestamp_ns: timestamp_ns,
price: inner_order.limit_px.value() as i64,
quantity: inner_order.sz.value(),
quantity: sz_to_fixed(inner_order.sz, qty_exponent),
});
self.try_send(event, "OrderAdd");
}

/// Emits an `OrderCancel` event when an order is removed from the book.
pub(crate) fn emit_order_cancel(&mut self, coin: &Coin, oid: Oid, timestamp_ns: u64) {
let Some(instrument_id) = (self.coin_resolver)(coin) else {
let Some((instrument_id, _qty_exponent)) = (self.coin_resolver)(coin) else {
log::warn!("dob_tap: unknown coin '{}' — skipping OrderCancel", coin.value());
return;
};
Expand All @@ -110,7 +116,7 @@ impl DobApplyTap {
exec_quantity: Sz,
timestamp_ns: u64,
) {
let Some(instrument_id) = (self.coin_resolver)(coin) else {
let Some((instrument_id, qty_exponent)) = (self.coin_resolver)(coin) else {
log::warn!("dob_tap: unknown coin '{}' — skipping OrderExecute", coin.value());
return;
};
Expand All @@ -125,7 +131,7 @@ impl DobApplyTap {
trade_id: 0, // Phase 1: no trade_id available from diff alone
timestamp_ns,
exec_price: exec_price.value() as i64,
exec_quantity: exec_quantity.value(),
exec_quantity: sz_to_fixed(exec_quantity, qty_exponent),
});
self.try_send(event, "OrderExecute");
}
Expand Down Expand Up @@ -163,8 +169,16 @@ mod tests {
Coin::new("BTC")
}

fn btc_resolver() -> Box<dyn Fn(&Coin) -> Option<u32> + Send + Sync> {
Box::new(|coin: &Coin| if coin.value() == "BTC" { Some(0) } else { None })
/// Default resolver: BTC -> (id=0, qty_exponent=-8). The -8 case is the
/// no-op for `sz_to_fixed`, so existing tests that assert on `Sz::value()`
/// (e.g. 50_000_000 from a parsed `Sz`) continue to hold.
fn btc_resolver() -> CoinResolver {
Box::new(|coin: &Coin| if coin.value() == "BTC" { Some((0, -8)) } else { None })
}

/// Resolver with a non-trivial qty_exponent for testing wire scaling.
fn btc_resolver_with_qty_exponent(qty_exponent: i8) -> CoinResolver {
Box::new(move |coin: &Coin| if coin.value() == "BTC" { Some((0, qty_exponent)) } else { None })
}

fn make_inner_order(oid: u64, side: Side, limit_px: u64, sz: u64) -> InnerL4Order {
Expand Down Expand Up @@ -326,4 +340,88 @@ mod tests {
other => panic!("expected BatchBoundary, got {:?}", other),
}
}

/// `Sz::parse_from_str("0.5")` is `0.5 * 1e8 = 50_000_000` internally.
/// With `qty_exponent = -3`, the wire representation is `0.5 * 10^3 = 500`.
#[tokio::test]
async fn emit_order_add_scales_quantity_to_qty_exponent() {
let (tx, mut rx) = channel(4);
let seq = Arc::new(Mutex::new(PerInstrumentSeqCounter::new()));
let mut tap = DobApplyTap::new(tx, 1, 0, seq, btc_resolver_with_qty_exponent(-3));
let order = make_inner_order(
42,
Side::Bid,
Px::parse_from_str("100").unwrap().value(),
Sz::parse_from_str("0.5").unwrap().value(),
);

tap.emit_order_add(&btc_coin(), &order, 1_000);

match rx.recv().await.unwrap() {
DobEvent::OrderAdd(msg) => {
assert_eq!(
msg.quantity, 500,
"quantity must be scaled to qty_exponent=-3 (expected 500, got {})",
msg.quantity,
);
}
other => panic!("expected OrderAdd, got {other:?}"),
}
}

/// `qty_exponent = 0` (e.g. instrument `2Z` from issue #10) — wire qty must
/// be the integer count (`Sz::value() / 1e8`), not the raw 10^8 internal.
#[tokio::test]
async fn emit_order_add_scales_quantity_at_qty_exponent_zero() {
let (tx, mut rx) = channel(4);
let seq = Arc::new(Mutex::new(PerInstrumentSeqCounter::new()));
let mut tap = DobApplyTap::new(tx, 1, 0, seq, btc_resolver_with_qty_exponent(0));
let order = make_inner_order(
42,
Side::Bid,
Px::parse_from_str("100").unwrap().value(),
Sz::parse_from_str("2921").unwrap().value(),
);

tap.emit_order_add(&btc_coin(), &order, 1_000);

match rx.recv().await.unwrap() {
DobEvent::OrderAdd(msg) => {
assert_eq!(
msg.quantity, 2921,
"quantity must be scaled to qty_exponent=0 (expected 2921, got {})",
msg.quantity,
);
}
other => panic!("expected OrderAdd, got {other:?}"),
}
}

/// `OrderExecute.exec_quantity` must follow the same scaling rule.
#[tokio::test]
async fn emit_order_execute_scales_exec_quantity_to_qty_exponent() {
let (tx, mut rx) = channel(4);
let seq = Arc::new(Mutex::new(PerInstrumentSeqCounter::new()));
let mut tap = DobApplyTap::new(tx, 1, 0, seq, btc_resolver_with_qty_exponent(-3));
let exec_quantity = Sz::parse_from_str("1.234").unwrap();

tap.emit_order_execute(
&btc_coin(),
Oid::new(77),
Px::parse_from_str("100").unwrap(),
exec_quantity,
2_000,
);

match rx.recv().await.unwrap() {
DobEvent::OrderExecute(msg) => {
assert_eq!(
msg.exec_quantity, 1234,
"exec_quantity must be scaled to qty_exponent=-3 (expected 1234, got {})",
msg.exec_quantity,
);
}
other => panic!("expected OrderExecute, got {other:?}"),
}
}
}
66 changes: 48 additions & 18 deletions server/src/listeners/order_book/parity_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,11 @@ use crate::types::inner::InnerL4Order;
use crate::types::node_data::{Batch, NodeDataOrderDiff, NodeDataOrderStatus};
use crate::types::{L4Order, OrderDiff};

/// Use exponent -8 so that fixed-point scales of TOB and DoB align with the
/// internal `Px`/`Sz` representation (which is `value * 1e8`). With -8,
/// `price_to_fixed("100", -8) == 10_000_000_000 == Px("100").value()`, so a
/// price of 100 in human form survives both encoding paths as the same
/// `i64` on the wire.
/// Price exponent stays at -8 across all parity-test scenarios so that the
/// internal `Px` representation (`value * 1e8`) aligns with both TOB and DoB
/// price encodings. (Price-side scaling for non-trivial `price_exponent` is a
/// separate concern and not covered by this test family.)
const PRICE_EXPONENT: i8 = -8;
const QTY_EXPONENT: i8 = -8;
const TEST_INSTRUMENT_ID: u32 = 0;
const TEST_COIN: &str = "BTC";

Expand Down Expand Up @@ -191,17 +189,17 @@ fn cancel_diff(oid: u64, px: &str) -> NodeDataOrderDiff {
}

/// Build a registry containing a single instrument matching `TEST_COIN` /
/// `TEST_INSTRUMENT_ID`, with the price/qty exponents the parity test
/// requires.
fn build_test_registry() -> RegistryState {
/// `TEST_INSTRUMENT_ID`, with the given `qty_exponent`. The `price_exponent`
/// is fixed at `PRICE_EXPONENT`.
fn build_test_registry(qty_exponent: i8) -> RegistryState {
RegistryState::new(vec![UniverseEntry {
instrument_id: TEST_INSTRUMENT_ID,
coin: TEST_COIN.to_string(),
is_delisted: false,
info: InstrumentInfo {
instrument_id: TEST_INSTRUMENT_ID,
price_exponent: PRICE_EXPONENT,
qty_exponent: QTY_EXPONENT,
qty_exponent,
symbol: make_symbol(TEST_COIN),
},
}])
Expand Down Expand Up @@ -258,8 +256,12 @@ async fn drain_collector(sock: &UdpSocket, timeout: Duration) -> Vec<Vec<u8>> {
frames
}

#[tokio::test]
async fn tob_dob_best_bid_ask_parity_at_end_state() {
/// Runs the full TOB/DoB parity scenario at a given `qty_exponent`. The
/// scenario applies the same fixed sequence of L4 events; only the registry's
/// `qty_exponent` (and therefore the on-wire qty scaling) varies. End-state
/// quantities are bid=2 and ask=7 (decimal), which the assertions verify
/// after applying the wire scaling rule `wire = decimal * 10^-qty_exponent`.
async fn run_tob_dob_parity_scenario(qty_exponent: i8) {
// 1. Bind a UDP collector that the DoB emitter will multicast-loop into.
let dob_collector = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0))
.await
Expand Down Expand Up @@ -304,8 +306,8 @@ async fn tob_dob_best_bid_ask_parity_at_end_state() {
/* source_id = */ 1,
/* channel_id = */ 0,
seq_counter.clone(),
Box::new(|c: &Coin| {
if c.value() == TEST_COIN { Some(TEST_INSTRUMENT_ID) } else { None }
Box::new(move |c: &Coin| {
if c.value() == TEST_COIN { Some((TEST_INSTRUMENT_ID, qty_exponent)) } else { None }
}),
);
listener.set_dob_tap(tap);
Expand Down Expand Up @@ -420,7 +422,7 @@ async fn tob_dob_best_bid_ask_parity_at_end_state() {
// does. The actual websocket-server publisher loop is intentionally
// NOT spun up: the conversion is the load-bearing step, and decoupling
// keeps the test fast and deterministic.
let registry = build_test_registry();
let registry = build_test_registry(qty_exponent);
let inst = registry.active.get(TEST_COIN).expect("instrument registered").clone();
let (_time, l2_snapshots) = listener.l2_snapshots_for_test().expect("snapshot ready");
let (tob_bid_price, tob_bid_qty, tob_ask_price, tob_ask_qty) =
Expand Down Expand Up @@ -479,9 +481,37 @@ async fn tob_dob_best_bid_ask_parity_at_end_state() {
);

// Sanity: the test sequence ends with bid=100 (qty=2), ask=109 (qty=7).
// With PRICE_EXPONENT=QTY_EXPONENT=-8 these become _ × 1e8.
// PRICE_EXPONENT=-8 fixes the price scale to × 1e8; qty scale follows
// `qty_exponent`: wire = decimal × 10^-qty_exponent.
let qty_scale = 10u64.pow(((-qty_exponent) as i32).max(0) as u32);
assert_eq!(tob_bid_price, 100 * 100_000_000);
assert_eq!(tob_bid_qty, 2 * 100_000_000);
assert_eq!(tob_bid_qty, 2 * qty_scale);
assert_eq!(tob_ask_price, 109 * 100_000_000);
assert_eq!(tob_ask_qty, 7 * 100_000_000);
assert_eq!(tob_ask_qty, 7 * qty_scale);
}

/// Original parity test at `qty_exponent = -8`: this is the no-op case for
/// `sz_to_fixed` (publisher's internal `Sz` already at 10^8). Confirms TOB
/// and DoB still agree under the trivial scaling.
#[tokio::test]
async fn tob_dob_best_bid_ask_parity_at_qty_exponent_neg8() {
run_tob_dob_parity_scenario(-8).await;
}

/// Parity test at `qty_exponent = -3` — the regression test for issue #10.
/// Before the fix, DoB emitted `OrderAdd.quantity` and
/// `OrderExecute.exec_quantity` at the publisher's raw 10^8 scale while TOB
/// emitted the venue's per-instrument scale, so this assertion would fail
/// with `dob_qty = tob_qty * 10^(8 + qty_exponent)`.
#[tokio::test]
async fn tob_dob_best_bid_ask_parity_at_qty_exponent_neg3() {
run_tob_dob_parity_scenario(-3).await;
}

/// Parity test at `qty_exponent = 0` — exercises the integer-only case (e.g.
/// instrument `2Z` from issue #10). With the bug present, the failure is
/// most extreme here: `dob_qty / tob_qty == 10^8`.
#[tokio::test]
async fn tob_dob_best_bid_ask_parity_at_qty_exponent_zero() {
run_tob_dob_parity_scenario(0).await;
}
Loading