-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathLargeArrayBuffer.php
More file actions
286 lines (260 loc) · 7.84 KB
/
LargeArrayBuffer.php
File metadata and controls
286 lines (260 loc) · 7.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
<?php
declare(strict_types=1);
namespace LargeArrayBuffer;
/**
 * Append-only item buffer backed by a php://temp stream.
 *
 * Every pushed item is serialized, optionally compressed, escaped so that it
 * contains no raw line break, and stored as one line of the stream. Reading
 * happens sequentially through the \Iterator methods (rewind/valid/current/
 * key/next), which deserialize one line at a time.
 *
 * @template E of object|array|scalar|null
 * @implements ArrayBufferInterface<E>
 * @author Andreas Wahlen
 */
class LargeArrayBuffer implements ArrayBufferInterface {
  public const SERIALIZER_PHP = 1;
  public const SERIALIZER_IGBINARY = 2;
  public const SERIALIZER_MSGPACK = 3;
  public const COMPRESSION_NONE = 0;
  public const COMPRESSION_GZIP = 1;
  public const COMPRESSION_LZ4 = 2;

  /**
   * Serializer used for every item, fixed at construction time.
   * @readonly
   * @var self::SERIALIZER_*
   */
  private int $serializer;

  /**
   * Compression applied to every serialized item, fixed at construction time.
   * @readonly
   * @var self::COMPRESSION_*
   */
  private int $compression;

  /**
   * Handle of the php://temp stream that stores one escaped item per line.
   * @var resource
   */
  private $stream;

  /**
   * Number of items pushed so far.
   * @var int<0, max>
   */
  private int $count = 0;

  /**
   * Number of lines the read cursor has passed since the last rewind();
   * equal to $count whenever the stream position is at the end of the data.
   * @var int<0, max>
   */
  private int $index = 0;

  /**
   * Decompressed but still serialized payload of the current item,
   * or null if no item is loaded.
   */
  private ?string $current = null;

  /**
   * @param int $maxMemoryMiB maximum memory usage in MiB, when more data is pushed, disk space is used
   * @psalm-param int<1,max> $maxMemoryMiB
   * @psalm-param self::SERIALIZER_* $serializer
   * @psalm-param self::COMPRESSION_* $compression
   * @throws \InvalidArgumentException if an unsupported serialization was requested
   * @throws \InvalidArgumentException if an unsupported compression was requested
   * @throws \RuntimeException if php://temp could not be opened
   */
  public function __construct(int $maxMemoryMiB = 1024, int $serializer = self::SERIALIZER_PHP, int $compression = self::COMPRESSION_NONE) {
    $this->serializer = $serializer;
    if($this->serializer === self::SERIALIZER_IGBINARY && !extension_loaded('igbinary')){
      throw new \InvalidArgumentException('igbinary serializer was requested, but ext-igbinary is not loaded');
    }
    if($this->serializer === self::SERIALIZER_MSGPACK && !extension_loaded('msgpack')){
      throw new \InvalidArgumentException('msgpack serializer was requested, but ext-msgpack is not loaded');
    }
    $this->compression = $compression;
    // fail fast like for the other optional extensions instead of crashing
    // with an undefined function error on the first push()
    if($this->compression === self::COMPRESSION_GZIP && !extension_loaded('zlib')){
      throw new \InvalidArgumentException('gzip compression was requested, but ext-zlib is not loaded');
    }
    if($this->compression === self::COMPRESSION_LZ4 && !extension_loaded('lz4')){
      throw new \InvalidArgumentException('LZ4 compression was requested, but ext-lz4 is not loaded');
    }
    $stream = fopen('php://temp/maxmemory:'.($maxMemoryMiB * 1024 * 1024), 'r+');
    if($stream === false) {
      throw new \RuntimeException('failed to open php://temp file descriptor');
    }
    $this->stream = $stream;
  }

  /**
   * Serializes, compresses and appends an item to the end of the buffer.
   *
   * The item is always written at the end of the stream. If a read is in
   * progress (the cursor is not at the end), the read position is restored
   * afterwards, so already stored items are never overwritten and the
   * iteration simply continues.
   *
   * @psalm-param E $item
   * @throws \RuntimeException if unable to write to php://temp, the serialization failed or the compression failed
   */
  public function push(mixed $item): void {
    $serialized = match($this->serializer){
      self::SERIALIZER_IGBINARY => igbinary_serialize($item),
      self::SERIALIZER_MSGPACK => msgpack_serialize($item),
      default => serialize($item)
    };
    if($serialized === false){
      throw new \RuntimeException('failed to serialize data');
    }
    /** @var string|false $compressed */
    $compressed = match($this->compression){
      self::COMPRESSION_GZIP => gzdeflate($serialized),
      self::COMPRESSION_LZ4 => lz4_compress($serialized),
      default => $serialized
    };
    if($compressed === false){
      throw new \RuntimeException('failed to compress data');
    }
    // escape backslash, CR and LF so the payload occupies exactly one line
    $line = addcslashes($compressed, "\\\r\n")."\n";

    // index === count means every stored line has been passed,
    // i.e. the stream position is already at the end of the data
    $cursorAtEnd = $this->index === $this->count;
    $resumeAt = $cursorAtEnd ? null : ftell($this->stream);
    if($resumeAt === false || fseek($this->stream, 0, SEEK_END) !== 0){
      throw new \RuntimeException('could not write to php://temp');
    }
    $written = fwrite($this->stream, $line);
    // put the read cursor back before reporting any write problem
    if($resumeAt !== null && fseek($this->stream, $resumeAt) !== 0){
      throw new \RuntimeException('could not write to php://temp');
    }
    if($written === false || $written !== strlen($line)){
      throw new \RuntimeException('could not write to php://temp');
    }
    $this->count++;
    if($cursorAtEnd){
      // the cursor stays at the end, so the new line counts as passed
      $this->index++;
    }
  }

  /**
   * Moves the read cursor back to the first item and unloads the current item.
   */
  public function rewind(): void {
    fseek($this->stream, 0);
    $this->current = null;
    $this->index = 0;
  }

  /**
   * Reads and decompresses the next line; unloads the current item if there is none left.
   *
   * @throws \RuntimeException if unable to read from php://temp
   */
  public function next(): void {
    if(feof($this->stream) || $this->count === $this->index) { // if nothing to read, no data left
      $this->current = null;
      return;
    }
    $line = fgets($this->stream);
    if($line === false) {
      $this->current = null;
      if(feof($this->stream)){
        return;
      }
      throw new \RuntimeException('could not read line from php://temp at index '.$this->index.' of '.$this->count.' items');
    }
    // cut off the line break appended by push(), then undo the escaping
    $compressed = stripcslashes(substr($line, 0, -1));
    /** @var string|false $serialized */
    $serialized = match($this->compression){
      self::COMPRESSION_GZIP => gzinflate($compressed),
      self::COMPRESSION_LZ4 => lz4_uncompress($compressed),
      default => $compressed
    };
    if($serialized === false){
      throw new \RuntimeException('failed to uncompress data');
    }
    $this->current = $serialized;
    $this->index++;
  }

  /**
   * Deserializes and returns the currently loaded item.
   *
   * @psalm-return E
   * @throws \RuntimeException if next() and valid() have not been called before or EOF is reached
   */
  public function current(): mixed {
    if($this->current === null) {
      throw new \RuntimeException('index out of bounds (you might want to call next() and/or valid() before!)');
    }
    /** @psalm-var E $res */
    $res = match($this->serializer){
      self::SERIALIZER_IGBINARY => igbinary_unserialize($this->current),
      self::SERIALIZER_MSGPACK => msgpack_unserialize($this->current),
      default => unserialize($this->current)
    };
    return $res;
  }

  /**
   * {@inheritDoc}
   * @see \Iterator::key()
   * @psalm-return int<0, max>
   * @psalm-mutation-free
   */
  public function key(): int {
    // $index counts the lines already passed, so the loaded item is one behind it
    return max(0, $this->index - 1);
  }

  /**
   * Reports whether an item is loaded; tries to load the next one first if none is.
   */
  public function valid(): bool {
    if($this->current === null) {
      $this->next();
    }
    return $this->current !== null;
  }

  /**
   * @return int|null size in bytes or null if unknown
   * @psalm-mutation-free
   */
  public function getSize(): ?int {
    $stat = fstat($this->stream);
    if($stat === false){
      return null;
    }
    return $stat['size'] ?? null;
  }

  /**
   * {@inheritDoc}
   * @see \Countable::count()
   * @psalm-return int<0, max>
   * @psalm-mutation-free
   */
  public function count(): int {
    return $this->count;
  }

  /**
   * Writes all items as one JSON array, encoding item by item.
   *
   * A file opened by this method (string $dest) is always closed again, also
   * when encoding or writing fails; a passed resource is left open.
   *
   * @param string|resource $dest filename or resource to write to
   * @param int $flags see json_encode for documentation
   * @param int $depth see json_encode for documentation
   * @psalm-param int<1, 2147483647> $depth
   * @throws \RuntimeException
   */
  public function toJSONFile($dest, int $flags = JSON_THROW_ON_ERROR, int $depth = 512): void {
    $ownsStream = false;
    if(is_string($dest)){
      $stream = fopen($dest, 'w');
      if($stream === false){
        throw new \RuntimeException('unable to open file: '.$dest);
      }
      $ownsStream = true;
    } else {
      $stream = $dest;
    }
    $pretty = ($flags & JSON_PRETTY_PRINT) > 0;
    try {
      self::writeOrFail($stream, '[');
      $first = true;
      foreach($this as $item){
        if(!$first){
          self::writeOrFail($stream, ',');
        }
        if($pretty){
          self::writeOrFail($stream, PHP_EOL.' ');
        }
        // may throw \JsonException when JSON_THROW_ON_ERROR is set (the default)
        $json = json_encode($item, $flags, $depth);
        if($json === false){
          throw new \RuntimeException('failed to serialize data');
        }
        self::writeOrFail($stream, $json);
        fflush($stream);
        $first = false;
      }
      if($pretty){
        self::writeOrFail($stream, PHP_EOL);
      }
      self::writeOrFail($stream, ']');
    } finally {
      // only close what was opened here, the caller keeps control of its own resource
      if($ownsStream){
        fclose($stream);
      }
    }
  }

  /**
   * Writes a chunk to the given stream and turns a failed write into an exception.
   *
   * @param resource $stream
   * @throws \RuntimeException if fwrite reports a failure
   */
  private static function writeOrFail($stream, string $data): void {
    if(fwrite($stream, $data) === false){
      throw new \RuntimeException('could not write JSON output');
    }
  }

  /**
   * Loads all items into a plain array.
   *
   * @psalm-return list<E>
   */
  public function toArray(): array {
    $res = [];
    foreach($this as $item){
      $res[] = $item;
    }
    return $res;
  }

  /**
   * Loads all items into a fixed-size array with one slot per pushed item.
   *
   * @psalm-return \SplFixedArray<E>
   */
  public function toFixedArray(): \SplFixedArray {
    $res = new \SplFixedArray($this->count);
    foreach($this as $idx => $item){
      $res[$idx] = $item;
    }
    return $res;
  }

  /**
   * @return \Generator send something other than null to terminate
   * @psalm-return \Generator<int, E, mixed, void>
   */
  public function toGenerator(): \Generator {
    foreach($this as $item){
      $cmd = yield $item;
      if($cmd !== null){
        break;
      }
    }
  }

  /**
   * Closes the backing stream.
   */
  public function __destruct() {
    // skip closing if the handle is not a live resource
    if(is_resource($this->stream)){
      /**
       * @psalm-suppress InvalidPropertyAssignmentValue
       */
      fclose($this->stream);
    }
  }
}