-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathBeanstalkdStorage.php
More file actions
93 lines (81 loc) · 2.34 KB
/
BeanstalkdStorage.php
File metadata and controls
93 lines (81 loc) · 2.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
<?php
namespace Slowmove\SimplePhpQueue\Storage\Adapters;
use Pheanstalk\Pheanstalk;
use Pheanstalk\Values\Job;
use Pheanstalk\Values\TubeName;
use Pheanstalk\Values\TubeStats;
use Slowmove\SimplePhpQueue\Storage\StorageInterface;
/**
 * Queue storage adapter backed by a beanstalkd server, accessed through Pheanstalk.
 *
 * Every operation works against a single tube whose name is given at construction.
 *
 * NOTE(review): a beanstalkd connection presumably also watches the "default" tube
 * unless it is explicitly ignored, so reserve calls here may hand back jobs from
 * "default" as well — confirm against the Pheanstalk/beanstalkd documentation.
 */
class BeanstalkdStorage implements StorageInterface
{
    public const DEFAULT_STORAGE_PATH = '127.0.0.1';
    public const DEFAULT_STORAGE_PORT = 11300;

    private Pheanstalk $beanstalkdClient;
    private TubeName $tube;

    /**
     * Creates the beanstalkd client and remembers the tube to operate on.
     *
     * @param string $connectionString Either "host" or "host:port". An empty host or
     *                                 port part falls back to the class defaults.
     * @param string $tubeName         Name of the tube used for all queue operations.
     *
     * @throws \InvalidArgumentException When the port part is not an integer in 1..65535.
     */
    public function __construct(
        string $connectionString = self::DEFAULT_STORAGE_PATH . ':' . self::DEFAULT_STORAGE_PORT,
        string $tubeName = 'queue'
    ) {
        $host = self::DEFAULT_STORAGE_PATH;
        $port = self::DEFAULT_STORAGE_PORT;

        if ($connectionString !== '') {
            // Pad so that a bare "host" (no colon) still yields an (empty) port part.
            [$hostPart, $portPart] = array_pad(explode(':', $connectionString), 2, '');

            if ($hostPart !== '') {
                $host = $hostPart;
            }

            if ($portPart !== '') {
                // Validate and convert here so the client always receives a real int
                // instead of relying on implicit string-to-int coercion.
                $parsedPort = filter_var(
                    $portPart,
                    FILTER_VALIDATE_INT,
                    ['options' => ['min_range' => 1, 'max_range' => 65535]]
                );
                if ($parsedPort === false) {
                    throw new \InvalidArgumentException(
                        sprintf('Invalid beanstalkd port "%s" in connection string', $portPart)
                    );
                }
                $port = $parsedPort;
            }
        }

        $this->beanstalkdClient = Pheanstalk::create($host, $port);
        $this->tube = new TubeName($tubeName);
    }

    /**
     * Puts a payload onto the configured tube.
     *
     * @param string $data Raw job payload.
     *
     * @return bool Truthiness of whatever the client's put() call returned.
     */
    public function enqueue(string $data): bool
    {
        $this->beanstalkdClient->useTube($this->tube);
        $putResult = $this->beanstalkdClient->put($data);

        return (bool) $putResult;
    }

    /**
     * Reserves the next job (waiting up to 5 seconds), deletes it from the server
     * and returns its payload.
     *
     * @return string|null The job payload, or null when no job was reserved in time.
     */
    public function dequeue(): ?string
    {
        $this->beanstalkdClient->watch($this->tube);
        $job = $this->beanstalkdClient->reserveWithTimeout(5);

        if (!$job instanceof Job) {
            return null;
        }

        // Read the payload before deleting; the job is gone from the server afterwards.
        $payload = $job->getData();
        $this->beanstalkdClient->delete($job);

        return $payload;
    }

    /**
     * Membership lookup is not supported by this adapter yet.
     *
     * @throws \Exception Always.
     */
    public function exist(string $value): bool
    {
        throw new \Exception('Not implemented yet');
    }

    /**
     * Returns the number of jobs currently ready in the tube.
     *
     * Uses the "current jobs ready" statistic rather than "total jobs": the latter is
     * a cumulative counter of every job ever created in the tube and therefore does
     * not shrink when jobs are dequeued.
     *
     * @return int Ready-job count, or 0 when the statistics cannot be fetched.
     */
    public function length(): int
    {
        try {
            $tubeStats = $this->beanstalkdClient->statsTube($this->tube);
        } catch (\Throwable $th) {
            // Deliberate best-effort: any failure to read the tube statistics is
            // reported as an empty queue instead of propagating.
            return 0;
        }

        return $tubeStats instanceof TubeStats ? $tubeStats->currentJobsReady : 0;
    }

    /**
     * Returns the payloads of the jobs currently ready in the tube without consuming them.
     *
     * Each job is reserved, its payload read, and every reserved job is released back
     * to the tube afterwards — also when an error interrupts the loop.
     *
     * @return array<int, string> Payloads in the order the jobs were reserved.
     */
    public function content(): array
    {
        // Snapshot once: the ready count drops as jobs get reserved below.
        $expected = $this->length();
        if ($expected === 0) {
            return [];
        }

        $this->beanstalkdClient->watch($this->tube);

        $reserved = [];
        $items = [];

        try {
            for ($i = 0; $i < $expected; $i++) {
                // Zero timeout: never block if the tube drained since the snapshot.
                $job = $this->beanstalkdClient->reserveWithTimeout(0);
                if (!$job instanceof Job) {
                    break;
                }

                $items[] = $job->getData();
                $reserved[] = $job;
            }
        } finally {
            foreach ($reserved as $job) {
                $this->beanstalkdClient->release($job);
            }
        }

        return $items;
    }
}