diff --git a/packages/bridge-redis/src/index.ts b/packages/bridge-redis/src/index.ts index c4f5dfd..43008da 100644 --- a/packages/bridge-redis/src/index.ts +++ b/packages/bridge-redis/src/index.ts @@ -134,8 +134,20 @@ export class RedisPlugin implements BackendPlugin { this.readers.push(reader); const key = this.key(topic); + // Capture the stream tail *before* subscribe() resolves, so a post() (XADD) racing in right + // after can't be missed. Starting the read loop at '$' is unsafe: '$' only resolves to "the + // last id" when the first blocking XREAD actually registers server-side, and subscribe() + // returns without awaiting that read (`void loop()` below). A message added in that window + // gets an id below the resolved '$' and is dropped forever. A concrete id has no such gap — + // XREAD returns everything strictly after it, including messages added during startup. + let lastId = '0'; + try { + lastId = (await reader.xInfoStream(key)).lastGeneratedId; + } catch { + // stream doesn't exist yet → no history to skip; '0' delivers everything from here on + } + const loop = async (): Promise => { - let lastId = '$'; while (!this.stopped) { let res: | Array<{ name: string; messages: Array<{ id: string; message: Record }> }>