Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6b7cb76
add watermark manager and integrate with evmrpc
jewei1997 Oct 13, 2025
c89d42b
Merge branch 'main' into watermark-evmrpc
jewei1997 Oct 13, 2025
db2f4fd
add tests for watermark manager
jewei1997 Oct 13, 2025
4ff9050
fix fmt
jewei1997 Oct 13, 2025
13cc586
Merge branch 'main' into watermark-evmrpc
jewei1997 Oct 14, 2025
8fb63b6
fix stalling block number due to empty block
jewei1997 Oct 14, 2025
da56f3f
remove changes from EVMCompatabilityTest.js
jewei1997 Oct 14, 2025
12a96b2
minor fix
jewei1997 Oct 14, 2025
f887228
minor fix
jewei1997 Oct 14, 2025
5760a11
increase test coverage
jewei1997 Oct 14, 2025
51e8db0
add more test coverage
jewei1997 Oct 14, 2025
9b747f8
add more test coverage for unavail height
jewei1997 Oct 14, 2025
3ad2627
Merge branch 'main' into watermark-evmrpc
jewei1997 Oct 14, 2025
684779e
fix changelog
jewei1997 Oct 15, 2025
51a5d3e
fix suggestions
jewei1997 Oct 15, 2025
055d08e
remove redundant uses of ensureHeightAvailable
jewei1997 Oct 15, 2025
2da9bbe
Merge branch 'main' into watermark-evmrpc
jewei1997 Oct 15, 2025
efaed6f
set latest version in ss store even if no changes
jewei1997 Oct 15, 2025
5070985
remove fallbacks in earlest and latest height
jewei1997 Oct 15, 2025
7a97af0
no longer support sync flush of receipts
jewei1997 Oct 15, 2025
328bf99
Merge branch 'main' into watermark-evmrpc
jewei1997 Oct 15, 2025
e5e3117
use WaitForReceipt in x/evm
jewei1997 Oct 15, 2025
95d213e
fast fail in logs if out of bounds
jewei1997 Oct 15, 2025
1cbdbae
minor fix
jewei1997 Oct 15, 2025
032f794
split up state and block/receipt earliest
jewei1997 Oct 15, 2025
0c74333
Merge branch 'main' into watermark-evmrpc
jewei1997 Oct 16, 2025
3b181b0
Merge branch 'main' into watermark-evmrpc
jewei1997 Oct 27, 2025
9c745f3
Merge branch 'main' into watermark-evmrpc
jewei1997 Oct 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions evmrpc/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ type BlockAPI struct {
namespace string
includeShellReceipts bool
includeBankTransfers bool
watermarks *WatermarkManager
}

type SeiBlockAPI struct {
*BlockAPI
isPanicTx func(ctx context.Context, hash common.Hash) (bool, error)
}

func NewBlockAPI(tmClient rpcclient.Client, k *keeper.Keeper, ctxProvider func(int64) sdk.Context, txConfigProvider func(int64) client.TxConfig, connectionType ConnectionType) *BlockAPI {
func NewBlockAPI(tmClient rpcclient.Client, k *keeper.Keeper, ctxProvider func(int64) sdk.Context, txConfigProvider func(int64) client.TxConfig, connectionType ConnectionType, watermarks *WatermarkManager) *BlockAPI {
return &BlockAPI{
tmClient: tmClient,
keeper: k,
Expand All @@ -60,6 +61,7 @@ func NewBlockAPI(tmClient rpcclient.Client, k *keeper.Keeper, ctxProvider func(i
includeShellReceipts: false,
includeBankTransfers: false,
namespace: EthNamespace,
watermarks: watermarks,
}
}

Expand All @@ -70,6 +72,7 @@ func NewSeiBlockAPI(
txConfigProvider func(int64) client.TxConfig,
connectionType ConnectionType,
isPanicTx func(ctx context.Context, hash common.Hash) (bool, error),
watermarks *WatermarkManager,
) *SeiBlockAPI {
blockAPI := &BlockAPI{
tmClient: tmClient,
Expand All @@ -80,6 +83,7 @@ func NewSeiBlockAPI(
includeShellReceipts: true,
includeBankTransfers: false,
namespace: SeiNamespace,
watermarks: watermarks,
}
return &SeiBlockAPI{
BlockAPI: blockAPI,
Expand All @@ -94,8 +98,9 @@ func NewSei2BlockAPI(
txConfigProvider func(int64) client.TxConfig,
connectionType ConnectionType,
isPanicTx func(ctx context.Context, hash common.Hash) (bool, error),
watermarks *WatermarkManager,
) *SeiBlockAPI {
blockAPI := NewSeiBlockAPI(tmClient, k, ctxProvider, txConfigProvider, connectionType, isPanicTx)
blockAPI := NewSeiBlockAPI(tmClient, k, ctxProvider, txConfigProvider, connectionType, isPanicTx, watermarks)
blockAPI.namespace = Sei2Namespace
blockAPI.includeBankTransfers = true
return blockAPI
Expand All @@ -118,10 +123,13 @@ func (a *BlockAPI) GetBlockTransactionCountByNumber(ctx context.Context, number
if err != nil {
return nil, err
}
block, err := blockByNumberWithRetry(ctx, a.tmClient, numberPtr, 1)
block, err := blockByNumberRespectingWatermarks(ctx, a.tmClient, a.watermarks, numberPtr, 1)
if err != nil {
return nil, err
}
if err := a.ensureHeightAvailable(ctx, block.Block.Height); err != nil {
Comment thread
jewei1997 marked this conversation as resolved.
Outdated
return nil, err
}
return a.getEvmTxCount(block.Block.Txs, block.Block.Height), nil
}

Expand All @@ -132,6 +140,9 @@ func (a *BlockAPI) GetBlockTransactionCountByHash(ctx context.Context, blockHash
if err != nil {
return nil, err
}
if err := a.ensureHeightAvailable(ctx, block.Block.Height); err != nil {
return nil, err
}
return a.getEvmTxCount(block.Block.Txs, block.Block.Height), nil
}

Expand All @@ -147,6 +158,9 @@ func (a *BlockAPI) getBlockByHash(ctx context.Context, blockHash common.Hash, fu
if err != nil {
return nil, err
}
if err := a.ensureHeightAvailable(ctx, block.Block.Height); err != nil {
return nil, err
}

// Validate EVM block height for pacific-1 chain
sdkCtx := a.ctxProvider(LatestCtxHeight)
Expand Down Expand Up @@ -212,17 +226,27 @@ func (a *BlockAPI) getBlockByNumber(
}
}

block, err := blockByNumberWithRetry(ctx, a.tmClient, numberPtr, 1)
block, err := blockByNumberRespectingWatermarks(ctx, a.tmClient, a.watermarks, numberPtr, 1)
if err != nil {
return nil, err
}
if err := a.ensureHeightAvailable(ctx, block.Block.Height); err != nil {
Comment thread
jewei1997 marked this conversation as resolved.
Outdated
return nil, err
}
blockRes, err := blockResultsWithRetry(ctx, a.tmClient, &block.Block.Height)
if err != nil {
return nil, err
}
return EncodeTmBlock(a.ctxProvider, a.txConfigProvider, block, blockRes, a.keeper, fullTx, a.includeBankTransfers, includeSyntheticTxs, isPanicTx)
}

func (a *BlockAPI) ensureHeightAvailable(ctx context.Context, height int64) error {
if a.watermarks == nil {
return nil
}
return a.watermarks.EnsureHeightAvailable(ctx, height)
}

func (a *BlockAPI) GetBlockReceipts(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (result []map[string]interface{}, returnErr error) {
startTime := time.Now()
defer recordMetricsWithError(fmt.Sprintf("%s_getBlockReceipts", a.namespace), a.connectionType, startTime, returnErr)
Expand All @@ -232,13 +256,16 @@ func (a *BlockAPI) GetBlockReceipts(ctx context.Context, blockNrOrHash rpc.Block
return nil, err
}

block, err := blockByNumberWithRetry(ctx, a.tmClient, heightPtr, 1)
block, err := blockByNumberRespectingWatermarks(ctx, a.tmClient, a.watermarks, heightPtr, 1)
if err != nil {
return nil, err
}

// Get all tx hashes for the block
height := block.Block.Height
if err := a.ensureHeightAvailable(ctx, height); err != nil {
return nil, err
}
txHashes := getTxHashesFromBlock(a.ctxProvider, a.txConfigProvider, a.keeper, block, shouldIncludeSynthetic(a.namespace))
// Get tx receipts for all hashes in parallel
wg := sync.WaitGroup{}
Expand Down
10 changes: 10 additions & 0 deletions evmrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@
// controls whether to synchronously flush receipts before block finalze or not
FlushReceiptSync bool `mapstructure:"flush_receipt_sync"`

// DisableWatermark disables cross-store watermark gating (intended for tests)
Comment thread
jewei1997 marked this conversation as resolved.
Outdated
DisableWatermark bool `mapstructure:"disable_watermark"`

// Deny list defines list of methods that EVM RPC should fail fast
DenyList []string `mapstructure:"deny_list"`

Expand Down Expand Up @@ -126,6 +129,7 @@
MaxTxPoolTxs: 1000,
Slow: false,
FlushReceiptSync: false,
DisableWatermark: false,

Check failure on line 132 in evmrpc/config.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofmt)
DenyList: make([]string, 0),
MaxLogNoBlock: 10000,
MaxBlocksForLog: 2000,
Expand Down Expand Up @@ -156,6 +160,7 @@
flagCheckTxTimeout = "evm.checktx_timeout"
flagSlow = "evm.slow"
FlagFlushReceiptSync = "evm.flush_receipt_sync"
flagDisableWatermark = "evm.disable_watermark"
flagDenyList = "evm.deny_list"
flagMaxLogNoBlock = "evm.max_log_no_block"
flagMaxBlocksForLog = "evm.max_blocks_for_log"
Expand Down Expand Up @@ -256,6 +261,11 @@
return cfg, err
}
}
if v := opts.Get(flagDisableWatermark); v != nil {
if cfg.DisableWatermark, err = cast.ToBoolE(v); err != nil {
return cfg, err
}
}
if v := opts.Get(flagDenyList); v != nil {
if cfg.DenyList, err = cast.ToStringSliceE(v); err != nil {
return cfg, err
Expand Down
5 changes: 5 additions & 0 deletions evmrpc/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type opts struct {
maxTxPoolTxs interface{}
slow interface{}
flushReceiptSync interface{}
disableWatermark interface{}
denyList interface{}
maxLogNoBlock interface{}
maxBlocksForLog interface{}
Expand Down Expand Up @@ -90,6 +91,9 @@ func (o *opts) Get(k string) interface{} {
if k == "evm.flush_receipt_sync" {
return o.flushReceiptSync
}
if k == "evm.disable_watermark" {
return o.disableWatermark
}
if k == "evm.deny_list" {
return o.denyList
}
Expand Down Expand Up @@ -142,6 +146,7 @@ func TestReadConfig(t *testing.T) {
1000,
false,
false,
false,
make([]string, 0),
20000,
1000,
Expand Down
108 changes: 103 additions & 5 deletions evmrpc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func NewFilterAPI(
dbReadSemaphore chan struct{},
globalBlockCache BlockCache,
globalLogSlicePool *LogSlicePool,
watermarks *WatermarkManager,
) *FilterAPI {
if filterConfig.maxBlock <= 0 {
filterConfig.maxBlock = DefaultMaxBlockRange
Expand All @@ -281,6 +282,7 @@ func NewFilterAPI(
dbReadSemaphore: dbReadSemaphore,
globalBlockCache: globalBlockCache,
globalLogSlicePool: globalLogSlicePool,
watermarks: watermarks,
}
filters := make(map[ethrpc.ID]filter)
api := &FilterAPI{
Expand Down Expand Up @@ -643,10 +645,18 @@ type LogFetcher struct {
globalBlockCache BlockCache
cacheCreationMutex sync.Mutex
globalLogSlicePool *LogSlicePool
watermarks *WatermarkManager
}

func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64) (res []*ethtypes.Log, end int64, err error) {
latest := f.ctxProvider(LatestCtxHeight).BlockHeight()
latest, err := f.latestHeight(ctx)
if err != nil {
return nil, 0, err
}
earliest, err := f.earliestHeight(ctx)
if err != nil {
earliest = 0
}
begin, end := latest, latest
if crit.FromBlock != nil {
begin = getHeightFromBigIntBlockNumber(latest, crit.FromBlock)
Expand All @@ -660,6 +670,18 @@ func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCr
if lastToHeight > begin {
begin = lastToHeight
}
if begin < earliest {
begin = earliest
}
if end > latest {
end = latest
}
if end < earliest {
end = earliest
}
if begin > end {
return []*ethtypes.Log{}, end, nil
}

blockRange := end - begin + 1

Expand Down Expand Up @@ -805,6 +827,47 @@ func (f *LogFetcher) mergeSortedLogs(batches [][]*ethtypes.Log) []*ethtypes.Log
return res
}

func (f *LogFetcher) ensureHeightAvailable(ctx context.Context, height int64) error {
if f.watermarks == nil {
return nil
}
return f.watermarks.EnsureHeightAvailable(ctx, height)
}
Comment thread
jewei1997 marked this conversation as resolved.
Outdated

func (f *LogFetcher) latestHeight(ctx context.Context) (int64, error) {
if f.watermarks != nil {
return f.watermarks.LatestHeight(ctx)
}
if f.ctxProvider == nil {
return 0, fmt.Errorf("ctx provider not configured")
}
return f.ctxProvider(LatestCtxHeight).BlockHeight(), nil
}

func (f *LogFetcher) earliestHeight(ctx context.Context) (int64, error) {
if f.watermarks != nil {
return f.watermarks.EarliestHeight(ctx)
Comment thread
jewei1997 marked this conversation as resolved.
Outdated
}
if f.ctxProvider == nil {
return 0, fmt.Errorf("ctx provider not configured")
}
storeCtx := f.ctxProvider(LatestCtxHeight)
ms := storeCtx.MultiStore()
if ms == nil {
return 0, nil
}
earliest := int64(0)
func() {
defer func() {
if r := recover(); r != nil {
earliest = 0
}
}()
earliest = ms.GetEarliestVersion()
}()
return earliest, nil
}

// Pooled version that reuses slice allocation
func (f *LogFetcher) GetLogsForBlockPooled(block *coretypes.ResultBlock, crit filters.FilterCriteria, filters [][]bloomIndexes, result *[]*ethtypes.Log) {
collector := &pooledCollector{logs: result}
Expand Down Expand Up @@ -894,14 +957,26 @@ func (f *LogFetcher) fetchBlocksByCrit(ctx context.Context, crit filters.FilterC
close(res)
return res, 0, false, nil
}
if err := f.ensureHeightAvailable(ctx, block.Block.Height); err != nil {
res := make(chan *coretypes.ResultBlock)
close(res)
return res, 0, false, nil
}
res := make(chan *coretypes.ResultBlock, 1)
res <- block
close(res)
return res, 0, false, nil
}

applyOpenEndedLogLimit := f.filterConfig.maxLog > 0 && (crit.FromBlock == nil || crit.ToBlock == nil)
latest := f.ctxProvider(LatestCtxHeight).BlockHeight()
latest, err := f.latestHeight(ctx)
if err != nil {
return nil, 0, false, err
}
earliest, err := f.earliestHeight(ctx)
if err != nil {
earliest = 0
}
begin, end := latest, latest
if crit.FromBlock != nil {
begin = getHeightFromBigIntBlockNumber(latest, crit.FromBlock)
Expand All @@ -915,12 +990,21 @@ func (f *LogFetcher) fetchBlocksByCrit(ctx context.Context, crit filters.FilterC
if lastToHeight > begin {
begin = lastToHeight
}
if begin < earliest {
begin = earliest
}
if end > latest {
end = latest
}
if end < earliest {
end = earliest
}

blockRange := end - begin + 1
if applyOpenEndedLogLimit && blockRange > f.filterConfig.maxBlock {
begin = end - f.filterConfig.maxBlock + 1
if begin < 1 {
begin = 1
if begin < earliest {
begin = earliest
}
} else if !applyOpenEndedLogLimit && f.filterConfig.maxBlock > 0 && blockRange > f.filterConfig.maxBlock {
// Use consistent error message format
Expand Down Expand Up @@ -988,6 +1072,11 @@ func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit fi

// check cache first, without holding the semaphore
if cachedEntry, found := f.globalBlockCache.Get(height); found {
if cachedEntry.Block != nil {
if err := f.ensureHeightAvailable(ctx, cachedEntry.Block.Block.Height); err != nil {
continue
}
}
res <- cachedEntry.Block
continue
}
Expand All @@ -998,6 +1087,11 @@ func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit fi
// Re-check cache after acquiring semaphore, in case another worker cached it.
if cachedEntry, found := f.globalBlockCache.Get(height); found {
<-f.dbReadSemaphore
if cachedEntry.Block != nil {
if err := f.ensureHeightAvailable(ctx, cachedEntry.Block.Block.Height); err != nil {
continue
}
}
res <- cachedEntry.Block
continue
}
Expand All @@ -1020,7 +1114,7 @@ func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit fi
}

// fetch block from network
block, err := blockByNumberWithRetry(ctx, f.tmClient, &height, 1)
block, err := blockByNumberRespectingWatermarks(ctx, f.tmClient, f.watermarks, &height, 1)
if err != nil {
select {
case errChan <- fmt.Errorf("failed to fetch block at height %d: %w", height, err):
Expand All @@ -1029,6 +1123,10 @@ func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit fi
<-f.dbReadSemaphore
continue
}
if err := f.ensureHeightAvailable(ctx, block.Block.Height); err != nil {
<-f.dbReadSemaphore
continue
}

// Use LoadOrStore to create/get cache entry atomically
entry := loadOrStoreCacheEntry(&f.cacheCreationMutex, f.globalBlockCache, height, block)
Expand Down
Loading
Loading