Skip to content

Commit 6c1e630

Browse files
committed
Review feedback
1 parent a102b8e commit 6c1e630

8 files changed

Lines changed: 95 additions & 59 deletions

File tree

apps/evm/cmd/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ var RunCmd = &cobra.Command{
6060
return err
6161
}
6262

63-
blobClient, err := blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
63+
blobClient, err := blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
6464
if err != nil {
6565
return fmt.Errorf("failed to create blob client: %w", err)
6666
}

block/internal/syncing/da_follower.go

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
// - followLoop listens on the subscription channel. When caught up, it processes
2323
// subscription blobs inline (fast path, no DA re-fetch). Otherwise, it updates
2424
// highestSeenDAHeight and signals the catchup loop.
25-
// - catchupLoop sequentially retrieves from localDAHeight → highestSeenDAHeight,
25+
// - catchupLoop sequentially retrieves from localNextDAHeight → highestSeenDAHeight,
2626
// piping events to the Syncer's heightInCh.
2727
//
2828
// The two goroutines share only atomic state; no mutexes needed.
@@ -51,9 +51,9 @@ type daFollower struct {
5151
// share the same namespace). When different, we subscribe to both and merge.
5252
dataNamespace []byte
5353

54-
// localDAHeight is only written by catchupLoop and read by followLoop
54+
// localNextDAHeight is only written by catchupLoop and read by followLoop
5555
// to determine whether a catchup is needed.
56-
localDAHeight atomic.Uint64
56+
localNextDAHeight atomic.Uint64
5757

5858
// highestSeenDAHeight is written by followLoop and read by catchupLoop.
5959
highestSeenDAHeight atomic.Uint64
@@ -62,7 +62,7 @@ type daFollower struct {
6262
headReached atomic.Bool
6363

6464
// catchupSignal is sent by followLoop to wake catchupLoop when a new
65-
// height is seen that is above localDAHeight.
65+
// height is seen that is above localNextDAHeight.
6666
catchupSignal chan struct{}
6767

6868
// daBlockTime is used as a backoff before retrying after errors.
@@ -101,7 +101,7 @@ func NewDAFollower(cfg DAFollowerConfig) DAFollower {
101101
catchupSignal: make(chan struct{}, 1),
102102
daBlockTime: cfg.DABlockTime,
103103
}
104-
f.localDAHeight.Store(cfg.StartDAHeight)
104+
f.localNextDAHeight.Store(cfg.StartDAHeight)
105105
return f
106106
}
107107

@@ -114,7 +114,7 @@ func (f *daFollower) Start(ctx context.Context) error {
114114
go f.catchupLoop(ctx)
115115

116116
f.logger.Info().
117-
Uint64("start_da_height", f.localDAHeight.Load()).
117+
Uint64("start_da_height", f.localNextDAHeight.Load()).
118118
Msg("DA follower started")
119119
return nil
120120
}
@@ -142,7 +142,7 @@ func (f *daFollower) signalCatchup() {
142142
}
143143

144144
// followLoop subscribes to DA blob events and keeps highestSeenDAHeight up to date.
145-
// When a new height appears above localDAHeight, it wakes the catchup loop.
145+
// When a new height appears above localNextDAHeight, it wakes the catchup loop.
146146
func (f *daFollower) followLoop(ctx context.Context) {
147147
defer f.wg.Done()
148148

@@ -244,11 +244,11 @@ func (f *daFollower) mergeSubscriptions(
244244
}
245245

246246
// handleSubscriptionEvent processes a subscription event. When the follower is
247-
// caught up (ev.Height == localDAHeight) and blobs are available, it processes
247+
// caught up (ev.Height == localNextDAHeight) and blobs are available, it processes
248248
// them inline — avoiding a DA re-fetch round trip. Otherwise, it just updates
249249
// highestSeenDAHeight and lets catchupLoop handle retrieval.
250250
//
251-
// Uses CAS on localDAHeight to claim exclusive access to processBlobs,
251+
// Uses CAS on localNextDAHeight to claim exclusive access to processBlobs,
252252
// preventing concurrent map access with catchupLoop.
253253
func (f *daFollower) handleSubscriptionEvent(ctx context.Context, ev datypes.SubscriptionEvent) {
254254
// Always record the highest height we've seen from the subscription.
@@ -257,24 +257,26 @@ func (f *daFollower) handleSubscriptionEvent(ctx context.Context, ev datypes.Sub
257257
// Fast path: try to claim this height for inline processing.
258258
// CAS(N, N+1) ensures only one goroutine (followLoop or catchupLoop)
259259
// can enter processBlobs for height N.
260-
if len(ev.Blobs) > 0 && f.localDAHeight.CompareAndSwap(ev.Height, ev.Height+1) {
260+
if len(ev.Blobs) > 0 && f.localNextDAHeight.CompareAndSwap(ev.Height, ev.Height+1) {
261261
events := f.retriever.ProcessBlobs(ctx, ev.Blobs, ev.Height)
262262
for _, event := range events {
263263
if err := f.pipeEvent(ctx, event); err != nil {
264264
// Roll back so catchupLoop can retry this height.
265-
f.localDAHeight.Store(ev.Height)
265+
f.localNextDAHeight.Store(ev.Height)
266266
f.logger.Warn().Err(err).Uint64("da_height", ev.Height).
267267
Msg("failed to pipe inline event, catchup will retry")
268268
return
269269
}
270270
}
271271
if len(events) != 0 {
272-
f.headReached.Store(true)
272+
if !f.headReached.Load() && f.localNextDAHeight.Load() > f.highestSeenDAHeight.Load() {
273+
f.headReached.Store(true)
274+
}
273275
f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)).
274276
Msg("processed subscription blobs inline (fast path)")
275277
} else {
276278
// No complete events (split namespace, waiting for other half).
277-
f.localDAHeight.Store(ev.Height)
279+
f.localNextDAHeight.Store(ev.Height)
278280
}
279281
return
280282
}
@@ -297,7 +299,7 @@ func (f *daFollower) updateHighest(height uint64) {
297299
}
298300

299301
// catchupLoop waits for signals and sequentially retrieves DA heights
300-
// from localDAHeight up to highestSeenDAHeight.
302+
// from localNextDAHeight up to highestSeenDAHeight.
301303
func (f *daFollower) catchupLoop(ctx context.Context) {
302304
defer f.wg.Done()
303305

@@ -314,7 +316,7 @@ func (f *daFollower) catchupLoop(ctx context.Context) {
314316
}
315317
}
316318

317-
// runCatchup sequentially retrieves from localDAHeight to highestSeenDAHeight.
319+
// runCatchup sequentially retrieves from localNextDAHeight to highestSeenDAHeight.
318320
// It handles priority heights first, then sequential heights.
319321
func (f *daFollower) runCatchup(ctx context.Context) {
320322
for {
@@ -324,8 +326,8 @@ func (f *daFollower) runCatchup(ctx context.Context) {
324326

325327
// Check for priority heights from P2P hints first.
326328
if priorityHeight := f.retriever.PopPriorityHeight(); priorityHeight > 0 {
327-
currentHeight := f.localDAHeight.Load()
328-
if priorityHeight < currentHeight {
329+
nextHeight := f.localNextDAHeight.Load()
330+
if priorityHeight < nextHeight {
329331
continue
330332
}
331333
f.logger.Debug().
@@ -340,7 +342,7 @@ func (f *daFollower) runCatchup(ctx context.Context) {
340342
}
341343

342344
// Sequential catchup.
343-
local := f.localDAHeight.Load()
345+
local := f.localNextDAHeight.Load()
344346
highest := f.highestSeenDAHeight.Load()
345347

346348
if local > highest {
@@ -350,14 +352,14 @@ func (f *daFollower) runCatchup(ctx context.Context) {
350352
}
351353

352354
// Claim this height via CAS, which prevents followLoop from inline-processing it.
353-
if !f.localDAHeight.CompareAndSwap(local, local+1) {
355+
if !f.localNextDAHeight.CompareAndSwap(local, local+1) {
354356
// followLoop already advanced past this height via inline processing.
355357
continue
356358
}
357359

358360
if err := f.fetchAndPipeHeight(ctx, local); err != nil {
359361
// Roll back so we can retry after backoff.
360-
f.localDAHeight.Store(local)
362+
f.localNextDAHeight.Store(local)
361363
if !f.waitOnCatchupError(ctx, err, local) {
362364
return
363365
}

block/internal/syncing/da_retriever.go

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"slices"
99
"sync"
10+
"sync/atomic"
1011

1112
"github.com/rs/zerolog"
1213
"google.golang.org/protobuf/proto"
@@ -43,12 +44,13 @@ type daRetriever struct {
4344

4445
// transient cache; only full events need to be passed to the syncer
4546
// on restart, entries will be refetched as the DA height is updated by the syncer
47+
pendingMu sync.Mutex
4648
pendingHeaders map[uint64]*types.SignedHeader
4749
pendingData map[uint64]*types.Data
4850

4951
// strictMode indicates if the node has seen a valid DAHeaderEnvelope
5052
// and should now reject all legacy/unsigned headers.
51-
strictMode bool
53+
strictMode atomic.Bool
5254

5355
// priorityMu protects priorityHeights from concurrent access
5456
priorityMu sync.Mutex
@@ -71,7 +73,6 @@ func NewDARetriever(
7173
logger: logger.With().Str("component", "da_retriever").Logger(),
7274
pendingHeaders: make(map[uint64]*types.SignedHeader),
7375
pendingData: make(map[uint64]*types.Data),
74-
strictMode: false,
7576
priorityHeights: make([]uint64, 0),
7677
}
7778
}
@@ -198,41 +199,55 @@ func (r *daRetriever) validateBlobResponse(res datypes.ResultRetrieve, daHeight
198199
// This is the public interface used by the DAFollower for inline subscription processing.
199200
//
200201
// NOT thread-safe: the caller (DAFollower) must ensure exclusive access via CAS
201-
// on localDAHeight before calling this method.
202+
// on localNextDAHeight before calling this method.
202203
func (r *daRetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
203204
return r.processBlobs(ctx, blobs, daHeight)
204205
}
205206

206207
// processBlobs processes retrieved blobs to extract headers and data and returns height events
207208
func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
208-
// Decode all blobs
209+
var decodedHeaders []*types.SignedHeader
210+
var decodedData []*types.Data
211+
212+
// Decode all blobs first without holding the lock
209213
for _, bz := range blobs {
210214
if len(bz) == 0 {
211215
continue
212216
}
213217

214218
if header := r.tryDecodeHeader(bz, daHeight); header != nil {
215-
if _, ok := r.pendingHeaders[header.Height()]; ok {
216-
// a (malicious) node may have re-published valid header to another da height (should never happen)
217-
// we can already discard it, only the first one is valid
218-
r.logger.Debug().Uint64("height", header.Height()).Uint64("da_height", daHeight).Msg("header blob already exists for height, discarding")
219-
continue
220-
}
221-
222-
r.pendingHeaders[header.Height()] = header
219+
decodedHeaders = append(decodedHeaders, header)
223220
continue
224221
}
225222

226223
if data := r.tryDecodeData(bz, daHeight); data != nil {
227-
if _, ok := r.pendingData[data.Height()]; ok {
228-
// a (malicious) node may have re-published valid data to another da height (should never happen)
229-
// we can already discard it, only the first one is valid
230-
r.logger.Debug().Uint64("height", data.Height()).Uint64("da_height", daHeight).Msg("data blob already exists for height, discarding")
231-
continue
232-
}
224+
decodedData = append(decodedData, data)
225+
}
226+
}
233227

234-
r.pendingData[data.Height()] = data
228+
r.pendingMu.Lock()
229+
defer r.pendingMu.Unlock()
230+
231+
for _, header := range decodedHeaders {
232+
if _, ok := r.pendingHeaders[header.Height()]; ok {
233+
// a (malicious) node may have re-published valid header to another da height (should never happen)
234+
// we can already discard it, only the first one is valid
235+
r.logger.Debug().Uint64("height", header.Height()).Uint64("da_height", daHeight).Msg("header blob already exists for height, discarding")
236+
continue
237+
}
238+
239+
r.pendingHeaders[header.Height()] = header
240+
}
241+
242+
for _, data := range decodedData {
243+
if _, ok := r.pendingData[data.Height()]; ok {
244+
// a (malicious) node may have re-published valid data to another da height (should never happen)
245+
// we can already discard it, only the first one is valid
246+
r.logger.Debug().Uint64("height", data.Height()).Uint64("da_height", daHeight).Msg("data blob already exists for height, discarding")
247+
continue
235248
}
249+
250+
r.pendingData[data.Height()] = data
236251
}
237252

238253
var events []common.DAHeightEvent
@@ -294,7 +309,7 @@ func (r *daRetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedH
294309
// Attempt to unmarshal as DAHeaderEnvelope and get the envelope signature
295310
if envelopeSignature, err := header.UnmarshalDAEnvelope(bz); err != nil {
296311
// If in strict mode, we REQUIRE an envelope.
297-
if r.strictMode {
312+
if r.strictMode.Load() {
298313
r.logger.Warn().Err(err).Msg("strict mode is enabled, rejecting non-envelope blob")
299314
return nil
300315
}
@@ -328,14 +343,14 @@ func (r *daRetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedH
328343
isValidEnvelope = true
329344
}
330345
}
331-
if r.strictMode && !isValidEnvelope {
346+
if r.strictMode.Load() && !isValidEnvelope {
332347
r.logger.Warn().Msg("strict mode: rejecting block that is not a fully valid envelope")
333348
return nil
334349
}
335350
// Mode Switch Logic
336-
if isValidEnvelope && !r.strictMode {
351+
if isValidEnvelope && !r.strictMode.Load() {
337352
r.logger.Info().Uint64("height", header.Height()).Msg("valid DA envelope detected, switching to STRICT MODE")
338-
r.strictMode = true
353+
r.strictMode.Store(true)
339354
}
340355

341356
// Legacy blob support implies: strictMode == false AND (!isValidEnvelope).

block/internal/syncing/syncer.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ type Syncer struct {
7979
p2pHandler p2pHandler
8080
raftRetriever *raftRetriever
8181

82-
// DA follower (replaces the old polling daWorkerLoop)
8382
daFollower DAFollower
8483

8584
// Forced inclusion tracking
@@ -209,6 +208,8 @@ func (s *Syncer) Start(ctx context.Context) error {
209208
DABlockTime: s.config.DA.BlockTime.Duration,
210209
})
211210
if err := s.daFollower.Start(ctx); err != nil {
211+
s.cancel()
212+
s.wg.Wait()
212213
return fmt.Errorf("failed to start DA follower: %w", err)
213214
}
214215

@@ -394,8 +395,6 @@ func (s *Syncer) HasReachedDAHead() bool {
394395
return false
395396
}
396397

397-
// fetchDAUntilCaughtUp was removed — the DAFollower handles this concern.
398-
399398
// PendingCount returns the number of unprocessed height events in the pipeline.
400399
func (s *Syncer) PendingCount() int {
401400
return len(s.heightInCh)

block/internal/syncing/syncer_backoff_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ func TestDAFollower_CatchupThenReachHead(t *testing.T) {
255255
}
256256

257257
// TestDAFollower_InlineProcessing verifies the fast path: when the subscription
258-
// delivers blobs at the current localDAHeight, handleSubscriptionEvent processes
258+
// delivers blobs at the current localNextDAHeight, handleSubscriptionEvent processes
259259
// them inline via ProcessBlobs (not RetrieveFromDA).
260260
func TestDAFollower_InlineProcessing(t *testing.T) {
261261
t.Run("processes_blobs_inline_when_caught_up", func(t *testing.T) {
@@ -285,7 +285,7 @@ func TestDAFollower_InlineProcessing(t *testing.T) {
285285
daRetriever.On("ProcessBlobs", mock.Anything, blobs, uint64(10)).
286286
Return(expectedEvents).Once()
287287

288-
// Simulate subscription event at the current localDAHeight
288+
// Simulate subscription event at the current localNextDAHeight
289289
follower.handleSubscriptionEvent(t.Context(), datypes.SubscriptionEvent{
290290
Height: 10,
291291
Blobs: blobs,
@@ -294,7 +294,7 @@ func TestDAFollower_InlineProcessing(t *testing.T) {
294294
// Verify: ProcessBlobs was called, events were piped, height advanced
295295
require.Len(t, pipedEvents, 1, "should pipe 1 event from inline processing")
296296
assert.Equal(t, uint64(10), pipedEvents[0].DaHeight)
297-
assert.Equal(t, uint64(11), follower.localDAHeight.Load(), "localDAHeight should advance past processed height")
297+
assert.Equal(t, uint64(11), follower.localNextDAHeight.Load(), "localNextDAHeight should advance past processed height")
298298
assert.True(t, follower.HasReachedHead(), "should mark head as reached after inline processing")
299299
})
300300

@@ -324,7 +324,7 @@ func TestDAFollower_InlineProcessing(t *testing.T) {
324324

325325
// ProcessBlobs should NOT have been called
326326
daRetriever.AssertNotCalled(t, "ProcessBlobs", mock.Anything, mock.Anything, mock.Anything)
327-
assert.Equal(t, uint64(10), follower.localDAHeight.Load(), "localDAHeight should not change")
327+
assert.Equal(t, uint64(10), follower.localNextDAHeight.Load(), "localNextDAHeight should not change")
328328
assert.Equal(t, uint64(15), follower.highestSeenDAHeight.Load(), "highestSeen should be updated")
329329
})
330330

@@ -350,7 +350,7 @@ func TestDAFollower_InlineProcessing(t *testing.T) {
350350

351351
// ProcessBlobs should NOT have been called
352352
daRetriever.AssertNotCalled(t, "ProcessBlobs", mock.Anything, mock.Anything, mock.Anything)
353-
assert.Equal(t, uint64(10), follower.localDAHeight.Load(), "localDAHeight should not change")
353+
assert.Equal(t, uint64(10), follower.localNextDAHeight.Load(), "localNextDAHeight should not change")
354354
assert.Equal(t, uint64(10), follower.highestSeenDAHeight.Load(), "highestSeen should be updated")
355355
})
356356
}

0 commit comments

Comments
 (0)