@@ -20,13 +20,12 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
2020THE SOFTWARE.
2121*/
2222
23- var msgpack = require ( 'msgpack-js' ) ;
24- var snappy = require ( '@klaus_trainer/snappyjs' ) ;
25- var through = require ( 'through' ) ;
26- var bops = require ( 'bops' ) ;
23+ var msgpack = require ( 'msgpack-js' )
24+ var snappy = require ( '@klaus_trainer/snappyjs' )
25+ var through = require ( 'through' )
26+ var bops = require ( 'bops' )
2727
28-
29- ////////////////////////////////////////////////////////////////////////////////
28+ // //////////////////////////////////////////////////////////////////////////////
3029
3130// Transport is a connection between two Agents. It lives on top of a duplex,
3231// binary stream.
@@ -46,106 +45,113 @@ function () {
4645}
4746
4847function send ( output , message ) {
49- // Uncomment to debug protocol
50- // console.log(process.pid + " -> " + inspect(message, false, 2, true));
48+ // Uncomment to debug protocol
49+ // console.log(process.pid + " -> " + inspect(message, false, 2, true));
5150
52- // Serialize the messsage.
53- var encodedMessage = msgpack . encode ( message ) ;
54- var frame = snappy . compress ( encodedMessage ) ;
51+ // Serialize the messsage.
52+ var encodedMessage = msgpack . encode ( message )
53+ var frame = snappy . compress ( encodedMessage )
5554
56- // Send a 4 byte length header before the frame.
57- var header = bops . create ( 4 ) ;
58- bops . writeUInt32BE ( header , frame . byteLength , 0 ) ;
59- output . emit ( 'data' , header ) ;
55+ // Send a 4 byte length header before the frame.
56+ var header = bops . create ( 4 )
57+ bops . writeUInt32BE ( header , frame . byteLength , 0 )
58+ output . emit ( 'data' , header )
6059
61- // Send the serialized message.
62- return output . emit ( 'data' , frame ) ;
60+ // Send the serialized message.
61+ return output . emit ( 'data' , frame )
6362}
6463
6564exports . createDecodeStream =
66- function ( ) {
67- var stream
68- return stream = through ( deFramer ( function ( frame ) {
69- // console.log(frame)
70- var message ;
71- try {
72- var uncompressedFrame = snappy . uncompress ( frame ) ;
73- message = msgpack . decode ( uncompressedFrame ) ;
74- } catch ( err ) {
75- return stream . emit ( "error" , err ) ;
76- }
77-
78- stream . emit ( 'data' , message )
79- } ) )
65+ function ( ) {
66+ var stream = through ( deFramer ( function ( frame ) {
67+ // console.log(frame)
68+ var message
69+
70+ try {
71+ var uncompressedFrame = snappy . uncompress ( frame )
72+ message = msgpack . decode ( uncompressedFrame )
73+ } catch ( err ) {
74+ return stream . emit ( 'error' , err )
75+ }
76+
77+ stream . emit ( 'data' , message )
78+ } ) )
79+
80+ return stream
8081}
8182
82- function Transport ( input , output ) {
83- var parse = deFramer ( function ( frame ) {
84- var message ;
85- try {
86- var uncompressedFrame = snappy . uncompress ( frame ) ;
87- message = msgpack . decode ( uncompressedFrame ) ;
88- } catch ( err ) {
89- return self . emit ( "error" , err ) ;
90- }
91- // console.log(process.pid + " <- " + inspect(message, false, 2, true));
92- self . emit ( "message" , message ) ;
93- } ) ;
94-
95- // Route data chunks to the parser, but check for errors
96- function onData ( chunk ) {
97- try {
98- parse ( chunk ) ;
99- } catch ( err ) {
100- self . emit ( "error" , err ) ;
101- }
83+ /* eslint-disable no-unused-vars */
84+ function Transport ( input , output ) {
85+ /* global self */
86+ var parse = deFramer ( function ( frame ) {
87+ var message
88+
89+ try {
90+ var uncompressedFrame = snappy . uncompress ( frame )
91+ message = msgpack . decode ( uncompressedFrame )
92+ } catch ( err ) {
93+ return self . emit ( 'error' , err )
10294 }
95+
96+ // console.log(process.pid + " <- " + inspect(message, false, 2, true));
97+ self . emit ( 'message' , message )
98+ } )
99+
100+ // Route data chunks to the parser, but check for errors
101+ function onData ( chunk ) {
102+ try {
103+ parse ( chunk )
104+ } catch ( err ) {
105+ self . emit ( 'error' , err )
106+ }
107+ }
103108}
109+ /* eslint-enable no-unused-vars */
104110
105111// A simple state machine that consumes raw bytes and emits frame events.
106112// Returns a parser function that consumes buffers. It emits message buffers
107113// via onMessage callback passed in.
108- function deFramer ( onFrame ) {
109- var buffer ;
110- var state = 0 ;
111- var length = 0 ;
112- var offset ;
113- return function parse ( chunk ) {
114- for ( var i = 0 , l = chunk . length ; i < l ; i ++ ) {
115- switch ( state ) {
116- case 0 : length |= chunk [ i ] << 24 ; state = 1 ; break ;
117- case 1 : length |= chunk [ i ] << 16 ; state = 2 ; break ;
118- case 2 : length |= chunk [ i ] << 8 ; state = 3 ; break ;
119- case 3 : length |= chunk [ i ] ; state = 4 ;
120- offset = 0 ;
121- break ;
122- case 4 :
123- var len = l - i ;
124- var emit = false ;
125- if ( len + offset >= length ) {
126- emit = true ;
127- len = length - offset ;
128- }
129- if ( emit && offset === 0 ) {
130- buffer = bops . subarray ( chunk , i , i + len ) ;
131- } else if ( offset === 0 ) {
132- buffer = bops . create ( length ) ;
133- bops . copy ( chunk , buffer , 0 , i , i + len ) ;
134- } else {
135- bops . copy ( chunk , buffer , offset , i , i + len ) ;
136- }
137- offset += len ;
138- i += len - 1 ;
139- if ( emit ) {
140- state = 0 ;
141- length = 0 ;
142- var _buffer = buffer
143- buffer = undefined ;
144- offset = undefined ;
145- onFrame ( _buffer ) ;
146- }
147- break ;
148- }
149- }
150- } ;
114+ function deFramer ( onFrame ) {
115+ var buffer
116+ var state = 0
117+ var length = 0
118+ var offset
119+ return function parse ( chunk ) {
120+ for ( var i = 0 , l = chunk . length ; i < l ; i ++ ) {
121+ switch ( state ) {
122+ case 0 : length |= chunk [ i ] << 24 ; state = 1 ; break
123+ case 1 : length |= chunk [ i ] << 16 ; state = 2 ; break
124+ case 2 : length |= chunk [ i ] << 8 ; state = 3 ; break
125+ case 3 : length |= chunk [ i ] ; state = 4
126+ offset = 0
127+ break
128+ case 4 :
129+ var len = l - i
130+ var emit = false
131+ if ( len + offset >= length ) {
132+ emit = true
133+ len = length - offset
134+ }
135+ if ( emit && offset === 0 ) {
136+ buffer = bops . subarray ( chunk , i , i + len )
137+ } else if ( offset === 0 ) {
138+ buffer = bops . create ( length )
139+ bops . copy ( chunk , buffer , 0 , i , i + len )
140+ } else {
141+ bops . copy ( chunk , buffer , offset , i , i + len )
142+ }
143+ offset += len
144+ i += len - 1
145+ if ( emit ) {
146+ state = 0
147+ length = 0
148+ var _buffer = buffer
149+ buffer = undefined
150+ offset = undefined
151+ onFrame ( _buffer )
152+ }
153+ break
154+ }
155+ }
156+ }
151157}
0 commit comments