Skip to content

Commit 9840072

Browse files
feat: Use React/EventLoop for ticks during sub process execution
The previous setting only reacted to data sent from the child process to the parent process via STDOUT and STDERR but had no timeout based callback, so during long-running child processees with no data interaction, the parent could not e.g. keep its database alive.
1 parent 37d9f64 commit 9840072

2 files changed

Lines changed: 22 additions & 4 deletions

File tree

Classes/Worker.php

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,16 @@
88
use Flowpack\JobQueue\Common\Queue\Message;
99
use Neos\Cache\Frontend\FrontendInterface;
1010
use Neos\Flow\Cli\ConsoleOutput;
11+
use React\EventLoop;
1112
use Symfony\Component\Process\InputStream;
1213
use Symfony\Component\Process\Process;
1314
use t3n\JobQueue\RabbitMQ\Queue\RabbitQueue;
1415

1516
use function array_shift;
17+
use function fputs;
18+
19+
use const STDERR;
20+
use const STDOUT;
1621

1722
final class Worker
1823
{
@@ -60,7 +65,6 @@ public function executeMessage(Message $message): void
6065
'<success>Successfully executed job "%s"</success>',
6166
[$message->getIdentifier()]
6267
);
63-
$this->output->outputLine('Output: %s', [$process->getOutput()]);
6468
} else {
6569
$maximumNumberOfReleases = isset($this->queueSettings['maximumNumberOfReleases'])
6670
? (int) $this->queueSettings['maximumNumberOfReleases']
@@ -70,7 +74,6 @@ public function executeMessage(Message $message): void
7074
$releaseOptions = isset($this->queueSettings['releaseOptions']) ? $this->queueSettings['releaseOptions'] : [];
7175
$this->queue->release($message->getIdentifier(), $releaseOptions);
7276
$this->queue->reQueueMessage($message, $releaseOptions);
73-
$this->output->outputLine('Output: %s', [$process->getOutput()]);
7477
$this->output->outputLine(
7578
'<error>Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - RELEASE</error>',
7679
[
@@ -82,7 +85,6 @@ public function executeMessage(Message $message): void
8285
);
8386
} else {
8487
$this->queue->abort($message->getIdentifier());
85-
$this->output->outputLine('Output: %s', [$process->getOutput()]);
8688
$this->output->outputLine(
8789
'<error>Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - ABORTING</error>',
8890
[
@@ -126,7 +128,22 @@ private function runFromPool(string $messageCacheIdentifier): Process
126128

127129
$input->write($messageCacheIdentifier . PHP_EOL);
128130

129-
$process->wait();
131+
$loop = EventLoop\Loop::get();
132+
$loop->addPeriodicTimer(0.01, function (EventLoop\TimerInterface $timer) use ($process, $loop) {
133+
try {
134+
fputs(STDOUT, $process->getIncrementalOutput());
135+
fputs(STDERR, $process->getIncrementalErrorOutput());
136+
} catch (\Throwable $e) {
137+
}
138+
139+
if (!$process->isRunning()) {
140+
$loop->cancelTimer($timer);
141+
$loop->stop();
142+
}
143+
});
144+
145+
$loop->run();
146+
130147
return $process;
131148
}
132149

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"flowpack/jobqueue-common": "^3.0.0",
1414
"netlogix/supervisor": "^1.0",
1515
"php-amqplib/php-amqplib": "^2.11",
16+
"react/event-loop": "^1.5",
1617
"symfony/process": "^5.4 || ^6.0 || ^7.0",
1718
"t3n/jobqueue-rabbitmq": "^2.3.0",
1819
"php": "~8.2 || ~8.3 || ~8.4"

0 commit comments

Comments
 (0)