Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/kind-foxes-wave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Add AbortSignal to waitFor() to clean up listeners on disconnect and send trailer on stream abort
29 changes: 26 additions & 3 deletions packages/livekit-rtc/src/ffi_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,37 @@ export class FfiClient extends (EventEmitter as new () => TypedEmitter<FfiClient
return livekitRetrievePtr(data);
}

async waitFor<T>(predicate: (ev: FfiEvent) => boolean): Promise<T> {
return new Promise<T>((resolve) => {
async waitFor<T>(
predicate: (ev: FfiEvent) => boolean,
options?: { signal?: AbortSignal },
): Promise<T> {
return new Promise<T>((resolve, reject) => {
const listener = (ev: FfiEvent) => {
if (predicate(ev)) {
this.off(FfiClientEvent.FfiEvent, listener);
cleanup();
resolve(ev.message.value as T);
}
};

const cleanup = () => {
this.off(FfiClientEvent.FfiEvent, listener);
options?.signal?.removeEventListener('abort', onAbort);
};

// If an AbortSignal is provided, remove the listener when the signal
// fires so that pending waitFor() calls don't leak listeners after
// the room disconnects or the operation is cancelled.
const onAbort = () => {
cleanup();
reject(options?.signal?.reason ?? new Error('waitFor aborted'));
};

if (options?.signal?.aborted) {
reject(options.signal.reason ?? new Error('waitFor aborted'));
return;
}

options?.signal?.addEventListener('abort', onAbort);
this.on(FfiClientEvent.FfiEvent, listener);
});
}
Expand Down
149 changes: 100 additions & 49 deletions packages/livekit-rtc/src/participant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,16 @@ export class LocalParticipant extends Participant {

private ffiEventLock: Mutex;

// Signal that fires when the owning Room disconnects, used to cancel
// pending FfiClient.waitFor() listeners so they don't leak.
private disconnectSignal: AbortSignal;

trackPublications: Map<string, LocalTrackPublication> = new Map();

constructor(info: OwnedParticipant, ffiEventLock: Mutex) {
constructor(info: OwnedParticipant, ffiEventLock: Mutex, disconnectSignal: AbortSignal) {
super(info);
this.ffiEventLock = ffiEventLock;
this.disconnectSignal = disconnectSignal;
}

async publishData(data: Uint8Array, options: DataPublishOptions) {
Expand All @@ -178,9 +183,10 @@ export class LocalParticipant extends Participant {
message: { case: 'publishData', value: req },
});

const cb = await FfiClient.instance.waitFor<PublishDataCallback>((ev) => {
return ev.message.case == 'publishData' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<PublishDataCallback>(
(ev) => ev.message.case == 'publishData' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw new Error(cb.error);
Expand All @@ -198,9 +204,10 @@ export class LocalParticipant extends Participant {
message: { case: 'publishSipDtmf', value: req },
});

const cb = await FfiClient.instance.waitFor<PublishSipDtmfCallback>((ev) => {
return ev.message.case == 'publishSipDtmf' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<PublishSipDtmfCallback>(
(ev) => ev.message.case == 'publishSipDtmf' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw new Error(cb.error);
Expand Down Expand Up @@ -229,9 +236,10 @@ export class LocalParticipant extends Participant {
message: { case: 'publishTranscription', value: req },
});

const cb = await FfiClient.instance.waitFor<PublishTranscriptionCallback>((ev) => {
return ev.message.case == 'publishTranscription' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<PublishTranscriptionCallback>(
(ev) => ev.message.case == 'publishTranscription' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw new Error(cb.error);
Expand All @@ -248,9 +256,10 @@ export class LocalParticipant extends Participant {
message: { case: 'setLocalMetadata', value: req },
});

await FfiClient.instance.waitFor<SetLocalMetadataCallback>((ev) => {
return ev.message.case == 'setLocalMetadata' && ev.message.value.asyncId == res.asyncId;
});
await FfiClient.instance.waitFor<SetLocalMetadataCallback>(
(ev) => ev.message.case == 'setLocalMetadata' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);
}

/**
Expand Down Expand Up @@ -335,8 +344,24 @@ export class LocalParticipant extends Participant {
});
await sendTrailer(trailerReq);
},
abort(err) {
// Send a trailer with the error reason so the remote side's stream
// controller is closed instead of waiting for data that won't arrive.
async abort(err) {
log.error(err, 'Sink Error');
try {
const trailerReq = new SendStreamTrailerRequest({
senderIdentity,
localParticipantHandle: localHandle,
destinationIdentities,
trailer: new DataStream_Trailer({
streamId,
reason: err instanceof Error ? err.message : String(err ?? ''),
}),
});
await sendTrailer(trailerReq);
} catch {
// Best-effort: the connection may already be gone.
}
},
});

Expand Down Expand Up @@ -450,8 +475,24 @@ export class LocalParticipant extends Participant {
});
await sendTrailer(trailerReq);
},
abort(err) {
// Send a trailer with the error reason so the remote side's stream
// controller is closed instead of waiting for data that won't arrive.
async abort(err) {
log.error(err, 'Sink error');
try {
const trailerReq = new SendStreamTrailerRequest({
senderIdentity,
localParticipantHandle: localHandle,
destinationIdentities,
trailer: new DataStream_Trailer({
streamId,
reason: err instanceof Error ? err.message : String(err ?? ''),
}),
});
await sendTrailer(trailerReq);
} catch {
// Best-effort: the connection may already be gone.
}
},
});

Expand Down Expand Up @@ -494,44 +535,47 @@ export class LocalParticipant extends Participant {
message: { case: type, value: req },
});

const cb = await FfiClient.instance.waitFor<SendStreamHeaderCallback>((ev) => {
return ev.message.case == type && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<SendStreamHeaderCallback>(
(ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw new Error(cb.error);
}
}

private async sendStreamChunk(req: SendStreamChunkRequest) {
private sendStreamChunk = async (req: SendStreamChunkRequest) => {
const type = 'sendStreamChunk';
const res = FfiClient.instance.request<SendStreamChunkResponse>({
message: { case: type, value: req },
});

const cb = await FfiClient.instance.waitFor<SendStreamChunkCallback>((ev) => {
return ev.message.case == type && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<SendStreamChunkCallback>(
(ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw new Error(cb.error);
}
}
};

private async sendStreamTrailer(req: SendStreamTrailerRequest) {
private sendStreamTrailer = async (req: SendStreamTrailerRequest) => {
const type = 'sendStreamTrailer';
const res = FfiClient.instance.request<SendStreamTrailerResponse>({
message: { case: type, value: req },
});

const cb = await FfiClient.instance.waitFor<SendStreamTrailerCallback>((ev) => {
return ev.message.case == type && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<SendStreamTrailerCallback>(
(ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw new Error(cb.error);
}
}
};

/**
* Sends a chat message to participants in the room
Expand All @@ -557,9 +601,10 @@ export class LocalParticipant extends Participant {
message: { case: 'sendChatMessage', value: req },
});

const cb = await FfiClient.instance.waitFor<SendChatMessageCallback>((ev) => {
return ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<SendChatMessageCallback>(
(ev) => ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

switch (cb.message.case) {
case 'chatMessage':
Expand Down Expand Up @@ -603,9 +648,10 @@ export class LocalParticipant extends Participant {
message: { case: 'editChatMessage', value: req },
});

const cb = await FfiClient.instance.waitFor<SendChatMessageCallback>((ev) => {
return ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<SendChatMessageCallback>(
(ev) => ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

switch (cb.message.case) {
case 'chatMessage':
Expand All @@ -632,9 +678,10 @@ export class LocalParticipant extends Participant {
message: { case: 'setLocalName', value: req },
});

await FfiClient.instance.waitFor<SetLocalNameCallback>((ev) => {
return ev.message.case == 'setLocalName' && ev.message.value.asyncId == res.asyncId;
});
await FfiClient.instance.waitFor<SetLocalNameCallback>(
(ev) => ev.message.case == 'setLocalName' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);
}

async setAttributes(attributes: Record<string, string>) {
Expand All @@ -647,9 +694,10 @@ export class LocalParticipant extends Participant {
message: { case: 'setLocalAttributes', value: req },
});

await FfiClient.instance.waitFor<SetLocalAttributesCallback>((ev) => {
return ev.message.case == 'setLocalAttributes' && ev.message.value.asyncId == res.asyncId;
});
await FfiClient.instance.waitFor<SetLocalAttributesCallback>(
(ev) => ev.message.case == 'setLocalAttributes' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);
}

async publishTrack(
Expand All @@ -669,9 +717,10 @@ export class LocalParticipant extends Participant {
});

try {
const cb = await FfiClient.instance.waitFor<PublishTrackCallback>((ev) => {
return ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<PublishTrackCallback>(
(ev) => ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

switch (cb.message.case) {
case 'publication':
Expand Down Expand Up @@ -702,9 +751,10 @@ export class LocalParticipant extends Participant {
message: { case: 'unpublishTrack', value: req },
});

const cb = await FfiClient.instance.waitFor<UnpublishTrackCallback>((ev) => {
return ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<UnpublishTrackCallback>(
(ev) => ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw new Error(cb.error);
Expand Down Expand Up @@ -744,9 +794,10 @@ export class LocalParticipant extends Participant {
message: { case: 'performRpc', value: req },
});

const cb = await FfiClient.instance.waitFor<PerformRpcCallback>((ev) => {
return ev.message.case === 'performRpc' && ev.message.value.asyncId === res.asyncId;
});
const cb = await FfiClient.instance.waitFor<PerformRpcCallback>(
(ev) => ev.message.case === 'performRpc' && ev.message.value.asyncId === res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw RpcError.fromProto(cb.error);
Expand Down
16 changes: 16 additions & 0 deletions packages/livekit-rtc/src/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>

private preConnectEvents: FfiEvent[] = [];

// Aborted on disconnect to cancel any pending FfiClient.waitFor() listeners,
// preventing them from leaking when the room goes away.
private disconnectController = new AbortController();

private _token?: string;
private _serverUrl?: string;

Expand Down Expand Up @@ -241,9 +245,13 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
this._serverUrl = url;
this.info = cb.message.value.room!.info;
this.connectionState = ConnectionState.CONN_CONNECTED;
// Reset the abort controller for this connection session so that
// a previous disconnect doesn't immediately cancel new operations.
this.disconnectController = new AbortController();
this.localParticipant = new LocalParticipant(
cb.message.value.localParticipant!,
this.ffiEventLock,
this.disconnectController.signal,
);

for (const pt of cb.message.value.participants) {
Expand Down Expand Up @@ -283,6 +291,11 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
return ev.message.case == 'disconnect' && ev.message.value.asyncId == res.asyncId;
});

// Abort all pending FfiClient.waitFor() listeners so they don't leak.
// This causes any in-flight operations (publishData, publishTrack, etc.)
// to reject and clean up their event listeners.
this.disconnectController.abort();

FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent);
this.removeAllListeners();
}
Expand Down Expand Up @@ -599,6 +612,9 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
/*} else if (ev.case == 'connected') {
this.emit(RoomEvent.Connected);*/
} else if (ev.case == 'disconnected') {
// Abort pending waitFor() listeners on server-initiated disconnect too,
// not just on explicit disconnect() calls.
this.disconnectController.abort();
this.emit(RoomEvent.Disconnected, ev.value.reason!);
} else if (ev.case == 'reconnecting') {
this.emit(RoomEvent.Reconnecting);
Expand Down
Loading