From b04c2e6324928e45b9182256b372d9d5106cf80e Mon Sep 17 00:00:00 2001 From: Andrew McConnell Date: Sat, 11 Apr 2026 12:08:19 -0500 Subject: [PATCH] edge: add integration tests, synthetic publisher, and devnet guide MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rounds out the edge feed parser with the test and operator materials that let someone run the feature themselves. The integration tests exercise the full pipeline — parser + sink — against synthetic wire-format bytes, covering: normal refdata-then-marketdata ordering, the cold-start scenario where marketdata arrives first and is buffered until refdata lands, the buffer cap + most-recent-wins overwrite semantics, CSV output, and JSON output. The testutil package provides a programmable synthetic Top-of-Book publisher. publisher.go builds raw frames in the wire format (header + messages) and is used by tests to construct deterministic scenarios without needing a real feed. A small topofbook-publisher binary wraps it as a CLI tool that streams frames onto a multicast group, useful for local devnet testing where a real producer isn't available. DEVNET_TEST.md walks through bringing the feature up end-to-end in the local devnet: building containers, adding devices and clients, creating a multicast group with subscriber allowlist, running the synthetic publisher on one client, enabling the edge feed on another, and verifying records flow through. --- .../internal/edge/integration_test.go | 425 ++++++++++++++++++ .../internal/edge/testutil/DEVNET_TEST.md | 175 ++++++++ .../testutil/cmd/topofbook-publisher/main.go | 141 ++++++ .../internal/edge/testutil/publisher.go | 161 +++++++ 4 files changed, 902 insertions(+) create mode 100644 client/doublezerod/internal/edge/integration_test.go create mode 100644 client/doublezerod/internal/edge/testutil/DEVNET_TEST.md create mode 100644 client/doublezerod/internal/edge/testutil/cmd/topofbook-publisher/main.go create mode 100644 client/doublezerod/internal/edge/testutil/publisher.go diff --git a/client/doublezerod/internal/edge/integration_test.go b/client/doublezerod/internal/edge/integration_test.go new file mode 100644 index 000000000..803fd2365 --- /dev/null +++ b/client/doublezerod/internal/edge/integration_test.go @@ -0,0 +1,425 @@ +package edge + +import ( + "bufio" + "encoding/binary" + "encoding/json" + "math" + "os" + "path/filepath" + "testing" + "time" +) + +// TestIntegration_PublisherToJSONL exercises the full pipeline: +// synthetic Top-of-Book frames → parser → JSONL file sink → read-back verification. +// +// This is the edge feed parser equivalent of the pcap fixture tests +// in telemetry/flow-enricher: we construct realistic wire-format data, +// feed it through the real parser and sink, and verify the output. +func TestIntegration_PublisherToJSONL(t *testing.T) { + parser := NewTopOfBookParser() + outputPath := filepath.Join(t.TempDir(), "feed.jsonl") + sink, err := NewJSONFileSink(outputPath) + if err != nil { + t.Fatalf("creating sink: %v", err) + } + defer sink.Close() + + // Simulate a realistic publisher session: + // 1. InstrumentDefinition for BTC-USDT + // 2. InstrumentDefinition for ETH-USDT + // 3. Several quotes for both instruments + // 4. A trade + // 5. A heartbeat + + ts := uint64(time.Date(2026, 4, 10, 14, 30, 0, 0, time.UTC).UnixNano()) + seq := uint64(0) + + // --- Refdata: instrument definitions --- + btcDef := buildInstrumentDef(1, "BTC-USDT", "BTC", "USDT", -2, -8) + ethDef := buildInstrumentDef(2, "ETH-USDT", "ETH", "USDT", -2, -6) + seq++ + refFrame := buildFrame(1, seq, ts, btcDef, ethDef) + processFrame(t, parser, sink, refFrame) + + // --- Hot path: quotes --- + srcTS1 := uint64(time.Date(2026, 4, 10, 14, 30, 0, 100000000, time.UTC).UnixNano()) + q1 := buildQuote(1, 1, srcTS1, 6743250, 125000000, 6743300, 80000000, 0) // BTC + seq++ + processFrame(t, parser, sink, buildFrame(1, seq, ts, q1)) + + srcTS2 := uint64(time.Date(2026, 4, 10, 14, 30, 0, 200000000, time.UTC).UnixNano()) + q2 := buildQuote(2, 1, srcTS2, 350025, 1500000, 350075, 1000000, 0) // ETH + seq++ + processFrame(t, parser, sink, buildFrame(1, seq, ts, q2)) + + // Multiple quotes in one frame (batched). + srcTS3 := uint64(time.Date(2026, 4, 10, 14, 30, 0, 300000000, time.UTC).UnixNano()) + q3 := buildQuote(1, 1, srcTS3, 6743200, 130000000, 6743350, 75000000, 0) + q4 := buildQuote(2, 1, srcTS3, 350000, 1600000, 350100, 900000, 0) + seq++ + processFrame(t, parser, sink, buildFrame(1, seq, ts, q3, q4)) + + // --- Hot path: trade --- + srcTS4 := uint64(time.Date(2026, 4, 10, 14, 30, 0, 400000000, time.UTC).UnixNano()) + trade := buildTrade(1, 1, srcTS4, 6743275, 50000000, 1) // buy + seq++ + processFrame(t, parser, sink, buildFrame(1, seq, ts, trade)) + + // --- Hot path: heartbeat --- + hb := buildHeartbeat(1, ts) + seq++ + processFrame(t, parser, sink, buildFrame(1, seq, ts, hb)) + + // Close sink to flush. + sink.Close() + + // --- Read back and verify --- + records := readJSONL(t, outputPath) + + // Expected: 2 inst defs + 4 quotes + 1 trade + 1 heartbeat = 8 records. + if len(records) != 8 { + t.Fatalf("expected 8 records, got %d", len(records)) + } + + // Verify record types in order. + expectedTypes := []string{ + "instrument_definition", "instrument_definition", + "quote", "quote", "quote", "quote", + "trade", "heartbeat", + } + for i, want := range expectedTypes { + got := records[i]["type"].(string) + if got != want { + t.Errorf("record %d: expected type %q, got %q", i, want, got) + } + } + + // Verify BTC-USDT instrument definition. + instDef := records[0] + if instDef["symbol"] != "BTC-USDT" { + t.Errorf("expected symbol BTC-USDT, got %v", instDef["symbol"]) + } + + // Verify first BTC quote has correct decoded prices. + btcQuote := records[2] + if btcQuote["symbol"] != "BTC-USDT" { + t.Errorf("expected BTC-USDT quote, got symbol %v", btcQuote["symbol"]) + } + fields := btcQuote["fields"].(map[string]any) + bidPrice := fields["bid_price"].(float64) + if math.Abs(bidPrice-67432.50) > 0.01 { + t.Errorf("expected bid_price 67432.50, got %f", bidPrice) + } + bidQty := fields["bid_qty"].(float64) + if math.Abs(bidQty-1.25) > 0.0001 { + t.Errorf("expected bid_qty 1.25, got %f", bidQty) + } + + // Verify ETH-USDT quote. + ethQuote := records[3] + if ethQuote["symbol"] != "ETH-USDT" { + t.Errorf("expected ETH-USDT quote, got symbol %v", ethQuote["symbol"]) + } + ethFields := ethQuote["fields"].(map[string]any) + ethBid := ethFields["bid_price"].(float64) + if math.Abs(ethBid-3500.25) > 0.01 { + t.Errorf("expected ETH bid_price 3500.25, got %f", ethBid) + } + + // Verify trade. + tradeRec := records[6] + if tradeRec["type"] != "trade" { + t.Errorf("expected trade, got %v", tradeRec["type"]) + } + tradeFields := tradeRec["fields"].(map[string]any) + if tradeFields["aggressor_side"] != "buy" { + t.Errorf("expected aggressor_side buy, got %v", tradeFields["aggressor_side"]) + } + tradePrice := tradeFields["trade_price"].(float64) + if math.Abs(tradePrice-67432.75) > 0.01 { + t.Errorf("expected trade_price 67432.75, got %f", tradePrice) + } +} + +// TestIntegration_BufferingThenFlush verifies that the full pipeline +// correctly buffers quotes arriving before instrument definitions, +// then flushes them once the definition arrives. +func TestIntegration_BufferingThenFlush(t *testing.T) { + parser := NewTopOfBookParser() + outputPath := filepath.Join(t.TempDir(), "feed.jsonl") + sink, err := NewJSONFileSink(outputPath) + if err != nil { + t.Fatalf("creating sink: %v", err) + } + defer sink.Close() + + ts := uint64(time.Date(2026, 4, 10, 14, 30, 0, 0, time.UTC).UnixNano()) + + // Send quotes BEFORE instrument definitions (cold-start scenario). + // Use two different instruments so both slots are occupied; the + // buffer now holds at most one pending message per instrument_id. + srcTS := uint64(time.Date(2026, 4, 10, 14, 30, 0, 100000000, time.UTC).UnixNano()) + q1 := buildQuote(1, 1, srcTS, 6743250, 125000000, 6743300, 80000000, 0) // instrument 1 + q2 := buildQuote(2, 1, srcTS, 350025, 1500000, 350075, 1000000, 0) // instrument 2 + processFrame(t, parser, sink, buildFrame(1, 1, ts, q1)) + processFrame(t, parser, sink, buildFrame(1, 2, ts, q2)) + + // No records should have been written yet. + sink.Close() + records := readJSONL(t, outputPath) + if len(records) != 0 { + t.Fatalf("expected 0 records before instrument def, got %d", len(records)) + } + if parser.Buffered() != 2 { + t.Fatalf("expected 2 buffered messages, got %d", parser.Buffered()) + } + + // Re-open sink (simulating continued operation). + sink2, err := NewJSONFileSink(outputPath) + if err != nil { + t.Fatalf("creating sink2: %v", err) + } + defer sink2.Close() + + // Send first instrument definition — only instrument 1's buffered + // quote should flush; instrument 2 remains pending. + instDef := buildInstrumentDef(1, "BTC-USDT", "BTC", "USDT", -2, -8) + processFrame2(t, parser, sink2, buildFrame(1, 3, ts, instDef)) + + if parser.Buffered() != 1 { + t.Fatalf("expected 1 buffered message after partial flush, got %d", parser.Buffered()) + } + + // Send second instrument definition — the last buffered quote flushes. + instDef2 := buildInstrumentDef(2, "ETH-USDT", "ETH", "USDT", -2, -6) + processFrame2(t, parser, sink2, buildFrame(1, 4, ts, instDef2)) + + if parser.Buffered() != 0 { + t.Fatalf("expected empty buffer after full flush, got %d", parser.Buffered()) + } + + sink2.Close() + + records = readJSONL(t, outputPath) + // Should have: 2 instrument_definitions + 2 flushed quotes = 4 records. + if len(records) != 4 { + t.Fatalf("expected 4 records after flush, got %d", len(records)) + } + // After the two instrument definitions land, the expected output + // is: instrument_def(1), quote(1), instrument_def(2), quote(2). + expectedTypes := []string{"instrument_definition", "quote", "instrument_definition", "quote"} + for i, want := range expectedTypes { + if got := records[i]["type"]; got != want { + t.Errorf("record %d: expected %q, got %v", i, want, got) + } + } + if parser.Buffered() != 0 { + t.Errorf("expected 0 buffered after flush, got %d", parser.Buffered()) + } +} + +// TestBufferOverwriteAndCap verifies the bounded per-instrument buffer: +// repeated messages for the same instrument overwrite each other (keeping +// only the most recent), and new instruments are dropped once the cap is +// reached. +func TestBufferOverwriteAndCap(t *testing.T) { + parser := NewTopOfBookParser() + + ts := uint64(time.Date(2026, 4, 10, 14, 30, 0, 0, time.UTC).UnixNano()) + srcTS := ts + + // Flood the buffer with maxBufferedInstruments distinct instruments. + // The +1 beyond the cap should be dropped (not buffered, not errored). + seq := uint64(0) + for i := 0; i < maxBufferedInstruments+1; i++ { + seq++ + q := buildQuote(uint32(i+1), 1, srcTS, 100, 1, 101, 1, 0) + _, err := parser.Parse(buildFrame(1, seq, ts, q)) + if err != nil { + t.Fatalf("parse iter %d: %v", i, err) + } + } + + if got := parser.Buffered(); got != maxBufferedInstruments { + t.Errorf("expected buffered = %d (cap), got %d", maxBufferedInstruments, got) + } + + // Sending another quote for an instrument already in the buffer must + // overwrite in place — buffer size stays at the cap. + seq++ + q := buildQuote(1, 1, srcTS, 999, 999, 999, 999, 0) + if _, err := parser.Parse(buildFrame(1, seq, ts, q)); err != nil { + t.Fatalf("overwrite parse: %v", err) + } + if got := parser.Buffered(); got != maxBufferedInstruments { + t.Errorf("buffer should stay at cap after overwrite, got %d", got) + } + + // Defining instrument 1 flushes exactly one record with the most + // recent (overwritten) values, confirming the overwrite took effect. + seq++ + instDef := buildInstrumentDef(1, "SYM-1", "A", "B", -2, -8) + records, err := parser.Parse(buildFrame(1, seq, ts, instDef)) + if err != nil { + t.Fatalf("instrument def parse: %v", err) + } + + var flushedQuote *Record + for i := range records { + if records[i].Type == "quote" { + flushedQuote = &records[i] + break + } + } + if flushedQuote == nil { + t.Fatal("expected a flushed quote record after instrument def") + } + // The overwrite set bid_price to 9.99 (999 with price_exponent -2). + bidPrice, ok := flushedQuote.Fields["bid_price"].(float64) + if !ok { + t.Fatalf("bid_price missing or wrong type: %v", flushedQuote.Fields["bid_price"]) + } + if math.Abs(bidPrice-9.99) > 0.001 { + t.Errorf("expected overwritten bid_price 9.99, got %v", bidPrice) + } +} + +// TestIntegration_CSVOutput verifies the full pipeline with CSV output. +func TestIntegration_CSVOutput(t *testing.T) { + parser := NewTopOfBookParser() + outputPath := filepath.Join(t.TempDir(), "feed.csv") + sink, err := NewCSVFileSink(outputPath) + if err != nil { + t.Fatalf("creating sink: %v", err) + } + defer sink.Close() + + ts := uint64(time.Date(2026, 4, 10, 14, 30, 0, 0, time.UTC).UnixNano()) + + // Send instrument def, then a quote, then a trade. + instDef := buildInstrumentDef(1, "SOL-USDT", "SOL", "USDT", -4, -6) + processFrame(t, parser, sink, buildFrame(1, 1, ts, instDef)) + + srcTS := uint64(time.Date(2026, 4, 10, 14, 30, 1, 0, time.UTC).UnixNano()) + q := buildQuote(1, 1, srcTS, 1850000, 5000000, 1860000, 3000000, 0) + processFrame(t, parser, sink, buildFrame(1, 2, ts, q)) + + trade := buildTrade(1, 1, srcTS, 1855000, 2000000, 2) // sell + processFrame(t, parser, sink, buildFrame(1, 3, ts, trade)) + + sink.Close() + + // CSV should contain: quote header + quote row + trade header + trade row. + // Instrument definitions and heartbeats are filtered out by CSV sink. + data, err := os.ReadFile(outputPath) + if err != nil { + t.Fatalf("reading output: %v", err) + } + + lines := splitNonEmpty(string(data)) + // 1 quote header + 1 quote row + 1 trade header + 1 trade row = 4 lines. + if len(lines) != 4 { + t.Fatalf("expected 4 CSV lines, got %d: %v", len(lines), lines) + } + + // First non-header line should be the quote. + if lines[1][:5] != "quote" { + t.Errorf("expected quote row, got %q", lines[1][:10]) + } + // Third line is trade header, fourth is trade data. + if lines[3][:5] != "trade" { + t.Errorf("expected trade row, got %q", lines[3][:10]) + } +} + +// processFrame parses a frame and writes any records to the sink. +func processFrame(t *testing.T, parser Parser, sink OutputSink, frame []byte) { + t.Helper() + records, err := parser.Parse(frame) + if err != nil { + t.Fatalf("parse error: %v", err) + } + if len(records) > 0 { + if err := sink.Write(records); err != nil { + t.Fatalf("sink write error: %v", err) + } + } +} + +// processFrame2 is identical to processFrame but doesn't use t.Fatalf +// for the sink write (allows caller to handle). +func processFrame2(t *testing.T, parser Parser, sink OutputSink, frame []byte) { + t.Helper() + records, err := parser.Parse(frame) + if err != nil { + t.Fatalf("parse error: %v", err) + } + if len(records) > 0 { + if err := sink.Write(records); err != nil { + t.Fatalf("sink write error: %v", err) + } + } +} + +// readJSONL reads a JSONL file and returns parsed records. +func readJSONL(t *testing.T, path string) []map[string]any { + t.Helper() + f, err := os.Open(path) + if err != nil { + t.Fatalf("opening %s: %v", path, err) + } + defer f.Close() + + var records []map[string]any + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var r map[string]any + if err := json.Unmarshal(line, &r); err != nil { + t.Fatalf("parsing JSONL line: %v", err) + } + records = append(records, r) + } + return records +} + +func splitNonEmpty(s string) []string { + var out []string + for _, line := range split(s) { + if line != "" { + out = append(out, line) + } + } + return out +} + +func split(s string) []string { + return splitBy(s, '\n') +} + +func splitBy(s string, sep byte) []string { + var result []string + start := 0 + for i := range len(s) { + if s[i] == sep { + result = append(result, s[start:i]) + start = i + 1 + } + } + result = append(result, s[start:]) + return result +} + +// buildFrame and message builders are reused from topofbook_test.go +// (they are in the same package). +// The following are only needed if the test needs additional frame types +// not covered by the existing test helpers. + +func putInt64LE_integration(buf []byte, v int64) { + binary.LittleEndian.PutUint64(buf, uint64(v)) +} diff --git a/client/doublezerod/internal/edge/testutil/DEVNET_TEST.md b/client/doublezerod/internal/edge/testutil/DEVNET_TEST.md new file mode 100644 index 000000000..a6c5f56cd --- /dev/null +++ b/client/doublezerod/internal/edge/testutil/DEVNET_TEST.md @@ -0,0 +1,175 @@ +# Testing Edge Feed Parser in Local Devnet + +This guide walks through testing the edge feed parser end-to-end in the +local devnet environment with real containers and multicast infrastructure. + +## Prerequisites + +- Docker running with sufficient resources (8GB+ RAM for cEOS devices) +- Local devnet not already running (or destroy first with `dev/dzctl destroy -y`) + +## Quick Start + +From the repo root: + +```bash +# 1. Build all container images (includes your code changes) +dev/dzctl build + +# 2. Start core devnet (ledger, manager, activator, controller) +dev/dzctl start + +# 3. Add two devices +dev/dzctl add-device --code dz1 --exchange xams --location ams \ + --cyoa-network-host-id 8 --additional-networks dz1:dz2 +dev/dzctl add-device --code dz2 --exchange xams --location ams \ + --cyoa-network-host-id 9 --additional-networks dz1:dz2 + +# 4. Add a publisher client and a subscriber client +dev/dzctl add-client --cyoa-network-host-id 100 +dev/dzctl add-client --cyoa-network-host-id 110 +``` + +## Identify Your Containers + +```bash +docker ps --filter "label=dz.malbeclabs.com/type=devnet" --format "table {{.Names}}\t{{.Status}}" +``` + +Note the client container names — they include the Solana pubkey, e.g.: +- `dz-local-client-FposHWrkvPP3VErBAWCd4ELWGuh2mgx2Wx6cuNEA4X2S` (publisher) +- `dz-local-client-7bK9xpQwR2tN...` (subscriber) + +## Create Multicast Group and Connect + +```bash +# Get the client pubkeys +PUB_KEY=$(docker exec dz-local-client- solana address) +SUB_KEY=$(docker exec dz-local-client- solana address) + +# Create multicast group +docker exec dz-local-manager doublezero multicast group create \ + --code mg01 --max-bandwidth 10Gbps + +# Add publisher to allowlist +docker exec dz-local-manager doublezero multicast group allowlist publisher add \ + --code mg01 --user-payer $PUB_KEY --client-ip 10.0.100.100 + +# Add subscriber to allowlist +docker exec dz-local-manager doublezero multicast group allowlist subscriber add \ + --code mg01 --user-payer $SUB_KEY --client-ip 10.0.100.110 + +# Connect publisher +docker exec dz-local-client- doublezero connect multicast publisher mg01 + +# Connect subscriber +docker exec dz-local-client- doublezero connect multicast subscriber mg01 + +# Verify connections +docker exec dz-local-client- doublezero status +``` + +## Enable Edge Feed Parser + +```bash +# Enable the feed parser on the subscriber +docker exec dz-local-client- curl -s -X POST \ + --unix-socket /var/run/doublezerod/doublezerod.sock \ + -H 'Content-Type: application/json' \ + -d '{"code":"mg01","parser":"topofbook","format":"json","output":"/tmp/feed.jsonl","marketdata_port":7000,"refdata_port":7001}' \ + http://doublezero/edge/enable + +# Check status +docker exec dz-local-client- curl -s \ + --unix-socket /var/run/doublezerod/doublezerod.sock \ + http://doublezero/edge/status | jq . +``` + +## Send Synthetic Top-of-Book Data + +Build and copy the publisher tool into the publisher container: + +```bash +# Build the publisher binary (from repo root) +CGO_ENABLED=0 go build -o /tmp/topofbook-publisher \ + ./client/doublezerod/internal/edge/testutil/cmd/topofbook-publisher/ + +# Copy into publisher container +docker cp /tmp/topofbook-publisher dz-local-client-:/usr/local/bin/ + +# Get the multicast IP for mg01 +MCAST_IP=$(docker exec dz-local-manager doublezero multicast group list --json | jq -r '.[] | select(.code=="mg01") | .multicast_ip') + +# Run the publisher (sends 3 instruments, 10 quotes/sec for 30s) +docker exec dz-local-client- topofbook-publisher \ + -group $MCAST_IP -port 7000 -instruments 3 -rate 10 -duration 30s +``` + +## Verify Output + +```bash +# Check how many records were decoded +docker exec dz-local-client- wc -l /tmp/feed.jsonl + +# View the first few decoded records +docker exec dz-local-client- head -20 /tmp/feed.jsonl | jq . + +# Check parser status (records_written count) +docker exec dz-local-client- curl -s \ + --unix-socket /var/run/doublezerod/doublezerod.sock \ + http://doublezero/edge/status | jq . + +# Disable the feed parser +docker exec dz-local-client- curl -s -X POST \ + --unix-socket /var/run/doublezerod/doublezerod.sock \ + -H 'Content-Type: application/json' \ + -d '{"code":"mg01"}' \ + http://doublezero/edge/disable +``` + +## Using the CLI Instead of curl + +Once the Rust CLI is deployed in the container image, you can use: + +```bash +docker exec dz-local-client- doublezero edge enable \ + --code mg01 --parser topofbook --format json --output /tmp/feed.jsonl \ + --marketdata-port 7000 --refdata-port 7001 + +docker exec dz-local-client- doublezero edge status --json + +docker exec dz-local-client- doublezero edge disable --code mg01 +``` + +## Testing CSV Output + +```bash +docker exec dz-local-client- curl -s -X POST \ + --unix-socket /var/run/doublezerod/doublezerod.sock \ + -H 'Content-Type: application/json' \ + -d '{"code":"mg01","parser":"topofbook","format":"csv","output":"/tmp/feed.csv","marketdata_port":7000,"refdata_port":7001}' \ + http://doublezero/edge/enable + +# After sending data: +docker exec dz-local-client- head -20 /tmp/feed.csv +``` + +## Testing Unix Socket Output + +```bash +# Enable with socket output +docker exec dz-local-client- curl -s -X POST \ + --unix-socket /var/run/doublezerod/doublezerod.sock \ + -H 'Content-Type: application/json' \ + -d '{"code":"mg01","parser":"topofbook","format":"json","output":"unix:///tmp/feed.sock","marketdata_port":7000,"refdata_port":7001}' \ + http://doublezero/edge/enable + +# Connect a reader to the socket (in another terminal) +docker exec dz-local-client- socat UNIX-CONNECT:/tmp/feed.sock - +``` + +## Cleanup + +```bash +dev/dzctl destroy -y +``` diff --git a/client/doublezerod/internal/edge/testutil/cmd/topofbook-publisher/main.go b/client/doublezerod/internal/edge/testutil/cmd/topofbook-publisher/main.go new file mode 100644 index 000000000..808c354b7 --- /dev/null +++ b/client/doublezerod/internal/edge/testutil/cmd/topofbook-publisher/main.go @@ -0,0 +1,141 @@ +// topofbook-publisher is a standalone tool that sends synthetic Top-of-Book +// v0.1.0 frames to a multicast group. Use it to test the edge feed parser +// in a local devnet environment. +// +// Usage: +// +// topofbook-publisher -group 239.0.0.1 -port 7000 -instruments 3 -rate 10 -duration 30s +package main + +import ( + "flag" + "fmt" + "log" + "net" + "os" + "os/signal" + "time" + + "github.com/malbeclabs/doublezero/client/doublezerod/internal/edge/testutil" +) + +func main() { + group := flag.String("group", "239.0.0.1", "multicast group IP") + port := flag.Int("port", 7000, "destination UDP port") + numInstruments := flag.Int("instruments", 3, "number of instruments to simulate") + rate := flag.Int("rate", 10, "quotes per second per instrument") + duration := flag.Duration("duration", 30*time.Second, "how long to publish (0 = until interrupted)") + flag.Parse() + + groupIP := net.ParseIP(*group) + if groupIP == nil { + log.Fatalf("invalid multicast group IP: %s", *group) + } + + pub, err := testutil.NewPublisher(groupIP, *port) + if err != nil { + log.Fatalf("creating publisher: %v", err) + } + defer pub.Close() + + instruments := generateInstruments(*numInstruments) + + // Send instrument definitions first. + for _, inst := range instruments { + if err := pub.SendInstrumentDefinition(1, inst.id, inst.symbol, inst.leg1, inst.leg2, inst.priceExp, inst.qtyExp); err != nil { + log.Fatalf("sending instrument def for %s: %v", inst.symbol, err) + } + fmt.Printf("sent instrument definition: %s (id=%d, priceExp=%d, qtyExp=%d)\n", inst.symbol, inst.id, inst.priceExp, inst.qtyExp) + } + + // Publish quotes in a loop. + interval := time.Duration(float64(time.Second) / float64(*rate)) + ticker := time.NewTicker(interval) + defer ticker.Stop() + + var deadline <-chan time.Time + if *duration > 0 { + timer := time.NewTimer(*duration) + defer timer.Stop() + deadline = timer.C + } + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt) + + fmt.Printf("publishing %d instruments at %d quotes/sec to %s:%d\n", len(instruments), *rate, *group, *port) + + quoteCount := 0 + tradeCount := 0 + hbCount := 0 + + for { + select { + case <-sigCh: + fmt.Printf("\ninterrupted. sent %d quotes, %d trades, %d heartbeats\n", quoteCount, tradeCount, hbCount) + return + case <-deadline: + fmt.Printf("duration elapsed. sent %d quotes, %d trades, %d heartbeats\n", quoteCount, tradeCount, hbCount) + return + case <-ticker.C: + for _, inst := range instruments { + bidPrice := inst.basePrice + int64(time.Now().UnixNano()%100) - 50 + askPrice := bidPrice + inst.spread + bidQty := uint64(100 + time.Now().UnixNano()%900) + askQty := uint64(100 + time.Now().UnixNano()%900) + + if err := pub.SendQuote(1, inst.id, 1, bidPrice, bidQty, askPrice, askQty); err != nil { + log.Printf("error sending quote for %s: %v", inst.symbol, err) + continue + } + quoteCount++ + + // Occasionally send a trade (1 in 10 quotes). + if quoteCount%10 == 0 { + side := uint8(1) // buy + if time.Now().UnixNano()%2 == 0 { + side = 2 // sell + } + if err := pub.SendTrade(1, inst.id, 1, bidPrice+inst.spread/2, bidQty/2, side); err != nil { + log.Printf("error sending trade for %s: %v", inst.symbol, err) + } + tradeCount++ + } + } + + // Send heartbeat every 100 ticks. + if quoteCount%100 == 0 { + pub.SendHeartbeat(1) + hbCount++ + } + } + } +} + +type instrument struct { + id uint32 + symbol string + leg1 string + leg2 string + priceExp int8 + qtyExp int8 + basePrice int64 + spread int64 +} + +func generateInstruments(n int) []instrument { + templates := []instrument{ + {1, "BTC-USDT", "BTC", "USDT", -2, -8, 6740000, 50}, + {2, "ETH-USDT", "ETH", "USDT", -2, -6, 350000, 25}, + {3, "SOL-USDT", "SOL", "USDT", -4, -6, 1850000, 1000}, + {4, "DOGE-USDT", "DOGE", "USDT", -6, -2, 150000, 100}, + {5, "AVAX-USDT", "AVAX", "USDT", -4, -4, 350000, 500}, + {6, "LINK-USDT", "LINK", "USDT", -4, -4, 150000, 200}, + {7, "DOT-USDT", "DOT", "USDT", -4, -4, 70000, 100}, + {8, "MATIC-USD", "MATIC", "USD", -6, -2, 800000, 500}, + } + if n > len(templates) { + n = len(templates) + } + return templates[:n] +} diff --git a/client/doublezerod/internal/edge/testutil/publisher.go b/client/doublezerod/internal/edge/testutil/publisher.go new file mode 100644 index 000000000..373e79bb5 --- /dev/null +++ b/client/doublezerod/internal/edge/testutil/publisher.go @@ -0,0 +1,161 @@ +// Package testutil provides a synthetic Top-of-Book publisher for testing +// the edge feed parser pipeline end-to-end. +package testutil + +import ( + "encoding/binary" + "net" + "time" +) + +// Publisher sends synthetic Top-of-Book v0.1.0 frames to a multicast group. +type Publisher struct { + conn *net.UDPConn + groupIP net.IP + port int + seq uint64 +} + +// NewPublisher creates a publisher that sends to the given multicast group and port. +func NewPublisher(groupIP net.IP, port int) (*Publisher, error) { + addr := &net.UDPAddr{ + IP: groupIP, + Port: port, + } + conn, err := net.DialUDP("udp4", nil, addr) + if err != nil { + return nil, err + } + return &Publisher{conn: conn, groupIP: groupIP, port: port}, nil +} + +// Close closes the publisher connection. +func (p *Publisher) Close() error { + return p.conn.Close() +} + +// SendInstrumentDefinition sends a frame containing a single InstrumentDefinition message. +func (p *Publisher) SendInstrumentDefinition(channelID uint8, instID uint32, symbol, leg1, leg2 string, priceExp, qtyExp int8) error { + msg := buildInstrumentDef(instID, symbol, leg1, leg2, priceExp, qtyExp) + frame := p.buildFrame(channelID, msg) + _, err := p.conn.Write(frame) + return err +} + +// SendQuote sends a frame containing a single Quote message. +func (p *Publisher) SendQuote(channelID uint8, instID uint32, sourceID uint16, bidPrice int64, bidQty uint64, askPrice int64, askQty uint64) error { + msg := buildQuote(instID, sourceID, bidPrice, bidQty, askPrice, askQty) + frame := p.buildFrame(channelID, msg) + _, err := p.conn.Write(frame) + return err +} + +// SendTrade sends a frame containing a single Trade message. +func (p *Publisher) SendTrade(channelID uint8, instID uint32, sourceID uint16, price int64, qty uint64, side uint8) error { + msg := buildTrade(instID, sourceID, price, qty, side) + frame := p.buildFrame(channelID, msg) + _, err := p.conn.Write(frame) + return err +} + +// SendHeartbeat sends a frame containing a single Heartbeat message. +func (p *Publisher) SendHeartbeat(channelID uint8) error { + msg := buildHeartbeat(channelID) + frame := p.buildFrame(channelID, msg) + _, err := p.conn.Write(frame) + return err +} + +func (p *Publisher) buildFrame(channelID uint8, msgs ...[]byte) []byte { + headerSize := 24 + bodySize := 0 + for _, m := range msgs { + bodySize += len(m) + } + frameLen := headerSize + bodySize + + buf := make([]byte, frameLen) + // Magic "DZ" = 0x445A little-endian + buf[0] = 0x5A + buf[1] = 0x44 + buf[2] = 1 // schema version + buf[3] = channelID + p.seq++ + binary.LittleEndian.PutUint64(buf[4:], p.seq) + binary.LittleEndian.PutUint64(buf[12:], uint64(time.Now().UnixNano())) + buf[20] = uint8(len(msgs)) + buf[21] = 0 + binary.LittleEndian.PutUint16(buf[22:], uint16(frameLen)) + + off := headerSize + for _, m := range msgs { + copy(buf[off:], m) + off += len(m) + } + return buf +} + +func buildInstrumentDef(instID uint32, symbol, leg1, leg2 string, priceExp, qtyExp int8) []byte { + buf := make([]byte, 80) + buf[0] = 0x02 + buf[1] = 80 + binary.LittleEndian.PutUint32(buf[4:], instID) + copy(buf[8:24], padNull(symbol, 16)) + copy(buf[24:32], padNull(leg1, 8)) + copy(buf[32:40], padNull(leg2, 8)) + buf[40] = 1 // crypto spot + buf[41] = byte(priceExp) + buf[42] = byte(qtyExp) + buf[43] = 1 // CLOB + binary.LittleEndian.PutUint64(buf[44:], 1) + binary.LittleEndian.PutUint64(buf[52:], 1) + binary.LittleEndian.PutUint16(buf[78:], 1) + return buf +} + +func buildQuote(instID uint32, sourceID uint16, bidPrice int64, bidQty uint64, askPrice int64, askQty uint64) []byte { + buf := make([]byte, 60) + buf[0] = 0x03 + buf[1] = 60 + binary.LittleEndian.PutUint32(buf[4:], instID) + binary.LittleEndian.PutUint16(buf[8:], sourceID) + buf[10] = 0x03 // bid + ask updated + binary.LittleEndian.PutUint64(buf[12:], uint64(time.Now().UnixNano())) + binary.LittleEndian.PutUint64(buf[20:], uint64(bidPrice)) + binary.LittleEndian.PutUint64(buf[28:], bidQty) + binary.LittleEndian.PutUint64(buf[36:], uint64(askPrice)) + binary.LittleEndian.PutUint64(buf[44:], askQty) + binary.LittleEndian.PutUint16(buf[52:], 1) + binary.LittleEndian.PutUint16(buf[54:], 1) + return buf +} + +func buildTrade(instID uint32, sourceID uint16, price int64, qty uint64, side uint8) []byte { + buf := make([]byte, 52) + buf[0] = 0x04 + buf[1] = 52 + binary.LittleEndian.PutUint32(buf[4:], instID) + binary.LittleEndian.PutUint16(buf[8:], sourceID) + buf[10] = side + binary.LittleEndian.PutUint64(buf[12:], uint64(time.Now().UnixNano())) + binary.LittleEndian.PutUint64(buf[20:], uint64(price)) + binary.LittleEndian.PutUint64(buf[28:], qty) + binary.LittleEndian.PutUint64(buf[36:], 12345) + binary.LittleEndian.PutUint64(buf[44:], qty) + return buf +} + +func buildHeartbeat(channelID uint8) []byte { + buf := make([]byte, 16) + buf[0] = 0x01 + buf[1] = 16 + buf[4] = channelID + binary.LittleEndian.PutUint64(buf[8:], uint64(time.Now().UnixNano())) + return buf +} + +func padNull(s string, n int) []byte { + buf := make([]byte, n) + copy(buf, s) + return buf +}