Skip to content

Commit a340ba8

Browse files
committed
Refactor czar
1 parent e08cd5a commit a340ba8

2 files changed

Lines changed: 24 additions & 22 deletions

File tree

protocol/czar/engine.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,25 @@ import (
2727
// To open a stream:
2828
//
2929
// +-----+-----+-----+-----+
30-
// | Sid | 0 | Rsv |
30+
// | 0 | Sid | Rsv |
3131
// +-----+-----+-----+-----+
3232
//
3333
// Both server and client can push data to each other.
3434
//
3535
// +-----+-----+-----+-----+-----+-----+
36-
// | Sid | 1 | Len | Msg |
36+
// | 1 | Sid | Len | Msg |
3737
// +-----+-----+-----+-----+-----+-----+
3838
//
3939
// Close the specified stream.
4040
//
4141
// +-----+-----+-----+-----+
42-
// | Sid | 2 | 0/1 | Rsv |
42+
// | 2 | Sid | 0/1 | Rsv |
4343
// +-----+-----+-----+-----+
4444
//
4545
// Keep alive probe and reply.
4646
//
4747
// +-----+-----+-----+-----+
48-
// | 0x0 | 3 | 0/1 | Rsv |
48+
// | 3 | 0/1 | Rsv |
4949
// +-----+-----+-----+-----+
5050

5151
// Conf is acting as package level configuration.

protocol/czar/mux.go

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (s *Stream) Close() error {
2828
s.wer.Put(io.ErrClosedPipe)
2929
s.zo0.Do(func() {
3030
s.mux.pri.Pri(0, func() error {
31-
s.mux.con.Write([]byte{s.idx, 0x02, 0x00, 0x00})
31+
s.mux.con.Write([]byte{0x02, s.idx, 0x00, 0x00})
3232
return nil
3333
})
3434
})
@@ -41,7 +41,7 @@ func (s *Stream) Esolc() error {
4141
s.wer.Put(io.ErrClosedPipe)
4242
s.zo0.Do(func() {
4343
s.mux.pri.Pri(0, func() error {
44-
s.mux.con.Write([]byte{s.idx, 0x02, 0x01, 0x00})
44+
s.mux.con.Write([]byte{0x02, s.idx, 0x01, 0x00})
4545
return nil
4646
})
4747
})
@@ -98,8 +98,8 @@ func (s *Stream) Write(p []byte) (int, error) {
9898
case len(p) >= 0:
9999
return n, nil
100100
}
101-
buf[0] = s.idx
102-
buf[1] = 0x01
101+
buf[0] = 0x01
102+
buf[1] = s.idx
103103
binary.BigEndian.PutUint16(buf[2:4], uint16(l))
104104
copy(buf[4:], p[:l])
105105
p = p[l:]
@@ -177,7 +177,7 @@ func (m *Mux) Open() (*Stream, error) {
177177
return nil, err
178178
}
179179
err = m.pri.Pri(0, func() error {
180-
return doa.Err(m.con.Write([]byte{idx, 0x00, 0x00, 0x00}))
180+
return doa.Err(m.con.Write([]byte{0x00, idx, 0x00, 0x00}))
181181
})
182182
if err != nil {
183183
m.idp.Put(idx)
@@ -200,7 +200,7 @@ func (m *Mux) Recv() {
200200
old *Stream
201201
prb = time.AfterFunc(Conf.IdleProbeDuration, func() {
202202
if m.pri.Pri(0, func() error {
203-
return doa.Err(m.con.Write([]byte{0x00, 0x03, 0x00, 0x00}))
203+
return doa.Err(m.con.Write([]byte{0x03, 0x00, 0x00, 0x00}))
204204
}) != nil {
205205
m.Close()
206206
}
@@ -218,10 +218,15 @@ func (m *Mux) Recv() {
218218
m.rer.Put(err)
219219
break
220220
}
221-
idx = buf[0]
222-
cmd = buf[1]
223-
switch {
224-
case cmd == 0x00:
221+
cmd = buf[0]
222+
idx = buf[1]
223+
if cmd >= 0x04 {
224+
// Packet format error, connection closed.
225+
m.con.Close()
226+
break
227+
}
228+
switch cmd {
229+
case 0x00:
225230
// Make sure the stream has been closed properly.
226231
old = m.usb[idx]
227232
if old.rer.Get() == nil || old.wer.Get() == nil {
@@ -232,7 +237,7 @@ func (m *Mux) Recv() {
232237
m.idp.Set(idx)
233238
m.usb[idx] = stm
234239
m.ach <- stm
235-
case cmd == 0x01:
240+
case 0x01:
236241
bsz = binary.BigEndian.Uint16(buf[2:4])
237242
msg = make([]byte, bsz)
238243
_, err = io.ReadFull(m.con, msg)
@@ -248,22 +253,19 @@ func (m *Mux) Recv() {
248253
case stm.rch <- msg:
249254
case <-stm.rer.Sig():
250255
}
251-
case cmd == 0x02:
256+
case 0x02:
252257
stm = m.usb[idx]
253258
stm.Esolc()
254259
old = NewWither(idx, m)
255260
m.usb[idx] = old
256-
case cmd == 0x03:
257-
switch buf[2] {
261+
case 0x03:
262+
switch idx {
258263
case 0x00:
259264
m.pri.Pri(0, func() error {
260-
return doa.Err(m.con.Write([]byte{0x00, 0x03, 0x01, 0x00}))
265+
return doa.Err(m.con.Write([]byte{0x03, 0x01, 0x00, 0x00}))
261266
})
262267
case 0x01:
263268
}
264-
case cmd >= 0x04:
265-
// Packet format error, connection closed.
266-
m.con.Close()
267269
}
268270
}
269271
close(m.ach)

0 commit comments

Comments
 (0)