Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions examples/data-tracks/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# 1. Copy this file and rename it to .env.local
# 2. Update the environment variables below.

LIVEKIT_API_KEY=mykey
LIVEKIT_API_SECRET=mysecret
LIVEKIT_URL=wss://myproject.livekit.cloud
46 changes: 46 additions & 0 deletions examples/data-tracks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Data Tracks Example

This example demonstrates how to publish and subscribe to [data tracks](https://docs.livekit.io/transport/data/data-tracks/) in LiveKit. It consists of two scripts:

- **publisher** — Connects to a room, publishes a data track, and pushes frames at a regular interval.
- **subscriber** — Connects to a room, listens for published data tracks, subscribes, and logs received frames.

## Prerequisites

Before running this example, make sure you have:

1. Node.js installed on your machine.
2. A LiveKit server running (either locally or remotely).
3. LiveKit API key and secret.

## Setup

1. Install dependencies:

```
pnpm install
```

2. Create a `.env.local` file in the example directory with your LiveKit credentials:

```
LIVEKIT_API_KEY=your_api_key
LIVEKIT_API_SECRET=your_api_secret
LIVEKIT_URL=your_livekit_url
```

## Running the Example

Start the subscriber in one terminal:

```
pnpm run subscriber
```

Then start the publisher in another terminal:

```
pnpm run publisher
```

The subscriber will log received frames and their latency to the terminal.
23 changes: 23 additions & 0 deletions examples/data-tracks/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "example-data-tracks",
"author": "LiveKit",
"private": true,
"description": "Example of using data tracks in LiveKit",
"type": "module",
"scripts": {
"lint": "eslint -f unix \"**/*.ts\"",
"publisher": "tsx publisher.ts",
"subscriber": "tsx subscriber.ts"
},
"keywords": [],
"license": "Apache-2.0",
"dependencies": {
"@livekit/rtc-node": "workspace:*",
"dotenv": "^16.4.5",
"livekit-server-sdk": "workspace:*"
},
"devDependencies": {
"@types/node": "^20.10.4",
"tsx": "^4.7.1"
}
}
70 changes: 70 additions & 0 deletions examples/data-tracks/publisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import {
type DataTrackFrame,
DataTrackPushFrameError,
type LocalDataTrack,
Room,
} from '@livekit/rtc-node';
import { config } from 'dotenv';
import { setTimeout } from 'node:timers/promises';
import { AccessToken } from 'livekit-server-sdk';

config({ path: '.env.local', override: false });
const LIVEKIT_API_KEY = process.env.LIVEKIT_API_KEY;
const LIVEKIT_API_SECRET = process.env.LIVEKIT_API_SECRET;
const LIVEKIT_URL = process.env.LIVEKIT_URL;
if (!LIVEKIT_API_KEY || !LIVEKIT_API_SECRET || !LIVEKIT_URL) {
throw new Error('Missing required environment variables. Please check your .env.local file.');
}

async function readSensor(): Promise<Uint8Array> {
return new Uint8Array(256).fill(0xfa);
}

async function pushFrames(track: LocalDataTrack) {
while (true) {
console.log('Pushing frame');
const data = await readSensor();
try {
const frame: DataTrackFrame = {
payload: data,
userTimestamp: BigInt(Date.now()),
};
track.tryPush(frame);
} catch (e) {
if (e instanceof DataTrackPushFrameError) {
console.error('Failed to push frame:', e.message);
} else {
throw e;
}
}
await setTimeout(500);
}
}

const main = async () => {
const roomName = 'data-track-demo';
const identity = 'publisher';
const token = await createToken(identity, roomName);

const room = new Room();
await room.connect(LIVEKIT_URL, token);
console.log('connected to room', room.name);

const track = await room.localParticipant.publishDataTrack({ name: 'my_sensor_data' });
await pushFrames(track);
};

const createToken = async (identity: string, roomName: string) => {
const token = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, {
identity,
});
token.addGrant({
room: roomName,
roomJoin: true,
roomCreate: true,
canPublish: true,
});
return await token.toJwt();
};

main();
57 changes: 57 additions & 0 deletions examples/data-tracks/subscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { type RemoteDataTrack, Room, RoomEvent } from '@livekit/rtc-node';
import { config } from 'dotenv';
import { AccessToken } from 'livekit-server-sdk';

config({ path: '.env.local', override: false });
const LIVEKIT_API_KEY = process.env.LIVEKIT_API_KEY;
const LIVEKIT_API_SECRET = process.env.LIVEKIT_API_SECRET;
const LIVEKIT_URL = process.env.LIVEKIT_URL;
if (!LIVEKIT_API_KEY || !LIVEKIT_API_SECRET || !LIVEKIT_URL) {
throw new Error('Missing required environment variables. Please check your .env.local file.');
}

async function subscribe(track: RemoteDataTrack) {
console.log(
`Subscribing to '${track.info.name}' published by '${track.publisherIdentity}'`,
);
for await (const frame of track.subscribe()) {
console.log(`Received frame (${frame.payload.byteLength} bytes)`);

if (frame.userTimestamp != null) {
const latency = (Date.now() - Number(frame.userTimestamp)) / 1000;
console.log(`Latency: ${latency.toFixed(3)} s`);
}
}
}

const main = async () => {
const roomName = 'data-track-demo';
const identity = 'subscriber';
const token = await createToken(identity, roomName);

const room = new Room();

room.on(RoomEvent.DataTrackPublished, (track: RemoteDataTrack) => {
subscribe(track).catch((e) => {
console.error(`Failed to subscribe to '${track.info.name}':`, e);
});
});

await room.connect(LIVEKIT_URL, token);
console.log('connected to room', room.name);
};

const createToken = async (identity: string, roomName: string) => {
const token = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, {
identity,
});
token.addGrant({
room: roomName,
roomJoin: true,
roomCreate: true,
canSubscribe: true,
});
return await token.toJwt();
};

main();
15 changes: 15 additions & 0 deletions packages/livekit-rtc/src/data_tracks/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0

export {
type DataTrackFrame,
type DataTrackInfo,
type DataTrackOptions,
type DataTrackSubscribeOptions,
PublishDataTrackError,
DataTrackPushFrameError,
} from './types.js';

export { LocalDataTrack } from './local.js';
export { RemoteDataTrack } from './remote.js';
99 changes: 99 additions & 0 deletions packages/livekit-rtc/src/data_tracks/local.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type {
LocalDataTrackIsPublishedResponse,
LocalDataTrackTryPushResponse,
OwnedLocalDataTrack,
} from '@livekit/rtc-ffi-bindings';
import {
LocalDataTrackIsPublishedRequest,
LocalDataTrackTryPushRequest,
LocalDataTrackUnpublishRequest,
DataTrackFrame as ProtoDataTrackFrame,
} from '@livekit/rtc-ffi-bindings';
import { FfiClient, FfiHandle } from '../ffi_client.js';
import type { DataTrackFrame, DataTrackInfo } from './types.js';
import { DataTrackPushFrameError } from './types.js';

/** Data track published by the local participant. */
export class LocalDataTrack {
private _info: DataTrackInfo;
private ffiHandle: FfiHandle;

/** @internal */
constructor(ownedTrack: OwnedLocalDataTrack) {
this._info = {
sid: ownedTrack.info!.sid!,
name: ownedTrack.info!.name!,
usesE2ee: ownedTrack.info!.usesE2ee!,
};
this.ffiHandle = new FfiHandle(ownedTrack.handle!.id!);
}

/** Information about the data track. */
get info(): DataTrackInfo {
return this._info;
}

/** Whether or not the track is still published. */
isPublished(): boolean {
const res = FfiClient.instance.request<LocalDataTrackIsPublishedResponse>({
message: {
case: 'localDataTrackIsPublished',
value: new LocalDataTrackIsPublishedRequest({
trackHandle: this.ffiHandle.handle,
}),
},
});
return res.isPublished!;
}

/**
* Try pushing a frame to subscribers of the track.
*
* See {@link DataTrackFrame} for how to construct a frame and attach metadata.
*
* Pushing a frame can fail for several reasons:
*
* - The track has been unpublished by the local participant or SFU
* - The room is no longer connected
*
* @throws {@link DataTrackPushFrameError} If the push fails.
*/
tryPush(frame: DataTrackFrame): void {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought(non-blocking): I left a similar comment on the JS implementation, it seems a bit counter-intuitive that a try* method can throw.
Although I don't know if there's a good alternative here. If users aren't expected to gain additional insight from the error's message maybe returning a "success" boolean from the method would be sufficient?

const protoFrame = new ProtoDataTrackFrame({
payload: frame.payload,
userTimestamp: frame.userTimestamp,
});

const res = FfiClient.instance.request<LocalDataTrackTryPushResponse>({
message: {
case: 'localDataTrackTryPush',
value: new LocalDataTrackTryPushRequest({
trackHandle: this.ffiHandle.handle,
frame: protoFrame,
}),
},
});

if (res.error) {
throw new DataTrackPushFrameError(res.error.message!);
}
}

/**
* Unpublish the track from the SFU. Once this is called, any further calls to
* {@link tryPush} will fail.
*/
async unpublish(): Promise<void> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is async, do we need to await a dataTrackUnpublished response ?

FfiClient.instance.request({
message: {
case: 'localDataTrackUnpublish',
value: new LocalDataTrackUnpublishRequest({
trackHandle: this.ffiHandle.handle,
}),
},
});
}
}
Loading
Loading