From 28d68940e68469eed3b9aba8c8f645748e1fba62 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 30 Mar 2026 16:33:02 +0200 Subject: [PATCH 01/10] feat(pkg/p2p): reconnect on disconnected peers --- pkg/p2p/client.go | 128 ++++++++++++++++++++++++++++++++++++++- pkg/signer/aws/signer.go | 2 +- pkg/signer/gcp/signer.go | 2 +- 3 files changed, 129 insertions(+), 3 deletions(-) diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index e8fc6eabe1..1f301d482f 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" "github.com/ipfs/go-datastore" @@ -28,13 +29,18 @@ import ( rollhash "github.com/evstack/ev-node/pkg/hash" ) -// TODO(tzdybal): refactor to configuration parameters const ( // reAdvertisePeriod defines a period after which P2P client re-attempt advertising namespace in DHT. reAdvertisePeriod = 1 * time.Hour // peerLimit defines limit of number of peers returned during active peer discovery. peerLimit = 60 + + // peerDiscoveryInterval is how often the background loop re-runs peer discovery. + peerDiscoveryInterval = 5 * time.Minute + + // reconnectCooldown is the minimum time between reconnect attempts for the same seed peer. + reconnectCooldown = 5 * time.Second ) // Client is a P2P client, implemented with libp2p. @@ -56,6 +62,12 @@ type Client struct { ps *pubsub.PubSub started bool + seedPeers []peer.AddrInfo + + maintenanceCancel context.CancelFunc + maintenanceWg sync.WaitGroup + reconnectCh chan peer.ID + metrics *Metrics } @@ -170,11 +182,21 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { } c.started = true + + c.reconnectCh = make(chan peer.ID, 32) + c.host.Network().Notify(c.newDisconnectNotifee()) + c.startConnectionMaintenance(ctx) + return nil } // Close gently stops Client. func (c *Client) Close() error { + if c.maintenanceCancel != nil { + c.maintenanceCancel() + } + c.maintenanceWg.Wait() + var err error if c.dht != nil { err = errors.Join(err, c.dht.Close()) @@ -245,6 +267,109 @@ func (c *Client) Peers() []PeerConnection { return res } +// disconnectNotifee is a network.Notifee that triggers seed peer reconnection +// when a configured seed peer disconnects. +type disconnectNotifee struct { + c *Client +} + +func (n disconnectNotifee) Connected(_ network.Network, _ network.Conn) {} +func (n disconnectNotifee) OpenedStream(_ network.Network, _ network.Stream) {} +func (n disconnectNotifee) ClosedStream(_ network.Network, _ network.Stream) {} +func (n disconnectNotifee) Listen(_ network.Network, _ multiaddr.Multiaddr) {} +func (n disconnectNotifee) ListenClose(_ network.Network, _ multiaddr.Multiaddr) {} + +func (n disconnectNotifee) Disconnected(_ network.Network, conn network.Conn) { + p := conn.RemotePeer() + for _, sp := range n.c.seedPeers { + if sp.ID == p { + select { + case n.c.reconnectCh <- p: + default: + } + return + } + } +} + +func (c *Client) newDisconnectNotifee() disconnectNotifee { + return disconnectNotifee{c: c} +} + +// startConnectionMaintenance launches a background goroutine that reconnects +// to seed peers on disconnect (driven by network.Notifee events) and +// periodically refreshes peer discovery. This ensures P2P connectivity +// recovers after transient network failures without requiring a full node restart. +func (c *Client) startConnectionMaintenance(parentCtx context.Context) { + ctx, cancel := context.WithCancel(parentCtx) + c.maintenanceCancel = cancel + + c.maintenanceWg.Go(func() { + + discoveryTicker := time.NewTicker(peerDiscoveryInterval) + defer discoveryTicker.Stop() + + lastReconnect := make(map[peer.ID]time.Time) + + for { + select { + case <-ctx.Done(): + return + case pid := <-c.reconnectCh: + if until := lastReconnect[pid].Add(reconnectCooldown); time.Now().Before(until) { + continue + } + lastReconnect[pid] = time.Now() + + for _, sp := range c.seedPeers { + if sp.ID != pid { + continue + } + if c.isConnected(sp.ID) { + break + } + c.logger.Info().Str("peer", sp.ID.String()).Msg("reconnecting to disconnected seed peer") + if err := c.host.Connect(ctx, sp); err != nil && ctx.Err() == nil { + c.logger.Warn().Str("peer", sp.ID.String()).Err(err).Msg("failed to reconnect to seed peer") + } + break + } + case <-discoveryTicker.C: + c.refreshPeerDiscovery(ctx) + } + } + }) +} + +// refreshPeerDiscovery re-advertises and re-runs peer discovery via DHT. +func (c *Client) refreshPeerDiscovery(ctx context.Context) { + if c.disc == nil { + return + } + + c.logger.Debug().Msg("refreshing peer discovery") + + _ = c.advertise(ctx) + + peerCh, err := c.disc.FindPeers(ctx, c.getNamespace(), cdiscovery.Limit(peerLimit)) + if err != nil { + c.logger.Warn().Err(err).Msg("peer discovery refresh failed") + return + } + + for p := range peerCh { + if p.ID == c.host.ID() || c.isConnected(p.ID) { + continue + } + go c.tryConnect(ctx, p) + } +} + +// isConnected returns true if there is an active connection to the given peer. +func (c *Client) isConnected(id peer.ID) bool { + return c.host.Network().Connectedness(id) == network.Connected +} + func (c *Client) listen() (host.Host, error) { maddr, err := multiaddr.NewMultiaddr(c.conf.ListenAddress) if err != nil { @@ -256,6 +381,7 @@ func (c *Client) listen() (host.Host, error) { func (c *Client) setupDHT(ctx context.Context) error { peers := c.parseAddrInfoList(c.conf.Peers) + c.seedPeers = peers if len(peers) == 0 { c.logger.Info().Msg("no peers - only listening for connections") } diff --git a/pkg/signer/aws/signer.go b/pkg/signer/aws/signer.go index 4c0d9e1d63..f5a2d5c270 100644 --- a/pkg/signer/aws/signer.go +++ b/pkg/signer/aws/signer.go @@ -159,7 +159,7 @@ func (s *KmsSigner) Sign(ctx context.Context, message []byte) ([]byte, error) { timeout := s.opts.timeout() maxAttempts := maxRetries + 1 - for attempt := 0; attempt < maxAttempts; attempt++ { + for attempt := range maxAttempts { if err := ctx.Err(); err != nil { return nil, err } diff --git a/pkg/signer/gcp/signer.go b/pkg/signer/gcp/signer.go index 70f6667c24..2dc8b27a31 100644 --- a/pkg/signer/gcp/signer.go +++ b/pkg/signer/gcp/signer.go @@ -189,7 +189,7 @@ func (s *KmsSigner) Sign(ctx context.Context, message []byte) ([]byte, error) { timeout := s.opts.timeout() maxAttempts := maxRetries + 1 - for attempt := 0; attempt < maxAttempts; attempt++ { + for attempt := range maxAttempts { if err := ctx.Err(); err != nil { return nil, err } From 606921305516fba6dfe7516d854ebbe01849871a Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 30 Mar 2026 20:55:43 +0200 Subject: [PATCH 02/10] feedback --- pkg/p2p/client.go | 64 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 54 insertions(+), 10 deletions(-) diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index 1f301d482f..592c1e54cd 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -36,11 +36,19 @@ const ( // peerLimit defines limit of number of peers returned during active peer discovery. peerLimit = 60 - // peerDiscoveryInterval is how often the background loop re-runs peer discovery. + // peerDiscoveryInterval is how often the background loop re-advertises and + // re-runs peer discovery via DHT. peerDiscoveryInterval = 5 * time.Minute - // reconnectCooldown is the minimum time between reconnect attempts for the same seed peer. + // reconnectCooldown is the base cooldown between reconnect attempts for the same seed peer. reconnectCooldown = 5 * time.Second + + // maxReconnectCooldown caps the exponential backoff for seed peer reconnection. + maxReconnectCooldown = 5 * time.Minute + + // connectWorkers limits the number of concurrent connection attempts during + // periodic peer discovery refresh. + connectWorkers = 16 ) // Client is a P2P client, implemented with libp2p. @@ -67,6 +75,7 @@ type Client struct { maintenanceCancel context.CancelFunc maintenanceWg sync.WaitGroup reconnectCh chan peer.ID + connectSem chan struct{} metrics *Metrics } @@ -176,6 +185,9 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { return err } + c.reconnectCh = make(chan peer.ID, 32) + c.connectSem = make(chan struct{}, connectWorkers) + c.logger.Debug().Msg("setting up active peer discovery") if err := c.peerDiscovery(ctx); err != nil { return err @@ -183,7 +195,6 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { c.started = true - c.reconnectCh = make(chan peer.ID, 32) c.host.Network().Notify(c.newDisconnectNotifee()) c.startConnectionMaintenance(ctx) @@ -281,6 +292,9 @@ func (n disconnectNotifee) ListenClose(_ network.Network, _ multiaddr.Multiaddr) func (n disconnectNotifee) Disconnected(_ network.Network, conn network.Conn) { p := conn.RemotePeer() + if n.c.reconnectCh == nil { + return + } for _, sp := range n.c.seedPeers { if sp.ID == p { select { @@ -309,29 +323,51 @@ func (c *Client) startConnectionMaintenance(parentCtx context.Context) { discoveryTicker := time.NewTicker(peerDiscoveryInterval) defer discoveryTicker.Stop() - lastReconnect := make(map[peer.ID]time.Time) + type reconnectState struct { + lastAttempt time.Time + attempts int + } + states := make(map[peer.ID]*reconnectState) for { select { case <-ctx.Done(): return case pid := <-c.reconnectCh: - if until := lastReconnect[pid].Add(reconnectCooldown); time.Now().Before(until) { + st := states[pid] + if st == nil { + st = &reconnectState{} + states[pid] = st + } + + if time.Since(st.lastAttempt) > maxReconnectCooldown { + st.attempts = 0 + } + + backoff := reconnectCooldown * time.Duration(1< maxReconnectCooldown { + backoff = maxReconnectCooldown + } + if time.Now().Before(st.lastAttempt.Add(backoff)) { continue } - lastReconnect[pid] = time.Now() + st.lastAttempt = time.Now() for _, sp := range c.seedPeers { if sp.ID != pid { continue } if c.isConnected(sp.ID) { + st.attempts = 0 break } + st.attempts++ c.logger.Info().Str("peer", sp.ID.String()).Msg("reconnecting to disconnected seed peer") - if err := c.host.Connect(ctx, sp); err != nil && ctx.Err() == nil { - c.logger.Warn().Str("peer", sp.ID.String()).Err(err).Msg("failed to reconnect to seed peer") - } + go func(info peer.AddrInfo) { + if err := c.host.Connect(ctx, info); err != nil && ctx.Err() == nil { + c.logger.Warn().Str("peer", info.ID.String()).Err(err).Msg("failed to reconnect to seed peer") + } + }(sp) break } case <-discoveryTicker.C: @@ -361,7 +397,15 @@ func (c *Client) refreshPeerDiscovery(ctx context.Context) { if p.ID == c.host.ID() || c.isConnected(p.ID) { continue } - go c.tryConnect(ctx, p) + select { + case c.connectSem <- struct{}{}: + go func(peer peer.AddrInfo) { + defer func() { <-c.connectSem }() + c.tryConnect(ctx, peer) + }(p) + case <-ctx.Done(): + return + } } } From 32dcf35984237ef95f0ec693e38dfa4d1a2371e7 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 31 Mar 2026 18:10:01 +0200 Subject: [PATCH 03/10] cl --- CHANGELOG.md | 4 ++++ apps/evm/go.mod | 8 ++++---- apps/evm/go.sum | 4 ---- apps/testapp/go.mod | 2 +- apps/testapp/go.sum | 2 -- 5 files changed, 9 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 912343ec00..119fcf00e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changes + +- Improve P2P connection in case of transient network failure [#3212](https://github.com/evstack/ev-node/pull/3212) + ## v1.1.0-rc.1 ### Added diff --git a/apps/evm/go.mod b/apps/evm/go.mod index edd9c14311..cdce2d0ee3 100644 --- a/apps/evm/go.mod +++ b/apps/evm/go.mod @@ -2,10 +2,10 @@ module github.com/evstack/ev-node/apps/evm go 1.25.7 -// replace ( -// github.com/evstack/ev-node => ../../ -// github.com/evstack/ev-node/execution/evm => ../../execution/evm -// ) +replace ( + github.com/evstack/ev-node => ../../ + github.com/evstack/ev-node/execution/evm => ../../execution/evm +) require ( github.com/ethereum/go-ethereum v1.17.2 diff --git a/apps/evm/go.sum b/apps/evm/go.sum index 239a59c985..e0249473d4 100644 --- a/apps/evm/go.sum +++ b/apps/evm/go.sum @@ -472,12 +472,8 @@ github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab h1:rvv6MJ github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab/go.mod h1:IuLm4IsPipXKF7CW5Lzf68PIbZ5yl7FFd74l/E0o9A8= github.com/ethereum/go-ethereum v1.17.2 h1:ag6geu0kn8Hv5FLKTpH+Hm2DHD+iuFtuqKxEuwUsDOI= github.com/ethereum/go-ethereum v1.17.2/go.mod h1:KHcRXfGOUfUmKg51IhQ0IowiqZ6PqZf08CMtk0g5K1o= -github.com/evstack/ev-node v1.1.0-rc.1 h1:NtPuuDLqN2h4/edu5zxRlZAxmLkTG3ncXBO2PlCDvVs= -github.com/evstack/ev-node v1.1.0-rc.1/go.mod h1:6rhWWzuyiqNn/erDmWCk1aLxUuQphyOGIRq56/smSyk= github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8= github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= -github.com/evstack/ev-node/execution/evm v1.0.0 h1:UTAdCrnPsLoGzSgsBx4Kv76jkXpMmHBIpNv3MxyzWPo= -github.com/evstack/ev-node/execution/evm v1.0.0/go.mod h1:UrqkiepfTMiot6M8jnswgu3VU8SSucZpaMIHIl22/1A= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= diff --git a/apps/testapp/go.mod b/apps/testapp/go.mod index c235c3f07b..aeb4b4bbb2 100644 --- a/apps/testapp/go.mod +++ b/apps/testapp/go.mod @@ -2,7 +2,7 @@ module github.com/evstack/ev-node/apps/testapp go 1.25.7 -// replace github.com/evstack/ev-node => ../../. +replace github.com/evstack/ev-node => ../../. require ( github.com/evstack/ev-node v1.1.0-rc.1 diff --git a/apps/testapp/go.sum b/apps/testapp/go.sum index be901c574f..8670e575e8 100644 --- a/apps/testapp/go.sum +++ b/apps/testapp/go.sum @@ -432,8 +432,6 @@ github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87K github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg5VPuZ0uONDT6eb4= github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA= -github.com/evstack/ev-node v1.1.0-rc.1 h1:NtPuuDLqN2h4/edu5zxRlZAxmLkTG3ncXBO2PlCDvVs= -github.com/evstack/ev-node v1.1.0-rc.1/go.mod h1:6rhWWzuyiqNn/erDmWCk1aLxUuQphyOGIRq56/smSyk= github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8= github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= From 2ff6ceecdf2b316913c143b8fe7e55ffac53557f Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 31 Mar 2026 22:05:23 +0200 Subject: [PATCH 04/10] feedback 1/n --- pkg/p2p/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index 592c1e54cd..e432f6b26b 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -196,7 +196,7 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { c.started = true c.host.Network().Notify(c.newDisconnectNotifee()) - c.startConnectionMaintenance(ctx) + c.startConnectionMaintenance() return nil } @@ -314,8 +314,8 @@ func (c *Client) newDisconnectNotifee() disconnectNotifee { // to seed peers on disconnect (driven by network.Notifee events) and // periodically refreshes peer discovery. This ensures P2P connectivity // recovers after transient network failures without requiring a full node restart. -func (c *Client) startConnectionMaintenance(parentCtx context.Context) { - ctx, cancel := context.WithCancel(parentCtx) +func (c *Client) startConnectionMaintenance() { + ctx, cancel := context.WithCancel(context.Background()) c.maintenanceCancel = cancel c.maintenanceWg.Go(func() { From e276ea2e7f89e7f32a490d49cebfd1410e8485ca Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 1 Apr 2026 08:11:11 +0200 Subject: [PATCH 05/10] fix backoff --- pkg/p2p/client.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index e432f6b26b..b9985378ce 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -349,6 +349,13 @@ func (c *Client) startConnectionMaintenance() { backoff = maxReconnectCooldown } if time.Now().Before(st.lastAttempt.Add(backoff)) { + remaining := time.Until(st.lastAttempt.Add(backoff)) + time.AfterFunc(remaining, func() { + select { + case c.reconnectCh <- pid: + default: + } + }) continue } st.lastAttempt = time.Now() @@ -366,6 +373,10 @@ func (c *Client) startConnectionMaintenance() { go func(info peer.AddrInfo) { if err := c.host.Connect(ctx, info); err != nil && ctx.Err() == nil { c.logger.Warn().Str("peer", info.ID.String()).Err(err).Msg("failed to reconnect to seed peer") + select { + case c.reconnectCh <- info.ID: + default: + } } }(sp) break From 9259f1aa5f0ade34f9ae481b378a1f416954a092 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 1 Apr 2026 13:07:49 +0200 Subject: [PATCH 06/10] simplify --- pkg/p2p/client.go | 106 ++-------------------------------------------- 1 file changed, 3 insertions(+), 103 deletions(-) diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index b9985378ce..9bac84f769 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -40,12 +40,6 @@ const ( // re-runs peer discovery via DHT. peerDiscoveryInterval = 5 * time.Minute - // reconnectCooldown is the base cooldown between reconnect attempts for the same seed peer. - reconnectCooldown = 5 * time.Second - - // maxReconnectCooldown caps the exponential backoff for seed peer reconnection. - maxReconnectCooldown = 5 * time.Minute - // connectWorkers limits the number of concurrent connection attempts during // periodic peer discovery refresh. connectWorkers = 16 @@ -70,11 +64,8 @@ type Client struct { ps *pubsub.PubSub started bool - seedPeers []peer.AddrInfo - maintenanceCancel context.CancelFunc maintenanceWg sync.WaitGroup - reconnectCh chan peer.ID connectSem chan struct{} metrics *Metrics @@ -185,7 +176,6 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { return err } - c.reconnectCh = make(chan peer.ID, 32) c.connectSem = make(chan struct{}, connectWorkers) c.logger.Debug().Msg("setting up active peer discovery") @@ -195,7 +185,6 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { c.started = true - c.host.Network().Notify(c.newDisconnectNotifee()) c.startConnectionMaintenance() return nil @@ -278,109 +267,21 @@ func (c *Client) Peers() []PeerConnection { return res } -// disconnectNotifee is a network.Notifee that triggers seed peer reconnection -// when a configured seed peer disconnects. -type disconnectNotifee struct { - c *Client -} - -func (n disconnectNotifee) Connected(_ network.Network, _ network.Conn) {} -func (n disconnectNotifee) OpenedStream(_ network.Network, _ network.Stream) {} -func (n disconnectNotifee) ClosedStream(_ network.Network, _ network.Stream) {} -func (n disconnectNotifee) Listen(_ network.Network, _ multiaddr.Multiaddr) {} -func (n disconnectNotifee) ListenClose(_ network.Network, _ multiaddr.Multiaddr) {} - -func (n disconnectNotifee) Disconnected(_ network.Network, conn network.Conn) { - p := conn.RemotePeer() - if n.c.reconnectCh == nil { - return - } - for _, sp := range n.c.seedPeers { - if sp.ID == p { - select { - case n.c.reconnectCh <- p: - default: - } - return - } - } -} - -func (c *Client) newDisconnectNotifee() disconnectNotifee { - return disconnectNotifee{c: c} -} - -// startConnectionMaintenance launches a background goroutine that reconnects -// to seed peers on disconnect (driven by network.Notifee events) and -// periodically refreshes peer discovery. This ensures P2P connectivity -// recovers after transient network failures without requiring a full node restart. +// startConnectionMaintenance launches a background goroutine that periodically +// refreshes peer discovery via DHT. This ensures P2P connectivity recovers after +// transient network failures and discovers new peers without requiring a full node restart. func (c *Client) startConnectionMaintenance() { ctx, cancel := context.WithCancel(context.Background()) c.maintenanceCancel = cancel c.maintenanceWg.Go(func() { - discoveryTicker := time.NewTicker(peerDiscoveryInterval) defer discoveryTicker.Stop() - type reconnectState struct { - lastAttempt time.Time - attempts int - } - states := make(map[peer.ID]*reconnectState) - for { select { case <-ctx.Done(): return - case pid := <-c.reconnectCh: - st := states[pid] - if st == nil { - st = &reconnectState{} - states[pid] = st - } - - if time.Since(st.lastAttempt) > maxReconnectCooldown { - st.attempts = 0 - } - - backoff := reconnectCooldown * time.Duration(1< maxReconnectCooldown { - backoff = maxReconnectCooldown - } - if time.Now().Before(st.lastAttempt.Add(backoff)) { - remaining := time.Until(st.lastAttempt.Add(backoff)) - time.AfterFunc(remaining, func() { - select { - case c.reconnectCh <- pid: - default: - } - }) - continue - } - st.lastAttempt = time.Now() - - for _, sp := range c.seedPeers { - if sp.ID != pid { - continue - } - if c.isConnected(sp.ID) { - st.attempts = 0 - break - } - st.attempts++ - c.logger.Info().Str("peer", sp.ID.String()).Msg("reconnecting to disconnected seed peer") - go func(info peer.AddrInfo) { - if err := c.host.Connect(ctx, info); err != nil && ctx.Err() == nil { - c.logger.Warn().Str("peer", info.ID.String()).Err(err).Msg("failed to reconnect to seed peer") - select { - case c.reconnectCh <- info.ID: - default: - } - } - }(sp) - break - } case <-discoveryTicker.C: c.refreshPeerDiscovery(ctx) } @@ -436,7 +337,6 @@ func (c *Client) listen() (host.Host, error) { func (c *Client) setupDHT(ctx context.Context) error { peers := c.parseAddrInfoList(c.conf.Peers) - c.seedPeers = peers if len(peers) == 0 { c.logger.Info().Msg("no peers - only listening for connections") } From cd87d238d3bad7fb280198f8fd3a465d4e180137 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 1 Apr 2026 13:22:19 +0200 Subject: [PATCH 07/10] add connection logs --- pkg/p2p/client.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index 9bac84f769..de49cba960 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -64,6 +64,8 @@ type Client struct { ps *pubsub.PubSub started bool + seedPeers []peer.AddrInfo + maintenanceCancel context.CancelFunc maintenanceWg sync.WaitGroup connectSem chan struct{} @@ -185,6 +187,7 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { c.started = true + c.host.Network().Notify(c.newDisconnectNotifee()) c.startConnectionMaintenance() return nil @@ -267,6 +270,38 @@ func (c *Client) Peers() []PeerConnection { return res } +type disconnectNotifee struct { + c *Client +} + +func (n disconnectNotifee) Connected(_ network.Network, conn network.Conn) { + p := conn.RemotePeer() + for _, sp := range n.c.seedPeers { + if sp.ID == p { + n.c.logger.Info().Str("peer", p.String()).Msg("connected to seed peer") + return + } + } +} +func (n disconnectNotifee) OpenedStream(_ network.Network, _ network.Stream) {} +func (n disconnectNotifee) ClosedStream(_ network.Network, _ network.Stream) {} +func (n disconnectNotifee) Listen(_ network.Network, _ multiaddr.Multiaddr) {} +func (n disconnectNotifee) ListenClose(_ network.Network, _ multiaddr.Multiaddr) {} + +func (n disconnectNotifee) Disconnected(_ network.Network, conn network.Conn) { + p := conn.RemotePeer() + for _, sp := range n.c.seedPeers { + if sp.ID == p { + n.c.logger.Info().Str("peer", p.String()).Msg("disconnected from seed peer") + return + } + } +} + +func (c *Client) newDisconnectNotifee() disconnectNotifee { + return disconnectNotifee{c: c} +} + // startConnectionMaintenance launches a background goroutine that periodically // refreshes peer discovery via DHT. This ensures P2P connectivity recovers after // transient network failures and discovers new peers without requiring a full node restart. @@ -337,6 +372,7 @@ func (c *Client) listen() (host.Host, error) { func (c *Client) setupDHT(ctx context.Context) error { peers := c.parseAddrInfoList(c.conf.Peers) + c.seedPeers = peers if len(peers) == 0 { c.logger.Info().Msg("no peers - only listening for connections") } From 1587cdf82d56c7bf4f388bae8bb4b5f8636ecf74 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 1 Apr 2026 14:12:58 +0200 Subject: [PATCH 08/10] more simplification --- CHANGELOG.md | 2 +- pkg/p2p/client.go | 76 +---------------------------------------------- 2 files changed, 2 insertions(+), 76 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 119fcf00e6..994a49c200 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changes -- Improve P2P connection in case of transient network failure [#3212](https://github.com/evstack/ev-node/pull/3212) +- Add logging on P2P transient network failure [#3212](https://github.com/evstack/ev-node/pull/3212) ## v1.1.0-rc.1 diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index de49cba960..85ece0ece0 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "strings" - "sync" "time" "github.com/ipfs/go-datastore" @@ -29,20 +28,13 @@ import ( rollhash "github.com/evstack/ev-node/pkg/hash" ) +// TODO(tzdybal): refactor to configuration parameters const ( // reAdvertisePeriod defines a period after which P2P client re-attempt advertising namespace in DHT. reAdvertisePeriod = 1 * time.Hour // peerLimit defines limit of number of peers returned during active peer discovery. peerLimit = 60 - - // peerDiscoveryInterval is how often the background loop re-advertises and - // re-runs peer discovery via DHT. - peerDiscoveryInterval = 5 * time.Minute - - // connectWorkers limits the number of concurrent connection attempts during - // periodic peer discovery refresh. - connectWorkers = 16 ) // Client is a P2P client, implemented with libp2p. @@ -66,10 +58,6 @@ type Client struct { seedPeers []peer.AddrInfo - maintenanceCancel context.CancelFunc - maintenanceWg sync.WaitGroup - connectSem chan struct{} - metrics *Metrics } @@ -178,8 +166,6 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { return err } - c.connectSem = make(chan struct{}, connectWorkers) - c.logger.Debug().Msg("setting up active peer discovery") if err := c.peerDiscovery(ctx); err != nil { return err @@ -188,18 +174,12 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { c.started = true c.host.Network().Notify(c.newDisconnectNotifee()) - c.startConnectionMaintenance() return nil } // Close gently stops Client. func (c *Client) Close() error { - if c.maintenanceCancel != nil { - c.maintenanceCancel() - } - c.maintenanceWg.Wait() - var err error if c.dht != nil { err = errors.Join(err, c.dht.Close()) @@ -302,60 +282,6 @@ func (c *Client) newDisconnectNotifee() disconnectNotifee { return disconnectNotifee{c: c} } -// startConnectionMaintenance launches a background goroutine that periodically -// refreshes peer discovery via DHT. This ensures P2P connectivity recovers after -// transient network failures and discovers new peers without requiring a full node restart. -func (c *Client) startConnectionMaintenance() { - ctx, cancel := context.WithCancel(context.Background()) - c.maintenanceCancel = cancel - - c.maintenanceWg.Go(func() { - discoveryTicker := time.NewTicker(peerDiscoveryInterval) - defer discoveryTicker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-discoveryTicker.C: - c.refreshPeerDiscovery(ctx) - } - } - }) -} - -// refreshPeerDiscovery re-advertises and re-runs peer discovery via DHT. -func (c *Client) refreshPeerDiscovery(ctx context.Context) { - if c.disc == nil { - return - } - - c.logger.Debug().Msg("refreshing peer discovery") - - _ = c.advertise(ctx) - - peerCh, err := c.disc.FindPeers(ctx, c.getNamespace(), cdiscovery.Limit(peerLimit)) - if err != nil { - c.logger.Warn().Err(err).Msg("peer discovery refresh failed") - return - } - - for p := range peerCh { - if p.ID == c.host.ID() || c.isConnected(p.ID) { - continue - } - select { - case c.connectSem <- struct{}{}: - go func(peer peer.AddrInfo) { - defer func() { <-c.connectSem }() - c.tryConnect(ctx, peer) - }(p) - case <-ctx.Done(): - return - } - } -} - // isConnected returns true if there is an active connection to the given peer. func (c *Client) isConnected(id peer.ID) bool { return c.host.Network().Connectedness(id) == network.Connected From f63ec638d953b706e4778bfc3dfafaf6ef47af43 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 1 Apr 2026 14:49:34 +0200 Subject: [PATCH 09/10] reconnect attempt --- pkg/p2p/client.go | 49 ++++++++++++++++++++- pkg/p2p/client_test.go | 98 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 1 deletion(-) diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index 85ece0ece0..0093a8ad60 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -35,6 +35,12 @@ const ( // peerLimit defines limit of number of peers returned during active peer discovery. peerLimit = 60 + + // seedReconnectBackoff is the initial backoff when reconnecting to a disconnected seed peer. + seedReconnectBackoff = 1 * time.Second + + // seedReconnectMaxBackoff is the maximum backoff for seed peer reconnection attempts. + seedReconnectMaxBackoff = 30 * time.Second ) // Client is a P2P client, implemented with libp2p. @@ -56,6 +62,9 @@ type Client struct { ps *pubsub.PubSub started bool + ctx context.Context + cancel context.CancelFunc + seedPeers []peer.AddrInfo metrics *Metrics @@ -142,6 +151,7 @@ func (c *Client) Start(ctx context.Context) error { func (c *Client) startWithHost(ctx context.Context, h host.Host) error { c.host = h + c.ctx, c.cancel = context.WithCancel(ctx) for _, a := range c.host.Addrs() { c.logger.Info().Str("address", fmt.Sprintf("%s/p2p/%s", a, c.host.ID())).Msg("listening on address") } @@ -180,6 +190,9 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { // Close gently stops Client. func (c *Client) Close() error { + if c.cancel != nil { + c.cancel() + } var err error if c.dht != nil { err = errors.Join(err, c.dht.Close()) @@ -272,9 +285,43 @@ func (n disconnectNotifee) Disconnected(_ network.Network, conn network.Conn) { p := conn.RemotePeer() for _, sp := range n.c.seedPeers { if sp.ID == p { - n.c.logger.Info().Str("peer", p.String()).Msg("disconnected from seed peer") + n.c.logger.Warn().Str("peer", p.String()).Msg("disconnected from seed peer, scheduling reconnect") + go n.c.reconnectSeedPeer(sp) + return + } + } +} + +func (c *Client) reconnectSeedPeer(sp peer.AddrInfo) { + backoff := seedReconnectBackoff + for { + if c.ctx.Err() != nil { + return + } + if c.isConnected(sp.ID) { return } + + err := c.host.Connect(c.ctx, sp) + if err == nil { + c.logger.Info().Str("peer", sp.ID.String()).Msg("reconnected to seed peer") + return + } + if c.ctx.Err() != nil { + return + } + + c.logger.Debug().Str("peer", sp.ID.String()).Dur("backoff", backoff).Err(err).Msg("failed to reconnect to seed peer, retrying") + select { + case <-c.ctx.Done(): + return + case <-time.After(backoff): + } + + backoff *= 2 + if backoff > seedReconnectMaxBackoff { + backoff = seedReconnectMaxBackoff + } } } diff --git a/pkg/p2p/client_test.go b/pkg/p2p/client_test.go index e3ac6f1fab..568b152ea4 100644 --- a/pkg/p2p/client_test.go +++ b/pkg/p2p/client_test.go @@ -278,6 +278,104 @@ func waitForCondition(timeout time.Duration, conditionFunc func() bool) error { } } +func TestSeedPeerReconnect(t *testing.T) { + require := require.New(t) + assert := assert.New(t) + logger := zerolog.Nop() + + mn := mocknet.New() + defer mn.Close() + + seedKey, err := key.GenerateNodeKey() + require.NoError(err) + seedAddr, err := getAddr(seedKey.PrivKey) + require.NoError(err) + seedHost, err := mn.AddPeer(seedKey.PrivKey, seedAddr) + require.NoError(err) + + clientKey, err := key.GenerateNodeKey() + require.NoError(err) + clientAddr, err := getAddr(clientKey.PrivKey) + require.NoError(err) + clientHost, err := mn.AddPeer(clientKey.PrivKey, clientAddr) + require.NoError(err) + + seedAddrStr := seedHost.Addrs()[0].String() + "/p2p/" + seedHost.ID().String() + conf := config.P2PConfig{Peers: seedAddrStr} + + client, err := NewClient(conf, clientKey.PrivKey, dssync.MutexWrap(datastore.NewMapDatastore()), "test-chain", logger, NopMetrics()) + require.NoError(err) + require.NotNil(client) + + err = mn.LinkAll() + require.NoError(err) + err = mn.ConnectAllButSelf() + require.NoError(err) + + ctx := t.Context() + err = client.startWithHost(ctx, clientHost) + require.NoError(err) + defer client.Close() + + err = waitForCondition(2*time.Second, func() bool { + return client.isConnected(seedHost.ID()) + }) + require.NoError(err, "client should connect to seed peer on start") + + conns := client.host.Network().ConnsToPeer(seedHost.ID()) + for _, conn := range conns { + conn.Close() + } + client.host.Network().ClosePeer(seedHost.ID()) + + assert.False(client.isConnected(seedHost.ID()), "seed peer should be disconnected") + + err = waitForCondition(5*time.Second, func() bool { + return client.isConnected(seedHost.ID()) + }) + require.NoError(err, "client should reconnect to seed peer after disconnect") +} + +func TestSeedPeerReconnectStopsOnClose(t *testing.T) { + require := require.New(t) + + mn := mocknet.New() + defer mn.Close() + + seedKey, err := key.GenerateNodeKey() + require.NoError(err) + seedAddr, err := getAddr(seedKey.PrivKey) + require.NoError(err) + seedHost, err := mn.AddPeer(seedKey.PrivKey, seedAddr) + require.NoError(err) + + clientKey, err := key.GenerateNodeKey() + require.NoError(err) + clientAddr, err := getAddr(clientKey.PrivKey) + require.NoError(err) + clientHost, err := mn.AddPeer(clientKey.PrivKey, clientAddr) + require.NoError(err) + + seedAddrStr := seedHost.Addrs()[0].String() + "/p2p/" + seedHost.ID().String() + conf := config.P2PConfig{Peers: seedAddrStr} + + client, err := NewClient(conf, clientKey.PrivKey, dssync.MutexWrap(datastore.NewMapDatastore()), "test-chain", zerolog.Nop(), NopMetrics()) + require.NoError(err) + + err = mn.LinkAll() + require.NoError(err) + err = mn.ConnectAllButSelf() + require.NoError(err) + + ctx := t.Context() + err = client.startWithHost(ctx, clientHost) + require.NoError(err) + + require.NoError(client.Close()) + + require.Error(client.ctx.Err(), "client context should be cancelled after Close") +} + func TestClientInfoMethods(t *testing.T) { require := require.New(t) assert := assert.New(t) From 7f4e251c9a797f4e811d3747c2a168e62e462173 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 1 Apr 2026 15:14:55 +0200 Subject: [PATCH 10/10] accurate log --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 994a49c200..f6f021fbd2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changes -- Add logging on P2P transient network failure [#3212](https://github.com/evstack/ev-node/pull/3212) +- Improve P2P transient network failure [#3212](https://github.com/evstack/ev-node/pull/3212) ## v1.1.0-rc.1