Skip to content

Commit 6c9dedc

Browse files
committed
feat!: consolidate separate multiplex channels to force consumption of all messages to prevent congestion
1 parent db01f8b commit 6c9dedc

2 files changed

Lines changed: 39 additions & 36 deletions

File tree

pkg/client/stream/multiplex.go

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,19 @@ func (s Type) IsValid() bool {
3939
return s.Name() != "unknown"
4040
}
4141

42+
type Message struct {
43+
StreamType Type
44+
Content []byte
45+
}
46+
4247
type MultiplexedStream struct {
43-
io.Closer
48+
closer io.Closer
4449
reader io.Reader
4550
writer io.Writer
4651

4752
logger *zap.SugaredLogger
4853

49-
stdOut chan []byte
50-
stdErr chan []byte
51-
systemError chan []byte
54+
messageChan chan Message
5255
done chan struct{}
5356
}
5457

@@ -61,12 +64,10 @@ func NewMultiplexedStream(
6164
logger *zap.SugaredLogger,
6265
) *MultiplexedStream {
6366
m := &MultiplexedStream{
64-
Closer: closer,
67+
closer: closer,
6568
reader: reader,
6669
writer: writer,
67-
stdOut: make(chan []byte, 100),
68-
stdErr: make(chan []byte, 100),
69-
systemError: make(chan []byte, 100),
70+
messageChan: make(chan Message, 100),
7071
done: make(chan struct{}),
7172
logger: logger,
7273
}
@@ -80,16 +81,8 @@ func NewMultiplexedStream(
8081
return m
8182
}
8283

83-
func (m *MultiplexedStream) Stdout() <-chan []byte {
84-
return m.stdOut
85-
}
86-
87-
func (m *MultiplexedStream) Stderr() <-chan []byte {
88-
return m.stdErr
89-
}
90-
91-
func (m *MultiplexedStream) SystemError() <-chan []byte {
92-
return m.systemError
84+
func (m *MultiplexedStream) Messages() chan Message {
85+
return m.messageChan
9386
}
9487

9588
func (m *MultiplexedStream) Done() <-chan struct{} {
@@ -103,11 +96,22 @@ func (m *MultiplexedStream) Write(data []byte) (int, error) {
10396
return m.writer.Write(data)
10497
}
10598

99+
func (m *MultiplexedStream) Close() error {
100+
if m.closer == nil {
101+
return nil
102+
}
103+
err := m.closer.Close()
104+
if err != nil {
105+
m.logger.Errorf("failed to close multiplexed stream: %v", err)
106+
return err
107+
}
108+
m.logger.Debugf("multiplexed stream closed")
109+
return nil
110+
}
111+
106112
func (m *MultiplexedStream) handleSimplexOutput(ctx context.Context) {
107113
defer close(m.done)
108-
defer close(m.stdOut)
109-
defer close(m.stdErr)
110-
defer close(m.systemError)
114+
defer close(m.messageChan)
111115
go func() {
112116
<-ctx.Done()
113117
_ = m.Close()
@@ -116,7 +120,10 @@ func (m *MultiplexedStream) handleSimplexOutput(ctx context.Context) {
116120
scanner := bufio.NewScanner(m.reader)
117121
for ctx.Err() == nil && scanner.Scan() {
118122
line := scanner.Bytes()
119-
m.stdOut <- line
123+
m.messageChan <- Message{
124+
Content: line,
125+
StreamType: TypeStdout,
126+
}
120127
}
121128
err := scanner.Err()
122129
if err != nil {
@@ -126,9 +133,7 @@ func (m *MultiplexedStream) handleSimplexOutput(ctx context.Context) {
126133

127134
func (m *MultiplexedStream) handleMultiplexOutput(ctx context.Context) {
128135
defer close(m.done)
129-
defer close(m.stdOut)
130-
defer close(m.stdErr)
131-
defer close(m.systemError)
136+
defer close(m.messageChan)
132137
go func() {
133138
<-ctx.Done()
134139
_ = m.Close()
@@ -165,15 +170,9 @@ func (m *MultiplexedStream) handleMultiplexOutput(ctx context.Context) {
165170
return
166171
}
167172

168-
switch streamType {
169-
case TypeStdout:
170-
m.stdOut <- data
171-
case TypeStderr:
172-
m.stdErr <- data
173-
case TypeSystemError:
174-
m.systemError <- data
175-
default:
176-
m.logger.Warnf("unexpected stream type: %s", streamType.Name())
173+
m.messageChan <- Message{
174+
StreamType: streamType,
175+
Content: data,
177176
}
178177
}
179178
m.logger.Debugf("multiplexed output handling completed, closing channels")

pkg/client/system_host.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/docker/docker/api/types/container"
1010
"github.com/docker/docker/api/types/network"
11+
"github.com/silenium-dev/docker-wrapper/pkg/client/stream"
1112
"k8s.io/apimachinery/pkg/util/rand"
1213
)
1314

@@ -70,11 +71,14 @@ func (c *Client) SystemHostIPFromContainers(ctx context.Context, netId *string)
7071
if err != nil {
7172
return nil, err
7273
}
73-
ipAddrByteStr, ok := <-multiplex.Stdout()
74+
ipAddrByteMsg, ok := <-multiplex.Messages()
7475
if !ok {
7576
return nil, fmt.Errorf("no output from container %s", cont.ID)
7677
}
77-
ipAddrStr = strings.TrimSpace(string(ipAddrByteStr))
78+
if ipAddrByteMsg.StreamType != stream.TypeStdout {
79+
return nil, fmt.Errorf("unexpected stream type %s from container %s", ipAddrByteMsg.StreamType.Name(), cont.ID)
80+
}
81+
ipAddrStr = strings.TrimSpace(string(ipAddrByteMsg.Content))
7882
} else if netId == nil {
7983
ipAddrStr = inspect.NetworkSettings.Gateway
8084
} else {

0 commit comments

Comments
 (0)