Skip to content

Commit a6094d7

Browse files
authored
feat: adds execution enforcement to preconfirmations (#161)
* feat: adds more txn metadata to block cache * feat: slash on non-successful transactions * chore: remove redundant log * feat: request receipts concurrently * feat: cleanup code * feat: adds txn receipts to tests * chore: updates test stub to include txn receipts * feat: adds a revert check in tests * feat: use errgroup and syncmap * feat: introduces batching * feat: rework bucketing to minimize requests * chore: resolves nit PR requests * chore: removes closure var * chore: use range over integer * feat: adds metrics * chore: adds metric to collector * feat: get total duration for block receipts * feat: adds retry to BatchReceipts call * feat: adds retries to all calls * feat: check for 429 in infinite retry * feat: do infinite retry on batch call and log * feat: checks for 429 errors * feat: attempt the request multiple times * feat: attempt retries 50 times before failing * feat: reduce the size of batch * feat: adds personal L1 RPC URL * chore: adds temp debug logs * chore: avoid context from cancelation * chore: panic when updater routine fails * feat: adds logger for evmhelper * feat: add more logs * feat: fix linter issues
1 parent 10b6452 commit a6094d7

9 files changed

Lines changed: 547 additions & 47 deletions

File tree

infrastructure/nomad/playbooks/variables/profiles.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
datacenter: "dc1"
2-
l1_rpc_url: "https://eth-holesky.g.alchemy.com/v2/WqNEQeeexFLQwECjxCPpdep0uvCgn8Yj"
2+
l1_rpc_url: "https://eth-holesky.g.alchemy.com/v2/H8JN1wImnEPrxkFRVOT7cJ_gzu9x3VmB"
33

44
artifacts:
55
bidder_emulator: &bidder_emulator_artifact

oracle/pkg/l1Listener/l1Listener.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type WinnerRegister interface {
2727
type EthClient interface {
2828
BlockNumber(ctx context.Context) (uint64, error)
2929
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
30+
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
3031
}
3132

3233
type L1Listener struct {

oracle/pkg/l1Listener/l1Listener_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,10 @@ func (t *testEthClient) HeaderByNumber(_ context.Context, number *big.Int) (*typ
205205
return hdr, nil
206206
}
207207

208+
func (t *testEthClient) BlockByNumber(_ context.Context, number *big.Int) (*types.Block, error) {
209+
return nil, nil
210+
}
211+
208212
func publishLog(
209213
eventManager events.EventManager,
210214
blockNum *big.Int,

oracle/pkg/node/node.go

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func NewNode(opts *Options) (*Node, error) {
106106
monitor := txmonitor.New(
107107
owner,
108108
settlementClient,
109-
txmonitor.NewEVMHelper(settlementClient.Client()),
109+
txmonitor.NewEVMHelperWithLogger(settlementClient.Client(), nd.logger),
110110
st,
111111
nd.logger.With("component", "tx_monitor"),
112112
1024,
@@ -164,6 +164,8 @@ func NewNode(opts *Options) (*Node, error) {
164164
listenerL1Client = &laggerdL1Client{EthClient: listenerL1Client, amount: opts.LaggerdMode}
165165
}
166166

167+
listenerL1Client = &infiniteRetryL1Client{EthClient: listenerL1Client, logger: nd.logger}
168+
167169
blockTracker, err := blocktracker.NewBlocktrackerTransactor(
168170
opts.BlockTrackerContractAddr,
169171
settlementRPC,
@@ -232,10 +234,11 @@ func NewNode(opts *Options) (*Node, error) {
232234

233235
updtr, err := updater.NewUpdater(
234236
nd.logger.With("component", "updater"),
235-
l1Client,
237+
listenerL1Client,
236238
st,
237239
evtMgr,
238240
oracleTransactorSession,
241+
txmonitor.NewEVMHelperWithLogger(l1Client.Client(), nd.logger),
239242
)
240243
if err != nil {
241244
nd.logger.Error("failed to instantiate updater", "error", err)
@@ -403,6 +406,61 @@ func (w *winnerOverrideL1Client) HeaderByNumber(ctx context.Context, number *big
403406
return hdr, nil
404407
}
405408

409+
type infiniteRetryL1Client struct {
410+
l1Listener.EthClient
411+
logger *slog.Logger
412+
}
413+
414+
func (i *infiniteRetryL1Client) BlockNumber(ctx context.Context) (uint64, error) {
415+
var blkNum uint64
416+
var err error
417+
for retries := 50; retries > 0; retries-- {
418+
blkNum, err = i.EthClient.BlockNumber(ctx)
419+
if err == nil {
420+
break
421+
}
422+
i.logger.Error("failed to get block number, retrying...", "error", err)
423+
time.Sleep(2 * time.Second)
424+
}
425+
if err != nil {
426+
return 0, err
427+
}
428+
return blkNum, nil
429+
}
430+
431+
func (i *infiniteRetryL1Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
432+
var hdr *types.Header
433+
var err error
434+
for retries := 50; retries > 0; retries-- {
435+
hdr, err = i.EthClient.HeaderByNumber(ctx, number)
436+
if err == nil {
437+
break
438+
}
439+
i.logger.Error("failed to get header by number, retrying...", "error", err)
440+
time.Sleep(2 * time.Second)
441+
}
442+
if err != nil {
443+
return nil, err
444+
}
445+
return hdr, nil
446+
}
447+
448+
func (i *infiniteRetryL1Client) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
449+
var blk *types.Block
450+
var err error
451+
for retries := 50; retries > 0; retries-- {
452+
blk, err = i.EthClient.BlockByNumber(ctx, number)
453+
if err == nil {
454+
break
455+
}
456+
i.logger.Error("failed to get block by number, retrying...", "error", err)
457+
time.Sleep(2 * time.Second)
458+
}
459+
if err != nil {
460+
return nil, err
461+
}
462+
return blk, nil
463+
}
406464
func setBuilderMapping(
407465
ctx context.Context,
408466
bt *blocktracker.BlocktrackerTransactorSession,

oracle/pkg/updater/metrics.go

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,21 @@ const (
88
)
99

1010
type metrics struct {
11-
CommitmentsReceivedCount prometheus.Counter
12-
CommitmentsProcessedCount prometheus.Counter
13-
CommitmentsTooOldCount prometheus.Counter
14-
DuplicateCommitmentsCount prometheus.Counter
15-
RewardsCount prometheus.Counter
16-
SlashesCount prometheus.Counter
17-
EncryptedCommitmentsCount prometheus.Counter
18-
NoWinnerCount prometheus.Counter
19-
BlockTxnCacheHits prometheus.Counter
20-
BlockTxnCacheMisses prometheus.Counter
21-
BlockTimeCacheHits prometheus.Counter
22-
BlockTimeCacheMisses prometheus.Counter
23-
LastSentNonce prometheus.Gauge
11+
CommitmentsReceivedCount prometheus.Counter
12+
CommitmentsProcessedCount prometheus.Counter
13+
CommitmentsTooOldCount prometheus.Counter
14+
DuplicateCommitmentsCount prometheus.Counter
15+
RewardsCount prometheus.Counter
16+
SlashesCount prometheus.Counter
17+
EncryptedCommitmentsCount prometheus.Counter
18+
NoWinnerCount prometheus.Counter
19+
BlockTxnCacheHits prometheus.Counter
20+
BlockTxnCacheMisses prometheus.Counter
21+
BlockTimeCacheHits prometheus.Counter
22+
BlockTimeCacheMisses prometheus.Counter
23+
LastSentNonce prometheus.Gauge
24+
TxnReceiptRequestDuration prometheus.Histogram
25+
TxnReceiptRequestBlockDuration prometheus.Histogram
2426
}
2527

2628
func newMetrics() *metrics {
@@ -129,6 +131,22 @@ func newMetrics() *metrics {
129131
Help: "Last nonce sent to for settlement",
130132
},
131133
)
134+
m.TxnReceiptRequestDuration = prometheus.NewHistogram(
135+
prometheus.HistogramOpts{
136+
Namespace: defaultNamespace,
137+
Subsystem: subsystem,
138+
Name: "txn_receipt_request_duration",
139+
Help: "Duration of transaction receipt requests",
140+
},
141+
)
142+
m.TxnReceiptRequestBlockDuration = prometheus.NewHistogram(
143+
prometheus.HistogramOpts{
144+
Namespace: defaultNamespace,
145+
Subsystem: subsystem,
146+
Name: "txn_receipt_request_block_duration",
147+
Help: "Duration of transaction receipt requests",
148+
},
149+
)
132150
return m
133151
}
134152

@@ -147,5 +165,7 @@ func (m *metrics) Collectors() []prometheus.Collector {
147165
m.BlockTimeCacheHits,
148166
m.BlockTimeCacheMisses,
149167
m.LastSentNonce,
168+
m.TxnReceiptRequestDuration,
169+
m.TxnReceiptRequestBlockDuration,
150170
}
151171
}

oracle/pkg/updater/updater.go

Lines changed: 97 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import (
99
"math"
1010
"math/big"
1111
"strings"
12+
"sync"
1213
"sync/atomic"
14+
"time"
1315

1416
"github.com/ethereum/go-ethereum/common"
1517
"github.com/ethereum/go-ethereum/core/types"
@@ -18,12 +20,18 @@ import (
1820
blocktracker "github.com/primev/mev-commit/contracts-abi/clients/BlockTracker"
1921
preconf "github.com/primev/mev-commit/contracts-abi/clients/PreConfCommitmentStore"
2022
"github.com/primev/mev-commit/x/contracts/events"
23+
"github.com/primev/mev-commit/x/contracts/txmonitor"
2124
"github.com/prometheus/client_golang/prometheus"
2225
"golang.org/x/sync/errgroup"
2326
)
2427

2528
type SettlementType string
2629

30+
type TxMetadata struct {
31+
PosInBlock int
32+
Succeeded bool
33+
}
34+
2735
const (
2836
SettlementTypeReward SettlementType = "reward"
2937
SettlementTypeSlash SettlementType = "slash"
@@ -92,11 +100,12 @@ type Updater struct {
92100
winnerRegister WinnerRegister
93101
oracle Oracle
94102
evtMgr events.EventManager
95-
l1BlockCache *lru.Cache[uint64, map[string]int]
103+
l1BlockCache *lru.Cache[uint64, map[string]TxMetadata]
96104
encryptedCmts chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored
97105
openedCmts chan *preconf.PreconfcommitmentstoreCommitmentStored
98106
currentWindow atomic.Int64
99107
metrics *metrics
108+
receiptBatcher txmonitor.BatchReceiptGetter
100109
}
101110

102111
func NewUpdater(
@@ -105,8 +114,9 @@ func NewUpdater(
105114
winnerRegister WinnerRegister,
106115
evtMgr events.EventManager,
107116
oracle Oracle,
117+
receiptBatcher txmonitor.BatchReceiptGetter,
108118
) (*Updater, error) {
109-
l1BlockCache, err := lru.New[uint64, map[string]int](1024)
119+
l1BlockCache, err := lru.New[uint64, map[string]TxMetadata](1024)
110120
if err != nil {
111121
return nil, fmt.Errorf("failed to create L1 block cache: %w", err)
112122
}
@@ -117,6 +127,7 @@ func NewUpdater(
117127
winnerRegister: winnerRegister,
118128
evtMgr: evtMgr,
119129
oracle: oracle,
130+
receiptBatcher: receiptBatcher,
120131
metrics: newMetrics(),
121132
openedCmts: make(chan *preconf.PreconfcommitmentstoreCommitmentStored),
122133
encryptedCmts: make(chan *preconf.PreconfcommitmentstoreEncryptedCommitmentStored),
@@ -205,7 +216,8 @@ func (u *Updater) Start(ctx context.Context) <-chan struct{} {
205216
go func() {
206217
defer close(doneChan)
207218
if err := eg.Wait(); err != nil {
208-
u.logger.Error("failed to start updater", "error", err)
219+
u.logger.Error("updater failed, exiting", "error", err)
220+
panic(err)
209221
}
210222
}()
211223

@@ -316,7 +328,6 @@ func (u *Updater) handleOpenedCommitment(
316328
)
317329
return err
318330
}
319-
320331
// Compute the decay percentage
321332
decayPercentage := u.computeDecayPercentage(
322333
update.DecayStartTimeStamp,
@@ -327,17 +338,19 @@ func (u *Updater) handleOpenedCommitment(
327338
commitmentTxnHashes := strings.Split(update.TxnHash, ",")
328339
// Ensure Bundle is atomic and present in the block
329340
for i := 0; i < len(commitmentTxnHashes); i++ {
330-
posInBlock, found := txns[commitmentTxnHashes[i]]
331-
if !found || posInBlock != txns[commitmentTxnHashes[0]]+i {
341+
txnDetails, found := txns[commitmentTxnHashes[i]]
342+
if !found || txnDetails.PosInBlock != (txns[commitmentTxnHashes[0]].PosInBlock)+i || !txnDetails.Succeeded {
332343
u.logger.Info(
333-
"bundle is not atomic",
344+
"bundle does not satsify commited requirements",
334345
"commitmentIdx", common.Bytes2Hex(update.CommitmentIndex[:]),
335346
"txnHash", update.TxnHash,
336347
"blockNumber", update.BlockNumber,
337348
"found", found,
338-
"posInBlock", posInBlock,
339-
"expectedPosInBlock", txns[commitmentTxnHashes[0]]+i,
349+
"posInBlock", txnDetails.PosInBlock,
350+
"succeeded", txnDetails.Succeeded,
351+
"expectedPosInBlock", txns[commitmentTxnHashes[0]].PosInBlock+i,
340352
)
353+
341354
// The committer did not include the transactions in the block
342355
// correctly, so this is a slash to be processed
343356
return u.settle(
@@ -450,28 +463,95 @@ func (u *Updater) addSettlement(
450463

451464
return nil
452465
}
453-
454-
func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]int, error) {
466+
func (u *Updater) getL1Txns(ctx context.Context, blockNum uint64) (map[string]TxMetadata, error) {
455467
txns, ok := u.l1BlockCache.Get(blockNum)
456468
if ok {
457469
u.metrics.BlockTxnCacheHits.Inc()
470+
u.logger.Info("cache hit for block transactions", "blockNum", blockNum)
458471
return txns, nil
459472
}
460473

461474
u.metrics.BlockTxnCacheMisses.Inc()
475+
u.logger.Info("cache miss for block transactions", "blockNum", blockNum)
462476

463-
blk, err := u.l1Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum))
477+
block, err := u.l1Client.BlockByNumber(ctx, big.NewInt(0).SetUint64(blockNum))
464478
if err != nil {
479+
u.logger.Error("failed to get block by number", "blockNum", blockNum, "error", err)
465480
return nil, fmt.Errorf("failed to get block by number: %w", err)
466481
}
467482

468-
txnsInBlock := make(map[string]int)
469-
for posInBlock, tx := range blk.Transactions() {
470-
txnsInBlock[strings.TrimPrefix(tx.Hash().Hex(), "0x")] = posInBlock
483+
u.logger.Info("retrieved block", "blockNum", blockNum, "blockHash", block.Hash().Hex())
484+
485+
var txnReceipts sync.Map
486+
eg, ctx := errgroup.WithContext(ctx)
487+
488+
txnsArray := make([]common.Hash, len(block.Transactions()))
489+
for i, tx := range block.Transactions() {
490+
txnsArray[i] = tx.Hash()
471491
}
472-
_ = u.l1BlockCache.Add(blockNum, txnsInBlock)
492+
const bucketSize = 25 // Arbitrary number for bucket size
493+
494+
numBuckets := (len(txnsArray) + bucketSize - 1) / bucketSize // Calculate the number of buckets needed, rounding up
495+
buckets := make([][]common.Hash, numBuckets)
496+
for i := 0; i < numBuckets; i++ {
497+
start := i * bucketSize
498+
end := start + bucketSize
499+
if end > len(txnsArray) {
500+
end = len(txnsArray)
501+
}
502+
buckets[i] = txnsArray[start:end]
503+
}
504+
505+
blockStart := time.Now()
506+
507+
for _, bucket := range buckets {
508+
eg.Go(func() error {
509+
start := time.Now()
510+
u.logger.Info("requesting batch receipts", "bucketSize", len(bucket))
511+
results, err := u.receiptBatcher.BatchReceipts(ctx, bucket)
512+
if err != nil {
513+
u.logger.Error("failed to get batch receipts", "error", err)
514+
return fmt.Errorf("failed to get batch receipts: %w", err)
515+
}
516+
u.metrics.TxnReceiptRequestDuration.Observe(time.Since(start).Seconds())
517+
u.logger.Info("received batch receipts", "duration", time.Since(start).Seconds())
518+
for _, result := range results {
519+
if result.Err != nil {
520+
u.logger.Error("failed to get receipt for txn", "txnHash", result.Receipt.TxHash.Hex(), "error", result.Err)
521+
return fmt.Errorf("failed to get receipt for txn: %s", result.Err)
522+
}
523+
524+
txnReceipts.Store(result.Receipt.TxHash.Hex(), result.Receipt)
525+
u.logger.Info("stored receipt", "txnHash", result.Receipt.TxHash.Hex())
526+
}
527+
528+
return nil
529+
})
530+
}
531+
532+
if err := eg.Wait(); err != nil {
533+
u.logger.Error("error while waiting for batch receipts", "error", err)
534+
return nil, err
535+
}
536+
537+
u.metrics.TxnReceiptRequestBlockDuration.Observe(time.Since(blockStart).Seconds())
538+
u.logger.Info("completed batch receipt requests for block", "blockNum", blockNum, "duration", time.Since(blockStart).Seconds())
539+
540+
txnsMap := make(map[string]TxMetadata)
541+
for i, tx := range txnsArray {
542+
receipt, ok := txnReceipts.Load(tx.Hex())
543+
if !ok {
544+
u.logger.Error("receipt not found for txn", "txnHash", tx.Hex())
545+
return nil, fmt.Errorf("receipt not found for txn: %s", tx)
546+
}
547+
txnsMap[strings.TrimPrefix(tx.Hex(), "0x")] = TxMetadata{PosInBlock: i, Succeeded: receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful}
548+
u.logger.Info("added txn to map", "txnHash", tx.Hex(), "posInBlock", i, "succeeded", receipt.(*types.Receipt).Status == types.ReceiptStatusSuccessful)
549+
}
550+
551+
_ = u.l1BlockCache.Add(blockNum, txnsMap)
552+
u.logger.Info("added block transactions to cache", "blockNum", blockNum)
473553

474-
return txnsInBlock, nil
554+
return txnsMap, nil
475555
}
476556

477557
// computeDecayPercentage takes startTimestamp, endTimestamp, commitTimestamp and computes a linear decay percentage

0 commit comments

Comments
 (0)