Skip to content

Commit 55f94d3

Browse files
committed
fix: reuse streams across session IDs
1 parent 77c48c1 commit 55f94d3

4 files changed

Lines changed: 34 additions & 107 deletions

File tree

comm/health_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func (s *CommunicationHealthTestSuite) TearDownTest() {
4848
}
4949
}
5050

51-
func (s *CommunicationHealthTestSuite) TestCommHealth_AllPearsAvailable() {
51+
func (s *CommunicationHealthTestSuite) TestCommHealth_AllPeersAvailable() {
5252
errors := comm.ExecuteCommHealthCheck(
5353
s.testCommunications[0], peer.IDSlice{s.testHosts[1].ID(), s.testHosts[2].ID()},
5454
)

comm/p2p/libp2p.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ const (
2727
type Libp2pCommunication struct {
2828
SessionSubscriptionManager
2929
h host.Host
30-
protocolID protocol.ID
3130
logger zerolog.Logger
3231
streamManager *StreamManager
3332
}
@@ -37,21 +36,18 @@ func NewCommunication(h host.Host, protocolID protocol.ID) Libp2pCommunication {
3736
c := Libp2pCommunication{
3837
SessionSubscriptionManager: NewSessionSubscriptionManager(),
3938
h: h,
40-
protocolID: protocolID,
4139
logger: logger,
42-
streamManager: NewStreamManager(h),
40+
streamManager: NewStreamManager(h, protocolID),
4341
}
4442

4543
// start processing incoming messages
46-
c.h.SetStreamHandler(c.protocolID, c.StreamHandlerFunc)
44+
c.h.SetStreamHandler(protocolID, c.StreamHandlerFunc)
4745
return c
4846
}
4947

5048
/** Communication interface methods **/
5149

52-
func (c Libp2pCommunication) CloseSession(sessionID string) {
53-
c.streamManager.ReleaseStreams(sessionID)
54-
}
50+
func (c Libp2pCommunication) CloseSession(sessionID string) {}
5551

5652
func (c Libp2pCommunication) Broadcast(
5753
peers peer.IDSlice,
@@ -179,15 +175,15 @@ func (c Libp2pCommunication) sendMessage(
179175
}
180176

181177
var stream network.Stream
182-
stream, err = c.streamManager.Stream(sessionID, to, c.protocolID)
178+
stream, err = c.streamManager.Stream(to)
183179
if err != nil {
184180
return err
185181
}
186182

187183
err = WriteStream(msg, bufio.NewWriterSize(stream, defaultBufferSize))
188184
if err != nil {
189185
c.logger.Error().Str("To", to.String()).Err(err).Msg("unable to send message")
190-
c.streamManager.ReleaseStreams(sessionID)
186+
c.streamManager.CloseStream(to, stream)
191187
return err
192188
}
193189

comm/p2p/manager.go

Lines changed: 21 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -18,75 +18,50 @@ import (
1818
//
1919
// Each stream is mapped to a specific session, by sessionID
2020
type StreamManager struct {
21-
streamsBySessionID map[string]map[peer.ID]network.Stream
22-
streamLocker *sync.Mutex
23-
host host.Host
21+
streamsByPeer map[peer.ID]network.Stream
22+
streamLocker *sync.Mutex
23+
host host.Host
24+
protocolID protocol.ID
2425
}
2526

2627
// NewStreamManager creates new StreamManager
27-
func NewStreamManager(host host.Host) *StreamManager {
28+
func NewStreamManager(host host.Host, protocolID protocol.ID) *StreamManager {
2829
return &StreamManager{
29-
streamsBySessionID: make(map[string]map[peer.ID]network.Stream),
30-
streamLocker: &sync.Mutex{},
31-
host: host,
30+
streamsByPeer: make(map[peer.ID]network.Stream),
31+
streamLocker: &sync.Mutex{},
32+
host: host,
33+
protocolID: protocolID,
3234
}
3335
}
3436

35-
// ReleaseStream removes reference on streams mapped to provided sessionID and closes them
36-
func (sm *StreamManager) ReleaseStreams(sessionID string) {
37+
// CloseStream closes stream to the peer
38+
func (sm *StreamManager) CloseStream(peerID peer.ID, stream network.Stream) {
3739
sm.streamLocker.Lock()
38-
defer sm.streamLocker.Unlock()
40+
delete(sm.streamsByPeer, peerID)
41+
sm.streamLocker.Unlock()
3942

40-
streams, ok := sm.streamsBySessionID[sessionID]
41-
if !ok {
43+
err := stream.Close()
44+
if err != nil {
45+
log.Warn().Err(err).Msgf("Failed to close stream")
4246
return
4347
}
44-
45-
for peer, stream := range streams {
46-
err := stream.Close()
47-
if err != nil {
48-
log.Debug().Msgf("Cannot close stream to peer %s, err: %s", peer.String(), err.Error())
49-
_ = stream.Reset()
50-
}
51-
}
52-
53-
delete(sm.streamsBySessionID, sessionID)
5448
}
5549

56-
// AddStream saves and maps provided stream to sessionID
57-
func (sm *StreamManager) AddStream(sessionID string, peerID peer.ID, stream network.Stream) {
50+
// Stream fetches stream by peer
51+
func (sm *StreamManager) Stream(peerID peer.ID) (network.Stream, error) {
5852
sm.streamLocker.Lock()
5953
defer sm.streamLocker.Unlock()
6054

61-
_, ok := sm.streamsBySessionID[sessionID]
55+
stream, ok := sm.streamsByPeer[peerID]
6256
if !ok {
63-
sm.streamsBySessionID[sessionID] = make(map[peer.ID]network.Stream)
64-
}
65-
66-
_, ok = sm.streamsBySessionID[sessionID][peerID]
67-
if ok {
68-
return
69-
}
70-
71-
sm.streamsBySessionID[sessionID][peerID] = stream
72-
}
73-
74-
// Stream fetches stream by peer and session ID
75-
func (sm *StreamManager) Stream(sessionID string, peerID peer.ID, protocolID protocol.ID) (network.Stream, error) {
76-
sm.streamLocker.Lock()
77-
78-
stream, ok := sm.streamsBySessionID[sessionID][peerID]
79-
if !ok {
80-
stream, err := sm.host.NewStream(context.TODO(), peerID, protocolID)
57+
stream, err := sm.host.NewStream(context.TODO(), peerID, sm.protocolID)
8158
if err != nil {
8259
return nil, err
8360
}
8461

85-
sm.streamLocker.Unlock()
86-
sm.AddStream(sessionID, peerID, stream)
62+
sm.streamsByPeer[peerID] = stream
8763
return stream, nil
8864
}
8965

90-
sm.streamLocker.Unlock()
9166
return stream, nil
9267
}

comm/p2p/manager_test.go

Lines changed: 7 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -32,66 +32,22 @@ func (s *StreamManagerTestSuite) SetupTest() {
3232
}
3333

3434
func (s *StreamManagerTestSuite) Test_ManagingSubscriptions_Success() {
35-
streamManager := p2p.NewStreamManager(s.mockHost)
35+
streamManager := p2p.NewStreamManager(s.mockHost, protocol.ID("1"))
3636

3737
mockConn := mock_network.NewMockConn(s.mockController)
3838
stream1 := mock_network.NewMockStream(s.mockController)
3939
stream1.EXPECT().Conn().Return(mockConn).AnyTimes()
40-
stream2 := mock_network.NewMockStream(s.mockController)
41-
stream2.EXPECT().Conn().Return(mockConn).AnyTimes()
42-
stream3 := mock_network.NewMockStream(s.mockController)
43-
stream3.EXPECT().Conn().Return(mockConn).AnyTimes()
40+
s.mockHost.EXPECT().NewStream(gomock.Any(), gomock.Any(), gomock.Any()).Return(stream1, nil)
4441

4542
peerID1, _ := peer.Decode("QmcW3oMdSqoEcjbyd51auqC23vhKX6BqfcZcY2HJ3sKAZR")
46-
peerID2, _ := peer.Decode("QmZHPnN3CKiTAp8VaJqszbf8m7v4mPh15M421KpVdYHF54")
47-
48-
streamManager.AddStream("1", peerID1, stream1)
49-
streamManager.AddStream("1", peerID1, stream1)
50-
streamManager.AddStream("1", peerID2, stream2)
51-
streamManager.AddStream("2", peerID1, stream3)
52-
53-
stream1.EXPECT().Close().Times(1).Return(nil)
54-
stream2.EXPECT().Close().Times(1).Return(nil)
55-
56-
streamManager.ReleaseStreams("1")
57-
}
58-
59-
func (s *StreamManagerTestSuite) Test_FetchStream_NoStream() {
60-
streamManager := p2p.NewStreamManager(s.mockHost)
61-
62-
expectedStream := mock_network.NewMockStream(s.mockController)
63-
s.mockHost.EXPECT().NewStream(gomock.Any(), gomock.Any(), gomock.Any()).Return(expectedStream, nil)
64-
65-
stream, err := streamManager.Stream("1", peer.ID(""), protocol.ID(""))
6643

44+
s1, err := streamManager.Stream(peerID1)
6745
s.Nil(err)
68-
s.Equal(stream, expectedStream)
69-
}
70-
71-
func (s *StreamManagerTestSuite) Test_FetchStream_ValidStream() {
72-
streamManager := p2p.NewStreamManager(s.mockHost)
73-
74-
stream := mock_network.NewMockStream(s.mockController)
75-
peerID1, _ := peer.Decode("QmcW3oMdSqoEcjbyd51auqC23vhKX6BqfcZcY2HJ3sKAZR")
76-
streamManager.AddStream("1", peerID1, stream)
77-
78-
expectedStream, err := streamManager.Stream("1", peerID1, protocol.ID(""))
79-
46+
s2, err := streamManager.Stream(peerID1)
8047
s.Nil(err)
81-
s.Equal(stream, expectedStream)
82-
}
83-
84-
func (s *StreamManagerTestSuite) Test_AddStream_IgnoresExistingPeer() {
85-
streamManager := p2p.NewStreamManager(s.mockHost)
8648

87-
stream1 := mock_network.NewMockStream(s.mockController)
88-
stream2 := mock_network.NewMockStream(s.mockController)
89-
peerID1, _ := peer.Decode("QmcW3oMdSqoEcjbyd51auqC23vhKX6BqfcZcY2HJ3sKAZR")
90-
streamManager.AddStream("1", peerID1, stream1)
91-
streamManager.AddStream("1", peerID1, stream2)
92-
93-
expectedStream, err := streamManager.Stream("1", peerID1, protocol.ID(""))
49+
s.Equal(s1, s2)
9450

95-
s.Nil(err)
96-
s.Equal(stream1, expectedStream)
51+
stream1.EXPECT().Close().Times(1).Return(nil)
52+
streamManager.CloseStream(peerID1, s1)
9753
}

0 commit comments

Comments
 (0)