forked from bersler/OpenLogReplicator
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvalidator.py
More file actions
228 lines (189 loc) · 7.88 KB
/
validator.py
File metadata and controls
228 lines (189 loc) · 7.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
#!/usr/bin/env python3
"""Real-time validator: tails LogMiner and OLR JSONL files, matches events.
Stops the swingbench container on mismatch. Designed for long-running
continuous validation of OLR vs LogMiner data correctness.
Usage:
python3 validator.py --logminer output/logminer.jsonl --olr output/olr.jsonl
"""
import argparse
import json
import os
import subprocess
import sys
import time
from collections import defaultdict
# Table whose events event_key() skips (returns None for), alongside the
# literal 'BENCHMARK_RESULTS' table.
SENTINEL_TABLE = 'DEBEZIUM_SENTINEL'
# main() sleeps this long at the end of every polling iteration.
POLL_INTERVAL = 1.0 # seconds between file polls
# Minimum spacing between the periodic progress lines printed by main().
REPORT_INTERVAL = 10.0 # seconds between progress reports
# Default for --match-window: a pending event older than this many seconds is
# reported by main() as a mismatch.
DEFAULT_MATCH_WINDOW = 120 # seconds to wait for matching event
def normalize_value(v):
    """Return ``str(v)`` for comparison purposes, leaving ``None`` as ``None``."""
    return None if v is None else str(v)
def event_key(event):
    """Build a content-based match key for one decoded Debezium event.

    The key is made only of the table name, the operation and the row
    images, so two events with the same content produce equal keys. Column
    values go through ``normalize_value`` and columns are sorted by name, so
    column order does not influence the key.

    Args:
        event: One decoded JSON line. Expected to be a dict carrying ``op``,
            ``source`` (with ``table``), ``before`` and ``after``.

    Returns:
        ``(table, 'c', after)``, ``(table, 'u', before, after)`` or
        ``(table, 'd', before)``, where ``before``/``after`` are tuples of
        ``(column, normalized_value)`` pairs. ``None`` when the event must be
        skipped: it is not a JSON object, its op is not c/u/d, or its table
        is ``SENTINEL_TABLE`` or ``BENCHMARK_RESULTS``.
    """
    # A line can be valid JSON without being an object (a bare number, a
    # list, ...); there is nothing to match in it.
    if not isinstance(event, dict):
        return None

    op = event.get('op', '')
    if op not in ('c', 'u', 'd'):
        return None  # skip unknown ops (heartbeats, etc.)

    # `or {}` also covers an explicit JSON null, same as before/after below.
    source = event.get('source') or {}
    table = source.get('table', '')
    if table in (SENTINEL_TABLE, 'BENCHMARK_RESULTS'):
        return None  # skip sentinel and Swingbench internal tables

    def row_image(name):
        # Missing or null image is treated as an empty row.
        row = event.get(name) or {}
        return tuple(sorted((col, normalize_value(val)) for col, val in row.items()))

    # Only the images that belong to the key are normalized.
    if op == 'c':
        return (table, op, row_image('after'))
    if op == 'd':
        return (table, op, row_image('before'))
    return (table, op, row_image('before'), row_image('after'))
def tail_file(path, position):
    """Read the complete lines appended to ``path`` since ``position``.

    Only newline-terminated data is consumed. If the writer is in the middle
    of a line, that fragment is left in the file and picked up on a later
    call once its newline has been written; otherwise the fragment and its
    remainder would each be returned as separate, unparseable lines.

    The file is read in binary mode so that ``position`` is a true byte
    offset, directly comparable with the file size.

    Args:
        path: File to read.
        position: Byte offset to start from (0 for the beginning, afterwards
            the ``new_position`` returned by the previous call).

    Returns:
        ``(lines, new_position)``. ``lines`` are the whitespace-stripped,
        non-empty lines found, decoded as UTF-8 (undecodable bytes are
        replaced rather than raising). ``new_position`` is the offset just
        past the last consumed newline. ``([], position)`` is returned when
        the file is missing, has not grown past ``position``, or holds no
        complete new line yet.
    """
    try:
        size = os.path.getsize(path)
    except OSError:
        return [], position
    if size <= position:
        return [], position

    try:
        with open(path, 'rb') as f:
            f.seek(position)
            chunk = f.read()
    except FileNotFoundError:
        # File vanished between the size check and the open; same as missing.
        return [], position

    # Offset (within chunk) just past the last newline; 0 when there is none.
    end = chunk.rfind(b'\n') + 1
    if end == 0:
        return [], position  # only a partial line so far; retry next poll

    decoded = (raw.decode('utf-8', errors='replace').strip()
               for raw in chunk[:end].split(b'\n'))
    lines = [text for text in decoded if text]
    return lines, position + end
def stop_swingbench(container='perf-swingbench', timeout=60.0):
    """Ask the local Docker daemon to stop the Swingbench container.

    Sends ``POST /v1.40/containers/<container>/stop`` over the UNIX socket
    ``/var/run/docker.sock`` and prints the outcome. This is best effort:
    any failure is printed as a warning and never raised, so the caller can
    carry on with its own shutdown.

    Args:
        container: Name of the container to stop.
        timeout: Seconds allowed for each socket operation. It has to be
            generous because the daemon answers only after the container
            has actually stopped.
    """
    import socket

    request = (
        f'POST /v1.40/containers/{container}/stop HTTP/1.1\r\n'
        'Host: localhost\r\n'
        'Content-Length: 0\r\n'
        '\r\n'
    )
    try:
        # The context manager closes the socket even when a call below fails.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            sock.connect('/var/run/docker.sock')
            sock.sendall(request.encode())
            response = sock.recv(4096).decode(errors='replace')
    except Exception as e:
        # Deliberately broad: stopping the workload must never mask the
        # validation failure that triggered it.
        print(f' WARNING: Failed to stop swingbench: {e}', flush=True)
        return

    # Look at the status code in the status line only ("HTTP/1.1 204 ..."),
    # so that digits elsewhere in the headers cannot look like success.
    status_fields = response.split('\r\n', 1)[0].split()
    status = status_fields[1] if len(status_fields) > 1 else ''
    if status in ('204', '304'):  # stopped now / was already stopped
        print(' Swingbench stopped', flush=True)
    else:
        print(f' WARNING: Unexpected response: {response[:100]}', flush=True)
def _match_events(lines, own_pending, other_pending, now):
    """Match one side's new lines against the other side's pending events.

    Each usable event either cancels the oldest pending entry with the same
    key on the other side, or is queued on its own side stamped with ``now``.
    Both pending dicts map key -> list of ``(timestamp, line)`` and are
    modified in place.

    Returns:
        ``(counted, matched, skipped)``: events that produced a key, events
        that cancelled a pending entry, and events for which ``event_key``
        returned ``None``.
    """
    counted = matched = skipped = 0
    for line in lines:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # unparseable line: ignored without being counted
        if not isinstance(event, dict):
            continue  # valid JSON but not an object: equally unusable
        key = event_key(event)
        if key is None:
            skipped += 1
            continue
        counted += 1
        waiting = other_pending.get(key)
        if waiting:
            # Same key may be pending several times; cancel the oldest one.
            waiting.pop(0)
            if not waiting:
                del other_pending[key]
            matched += 1
        else:
            own_pending.setdefault(key, []).append((now, line))
    return counted, matched, skipped


def _pending_count(pending):
    """Return the number of queued events across all keys of ``pending``."""
    return sum(len(entries) for entries in pending.values())


def _expired_events(pending, now, window):
    """Return ``(key, timestamp, line)`` for entries older than ``window`` seconds."""
    return [(key, ts, line)
            for key, entries in pending.items()
            for ts, line in entries
            if now - ts > window]


def _print_expired(heading, expired, now):
    """Print ``heading`` followed by up to five sample expired events."""
    print(heading, flush=True)
    for key, ts, line in expired[:5]:
        age = now - ts
        print(f' [{age:.0f}s old] table={key[0]} op={key[1]}', flush=True)
        print(f' {line[:200]}', flush=True)


def main():
    """Continuously compare the LogMiner and OLR event files.

    Polls both files every ``POLL_INTERVAL`` seconds, pairs up events with
    equal ``event_key`` and prints a progress line every ``REPORT_INTERVAL``
    seconds. As soon as an event has waited longer than ``--match-window``
    seconds without a counterpart, a mismatch report is printed, the
    Swingbench container is stopped (unless ``--no-stop-on-fail`` is given)
    and the process exits with status 1. There is no other exit path: the
    loop runs until a mismatch occurs or the process is interrupted.
    """
    parser = argparse.ArgumentParser(description='Real-time OLR vs LogMiner validator')
    parser.add_argument('--logminer', required=True, help='Path to logminer.jsonl')
    parser.add_argument('--olr', required=True, help='Path to olr.jsonl')
    parser.add_argument('--match-window', type=int, default=DEFAULT_MATCH_WINDOW,
                        help=f'Seconds to wait for matching event (default: {DEFAULT_MATCH_WINDOW})')
    parser.add_argument('--no-stop-on-fail', dest='stop_on_fail', action='store_false', default=True,
                        help='Do not stop swingbench on mismatch')
    args = parser.parse_args()

    print('Validator starting', flush=True)
    print(f' LogMiner: {args.logminer}', flush=True)
    print(f' OLR: {args.olr}', flush=True)
    print(f' Match window: {args.match_window}s', flush=True)
    print(flush=True)

    # Pending events: key -> [(timestamp, event_json), ...]
    # When both sides produce the same key, they cancel out (match).
    # Lists are used because the same key can legitimately occur repeatedly.
    lm_pending = {}
    olr_pending = {}

    lm_pos = olr_pos = 0
    matched = lm_total = olr_total = skipped = 0
    last_report = time.time()

    while True:
        now = time.time()

        # Tail both files before matching, so each poll sees a snapshot of both.
        lm_lines, lm_pos = tail_file(args.logminer, lm_pos)
        olr_lines, olr_pos = tail_file(args.olr, olr_pos)

        counted, hits, ignored = _match_events(lm_lines, lm_pending, olr_pending, now)
        lm_total += counted
        matched += hits
        skipped += ignored

        counted, hits, ignored = _match_events(olr_lines, olr_pending, lm_pending, now)
        olr_total += counted
        matched += hits
        skipped += ignored

        # Check for expired events (exceeded match window)
        expired_lm = _expired_events(lm_pending, now, args.match_window)
        expired_olr = _expired_events(olr_pending, now, args.match_window)

        if expired_lm or expired_olr:
            print(flush=True)
            print('!!! MISMATCH DETECTED !!!', flush=True)
            print(f' Matched so far: {matched}', flush=True)
            print(f' LogMiner total: {lm_total}, OLR total: {olr_total}', flush=True)
            print(f' LogMiner pending: {_pending_count(lm_pending)}, OLR pending: {_pending_count(olr_pending)}', flush=True)
            print(flush=True)

            if expired_lm:
                _print_expired(f' Events in LogMiner but NOT in OLR ({len(expired_lm)} expired):',
                               expired_lm, now)
            if expired_olr:
                _print_expired(f' Events in OLR but NOT in LogMiner ({len(expired_olr)} expired):',
                               expired_olr, now)

            if args.stop_on_fail:
                stop_swingbench()

            print(flush=True)
            print('VALIDATION FAILED', flush=True)
            sys.exit(1)

        # Progress report
        if now - last_report >= REPORT_INTERVAL:
            print(f'[{time.strftime("%H:%M:%S")}] '
                  f'matched={matched:,} '
                  f'lm={lm_total:,} olr={olr_total:,} '
                  f'pending: lm={_pending_count(lm_pending):,} olr={_pending_count(olr_pending):,} '
                  f'skipped={skipped:,}',
                  flush=True)
            last_report = now

        time.sleep(POLL_INTERVAL)
# Run the validator only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()