Skip to content

Commit 9825d5e

Browse files
committed
add p2p logs to help debug issue
1 parent 889da9a commit 9825d5e

2 files changed

Lines changed: 133 additions & 1 deletion

File tree

block/internal/syncing/syncer.go

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ const (
4242
fullnessThreshold = 0.8
4343
)
4444

45+
var p2pWaitLogInterval = 5 * time.Second
46+
4547
// Syncer handles block synchronization from DA and P2P sources.
4648
type Syncer struct {
4749
// Core components
@@ -476,7 +478,7 @@ func (s *Syncer) p2pWorkerLoop(ctx context.Context) {
476478
waitCtx, cancel := context.WithCancel(ctx)
477479
s.setP2PWaitState(targetHeight, cancel)
478480

479-
err = s.p2pHandler.ProcessHeight(waitCtx, targetHeight, s.heightInCh)
481+
err = s.processP2PHeight(waitCtx, targetHeight, logger)
480482
s.cancelP2PWait(targetHeight)
481483

482484
if err != nil {
@@ -503,6 +505,54 @@ func (s *Syncer) p2pWorkerLoop(ctx context.Context) {
503505
}
504506
}
505507

508+
func (s *Syncer) processP2PHeight(ctx context.Context, targetHeight uint64, logger zerolog.Logger) error {
509+
doneCh := make(chan error, 1)
510+
startedAt := time.Now()
511+
loggedWait := false
512+
513+
go func() {
514+
doneCh <- s.p2pHandler.ProcessHeight(ctx, targetHeight, s.heightInCh)
515+
}()
516+
517+
ticker := time.NewTicker(p2pWaitLogInterval)
518+
defer ticker.Stop()
519+
520+
for {
521+
select {
522+
case err := <-doneCh:
523+
if loggedWait && err == nil {
524+
logger.Info().
525+
Uint64("height", targetHeight).
526+
Dur("waited", time.Since(startedAt)).
527+
Msg("P2P height became available")
528+
}
529+
return err
530+
case <-ctx.Done():
531+
return ctx.Err()
532+
case <-ticker.C:
533+
loggedWait = true
534+
535+
event := logger.Warn().
536+
Uint64("target_height", targetHeight).
537+
Uint64("header_store_height", storeHeightOf(s.headerStore)).
538+
Uint64("data_store_height", storeHeightOf(s.dataStore)).
539+
Int("height_events_buffered", len(s.heightInCh)).
540+
Int64("in_flight_events", s.inFlight.Load()).
541+
Int("pending_events", s.cache.PendingEventsCount()).
542+
Int("total_pending", s.PendingCount()).
543+
Dur("waited", time.Since(startedAt))
544+
545+
if localHeight, err := s.store.Height(ctx); err != nil {
546+
event = event.Str("local_height_err", err.Error())
547+
} else {
548+
event = event.Uint64("local_height", localHeight)
549+
}
550+
551+
event.Msg("waiting for P2P height to become available")
552+
}
553+
}
554+
}
555+
506556
func (s *Syncer) waitForGenesis() bool {
507557
if delay := time.Until(s.genesis.StartTime); delay > 0 {
508558
timer := time.NewTimer(delay)
@@ -1110,6 +1160,17 @@ func (s *Syncer) sleepOrDone(ctx context.Context, duration time.Duration) bool {
11101160
}
11111161
}
11121162

1163+
type storeHeightReader interface {
1164+
Height() uint64
1165+
}
1166+
1167+
func storeHeightOf(store storeHeightReader) uint64 {
1168+
if store == nil {
1169+
return 0
1170+
}
1171+
return store.Height()
1172+
}
1173+
11131174
type p2pWaitState struct {
11141175
height uint64
11151176
cancel context.CancelFunc

block/internal/syncing/syncer_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package syncing
22

33
import (
4+
"bytes"
45
"context"
56
crand "crypto/rand"
67
"crypto/sha512"
78
"errors"
89
"math"
10+
"strings"
911
"sync/atomic"
1012
"testing"
1113
"testing/synctest"
@@ -34,6 +36,16 @@ import (
3436
"github.com/evstack/ev-node/types"
3537
)
3638

39+
type stubP2PHandler struct {
40+
processHeight func(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error
41+
}
42+
43+
func (s *stubP2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error {
44+
return s.processHeight(ctx, height, heightInCh)
45+
}
46+
47+
func (s *stubP2PHandler) SetProcessedHeight(uint64) {}
48+
3749
// helper to create a signer, pubkey and address for tests
3850
func buildSyncTestSigner(tb testing.TB) (addr []byte, pub crypto.PubKey, signer signerpkg.Signer) {
3951
tb.Helper()
@@ -165,6 +177,65 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) {
165177
require.Error(t, err)
166178
}
167179

180+
func TestSyncer_processP2PHeight_LogsWhenBlocked(t *testing.T) {
181+
ds := dssync.MutexWrap(datastore.NewMapDatastore())
182+
st := store.New(ds)
183+
184+
batch, err := st.NewBatch(t.Context())
185+
require.NoError(t, err)
186+
require.NoError(t, batch.SetHeight(7))
187+
require.NoError(t, batch.Commit())
188+
189+
cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
190+
require.NoError(t, err)
191+
cm.SetPendingEvent(10, &common.DAHeightEvent{})
192+
193+
headerStore := extmocks.NewMockStore[*types.P2PSignedHeader](t)
194+
headerStore.EXPECT().Height().Return(uint64(11)).Maybe()
195+
dataStore := extmocks.NewMockStore[*types.P2PData](t)
196+
dataStore.EXPECT().Height().Return(uint64(12)).Maybe()
197+
198+
var logBuf bytes.Buffer
199+
logger := zerolog.New(&logBuf)
200+
201+
originalInterval := p2pWaitLogInterval
202+
p2pWaitLogInterval = 10 * time.Millisecond
203+
t.Cleanup(func() {
204+
p2pWaitLogInterval = originalInterval
205+
})
206+
207+
s := &Syncer{
208+
store: st,
209+
cache: cm,
210+
headerStore: headerStore,
211+
dataStore: dataStore,
212+
heightInCh: make(chan common.DAHeightEvent, 2),
213+
logger: logger,
214+
p2pHandler: &stubP2PHandler{
215+
processHeight: func(ctx context.Context, _ uint64, _ chan<- common.DAHeightEvent) error {
216+
select {
217+
case <-time.After(35 * time.Millisecond):
218+
return nil
219+
case <-ctx.Done():
220+
return ctx.Err()
221+
}
222+
},
223+
},
224+
}
225+
226+
err = s.processP2PHeight(t.Context(), 8, logger)
227+
require.NoError(t, err)
228+
229+
logs := logBuf.String()
230+
require.Contains(t, logs, "waiting for P2P height to become available")
231+
require.Contains(t, logs, "\"target_height\":8")
232+
require.Contains(t, logs, "\"local_height\":7")
233+
require.Contains(t, logs, "\"header_store_height\":11")
234+
require.Contains(t, logs, "\"data_store_height\":12")
235+
require.Contains(t, logs, "\"pending_events\":1")
236+
require.True(t, strings.Contains(logs, "P2P height became available"))
237+
}
238+
168239
func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) {
169240
ds := dssync.MutexWrap(datastore.NewMapDatastore())
170241
st := store.New(ds)

0 commit comments

Comments
 (0)