diff --git a/.github/workflows/on_push.yml b/.github/workflows/on_push.yml index 668fa3c9..c6d155ca 100644 --- a/.github/workflows/on_push.yml +++ b/.github/workflows/on_push.yml @@ -9,8 +9,6 @@ on: jobs: test: uses: ./.github/workflows/test.yml - test-crdb: - uses: ./.github/workflows/test_crdb.yml publish: needs: test uses: ./.github/workflows/publish.yml diff --git a/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java b/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java index 1fe7d686..ea890bfc 100644 --- a/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java +++ b/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java @@ -7,8 +7,10 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Set; import javax.sql.DataSource; @@ -19,6 +21,12 @@ public class MigrationManager { private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class); + private static final Set ONLINE_MIGRATIONS = + Set.of(22, 23, 24, 25, 26, 27, 29, 30, 31, 32, 34, 35); + + private static final long MIGRATION_LOCK_ID = 1234567890L; + private static final int MIGRATION_LOCK_TIMEOUT_SEC = 30; + public static void runMigrations(DBOSConfig config) { Objects.requireNonNull(config, "DBOS Config must not be null"); @@ -46,6 +54,30 @@ public static void runMigrations( } } + private static boolean shouldMigrate( + Connection conn, String schema, boolean useListenNotify, boolean isCockroach) + throws SQLException { + var schemaSql = "SELECT 1 FROM information_schema.schemata WHERE schema_name = ?"; + try (var stmt = conn.prepareStatement(schemaSql)) { + stmt.setString(1, schema); + try (var rs = stmt.executeQuery()) { + if (!rs.next()) return true; + } + } + var tableSql = + "SELECT 1 FROM information_schema.tables" + + " WHERE table_schema = ? AND table_name = 'dbos_migrations'"; + try (var stmt = conn.prepareStatement(tableSql)) { + stmt.setString(1, schema); + try (var rs = stmt.executeQuery()) { + if (!rs.next()) return true; + } + } + var currentVersion = getCurrentSysDbVersion(conn, schema); + var latestVersion = getMigrations(schema, useListenNotify, isCockroach).size(); + return currentVersion < latestVersion; + } + private static void runMigrations(DataSource ds, String schema, boolean useListenNotify) { Objects.requireNonNull(ds, "Data Source must not be null"); schema = SystemDatabase.sanitizeSchema(schema); @@ -54,17 +86,72 @@ private static void runMigrations(DataSource ds, String schema, boolean useListe throw new IllegalArgumentException("Schema name must not contain single or double quotes"); } - try (var conn = ds.getConnection()) { + try (var checkConn = ds.getConnection()) { + var isCockroach = SystemDatabase.isCockroach(checkConn); + if (isCockroach) { + useListenNotify = false; + } + + // Skip advisory lock and migration work entirely if already up-to-date. + if (!shouldMigrate(checkConn, schema, useListenNotify, isCockroach)) { + return; + } + } catch (SQLException e) { + throw new RuntimeException("Failed to run migrations", e); + } + + // Use a dedicated connection held in autocommit mode to hold the session-level advisory + // lock for the entire migration run. Keeping the lock on a separate connection prevents + // CockroachDB (and other databases) from releasing the lock when migration transactions + // commit on the main connection. + try (var lockConn = ds.getConnection(); + var migrConn = ds.getConnection()) { - var isCockroach = SystemDatabase.isCockroach(conn); + var isCockroach = SystemDatabase.isCockroach(migrConn); if (isCockroach) { useListenNotify = false; } - ensureDbosSchema(conn, schema); - ensureMigrationTable(conn, schema); - var migrations = getMigrations(schema, useListenNotify); - runDbosMigrations(conn, schema, migrations); + boolean locked = false; + lockConn.setAutoCommit(true); + long deadline = System.currentTimeMillis() + MIGRATION_LOCK_TIMEOUT_SEC * 1000L; + while (true) { + try (var stmt = lockConn.prepareStatement("SELECT pg_try_advisory_lock(?)")) { + stmt.setLong(1, MIGRATION_LOCK_ID); + try (var rs = stmt.executeQuery()) { + if (rs.next() && rs.getBoolean(1)) { + locked = true; + break; + } + } + } + if (System.currentTimeMillis() >= deadline) { + logger.warn( + "Could not acquire migration advisory lock within {}s. Attempting migrations without lock.", + MIGRATION_LOCK_TIMEOUT_SEC); + break; + } + Thread.sleep(1000); + } + + try { + ensureDbosSchema(migrConn, schema); + ensureMigrationTable(migrConn, schema); + var migrations = getMigrations(schema, useListenNotify, isCockroach); + runDbosMigrations(migrConn, schema, migrations, isCockroach); + } finally { + if (locked) { + try (var stmt = lockConn.prepareStatement("SELECT pg_advisory_unlock(?)")) { + stmt.setLong(1, MIGRATION_LOCK_ID); + stmt.execute(); + } catch (SQLException e) { + logger.warn("Failed to release migration advisory lock", e); + } + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Migration interrupted while waiting for advisory lock", e); } catch (SQLException e) { throw new RuntimeException("Failed to run migrations", e); } @@ -182,18 +269,39 @@ public static int getCurrentSysDbVersion(Connection conn, String schema) { private static boolean notificationsPrimaryKeyExists(Connection conn, String schema) throws SQLException { - var sql = - "SELECT 1 FROM information_schema.table_constraints" - + " WHERE table_schema = ? AND table_name = 'notifications' AND constraint_type = 'PRIMARY KEY'"; - try (var stmt = conn.prepareStatement(sql)) { - stmt.setString(1, schema); - try (var rs = stmt.executeQuery()) { - return rs.next(); - } + try (var rs = conn.getMetaData().getPrimaryKeys(null, schema, "notifications")) { + return rs.next(); } } static void runDbosMigrations(Connection conn, String schema, List migrations) { + try { + runDbosMigrations(conn, schema, migrations, SystemDatabase.isCockroach(conn)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @FunctionalInterface + private interface SqlAction { + void run(Connection conn) throws SQLException; + } + + private static void runInTransaction(Connection conn, SqlAction action) throws SQLException { + conn.setAutoCommit(false); + try { + action.run(conn); + conn.commit(); + } catch (SQLException e) { + conn.rollback(); + throw e; + } finally { + conn.setAutoCommit(true); + } + } + + static void runDbosMigrations( + Connection conn, String schema, List migrations, boolean isCockroach) { Objects.requireNonNull(schema, "schema must not be null"); var lastApplied = getCurrentSysDbVersion(conn, schema); @@ -205,53 +313,94 @@ static void runDbosMigrations(Connection conn, String schema, List migra logger.info("Applying DBOS system database schema migration {}", migrationIndex); - // Migration 10 adds a primary key to notifications. Skip the DDL if one already exists - // (guard for installs that were created before the primary key was added to migration 1). - boolean skipMigration = false; - if (migrationIndex == 10) { - try { - skipMigration = notificationsPrimaryKeyExists(conn, schema); - } catch (SQLException e) { - throw new RuntimeException("Failed to check notifications primary key", e); - } - if (skipMigration) { - logger.info("Migration 10 skipped, primary key already exists"); - } - } - - if (!skipMigration) { - try (var stmt = conn.createStatement()) { - stmt.execute(migrations.get(i)); - } catch (SQLException e) { - throw new RuntimeException("Failed to run migration %d".formatted(migrationIndex), e); - } - } + var migrationSql = migrations.get(i); + var versionBefore = lastApplied; try { - if (lastApplied == 0) { - var sql = "INSERT INTO \"%s\".dbos_migrations (version) VALUES (?)".formatted(schema); - try (var stmt = conn.prepareStatement(sql)) { - stmt.setLong(1, migrationIndex); - stmt.executeUpdate(); + if (migrationSql.isBlank()) { + // No DDL (e.g. migration 20 on CockroachDB); just record the version. + logger.info("Migration {} has no statements; skipping.", migrationIndex); + runInTransaction( + conn, c -> bumpMigrationVersion(c, schema, migrationIndex, versionBefore)); + } else if (migrationIndex == 10 && notificationsPrimaryKeyExists(conn, schema)) { + // Migration 10 adds a primary key to notifications. Skip the DDL if one already exists + // (guard for installs created before the primary key was added to migration 1). + logger.info("Migration 10 skipped, primary key already exists"); + runInTransaction( + conn, c -> bumpMigrationVersion(c, schema, migrationIndex, versionBefore)); + } else if (ONLINE_MIGRATIONS.contains(migrationIndex) && !isCockroach) { + // CONCURRENTLY index DDL cannot run inside a transaction. Clean up any indexes left + // invalid by a prior failed attempt, run the DDL in autocommit, then bump the version + // in its own transaction. + cleanupInvalidIndexes(conn, schema); + try (var stmt = conn.createStatement()) { + stmt.execute(migrationSql); } + runInTransaction( + conn, c -> bumpMigrationVersion(c, schema, migrationIndex, versionBefore)); } else { - var sql = "UPDATE \"%s\".dbos_migrations SET version = ?".formatted(schema); - try (var stmt = conn.prepareStatement(sql)) { - stmt.setLong(1, migrationIndex); - stmt.executeUpdate(); - } + // Standard migration: DDL and version bump in one transaction. + runInTransaction( + conn, + c -> { + try (var stmt = c.createStatement()) { + stmt.execute(migrationSql); + } + bumpMigrationVersion(c, schema, migrationIndex, versionBefore); + }); } } catch (SQLException e) { - throw new RuntimeException("Failed to update dbos migration version", e); + throw new RuntimeException("Failed to run migration %d".formatted(migrationIndex), e); } lastApplied = migrationIndex; } } - public static List getMigrations(String schema, boolean useListenNotify) { - Objects.requireNonNull(schema); + private static void bumpMigrationVersion( + Connection conn, String schema, int version, int versionBefore) throws SQLException { + if (versionBefore == 0) { + var sql = "INSERT INTO \"%s\".dbos_migrations (version) VALUES (?)".formatted(schema); + try (var stmt = conn.prepareStatement(sql)) { + stmt.setLong(1, version); + stmt.executeUpdate(); + } + } else { + var sql = "UPDATE \"%s\".dbos_migrations SET version = ?".formatted(schema); + try (var stmt = conn.prepareStatement(sql)) { + stmt.setLong(1, version); + stmt.executeUpdate(); + } + } + } + + private static void cleanupInvalidIndexes(Connection conn, String schema) throws SQLException { + var sql = + "SELECT i.relname FROM pg_index ix " + + "JOIN pg_class i ON i.oid = ix.indexrelid " + + "JOIN pg_class t ON t.oid = ix.indrelid " + + "JOIN pg_namespace n ON n.oid = t.relnamespace " + + "WHERE NOT ix.indisvalid AND n.nspname = ?"; + var invalidIndexes = new ArrayList(); + try (var stmt = conn.prepareStatement(sql)) { + stmt.setString(1, schema); + try (var rs = stmt.executeQuery()) { + while (rs.next()) { + invalidIndexes.add(rs.getString(1)); + } + } + } + for (var idxName : invalidIndexes) { + logger.warn("Dropping invalid index {}.{} left by a prior failed migration", schema, idxName); + try (var stmt = conn.createStatement()) { + stmt.execute("DROP INDEX CONCURRENTLY IF EXISTS \"%s\".\"%s\"".formatted(schema, idxName)); + } + } + } + public static List getMigrations( + String schema, boolean useListenNotify, boolean isCockroach) { + Objects.requireNonNull(schema); var migrations = List.of( migration1(useListenNotify), @@ -272,7 +421,23 @@ public static List getMigrations(String schema, boolean useListenNotify) MIGRATION_16, MIGRATION_17, MIGRATION_18, - MIGRATION_19); + MIGRATION_19, + migration20(useListenNotify, isCockroach), + MIGRATION_21, + migration22(isCockroach), + migration23(isCockroach), + migration24(isCockroach), + migration25(isCockroach), + migration26(isCockroach), + migration27(isCockroach), + migration28(isCockroach), + migration29(isCockroach), + migration30(isCockroach), + migration31(isCockroach), + migration32(isCockroach), + MIGRATION_33, + migration34(isCockroach), + migration35(isCockroach)); return migrations.stream().map(m -> m.formatted(schema)).toList(); } @@ -628,4 +793,147 @@ ON CONFLICT (workflow_uuid) """ CREATE INDEX "idx_operation_outputs_completed_at_function_name" ON "%1$s"."operation_outputs" ("completed_at_epoch_ms", "function_name"); """; + + static String migration20(boolean useListenNotify, boolean isCockroach) { + if (isCockroach) return ""; + var m = + """ + ALTER FUNCTION "%1$s".enqueue_workflow( + TEXT, TEXT, JSON[], JSON, TEXT, TEXT, TEXT, TEXT, BIGINT, BIGINT, TEXT, INTEGER, TEXT + ) SET search_path = pg_catalog, pg_temp; + + ALTER FUNCTION "%1$s".send_message( + TEXT, JSON, TEXT, TEXT + ) SET search_path = pg_catalog, pg_temp; + """; + if (useListenNotify) { + m += + """ + ALTER FUNCTION "%1$s".notifications_function() SET search_path = pg_catalog, pg_temp; + ALTER FUNCTION "%1$s".workflow_events_function() SET search_path = pg_catalog, pg_temp; + """; + } + return m; + } + + static final String MIGRATION_21 = + """ + CREATE TABLE "%1$s".queues ( + queue_id TEXT PRIMARY KEY DEFAULT gen_random_uuid()::TEXT, + name TEXT NOT NULL UNIQUE, + concurrency INTEGER, + worker_concurrency INTEGER, + rate_limit_max INTEGER, + rate_limit_period_sec DOUBLE PRECISION, + priority_enabled BOOLEAN NOT NULL DEFAULT FALSE, + partition_queue BOOLEAN NOT NULL DEFAULT FALSE, + polling_interval_sec DOUBLE PRECISION NOT NULL DEFAULT 1.0, + created_at BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000.0)::bigint, + updated_at BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000.0)::bigint + ); + """; + + private static String concurrently(boolean isCockroach) { + return isCockroach ? "" : "CONCURRENTLY"; + } + + static String migration22(boolean isCockroach) { + return "DROP INDEX " + + concurrently(isCockroach) + + " IF EXISTS \"%1$s\".\"idx_workflow_status_forked_from\""; + } + + static String migration23(boolean isCockroach) { + return "CREATE INDEX " + + concurrently(isCockroach) + + " IF NOT EXISTS \"idx_workflow_status_forked_from\"" + + " ON \"%1$s\".\"workflow_status\" (\"forked_from\") WHERE \"forked_from\" IS NOT NULL"; + } + + static String migration24(boolean isCockroach) { + return "DROP INDEX " + + concurrently(isCockroach) + + " IF EXISTS \"%1$s\".\"idx_workflow_status_parent_workflow_id\""; + } + + static String migration25(boolean isCockroach) { + return "CREATE INDEX " + + concurrently(isCockroach) + + " IF NOT EXISTS \"idx_workflow_status_parent_workflow_id\"" + + " ON \"%1$s\".\"workflow_status\" (\"parent_workflow_id\")" + + " WHERE \"parent_workflow_id\" IS NOT NULL"; + } + + static String migration26(boolean isCockroach) { + return "DROP INDEX " + + concurrently(isCockroach) + + " IF EXISTS \"%1$s\".\"workflow_status_executor_id_index\""; + } + + static String migration27(boolean isCockroach) { + // New partial unique index uses a different name to avoid collision with the old constraint + return "CREATE UNIQUE INDEX " + + concurrently(isCockroach) + + " IF NOT EXISTS \"uq_workflow_status_dedup_id\"" + + " ON \"%1$s\".\"workflow_status\" (\"queue_name\", \"deduplication_id\")" + + " WHERE \"deduplication_id\" IS NOT NULL"; + } + + static String migration28(boolean isCockroach) { + // CockroachDB implements unique constraints as indexes and rejects ALTER TABLE DROP CONSTRAINT; + // Postgres rejects DROP INDEX on a constraint-backed index. + if (isCockroach) { + return "DROP INDEX IF EXISTS \"%1$s\".\"uq_workflow_status_queue_name_dedup_id\" CASCADE"; + } + return "ALTER TABLE \"%1$s\".workflow_status" + + " DROP CONSTRAINT IF EXISTS uq_workflow_status_queue_name_dedup_id"; + } + + static String migration29(boolean isCockroach) { + return "CREATE INDEX " + + concurrently(isCockroach) + + " IF NOT EXISTS \"idx_workflow_status_pending\"" + + " ON \"%1$s\".\"workflow_status\" (\"created_at\") WHERE \"status\" = 'PENDING'"; + } + + static String migration30(boolean isCockroach) { + return "CREATE INDEX " + + concurrently(isCockroach) + + " IF NOT EXISTS \"idx_workflow_status_failed\"" + + " ON \"%1$s\".\"workflow_status\" (\"status\", \"created_at\")" + + " WHERE \"status\" IN ('ERROR', 'CANCELLED', 'MAX_RECOVERY_ATTEMPTS_EXCEEDED')"; + } + + static String migration31(boolean isCockroach) { + return "DROP INDEX " + + concurrently(isCockroach) + + " IF EXISTS \"%1$s\".\"workflow_status_status_index\""; + } + + static String migration32(boolean isCockroach) { + return "CREATE INDEX " + + concurrently(isCockroach) + + " IF NOT EXISTS \"idx_workflow_status_in_flight\"" + + " ON \"%1$s\".\"workflow_status\" (\"queue_name\", \"status\", \"priority\", \"created_at\")" + + " WHERE \"status\" IN ('ENQUEUED', 'PENDING')"; + } + + // ALTER TABLE ADD COLUMN with constant default is a fast catalog-only update on Postgres. + static final String MIGRATION_33 = + "ALTER TABLE \"%1$s\".\"workflow_status\"" + + " ADD COLUMN IF NOT EXISTS \"rate_limited\" BOOLEAN NOT NULL DEFAULT FALSE"; + + static String migration34(boolean isCockroach) { + return "CREATE INDEX " + + concurrently(isCockroach) + + " IF NOT EXISTS \"idx_workflow_status_rate_limited\"" + + " ON \"%1$s\".\"workflow_status\" (\"queue_name\", \"started_at_epoch_ms\")" + + " WHERE \"rate_limited\" = TRUE"; + } + + static String migration35(boolean isCockroach) { + return "DROP INDEX " + + concurrently(isCockroach) + + " IF EXISTS \"%1$s\".\"idx_workflow_status_queue_status_started\""; + } } diff --git a/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java b/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java index 7f9bd918..a3784164 100644 --- a/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java +++ b/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java @@ -15,6 +15,7 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.util.ArrayList; +import java.util.concurrent.CopyOnWriteArrayList; import com.zaxxer.hikari.HikariDataSource; import org.junit.jupiter.api.Assertions; @@ -88,7 +89,10 @@ void testRunMigrations_CreatesTables() throws Exception { } } - var migrations = new ArrayList<>(MigrationManager.getMigrations(Constants.DB_SCHEMA, true)); + var migrations = + new ArrayList<>( + MigrationManager.getMigrations( + Constants.DB_SCHEMA, true, PgContainer.USE_COCKROACH_DB)); var version = getVersion(conn); assertEquals(migrations.size(), version); } @@ -153,7 +157,9 @@ void testRunMigrations_customSchema(String schema) throws Exception { } } - var migrations = new ArrayList<>(MigrationManager.getMigrations(schema, true)); + var migrations = + new ArrayList<>( + MigrationManager.getMigrations(schema, true, PgContainer.USE_COCKROACH_DB)); var version = getVersion(conn, schema); assertEquals(migrations.size(), version); } @@ -201,7 +207,10 @@ void testRunMigrations_IsIdempotent() throws Exception { void testAddingNewMigration() throws Exception { testRunMigrations_CreatesTables(); - var migrations = new ArrayList<>(MigrationManager.getMigrations(Constants.DB_SCHEMA, true)); + var migrations = + new ArrayList<>( + MigrationManager.getMigrations( + Constants.DB_SCHEMA, true, PgContainer.USE_COCKROACH_DB)); migrations.add("CREATE TABLE dummy_table(id SERIAL PRIMARY KEY);"); try (var conn = dataSource.getConnection()) { @@ -273,7 +282,8 @@ void testOriginalMigration1ThenAllMigrations_NotificationsPrimaryKey() throws Ex assertTableExists(metaData, "notifications"); // Now run all current migrations (including migration10 which ensures primary key) - var allMigrations = MigrationManager.getMigrations(Constants.DB_SCHEMA, true); + var allMigrations = + MigrationManager.getMigrations(Constants.DB_SCHEMA, true, PgContainer.USE_COCKROACH_DB); MigrationManager.runDbosMigrations(conn, Constants.DB_SCHEMA, allMigrations); // Verify that the notifications table has a primary key @@ -285,6 +295,187 @@ void testOriginalMigration1ThenAllMigrations_NotificationsPrimaryKey() throws Ex } } + @Test + void testConcurrentMigrations() throws Exception { + Assumptions.assumeFalse( + PgContainer.USE_COCKROACH_DB, "PG-only: relies on Postgres advisory locks"); + var dbosConfig = pgContainer.dbosConfig(); + int numInstances = 10; + + var errors = new CopyOnWriteArrayList(); + var threads = new ArrayList(); + for (int i = 0; i < numInstances; i++) { + final int idx = i; + threads.add( + new Thread( + () -> { + try { + MigrationManager.runMigrations(dbosConfig); + } catch (Throwable e) { + errors.add( + "Instance %d: %s: %s" + .formatted(idx, e.getClass().getSimpleName(), e.getMessage())); + } + })); + } + for (var t : threads) t.start(); + for (var t : threads) t.join(); + + assertTrue( + errors.isEmpty(), + "%d/%d concurrent migrations failed:\n%s" + .formatted(errors.size(), numInstances, String.join("\n", errors))); + + try (var conn = dataSource.getConnection()) { + var migrations = + MigrationManager.getMigrations(Constants.DB_SCHEMA, true, PgContainer.USE_COCKROACH_DB); + assertEquals(migrations.size(), getVersion(conn)); + } + } + + @Test + void testOnlineMigrationsAreIdempotent() throws Exception { + Assumptions.assumeFalse(PgContainer.USE_COCKROACH_DB, "PG-only online migration test"); + + var dbosConfig = pgContainer.dbosConfig(); + MigrationManager.runMigrations(dbosConfig); + + var schema = Constants.DB_SCHEMA; + var migrations = MigrationManager.getMigrations(schema, true, false); + // min(ONLINE_MIGRATIONS) == 22, so rewind to 21 + int rewindTo = 21; + int expectedFinal = migrations.size(); + + try (var conn = dataSource.getConnection(); + var stmt = conn.createStatement()) { + stmt.executeUpdate( + "UPDATE \"%s\".dbos_migrations SET version = %d".formatted(schema, rewindTo)); + } + + assertDoesNotThrow( + () -> MigrationManager.runMigrations(dbosConfig), + "Re-running online migrations against an already-migrated schema must succeed"); + + try (var conn = dataSource.getConnection()) { + assertEquals(expectedFinal, getVersion(conn)); + } + } + + @Test + void testVersionNotBumpedOnMigrationFailure() throws Exception { + Assumptions.assumeFalse( + PgContainer.USE_COCKROACH_DB, "PG-only: tests the online migration code path"); + + var dbosConfig = pgContainer.dbosConfig(); + MigrationManager.runMigrations(dbosConfig); + + var schema = Constants.DB_SCHEMA; + int rewindTo = 31; // one before migration 32 + var allMigrations = MigrationManager.getMigrations(schema, true, false); + int expectedFinal = allMigrations.size(); + + // Rewind so migration 32 is pending again + try (var conn = dataSource.getConnection(); + var stmt = conn.createStatement()) { + stmt.executeUpdate( + "UPDATE \"%s\".dbos_migrations SET version = %d".formatted(schema, rewindTo)); + } + + // Replace migration 32 (list index 31) with invalid SQL — must throw without bumping version + var patchedMigrations = new ArrayList<>(allMigrations); + patchedMigrations.set(31, "THIS IS NOT VALID SQL"); + + try (var conn = dataSource.getConnection()) { + assertThrows( + RuntimeException.class, + () -> MigrationManager.runDbosMigrations(conn, schema, patchedMigrations)); + } + + try (var conn = dataSource.getConnection()) { + assertEquals(rewindTo, getVersion(conn)); + } + + // Re-run with real migrations: IF NOT EXISTS guards make 32+ idempotent given the index + // still exists from the original full migration run above. + try (var conn = dataSource.getConnection()) { + MigrationManager.runDbosMigrations(conn, schema, allMigrations); + } + + try (var conn = dataSource.getConnection()) { + assertEquals(expectedFinal, getVersion(conn)); + } + } + + @Test + void testRunnerResumesAfterInvalidIndex() throws Exception { + Assumptions.assumeFalse(PgContainer.USE_COCKROACH_DB, "PG-only: relies on pg_index.indisvalid"); + + var dbosConfig = pgContainer.dbosConfig(); + MigrationManager.runMigrations(dbosConfig); + + var schema = Constants.DB_SCHEMA; + String targetIndex = "idx_workflow_status_in_flight"; + int rewindTo = 31; // one before migration 32 which creates targetIndex + int expectedFinal = MigrationManager.getMigrations(schema, true, false).size(); + + // Drop the valid index, create a non-CONCURRENTLY copy with the same name, then mark it + // INVALID — this mimics what Postgres leaves behind when CREATE INDEX CONCURRENTLY aborts + // mid-build. + try (var conn = dataSource.getConnection()) { + conn.setAutoCommit(true); + try (var stmt = conn.createStatement()) { + stmt.execute("DROP INDEX IF EXISTS \"%s\".\"%s\"".formatted(schema, targetIndex)); + stmt.execute( + ("CREATE INDEX \"%s\" ON \"%s\".workflow_status" + + " (queue_name, status, priority, created_at)" + + " WHERE status IN ('ENQUEUED', 'PENDING')") + .formatted(targetIndex, schema)); + stmt.execute( + ("UPDATE pg_index SET indisvalid = false" + + " WHERE indexrelid = '\"%s\".\"%s\"'::regclass") + .formatted(schema, targetIndex)); + } + } + + // Confirm the planted index is INVALID + try (var conn = dataSource.getConnection(); + var stmt = + conn.prepareStatement( + "SELECT indisvalid FROM pg_index WHERE indexrelid = ?::regclass")) { + stmt.setString(1, "\"%s\".\"%s\"".formatted(schema, targetIndex)); + try (var rs = stmt.executeQuery()) { + assertTrue(rs.next()); + assertFalse(rs.getBoolean("indisvalid"), "Index should be invalid before migration re-run"); + } + } + + // Rewind version so the runner re-executes migration 32 + try (var conn = dataSource.getConnection(); + var stmt = conn.createStatement()) { + stmt.executeUpdate( + "UPDATE \"%s\".dbos_migrations SET version = %d".formatted(schema, rewindTo)); + } + + // Re-run migrations: cleanupInvalidIndexes should drop the invalid index, then 32+ rebuild it + assertDoesNotThrow(() -> MigrationManager.runMigrations(dbosConfig)); + + // Index now exists and is valid + try (var conn = dataSource.getConnection(); + var stmt = + conn.prepareStatement( + "SELECT indisvalid FROM pg_index WHERE indexrelid = ?::regclass")) { + stmt.setString(1, "\"%s\".\"%s\"".formatted(schema, targetIndex)); + try (var rs = stmt.executeQuery()) { + assertTrue(rs.next()); + assertTrue(rs.getBoolean("indisvalid"), "Index should be valid after migration re-run"); + } + } + + try (var conn = dataSource.getConnection()) { + assertEquals(expectedFinal, getVersion(conn)); + } + } + static void assertTableExists(DatabaseMetaData metaData, String tableName) throws Exception { assertTableExists(metaData, tableName, Constants.DB_SCHEMA); }