Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion packages/bridge-redis/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,20 @@ export class RedisPlugin implements BackendPlugin {
this.readers.push(reader);
const key = this.key(topic);

// Capture the stream tail *before* subscribe() resolves, so a post() (XADD) racing in right
// after can't be missed. Starting the read loop at '$' is unsafe: '$' only resolves to "the
// last id" when the first blocking XREAD actually registers server-side, and subscribe()
// returns without awaiting that read (`void loop()` below). A message added in that window
// gets an id below the resolved '$' and is dropped forever. A concrete id has no such gap —
// XREAD returns everything strictly after it, including messages added during startup.
let lastId = '0';
try {
lastId = (await reader.xInfoStream(key)).lastGeneratedId;
} catch {
// stream doesn't exist yet → no history to skip; '0' delivers everything from here on
}

const loop = async (): Promise<void> => {
let lastId = '$';
while (!this.stopped) {
let res:
| Array<{ name: string; messages: Array<{ id: string; message: Record<string, string> }> }>
Expand Down
Loading