Skip to content

Commit 45dbbd7

Browse files
authored
Merge pull request #20 from ppavlovic/master
When RabbitMQ connection is not available add tasks to database
2 parents b5004ad + 6cc1586 commit 45dbbd7

4 files changed

Lines changed: 71 additions & 39 deletions

File tree

composer.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
}
2727
],
2828
"require": {
29-
"php" : ">=5.5",
29+
"ext-json" : "*",
30+
"ext-pdo" : "*",
31+
"php" : ">=5.6",
3032
"g4/cron" : "*",
3133
"g4/log" : "1.*"
3234
},
@@ -37,4 +39,4 @@
3739
"autoload": {
3840
"psr-4": {"G4\\Tasker\\": "src/"}
3941
}
40-
}
42+
}

src/Tasker2/Manager.php

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,8 @@
22

33
namespace G4\Tasker\Tasker2;
44

5-
use G4\Tasker\Consts;
6-
use Model\Domain\RabbitMq\RabbitMqConsts;
5+
use G4\Tasker\Tasker2\Queue\BatchPublisher;
76
use PhpAmqpLib\Connection\AMQPStreamConnection;
8-
use PhpAmqpLib\Message\AMQPMessage;
97

108
class Manager
119
{
@@ -17,7 +15,7 @@ class Manager
1715
private $tasks;
1816

1917
/**
20-
* @var AMQPStreamConnection
18+
* @var AMQPStreamConnection | null
2119
*/
2220
private $rabbitMqConnection;
2321

@@ -35,7 +33,7 @@ class Manager
3533

3634
public function __construct(
3735
\G4\Tasker\Model\Repository\TaskRepositoryInterface $taskRepository,
38-
AMQPStreamConnection $rabbitMqConnection,
36+
AMQPStreamConnection $rabbitMqConnection = null,
3937
MessageOptions $messageOptions
4038
) {
4139
$this->taskRepository = $taskRepository;
@@ -46,6 +44,11 @@ public function __construct(
4644

4745
public function run()
4846
{
47+
if ($this->rabbitMqConnection === null) {
48+
// no rabbitmq connection is available
49+
trigger_error('RabbitMQ connection is not available for Tasker Manager', E_USER_NOTICE);
50+
return;
51+
}
4952
$this
5053
->getReservedTasks()
5154
->addToMessageQueue()
@@ -85,21 +88,8 @@ private function addToMessageQueue()
8588

8689
try {
8790
$messages = $this->getMessages();
88-
foreach ($messages as $message) {
89-
$decodedMessageBody = json_decode($message->getBody(), 1);
90-
$binding = ($this->messageOptions->hasBindingHP() && isset($decodedMessageBody[Consts::PARAM_PRIORITY])
91-
&& ($decodedMessageBody[Consts::PARAM_PRIORITY] > Consts::PRIORITY_50))
92-
? $this->messageOptions->getBindingHP()
93-
: $this->messageOptions->getBinding();
94-
95-
$channel->batch_basic_publish(
96-
$message,
97-
$this->messageOptions->getExchange(),
98-
$binding
99-
);
100-
101-
}
102-
$channel->publish_batch();
91+
$queuePublisher = new BatchPublisher($channel, $this->messageOptions);
92+
$queuePublisher->publish(...$messages);
10393
} catch (\Exception $e) {
10494
// todo throw exception if unable to add messages to queue
10595
} finally {
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?php
2+
3+
namespace G4\Tasker\Tasker2\Queue;
4+
5+
use G4\Tasker\Consts;
6+
use G4\Tasker\Tasker2\MessageOptions;
7+
use PhpAmqpLib\Channel\AMQPChannel;
8+
use PhpAmqpLib\Message\AMQPMessage;
9+
10+
class BatchPublisher
11+
{
12+
/**
13+
* @var MessageOptions
14+
*/
15+
private $messageOptions;
16+
/**
17+
* @var AMQPChannel
18+
*/
19+
private $channel;
20+
21+
public function __construct(AMQPChannel $channel, MessageOptions $messageOptions)
22+
{
23+
$this->channel = $channel;
24+
$this->messageOptions = $messageOptions;
25+
}
26+
27+
public function publish(AMQPMessage ...$messages)
28+
{
29+
foreach ($messages as $message) {
30+
$decodedMessageBody = json_decode($message->getBody(), true);
31+
$binding = ($this->messageOptions->hasBindingHP() && isset($decodedMessageBody[Consts::PARAM_PRIORITY])
32+
&& ($decodedMessageBody[Consts::PARAM_PRIORITY] > Consts::PRIORITY_50))
33+
? $this->messageOptions->getBindingHP()
34+
: $this->messageOptions->getBinding();
35+
$this->channel->batch_basic_publish(
36+
$message,
37+
$this->messageOptions->getExchange(),
38+
$binding
39+
);
40+
}
41+
$this->channel->publish_batch();
42+
}
43+
}

src/Tasker2/TaskQueue.php

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@
22

33
namespace G4\Tasker\Tasker2;
44

5-
use G4\Tasker\Consts;
65
use G4\Tasker\TaskAbstract;
6+
use G4\Tasker\Tasker2\Queue\BatchPublisher;
77
use PhpAmqpLib\Connection\AMQPStreamConnection;
8-
use PhpAmqpLib\Message\AMQPMessage;
98
use G4\ValueObject\Uuid;
109

1110
class TaskQueue
@@ -16,7 +15,7 @@ class TaskQueue
1615
private $queue;
1716

1817
/**
19-
* @var AMQPStreamConnection
18+
* @var AMQPStreamConnection | null
2019
*/
2120
private $AMQPConnection;
2221

@@ -37,7 +36,7 @@ class TaskQueue
3736

3837
public function __construct(
3938
\G4\Tasker\Queue $queue,
40-
AMQPStreamConnection $AMQPConnection,
39+
AMQPStreamConnection $AMQPConnection = null,
4140
MessageOptions $messageOptions,
4241
$requestUuid = null
4342
)
@@ -97,6 +96,14 @@ private function saveCurrentTasks($tasks)
9796
if (count($tasks) === 0) {
9897
return $this;
9998
}
99+
100+
if ($this->AMQPConnection === null) {
101+
// in case that rabbitmq is not available save tasks to database
102+
$this->saveDelayedTasks($tasks);
103+
trigger_error('RabbitMQ connection is not available for Tasker TaskQueue', E_USER_NOTICE);
104+
return $this;
105+
}
106+
100107
$channel = $this->AMQPConnection->channel();
101108

102109
$messages = array_map(function (TaskAbstract $taskAbstract) {
@@ -105,20 +112,10 @@ private function saveCurrentTasks($tasks)
105112
return (new AmqpMessageFactory($task, $this->messageOptions->getDeliveryMode()))->create();
106113
}, $tasks);
107114

108-
foreach ($messages as $message) {
109-
$decodedMessageBody = json_decode($message->getBody(), 1);
110-
$binding = ($this->messageOptions->hasBindingHP() && isset($decodedMessageBody[Consts::PARAM_PRIORITY])
111-
&& ($decodedMessageBody[Consts::PARAM_PRIORITY] > Consts::PRIORITY_50))
112-
? $this->messageOptions->getBindingHP()
113-
: $this->messageOptions->getBinding();
114-
$channel->batch_basic_publish(
115-
$message,
116-
$this->messageOptions->getExchange(),
117-
$binding
118-
);
119-
}
120-
$channel->publish_batch();
115+
$queuePublisher = new BatchPublisher($channel, $this->messageOptions);
116+
$queuePublisher->publish(...$messages);
121117
$channel->close();
118+
return $this;
122119
}
123120

124121
private function getRequestUuid()

0 commit comments

Comments
 (0)