From 3a37ba65b9bf6892ce24476692f1af81efddc4b3 Mon Sep 17 00:00:00 2001 From: David Chen Date: Fri, 19 Jun 2026 12:40:19 -0700 Subject: [PATCH 01/11] add user_data trailer type --- examples/local_video/src/publisher.rs | 4 +- libwebrtc/src/native/packet_trailer.rs | 18 +++- libwebrtc/src/native/video_source.rs | 25 +++-- libwebrtc/src/native/video_stream.rs | 7 +- libwebrtc/src/video_frame.rs | 11 +- livekit-ffi/protocol/track.proto | 1 + livekit-ffi/protocol/video_frame.proto | 1 + livekit-ffi/src/conversion/room.rs | 3 + livekit-ffi/src/conversion/track.rs | 3 + livekit-ffi/src/server/video_source.rs | 15 ++- livekit-ffi/src/server/video_stream.rs | 3 + livekit-protocol/src/livekit.rs | 3 + livekit-protocol/src/livekit.serde.rs | 3 + livekit/src/room/options.rs | 7 +- livekit/tests/packet_trailer_test.rs | 109 ++++++++++++++++++-- webrtc-sys/include/livekit/packet_trailer.h | 27 ++++- webrtc-sys/src/packet_trailer.cpp | 72 +++++++++++-- webrtc-sys/src/packet_trailer.rs | 5 + webrtc-sys/src/video_track.cpp | 4 +- webrtc-sys/src/video_track.rs | 1 + 20 files changed, 279 insertions(+), 43 deletions(-) diff --git a/examples/local_video/src/publisher.rs b/examples/local_video/src/publisher.rs index c9cbb93c2..eead6300e 100644 --- a/examples/local_video/src/publisher.rs +++ b/examples/local_video/src/publisher.rs @@ -1574,7 +1574,7 @@ async fn run_capture_loop( debug_assert_eq!(burned_timestamp_us, Some(capture_wall_time_us)); } frame.frame_metadata = if user_ts.is_some() || fid.is_some() { - Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid }) + Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid, user_data: None }) } else { None }; @@ -1819,7 +1819,7 @@ async fn run_argus_capture_loop( None }; let frame_metadata = if user_ts.is_some() || fid.is_some() { - Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid }) + Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid, user_data: None }) } else { None }; diff --git a/libwebrtc/src/native/packet_trailer.rs b/libwebrtc/src/native/packet_trailer.rs index 6ba68a909..476b01a9f 100644 --- a/libwebrtc/src/native/packet_trailer.rs +++ b/libwebrtc/src/native/packet_trailer.rs @@ -159,13 +159,15 @@ impl PacketTrailerHandler { } /// Lookup the frame metadata for a given RTP timestamp (receiver side). - /// Returns `Some((user_timestamp, frame_id))` if found, `None` otherwise. - /// The entry is removed from the map after a successful lookup. - pub fn lookup_frame_metadata(&self, rtp_timestamp: u32) -> Option<(u64, u32)> { + /// Returns `Some((user_timestamp, frame_id, user_data))` if found, + /// `None` otherwise. The entry is removed from the map after a + /// successful lookup. + pub fn lookup_frame_metadata(&self, rtp_timestamp: u32) -> Option<(u64, u32, Vec)> { let ts = self.sys_handle.lookup_timestamp(rtp_timestamp); if ts != u64::MAX { let frame_id = self.sys_handle.last_lookup_frame_id(); - Some((ts, frame_id)) + let user_data = self.sys_handle.last_lookup_user_data(); + Some((ts, frame_id, user_data)) } else { None } @@ -188,8 +190,14 @@ impl PacketTrailerHandler { capture_timestamp_us: i64, user_timestamp: u64, frame_id: u32, + user_data: &[u8], ) { - self.sys_handle.store_frame_metadata(capture_timestamp_us, user_timestamp, frame_id); + self.sys_handle.store_frame_metadata( + capture_timestamp_us, + user_timestamp, + frame_id, + user_data, + ); } pub(crate) fn sys_handle(&self) -> SharedPtr { diff --git a/libwebrtc/src/native/video_source.rs b/libwebrtc/src/native/video_source.rs index 4bbe47fcf..323e61c88 100644 --- a/libwebrtc/src/native/video_source.rs +++ b/libwebrtc/src/native/video_source.rs @@ -89,6 +89,7 @@ impl NativeVideoSource { has_packet_trailer: false, user_timestamp: 0, frame_id: 0, + user_data: Vec::new(), }, ); } @@ -115,9 +116,14 @@ impl NativeVideoSource { }; builder.pin_mut().set_timestamp_us(capture_ts); - let (has_trailer, user_ts, fid) = match frame.frame_metadata { - Some(meta) => (true, meta.user_timestamp.unwrap_or(0), meta.frame_id.unwrap_or(0)), - None => (false, 0, 0), + let (has_trailer, user_ts, fid, user_data) = match &frame.frame_metadata { + Some(meta) => ( + true, + meta.user_timestamp.unwrap_or(0), + meta.frame_id.unwrap_or(0), + meta.user_data.clone().unwrap_or_default(), + ), + None => (false, 0, 0, Vec::new()), }; self.inner.lock().captured_frames += 1; @@ -128,6 +134,7 @@ impl NativeVideoSource { has_packet_trailer: has_trailer, user_timestamp: user_ts, frame_id: fid, + user_data, }, ); } @@ -167,9 +174,14 @@ impl NativeVideoSource { timestamp_us: i64, frame_metadata: Option, ) -> bool { - let (has_trailer, user_ts, fid) = match frame_metadata { - Some(meta) => (true, meta.user_timestamp.unwrap_or(0), meta.frame_id.unwrap_or(0)), - None => (false, 0, 0), + let (has_trailer, user_ts, fid, user_data) = match frame_metadata { + Some(meta) => ( + true, + meta.user_timestamp.unwrap_or(0), + meta.frame_id.unwrap_or(0), + meta.user_data.unwrap_or_default(), + ), + None => (false, 0, 0, Vec::new()), }; self.inner.lock().captured_frames += 1; @@ -183,6 +195,7 @@ impl NativeVideoSource { has_packet_trailer: has_trailer, user_timestamp: user_ts, frame_id: fid, + user_data, }, ) } diff --git a/libwebrtc/src/native/video_stream.rs b/libwebrtc/src/native/video_stream.rs index 9edcc0f2a..1828dcb0d 100644 --- a/libwebrtc/src/native/video_stream.rs +++ b/libwebrtc/src/native/video_stream.rs @@ -112,14 +112,15 @@ impl VideoTrackObserver { ) -> Option { handler .and_then(|handler| { - handler.lookup_frame_metadata(rtp_timestamp).map(|(ts, fid)| { + handler.lookup_frame_metadata(rtp_timestamp).map(|(ts, fid, user_data)| { handler.emit_subscribe_timing(SubscribeTimingStage::DecoderOutput, ts, fid); - (ts, fid) + (ts, fid, user_data) }) }) - .map(|(ts, fid)| FrameMetadata { + .map(|(ts, fid, user_data)| FrameMetadata { user_timestamp: Some(ts), frame_id: if fid != 0 { Some(fid) } else { None }, + user_data: if user_data.is_empty() { None } else { Some(user_data) }, }) } } diff --git a/libwebrtc/src/video_frame.rs b/libwebrtc/src/video_frame.rs index 3f90768a7..1c728c3e9 100644 --- a/libwebrtc/src/video_frame.rs +++ b/libwebrtc/src/video_frame.rs @@ -55,13 +55,20 @@ pub enum VideoBufferType { /// Metadata carried alongside a video frame via the packet trailer mechanism. /// /// Each field corresponds to an independently negotiable packet trailer feature -/// (`PTF_USER_TIMESTAMP`, `PTF_FRAME_ID`), so individual fields are `Option`. -#[derive(Debug, Clone, Copy)] +/// (`PTF_USER_TIMESTAMP`, `PTF_FRAME_ID`, `PTF_USER_DATA`), so individual fields +/// are `Option`. +#[derive(Debug, Clone)] pub struct FrameMetadata { /// Wall-clock capture time in microseconds, when `PTF_USER_TIMESTAMP` is enabled. pub user_timestamp: Option, /// Monotonically increasing frame identifier, when `PTF_FRAME_ID` is enabled. pub frame_id: Option, + /// Arbitrary application-supplied bytes, when `PTF_USER_DATA` is enabled. + /// + /// Bounded by the packet trailer size budget (~232 bytes when the other + /// features are also active); oversize payloads are dropped on the send + /// side rather than truncated. + pub user_data: Option>, } #[derive(Debug)] diff --git a/livekit-ffi/protocol/track.proto b/livekit-ffi/protocol/track.proto index b6eb77551..f952992c8 100644 --- a/livekit-ffi/protocol/track.proto +++ b/livekit-ffi/protocol/track.proto @@ -164,4 +164,5 @@ enum AudioTrackFeature { enum FrameMetadataFeature { FMF_USER_TIMESTAMP = 0; FMF_FRAME_ID = 1; + FMF_USER_DATA = 2; } diff --git a/livekit-ffi/protocol/video_frame.proto b/livekit-ffi/protocol/video_frame.proto index ff91fa3c6..a4b610e0e 100644 --- a/livekit-ffi/protocol/video_frame.proto +++ b/livekit-ffi/protocol/video_frame.proto @@ -157,6 +157,7 @@ message OwnedVideoBuffer { message FrameMetadata { optional uint64 user_timestamp = 1; optional uint32 frame_id = 2; + optional bytes user_data = 3; } // diff --git a/livekit-ffi/src/conversion/room.rs b/livekit-ffi/src/conversion/room.rs index ffd41e0ad..a42de2342 100644 --- a/livekit-ffi/src/conversion/room.rs +++ b/livekit-ffi/src/conversion/room.rs @@ -44,6 +44,9 @@ fn frame_metadata_features_from_proto(features: Vec) -> FrameMetadataFeatur proto::FrameMetadataFeature::FmfFrameId => { frame_metadata_features.frame_id = true; } + proto::FrameMetadataFeature::FmfUserData => { + frame_metadata_features.user_data = true; + } } } diff --git a/livekit-ffi/src/conversion/track.rs b/livekit-ffi/src/conversion/track.rs index 47fd64471..53acc046a 100644 --- a/livekit-ffi/src/conversion/track.rs +++ b/livekit-ffi/src/conversion/track.rs @@ -173,6 +173,9 @@ impl From for proto::FrameMetadataFeatur livekit_protocol::PacketTrailerFeature::PtfFrameId => { proto::FrameMetadataFeature::FmfFrameId } + livekit_protocol::PacketTrailerFeature::PtfUserData => { + proto::FrameMetadataFeature::FmfUserData + } } } } diff --git a/livekit-ffi/src/server/video_source.rs b/livekit-ffi/src/server/video_source.rs index 047443728..ee1410a72 100644 --- a/livekit-ffi/src/server/video_source.rs +++ b/livekit-ffi/src/server/video_source.rs @@ -29,11 +29,16 @@ impl FfiHandle for FfiVideoSource {} fn frame_metadata_from_proto(metadata: Option) -> Option { let metadata = metadata?; - let frame_metadata = - FrameMetadata { user_timestamp: metadata.user_timestamp, frame_id: metadata.frame_id }; + let frame_metadata = FrameMetadata { + user_timestamp: metadata.user_timestamp, + frame_id: metadata.frame_id, + user_data: metadata.user_data, + }; - (frame_metadata.user_timestamp.is_some() || frame_metadata.frame_id.is_some()) - .then_some(frame_metadata) + (frame_metadata.user_timestamp.is_some() + || frame_metadata.frame_id.is_some() + || frame_metadata.user_data.is_some()) + .then_some(frame_metadata) } impl FfiVideoSource { @@ -106,10 +111,12 @@ mod tests { let metadata = frame_metadata_from_proto(Some(proto::FrameMetadata { user_timestamp: Some(123), frame_id: Some(456), + user_data: Some(vec![7, 8, 9]), })) .unwrap(); assert_eq!(metadata.user_timestamp, Some(123)); assert_eq!(metadata.frame_id, Some(456)); + assert_eq!(metadata.user_data, Some(vec![7, 8, 9])); } } diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index 5b8453568..2cc69ae7f 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -41,6 +41,7 @@ fn frame_metadata_to_proto(metadata: Option) -> Option "PTF_USER_TIMESTAMP", PacketTrailerFeature::PtfFrameId => "PTF_FRAME_ID", + PacketTrailerFeature::PtfUserData => "PTF_USER_DATA", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -2114,6 +2116,7 @@ impl PacketTrailerFeature { match value { "PTF_USER_TIMESTAMP" => Some(Self::PtfUserTimestamp), "PTF_FRAME_ID" => Some(Self::PtfFrameId), + "PTF_USER_DATA" => Some(Self::PtfUserData), _ => None, } } diff --git a/livekit-protocol/src/livekit.serde.rs b/livekit-protocol/src/livekit.serde.rs index c80ca9821..1a3c60d91 100644 --- a/livekit-protocol/src/livekit.serde.rs +++ b/livekit-protocol/src/livekit.serde.rs @@ -26889,6 +26889,7 @@ impl serde::Serialize for PacketTrailerFeature { let variant = match self { Self::PtfUserTimestamp => "PTF_USER_TIMESTAMP", Self::PtfFrameId => "PTF_FRAME_ID", + Self::PtfUserData => "PTF_USER_DATA", }; serializer.serialize_str(variant) } @@ -26902,6 +26903,7 @@ impl<'de> serde::Deserialize<'de> for PacketTrailerFeature { const FIELDS: &[&str] = &[ "PTF_USER_TIMESTAMP", "PTF_FRAME_ID", + "PTF_USER_DATA", ]; struct GeneratedVisitor; @@ -26944,6 +26946,7 @@ impl<'de> serde::Deserialize<'de> for PacketTrailerFeature { match value { "PTF_USER_TIMESTAMP" => Ok(PacketTrailerFeature::PtfUserTimestamp), "PTF_FRAME_ID" => Ok(PacketTrailerFeature::PtfFrameId), + "PTF_USER_DATA" => Ok(PacketTrailerFeature::PtfUserData), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/livekit/src/room/options.rs b/livekit/src/room/options.rs index 9cfb4ecda..c36ae1725 100644 --- a/livekit/src/room/options.rs +++ b/livekit/src/room/options.rs @@ -77,11 +77,12 @@ pub struct AudioPreset { pub struct FrameMetadataFeatures { pub user_timestamp: bool, pub frame_id: bool, + pub user_data: bool, } impl FrameMetadataFeatures { pub(crate) fn is_empty(&self) -> bool { - !self.user_timestamp && !self.frame_id + !self.user_timestamp && !self.frame_id && !self.user_data } pub(crate) fn to_proto(&self) -> Vec { @@ -95,6 +96,10 @@ impl FrameMetadataFeatures { features.push(proto::PacketTrailerFeature::PtfFrameId); } + if self.user_data { + features.push(proto::PacketTrailerFeature::PtfUserData); + } + features } } diff --git a/livekit/tests/packet_trailer_test.rs b/livekit/tests/packet_trailer_test.rs index cb1b05c8c..eab9f7d77 100644 --- a/livekit/tests/packet_trailer_test.rs +++ b/livekit/tests/packet_trailer_test.rs @@ -14,9 +14,10 @@ //! Packet Trailer E2E Tests //! -//! These tests verify that user_timestamp and frame_id metadata survives the -//! full publish → SFU → subscribe WebRTC pipeline via the packet trailer -//! mechanism, both with and without E2EE. +//! These tests verify that user_timestamp, frame_id, and user_data metadata +//! survives the full publish → SFU → subscribe WebRTC pipeline via the packet +//! trailer mechanism, both with and without E2EE. They also verify that +//! oversize user_data is dropped (not truncated) on the send side. //! //! Run all tests (use --test-threads=1 to avoid local server flakiness): //! livekit-server --dev --node-ip 127.0.0.1 @@ -54,6 +55,13 @@ const TEST_HEIGHT: u32 = 480; struct PacketTrailerTestParams { attach_timestamp: bool, attach_frame_id: bool, + /// Bytes the publisher attaches to each frame. `None` means the + /// `user_data` feature is disabled and no bytes are sent. + user_data: Option>, + /// What the subscriber is expected to observe. `None` means the + /// receiver should see no `user_data` (e.g. the payload was dropped + /// because it exceeded the trailer budget). + expect_user_data: Option>, e2ee: bool, codec: VideoCodec, } @@ -65,6 +73,8 @@ async fn test_timestamp_only_vp8() -> Result<()> { run_packet_trailer_test(PacketTrailerTestParams { attach_timestamp: true, attach_frame_id: false, + user_data: None, + expect_user_data: None, e2ee: false, codec: VideoCodec::VP8, }) @@ -76,6 +86,8 @@ async fn test_frame_id_only_vp8() -> Result<()> { run_packet_trailer_test(PacketTrailerTestParams { attach_timestamp: false, attach_frame_id: true, + user_data: None, + expect_user_data: None, e2ee: false, codec: VideoCodec::VP8, }) @@ -87,6 +99,8 @@ async fn test_timestamp_and_frame_id_vp8() -> Result<()> { run_packet_trailer_test(PacketTrailerTestParams { attach_timestamp: true, attach_frame_id: true, + user_data: None, + expect_user_data: None, e2ee: false, codec: VideoCodec::VP8, }) @@ -98,12 +112,72 @@ async fn test_timestamp_and_frame_id_vp8_e2ee() -> Result<()> { run_packet_trailer_test(PacketTrailerTestParams { attach_timestamp: true, attach_frame_id: true, + user_data: None, + expect_user_data: None, e2ee: true, codec: VideoCodec::VP8, }) .await } +#[test_log::test(tokio::test)] +async fn test_user_data_only_vp8() -> Result<()> { + let payload = b"livekit-user-data".to_vec(); + run_packet_trailer_test(PacketTrailerTestParams { + attach_timestamp: false, + attach_frame_id: false, + user_data: Some(payload.clone()), + expect_user_data: Some(payload), + e2ee: false, + codec: VideoCodec::VP8, + }) + .await +} + +#[test_log::test(tokio::test)] +async fn test_user_data_with_timestamp_and_frame_id_vp8() -> Result<()> { + let payload = b"all-three-features".to_vec(); + run_packet_trailer_test(PacketTrailerTestParams { + attach_timestamp: true, + attach_frame_id: true, + user_data: Some(payload.clone()), + expect_user_data: Some(payload), + e2ee: false, + codec: VideoCodec::VP8, + }) + .await +} + +#[test_log::test(tokio::test)] +async fn test_user_data_vp8_e2ee() -> Result<()> { + let payload = b"encrypted-user-data".to_vec(); + run_packet_trailer_test(PacketTrailerTestParams { + attach_timestamp: true, + attach_frame_id: false, + user_data: Some(payload.clone()), + expect_user_data: Some(payload), + e2ee: true, + codec: VideoCodec::VP8, + }) + .await +} + +/// user_data that exceeds the remaining trailer budget must be dropped on the +/// send side (skip + warn), not truncated. The timestamp TLV is always present +/// so frames still carry metadata; user_data should simply be absent. +#[test_log::test(tokio::test)] +async fn test_user_data_oversize_dropped_vp8() -> Result<()> { + run_packet_trailer_test(PacketTrailerTestParams { + attach_timestamp: false, + attach_frame_id: false, + user_data: Some(vec![0xAB; 250]), + expect_user_data: None, + e2ee: false, + codec: VideoCodec::VP8, + }) + .await +} + // ==================== Implementation ==================== /// Publishes solid-color video frames with packet trailer metadata (user_timestamp @@ -142,6 +216,7 @@ async fn run_packet_trailer_test(params: PacketTrailerTestParams) -> Result<()> let mut frame_metadata_features = FrameMetadataFeatures::default(); frame_metadata_features.user_timestamp = params.attach_timestamp; frame_metadata_features.frame_id = params.attach_frame_id; + frame_metadata_features.user_data = params.user_data.is_some(); let rtc_source = NativeVideoSource::new(VideoResolution { width: TEST_WIDTH, height: TEST_HEIGHT }, false); @@ -168,8 +243,9 @@ async fn run_packet_trailer_test(params: PacketTrailerTestParams) -> Result<()> let rtc_source = rtc_source.clone(); let attach_ts = params.attach_timestamp; let attach_fid = params.attach_frame_id; + let user_data = params.user_data.clone(); async move { - publish_frames(stop_rx, rtc_source, attach_ts, attach_fid).await; + publish_frames(stop_rx, rtc_source, attach_ts, attach_fid, user_data).await; } }); @@ -191,6 +267,7 @@ async fn run_packet_trailer_test(params: PacketTrailerTestParams) -> Result<()> let mut stream = NativeVideoStream::new(remote_track.rtc_track()); let attach_ts = params.attach_timestamp; let attach_fid = params.attach_frame_id; + let expect_user_data = params.expect_user_data.clone(); let validate = async { let mut validated = 0; @@ -223,6 +300,21 @@ async fn run_packet_trailer_test(params: PacketTrailerTestParams) -> Result<()> seen_frame_ids.push(fid); } + match &expect_user_data { + Some(expected) => { + let got = + meta.user_data.as_ref().expect("Expected user_data in frame metadata"); + assert_eq!(got, expected, "user_data should round-trip unchanged"); + } + None => { + assert!( + meta.user_data.is_none(), + "Expected no user_data, got {:?}", + meta.user_data + ); + } + } + validated += 1; if validated >= FRAMES_TO_VALIDATE { break; @@ -276,6 +368,7 @@ async fn publish_frames( rtc_source: NativeVideoSource, attach_timestamp: bool, attach_frame_id: bool, + user_data: Option>, ) { use std::time::{SystemTime, UNIX_EPOCH}; @@ -307,8 +400,12 @@ async fn publish_frames( None }; - let frame_metadata = if user_ts.is_some() || fid.is_some() { - Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid }) + let frame_metadata = if user_ts.is_some() || fid.is_some() || user_data.is_some() { + Some(FrameMetadata { + user_timestamp: user_ts, + frame_id: fid, + user_data: user_data.clone(), + }) } else { None }; diff --git a/webrtc-sys/include/livekit/packet_trailer.h b/webrtc-sys/include/livekit/packet_trailer.h index 5b30701eb..efab8e41d 100644 --- a/webrtc-sys/include/livekit/packet_trailer.h +++ b/webrtc-sys/include/livekit/packet_trailer.h @@ -62,11 +62,20 @@ constexpr size_t kTrailerEnvelopeSize = 5; // TLV tag IDs constexpr uint8_t kTagTimestampUs = 0x01; // value: 8 bytes big-endian uint64 constexpr uint8_t kTagFrameId = 0x02; // value: 4 bytes big-endian uint32 +constexpr uint8_t kTagUserData = 0x03; // value: arbitrary bytes (len <= 255) constexpr size_t kTimestampTlvSize = 10; // tag + len + 8-byte value constexpr size_t kFrameIdTlvSize = 6; // tag + len + 4-byte value +constexpr size_t kUserDataTlvHeaderSize = 2; // tag + len, before value bytes -// Trailer size varies because frame_id is omitted when it is unset (0). +// The trailer length is encoded in a single XORed byte, so the entire +// trailer (TLV region + envelope) can never exceed 255 bytes. user_data +// that would push the trailer past this budget is dropped (see +// AppendTrailer), never truncated. +constexpr size_t kPacketTrailerMaxTotal = 255; + +// Fixed-feature trailer size varies because frame_id is omitted when it is +// unset (0). user_data is variable-length and accounted for separately. constexpr size_t kPacketTrailerMinSize = kTimestampTlvSize + kTrailerEnvelopeSize; constexpr size_t kPacketTrailerMaxSize = @@ -76,6 +85,7 @@ struct PacketTrailerMetadata { uint64_t user_timestamp; uint32_t frame_id; uint32_t ssrc; // SSRC that produced this entry (for simulcast tracking) + std::vector user_data; // arbitrary app-supplied bytes (PTF_USER_DATA) }; /// Frame transformer that appends/extracts packet trailers. @@ -121,7 +131,8 @@ class PacketTrailerTransformer : public webrtc::FrameTransformerInterface { /// in the encoder pipeline. void store_frame_metadata(int64_t capture_timestamp_us, uint64_t user_timestamp, - uint32_t frame_id); + uint32_t frame_id, + rust::Slice user_data); /// Set the observer receiving sender-side publish timing events. void set_publish_timing_observer( @@ -168,7 +179,8 @@ class PacketTrailerTransformer : public webrtc::FrameTransformerInterface { std::vector AppendTrailer( webrtc::ArrayView data, uint64_t user_timestamp, - uint32_t frame_id); + uint32_t frame_id, + const std::vector& user_data); /// Extract and remove frame metadata trailer from frame data std::optional ExtractTrailer( @@ -237,10 +249,16 @@ class PacketTrailerHandler { /// lookup_timestamp() call. Returns 0 if no lookup succeeded. uint32_t last_lookup_frame_id() const; + /// Returns the user_data from the most recent successful + /// lookup_timestamp() call. Returns an empty vector if no lookup + /// succeeded or the frame carried no user_data. + rust::Vec last_lookup_user_data() const; + /// Store frame metadata for a given capture timestamp (sender side). void store_frame_metadata(int64_t capture_timestamp_us, uint64_t user_timestamp, - uint32_t frame_id) const; + uint32_t frame_id, + rust::Slice user_data) const; /// Set the observer receiving sender-side publish timing events. void set_publish_timing_observer( @@ -275,6 +293,7 @@ class PacketTrailerHandler { webrtc::scoped_refptr sender_; webrtc::scoped_refptr receiver_; mutable uint32_t last_frame_id_{0}; + mutable std::vector last_user_data_; }; // Factory functions for Rust FFI diff --git a/webrtc-sys/src/packet_trailer.cpp b/webrtc-sys/src/packet_trailer.cpp index 96ff6418f..549d93b1a 100644 --- a/webrtc-sys/src/packet_trailer.cpp +++ b/webrtc-sys/src/packet_trailer.cpp @@ -97,7 +97,7 @@ void PacketTrailerTransformer::TransformSend( std::vector new_data; if (enabled_.load()) { new_data = AppendTrailer(data, meta_to_embed.user_timestamp, - meta_to_embed.frame_id); + meta_to_embed.frame_id, meta_to_embed.user_data); frame->SetData(webrtc::ArrayView(new_data)); } @@ -238,11 +238,35 @@ void PacketTrailerTransformer::TransformReceive( std::vector PacketTrailerTransformer::AppendTrailer( webrtc::ArrayView data, uint64_t user_timestamp, - uint32_t frame_id) { + uint32_t frame_id, + const std::vector& user_data) { const bool has_frame_id = frame_id != 0; - const size_t trailer_len = kTimestampTlvSize + - (has_frame_id ? kFrameIdTlvSize : 0) + - kTrailerEnvelopeSize; + const size_t fixed_len = kTimestampTlvSize + + (has_frame_id ? kFrameIdTlvSize : 0) + + kTrailerEnvelopeSize; + + // user_data is embedded only if it fits the remaining trailer budget. + // The trailer length is a single byte (max 255 total) and the TLV length + // field is also a single byte; oversize user_data is dropped + logged + // rather than truncated, so the frame is never silently corrupted. + bool embed_user_data = false; + if (!user_data.empty()) { + if (user_data.size() <= 255 && + fixed_len + kUserDataTlvHeaderSize + user_data.size() <= + kPacketTrailerMaxTotal) { + embed_user_data = true; + } else { + RTC_LOG(LS_WARNING) + << "PacketTrailerTransformer::AppendTrailer dropping user_data: " + << user_data.size() << " bytes exceeds remaining trailer budget (" + << (kPacketTrailerMaxTotal - fixed_len - kUserDataTlvHeaderSize) + << " bytes)"; + } + } + + const size_t trailer_len = + fixed_len + + (embed_user_data ? kUserDataTlvHeaderSize + user_data.size() : 0); std::vector result; result.reserve(data.size() + trailer_len); @@ -270,6 +294,15 @@ std::vector PacketTrailerTransformer::AppendTrailer( } } + if (embed_user_data) { + // TLV: user_data (tag=0x03, len=N, N arbitrary bytes) + result.push_back(kTagUserData ^ 0xFF); + result.push_back(static_cast(user_data.size()) ^ 0xFF); + for (uint8_t byte : user_data) { + result.push_back(static_cast(byte ^ 0xFF)); + } + } + // Envelope: trailer_len (1B, XORed) + magic (4B, NOT XORed) result.push_back(static_cast(trailer_len ^ 0xFF)); result.insert(result.end(), std::begin(kPacketTrailerMagic), @@ -333,6 +366,12 @@ std::optional PacketTrailerTransformer::ExtractTrailer( } meta.frame_id = fid; found_any = true; + } else if (tag == kTagUserData) { + meta.user_data.resize(len); + for (uint8_t i = 0; i < len; ++i) { + meta.user_data[i] = static_cast(val[i] ^ 0xFF); + } + found_any = true; } // Unknown tags are silently skipped. @@ -401,7 +440,8 @@ std::optional PacketTrailerTransformer::lookup_frame_meta void PacketTrailerTransformer::store_frame_metadata( int64_t capture_timestamp_us, uint64_t user_timestamp, - uint32_t frame_id) { + uint32_t frame_id, + rust::Slice user_data) { // Truncate to millisecond precision to match what WebRTC stores // internally. The encoder pipeline converts the VideoFrame's // timestamp_us to capture_time_ms_ = timestamp_us / 1000, and @@ -426,7 +466,9 @@ void PacketTrailerTransformer::store_frame_metadata( if (send_map_.find(key) == send_map_.end()) { send_map_order_.push_back(key); } - send_map_[key] = PacketTrailerMetadata{user_timestamp, frame_id, 0}; + send_map_[key] = PacketTrailerMetadata{ + user_timestamp, frame_id, 0, + std::vector(user_data.begin(), user_data.end())}; } void PacketTrailerTransformer::set_publish_timing_observer( @@ -554,6 +596,7 @@ uint64_t PacketTrailerHandler::lookup_timestamp(uint32_t rtp_timestamp) const { auto meta = transformer_->lookup_frame_metadata(rtp_timestamp); if (meta.has_value()) { last_frame_id_ = meta->frame_id; + last_user_data_ = meta->user_data; return meta->user_timestamp; } return UINT64_MAX; @@ -563,11 +606,22 @@ uint32_t PacketTrailerHandler::last_lookup_frame_id() const { return last_frame_id_; } +rust::Vec PacketTrailerHandler::last_lookup_user_data() const { + rust::Vec out; + out.reserve(last_user_data_.size()); + for (uint8_t byte : last_user_data_) { + out.push_back(byte); + } + return out; +} + void PacketTrailerHandler::store_frame_metadata( int64_t capture_timestamp_us, uint64_t user_timestamp, - uint32_t frame_id) const { - transformer_->store_frame_metadata(capture_timestamp_us, user_timestamp, frame_id); + uint32_t frame_id, + rust::Slice user_data) const { + transformer_->store_frame_metadata(capture_timestamp_us, user_timestamp, + frame_id, user_data); } void PacketTrailerHandler::set_publish_timing_observer( diff --git a/webrtc-sys/src/packet_trailer.rs b/webrtc-sys/src/packet_trailer.rs index 3a3394415..ceabb5fa8 100644 --- a/webrtc-sys/src/packet_trailer.rs +++ b/webrtc-sys/src/packet_trailer.rs @@ -82,12 +82,17 @@ pub mod ffi { /// lookup_timestamp() call. fn last_lookup_frame_id(self: &PacketTrailerHandler) -> u32; + /// Returns the user_data from the most recent successful + /// lookup_timestamp() call. Empty if none. + fn last_lookup_user_data(self: &PacketTrailerHandler) -> Vec; + /// Store frame metadata for a given capture timestamp (sender side). fn store_frame_metadata( self: &PacketTrailerHandler, capture_timestamp_us: i64, user_timestamp: u64, frame_id: u32, + user_data: &[u8], ); /// Set a callback for sender-side publish timing events. diff --git a/webrtc-sys/src/video_track.cpp b/webrtc-sys/src/video_track.cpp index 4a25ab1ef..44d1351fa 100644 --- a/webrtc-sys/src/video_track.cpp +++ b/webrtc-sys/src/video_track.cpp @@ -151,7 +151,9 @@ bool VideoTrackSource::InternalSource::on_captured_frame( if (frame_metadata.has_packet_trailer && packet_trailer_handler_) { packet_trailer_handler_->store_frame_metadata( aligned_timestamp_us, frame_metadata.user_timestamp, - frame_metadata.frame_id); + frame_metadata.frame_id, + rust::Slice(frame_metadata.user_data.data(), + frame_metadata.user_data.size())); } webrtc::scoped_refptr buffer = diff --git a/webrtc-sys/src/video_track.rs b/webrtc-sys/src/video_track.rs index 9453d5766..6c0a584e9 100644 --- a/webrtc-sys/src/video_track.rs +++ b/webrtc-sys/src/video_track.rs @@ -47,6 +47,7 @@ pub mod ffi { pub has_packet_trailer: bool, pub user_timestamp: u64, pub frame_id: u32, + pub user_data: Vec, } extern "C++" { From 6bf5b383c25285163ea7381e794284bd8798df06 Mon Sep 17 00:00:00 2001 From: David Chen Date: Fri, 19 Jun 2026 14:40:46 -0700 Subject: [PATCH 02/11] fix max length --- webrtc-sys/include/livekit/packet_trailer.h | 4 ++- webrtc-sys/src/packet_trailer.cpp | 34 +++++++++++---------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/webrtc-sys/include/livekit/packet_trailer.h b/webrtc-sys/include/livekit/packet_trailer.h index efab8e41d..0515aee8d 100644 --- a/webrtc-sys/include/livekit/packet_trailer.h +++ b/webrtc-sys/include/livekit/packet_trailer.h @@ -62,7 +62,9 @@ constexpr size_t kTrailerEnvelopeSize = 5; // TLV tag IDs constexpr uint8_t kTagTimestampUs = 0x01; // value: 8 bytes big-endian uint64 constexpr uint8_t kTagFrameId = 0x02; // value: 4 bytes big-endian uint32 -constexpr uint8_t kTagUserData = 0x03; // value: arbitrary bytes (len <= 255) +constexpr uint8_t kTagUserData = 0x03; // value: arbitrary bytes, bounded + // by the remaining trailer budget + // (255 - fixed TLVs - envelope - 2) constexpr size_t kTimestampTlvSize = 10; // tag + len + 8-byte value constexpr size_t kFrameIdTlvSize = 6; // tag + len + 4-byte value diff --git a/webrtc-sys/src/packet_trailer.cpp b/webrtc-sys/src/packet_trailer.cpp index 549d93b1a..e179ae308 100644 --- a/webrtc-sys/src/packet_trailer.cpp +++ b/webrtc-sys/src/packet_trailer.cpp @@ -246,22 +246,24 @@ std::vector PacketTrailerTransformer::AppendTrailer( kTrailerEnvelopeSize; // user_data is embedded only if it fits the remaining trailer budget. - // The trailer length is a single byte (max 255 total) and the TLV length - // field is also a single byte; oversize user_data is dropped + logged - // rather than truncated, so the frame is never silently corrupted. - bool embed_user_data = false; - if (!user_data.empty()) { - if (user_data.size() <= 255 && - fixed_len + kUserDataTlvHeaderSize + user_data.size() <= - kPacketTrailerMaxTotal) { - embed_user_data = true; - } else { - RTC_LOG(LS_WARNING) - << "PacketTrailerTransformer::AppendTrailer dropping user_data: " - << user_data.size() << " bytes exceeds remaining trailer budget (" - << (kPacketTrailerMaxTotal - fixed_len - kUserDataTlvHeaderSize) - << " bytes)"; - } + // The whole trailer length is a single byte (255 max), so after the + // always-present timestamp TLV, the optional frame_id TLV, the envelope + // and this TLV's own 2-byte header, the value can never approach 255 -- + // the real cap is (255 - fixed_len - 2), at most ~238 bytes. Oversize + // user_data is dropped + logged rather than truncated, so the frame is + // never silently corrupted. (This bound is always < 256, so the 1-byte + // TLV length field below can't overflow.) + const size_t user_data_budget = + kPacketTrailerMaxTotal > fixed_len + kUserDataTlvHeaderSize + ? kPacketTrailerMaxTotal - fixed_len - kUserDataTlvHeaderSize + : 0; + const bool embed_user_data = + !user_data.empty() && user_data.size() <= user_data_budget; + if (!user_data.empty() && !embed_user_data) { + RTC_LOG(LS_WARNING) + << "PacketTrailerTransformer::AppendTrailer dropping user_data: " + << user_data.size() << " bytes exceeds remaining trailer budget (" + << user_data_budget << " bytes)"; } const size_t trailer_len = From 87ea8d7e73448836858188f5499306ed8fa2c197 Mon Sep 17 00:00:00 2001 From: David Chen Date: Fri, 19 Jun 2026 22:38:32 -0700 Subject: [PATCH 03/11] update local video example to publish user_data --- examples/local_video/src/publisher.rs | 33 ++++++- examples/local_video/src/subscriber.rs | 111 +++++++++++++++++++++- examples/local_video/src/user_data.rs | 81 ++++++++++++++++ examples/local_video/src/video_display.rs | 82 ++++++++++++++++ 4 files changed, 298 insertions(+), 9 deletions(-) create mode 100644 examples/local_video/src/user_data.rs diff --git a/examples/local_video/src/publisher.rs b/examples/local_video/src/publisher.rs index eead6300e..5f85660df 100644 --- a/examples/local_video/src/publisher.rs +++ b/examples/local_video/src/publisher.rs @@ -33,6 +33,7 @@ use yuv_sys; mod argus; mod codec_display; mod test_pattern; +mod user_data; mod timestamp_burn; mod video_display; mod viewport_aspect; @@ -250,6 +251,13 @@ struct Args { #[arg(long, default_value_t = false)] attach_frame_id: bool, + /// Attach keyboard-controlled 6-channel data (6x int16 fixed-point, 12 bytes) + /// as the per-frame user_data trailer field. Control the channels from the + /// preview window: Q/A=CH1, W/S=CH2, E/D=CH3, R/F=CH4, T/G=CH5, Y/H=CH6. + /// Requires --display-video (the window provides keyboard focus). + #[arg(long, default_value_t = false, requires = "display_video")] + attach_user_data: bool, + /// Open a window that displays the video frames being published #[arg(long, default_value_t = false)] display_video: bool, @@ -1181,6 +1189,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { let mut frame_metadata_features = FrameMetadataFeatures::default(); frame_metadata_features.user_timestamp = args.attach_timestamp; frame_metadata_features.frame_id = args.attach_frame_id; + frame_metadata_features.user_data = args.attach_user_data; let publish_opts = |codec: VideoCodec| TrackPublishOptions { source: TrackSource::Camera, @@ -1222,6 +1231,12 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { display_timing: args.display_timing, }; + // Shared keyboard-controlled channel values, written by the preview window + // and read by the capture loop to fill the user_data trailer. + let user_data_channels = args + .attach_user_data + .then(|| Arc::new(Mutex::new([0.0f32; user_data::NUM_CHANNELS]))); + let publish_stats_task = tokio::spawn(update_publisher_video_stats(track.clone(), ctrl_c_received.clone())); @@ -1235,6 +1250,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { session, width, height, + user_data_channels.clone(), ) .await; let _ = publish_stats_task.await; @@ -1264,6 +1280,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { height, Some(shared.clone()), publish_timing_state.clone(), + user_data_channels.clone(), )); let display_result = video_display::run_display( @@ -1271,6 +1288,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { shared, ctrl_c_received.clone(), Some(width as f32 / height as f32), + user_data_channels.clone(), ); let capture_result = capture_task.await?; @@ -1289,6 +1307,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { height, None, publish_timing_state.clone(), + user_data_channels.clone(), ) .await; let _ = publish_stats_task.await; @@ -1310,6 +1329,7 @@ async fn run_capture_loop( height: u32, display_shared: Option>>, publish_timing_state: Option>>, + user_data_channels: Option>>, ) -> Result<()> { // Pace publishing at the requested FPS (not the camera-reported FPS) to hit desired cadence let pace_fps = config.fps as f64; @@ -1573,8 +1593,10 @@ async fn run_capture_loop( if burned_timestamp_us.is_some() { debug_assert_eq!(burned_timestamp_us, Some(capture_wall_time_us)); } - frame.frame_metadata = if user_ts.is_some() || fid.is_some() { - Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid, user_data: None }) + let user_data = + user_data_channels.as_ref().map(|targets| user_data::encode(&targets.lock())); + frame.frame_metadata = if user_ts.is_some() || fid.is_some() || user_data.is_some() { + Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid, user_data }) } else { None }; @@ -1702,6 +1724,7 @@ async fn run_argus_capture_loop( session: argus::ArgusCaptureSession, width: u32, height: u32, + user_data_channels: Option>>, ) -> Result<()> { let capture_handle = std::thread::Builder::new() .name("mipi-capture".into()) @@ -1818,8 +1841,10 @@ async fn run_argus_capture_loop( } else { None }; - let frame_metadata = if user_ts.is_some() || fid.is_some() { - Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid, user_data: None }) + let user_data = + user_data_channels.as_ref().map(|targets| user_data::encode(&targets.lock())); + let frame_metadata = if user_ts.is_some() || fid.is_some() || user_data.is_some() { + Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid, user_data }) } else { None }; diff --git a/examples/local_video/src/subscriber.rs b/examples/local_video/src/subscriber.rs index 54f4fff5e..cceb55d6b 100644 --- a/examples/local_video/src/subscriber.rs +++ b/examples/local_video/src/subscriber.rs @@ -13,7 +13,7 @@ use livekit_api::access_token; use log::{debug, info}; use parking_lot::Mutex; use std::{ - collections::HashMap, + collections::{HashMap, VecDeque}, env, sync::OnceLock, sync::{ @@ -24,6 +24,7 @@ use std::{ }; mod codec_display; +mod user_data; mod subscriber_timing; mod viewport_aspect; @@ -946,7 +947,7 @@ async fn handle_track_subscribed( if drained_frames > 0 { debug!("Dropped {drained_frames} stale decoded frames before render upload"); } - if let Some(metadata) = frame.frame_metadata { + if let Some(metadata) = &frame.frame_metadata { if let Some(capture_timestamp_us) = metadata.user_timestamp { subscriber_timing_sink.record_frame_received_by_sink( capture_timestamp_us, @@ -1120,6 +1121,81 @@ fn subscriber_overlay_lines( Some(lines) } +/// Render a live line graph of the six decoded channel values (top-right overlay). +/// Each trace is normalized so ±`VALUE_RANGE` spans the plot height. +fn paint_channel_graph(ctx: &egui::Context, history: &VecDeque<[f32; user_data::NUM_CHANNELS]>) { + if history.is_empty() { + return; + } + let latest = *history.back().unwrap(); + + egui::Area::new("channel_graph".into()) + .anchor(egui::Align2::RIGHT_TOP, egui::vec2(-10.0, 10.0)) + .interactable(false) + .show(ctx, |ui| { + egui::Frame::NONE + .fill(egui::Color32::from_black_alpha(180)) + .corner_radius(egui::CornerRadius::same(4)) + .inner_margin(egui::Margin::same(8)) + .show(ui, |ui| { + let plot_size = egui::vec2(360.0, 160.0); + // Pin the panel to the plot width so the legend wraps within it + // instead of stretching the frame wider than the graph. + ui.set_max_width(plot_size.x); + + ui.label( + egui::RichText::new("user_data channels") + .monospace() + .size(12.0) + .color(egui::Color32::WHITE), + ); + + let (rect, _) = ui.allocate_exact_size(plot_size, egui::Sense::hover()); + let painter = ui.painter_at(rect); + + // Zero axis. + painter.hline( + rect.x_range(), + rect.center().y, + egui::Stroke::new(1.0, egui::Color32::from_gray(90)), + ); + + let n = history.len(); + let denom = (n.saturating_sub(1)).max(1) as f32; + let half_h = rect.height() / 2.0 - 2.0; + for j in 0..user_data::NUM_CHANNELS { + let points: Vec = history + .iter() + .enumerate() + .map(|(i, sample)| { + let x = rect.left() + (i as f32 / denom) * rect.width(); + let norm = (sample[j] / user_data::VALUE_RANGE) + .clamp(-1.0, 1.0); + let y = rect.center().y - norm * half_h; + egui::pos2(x, y) + }) + .collect(); + painter.add(egui::Shape::line( + points, + egui::Stroke::new(1.5, CHANNEL_COLORS[j]), + )); + } + + // Legend: current value per channel. + ui.horizontal_wrapped(|ui| { + for (j, value) in latest.iter().enumerate() { + ui.label( + egui::RichText::new(format!("CH{}: {:>+6.2}", j + 1, value)) + .monospace() + .size(11.0) + .color(CHANNEL_COLORS[j]), + ); + } + }); + }); + }); +} + fn paint_subscriber_overlay(ctx: &egui::Context, lines: &[String]) { egui::Area::new("subscriber_overlay".into()) .anchor(egui::Align2::LEFT_TOP, egui::vec2(10.0, 10.0)) @@ -1182,6 +1258,19 @@ fn handle_track_unpublished( clear_hud_and_simulcast(shared, frame_slot, video_size, simulcast, subscriber_timing); } +/// Number of channel samples retained for the live graph (~10s at 30fps). +const CHANNEL_HISTORY_LEN: usize = 300; + +/// Distinct colors for the six channel traces. +const CHANNEL_COLORS: [egui::Color32; user_data::NUM_CHANNELS] = [ + egui::Color32::from_rgb(0xef, 0x53, 0x50), // red + egui::Color32::from_rgb(0xff, 0xa7, 0x26), // orange + egui::Color32::from_rgb(0xff, 0xee, 0x58), // yellow + egui::Color32::from_rgb(0x66, 0xbb, 0x6a), // green + egui::Color32::from_rgb(0x42, 0xa5, 0xf5), // blue + egui::Color32::from_rgb(0xab, 0x47, 0xbc), // purple +]; + struct VideoApp { shared: Arc>, frame_slot: Arc, @@ -1192,6 +1281,8 @@ struct VideoApp { ctrl_c_received: Arc, viewport: AspectConstrainedViewport, display_timestamp: bool, + /// Rolling history of decoded channel values from the user_data trailer. + channel_history: VecDeque<[f32; user_data::NUM_CHANNELS]>, } impl eframe::App for VideoApp { @@ -1208,7 +1299,7 @@ impl eframe::App for VideoApp { let render_frame = self.frame_slot.take(); if let Some(frame) = render_frame.as_ref() { - if let Some(metadata) = frame.frame_metadata { + if let Some(metadata) = &frame.frame_metadata { if let Some(capture_timestamp_us) = metadata.user_timestamp { self.subscriber_timing.record_frame_selected_for_render( capture_timestamp_us, @@ -1216,6 +1307,13 @@ impl eframe::App for VideoApp { current_timestamp_us(), ); } + // Decode the 6 user_data channel values for the live graph. + if let Some(values) = metadata.user_data.as_deref().and_then(user_data::decode) { + if self.channel_history.len() >= CHANNEL_HISTORY_LEN { + self.channel_history.pop_front(); + } + self.channel_history.push_back(values); + } } } @@ -1255,6 +1353,8 @@ impl eframe::App for VideoApp { paint_subscriber_overlay(ctx, lines); } + paint_channel_graph(ctx, &self.channel_history); + // Simulcast layer controls: bottom-left overlay egui::Area::new("simulcast_controls".into()) .anchor(egui::Align2::LEFT_BOTTOM, egui::vec2(10.0, -10.0)) @@ -1450,6 +1550,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { ctrl_c_received: ctrl_c_received.clone(), viewport, display_timestamp: args.display_timestamp, + channel_history: VecDeque::with_capacity(CHANNEL_HISTORY_LEN), }; let native_options = viewport_aspect::native_options(None); eframe::run_native( @@ -1797,8 +1898,8 @@ impl CallbackTrait for YuvPaintCallback { let frame_for_upload = self.render_frame.lock().take().map(|frame| { let prepare_timestamp_us = current_timestamp_us(); - let frame_id = frame.frame_metadata.and_then(|m| m.frame_id); - let sample = frame.frame_metadata.and_then(|metadata| { + let frame_id = frame.frame_metadata.as_ref().and_then(|m| m.frame_id); + let sample = frame.frame_metadata.as_ref().and_then(|metadata| { metadata.user_timestamp.map(|capture_timestamp_us| PendingPaintSample { frame_id, capture_timestamp_us, diff --git a/examples/local_video/src/user_data.rs b/examples/local_video/src/user_data.rs new file mode 100644 index 000000000..128004c46 --- /dev/null +++ b/examples/local_video/src/user_data.rs @@ -0,0 +1,81 @@ +//! Shared 6-channel codec for the `--attach-user-data` demo. +//! +//! Six channel values are encoded as little-endian `int16` fixed-point, 2 bytes +//! per channel = 12 bytes total, and shipped in the `user_data` frame-metadata +//! trailer field. The full `int16` range maps to `±VALUE_RANGE`, giving +//! ~1/32767 of the range in resolution — well within the ~232-byte trailer +//! budget. +//! +//! Both the `publisher` and `subscriber` binaries include this file via +//! `mod user_data;` so they agree on the wire format. + +/// Number of channels carried in the user_data payload. +pub const NUM_CHANNELS: usize = 6; + +/// Encoded payload size in bytes (2 bytes per channel). +pub const ENCODED_LEN: usize = NUM_CHANNELS * 2; + +/// Value that maps to `i16::MAX`. Channel values are normalized to +/// `±VALUE_RANGE` before quantization. +pub const VALUE_RANGE: f32 = 1.0; + +/// Value units per `int16` step. +fn scale() -> f32 { + VALUE_RANGE / i16::MAX as f32 +} + +/// Clamp a channel value to the encodable `±VALUE_RANGE` range. +pub fn clamp_value(value: f32) -> f32 { + value.clamp(-VALUE_RANGE, VALUE_RANGE) +} + +/// Encode 6 channel values into 12 little-endian `int16` bytes. +pub fn encode(values: &[f32; NUM_CHANNELS]) -> Vec { + let s = scale(); + let mut buf = Vec::with_capacity(ENCODED_LEN); + for &v in values { + let q = (v / s).round().clamp(i16::MIN as f32, i16::MAX as f32) as i16; + buf.extend_from_slice(&q.to_le_bytes()); + } + buf +} + +/// Decode 6 channel values from the `user_data` payload. Returns `None` if the +/// buffer is too short to hold all six values. +pub fn decode(buf: &[u8]) -> Option<[f32; NUM_CHANNELS]> { + if buf.len() < ENCODED_LEN { + return None; + } + let s = scale(); + let mut out = [0.0f32; NUM_CHANNELS]; + for (i, chunk) in buf.chunks_exact(2).take(NUM_CHANNELS).enumerate() { + out[i] = i16::from_le_bytes([chunk[0], chunk[1]]) as f32 * s; + } + Some(out) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn round_trips_within_quantization_error() { + let values = [0.0, 0.5, -0.75, 1.0, -0.1, 0.9]; + let decoded = decode(&encode(&values)).unwrap(); + for (v, d) in values.iter().zip(decoded.iter()) { + assert!((v - d).abs() <= scale(), "got {d}, expected ~{v}"); + } + } + + #[test] + fn clamp_keeps_within_range() { + assert_eq!(clamp_value(100.0), VALUE_RANGE); + assert_eq!(clamp_value(-100.0), -VALUE_RANGE); + assert_eq!(clamp_value(0.5), 0.5); + } + + #[test] + fn decode_rejects_short_buffer() { + assert!(decode(&[0u8; ENCODED_LEN - 1]).is_none()); + } +} diff --git a/examples/local_video/src/video_display.rs b/examples/local_video/src/video_display.rs index 9acb7fd1e..3d809d0fb 100644 --- a/examples/local_video/src/video_display.rs +++ b/examples/local_video/src/video_display.rs @@ -593,11 +593,54 @@ mod tests { } } +/// Per-channel (increment, decrement) keys: Q/A=CH1, W/S=CH2, E/D=CH3, R/F=CH4, +/// T/G=CH5, Y/H=CH6. +const CHANNEL_KEYS: [(egui::Key, egui::Key); crate::user_data::NUM_CHANNELS] = [ + (egui::Key::Q, egui::Key::A), + (egui::Key::W, egui::Key::S), + (egui::Key::E, egui::Key::D), + (egui::Key::R, egui::Key::F), + (egui::Key::T, egui::Key::G), + (egui::Key::Y, egui::Key::H), +]; + +/// How fast a held key drives a channel (value units/second). Full `±VALUE_RANGE` +/// span in ~2s. +const CHANNEL_RATE_PER_S: f32 = 1.0; + +type ChannelValues = Arc>; + struct VideoApp { shared: Arc>, ctrl_c_received: Arc, viewport: AspectConstrainedViewport, timing_overlay_state: PublisherTimingOverlayState, + /// Keyboard-controlled user_data channel values shared with the capture loop. + channels: Option, +} + +/// Apply held-key deltas to the shared channel values and return the current +/// values for display. Returns `None` when channel control is off. +fn drive_channels( + ctx: &egui::Context, + targets: &ChannelValues, +) -> [f32; crate::user_data::NUM_CHANNELS] { + let dt = ctx.input(|i| i.stable_dt).min(0.1); + let mut values = targets.lock(); + for (idx, (inc, dec)) in CHANNEL_KEYS.iter().enumerate() { + let (inc_down, dec_down) = ctx.input(|i| (i.key_down(*inc), i.key_down(*dec))); + let mut delta = 0.0; + if inc_down { + delta += CHANNEL_RATE_PER_S * dt; + } + if dec_down { + delta -= CHANNEL_RATE_PER_S * dt; + } + if delta != 0.0 { + values[idx] = crate::user_data::clamp_value(values[idx] + delta); + } + } + *values } impl eframe::App for VideoApp { @@ -613,6 +656,8 @@ impl eframe::App for VideoApp { self.viewport.set_video_size(ctx, width, height); } + let channel_values = self.channels.as_ref().map(|targets| drive_channels(ctx, targets)); + egui::CentralPanel::default().frame(egui::Frame::NONE).show(ctx, |ui| { ui.ctx().request_repaint(); @@ -662,21 +707,58 @@ impl eframe::App for VideoApp { }); }); + if let Some(values) = channel_values { + paint_channel_controls(ctx, &values); + } + ctx.request_repaint_after(viewport_aspect::VIDEO_REPAINT_INTERVAL); } } +/// Bottom-left HUD listing the channel key bindings and current values. +fn paint_channel_controls(ctx: &egui::Context, values: &[f32; crate::user_data::NUM_CHANNELS]) { + const KEY_LABELS: [&str; crate::user_data::NUM_CHANNELS] = + ["Q/A", "W/S", "E/D", "R/F", "T/G", "Y/H"]; + let mut lines = vec!["user_data channels".to_string()]; + for (idx, value) in values.iter().enumerate() { + lines.push(format!("CH{} [{}]: {:>+6.2}", idx + 1, KEY_LABELS[idx], value)); + } + + egui::Area::new("channel_controls".into()) + .anchor(egui::Align2::LEFT_BOTTOM, egui::vec2(10.0, -10.0)) + .interactable(false) + .show(ctx, |ui| { + egui::Frame::NONE + .fill(egui::Color32::from_black_alpha(160)) + .corner_radius(egui::CornerRadius::same(4)) + .inner_margin(egui::Margin::same(6)) + .show(ui, |ui| { + ui.add( + egui::Label::new( + egui::RichText::new(lines.join("\n")) + .monospace() + .size(12.0) + .color(egui::Color32::WHITE), + ) + .extend(), + ); + }); + }); +} + pub(crate) fn run_display( title: &str, shared: Arc>, ctrl_c_received: Arc, initial_aspect: Option, + channels: Option, ) -> Result<()> { let app = VideoApp { shared, ctrl_c_received: ctrl_c_received.clone(), viewport: AspectConstrainedViewport::new(initial_aspect), timing_overlay_state: PublisherTimingOverlayState::default(), + channels, }; let native_options = viewport_aspect::native_options(initial_aspect); let result = eframe::run_native(title, native_options, Box::new(|_| Ok(Box::new(app)))); From 8b7a7bd9f325a34e4dc061d8677c327dc280e52c Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 20 Jun 2026 20:48:26 +0000 Subject: [PATCH 04/11] generated protobuf --- livekit-ffi-node-bindings/proto/track_pb.d.ts | 5 +++++ livekit-ffi-node-bindings/proto/track_pb.js | 1 + livekit-ffi-node-bindings/proto/video_frame_pb.d.ts | 5 +++++ livekit-ffi-node-bindings/proto/video_frame_pb.js | 1 + 4 files changed, 12 insertions(+) diff --git a/livekit-ffi-node-bindings/proto/track_pb.d.ts b/livekit-ffi-node-bindings/proto/track_pb.d.ts index 8aaf93650..03c2c8027 100644 --- a/livekit-ffi-node-bindings/proto/track_pb.d.ts +++ b/livekit-ffi-node-bindings/proto/track_pb.d.ts @@ -148,6 +148,11 @@ export declare enum FrameMetadataFeature { * @generated from enum value: FMF_FRAME_ID = 1; */ FMF_FRAME_ID = 1, + + /** + * @generated from enum value: FMF_USER_DATA = 2; + */ + FMF_USER_DATA = 2, } /** diff --git a/livekit-ffi-node-bindings/proto/track_pb.js b/livekit-ffi-node-bindings/proto/track_pb.js index 8ecc19d4d..55d07ff5c 100644 --- a/livekit-ffi-node-bindings/proto/track_pb.js +++ b/livekit-ffi-node-bindings/proto/track_pb.js @@ -87,6 +87,7 @@ const FrameMetadataFeature = /*@__PURE__*/ proto2.makeEnum( [ {no: 0, name: "FMF_USER_TIMESTAMP"}, {no: 1, name: "FMF_FRAME_ID"}, + {no: 2, name: "FMF_USER_DATA"}, ], ); diff --git a/livekit-ffi-node-bindings/proto/video_frame_pb.d.ts b/livekit-ffi-node-bindings/proto/video_frame_pb.d.ts index e0ec12f19..76a56bb4d 100644 --- a/livekit-ffi-node-bindings/proto/video_frame_pb.d.ts +++ b/livekit-ffi-node-bindings/proto/video_frame_pb.d.ts @@ -701,6 +701,11 @@ export declare class FrameMetadata extends Message { */ frameId?: number; + /** + * @generated from field: optional bytes user_data = 3; + */ + userData?: Uint8Array; + constructor(data?: PartialMessage); static readonly runtime: typeof proto2; diff --git a/livekit-ffi-node-bindings/proto/video_frame_pb.js b/livekit-ffi-node-bindings/proto/video_frame_pb.js index 331320682..8cffe1708 100644 --- a/livekit-ffi-node-bindings/proto/video_frame_pb.js +++ b/livekit-ffi-node-bindings/proto/video_frame_pb.js @@ -281,6 +281,7 @@ const FrameMetadata = /*@__PURE__*/ proto2.makeMessageType( () => [ { no: 1, name: "user_timestamp", kind: "scalar", T: 4 /* ScalarType.UINT64 */, opt: true }, { no: 2, name: "frame_id", kind: "scalar", T: 13 /* ScalarType.UINT32 */, opt: true }, + { no: 3, name: "user_data", kind: "scalar", T: 12 /* ScalarType.BYTES */, opt: true }, ], ); From b63418f90ed5ef6344d5345f50c3d9a4cc0a484d Mon Sep 17 00:00:00 2001 From: David Chen Date: Sat, 20 Jun 2026 13:54:27 -0700 Subject: [PATCH 05/11] bump protocol to 1.48.0 --- livekit-protocol/protocol | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit-protocol/protocol b/livekit-protocol/protocol index 4b09446be..df0314e18 160000 --- a/livekit-protocol/protocol +++ b/livekit-protocol/protocol @@ -1 +1 @@ -Subproject commit 4b09446beca5b3b5b02a3c424655f386521ca008 +Subproject commit df0314e189f0ab695005c5edc10f087b5a36ad23 From d96d855cfedcf6bad334454a81cbfcbb628a774e Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 20 Jun 2026 20:56:20 +0000 Subject: [PATCH 06/11] generated protobuf --- livekit-protocol/src/livekit.rs | 101 ++++++-- livekit-protocol/src/livekit.serde.rs | 358 +++++++++++++++++++++++--- 2 files changed, 415 insertions(+), 44 deletions(-) diff --git a/livekit-protocol/src/livekit.rs b/livekit-protocol/src/livekit.rs index 0b06daee6..1af6a6e0e 100644 --- a/livekit-protocol/src/livekit.rs +++ b/livekit-protocol/src/livekit.rs @@ -371,6 +371,10 @@ pub struct ParticipantInfo { /// protocol version used for client feature compatibility #[prost(int32, tag="20")] pub client_protocol: i32, + /// capabilities the participant's client advertises, mirrored from ClientInfo. + /// Lets other participants perform client-side feature detection. + #[prost(enumeration="client_info::Capability", repeated, tag="21")] + pub capabilities: ::prost::alloc::vec::Vec, } /// Nested message and enum types in `ParticipantInfo`. pub mod participant_info { @@ -1165,6 +1169,7 @@ pub mod client_info { pub enum Capability { CapUnused = 0, CapPacketTrailer = 1, + CapCompressionDeflateRaw = 2, } impl Capability { /// String value of the enum field names used in the ProtoBuf definition. @@ -1175,6 +1180,7 @@ pub mod client_info { match self { Capability::CapUnused => "CAP_UNUSED", Capability::CapPacketTrailer => "CAP_PACKET_TRAILER", + Capability::CapCompressionDeflateRaw => "CAP_COMPRESSION_DEFLATE_RAW", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -1182,6 +1188,7 @@ pub mod client_info { match value { "CAP_UNUSED" => Some(Self::CapUnused), "CAP_PACKET_TRAILER" => Some(Self::CapPacketTrailer), + "CAP_COMPRESSION_DEFLATE_RAW" => Some(Self::CapCompressionDeflateRaw), _ => None, } } @@ -1481,6 +1488,13 @@ pub mod data_stream { /// user defined attributes map that can carry additional info #[prost(map="string, string", tag="8")] pub attributes: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, + /// Optional inline content so that a data stream can be sent as a single packet for short payloads. + /// + /// content as binary (bytes) + #[prost(bytes="vec", optional, tag="11")] + pub inline_content: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(enumeration="CompressionType", tag="12")] + pub compression: i32, /// oneof to choose between specific header types #[prost(oneof="header::ContentHeader", tags="9, 10")] pub content_header: ::core::option::Option, @@ -1562,6 +1576,37 @@ pub mod data_stream { } } } + /// The compression type of the whole data stream + /// + /// This will only get populated when send to participants with a + /// client protocol >= 2 which advertise a client capability of CAP_COMPRESSION_DEFLATE_RAW + #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] + #[repr(i32)] + pub enum CompressionType { + None = 0, + /// DEFLATE_RAW = DEFLATE without header+checksum/trailer + DeflateRaw = 1, + } + impl CompressionType { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + CompressionType::None => "NONE", + CompressionType::DeflateRaw => "DEFLATE_RAW", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "NONE" => Some(Self::None), + "DEFLATE_RAW" => Some(Self::DeflateRaw), + _ => None, + } + } + } } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -2199,10 +2244,9 @@ pub struct WebSource { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct MediaSource { + /// TODO: DataConfig data = 4; #[prost(message, optional, tag="3")] pub audio: ::core::option::Option, - #[prost(message, optional, tag="4")] - pub data: ::core::option::Option, #[prost(oneof="media_source::Video", tags="1, 2")] pub video: ::core::option::Option, } @@ -2217,6 +2261,8 @@ pub mod media_source { ParticipantVideo(super::ParticipantVideo), } } +// --- Video Configuration --- + #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ParticipantVideo { @@ -2232,9 +2278,10 @@ pub struct ParticipantVideo { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct AudioConfig { - /// If empty, all audio captured in both channels. - /// If non-empty, only matching audio is captured and routed. Unmatched is excluded. - #[prost(message, repeated, tag="1")] + /// If true, all unmatched audio is recorded to both channels + #[prost(bool, tag="1")] + pub capture_all: bool, + #[prost(message, repeated, tag="2")] pub routes: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] @@ -2263,15 +2310,15 @@ pub mod audio_route { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct DataConfig { - /// If empty, all data tracks captured. - /// If non-empty, only matching data tracks are captured. - #[prost(message, repeated, tag="1")] + #[prost(bool, tag="1")] + pub capture_all: bool, + #[prost(message, repeated, tag="2")] pub selectors: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct DataSelector { - #[prost(oneof="data_selector::Match", tags="1, 2, 3")] + #[prost(oneof="data_selector::Match", tags="1, 2")] pub r#match: ::core::option::Option, } /// Nested message and enum types in `DataSelector`. @@ -2283,8 +2330,6 @@ pub mod data_selector { TrackId(::prost::alloc::string::String), #[prost(string, tag="2")] ParticipantIdentity(::prost::alloc::string::String), - #[prost(string, tag="3")] - Topic(::prost::alloc::string::String), } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -2353,7 +2398,7 @@ pub mod output { Stream(super::StreamOutput), #[prost(message, tag="3")] Segments(super::SegmentedFileOutput), - /// 5 reserved for mcap; + /// TODO: DataOutput data = 5; #[prost(message, tag="4")] Images(super::ImageOutput), } @@ -2405,11 +2450,13 @@ pub struct SegmentedFileOutput { /// disable upload of manifest file (default false) #[prost(bool, tag="8")] pub disable_manifest: bool, + /// TODO: deprecate #[prost(oneof="segmented_file_output::Output", tags="5, 6, 7, 9")] pub output: ::core::option::Option, } /// Nested message and enum types in `SegmentedFileOutput`. pub mod segmented_file_output { + /// TODO: deprecate #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Output { @@ -2448,11 +2495,13 @@ pub struct ImageOutput { /// disable upload of manifest file (default false) #[prost(bool, tag="7")] pub disable_manifest: bool, + /// TODO: deprecate #[prost(oneof="image_output::Output", tags="8, 9, 10, 11")] pub output: ::core::option::Option, } /// Nested message and enum types in `ImageOutput`. pub mod image_output { + /// TODO: deprecate #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Output { @@ -2655,7 +2704,7 @@ pub struct EgressInfo { pub backup_storage_used: bool, #[prost(int32, tag="27")] pub retry_count: i32, - #[prost(oneof="egress_info::Request", tags="30, 4, 14, 19, 5, 6")] + #[prost(oneof="egress_info::Request", tags="29, 30, 4, 14, 19, 5, 6")] pub request: ::core::option::Option, // next ID: 31 @@ -2668,9 +2717,11 @@ pub mod egress_info { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Request { - /// StartEgressRequest egress = 29; + #[prost(message, tag="29")] + Egress(super::StartEgressRequest), #[prost(message, tag="30")] Replay(super::ExportReplayRequest), + /// TODO: deprecate #[prost(message, tag="4")] RoomComposite(super::RoomCompositeEgressRequest), #[prost(message, tag="14")] @@ -2893,7 +2944,7 @@ pub mod export_replay_request { Advanced(super::EncodingOptions), } } -// --- V1 --- +// TODO: deprecate --- V1 --- #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -4597,6 +4648,8 @@ pub struct Job { pub enable_recording: bool, #[prost(string, tag="11")] pub deployment: ::prost::alloc::string::String, + #[prost(map="string, string", tag="12")] + pub attributes: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -4906,6 +4959,8 @@ pub struct CreateAgentDispatchRequest { pub restart_policy: i32, #[prost(string, tag="5")] pub deployment: ::prost::alloc::string::String, + #[prost(map="string, string", tag="6")] + pub attributes: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -4919,6 +4974,8 @@ pub struct RoomAgentDispatch { pub restart_policy: i32, #[prost(string, tag="4")] pub deployment: ::prost::alloc::string::String, + #[prost(map="string, string", tag="5")] + pub attributes: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -4962,6 +5019,8 @@ pub struct AgentDispatch { pub restart_policy: i32, #[prost(string, tag="7")] pub deployment: ::prost::alloc::string::String, + #[prost(map="string, string", tag="8")] + pub attributes: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -5998,8 +6057,11 @@ pub struct SipInboundTrunkInfo { pub max_call_duration: ::core::option::Option<::pbjson_types::Duration>, #[prost(bool, tag="13")] pub krisp_enabled: bool, + #[deprecated] #[prost(enumeration="SipMediaEncryption", tag="16")] pub media_encryption: i32, + #[prost(message, optional, tag="20")] + pub media: ::core::option::Option, #[prost(message, optional, tag="17")] pub created_at: ::core::option::Option<::pbjson_types::Timestamp>, #[prost(message, optional, tag="18")] @@ -6024,8 +6086,11 @@ pub struct SipInboundTrunkUpdate { pub name: ::core::option::Option<::prost::alloc::string::String>, #[prost(string, optional, tag="7")] pub metadata: ::core::option::Option<::prost::alloc::string::String>, + #[deprecated] #[prost(enumeration="SipMediaEncryption", optional, tag="8")] pub media_encryption: ::core::option::Option, + #[prost(message, optional, tag="10")] + pub media: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -6103,8 +6168,11 @@ pub struct SipOutboundTrunkInfo { /// lowercase header names should be used, for example: sip.h.x-custom-header. #[prost(enumeration="SipHeaderOptions", tag="12")] pub include_headers: i32, + #[deprecated] #[prost(enumeration="SipMediaEncryption", tag="13")] pub media_encryption: i32, + #[prost(message, optional, tag="18")] + pub media: ::core::option::Option, /// Optional custom hostname for the 'From' SIP header in outbound INVITEs. /// When set, outbound calls from this trunk will use this host instead of the default project SIP domain. /// Enables originating calls from custom domains. @@ -6134,8 +6202,11 @@ pub struct SipOutboundTrunkUpdate { pub name: ::core::option::Option<::prost::alloc::string::String>, #[prost(string, optional, tag="7")] pub metadata: ::core::option::Option<::prost::alloc::string::String>, + #[deprecated] #[prost(enumeration="SipMediaEncryption", optional, tag="8")] pub media_encryption: ::core::option::Option, + #[prost(message, optional, tag="11")] + pub media: ::core::option::Option, #[prost(string, optional, tag="10")] pub from_host: ::core::option::Option<::prost::alloc::string::String>, } diff --git a/livekit-protocol/src/livekit.serde.rs b/livekit-protocol/src/livekit.serde.rs index 1a3c60d91..ebd07f129 100644 --- a/livekit-protocol/src/livekit.serde.rs +++ b/livekit-protocol/src/livekit.serde.rs @@ -1554,6 +1554,9 @@ impl serde::Serialize for AgentDispatch { if !self.deployment.is_empty() { len += 1; } + if !self.attributes.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("livekit.AgentDispatch", len)?; if !self.id.is_empty() { struct_ser.serialize_field("id", &self.id)?; @@ -1578,6 +1581,9 @@ impl serde::Serialize for AgentDispatch { if !self.deployment.is_empty() { struct_ser.serialize_field("deployment", &self.deployment)?; } + if !self.attributes.is_empty() { + struct_ser.serialize_field("attributes", &self.attributes)?; + } struct_ser.end() } } @@ -1597,6 +1603,7 @@ impl<'de> serde::Deserialize<'de> for AgentDispatch { "restart_policy", "restartPolicy", "deployment", + "attributes", ]; #[allow(clippy::enum_variant_names)] @@ -1608,6 +1615,7 @@ impl<'de> serde::Deserialize<'de> for AgentDispatch { State, RestartPolicy, Deployment, + Attributes, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -1637,6 +1645,7 @@ impl<'de> serde::Deserialize<'de> for AgentDispatch { "state" => Ok(GeneratedField::State), "restartPolicy" | "restart_policy" => Ok(GeneratedField::RestartPolicy), "deployment" => Ok(GeneratedField::Deployment), + "attributes" => Ok(GeneratedField::Attributes), _ => Ok(GeneratedField::__SkipField__), } } @@ -1663,6 +1672,7 @@ impl<'de> serde::Deserialize<'de> for AgentDispatch { let mut state__ = None; let mut restart_policy__ = None; let mut deployment__ = None; + let mut attributes__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Id => { @@ -1707,6 +1717,14 @@ impl<'de> serde::Deserialize<'de> for AgentDispatch { } deployment__ = Some(map_.next_value()?); } + GeneratedField::Attributes => { + if attributes__.is_some() { + return Err(serde::de::Error::duplicate_field("attributes")); + } + attributes__ = Some( + map_.next_value::>()? + ); + } GeneratedField::__SkipField__ => { let _ = map_.next_value::()?; } @@ -1720,6 +1738,7 @@ impl<'de> serde::Deserialize<'de> for AgentDispatch { state: state__, restart_policy: restart_policy__.unwrap_or_default(), deployment: deployment__.unwrap_or_default(), + attributes: attributes__.unwrap_or_default(), }) } } @@ -4906,10 +4925,16 @@ impl serde::Serialize for AudioConfig { { use serde::ser::SerializeStruct; let mut len = 0; + if self.capture_all { + len += 1; + } if !self.routes.is_empty() { len += 1; } let mut struct_ser = serializer.serialize_struct("livekit.AudioConfig", len)?; + if self.capture_all { + struct_ser.serialize_field("captureAll", &self.capture_all)?; + } if !self.routes.is_empty() { struct_ser.serialize_field("routes", &self.routes)?; } @@ -4923,11 +4948,14 @@ impl<'de> serde::Deserialize<'de> for AudioConfig { D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ + "capture_all", + "captureAll", "routes", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { + CaptureAll, Routes, __SkipField__, } @@ -4951,6 +4979,7 @@ impl<'de> serde::Deserialize<'de> for AudioConfig { E: serde::de::Error, { match value { + "captureAll" | "capture_all" => Ok(GeneratedField::CaptureAll), "routes" => Ok(GeneratedField::Routes), _ => Ok(GeneratedField::__SkipField__), } @@ -4971,9 +5000,16 @@ impl<'de> serde::Deserialize<'de> for AudioConfig { where V: serde::de::MapAccess<'de>, { + let mut capture_all__ = None; let mut routes__ = None; while let Some(k) = map_.next_key()? { match k { + GeneratedField::CaptureAll => { + if capture_all__.is_some() { + return Err(serde::de::Error::duplicate_field("captureAll")); + } + capture_all__ = Some(map_.next_value()?); + } GeneratedField::Routes => { if routes__.is_some() { return Err(serde::de::Error::duplicate_field("routes")); @@ -4986,6 +5022,7 @@ impl<'de> serde::Deserialize<'de> for AudioConfig { } } Ok(AudioConfig { + capture_all: capture_all__.unwrap_or_default(), routes: routes__.unwrap_or_default(), }) } @@ -6994,6 +7031,7 @@ impl serde::Serialize for client_info::Capability { let variant = match self { Self::CapUnused => "CAP_UNUSED", Self::CapPacketTrailer => "CAP_PACKET_TRAILER", + Self::CapCompressionDeflateRaw => "CAP_COMPRESSION_DEFLATE_RAW", }; serializer.serialize_str(variant) } @@ -7007,6 +7045,7 @@ impl<'de> serde::Deserialize<'de> for client_info::Capability { const FIELDS: &[&str] = &[ "CAP_UNUSED", "CAP_PACKET_TRAILER", + "CAP_COMPRESSION_DEFLATE_RAW", ]; struct GeneratedVisitor; @@ -7049,6 +7088,7 @@ impl<'de> serde::Deserialize<'de> for client_info::Capability { match value { "CAP_UNUSED" => Ok(client_info::Capability::CapUnused), "CAP_PACKET_TRAILER" => Ok(client_info::Capability::CapPacketTrailer), + "CAP_COMPRESSION_DEFLATE_RAW" => Ok(client_info::Capability::CapCompressionDeflateRaw), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } @@ -8427,6 +8467,9 @@ impl serde::Serialize for CreateAgentDispatchRequest { if !self.deployment.is_empty() { len += 1; } + if !self.attributes.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("livekit.CreateAgentDispatchRequest", len)?; if !self.agent_name.is_empty() { struct_ser.serialize_field("agentName", &self.agent_name)?; @@ -8445,6 +8488,9 @@ impl serde::Serialize for CreateAgentDispatchRequest { if !self.deployment.is_empty() { struct_ser.serialize_field("deployment", &self.deployment)?; } + if !self.attributes.is_empty() { + struct_ser.serialize_field("attributes", &self.attributes)?; + } struct_ser.end() } } @@ -8462,6 +8508,7 @@ impl<'de> serde::Deserialize<'de> for CreateAgentDispatchRequest { "restart_policy", "restartPolicy", "deployment", + "attributes", ]; #[allow(clippy::enum_variant_names)] @@ -8471,6 +8518,7 @@ impl<'de> serde::Deserialize<'de> for CreateAgentDispatchRequest { Metadata, RestartPolicy, Deployment, + Attributes, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -8498,6 +8546,7 @@ impl<'de> serde::Deserialize<'de> for CreateAgentDispatchRequest { "metadata" => Ok(GeneratedField::Metadata), "restartPolicy" | "restart_policy" => Ok(GeneratedField::RestartPolicy), "deployment" => Ok(GeneratedField::Deployment), + "attributes" => Ok(GeneratedField::Attributes), _ => Ok(GeneratedField::__SkipField__), } } @@ -8522,6 +8571,7 @@ impl<'de> serde::Deserialize<'de> for CreateAgentDispatchRequest { let mut metadata__ = None; let mut restart_policy__ = None; let mut deployment__ = None; + let mut attributes__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::AgentName => { @@ -8554,6 +8604,14 @@ impl<'de> serde::Deserialize<'de> for CreateAgentDispatchRequest { } deployment__ = Some(map_.next_value()?); } + GeneratedField::Attributes => { + if attributes__.is_some() { + return Err(serde::de::Error::duplicate_field("attributes")); + } + attributes__ = Some( + map_.next_value::>()? + ); + } GeneratedField::__SkipField__ => { let _ = map_.next_value::()?; } @@ -8565,6 +8623,7 @@ impl<'de> serde::Deserialize<'de> for CreateAgentDispatchRequest { metadata: metadata__.unwrap_or_default(), restart_policy: restart_policy__.unwrap_or_default(), deployment: deployment__.unwrap_or_default(), + attributes: attributes__.unwrap_or_default(), }) } } @@ -10725,10 +10784,16 @@ impl serde::Serialize for DataConfig { { use serde::ser::SerializeStruct; let mut len = 0; + if self.capture_all { + len += 1; + } if !self.selectors.is_empty() { len += 1; } let mut struct_ser = serializer.serialize_struct("livekit.DataConfig", len)?; + if self.capture_all { + struct_ser.serialize_field("captureAll", &self.capture_all)?; + } if !self.selectors.is_empty() { struct_ser.serialize_field("selectors", &self.selectors)?; } @@ -10742,11 +10807,14 @@ impl<'de> serde::Deserialize<'de> for DataConfig { D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ + "capture_all", + "captureAll", "selectors", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { + CaptureAll, Selectors, __SkipField__, } @@ -10770,6 +10838,7 @@ impl<'de> serde::Deserialize<'de> for DataConfig { E: serde::de::Error, { match value { + "captureAll" | "capture_all" => Ok(GeneratedField::CaptureAll), "selectors" => Ok(GeneratedField::Selectors), _ => Ok(GeneratedField::__SkipField__), } @@ -10790,9 +10859,16 @@ impl<'de> serde::Deserialize<'de> for DataConfig { where V: serde::de::MapAccess<'de>, { + let mut capture_all__ = None; let mut selectors__ = None; while let Some(k) = map_.next_key()? { match k { + GeneratedField::CaptureAll => { + if capture_all__.is_some() { + return Err(serde::de::Error::duplicate_field("captureAll")); + } + capture_all__ = Some(map_.next_value()?); + } GeneratedField::Selectors => { if selectors__.is_some() { return Err(serde::de::Error::duplicate_field("selectors")); @@ -10805,6 +10881,7 @@ impl<'de> serde::Deserialize<'de> for DataConfig { } } Ok(DataConfig { + capture_all: capture_all__.unwrap_or_default(), selectors: selectors__.unwrap_or_default(), }) } @@ -11260,9 +11337,6 @@ impl serde::Serialize for DataSelector { data_selector::Match::ParticipantIdentity(v) => { struct_ser.serialize_field("participantIdentity", v)?; } - data_selector::Match::Topic(v) => { - struct_ser.serialize_field("topic", v)?; - } } } struct_ser.end() @@ -11279,14 +11353,12 @@ impl<'de> serde::Deserialize<'de> for DataSelector { "trackId", "participant_identity", "participantIdentity", - "topic", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { TrackId, ParticipantIdentity, - Topic, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -11311,7 +11383,6 @@ impl<'de> serde::Deserialize<'de> for DataSelector { match value { "trackId" | "track_id" => Ok(GeneratedField::TrackId), "participantIdentity" | "participant_identity" => Ok(GeneratedField::ParticipantIdentity), - "topic" => Ok(GeneratedField::Topic), _ => Ok(GeneratedField::__SkipField__), } } @@ -11346,12 +11417,6 @@ impl<'de> serde::Deserialize<'de> for DataSelector { } r#match__ = map_.next_value::<::std::option::Option<_>>()?.map(data_selector::Match::ParticipantIdentity); } - GeneratedField::Topic => { - if r#match__.is_some() { - return Err(serde::de::Error::duplicate_field("topic")); - } - r#match__ = map_.next_value::<::std::option::Option<_>>()?.map(data_selector::Match::Topic); - } GeneratedField::__SkipField__ => { let _ = map_.next_value::()?; } @@ -11711,6 +11776,77 @@ impl<'de> serde::Deserialize<'de> for data_stream::Chunk { deserializer.deserialize_struct("livekit.DataStream.Chunk", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for data_stream::CompressionType { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let variant = match self { + Self::None => "NONE", + Self::DeflateRaw => "DEFLATE_RAW", + }; + serializer.serialize_str(variant) + } +} +impl<'de> serde::Deserialize<'de> for data_stream::CompressionType { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "NONE", + "DEFLATE_RAW", + ]; + + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = data_stream::CompressionType; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) + }) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) + }) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "NONE" => Ok(data_stream::CompressionType::None), + "DEFLATE_RAW" => Ok(data_stream::CompressionType::DeflateRaw), + _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + } + } + } + deserializer.deserialize_any(GeneratedVisitor) + } +} impl serde::Serialize for data_stream::Header { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -11740,6 +11876,12 @@ impl serde::Serialize for data_stream::Header { if !self.attributes.is_empty() { len += 1; } + if self.inline_content.is_some() { + len += 1; + } + if self.compression != 0 { + len += 1; + } if self.content_header.is_some() { len += 1; } @@ -11771,6 +11913,16 @@ impl serde::Serialize for data_stream::Header { if !self.attributes.is_empty() { struct_ser.serialize_field("attributes", &self.attributes)?; } + if let Some(v) = self.inline_content.as_ref() { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("inlineContent", pbjson::private::base64::encode(&v).as_str())?; + } + if self.compression != 0 { + let v = data_stream::CompressionType::try_from(self.compression) + .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.compression)))?; + struct_ser.serialize_field("compression", &v)?; + } if let Some(v) = self.content_header.as_ref() { match v { data_stream::header::ContentHeader::TextHeader(v) => { @@ -11802,6 +11954,9 @@ impl<'de> serde::Deserialize<'de> for data_stream::Header { "encryption_type", "encryptionType", "attributes", + "inline_content", + "inlineContent", + "compression", "text_header", "textHeader", "byte_header", @@ -11817,6 +11972,8 @@ impl<'de> serde::Deserialize<'de> for data_stream::Header { TotalLength, EncryptionType, Attributes, + InlineContent, + Compression, TextHeader, ByteHeader, __SkipField__, @@ -11848,6 +12005,8 @@ impl<'de> serde::Deserialize<'de> for data_stream::Header { "totalLength" | "total_length" => Ok(GeneratedField::TotalLength), "encryptionType" | "encryption_type" => Ok(GeneratedField::EncryptionType), "attributes" => Ok(GeneratedField::Attributes), + "inlineContent" | "inline_content" => Ok(GeneratedField::InlineContent), + "compression" => Ok(GeneratedField::Compression), "textHeader" | "text_header" => Ok(GeneratedField::TextHeader), "byteHeader" | "byte_header" => Ok(GeneratedField::ByteHeader), _ => Ok(GeneratedField::__SkipField__), @@ -11876,6 +12035,8 @@ impl<'de> serde::Deserialize<'de> for data_stream::Header { let mut total_length__ = None; let mut encryption_type__ = None; let mut attributes__ = None; + let mut inline_content__ = None; + let mut compression__ = None; let mut content_header__ = None; while let Some(k) = map_.next_key()? { match k { @@ -11927,6 +12088,20 @@ impl<'de> serde::Deserialize<'de> for data_stream::Header { map_.next_value::>()? ); } + GeneratedField::InlineContent => { + if inline_content__.is_some() { + return Err(serde::de::Error::duplicate_field("inlineContent")); + } + inline_content__ = + map_.next_value::<::std::option::Option<::pbjson::private::BytesDeserialize<_>>>()?.map(|x| x.0) + ; + } + GeneratedField::Compression => { + if compression__.is_some() { + return Err(serde::de::Error::duplicate_field("compression")); + } + compression__ = Some(map_.next_value::()? as i32); + } GeneratedField::TextHeader => { if content_header__.is_some() { return Err(serde::de::Error::duplicate_field("textHeader")); @@ -11954,6 +12129,8 @@ impl<'de> serde::Deserialize<'de> for data_stream::Header { total_length: total_length__, encryption_type: encryption_type__.unwrap_or_default(), attributes: attributes__.unwrap_or_default(), + inline_content: inline_content__, + compression: compression__.unwrap_or_default(), content_header: content_header__, }) } @@ -14944,6 +15121,9 @@ impl serde::Serialize for EgressInfo { } if let Some(v) = self.request.as_ref() { match v { + egress_info::Request::Egress(v) => { + struct_ser.serialize_field("egress", v)?; + } egress_info::Request::Replay(v) => { struct_ser.serialize_field("replay", v)?; } @@ -15020,6 +15200,7 @@ impl<'de> serde::Deserialize<'de> for EgressInfo { "backupStorageUsed", "retry_count", "retryCount", + "egress", "replay", "room_composite", "roomComposite", @@ -15053,6 +15234,7 @@ impl<'de> serde::Deserialize<'de> for EgressInfo { ManifestLocation, BackupStorageUsed, RetryCount, + Egress, Replay, RoomComposite, Web, @@ -15102,6 +15284,7 @@ impl<'de> serde::Deserialize<'de> for EgressInfo { "manifestLocation" | "manifest_location" => Ok(GeneratedField::ManifestLocation), "backupStorageUsed" | "backup_storage_used" => Ok(GeneratedField::BackupStorageUsed), "retryCount" | "retry_count" => Ok(GeneratedField::RetryCount), + "egress" => Ok(GeneratedField::Egress), "replay" => Ok(GeneratedField::Replay), "roomComposite" | "room_composite" => Ok(GeneratedField::RoomComposite), "web" => Ok(GeneratedField::Web), @@ -15270,6 +15453,13 @@ impl<'de> serde::Deserialize<'de> for EgressInfo { Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) ; } + GeneratedField::Egress => { + if request__.is_some() { + return Err(serde::de::Error::duplicate_field("egress")); + } + request__ = map_.next_value::<::std::option::Option<_>>()?.map(egress_info::Request::Egress) +; + } GeneratedField::Replay => { if request__.is_some() { return Err(serde::de::Error::duplicate_field("replay")); @@ -21322,6 +21512,9 @@ impl serde::Serialize for Job { if !self.deployment.is_empty() { len += 1; } + if !self.attributes.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("livekit.Job", len)?; if !self.id.is_empty() { struct_ser.serialize_field("id", &self.id)?; @@ -21358,6 +21551,9 @@ impl serde::Serialize for Job { if !self.deployment.is_empty() { struct_ser.serialize_field("deployment", &self.deployment)?; } + if !self.attributes.is_empty() { + struct_ser.serialize_field("attributes", &self.attributes)?; + } struct_ser.end() } } @@ -21382,6 +21578,7 @@ impl<'de> serde::Deserialize<'de> for Job { "enable_recording", "enableRecording", "deployment", + "attributes", ]; #[allow(clippy::enum_variant_names)] @@ -21397,6 +21594,7 @@ impl<'de> serde::Deserialize<'de> for Job { State, EnableRecording, Deployment, + Attributes, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -21430,6 +21628,7 @@ impl<'de> serde::Deserialize<'de> for Job { "state" => Ok(GeneratedField::State), "enableRecording" | "enable_recording" => Ok(GeneratedField::EnableRecording), "deployment" => Ok(GeneratedField::Deployment), + "attributes" => Ok(GeneratedField::Attributes), _ => Ok(GeneratedField::__SkipField__), } } @@ -21460,6 +21659,7 @@ impl<'de> serde::Deserialize<'de> for Job { let mut state__ = None; let mut enable_recording__ = None; let mut deployment__ = None; + let mut attributes__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Id => { @@ -21528,6 +21728,14 @@ impl<'de> serde::Deserialize<'de> for Job { } deployment__ = Some(map_.next_value()?); } + GeneratedField::Attributes => { + if attributes__.is_some() { + return Err(serde::de::Error::duplicate_field("attributes")); + } + attributes__ = Some( + map_.next_value::>()? + ); + } GeneratedField::__SkipField__ => { let _ = map_.next_value::()?; } @@ -21545,6 +21753,7 @@ impl<'de> serde::Deserialize<'de> for Job { state: state__, enable_recording: enable_recording__.unwrap_or_default(), deployment: deployment__.unwrap_or_default(), + attributes: attributes__.unwrap_or_default(), }) } } @@ -25308,9 +25517,6 @@ impl serde::Serialize for MediaSource { if self.audio.is_some() { len += 1; } - if self.data.is_some() { - len += 1; - } if self.video.is_some() { len += 1; } @@ -25318,9 +25524,6 @@ impl serde::Serialize for MediaSource { if let Some(v) = self.audio.as_ref() { struct_ser.serialize_field("audio", v)?; } - if let Some(v) = self.data.as_ref() { - struct_ser.serialize_field("data", v)?; - } if let Some(v) = self.video.as_ref() { match v { media_source::Video::VideoTrackId(v) => { @@ -25342,7 +25545,6 @@ impl<'de> serde::Deserialize<'de> for MediaSource { { const FIELDS: &[&str] = &[ "audio", - "data", "video_track_id", "videoTrackId", "participant_video", @@ -25352,7 +25554,6 @@ impl<'de> serde::Deserialize<'de> for MediaSource { #[allow(clippy::enum_variant_names)] enum GeneratedField { Audio, - Data, VideoTrackId, ParticipantVideo, __SkipField__, @@ -25378,7 +25579,6 @@ impl<'de> serde::Deserialize<'de> for MediaSource { { match value { "audio" => Ok(GeneratedField::Audio), - "data" => Ok(GeneratedField::Data), "videoTrackId" | "video_track_id" => Ok(GeneratedField::VideoTrackId), "participantVideo" | "participant_video" => Ok(GeneratedField::ParticipantVideo), _ => Ok(GeneratedField::__SkipField__), @@ -25401,7 +25601,6 @@ impl<'de> serde::Deserialize<'de> for MediaSource { V: serde::de::MapAccess<'de>, { let mut audio__ = None; - let mut data__ = None; let mut video__ = None; while let Some(k) = map_.next_key()? { match k { @@ -25411,12 +25610,6 @@ impl<'de> serde::Deserialize<'de> for MediaSource { } audio__ = map_.next_value()?; } - GeneratedField::Data => { - if data__.is_some() { - return Err(serde::de::Error::duplicate_field("data")); - } - data__ = map_.next_value()?; - } GeneratedField::VideoTrackId => { if video__.is_some() { return Err(serde::de::Error::duplicate_field("videoTrackId")); @@ -25437,7 +25630,6 @@ impl<'de> serde::Deserialize<'de> for MediaSource { } Ok(MediaSource { audio: audio__, - data: data__, video: video__, }) } @@ -27387,6 +27579,9 @@ impl serde::Serialize for ParticipantInfo { if self.client_protocol != 0 { len += 1; } + if !self.capabilities.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("livekit.ParticipantInfo", len)?; if !self.sid.is_empty() { struct_ser.serialize_field("sid", &self.sid)?; @@ -27456,6 +27651,13 @@ impl serde::Serialize for ParticipantInfo { if self.client_protocol != 0 { struct_ser.serialize_field("clientProtocol", &self.client_protocol)?; } + if !self.capabilities.is_empty() { + let v = self.capabilities.iter().cloned().map(|v| { + client_info::Capability::try_from(v) + .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", v))) + }).collect::, _>>()?; + struct_ser.serialize_field("capabilities", &v)?; + } struct_ser.end() } } @@ -27491,6 +27693,7 @@ impl<'de> serde::Deserialize<'de> for ParticipantInfo { "dataTracks", "client_protocol", "clientProtocol", + "capabilities", ]; #[allow(clippy::enum_variant_names)] @@ -27513,6 +27716,7 @@ impl<'de> serde::Deserialize<'de> for ParticipantInfo { KindDetails, DataTracks, ClientProtocol, + Capabilities, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -27553,6 +27757,7 @@ impl<'de> serde::Deserialize<'de> for ParticipantInfo { "kindDetails" | "kind_details" => Ok(GeneratedField::KindDetails), "dataTracks" | "data_tracks" => Ok(GeneratedField::DataTracks), "clientProtocol" | "client_protocol" => Ok(GeneratedField::ClientProtocol), + "capabilities" => Ok(GeneratedField::Capabilities), _ => Ok(GeneratedField::__SkipField__), } } @@ -27590,6 +27795,7 @@ impl<'de> serde::Deserialize<'de> for ParticipantInfo { let mut kind_details__ = None; let mut data_tracks__ = None; let mut client_protocol__ = None; + let mut capabilities__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Sid => { @@ -27710,6 +27916,12 @@ impl<'de> serde::Deserialize<'de> for ParticipantInfo { Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) ; } + GeneratedField::Capabilities => { + if capabilities__.is_some() { + return Err(serde::de::Error::duplicate_field("capabilities")); + } + capabilities__ = Some(map_.next_value::>()?.into_iter().map(|x| x as i32).collect()); + } GeneratedField::__SkipField__ => { let _ = map_.next_value::()?; } @@ -27734,6 +27946,7 @@ impl<'de> serde::Deserialize<'de> for ParticipantInfo { kind_details: kind_details__.unwrap_or_default(), data_tracks: data_tracks__.unwrap_or_default(), client_protocol: client_protocol__.unwrap_or_default(), + capabilities: capabilities__.unwrap_or_default(), }) } } @@ -33433,6 +33646,9 @@ impl serde::Serialize for RoomAgentDispatch { if !self.deployment.is_empty() { len += 1; } + if !self.attributes.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("livekit.RoomAgentDispatch", len)?; if !self.agent_name.is_empty() { struct_ser.serialize_field("agentName", &self.agent_name)?; @@ -33448,6 +33664,9 @@ impl serde::Serialize for RoomAgentDispatch { if !self.deployment.is_empty() { struct_ser.serialize_field("deployment", &self.deployment)?; } + if !self.attributes.is_empty() { + struct_ser.serialize_field("attributes", &self.attributes)?; + } struct_ser.end() } } @@ -33464,6 +33683,7 @@ impl<'de> serde::Deserialize<'de> for RoomAgentDispatch { "restart_policy", "restartPolicy", "deployment", + "attributes", ]; #[allow(clippy::enum_variant_names)] @@ -33472,6 +33692,7 @@ impl<'de> serde::Deserialize<'de> for RoomAgentDispatch { Metadata, RestartPolicy, Deployment, + Attributes, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -33498,6 +33719,7 @@ impl<'de> serde::Deserialize<'de> for RoomAgentDispatch { "metadata" => Ok(GeneratedField::Metadata), "restartPolicy" | "restart_policy" => Ok(GeneratedField::RestartPolicy), "deployment" => Ok(GeneratedField::Deployment), + "attributes" => Ok(GeneratedField::Attributes), _ => Ok(GeneratedField::__SkipField__), } } @@ -33521,6 +33743,7 @@ impl<'de> serde::Deserialize<'de> for RoomAgentDispatch { let mut metadata__ = None; let mut restart_policy__ = None; let mut deployment__ = None; + let mut attributes__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::AgentName => { @@ -33547,6 +33770,14 @@ impl<'de> serde::Deserialize<'de> for RoomAgentDispatch { } deployment__ = Some(map_.next_value()?); } + GeneratedField::Attributes => { + if attributes__.is_some() { + return Err(serde::de::Error::duplicate_field("attributes")); + } + attributes__ = Some( + map_.next_value::>()? + ); + } GeneratedField::__SkipField__ => { let _ = map_.next_value::()?; } @@ -33557,6 +33788,7 @@ impl<'de> serde::Deserialize<'de> for RoomAgentDispatch { metadata: metadata__.unwrap_or_default(), restart_policy: restart_policy__.unwrap_or_default(), deployment: deployment__.unwrap_or_default(), + attributes: attributes__.unwrap_or_default(), }) } } @@ -37738,6 +37970,9 @@ impl serde::Serialize for SipInboundTrunkInfo { if self.media_encryption != 0 { len += 1; } + if self.media.is_some() { + len += 1; + } if self.created_at.is_some() { len += 1; } @@ -37800,6 +38035,9 @@ impl serde::Serialize for SipInboundTrunkInfo { .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.media_encryption)))?; struct_ser.serialize_field("mediaEncryption", &v)?; } + if let Some(v) = self.media.as_ref() { + struct_ser.serialize_field("media", v)?; + } if let Some(v) = self.created_at.as_ref() { struct_ser.serialize_field("createdAt", v)?; } @@ -37846,6 +38084,7 @@ impl<'de> serde::Deserialize<'de> for SipInboundTrunkInfo { "krispEnabled", "media_encryption", "mediaEncryption", + "media", "created_at", "createdAt", "updated_at", @@ -37871,6 +38110,7 @@ impl<'de> serde::Deserialize<'de> for SipInboundTrunkInfo { MaxCallDuration, KrispEnabled, MediaEncryption, + Media, CreatedAt, UpdatedAt, __SkipField__, @@ -37912,6 +38152,7 @@ impl<'de> serde::Deserialize<'de> for SipInboundTrunkInfo { "maxCallDuration" | "max_call_duration" => Ok(GeneratedField::MaxCallDuration), "krispEnabled" | "krisp_enabled" => Ok(GeneratedField::KrispEnabled), "mediaEncryption" | "media_encryption" => Ok(GeneratedField::MediaEncryption), + "media" => Ok(GeneratedField::Media), "createdAt" | "created_at" => Ok(GeneratedField::CreatedAt), "updatedAt" | "updated_at" => Ok(GeneratedField::UpdatedAt), _ => Ok(GeneratedField::__SkipField__), @@ -37950,6 +38191,7 @@ impl<'de> serde::Deserialize<'de> for SipInboundTrunkInfo { let mut max_call_duration__ = None; let mut krisp_enabled__ = None; let mut media_encryption__ = None; + let mut media__ = None; let mut created_at__ = None; let mut updated_at__ = None; while let Some(k) = map_.next_key()? { @@ -38062,6 +38304,12 @@ impl<'de> serde::Deserialize<'de> for SipInboundTrunkInfo { } media_encryption__ = Some(map_.next_value::()? as i32); } + GeneratedField::Media => { + if media__.is_some() { + return Err(serde::de::Error::duplicate_field("media")); + } + media__ = map_.next_value()?; + } GeneratedField::CreatedAt => { if created_at__.is_some() { return Err(serde::de::Error::duplicate_field("createdAt")); @@ -38097,6 +38345,7 @@ impl<'de> serde::Deserialize<'de> for SipInboundTrunkInfo { max_call_duration: max_call_duration__, krisp_enabled: krisp_enabled__.unwrap_or_default(), media_encryption: media_encryption__.unwrap_or_default(), + media: media__, created_at: created_at__, updated_at: updated_at__, }) @@ -38140,6 +38389,9 @@ impl serde::Serialize for SipInboundTrunkUpdate { if self.media_encryption.is_some() { len += 1; } + if self.media.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("livekit.SIPInboundTrunkUpdate", len)?; if let Some(v) = self.numbers.as_ref() { struct_ser.serialize_field("numbers", v)?; @@ -38170,6 +38422,9 @@ impl serde::Serialize for SipInboundTrunkUpdate { .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", *v)))?; struct_ser.serialize_field("mediaEncryption", &v)?; } + if let Some(v) = self.media.as_ref() { + struct_ser.serialize_field("media", v)?; + } struct_ser.end() } } @@ -38195,6 +38450,7 @@ impl<'de> serde::Deserialize<'de> for SipInboundTrunkUpdate { "metadata", "media_encryption", "mediaEncryption", + "media", ]; #[allow(clippy::enum_variant_names)] @@ -38208,6 +38464,7 @@ impl<'de> serde::Deserialize<'de> for SipInboundTrunkUpdate { Name, Metadata, MediaEncryption, + Media, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -38239,6 +38496,7 @@ impl<'de> serde::Deserialize<'de> for SipInboundTrunkUpdate { "name" => Ok(GeneratedField::Name), "metadata" => Ok(GeneratedField::Metadata), "mediaEncryption" | "media_encryption" => Ok(GeneratedField::MediaEncryption), + "media" => Ok(GeneratedField::Media), _ => Ok(GeneratedField::__SkipField__), } } @@ -38267,6 +38525,7 @@ impl<'de> serde::Deserialize<'de> for SipInboundTrunkUpdate { let mut name__ = None; let mut metadata__ = None; let mut media_encryption__ = None; + let mut media__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Numbers => { @@ -38323,6 +38582,12 @@ impl<'de> serde::Deserialize<'de> for SipInboundTrunkUpdate { } media_encryption__ = map_.next_value::<::std::option::Option>()?.map(|x| x as i32); } + GeneratedField::Media => { + if media__.is_some() { + return Err(serde::de::Error::duplicate_field("media")); + } + media__ = map_.next_value()?; + } GeneratedField::__SkipField__ => { let _ = map_.next_value::()?; } @@ -38338,6 +38603,7 @@ impl<'de> serde::Deserialize<'de> for SipInboundTrunkUpdate { name: name__, metadata: metadata__, media_encryption: media_encryption__, + media: media__, }) } } @@ -38974,6 +39240,9 @@ impl serde::Serialize for SipOutboundTrunkInfo { if self.media_encryption != 0 { len += 1; } + if self.media.is_some() { + len += 1; + } if !self.from_host.is_empty() { len += 1; } @@ -39032,6 +39301,9 @@ impl serde::Serialize for SipOutboundTrunkInfo { .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.media_encryption)))?; struct_ser.serialize_field("mediaEncryption", &v)?; } + if let Some(v) = self.media.as_ref() { + struct_ser.serialize_field("media", v)?; + } if !self.from_host.is_empty() { struct_ser.serialize_field("fromHost", &self.from_host)?; } @@ -39073,6 +39345,7 @@ impl<'de> serde::Deserialize<'de> for SipOutboundTrunkInfo { "includeHeaders", "media_encryption", "mediaEncryption", + "media", "from_host", "fromHost", "created_at", @@ -39097,6 +39370,7 @@ impl<'de> serde::Deserialize<'de> for SipOutboundTrunkInfo { AttributesToHeaders, IncludeHeaders, MediaEncryption, + Media, FromHost, CreatedAt, UpdatedAt, @@ -39136,6 +39410,7 @@ impl<'de> serde::Deserialize<'de> for SipOutboundTrunkInfo { "attributesToHeaders" | "attributes_to_headers" => Ok(GeneratedField::AttributesToHeaders), "includeHeaders" | "include_headers" => Ok(GeneratedField::IncludeHeaders), "mediaEncryption" | "media_encryption" => Ok(GeneratedField::MediaEncryption), + "media" => Ok(GeneratedField::Media), "fromHost" | "from_host" => Ok(GeneratedField::FromHost), "createdAt" | "created_at" => Ok(GeneratedField::CreatedAt), "updatedAt" | "updated_at" => Ok(GeneratedField::UpdatedAt), @@ -39172,6 +39447,7 @@ impl<'de> serde::Deserialize<'de> for SipOutboundTrunkInfo { let mut attributes_to_headers__ = None; let mut include_headers__ = None; let mut media_encryption__ = None; + let mut media__ = None; let mut from_host__ = None; let mut created_at__ = None; let mut updated_at__ = None; @@ -39267,6 +39543,12 @@ impl<'de> serde::Deserialize<'de> for SipOutboundTrunkInfo { } media_encryption__ = Some(map_.next_value::()? as i32); } + GeneratedField::Media => { + if media__.is_some() { + return Err(serde::de::Error::duplicate_field("media")); + } + media__ = map_.next_value()?; + } GeneratedField::FromHost => { if from_host__.is_some() { return Err(serde::de::Error::duplicate_field("fromHost")); @@ -39305,6 +39587,7 @@ impl<'de> serde::Deserialize<'de> for SipOutboundTrunkInfo { attributes_to_headers: attributes_to_headers__.unwrap_or_default(), include_headers: include_headers__.unwrap_or_default(), media_encryption: media_encryption__.unwrap_or_default(), + media: media__, from_host: from_host__.unwrap_or_default(), created_at: created_at__, updated_at: updated_at__, @@ -39349,6 +39632,9 @@ impl serde::Serialize for SipOutboundTrunkUpdate { if self.media_encryption.is_some() { len += 1; } + if self.media.is_some() { + len += 1; + } if self.from_host.is_some() { len += 1; } @@ -39384,6 +39670,9 @@ impl serde::Serialize for SipOutboundTrunkUpdate { .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", *v)))?; struct_ser.serialize_field("mediaEncryption", &v)?; } + if let Some(v) = self.media.as_ref() { + struct_ser.serialize_field("media", v)?; + } if let Some(v) = self.from_host.as_ref() { struct_ser.serialize_field("fromHost", v)?; } @@ -39410,6 +39699,7 @@ impl<'de> serde::Deserialize<'de> for SipOutboundTrunkUpdate { "metadata", "media_encryption", "mediaEncryption", + "media", "from_host", "fromHost", ]; @@ -39425,6 +39715,7 @@ impl<'de> serde::Deserialize<'de> for SipOutboundTrunkUpdate { Name, Metadata, MediaEncryption, + Media, FromHost, __SkipField__, } @@ -39457,6 +39748,7 @@ impl<'de> serde::Deserialize<'de> for SipOutboundTrunkUpdate { "name" => Ok(GeneratedField::Name), "metadata" => Ok(GeneratedField::Metadata), "mediaEncryption" | "media_encryption" => Ok(GeneratedField::MediaEncryption), + "media" => Ok(GeneratedField::Media), "fromHost" | "from_host" => Ok(GeneratedField::FromHost), _ => Ok(GeneratedField::__SkipField__), } @@ -39486,6 +39778,7 @@ impl<'de> serde::Deserialize<'de> for SipOutboundTrunkUpdate { let mut name__ = None; let mut metadata__ = None; let mut media_encryption__ = None; + let mut media__ = None; let mut from_host__ = None; while let Some(k) = map_.next_key()? { match k { @@ -39543,6 +39836,12 @@ impl<'de> serde::Deserialize<'de> for SipOutboundTrunkUpdate { } media_encryption__ = map_.next_value::<::std::option::Option>()?.map(|x| x as i32); } + GeneratedField::Media => { + if media__.is_some() { + return Err(serde::de::Error::duplicate_field("media")); + } + media__ = map_.next_value()?; + } GeneratedField::FromHost => { if from_host__.is_some() { return Err(serde::de::Error::duplicate_field("fromHost")); @@ -39564,6 +39863,7 @@ impl<'de> serde::Deserialize<'de> for SipOutboundTrunkUpdate { name: name__, metadata: metadata__, media_encryption: media_encryption__, + media: media__, from_host: from_host__, }) } From d9f101a713ca68f6007a3fab5715c6e08328fb6d Mon Sep 17 00:00:00 2001 From: David Chen Date: Mon, 22 Jun 2026 15:25:44 -0700 Subject: [PATCH 07/11] add changeset --- .changeset/frame-metadata-user-data-addition.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/frame-metadata-user-data-addition.md diff --git a/.changeset/frame-metadata-user-data-addition.md b/.changeset/frame-metadata-user-data-addition.md new file mode 100644 index 000000000..8fba9917b --- /dev/null +++ b/.changeset/frame-metadata-user-data-addition.md @@ -0,0 +1,7 @@ +--- +livekit: minor +livekit-ffi: minor +livekit-protocol: minor +--- + +Add `user_data` support to frame metadata, allowing arbitrary application-supplied bytes to be attached to a video frame via the `PTF_USER_DATA` packet trailer feature. From 80fec169aad0f9a264f436abc173877b477f9c06 Mon Sep 17 00:00:00 2001 From: David Chen Date: Mon, 22 Jun 2026 15:33:34 -0700 Subject: [PATCH 08/11] fix build error, new field in proto --- livekit-api/src/services/sip.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/livekit-api/src/services/sip.rs b/livekit-api/src/services/sip.rs index 874c8a14a..81c6d7a34 100644 --- a/livekit-api/src/services/sip.rs +++ b/livekit-api/src/services/sip.rs @@ -204,6 +204,7 @@ impl SIPClient { // TODO: support these attributes include_headers: Default::default(), media_encryption: Default::default(), + media: Default::default(), created_at: Default::default(), updated_at: Default::default(), }), @@ -247,6 +248,7 @@ impl SIPClient { // TODO: support these attributes include_headers: Default::default(), media_encryption: Default::default(), + media: Default::default(), destination_country: Default::default(), created_at: Default::default(), updated_at: Default::default(), From 68e88b5730c174f19e1729e208a5918deed202ef Mon Sep 17 00:00:00 2001 From: David Chen Date: Mon, 22 Jun 2026 15:52:44 -0700 Subject: [PATCH 09/11] fix build issues from protocol bump --- livekit-ffi/src/conversion/room.rs | 2 ++ livekit-uniffi/src/access_token.rs | 1 + livekit/src/room/data_stream/outgoing.rs | 12 ++++++++++++ livekit/src/room/participant/local_participant.rs | 1 + 4 files changed, 16 insertions(+) diff --git a/livekit-ffi/src/conversion/room.rs b/livekit-ffi/src/conversion/room.rs index a42de2342..3059ddf60 100644 --- a/livekit-ffi/src/conversion/room.rs +++ b/livekit-ffi/src/conversion/room.rs @@ -545,6 +545,8 @@ impl From for livekit_protocol::data_stream::Header attributes: msg.attributes, content_header, encryption_type: 0, + inline_content: None, + compression: livekit_protocol::data_stream::CompressionType::None.into(), } } } diff --git a/livekit-uniffi/src/access_token.rs b/livekit-uniffi/src/access_token.rs index 02115a4a1..b5b205877 100644 --- a/livekit-uniffi/src/access_token.rs +++ b/livekit-uniffi/src/access_token.rs @@ -71,6 +71,7 @@ pub struct RoomAgentDispatch { pub metadata: String, pub restart_policy: i32, pub deployment: String, + pub attributes: HashMap, } /// Room configuration diff --git a/livekit/src/room/data_stream/outgoing.rs b/livekit/src/room/data_stream/outgoing.rs index 36cffdad5..56e54ffd8 100644 --- a/livekit/src/room/data_stream/outgoing.rs +++ b/livekit/src/room/data_stream/outgoing.rs @@ -308,6 +308,8 @@ impl OutgoingStreamManager { total_length: None, encryption_type: proto::encryption::Type::None.into(), attributes: options.attributes, + inline_content: None, + compression: proto::data_stream::CompressionType::None.into(), content_header: Some(proto::data_stream::header::ContentHeader::TextHeader( text_header.clone(), )), @@ -334,6 +336,8 @@ impl OutgoingStreamManager { total_length: options.total_length, encryption_type: proto::encryption::Type::None.into(), attributes: options.attributes, + inline_content: None, + compression: proto::data_stream::CompressionType::None.into(), content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader( byte_header.clone(), )), @@ -371,6 +375,8 @@ impl OutgoingStreamManager { total_length: Some(text.bytes().len() as u64), encryption_type: proto::encryption::Type::None.into(), attributes: options.attributes, + inline_content: None, + compression: proto::data_stream::CompressionType::None.into(), content_header: Some(proto::data_stream::header::ContentHeader::TextHeader( text_header.clone(), )), @@ -419,6 +425,8 @@ impl OutgoingStreamManager { total_length: Some(bytes.len() as u64), // not overridable encryption_type: proto::encryption::Type::None.into(), attributes: options.attributes, + inline_content: None, + compression: proto::data_stream::CompressionType::None.into(), content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader( byte_header.clone(), )), @@ -462,6 +470,8 @@ impl OutgoingStreamManager { total_length: Some(file_size as u64), // not overridable encryption_type: proto::encryption::Type::None.into(), attributes: options.attributes, + inline_content: None, + compression: proto::data_stream::CompressionType::None.into(), content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader( byte_header.clone(), )), @@ -523,6 +533,8 @@ mod tests { total_length: None, encryption_type: proto::encryption::Type::None.into(), attributes: HashMap::new(), + inline_content: None, + compression: proto::data_stream::CompressionType::None.into(), content_header: None, }; diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index 956bfc96c..12c4ddad9 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -1011,6 +1011,7 @@ mod tests { frame_metadata_features: FrameMetadataFeatures { user_timestamp: true, frame_id: false, + user_data: false, }, ..Default::default() }; From 5671d3f3b5785034244c35427d08aca0b689b585 Mon Sep 17 00:00:00 2001 From: David Chen Date: Tue, 23 Jun 2026 23:24:18 -0700 Subject: [PATCH 10/11] add check for user data feature --- livekit/src/room/e2ee/manager.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/livekit/src/room/e2ee/manager.rs b/livekit/src/room/e2ee/manager.rs index 2b4f23553..5ee35cd3e 100644 --- a/livekit/src/room/e2ee/manager.rs +++ b/livekit/src/room/e2ee/manager.rs @@ -43,6 +43,7 @@ fn has_packet_trailer_features(features: &[i32]) -> bool { features.iter().any(|f| { *f == PacketTrailerFeature::PtfUserTimestamp as i32 || *f == PacketTrailerFeature::PtfFrameId as i32 + || *f == PacketTrailerFeature::PtfUserData as i32 }) } @@ -375,4 +376,11 @@ mod tests { assert!(needs_video_receiver_packet_trailer_handler(&features)); } + + #[test] + fn receiver_packet_trailer_handler_is_needed_for_user_data() { + let features = [PacketTrailerFeature::PtfUserData as i32]; + + assert!(needs_video_receiver_packet_trailer_handler(&features)); + } } From cc672f9a5edf12789d8b4226ecd271dfa5a25a47 Mon Sep 17 00:00:00 2001 From: David Chen Date: Tue, 23 Jun 2026 23:39:14 -0700 Subject: [PATCH 11/11] fix dupe properties --- livekit-api/src/services/sip.rs | 2 -- livekit/src/room/data_stream/outgoing.rs | 10 ---------- 2 files changed, 12 deletions(-) diff --git a/livekit-api/src/services/sip.rs b/livekit-api/src/services/sip.rs index 1abd175d0..fde982b16 100644 --- a/livekit-api/src/services/sip.rs +++ b/livekit-api/src/services/sip.rs @@ -204,7 +204,6 @@ impl SIPClient { // TODO: support these attributes include_headers: Default::default(), media_encryption: Default::default(), - media: Default::default(), created_at: Default::default(), updated_at: Default::default(), media: Default::default(), @@ -249,7 +248,6 @@ impl SIPClient { // TODO: support these attributes include_headers: Default::default(), media_encryption: Default::default(), - media: Default::default(), destination_country: Default::default(), created_at: Default::default(), updated_at: Default::default(), diff --git a/livekit/src/room/data_stream/outgoing.rs b/livekit/src/room/data_stream/outgoing.rs index 735a9e596..bace26ef2 100644 --- a/livekit/src/room/data_stream/outgoing.rs +++ b/livekit/src/room/data_stream/outgoing.rs @@ -308,8 +308,6 @@ impl OutgoingStreamManager { total_length: None, encryption_type: proto::encryption::Type::None.into(), attributes: options.attributes, - inline_content: None, - compression: proto::data_stream::CompressionType::None.into(), content_header: Some(proto::data_stream::header::ContentHeader::TextHeader( text_header.clone(), )), @@ -339,8 +337,6 @@ impl OutgoingStreamManager { total_length: options.total_length, encryption_type: proto::encryption::Type::None.into(), attributes: options.attributes, - inline_content: None, - compression: proto::data_stream::CompressionType::None.into(), content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader( byte_header.clone(), )), @@ -381,8 +377,6 @@ impl OutgoingStreamManager { total_length: Some(text.bytes().len() as u64), encryption_type: proto::encryption::Type::None.into(), attributes: options.attributes, - inline_content: None, - compression: proto::data_stream::CompressionType::None.into(), content_header: Some(proto::data_stream::header::ContentHeader::TextHeader( text_header.clone(), )), @@ -434,8 +428,6 @@ impl OutgoingStreamManager { total_length: Some(bytes.len() as u64), // not overridable encryption_type: proto::encryption::Type::None.into(), attributes: options.attributes, - inline_content: None, - compression: proto::data_stream::CompressionType::None.into(), content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader( byte_header.clone(), )), @@ -482,8 +474,6 @@ impl OutgoingStreamManager { total_length: Some(file_size as u64), // not overridable encryption_type: proto::encryption::Type::None.into(), attributes: options.attributes, - inline_content: None, - compression: proto::data_stream::CompressionType::None.into(), content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader( byte_header.clone(), )),