Skip to content

Commit 4102871

Browse files
committed
Refactor replay queue entry lifecycle and fix use-after-free
Refactor apply_replay_queue_append_entry() to return a bool indicating whether the caller must free the entry, instead of requiring the caller to compare pointers and track an entry_spilled flag. The function now updates the entry and msg pointers through out-parameters. This fixes a use-after-free: on a COMMIT message, replication_handler() calls handle_commit() → apply_replay_queue_reset() → MemoryContextReset(), which destroys in-memory entries; the old code then read entry->from_pq from the freed memory. The new design captures the free decision before replication_handler() runs. Also update the spill_transaction regression test: move TRUNCATE before the subscriber configuration in the SUB_DISABLE section to avoid replicating TRUNCATE while exception_behaviour = 'sub_disable' is in effect, drop command_counter from the exception_log queries for stable output, and add sanity checks.
1 parent 80602f2 commit 4102871

3 files changed

Lines changed: 85 additions & 86 deletions

File tree

src/spock_apply.c

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,8 @@ static void get_feedback_position(XLogRecPtr *recvpos, XLogRecPtr *writepos,
254254
static void UpdateWorkerStats(XLogRecPtr last_received, XLogRecPtr last_inserted);
255255
static void maybe_advance_forwarded_origin(XLogRecPtr end_lsn, bool xact_had_exception);
256256
static ApplyReplayEntry *apply_replay_queue_next_entry(void);
257-
static ApplyReplayEntry *apply_replay_queue_append_entry(ApplyReplayEntry *entry,
258-
StringInfo msg);
257+
static bool apply_replay_queue_append_entry(ApplyReplayEntry **entry_p,
258+
StringInfo *msg_p);
259259
static void apply_replay_queue_start_replay(void);
260260
static void apply_replay_spill_write_entry(int len, char *data);
261261
static ApplyReplayEntry *apply_replay_spill_read_entry(void);
@@ -2910,7 +2910,7 @@ apply_work(PGconn *streamConn)
29102910
{
29112911
ApplyReplayEntry *entry;
29122912
bool queue_append;
2913-
bool entry_spilled = false;
2913+
bool need_free = false;
29142914
StringInfo msg;
29152915
int c;
29162916

@@ -3013,19 +3013,17 @@ apply_work(PGconn *streamConn)
30133013

30143014
/* Append the entry to the replay queue if from stream */
30153015
if (queue_append)
3016+
need_free = apply_replay_queue_append_entry(&entry,
3017+
&msg);
3018+
else
30163019
{
3017-
ApplyReplayEntry *orig = entry;
3018-
3019-
entry = apply_replay_queue_append_entry(entry, msg);
3020-
entry_spilled = (entry != orig);
3021-
30223020
/*
3023-
* Spilling relocates the entry from ApplyReplayContext
3024-
* to TopMemoryContext, freeing the original. Update msg
3025-
* to point to the new copy's copydata.
3021+
* Replay path: spill-read entries live in
3022+
* TopMemoryContext and must be freed explicitly.
3023+
* Capture this before replication_handler, which
3024+
* may reset ApplyReplayContext and free the entry.
30263025
*/
3027-
if (entry_spilled)
3028-
msg = &entry->copydata;
3026+
need_free = !entry->from_pq;
30293027
}
30303028

30313029
/*
@@ -3045,19 +3043,7 @@ apply_work(PGconn *streamConn)
30453043

30463044
replication_handler(msg);
30473045

3048-
/*
3049-
* Free spilled entries explicitly: their structs live in
3050-
* TopMemoryContext (not ApplyReplayContext), so they are
3051-
* not cleaned up by apply_replay_queue_reset.
3052-
*
3053-
* In-memory entries (both first-pass and replay) live in
3054-
* ApplyReplayContext and are freed by
3055-
* MemoryContextReset inside apply_replay_queue_reset,
3056-
* called on applying a COMMIT.
3057-
*
3058-
* !from_pq catches spill-read entries during replay.
3059-
*/
3060-
if (entry_spilled || !entry->from_pq)
3046+
if (need_free)
30613047
apply_replay_entry_free(entry);
30623048
}
30633049
else if (c == 'k')
@@ -4159,15 +4145,20 @@ apply_replay_queue_next_entry(void)
41594145
* If spock_replay_queue_size is configured and the in-memory limit is
41604146
* exceeded, spill subsequent entries to a temporary file on disk.
41614147
*
4162-
* Returns the entry to use after the call. When spilled, the original
4163-
* entry (in ApplyReplayContext) is freed and replaced by a copy in
4164-
* TopMemoryContext that survives MemoryContextReset(ApplyReplayContext)
4165-
* in handle_commit. The caller can detect spilling by comparing the
4166-
* returned pointer to the original.
4148+
* When spilled, the original entry (in ApplyReplayContext) is freed and
4149+
* replaced by a copy in TopMemoryContext that survives
4150+
* MemoryContextReset(ApplyReplayContext) in handle_commit. The updated
4151+
* entry and msg pointers are written back through entry_p and msg_p.
4152+
*
4153+
* Returns true if the caller must free the entry after processing
4154+
* (i.e. it lives in TopMemoryContext), false if it will be cleaned up
4155+
* automatically by MemoryContextReset(ApplyReplayContext).
41674156
*/
4168-
static ApplyReplayEntry *
4169-
apply_replay_queue_append_entry(ApplyReplayEntry *entry, StringInfo msg)
4157+
static bool
4158+
apply_replay_queue_append_entry(ApplyReplayEntry **entry_p, StringInfo *msg_p)
41704159
{
4160+
ApplyReplayEntry *entry = *entry_p;
4161+
StringInfo msg = *msg_p;
41714162
MemoryContext oldctx;
41724163

41734164
Assert(entry != NULL);
@@ -4218,7 +4209,9 @@ apply_replay_queue_append_entry(ApplyReplayEntry *entry, StringInfo msg)
42184209
MemoryContextSwitchTo(oldctx);
42194210

42204211
pfree(entry);
4221-
return mc_entry;
4212+
*entry_p = mc_entry;
4213+
*msg_p = &mc_entry->copydata;
4214+
return true; /* caller must free */
42224215
}
42234216
else
42244217
{
@@ -4242,7 +4235,7 @@ apply_replay_queue_append_entry(ApplyReplayEntry *entry, StringInfo msg)
42424235
elog(DEBUG1, "SPOCK %s: replay queue keep in-memory entry %u: ",
42434236
MySubscription->name, xact_action_counter);
42444237

4245-
return entry;
4238+
return false; /* freed by MemoryContextReset */
42464239
}
42474240
}
42484241

@@ -4264,7 +4257,9 @@ apply_replay_queue_start_replay(void)
42644257
if (apply_replay_spilling)
42654258
{
42664259
Assert(apply_replay_spill_file != NULL);
4267-
BufFileSeek(apply_replay_spill_file, 0, 0, SEEK_SET);
4260+
if (BufFileSeek(apply_replay_spill_file, 0, 0, SEEK_SET) != 0)
4261+
elog(ERROR, "SPOCK %s: could not seek replay spill file to start",
4262+
MySubscription->name);
42684263
apply_replay_spill_read = 0;
42694264
}
42704265

tests/regress/expected/spill_transaction.out

Lines changed: 43 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -167,16 +167,15 @@ SELECT id, payload FROM test_spill;
167167
-1 | 50000
168168
(1 row)
169169

170-
SELECT command_counter AS cnt,operation AS opt,
171-
remote_new_tup FROM spock.exception_log
170+
SELECT operation AS opt, remote_new_tup FROM spock.exception_log
172171
ORDER BY command_counter DESC LIMIT 5;
173-
cnt | opt | remote_new_tup
174-
-------+--------+-------------------------------------------------------------------------------------------------------------------
175-
50013 | INSERT | [{"value": 50000, "attname": "id", "atttype": "int4"}, {"value": 50000, "attname": "payload", "atttype": "int4"}]
176-
50012 | INSERT | [{"value": 49999, "attname": "id", "atttype": "int4"}, {"value": 49999, "attname": "payload", "atttype": "int4"}]
177-
50011 | INSERT | [{"value": 49998, "attname": "id", "atttype": "int4"}, {"value": 49998, "attname": "payload", "atttype": "int4"}]
178-
50010 | INSERT | [{"value": 49997, "attname": "id", "atttype": "int4"}, {"value": 49997, "attname": "payload", "atttype": "int4"}]
179-
50009 | INSERT | [{"value": 49996, "attname": "id", "atttype": "int4"}, {"value": 49996, "attname": "payload", "atttype": "int4"}]
172+
opt | remote_new_tup
173+
--------+-------------------------------------------------------------------------------------------------------------------
174+
INSERT | [{"value": 50000, "attname": "id", "atttype": "int4"}, {"value": 50000, "attname": "payload", "atttype": "int4"}]
175+
INSERT | [{"value": 49999, "attname": "id", "atttype": "int4"}, {"value": 49999, "attname": "payload", "atttype": "int4"}]
176+
INSERT | [{"value": 49998, "attname": "id", "atttype": "int4"}, {"value": 49998, "attname": "payload", "atttype": "int4"}]
177+
INSERT | [{"value": 49997, "attname": "id", "atttype": "int4"}, {"value": 49997, "attname": "payload", "atttype": "int4"}]
178+
INSERT | [{"value": 49996, "attname": "id", "atttype": "int4"}, {"value": 49996, "attname": "payload", "atttype": "int4"}]
180179
(5 rows)
181180

182181
-- ============================================================
@@ -221,16 +220,15 @@ SELECT count(*), sum(id), sum(payload) FROM test_spill;
221220
50000 | 1249974999 | 1250025000
222221
(1 row)
223222

224-
SELECT command_counter AS cnt,operation AS opt,
225-
remote_new_tup FROM spock.exception_log
223+
SELECT operation AS opt,remote_new_tup FROM spock.exception_log
226224
ORDER BY command_counter DESC LIMIT 5;
227-
cnt | opt | remote_new_tup
228-
--------+--------+-------------------------------------------------------------------------------------------------------------------
229-
100013 | INSERT | [{"value": 50000, "attname": "id", "atttype": "int4"}, {"value": 50000, "attname": "payload", "atttype": "int4"}]
230-
100012 | INSERT | [{"value": 49999, "attname": "id", "atttype": "int4"}, {"value": 49999, "attname": "payload", "atttype": "int4"}]
231-
100011 | INSERT | [{"value": 49998, "attname": "id", "atttype": "int4"}, {"value": 49998, "attname": "payload", "atttype": "int4"}]
232-
100010 | INSERT | [{"value": 49997, "attname": "id", "atttype": "int4"}, {"value": 49997, "attname": "payload", "atttype": "int4"}]
233-
100009 | INSERT | [{"value": 49996, "attname": "id", "atttype": "int4"}, {"value": 49996, "attname": "payload", "atttype": "int4"}]
225+
opt | remote_new_tup
226+
--------+-------------------------------------------------------------------------------------------------------------------
227+
INSERT | [{"value": 50000, "attname": "id", "atttype": "int4"}, {"value": 50000, "attname": "payload", "atttype": "int4"}]
228+
INSERT | [{"value": 49999, "attname": "id", "atttype": "int4"}, {"value": 49999, "attname": "payload", "atttype": "int4"}]
229+
INSERT | [{"value": 49998, "attname": "id", "atttype": "int4"}, {"value": 49998, "attname": "payload", "atttype": "int4"}]
230+
INSERT | [{"value": 49997, "attname": "id", "atttype": "int4"}, {"value": 49997, "attname": "payload", "atttype": "int4"}]
231+
INSERT | [{"value": 49996, "attname": "id", "atttype": "int4"}, {"value": 49996, "attname": "payload", "atttype": "int4"}]
234232
(5 rows)
235233

236234
-- ============================================================
@@ -239,25 +237,35 @@ ORDER BY command_counter DESC LIMIT 5;
239237
-- conflicting row, re-enable, and verify the transaction
240238
-- is applied successfully.
241239
-- ============================================================
240+
\c :provider_dsn
241+
TRUNCATE test_spill RESTART IDENTITY;
242+
SELECT spock.sync_event() as sync_lsn \gset
242243
\c :subscriber_dsn
244+
CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30);
245+
result
246+
--------
247+
t
248+
(1 row)
249+
243250
ALTER SYSTEM SET spock.exception_behaviour = 'sub_disable';
244251
SELECT pg_reload_conf();
245252
pg_reload_conf
246253
----------------
247254
t
248255
(1 row)
249256

250-
\c :provider_dsn
251-
TRUNCATE test_spill RESTART IDENTITY;
252-
SELECT spock.sync_event() as sync_lsn
253-
\gset
254-
\c :subscriber_dsn
255-
CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30);
256-
result
257-
--------
258-
t
257+
-- Check that all works and clean on subscriber
258+
SELECT status FROM spock.sub_show_status('test_subscription');
259+
status
260+
-------------
261+
replicating
259262
(1 row)
260263

264+
SELECT * FROM test_spill; -- empty
265+
id | payload
266+
----+---------
267+
(0 rows)
268+
261269
INSERT INTO test_spill (id, payload) VALUES (-1, 50000); -- Add conflicting record
262270
\c :provider_dsn
263271
COPY test_spill (id) FROM PROGRAM 'seq 1 50000' WITH (FORMAT text);
@@ -304,16 +312,15 @@ SELECT node_name,relname,idxname,conflict_type,conflict_resolution,
304312
-----------+---------+---------+---------------+---------------------+-------------+--------------
305313
(0 rows)
306314

307-
SELECT command_counter AS cnt,operation AS opt,
308-
remote_new_tup FROM spock.exception_log
315+
SELECT operation AS opt, remote_new_tup FROM spock.exception_log
309316
ORDER BY command_counter DESC LIMIT 5;
310-
cnt | opt | remote_new_tup
311-
--------+-------------+-------------------------------------------------------------------------------------------------------------------
312-
150014 | SUB_DISABLE |
313-
150013 | INSERT | [{"value": 50000, "attname": "id", "atttype": "int4"}, {"value": 50000, "attname": "payload", "atttype": "int4"}]
314-
150012 | INSERT | [{"value": 49999, "attname": "id", "atttype": "int4"}, {"value": 49999, "attname": "payload", "atttype": "int4"}]
315-
150011 | INSERT | [{"value": 49998, "attname": "id", "atttype": "int4"}, {"value": 49998, "attname": "payload", "atttype": "int4"}]
316-
150010 | INSERT | [{"value": 49997, "attname": "id", "atttype": "int4"}, {"value": 49997, "attname": "payload", "atttype": "int4"}]
317+
opt | remote_new_tup
318+
-------------+-------------------------------------------------------------------------------------------------------------------
319+
SUB_DISABLE |
320+
INSERT | [{"value": 50000, "attname": "id", "atttype": "int4"}, {"value": 50000, "attname": "payload", "atttype": "int4"}]
321+
INSERT | [{"value": 49999, "attname": "id", "atttype": "int4"}, {"value": 49999, "attname": "payload", "atttype": "int4"}]
322+
INSERT | [{"value": 49998, "attname": "id", "atttype": "int4"}, {"value": 49998, "attname": "payload", "atttype": "int4"}]
323+
INSERT | [{"value": 49997, "attname": "id", "atttype": "int4"}, {"value": 49997, "attname": "payload", "atttype": "int4"}]
317324
(5 rows)
318325

319326
-- ============================================================

tests/regress/sql/spill_transaction.sql

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ SELECT pg_reload_conf();
9292

9393
\c :provider_dsn
9494
TRUNCATE test_spill RESTART IDENTITY;
95-
9695
SELECT spock.sync_event() as sync_lsn
9796
\gset
9897

@@ -112,8 +111,7 @@ CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30);
112111
-- Check: COPY transaction discarded, single record presented
113112
SELECT id, payload FROM test_spill;
114113

115-
SELECT command_counter AS cnt,operation AS opt,
116-
remote_new_tup FROM spock.exception_log
114+
SELECT operation AS opt, remote_new_tup FROM spock.exception_log
117115
ORDER BY command_counter DESC LIMIT 5;
118116

119117
-- ============================================================
@@ -126,7 +124,6 @@ SELECT pg_reload_conf();
126124

127125
\c :provider_dsn
128126
TRUNCATE test_spill RESTART IDENTITY;
129-
130127
SELECT spock.sync_event() as sync_lsn
131128
\gset
132129

@@ -145,8 +142,7 @@ CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30);
145142
-- Check 49999 COPY records applied plus older one has been kept
146143
SELECT count(*), sum(id), sum(payload) FROM test_spill;
147144

148-
SELECT command_counter AS cnt,operation AS opt,
149-
remote_new_tup FROM spock.exception_log
145+
SELECT operation AS opt,remote_new_tup FROM spock.exception_log
150146
ORDER BY command_counter DESC LIMIT 5;
151147

152148
-- ============================================================
@@ -155,18 +151,20 @@ ORDER BY command_counter DESC LIMIT 5;
155151
-- conflicting row, re-enable, and verify the transaction
156152
-- is applied successfully.
157153
-- ============================================================
158-
\c :subscriber_dsn
159-
ALTER SYSTEM SET spock.exception_behaviour = 'sub_disable';
160-
SELECT pg_reload_conf();
161154

162155
\c :provider_dsn
163156
TRUNCATE test_spill RESTART IDENTITY;
164-
165-
SELECT spock.sync_event() as sync_lsn
166-
\gset
157+
SELECT spock.sync_event() as sync_lsn \gset
167158

168159
\c :subscriber_dsn
169160
CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30);
161+
ALTER SYSTEM SET spock.exception_behaviour = 'sub_disable';
162+
SELECT pg_reload_conf();
163+
164+
-- Check that all works and clean on subscriber
165+
SELECT status FROM spock.sub_show_status('test_subscription');
166+
SELECT * FROM test_spill; -- empty
167+
170168
INSERT INTO test_spill (id, payload) VALUES (-1, 50000); -- Add conflicting record
171169

172170
\c :provider_dsn
@@ -200,8 +198,7 @@ SELECT count(*), sum(id), sum(payload) FROM test_spill;
200198
-- Check how the conflict has been processed.
201199
SELECT node_name,relname,idxname,conflict_type,conflict_resolution,
202200
local_tuple,remote_tuple FROM spock.resolutions;
203-
SELECT command_counter AS cnt,operation AS opt,
204-
remote_new_tup FROM spock.exception_log
201+
SELECT operation AS opt, remote_new_tup FROM spock.exception_log
205202
ORDER BY command_counter DESC LIMIT 5;
206203

207204
-- ============================================================

0 commit comments

Comments
 (0)