Skip to content

Commit bc9330f

Browse files
committed
Retry getBlockTimestamp
(cherry picked from commit a6573fe6c40082380759f55018a47c8da5bf97bd)
1 parent eabde1a commit bc9330f

2 files changed

Lines changed: 122 additions & 10 deletions

File tree

block/internal/da/client.go

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ type client struct {
4242
// Ensure client implements the FullClient interface (Client + BlobGetter + Verifier).
4343
var _ FullClient = (*client)(nil)
4444

45+
const (
46+
blockTimestampFetchMaxAttempts = 3
47+
blockTimestampFetchBackoff = 100 * time.Millisecond
48+
)
49+
4550
// NewClient creates a new blob client wrapper with pre-calculated namespace bytes.
4651
func NewClient(cfg Config) FullClient {
4752
if cfg.DA == nil {
@@ -184,15 +189,40 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace
184189

185190
// getBlockTimestamp fetches the block timestamp from the DA layer header.
186191
func (c *client) getBlockTimestamp(ctx context.Context, height uint64) (time.Time, error) {
187-
headerCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
188-
defer cancel()
192+
var lastErr error
193+
backoff := blockTimestampFetchBackoff
194+
195+
for attempt := 1; attempt <= blockTimestampFetchMaxAttempts; attempt++ {
196+
headerCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
197+
header, err := c.headerAPI.GetByHeight(headerCtx, height)
198+
cancel()
199+
if err == nil {
200+
return header.Time(), nil
201+
}
202+
lastErr = err
189203

190-
header, err := c.headerAPI.GetByHeight(headerCtx, height)
191-
if err != nil {
192-
return time.Time{}, fmt.Errorf("failed to get header timestamp for block %d: %w", height, err)
204+
if attempt == blockTimestampFetchMaxAttempts {
205+
break
206+
}
207+
208+
c.logger.Info().
209+
Uint64("height", height).
210+
Int("attempt", attempt).
211+
Int("max_attempts", blockTimestampFetchMaxAttempts).
212+
Dur("retry_in", backoff).
213+
Err(err).
214+
Msg("failed to get block timestamp, retrying")
215+
216+
select {
217+
case <-ctx.Done():
218+
return time.Time{}, fmt.Errorf("fetching header timestamp for block %d: %w", height, ctx.Err())
219+
case <-time.After(backoff):
220+
}
221+
222+
backoff *= 2
193223
}
194224

195-
return header.Time(), nil
225+
return time.Time{}, fmt.Errorf("get header timestamp for block %d after %d attempts: %w", height, blockTimestampFetchMaxAttempts, lastErr)
196226
}
197227

198228
// Retrieve retrieves blobs from the DA layer at the specified height and namespace.
@@ -224,8 +254,13 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
224254
blockTime, err := c.getBlockTimestamp(ctx, height)
225255
if err != nil {
226256
c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp")
227-
blockTime = time.Now()
228-
// TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers.
257+
return datypes.ResultRetrieve{
258+
BaseResult: datypes.BaseResult{
259+
Code: datypes.StatusError,
260+
Message: fmt.Sprintf("failed to get block timestamp: %s", err.Error()),
261+
Height: height,
262+
},
263+
}
229264
}
230265

231266
return datypes.ResultRetrieve{
@@ -262,8 +297,13 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
262297
blockTime, err := c.getBlockTimestamp(ctx, height)
263298
if err != nil {
264299
c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp")
265-
blockTime = time.Now()
266-
// TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers.
300+
return datypes.ResultRetrieve{
301+
BaseResult: datypes.BaseResult{
302+
Code: datypes.StatusError,
303+
Message: fmt.Sprintf("failed to get block timestamp: %s", err.Error()),
304+
Height: height,
305+
},
306+
}
267307
}
268308

269309
if len(blobs) == 0 {

block/internal/da/client_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,78 @@ func TestClient_Retrieve_Success(t *testing.T) {
156156
require.Equal(t, expectedTime, res.Timestamp)
157157
}
158158

159+
func TestClient_Retrieve_TimestampFetchRetry(t *testing.T) {
160+
ns := share.MustNewV0Namespace([]byte("ns"))
161+
nsBz := ns.Bytes()
162+
fixedTime := time.Date(2024, 1, 2, 12, 0, 0, 0, time.UTC)
163+
164+
specs := map[string]struct {
165+
getAllErr error
166+
headerFailures int
167+
wantStatus datypes.StatusCode
168+
wantTimestamp time.Time
169+
wantMessageSubstr string
170+
wantHeaderCalls int
171+
}{
172+
"success_retries_timestamp_fetch": {
173+
getAllErr: nil,
174+
headerFailures: 2,
175+
wantStatus: datypes.StatusSuccess,
176+
wantTimestamp: fixedTime,
177+
wantHeaderCalls: 3,
178+
},
179+
"not_found_fails_hard_when_timestamp_unavailable": {
180+
getAllErr: datypes.ErrBlobNotFound,
181+
headerFailures: blockTimestampFetchMaxAttempts,
182+
wantStatus: datypes.StatusError,
183+
wantMessageSubstr: "failed to get block timestamp",
184+
wantHeaderCalls: blockTimestampFetchMaxAttempts,
185+
},
186+
}
187+
188+
for name, spec := range specs {
189+
t.Run(name, func(t *testing.T) {
190+
blobModule := mocks.NewMockBlobModule(t)
191+
headerModule := mocks.NewMockHeaderModule(t)
192+
193+
if spec.getAllErr != nil {
194+
blobModule.On("GetAll", mock.Anything, uint64(7), mock.Anything).Return([]*blobrpc.Blob(nil), spec.getAllErr).Once()
195+
} else {
196+
b, err := blobrpc.NewBlobV0(ns, []byte("payload"))
197+
require.NoError(t, err)
198+
blobModule.On("GetAll", mock.Anything, uint64(7), mock.Anything).Return([]*blobrpc.Blob{b}, nil).Once()
199+
}
200+
201+
headerCalls := 0
202+
headerModule.EXPECT().GetByHeight(mock.Anything, uint64(7)).RunAndReturn(func(context.Context, uint64) (*blobrpc.Header, error) {
203+
headerCalls++
204+
if headerCalls <= spec.headerFailures {
205+
return nil, errors.New("header unavailable")
206+
}
207+
return &blobrpc.Header{Header: blobrpc.RawHeader{Time: fixedTime}}, nil
208+
})
209+
210+
cl := NewClient(Config{
211+
DA: makeBlobRPCClient(blobModule, headerModule),
212+
Logger: zerolog.Nop(),
213+
Namespace: "ns",
214+
DataNamespace: "ns",
215+
})
216+
217+
res := cl.Retrieve(context.Background(), 7, nsBz)
218+
require.Equal(t, spec.wantStatus, res.Code)
219+
require.Equal(t, spec.wantHeaderCalls, headerCalls)
220+
221+
if !spec.wantTimestamp.IsZero() {
222+
require.Equal(t, spec.wantTimestamp, res.Timestamp)
223+
}
224+
if spec.wantMessageSubstr != "" {
225+
require.Contains(t, res.Message, spec.wantMessageSubstr)
226+
}
227+
})
228+
}
229+
}
230+
159231
func TestClient_SubmitOptionsMerge(t *testing.T) {
160232
ns := share.MustNewV0Namespace([]byte("ns")).Bytes()
161233
blobModule := mocks.NewMockBlobModule(t)

0 commit comments

Comments
 (0)