-
Notifications
You must be signed in to change notification settings - Fork 260
Expand file tree
/
Copy pathda_retriever.go
More file actions
384 lines (328 loc) · 12.8 KB
/
da_retriever.go
File metadata and controls
384 lines (328 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
package syncing
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"github.com/rs/zerolog"
"google.golang.org/protobuf/proto"
"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/block/internal/da"
datypes "github.com/evstack/ev-node/pkg/da/types"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/types"
pb "github.com/evstack/ev-node/types/pb/evnode/v1"
)
// DARetriever defines the interface for retrieving events from the DA layer.
type DARetriever interface {
	// RetrieveFromDA retrieves blocks from the specified DA height and returns height events.
	// It fetches blobs from the DA layer and decodes them into header/data pairs.
	RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)

	// ProcessBlobs parses raw blob bytes at a given DA height into height events.
	// Used by the DAFollower to process subscription blobs inline without re-fetching.
	ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent
}
// daRetriever handles DA retrieval operations for syncing.
type daRetriever struct {
	client  da.Client          // DA layer client used to fetch blobs per namespace
	cache   cache.CacheManager // records DA-inclusion of decoded headers and data
	genesis genesis.Genesis    // chain genesis, used for proposer/signature validation
	logger  zerolog.Logger

	// mu guards pendingHeaders, pendingData and strictMode; processBlobs may be
	// called from both RetrieveFromDA and the DAFollower's ProcessBlobs path.
	mu sync.Mutex

	// transient cache, only full event need to be passed to the syncer
	// on restart, will be refetch as da height is updated by syncer
	pendingHeaders map[uint64]*types.SignedHeader // block height -> header awaiting its data
	pendingData    map[uint64]*types.Data         // block height -> data awaiting its header

	// strictMode indicates if the node has seen a valid DAHeaderEnvelope
	// and should now reject all legacy/unsigned headers.
	strictMode bool
}
// NewDARetriever creates a new DA retriever wired to the given DA client,
// cache manager and genesis configuration. The returned retriever starts in
// non-strict mode with empty pending header/data maps.
func NewDARetriever(
	client da.Client,
	cache cache.CacheManager,
	genesis genesis.Genesis,
	logger zerolog.Logger,
) *daRetriever {
	retriever := &daRetriever{
		client:  client,
		cache:   cache,
		genesis: genesis,
		// Tag every log line with the originating component.
		logger: logger.With().Str("component", "da_retriever").Logger(),
	}
	retriever.pendingHeaders = make(map[uint64]*types.SignedHeader)
	retriever.pendingData = make(map[uint64]*types.Data)
	retriever.strictMode = false
	return retriever
}
// RetrieveFromDA retrieves blocks from the specified DA height and returns height events.
// It fetches blobs from both namespaces, honors context cancellation, and decodes
// the resulting blobs into header/data height events.
func (r *daRetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
	r.logger.Debug().Uint64("da_height", daHeight).Msg("retrieving from DA")

	blobsResp, fetchErr := r.fetchBlobs(ctx, daHeight)
	if fetchErr != nil {
		return nil, fetchErr
	}

	// Bail out before decoding if the caller has already given up.
	if ctxErr := ctx.Err(); ctxErr != nil {
		return nil, ctxErr
	}

	r.logger.Debug().Int("blobs", len(blobsResp.Data)).Uint64("da_height", daHeight).Msg("retrieved blob data")
	return r.processBlobs(ctx, blobsResp.Data, daHeight), nil
}
// fetchBlobs retrieves blobs from both header and data namespaces and merges the
// results. A not-found from one namespace is tolerated as long as the other has
// blobs; only when both are empty is ErrBlobNotFound surfaced to the caller.
func (r *daRetriever) fetchBlobs(ctx context.Context, daHeight uint64) (datypes.ResultRetrieve, error) {
	headerNS := r.client.GetHeaderNamespace()
	dataNS := r.client.GetDataNamespace()

	headerRes := r.client.RetrieveBlobs(ctx, daHeight, headerNS)

	// Single shared namespace: one retrieval covers both headers and data.
	if bytes.Equal(headerNS, dataNS) {
		return headerRes, r.validateBlobResponse(headerRes, daHeight)
	}

	dataRes := r.client.RetrieveBlobs(ctx, daHeight, dataNS)

	// A missing header blob is fine if the data namespace still has content.
	if headerErr := r.validateBlobResponse(headerRes, daHeight); headerErr != nil && !errors.Is(headerErr, datypes.ErrBlobNotFound) {
		return headerRes, headerErr
	}
	// Symmetrically, missing data is fine if the header namespace has content.
	if dataErr := r.validateBlobResponse(dataRes, daHeight); dataErr != nil && !errors.Is(dataErr, datypes.ErrBlobNotFound) {
		return dataRes, dataErr
	}

	// Merge whatever each namespace successfully returned.
	merged := datypes.ResultRetrieve{
		BaseResult: datypes.BaseResult{
			Code:   datypes.StatusSuccess,
			Height: daHeight,
		},
		Data: make([][]byte, 0),
	}
	for _, res := range []datypes.ResultRetrieve{headerRes, dataRes} {
		if res.Code == datypes.StatusSuccess {
			merged.Data = append(merged.Data, res.Data...)
			merged.IDs = append(merged.IDs, res.IDs...)
		}
	}

	// Both namespaces came back empty: report not-found to the caller.
	if len(merged.Data) == 0 && len(merged.IDs) == 0 {
		r.logger.Debug().Uint64("da_height", daHeight).Msg("no blob data found")
		merged.Code = datypes.StatusNotFound
		merged.Message = datypes.ErrBlobNotFound.Error()
		return merged, datypes.ErrBlobNotFound
	}

	return merged, nil
}
// validateBlobResponse validates a blob response from DA layer, translating the
// status code into a sentinel-wrapped error where applicable.
// those are the only error code returned by da.RetrieveWithHelpers
func (r *daRetriever) validateBlobResponse(res datypes.ResultRetrieve, daHeight uint64) error {
	switch res.Code {
	case datypes.StatusSuccess:
		r.logger.Debug().Uint64("da_height", daHeight).Msg("successfully retrieved from DA")
	case datypes.StatusError:
		return fmt.Errorf("DA retrieval failed: %s", res.Message)
	case datypes.StatusHeightFromFuture:
		// Wrapped so callers can match with errors.Is.
		return fmt.Errorf("%w: height from future", datypes.ErrHeightFromFuture)
	case datypes.StatusNotFound:
		return fmt.Errorf("%w: blob not found", datypes.ErrBlobNotFound)
	}
	// Unknown codes are treated as non-errors, matching the documented set above.
	return nil
}
// ProcessBlobs processes raw blob bytes to extract headers and data and returns height events.
// This is the public interface used by the DAFollower for inline subscription processing.
// It delegates to processBlobs, which serializes access via the retriever's mutex.
func (r *daRetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
	return r.processBlobs(ctx, blobs, daHeight)
}
// processBlobs processes retrieved blobs to extract headers and data and returns height events.
//
// Each blob is first tried as a signed header, then as signed data. Decoded items
// are staged in pendingHeaders/pendingData keyed by block height; a height event is
// emitted only once a header has matching data (or empty data is expected). Headers
// without data remain pending across calls until the data arrives at a later DA height.
//
// NOTE(review): events carry the DA height of the CURRENT call, even when the header
// was staged during an earlier call — presumably acceptable downstream; confirm with
// the syncer's DA-height bookkeeping.
func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
	// Serialize access to the pending maps and strictMode.
	r.mu.Lock()
	defer r.mu.Unlock()
	// Decode all blobs
	for _, bz := range blobs {
		if len(bz) == 0 {
			continue
		}
		if header := r.tryDecodeHeader(bz, daHeight); header != nil {
			if _, ok := r.pendingHeaders[header.Height()]; ok {
				// a (malicious) node may have re-published valid header to another da height (should never happen)
				// we can already discard it, only the first one is valid
				r.logger.Debug().Uint64("height", header.Height()).Uint64("da_height", daHeight).Msg("header blob already exists for height, discarding")
				continue
			}
			r.pendingHeaders[header.Height()] = header
			continue
		}
		if data := r.tryDecodeData(bz, daHeight); data != nil {
			if _, ok := r.pendingData[data.Height()]; ok {
				// a (malicious) node may have re-published valid data to another da height (should never happen)
				// we can already discard it, only the first one is valid
				r.logger.Debug().Uint64("height", data.Height()).Uint64("da_height", daHeight).Msg("data blob already exists for height, discarding")
				continue
			}
			r.pendingData[data.Height()] = data
		}
	}
	var events []common.DAHeightEvent
	// Match headers with data and create events.
	// Deleting map entries while ranging is safe in Go; matched pairs are
	// consumed so they are not re-emitted on later calls.
	for height, header := range r.pendingHeaders {
		data := r.pendingData[height]
		// Handle empty data case
		if data == nil {
			if isEmptyDataExpected(header) {
				// Synthesize an empty Data so the event is complete.
				data = createEmptyDataForHeader(ctx, header)
				delete(r.pendingHeaders, height)
			} else {
				// keep header in pending headers until data lands
				r.logger.Debug().Uint64("height", height).Msg("header found but no matching data")
				continue
			}
		} else {
			delete(r.pendingHeaders, height)
			delete(r.pendingData, height)
		}
		// Create height event
		event := common.DAHeightEvent{
			Header:   header,
			Data:     data,
			DaHeight: daHeight,
			Source:   common.SourceDA,
		}
		events = append(events, event)
	}
	if len(events) > 0 {
		// Compute the min/max block heights purely for the summary log line.
		startHeight := events[0].Header.Height()
		endHeight := events[0].Header.Height()
		for _, event := range events {
			h := event.Header.Height()
			if h < startHeight {
				startHeight = h
			}
			if h > endHeight {
				endHeight = h
			}
		}
		r.logger.Info().Uint64("da_height", daHeight).Uint64("start_height", startHeight).Uint64("end_height", endHeight).Msg("processed blocks from DA")
	}
	return events
}
// tryDecodeHeader attempts to decode a blob as a header.
//
// Decoding proceeds in two stages:
//  1. Try the DAHeaderEnvelope format; if that fails and strict mode is off,
//     fall back to the legacy plain pb.SignedHeader encoding.
//  2. If an envelope carried a signature, verify it against the header signer's
//     pubkey; a verified envelope flips the retriever into strict mode, after
//     which legacy/unverified headers are rejected permanently.
//
// Returns nil for any blob that is not a valid, acceptable header.
// NOTE(review): an envelope that parses but carries an empty signature is
// accepted in non-strict mode without envelope verification — presumably a
// deliberate compatibility window; confirm against the envelope spec.
func (r *daRetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedHeader {
	header := new(types.SignedHeader)
	isValidEnvelope := false
	// Attempt to unmarshal as DAHeaderEnvelope and get the envelope signature
	if envelopeSignature, err := header.UnmarshalDAEnvelope(bz); err != nil {
		// If in strict mode, we REQUIRE an envelope.
		if r.strictMode {
			r.logger.Warn().Err(err).Msg("strict mode is enabled, rejecting non-envelope blob")
			return nil
		}
		// Fallback for backward compatibility (only if NOT in strict mode)
		r.logger.Debug().Msg("trying legacy decoding")
		var headerPb pb.SignedHeader
		if errLegacy := proto.Unmarshal(bz, &headerPb); errLegacy != nil {
			return nil
		}
		if errLegacy := header.FromProto(&headerPb); errLegacy != nil {
			return nil
		}
	} else {
		// We have a structurally valid envelope (or at least it parsed)
		if len(envelopeSignature) > 0 {
			// The envelope signature is verified against the header's own signer key.
			if header.Signer.PubKey == nil {
				r.logger.Debug().Msg("header signer has no pubkey, cannot verify envelope")
				return nil
			}
			payload, err := header.MarshalBinary()
			if err != nil {
				r.logger.Debug().Err(err).Msg("failed to marshal header for verification")
				return nil
			}
			if valid, err := header.Signer.PubKey.Verify(payload, envelopeSignature); err != nil || !valid {
				r.logger.Info().Err(err).Msg("DA envelope signature verification failed")
				return nil
			}
			r.logger.Debug().Uint64("height", header.Height()).Msg("DA envelope signature verified")
			isValidEnvelope = true
		}
	}
	if r.strictMode && !isValidEnvelope {
		// no need to print warnings, as tryDecodeHeader could try to decode data first, which will show this warning.
		r.logger.Debug().Msg("strict mode: rejecting block that is not a fully valid envelope")
		return nil
	}
	if err := header.Header.ValidateBasic(); err != nil {
		r.logger.Debug().Err(err).Msg("invalid header structure")
		return nil
	}
	if err := r.assertExpectedProposer(header.ProposerAddress); err != nil {
		r.logger.Debug().Err(err).Msg("unexpected proposer")
		return nil
	}
	// First fully verified envelope seen: lock out legacy headers from now on.
	if isValidEnvelope && !r.strictMode {
		r.logger.Info().Uint64("height", header.Height()).Msg("valid DA envelope detected, switching to STRICT MODE")
		r.strictMode = true
	}
	// Optimistically mark as DA included
	// This has to be done for all fetched DA headers prior to validation because P2P does not confirm
	// da inclusion. This is not an issue, as an invalid header will be rejected. There cannot be hash collisions.
	headerHash := header.MemoizeHash().String()
	r.cache.SetHeaderDAIncluded(headerHash, daHeight, header.Height())
	r.logger.Debug().
		Str("header_hash", headerHash).
		Uint64("da_height", daHeight).
		Uint64("height", header.Height()).
		Msg("optimistically marked header as DA included")
	return header
}
// tryDecodeData attempts to decode a blob as signed data. It rejects blobs that
// fail to unmarshal, are completely empty, or carry an invalid signature, and
// marks successfully decoded data as DA-included in the cache.
func (r *daRetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data {
	var sd types.SignedData
	if err := sd.UnmarshalBinary(bz); err != nil {
		return nil
	}

	// Skip completely empty data: no transactions and no signature.
	if len(sd.Txs) == 0 && len(sd.Signature) == 0 {
		return nil
	}

	// Validate signature using the configured provider.
	if err := r.assertValidSignedData(&sd); err != nil {
		r.logger.Debug().Err(err).Msg("invalid signed data")
		return nil
	}

	// Record DA inclusion keyed by the data's DA commitment hash.
	commitment := sd.Data.DACommitment().String()
	r.cache.SetDataDAIncluded(commitment, daHeight, sd.Height())
	r.logger.Debug().
		Str("data_hash", commitment).
		Uint64("da_height", daHeight).
		Uint64("height", sd.Height()).
		Msg("data marked as DA included")

	return &sd.Data
}
// assertExpectedProposer validates the proposer address against the genesis
// configuration by delegating to the package-level helper.
func (r *daRetriever) assertExpectedProposer(proposerAddr []byte) error {
	return assertExpectedProposer(r.genesis, proposerAddr)
}
// assertValidSignedData validates signed data using the configured signature provider,
// delegating to the package-level helper with this retriever's genesis.
func (r *daRetriever) assertValidSignedData(signedData *types.SignedData) error {
	return assertValidSignedData(signedData, r.genesis)
}
// isEmptyDataExpected checks if empty data is expected for a header: either the
// header carries no data hash at all, or it carries the well-known hash for an
// empty transaction set.
func isEmptyDataExpected(header *types.SignedHeader) bool {
	if len(header.DataHash) == 0 {
		return true
	}
	return bytes.Equal(header.DataHash, common.DataHashForEmptyTxs)
}
// createEmptyDataForHeader creates empty data for a header, mirroring the
// header's chain ID, height and timestamp with a zero-length transaction list.
func createEmptyDataForHeader(_ context.Context, header *types.SignedHeader) *types.Data {
	meta := &types.Metadata{
		ChainID: header.ChainID(),
		Height:  header.Height(),
		Time:    header.BaseHeader.Time,
		// LastDataHash must be filled in the syncer, as it is not available
		// here since block n-1 has not been processed yet.
		LastDataHash: nil,
	}
	return &types.Data{
		Txs:      make(types.Txs, 0),
		Metadata: meta,
	}
}