Skip to content

Commit 82d61fd

Browse files
committed
feat: add job status service
Related: VCLOUD-593
1 parent 2af1cac commit 82d61fd

8 files changed

Lines changed: 187 additions & 0 deletions

File tree

Classes/Domain/AbstractScheduler.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,4 +343,8 @@ protected function validateGroupName(string $groupName): void
343343
throw new InvalidArgumentException(\sprintf('Group name "%s" is not active', $groupName), 1721393320);
344344
}
345345
}
346+
347+
public function getConnection(): Connection {
348+
return $this->dbal;
349+
}
346350
}

Classes/Domain/Scheduler.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Netlogix\JobQueue\Scheduled\Domain;
44

55
use Netlogix\JobQueue\Scheduled\Domain\Model\ScheduledJob;
6+
use Netlogix\JobQueue\Scheduled\Service\Connection;
67

78
interface Scheduler {
89

@@ -24,4 +25,5 @@ public function activity(ScheduledJob $job): void;
2425

2526
public function resetStaleJobs(string $groupName, int $minutes): int;
2627

28+
public function getConnection(): Connection;
2729
}

Classes/Service/Connection.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@ public function fetchOne(string $query, array $params = [], array $types = [])
4949
});
5050
}
5151

52+
// requires dbal autocommit to be enabled
53+
public function fetchOneReadUncommited(string $query, array $params = [], array $types = [])
54+
{
55+
return $this->withAutoReconnectAndRetry(function () use ($query, $params, $types) {
56+
$this->dbal->executeQuery("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");
57+
return $this->dbal->fetchOne($query, $params, $types);
58+
});
59+
}
60+
5261
public function executeQuery($sql, array $params = [], $types = [], ?QueryCacheProfile $qcp = null)
5362
{
5463
return $this->withAutoReconnectAndRetry(function () use ($sql, $params, $types, $qcp) {
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
<?php
2+
3+
namespace Netlogix\JobQueue\Scheduled\Service;
4+
5+
use Doctrine\DBAL\Types\Types;
6+
use Neos\Flow\Annotations as Flow;
7+
use Netlogix\JobQueue\Scheduled\Domain\Scheduler;
8+
use Netlogix\JobQueue\Scheduled\Domain\Model\ScheduledJob;
9+
10+
#[Flow\Scope("singleton")]
11+
abstract class JobStatusService {
12+
13+
#[Flow\Inject]
14+
protected Scheduler $scheduler;
15+
16+
public function getTotalJobCount(string $groupName): int {
17+
$tableName = ScheduledJob::TABLE_NAME;
18+
$query = <<<MySQL
19+
SELECT COUNT(*) FROM {$tableName}
20+
WHERE groupname = :groupName
21+
MySQL;
22+
return $this->fetchOne(
23+
$query,
24+
[
25+
'groupName' => $groupName
26+
],
27+
[
28+
'groupName' => Types::STRING
29+
]
30+
);
31+
}
32+
33+
public function getRunningJobCount(string $groupName): int {
34+
$tableName = ScheduledJob::TABLE_NAME;
35+
$query = <<<MySQL
36+
SELECT COUNT(*) FROM {$tableName}
37+
WHERE running = 1
38+
AND claimed NOT LIKE 'failed(%)'
39+
AND groupname = :groupName
40+
AND activity > NOW() - INTERVAL 2 SECOND
41+
MySQL;
42+
return $this->fetchOne(
43+
$query,
44+
[
45+
'groupName' => $groupName
46+
],
47+
[
48+
'groupName' => Types::STRING
49+
]
50+
);
51+
}
52+
53+
public function getPendingJobCount(string $groupName): int {
54+
$tableName = ScheduledJob::TABLE_NAME;
55+
$query = <<<MySQL
56+
SELECT COUNT(*) FROM {$tableName}
57+
WHERE ((running = 0
58+
AND claimed = '')
59+
OR running = 2)
60+
AND groupname = :groupName
61+
MySQL;
62+
return $this->fetchOne(
63+
$query,
64+
[
65+
'groupName' => $groupName
66+
],
67+
[
68+
'groupName' => Types::STRING
69+
]
70+
);
71+
}
72+
73+
public function getStaleJobCount(string $groupName, int $minutes): int {
74+
$tableName = ScheduledJob::TABLE_NAME;
75+
$query = <<<MySQL
76+
SELECT COUNT(*) FROM {$tableName}
77+
WHERE running = 1
78+
AND claimed NOT LIKE 'failed(%)'
79+
AND groupname = :groupName
80+
AND activity < NOW() - INTERVAL :minutes MINUTE
81+
MySQL;
82+
return $this->fetchOne(
83+
$query,
84+
[
85+
"groupName" => $groupName,
86+
"minutes" => $minutes
87+
],
88+
[
89+
"groupName" => Types::STRING,
90+
"minutes" => Types::INTEGER
91+
]
92+
);
93+
}
94+
95+
public function getFailedJobCount(string $groupName): int {
96+
$tableName = ScheduledJob::TABLE_NAME;
97+
$query = <<<MySQL
98+
SELECT COUNT(*) FROM {$tableName}
99+
WHERE claimed LIKE 'failed(%)'
100+
AND groupname = :groupName
101+
MySQL;
102+
return $this->fetchOne(
103+
$query,
104+
[
105+
'groupName' => $groupName
106+
],
107+
[
108+
'groupName' => Types::STRING
109+
]
110+
);
111+
}
112+
113+
protected function fetchOne(string $query, array $params = [], array $types = []) {
114+
return $this->scheduler->getConnection()->fetchOne($query, $params, $types);
115+
}
116+
117+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
namespace Netlogix\JobQueue\Scheduled\Service;
4+
5+
use Doctrine\DBAL\Connection;
6+
use Doctrine\DBAL\Platforms\MySqlPlatform;
7+
use Doctrine\DBAL\Platforms\PostgreSQL94Platform;
8+
use Doctrine\DBAL\Platforms\PostgreSqlPlatform;
9+
use Neos\Flow\ObjectManagement\ObjectManagerInterface;
10+
11+
use Neos\Flow\Annotations as Flow;
12+
13+
class JobStatusServiceFactory {
14+
15+
#[Flow\Inject]
16+
protected Connection $connection;
17+
18+
#[Flow\Inject]
19+
protected ObjectManagerInterface $objectManager;
20+
21+
public function create(): JobStatusService
22+
{
23+
$platform = $this->connection->getDatabasePlatform();
24+
if ($platform instanceof MySqlPlatform) {
25+
return $this->objectManager->get(MySQLJobStatusService::class);
26+
}
27+
if ($platform instanceof PostgreSqlPlatform || $platform instanceof PostgreSQL94Platform) {
28+
return $this->objectManager->get(PostgreSQLJobStatusService::class);
29+
}
30+
throw new \InvalidArgumentException("unsupported database platform " . $this->connection->getDatabasePlatform()->getName());
31+
}
32+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?php
2+
3+
namespace Netlogix\JobQueue\Scheduled\Service;
4+
5+
class MySQLJobStatusService extends JobStatusService {
6+
7+
protected function fetchOne(string $query, array $params = [], array $types = []) {
8+
return $this->scheduler->getConnection()->fetchOneReadUncommited($query, $params, $types);
9+
}
10+
11+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
namespace Netlogix\JobQueue\Scheduled\Service;
4+
5+
class PostgreSQLJobStatusService extends JobStatusService {
6+
7+
}

Configuration/Objects.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,8 @@ Netlogix\JobQueue\Scheduled\Domain\Scheduler:
55
scope: singleton
66
factoryObjectName: Netlogix\JobQueue\Scheduled\Domain\SchedulerFactory
77
factoryMethodName: create
8+
9+
Netlogix\JobQueue\Scheduled\Service\JobStatusService:
10+
scope: singleton
11+
factoryObjectName: Netlogix\JobQueue\Scheduled\Service\JobStatusServiceFactory
12+
factoryMethodName: create

0 commit comments

Comments
 (0)