Skip to content

Commit 16840ff

Browse files
kixelatedclaude
andauthored
Add bandwidth estimation for adaptive bitrate control (#1208)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent 61b2068 commit 16840ff

29 files changed

Lines changed: 518 additions & 185 deletions

File tree

CLAUDE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ match version {
111111
- **Formatting/Linting**: Biome for JS/TS formatting and linting
112112
- **UI**: Solid.js for Web Components in `@moq/watch/ui` and `@moq/publish/ui`
113113
- **Builds**: Nix flake for reproducible builds (optional)
114+
- **JS async patterns**: Use `Effect.interval()`, `Effect.timer()`, and `Effect.event()` helpers from `@moq/signals` instead of raw `setInterval`, `setTimeout`, `addEventListener`. These handle cleanup automatically when the Effect is closed.
114115

115116
## Testing Approach
116117

js/lite/src/bandwidth.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import { Signal } from "@moq/signals";
2+
3+
/**
4+
* A bandwidth estimate in bits per second, or undefined if unknown.
5+
*
6+
* This is a Signal that can be read synchronously via `peek()`,
7+
* observed reactively via `effect.get()`, or updated via `set()`.
8+
*/
9+
export type Bandwidth = Signal<number | undefined>;
10+
11+
/** Create a new bandwidth signal. */
12+
export function createBandwidth(): Bandwidth {
13+
return new Signal<number | undefined>(undefined);
14+
}

js/lite/src/connection/established.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { Announced } from "../announced.ts";
2+
import type { Bandwidth } from "../bandwidth.ts";
23
import type { Broadcast } from "../broadcast.ts";
34
import type * as Path from "../path.ts";
45

@@ -7,6 +8,12 @@ export interface Established {
78
readonly url: URL;
89
readonly version: string;
910

11+
/** Estimated send bitrate from the congestion controller (if supported). */
12+
readonly sendBandwidth?: Bandwidth;
13+
14+
/** Estimated receive bitrate from PROBE (moq-lite-03+ only). */
15+
readonly recvBandwidth?: Bandwidth;
16+
1017
announced(prefix?: Path.Valid): Announced;
1118
publish(path: Path.Valid, broadcast: Broadcast): void;
1219
consume(broadcast: Path.Valid): Broadcast;

js/lite/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export * as Signals from "@moq/signals";
22
export * from "./announced.ts";
3+
export * from "./bandwidth.ts";
34
export * from "./broadcast.ts";
45
export * as Connection from "./connection/index.ts";
56
export * from "./group.ts";

js/lite/src/lite/connection.ts

Lines changed: 57 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { Announced } from "../announced.ts";
2+
import { type Bandwidth, createBandwidth } from "../bandwidth.ts";
23
import type { Broadcast } from "../broadcast.ts";
34
import type { Established } from "../connection/established.ts";
45
import * as Path from "../path.ts";
@@ -10,7 +11,9 @@ import { SessionInfo } from "./session.ts";
1011
import { StreamId } from "./stream.ts";
1112
import { Subscribe } from "./subscribe.ts";
1213
import { Subscriber } from "./subscriber.ts";
13-
import { type Version, versionName } from "./version.ts";
14+
import { Version, versionName } from "./version.ts";
15+
16+
const SEND_BW_POLL_INTERVAL = 100; // ms
1417

1518
/**
1619
* Represents a connection to a MoQ server.
@@ -39,8 +42,11 @@ export class Connection implements Established {
3942
// Module for distributing tracks.
4043
#subscriber: Subscriber;
4144

42-
// Just to avoid logging when `close()` is called.
43-
#closed = false;
45+
/** Estimated send bitrate from the congestion controller. */
46+
readonly sendBandwidth?: Bandwidth;
47+
48+
/** Estimated receive bitrate from PROBE (moq-lite-03+ only). */
49+
readonly recvBandwidth?: Bandwidth;
4450

4551
/**
4652
* Creates a new Connection instance.
@@ -57,8 +63,19 @@ export class Connection implements Established {
5763
this.version = versionName(version);
5864
this.#version = version;
5965

66+
// Send bandwidth is version-agnostic: depends on browser/QUIC support.
67+
const hasGetStats = typeof (quic as unknown as { getStats?: unknown }).getStats === "function";
68+
if (hasGetStats) {
69+
this.sendBandwidth = createBandwidth();
70+
}
71+
72+
// Recv bandwidth requires PROBE support (not available in older drafts).
73+
if (version !== Version.DRAFT_01 && version !== Version.DRAFT_02) {
74+
this.recvBandwidth = createBandwidth();
75+
}
76+
6077
this.#publisher = new Publisher(this.#quic, this.#version);
61-
this.#subscriber = new Subscriber(this.#quic, this.#version);
78+
this.#subscriber = new Subscriber(this.#quic, this.#version, this.recvBandwidth);
6279

6380
this.#run();
6481
}
@@ -67,9 +84,6 @@ export class Connection implements Established {
6784
* Closes the connection.
6885
*/
6986
close() {
70-
if (this.#closed) return;
71-
72-
this.#closed = true;
7387
this.#publisher.close();
7488
this.#subscriber.close();
7589

@@ -82,62 +96,46 @@ export class Connection implements Established {
8296
}
8397

8498
async #run(): Promise<void> {
85-
const session = this.#runSession();
86-
const bidis = this.#runBidis();
87-
const unis = this.#runUnis();
99+
const tasks: Promise<void>[] = [this.#runSession(), this.#runBidis(), this.#runUnis()];
100+
101+
if (this.sendBandwidth) {
102+
tasks.push(this.#runSendBandwidth(this.sendBandwidth));
103+
}
104+
105+
if (this.recvBandwidth) {
106+
tasks.push(this.#subscriber.runProbe());
107+
}
88108

89109
try {
90-
await Promise.all([session, bidis, unis]);
110+
await Promise.all(tasks);
91111
} catch (err) {
92-
if (!this.#closed) {
93-
console.error("fatal error running connection", err);
94-
}
112+
console.error("fatal error running connection", err);
95113
} finally {
96114
this.close();
97115
}
98116
}
99117

100-
/**
101-
* Publishes a broadcast to the connection.
102-
* @param name - The broadcast path to publish
103-
* @param broadcast - The broadcast to publish
104-
*/
105118
publish(path: Path.Valid, broadcast: Broadcast) {
106119
this.#publisher.publish(path, broadcast);
107120
}
108121

109-
/**
110-
* Gets the next announced broadcast.
111-
*/
112122
announced(prefix = Path.empty()): Announced {
113123
return this.#subscriber.announced(prefix);
114124
}
115125

116-
/**
117-
* Consumes a broadcast from the connection.
118-
*
119-
* @remarks
120-
* If the broadcast is not found, a "not found" error will be thrown when requesting any tracks.
121-
*
122-
* @param broadcast - The path of the broadcast to consume
123-
* @returns A Broadcast instance
124-
*/
125126
consume(broadcast: Path.Valid): Broadcast {
126127
return this.#subscriber.consume(broadcast);
127128
}
128129

129130
async #runSession() {
130131
if (!this.#session) {
131-
// moq-lite draft-03 doesn't use a session stream.
132132
return;
133133
}
134134

135135
try {
136-
// Receive messages until the connection is closed.
137136
for (;;) {
138137
const msg = await SessionInfo.decodeMaybe(this.#session.reader, this.#version);
139138
if (!msg) break;
140-
// TODO use the session info
141139
}
142140
} finally {
143141
console.debug("session stream closed");
@@ -147,9 +145,7 @@ export class Connection implements Established {
147145
async #runBidis() {
148146
for (;;) {
149147
const stream = await Stream.accept(this.#quic);
150-
if (!stream) {
151-
break;
152-
}
148+
if (!stream) break;
153149

154150
this.#runBidi(stream)
155151
.catch((err: unknown) => {
@@ -169,14 +165,11 @@ export class Connection implements Established {
169165
} else if (typ === StreamId.Announce) {
170166
const msg = await AnnounceInterest.decode(stream.reader);
171167
await this.#publisher.runAnnounce(msg, stream);
172-
return;
173168
} else if (typ === StreamId.Subscribe) {
174169
const msg = await Subscribe.decode(stream.reader, this.#version);
175170
await this.#publisher.runSubscribe(msg, stream);
176-
return;
177171
} else if (typ === StreamId.Probe) {
178172
await this.#publisher.runProbe(stream);
179-
return;
180173
} else {
181174
throw new Error(`unknown stream type: ${typ.toString()}`);
182175
}
@@ -187,9 +180,7 @@ export class Connection implements Established {
187180

188181
for (;;) {
189182
const stream = await readers.next();
190-
if (!stream) {
191-
break;
192-
}
183+
if (!stream) break;
193184

194185
this.#runUni(stream)
195186
.then(() => {
@@ -211,10 +202,29 @@ export class Connection implements Established {
211202
}
212203
}
213204

214-
/**
215-
* Returns a promise that resolves when the connection is closed.
216-
* @returns A promise that resolves when closed
217-
*/
205+
async #runSendBandwidth(bandwidth: Bandwidth): Promise<void> {
206+
const quic = this.#quic as unknown as {
207+
getStats: () => Promise<{ estimatedSendRate: number | null }>;
208+
};
209+
210+
return new Promise<void>((resolve) => {
211+
const id = setInterval(async () => {
212+
try {
213+
const stats = await quic.getStats();
214+
bandwidth.set(stats.estimatedSendRate ?? undefined);
215+
} catch {
216+
clearInterval(id);
217+
resolve();
218+
}
219+
}, SEND_BW_POLL_INTERVAL);
220+
221+
void this.closed.then(() => {
222+
clearInterval(id);
223+
resolve();
224+
});
225+
});
226+
}
227+
218228
get closed(): Promise<void> {
219229
return this.#quic.closed.then(() => undefined);
220230
}

js/lite/src/lite/publisher.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,8 @@ export class Publisher {
270270
getStats?: () => Promise<{ estimatedSendRate: number | null }>;
271271
};
272272
if (!quic.getStats) {
273-
stream.abort(new Error("stats not supported"));
273+
// Close gracefully instead of aborting to avoid killing the session.
274+
stream.close();
274275
return;
275276
}
276277

js/lite/src/lite/subscriber.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Announced } from "../announced.ts";
2+
import type { Bandwidth } from "../bandwidth.ts";
23
import { Broadcast, type TrackRequest } from "../broadcast.ts";
34
import { Group } from "../group.ts";
45
import * as Path from "../path.ts";
@@ -7,6 +8,7 @@ import type { Track } from "../track.ts";
78
import { error } from "../util/error.ts";
89
import { Announce, AnnounceInit, AnnounceInterest } from "./announce.ts";
910
import type { Group as GroupMessage } from "./group.ts";
11+
import { Probe } from "./probe.ts";
1012
import { StreamId } from "./stream.ts";
1113
import { decodeSubscribeResponse, Subscribe } from "./subscribe.ts";
1214
import { Version } from "./version.ts";
@@ -26,15 +28,21 @@ export class Subscriber {
2628
#subscribes = new Map<bigint, Track>();
2729
#subscribeNext = 0n;
2830

31+
// Recv bandwidth producer (Lite03+ only).
32+
#recvBandwidth?: Bandwidth;
33+
2934
/**
3035
* Creates a new Subscriber instance.
3136
* @param quic - The WebTransport session to use
37+
* @param version - The protocol version
38+
* @param recvBandwidth - Optional bandwidth producer for PROBE
3239
*
3340
* @internal
3441
*/
35-
constructor(quic: WebTransport, version: Version) {
42+
constructor(quic: WebTransport, version: Version, recvBandwidth?: Bandwidth) {
3643
this.#quic = quic;
3744
this.version = version;
45+
this.#recvBandwidth = recvBandwidth;
3846
}
3947

4048
/**
@@ -195,6 +203,27 @@ export class Subscriber {
195203
}
196204
}
197205

206+
/**
207+
* Opens a PROBE bidi stream to receive bandwidth estimates from the publisher.
208+
* Returns immediately if recv bandwidth is not supported.
209+
* Errors are fatal and propagate to the connection.
210+
*
211+
* @internal
212+
*/
213+
async runProbe(): Promise<void> {
214+
if (!this.#recvBandwidth) return;
215+
if (this.version === Version.DRAFT_01 || this.version === Version.DRAFT_02) return;
216+
217+
const stream = await Stream.open(this.#quic);
218+
await stream.writer.u53(StreamId.Probe);
219+
220+
for (;;) {
221+
const probe = await Probe.decodeMaybe(stream.reader, this.version);
222+
if (!probe) break;
223+
this.#recvBandwidth.set(probe.bitrate);
224+
}
225+
}
226+
198227
close() {
199228
for (const track of this.#subscribes.values()) {
200229
track.close();

js/publish/src/broadcast.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export class Broadcast {
4343
this.name = Signal.from(props?.name ?? Moq.Path.empty());
4444

4545
this.audio = new Audio.Encoder(props?.audio);
46-
this.video = new Video.Root(props?.video);
46+
this.video = new Video.Root({ ...props?.video, connection: this.connection });
4747
this.location = new Location.Root(props?.location);
4848
this.chat = new Chat.Root(props?.chat);
4949
this.preview = new Preview(props?.preview);

js/publish/src/video/encoder.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,18 @@ export class Encoder {
5757
// True when the encoder is actively serving a track.
5858
active = new Signal<boolean>(false);
5959

60-
constructor(frame: Getter<VideoFrame | undefined>, source: Signal<Source | undefined>, props?: EncoderProps) {
60+
// Connection signal for reading send bandwidth.
61+
connection: Getter<Moq.Connection.Established | undefined>;
62+
63+
constructor(
64+
frame: Getter<VideoFrame | undefined>,
65+
source: Signal<Source | undefined>,
66+
connection: Getter<Moq.Connection.Established | undefined>,
67+
props?: EncoderProps,
68+
) {
6169
this.frame = frame;
6270
this.source = source;
71+
this.connection = connection;
6372
this.enabled = Signal.from(props?.enabled ?? false);
6473
this.config = Signal.from(props?.config);
6574

@@ -205,6 +214,20 @@ export class Encoder {
205214

206215
bitrate = Math.round(Math.min(bitrate, user.maxBitrate || bitrate));
207216

217+
// If no explicit maxBitrate, cap to the estimated send bandwidth (with 90% safety margin).
218+
if (!user.maxBitrate) {
219+
const conn = effect.get(this.connection);
220+
const sendBw = conn?.sendBandwidth;
221+
if (sendBw) {
222+
const estimate = effect.get(sendBw);
223+
if (estimate != null) {
224+
// Reserve ~10% for audio and protocol overhead.
225+
const cap = Math.round(estimate * 0.9);
226+
bitrate = Math.min(bitrate, cap);
227+
}
228+
}
229+
}
230+
208231
const config: VideoEncoderConfig = {
209232
codec,
210233
width: dimensions.width,

0 commit comments

Comments
 (0)