Skip to content
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ prometheus = { workspace = true }
prometheus-metric-storage = { workspace = true }
rand = { workspace = true }
request-sharing = { workspace = true }
reqwest = { workspace = true, features = ["query"] }
reqwest = { workspace = true, features = ["query", "stream"] }
s3 = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde-ext = { workspace = true }
Expand All @@ -67,6 +67,7 @@ thiserror = { workspace = true }
tikv-jemallocator = { workspace = true }
token-info = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal", "time"] }
tokio-stream = { workspace = true }
toml = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true, features = ["decompression-br", "limit", "trace"] }
Expand All @@ -92,6 +93,7 @@ alloy = { workspace = true, features = ["signer-mnemonic"] }
app-data = { workspace = true, features = ["test_helpers"] }
contracts = { workspace = true }
ethrpc = { workspace = true, features = ["test-util"] }
flate2 = { workspace = true }
maplit = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["process", "test-util"] }
Expand Down
18 changes: 14 additions & 4 deletions crates/driver/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
},
bytes::Bytes,
std::sync::Arc,
tokio::sync::oneshot,
tracing::Instrument,
};

Expand Down Expand Up @@ -52,16 +53,25 @@ impl Persistence {
}
}

/// Saves the given auction with liquidity with fire and forget mentality
/// (non-blocking operation)
pub fn archive_auction(&self, auction_id: Id, body: Bytes) {
/// Whether auction archival to S3 is configured.
///
/// Returns `true` exactly when the optional S3 uploader (`self.s3`) is set.
/// Callers can use this to avoid preparing archive data that would never be
/// uploaded.
pub fn archives_enabled(&self) -> bool {
self.s3.is_some()
}

/// Archives the auction body to S3 (fire and forget). The body is
/// gzip-compressed while streaming to the solver; the compressed bytes
/// arrive through `compressed` once serialization finishes.
pub fn archive_auction_gzipped(&self, auction_id: Id, compressed: oneshot::Receiver<Bytes>) {
let Some(uploader) = self.s3.clone() else {
return;
};
tokio::spawn(
async move {
let Ok(compressed) = compressed.await else {
return;
};
match uploader
.upload_json_bytes(auction_id.to_string(), body)
.upload_gzipped(auction_id.to_string(), compressed)
.await
{
Ok(key) => {
Expand Down
80 changes: 20 additions & 60 deletions crates/driver/src/infra/solver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,19 @@ use {
signers::{Signature, aws::AwsSigner, local::PrivateKeySigner},
},
anyhow::Result,
bytes::Bytes,
derive_more::{From, Into},
eth_domain_types as eth,
num::BigRational,
observe::tracing::distributed::headers::tracing_headers,
reqwest::header::HeaderName,
std::{
collections::HashMap,
sync::atomic::{AtomicUsize, Ordering},
time::{Duration, Instant},
},
std::{collections::HashMap, time::Duration},
thiserror::Error,
tracing::{Instrument, instrument},
};

pub mod dto;
pub mod eip7702;
mod streaming;

// TODO At some point I should be checking that the names are unique, I don't
// think I'm doing that.
Expand Down Expand Up @@ -369,8 +365,6 @@ impl Solver {
auction: &Auction,
liquidity: &[liquidity::Liquidity],
) -> Result<Vec<Solution>, Error> {
let start = Instant::now();

let flashloan_hints = self.assemble_flashloan_hints(auction);
let wrappers = self.assemble_wrappers(auction);

Expand All @@ -389,21 +383,26 @@ impl Solver {
self.config.haircut_bps,
);

let body = serialize_body(auction_dto);
let url = shared::url::join(&self.config.endpoint, "solve");

if let Some(id) = auction.id() {
// Only auctions with IDs are real auctions (/quote requests don't have an ID).
// Only for those it makes sense to archive them and measure the execution time.
self.persistence.archive_auction(id, body.clone());
::observe::metrics::metrics().measure_auction_overhead(
start,
"driver",
"serialize_request",
);
}
// Real auctions (those with an ID) are archived to S3; quotes aren't, so
// they skip the gzip capture entirely and just stream the body.
let archive_id = self
.persistence
.archives_enabled()
.then(|| auction.id())
.flatten();
let body: reqwest::Body = match archive_id {
// Stream the request body while capturing a gzipped copy for S3, so
// neither the request nor the archive holds the full JSON at once.
Some(id) => {
let (body, compressed) = streaming::stream_body_and_gzip(auction_dto);
self.persistence.archive_auction_gzipped(id, compressed);
body
}
None => streaming::stream_body(auction_dto),
};
Comment on lines +395 to +404

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Behavior change worth a second look: S3 archival is now coupled to the solver request streaming successfully to completion.

On main, the auction was serialized eagerly into Bytes and archive_auction uploaded that buffer independently of the solver request outcome. Here, the gzipped bytes only reach archive_auction_gzipped when finalize() runs, which only happens if serde_json::to_writer completes — and that requires reqwest to pull the entire body. If the solver connection drops or times out mid-upload, ChannelWriter::blocking_send errors → serialization aborts early → GzipCapture is dropped without finalize() → the oneshot sender is dropped → the archive is silently skipped.

Net effect: auctions whose solver request fails partway through transmission no longer get archived — which may be exactly the auctions you'd want to inspect later. Is dropping the archive in that case intended? If not, consider decoupling the archive from the request stream (e.g. tee into an independent buffer rather than gating finalize on full request consumption).

Secondary note: the spawn_blocking thread is now held for the full duration of the body upload (it blocks on blocking_send until reqwest drains each chunk), rather than being released right after producing the buffer. Per-solve that's a blocking-pool thread occupied for the whole solver round-trip.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I solved this with the TeeWriter and the BestEffortSink, which allow the TeeWriter to continue uploading to S3 when this scenario happens.


let url = shared::url::join(&self.config.endpoint, "solve");
super::observe::solver_request(&url, &body);
let timeout = match auction.deadline(self.timeouts()).solvers().remaining() {
Ok(timeout) => timeout,
Err(_) => {
Expand Down Expand Up @@ -543,45 +542,6 @@ impl Solver {
}
}

/// Serializes the request body into a single buffer while avoiding both
/// re-allocations and large over-allocations.
///
/// The function remembers the largest body produced so far for each request
/// "shape" (quote or full auction, with or without liquidity) and sizes the
/// buffer for the next request of the same shape from that high-water mark.
///
/// # Panics
///
/// Panics if `serde_json` fails to serialize the auction DTO.
fn serialize_body(auction_dto: solvers_dto::auction::Auction) -> Bytes {
    // High-water marks: the biggest body seen so far per request category.
    static QUOTE_WITH_LIQUIDITY: AtomicUsize = AtomicUsize::new(1_000);
    static QUOTE_WITHOUT_LIQUIDITY: AtomicUsize = AtomicUsize::new(1_000);
    static AUCTION_WITH_LIQUIDITY: AtomicUsize = AtomicUsize::new(1_000);
    static AUCTION_WITHOUT_LIQUIDITY: AtomicUsize = AtomicUsize::new(1_000);

    // Requests carrying an ID are real auctions; everything else is a quote.
    let high_water_mark = if auction_dto.id.is_some() {
        if auction_dto.liquidity.is_empty() {
            &AUCTION_WITHOUT_LIQUIDITY
        } else {
            &AUCTION_WITH_LIQUIDITY
        }
    } else if auction_dto.liquidity.is_empty() {
        &QUOTE_WITHOUT_LIQUIDITY
    } else {
        &QUOTE_WITH_LIQUIDITY
    };

    // Reserve the largest size seen plus 0.5% headroom so that a slightly
    // bigger request still fits without growing the buffer.
    let largest_seen = high_water_mark.load(Ordering::Relaxed);
    let pre_alloc_size = largest_seen + largest_seen * 5 / 1_000;
    let mut json = Vec::with_capacity(pre_alloc_size);

    serde_json::to_writer(&mut json, &auction_dto).unwrap();

    // Record the size that was actually needed for the next request of this
    // shape.
    let final_size = json.len();
    high_water_mark.fetch_max(final_size, Ordering::Relaxed);
    tracing::trace!(pre_alloc_size, final_size, "body allocation");

    Bytes::from(json)
}

#[cfg(test)]
mod tests {
use {
Expand Down
67 changes: 67 additions & 0 deletions crates/driver/src/infra/solver/streaming/best_effort_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use {super::Finalize, std::io::Write};

/// A [`Write`] adapter that makes its inner writer best-effort: on the first
/// error it logs once and drops the inner, after which writes are accepted as
/// no-ops. Lets a non-critical sink fall out of a tee without aborting the
/// sinks that must finish.
///
/// The wrapped `Option` holds the inner writer while it is still in use and
/// becomes `None` once the inner writer has been dropped.
pub(super) struct BestEffortSink<W>(Option<W>);

impl<W> BestEffortSink<W> {
pub(super) fn new(inner: W) -> Self {
Self(Some(inner))
}
}

impl<W> BestEffortSink<W> {
    /// Runs `op` against the inner writer if it is still present. On failure
    /// the error is logged and the inner writer is dropped, so every later
    /// operation becomes a no-op.
    fn attempt(&mut self, op: impl FnOnce(&mut W) -> std::io::Result<()>) {
        if let Some(inner) = &mut self.0
            && let Err(err) = op(inner)
        {
            // The sink was declared best-effort, so its failure is non-critical:
            // log it, stop writing to it, and let the remaining sinks carry on.
            tracing::debug!(?err, "best-effort sink failed; dropping it");
            self.0 = None;
        }
    }
}

impl<W: Write> Write for BestEffortSink<W> {
    /// Forwards all of `data` to the inner writer while it is still present.
    ///
    /// Never returns an error and always reports the whole buffer as written,
    /// whether or not the inner writer accepted it, so a failing best-effort
    /// sink cannot abort the caller.
    fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
        self.attempt(|inner| inner.write_all(data));
        Ok(data.len())
    }

    /// Flushes the inner writer while it is still present.
    ///
    /// A flush failure is handled exactly like a write failure: it is logged
    /// and the inner writer is dropped, instead of being silently discarded
    /// while the failed writer is kept around. Never returns an error.
    fn flush(&mut self) -> std::io::Result<()> {
        self.attempt(|inner| inner.flush());
        Ok(())
    }
}

impl<W: Finalize> Finalize for BestEffortSink<W> {
    /// Finalizes the inner writer if it is still present. A sink that was
    /// already dropped after a failure has nothing left to finalize.
    fn finalize(self) {
        let Some(sink) = self.0 else {
            return;
        };
        sink.finalize();
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Writer whose every `write` fails with a broken pipe; `flush` succeeds.
    struct AlwaysBroken;

    impl Write for AlwaysBroken {
        fn write(&mut self, _: &[u8]) -> std::io::Result<usize> {
            Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "gone"))
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// A failing inner must not surface as an error to the caller.
    #[test]
    fn swallows_inner_failure() {
        let mut sink = BestEffortSink::new(AlwaysBroken);
        // The first write hits the failing inner; the second one runs after the
        // inner has been dropped. Both must report the full length.
        for chunk in ["hello", "world"] {
            assert_eq!(sink.write(chunk.as_bytes()).unwrap(), 5);
        }
        sink.flush().unwrap();
    }
}
Loading
Loading