feat(storage): byte-level token-bucket write backpressure for MemTable #125
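- Add pkg/credit: a reusable byte credit pool (Acquire/Release/TryAcquire), with unit tests
- MemTable Put/Delete reserve byte credit before writing; when credit is insufficient they trigger a flush and block; the flush returns the credit
- SkipList maintains byteSize (overwrites are counted by delta); credit is reconciled to prevent leaks
- On flush failure, dirty data is kept for retry, fixing a latent bug where an overwrite could lose data
- Config option MemTableMaxInflightBytes (default 64MiB, <=0 disables)
- bench gains a -budget switch and inflight sampling: measured unflushed bytes are capped exactly at the budget (16MiB), and heap drops from 2GiB to 173MiB

Honest limitations are in docs-step/M2-backpressure-result.md: backpressure is a hard memory cap and safety net; it does not address the growth caused by SSTable metadata accumulation under M1's high-frequency small-value workload (a separate line of work).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>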
📝 Walkthrough
This PR adds byte-level token-bucket backpressure to MemTable writes using a new reusable credit pool. The feature prevents unbounded in-memory growth by blocking writes when unflushed bytes exceed a configured budget (64MiB default), reconciles byte accounting on overwrites, and releases credits after successful flush. Benchmark observability now tracks inflight peak bytes.
Changes: Byte-Level Backpressure for MemTable
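The overwrite reconciliation mentioned in the walkthrough can be sketched as follows. This is a minimal illustration, not the PR's code: `table`, `entrySize`, and the plain `used` counter are hypothetical stand-ins for the SkipList, its size model, and the credit pool. The writer reserves the full entry size up front, then refunds whatever the overwrite did not actually add.

```go
package main

import "fmt"

// table is a hypothetical stand-in for the SkipList; a map is enough
// to show the accounting because ordering does not matter here.
type table struct {
	data     map[string][]byte
	byteSize int64 // bytes currently held, each key counted once
}

func newTable() *table { return &table{data: make(map[string][]byte)} }

// entrySize is an assumed cost model: key bytes plus value bytes.
func entrySize(key string, val []byte) int64 {
	return int64(len(key) + len(val))
}

// put stores the entry and returns the change in byteSize. A new key
// adds its full size; an overwrite adds only the difference, which is
// negative when the new value is smaller than the old one.
func (t *table) put(key string, val []byte) int64 {
	delta := entrySize(key, val)
	if old, ok := t.data[key]; ok {
		delta -= entrySize(key, old)
	}
	t.data[key] = val
	t.byteSize += delta
	return delta
}

func main() {
	t := newTable()
	var used int64 // credit currently charged against the budget

	write := func(key string, val []byte) {
		reserved := entrySize(key, val) // worst case: assume a new key
		used += reserved
		delta := t.put(key, val)
		used -= reserved - delta // refund what the write did not add
	}

	write("k", make([]byte, 100))
	write("k", make([]byte, 40))
	fmt.Println(t.byteSize, used) // prints 41 41
}
```

After both writes the charged credit equals the bytes actually held, so a flush that releases `byteSize` bytes brings the pool back to zero without leaking credit.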
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
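🐯 BanGD database kernel review
Overall risk: 🟡 Medium
PR change summary
This PR introduces byte-level token-bucket backpressure on the MemTable write path (the storage layer's in-memory table), addressing the problem found in M1: "writes have no backpressure → memory grows without bound under load". The core changes span three levels:
The PR includes detailed verification data (with a 16MiB budget, inflight bytes are capped at exactly 16.0MiB and heap falls from 2GiB to 173MiB) and candidly states the limit of backpressure: it "does not address SSTable metadata accumulation under high-frequency small-value writes".
Architecture issues (4 total)
General issues (1 total)
💡 [Suggestion · Compatibility]
Tokens used by this review: 287836 total (input 248289, output 11387, cache hits 28160, cache writes 0) | Dimensions [concurrency, memory, lock, storage, performance] | Additional surrounding files read [storage/engine.go, config/config.json] | Adversarial re-check: 3 votes per finding, 1 suspected false positive filtered out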
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@benchmark/ingest/main.go`:
- Around line 34-37: The code always overwrites
config.G.MemTableMaxInflightBytes with a hard-coded default (64<<20) when the
-budget flag is not provided; change the flag default to the current config
value so omission preserves configured settings. Specifically, when creating the
flag (flag.Int64/flag.Int64Var for variable budget) use
config.G.MemTableMaxInflightBytes as the default instead of 64<<20, then after
flag.Parse() assign config.G.MemTableMaxInflightBytes = *budget only if the flag
was actually parsed (or simply rely on the flag default matching the config so
the assignment is safe). Reference symbols: budget, flag.Int64/flag.Int64Var,
flag.Parse, config.G.MemTableMaxInflightBytes.
In `@pkg/credit/credit.go`:
- Around line 29-58: Ensure the exported API rejects negative amounts: in
TryAcquire(n int64) return false immediately for n <= 0 (treat 0 as no-op), and
in Acquire(n int64) and Release(n int64) validate n >= 0 at the top and panic on
n < 0 (treat n == 0 as a no-op) so callers cannot corrupt Pool.used; add these
guards to the start of Pool.TryAcquire, Pool.Acquire and Pool.Release (keep
existing locking/cond logic unchanged and use Pool.fits as before).
In `@storage/zstorage/memtable.go`:
- Around line 189-196: The blocking path in MemTable.acquireCredit can deadlock
writers during shutdown; change acquireCredit so the blocking wait is
shutdown-aware: instead of calling m.credits.Acquire(n) unconditionally, perform
a cancellable wait that selects on credit availability and m.stopCh (or use a
context-aware Acquire if available), and return an error when stopCh is closed;
update callers (Put/Delete call sites referenced around the blocks at the
Put/Delete flows) to handle the new error return (propagate or abort the
operation) and ensure StartFlush() is still invoked before entering the
cancellable wait.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 6ff30fa5-c309-4882-b3aa-969642207fb9
📒 Files selected for processing (6)
- benchmark/ingest/main.go
- config/global.go
- docs-step/M2-backpressure-result.md
- pkg/credit/credit.go
- pkg/credit/credit_test.go
- storage/zstorage/memtable.go
	budget := flag.Int64("budget", 64<<20, "MemTableMaxInflightBytes byte budget (token-bucket backpressure); 0 disables backpressure")
	flag.Parse()

	config.G.MemTableMaxInflightBytes = *budget // overridden from the command line for easy before/after comparison
Don't clobber the configured inflight budget when -budget is omitted.
This flag defaults to a hard-coded 64<<20, and Line 37 writes it back unconditionally, so any MemTableMaxInflightBytes value loaded from config is lost unless the caller passes -budget explicitly.
Suggested fix
-	budget := flag.Int64("budget", 64<<20, "MemTableMaxInflightBytes byte budget (token-bucket backpressure); 0 disables backpressure")
+	budget := flag.Int64("budget", config.G.MemTableMaxInflightBytes, "MemTableMaxInflightBytes byte budget (token-bucket backpressure); 0 disables backpressure")
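The alternative the review mentions, assigning only when the flag was actually passed, can be sketched with the standard `flag` package, whose `Visit` method walks only the flags that were set. `resolveBudget` and its `configured` parameter are hypothetical names; `configured` stands in for the value already loaded into `config.G.MemTableMaxInflightBytes`.

```go
package main

import (
	"flag"
	"fmt"
)

// resolveBudget parses args and returns the effective inflight budget.
// The configured value is kept unless -budget was passed explicitly.
func resolveBudget(args []string, configured int64) (int64, error) {
	fs := flag.NewFlagSet("ingest", flag.ContinueOnError)
	budget := fs.Int64("budget", configured,
		"inflight byte budget; 0 disables backpressure")
	if err := fs.Parse(args); err != nil {
		return 0, err
	}

	// Visit calls the function only for flags that were set on the
	// command line, so this tells "omitted" apart from "set to default".
	explicit := false
	fs.Visit(func(f *flag.Flag) {
		if f.Name == "budget" {
			explicit = true
		}
	})
	if !explicit {
		return configured, nil
	}
	return *budget, nil
}

func main() {
	configured := int64(16 << 20) // pretend the config file said 16MiB

	kept, _ := resolveBudget(nil, configured)
	off, _ := resolveBudget([]string{"-budget=0"}, configured)
	fmt.Println(kept, off) // prints 16777216 0
}
```

Using the configured value as the flag default already makes the unconditional assignment safe; the explicit check only matters if the default and the config can diverge, for example when the config is loaded after flag registration.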
// TryAcquire tries to take n bytes of credit without blocking; it returns true on success.
func (p *Pool) TryAcquire(n int64) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.fits(n) {
		p.used += n
		return true
	}
	return false
}

// Acquire takes n bytes of credit; when credit is insufficient it blocks until another caller's Release frees space.
func (p *Pool) Acquire(n int64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for !p.fits(n) {
		p.cond.Wait()
	}
	p.used += n
}

// Release returns n bytes of credit and wakes all waiters.
func (p *Pool) Release(n int64) {
	p.mu.Lock()
	p.used -= n
	if p.used < 0 {
		p.used = 0
	}
	p.mu.Unlock()
	p.cond.Broadcast()
Reject negative credit amounts at the API boundary.
TryAcquire(-n) / Acquire(-n) currently reduce used, and Release(-n) increases it, so any caller can corrupt the pool’s accounting through the exported API.
Suggested fix
+func requireNonNegative(n int64) {
+	if n < 0 {
+		panic("credit: negative amount")
+	}
+}
+
 // TryAcquire tries to take n bytes of credit without blocking; it returns true on success.
 func (p *Pool) TryAcquire(n int64) bool {
+	requireNonNegative(n)
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	if p.fits(n) {
@@
 // Acquire takes n bytes of credit; when credit is insufficient it blocks until another caller's Release frees space.
 func (p *Pool) Acquire(n int64) {
+	requireNonNegative(n)
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	for !p.fits(n) {
@@
 // Release returns n bytes of credit and wakes all waiters.
 func (p *Pool) Release(n int64) {
+	requireNonNegative(n)
 	p.mu.Lock()
 	p.used -= n
📝 Committable suggestion
func requireNonNegative(n int64) {
	if n < 0 {
		panic("credit: negative amount")
	}
}

// TryAcquire tries to take n bytes of credit without blocking; it returns true on success.
func (p *Pool) TryAcquire(n int64) bool {
	requireNonNegative(n)
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.fits(n) {
		p.used += n
		return true
	}
	return false
}

// Acquire takes n bytes of credit; when credit is insufficient it blocks until another caller's Release frees space.
func (p *Pool) Acquire(n int64) {
	requireNonNegative(n)
	p.mu.Lock()
	defer p.mu.Unlock()
	for !p.fits(n) {
		p.cond.Wait()
	}
	p.used += n
}

// Release returns n bytes of credit and wakes all waiters.
func (p *Pool) Release(n int64) {
	requireNonNegative(n)
	p.mu.Lock()
	p.used -= n
	if p.used < 0 {
		p.used = 0
	}
	p.mu.Unlock()
	p.cond.Broadcast()
}
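A quick way to check the guard is to confirm that a negative amount is rejected and leaves `used` untouched. The sketch below is self-contained and makes assumptions: `fits` is not shown in the review, so the version here (budget disabled when `limit <= 0`, a single oversized entry admitted when the pool is empty) is inferred from the unit-test names in the PR description, and `rejects` is a hypothetical helper. The condition variable is omitted because only the non-blocking paths are exercised.

```go
package main

import (
	"fmt"
	"sync"
)

// Pool is a trimmed stand-in for pkg/credit's pool.
type Pool struct {
	mu    sync.Mutex
	limit int64
	used  int64
}

func NewPool(limit int64) *Pool { return &Pool{limit: limit} }

// fits is an assumed admission rule, not the PR's actual code.
func (p *Pool) fits(n int64) bool {
	return p.limit <= 0 || p.used == 0 || p.used+n <= p.limit
}

func requireNonNegative(n int64) {
	if n < 0 {
		panic("credit: negative amount")
	}
}

func (p *Pool) TryAcquire(n int64) bool {
	requireNonNegative(n) // checked before locking, so a panic holds no lock
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.fits(n) {
		p.used += n
		return true
	}
	return false
}

func (p *Pool) Release(n int64) {
	requireNonNegative(n)
	p.mu.Lock()
	defer p.mu.Unlock()
	p.used -= n
	if p.used < 0 {
		p.used = 0
	}
}

// rejects reports whether f panics.
func rejects(f func()) (panicked bool) {
	defer func() { panicked = recover() != nil }()
	f()
	return false
}

func main() {
	p := NewPool(10)
	fmt.Println(p.TryAcquire(4))                      // true
	fmt.Println(rejects(func() { p.TryAcquire(-4) })) // true
	fmt.Println(p.used)                               // 4
}
```

Note that the suggested fix panics on negative input in all three methods, while the agent prompt asks `TryAcquire` to return false for `n <= 0`; the sketch follows the suggested fix.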
// acquireCredit reserves n bytes of credit for this write; when credit is insufficient it first triggers a flush to return credit, then blocks.
func (m *MemTable) acquireCredit(n int64) {
	if m.credits.TryAcquire(n) {
		return
	}
	m.StartFlush() // make sure a flush is on its way to return credit, so the wait cannot last forever
	m.credits.Acquire(n)
}
Make blocked credit waits shutdown-aware.
With the new blocking Acquire() path, Close() can strand writers forever: once stopCh is closed, FlushWorker may exit before servicing a pending flush, and any Put/Delete already waiting in acquireCredit() has no way to unblock.
Also applies to: 309-311, 361-368
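One way to make the wait cancellable is sketched below. `sync.Cond` cannot take part in a `select`, so this version swaps the condition variable for a channel that is closed and replaced on every release. `AcquireOrStop`, `ErrClosed`, the `freed` field, and the `fits` rule are all hypothetical names and assumptions, not part of the PR.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var ErrClosed = errors.New("memtable: closed while waiting for credit")

type Pool struct {
	mu    sync.Mutex
	limit int64
	used  int64
	freed chan struct{} // closed and replaced on every Release
}

func NewPool(limit int64) *Pool {
	return &Pool{limit: limit, freed: make(chan struct{})}
}

// fits is an assumed admission rule (budget off when limit <= 0,
// one oversized entry admitted when the pool is empty).
func (p *Pool) fits(n int64) bool {
	return p.limit <= 0 || p.used == 0 || p.used+n <= p.limit
}

// AcquireOrStop blocks until n bytes fit or stop is closed.
func (p *Pool) AcquireOrStop(n int64, stop <-chan struct{}) error {
	for {
		p.mu.Lock()
		if p.fits(n) {
			p.used += n
			p.mu.Unlock()
			return nil
		}
		wait := p.freed // captured under the lock, so no Release is missed
		p.mu.Unlock()

		select {
		case <-wait: // some credit came back; re-check
		case <-stop:
			return ErrClosed
		}
	}
}

// Release returns n bytes and wakes every waiter by closing the channel.
func (p *Pool) Release(n int64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.used -= n
	if p.used < 0 {
		p.used = 0
	}
	close(p.freed)
	p.freed = make(chan struct{})
}

func main() {
	p := NewPool(10)
	stop := make(chan struct{})
	_ = p.AcquireOrStop(10, stop) // fills the budget

	done := make(chan error, 1)
	go func() { done <- p.AcquireOrStop(1, stop) }() // no credit left

	close(stop)         // simulate Close()
	fmt.Println(<-done) // prints the ErrClosed message
}
```

In the MemTable this would let `acquireCredit` return an error that Put/Delete propagate, with `StartFlush()` still called before the wait as the review asks.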
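Follows up on the M1 finding: "MemTable writes have no backpressure → memory grows without bound under load".
Changes
- pkg/credit: a reusable byte credit pool (token-bucket-style backpressure), with unit tests (block/unblock, oversized single entry admitted, budget disabled, reconciliation back to zero).
- SkipList maintains byteSize (overwrites counted by delta; reconciliation prevents leaks).
- Config option MemTableMaxInflightBytes (default 64MiB, <=0 disables).
- bench gains a -budget switch and inflight sampling.
Verification (making MemTable the bottleneck)
Honest limitations
With the default small-value configuration, the count-based flush fires before the byte budget is reached, so backpressure is not normally the binding constraint. It is a hard memory cap and safety net, and does not address the growth caused by SSTable metadata accumulation under M1's high-frequency small-value workload (a separate line of work, an M3 candidate). See docs-step/M2-backpressure-result.md for details.
🤖 Generated with Claude Code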
Summary by CodeRabbit
New Features
- `-budget` flag to enable/disable memory backpressure for inflight data (default: 64MiB)
Documentation