From 55003b91472a55812e11cdd9a1d4f39b7d0e1988 Mon Sep 17 00:00:00 2001 From: rslowinski Date: Tue, 2 Jun 2026 16:48:24 +0200 Subject: [PATCH 1/3] graph, store: validate graft block against base subgraph's earliest_block MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reject grafts whose block is below the base subgraph's earliest_block_number. When the base has been pruned past the requested graft block, the versions of mutable entities that were live at the graft block have been removed (closed versions with upper(block_range) <= earliest_block are eligible for pruning). The copy then reads versions where lower(block_range) <= graft_block and finds none for those entities, leaving them absent in the destination; the post-copy re-open (RevertClampQuery) has no version to re-open. Entities whose live-at-graft version is still open (never updated again, hence never pruned) and all immutable entities are unaffected — only heavily-updated mutable singletons are silently reset to their default state. Add SubgraphStore::earliest_block_number and use it in Graft::validate to reject the manifest before deployment registration. Make Graft::validate `pub` so tooling and tests can validate a graft directly without resolving a full UnvalidatedSubgraphManifest. --- graph/src/components/store/traits.rs | 8 +++ graph/src/data/subgraph/mod.rs | 43 +++++++++++++- store/postgres/src/subgraph_store.rs | 6 ++ store/test-store/tests/postgres/graft.rs | 76 ++++++++++++++++++++++++ 4 files changed, 132 insertions(+), 1 deletion(-) diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 6690a29c517..e83e03ed2a0 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -219,6 +219,14 @@ pub trait SubgraphStore: Send + Sync + 'static { /// being set up async fn least_block_ptr(&self, id: &DeploymentHash) -> Result, StoreError>; + /// Return the earliest block for which the deployment with this `id` + /// retains complete data. For unpruned deployments this is the start + /// block; for pruned deployments it advances as historical entity + /// versions are removed. Blocks earlier than this are not queryable + /// and cannot be used as a graft point because the entity versions + /// live at that block are no longer present. + async fn earliest_block_number(&self, id: &DeploymentHash) -> Result; + async fn is_healthy(&self, id: &DeploymentHash) -> Result; /// Find all deployment locators for the subgraph with the given hash. diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index 5717f7ad273..fda11f7e115 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -547,7 +547,23 @@ pub struct Graft { } impl Graft { - async fn validate( + /// Validate that this `Graft` can be performed against the configured + /// base subgraph. Checks (in order): + /// 1. the base has processed at least one block, + /// 2. the graft block is at or above the base's `earliest_block_number` + /// (the prune floor — grafting below it would silently produce a + /// subgraph with reset state for any entity whose live-at-graft + /// version was a closed historical version), + /// 3. the base has processed past `self.block` (i.e., its head is at + /// or above the graft block), + /// 4. the graft block is at least `reorg_threshold` blocks behind the + /// base's head, so a reorg of the base cannot invalidate the copy, + /// 5. if the base is unhealthy, the graft block is strictly before its + /// failure block. + /// + /// `pub` so that tooling and tests can validate a graft directly + /// without resolving a full `UnvalidatedSubgraphManifest`. + pub async fn validate( &self, store: Arc, ) -> Result<(), SubgraphManifestValidationError> { @@ -562,6 +578,31 @@ impl Graft { .await .map_err(|e| GraftBaseInvalid(e.to_string()))?; + // Reject grafts below the base's `earliest_block_number` (its prune + // floor). If the base has been pruned past `self.block`, the entity + // versions that were live at the graft block are gone, and grafting + // would silently produce a subgraph with reset state for any entity + // whose live-at-graft version was a closed historical version + // (heavily-updated mutable singletons are the worst-affected case). + // We only consult `earliest_block_number` when the base has + // processed at least one block, since the `(None, _)` arm below + // emits a clearer error otherwise. + if last_processed_block.is_some() { + let earliest_block = store + .earliest_block_number(&self.base) + .await + .map_err(|e| GraftBaseInvalid(e.to_string()))?; + if self.block < earliest_block { + return Err(GraftBaseInvalid(format!( + "failed to graft onto `{}` at block {} because the base \ + subgraph only retains data starting at block {}; earlier \ + blocks have been pruned. Graft at block {} or later, or \ + use a base subgraph with sufficient retention.", + self.base, self.block, earliest_block, earliest_block + ))); + } + } + // We are being defensive here: we don't know which specific // instance of a subgraph we will use as the base for the graft, // since the notion of which of these instances is active can change diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 8c268562aa7..b4435cae0cb 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -1888,6 +1888,12 @@ impl SubgraphStoreTrait for SubgraphStore { store.block_ptr(site.cheap_clone()).await } + async fn earliest_block_number(&self, id: &DeploymentHash) -> Result { + let (store, site) = self.store(id).await?; + let state = store.deployment_state(site.cheap_clone()).await?; + Ok(state.earliest_block_number) + } + async fn is_healthy(&self, id: &DeploymentHash) -> Result { let (store, site) = self.store(id).await?; let health = store.health(&site).await?; diff --git a/store/test-store/tests/postgres/graft.rs b/store/test-store/tests/postgres/graft.rs index 6536fc3e06c..c9417d84ada 100644 --- a/store/test-store/tests/postgres/graft.rs +++ b/store/test-store/tests/postgres/graft.rs @@ -670,3 +670,79 @@ fn prune() { }) } } + +/// `Graft::validate` rejects a graft block that is below the base +/// subgraph's `earliest_block_number` (i.e. into already-pruned history). +/// +/// This is the manifest-level check: it is what stops a normal `subgraph +/// deploy` from registering a graft that cannot be performed correctly. +/// Defense in depth for callers that bypass the registrar (`test_store`, +/// graphman, etc.) is exercised by `graft_store_path_rejects_below_prune_floor` +/// (which goes straight to the store/copy path). +#[test] +fn graft_validate_rejects_below_prune_floor() { + struct Progress; + impl PruneReporter for Progress {} + + run_test(|store, src| async move { + // Set up the same pruned state as `graft_store_path_rejects_below_prune_floor`: + // close user 2's block-1 version by updating at block 3, then prune + // with `history_blocks = 2` over `[0, 6]` so `earliest_block = 4`. + let user2_v2 = create_test_entity( + "2", + USER, + "Cindini", + "dinici@email.com", + 44_i32, + 157.1, + true, + Some("red"), + 4, + ); + transact_and_wait(&store, &src, BLOCKS[3].clone(), vec![user2_v2]) + .await + .unwrap(); + transact_and_wait(&store, &src, BLOCKS[6].clone(), vec![]) + .await + .unwrap(); + let req = PruneRequest::new(&src, 2, 1, 0, 6)?; + store + .prune(Box::new(Progress), &src, req) + .await + .expect("pruning works"); + + // Asking `Graft::validate` to graft at block 2 (below earliest_block = 4) + // must yield `GraftBaseInvalid` with an actionable message. + let graft = Graft { + base: src.hash.clone(), + block: 2, + }; + let err = graft + .validate(store.cheap_clone()) + .await + .expect_err("graft below the prune floor must be rejected"); + let msg = err.to_string(); + assert!( + msg.contains("only retains data starting at block 4"), + "expected prune-floor error mentioning earliest block 4, got: {msg}" + ); + + // Sanity: the new check is bounded — grafting *at* the prune floor + // (block == earliest) must not trigger the prune-floor rejection. + // The graft may still fail for unrelated reasons (e.g. reorg + // threshold), but never with the prune-floor message. + let graft_at_floor = Graft { + base: src.hash.clone(), + block: 4, + }; + if let Err(err) = graft_at_floor.validate(store.cheap_clone()).await { + let msg = err.to_string(); + assert!( + !msg.contains("only retains data starting"), + "graft at the prune floor must not trigger the prune-floor \ + rejection; got: {msg}" + ); + } + Ok(()) + }) +} From eb7bebeb683defc2734f82cc110b9251ad69f8d8 Mon Sep 17 00:00:00 2001 From: rslowinski Date: Wed, 3 Jun 2026 15:54:26 +0200 Subject: [PATCH 2/3] store: enforce graft prune-floor invariant in start_subgraph MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a pre-copy check rejecting graft.block < src.earliest_block_number, mirroring the manifest-level check in Graft::validate. Callers that reach DeploymentStore::start_subgraph directly (graphman, custom tooling, test-store helpers) bypass the registrar and therefore the manifest validator; the store-level check guarantees the invariant is enforced for every graft. Also add a post-copy check that fails the transaction if the source's earliest_block_number advanced past the graft block while the copy was running — the prune-during-copy race that copy_earliest_block already acknowledges. Partial copied data remains in the destination's tables and must be cleaned up by deleting and recreating the deployment; document this in the error message. --- store/postgres/src/deployment_store.rs | 62 +++++++++++++++++++++ store/test-store/tests/postgres/graft.rs | 70 ++++++++++++++++++++++++ 2 files changed, 132 insertions(+) diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index d764d451ad6..456abb1be73 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1562,6 +1562,35 @@ impl DeploymentStore { dst.catalog.site.namespace ); + // Defense in depth against grafting below the base's prune + // floor. `Graft::validate` enforces this at manifest validation + // time, but callers that reach this code path directly + // (graphman, tests, custom tooling) bypass the registrar. The + // copy reads entity versions whose `lower(block_range) <= block`; + // if the version live at `block` has already been pruned, those + // versions are silently missing and the resulting graft has + // reset state for any entity whose live-at-graft version was + // closed. + let src_earliest_before = { + let mut conn = self.pool.get_permitted().await?; + deployment::state(&mut conn, &src.site) + .await? + .earliest_block_number + }; + if block.number < src_earliest_before { + return Err(StoreError::Unknown(anyhow::anyhow!( + "cannot graft `{}` onto `{}` at block {} because the base \ + subgraph only retains data starting at block {}; earlier \ + blocks have been pruned. Graft at block {} or later, or \ + use a base subgraph with sufficient retention.", + dst.catalog.site.namespace, + src.catalog.site.namespace, + block.number, + src_earliest_before, + src_earliest_before, + ))); + } + let src_manifest_idx_and_name = src_deployment.manifest.template_idx_and_name()?; let dst_manifest_idx_and_name = self .load_deployment(dst.site.clone()) @@ -1660,6 +1689,39 @@ impl DeploymentStore { // removed data just before we copied it deployment::copy_earliest_block(conn, &src.site, &dst.site).await?; + // Detect the prune-during-copy race: if the base pruned + // past our graft block while the copy was running, the + // `copy_earliest_block` call above propagated that floor + // into `dst`, leaving us with `earliest_block > graft_block` + // — a structurally invalid graft. Fail the transaction so + // the bookkeeping is rolled back; partial copied data + // remains in `dst`'s tables and will require operator + // cleanup (e.g. delete + redeploy) before retrying. + // + // We re-read the source's earliest rather than the + // destination's because the destination row exists but has + // not yet been advanced past genesis (`forward_block_ptr` + // below is what sets its head), so `deployment::state` on + // the destination would error with "has not started + // syncing yet". The source has a head and is the value + // `copy_earliest_block` just propagated. + let src_earliest_after = deployment::state(conn, &src.site) + .await? + .earliest_block_number; + if src_earliest_after > block.number { + return Err(StoreError::Unknown(anyhow::anyhow!( + "graft of `{}` at block {} was invalidated mid-copy: \ + the base subgraph `{}` was pruned past the graft block \ + (earliest is now {}). The deployment must be deleted \ + and recreated, ideally targeting a graft block within \ + the base's retained range.", + dst.catalog.site.namespace, + block.number, + src.catalog.site.namespace, + src_earliest_after, + ))); + } + // Set the block ptr to the graft point to signal that we successfully // performed the graft crate::deployment::forward_block_ptr(conn, &dst.site, &block).await?; diff --git a/store/test-store/tests/postgres/graft.rs b/store/test-store/tests/postgres/graft.rs index c9417d84ada..3cb909695f3 100644 --- a/store/test-store/tests/postgres/graft.rs +++ b/store/test-store/tests/postgres/graft.rs @@ -746,3 +746,73 @@ fn graft_validate_rejects_below_prune_floor() { Ok(()) }) } + +/// The store-level graft path (`DeploymentStore::start_subgraph`) rejects a +/// graft whose block is below the base subgraph's `earliest_block_number`. +/// +/// This is the defense-in-depth check that complements +/// [`graft_validate_rejects_below_prune_floor`]: it covers callers that +/// reach the store path without going through `Graft::validate` (graphman, +/// custom deploy tooling, and `test_store::create_subgraph` itself). Without +/// this check, the copy reads entity versions where +/// `lower(block_range) <= block` and silently misses the version live at +/// `block` whenever pruning removed it, leaving heavily-updated mutable +/// entities reset to their default state in the grafted subgraph. +#[test] +fn graft_store_path_rejects_below_prune_floor() { + struct Progress; + impl PruneReporter for Progress {} + + run_test(|store, src| async move { + // Same pruned-base setup as `graft_validate_rejects_below_prune_floor`: + // update user 2 at block 3 so its live-at-block-2 version `[1,3)` + // becomes a closed historical version, then prune with + // `history_blocks = 2` over `[0, 6]` so `earliest_block = 4`. + let user2_v2 = create_test_entity( + "2", + USER, + "Cindini", + "dinici@email.com", + 44_i32, + 157.1, + true, + Some("red"), + 4, + ); + transact_and_wait(&store, &src, BLOCKS[3].clone(), vec![user2_v2]) + .await + .unwrap(); + transact_and_wait(&store, &src, BLOCKS[6].clone(), vec![]) + .await + .unwrap(); + let req = PruneRequest::new(&src, 2, 1, 0, 6)?; + store + .prune(Box::new(Progress), &src, req) + .await + .expect("pruning works"); + + // Grafting at block 2 must be rejected even though + // `test_store::create_subgraph` bypasses `Graft::validate`. The + // store-level pre-copy check fails the start with an actionable + // error before any copy work happens. + let graft_id = DeploymentHash::new("grafted_below_floor").unwrap(); + let err = + create_grafted_subgraph(&graft_id, GRAFT_GQL, src.hash.as_str(), BLOCKS[2].clone()) + .await + .expect_err("graft below prune floor must be rejected at the store layer"); + let msg = err.to_string(); + assert!( + msg.contains("only retains data starting at block 4"), + "expected prune-floor error mentioning earliest block 4, got: {msg}" + ); + + // Grafting at the prune floor (block 4) is still allowed: the live + // versions there survived pruning and the copy can use them. + let graft_ok = DeploymentHash::new("grafted_at_floor").unwrap(); + create_grafted_subgraph(&graft_ok, GRAFT_GQL, src.hash.as_str(), BLOCKS[4].clone()) + .await + .expect("graft at earliest_block must be accepted"); + + Ok(()) + }) +} From e9f7dc3824063d7efa0d8d20100876b15addb71a Mon Sep 17 00:00:00 2001 From: rslowinski Date: Wed, 3 Jun 2026 15:59:55 +0200 Subject: [PATCH 3/3] store: take the max of source and destination history_blocks during graft The graft finalization previously overwrote the destination's history_blocks with the source's value unconditionally. As a result, a destination subgraph manifesting `prune: never` (history_blocks = BLOCK_NUMBER_MAX) grafted from a `prune: auto` source silently inherited the source's much smaller retention window, contradicting the destination's manifest. Use max(src, dst) instead. The destination's retention is at least as long as the source's (otherwise the inherited copied data would be immediately eligible for pruning) while a destination that requests longer retention is no longer silently downgraded. Also expose `SubgraphStore::history_blocks` mirroring `set_history_blocks`, so the test can verify the post-graft retention value. --- store/postgres/src/deployment_store.rs | 27 ++++++++++--- store/postgres/src/subgraph_store.rs | 12 ++++++ store/test-store/tests/postgres/graft.rs | 49 ++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 6 deletions(-) diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 456abb1be73..22e16049155 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1592,11 +1592,11 @@ impl DeploymentStore { } let src_manifest_idx_and_name = src_deployment.manifest.template_idx_and_name()?; - let dst_manifest_idx_and_name = self - .load_deployment(dst.site.clone()) - .await? - .manifest - .template_idx_and_name()?; + // Keep `dst_deployment` bound so we can read its + // `history_blocks` below; otherwise the chained + // `.manifest.template_idx_and_name()?` would consume it. + let dst_deployment = self.load_deployment(dst.site.clone()).await?; + let dst_manifest_idx_and_name = dst_deployment.manifest.template_idx_and_name()?; // Copy subgraph data // We allow both not copying tables at all from the source, as well @@ -1673,10 +1673,25 @@ impl DeploymentStore { info!(logger, "Rewound subgraph to block {}", block.number; "time_ms" => start.elapsed().as_millis()); + // Use the *maximum* of the source's and destination's + // `history_blocks` rather than overwriting the destination + // with the source's value. The destination has just + // received the source's full retained history, so its + // retention must be at least as long as the source's + // (otherwise the inherited data would be immediately + // eligible for pruning); but if the destination's + // manifest requests longer retention (notably + // `prune: never`, which yields `BLOCK_NUMBER_MAX`), that + // intent must be honoured. The previous unconditional + // overwrite silently downgraded `prune: never` children + // to whatever retention the parent used. deployment::set_history_blocks( conn, &dst.site, - src_deployment.manifest.history_blocks, + src_deployment + .manifest + .history_blocks + .max(dst_deployment.manifest.history_blocks), ) .await?; diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index b4435cae0cb..466696ba07d 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -1520,6 +1520,18 @@ impl Inner { .await } + /// Return the current `history_blocks` retention setting recorded in + /// the deployment's manifest. Mirrors [`Self::set_history_blocks`] and + /// is useful for tooling and tests that need to inspect the effective + /// retention after operations such as grafting. + pub async fn history_blocks( + &self, + deployment: &DeploymentLocator, + ) -> Result { + let site = self.find_site(deployment.id.into()).await?; + Ok(self.load_deployment(site).await?.manifest.history_blocks) + } + pub async fn load_deployment( &self, site: Arc, diff --git a/store/test-store/tests/postgres/graft.rs b/store/test-store/tests/postgres/graft.rs index 3cb909695f3..a11cf80b3e0 100644 --- a/store/test-store/tests/postgres/graft.rs +++ b/store/test-store/tests/postgres/graft.rs @@ -816,3 +816,52 @@ fn graft_store_path_rejects_below_prune_floor() { Ok(()) }) } + +/// Grafting must not silently shorten the destination's `history_blocks` +/// retention. +/// +/// Before the fix, `start_subgraph` unconditionally wrote the source +/// subgraph's `history_blocks` into the destination, so a `prune: never` +/// child grafted from a `prune: auto` parent silently inherited the +/// parent's aggressive retention window. The fix takes the *maximum* of +/// the two, so the destination keeps at least as much history as it has +/// (the inherited copied data) but is never downgraded from a longer +/// retention it explicitly requested. +/// +/// The test setup matches that scenario: the destination is built by +/// `test_store::create_subgraph`, whose manifest has `indexer_hints: None` +/// and therefore reports `history_blocks = BLOCK_NUMBER_MAX` (the +/// `prune: never` equivalent), while we manually lower the source's +/// retention so old / new behaviour are observably different. +#[test] +fn graft_history_blocks_takes_max_of_parent_and_child() { + run_test(|store, src| async move { + // Lower the source's `history_blocks` to a small value so that + // the "max vs overwrite" outcomes diverge measurably. + let src_hb = 2; + let reorg_threshold = 1; + store + .set_history_blocks(&src, src_hb, reorg_threshold) + .await + .expect("lowering source history_blocks"); + + let graft_id = DeploymentHash::new("grafted_history_blocks").unwrap(); + let dst = + create_grafted_subgraph(&graft_id, GRAFT_GQL, src.hash.as_str(), BLOCKS[1].clone()) + .await + .expect("graft succeeds"); + + let dst_hb = store + .history_blocks(&dst) + .await + .expect("read dst history_blocks"); + assert_eq!( + dst_hb, BLOCK_NUMBER_MAX, + "graft must keep the destination's longer retention \ + (BLOCK_NUMBER_MAX = {BLOCK_NUMBER_MAX}) rather than overwriting it \ + with the source's smaller value ({src_hb})" + ); + + Ok(()) + }) +}