diff --git a/crates/driver/src/domain/competition/mod.rs b/crates/driver/src/domain/competition/mod.rs index 41a6673834..6c45b97ffe 100644 --- a/crates/driver/src/domain/competition/mod.rs +++ b/crates/driver/src/domain/competition/mod.rs @@ -2,7 +2,7 @@ use { self::solution::settlement, super::{ Mempools, - mempools::SubmissionMode, + mempools::{self, SubmissionMode}, time::{self, Remaining}, }, crate::{ @@ -35,7 +35,7 @@ use { cmp::Reverse, collections::{BTreeMap, HashMap, HashSet, VecDeque}, sync::{Arc, Mutex}, - time::Instant, + time::{Duration, Instant}, }, tokio::{sync::mpsc, task}, tracing::{Instrument, instrument}, @@ -64,6 +64,17 @@ type Balances = HashMap; /// auction still find its settlement. const MAX_CONCURRENT_AUCTIONS: usize = 5; +/// How long an EIP-7702 submission account is benched after it fails to +/// broadcast for an account-specific reason (no gas for the tx, a stuck nonce, +/// a pending tx that can't be replaced). While benched, the account is held out +/// of the selection pool, so the driver stops assigning settlements to a stuck +/// lane until it recovers. The account is retried automatically once the +/// cooldown elapses; a still-broken account is simply re-benched after a single +/// failed attempt. +// ponytail: a fixed cooldown is enough; lift to driver config only if operators +// need per-network tuning. +const UNHEALTHY_ACCOUNT_COOLDOWN: Duration = Duration::from_secs(30); + /// An ongoing competition. There is one competition going on per solver at any /// time. The competition stores settlements to solutions generated by the /// driver, and allows them to be executed onchain when requested later. The @@ -132,6 +143,7 @@ impl SubmitterPool { return Some(SubmitterGuard { inner: GuardInner::Direct(permit), solver_address: self.solver_address, + quarantine: None, }); } @@ -170,6 +182,30 @@ impl SubmitterPool { Some(SubmitterGuard { inner, solver_address: self.solver_address, + quarantine: None, + }) + } + + /// Non-blocking variant of [`acquire`]. Returns `None` when no slot is + /// immediately free. Used to fall back to another submission account on + /// retry: acquiring without blocking (while the failed account's slot is + /// still held) avoids deadlocking against other concurrent settlements. + fn try_acquire(&self) -> Option { + let inner = if let Ok(permit) = Arc::clone(&self.direct_slot).try_acquire_owned() { + GuardInner::Direct(permit) + } else { + let delegated = self.delegated.as_ref()?; + let mut channel = delegated.acquire.try_lock().ok()?; + let account = channel.try_recv().ok()?; + GuardInner::Delegated { + account, + release: delegated.release.clone(), + } + }; + Some(SubmitterGuard { + inner, + solver_address: self.solver_address, + quarantine: None, }) } @@ -184,6 +220,10 @@ impl SubmitterPool { struct SubmitterGuard { inner: GuardInner, solver_address: eth::Address, + /// When set, a delegated account is returned to the pool only after this + /// delay instead of immediately, benching an unhealthy EOA (see + /// [`SubmitterGuard::quarantine`]). Always `None` for the direct slot. + quarantine: Option<(Duration, mempools::AccountFailure)>, } enum GuardInner { @@ -209,13 +249,33 @@ impl SubmitterGuard { GuardInner::Dropped => unreachable!(), } } + + /// Bench the underlying delegated account: once this guard drops, the + /// account is withheld from the pool for `cooldown` so concurrent and + /// subsequent settlements stop selecting a stuck or underfunded lane until + /// it recovers. No-op for the direct solver EOA, which is never benched. + fn quarantine(&mut self, cooldown: Duration, reason: mempools::AccountFailure) { + if matches!(self.inner, GuardInner::Delegated { .. }) { + self.quarantine = Some((cooldown, reason)); + } + } } impl Drop for SubmitterGuard { fn drop(&mut self) { let inner = std::mem::replace(&mut self.inner, GuardInner::Dropped); if let GuardInner::Delegated { account, release } = inner { + let quarantine = self.quarantine; tokio::spawn(async move { + if let Some((cooldown, reason)) = quarantine { + tracing::warn!( + submitter = ?account.address(), + ?reason, + ?cooldown, + "benching unhealthy submission account before returning it to the pool" + ); + tokio::time::sleep(cooldown).await; + } if release.send(account).await.is_err() { tracing::error!("failed to return submission account to pool: channel closed"); } @@ -225,6 +285,54 @@ impl Drop for SubmitterGuard { } } +/// Submit a settlement with health-aware fallback: run `submit` from `guard`, +/// and on an account-specific failure (`SubmitterUnusable`) bench that account +/// and retry from another one while the deadline still holds. Accounts spent on +/// this request are held until it returns, so each retry picks a different one. +/// The submission step is injected as a closure so the fallback flow is +/// unit-testable without a live mempool. +async fn submit_with_fallback( + pool: &SubmitterPool, + mut guard: SubmitterGuard, + cooldown: Duration, + deadline_reached: impl Fn() -> bool, + mut submit: F, +) -> Result +where + F: FnMut(SubmissionMode) -> Fut, + Fut: std::future::Future>, +{ + let mut spent = Vec::new(); + loop { + // acquire()/try_acquire() can wait, so the deadline may pass before we get + // here. Check it before each submit so we never submit past it. + if deadline_reached() { + break Err(mempools::Error::Other(anyhow::anyhow!( + "submission deadline reached before submitting" + ))); + } + let executed = submit(guard.submission_mode()).await; + let Err(mempools::Error::SubmitterUnusable(reason)) = &executed else { + break executed; + }; + let reason = *reason; + // Bench the failed account (with the reason for the log) regardless of + // whether we can retry, so it stops being selected while unhealthy. + guard.quarantine(cooldown, reason); + let Some(next) = pool.try_acquire() else { + // No other account currently free; give up and report the error. + break executed; + }; + tracing::warn!( + failed = ?guard.submission_mode(), + next = ?next.submission_mode(), + ?reason, + "submission account unusable, retrying from another account" + ); + spent.push(std::mem::replace(&mut guard, next)); + } +} + /// Wrapper around a spawned settlement task's [`JoinHandle`]. When dropped /// (e.g. because the HTTP handler was cancelled by the autopilot), the task is /// aborted after a short grace period to allow cleanup (e.g. cancelling a @@ -912,17 +1020,37 @@ impl Competition { // Acquire a submission slot. The pool prefers the direct solver EOA // (no forwarding overhead); falls back to a delegated EIP-7702 // submission account when the solver EOA is busy. + // + // If submission fails because the chosen account is unusable (no gas, + // stale nonce, a pending tx that can't be replaced), bench that account + // and retry the settlement from another one while the deadline still + // holds. Benching withholds the account from the pool for a cooldown so + // concurrent and subsequent settlements stop selecting a stuck lane + // until it recovers. Accounts spent on this request are also held until + // it finishes, so each retry picks a different one. This is safe against + // double-submission: a settlement that actually lands always returns + // `Ok`, so only failures (where nothing was broadcast) are ever retried. let guard = self .submitter_pool .acquire() .await .ok_or(Error::SubmissionError)?; - let mode = guard.submission_mode(); - - let executed = self - .mempools - .execute(&settlement, submission_deadline, &mode) - .await; + let executed = { + let mempools = &self.mempools; + let settlement = &settlement; + submit_with_fallback( + &self.submitter_pool, + guard, + UNHEALTHY_ACCOUNT_COOLDOWN, + || self.eth.current_block().borrow().number >= submission_deadline.0, + |mode| async move { + mempools + .execute(settlement, submission_deadline, &mode) + .await + }, + ) + .await + }; notify::executed( &self.solver, @@ -1102,3 +1230,279 @@ pub enum Error { #[error("could not parse the request")] MalformedRequest, } + +#[cfg(test)] +mod submitter_pool_tests { + use { + super::*, + alloy::primitives::{Address, address}, + }; + + const SOLVER: eth::Address = address!("0000000000000000000000000000000000000099"); + + fn delegated_address(guard: &SubmitterGuard) -> eth::Address { + match guard.submission_mode() { + SubmissionMode::Delegated { submitter_eoa, .. } => submitter_eoa, + other => panic!("expected a delegated slot, got {other:?}"), + } + } + + /// An account benched via [`SubmitterGuard::quarantine`] must not be handed + /// back out while it cools down, whereas a normally-released account + /// returns to the pool immediately. This is the core of "stop assigning + /// settlements to a stuck lane until it recovers" from issue #4541. + #[tokio::test] + async fn quarantined_account_is_withheld_from_the_pool() { + let pool = SubmitterPool::new( + SOLVER, + vec![ + Account::Address(Address::with_last_byte(1)), + Account::Address(Address::with_last_byte(2)), + ], + 0, + ); + + // Direct slot first, then both delegated accounts; pool now drained. + let direct = pool.acquire().await.expect("direct slot"); + let mut first = pool.try_acquire().expect("first delegated account"); + let second = pool.try_acquire().expect("second delegated account"); + assert!(pool.try_acquire().is_none(), "pool should be drained"); + + let benched = delegated_address(&first); + let released = delegated_address(&second); + + // Bench `first` for a cooldown long enough to outlast the test; release + // `second` the normal way. + first.quarantine(Duration::from_secs(3600), mempools::AccountFailure::Nonce); + drop(first); + drop(second); + + // Let the (non-benched) release task return `second` to the pool. + tokio::time::sleep(Duration::from_millis(50)).await; + + let reacquired = pool.try_acquire().expect("released account returns"); + assert_eq!(delegated_address(&reacquired), released); + assert_ne!(delegated_address(&reacquired), benched); + assert!( + pool.try_acquire().is_none(), + "benched account must stay out of the pool while cooling down" + ); + + drop(direct); + drop(reacquired); + } + + /// The direct solver EOA is the primary submitter and must never be + /// benched, even if it is the account that failed. + #[test] + fn direct_slot_is_never_quarantined() { + let permit = Arc::new(tokio::sync::Semaphore::new(1)) + .try_acquire_owned() + .unwrap(); + let mut guard = SubmitterGuard { + inner: GuardInner::Direct(permit), + solver_address: SOLVER, + quarantine: None, + }; + guard.quarantine(Duration::from_secs(3600), mempools::AccountFailure::Nonce); + assert!( + guard.quarantine.is_none(), + "direct slot must not be quarantined" + ); + } + + use std::cell::RefCell; + + fn txid(b: u8) -> eth::TxId { + eth::TxId(alloy::primitives::B256::repeat_byte(b)) + } + + /// On an account-specific failure the settlement is retried from a + /// different account: here Direct fails, then the delegated account + /// succeeds. + #[tokio::test] + async fn fallback_retries_from_a_second_account() { + let pool = SubmitterPool::new( + SOLVER, + vec![Account::Address(Address::with_last_byte(1))], + 0, + ); + let guard = pool.acquire().await.expect("direct slot"); + let modes = RefCell::new(Vec::new()); + let result = submit_with_fallback( + &pool, + guard, + Duration::from_secs(3600), + || false, + |mode| { + let mut modes = modes.borrow_mut(); + let n = modes.len(); + modes.push(mode); + std::future::ready(if n == 0 { + Err(mempools::Error::SubmitterUnusable( + mempools::AccountFailure::Nonce, + )) + } else { + Ok(txid(0x11)) + }) + }, + ) + .await; + let modes = modes.into_inner(); + assert_eq!(modes.len(), 2, "should have tried a second account"); + assert!(matches!(modes[0], SubmissionMode::Direct(_))); + assert!(matches!(modes[1], SubmissionMode::Delegated { .. })); + assert!(matches!(result, Ok(h) if h.0 == txid(0x11).0)); + } + + /// Fallback can also go the other way: a delegated account fails, then the + /// freed direct solver EOA takes over. + #[tokio::test] + async fn fallback_can_go_from_delegated_to_direct() { + let pool = SubmitterPool::new( + SOLVER, + vec![Account::Address(Address::with_last_byte(1))], + 0, + ); + let direct = pool.acquire().await.expect("direct slot"); + let guard = pool.try_acquire().expect("delegated account"); + drop(direct); // free the direct slot so the retry can grab it + let modes = RefCell::new(Vec::new()); + let result = submit_with_fallback( + &pool, + guard, + Duration::from_secs(3600), + || false, + |mode| { + let mut modes = modes.borrow_mut(); + let n = modes.len(); + modes.push(mode); + std::future::ready(if n == 0 { + Err(mempools::Error::SubmitterUnusable( + mempools::AccountFailure::InsufficientFunds, + )) + } else { + Ok(txid(0x22)) + }) + }, + ) + .await; + let modes = modes.into_inner(); + assert_eq!(modes.len(), 2); + assert!(matches!(modes[0], SubmissionMode::Delegated { .. })); + assert!(matches!(modes[1], SubmissionMode::Direct(_))); + assert!(matches!(result, Ok(h) if h.0 == txid(0x22).0)); + } + + /// With no other account free, the account failure is surfaced after a + /// single attempt rather than retried. + #[tokio::test] + async fn no_retry_when_no_other_account_is_free() { + let pool = SubmitterPool::new(SOLVER, vec![], 0); + let guard = pool.acquire().await.expect("direct slot"); + let modes = RefCell::new(Vec::new()); + let result = submit_with_fallback( + &pool, + guard, + Duration::from_secs(3600), + || false, + |mode| { + modes.borrow_mut().push(mode); + std::future::ready(Err(mempools::Error::SubmitterUnusable( + mempools::AccountFailure::Nonce, + ))) + }, + ) + .await; + assert_eq!(modes.into_inner().len(), 1); + assert!(matches!( + result, + Err(mempools::Error::SubmitterUnusable( + mempools::AccountFailure::Nonce + )) + )); + } + + /// A deadline that passed while waiting for a slot fails fast without ever + /// submitting. + #[tokio::test] + async fn no_submit_when_deadline_already_passed() { + let pool = SubmitterPool::new(SOLVER, vec![], 0); + let guard = pool.acquire().await.expect("direct slot"); + let modes = RefCell::new(Vec::new()); + let result = submit_with_fallback( + &pool, + guard, + Duration::from_secs(3600), + || true, // deadline already reached + |mode| { + modes.borrow_mut().push(mode); + std::future::ready(Ok(txid(0x33))) + }, + ) + .await; + assert!( + modes.into_inner().is_empty(), + "must not submit past the deadline" + ); + assert!(matches!(result, Err(mempools::Error::Other(_)))); + } + + /// A non-account error (e.g. a connection reset, whose tx may be on the + /// wire) is never retried, even when another account is available. + #[tokio::test] + async fn no_retry_on_a_non_account_error() { + let pool = SubmitterPool::new( + SOLVER, + vec![Account::Address(Address::with_last_byte(1))], + 0, + ); + let guard = pool.acquire().await.expect("direct slot"); + let modes = RefCell::new(Vec::new()); + let result = submit_with_fallback( + &pool, + guard, + Duration::from_secs(3600), + || false, + |mode| { + modes.borrow_mut().push(mode); + std::future::ready(Err(mempools::Error::Other(anyhow::anyhow!( + "connection reset" + )))) + }, + ) + .await; + assert_eq!( + modes.into_inner().len(), + 1, + "a non-account error is not retried" + ); + assert!(matches!(result, Err(mempools::Error::Other(_)))); + } + + /// A settlement that lands on the first attempt returns immediately with no + /// retry. + #[tokio::test] + async fn no_retry_on_immediate_success() { + let pool = SubmitterPool::new( + SOLVER, + vec![Account::Address(Address::with_last_byte(1))], + 0, + ); + let guard = pool.acquire().await.expect("direct slot"); + let modes = RefCell::new(Vec::new()); + let result = submit_with_fallback( + &pool, + guard, + Duration::from_secs(3600), + || false, + |mode| { + modes.borrow_mut().push(mode); + std::future::ready(Ok(txid(0x44))) + }, + ) + .await; + assert_eq!(modes.into_inner().len(), 1); + assert!(matches!(result, Ok(h) if h.0 == txid(0x44).0)); + } +} diff --git a/crates/driver/src/domain/mempools.rs b/crates/driver/src/domain/mempools.rs index 95e2feb325..ec18f59046 100644 --- a/crates/driver/src/domain/mempools.rs +++ b/crates/driver/src/domain/mempools.rs @@ -69,16 +69,51 @@ impl Mempools { ) -> Result { let mut stats = vec![Outcome::Superseded; self.mempools.len()]; + // Capture an account-specific failure reported by any mempool so it is + // not masked by a different error that `select_ok` happens to return + // last (it yields the LAST error when all futures fail). The caller + // relies on `SubmitterUnusable` escaping here to bench the account and + // retry the settlement from another one. + let account_failure: std::sync::Arc>> = + Default::default(); + // Set once any mempool actually broadcasts a tx. After that, retrying from + // another account could double-submit the settlement, so `race_error` must + // not surface a retryable `SubmitterUnusable` even if a sibling mempool + // rejected pre-broadcast. + let any_broadcast = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + // Set if any mempool fails before broadcast with an ambiguous error whose tx + // might still reach the chain (timeout, connection reset, `already known`, + // `nonce too high`, ...). Unlike a clean account rejection, such a failure + // means we can't be sure nothing was sent, so it must not trigger a retry. + let saw_nonretryable = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let res = select_ok(self.mempools.iter().zip(stats.iter_mut()).map( |(mempool, stat)| { + let account_failure = std::sync::Arc::clone(&account_failure); + let any_broadcast = std::sync::Arc::clone(&any_broadcast); + let saw_nonretryable = std::sync::Arc::clone(&saw_nonretryable); async move { let result = self - .submit(mempool, settlement, submission_deadline, mode) + .submit(mempool, settlement, submission_deadline, mode, &any_broadcast) .instrument(tracing::info_span!("mempool", kind = %mempool)) .await; // Log inline so errors from mempools that later get superseded still surface; // metrics are emitted from `update_metrics` once the race outcome is known. observe::mempool_log(mempool, settlement, &result); + match &result { + Err(Error::SubmitterUnusable(reason)) => { + *account_failure.lock().unwrap() = Some(*reason); + } + // A disabled mempool was skipped without touching the network. + Err(Error::Disabled) => {} + // Any other error (timeout, connection reset, `already known`, + // `nonce too high`, ...) may have put a tx on the wire, so it + // must block a retry from another account. + Err(_) => { + saw_nonretryable.store(true, std::sync::atomic::Ordering::SeqCst) + } + Ok(_) => {} + } *stat = Outcome::from(&result); result } @@ -92,7 +127,15 @@ impl Mempools { self.update_metrics(&stats); - Ok(res?.tx_hash) + match res { + Ok(success) => Ok(success.tx_hash), + Err(err) => Err(race_error( + err, + *account_failure.lock().unwrap(), + any_broadcast.load(std::sync::atomic::Ordering::SeqCst), + saw_nonretryable.load(std::sync::atomic::Ordering::SeqCst), + )), + } } /// A mempool is disabled if all of the following are true: @@ -125,6 +168,7 @@ impl Mempools { settlement: &Settlement, submission_deadline: BlockNo, mode: &SubmissionMode, + broadcasted: &std::sync::atomic::AtomicBool, ) -> Result { if self.is_disabled(mempool, settlement) { return Err(Error::Disabled); @@ -209,6 +253,32 @@ impl Mempools { ?signer, "submitting settlement tx" ); + + // Proactively check the signer can cover the gas before broadcasting, so an + // underfunded account falls back to another one without a wasted submission + // (issue #4541). A failed balance lookup is not authoritative, so proceed + // and let the node decide. + let required_balance = settlement + .gas + .required_balance(eth::U256::from(final_gas_price.max_fee_per_gas)); + match self.ethereum.balance(signer).await { + Ok(balance) if balance < required_balance => { + tracing::warn!( + ?signer, + ?balance, + ?required_balance, + "submission account balance too low for gas, falling back" + ); + return Err(Error::SubmitterUnusable(AccountFailure::InsufficientFunds)); + } + Ok(_) => {} + Err(err) => tracing::warn!( + ?signer, + ?err, + "could not check submission account balance before submitting" + ), + } + let hash = mempool .submit( tx.clone(), @@ -218,6 +288,9 @@ impl Mempools { nonce, ) .await?; + // The tx is now on the wire. Record it so a sibling mempool's pre-broadcast + // account failure can't trigger a retry that double-submits this settlement. + broadcasted.store(true, std::sync::atomic::Ordering::SeqCst); // Wait for the transaction to be mined, expired or failing. let result = async { @@ -503,6 +576,10 @@ impl From<&Result> for Outcome { reason: "Expired", blocks_passed: err.blocks_passed(), }, + Err(Error::SubmitterUnusable(_)) => Outcome::Failed { + reason: "SubmitterUnusable", + blocks_passed: None, + }, Err(Error::Other(_)) => Outcome::Failed { reason: "Other", blocks_passed: None, @@ -632,10 +709,126 @@ pub enum Error { }, #[error("Strategy disabled for this tx")] Disabled, + /// The submission account could not broadcast the transaction for a reason + /// specific to that account (e.g. insufficient gas funds, stale nonce, a + /// pending tx that can't be replaced). Nothing was broadcast, so the same + /// settlement can safely be retried from a different account. + #[error("submission account unusable: {0}")] + SubmitterUnusable(AccountFailure), #[error("Failed to submit: {0:?}")] Other(#[from] anyhow::Error), } +/// Account-specific reasons a node rejects a transaction at submission time, +/// before it is broadcast. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum AccountFailure { + InsufficientFunds, + Nonce, + ReplacementUnderpriced, +} + +impl AccountFailure { + /// Stable label for metrics/logging. + pub fn as_str(self) -> &'static str { + match self { + AccountFailure::InsufficientFunds => "insufficient_funds", + AccountFailure::Nonce => "nonce", + AccountFailure::ReplacementUnderpriced => "replacement_underpriced", + } + } +} + +impl std::fmt::Display for AccountFailure { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +/// Classify a failed `eth_sendRawTransaction` by its error message. Returns +/// `Some` only for failures specific to the *sending account*; transaction- +/// and node-level failures return `None` so they keep their existing handling. +/// +/// `already known` is intentionally excluded: it means our exact transaction is +/// already in the mempool and may still be mined, so it is not safe to retry +/// from another account. +pub fn classify_submission_failure(message: &str) -> Option { + let message = message.to_lowercase(); + // Match on keyword pairs rather than exact phrases so non-Geth clients are + // also covered (e.g. Nethermind's `SenderInsufficientFunds` / `NonceTooLow`, + // which omit the spaces Geth uses). + if message.contains("insufficient") && message.contains("funds") { + Some(AccountFailure::InsufficientFunds) + } else if message.contains("nonce") && message.contains("low") { + Some(AccountFailure::Nonce) + } else if message.contains("replacement") && message.contains("underpriced") { + // Only *replacement* underpricing is account-specific (a stuck nonce that + // another account avoids). A plain/global "underpriced" means the fee is + // below the node's minimum, which every account hits the same way, so it + // must not be benched or retried. + Some(AccountFailure::ReplacementUnderpriced) + } else { + None + } +} + +/// Choose which error to surface when the mempool race produced no success. +/// +/// `select_ok` yields whichever error finished last, so a `SubmitterUnusable` +/// reported by one mempool can be masked by a generic error (e.g. a timeout) +/// from another. The caller benches the account and retries on +/// `SubmitterUnusable`, so surface it whenever a mempool reported one, unless +/// one of the following holds: `last_error` is a settlement-specific terminal +/// failure (revert/expired), which is authoritative; `broadcasted` is set, +/// meaning some mempool already put a tx on the wire and retrying could +/// double-submit; or `saw_nonretryable` is set, meaning a mempool failed before +/// broadcast with an ambiguous error (timeout, connection reset, `already +/// known`, `nonce too high`) whose tx might still be live, which is likewise +/// unsafe to retry. +fn race_error( + last_error: Error, + account_failure: Option, + broadcasted: bool, + saw_nonretryable: bool, +) -> Error { + // A settlement-specific terminal failure is authoritative and must not be + // retried from another account, so it always wins over an account failure. + if matches!( + last_error, + Error::Revert { .. } | Error::SimulationRevert { .. } | Error::Expired { .. } + ) { + return last_error; + } + // Once a tx is on the wire, never surface a retryable account failure: + // downgrade even a `SubmitterUnusable` that `select_ok` happened to return + // last, since a retry from another account could double-submit the + // settlement. + if broadcasted { + return match last_error { + Error::SubmitterUnusable(reason) => Error::Other(anyhow!( + "submission account unusable ({reason}) after a transaction was already broadcast" + )), + other => other, + }; + } + match account_failure { + // Only retry when every active mempool failed cleanly before sending. If a + // sibling lane returned an ambiguous error, its tx might still be on the + // wire, so retrying from another account could double-submit; keep the real + // error instead, stripping a `SubmitterUnusable` that `select_ok` happened + // to return last so it can't leak a retry. + Some(reason) if !saw_nonretryable => Error::SubmitterUnusable(reason), + Some(reason) => match last_error { + Error::SubmitterUnusable(_) => Error::Other(anyhow!( + "submission account unusable ({reason}) alongside an ambiguous failure from \ + another mempool; not retrying" + )), + other => other, + }, + None => last_error, + } +} + impl Error { /// Number of blocks between the first submission and when the error was /// returned, if the error carries that timing. @@ -655,7 +848,7 @@ impl Error { submission_deadline, .. } => (*submitted_at_block, *submission_deadline), - Self::Disabled | Self::Other(_) => return None, + Self::Disabled | Self::SubmitterUnusable(_) | Self::Other(_) => return None, }; Some(end.saturating_sub(start).0) } @@ -745,4 +938,145 @@ mod tests { SUBMISSION_NONCE )); } + + #[test] + fn classifies_account_specific_submission_failures() { + use AccountFailure::*; + // Geth-family messages, with the surrounding wrapper text nodes add. + assert_eq!( + classify_submission_failure( + "server returned an error response: error code -32000: insufficient funds for gas \ + * price + value" + ), + Some(InsufficientFunds) + ); + assert_eq!( + classify_submission_failure("error code -32000: nonce too low"), + Some(Nonce) + ); + assert_eq!( + classify_submission_failure("replacement transaction underpriced"), + Some(ReplacementUnderpriced) + ); + // Case-insensitive. + assert_eq!( + classify_submission_failure("Insufficient Funds For Transfer"), + Some(InsufficientFunds) + ); + } + + #[test] + fn does_not_classify_non_account_failures_as_account_specific() { + // Settlement-level and node-level failures must NOT be retried from a + // different account. + assert_eq!(classify_submission_failure("execution reverted"), None); + // `already known` means our exact tx is already pending and may mine; + // retrying elsewhere could double-submit, so it must not be classified. + assert_eq!(classify_submission_failure("already known"), None); + assert_eq!(classify_submission_failure("too many requests"), None); + assert_eq!( + classify_submission_failure("connection reset by peer"), + None + ); + // `nonce too high` is a gap (the tx gets queued), not a clean + // rejection, so it must not be treated as retryable. + assert_eq!(classify_submission_failure("nonce too high"), None); + // Plain/global underpricing is a node/network fee-floor rejection, not + // account-specific: another account uses the same fee path, so retrying + // would just re-fail and bench every submitter. + assert_eq!(classify_submission_failure("transaction underpriced"), None); + assert_eq!( + classify_submission_failure("max fee per gas less than block base fee"), + None + ); + } + + #[test] + fn classifies_non_geth_client_wording() { + use AccountFailure::*; + // Clients such as Nethermind use spaceless variants; keyword-pair + // matching still classifies them. + assert_eq!( + classify_submission_failure("SenderInsufficientFunds"), + Some(InsufficientFunds) + ); + assert_eq!(classify_submission_failure("NonceTooLow"), Some(Nonce)); + assert_eq!( + classify_submission_failure("ReplacementTransactionUnderpriced"), + Some(ReplacementUnderpriced) + ); + } + + #[test] + fn account_failure_is_surfaced_over_a_masking_race_error() { + use AccountFailure::*; + // Pre-broadcast, every lane failed cleanly: the account-specific failure is + // surfaced so the settlement is benched and retried (issue #4541), even when + // `select_ok` returns a different clean error (here `Disabled`) last. + assert!(matches!( + race_error(Error::Disabled, Some(Nonce), false, false), + Error::SubmitterUnusable(Nonce) + )); + // ...and when the account failure is itself the error returned last. + assert!(matches!( + race_error(Error::SubmitterUnusable(Nonce), Some(Nonce), false, false), + Error::SubmitterUnusable(Nonce) + )); + // Pre-broadcast but a sibling lane returned an ambiguous error (timeout, + // connection reset, `already known`, `nonce too high`): its tx might still be + // on the wire, so the account failure must NOT be surfaced, or the settlement + // could be retried from another EOA and double-submitted. + assert!(matches!( + race_error( + Error::Other(anyhow!("connection reset")), + Some(Nonce), + false, + true, + ), + Error::Other(_) + )); + // Even when the account failure is the error returned last, an ambiguous + // sibling downgrades it so the caller does not retry. + assert!(matches!( + race_error(Error::SubmitterUnusable(Nonce), Some(Nonce), false, true), + Error::Other(_) + )); + // With no account-specific failure, the original error is preserved. + assert!(matches!( + race_error(Error::Disabled, None, false, false), + Error::Disabled + )); + // A settlement-specific terminal failure is authoritative and must never + // be turned into a retryable account failure. + assert!(matches!( + race_error( + Error::SimulationRevert { + submitted_at_block: BlockNo(1), + reverted_at_block: BlockNo(2), + }, + Some(InsufficientFunds), + false, + false, + ), + Error::SimulationRevert { .. } + )); + // Post-broadcast: once a mempool put a tx on the wire, an account failure + // another mempool reported must NOT be surfaced, or the settlement could be + // retried from another EOA and double-submitted. + assert!(matches!( + race_error( + Error::Other(anyhow!("Block stream finished unexpectedly")), + Some(Nonce), + true, + false, + ), + Error::Other(_) + )); + // Even when the last error is itself the pre-broadcast account failure, a + // broadcast elsewhere downgrades it so the caller does not retry. + assert!(matches!( + race_error(Error::SubmitterUnusable(Nonce), Some(Nonce), true, false), + Error::Other(_) + )); + } } diff --git a/crates/driver/src/infra/mempool/mod.rs b/crates/driver/src/infra/mempool/mod.rs index 5b7d3c9884..249e3d97a3 100644 --- a/crates/driver/src/infra/mempool/mod.rs +++ b/crates/driver/src/infra/mempool/mod.rs @@ -175,7 +175,15 @@ impl Mempool { ?signer, "failed to submit tx to mempool" ); - Err(mempools::Error::Other(err)) + // A rejection at broadcast time means nothing was sent, so an + // account-specific failure (no gas, stale nonce, ...) can be + // retried from a different submission account. Format the whole + // error chain (`{:#}`) so the node's message is matched even if + // context is wrapped around it. + match mempools::classify_submission_failure(&format!("{err:#}")) { + Some(reason) => Err(mempools::Error::SubmitterUnusable(reason)), + None => Err(mempools::Error::Other(err)), + } } } } diff --git a/crates/driver/src/infra/notify/mod.rs b/crates/driver/src/infra/notify/mod.rs index 8e6d8d97d9..6ac92ea541 100644 --- a/crates/driver/src/infra/notify/mod.rs +++ b/crates/driver/src/infra/notify/mod.rs @@ -114,7 +114,9 @@ pub fn executed( Err(Error::Revert { tx_id: hash, .. }) => notification::Settlement::Revert(*hash), Err(Error::SimulationRevert { .. }) => notification::Settlement::SimulationRevert, Err(Error::Expired { .. }) => notification::Settlement::Expired, - Err(Error::Other(_) | Error::Disabled) => notification::Settlement::Fail, + Err(Error::Other(_) | Error::Disabled | Error::SubmitterUnusable(_)) => { + notification::Settlement::Fail + } }; solver.notify( diff --git a/crates/driver/src/infra/observe/mod.rs b/crates/driver/src/infra/observe/mod.rs index e10f15be34..9e8c1796d4 100644 --- a/crates/driver/src/infra/observe/mod.rs +++ b/crates/driver/src/infra/observe/mod.rs @@ -391,7 +391,8 @@ pub fn mempool_log( } /// Emit per-mempool race counters with the final, reclassified label -/// (`Success` / `Revert` / `Expired` / `Other` / `Superseded` / `Disabled`). +/// (`Success` / `Revert` / `Expired` / `SubmitterUnusable` / `Other` / +/// `Superseded` / `Disabled`). /// Called once per mempool after the race resolves. pub fn mempool_submission_result(mempool: &Mempool, label: &str, blocks_passed: Option) { let name = mempool.to_string();