-
Notifications
You must be signed in to change notification settings - Fork 885
State Store: Compact pruned key range after each prune #3675
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,6 +51,13 @@ const ( | |
| PruneCommitBatchSize = 50 | ||
| DeleteCommitBatchSize = 50 | ||
| MinWALEntriesToKeep = 1000 | ||
|
|
||
| // maxConcurrentCompactions is the upper bound for the number of compactions | ||
| // Pebble may run in parallel. Pebble's default range is {1,1}, but a single | ||
| // compactor cannot keep up with the tombstone churn that pruning generates, | ||
| // so deleted data accumulates and slows every subsequent prune scan. Allowing | ||
| // Pebble to burst up to a few compactions clears that backlog. | ||
| maxConcurrentCompactions = 4 | ||
| ) | ||
|
|
||
| var ( | ||
|
|
@@ -125,6 +132,9 @@ func OpenDB(dataDir string, config config.StateStoreConfig) (types.StateStore, e | |
| LBaseMaxBytes: 64 << 20, // 64 MB | ||
| MemTableSize: 64 << 20, | ||
| MemTableStopWritesThreshold: 4, | ||
| // Let Pebble run several compactions in parallel so it can keep up with | ||
| // the tombstone churn produced by pruning. See maxConcurrentCompactions. | ||
| CompactionConcurrencyRange: func() (int, int) { return 1, maxConcurrentCompactions }, | ||
| } | ||
|
|
||
| // Configure L0 with explicit settings | ||
|
|
@@ -507,6 +517,26 @@ func (db *Database) Prune(version int64) error { | |
| return db.pruneAscending(version) | ||
| } | ||
|
|
||
| // compactPrunedRange compacts only the span of keys that a prune pass deleted so | ||
| // Pebble reclaims the tombstoned space right away. Without it, deleted keys pile | ||
| // up as un-compacted tombstones and every subsequent full-DB prune scan has to | ||
| // read through them, which makes prune latency creep upward the longer a node | ||
| // stays up (and is why restarting a node temporarily relieves head-lag: the | ||
| // reopen triggers compaction). first and last are the smallest and largest | ||
| // encoded keys deleted during the pass, in Pebble comparer order; both are nil | ||
| // when nothing was deleted, in which case compaction is skipped entirely. | ||
| func (db *Database) compactPrunedRange(first, last []byte) error { | ||
| if first == nil { | ||
| return nil | ||
| } | ||
| // Pebble's Compact treats [start, end] as an inclusive range but requires | ||
| // start < end, which would reject a pass that deleted exactly one key | ||
| // (first == last). Appending a zero byte (0x00, the smallest byte value) | ||
| // extends the user-key portion of last, yielding a key strictly greater than | ||
| // it under both the MVCC and default comparers, so the entire deleted span | ||
| // is covered. | ||
| end := append(slices.Clone(last), 0) | ||
| return db.storage.Compact(context.Background(), first, end, true) | ||
| } | ||
|
|
||
| // Iterator dispatches between descending- and ascending-mode implementations | ||
| // depending on the on-disk encoding detected at open time. | ||
| func (db *Database) Iterator(storeKey string, version int64, start, end []byte) (dbm.Iterator, error) { | ||
|
|
@@ -611,11 +641,12 @@ func (db *Database) pruneDescending(version int64) (_err error) { | |
| defer func() { _ = batch.Close() }() | ||
|
|
||
| var ( | ||
| counter int | ||
| prevKey []byte | ||
| keptBelowPrune bool | ||
| prevStore string | ||
| scanReads int64 | ||
| counter int | ||
| prevKey []byte | ||
| keptBelowPrune bool | ||
| prevStore string | ||
| scanReads int64 | ||
| firstDeletedKey, lastDeletedKey []byte | ||
| ) | ||
|
|
||
| for itr.First(); itr.Valid(); { | ||
|
|
@@ -683,6 +714,13 @@ func (db *Database) pruneDescending(version int64) (_err error) { | |
| if err := batch.Delete(currKeyEncoded, nil); err != nil { | ||
| return err | ||
| } | ||
| // Track the deleted span (keys are visited in comparer order, so | ||
| // the first delete is the smallest and the last is the largest) | ||
| // to compact just that range once pruning completes. | ||
| if firstDeletedKey == nil { | ||
| firstDeletedKey = currKeyEncoded | ||
| } | ||
| lastDeletedKey = currKeyEncoded | ||
| counter++ | ||
| if counter >= PruneCommitBatchSize { | ||
| writeCount := int64(batch.Count()) | ||
|
|
@@ -710,7 +748,10 @@ func (db *Database) pruneDescending(version int64) (_err error) { | |
| } | ||
| db.operationMetrics.AddRead(scanReads) | ||
|
|
||
| return db.SetEarliestVersion(earliestVersion, false) | ||
| if err := db.SetEarliestVersion(earliestVersion, false); err != nil { | ||
| return err | ||
| } | ||
| return db.compactPrunedRange(firstDeletedKey, lastDeletedKey) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [suggestion]
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @Kbhat1 this is worth addressing in a follow-up PR, I think. |
||
| } | ||
|
|
||
| func (db *Database) iteratorDescending(storeKey string, version int64, start, end []byte) (dbm.Iterator, error) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,141 @@ | ||
| package mvcc | ||
|
|
||
| import ( | ||
| "testing" | ||
|
|
||
| "github.com/cockroachdb/pebble/v2" | ||
| "github.com/stretchr/testify/require" | ||
| "golang.org/x/exp/slices" | ||
|
|
||
| "github.com/sei-protocol/sei-chain/sei-db/config" | ||
| sstest "github.com/sei-protocol/sei-chain/sei-db/db_engine/test" | ||
| ) | ||
|
|
||
| const compactionTestStore = "store1" // matches the store key used by sstest.FillData | ||
|
|
||
| func newCompactionTestDB(t *testing.T) *Database { | ||
| t.Helper() | ||
|
|
||
| cfg := config.DefaultStateStoreConfig() | ||
| cfg.Backend = "pebbledb" | ||
|
|
||
| store, err := OpenDB(t.TempDir(), cfg) | ||
| require.NoError(t, err) | ||
| t.Cleanup(func() { _ = store.Close() }) | ||
|
|
||
| return store.(*Database) | ||
| } | ||
|
|
||
| // TestPruneCompactsDeletedRange verifies that a prune which deletes keys triggers | ||
| // a compaction of the pruned range, and that live data survives it. Without the | ||
| // post-prune compaction the deleted keys would linger as tombstones and make | ||
| // every later prune scan progressively slower (the root cause of the head-lag | ||
| // that creeps in with node uptime). | ||
| func TestPruneCompactsDeletedRange(t *testing.T) { | ||
| db := newCompactionTestDB(t) | ||
|
|
||
| require.NoError(t, sstest.FillData(db, 10, 50)) | ||
|
|
||
| // Push the data into SSTables so the post-prune compaction has files to act on. | ||
| require.NoError(t, db.storage.Flush()) | ||
| compactionsBefore := db.storage.Metrics().Compact.Count | ||
|
|
||
| require.NoError(t, db.Prune(25)) | ||
|
|
||
| compactionsAfter := db.storage.Metrics().Compact.Count | ||
| require.Greater(t, compactionsAfter, compactionsBefore, | ||
| "a prune that deletes keys should compact the range it pruned") | ||
|
|
||
| // Live data is preserved: versions <= 25 are gone, later versions remain. | ||
| bz, err := db.Get(compactionTestStore, 25, []byte("key000")) | ||
| require.NoError(t, err) | ||
| require.Nil(t, bz) | ||
|
|
||
| bz, err = db.Get(compactionTestStore, 50, []byte("key000")) | ||
| require.NoError(t, err) | ||
| require.Equal(t, []byte("val000-050"), bz) | ||
| } | ||
|
|
||
| // TestPruneWithoutDeletionsSkipsCompaction verifies the guard that skips | ||
| // compaction entirely when a prune pass deleted nothing, so idle prunes stay | ||
| // cheap. The data is deliberately left in the memtable (no flush) so no | ||
| // background compaction can be scheduled and pollute the count. | ||
| func TestPruneWithoutDeletionsSkipsCompaction(t *testing.T) { | ||
| db := newCompactionTestDB(t) | ||
|
|
||
| require.NoError(t, sstest.FillData(db, 4, 4)) | ||
|
|
||
| compactionsBefore := db.storage.Metrics().Compact.Count | ||
| // No version is <= 0, so this prune deletes nothing and must not compact. | ||
| require.NoError(t, db.Prune(0)) | ||
| require.Equal(t, compactionsBefore, db.storage.Metrics().Compact.Count, | ||
| "a prune that deletes nothing must not trigger a compaction") | ||
| } | ||
|
|
||
| // TestCompactPrunedRangeSingleKey verifies the inclusive-bound math for the | ||
| // degenerate case where a prune deleted exactly one key (first == last). Pebble | ||
| // rejects Compact unless start < end, so the helper must derive an end bound | ||
| // strictly greater than the single deleted key. | ||
| func TestCompactPrunedRangeSingleKey(t *testing.T) { | ||
| db := newCompactionTestDB(t) | ||
|
|
||
| key := db.mvccEncode([]byte("s/k:store1/key000"), 1) | ||
|
|
||
| // The derived end bound must sort strictly after the deleted key under the | ||
| // MVCC comparer (the default for this config). | ||
| end := append(slices.Clone(key), 0) | ||
| require.Equal(t, -1, MVCCKeyCompare(key, end)) | ||
|
|
||
| // And the compaction itself must not be rejected. | ||
| require.NoError(t, db.compactPrunedRange(key, key)) | ||
| } | ||
|
|
||
| // TestPruneAscendingCompactsDeletedRange covers the legacy ascending-encoding | ||
| // prune path end to end: a prune that deletes keys must compact the pruned | ||
| // range while preserving live data. New DBs use the descending path, so the | ||
| // directory is first seeded with a legacy-style DB to force ascending mode. | ||
| func TestPruneAscendingCompactsDeletedRange(t *testing.T) { | ||
| dir := t.TempDir() | ||
|
|
||
| // Seed a legacy-style DB: ascending-encoded data plus a latestVersionKey but | ||
| // no descending marker, so OpenDB selects ascending mode. | ||
| { | ||
| raw, err := pebble.Open(dir, &pebble.Options{Comparer: MVCCComparer}) | ||
| require.NoError(t, err) | ||
| seedKey := MVCCEncodeAscending(prependStoreKey(compactionTestStore, []byte("seed")), 1) | ||
| require.NoError(t, raw.Set(seedKey, MVCCEncodeAscending([]byte("v"), 0), pebble.Sync)) | ||
| var ts [VersionSize]byte | ||
| ts[0] = 1 | ||
| require.NoError(t, raw.Set([]byte(latestVersionKey), ts[:], pebble.Sync)) | ||
| require.NoError(t, raw.Close()) | ||
| } | ||
|
|
||
| cfg := config.DefaultStateStoreConfig() | ||
| cfg.Backend = "pebbledb" | ||
| store, err := OpenDB(dir, cfg) | ||
| require.NoError(t, err) | ||
| t.Cleanup(func() { _ = store.Close() }) | ||
|
|
||
| db := store.(*Database) | ||
| require.False(t, db.descending, "seeded legacy DB must open in ascending mode") | ||
|
|
||
| require.NoError(t, sstest.FillData(db, 10, 50)) | ||
|
|
||
| // Push the data into SSTables so the post-prune compaction has files to act on. | ||
| require.NoError(t, db.storage.Flush()) | ||
| compactionsBefore := db.storage.Metrics().Compact.Count | ||
|
|
||
| require.NoError(t, db.Prune(25)) | ||
|
|
||
| require.Greater(t, db.storage.Metrics().Compact.Count, compactionsBefore, | ||
| "an ascending prune that deletes keys should compact the range it pruned") | ||
|
|
||
| // Live data is preserved: versions <= 25 are gone, later versions remain. | ||
| bz, err := db.Get(compactionTestStore, 25, []byte("key000")) | ||
| require.NoError(t, err) | ||
| require.Nil(t, bz) | ||
|
|
||
| bz, err = db.Get(compactionTestStore, 50, []byte("key000")) | ||
| require.NoError(t, err) | ||
| require.Equal(t, []byte("val000-050"), bz) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit] Minor:
`append(slices.Clone(last), 0)` is correct and safe (the clone prevents mutating `last`). For readability you could note that this relies on `0x00` being the minimum byte, so the extended key sorts immediately after `last` for any suffix — which the test asserts. No change required.