Skip to content

Commit e74686b

Browse files
committed
Revise bitrate estimation API based on feedback
- BandwidthProducer: add close(err)/closed() for cancellation, rename get() to peek() - Move send bandwidth polling to Session (version-agnostic, depends on QUIC backend support not protocol version) - Remove send bandwidth from lite/ietf start() return values - Make recv bandwidth errors fatal (no retry loop) - Use forward-compatible version matching (list older versions explicitly) - JS: make send bandwidth version-agnostic (check getStats existence), use setInterval with closed promise instead of fire-and-forget - JS: make probe async and fatal, called from connection Promise.all - JS watch: handle target.name early before ABR, remove MIN_BITRATE - JS publish: read sendBandwidth from connection directly instead of piping through props - CLAUDE.md: add JS async patterns convention (Effect.interval/timer) https://claude.ai/code/session_01C68pEkoUhQqtQALYxRajhx
1 parent 490e53c commit e74686b

14 files changed

Lines changed: 220 additions & 226 deletions

File tree

CLAUDE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ match version {
111111
- **Formatting/Linting**: Biome for JS/TS formatting and linting
112112
- **UI**: Solid.js for Web Components in `@moq/watch/ui` and `@moq/publish/ui`
113113
- **Builds**: Nix flake for reproducible builds (optional)
114+
- **JS async patterns**: Use `Effect.interval()`, `Effect.timer()`, and `Effect.event()` helpers from `@moq/signals` instead of raw `setInterval`, `setTimeout`, `addEventListener`. These handle cleanup automatically when the Effect is closed.
114115

115116
## Testing Approach
116117

js/lite/src/lite/connection.ts

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { Announced } from "../announced.ts";
2-
import { type Bandwidth, createBandwidth } from "../bandwidth.ts";
2+
import type { Bandwidth } from "../bandwidth.ts";
33
import type { Broadcast } from "../broadcast.ts";
44
import type { Established } from "../connection/established.ts";
55
import * as Path from "../path.ts";
@@ -42,9 +42,6 @@ export class Connection implements Established {
4242
// Module for distributing tracks.
4343
#subscriber: Subscriber;
4444

45-
// Just to avoid logging when `close()` is called.
46-
#closed = false;
47-
4845
/** Estimated send bitrate from the congestion controller. */
4946
readonly sendBandwidth?: Bandwidth;
5047

@@ -66,10 +63,15 @@ export class Connection implements Established {
6663
this.version = versionName(version);
6764
this.#version = version;
6865

69-
// Set up bandwidth estimation for Lite03+.
70-
if (version === Version.DRAFT_03) {
71-
this.sendBandwidth = createBandwidth();
72-
this.recvBandwidth = createBandwidth();
66+
// Send bandwidth is version-agnostic: depends on browser/QUIC support.
67+
const hasGetStats = typeof (quic as unknown as { getStats?: unknown }).getStats === "function";
68+
if (hasGetStats) {
69+
this.sendBandwidth = new Bandwidth();
70+
}
71+
72+
// Recv bandwidth requires PROBE support (not available in older drafts).
73+
if (version !== Version.DRAFT_01 && version !== Version.DRAFT_02) {
74+
this.recvBandwidth = new Bandwidth();
7375
}
7476

7577
this.#publisher = new Publisher(this.#quic, this.#version);
@@ -82,9 +84,6 @@ export class Connection implements Established {
8284
* Closes the connection.
8385
*/
8486
close() {
85-
if (this.#closed) return;
86-
87-
this.#closed = true;
8887
this.#publisher.close();
8988
this.#subscriber.close();
9089

@@ -97,21 +96,20 @@ export class Connection implements Established {
9796
}
9897

9998
async #run(): Promise<void> {
100-
const session = this.#runSession();
101-
const bidis = this.#runBidis();
102-
const unis = this.#runUnis();
99+
const tasks: Promise<void>[] = [this.#runSession(), this.#runBidis(), this.#runUnis()];
103100

104-
// Start polling send bandwidth if supported.
105101
if (this.sendBandwidth) {
106-
this.#runSendBandwidth(this.sendBandwidth);
102+
tasks.push(this.#runSendBandwidth(this.sendBandwidth));
103+
}
104+
105+
if (this.recvBandwidth) {
106+
tasks.push(this.#subscriber.runProbe());
107107
}
108108

109109
try {
110-
await Promise.all([session, bidis, unis]);
110+
await Promise.all(tasks);
111111
} catch (err) {
112-
if (!this.#closed) {
113-
console.error("fatal error running connection", err);
114-
}
112+
console.error("fatal error running connection", err);
115113
} finally {
116114
this.close();
117115
}
@@ -233,31 +231,31 @@ export class Connection implements Established {
233231

234232
/**
235233
* Polls the QUIC congestion controller for estimated send rate.
234+
* Resolves when the connection is closed.
236235
*/
237-
#runSendBandwidth(bandwidth: Bandwidth) {
236+
async #runSendBandwidth(bandwidth: Bandwidth): Promise<void> {
238237
// getStats is not yet in the TypeScript WebTransport type definitions.
239238
const quic = this.#quic as unknown as {
240-
getStats?: () => Promise<{ estimatedSendRate: number | null }>;
239+
getStats: () => Promise<{ estimatedSendRate: number | null }>;
241240
};
242241

243-
const getStats = quic.getStats?.bind(quic);
244-
if (!getStats) return;
245-
246-
const run = async () => {
247-
while (!this.#closed) {
248-
await new Promise<void>((resolve) => setTimeout(resolve, SEND_BW_POLL_INTERVAL));
249-
if (this.#closed) break;
250-
242+
return new Promise<void>((resolve) => {
243+
const id = setInterval(async () => {
251244
try {
252-
const stats = await getStats();
245+
const stats = await quic.getStats();
253246
bandwidth.set(stats.estimatedSendRate ?? undefined);
254247
} catch {
255-
if (this.#closed) break;
248+
// Connection likely closed.
249+
clearInterval(id);
250+
resolve();
256251
}
257-
}
258-
};
252+
}, SEND_BW_POLL_INTERVAL);
259253

260-
void run();
254+
void this.closed.then(() => {
255+
clearInterval(id);
256+
resolve();
257+
});
258+
});
261259
}
262260

263261
/**

js/lite/src/lite/subscriber.ts

Lines changed: 15 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,14 @@ export class Subscriber {
3535
* Creates a new Subscriber instance.
3636
* @param quic - The WebTransport session to use
3737
* @param version - The protocol version
38-
* @param recvBandwidth - Optional bandwidth producer for PROBE (Lite03+ only)
38+
* @param recvBandwidth - Optional bandwidth producer for PROBE
3939
*
4040
* @internal
4141
*/
4242
constructor(quic: WebTransport, version: Version, recvBandwidth?: Bandwidth) {
4343
this.#quic = quic;
4444
this.version = version;
4545
this.#recvBandwidth = recvBandwidth;
46-
47-
if (recvBandwidth && version === Version.DRAFT_03) {
48-
this.#runProbe();
49-
}
5046
}
5147

5248
/**
@@ -211,36 +207,23 @@ export class Subscriber {
211207

212208
/**
213209
* Opens a PROBE bidi stream to receive bandwidth estimates from the publisher.
214-
* Retries on error.
210+
* Returns immediately if recv bandwidth is not supported.
211+
* Errors are fatal and propagate to the connection.
212+
*
213+
* @internal
215214
*/
216-
#runProbe() {
217-
const run = async () => {
218-
while (!this.#closed) {
219-
try {
220-
const stream = await Stream.open(this.#quic);
221-
await stream.writer.u53(StreamId.Probe);
222-
223-
for (;;) {
224-
const probe = await Probe.decodeMaybe(stream.reader, this.version);
225-
if (!probe) break;
226-
this.#recvBandwidth?.set(probe.bitrate);
227-
}
228-
} catch (err: unknown) {
229-
const e = error(err);
230-
console.debug(`probe recv error: ${e.message}`);
231-
}
215+
async runProbe(): Promise<void> {
216+
if (!this.#recvBandwidth) return;
217+
if (this.version === Version.DRAFT_01 || this.version === Version.DRAFT_02) return;
232218

233-
// Clear the bitrate on disconnect.
234-
this.#recvBandwidth?.set(undefined);
235-
236-
if (this.#closed) break;
237-
238-
// Wait before retrying.
239-
await new Promise((resolve) => setTimeout(resolve, 1000));
240-
}
241-
};
219+
const stream = await Stream.open(this.#quic);
220+
await stream.writer.u53(StreamId.Probe);
242221

243-
void run();
222+
for (;;) {
223+
const probe = await Probe.decodeMaybe(stream.reader, this.version);
224+
if (!probe) break;
225+
this.#recvBandwidth.set(probe.bitrate);
226+
}
244227
}
245228

246229
close() {

js/publish/src/broadcast.ts

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ export class Broadcast {
3535
preview: Preview;
3636
user: User.Info;
3737

38-
// Derived signal tracking the connection's send bandwidth estimate.
39-
#sendBandwidth = new Signal<number | undefined>(undefined);
40-
4138
signals = new Effect();
4239

4340
constructor(props?: BroadcastProps) {
@@ -46,25 +43,13 @@ export class Broadcast {
4643
this.name = Signal.from(props?.name ?? Moq.Path.empty());
4744

4845
this.audio = new Audio.Encoder(props?.audio);
49-
this.video = new Video.Root({ ...props?.video, sendBandwidth: this.#sendBandwidth });
46+
this.video = new Video.Root({ ...props?.video, connection: this.connection });
5047
this.location = new Location.Root(props?.location);
5148
this.chat = new Chat.Root(props?.chat);
5249
this.preview = new Preview(props?.preview);
5350
this.user = new User.Info(props?.user);
5451

5552
this.signals.run(this.#run.bind(this));
56-
this.signals.run(this.#runSendBandwidth.bind(this));
57-
}
58-
59-
#runSendBandwidth(effect: Effect) {
60-
const connection = effect.get(this.connection);
61-
if (!connection?.sendBandwidth) {
62-
effect.set(this.#sendBandwidth, undefined);
63-
return;
64-
}
65-
66-
const estimate = effect.get(connection.sendBandwidth);
67-
effect.set(this.#sendBandwidth, estimate);
6853
}
6954

7055
#run(effect: Effect) {

js/publish/src/video/encoder.ts

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,6 @@ export interface EncoderProps {
1010
enabled?: boolean | Signal<boolean>;
1111
config?: EncoderConfig | Signal<EncoderConfig | undefined>;
1212
container?: Catalog.Container;
13-
14-
// Optional: estimated send bandwidth in bps. When set and no explicit maxBitrate
15-
// is configured, the encoder will cap its bitrate to this value.
16-
sendBandwidth?: Getter<number | undefined>;
1713
}
1814

1915
// TODO support signals?
@@ -61,15 +57,20 @@ export class Encoder {
6157
// True when the encoder is actively serving a track.
6258
active = new Signal<boolean>(false);
6359

64-
// Optional: estimated send bandwidth for adaptive bitrate capping.
65-
sendBandwidth: Getter<number | undefined>;
60+
// Connection signal for reading send bandwidth.
61+
connection: Getter<Moq.Connection.Established | undefined>;
6662

67-
constructor(frame: Getter<VideoFrame | undefined>, source: Signal<Source | undefined>, props?: EncoderProps) {
63+
constructor(
64+
frame: Getter<VideoFrame | undefined>,
65+
source: Signal<Source | undefined>,
66+
connection: Getter<Moq.Connection.Established | undefined>,
67+
props?: EncoderProps,
68+
) {
6869
this.frame = frame;
6970
this.source = source;
71+
this.connection = connection;
7072
this.enabled = Signal.from(props?.enabled ?? false);
7173
this.config = Signal.from(props?.config);
72-
this.sendBandwidth = props?.sendBandwidth ?? new Signal<number | undefined>(undefined);
7374

7475
this.#signals.run(this.#runCatalog.bind(this));
7576
this.#signals.run(this.#runConfig.bind(this));
@@ -213,13 +214,15 @@ export class Encoder {
213214

214215
// If no explicit maxBitrate, cap to the estimated send bandwidth (with 90% safety margin).
215216
if (!user.maxBitrate) {
216-
const sendBw = effect.get(this.sendBandwidth);
217-
if (sendBw != null) {
218-
// Reserve ~10% for audio and protocol overhead.
219-
const cap = Math.round(sendBw * 0.9);
220-
// Don't go below 100kbps to keep encoding usable.
221-
const MIN_BITRATE = 100_000;
222-
bitrate = Math.max(MIN_BITRATE, Math.min(bitrate, cap));
217+
const conn = effect.get(this.connection);
218+
const sendBw = conn?.sendBandwidth;
219+
if (sendBw) {
220+
const estimate = effect.get(sendBw.signal);
221+
if (estimate != null) {
222+
// Reserve ~10% for audio and protocol overhead.
223+
const cap = Math.round(estimate * 0.9);
224+
bitrate = Math.min(bitrate, cap);
225+
}
223226
}
224227
}
225228

js/publish/src/video/index.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import * as Catalog from "@moq/hang/catalog";
2+
import type * as Moq from "@moq/lite";
23
import { Effect, type Getter, Signal } from "@moq/signals";
34
import { Encoder, type EncoderProps } from "./encoder";
45
import { TrackProcessor } from "./polyfill";
@@ -12,7 +13,7 @@ export type Props = {
1213
hd?: EncoderProps;
1314
sd?: EncoderProps;
1415
flip?: boolean | Signal<boolean>;
15-
sendBandwidth?: Getter<number | undefined>;
16+
connection?: Getter<Moq.Connection.Established | undefined>;
1617
};
1718

1819
export class Root {
@@ -35,10 +36,9 @@ export class Root {
3536
constructor(props?: Props) {
3637
this.source = Signal.from(props?.source);
3738

38-
const hdProps = props?.sendBandwidth ? { ...props?.hd, sendBandwidth: props.sendBandwidth } : props?.hd;
39-
const sdProps = props?.sendBandwidth ? { ...props?.sd, sendBandwidth: props.sendBandwidth } : props?.sd;
40-
this.hd = new Encoder(this.frame, this.source, hdProps);
41-
this.sd = new Encoder(this.frame, this.source, sdProps);
39+
const connection = props?.connection ?? new Signal<Moq.Connection.Established | undefined>(undefined);
40+
this.hd = new Encoder(this.frame, this.source, connection, props?.hd);
41+
this.sd = new Encoder(this.frame, this.source, connection, props?.sd);
4242

4343
this.flip = Signal.from(props?.flip ?? false);
4444

js/watch/src/video/source.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,18 @@ export class Source {
212212

213213
const target = effect.get(this.target);
214214

215-
// If no explicit bitrate target, use the recv bandwidth estimate from the connection.
215+
// Manual selection by name — skip all ABR logic.
216+
if (target?.name && target.name in available) {
217+
const config = available[target.name];
218+
effect.set(this.#track, target.name);
219+
effect.set(this.#config, config);
220+
effect.set(this.sync.video, config.jitter as Moq.Time.Milli | undefined);
221+
return;
222+
}
223+
224+
// Auto-select: use recv bandwidth if no explicit bitrate target.
216225
let effectiveTarget = target;
217-
if (!target?.bitrate && !target?.name) {
226+
if (!target?.bitrate) {
218227
const broadcast = effect.get(this.broadcast);
219228
const connection = broadcast ? effect.get(broadcast.connection) : undefined;
220229
const recvBw = connection?.recvBandwidth;
@@ -228,9 +237,7 @@ export class Source {
228237
}
229238
}
230239

231-
// Manual selection by name
232-
const manual = effectiveTarget?.name;
233-
const selected = manual && manual in available ? manual : this.#select(available, effectiveTarget);
240+
const selected = this.#select(available, effectiveTarget);
234241
if (!selected) return;
235242

236243
const config = available[selected];

0 commit comments

Comments
 (0)