Skip to content

Commit 70a39e0

Browse files
committed
Merge pull request #74 from jamesmoey/feature/file_provider
Feature/file provider
2 parents 83076e9 + c0dcf9c commit 70a39e0

9 files changed

Lines changed: 356 additions & 3 deletions

File tree

composer.json

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,15 @@
2828
"require-dev": {
2929
"phpunit/phpunit": "~3.7",
3030
"aws/aws-sdk-php": "~2.5",
31-
"iron-io/iron_mq": "~1.5"
31+
"iron-io/iron_mq": "~1.5",
32+
"symfony/finder": "~2.3",
33+
"symfony/filesystem": "~2.3"
3234
},
3335
"suggest": {
3436
"aws/aws-sdk-php": "Required to use AWS as a Queue Provider",
35-
"iron-io/iron_mq": "Required to use IronMQ as a Queue Provider"
37+
"iron-io/iron_mq": "Required to use IronMQ as a Queue Provider",
38+
"symfony/finder": "Required to use File as a Queue Provider",
39+
"symfony/filesystem": "Required to use File as a Queue Provider"
3640
},
3741
"autoload": {
3842
"psr-4": {

src/DependencyInjection/Configuration.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ private function getProvidersNode()
6060
'aws' => ['key', 'secret'],
6161
'ironmq' => ['token', 'project_id'],
6262
'sync' => [],
63-
'custom' => ['service']
63+
'custom' => ['service'],
64+
'file' => ['path']
6465
];
6566

6667
$node
@@ -97,6 +98,8 @@ private function getProvidersNode()
9798
->scalarNode('region')
9899
->defaultValue('us-east-1')
99100
->end()
101+
// File
102+
->scalarNode('path')->end()
100103
->end()
101104

102105
->validate()

src/DependencyInjection/UecodeQPushExtension.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ public function load(array $configs, ContainerBuilder $container)
8585
$class = $container->getParameter('uecode_qpush.provider.custom');
8686
$client = $this->createCustomClient($config['providers'][$provider]['service']);
8787
break;
88+
case 'file':
89+
$class = $container->getParameter('uecode_qpush.provider.file');
90+
$values['options']['path'] = $config['providers'][$provider]['path'];
91+
break;
8892
}
8993

9094
$definition = new Definition(

src/Provider/FileProvider.php

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
<?php
2+
namespace Uecode\Bundle\QPushBundle\Provider;
3+
4+
use Doctrine\Common\Cache\Cache;
5+
use Symfony\Bridge\Monolog\Logger;
6+
use Symfony\Component\Filesystem\Filesystem;
7+
use Symfony\Component\Finder\Finder;
8+
use Symfony\Component\Finder\SplFileInfo;
9+
use Uecode\Bundle\QPushBundle\Event\MessageEvent;
10+
use Uecode\Bundle\QPushBundle\Message\Message;
11+
12+
class FileProvider extends AbstractProvider
13+
{
14+
protected $filePointerList = [];
15+
16+
public function __construct($name, array $options, $client, Cache $cache, Logger $logger) {
17+
$this->name = $name;
18+
$this->options = $options;
19+
$this->cache = $cache;
20+
$this->logger = $logger;
21+
}
22+
23+
public function getProvider()
24+
{
25+
return 'File';
26+
}
27+
28+
public function create()
29+
{
30+
$fs = new Filesystem();
31+
if (!$fs->exists($this->options['path'])) {
32+
return $fs->mkdir($this->options['path']);
33+
}
34+
return true;
35+
}
36+
37+
public function publish(array $message, array $options = [])
38+
{
39+
$fileName = microtime(false);
40+
$fileName = str_replace(' ', '', $fileName);
41+
$path = substr(hash('md5', $fileName), 0, 3);
42+
if (!is_dir($this->options['path'].DIRECTORY_SEPARATOR.$path)) {
43+
mkdir($this->options['path'].DIRECTORY_SEPARATOR.$path);
44+
}
45+
$fs = new Filesystem();
46+
$fs->dumpFile(
47+
$this->options['path'].DIRECTORY_SEPARATOR.$path.DIRECTORY_SEPARATOR.$fileName.'.json',
48+
json_encode($message)
49+
);
50+
return $fileName;
51+
}
52+
53+
/**
54+
* @param array $options
55+
* @return Message[]
56+
*/
57+
public function receive(array $options = [])
58+
{
59+
$finder = new Finder();
60+
$finder
61+
->files()
62+
->ignoreDotFiles(true)
63+
->ignoreUnreadableDirs(true)
64+
->ignoreVCS(true)
65+
->name('*.json')
66+
->in($this->options['path'])
67+
;
68+
if ($this->options['message_delay'] > 0) {
69+
$finder->date(
70+
sprintf('< %d seconds ago', $this->options['message_delay'])
71+
);
72+
}
73+
$finder
74+
->date(
75+
sprintf('> %d seconds ago', $this->options['message_expiration'])
76+
)
77+
;
78+
$messages = [];
79+
/** @var SplFileInfo $file */
80+
foreach ($finder as $file) {
81+
$filePointer = fopen($file->getRealPath(), 'r+');
82+
$id = substr($file->getFilename(), 0, -5);
83+
if (!isset($this->filePointerList[$id]) && flock($filePointer, LOCK_EX | LOCK_NB)) {
84+
$this->filePointerList[$id] = $filePointer;
85+
$messages[] = new Message($id, json_decode($file->getContents(), true), []);
86+
} else {
87+
fclose($filePointer);
88+
}
89+
if (count($messages) === (int) $this->options['messages_to_receive']) {
90+
break;
91+
}
92+
}
93+
return $messages;
94+
}
95+
96+
public function delete($id)
97+
{
98+
$success = false;
99+
if (isset($this->filePointerList[$id])) {
100+
$fileName = $id;
101+
$path = substr(hash('md5', (string)$fileName), 0, 3);
102+
$fs = new Filesystem();
103+
$fs->remove(
104+
$this->options['path'] . DIRECTORY_SEPARATOR . $path . DIRECTORY_SEPARATOR . $fileName . '.json'
105+
);
106+
fclose($this->filePointerList[$id]);
107+
unset($this->filePointerList[$id]);
108+
$success = true;
109+
}
110+
if (rand(1,10) === 5) {
111+
$this->cleanUp();
112+
}
113+
return $success;
114+
}
115+
116+
public function cleanUp()
117+
{
118+
$finder = new Finder();
119+
$finder
120+
->files()
121+
->in($this->options['path'])
122+
->ignoreDotFiles(true)
123+
->ignoreUnreadableDirs(true)
124+
->ignoreVCS(true)
125+
->depth('< 2')
126+
->name('*.json')
127+
;
128+
$finder->date(
129+
sprintf('> %d seconds ago', $this->options['message_expiration'])
130+
);
131+
/** @var SplFileInfo $file */
132+
foreach ($finder as $file) {
133+
@unlink($file->getRealPath());
134+
}
135+
}
136+
137+
public function destroy()
138+
{
139+
$fs = new Filesystem();
140+
$fs->remove($this->options['path']);
141+
return !is_dir($this->options['path']);
142+
}
143+
144+
/**
145+
* Removes the message from queue after all other listeners have fired
146+
*
147+
* If an earlier listener has erred or stopped propagation, this method
148+
* will not fire and the Queued Message should become visible in queue again.
149+
*
150+
* Stops Event Propagation after removing the Message
151+
*
152+
* @param MessageEvent $event The SQS Message Event
153+
* @return bool|void
154+
*/
155+
public function onMessageReceived(MessageEvent $event)
156+
{
157+
$id = $event->getMessage()->getId();
158+
$this->delete($id);
159+
$event->stopPropagation();
160+
}
161+
}

src/Resources/config/parameters.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ parameters:
55
uecode_qpush.provider.ironmq: Uecode\Bundle\QPushBundle\Provider\IronMqProvider
66
uecode_qpush.provider.sync: Uecode\Bundle\QPushBundle\Provider\SyncProvider
77
uecode_qpush.provider.custom: Uecode\Bundle\QPushBundle\Provider\CustomProvider
8+
uecode_qpush.provider.file: Uecode\Bundle\QPushBundle\Provider\FileProvider

tests/DependencyInjection/UecodeQPushExtensionTest.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public function testConfiguration()
6767
$this->assertTrue($this->container->has('uecode_qpush'));
6868

6969
$this->assertTrue($this->container->has('uecode_qpush.test_aws'));
70+
$this->assertTrue($this->container->has('uecode_qpush.test_file'));
7071
$this->assertTrue($this->container->has('uecode_qpush.test_secondary_aws'));
7172
$this->assertNotSame(
7273
$this->container->get('uecode_qpush.test_aws'),

tests/Fixtures/config_test.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,18 @@ uecode_qpush:
2020
driver: ironmq
2121
token: 234
2222
project_id: 234
23+
file:
24+
driver: file
25+
path: /tmp/my_queue
2326
queues:
27+
test_file:
28+
provider: file
29+
options:
30+
message_delay: 0
31+
message_timeout: 30
32+
message_expiration: 604800
33+
messages_to_receive: 1
34+
receive_wait_time: 3
2435
test_aws:
2536
provider: aws
2637
options:

0 commit comments

Comments
 (0)