Skip to content

Commit 2d5bc8a

Browse files
committed
fix: address CodeRabbit review findings on PR #9
- validator.py: use lists for pending events to handle duplicate keys; fix the --stop-on-fail flag (previously it could not be disabled)
- debezium-receiver.py: fix the 10s window throughput calculation (use the full window duration, not the first-to-last event span)
1 parent eabd7c6 commit 2d5bc8a

2 files changed

Lines changed: 24 additions & 19 deletions

File tree

tests/sql/environments/rac/debezium/perf/validator.py

Lines changed: 22 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -103,8 +103,8 @@ def main():
103103
parser.add_argument('--olr', required=True, help='Path to olr.jsonl')
104104
parser.add_argument('--match-window', type=int, default=DEFAULT_MATCH_WINDOW,
105105
help=f'Seconds to wait for matching event (default: {DEFAULT_MATCH_WINDOW})')
106-
parser.add_argument('--stop-on-fail', action='store_true', default=True,
107-
help='Stop swingbench on mismatch (default: true)')
106+
parser.add_argument('--no-stop-on-fail', dest='stop_on_fail', action='store_false', default=True,
107+
help='Do not stop swingbench on mismatch')
108108
args = parser.parse_args()
109109

110110
print(f'Validator starting', flush=True)
@@ -113,10 +113,11 @@ def main():
113113
print(f' Match window: {args.match_window}s', flush=True)
114114
print(flush=True)
115115

116-
# Pending events: key -> [(timestamp, channel, full_event), ...]
116+
# Pending events: key -> [(timestamp, event_json), ...]
117117
# When both sides produce the same key, they cancel out (match).
118-
lm_pending = {} # key -> (timestamp, event_json)
119-
olr_pending = {} # key -> (timestamp, event_json)
118+
# Using lists to handle duplicate events (same key can appear multiple times).
119+
lm_pending = {} # key -> [(timestamp, event_json), ...]
120+
olr_pending = {} # key -> [(timestamp, event_json), ...]
120121

121122
lm_pos = 0
122123
olr_pos = 0
@@ -145,12 +146,14 @@ def main():
145146
continue
146147
lm_total += 1
147148

148-
if key in olr_pending:
149+
if key in olr_pending and olr_pending[key]:
149150
# Match found — OLR already has this event
150-
del olr_pending[key]
151+
olr_pending[key].pop(0)
152+
if not olr_pending[key]:
153+
del olr_pending[key]
151154
matched += 1
152155
else:
153-
lm_pending[key] = (now, line)
156+
lm_pending.setdefault(key, []).append((now, line))
154157

155158
# Process OLR events
156159
for line in olr_lines:
@@ -164,25 +167,27 @@ def main():
164167
continue
165168
olr_total += 1
166169

167-
if key in lm_pending:
170+
if key in lm_pending and lm_pending[key]:
168171
# Match found — LogMiner already has this event
169-
del lm_pending[key]
172+
lm_pending[key].pop(0)
173+
if not lm_pending[key]:
174+
del lm_pending[key]
170175
matched += 1
171176
else:
172-
olr_pending[key] = (now, line)
177+
olr_pending.setdefault(key, []).append((now, line))
173178

174179
# Check for expired events (exceeded match window)
175-
expired_lm = [(k, ts, line) for k, (ts, line) in lm_pending.items()
176-
if now - ts > args.match_window]
177-
expired_olr = [(k, ts, line) for k, (ts, line) in olr_pending.items()
178-
if now - ts > args.match_window]
180+
expired_lm = [(k, ts, line) for k, entries in lm_pending.items()
181+
for ts, line in entries if now - ts > args.match_window]
182+
expired_olr = [(k, ts, line) for k, entries in olr_pending.items()
183+
for ts, line in entries if now - ts > args.match_window]
179184

180185
if expired_lm or expired_olr:
181186
print(flush=True)
182187
print('!!! MISMATCH DETECTED !!!', flush=True)
183188
print(f' Matched so far: {matched}', flush=True)
184189
print(f' LogMiner total: {lm_total}, OLR total: {olr_total}', flush=True)
185-
print(f' LogMiner pending: {len(lm_pending)}, OLR pending: {len(olr_pending)}', flush=True)
190+
print(f' LogMiner pending: {sum(len(v) for v in lm_pending.values())}, OLR pending: {sum(len(v) for v in olr_pending.values())}', flush=True)
186191
print(flush=True)
187192

188193
if expired_lm:
@@ -211,7 +216,7 @@ def main():
211216
print(f'[{time.strftime("%H:%M:%S")}] '
212217
f'matched={matched:,} '
213218
f'lm={lm_total:,} olr={olr_total:,} '
214-
f'pending: lm={len(lm_pending):,} olr={len(olr_pending):,} '
219+
f'pending: lm={sum(len(v) for v in lm_pending.values()):,} olr={sum(len(v) for v in olr_pending.values()):,} '
215220
f'skipped={skipped:,}',
216221
flush=True)
217222
last_report = now

tests/sql/scripts/debezium-receiver.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ def compute_metrics(channel):
165165
# 10-second window throughput
166166
cutoff = now_ms - 10000
167167
recent = [t for t in m['timestamps'] if t >= cutoff]
168-
if len(recent) > 1:
169-
window_s = (recent[-1] - recent[0]) / 1000.0
168+
if len(recent) >= 1:
169+
window_s = (now_ms - cutoff) / 1000.0
170170
if window_s > 0:
171171
result['throughput_10s_eps'] = round(len(recent) / window_s, 1)
172172

0 commit comments

Comments
 (0)