Skip to content

Commit 75af995

Browse files
committed
Make the czar protocol supports 65536 streams
1 parent 9e8b700 commit 75af995

5 files changed

Lines changed: 52 additions & 36 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ $ daze client ... -p baboon
8989

9090
**Czar**
9191

92-
Protocol czar is an implementation of the ashe protocol based on TCP multiplexing. Multiplexing involves reusing a single TCP connection for multiple ashe protocols, which saves time on the TCP three-way handshake. However, this may result in a slight decrease in data transfer rate (approximately 0.19%). In most cases, using Protocol czar provides a better user experience compared to using the ashe protocol directly.
92+
Protocol czar is an implementation of the ashe protocol based on TCP multiplexing. Multiplexing involves reusing a single TCP connection for multiple ashe protocols, which saves time on the TCP three-way handshake. However, this may result in a slight decrease in data transfer rate (approximately 0.29%). In most cases, using Protocol czar provides a better user experience compared to using the ashe protocol directly.
9393

9494
```sh
9595
$ daze server ... -p czar

protocol/czar/engine.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,25 @@ import (
2727
// To open a stream:
2828
//
2929
// +-----+-----+-----+-----+
30-
// | 0 | Sid | Rsv |
30+
// | 0x0 | Rsv | Sid |
3131
// +-----+-----+-----+-----+
3232
//
3333
// Both server and client can push data to each other.
3434
//
35-
// +-----+-----+-----+-----+-----+-----+
36-
// | 1 | Sid | Len | Msg |
37-
// +-----+-----+-----+-----+-----+-----+
35+
// +-----+-----+-----+-----+-----+-----+-----+-----+
36+
// | 0x1 | Rsv | Sid | Len | Msg |
37+
// +-----+-----+-----+-----+-----+-----+-----+-----+
3838
//
3939
// Close the specified stream.
4040
//
4141
// +-----+-----+-----+-----+
42-
// | 2 | Sid | 0/1 | Rsv |
42+
// | 0x2 | 0/1 | Sid |
4343
// +-----+-----+-----+-----+
4444
//
4545
// Keep alive probe and reply.
4646
//
4747
// +-----+-----+-----+-----+
48-
// | 3 | 0/1 | Rsv |
48+
// | 0x3 | 0/1 | Rsv |
4949
// +-----+-----+-----+-----+
5050

5151
// Conf is acting as package level configuration.
@@ -54,14 +54,16 @@ var Conf = struct {
5454
IdleProbeDuration time.Duration
5555
// If no data is read for more than this time, the connection is closed.
5656
IdleReplyDuration time.Duration
57-
// Packet size. Since the size of the packet header is 4, this value must be greater than 4. If the value is too
57+
// Packet size. Since the size of the packet header is 6, this value must be greater than 6. If the value is too
5858
// small, the transmission efficiency will be reduced, and if it is too large, the concurrency capability of mux
5959
// will be reduced.
6060
PacketSize int
61+
StreamPool int
6162
}{
6263
IdleProbeDuration: time.Second * 32,
6364
IdleReplyDuration: time.Second * 48,
6465
PacketSize: 2048,
66+
StreamPool: 256,
6567
}
6668

6769
// Server implemented the czar protocol.
@@ -163,7 +165,7 @@ func (c *Client) Dial(ctx *daze.Context, network string, address string) (io.Rea
163165
if err != nil {
164166
return nil, err
165167
}
166-
log.Printf("czar: mux slot stream id=0x%02x", srv.idx)
168+
log.Printf("czar: mux slot stream id=0x%04x", srv.idx)
167169
spy := &ashe.Client{Cipher: c.Cipher}
168170
con, err := spy.Estab(ctx, srv, network, address)
169171
if err != nil {

protocol/czar/mux.go

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
// A Stream managed by the multiplexer.
1414
type Stream struct {
15-
idx uint8
15+
idx uint16
1616
mux *Mux
1717
rbf []byte
1818
rch chan []byte
@@ -28,7 +28,9 @@ func (s *Stream) Close() error {
2828
s.wer.Put(io.ErrClosedPipe)
2929
s.zo0.Do(func() {
3030
s.mux.pri.Pri(0, func() error {
31-
s.mux.con.Write([]byte{0x02, s.idx, 0x00, 0x00})
31+
buf := [4]byte{0x02, 0x00}
32+
binary.BigEndian.PutUint16(buf[2:4], s.idx)
33+
s.mux.con.Write(buf[:])
3234
return nil
3335
})
3436
})
@@ -41,7 +43,9 @@ func (s *Stream) Esolc() error {
4143
s.wer.Put(io.ErrClosedPipe)
4244
s.zo0.Do(func() {
4345
s.mux.pri.Pri(0, func() error {
44-
s.mux.con.Write([]byte{0x02, s.idx, 0x01, 0x00})
46+
buf := [4]byte{0x02, 0x01}
47+
binary.BigEndian.PutUint16(buf[2:4], s.idx)
48+
s.mux.con.Write(buf[:])
4549
return nil
4650
})
4751
})
@@ -89,19 +93,20 @@ func (s *Stream) Write(p []byte) (int, error) {
8993
)
9094
for {
9195
switch {
92-
case len(p) >= Conf.PacketSize-4:
96+
case len(p) >= Conf.PacketSize-6:
9397
buf = make([]byte, Conf.PacketSize)
94-
l = Conf.PacketSize - 4
98+
l = Conf.PacketSize - 6
9599
case len(p) >= 1:
96-
buf = make([]byte, 4+len(p))
100+
buf = make([]byte, 6+len(p))
97101
l = len(p)
98102
case len(p) >= 0:
99103
return n, nil
100104
}
101105
buf[0] = 0x01
102-
buf[1] = s.idx
103-
binary.BigEndian.PutUint16(buf[2:4], uint16(l))
104-
copy(buf[4:], p[:l])
106+
buf[1] = 0x00
107+
binary.BigEndian.PutUint16(buf[2:4], s.idx)
108+
binary.BigEndian.PutUint16(buf[4:6], uint16(l))
109+
copy(buf[6:], p[:l])
105110
p = p[l:]
106111
err := s.mux.pri.Pri(1, func() error {
107112
if err := s.wer.Get(); err != nil {
@@ -122,7 +127,7 @@ func (s *Stream) Write(p []byte) (int, error) {
122127
}
123128

124129
// NewStream returns a new Stream.
125-
func NewStream(idx uint8, mux *Mux) *Stream {
130+
func NewStream(idx uint16, mux *Mux) *Stream {
126131
return &Stream{
127132
idx: idx,
128133
mux: mux,
@@ -136,7 +141,7 @@ func NewStream(idx uint8, mux *Mux) *Stream {
136141
}
137142

138143
// NewWither returns a new Stream. Stream has been automatically closed, used as a placeholder.
139-
func NewWither(idx uint8, mux *Mux) *Stream {
144+
func NewWither(idx uint16, mux *Mux) *Stream {
140145
stm := NewStream(idx, mux)
141146
stm.zo0.Do(func() {})
142147
stm.zo1.Do(func() {})
@@ -169,15 +174,17 @@ func (m *Mux) Close() error {
169174
func (m *Mux) Open() (*Stream, error) {
170175
var (
171176
err error
172-
idx uint8
177+
idx uint16
173178
stm *Stream
174179
)
175180
idx, err = m.idp.Get()
176181
if err != nil {
177182
return nil, err
178183
}
179184
err = m.pri.Pri(0, func() error {
180-
return doa.Err(m.con.Write([]byte{0x00, idx, 0x00, 0x00}))
185+
buf := [4]byte{0x00, 0x00}
186+
binary.BigEndian.PutUint16(buf[2:4], idx)
187+
return doa.Err(m.con.Write(buf[:]))
181188
})
182189
if err != nil {
183190
m.idp.Put(idx)
@@ -194,8 +201,9 @@ func (m *Mux) Recv() {
194201
bsz uint16
195202
buf = make([]byte, 4)
196203
cmd uint8
204+
cme uint8
197205
err error
198-
idx uint8
206+
idx uint16
199207
msg []byte
200208
old *Stream
201209
prb = time.AfterFunc(Conf.IdleProbeDuration, func() {
@@ -219,7 +227,8 @@ func (m *Mux) Recv() {
219227
break
220228
}
221229
cmd = buf[0]
222-
idx = buf[1]
230+
cme = buf[1]
231+
idx = binary.BigEndian.Uint16(buf[2:4])
223232
if cmd >= 0x04 {
224233
// Packet format error, connection closed.
225234
m.con.Close()
@@ -238,7 +247,12 @@ func (m *Mux) Recv() {
238247
m.usb[idx] = stm
239248
m.ach <- stm
240249
case 0x01:
241-
bsz = binary.BigEndian.Uint16(buf[2:4])
250+
_, err = io.ReadFull(m.con, buf[0:2])
251+
if err != nil {
252+
m.con.Close()
253+
break
254+
}
255+
bsz = binary.BigEndian.Uint16(buf[0:2])
242256
msg = make([]byte, bsz)
243257
_, err = io.ReadFull(m.con, msg)
244258
if err != nil {
@@ -259,7 +273,7 @@ func (m *Mux) Recv() {
259273
old = NewWither(idx, m)
260274
m.usb[idx] = old
261275
case 0x03:
262-
switch idx {
276+
switch cme {
263277
case 0x00:
264278
m.pri.Pri(0, func() error {
265279
return doa.Err(m.con.Write([]byte{0x03, 0x01, 0x00, 0x00}))
@@ -279,16 +293,16 @@ func NewMux(conn io.ReadWriteCloser) *Mux {
279293
idp: NewSip(),
280294
pri: priority.NewPriority(2),
281295
rer: NewErr(),
282-
usb: make([]*Stream, 256),
296+
usb: make([]*Stream, Conf.StreamPool),
283297
}
284298
return mux
285299
}
286300

287301
// NewMuxServer returns a new MuxServer.
288302
func NewMuxServer(conn io.ReadWriteCloser) *Mux {
289303
mux := NewMux(conn)
290-
for i := range 256 {
291-
mux.usb[i] = NewWither(uint8(i), mux)
304+
for i := range Conf.StreamPool {
305+
mux.usb[i] = NewWither(uint16(i), mux)
292306
}
293307
go mux.Recv()
294308
return mux

protocol/czar/sip.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,28 @@ type Sip struct {
1616
}
1717

1818
// Get selects an stream id from the pool, removes it from the pool, and returns it to the caller.
19-
func (s *Sip) Get() (uint8, error) {
19+
func (s *Sip) Get() (uint16, error) {
2020
s.m.Lock()
2121
defer s.m.Unlock()
2222
n := big.NewInt(0).Not(s.i)
2323
m := n.TrailingZeroBits()
24-
if m == 256 {
24+
if m == uint(Conf.StreamPool) {
2525
return 0, errors.New("daze: out of stream")
2626
}
2727
s.i.SetBit(s.i, int(m), 1)
28-
return uint8(m), nil
28+
return uint16(m), nil
2929
}
3030

3131
// Put adds x to the pool.
32-
func (s *Sip) Put(x uint8) {
32+
func (s *Sip) Put(x uint16) {
3333
s.m.Lock()
3434
defer s.m.Unlock()
3535
doa.Doa(s.i.Bit(int(x)) == 1)
3636
s.i = s.i.SetBit(s.i, int(x), 0)
3737
}
3838

3939
// Set removes x from the pool.
40-
func (s *Sip) Set(x uint8) {
40+
func (s *Sip) Set(x uint16) {
4141
s.m.Lock()
4242
defer s.m.Unlock()
4343
doa.Doa(s.i.Bit(int(x)) == 0)

protocol/czar/sip_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88

99
func TestProtocolCzarSip(t *testing.T) {
1010
sid := NewSip()
11-
for i := range 256 {
12-
doa.Doa(doa.Try(sid.Get()) == uint8(i))
11+
for i := range Conf.StreamPool {
12+
doa.Doa(doa.Try(sid.Get()) == uint16(i))
1313
}
1414
doa.Doa(doa.Err(sid.Get()) != nil)
1515
sid.Put(65)

0 commit comments

Comments
 (0)