diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 6690a29c517..e83e03ed2a0 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -219,6 +219,14 @@ pub trait SubgraphStore: Send + Sync + 'static { /// being set up async fn least_block_ptr(&self, id: &DeploymentHash) -> Result, StoreError>; + /// Return the earliest block for which the deployment with this `id` + /// retains complete data. For unpruned deployments this is the start + /// block; for pruned deployments it advances as historical entity + /// versions are removed. Blocks earlier than this are not queryable + /// and cannot be used as a graft point because the entity versions + /// live at that block are no longer present. + async fn earliest_block_number(&self, id: &DeploymentHash) -> Result; + async fn is_healthy(&self, id: &DeploymentHash) -> Result; /// Find all deployment locators for the subgraph with the given hash. diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index 5717f7ad273..fda11f7e115 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -547,7 +547,23 @@ pub struct Graft { } impl Graft { - async fn validate( + /// Validate that this `Graft` can be performed against the configured + /// base subgraph. Checks (in order): + /// 1. the base has processed at least one block, + /// 2. the graft block is at or above the base's `earliest_block_number` + /// (the prune floor — grafting below it would silently produce a + /// subgraph with reset state for any entity whose live-at-graft + /// version was a closed historical version), + /// 3. the base has processed past `self.block` (i.e., its head is at + /// or above the graft block), + /// 4. the graft block is at least `reorg_threshold` blocks behind the + /// base's head, so a reorg of the base cannot invalidate the copy, + /// 5. if the base is unhealthy, the graft block is strictly before its + /// failure block. + /// + /// `pub` so that tooling and tests can validate a graft directly + /// without resolving a full `UnvalidatedSubgraphManifest`. + pub async fn validate( &self, store: Arc, ) -> Result<(), SubgraphManifestValidationError> { @@ -562,6 +578,31 @@ impl Graft { .await .map_err(|e| GraftBaseInvalid(e.to_string()))?; + // Reject grafts below the base's `earliest_block_number` (its prune + // floor). If the base has been pruned past `self.block`, the entity + // versions that were live at the graft block are gone, and grafting + // would silently produce a subgraph with reset state for any entity + // whose live-at-graft version was a closed historical version + // (heavily-updated mutable singletons are the worst-affected case). + // We only consult `earliest_block_number` when the base has + // processed at least one block, since the `(None, _)` arm below + // emits a clearer error otherwise. + if last_processed_block.is_some() { + let earliest_block = store + .earliest_block_number(&self.base) + .await + .map_err(|e| GraftBaseInvalid(e.to_string()))?; + if self.block < earliest_block { + return Err(GraftBaseInvalid(format!( + "failed to graft onto `{}` at block {} because the base \ + subgraph only retains data starting at block {}; earlier \ + blocks have been pruned. Graft at block {} or later, or \ + use a base subgraph with sufficient retention.", + self.base, self.block, earliest_block, earliest_block + ))); + } + } + // We are being defensive here: we don't know which specific // instance of a subgraph we will use as the base for the graft, // since the notion of which of these instances is active can change diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index d764d451ad6..22e16049155 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1562,12 +1562,41 @@ impl DeploymentStore { dst.catalog.site.namespace ); + // Defense in depth against grafting below the base's prune + // floor. `Graft::validate` enforces this at manifest validation + // time, but callers that reach this code path directly + // (graphman, tests, custom tooling) bypass the registrar. The + // copy reads entity versions whose `lower(block_range) <= block`; + // if the version live at `block` has already been pruned, those + // versions are silently missing and the resulting graft has + // reset state for any entity whose live-at-graft version was + // closed. + let src_earliest_before = { + let mut conn = self.pool.get_permitted().await?; + deployment::state(&mut conn, &src.site) + .await? + .earliest_block_number + }; + if block.number < src_earliest_before { + return Err(StoreError::Unknown(anyhow::anyhow!( + "cannot graft `{}` onto `{}` at block {} because the base \ + subgraph only retains data starting at block {}; earlier \ + blocks have been pruned. Graft at block {} or later, or \ + use a base subgraph with sufficient retention.", + dst.catalog.site.namespace, + src.catalog.site.namespace, + block.number, + src_earliest_before, + src_earliest_before, + ))); + } + let src_manifest_idx_and_name = src_deployment.manifest.template_idx_and_name()?; - let dst_manifest_idx_and_name = self - .load_deployment(dst.site.clone()) - .await? - .manifest - .template_idx_and_name()?; + // Keep `dst_deployment` bound so we can read its + // `history_blocks` below; otherwise the chained + // `.manifest.template_idx_and_name()?` would consume it. + let dst_deployment = self.load_deployment(dst.site.clone()).await?; + let dst_manifest_idx_and_name = dst_deployment.manifest.template_idx_and_name()?; // Copy subgraph data // We allow both not copying tables at all from the source, as well @@ -1644,10 +1673,25 @@ impl DeploymentStore { info!(logger, "Rewound subgraph to block {}", block.number; "time_ms" => start.elapsed().as_millis()); + // Use the *maximum* of the source's and destination's + // `history_blocks` rather than overwriting the destination + // with the source's value. The destination has just + // received the source's full retained history, so its + // retention must be at least as long as the source's + // (otherwise the inherited data would be immediately + // eligible for pruning); but if the destination's + // manifest requests longer retention (notably + // `prune: never`, which yields `BLOCK_NUMBER_MAX`), that + // intent must be honoured. The previous unconditional + // overwrite silently downgraded `prune: never` children + // to whatever retention the parent used. deployment::set_history_blocks( conn, &dst.site, - src_deployment.manifest.history_blocks, + src_deployment + .manifest + .history_blocks + .max(dst_deployment.manifest.history_blocks), ) .await?; @@ -1660,6 +1704,39 @@ impl DeploymentStore { // removed data just before we copied it deployment::copy_earliest_block(conn, &src.site, &dst.site).await?; + // Detect the prune-during-copy race: if the base pruned + // past our graft block while the copy was running, the + // `copy_earliest_block` call above propagated that floor + // into `dst`, leaving us with `earliest_block > graft_block` + // — a structurally invalid graft. Fail the transaction so + // the bookkeeping is rolled back; partial copied data + // remains in `dst`'s tables and will require operator + // cleanup (e.g. delete + redeploy) before retrying. + // + // We re-read the source's earliest rather than the + // destination's because the destination row exists but has + // not yet been advanced past genesis (`forward_block_ptr` + // below is what sets its head), so `deployment::state` on + // the destination would error with "has not started + // syncing yet". The source has a head and is the value + // `copy_earliest_block` just propagated. + let src_earliest_after = deployment::state(conn, &src.site) + .await? + .earliest_block_number; + if src_earliest_after > block.number { + return Err(StoreError::Unknown(anyhow::anyhow!( + "graft of `{}` at block {} was invalidated mid-copy: \ + the base subgraph `{}` was pruned past the graft block \ + (earliest is now {}). The deployment must be deleted \ + and recreated, ideally targeting a graft block within \ + the base's retained range.", + dst.catalog.site.namespace, + block.number, + src.catalog.site.namespace, + src_earliest_after, + ))); + } + // Set the block ptr to the graft point to signal that we successfully // performed the graft crate::deployment::forward_block_ptr(conn, &dst.site, &block).await?; diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 8c268562aa7..466696ba07d 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -1520,6 +1520,18 @@ impl Inner { .await } + /// Return the current `history_blocks` retention setting recorded in + /// the deployment's manifest. Mirrors [`Self::set_history_blocks`] and + /// is useful for tooling and tests that need to inspect the effective + /// retention after operations such as grafting. + pub async fn history_blocks( + &self, + deployment: &DeploymentLocator, + ) -> Result { + let site = self.find_site(deployment.id.into()).await?; + Ok(self.load_deployment(site).await?.manifest.history_blocks) + } + pub async fn load_deployment( &self, site: Arc, @@ -1888,6 +1900,12 @@ impl SubgraphStoreTrait for SubgraphStore { store.block_ptr(site.cheap_clone()).await } + async fn earliest_block_number(&self, id: &DeploymentHash) -> Result { + let (store, site) = self.store(id).await?; + let state = store.deployment_state(site.cheap_clone()).await?; + Ok(state.earliest_block_number) + } + async fn is_healthy(&self, id: &DeploymentHash) -> Result { let (store, site) = self.store(id).await?; let health = store.health(&site).await?; diff --git a/store/test-store/tests/postgres/graft.rs b/store/test-store/tests/postgres/graft.rs index 6536fc3e06c..a11cf80b3e0 100644 --- a/store/test-store/tests/postgres/graft.rs +++ b/store/test-store/tests/postgres/graft.rs @@ -670,3 +670,198 @@ fn prune() { }) } } + +/// `Graft::validate` rejects a graft block that is below the base +/// subgraph's `earliest_block_number` (i.e. into already-pruned history). +/// +/// This is the manifest-level check: it is what stops a normal `subgraph +/// deploy` from registering a graft that cannot be performed correctly. +/// Defense in depth for callers that bypass the registrar (`test_store`, +/// graphman, etc.) is exercised by `graft_store_path_rejects_below_prune_floor` +/// (which goes straight to the store/copy path). +#[test] +fn graft_validate_rejects_below_prune_floor() { + struct Progress; + impl PruneReporter for Progress {} + + run_test(|store, src| async move { + // Set up the same pruned state as `graft_store_path_rejects_below_prune_floor`: + // close user 2's block-1 version by updating at block 3, then prune + // with `history_blocks = 2` over `[0, 6]` so `earliest_block = 4`. + let user2_v2 = create_test_entity( + "2", + USER, + "Cindini", + "dinici@email.com", + 44_i32, + 157.1, + true, + Some("red"), + 4, + ); + transact_and_wait(&store, &src, BLOCKS[3].clone(), vec![user2_v2]) + .await + .unwrap(); + transact_and_wait(&store, &src, BLOCKS[6].clone(), vec![]) + .await + .unwrap(); + let req = PruneRequest::new(&src, 2, 1, 0, 6)?; + store + .prune(Box::new(Progress), &src, req) + .await + .expect("pruning works"); + + // Asking `Graft::validate` to graft at block 2 (below earliest_block = 4) + // must yield `GraftBaseInvalid` with an actionable message. + let graft = Graft { + base: src.hash.clone(), + block: 2, + }; + let err = graft + .validate(store.cheap_clone()) + .await + .expect_err("graft below the prune floor must be rejected"); + let msg = err.to_string(); + assert!( + msg.contains("only retains data starting at block 4"), + "expected prune-floor error mentioning earliest block 4, got: {msg}" + ); + + // Sanity: the new check is bounded — grafting *at* the prune floor + // (block == earliest) must not trigger the prune-floor rejection. + // The graft may still fail for unrelated reasons (e.g. reorg + // threshold), but never with the prune-floor message. + let graft_at_floor = Graft { + base: src.hash.clone(), + block: 4, + }; + if let Err(err) = graft_at_floor.validate(store.cheap_clone()).await { + let msg = err.to_string(); + assert!( + !msg.contains("only retains data starting"), + "graft at the prune floor must not trigger the prune-floor \ + rejection; got: {msg}" + ); + } + Ok(()) + }) +} + +/// The store-level graft path (`DeploymentStore::start_subgraph`) rejects a +/// graft whose block is below the base subgraph's `earliest_block_number`. +/// +/// This is the defense-in-depth check that complements +/// [`graft_validate_rejects_below_prune_floor`]: it covers callers that +/// reach the store path without going through `Graft::validate` (graphman, +/// custom deploy tooling, and `test_store::create_subgraph` itself). Without +/// this check, the copy reads entity versions where +/// `lower(block_range) <= block` and silently misses the version live at +/// `block` whenever pruning removed it, leaving heavily-updated mutable +/// entities reset to their default state in the grafted subgraph. +#[test] +fn graft_store_path_rejects_below_prune_floor() { + struct Progress; + impl PruneReporter for Progress {} + + run_test(|store, src| async move { + // Same pruned-base setup as `graft_validate_rejects_below_prune_floor`: + // update user 2 at block 3 so its live-at-block-2 version `[1,3)` + // becomes a closed historical version, then prune with + // `history_blocks = 2` over `[0, 6]` so `earliest_block = 4`. + let user2_v2 = create_test_entity( + "2", + USER, + "Cindini", + "dinici@email.com", + 44_i32, + 157.1, + true, + Some("red"), + 4, + ); + transact_and_wait(&store, &src, BLOCKS[3].clone(), vec![user2_v2]) + .await + .unwrap(); + transact_and_wait(&store, &src, BLOCKS[6].clone(), vec![]) + .await + .unwrap(); + let req = PruneRequest::new(&src, 2, 1, 0, 6)?; + store + .prune(Box::new(Progress), &src, req) + .await + .expect("pruning works"); + + // Grafting at block 2 must be rejected even though + // `test_store::create_subgraph` bypasses `Graft::validate`. The + // store-level pre-copy check fails the start with an actionable + // error before any copy work happens. + let graft_id = DeploymentHash::new("grafted_below_floor").unwrap(); + let err = + create_grafted_subgraph(&graft_id, GRAFT_GQL, src.hash.as_str(), BLOCKS[2].clone()) + .await + .expect_err("graft below prune floor must be rejected at the store layer"); + let msg = err.to_string(); + assert!( + msg.contains("only retains data starting at block 4"), + "expected prune-floor error mentioning earliest block 4, got: {msg}" + ); + + // Grafting at the prune floor (block 4) is still allowed: the live + // versions there survived pruning and the copy can use them. + let graft_ok = DeploymentHash::new("grafted_at_floor").unwrap(); + create_grafted_subgraph(&graft_ok, GRAFT_GQL, src.hash.as_str(), BLOCKS[4].clone()) + .await + .expect("graft at earliest_block must be accepted"); + + Ok(()) + }) +} + +/// Grafting must not silently shorten the destination's `history_blocks` +/// retention. +/// +/// Before the fix, `start_subgraph` unconditionally wrote the source +/// subgraph's `history_blocks` into the destination, so a `prune: never` +/// child grafted from a `prune: auto` parent silently inherited the +/// parent's aggressive retention window. The fix takes the *maximum* of +/// the two, so the destination keeps at least as much history as it has +/// (the inherited copied data) but is never downgraded from a longer +/// retention it explicitly requested. +/// +/// The test setup matches that scenario: the destination is built by +/// `test_store::create_subgraph`, whose manifest has `indexer_hints: None` +/// and therefore reports `history_blocks = BLOCK_NUMBER_MAX` (the +/// `prune: never` equivalent), while we manually lower the source's +/// retention so old / new behaviour are observably different. +#[test] +fn graft_history_blocks_takes_max_of_parent_and_child() { + run_test(|store, src| async move { + // Lower the source's `history_blocks` to a small value so that + // the "max vs overwrite" outcomes diverge measurably. + let src_hb = 2; + let reorg_threshold = 1; + store + .set_history_blocks(&src, src_hb, reorg_threshold) + .await + .expect("lowering source history_blocks"); + + let graft_id = DeploymentHash::new("grafted_history_blocks").unwrap(); + let dst = + create_grafted_subgraph(&graft_id, GRAFT_GQL, src.hash.as_str(), BLOCKS[1].clone()) + .await + .expect("graft succeeds"); + + let dst_hb = store + .history_blocks(&dst) + .await + .expect("read dst history_blocks"); + assert_eq!( + dst_hb, BLOCK_NUMBER_MAX, + "graft must keep the destination's longer retention \ + (BLOCK_NUMBER_MAX = {BLOCK_NUMBER_MAX}) rather than overwriting it \ + with the source's smaller value ({src_hb})" + ); + + Ok(()) + }) +}