Skip to content

Commit 6cc50fa

Browse files
author
Jason Silva
committed
Add handling of delay messages during initial consume and handling
1 parent 3974548 commit 6cc50fa

4 files changed

Lines changed: 84 additions & 6 deletions

File tree

Core/Handlers/MessageHandler.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Smartbox\Integration\FrameworkBundle\Core\Exchange;
1010
use Smartbox\Integration\FrameworkBundle\Core\Messages\Context;
1111
use Smartbox\Integration\FrameworkBundle\Core\Messages\DeferredExchangeEnvelope;
12+
use Smartbox\Integration\FrameworkBundle\Core\Messages\DelayedExchangeEnvelope;
1213
use Smartbox\Integration\FrameworkBundle\Core\Messages\ErrorExchangeEnvelope;
1314
use Smartbox\Integration\FrameworkBundle\Core\Messages\ExchangeEnvelope;
1415
use Smartbox\Integration\FrameworkBundle\Core\Messages\FailedExchangeEnvelope;
@@ -408,7 +409,11 @@ public function onHandleException(
408409

409410
} // If it's an exchange that can be retried later but it's failing due to an error
410411
elseif ($originalException instanceof DelayException) {
411-
throw new UnrecoverableRestException('Delay Interceptor triggered.');
412+
$delayPeriod = $exchangeBackup->getIn()->getHeader('delay');
413+
$delayExchangeEnvelope = new DelayedExchangeEnvelope($exchangeBackup, $delayPeriod);
414+
415+
$fromQueue = $exchangeBackup->getHeader('from');
416+
$this->deferExchangeMessage($delayExchangeEnvelope, $fromQueue);
412417
}
413418
elseif ($originalException instanceof RecoverableExceptionInterface && $retries < $this->retriesMax) {
414419

@@ -548,6 +553,10 @@ public function handle(MessageInterface $message, EndpointInterface $endpointFro
548553

549554
return;
550555
}
556+
} elseif ($message instanceof DelayedExchangeEnvelope) {
557+
$headers = $message->getBody()->getIn()->getHeaders();
558+
unset($headers['delay']);
559+
$message->getBody()->getIn()->setHeaders($headers);
551560
}
552561
}
553562
// Otherwise create the exchange
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
namespace Smartbox\Integration\FrameworkBundle\Core\Messages;
4+
5+
use Smartbox\CoreBundle\Type\SerializableArray;
6+
use Smartbox\Integration\FrameworkBundle\Core\Exchange;
7+
8+
class DelayedExchangeEnvelope extends ExchangeEnvelope
9+
{
10+
const HEADER_DELAY_PERIOD = 'delay_period';
11+
12+
public function __construct(Exchange $exchange, int $delayPeriod)
13+
{
14+
parent::__construct($exchange);
15+
16+
$this->setHeader(self::HEADER_DELAY_PERIOD, $delayPeriod);
17+
}
18+
19+
}

Core/Processors/ControlFlow/DelayInterceptor.php

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
use Smartbox\CoreBundle\Type\SerializableArray;
66
use Smartbox\Integration\FrameworkBundle\Core\Exchange;
77
use Smartbox\Integration\FrameworkBundle\Core\Processors\Exceptions\DelayException;
8-
use Smartbox\Integration\FrameworkBundle\Core\Processors\Exceptions\ThrottledException;
9-
use Smartbox\Integration\FrameworkBundle\Core\Processors\Exceptions\ThrottlingLimitReachedException;
108
use Smartbox\Integration\FrameworkBundle\Core\Processors\Processor;
119

1210
class DelayInterceptor extends Processor
@@ -49,12 +47,13 @@ public function setDelayPeriod(int $delayPeriod)
4947
* @param Exchange $exchange
5048
* @param SerializableArray $processingContext
5149
*
52-
* @throws ThrottlingLimitReachedException
53-
* @throws ThrottledException
50+
* @throws DelayException
5451
*/
5552
protected function doProcess(Exchange $exchange, SerializableArray $processingContext)
5653
{
57-
if ($this->delayPeriod > 0 || $exchange->getIn()->getHeader('delay') > 0) {
54+
$delayPeriodInSeconds = $exchange->getIn()->getHeader('delay') ?? null;
55+
56+
if (!is_null($delayPeriodInSeconds)) {
5857
throw new DelayException();
5958
}
6059
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
namespace Smartbox\Integration\FrameworkBundle\Tests\Unit\Core\Messages;
4+
5+
use Smartbox\CoreBundle\Type\SerializableArray;
6+
use Smartbox\Integration\FrameworkBundle\Core\Exchange;
7+
use Smartbox\Integration\FrameworkBundle\Core\Messages\Context;
8+
use Smartbox\Integration\FrameworkBundle\Core\Messages\Message;
9+
use Smartbox\Integration\FrameworkBundle\Core\Messages\MessageInterface;
10+
use Smartbox\Integration\FrameworkBundle\Core\Messages\DelayedExchangeEnvelope;
11+
12+
class DelayedExchangeEnvelopeTest extends \PHPUnit\Framework\TestCase
13+
{
14+
/** @var DelayedExchangeEnvelope */
15+
private $delayedExchangeEnvelope;
16+
17+
protected function setUp()
18+
{
19+
/** @var Context|\PHPUnit_Framework_MockObject_MockObject $context */
20+
$context = $this->createMock(Context::class);
21+
22+
/** @var MessageInterface|\PHPUnit_Framework_MockObject_MockObject $message */
23+
$message = $this->createMock(MessageInterface::class);
24+
$message
25+
->expects($this->once())
26+
->method('getContext')
27+
->will($this->returnValue($context));
28+
29+
$exchange = new Exchange($message);
30+
31+
$this->delayedExchangeEnvelope = new DelayedExchangeEnvelope($exchange);
32+
}
33+
34+
protected function tearDown()
35+
{
36+
$this->delayedExchangeEnvelope = null;
37+
}
38+
39+
public function testConstruct()
40+
{
41+
$context = new Context();
42+
$message = new Message(null, [], $context);
43+
$exchange = new Exchange($message);
44+
$delayPeriod = 10;
45+
46+
$delayedExchangeEnvelope = new DelayedExchangeEnvelope($exchange, $delayPeriod);
47+
48+
$this->assertSame($exchange, $delayedExchangeEnvelope->getExchange());
49+
$this->assertSame($delayPeriod, $delayedExchangeEnvelope->getHeader(DelayedExchangeEnvelope::HEADER_DELAY_PERIOD));
50+
}
51+
}

0 commit comments

Comments
 (0)