|
1 | 1 | 'use strict' |
2 | 2 |
|
3 | | -var snappy = require('snappyjs') |
4 | | -var msgpack = require('msgpack-lite') |
| 3 | +var pumpify = require('pumpify') |
| 4 | +var through = require('through2') |
5 | 5 | var lpStream = require('length-prefixed-stream') |
6 | | -var Transform = require('readable-stream/transform') |
| 6 | +var msgpack = require('msgpack-lite') |
| 7 | +var snappy = require('snappyjs') |
7 | 8 |
|
8 | | -var msgpackEncodeStream = new Transform({ |
9 | | - objectMode: true, |
10 | | - transform: function (chunk, encoding, next) { |
11 | | - next(null, msgpack.encode(chunk)) |
12 | | - } |
13 | | -}) |
| 9 | +module.exports.createEncodeStream = function SnappyMsgpackEncodeStream (stream) { |
| 10 | + var msgEncode = through.obj(function (data, enc, next) { |
| 11 | + next(null, msgpack.encode(data)) |
| 12 | + }) |
14 | 13 |
|
15 | | -var msgpackDecodeStream = new Transform({ |
16 | | - objectMode: true, |
17 | | - transform: function (chunk, encoding, callback) { |
18 | | - callback(null, msgpack.decode(chunk)) |
19 | | - } |
20 | | -}) |
| 14 | + var snappyCompress = through.obj(function (data, enc, next) { |
| 15 | + next(null, snappy.compress(data)) |
| 16 | + }) |
21 | 17 |
|
22 | | -var snappyCompressStream = new Transform({ |
23 | | - transform: function (chunk, encoding, next) { |
24 | | - next(null, snappy.compress(chunk)) |
25 | | - } |
26 | | -}) |
| 18 | + return pumpify.obj(msgEncode, snappyCompress, lpStream.encode()) |
| 19 | +} |
27 | 20 |
|
28 | | -var snappyUncompressStream = new Transform({ |
29 | | - transform: function (chunk, encoding, callback) { |
30 | | - callback(null, snappy.uncompress(chunk)) |
31 | | - } |
32 | | -}) |
| 21 | +module.exports.createDecodeStream = function SnappyMsgpackDecodeStream (stream) { |
| 22 | + var msgDecode = through.obj(function (data, enc, next) { |
| 23 | + next(null, msgpack.decode(data)) |
| 24 | + }) |
33 | 25 |
|
34 | | -exports.createEncodeStream = function SnappyMsgpackEncodeStream () { |
35 | | - msgpackEncodeStream.pipe(snappyCompressStream).pipe(lpStream.encode()) |
36 | | - return msgpackEncodeStream |
37 | | -} |
| 26 | + var snappyUncompress = through.obj(function (data, enc, next) { |
| 27 | + next(null, snappy.uncompress(data)) |
| 28 | + }) |
38 | 29 |
|
39 | | -exports.createDecodeStream = function SnappyMsgpackDecodeStream () { |
40 | | - return lpStream.decode().pipe(snappyUncompressStream).pipe(msgpackDecodeStream) |
| 30 | + return pumpify.obj(lpStream.decode(), snappyUncompress, msgDecode) |
41 | 31 | } |
0 commit comments