Skip to content

Commit bcdc480

Browse files
fix: Prevent MissingInputExceptions when loop restarts
Our FastRabbit loop will retart every 6 hours. This results in child processes loosing their STDIN, so they will throw an exceptions. Since this exception is expected, we don't need to log it.
1 parent 1f90b55 commit bcdc480

3 files changed

Lines changed: 64 additions & 1 deletion

File tree

Classes/Loop.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,14 @@ public function runMessagesOnWorker(Worker $worker)
4545
do {
4646
try {
4747
$message = $this->queue->waitAndReserve($this->timeout);
48-
$worker->executeMessage($message);
48+
if ($message) {
49+
$worker->executeMessage($message);
50+
}
4951
} catch (AMQPTimeoutException $e) {
5052
}
5153

5254
if ($this->exitAfterTimestamp !== null && time() >= $this->exitAfterTimestamp) {
55+
$worker->shutdownObject();
5356
break;
5457
}
5558
} while (true);
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
namespace Netlogix\JobQueue\FastRabbit\NoFoo;
4+
5+
use Flowpack\JobQueue\Common\Command\JobCommandController;
6+
use Neos\Flow\Annotations as Flow;
7+
use Neos\Flow\Aop\JoinPointInterface;
8+
use Neos\Flow\Cli\Exception\StopCommandException;
9+
use Neos\Flow\Reflection\ClassReflection;
10+
use Symfony\Component\Console\Exception\MissingInputException;
11+
12+
/**
13+
* There might be orphaned worker processes when the parent process
14+
* of FastRabbit loop restarts due to its max wait time.
15+
*
16+
* Those are still waiting for message identifiers, which can never
17+
* be fulfilled once their STDIN closes, so a MissingInputException
18+
* gets thrown.
19+
*
20+
* This is expected and will happen every 6 hours due to the current
21+
* configuration. We don't need those exceptions tracked. So redirect
22+
* to StopCommandException instead.
23+
*/
24+
#[Flow\Aspect]
25+
#[Flow\Proxy(false)]
26+
class JobCommandInitializationAspect
27+
{
28+
#[Flow\Around('within(' . JobCommandController::class . ') && method(.*->mapRequestArgumentsToControllerArguments())')]
29+
public function preventLoggingOfMissingInputExceptions(JoinPointInterface $joinPoint): void
30+
{
31+
$jobCommandController = $joinPoint->getProxy();
32+
assert($jobCommandController instanceof JobCommandController);
33+
34+
$reflection = new ClassReflection($jobCommandController);
35+
36+
$commandMethodName = $reflection
37+
->getProperty('commandMethodName')
38+
->getValue($jobCommandController);
39+
40+
if ($commandMethodName !== 'executeCommand') {
41+
$joinPoint->getAdviceChain()->proceed($joinPoint);
42+
return;
43+
}
44+
45+
try {
46+
$joinPoint->getAdviceChain()->proceed($joinPoint);
47+
} catch (MissingInputException $e) {
48+
throw new StopCommandException();
49+
}
50+
}
51+
}

Classes/Worker.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@ public function __construct(
4949
$this->poolSize = max(0, (int) ($queueSettings['poolSize'] ?? 1));
5050
}
5151

52+
public function shutdownObject()
53+
{
54+
foreach ($this->pool as $process) {
55+
$process->terminate();
56+
$process->stdin->close();
57+
}
58+
$this->pool = [];
59+
}
60+
5261
public function prepare(): void
5362
{
5463
$this->fillPool($this->poolSize);

0 commit comments

Comments
 (0)