Skip to content
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changes

- Improve P2P connection recovery after transient network failures [#3212](https://github.com/evstack/ev-node/pull/3212)

## v1.1.0-rc.1

### Added
Expand Down
8 changes: 4 additions & 4 deletions apps/evm/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ module github.com/evstack/ev-node/apps/evm

go 1.25.7

// replace (
// github.com/evstack/ev-node => ../../
// github.com/evstack/ev-node/execution/evm => ../../execution/evm
// )
replace (
github.com/evstack/ev-node => ../../
github.com/evstack/ev-node/execution/evm => ../../execution/evm
)
Comment thread
julienrbrt marked this conversation as resolved.

require (
github.com/ethereum/go-ethereum v1.17.2
Expand Down
4 changes: 0 additions & 4 deletions apps/evm/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,8 @@ github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab h1:rvv6MJ
github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab/go.mod h1:IuLm4IsPipXKF7CW5Lzf68PIbZ5yl7FFd74l/E0o9A8=
github.com/ethereum/go-ethereum v1.17.2 h1:ag6geu0kn8Hv5FLKTpH+Hm2DHD+iuFtuqKxEuwUsDOI=
github.com/ethereum/go-ethereum v1.17.2/go.mod h1:KHcRXfGOUfUmKg51IhQ0IowiqZ6PqZf08CMtk0g5K1o=
github.com/evstack/ev-node v1.1.0-rc.1 h1:NtPuuDLqN2h4/edu5zxRlZAxmLkTG3ncXBO2PlCDvVs=
github.com/evstack/ev-node v1.1.0-rc.1/go.mod h1:6rhWWzuyiqNn/erDmWCk1aLxUuQphyOGIRq56/smSyk=
github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8=
github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY=
github.com/evstack/ev-node/execution/evm v1.0.0 h1:UTAdCrnPsLoGzSgsBx4Kv76jkXpMmHBIpNv3MxyzWPo=
github.com/evstack/ev-node/execution/evm v1.0.0/go.mod h1:UrqkiepfTMiot6M8jnswgu3VU8SSucZpaMIHIl22/1A=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
Expand Down
2 changes: 1 addition & 1 deletion apps/testapp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/evstack/ev-node/apps/testapp

go 1.25.7

// replace github.com/evstack/ev-node => ../../.
replace github.com/evstack/ev-node => ../../.

require (
github.com/evstack/ev-node v1.1.0-rc.1
Expand Down
2 changes: 0 additions & 2 deletions apps/testapp/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,6 @@ github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87K
github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE=
github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg5VPuZ0uONDT6eb4=
github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA=
github.com/evstack/ev-node v1.1.0-rc.1 h1:NtPuuDLqN2h4/edu5zxRlZAxmLkTG3ncXBO2PlCDvVs=
github.com/evstack/ev-node v1.1.0-rc.1/go.mod h1:6rhWWzuyiqNn/erDmWCk1aLxUuQphyOGIRq56/smSyk=
github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8=
github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
Expand Down
172 changes: 171 additions & 1 deletion pkg/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/ipfs/go-datastore"
Expand All @@ -28,13 +29,26 @@ import (
rollhash "github.com/evstack/ev-node/pkg/hash"
)

// TODO(tzdybal): refactor to configuration parameters
const (
	// reAdvertisePeriod defines a period after which the P2P client re-attempts
	// advertising its namespace in the DHT.
	reAdvertisePeriod = 1 * time.Hour

	// peerLimit defines the maximum number of peers returned during active peer discovery.
	peerLimit = 60

	// peerDiscoveryInterval is how often the background maintenance loop
	// re-advertises and re-runs peer discovery via DHT.
	peerDiscoveryInterval = 5 * time.Minute

	// reconnectCooldown is the base cooldown between reconnect attempts for the
	// same seed peer; it grows exponentially on repeated failures.
	reconnectCooldown = 5 * time.Second

	// maxReconnectCooldown caps the exponential backoff for seed peer reconnection.
	maxReconnectCooldown = 5 * time.Minute

	// connectWorkers limits the number of concurrent connection attempts during
	// periodic peer discovery refresh.
	connectWorkers = 16
)

// Client is a P2P client, implemented with libp2p.
Expand All @@ -56,6 +70,13 @@ type Client struct {
ps *pubsub.PubSub
started bool

seedPeers []peer.AddrInfo

maintenanceCancel context.CancelFunc
maintenanceWg sync.WaitGroup
reconnectCh chan peer.ID
connectSem chan struct{}

metrics *Metrics
}

Expand Down Expand Up @@ -164,17 +185,29 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error {
return err
}

c.reconnectCh = make(chan peer.ID, 32)
c.connectSem = make(chan struct{}, connectWorkers)

c.logger.Debug().Msg("setting up active peer discovery")
if err := c.peerDiscovery(ctx); err != nil {
return err
}

c.started = true

c.host.Network().Notify(c.newDisconnectNotifee())
c.startConnectionMaintenance(ctx)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

return nil
}

// Close gently stops Client.
func (c *Client) Close() error {
if c.maintenanceCancel != nil {
c.maintenanceCancel()
}
c.maintenanceWg.Wait()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Track and time-box the maintenance dials.

Close() only waits for the dispatcher goroutine. Lines 373-381 and 413-416 spawn dial workers outside maintenanceWg, while Line 385 runs discovery inline on the only maintenance worker. A slow discovery/dial can block reconnect handling, pin connectSem, or still be running when teardown closes the host and DHT. Give the refresh and each dial a short child timeout, and track the spawned workers in the same wait group/errgroup.

As per coding guidelines "Use context.Context for cancellation in Go" and "Be mindful of goroutine leaks in Go code".

Also applies to: 373-381, 384-385, 401-419

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/p2p/client.go` around lines 206 - 209, Close() currently cancels
maintenanceCancel and waits only for maintenanceWg but discovery and dial
goroutines (spawned by the maintenance worker) are not time-boxed or tracked, so
add a child context with a short timeout for the periodic discovery/refresh and
for each dial/Connect call (derive via maintenanceCtx :=
context.WithTimeout(maintenanceCancelCtx, shortDuration)), ensure every spawned
dial worker is registered with the same maintenanceWg (or use an errgroup bound
to maintenanceCtx) so they are waited on, pass the child ctx into the
dial/Connect/DHT calls so they can be cancelled, and guarantee connectSem is
released in a defer inside each worker even on timeout/cancellation; also ensure
maintenanceCancel cancels the parent ctx so all child timeouts abort during
Close().


var err error
if c.dht != nil {
err = errors.Join(err, c.dht.Close())
Expand Down Expand Up @@ -245,6 +278,142 @@ func (c *Client) Peers() []PeerConnection {
return res
}

// disconnectNotifee is a network.Notifee that triggers seed peer reconnection
// when a configured seed peer disconnects.
type disconnectNotifee struct {
	// c is the owning client; its seedPeers list and reconnectCh are consulted
	// on disconnect events.
	c *Client
}

// Only Disconnected carries logic; the remaining network.Notifee callbacks are no-ops.
func (n disconnectNotifee) Connected(_ network.Network, _ network.Conn)          {}
func (n disconnectNotifee) OpenedStream(_ network.Network, _ network.Stream)     {}
func (n disconnectNotifee) ClosedStream(_ network.Network, _ network.Stream)     {}
func (n disconnectNotifee) Listen(_ network.Network, _ multiaddr.Multiaddr)      {}
func (n disconnectNotifee) ListenClose(_ network.Network, _ multiaddr.Multiaddr) {}

// Disconnected fires on every connection close; if the remote peer is one of
// the configured seed peers, a reconnect request is enqueued for the
// maintenance loop. The enqueue is non-blocking: if the channel is full the
// event is dropped, since a pending request already exists.
func (n disconnectNotifee) Disconnected(_ network.Network, conn network.Conn) {
	if n.c.reconnectCh == nil {
		return
	}
	remote := conn.RemotePeer()
	for _, seed := range n.c.seedPeers {
		if seed.ID != remote {
			continue
		}
		select {
		case n.c.reconnectCh <- remote:
		default:
		}
		return
	}
}

// newDisconnectNotifee returns a network.Notifee bound to this client that
// requests seed-peer reconnection when a seed peer disconnects.
func (c *Client) newDisconnectNotifee() disconnectNotifee {
	return disconnectNotifee{c: c}
}

// startConnectionMaintenance launches a background goroutine that reconnects
// to seed peers on disconnect (driven by network.Notifee events) and
// periodically refreshes peer discovery. This ensures P2P connectivity
// recovers after transient network failures without requiring a full node restart.
func (c *Client) startConnectionMaintenance(parentCtx context.Context) {
ctx, cancel := context.WithCancel(parentCtx)
c.maintenanceCancel = cancel

c.maintenanceWg.Go(func() {

discoveryTicker := time.NewTicker(peerDiscoveryInterval)
defer discoveryTicker.Stop()

type reconnectState struct {
lastAttempt time.Time
attempts int
}
states := make(map[peer.ID]*reconnectState)

for {
select {
case <-ctx.Done():
return
case pid := <-c.reconnectCh:
st := states[pid]
if st == nil {
st = &reconnectState{}
states[pid] = st
}

if time.Since(st.lastAttempt) > maxReconnectCooldown {
st.attempts = 0
}

backoff := reconnectCooldown * time.Duration(1<<min(st.attempts, 6))
if backoff > maxReconnectCooldown {
backoff = maxReconnectCooldown
}
if time.Now().Before(st.lastAttempt.Add(backoff)) {
continue
}
st.lastAttempt = time.Now()

Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
for _, sp := range c.seedPeers {
if sp.ID != pid {
continue
}
if c.isConnected(sp.ID) {
st.attempts = 0
break
}
st.attempts++
c.logger.Info().Str("peer", sp.ID.String()).Msg("reconnecting to disconnected seed peer")
go func(info peer.AddrInfo) {
if err := c.host.Connect(ctx, info); err != nil && ctx.Err() == nil {
c.logger.Warn().Str("peer", info.ID.String()).Err(err).Msg("failed to reconnect to seed peer")
}
}(sp)
break
}
case <-discoveryTicker.C:
c.refreshPeerDiscovery(ctx)
}
}
})
}

// refreshPeerDiscovery re-advertises and re-runs peer discovery via DHT.
func (c *Client) refreshPeerDiscovery(ctx context.Context) {
if c.disc == nil {
return
}

c.logger.Debug().Msg("refreshing peer discovery")

_ = c.advertise(ctx)

Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
peerCh, err := c.disc.FindPeers(ctx, c.getNamespace(), cdiscovery.Limit(peerLimit))
if err != nil {
c.logger.Warn().Err(err).Msg("peer discovery refresh failed")
return
}

for p := range peerCh {
if p.ID == c.host.ID() || c.isConnected(p.ID) {
continue
}
select {
case c.connectSem <- struct{}{}:
go func(peer peer.AddrInfo) {
defer func() { <-c.connectSem }()
c.tryConnect(ctx, peer)
}(p)
case <-ctx.Done():
return
}
}
}

// isConnected reports whether the host currently holds an active connection
// to the peer with the given ID.
func (c *Client) isConnected(id peer.ID) bool {
	state := c.host.Network().Connectedness(id)
	return state == network.Connected
}

func (c *Client) listen() (host.Host, error) {
maddr, err := multiaddr.NewMultiaddr(c.conf.ListenAddress)
if err != nil {
Expand All @@ -256,6 +425,7 @@ func (c *Client) listen() (host.Host, error) {

func (c *Client) setupDHT(ctx context.Context) error {
peers := c.parseAddrInfoList(c.conf.Peers)
c.seedPeers = peers
if len(peers) == 0 {
c.logger.Info().Msg("no peers - only listening for connections")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/signer/aws/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *KmsSigner) Sign(ctx context.Context, message []byte) ([]byte, error) {
timeout := s.opts.timeout()
maxAttempts := maxRetries + 1

for attempt := 0; attempt < maxAttempts; attempt++ {
for attempt := range maxAttempts {
if err := ctx.Err(); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/signer/gcp/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (s *KmsSigner) Sign(ctx context.Context, message []byte) ([]byte, error) {
timeout := s.opts.timeout()
maxAttempts := maxRetries + 1

for attempt := 0; attempt < maxAttempts; attempt++ {
for attempt := range maxAttempts {
if err := ctx.Err(); err != nil {
return nil, err
}
Expand Down
Loading