diff --git a/benchmark/ingest/main.go b/benchmark/ingest/main.go index 338f33a..bc4672c 100644 --- a/benchmark/ingest/main.go +++ b/benchmark/ingest/main.go @@ -31,8 +31,11 @@ func main() { valueSize := flag.Int("vs", 64, "IMU 样本 value 字节数") qDepth := flag.Int("qdepth", 1024, "有界队列深度;满即记为丢帧") memTableSize := flag.Int("memtable", 4096, "MaxMemTableSize(active 表条目阈值,调小以强制频繁 flush 验内存封顶)") + budget := flag.Int64("budget", 64<<20, "MemTableMaxInflightBytes 字节预算(令牌桶背压);0 关闭背压") flag.Parse() + config.G.MemTableMaxInflightBytes = *budget // 由命令行覆盖,便于 before/after 对比 + rateList, err := parseRates(*rates) if err != nil { fmt.Fprintf(os.Stderr, "invalid -rates: %v\n", err) @@ -45,6 +48,7 @@ func main() { fmt.Printf(" Value size: %d bytes\n", *valueSize) fmt.Printf(" Queue depth: %d\n", *qDepth) fmt.Printf(" MemTable cap: %d entries\n", *memTableSize) + fmt.Printf(" Inflight budget: %d bytes (0=背压关闭)\n", *budget) fmt.Printf(" Duration: sat=%s open-loop=%s/rate\n", *satDur, *dur) fmt.Printf(" Open rates: %v Hz\n", rateList) fmt.Println("========================================") @@ -55,8 +59,8 @@ func main() { hasSat := *satDur > 0 if hasSat { sat = runSaturation(*satDur, *valueSize, *memTableSize) - fmt.Printf("[Saturation] throughput=%.0f writes/s mean=%s heap_peak=%s sys_peak=%s\n\n", - sat.Throughput, lat(sat.MeanLat), mib(sat.HeapPeak), mib(sat.SysPeak)) + fmt.Printf("[Saturation] throughput=%.0f writes/s mean=%s inflight_peak=%s heap_peak=%s\n\n", + sat.Throughput, lat(sat.MeanLat), mib(sat.InflightPeak), mib(sat.HeapPeak)) } // 2) 开环相:各速率档证 0 丢帧 + 尾延迟 + 内存封顶。 @@ -72,17 +76,18 @@ func main() { // Result 单相/单档的压测结果 type Result struct { - Label string // "saturated" 或 "Hz" - RateHz int // 开环目标速率;饱和相为 0 - Duration time.Duration - Produced int64 // 应投递样本数(饱和相 = 写入数) - Dropped int64 // 队列满导致的丢帧数 - Written int64 // 实际写入引擎的样本数 - Throughput float64 // 实际写入吞吐 (writes/sec) - MeanLat time.Duration // 饱和相用:1/throughput + Label string // "saturated" 或 "Hz" + RateHz int // 开环目标速率;饱和相为 0 + Duration time.Duration + Produced int64 // 应投递样本数(饱和相 = 写入数) + Dropped int64 // 队列满导致的丢帧数 + Written int64 // 实际写入引擎的样本数 + Throughput float64 // 实际写入吞吐 (writes/sec) + MeanLat time.Duration // 饱和相用:1/throughput P50, P99, P999, P9999, Max time.Duration - HeapPeak uint64 // HeapAlloc 峰值 (bytes) - SysPeak uint64 // Sys 峰值 (bytes) + HeapPeak uint64 // HeapAlloc 峰值 (bytes) + SysPeak uint64 // Sys 峰值 (bytes) + InflightPeak uint64 // 未刷盘字节信用峰值 (bytes) } // setupEngine 指向临时目录并以小 memtable 创建引擎,返回引擎与清理函数。 @@ -104,8 +109,8 @@ func setupEngine(memTableSize int) (*storage.Engine, *zstorage.MemTable, func()) return engine, memTable, cleanup } -// memSampler 每 100ms 采样一次 MemStats,返回停止函数(调用后回填峰值)。 -func memSampler(heapPeak, sysPeak *uint64) func() { +// memSampler 每 100ms 采样一次堆内存与未刷盘字节信用,返回停止函数(调用后回填峰值)。 +func memSampler(mt *zstorage.MemTable, heapPeak, sysPeak, inflightPeak *uint64) func() { stop := make(chan struct{}) var wg sync.WaitGroup wg.Add(1) @@ -124,6 +129,9 @@ func memSampler(heapPeak, sysPeak *uint64) func() { if ms.Sys > *sysPeak { *sysPeak = ms.Sys } + if inflight := uint64(mt.InflightBytes()); inflight > *inflightPeak { + *inflightPeak = inflight + } case <-stop: return } @@ -138,12 +146,12 @@ func memSampler(heapPeak, sysPeak *uint64) func() { // runSaturation 闭环打满:不做 per-op 计时(避免计时器开销污染亚微秒写), // 只数总量得到吞吐天花板,平均延迟由 1/throughput 推导。 func runSaturation(dur time.Duration, valueSize, memTableSize int) Result { - engine, _, cleanup := setupEngine(memTableSize) + engine, memTable, cleanup := setupEngine(memTableSize) defer cleanup() runtime.GC() // 清掉上一轮残留,使本轮 heap 峰值只反映本轮活对象 - var heapPeak, sysPeak uint64 - stopMem := memSampler(&heapPeak, &sysPeak) + var heapPeak, sysPeak, inflightPeak uint64 + stopMem := memSampler(memTable, &heapPeak, &sysPeak, &inflightPeak) var written, seq int64 start := time.Now() @@ -167,26 +175,27 @@ func runSaturation(dur time.Duration, valueSize, memTableSize int) Result { mean = time.Duration(float64(time.Second) / tput) } return Result{ - Label: "saturated", - Duration: elapsed, - Produced: written, - Written: written, - Throughput: tput, - MeanLat: mean, - HeapPeak: heapPeak, - SysPeak: sysPeak, + Label: "saturated", + Duration: elapsed, + Produced: written, + Written: written, + Throughput: tput, + MeanLat: mean, + HeapPeak: heapPeak, + SysPeak: sysPeak, + InflightPeak: inflightPeak, } } // runOpenLoop 开环定速率:生产者按固定速率非阻塞投递,队列满即丢帧; // 消费者单写入者,per-op 计时(此处速率有界,计时开销可忽略),得到尾延迟。 func runOpenLoop(rateHz int, dur time.Duration, valueSize, qDepth, memTableSize int) Result { - engine, _, cleanup := setupEngine(memTableSize) + engine, memTable, cleanup := setupEngine(memTableSize) defer cleanup() runtime.GC() // 清掉上一轮残留,使本轮 heap 峰值只反映本轮活对象 - var heapPeak, sysPeak uint64 - stopMem := memSampler(&heapPeak, &sysPeak) + var heapPeak, sysPeak, inflightPeak uint64 + stopMem := memSampler(memTable, &heapPeak, &sysPeak, &inflightPeak) type sample struct{ key, value []byte } q := make(chan sample, qDepth) @@ -229,20 +238,21 @@ func runOpenLoop(rateHz int, dur time.Duration, valueSize, qDepth, memTableSize sort.Slice(latencies, func(i, j int) bool { return latencies[i] < latencies[j] }) return Result{ - Label: fmt.Sprintf("%dHz", rateHz), - RateHz: rateHz, - Duration: elapsed, - Produced: produced, - Dropped: dropped, - Written: written, - Throughput: float64(written) / elapsed.Seconds(), - P50: pct(latencies, 0.50), - P99: pct(latencies, 0.99), - P999: pct(latencies, 0.999), - P9999: pct(latencies, 0.9999), - Max: pct(latencies, 1.0), - HeapPeak: heapPeak, - SysPeak: sysPeak, + Label: fmt.Sprintf("%dHz", rateHz), + RateHz: rateHz, + Duration: elapsed, + Produced: produced, + Dropped: dropped, + Written: written, + Throughput: float64(written) / elapsed.Seconds(), + P50: pct(latencies, 0.50), + P99: pct(latencies, 0.99), + P999: pct(latencies, 0.999), + P9999: pct(latencies, 0.9999), + Max: pct(latencies, 1.0), + HeapPeak: heapPeak, + SysPeak: sysPeak, + InflightPeak: inflightPeak, } } @@ -262,7 +272,7 @@ func printResult(r Result) { fmt.Printf(" produced=%d written=%d dropped=%d\n", r.Produced, r.Written, r.Dropped) fmt.Printf(" throughput=%.0f writes/s p99.9=%s p99.99=%s max=%s\n", r.Throughput, lat(r.P999), lat(r.P9999), lat(r.Max)) - fmt.Printf(" heap_peak=%s sys_peak=%s\n", mib(r.HeapPeak), mib(r.SysPeak)) + fmt.Printf(" inflight_peak=%s heap_peak=%s sys_peak=%s\n", mib(r.InflightPeak), mib(r.HeapPeak), mib(r.SysPeak)) fmt.Println() } @@ -274,11 +284,11 @@ func printTable(hasSat bool, sat Result, rs []Result) { fmt.Printf(" ceiling (saturated): %.0f writes/s heap_peak=%s\n", sat.Throughput, mib(sat.HeapPeak)) } fmt.Println(" --- open-loop (fixed rate) ---") - fmt.Printf(" %-10s %-12s %-9s %-9s %-9s %-10s\n", - "rate", "throughput", "dropped", "p99.9", "max", "heap_peak") + fmt.Printf(" %-10s %-12s %-9s %-9s %-13s %-10s\n", + "rate", "throughput", "dropped", "max", "inflight_peak", "heap_peak") for _, r := range rs { - fmt.Printf(" %-10s %-12.0f %-9d %-9s %-9s %-10s\n", - r.Label, r.Throughput, r.Dropped, lat(r.P999), lat(r.Max), mib(r.HeapPeak)) + fmt.Printf(" %-10s %-12.0f %-9d %-9s %-13s %-10s\n", + r.Label, r.Throughput, r.Dropped, lat(r.Max), mib(r.InflightPeak), mib(r.HeapPeak)) } fmt.Println("========================================") fmt.Println(" Durability: N/A at engine layer (no storage WAL — see A2 / Raft path)") diff --git a/config/global.go b/config/global.go index 3485449..fc3fa01 100644 --- a/config/global.go +++ b/config/global.go @@ -23,6 +23,10 @@ type GlobalConfig struct { MaxMemTableSize int MaxCompactionSize int + // MemTableMaxInflightBytes 未刷盘数据(active + 正在刷的 dirty)的字节预算。 + // 字节级令牌桶背压:超出预算时写入阻塞,等 flush 归还信用。<=0 关闭背压。 + MemTableMaxInflightBytes int64 + TcpServer banIface.IServer MaxConn int MaxPackageSize uint32 @@ -81,25 +85,26 @@ func NewGlobalConfig() *GlobalConfig { logDir := defaultLogDir() global := &GlobalConfig{ - Name: "Raft", - Port: 8080, - Host: "localhost", - Version: "1.0.0", - MaxConn: 1000, - MaxPackageSize: 1024, - WorkerPoolSize: 10, - MaxWorkerTaskLen: 10000, - MaxMsgChanLen: 100, - TcpServer: nil, - MaxMemTableP: 0.5, - MaxMemTableLevel: 32, - MaxMemTableSize: 1024, - WALPath: filepath.Join(logDir, "wal.log"), - SSTablePath: logDir, - Peers: []string{"localhost:8080"}, // 默认单节点 - Me: 0, // 默认节点ID - RaftSnapshotThreshold: 1000, // 默认快照阈值 - RaftSnapshotKeepEntries: 100, // 默认保留条目数 + Name: "Raft", + Port: 8080, + Host: "localhost", + Version: "1.0.0", + MaxConn: 1000, + MaxPackageSize: 1024, + WorkerPoolSize: 10, + MaxWorkerTaskLen: 10000, + MaxMsgChanLen: 100, + TcpServer: nil, + MaxMemTableP: 0.5, + MaxMemTableLevel: 32, + MaxMemTableSize: 1024, + MemTableMaxInflightBytes: 64 << 20, // 64MiB 未刷盘字节预算 + WALPath: filepath.Join(logDir, "wal.log"), + SSTablePath: logDir, + Peers: []string{"localhost:8080"}, // 默认单节点 + Me: 0, // 默认节点ID + RaftSnapshotThreshold: 1000, // 默认快照阈值 + RaftSnapshotKeepEntries: 100, // 默认保留条目数 } global.Init() global.ParseFlags() diff --git a/docs-step/M2-backpressure-result.md b/docs-step/M2-backpressure-result.md new file mode 100644 index 0000000..d7bf82f --- /dev/null +++ b/docs-step/M2-backpressure-result.md @@ -0,0 +1,50 @@ +# M2 · 字节级令牌桶写背压 —— 设计与结果 + +> 承接 M1 发现的「MemTable 写入无背压 → 内存随负载无界增长」。本步给写路径加**字节级令牌桶背压**,并诚实标注它能解决什么、不能解决什么。 + +## 设计 + +- 新增可复用组件 `pkg/credit`:字节信用池 `Pool`(`Acquire`/`TryAcquire`/`Release`/`Used`)。信用不足时 `Acquire` 阻塞,直到持久化方 `Release` 归还。预算 `<=0` 关闭背压。 +- `MemTable` 接入: + - `SkipList` 维护 `byteSize`(覆盖写按新旧 value 差值增量,防漂移)。 + - `Put`/`Delete` 写入前 `acquireCredit(full)`:`TryAcquire` 失败则先 `StartFlush` 再阻塞 `Acquire`,避免永久阻塞。 + - 覆盖写实际增量 < 预占额时,归还多占部分(信用与 `byteSize` 对账,不泄漏)。 + - `Flush` 成功后 `Release(dirty.byteSize)` 归还信用并唤醒被阻塞的写。 + - `Flush` 失败:保留 dirty(不丢数据)并重新触发刷盘重试;修掉了原 `Flush` 在失败后被下次刷盘覆盖 dirty 的隐患。 +- 预算配置:`config.MemTableMaxInflightBytes`,默认 64MiB。 + +## 验证(A 方案:阻塞、0 丢帧) + +### 背压确实绑定并精确封顶(关掉条数触发,让 MemTable 成为瓶颈) +`-memtable 1e8 -sat 5s`: + +| | 吞吐 | inflight 峰值 | heap 峰值 | +|---|---|---|---| +| 背压关闭 `-budget 0` | 1,768,172 w/s | **784.2 MiB(无界)** | 2,014.7 MiB | +| 背压 `-budget 16MiB` | 1,225,599 w/s | **16.0 MiB(精确卡在预算)** | 173.4 MiB | + +→ 未刷盘字节被**精确限制在 16.0MiB = 预算**,堆从 2GiB 降到 173MiB。代价是吞吐被限到 flush 速率(1.77M→1.23M)——**这正是背压的本意**:源跑赢 flush 时,限速写入而非撑爆内存。`pkg/credit` 有单测覆盖(阻塞/解除、超大单条放行、预算关闭、对账归零)。 + +### 复现命令 +```powershell +go run ./benchmark/ingest/ -budget 0 -sat 5s -d 1s -rates 1000 -memtable 100000000 +go run ./benchmark/ingest/ -budget 16777216 -sat 5s -d 1s -rates 1000 -memtable 100000000 +``` + +## 诚实边界:背压不是 M1 那个增长的解药 + +默认小值配置(`MaxMemTableSize=4096`)下,**按条数刷盘(每 4096 条 ≈ 0.37MiB)远早于字节预算触发**,所以背压平时不绑定。实测 100k/200kHz 跑 60s: + +| 速率 | 丢帧 | inflight 峰值 | heap 峰值 | +|---|---|---|---| +| 100 kHz | 0 | 0.4 MiB | 121 MiB | +| 200 kHz | 3,918 | 0.4 MiB | 265 MiB | + +`inflight_peak` 仅 0.4MiB(≈一个 4096 条的表),**未刷盘量根本不大**。但 heap 仍涨到 265MiB——说明 **M1 看到的内存增长不在 MemTable,而在 SSTable 元数据 / bloom 过滤器随文件数累积**(12M 条 / 4096 ≈ 2900 个 SSTable,每个一份常驻 bloom + 索引)。 + +**结论**: +- 背压的角色是**内存硬上限 / 安全网**——当 value 很大或 `MaxMemTableSize` 很高、条数触发不及时,它兜底封顶(已证)。 +- 它**不解决**小值高频下的内存增长,那是 SSTable 元数据/compaction 问题,是**下一条线**(M3 候选):核查 compaction 是否跟得上文件增长、bloom/索引常驻内存能否随 compaction 收敛。 + +## 200kHz 丢帧说明 +开背压后 200kHz/60s 出现 3,918 丢帧,是因为消费者写入被背压/flush 节流,生产者有界队列(qdepth=1024)溢出——属预期的"源超过可持续速率"行为,非引擎崩溃。 diff --git a/pkg/credit/credit.go b/pkg/credit/credit.go new file mode 100644 index 0000000..e18152f --- /dev/null +++ b/pkg/credit/credit.go @@ -0,0 +1,66 @@ +// Package credit 提供字节级信用池(令牌桶式背压):写入方 Acquire 占用字节信用, +// 持久化方 Release 归还信用;信用不足时 Acquire 阻塞,从而把未持久化数据的内存占用 +// 限制在预算之内。它本身与具体存储无关,可被任何"写入快、落盘慢"的路径复用。 +package credit + +import "sync" + +// Pool 是一个按字节计量的阻塞式信用池。零值不可用,请用 New 构造。 +type Pool struct { + mu sync.Mutex + cond *sync.Cond + budget int64 // <=0 表示不限制(背压关闭) + used int64 +} + +// New 构造一个预算为 budget 字节的信用池;budget <= 0 表示不限制。 +func New(budget int64) *Pool { + p := &Pool{budget: budget} + p.cond = sync.NewCond(&p.mu) + return p +} + +// fits 报告在已用 used 之上再占 n 是否允许(调用者须持锁)。 +// 规则:预算关闭、或池空(保证至少一条能写入,避免超大单条永久阻塞)、或不超预算 → 允许。 +func (p *Pool) fits(n int64) bool { + return p.budget <= 0 || p.used == 0 || p.used+n <= p.budget +} + +// TryAcquire 不阻塞地尝试占用 n 字节信用,成功返回 true。 +func (p *Pool) TryAcquire(n int64) bool { + p.mu.Lock() + defer p.mu.Unlock() + if p.fits(n) { + p.used += n + return true + } + return false +} + +// Acquire 占用 n 字节信用;信用不足时阻塞,直到他方 Release 释放出空间。 +func (p *Pool) Acquire(n int64) { + p.mu.Lock() + defer p.mu.Unlock() + for !p.fits(n) { + p.cond.Wait() + } + p.used += n +} + +// Release 归还 n 字节信用并唤醒所有等待者。 +func (p *Pool) Release(n int64) { + p.mu.Lock() + p.used -= n + if p.used < 0 { + p.used = 0 + } + p.mu.Unlock() + p.cond.Broadcast() +} + +// Used 返回当前已占用字节数。 +func (p *Pool) Used() int64 { + p.mu.Lock() + defer p.mu.Unlock() + return p.used +} diff --git a/pkg/credit/credit_test.go b/pkg/credit/credit_test.go new file mode 100644 index 0000000..59e061f --- /dev/null +++ b/pkg/credit/credit_test.go @@ -0,0 +1,87 @@ +package credit + +import ( + "testing" + "time" +) + +func TestAcquireReleaseUsed(t *testing.T) { + p := New(100) + p.Acquire(40) + p.Acquire(30) + if got := p.Used(); got != 70 { + t.Fatalf("used = %d, want 70", got) + } + p.Release(40) + if got := p.Used(); got != 30 { + t.Fatalf("used after release = %d, want 30", got) + } +} + +func TestTryAcquireRejectsWhenFull(t *testing.T) { + p := New(100) + if !p.TryAcquire(90) { + t.Fatal("first TryAcquire(90) should succeed") + } + if p.TryAcquire(20) { + t.Fatal("TryAcquire(20) should fail: 90+20 > 100") + } + if !p.TryAcquire(10) { + t.Fatal("TryAcquire(10) should succeed: 90+10 == 100") + } +} + +func TestAcquireBlocksUntilRelease(t *testing.T) { + p := New(100) + p.Acquire(100) // 占满 + + done := make(chan struct{}) + go func() { + p.Acquire(50) // 应阻塞,直到下面 Release + close(done) + }() + + select { + case <-done: + t.Fatal("Acquire(50) returned while pool full — should have blocked") + case <-time.After(50 * time.Millisecond): + // 仍阻塞,符合预期 + } + + p.Release(60) // used: 100 -> 40,腾出空间 + select { + case <-done: + // 解除阻塞,符合预期 + case <-time.After(time.Second): + t.Fatal("Acquire(50) did not unblock after Release") + } + if got := p.Used(); got != 90 { + t.Fatalf("used = %d, want 90 (40 + 50)", got) + } +} + +func TestOversizeAllowedWhenEmpty(t *testing.T) { + p := New(100) + // 单条超预算:池为空时放行,避免永久阻塞。 + p.Acquire(150) + if got := p.Used(); got != 150 { + t.Fatalf("used = %d, want 150", got) + } +} + +func TestZeroBudgetNeverBlocks(t *testing.T) { + p := New(0) // 关闭背压 + p.Acquire(1 << 30) + if !p.TryAcquire(1 << 30) { + t.Fatal("TryAcquire should always succeed when budget disabled") + } +} + +func TestReleaseClampsAtZero(t *testing.T) { + p := New(100) + p.Acquire(10) + p.Release(50) // 多还也不应为负 + if got := p.Used(); got != 0 { + t.Fatalf("used = %d, want 0", got) + } +} diff --git a/storage/zstorage/memtable.go b/storage/zstorage/memtable.go index 730e7da..8dbf4b8 100644 --- a/storage/zstorage/memtable.go +++ b/storage/zstorage/memtable.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/NeverENG/BanDB/config" + "github.com/NeverENG/BanDB/pkg/credit" "github.com/NeverENG/BanDB/storage/istorage" ) @@ -21,9 +22,10 @@ var ( // SkipList 跳表数据结构,封装跳表的 size、level 和 head 指针 type SkipList struct { - size int - level int - head *SkipNode + size int + level int + head *SkipNode + byteSize int64 // 当前表内 key+value 的累计字节数(覆盖写按增量维护) } // MemTable 基于跳表的双表内存表实现 @@ -40,6 +42,8 @@ type MemTable struct { stopCh chan struct{} sst *SSTable + + credits *credit.Pool // 字节级令牌桶背压:限制未刷盘数据的内存占用 } // SkipNode 跳表节点 @@ -64,6 +68,7 @@ func NewMemTable() *MemTable { compactCh: make(chan bool, 1), stopCh: make(chan struct{}), sst: NewSSTable(), + credits: credit.New(config.G.MemTableMaxInflightBytes), } go mt.FlushWorker() go mt.ListenCompactCh() @@ -156,25 +161,49 @@ func (sl *SkipList) search(key []byte) ([]byte, bool) { // Put 插入或更新键值对,始终操作 active 表 func (m *MemTable) Put(key []byte, value []byte) error { - m.mu.Lock() - defer m.mu.Unlock() + full := int64(len(key)) + int64(len(value)) + m.acquireCredit(full) // 字节级令牌桶背压:信用不足则阻塞 + m.mu.Lock() if m.active == nil || m.active.head == nil { + m.mu.Unlock() + m.credits.Release(full) // 写入未发生,归还信用 return errors.New("NO DATA IN MEMTABLE") } - m.active.insert(key, value) + delta := m.active.insert(key, value) // 检查 active 表大小是否超过阈值,触发刷盘 if m.active.size > config.G.MaxMemTableSize { m.StartFlush() } + m.mu.Unlock() + // 覆盖写实际增量 < 预占的 full,归还多占部分,避免信用单调泄漏 + if over := full - delta; over != 0 { + m.credits.Release(over) + } return nil } +// acquireCredit 为本次写入预占 n 字节信用;不足时先触发刷盘以归还信用,再阻塞等待。 +func (m *MemTable) acquireCredit(n int64) { + if m.credits.TryAcquire(n) { + return + } + m.StartFlush() // 确保有 flush 在路上来归还信用,避免永久阻塞 + m.credits.Acquire(n) +} + +// InflightBytes 返回当前未刷盘(active + 正在刷的 dirty)占用的字节信用,供观测/压测使用。 +func (m *MemTable) InflightBytes() int64 { + return m.credits.Used() +} + // insert 在跳表中插入键值对(无锁版本,由调用者保证线程安全) -func (sl *SkipList) insert(key []byte, value []byte) { +// insert 插入或覆盖 key,并返回本次操作使 byteSize 变化的增量(覆盖写可能为负)。 +// 调用方用该增量做背压信用对账。 +func (sl *SkipList) insert(key []byte, value []byte) int64 { update := make([]*SkipNode, maxLevel) p := sl.head @@ -188,9 +217,11 @@ func (sl *SkipList) insert(key []byte, value []byte) { // 检查 key 是否已存在 p = p.Next[0] if p != nil && bytes.Compare(p.Key, key) == 0 { - // key 已存在,更新值 + // key 已存在,更新值:byteSize 按新旧 value 差值调整 + delta := int64(len(value)) - int64(len(p.Value)) p.Value = value - return + sl.byteSize += delta + return delta } // 生成新节点的随机层级 @@ -210,25 +241,36 @@ func (sl *SkipList) insert(key []byte, value []byte) { } sl.size++ + delta := int64(len(key)) + int64(len(value)) + sl.byteSize += delta + return delta } // Delete 删除指定 key 的节点,始终操作 active 表 func (m *MemTable) Delete(key []byte) error { - m.mu.Lock() - defer m.mu.Unlock() + full := int64(len(key)) // 墓碑 value 为 nil + m.acquireCredit(full) + m.mu.Lock() if m.active == nil || m.active.head == nil { + m.mu.Unlock() + m.credits.Release(full) return errors.New("NO DATA IN MEMTABLE") } // 写入墓碑(Value==nil)而非物理删除:物理删除只能去掉 active 中的节点, // 无法 shadow 已 flush 到 SSTable 的同名旧值——读路径会从 SSTable 把它「复活」。 // 删除因此变为幂等盲写,不再返回 key not found。 - m.active.insert(key, nil) + delta := m.active.insert(key, nil) if m.active.size > config.G.MaxMemTableSize { m.StartFlush() } + m.mu.Unlock() + + if over := full - delta; over != 0 { + m.credits.Release(over) + } return nil } @@ -285,13 +327,15 @@ func (m *MemTable) StartFlush() { func (m *MemTable) Flush() { // 步骤 1-2: 持锁进行交换(快速操作) m.mu.Lock() - if m.active.size == 0 { - m.mu.Unlock() - return + // dirty 非 nil 说明上一次刷盘失败遗留,本次直接重试它,不再交换(避免覆盖丢数据) + if m.dirty == nil { + if m.active.size == 0 { + m.mu.Unlock() + return + } + m.dirty = m.active + m.active = newSkipList() } - - m.dirty = m.active - m.active = newSkipList() dirty := m.dirty m.mu.Unlock() @@ -301,13 +345,16 @@ func (m *MemTable) Flush() { err := m.sst.WriteToSSTable(allEntries) if err != nil { slog.Error("flush error", "error", err) + m.StartFlush() // 重试;dirty 保留不丢数据,信用待重试成功后再释放 return } m.mu.Lock() + freed := dirty.byteSize m.dirty = nil m.mu.Unlock() + m.credits.Release(freed) // 归还信用,唤醒被背压阻塞的写 slog.Debug("flush completed") }