@@ -2,37 +2,98 @@ use super::PeerForward;
22use crate :: forward:: rtcp:: RtcpMessage ;
33use crate :: stream:: source:: { MediaPacket , StateChangeEvent } ;
44use anyhow:: Result ;
5+ use anyhow:: anyhow;
56use std:: sync:: Arc ;
67use std:: time:: Duration ;
78use tokio:: sync:: broadcast;
89use tracing:: { debug, error, info, trace, warn} ;
910use webrtc:: util:: Marshal ;
1011
1112const LOG_PACKET_INTERVAL : u64 = 100 ;
12- const CHANNEL_VIDEO_RTP : u8 = 0 ;
13- const CHANNEL_VIDEO_RTCP : u8 = 1 ;
14- const CHANNEL_AUDIO_RTP : u8 = 2 ;
15- const CHANNEL_AUDIO_RTCP : u8 = 3 ;
1613
14+ #[ derive( Debug , Clone , Copy ) ]
15+ struct ChannelMapping {
16+ video_rtp : Option < u8 > ,
17+ video_rtcp : Option < u8 > ,
18+ audio_rtp : Option < u8 > ,
19+ audio_rtcp : Option < u8 > ,
20+ }
21+
22+ impl ChannelMapping {
23+ fn new ( has_video : bool , has_audio : bool ) -> Self {
24+ match ( has_video, has_audio) {
25+ ( true , false ) => Self {
26+ video_rtp : Some ( 0 ) ,
27+ video_rtcp : Some ( 1 ) ,
28+ audio_rtp : None ,
29+ audio_rtcp : None ,
30+ } ,
31+ ( false , true ) => Self {
32+ video_rtp : None ,
33+ video_rtcp : None ,
34+ audio_rtp : Some ( 0 ) ,
35+ audio_rtcp : Some ( 1 ) ,
36+ } ,
37+ ( true , true ) => Self {
38+ video_rtp : Some ( 0 ) ,
39+ video_rtcp : Some ( 1 ) ,
40+ audio_rtp : Some ( 2 ) ,
41+ audio_rtcp : Some ( 3 ) ,
42+ } ,
43+ ( false , false ) => Self {
44+ video_rtp : None ,
45+ video_rtcp : None ,
46+ audio_rtp : None ,
47+ audio_rtcp : None ,
48+ } ,
49+ }
50+ }
51+
52+ fn is_video_rtp ( & self , channel : u8 ) -> bool {
53+ self . video_rtp == Some ( channel)
54+ }
55+
56+ fn is_video_rtcp ( & self , channel : u8 ) -> bool {
57+ self . video_rtcp == Some ( channel)
58+ }
59+
60+ fn is_audio_rtp ( & self , channel : u8 ) -> bool {
61+ self . audio_rtp == Some ( channel)
62+ }
63+
64+ fn is_audio_rtcp ( & self , channel : u8 ) -> bool {
65+ self . audio_rtcp == Some ( channel)
66+ }
67+ }
1768pub struct SourceBridge {
1869 source_id : String ,
1970 forward : Arc < PeerForward > ,
2071 tasks : Arc < tokio:: sync:: Mutex < Vec < tokio:: task:: JoinHandle < ( ) > > > > ,
2172 shutdown_tx : Option < tokio:: sync:: broadcast:: Sender < ( ) > > ,
2273
74+ channel_mapping : ChannelMapping ,
75+
2376 #[ cfg( feature = "source" ) ]
2477 rtcp_to_source_tx : Option < tokio:: sync:: mpsc:: UnboundedSender < Vec < u8 > > > ,
2578 #[ cfg( feature = "source" ) ]
2679 rtcp_ready : Arc < tokio:: sync:: Notify > ,
2780}
2881
2982impl SourceBridge {
30- pub fn new ( source_id : String , forward : Arc < PeerForward > ) -> Self {
83+ pub fn new (
84+ source_id : String ,
85+ forward : Arc < PeerForward > ,
86+ has_video : bool ,
87+ has_audio : bool ,
88+ ) -> Self {
89+ let channel_mapping = ChannelMapping :: new ( has_video, has_audio) ;
90+
3191 Self {
3292 source_id,
3393 forward,
3494 tasks : Arc :: new ( tokio:: sync:: Mutex :: new ( Vec :: new ( ) ) ) ,
3595 shutdown_tx : None ,
96+ channel_mapping,
3697 #[ cfg( feature = "source" ) ]
3798 rtcp_to_source_tx : None ,
3899 #[ cfg( feature = "source" ) ]
@@ -73,9 +134,13 @@ impl SourceBridge {
73134 let forward_clone = self . forward . clone ( ) ;
74135 let source_id_clone = self . source_id . clone ( ) ;
75136 let mut shutdown_rx1 = shutdown_tx. subscribe ( ) ;
137+ let channel_mapping = self . channel_mapping ;
76138
77139 let rtp_task = tokio:: spawn ( async move {
78- info ! ( "[{}] RTP bridging task started" , source_id_clone) ;
140+ info ! (
141+ "[{}] RTP bridging task started with mapping: {:?}" ,
142+ source_id_clone, channel_mapping
143+ ) ;
79144 let mut packet_count = 0u64 ;
80145 let mut video_count = 0u64 ;
81146 let mut audio_count = 0u64 ;
@@ -96,41 +161,36 @@ impl SourceBridge {
96161
97162 let inject_result = match packet {
98163 MediaPacket :: Rtp { channel, data, .. } => {
99- match channel {
100- CHANNEL_VIDEO_RTP => {
101- video_count += 1 ;
102- if video_count % LOG_PACKET_INTERVAL == 1 {
103- debug!(
104- "[{}] Forwarding video packet #{}, size: {}" ,
105- source_id_clone, video_count, data. len( )
106- ) ;
107- }
108- forward_clone. inject_video_rtp( & data) . await
109- }
110- CHANNEL_AUDIO_RTP => {
111- audio_count += 1 ;
112- if audio_count % LOG_PACKET_INTERVAL == 1 {
113- debug!(
114- "[{}] Forwarding audio packet #{}, size: {}" ,
115- source_id_clone, audio_count, data. len( )
116- ) ;
117- }
118- forward_clone. inject_audio_rtp( & data) . await
119- }
120- CHANNEL_VIDEO_RTCP | CHANNEL_AUDIO_RTCP => {
121- trace!(
122- "[{}] Received RTCP packet on channel {}" ,
123- source_id_clone, channel
164+ if channel_mapping. is_video_rtp( channel) {
165+ video_count += 1 ;
166+ if video_count % LOG_PACKET_INTERVAL == 1 {
167+ debug!(
168+ "[{}] Forwarding video packet #{}, size: {}" ,
169+ source_id_clone, video_count, data. len( )
124170 ) ;
125- Ok ( ( ) )
126171 }
127- _ => {
128- warn!(
129- "[{}] Unknown channel: {}" ,
130- source_id_clone, channel
172+ forward_clone. inject_video_rtp( & data) . await . map_err( |e| anyhow!( "{:?}" , e) )
173+ } else if channel_mapping. is_audio_rtp( channel) {
174+ audio_count += 1 ;
175+ if audio_count % LOG_PACKET_INTERVAL == 1 {
176+ debug!(
177+ "[{}] Forwarding audio packet #{}, size: {}" ,
178+ source_id_clone, audio_count, data. len( )
131179 ) ;
132- Ok ( ( ) )
133180 }
181+ forward_clone. inject_audio_rtp( & data) . await . map_err( |e| anyhow!( "{:?}" , e) )
182+ } else if channel_mapping. is_video_rtcp( channel) || channel_mapping. is_audio_rtcp( channel) {
183+ trace!(
184+ "[{}] Received RTCP packet on channel {}" ,
185+ source_id_clone, channel
186+ ) ;
187+ Ok ( ( ) )
188+ } else {
189+ warn!(
190+ "[{}] Unknown channel: {}" ,
191+ source_id_clone, channel
192+ ) ;
193+ Ok ( ( ) )
134194 }
135195 }
136196 } ;
0 commit comments