Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 319 additions & 0 deletions benchmark/ingest/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
// 命令 ingest 是 M1 的 A1 层压测:进程内直接驱动 storage.Engine,证明存储引擎
// 在内存封顶下扛得住高频顺序写。分两相:
// - 饱和相(闭环打满):找写吞吐天花板 Rmax + 内存峰值。
// - 开环相(定速率):在若干 IMU 速率档下证「定速率不丢帧」+ 尾延迟 + 内存封顶。
//
// 注意边界:存储引擎层无 WAL(已被移除),未 flush 的 MemTable 数据易失,
// 故本程序「不」测崩溃恢复——崩溃 0 丢失是 Raft 全链路(A2)的属性。
package main

import (
"flag"
"fmt"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/NeverENG/BanDB/config"
"github.com/NeverENG/BanDB/storage"
"github.com/NeverENG/BanDB/storage/zstorage"
)

func main() {
rates := flag.String("rates", "1000,10000,50000,200000", "开环采集速率档位 (Hz),逗号分隔")
dur := flag.Duration("d", 10*time.Second, "每相/每档的持续时间")
satDur := flag.Duration("sat", 10*time.Second, "饱和相持续时间;0 跳过")
valueSize := flag.Int("vs", 64, "IMU 样本 value 字节数")
qDepth := flag.Int("qdepth", 1024, "有界队列深度;满即记为丢帧")
memTableSize := flag.Int("memtable", 4096, "MaxMemTableSize(active 表条目阈值,调小以强制频繁 flush 验内存封顶)")
flag.Parse()

rateList, err := parseRates(*rates)
if err != nil {
fmt.Fprintf(os.Stderr, "invalid -rates: %v\n", err)
os.Exit(1)
}

fmt.Println("========================================")
fmt.Println(" BanDB Ingest Benchmark (A1 · engine)")
fmt.Println("========================================")
fmt.Printf(" Value size: %d bytes\n", *valueSize)
fmt.Printf(" Queue depth: %d\n", *qDepth)
fmt.Printf(" MemTable cap: %d entries\n", *memTableSize)
fmt.Printf(" Duration: sat=%s open-loop=%s/rate\n", *satDur, *dur)
fmt.Printf(" Open rates: %v Hz\n", rateList)
fmt.Println("========================================")
fmt.Println()

// 1) 饱和相:闭环打满,找吞吐天花板。-sat 0 可跳过(避免与开环相互相污染内存测量)。
var sat Result
hasSat := *satDur > 0
if hasSat {
sat = runSaturation(*satDur, *valueSize, *memTableSize)
fmt.Printf("[Saturation] throughput=%.0f writes/s mean=%s heap_peak=%s sys_peak=%s\n\n",
sat.Throughput, lat(sat.MeanLat), mib(sat.HeapPeak), mib(sat.SysPeak))
}

// 2) 开环相:各速率档证 0 丢帧 + 尾延迟 + 内存封顶。
results := make([]Result, 0, len(rateList))
for _, r := range rateList {
res := runOpenLoop(r, *dur, *valueSize, *qDepth, *memTableSize)
results = append(results, res)
printResult(res)
}

printTable(hasSat, sat, results)
}

// Result 单相/单档的压测结果
type Result struct {
Label string // "saturated" 或 "<rate>Hz"
RateHz int // 开环目标速率;饱和相为 0
Duration time.Duration
Produced int64 // 应投递样本数(饱和相 = 写入数)
Dropped int64 // 队列满导致的丢帧数
Written int64 // 实际写入引擎的样本数
Throughput float64 // 实际写入吞吐 (writes/sec)
MeanLat time.Duration // 饱和相用:1/throughput
P50, P99, P999, P9999, Max time.Duration
HeapPeak uint64 // HeapAlloc 峰值 (bytes)
SysPeak uint64 // Sys 峰值 (bytes)
}

// setupEngine 指向临时目录并以小 memtable 创建引擎,返回引擎与清理函数。
func setupEngine(memTableSize int) (*storage.Engine, *zstorage.MemTable, func()) {
tmp, err := os.MkdirTemp("", "bandb-ingest-*")
if err != nil {
panic(err)
}
config.G.SSTablePath = tmp
config.G.WALPath = filepath.Join(tmp, "wal.log")
config.G.MaxMemTableSize = memTableSize

memTable := zstorage.NewMemTable()
engine := storage.NewEngine(memTable)
cleanup := func() {
_ = memTable.Close()
os.RemoveAll(tmp)
}
return engine, memTable, cleanup
}

// memSampler 每 100ms 采样一次 MemStats,返回停止函数(调用后回填峰值)。
func memSampler(heapPeak, sysPeak *uint64) func() {
stop := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
var ms runtime.MemStats
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
runtime.ReadMemStats(&ms)
if ms.HeapAlloc > *heapPeak {
*heapPeak = ms.HeapAlloc
}
if ms.Sys > *sysPeak {
*sysPeak = ms.Sys
}
case <-stop:
return
}
}
}()
return func() {
close(stop)
wg.Wait()
}
}

// runSaturation 闭环打满:不做 per-op 计时(避免计时器开销污染亚微秒写),
// 只数总量得到吞吐天花板,平均延迟由 1/throughput 推导。
func runSaturation(dur time.Duration, valueSize, memTableSize int) Result {
engine, _, cleanup := setupEngine(memTableSize)
defer cleanup()

runtime.GC() // 清掉上一轮残留,使本轮 heap 峰值只反映本轮活对象
var heapPeak, sysPeak uint64
stopMem := memSampler(&heapPeak, &sysPeak)

var written, seq int64
start := time.Now()
deadline := start.Add(dur)
for time.Now().Before(deadline) {
// 每 1024 次才查一次时钟,降低 time.Now 开销对吞吐的干扰。
for i := 0; i < 1024; i++ {
key := []byte(fmt.Sprintf("imu:dev0:%020d", seq))
seq++
value := make([]byte, valueSize) // 每条独立 value,保证内存计量真实
_ = engine.Put(key, value)
written++
}
}
elapsed := time.Since(start)
stopMem()

tput := float64(written) / elapsed.Seconds()
var mean time.Duration
if tput > 0 {
mean = time.Duration(float64(time.Second) / tput)
}
return Result{
Label: "saturated",
Duration: elapsed,
Produced: written,
Written: written,
Throughput: tput,
MeanLat: mean,
HeapPeak: heapPeak,
SysPeak: sysPeak,
}
}

// runOpenLoop 开环定速率:生产者按固定速率非阻塞投递,队列满即丢帧;
// 消费者单写入者,per-op 计时(此处速率有界,计时开销可忽略),得到尾延迟。
func runOpenLoop(rateHz int, dur time.Duration, valueSize, qDepth, memTableSize int) Result {
engine, _, cleanup := setupEngine(memTableSize)
defer cleanup()

runtime.GC() // 清掉上一轮残留,使本轮 heap 峰值只反映本轮活对象
var heapPeak, sysPeak uint64
stopMem := memSampler(&heapPeak, &sysPeak)

type sample struct{ key, value []byte }
q := make(chan sample, qDepth)

var produced, dropped int64
go func() {
interval := time.Duration(int64(time.Second) / int64(rateHz))
next := time.Now()
deadline := time.Now().Add(dur)
var seq int64
for time.Now().Before(deadline) {
key := []byte(fmt.Sprintf("imu:dev0:%020d", seq))
seq++
produced++
value := make([]byte, valueSize) // 每条独立 value
select {
case q <- sample{key: key, value: value}:
default:
dropped++ // 队列满 = 引擎跟不上 = 丢帧
}
next = next.Add(interval)
if d := time.Until(next); d > 0 {
time.Sleep(d)
}
}
close(q)
}()

latencies := make([]time.Duration, 0, rateHz*int(dur/time.Second)+1024)
var written int64
start := time.Now()
for s := range q {
t0 := time.Now()
_ = engine.Put(s.key, s.value)
latencies = append(latencies, time.Since(t0))
written++
}
elapsed := time.Since(start)
stopMem()

sort.Slice(latencies, func(i, j int) bool { return latencies[i] < latencies[j] })
return Result{
Label: fmt.Sprintf("%dHz", rateHz),
RateHz: rateHz,
Duration: elapsed,
Produced: produced,
Dropped: dropped,
Written: written,
Throughput: float64(written) / elapsed.Seconds(),
P50: pct(latencies, 0.50),
P99: pct(latencies, 0.99),
P999: pct(latencies, 0.999),
P9999: pct(latencies, 0.9999),
Max: pct(latencies, 1.0),
HeapPeak: heapPeak,
SysPeak: sysPeak,
}
}

func pct(sorted []time.Duration, p float64) time.Duration {
if len(sorted) == 0 {
return 0
}
idx := int(float64(len(sorted)) * p)
if idx >= len(sorted) {
idx = len(sorted) - 1
}
return sorted[idx]
}

func printResult(r Result) {
fmt.Printf("--- %s ---\n", r.Label)
fmt.Printf(" produced=%d written=%d dropped=%d\n", r.Produced, r.Written, r.Dropped)
fmt.Printf(" throughput=%.0f writes/s p99.9=%s p99.99=%s max=%s\n",
r.Throughput, lat(r.P999), lat(r.P9999), lat(r.Max))
fmt.Printf(" heap_peak=%s sys_peak=%s\n", mib(r.HeapPeak), mib(r.SysPeak))
fmt.Println()
}

func printTable(hasSat bool, sat Result, rs []Result) {
fmt.Println("========================================")
fmt.Println(" Summary")
fmt.Println("========================================")
if hasSat {
fmt.Printf(" ceiling (saturated): %.0f writes/s heap_peak=%s\n", sat.Throughput, mib(sat.HeapPeak))
}
fmt.Println(" --- open-loop (fixed rate) ---")
fmt.Printf(" %-10s %-12s %-9s %-9s %-9s %-10s\n",
"rate", "throughput", "dropped", "p99.9", "max", "heap_peak")
for _, r := range rs {
fmt.Printf(" %-10s %-12.0f %-9d %-9s %-9s %-10s\n",
r.Label, r.Throughput, r.Dropped, lat(r.P999), lat(r.Max), mib(r.HeapPeak))
}
fmt.Println("========================================")
fmt.Println(" Durability: N/A at engine layer (no storage WAL — see A2 / Raft path)")
fmt.Println("========================================")
}

func parseRates(s string) ([]int, error) {
parts := strings.Split(s, ",")
out := make([]int, 0, len(parts))
for _, p := range parts {
v, err := strconv.Atoi(strings.TrimSpace(p))
if err != nil {
return nil, err
}
if v <= 0 {
return nil, fmt.Errorf("rate must be > 0, got %d", v)
}
out = append(out, v)
}
return out, nil
}

func lat(d time.Duration) string {
switch {
case d < time.Microsecond:
return fmt.Sprintf("%dns", d.Nanoseconds())
case d < time.Millisecond:
return fmt.Sprintf("%.1fus", float64(d.Nanoseconds())/1000)
case d < time.Second:
return fmt.Sprintf("%.2fms", float64(d.Nanoseconds())/1e6)
default:
return fmt.Sprintf("%.2fs", d.Seconds())
}
}

func mib(b uint64) string {
return fmt.Sprintf("%.1fMiB", float64(b)/(1024*1024))
}
77 changes: 77 additions & 0 deletions docs-step/M1-ingest-benchmark-plan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# M1 · IMU 高频摄入压测 —— 方案

> 目标:用一个能跑出数字的 demo,证明 BanDB 的存储地基扛得住「高频、定速率、内存受限」的传感器摄入。
> 产出五个指标:**写吞吐 / p99 写延迟 / 内存峰值 / 丢帧数=0 / WAL 重启可恢复**。
> 方案确认后再写代码(遵循 CLAUDE.md「方案先行」)。

---

## 核心模型:开环(open-loop),不是闭环

现有 `benchmark/`(TCP 客户端)和 `storage/bench_test.go`(`b.N` 微基准)都是**闭环**:循环里"做完一个立刻做下一个",测的是"最多能跑多快"。

但 IMU/相机的本质是**固定速率吐数据**(如 1kHz),不管 DB 多快。所以要证"0 丢帧",必须用**开环**模型:

```
生产者 ──以固定速率 R 投递──▶ 有界队列(深度 D) ──▶ 消费者(写入 BanDB)
队列满 = DB 跟不上 = 丢帧(+1)
```

- **生产者**:按 `ticker(1/R)` 节奏生成 IMU 样本,投进有界 channel;channel 满则 `dropped++`(这就是"丢帧"的精确定义)。
- **消费者**:从 channel 取出写入引擎,记录每次写延迟。
- **判定**:在速率 R 下跑 T 秒,`dropped == 0` 即"该速率下不丢帧"。

配套两步跑法:
1. **找天花板(闭环)**:先打满,得到"最大可持续写入速率 Rmax"。
2. **证 0 丢帧(开环)**:在 R = 50%~70% × Rmax 下定速跑,证明 `dropped=0` 且 p99 延迟有界。

---

## ★ 唯一要你拍板的岔路:M1 测哪一层?

| 选项 | 测什么 | 优点 | 缺点 | 我的建议 |
|---|---|---|---|---|
| **A1 引擎内压测**(进程内直接驱动 `storage.Engine`) | LSM 存储引擎本身 | 内存测量干净(同进程 `runtime.ReadMemStats`)、可封顶(`GOMEMLIMIT`)、WAL 重启可恢复就是 close→reopen→scan、无网络/Raft 噪声、确定性强 | 不覆盖 BanNet + Raft 全链路 | **✅ M1 选它** |
| **A2 全链路压测**(扩展 `benchmark/` 走 TCP→Raft→LSM) | 端到端真实路径 | 测的是真部署 | 服务端内存难干净测量、单节点 Raft 每写一次 fsync 引入噪声、变量多 | 作为 **M1.5 后续** |
| A3 两个都做 | — | 全 | M1 拖长 | 不在 M1 |

**理由**:M1 要证的"LSM 扛高频顺序写 / 内存封顶 / 0 丢帧 / WAL 可恢复"**全是存储引擎属性**;内存封顶+测量只有进程内才干净;WAL 恢复是引擎操作。先把**地基**这层钉死、拿到干净数字,再用 A2 证全链路。

> **默认推进 A1**。若你要 M1 直接证全链路,告诉我改走 A2。

---

## 关键发现:耐久性在 Raft 层,不在存储层(决定指标归属)

读代码确认:commit `删除未接入写路径的 storage WAL` 之后,**存储引擎层已无 WAL**。MemTable 数据仅在 flush 成 SSTable 后落盘,未 flush 的 MemTable 数据**易失**。真正的崩溃恢复在 **Raft 层**(`Raft/raft_wal.go` group-commit fsync + 重启 `LoadLogs` 重放 → `service/fsm.go` 应用到引擎)。

因此五个指标**按层归属**,不混淆:

| 指标 | A1 引擎内 | A2 全链路(Raft) |
|---|---|---|
| 写吞吐 | ✅ | ✅ |
| p99 写延迟 | ✅ | ✅ |
| 内存峰值(封顶下不无限涨) | ✅(同进程 MemStats) | △(服务端,难干净测) |
| 定速率 0 丢帧 | ✅ | ✅ |
| **WAL 重启可恢复(崩溃 0 丢失)** | ❌ 引擎无 WAL,做不到 | ✅ **只能这里证** |

A1 明确打印 `Durability: N/A at engine layer (no storage WAL — see A2)`,不含糊其辞。

## 已定的默认(无需你决策,除非你想改)

- **IMU 数据形状**:key = `imu:<dev>:<ts_nanos>`(约 24B,时间戳前缀 → 顺序写 + 天然 range scan);value = 64B(accel/gyro/mag 9×float32 + 头)。
- **速率档位**:200Hz / 1kHz / 5kHz 三档跑(覆盖典型 IMU 到激进档)。
- **内存封顶**:`GOMEMLIMIT` + 调小 `config.G.MaxMemTableSize` 强制频繁 flush,验证内存不随时长无限涨。
- **内存峰值采集**:后台每 100ms `runtime.ReadMemStats`,记录 `HeapAlloc` 峰值与 `Sys`。
- **WAL 重启可恢复**:摄入 N 条 → 关闭引擎 → 重开 → 全量 scan,断言可读条数 == 已 ack 条数(0 丢失)。
- **代码落点**:新建 `benchmark/ingest/`(独立 `package main`,不动现有 `benchmark/` 与 `storage/bench_test.go`)。
- **输出**:跑完把五个指标 + 各速率档表格写进本目录 `M1-ingest-benchmark-result.md`。

---

## 交付物(M1 完成时)

1. `benchmark/ingest/` 开环压测程序(A1)。
2. `docs-step/M1-ingest-benchmark-result.md`:三档速率 × 五指标的真实数据表 + 一句结论。
3. 一条可复现命令(写进 result 文档)。
Loading
Loading