Skip to content

Commit 731f712

Browse files
feat: Allow sub process to boot without message
The sub process "./flow job:execute" now starts with its "queue" argument but without its "messageCacheIdentifier" argument in an interactive process object. The messaeg identifier gets passed to this process when available, which might be at a later time. This allows the sub process to boot its fraemwork while still waiting for messaegs to come in. While this will increae memory consumption by a bit, it will decrease the wait time for executing a message.
1 parent 2d4198c commit 731f712

2 files changed

Lines changed: 87 additions & 72 deletions

File tree

Classes/Worker.php

Lines changed: 85 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,119 +1,97 @@
11
<?php
2+
23
declare(strict_types=1);
34

45
namespace Netlogix\JobQueue\FastRabbit;
56

67
use Flowpack\JobQueue\Common\Job\JobManager;
78
use Flowpack\JobQueue\Common\Queue\Message;
89
use Neos\Cache\Frontend\FrontendInterface;
9-
use Neos\Flow\Annotations as Flow;
1010
use Neos\Flow\Cli\ConsoleOutput;
11+
use Symfony\Component\Process\InputStream;
12+
use Symfony\Component\Process\Process;
1113
use t3n\JobQueue\RabbitMQ\Queue\RabbitQueue;
1214

13-
/**
14-
* @Flow\Proxy(false)
15-
*/
15+
use function array_shift;
16+
1617
final class Worker
1718
{
18-
/**
19-
* @var string
20-
*/
21-
protected $command;
19+
protected readonly ConsoleOutput $output;
2220

2321
/**
24-
* @var RabbitQueue
22+
* @var array{input: InputStream, process: Process}[]
2523
*/
26-
protected $queue;
24+
private array $pool = [];
2725

2826
/**
29-
* @var array
27+
* A pool size of 1 means one standby while 1 is working.
3028
*/
31-
protected $queueSettings;
32-
33-
/**
34-
* @var FrontendInterface
35-
*/
36-
protected $messageCache;
37-
38-
/**
39-
* @var ConsoleOutput
40-
*/
41-
protected $output;
42-
43-
/**
44-
* @var Lock
45-
*/
46-
private $lock;
29+
protected int $poolSize = 1;
4730

4831
public function __construct(
49-
string $command,
50-
RabbitQueue $queue,
51-
array $queueSettings,
52-
FrontendInterface $messageCache,
53-
Lock $lock
32+
protected readonly string $command,
33+
protected readonly RabbitQueue $queue,
34+
protected readonly array $queueSettings,
35+
protected readonly FrontendInterface $messageCache,
36+
protected readonly Lock $lock
5437
) {
55-
$this->command = $command;
56-
$this->queue = $queue;
57-
$this->queueSettings = $queueSettings;
58-
$this->messageCache = $messageCache;
59-
$this->lock = $lock;
6038
}
6139

62-
public function prepare()
40+
public function prepare(): void
6341
{
42+
$this->cleanPool();
43+
6444
$this->output = new ConsoleOutput();
65-
$this->outputLine('Watching queue <b>"%s"</b>', $this->queue->getName());
45+
$this->output->outputLine('Watching queue <b>"%s"</b>', [$this->queue->getName()]);
6646
}
6747

68-
public function executeMessage(Message $message)
48+
public function executeMessage(Message $message): void
6949
{
7050
$messageCacheIdentifier = sha1(serialize($message));
7151
$this->messageCache->set($messageCacheIdentifier, $message);
7252

73-
$this->lock->run(function() use (&$messageCacheIdentifier, &$commandOutput, &$result) {
74-
exec(
75-
$this->command . ' --messageCacheIdentifier=' . escapeshellarg($messageCacheIdentifier),
76-
$commandOutput,
77-
$result
78-
);
79-
});
53+
$process = $this->lock->run(
54+
fn () => $this->runFromPool($messageCacheIdentifier)
55+
);
8056

81-
if ($result === 0) {
57+
if ($process->getExitCode() === 0) {
8258
$this->queue->finish($message->getIdentifier());
83-
$this->outputLine(
84-
'<success>Successfully executed job "%s" (%s)</success>',
85-
$message->getIdentifier(),
86-
join('', $commandOutput)
59+
$this->output->outputLine(
60+
'<success>Successfully executed job "%s"</success>',
61+
[$message->getIdentifier()]
8762
);
88-
63+
$this->output->outputLine('Output: %s', [$process->getOutput()]);
8964
} else {
9065
$maximumNumberOfReleases = isset($this->queueSettings['maximumNumberOfReleases'])
91-
? (int)$this->queueSettings['maximumNumberOfReleases']
66+
? (int) $this->queueSettings['maximumNumberOfReleases']
9267
: JobManager::DEFAULT_MAXIMUM_NUMBER_RELEASES;
9368

9469
if ($message->getNumberOfReleases() < $maximumNumberOfReleases) {
9570
$releaseOptions = isset($this->queueSettings['releaseOptions']) ? $this->queueSettings['releaseOptions'] : [];
9671
$this->queue->release($message->getIdentifier(), $releaseOptions);
9772
$this->queue->reQueueMessage($message, $releaseOptions);
98-
$this->outputLine(
99-
'Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - RELEASE',
100-
$message->getIdentifier(),
101-
$this->queue->getName(),
102-
$message->getNumberOfReleases() + 1,
103-
$maximumNumberOfReleases + 1
73+
$this->output->outputLine('Output: %s', [$process->getOutput()]);
74+
$this->output->outputLine(
75+
'<error>Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - RELEASE</error>',
76+
[
77+
$message->getIdentifier(),
78+
$this->queue->getName(),
79+
$message->getNumberOfReleases() + 1,
80+
$maximumNumberOfReleases + 1,
81+
]
10482
);
105-
$this->outputLine('<error>Message: %s</error>', join('', $commandOutput));
106-
10783
} else {
10884
$this->queue->abort($message->getIdentifier());
109-
$this->outputLine(
110-
'Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - ABORTING',
111-
$message->getIdentifier(),
112-
$this->queue->getName(),
113-
$message->getNumberOfReleases() + 1,
114-
$maximumNumberOfReleases + 1
85+
$this->output->outputLine('Output: %s', [$process->getOutput()]);
86+
$this->output->outputLine(
87+
'<error>Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - ABORTING</error>',
88+
[
89+
$message->getIdentifier(),
90+
$this->queue->getName(),
91+
$message->getNumberOfReleases() + 1,
92+
$maximumNumberOfReleases + 1,
93+
]
11594
);
116-
$this->outputLine('<error>Message: %s</error>', join('', $commandOutput));
11795
}
11896
}
11997

@@ -122,8 +100,44 @@ public function executeMessage(Message $message)
122100
}
123101
}
124102

125-
protected function outputLine(string $text, ...$arguments)
103+
/**
104+
* @return array{input: InputStream, process: Process}
105+
*/
106+
private function createProcess(): array
107+
{
108+
$input = new InputStream();
109+
$process = Process::fromShellCommandline(
110+
command: $this->command,
111+
input: $input,
112+
timeout: 0
113+
);
114+
$process->start();
115+
return ['input' => $input, 'process' => $process];
116+
}
117+
118+
private function runFromPool(string $messageCacheIdentifier): Process
126119
{
127-
$this->output->outputLine($text, $arguments);
120+
$this->cleanPool();
121+
['input' => $input, 'process' => $process] = array_shift($this->pool);
122+
$this->pool[] = $this->createProcess();
123+
124+
assert($input instanceof InputStream);
125+
assert($process instanceof Process);
126+
127+
$input->write($messageCacheIdentifier . PHP_EOL);
128+
129+
$process->wait();
130+
return $process;
131+
}
132+
133+
private function cleanPool(): void
134+
{
135+
$this->pool = array_filter(
136+
$this->pool,
137+
fn (array $item) => $item['process']->isRunning()
138+
);
139+
while (count($this->pool) < $this->poolSize) {
140+
$this->pool[] = $this->createProcess();
141+
}
128142
}
129143
}

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "netlogix/jobqueue-fast-rabbit",
33
"type": "neos-package",
4-
"description": "Low memory footprint worker for RabbitMQ jobs",
4+
"description": "Worker for RabbitMQ jobs",
55
"license": "MIT",
66
"autoload": {
77
"psr-4": {
@@ -13,6 +13,7 @@
1313
"flowpack/jobqueue-common": "^3.0.0",
1414
"netlogix/supervisor": "^1.0",
1515
"php-amqplib/php-amqplib": "^2.11",
16+
"symfony/process": "^5.4 || ^6.0 || ^7.0",
1617
"t3n/jobqueue-rabbitmq": "^2.3.0",
1718
"php": "~8.2 || ~8.3 || ~8.4"
1819
},

0 commit comments

Comments
 (0)