From 04bbbc28dc774995329b57b6c2c382809f006460 Mon Sep 17 00:00:00 2001 From: Patrick Sharp Date: Fri, 3 Jul 2026 01:34:19 -0700 Subject: [PATCH] fix(bridge-redis): close subscribe-readiness race that could drop a live message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit subscribe() started its XREAD loop at '$' and returned without awaiting the first (blocking) read. '$' only resolves to the stream's last id when that read registers server-side, so a post() (XADD) racing in during the startup window got an id below the resolved '$' and was dropped forever — it never reached the subscriber. Surfaced as an intermittent failure of the "identical backendMsgId + cursor via live push and via catch-up" conformance test under CI load (Node 24). Fix: capture a concrete stream tail id via XINFO STREAM, awaited before subscribe() resolves, and read strictly after it. A concrete id has no gap, so a message added during startup is still delivered. Falls back to '0' when the stream doesn't exist yet (no history to skip). Verified: redis conformance 20/20. Co-Authored-By: Claude Opus 4.8 (1M context) --- packages/bridge-redis/src/index.ts | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/packages/bridge-redis/src/index.ts b/packages/bridge-redis/src/index.ts index c4f5dfd..43008da 100644 --- a/packages/bridge-redis/src/index.ts +++ b/packages/bridge-redis/src/index.ts @@ -134,8 +134,20 @@ export class RedisPlugin implements BackendPlugin { this.readers.push(reader); const key = this.key(topic); + // Capture the stream tail *before* subscribe() resolves, so a post() (XADD) racing in right + // after can't be missed. Starting the read loop at '$' is unsafe: '$' only resolves to "the + // last id" when the first blocking XREAD actually registers server-side, and subscribe() + // returns without awaiting that read (`void loop()` below). A message added in that window + // gets an id below the resolved '$' and is dropped forever. A concrete id has no such gap — + // XREAD returns everything strictly after it, including messages added during startup. + let lastId = '0'; + try { + lastId = (await reader.xInfoStream(key)).lastGeneratedId; + } catch { + // stream doesn't exist yet → no history to skip; '0' delivers everything from here on + } + const loop = async (): Promise => { - let lastId = '$'; while (!this.stopped) { let res: | Array<{ name: string; messages: Array<{ id: string; message: Record }> }>