Skip to content

Commit 3e82620

Browse files
committed
Add bitrate estimation API for send and receive rates
Expose BandwidthProducer/Consumer pairs on Session (Rust) and Connection (JS) for estimated send rate (from congestion controller, polled every 100ms) and receive rate (from PROBE mechanism, moq-lite-03+ only). The subscriber now initiates PROBE streams lazily when consumers exist. @moq/watch uses recv bandwidth for automatic ABR rendition selection, and @moq/publish caps encoder bitrate to the estimated send bandwidth. https://claude.ai/code/session_01C68pEkoUhQqtQALYxRajhx
1 parent de8a8e9 commit 3e82620

17 files changed

Lines changed: 483 additions & 38 deletions

File tree

js/lite/src/bandwidth.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { type Getter, Signal } from "@moq/signals";
2+
3+
/**
4+
* A bandwidth estimate that can be read synchronously or observed reactively.
5+
*
6+
* Created internally by the connection. Consumers read from it via the signal.
7+
*/
8+
export class Bandwidth {
9+
#bitrate = new Signal<number | undefined>(undefined);
10+
11+
/** Reactive signal for the current bandwidth estimate in bits per second. */
12+
readonly signal: Getter<number | undefined> = this.#bitrate;
13+
14+
/**
15+
* Update the bandwidth estimate. Called internally by the connection/subscriber.
16+
* @internal
17+
*/
18+
set(bitrate: number | undefined): void {
19+
this.#bitrate.set(bitrate);
20+
}
21+
22+
/** Get the current bandwidth estimate synchronously. */
23+
get(): number | undefined {
24+
return this.#bitrate.peek();
25+
}
26+
27+
/** Wait for the bandwidth estimate to change. */
28+
async changed(): Promise<number | undefined> {
29+
return new Promise<number | undefined>((resolve) => {
30+
this.#bitrate.changed(resolve);
31+
});
32+
}
33+
}

js/lite/src/connection/established.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { Announced } from "../announced.ts";
2+
import type { Bandwidth } from "../bandwidth.ts";
23
import type { Broadcast } from "../broadcast.ts";
34
import type * as Path from "../path.ts";
45

@@ -7,6 +8,12 @@ export interface Established {
78
readonly url: URL;
89
readonly version: string;
910

11+
/** Estimated send bitrate from the congestion controller (if supported). */
12+
readonly sendBandwidth?: Bandwidth;
13+
14+
/** Estimated receive bitrate from PROBE (moq-lite-03+ only). */
15+
readonly recvBandwidth?: Bandwidth;
16+
1017
announced(prefix?: Path.Valid): Announced;
1118
publish(path: Path.Valid, broadcast: Broadcast): void;
1219
consume(broadcast: Path.Valid): Broadcast;

js/lite/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export * as Signals from "@moq/signals";
22
export * from "./announced.ts";
3+
export * from "./bandwidth.ts";
34
export * from "./broadcast.ts";
45
export * as Connection from "./connection/index.ts";
56
export * from "./group.ts";

js/lite/src/lite/connection.ts

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { Announced } from "../announced.ts";
2+
import { Bandwidth } from "../bandwidth.ts";
23
import type { Broadcast } from "../broadcast.ts";
34
import type { Established } from "../connection/established.ts";
45
import * as Path from "../path.ts";
@@ -10,7 +11,9 @@ import { SessionInfo } from "./session.ts";
1011
import { StreamId } from "./stream.ts";
1112
import { Subscribe } from "./subscribe.ts";
1213
import { Subscriber } from "./subscriber.ts";
13-
import { type Version, versionName } from "./version.ts";
14+
import { Version, versionName } from "./version.ts";
15+
16+
const SEND_BW_POLL_INTERVAL = 100; // ms
1417

1518
/**
1619
* Represents a connection to a MoQ server.
@@ -42,6 +45,12 @@ export class Connection implements Established {
4245
// Just to avoid logging when `close()` is called.
4346
#closed = false;
4447

48+
/** Estimated send bitrate from the congestion controller. */
49+
readonly sendBandwidth?: Bandwidth;
50+
51+
/** Estimated receive bitrate from PROBE (moq-lite-03+ only). */
52+
readonly recvBandwidth?: Bandwidth;
53+
4554
/**
4655
* Creates a new Connection instance.
4756
* @param url - The URL of the connection
@@ -57,8 +66,14 @@ export class Connection implements Established {
5766
this.version = versionName(version);
5867
this.#version = version;
5968

69+
// Set up bandwidth estimation for Lite03+.
70+
if (version === Version.DRAFT_03) {
71+
this.sendBandwidth = new Bandwidth();
72+
this.recvBandwidth = new Bandwidth();
73+
}
74+
6075
this.#publisher = new Publisher(this.#quic, this.#version);
61-
this.#subscriber = new Subscriber(this.#quic, this.#version);
76+
this.#subscriber = new Subscriber(this.#quic, this.#version, this.recvBandwidth);
6277

6378
this.#run();
6479
}
@@ -86,6 +101,11 @@ export class Connection implements Established {
86101
const bidis = this.#runBidis();
87102
const unis = this.#runUnis();
88103

104+
// Start polling send bandwidth if supported.
105+
if (this.sendBandwidth) {
106+
this.#runSendBandwidth(this.sendBandwidth);
107+
}
108+
89109
try {
90110
await Promise.all([session, bidis, unis]);
91111
} catch (err) {
@@ -211,6 +231,37 @@ export class Connection implements Established {
211231
}
212232
}
213233

234+
/**
235+
* Polls the QUIC congestion controller for estimated send rate.
236+
*/
237+
#runSendBandwidth(bandwidth: Bandwidth) {
238+
// getStats is not yet in the TypeScript WebTransport type definitions.
239+
const quic = this.#quic as unknown as {
240+
getStats?: () => Promise<{ estimatedSendRate: number | null }>;
241+
};
242+
243+
const getStats = quic.getStats?.bind(quic);
244+
if (!getStats) return;
245+
246+
const run = async () => {
247+
try {
248+
while (!this.#closed) {
249+
const timeout = new Promise<void>((resolve) => setTimeout(resolve, SEND_BW_POLL_INTERVAL));
250+
await timeout;
251+
252+
if (this.#closed) break;
253+
254+
const stats = await getStats();
255+
bandwidth.set(stats.estimatedSendRate ?? undefined);
256+
}
257+
} catch {
258+
// Connection closed.
259+
}
260+
};
261+
262+
void run();
263+
}
264+
214265
/**
215266
* Returns a promise that resolves when the connection is closed.
216267
* @returns A promise that resolves when closed

js/lite/src/lite/subscriber.ts

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Announced } from "../announced.ts";
2+
import type { Bandwidth } from "../bandwidth.ts";
23
import { Broadcast, type TrackRequest } from "../broadcast.ts";
34
import { Group } from "../group.ts";
45
import * as Path from "../path.ts";
@@ -7,6 +8,7 @@ import type { Track } from "../track.ts";
78
import { error } from "../util/error.ts";
89
import { Announce, AnnounceInit, AnnounceInterest } from "./announce.ts";
910
import type { Group as GroupMessage } from "./group.ts";
11+
import { Probe } from "./probe.ts";
1012
import { StreamId } from "./stream.ts";
1113
import { decodeSubscribeResponse, Subscribe } from "./subscribe.ts";
1214
import { Version } from "./version.ts";
@@ -26,15 +28,25 @@ export class Subscriber {
2628
#subscribes = new Map<bigint, Track>();
2729
#subscribeNext = 0n;
2830

31+
// Recv bandwidth producer (Lite03+ only).
32+
#recvBandwidth?: Bandwidth;
33+
2934
/**
3035
* Creates a new Subscriber instance.
3136
* @param quic - The WebTransport session to use
37+
* @param version - The protocol version
38+
* @param recvBandwidth - Optional bandwidth producer for PROBE (Lite03+ only)
3239
*
3340
* @internal
3441
*/
35-
constructor(quic: WebTransport, version: Version) {
42+
constructor(quic: WebTransport, version: Version, recvBandwidth?: Bandwidth) {
3643
this.#quic = quic;
3744
this.version = version;
45+
this.#recvBandwidth = recvBandwidth;
46+
47+
if (recvBandwidth && version === Version.DRAFT_03) {
48+
this.#runProbe();
49+
}
3850
}
3951

4052
/**
@@ -195,6 +207,38 @@ export class Subscriber {
195207
}
196208
}
197209

210+
/**
211+
* Opens a PROBE bidi stream to receive bandwidth estimates from the publisher.
212+
* Retries on error.
213+
*/
214+
#runProbe() {
215+
const run = async () => {
216+
for (;;) {
217+
try {
218+
const stream = await Stream.open(this.#quic);
219+
await stream.writer.u53(StreamId.Probe);
220+
221+
for (;;) {
222+
const probe = await Probe.decodeMaybe(stream.reader, this.version);
223+
if (!probe) break;
224+
this.#recvBandwidth?.set(probe.bitrate);
225+
}
226+
} catch (err: unknown) {
227+
const e = error(err);
228+
console.debug(`probe recv error: ${e.message}`);
229+
}
230+
231+
// Clear the bitrate on disconnect.
232+
this.#recvBandwidth?.set(undefined);
233+
234+
// Wait before retrying.
235+
await new Promise((resolve) => setTimeout(resolve, 1000));
236+
}
237+
};
238+
239+
void run();
240+
}
241+
198242
close() {
199243
for (const track of this.#subscribes.values()) {
200244
track.close();

js/publish/src/broadcast.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ export class Broadcast {
3535
preview: Preview;
3636
user: User.Info;
3737

38+
// Derived signal tracking the connection's send bandwidth estimate.
39+
#sendBandwidth = new Signal<number | undefined>(undefined);
40+
3841
signals = new Effect();
3942

4043
constructor(props?: BroadcastProps) {
@@ -43,13 +46,25 @@ export class Broadcast {
4346
this.name = Signal.from(props?.name ?? Moq.Path.empty());
4447

4548
this.audio = new Audio.Encoder(props?.audio);
46-
this.video = new Video.Root(props?.video);
49+
this.video = new Video.Root({ ...props?.video, sendBandwidth: this.#sendBandwidth });
4750
this.location = new Location.Root(props?.location);
4851
this.chat = new Chat.Root(props?.chat);
4952
this.preview = new Preview(props?.preview);
5053
this.user = new User.Info(props?.user);
5154

5255
this.signals.run(this.#run.bind(this));
56+
this.signals.run(this.#runSendBandwidth.bind(this));
57+
}
58+
59+
#runSendBandwidth(effect: Effect) {
60+
const connection = effect.get(this.connection);
61+
if (!connection?.sendBandwidth) {
62+
effect.set(this.#sendBandwidth, undefined);
63+
return;
64+
}
65+
66+
const estimate = effect.get(connection.sendBandwidth.signal);
67+
effect.set(this.#sendBandwidth, estimate);
5368
}
5469

5570
#run(effect: Effect) {

js/publish/src/video/encoder.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ export interface EncoderProps {
1010
enabled?: boolean | Signal<boolean>;
1111
config?: EncoderConfig | Signal<EncoderConfig | undefined>;
1212
container?: Catalog.Container;
13+
14+
// Optional: estimated send bandwidth in bps. When set and no explicit maxBitrate
15+
// is configured, the encoder will cap its bitrate to this value.
16+
sendBandwidth?: Getter<number | undefined>;
1317
}
1418

1519
// TODO support signals?
@@ -57,11 +61,15 @@ export class Encoder {
5761
// True when the encoder is actively serving a track.
5862
active = new Signal<boolean>(false);
5963

64+
// Optional: estimated send bandwidth for adaptive bitrate capping.
65+
sendBandwidth: Getter<number | undefined>;
66+
6067
constructor(frame: Getter<VideoFrame | undefined>, source: Signal<Source | undefined>, props?: EncoderProps) {
6168
this.frame = frame;
6269
this.source = source;
6370
this.enabled = Signal.from(props?.enabled ?? false);
6471
this.config = Signal.from(props?.config);
72+
this.sendBandwidth = props?.sendBandwidth ?? new Signal<number | undefined>(undefined);
6573

6674
this.#signals.run(this.#runCatalog.bind(this));
6775
this.#signals.run(this.#runConfig.bind(this));
@@ -203,6 +211,18 @@ export class Encoder {
203211

204212
bitrate = Math.round(Math.min(bitrate, user.maxBitrate || bitrate));
205213

214+
// If no explicit maxBitrate, cap to the estimated send bandwidth (with 90% safety margin).
215+
if (!user.maxBitrate) {
216+
const sendBw = effect.get(this.sendBandwidth);
217+
if (sendBw != null) {
218+
// Reserve ~10% for audio and protocol overhead.
219+
const cap = Math.round(sendBw * 0.9);
220+
// Don't go below 100kbps to keep encoding usable.
221+
const MIN_BITRATE = 100_000;
222+
bitrate = Math.max(MIN_BITRATE, Math.min(bitrate, cap));
223+
}
224+
}
225+
206226
const config: VideoEncoderConfig = {
207227
codec,
208228
width: dimensions.width,

js/publish/src/video/index.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import * as Catalog from "@moq/hang/catalog";
2-
import { Effect, Signal } from "@moq/signals";
2+
import { Effect, type Getter, Signal } from "@moq/signals";
33
import { Encoder, type EncoderProps } from "./encoder";
44
import { TrackProcessor } from "./polyfill";
55
import type { Source } from "./types";
@@ -12,6 +12,7 @@ export type Props = {
1212
hd?: EncoderProps;
1313
sd?: EncoderProps;
1414
flip?: boolean | Signal<boolean>;
15+
sendBandwidth?: Getter<number | undefined>;
1516
};
1617

1718
export class Root {
@@ -34,8 +35,10 @@ export class Root {
3435
constructor(props?: Props) {
3536
this.source = Signal.from(props?.source);
3637

37-
this.hd = new Encoder(this.frame, this.source, props?.hd);
38-
this.sd = new Encoder(this.frame, this.source, props?.sd);
38+
const hdProps = props?.sendBandwidth ? { ...props?.hd, sendBandwidth: props.sendBandwidth } : props?.hd;
39+
const sdProps = props?.sendBandwidth ? { ...props?.sd, sendBandwidth: props.sendBandwidth } : props?.sd;
40+
this.hd = new Encoder(this.frame, this.source, hdProps);
41+
this.sd = new Encoder(this.frame, this.source, sdProps);
3942

4043
this.flip = Signal.from(props?.flip ?? false);
4144

js/watch/src/video/source.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,25 @@ export class Source {
212212

213213
const target = effect.get(this.target);
214214

215+
// If no explicit bitrate target, use the recv bandwidth estimate from the connection.
216+
let effectiveTarget = target;
217+
if (!target?.bitrate && !target?.name) {
218+
const broadcast = effect.get(this.broadcast);
219+
const connection = broadcast ? effect.get(broadcast.connection) : undefined;
220+
const recvBw = connection?.recvBandwidth;
221+
if (recvBw) {
222+
const estimate = effect.get(recvBw.signal);
223+
if (estimate != null) {
224+
// Apply a safety margin (80%) to avoid oscillation.
225+
const safeBitrate = Math.round(estimate * 0.8);
226+
effectiveTarget = { ...target, bitrate: safeBitrate };
227+
}
228+
}
229+
}
230+
215231
// Manual selection by name
216-
const manual = target?.name;
217-
const selected = manual && manual in available ? manual : this.#select(available, target);
232+
const manual = effectiveTarget?.name;
233+
const selected = manual && manual in available ? manual : this.#select(available, effectiveTarget);
218234
if (!selected) return;
219235

220236
const config = available[selected];

0 commit comments

Comments
 (0)