Skip to content

Commit 0494974

Browse files
committed
Make the czar protocol supports 65536 streams
1 parent 652f689 commit 0494974

6 files changed

Lines changed: 60 additions & 56 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ $ daze client ... -p baboon
8989

9090
**Czar**
9191

92-
Protocol czar is an implementation of the ashe protocol based on TCP multiplexing. Multiplexing involves reusing a single TCP connection for multiple ashe protocols, which saves time on the TCP three-way handshake. However, this may result in a slight decrease in data transfer rate (approximately 0.19%). In most cases, using Protocol czar provides a better user experience compared to using the ashe protocol directly.
92+
Protocol czar is an implementation of the ashe protocol based on TCP multiplexing. Multiplexing involves reusing a single TCP connection for multiple ashe protocols, which saves time on the TCP three-way handshake. However, this may result in a slight decrease in data transfer rate (approximately 0.29%). In most cases, using Protocol czar provides a better user experience compared to using the ashe protocol directly.
9393

9494
```sh
9595
$ daze server ... -p czar

protocol/czar/engine.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,42 +26,43 @@ import (
2626
//
2727
// To open a stream:
2828
//
29-
// +-----+-----+-----+-----+
30-
// | 0x0 | Sid | Rsv |
31-
// +-----+-----+-----+-----+
29+
// +-----+-----+-----+-----+-----+-----+-----+-----+
30+
// | 0x0 | Rsv | Sid | Rsv |
31+
// +-----+-----+-----+-----+-----+-----+-----+-----+
3232
//
3333
// Both server and client can push data to each other.
3434
//
35-
// +-----+-----+-----+-----+-----+-----+
36-
// | 0x1 | Sid | Len | Msg |
37-
// +-----+-----+-----+-----+-----+-----+
35+
// +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
36+
// | 0x1 | Rsv | Sid | Len | Rsv | Msg |
37+
// +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
3838
//
3939
// Close the specified stream.
4040
//
41-
// +-----+-----+-----+-----+
42-
// | 0x2 | Sid | 0/1 | Rsv |
43-
// +-----+-----+-----+-----+
41+
// +-----+-----+-----+-----+-----+-----+-----+-----+
42+
// | 0x2 | 0/1 | Sid | Rsv |
43+
// +-----+-----+-----+-----+-----+-----+-----+-----+
4444
//
4545
// Keep alive probe and reply.
4646
//
47-
// +-----+-----+-----+-----+
48-
// | 0x3 | 0/1 | Rsv |
49-
// +-----+-----+-----+-----+
47+
// +-----+-----+-----+-----+-----+-----+-----+-----+
48+
// | 0x3 | 0/1 | Rsv |
49+
// +-----+-----+-----+-----+-----+-----+-----+-----+
5050

5151
// Conf is acting as package level configuration.
5252
var Conf = struct {
5353
// The duration a connection needs to be idle before mux begins sending out keep-alive probe.
5454
IdleProbeDuration time.Duration
5555
// If no data is read for more than this time, the connection is closed.
5656
IdleReplyDuration time.Duration
57-
// Packet size. Since the size of the packet header is 4, this value must be greater than 4. If the value is too
58-
// small, the transmission efficiency will be reduced, and if it is too large, the concurrency capability of mux
59-
// will be reduced.
57+
// Packet size. The value must be greater than 6. If the value is too small, the transmission efficiency will be
58+
// reduced, and if it is too large, the concurrency capability of mux will be reduced.
6059
PacketSize int
60+
StreamPool int
6161
}{
6262
IdleProbeDuration: time.Second * 32,
6363
IdleReplyDuration: time.Second * 48,
6464
PacketSize: 2048,
65+
StreamPool: 256,
6566
}
6667

6768
// Server implemented the czar protocol.
@@ -163,7 +164,7 @@ func (c *Client) Dial(ctx *daze.Context, network string, address string) (io.Rea
163164
if err != nil {
164165
return nil, err
165166
}
166-
log.Printf("czar: mux slot stream id=0x%02x", srv.idx)
167+
log.Printf("czar: mux slot stream id=0x%04x", srv.idx)
167168
spy := &ashe.Client{Cipher: c.Cipher}
168169
con, err := spy.Estab(ctx, srv, network, address)
169170
if err != nil {

protocol/czar/mux.go

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
// A Stream managed by the multiplexer.
1414
type Stream struct {
15-
idx uint8
15+
idx uint16
1616
mux *Mux
1717
rbf []byte
1818
rch chan []byte
@@ -28,10 +28,10 @@ func (s *Stream) Close() error {
2828
s.wer.Put(io.ErrClosedPipe)
2929
s.zo0.Do(func() {
3030
s.mux.pri.Pri(0, func() error {
31-
mph := make([]byte, 4)
31+
mph := make([]byte, 8)
3232
mph[0] = 0x02
33-
mph[1] = s.idx
34-
mph[2] = 0x00
33+
mph[1] = 0x00
34+
binary.BigEndian.PutUint16(mph[2:4], s.idx)
3535
s.mux.con.Write(mph)
3636
return nil
3737
})
@@ -45,10 +45,10 @@ func (s *Stream) Esolc() error {
4545
s.wer.Put(io.ErrClosedPipe)
4646
s.zo0.Do(func() {
4747
s.mux.pri.Pri(0, func() error {
48-
mph := make([]byte, 4)
48+
mph := make([]byte, 8)
4949
mph[0] = 0x02
50-
mph[1] = s.idx
51-
mph[2] = 0x01
50+
mph[1] = 0x01
51+
binary.BigEndian.PutUint16(mph[2:4], s.idx)
5252
s.mux.con.Write(mph)
5353
return nil
5454
})
@@ -97,19 +97,20 @@ func (s *Stream) Write(p []byte) (int, error) {
9797
)
9898
for {
9999
switch {
100-
case len(p) >= Conf.PacketSize-4:
100+
case len(p) >= Conf.PacketSize-8:
101101
buf = make([]byte, Conf.PacketSize)
102-
l = Conf.PacketSize - 4
102+
l = Conf.PacketSize - 8
103103
case len(p) >= 1:
104-
buf = make([]byte, 4+len(p))
104+
buf = make([]byte, 8+len(p))
105105
l = len(p)
106106
case len(p) >= 0:
107107
return n, nil
108108
}
109109
buf[0] = 0x01
110-
buf[1] = s.idx
111-
binary.BigEndian.PutUint16(buf[2:4], uint16(l))
112-
copy(buf[4:], p[:l])
110+
buf[1] = 0x00
111+
binary.BigEndian.PutUint16(buf[2:4], s.idx)
112+
binary.BigEndian.PutUint16(buf[4:6], uint16(l))
113+
copy(buf[8:], p[:l])
113114
p = p[l:]
114115
err := s.mux.pri.Pri(1, func() error {
115116
if err := s.wer.Get(); err != nil {
@@ -130,7 +131,7 @@ func (s *Stream) Write(p []byte) (int, error) {
130131
}
131132

132133
// NewStream returns a new Stream.
133-
func NewStream(idx uint8, mux *Mux) *Stream {
134+
func NewStream(idx uint16, mux *Mux) *Stream {
134135
return &Stream{
135136
idx: idx,
136137
mux: mux,
@@ -144,7 +145,7 @@ func NewStream(idx uint8, mux *Mux) *Stream {
144145
}
145146

146147
// NewWither returns a new Stream. Stream has been automatically closed, used as a placeholder.
147-
func NewWither(idx uint8, mux *Mux) *Stream {
148+
func NewWither(idx uint16, mux *Mux) *Stream {
148149
stm := NewStream(idx, mux)
149150
stm.zo0.Do(func() {})
150151
stm.zo1.Do(func() {})
@@ -177,17 +178,17 @@ func (m *Mux) Close() error {
177178
func (m *Mux) Open() (*Stream, error) {
178179
var (
179180
err error
180-
idx uint8
181+
idx uint16
181182
stm *Stream
182183
)
183184
idx, err = m.idp.Get()
184185
if err != nil {
185186
return nil, err
186187
}
187188
err = m.pri.Pri(0, func() error {
188-
mph := make([]byte, 4)
189+
mph := make([]byte, 8)
189190
mph[0] = 0x00
190-
mph[1] = idx
191+
binary.BigEndian.PutUint16(mph[2:4], idx)
191192
return doa.Err(m.con.Write(mph))
192193
})
193194
if err != nil {
@@ -203,15 +204,16 @@ func (m *Mux) Open() (*Stream, error) {
203204
func (m *Mux) Recv() {
204205
var (
205206
bsz uint16
206-
buf = make([]byte, 4)
207+
buf = make([]byte, 8)
207208
cmd uint8
209+
cmz uint8
208210
err error
209-
idx uint8
211+
idx uint16
210212
msg []byte
211213
old *Stream
212214
prb = time.AfterFunc(Conf.IdleProbeDuration, func() {
213215
if m.pri.Pri(0, func() error {
214-
mph := make([]byte, 4)
216+
mph := make([]byte, 8)
215217
mph[0] = 0x03
216218
mph[1] = 0x00
217219
return doa.Err(m.con.Write(mph))
@@ -227,13 +229,14 @@ func (m *Mux) Recv() {
227229
for {
228230
prb.Reset(Conf.IdleProbeDuration)
229231
rst.Reset(Conf.IdleReplyDuration)
230-
_, err = io.ReadFull(m.con, buf[:4])
232+
_, err = io.ReadFull(m.con, buf)
231233
if err != nil {
232234
m.rer.Put(err)
233235
break
234236
}
235237
cmd = buf[0]
236-
idx = buf[1]
238+
cmz = buf[1]
239+
idx = binary.BigEndian.Uint16(buf[2:4])
237240
if cmd >= 0x04 {
238241
// Packet format error, connection closed.
239242
m.con.Close()
@@ -252,7 +255,7 @@ func (m *Mux) Recv() {
252255
m.usb[idx] = stm
253256
m.ach <- stm
254257
case 0x01:
255-
bsz = binary.BigEndian.Uint16(buf[2:4])
258+
bsz = binary.BigEndian.Uint16(buf[4:6])
256259
msg = make([]byte, bsz)
257260
_, err = io.ReadFull(m.con, msg)
258261
if err != nil {
@@ -273,10 +276,10 @@ func (m *Mux) Recv() {
273276
old = NewWither(idx, m)
274277
m.usb[idx] = old
275278
case 0x03:
276-
switch idx {
279+
switch cmz {
277280
case 0x00:
278281
m.pri.Pri(0, func() error {
279-
mph := make([]byte, 4)
282+
mph := make([]byte, 8)
280283
mph[0] = 0x03
281284
mph[1] = 0x01
282285
return doa.Err(m.con.Write(mph))
@@ -296,16 +299,16 @@ func NewMux(conn io.ReadWriteCloser) *Mux {
296299
idp: NewSip(),
297300
pri: priority.NewPriority(2),
298301
rer: NewErr(),
299-
usb: make([]*Stream, 256),
302+
usb: make([]*Stream, Conf.StreamPool),
300303
}
301304
return mux
302305
}
303306

304307
// NewMuxServer returns a new MuxServer.
305308
func NewMuxServer(conn io.ReadWriteCloser) *Mux {
306309
mux := NewMux(conn)
307-
for i := range 256 {
308-
mux.usb[i] = NewWither(uint8(i), mux)
310+
for i := range Conf.StreamPool {
311+
mux.usb[i] = NewWither(uint16(i), mux)
309312
}
310313
go mux.Recv()
311314
return mux

protocol/czar/mux_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,9 @@ func TestProtocolCzarMuxServerReopen(t *testing.T) {
145145
cli := doa.Try(net.Dial("tcp", DazeTesterListenOn))
146146
defer cli.Close()
147147

148-
mph := make([]byte, 4)
149-
mph[0] = 0x00 // Cmd open stream
150-
mph[1] = 0x00 // Sid
148+
mph := make([]byte, 8)
149+
mph[0] = 0x00 // Cmd open stream
150+
binary.BigEndian.PutUint16(mph[2:4], 0x00) // Sid
151151
cli.Write(mph)
152152
cli.Write(mph)
153153
buf := make([]byte, 1)

protocol/czar/sip.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,28 @@ type Sip struct {
1616
}
1717

1818
// Get selects an stream id from the pool, removes it from the pool, and returns it to the caller.
19-
func (s *Sip) Get() (uint8, error) {
19+
func (s *Sip) Get() (uint16, error) {
2020
s.m.Lock()
2121
defer s.m.Unlock()
2222
n := big.NewInt(0).Not(s.i)
2323
m := n.TrailingZeroBits()
24-
if m == 256 {
24+
if m == uint(Conf.StreamPool) {
2525
return 0, errors.New("daze: out of stream")
2626
}
2727
s.i.SetBit(s.i, int(m), 1)
28-
return uint8(m), nil
28+
return uint16(m), nil
2929
}
3030

3131
// Put adds x to the pool.
32-
func (s *Sip) Put(x uint8) {
32+
func (s *Sip) Put(x uint16) {
3333
s.m.Lock()
3434
defer s.m.Unlock()
3535
doa.Doa(s.i.Bit(int(x)) == 1)
3636
s.i = s.i.SetBit(s.i, int(x), 0)
3737
}
3838

3939
// Set removes x from the pool.
40-
func (s *Sip) Set(x uint8) {
40+
func (s *Sip) Set(x uint16) {
4141
s.m.Lock()
4242
defer s.m.Unlock()
4343
doa.Doa(s.i.Bit(int(x)) == 0)

protocol/czar/sip_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88

99
func TestProtocolCzarSip(t *testing.T) {
1010
sid := NewSip()
11-
for i := range 256 {
12-
doa.Doa(doa.Try(sid.Get()) == uint8(i))
11+
for i := range Conf.StreamPool {
12+
doa.Doa(doa.Try(sid.Get()) == uint16(i))
1313
}
1414
doa.Doa(doa.Err(sid.Get()) != nil)
1515
sid.Put(65)

0 commit comments

Comments
 (0)