-
Notifications
You must be signed in to change notification settings - Fork 260
Expand file tree
/
Copy pathexecution.go
More file actions
1140 lines (993 loc) · 41.2 KB
/
execution.go
File metadata and controls
1140 lines (993 loc) · 41.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package evm
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"math/big"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/golang-jwt/jwt/v5"
ds "github.com/ipfs/go-datastore"
"github.com/rs/zerolog"
"github.com/evstack/ev-node/core/execution"
"github.com/evstack/ev-node/pkg/telemetry"
)
const (
	// MaxPayloadStatusRetries is the maximum number of retries for SYNCING status.
	// According to the Engine API specification, SYNCING indicates temporary unavailability
	// and should be retried with exponential backoff.
	MaxPayloadStatusRetries = 3

	// InitialRetryBackoff is the initial backoff duration for retries.
	// The backoff doubles on each retry attempt (exponential backoff).
	InitialRetryBackoff = 1 * time.Second

	// SafeBlockLag is the number of blocks the safe block lags behind the head.
	// This provides a buffer for reorg protection - safe blocks won't be reorged
	// under normal operation. A value of 2 means when head is at block N,
	// safe is at block N-2.
	SafeBlockLag = 2

	// FinalizedBlockLag is the number of blocks the finalized block lags behind head.
	// This is a temporary mock value until proper DA-based finalization is wired up.
	// A value of 3 means when head is at block N, finalized is at block N-3.
	FinalizedBlockLag = 3
)
var (
	// ErrInvalidPayloadStatus indicates that the execution engine returned a permanent
	// failure status (INVALID or unknown status). This error should not be retried.
	ErrInvalidPayloadStatus = errors.New("invalid payload status")

	// ErrPayloadSyncing indicates that the execution engine is temporarily syncing.
	// According to the Engine API specification, this is a transient condition that
	// should be handled with retry logic rather than immediate failure.
	// Callers detect it with errors.Is and retry with backoff.
	ErrPayloadSyncing = errors.New("payload syncing")
)
// Compile-time interface satisfaction checks.

// Ensure EngineClient implements the execution.Executor interface.
var _ execution.Executor = (*EngineClient)(nil)

// Ensure EngineClient implements the execution.HeightProvider interface.
var _ execution.HeightProvider = (*EngineClient)(nil)

// Ensure EngineClient implements the execution.Rollbackable interface.
var _ execution.Rollbackable = (*EngineClient)(nil)

// Ensure EngineClient implements the optional pruning interface when used with
// ev-node's height-based pruning.
var _ execution.ExecPruner = (*EngineClient)(nil)
// validatePayloadStatus maps an Engine API payload status onto the package's
// sentinel errors, implementing the specification's status handling:
//   - VALID: operation succeeded, returns nil
//   - SYNCING/ACCEPTED: temporary unavailability, returns ErrPayloadSyncing (retryable)
//   - INVALID or any unknown status: permanent failure, returns ErrInvalidPayloadStatus
func validatePayloadStatus(status engine.PayloadStatusV1) error {
	s := status.Status
	if s == engine.VALID {
		return nil
	}
	if s == engine.SYNCING || s == engine.ACCEPTED {
		// Transient condition - the caller should retry with backoff.
		return ErrPayloadSyncing
	}
	// INVALID, or a status we do not recognize: treat as permanent.
	return ErrInvalidPayloadStatus
}
// latestValidHashHex renders an optional hash for logging, returning the empty
// string when the pointer is nil.
func latestValidHashHex(latestValidHash *common.Hash) string {
	if latestValidHash != nil {
		return latestValidHash.Hex()
	}
	return ""
}
// retryWithBackoffOnPayloadStatus runs fn up to maxRetries times with
// exponential backoff, following the Engine API's recommendation for handling
// SYNCING statuses:
//   - only ErrPayloadSyncing (a transient condition) triggers a retry
//   - ErrInvalidPayloadStatus and any other error fail immediately
//   - context cancellation aborts the wait between attempts
//   - the wait doubles after every attempt
//
// operation is a human-readable label included in error messages.
func retryWithBackoffOnPayloadStatus(ctx context.Context, fn func() error, maxRetries int, initialBackoff time.Duration, operation string) error {
	wait := initialBackoff
	for attempt := 1; attempt <= maxRetries; attempt++ {
		err := fn()
		switch {
		case err == nil:
			return nil
		case errors.Is(err, ErrInvalidPayloadStatus):
			// Permanent failure - never retry.
			return err
		case !errors.Is(err, ErrPayloadSyncing):
			// Any error other than syncing is not retryable.
			return err
		}

		if attempt == maxRetries {
			return fmt.Errorf("max retries (%d) exceeded for %s: %w", maxRetries, operation, err)
		}

		// Sleep with exponential backoff, honoring cancellation.
		select {
		case <-ctx.Done():
			return fmt.Errorf("context cancelled during retry for %s: %w", operation, ctx.Err())
		case <-time.After(wait):
			wait *= 2
		}
	}
	// Only reachable when maxRetries < 1.
	return fmt.Errorf("max retries (%d) exceeded for %s", maxRetries, operation)
}
// EngineRPCClient abstracts Engine API RPC calls for tracing and testing.
// Implementations wrap a raw JSON-RPC client; a tracing decorator may be
// layered on top when telemetry is enabled.
type EngineRPCClient interface {
	// ForkchoiceUpdated updates the forkchoice state and optionally starts payload building.
	// Passing nil args performs a pure forkchoice update with no payload build.
	ForkchoiceUpdated(ctx context.Context, state engine.ForkchoiceStateV1, args map[string]any) (*engine.ForkChoiceResponse, error)
	// GetPayload retrieves a previously requested execution payload.
	GetPayload(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error)
	// NewPayload submits a new execution payload for validation.
	NewPayload(ctx context.Context, payload *engine.ExecutableData, blobHashes []string, parentBeaconBlockRoot string, executionRequests [][]byte) (*engine.PayloadStatusV1, error)
}
// EthRPCClient abstracts Ethereum JSON-RPC calls for tracing and testing.
type EthRPCClient interface {
	// HeaderByNumber retrieves a block header by number (nil = latest).
	HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
	// GetTxs retrieves pending transactions from the transaction pool as
	// 0x-prefixed hex strings.
	GetTxs(ctx context.Context) ([]string, error)
}
// EngineClient represents a client that interacts with an Ethereum execution engine
// through the Engine API. It manages connections to both the engine and standard Ethereum
// APIs, and maintains state related to block processing.
//
// All mutable forkchoice state (head/safe/finalized hashes and the block-hash
// cache) is guarded by mu.
type EngineClient struct {
	engineClient EngineRPCClient // Client for Engine API calls
	ethClient    EthRPCClient    // Client for standard Ethereum API calls
	genesisHash  common.Hash     // Hash of the genesis block
	initialHeight uint64         // Initial chain height, set by InitChain
	feeRecipient common.Address  // Address to receive transaction fees

	// store provides persistence for ExecMeta to enable idempotent execution
	// and crash recovery.
	store *EVMStore

	mu                        sync.Mutex             // Mutex to protect concurrent access to block hashes
	currentHeadBlockHash      common.Hash            // Store last non-finalized HeadBlockHash
	currentHeadHeight         uint64                 // Height of the current head block (for safe lag calculation)
	currentSafeBlockHash      common.Hash            // Store last non-finalized SafeBlockHash
	currentFinalizedBlockHash common.Hash            // Store last finalized block hash
	blockHashCache            map[uint64]common.Hash // height -> hash cache for safe block lookups

	cachedExecutionInfo atomic.Pointer[execution.ExecutionInfo] // Cached execution info (gas limit)

	logger zerolog.Logger
}
// NewEngineExecutionClient creates a new EngineClient connected to the given
// Eth and Engine API endpoints.
//
// The db parameter is required for ExecMeta tracking which enables idempotent
// execution and crash recovery; it is wrapped with a prefix to isolate EVM
// execution data from other ev-node data. When tracingEnabled is true, the
// client injects W3C trace context headers and wraps Engine API and Eth API
// calls with OpenTelemetry spans.
func NewEngineExecutionClient(
	ethURL,
	engineURL string,
	jwtSecret string,
	genesisHash common.Hash,
	feeRecipient common.Address,
	db ds.Batching,
	tracingEnabled bool,
	logger zerolog.Logger,
) (*EngineClient, error) {
	if db == nil {
		return nil, errors.New("db is required for EVM execution client")
	}

	// Options shared by both RPC connections. With tracing on, outbound HTTP
	// requests carry W3C trace-context headers.
	var sharedOpts []rpc.ClientOption
	if tracingEnabled {
		sharedOpts = append(sharedOpts, rpc.WithHTTPClient(
			telemetry.NewPropagatingHTTPClient(http.DefaultTransport)))
	}

	// Standard Ethereum JSON-RPC connection.
	ethRPC, err := rpc.DialOptions(context.Background(), ethURL, sharedOpts...)
	if err != nil {
		return nil, err
	}
	rawEthClient := ethclient.NewClient(ethRPC)

	secret, err := decodeSecret(jwtSecret)
	if err != nil {
		return nil, err
	}

	// Engine API connection needs JWT auth on top of the shared options.
	// Build on a fresh slice so the append cannot alias sharedOpts' backing array.
	engineOpts := append([]rpc.ClientOption(nil), sharedOpts...)
	engineOpts = append(engineOpts, rpc.WithHTTPAuth(func(h http.Header) error {
		token, tokenErr := getAuthToken(secret)
		if tokenErr != nil {
			return tokenErr
		}
		if token != "" {
			h.Set("Authorization", "Bearer "+token)
		}
		return nil
	}))
	rawEngineClient, err := rpc.DialOptions(context.Background(), engineURL, engineOpts...)
	if err != nil {
		return nil, err
	}

	// Wrap the raw clients behind the package interfaces, optionally layering
	// tracing decorators on top.
	engineClient := NewEngineRPCClient(rawEngineClient)
	ethClient := NewEthRPCClient(rawEthClient)
	if tracingEnabled {
		engineClient = withTracingEngineRPCClient(engineClient)
		ethClient = withTracingEthRPCClient(ethClient)
	}

	return &EngineClient{
		engineClient:              engineClient,
		ethClient:                 ethClient,
		genesisHash:               genesisHash,
		feeRecipient:              feeRecipient,
		store:                     NewEVMStore(db),
		currentHeadBlockHash:      genesisHash,
		currentSafeBlockHash:      genesisHash,
		currentFinalizedBlockHash: genesisHash,
		blockHashCache:            make(map[uint64]common.Hash),
		logger:                    logger.With().Str("module", "engine_client").Logger(),
	}, nil
}
// InitChain initializes the blockchain with the given genesis parameters.
// It acknowledges the genesis block via a forkchoice update (retrying while
// the engine reports SYNCING) and returns the genesis state root.
// initialHeight must be 1; other values are rejected.
func (c *EngineClient) InitChain(ctx context.Context, genesisTime time.Time, initialHeight uint64, chainID string) ([]byte, error) {
	if initialHeight != 1 {
		return nil, fmt.Errorf("initialHeight must be 1, got %d", initialHeight)
	}

	// Head, safe, and finalized all point at genesis on a fresh chain.
	genesisState := engine.ForkchoiceStateV1{
		HeadBlockHash:      c.genesisHash,
		SafeBlockHash:      c.genesisHash,
		FinalizedBlockHash: c.genesisHash,
	}

	// Acknowledge the genesis block with retry logic for SYNCING status.
	if err := retryWithBackoffOnPayloadStatus(ctx, func() error {
		result, fcErr := c.engineClient.ForkchoiceUpdated(ctx, genesisState, nil)
		if fcErr != nil {
			return fmt.Errorf("engine_forkchoiceUpdatedV3 failed: %w", fcErr)
		}
		if statusErr := validatePayloadStatus(result.PayloadStatus); statusErr != nil {
			c.logger.Warn().
				Str("status", result.PayloadStatus.Status).
				Str("latestValidHash", latestValidHashHex(result.PayloadStatus.LatestValidHash)).
				Interface("validationError", result.PayloadStatus.ValidationError).
				Msg("InitChain: engine_forkchoiceUpdatedV3 returned non-VALID status")
			return statusErr
		}
		return nil
	}, MaxPayloadStatusRetries, InitialRetryBackoff, "InitChain"); err != nil {
		return nil, err
	}

	// Read the genesis (height 0) state root back from the execution layer.
	_, stateRoot, _, _, err := c.getBlockInfo(ctx, 0)
	if err != nil {
		return nil, fmt.Errorf("failed to get block info: %w", err)
	}
	c.initialHeight = initialHeight
	return stateRoot[:], nil
}
// GetTxs retrieves pending transactions from the execution layer's tx pool,
// decoding each 0x-prefixed hex string into raw RLP bytes.
func (c *EngineClient) GetTxs(ctx context.Context) ([][]byte, error) {
	rawTxs, err := c.ethClient.GetTxs(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get tx pool content: %w", err)
	}

	decoded := make([][]byte, 0, len(rawTxs))
	for _, rlpHex := range rawTxs {
		// Require a "0x" prefix and at least one hex digit after it.
		if len(rlpHex) < 3 || !strings.HasPrefix(rlpHex, "0x") {
			return nil, fmt.Errorf("invalid hex format for transaction: %s", rlpHex)
		}
		txBytes := common.FromHex(rlpHex)
		// FromHex yields nothing for malformed hex; reject rather than pass
		// an empty transaction downstream.
		if len(txBytes) == 0 && len(rlpHex) > 2 {
			return nil, fmt.Errorf("failed to decode hex transaction: %s", rlpHex)
		}
		decoded = append(decoded, txBytes)
	}
	return decoded, nil
}
// ExecuteTxs executes the given transactions at the specified block height and timestamp.
//
// ExecMeta tracking (if store is configured):
//   - Checks for already-promoted blocks to enable idempotent execution
//   - Saves ExecMeta with payloadID after forkchoiceUpdatedV3 for crash recovery
//   - Updates ExecMeta to "promoted" after successful execution
//
// Returns the execution-layer state root of the block built from txs, or an
// error if the forkchoice update / payload build fails.
func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, err error) {
	// 1. Check for idempotent execution: the block may already be promoted,
	// or a payload build may be in progress from before a crash/restart.
	stateRoot, payloadID, found, idempotencyErr := c.reconcileExecutionAtHeight(ctx, blockHeight, timestamp, txs)
	if idempotencyErr != nil {
		c.logger.Warn().Err(idempotencyErr).Uint64("height", blockHeight).Msg("ExecuteTxs: idempotency check failed")
		// Continue execution on error, as it might be transient
	} else if found {
		if stateRoot != nil {
			// Block already promoted at this height - reuse its state root.
			return stateRoot, nil
		}
		if payloadID != nil {
			// Found in-progress execution, attempt to resume
			return c.processPayload(ctx, *payloadID, txs)
		}
	}
	prevBlockHash, prevHeaderStateRoot, prevGasLimit, _, err := c.getBlockInfo(ctx, blockHeight-1)
	if err != nil {
		return nil, fmt.Errorf("failed to get block info: %w", err)
	}
	// It's possible that the prev state root passed in is nil if this is the first block.
	// If so, we can't do a comparison. Otherwise, we compare the roots.
	if len(prevStateRoot) > 0 && !bytes.Equal(prevStateRoot, prevHeaderStateRoot.Bytes()) {
		return nil, fmt.Errorf("prevStateRoot mismatch at height %d: consensus=%x execution=%x", blockHeight-1, prevStateRoot, prevHeaderStateRoot.Bytes())
	}
	// 2. Prepare payload attributes
	txsPayload := c.filterTransactions(txs)
	// Cache parent block hash for safe-block lookups.
	c.cacheBlockHash(blockHeight-1, prevBlockHash)
	// Use tracked safe/finalized state rather than prevBlockHash to avoid
	// regressing these values. Head must be prevBlockHash to build on top of it.
	c.mu.Lock()
	args := engine.ForkchoiceStateV1{
		HeadBlockHash:      prevBlockHash,
		SafeBlockHash:      c.currentSafeBlockHash,
		FinalizedBlockHash: c.currentFinalizedBlockHash,
	}
	c.mu.Unlock()
	// Build evolve-compatible payload attributes for the forkchoice update
	// that kicks off payload construction.
	evPayloadAttrs := map[string]any{
		// Standard Ethereum payload attributes (flattened) - using camelCase as expected by JSON
		"timestamp":             timestamp.Unix(),
		"prevRandao":            c.derivePrevRandao(blockHeight),
		"suggestedFeeRecipient": c.feeRecipient,
		"withdrawals":           []*types.Withdrawal{},
		// V3 requires parentBeaconBlockRoot
		"parentBeaconBlockRoot": common.Hash{}.Hex(), // Use zero hash for evolve
		// evolve-specific fields
		"transactions": txsPayload,
		"gasLimit":     prevGasLimit, // Use camelCase to match JSON conventions
	}
	c.logger.Debug().
		Uint64("height", blockHeight).
		Int("tx_count", len(txs)).
		Msg("engine_forkchoiceUpdatedV3")
	// 3. Call forkchoice update to get PayloadID
	var newPayloadID *engine.PayloadID
	err = retryWithBackoffOnPayloadStatus(ctx, func() error {
		forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx, args, evPayloadAttrs)
		if err != nil {
			return fmt.Errorf("forkchoice update failed: %w", err)
		}
		// Validate payload status
		if err := validatePayloadStatus(forkchoiceResult.PayloadStatus); err != nil {
			c.logger.Warn().
				Str("status", forkchoiceResult.PayloadStatus.Status).
				Str("latestValidHash", latestValidHashHex(forkchoiceResult.PayloadStatus.LatestValidHash)).
				Interface("validationError", forkchoiceResult.PayloadStatus.ValidationError).
				Uint64("blockHeight", blockHeight).
				Msg("ExecuteTxs: engine_forkchoiceUpdatedV3 returned non-VALID status")
			return err
		}
		// A VALID status with no PayloadID means the EL did not start building;
		// treat it as a hard failure with full diagnostic context.
		if forkchoiceResult.PayloadID == nil {
			c.logger.Error().
				Str("status", forkchoiceResult.PayloadStatus.Status).
				Str("latestValidHash", latestValidHashHex(forkchoiceResult.PayloadStatus.LatestValidHash)).
				Interface("validationError", forkchoiceResult.PayloadStatus.ValidationError).
				Interface("forkchoiceState", args).
				Interface("payloadAttributes", evPayloadAttrs).
				Uint64("blockHeight", blockHeight).
				Msg("returned nil PayloadID")
			return fmt.Errorf("returned nil PayloadID - (status: %s, latestValidHash: %s)",
				forkchoiceResult.PayloadStatus.Status,
				latestValidHashHex(forkchoiceResult.PayloadStatus.LatestValidHash))
		}
		newPayloadID = forkchoiceResult.PayloadID
		return nil
	}, MaxPayloadStatusRetries, InitialRetryBackoff, "ExecuteTxs forkchoice")
	if err != nil {
		return nil, err
	}
	// Save ExecMeta with payloadID for crash recovery (Stage="started")
	// This allows resuming the payload build if we crash before completing
	c.saveExecMeta(ctx, blockHeight, timestamp.Unix(), newPayloadID[:], nil, nil, txs, ExecStageStarted)
	// 4. Process the payload (get, submit, finalize)
	return c.processPayload(ctx, *newPayloadID, txs)
}
// setHead updates the head block hash without changing safe or finalized.
// This is used when reusing an existing block (idempotency check).
func (c *EngineClient) setHead(ctx context.Context, blockHash common.Hash) error {
	c.mu.Lock()
	c.currentHeadBlockHash = blockHash
	// Safe and finalized intentionally keep their current values; they
	// advance separately via derivation.
	state := engine.ForkchoiceStateV1{
		HeadBlockHash:      blockHash,
		SafeBlockHash:      c.currentSafeBlockHash,
		FinalizedBlockHash: c.currentFinalizedBlockHash,
	}
	c.mu.Unlock()
	return c.doForkchoiceUpdate(ctx, state, "setHead")
}
// setFinal updates forkchoice state for blockHash, delegating to
// setFinalWithHeight with headHeight=0 (so no lag-based safe/finalized
// derivation occurs). When isFinal is true, blockHash becomes both the safe
// and finalized block.
func (c *EngineClient) setFinal(ctx context.Context, blockHash common.Hash, isFinal bool) error {
	return c.setFinalWithHeight(ctx, blockHash, 0, isFinal)
}
// setFinalWithHeight updates forkchoice state with safe and finalized block lagging.
// When isFinal=false:
//   - Safe block is set to headHeight - SafeBlockLag (when headHeight > SafeBlockLag)
//   - Finalized block is set to headHeight - FinalizedBlockLag (when headHeight > FinalizedBlockLag)
//
// When isFinal=true, blockHash is recorded as both safe and finalized.
//
// Lagging hashes are resolved from the in-memory block-hash cache first, then
// via an RPC lookup; if neither succeeds the corresponding update is skipped
// rather than failing the whole call.
//
// Note: The finalized lag is a temporary mock until proper DA-based finalization is wired up.
func (c *EngineClient) setFinalWithHeight(ctx context.Context, blockHash common.Hash, headHeight uint64, isFinal bool) error {
	var safeHash, finalizedHash common.Hash
	updateSafe := !isFinal && headHeight > SafeBlockLag
	updateFinalized := !isFinal && headHeight > FinalizedBlockLag
	// Look up safe block hash (cache first, then RPC; skip on failure).
	if updateSafe {
		safeHeight := headHeight - SafeBlockLag
		c.mu.Lock()
		cachedSafeHash, ok := c.blockHashCache[safeHeight]
		c.mu.Unlock()
		if ok {
			safeHash = cachedSafeHash
		} else {
			var err error
			safeHash, _, _, _, err = c.getBlockInfo(ctx, safeHeight)
			if err != nil {
				c.logger.Debug().
					Uint64("safeHeight", safeHeight).
					Err(err).
					Msg("setFinalWithHeight: safe block not found, skipping safe update")
				updateSafe = false
			}
		}
	}
	// Look up finalized block hash (same cache-then-RPC strategy).
	if updateFinalized {
		finalizedHeight := headHeight - FinalizedBlockLag
		c.mu.Lock()
		cachedFinalizedHash, ok := c.blockHashCache[finalizedHeight]
		c.mu.Unlock()
		if ok {
			finalizedHash = cachedFinalizedHash
		} else {
			var err error
			finalizedHash, _, _, _, err = c.getBlockInfo(ctx, finalizedHeight)
			if err != nil {
				c.logger.Debug().
					Uint64("finalizedHeight", finalizedHeight).
					Err(err).
					Msg("setFinalWithHeight: finalized block not found, skipping finalized update")
				updateFinalized = false
			}
		}
	}
	// Commit the new forkchoice state under the lock, then issue the RPC
	// outside of it.
	c.mu.Lock()
	if isFinal {
		// Finalization: blockHash becomes both safe and finalized.
		c.currentFinalizedBlockHash = blockHash
		c.currentSafeBlockHash = blockHash
	} else {
		// Head advance: apply whichever lagging updates resolved above.
		c.currentHeadBlockHash = blockHash
		c.currentHeadHeight = headHeight
		if updateSafe {
			c.currentSafeBlockHash = safeHash
		}
		if updateFinalized {
			c.currentFinalizedBlockHash = finalizedHash
		}
	}
	args := engine.ForkchoiceStateV1{
		HeadBlockHash:      c.currentHeadBlockHash,
		SafeBlockHash:      c.currentSafeBlockHash,
		FinalizedBlockHash: c.currentFinalizedBlockHash,
	}
	c.mu.Unlock()
	return c.doForkchoiceUpdate(ctx, args, "setFinal")
}
// doForkchoiceUpdate performs the actual forkchoice update RPC call with retry
// logic for transient SYNCING/ACCEPTED statuses.
//
// Parameters:
//   - args: the head/safe/finalized hashes to apply on the execution layer.
//   - operation: human-readable label used in logs and retry error messages.
//
// Returns an error if the RPC fails, the engine reports a permanent INVALID
// status, or retries are exhausted.
func (c *EngineClient) doForkchoiceUpdate(ctx context.Context, args engine.ForkchoiceStateV1, operation string) error {
	// Return the retry result directly; the previous
	// `if err != nil { return err }; return nil` tail was redundant.
	return retryWithBackoffOnPayloadStatus(ctx, func() error {
		forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx, args, nil)
		if err != nil {
			return fmt.Errorf("forkchoice update failed: %w", err)
		}
		// Validate payload status; non-VALID statuses are logged with full
		// diagnostic context before being propagated.
		if err := validatePayloadStatus(forkchoiceResult.PayloadStatus); err != nil {
			c.logger.Warn().
				Str("status", forkchoiceResult.PayloadStatus.Status).
				Str("latestValidHash", latestValidHashHex(forkchoiceResult.PayloadStatus.LatestValidHash)).
				Interface("validationError", forkchoiceResult.PayloadStatus.ValidationError).
				Str("operation", operation).
				Msg("forkchoiceUpdatedV3 returned non-VALID status")
			return err
		}
		return nil
	}, MaxPayloadStatusRetries, InitialRetryBackoff, operation)
}
// SetFinal marks the block at the given height as finalized.
// It resolves the block hash from the execution layer, then records it as
// both safe and finalized via setFinal.
func (c *EngineClient) SetFinal(ctx context.Context, blockHeight uint64) error {
	hash, _, _, _, err := c.getBlockInfo(ctx, blockHeight)
	if err != nil {
		return fmt.Errorf("failed to get block info: %w", err)
	}
	return c.setFinal(ctx, hash, true)
}
// SetSafe explicitly sets the safe block hash.
// This allows the derivation layer to advance the safe block independently of head.
// Safe indicates a block that is unlikely to be reorged (e.g., confirmed by DA).
func (c *EngineClient) SetSafe(ctx context.Context, blockHash common.Hash) error {
	c.mu.Lock()
	c.currentSafeBlockHash = blockHash
	state := engine.ForkchoiceStateV1{
		HeadBlockHash:      c.currentHeadBlockHash,
		SafeBlockHash:      blockHash,
		FinalizedBlockHash: c.currentFinalizedBlockHash,
	}
	c.mu.Unlock()
	return c.doForkchoiceUpdate(ctx, state, "SetSafe")
}
// SetSafeByHeight sets the safe block by looking up the block hash at the given height.
// Uses cached block hashes when available to avoid RPC calls. Falls back to RPC on cache miss
// (e.g., during restart before cache is warmed).
// Returns nil if the height doesn't exist yet (block not produced).
func (c *EngineClient) SetSafeByHeight(ctx context.Context, height uint64) error {
	// Cache lookup first - avoids an RPC round-trip in steady state.
	c.mu.Lock()
	hash, cached := c.blockHashCache[height]
	c.mu.Unlock()

	if !cached {
		// Cache miss - fall back to RPC (happens during restart/recovery).
		var err error
		if hash, _, _, _, err = c.getBlockInfo(ctx, height); err != nil {
			// Block doesn't exist yet - expected during early blocks.
			c.logger.Debug().
				Uint64("height", height).
				Err(err).
				Msg("SetSafeByHeight: block not found, skipping safe update")
			return nil
		}
	}
	return c.SetSafe(ctx, hash)
}
// cacheBlockHash stores a block hash in the cache for later safe block lookups.
// The cache is bounded to prevent unbounded memory growth - entries older than
// the last ~10 heights are pruned, which is plenty since SafeBlockLag is 2.
func (c *EngineClient) cacheBlockHash(height uint64, hash common.Hash) {
	const maxCacheSize = 10

	c.mu.Lock()
	defer c.mu.Unlock()

	c.blockHashCache[height] = hash
	if len(c.blockHashCache) <= maxCacheSize {
		return
	}

	// Evict everything below the retention floor (guarding uint64 underflow
	// for small heights).
	var floor uint64
	if height >= maxCacheSize-1 {
		floor = height - (maxCacheSize - 1)
	}
	for h := range c.blockHashCache {
		if h < floor {
			delete(c.blockHashCache, h)
		}
	}
}
// SetFinalized explicitly sets the finalized block hash.
// This allows the derivation layer to advance finalization independently.
// Finalized indicates a block that will never be reorged (e.g., included in DA with sufficient confirmations).
func (c *EngineClient) SetFinalized(ctx context.Context, blockHash common.Hash) error {
	c.mu.Lock()
	// Finalized implies safe, so both advance together.
	c.currentFinalizedBlockHash = blockHash
	c.currentSafeBlockHash = blockHash
	state := engine.ForkchoiceStateV1{
		HeadBlockHash:      c.currentHeadBlockHash,
		SafeBlockHash:      blockHash,
		FinalizedBlockHash: blockHash,
	}
	c.mu.Unlock()
	return c.doForkchoiceUpdate(ctx, state, "SetFinalized")
}
// ResumePayload resumes an in-progress payload build using a stored payloadID.
// This is used for crash recovery when we have a payloadID but haven't yet
// retrieved and submitted the payload to the EL.
//
// Returns the state root from the payload, or an error if resumption fails.
// Implements the execution.PayloadResumer interface.
func (c *EngineClient) ResumePayload(ctx context.Context, payloadIDBytes []byte) (stateRoot []byte, err error) {
	// A PayloadID is exactly 8 bytes; anything else is malformed input.
	if got := len(payloadIDBytes); got != 8 {
		return nil, fmt.Errorf("ResumePayload: invalid payloadID length %d, expected 8", got)
	}

	var pid engine.PayloadID
	copy(pid[:], payloadIDBytes)

	c.logger.Info().
		Str("payloadID", pid.String()).
		Msg("ResumePayload: attempting to resume in-progress payload")

	return c.processPayload(ctx, pid, nil) // txs = nil for resume
}
// reconcileExecutionAtHeight checks if the block at the given height and timestamp has already been executed.
// It returns:
// - stateRoot: non-nil if block is already promoted/finalized (idempotent success)
// - payloadID: non-nil if block execution was started but not finished (resume needed)
// - found: true if either of the above is true
// - err: error during checks
func (c *EngineClient) reconcileExecutionAtHeight(ctx context.Context, height uint64, timestamp time.Time, txs [][]byte) (stateRoot []byte, payloadID *engine.PayloadID, found bool, err error) {
// 1. Check ExecMeta from store
execMeta, err := c.store.GetExecMeta(ctx, height)
if err == nil && execMeta != nil {
// If we already have a promoted block at this height, verify timestamp matches
// to catch Dual-Store Conflicts where ExecMeta was saved for an old block
// that was later replaced via consensus.
if execMeta.Stage == ExecStagePromoted && len(execMeta.StateRoot) > 0 {
if execMeta.Timestamp == timestamp.Unix() {
// Verify the block actually exists in the EL before trusting ExecMeta.
// ExecMeta could be stale if ev-reth crashed/restarted after we saved it
// but before the block was persisted on the EL side.
existingBlockHash, existingStateRoot, _, existingTimestamp, elErr := c.getBlockInfo(ctx, height)
if elErr == nil && existingBlockHash != (common.Hash{}) && existingTimestamp == uint64(timestamp.Unix()) {
// Block exists in EL with matching timestamp - safe to reuse
c.logger.Info().
Uint64("height", height).
Str("stage", execMeta.Stage).
Str("blockHash", existingBlockHash.Hex()).
Msg("ExecuteTxs: reusing already-promoted execution (idempotent)")
// Update head to point to this existing block
if err := c.setHead(ctx, existingBlockHash); err != nil {
c.logger.Warn().Err(err).Msg("ExecuteTxs: failed to update head to existing block")
}
return existingStateRoot.Bytes(), nil, true, nil
}
// ExecMeta says promoted but block doesn't exist in EL or timestamp mismatch
// This can happen if ev-reth crashed before persisting the block
c.logger.Warn().
Uint64("height", height).
Bool("block_exists", existingBlockHash != common.Hash{}).
Err(elErr).
Msg("ExecuteTxs: ExecMeta shows promoted but block not found in EL (EL in behind), will re-execute")
// Fall through to fresh execution
} else {
// Timestamp mismatch - ExecMeta is stale from an old block that was replaced.
// Ignore it and proceed to EL check which will handle rollback if needed.
c.logger.Warn().
Uint64("height", height).
Int64("execmeta_timestamp", execMeta.Timestamp).
Int64("requested_timestamp", timestamp.Unix()).
Msg("ExecuteTxs: ExecMeta timestamp mismatch, ignoring stale promoted record")
}
}
// If we have a started execution with a payloadID, validate it still exists before resuming.
// After node restart, the EL's payload cache is ephemeral and the payloadID may be stale.
if execMeta.Stage == ExecStageStarted && len(execMeta.PayloadID) == 8 {
requestedTxHash := hashTxs(txs)
if execMeta.Timestamp != timestamp.Unix() || !bytes.Equal(execMeta.TxHash, requestedTxHash) {
c.logger.Warn().
Uint64("height", height).
Int64("execmeta_timestamp", execMeta.Timestamp).
Int64("requested_timestamp", timestamp.Unix()).
Msg("ExecuteTxs: ignoring stale in-progress execution for different block inputs")
} else {
var pid engine.PayloadID
copy(pid[:], execMeta.PayloadID)
// Validate payload still exists by attempting to retrieve it
if _, err = c.engineClient.GetPayload(ctx, pid); err == nil {
c.logger.Info().
Uint64("height", height).
Str("stage", execMeta.Stage).
Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")
return nil, &pid, true, nil
}
// Payload is stale (expired or node restarted) - proceed with fresh execution
c.logger.Debug().
Uint64("height", height).
Str("payloadID", pid.String()).
Err(err).
Msg("ExecuteTxs: stale ExecMeta payloadID no longer valid in EL, will re-execute")
// Don't return - fall through to fresh execution
}
}
}
// 2. Check EL for existing block (EL-level idempotency)
existingBlockHash, existingStateRoot, _, existingTimestamp, err := c.getBlockInfo(ctx, height)
if err == nil && existingBlockHash != (common.Hash{}) {
// Block exists at this height - check if timestamp matches
if existingTimestamp == uint64(timestamp.Unix()) {
c.logger.Info().
Uint64("height", height).
Str("blockHash", existingBlockHash.Hex()).
Str("stateRoot", existingStateRoot.Hex()).
Msg("ExecuteTxs: reusing existing block at height (EL idempotency)")
// Update head to point to this existing block
if err := c.setHead(ctx, existingBlockHash); err != nil {
c.logger.Warn().Err(err).Msg("ExecuteTxs: failed to update head to existing block")
// Continue anyway - the block exists and we can return its state root
}
// Update ExecMeta to promoted
c.saveExecMeta(ctx, height, timestamp.Unix(), nil, existingBlockHash[:], existingStateRoot.Bytes(), txs, ExecStagePromoted)
return existingStateRoot.Bytes(), nil, true, nil
}
// We need to rollback the EL to height-1 so it can re-execute
c.logger.Warn().
Uint64("height", height).
Uint64("existingTimestamp", existingTimestamp).
Int64("requestedTimestamp", timestamp.Unix()).
Msg("ExecuteTxs: block exists at height but timestamp differs - rolling back EL to re-sync")
// Rollback to height-1 to allow re-execution with correct timestamp
if height > 0 {
if err := c.Rollback(ctx, height-1); err != nil {
c.logger.Error().Err(err).
Uint64("height", height).
Uint64("rollback_target", height-1).
Msg("ExecuteTxs: failed to rollback EL for timestamp mismatch")
return nil, nil, false, fmt.Errorf("failed to rollback EL for timestamp mismatch at height %d: %w", height, err)
}
c.logger.Info().
Uint64("height", height).
Uint64("rollback_target", height-1).
Msg("ExecuteTxs: EL rolled back successfully, will re-execute with correct timestamp")
}
}
return nil, nil, false, nil
}
// filterTransactions hex-encodes transactions for inclusion in a payload,
// dropping empty entries along the way.
// DA transactions should already be filtered via FilterTxs before reaching here.
// Mempool transactions are already validated when added to mempool.
func (c *EngineClient) filterTransactions(txs [][]byte) []string {
	encoded := make([]string, 0, len(txs))
	for _, raw := range txs {
		// Empty byte slices carry no transaction data; skip them.
		if len(raw) > 0 {
			encoded = append(encoded, "0x"+hex.EncodeToString(raw))
		}
	}
	return encoded
}
// GetExecutionInfo returns current execution layer parameters.
// The value is computed once from the latest EL header and then served from
// an atomic cache on subsequent calls.
func (c *EngineClient) GetExecutionInfo(ctx context.Context) (execution.ExecutionInfo, error) {
	// Fast path: a previous call already populated the cache.
	if info := c.cachedExecutionInfo.Load(); info != nil {
		return *info, nil
	}
	// Cache miss: query the latest header for the current gas limit.
	header, err := c.ethClient.HeaderByNumber(ctx, nil) // nil selects the latest block
	if err != nil {
		return execution.ExecutionInfo{}, fmt.Errorf("failed to get latest block: %w", err)
	}
	result := execution.ExecutionInfo{MaxGas: header.GasLimit}
	c.cachedExecutionInfo.Store(&result)
	return result, nil
}
// FilterTxs validates force-included transactions and applies gas and size filtering for all passed txs.
// If hasForceIncludedTransaction is false, skip filtering entirely - mempool batch is already filtered.
// Returns a slice of FilterStatus for each transaction.
func (c *EngineClient) FilterTxs(ctx context.Context, txs [][]byte, maxBytes, maxGas uint64, hasForceIncludedTransaction bool) ([]execution.FilterStatus, error) {
	statuses := make([]execution.FilterStatus, len(txs))
	var (
		usedGas   uint64 // cumulative gas of accepted txs
		usedBytes uint64 // cumulative size of accepted txs
		full      bool   // set once either budget is exhausted
	)
	for idx, raw := range txs {
		// Empty transactions are dropped outright.
		if len(raw) == 0 {
			statuses[idx] = execution.FilterRemove
			continue
		}
		size := uint64(len(raw))
		var gas uint64
		// Parsing is only required when force-included txs may be present;
		// mempool txs were already validated on admission, so we skip the
		// decode cost otherwise.
		if hasForceIncludedTransaction {
			var parsed types.Transaction
			if err := parsed.UnmarshalBinary(raw); err != nil {
				c.logger.Debug().
					Int("tx_index", idx).
					Err(err).
					Str("tx_hex", "0x"+hex.EncodeToString(raw)).
					Msg("filtering out invalid transaction (gibberish)")
				statuses[idx] = execution.FilterRemove
				continue
			}
			gas = parsed.Gas()
			// A tx demanding more gas than an entire block allows can never
			// be included; drop it permanently.
			if maxGas > 0 && gas > maxGas {
				statuses[idx] = execution.FilterRemove
				continue
			}
		}
		// A tx larger than an entire block can never be included either.
		if maxBytes > 0 && size > maxBytes {
			statuses[idx] = execution.FilterRemove
			continue
		}
		// Once a budget has been exhausted, everything that follows is
		// postponed to a later block.
		if full {
			statuses[idx] = execution.FilterPostpone
			continue
		}
		// Would this tx overflow the cumulative byte budget?
		if maxBytes > 0 && usedBytes+size > maxBytes {
			full = true
			statuses[idx] = execution.FilterPostpone
			c.logger.Debug().
				Uint64("cumulative_bytes", usedBytes).
				Uint64("tx_bytes", size).
				Uint64("max_bytes", maxBytes).
				Msg("size limit reached, postponing remaining txs")
			continue
		}
		// Would this tx overflow the cumulative gas budget? Gas is only known
		// when force-included txs triggered parsing above.
		if hasForceIncludedTransaction && maxGas > 0 && usedGas+gas > maxGas {
			full = true
			statuses[idx] = execution.FilterPostpone
			c.logger.Debug().
				Uint64("cumulative_gas", usedGas).
				Uint64("tx_gas", gas).
				Uint64("max_gas", maxGas).
				Msg("gas limit reached, postponing remaining txs")
			continue
		}
		// Accept the tx and charge it against both budgets.
		usedBytes += size
		usedGas += gas
		statuses[idx] = execution.FilterOK
	}
	return statuses, nil
}
// processPayload handles the common logic of getting, submitting, and finalizing a payload.
func (c *EngineClient) processPayload(ctx context.Context, payloadID engine.PayloadID, txs [][]byte) ([]byte, error) {
// 1. Get Payload
payloadResult, err := c.engineClient.GetPayload(ctx, payloadID)
if err != nil {
return nil, fmt.Errorf("get payload failed: %w", err)
}
blockHeight := payloadResult.ExecutionPayload.Number
blockTimestamp := int64(payloadResult.ExecutionPayload.Timestamp)
// 2. Submit Payload (newPayload)
err = retryWithBackoffOnPayloadStatus(ctx, func() error {
newPayloadResult, err := c.engineClient.NewPayload(ctx,
payloadResult.ExecutionPayload,
[]string{}, // No blob hashes
common.Hash{}.Hex(), // Use zero hash for parentBeaconBlockRoot
[][]byte{}, // No execution requests
)
if err != nil {
return fmt.Errorf("new payload submission failed: %w", err)
}
if err := validatePayloadStatus(*newPayloadResult); err != nil {
c.logger.Warn().
Str("status", newPayloadResult.Status).
Str("latestValidHash", latestValidHashHex(newPayloadResult.LatestValidHash)).
Interface("validationError", newPayloadResult.ValidationError).
Uint64("blockHeight", blockHeight).
Msg("processPayload: engine_newPayloadV4 returned non-VALID status")
return err
}
return nil
}, MaxPayloadStatusRetries, InitialRetryBackoff, "processPayload newPayload")
if err != nil {
return nil, err
}
// 3. Update Forkchoice
blockHash := payloadResult.ExecutionPayload.BlockHash
c.cacheBlockHash(blockHeight, blockHash)
err = c.setFinalWithHeight(ctx, blockHash, blockHeight, false)
if err != nil {
return nil, fmt.Errorf("forkchoice update failed: %w", err)
}