Skip to content

Commit 1be3361

Browse files
d-csclaude
andcommitted
fix(webapp): propagate Redis read failures from mollifier stale-sweep slice
`MollifierStaleSweepState.readOrgListSlice` was logging-and-returning `{ orgs: [], total: 0 }` on pipeline-abort or per-result Redis errors. The caller (`runStaleSweepOnce`) treated that as a clean empty cycle: wrote cursor=0, reconciled visited envs against the empty result, and cleared the stale-entry gauge — silencing the alerts the sweep exists to raise. Re-throw the underlying error instead. The interval wrapper's catch logs `stale_sweep.failed` for the failed tick; durable cursor + counts hash stay untouched so the gauge keeps reporting its last-known value until a healthy tick repopulates it. New unit test on the caller contract pins this: error propagates, cursor stays at its seeded value, counts hash retains the seeded env, no snapshot is reported. Separately documents a known limitation in the .server-changes entry: the sweep runs per-webapp-instance, so its stale-entry counter multiplies by N webapps in HA until a distributed lease lands as a follow-up. The system is HA-safe (Redis ops are atomic, no torn state); only the metric output is inflated. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 98b16b3 commit 1be3361

3 files changed

Lines changed: 71 additions & 3 deletions

File tree

.server-changes/mollifier-drainer-replay.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ area: webapp
33
type: feature
44
---
55

6-
Mollifier drainer replay: replay buffered entries into `engine.trigger`, stale-entry sweep, a drainer-health gauge, and run-engine cancelled/failed run APIs.
6+
Mollifier drainer replay: replay buffered entries into `engine.trigger`, stale-entry sweep, a drainer-health gauge, and run-engine cancelled/failed run APIs. Known limitation: stale-sweep runs per-webapp instance, so stale-entry counter metrics multiply by N webapps in HA until a distributed lease lands as follow-up.

apps/webapp/app/v3/mollifier/mollifierStaleSweepState.server.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,27 @@ export class MollifierStaleSweepState implements StaleSweepStateStore {
101101
pipeline.lrange(ORG_LIST_KEY, start, start + count - 1);
102102
pipeline.llen(ORG_LIST_KEY);
103103
const results = await pipeline.exec();
104-
if (!results) return { orgs: [], total: 0 };
104+
// `pipeline.exec()` returning null is the abort-on-broken-pipe path.
105+
// Surface it as a thrown error — the previous `return { orgs: [], total: 0 }`
106+
// looked indistinguishable from a genuinely empty org list to the
107+
// caller (`runStaleSweepOnce`), which then wrote cursor=0, reconciled
108+
// visited envs against the empty result, and cleared the stale-entry
109+
// gauge. That hid real Redis problems and silenced the alerts the
110+
// sweep exists to raise.
111+
if (!results) {
112+
throw new Error("MollifierStaleSweepState.readOrgListSlice: pipeline.exec returned null");
113+
}
105114
const [lrangeErr, lrangeRes] = results[0] as [Error | null, string[] | null];
106115
const [llenErr, llenRes] = results[1] as [Error | null, number | null];
107116
if (lrangeErr || llenErr) {
108117
this.logger.error("MollifierStaleSweepState.readOrgListSlice failed", {
109118
lrangeErr: lrangeErr?.message,
110119
llenErr: llenErr?.message,
111120
});
112-
return { orgs: [], total: 0 };
121+
// Same reasoning as the null-result path above — propagate the
122+
// failure so the sweep's interval wrapper records a failed cycle
123+
// and the durable cursor / counts hash stay untouched.
124+
throw lrangeErr ?? llenErr ?? new Error("MollifierStaleSweepState.readOrgListSlice failed");
113125
}
114126
return { orgs: lrangeRes ?? [], total: llenRes ?? 0 };
115127
}

apps/webapp/test/mollifierStaleSweep.test.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,62 @@ describe("runStaleSweepOnce — unit", () => {
117117
expect(snapshots).toHaveLength(1);
118118
expect(snapshots[0].size).toBe(0);
119119
});
120+
121+
it("surfaces readOrgListSlice failures and leaves durable state untouched", async () => {
122+
// Regression: previously a Redis read failure inside
123+
// `readOrgListSlice` returned `{ orgs: [], total: 0 }` and the
124+
// sweep treated that as a clean empty cycle — writing cursor=0,
125+
// reconciling visited envs against the empty result, and CLEARING
126+
// the stale-entry gauge. That silenced the very alerts the sweep
127+
// exists to raise. The fix re-throws; the caller (this function
128+
// and the interval wrapper above it) must NOT mutate cursor or
129+
// counts when readOrgListSlice fails.
130+
const state = makeFakeState();
131+
// Seed durable state so we can assert it isn't touched on failure.
132+
await state.writeCursor(42);
133+
await state.setEnvStaleCount("env_seed", 7);
134+
await state.rebuildOrgList(["org_pre"]);
135+
// Inject a failure on the very next slice read.
136+
const readErr = new Error("Redis read failed");
137+
let readAttempts = 0;
138+
const failingState = {
139+
...state,
140+
readOrgListSlice: async (start: number, count: number) => {
141+
readAttempts += 1;
142+
throw readErr;
143+
},
144+
};
145+
const spies = spyDeps();
146+
const buffer = {
147+
listOrgs: async () => ["org_pre"],
148+
listEnvsForOrg: async () => [],
149+
listEntriesForEnv: async () => [],
150+
} as unknown as MollifierBuffer;
151+
152+
await expect(
153+
runStaleSweepOnce(
154+
{ staleThresholdMs: 60_000, maxOrgsPerPass: 10 },
155+
{
156+
...spies.deps,
157+
state: failingState,
158+
getBuffer: () => buffer,
159+
now: () => Date.now(),
160+
},
161+
),
162+
).rejects.toThrow("Redis read failed");
163+
164+
expect(readAttempts).toBe(1);
165+
// Cursor untouched (still the seeded 42, not reset to 0).
166+
expect(await state.readCursor()).toBe(42);
167+
// Counts hash untouched — the seeded env's count survives the
168+
// failed cycle so the gauge keeps reporting its last-known value.
169+
const counts = await state.readAllEnvStaleCounts();
170+
expect(counts.get("env_seed")).toBe(7);
171+
// No snapshot was reported because the function threw before
172+
// reaching reportStaleEntrySnapshot.
173+
expect(spies.snapshots).toHaveLength(0);
174+
expect(spies.staleEntryCount).toBe(0);
175+
});
120176
});
121177

122178
describe("runStaleSweepOnce — testcontainers", () => {

0 commit comments

Comments
 (0)