Skip to content

Commit 77e1fe4

Browse files
authored
Merge pull request #134 from smartboxgroup/feature/delay-interceptor
Feature/delay interceptor
2 parents fdc77e9 + 9ce49f8 commit 77e1fe4

7 files changed

Lines changed: 200 additions & 0 deletions

File tree

Core/Handlers/MessageHandler.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,21 @@
22

33
namespace Smartbox\Integration\FrameworkBundle\Core\Handlers;
44

5+
use Smartbox\Integration\FrameworkBundle\Components\WebService\Rest\Exceptions\UnrecoverableRestException;
56
use Smartbox\Integration\FrameworkBundle\Configurability\Routing\InternalRouter;
67
use Smartbox\Integration\FrameworkBundle\Core\Endpoints\EndpointFactory;
78
use Smartbox\Integration\FrameworkBundle\Core\Endpoints\EndpointInterface;
89
use Smartbox\Integration\FrameworkBundle\Core\Exchange;
910
use Smartbox\Integration\FrameworkBundle\Core\Messages\Context;
1011
use Smartbox\Integration\FrameworkBundle\Core\Messages\DeferredExchangeEnvelope;
12+
use Smartbox\Integration\FrameworkBundle\Core\Messages\DelayedExchangeEnvelope;
1113
use Smartbox\Integration\FrameworkBundle\Core\Messages\ErrorExchangeEnvelope;
1214
use Smartbox\Integration\FrameworkBundle\Core\Messages\ExchangeEnvelope;
1315
use Smartbox\Integration\FrameworkBundle\Core\Messages\FailedExchangeEnvelope;
1416
use Smartbox\Integration\FrameworkBundle\Core\Messages\MessageInterface;
1517
use Smartbox\Integration\FrameworkBundle\Core\Messages\RetryExchangeEnvelope;
1618
use Smartbox\Integration\FrameworkBundle\Core\Messages\ThrottledExchangeEnvelope;
19+
use Smartbox\Integration\FrameworkBundle\Core\Processors\Exceptions\DelayException;
1720
use Smartbox\Integration\FrameworkBundle\Core\Processors\Exceptions\ProcessingException;
1821
use Smartbox\Integration\FrameworkBundle\Core\Processors\Exceptions\ThrottledException;
1922
use Smartbox\Integration\FrameworkBundle\Core\Processors\Processor;
@@ -405,6 +408,13 @@ public function onHandleException(
405408
$this->dispatchEvent($exchangeBackup, HandlerEvent::THROTTLING_HANDLE_EVENT_NAME);
406409

407410
} // If it's an exchange that can be retried later but it's failing due to an error
411+
elseif ($originalException instanceof DelayException) {
412+
$delayPeriod = $exchangeBackup->getIn()->getHeader('delay');
413+
$delayExchangeEnvelope = new DelayedExchangeEnvelope($exchangeBackup, $delayPeriod);
414+
415+
$fromQueue = $exchangeBackup->getHeader('from');
416+
$this->deferExchangeMessage($delayExchangeEnvelope, $fromQueue);
417+
}
408418
elseif ($originalException instanceof RecoverableExceptionInterface && $retries < $this->retriesMax) {
409419

410420
$retryExchangeEnvelope = new RetryExchangeEnvelope($exchangeBackup, $exception->getProcessingContext(), $retries + 1);
@@ -543,6 +553,10 @@ public function handle(MessageInterface $message, EndpointInterface $endpointFro
543553

544554
return;
545555
}
556+
} elseif ($message instanceof DelayedExchangeEnvelope) {
557+
$headers = $message->getBody()->getIn()->getHeaders();
558+
unset($headers['delay']);
559+
$message->getBody()->getIn()->setHeaders($headers);
546560
}
547561
}
548562
// Otherwise create the exchange
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
namespace Smartbox\Integration\FrameworkBundle\Core\Messages;
4+
5+
use Smartbox\CoreBundle\Type\SerializableArray;
6+
use Smartbox\Integration\FrameworkBundle\Core\Exchange;
7+
8+
class DelayedExchangeEnvelope extends ExchangeEnvelope
9+
{
10+
const HEADER_DELAY_PERIOD = 'delay_period';
11+
12+
public function __construct(Exchange $exchange, int $delayPeriod)
13+
{
14+
parent::__construct($exchange);
15+
16+
$this->setHeader(self::HEADER_DELAY_PERIOD, $delayPeriod);
17+
}
18+
19+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
<?php
2+
3+
namespace Smartbox\Integration\FrameworkBundle\Core\Processors\ControlFlow;
4+
5+
use Smartbox\CoreBundle\Type\SerializableArray;
6+
use Smartbox\Integration\FrameworkBundle\Core\Exchange;
7+
use Smartbox\Integration\FrameworkBundle\Core\Processors\Exceptions\DelayException;
8+
use Smartbox\Integration\FrameworkBundle\Core\Processors\Processor;
9+
10+
class DelayInterceptor extends Processor
11+
{
12+
protected $delayPeriod = 0;
13+
14+
/**
15+
* @return bool
16+
*/
17+
public function isRuntimeBreakpoint()
18+
{
19+
return $this->runtimeBreakpoint;
20+
}
21+
22+
/**
23+
* @param bool $runtimeBreakpoint
24+
*/
25+
public function setRuntimeBreakpoint($runtimeBreakpoint)
26+
{
27+
$this->runtimeBreakpoint = $runtimeBreakpoint;
28+
}
29+
30+
/**
31+
* @return int
32+
*/
33+
public function getDelayPeriod()
34+
{
35+
return $this->delayPeriod;
36+
}
37+
38+
/**
39+
* @param int $periodMs
40+
*/
41+
public function setDelayPeriod(int $delayPeriod)
42+
{
43+
$this->delayPeriod = $delayPeriod;
44+
}
45+
46+
/**
47+
* @param Exchange $exchange
48+
* @param SerializableArray $processingContext
49+
*
50+
* @throws DelayException
51+
*/
52+
protected function doProcess(Exchange $exchange, SerializableArray $processingContext)
53+
{
54+
$delayPeriodInSeconds = $exchange->getIn()->getHeader('delay') ?? null;
55+
56+
if (!is_null($delayPeriodInSeconds)) {
57+
throw new DelayException();
58+
}
59+
}
60+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
namespace Smartbox\Integration\FrameworkBundle\Core\Processors\Exceptions;
4+
5+
class DelayException extends \Exception
6+
{
7+
8+
}

Resources/config/services.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ parameters:
99
smartesb.processor.process.class: Smartbox\Integration\FrameworkBundle\Core\Processors\Miscellaneous\Process
1010
smartesb.processor.try_catch.class: Smartbox\Integration\FrameworkBundle\Core\Processors\ControlFlow\TryCatch
1111
smartesb.processor.throttle.class: Smartbox\Integration\FrameworkBundle\Core\Processors\ControlFlow\Throttler
12+
smartesb.processor.delay.class: Smartbox\Integration\FrameworkBundle\Core\Processors\ControlFlow\DelayInterceptor
1213
smartesb.processor.recipient_list.class: Smartbox\Integration\FrameworkBundle\Core\Processors\Routing\RecipientList
1314

1415
smartesb.handlers.message_routing.class: Smartbox\Integration\FrameworkBundle\Core\Handlers\MessageHandler
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
namespace Smartbox\Integration\FrameworkBundle\Tests\Unit\Core\Messages;
4+
5+
use Smartbox\CoreBundle\Type\SerializableArray;
6+
use Smartbox\Integration\FrameworkBundle\Core\Exchange;
7+
use Smartbox\Integration\FrameworkBundle\Core\Messages\Context;
8+
use Smartbox\Integration\FrameworkBundle\Core\Messages\Message;
9+
use Smartbox\Integration\FrameworkBundle\Core\Messages\MessageInterface;
10+
use Smartbox\Integration\FrameworkBundle\Core\Messages\DelayedExchangeEnvelope;
11+
12+
class DelayedExchangeEnvelopeTest extends \PHPUnit\Framework\TestCase
13+
{
14+
/** @var DelayedExchangeEnvelope */
15+
private $delayedExchangeEnvelope;
16+
17+
protected function setUp()
18+
{
19+
/** @var Context|\PHPUnit_Framework_MockObject_MockObject $context */
20+
$context = $this->createMock(Context::class);
21+
22+
/** @var MessageInterface|\PHPUnit_Framework_MockObject_MockObject $message */
23+
$message = $this->createMock(MessageInterface::class);
24+
$message
25+
->expects($this->once())
26+
->method('getContext')
27+
->will($this->returnValue($context));
28+
29+
$exchange = new Exchange($message);
30+
31+
$this->delayedExchangeEnvelope = new DelayedExchangeEnvelope($exchange);
32+
}
33+
34+
protected function tearDown()
35+
{
36+
$this->delayedExchangeEnvelope = null;
37+
}
38+
39+
public function testConstruct()
40+
{
41+
$context = new Context();
42+
$message = new Message(null, [], $context);
43+
$exchange = new Exchange($message);
44+
$delayPeriod = 10;
45+
46+
$delayedExchangeEnvelope = new DelayedExchangeEnvelope($exchange, $delayPeriod);
47+
48+
$this->assertSame($exchange, $delayedExchangeEnvelope->getExchange());
49+
$this->assertSame($delayPeriod, $delayedExchangeEnvelope->getHeader(DelayedExchangeEnvelope::HEADER_DELAY_PERIOD));
50+
}
51+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?php
2+
3+
namespace Smartbox\Integration\FrameworkBundle\Tests\Unit\Core\Processors\ControlFlow;
4+
5+
use Smartbox\CoreBundle\Tests\Fixtures\Entity\TestEntity;
6+
use Smartbox\Integration\FrameworkBundle\Core\Exchange;
7+
use Smartbox\Integration\FrameworkBundle\Core\Messages\Message;
8+
use Smartbox\Integration\FrameworkBundle\Core\Processors\ControlFlow\DelayInterceptor;
9+
use Smartbox\Integration\FrameworkBundle\Core\Processors\Exceptions\DelayException;
10+
use Smartbox\Integration\FrameworkBundle\Core\Processors\Exceptions\ProcessingException;
11+
use Symfony\Component\EventDispatcher\EventDispatcher;
12+
13+
/**
14+
* Class DelayInterceptorTest.
15+
*
16+
* @coversDefaultClass \Smartbox\Integration\FrameworkBundle\Core\Processors\ControlFlow\DelayInterceptorTest
17+
*/
18+
class DelayInterceptorTest extends \PHPUnit\Framework\TestCase
19+
{
20+
/**
21+
* Test that when we process a message that shall not pass and the processor is asyncDelayed that we do throw an
22+
* Exception and that exception has a delay set.
23+
*/
24+
public function testExceptionsDelayIsSet()
25+
{
26+
$throttlerMock = $this->getMockBuilder(DelayInterceptor::class)
27+
->disableOriginalConstructor()
28+
->getMock();
29+
30+
$eventDispatcherMock = $this->createMock(EventDispatcher::class);
31+
$throttlerMock->setEventDispatcher($eventDispatcherMock);
32+
33+
$exchange = new Exchange(new Message(new TestEntity()));
34+
35+
//We do not use expectException, instead we want to actually inspect what is in the exception
36+
$thrown = false;
37+
try {
38+
$throttlerMock->process($exchange);
39+
} catch (\Exception $e) {
40+
$thrown = true;
41+
$this->assertInstanceOf(ProcessingException::class, $e);
42+
$this->assertInstanceOf(DelayException::class, $e->getOriginalException());
43+
}
44+
45+
$this->assertTrue($thrown, 'Process did not throw an exception');
46+
}
47+
}

0 commit comments

Comments
 (0)