Skip to content

Commit 4dea19c

Browse files
Merge pull request #190 from stackkit/feature/queue-pause-resume
Add queue pause/resume
2 parents 86a30b5 + 3d97763 commit 4dea19c

9 files changed

Lines changed: 210 additions & 1 deletion

phpstan.neon

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@ parameters:
66
- src
77
level: 9
88
ignoreErrors:
9-
- "/dispatchAfterCommit with no type specified/"
9+
- "/dispatchAfterCommit with no type specified/"
10+
- message: "#Illuminate\\\\Queue\\\\Events\\\\Queue(Paused|Resumed)#"
11+
identifier: class.notFound

src/CloudTasksApi.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
* @method static void deleteTask(string $taskName)
1313
* @method static Task getTask(string $taskName)
1414
* @method static bool exists(string $taskName)
15+
* @method static void pause(string $queue)
16+
* @method static void resume(string $queue)
1517
*/
1618
class CloudTasksApi extends Facade
1719
{

src/CloudTasksApiConcrete.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
use Google\Cloud\Tasks\V2\GetTaskRequest;
1010
use Google\Cloud\Tasks\V2\CreateTaskRequest;
1111
use Google\Cloud\Tasks\V2\DeleteTaskRequest;
12+
use Google\Cloud\Tasks\V2\PauseQueueRequest;
13+
use Google\Cloud\Tasks\V2\ResumeQueueRequest;
1214
use Google\Cloud\Tasks\V2\Client\CloudTasksClient;
1315

1416
class CloudTasksApiConcrete implements CloudTasksApiContract
@@ -65,4 +67,14 @@ public function exists(string $taskName): bool
6567

6668
return false;
6769
}
70+
71+
public function pause(string $queue): void
72+
{
73+
$this->client->pauseQueue(PauseQueueRequest::build($queue));
74+
}
75+
76+
public function resume(string $queue): void
77+
{
78+
$this->client->resumeQueue(ResumeQueueRequest::build($queue));
79+
}
6880
}

src/CloudTasksApiContract.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,8 @@ public function deleteTask(string $taskName): void;
1515
public function getTask(string $taskName): Task;
1616

1717
public function exists(string $taskName): bool;
18+
19+
public function pause(string $queue): void;
20+
21+
public function resume(string $queue): void;
1822
}

src/CloudTasksApiFake.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ class CloudTasksApiFake implements CloudTasksApiContract
2323
*/
2424
public array $deletedTasks = [];
2525

26+
/**
27+
* @var array<string, true>
28+
*/
29+
public array $pausedQueues = [];
30+
2631
public function createTask(string $queueName, Task $task): Task
2732
{
2833
$this->createdTasks[] = compact('queueName', 'task');
@@ -51,6 +56,16 @@ public function exists(string $taskName): bool
5156
return false;
5257
}
5358

59+
public function pause(string $queue): void
60+
{
61+
$this->pausedQueues[$queue] = true;
62+
}
63+
64+
public function resume(string $queue): void
65+
{
66+
unset($this->pausedQueues[$queue]);
67+
}
68+
5469
public function assertTaskDeleted(string $taskName): void
5570
{
5671
Assert::assertTrue(
@@ -85,4 +100,14 @@ public function assertCreatedTaskCount(int $count): void
85100
{
86101
Assert::assertCount($count, $this->createdTasks);
87102
}
103+
104+
public function assertQueuePaused(string $queue): void
105+
{
106+
Assert::assertArrayHasKey($queue, $this->pausedQueues, 'Expected queue ['.$queue.'] to be paused, but it is not.');
107+
}
108+
109+
public function assertQueueNotPaused(string $queue): void
110+
{
111+
Assert::assertArrayNotHasKey($queue, $this->pausedQueues, 'Expected queue ['.$queue.'] to not be paused, but it is.');
112+
}
88113
}

src/CloudTasksQueue.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,4 +467,18 @@ private function getCloudRunJobEnvVars(string $encodedPayload, string $taskName)
467467

468468
return $envVars;
469469
}
470+
471+
public function pause(string $queue): void
472+
{
473+
$queueName = CloudTasksClient::queueName($this->config['project'], $this->config['location'], $queue);
474+
475+
CloudTasksApi::pause($queueName);
476+
}
477+
478+
public function resume(string $queue): void
479+
{
480+
$queueName = CloudTasksClient::queueName($this->config['project'], $this->config['location'], $queue);
481+
482+
CloudTasksApi::resume($queueName);
483+
}
470484
}

src/CloudTasksServiceProvider.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@
77
use Illuminate\Routing\Router;
88
use Illuminate\Events\Dispatcher;
99
use Illuminate\Queue\QueueManager;
10+
use Illuminate\Support\Facades\Queue;
1011
use Illuminate\Foundation\Application;
1112
use Illuminate\Queue\Events\JobFailed;
13+
use Illuminate\Queue\Events\QueuePaused;
14+
use Illuminate\Queue\Events\QueueResumed;
1215
use Illuminate\Contracts\Debug\ExceptionHandler;
1316
use Illuminate\Queue\Events\JobExceptionOccurred;
1417
use Google\Cloud\Tasks\V2\Client\CloudTasksClient;
@@ -113,6 +116,26 @@ private function registerEvents(): void
113116
return;
114117
}
115118
});
119+
120+
if (! class_exists(QueuePaused::class)) {
121+
return;
122+
}
123+
124+
$events->listen(QueuePaused::class, function (QueuePaused $event) {
125+
$queue = Queue::connection($event->connection);
126+
127+
if ($queue instanceof CloudTasksQueue) {
128+
$queue->pause($event->queue);
129+
}
130+
});
131+
132+
$events->listen(QueueResumed::class, function (QueueResumed $event) {
133+
$queue = Queue::connection($event->connection);
134+
135+
if ($queue instanceof CloudTasksQueue) {
136+
$queue->resume($event->queue);
137+
}
138+
});
116139
}
117140

118141
private function registerCommands(): void

tests/CloudTasksApiTest.php

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
use Google\ApiCore\ApiException;
1010
use Google\Cloud\Tasks\V2\HttpMethod;
1111
use Google\Cloud\Tasks\V2\HttpRequest;
12+
use Google\Cloud\Tasks\V2\Queue\State;
1213
use PHPUnit\Framework\Attributes\Test;
14+
use Google\Cloud\Tasks\V2\GetQueueRequest;
1315
use Google\Cloud\Tasks\V2\Client\CloudTasksClient;
1416
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksApi;
1517

@@ -138,4 +140,88 @@ public function test_delete_task()
138140
$this->expectExceptionMessage('NOT_FOUND');
139141
CloudTasksApi::getTask($task->getName());
140142
}
143+
144+
#[Test]
145+
public function it_can_pause_queues(): void
146+
{
147+
$queueName = $this->client->queueName(
148+
env('CI_CLOUD_TASKS_PROJECT_ID'),
149+
env('CI_CLOUD_TASKS_LOCATION'),
150+
env('CI_CLOUD_TASKS_QUEUE').'-pause'
151+
);
152+
153+
$this->ensureQueueIs($queueName, State::RUNNING);
154+
155+
// Act
156+
CloudTasksApi::pause($queueName);
157+
158+
// Assert
159+
$this->assertEquals(State::PAUSED, $this->waitForQueueState($queueName, State::PAUSED));
160+
}
161+
162+
#[Test]
163+
public function it_can_resume_queues(): void
164+
{
165+
$queueName = $this->client->queueName(
166+
env('CI_CLOUD_TASKS_PROJECT_ID'),
167+
env('CI_CLOUD_TASKS_LOCATION'),
168+
env('CI_CLOUD_TASKS_QUEUE').'-pause'
169+
);
170+
171+
$this->ensureQueueIs($queueName, State::PAUSED);
172+
173+
// Act
174+
CloudTasksApi::resume($queueName);
175+
176+
// Assert
177+
$this->assertEquals(State::RUNNING, $this->waitForQueueState($queueName, State::RUNNING));
178+
}
179+
180+
private function getQueueState(string $queue): int
181+
{
182+
return $this->client->getQueue(GetQueueRequest::build($queue))->getState();
183+
}
184+
185+
private function waitForQueueState(string $queue, int $waitForState): ?int
186+
{
187+
$state = null;
188+
$attempts = 0;
189+
190+
while ($state !== $waitForState) {
191+
$state = $this->getQueueState($queue);
192+
193+
if ($state === $waitForState) {
194+
return $state;
195+
}
196+
197+
$attempts++;
198+
199+
if ($attempts >= 10) {
200+
break;
201+
}
202+
203+
sleep(1);
204+
}
205+
206+
return $state;
207+
}
208+
209+
private function ensureQueueIs(string $queue, int $desiredState): void
210+
{
211+
$currentState = $this->getQueueState($queue);
212+
213+
if ($currentState === $desiredState) {
214+
return;
215+
}
216+
217+
if ($currentState === State::RUNNING && $desiredState === State::PAUSED) {
218+
CloudTasksApi::pause($queue);
219+
}
220+
221+
if ($currentState === State::PAUSED && $desiredState === State::RUNNING) {
222+
CloudTasksApi::resume($queue);
223+
}
224+
225+
$this->assertEquals($desiredState, $this->waitForQueueState($queue, $desiredState));
226+
}
141227
}

tests/PauseResumeQueueTest.php

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests;
6+
7+
use PHPUnit\Framework\Attributes\Test;
8+
use Illuminate\Support\Facades\Artisan;
9+
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksApi;
10+
11+
class PauseResumeQueueTest extends TestCase
12+
{
13+
protected function setUp(): void
14+
{
15+
parent::setUp();
16+
17+
if (version_compare(app()->version(), '12.0.0', '<')) {
18+
$this->markTestSkipped('This feature only exists in Laravel 12 and up.');
19+
}
20+
21+
CloudTasksApi::fake();
22+
}
23+
24+
#[Test]
25+
public function queue_can_be_paused(): void
26+
{
27+
Artisan::call('queue:pause my-cloudtasks-connection:barbequeue');
28+
29+
CloudTasksApi::assertQueuePaused('projects/my-test-project/locations/europe-west6/queues/barbequeue');
30+
}
31+
32+
#[Test]
33+
public function queue_can_be_resumed(): void
34+
{
35+
Artisan::call('queue:pause my-cloudtasks-connection:barbequeue');
36+
CloudTasksApi::assertQueuePaused('projects/my-test-project/locations/europe-west6/queues/barbequeue');
37+
38+
Artisan::call('queue:continue my-cloudtasks-connection:barbequeue');
39+
CloudTasksApi::assertQueueNotPaused('projects/my-test-project/locations/europe-west6/queues/barbequeue');
40+
}
41+
}

0 commit comments

Comments
 (0)