Skip to content

Commit e9fc810

Browse files
committed
fix: address CodeRabbit review findings on PR #13
- fuzz-test.sh: capture validator exit code without set -e abort - validator.py: per-node watermark cursors instead of global cursor - validator.py: remove undefined lm_max/olr_max completion guard - validator.py: compare before-images for UPDATE/DELETE events - compare-debezium.py: make FUZZ_LOB exclusion a --exclude-tables CLI flag - docker-compose-fuzz.yaml: build consumer image with pinned deps - fuzz-test.sh: propagate RAC workload failures instead of || true - kafka-consumer.py: periodic trimming of per-event seq maps - fuzz-workload.sql: per-table ID tracking for UPDATE/DELETE targeting - fuzz-workload.sql: add UPDATE/DELETE for WIDE, PART, MAXSTR, INTERVAL - FUZZ-TEST.md: clarify down deletes fuzz-data volume - FUZZ-TEST-PLAN.md: update topics, status, watermark description
1 parent 185a2f6 commit e9fc810

9 files changed

Lines changed: 265 additions & 73 deletions

File tree

tests/dbz-twin/compare-debezium.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python3
22
"""Compare Debezium LogMiner vs OLR adapter outputs.
33
4-
Usage: compare-debezium.py <logminer.jsonl> <olr.jsonl>
4+
Usage: compare-debezium.py [--exclude-tables T1,T2,...] <logminer.jsonl> <olr.jsonl>
55
66
Both inputs are JSONL files with Debezium envelope events:
77
{"before":..., "after":..., "source":..., "op":..., "ts_ms":...}
@@ -13,6 +13,7 @@
1313
Exits 0 on match, 1 on mismatch with diff report.
1414
"""
1515

16+
import argparse
1617
import json
1718
import sys
1819
from collections import defaultdict
@@ -26,9 +27,9 @@
2627
'X19kZWJleml1bV91bmF2YWlsYWJsZV92YWx1ZQ==',
2728
}
2829

29-
# Tables to exclude from comparison (stats/bookkeeping, not test data).
30-
# FUZZ_LOB excluded due to known RAC phantom transaction bugs (olr#26, olr#10).
31-
EXCLUDED_TABLES = {'FUZZ_STATS', 'FUZZ_LOB'}
30+
# Tables always excluded from comparison (stats/bookkeeping, not test data).
31+
# Additional tables can be excluded via --exclude-tables CLI flag.
32+
EXCLUDED_TABLES = {'FUZZ_STATS'}
3233

3334

3435
def is_unavailable(v):
@@ -50,8 +51,9 @@ def normalize_columns(d):
5051
return {k: normalize_value(v) for k, v in d.items()}
5152

5253

53-
def parse_debezium_jsonl(path):
54+
def parse_debezium_jsonl(path, excluded_tables=None):
5455
"""Parse a Debezium JSONL file into normalized records."""
56+
skip_tables = EXCLUDED_TABLES | (excluded_tables or set())
5557
records = []
5658
with open(path) as f:
5759
for line in f:
@@ -71,7 +73,7 @@ def parse_debezium_jsonl(path):
7173
continue
7274

7375
# Skip excluded tables
74-
if table in EXCLUDED_TABLES:
76+
if table in skip_tables:
7577
continue
7678

7779
records.append({
@@ -333,12 +335,18 @@ def compare(lm_records, olr_records):
333335

334336

335337
def main():
336-
if len(sys.argv) != 3:
337-
print(f"Usage: {sys.argv[0]} <logminer.jsonl> <olr.jsonl>", file=sys.stderr)
338-
sys.exit(2)
339-
340-
lm_records = parse_debezium_jsonl(sys.argv[1])
341-
olr_records = parse_debezium_jsonl(sys.argv[2])
338+
parser = argparse.ArgumentParser(
339+
description='Compare Debezium LogMiner vs OLR adapter outputs.')
340+
parser.add_argument('logminer_jsonl', help='LogMiner JSONL file')
341+
parser.add_argument('olr_jsonl', help='OLR JSONL file')
342+
parser.add_argument('--exclude-tables', default='',
343+
help='Comma-separated list of additional tables to exclude')
344+
args = parser.parse_args()
345+
346+
extra_excluded = set(t.strip() for t in args.exclude_tables.split(',') if t.strip())
347+
348+
lm_records = parse_debezium_jsonl(args.logminer_jsonl, extra_excluded)
349+
olr_records = parse_debezium_jsonl(args.olr_jsonl, extra_excluded)
342350

343351
# Merge LogMiner's split LOB events (OLR already emits merged events)
344352
lm_merged = merge_lob_events(lm_records)
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
FROM python:3.12-slim
2+
RUN pip install --no-cache-dir kafka-python-ng==2.2.3

tests/dbz-twin/rac/FUZZ-TEST.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ Exit 0 = PASS (no non-LOB mismatches), exit 1 = FAIL.
9999
| `./fuzz-test.sh status` | Show container status, consumer counts, OLR memory |
100100
| `./fuzz-test.sh validate` | Wait for consumer drain, run validator, report PASS/FAIL |
101101
| `./fuzz-test.sh logs <c>` | Show logs: kafka, logminer, olr, consumer, validator, olr-vm |
102-
| `./fuzz-test.sh down` | Stop all containers and remove volumes |
102+
| `./fuzz-test.sh down` | Stop all containers and remove volumes (including fuzz-data) |
103103

104104
## Prerequisites
105105

@@ -135,8 +135,9 @@ CREATE TABLE lm_events (
135135
-- olr_events: identical schema
136136
```
137137

138-
The database persists after `down` is called. Query it directly for
139-
investigation:
138+
The `fuzz-data` volume is deleted by `./fuzz-test.sh down` (which runs
139+
`docker compose down -v`). To inspect the database **before** tearing down,
140+
query it while containers are still running:
140141

141142
```bash
142143
docker run --rm -v rac_fuzz-data:/data python:3.12-slim python3 -c "

tests/dbz-twin/rac/docker-compose-fuzz.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ services:
4949
- dbz-olr-data:/debezium/data
5050

5151
consumer:
52-
image: python:3.12-slim
52+
build:
53+
context: .
54+
dockerfile: Dockerfile.consumer
55+
image: fuzz-consumer:latest
5356
container_name: fuzz-consumer
5457
network_mode: host
5558
depends_on:

tests/dbz-twin/rac/fuzz-test.sh

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -237,12 +237,19 @@ SQL
237237
done
238238
echo ""
239239

240-
wait $pid1 || true
241-
wait $pid2 || true
240+
local rc1=0 rc2=0
241+
wait $pid1 || rc1=$?
242+
wait $pid2 || rc2=$?
242243

243244
echo " Node 1: $(grep 'FUZZ_DONE:' "$work_dir/fuzz_out1.log" || echo 'no output')"
244245
echo " Node 2: $(grep 'FUZZ_DONE:' "$work_dir/fuzz_out2.log" || echo 'no output')"
245246

247+
if [[ $rc1 -ne 0 || $rc2 -ne 0 ]]; then
248+
echo "ERROR: fuzz workload failed on one or more RAC nodes (rc1=$rc1, rc2=$rc2)" >&2
249+
echo " Check logs: $work_dir/fuzz_out1.log, $work_dir/fuzz_out2.log" >&2
250+
exit 1
251+
fi
252+
246253
# Flush redo
247254
_exec_sysdba "$work_dir/log_switch.sql" > /dev/null
248255
sleep 3
@@ -306,8 +313,8 @@ action_validate() {
306313
echo " Consumer drained (LM events: $cur_count)"
307314

308315
# Start validator (uses 'validate' profile)
309-
docker compose -f "$COMPOSE_FILE" run --rm validator
310-
local exit_code=$?
316+
local exit_code=0
317+
docker compose -f "$COMPOSE_FILE" run --rm validator || exit_code=$?
311318
echo ""
312319
echo " OLR memory: $(_olr_memory_mb) MB"
313320

tests/dbz-twin/rac/kafka-consumer.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,7 @@
1616
import sys
1717
import time
1818

19-
# kafka-python-ng imported after install check
20-
try:
21-
from kafka import KafkaConsumer
22-
except ImportError:
23-
print("Installing kafka-python-ng...", flush=True)
24-
os.system("pip install -q kafka-python-ng")
25-
from kafka import KafkaConsumer
19+
from kafka import KafkaConsumer
2620

2721
KAFKA_BOOTSTRAP = os.environ.get('KAFKA_BOOTSTRAP', 'localhost:9092')
2822
SQLITE_DB = os.environ.get('SQLITE_DB', '/app/data/fuzz.db')
@@ -158,9 +152,11 @@ def main():
158152
consumer.poll(timeout_ms=1000)
159153
print(f"Subscribed to {LM_TOPIC} and {OLR_TOPIC}", flush=True)
160154

161-
# Track per-event_id sequence numbers for LOB split handling
155+
# Track per-event_id sequence numbers for LOB split handling.
156+
# Periodically trimmed to avoid unbounded growth during long runs.
162157
lm_seq = {} # event_id -> next seq
163158
olr_seq = {} # event_id -> next seq
159+
SEQ_TRIM_THRESHOLD = 10000 # Trim when maps exceed this size
164160

165161
lm_count = 0
166162
olr_count = 0
@@ -222,6 +218,16 @@ def main():
222218
batch = []
223219
batch_start = time.time()
224220

221+
# Trim seq maps to bound memory during long runs.
222+
# Only seq > 0 matters (LOB splits); most events have seq=0 and
223+
# can be safely evicted since they won't be seen again.
224+
for seq_map in (lm_seq, olr_seq):
225+
if len(seq_map) > SEQ_TRIM_THRESHOLD:
226+
# Keep only entries with seq > 0 (active LOB splits)
227+
to_delete = [k for k, v in seq_map.items() if v <= 1]
228+
for k in to_delete:
229+
del seq_map[k]
230+
225231
# Report progress every 30 seconds
226232
now = time.time()
227233
if now - last_report >= 30:

0 commit comments

Comments (0)