Skip to content

Commit b88fd4f

Browse files
authored
Merge pull request #22 from cfconrad/fix_race_on_register_process_v2
Fix race on `open3/fork` and `session->register($pid)` call -- second attempt
2 parents 914b0b5 + a37fc97 commit b88fd4f

3 files changed

Lines changed: 41 additions & 7 deletions

File tree

lib/Mojo/IOLoop/ReadWriteProcess.pm

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ sub _open {
120120
my ($self, @args) = @_;
121121
$self->_diag('Execute: ' . (join ', ', map { "'$_'" } @args)) if DEBUG;
122122

123-
$self->session->enable;
123+
$self->on(collect_status => \&_open_collect_status);
124124

125125
my ($wtr, $rdr, $err);
126126
$err = gensym;
@@ -130,7 +130,6 @@ sub _open {
130130
$self->process_id($pid);
131131

132132
# Defered collect of return status and removal of pidfile
133-
$self->on(collect_status => \&_open_collect_status);
134133

135134
return $self unless $self->set_pipes();
136135

@@ -220,7 +219,6 @@ sub _fork {
220219
# Separated handles that could be used for internal comunication.
221220
my ($channel_in, $channel_out);
222221

223-
$self->session->enable;
224222

225223
if ($self->set_pipes) {
226224
$input_pipe = IO::Pipe->new()
@@ -397,7 +395,11 @@ sub exit_status {
397395
sub restart {
398396
$_[0]->is_running ? $_[0]->stop->start : $_[0]->start;
399397
}
400-
sub is_running { $_[0]->process_id ? kill 0 => $_[0]->process_id : 0; }
398+
sub is_running {
399+
my ($self) = shift;
400+
$self->session->consume_collected_info;
401+
$self->process_id ? kill 0 => $self->process_id : 0;
402+
}
401403

402404
sub write_pidfile {
403405
my ($self, $pidfile) = @_;
@@ -472,6 +474,8 @@ sub start {
472474

473475
$self->session->enable_subreaper if $self->subreaper;
474476
$self->_status(undef);
477+
$self->session->enable;
478+
475479

476480
if ($self->code) {
477481
$self->_fork($self->code, @args);

lib/Mojo/IOLoop/ReadWriteProcess/Session.pm

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ has subreaper => 0;
2121
has collect_status => 1;
2222
has orphans => sub { {} };
2323
has process_table => sub { {} };
24+
has collected_info => sub { [] };
2425
has 'handler';
2526

2627
my $singleton;
@@ -52,7 +53,7 @@ sub enable {
5253
$singleton->emit('SIG_CHLD');
5354
return unless $singleton->collect_status;
5455
while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
55-
$singleton->collect($pid => $? => $!);
56+
$singleton->add_collected_info($pid, $?, $!);
5657
}
5758
}
5859
});
@@ -66,7 +67,7 @@ sub _collect {
6667
}
6768

6869
sub collect {
69-
my ($errno, $status, $pid) = (pop, pop, pop);
70+
my ($self, $pid, $status, $errno) = @_;
7071
if ($singleton->resolve($pid)) {
7172
$singleton->_collect($pid => $status => $errno);
7273
$singleton->emit(collected => $singleton->resolve($pid));
@@ -80,6 +81,17 @@ sub collect {
8081
return $singleton;
8182
}
8283

84+
sub consume_collected_info {
85+
while(my $i = shift @{$singleton->collected_info}) {
86+
$singleton->collect(@$i)
87+
}
88+
}
89+
90+
sub add_collected_info {
91+
shift;
92+
push @{$singleton->collected_info}, [@_];
93+
}
94+
8395
# Use as $pid => Mojo::IOLoop::ReadWriteProcess
8496
sub register {
8597
my ($process, $pid) = (pop, pop);
@@ -121,7 +133,7 @@ sub contains {
121133
$singleton->all->grep(sub { $_->pid eq $pid })->size == 1;
122134
}
123135

124-
sub reset { @{+shift}{qw(events orphans process_table)} = ({}, {}, {}) }
136+
sub reset { @{+shift}{qw(events orphans process_table collected_info handler)} = ({}, {}, {}, [], undef) }
125137

126138
# XXX: This should be replaced by PR_GET_CHILD_SUBREAPER
127139
sub disable_subreaper {

t/01_run.t

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,13 @@ subtest 'process execute()' => sub {
301301
$p->stop();
302302
};
303303

304+
subtest 'process(execute =>"/usr/bin/true")' => sub {
305+
use Mojo::IOLoop::ReadWriteProcess qw(process);
306+
plan skip_all => "Missing '/usr/bin/true'" unless -e '/usr/bin/true';
307+
308+
is(process(execute => '/usr/bin/true')->quirkiness(1)->start()->wait_stop()->exit_status(), 0, 'Simple exec of "/usr/bin/true" return 0');
309+
};
310+
304311
subtest 'process code()' => sub {
305312
use Mojo::IOLoop::ReadWriteProcess;
306313
use IO::Select;
@@ -560,4 +567,15 @@ subtest 'process_args' => sub {
560567
is($p->read_all_stdout(), "0$/1$/2$/3$/", '2) Args given as arrayref.');
561568
};
562569

570+
subtest 'process in process' => sub {
571+
my $p = process(sub {
572+
is( process(execute => '/usr/bin/true')->quirkiness(1)->start()->wait_stop()->exit_status(), 0, 'process(execute) from process(code) -- retval check true');
573+
is( process(execute => '/usr/bin/false')->quirkiness(1)->start()->wait_stop()->exit_status(), 1, 'process(execute) from process(code) -- retval check false');
574+
is( process(sub { print 'sub-sub-process'})->start()->wait_stop()->read_all_stdout, 'sub-sub-process', 'process(code) works from process(code)');
575+
print 'DONE';
576+
})->start()->wait_stop();
577+
578+
is ($p->read_all_stdout(), 'DONE', "Use ReadWriteProcess inside of ReadWriteProcess(code=>'')");
579+
};
580+
563581
done_testing;

0 commit comments

Comments
 (0)