Skip to content

Commit df8c2bb

Browse files
authored
Tx index for Parquet receipt store (#3222)
## Describe your changes and provide context Create a tx index to store (tx hash -> block number) to allow parquet to quickly serve getReceiptByHash queries. ## Testing performed to validate your change unit tests
1 parent 786863f commit df8c2bb

21 files changed

Lines changed: 920 additions & 86 deletions

sei-db/config/receipt_config.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,21 @@ const (
2020
flagRSAsyncWriteBuffer = "receipt-store.async-write-buffer"
2121
flagRSKeepRecent = "receipt-store.keep-recent"
2222
flagRSPruneIntervalSeconds = "receipt-store.prune-interval-seconds"
23+
flagRSTxIndexBackend = "receipt-store.tx-index-backend"
24+
25+
ReceiptTxIndexBackendNone = ""
26+
ReceiptTxIndexBackendPebble = "pebbledb"
2327
)
2428

29+
func NormalizeReceiptTxIndexBackend(backend string) string {
30+
switch strings.ToLower(strings.TrimSpace(backend)) {
31+
case "pebbledb":
32+
return ReceiptTxIndexBackendPebble
33+
default:
34+
return ReceiptTxIndexBackendNone
35+
}
36+
}
37+
2538
// ReceiptStoreConfig defines configuration for the receipt store database.
2639
type ReceiptStoreConfig struct {
2740
// DBDirectory defines the directory to store the receipt store db files
@@ -49,10 +62,13 @@ type ReceiptStoreConfig struct {
4962
// default to every 600 seconds
5063
PruneIntervalSeconds int `mapstructure:"prune-interval-seconds"`
5164

52-
// DisableTxIndexLookup must remain true. The tx_hash -> block_number lookup
53-
// implementation is intentionally unsupported; setting this to false will
54-
// panic during parquet store initialization.
55-
DisableTxIndexLookup bool `mapstructure:"disable-tx-index-lookup"`
65+
// TxIndexBackend selects the tx-hash index implementation used by the
66+
// parquet receipt store. Set to "pebbledb" (the default) to maintain a
67+
// Pebble-backed tx_hash -> block_number index alongside parquet files so
68+
// receipt-by-hash lookups can target a single file instead of scanning all
69+
// files. Set to "" to disable the index and fall back to a full DuckDB scan.
70+
// Ignored when the receipt backend is not parquet.
71+
TxIndexBackend string `mapstructure:"tx-index-backend"`
5672
}
5773

5874
// DefaultReceiptStoreConfig returns the default ReceiptStoreConfig
@@ -62,7 +78,7 @@ func DefaultReceiptStoreConfig() ReceiptStoreConfig {
6278
AsyncWriteBuffer: DefaultSSAsyncBuffer,
6379
KeepRecent: DefaultSSKeepRecent,
6480
PruneIntervalSeconds: DefaultSSPruneInterval,
65-
DisableTxIndexLookup: true,
81+
TxIndexBackend: ReceiptTxIndexBackendPebble,
6682
}
6783
}
6884

@@ -113,5 +129,12 @@ func ReadReceiptConfig(opts AppOptions) (ReceiptStoreConfig, error) {
113129
}
114130
cfg.PruneIntervalSeconds = pruneIntervalSeconds
115131
}
132+
if v := opts.Get(flagRSTxIndexBackend); v != nil {
133+
txIndexBackend, err := cast.ToStringE(v)
134+
if err != nil {
135+
return cfg, err
136+
}
137+
cfg.TxIndexBackend = NormalizeReceiptTxIndexBackend(txIndexBackend)
138+
}
116139
return cfg, nil
117140
}

sei-db/config/receipt_config_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,30 @@ func TestReadReceiptConfigRejectsMisnamedBackendKey(t *testing.T) {
2121
require.ErrorContains(t, err, "receipt-store.backend")
2222
require.ErrorContains(t, err, "receipt-store.rs-backend")
2323
}
24+
25+
func TestReadReceiptConfigTxIndexBackendOverride(t *testing.T) {
26+
cfg, err := ReadReceiptConfig(mapAppOpts{
27+
"receipt-store.tx-index-backend": "",
28+
})
29+
30+
require.NoError(t, err)
31+
require.Equal(t, "", cfg.TxIndexBackend)
32+
}
33+
34+
func TestReadReceiptConfigAcceptsPebbleDBTxIndexBackend(t *testing.T) {
35+
cfg, err := ReadReceiptConfig(mapAppOpts{
36+
"receipt-store.tx-index-backend": "pebbledb",
37+
})
38+
39+
require.NoError(t, err)
40+
require.Equal(t, ReceiptTxIndexBackendPebble, cfg.TxIndexBackend)
41+
}
42+
43+
func TestReadReceiptConfigUnknownTxIndexBackendDefaultsToNone(t *testing.T) {
44+
cfg, err := ReadReceiptConfig(mapAppOpts{
45+
"receipt-store.tx-index-backend": "rocksdb",
46+
})
47+
48+
require.NoError(t, err)
49+
require.Equal(t, ReceiptTxIndexBackendNone, cfg.TxIndexBackend)
50+
}

sei-db/config/toml.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ keep-recent = {{ .ReceiptStore.KeepRecent }}
160160
# PruneIntervalSeconds defines the interval in seconds to trigger pruning.
161161
# defaults to 600 seconds
162162
prune-interval-seconds = {{ .ReceiptStore.PruneIntervalSeconds }}
163+
164+
# TxIndexBackend selects the tx-hash index implementation for parquet receipts.
165+
# Set to "pebbledb" to enable the index, or "" to disable it.
166+
# Ignored unless rs-backend = "parquet".
167+
tx-index-backend = "{{ .ReceiptStore.TxIndexBackend }}"
163168
`
164169

165170
// DefaultConfigTemplate combines both templates for backward compatibility

sei-db/config/toml_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ func TestReceiptStoreConfigTemplate(t *testing.T) {
117117
require.Contains(t, output, "async-write-buffer =", "Missing async-write-buffer")
118118
require.Contains(t, output, "keep-recent =", "Missing keep-recent")
119119
require.Contains(t, output, "prune-interval-seconds =", "Missing prune-interval-seconds")
120+
require.Contains(t, output, `tx-index-backend = "pebbledb"`, "Missing or incorrect tx-index-backend")
120121
require.Contains(t, output, `Applies only when rs-backend = "pebbledb"`, "Missing pebble-only async-write-buffer note")
121122
require.NotContains(t, output, "use-default-comparer", "use-default-comparer should not be in receipt-store template")
122123
}
@@ -151,6 +152,7 @@ func TestDefaultConfigTemplate(t *testing.T) {
151152
require.Contains(t, output, "async-write-buffer =")
152153
require.Contains(t, output, "keep-recent =")
153154
require.Contains(t, output, "prune-interval-seconds =")
155+
require.Contains(t, output, `tx-index-backend = "pebbledb"`)
154156
}
155157

156158
// TestWriteModeValues verifies WriteMode enum values match template output

sei-db/ledger_db/parquet/reader.go

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -321,26 +321,81 @@ func (r *Reader) MaxReceiptBlockNumber(ctx context.Context) (uint64, bool, error
321321
return uint64(max.Int64), true, nil
322322
}
323323

324-
// GetReceiptByTxHash queries for a receipt by transaction hash.
324+
// GetReceiptByTxHash queries for a receipt by transaction hash across all
325+
// closed parquet files (full scan).
325326
func (r *Reader) GetReceiptByTxHash(ctx context.Context, txHash common.Hash) (*ReceiptResult, error) {
326-
// Hold pruneMu first to prevent file deletion, then snapshot the list.
327+
return r.getReceiptByTxHashFromFiles(ctx, txHash, nil)
328+
}
329+
330+
// GetReceiptByTxHashInBlock narrows the search to the parquet file that
331+
// should contain blockNumber, falling back to a full scan on miss.
332+
func (r *Reader) GetReceiptByTxHashInBlock(ctx context.Context, txHash common.Hash, blockNumber uint64) (*ReceiptResult, error) {
333+
// Hold pruneMu across candidate selection and the targeted query so a
334+
// concurrent prune cannot delete the file between fileForBlock and the
335+
// DuckDB read (see getReceiptByTxHashFromFilesLocked).
327336
r.pruneMu.RLock()
328-
defer r.pruneMu.RUnlock()
337+
candidateFile := r.fileForBlock(blockNumber)
338+
339+
var result *ReceiptResult
340+
var err error
341+
if candidateFile != "" {
342+
result, err = r.getReceiptByTxHashFromFilesLocked(ctx, txHash, []string{candidateFile})
343+
}
344+
r.pruneMu.RUnlock()
329345

346+
if err != nil {
347+
return nil, err
348+
}
349+
if result != nil {
350+
return result, nil
351+
}
352+
return r.getReceiptByTxHashFromFiles(ctx, txHash, nil)
353+
}
354+
355+
// fileForBlock returns the receipt parquet file path that should contain
356+
// blockNumber, using the sorted tracked file list. Returns "" if no
357+
// candidate is found.
358+
func (r *Reader) fileForBlock(blockNumber uint64) string {
330359
r.mu.RLock()
331-
closedFiles := make([]string, len(r.closedReceiptFiles))
332-
copy(closedFiles, r.closedReceiptFiles)
333-
r.mu.RUnlock()
360+
defer r.mu.RUnlock()
334361

335-
if len(closedFiles) == 0 {
362+
var best string
363+
for _, f := range r.closedReceiptFiles {
364+
start := ExtractBlockNumber(f)
365+
if start <= blockNumber {
366+
best = f
367+
} else {
368+
break
369+
}
370+
}
371+
return best
372+
}
373+
374+
func (r *Reader) getReceiptByTxHashFromFiles(ctx context.Context, txHash common.Hash, files []string) (*ReceiptResult, error) {
375+
r.pruneMu.RLock()
376+
defer r.pruneMu.RUnlock()
377+
return r.getReceiptByTxHashFromFilesLocked(ctx, txHash, files)
378+
}
379+
380+
// getReceiptByTxHashFromFilesLocked runs the receipt query. The caller must
381+
// hold r.pruneMu.RLock (or otherwise ensure listed files are not deleted).
382+
func (r *Reader) getReceiptByTxHashFromFilesLocked(ctx context.Context, txHash common.Hash, files []string) (*ReceiptResult, error) {
383+
if files == nil {
384+
r.mu.RLock()
385+
files = make([]string, len(r.closedReceiptFiles))
386+
copy(files, r.closedReceiptFiles)
387+
r.mu.RUnlock()
388+
}
389+
390+
if len(files) == 0 {
336391
return nil, nil
337392
}
338393

339394
var parquetFiles string
340-
if len(closedFiles) == 1 {
341-
parquetFiles = quoteSQLString(closedFiles[0])
395+
if len(files) == 1 {
396+
parquetFiles = quoteSQLString(files[0])
342397
} else {
343-
parquetFiles = fmt.Sprintf("[%s]", joinQuoted(closedFiles))
398+
parquetFiles = fmt.Sprintf("[%s]", joinQuoted(files))
344399
}
345400

346401
// #nosec G201 -- parquetFiles derived from local file paths

sei-db/ledger_db/parquet/reader_filter_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@ func TestStoreGetReceiptByTxHashWithoutIndex(t *testing.T) {
102102
}
103103

104104
store, err := NewStore(StoreConfig{
105-
DBDirectory: dir,
106-
MaxBlocksPerFile: 500,
107-
DisableTxIndexLookup: true,
105+
DBDirectory: dir,
106+
MaxBlocksPerFile: 500,
107+
TxIndexBackend: "none",
108108
})
109109
require.NoError(t, err)
110110
defer func() { _ = store.Close() }()

sei-db/ledger_db/parquet/reader_race_test.go

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,10 @@ func TestConcurrentReadsAndPrune(t *testing.T) {
8282
}
8383

8484
store, err := NewStore(StoreConfig{
85-
DBDirectory: dir,
86-
MaxBlocksPerFile: 500,
87-
KeepRecent: 600,
88-
DisableTxIndexLookup: true,
85+
DBDirectory: dir,
86+
MaxBlocksPerFile: 500,
87+
KeepRecent: 600,
88+
TxIndexBackend: "none",
8989
})
9090
require.NoError(t, err)
9191
t.Cleanup(func() { _ = store.Close() })
@@ -126,6 +126,56 @@ func TestConcurrentReadsAndPrune(t *testing.T) {
126126
require.NoError(t, g.Wait())
127127
}
128128

129+
// TestConcurrentTargetedReadsAndPrune exercises GetReceiptByTxHashInBlock
130+
// (single-file candidate path) concurrently with pruning. Without holding
131+
// pruneMu across fileForBlock and the targeted query, a prune could delete
132+
// the candidate file between those steps and DuckDB would error.
133+
func TestConcurrentTargetedReadsAndPrune(t *testing.T) {
134+
dir := t.TempDir()
135+
136+
for _, start := range []uint64{0, 500, 1000} {
137+
require.NoError(t, createTestReceiptFile(dir, start, 500))
138+
}
139+
140+
store, err := NewStore(StoreConfig{
141+
DBDirectory: dir,
142+
MaxBlocksPerFile: 500,
143+
KeepRecent: 600,
144+
TxIndexBackend: "none",
145+
})
146+
require.NoError(t, err)
147+
t.Cleanup(func() { _ = store.Close() })
148+
149+
ctx := context.Background()
150+
txHash := common.BigToHash(new(big.Int).SetUint64(250))
151+
152+
result, err := store.GetReceiptByTxHashInBlock(ctx, txHash, 250)
153+
require.NoError(t, err)
154+
require.NotNil(t, result)
155+
156+
const numReaders = 32
157+
const readsPerReader = 100
158+
159+
g, _ := errgroup.WithContext(ctx)
160+
for i := 0; i < numReaders; i++ {
161+
g.Go(func() error {
162+
for j := 0; j < readsPerReader; j++ {
163+
if _, err := store.GetReceiptByTxHashInBlock(ctx, txHash, 250); err != nil {
164+
return fmt.Errorf("GetReceiptByTxHashInBlock(250): %w", err)
165+
}
166+
}
167+
return nil
168+
})
169+
}
170+
171+
g.Go(func() error {
172+
store.PruneOldFiles(600)
173+
return nil
174+
})
175+
176+
require.NoError(t, g.Wait())
177+
}
178+
129179
// TestOnFileRotationNotBlockedByPruneMu verifies the structural property
130180
// that OnFileRotation only acquires mu (the file-list lock), never pruneMu
131181
// (the file-lifetime lock). We hold pruneMu.RLock to simulate in-flight
@@ -135,9 +185,9 @@ func TestOnFileRotationNotBlockedByPruneMu(t *testing.T) {
135185
require.NoError(t, createTestReceiptFile(dir, 0, 1))
136186

137187
store, err := NewStore(StoreConfig{
138-
DBDirectory: dir,
139-
MaxBlocksPerFile: 500,
140-
DisableTxIndexLookup: true,
188+
DBDirectory: dir,
189+
MaxBlocksPerFile: 500,
190+
TxIndexBackend: "none",
141191
})
142192
require.NoError(t, err)
143193
t.Cleanup(func() { _ = store.Close() })
@@ -170,10 +220,10 @@ func TestConcurrentReadsPruneAndRotation(t *testing.T) {
170220
}
171221

172222
store, err := NewStore(StoreConfig{
173-
DBDirectory: dir,
174-
MaxBlocksPerFile: 500,
175-
KeepRecent: 1000,
176-
DisableTxIndexLookup: true,
223+
DBDirectory: dir,
224+
MaxBlocksPerFile: 500,
225+
KeepRecent: 1000,
226+
TxIndexBackend: "none",
177227
})
178228
require.NoError(t, err)
179229
t.Cleanup(func() { _ = store.Close() })

sei-db/ledger_db/parquet/store.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/ethereum/go-ethereum/common"
1515
"github.com/parquet-go/parquet-go"
16+
dbconfig "github.com/sei-protocol/sei-chain/sei-db/config"
1617
dbwal "github.com/sei-protocol/sei-chain/sei-db/wal"
1718
"github.com/sei-protocol/seilog"
1819
)
@@ -36,15 +37,15 @@ type StoreConfig struct {
3637
PruneIntervalSeconds int64
3738
BlockFlushInterval uint64
3839
MaxBlocksPerFile uint64
39-
DisableTxIndexLookup bool
40+
TxIndexBackend string
4041
}
4142

4243
// DefaultStoreConfig returns the default store configuration.
4344
func DefaultStoreConfig() StoreConfig {
4445
return StoreConfig{
45-
BlockFlushInterval: defaultBlockFlushInterval,
46-
MaxBlocksPerFile: defaultMaxBlocksPerFile,
47-
DisableTxIndexLookup: true,
46+
BlockFlushInterval: defaultBlockFlushInterval,
47+
MaxBlocksPerFile: defaultMaxBlocksPerFile,
48+
TxIndexBackend: dbconfig.ReceiptTxIndexBackendPebble,
4849
}
4950
}
5051

@@ -104,9 +105,6 @@ type Store struct {
104105
// NewStore creates a new parquet store.
105106
func NewStore(cfg StoreConfig) (*Store, error) {
106107
storeCfg := resolveStoreConfig(cfg)
107-
if !storeCfg.DisableTxIndexLookup {
108-
panic("not implemented")
109-
}
110108

111109
if err := os.MkdirAll(cfg.DBDirectory, 0o750); err != nil {
112110
return nil, fmt.Errorf("failed to create parquet base directory: %w", err)
@@ -156,7 +154,9 @@ func resolveStoreConfig(cfg StoreConfig) StoreConfig {
156154
resolved.DBDirectory = cfg.DBDirectory
157155
resolved.KeepRecent = cfg.KeepRecent
158156
resolved.PruneIntervalSeconds = cfg.PruneIntervalSeconds
159-
resolved.DisableTxIndexLookup = cfg.DisableTxIndexLookup
157+
if cfg.TxIndexBackend != "" {
158+
resolved.TxIndexBackend = cfg.TxIndexBackend
159+
}
160160
if cfg.BlockFlushInterval > 0 {
161161
resolved.BlockFlushInterval = cfg.BlockFlushInterval
162162
}
@@ -200,6 +200,12 @@ func (s *Store) GetReceiptByTxHash(ctx context.Context, txHash common.Hash) (*Re
200200
return s.Reader.GetReceiptByTxHash(ctx, txHash)
201201
}
202202

203+
// GetReceiptByTxHashInBlock narrows the parquet search to the file containing
204+
// blockNumber, falling back to a full scan on miss.
205+
func (s *Store) GetReceiptByTxHashInBlock(ctx context.Context, txHash common.Hash, blockNumber uint64) (*ReceiptResult, error) {
206+
return s.Reader.GetReceiptByTxHashInBlock(ctx, txHash, blockNumber)
207+
}
208+
203209
// GetLogs retrieves logs matching the filter.
204210
func (s *Store) GetLogs(ctx context.Context, filter LogFilter) ([]LogResult, error) {
205211
return s.Reader.GetLogs(ctx, filter)

0 commit comments

Comments
 (0)