Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 96 additions & 6 deletions src/tasks/cache/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,29 @@ impl CacheTask {
Self { envs: env, bundles, txns }
}

/// Returns the rollup block number and basefee of the current sim env, or
/// `(0, 0)` if no env has been published yet.
///
/// Read at ingest time rather than cached across `select!` iterations: the
/// loop re-enters `select!` for every channel event, so a value assigned in
/// the `envs.changed()` arm is not visible when a bundle or transaction is
/// later received in a different arm. Reading the live env here ensures
/// items are ranked against the correct basefee.
fn current_block_and_basefee(&self) -> (u64, u64) {
self.envs
.borrow()
.as_ref()
.map(|env| {
let rollup = env.rollup_env();
(rollup.number.to::<u64>(), rollup.basefee)
})
.unwrap_or_default()
}

async fn task_future(mut self, cache: SimCache) {
let mut summary = IngestionSummary::default();

loop {
let mut basefee = 0;
tokio::select! {
biased;
res = self.envs.changed() => {
Expand All @@ -54,7 +72,7 @@ impl CacheTask {

summary.log_and_reset();

basefee = sim_env.basefee;
let basefee = sim_env.basefee;
info!(
basefee,
block_env_number = sim_env.number.to::<u64>(),
Expand All @@ -70,10 +88,9 @@ impl CacheTask {
Some(bundle) = self.bundles.recv() => {
summary.bundles_received += 1;

let env_block = self.envs.borrow()
.as_ref()
.map(|e| e.rollup_env().number.to::<u64>())
.unwrap_or_default();
// Read the block number and basefee from the live env at
// ingest time (see `current_block_and_basefee`).
let (env_block, basefee) = self.current_block_and_basefee();
let bundle_block = bundle.bundle.block_number();

// Don't insert bundles for past blocks
Expand Down Expand Up @@ -106,6 +123,7 @@ impl CacheTask {
continue;
};

let (_, basefee) = self.current_block_and_basefee();
match txn.try_into_recovered() {
Ok(recovered_tx) => {
cache.add_tx(recovered_tx, basefee);
Expand Down Expand Up @@ -186,3 +204,75 @@ impl IngestionSummary {
*self = IngestionSummary { has_logged: true, ..IngestionSummary::default() };
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::tasks::env::Environment;
use crate::test_utils::{create_transfer_tx, scenarios_test_block_env as test_block_env};
use alloy::consensus::TxEnvelope;
use alloy::primitives::U256;
use alloy::signers::local::PrivateKeySigner;
use signet_sim::SimItem;
use std::time::Duration;

// Regression for the `basefee = 0` ingest bug: `basefee` was re-initialised
// to 0 at the top of every `select!` iteration and assigned only in the
// `envs.changed()` arm, so `add_tx`/`add_bundle` (which run in other arms)
// always ranked items with basefee 0. The ingest arms must read the basefee
// from the live env instead.
#[tokio::test]
async fn cache_task_ranks_with_env_basefee() {
const BASEFEE: u64 = 60_000_000_000; // 60 gwei, below the tx's 100 gwei max fee
const PRIORITY: u128 = 50_000_000_000; // 50 gwei tip

// Env whose rollup block carries BASEFEE.
let block_env = test_block_env(100, BASEFEE, 1_000, 30_000_000);
let sim_env = SimEnv {
rollup: Environment::new(block_env, Default::default()),
host: Environment::for_testing(),
span: tracing::Span::none(),
};

let (_env_tx, env_rx) = watch::channel(Some(sim_env));
let (_bundle_tx, bundle_rx) = mpsc::unbounded_channel();
let (txn_tx, txn_rx) = mpsc::unbounded_channel();

let (cache, _jh) = CacheTask::new(env_rx, bundle_rx, txn_rx).spawn();

// A tx whose rank depends on the basefee: rank is effective_gas_price
// (min(max_fee, basefee + tip)) * gas_limit, i.e. min(100, 60+50)=100 gwei
// per gas at BASEFEE vs min(100, 0+50)=50 gwei at basefee 0.
let signer = PrivateKeySigner::random();
let recovered =
create_transfer_tx(&signer, signer.address(), U256::from(1u64), 0, 1, PRIORITY)
.unwrap();
let envelope: TxEnvelope = recovered.inner().clone();

let item = SimItem::from(recovered);
let expected = item.calculate_total_fee(BASEFEE);
let with_zero = item.calculate_total_fee(0);
assert_ne!(
expected, with_zero,
"basefee must affect the rank for this test to be meaningful"
);

txn_tx.send(ReceivedTx::Tx(envelope)).unwrap();

// Wait for the task to ingest the tx.
let mut rank = None;
for _ in 0..200 {
if let Some((r, _)) = cache.read_best(1).into_iter().next() {
rank = Some(r);
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}

assert_eq!(
rank.expect("tx was never ingested into the cache"),
expected,
"tx must be ranked with the live env basefee, not 0",
);
}
}