From 8b03564e86087e76ef6ae294a52f50a970587d6c Mon Sep 17 00:00:00 2001 From: Aryan Godara <65490434+AryanGodara@users.noreply.github.com> Date: Wed, 24 Jun 2026 13:58:44 +0530 Subject: [PATCH] Split pool-indexer bootstrap from serve via --bootstrap-only flag --- crates/e2e/tests/e2e/pool_indexer.rs | 56 +++++++++++++++++++-- crates/pool-indexer/README.md | 36 +++++++++++-- crates/pool-indexer/src/arguments.rs | 37 ++++++++++++++ crates/pool-indexer/src/lib.rs | 2 +- crates/pool-indexer/src/run.rs | 75 ++++++++++++++++++++++------ 5 files changed, 180 insertions(+), 26 deletions(-) diff --git a/crates/e2e/tests/e2e/pool_indexer.rs b/crates/e2e/tests/e2e/pool_indexer.rs index 6ee3b4c88c..e8555acd44 100644 --- a/crates/e2e/tests/e2e/pool_indexer.rs +++ b/crates/e2e/tests/e2e/pool_indexer.rs @@ -190,10 +190,8 @@ async fn seed_checkpoint(db: &PgPool, factory: Address, block: u64) { .unwrap(); } -/// Spawns the pool-indexer task and waits for its `/health` endpoint to come -/// up. -async fn spawn_pool_indexer(factory: Address, metrics_port: u16) -> tokio::task::JoinHandle<()> { - let config = Configuration { +fn pool_indexer_config(factory: Address, metrics_port: u16) -> Configuration { + Configuration { database: DatabaseConfig { url: POOL_INDEXER_DB_URL.parse().unwrap(), max_connections: NonZeroU32::new(5).unwrap(), @@ -217,7 +215,13 @@ async fn spawn_pool_indexer(factory: Address, metrics_port: u16) -> tokio::task: metrics: MetricsConfig { bind_address: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, metrics_port)), }, - }; + } +} + +/// Spawns the pool-indexer task and waits for its `/health` endpoint to come +/// up. 
+async fn spawn_pool_indexer(factory: Address, metrics_port: u16) -> tokio::task::JoinHandle<()> { + let config = pool_indexer_config(factory, metrics_port); let handle = tokio::task::spawn(pool_indexer::run(config)); wait_for_condition(TIMEOUT, || async { reqwest::get(format!("{POOL_INDEXER_HOST}/health")) @@ -647,3 +651,45 @@ async fn pagination(web3: Web3) { }) .await; } + +#[tokio::test] +#[ignore] +async fn local_node_pool_indexer_bootstrap_idempotent() { + run_test(bootstrap_idempotent).await; +} + +/// `--bootstrap-only` on an already-seeded DB must be a fast no-op: detect the +/// existing checkpoint, skip the (here unreachable) subgraph seeder, and return +/// without binding any ports — mirroring a re-run of the bootstrap +/// initContainer on a pod restart. +async fn bootstrap_idempotent(web3: Web3) { + let db = PgPool::connect(POOL_INDEXER_DB_URL).await.unwrap(); + clear_pool_indexer_tables(&db).await; + + // A pre-seeded checkpoint marks the DB as already bootstrapped. No on-chain + // factory is needed: bootstrap reads the checkpoint and returns before any + // seeding or catch-up. The RPC is only used for the chain_id sanity check. 
+ let factory = Address::repeat_byte(0x11); + let head = web3.provider.get_block_number().await.unwrap(); + seed_checkpoint(&db, factory, head).await; + + tokio::time::timeout( + TIMEOUT, + pool_indexer::bootstrap(pool_indexer_config(factory, POOL_INDEXER_METRICS_PORT)), + ) + .await + .expect("bootstrap-only did not exit on an already-seeded DB"); + + let checkpoint: i64 = sqlx::query_scalar( + "SELECT block_number FROM pool_indexer_checkpoints WHERE contract_address = $1", + ) + .bind(factory.as_slice()) + .fetch_one(&db) + .await + .unwrap(); + assert_eq!( + checkpoint, + head.cast_signed(), + "bootstrap mutated an already-seeded checkpoint" + ); +} diff --git a/crates/pool-indexer/README.md b/crates/pool-indexer/README.md index c5f6ad583c..f810d1f457 100644 --- a/crates/pool-indexer/README.md +++ b/crates/pool-indexer/README.md @@ -9,6 +9,27 @@ at a fixed block, catches up to the chain tip via RPC events, then stays live by polling new blocks. Drivers consume it via the `pool-indexer-url` field in their Uniswap V3 liquidity config. +## Bootstrap and serve + +Startup has two phases: + +- **Bootstrap** — initial subgraph seed plus catch-up to the finalized head. + One-time and slow (minutes on a large chain). +- **Serve** — live block polling and the HTTP API. No long startup cost. + +`pool-indexer --config <path>` runs both in one process: it bootstraps when the +DB has no checkpoint, then serves. This is the single-container deployment. + +`pool-indexer --bootstrap-only true --config <path>` runs only the bootstrap +phase and then exits 0, binding no HTTP ports. It is **idempotent**: on a DB +that already has a checkpoint it skips the seed and catch-up entirely (never +touching the subgraph) and returns immediately, so re-running it is a fast, safe +no-op. + +Running bootstrap as a separate step ahead of serving keeps serve startup fast: +the serve process finds the checkpoint already present and flips `/startup` ready +almost immediately. 
+ ## Running locally Create `crates/pool-indexer/config.local.toml` first (schema = the @@ -17,12 +38,19 @@ Create `crates/pool-indexer/config.local.toml` first (schema = the sections. String fields accept `%ENV_VAR` so secrets can come from the environment instead of being written into the file. -Then, from the repository root, reset the local stack and start the indexer: +The indexer uses its own database, migrated from `database/sql-pool-indexer` +(separate from the shared autopilot/orderbook set in `database/sql`). From the +repository root: ```bash -# wipes the local DB — dev machines only -docker compose down --volumes docker compose up -d db -docker compose run --rm migrations +# create + migrate the indexer's own database via flyway (database/sql-pool-indexer) +docker compose up migrations-pool-indexer + +# one process for both phases: +cargo run --release -p pool-indexer -- --config crates/pool-indexer/config.local.toml + +# or split bootstrap from serve: +cargo run --release -p pool-indexer -- --bootstrap-only true --config crates/pool-indexer/config.local.toml cargo run --release -p pool-indexer -- --config crates/pool-indexer/config.local.toml ``` diff --git a/crates/pool-indexer/src/arguments.rs b/crates/pool-indexer/src/arguments.rs index ce5085cf1f..656a776903 100644 --- a/crates/pool-indexer/src/arguments.rs +++ b/crates/pool-indexer/src/arguments.rs @@ -7,6 +7,43 @@ pub struct Arguments { #[clap(long, env)] pub config: PathBuf, + /// Run only the bootstrap phase (initial seed + catch-up to the finalized + /// head), then exit; bind no HTTP ports. Idempotent: a fast no-op when the + /// DB is already seeded. Lets the slow seed run as a separate step ahead of + /// serving. 
+ #[clap(long, env, action = clap::ArgAction::Set, default_value_t = false)] + pub bootstrap_only: bool, + #[clap(flatten)] pub logging: LoggingArguments, } + +#[cfg(test)] +mod tests { + use {super::*, clap::Parser}; + + #[test] + fn bootstrap_only_flag_parses() { + let serve = Arguments::parse_from(["pool-indexer", "--config", "/tmp/c.toml"]); + assert!(!serve.bootstrap_only); + + let bootstrap = Arguments::parse_from([ + "pool-indexer", + "--config", + "/tmp/c.toml", + "--bootstrap-only", + "true", + ]); + assert!(bootstrap.bootstrap_only); + + // ArgAction::Set takes a value, so it can be explicitly turned off too. + let explicit_serve = Arguments::parse_from([ + "pool-indexer", + "--config", + "/tmp/c.toml", + "--bootstrap-only", + "false", + ]); + assert!(!explicit_serve.bootstrap_only); + } +} diff --git a/crates/pool-indexer/src/lib.rs b/crates/pool-indexer/src/lib.rs index 7a12692bfb..ab36f57324 100644 --- a/crates/pool-indexer/src/lib.rs +++ b/crates/pool-indexer/src/lib.rs @@ -1,5 +1,5 @@ pub mod config; -pub use run::{run, start}; +pub use run::{bootstrap, run, start}; mod api; mod arguments; diff --git a/crates/pool-indexer/src/run.rs b/crates/pool-indexer/src/run.rs index b9ce460fcf..60f32fb0f5 100644 --- a/crates/pool-indexer/src/run.rs +++ b/crates/pool-indexer/src/run.rs @@ -21,8 +21,47 @@ pub async fn start(args: impl Iterator) { initialize_observability(&args); observe::metrics::setup_registry(None, None); let config = Configuration::from_path(&args.config).expect("failed to load configuration"); - tracing::info!("pool-indexer starting"); - run(config).await; + if args.bootstrap_only { + tracing::info!("pool-indexer bootstrap-only starting"); + bootstrap(config).await; + tracing::info!("pool-indexer bootstrap complete, exiting"); + } else { + tracing::info!("pool-indexer starting"); + run(config).await; + } +} + +/// Runs the bootstrap phase (seed + catch-up to the finalized head) for every +/// factory, then returns. 
Binds no HTTP ports — meant to run as a separate step +/// ahead of serving. +/// +/// Idempotent: each factory with an existing checkpoint is skipped (see +/// [`bootstrap_factory`]), so re-running on an already-seeded DB is a fast +/// no-op that never touches the subgraph. On return, a subsequent `run` finds +/// the checkpoints present and flips `/startup` ready almost immediately. +pub async fn bootstrap(config: Configuration) { + let db = connect_db(&config).await; + let network = config.network; + let provider = build_provider_checked(&network).await; + let network = Arc::new(network); + + // Seed every factory concurrently, like the serve path. + let mut factory_set = JoinSet::new(); + for factory in network.factories.iter().copied() { + let indexer = UniswapV3Indexer::new( + provider.clone(), + db.clone(), + &network.indexer_config(factory.address), + ); + let db = db.clone(); + let network = network.clone(); + factory_set.spawn(async move { + bootstrap_factory(&db, &indexer, &network, &factory).await; + }); + } + while let Some(result) = factory_set.join_next().await { + result.expect("bootstrap task panicked"); + } } pub async fn run(config: Configuration) { @@ -125,20 +164,7 @@ async fn run_network_indexer(db: PgPool, network: NetworkConfig, barrier: Arc AlloyProvider { .clone() } +/// Builds the RPC provider and asserts the node's chain_id matches config. +/// Catches misconfigured RPC-vs-network pairings (e.g. chain_id=1 pointed at +/// an Arbitrum node) before we index the wrong chain into the DB. 
+async fn build_provider_checked(network: &NetworkConfig) -> AlloyProvider { + let provider = build_provider(network); + let actual_chain_id = provider + .get_chain_id() + .await + .expect("failed to fetch chain_id from RPC"); + assert_eq!( + actual_chain_id, network.chain_id, + "chain_id mismatch for network {}: config says {}, RPC reports {}", + network.name, network.chain_id, actual_chain_id, + ); + provider +} + async fn connect_db(config: &Configuration) -> sqlx::PgPool { PgPoolOptions::new() .max_connections(config.database.max_connections.get())