Skip to content

Commit 9c915e0

Browse files
kixelatedclaude
andauthored
Clarify group delivery semantics with recv_group and next_group_ordered (#1324)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent 583ab94 commit 9c915e0

21 files changed

Lines changed: 269 additions & 47 deletions

File tree

doc/rs/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
253253
let mut track = broadcast.subscribe("chat").await?;
254254

255255
// Read groups and frames
256-
while let Some(group) = track.next_group().await? {
256+
while let Some(group) = track.recv_group().await? {
257257
while let Some(frame) = group.read().await? {
258258
println!("Received: {:?}", frame);
259259
}

js/clock/src/main.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ async function subscribe(config: Config) {
147147

148148
// Handle groups and frames like the Rust implementation
149149
for (;;) {
150-
const group = await track.nextGroup();
150+
const group = await track.recvGroup();
151151
if (!group) {
152152
console.log("❌ Connection ended");
153153
break;

js/hang/src/container/legacy.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ export class Consumer {
109109
async #run() {
110110
// Start fetching groups in the background
111111
for (;;) {
112-
const consumer = await this.#track.nextGroup();
112+
const consumer = await this.#track.recvGroup();
113113
if (!consumer) break;
114114

115115
// To improve TTV, we always start with the first group.

js/lite/examples/subscribe.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ async function main() {
1212

1313
// Read data as it arrives
1414
for (;;) {
15-
const group = await track.nextGroup();
15+
const group = await track.recvGroup();
1616
if (!group) break;
1717

1818
for (;;) {

js/lite/src/ietf/publisher.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ export class Publisher {
161161
// Serve track groups, racing with stream close (= Unsubscribe)
162162
const serving = (async () => {
163163
for (;;) {
164-
const group = await track.nextGroup();
164+
const group = await track.recvGroup();
165165
if (!group) return;
166166
void this.#runGroup(msg.requestId, group);
167167
}

js/lite/src/lite/publisher.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ export class Publisher {
201201
async #runTrack(sub: bigint, broadcast: Path.Valid, track: Track, stream: Writer) {
202202
try {
203203
for (;;) {
204-
const next = track.nextGroup();
204+
const next = track.recvGroup();
205205
const group = await Promise.race([next, stream.closed]);
206206
if (!group) {
207207
next.then((group) => group?.close()).catch(() => {});

js/lite/src/track.test.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { expect, test } from "bun:test";
2+
import { Group } from "./group.ts";
3+
import { Track } from "./track.ts";
4+
5+
test("nextGroupOrdered skips late arrivals", async () => {
6+
const track = new Track("test");
7+
8+
track.writeGroup(new Group(5));
9+
10+
const first = await track.nextGroupOrdered();
11+
expect(first?.sequence).toBe(5);
12+
13+
// Late arrivals with sequence <= last returned are skipped.
14+
track.writeGroup(new Group(3));
15+
track.writeGroup(new Group(4));
16+
track.writeGroup(new Group(7));
17+
18+
const next = await track.nextGroupOrdered();
19+
expect(next?.sequence).toBe(7);
20+
});
21+
22+
test("nextGroupOrdered returns buffered groups in sequence", async () => {
23+
const track = new Track("test");
24+
25+
track.writeGroup(new Group(3));
26+
track.writeGroup(new Group(5));
27+
28+
expect((await track.nextGroupOrdered())?.sequence).toBe(3);
29+
expect((await track.nextGroupOrdered())?.sequence).toBe(5);
30+
});
31+
32+
test("recvGroup after nextGroupOrdered still returns late arrivals", async () => {
33+
const track = new Track("test");
34+
35+
track.writeGroup(new Group(5));
36+
37+
// Ordered returns seq 5, advancing its cursor.
38+
const ordered = await track.nextGroupOrdered();
39+
expect(ordered?.sequence).toBe(5);
40+
41+
// recvGroup is independent of the ordered cursor: a late seq 3 still surfaces.
42+
track.writeGroup(new Group(3));
43+
const recv = await track.recvGroup();
44+
expect(recv?.sequence).toBe(3);
45+
});
46+
47+
test("nextGroupOrdered returns undefined when track closes", async () => {
48+
const track = new Track("test");
49+
track.close();
50+
expect(await track.nextGroupOrdered()).toBeUndefined();
51+
});

js/lite/src/track.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export class Track {
1111

1212
state = new TrackState();
1313
#next?: number;
14+
#nextSequence = 0;
1415

1516
readonly closed: Promise<Error | undefined>;
1617

@@ -91,7 +92,14 @@ export class Track {
9192
group.close();
9293
}
9394

94-
async nextGroup(): Promise<Group | undefined> {
95+
/**
96+
* Receive the next group available on this track, in arrival order.
97+
*
98+
* Groups may arrive out of order or with gaps due to network conditions.
99+
* Use {@link nextGroupOrdered} if you need groups in sequence order,
100+
* skipping those that arrive too late.
101+
*/
102+
async recvGroup(): Promise<Group | undefined> {
95103
for (;;) {
96104
const groups = this.state.groups.peek();
97105
if (groups.length > 0) {
@@ -106,6 +114,33 @@ export class Track {
106114
}
107115
}
108116

117+
/**
118+
* @deprecated Use {@link recvGroup} for arrival order, or {@link nextGroupOrdered} for sequence order.
119+
*/
120+
async nextGroup(): Promise<Group | undefined> {
121+
return this.recvGroup();
122+
}
123+
124+
/**
125+
* Return the next group with a strictly-greater sequence number than the last returned.
126+
*
127+
* Late arrivals (with a sequence number at or below the last one returned) are silently skipped.
128+
*
129+
* NOTE: This will be renamed to `nextGroup` in the next major version.
130+
*/
131+
async nextGroupOrdered(): Promise<Group | undefined> {
132+
for (;;) {
133+
const group = await this.recvGroup();
134+
if (!group) return undefined;
135+
if (group.sequence < this.#nextSequence) {
136+
group.close();
137+
continue;
138+
}
139+
this.#nextSequence = group.sequence + 1;
140+
return group;
141+
}
142+
}
143+
109144
async readFrame(): Promise<Uint8Array | undefined> {
110145
return (await this.readFrameSequence())?.data;
111146
}

js/watch/src/audio/decoder.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ export class Decoder {
292292
// Process data segments
293293
// TODO: Use a consumer wrapper for CMAF to support latency control
294294
for (;;) {
295-
const group = await sub.nextGroup();
295+
const group = await sub.recvGroup();
296296
if (!group) break;
297297

298298
effect.spawn(async () => {

js/watch/src/video/decoder.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ class DecoderTrack {
380380
// Process data segments
381381
// TODO: Use a consumer wrapper for CMAF to support latency control
382382
for (;;) {
383-
const group = await Promise.race([sub.nextGroup(), effect.cancel]);
383+
const group = await Promise.race([sub.recvGroup(), effect.cancel]);
384384
if (!group) break;
385385

386386
effect.spawn(async () => {

0 commit comments

Comments
 (0)