From 6e1b23b4f2041cb0b53937a41e81b332dec7407d Mon Sep 17 00:00:00 2001 From: ashleychandy Date: Mon, 15 Jun 2026 15:40:56 +0000 Subject: [PATCH 1/4] add block-pinned balance fetching --- crates/account-balances/src/cached.rs | 706 +++++++++++++++--- crates/account-balances/src/lib.rs | 70 +- crates/account-balances/src/simulation.rs | 82 +- crates/autopilot/src/solvable_orders.rs | 33 +- .../src/domain/competition/pre_processing.rs | 2 +- 5 files changed, 703 insertions(+), 190 deletions(-) diff --git a/crates/account-balances/src/cached.rs b/crates/account-balances/src/cached.rs index f06362a29e..b608402313 100644 --- a/crates/account-balances/src/cached.rs +++ b/crates/account-balances/src/cached.rs @@ -1,5 +1,5 @@ use { - crate::{BalanceFetching, Query, TransferSimulationError}, + crate::{BalanceFetching, BlockNumber, Query, TransferSimulationError}, alloy_primitives::U256, anyhow::Result, ethrpc::block_stream::{CurrentBlockWatcher, into_stream}, @@ -12,63 +12,261 @@ use { tracing::{Instrument, instrument}, }; -type BlockNumber = u64; +#[derive(Debug, Clone, Copy)] +pub struct CachePolicy { + staleness_tolerance: BlockNumber, + eviction_time: BlockNumber, +} + +impl Default for CachePolicy { + fn default() -> Self { + Self { + staleness_tolerance: Self::DEFAULT_STALENESS_TOLERANCE, + eviction_time: 5, + } + } +} + +impl CachePolicy { + const DEFAULT_STALENESS_TOLERANCE: BlockNumber = 1; + + fn is_within_staleness_tolerance( + &self, + updated_block: BlockNumber, + current_block: BlockNumber, + ) -> bool { + let oldest_acceptable = current_block.saturating_sub(self.staleness_tolerance); + updated_block >= oldest_acceptable && updated_block <= current_block + } + + fn should_retain(&self, block: BlockNumber, current_block: BlockNumber) -> bool { + block >= current_block.saturating_sub(self.eviction_time) + } + + fn is_block_too_old_to_cache(&self, block: BlockNumber, current_block: BlockNumber) -> bool { + !self.should_retain(block, current_block) + } + + fn is_valid_block_stamp(&self, block: BlockNumber, current_block: BlockNumber) -> bool { + if block > current_block { + return false; + } + self.should_retain(block, current_block) + } +} + +#[derive(Debug, Clone)] +struct BalanceEntry { + last_accessed_block: BlockNumber, + updated_block: BlockNumber, + balance: U256, +} + +impl BalanceEntry { + fn new(balance: U256, stamp: BlockNumber) -> Self { + Self { + last_accessed_block: stamp, + updated_block: stamp, + balance, + } + } + + fn update_last_accessed(&mut self, stamp: BlockNumber) { + self.last_accessed_block = self.last_accessed_block.max(stamp); + } + + fn is_within_staleness_tolerance( + &self, + current_block: BlockNumber, + policy: &CachePolicy, + ) -> bool { + policy.is_within_staleness_tolerance(self.updated_block, current_block) + } + + fn should_retain(&self, current_block: BlockNumber, policy: &CachePolicy) -> bool { + policy.should_retain(self.last_accessed_block, current_block) + } + + fn merge_update(&mut self, balance: U256, stamp: BlockNumber) { + if stamp > self.updated_block { + self.updated_block = stamp; + self.balance = balance; + self.update_last_accessed(stamp); + } + } +} + +type SharedResult = Result>; -/// Balances get removed from the cache after this many blocks without being -/// requested. -const EVICTION_TIME: BlockNumber = 5; +struct InFlightEntry { + sender: tokio::sync::watch::Sender>, +} + +impl InFlightEntry { + fn new() -> Self { + let (sender, _receiver) = tokio::sync::watch::channel(None); + Self { sender } + } + + fn subscribe(&self) -> tokio::sync::watch::Receiver> { + self.sender.subscribe() + } + + fn publish(&self, result: SharedResult) { + let _ = self.sender.send(Some(result)); + } +} + +type InFlightResult = tokio::sync::watch::Receiver>; +type InFlightFetchList = Vec<(usize, Query)>; +type InFlightWaitList = Vec<(usize, Query, InFlightResult)>; +type InFlightPartition = (InFlightFetchList, InFlightWaitList); + +struct InFlightGuard { + cache: Arc>, + queries: Vec, +} + +impl InFlightGuard { + fn new(cache: Arc>, queries: Vec) -> Self { + Self { cache, queries } + } +} + +impl Drop for InFlightGuard { + fn drop(&mut self) { + if let Ok(mut cache) = self.cache.lock() { + cache.complete_in_flight(&self.queries); + } + } +} -#[derive(Default)] struct BalanceCache { - last_seen_block: BlockNumber, + current_block: BlockNumber, data: HashMap, + policy: CachePolicy, + in_flight: HashMap, } impl BalanceCache { - /// Retrieves cached balance and updates the `requested_at` field. - fn get_cached_balance(&mut self, query: &Query) -> Option { + fn new(policy: CachePolicy) -> Self { + Self { + current_block: 0, + data: HashMap::new(), + policy, + in_flight: HashMap::new(), + } + } + + fn log_stale_block_warning(&self, incoming_block: BlockNumber, context: &str) { + tracing::debug!( + incoming_block = incoming_block, + current_cached_block = self.current_block, + "{}", + context + ); + } + + fn read_and_touch(&mut self, query: &Query) -> Option { match self.data.get_mut(query) { Some(entry) => { - entry.requested_at = self.last_seen_block; - Some(entry.balance) + if entry.is_within_staleness_tolerance(self.current_block, &self.policy) { + entry.update_last_accessed(self.current_block); + Some(entry.balance) + } else { + None + } } None => None, } } - /// Only updates existing balances. This should always be used in the - /// background task. fn update_balance(&mut self, query: &Query, balance: U256, update_block: BlockNumber) { - if update_block < self.last_seen_block { - // This should never realistically happen. + if update_block < self.current_block { + self.log_stale_block_warning( + update_block, + "update_balance: discarding stale block result", + ); return; } - if let Some(entry) = self.data.get_mut(query) { - entry.updated_at = update_block; + if let Some(entry) = self.data.get_mut(query) + && update_block > entry.updated_block + { + entry.updated_block = update_block; entry.balance = balance; } } - /// Only inserts new balances. This should always be used when we needed to - /// fetch a balance because it was requested by a backend component. - fn insert_balance(&mut self, query: Query, balance: U256, requested_at: BlockNumber) { - self.data.insert( - query, - BalanceEntry { - requested_at, - updated_at: requested_at, - balance, - }, - ); + fn is_block_too_old_to_cache(&self, block: BlockNumber) -> bool { + self.policy + .is_block_too_old_to_cache(block, self.current_block) } -} -#[derive(Debug, Clone)] -struct BalanceEntry { - requested_at: BlockNumber, - updated_at: BlockNumber, - balance: U256, + fn is_valid_block_stamp(&self, block: BlockNumber) -> bool { + self.policy.is_valid_block_stamp(block, self.current_block) + } + + fn merge_results(&mut self, results: &[Result], queries: &[Query], stamp: BlockNumber) { + for (query, result) in queries.iter().zip(results.iter()) { + if let Ok(balance) = result { + self.upsert_balance(query, *balance, stamp); + } + } + } + + fn upsert_balance(&mut self, query: &Query, balance: U256, stamp: BlockNumber) { + if !self.is_valid_block_stamp(stamp) { + if stamp > self.current_block { + tracing::warn!( + stamp = stamp, + current_block = self.current_block, + "upsert_balance: rejecting future block stamp (potential cache poisoning)" + ); + } else { + self.log_stale_block_warning( + stamp, + "upsert_balance: discarding too-old block result", + ); + } + return; + } + + self.data + .entry(query.clone()) + .and_modify(|entry| entry.merge_update(balance, stamp)) + .or_insert_with(|| BalanceEntry::new(balance, stamp)); + } + + fn cleanup_stale_entries(&mut self) { + self.data + .retain(|_, entry| entry.should_retain(self.current_block, &self.policy)); + } + + fn mark_in_flight(&mut self, query: &Query) -> Option { + if let Some(entry) = self.in_flight.get(query) { + Some(entry.subscribe()) + } else { + self.in_flight.insert(query.clone(), InFlightEntry::new()); + None + } + } + + fn complete_in_flight(&mut self, queries: &[Query]) { + for query in queries { + self.in_flight.remove(query); + } + } + + fn store_in_flight_result(&mut self, query: &Query, result: &Result) { + if let Some(entry) = self.in_flight.get(query) { + let shared_result = match result { + Ok(balance) => Ok(*balance), + Err(err) => Err(Arc::from(err.to_string())), + }; + entry.publish(shared_result); + } + } } pub struct Balances { @@ -78,39 +276,290 @@ pub struct Balances { impl Balances { pub fn new(inner: Arc) -> Self { + Self::with_policy(inner, CachePolicy::default()) + } + + pub fn with_policy(inner: Arc, policy: CachePolicy) -> Self { Self { inner, - balance_cache: Default::default(), + balance_cache: Arc::new(Mutex::new(BalanceCache::new(policy))), } } + + fn with_cache_mut(&self, f: F) -> Option + where + F: FnOnce(&mut BalanceCache) -> R, + { + self.balance_cache + .lock() + .map(|mut cache| f(&mut cache)) + .map_err(|err| { + tracing::error!("cache mutex poisoned: {}", err); + }) + .ok() + } + + fn with_cache(&self, f: F) -> Option + where + F: FnOnce(&BalanceCache) -> R, + { + self.balance_cache + .lock() + .map(|cache| f(&cache)) + .map_err(|err| { + tracing::error!("cache mutex poisoned: {}", err); + }) + .ok() + } } struct CacheResponse { - // The indices and results of queries that were in the cache. cached: Vec<(usize, Result)>, - // Indices of queries that were not in the cache. missing: Vec, - requested_at: BlockNumber, } impl Balances { fn get_cached_balances(&self, queries: &[Query]) -> CacheResponse { - let mut cache = self.balance_cache.lock().unwrap(); - let (cached, missing) = queries - .iter() - .enumerate() - .partition_map(|(i, query)| match cache.get_cached_balance(query) { - Some(balance) => itertools::Either::Left((i, Ok(balance))), - None => itertools::Either::Right(i), + self.with_cache_mut(|cache| { + let (cached, missing) = queries.iter().enumerate().partition_map(|(i, query)| { + match cache.read_and_touch(query) { + Some(balance) => itertools::Either::Left((i, Ok(balance))), + None => itertools::Either::Right(i), + } }); - CacheResponse { + CacheResponse { cached, missing } + }) + .unwrap_or_else(|| CacheResponse { + cached: vec![], + missing: queries.iter().enumerate().map(|(i, _)| i).collect(), + }) + } + + fn apply_fetch_results(&self, results: &[Result], queries: &[Query], stamp: BlockNumber) { + let has_any_ok = results.iter().any(|r| r.is_ok()); + if !has_any_ok { + tracing::trace!("skipping cache update: all fetch results failed"); + return; + } + + self.with_cache_mut(|cache| { + if cache.is_block_too_old_to_cache(stamp) { + tracing::debug!( + requested_block = stamp, + current_block = cache.current_block, + "discarding stale results" + ); + return; + } + + cache.merge_results(results, queries, stamp); + }); + } + + async fn handle_block_pinned_request( + &self, + queries: &[Query], + bn: BlockNumber, + ) -> Vec> { + if queries.is_empty() { + return vec![]; + } + + tracing::debug!( + block = bn, + query_count = queries.len(), + "block-pinned fetch starting, cache read and write will be skipped" + ); + self.inner.get_balances(queries, Some(bn)).await + } + + async fn handle_none_request(&self, queries: &[Query]) -> Vec> { + let CacheResponse { cached, missing } = self.get_cached_balances(queries); + + if missing.is_empty() { + return cached.into_iter().map(|(_, result)| result).collect(); + } + + let missing_queries: Vec = missing.iter().map(|i| queries[*i].clone()).collect(); + + let partition_result = self.partition_in_flight(&missing_queries); + let (needs_fetch, needs_wait) = match partition_result { + Ok(partition) => partition, + Err(_) => { + return self + .fetch_and_merge(&missing_queries, &missing, cached, queries.len()) + .await; + } + }; + + let (fetched_results, waited_results) = tokio::join!( + self.fetch_in_flight(needs_fetch), + self.wait_for_in_flight(needs_wait) + ); + + self.merge_in_flight_results( + queries.len(), cached, - missing, - requested_at: cache.last_seen_block, + &missing, + fetched_results, + waited_results, + ) + } + + fn partition_in_flight(&self, missing_queries: &[Query]) -> Result { + match self.balance_cache.lock() { + Ok(mut cache) => Ok(missing_queries + .iter() + .enumerate() + .partition_map(|(i, query)| { + if let Some(cell) = cache.mark_in_flight(query) { + itertools::Either::Right((i, query.clone(), cell)) + } else { + itertools::Either::Left((i, query.clone())) + } + })), + Err(_) => Err(()), + } + } + + async fn fetch_in_flight( + &self, + needs_fetch: InFlightFetchList, + ) -> Vec<((usize, Query), Result)> { + if needs_fetch.is_empty() { + return vec![]; + } + + let fetch_queries: Vec = needs_fetch.iter().map(|(_, q)| q.clone()).collect(); + let fetch_block = self.with_cache(|cache| cache.current_block).unwrap_or(0); + + let _guard = InFlightGuard::new(self.balance_cache.clone(), fetch_queries.clone()); + + let results = self + .inner + .get_balances(&fetch_queries, Some(fetch_block)) + .await; + + debug_assert_eq!( + results.len(), + fetch_queries.len(), + "get_balances contract violation" + ); + + self.publish_in_flight_results(&fetch_queries, &results); + self.apply_fetch_results(&results, &fetch_queries, fetch_block); + + needs_fetch.into_iter().zip(results).collect() + } + + fn publish_in_flight_results(&self, queries: &[Query], results: &[Result]) { + self.with_cache_mut(|cache| { + for (query, result) in queries.iter().zip(results.iter()) { + cache.store_in_flight_result(query, result); + } + }); + } + + async fn wait_for_in_flight(&self, needs_wait: InFlightWaitList) -> Vec<(usize, Result)> { + if needs_wait.is_empty() { + return vec![]; } + + let mut results = vec![]; + for (i, _query, mut receiver) in needs_wait { + let result = self.await_in_flight_result(&mut receiver).await; + results.push((i, result)); + } + results + } + + async fn await_in_flight_result(&self, receiver: &mut InFlightResult) -> Result { + let wait_result = receiver.wait_for(|opt| opt.is_some()).await; + + if wait_result.is_err() { + return Err(anyhow::anyhow!("in-flight request cancelled")); + } + + drop(wait_result); + + let shared_result = receiver + .borrow_and_update() + .as_ref() + .expect("value must be Some after wait_for") + .clone(); + + match shared_result { + Ok(balance) => Ok(balance), + Err(err_str) => Err(anyhow::anyhow!("{}", err_str)), + } + } + + fn merge_in_flight_results( + &self, + total_len: usize, + cached: Vec<(usize, Result)>, + missing: &[usize], + fetched_results: Vec<((usize, Query), Result)>, + waited_results: Vec<(usize, Result)>, + ) -> Vec> { + let mut results: Vec>> = (0..total_len).map(|_| None).collect(); + + for (i, result) in cached { + results[i] = Some(result); + } + + for ((i, _), result) in fetched_results { + results[missing[i]] = Some(result); + } + + for (i, result) in waited_results { + results[missing[i]] = Some(result); + } + + results + .into_iter() + .map(|r| r.expect("all indices must be filled")) + .collect() + } + + async fn fetch_and_merge( + &self, + missing_queries: &[Query], + missing: &[usize], + cached: Vec<(usize, Result)>, + total_len: usize, + ) -> Vec> { + let fetch_block = self.with_cache(|cache| cache.current_block).unwrap_or(0); + + let new_balances = self + .inner + .get_balances(missing_queries, Some(fetch_block)) + .await; + + debug_assert_eq!( + new_balances.len(), + missing_queries.len(), + "get_balances contract violation" + ); + + self.apply_fetch_results(&new_balances, missing_queries, fetch_block); + + let mut results: Vec>> = (0..total_len).map(|_| None).collect(); + + for (i, result) in cached { + results[i] = Some(result); + } + + for (i, result) in missing.iter().zip(new_balances) { + results[*i] = Some(result); + } + + results + .into_iter() + .map(|r| r.expect("all indices must be filled")) + .collect() } - /// Spawns task that refreshes the cached balances on every new block. pub fn spawn_background_task(&self, block_stream: CurrentBlockWatcher) { let inner = self.inner.clone(); let cache = self.balance_cache.clone(); @@ -120,34 +569,48 @@ impl Balances { while let Some(block) = stream.next().await { let balances_to_update = { let mut cache = cache.lock().unwrap(); - cache.last_seen_block = block.number; + cache.current_block = block.number; + let policy = cache.policy; cache .data .iter() .filter_map(|(query, entry)| { - // Only update balances that have been requested recently. - let oldest_allowed_request = - cache.last_seen_block.saturating_sub(EVICTION_TIME); - (entry.requested_at >= oldest_allowed_request).then_some(query.clone()) + entry + .should_retain(block.number, &policy) + .then_some(query.clone()) }) .collect_vec() }; - let results = inner.get_balances(&balances_to_update).await; + let results = if !balances_to_update.is_empty() { + Some( + inner + .get_balances(&balances_to_update, Some(block.number)) + .await, + ) + } else { + None + }; let mut cache = cache.lock().unwrap(); - balances_to_update - .into_iter() - .zip(results) - .for_each(|(query, result)| { - if let Ok(balance) = result { - cache.update_balance(&query, balance, block.number); + if let Some(results) = results { + for (query, result) in balances_to_update.into_iter().zip(results) { + match result { + Ok(balance) => { + cache.update_balance(&query, balance, block.number); + } + Err(err) => { + tracing::warn!( + ?query, + error = ?err, + block = block.number, + "background balance update failed" + ); + } } - }); - cache.data.retain(|_, value| { - // Only keep balances where we know we have the most recent data. - value.updated_at >= block.number - }); + } + } + cache.cleanup_stale_entries(); } tracing::error!("block stream terminated unexpectedly"); }; @@ -158,32 +621,15 @@ impl Balances { #[async_trait::async_trait] impl BalanceFetching for Balances { #[instrument(skip_all)] - async fn get_balances(&self, queries: &[Query]) -> Vec> { - let CacheResponse { - mut cached, - missing, - requested_at, - } = self.get_cached_balances(queries); - - if missing.is_empty() { - return cached.into_iter().map(|(_, result)| result).collect(); - } - - let missing_queries: Vec = missing.iter().map(|i| queries[*i].clone()).collect(); - let new_balances = self.inner.get_balances(&missing_queries).await; - - { - let mut cache = self.balance_cache.lock().unwrap(); - for (query, result) in missing_queries.into_iter().zip(new_balances.iter()) { - if let Ok(balance) = result { - cache.insert_balance(query, *balance, requested_at) - } - } + async fn get_balances( + &self, + queries: &[Query], + block_number: Option, + ) -> Vec> { + match block_number { + Some(bn) => self.handle_block_pinned_request(queries, bn).await, + None => self.handle_none_request(queries).await, } - - cached.extend(missing.into_iter().zip(new_balances)); - cached.sort_by_key(|(i, _)| *i); - cached.into_iter().map(|(_, balance)| balance).collect() } async fn can_transfer( @@ -217,21 +663,27 @@ mod tests { } } + impl CachePolicy { + fn eviction_time_for_test(&self) -> BlockNumber { + self.eviction_time + } + } + #[tokio::test] async fn caches_ok_results() { let mut inner = MockBalanceFetching::new(); inner .expect_get_balances() .times(1) - .withf(|arg| arg == [query(1)]) - .returning(|_| vec![Ok(U256::ONE)]); + .withf(|queries, block| queries == [query(1)] && *block == Some(0)) + .returning(|_, _| vec![Ok(U256::ONE)]); let fetcher = Balances::new(Arc::new(inner)); // 1st call to `inner`. - let result = fetcher.get_balances(&[query(1)]).await; + let result = fetcher.get_balances(&[query(1)], None).await; assert_eq!(result[0].as_ref().unwrap(), &U256::ONE); // Fetches balance from cache and skips calling `inner`. - let result = fetcher.get_balances(&[query(1)]).await; + let result = fetcher.get_balances(&[query(1)], None).await; assert_eq!(result[0].as_ref().unwrap(), &U256::ONE); } @@ -241,14 +693,14 @@ mod tests { inner .expect_get_balances() .times(2) - .withf(|arg| arg == [query(1)]) - .returning(|_| vec![Err(anyhow::anyhow!("some error"))]); + .withf(|queries, block| queries == [query(1)] && *block == Some(0)) + .returning(|_, _| vec![Err(anyhow::anyhow!("some error"))]); let fetcher = Balances::new(Arc::new(inner)); // 1st call to `inner`. - assert!(fetcher.get_balances(&[query(1)]).await[0].is_err()); + assert!(fetcher.get_balances(&[query(1)], None).await[0].is_err()); // 2nd call to `inner`. - assert!(fetcher.get_balances(&[query(1)]).await[0].is_err()); + assert!(fetcher.get_balances(&[query(1)], None).await[0].is_err()); } #[tokio::test] @@ -259,30 +711,37 @@ mod tests { let mut inner = MockBalanceFetching::new(); inner .expect_get_balances() - .times(2) - .withf(|arg| arg == [query(1)]) - .returning(|_| vec![Ok(U256::ONE)]); + .times(1) + .withf(|queries, block| queries == [query(1)] && *block == Some(0)) + .returning(|_, _| vec![Ok(U256::ONE)]); + inner + .expect_get_balances() + .times(1) + .withf(|queries, block| queries == [query(1)] && *block == Some(1)) + .returning(|_, _| vec![Ok(U256::ONE)]); let fetcher = Balances::new(Arc::new(inner)); fetcher.spawn_background_task(receiver); - // 1st call to `inner`. Balance gets cached. - let result = fetcher.get_balances(&[query(1)]).await; + let result = fetcher.get_balances(&[query(1)], None).await; assert_eq!(result[0].as_ref().unwrap(), &U256::ONE); - // New block gets detected. sender .send(BlockInfo { number: 1, ..Default::default() }) .unwrap(); - // Wait for block to be noticed and cache to be updated. (2nd call to inner) - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + { + let cache = fetcher.balance_cache.lock().unwrap(); + let entry = cache.data.get(&query(1)).unwrap(); + assert_eq!(entry.balance, U256::ONE); + assert_eq!(entry.updated_block, 1); + } - // Balance was already updated so this will hit the cache and skip calling - // `inner`. - let result = fetcher.get_balances(&[query(1)]).await; + let result = fetcher.get_balances(&[query(1)], None).await; assert_eq!(result[0].as_ref().unwrap(), &U256::ONE); } @@ -292,26 +751,26 @@ mod tests { inner .expect_get_balances() .times(1) - .withf(|arg| arg == [query(1)]) - .returning(|_| vec![Ok(U256::ONE)]); + .withf(|queries, block| queries == [query(1)] && *block == Some(0)) + .returning(|_, _| vec![Ok(U256::ONE)]); inner .expect_get_balances() .times(1) - .withf(|arg| arg == [query(2)]) - .returning(|_| vec![Ok(U256::from(2))]); + .withf(|queries, block| queries == [query(2)] && *block == Some(0)) + .returning(|_, _| vec![Ok(U256::from(2))]); let fetcher = Balances::new(Arc::new(inner)); // 1st call to `inner` putting balance 1 into the cache. - let result = fetcher.get_balances(&[query(1)]).await; + let result = fetcher.get_balances(&[query(1)], None).await; assert_eq!(result[0].as_ref().unwrap(), &U256::ONE); // Fetches balance 1 from cache and balance 2 fresh. (2nd call to `inner`) - let result = fetcher.get_balances(&[query(1), query(2)]).await; + let result = fetcher.get_balances(&[query(1), query(2)], None).await; assert_eq!(result[0].as_ref().unwrap(), &U256::ONE); assert_eq!(result[1].as_ref().unwrap(), &U256::from(2)); // Now balance 2 is also in the cache. Skipping call to `inner`. - let result = fetcher.get_balances(&[query(2)]).await; + let result = fetcher.get_balances(&[query(2)], None).await; assert_eq!(result[0].as_ref().unwrap(), &U256::from(2)); } @@ -320,13 +779,14 @@ mod tests { let first_block = BlockInfo::default(); let (sender, receiver) = tokio::sync::watch::channel(first_block); + let policy = CachePolicy::default(); let mut inner = MockBalanceFetching::new(); inner .expect_get_balances() - .times(7) - .returning(|_| vec![Ok(U256::ONE)]); + .times(6) + .returning(|_, _| vec![Ok(U256::ONE)]); - let fetcher = Balances::new(Arc::new(inner)); + let fetcher = Balances::with_policy(Arc::new(inner), policy); fetcher.spawn_background_task(receiver); let cached_entry = || { @@ -336,10 +796,10 @@ mod tests { assert!(cached_entry().is_none()); // 1st call to `inner`. Balance gets cached. - let result = fetcher.get_balances(&[query(1)]).await; + let result = fetcher.get_balances(&[query(1)], None).await; assert_eq!(result[0].as_ref().unwrap(), &U256::ONE); - for block in 1..=EVICTION_TIME + 1 { + for block in 1..=policy.eviction_time_for_test() + 1 { assert!(cached_entry().is_some()); // New block gets detected. sender @@ -348,7 +808,7 @@ mod tests { ..Default::default() }) .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; } assert!(cached_entry().is_none()); } diff --git a/crates/account-balances/src/lib.rs b/crates/account-balances/src/lib.rs index 0eaf66ce16..111f39aaec 100644 --- a/crates/account-balances/src/lib.rs +++ b/crates/account-balances/src/lib.rs @@ -15,6 +15,8 @@ use { mod cached; mod simulation; +pub type BlockNumber = u64; + #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub struct Query { pub owner: Address, @@ -55,16 +57,12 @@ impl From for TransferSimulationError { #[cfg_attr(any(test, feature = "test-util"), mockall::automock)] #[async_trait::async_trait] pub trait BalanceFetching: Send + Sync { - // Returns the balance available to the allowance manager for the given owner - // and token taking both balance as well as "allowance" into account. - async fn get_balances(&self, queries: &[Query]) -> Vec>; - - // Check that the settlement contract can make use of this user's token balance. - // This check could fail if the user does not have enough balance, has not - // given the allowance to the allowance manager or if the token does not - // allow freely transferring amounts around for example if it is paused - // or takes a fee on transfer. If the node supports the trace_callMany we - // can perform more extensive tests. + async fn get_balances( + &self, + queries: &[Query], + block_number: Option, + ) -> Vec>; + async fn can_transfer( &self, query: &Query, @@ -88,6 +86,27 @@ pub fn cached( cached } +#[derive(Clone, Debug, Default)] +pub(crate) struct SimulateParams { + amount: Option, + balance_override: Option, + block_number: Option, +} + +impl SimulateParams { + pub(crate) fn new( + amount: Option, + balance_override: Option, + block_number: Option, + ) -> Self { + Self { + amount, + balance_override, + block_number, + } + } +} + #[derive(Clone)] pub struct BalanceSimulator { settlement: GPv2Settlement::Instance, @@ -122,15 +141,24 @@ impl BalanceSimulator { self.vault } - pub async fn simulate( + fn block_id_from_number(block_number: Option) -> alloy_rpc_types::BlockId { + block_number + .map(alloy_rpc_types::BlockId::number) + .unwrap_or_else(alloy_rpc_types::BlockId::latest) + } + + pub(crate) async fn simulate( &self, owner: Address, token: Address, source: SellTokenSource, interactions: &[InteractionData], - amount: Option, - balance_override: Option, + params: SimulateParams, ) -> Result { + let amount = params.amount; + let balance_override = params.balance_override; + let block_number = params.block_number; + let overrides: StateOverride = match balance_override { Some(overrides) => self .balance_overrider @@ -140,13 +168,6 @@ impl BalanceSimulator { .collect(), None => Default::default(), }; - // We simulate the balances from the Settlement contract's context. This - // allows us to check: - // 1. How the pre-interactions would behave as part of the settlement - // 2. Simulate the actual VaultRelayer transfers that would happen as part of a - // settlement - // - // This allows us to end up with very accurate balance simulations. let balance_call = Balances::Balances::balanceCall { contracts: Balances::Balances::Contracts { settlement: *self.settlement.address(), @@ -167,14 +188,17 @@ impl BalanceSimulator { .collect(), }; - let response = self + let block_id = Self::block_id_from_number(block_number); + + let call_builder = self .settlement .simulateDelegatecall(*self.balances.address(), balance_call.abi_encode().into()) .with_cloned_provider() .state(overrides) .from(*SIMULATION_ACCOUNT) - .call() - .await?; + .block(block_id); + + let response = call_builder.call().await?; let (token_balance, allowance, effective_balance, can_transfer, transfer_revert_reason) = <( diff --git a/crates/account-balances/src/simulation.rs b/crates/account-balances/src/simulation.rs index b4f3ebe857..6386848a37 100644 --- a/crates/account-balances/src/simulation.rs +++ b/crates/account-balances/src/simulation.rs @@ -4,8 +4,9 @@ use { super::{BalanceFetching, Query, TransferSimulationError}, - crate::BalanceSimulator, + crate::{BalanceSimulator, BlockNumber, SimulateParams}, alloy_primitives::{Address, U256}, + alloy_rpc_types::BlockId, anyhow::Result, contracts::{BalancerV2Vault::BalancerV2Vault, ERC20}, ethrpc::{Web3, alloy::ProviderLabelingExt}, @@ -21,13 +22,6 @@ pub struct Balances { impl Balances { pub fn new(web3: &Web3, balance_simulator: BalanceSimulator) -> Self { - // Note that the balances simulation **will fail** if the `vault` - // address is not a contract and the `source` is set to one of - // `SellTokenSource::{External, Internal}` (i.e. the Vault contract is - // needed). This is because Solidity generates code to verify that - // contracts exist at addresses that get called. This allows us to - // properly check if the `source` is not supported for the deployment - // work without additional code paths :tada:! let web3 = web3.labeled("balanceFetching"); Self { @@ -44,7 +38,17 @@ impl Balances { self.balance_simulator.vault } - async fn tradable_balance_simulated(&self, query: &Query) -> Result { + fn block_id_from_number(block_number: Option) -> BlockId { + block_number + .map(BlockId::number) + .unwrap_or_else(BlockId::latest) + } + + async fn tradable_balance_simulated( + &self, + query: &Query, + block_number: Option, + ) -> Result { let simulation = self .balance_simulator .simulate( @@ -52,8 +56,7 @@ impl Balances { query.token, query.source, &query.interactions, - None, - query.balance_override.clone(), + SimulateParams::new(None, query.balance_override.clone(), block_number), ) .await?; Ok(if simulation.can_transfer { @@ -67,26 +70,33 @@ impl Balances { &self, query: &Query, token: &ERC20::Instance, + block_number: Option, ) -> Result { + let block_id = Self::block_id_from_number(block_number); + let usable_balance = match query.source { SellTokenSource::Erc20 => { - let balance = token.balanceOf(query.owner); - let allowance = token.allowance(query.owner, self.vault_relayer()); + let balance_call = token.balanceOf(query.owner).block(block_id); + let allowance_call = token + .allowance(query.owner, self.vault_relayer()) + .block(block_id); let (balance, allowance) = futures::try_join!( - balance.call().into_future(), - allowance.call().into_future() + balance_call.call().into_future(), + allowance_call.call().into_future() )?; std::cmp::min(balance, allowance) } SellTokenSource::External => { let vault = BalancerV2Vault::new(self.vault(), &self.web3.provider); - let balance = token.balanceOf(query.owner); - let approved = vault.hasApprovedRelayer(query.owner, self.vault_relayer()); - let allowance = token.allowance(query.owner, self.vault()); + let balance_call = token.balanceOf(query.owner).block(block_id); + let approved_call = vault + .hasApprovedRelayer(query.owner, self.vault_relayer()) + .block(block_id); + let allowance_call = token.allowance(query.owner, self.vault()).block(block_id); let (balance, approved, allowance) = futures::try_join!( - balance.call().into_future(), - approved.call().into_future(), - allowance.call().into_future() + balance_call.call().into_future(), + approved_call.call().into_future(), + allowance_call.call().into_future() )?; match approved { true => std::cmp::min(balance, allowance), @@ -95,14 +105,19 @@ impl Balances { } SellTokenSource::Internal => { let vault = BalancerV2Vault::new(self.vault(), &self.web3.provider); - let balance = vault.getInternalBalance(query.owner, vec![query.token]); - let approved = vault.hasApprovedRelayer(query.owner, self.vault_relayer()); + let tokens = vec![query.token]; + let balance_call = vault + .getInternalBalance(query.owner, tokens) + .block(block_id); + let approved_call = vault + .hasApprovedRelayer(query.owner, self.vault_relayer()) + .block(block_id); let (balance, approved) = futures::try_join!( - balance.call().into_future(), - approved.call().into_future() + balance_call.call().into_future(), + approved_call.call().into_future() )?; match approved { - true => balance[0], // internal approvals are always U256::MAX + true => balance[0], false => U256::ZERO, } } @@ -114,16 +129,20 @@ impl Balances { #[async_trait::async_trait] impl BalanceFetching for Balances { #[instrument(skip_all)] - async fn get_balances(&self, queries: &[Query]) -> Vec> { - // TODO(nlordell): Use `Multicall` here to use fewer node round-trips + async fn get_balances( + &self, + queries: &[Query], + block_number: Option, + ) -> Vec> { let futures = queries .iter() .map(|query| async { if query.interactions.is_empty() { let token = ERC20::Instance::new(query.token, self.web3.provider.clone()); - self.tradable_balance_simple(query, &token).await + self.tradable_balance_simple(query, &token, block_number) + .await } else { - self.tradable_balance_simulated(query).await + self.tradable_balance_simulated(query, block_number).await } }) .collect::>(); @@ -143,8 +162,7 @@ impl BalanceFetching for Balances { query.token, query.source, &query.interactions, - Some(amount), - query.balance_override.clone(), + SimulateParams::new(Some(amount), query.balance_override.clone(), None), ) .await .map_err(|err| TransferSimulationError::Other(err.into()))?; diff --git a/crates/autopilot/src/solvable_orders.rs b/crates/autopilot/src/solvable_orders.rs index dd3007b7e1..cbb5cb1cd1 100644 --- a/crates/autopilot/src/solvable_orders.rs +++ b/crates/autopilot/src/solvable_orders.rs @@ -4,7 +4,7 @@ use { domain::{self, auction::Price}, infra::{self, banned}, }, - account_balances::{BalanceFetching, Query}, + account_balances::{BalanceFetching, BlockNumber, Query}, alloy::primitives::{Address, U256}, anyhow::{Context, Result}, bad_tokens::list_based::DenyListedTokens, @@ -231,12 +231,16 @@ impl SolvableOrdersCache { .collect(); let (balances, orders, cow_amms, in_flight) = { - let queries = orders - .iter() - .map(|o| Query::from_order(o)) - .collect::>(); + let queries = if self.disable_order_balance_filter { + vec![] + } else { + orders + .iter() + .map(|o| Query::from_order(o)) + .collect::>() + }; tokio::join!( - self.fetch_balances(queries), + self.fetch_balances(queries, block), self.filter_invalid_orders(orders, &mut invalid_order_uids), self.timed_future("cow_amm_registry", self.cow_amm_registry.amms()), self.fetch_in_flight_orders(block), @@ -380,18 +384,24 @@ impl SolvableOrdersCache { .collect() } - async fn fetch_balances(&self, queries: Vec) -> HashMap { + async fn fetch_balances( + &self, + queries: Vec, + block: BlockNumber, + ) -> HashMap { let fetched_balances = self .timed_future( "balance_filtering", - self.balance_fetcher.get_balances(&queries), + self.balance_fetcher.get_balances(&queries, Some(block)), ) .await; - if self.disable_order_balance_filter { + + if queries.is_empty() { + tracing::trace!(block = %block, "no balance queries for solvable orders at block"); return Default::default(); } - tracing::trace!("fetched balances for solvable orders"); + tracing::trace!(block = %block, query_count = queries.len(), "fetched balances for solvable orders at specific block"); queries .into_iter() .zip(fetched_balances) @@ -403,7 +413,8 @@ impl SolvableOrdersCache { token = ?query.token, source = ?query.source, error = ?err, - "failed to get balance" + block = %block, + "failed to get balance at block" ); None } diff --git a/crates/driver/src/domain/competition/pre_processing.rs b/crates/driver/src/domain/competition/pre_processing.rs index d4ae216b8e..eaa3325154 100644 --- a/crates/driver/src/domain/competition/pre_processing.rs +++ b/crates/driver/src/domain/competition/pre_processing.rs @@ -329,7 +329,7 @@ impl Utilities { }) .collect::>(); - let balances = self.balance_fetcher.get_balances(&queries).await; + let balances = self.balance_fetcher.get_balances(&queries, None).await; let result: HashMap<_, _> = queries .into_iter() From ab4d76fcca16a6eda3a080499486ed11be963baf Mon Sep 17 00:00:00 2001 From: ashleychandy Date: Tue, 16 Jun 2026 22:53:30 +0000 Subject: [PATCH 2/4] skip balance fetching when balance filter is disabled --- crates/autopilot/src/solvable_orders.rs | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/crates/autopilot/src/solvable_orders.rs b/crates/autopilot/src/solvable_orders.rs index cbb5cb1cd1..32b1b84c3a 100644 --- a/crates/autopilot/src/solvable_orders.rs +++ b/crates/autopilot/src/solvable_orders.rs @@ -230,15 +230,18 @@ impl SolvableOrdersCache { .map(|order| order.metadata.uid) .collect(); - let (balances, orders, cow_amms, in_flight) = { - let queries = if self.disable_order_balance_filter { - vec![] - } else { - orders - .iter() - .map(|o| Query::from_order(o)) - .collect::>() - }; + let (balances, orders, cow_amms, in_flight) = if self.disable_order_balance_filter { + let (orders, cow_amms, in_flight) = tokio::join!( + self.filter_invalid_orders(orders, &mut invalid_order_uids), + self.timed_future("cow_amm_registry", self.cow_amm_registry.amms()), + self.fetch_in_flight_orders(block), + ); + (HashMap::new(), orders, cow_amms, in_flight) + } else { + let queries = orders + .iter() + .map(|o| Query::from_order(o)) + .collect::>(); tokio::join!( self.fetch_balances(queries, block), self.filter_invalid_orders(orders, &mut invalid_order_uids), From 3e59770af2521ccf4618e42f436f73fd42828d8c Mon Sep 17 00:00:00 2001 From: ashleychandy Date: Tue, 16 Jun 2026 23:00:11 +0000 Subject: [PATCH 3/4] handle poisoned mutex in cache background task --- crates/account-balances/src/cached.rs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/crates/account-balances/src/cached.rs b/crates/account-balances/src/cached.rs index b608402313..39eb035ec8 100644 --- a/crates/account-balances/src/cached.rs +++ b/crates/account-balances/src/cached.rs @@ -568,7 +568,17 @@ impl Balances { let task = async move { while let Some(block) = stream.next().await { let balances_to_update = { - let mut cache = cache.lock().unwrap(); + let mut cache = match cache.lock() { + Ok(guard) => guard, + Err(err) => { + tracing::error!( + block = block.number, + error = %err, + "cache mutex poisoned in background task, skipping block update" + ); + continue; + } + }; cache.current_block = block.number; let policy = cache.policy; cache @@ -592,7 +602,17 @@ impl Balances { None }; - let mut cache = cache.lock().unwrap(); + let mut cache = match cache.lock() { + Ok(guard) => guard, + Err(err) => { + tracing::error!( + block = block.number, + error = %err, + "cache mutex poisoned in background task, skipping balance updates" + ); + continue; + } + }; if let Some(results) = results { for (query, result) in balances_to_update.into_iter().zip(results) { match result { From 7b81ff951641009e14e527afa69b95ec799d79d4 Mon Sep 17 00:00:00 2001 From: ashleychandy Date: Thu, 18 Jun 2026 22:36:59 +0000 Subject: [PATCH 4/4] fetching with concurrent waiting --- crates/account-balances/src/cached.rs | 210 ++++++++++++++++++++++---- 1 file changed, 177 insertions(+), 33 deletions(-) diff --git a/crates/account-balances/src/cached.rs b/crates/account-balances/src/cached.rs index 39eb035ec8..465f17e94c 100644 --- a/crates/account-balances/src/cached.rs +++ b/crates/account-balances/src/cached.rs @@ -22,14 +22,40 @@ impl Default for CachePolicy { fn default() -> Self { Self { staleness_tolerance: Self::DEFAULT_STALENESS_TOLERANCE, - eviction_time: 5, + eviction_time: Self::DEFAULT_EVICTION_TIME, } } } impl CachePolicy { + const DEFAULT_EVICTION_TIME: BlockNumber = 5; const DEFAULT_STALENESS_TOLERANCE: BlockNumber = 1; + pub fn new(staleness_tolerance: BlockNumber, eviction_time: BlockNumber) -> Self { + Self { + staleness_tolerance, + eviction_time, + } + } + + pub fn with_staleness_tolerance(mut self, tolerance: BlockNumber) -> Self { + self.staleness_tolerance = tolerance; + self + } + + pub fn with_eviction_time(mut self, time: BlockNumber) -> Self { + self.eviction_time = time; + self + } + + pub fn staleness_tolerance(&self) -> BlockNumber { + self.staleness_tolerance + } + + pub fn eviction_time(&self) -> BlockNumber { + self.eviction_time + } + fn is_within_staleness_tolerance( &self, updated_block: BlockNumber, @@ -96,7 +122,9 @@ impl BalanceEntry { } } -type SharedResult = Result>; +type SharedResult = Result>; + +type InFlightKey = (Query, Option); struct InFlightEntry { sender: tokio::sync::watch::Sender>, @@ -124,19 +152,19 @@ type InFlightPartition = (InFlightFetchList, InFlightWaitList); struct InFlightGuard { cache: Arc>, - queries: Vec, + keys: Vec, } impl InFlightGuard { - fn new(cache: Arc>, queries: Vec) -> Self { - Self { cache, queries } + fn new(cache: Arc>, keys: Vec) -> Self { + Self { cache, keys } } } impl Drop for InFlightGuard { fn drop(&mut self) { if let Ok(mut cache) = self.cache.lock() { - cache.complete_in_flight(&self.queries); + cache.complete_in_flight(&self.keys); } } } @@ -145,7 +173,7 @@ struct BalanceCache { current_block: BlockNumber, data: HashMap, policy: CachePolicy, - in_flight: HashMap, + in_flight: HashMap, } impl BalanceCache { @@ -243,26 +271,37 @@ impl BalanceCache { .retain(|_, entry| entry.should_retain(self.current_block, &self.policy)); } - fn mark_in_flight(&mut self, query: &Query) -> Option { - if let Some(entry) = self.in_flight.get(query) { + fn mark_in_flight( + &mut self, + query: &Query, + block: Option, + ) -> Option { + let key = (query.clone(), block); + if let Some(entry) = self.in_flight.get(&key) { Some(entry.subscribe()) } else { - self.in_flight.insert(query.clone(), InFlightEntry::new()); + self.in_flight.insert(key, InFlightEntry::new()); None } } - fn complete_in_flight(&mut self, queries: &[Query]) { - for query in queries { - self.in_flight.remove(query); + fn complete_in_flight(&mut self, keys: &[InFlightKey]) { + for key in keys { + self.in_flight.remove(key); } } - fn store_in_flight_result(&mut self, query: &Query, result: &Result) { - if let Some(entry) = self.in_flight.get(query) { + fn store_in_flight_result( + &mut self, + query: &Query, + block: Option, + result: &Result, + ) { + let key = (query.clone(), block); + if let Some(entry) = self.in_flight.get(&key) { let shared_result = match result { Ok(balance) => Ok(*balance), - Err(err) => Err(Arc::from(err.to_string())), + Err(err) => Err(Arc::new(anyhow::anyhow!("{:#}", err))), }; entry.publish(shared_result); } @@ -365,12 +404,75 @@ impl Balances { return vec![]; } + let (needs_fetch_queries, needs_wait_list) = self + .with_cache_mut(|cache| { + let (needs_fetch, needs_wait): (Vec<_>, Vec<_>) = + queries.iter().enumerate().partition_map(|(i, query)| { + if let Some(receiver) = cache.mark_in_flight(query, Some(bn)) { + itertools::Either::Right((i, query.clone(), receiver)) + } else { + itertools::Either::Left((i, query.clone())) + } + }); + (needs_fetch, needs_wait) + }) + .unwrap_or_else(|| { + ( + queries + .iter() + .enumerate() + .map(|(i, q)| (i, q.clone())) + .collect(), + vec![], + ) + }); + + if needs_fetch_queries.is_empty() { + tracing::trace!( + block = bn, + query_count = queries.len(), + "block-pinned request: all queries in-flight, waiting" + ); + let waited_results = self.wait_for_in_flight(needs_wait_list).await; + return self.reconstruct_results(queries.len(), vec![], waited_results); + } + tracing::debug!( block = bn, query_count = queries.len(), - "block-pinned fetch starting, cache read and write will be skipped" + fetch_count = needs_fetch_queries.len(), + wait_count = needs_wait_list.len(), + "block-pinned request: fetching and waiting (no caching)" + ); + + let (fetched_results, waited_results) = tokio::join!( + self.fetch_in_flight_for_block(needs_fetch_queries, bn), + self.wait_for_in_flight(needs_wait_list) ); - self.inner.get_balances(queries, Some(bn)).await + + self.reconstruct_results(queries.len(), fetched_results, waited_results) + } + + fn reconstruct_results( + &self, + total_len: usize, + fetched_results: Vec<((usize, Query), Result)>, + waited_results: Vec<(usize, Result)>, + ) -> Vec> { + let mut results: Vec>> = (0..total_len).map(|_| None).collect(); + + for ((i, _), result) in fetched_results { + results[i] = Some(result); + } + + for (i, result) in waited_results { + results[i] = Some(result); + } + + results + .into_iter() + .map(|r| r.expect("all indices must be filled")) + .collect() } async fn handle_none_request(&self, queries: &[Query]) -> Vec> { @@ -382,7 +484,7 @@ impl Balances { let missing_queries: Vec = missing.iter().map(|i| queries[*i].clone()).collect(); - let partition_result = self.partition_in_flight(&missing_queries); + let partition_result = self.partition_in_flight(&missing_queries, None); let (needs_fetch, needs_wait) = match partition_result { Ok(partition) => partition, Err(_) => { @@ -406,13 +508,17 @@ impl Balances { ) } - fn partition_in_flight(&self, missing_queries: &[Query]) -> Result { + fn partition_in_flight( + &self, + missing_queries: &[Query], + block: Option, + ) -> Result { match self.balance_cache.lock() { Ok(mut cache) => Ok(missing_queries .iter() .enumerate() .partition_map(|(i, query)| { - if let Some(cell) = cache.mark_in_flight(query) { + if let Some(cell) = cache.mark_in_flight(query, block) { itertools::Either::Right((i, query.clone(), cell)) } else { itertools::Either::Left((i, query.clone())) @@ -433,7 +539,8 @@ impl Balances { let fetch_queries: Vec = needs_fetch.iter().map(|(_, q)| q.clone()).collect(); let fetch_block = self.with_cache(|cache| cache.current_block).unwrap_or(0); - let _guard = InFlightGuard::new(self.balance_cache.clone(), fetch_queries.clone()); + let keys: Vec = fetch_queries.iter().map(|q| (q.clone(), None)).collect(); + let _guard = InFlightGuard::new(self.balance_cache.clone(), keys); let results = self .inner @@ -446,16 +553,51 @@ impl Balances { "get_balances contract violation" ); - self.publish_in_flight_results(&fetch_queries, &results); + self.publish_in_flight_results(&fetch_queries, &results, None); self.apply_fetch_results(&results, &fetch_queries, fetch_block); needs_fetch.into_iter().zip(results).collect() } - fn publish_in_flight_results(&self, queries: &[Query], results: &[Result]) { + async fn fetch_in_flight_for_block( + &self, + needs_fetch: InFlightFetchList, + block: BlockNumber, + ) -> Vec<((usize, Query), Result)> { + if needs_fetch.is_empty() { + return vec![]; + } + + let fetch_queries: Vec = needs_fetch.iter().map(|(_, q)| q.clone()).collect(); + + let keys: Vec = fetch_queries + .iter() + .map(|q| (q.clone(), Some(block))) + .collect(); + let _guard = InFlightGuard::new(self.balance_cache.clone(), keys); + + let results = self.inner.get_balances(&fetch_queries, Some(block)).await; + + debug_assert_eq!( + results.len(), + fetch_queries.len(), + "get_balances contract violation" + ); + + self.publish_in_flight_results(&fetch_queries, &results, Some(block)); + + needs_fetch.into_iter().zip(results).collect() + } + + fn publish_in_flight_results( + &self, + queries: &[Query], + results: &[Result], + block: Option, + ) { self.with_cache_mut(|cache| { for (query, result) in queries.iter().zip(results.iter()) { - cache.store_in_flight_result(query, result); + cache.store_in_flight_result(query, block, result); } }); } @@ -465,12 +607,14 @@ impl Balances { return vec![]; } - let mut results = vec![]; - for (i, _query, mut receiver) in needs_wait { - let result = self.await_in_flight_result(&mut receiver).await; - results.push((i, result)); - } - results + let futures = needs_wait + .into_iter() + .map(|(i, _query, mut receiver)| async move { + let result = self.await_in_flight_result(&mut receiver).await; + (i, result) + }); + + futures::future::join_all(futures).await } async fn await_in_flight_result(&self, receiver: &mut InFlightResult) -> Result { @@ -490,7 +634,7 @@ impl Balances { match shared_result { Ok(balance) => Ok(balance), - Err(err_str) => Err(anyhow::anyhow!("{}", err_str)), + Err(err) => Err(anyhow::anyhow!("{:#}", err)), } } @@ -685,7 +829,7 @@ mod tests { impl CachePolicy { fn eviction_time_for_test(&self) -> BlockNumber { - self.eviction_time + self.eviction_time() } }