Skip to content

Commit eb058b6

Browse files
committed
Fixed MistProcMKVExec to work again, now with improved concurrency
1 parent fbcd8f4 commit eb058b6

1 file changed

Lines changed: 56 additions & 27 deletions

File tree

src/process/process_exec.cpp

Lines changed: 56 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,13 @@
88
#include <sys/stat.h> //for stat
99
#include <sys/types.h> //for stat
1010
#include <unistd.h> //for stat
11+
#include <condition_variable>
12+
#include <mutex>
13+
14+
int pipein = -1, pipeout = -1;
1115

12-
int pipein[2], pipeout[2];
16+
std::condition_variable xCV;
17+
std::mutex xMutex;
1318

1419
Util::Config co;
1520
Util::Config conf;
@@ -212,7 +217,14 @@ namespace Mist{
212217
}
213218
args[argCnt] = 0;
214219

215-
execd_proc = Util::Procs::StartPiped(args, &pipein[0], &pipeout[1], &ffer);
220+
221+
{
222+
std::unique_lock<std::mutex> lk(xMutex);
223+
xCV.wait(lk, []() { return conf.is_active && co.is_active; });
224+
execd_proc = Util::Procs::StartPiped(args, &pipein, &pipeout, &ffer);
225+
if (!execd_proc) { return; }
226+
}
227+
xCV.notify_all();
216228

217229
uint64_t lastProcUpdate = Util::bootSecs();
218230
{
@@ -248,12 +260,26 @@ namespace Mist{
248260
void sinkThread(){
249261
Mist::ProcessSink in(&co);
250262
co.getOption("output", true).append("-");
251-
co.activate();
263+
264+
{
265+
std::unique_lock<std::mutex> lk(xMutex);
266+
co.activate();
267+
}
268+
xCV.notify_all();
269+
{
270+
std::unique_lock<std::mutex> lk(xMutex);
271+
xCV.wait(lk, []() { return pipeout != -1 || !co.is_active; });
272+
}
273+
252274
MEDIUM_MSG("Running sink thread...");
253-
in.setInFile(pipeout[0]);
254-
co.is_active = true;
275+
in.setInFile(pipeout);
255276
in.run();
256-
conf.is_active = false;
277+
278+
{
279+
std::unique_lock<std::mutex> lk(xMutex);
280+
conf.is_active = false;
281+
}
282+
xCV.notify_all();
257283
}
258284

259285
void sourceThread(){
@@ -263,12 +289,27 @@ void sourceThread(){
263289
if (Mist::opt.isMember("track_select")){
264290
conf.getOption("target", true).append("-?" + Mist::opt["track_select"].asString());
265291
}
266-
conf.is_active = true;
267-
Socket::Connection c(pipein[1], 0);
292+
{
293+
std::unique_lock<std::mutex> lk(xMutex);
294+
conf.is_active = true;
295+
}
296+
xCV.notify_all();
297+
{
298+
std::unique_lock<std::mutex> lk(xMutex);
299+
xCV.wait(lk, []() { return pipein != -1 || !conf.is_active; });
300+
}
301+
302+
Socket::Connection c(pipein, 0);
268303
Mist::ProcessSource out(c);
304+
269305
MEDIUM_MSG("Running source thread...");
270306
out.run();
271-
co.is_active = false;
307+
308+
{
309+
std::unique_lock<std::mutex> lk(xMutex);
310+
co.is_active = false;
311+
}
312+
xCV.notify_all();
272313
}
273314

274315
int main(int argc, char *argv[]){
@@ -407,19 +448,8 @@ int main(int argc, char *argv[]){
407448
return 1;
408449
}
409450

410-
// create pipe pair before thread
411-
if (pipe(pipein) || pipe(pipeout)){
412-
FAIL_MSG("Could not create pipes for process!");
413-
return 1;
414-
}
415-
Util::Procs::socketList.insert(pipeout[0]);
416-
Util::Procs::socketList.insert(pipeout[1]);
417-
Util::Procs::socketList.insert(pipein[0]);
418-
Util::Procs::socketList.insert(pipein[1]);
419-
420451
// stream which connects to input
421452
std::thread source(sourceThread);
422-
Util::sleep(500);
423453

424454
// needs to pass through encoder to outputEBML
425455
std::thread sink(sinkThread);
@@ -429,14 +459,13 @@ int main(int argc, char *argv[]){
429459
// run process
430460
Enc.Run();
431461

432-
co.is_active = false;
433-
conf.is_active = false;
462+
{
463+
std::unique_lock<std::mutex> lk(xMutex);
464+
co.is_active = false;
465+
conf.is_active = false;
466+
}
467+
xCV.notify_all();
434468

435-
// close pipes
436-
close(pipein[0]);
437-
close(pipeout[0]);
438-
close(pipein[1]);
439-
close(pipeout[1]);
440469

441470
source.join();
442471
HIGH_MSG("source thread joined");

0 commit comments

Comments
 (0)