@@ -2,6 +2,8 @@ use anyhow::{Context, Result};
22use bytes:: Bytes ;
33use clap:: Parser ;
44use std:: path:: PathBuf ;
5+ use std:: sync:: atomic:: { AtomicBool , Ordering } ;
6+ use std:: sync:: { Arc , Condvar , Mutex } ;
57use url:: Url ;
68
79mod audio;
@@ -77,40 +79,138 @@ async fn run(config: &Config) -> Result<()> {
7779 . connect ( config. url . clone ( ) )
7880 . await ?;
7981
80- // Set up catalog and video encoder .
82+ // Set up catalog and encoders .
8183 let catalog = moq_mux:: CatalogProducer :: new ( & mut broadcast) ?;
8284 let video_encoder = video:: VideoEncoder :: spawn ( broadcast. clone ( ) , catalog. clone ( ) ) ;
8385
86+ // Init ffmpeg and create audio encoder before the blocking thread
87+ // so we can clone its track producer for monitoring.
88+ ffmpeg_next:: init ( ) . context ( "failed to init ffmpeg" ) ?;
89+ let mut audio_encoder = audio:: AudioEncoder :: new ( broadcast. clone ( ) , catalog. clone ( ) , 44100 ) ?;
90+
91+ // Clone track producers for the monitoring task.
92+ let video_track = video_encoder. track . clone ( ) ;
93+ let audio_track = audio_encoder. track ( ) . clone ( ) ;
94+
8495 // Create status track.
8596 let status_track = moq_lite:: Track {
8697 name : "status" . to_string ( ) ,
8798 priority : 10 ,
8899 } ;
89100 let mut status_producer = broadcast. create_track ( status_track) ?;
90101
102+ // Per-track and overall pause signaling.
103+ let video_active = Arc :: new ( AtomicBool :: new ( false ) ) ;
104+ let audio_active = Arc :: new ( AtomicBool :: new ( false ) ) ;
105+ let paused = Arc :: new ( AtomicBool :: new ( true ) ) ; // Start paused until first viewer.
106+ let resume_notify = Arc :: new ( ( Mutex :: new ( ( ) ) , Condvar :: new ( ) ) ) ;
107+
108+ // Monitor video track.
109+ let flag = video_active. clone ( ) ;
110+ let all_paused = paused. clone ( ) ;
111+ let resume = resume_notify. clone ( ) ;
112+ let vt = video_track. clone ( ) ;
113+ tokio:: spawn ( async move {
114+ loop {
115+ if vt. used ( ) . await . is_err ( ) {
116+ break ;
117+ }
118+ tracing:: info!( "resuming video: viewer subscribed" ) ;
119+ flag. store ( true , Ordering :: Release ) ;
120+ all_paused. store ( false , Ordering :: Release ) ;
121+ resume. 1 . notify_all ( ) ;
122+
123+ if vt. unused ( ) . await . is_err ( ) {
124+ break ;
125+ }
126+ tracing:: info!( "pausing video: no viewers" ) ;
127+ flag. store ( false , Ordering :: Release ) ;
128+ }
129+ } ) ;
130+
131+ // Monitor audio track.
132+ let flag = audio_active. clone ( ) ;
133+ let all_paused = paused. clone ( ) ;
134+ let resume = resume_notify. clone ( ) ;
135+ let at = audio_track. clone ( ) ;
136+ tokio:: spawn ( async move {
137+ loop {
138+ if at. used ( ) . await . is_err ( ) {
139+ break ;
140+ }
141+ tracing:: info!( "resuming audio: viewer subscribed" ) ;
142+ flag. store ( true , Ordering :: Release ) ;
143+ all_paused. store ( false , Ordering :: Release ) ;
144+ resume. 1 . notify_all ( ) ;
145+
146+ if at. unused ( ) . await . is_err ( ) {
147+ break ;
148+ }
149+ tracing:: info!( "pausing audio: no viewers" ) ;
150+ flag. store ( false , Ordering :: Release ) ;
151+ }
152+ } ) ;
153+
154+ // Monitor overall pause state (both unused = pause emulation).
155+ {
156+ let paused = paused. clone ( ) ;
157+ let resume = resume_notify. clone ( ) ;
158+ tokio:: spawn ( async move {
159+ loop {
160+ // Wait for BOTH tracks to become unused.
161+ let ( v, a) = tokio:: join!( video_track. unused( ) , audio_track. unused( ) ) ;
162+ if v. is_err ( ) || a. is_err ( ) {
163+ break ;
164+ }
165+ tracing:: info!( "pausing emulation: no viewers" ) ;
166+ paused. store ( true , Ordering :: Release ) ;
167+
168+ // Wait for EITHER track to become used.
169+ tokio:: select! {
170+ Err ( _) = video_track. used( ) => break ,
171+ Err ( _) = audio_track. used( ) => break ,
172+ else => { } ,
173+ }
174+ tracing:: info!( "resuming emulation: viewer connected" ) ;
175+ paused. store ( false , Ordering :: Release ) ;
176+ resume. 1 . notify_all ( ) ;
177+ }
178+ // Ensure emulator thread isn't stuck waiting on resume.
179+ paused. store ( false , Ordering :: Release ) ;
180+ resume. 1 . notify_all ( ) ;
181+ } ) ;
182+ }
183+
91184 // Run the emulator on a blocking thread.
92185 let timeout_secs = config. timeout ;
93186 let emulator_handle = tokio:: task:: spawn_blocking ( move || -> Result < ( ) > {
94- ffmpeg_next:: init ( ) . context ( "failed to init ffmpeg" ) ?;
95-
96187 let mut emu = emulator:: Emulator :: new ( & rom_path) ?;
97188
98- // Set up audio encoder (runs on this thread since Opus encoding is fast).
99- // GB APU typically outputs at ~44100Hz but we'll check.
100- let mut audio_encoder =
101- audio:: AudioEncoder :: new ( broadcast. clone ( ) , catalog. clone ( ) , 44100 ) ?;
102-
103189 let frame_duration = std:: time:: Duration :: from_micros ( 16_742 ) ; // ~59.73fps
104190 let mut next_frame = std:: time:: Instant :: now ( ) ;
105191 let start = std:: time:: Instant :: now ( ) ;
106192 let mut last_input = std:: time:: Instant :: now ( ) ;
107193 let mut last_status = String :: new ( ) ;
108194 let timeout = std:: time:: Duration :: from_secs ( timeout_secs) ;
109- // Per-viewer latency: viewer_id -> (latency_ms, last_seen).
110195 let mut viewer_latency: std:: collections:: HashMap < String , ( f64 , std:: time:: Instant ) > =
111196 std:: collections:: HashMap :: new ( ) ;
112197
113198 loop {
199+ // Pause emulation when no viewers are watching.
200+ if paused. load ( Ordering :: Acquire ) {
201+ tracing:: info!( "pausing encoding" ) ;
202+ let ( lock, cvar) = & * resume_notify;
203+ let mut guard = lock. lock ( ) . unwrap ( ) ;
204+ while paused. load ( Ordering :: Acquire ) {
205+ guard = cvar. wait ( guard) . unwrap ( ) ;
206+ }
207+ tracing:: info!( "resuming encoding" ) ;
208+ // Don't try to catch up after a pause.
209+ next_frame = std:: time:: Instant :: now ( ) ;
210+ // Force a keyframe so new viewers can start decoding.
211+ video_encoder. force_keyframe ( ) ;
212+ }
213+
114214 // Wait for next frame.
115215 let now = std:: time:: Instant :: now ( ) ;
116216 if now < next_frame {
@@ -132,7 +232,6 @@ async fn run(config: &Config) -> Result<()> {
132232 emu. set_buttons ( & viewer_id, buttons. into_iter ( ) . collect ( ) ) ;
133233 last_input = std:: time:: Instant :: now ( ) ;
134234
135- // Calculate end-to-end latency: current time - viewer's displayed time.
136235 let latency = current_ts_ms - ts_ms;
137236 if latency >= 0.0 {
138237 viewer_latency. insert ( viewer_id, ( latency, std:: time:: Instant :: now ( ) ) ) ;
@@ -165,7 +264,7 @@ async fn run(config: &Config) -> Result<()> {
165264 let stale = std:: time:: Duration :: from_secs ( 30 ) ;
166265 viewer_latency. retain ( |_, ( _, last_seen) | last_seen. elapsed ( ) < stale) ;
167266
168- // Publish status with held buttons, idle countdown, and per-viewer latency .
267+ // Publish status.
169268 let held: Vec < _ > = emu. pressed_buttons ( ) . iter ( ) . copied ( ) . collect ( ) ;
170269 let idle_secs = idle_time. as_secs ( ) ;
171270 let remaining = timeout_secs. saturating_sub ( idle_secs) ;
@@ -190,21 +289,26 @@ async fn run(config: &Config) -> Result<()> {
190289 }
191290 }
192291
193- // Grab and publish video frame.
194- let rgba = Bytes :: from ( emu . framebuffer ( ) ) ;
195- let pts_micros = start . elapsed ( ) . as_micros ( ) as u64 ;
196- let ts = hang :: container :: Timestamp :: from_micros ( pts_micros )
197- . context ( "timestamp overflow" ) ? ;
198-
199- // Send to video encoder thread (non-blocking, drop frame if behind).
200- video_encoder . try_frame ( rgba , ts ) ;
292+ // Grab and publish video frame (skip if no video viewers) .
293+ if video_active . load ( Ordering :: Relaxed ) {
294+ let rgba = Bytes :: from ( emu . framebuffer ( ) ) ;
295+ let pts_micros = start . elapsed ( ) . as_micros ( ) as u64 ;
296+ let ts = hang :: container :: Timestamp :: from_micros ( pts_micros )
297+ . context ( "timestamp overflow" ) ? ;
298+ video_encoder . try_frame ( rgba , ts ) ;
299+ }
201300
202- // Grab and encode audio.
203- let samples = emu. audio_samples ( ) ;
204- if !samples. is_empty ( ) {
205- if let Err ( e) = audio_encoder. push_samples ( & samples) {
206- tracing:: warn!( error = %e, "audio encode error" ) ;
301+ // Grab and encode audio (skip if no audio viewers).
302+ if audio_active. load ( Ordering :: Relaxed ) {
303+ let samples = emu. audio_samples ( ) ;
304+ if !samples. is_empty ( ) {
305+ if let Err ( e) = audio_encoder. push_samples ( & samples) {
306+ tracing:: warn!( error = %e, "audio encode error" ) ;
307+ }
207308 }
309+ } else {
310+ // Drain audio buffer even when not encoding to prevent buildup.
311+ emu. audio_samples ( ) ;
208312 }
209313 }
210314 } ) ;
0 commit comments