From ed8210a8ba16fdaf84f111678b4f5cc4bc7b951b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Mon, 29 Jun 2026 12:01:10 +0100 Subject: [PATCH 01/11] Streaming JSON --- Cargo.lock | 4 +- crates/driver/Cargo.toml | 4 +- crates/driver/src/infra/persistence/mod.rs | 20 +- crates/driver/src/infra/solver/mod.rs | 68 ++---- crates/driver/src/infra/solver/streaming.rs | 205 ++++++++++++++++++ .../src/infra/solver/streaming/tee_writer.rs | 63 ++++++ crates/s3/src/lib.rs | 8 + 7 files changed, 318 insertions(+), 54 deletions(-) create mode 100644 crates/driver/src/infra/solver/streaming.rs create mode 100644 crates/driver/src/infra/solver/streaming/tee_writer.rs diff --git a/Cargo.lock b/Cargo.lock index e55be1c7e2..a1bef279af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3761,7 +3761,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.114", ] [[package]] @@ -3938,6 +3938,7 @@ dependencies = [ "eth-domain-types", "ethrpc", "event-indexing", + "flate2", "futures", "gas-price-estimation", "hex-literal", @@ -3974,6 +3975,7 @@ dependencies = [ "tikv-jemallocator", "token-info", "tokio", + "tokio-stream", "toml", "tower 0.5.3", "tower-http", diff --git a/crates/driver/Cargo.toml b/crates/driver/Cargo.toml index a79476d896..d710682cec 100644 --- a/crates/driver/Cargo.toml +++ b/crates/driver/Cargo.toml @@ -38,6 +38,7 @@ derive_more = { workspace = true } eth-domain-types = { workspace = true } ethrpc = { workspace = true } event-indexing = { workspace = true } +flate2 = { workspace = true } futures = { workspace = true } gas-price-estimation = { workspace = true } hex-literal = { workspace = true } @@ -55,7 +56,7 @@ prometheus = { workspace = true } prometheus-metric-storage = { workspace = true } rand = { workspace = true } request-sharing = { workspace = true } -reqwest = { workspace = true, features = ["query"] } +reqwest = { workspace = true, features = ["query", "stream"] } s3 = { workspace = true } serde = { workspace = true, features = ["derive"] } serde-ext = { workspace = true } @@ -67,6 +68,7 @@ thiserror = { workspace = true } tikv-jemallocator = { workspace = true } token-info = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal", "time"] } +tokio-stream = { workspace = true } toml = { workspace = true } tower = { workspace = true } tower-http = { workspace = true, features = ["decompression-br", "limit", "trace"] } diff --git a/crates/driver/src/infra/persistence/mod.rs b/crates/driver/src/infra/persistence/mod.rs index c03a3e0840..234b22ed81 100644 --- a/crates/driver/src/infra/persistence/mod.rs +++ b/crates/driver/src/infra/persistence/mod.rs @@ -5,6 +5,7 @@ use { }, bytes::Bytes, std::sync::Arc, + tokio::sync::oneshot, tracing::Instrument, }; @@ -52,16 +53,27 @@ impl Persistence { } } - /// Saves the given auction with liquidity with fire and forget mentality - /// (non-blocking operation) - pub fn archive_auction(&self, auction_id: Id, body: Bytes) { + /// Whether auction archival to S3 is configured. + pub fn archives_enabled(&self) -> bool { + self.s3.is_some() + } + + /// Archives the auction body to S3 (fire and forget). The body is + /// gzip-compressed while it is streamed to the solver, so the full + /// uncompressed JSON is never held in memory; the compressed bytes arrive + /// through `compressed` once serialization finishes. An error on the + /// receiver means the request was aborted, so there is nothing to archive. + pub fn archive_auction_gzipped(&self, auction_id: Id, compressed: oneshot::Receiver) { let Some(uploader) = self.s3.clone() else { return; }; tokio::spawn( async move { + let Ok(compressed) = compressed.await else { + return; + }; match uploader - .upload_json_bytes(auction_id.to_string(), body) + .upload_gzipped(auction_id.to_string(), compressed) .await { Ok(key) => { diff --git a/crates/driver/src/infra/solver/mod.rs b/crates/driver/src/infra/solver/mod.rs index d11982a098..c4b9558790 100644 --- a/crates/driver/src/infra/solver/mod.rs +++ b/crates/driver/src/infra/solver/mod.rs @@ -27,7 +27,6 @@ use { signers::{Signature, aws::AwsSigner, local::PrivateKeySigner}, }, anyhow::Result, - bytes::Bytes, derive_more::{From, Into}, eth_domain_types as eth, num::BigRational, @@ -35,7 +34,6 @@ use { reqwest::header::HeaderName, std::{ collections::HashMap, - sync::atomic::{AtomicUsize, Ordering}, time::{Duration, Instant}, }, thiserror::Error, @@ -44,6 +42,7 @@ use { pub mod dto; pub mod eip7702; +mod streaming; // TODO At some point I should be checking that the names are unique, I don't // think I'm doing that. @@ -389,12 +388,26 @@ impl Solver { self.config.haircut_bps, ); - let body = serialize_body(auction_dto); + let url = shared::url::join(&self.config.endpoint, "solve"); - if let Some(id) = auction.id() { - // Only auctions with IDs are real auctions (/quote requests don't have an ID). - // Only for those it makes sense to archive them and measure the execution time. - self.persistence.archive_auction(id, body.clone()); + // Real auctions (those with an ID) are archived to S3; quotes aren't, so + // they skip the gzip capture entirely and just stream the body. + let archive_id = self + .persistence + .archives_enabled() + .then(|| auction.id()) + .flatten(); + let body: reqwest::Body = match archive_id { + // Stream the request body while capturing a gzipped copy for S3, so + // neither the request nor the archive holds the full JSON at once. + Some(id) => { + let (body, compressed) = streaming::stream_body_and_gzip(auction_dto); + self.persistence.archive_auction_gzipped(id, compressed); + body + } + None => streaming::stream_body(auction_dto), + }; + if auction.id().is_some() { ::observe::metrics::metrics().measure_auction_overhead( start, "driver", @@ -402,8 +415,6 @@ impl Solver { ); } - let url = shared::url::join(&self.config.endpoint, "solve"); - super::observe::solver_request(&url, &body); let timeout = match auction.deadline(self.timeouts()).solvers().remaining() { Ok(timeout) => timeout, Err(_) => { @@ -543,45 +554,6 @@ impl Solver { } } -/// Serializes the request body in a way that avoid re-allocating while not -/// overallocating a lot of memory. It does that by keeping track of the biggest -/// request seen for each kind of request (quote with/without liquidity, full -/// auction with/without liquidity) and allocating the correct amount of memory -/// based on the data in the auction. -fn serialize_body(auction_dto: solvers_dto::auction::Auction) -> Bytes { - // these values store the biggest allocation we needed for each - // category of requests - static QUOTE_WITH_LIQUIDITY: AtomicUsize = AtomicUsize::new(1_000); - static QUOTE_WITHOUT_LIQUIDITY: AtomicUsize = AtomicUsize::new(1_000); - static AUCTION_WITH_LIQUIDITY: AtomicUsize = AtomicUsize::new(1_000); - static AUCTION_WITHOUT_LIQUIDITY: AtomicUsize = AtomicUsize::new(1_000); - - // based on the "shape" of the auction we pick the allocation size - let is_auction = auction_dto.id.is_some(); - let with_liquidity = !auction_dto.liquidity.is_empty(); - let memory_target = match (is_auction, with_liquidity) { - (false, false) => "E_WITHOUT_LIQUIDITY, - (false, true) => "E_WITH_LIQUIDITY, - (true, false) => &AUCTION_WITHOUT_LIQUIDITY, - (true, true) => &AUCTION_WITH_LIQUIDITY, - }; - - // pre-allocate biggest request size + 0.5% (to avoid re-allocations when - // the request grows only slightly) - let pre_alloc_size = memory_target.load(Ordering::Relaxed); - let pre_alloc_size = pre_alloc_size + pre_alloc_size * 5 / 1_000; - let mut buffer = Vec::with_capacity(pre_alloc_size); - - serde_json::to_writer(&mut buffer, &auction_dto).unwrap(); - - // now that we know how much memory was actually needed we update the memory - // targets - memory_target.fetch_max(buffer.len(), Ordering::Relaxed); - tracing::trace!(pre_alloc_size, final_size = buffer.len(), "body allocation"); - - Bytes::from(buffer) -} - #[cfg(test)] mod tests { use { diff --git a/crates/driver/src/infra/solver/streaming.rs b/crates/driver/src/infra/solver/streaming.rs new file mode 100644 index 0000000000..62406d510a --- /dev/null +++ b/crates/driver/src/infra/solver/streaming.rs @@ -0,0 +1,205 @@ +//! Streams a serializable value into a reqwest body so the full (multi-MB) +//! payload is never held in memory at once. +//! +//! A [`BufWriter`] coalesces the serializer's many tiny writes into +//! `CHUNK_SIZE` blocks, which a [`TeeWriter`] then fans out to the body channel +//! and an optional secondary sink. Both entry points share one core +//! ([`stream_into`]) and differ only in that secondary: [`stream_body`] +//! discards it, [`stream_body_and_gzip`] captures a gzip copy for S3. + +mod tee_writer; + +use { + bytes::Bytes, + flate2::{Compression, write::GzEncoder}, + std::io::{BufWriter, Write}, + tee_writer::TeeWriter, + tokio::sync::{mpsc, oneshot}, + tokio_stream::wrappers::ReceiverStream, +}; + +/// Block size the serializer's writes are coalesced into before being chunked +/// onto the body channel and fed to the gzip encoder. +const CHUNK_SIZE: usize = 64 * 1024; + +/// Blocks buffered in the body channel before the serializing thread is parked. +/// Bounds the streamed request body at ~`CHANNEL_CAPACITY * CHUNK_SIZE` through +/// backpressure (the gzip archive copy, when enabled, is retained separately). +const CHANNEL_CAPACITY: usize = 8; + +/// Serializes `value` to JSON on a blocking thread, streaming it into the +/// returned reqwest body. Backpressure from the body channel caps memory. +pub fn stream_body(value: T) -> reqwest::Body +where + T: serde::Serialize + Send + 'static, +{ + stream_into(value, std::io::sink()) +} + +/// Like [`stream_body`], but also gzips the same serialization for S3 in one +/// pass. The receiver yields the compressed bytes once serialization finishes, +/// or errors (sender dropped) if it aborts — leaving nothing to archive. +pub fn stream_body_and_gzip(value: T) -> (reqwest::Body, oneshot::Receiver) +where + T: serde::Serialize + Send + 'static, +{ + let (gzip, compressed) = GzipCapture::new(); + let body = stream_into(value, gzip); + (body, compressed) +} + +/// Serializes `value` into a tee of the body channel and `secondary` on a +/// blocking thread, then finalizes each sink. Shared by both entry points. +fn stream_into(value: T, secondary: S) -> reqwest::Body +where + T: serde::Serialize + Send + 'static, + S: Write + Finalize + Send + 'static, +{ + let (tx, rx) = mpsc::channel::>(CHANNEL_CAPACITY); + let body = reqwest::Body::wrap_stream(ReceiverStream::new(rx)); + // `spawn_blocking` doesn't inherit the caller's span, so carry it across so + // the diagnostics below keep the auction/request context. + let span = tracing::Span::current(); + tokio::task::spawn_blocking(move || { + let _guard = span.enter(); + let mut writer = + BufWriter::with_capacity(CHUNK_SIZE, TeeWriter::new(ChannelWriter(tx), secondary)); + if let Err(err) = serde_json::to_writer(&mut writer, &value) { + tracing::debug!(?err, "aborting streamed request body"); + return; + } + // `into_inner` flushes the final block into the tee; on error the body + // channel is dropped (truncating the request) and `secondary` with it. + let tee = match writer.into_inner() { + Ok(tee) => tee, + Err(err) => { + tracing::debug!(err = ?err.error(), "flushing streamed request body failed"); + return; + } + }; + let (channel, secondary) = tee.into_parts(); + // Close the body so reqwest can finish; finalize the copy separately. + drop(channel); + secondary.finalize(); + }); + body +} + +/// A [`Write`] that sends each block to the body channel, blocking when it's +/// full so serialization can't outpace the request (backpressure). Dropping it +/// closes the channel, signalling the end of the body. +struct ChannelWriter(mpsc::Sender>); + +impl Write for ChannelWriter { + fn write(&mut self, data: &[u8]) -> std::io::Result { + // `copy_from_slice` copies each block a second time (after `BufWriter`'s + // own buffering). If this ever shows up in a profile, a `BytesMut`-backed + // writer that chunks and `freeze()`s could hand the channel owned bytes + // without the copy — at the cost of reimplementing the coalescing + // `BufWriter` gives us for free. + self.0 + .blocking_send(Ok(Bytes::copy_from_slice(data))) + .map_err(|_| { + std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "request body receiver dropped", + ) + })?; + Ok(data.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +/// A consume-by-value cleanup for the tee's secondary sink, run after the full +/// body has been written (e.g. finishing a gzip stream and handing off the +/// result). [`stream_into`] invokes it only on success; an aborted +/// serialization drops the sink without calling it. +trait Finalize { + fn finalize(self); +} + +/// No-op secondary for [`stream_body`]. +impl Finalize for std::io::Sink { + fn finalize(self) {} +} + +/// A [`Write`] sink that gzips into memory and delivers the bytes over a +/// oneshot when finalized. +struct GzipCapture { + encoder: GzEncoder>, + tx: oneshot::Sender, +} + +impl GzipCapture { + fn new() -> (Self, oneshot::Receiver) { + let (tx, rx) = oneshot::channel(); + let encoder = GzEncoder::new(Vec::new(), Compression::new(3)); + (Self { encoder, tx }, rx) + } +} + +impl Finalize for GzipCapture { + /// Finishes the gzip stream and sends the bytes. A dropped receiver just + /// means archival was no longer wanted. + fn finalize(self) { + match self.encoder.finish() { + Ok(compressed) => drop(self.tx.send(Bytes::from(compressed))), + Err(err) => tracing::debug!(?err, "gzip of archived request body failed"), + } + } +} + +impl Write for GzipCapture { + fn write(&mut self, data: &[u8]) -> std::io::Result { + self.encoder.write(data) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.encoder.flush() + } +} + +#[cfg(test)] +mod tests { + use {super::*, flate2::read::GzDecoder, serde_json::json, std::io::Read}; + + /// The streamed body must reassemble to exactly the serialized JSON. + #[tokio::test] + async fn channel_writer_forwards_all_bytes() { + let data = vec![7u8; CHUNK_SIZE * 2 + 123]; + let (tx, mut rx) = mpsc::channel::>(CHANNEL_CAPACITY); + let expected = data.clone(); + let writer = tokio::task::spawn_blocking(move || { + let mut writer = BufWriter::with_capacity(CHUNK_SIZE, ChannelWriter(tx)); + writer.write_all(&data).unwrap(); + writer.flush().unwrap(); + }); + + let mut reassembled = Vec::new(); + while let Some(chunk) = rx.recv().await { + reassembled.extend_from_slice(&chunk.unwrap()); + } + writer.await.unwrap(); + + assert_eq!(reassembled, expected); + } + + /// The gzip capture must compress exactly the serialized JSON. + #[tokio::test] + async fn gzip_capture_matches_serialized_value() { + let value = json!({ "a": 1, "b": [1, 2, 3], "c": "hello" }); + // Keep `body` alive: dropping its receiver would abort serialization + // before the gzip is captured. + let (_body, gzip_rx) = stream_body_and_gzip(value.clone()); + + let compressed = gzip_rx.await.unwrap(); + let mut decoded = Vec::new(); + GzDecoder::new(&compressed[..]) + .read_to_end(&mut decoded) + .unwrap(); + assert_eq!(decoded, serde_json::to_vec(&value).unwrap()); + } +} diff --git a/crates/driver/src/infra/solver/streaming/tee_writer.rs b/crates/driver/src/infra/solver/streaming/tee_writer.rs new file mode 100644 index 0000000000..0111c7a3e4 --- /dev/null +++ b/crates/driver/src/infra/solver/streaming/tee_writer.rs @@ -0,0 +1,63 @@ +use std::io::Write; + +/// A [`Write`] that fans every write out to two writers, so one serialization +/// pass feeds independent sinks (e.g. a request body and a gzip copy). A write +/// succeeds only if it succeeds on both. +pub(super) struct TeeWriter { + primary: A, + secondary: B, +} + +impl TeeWriter { + pub(super) fn new(primary: A, secondary: B) -> Self { + Self { primary, secondary } + } + + /// Splits the tee back into its two sinks so each can be finalized on its + /// own. + pub(super) fn into_parts(self) -> (A, B) { + (self.primary, self.secondary) + } +} + +impl Write for TeeWriter { + fn write(&mut self, data: &[u8]) -> std::io::Result { + self.primary.write_all(data)?; + self.secondary.write_all(data)?; + Ok(data.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.primary.flush()?; + self.secondary.flush() + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + flate2::{Compression, read::GzDecoder, write::GzEncoder}, + std::io::Read, + }; + + /// The tee must hand identical bytes to both sinks. + #[test] + fn forwards_to_both_sinks() { + let data = b"the quick brown fox jumps over the lazy dog".repeat(100); + let mut plain = Vec::new(); + let mut gzip = GzEncoder::new(Vec::new(), Compression::new(3)); + { + let mut tee = TeeWriter::new(&mut plain, &mut gzip); + tee.write_all(&data).unwrap(); + } + let compressed = gzip.finish().unwrap(); + + assert_eq!(plain, data); + let mut decoded = Vec::new(); + GzDecoder::new(&compressed[..]) + .read_to_end(&mut decoded) + .unwrap(); + assert_eq!(decoded, data); + } +} diff --git a/crates/s3/src/lib.rs b/crates/s3/src/lib.rs index ba461c7e40..53460fecb3 100644 --- a/crates/s3/src/lib.rs +++ b/crates/s3/src/lib.rs @@ -58,6 +58,14 @@ impl Uploader { let compressed = tokio::task::spawn_blocking(move || Self::gzip(&content)) .await .context("compression task panicked")??; + self.upload_gzipped(id, Bytes::from(compressed)).await + } + + /// Uploads bytes that are already gzip-compressed JSON, tagging the object + /// with `Content-Encoding: gzip`. Lets callers that compress while + /// streaming the data elsewhere avoid ever holding the full + /// uncompressed JSON here. + pub async fn upload_gzipped(&self, id: String, compressed: Bytes) -> Result { let key = std::path::Path::new(&self.filename_prefix) .join(format!("{id}.json")) .to_str() From ec3682b46d3a754e3c88ca1412d89c9f957ed005 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Mon, 29 Jun 2026 18:50:15 +0100 Subject: [PATCH 02/11] [driver] Address review on streamed /solve body Review fixes that don't depend on the transmission-timing wrapper: - Reduce CHANNEL_CAPACITY 8 -> 2; double buffering is enough to keep the consumer fed without raising the memory ceiling. - Carry raw `Bytes` over the body channel and wrap them into infallible `Ok`s at the `wrap_stream` boundary instead of channeling `Result`s. - Move the gzip compression policy into the `s3` crate via `GzipWriter`, so the streamed archive copy is compressed with the same settings as the eager upload path. - Use the `mod.rs` layout for the `streaming` module. - Note the UNIX `tee` lineage in `TeeWriter`'s docs. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../solver/{streaming.rs => streaming/mod.rs} | 49 ++++++++++++------- .../src/infra/solver/streaming/tee_writer.rs | 3 +- crates/s3/src/lib.rs | 48 ++++++++++++++++-- 3 files changed, 78 insertions(+), 22 deletions(-) rename crates/driver/src/infra/solver/{streaming.rs => streaming/mod.rs} (83%) diff --git a/crates/driver/src/infra/solver/streaming.rs b/crates/driver/src/infra/solver/streaming/mod.rs similarity index 83% rename from crates/driver/src/infra/solver/streaming.rs rename to crates/driver/src/infra/solver/streaming/mod.rs index 62406d510a..220b3c04c8 100644 --- a/crates/driver/src/infra/solver/streaming.rs +++ b/crates/driver/src/infra/solver/streaming/mod.rs @@ -11,8 +11,11 @@ mod tee_writer; use { bytes::Bytes, - flate2::{Compression, write::GzEncoder}, - std::io::{BufWriter, Write}, + futures::StreamExt, + std::{ + convert::Infallible, + io::{BufWriter, Write}, + }, tee_writer::TeeWriter, tokio::sync::{mpsc, oneshot}, tokio_stream::wrappers::ReceiverStream, @@ -23,9 +26,11 @@ use { const CHUNK_SIZE: usize = 64 * 1024; /// Blocks buffered in the body channel before the serializing thread is parked. -/// Bounds the streamed request body at ~`CHANNEL_CAPACITY * CHUNK_SIZE` through -/// backpressure (the gzip archive copy, when enabled, is retained separately). -const CHANNEL_CAPACITY: usize = 8; +/// Two is enough for double buffering: the consumer can transmit one block +/// while the serializer produces the next. A larger value would only raise the +/// memory ceiling (~`CHANNEL_CAPACITY * CHUNK_SIZE`) without improving +/// throughput (the gzip archive copy, when enabled, is retained separately). +const CHANNEL_CAPACITY: usize = 2; /// Serializes `value` to JSON on a blocking thread, streaming it into the /// returned reqwest body. Backpressure from the body channel caps memory. @@ -55,8 +60,10 @@ where T: serde::Serialize + Send + 'static, S: Write + Finalize + Send + 'static, { - let (tx, rx) = mpsc::channel::>(CHANNEL_CAPACITY); - let body = reqwest::Body::wrap_stream(ReceiverStream::new(rx)); + let (tx, rx) = mpsc::channel::(CHANNEL_CAPACITY); + // The channel carries raw `Bytes`; `wrap_stream` wants a `TryStream`, so the + // chunks are wrapped into infallible `Ok`s at the boundary. + let body = reqwest::Body::wrap_stream(ReceiverStream::new(rx).map(Ok::<_, Infallible>)); // `spawn_blocking` doesn't inherit the caller's span, so carry it across so // the diagnostics below keep the auction/request context. let span = tracing::Span::current(); @@ -88,7 +95,7 @@ where /// A [`Write`] that sends each block to the body channel, blocking when it's /// full so serialization can't outpace the request (backpressure). Dropping it /// closes the channel, signalling the end of the body. -struct ChannelWriter(mpsc::Sender>); +struct ChannelWriter(mpsc::Sender); impl Write for ChannelWriter { fn write(&mut self, data: &[u8]) -> std::io::Result { @@ -98,7 +105,7 @@ impl Write for ChannelWriter { // without the copy — at the cost of reimplementing the coalescing // `BufWriter` gives us for free. self.0 - .blocking_send(Ok(Bytes::copy_from_slice(data))) + .blocking_send(Bytes::copy_from_slice(data)) .map_err(|_| { std::io::Error::new( std::io::ErrorKind::BrokenPipe, @@ -127,17 +134,23 @@ impl Finalize for std::io::Sink { } /// A [`Write`] sink that gzips into memory and delivers the bytes over a -/// oneshot when finalized. +/// oneshot when finalized. The gzip settings live in the `s3` crate so the +/// archived copy is compressed identically to the eager upload path. struct GzipCapture { - encoder: GzEncoder>, + writer: s3::GzipWriter, tx: oneshot::Sender, } impl GzipCapture { fn new() -> (Self, oneshot::Receiver) { let (tx, rx) = oneshot::channel(); - let encoder = GzEncoder::new(Vec::new(), Compression::new(3)); - (Self { encoder, tx }, rx) + ( + Self { + writer: s3::GzipWriter::new(), + tx, + }, + rx, + ) } } @@ -145,7 +158,7 @@ impl Finalize for GzipCapture { /// Finishes the gzip stream and sends the bytes. A dropped receiver just /// means archival was no longer wanted. fn finalize(self) { - match self.encoder.finish() { + match self.writer.finish() { Ok(compressed) => drop(self.tx.send(Bytes::from(compressed))), Err(err) => tracing::debug!(?err, "gzip of archived request body failed"), } @@ -154,11 +167,11 @@ impl Finalize for GzipCapture { impl Write for GzipCapture { fn write(&mut self, data: &[u8]) -> std::io::Result { - self.encoder.write(data) + self.writer.write(data) } fn flush(&mut self) -> std::io::Result<()> { - self.encoder.flush() + self.writer.flush() } } @@ -170,7 +183,7 @@ mod tests { #[tokio::test] async fn channel_writer_forwards_all_bytes() { let data = vec![7u8; CHUNK_SIZE * 2 + 123]; - let (tx, mut rx) = mpsc::channel::>(CHANNEL_CAPACITY); + let (tx, mut rx) = mpsc::channel::(CHANNEL_CAPACITY); let expected = data.clone(); let writer = tokio::task::spawn_blocking(move || { let mut writer = BufWriter::with_capacity(CHUNK_SIZE, ChannelWriter(tx)); @@ -180,7 +193,7 @@ mod tests { let mut reassembled = Vec::new(); while let Some(chunk) = rx.recv().await { - reassembled.extend_from_slice(&chunk.unwrap()); + reassembled.extend_from_slice(&chunk); } writer.await.unwrap(); diff --git a/crates/driver/src/infra/solver/streaming/tee_writer.rs b/crates/driver/src/infra/solver/streaming/tee_writer.rs index 0111c7a3e4..137df1075b 100644 --- a/crates/driver/src/infra/solver/streaming/tee_writer.rs +++ b/crates/driver/src/infra/solver/streaming/tee_writer.rs @@ -2,7 +2,8 @@ use std::io::Write; /// A [`Write`] that fans every write out to two writers, so one serialization /// pass feeds independent sinks (e.g. a request body and a gzip copy). A write -/// succeeds only if it succeeds on both. +/// succeeds only if it succeeds on both. Named after the UNIX `tee` command, +/// which likewise duplicates its input to multiple outputs. pub(super) struct TeeWriter { primary: A, secondary: B, diff --git a/crates/s3/src/lib.rs b/crates/s3/src/lib.rs index 53460fecb3..be98da0611 100644 --- a/crates/s3/src/lib.rs +++ b/crates/s3/src/lib.rs @@ -7,11 +7,53 @@ use { primitives::{ByteStream, SdkBody}, }, bytes::Bytes, - flate2::{Compression, bufread::GzEncoder}, + flate2::{Compression, bufread, write}, serde::Serialize, - std::io::Read, + std::io::{Read, Write}, }; +/// gzip compression level used for every object archived to S3. Centralized so +/// the eager [`Uploader::upload`] path and callers that compress while +/// streaming the same bytes elsewhere produce identically-compressed objects. +const COMPRESSION_LEVEL: u32 = 3; + +/// A streaming gzip sink: write the plaintext JSON in, then +/// [`GzipWriter::finish`] to get the compressed bytes. Lets callers that stream +/// a payload somewhere else compress the same bytes in one pass — with the +/// exact settings the eager upload path uses — without ever materializing the +/// full plaintext here. +pub struct GzipWriter(write::GzEncoder>); + +impl GzipWriter { + pub fn new() -> Self { + Self(write::GzEncoder::new( + Vec::new(), + Compression::new(COMPRESSION_LEVEL), + )) + } + + /// Finishes the gzip stream and returns the compressed bytes. + pub fn finish(self) -> std::io::Result> { + self.0.finish() + } +} + +impl Default for GzipWriter { + fn default() -> Self { + Self::new() + } +} + +impl Write for GzipWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.0.flush() + } +} + #[derive(Default)] pub struct Config { pub bucket: String, @@ -104,7 +146,7 @@ impl Uploader { /// Compresses the input bytes using Gzip. fn gzip(bytes: &[u8]) -> Result> { - let mut encoder = GzEncoder::new(bytes, Compression::new(3)); + let mut encoder = bufread::GzEncoder::new(bytes, Compression::new(COMPRESSION_LEVEL)); let mut encoded: Vec = Vec::with_capacity(bytes.len()); encoder.read_to_end(&mut encoded).context("gzip encoding")?; Ok(encoded) From 5703c607e392dd071237fc540718503d2150fbac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Mon, 29 Jun 2026 18:50:47 +0100 Subject: [PATCH 03/11] [observe] Add Measured body stream; reuse for driver and autopilot Extract the request-body transmission timing that lived in autopilot's `ByteStream` into a generic `observe::http_body::Measured` stream wrapper, and apply it to the driver's streamed `/solve` body so the driver->solver hop also reports `to_transmission_start_ms` / `transmission_ms`. autopilot now wraps a plain single-chunk stream with `Measured` and drops the bespoke `ByteStream` type, which was only a `Bytes`-to-stream adapter once its timing moved out. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/infra/solvers/byte_stream.rs | 94 ------------------- .../autopilot/src/infra/solvers/dto/solve.rs | 13 ++- crates/autopilot/src/infra/solvers/mod.rs | 1 - .../driver/src/infra/solver/streaming/mod.rs | 7 +- crates/observe/src/http_body.rs | 81 ++++++++++++++++ crates/observe/src/lib.rs | 1 + 6 files changed, 96 insertions(+), 101 deletions(-) delete mode 100644 crates/autopilot/src/infra/solvers/byte_stream.rs create mode 100644 crates/observe/src/http_body.rs diff --git a/crates/autopilot/src/infra/solvers/byte_stream.rs b/crates/autopilot/src/infra/solvers/byte_stream.rs deleted file mode 100644 index b36f77ce60..0000000000 --- a/crates/autopilot/src/infra/solvers/byte_stream.rs +++ /dev/null @@ -1,94 +0,0 @@ -use { - bytes::Bytes, - futures::Stream, - std::{task::Poll, time::Instant}, -}; - -/// Thin wrapper around a payload that already exists fully serialized -/// in-memory. The purpose of converting that into a stream is to allow -/// measuring the data transfer of the request body (for debugging and -/// optimization purposes). -/// -/// Note that this measurement is only an approximation of the truth. -/// The reason is that this only measures how long it takes `hyper` -/// to load the last byte from the body into the buffer of the network -/// stack. However, given how big `/solve` requests are in practice -/// `hyper` should have to flush the buffer a couple of times so the -/// measured time should be reasonably accurate. -pub struct ByteStream { - data: Bytes, - created_at: Instant, - first_polled_at: Option, - span: tracing::Span, -} - -impl ByteStream { - pub fn new(data: Bytes) -> Self { - Self { - data, - created_at: Instant::now(), - first_polled_at: None, - span: tracing::Span::current(), - } - } -} - -// Since `hyper` uses `Bytes` under the hood which are reference counted -// the chunks we yield can be as big as we want. To minimize overhead -// that's only there for debugging purposes we always yield all the -// data at once. The measurements will still be accurate because `hyper` -// has to poll the stream once more to confirm that it's actually -// exhausted. -impl Stream for ByteStream { - type Item = Result; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - let this = self.as_mut().get_mut(); - - if this.first_polled_at.is_none() { - this.first_polled_at = Some(Instant::now()); - } - - if this.data.is_empty() { - let _span = this.span.enter(); - let first_poll = this.first_polled_at.expect("initialized at first poll"); - tracing::debug!( - to_transmission_start_ms = first_poll.duration_since(this.created_at).as_millis(), - transmission_ms = first_poll.elapsed().as_millis(), - "finished streaming http request body" - ); - Poll::Ready(None) - } else { - // steals all the data and leaves 0 bytes in self - let chunk = std::mem::take(&mut this.data); - Poll::Ready(Some(Ok(chunk))) - } - } -} - -#[cfg(test)] -mod tests { - use {super::*, futures::FutureExt, tokio_stream::StreamExt}; - - #[test] - fn byte_stream_yields_bytes() { - let original = Bytes::from_iter(0..100); - let mut stream = ByteStream::new(original.clone()); - - let chunk = stream - .next() - .now_or_never() - .expect("stream is always ready"); - // stream always yields all the data on the first poll - assert_eq!(chunk.unwrap().unwrap(), original); - - let chunk = stream - .next() - .now_or_never() - .expect("stream is always ready"); - assert!(chunk.is_none()); - } -} diff --git a/crates/autopilot/src/infra/solvers/dto/solve.rs b/crates/autopilot/src/infra/solvers/dto/solve.rs index a1bdfc36f8..45913038ad 100644 --- a/crates/autopilot/src/infra/solvers/dto/solve.rs +++ b/crates/autopilot/src/infra/solvers/dto/solve.rs @@ -4,7 +4,7 @@ use { domain, infra::{ persistence::dto::{self, order::Order}, - solvers::{InjectIntoHttpRequest, byte_stream::ByteStream}, + solvers::InjectIntoHttpRequest, }, }, alloy::primitives::{Address, U256}, @@ -14,12 +14,14 @@ use { eth_domain_types as eth, itertools::Itertools, number::serialization::HexOrDecimalU256, + observe::http_body::Measured, reqwest::{RequestBuilder, header::HeaderValue}, serde::{Deserialize, Serialize}, serde_with::{DisplayFromStr, serde_as}, std::{ borrow::Cow, collections::{HashMap, HashSet}, + convert::Infallible, io::Write, time::Duration, }, @@ -132,10 +134,13 @@ impl Request { impl InjectIntoHttpRequest for Request { fn inject(&self, request: RequestBuilder) -> RequestBuilder { + // Serve the already-serialized body as a single-chunk stream so it can + // be wrapped in `Measured` to time how long the body takes to transmit. + // The bytes are already in memory (`hyper`'s `Bytes` are reference + // counted), so there's nothing to gain from chunking. + let body = futures::stream::iter([Ok::<_, Infallible>(self.body.clone())]); let request = request - .body(reqwest::Body::wrap_stream(ByteStream::new( - self.body.clone(), - ))) + .body(reqwest::Body::wrap_stream(Measured::new(body))) // announce which auction this request is for in the // headers to help the driver detect duplicated // `/solve` requests before streaming the body diff --git a/crates/autopilot/src/infra/solvers/mod.rs b/crates/autopilot/src/infra/solvers/mod.rs index 6efb9bfd4f..b0ff33741a 100644 --- a/crates/autopilot/src/infra/solvers/mod.rs +++ b/crates/autopilot/src/infra/solvers/mod.rs @@ -13,7 +13,6 @@ use { url::Url, }; -mod byte_stream; pub mod dto; const RESPONSE_SIZE_LIMIT: usize = 10_000_000; diff --git a/crates/driver/src/infra/solver/streaming/mod.rs b/crates/driver/src/infra/solver/streaming/mod.rs index 220b3c04c8..e1fe31e1d4 100644 --- a/crates/driver/src/infra/solver/streaming/mod.rs +++ b/crates/driver/src/infra/solver/streaming/mod.rs @@ -12,6 +12,7 @@ mod tee_writer; use { bytes::Bytes, futures::StreamExt, + observe::http_body::Measured, std::{ convert::Infallible, io::{BufWriter, Write}, @@ -62,8 +63,10 @@ where { let (tx, rx) = mpsc::channel::(CHANNEL_CAPACITY); // The channel carries raw `Bytes`; `wrap_stream` wants a `TryStream`, so the - // chunks are wrapped into infallible `Ok`s at the boundary. - let body = reqwest::Body::wrap_stream(ReceiverStream::new(rx).map(Ok::<_, Infallible>)); + // chunks are wrapped into infallible `Ok`s at the boundary. `Measured` logs + // how long the solver took to read the body once it is fully drained. + let stream = Measured::new(ReceiverStream::new(rx)).map(Ok::<_, Infallible>); + let body = reqwest::Body::wrap_stream(stream); // `spawn_blocking` doesn't inherit the caller's span, so carry it across so // the diagnostics below keep the auction/request context. let span = tracing::Span::current(); diff --git a/crates/observe/src/http_body.rs b/crates/observe/src/http_body.rs new file mode 100644 index 0000000000..757baaaed0 --- /dev/null +++ b/crates/observe/src/http_body.rs @@ -0,0 +1,81 @@ +//! Instrumentation for outgoing HTTP request bodies. + +use { + futures::Stream, + pin_project_lite::pin_project, + std::{ + pin::Pin, + task::{Context, Poll}, + time::Instant, + }, +}; + +pin_project! { + /// Wraps an HTTP request body stream and logs, once the body has been fully + /// drained, how long it took to hand off to the network. Useful to tell + /// whether a slow round-trip is dominated by us sending a large (multi-MB) + /// body or by the remote being slow to read it. + /// + /// `to_transmission_start_ms` is the gap between construction and the first + /// poll (how long until the HTTP client started reading the body); + /// `transmission_ms` spans that first poll until the body is exhausted. + /// + /// The numbers are an approximation: a poll only reflects when `hyper` + /// pulled the chunk into the network stack's buffer, not when the bytes + /// actually hit the wire. For bodies large enough to require several buffer + /// flushes that approximation is close enough to be useful. + pub struct Measured { + #[pin] + inner: S, + created_at: Instant, + first_polled_at: Option, + span: tracing::Span, + } +} + +impl Measured { + pub fn new(inner: S) -> Self { + Self { + inner, + created_at: Instant::now(), + first_polled_at: None, + span: tracing::Span::current(), + } + } +} + +impl Stream for Measured { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let first_polled_at = *this.first_polled_at.get_or_insert_with(Instant::now); + let poll = this.inner.poll_next(cx); + if matches!(poll, Poll::Ready(None)) { + let _span = this.span.enter(); + tracing::debug!( + to_transmission_start_ms = + first_polled_at.duration_since(*this.created_at).as_millis(), + transmission_ms = first_polled_at.elapsed().as_millis(), + "finished streaming http request body" + ); + } + poll + } +} + +#[cfg(test)] +mod tests { + use {super::*, bytes::Bytes, futures::StreamExt}; + + #[tokio::test] + async fn passes_items_through_unchanged() { + let inner = + futures::stream::iter(vec![Bytes::from_static(b"ab"), Bytes::from_static(b"cd")]); + let collected: Vec<_> = Measured::new(inner).collect().await; + assert_eq!( + collected, + vec![Bytes::from_static(b"ab"), Bytes::from_static(b"cd")] + ); + } +} diff --git a/crates/observe/src/lib.rs b/crates/observe/src/lib.rs index d847d75193..2564cbc932 100644 --- a/crates/observe/src/lib.rs +++ b/crates/observe/src/lib.rs @@ -6,6 +6,7 @@ pub mod event_bus; pub mod future; #[cfg(unix)] pub mod heap_dump_handler; +pub mod http_body; pub mod metrics; pub mod panic_hook; pub mod tracing; From 1eecefad86fc6be06ad4916be9abf6970f8755b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Mon, 29 Jun 2026 19:13:04 +0100 Subject: [PATCH 04/11] minimize comments --- .../autopilot/src/infra/solvers/dto/solve.rs | 5 +- .../driver/src/infra/solver/streaming/mod.rs | 56 +++++-------------- .../src/infra/solver/streaming/tee_writer.rs | 7 +-- crates/observe/src/http_body.rs | 18 ++---- crates/s3/src/lib.rs | 14 ++--- 5 files changed, 27 insertions(+), 73 deletions(-) diff --git a/crates/autopilot/src/infra/solvers/dto/solve.rs b/crates/autopilot/src/infra/solvers/dto/solve.rs index 45913038ad..94af9ada84 100644 --- a/crates/autopilot/src/infra/solvers/dto/solve.rs +++ b/crates/autopilot/src/infra/solvers/dto/solve.rs @@ -134,10 +134,7 @@ impl Request { impl InjectIntoHttpRequest for Request { fn inject(&self, request: RequestBuilder) -> RequestBuilder { - // Serve the already-serialized body as a single-chunk stream so it can - // be wrapped in `Measured` to time how long the body takes to transmit. - // The bytes are already in memory (`hyper`'s `Bytes` are reference - // counted), so there's nothing to gain from chunking. + // Wrap the in-memory body as a one-chunk stream so `Measured` can time it. let body = futures::stream::iter([Ok::<_, Infallible>(self.body.clone())]); let request = request .body(reqwest::Body::wrap_stream(Measured::new(body))) diff --git a/crates/driver/src/infra/solver/streaming/mod.rs b/crates/driver/src/infra/solver/streaming/mod.rs index e1fe31e1d4..f6ac031d76 100644 --- a/crates/driver/src/infra/solver/streaming/mod.rs +++ b/crates/driver/src/infra/solver/streaming/mod.rs @@ -1,11 +1,5 @@ //! Streams a serializable value into a reqwest body so the full (multi-MB) //! payload is never held in memory at once. -//! -//! A [`BufWriter`] coalesces the serializer's many tiny writes into -//! `CHUNK_SIZE` blocks, which a [`TeeWriter`] then fans out to the body channel -//! and an optional secondary sink. Both entry points share one core -//! ([`stream_into`]) and differ only in that secondary: [`stream_body`] -//! discards it, [`stream_body_and_gzip`] captures a gzip copy for S3. mod tee_writer; @@ -22,15 +16,10 @@ use { tokio_stream::wrappers::ReceiverStream, }; -/// Block size the serializer's writes are coalesced into before being chunked -/// onto the body channel and fed to the gzip encoder. const CHUNK_SIZE: usize = 64 * 1024; -/// Blocks buffered in the body channel before the serializing thread is parked. -/// Two is enough for double buffering: the consumer can transmit one block -/// while the serializer produces the next. A larger value would only raise the -/// memory ceiling (~`CHANNEL_CAPACITY * CHUNK_SIZE`) without improving -/// throughput (the gzip archive copy, when enabled, is retained separately). +/// Two is enough for double buffering: the consumer transmits one block while +/// the serializer produces the next; more only raises the memory ceiling. const CHANNEL_CAPACITY: usize = 2; /// Serializes `value` to JSON on a blocking thread, streaming it into the @@ -42,9 +31,9 @@ where stream_into(value, std::io::sink()) } -/// Like [`stream_body`], but also gzips the same serialization for S3 in one -/// pass. The receiver yields the compressed bytes once serialization finishes, -/// or errors (sender dropped) if it aborts — leaving nothing to archive. +/// Like [`stream_body`], but also gzips the same serialization in one pass. The +/// receiver yields the compressed bytes when serialization finishes, or errors +/// if it aborts (nothing to archive). pub fn stream_body_and_gzip(value: T) -> (reqwest::Body, oneshot::Receiver) where T: serde::Serialize + Send + 'static, @@ -55,20 +44,16 @@ where } /// Serializes `value` into a tee of the body channel and `secondary` on a -/// blocking thread, then finalizes each sink. Shared by both entry points. +/// blocking thread, then finalizes each sink. fn stream_into(value: T, secondary: S) -> reqwest::Body where T: serde::Serialize + Send + 'static, S: Write + Finalize + Send + 'static, { let (tx, rx) = mpsc::channel::(CHANNEL_CAPACITY); - // The channel carries raw `Bytes`; `wrap_stream` wants a `TryStream`, so the - // chunks are wrapped into infallible `Ok`s at the boundary. `Measured` logs - // how long the solver took to read the body once it is fully drained. let stream = Measured::new(ReceiverStream::new(rx)).map(Ok::<_, Infallible>); let body = reqwest::Body::wrap_stream(stream); - // `spawn_blocking` doesn't inherit the caller's span, so carry it across so - // the diagnostics below keep the auction/request context. + // spawn_blocking loses the current span; carry it so the logs keep context. let span = tracing::Span::current(); tokio::task::spawn_blocking(move || { let _guard = span.enter(); @@ -78,8 +63,6 @@ where tracing::debug!(?err, "aborting streamed request body"); return; } - // `into_inner` flushes the final block into the tee; on error the body - // channel is dropped (truncating the request) and `secondary` with it. let tee = match writer.into_inner() { Ok(tee) => tee, Err(err) => { @@ -88,7 +71,7 @@ where } }; let (channel, secondary) = tee.into_parts(); - // Close the body so reqwest can finish; finalize the copy separately. + // Closing the sender ends the body; finalize the copy after. drop(channel); secondary.finalize(); }); @@ -96,17 +79,11 @@ where } /// A [`Write`] that sends each block to the body channel, blocking when it's -/// full so serialization can't outpace the request (backpressure). Dropping it -/// closes the channel, signalling the end of the body. +/// full so serialization can't outpace the request. Dropping it ends the body. struct ChannelWriter(mpsc::Sender); impl Write for ChannelWriter { fn write(&mut self, data: &[u8]) -> std::io::Result { - // `copy_from_slice` copies each block a second time (after `BufWriter`'s - // own buffering). If this ever shows up in a profile, a `BytesMut`-backed - // writer that chunks and `freeze()`s could hand the channel owned bytes - // without the copy — at the cost of reimplementing the coalescing - // `BufWriter` gives us for free. self.0 .blocking_send(Bytes::copy_from_slice(data)) .map_err(|_| { @@ -123,22 +100,18 @@ impl Write for ChannelWriter { } } -/// A consume-by-value cleanup for the tee's secondary sink, run after the full -/// body has been written (e.g. finishing a gzip stream and handing off the -/// result). [`stream_into`] invokes it only on success; an aborted -/// serialization drops the sink without calling it. +/// Cleanup for the tee's secondary sink once the body is fully written. Only +/// called on success; an aborted serialization drops the sink instead. trait Finalize { fn finalize(self); } -/// No-op secondary for [`stream_body`]. impl Finalize for std::io::Sink { fn finalize(self) {} } /// A [`Write`] sink that gzips into memory and delivers the bytes over a -/// oneshot when finalized. The gzip settings live in the `s3` crate so the -/// archived copy is compressed identically to the eager upload path. +/// oneshot when finalized. struct GzipCapture { writer: s3::GzipWriter, tx: oneshot::Sender, @@ -158,8 +131,7 @@ impl GzipCapture { } impl Finalize for GzipCapture { - /// Finishes the gzip stream and sends the bytes. A dropped receiver just - /// means archival was no longer wanted. + /// A dropped receiver just means the archive was no longer wanted. fn finalize(self) { match self.writer.finish() { Ok(compressed) => drop(self.tx.send(Bytes::from(compressed))), @@ -182,7 +154,6 @@ impl Write for GzipCapture { mod tests { use {super::*, flate2::read::GzDecoder, serde_json::json, std::io::Read}; - /// The streamed body must reassemble to exactly the serialized JSON. #[tokio::test] async fn channel_writer_forwards_all_bytes() { let data = vec![7u8; CHUNK_SIZE * 2 + 123]; @@ -203,7 +174,6 @@ mod tests { assert_eq!(reassembled, expected); } - /// The gzip capture must compress exactly the serialized JSON. #[tokio::test] async fn gzip_capture_matches_serialized_value() { let value = json!({ "a": 1, "b": [1, 2, 3], "c": "hello" }); diff --git a/crates/driver/src/infra/solver/streaming/tee_writer.rs b/crates/driver/src/infra/solver/streaming/tee_writer.rs index 137df1075b..8f5ed86e66 100644 --- a/crates/driver/src/infra/solver/streaming/tee_writer.rs +++ b/crates/driver/src/infra/solver/streaming/tee_writer.rs @@ -1,9 +1,8 @@ use std::io::Write; -/// A [`Write`] that fans every write out to two writers, so one serialization -/// pass feeds independent sinks (e.g. a request body and a gzip copy). A write -/// succeeds only if it succeeds on both. Named after the UNIX `tee` command, -/// which likewise duplicates its input to multiple outputs. +/// A [`Write`] that fans every write out to two writers (à la UNIX `tee`), so +/// one serialization pass feeds two sinks — a request body and a gzip copy. A +/// write succeeds only if it succeeds on both. pub(super) struct TeeWriter { primary: A, secondary: B, diff --git a/crates/observe/src/http_body.rs b/crates/observe/src/http_body.rs index 757baaaed0..c37297fe3e 100644 --- a/crates/observe/src/http_body.rs +++ b/crates/observe/src/http_body.rs @@ -11,19 +11,11 @@ use { }; pin_project! { - /// Wraps an HTTP request body stream and logs, once the body has been fully - /// drained, how long it took to hand off to the network. Useful to tell - /// whether a slow round-trip is dominated by us sending a large (multi-MB) - /// body or by the remote being slow to read it. - /// - /// `to_transmission_start_ms` is the gap between construction and the first - /// poll (how long until the HTTP client started reading the body); - /// `transmission_ms` spans that first poll until the body is exhausted. - /// - /// The numbers are an approximation: a poll only reflects when `hyper` - /// pulled the chunk into the network stack's buffer, not when the bytes - /// actually hit the wire. For bodies large enough to require several buffer - /// flushes that approximation is close enough to be useful. + /// Wraps an HTTP request body stream and, once it is fully drained, logs how + /// long transmission took: `to_transmission_start_ms` (construction until + /// the first poll, i.e. until the client started reading) and + /// `transmission_ms` (first poll until exhausted). Approximate — a poll + /// reflects when `hyper` buffered the chunk, not when it hit the wire. pub struct Measured { #[pin] inner: S, diff --git a/crates/s3/src/lib.rs b/crates/s3/src/lib.rs index be98da0611..33d4360f44 100644 --- a/crates/s3/src/lib.rs +++ b/crates/s3/src/lib.rs @@ -12,16 +12,13 @@ use { std::io::{Read, Write}, }; -/// gzip compression level used for every object archived to S3. Centralized so -/// the eager [`Uploader::upload`] path and callers that compress while -/// streaming the same bytes elsewhere produce identically-compressed objects. +/// gzip level for every object archived to S3, so the eager and streaming +/// compression paths produce identical output. const COMPRESSION_LEVEL: u32 = 3; -/// A streaming gzip sink: write the plaintext JSON in, then -/// [`GzipWriter::finish`] to get the compressed bytes. Lets callers that stream -/// a payload somewhere else compress the same bytes in one pass — with the -/// exact settings the eager upload path uses — without ever materializing the -/// full plaintext here. +/// A streaming gzip sink: write plaintext in, then [`GzipWriter::finish`] for +/// the compressed bytes. Lets callers compress while streaming a payload +/// elsewhere, without materializing the full plaintext. pub struct GzipWriter(write::GzEncoder>); impl GzipWriter { @@ -32,7 +29,6 @@ impl GzipWriter { )) } - /// Finishes the gzip stream and returns the compressed bytes. pub fn finish(self) -> std::io::Result> { self.0.finish() } From 6e061c68d080a2612c891a7aebbe6aa105af1e44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Tue, 30 Jun 2026 09:42:26 +0100 Subject: [PATCH 05/11] rollback the bytestream change --- .../src/infra/solvers/byte_stream.rs | 94 +++++++++++++++++++ .../autopilot/src/infra/solvers/dto/solve.rs | 10 +- crates/autopilot/src/infra/solvers/mod.rs | 1 + .../driver/src/infra/solver/streaming/mod.rs | 4 +- crates/observe/src/http_body.rs | 73 -------------- crates/observe/src/lib.rs | 1 - 6 files changed, 100 insertions(+), 83 deletions(-) create mode 100644 crates/autopilot/src/infra/solvers/byte_stream.rs delete mode 100644 crates/observe/src/http_body.rs diff --git a/crates/autopilot/src/infra/solvers/byte_stream.rs b/crates/autopilot/src/infra/solvers/byte_stream.rs new file mode 100644 index 0000000000..b36f77ce60 --- /dev/null +++ b/crates/autopilot/src/infra/solvers/byte_stream.rs @@ -0,0 +1,94 @@ +use { + bytes::Bytes, + futures::Stream, + std::{task::Poll, time::Instant}, +}; + +/// Thin wrapper around a payload that already exists fully serialized +/// in-memory. The purpose of converting that into a stream is to allow +/// measuring the data transfer of the request body (for debugging and +/// optimization purposes). +/// +/// Note that this measurement is only an approximation of the truth. +/// The reason is that this only measures how long it takes `hyper` +/// to load the last byte from the body into the buffer of the network +/// stack. However, given how big `/solve` requests are in practice +/// `hyper` should have to flush the buffer a couple of times so the +/// measured time should be reasonably accurate. +pub struct ByteStream { + data: Bytes, + created_at: Instant, + first_polled_at: Option, + span: tracing::Span, +} + +impl ByteStream { + pub fn new(data: Bytes) -> Self { + Self { + data, + created_at: Instant::now(), + first_polled_at: None, + span: tracing::Span::current(), + } + } +} + +// Since `hyper` uses `Bytes` under the hood which are reference counted +// the chunks we yield can be as big as we want. To minimize overhead +// that's only there for debugging purposes we always yield all the +// data at once. The measurements will still be accurate because `hyper` +// has to poll the stream once more to confirm that it's actually +// exhausted. +impl Stream for ByteStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll> { + let this = self.as_mut().get_mut(); + + if this.first_polled_at.is_none() { + this.first_polled_at = Some(Instant::now()); + } + + if this.data.is_empty() { + let _span = this.span.enter(); + let first_poll = this.first_polled_at.expect("initialized at first poll"); + tracing::debug!( + to_transmission_start_ms = first_poll.duration_since(this.created_at).as_millis(), + transmission_ms = first_poll.elapsed().as_millis(), + "finished streaming http request body" + ); + Poll::Ready(None) + } else { + // steals all the data and leaves 0 bytes in self + let chunk = std::mem::take(&mut this.data); + Poll::Ready(Some(Ok(chunk))) + } + } +} + +#[cfg(test)] +mod tests { + use {super::*, futures::FutureExt, tokio_stream::StreamExt}; + + #[test] + fn byte_stream_yields_bytes() { + let original = Bytes::from_iter(0..100); + let mut stream = ByteStream::new(original.clone()); + + let chunk = stream + .next() + .now_or_never() + .expect("stream is always ready"); + // stream always yields all the data on the first poll + assert_eq!(chunk.unwrap().unwrap(), original); + + let chunk = stream + .next() + .now_or_never() + .expect("stream is always ready"); + assert!(chunk.is_none()); + } +} diff --git a/crates/autopilot/src/infra/solvers/dto/solve.rs b/crates/autopilot/src/infra/solvers/dto/solve.rs index 94af9ada84..a1bdfc36f8 100644 --- a/crates/autopilot/src/infra/solvers/dto/solve.rs +++ b/crates/autopilot/src/infra/solvers/dto/solve.rs @@ -4,7 +4,7 @@ use { domain, infra::{ persistence::dto::{self, order::Order}, - solvers::InjectIntoHttpRequest, + solvers::{InjectIntoHttpRequest, byte_stream::ByteStream}, }, }, alloy::primitives::{Address, U256}, @@ -14,14 +14,12 @@ use { eth_domain_types as eth, itertools::Itertools, number::serialization::HexOrDecimalU256, - observe::http_body::Measured, reqwest::{RequestBuilder, header::HeaderValue}, serde::{Deserialize, Serialize}, serde_with::{DisplayFromStr, serde_as}, std::{ borrow::Cow, collections::{HashMap, HashSet}, - convert::Infallible, io::Write, time::Duration, }, @@ -134,10 +132,10 @@ impl Request { impl InjectIntoHttpRequest for Request { fn inject(&self, request: RequestBuilder) -> RequestBuilder { - // Wrap the in-memory body as a one-chunk stream so `Measured` can time it. - let body = futures::stream::iter([Ok::<_, Infallible>(self.body.clone())]); let request = request - .body(reqwest::Body::wrap_stream(Measured::new(body))) + .body(reqwest::Body::wrap_stream(ByteStream::new( + self.body.clone(), + ))) // announce which auction this request is for in the // headers to help the driver detect duplicated // `/solve` requests before streaming the body diff --git a/crates/autopilot/src/infra/solvers/mod.rs b/crates/autopilot/src/infra/solvers/mod.rs index b0ff33741a..6efb9bfd4f 100644 --- a/crates/autopilot/src/infra/solvers/mod.rs +++ b/crates/autopilot/src/infra/solvers/mod.rs @@ -13,6 +13,7 @@ use { url::Url, }; +mod byte_stream; pub mod dto; const RESPONSE_SIZE_LIMIT: usize = 10_000_000; diff --git a/crates/driver/src/infra/solver/streaming/mod.rs b/crates/driver/src/infra/solver/streaming/mod.rs index f6ac031d76..5eaaafd4ef 100644 --- a/crates/driver/src/infra/solver/streaming/mod.rs +++ b/crates/driver/src/infra/solver/streaming/mod.rs @@ -6,7 +6,6 @@ mod tee_writer; use { bytes::Bytes, futures::StreamExt, - observe::http_body::Measured, std::{ convert::Infallible, io::{BufWriter, Write}, @@ -51,8 +50,7 @@ where S: Write + Finalize + Send + 'static, { let (tx, rx) = mpsc::channel::(CHANNEL_CAPACITY); - let stream = Measured::new(ReceiverStream::new(rx)).map(Ok::<_, Infallible>); - let body = reqwest::Body::wrap_stream(stream); + let body = reqwest::Body::wrap_stream(ReceiverStream::new(rx).map(Ok::<_, Infallible>)); // spawn_blocking loses the current span; carry it so the logs keep context. let span = tracing::Span::current(); tokio::task::spawn_blocking(move || { diff --git a/crates/observe/src/http_body.rs b/crates/observe/src/http_body.rs deleted file mode 100644 index c37297fe3e..0000000000 --- a/crates/observe/src/http_body.rs +++ /dev/null @@ -1,73 +0,0 @@ -//! Instrumentation for outgoing HTTP request bodies. - -use { - futures::Stream, - pin_project_lite::pin_project, - std::{ - pin::Pin, - task::{Context, Poll}, - time::Instant, - }, -}; - -pin_project! { - /// Wraps an HTTP request body stream and, once it is fully drained, logs how - /// long transmission took: `to_transmission_start_ms` (construction until - /// the first poll, i.e. until the client started reading) and - /// `transmission_ms` (first poll until exhausted). Approximate — a poll - /// reflects when `hyper` buffered the chunk, not when it hit the wire. - pub struct Measured { - #[pin] - inner: S, - created_at: Instant, - first_polled_at: Option, - span: tracing::Span, - } -} - -impl Measured { - pub fn new(inner: S) -> Self { - Self { - inner, - created_at: Instant::now(), - first_polled_at: None, - span: tracing::Span::current(), - } - } -} - -impl Stream for Measured { - type Item = S::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - let first_polled_at = *this.first_polled_at.get_or_insert_with(Instant::now); - let poll = this.inner.poll_next(cx); - if matches!(poll, Poll::Ready(None)) { - let _span = this.span.enter(); - tracing::debug!( - to_transmission_start_ms = - first_polled_at.duration_since(*this.created_at).as_millis(), - transmission_ms = first_polled_at.elapsed().as_millis(), - "finished streaming http request body" - ); - } - poll - } -} - -#[cfg(test)] -mod tests { - use {super::*, bytes::Bytes, futures::StreamExt}; - - #[tokio::test] - async fn passes_items_through_unchanged() { - let inner = - futures::stream::iter(vec![Bytes::from_static(b"ab"), Bytes::from_static(b"cd")]); - let collected: Vec<_> = Measured::new(inner).collect().await; - assert_eq!( - collected, - vec![Bytes::from_static(b"ab"), Bytes::from_static(b"cd")] - ); - } -} diff --git a/crates/observe/src/lib.rs b/crates/observe/src/lib.rs index 2564cbc932..d847d75193 100644 --- a/crates/observe/src/lib.rs +++ b/crates/observe/src/lib.rs @@ -6,7 +6,6 @@ pub mod event_bus; pub mod future; #[cfg(unix)] pub mod heap_dump_handler; -pub mod http_body; pub mod metrics; pub mod panic_hook; pub mod tracing; From 5bdd45f3963156adee7fb3b3ca6fbebbf9316875 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Tue, 30 Jun 2026 11:20:41 +0100 Subject: [PATCH 06/11] finalize for tee --- crates/driver/src/infra/persistence/mod.rs | 6 ++---- .../driver/src/infra/solver/streaming/mod.rs | 21 +++++++++---------- .../src/infra/solver/streaming/tee_writer.rs | 11 +++++----- 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/crates/driver/src/infra/persistence/mod.rs b/crates/driver/src/infra/persistence/mod.rs index 234b22ed81..9c7c510157 100644 --- a/crates/driver/src/infra/persistence/mod.rs +++ b/crates/driver/src/infra/persistence/mod.rs @@ -59,10 +59,8 @@ impl Persistence { } /// Archives the auction body to S3 (fire and forget). The body is - /// gzip-compressed while it is streamed to the solver, so the full - /// uncompressed JSON is never held in memory; the compressed bytes arrive - /// through `compressed` once serialization finishes. An error on the - /// receiver means the request was aborted, so there is nothing to archive. + /// gzip-compressed while streaming to the solver; the compressed bytes + /// arrive through `compressed` once serialization finishes. pub fn archive_auction_gzipped(&self, auction_id: Id, compressed: oneshot::Receiver) { let Some(uploader) = self.s3.clone() else { return; diff --git a/crates/driver/src/infra/solver/streaming/mod.rs b/crates/driver/src/infra/solver/streaming/mod.rs index 5eaaafd4ef..22a80a2293 100644 --- a/crates/driver/src/infra/solver/streaming/mod.rs +++ b/crates/driver/src/infra/solver/streaming/mod.rs @@ -1,6 +1,3 @@ -//! Streams a serializable value into a reqwest body so the full (multi-MB) -//! payload is never held in memory at once. - mod tee_writer; use { @@ -31,8 +28,7 @@ where } /// Like [`stream_body`], but also gzips the same serialization in one pass. The -/// receiver yields the compressed bytes when serialization finishes, or errors -/// if it aborts (nothing to archive). +/// receiver yields the compressed bytes once serialization finishes. pub fn stream_body_and_gzip(value: T) -> (reqwest::Body, oneshot::Receiver) where T: serde::Serialize + Send + 'static, @@ -68,10 +64,7 @@ where return; } }; - let (channel, secondary) = tee.into_parts(); - // Closing the sender ends the body; finalize the copy after. - drop(channel); - secondary.finalize(); + tee.finalize(); }); body } @@ -98,8 +91,9 @@ impl Write for ChannelWriter { } } -/// Cleanup for the tee's secondary sink once the body is fully written. Only -/// called on success; an aborted serialization drops the sink instead. +/// Consumes a sink once the body is fully written, running any cleanup (e.g. +/// finishing a gzip stream). Only called on success; an aborted serialization +/// drops the sink instead. trait Finalize { fn finalize(self); } @@ -108,6 +102,11 @@ impl Finalize for std::io::Sink { fn finalize(self) {} } +impl Finalize for ChannelWriter { + /// Dropping the sender closes the channel, ending the request body. + fn finalize(self) {} +} + /// A [`Write`] sink that gzips into memory and delivers the bytes over a /// oneshot when finalized. struct GzipCapture { diff --git a/crates/driver/src/infra/solver/streaming/tee_writer.rs b/crates/driver/src/infra/solver/streaming/tee_writer.rs index 8f5ed86e66..b2392f0483 100644 --- a/crates/driver/src/infra/solver/streaming/tee_writer.rs +++ b/crates/driver/src/infra/solver/streaming/tee_writer.rs @@ -1,4 +1,4 @@ -use std::io::Write; +use {super::Finalize, std::io::Write}; /// A [`Write`] that fans every write out to two writers (à la UNIX `tee`), so /// one serialization pass feeds two sinks — a request body and a gzip copy. A @@ -12,11 +12,12 @@ impl TeeWriter { pub(super) fn new(primary: A, secondary: B) -> Self { Self { primary, secondary } } +} - /// Splits the tee back into its two sinks so each can be finalized on its - /// own. - pub(super) fn into_parts(self) -> (A, B) { - (self.primary, self.secondary) +impl Finalize for TeeWriter { + fn finalize(self) { + self.primary.finalize(); + self.secondary.finalize(); } } From f478b5f84644cd1d380ac35bd944373da75353c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Tue, 30 Jun 2026 12:03:33 +0100 Subject: [PATCH 07/11] clean up --- crates/driver/src/infra/solver/streaming/mod.rs | 4 +++- crates/s3/src/lib.rs | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/driver/src/infra/solver/streaming/mod.rs b/crates/driver/src/infra/solver/streaming/mod.rs index 22a80a2293..7c05c80f84 100644 --- a/crates/driver/src/infra/solver/streaming/mod.rs +++ b/crates/driver/src/infra/solver/streaming/mod.rs @@ -131,7 +131,9 @@ impl Finalize for GzipCapture { /// A dropped receiver just means the archive was no longer wanted. fn finalize(self) { match self.writer.finish() { - Ok(compressed) => drop(self.tx.send(Bytes::from(compressed))), + Ok(compressed) => { + let _ = self.tx.send(Bytes::from(compressed)); + } Err(err) => tracing::debug!(?err, "gzip of archived request body failed"), } } diff --git a/crates/s3/src/lib.rs b/crates/s3/src/lib.rs index 33d4360f44..544b79a942 100644 --- a/crates/s3/src/lib.rs +++ b/crates/s3/src/lib.rs @@ -100,9 +100,9 @@ impl Uploader { } /// Uploads bytes that are already gzip-compressed JSON, tagging the object - /// with `Content-Encoding: gzip`. Lets callers that compress while - /// streaming the data elsewhere avoid ever holding the full - /// uncompressed JSON here. + /// with `Content-Encoding: gzip`. + // NOTE: PUT's the whole gzipped blob, if the gzipped JSON is larger than 5MB + // it might be worth it to consider multipart uploads as it's the minimium S3 part size pub async fn upload_gzipped(&self, id: String, compressed: Bytes) -> Result { let key = std::path::Path::new(&self.filename_prefix) .join(format!("{id}.json")) From b4aef68e5f90977aefd9f75658e8c956e64f1f4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Tue, 30 Jun 2026 12:31:09 +0100 Subject: [PATCH 08/11] fmt --- crates/s3/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/s3/src/lib.rs b/crates/s3/src/lib.rs index 544b79a942..d5ad7d2162 100644 --- a/crates/s3/src/lib.rs +++ b/crates/s3/src/lib.rs @@ -102,7 +102,8 @@ impl Uploader { /// Uploads bytes that are already gzip-compressed JSON, tagging the object /// with `Content-Encoding: gzip`. // NOTE: PUT's the whole gzipped blob, if the gzipped JSON is larger than 5MB - // it might be worth it to consider multipart uploads as it's the minimium S3 part size + // it might be worth it to consider multipart uploads as it's the minimium S3 + // part size pub async fn upload_gzipped(&self, id: String, compressed: Bytes) -> Result { let key = std::path::Path::new(&self.filename_prefix) .join(format!("{id}.json")) From 779134fa1ba74eb9c181fea8d8e248446f3aaeb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Tue, 30 Jun 2026 17:35:18 +0100 Subject: [PATCH 09/11] BestEffortSink to swallow errors, avoiding tee from failing --- .../solver/streaming/best_effort_sink.rs | 67 +++++++++++++++++++ .../driver/src/infra/solver/streaming/mod.rs | 36 ++++++++-- .../src/infra/solver/streaming/tee_writer.rs | 3 +- 3 files changed, 100 insertions(+), 6 deletions(-) create mode 100644 crates/driver/src/infra/solver/streaming/best_effort_sink.rs diff --git a/crates/driver/src/infra/solver/streaming/best_effort_sink.rs b/crates/driver/src/infra/solver/streaming/best_effort_sink.rs new file mode 100644 index 0000000000..18ee19542a --- /dev/null +++ b/crates/driver/src/infra/solver/streaming/best_effort_sink.rs @@ -0,0 +1,67 @@ +use {super::Finalize, std::io::Write}; + +/// A [`Write`] adapter that makes its inner writer best-effort: on the first +/// error it logs once and drops the inner, after which writes are accepted as +/// no-ops. Lets a non-critical sink fall out of a tee without aborting the +/// sinks that must finish. +pub(super) struct BestEffortSink(Option); + +impl BestEffortSink { + pub(super) fn new(inner: W) -> Self { + Self(Some(inner)) + } +} + +impl Write for BestEffortSink { + fn write(&mut self, data: &[u8]) -> std::io::Result { + if let Some(inner) = &mut self.0 + && let Err(err) = inner.write_all(data) + { + // The sink was declared best-effort, so its failure is non-critical: + // log it, stop writing to it, and let the remaining sinks carry on. + tracing::debug!(?err, "best-effort sink failed; dropping it"); + self.0 = None; + } + Ok(data.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + if let Some(inner) = &mut self.0 { + let _ = inner.flush(); + } + Ok(()) + } +} + +impl Finalize for BestEffortSink { + fn finalize(self) { + if let Some(inner) = self.0 { + inner.finalize(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// A failing inner must not surface as an error to the caller. + #[test] + fn swallows_inner_failure() { + struct Failing; + impl Write for Failing { + fn write(&mut self, _: &[u8]) -> std::io::Result { + Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "gone")) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + + let mut writer = BestEffortSink::new(Failing); + assert_eq!(writer.write(b"hello").unwrap(), 5); + assert_eq!(writer.write(b"world").unwrap(), 5); + writer.flush().unwrap(); + } +} diff --git a/crates/driver/src/infra/solver/streaming/mod.rs b/crates/driver/src/infra/solver/streaming/mod.rs index 7c05c80f84..efbe48c983 100644 --- a/crates/driver/src/infra/solver/streaming/mod.rs +++ b/crates/driver/src/infra/solver/streaming/mod.rs @@ -1,6 +1,8 @@ +mod best_effort_sink; mod tee_writer; use { + best_effort_sink::BestEffortSink, bytes::Bytes, futures::StreamExt, std::{ @@ -39,7 +41,9 @@ where } /// Serializes `value` into a tee of the body channel and `secondary` on a -/// blocking thread, then finalizes each sink. +/// blocking thread, then finalizes each sink. Serialization always runs to +/// completion even if the request receiver drops, so the secondary sink (the +/// gzip archive) is captured regardless of the request outcome. fn stream_into(value: T, secondary: S) -> reqwest::Body where T: serde::Serialize + Send + 'static, @@ -51,16 +55,20 @@ where let span = tracing::Span::current(); tokio::task::spawn_blocking(move || { let _guard = span.enter(); - let mut writer = - BufWriter::with_capacity(CHUNK_SIZE, TeeWriter::new(ChannelWriter(tx), secondary)); + // Make the request channel best-effort: if it fails (the solver + // connection went away), serialization keeps feeding the secondary sink + // so the archive still completes. Any error left here is a genuine + // serialization or gzip failure. + let channel = BestEffortSink::new(ChannelWriter(tx)); + let mut writer = BufWriter::with_capacity(CHUNK_SIZE, TeeWriter::new(channel, secondary)); if let Err(err) = serde_json::to_writer(&mut writer, &value) { - tracing::debug!(?err, "aborting streamed request body"); + tracing::warn!(?err, "serializing streamed request body failed"); return; } let tee = match writer.into_inner() { Ok(tee) => tee, Err(err) => { - tracing::debug!(err = ?err.error(), "flushing streamed request body failed"); + tracing::warn!(err = ?err.error(), "flushing streamed request body failed"); return; } }; @@ -71,6 +79,9 @@ where /// A [`Write`] that sends each block to the body channel, blocking when it's /// full so serialization can't outpace the request. Dropping it ends the body. +/// A dropped receiver (the solver connection went away) surfaces as an error; +/// wrap it in [`BestEffortSink`] to keep serializing the archive past that +/// point. struct ChannelWriter(mpsc::Sender); impl Write for ChannelWriter { @@ -187,4 +198,19 @@ mod tests { .unwrap(); assert_eq!(decoded, serde_json::to_vec(&value).unwrap()); } + + #[tokio::test] + async fn gzip_captured_even_if_request_body_dropped() { + let value = json!({ "a": 1, "b": [1, 2, 3], "c": "hello" }); + let (body, gzip_rx) = stream_body_and_gzip(value.clone()); + // The solver connection going away mid-stream must not skip the archive. + drop(body); + + let compressed = gzip_rx.await.unwrap(); + let mut decoded = Vec::new(); + GzDecoder::new(&compressed[..]) + .read_to_end(&mut decoded) + .unwrap(); + assert_eq!(decoded, serde_json::to_vec(&value).unwrap()); + } } diff --git a/crates/driver/src/infra/solver/streaming/tee_writer.rs b/crates/driver/src/infra/solver/streaming/tee_writer.rs index b2392f0483..541bcc70a3 100644 --- a/crates/driver/src/infra/solver/streaming/tee_writer.rs +++ b/crates/driver/src/infra/solver/streaming/tee_writer.rs @@ -2,7 +2,8 @@ use {super::Finalize, std::io::Write}; /// A [`Write`] that fans every write out to two writers (à la UNIX `tee`), so /// one serialization pass feeds two sinks — a request body and a gzip copy. A -/// write succeeds only if it succeeds on both. +/// write succeeds only if it succeeds on both; wrap a sink to make it +/// best-effort if its failure shouldn't abort the other. pub(super) struct TeeWriter { primary: A, secondary: B, From 760cd19ffbb1e42735992a5dd26e8065cdc88022 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Wed, 1 Jul 2026 10:31:44 +0100 Subject: [PATCH 10/11] Handle serialize_request metrics --- crates/driver/src/infra/solver/mod.rs | 14 +------------- crates/driver/src/infra/solver/streaming/mod.rs | 13 +++++++++---- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/crates/driver/src/infra/solver/mod.rs b/crates/driver/src/infra/solver/mod.rs index c4b9558790..2a8f06f7df 100644 --- a/crates/driver/src/infra/solver/mod.rs +++ b/crates/driver/src/infra/solver/mod.rs @@ -32,10 +32,7 @@ use { num::BigRational, observe::tracing::distributed::headers::tracing_headers, reqwest::header::HeaderName, - std::{ - collections::HashMap, - time::{Duration, Instant}, - }, + std::{collections::HashMap, time::Duration}, thiserror::Error, tracing::{Instrument, instrument}, }; @@ -368,8 +365,6 @@ impl Solver { auction: &Auction, liquidity: &[liquidity::Liquidity], ) -> Result, Error> { - let start = Instant::now(); - let flashloan_hints = self.assemble_flashloan_hints(auction); let wrappers = self.assemble_wrappers(auction); @@ -407,13 +402,6 @@ impl Solver { } None => streaming::stream_body(auction_dto), }; - if auction.id().is_some() { - ::observe::metrics::metrics().measure_auction_overhead( - start, - "driver", - "serialize_request", - ); - } let timeout = match auction.deadline(self.timeouts()).solvers().remaining() { Ok(timeout) => timeout, diff --git a/crates/driver/src/infra/solver/streaming/mod.rs b/crates/driver/src/infra/solver/streaming/mod.rs index efbe48c983..29f257a460 100644 --- a/crates/driver/src/infra/solver/streaming/mod.rs +++ b/crates/driver/src/infra/solver/streaming/mod.rs @@ -55,10 +55,13 @@ where let span = tracing::Span::current(); tokio::task::spawn_blocking(move || { let _guard = span.enter(); - // Make the request channel best-effort: if it fails (the solver - // connection went away), serialization keeps feeding the secondary sink - // so the archive still completes. Any error left here is a genuine - // serialization or gzip failure. + // Covers serialization *and* socket transmission (since network "pulls" the + // serialization along). Kept as `serialize_request` phase to avoid breaking + // Grafana dashboards. + let start = std::time::Instant::now(); + + // Best effort channel so if sending the JSON to solver fails, + // we can still upload it to S3 let channel = BestEffortSink::new(ChannelWriter(tx)); let mut writer = BufWriter::with_capacity(CHUNK_SIZE, TeeWriter::new(channel, secondary)); if let Err(err) = serde_json::to_writer(&mut writer, &value) { @@ -72,6 +75,8 @@ where return; } }; + // Measure before `finalize` so the gzip finish cost stays out of the metric. + observe::metrics::metrics().measure_auction_overhead(start, "driver", "serialize_request"); tee.finalize(); }); body From dead2097a39c3fdcf858943b7aea38100a7488eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Wed, 1 Jul 2026 13:52:42 +0100 Subject: [PATCH 11/11] address comments --- crates/driver/Cargo.toml | 2 +- crates/driver/src/infra/solver/streaming/mod.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/driver/Cargo.toml b/crates/driver/Cargo.toml index d710682cec..2b20a15263 100644 --- a/crates/driver/Cargo.toml +++ b/crates/driver/Cargo.toml @@ -38,7 +38,6 @@ derive_more = { workspace = true } eth-domain-types = { workspace = true } ethrpc = { workspace = true } event-indexing = { workspace = true } -flate2 = { workspace = true } futures = { workspace = true } gas-price-estimation = { workspace = true } hex-literal = { workspace = true } @@ -94,6 +93,7 @@ alloy = { workspace = true, features = ["signer-mnemonic"] } app-data = { workspace = true, features = ["test_helpers"] } contracts = { workspace = true } ethrpc = { workspace = true, features = ["test-util"] } +flate2 = { workspace = true } maplit = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true, features = ["process", "test-util"] } diff --git a/crates/driver/src/infra/solver/streaming/mod.rs b/crates/driver/src/infra/solver/streaming/mod.rs index 29f257a460..18975804a0 100644 --- a/crates/driver/src/infra/solver/streaming/mod.rs +++ b/crates/driver/src/infra/solver/streaming/mod.rs @@ -102,6 +102,8 @@ impl Write for ChannelWriter { Ok(data.len()) } + /// `write` sends each block downstream immediately, so a write effectively + /// always flushes and there's nothing buffered left to do here. fn flush(&mut self) -> std::io::Result<()> { Ok(()) }