|
5 | 5 | namespace Netlogix\JobQueue\Scheduled\Domain; |
6 | 6 |
|
7 | 7 | use DateTimeImmutable; |
| 8 | +use Doctrine\DBAL\Exception; |
8 | 9 | use Doctrine\DBAL\Exception\RetryableException; |
9 | 10 | use Doctrine\DBAL\Types\Types; |
10 | 11 | use InvalidArgumentException; |
@@ -40,6 +41,7 @@ abstract class AbstractScheduler implements Scheduler |
40 | 41 | protected const SELECT_QUERY = ""; |
41 | 42 | protected const RELEASE_QUERY = ""; |
42 | 43 | protected const SCHEDULE_QUERY = ""; |
| 44 | + protected const RESET_STALE_JOBS_QUERY = ""; |
43 | 45 |
|
44 | 46 | public function injectConnection(Connection $connection): void |
45 | 47 | { |
@@ -274,6 +276,28 @@ public function activity(ScheduledJob $job): void |
274 | 276 | ); |
275 | 277 | } |
276 | 278 |
|
| 279 | + /** |
| 280 | + * Reset stale jobs that have not changed for too long. |
| 281 | + * |
| 282 | + * @param string $groupName Free jobs in this group only |
| 283 | + * @param int $minutes Count jobs as stale if their last activity was more than these many minutes ago |
| 284 | + * @throws Exception |
| 285 | + * @return int Number of freed jobs |
| 286 | + */ |
| 287 | + public function resetStaleJobs(string $groupName, int $minutes): int { |
| 288 | + return $this->dbal->executeQuery( |
| 289 | + sql: static::RESET_STALE_JOBS_QUERY, |
| 290 | + params: [ |
| 291 | + 'groupName' => $groupName, |
| 292 | + 'minutes' => max($minutes, 1), |
| 293 | + ], |
| 294 | + types: [ |
| 295 | + 'groupName' => Types::STRING, |
| 296 | + 'minutes' => Types::SMALLINT, |
| 297 | + ], |
| 298 | + )->rowCount(); |
| 299 | + } |
| 300 | + |
277 | 301 | protected function scheduleJob(ScheduledJob $job): void |
278 | 302 | { |
279 | 303 | $this->validateGroupName($job->getGroupName()); |
|
0 commit comments