Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/long-keys-roll.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Cancel losing timeout in AudioMixer race to prevent orphaned timers
102 changes: 102 additions & 0 deletions packages/livekit-rtc/src/audio_mixer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,106 @@ describe('AudioMixer', () => {
// Should get at least 2 frames (stream exhausts after 2)
expect(frames.length).toBeGreaterThanOrEqual(2);
});

it('completes mixing without lingering timers when iterator is fast', async () => {
const sampleRate = 48000;
const numChannels = 1;
const samplesPerChannel = 480;
const mixer = new AudioMixer(sampleRate, numChannels, {
blocksize: samplesPerChannel,
// Long timeout so the iterator always wins the race.
// Before the fix, each iteration leaked a 5s timer; with the fix,
// cancel() clears it immediately so the mixer shuts down without delay.
streamTimeoutMs: 5000,
});

const stream = createMockAudioStream(3, sampleRate, numChannels, samplesPerChannel, 42);
mixer.addStream(stream);

const frames: AudioFrame[] = [];
for await (const frame of mixer) {
frames.push(frame);
if (frames.length >= 2) break;
}

await mixer.aclose();

expect(frames.length).toBe(2);
// Verify the frames contain the expected mixed value
for (const frame of frames) {
expect(frame.data[0]).toBe(42);
}
});

it('produces frames even with many race iterations', async () => {
const sampleRate = 48000;
const numChannels = 1;
const samplesPerChannel = 480;
const mixer = new AudioMixer(sampleRate, numChannels, {
blocksize: samplesPerChannel,
streamTimeoutMs: 5000,
});

// Use more frames to stress multiple race iterations
const stream = createMockAudioStream(6, sampleRate, numChannels, samplesPerChannel, 10);
mixer.addStream(stream);

const frames: AudioFrame[] = [];
for await (const frame of mixer) {
frames.push(frame);
if (frames.length >= 4) break;
}

await mixer.aclose();

expect(frames.length).toBe(4);
// All frames should contain the expected value
for (const frame of frames) {
expect(frame.data[0]).toBe(10);
}
});

it('handles slow streams via timeout path', async () => {
const sampleRate = 48000;
const numChannels = 1;
const samplesPerChannel = 480;
const mixer = new AudioMixer(sampleRate, numChannels, {
blocksize: samplesPerChannel,
// Very short timeout to trigger the timeout path
streamTimeoutMs: 1,
});

// Create a stream that is slower than the timeout
async function* slowStream(): AsyncGenerator<AudioFrame> {
await new Promise((resolve) => setTimeout(resolve, 200));
const data = new Int16Array(numChannels * samplesPerChannel).fill(500);
yield new AudioFrame(data, sampleRate, numChannels, samplesPerChannel);
}

// Suppress the expected console.warn from the timeout path
const originalWarn = console.warn;
const warnings: string[] = [];
console.warn = (...args: unknown[]) => {
warnings.push(args.map(String).join(' '));
};

try {
mixer.addStream(slowStream());

// The mixer should produce a frame (zero-padded due to timeout)
// and auto-close when the stream exhausts.
const frames: AudioFrame[] = [];
for await (const frame of mixer) {
frames.push(frame);
if (frames.length >= 1) break;
}

await mixer.aclose();

// The timeout warning should have been logged
expect(warnings.some((w) => w.includes('stream timeout after'))).toBe(true);
} finally {
console.warn = originalWarn;
}
});
});
33 changes: 27 additions & 6 deletions packages/livekit-rtc/src/audio_mixer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,20 +310,26 @@ export class AudioMixer {

// Accumulate data until we have at least chunkSize samples
while (buf.length < this.chunkSize * this.numChannels && !exhausted && !this.closed) {
const { result, clearTimeout: cancel } = this.timeoutRace(
iterator.next(),
this.streamTimeoutMs,
);
try {
const result = await Promise.race([iterator.next(), this.timeout(this.streamTimeoutMs)]);
const value = await result;
cancel();

if (result === 'timeout') {
if (value === 'timeout') {
console.warn(`AudioMixer: stream timeout after ${this.streamTimeoutMs}ms`);
break;
}
const iterResult = value;

if (result.done) {
if (iterResult.done) {
exhausted = true;
break;
}

const frame = result.value;
const frame = iterResult.value;
const newData = frame.data;

// Mark that we received data in this call
Expand All @@ -339,6 +345,9 @@ export class AudioMixer {
buf = combined;
}
} catch (error) {
// Clear the timeout on the error path too, so it doesn't linger
// when iterator.next() rejects.
cancel();
console.error(`AudioMixer: Error reading from stream:`, error);
exhausted = true;
break;
Expand Down Expand Up @@ -412,7 +421,19 @@ export class AudioMixer {
return new Promise((resolve) => setTimeout(resolve, ms));
}

private timeout(ms: number): Promise<'timeout'> {
return new Promise((resolve) => setTimeout(() => resolve('timeout'), ms));
/** Race a promise against a timeout, returning a handle to clear the timer
* so the losing setTimeout doesn't linger after the winner resolves. */
private timeoutRace<T>(
promise: Promise<T>,
ms: number,
): { result: Promise<T | 'timeout'>; clearTimeout: () => void } {
let timer: ReturnType<typeof setTimeout>;
const timeoutPromise = new Promise<'timeout'>((resolve) => {
timer = setTimeout(() => resolve('timeout'), ms);
});
return {
result: Promise.race([promise, timeoutPromise]),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be a bit leaner if we simply did

return Promise.race([promise.finally(clearTimeout), timeoutPromise])

and all callees of this util would get the cleanup out of the box

clearTimeout: () => clearTimeout(timer),
};
}
}
Loading