Skip to content

Commit f97923d

Browse files
authored
Merge pull request #4 from not-empty/feature/monitor-redis-keys
including redis monitor keys
2 parents 56a538a + 6b42ec9 commit f97923d

11 files changed

Lines changed: 808 additions & 34 deletions

File tree

examples/monitor/monitor.py

Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
#!/usr/bin/env python3
2+
"""
3+
OmniQ observer
4+
5+
Polls OmniQ monitoring keys and optional transactional keys to help validate
6+
behavior under stress.
7+
8+
Examples:
9+
python omniq_observer.py --redis-url redis://omniq-redis:6379/0
10+
python omniq_observer.py --redis-url redis://omniq-redis:6379/0 --interval 0.5 --csv omniq_observer.csv
11+
python omniq_observer.py --redis-url redis://omniq-redis:6379/0 --queues emails,pdfs --raw-verify
12+
"""
13+
14+
from __future__ import annotations

import argparse
import csv
import os
import signal
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable

import redis
26+
27+
28+
@dataclass
class QueueSnapshot:
    """One polled observation of a single queue's monitoring counters.

    Field order matters: the CSV header is built from
    ``__dataclass_fields__``, so field order defines the CSV column order.
    ``raw_*`` and ``ok_*`` fields stay ``None`` unless raw verification
    is enabled; each ``ok_*`` is 1 when the stats counter matches the
    corresponding raw key count, 0 otherwise.
    """

    # Snapshot identity.
    ts_ms: int   # wall-clock timestamp of the snapshot, ms since epoch
    queue: str   # queue name
    paused: int  # 1 if the <queue>:paused key exists, else 0
    # Counters as reported by the <queue>:stats hash.
    waiting: int
    group_waiting: int
    waiting_total: int
    active: int
    delayed: int
    failed: int
    completed_kept: int
    groups_ready: int
    # Last-activity timestamps (ms) as reported by the stats hash.
    last_activity_ms: int
    last_enqueue_ms: int
    last_reserve_ms: int
    last_finish_ms: int
    # Ground-truth counts read directly from the transactional keys
    # (only populated under --raw-verify).
    raw_waiting: int | None = None
    raw_group_waiting: int | None = None
    raw_waiting_total: int | None = None
    raw_active: int | None = None
    raw_delayed: int | None = None
    raw_failed: int | None = None
    raw_completed_kept: int | None = None
    raw_groups_ready: int | None = None
    # Match flags: 1 when stats == raw, 0 when they diverge
    # (only populated under --raw-verify).
    ok_waiting: int | None = None
    ok_group_waiting: int | None = None
    ok_waiting_total: int | None = None
    ok_active: int | None = None
    ok_delayed: int | None = None
    ok_failed: int | None = None
    ok_completed_kept: int | None = None
    ok_groups_ready: int | None = None
61+
62+
63+
class Observer:
    """Polls OmniQ monitoring state and reports per-queue snapshots.

    Each loop reads the per-queue ``<queue>:stats`` hash (plus the
    ``<queue>:paused`` flag), optionally cross-checks those counters
    against the raw transactional keys (``--raw-verify``), prints a table
    to the console, and can append every snapshot to a CSV file.

    Fixes vs. previous revision:
      * relies on the module-level ``from pathlib import Path`` import
        (previously missing, so ``--csv`` raised ``NameError``);
      * the CSV file handle is now closed in a ``finally`` block, so it is
        flushed/closed even when a poll iteration raises.
    """

    def __init__(
        self,
        redis_client: redis.Redis,
        queues: list[str] | None,
        raw_verify: bool,
        csv_path: str | None,
        console_every: int,
    ) -> None:
        self.r = redis_client
        # None means "discover queue names from the omniq:queues set".
        self.explicit_queues = queues
        self.raw_verify = raw_verify
        self.csv_path = csv_path
        self.console_every = max(1, console_every)
        self._stop = False
        self._loop_n = 0
        self._csv_file = None
        self._csv_writer = None

    def stop(self, *_args) -> None:
        """Request loop termination; signature is signal-handler compatible."""
        self._stop = True

    def install_signal_handlers(self) -> None:
        """Stop gracefully on SIGINT/SIGTERM instead of dying mid-write."""
        signal.signal(signal.SIGINT, self.stop)
        signal.signal(signal.SIGTERM, self.stop)

    @staticmethod
    def _to_i(value: object) -> int:
        """Coerce a Redis reply (bytes/str/number/None) to int, defaulting to 0."""
        if value is None:
            return 0
        if isinstance(value, bytes):
            value = value.decode("utf-8", errors="replace")
        if value == "":
            return 0
        try:
            # Go through float() so values like "3.0" still parse as ints.
            return int(float(value))
        except Exception:
            return 0

    def _discover_queues(self) -> list[str]:
        """Return the queues to observe: explicit list, or the omniq:queues set."""
        if self.explicit_queues is not None:
            return self.explicit_queues
        return sorted(
            q.decode("utf-8", errors="replace") if isinstance(q, bytes) else str(q)
            for q in self.r.smembers("omniq:queues")
        )

    def _read_stats(self, queue: str) -> dict[str, int]:
        """Read the ``<queue>:stats`` hash, decoding keys and int-coercing values."""
        decoded: dict[str, int] = {}
        for k, v in self.r.hgetall(f"{queue}:stats").items():
            key = k.decode("utf-8", errors="replace") if isinstance(k, bytes) else str(k)
            decoded[key] = self._to_i(v)
        return decoded

    def _scan_group_waiting(self, queue: str) -> int:
        """Sum the lengths of all per-group wait lists via SCAN (non-blocking)."""
        total = 0
        cursor = 0
        pattern = f"{queue}:g:*:wait"
        while True:
            cursor, keys = self.r.scan(cursor=cursor, match=pattern, count=200)
            if keys:
                # Pipeline the LLENs so one round-trip covers the whole batch.
                pipe = self.r.pipeline(transaction=False)
                for key in keys:
                    pipe.llen(key)
                total += sum(self._to_i(x) for x in pipe.execute())
            if cursor == 0:  # SCAN returns cursor 0 when iteration completes
                break
        return total

    def _read_raw(self, queue: str) -> dict[str, int]:
        """Count jobs directly from the transactional keys (ground truth)."""
        raw = {
            "waiting": self._to_i(self.r.llen(f"{queue}:wait")),
            "active": self._to_i(self.r.zcard(f"{queue}:active")),
            "delayed": self._to_i(self.r.zcard(f"{queue}:delayed")),
            "failed": self._to_i(self.r.llen(f"{queue}:failed")),
            "completed_kept": self._to_i(self.r.llen(f"{queue}:completed")),
            "groups_ready": self._to_i(self.r.zcard(f"{queue}:groups:ready")),
        }
        raw["group_waiting"] = self._scan_group_waiting(queue)
        raw["waiting_total"] = raw["waiting"] + raw["group_waiting"]
        return raw

    def snapshot_queue(self, queue: str) -> QueueSnapshot:
        """Build one QueueSnapshot; raw_*/ok_* only filled under --raw-verify."""
        now_ms = int(time.time() * 1000)
        stats = self._read_stats(queue)
        paused = 1 if self.r.exists(f"{queue}:paused") else 0

        snap = QueueSnapshot(
            ts_ms=now_ms,
            queue=queue,
            paused=paused,
            waiting=stats.get("waiting", 0),
            group_waiting=stats.get("group_waiting", 0),
            waiting_total=stats.get("waiting_total", 0),
            active=stats.get("active", 0),
            delayed=stats.get("delayed", 0),
            failed=stats.get("failed", 0),
            completed_kept=stats.get("completed_kept", 0),
            groups_ready=stats.get("groups_ready", 0),
            last_activity_ms=stats.get("last_activity_ms", 0),
            last_enqueue_ms=stats.get("last_enqueue_ms", 0),
            last_reserve_ms=stats.get("last_reserve_ms", 0),
            last_finish_ms=stats.get("last_finish_ms", 0),
        )

        if self.raw_verify:
            raw = self._read_raw(queue)
            snap.raw_waiting = raw["waiting"]
            snap.raw_group_waiting = raw["group_waiting"]
            snap.raw_waiting_total = raw["waiting_total"]
            snap.raw_active = raw["active"]
            snap.raw_delayed = raw["delayed"]
            snap.raw_failed = raw["failed"]
            snap.raw_completed_kept = raw["completed_kept"]
            snap.raw_groups_ready = raw["groups_ready"]

            # NOTE: stats and raw keys are read at slightly different times,
            # so transient mismatches are possible under load.
            snap.ok_waiting = int(snap.waiting == snap.raw_waiting)
            snap.ok_group_waiting = int(snap.group_waiting == snap.raw_group_waiting)
            snap.ok_waiting_total = int(snap.waiting_total == snap.raw_waiting_total)
            snap.ok_active = int(snap.active == snap.raw_active)
            snap.ok_delayed = int(snap.delayed == snap.raw_delayed)
            snap.ok_failed = int(snap.failed == snap.raw_failed)
            snap.ok_completed_kept = int(snap.completed_kept == snap.raw_completed_kept)
            snap.ok_groups_ready = int(snap.groups_ready == snap.raw_groups_ready)

        return snap

    def _ensure_csv(self) -> None:
        """Lazily open the CSV file (append mode) and write a header if new/empty."""
        if not self.csv_path or self._csv_writer is not None:
            return
        path = Path(self.csv_path)
        path.parent.mkdir(parents=True, exist_ok=True)
        exists = path.exists() and path.stat().st_size > 0
        self._csv_file = path.open("a", newline="", encoding="utf-8")
        # Column order mirrors QueueSnapshot's declared field order.
        self._csv_writer = csv.DictWriter(
            self._csv_file,
            fieldnames=list(QueueSnapshot.__dataclass_fields__.keys()),
        )
        if not exists:
            self._csv_writer.writeheader()
            self._csv_file.flush()

    def write_csv(self, snaps: Iterable[QueueSnapshot]) -> None:
        """Append one row per snapshot; no-op when no CSV path is configured."""
        if not self.csv_path:
            return
        self._ensure_csv()
        assert self._csv_writer is not None
        for snap in snaps:
            self._csv_writer.writerow(snap.__dict__)
        assert self._csv_file is not None
        self._csv_file.flush()

    def print_console(self, snaps: list[QueueSnapshot]) -> None:
        """Print an aligned table of the snapshots, plus a verify line if enabled."""
        if not snaps:
            print(f"[{int(time.time())}] no queues discovered")
            return

        headers = ["queue", "paused", "wait", "gwait", "wtotal", "active", "delayed", "failed", "done", "gready"]
        rows = []
        for s in snaps:
            rows.append([
                s.queue,
                str(s.paused),
                str(s.waiting),
                str(s.group_waiting),
                str(s.waiting_total),
                str(s.active),
                str(s.delayed),
                str(s.failed),
                str(s.completed_kept),
                str(s.groups_ready),
            ])

        # Column widths sized to the widest cell (header included).
        widths = [len(h) for h in headers]
        for row in rows:
            for i, cell in enumerate(row):
                widths[i] = max(widths[i], len(cell))

        def fmt(row: list[str]) -> str:
            return "  ".join(cell.ljust(widths[i]) for i, cell in enumerate(row))

        print()
        print(fmt(headers))
        print(fmt(["-" * w for w in widths]))
        for row in rows:
            print(fmt(row))

        if self.raw_verify:
            mismatches = []
            for s in snaps:
                bad = []
                if s.ok_waiting == 0:
                    bad.append("waiting")
                if s.ok_group_waiting == 0:
                    bad.append("group_waiting")
                if s.ok_waiting_total == 0:
                    bad.append("waiting_total")
                if s.ok_active == 0:
                    bad.append("active")
                if s.ok_delayed == 0:
                    bad.append("delayed")
                if s.ok_failed == 0:
                    bad.append("failed")
                if s.ok_completed_kept == 0:
                    bad.append("completed_kept")
                if s.ok_groups_ready == 0:
                    bad.append("groups_ready")
                if bad:
                    mismatches.append(f"{s.queue}: {', '.join(bad)}")
            if mismatches:
                print("verify:", " | ".join(mismatches))
            else:
                print("verify: all stats match raw keys")

    def run(self, interval_s: float, duration_s: float | None, once: bool) -> int:
        """Main poll loop.

        Loops until stopped by signal, ``once``, or ``duration_s`` elapsing.
        Returns a process exit code (always 0).
        """
        self.install_signal_handlers()
        started = time.monotonic()

        try:
            while not self._stop:
                queues = self._discover_queues()
                snaps = [self.snapshot_queue(q) for q in queues]
                self.write_csv(snaps)

                if self._loop_n % self.console_every == 0:
                    self.print_console(snaps)

                self._loop_n += 1

                if once:
                    break
                if duration_s is not None and (time.monotonic() - started) >= duration_s:
                    break

                time.sleep(interval_s)
        finally:
            # Close (and flush) the CSV even if a poll iteration raised.
            if self._csv_file is not None:
                self._csv_file.close()
        return 0
301+
302+
303+
def build_parser() -> argparse.ArgumentParser:
    """Assemble the command-line interface for the observer script."""
    parser = argparse.ArgumentParser(
        description="Observe OmniQ queue stats and optional raw Redis validation.",
    )
    parser.add_argument(
        "--redis-url",
        default=os.getenv("REDIS_URL", "redis://omniq-redis:6379/0"),
    )
    parser.add_argument(
        "--queues",
        default="",
        help="Comma-separated queue names. Empty means discover from omniq:queues.",
    )
    parser.add_argument(
        "--interval",
        type=float,
        default=1.0,
        help="Polling interval in seconds.",
    )
    parser.add_argument(
        "--duration",
        type=float,
        default=None,
        help="Optional total run duration in seconds.",
    )
    parser.add_argument(
        "--csv",
        default="",
        help="Optional CSV output path.",
    )
    parser.add_argument(
        "--raw-verify",
        action="store_true",
        help="Compare monitoring stats against transactional keys.",
    )
    parser.add_argument(
        "--console-every",
        type=int,
        default=1,
        help="Print every N loops.",
    )
    parser.add_argument(
        "--once",
        action="store_true",
        help="Run a single snapshot and exit.",
    )
    return parser
314+
315+
316+
def main() -> int:
    """Entry point: connect to Redis, build the observer, and run the loop.

    Returns 0 on normal exit, 2 when the Redis connection cannot be
    established.
    """
    args = build_parser().parse_args()

    # Fail fast with a clear message if Redis is unreachable.
    try:
        client = redis.Redis.from_url(args.redis_url, decode_responses=False)
        client.ping()
    except Exception as exc:
        print(f"Redis connection failed: {exc}", file=sys.stderr)
        return 2

    # An empty --queues string means "discover queues from Redis".
    requested = [name.strip() for name in args.queues.split(",") if name.strip()]
    queues = requested if requested else None

    observer = Observer(
        redis_client=client,
        queues=queues,
        raw_verify=args.raw_verify,
        csv_path=args.csv or None,
        console_every=args.console_every,
    )
    return observer.run(
        interval_s=max(0.05, args.interval),  # clamp to a sane minimum
        duration_s=args.duration,
        once=args.once,
    )


if __name__ == "__main__":
    raise SystemExit(main())

0 commit comments

Comments
 (0)