Skip to content

Commit 6069213

Browse files
committed
feedback
1 parent cf7c3ce commit 6069213

1 file changed

Lines changed: 54 additions & 10 deletions

File tree

pkg/p2p/client.go

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,19 @@ const (
3636
// peerLimit defines the maximum number of peers returned during active peer discovery.
3737
peerLimit = 60
3838

39-
// peerDiscoveryInterval is how often the background loop re-runs peer discovery.
39+
// peerDiscoveryInterval is how often the background loop re-advertises and
40+
// re-runs peer discovery via DHT.
4041
peerDiscoveryInterval = 5 * time.Minute
4142

42-
// reconnectCooldown is the minimum time between reconnect attempts for the same seed peer.
43+
// reconnectCooldown is the base cooldown between reconnect attempts for the same seed peer.
4344
reconnectCooldown = 5 * time.Second
45+
46+
// maxReconnectCooldown caps the exponential backoff for seed peer reconnection.
47+
maxReconnectCooldown = 5 * time.Minute
48+
49+
// connectWorkers limits the number of concurrent connection attempts during
50+
// periodic peer discovery refresh.
51+
connectWorkers = 16
4452
)
4553

4654
// Client is a P2P client, implemented with libp2p.
@@ -67,6 +75,7 @@ type Client struct {
6775
maintenanceCancel context.CancelFunc
6876
maintenanceWg sync.WaitGroup
6977
reconnectCh chan peer.ID
78+
connectSem chan struct{}
7079

7180
metrics *Metrics
7281
}
@@ -176,14 +185,16 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error {
176185
return err
177186
}
178187

188+
c.reconnectCh = make(chan peer.ID, 32)
189+
c.connectSem = make(chan struct{}, connectWorkers)
190+
179191
c.logger.Debug().Msg("setting up active peer discovery")
180192
if err := c.peerDiscovery(ctx); err != nil {
181193
return err
182194
}
183195

184196
c.started = true
185197

186-
c.reconnectCh = make(chan peer.ID, 32)
187198
c.host.Network().Notify(c.newDisconnectNotifee())
188199
c.startConnectionMaintenance(ctx)
189200

@@ -281,6 +292,9 @@ func (n disconnectNotifee) ListenClose(_ network.Network, _ multiaddr.Multiaddr)
281292

282293
func (n disconnectNotifee) Disconnected(_ network.Network, conn network.Conn) {
283294
p := conn.RemotePeer()
295+
if n.c.reconnectCh == nil {
296+
return
297+
}
284298
for _, sp := range n.c.seedPeers {
285299
if sp.ID == p {
286300
select {
@@ -309,29 +323,51 @@ func (c *Client) startConnectionMaintenance(parentCtx context.Context) {
309323
discoveryTicker := time.NewTicker(peerDiscoveryInterval)
310324
defer discoveryTicker.Stop()
311325

312-
lastReconnect := make(map[peer.ID]time.Time)
326+
type reconnectState struct {
327+
lastAttempt time.Time
328+
attempts int
329+
}
330+
states := make(map[peer.ID]*reconnectState)
313331

314332
for {
315333
select {
316334
case <-ctx.Done():
317335
return
318336
case pid := <-c.reconnectCh:
319-
if until := lastReconnect[pid].Add(reconnectCooldown); time.Now().Before(until) {
337+
st := states[pid]
338+
if st == nil {
339+
st = &reconnectState{}
340+
states[pid] = st
341+
}
342+
343+
if time.Since(st.lastAttempt) > maxReconnectCooldown {
344+
st.attempts = 0
345+
}
346+
347+
backoff := reconnectCooldown * time.Duration(1<<min(st.attempts, 6))
348+
if backoff > maxReconnectCooldown {
349+
backoff = maxReconnectCooldown
350+
}
351+
if time.Now().Before(st.lastAttempt.Add(backoff)) {
320352
continue
321353
}
322-
lastReconnect[pid] = time.Now()
354+
st.lastAttempt = time.Now()
323355

324356
for _, sp := range c.seedPeers {
325357
if sp.ID != pid {
326358
continue
327359
}
328360
if c.isConnected(sp.ID) {
361+
st.attempts = 0
329362
break
330363
}
364+
st.attempts++
331365
c.logger.Info().Str("peer", sp.ID.String()).Msg("reconnecting to disconnected seed peer")
332-
if err := c.host.Connect(ctx, sp); err != nil && ctx.Err() == nil {
333-
c.logger.Warn().Str("peer", sp.ID.String()).Err(err).Msg("failed to reconnect to seed peer")
334-
}
366+
go func(info peer.AddrInfo) {
367+
if err := c.host.Connect(ctx, info); err != nil && ctx.Err() == nil {
368+
c.logger.Warn().Str("peer", info.ID.String()).Err(err).Msg("failed to reconnect to seed peer")
369+
}
370+
}(sp)
335371
break
336372
}
337373
case <-discoveryTicker.C:
@@ -361,7 +397,15 @@ func (c *Client) refreshPeerDiscovery(ctx context.Context) {
361397
if p.ID == c.host.ID() || c.isConnected(p.ID) {
362398
continue
363399
}
364-
go c.tryConnect(ctx, p)
400+
select {
401+
case c.connectSem <- struct{}{}:
402+
go func(peer peer.AddrInfo) {
403+
defer func() { <-c.connectSem }()
404+
c.tryConnect(ctx, peer)
405+
}(p)
406+
case <-ctx.Done():
407+
return
408+
}
365409
}
366410
}
367411

0 commit comments

Comments
 (0)