Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 51 additions & 5 deletions crates/e2e/tests/e2e/pool_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,8 @@ async fn seed_checkpoint(db: &PgPool, factory: Address, block: u64) {
.unwrap();
}

/// Spawns the pool-indexer task and waits for its `/health` endpoint to come
/// up.
async fn spawn_pool_indexer(factory: Address, metrics_port: u16) -> tokio::task::JoinHandle<()> {
let config = Configuration {
fn pool_indexer_config(factory: Address, metrics_port: u16) -> Configuration {
Configuration {
database: DatabaseConfig {
url: POOL_INDEXER_DB_URL.parse().unwrap(),
max_connections: NonZeroU32::new(5).unwrap(),
Expand All @@ -217,7 +215,13 @@ async fn spawn_pool_indexer(factory: Address, metrics_port: u16) -> tokio::task:
metrics: MetricsConfig {
bind_address: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, metrics_port)),
},
};
}
}

/// Spawns the pool-indexer task and waits for its `/health` endpoint to come
/// up.
async fn spawn_pool_indexer(factory: Address, metrics_port: u16) -> tokio::task::JoinHandle<()> {
let config = pool_indexer_config(factory, metrics_port);
let handle = tokio::task::spawn(pool_indexer::run(config));
wait_for_condition(TIMEOUT, || async {
reqwest::get(format!("{POOL_INDEXER_HOST}/health"))
Expand Down Expand Up @@ -647,3 +651,45 @@ async fn pagination(web3: Web3) {
})
.await;
}

/// Test entry point for the bootstrap idempotency scenario.
///
/// Marked `#[ignore]`, so a plain `cargo test` skips it; it only runs when
/// ignored tests are requested explicitly. The scenario body lives in
/// `bootstrap_idempotent` and is handed to `run_test`, which supplies its
/// `Web3` argument (presumably backed by a local node, per the test name —
/// confirm against `run_test`).
#[tokio::test]
#[ignore]
async fn local_node_pool_indexer_bootstrap_idempotent() {
run_test(bootstrap_idempotent).await;
}

/// Re-running bootstrap against a database that already holds a checkpoint
/// must return promptly and leave that checkpoint untouched — the situation a
/// bootstrap initContainer is in when its pod restarts. The subgraph seeder is
/// unreachable in this setup, so the call must detect the existing checkpoint
/// and return without seeding, and without binding any ports.
async fn bootstrap_idempotent(web3: Web3) {
    let db = PgPool::connect(POOL_INDEXER_DB_URL).await.unwrap();
    clear_pool_indexer_tables(&db).await;

    // Writing a checkpoint by hand is what marks the DB as "already
    // bootstrapped". The factory address is a dummy: nothing has to exist
    // on-chain because bootstrap reads the checkpoint and returns before any
    // seeding or catch-up. The RPC is only used for the chain_id sanity check.
    let factory = Address::repeat_byte(0x11);
    let seeded_block = web3.provider.get_block_number().await.unwrap();
    seed_checkpoint(&db, factory, seeded_block).await;

    // Bound the call so a bootstrap that hangs fails the test instead of
    // stalling it.
    let config = pool_indexer_config(factory, POOL_INDEXER_METRICS_PORT);
    let outcome = tokio::time::timeout(TIMEOUT, pool_indexer::bootstrap(config)).await;
    outcome.expect("bootstrap-only did not exit on an already-seeded DB");

    // The stored checkpoint must still be the block seeded above.
    let stored_block: i64 = sqlx::query_scalar(
        "SELECT block_number FROM pool_indexer_checkpoints WHERE contract_address = $1",
    )
    .bind(factory.as_slice())
    .fetch_one(&db)
    .await
    .unwrap();
    assert_eq!(
        stored_block,
        seeded_block.cast_signed(),
        "bootstrap mutated an already-seeded checkpoint"
    );
}
36 changes: 32 additions & 4 deletions crates/pool-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,27 @@ at a fixed block, catches up to the chain tip via RPC events, then stays
live by polling new blocks. Drivers consume it via the `pool-indexer-url`
field in their Uniswap V3 liquidity config.

## Bootstrap and serve

Startup has two phases:

- **Bootstrap** — initial subgraph seed plus catch-up to the finalized head.
One-time and slow (minutes on a large chain).
- **Serve** — live block polling and the HTTP API. No long startup cost.

`pool-indexer --config <toml>` runs both in one process: it bootstraps when the
DB has no checkpoint, then serves. This is the single-container deployment.

`pool-indexer --bootstrap-only true --config <toml>` runs only the bootstrap
phase and then exits 0, binding no HTTP ports. It is **idempotent**: on a DB
that already has a checkpoint it skips the seed and catch-up entirely (never
touching the subgraph) and returns immediately, so re-running it is a fast, safe
no-op.

Running bootstrap as a separate step ahead of serving keeps serve startup fast:
the serve process finds the checkpoint already present and flips `/startup` ready
almost immediately.

## Running locally

Create `crates/pool-indexer/config.local.toml` first (schema = the
Expand All @@ -17,12 +38,19 @@ Create `crates/pool-indexer/config.local.toml` first (schema = the
sections. String fields accept `%ENV_VAR` so secrets can come from the
environment instead of being written into the file.

Then, from the repository root, reset the local stack and start the indexer:
The indexer uses its own database, migrated from `database/sql-pool-indexer`
(separate from the shared autopilot/orderbook set in `database/sql`). From the
repository root:

```bash
# wipes the local DB — dev machines only
docker compose down --volumes
docker compose up -d db
docker compose run --rm migrations
# create + migrate the indexer's own database via flyway (database/sql-pool-indexer)
docker compose up migrations-pool-indexer

# one process for both phases:
cargo run --release -p pool-indexer -- --config crates/pool-indexer/config.local.toml

# or split bootstrap from serve:
cargo run --release -p pool-indexer -- --bootstrap-only true --config crates/pool-indexer/config.local.toml
cargo run --release -p pool-indexer -- --config crates/pool-indexer/config.local.toml
```
37 changes: 37 additions & 0 deletions crates/pool-indexer/src/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,43 @@ pub struct Arguments {
#[clap(long, env)]
pub config: PathBuf,

/// Run only the bootstrap phase (initial seed + catch-up to the finalized
/// head), then exit; bind no HTTP ports. Idempotent: a fast no-op when the
/// DB is already seeded. Lets the slow seed run as a separate step ahead of
/// serving.
#[clap(long, env, action = clap::ArgAction::Set, default_value_t = false)]
pub bootstrap_only: bool,

#[clap(flatten)]
pub logging: LoggingArguments,
}

#[cfg(test)]
mod tests {
    use {super::*, clap::Parser};

    /// Parses a `pool-indexer` command line made of the mandatory `--config`
    /// argument followed by `extra`.
    fn parse(extra: &[&str]) -> Arguments {
        let base = ["pool-indexer", "--config", "/tmp/c.toml"];
        Arguments::parse_from(base.iter().chain(extra).copied())
    }

    #[test]
    fn bootstrap_only_flag_parses() {
        // Flag omitted: falls back to the default, i.e. the serve path.
        let serve = parse(&[]);
        assert!(!serve.bootstrap_only);

        // Flag given with an explicit `true` value.
        let bootstrap = parse(&["--bootstrap-only", "true"]);
        assert!(bootstrap.bootstrap_only);

        // ArgAction::Set takes a value, so it can be explicitly turned off too.
        let explicit_serve = parse(&["--bootstrap-only", "false"]);
        assert!(!explicit_serve.bootstrap_only);
    }
}
2 changes: 1 addition & 1 deletion crates/pool-indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod config;
pub use run::{run, start};
pub use run::{bootstrap, run, start};

mod api;
mod arguments;
Expand Down
75 changes: 59 additions & 16 deletions crates/pool-indexer/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,47 @@ pub async fn start(args: impl Iterator<Item = String>) {
initialize_observability(&args);
observe::metrics::setup_registry(None, None);
let config = Configuration::from_path(&args.config).expect("failed to load configuration");
tracing::info!("pool-indexer starting");
run(config).await;
if args.bootstrap_only {
tracing::info!("pool-indexer bootstrap-only starting");
bootstrap(config).await;
tracing::info!("pool-indexer bootstrap complete, exiting");
} else {
tracing::info!("pool-indexer starting");
run(config).await;
}
}

/// Runs the bootstrap phase (seed + catch-up to the finalized head) for every
/// factory, then returns. Binds no HTTP ports — meant to run as a separate step
/// ahead of serving.
///
/// Idempotent: each factory with an existing checkpoint is skipped (see
/// [`bootstrap_factory`]), so re-running on an already-seeded DB is a fast
/// no-op that never touches the subgraph. On return, a subsequent `run` finds
/// the checkpoints present and flips `/startup` ready almost immediately.
///
/// # Panics
///
/// Panics if the RPC chain-id check in [`build_provider_checked`] fails, or if
/// any per-factory bootstrap task panics.
pub async fn bootstrap(config: Configuration) {
    let db = connect_db(&config).await;
    let network = config.network;
    let provider = build_provider_checked(&network).await;
    // Shared by reference count so every spawned task can own a handle.
    let network = Arc::new(network);

    // Seed every factory concurrently, like the serve path.
    let mut factory_set = JoinSet::new();
    for factory in network.factories.iter().copied() {
        let indexer = UniswapV3Indexer::new(
            provider.clone(),
            db.clone(),
            &network.indexer_config(factory.address),
        );
        let db = db.clone();
        let network = network.clone();
        factory_set.spawn(async move {
            bootstrap_factory(&db, &indexer, &network, &factory).await;
        });
    }

    // Wait for every factory; a panic inside any task is surfaced here rather
    // than being silently dropped with the join handle.
    while let Some(result) = factory_set.join_next().await {
        result.expect("bootstrap task panicked");
    }
}

pub async fn run(config: Configuration) {
Expand Down Expand Up @@ -125,20 +164,7 @@ async fn run_network_indexer(db: PgPool, network: NetworkConfig, barrier: Arc<St
"starting network indexer",
);

let provider = build_provider(&network);

// Catch misconfigured RPC-vs-network pairings (e.g. chain_id=1 pointed
// at an Arbitrum node) before we index the wrong chain into the DB.
let actual_chain_id = provider
.get_chain_id()
.await
.expect("failed to fetch chain_id from RPC");
assert_eq!(
actual_chain_id, network.chain_id,
"chain_id mismatch for network {}: config says {}, RPC reports {}",
network.name, network.chain_id, actual_chain_id,
);

let provider = build_provider_checked(&network).await;
let network = Arc::new(network);

// One task per factory. Provider + DB pool are shared; checkpoints are
Expand Down Expand Up @@ -255,6 +281,23 @@ fn build_provider(network: &NetworkConfig) -> AlloyProvider {
.clone()
}

/// Creates the RPC provider for `network` and verifies that the node behind it
/// reports the chain id the configuration expects.
///
/// This guards against a mismatched RPC/network pairing (e.g. chain_id=1
/// pointed at an Arbitrum node) before anything from the wrong chain is
/// indexed into the DB.
///
/// # Panics
///
/// Panics if the chain id cannot be fetched from the RPC, or if it differs
/// from `network.chain_id`.
async fn build_provider_checked(network: &NetworkConfig) -> AlloyProvider {
    let provider = build_provider(network);

    let reported = provider.get_chain_id().await;
    let actual_chain_id = reported.expect("failed to fetch chain_id from RPC");

    let expected_chain_id = network.chain_id;
    assert_eq!(
        actual_chain_id, expected_chain_id,
        "chain_id mismatch for network {}: config says {}, RPC reports {}",
        network.name, expected_chain_id, actual_chain_id,
    );

    provider
}

async fn connect_db(config: &Configuration) -> sqlx::PgPool {
PgPoolOptions::new()
.max_connections(config.database.max_connections.get())
Expand Down
Loading