diff --git a/benchmark/ingest/main.go b/benchmark/ingest/main.go new file mode 100644 index 0000000..338f33a --- /dev/null +++ b/benchmark/ingest/main.go @@ -0,0 +1,319 @@ +// 命令 ingest 是 M1 的 A1 层压测:进程内直接驱动 storage.Engine,证明存储引擎 +// 在内存封顶下扛得住高频顺序写。分两相: +// - 饱和相(闭环打满):找写吞吐天花板 Rmax + 内存峰值。 +// - 开环相(定速率):在若干 IMU 速率档下证「定速率不丢帧」+ 尾延迟 + 内存封顶。 +// +// 注意边界:存储引擎层无 WAL(已被移除),未 flush 的 MemTable 数据易失, +// 故本程序「不」测崩溃恢复——崩溃 0 丢失是 Raft 全链路(A2)的属性。 +package main + +import ( + "flag" + "fmt" + "os" + "path/filepath" + "runtime" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/NeverENG/BanDB/config" + "github.com/NeverENG/BanDB/storage" + "github.com/NeverENG/BanDB/storage/zstorage" +) + +func main() { + rates := flag.String("rates", "1000,10000,50000,200000", "开环采集速率档位 (Hz),逗号分隔") + dur := flag.Duration("d", 10*time.Second, "每相/每档的持续时间") + satDur := flag.Duration("sat", 10*time.Second, "饱和相持续时间;0 跳过") + valueSize := flag.Int("vs", 64, "IMU 样本 value 字节数") + qDepth := flag.Int("qdepth", 1024, "有界队列深度;满即记为丢帧") + memTableSize := flag.Int("memtable", 4096, "MaxMemTableSize(active 表条目阈值,调小以强制频繁 flush 验内存封顶)") + flag.Parse() + + rateList, err := parseRates(*rates) + if err != nil { + fmt.Fprintf(os.Stderr, "invalid -rates: %v\n", err) + os.Exit(1) + } + + fmt.Println("========================================") + fmt.Println(" BanDB Ingest Benchmark (A1 · engine)") + fmt.Println("========================================") + fmt.Printf(" Value size: %d bytes\n", *valueSize) + fmt.Printf(" Queue depth: %d\n", *qDepth) + fmt.Printf(" MemTable cap: %d entries\n", *memTableSize) + fmt.Printf(" Duration: sat=%s open-loop=%s/rate\n", *satDur, *dur) + fmt.Printf(" Open rates: %v Hz\n", rateList) + fmt.Println("========================================") + fmt.Println() + + // 1) 饱和相:闭环打满,找吞吐天花板。-sat 0 可跳过(避免与开环相互相污染内存测量)。 + var sat Result + hasSat := *satDur > 0 + if hasSat { + sat = runSaturation(*satDur, *valueSize, *memTableSize) + fmt.Printf("[Saturation] throughput=%.0f writes/s mean=%s heap_peak=%s sys_peak=%s\n\n", + sat.Throughput, lat(sat.MeanLat), mib(sat.HeapPeak), mib(sat.SysPeak)) + } + + // 2) 开环相:各速率档证 0 丢帧 + 尾延迟 + 内存封顶。 + results := make([]Result, 0, len(rateList)) + for _, r := range rateList { + res := runOpenLoop(r, *dur, *valueSize, *qDepth, *memTableSize) + results = append(results, res) + printResult(res) + } + + printTable(hasSat, sat, results) +} + +// Result 单相/单档的压测结果 +type Result struct { + Label string // "saturated" 或 "Hz" + RateHz int // 开环目标速率;饱和相为 0 + Duration time.Duration + Produced int64 // 应投递样本数(饱和相 = 写入数) + Dropped int64 // 队列满导致的丢帧数 + Written int64 // 实际写入引擎的样本数 + Throughput float64 // 实际写入吞吐 (writes/sec) + MeanLat time.Duration // 饱和相用:1/throughput + P50, P99, P999, P9999, Max time.Duration + HeapPeak uint64 // HeapAlloc 峰值 (bytes) + SysPeak uint64 // Sys 峰值 (bytes) +} + +// setupEngine 指向临时目录并以小 memtable 创建引擎,返回引擎与清理函数。 +func setupEngine(memTableSize int) (*storage.Engine, *zstorage.MemTable, func()) { + tmp, err := os.MkdirTemp("", "bandb-ingest-*") + if err != nil { + panic(err) + } + config.G.SSTablePath = tmp + config.G.WALPath = filepath.Join(tmp, "wal.log") + config.G.MaxMemTableSize = memTableSize + + memTable := zstorage.NewMemTable() + engine := storage.NewEngine(memTable) + cleanup := func() { + _ = memTable.Close() + os.RemoveAll(tmp) + } + return engine, memTable, cleanup +} + +// memSampler 每 100ms 采样一次 MemStats,返回停止函数(调用后回填峰值)。 +func memSampler(heapPeak, sysPeak *uint64) func() { + stop := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + var ms runtime.MemStats + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ticker.C: + runtime.ReadMemStats(&ms) + if ms.HeapAlloc > *heapPeak { + *heapPeak = ms.HeapAlloc + } + if ms.Sys > *sysPeak { + *sysPeak = ms.Sys + } + case <-stop: + return + } + } + }() + return func() { + close(stop) + wg.Wait() + } +} + +// runSaturation 闭环打满:不做 per-op 计时(避免计时器开销污染亚微秒写), +// 只数总量得到吞吐天花板,平均延迟由 1/throughput 推导。 +func runSaturation(dur time.Duration, valueSize, memTableSize int) Result { + engine, _, cleanup := setupEngine(memTableSize) + defer cleanup() + + runtime.GC() // 清掉上一轮残留,使本轮 heap 峰值只反映本轮活对象 + var heapPeak, sysPeak uint64 + stopMem := memSampler(&heapPeak, &sysPeak) + + var written, seq int64 + start := time.Now() + deadline := start.Add(dur) + for time.Now().Before(deadline) { + // 每 1024 次才查一次时钟,降低 time.Now 开销对吞吐的干扰。 + for i := 0; i < 1024; i++ { + key := []byte(fmt.Sprintf("imu:dev0:%020d", seq)) + seq++ + value := make([]byte, valueSize) // 每条独立 value,保证内存计量真实 + _ = engine.Put(key, value) + written++ + } + } + elapsed := time.Since(start) + stopMem() + + tput := float64(written) / elapsed.Seconds() + var mean time.Duration + if tput > 0 { + mean = time.Duration(float64(time.Second) / tput) + } + return Result{ + Label: "saturated", + Duration: elapsed, + Produced: written, + Written: written, + Throughput: tput, + MeanLat: mean, + HeapPeak: heapPeak, + SysPeak: sysPeak, + } +} + +// runOpenLoop 开环定速率:生产者按固定速率非阻塞投递,队列满即丢帧; +// 消费者单写入者,per-op 计时(此处速率有界,计时开销可忽略),得到尾延迟。 +func runOpenLoop(rateHz int, dur time.Duration, valueSize, qDepth, memTableSize int) Result { + engine, _, cleanup := setupEngine(memTableSize) + defer cleanup() + + runtime.GC() // 清掉上一轮残留,使本轮 heap 峰值只反映本轮活对象 + var heapPeak, sysPeak uint64 + stopMem := memSampler(&heapPeak, &sysPeak) + + type sample struct{ key, value []byte } + q := make(chan sample, qDepth) + + var produced, dropped int64 + go func() { + interval := time.Duration(int64(time.Second) / int64(rateHz)) + next := time.Now() + deadline := time.Now().Add(dur) + var seq int64 + for time.Now().Before(deadline) { + key := []byte(fmt.Sprintf("imu:dev0:%020d", seq)) + seq++ + produced++ + value := make([]byte, valueSize) // 每条独立 value + select { + case q <- sample{key: key, value: value}: + default: + dropped++ // 队列满 = 引擎跟不上 = 丢帧 + } + next = next.Add(interval) + if d := time.Until(next); d > 0 { + time.Sleep(d) + } + } + close(q) + }() + + latencies := make([]time.Duration, 0, rateHz*int(dur/time.Second)+1024) + var written int64 + start := time.Now() + for s := range q { + t0 := time.Now() + _ = engine.Put(s.key, s.value) + latencies = append(latencies, time.Since(t0)) + written++ + } + elapsed := time.Since(start) + stopMem() + + sort.Slice(latencies, func(i, j int) bool { return latencies[i] < latencies[j] }) + return Result{ + Label: fmt.Sprintf("%dHz", rateHz), + RateHz: rateHz, + Duration: elapsed, + Produced: produced, + Dropped: dropped, + Written: written, + Throughput: float64(written) / elapsed.Seconds(), + P50: pct(latencies, 0.50), + P99: pct(latencies, 0.99), + P999: pct(latencies, 0.999), + P9999: pct(latencies, 0.9999), + Max: pct(latencies, 1.0), + HeapPeak: heapPeak, + SysPeak: sysPeak, + } +} + +func pct(sorted []time.Duration, p float64) time.Duration { + if len(sorted) == 0 { + return 0 + } + idx := int(float64(len(sorted)) * p) + if idx >= len(sorted) { + idx = len(sorted) - 1 + } + return sorted[idx] +} + +func printResult(r Result) { + fmt.Printf("--- %s ---\n", r.Label) + fmt.Printf(" produced=%d written=%d dropped=%d\n", r.Produced, r.Written, r.Dropped) + fmt.Printf(" throughput=%.0f writes/s p99.9=%s p99.99=%s max=%s\n", + r.Throughput, lat(r.P999), lat(r.P9999), lat(r.Max)) + fmt.Printf(" heap_peak=%s sys_peak=%s\n", mib(r.HeapPeak), mib(r.SysPeak)) + fmt.Println() +} + +func printTable(hasSat bool, sat Result, rs []Result) { + fmt.Println("========================================") + fmt.Println(" Summary") + fmt.Println("========================================") + if hasSat { + fmt.Printf(" ceiling (saturated): %.0f writes/s heap_peak=%s\n", sat.Throughput, mib(sat.HeapPeak)) + } + fmt.Println(" --- open-loop (fixed rate) ---") + fmt.Printf(" %-10s %-12s %-9s %-9s %-9s %-10s\n", + "rate", "throughput", "dropped", "p99.9", "max", "heap_peak") + for _, r := range rs { + fmt.Printf(" %-10s %-12.0f %-9d %-9s %-9s %-10s\n", + r.Label, r.Throughput, r.Dropped, lat(r.P999), lat(r.Max), mib(r.HeapPeak)) + } + fmt.Println("========================================") + fmt.Println(" Durability: N/A at engine layer (no storage WAL — see A2 / Raft path)") + fmt.Println("========================================") +} + +func parseRates(s string) ([]int, error) { + parts := strings.Split(s, ",") + out := make([]int, 0, len(parts)) + for _, p := range parts { + v, err := strconv.Atoi(strings.TrimSpace(p)) + if err != nil { + return nil, err + } + if v <= 0 { + return nil, fmt.Errorf("rate must be > 0, got %d", v) + } + out = append(out, v) + } + return out, nil +} + +func lat(d time.Duration) string { + switch { + case d < time.Microsecond: + return fmt.Sprintf("%dns", d.Nanoseconds()) + case d < time.Millisecond: + return fmt.Sprintf("%.1fus", float64(d.Nanoseconds())/1000) + case d < time.Second: + return fmt.Sprintf("%.2fms", float64(d.Nanoseconds())/1e6) + default: + return fmt.Sprintf("%.2fs", d.Seconds()) + } +} + +func mib(b uint64) string { + return fmt.Sprintf("%.1fMiB", float64(b)/(1024*1024)) +} diff --git a/docs-step/M1-ingest-benchmark-plan.md b/docs-step/M1-ingest-benchmark-plan.md new file mode 100644 index 0000000..b3056e7 --- /dev/null +++ b/docs-step/M1-ingest-benchmark-plan.md @@ -0,0 +1,77 @@ +# M1 · IMU 高频摄入压测 —— 方案 + +> 目标:用一个能跑出数字的 demo,证明 BanDB 的存储地基扛得住「高频、定速率、内存受限」的传感器摄入。 +> 产出五个指标:**写吞吐 / p99 写延迟 / 内存峰值 / 丢帧数=0 / WAL 重启可恢复**。 +> 方案确认后再写代码(遵循 CLAUDE.md「方案先行」)。 + +--- + +## 核心模型:开环(open-loop),不是闭环 + +现有 `benchmark/`(TCP 客户端)和 `storage/bench_test.go`(`b.N` 微基准)都是**闭环**:循环里"做完一个立刻做下一个",测的是"最多能跑多快"。 + +但 IMU/相机的本质是**固定速率吐数据**(如 1kHz),不管 DB 多快。所以要证"0 丢帧",必须用**开环**模型: + +``` +生产者 ──以固定速率 R 投递──▶ 有界队列(深度 D) ──▶ 消费者(写入 BanDB) + │ + 队列满 = DB 跟不上 = 丢帧(+1) +``` + +- **生产者**:按 `ticker(1/R)` 节奏生成 IMU 样本,投进有界 channel;channel 满则 `dropped++`(这就是"丢帧"的精确定义)。 +- **消费者**:从 channel 取出写入引擎,记录每次写延迟。 +- **判定**:在速率 R 下跑 T 秒,`dropped == 0` 即"该速率下不丢帧"。 + +配套两步跑法: +1. **找天花板(闭环)**:先打满,得到"最大可持续写入速率 Rmax"。 +2. **证 0 丢帧(开环)**:在 R = 50%~70% × Rmax 下定速跑,证明 `dropped=0` 且 p99 延迟有界。 + +--- + +## ★ 唯一要你拍板的岔路:M1 测哪一层? + +| 选项 | 测什么 | 优点 | 缺点 | 我的建议 | +|---|---|---|---|---| +| **A1 引擎内压测**(进程内直接驱动 `storage.Engine`) | LSM 存储引擎本身 | 内存测量干净(同进程 `runtime.ReadMemStats`)、可封顶(`GOMEMLIMIT`)、WAL 重启可恢复就是 close→reopen→scan、无网络/Raft 噪声、确定性强 | 不覆盖 BanNet + Raft 全链路 | **✅ M1 选它** | +| **A2 全链路压测**(扩展 `benchmark/` 走 TCP→Raft→LSM) | 端到端真实路径 | 测的是真部署 | 服务端内存难干净测量、单节点 Raft 每写一次 fsync 引入噪声、变量多 | 作为 **M1.5 后续** | +| A3 两个都做 | — | 全 | M1 拖长 | 不在 M1 | + +**理由**:M1 要证的"LSM 扛高频顺序写 / 内存封顶 / 0 丢帧 / WAL 可恢复"**全是存储引擎属性**;内存封顶+测量只有进程内才干净;WAL 恢复是引擎操作。先把**地基**这层钉死、拿到干净数字,再用 A2 证全链路。 + +> **默认推进 A1**。若你要 M1 直接证全链路,告诉我改走 A2。 + +--- + +## 关键发现:耐久性在 Raft 层,不在存储层(决定指标归属) + +读代码确认:commit `删除未接入写路径的 storage WAL` 之后,**存储引擎层已无 WAL**。MemTable 数据仅在 flush 成 SSTable 后落盘,未 flush 的 MemTable 数据**易失**。真正的崩溃恢复在 **Raft 层**(`Raft/raft_wal.go` group-commit fsync + 重启 `LoadLogs` 重放 → `service/fsm.go` 应用到引擎)。 + +因此五个指标**按层归属**,不混淆: + +| 指标 | A1 引擎内 | A2 全链路(Raft) | +|---|---|---| +| 写吞吐 | ✅ | ✅ | +| p99 写延迟 | ✅ | ✅ | +| 内存峰值(封顶下不无限涨) | ✅(同进程 MemStats) | △(服务端,难干净测) | +| 定速率 0 丢帧 | ✅ | ✅ | +| **WAL 重启可恢复(崩溃 0 丢失)** | ❌ 引擎无 WAL,做不到 | ✅ **只能这里证** | + +A1 明确打印 `Durability: N/A at engine layer (no storage WAL — see A2)`,不含糊其辞。 + +## 已定的默认(无需你决策,除非你想改) + +- **IMU 数据形状**:key = `imu::`(约 24B,时间戳前缀 → 顺序写 + 天然 range scan);value = 64B(accel/gyro/mag 9×float32 + 头)。 +- **速率档位**:200Hz / 1kHz / 5kHz 三档跑(覆盖典型 IMU 到激进档)。 +- **内存封顶**:`GOMEMLIMIT` + 调小 `config.G.MaxMemTableSize` 强制频繁 flush,验证内存不随时长无限涨。 +- **内存峰值采集**:后台每 100ms `runtime.ReadMemStats`,记录 `HeapAlloc` 峰值与 `Sys`。 +- **WAL 重启可恢复**:摄入 N 条 → 关闭引擎 → 重开 → 全量 scan,断言可读条数 == 已 ack 条数(0 丢失)。 +- **代码落点**:新建 `benchmark/ingest/`(独立 `package main`,不动现有 `benchmark/` 与 `storage/bench_test.go`)。 +- **输出**:跑完把五个指标 + 各速率档表格写进本目录 `M1-ingest-benchmark-result.md`。 + +--- + +## 交付物(M1 完成时) + +1. `benchmark/ingest/` 开环压测程序(A1)。 +2. `docs-step/M1-ingest-benchmark-result.md`:三档速率 × 五指标的真实数据表 + 一句结论。 +3. 一条可复现命令(写进 result 文档)。 diff --git a/docs-step/M1-ingest-benchmark-result.md b/docs-step/M1-ingest-benchmark-result.md new file mode 100644 index 0000000..7fce910 --- /dev/null +++ b/docs-step/M1-ingest-benchmark-result.md @@ -0,0 +1,80 @@ +# M1 · IMU 高频摄入压测 —— 结果(A1 引擎层) + +> 对应方案 [`M1-ingest-benchmark-plan.md`](./M1-ingest-benchmark-plan.md)。本页是 A1(进程内引擎层)的真实数据与诚实结论。A2(Raft 全链路 / 崩溃恢复)待补。 +> +> ⚠️ 短跑(6s)一度让人以为"现实速率下内存有界",但 60s 持续性复测推翻了它。下面的结论是复测后的版本。 + +## 环境 +- Go 1.26.1, windows/amd64,本机 `go run`,单写入者。 +- value=64B(每条独立分配),key=`imu:dev0:<20位序号>`(时间戳序前缀)。 +- `MaxMemTableSize=4096` 条目(强制频繁 flush),默认开环队列深度 `qdepth=1024`。 +- 饱和相与开环相分两次独立进程跑,避免内存测量互相污染。 + +## 核心结论(诚实版) + +1. **峰值写速率 ~2.0M w/s,但这是"内存峰值写速率",不是可持续吞吐。** 打满时 flush 彻底跟不上,几乎全部数据堆在内存,heap 飙到 **1.5 GiB**。 +2. **内存在所有测试速率下都随时间持续增长,60s 内无平稳态。** 同一速率,6s vs 60s 的 heap 峰值: + + | 速率 | heap @6s | heap @60s | + |---|---|---| + | 50 kHz | 10.3 MiB | 62.3 MiB | + | 200 kHz | 29.5 MiB | 239.7 MiB | + + → **"内存封顶"在任何持续负载下都不成立**(详见下方"真问题")。所谓"可持续速率"此前是 6 秒的错觉。 +3. **丢帧是队列深度 / Windows 计时毛刺的产物,不是引擎极限。** 500 kHz 在 `qdepth=1024` 下丢 1,188 帧,把 `qdepth` 调到 65536 后**丢帧归零**(6s,写入 300 万)。100 kHz 的 111 丢帧同理。 +4. **现实规模下短期可用**:≤200 kHz(≈200 个传感器各 1 kHz)下可数分钟 0 丢帧、尾延迟亚毫秒——但内存会一路涨到最终 OOM。够 demo,不够生产。 + +## 复现命令 +```powershell +# 饱和相(峰值写速率) +go run ./benchmark/ingest/ -sat 8s -d 1s -rates 1000 +# 开环相(定速率 6s) +go run ./benchmark/ingest/ -sat 0 -d 6s -rates 1000,10000,50000,200000,500000 +# 持续性复测(关键:证明内存随时间增长) +go run ./benchmark/ingest/ -sat 0 -d 60s -rates 50000,100000,200000 +# 丢帧归因(大队列下 500kHz 丢帧归零) +go run ./benchmark/ingest/ -sat 0 -d 6s -rates 500000 -qdepth 65536 +``` + +## 数据明细 + +### 饱和相(闭环打满,8s) +| 指标 | 值 | +|---|---| +| 峰值写速率 | **2,031,789 w/s** | +| 平均单写 | 492 ns | +| heap 峰值 | **1,507 MiB(无界)** | + +### 开环相(定速率 6s,qdepth=1024) +| 速率 | 实际吞吐 (w/s) | 丢帧 | max 延迟 | heap 峰值 | +|---|---|---|---|---| +| 1 kHz | 1,000 | 0 | 544 µs | 2.9 MiB | +| 10 kHz | 9,999 | 0 | 307 µs | 4.8 MiB | +| 50 kHz | 50,000 | 0 | 640 µs | 10.3 MiB | +| 200 kHz | 200,053 | 0 | 526 µs | 29.5 MiB | +| 500 kHz | 500,128 | 1,188 *(队列/计时毛刺)* | 615 µs | 70.7 MiB | + +### 持续性复测(60s) +| 速率 | 写入 | 丢帧 | heap 峰值 | +|---|---|---|---| +| 50 kHz | 3,000,000 | 0 | 62.3 MiB | +| 100 kHz | 6,000,000 | 111 *(毛刺)* | 134.6 MiB | +| 200 kHz | 12,000,000 | 0 | 239.7 MiB | + +### 丢帧归因(500 kHz, qdepth=65536, 6s) +写入 2,999,668,**丢帧 0**,heap 62.1 MiB → 证明丢帧来自队列容量/突发,不是引擎。 + +## 发现的真问题:MemTable 写入无背压 + 元数据累积 + +- `StartFlush()` 是**非阻塞**信号,`Put()` **永不阻塞**;双缓冲只有 active + dirty 两块。 +- 当写入速率 > flush 写盘速率时,active 表**无界膨胀**,内存只受 `写入速率 × flush 耗时` 约束,**与 `MaxMemTableSize` 无关**。 +- 60s 复测显示:即便 50–200 kHz,内存也随累计数据量持续增长,未见稳态。根因疑为两部分叠加,需 pprof heap 进一步定位: + 1. 未刷盘 MemTable 积压(无背压); + 2. SSTable 元数据 / bloom 过滤器随文件数累积(compaction 未能及时归并)。 + +→ **下一项存储改进**:给写路径加背压(未刷盘积压超硬上限时 `Put` 阻塞直到 flush 追上),并核查 compaction 是否跟得上文件增长。修好后跑同样的 60s 复测,附"加背压前后内存随时间曲线","内存封顶"才算坐实。 + +## 测量局限 +- **p99 在 Windows 上 ≈ 0ns,且这很可能是真实分布**:flush 停顿仅命中约 1/4096 次写(<0.025%),所以 p99/p99.9/p99.99 仍落在亚微秒主体里,只有 `max` 捕捉到停顿(亚毫秒)。要更细的尾部需 Linux + 更细计时复测。 +- **单写入者**:MemTable 内部 RWMutex 串行化,多写入者不提升单表吞吐,故未测并发写入者。 +- **producer 在高速率下非平滑**:Windows `time.Sleep` 无法 µs 级精确节拍,高速档生产者呈突发,故"丢帧门槛"反映的是"qdepth=1024 下的突发容忍度",非引擎吞吐极限。