Skip to content

Commit 7e852d4

Browse files
committed
feat(pheanstalk): Compatibility with v5.0
1 parent 1a4b10d commit 7e852d4

7 files changed

Lines changed: 162 additions & 43 deletions

File tree

.github/workflows/php.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ jobs:
4444
runs-on: ubuntu-latest
4545
strategy:
4646
matrix:
47-
pheanstalk-versions: ['3.2.1', '4.0']
47+
pheanstalk-versions: ['3.2.1', '4.0', '5.0']
4848
name: Pheanstalk ${{ matrix.pheanstalk-versions }}
4949

5050
steps:

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
"enqueue/fs": "~0.9",
3838
"league/container": "~3.0",
3939
"monolog/monolog": "~2.0",
40-
"pda/pheanstalk": "~3.2|~4.0",
40+
"pda/pheanstalk": "~3.2|~4.0|~5.0",
4141
"php-amqplib/php-amqplib": "~3.0",
4242
"phpbench/phpbench": "~0.0|~1.0",
4343
"phpunit/phpunit": "~9.6",

src/Connection/Pheanstalk/PheanstalkConnection.php

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,30 @@
1111
use Bdf\Queue\Message\MessageSerializationTrait;
1212
use Bdf\Queue\Serializer\SerializerInterface;
1313
use Bdf\Queue\Util\MultiServer;
14+
use Pheanstalk\Contract\PheanstalkManagerInterface;
15+
use Pheanstalk\Contract\PheanstalkPublisherInterface;
16+
use Pheanstalk\Contract\PheanstalkSubscriberInterface;
1417
use Pheanstalk\Pheanstalk;
1518
use Pheanstalk\Contract\PheanstalkInterface;
1619

20+
use Pheanstalk\Values\Timeout;
21+
1722
use function class_alias;
23+
use function class_exists;
1824
use function fclose;
1925
use function fsockopen;
2026
use function interface_exists;
2127
use function method_exists;
2228

23-
// Support for Pheanstalk 3
2429
if (!interface_exists(PheanstalkInterface::class)) {
25-
/** @psalm-suppress UndefinedClass */
26-
class_alias(\Pheanstalk\PheanstalkInterface::class, PheanstalkInterface::class);
30+
// Support for Pheanstalk 3
31+
if (interface_exists(\Pheanstalk\PheanstalkInterface::class)) {
32+
/** @psalm-suppress UndefinedClass */
33+
class_alias(\Pheanstalk\PheanstalkInterface::class, PheanstalkInterface::class);
34+
} else {
35+
// Support for Pheanstalk 5
36+
class_alias(PheanstalkManagerInterface::class, PheanstalkInterface::class);
37+
}
2738
}
2839

2940
/**
@@ -34,6 +45,9 @@ class PheanstalkConnection implements ConnectionDriverInterface
3445
use ConnectionNamed;
3546
use MessageSerializationTrait;
3647

48+
public const DEFAULT_PORT = 11300;
49+
public const DEFAULT_TTR = 60; // 1 minute
50+
3751
/**
3852
* @var PheanstalkInterface
3953
*/
@@ -61,8 +75,8 @@ public function __construct(string $name, SerializerInterface $serializer)
6175
*/
6276
public function setConfig(array $config): void
6377
{
64-
$this->config = MultiServer::prepareMultiServers($config, '127.0.0.1', PheanstalkInterface::DEFAULT_PORT) + [
65-
'ttr' => PheanstalkInterface::DEFAULT_TTR,
78+
$this->config = MultiServer::prepareMultiServers($config, '127.0.0.1', self::DEFAULT_PORT) + [
79+
'ttr' => self::DEFAULT_TTR,
6680
'client-timeout' => null,
6781
];
6882
}
@@ -89,14 +103,21 @@ public function timeToRun(): ?int
89103
* @return PheanstalkInterface
90104
* @throws ServerNotAvailableException If no servers has been found
91105
*/
92-
public function pheanstalk(): PheanstalkInterface
106+
public function pheanstalk()
93107
{
94108
if ($this->pheanstalk === null) {
95109
// Set the first available server
96110
// Pheanstalk manage a lazy connection. We can instantiate the client here.
97111
foreach ($this->getActiveHost() as $host => $port) {
98112
if (method_exists(Pheanstalk::class, 'create')) {
99-
$this->pheanstalk = Pheanstalk::create($host, (int) $port, (int) ($this->config['client-timeout'] ?? 10));
113+
$timeout = (int) ($this->config['client-timeout'] ?? 10);
114+
115+
if (class_exists(Timeout::class)) {
116+
// Pheanstalk 5
117+
$timeout = new Timeout($timeout);
118+
}
119+
120+
$this->pheanstalk = Pheanstalk::create($host, (int) $port, $timeout);
100121
} else {
101122
// Pheanstalk 3
102123
/** @psalm-suppress InvalidArgument */

src/Connection/Pheanstalk/PheanstalkQueue.php

Lines changed: 90 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,18 @@
1616
use Bdf\Queue\Message\QueuedMessage;
1717
use Exception;
1818
use Pheanstalk\Exception\ClientException;
19+
use Pheanstalk\Exception\ConnectionException as PheanstalkConnectionException;
1920
use Pheanstalk\Exception\ServerException as BaseServerException;
2021
use Pheanstalk\Exception\SocketException;
2122
use Pheanstalk\Job as PheanstalkJob;
2223
use Pheanstalk\Pheanstalk;
2324

25+
use Pheanstalk\Values\Job as Pheanstalk5Job;
26+
use Pheanstalk\Values\TubeName;
27+
28+
use Pheanstalk\Values\TubeStats;
29+
30+
use function class_exists;
2431
use function method_exists;
2532

2633
/**
@@ -48,15 +55,23 @@ public function push(Message $message): void
4855
{
4956
$message->setQueuedAt(new \DateTimeImmutable());
5057
$pheanstalk = $this->connection->pheanstalk();
58+
$queue = $message->queue();
59+
60+
if (class_exists(TubeName::class)) {
61+
// Support for Pheanstalk 5
62+
$queue = new TubeName($queue);
63+
}
5164

5265
try {
53-
$pheanstalk->useTube($message->queue())->put(
66+
$pheanstalk->useTube($queue);
67+
68+
$pheanstalk->put(
5469
$this->connection->serializer()->serialize($message),
5570
$message->header('priority', Pheanstalk::DEFAULT_PRIORITY),
5671
$message->delay(),
5772
$message->header('ttr', $this->connection->timeToRun())
5873
);
59-
} catch (SocketException $e) {
74+
} catch (SocketException|PheanstalkConnectionException $e) {
6075
throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e);
6176
} catch (BaseServerException $e) {
6277
throw new ServerException($e->getMessage(), $e->getCode(), $e);
@@ -72,14 +87,21 @@ public function pushRaw($raw, string $queue, int $delay = 0): void
7287
{
7388
$pheanstalk = $this->connection->pheanstalk();
7489

90+
if (class_exists(TubeName::class)) {
91+
// Support for Pheanstalk 5
92+
$queue = new TubeName($queue);
93+
}
94+
7595
try {
76-
$pheanstalk->useTube($queue)->put(
96+
$pheanstalk->useTube($queue);
97+
98+
$pheanstalk->put(
7799
$raw,
78100
Pheanstalk::DEFAULT_PRIORITY,
79101
$delay,
80102
$this->connection->timeToRun()
81103
);
82-
} catch (SocketException $e) {
104+
} catch (SocketException|PheanstalkConnectionException $e) {
83105
throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e);
84106
} catch (BaseServerException $e) {
85107
throw new ServerException($e->getMessage(), $e->getCode(), $e);
@@ -95,24 +117,41 @@ public function pop(string $queue, int $duration = ConnectionDriverInterface::DU
95117
{
96118
$pheanstalk = $this->connection->pheanstalk();
97119

120+
if (class_exists(TubeName::class)) {
121+
// Support for Pheanstalk 5
122+
$queue = new TubeName($queue);
123+
}
124+
98125
try {
99-
$pheanstalk = $pheanstalk->watchOnly($queue);
126+
if (method_exists($pheanstalk, 'watchOnly')) {
127+
// Pheanstalk < 5
128+
$pheanstalk->watchOnly($queue);
129+
} else {
130+
// Pheanstalk 5
131+
$pheanstalk->watch($queue);
132+
133+
foreach ($pheanstalk->listTubesWatched() as $tube) {
134+
if ($tube != $queue) {
135+
$pheanstalk->ignore($tube);
136+
}
137+
}
138+
}
100139

101140
if (method_exists($pheanstalk, 'reserveWithTimeout')) {
102141
$job = $pheanstalk->reserveWithTimeout($duration);
103142
} else {
104143
// Support for Pheanstalk 3
105144
$job = $pheanstalk->reserve($duration);
106145
}
107-
} catch (SocketException $e) {
146+
} catch (SocketException|PheanstalkConnectionException $e) {
108147
throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e);
109148
} catch (BaseServerException $e) {
110149
throw new ServerException($e->getMessage(), $e->getCode(), $e);
111150
} catch (ClientException $e) {
112151
throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
113152
}
114153

115-
if (!$job instanceof PheanstalkJob) {
154+
if (!$job instanceof PheanstalkJob && !$job instanceof Pheanstalk5Job) {
116155
return null;
117156
}
118157

@@ -128,7 +167,7 @@ public function acknowledge(QueuedMessage $message): void
128167
{
129168
try {
130169
$this->connection->pheanstalk()->delete($message->internalJob());
131-
} catch (SocketException $e) {
170+
} catch (SocketException|PheanstalkConnectionException $e) {
132171
throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e);
133172
} catch (BaseServerException $e) {
134173
throw new ServerException($e->getMessage(), $e->getCode(), $e);
@@ -148,7 +187,7 @@ public function release(QueuedMessage $message): void
148187
$message->header('priority', Pheanstalk::DEFAULT_PRIORITY),
149188
$message->delay()
150189
);
151-
} catch (SocketException $e) {
190+
} catch (SocketException|PheanstalkConnectionException $e) {
152191
throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e);
153192
} catch (BaseServerException $e) {
154193
throw new ServerException($e->getMessage(), $e->getCode(), $e);
@@ -162,8 +201,19 @@ public function release(QueuedMessage $message): void
162201
*/
163202
public function count(string $name): int
164203
{
204+
if (class_exists(TubeName::class)) {
205+
// Support for Pheanstalk 5
206+
$name = new TubeName($name);
207+
}
208+
165209
try {
166-
return $this->connection->pheanstalk()->statsTube($name)['current-jobs-ready'];
210+
$stats = $this->connection->pheanstalk()->statsTube($name);
211+
212+
if ($stats instanceof TubeStats) {
213+
return $stats->currentJobsReady;
214+
} else {
215+
return $stats['current-jobs-ready'];
216+
}
167217
} catch (Exception $e) {
168218
return 0;
169219
}
@@ -186,7 +236,7 @@ public function stats(): array
186236
try {
187237
$queuesInfo = array_merge($queuesInfo, $this->queuesInfo($pheanstalk, $host, $port));
188238
$workersInfo = array_merge($workersInfo, $this->workersInfo($pheanstalk, $host, $port));
189-
} catch (SocketException $e) {
239+
} catch (SocketException|PheanstalkConnectionException $e) {
190240
throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e);
191241
} catch (BaseServerException $e) {
192242
throw new ServerException($e->getMessage(), $e->getCode(), $e);
@@ -214,21 +264,37 @@ private function queuesInfo($pheanstalk, string $host, int $port): array
214264

215265
foreach ($pheanstalk->listTubes() as $tube) {
216266
try {
217-
/** @var \Pheanstalk\Response\ArrayResponse $stats */
267+
/** @var \Pheanstalk\Response\ArrayResponse|TubeStats $stats */
218268
$stats = $pheanstalk->statsTube($tube);
219269

220-
$status[] = [
221-
'host' => $host.':'.$port,
222-
'queue' => $stats['name'],
223-
'jobs in queue' => $stats['current-jobs-ready'],
224-
'jobs running' => $stats['current-jobs-reserved'],
225-
'jobs delayed' => $stats['current-jobs-delayed'],
226-
// 'jobs buried' => $stats['current-jobs-buried'],
227-
'total jobs' => $stats['total-jobs'],
228-
// 'workers using' => $stats['current-using'],
229-
'workers waiting' => $stats['current-waiting'],
230-
'workers watching' => --$stats['current-watching'], // remove the monitoring
231-
];
270+
if ($stats instanceof TubeStats) {
271+
// Pheanstalk 5
272+
$status[] = [
273+
'host' => $host.':'.$port,
274+
'queue' => $stats->name->value,
275+
'jobs in queue' => $stats->currentJobsReady,
276+
'jobs running' => $stats->currentJobsReserved,
277+
'jobs delayed' => $stats->currentJobsDelayed,
278+
// 'jobs buried' => $stats['current-jobs-buried'],
279+
'total jobs' => $stats->totalJobs,
280+
// 'workers using' => $stats['current-using'],
281+
'workers waiting' => $stats->currentWaiting,
282+
'workers watching' => $stats->currentWatching - 1, // remove the monitoring
283+
];
284+
} else {
285+
$status[] = [
286+
'host' => $host.':'.$port,
287+
'queue' => $stats['name'],
288+
'jobs in queue' => $stats['current-jobs-ready'],
289+
'jobs running' => $stats['current-jobs-reserved'],
290+
'jobs delayed' => $stats['current-jobs-delayed'],
291+
// 'jobs buried' => $stats['current-jobs-buried'],
292+
'total jobs' => $stats['total-jobs'],
293+
// 'workers using' => $stats['current-using'],
294+
'workers waiting' => $stats['current-waiting'],
295+
'workers watching' => --$stats['current-watching'], // remove the monitoring
296+
];
297+
}
232298
} catch (Exception $e) {
233299
// tube not found
234300
}

tests/Connection/Pheanstalk/PheanstalkConnectionTest.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
use Bdf\Queue\Serializer\JsonSerializer;
77
use Pheanstalk\Connection;
88
use Pheanstalk\Contract\PheanstalkInterface;
9+
use Pheanstalk\Contract\PheanstalkSubscriberInterface;
910
use PHPUnit\Framework\MockObject\MockObject;
1011
use PHPUnit\Framework\TestCase;
1112

1213
use function class_exists;
14+
use function interface_exists;
1315
use function method_exists;
1416

1517
/**
@@ -34,6 +36,10 @@ class PheanstalkConnectionTest extends TestCase
3436
*/
3537
public function setUp(): void
3638
{
39+
if (interface_exists(PheanstalkSubscriberInterface::class)) {
40+
$this->markTestSkipped('Pheanstalk >= 5 is not supported');
41+
}
42+
3743
class_exists(PheanstalkConnection::class); // Autoload Pheanstalk classes to ensure that interface alias is defined
3844
$this->pheanstalk = $this->createMock(PheanstalkInterface::class);
3945

0 commit comments

Comments
 (0)