Skip to content
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Improve P2P connection in case of transient network failure [#3212](https://github.com/evstack/ev-node/pull/3212)

## v1.1.0-rc.1

### Added
Expand Down
8 changes: 4 additions & 4 deletions apps/evm/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ module github.com/evstack/ev-node/apps/evm

go 1.25.7

// replace (
// github.com/evstack/ev-node => ../../
// github.com/evstack/ev-node/execution/evm => ../../execution/evm
// )
replace (
github.com/evstack/ev-node => ../../
github.com/evstack/ev-node/execution/evm => ../../execution/evm
)
Comment thread
julienrbrt marked this conversation as resolved.

require (
github.com/ethereum/go-ethereum v1.17.2
Expand Down
4 changes: 0 additions & 4 deletions apps/evm/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,8 @@ github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab h1:rvv6MJ
github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab/go.mod h1:IuLm4IsPipXKF7CW5Lzf68PIbZ5yl7FFd74l/E0o9A8=
github.com/ethereum/go-ethereum v1.17.2 h1:ag6geu0kn8Hv5FLKTpH+Hm2DHD+iuFtuqKxEuwUsDOI=
github.com/ethereum/go-ethereum v1.17.2/go.mod h1:KHcRXfGOUfUmKg51IhQ0IowiqZ6PqZf08CMtk0g5K1o=
github.com/evstack/ev-node v1.1.0-rc.1 h1:NtPuuDLqN2h4/edu5zxRlZAxmLkTG3ncXBO2PlCDvVs=
github.com/evstack/ev-node v1.1.0-rc.1/go.mod h1:6rhWWzuyiqNn/erDmWCk1aLxUuQphyOGIRq56/smSyk=
github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8=
github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY=
github.com/evstack/ev-node/execution/evm v1.0.0 h1:UTAdCrnPsLoGzSgsBx4Kv76jkXpMmHBIpNv3MxyzWPo=
github.com/evstack/ev-node/execution/evm v1.0.0/go.mod h1:UrqkiepfTMiot6M8jnswgu3VU8SSucZpaMIHIl22/1A=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
Expand Down
2 changes: 1 addition & 1 deletion apps/testapp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/evstack/ev-node/apps/testapp

go 1.25.7

// replace github.com/evstack/ev-node => ../../.
replace github.com/evstack/ev-node => ../../.

require (
github.com/evstack/ev-node v1.1.0-rc.1
Expand Down
2 changes: 0 additions & 2 deletions apps/testapp/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,6 @@ github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87K
github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE=
github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg5VPuZ0uONDT6eb4=
github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA=
github.com/evstack/ev-node v1.1.0-rc.1 h1:NtPuuDLqN2h4/edu5zxRlZAxmLkTG3ncXBO2PlCDvVs=
github.com/evstack/ev-node v1.1.0-rc.1/go.mod h1:6rhWWzuyiqNn/erDmWCk1aLxUuQphyOGIRq56/smSyk=
github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8=
github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
Expand Down
172 changes: 171 additions & 1 deletion pkg/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/ipfs/go-datastore"
Expand All @@ -28,13 +29,26 @@
rollhash "github.com/evstack/ev-node/pkg/hash"
)

// TODO(tzdybal): refactor to configuration parameters
const (
// reAdvertisePeriod defines a period after which P2P client re-attempt advertising namespace in DHT.
reAdvertisePeriod = 1 * time.Hour

// peerLimit defines limit of number of peers returned during active peer discovery.
peerLimit = 60

// peerDiscoveryInterval is how often the background loop re-advertises and
// re-runs peer discovery via DHT.
peerDiscoveryInterval = 5 * time.Minute

// reconnectCooldown is the base cooldown between reconnect attempts for the same seed peer.
reconnectCooldown = 5 * time.Second

// maxReconnectCooldown caps the exponential backoff for seed peer reconnection.
maxReconnectCooldown = 5 * time.Minute

// connectWorkers limits the number of concurrent connection attempts during
// periodic peer discovery refresh.
connectWorkers = 16
)

// Client is a P2P client, implemented with libp2p.
Expand All @@ -56,6 +70,13 @@
ps *pubsub.PubSub
started bool

seedPeers []peer.AddrInfo

maintenanceCancel context.CancelFunc
maintenanceWg sync.WaitGroup
reconnectCh chan peer.ID
connectSem chan struct{}

metrics *Metrics
}

Expand Down Expand Up @@ -164,17 +185,29 @@
return err
}

c.reconnectCh = make(chan peer.ID, 32)
c.connectSem = make(chan struct{}, connectWorkers)

c.logger.Debug().Msg("setting up active peer discovery")
if err := c.peerDiscovery(ctx); err != nil {
return err
}

c.started = true

c.host.Network().Notify(c.newDisconnectNotifee())
c.startConnectionMaintenance()

Check failure on line 199 in pkg/p2p/client.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

Function `startConnectionMaintenance` should pass the context parameter (contextcheck)

return nil
}

// Close gently stops Client.
func (c *Client) Close() error {
if c.maintenanceCancel != nil {
c.maintenanceCancel()
}
c.maintenanceWg.Wait()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Track and time-box the maintenance dials.

Close() only waits for the dispatcher goroutine. Lines 373-381 and 413-416 spawn dial workers outside maintenanceWg, while Line 385 runs discovery inline on the only maintenance worker. A slow discovery/dial can block reconnect handling, pin connectSem, or still be running when teardown closes the host and DHT. Give the refresh and each dial a short child timeout, and track the spawned workers in the same wait group/errgroup.

As per coding guidelines "Use context.Context for cancellation in Go" and "Be mindful of goroutine leaks in Go code".

Also applies to: 373-381, 384-385, 401-419

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/p2p/client.go` around lines 206 - 209, Close() currently cancels
maintenanceCancel and waits only for maintenanceWg but discovery and dial
goroutines (spawned by the maintenance worker) are not time-boxed or tracked, so
add a child context with a short timeout for the periodic discovery/refresh and
for each dial/Connect call (derive via maintenanceCtx :=
context.WithTimeout(maintenanceCancelCtx, shortDuration)), ensure every spawned
dial worker is registered with the same maintenanceWg (or use an errgroup bound
to maintenanceCtx) so they are waited on, pass the child ctx into the
dial/Connect/DHT calls so they can be cancelled, and guarantee connectSem is
released in a defer inside each worker even on timeout/cancellation; also ensure
maintenanceCancel cancels the parent ctx so all child timeouts abort during
Close().


var err error
if c.dht != nil {
err = errors.Join(err, c.dht.Close())
Expand Down Expand Up @@ -245,6 +278,142 @@
return res
}

// disconnectNotifee is a network.Notifee that triggers seed peer reconnection
// when a configured seed peer disconnects.
type disconnectNotifee struct {
	c *Client
}

func (n disconnectNotifee) Connected(_ network.Network, _ network.Conn)          {}
func (n disconnectNotifee) OpenedStream(_ network.Network, _ network.Stream)     {}
func (n disconnectNotifee) ClosedStream(_ network.Network, _ network.Stream)     {}
func (n disconnectNotifee) Listen(_ network.Network, _ multiaddr.Multiaddr)      {}
func (n disconnectNotifee) ListenClose(_ network.Network, _ multiaddr.Multiaddr) {}

// Disconnected enqueues a reconnect request when the remote peer is one of
// the configured seed peers. The send is non-blocking: if the channel is
// full, a reconnect for some peer is already pending and the event is dropped.
func (n disconnectNotifee) Disconnected(_ network.Network, conn network.Conn) {
	if n.c.reconnectCh == nil {
		return
	}
	remote := conn.RemotePeer()
	for _, seed := range n.c.seedPeers {
		if seed.ID != remote {
			continue
		}
		select {
		case n.c.reconnectCh <- remote:
		default:
		}
		return
	}
}

// newDisconnectNotifee returns a notifee bound to this client.
func (c *Client) newDisconnectNotifee() disconnectNotifee {
	return disconnectNotifee{c: c}
}

// startConnectionMaintenance launches a background goroutine that reconnects
// to seed peers on disconnect (driven by network.Notifee events) and
// periodically refreshes peer discovery. This ensures P2P connectivity
// recovers after transient network failures without requiring a full node restart.
func (c *Client) startConnectionMaintenance() {
ctx, cancel := context.WithCancel(context.Background())
c.maintenanceCancel = cancel

c.maintenanceWg.Go(func() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Queue currently disconnected seed peers when maintenance starts.

This worker only reacts to future Disconnected events. A seed peer that never connected during startup, or dropped before the notifee was registered, never enters reconnectCh, and refreshPeerDiscovery only searches c.getNamespace(). Seed any currently disconnected c.seedPeers once before the select loop begins.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/p2p/client.go` around lines 317 - 321, startConnectionMaintenance
currently only handles future Disconnected events so seed peers that are already
disconnected never get enqueued; before the maintenance select loop begins
(inside the maintenanceWg.Go started by startConnectionMaintenance), iterate
c.seedPeers and for each peer that is not currently connected (use existing
connection state check used elsewhere in the file), send that peer into
c.reconnectCh once so they will be retried; ensure sends won't block the worker
(use a non-blocking select with default or spawn a short goroutine per send) and
rely on refreshPeerDiscovery/c.getNamespace as before for discovery updates.


discoveryTicker := time.NewTicker(peerDiscoveryInterval)
defer discoveryTicker.Stop()

type reconnectState struct {
lastAttempt time.Time
attempts int
}
states := make(map[peer.ID]*reconnectState)

for {
select {
case <-ctx.Done():
return
case pid := <-c.reconnectCh:
st := states[pid]
if st == nil {
st = &reconnectState{}
states[pid] = st
}

if time.Since(st.lastAttempt) > maxReconnectCooldown {
st.attempts = 0
}

backoff := reconnectCooldown * time.Duration(1<<min(st.attempts, 6))
if backoff > maxReconnectCooldown {
backoff = maxReconnectCooldown
}
if time.Now().Before(st.lastAttempt.Add(backoff)) {
continue
}
st.lastAttempt = time.Now()

Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
for _, sp := range c.seedPeers {
if sp.ID != pid {
continue
}
if c.isConnected(sp.ID) {
st.attempts = 0
break
}
st.attempts++
c.logger.Info().Str("peer", sp.ID.String()).Msg("reconnecting to disconnected seed peer")
go func(info peer.AddrInfo) {
if err := c.host.Connect(ctx, info); err != nil && ctx.Err() == nil {
c.logger.Warn().Str("peer", info.ID.String()).Err(err).Msg("failed to reconnect to seed peer")
}
}(sp)
break
}
case <-discoveryTicker.C:
c.refreshPeerDiscovery(ctx)
}
}
})
}

// refreshPeerDiscovery re-advertises and re-runs peer discovery via DHT.
func (c *Client) refreshPeerDiscovery(ctx context.Context) {
if c.disc == nil {
return
}

c.logger.Debug().Msg("refreshing peer discovery")

_ = c.advertise(ctx)

Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
peerCh, err := c.disc.FindPeers(ctx, c.getNamespace(), cdiscovery.Limit(peerLimit))
if err != nil {
c.logger.Warn().Err(err).Msg("peer discovery refresh failed")
return
}

for p := range peerCh {
if p.ID == c.host.ID() || c.isConnected(p.ID) {
continue
}
select {
case c.connectSem <- struct{}{}:
go func(peer peer.AddrInfo) {
defer func() { <-c.connectSem }()
c.tryConnect(ctx, peer)
}(p)
case <-ctx.Done():
return
}
}
}

// isConnected reports whether the host currently holds an active connection
// to the peer with the given ID.
func (c *Client) isConnected(id peer.ID) bool {
	connectedness := c.host.Network().Connectedness(id)
	return connectedness == network.Connected
}

func (c *Client) listen() (host.Host, error) {
maddr, err := multiaddr.NewMultiaddr(c.conf.ListenAddress)
if err != nil {
Expand All @@ -256,6 +425,7 @@

func (c *Client) setupDHT(ctx context.Context) error {
peers := c.parseAddrInfoList(c.conf.Peers)
c.seedPeers = peers
if len(peers) == 0 {
c.logger.Info().Msg("no peers - only listening for connections")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/signer/aws/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *KmsSigner) Sign(ctx context.Context, message []byte) ([]byte, error) {
timeout := s.opts.timeout()
maxAttempts := maxRetries + 1

for attempt := 0; attempt < maxAttempts; attempt++ {
for attempt := range maxAttempts {
if err := ctx.Err(); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/signer/gcp/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (s *KmsSigner) Sign(ctx context.Context, message []byte) ([]byte, error) {
timeout := s.opts.timeout()
maxAttempts := maxRetries + 1

for attempt := 0; attempt < maxAttempts; attempt++ {
for attempt := range maxAttempts {
if err := ctx.Err(); err != nil {
return nil, err
}
Expand Down
Loading