Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 57 additions & 47 deletions benchmark/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ func main() {
valueSize := flag.Int("vs", 64, "IMU 样本 value 字节数")
qDepth := flag.Int("qdepth", 1024, "有界队列深度;满即记为丢帧")
memTableSize := flag.Int("memtable", 4096, "MaxMemTableSize(active 表条目阈值,调小以强制频繁 flush 验内存封顶)")
budget := flag.Int64("budget", 64<<20, "MemTableMaxInflightBytes 字节预算(令牌桶背压);0 关闭背压")
flag.Parse()

config.G.MemTableMaxInflightBytes = *budget // 由命令行覆盖,便于 before/after 对比
Comment on lines +34 to +37

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Don't clobber the configured inflight budget when -budget is omitted.

This flag defaults to a hard-coded 64<<20, and Line 37 writes it back unconditionally, so any MemTableMaxInflightBytes value loaded from config is lost unless the caller passes -budget explicitly.

Suggested fix
-	budget := flag.Int64("budget", 64<<20, "MemTableMaxInflightBytes 字节预算(令牌桶背压);0 关闭背压")
+	budget := flag.Int64("budget", config.G.MemTableMaxInflightBytes, "MemTableMaxInflightBytes 字节预算(令牌桶背压);0 关闭背压")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@benchmark/ingest/main.go` around lines 34 - 37, The code always overwrites
config.G.MemTableMaxInflightBytes with a hard-coded default (64<<20) when the
-budget flag is not provided; change the flag default to the current config
value so omission preserves configured settings. Specifically, when creating the
flag (flag.Int64/flag.Int64Var for variable budget) use
config.G.MemTableMaxInflightBytes as the default instead of 64<<20, then after
flag.Parse() assign config.G.MemTableMaxInflightBytes = *budget only if the flag
was actually parsed (or simply rely on the flag default matching the config so
the assignment is safe). Reference symbols: budget, flag.Int64/flag.Int64Var,
flag.Parse, config.G.MemTableMaxInflightBytes.


rateList, err := parseRates(*rates)
if err != nil {
fmt.Fprintf(os.Stderr, "invalid -rates: %v\n", err)
Expand All @@ -45,6 +48,7 @@ func main() {
fmt.Printf(" Value size: %d bytes\n", *valueSize)
fmt.Printf(" Queue depth: %d\n", *qDepth)
fmt.Printf(" MemTable cap: %d entries\n", *memTableSize)
fmt.Printf(" Inflight budget: %d bytes (0=背压关闭)\n", *budget)
fmt.Printf(" Duration: sat=%s open-loop=%s/rate\n", *satDur, *dur)
fmt.Printf(" Open rates: %v Hz\n", rateList)
fmt.Println("========================================")
Expand All @@ -55,8 +59,8 @@ func main() {
hasSat := *satDur > 0
if hasSat {
sat = runSaturation(*satDur, *valueSize, *memTableSize)
fmt.Printf("[Saturation] throughput=%.0f writes/s mean=%s heap_peak=%s sys_peak=%s\n\n",
sat.Throughput, lat(sat.MeanLat), mib(sat.HeapPeak), mib(sat.SysPeak))
fmt.Printf("[Saturation] throughput=%.0f writes/s mean=%s inflight_peak=%s heap_peak=%s\n\n",
sat.Throughput, lat(sat.MeanLat), mib(sat.InflightPeak), mib(sat.HeapPeak))
}

// 2) 开环相:各速率档证 0 丢帧 + 尾延迟 + 内存封顶。
Expand All @@ -72,17 +76,18 @@ func main() {

// Result 单相/单档的压测结果
type Result struct {
Label string // "saturated" 或 "<rate>Hz"
RateHz int // 开环目标速率;饱和相为 0
Duration time.Duration
Produced int64 // 应投递样本数(饱和相 = 写入数)
Dropped int64 // 队列满导致的丢帧数
Written int64 // 实际写入引擎的样本数
Throughput float64 // 实际写入吞吐 (writes/sec)
MeanLat time.Duration // 饱和相用:1/throughput
Label string // "saturated" 或 "<rate>Hz"
RateHz int // 开环目标速率;饱和相为 0
Duration time.Duration
Produced int64 // 应投递样本数(饱和相 = 写入数)
Dropped int64 // 队列满导致的丢帧数
Written int64 // 实际写入引擎的样本数
Throughput float64 // 实际写入吞吐 (writes/sec)
MeanLat time.Duration // 饱和相用:1/throughput
P50, P99, P999, P9999, Max time.Duration
HeapPeak uint64 // HeapAlloc 峰值 (bytes)
SysPeak uint64 // Sys 峰值 (bytes)
HeapPeak uint64 // HeapAlloc 峰值 (bytes)
SysPeak uint64 // Sys 峰值 (bytes)
InflightPeak uint64 // 未刷盘字节信用峰值 (bytes)
}

// setupEngine 指向临时目录并以小 memtable 创建引擎,返回引擎与清理函数。
Expand All @@ -104,8 +109,8 @@ func setupEngine(memTableSize int) (*storage.Engine, *zstorage.MemTable, func())
return engine, memTable, cleanup
}

// memSampler 每 100ms 采样一次 MemStats,返回停止函数(调用后回填峰值)。
func memSampler(heapPeak, sysPeak *uint64) func() {
// memSampler 每 100ms 采样一次堆内存与未刷盘字节信用,返回停止函数(调用后回填峰值)。
func memSampler(mt *zstorage.MemTable, heapPeak, sysPeak, inflightPeak *uint64) func() {
stop := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -124,6 +129,9 @@ func memSampler(heapPeak, sysPeak *uint64) func() {
if ms.Sys > *sysPeak {
*sysPeak = ms.Sys
}
if inflight := uint64(mt.InflightBytes()); inflight > *inflightPeak {
*inflightPeak = inflight
}
case <-stop:
return
}
Expand All @@ -138,12 +146,12 @@ func memSampler(heapPeak, sysPeak *uint64) func() {
// runSaturation 闭环打满:不做 per-op 计时(避免计时器开销污染亚微秒写),
// 只数总量得到吞吐天花板,平均延迟由 1/throughput 推导。
func runSaturation(dur time.Duration, valueSize, memTableSize int) Result {
engine, _, cleanup := setupEngine(memTableSize)
engine, memTable, cleanup := setupEngine(memTableSize)
defer cleanup()

runtime.GC() // 清掉上一轮残留,使本轮 heap 峰值只反映本轮活对象
var heapPeak, sysPeak uint64
stopMem := memSampler(&heapPeak, &sysPeak)
var heapPeak, sysPeak, inflightPeak uint64
stopMem := memSampler(memTable, &heapPeak, &sysPeak, &inflightPeak)

var written, seq int64
start := time.Now()
Expand All @@ -167,26 +175,27 @@ func runSaturation(dur time.Duration, valueSize, memTableSize int) Result {
mean = time.Duration(float64(time.Second) / tput)
}
return Result{
Label: "saturated",
Duration: elapsed,
Produced: written,
Written: written,
Throughput: tput,
MeanLat: mean,
HeapPeak: heapPeak,
SysPeak: sysPeak,
Label: "saturated",
Duration: elapsed,
Produced: written,
Written: written,
Throughput: tput,
MeanLat: mean,
HeapPeak: heapPeak,
SysPeak: sysPeak,
InflightPeak: inflightPeak,
}
}

// runOpenLoop 开环定速率:生产者按固定速率非阻塞投递,队列满即丢帧;
// 消费者单写入者,per-op 计时(此处速率有界,计时开销可忽略),得到尾延迟。
func runOpenLoop(rateHz int, dur time.Duration, valueSize, qDepth, memTableSize int) Result {
engine, _, cleanup := setupEngine(memTableSize)
engine, memTable, cleanup := setupEngine(memTableSize)
defer cleanup()

runtime.GC() // 清掉上一轮残留,使本轮 heap 峰值只反映本轮活对象
var heapPeak, sysPeak uint64
stopMem := memSampler(&heapPeak, &sysPeak)
var heapPeak, sysPeak, inflightPeak uint64
stopMem := memSampler(memTable, &heapPeak, &sysPeak, &inflightPeak)

type sample struct{ key, value []byte }
q := make(chan sample, qDepth)
Expand Down Expand Up @@ -229,20 +238,21 @@ func runOpenLoop(rateHz int, dur time.Duration, valueSize, qDepth, memTableSize

sort.Slice(latencies, func(i, j int) bool { return latencies[i] < latencies[j] })
return Result{
Label: fmt.Sprintf("%dHz", rateHz),
RateHz: rateHz,
Duration: elapsed,
Produced: produced,
Dropped: dropped,
Written: written,
Throughput: float64(written) / elapsed.Seconds(),
P50: pct(latencies, 0.50),
P99: pct(latencies, 0.99),
P999: pct(latencies, 0.999),
P9999: pct(latencies, 0.9999),
Max: pct(latencies, 1.0),
HeapPeak: heapPeak,
SysPeak: sysPeak,
Label: fmt.Sprintf("%dHz", rateHz),
RateHz: rateHz,
Duration: elapsed,
Produced: produced,
Dropped: dropped,
Written: written,
Throughput: float64(written) / elapsed.Seconds(),
P50: pct(latencies, 0.50),
P99: pct(latencies, 0.99),
P999: pct(latencies, 0.999),
P9999: pct(latencies, 0.9999),
Max: pct(latencies, 1.0),
HeapPeak: heapPeak,
SysPeak: sysPeak,
InflightPeak: inflightPeak,
}
}

Expand All @@ -262,7 +272,7 @@ func printResult(r Result) {
fmt.Printf(" produced=%d written=%d dropped=%d\n", r.Produced, r.Written, r.Dropped)
fmt.Printf(" throughput=%.0f writes/s p99.9=%s p99.99=%s max=%s\n",
r.Throughput, lat(r.P999), lat(r.P9999), lat(r.Max))
fmt.Printf(" heap_peak=%s sys_peak=%s\n", mib(r.HeapPeak), mib(r.SysPeak))
fmt.Printf(" inflight_peak=%s heap_peak=%s sys_peak=%s\n", mib(r.InflightPeak), mib(r.HeapPeak), mib(r.SysPeak))
fmt.Println()
}

Expand All @@ -274,11 +284,11 @@ func printTable(hasSat bool, sat Result, rs []Result) {
fmt.Printf(" ceiling (saturated): %.0f writes/s heap_peak=%s\n", sat.Throughput, mib(sat.HeapPeak))
}
fmt.Println(" --- open-loop (fixed rate) ---")
fmt.Printf(" %-10s %-12s %-9s %-9s %-9s %-10s\n",
"rate", "throughput", "dropped", "p99.9", "max", "heap_peak")
fmt.Printf(" %-10s %-12s %-9s %-9s %-13s %-10s\n",
"rate", "throughput", "dropped", "max", "inflight_peak", "heap_peak")
for _, r := range rs {
fmt.Printf(" %-10s %-12.0f %-9d %-9s %-9s %-10s\n",
r.Label, r.Throughput, r.Dropped, lat(r.P999), lat(r.Max), mib(r.HeapPeak))
fmt.Printf(" %-10s %-12.0f %-9d %-9s %-13s %-10s\n",
r.Label, r.Throughput, r.Dropped, lat(r.Max), mib(r.InflightPeak), mib(r.HeapPeak))
}
fmt.Println("========================================")
fmt.Println(" Durability: N/A at engine layer (no storage WAL — see A2 / Raft path)")
Expand Down
43 changes: 24 additions & 19 deletions config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type GlobalConfig struct {
MaxMemTableSize int
MaxCompactionSize int

// MemTableMaxInflightBytes 未刷盘数据(active + 正在刷的 dirty)的字节预算。
// 字节级令牌桶背压:超出预算时写入阻塞,等 flush 归还信用。<=0 关闭背压。
MemTableMaxInflightBytes int64

TcpServer banIface.IServer
MaxConn int
MaxPackageSize uint32
Expand Down Expand Up @@ -81,25 +85,26 @@ func NewGlobalConfig() *GlobalConfig {
logDir := defaultLogDir()
global := &GlobalConfig{

Name: "Raft",
Port: 8080,
Host: "localhost",
Version: "1.0.0",
MaxConn: 1000,
MaxPackageSize: 1024,
WorkerPoolSize: 10,
MaxWorkerTaskLen: 10000,
MaxMsgChanLen: 100,
TcpServer: nil,
MaxMemTableP: 0.5,
MaxMemTableLevel: 32,
MaxMemTableSize: 1024,
WALPath: filepath.Join(logDir, "wal.log"),
SSTablePath: logDir,
Peers: []string{"localhost:8080"}, // 默认单节点
Me: 0, // 默认节点ID
RaftSnapshotThreshold: 1000, // 默认快照阈值
RaftSnapshotKeepEntries: 100, // 默认保留条目数
Name: "Raft",
Port: 8080,
Host: "localhost",
Version: "1.0.0",
MaxConn: 1000,
MaxPackageSize: 1024,
WorkerPoolSize: 10,
MaxWorkerTaskLen: 10000,
MaxMsgChanLen: 100,
TcpServer: nil,
MaxMemTableP: 0.5,
MaxMemTableLevel: 32,
MaxMemTableSize: 1024,
MemTableMaxInflightBytes: 64 << 20, // 64MiB 未刷盘字节预算
WALPath: filepath.Join(logDir, "wal.log"),
SSTablePath: logDir,
Peers: []string{"localhost:8080"}, // 默认单节点
Me: 0, // 默认节点ID
RaftSnapshotThreshold: 1000, // 默认快照阈值
RaftSnapshotKeepEntries: 100, // 默认保留条目数
}
global.Init()
global.ParseFlags()
Expand Down
50 changes: 50 additions & 0 deletions docs-step/M2-backpressure-result.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# M2 · 字节级令牌桶写背压 —— 设计与结果

> 承接 M1 发现的「MemTable 写入无背压 → 内存随负载无界增长」。本步给写路径加**字节级令牌桶背压**,并诚实标注它能解决什么、不能解决什么。

## 设计

- 新增可复用组件 `pkg/credit`:字节信用池 `Pool`(`Acquire`/`TryAcquire`/`Release`/`Used`)。信用不足时 `Acquire` 阻塞,直到持久化方 `Release` 归还。预算 `<=0` 关闭背压。
- `MemTable` 接入:
- `SkipList` 维护 `byteSize`(覆盖写按新旧 value 差值增量,防漂移)。
- `Put`/`Delete` 写入前 `acquireCredit(full)`:`TryAcquire` 失败则先 `StartFlush` 再阻塞 `Acquire`,避免永久阻塞。
- 覆盖写实际增量 < 预占额时,归还多占部分(信用与 `byteSize` 对账,不泄漏)。
- `Flush` 成功后 `Release(dirty.byteSize)` 归还信用并唤醒被阻塞的写。
- `Flush` 失败:保留 dirty(不丢数据)并重新触发刷盘重试;修掉了原 `Flush` 在失败后被下次刷盘覆盖 dirty 的隐患。
- 预算配置:`config.MemTableMaxInflightBytes`,默认 64MiB。

## 验证(A 方案:阻塞、0 丢帧)

### 背压确实绑定并精确封顶(关掉条数触发,让 MemTable 成为瓶颈)
`-memtable 100000000 -sat 5s`:

| | 吞吐 | inflight 峰值 | heap 峰值 |
|---|---|---|---|
| 背压关闭 `-budget 0` | 1,768,172 w/s | **784.2 MiB(无界)** | 2,014.7 MiB |
| 背压 `-budget 16MiB` | 1,225,599 w/s | **16.0 MiB(精确卡在预算)** | 173.4 MiB |

→ 未刷盘字节被**精确限制在 16.0MiB = 预算**,堆从 2GiB 降到 173MiB。代价是吞吐被限到 flush 速率(1.77M→1.23M)——**这正是背压的本意**:源跑赢 flush 时,限速写入而非撑爆内存。`pkg/credit` 有单测覆盖(阻塞/解除、超大单条放行、预算关闭、对账归零)。

### 复现命令
```powershell
go run ./benchmark/ingest/ -budget 0 -sat 5s -d 1s -rates 1000 -memtable 100000000
go run ./benchmark/ingest/ -budget 16777216 -sat 5s -d 1s -rates 1000 -memtable 100000000
```

## 诚实边界:背压不是 M1 那个增长的解药

默认小值配置(`MaxMemTableSize=4096`)下,**按条数刷盘(每 4096 条 ≈ 0.37MiB)远早于字节预算触发**,所以背压平时不绑定。实测 100k/200kHz 跑 60s:

| 速率 | 丢帧 | inflight 峰值 | heap 峰值 |
|---|---|---|---|
| 100 kHz | 0 | 0.4 MiB | 121 MiB |
| 200 kHz | 3,918 | 0.4 MiB | 265 MiB |

`inflight_peak` 仅 0.4MiB(≈一个 4096 条的表),**未刷盘量根本不大**。但 heap 仍涨到 265MiB——说明 **M1 看到的内存增长不在 MemTable,而在 SSTable 元数据 / bloom 过滤器随文件数累积**(12M 条 / 4096 ≈ 2900 个 SSTable,每个一份常驻 bloom + 索引)。

**结论**:
- 背压的角色是**内存硬上限 / 安全网**——当 value 很大或 `MaxMemTableSize` 很高、条数触发不及时,它兜底封顶(已证)。
- 它**不解决**小值高频下的内存增长,那是 SSTable 元数据/compaction 问题,是**下一条线**(M3 候选):核查 compaction 是否跟得上文件增长、bloom/索引常驻内存能否随 compaction 收敛。

## 200kHz 丢帧说明
开背压后 200kHz/60s 出现 3,918 丢帧:该配置下 inflight 峰值仅 0.4MiB,字节预算本身并未绑定,节流主要来自 flush;消费者写入跟不上,生产者有界队列(qdepth=1024)溢出——属预期的"源超过可持续速率"行为,非引擎崩溃。
66 changes: 66 additions & 0 deletions pkg/credit/credit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Package credit 提供字节级信用池(令牌桶式背压):写入方 Acquire 占用字节信用,
// 持久化方 Release 归还信用;信用不足时 Acquire 阻塞,从而把未持久化数据的内存占用
// 限制在预算之内。它本身与具体存储无关,可被任何"写入快、落盘慢"的路径复用。
package credit

import "sync"

// Pool is a blocking credit pool measured in bytes. The zero value is not
// usable (cond is nil until New sets it); construct a Pool with New.
type Pool struct {
// mu is held by the methods while they read or modify used.
mu sync.Mutex
// cond is bound to mu by New; Acquire waits on it and Release broadcasts on it.
cond *sync.Cond
budget int64 // <=0 means unlimited (backpressure disabled)
// used is the number of bytes currently taken from the pool.
used int64
}

// New returns a credit pool whose budget is budget bytes. A budget <= 0 means
// the pool enforces no limit.
func New(budget int64) *Pool {
	pool := new(Pool)
	pool.budget = budget
	// The condition variable shares the pool's own mutex.
	pool.cond = sync.NewCond(&pool.mu)
	return pool
}

// fits reports whether n more bytes may be taken on top of the bytes already
// in use. The caller must hold p.mu.
//
// A request is admitted when any of the following holds:
//   - the budget is <= 0 (no limit is enforced);
//   - nothing is in use, so a single request larger than the whole budget
//     still gets through instead of waiting forever;
//   - n does not exceed the remaining budget.
//
// The last rule is written as n <= budget-used rather than used+n <= budget:
// the sum wraps around for very large n and would wrongly admit the request,
// whereas the difference cannot overflow while used >= 0 and budget > 0.
func (p *Pool) fits(n int64) bool {
	switch {
	case p.budget <= 0:
		return true
	case p.used == 0:
		return true
	default:
		return n <= p.budget-p.used
	}
}

// TryAcquire attempts to take n bytes of credit without blocking and reports
// whether the credit was taken. When it returns false the pool is unchanged.
//
// It panics if n is negative: a negative amount would lower the used counter
// and silently corrupt the pool's accounting, which is a caller bug.
func (p *Pool) TryAcquire(n int64) bool {
	if n < 0 {
		panic("credit: negative amount")
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.fits(n) {
		return false
	}
	p.used += n
	return true
}

// Acquire takes n bytes of credit, blocking while the request does not fit.
// It waits on the pool's condition variable and re-checks the request after
// every wake-up, so it only proceeds once the request actually fits.
//
// It panics if n is negative: a negative amount would lower the used counter
// and silently corrupt the pool's accounting, which is a caller bug.
func (p *Pool) Acquire(n int64) {
	if n < 0 {
		panic("credit: negative amount")
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	for !p.fits(n) {
		p.cond.Wait()
	}
	p.used += n
}

// Release 归还 n 字节信用并唤醒所有等待者。
func (p *Pool) Release(n int64) {
p.mu.Lock()
p.used -= n
if p.used < 0 {
p.used = 0
}
p.mu.Unlock()
p.cond.Broadcast()
Comment on lines +29 to +58

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject negative credit amounts at the API boundary.

TryAcquire(-n) / Acquire(-n) currently reduce used, and Release(-n) increases it, so any caller can corrupt the pool’s accounting through the exported API.

Suggested fix
+func requireNonNegative(n int64) {
+	if n < 0 {
+		panic("credit: negative amount")
+	}
+}
+
 // TryAcquire 不阻塞地尝试占用 n 字节信用,成功返回 true。
 func (p *Pool) TryAcquire(n int64) bool {
+	requireNonNegative(n)
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	if p.fits(n) {
@@
 // Acquire 占用 n 字节信用;信用不足时阻塞,直到他方 Release 释放出空间。
 func (p *Pool) Acquire(n int64) {
+	requireNonNegative(n)
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	for !p.fits(n) {
@@
 // Release 归还 n 字节信用并唤醒所有等待者。
 func (p *Pool) Release(n int64) {
+	requireNonNegative(n)
 	p.mu.Lock()
 	p.used -= n
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// TryAcquire 不阻塞地尝试占用 n 字节信用,成功返回 true。
func (p *Pool) TryAcquire(n int64) bool {
p.mu.Lock()
defer p.mu.Unlock()
if p.fits(n) {
p.used += n
return true
}
return false
}
// Acquire 占用 n 字节信用;信用不足时阻塞,直到他方 Release 释放出空间。
func (p *Pool) Acquire(n int64) {
p.mu.Lock()
defer p.mu.Unlock()
for !p.fits(n) {
p.cond.Wait()
}
p.used += n
}
// Release 归还 n 字节信用并唤醒所有等待者。
func (p *Pool) Release(n int64) {
p.mu.Lock()
p.used -= n
if p.used < 0 {
p.used = 0
}
p.mu.Unlock()
p.cond.Broadcast()
func requireNonNegative(n int64) {
if n < 0 {
panic("credit: negative amount")
}
}
// TryAcquire 不阻塞地尝试占用 n 字节信用,成功返回 true。
func (p *Pool) TryAcquire(n int64) bool {
requireNonNegative(n)
p.mu.Lock()
defer p.mu.Unlock()
if p.fits(n) {
p.used += n
return true
}
return false
}
// Acquire 占用 n 字节信用;信用不足时阻塞,直到他方 Release 释放出空间。
func (p *Pool) Acquire(n int64) {
requireNonNegative(n)
p.mu.Lock()
defer p.mu.Unlock()
for !p.fits(n) {
p.cond.Wait()
}
p.used += n
}
// Release 归还 n 字节信用并唤醒所有等待者。
func (p *Pool) Release(n int64) {
requireNonNegative(n)
p.mu.Lock()
p.used -= n
if p.used < 0 {
p.used = 0
}
p.mu.Unlock()
p.cond.Broadcast()
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/credit/credit.go` around lines 29 - 58, Ensure the exported API rejects
negative amounts: in TryAcquire(n int64) return false immediately for n <= 0
(treat 0 as no-op), and in Acquire(n int64) and Release(n int64) validate n >= 0
at the top and panic on n < 0 (treat n == 0 as a no-op) so callers cannot
corrupt Pool.used; add these guards to the start of Pool.TryAcquire,
Pool.Acquire and Pool.Release (keep existing locking/cond logic unchanged and
use Pool.fits as before).

}

// Used reports how many bytes of credit are currently taken. The value is read
// while holding the pool's mutex.
func (p *Pool) Used() int64 {
	p.mu.Lock()
	inUse := p.used
	p.mu.Unlock()
	return inUse
}
Loading
Loading