@@ -69,7 +69,6 @@ type daFollower struct {
6969 daBlockTime time.Duration
7070
7171 // lifecycle
72- ctx context.Context
7372 cancel context.CancelFunc
7473 wg sync.WaitGroup
7574}
@@ -108,11 +107,11 @@ func NewDAFollower(cfg DAFollowerConfig) DAFollower {
108107
109108// Start begins the follow and catchup goroutines.
110109func (f * daFollower ) Start (ctx context.Context ) error {
111- f . ctx , f .cancel = context .WithCancel (ctx )
110+ ctx , f .cancel = context .WithCancel (ctx )
112111
113112 f .wg .Add (2 )
114- go f .followLoop ()
115- go f .catchupLoop ()
113+ go f .followLoop (ctx )
114+ go f .catchupLoop (ctx )
116115
117116 f .logger .Info ().
118117 Uint64 ("start_da_height" , f .localDAHeight .Load ()).
@@ -144,20 +143,20 @@ func (f *daFollower) signalCatchup() {
144143
145144// followLoop subscribes to DA blob events and keeps highestSeenDAHeight up to date.
146145// When a new height appears above localDAHeight, it wakes the catchup loop.
147- func (f * daFollower ) followLoop () {
146+ func (f * daFollower ) followLoop (ctx context. Context ) {
148147 defer f .wg .Done ()
149148
150149 f .logger .Info ().Msg ("starting follow loop" )
151150 defer f .logger .Info ().Msg ("follow loop stopped" )
152151
153152 for {
154- if err := f .runSubscription (); err != nil {
155- if f . ctx .Err () != nil {
153+ if err := f .runSubscription (ctx ); err != nil {
154+ if ctx .Err () != nil {
156155 return
157156 }
158157 f .logger .Warn ().Err (err ).Msg ("DA subscription failed, reconnecting" )
159158 select {
160- case <- f . ctx .Done ():
159+ case <- ctx .Done ():
161160 return
162161 case <- time .After (f .backoff ()):
163162 }
@@ -169,9 +168,9 @@ func (f *daFollower) followLoop() {
169168// different) and processes events until a channel is closed or an error occurs.
170169// A watchdog timer triggers if no events arrive within watchdogTimeout(),
171170// causing a reconnect.
172- func (f * daFollower ) runSubscription () error {
171+ func (f * daFollower ) runSubscription (ctx context. Context ) error {
173172 // Sub-context ensures the merge goroutine is cancelled when this function returns.
174- subCtx , subCancel := context .WithCancel (f . ctx )
173+ subCtx , subCancel := context .WithCancel (ctx )
175174 defer subCancel ()
176175
177176 headerCh , err := f .client .Subscribe (subCtx , f .namespace )
@@ -201,7 +200,7 @@ func (f *daFollower) runSubscription() error {
201200 if ! ok {
202201 return errors .New ("subscription channel closed" )
203202 }
204- f .handleSubscriptionEvent (ev )
203+ f .handleSubscriptionEvent (ctx , ev )
205204 watchdog .Reset (watchdogTimeout )
206205 case <- watchdog .C :
207206 return errors .New ("subscription watchdog: no events received, reconnecting" )
@@ -251,17 +250,17 @@ func (f *daFollower) mergeSubscriptions(
251250//
252251// Uses CAS on localDAHeight to claim exclusive access to processBlobs,
253252// preventing concurrent map access with catchupLoop.
254- func (f * daFollower ) handleSubscriptionEvent (ev datypes.SubscriptionEvent ) {
253+ func (f * daFollower ) handleSubscriptionEvent (ctx context. Context , ev datypes.SubscriptionEvent ) {
255254 // Always record the highest height we've seen from the subscription.
256255 f .updateHighest (ev .Height )
257256
258257 // Fast path: try to claim this height for inline processing.
259258 // CAS(N, N+1) ensures only one goroutine (followLoop or catchupLoop)
260259 // can enter processBlobs for height N.
261260 if len (ev .Blobs ) > 0 && f .localDAHeight .CompareAndSwap (ev .Height , ev .Height + 1 ) {
262- events := f .retriever .ProcessBlobs (f . ctx , ev .Blobs , ev .Height )
261+ events := f .retriever .ProcessBlobs (ctx , ev .Blobs , ev .Height )
263262 for _ , event := range events {
264- if err := f .pipeEvent (f . ctx , event ); err != nil {
263+ if err := f .pipeEvent (ctx , event ); err != nil {
265264 // Roll back so catchupLoop can retry this height.
266265 f .localDAHeight .Store (ev .Height )
267266 f .logger .Warn ().Err (err ).Uint64 ("da_height" , ev .Height ).
@@ -299,27 +298,27 @@ func (f *daFollower) updateHighest(height uint64) {
299298
300299// catchupLoop waits for signals and sequentially retrieves DA heights
301300// from localDAHeight up to highestSeenDAHeight.
302- func (f * daFollower ) catchupLoop () {
301+ func (f * daFollower ) catchupLoop (ctx context. Context ) {
303302 defer f .wg .Done ()
304303
305304 f .logger .Info ().Msg ("starting catchup loop" )
306305 defer f .logger .Info ().Msg ("catchup loop stopped" )
307306
308307 for {
309308 select {
310- case <- f . ctx .Done ():
309+ case <- ctx .Done ():
311310 return
312311 case <- f .catchupSignal :
313- f .runCatchup ()
312+ f .runCatchup (ctx )
314313 }
315314 }
316315}
317316
318317// runCatchup sequentially retrieves from localDAHeight to highestSeenDAHeight.
319318// It handles priority heights first, then sequential heights.
320- func (f * daFollower ) runCatchup () {
319+ func (f * daFollower ) runCatchup (ctx context. Context ) {
321320 for {
322- if f . ctx .Err () != nil {
321+ if ctx .Err () != nil {
323322 return
324323 }
325324
@@ -332,8 +331,8 @@ func (f *daFollower) runCatchup() {
332331 f .logger .Debug ().
333332 Uint64 ("da_height" , priorityHeight ).
334333 Msg ("fetching priority DA height from P2P hint" )
335- if err := f .fetchAndPipeHeight (priorityHeight ); err != nil {
336- if ! f .waitOnCatchupError (err , priorityHeight ) {
334+ if err := f .fetchAndPipeHeight (ctx , priorityHeight ); err != nil {
335+ if ! f .waitOnCatchupError (ctx , err , priorityHeight ) {
337336 return
338337 }
339338 }
@@ -350,16 +349,16 @@ func (f *daFollower) runCatchup() {
350349 return
351350 }
352351
353- // CAS claims this height — prevents followLoop from inline-processing
352+ // CAS claims this height prevents followLoop from inline-processing
354353 if ! f .localDAHeight .CompareAndSwap (local , local + 1 ) {
355354 // followLoop already advanced past this height via inline processing.
356355 continue
357356 }
358357
359- if err := f .fetchAndPipeHeight (local ); err != nil {
358+ if err := f .fetchAndPipeHeight (ctx , local ); err != nil {
360359 // Roll back so we can retry after backoff.
361360 f .localDAHeight .Store (local )
362- if ! f .waitOnCatchupError (err , local ) {
361+ if ! f .waitOnCatchupError (ctx , err , local ) {
363362 return
364363 }
365364 continue
@@ -369,8 +368,8 @@ func (f *daFollower) runCatchup() {
369368
370369// fetchAndPipeHeight retrieves events at a single DA height and pipes them
371370// to the syncer.
372- func (f * daFollower ) fetchAndPipeHeight (daHeight uint64 ) error {
373- events , err := f .retriever .RetrieveFromDA (f . ctx , daHeight )
371+ func (f * daFollower ) fetchAndPipeHeight (ctx context. Context , daHeight uint64 ) error {
372+ events , err := f .retriever .RetrieveFromDA (ctx , daHeight )
374373 if err != nil {
375374 switch {
376375 case errors .Is (err , datypes .ErrBlobNotFound ):
@@ -387,7 +386,7 @@ func (f *daFollower) fetchAndPipeHeight(daHeight uint64) error {
387386 }
388387
389388 for _ , event := range events {
390- if err := f .pipeEvent (f . ctx , event ); err != nil {
389+ if err := f .pipeEvent (ctx , event ); err != nil {
391390 return err
392391 }
393392 }
@@ -401,17 +400,17 @@ var errCaughtUp = errors.New("caught up with DA head")
401400// waitOnCatchupError logs the error and backs off before retrying.
402401// It returns true if the caller should continue (retry), or false if the
403402// catchup loop should exit (context cancelled or caught-up sentinel).
404- func (f * daFollower ) waitOnCatchupError (err error , daHeight uint64 ) bool {
403+ func (f * daFollower ) waitOnCatchupError (ctx context. Context , err error , daHeight uint64 ) bool {
405404 if errors .Is (err , errCaughtUp ) {
406405 f .logger .Debug ().Uint64 ("da_height" , daHeight ).Msg ("DA catchup reached head, waiting for subscription signal" )
407406 return false
408407 }
409- if f . ctx .Err () != nil {
408+ if ctx .Err () != nil {
410409 return false
411410 }
412411 f .logger .Warn ().Err (err ).Uint64 ("da_height" , daHeight ).Msg ("catchup error, backing off" )
413412 select {
414- case <- f . ctx .Done ():
413+ case <- ctx .Done ():
415414 return false
416415 case <- time .After (f .backoff ()):
417416 return true
0 commit comments