Skip to content

Commit b980434

Browse files
Merge pull request #4510 from OffchainLabs/pmikolajczyk/nit-4666-ordering-stop-wait
Fix `StopWaiter` lifecycle: ordering and context propagation
2 parents 5dbc0cb + fd021ac commit b980434

12 files changed

Lines changed: 38 additions & 28 deletions

File tree

arbnode/batch_poster.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2071,9 +2071,9 @@ func (b *BatchPoster) GetBacklogEstimate() uint64 {
20712071
}
20722072

20732073
func (b *BatchPoster) Start(ctxIn context.Context) {
2074-
b.dataPoster.Start(ctxIn)
2075-
b.redisLock.Start(ctxIn)
20762074
b.StopWaiter.Start(ctxIn, b)
2075+
b.dataPoster.Start(b.GetContext())
2076+
b.redisLock.Start(b.GetContext())
20772077
b.LaunchThread(b.pollForReverts)
20782078
b.LaunchThread(b.pollForL1PriceData)
20792079
commonEphemeralErrorHandler := util.NewEphemeralErrorHandler(time.Minute, "", 0)
@@ -2157,9 +2157,9 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
21572157
}
21582158

21592159
func (b *BatchPoster) StopAndWait() {
2160-
b.StopWaiter.StopAndWait()
2161-
b.dataPoster.StopAndWait()
21622160
b.redisLock.StopAndWait()
2161+
b.dataPoster.StopAndWait()
2162+
b.StopWaiter.StopAndWait()
21632163
}
21642164

21652165
type BoolRing struct {

changelog/pmikolajczyk-nit-4666.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
### Fixed
2+
- Fix StopWaiter lifecycle ordering: stop children before parent in StopAndWait, and pass managed context to children in Start

cmd/el-proxy/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,12 +137,12 @@ func NewExpressLaneProxy(
137137

138138
func (p *ExpressLaneProxy) Start(ctx context.Context) {
139139
p.StopWaiter.Start(ctx, p)
140-
p.expressLaneTracker.Start(ctx)
140+
p.expressLaneTracker.Start(p.GetContext())
141141
}
142142

143143
func (p *ExpressLaneProxy) StopAndWait() {
144-
p.StopWaiter.StopAndWait()
145144
p.expressLaneTracker.StopAndWait()
145+
p.StopWaiter.StopAndWait()
146146
}
147147

148148
var ErrorInternalConnectionError = errors.New("internal connection error")

execution/gethexec/addressfilter/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (s *FilterService) Start(ctx context.Context) {
8484
return s.config.PollInterval
8585
})
8686

87-
s.addressChecker.Start(ctx)
87+
s.addressChecker.Start(s.GetContext())
8888

8989
log.Info("address-filter service started",
9090
"poll_interval", s.config.PollInterval,

execution/gethexec/express_lane_service.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,18 +123,20 @@ func (es *expressLaneService) Start(ctxIn context.Context) {
123123
es.StopWaiter.Start(ctxIn, es)
124124

125125
if es.redisCoordinator != nil {
126-
es.redisCoordinator.Start(ctxIn)
126+
es.redisCoordinator.Start(es.GetContext())
127127
}
128128
}
129129

130130
func (es *expressLaneService) StopAndWait() {
131-
es.StopWaiter.StopAndWait()
132131
if es.redisCoordinator != nil {
133132
es.redisCoordinator.StopAndWait()
134133
}
134+
// tracker is started by ExecutionNode, not by expressLaneService,
135+
// but stopped here because no one else does it.
135136
if es.tracker != nil {
136137
es.tracker.StopAndWait()
137138
}
139+
es.StopWaiter.StopAndWait()
138140
}
139141

140142
// DontCareSequence is a special sequence number that indicates a transaction should bypass the

execution/gethexec/sequencer.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1864,13 +1864,15 @@ func (s TxSource) String() string {
18641864
}
18651865

18661866
func (s *Sequencer) StopAndWait() {
1867-
s.StopWaiter.StopAndWait()
1868-
if s.addressFilterService != nil {
1869-
s.addressFilterService.StopAndWait()
1870-
}
1867+
// expressLaneService is started by ExecutionNode via StartExpressLaneService,
1868+
// but stopped here because the Sequencer owns it.
18711869
if s.config().Timeboost.Enable && s.expressLaneService != nil {
18721870
s.expressLaneService.StopAndWait()
18731871
}
1872+
if s.addressFilterService != nil {
1873+
s.addressFilterService.StopAndWait()
1874+
}
1875+
s.StopWaiter.StopAndWait()
18741876
if s.txRetryQueue.Len() == 0 &&
18751877
len(s.txQueue) == 0 &&
18761878
s.nonceFailures.Len() == 0 &&

relay/relay.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,12 @@ func (r *Relay) Start(ctx context.Context) error {
8181
if err != nil {
8282
return errors.New("broadcast unable to initialize")
8383
}
84-
err = r.broadcaster.Start(ctx)
84+
err = r.broadcaster.Start(r.GetContext())
8585
if err != nil {
8686
return errors.New("broadcast unable to start")
8787
}
8888

89-
r.broadcastClients.Start(ctx)
89+
r.broadcastClients.Start(r.GetContext())
9090

9191
r.LaunchThread(func(ctx context.Context) {
9292
for {
@@ -112,9 +112,9 @@ func (r *Relay) GetListenerAddr() net.Addr {
112112
}
113113

114114
func (r *Relay) StopAndWait() {
115-
r.StopWaiter.StopAndWait()
116115
r.broadcastClients.StopAndWait()
117116
r.broadcaster.StopAndWait()
117+
r.StopWaiter.StopAndWait()
118118
}
119119

120120
type Config struct {

staker/legacy/staker.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,10 +561,11 @@ func (s *Staker) getLatestStakedState(ctx context.Context, stakerAddress common.
561561
}
562562

563563
func (s *Staker) StopAndWait() {
564-
s.StopWaiter.StopAndWait()
564+
// wallet is started externally, but stopped here because the Staker owns it.
565565
if s.Strategy() != WatchtowerStrategy {
566566
s.wallet.StopAndWait()
567567
}
568+
s.StopWaiter.StopAndWait()
568569
}
569570

570571
func (s *Staker) Start(ctxIn context.Context) {

timeboost/auctioneer.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -445,15 +445,15 @@ func (a *AuctioneerServer) Start(ctx_in context.Context) {
445445
a.StopWaiter.Start(ctx_in, a)
446446
// Start S3 storage service to persist validated bids to s3
447447
if a.s3StorageService != nil {
448-
a.s3StorageService.Start(ctx_in)
448+
a.s3StorageService.Start(a.GetContext())
449449
}
450450

451451
// Start coordination to manage primary/secondary status
452452
a.StopWaiter.CallIteratively(a.updateCoordination)
453453

454454
// Channel that consumer uses to indicate its readiness.
455455
readyStream := make(chan struct{}, 1)
456-
a.consumer.Start(ctx_in)
456+
a.consumer.Start(a.GetContext())
457457
// Channel for single consumer, once readiness is indicated in this,
458458
// consumer will start consuming iteratively.
459459
ready := make(chan struct{}, 1)
@@ -795,6 +795,9 @@ func (a *AuctioneerServer) StopAndWait() {
795795
// auctioneerLivenessTimeout. This timeout gives time for existing messages to become
796796
// unclaimed after IdleTimeToAutoclaim before the secondary auctioneer starts consuming
797797
// messages.
798-
a.StopWaiter.StopAndWait()
799798
a.consumer.StopAndWait()
799+
if a.s3StorageService != nil {
800+
a.s3StorageService.StopAndWait()
801+
}
802+
a.StopWaiter.StopAndWait()
800803
}

timeboost/bid_validator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func (bv *BidValidator) Start(ctx_in context.Context) {
226226
if bv.producer == nil {
227227
log.Crit("Bid validator not yet initialized by calling Initialize(ctx)")
228228
}
229-
bv.producer.Start(ctx_in)
229+
bv.producer.Start(bv.GetContext())
230230

231231
// Thread to set reserve price and clear per-round map of bid count per account.
232232
bv.StopWaiter.LaunchThread(func(ctx context.Context) {

0 commit comments

Comments
 (0)