Skip to content

Commit 6563f3b

Browse files
committed
fix: per-node frontier in validator, global event_id sequence
- Validator uses per-node min(lm, olr) frontier instead of global min. Prevents false positives from pipeline drain timing differences. - event_id format changed to N{node}_{seq:08d} (global monotonic). Sorts chronologically, enables correct watermark advancement. - Seed data uses event_id='SEED' (skipped by consumer). - DELETE no longer pre-updates event_id (avoids extra UPDATE event). - Reduced false positives from 146 to 17 (all genuine OLR phantom committed transactions on non-LOB tables).
1 parent b601ee2 commit 6563f3b

1 file changed

Lines changed: 37 additions & 24 deletions

File tree

tests/dbz-twin/rac/validator.py

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -138,42 +138,55 @@ def main():
138138
prev_lm_count = lm_count
139139
prev_olr_count = olr_count
140140

141-
# Find safe frontier: min of max event_id on each side
142-
lm_max = conn.execute(
143-
"SELECT MAX(event_id) FROM lm_events").fetchone()[0]
144-
olr_max = conn.execute(
145-
"SELECT MAX(event_id) FROM olr_events").fetchone()[0]
146-
147-
if lm_max is None or olr_max is None:
141+
# Find safe frontier PER NODE: min of LM/OLR max for each node.
142+
# Event_ids are N{node}_{seq}, so N1 and N2 advance independently.
143+
# Using a global min would let one node's tail events be validated
144+
# before the other side has finished processing them.
145+
node_frontiers = {}
146+
for node_prefix in ('N1', 'N2'):
147+
lm_node_max = conn.execute(
148+
"SELECT MAX(event_id) FROM lm_events WHERE event_id LIKE ?",
149+
(f'{node_prefix}_%',)).fetchone()[0]
150+
olr_node_max = conn.execute(
151+
"SELECT MAX(event_id) FROM olr_events WHERE event_id LIKE ?",
152+
(f'{node_prefix}_%',)).fetchone()[0]
153+
if lm_node_max and olr_node_max:
154+
node_frontiers[node_prefix] = min(lm_node_max, olr_node_max)
155+
156+
if not node_frontiers:
148157
continue
149158

150-
frontier = min(lm_max, olr_max)
159+
# Build a combined frontier for progress tracking
160+
frontier = max(node_frontiers.values())
151161
if frontier <= cursor_event_id:
152162
# Check idle timeout
153163
if time.time() - last_new_events > IDLE_TIMEOUT:
154164
print(f"[validator] Idle timeout ({IDLE_TIMEOUT}s). "
155165
f"Validating remaining...", flush=True)
156-
# Final pass: validate everything up to max of both sides
157-
frontier = max(lm_max, olr_max)
166+
# Final pass: still use per-node min to avoid
167+
# validating tail events the other side hasn't seen
168+
pass
169+
frontier = max(node_frontiers.values())
158170
if frontier <= cursor_event_id:
159171
break
160172
else:
161173
continue
162174

163-
# Fetch distinct event_ids in range from both sides
164-
lm_rows = conn.execute(
165-
"SELECT DISTINCT event_id FROM lm_events "
166-
"WHERE event_id > ? AND event_id <= ? ORDER BY event_id",
167-
(cursor_event_id, frontier)
168-
).fetchall()
169-
olr_rows = conn.execute(
170-
"SELECT DISTINCT event_id FROM olr_events "
171-
"WHERE event_id > ? AND event_id <= ? ORDER BY event_id",
172-
(cursor_event_id, frontier)
173-
).fetchall()
174-
175-
lm_ids = {r['event_id'] for r in lm_rows}
176-
olr_ids = {r['event_id'] for r in olr_rows}
175+
# Fetch event_ids within each node's safe frontier
176+
lm_ids = set()
177+
olr_ids = set()
178+
for node_prefix, nf in node_frontiers.items():
179+
for r in conn.execute(
180+
"SELECT DISTINCT event_id FROM lm_events "
181+
"WHERE event_id > ? AND event_id <= ? AND event_id LIKE ?",
182+
(cursor_event_id, nf, f'{node_prefix}_%')).fetchall():
183+
lm_ids.add(r['event_id'])
184+
for r in conn.execute(
185+
"SELECT DISTINCT event_id FROM olr_events "
186+
"WHERE event_id > ? AND event_id <= ? AND event_id LIKE ?",
187+
(cursor_event_id, nf, f'{node_prefix}_%')).fetchall():
188+
olr_ids.add(r['event_id'])
189+
177190
all_ids = sorted(lm_ids | olr_ids)
178191

179192
for eid in all_ids:

0 commit comments

Comments (0)