Skip to content

Commit 2162818

Browse files
authored
Merge pull request #18 from sanjamedic/master
000 - implement logic for high priority tasker queue
2 parents c97f2a6 + d1bb813 commit 2162818

4 files changed

Lines changed: 45 additions & 2 deletions

File tree

src/Consts.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ class Consts
5151
const ORDER_BY_NAME_CREATED_ON = 'CREATED_ON';
5252
const ORDER_BY_NAME_PRIORITY = 'PRIORITY';
5353

54+
const PARAM_PRIORITY = 'priority';
55+
5456
public static function getMap()
5557
{
5658
return array(

src/Tasker2/Manager.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace G4\Tasker\Tasker2;
44

5+
use G4\Tasker\Consts;
56
use Model\Domain\RabbitMq\RabbitMqConsts;
67
use PhpAmqpLib\Connection\AMQPStreamConnection;
78
use PhpAmqpLib\Message\AMQPMessage;
@@ -76,11 +77,18 @@ private function addToMessageQueue()
7677
try {
7778
$messages = $this->getMessages();
7879
foreach ($messages as $message) {
80+
$decodedMessageBody = json_decode($message->getBody(), 1);
81+
$binding = ($this->messageOptions->hasBindingHP() && isset($decodedMessageBody[Consts::PARAM_PRIORITY])
82+
&& ($decodedMessageBody[Consts::PARAM_PRIORITY] > Consts::PRIORITY_50))
83+
? $this->messageOptions->getBindingHP()
84+
: $this->messageOptions->getBinding();
85+
7986
$channel->batch_basic_publish(
8087
$message,
8188
$this->messageOptions->getExchange(),
82-
$this->messageOptions->getBinding()
89+
$binding
8390
);
91+
8492
}
8593
$channel->publish_batch();
8694
} catch (\Exception $e) {

src/Tasker2/MessageOptions.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ class MessageOptions
1818
*/
1919
private $binding;
2020

21+
/**
22+
* @var StringLiteral
23+
*/
24+
private $bindingHP;
25+
2126
/**
2227
* @var int
2328
*/
@@ -54,4 +59,26 @@ public function getDeliveryMode()
5459
return (int) $this->deliveryMode;
5560
}
5661

62+
/**
63+
* @return string
64+
*/
65+
public function getBindingHP()
66+
{
67+
return (string) $this->bindingHP;
68+
}
69+
70+
/**
71+
* @return bool
72+
*/
73+
public function hasBindingHP()
74+
{
75+
return isset($this->bindingHP) && $this->bindingHP instanceof StringLiteral;
76+
}
77+
78+
public function setBindingHP(StringLiteral $bindingHP)
79+
{
80+
$this->bindingHP = $bindingHP;
81+
return $this;
82+
}
83+
5784
}

src/Tasker2/TaskQueue.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace G4\Tasker\Tasker2;
44

5+
use G4\Tasker\Consts;
56
use G4\Tasker\TaskAbstract;
67
use PhpAmqpLib\Connection\AMQPStreamConnection;
78
use PhpAmqpLib\Message\AMQPMessage;
@@ -105,10 +106,15 @@ private function saveCurrentTasks($tasks)
105106
}, $tasks);
106107

107108
foreach ($messages as $message) {
109+
$decodedMessageBody = json_decode($message->getBody(), 1);
110+
$binding = ($this->messageOptions->hasBindingHP() && isset($decodedMessageBody[Consts::PARAM_PRIORITY])
111+
&& ($decodedMessageBody[Consts::PARAM_PRIORITY] > Consts::PRIORITY_50))
112+
? $this->messageOptions->getBindingHP()
113+
: $this->messageOptions->getBinding();
108114
$channel->batch_basic_publish(
109115
$message,
110116
$this->messageOptions->getExchange(),
111-
$this->messageOptions->getBinding()
117+
$binding
112118
);
113119
}
114120
$channel->publish_batch();

0 commit comments

Comments
 (0)