Skip to content

Commit 5c5d67b

Browse files
authored
feat: Add option to expire worker when empty queue (#25)
* feat: Add receiver to stop the worker when queue is empty for a among of time * fix: Fix wrong sentence and add type on new receiver * doc: Fix wrong word
1 parent 0ffc202 commit 5c5d67b

5 files changed

Lines changed: 187 additions & 2 deletions

File tree

src/Console/Command/ConsumeCommand.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ protected function configure(): void
8383
->addOption('save', null, InputOption::VALUE_NONE, 'Save failed job.')
8484
->addOption('max', null, InputOption::VALUE_REQUIRED, 'The max number of jobs.')
8585
->addOption('expire', null, InputOption::VALUE_REQUIRED, 'The worker duration in seconds.')
86+
->addOption('expireWhenEmpty', null, InputOption::VALUE_REQUIRED, 'Stop the worker when the queues are empty for an amount of time in seconds.')
8687
->addOption('stopWhenEmpty', null, InputOption::VALUE_NONE, 'Stop the worker if the queues are empty.')
8788
->addOption('stopOnError', null, InputOption::VALUE_NONE, 'Stop the worker if error occurs.')
8889
->addOption('logger', null, InputOption::VALUE_REQUIRED, 'The logger to use "stdout", "null", or "default".', "default")
@@ -159,6 +160,10 @@ protected function createExtension(InputInterface $input, OutputInterface $outpu
159160
$builder->expire((int)$input->getOption('expire'));
160161
}
161162

163+
if ($input->getOption('expireWhenEmpty')) {
164+
$builder->expireWhenEmpty((int)$input->getOption('expireWhenEmpty'));
165+
}
166+
162167
$memory = $this->convertToBytes($input->getOption('memory'));
163168
if ($memory > 0) {
164169
$builder->memory($memory);

src/Consumer/Receiver/Builder/ReceiverBuilder.php

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Bdf\Queue\Consumer\Receiver\Binder\BinderInterface;
88
use Bdf\Queue\Consumer\Receiver\Binder\BinderReceiver;
99
use Bdf\Queue\Consumer\Receiver\Binder\ClassNameBinder;
10+
use Bdf\Queue\Consumer\Receiver\LimitTimeWhenEmptyReceiver;
1011
use Bdf\Queue\Consumer\Receiver\MemoryLimiterReceiver;
1112
use Bdf\Queue\Consumer\Receiver\MessageCountLimiterReceiver;
1213
use Bdf\Queue\Consumer\Receiver\MessageLoggerReceiver;
@@ -219,17 +220,31 @@ public function max(int $number): ReceiverBuilder
219220
* Limit the number of received message
220221
* When the limit is reached, the consumer is stopped
221222
*
222-
* @param int $seconds Number of messages
223+
* @param int $seconds Time in seconds
223224
*
224225
* @return $this
225226
*
226-
* @see MessageCountLimiterReceiver
227+
* @see TimeLimiterReceiver
227228
*/
228229
public function expire(int $seconds): ReceiverBuilder
229230
{
230231
return $this->add(new TimeLimiterReceiver($seconds, $this->logger));
231232
}
232233

234+
/**
235+
* Stops consumption when the queues are empty for an amount of time
236+
*
237+
* @param int $seconds Time in seconds
238+
*
239+
* @return $this
240+
*
241+
* @see LimitTimeWhenEmptyReceiver
242+
*/
243+
public function expireWhenEmpty(int $seconds): ReceiverBuilder
244+
{
245+
return $this->add(new LimitTimeWhenEmptyReceiver($seconds, $this->logger));
246+
}
247+
233248
/**
234249
* Limit the total memory usage of the current runtime
235250
* When the limit is reached, the consumer is stopped
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
<?php
2+
3+
namespace Bdf\Queue\Consumer\Receiver;
4+
5+
use Bdf\Queue\Consumer\ConsumerInterface;
6+
use Bdf\Queue\Consumer\DelegateHelper;
7+
use Bdf\Queue\Consumer\ReceiverInterface;
8+
use Psr\Log\LoggerInterface;
9+
10+
/**
11+
*
12+
*/
13+
class LimitTimeWhenEmptyReceiver implements ReceiverInterface
14+
{
15+
use DelegateHelper;
16+
17+
private int $limit;
18+
private ?int $endTime;
19+
private ?LoggerInterface $logger;
20+
21+
/**
22+
* TimeLimiterMiddlewareReceiver constructor.
23+
*
24+
* @param ReceiverInterface $delegate
25+
* @param int $limit Time limit in second
26+
* @param LoggerInterface|null $logger
27+
*/
28+
public function __construct(/*int $limit, LoggerInterface $logger = null*/)
29+
{
30+
$args = func_get_args();
31+
$index = 0;
32+
33+
if ($args[0] instanceof ReceiverInterface) {
34+
@trigger_error('Passing delegate in constructor of receiver is deprecated since 1.4', E_USER_DEPRECATED);
35+
$this->delegate = $args[0];
36+
++$index;
37+
}
38+
39+
$this->endTime = null;
40+
$this->limit = $args[$index++];
41+
$this->logger = $args[$index] ?? null;
42+
}
43+
44+
/**
45+
* {@inheritdoc}
46+
*/
47+
public function receive($message, ConsumerInterface $consumer): void
48+
{
49+
$this->endTime = null;
50+
51+
$next = $this->delegate ?? $consumer;
52+
$next->receive($message, $consumer);
53+
}
54+
55+
/**
56+
* {@inheritdoc}
57+
*/
58+
public function receiveTimeout(ConsumerInterface $consumer): void
59+
{
60+
$next = $this->delegate ?? $consumer;
61+
$next->receiveTimeout($consumer);
62+
63+
if (null === $this->endTime) {
64+
$this->endTime = $this->limit + time();
65+
}
66+
67+
if ($this->endTime >= time()) {
68+
return;
69+
}
70+
71+
$consumer->stop();
72+
73+
if (null !== $this->logger) {
74+
$this->logger->info('Receiver stopped due to empty time limit of {timeLimit}s reached', ['timeLimit' => $this->limit]);
75+
}
76+
}
77+
}

tests/Consumer/Receiver/Builder/ReceiverBuilderTest.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Bdf\Queue\Consumer\Receiver\BenchReceiver;
1010
use Bdf\Queue\Consumer\Receiver\Binder\BinderReceiver;
1111
use Bdf\Queue\Consumer\Receiver\Binder\ClassNameBinder;
12+
use Bdf\Queue\Consumer\Receiver\LimitTimeWhenEmptyReceiver;
1213
use Bdf\Queue\Consumer\Receiver\MemoryLimiterReceiver;
1314
use Bdf\Queue\Consumer\Receiver\MessageCountLimiterReceiver;
1415
use Bdf\Queue\Consumer\Receiver\MessageLoggerReceiver;
@@ -573,6 +574,22 @@ public function test_expire()
573574
);
574575
}
575576

577+
/**
578+
*
579+
*/
580+
public function test_expire_when_empty()
581+
{
582+
$this->builder->expireWhenEmpty(10);
583+
584+
$this->assertEquals(
585+
new ReceiverPipeline([
586+
new LimitTimeWhenEmptyReceiver(10, new LoggerProxy(new NullLogger())),
587+
new ProcessorReceiver(new JobHintProcessorResolver($this->container->get(InstantiatorInterface::class))),
588+
]),
589+
$this->builder->build()
590+
);
591+
}
592+
576593
/**
577594
*
578595
*/
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<?php
2+
3+
namespace Bdf\Queue\Consumer\Receiver\Tests;
4+
5+
use Bdf\Queue\Consumer\ConsumerInterface;
6+
use Bdf\Queue\Consumer\Receiver\LimitTimeWhenEmptyReceiver;
7+
use Bdf\Queue\Consumer\Receiver\NextInterface;
8+
use Bdf\Queue\Consumer\ReceiverInterface;
9+
use PHPUnit\Framework\TestCase;
10+
use Psr\Log\LoggerInterface;
11+
12+
/**
13+
*
14+
*/
15+
class LimitTimeWhenEmptyReceiverTest extends TestCase
16+
{
17+
/**
18+
* @group time-sensitive
19+
*/
20+
public function test_receiver_never_stop_stops()
21+
{
22+
$next = $this->createMock(NextInterface::class);
23+
$next->expects($this->never())->method('stop');
24+
25+
$extension = new LimitTimeWhenEmptyReceiver(2);
26+
$extension->receiveTimeout($next);
27+
$extension->receiveTimeout($next);
28+
sleep(2);
29+
$extension->receive('message', $next);
30+
$extension->receiveTimeout($next);
31+
}
32+
33+
/**
34+
* @group time-sensitive
35+
*/
36+
public function test_receiver_stops_when_time_limit_is_reached()
37+
{
38+
$next = $this->createMock(NextInterface::class);
39+
$next->expects($this->once())->method('stop');
40+
41+
$logger = $this->createMock(LoggerInterface::class);
42+
$logger->expects($this->once())->method('info')
43+
->with('Receiver stopped due to empty time limit of {timeLimit}s reached', ['timeLimit' => 1]);
44+
45+
$extension = new LimitTimeWhenEmptyReceiver(1, $logger);
46+
$extension->receiveTimeout($next);
47+
sleep(2);
48+
$extension->receiveTimeout($next);
49+
}
50+
51+
/**
52+
* @group time-sensitive
53+
*/
54+
public function test_receiver_stops_when_time_limit_is_reached_legacy()
55+
{
56+
$decorated = $this->createMock(ReceiverInterface::class);
57+
$decorated->expects($this->any())->method('receiveTimeout');
58+
59+
$next = $this->createMock(ConsumerInterface::class);
60+
$next->expects($this->once())->method('stop');
61+
62+
$logger = $this->createMock(LoggerInterface::class);
63+
$logger->expects($this->once())->method('info')
64+
->with('Receiver stopped due to empty time limit of {timeLimit}s reached', ['timeLimit' => 1]);
65+
66+
$extension = new LimitTimeWhenEmptyReceiver($decorated, 1, $logger);
67+
$extension->receiveTimeout($next);
68+
sleep(2);
69+
$extension->receiveTimeout($next);
70+
}
71+
}

0 commit comments

Comments
 (0)