Skip to content

Commit 073d68f

Browse files
committed
feat(pheanstalk): Compatibility with v4.0
1 parent 32da10c commit 073d68f

7 files changed

Lines changed: 83 additions & 22 deletions

File tree

.github/workflows/php.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ jobs:
4444
runs-on: ubuntu-latest
4545
strategy:
4646
matrix:
47-
pheanstalk-versions: ['3.2.1']
47+
pheanstalk-versions: ['3.2.1', '4.0']
4848
name: Pheanstalk ${{ matrix.pheanstalk-versions }}
4949

5050
steps:

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
"enqueue/fs": "~0.9",
3838
"league/container": "~3.0",
3939
"monolog/monolog": "~2.0",
40-
"pda/pheanstalk": "~3.2",
40+
"pda/pheanstalk": "~3.2|~4.0",
4141
"php-amqplib/php-amqplib": "~3.0",
4242
"phpbench/phpbench": "~0.0|~1.0",
4343
"phpunit/phpunit": "~9.6",

src/Connection/Pheanstalk/PheanstalkConnection.php

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,19 @@
1111
use Bdf\Queue\Message\MessageSerializationTrait;
1212
use Bdf\Queue\Serializer\SerializerInterface;
1313
use Bdf\Queue\Util\MultiServer;
14-
use Pheanstalk\Connection;
1514
use Pheanstalk\Pheanstalk;
16-
use Pheanstalk\PheanstalkInterface;
15+
use Pheanstalk\Contract\PheanstalkInterface;
16+
17+
use function class_alias;
18+
use function fclose;
19+
use function fsockopen;
20+
use function interface_exists;
21+
use function method_exists;
22+
23+
// Support for Pheanstalk 3
24+
if (!interface_exists(PheanstalkInterface::class)) {
25+
class_alias(\Pheanstalk\PheanstalkInterface::class, PheanstalkInterface::class);
26+
}
1727

1828
/**
1929
* PheanstalkConnection
@@ -84,7 +94,12 @@ public function pheanstalk(): PheanstalkInterface
8494
// Set the first available server
8595
// Pheanstalk manage a lazy connection. We can instantiate the client here.
8696
foreach ($this->getActiveHost() as $host => $port) {
87-
$this->pheanstalk = new Pheanstalk($host, $port, $this->config['client-timeout']);
97+
if (method_exists(Pheanstalk::class, 'create')) {
98+
$this->pheanstalk = Pheanstalk::create($host, (int) $port, (int) ($this->config['client-timeout'] ?? 10));
99+
} else {
100+
// Pheanstalk 3
101+
$this->pheanstalk = new Pheanstalk($host, $port, $this->config['client-timeout']);
102+
}
88103
break;
89104
}
90105
}
@@ -108,7 +123,10 @@ public function setPheanstalk(PheanstalkInterface $pheanstalk)
108123
public function close(): void
109124
{
110125
if ($this->pheanstalk !== null) {
111-
$this->pheanstalk->getConnection()->disconnect();
126+
if (method_exists($this->pheanstalk, 'getConnection')) {
127+
$this->pheanstalk->getConnection()->disconnect();
128+
}
129+
112130
$this->pheanstalk = null;
113131
}
114132
}
@@ -143,7 +161,10 @@ public function getActiveHost()
143161
$valid = [];
144162

145163
foreach ($this->config['hosts'] as $host => $port) {
146-
if ((new Connection($host, $port))->isServiceListening()) {
164+
$stream = @fsockopen($host, $port, $errno, $errstr, 0.1);
165+
166+
if ($stream !== false) {
167+
fclose($stream);
147168
$valid[$host] = $port;
148169
}
149170
}

src/Connection/Pheanstalk/PheanstalkQueue.php

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
use Pheanstalk\Job as PheanstalkJob;
2222
use Pheanstalk\Pheanstalk;
2323

24+
use function method_exists;
25+
2426
/**
2527
* PheanstalkDriver
2628
*/
@@ -94,7 +96,14 @@ public function pop(string $queue, int $duration = ConnectionDriverInterface::DU
9496
$pheanstalk = $this->connection->pheanstalk();
9597

9698
try {
97-
$job = $pheanstalk->watchOnly($queue)->reserve($duration);
99+
$pheanstalk = $pheanstalk->watchOnly($queue);
100+
101+
if (method_exists($pheanstalk, 'reserveWithTimeout')) {
102+
$job = $pheanstalk->reserveWithTimeout($duration);
103+
} else {
104+
// Support for Pheanstalk 3
105+
$job = $pheanstalk->reserve($duration);
106+
}
98107
} catch (SocketException $e) {
99108
throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e);
100109
} catch (BaseServerException $e) {
@@ -169,11 +178,14 @@ public function stats(): array
169178
$workersInfo = [];
170179

171180
foreach ($this->connection->getActiveHost() as $host => $port) {
172-
$pheanstalk = new Pheanstalk($host, $port);
181+
$pheanstalk = method_exists(Pheanstalk::class, 'create')
182+
? Pheanstalk::create($host, (int) $port)
183+
: new Pheanstalk($host, $port)
184+
;
173185

174186
try {
175-
$queuesInfo = array_merge($queuesInfo, $this->queuesInfo($pheanstalk));
176-
$workersInfo = array_merge($workersInfo, $this->workersInfo($pheanstalk));
187+
$queuesInfo = array_merge($queuesInfo, $this->queuesInfo($pheanstalk, $host, $port));
188+
$workersInfo = array_merge($workersInfo, $this->workersInfo($pheanstalk, $host, $port));
177189
} catch (SocketException $e) {
178190
throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e);
179191
} catch (BaseServerException $e) {
@@ -196,7 +208,7 @@ public function stats(): array
196208
*
197209
* @return array
198210
*/
199-
private function queuesInfo($pheanstalk)
211+
private function queuesInfo($pheanstalk, string $host, int $port): array
200212
{
201213
$status = [];
202214

@@ -206,7 +218,7 @@ private function queuesInfo($pheanstalk)
206218
$stats = $pheanstalk->statsTube($tube);
207219

208220
$status[] = [
209-
'host' => $pheanstalk->getConnection()->getHost().':'.$pheanstalk->getConnection()->getPort(),
221+
'host' => $host.':'.$port,
210222
'queue' => $stats['name'],
211223
'jobs in queue' => $stats['current-jobs-ready'],
212224
'jobs running' => $stats['current-jobs-reserved'],
@@ -232,13 +244,13 @@ private function queuesInfo($pheanstalk)
232244
*
233245
* @return array
234246
*/
235-
private function workersInfo($pheanstalk)
247+
private function workersInfo($pheanstalk, string $host, int $port): array
236248
{
237249
$jobs = [];
238250

239251
foreach ($pheanstalk->listTubes() as $tube) {
240252
$job = [
241-
'host' => $pheanstalk->getConnection()->getHost().':'.$pheanstalk->getConnection()->getPort(),
253+
'host' => $host.':'.$port,
242254
'queue' => $tube,
243255
'job ready id' => '',
244256
'job ready data' => '',

tests/Connection/Pheanstalk/PheanstalkConnectionTest.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
use Bdf\Queue\Connection\Generic\GenericTopic;
66
use Bdf\Queue\Serializer\JsonSerializer;
77
use Pheanstalk\Connection;
8-
use Pheanstalk\PheanstalkInterface;
8+
use Pheanstalk\Contract\PheanstalkInterface;
99
use PHPUnit\Framework\MockObject\MockObject;
1010
use PHPUnit\Framework\TestCase;
1111

12+
use function class_exists;
13+
use function method_exists;
14+
1215
/**
1316
* @group Bdf_Queue
1417
* @group Bdf_Queue_Connection
@@ -31,6 +34,7 @@ class PheanstalkConnectionTest extends TestCase
3134
*/
3235
public function setUp(): void
3336
{
37+
class_exists(PheanstalkConnection::class); // Autoload Pheanstalk classes to ensure that interface alias is defined
3438
$this->pheanstalk = $this->createMock(PheanstalkInterface::class);
3539

3640
$this->connection = new PheanstalkConnection('foo', new JsonSerializer());
@@ -70,6 +74,10 @@ public function test_set_get_pheanstalk()
7074
*/
7175
public function test_close()
7276
{
77+
if (!method_exists($this->pheanstalk, 'getConnection')) {
78+
$this->markTestSkipped('Pheanstalk < 4.0 does not support connection closing');
79+
}
80+
7381
$connection = $this->createMock(Connection::class);
7482
$connection->expects($this->once())->method('disconnect');
7583
$this->pheanstalk->expects($this->once())->method('getConnection')->willReturn($connection);

tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public function test_queuePushPop()
4848

4949
$message->acknowledge();
5050

51-
$this->assertNull($queue->pop('test'));
51+
$this->assertNull($queue->pop('test', 1));
5252
}
5353

5454
public function test_queueRelease()

tests/Connection/Pheanstalk/PheanstalkQueueTest.php

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,18 @@
88
use Bdf\Queue\Message\Message;
99
use Bdf\Queue\Message\QueuedMessage;
1010
use Bdf\Queue\Serializer\JsonSerializer;
11+
use Pheanstalk\Contract\PheanstalkInterface;
12+
use Pheanstalk\Contract\ResponseInterface;
1113
use Pheanstalk\Exception\SocketException;
1214
use Pheanstalk\Job as PheanstalkJob;
1315
use Pheanstalk\Pheanstalk;
14-
use Pheanstalk\PheanstalkInterface;
16+
use Pheanstalk\Response\ArrayResponse;
1517
use PHPUnit\Framework\MockObject\MockObject;
1618
use PHPUnit\Framework\TestCase;
1719

20+
use function class_exists;
21+
use function method_exists;
22+
1823
/**
1924
* @group Bdf_Queue
2025
* @group Bdf_Queue_Connection
@@ -37,6 +42,7 @@ class PheanstalkQueueTest extends TestCase
3742
*/
3843
public function setUp(): void
3944
{
45+
class_exists(PheanstalkConnection::class); // Autoload Pheanstalk classes to ensure that interface alias is defined
4046
$this->pheanstalk = $this->createMock(PheanstalkInterface::class);
4147

4248
$connection = new PheanstalkConnection('foo', new JsonSerializer());
@@ -139,7 +145,12 @@ public function test_pop()
139145
$job->expects($this->once())->method('getData')->willReturn('{"data":"foo"}');
140146

141147
$this->pheanstalk->expects($this->once())->method('watchOnly')->with('queue')->willReturnSelf();
142-
$this->pheanstalk->expects($this->once())->method('reserve')->with(1)->willReturn($job);
148+
149+
if (method_exists(Pheanstalk::class, 'reserveWithTimeout')) {
150+
$this->pheanstalk->expects($this->once())->method('reserveWithTimeout')->with(1)->willReturn($job);
151+
} else {
152+
$this->pheanstalk->expects($this->once())->method('reserve')->with(1)->willReturn($job);
153+
}
143154

144155
$message = $this->driver->pop('queue', 1)->message();
145156

@@ -156,7 +167,12 @@ public function test_pop()
156167
public function test_pop_end_of_queue()
157168
{
158169
$this->pheanstalk->expects($this->once())->method('watchOnly')->willReturnSelf();
159-
$this->pheanstalk->expects($this->once())->method('reserve')->willReturn(null);
170+
171+
if (method_exists(Pheanstalk::class, 'reserveWithTimeout')) {
172+
$this->pheanstalk->expects($this->once())->method('reserveWithTimeout')->willReturn(null);
173+
} else {
174+
$this->pheanstalk->expects($this->once())->method('reserve')->willReturn(null);
175+
}
160176

161177
$this->assertSame(null, $this->driver->pop('queue', 1));
162178
}
@@ -168,7 +184,11 @@ public function test_pop_error($expected, $internal)
168184
{
169185
$this->expectException($expected);
170186
$this->pheanstalk->expects($this->once())->method('watchOnly')->willReturnSelf();
171-
$this->pheanstalk->expects($this->once())->method('reserve')->willThrowException($internal);
187+
if (method_exists(Pheanstalk::class, 'reserveWithTimeout')) {
188+
$this->pheanstalk->expects($this->once())->method('reserveWithTimeout')->willThrowException($internal);
189+
} else {
190+
$this->pheanstalk->expects($this->once())->method('reserve')->willThrowException($internal);
191+
}
172192

173193
$this->driver->pop('queue', 1);
174194
}
@@ -236,7 +256,7 @@ public function test_release_error($expected, $internal)
236256
*/
237257
public function test_count()
238258
{
239-
$this->pheanstalk->expects($this->once())->method('statsTube')->with('queue')->willReturn(['current-jobs-ready' => 1]);
259+
$this->pheanstalk->expects($this->once())->method('statsTube')->with('queue')->willReturn(new ArrayResponse('', ['current-jobs-ready' => 1]));
240260

241261
$this->assertSame(1, $this->driver->count('queue'));
242262
}

0 commit comments

Comments
 (0)