From b63f24e3ed75c4a2e1880dc9d74ce3e36ccb0bb2 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 29 May 2026 16:30:42 -0700 Subject: [PATCH 1/3] store: Create postponed indexes in the background The runner used to block block processing while postponed indexes were created. Since these are built with CREATE INDEX CONCURRENTLY and can take a long time, spawn the work in a background task so indexing continues. Errors are only logged; the postponed_indexes_created flag is set only on success, so a failed run is retried the next time the deployment starts. --- store/postgres/src/deployment_store.rs | 61 +++++++++++++++++++------- store/postgres/src/writable.rs | 4 +- 2 files changed, 46 insertions(+), 19 deletions(-) diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index a69c4dcbac5..d96334e7cff 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -42,7 +42,7 @@ use graph::internal_error; use graph::prelude::{ AttributeNames, BlockNumber, BlockPtr, CheapClone, DeploymentHash, DeploymentState, ENV_VARS, Entity, EntityQuery, Error, Logger, QueryExecutionError, StopwatchMetrics, StoreError, - UnfailOutcome, Value, anyhow, debug, info, o, warn, + UnfailOutcome, Value, anyhow, debug, error, info, o, warn, }; use graph::schema::{ApiSchema, EntityKey, EntityType, InputSchema}; @@ -1686,28 +1686,55 @@ impl DeploymentStore { /// this is a one-shot operation: indexes are created exactly once, /// and we never recreate them — even if an external process removes /// indexes that it considers unused. 
- pub(crate) async fn create_postponed_indexes(&self, site: Arc) -> Result<(), StoreError> { - let layout = self.find_layout(site.cheap_clone()).await?; - let creat = layout.index_creator(true, true); - let mut conn = self.pool.get_permitted().await?; + /// + /// The actual index creation runs in the background so that callers are + /// not blocked while the (potentially long-running) `CREATE INDEX + /// CONCURRENTLY` statements execute. Because of that, errors are only + /// logged and not returned to the caller. The `postponed_indexes_created` + /// flag is only set once all indexes have been created successfully, so a + /// failed run is retried the next time `graph-node` starts the + /// deployment. + pub(crate) fn create_postponed_indexes(&self, site: Arc) { + async fn run(store: DeploymentStore, site: Arc) -> Result<(), StoreError> { + let layout = store.find_layout(site.cheap_clone()).await?; + let creat = layout.index_creator(true, true); + let mut conn = store.pool.get_permitted().await?; - if deployment::postponed_indexes_created(&mut conn, &site).await? { - return Ok(()); - } + if deployment::postponed_indexes_created(&mut conn, &site).await? 
{ + return Ok(()); + } - for table in layout.tables.values() { - let indexes = table.indexes(&layout.input_schema).map_err(|e| { - StoreError::ConstraintViolation(format!("failed to generate indexes: {}", e)) - })?; - for idx in indexes { - if idx.to_postpone() { - IndexCreator::execute(&creat, &mut conn, &idx).await?; + for table in layout.tables.values() { + let indexes = table.indexes(&layout.input_schema).map_err(|e| { + StoreError::ConstraintViolation(format!("failed to generate indexes: {}", e)) + })?; + for idx in indexes { + if idx.to_postpone() { + IndexCreator::execute(&creat, &mut conn, &idx).await?; + } } } + + deployment::set_postponed_indexes_created(&mut conn, &site).await?; + Ok(()) } - deployment::set_postponed_indexes_created(&mut conn, &site).await?; - Ok(()) + let store = self.cheap_clone(); + let logger = Logger::new(&self.logger, o!("component" => "IndexCreation")); + graph::spawn(async move { + let logger2 = logger.cheap_clone(); + let res = retry::forever(&logger2, "create_postponed_indexes", || { + let store = store.cheap_clone(); + let site = site.cheap_clone(); + async move { run(store, site).await } + }) + .await; + match res { + Ok(()) => debug!(logger, "Created postponed indexes"), + Err(e) => error!(logger, "Failed to create postponed indexes"; + "error" => e.to_string()), + } + }); } // If the current block of the deployment is the same as the fatal error, diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index a6a782308f3..ad691ddaaf4 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -495,8 +495,8 @@ impl SyncStore { async fn create_postponed_indexes(&self) -> Result<(), StoreError> { self.writable - .create_postponed_indexes(self.site.cheap_clone()) - .await + .create_postponed_indexes(self.site.cheap_clone()); + Ok(()) } fn input_schema(&self) -> InputSchema { From 5e691b9c8aeff7932223e4616b7a37a772565f99 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 29 May 
2026 16:34:14 -0700 Subject: [PATCH 2/3] store: Rebuild invalid postponed indexes left by interrupted runs A CREATE INDEX CONCURRENTLY that is interrupted (e.g. by a node restart) leaves an invalid index behind. Because postponed indexes are created with 'if not exists', such a leftover was skipped on retry and never rebuilt, silently leaving the deployment without that index. Before creating each postponed index, drop any invalid remnant so the retry rebuilds it, mirroring what create_manual_index already does. --- store/postgres/src/deployment_store.rs | 27 +++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index d96334e7cff..b7adf4fcee6 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1693,7 +1693,9 @@ impl DeploymentStore { /// logged and not returned to the caller. The `postponed_indexes_created` /// flag is only set once all indexes have been created successfully, so a /// failed run is retried the next time `graph-node` starts the - /// deployment. + /// deployment. A run that was interrupted (for example by a restart while + /// a `CREATE INDEX CONCURRENTLY` was still in flight) can leave an invalid + /// index behind; such remnants are dropped and rebuilt on the next run. 
pub(crate) fn create_postponed_indexes(&self, site: Arc) { async fn run(store: DeploymentStore, site: Arc) -> Result<(), StoreError> { let layout = store.find_layout(site.cheap_clone()).await?; @@ -1704,14 +1706,33 @@ impl DeploymentStore { return Ok(()); } + let schema_name = site.namespace.as_str(); for table in layout.tables.values() { let indexes = table.indexes(&layout.input_schema).map_err(|e| { StoreError::ConstraintViolation(format!("failed to generate indexes: {}", e)) })?; for idx in indexes { - if idx.to_postpone() { - IndexCreator::execute(&creat, &mut conn, &idx).await?; + if !idx.to_postpone() { + continue; } + + // A previous run that was interrupted (e.g. by a node + // restart) can leave an invalid index behind. Since we + // create indexes with `if not exists`, such a leftover + // would be skipped and never rebuilt, so drop it first. + // `check_index_is_valid` returns `false` both when the + // index is missing and when it is invalid; the + // `drop index ... if exists` is a no-op in the former + // case and removes the invalid remnant in the latter. + if let Some(name) = idx.name() + && !catalog::check_index_is_valid(&mut conn, schema_name, &name).await? + { + let drop_sql = + format!("drop index concurrently if exists {schema_name}.{name}"); + sql_query(drop_sql).execute(&mut conn).await?; + } + + IndexCreator::execute(&creat, &mut conn, &idx).await?; } } From 7efcf6260c360872cb7de51bf009a80f48203359 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 2 Jun 2026 14:21:54 -0700 Subject: [PATCH 3/3] store: Make postponed index creation robust against concurrent builds Acquire a fresh connection for each postponed index instead of holding one for the whole run, which can take hours. Before starting a `CREATE INDEX CONCURRENTLY`, wait for any index creation already running in the deployment's schema to finish, using `index_creation_is_running` to poll `pg_stat_progress_create_index`. 
Have CreateIndex::name return a string slice to avoid an allocation. --- store/postgres/src/catalog.rs | 30 +++++++ store/postgres/src/deployment_store.rs | 113 +++++++++++++++++++------ store/postgres/src/relational/index.rs | 4 +- store/postgres/src/writable.rs | 2 +- 4 files changed, 122 insertions(+), 27 deletions(-) diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs index fcc10bf2c93..0fbee8ff416 100644 --- a/store/postgres/src/catalog.rs +++ b/store/postgres/src/catalog.rs @@ -882,6 +882,36 @@ pub(crate) async fn check_index_is_valid( Ok(matches!(result, Some(true))) } +/// Check whether there is an index creation currently running for any index +/// in the given schema. If there is, return the PID of the backend that is +/// creating the index and the name of the index being created. +pub(crate) async fn index_creation_is_running( + conn: &mut AsyncPgConnection, + schema_name: &str, +) -> Result<Option<(i32, String)>, StoreError> { + #[derive(Queryable, QueryableByName)] + struct IndexCreationCheck { + #[diesel(sql_type = Integer)] + pid: i32, + #[diesel(sql_type = Text)] + index_name: String, + } + + let query = " + select ci.pid, ci.index_relid::regclass as index_name \ + from pg_stat_progress_create_index ci \ + join pg_class c on ci.relid = c.oid \ + join pg_namespace n on c.relnamespace = n.oid \ + where n.nspname = $1"; + sql_query(query) + .bind::<Text, _>(schema_name) + .get_result::<IndexCreationCheck>(conn) + .await + .optional() + .map(|check| check.map(|check| (check.pid, check.index_name))) + .map_err::<StoreError, _>(Into::into) +} + pub(crate) async fn indexes_for_table( conn: &mut AsyncPgConnection, schema_name: &str, diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index b7adf4fcee6..dd4586296fc 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1,6 +1,7 @@ use detail::DeploymentDetail; use diesel::sql_query; use diesel_async::{AsyncConnection as _, RunQueryDsl, SimpleAsyncConnection}; +use
graph::util::backoff::ExponentialBackoff; use tokio::task::JoinHandle; use graph::anyhow::Context; @@ -1696,17 +1697,88 @@ impl DeploymentStore { /// deployment. A run that was interrupted (for example by a restart while /// a `CREATE INDEX CONCURRENTLY` was still in flight) can leave an invalid /// index behind; such remnants are dropped and rebuilt on the next run. - pub(crate) fn create_postponed_indexes(&self, site: Arc) { - async fn run(store: DeploymentStore, site: Arc) -> Result<(), StoreError> { - let layout = store.find_layout(site.cheap_clone()).await?; - let creat = layout.index_creator(true, true); + pub(crate) fn create_postponed_indexes(&self, logger: &Logger, site: Arc) { + async fn index_creation_is_running( + store: &DeploymentStore, + site: &Site, + ) -> Result, StoreError> { let mut conn = store.pool.get_permitted().await?; + catalog::index_creation_is_running(&mut conn, site.namespace.as_str()).await + } - if deployment::postponed_indexes_created(&mut conn, &site).await? { - return Ok(()); + async fn postponed_indexes_created( + store: &DeploymentStore, + site: &Site, + ) -> Result { + let mut conn = store.pool.get_permitted().await?; + deployment::postponed_indexes_created(&mut conn, site).await + } + + async fn wait_for_concurrent_index_creation( + logger: &Logger, + store: &DeploymentStore, + site: &Site, + ) -> Result<(), StoreError> { + let mut backoff = + ExponentialBackoff::new(Duration::from_secs(1), Duration::from_mins(5)); + let mut last_log = Instant::now() - Duration::from_mins(2); + while let Some((pid, index_name)) = index_creation_is_running(store, site).await? 
{ + if last_log.elapsed() > Duration::from_mins(1) { + info!(logger, + "Waiting for concurrent index creation to finish"; + "pid" => pid, + "index_name" => index_name, + ); + last_log = Instant::now(); + } + backoff.sleep_async().await; } + Ok(()) + } + async fn create_index( + store: &DeploymentStore, + layout: &Layout, + site: &Site, + idx: &CreateIndex, + ) -> Result<(), StoreError> { + let mut conn = store.pool.get_permitted().await?; let schema_name = site.namespace.as_str(); + + // A previous run that was interrupted (e.g. by a node + // restart) can leave an invalid index behind. Since we + // create indexes with `if not exists`, such a leftover + // would be skipped and never rebuilt, so drop it first. + // `check_index_is_valid` returns `false` both when the + // index is missing and when it is invalid; the + // `drop index ... if exists` is a no-op in the former + // case and removes the invalid remnant in the latter. + if let Some(name) = idx.name() + && !catalog::check_index_is_valid(&mut conn, schema_name, name).await? + { + let drop_sql = format!("drop index concurrently if exists {schema_name}.{name}"); + sql_query(drop_sql).execute(&mut conn).await?; + } + + let creat = layout.index_creator(true, true); + IndexCreator::execute(&creat, &mut conn, idx).await + } + + async fn run( + logger: Logger, + store: DeploymentStore, + site: Arc, + ) -> Result<(), StoreError> { + let layout = store.find_layout(site.cheap_clone()).await?; + + // Since this entire run can take many hours, we avoid holding a + // connection for the whole time. Instead, we get a new + // connection for each index that we create. + + if postponed_indexes_created(&store, &site).await? 
{ + return Ok(()); + } + for table in layout.tables.values() { let indexes = table.indexes(&layout.input_schema).map_err(|e| { StoreError::ConstraintViolation(format!("failed to generate indexes: {}", e)) @@ -1716,38 +1788,31 @@ impl DeploymentStore { continue; } - // A previous run that was interrupted (e.g. by a node - // restart) can leave an invalid index behind. Since we - // create indexes with `if not exists`, such a leftover - // would be skipped and never rebuilt, so drop it first. - // `check_index_is_valid` returns `false` both when the - // index is missing and when it is invalid; the - // `drop index ... if exists` is a no-op in the former - // case and removes the invalid remnant in the latter. - if let Some(name) = idx.name() - && !catalog::check_index_is_valid(&mut conn, schema_name, &name).await? - { - let drop_sql = - format!("drop index concurrently if exists {schema_name}.{name}"); - sql_query(drop_sql).execute(&mut conn).await?; - } + wait_for_concurrent_index_creation(&logger, &store, &site).await?; + + create_index(&store, &layout, &site, &idx).await?; - IndexCreator::execute(&creat, &mut conn, &idx).await?; + debug!(logger, "Created index"; + "index_name" => idx.name().unwrap_or(""), + "table_name" => table.name.as_str(), + ); } } + let mut conn = store.pool.get_permitted().await?; deployment::set_postponed_indexes_created(&mut conn, &site).await?; Ok(()) } let store = self.cheap_clone(); - let logger = Logger::new(&self.logger, o!("component" => "IndexCreation")); + let logger = Logger::new(logger, o!("component" => "IndexCreation")); graph::spawn(async move { let logger2 = logger.cheap_clone(); let res = retry::forever(&logger2, "create_postponed_indexes", || { let store = store.cheap_clone(); let site = site.cheap_clone(); - async move { run(store, site).await } + let logger = logger2.cheap_clone(); + async move { run(logger, store, site).await } }) .await; match res { diff --git a/store/postgres/src/relational/index.rs 
b/store/postgres/src/relational/index.rs index d28c78d7cf0..dc66a04f9a7 100644 --- a/store/postgres/src/relational/index.rs +++ b/store/postgres/src/relational/index.rs @@ -666,10 +666,10 @@ impl CreateIndex { } } - pub fn name(&self) -> Option<String> { + pub fn name(&self) -> Option<&str> { match self { CreateIndex::Unknown { .. } => None, - CreateIndex::Parsed { name, .. } => Some(name.clone()), + CreateIndex::Parsed { name, .. } => Some(name.as_str()), } } diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index ad691ddaaf4..633688aeb1a 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -495,7 +495,7 @@ impl SyncStore { async fn create_postponed_indexes(&self) -> Result<(), StoreError> { self.writable - .create_postponed_indexes(self.site.cheap_clone()); + .create_postponed_indexes(&self.logger, self.site.cheap_clone()); Ok(()) }