Skip to content

Commit 724ba2e

Browse files
committed
FEATURE: Exit fast-rabbit loop after 600 seconds
Original: b9c3a8c670e544c5926448bfe96ddbb983d00782
1 parent 43e6a2b commit 724ba2e

2 files changed

Lines changed: 32 additions & 4 deletions

File tree

Classes/Loop.php

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
namespace Netlogix\JobQueue\FastRabbit;
55

66
use Neos\Flow\Annotations as Flow;
7+
use PhpAmqpLib\Exception\AMQPTimeoutException;
78
use t3n\JobQueue\RabbitMQ\Queue\RabbitQueue;
89

910
/**
@@ -13,17 +14,44 @@ final class Loop
1314
{
1415
protected $queue;
1516

16-
public function __construct(RabbitQueue $queue)
17+
/**
18+
* Unix timestamp after which the Loop should exit
19+
*
20+
* @var int|null
21+
*/
22+
protected $exitAfterTimestamp;
23+
24+
/**
25+
* Timeout in seconds when waiting for new messages
26+
*
27+
* @var int|null
28+
*/
29+
protected $timeout;
30+
31+
/**
32+
* @param RabbitQueue $queue The Queue to watch
33+
* @param int $exitAfter Time in seconds after which the loop should exit
34+
*/
35+
public function __construct(RabbitQueue $queue, int $exitAfter = 0)
1736
{
1837
$this->queue = $queue;
38+
$this->exitAfterTimestamp = $exitAfter > 0 ? time() + $exitAfter : null;
39+
$this->timeout = $exitAfter > 0 ? $exitAfter : null;
1940
}
2041

2142
public function runMessagesOnWorker(Worker $worker)
2243
{
2344
$worker->prepare();
2445
do {
25-
$message = $this->queue->waitAndReserve();
26-
$worker->executeMessage($message);
46+
try {
47+
$message = $this->queue->waitAndReserve($this->timeout);
48+
$worker->executeMessage($message);
49+
} catch (AMQPTimeoutException $e) {
50+
}
51+
52+
if ($this->exitAfterTimestamp !== null && time() >= $this->exitAfterTimestamp) {
53+
break;
54+
}
2755
} while (true);
2856
}
2957
}

bin/fast-rabbit

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ $messageCache = CacheFactory::get($config);
2525
$lock = new Lock($config['workerPool']['numberOfWorkers'], $config['workerPool']['lockFileDirectory']);
2626

2727
$worker = new Worker($command, $queue, $queueSettings, $messageCache, $lock);
28-
$loop = new Loop($queue);
28+
$loop = new Loop($queue, 600);
2929

3030
$loop->runMessagesOnWorker($worker);

0 commit comments

Comments
 (0)