Skip to content

Commit bf1a635

Browse files
committed
Merge pull request #75 from jamesmoey/feature/file_provider
File Provider
2 parents 70a39e0 + 0c58be2 commit bf1a635

3 files changed

Lines changed: 58 additions & 23 deletions

File tree

docs/file-provider.rst

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
File Provider
2+
-------------
3+
4+
The file provider uses the filesystem to dispatches and resolves queued messages.
5+
6+
Configuration
7+
^^^^^^^^^^^^^
8+
9+
To designate a queue as file, set the ``driver`` of its provider to ``file``. You will
10+
need to a read-able and write-able path to store the messages.
11+
12+
.. code-block:: yaml
13+
14+
#app/config_dev.yml
15+
16+
uecode_qpush:
17+
providers:
18+
file_based:
19+
driver: file
20+
path: [Path to store messages]
21+
queues:
22+
my_queue_name:
23+
provider: file_based

src/Provider/FileProvider.php

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@
1212
class FileProvider extends AbstractProvider
1313
{
1414
protected $filePointerList = [];
15+
protected $queuePath;
1516

1617
public function __construct($name, array $options, $client, Cache $cache, Logger $logger) {
1718
$this->name = $name;
19+
/** @var md5 only contain numeric and A to F, so it is file system safe */
20+
$this->queuePath = $options['path'].DIRECTORY_SEPARATOR.str_replace('-', '', hash('md5', $name));
1821
$this->options = $options;
1922
$this->cache = $cache;
2023
$this->logger = $logger;
@@ -28,8 +31,9 @@ public function getProvider()
2831
public function create()
2932
{
3033
$fs = new Filesystem();
31-
if (!$fs->exists($this->options['path'])) {
32-
return $fs->mkdir($this->options['path']);
34+
if (!$fs->exists($this->queuePath)) {
35+
$fs->mkdir($this->queuePath);
36+
return $fs->exists($this->queuePath);
3337
}
3438
return true;
3539
}
@@ -39,12 +43,12 @@ public function publish(array $message, array $options = [])
3943
$fileName = microtime(false);
4044
$fileName = str_replace(' ', '', $fileName);
4145
$path = substr(hash('md5', $fileName), 0, 3);
42-
if (!is_dir($this->options['path'].DIRECTORY_SEPARATOR.$path)) {
43-
mkdir($this->options['path'].DIRECTORY_SEPARATOR.$path);
46+
if (!is_dir($this->queuePath.DIRECTORY_SEPARATOR.$path)) {
47+
mkdir($this->queuePath.DIRECTORY_SEPARATOR.$path);
4448
}
4549
$fs = new Filesystem();
4650
$fs->dumpFile(
47-
$this->options['path'].DIRECTORY_SEPARATOR.$path.DIRECTORY_SEPARATOR.$fileName.'.json',
51+
$this->queuePath.DIRECTORY_SEPARATOR.$path.DIRECTORY_SEPARATOR.$fileName.'.json',
4852
json_encode($message)
4953
);
5054
return $fileName;
@@ -63,7 +67,7 @@ public function receive(array $options = [])
6367
->ignoreUnreadableDirs(true)
6468
->ignoreVCS(true)
6569
->name('*.json')
66-
->in($this->options['path'])
70+
->in($this->queuePath)
6771
;
6872
if ($this->options['message_delay'] > 0) {
6973
$finder->date(
@@ -82,7 +86,7 @@ public function receive(array $options = [])
8286
$id = substr($file->getFilename(), 0, -5);
8387
if (!isset($this->filePointerList[$id]) && flock($filePointer, LOCK_EX | LOCK_NB)) {
8488
$this->filePointerList[$id] = $filePointer;
85-
$messages[] = new Message($id, json_decode($file->getContents(), true), []);
89+
$messages[] = new Message($id, json_decode($file->getContents(), true), []);
8690
} else {
8791
fclose($filePointer);
8892
}
@@ -101,7 +105,7 @@ public function delete($id)
101105
$path = substr(hash('md5', (string)$fileName), 0, 3);
102106
$fs = new Filesystem();
103107
$fs->remove(
104-
$this->options['path'] . DIRECTORY_SEPARATOR . $path . DIRECTORY_SEPARATOR . $fileName . '.json'
108+
$this->queuePath . DIRECTORY_SEPARATOR . $path . DIRECTORY_SEPARATOR . $fileName . '.json'
105109
);
106110
fclose($this->filePointerList[$id]);
107111
unset($this->filePointerList[$id]);
@@ -118,7 +122,7 @@ public function cleanUp()
118122
$finder = new Finder();
119123
$finder
120124
->files()
121-
->in($this->options['path'])
125+
->in($this->queuePath)
122126
->ignoreDotFiles(true)
123127
->ignoreUnreadableDirs(true)
124128
->ignoreVCS(true)
@@ -137,8 +141,9 @@ public function cleanUp()
137141
public function destroy()
138142
{
139143
$fs = new Filesystem();
140-
$fs->remove($this->options['path']);
141-
return !is_dir($this->options['path']);
144+
$fs->remove($this->queuePath);
145+
$this->filePointerList = [];
146+
return !is_dir($this->queuePath);
142147
}
143148

144149
/**

tests/Provider/FileProviderTest.php

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,21 @@ class FileProviderTest extends \PHPUnit_Framework_TestCase
1212
{
1313
/** @var FileProvider */
1414
protected $provider;
15-
protected $path;
15+
protected $basePath;
16+
protected $queueHash;
1617
protected $umask;
1718

1819
public function setUp()
1920
{
2021
$this->umask = umask(0);
21-
$this->path = rtrim(sys_get_temp_dir(), DIRECTORY_SEPARATOR).DIRECTORY_SEPARATOR.time().rand(0, 1000);
22-
mkdir($this->path);
22+
$this->basePath = rtrim(sys_get_temp_dir(), DIRECTORY_SEPARATOR).DIRECTORY_SEPARATOR.time().rand(0, 1000);
23+
mkdir($this->basePath);
2324
$this->provider = $this->getFileProvider();
2425
}
2526

2627
public function tearDown()
2728
{
28-
$this->clean($this->path);
29+
$this->clean($this->basePath);
2930
umask($this->umask);
3031
}
3132

@@ -50,7 +51,7 @@ private function getFileProvider(array $options = [])
5051
{
5152
$options = array_merge(
5253
[
53-
'path' => $this->path,
54+
'path' => $this->basePath,
5455
'logging_enabled' => false,
5556
'message_delay' => 0,
5657
'message_timeout' => 30,
@@ -70,6 +71,8 @@ private function getFileProvider(array $options = [])
7071
'Symfony\Bridge\Monolog\Logger', [], ['qpush.test']
7172
);
7273

74+
$this->queueHash = str_replace('-', '', md5('test'));
75+
7376
return new FileProvider('test', $options, null, $cache, $logger);
7477
}
7578

@@ -83,18 +86,19 @@ public function testGetProvider()
8386
public function testCreate()
8487
{
8588
$this->assertTrue($this->provider->create());
86-
$this->assertTrue(is_readable($this->path));
87-
$this->assertTrue(is_writable($this->path));
89+
$this->assertTrue(is_readable($this->basePath.DIRECTORY_SEPARATOR.$this->queueHash));
90+
$this->assertTrue(is_writable($this->basePath.DIRECTORY_SEPARATOR.$this->queueHash));
8891
}
8992

9093
public function testDestroy()
9194
{
9295
$this->provider->destroy();
93-
$this->assertFalse(is_dir($this->path));
96+
$this->assertFalse(is_dir($this->basePath.DIRECTORY_SEPARATOR.$this->queueHash));
9497
}
9598

9699
public function testReceive()
97100
{
101+
$this->provider->create();
98102
$this->assertTrue(is_array($this->provider->receive()));
99103
}
100104

@@ -103,8 +107,8 @@ public function testDelete()
103107
$this->provider->create();
104108

105109
$path = substr(hash('md5', '123'), 0, 3);
106-
mkdir($this->path.DIRECTORY_SEPARATOR.$path);
107-
touch($this->path.DIRECTORY_SEPARATOR.$path.DIRECTORY_SEPARATOR.'123.json');
110+
mkdir($this->basePath.DIRECTORY_SEPARATOR.$this->queueHash.DIRECTORY_SEPARATOR.$path);
111+
touch($this->basePath.DIRECTORY_SEPARATOR.$this->queueHash.DIRECTORY_SEPARATOR.$path.DIRECTORY_SEPARATOR.'123.json');
108112

109113
$messages = $this->provider->receive();
110114
$this->assertNotEmpty($messages);
@@ -113,6 +117,7 @@ public function testDelete()
113117

114118
public function testPublish()
115119
{
120+
$this->provider->create();
116121
$content = [
117122
['testing'],
118123
['testing 123']
@@ -129,6 +134,7 @@ public function testPublish()
129134
}
130135

131136
public function testPublishDelay() {
137+
$this->provider->create();
132138
$provider = $this->getFileProvider([
133139
'message_delay' => 2,
134140
]);
@@ -142,16 +148,17 @@ public function testOnMessageReceived()
142148
$this->provider->create();
143149
$id = $this->provider->publish(['foo' => 'bar']);
144150
$path = substr(hash('md5', $id), 0, 3);
145-
$this->assertTrue(is_file($this->path.DIRECTORY_SEPARATOR.$path.DIRECTORY_SEPARATOR.$id.'.json'));
151+
$this->assertTrue(is_file($this->basePath.DIRECTORY_SEPARATOR.$this->queueHash.DIRECTORY_SEPARATOR.$path.DIRECTORY_SEPARATOR.$id.'.json'));
146152
$this->provider->onMessageReceived(new MessageEvent(
147153
'test',
148154
$this->provider->receive()[0]
149155
));
150-
$this->assertFalse(is_file($this->path.DIRECTORY_SEPARATOR.$path.DIRECTORY_SEPARATOR.$id.'.json'));
156+
$this->assertFalse(is_file($this->basePath.DIRECTORY_SEPARATOR.$this->queueHash.DIRECTORY_SEPARATOR.$path.DIRECTORY_SEPARATOR.$id.'.json'));
151157
}
152158

153159
public function testCleanUp()
154160
{
161+
$this->provider->create();
155162
$provider = $this->getFileProvider([
156163
'message_expiration' => 1,
157164
]);

0 commit comments

Comments
 (0)