diff --git a/blueprints/hackers-cover-letter.md b/blueprints/hackers-cover-letter.md new file mode 100644 index 0000000000000..09beaed099ac3 --- /dev/null +++ b/blueprints/hackers-cover-letter.md @@ -0,0 +1,218 @@ +# [PATCH] recovery_pause_on_logical_slot_conflict: enable continuous logical decoding from an archive-only standby + +Draft cover letter for pgsql-hackers. Not yet sent. + +--- + +Hackers, + +This patch adds a new GUC, `recovery_pause_on_logical_slot_conflict` +(`PGC_SIGHUP`, default `off`), that makes WAL replay on a standby pause — +and later auto-resume — instead of invalidating an otherwise-healthy +logical replication slot when a catalog `PRUNE_ON_ACCESS` record's +`snapshotConflictHorizon` has overtaken the slot's `catalog_xmin`. + +## Motivation + +A logical replication slot created on a standby that receives WAL only +via `restore_command` — no streaming link to the primary — cannot feed +`hot_standby_feedback` upstream, so it has no natural way to keep the +primary's catalog horizon pinned. Without this GUC, such a slot is +invalidated the first time replay applies a catalog vacuum record whose +horizon exceeds the slot's `catalog_xmin`, typically within +~`2 * autovacuum_naptime` of slot creation. + +That makes continuous logical decoding from an archive-only standby +(a useful building block for CDC off a compliance / read-replica / cold +tier) effectively impossible today. With this GUC on, the same workload +runs as a service: + +1. Replay encounters a conflicting prune record. +2. The startup process requests a recovery pause and waits. +3. The downstream consumer drains the slot past the pause LSN. +4. A periodic re-scan notices the slot no longer blocks, clears the + pause, and advances `catalog_xmin` past the horizon so the subsequent + `InvalidateObsoleteReplicationSlots()` call is a no-op. +5. Replay continues to the next conflict; the cycle repeats. + +No operator action is required between drain and resume. 
A +drain-aware consumer turns this into a true continuous pipeline; +`pg_wal_replay_resume()` remains available as the "give up on this slot" +escape hatch. + +## What the patch does + +- Adds the GUC (`bool`, `PGC_SIGHUP`, default `off`, + group `REPLICATION_STANDBY`). One boolean early-return on the hot path + when `off`; no new shared-memory state. +- Hooks a single choke point: `ResolveRecoveryConflictWithSnapshot()` + calls `MaybePauseOnLogicalSlotConflict()` before + `InvalidateObsoleteReplicationSlots()` for the + `RS_INVAL_HORIZON` / `isCatalogRel` case. +- Reuses the existing `recoveryNotPausedCV` / + `SetRecoveryPause` / `ConfirmRecoveryPaused` machinery. + `RECOVERY_PAUSE_REQUESTED → RECOVERY_PAUSED` is driven from inside + `MaybePauseOnLogicalSlotConflict` so + `pg_get_wal_replay_pause_state()` reflects reality. +- Wait loop: + - `ProcessStartupProcInterrupts()` + - `CheckForStandbyTrigger()` — escape so `pg_promote()` doesn't stall + - `AnySlotStillBlocksConflict()` — auto-resume predicate + - `ConfirmRecoveryPaused()` + - `ConditionVariableTimedSleep(&recoveryNotPausedCV, 1s, ...)` +- Auto-resume predicate treats a slot as no longer blocking when any + of the following holds: + - `data.invalidated != RS_INVAL_NONE` (dropped, WAL-removed, etc.) + - `data.synced` (managed by the slot-sync worker — upstream's + concern, not ours) + - `catalog_xmin` has advanced past the horizon + (`pg_replication_slot_advance()`) + - `confirmed_flush_lsn` has reached the pause-point LSN (drained) +- On wait exit, advance `catalog_xmin` (and `xmin`) past the horizon on + drained slots so the fall-through invalidation is a no-op. Slots that + weren't drained are left alone and get invalidated normally — that is + the "give up" path when an operator uses `pg_wal_replay_resume()` + manually. 
+ +## Edge cases we thought about + +- **In-progress slots.** Slots whose `effective_catalog_xmin` is still + `InvalidTransactionId` (still inside `DecodingContextFindStartpoint`) + are skipped in both the pause check and the advance. Pausing for one + would deadlock: `DecodingContextFindStartpoint` needs replay to move + forward to reach `SNAPBUILD_CONSISTENT`. Invalidating an in-progress + slot is harmless — the caller retries. + +- **Synced slots.** Slots with `data.synced = true` are skipped. Writing + their fields from the startup process would race with the slot-sync + worker, and `ALTER` / `DROP_REPLICATION_SLOT` error out on a synced + slot so the operator-facing recipe doesn't apply. + +- **`PrecedesOrEquals` vs `Precedes`.** We use + `TransactionIdPrecedesOrEquals` to match + `DetermineSlotInvalidationCause`. With strict `Precedes`, a slot whose + `catalog_xmin` was just advanced to exactly `horizon` by a previous + pause-and-advance cycle would fail to re-pause on the next prune + record with `horizon == catalog_xmin`, yet would still be invalidated + by the fall-through. + +- **`pg_promote()` during pause.** `CheckForStandbyTrigger()` — which + actually consumes `PROMOTE_SIGNAL_FILE`, not just reads the flag — is + called in the wait loop. Without that, `pg_promote(wait => true)` + stalls for the full `wait_seconds` and returns false. + +- **`max_slot_wal_keep_size` while paused.** The checkpointer is a + separate process and runs restartpoints even while the startup process + is asleep in the wait loop, so `RS_INVAL_WAL_REMOVED` can be applied + out of band. The auto-resume predicate picks that up on the next tick + and lets replay continue. The same mechanism handles an operator + dropping or advancing the slot. + +## Known limitations + +- **Persistence of the post-wait advance.** We mark slots dirty but do + not force `SaveSlotToPath`. 
If the standby crashes between resume and + the next restartpoint, the on-disk `catalog_xmin` is the pre-advance + value and replay re-encounters the same conflict on restart, re-pauses, + and the consumer re-drains. The drain is idempotent so there is no + data loss, but it is an operator-visible hiccup. `SaveSlotToPath` is + currently static in `slot.c` and not trivially callable from the + startup process (no `MyReplicationSlot`); we'd appreciate feedback on + whether to expose it or accept the current behavior. + +- **No backstop timeout.** If no consumer ever drains, the standby sits + paused indefinitely. We considered a timeout GUC but chose not to wire + one in — it is orthogonal and can be added later. Promote is already + an escape. + +- **Scope: `RS_INVAL_HORIZON` only.** Non-horizon invalidation causes + (WAL-removed, idle-timeout) reflect explicit operator policy and + should continue to invalidate; the patch does not touch them. + +- **Opt-in, default off.** Upstream behavior is unchanged for every + existing deployment; only operators who want continuous decoding from + a WAL-shipping standby flip the GUC on. + +## Tests + +New TAP test `src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl` +(~30 s wall-clock, 10 assertions). Three archive-only standbys from the +same basebackup: + +1. **GUC on**: pauses on conflict; orchestrator drains; slot stays + `reserved`; ≥2000 decoded events across ≥1 pause cycle. +2. **GUC off (baseline)**: under identical WAL, slot goes to `lost` — + proves the conflict actually fires *and* that upstream behavior is + unchanged. +3. **GUC on + `pg_promote(wait=>true, wait_seconds=>30)`**: asserts + promote returns `t` in under 10 s. Guards the + `CheckForStandbyTrigger()` escape in the wait loop. + +The Phase-1 / Phase-2 split in the test is deliberate: slot creation +must reach `SNAPBUILD_CONSISTENT` before any conflicting prune record is +replayed, or `DecodingContextFindStartpoint` and our pause code +deadlock. 
The test takes a base backup, runs `pg_log_standby_snapshot()` + +`pg_switch_wal()` twice, waits for the anchor segment to archive, +creates slots, *then* runs the catalog-churning workload. The rationale +is commented inline. + +## Files + + src/backend/access/transam/xlogrecovery.c ~25 +/- + src/backend/storage/ipc/standby.c ~320 +/- + src/backend/utils/misc/guc_parameters.dat 9 ++ + src/backend/utils/misc/postgresql.conf.sample 4 ++ + src/include/access/xlogrecovery.h 3 ++ + src/include/storage/standby.h 2 ++ + src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl 296 +++ + +Two functions are promoted from static to extern so the new code in +`standby.c` can drive them: `ConfirmRecoveryPaused()` and +`CheckForStandbyTrigger()`. The new code calls both only from its wait +loop, mirroring how `recoveryPausesHere()` uses them today. + +## Open questions + +1. **GUC name.** `recovery_pause_on_logical_slot_conflict` is long but + descriptive. An alternative: `logical_slot_conflict_action` with + enum values (`invalidate` | `pause`). Happy to rename. + +2. **Single GUC, auto-resume always.** We could have added a second knob + for "pause only, never auto-resume", but we couldn't find a use case + where a standby that pauses on conflict actually wants to keep + sitting paused after the consumer has drained. Manual + `pg_wal_replay_resume()` still works as the "give up on this slot" + escape. If people want the explicit two-step behavior back, we can + add a mode flag. + +3. **Persistence.** Force `SaveSlotToPath` on advance, or accept the + crash-redo behavior? + +4. **Broader invalidation causes.** The patch scopes to + `RS_INVAL_HORIZON`. Other causes (WAL-removed, idle-timeout) reflect + operator policy and are probably right to keep invalidating, but + someone might have a use case we haven't seen. + +Feedback and review appreciated.
+ +Thanks, + +— + +## Appendix: commit layout + +The series on the working branch is: + +- `1ef78be` Pause recovery on logical slot conflict + (core feature + GUC + wait loop + manual-resume behavior) +- `ffd897c` Add TAP test for recovery_pause_on_logical_slot_conflict +- `cd2b7be` Refactor `050_recovery_pause_on_slot_conflict.pl` for + readability (extracts helpers; no behavior change) +- `39adedd` Auto-resume recovery once the logical slot conflict is + resolved (extracts `AnySlotStillBlocksConflict` helper, re-scans in + the wait loop, flips pause state when nothing blocks; keeps manual + `pg_wal_replay_resume()` as the escape) + +For a single-patch submission we'd likely squash to two commits +(feature+test, documentation) or three (feature, test, docs); the +refactor commit exists to keep diff review tractable and would fold in. diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index c236e2b796928..3ad968a86790e 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -96,6 +96,24 @@ const char *recoveryTargetName; XLogRecPtr recoveryTargetLSN; int recovery_min_apply_delay = 0; +/* + * If true, when WAL replay on a standby is about to invalidate an otherwise- + * active logical replication slot because a catalog PRUNE_ON_ACCESS record's + * snapshotConflictHorizon has overtaken the slot's catalog_xmin, pause replay + * instead. Replay auto-resumes once the consumer has drained the slot past + * the pause point (or the slot is dropped, advanced, or otherwise no longer + * blocking); pg_wal_replay_resume() also forces continuation. See + * MaybePauseOnLogicalSlotConflict() in standby.c. + * + * Motivated by blueprints/LOGICAL_DECODING_ARCHIVED_WALS.md §4.2.3 / US-4: + * an archive-only logical-decoding standby cannot feed hot_standby_feedback + * to the primary, so it has no natural way to keep the primary's catalog + * horizon pinned. 
Without this GUC, any logical slot created on such a + * standby is invalidated the first time replay applies a catalog vacuum + * record whose horizon exceeds the slot's catalog_xmin. + */ +bool recovery_pause_on_logical_slot_conflict = false; + /* options formerly taken from recovery.conf for XLOG streaming */ char *PrimaryConnInfo = NULL; char *PrimarySlotName = NULL; @@ -363,7 +381,7 @@ static bool recoveryStopsAfter(XLogReaderState *record); static char *getRecoveryStopReason(void); static void recoveryPausesHere(bool endOfRecovery); static bool recoveryApplyDelay(XLogReaderState *record); -static void ConfirmRecoveryPaused(void); +/* ConfirmRecoveryPaused is extern for use by MaybePauseOnLogicalSlotConflict */ static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, bool fetching_ckpt, @@ -386,7 +404,7 @@ static int XLogFileRead(XLogSegNo segno, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source); -static bool CheckForStandbyTrigger(void); +/* CheckForStandbyTrigger is extern for MaybePauseOnLogicalSlotConflict */ static void SetPromoteIsTriggered(void); static bool HotStandbyActiveInReplay(void); @@ -3080,7 +3098,7 @@ SetRecoveryPause(bool recoveryPause) * Confirm the recovery pause by setting the recovery pause state to * RECOVERY_PAUSED. */ -static void +void ConfirmRecoveryPaused(void) { /* If recovery pause is requested then set it paused */ @@ -4435,7 +4453,11 @@ SetPromoteIsTriggered(void) /* * Check whether a promote request has arrived. */ -static bool +/* + * Non-static: MaybePauseOnLogicalSlotConflict needs this to break its wait + * loop on promotion, same as recoveryPausesHere does. 
+ */ +bool CheckForStandbyTrigger(void) { if (LocalPromoteIsTriggered) diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 29af773394832..521d24fb74d42 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -24,8 +24,11 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "postmaster/startup.h" #include "replication/slot.h" +#include "storage/condition_variable.h" #include "storage/bufmgr.h" +#include "storage/lwlock.h" #include "storage/proc.h" #include "storage/procarray.h" #include "storage/sinvaladt.h" @@ -503,8 +506,265 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, * reached, e.g. due to using a physical replication slot. */ if (IsLogicalDecodingEnabled() && isCatalogRel) + { + MaybePauseOnLogicalSlotConflict(locator.dbOid, snapshotConflictHorizon); InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid, snapshotConflictHorizon); + } +} + +/* + * Returns true if at least one non-synced logical slot in `dboid` still + * blocks replay past snapshotConflictHorizon. + * + * "Blocks" means: the slot is in use, not invalidated, snapbuild-consistent + * (effective_catalog_xmin is valid — skipping in-progress slots avoids a + * deadlock with DecodingContextFindStartpoint), and its catalog_xmin + * precedes-or-equals the horizon. + * + * Use PrecedesOrEquals (not Precedes) to match DetermineSlotInvalidationCause. + * Otherwise a slot whose catalog_xmin was just advanced to exactly horizon by + * a previous pause-and-advance cycle fails to re-pause on the next prune + * record with the same horizon, yet would still be invalidated by the + * fall-through InvalidateObsoleteReplicationSlots call. + * + * Synced slots are skipped: writing their fields from the startup process + * would race the slot-sync worker, and ALTER / DROP_REPLICATION_SLOT errors + * out on a synced slot so the operator-facing recipe does not apply. 
+ * + * When conflict_lsn is valid (in-wait auto-resume check), slots whose + * confirmed_flush_lsn has reached conflict_lsn are treated as not blocking: + * the consumer has caught up to the pause point and the post-wait advance + * code will bump their catalog_xmin past the horizon. Pass InvalidXLogRecPtr + * for the initial pause-or-not decision (we don't yet have a pause point). + */ +static bool +AnySlotStillBlocksConflict(Oid dboid, TransactionId snapshotConflictHorizon, + XLogRecPtr conflict_lsn) +{ + int i; + bool blocking = false; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + Oid slot_db; + TransactionId slot_xmin; + TransactionId slot_effective_xmin; + XLogRecPtr slot_confirmed; + bool is_candidate; + + if (!s->in_use) + continue; + + SpinLockAcquire(&s->mutex); + slot_db = s->data.database; + slot_xmin = s->data.catalog_xmin; + slot_effective_xmin = s->effective_catalog_xmin; + slot_confirmed = s->data.confirmed_flush; + is_candidate = (s->data.invalidated == RS_INVAL_NONE && + slot_db != InvalidOid && + TransactionIdIsValid(slot_effective_xmin) && + !s->data.synced); + SpinLockRelease(&s->mutex); + + if (!is_candidate) + continue; + if (slot_db != dboid) + continue; + if (!TransactionIdIsValid(slot_xmin)) + continue; + if (!TransactionIdPrecedesOrEquals(slot_xmin, snapshotConflictHorizon)) + continue; + if (conflict_lsn != InvalidXLogRecPtr && + slot_confirmed >= conflict_lsn) + continue; + + blocking = true; + break; + } + LWLockRelease(ReplicationSlotControlLock); + + return blocking; +} + +/* + * If recovery_pause_on_logical_slot_conflict is enabled, and replay is about + * to apply a catalog PRUNE_ON_ACCESS record whose snapshotConflictHorizon + * would cause the invalidation of at least one non-invalidated logical slot + * in the same database, request a recovery pause and wait until the conflict + * is resolved. 
+ * + * The wait exits in any of: + * - Auto-resume: a periodic re-scan finds no slot still blocking. Any of + * draining past the pause LSN, dropping the slot, pg_replication_slot_ + * advance(), or out-of-band invalidation (e.g. max_slot_wal_keep_size + * applied by the checkpointer, which runs even while startup is paused + * here) will satisfy this. The post-wait advance then bumps catalog_xmin + * on drained slots so the fall-through InvalidateObsoleteReplicationSlots() + * is a no-op. + * - Manual resume: pg_wal_replay_resume() flips the state to NOT_PAUSED. + * Any slot still blocking at that point is invalidated by the + * fall-through — the "give up on this slot" escape hatch. + * - Promote: CheckForStandbyTrigger() consumes PROMOTE_SIGNAL_FILE and we + * return early so the startup process can finish promotion. + * + * Only invoked from ResolveRecoveryConflictWithSnapshot(), before any buffer + * locks are taken, so pausing here does not deadlock with anything. + */ +void +MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon) +{ + XLogRecPtr conflict_lsn; + + if (!recovery_pause_on_logical_slot_conflict) + return; + if (!TransactionIdIsValid(snapshotConflictHorizon)) + return; + + if (!AnySlotStillBlocksConflict(dboid, snapshotConflictHorizon, + InvalidXLogRecPtr)) + return; + + conflict_lsn = GetXLogReplayRecPtr(NULL); + + ereport(LOG, + (errmsg("recovery paused: WAL redo at %X/%X would invalidate a logical replication slot", + LSN_FORMAT_ARGS(conflict_lsn)), + errdetail("snapshotConflictHorizon %u exceeds catalog_xmin of at least one active logical slot in database %u.", + snapshotConflictHorizon, dboid), + errhint("Recovery will resume automatically once the slot is drained past %X/%X, dropped, advanced, or invalidated for another reason; pg_wal_replay_resume() forces continuation (invalidating any remaining blocking slot).", + LSN_FORMAT_ARGS(conflict_lsn)))); + + SetRecoveryPause(true); + + while (GetRecoveryPauseState() 
!= RECOVERY_NOT_PAUSED) + { + ProcessStartupProcInterrupts(); + + /* + * If the operator gave up on the slot and triggered a promotion + * instead, bail out of the wait so the startup process can proceed + * with the promotion path. Must use CheckForStandbyTrigger (which + * actually consumes PROMOTE_SIGNAL_FILE), not PromoteIsTriggered + * (which only reads a flag populated by the former). Mirrors the + * same escape in recoveryPausesHere(). + */ + if (CheckForStandbyTrigger()) + { + ConditionVariableCancelSleep(); + return; + } + + /* + * Auto-resume: if nothing is still blocking this conflict, clear + * the pause and let the loop condition exit. The post-wait advance + * will bump catalog_xmin on any slot that drained past conflict_lsn + * so the fall-through InvalidateObsoleteReplicationSlots() is a + * no-op. Slots invalidated out of band (dropped, WAL-removed, + * etc.) are simply not in the scan anymore. + */ + if (!AnySlotStillBlocksConflict(dboid, snapshotConflictHorizon, + conflict_lsn)) + { + SetRecoveryPause(false); + break; + } + + /* + * Promote RECOVERY_PAUSE_REQUESTED to RECOVERY_PAUSED so that + * observers (pg_get_wal_replay_pause_state() / monitoring) see the + * pause as actually taken, not just requested. + */ + ConfirmRecoveryPaused(); + ConditionVariableTimedSleep(&XLogRecoveryCtl->recoveryNotPausedCV, + 1000, WAIT_EVENT_RECOVERY_PAUSE); + } + ConditionVariableCancelSleep(); + + /* + * Wait is over. For any slot whose consumer drained up to (or past) + * conflict_lsn, advance catalog_xmin past the horizon so the subsequent + * InvalidateObsoleteReplicationSlots() fall-through is a no-op. Slots + * that did not drain are left alone and get invalidated normally — the + * "I didn't act, just let the slot die" path that runs when an operator + * manually resumed without draining. 
+ */ + { + int j; + + LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); + for (j = 0; j < max_replication_slots; j++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[j]; + bool advance; + + if (!s->in_use) + continue; + + SpinLockAcquire(&s->mutex); + /* + * Skip synced slots — same reason as in the pause-check scan. + * Writing their fields would race the slot-sync worker. + */ + advance = (s->data.invalidated == RS_INVAL_NONE && + s->data.database == dboid && + !s->data.synced && + s->data.confirmed_flush >= conflict_lsn && + ((TransactionIdIsValid(s->data.catalog_xmin) && + TransactionIdPrecedesOrEquals(s->data.catalog_xmin, + snapshotConflictHorizon)) || + (TransactionIdIsValid(s->data.xmin) && + TransactionIdPrecedesOrEquals(s->data.xmin, + snapshotConflictHorizon)))); + if (advance) + { + TransactionId new_xmin = snapshotConflictHorizon; + + TransactionIdAdvance(new_xmin); /* strictly > horizon */ + if (TransactionIdIsValid(s->data.catalog_xmin) && + TransactionIdPrecedesOrEquals(s->data.catalog_xmin, + snapshotConflictHorizon)) + { + s->data.catalog_xmin = new_xmin; + s->effective_catalog_xmin = new_xmin; + } + if (TransactionIdIsValid(s->data.xmin) && + TransactionIdPrecedesOrEquals(s->data.xmin, + snapshotConflictHorizon)) + { + s->data.xmin = new_xmin; + s->effective_xmin = new_xmin; + } + s->just_dirtied = true; + s->dirty = true; + } + SpinLockRelease(&s->mutex); + + if (advance) + ereport(LOG, + (errmsg("advanced catalog_xmin of logical slot \"%s\" past conflict horizon %u", + NameStr(s->data.name), snapshotConflictHorizon), + errdetail("Slot's confirmed_flush_lsn %X/%X reached the conflict record at %X/%X; consumer drained past the pause point.", + LSN_FORMAT_ARGS(s->data.confirmed_flush), + LSN_FORMAT_ARGS(conflict_lsn)))); + } + LWLockRelease(ReplicationSlotControlLock); + + /* + * NOTE: the advance above only marks the slot dirty; it is persisted + * to disk at the next restartpoint. 
If the standby crashes between + * here and the next restartpoint, the on-disk catalog_xmin is the + * pre-advance value, and on recovery restart we re-encounter the + * same conflict record, re-pause, and the consumer re-drains. There + * is no data loss, because the drain is idempotent for the same slot + * state, but it is an operator-visible hiccup. A future iteration + * could tighten this by calling SaveSlotToPath directly; + * SaveSlotToPath is currently static in slot.c and not trivially + * callable from the startup process (no MyReplicationSlot). + */ + } } /* diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 83af594d4af4b..ac7c355c22c1f 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -2440,6 +2440,14 @@ max => 'INT_MAX', }, +{ name => 'recovery_pause_on_logical_slot_conflict', type => 'bool', + context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY', + short_desc => 'Pauses recovery instead of invalidating an active logical slot on catalog conflict.', + long_desc => 'When WAL replay on a standby is about to invalidate a logical replication slot because a catalog PRUNE_ON_ACCESS record has overtaken the slot\'s catalog_xmin, pause recovery instead. Recovery resumes automatically once the slot has been drained past the pause point, dropped, advanced, or invalidated for another reason (e.g. max_slot_wal_keep_size).
pg_wal_replay_resume() also forces continuation, invalidating any remaining blocking slot.', + variable => 'recovery_pause_on_logical_slot_conflict', + boot_val => 'false', +}, + { name => 'recovery_prefetch', type => 'enum', context => 'PGC_SIGHUP', group => 'WAL_RECOVERY', short_desc => 'Prefetch referenced blocks during recovery.', long_desc => 'Look ahead in the WAL to find references to uncached data.', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index ac38cddaaf9a6..414fed447cfe2 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -403,6 +403,10 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#recovery_pause_on_logical_slot_conflict = off # pause recovery instead of invalidating + # a logical slot on catalog conflict; + # auto-resumes once the slot is drained, + # dropped, or otherwise unblocks #sync_replication_slots = off # enables slot synchronization on the physical standby from the primary # - Subscribers - diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index ba7750dca0b45..58d578ce85373 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -129,6 +129,7 @@ extern PGDLLIMPORT XLogRecoveryCtlData *XLogRecoveryCtl; extern PGDLLIMPORT bool recoveryTargetInclusive; extern PGDLLIMPORT int recoveryTargetAction; extern PGDLLIMPORT int recovery_min_apply_delay; +extern PGDLLIMPORT bool recovery_pause_on_logical_slot_conflict; extern PGDLLIMPORT char *PrimaryConnInfo; extern PGDLLIMPORT char *PrimarySlotName; extern PGDLLIMPORT char *recoveryRestoreCommand; @@ -213,6 +214,8 @@ extern bool HotStandbyActive(void); extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI); extern RecoveryPauseState GetRecoveryPauseState(void); 
extern void SetRecoveryPause(bool recoveryPause); +extern void ConfirmRecoveryPaused(void); +extern bool CheckForStandbyTrigger(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern TimestampTz GetLatestXTime(void); extern TimestampTz GetCurrentChunkReplayStartTime(void); diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 8715c08e94f20..510d1d57e1c83 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -75,6 +75,8 @@ extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHo extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon, bool isCatalogRel, RelFileLocator locator); +extern void MaybePauseOnLogicalSlotConflict(Oid dboid, + TransactionId snapshotConflictHorizon); extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); diff --git a/src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl b/src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl new file mode 100644 index 0000000000000..d1a03475e95b2 --- /dev/null +++ b/src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl @@ -0,0 +1,329 @@ +# Copyright (c) 2026, PostgreSQL Global Development Group + +# Exercise the recovery_pause_on_logical_slot_conflict GUC on a standby. +# +# Two-phase flow so the slot is fully consistent BEFORE any catalog- +# prune WAL record is replayed — otherwise slot creation would block +# inside DecodingContextFindStartpoint while replay pauses on the +# prune, and we would deadlock. (Fix #1, bbd5d4e13bc, narrows the +# window but doesn't remove it; keeping the two-phase flow explicit +# makes the test robust.) 
+# +# Phase 1 — bring up a consistent logical slot on the standby from a +# quiet primary archive: +# * take basebackup +# * pg_log_standby_snapshot() → snapbuild path (a) anchor +# * wait for the snapshot's segment to archive +# * start standby, let replay catch up, create slot (quick — no +# prune records in the archive yet). +# +# Phase 2 — churn the primary's catalog so the standby's replay +# eventually hits a catalog-prune record that would invalidate the +# slot: +# * run CREATE / DROP of transient tables (pg_class churn) +# * run ANALYZE x2 + VACUUM pg_statistic / pg_class (HOT prune on +# catalog relations in db=postgres) +# * wait for those segments to archive +# * orchestrator loop on the standby: when +# pg_get_wal_replay_pause_state() returns paused, drain the slot +# via pg_logical_slot_get_changes, call pg_wal_replay_resume, +# continue. + +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; +use Time::HiRes qw(usleep); + +# --------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------- + +# Build the primary, seed the workload table, take a basebackup, and +# produce a "clean" archive: one that contains a standby snapshot but +# no catalog-prune WAL yet. Returns ($node_primary, $backup_name). 
+sub setup_primary_with_clean_archive +{ + my $node_primary = PostgreSQL::Test::Cluster->new('primary'); + $node_primary->init(allows_streaming => 'logical', has_archiving => 1); + $node_primary->append_conf('postgresql.conf', qq[ +wal_level = logical +archive_mode = on +archive_timeout = 1s +autovacuum = on +autovacuum_naptime = 5s +fsync = off +synchronous_commit = off +]); + $node_primary->start; + + $node_primary->safe_psql('postgres', qq[ + CREATE TABLE events (id serial PRIMARY KEY, payload text); + ALTER TABLE events REPLICA IDENTITY FULL; + INSERT INTO events (payload) VALUES ('seed'); + ]); + + my $backup_name = 'backup1'; + $node_primary->backup($backup_name); + + # Quiet-moment RUNNING_XACTS in post-backup WAL — provides path (a) + # anchor for snapbuild. + $node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + + # Force the segment containing that anchor to archive so the standby + # can see it via restore_command. Switch TWICE: first switch closes + # the segment with the snapshot record; second switch gives + # snapbuild the forward WAL it needs to decide the slot is + # consistent. Without the second switch, + # DecodingContextFindStartpoint blocks on 'waiting for WAL to + # become available at seg N+1' — flaky slot creation. 
+ my $phase1_seg = $node_primary->safe_psql('postgres', + "SELECT pg_walfile_name(pg_current_wal_lsn())"); + $node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); + $node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + $node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); + $node_primary->poll_query_until('postgres', qq[ + SELECT last_archived_wal IS NOT NULL + AND last_archived_wal >= '$phase1_seg' + FROM pg_stat_archiver + ]) or die "Timed out waiting for phase-1 segment $phase1_seg to archive"; + + return ($node_primary, $backup_name); +} + +# Bring up an archive-only standby from $backup_name on $node_primary +# with recovery_pause_on_logical_slot_conflict set to $guc_value. Waits +# for replay to catch up, then returns the node. +sub create_archive_standby +{ + my ($node_primary, $backup_name, $name, $guc_value) = @_; + + my $standby = PostgreSQL::Test::Cluster->new($name); + $standby->init_from_backup($node_primary, $backup_name, + has_streaming => 0, has_restoring => 1); + $standby->append_conf('postgresql.conf', qq[ +hot_standby = on +recovery_pause_on_logical_slot_conflict = $guc_value +wal_level = logical +max_standby_archive_delay = -1 +max_standby_streaming_delay = -1 +]); + $standby->start; + $standby->poll_query_until('postgres', + "SELECT pg_last_wal_replay_lsn() IS NOT NULL", 't'); + + return $standby; +} + +# Churn the primary's catalog enough to emit catalog-prune WAL records, +# then force and wait for those records to reach the archive. +sub run_catalog_churn +{ + my ($node_primary) = @_; + + # Transient tables exercise pg_class / pg_attribute / pg_type / pg_depend. 
+    $node_primary->safe_psql('postgres', qq[
+        INSERT INTO events (payload)
+        SELECT 'row-' || g FROM generate_series(1, 3000) g;
+    ]);
+    for (my $i = 0; $i < 20; $i++) {
+        $node_primary->safe_psql('postgres',
+            "CREATE TABLE churn_$i (id int, payload text); DROP TABLE churn_$i;");
+    }
+    # Two ANALYZE calls make first-generation pg_statistic rows dead by
+    # overwriting them; VACUUM then emits Heap2/PRUNE_ON_ACCESS.
+    $node_primary->safe_psql('postgres', qq[
+        ANALYZE events;
+        ANALYZE events;
+        VACUUM pg_class;
+        VACUUM pg_attribute;
+        VACUUM pg_type;
+        VACUUM pg_depend;
+        VACUUM pg_statistic;
+    ]);
+
+    my $phase2_seg = $node_primary->safe_psql('postgres',
+        "SELECT pg_walfile_name(pg_current_wal_lsn())");
+    $node_primary->safe_psql('postgres', "SELECT pg_switch_wal();");
+    $node_primary->poll_query_until('postgres', qq[
+        SELECT last_archived_wal IS NOT NULL
+           AND last_archived_wal >= '$phase2_seg'
+        FROM pg_stat_archiver
+    ]) or die "Timed out waiting for phase-2 segment $phase2_seg to archive";
+
+    return;
+}
+
+# Orchestrator loop for the GUC-on standby: whenever replay is paused
+# (or a pause has been requested), drain the slot via
+# pg_logical_slot_get_changes and call pg_wal_replay_resume(). Exits
+# when the replay LSN has not advanced for more than 10 consecutive
+# 500 ms polls, or when $deadline_seconds have passed. Returns
+# ($pauses_seen, $total_drained); $total_drained includes a final
+# drain of anything left on the slot.
+sub drain_and_resume_loop
+{
+    my ($standby, $slot_name, $deadline_seconds) = @_;
+
+    my $total_drained = 0;
+    my $pauses_seen = 0;
+    my $last_replay = '';
+    my $stall_ticks = 0;
+    my $deadline = time() + $deadline_seconds;
+
+    while (time() < $deadline) {
+        my $state = $standby->safe_psql('postgres',
+            "SELECT pg_get_wal_replay_pause_state()");
+        my $replay = $standby->safe_psql('postgres',
+            "SELECT pg_last_wal_replay_lsn()");
+
+        if ($state eq 'paused' || $state eq 'pause requested') {
+            my $got = $standby->safe_psql('postgres',
+                "SELECT COUNT(*) FROM pg_logical_slot_get_changes('$slot_name', NULL, NULL)");
+            $total_drained += $got;
+            $pauses_seen++;
+            $standby->safe_psql('postgres', "SELECT pg_wal_replay_resume()");
+            $stall_ticks = 0;
+        } elsif ($replay eq $last_replay) {
+            $stall_ticks++;
+            last if $stall_ticks > 10;
+        } else {
+            $stall_ticks = 0;
+        }
+
+        $last_replay = $replay;
+        usleep(500_000);
+    }
+
+    my $final = $standby->safe_psql('postgres',
+        "SELECT COUNT(*) FROM pg_logical_slot_get_changes('$slot_name', NULL, NULL)");
+    $total_drained += $final;
+
+    return ($pauses_seen, $total_drained);
+}
+
+# Poll until $standby reports replay as paused, up to ~30 seconds.
+# Returns 1 on success, 0 on timeout.
+sub wait_for_replay_paused
+{
+    my ($standby) = @_;
+
+    for (my $i = 0; $i < 60; $i++) {
+        my $s = $standby->safe_psql('postgres',
+            "SELECT pg_get_wal_replay_pause_state()");
+        return 1 if $s eq 'paused';
+        usleep(500_000);
+    }
+    return 0;
+}
+
+# ---------------------------------------------------------------------
+# Main script
+# ---------------------------------------------------------------------
+
+# 1. GUC visibility.
+my ($node_primary, $backup_name) = setup_primary_with_clean_archive();
+
+my $guc = $node_primary->safe_psql('postgres',
+    "SELECT COUNT(*) FROM pg_settings WHERE name = 'recovery_pause_on_logical_slot_conflict'");
+is($guc, '1', 'recovery_pause_on_logical_slot_conflict GUC is registered');
+
+# 2. Phase 1: bring up BOTH standbys (GUC-on and GUC-off) while the
+# archive still contains only the quiet-moment snapshot, with no prune
+# records yet. Slot creation reaches SNAPBUILD_CONSISTENT quickly on
+# both. Later, when Phase 2 ships the prune records, the two standbys
+# diverge: the GUC-on one pauses and drains; the GUC-off one
+# invalidates.
+my $node_standby = create_archive_standby($node_primary, $backup_name,
+    'standby', 'on');
+my $node_standby_off = create_archive_standby($node_primary, $backup_name,
+    'standby_off', 'off');
+
+$node_standby->safe_psql('postgres', qq[
+    SELECT pg_create_logical_replication_slot('t_slot', 'test_decoding');
+]);
+$node_standby_off->safe_psql('postgres', qq[
+    SELECT pg_create_logical_replication_slot('t_slot_off', 'test_decoding');
+]);
+
+my $slot_ready = $node_standby->safe_psql('postgres', qq[
+    SELECT wal_status FROM pg_replication_slots WHERE slot_name = 't_slot'
+]);
+is($slot_ready, 'reserved', "slot created cleanly in Phase 1 (state: $slot_ready)");
+
+my $off_slot_ready = $node_standby_off->safe_psql('postgres', qq[
+    SELECT wal_status FROM pg_replication_slots WHERE slot_name = 't_slot_off'
+]);
+is($off_slot_ready, 'reserved',
+    "baseline slot created cleanly in Phase 1 (state: $off_slot_ready)");
+
+# 3. Phase 2: catalog churn on primary, then wait for archive.
+run_catalog_churn($node_primary);
+
+# 4. Orchestrator loop on the GUC-on standby.
+my ($pauses_seen, $total_drained) =
+    drain_and_resume_loop($node_standby, 't_slot', 60);
+
+my $slot_state = $node_standby->safe_psql('postgres', qq[
+    SELECT wal_status || '|' || COALESCE(invalidation_reason, '')
+    FROM pg_replication_slots WHERE slot_name = 't_slot';
+]);
+like($slot_state, qr/^reserved\|/,
+    "slot survived catalog prune with GUC on (state: $slot_state)");
+
+cmp_ok($pauses_seen, '>=', 1,
+    "at least one pause event was handled ($pauses_seen seen)");
+
+cmp_ok($total_drained, '>=', 2000,
+    "at least 2000 decoded events ($total_drained got)");
+
+# 5. Baseline assertion: the GUC-off standby, faced with the exact same
+# Phase-2 archive, should invalidate its slot. This confirms the test
+# setup actually triggers the conflict AND that GUC-off behavior is
+# unchanged from upstream. If this assertion ever fails with state
+# "reserved", either the test stopped reproducing the trigger or the
+# GUC-off path accidentally benefits from our patch.
+my $off_state = 'reserved';
+for (my $i = 0; $i < 60; $i++) {
+    $off_state = $node_standby_off->safe_psql('postgres', qq[
+        SELECT wal_status FROM pg_replication_slots WHERE slot_name = 't_slot_off';
+    ]);
+    last if $off_state eq 'lost';
+    usleep(500_000);
+}
+
+is($off_state, 'lost',
+    "baseline (GUC off): slot invalidates as expected under catalog prune");
+
+# 6. Promote-during-pause: bring up a third standby, get it paused by
+# the GUC, then call pg_promote() and assert promotion actually
+# completes (rather than stalling until someone also runs
+# pg_wal_replay_resume). Guards the CheckForStandbyTrigger() escape
+# path in the wait loop.
+my $node_standby_p = create_archive_standby($node_primary, $backup_name,
+    'standby_promote', 'on');
+$node_standby_p->safe_psql('postgres',
+    "SELECT pg_create_logical_replication_slot('promote_slot', 'test_decoding')");
+
+# Phase-2 archive is already shipped so a pause will happen within a
+# few seconds.
+my $paused = wait_for_replay_paused($node_standby_p);
+ok($paused, "promote-test standby reached paused state before promotion");
+
+# Call pg_promote with a short wait. Without the CheckForStandbyTrigger
+# escape in the wait loop, this stalls for the full wait_seconds and
+# returns false; with the fix, it returns true in ~1 second.
+my $t0 = time();
+my $promoted = $node_standby_p->safe_psql('postgres',
+    "SELECT pg_promote(wait => true, wait_seconds => 30)");
+my $elapsed = time() - $t0;
+is($promoted, 't', "pg_promote returned true while standby was paused by GUC");
+cmp_ok($elapsed, '<', 10,
+    "pg_promote completed in under 10s (actual: ${elapsed}s)");
+
+$node_standby_p->stop;
+$node_standby_off->stop;
+$node_standby->stop;
+$node_primary->stop;
+
+done_testing();