Skip to content

Commit cc96111

Browse files
authored
rotation of outbound peers (#3037)
To make the p2p network uniformly random, nodes need to be able to change their peers periodically (otherwise the network would be centralized at bootstrap peers). This PR implements an algorithm based on stable hashing, which makes nodes assign a random priority to every node ID, and connect to peers with higher priority. This PR also separates inbound and outbound connection pools to simplify logic. In particular it is possible that there will be 2 concurrent connections between 2 peers (inbound + outbound). Avoiding duplicate connections is best effort - nodes try not to dial peers that they are connected to, but in case 2 peers dial each other at the same time they let those 2 connections be. Basic requirements: * peermanager maintains a pex table: addresses of peers of our peers. This bounds the network view size (even though it is an unauthenticated network) that our node needs to maintain and provides enough exposure to select new peers. Peers are periodically reporting their connections, therefore peermanager keeps only fresh addresses * on startup a spike of dials is expected - the node will try to connect to peers that it was connected to before restart * during stable operation node will dial peers at a low rate like 1/s or 0.1/s, and the node should be selected from a fresh set of addresses - i.e. we cannot snapshot a list of currently available addresses and try to dial them all (it will take hours) * despite low dial rate, node should attempt to round robin over the ever-changing peer candidates set - i.e. it should not get stuck dialing the same bad address over and over * in the stable-hash-based approach, each peer ID obtains a priority for dialing - it should be taken into account * implementation should support replacing low priority peers with higher priority peers (to support convergence to a random graph). The churn of the connections should be low though, so that connection efficiency is not affected.
My initial guesstimate would be that we should allow replacing a connection every ~1min. * We need to support a pex table with ~100 * 100 = 10k addresses total (100 connections per peer is a safe estimate with the current implementation). Whether we can afford to just rank all the addresses on every dial attempt is borderline IMO. * the addresses inserted into the pex table should be made available for dialing ASAP, without any active polling, if possible.
1 parent 79b7899 commit cc96111

29 files changed

Lines changed: 1547 additions & 1441 deletions

sei-tendermint/config/config.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ type P2PConfig struct {
652652
// MaxOutboundConnections limits the number of outbound connections to regular (non-persistent) peers.
653653
// It should be significantly lower than MaxConnections, unless
654654
// the node is supposed to have a small number of connections altogether.
655-
MaxOutboundConnections uint
655+
MaxOutboundConnections *uint `mapstructure:"max-outbound-connections"`
656656

657657
// MaxIncomingConnectionAttempts rate limits the number of incoming connection
658658
// attempts per IP address.
@@ -703,14 +703,10 @@ type P2PConfig struct {
703703
// DefaultP2PConfig returns a default configuration for the peer-to-peer layer
704704
func DefaultP2PConfig() *P2PConfig {
705705
return &P2PConfig{
706-
ListenAddress: "tcp://127.0.0.1:26656",
707-
ExternalAddress: "",
708-
UPNP: false,
709-
MaxConnections: 100,
710-
// TODO(gprusak): decrease to 10, once PEX is improved to:
711-
// * exchange both inbound and outbound connections information
712-
// * exchange information on handshake as well.
713-
MaxOutboundConnections: 100,
706+
ListenAddress: "tcp://127.0.0.1:26656",
707+
ExternalAddress: "",
708+
UPNP: false,
709+
MaxConnections: 100,
714710
MaxIncomingConnectionAttempts: 100,
715711
FlushThrottleTimeout: 100 * time.Millisecond,
716712
MaxPacketMsgPayloadSize: 1000000,

sei-tendermint/internal/p2p/address.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,6 @@ type NodeAddress struct {
3535
Port uint16
3636
}
3737

38-
var cgnat = netip.MustParsePrefix("100.64.0.0/10")
39-
40-
// IsPublic checks if the address is routable from the public internet.
41-
// It is good enough to exclude internal addresses of cloud providers.
42-
// As a simplification, it treats non-IP Hostnames (DNS addresses) as public.
43-
// TODO(gprusak): DNS addresses should be eliminated from PEX entirely - all
44-
// addresses should be resolved locally and only then advertised to peers.
45-
func (a NodeAddress) IsPublic() bool {
46-
ip, err := netip.ParseAddr(a.Hostname)
47-
if err != nil {
48-
return true
49-
}
50-
return ip.IsGlobalUnicast() && !ip.IsPrivate() && !cgnat.Contains(ip.Unmap())
51-
}
52-
5338
// ParseNodeAddress parses a node address URL into a NodeAddress, normalizing
5439
// and validating it.
5540
func ParseNodeAddress(urlString string) (NodeAddress, error) {

sei-tendermint/internal/p2p/address_test.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"testing"
66

77
"github.com/sei-protocol/sei-chain/sei-tendermint/crypto/ed25519"
8-
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
98
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require"
109
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/tcp"
1110
"github.com/sei-protocol/sei-chain/sei-tendermint/types"
@@ -278,21 +277,6 @@ func TestNodeAddress_String(t *testing.T) {
278277
}
279278
}
280279

281-
func TestNodeAddress_IsPublic(t *testing.T) {
282-
rng := utils.TestRng()
283-
id := makeNodeID(rng)
284-
testcases := map[string]bool{
285-
"192.168.1.10": false,
286-
"93.184.216.34": true,
287-
"example.com": true,
288-
"100.64.0.1": false,
289-
}
290-
for hostname, isPublic := range testcases {
291-
addr := NodeAddress{NodeID: id, Hostname: hostname, Port: defaultPort}
292-
require.Equal(t, isPublic, addr.IsPublic())
293-
}
294-
}
295-
296280
func TestNodeAddress_Validate(t *testing.T) {
297281
id := types.NodeID("00112233445566778899aabbccddeeff00112233")
298282
testcases := []struct {

sei-tendermint/internal/p2p/channel.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,12 @@ func (ch *Channel[T]) send(msg T, queues ...*Queue[sendMsg]) {
7979
}
8080

8181
func (ch *Channel[T]) Send(msg T, to types.NodeID) {
82-
c, ok := ch.router.peerManager.Conns().Get(to)
82+
c, ok := GetAny(ch.router.peerManager.Conns(), to)
8383
if !ok {
8484
logger.Debug("dropping message for unconnected peer", "peer", to, "channel", ch.desc.ID)
8585
return
8686
}
87-
if _, contains := c.peerChannels[ch.desc.ID]; !contains {
87+
if _, contains := c.Channels[ch.desc.ID]; !contains {
8888
// reactor tried to send a message across a channel that the
8989
// peer doesn't have available. This is a known issue due to
9090
// how peer subscriptions work:
@@ -98,7 +98,7 @@ func (ch *Channel[T]) Send(msg T, to types.NodeID) {
9898
func (ch *Channel[T]) Broadcast(msg T) {
9999
var queues []*Queue[sendMsg]
100100
for _, c := range ch.router.peerManager.Conns().All() {
101-
if _, ok := c.peerChannels[ch.desc.ID]; ok {
101+
if _, ok := c.Channels[ch.desc.ID]; ok {
102102
queues = append(queues, c.sendQueue)
103103
}
104104
}

0 commit comments

Comments
 (0)