-
Notifications
You must be signed in to change notification settings - Fork 879
Expand file tree
/
Copy pathchannel.go
More file actions
122 lines (104 loc) · 3.87 KB
/
channel.go
File metadata and controls
122 lines (104 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package p2p
import (
"context"
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/conn"
"github.com/sei-protocol/sei-chain/sei-tendermint/types"
)
type ChannelID = conn.ChannelID
type ChannelDescriptor[T proto.Message] = conn.ChannelDescriptorT[T]
// sendMsg is a message to be sent to a peer.
type sendMsg struct {
Message proto.Message // message payload
ChannelID ChannelID // channel id
}
// RecvMsg is a message received from a peer.
type RecvMsg[T proto.Message] struct {
Message T // message payload
From types.NodeID // sender
}
// PeerError is a peer error reported via Channel.Error.
//
// FIXME: This currently just disconnects the peer, which is too simplistic.
// For example, some errors should be logged, some should cause disconnects,
// and some should ban the peer.
//
// FIXME: This should probably be replaced by a more general PeerBehavior
// concept that can mark good and bad behavior and contributes to peer scoring.
// It should possibly also allow reactors to request explicit actions, e.g.
// disconnection or banning, in addition to doing this based on aggregates.
type PeerError struct {
NodeID types.NodeID
Err error
Fatal bool
}
func (pe PeerError) Error() string { return fmt.Sprintf("peer=%q: %s", pe.NodeID, pe.Err.Error()) }
func (pe PeerError) Unwrap() error { return pe.Err }
// channel is a bidirectional channel to exchange Protobuf messages with peers.
type channel struct {
desc conn.ChannelDescriptor
recvQueue *Queue[RecvMsg[proto.Message]] // inbound messages (peers to reactors)
}
type Channel[T proto.Message] struct {
*channel
router *Router
}
// NewChannel creates a new channel. It is primarily for internal and test
// use, reactors should use Router.OpenChannel().
func newChannel(desc conn.ChannelDescriptor) *channel {
return &channel{
desc: desc,
// TODO(gprusak): get rid of this random cap*cap value once we understand
// what the sizes per channel really should be.
recvQueue: NewQueue[RecvMsg[proto.Message]](desc.RecvBufferCapacity * desc.RecvBufferCapacity),
}
}
func (ch *Channel[T]) send(msg T, queues ...*Queue[sendMsg]) {
ch.router.metrics.ChannelMsgs.With("ch_id", fmt.Sprint(ch.desc.ID), "direction", "out").Add(1.)
m := sendMsg{msg, ch.desc.ID}
size := proto.Size(msg)
for _, q := range queues {
if pruned, ok := q.Send(m, size, ch.desc.Priority).Get(); ok {
ch.router.metrics.QueueDroppedMsgs.With("ch_id", fmt.Sprint(pruned.ChannelID), "direction", "out").Add(float64(1))
}
}
}
func (ch *Channel[T]) Send(msg T, to types.NodeID) {
c, ok := GetAny(ch.router.peerManager.Conns(), to)
if !ok {
logger.Debug("dropping message for unconnected peer", "peer", to, "channel", ch.desc.ID)
return
}
if _, contains := c.Channels[ch.desc.ID]; !contains {
// reactor tried to send a message across a channel that the
// peer doesn't have available. This is a known issue due to
// how peer subscriptions work:
// https://github.com/tendermint/tendermint/issues/6598
return
}
ch.send(msg, c.sendQueue)
}
// Broadcasts msg to all peers on the channel.
func (ch *Channel[T]) Broadcast(msg T) {
var queues []*Queue[sendMsg]
for _, c := range ch.router.peerManager.Conns().All() {
if _, ok := c.Channels[ch.desc.ID]; ok {
queues = append(queues, c.sendQueue)
}
}
ch.send(msg, queues...)
}
func (ch *Channel[T]) String() string {
return fmt.Sprintf("p2p.Channel<%d:%s>", ch.desc.ID, ch.desc.Name)
}
func (ch *Channel[T]) ReceiveLen() int { return ch.recvQueue.Len() }
// Recv Receives the next message from the channel.
func (ch *Channel[T]) Recv(ctx context.Context) (RecvMsg[T], error) {
recv, err := ch.recvQueue.Recv(ctx)
if err != nil {
return RecvMsg[T]{}, err
}
ch.router.metrics.ChannelMsgs.With("ch_id", fmt.Sprint(ch.desc.ID), "direction", "in").Add(1.)
return RecvMsg[T]{Message: recv.Message.(T), From: recv.From}, nil
}