2222
2323namespace Uecode \Bundle \QPushBundle \Provider ;
2424
25- use IronMQ ;
25+ use IronMQ \ IronMQ ;
2626use Doctrine \Common \Cache \Cache ;
2727use Symfony \Bridge \Monolog \Logger ;
2828use Symfony \Component \EventDispatcher \EventDispatcherInterface ;
@@ -71,10 +71,13 @@ public function create()
7171 {
7272 if ($ this ->options ['push_notifications ' ]) {
7373 $ params = [
74- 'push_type ' => 'multicast ' ,
75- 'retries ' => $ this ->options ['notification_retries ' ],
76- 'retry_delay ' => $ this ->options ['notification_retry_delay ' ],
77- 'subscribers ' => []
74+ 'type ' => $ this ->options ['push_type ' ],
75+ 'push ' => [
76+ 'rate_limit ' => $ this ->options ['rate_limit ' ],
77+ 'retries ' => $ this ->options ['notification_retries ' ],
78+ 'retries_delay ' => $ this ->options ['notification_retries_delay ' ],
79+ 'subscribers ' => []
80+ ]
7881 ];
7982
8083 foreach ($ this ->options ['subscribers ' ] as $ subscriber ) {
@@ -84,14 +87,14 @@ public function create()
8487 );
8588 }
8689
87- $ params ['subscribers ' ][] = ['url ' => $ subscriber ['endpoint ' ]];
90+ $ params ['push ' ][ ' subscribers ' ][] = ['url ' => $ subscriber ['endpoint ' ]];
8891 }
8992
9093 } else {
9194 $ params = ['push_type ' => 'pull ' ];
9295 }
9396
94- $ result = $ this ->ironmq ->updateQueue ($ this ->getNameWithPrefix (), $ params );
97+ $ result = $ this ->ironmq ->createQueue ($ this ->getNameWithPrefix (), $ params );
9598 $ this ->queue = $ result ;
9699
97100 $ key = $ this ->getNameWithPrefix ();
@@ -301,4 +304,64 @@ public function onMessageReceived(MessageEvent $event)
301304
302305 $ event ->stopPropagation ();
303306 }
307+
308+ /**
309+ * Get queue info
310+ *
311+ * This allows to get queue size. Allowing to know if processing is finished or not
312+ *
313+ * @return stdObject|null
314+ */
315+ public function queueInfo ()
316+ {
317+ if ($ this ->queueExists ()) {
318+ $ key = $ this ->getNameWithPrefix ();
319+ $ this ->queue = $ this ->ironmq ->getQueue ($ key );
320+
321+ return $ this ->queue ;
322+ }
323+
324+ return null ;
325+ }
326+
327+ /**
328+ * Publishes multiple message at once
329+ *
330+ * @param array $messages
331+ * @param array $options
332+ *
333+ * @return array
334+ */
335+ public function publishMessages (array $ messages , array $ options = [])
336+ {
337+ $ options = $ this ->mergeOptions ($ options );
338+ $ publishStart = microtime (true );
339+
340+ if (!$ this ->queueExists ()) {
341+ $ this ->create ();
342+ }
343+
344+ $ encodedMessages = [];
345+ foreach ($ messages as $ message ) {
346+ $ encodedMessages [] = json_encode ($ message + ['_qpush_queue ' => $ this ->name ]);
347+ }
348+
349+ $ result = $ this ->ironmq ->postMessages (
350+ $ this ->getNameWithPrefix (),
351+ $ encodedMessages ,
352+ [
353+ 'timeout ' => $ options ['message_timeout ' ],
354+ 'delay ' => $ options ['message_delay ' ],
355+ 'expires_in ' => $ options ['message_expiration ' ]
356+ ]
357+ );
358+
359+ $ context = [
360+ 'message_ids ' => $ result ->ids ,
361+ 'publish_time ' => microtime (true ) - $ publishStart
362+ ];
363+ $ this ->log (200 , "Messages have been published. " , $ context );
364+
365+ return $ result ->ids ;
366+ }
304367}
0 commit comments