Skip to content

Commit 1eb84c5

Browse files
authored
Added RebalanceService to rebalance tasks from unavailable to available hosts (#43)
* Added RebalanceService to rebalance tasks from unavailable to available hosts
1 parent 8f62b22 commit 1eb84c5

4 files changed

Lines changed: 139 additions & 3 deletions

File tree

src/Model/Domain/TaskerPool/DiscoveryService.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ public function notify()
3232
*/
3333
public function getAvailableHostnames()
3434
{
35-
return implode(Identifier::DELIMITER, $this->repo->getAvailableHostnames());
35+
return implode(Identifier::DELIMITER, $this->getAvailableHostnamesAsArray());
36+
}
37+
38+
/**
39+
* @return array
40+
*/
41+
public function getAvailableHostnamesAsArray()
42+
{
43+
return $this->repo->getAvailableHostnames();
3644
}
3745
}

src/Model/Repository/Mysql/TaskRepository.php

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ class TaskRepository implements TaskRepositoryInterface
1515
const RESET_TASKS_AFTER_SECONDS = 60; // seconds to retry failed tasks
1616
const RESET_TASKS_LIMIT = 20; // how many tasks to reset at once
1717

18+
const REBALANCE_LIMIT = 1000; // how many tasks to rebalance at once
19+
const REBALANCE_TIME_IN_FUTURE = 3600; // seconds in future tasks to rebalance
20+
1821
/**
1922
* @var \PDO
2023
*/
@@ -100,6 +103,30 @@ private function fetchTasks($status, $olderThanSeconds, $limit)
100103
}, $stmt->fetchAll());
101104
}
102105

106+
/**
107+
* @param array $availableHostnames
108+
* @return array|int[]
109+
*/
110+
public function findTasksForRebalance(array $availableHostnames)
111+
{
112+
$query = sprintf('SELECT task_id FROM %s
113+
WHERE
114+
identifier NOT IN (:availableHostnames) AND status=:status AND ts_created <= :ts_created
115+
ORDER BY ts_created ASC LIMIT :limit',
116+
Consts::TASKS_TABLE_NAME
117+
);
118+
119+
$stmt = $this->pdo->prepare($query);
120+
$stmt->bindValue(':availableHostnames', implode(',', $availableHostnames));
121+
$stmt->bindValue(':status', Consts::STATUS_PENDING, \PDO::PARAM_INT);
122+
$stmt->bindValue(':limit', self::REBALANCE_LIMIT, \PDO::PARAM_INT);
123+
$stmt->bindValue(':ts_created', time() + self::REBALANCE_TIME_IN_FUTURE, \PDO::PARAM_INT);
124+
125+
$stmt->execute();
126+
127+
return $stmt->fetchAll(\PDO::FETCH_COLUMN);
128+
}
129+
103130
private function getIdentifier()
104131
{
105132
if ($this->identifier === null) {
@@ -157,7 +184,6 @@ public function addBulk($tasks)
157184
$sql .= implode(', ', $insertQuery);
158185

159186
$stmt = $this->pdo->prepare($sql);
160-
$stmt->bindValue(':mudo', Consts::TASKS_TABLE_NAME);
161187
$this->execute($stmt,$insertData);
162188
}
163189

@@ -193,6 +219,27 @@ public function updateStatus($status, Task ...$tasks)
193219
);
194220
}
195221

222+
/**
223+
* @param string$identifier
224+
* @param array $taskIds
225+
* @return void
226+
*/
227+
public function updateIdentifier($identifier, array $taskIds)
228+
{
229+
if (count($taskIds) === 0) {
230+
return;
231+
}
232+
233+
$query= sprintf(
234+
'UPDATE %s SET identifier="%s" WHERE task_id IN (%s)',
235+
Consts::TASKS_TABLE_NAME,
236+
$identifier,
237+
implode(',', $taskIds)
238+
);
239+
240+
$this->pdo->exec($query);
241+
}
242+
196243
/**
197244
* @param \PDOStatement $stmt
198245
* @param Task $task
@@ -228,4 +275,4 @@ private function execute(\PDOStatement $stmt, $data = null)
228275
}
229276
return $res;
230277
}
231-
}
278+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
namespace G4\Tasker\Tasker2\Exception;
4+
5+
class NoAvailableHostsException extends \RuntimeException
6+
{
7+
}

src/Tasker2/RebalanceService.php

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
<?php
2+
3+
namespace G4\Tasker\Tasker2;
4+
5+
use G4\Tasker\Model\Domain\TaskerPool\DiscoveryService;
6+
use G4\Tasker\Model\Repository\Mysql\TaskRepository;
7+
use G4\Tasker\Tasker2\Exception\NoAvailableHostsException;
8+
9+
class RebalanceService
10+
{
11+
/**
12+
* @var DiscoveryService
13+
*/
14+
private $discoveryService;
15+
16+
/**
17+
* @var TaskRepository
18+
*/
19+
private $taskRepository;
20+
21+
/**
22+
* @var \G4\Log\ErrorLogger
23+
*/
24+
private $errorLogger;
25+
26+
public function __construct(DiscoveryService $discoveryService, TaskRepository $taskRepository)
27+
{
28+
$this->discoveryService = $discoveryService;
29+
$this->taskRepository = $taskRepository;
30+
}
31+
32+
public function setErrorLogger(\G4\Log\ErrorLogger $errorLogger)
33+
{
34+
$this->errorLogger = $errorLogger;
35+
return $this;
36+
}
37+
38+
/**
39+
* @return void
40+
*/
41+
public function rebalance()
42+
{
43+
$availableHostnames = $this->discoveryService->getAvailableHostnamesAsArray();
44+
45+
if (count($availableHostnames) === 0) {
46+
$this->errorLogger !== null && $this->errorLogger->log(new NoAvailableHostsException(
47+
'No available hosts found for ' . __CLASS__ . __METHOD__
48+
));
49+
return;
50+
}
51+
52+
$tasksForRebalance = $this->taskRepository->findTasksForRebalance($availableHostnames);
53+
if (count($tasksForRebalance) === 0) {
54+
return;
55+
}
56+
57+
foreach ($tasksForRebalance as $taskId) {
58+
$index = array_rand($availableHostnames);
59+
$tasks[$availableHostnames[$index]][] = (int) $taskId;
60+
}
61+
62+
print("Tasks rebalance results:\n");
63+
foreach ($tasks as $identifier => $taskIds) {
64+
$this->taskRepository->updateIdentifier($identifier, $taskIds);
65+
printf(
66+
"[%s] - Hostname %s rebalanced %d tasks to %s\n",
67+
date('Y-m-d H:i:s'),
68+
gethostname(),
69+
count($taskIds),
70+
$identifier
71+
);
72+
}
73+
}
74+
}

0 commit comments

Comments
 (0)