Skip to content

Commit e4fa21a

Browse files
committed
feat: hybrid 3-connector fuzz test with patched Debezium image
Switch to rophy/debezium-server:3.5.0-2a7978c0af, which includes:
- debezium-config JAR fix (missing in stock 3.5.0.Final)
- mergeLogsByPrecedence RAC fix (thread-aware dedup)

Implement hybrid architecture: OLR for non-LOB tables + LogMiner for LOB tables on the "actual" side.
- Add dbz-lob-logminer service (LOB-only LogMiner connector)
- OLR config: skip-lob-tables=1
- Consumer: subscribe to 3 topics, route olr-lob-events to OLR side
- Validator: remove KNOWN_LOB_TABLES exemption; LOB mismatches are real failures with the hybrid setup
- fuzz-test.sh: 3-connector offset seeding, wait, logs
- Remove restart: unless-stopped, remove Beta1 JAR mounts
1 parent 7696dfc commit e4fa21a

6 files changed

Lines changed: 115 additions & 49 deletions

File tree

tests/dbz-twin/rac/config/application-lob-logminer-kafka.properties

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Debezium Server — LogMiner adapter → Kafka sink (LOB tables only)
2+
# Complements OLR by handling LOB tables that OLR skips (skip-lob-tables=1).
3+
# Events are routed to the olr-lob-events topic, which the consumer merges into OLR's "actual" stream.
4+
quarkus.http.port=8083
5+
debezium.sink.type=kafka
6+
debezium.sink.kafka.producer.bootstrap.servers=localhost:9092
7+
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
8+
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
9+
10+
debezium.format.value=json
11+
debezium.format.value.schemas.enable=false
12+
debezium.format.key=json
13+
debezium.format.key.schemas.enable=false
14+
15+
debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
16+
debezium.source.database.connection.adapter=logminer
17+
debezium.source.database.hostname=${VM_HOST}
18+
debezium.source.database.port=1521
19+
debezium.source.database.user=c##dbzuser
20+
debezium.source.database.password=dbz
21+
debezium.source.database.dbname=ORCLCDB
22+
debezium.source.database.pdb.name=ORCLPDB
23+
debezium.source.topic.prefix=olr-lob
24+
debezium.source.schema.include.list=OLR_TEST
25+
debezium.source.table.include.list=OLR_TEST.FUZZ_LOB
26+
debezium.source.snapshot.mode=recovery
27+
debezium.source.log.mining.strategy=online_catalog
28+
debezium.source.lob.enabled=true
29+
30+
# Route all tables to a single topic for ordered delivery
31+
debezium.transforms=route
32+
debezium.transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
33+
debezium.transforms.route.regex=.*
34+
debezium.transforms.route.replacement=olr-lob-events
35+
36+
debezium.source.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore
37+
debezium.source.offset.storage.topic=dbz-olr-lob-offsets
38+
debezium.source.offset.storage.partitions=1
39+
debezium.source.offset.storage.replication.factor=1
40+
debezium.source.bootstrap.servers=localhost:9092
41+
debezium.source.offset.flush.interval.ms=0
42+
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
43+
debezium.source.schema.history.internal.file.filename=/debezium/data/schema-history.dat

tests/dbz-twin/rac/config/olr-config.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
"scn-type": 1,
2828
"timestamp-type": 1,
2929
"user-type": 0,
30-
"redo-thread": 0
30+
"redo-thread": 0,
31+
"skip-lob-tables": 1
3132
},
3233
"filter": {
3334
"table": [

tests/dbz-twin/rac/docker-compose-fuzz.yaml

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ services:
2222
start_period: 15s
2323

2424
dbz-logminer:
25-
image: quay.io/debezium/server:3.5.0.Beta1
25+
image: rophy/debezium-server:3.5.0-2a7978c0af
2626
container_name: fuzz-dbz-logminer
2727
network_mode: host
28+
environment:
29+
VM_HOST: ${VM_HOST:?VM_HOST is required}
2830
depends_on:
2931
kafka:
3032
condition: service_healthy
@@ -34,19 +36,33 @@ services:
3436
- dbz-logminer-data:/debezium/data
3537

3638
dbz-olr:
37-
image: quay.io/debezium/server:3.5.0.Beta1
39+
image: rophy/debezium-server:3.5.0-2a7978c0af
3840
container_name: fuzz-dbz-olr
3941
network_mode: host
40-
restart: unless-stopped
42+
environment:
43+
VM_HOST: ${VM_HOST:?VM_HOST is required}
4144
depends_on:
4245
kafka:
4346
condition: service_healthy
4447
volumes:
4548
- ./config/application-olr-kafka.properties:/debezium/config/application.properties:ro
4649
- ../lib/ojdbc8.jar:/debezium/lib/ojdbc8.jar:ro
47-
- ../lib/debezium-connector-oracle-3.5.0.Beta1.jar:/debezium/lib/debezium-connector-oracle-3.5.0.Beta1.jar:ro
4850
- dbz-olr-data:/debezium/data
4951

52+
dbz-lob-logminer:
53+
image: rophy/debezium-server:3.5.0-2a7978c0af
54+
container_name: fuzz-dbz-lob-logminer
55+
network_mode: host
56+
environment:
57+
VM_HOST: ${VM_HOST:?VM_HOST is required}
58+
depends_on:
59+
kafka:
60+
condition: service_healthy
61+
volumes:
62+
- ./config/application-lob-logminer-kafka.properties:/debezium/config/application.properties:ro
63+
- ../lib/ojdbc8.jar:/debezium/lib/ojdbc8.jar:ro
64+
- dbz-lob-logminer-data:/debezium/data
65+
5066
consumer:
5167
build:
5268
context: .
@@ -63,6 +79,7 @@ services:
6379
SQLITE_DB: /app/data/fuzz.db
6480
LM_TOPIC: lm-events
6581
OLR_TOPIC: olr-events
82+
OLR_LOB_TOPIC: olr-lob-events
6683
volumes:
6784
- ./kafka-consumer.py:/app/kafka-consumer.py:ro
6885
- fuzz-data:/app/data
@@ -90,4 +107,5 @@ services:
90107
volumes:
91108
dbz-logminer-data:
92109
dbz-olr-data:
110+
dbz-lob-logminer-data:
93111
fuzz-data:

tests/dbz-twin/rac/fuzz-test.sh

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ _seed_debezium_offsets() {
8484

8585
# topic_prefix matches debezium.source.topic.prefix in each connector config
8686
# offset_topic matches debezium.source.offset.storage.topic
87-
local -A topics=( [logminer]=dbz-lm-offsets [olr]=dbz-olr-offsets )
87+
local -A topics=( [logminer]=dbz-lm-offsets [olr]=dbz-olr-offsets [olr-lob]=dbz-olr-lob-offsets )
8888
for topic_prefix in "${!topics[@]}"; do
8989
local offset_topic="${topics[$topic_prefix]}"
9090
local offset_key="[\"kafka\",{\"server\":\"${topic_prefix}\"}]"
@@ -212,15 +212,16 @@ action_up() {
212212

213213
# Wait for Debezium connectors
214214
echo " Waiting for Debezium connectors..."
215-
for i in $(seq 1 60); do
216-
LM_OK=false; OLR_OK=false
217-
docker logs fuzz-dbz-logminer 2>&1 | tail -10 | grep -q "Starting streaming" && LM_OK=true
218-
docker logs fuzz-dbz-olr 2>&1 | tail -10 | grep -q "streaming client started\|Starting streaming" && OLR_OK=true
219-
if $LM_OK && $OLR_OK; then
220-
echo " Debezium: ready"
215+
for i in $(seq 1 90); do
216+
LM_OK=false; OLR_OK=false; LOB_LM_OK=false
217+
docker logs fuzz-dbz-logminer 2>&1 | grep -q "Starting streaming" && LM_OK=true
218+
docker logs fuzz-dbz-olr 2>&1 | grep -q "streaming client started\|Starting streaming" && OLR_OK=true
219+
docker logs fuzz-dbz-lob-logminer 2>&1 | grep -q "Starting streaming" && LOB_LM_OK=true
220+
if $LM_OK && $OLR_OK && $LOB_LM_OK; then
221+
echo " Debezium: ready (3 connectors)"
221222
break
222223
fi
223-
[[ $i -eq 60 ]] && { echo "ERROR: Debezium connectors did not start" >&2; exit 1; }
224+
[[ $i -eq 90 ]] && { echo "ERROR: Debezium connectors did not start" >&2; exit 1; }
224225
sleep 2
225226
done
226227

@@ -427,19 +428,20 @@ action_validate() {
427428
action_logs() {
428429
local component="${1:-}"
429430
case "$component" in
430-
kafka) docker logs fuzz-kafka 2>&1 ;;
431-
logminer) docker logs fuzz-dbz-logminer 2>&1 ;;
432-
olr) docker logs fuzz-dbz-olr 2>&1 ;;
433-
consumer) docker logs fuzz-consumer 2>&1 ;;
434-
validator) docker logs fuzz-validator 2>&1 ;;
435-
olr-vm) ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman logs $OLR_CONTAINER" 2>/dev/null ;;
431+
kafka) docker logs fuzz-kafka 2>&1 ;;
432+
logminer) docker logs fuzz-dbz-logminer 2>&1 ;;
433+
olr) docker logs fuzz-dbz-olr 2>&1 ;;
434+
lob-logminer) docker logs fuzz-dbz-lob-logminer 2>&1 ;;
435+
consumer) docker logs fuzz-consumer 2>&1 ;;
436+
validator) docker logs fuzz-validator 2>&1 ;;
437+
olr-vm) ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman logs $OLR_CONTAINER" 2>/dev/null ;;
436438
"")
437439
echo "Usage: $0 logs <component>"
438-
echo "Components: kafka, logminer, olr, consumer, validator, olr-vm"
440+
echo "Components: kafka, logminer, olr, lob-logminer, consumer, validator, olr-vm"
439441
;;
440442
*)
441443
echo "Unknown component: $component" >&2
442-
echo "Components: kafka, logminer, olr, consumer, validator, olr-vm"
444+
echo "Components: kafka, logminer, olr, lob-logminer, consumer, validator, olr-vm"
443445
exit 1
444446
;;
445447
esac

tests/dbz-twin/rac/kafka-consumer.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,18 @@ def extract_event_info(event):
9494

9595
LM_TOPIC = os.environ.get('LM_TOPIC', 'lm-events')
9696
OLR_TOPIC = os.environ.get('OLR_TOPIC', 'olr-events')
97+
OLR_LOB_TOPIC = os.environ.get('OLR_LOB_TOPIC', 'olr-lob-events')
9798

9899

99100
def determine_adapter(topic):
100-
"""Determine adapter (logminer or olr) from Kafka topic name."""
101+
"""Determine adapter (logminer or olr) from Kafka topic name.
102+
103+
The OLR LOB topic (LogMiner for LOB tables) is treated as 'olr' because
104+
it complements OLR on the "actual" side of the comparison.
105+
"""
101106
if topic == LM_TOPIC:
102107
return 'logminer'
103-
elif topic == OLR_TOPIC:
108+
elif topic in (OLR_TOPIC, OLR_LOB_TOPIC):
104109
return 'olr'
105110
# Fallback for per-table topics
106111
if topic.startswith('logminer'):
@@ -139,18 +144,19 @@ def main():
139144
sys.exit(1)
140145

141146
# Wait for topics to appear, then subscribe
142-
print(f"Waiting for topics: {LM_TOPIC}, {OLR_TOPIC}...", flush=True)
147+
all_topics = [LM_TOPIC, OLR_TOPIC, OLR_LOB_TOPIC]
148+
print(f"Waiting for topics: {', '.join(all_topics)}...", flush=True)
143149
for attempt in range(60):
144150
topics = consumer.topics()
145151
if LM_TOPIC in topics or OLR_TOPIC in topics:
146-
print(f" Found topics: {[t for t in (LM_TOPIC, OLR_TOPIC) if t in topics]}", flush=True)
152+
print(f" Found topics: {[t for t in all_topics if t in topics]}", flush=True)
147153
break
148154
time.sleep(5)
149155

150-
consumer.subscribe([LM_TOPIC, OLR_TOPIC])
156+
consumer.subscribe(all_topics)
151157
# Force metadata refresh
152158
consumer.poll(timeout_ms=1000)
153-
print(f"Subscribed to {LM_TOPIC} and {OLR_TOPIC}", flush=True)
159+
print(f"Subscribed to {', '.join(all_topics)}", flush=True)
154160

155161
# Track per-event_id sequence numbers for LOB split handling.
156162
lm_seq = {} # event_id -> next seq

tests/dbz-twin/rac/validator.py

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@
2424
POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '10'))
2525
IDLE_TIMEOUT = int(os.environ.get('IDLE_TIMEOUT', '120'))
2626

27-
# Known LOB phantom transaction issues (olr#26, olr#10)
28-
# These produce expected mismatches — report but don't fail
29-
KNOWN_LOB_TABLES = {'FUZZ_LOB'}
27+
# LOB tables that use final-state replay for comparison.
28+
# With the hybrid setup (OLR for non-LOB + LogMiner for LOB), these tables
29+
# should match exactly. Mismatches are treated as real failures.
30+
LOB_TABLES = {'FUZZ_LOB'}
3031

3132

3233
def normalize_value(v):
@@ -132,7 +133,6 @@ def main():
132133
total_validated = 0
133134
total_matched = 0
134135
total_mismatches = 0
135-
total_lob_known = 0 # Known LOB issues (expected)
136136
total_missing_lm = 0
137137
total_missing_olr = 0
138138
total_tail_olr = 0 # OLR ahead of LM at drain time (not a bug)
@@ -245,14 +245,12 @@ def main():
245245
"SELECT table_name FROM olr_events WHERE event_id = ? LIMIT 1",
246246
(eid,)).fetchone()
247247
event_table = tbl_row['table_name'] if tbl_row else '?'
248-
is_lob = event_table in KNOWN_LOB_TABLES
248+
is_lob = event_table in LOB_TABLES
249249

250250
if in_lm and not in_olr:
251251
total_missing_olr += 1
252252
if is_tail:
253253
total_tail_lm += 1
254-
elif is_lob:
255-
total_lob_known += 1
256254
else:
257255
total_mismatches += 1
258256
print(f"[MISSING_OLR] {eid} ({event_table})", flush=True)
@@ -263,8 +261,6 @@ def main():
263261
total_missing_lm += 1
264262
if is_tail:
265263
total_tail_olr += 1
266-
elif is_lob:
267-
total_lob_known += 1
268264
else:
269265
total_mismatches += 1
270266
print(f"[EXTRA_OLR] {eid} ({event_table})", flush=True)
@@ -283,20 +279,26 @@ def main():
283279

284280
if is_lob:
285281
# LOB tables: replay ops into final state, compare end result.
286-
# LogMiner merges INSERT + LOB_WRITE into a single record (L2),
287-
# and OLR may have extra/fewer intermediate events due to
288-
# phantom undo (#15). Comparing final state avoids both issues.
282+
# Both sides use LogMiner (expected=full LM, actual=LOB-only LM),
283+
# so they should produce identical final state.
289284
lm_state, lm_exists = replay_final_state(lm_recs)
290285
olr_state, olr_exists = replay_final_state(olr_recs)
291286

292287
if lm_exists != olr_exists:
293-
total_lob_known += 1
288+
total_mismatches += 1
289+
print(f"[LOB_EXISTENCE] {eid} ({event_table}): "
290+
f"LM exists={lm_exists} OLR exists={olr_exists}",
291+
flush=True)
294292
total_validated += 1
295293
else:
296294
diffs = compare_values(lm_state, olr_state,
297295
event_table, 'after')
298296
if diffs:
299-
total_lob_known += 1
297+
total_mismatches += 1
298+
print(f"[LOB_VALUE_DIFF] {eid} ({event_table}):",
299+
flush=True)
300+
for d in diffs[:5]:
301+
print(d, flush=True)
300302
else:
301303
total_matched += 1
302304
total_validated += 1
@@ -373,7 +375,7 @@ def main():
373375
tail_str = (f" tail_olr={total_tail_olr} tail_lm={total_tail_lm}"
374376
if total_tail_olr or total_tail_lm else "")
375377
print(f"[validator] validated={total_validated} matched={total_matched} "
376-
f"mismatches={total_mismatches} lob_known={total_lob_known} "
378+
f"mismatches={total_mismatches} "
377379
f"missing_olr={total_missing_olr} extra_olr={total_missing_lm}"
378380
f"{tail_str} "
379381
f"lm_total={lm_count} olr_total={olr_count} "
@@ -391,7 +393,6 @@ def main():
391393
print(f" Total validated: {total_validated}", flush=True)
392394
print(f" Matched: {total_matched}", flush=True)
393395
print(f" Mismatches: {total_mismatches}", flush=True)
394-
print(f" LOB known issues: {total_lob_known}", flush=True)
395396
print(f" Missing from OLR: {total_missing_olr}", flush=True)
396397
print(f" Extra in OLR: {total_missing_lm}", flush=True)
397398
if total_tail_olr or total_tail_lm:
@@ -404,13 +405,8 @@ def main():
404405
sys.exit(1)
405406
else:
406407
print("\n RESULT: PASS", flush=True)
407-
qualifiers = []
408-
if total_lob_known > 0:
409-
qualifiers.append(f"{total_lob_known} known LOB issues")
410408
if total_tail_olr + total_tail_lm > 0:
411-
qualifiers.append(f"{total_tail_olr + total_tail_lm} tail events")
412-
if qualifiers:
413-
print(f" ({', '.join(qualifiers)})", flush=True)
409+
print(f" ({total_tail_olr + total_tail_lm} tail events)", flush=True)
414410
sys.exit(0)
415411

416412

0 commit comments

Comments
 (0)