Skip to content

Commit b8eda5d

Browse files
authored
refactor(store): improve cache restore perf & fix go-header store errs (#3051)
* perf: improve restore perf * fix expected go-header error * fixes * remove lru cache for store_adapter * optimize * simplify getmetadata * feedback * add tests
1 parent 0a349c0 commit b8eda5d

14 files changed

Lines changed: 1119 additions & 170 deletions

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1717

1818
- Improve `cache.NumPendingData` to not return empty data. Automatically bumps `LastSubmittedHeight` to reflect that. ([#3046](https://github.com/evstack/ev-node/pull/3046))
1919
- **BREAKING** Make pending events cache and tx cache fully ephemeral. Those will be re-fetched on restart. DA Inclusion cache persists until cleared up after DA inclusion has been processed. Persist accross restart using store metadata. ([#3047](https://github.com/evstack/ev-node/pull/3047))
20+
- Replace LRU cache by standard mem cache with manual eviction in `store_adapter`. When P2P blocks were fetched too fast, they would be evicted before being executed [#3051](https://github.com/evstack/ev-node/pull/3051)
2021

2122
## v1.0.0-rc.2
2223

@@ -25,7 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2526
- Improve cache handling when there is a significant backlog of pending headers and data. ([#3030](https://github.com/evstack/ev-node/pull/3030))
2627
- Decrease MaxBytesSize to `5MB` to increase compatibility with public nodes. ([#3030](https://github.com/evstack/ev-node/pull/3030))
2728
- Proper counting of `DASubmitterPendingBlobs` metrics. [#3038](https://github.com/evstack/ev-node/pull/3038)
28-
- Replace `go-header` store by `ev-node` store. This avoid duplication of all blocks in `go-header` and `ev-node` store. Thanks to the cached store from #3030, this should improve p2p performance as well.
29+
- Replace `go-header` store by `ev-node` store. This avoid duplication of all blocks in `go-header` and `ev-node` store. Thanks to the cached store from #3030, this should improve p2p performance as well. [#3036](https://github.com/evstack/ev-node/pull/3036)
2930

3031
## v1.0.0-rc.1
3132

block/internal/cache/generic_cache.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import (
44
"context"
55
"encoding/binary"
66
"fmt"
7+
"strings"
78
"sync"
89

910
"sync/atomic"
1011

1112
lru "github.com/hashicorp/golang-lru/v2"
13+
"github.com/rs/zerolog/log"
1214

1315
"github.com/evstack/ev-node/pkg/store"
1416
)
@@ -215,32 +217,40 @@ func (c *Cache[T]) deleteAllForHeight(height uint64) {
215217

216218
// RestoreFromStore loads DA inclusion data from the store into the in-memory cache.
217219
// This should be called during initialization to restore persisted state.
218-
// It iterates through store metadata keys with the cache's prefix and populates the LRU cache.
219-
func (c *Cache[T]) RestoreFromStore(ctx context.Context, hashes []string) error {
220-
if c.store == nil {
221-
return nil // No store configured, nothing to restore
220+
// It directly queries store metadata keys with the cache's prefix, avoiding iteration through all blocks.
221+
func (c *Cache[T]) RestoreFromStore(ctx context.Context) error {
222+
if c.store == nil || c.storeKeyPrefix == "" {
223+
return nil // No store configured or no prefix, nothing to restore
222224
}
223225

224-
for _, hash := range hashes {
225-
value, err := c.store.GetMetadata(ctx, c.storeKey(hash))
226-
if err != nil {
227-
// Key not found is not an error - the hash may not have been DA included yet
226+
// Query all metadata entries with our prefix directly
227+
entries, err := c.store.GetMetadataByPrefix(ctx, c.storeKeyPrefix)
228+
if err != nil {
229+
return fmt.Errorf("failed to query metadata by prefix %q: %w", c.storeKeyPrefix, err)
230+
}
231+
232+
for _, entry := range entries {
233+
// Extract the hash from the key by removing the prefix
234+
hash := strings.TrimPrefix(entry.Key, c.storeKeyPrefix)
235+
if hash == entry.Key || hash == "" {
236+
// Prefix not found or empty hash - skip invalid entry
228237
continue
229238
}
230239

231-
daHeight, blockHeight, ok := decodeDAInclusion(value)
240+
daHeight, blockHeight, ok := decodeDAInclusion(entry.Value)
232241
if !ok {
233-
continue // Invalid data, skip
242+
log.Warn().
243+
Str("key", entry.Key).
244+
Int("value_len", len(entry.Value)).
245+
Msg("skipping invalid DA inclusion entry during cache restore")
246+
continue
234247
}
235248

236249
c.daIncluded.Add(hash, daHeight)
237250
c.hashByHeight.Add(blockHeight, hash)
238251

239252
// Update max DA height
240-
current := c.maxDAHeight.Load()
241-
if daHeight > current {
242-
c.maxDAHeight.Store(daHeight)
243-
}
253+
c.setMaxDAHeight(daHeight)
244254
}
245255

246256
return nil

block/internal/cache/generic_cache_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,7 @@ func TestCache_MaxDAHeight_WithStore(t *testing.T) {
6767
// Create new cache and restore from store
6868
c2 := NewCache[testItem](st, "test/da-included/")
6969

70-
// Restore with the known hashes
71-
err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2", "hash3"})
70+
err = c2.RestoreFromStore(ctx)
7271
require.NoError(t, err)
7372

7473
if got := c2.daHeight(); got != 200 {
@@ -106,7 +105,7 @@ func TestCache_WithStorePersistence(t *testing.T) {
106105
// Create new cache with same store and restore
107106
c2 := NewCache[testItem](st, "test/")
108107

109-
err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2", "hash3"})
108+
err = c2.RestoreFromStore(ctx)
110109
require.NoError(t, err)
111110

112111
// hash1 and hash2 should be restored, hash3 should not exist
@@ -263,7 +262,7 @@ func TestCache_SaveToStore(t *testing.T) {
263262
// Verify data is in store by creating new cache and restoring
264263
c2 := NewCache[testItem](st, "save-test/")
265264

266-
err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2"})
265+
err = c2.RestoreFromStore(ctx)
267266
require.NoError(t, err)
268267

269268
daHeight, ok := c2.getDAIncluded("hash1")

block/internal/cache/manager.go

Lines changed: 6 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -342,54 +342,26 @@ func (m *implementation) SaveToStore() error {
342342
}
343343

344344
// RestoreFromStore restores the DA inclusion cache from the store.
345-
// This iterates through blocks in the store and checks for persisted DA inclusion data.
345+
// This uses prefix-based queries to directly load persisted DA inclusion data,
346+
// avoiding expensive iteration through all blocks.
346347
func (m *implementation) RestoreFromStore() error {
347348
ctx := context.Background()
348349

349-
// Get current store height to know how many blocks to check
350-
height, err := m.store.Height(ctx)
351-
if err != nil {
352-
return fmt.Errorf("failed to get store height: %w", err)
353-
}
354-
355-
if height == 0 {
356-
return nil // No blocks to restore
357-
}
358-
359-
// Collect hashes from stored blocks
360-
var headerHashes []string
361-
var dataHashes []string
362-
363-
for h := uint64(1); h <= height; h++ {
364-
header, data, err := m.store.GetBlockData(ctx, h)
365-
if err != nil {
366-
m.logger.Warn().Uint64("height", h).Err(err).Msg("failed to get block data during cache restore")
367-
continue
368-
}
369-
370-
if header != nil {
371-
headerHashes = append(headerHashes, header.Hash().String())
372-
}
373-
if data != nil {
374-
dataHashes = append(dataHashes, data.DACommitment().String())
375-
}
376-
}
377-
378350
// Restore DA inclusion data from store
379-
if err := m.headerCache.RestoreFromStore(ctx, headerHashes); err != nil {
351+
if err := m.headerCache.RestoreFromStore(ctx); err != nil {
380352
return fmt.Errorf("failed to restore header cache from store: %w", err)
381353
}
382354

383-
if err := m.dataCache.RestoreFromStore(ctx, dataHashes); err != nil {
355+
if err := m.dataCache.RestoreFromStore(ctx); err != nil {
384356
return fmt.Errorf("failed to restore data cache from store: %w", err)
385357
}
386358

387359
// Initialize DA height from store metadata to ensure DaHeight() is never 0.
388360
m.initDAHeightFromStore(ctx)
389361

390362
m.logger.Info().
391-
Int("header_hashes", len(headerHashes)).
392-
Int("data_hashes", len(dataHashes)).
363+
Int("header_entries", m.headerCache.daIncluded.Len()).
364+
Int("data_entries", m.dataCache.daIncluded.Len()).
393365
Uint64("da_height", m.DaHeight()).
394366
Msg("restored DA inclusion cache from store")
395367

pkg/store/data_store_adapter_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ func TestDataStoreAdapter_NewDataStoreAdapter(t *testing.T) {
5858
// Initially, height should be 0
5959
assert.Equal(t, uint64(0), adapter.Height())
6060

61-
// Head should return ErrNotFound when empty
61+
// Head should return ErrEmptyStore when empty
6262
_, err = adapter.Head(ctx)
63-
assert.ErrorIs(t, err, header.ErrNotFound)
63+
assert.ErrorIs(t, err, header.ErrEmptyStore)
6464
}
6565

6666
func TestDataStoreAdapter_AppendAndRetrieve(t *testing.T) {
@@ -289,9 +289,9 @@ func TestDataStoreAdapter_Tail(t *testing.T) {
289289
store := New(ds)
290290
adapter := NewDataStoreAdapter(store, testGenesisData())
291291

292-
// Tail on empty store should return ErrNotFound
292+
// Tail on empty store should return ErrEmptyStore
293293
_, err = adapter.Tail(ctx)
294-
assert.ErrorIs(t, err, header.ErrNotFound)
294+
assert.ErrorIs(t, err, header.ErrEmptyStore)
295295

296296
_, d1 := types.GetRandomBlock(1, 1, "test-chain")
297297
_, d2 := types.GetRandomBlock(2, 1, "test-chain")
@@ -512,9 +512,9 @@ func TestDataStoreAdapter_InitWithNil(t *testing.T) {
512512
err = adapter.Init(ctx, nil)
513513
require.NoError(t, err)
514514

515-
// Should still return ErrNotFound
515+
// Should still return ErrEmptyStore
516516
_, err = adapter.Head(ctx)
517-
assert.ErrorIs(t, err, header.ErrNotFound)
517+
assert.ErrorIs(t, err, header.ErrEmptyStore)
518518
}
519519

520520
func TestDataStoreAdapter_ContextTimeout(t *testing.T) {

pkg/store/header_store_adapter_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ func TestHeaderStoreAdapter_NewHeaderStoreAdapter(t *testing.T) {
5656
// Initially, height should be 0
5757
assert.Equal(t, uint64(0), adapter.Height())
5858

59-
// Head should return ErrNotFound when empty
59+
// Head should return ErrEmptyStore when empty
6060
_, err = adapter.Head(ctx)
61-
assert.ErrorIs(t, err, header.ErrNotFound)
61+
assert.ErrorIs(t, err, header.ErrEmptyStore)
6262
}
6363

6464
func TestHeaderStoreAdapter_AppendAndRetrieve(t *testing.T) {
@@ -287,9 +287,9 @@ func TestHeaderStoreAdapter_Tail(t *testing.T) {
287287
store := New(ds)
288288
adapter := NewHeaderStoreAdapter(store, testGenesis())
289289

290-
// Tail on empty store should return ErrNotFound
290+
// Tail on empty store should return ErrEmptyStore
291291
_, err = adapter.Tail(ctx)
292-
assert.ErrorIs(t, err, header.ErrNotFound)
292+
assert.ErrorIs(t, err, header.ErrEmptyStore)
293293

294294
h1, _ := types.GetRandomBlock(1, 1, "test-chain")
295295
h2, _ := types.GetRandomBlock(2, 1, "test-chain")
@@ -510,9 +510,9 @@ func TestHeaderStoreAdapter_InitWithNil(t *testing.T) {
510510
err = adapter.Init(ctx, nil)
511511
require.NoError(t, err)
512512

513-
// Should still return ErrNotFound
513+
// Should still return ErrEmptyStore
514514
_, err = adapter.Head(ctx)
515-
assert.ErrorIs(t, err, header.ErrNotFound)
515+
assert.ErrorIs(t, err, header.ErrEmptyStore)
516516
}
517517

518518
func TestHeaderStoreAdapter_ContextTimeout(t *testing.T) {

pkg/store/store.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
"encoding/binary"
77
"errors"
88
"fmt"
9+
"strings"
910

1011
ds "github.com/ipfs/go-datastore"
12+
dsq "github.com/ipfs/go-datastore/query"
1113
"google.golang.org/protobuf/proto"
1214

1315
"github.com/evstack/ev-node/types"
@@ -190,6 +192,40 @@ func (s *DefaultStore) GetMetadata(ctx context.Context, key string) ([]byte, err
190192
return data, nil
191193
}
192194

195+
// GetMetadataByPrefix returns all metadata entries whose keys have the given prefix.
196+
// This is more efficient than iterating through known keys when the set of keys is unknown.
197+
func (s *DefaultStore) GetMetadataByPrefix(ctx context.Context, prefix string) ([]MetadataEntry, error) {
198+
// The full key in the datastore includes the meta prefix
199+
fullPrefix := getMetaKey(prefix)
200+
201+
results, err := s.db.Query(ctx, dsq.Query{Prefix: fullPrefix})
202+
if err != nil {
203+
return nil, fmt.Errorf("failed to query metadata with prefix '%s': %w", prefix, err)
204+
}
205+
defer results.Close()
206+
207+
var entries []MetadataEntry
208+
for result := range results.Next() {
209+
if result.Error != nil {
210+
return nil, fmt.Errorf("error iterating metadata results: %w", result.Error)
211+
}
212+
213+
// Extract the original key by removing the meta prefix
214+
// The key from datastore is like "/m/cache/header-da-included/hash"
215+
// We want to return "cache/header-da-included/hash"
216+
metaKeyPrefix := getMetaKey("")
217+
key := strings.TrimPrefix(result.Key, metaKeyPrefix)
218+
key = strings.TrimPrefix(key, "/") // Remove leading slash for consistency
219+
220+
entries = append(entries, MetadataEntry{
221+
Key: key,
222+
Value: result.Value,
223+
})
224+
}
225+
226+
return entries, nil
227+
}
228+
193229
// DeleteMetadata removes a metadata key from the store.
194230
func (s *DefaultStore) DeleteMetadata(ctx context.Context, key string) error {
195231
err := s.db.Delete(ctx, ds.NewKey(getMetaKey(key)))

0 commit comments

Comments
 (0)