Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions sei-db/db_engine/pebbledb/mvcc/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ const (
PruneCommitBatchSize = 50
DeleteCommitBatchSize = 50
MinWALEntriesToKeep = 1000

// maxConcurrentCompactions is the upper bound for the number of compactions
// Pebble may run in parallel. Pebble's default range is {1,1}, but a single
// compactor cannot keep up with the tombstone churn that pruning generates,
// so deleted data accumulates and slows every subsequent prune scan. Allowing
// Pebble to burst up to a few compactions clears that backlog.
maxConcurrentCompactions = 4
)

var (
Expand Down Expand Up @@ -121,6 +128,9 @@ func OpenDB(dataDir string, config config.StateStoreConfig) (types.StateStore, e
LBaseMaxBytes: 64 << 20, // 64 MB
MemTableSize: 64 << 20,
MemTableStopWritesThreshold: 4,
// Let Pebble run several compactions in parallel so it can keep up with
// the tombstone churn produced by pruning. See maxConcurrentCompactions.
CompactionConcurrencyRange: func() (int, int) { return 1, maxConcurrentCompactions },
}

// Configure L0 with explicit settings
Expand Down Expand Up @@ -489,6 +499,26 @@ func (db *Database) Prune(version int64) error {
return db.pruneAscending(version)
}

// compactPrunedRange asks Pebble to compact the key span touched by a single
// prune pass, so the tombstones written by that pass are reclaimed promptly
// rather than being re-read by every later full-DB prune scan (which is what
// makes prune latency grow with node uptime).
//
// first and last are the smallest and largest encoded keys the pass deleted,
// in Pebble comparer order. A nil first means the pass deleted nothing and the
// call is a no-op that returns nil. Otherwise the error from Compact, if any,
// is returned unchanged.
func (db *Database) compactPrunedRange(first, last []byte) error {
	// Nothing was deleted, so there is no range to compact.
	if first == nil {
		return nil
	}

	// Compact requires start < end, so the upper bound is last followed by a
	// single zero byte: a key that sorts strictly after last under both the
	// MVCC and default comparers. make zero-fills the buffer, so copying last
	// into the front leaves exactly that trailing zero byte in place.
	upperBound := make([]byte, len(last)+1)
	copy(upperBound, last)

	const parallelize = true
	return db.storage.Compact(context.Background(), first, upperBound, parallelize)
}

// Iterator dispatches between descending- and ascending-mode implementations
// depending on the on-disk encoding detected at open time.
func (db *Database) Iterator(storeKey string, version int64, start, end []byte) (dbm.Iterator, error) {
Expand Down Expand Up @@ -592,10 +622,11 @@ func (db *Database) pruneDescending(version int64) (_err error) {
defer func() { _ = batch.Close() }()

var (
counter int
prevKey []byte
keptBelowPrune bool
prevStore string
counter int
prevKey []byte
keptBelowPrune bool
prevStore string
firstDeletedKey, lastDeletedKey []byte
)

for itr.First(); itr.Valid(); {
Expand Down Expand Up @@ -662,6 +693,13 @@ func (db *Database) pruneDescending(version int64) (_err error) {
if err := batch.Delete(currKeyEncoded, nil); err != nil {
return err
}
// Track the deleted span (keys are visited in comparer order, so
// the first delete is the smallest and the last is the largest)
// to compact just that range once pruning completes.
if firstDeletedKey == nil {
firstDeletedKey = currKeyEncoded
}
lastDeletedKey = currKeyEncoded
counter++
if counter >= PruneCommitBatchSize {
if err := batch.Commit(defaultWriteOpts); err != nil {
Expand All @@ -684,7 +722,10 @@ func (db *Database) pruneDescending(version int64) (_err error) {
}
}

return db.SetEarliestVersion(earliestVersion, false)
if err := db.SetEarliestVersion(earliestVersion, false); err != nil {
return err
}
return db.compactPrunedRange(firstDeletedKey, lastDeletedKey)
}

func (db *Database) iteratorDescending(storeKey string, version int64, start, end []byte) (dbm.Iterator, error) {
Expand Down
14 changes: 13 additions & 1 deletion sei-db/db_engine/pebbledb/mvcc/db_ascending.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (db *Database) pruneAscending(version int64) (_err error) {
prevKey, prevKeyEncoded, prevValEncoded []byte
prevVersionDecoded int64
prevStore string
firstDeletedKey, lastDeletedKey []byte
)

for itr.First(); itr.Valid(); {
Expand Down Expand Up @@ -178,6 +179,14 @@ func (db *Database) pruneAscending(version int64) (_err error) {
return err
}

// Track the deleted span (keys are visited in comparer order, so the
// first delete is the smallest and the last is the largest) to compact
// just that range once pruning completes.
if firstDeletedKey == nil {
firstDeletedKey = prevKeyEncoded
}
lastDeletedKey = prevKeyEncoded

counter++
if counter >= PruneCommitBatchSize {
err = batch.Commit(defaultWriteOpts)
Expand Down Expand Up @@ -207,7 +216,10 @@ func (db *Database) pruneAscending(version int64) (_err error) {
}
}

return db.SetEarliestVersion(earliestVersion, false)
if err := db.SetEarliestVersion(earliestVersion, false); err != nil {
return err
}
return db.compactPrunedRange(firstDeletedKey, lastDeletedKey)
}

func (db *Database) iteratorAscending(storeKey string, version int64, start, end []byte) (dbm.Iterator, error) {
Expand Down
141 changes: 141 additions & 0 deletions sei-db/db_engine/pebbledb/mvcc/prune_compaction_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package mvcc

import (
"testing"

"github.com/cockroachdb/pebble/v2"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"

"github.com/sei-protocol/sei-chain/sei-db/config"
sstest "github.com/sei-protocol/sei-chain/sei-db/db_engine/test"
)

// compactionTestStore is the store key the tests in this file read back from;
// it matches the store key used by sstest.FillData.
const compactionTestStore = "store1"

// newCompactionTestDB opens a state store in a fresh temporary directory using
// the default state-store config with the backend set to "pebbledb", registers
// a cleanup that closes it, and returns it as a *Database. Any failure to open
// the store fails the test immediately.
func newCompactionTestDB(t *testing.T) *Database {
	t.Helper()

	storeCfg := config.DefaultStateStoreConfig()
	storeCfg.Backend = "pebbledb"

	opened, openErr := OpenDB(t.TempDir(), storeCfg)
	require.NoError(t, openErr)

	t.Cleanup(func() {
		// Best-effort close: a close error at teardown is deliberately ignored.
		_ = opened.Close()
	})

	return opened.(*Database)
}

// TestPruneCompactsDeletedRange checks two things about a prune that actually
// deletes keys: Pebble's compaction counter goes up across the prune, and the
// data above the prune height is still readable afterwards. The compaction is
// what stops pruned keys from lingering as tombstones and slowing down every
// later prune scan.
func TestPruneCompactsDeletedRange(t *testing.T) {
	db := newCompactionTestDB(t)

	fillErr := sstest.FillData(db, 10, 50)
	require.NoError(t, fillErr)

	// Flush first so the data lives in SSTables and the post-prune compaction
	// has files to work on.
	flushErr := db.storage.Flush()
	require.NoError(t, flushErr)

	countBeforePrune := db.storage.Metrics().Compact.Count
	pruneErr := db.Prune(25)
	require.NoError(t, pruneErr)
	countAfterPrune := db.storage.Metrics().Compact.Count

	require.Greater(t, countAfterPrune, countBeforePrune,
		"a prune that deletes keys should compact the range it pruned")

	probeKey := []byte("key000")

	// Reads at or below the prune height return nothing...
	prunedVal, err := db.Get(compactionTestStore, 25, probeKey)
	require.NoError(t, err)
	require.Nil(t, prunedVal)

	// ...while later versions are untouched.
	liveVal, err := db.Get(compactionTestStore, 50, probeKey)
	require.NoError(t, err)
	require.Equal(t, []byte("val000-050"), liveVal)
}

// TestPruneWithoutDeletionsSkipsCompaction checks the no-op guard: when a
// prune pass deletes nothing, the compaction counter must not move, so idle
// prunes stay cheap. The test never flushes, keeping the data in the memtable
// so that a background compaction cannot be scheduled and skew the counter.
func TestPruneWithoutDeletionsSkipsCompaction(t *testing.T) {
	db := newCompactionTestDB(t)

	fillErr := sstest.FillData(db, 4, 4)
	require.NoError(t, fillErr)

	countBeforePrune := db.storage.Metrics().Compact.Count

	// No version is <= 0, so nothing qualifies for deletion here.
	pruneErr := db.Prune(0)
	require.NoError(t, pruneErr)

	countAfterPrune := db.storage.Metrics().Compact.Count
	require.Equal(t, countBeforePrune, countAfterPrune,
		"a prune that deletes nothing must not trigger a compaction")
}

// TestCompactPrunedRangeSingleKey covers the degenerate prune that deleted
// exactly one key, i.e. first == last. Pebble rejects Compact unless
// start < end, so compactPrunedRange has to derive an end bound strictly
// greater than that single key. The test checks the ordering of the derived
// bound and then that the compaction call itself succeeds.
func TestCompactPrunedRangeSingleKey(t *testing.T) {
	db := newCompactionTestDB(t)

	onlyKey := db.mvccEncode([]byte("s/k:store1/key000"), 1)

	// Rebuild the end bound the same way the helper does (key plus one zero
	// byte) and confirm it sorts strictly after the key under the MVCC
	// comparer, which is the default for this config.
	derivedEnd := slices.Clone(onlyKey)
	derivedEnd = append(derivedEnd, 0)
	ordering := MVCCKeyCompare(onlyKey, derivedEnd)
	require.Equal(t, -1, ordering)

	// The compaction over the single-key span must not be rejected.
	compactErr := db.compactPrunedRange(onlyKey, onlyKey)
	require.NoError(t, compactErr)
}

// TestPruneAscendingCompactsDeletedRange runs the same end-to-end check as the
// descending test, but against the legacy ascending-encoding prune path: a
// prune that deletes keys must bump the compaction counter and leave live data
// readable. New DBs open in descending mode, so the directory is seeded with a
// legacy-style DB first to force ascending mode.
func TestPruneAscendingCompactsDeletedRange(t *testing.T) {
	dir := t.TempDir()

	// Seed step: write one ascending-encoded entry plus a latestVersionKey and
	// no descending marker, then close the raw handle so OpenDB can take over
	// the directory and select ascending mode.
	legacyDB, rawErr := pebble.Open(dir, &pebble.Options{Comparer: MVCCComparer})
	require.NoError(t, rawErr)

	seedKey := MVCCEncodeAscending(prependStoreKey(compactionTestStore, []byte("seed")), 1)
	seedVal := MVCCEncodeAscending([]byte("v"), 0)
	require.NoError(t, legacyDB.Set(seedKey, seedVal, pebble.Sync))

	// Latest-version record: first byte 1, remaining bytes zero.
	latest := [VersionSize]byte{1}
	require.NoError(t, legacyDB.Set([]byte(latestVersionKey), latest[:], pebble.Sync))
	require.NoError(t, legacyDB.Close())

	// Reopen the seeded directory through the regular entry point.
	storeCfg := config.DefaultStateStoreConfig()
	storeCfg.Backend = "pebbledb"
	store, openErr := OpenDB(dir, storeCfg)
	require.NoError(t, openErr)
	t.Cleanup(func() {
		// Best-effort close: a close error at teardown is deliberately ignored.
		_ = store.Close()
	})

	db := store.(*Database)
	require.False(t, db.descending, "seeded legacy DB must open in ascending mode")

	fillErr := sstest.FillData(db, 10, 50)
	require.NoError(t, fillErr)

	// Flush first so the data lives in SSTables and the post-prune compaction
	// has files to work on.
	flushErr := db.storage.Flush()
	require.NoError(t, flushErr)

	countBeforePrune := db.storage.Metrics().Compact.Count
	pruneErr := db.Prune(25)
	require.NoError(t, pruneErr)

	require.Greater(t, db.storage.Metrics().Compact.Count, countBeforePrune,
		"an ascending prune that deletes keys should compact the range it pruned")

	probeKey := []byte("key000")

	// Reads at or below the prune height return nothing...
	prunedVal, err := db.Get(compactionTestStore, 25, probeKey)
	require.NoError(t, err)
	require.Nil(t, prunedVal)

	// ...while later versions are untouched.
	liveVal, err := db.Get(compactionTestStore, 50, probeKey)
	require.NoError(t, err)
	require.Equal(t, []byte("val000-050"), liveVal)
}
Loading