Skip to content

Commit 2d185ab

Browse files
d-csclaude
andcommitted
feat(redis-worker,webapp): observability index for in-flight DRAINING entries
Adds a Redis sorted set `mollifier:draining` mirroring entries currently in DRAINING state (popped by the drainer, not yet acked/failed/requeued), scored by pop wall-clock millis. Maintained atomically with the existing per-entry status transitions: - popAndMarkDraining → ZADD score=now-ms - ackMollifierEntry → ZREM - failMollifierEntry → ZREM - requeueMollifierEntry → ZREM Each pre-existing Lua picks up one extra Redis op; ack/fail also gain a runId arg so they can ZREM without a hash read. Buffer exposes: - getDrainingCount(): ZCARD — gauge value - listStaleDraining(olderThanMs, limit): ZRANGEBYSCORE — forensics after an ECS OOM ("which entries were stranded?") NOT load-bearing for correctness — per-entry hash still carries status, stale-sweep still scans queue LISTs. The set is a fast top-level index so a wiped/out-of-date set just over-reports the gauge; recovery paths are untouched. A test pins this graceful-degradation invariant. Wires `mollifier.draining.current` ObservableGauge polled every 15s on the drainer worker pods. unref'd setInterval so it can't block graceful shutdown; idempotent under dev hot-reload. Test seam exported for unit testing without spinning a real OTel meter. Tests: - 7 redisTest cases in buffer.test.ts (lifecycle on every Lua boundary, requeue-and-repop score replacement, listStaleDraining cutoff/limit, graceful-degradation when set is wiped) - 6 unit tests in webapp for the gauge poller (eager fire, cadence, null buffer no-op, transient-error survives, idempotent start, stop halts loop) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 1ce477c commit 2d185ab

6 files changed

Lines changed: 557 additions & 4 deletions

File tree

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { logger } from "~/services/logger.server";
2+
import { getMollifierBuffer } from "./mollifierBuffer.server";
3+
import { reportDrainingCount } from "./mollifierTelemetry.server";
4+
5+
// How often we ZCARD the draining-tracker set. Each poll is a single
6+
// O(1) Redis call, so cadence is bounded by "how fresh do we want the
7+
// gauge?" rather than cost. 15s gives a tight-enough window to spot a
8+
// brief OOM-induced spike without burning RTTs, and lines up well with
9+
// typical Prometheus scrape intervals.
10+
const POLL_INTERVAL_MS = 15_000;
11+
12+
let intervalHandle: ReturnType<typeof setInterval> | null = null;
13+
14+
// Polls `mollifier:draining` cardinality on an interval and feeds the
15+
// gauge in `mollifierTelemetry.server.ts`. Started from the drainer
16+
// worker bootstrap (alongside `drainer.start()`) so it runs on the same
17+
// pods that actually pop/ack entries — observability is colocated with
18+
// the lifecycle.
19+
//
20+
// Idempotent: a second call is a no-op (Remix dev hot-reload re-runs
21+
// the bootstrap; the existing interval keeps ticking).
22+
export function startMollifierDrainingGauge(opts: {
23+
intervalMs?: number;
24+
getBuffer?: typeof getMollifierBuffer;
25+
} = {}): void {
26+
if (intervalHandle !== null) return;
27+
28+
const intervalMs = opts.intervalMs ?? POLL_INTERVAL_MS;
29+
const getBuffer = opts.getBuffer ?? getMollifierBuffer;
30+
31+
// Fire one poll immediately so the gauge populates before the first
32+
// scrape rather than reading 0 for a full interval after boot.
33+
const tick = async () => {
34+
const buffer = getBuffer();
35+
if (!buffer) return;
36+
try {
37+
const count = await buffer.getDrainingCount();
38+
reportDrainingCount(count);
39+
} catch (err) {
40+
// Transient Redis blip — don't tank the loop, just leave the
41+
// gauge at its last-known value. A sustained Redis outage will
42+
// surface via the drainer's own alerts long before this gauge
43+
// staleness becomes a primary signal.
44+
logger.warn("Mollifier draining gauge poll failed; keeping previous value", { err });
45+
}
46+
};
47+
48+
void tick();
49+
// unref so the interval doesn't keep the process alive past
50+
// graceful shutdown — the gauge is best-effort, not a flush boundary.
51+
intervalHandle = setInterval(() => {
52+
void tick();
53+
}, intervalMs);
54+
intervalHandle.unref?.();
55+
}
56+
57+
// Test seam. Production code never calls this; lifecycle is implicitly
58+
// process-end.
59+
export function stopMollifierDrainingGauge(): void {
60+
if (intervalHandle === null) return;
61+
clearInterval(intervalHandle);
62+
intervalHandle = null;
63+
}

apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,39 @@ meter.addBatchObservableCallback(
9090
[staleEntriesGauge],
9191
);
9292

93+
// Observability gauge for entries currently in DRAINING state — popped
94+
// by the drainer but not yet acked/failed/requeued. Backed by the
95+
// `mollifier:draining` ZSET (see `MollifierBuffer.getDrainingCount`)
96+
// and polled by the loop in `mollifierDrainingGaugeLoop.server.ts`.
97+
//
98+
// Useful for:
99+
// - "Is anything mid-drain right now?" panels
100+
// - Post-crash forensics ("how many entries got stranded by that ECS OOM?")
101+
// - Alerting: a sustained non-zero with no drainer progress is a stall
102+
//
103+
// No `envId` attribute — same high-cardinality constraint as the other
104+
// mollifier gauges. The per-entry hash carries env/org for drill-down.
105+
export const drainingCountGauge = meter.createObservableGauge(
106+
"mollifier.draining.current",
107+
{
108+
description:
109+
"Mollifier buffer entries currently in DRAINING state (popped but not yet acked/failed/requeued)",
110+
},
111+
);
112+
113+
let latestDrainingCount = 0;
114+
115+
export function reportDrainingCount(count: number): void {
116+
latestDrainingCount = count;
117+
}
118+
119+
meter.addBatchObservableCallback(
120+
(result) => {
121+
result.observe(drainingCountGauge, latestDrainingCount);
122+
},
123+
[drainingCountGauge],
124+
);
125+
93126
// Electric SQL's shape-stream protocol adds a `handle=` query param on
94127
// every reconnect after the initial GET. Gating the realtime-buffered
95128
// log/counter on its absence keeps the signal at one tick per

apps/webapp/app/v3/mollifierDrainerWorker.server.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
getMollifierDrainer,
66
MollifierConfigurationError,
77
} from "./mollifier/mollifierDrainer.server";
8+
import { startMollifierDrainingGauge } from "./mollifier/mollifierDrainingGauge.server";
89

910
declare global {
1011
// eslint-disable-next-line no-var
@@ -92,6 +93,12 @@ export function initMollifierDrainerWorker(
9293
signalsEmitter.on("SIGINT", stopDrainer);
9394
global.__mollifierShutdownRegistered__ = true;
9495
drainer.start();
96+
// Spin up the observability-only gauge poller for the
97+
// `mollifier:draining` ZSET cardinality. Colocated with the
98+
// drainer because that's the loop creating the DRAINING entries
99+
// — same pod, same Redis client lifecycle. Idempotent + unref'd
100+
// so it's safe under dev hot-reload and doesn't block shutdown.
101+
startMollifierDrainingGauge();
95102
}
96103
} catch (error) {
97104
// Deterministic misconfig (shutdown-timeout vs GRACEFUL_SHUTDOWN_TIMEOUT,
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import { describe, expect, it, vi, beforeEach, afterEach } from "vitest";
2+
3+
// Same defensive mocks as mollifierDrainerWorker.test.ts: importing
4+
// the gauge module transitively loads telemetry → meter → OTel
5+
// initialisation, plus the buffer singleton's runtime resolution.
6+
vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} }));
7+
vi.mock("~/services/logger.server", () => ({
8+
logger: { warn: vi.fn(), error: vi.fn(), info: vi.fn(), debug: vi.fn() },
9+
}));
10+
11+
const reportDrainingCount = vi.fn();
12+
vi.mock("~/v3/mollifier/mollifierTelemetry.server", () => ({
13+
reportDrainingCount: (count: number) => reportDrainingCount(count),
14+
}));
15+
16+
import {
17+
startMollifierDrainingGauge,
18+
stopMollifierDrainingGauge,
19+
} from "~/v3/mollifier/mollifierDrainingGauge.server";
20+
21+
// The gauge poller reads `mollifier:draining` cardinality on a cadence
22+
// and forwards it to `reportDrainingCount`. These tests pin the
23+
// observable contract: the gauge value is the buffer's count, transient
24+
// errors keep the last value, and the loop never blocks the main thread
25+
// (unref'd interval — verified implicitly because Vitest exits cleanly).
26+
describe("startMollifierDrainingGauge", () => {
27+
beforeEach(() => {
28+
reportDrainingCount.mockReset();
29+
stopMollifierDrainingGauge();
30+
});
31+
32+
afterEach(() => {
33+
stopMollifierDrainingGauge();
34+
});
35+
36+
it("fires an immediate poll on start so the gauge populates before the first scrape", async () => {
37+
const buffer = { getDrainingCount: vi.fn().mockResolvedValue(7) } as any;
38+
startMollifierDrainingGauge({
39+
intervalMs: 100_000, // long — we're checking the immediate fire, not the interval
40+
getBuffer: () => buffer,
41+
});
42+
43+
// Wait one microtask tick so the eager poll resolves.
44+
await new Promise((r) => setImmediate(r));
45+
expect(reportDrainingCount).toHaveBeenCalledWith(7);
46+
expect(buffer.getDrainingCount).toHaveBeenCalledTimes(1);
47+
});
48+
49+
it("polls on the configured cadence", async () => {
50+
const buffer = { getDrainingCount: vi.fn().mockResolvedValue(3) } as any;
51+
startMollifierDrainingGauge({
52+
intervalMs: 20,
53+
getBuffer: () => buffer,
54+
});
55+
56+
// Eager tick + at least one interval tick.
57+
await new Promise((r) => setTimeout(r, 80));
58+
expect(buffer.getDrainingCount.mock.calls.length).toBeGreaterThanOrEqual(2);
59+
expect(reportDrainingCount).toHaveBeenCalledWith(3);
60+
});
61+
62+
it("no-ops when the buffer singleton returns null (mollifier disabled)", async () => {
63+
startMollifierDrainingGauge({
64+
intervalMs: 20,
65+
getBuffer: () => null,
66+
});
67+
await new Promise((r) => setTimeout(r, 60));
68+
expect(reportDrainingCount).not.toHaveBeenCalled();
69+
});
70+
71+
it("swallows a transient ZCARD failure so the loop keeps running", async () => {
72+
let calls = 0;
73+
const buffer = {
74+
getDrainingCount: vi.fn(async () => {
75+
calls += 1;
76+
if (calls === 1) throw new Error("transient redis blip");
77+
return 4;
78+
}),
79+
} as any;
80+
startMollifierDrainingGauge({
81+
intervalMs: 20,
82+
getBuffer: () => buffer,
83+
});
84+
85+
await new Promise((r) => setTimeout(r, 80));
86+
// First call threw → no report. Second call succeeded → reported.
87+
// The gauge keeps its previous value (stale-but-non-zero) between
88+
// the failed poll and the next successful one — better than
89+
// crashing the loop and going silent forever.
90+
expect(reportDrainingCount).toHaveBeenCalledWith(4);
91+
expect(buffer.getDrainingCount.mock.calls.length).toBeGreaterThanOrEqual(2);
92+
});
93+
94+
it("is idempotent: a second start does not spawn a parallel loop", async () => {
95+
const buffer = { getDrainingCount: vi.fn().mockResolvedValue(1) } as any;
96+
startMollifierDrainingGauge({ intervalMs: 25, getBuffer: () => buffer });
97+
startMollifierDrainingGauge({ intervalMs: 25, getBuffer: () => buffer });
98+
99+
await new Promise((r) => setTimeout(r, 90));
100+
// One eager + a small number of interval ticks. Doubled-loop would
101+
// produce ~2× the calls in the same window. Upper bound is generous
102+
// for CI jitter; the property is "single loop", not exact count.
103+
expect(buffer.getDrainingCount.mock.calls.length).toBeLessThan(8);
104+
});
105+
106+
it("stop halts the polling loop", async () => {
107+
const buffer = { getDrainingCount: vi.fn().mockResolvedValue(2) } as any;
108+
startMollifierDrainingGauge({ intervalMs: 20, getBuffer: () => buffer });
109+
await new Promise((r) => setTimeout(r, 50));
110+
const callsAtStop = buffer.getDrainingCount.mock.calls.length;
111+
stopMollifierDrainingGauge();
112+
113+
await new Promise((r) => setTimeout(r, 80));
114+
expect(buffer.getDrainingCount.mock.calls.length).toBe(callsAtStop);
115+
});
116+
});

0 commit comments

Comments
 (0)