From 93ce89c1b9eb6595be3112876e3dd4ec7232db2c Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Wed, 8 Apr 2026 09:50:23 +0900 Subject: [PATCH 1/8] Initial implementation --- packages/livekit-rtc/src/data_tracks/index.ts | 15 +++ packages/livekit-rtc/src/data_tracks/local.ts | 81 ++++++++++++ .../livekit-rtc/src/data_tracks/remote.ts | 118 ++++++++++++++++++ packages/livekit-rtc/src/data_tracks/types.ts | 36 ++++++ packages/livekit-rtc/src/index.ts | 12 ++ packages/livekit-rtc/src/participant.ts | 44 +++++++ packages/livekit-rtc/src/room.ts | 10 ++ 7 files changed, 316 insertions(+) create mode 100644 packages/livekit-rtc/src/data_tracks/index.ts create mode 100644 packages/livekit-rtc/src/data_tracks/local.ts create mode 100644 packages/livekit-rtc/src/data_tracks/remote.ts create mode 100644 packages/livekit-rtc/src/data_tracks/types.ts diff --git a/packages/livekit-rtc/src/data_tracks/index.ts b/packages/livekit-rtc/src/data_tracks/index.ts new file mode 100644 index 00000000..916a8850 --- /dev/null +++ b/packages/livekit-rtc/src/data_tracks/index.ts @@ -0,0 +1,15 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +export { + type DataTrackFrame, + type DataTrackInfo, + type DataTrackOptions, + type DataTrackSubscribeOptions, + PublishDataTrackError, + DataTrackPushFrameError, +} from './types.js'; + +export { LocalDataTrack } from './local.js'; +export { RemoteDataTrack } from './remote.js'; diff --git a/packages/livekit-rtc/src/data_tracks/local.ts b/packages/livekit-rtc/src/data_tracks/local.ts new file mode 100644 index 00000000..83131559 --- /dev/null +++ b/packages/livekit-rtc/src/data_tracks/local.ts @@ -0,0 +1,81 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +import type { + LocalDataTrackIsPublishedResponse, + LocalDataTrackTryPushResponse, + OwnedLocalDataTrack, +} from '@livekit/rtc-ffi-bindings'; +import { + DataTrackFrame as ProtoDataTrackFrame, + LocalDataTrackIsPublishedRequest, + LocalDataTrackTryPushRequest, + LocalDataTrackUnpublishRequest, +} from '@livekit/rtc-ffi-bindings'; +import { FfiClient, FfiHandle } from '../ffi_client.js'; +import type { DataTrackFrame, DataTrackInfo } from './types.js'; +import { DataTrackPushFrameError } from './types.js'; + +export class LocalDataTrack { + private _info: DataTrackInfo; + private ffiHandle: FfiHandle; + + /** @internal */ + constructor(ownedTrack: OwnedLocalDataTrack) { + this._info = { + sid: ownedTrack.info!.sid!, + name: ownedTrack.info!.name!, + usesE2ee: ownedTrack.info!.usesE2ee!, + }; + this.ffiHandle = new FfiHandle(ownedTrack.handle!.id!); + } + + get info(): DataTrackInfo { + return this._info; + } + + isPublished(): boolean { + const res = FfiClient.instance.request({ + message: { + case: 'localDataTrackIsPublished', + value: new LocalDataTrackIsPublishedRequest({ + trackHandle: this.ffiHandle.handle, + }), + }, + }); + return res.isPublished!; + } + + tryPush(frame: DataTrackFrame): void { + const protoFrame = new ProtoDataTrackFrame({ + payload: frame.payload, + userTimestamp: frame.userTimestamp, + }); + + const res = FfiClient.instance.request({ + message: { + case: 'localDataTrackTryPush', + value: new LocalDataTrackTryPushRequest({ + trackHandle: this.ffiHandle.handle, + frame: protoFrame, + }), + }, + }); + + if (res.error) { + throw new DataTrackPushFrameError(res.error.message!); + } + } + + async unpublish(): Promise { + FfiClient.instance.request({ + message: { + case: 'localDataTrackUnpublish', + value: new LocalDataTrackUnpublishRequest({ + trackHandle: this.ffiHandle.handle, + }), + }, + }); + } +} diff --git a/packages/livekit-rtc/src/data_tracks/remote.ts b/packages/livekit-rtc/src/data_tracks/remote.ts new file mode 100644 index 00000000..f7e00770 --- /dev/null +++ b/packages/livekit-rtc/src/data_tracks/remote.ts @@ -0,0 +1,118 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +import type { + DataTrackStreamEvent, + OwnedDataTrackStream, + OwnedRemoteDataTrack, + SubscribeDataTrackResponse, +} from '@livekit/rtc-ffi-bindings'; +import { + DataTrackStreamReadRequest, + DataTrackSubscribeOptions as ProtoDataTrackSubscribeOptions, + SubscribeDataTrackRequest, +} from '@livekit/rtc-ffi-bindings'; +import type { UnderlyingSource } from 'node:stream/web'; +import { FfiClient, FfiHandle } from '../ffi_client.js'; +import type { DataTrackFrame, DataTrackInfo, DataTrackSubscribeOptions } from './types.js'; + +export class RemoteDataTrack { + info: DataTrackInfo; + publisherIdentity: string; + private ffiHandle: FfiHandle; + + /** @internal */ + constructor(ownedTrack: OwnedRemoteDataTrack) { + this.info = { + sid: ownedTrack.info!.sid!, + name: ownedTrack.info!.name!, + usesE2ee: ownedTrack.info!.usesE2ee!, + }; + this.publisherIdentity = ownedTrack.publisherIdentity!; + this.ffiHandle = new FfiHandle(ownedTrack.handle!.id!); + } + + subscribe(options?: DataTrackSubscribeOptions): ReadableStream { + const opts = new ProtoDataTrackSubscribeOptions({ + bufferSize: options?.bufferSize, + }); + + const res = FfiClient.instance.request({ + message: { + case: 'subscribeDataTrack', + value: new SubscribeDataTrackRequest({ + trackHandle: this.ffiHandle.handle, + options: opts, + }), + }, + }); + + return new ReadableStream( + new DataTrackStreamSource(res.stream!), + options?.bufferSize != null + ? new CountQueuingStrategy({ highWaterMark: options.bufferSize }) + : undefined, + ); + } +} + +class DataTrackStreamSource implements UnderlyingSource { + private ffiHandle: FfiHandle; + private streamHandle: bigint; + private disposed = false; + + constructor(ownedStream: OwnedDataTrackStream) { + this.ffiHandle = new FfiHandle(ownedStream.handle!.id!); + this.streamHandle = ownedStream.handle!.id!; + } + + async pull(controller: ReadableStreamDefaultController): Promise { + FfiClient.instance.request({ + message: { + case: 'dataTrackStreamRead', + value: new DataTrackStreamReadRequest({ + streamHandle: this.streamHandle, + }), + }, + }); + + const event = await FfiClient.instance.waitFor((ev) => { + return ( + ev.message.case === 'dataTrackStreamEvent' && + ev.message.value.streamHandle === this.streamHandle + ); + }); + + switch (event.detail.case) { + case 'frameReceived': { + const protoFrame = event.detail.value.frame!; + controller.enqueue({ + payload: protoFrame.payload!, + userTimestamp: protoFrame.userTimestamp, + }); + break; + } + case 'eos': { + this.dispose(); + if (event.detail.value.error) { + controller.error(new Error(event.detail.value.error)); + } else { + controller.close(); + } + break; + } + } + } + + cancel(): void { + this.dispose(); + } + + private dispose(): void { + if (!this.disposed) { + this.disposed = true; + this.ffiHandle.dispose(); + } + } +} diff --git a/packages/livekit-rtc/src/data_tracks/types.ts b/packages/livekit-rtc/src/data_tracks/types.ts new file mode 100644 index 00000000..c1a067af --- /dev/null +++ b/packages/livekit-rtc/src/data_tracks/types.ts @@ -0,0 +1,36 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +export type DataTrackFrame = { + payload: Uint8Array; + userTimestamp?: bigint; +}; + +export type DataTrackInfo = { + sid: string; + name: string; + usesE2ee: boolean; +}; + +export type DataTrackOptions = { + name: string; +}; + +export type DataTrackSubscribeOptions = { + bufferSize?: number; +}; + +export class PublishDataTrackError extends Error { + constructor(message: string) { + super(message); + this.name = 'PublishDataTrackError'; + } +} + +export class DataTrackPushFrameError extends Error { + constructor(message: string) { + super(message); + this.name = 'DataTrackPushFrameError'; + } +} diff --git a/packages/livekit-rtc/src/index.ts b/packages/livekit-rtc/src/index.ts index 6cc9652f..f30415dc 100644 --- a/packages/livekit-rtc/src/index.ts +++ b/packages/livekit-rtc/src/index.ts @@ -10,6 +10,18 @@ export type { NoiseCancellationOptions } from './audio_stream.js'; export { AudioFilter } from './audio_filter.js'; export { AudioMixer, type AudioMixerOptions } from './audio_mixer.js'; export * from './data_streams/index.js'; +export { + LocalDataTrack, + RemoteDataTrack, + PublishDataTrackError, + DataTrackPushFrameError, +} from './data_tracks/index.js'; +export type { + DataTrackFrame, + DataTrackInfo, + DataTrackOptions, + DataTrackSubscribeOptions, +} from './data_tracks/index.js'; export { E2EEManager, FrameCryptor, KeyProvider } from './e2ee.js'; export type { E2EEOptions, KeyProviderOptions } from './e2ee.js'; export { dispose } from './ffi_client.js'; diff --git a/packages/livekit-rtc/src/participant.ts b/packages/livekit-rtc/src/participant.ts index fcff2e81..48f4fe99 100644 --- a/packages/livekit-rtc/src/participant.ts +++ b/packages/livekit-rtc/src/participant.ts @@ -78,6 +78,17 @@ import { type TextStreamInfo, TextStreamWriter, } from './data_streams/index.js'; +import type { PublishDataTrackCallback, PublishDataTrackResponse } from '@livekit/rtc-ffi-bindings'; +import { + DataTrackOptions as ProtoDataTrackOptions, + PublishDataTrackRequest, +} from '@livekit/rtc-ffi-bindings'; +import { + type DataTrackOptions, + type LocalDataTrack, + PublishDataTrackError, +} from './data_tracks/index.js'; +import { LocalDataTrack as LocalDataTrackImpl } from './data_tracks/index.js'; import { FfiClient, FfiHandle } from './ffi_client.js'; import { log } from './log.js'; import { type PerformRpcParams, RpcError, type RpcInvocationData } from './rpc.js'; @@ -720,6 +731,39 @@ export class LocalParticipant extends Participant { } } + /** + * Publishes a data track. + * + * @returns The published data track. Use {@link LocalDataTrack.tryPush} to send data frames. + * @throws {@link PublishDataTrackError} if there is an error publishing the data track. + */ + async publishDataTrack(options: DataTrackOptions): Promise { + const protoOpts = new ProtoDataTrackOptions({ name: options.name }); + + const res = FfiClient.instance.request({ + message: { + case: 'publishDataTrack', + value: new PublishDataTrackRequest({ + localParticipantHandle: this.ffi_handle.handle, + options: protoOpts, + }), + }, + }); + + const cb = await FfiClient.instance.waitFor((ev) => { + return ev.message.case === 'publishDataTrack' && ev.message.value.asyncId === res.asyncId; + }); + + switch (cb.result.case) { + case 'track': + return new LocalDataTrackImpl(cb.result.value); + case 'error': + throw new PublishDataTrackError(cb.result.value.message!); + default: + throw new PublishDataTrackError('Unknown error publishing data track'); + } + } + /** * Initiate an RPC call to a remote participant. * @param params - Parameters for initiating the RPC call, see {@link PerformRpcParams} diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 6ef6bf63..cd010238 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -43,6 +43,7 @@ import type { LocalTrack, RemoteTrack } from './track.js'; import { RemoteAudioTrack, RemoteVideoTrack } from './track.js'; import type { LocalTrackPublication, TrackPublication } from './track_publication.js'; import { RemoteTrackPublication } from './track_publication.js'; +import { RemoteDataTrack } from './data_tracks/index.js'; import type { ChatMessage } from './types.js'; import { bigIntToNumber } from './utils.js'; @@ -639,6 +640,11 @@ export class Room extends (EventEmitter as new () => TypedEmitter } else if (ev.case === 'tokenRefreshed') { this._token = ev.value.token; this.emit('tokenRefreshed'); + } else if (ev.case === 'dataTrackPublished') { + const remoteDataTrack = new RemoteDataTrack(ev.value.track!); + this.emit(RoomEvent.DataTrackPublished, remoteDataTrack); + } else if (ev.case === 'dataTrackUnpublished') { + this.emit(RoomEvent.DataTrackUnpublished, ev.value.sid!); } }; @@ -861,6 +867,8 @@ export type RoomCallbacks = { roomUpdated: () => void; moved: () => void; tokenRefreshed: () => void; + dataTrackPublished: (track: RemoteDataTrack) => void; + dataTrackUnpublished: (sid: string) => void; }; export enum RoomEvent { @@ -896,4 +904,6 @@ export enum RoomEvent { RoomUpdated = 'roomUpdated', Moved = 'moved', TokenRefreshed = 'tokenRefreshed', + DataTrackPublished = 'dataTrackPublished', + DataTrackUnpublished = 'dataTrackUnpublished', } From c53c03a4858493aa66189629d4f2d07a3fee3b18 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Wed, 8 Apr 2026 09:55:55 +0900 Subject: [PATCH 2/8] Doc comments --- packages/livekit-rtc/src/data_tracks/local.ts | 19 +++++++++++++ .../livekit-rtc/src/data_tracks/remote.ts | 16 +++++++++++ packages/livekit-rtc/src/data_tracks/types.ts | 27 +++++++++++++++++++ 3 files changed, 62 insertions(+) diff --git a/packages/livekit-rtc/src/data_tracks/local.ts b/packages/livekit-rtc/src/data_tracks/local.ts index 83131559..a6e34214 100644 --- a/packages/livekit-rtc/src/data_tracks/local.ts +++ b/packages/livekit-rtc/src/data_tracks/local.ts @@ -17,6 +17,7 @@ import { FfiClient, FfiHandle } from '../ffi_client.js'; import type { DataTrackFrame, DataTrackInfo } from './types.js'; import { DataTrackPushFrameError } from './types.js'; +/** Data track published by the local participant. */ export class LocalDataTrack { private _info: DataTrackInfo; private ffiHandle: FfiHandle; @@ -31,10 +32,12 @@ export class LocalDataTrack { this.ffiHandle = new FfiHandle(ownedTrack.handle!.id!); } + /** Information about the data track. */ get info(): DataTrackInfo { return this._info; } + /** Whether or not the track is still published. */ isPublished(): boolean { const res = FfiClient.instance.request({ message: { @@ -47,6 +50,18 @@ export class LocalDataTrack { return res.isPublished!; } + /** + * Try pushing a frame to subscribers of the track. + * + * See {@link DataTrackFrame} for how to construct a frame and attach metadata. + * + * Pushing a frame can fail for several reasons: + * + * - The track has been unpublished by the local participant or SFU + * - The room is no longer connected + * + * @throws {@link DataTrackPushFrameError} If the push fails. + */ tryPush(frame: DataTrackFrame): void { const protoFrame = new ProtoDataTrackFrame({ payload: frame.payload, @@ -68,6 +83,10 @@ export class LocalDataTrack { } } + /** + * Unpublish the track from the SFU. Once this is called, any further calls to + * {@link tryPush} will fail. + */ async unpublish(): Promise { FfiClient.instance.request({ message: { diff --git a/packages/livekit-rtc/src/data_tracks/remote.ts b/packages/livekit-rtc/src/data_tracks/remote.ts index f7e00770..68b34b4a 100644 --- a/packages/livekit-rtc/src/data_tracks/remote.ts +++ b/packages/livekit-rtc/src/data_tracks/remote.ts @@ -17,8 +17,11 @@ import type { UnderlyingSource } from 'node:stream/web'; import { FfiClient, FfiHandle } from '../ffi_client.js'; import type { DataTrackFrame, DataTrackInfo, DataTrackSubscribeOptions } from './types.js'; +/** Data track published by a remote participant. */ export class RemoteDataTrack { + /** Information about the data track. */ info: DataTrackInfo; + /** Identity of the participant who published the track. */ publisherIdentity: string; private ffiHandle: FfiHandle; @@ -33,6 +36,19 @@ export class RemoteDataTrack { this.ffiHandle = new FfiHandle(ownedTrack.handle!.id!); } + /** + * Subscribes to the data track to receive frames. + * + * Returns a `ReadableStream` that yields {@link DataTrackFrame}s as they arrive. + * + * An application may call `subscribe` more than once to process frames in multiple places. + * Internally, only the first call communicates with the SFU and allocates the resources + * required to receive frames. Additional subscriptions reuse the same underlying pipeline + * and do not trigger additional signaling. + * + * Note that newly created subscriptions only receive frames published after the initial + * subscription is established. + */ subscribe(options?: DataTrackSubscribeOptions): ReadableStream { const opts = new ProtoDataTrackSubscribeOptions({ bufferSize: options?.bufferSize, diff --git a/packages/livekit-rtc/src/data_tracks/types.ts b/packages/livekit-rtc/src/data_tracks/types.ts index c1a067af..c47aa036 100644 --- a/packages/livekit-rtc/src/data_tracks/types.ts +++ b/packages/livekit-rtc/src/data_tracks/types.ts @@ -2,25 +2,43 @@ // // SPDX-License-Identifier: Apache-2.0 +/** A frame published on a data track, consisting of a payload and optional metadata. */ export type DataTrackFrame = { + /** The frame's payload. */ payload: Uint8Array; + /** The frame's user timestamp, if one is associated. */ userTimestamp?: bigint; }; +/** Information about a published data track. */ export type DataTrackInfo = { + /** + * Unique track identifier assigned by the SFU. + * + * This identifier may change if a reconnect occurs. Use {@link DataTrackInfo.name | name} + * if a stable identifier is needed. + */ sid: string; + /** Name of the track assigned by the publisher. */ name: string; + /** Whether or not frames sent on the track use end-to-end encryption. */ usesE2ee: boolean; }; +/** Options for publishing a data track. */ export type DataTrackOptions = { name: string; }; export type DataTrackSubscribeOptions = { + /** + * The number of {@link DataTrackFrame}s to hold in the ReadableStream before discarding + * extra frames. When omitted, the default buffer size is used. + */ bufferSize?: number; }; +/** An error that can occur when publishing a data track. */ export class PublishDataTrackError extends Error { constructor(message: string) { super(message); @@ -28,6 +46,15 @@ export class PublishDataTrackError extends Error { } } +/** + * Frame could not be pushed to a data track. + * + * Pushing a frame can fail for several reasons: + * + * - The track has been unpublished by the local participant or SFU + * - The room is no longer connected + * - Frames are being pushed too fast + */ export class DataTrackPushFrameError extends Error { constructor(message: string) { super(message); From 643c96e77dc9b9cb6b47565f860ebd4d6e4dba26 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Wed, 8 Apr 2026 09:59:06 +0900 Subject: [PATCH 3/8] No additional buffering on JS side --- packages/livekit-rtc/src/data_tracks/remote.ts | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/packages/livekit-rtc/src/data_tracks/remote.ts b/packages/livekit-rtc/src/data_tracks/remote.ts index 68b34b4a..2ac0e2bc 100644 --- a/packages/livekit-rtc/src/data_tracks/remote.ts +++ b/packages/livekit-rtc/src/data_tracks/remote.ts @@ -64,12 +64,9 @@ export class RemoteDataTrack { }, }); - return new ReadableStream( - new DataTrackStreamSource(res.stream!), - options?.bufferSize != null - ? new CountQueuingStrategy({ highWaterMark: options.bufferSize }) - : undefined, - ); + return new ReadableStream(new DataTrackStreamSource(res.stream!), { + highWaterMark: 0, // Buffer owned by Rust + }); } } From 198241bd5aedfef122f2a9b816e91d5f0f77626c Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Wed, 8 Apr 2026 10:14:31 +0900 Subject: [PATCH 4/8] Port over example --- examples/data-tracks/.env.example | 6 +++ examples/data-tracks/README.md | 45 +++++++++++++++++ examples/data-tracks/package.json | 23 +++++++++ examples/data-tracks/publisher.ts | 70 +++++++++++++++++++++++++++ examples/data-tracks/subscriber.ts | 57 ++++++++++++++++++++++ pnpm-lock.yaml | 77 ++++++++++++++++++++++++++---- 6 files changed, 270 insertions(+), 8 deletions(-) create mode 100644 examples/data-tracks/.env.example create mode 100644 examples/data-tracks/README.md create mode 100644 examples/data-tracks/package.json create mode 100644 examples/data-tracks/publisher.ts create mode 100644 examples/data-tracks/subscriber.ts diff --git a/examples/data-tracks/.env.example b/examples/data-tracks/.env.example new file mode 100644 index 00000000..99816257 --- /dev/null +++ b/examples/data-tracks/.env.example @@ -0,0 +1,6 @@ +# 1. Copy this file and rename it to .env.local +# 2. Update the environment variables below. + +LIVEKIT_API_KEY=mykey +LIVEKIT_API_SECRET=mysecret +LIVEKIT_URL=wss://myproject.livekit.cloud diff --git a/examples/data-tracks/README.md b/examples/data-tracks/README.md new file mode 100644 index 00000000..6a8d31b7 --- /dev/null +++ b/examples/data-tracks/README.md @@ -0,0 +1,45 @@ +# Data Tracks Example + +This example demonstrates how to publish and subscribe to data tracks in LiveKit. It consists of two scripts: + +- **publisher** — Connects to a room, publishes a data track, and pushes frames at a regular interval. +- **subscriber** — Connects to a room, listens for published data tracks, subscribes, and logs received frames. + +## Prerequisites + +Before running this example, make sure you have: + +1. Node.js installed on your machine. +2. A LiveKit server running (either locally or remotely). +3. LiveKit API key and secret. + +## Setup + +1. Install dependencies: + + ``` + pnpm install + ``` + +2. Create a `.env.local` file in the example directory with your LiveKit credentials: + ``` + LIVEKIT_API_KEY=your_api_key + LIVEKIT_API_SECRET=your_api_secret + LIVEKIT_URL=your_livekit_url + ``` + +## Running the Example + +Start the subscriber in one terminal: + +``` +pnpm run subscriber +``` + +Then start the publisher in another terminal: + +``` +pnpm run publisher +``` + +The subscriber will log received frames and their latency to the terminal. diff --git a/examples/data-tracks/package.json b/examples/data-tracks/package.json new file mode 100644 index 00000000..d7c7fb52 --- /dev/null +++ b/examples/data-tracks/package.json @@ -0,0 +1,23 @@ +{ + "name": "example-data-tracks", + "author": "LiveKit", + "private": true, + "description": "Example of using data tracks in LiveKit", + "type": "module", + "scripts": { + "lint": "eslint -f unix \"**/*.ts\"", + "publisher": "tsx publisher.ts", + "subscriber": "tsx subscriber.ts" + }, + "keywords": [], + "license": "Apache-2.0", + "dependencies": { + "@livekit/rtc-node": "workspace:*", + "dotenv": "^16.4.5", + "livekit-server-sdk": "workspace:*" + }, + "devDependencies": { + "@types/node": "^20.10.4", + "tsx": "^4.7.1" + } +} diff --git a/examples/data-tracks/publisher.ts b/examples/data-tracks/publisher.ts new file mode 100644 index 00000000..218c4d80 --- /dev/null +++ b/examples/data-tracks/publisher.ts @@ -0,0 +1,70 @@ +import { + type DataTrackFrame, + DataTrackPushFrameError, + type LocalDataTrack, + Room, +} from '@livekit/rtc-node'; +import { config } from 'dotenv'; +import { setTimeout } from 'node:timers/promises'; +import { AccessToken } from 'livekit-server-sdk'; + +config({ path: '.env.local', override: false }); +const LIVEKIT_API_KEY = process.env.LIVEKIT_API_KEY; +const LIVEKIT_API_SECRET = process.env.LIVEKIT_API_SECRET; +const LIVEKIT_URL = process.env.LIVEKIT_URL; +if (!LIVEKIT_API_KEY || !LIVEKIT_API_SECRET || !LIVEKIT_URL) { + throw new Error('Missing required environment variables. Please check your .env.local file.'); +} + +async function readSensor(): Promise { + return new Uint8Array(256).fill(0xfa); +} + +async function pushFrames(track: LocalDataTrack) { + while (true) { + console.log('Pushing frame'); + const data = await readSensor(); + try { + const frame: DataTrackFrame = { + payload: data, + userTimestamp: BigInt(Date.now()), + }; + track.tryPush(frame); + } catch (e) { + if (e instanceof DataTrackPushFrameError) { + console.error('Failed to push frame:', e.message); + } else { + throw e; + } + } + await setTimeout(500); + } +} + +const main = async () => { + const roomName = 'data-track-demo'; + const identity = 'publisher'; + const token = await createToken(identity, roomName); + + const room = new Room(); + await room.connect(LIVEKIT_URL, token); + console.log('connected to room', room.name); + + const track = await room.localParticipant.publishDataTrack({ name: 'my_sensor_data' }); + await pushFrames(track); +}; + +const createToken = async (identity: string, roomName: string) => { + const token = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, { + identity, + }); + token.addGrant({ + room: roomName, + roomJoin: true, + roomCreate: true, + canPublish: true, + }); + return await token.toJwt(); +}; + +main(); diff --git a/examples/data-tracks/subscriber.ts b/examples/data-tracks/subscriber.ts new file mode 100644 index 00000000..583d5dd2 --- /dev/null +++ b/examples/data-tracks/subscriber.ts @@ -0,0 +1,57 @@ +import { type RemoteDataTrack, Room, RoomEvent } from '@livekit/rtc-node'; +import { config } from 'dotenv'; +import { AccessToken } from 'livekit-server-sdk'; + +config({ path: '.env.local', override: false }); +const LIVEKIT_API_KEY = process.env.LIVEKIT_API_KEY; +const LIVEKIT_API_SECRET = process.env.LIVEKIT_API_SECRET; +const LIVEKIT_URL = process.env.LIVEKIT_URL; +if (!LIVEKIT_API_KEY || !LIVEKIT_API_SECRET || !LIVEKIT_URL) { + throw new Error('Missing required environment variables. Please check your .env.local file.'); +} + +async function subscribe(track: RemoteDataTrack) { + console.log( + `Subscribing to '${track.info.name}' published by '${track.publisherIdentity}'`, + ); + for await (const frame of track.subscribe()) { + console.log(`Received frame (${frame.payload.byteLength} bytes)`); + + if (frame.userTimestamp != null) { + const latency = (Date.now() - Number(frame.userTimestamp)) / 1000; + console.log(`Latency: ${latency.toFixed(3)} s`); + } + } +} + +const main = async () => { + const roomName = 'data-track-demo'; + const identity = 'subscriber'; + const token = await createToken(identity, roomName); + + const room = new Room(); + + room.on(RoomEvent.DataTrackPublished, (track: RemoteDataTrack) => { + subscribe(track).catch((e) => { + console.error(`Failed to subscribe to '${track.info.name}':`, e); + }); + }); + + await room.connect(LIVEKIT_URL, token); + console.log('connected to room', room.name); +}; + +const createToken = async (identity: string, roomName: string) => { + const token = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, { + identity, + }); + token.addGrant({ + room: roomName, + roomJoin: true, + roomCreate: true, + canSubscribe: true, + }); + return await token.toJwt(); +}; + +main(); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0307d4e1..65af2c47 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -110,6 +110,25 @@ importers: specifier: ^4.7.1 version: 4.17.0 + examples/data-tracks: + dependencies: + '@livekit/rtc-node': + specifier: workspace:* + version: link:../../packages/livekit-rtc + dotenv: + specifier: ^16.4.5 + version: 16.4.5 + livekit-server-sdk: + specifier: workspace:* + version: link:../../packages/livekit-server-sdk + devDependencies: + '@types/node': + specifier: ^20.10.4 + version: 20.19.11 + tsx: + specifier: ^4.7.1 + version: 4.17.0 + examples/publish-wav: dependencies: '@livekit/rtc-node': @@ -982,12 +1001,14 @@ packages: engines: {node: '>= 18'} cpu: [arm64] os: [linux] + libc: [glibc] '@livekit/rtc-ffi-bindings-linux-x64-gnu@0.12.52-patch.0': resolution: {integrity: sha512-y1j4ciiCMaUrii0/XYwLFyRBRHDvx4202YCK5ePF3xB+9tW3Fuwexd/z4GuupCpP9eadGkpALCQt60wnLnFDnw==} engines: {node: '>= 18'} cpu: [x64] os: [linux] + libc: [glibc] '@livekit/rtc-ffi-bindings-win32-x64-msvc@0.12.52-patch.0': resolution: {integrity: sha512-a7eoTor7KgN4JDPqZjyBQjgkVIZcxkyP5Iau3O/1qDaYKboLMqSYHfSAk84Un4r0SsSFvxUXXDY3boMLJ7QYow==} @@ -1067,24 +1088,28 @@ packages: engines: {node: '>= 10'} cpu: [arm64] os: [linux] + libc: [glibc] '@next/swc-linux-arm64-musl@12.3.0': resolution: {integrity: sha512-nvNWoUieMjvDjpYJ/4SQe9lQs2xMj6ZRs8N+bmTrVu9leY2Fg3WD6W9p/1uU9hGO8u+OdF13wc4iRShu/WYIHg==} engines: {node: '>= 10'} cpu: [arm64] os: [linux] + libc: [musl] '@next/swc-linux-x64-gnu@12.3.0': resolution: {integrity: sha512-4ajhIuVU9PeQCMMhdDgZTLrHmjbOUFuIyg6J19hZqwEwDTSqQyrSLkbJs2Nd7IRiM6Ul/XyrtEFCpk4k+xD2+w==} engines: {node: '>= 10'} cpu: [x64] os: [linux] + libc: [glibc] '@next/swc-linux-x64-musl@12.3.0': resolution: {integrity: sha512-U092RBYbaGxoMAwpauePJEu2PuZSEoUCGJBvsptQr2/2XIMwAJDYM4c/M5NfYEsBr+yjvsYNsOpYfeQ88D82Yg==} engines: {node: '>= 10'} cpu: [x64] os: [linux] + libc: [musl] '@next/swc-win32-arm64-msvc@12.3.0': resolution: {integrity: sha512-pzSzaxjDEJe67bUok9Nxf9rykbJfHXW0owICFsPBsqHyc+cr8vpF7g9e2APTCddtVhvjkga9ILoZJ9NxWS7Yiw==} @@ -1225,176 +1250,211 @@ packages: resolution: {integrity: sha512-EtP8aquZ0xQg0ETFcxUbU71MZlHaw9MChwrQzatiE8U/bvi5uv/oChExXC4mWhjiqK7azGJBqU0tt5H123SzVA==} cpu: [arm] os: [linux] + libc: [glibc] '@rollup/rollup-linux-arm-gnueabihf@4.50.1': resolution: {integrity: sha512-54v4okehwl5TaSIkpp97rAHGp7t3ghinRd/vyC1iXqXMfjYUTm7TfYmCzXDoHUPTTf36L8pr0E7YsD3CfB3ZDg==} cpu: [arm] os: [linux] + libc: [glibc] '@rollup/rollup-linux-arm-gnueabihf@4.57.1': resolution: {integrity: sha512-F8sWbhZ7tyuEfsmOxwc2giKDQzN3+kuBLPwwZGyVkLlKGdV1nvnNwYD0fKQ8+XS6hp9nY7B+ZeK01EBUE7aHaw==} cpu: [arm] os: [linux] + libc: [glibc] '@rollup/rollup-linux-arm-musleabihf@4.46.2': resolution: {integrity: sha512-qO7F7U3u1nfxYRPM8HqFtLd+raev2K137dsV08q/LRKRLEc7RsiDWihUnrINdsWQxPR9jqZ8DIIZ1zJJAm5PjQ==} cpu: [arm] os: [linux] + libc: [musl] '@rollup/rollup-linux-arm-musleabihf@4.50.1': resolution: {integrity: sha512-p/LaFyajPN/0PUHjv8TNyxLiA7RwmDoVY3flXHPSzqrGcIp/c2FjwPPP5++u87DGHtw+5kSH5bCJz0mvXngYxw==} cpu: [arm] os: [linux] + libc: [musl] '@rollup/rollup-linux-arm-musleabihf@4.57.1': resolution: {integrity: sha512-rGfNUfn0GIeXtBP1wL5MnzSj98+PZe/AXaGBCRmT0ts80lU5CATYGxXukeTX39XBKsxzFpEeK+Mrp9faXOlmrw==} cpu: [arm] os: [linux] + libc: [musl] '@rollup/rollup-linux-arm64-gnu@4.46.2': resolution: {integrity: sha512-3dRaqLfcOXYsfvw5xMrxAk9Lb1f395gkoBYzSFcc/scgRFptRXL9DOaDpMiehf9CO8ZDRJW2z45b6fpU5nwjng==} cpu: [arm64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-arm64-gnu@4.50.1': resolution: {integrity: sha512-2AbMhFFkTo6Ptna1zO7kAXXDLi7H9fGTbVaIq2AAYO7yzcAsuTNWPHhb2aTA6GPiP+JXh85Y8CiS54iZoj4opw==} cpu: [arm64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-arm64-gnu@4.57.1': resolution: {integrity: sha512-MMtej3YHWeg/0klK2Qodf3yrNzz6CGjo2UntLvk2RSPlhzgLvYEB3frRvbEF2wRKh1Z2fDIg9KRPe1fawv7C+g==} cpu: [arm64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-arm64-musl@4.46.2': resolution: {integrity: sha512-fhHFTutA7SM+IrR6lIfiHskxmpmPTJUXpWIsBXpeEwNgZzZZSg/q4i6FU4J8qOGyJ0TR+wXBwx/L7Ho9z0+uDg==} cpu: [arm64] os: [linux] + libc: [musl] '@rollup/rollup-linux-arm64-musl@4.50.1': resolution: {integrity: sha512-Cgef+5aZwuvesQNw9eX7g19FfKX5/pQRIyhoXLCiBOrWopjo7ycfB292TX9MDcDijiuIJlx1IzJz3IoCPfqs9w==} cpu: [arm64] os: [linux] + libc: [musl] '@rollup/rollup-linux-arm64-musl@4.57.1': resolution: {integrity: sha512-1a/qhaaOXhqXGpMFMET9VqwZakkljWHLmZOX48R0I/YLbhdxr1m4gtG1Hq7++VhVUmf+L3sTAf9op4JlhQ5u1Q==} cpu: [arm64] os: [linux] + libc: [musl] '@rollup/rollup-linux-loong64-gnu@4.57.1': resolution: {integrity: sha512-QWO6RQTZ/cqYtJMtxhkRkidoNGXc7ERPbZN7dVW5SdURuLeVU7lwKMpo18XdcmpWYd0qsP1bwKPf7DNSUinhvA==} cpu: [loong64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-loong64-musl@4.57.1': resolution: {integrity: sha512-xpObYIf+8gprgWaPP32xiN5RVTi/s5FCR+XMXSKmhfoJjrpRAjCuuqQXyxUa/eJTdAE6eJ+KDKaoEqjZQxh3Gw==} cpu: [loong64] os: [linux] + libc: [musl] '@rollup/rollup-linux-loongarch64-gnu@4.46.2': resolution: {integrity: sha512-i7wfGFXu8x4+FRqPymzjD+Hyav8l95UIZ773j7J7zRYc3Xsxy2wIn4x+llpunexXe6laaO72iEjeeGyUFmjKeA==} cpu: [loong64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-loongarch64-gnu@4.50.1': resolution: {integrity: sha512-RPhTwWMzpYYrHrJAS7CmpdtHNKtt2Ueo+BlLBjfZEhYBhK00OsEqM08/7f+eohiF6poe0YRDDd8nAvwtE/Y62Q==} cpu: [loong64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-ppc64-gnu@4.46.2': resolution: {integrity: sha512-B/l0dFcHVUnqcGZWKcWBSV2PF01YUt0Rvlurci5P+neqY/yMKchGU8ullZvIv5e8Y1C6wOn+U03mrDylP5q9Yw==} cpu: [ppc64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-ppc64-gnu@4.50.1': resolution: {integrity: sha512-eSGMVQw9iekut62O7eBdbiccRguuDgiPMsw++BVUg+1K7WjZXHOg/YOT9SWMzPZA+w98G+Fa1VqJgHZOHHnY0Q==} cpu: [ppc64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-ppc64-gnu@4.57.1': resolution: {integrity: sha512-4BrCgrpZo4hvzMDKRqEaW1zeecScDCR+2nZ86ATLhAoJ5FQ+lbHVD3ttKe74/c7tNT9c6F2viwB3ufwp01Oh2w==} cpu: [ppc64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-ppc64-musl@4.57.1': resolution: {integrity: sha512-NOlUuzesGauESAyEYFSe3QTUguL+lvrN1HtwEEsU2rOwdUDeTMJdO5dUYl/2hKf9jWydJrO9OL/XSSf65R5+Xw==} cpu: [ppc64] os: [linux] + libc: [musl] '@rollup/rollup-linux-riscv64-gnu@4.46.2': resolution: {integrity: sha512-32k4ENb5ygtkMwPMucAb8MtV8olkPT03oiTxJbgkJa7lJ7dZMr0GCFJlyvy+K8iq7F/iuOr41ZdUHaOiqyR3iQ==} cpu: [riscv64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-riscv64-gnu@4.50.1': resolution: {integrity: sha512-S208ojx8a4ciIPrLgazF6AgdcNJzQE4+S9rsmOmDJkusvctii+ZvEuIC4v/xFqzbuP8yDjn73oBlNDgF6YGSXQ==} cpu: [riscv64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-riscv64-gnu@4.57.1': resolution: {integrity: sha512-ptA88htVp0AwUUqhVghwDIKlvJMD/fmL/wrQj99PRHFRAG6Z5nbWoWG4o81Nt9FT+IuqUQi+L31ZKAFeJ5Is+A==} cpu: [riscv64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-riscv64-musl@4.46.2': resolution: {integrity: sha512-t5B2loThlFEauloaQkZg9gxV05BYeITLvLkWOkRXogP4qHXLkWSbSHKM9S6H1schf/0YGP/qNKtiISlxvfmmZw==} cpu: [riscv64] os: [linux] + libc: [musl] '@rollup/rollup-linux-riscv64-musl@4.50.1': resolution: {integrity: sha512-3Ag8Ls1ggqkGUvSZWYcdgFwriy2lWo+0QlYgEFra/5JGtAd6C5Hw59oojx1DeqcA2Wds2ayRgvJ4qxVTzCHgzg==} cpu: [riscv64] os: [linux] + libc: [musl] '@rollup/rollup-linux-riscv64-musl@4.57.1': resolution: {integrity: sha512-S51t7aMMTNdmAMPpBg7OOsTdn4tySRQvklmL3RpDRyknk87+Sp3xaumlatU+ppQ+5raY7sSTcC2beGgvhENfuw==} cpu: [riscv64] os: [linux] + libc: [musl] '@rollup/rollup-linux-s390x-gnu@4.46.2': resolution: {integrity: sha512-YKjekwTEKgbB7n17gmODSmJVUIvj8CX7q5442/CK80L8nqOUbMtf8b01QkG3jOqyr1rotrAnW6B/qiHwfcuWQA==} cpu: [s390x] os: [linux] + libc: [glibc] '@rollup/rollup-linux-s390x-gnu@4.50.1': resolution: {integrity: sha512-t9YrKfaxCYe7l7ldFERE1BRg/4TATxIg+YieHQ966jwvo7ddHJxPj9cNFWLAzhkVsbBvNA4qTbPVNsZKBO4NSg==} cpu: [s390x] os: [linux] + libc: [glibc] '@rollup/rollup-linux-s390x-gnu@4.57.1': resolution: {integrity: sha512-Bl00OFnVFkL82FHbEqy3k5CUCKH6OEJL54KCyx2oqsmZnFTR8IoNqBF+mjQVcRCT5sB6yOvK8A37LNm/kPJiZg==} cpu: [s390x] os: [linux] + libc: [glibc] '@rollup/rollup-linux-x64-gnu@4.46.2': resolution: {integrity: sha512-Jj5a9RUoe5ra+MEyERkDKLwTXVu6s3aACP51nkfnK9wJTraCC8IMe3snOfALkrjTYd2G1ViE1hICj0fZ7ALBPA==} cpu: [x64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-x64-gnu@4.50.1': resolution: {integrity: sha512-MCgtFB2+SVNuQmmjHf+wfI4CMxy3Tk8XjA5Z//A0AKD7QXUYFMQcns91K6dEHBvZPCnhJSyDWLApk40Iq/H3tA==} cpu: [x64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-x64-gnu@4.57.1': resolution: {integrity: sha512-ABca4ceT4N+Tv/GtotnWAeXZUZuM/9AQyCyKYyKnpk4yoA7QIAuBt6Hkgpw8kActYlew2mvckXkvx0FfoInnLg==} cpu: [x64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-x64-musl@4.46.2': resolution: {integrity: sha512-7kX69DIrBeD7yNp4A5b81izs8BqoZkCIaxQaOpumcJ1S/kmqNFjPhDu1LHeVXv0SexfHQv5cqHsxLOjETuqDuA==} cpu: [x64] os: [linux] + libc: [musl] '@rollup/rollup-linux-x64-musl@4.50.1': resolution: {integrity: sha512-nEvqG+0jeRmqaUMuwzlfMKwcIVffy/9KGbAGyoa26iu6eSngAYQ512bMXuqqPrlTyfqdlB9FVINs93j534UJrg==} cpu: [x64] os: [linux] + libc: [musl] '@rollup/rollup-linux-x64-musl@4.57.1': resolution: {integrity: sha512-HFps0JeGtuOR2convgRRkHCekD7j+gdAuXM+/i6kGzQtFhlCtQkpwtNzkNj6QhCDp7DRJ7+qC/1Vg2jt5iSOFw==} cpu: [x64] os: [linux] + libc: [musl] '@rollup/rollup-openbsd-x64@4.57.1': resolution: {integrity: sha512-H+hXEv9gdVQuDTgnqD+SQffoWoc0Of59AStSzTEj/feWTBAnSfSD3+Dql1ZruJQxmykT/JVY0dE8Ka7z0DH1hw==} @@ -1701,41 +1761,49 @@ packages: resolution: {integrity: sha512-34gw7PjDGB9JgePJEmhEqBhWvCiiWCuXsL9hYphDF7crW7UgI05gyBAi6MF58uGcMOiOqSJ2ybEeCvHcq0BCmQ==} cpu: [arm64] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-arm64-musl@1.11.1': resolution: {integrity: sha512-RyMIx6Uf53hhOtJDIamSbTskA99sPHS96wxVE/bJtePJJtpdKGXO1wY90oRdXuYOGOTuqjT8ACccMc4K6QmT3w==} cpu: [arm64] os: [linux] + libc: [musl] '@unrs/resolver-binding-linux-ppc64-gnu@1.11.1': resolution: {integrity: sha512-D8Vae74A4/a+mZH0FbOkFJL9DSK2R6TFPC9M+jCWYia/q2einCubX10pecpDiTmkJVUH+y8K3BZClycD8nCShA==} cpu: [ppc64] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-riscv64-gnu@1.11.1': resolution: {integrity: sha512-frxL4OrzOWVVsOc96+V3aqTIQl1O2TjgExV4EKgRY09AJ9leZpEg8Ak9phadbuX0BA4k8U5qtvMSQQGGmaJqcQ==} cpu: [riscv64] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-riscv64-musl@1.11.1': resolution: {integrity: sha512-mJ5vuDaIZ+l/acv01sHoXfpnyrNKOk/3aDoEdLO/Xtn9HuZlDD6jKxHlkN8ZhWyLJsRBxfv9GYM2utQ1SChKew==} cpu: [riscv64] os: [linux] + libc: [musl] '@unrs/resolver-binding-linux-s390x-gnu@1.11.1': resolution: {integrity: sha512-kELo8ebBVtb9sA7rMe1Cph4QHreByhaZ2QEADd9NzIQsYNQpt9UkM9iqr2lhGr5afh885d/cB5QeTXSbZHTYPg==} cpu: [s390x] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-x64-gnu@1.11.1': resolution: {integrity: sha512-C3ZAHugKgovV5YvAMsxhq0gtXuwESUKc5MhEtjBpLoHPLYM+iuwSj3lflFwK3DPm68660rZ7G8BMcwSro7hD5w==} cpu: [x64] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-x64-musl@1.11.1': resolution: {integrity: sha512-rV0YSoyhK2nZ4vEswT/QwqzqQXw5I6CjoaYMOX0TqBlWhojUf8P94mvI7nuJTeaCkkds3QE4+zS8Ko+GdXuZtA==} cpu: [x64] os: [linux] + libc: [musl] '@unrs/resolver-binding-wasm32-wasi@1.11.1': resolution: {integrity: sha512-5u4RkfxJm+Ng7IWgkzi3qrFOvLvQYnPBmjmZQ8+szTK/b31fQCnleNl1GgEt7nIsZRIf5PLhPwT0WM+q45x/UQ==} @@ -2534,9 +2602,6 @@ packages: get-tsconfig@4.10.1: resolution: {integrity: sha512-auHyJ4AgMz7vgS8Hp3N6HXSmlMdUyhSUrfBF16w153rxtLIEOE+HGqaBppczZvnHLqQJfiHotCYpNhl0lUROFQ==} - get-tsconfig@4.8.0: - resolution: {integrity: sha512-Pgba6TExTZ0FJAn1qkJAjIeKoDJ3CsI2ChuLohJnZl/tTU8MVrq3b+2t5UOPfRa4RMsorClBjJALkJUMjG1PAw==} - git-repo-info@2.1.1: resolution: {integrity: sha512-8aCohiDo4jwjOwma4FmYFd3i97urZulL8XL24nIPxuE+GZnfsAyy/g2Shqx6OjUiFKUXZM+Yy+KHnOmmA3FVcg==} engines: {node: '>= 4.0'} @@ -6301,10 +6366,6 @@ snapshots: dependencies: resolve-pkg-maps: 1.0.0 - get-tsconfig@4.8.0: - dependencies: - resolve-pkg-maps: 1.0.0 - git-repo-info@2.1.1: {} glob-parent@5.1.2: @@ -7524,7 +7585,7 @@ snapshots: tsx@4.17.0: dependencies: esbuild: 0.23.1 - get-tsconfig: 4.8.0 + get-tsconfig: 4.10.1 optionalDependencies: fsevents: 2.3.3 From c5cf4d3da9068927e35495271186a99e068a5cfd Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Wed, 8 Apr 2026 10:21:51 +0900 Subject: [PATCH 5/8] Link to docs --- examples/data-tracks/README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/data-tracks/README.md b/examples/data-tracks/README.md index 6a8d31b7..3f22cb4f 100644 --- a/examples/data-tracks/README.md +++ b/examples/data-tracks/README.md @@ -1,6 +1,6 @@ # Data Tracks Example -This example demonstrates how to publish and subscribe to data tracks in LiveKit. It consists of two scripts: +This example demonstrates how to publish and subscribe to [data tracks](https://docs.livekit.io/transport/data/data-tracks/) in LiveKit. It consists of two scripts: - **publisher** — Connects to a room, publishes a data track, and pushes frames at a regular interval. - **subscriber** — Connects to a room, listens for published data tracks, subscribes, and logs received frames. @@ -22,6 +22,7 @@ Before running this example, make sure you have: ``` 2. Create a `.env.local` file in the example directory with your LiveKit credentials: + ``` LIVEKIT_API_KEY=your_api_key LIVEKIT_API_SECRET=your_api_secret From 14498a8295327f6da945a13c2dda30535a9f3d91 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Wed, 8 Apr 2026 10:25:56 +0900 Subject: [PATCH 6/8] Format --- packages/livekit-rtc/src/data_tracks/local.ts | 3 +-- packages/livekit-rtc/src/data_tracks/remote.ts | 1 - packages/livekit-rtc/src/participant.ts | 10 +++++----- packages/livekit-rtc/src/room.ts | 2 +- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/packages/livekit-rtc/src/data_tracks/local.ts b/packages/livekit-rtc/src/data_tracks/local.ts index a6e34214..6699a47f 100644 --- a/packages/livekit-rtc/src/data_tracks/local.ts +++ b/packages/livekit-rtc/src/data_tracks/local.ts @@ -1,17 +1,16 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 - import type { LocalDataTrackIsPublishedResponse, LocalDataTrackTryPushResponse, OwnedLocalDataTrack, } from '@livekit/rtc-ffi-bindings'; import { - DataTrackFrame as ProtoDataTrackFrame, LocalDataTrackIsPublishedRequest, LocalDataTrackTryPushRequest, LocalDataTrackUnpublishRequest, + DataTrackFrame as ProtoDataTrackFrame, } from '@livekit/rtc-ffi-bindings'; import { FfiClient, FfiHandle } from '../ffi_client.js'; import type { DataTrackFrame, DataTrackInfo } from './types.js'; diff --git a/packages/livekit-rtc/src/data_tracks/remote.ts b/packages/livekit-rtc/src/data_tracks/remote.ts index 2ac0e2bc..a083c11b 100644 --- a/packages/livekit-rtc/src/data_tracks/remote.ts +++ b/packages/livekit-rtc/src/data_tracks/remote.ts @@ -1,7 +1,6 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 - import type { DataTrackStreamEvent, OwnedDataTrackStream, diff --git a/packages/livekit-rtc/src/participant.ts b/packages/livekit-rtc/src/participant.ts index 48f4fe99..79daacb9 100644 --- a/packages/livekit-rtc/src/participant.ts +++ b/packages/livekit-rtc/src/participant.ts @@ -69,6 +69,11 @@ import { RpcMethodInvocationResponseRequest, UnregisterRpcMethodRequest, } from '@livekit/rtc-ffi-bindings'; +import type { PublishDataTrackCallback, PublishDataTrackResponse } from '@livekit/rtc-ffi-bindings'; +import { + DataTrackOptions as ProtoDataTrackOptions, + PublishDataTrackRequest, +} from '@livekit/rtc-ffi-bindings'; import type { PathLike } from 'node:fs'; import { open, stat } from 'node:fs/promises'; import { @@ -78,11 +83,6 @@ import { type TextStreamInfo, TextStreamWriter, } from './data_streams/index.js'; -import type { PublishDataTrackCallback, PublishDataTrackResponse } from '@livekit/rtc-ffi-bindings'; -import { - DataTrackOptions as ProtoDataTrackOptions, - PublishDataTrackRequest, -} from '@livekit/rtc-ffi-bindings'; import { type DataTrackOptions, type LocalDataTrack, diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index cd010238..de4c402b 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -33,6 +33,7 @@ import type { TextStreamHandler, TextStreamInfo, } from './data_streams/types.js'; +import { RemoteDataTrack } from './data_tracks/index.js'; import type { E2EEOptions } from './e2ee.js'; import { E2EEManager, defaultE2EEOptions } from './e2ee.js'; import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js'; @@ -43,7 +44,6 @@ import type { LocalTrack, RemoteTrack } from './track.js'; import { RemoteAudioTrack, RemoteVideoTrack } from './track.js'; import type { LocalTrackPublication, TrackPublication } from './track_publication.js'; import { RemoteTrackPublication } from './track_publication.js'; -import { RemoteDataTrack } from './data_tracks/index.js'; import type { ChatMessage } from './types.js'; import { bigIntToNumber } from './utils.js'; From 2fa4c325e1586d51cab1e4adb64a31abd3fe4b26 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Wed, 8 Apr 2026 14:08:49 +0900 Subject: [PATCH 7/8] Add E2E test case --- packages/livekit-rtc/src/tests/e2e.test.ts | 83 ++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/packages/livekit-rtc/src/tests/e2e.test.ts b/packages/livekit-rtc/src/tests/e2e.test.ts index ad1d5af2..af807a7c 100644 --- a/packages/livekit-rtc/src/tests/e2e.test.ts +++ b/packages/livekit-rtc/src/tests/e2e.test.ts @@ -12,6 +12,7 @@ import { ConnectionState, LocalAudioTrack, ParticipantKind, + type RemoteDataTrack, Room, RoomEvent, RpcError, @@ -514,4 +515,86 @@ describeE2E('livekit-rtc e2e', () => { }, testTimeoutMs * 2, ); + + it( + 'publishes and subscribes to a data track', + async () => { + const FRAME_COUNT = 5; + const PAYLOAD_SIZE = 64; + const TRACK_NAME = 'test-track'; + + const { rooms } = await connectTestRooms(2); + const [subscriberRoom, publisherRoom] = rooms; + const publisherIdentity = publisherRoom!.localParticipant!.identity; + + const unpublishedEvent = waitForRoomEvent( + subscriberRoom!, + RoomEvent.DataTrackUnpublished, + testTimeoutMs, + (sid: string) => sid, + ); + + const publishedEvent = waitForRoomEvent( + subscriberRoom!, + RoomEvent.DataTrackPublished, + testTimeoutMs, + (track: RemoteDataTrack) => track, + ); + + const localTrack = await publisherRoom!.localParticipant!.publishDataTrack({ + name: TRACK_NAME, + }); + expect(localTrack.info.sid).toBeTruthy(); + expect(localTrack.info.name).toBe(TRACK_NAME); + expect(localTrack.isPublished()).toBe(true); + + const remoteTrack = await publishedEvent; + expect(remoteTrack.info.name).toBe(TRACK_NAME); + expect(remoteTrack.publisherIdentity).toBe(publisherIdentity); + + const stream = remoteTrack.subscribe(); + const reader = stream.getReader(); + + const pushTask = (async () => { + for (let i = 0; i < FRAME_COUNT; i++) { + localTrack.tryPush({ + payload: new Uint8Array(PAYLOAD_SIZE).fill(i), + userTimestamp: BigInt(Date.now()), + }); + await delay(100); + } + await localTrack.unpublish(); + })(); + + const readTask = (async () => { + let recvCount = 0; + while (true) { + const { done, value: frame } = await reader.read(); + if (done) break; + const firstByte = frame.payload[0]!; + expect(frame.payload.every((b) => b === firstByte)).toBe(true); + expect(frame.payload.byteLength).toBe(PAYLOAD_SIZE); + expect(frame.userTimestamp).toBeDefined(); + const latency = (Date.now() - Number(frame.userTimestamp!)) / 1000; + expect(latency).toBeLessThan(5.0); + recvCount++; + } + return recvCount; + })(); + + const recvCount = await withTimeout( + Promise.all([pushTask, readTask]).then(([, count]) => count), + testTimeoutMs, + 'Timed out during data track test', + ); + expect(recvCount).toBeGreaterThan(0); + + const unpublishedSid = await unpublishedEvent; + expect(unpublishedSid).toBe(localTrack.info.sid); + + reader.releaseLock(); + await Promise.all(rooms.map((r) => r.disconnect())); + }, + testTimeoutMs * 2, + ); }); From 6994cc95eb0d2b417cb2e9adf2a130145c006de9 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Thu, 9 Apr 2026 14:38:45 +0900 Subject: [PATCH 8/8] Style --- examples/data-tracks/subscriber.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/examples/data-tracks/subscriber.ts b/examples/data-tracks/subscriber.ts index 583d5dd2..539b8360 100644 --- a/examples/data-tracks/subscriber.ts +++ b/examples/data-tracks/subscriber.ts @@ -14,12 +14,13 @@ async function subscribe(track: RemoteDataTrack) { console.log( `Subscribing to '${track.info.name}' published by '${track.publisherIdentity}'`, ); - for await (const frame of track.subscribe()) { + const stream = track.subscribe(); + for await (const frame of stream) { console.log(`Received frame (${frame.payload.byteLength} bytes)`); - if (frame.userTimestamp != null) { - const latency = (Date.now() - Number(frame.userTimestamp)) / 1000; - console.log(`Latency: ${latency.toFixed(3)} s`); + if (frame.userTimestamp) { + const latencyMs = Date.now() - Number(frame.userTimestamp); + console.log(`Latency: ${latencyMs}ms`); } } }