Skip to content

Commit 80602f2

Browse files
committed
Fix replay queue spill-to-disk CodeRabbit review findings
- Fix memory leak: spill-read entries during replay were allocated in TopMemoryContext but never freed, because entry_spilled was only set on the first-pass path. Add !entry->from_pq to the free condition so that replay-path spill-read entries are freed after processing.
- Set the GUC minimum for spock.exception_replay_queue_size to 0 (previously -1), removing an undocumented value that silently behaved the same as 0.
- Use an explicit subscription name in the spill_transaction test when polling sub_show_status, to avoid ambiguity when multiple subscriptions exist.
1 parent 4c7b75f commit 80602f2

4 files changed

Lines changed: 17 additions & 14 deletions

File tree

src/spock.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1159,7 +1159,7 @@ _PG_init(void)
11591159
"Set to 0 to disable spilling (unlimited memory).",
11601160
&spock_replay_queue_size,
11611161
4,
1162-
-1,
1162+
0,
11631163
MAX_KILOBYTES / 1024,
11641164
PGC_SIGHUP,
11651165
GUC_UNIT_MB,

src/spock_apply.c

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3053,9 +3053,11 @@ apply_work(PGconn *streamConn)
30533053
* In-memory entries (both first-pass and replay) live in
30543054
* ApplyReplayContext and are freed by
30553055
* MemoryContextReset inside apply_replay_queue_reset,
3056-
* called from handle_commit.
3056+
* called on applying a COMMIT.
3057+
*
3058+
* !from_pq catches spill-read entries during replay.
30573059
*/
3058-
if (entry_spilled)
3060+
if (entry_spilled || !entry->from_pq)
30593061
apply_replay_entry_free(entry);
30603062
}
30613063
else if (c == 'k')
@@ -3991,9 +3993,16 @@ apply_replay_spill_write_entry(int len, char *data)
39913993
Assert(len > 0);
39923994
Assert(data != NULL);
39933995

3996+
/*
3997+
* Increment the count before writing so that a partial or failed write
3998+
* (ERROR from BufFileWrite) leaves the count higher than the number of
3999+
* complete records on disk. During replay, apply_replay_spill_read_entry()
4000+
* will attempt to read this record, hit EOF or a short read, and raise
4001+
* ERROR — which triggers a clean worker restart via the outer PG_CATCH.
4002+
*/
4003+
apply_replay_spill_count++;
39944004
BufFileWrite(apply_replay_spill_file, &len, sizeof(int));
39954005
BufFileWrite(apply_replay_spill_file, data, len);
3996-
apply_replay_spill_count++;
39974006
}
39984007

39994008
/*
@@ -4195,11 +4204,6 @@ apply_replay_queue_append_entry(ApplyReplayEntry *entry, StringInfo msg)
41954204

41964205
apply_replay_spill_write_entry(msg->len, msg->data);
41974206

4198-
/* XXX: keep DEBUG1 logging until spill-to-disk code is proven stable */
4199-
if (xact_action_counter % 100 == 0)
4200-
elog(DEBUG1, "SPOCK %s: replay queue spill entry %u: ",
4201-
MySubscription->name, xact_action_counter);
4202-
42034207
/*
42044208
* Move the entry struct from ApplyReplayContext to TopMemoryContext.
42054209
* handle_commit calls apply_replay_queue_reset which does
@@ -4209,9 +4213,8 @@ apply_replay_queue_append_entry(ApplyReplayEntry *entry, StringInfo msg)
42094213
*/
42104214
oldctx = MemoryContextSwitchTo(TopMemoryContext);
42114215
mc_entry = (ApplyReplayEntry *) palloc(sizeof(ApplyReplayEntry));
4212-
/* nosemgrep: palloc above allocates exactly sizeof(ApplyReplayEntry)
4213-
* bytes — the same size memcpy copies — so no overflow is possible. */
4214-
memcpy(mc_entry, entry, sizeof(ApplyReplayEntry));
4216+
/* palloc and memcpy use the same sizeof — no overflow possible. */
4217+
memcpy(mc_entry, entry, sizeof(ApplyReplayEntry)); /* nosemgrep */
42154218
MemoryContextSwitchTo(oldctx);
42164219

42174220
pfree(entry);

tests/regress/expected/spill_transaction.out

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ COPY test_spill (id) FROM PROGRAM 'seq 1 50000' WITH (FORMAT text);
267267
DO $$
268268
BEGIN
269269
SET LOCAL statement_timeout = '30s';
270-
WHILE (SELECT status FROM spock.sub_show_status()) <> 'disabled' LOOP
270+
WHILE (SELECT status FROM spock.sub_show_status('test_subscription')) <> 'disabled' LOOP
271271
PERFORM pg_sleep(0.1);
272272
END LOOP;
273273
END;

tests/regress/sql/spill_transaction.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ COPY test_spill (id) FROM PROGRAM 'seq 1 50000' WITH (FORMAT text);
178178
DO $$
179179
BEGIN
180180
SET LOCAL statement_timeout = '30s';
181-
WHILE (SELECT status FROM spock.sub_show_status()) <> 'disabled' LOOP
181+
WHILE (SELECT status FROM spock.sub_show_status('test_subscription')) <> 'disabled' LOOP
182182
PERFORM pg_sleep(0.1);
183183
END LOOP;
184184
END;

0 commit comments

Comments
 (0)