Skip to content

Commit b11e77a

Browse files
committed
fix: deferred validation + per-node frontier eliminates false positives

- Validator runs only after consumer drains (via 'validate' profile)
- Per-node frontier restored: min(lm, olr) per N{x} prefix
- Single Kafka topic via RegexRouter (not ByLogicalTableRouter)
- fuzz-test.sh actions: up/run/status/validate/logs/down

Result: 26,867 events validated, 0 mismatches, 39 known LOB issues. Non-LOB accuracy: 100%.
1 parent f60348c commit b11e77a

3 files changed

Lines changed: 56 additions & 43 deletions

File tree

tests/dbz-twin/rac/docker-compose-fuzz.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ services:
5959
environment:
6060
KAFKA_BOOTSTRAP: localhost:9092
6161
SQLITE_DB: /app/data/fuzz.db
62+
LM_TOPIC: lm-events
63+
OLR_TOPIC: olr-events
6264
volumes:
6365
- ./kafka-consumer.py:/app/kafka-consumer.py:ro
6466
- fuzz-data:/app/data
@@ -67,6 +69,7 @@ services:
6769
image: python:3.12-slim
6870
container_name: fuzz-validator
6971
network_mode: host
72+
profiles: ["validate"]
7073
depends_on:
7174
consumer:
7275
condition: service_started

tests/dbz-twin/rac/fuzz-test.sh

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -286,43 +286,47 @@ action_status() {
286286
}
287287

288288
action_validate() {
289-
echo "=== Waiting for validator to complete ==="
290-
echo " Validator will idle-timeout after processing all events..."
291-
echo ""
292-
293-
# Wait for validator to complete
294-
docker wait fuzz-validator > /dev/null 2>&1 || true
295-
local exit_code
296-
exit_code=$(docker inspect fuzz-validator --format '{{.State.ExitCode}}' 2>/dev/null || echo "1")
297-
298-
# Save log
299-
local vlog="/tmp/fuzz-validator-$(date +%Y%m%d-%H%M%S).log"
300-
docker logs fuzz-validator > "$vlog" 2>&1
301-
echo " Validator log: $vlog"
302-
echo ""
303-
304-
# Show summary
305-
tail -15 "$vlog"
289+
echo "=== Running validation ==="
290+
291+
# Wait for consumer to catch up (no new events for 30s)
292+
echo " Waiting for consumer to drain..."
293+
local prev_count=0 idle_count=0
294+
while true; do
295+
local cur_count
296+
cur_count=$(docker logs --tail 1 fuzz-consumer 2>/dev/null | grep -oP 'LM=\K[0-9]+' || echo "0")
297+
if [[ "$cur_count" == "$prev_count" ]]; then
298+
idle_count=$(( idle_count + 1 ))
299+
[[ $idle_count -ge 6 ]] && break # 30s idle
300+
else
301+
idle_count=0
302+
prev_count=$cur_count
303+
fi
304+
sleep 5
305+
done
306+
echo " Consumer drained (LM events: $cur_count)"
306307

308+
# Start validator (uses 'validate' profile)
309+
docker compose -f "$COMPOSE_FILE" run --rm validator
310+
local exit_code=$?
307311
echo ""
308312
echo " OLR memory: $(_olr_memory_mb) MB"
309313

310314
# OLR errors
311315
local olr_errors
312316
olr_errors=$(ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \
313-
"podman logs $OLR_CONTAINER 2>&1 | grep -c 'ERROR\|ASAN\|AddressSanitizer'" 2>/dev/null || echo "0")
317+
"podman logs $OLR_CONTAINER 2>&1 | grep -c 'ERROR\|ASAN\|AddressSanitizer'" 2>/dev/null | tr -d '[:space:]' || echo "0")
318+
[[ -z "$olr_errors" ]] && olr_errors=0
314319
if [[ "$olr_errors" -gt 0 ]]; then
315320
echo " WARNING: $olr_errors OLR errors detected"
316321
ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \
317322
"podman logs $OLR_CONTAINER 2>&1 | grep 'ERROR\|ASAN' | tail -5" 2>/dev/null
318323
fi
319324

320325
echo ""
321-
if [[ "$exit_code" == "0" ]]; then
326+
if [[ "$exit_code" -eq 0 ]]; then
322327
echo "=== PASS: Fuzz test completed ==="
323328
else
324329
echo "=== FAIL: Fuzz test found mismatches ==="
325-
echo " Investigate: ./fuzz-test.sh logs validator"
326330
fi
327331

328332
return "$exit_code"

tests/dbz-twin/rac/validator.py

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -138,42 +138,48 @@ def main():
138138
prev_lm_count = lm_count
139139
prev_olr_count = olr_count
140140

141-
# Find safe frontier: min of max event_id on each side.
142-
# With single-topic Kafka delivery, events arrive in commit order
143-
# within each adapter, so max(event_id) is a reliable watermark.
144-
lm_max = conn.execute(
145-
"SELECT MAX(event_id) FROM lm_events").fetchone()[0]
146-
olr_max = conn.execute(
147-
"SELECT MAX(event_id) FROM olr_events").fetchone()[0]
148-
149-
if lm_max is None or olr_max is None:
141+
# Find safe frontier per node: min(lm, olr) for each N{x} prefix.
142+
# Event_ids from two RAC nodes interleave non-monotonically in
143+
# commit order, so a global frontier would validate events before
144+
# the other side has delivered them.
145+
node_frontiers = {}
146+
for node_prefix in ('N1', 'N2'):
147+
lm_node_max = conn.execute(
148+
"SELECT MAX(event_id) FROM lm_events WHERE event_id LIKE ?",
149+
(f'{node_prefix}_%',)).fetchone()[0]
150+
olr_node_max = conn.execute(
151+
"SELECT MAX(event_id) FROM olr_events WHERE event_id LIKE ?",
152+
(f'{node_prefix}_%',)).fetchone()[0]
153+
if lm_node_max and olr_node_max:
154+
node_frontiers[node_prefix] = min(lm_node_max, olr_node_max)
155+
156+
if not node_frontiers:
150157
continue
151158

152-
frontier = min(lm_max, olr_max)
159+
frontier = max(node_frontiers.values())
153160
if frontier <= cursor_event_id:
154-
# Check idle timeout
155161
if time.time() - last_new_events > IDLE_TIMEOUT:
156162
print(f"[validator] Idle timeout ({IDLE_TIMEOUT}s). "
157163
f"Final validation pass...", flush=True)
158-
frontier = min(lm_max, olr_max)
159164
if frontier <= cursor_event_id:
160165
break
161166
else:
162167
continue
163168

164-
# Fetch distinct event_ids in range from both sides
169+
# Fetch event_ids within each node's safe frontier
165170
lm_ids = set()
166171
olr_ids = set()
167-
for r in conn.execute(
168-
"SELECT DISTINCT event_id FROM lm_events "
169-
"WHERE event_id > ? AND event_id <= ? ORDER BY event_id",
170-
(cursor_event_id, frontier)).fetchall():
171-
lm_ids.add(r['event_id'])
172-
for r in conn.execute(
173-
"SELECT DISTINCT event_id FROM olr_events "
174-
"WHERE event_id > ? AND event_id <= ? ORDER BY event_id",
175-
(cursor_event_id, frontier)).fetchall():
176-
olr_ids.add(r['event_id'])
172+
for node_prefix, nf in node_frontiers.items():
173+
for r in conn.execute(
174+
"SELECT DISTINCT event_id FROM lm_events "
175+
"WHERE event_id > ? AND event_id <= ? AND event_id LIKE ?",
176+
(cursor_event_id, nf, f'{node_prefix}_%')).fetchall():
177+
lm_ids.add(r['event_id'])
178+
for r in conn.execute(
179+
"SELECT DISTINCT event_id FROM olr_events "
180+
"WHERE event_id > ? AND event_id <= ? AND event_id LIKE ?",
181+
(cursor_event_id, nf, f'{node_prefix}_%')).fetchall():
182+
olr_ids.add(r['event_id'])
177183

178184
all_ids = sorted(lm_ids | olr_ids)
179185

0 commit comments

Comments (0)