From ecc60f7b50eb619e2b956bb7bbae301036e30561 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Fri, 20 Mar 2026 17:41:17 -0700 Subject: [PATCH 1/4] Revert "SPOC-99: Remove fixed replay queue size limit and eliminate worker restarts on overflow." This reverts commit 9cbbffc028571390f459b7318693988fdc12749c. --- include/spock.h | 2 +- src/spock.c | 7 +++---- src/spock_apply.c | 47 ++++++++++++++++++++++++++--------------------- 3 files changed, 30 insertions(+), 26 deletions(-) diff --git a/include/spock.h b/include/spock.h index 891d616b..a0fe5cf6 100644 --- a/include/spock.h +++ b/include/spock.h @@ -50,7 +50,7 @@ extern bool spock_include_ddl_repset; extern bool allow_ddl_from_functions; extern int restart_delay_default; extern int restart_delay_on_exception; -extern int spock_replay_queue_size; /* Deprecated - no longer used */ +extern int spock_replay_queue_size; extern int spock_feedback_frequency; extern bool check_all_uc_indexes; extern bool spock_enable_quiet_mode; diff --git a/src/spock.c b/src/spock.c index 2aa74433..7b4e5094 100644 --- a/src/spock.c +++ b/src/spock.c @@ -139,7 +139,7 @@ bool spock_include_ddl_repset = false; bool allow_ddl_from_functions = false; int restart_delay_default; int restart_delay_on_exception; -int spock_replay_queue_size; /* Deprecated - no longer used */ +int spock_replay_queue_size; int spock_feedback_frequency; bool check_all_uc_indexes = false; bool spock_enable_quiet_mode = false; @@ -1150,9 +1150,8 @@ _PG_init(void) NULL); DefineCustomIntVariable("spock.exception_replay_queue_size", - "DEPRECATED: apply-worker replay queue size (no longer used)", - "This setting is deprecated and has no effect. 
" - "The replay queue now dynamically allocates memory as needed.", + "apply-worker replay queue size for exception", + NULL, &spock_replay_queue_size, 4, 0, diff --git a/src/spock_apply.c b/src/spock_apply.c index 46d7d96d..d96d3383 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -124,6 +124,7 @@ static ApplyReplayEntry * apply_replay_head = NULL; static ApplyReplayEntry * apply_replay_tail = NULL; static ApplyReplayEntry * apply_replay_next = NULL; static int apply_replay_bytes = 0; +static bool apply_replay_overflow = false; /* Number of tuples inserted after which we switch to multi-insert. */ #define MIN_MULTI_INSERT_TUPLES 5 @@ -2268,10 +2269,6 @@ handle_sql_or_exception(QueuedMessage *queued_message, bool tx_just_started) if (!failed) { - /* - * Follow spock.exception_behavior GUC instead of restarting - * worker - */ if (exception_behaviour == TRANSDISCARD || exception_behaviour == SUB_DISABLE) RollbackAndReleaseCurrentSubTransaction(); @@ -2971,24 +2968,28 @@ apply_work(PGconn *streamConn) last_received = end_lsn; /* - * Append the entry to the end of the replay queue if we - * read it from the stream. Dynamic allocation means no - * fixed size limit - queue grows as needed. Note: - * spock_replay_queue_size is deprecated and no longer - * checked. + * Append the entry to the end of the replay queue + * if we read it from the stream but check for overflow. 
*/ if (queue_append) { apply_replay_bytes += msg->len; - if (apply_replay_head == NULL) + if (apply_replay_bytes < spock_replay_queue_size) { - apply_replay_head = apply_replay_tail = entry; + if (apply_replay_head == NULL) + { + apply_replay_head = apply_replay_tail = entry; + } + else + { + apply_replay_tail->next = entry; + apply_replay_tail = entry; + } } else { - apply_replay_tail->next = entry; - apply_replay_tail = entry; + apply_replay_overflow = true; } } @@ -3009,10 +3010,10 @@ apply_work(PGconn *streamConn) replication_handler(msg); - /* - * Note: No overflow handling needed - dynamic allocation - * used - */ + if (queue_append && apply_replay_overflow) + { + apply_replay_entry_free(entry); + } } else if (c == 'k') { @@ -3186,11 +3187,14 @@ apply_work(PGconn *streamConn) } /* - * Note: Replay queue overflow handling removed - dynamic allocation - * prevents overflow. We no longer kill and restart apply workers for - * queue overflow. Exception handling now follows - * spock.exception_behavior setting. + * If we ran out of queue space we also need to bail out. 
*/ + if (apply_replay_overflow) + { + elog(LOG, "SPOCK: caught exception after replay queue overrun " + "- forcing apply worker restart"); + PG_RE_THROW(); + } /* * Reaching this point means that we are dealing with the first @@ -3940,6 +3944,7 @@ apply_replay_queue_reset(void) apply_replay_tail = NULL; apply_replay_next = NULL; apply_replay_bytes = 0; + apply_replay_overflow = false; MemoryContextReset(ApplyReplayContext); } From c4f3ff9785fbe4f2654bb405c0af2bb1c3854a8f Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Sun, 22 Mar 2026 13:38:37 -0700 Subject: [PATCH 2/4] Update replication_set expected output after revert of SPOC-99 --- tests/regress/expected/replication_set.out | 34 +++++++++++----------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/tests/regress/expected/replication_set.out b/tests/regress/expected/replication_set.out index 398ea9ac..36057f1a 100644 --- a/tests/regress/expected/replication_set.out +++ b/tests/regress/expected/replication_set.out @@ -455,14 +455,14 @@ SELECT ) AS error_message FROM spock.exception_log ORDER BY command_counter; - command_counter | table_schema | table_name | operation | remote_new_tup | error_message ------------------+--------------+------------+-----------+----------------------------------------------------+------------------------------------------ + command_counter | table_schema | table_name | operation | remote_new_tup | error_message +-----------------+--------------+------------+-----------+----------------------------------------------------+--------------------------- 1 | | | INSERT | | Spock can't find relation - 2 | | | INSERT | | Spock can't find relation - 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 4 | | | INSERT | | Spock can't find relation - 5 | | | INSERT | | Spock can't find relation - 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation 
with oid + 1 | | | INSERT | | Spock can't find relation + 1 | | | INSERT | | Spock can't find relation + 1 | | | INSERT | | Spock can't find relation + 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unknown + 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unknown (6 rows) \c :provider_dsn @@ -572,18 +572,18 @@ SELECT ) AS error_message FROM spock.exception_log ORDER BY command_counter; - command_counter | table_schema | table_name | operation | remote_new_tup | error_message + command_counter | table_schema | table_name | operation | remote_new_tup | error_message -----------------+--------------+-------------+-----------+----------------------------------------------------+-------------------------------------------------------------------------------------------------------- 1 | | | INSERT | | Spock can't find relation - 2 | | | INSERT | | Spock can't find relation - 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 4 | | | INSERT | | Spock can't find relation - 5 | | | INSERT | | Spock can't find relation - 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 7 | | | UPDATE | | Spock can't find relation - 8 | | | UPDATE | | Spock can't find relation - 9 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 10 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) + 1 | | | INSERT | | Spock can't find relation + 1 | | | UPDATE | | Spock can't find relation + 1 | | | UPDATE | | Spock can't find relation + 1 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated 
in replication target relation (public.spoc_102l_u) + 1 | | | INSERT | | Spock can't find relation + 1 | | | INSERT | | Spock can't find relation + 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unknown + 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unknown + 2 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | unknown (10 rows) \c :provider_dsn From 878aee8732dcb4f95d94b05b82ccce80d78e5b82 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Sun, 22 Mar 2026 13:39:00 -0700 Subject: [PATCH 3/4] Fix replay queue overflow check units mismatch GUC_UNIT_MB means spock_replay_queue_size is stored in MB, but apply_replay_bytes accumulates in bytes. Multiply by 1024 * 1024 (in 64-bit arithmetic, so large settings cannot overflow int) to compare in the same units, restoring the intended 4MB default threshold. --- src/spock_apply.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/spock_apply.c b/src/spock_apply.c index d96d3383..f319fb6b 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -2975,7 +2975,12 @@ apply_work(PGconn *streamConn) { apply_replay_bytes += msg->len; - if (apply_replay_bytes < spock_replay_queue_size) + /* + * spock_replay_queue_size is stored in MB (GUC_UNIT_MB + * declares the variable's unit as megabytes), so scale + * by 1024 * 1024 in 64-bit math to compare against bytes. 
+ */ + if (apply_replay_bytes < (int64) spock_replay_queue_size * 1024 * 1024) { if (apply_replay_head == NULL) { From 2618c3fc3b7bebfa89c6bf196c16bdbce0af91f5 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Sun, 22 Mar 2026 14:17:01 -0700 Subject: [PATCH 4/4] Increase spock.exception_replay_queue_size default from 4MB to 16MB --- src/spock.c | 2 +- tests/regress/expected/replication_set.out | 34 +++++++++++----------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/spock.c b/src/spock.c index 7b4e5094..2829eb10 100644 --- a/src/spock.c +++ b/src/spock.c @@ -1153,7 +1153,7 @@ _PG_init(void) "apply-worker replay queue size for exception", NULL, &spock_replay_queue_size, - 4, + 16, 0, MAX_KILOBYTES / 1024, PGC_SIGHUP, diff --git a/tests/regress/expected/replication_set.out b/tests/regress/expected/replication_set.out index 36057f1a..398ea9ac 100644 --- a/tests/regress/expected/replication_set.out +++ b/tests/regress/expected/replication_set.out @@ -455,14 +455,14 @@ SELECT ) AS error_message FROM spock.exception_log ORDER BY command_counter; - command_counter | table_schema | table_name | operation | remote_new_tup | error_message ------------------+--------------+------------+-----------+----------------------------------------------------+--------------------------- + command_counter | table_schema | table_name | operation | remote_new_tup | error_message +-----------------+--------------+------------+-----------+----------------------------------------------------+------------------------------------------ 1 | | | INSERT | | Spock can't find relation - 1 | | | INSERT | | Spock can't find relation - 1 | | | INSERT | | Spock can't find relation - 1 | | | INSERT | | Spock can't find relation - 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unknown - 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unknown + 2 | | | INSERT | | Spock can't find relation + 3 | public | spoc_102g | INSERT 
| [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + 4 | | | INSERT | | Spock can't find relation + 5 | | | INSERT | | Spock can't find relation + 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid (6 rows) \c :provider_dsn @@ -572,18 +572,18 @@ SELECT ) AS error_message FROM spock.exception_log ORDER BY command_counter; - command_counter | table_schema | table_name | operation | remote_new_tup | error_message + command_counter | table_schema | table_name | operation | remote_new_tup | error_message -----------------+--------------+-------------+-----------+----------------------------------------------------+-------------------------------------------------------------------------------------------------------- 1 | | | INSERT | | Spock can't find relation - 1 | | | INSERT | | Spock can't find relation - 1 | | | UPDATE | | Spock can't find relation - 1 | | | UPDATE | | Spock can't find relation - 1 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) - 1 | | | INSERT | | Spock can't find relation - 1 | | | INSERT | | Spock can't find relation - 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unknown - 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unknown - 2 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | unknown + 2 | | | INSERT | | Spock can't find relation + 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + 4 | | | INSERT | | Spock can't find relation + 5 | | | INSERT | | Spock can't find relation + 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + 7 | | | 
UPDATE | | Spock can't find relation + 8 | | | UPDATE | | Spock can't find relation + 9 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + 10 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) (10 rows) \c :provider_dsn