Skip to content

Commit b8bdc60

Browse files
authored
Merge pull request #120 from uniplaces/add_fifo_queue_support
Add fifo queue support
2 parents 3b7044a + 70dc9cf commit b8bdc60

11 files changed

Lines changed: 284 additions & 122 deletions

File tree

docs/configuration.rst

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -59,27 +59,31 @@ Queue Options
5959
Each queue can have their own options that determine how messages are published or received.
6060
The options and their descriptions are listed below.
6161

62-
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
63-
| Option | Description | Default Value |
64-
+==========================+===========================================================================================+===============+
65-
| ``queue_name`` | The name used to describe the queue on the Provider's side | ``null`` |
66-
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
67-
| ``push_notifications`` | Whether or not to POST notifications to subscribers of a Queue | ``false`` |
68-
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
69-
| ``notification_retries`` | How many attempts notifications are resent in case of errors - if supported | ``3`` |
70-
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
71-
| ``message_delay`` | Time in seconds before a published Message is available to be read in a Queue | ``0`` |
72-
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
73-
| ``message_timeout`` | Time in seconds a worker has to delete a Message before it is available to other workers | ``30`` |
74-
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
75-
| ``message_expiration`` | Time in seconds that Messages may remain in the Queue before being removed | ``604800`` |
76-
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
77-
| ``messages_to_receive`` | Maximum amount of messages that can be received when polling the queue | ``1`` |
78-
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
79-
| ``receive_wait_time`` | If supported, time in seconds to leave the polling request open - for long polling | ``3`` |
80-
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
81-
| ``subscribers`` | An array of Subscribers, containing an ``endpoint`` and ``protocol`` | ``empty`` |
82-
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
62+
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
63+
| Option | Description | Default Value |
64+
+=================================+============================================================================================+===============+
65+
| ``queue_name`` | The name used to describe the queue on the Provider's side | ``null`` |
66+
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
67+
| ``push_notifications`` | Whether or not to POST notifications to subscribers of a Queue | ``false`` |
68+
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
69+
| ``notification_retries`` | How many attempts notifications are resent in case of errors - if supported | ``3`` |
70+
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
71+
| ``message_delay`` | Time in seconds before a published Message is available to be read in a Queue | ``0`` |
72+
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
73+
| ``message_timeout`` | Time in seconds a worker has to delete a Message before it is available to other workers | ``30`` |
74+
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
75+
| ``message_expiration`` | Time in seconds that Messages may remain in the Queue before being removed | ``604800`` |
76+
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
77+
| ``messages_to_receive`` | Maximum amount of messages that can be received when polling the queue | ``1`` |
78+
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
79+
| ``receive_wait_time`` | If supported, time in seconds to leave the polling request open - for long polling | ``3`` |
80+
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
81+
| ``fifo`` | If supported (only aws), sets queue into FIFO mode | ``false`` |
82+
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
83+
| ``content_based_deduplication`` | If supported (only aws), turns on automatic deduplication id based on the message content | ``false`` |
84+
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
85+
| ``subscribers`` | An array of Subscribers, containing an ``endpoint`` and ``protocol`` | ``empty`` |
86+
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
8387

8488
Symfony Application as a Subscriber
8589
-----------------------------------
@@ -131,14 +135,32 @@ A working configuration would look like the following
131135
my_queue_key:
132136
provider: ironmq #or aws or in_band or another_aws_provider
133137
options:
134-
queue_name: my_actual_queue_name
135-
push_notifications: true
136-
notification_retries: 3
137-
message_delay: 0
138-
message_timeout: 30
139-
message_expiration: 604800
140-
messages_to_receive: 1
141-
receive_wait_time: 3
138+
queue_name: my_actual_queue_name
139+
push_notifications: true
140+
notification_retries: 3
141+
message_delay: 0
142+
message_timeout: 30
143+
message_expiration: 604800
144+
messages_to_receive: 1
145+
receive_wait_time: 3
146+
fifo: false
147+
content_based_deduplication: false
148+
subscribers:
149+
- { endpoint: http://example1.com/, protocol: http }
150+
- { endpoint: http://example2.com/, protocol: http }
151+
my_fifo_queue_key:
152+
provider: aws
153+
options:
154+
queue_name: my_actual_queue_name.fifo
155+
push_notifications: true
156+
notification_retries: 3
157+
message_delay: 0
158+
message_timeout: 30
159+
message_expiration: 604800
160+
messages_to_receive: 1
161+
receive_wait_time: 3
162+
fifo: true
163+
content_based_deduplication: true
142164
subscribers:
143165
- { endpoint: http://example1.com/, protocol: http }
144166
- { endpoint: http://example2.com/, protocol: http }

src/DependencyInjection/Configuration.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,14 @@ private function getQueuesNode()
188188
->info('How many push requests per second will be triggered. -1 for unlimited, 0 disables push')
189189
->example(1)
190190
->end()
191+
->booleanNode('fifo')
192+
->defaultFalse()
193+
->info('If the queue is FIFO (aws)')
194+
->end()
195+
->booleanNode('content_based_deduplication')
196+
->defaultFalse()
197+
->info('If the FIFO queue has content based deduplication (aws)')
198+
->end()
191199
->append($this->getSubscribersNode())
192200
->end()
193201
->end()

src/DependencyInjection/UecodeQPushExtension.php

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,22 @@
2828
use Symfony\Component\DependencyInjection\ContainerBuilder;
2929
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
3030
use Symfony\Component\Config\FileLocator;
31+
use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException;
32+
use Symfony\Component\DependencyInjection\Exception\ServiceNotFoundException;
33+
use RuntimeException;
34+
use Exception;
3135

3236
/**
3337
* @author Keith Kirk <kkirk@undergroundelephant.com>
3438
*/
3539
class UecodeQPushExtension extends Extension
3640
{
41+
/**
42+
* @param array $configs
43+
* @param ContainerBuilder $container
44+
*
45+
* @throws RuntimeException|InvalidArgumentException|ServiceNotFoundException
46+
*/
3747
public function load(array $configs, ContainerBuilder $container)
3848
{
3949
$configuration = new Configuration();
@@ -112,9 +122,10 @@ public function load(array $configs, ContainerBuilder $container)
112122
]
113123
);
114124

115-
if (!empty($values['options']['queue_name'])
116-
&& $config['providers'][$provider]['driver'] == 'aws'
117-
) {
125+
$isProviderAWS = $config['providers'][$provider]['driver'] === 'aws';
126+
$isQueueNameSet = isset($values['options']['queue_name']) && !empty($values['options']['queue_name']);
127+
128+
if ($isQueueNameSet && $isProviderAWS) {
118129
$definition->addTag(
119130
'uecode_qpush.event_listener',
120131
[
@@ -123,6 +134,12 @@ public function load(array $configs, ContainerBuilder $container)
123134
'priority' => 255
124135
]
125136
);
137+
138+
// Check queue name ends with ".fifo"
139+
$isQueueNameFIFOReady = preg_match("/$(?<=(\.fifo))/", $values['options']['queue_name']) === 1;
140+
if ($values['options']['fifo'] === true && !$isQueueNameFIFOReady) {
141+
throw new InvalidArgumentException('Queue name must end with ".fifo" on AWS FIFO queues');
142+
}
126143
}
127144

128145
$name = sprintf('uecode_qpush.%s', $queue);
@@ -139,6 +156,8 @@ public function load(array $configs, ContainerBuilder $container)
139156
* @param ContainerBuilder $container The container
140157
* @param string $name The provider key
141158
*
159+
* @throws RuntimeException
160+
*
142161
* @return Reference
143162
*/
144163
private function createAwsClient($config, ContainerBuilder $container, $name)
@@ -150,9 +169,7 @@ private function createAwsClient($config, ContainerBuilder $container, $name)
150169
$aws2 = class_exists('Aws\Common\Aws');
151170
$aws3 = class_exists('Aws\Sdk');
152171
if (!$aws2 && !$aws3) {
153-
throw new \RuntimeException(
154-
'You must require "aws/aws-sdk-php" to use the AWS provider.'
155-
);
172+
throw new RuntimeException('You must require "aws/aws-sdk-php" to use the AWS provider.');
156173
}
157174

158175
$awsConfig = [
@@ -186,8 +203,7 @@ private function createAwsClient($config, ContainerBuilder $container, $name)
186203

187204
$aws->setArguments([$awsConfig]);
188205

189-
$container->setDefinition($service, $aws)
190-
->setPublic(false);
206+
$container->setDefinition($service, $aws)->setPublic(false);
191207
}
192208

193209
return new Reference($service);
@@ -200,6 +216,8 @@ private function createAwsClient($config, ContainerBuilder $container, $name)
200216
* @param ContainerBuilder $container The container
201217
* @param string $name The provider key
202218
*
219+
* @throws RuntimeException
220+
*
203221
* @return Reference
204222
*/
205223
private function createIronMQClient($config, ContainerBuilder $container, $name)
@@ -209,9 +227,7 @@ private function createIronMQClient($config, ContainerBuilder $container, $name)
209227
if (!$container->hasDefinition($service)) {
210228

211229
if (!class_exists('IronMQ\IronMQ')) {
212-
throw new \RuntimeException(
213-
'You must require "iron-io/iron_mq" to use the Iron MQ provider.'
214-
);
230+
throw new RuntimeException('You must require "iron-io/iron_mq" to use the Iron MQ provider.');
215231
}
216232

217233
$ironmq = new Definition('IronMQ\IronMQ');
@@ -225,13 +241,15 @@ private function createIronMQClient($config, ContainerBuilder $container, $name)
225241
]
226242
]);
227243

228-
$container->setDefinition($service, $ironmq)
229-
->setPublic(false);
244+
$container->setDefinition($service, $ironmq)->setPublic(false);
230245
}
231246

232247
return new Reference($service);
233248
}
234249

250+
/**
251+
* @return Reference
252+
*/
235253
private function createSyncClient()
236254
{
237255
return new Reference('event_dispatcher');

0 commit comments

Comments
 (0)