diff --git a/Cargo.lock b/Cargo.lock index cd11c452e8..50d5907769 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3761,7 +3761,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.114", ] [[package]] @@ -3938,6 +3938,7 @@ dependencies = [ "eth-domain-types", "ethrpc", "event-indexing", + "flate2", "futures", "gas-price-estimation", "hex-literal", @@ -3974,6 +3975,7 @@ dependencies = [ "tikv-jemallocator", "token-info", "tokio", + "tokio-stream", "toml", "tower 0.5.3", "tower-http", diff --git a/crates/driver/Cargo.toml b/crates/driver/Cargo.toml index a79476d896..2b20a15263 100644 --- a/crates/driver/Cargo.toml +++ b/crates/driver/Cargo.toml @@ -55,7 +55,7 @@ prometheus = { workspace = true } prometheus-metric-storage = { workspace = true } rand = { workspace = true } request-sharing = { workspace = true } -reqwest = { workspace = true, features = ["query"] } +reqwest = { workspace = true, features = ["query", "stream"] } s3 = { workspace = true } serde = { workspace = true, features = ["derive"] } serde-ext = { workspace = true } @@ -67,6 +67,7 @@ thiserror = { workspace = true } tikv-jemallocator = { workspace = true } token-info = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal", "time"] } +tokio-stream = { workspace = true } toml = { workspace = true } tower = { workspace = true } tower-http = { workspace = true, features = ["decompression-br", "limit", "trace"] } @@ -92,6 +93,7 @@ alloy = { workspace = true, features = ["signer-mnemonic"] } app-data = { workspace = true, features = ["test_helpers"] } contracts = { workspace = true } ethrpc = { workspace = true, features = ["test-util"] } +flate2 = { workspace = true } maplit = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true, features = ["process", "test-util"] } diff --git 
a/crates/driver/src/infra/persistence/mod.rs b/crates/driver/src/infra/persistence/mod.rs index c03a3e0840..9c7c510157 100644 --- a/crates/driver/src/infra/persistence/mod.rs +++ b/crates/driver/src/infra/persistence/mod.rs @@ -5,6 +5,7 @@ use { }, bytes::Bytes, std::sync::Arc, + tokio::sync::oneshot, tracing::Instrument, }; @@ -52,16 +53,25 @@ impl Persistence { } } - /// Saves the given auction with liquidity with fire and forget mentality - /// (non-blocking operation) - pub fn archive_auction(&self, auction_id: Id, body: Bytes) { + /// Whether auction archival to S3 is configured. + pub fn archives_enabled(&self) -> bool { + self.s3.is_some() + } + + /// Archives the auction body to S3 (fire and forget). The body is + /// gzip-compressed while streaming to the solver; the compressed bytes + /// arrive through `compressed` once serialization finishes. + pub fn archive_auction_gzipped(&self, auction_id: Id, compressed: oneshot::Receiver) { let Some(uploader) = self.s3.clone() else { return; }; tokio::spawn( async move { + let Ok(compressed) = compressed.await else { + return; + }; match uploader - .upload_json_bytes(auction_id.to_string(), body) + .upload_gzipped(auction_id.to_string(), compressed) .await { Ok(key) => { diff --git a/crates/driver/src/infra/solver/mod.rs b/crates/driver/src/infra/solver/mod.rs index d11982a098..2a8f06f7df 100644 --- a/crates/driver/src/infra/solver/mod.rs +++ b/crates/driver/src/infra/solver/mod.rs @@ -27,23 +27,19 @@ use { signers::{Signature, aws::AwsSigner, local::PrivateKeySigner}, }, anyhow::Result, - bytes::Bytes, derive_more::{From, Into}, eth_domain_types as eth, num::BigRational, observe::tracing::distributed::headers::tracing_headers, reqwest::header::HeaderName, - std::{ - collections::HashMap, - sync::atomic::{AtomicUsize, Ordering}, - time::{Duration, Instant}, - }, + std::{collections::HashMap, time::Duration}, thiserror::Error, tracing::{Instrument, instrument}, }; pub mod dto; pub mod eip7702; +mod streaming; // 
TODO At some point I should be checking that the names are unique, I don't // think I'm doing that. @@ -369,8 +365,6 @@ impl Solver { auction: &Auction, liquidity: &[liquidity::Liquidity], ) -> Result, Error> { - let start = Instant::now(); - let flashloan_hints = self.assemble_flashloan_hints(auction); let wrappers = self.assemble_wrappers(auction); @@ -389,21 +383,26 @@ impl Solver { self.config.haircut_bps, ); - let body = serialize_body(auction_dto); + let url = shared::url::join(&self.config.endpoint, "solve"); - if let Some(id) = auction.id() { - // Only auctions with IDs are real auctions (/quote requests don't have an ID). - // Only for those it makes sense to archive them and measure the execution time. - self.persistence.archive_auction(id, body.clone()); - ::observe::metrics::metrics().measure_auction_overhead( - start, - "driver", - "serialize_request", - ); - } + // Real auctions (those with an ID) are archived to S3; quotes aren't, so + // they skip the gzip capture entirely and just stream the body. + let archive_id = self + .persistence + .archives_enabled() + .then(|| auction.id()) + .flatten(); + let body: reqwest::Body = match archive_id { + // Stream the request body while capturing a gzipped copy for S3, so + // neither the request nor the archive holds the full JSON at once. + Some(id) => { + let (body, compressed) = streaming::stream_body_and_gzip(auction_dto); + self.persistence.archive_auction_gzipped(id, compressed); + body + } + None => streaming::stream_body(auction_dto), + }; - let url = shared::url::join(&self.config.endpoint, "solve"); - super::observe::solver_request(&url, &body); let timeout = match auction.deadline(self.timeouts()).solvers().remaining() { Ok(timeout) => timeout, Err(_) => { @@ -543,45 +542,6 @@ impl Solver { } } -/// Serializes the request body in a way that avoid re-allocating while not -/// overallocating a lot of memory. 
It does that by keeping track of the biggest -/// request seen for each kind of request (quote with/without liquidity, full -/// auction with/without liquidity) and allocating the correct amount of memory -/// based on the data in the auction. -fn serialize_body(auction_dto: solvers_dto::auction::Auction) -> Bytes { - // these values store the biggest allocation we needed for each - // category of requests - static QUOTE_WITH_LIQUIDITY: AtomicUsize = AtomicUsize::new(1_000); - static QUOTE_WITHOUT_LIQUIDITY: AtomicUsize = AtomicUsize::new(1_000); - static AUCTION_WITH_LIQUIDITY: AtomicUsize = AtomicUsize::new(1_000); - static AUCTION_WITHOUT_LIQUIDITY: AtomicUsize = AtomicUsize::new(1_000); - - // based on the "shape" of the auction we pick the allocation size - let is_auction = auction_dto.id.is_some(); - let with_liquidity = !auction_dto.liquidity.is_empty(); - let memory_target = match (is_auction, with_liquidity) { - (false, false) => "E_WITHOUT_LIQUIDITY, - (false, true) => "E_WITH_LIQUIDITY, - (true, false) => &AUCTION_WITHOUT_LIQUIDITY, - (true, true) => &AUCTION_WITH_LIQUIDITY, - }; - - // pre-allocate biggest request size + 0.5% (to avoid re-allocations when - // the request grows only slightly) - let pre_alloc_size = memory_target.load(Ordering::Relaxed); - let pre_alloc_size = pre_alloc_size + pre_alloc_size * 5 / 1_000; - let mut buffer = Vec::with_capacity(pre_alloc_size); - - serde_json::to_writer(&mut buffer, &auction_dto).unwrap(); - - // now that we know how much memory was actually needed we update the memory - // targets - memory_target.fetch_max(buffer.len(), Ordering::Relaxed); - tracing::trace!(pre_alloc_size, final_size = buffer.len(), "body allocation"); - - Bytes::from(buffer) -} - #[cfg(test)] mod tests { use { diff --git a/crates/driver/src/infra/solver/streaming/best_effort_sink.rs b/crates/driver/src/infra/solver/streaming/best_effort_sink.rs new file mode 100644 index 0000000000..18ee19542a --- /dev/null +++ 
b/crates/driver/src/infra/solver/streaming/best_effort_sink.rs @@ -0,0 +1,67 @@ +use {super::Finalize, std::io::Write}; + +/// A [`Write`] adapter that makes its inner writer best-effort: on the first +/// error it logs once and drops the inner, after which writes are accepted as +/// no-ops. Lets a non-critical sink fall out of a tee without aborting the +/// sinks that must finish. +pub(super) struct BestEffortSink(Option); + +impl BestEffortSink { + pub(super) fn new(inner: W) -> Self { + Self(Some(inner)) + } +} + +impl Write for BestEffortSink { + fn write(&mut self, data: &[u8]) -> std::io::Result { + if let Some(inner) = &mut self.0 + && let Err(err) = inner.write_all(data) + { + // The sink was declared best-effort, so its failure is non-critical: + // log it, stop writing to it, and let the remaining sinks carry on. + tracing::debug!(?err, "best-effort sink failed; dropping it"); + self.0 = None; + } + Ok(data.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + if let Some(inner) = &mut self.0 { + let _ = inner.flush(); + } + Ok(()) + } +} + +impl Finalize for BestEffortSink { + fn finalize(self) { + if let Some(inner) = self.0 { + inner.finalize(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// A failing inner must not surface as an error to the caller. 
+ #[test] + fn swallows_inner_failure() { + struct Failing; + impl Write for Failing { + fn write(&mut self, _: &[u8]) -> std::io::Result { + Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "gone")) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + + let mut writer = BestEffortSink::new(Failing); + assert_eq!(writer.write(b"hello").unwrap(), 5); + assert_eq!(writer.write(b"world").unwrap(), 5); + writer.flush().unwrap(); + } +} diff --git a/crates/driver/src/infra/solver/streaming/mod.rs b/crates/driver/src/infra/solver/streaming/mod.rs new file mode 100644 index 0000000000..18975804a0 --- /dev/null +++ b/crates/driver/src/infra/solver/streaming/mod.rs @@ -0,0 +1,223 @@ +mod best_effort_sink; +mod tee_writer; + +use { + best_effort_sink::BestEffortSink, + bytes::Bytes, + futures::StreamExt, + std::{ + convert::Infallible, + io::{BufWriter, Write}, + }, + tee_writer::TeeWriter, + tokio::sync::{mpsc, oneshot}, + tokio_stream::wrappers::ReceiverStream, +}; + +const CHUNK_SIZE: usize = 64 * 1024; + +/// Two is enough for double buffering: the consumer transmits one block while +/// the serializer produces the next; more only raises the memory ceiling. +const CHANNEL_CAPACITY: usize = 2; + +/// Serializes `value` to JSON on a blocking thread, streaming it into the +/// returned reqwest body. Backpressure from the body channel caps memory. +pub fn stream_body(value: T) -> reqwest::Body +where + T: serde::Serialize + Send + 'static, +{ + stream_into(value, std::io::sink()) +} + +/// Like [`stream_body`], but also gzips the same serialization in one pass. The +/// receiver yields the compressed bytes once serialization finishes. 
+pub fn stream_body_and_gzip(value: T) -> (reqwest::Body, oneshot::Receiver) +where + T: serde::Serialize + Send + 'static, +{ + let (gzip, compressed) = GzipCapture::new(); + let body = stream_into(value, gzip); + (body, compressed) +} + +/// Serializes `value` into a tee of the body channel and `secondary` on a +/// blocking thread, then finalizes each sink. Serialization always runs to +/// completion even if the request receiver drops, so the secondary sink (the +/// gzip archive) is captured regardless of the request outcome. +fn stream_into(value: T, secondary: S) -> reqwest::Body +where + T: serde::Serialize + Send + 'static, + S: Write + Finalize + Send + 'static, +{ + let (tx, rx) = mpsc::channel::(CHANNEL_CAPACITY); + let body = reqwest::Body::wrap_stream(ReceiverStream::new(rx).map(Ok::<_, Infallible>)); + // spawn_blocking loses the current span; carry it so the logs keep context. + let span = tracing::Span::current(); + tokio::task::spawn_blocking(move || { + let _guard = span.enter(); + // Covers serialization *and* socket transmission (since network "pulls" the + // serialization along). Kept as `serialize_request` phase to avoid breaking + // Grafana dashboards. + let start = std::time::Instant::now(); + + // Best effort channel so if sending the JSON to solver fails, + // we can still upload it to S3 + let channel = BestEffortSink::new(ChannelWriter(tx)); + let mut writer = BufWriter::with_capacity(CHUNK_SIZE, TeeWriter::new(channel, secondary)); + if let Err(err) = serde_json::to_writer(&mut writer, &value) { + tracing::warn!(?err, "serializing streamed request body failed"); + return; + } + let tee = match writer.into_inner() { + Ok(tee) => tee, + Err(err) => { + tracing::warn!(err = ?err.error(), "flushing streamed request body failed"); + return; + } + }; + // Measure before `finalize` so the gzip finish cost stays out of the metric. 
+ observe::metrics::metrics().measure_auction_overhead(start, "driver", "serialize_request"); + tee.finalize(); + }); + body +} + +/// A [`Write`] that sends each block to the body channel, blocking when it's +/// full so serialization can't outpace the request. Dropping it ends the body. +/// A dropped receiver (the solver connection went away) surfaces as an error; +/// wrap it in [`BestEffortSink`] to keep serializing the archive past that +/// point. +struct ChannelWriter(mpsc::Sender); + +impl Write for ChannelWriter { + fn write(&mut self, data: &[u8]) -> std::io::Result { + self.0 + .blocking_send(Bytes::copy_from_slice(data)) + .map_err(|_| { + std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "request body receiver dropped", + ) + })?; + Ok(data.len()) + } + + /// `write` sends each block downstream immediately, so a write effectively + /// always flushes and there's nothing buffered left to do here. + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +/// Consumes a sink once the body is fully written, running any cleanup (e.g. +/// finishing a gzip stream). Only called on success; an aborted serialization +/// drops the sink instead. +trait Finalize { + fn finalize(self); +} + +impl Finalize for std::io::Sink { + fn finalize(self) {} +} + +impl Finalize for ChannelWriter { + /// Dropping the sender closes the channel, ending the request body. + fn finalize(self) {} +} + +/// A [`Write`] sink that gzips into memory and delivers the bytes over a +/// oneshot when finalized. +struct GzipCapture { + writer: s3::GzipWriter, + tx: oneshot::Sender, +} + +impl GzipCapture { + fn new() -> (Self, oneshot::Receiver) { + let (tx, rx) = oneshot::channel(); + ( + Self { + writer: s3::GzipWriter::new(), + tx, + }, + rx, + ) + } +} + +impl Finalize for GzipCapture { + /// A dropped receiver just means the archive was no longer wanted. 
+ fn finalize(self) { + match self.writer.finish() { + Ok(compressed) => { + let _ = self.tx.send(Bytes::from(compressed)); + } + Err(err) => tracing::debug!(?err, "gzip of archived request body failed"), + } + } +} + +impl Write for GzipCapture { + fn write(&mut self, data: &[u8]) -> std::io::Result { + self.writer.write(data) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.writer.flush() + } +} + +#[cfg(test)] +mod tests { + use {super::*, flate2::read::GzDecoder, serde_json::json, std::io::Read}; + + #[tokio::test] + async fn channel_writer_forwards_all_bytes() { + let data = vec![7u8; CHUNK_SIZE * 2 + 123]; + let (tx, mut rx) = mpsc::channel::(CHANNEL_CAPACITY); + let expected = data.clone(); + let writer = tokio::task::spawn_blocking(move || { + let mut writer = BufWriter::with_capacity(CHUNK_SIZE, ChannelWriter(tx)); + writer.write_all(&data).unwrap(); + writer.flush().unwrap(); + }); + + let mut reassembled = Vec::new(); + while let Some(chunk) = rx.recv().await { + reassembled.extend_from_slice(&chunk); + } + writer.await.unwrap(); + + assert_eq!(reassembled, expected); + } + + #[tokio::test] + async fn gzip_capture_matches_serialized_value() { + let value = json!({ "a": 1, "b": [1, 2, 3], "c": "hello" }); + // `_body` is held but never polled: this small payload fits in the body + // channel, so serialization and the gzip capture finish without a reader. + let (_body, gzip_rx) = stream_body_and_gzip(value.clone()); + + let compressed = gzip_rx.await.unwrap(); + let mut decoded = Vec::new(); + GzDecoder::new(&compressed[..]) + .read_to_end(&mut decoded) + .unwrap(); + assert_eq!(decoded, serde_json::to_vec(&value).unwrap()); + } + + #[tokio::test] + async fn gzip_captured_even_if_request_body_dropped() { + let value = json!({ "a": 1, "b": [1, 2, 3], "c": "hello" }); + let (body, gzip_rx) = stream_body_and_gzip(value.clone()); + // The solver connection going away mid-stream must not skip the archive. 
+ drop(body); + + let compressed = gzip_rx.await.unwrap(); + let mut decoded = Vec::new(); + GzDecoder::new(&compressed[..]) + .read_to_end(&mut decoded) + .unwrap(); + assert_eq!(decoded, serde_json::to_vec(&value).unwrap()); + } +} diff --git a/crates/driver/src/infra/solver/streaming/tee_writer.rs b/crates/driver/src/infra/solver/streaming/tee_writer.rs new file mode 100644 index 0000000000..541bcc70a3 --- /dev/null +++ b/crates/driver/src/infra/solver/streaming/tee_writer.rs @@ -0,0 +1,65 @@ +use {super::Finalize, std::io::Write}; + +/// A [`Write`] that fans every write out to two writers (à la UNIX `tee`), so +/// one serialization pass feeds two sinks — a request body and a gzip copy. A +/// write succeeds only if it succeeds on both; wrap a sink to make it +/// best-effort if its failure shouldn't abort the other. +pub(super) struct TeeWriter { + primary: A, + secondary: B, +} + +impl TeeWriter { + pub(super) fn new(primary: A, secondary: B) -> Self { + Self { primary, secondary } + } +} + +impl Finalize for TeeWriter { + fn finalize(self) { + self.primary.finalize(); + self.secondary.finalize(); + } +} + +impl Write for TeeWriter { + fn write(&mut self, data: &[u8]) -> std::io::Result { + self.primary.write_all(data)?; + self.secondary.write_all(data)?; + Ok(data.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.primary.flush()?; + self.secondary.flush() + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + flate2::{Compression, read::GzDecoder, write::GzEncoder}, + std::io::Read, + }; + + /// The tee must hand identical bytes to both sinks. 
+ #[test] + fn forwards_to_both_sinks() { + let data = b"the quick brown fox jumps over the lazy dog".repeat(100); + let mut plain = Vec::new(); + let mut gzip = GzEncoder::new(Vec::new(), Compression::new(3)); + { + let mut tee = TeeWriter::new(&mut plain, &mut gzip); + tee.write_all(&data).unwrap(); + } + let compressed = gzip.finish().unwrap(); + + assert_eq!(plain, data); + let mut decoded = Vec::new(); + GzDecoder::new(&compressed[..]) + .read_to_end(&mut decoded) + .unwrap(); + assert_eq!(decoded, data); + } +} diff --git a/crates/s3/src/lib.rs b/crates/s3/src/lib.rs index ba461c7e40..d5ad7d2162 100644 --- a/crates/s3/src/lib.rs +++ b/crates/s3/src/lib.rs @@ -7,11 +7,49 @@ use { primitives::{ByteStream, SdkBody}, }, bytes::Bytes, - flate2::{Compression, bufread::GzEncoder}, + flate2::{Compression, bufread, write}, serde::Serialize, - std::io::Read, + std::io::{Read, Write}, }; +/// gzip level for every object archived to S3, so the eager and streaming +/// compression paths produce identical output. +const COMPRESSION_LEVEL: u32 = 3; + +/// A streaming gzip sink: write plaintext in, then [`GzipWriter::finish`] for +/// the compressed bytes. Lets callers compress while streaming a payload +/// elsewhere, without materializing the full plaintext. 
+pub struct GzipWriter(write::GzEncoder>); + +impl GzipWriter { + pub fn new() -> Self { + Self(write::GzEncoder::new( + Vec::new(), + Compression::new(COMPRESSION_LEVEL), + )) + } + + pub fn finish(self) -> std::io::Result> { + self.0.finish() + } +} + +impl Default for GzipWriter { + fn default() -> Self { + Self::new() + } +} + +impl Write for GzipWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.0.flush() + } +} + #[derive(Default)] pub struct Config { pub bucket: String, @@ -58,6 +96,15 @@ impl Uploader { let compressed = tokio::task::spawn_blocking(move || Self::gzip(&content)) .await .context("compression task panicked")??; + self.upload_gzipped(id, Bytes::from(compressed)).await + } + + /// Uploads bytes that are already gzip-compressed JSON, tagging the object + /// with `Content-Encoding: gzip`. + // NOTE: PUTs the whole gzipped blob in a single request. If the gzipped JSON + // grows beyond 5MB, multipart uploads may be worth considering, since 5MB is + // the minimum S3 part size. + pub async fn upload_gzipped(&self, id: String, compressed: Bytes) -> Result { let key = std::path::Path::new(&self.filename_prefix) .join(format!("{id}.json")) .to_str() @@ -96,7 +143,7 @@ impl Uploader { /// Compresses the input bytes using Gzip. fn gzip(bytes: &[u8]) -> Result> { - let mut encoder = GzEncoder::new(bytes, Compression::new(3)); + let mut encoder = bufread::GzEncoder::new(bytes, Compression::new(COMPRESSION_LEVEL)); let mut encoded: Vec = Vec::with_capacity(bytes.len()); encoder.read_to_end(&mut encoded).context("gzip encoding")?; Ok(encoded)