Commit eabd7c6

test: add continuous data validation framework

- Dockerfile.swingbench: containerized Swingbench load generator
- validator.py: tails LogMiner + OLR JSONL files, matches events by content in real time, stops swingbench on mismatch via Docker socket
- docker-compose.yaml: full stack with receiver, dbz-logminer, dbz-olr, swingbench, validator, prometheus
- VALIDATION-PLAN.md: architecture and design decisions

Designed for long-running (hours/days) continuous validation. On mismatch, swingbench is stopped immediately to preserve redo logs and event history for offline replay.

1 parent 5868f84 commit eabd7c6

4 files changed

Lines changed: 376 additions & 0 deletions

Dockerfile.swingbench

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
FROM eclipse-temurin:21-jre

ARG SWINGBENCH_URL=https://github.com/domgiles/swingbench-public/releases/download/production/swingbenchlatest.zip

RUN apt-get update && apt-get install -y --no-install-recommends unzip curl && \
    curl -sL -o /tmp/swingbench.zip "$SWINGBENCH_URL" && \
    unzip -qo /tmp/swingbench.zip -d /opt && \
    rm /tmp/swingbench.zip && \
    chmod +x /opt/swingbench/bin/* && \
    apt-get remove -y unzip curl && apt-get autoremove -y && rm -rf /var/lib/apt/lists/*

ENV PATH="/opt/swingbench/bin:${PATH}"

WORKDIR /opt/swingbench

ENTRYPOINT ["charbench"]

VALIDATION-PLAN.md

Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
# Continuous Data Validation Framework

## Goal

Run OLR and LogMiner Debezium adapters simultaneously under sustained load,
continuously validate both produce identical events, and stop immediately on
any mismatch, preserving redo logs and event history for replay.

## Architecture

```
Oracle RAC (VM)
  └── OLR container (reads redo → TCP:5000)

Host (docker-compose):
  swingbench   → continuous OLTP load via CMAN (port 1521)
  dbz-logminer → LogMiner adapter → POST /logminer → receiver
  dbz-olr      → OLR adapter → POST /olr → receiver
  receiver     → writes logminer.jsonl + olr.jsonl
  validator    → tails both files, matches events, stops on mismatch
```

## Components

### receiver (existing, no changes needed)
- Writes events to `logminer.jsonl` and `olr.jsonl`
- Provides `/metrics` for throughput/latency monitoring

### swingbench (new container)
- `Dockerfile.swingbench`: eclipse-temurin:21-jre + Swingbench
- Connects to Oracle via CMAN (`VM_IP:1521`)
- Configurable users and runtime via env vars / command args
- Stopped by validator on mismatch

### validator (new)
- Python script that tails both JSONL files
- Extracts match key: `(table, op, sorted(after_columns))`
- Maintains two multisets (one per adapter)
- Match window: events from one adapter are held for N seconds waiting
  for the matching event from the other adapter
- On timeout (event in one adapter but not the other): MISMATCH → stop
- On content diff (same key but different values): MISMATCH → stop
- On match: remove from both sets, increment match counter
- Logs progress every 10s: matched count, pending LM, pending OLR
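
The matching rules above can be sketched as a pair of multisets keyed by event content. This is a minimal illustration under stated assumptions, not the committed `validator.py`: the class name `WindowMatcher` and its methods are hypothetical, and timestamps are passed in explicitly so the behavior is easy to check.

```python
from collections import defaultdict, deque


class WindowMatcher:
    """Pair up events from two adapters; report those left unmatched too long."""

    def __init__(self, window_seconds):
        self.window = window_seconds
        # One multiset per adapter: key -> deque of arrival timestamps.
        self.pending = {'lm': defaultdict(deque), 'olr': defaultdict(deque)}
        self.matched = 0

    def add(self, side, key, now):
        """Record an event; return True if it cancelled a pending partner."""
        other = 'olr' if side == 'lm' else 'lm'
        waiting = self.pending[other].get(key)
        if waiting:
            waiting.popleft()
            if not waiting:
                del self.pending[other][key]
            self.matched += 1
            return True
        self.pending[side][key].append(now)
        return False

    def expired(self, now):
        """Return (side, key) pairs whose oldest entry outlived the window."""
        return [(side, key)
                for side, keys in self.pending.items()
                for key, stamps in keys.items()
                if stamps and now - stamps[0] > self.window]


matcher = WindowMatcher(window_seconds=60)
matcher.add('lm', ('ORDERS', 'c', 'row-1'), now=0.0)
matcher.add('lm', ('ORDERS', 'c', 'row-1'), now=1.0)   # duplicate key is kept
matcher.add('olr', ('ORDERS', 'c', 'row-1'), now=2.0)  # cancels the oldest one
```

Keeping a queue per key, rather than a single slot, is what makes this a multiset: two identical events from one adapter need two partners from the other before the key is cleared.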

### On mismatch:
1. Validator sends `docker stop swingbench` (DML stops)
2. Logs the mismatched events with full detail
3. Redo logs on VM are preserved (no log switch)
4. JSONL files preserved for offline replay
5. Exit with non-zero code
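
When step 1 is issued over the mounted Docker socket instead of the CLI, the reply to `POST /containers/{id}/stop` has to be interpreted. The Docker Engine API documents 204 for a successful stop, 304 when the container was already stopped, and 404 when no such container exists. A small sketch of reading the status line follows; the helper names `parse_status` and `stop_succeeded` are hypothetical.

```python
def parse_status(response: bytes) -> int:
    """Return the HTTP status code from a raw response, or 0 if unparseable."""
    status_line = response.split(b'\r\n', 1)[0]
    parts = status_line.split()
    if len(parts) >= 2 and parts[0].startswith(b'HTTP/') and parts[1].isdigit():
        return int(parts[1])
    return 0


def stop_succeeded(response: bytes) -> bool:
    """Treat 'stopped now' (204) and 'already stopped' (304) as success."""
    return parse_status(response) in (204, 304)
```

Reading only the status line avoids false positives from digits that happen to appear in headers or the body, such as a `Content-Length: 204` header on an error reply.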

## Docker Compose

```yaml
services:
  receiver: # existing
  dbz-logminer: # existing
  dbz-olr: # existing
  swingbench:
    image: swingbench:latest
    network_mode: host
    command: ["-cs", "//VM_IP:1521/ORCLPDB", "-u", "soe", "-p", "soe",
              "-c", "/opt/swingbench/configs/SOE_Server_Side_V2.xml",
              "-uc", "4", "-rt", "99:00.00", "-nc", "-nr", "-s"]
  validator:
    image: python:3.12-slim
    network_mode: host
    volumes:
      - ./output:/app/output:ro
      - /var/run/docker.sock:/var/run/docker.sock
    command: ["python3", "/app/validator.py",
              "--logminer", "/app/output/logminer.jsonl",
              "--olr", "/app/output/olr.jsonl",
              "--match-window", "60"]
```

## Match Key Design

- For INSERT: `(table, "c", hash(sorted(after_columns)))`
- For UPDATE: `(table, "u", hash(sorted(before_columns)), hash(sorted(after_columns)))`
- For DELETE: `(table, "d", hash(sorted(before_columns)))`

Using hash of column values (not full content) keeps memory bounded for
long-running tests. Store full content only for recent unmatched events
(within match window) for mismatch reporting.
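
A minimal sketch of the hashed key described above, assuming Debezium-style events with `source.table`, `op`, `before`, and `after` fields. The function names `hash_columns` and `match_key` are hypothetical, and SHA-256 over a canonical JSON encoding is one possible choice of hash, not something this plan prescribes.

```python
import hashlib
import json


def hash_columns(columns):
    """Hash a column dict independent of key order; values compared as strings."""
    if not columns:
        return None
    normalized = sorted((name, None if value is None else str(value))
                        for name, value in columns.items())
    payload = json.dumps(normalized, separators=(',', ':')).encode('utf-8')
    return hashlib.sha256(payload).hexdigest()


def match_key(event):
    """Build the per-operation key described above, or None for other ops."""
    table = event.get('source', {}).get('table', '')
    op = event.get('op')
    before = hash_columns(event.get('before'))
    after = hash_columns(event.get('after'))
    if op == 'c':
        return (table, op, after)
    if op == 'u':
        return (table, op, before, after)
    if op == 'd':
        return (table, op, before)
    return None
```

A digest from `hashlib` is stable across processes, whereas Python's built-in `hash()` of a string is randomized per interpreter run by default, which matters if keys are ever persisted or compared during offline replay.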

## Open Questions

- Match window duration: 60s? 120s? Depends on max lag between adapters.
- Should validator also check event count periodically?
- Memory management for very long runs (hours/days)?
- Should we also validate ordering within the same table/key?

tests/sql/environments/rac/debezium/perf/docker-compose.yaml

Lines changed: 44 additions & 0 deletions
@@ -36,6 +36,50 @@ services:
       - ../../../../../debezium/lib/debezium-connector-oracle-3.5.0.Beta1.jar:/debezium/lib/debezium-connector-oracle-3.5.0.Beta1.jar:ro
       - dbz-olr-data:/debezium/data
 
+  swingbench:
+    image: swingbench:latest
+    container_name: perf-swingbench
+    network_mode: host
+    # Default: 4 users, run for 99 hours (effectively forever until stopped)
+    # Override with: docker compose run swingbench -uc 8 -rt 01:00.00
+    command:
+      - "-cs"
+      - "//192.168.122.130:1521/ORCLPDB"
+      - "-u"
+      - "soe"
+      - "-p"
+      - "soe"
+      - "-c"
+      - "/opt/swingbench/configs/SOE_Server_Side_V2.xml"
+      - "-uc"
+      - "${SWINGBENCH_USERS:-4}"
+      - "-rt"
+      - "${SWINGBENCH_RUNTIME:-99:00.00}"
+      - "-nc"
+      - "-nr"
+      - "-s"
+
+  validator:
+    image: python:3.12-slim
+    container_name: perf-validator
+    network_mode: host
+    depends_on:
+      receiver:
+        condition: service_started
+    volumes:
+      - ./validator.py:/app/validator.py:ro
+      - ./output:/app/output:ro
+      - /var/run/docker.sock:/var/run/docker.sock
+    command:
+      - "python3"
+      - "/app/validator.py"
+      - "--logminer"
+      - "/app/output/logminer.jsonl"
+      - "--olr"
+      - "/app/output/olr.jsonl"
+      - "--match-window"
+      - "${MATCH_WINDOW:-120}"
+
   prometheus:
     image: prom/prometheus:latest
     container_name: perf-prometheus

validator.py

Lines changed: 223 additions & 0 deletions
@@ -0,0 +1,223 @@
#!/usr/bin/env python3
"""Real-time validator: tails LogMiner and OLR JSONL files, matches events.

Stops the swingbench container on mismatch. Designed for long-running
continuous validation of OLR vs LogMiner data correctness.

Usage:
    python3 validator.py --logminer output/logminer.jsonl --olr output/olr.jsonl
"""

import argparse
import json
import os
import sys
import time

SENTINEL_TABLE = 'DEBEZIUM_SENTINEL'
POLL_INTERVAL = 1.0  # seconds between file polls
REPORT_INTERVAL = 10.0  # seconds between progress reports
DEFAULT_MATCH_WINDOW = 120  # seconds to wait for matching event


def normalize_value(v):
    if v is None:
        return None
    return str(v)


def event_key(event):
    """Extract a content-based match key from a Debezium event."""
    source = event.get('source', {})
    table = source.get('table', '')
    op = event.get('op', '')

    if table == SENTINEL_TABLE:
        return None  # skip sentinel

    after = event.get('after') or {}
    before = event.get('before') or {}

    after_norm = tuple(sorted((k, normalize_value(v)) for k, v in after.items()))
    before_norm = tuple(sorted((k, normalize_value(v)) for k, v in before.items()))

    if op == 'c':
        return (table, op, after_norm)
    elif op == 'u':
        return (table, op, before_norm, after_norm)
    elif op == 'd':
        return (table, op, before_norm)
    else:
        return None  # skip unknown ops (heartbeats, etc.)
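

def tail_file(path, position):
    """Read complete new lines from path starting at byte offset position.

    Returns (lines, new_position). A trailing line that does not end in a
    newline yet is left unread, so a half-written JSON record is picked up
    whole on a later poll instead of being consumed and discarded.
    """
    try:
        size = os.path.getsize(path)
    except OSError:
        return [], position

    if size <= position:
        return [], position

    lines = []
    # Binary mode keeps position an exact byte offset into the file.
    with open(path, 'rb') as f:
        f.seek(position)
        for raw in f:
            if not raw.endswith(b'\n'):
                break  # incomplete line: retry on the next poll
            position += len(raw)
            text = raw.decode('utf-8', errors='replace').strip()
            if text:
                lines.append(text)
    return lines, position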


def stop_swingbench():
    """Stop the swingbench container via Docker socket."""
    import socket
    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.connect('/var/run/docker.sock')
            request = (
                'POST /v1.40/containers/perf-swingbench/stop HTTP/1.1\r\n'
                'Host: localhost\r\n'
                'Content-Length: 0\r\n'
                '\r\n'
            )
            sock.sendall(request.encode())
            response = sock.recv(4096).decode(errors='replace')

        # Check the status code on the status line only, so that digits in
        # headers or the body cannot be mistaken for a 204/304.
        parts = response.split('\r\n', 1)[0].split()
        status = parts[1] if len(parts) > 1 else ''
        if status in ('204', '304'):
            print(' Swingbench stopped', flush=True)
        else:
            print(f' WARNING: Unexpected response: {response[:100]}', flush=True)
    except Exception as e:
        print(f' WARNING: Failed to stop swingbench: {e}', flush=True)


def match_or_hold(key, now, line, own_pending, other_pending):
    """Cancel key against the other adapter's pending events, or hold it.

    Each pending map is a multiset: key -> [(timestamp, event_json), ...],
    oldest first, so repeated identical events are matched one-for-one
    instead of overwriting each other. Returns True when a match was found.
    """
    waiting = other_pending.get(key)
    if waiting:
        waiting.pop(0)
        if not waiting:
            del other_pending[key]
        return True
    own_pending.setdefault(key, []).append((now, line))
    return False


def pending_count(pending):
    """Number of pending events across all keys."""
    return sum(len(entries) for entries in pending.values())


def main():
    parser = argparse.ArgumentParser(description='Real-time OLR vs LogMiner validator')
    parser.add_argument('--logminer', required=True, help='Path to logminer.jsonl')
    parser.add_argument('--olr', required=True, help='Path to olr.jsonl')
    parser.add_argument('--match-window', type=int, default=DEFAULT_MATCH_WINDOW,
                        help=f'Seconds to wait for matching event (default: {DEFAULT_MATCH_WINDOW})')
    # BooleanOptionalAction (Python 3.9+) also provides --no-stop-on-fail;
    # store_true with default=True could never be switched off.
    parser.add_argument('--stop-on-fail', action=argparse.BooleanOptionalAction, default=True,
                        help='Stop swingbench on mismatch (default: true)')
    args = parser.parse_args()

    print('Validator starting', flush=True)
    print(f' LogMiner: {args.logminer}', flush=True)
    print(f' OLR: {args.olr}', flush=True)
    print(f' Match window: {args.match_window}s', flush=True)
    print(flush=True)

    # Pending events per adapter: key -> [(timestamp, event_json), ...].
    # When both sides produce the same key, one entry from each cancels out (match).
    lm_pending = {}
    olr_pending = {}

    lm_pos = 0
    olr_pos = 0
    matched = 0
    lm_total = 0
    olr_total = 0
    skipped = 0
    last_report = time.time()

    while True:
        now = time.time()

        # Tail both files
        lm_lines, lm_pos = tail_file(args.logminer, lm_pos)
        olr_lines, olr_pos = tail_file(args.olr, olr_pos)

        # Process LogMiner events
        for line in lm_lines:
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            key = event_key(event)
            if key is None:
                skipped += 1
                continue
            lm_total += 1
            if match_or_hold(key, now, line, lm_pending, olr_pending):
                matched += 1

        # Process OLR events
        for line in olr_lines:
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            key = event_key(event)
            if key is None:
                skipped += 1
                continue
            olr_total += 1
            if match_or_hold(key, now, line, olr_pending, lm_pending):
                matched += 1

        # Check for expired events (exceeded match window)
        expired_lm = [(k, ts, line)
                      for k, entries in lm_pending.items()
                      for ts, line in entries
                      if now - ts > args.match_window]
        expired_olr = [(k, ts, line)
                       for k, entries in olr_pending.items()
                       for ts, line in entries
                       if now - ts > args.match_window]

        if expired_lm or expired_olr:
            print(flush=True)
            print('!!! MISMATCH DETECTED !!!', flush=True)
            print(f' Matched so far: {matched}', flush=True)
            print(f' LogMiner total: {lm_total}, OLR total: {olr_total}', flush=True)
            print(f' LogMiner pending: {pending_count(lm_pending)}, '
                  f'OLR pending: {pending_count(olr_pending)}', flush=True)
            print(flush=True)

            if expired_lm:
                print(f' Events in LogMiner but NOT in OLR ({len(expired_lm)} expired):', flush=True)
                for key, ts, line in expired_lm[:5]:
                    age = now - ts
                    print(f' [{age:.0f}s old] table={key[0]} op={key[1]}', flush=True)
                    print(f' {line[:200]}', flush=True)

            if expired_olr:
                print(f' Events in OLR but NOT in LogMiner ({len(expired_olr)} expired):', flush=True)
                for key, ts, line in expired_olr[:5]:
                    age = now - ts
                    print(f' [{age:.0f}s old] table={key[0]} op={key[1]}', flush=True)
                    print(f' {line[:200]}', flush=True)

            if args.stop_on_fail:
                stop_swingbench()

            print(flush=True)
            print('VALIDATION FAILED', flush=True)
            sys.exit(1)

        # Progress report
        if now - last_report >= REPORT_INTERVAL:
            print(f'[{time.strftime("%H:%M:%S")}] '
                  f'matched={matched:,} '
                  f'lm={lm_total:,} olr={olr_total:,} '
                  f'pending: lm={pending_count(lm_pending):,} olr={pending_count(olr_pending):,} '
                  f'skipped={skipped:,}',
                  flush=True)
            last_report = now

        time.sleep(POLL_INTERVAL)


if __name__ == '__main__':
    main()
