From 42b47bca1bcf6dcf6aaaecfde45128677dd6ec49 Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Mon, 29 Jun 2026 20:36:16 +0800 Subject: [PATCH 1/8] add evmonly executor load test harness --- giga/evmonly/README.md | 2 + giga/evmonly/cmd/evmonly-loadtest/README.md | 80 ++ giga/evmonly/cmd/evmonly-loadtest/main.go | 808 ++++++++++++++++++ .../evmonly/cmd/evmonly-loadtest/main_test.go | 71 ++ 4 files changed, 961 insertions(+) create mode 100644 giga/evmonly/cmd/evmonly-loadtest/README.md create mode 100644 giga/evmonly/cmd/evmonly-loadtest/main.go create mode 100644 giga/evmonly/cmd/evmonly-loadtest/main_test.go diff --git a/giga/evmonly/README.md b/giga/evmonly/README.md index 52d3a715c1..203ea7a5ff 100644 --- a/giga/evmonly/README.md +++ b/giga/evmonly/README.md @@ -34,6 +34,8 @@ The `evmonly` package currently provides: contract address, and effective gas price - a map-backed `MemoryState` for tests and early integration - fail-closed custom precompile placeholders plus an SDK-free staking precompile +- a standalone load harness at `giga/evmonly/cmd/evmonly-loadtest` that feeds + generated transfer blocks into the executor with mock state and receipt sinks The executor accepts config for nonce checks, gas-price checks, minimum gas price, chain config, and the custom precompile registry. diff --git a/giga/evmonly/cmd/evmonly-loadtest/README.md b/giga/evmonly/cmd/evmonly-loadtest/README.md new file mode 100644 index 0000000000..fc3c4a0186 --- /dev/null +++ b/giga/evmonly/cmd/evmonly-loadtest/README.md @@ -0,0 +1,80 @@ +# evmonly-loadtest + +`evmonly-loadtest` is a standalone executable for feeding synthetic blocks to +the EVM-only executor without Cosmos SDK state, mempool, RPC, or persistence. + +It currently generates pure EVM legacy transfer transactions. Each generated +sender account has exactly one nonce-0 transaction and is funded in the +command's in-memory genesis state before its block is queued. Recipients are +unique by default so the transfer workload can be extended toward +less-conflicting account layouts; pass `--recipient=0x...` to force a single +recipient. + +Run a bounded test: + +```bash +go run ./giga/evmonly/cmd/evmonly-loadtest --blocks=1000 --txs-per-block=1000 +``` + +Run continuously until interrupted: + +```bash +go run ./giga/evmonly/cmd/evmonly-loadtest --txs-per-block=1000 +``` + +Example local saturation run: + +```bash +go run ./giga/evmonly/cmd/evmonly-loadtest \ + --metrics-addr= \ + --report-interval=5s \ + --blocks=2000 \ + --txs-per-block=1000 \ + --builders=16 \ + --workers=1 \ + --executor-workers=12 \ + --gas-price-wei=0 \ + --min-gas-price-wei=0 \ + --queue-size=512 +``` + +The zero gas price/min-gas settings keep the transfer workload focused on the +optimistic no-overlap case. Non-zero fees make every transaction update the +same coinbase balance, which is a real intra-block conflict. + +Useful knobs: + +- `--workers`: parallel executor workers. The default is `1`. +- `--executor-workers`: parallel OCC workers inside each executor. The default + is `min(12, GOMAXPROCS)`, following the `sei-v3` OCC worker default. +- `--builders`: parallel block builders used to keep the input queue full. The + default is `GOMAXPROCS`. +- `--queue-size`: buffered blocks ready for workers. The default is `64`. +- `--target-blocks-per-sec`: cap block input rate. The default `0` feeds as + fast as block generation and the queue allow. +- `--metrics-addr`: Prometheus endpoint. The default is + `127.0.0.1:9698`; set it to empty to disable HTTP metrics. +- `--report-interval`: stdout rate reporting interval. The default is `5s`. +- `--gas-price-wei`, `--min-gas-price-wei`, `--sender-balance-wei`, + `--transfer-value-wei`: transaction economics for the generated accounts. + +The command reports these saturation signals on stdout and at `/metrics`: + +- block input throughput +- block finishing throughput +- finished transactions per second +- total gas consumed per second +- queued blocks and cumulative totals + +The executor output is intentionally discarded through mocks: + +- `generatedState` implements `evmonly.StateReader` and supplies generated + genesis balances, nonces, code, and storage. +- `discardStateWriter` implements `evmonly.StateWriter` and sinks the + executor `StateChangeSet`. +- `discardReceiptSink` sinks Ethereum receipts. + +Future workloads should add another workload builder beside `transferWorkload`. +ERC20 transfers can reuse the same harness by adding contract code/storage to +`generatedState`, generating calldata transactions, and keeping the same block +producer/executor/metrics pipeline. diff --git a/giga/evmonly/cmd/evmonly-loadtest/main.go b/giga/evmonly/cmd/evmonly-loadtest/main.go new file mode 100644 index 0000000000..235ea79cab --- /dev/null +++ b/giga/evmonly/cmd/evmonly-loadtest/main.go @@ -0,0 +1,808 @@ +package main + +import ( + "context" + "crypto/ecdsa" + "encoding/binary" + "errors" + "flag" + "fmt" + "math" + "math/big" + "net" + "net/http" + "os" + "os/signal" + "runtime" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" + + "github.com/sei-protocol/sei-chain/giga/evmonly" +) + +const ( + defaultChainID = "713715" + defaultGasPriceWei = "1000000000" + defaultMinGasPriceWei = "1000000000" + defaultSenderBalance = "1000000000000000000" + defaultTransferValue = "1" + defaultMetricsAddr = "127.0.0.1:9698" + defaultReportInterval = 5 * time.Second + defaultQueueSize = 64 + defaultTxGasLimit = 21_000 + defaultTxsPerBlock = 1_000 + defaultWorkerCount = 1 + defaultCoinbaseAddress = "0x00000000000000000000000000000000000000cb" +) + +type config struct { + blocks uint64 + txsPerBlock int + queueSize int + builders int + workers int + executorWorkers int + targetBlocksPerSec float64 + reportInterval time.Duration + metricsAddr string + workload string + chainID *big.Int + gasPrice *big.Int + minGasPrice *big.Int + senderBalance *big.Int + transferValue *big.Int + txGasLimit uint64 + blockGasLimit uint64 + coinbase common.Address + fixedRecipient *common.Address + disableGasPriceRule bool +} + +type blockEnvelope struct { + number uint64 + request evmonly.BlockRequest +} + +func main() { + cfg, err := parseConfig(os.Args[1:]) + if err != nil { + fmt.Fprintf(os.Stderr, "evmonly-loadtest: %v\n", err) + os.Exit(2) + } + if err := run(cfg); err != nil { + fmt.Fprintf(os.Stderr, "evmonly-loadtest: %v\n", err) + os.Exit(1) + } +} + +func parseConfig(args []string) (config, error) { + cfg := config{} + fs := flag.NewFlagSet("evmonly-loadtest", flag.ContinueOnError) + fs.SetOutput(os.Stderr) + + chainID := fs.String("chain-id", defaultChainID, "EVM chain ID used to sign and execute transactions") + gasPrice := fs.String("gas-price-wei", defaultGasPriceWei, "legacy transaction gas price in wei") + minGasPrice := fs.String("min-gas-price-wei", defaultMinGasPriceWei, "executor minimum gas price in wei") + senderBalance := fs.String("sender-balance-wei", defaultSenderBalance, "generated sender genesis balance in wei") + transferValue := fs.String("transfer-value-wei", defaultTransferValue, "wei transferred by each generated transaction") + coinbase := fs.String("coinbase", defaultCoinbaseAddress, "block coinbase address") + recipient := fs.String("recipient", "", "optional fixed transfer recipient; empty creates one recipient per tx") + + fs.Uint64Var(&cfg.blocks, "blocks", 0, "number of blocks to feed; 0 runs until interrupted") + fs.IntVar(&cfg.txsPerBlock, "txs-per-block", defaultTxsPerBlock, "transactions generated per block") + fs.IntVar(&cfg.queueSize, "queue-size", defaultQueueSize, "buffered blocks waiting for executor workers") + fs.IntVar(&cfg.builders, "builders", runtime.GOMAXPROCS(0), "parallel block builder goroutines") + fs.IntVar(&cfg.workers, "workers", defaultWorkerCount, "parallel executor workers") + fs.IntVar(&cfg.executorWorkers, "executor-workers", defaultExecutorWorkers(), "parallel OCC workers inside each executor") + fs.Float64Var(&cfg.targetBlocksPerSec, "target-blocks-per-sec", 0, "input block rate cap; 0 means unlimited") + fs.DurationVar(&cfg.reportInterval, "report-interval", defaultReportInterval, "stdout and rate-gauge reporting interval; 0 disables periodic reports") + fs.StringVar(&cfg.metricsAddr, "metrics-addr", defaultMetricsAddr, "Prometheus listen address; empty disables HTTP metrics") + fs.StringVar(&cfg.workload, "workload", "transfer", "workload type; currently transfer") + fs.Uint64Var(&cfg.txGasLimit, "tx-gas-limit", defaultTxGasLimit, "gas limit for each generated transaction") + fs.Uint64Var(&cfg.blockGasLimit, "block-gas-limit", 0, "block gas limit; 0 lets the executor use its maximum") + fs.BoolVar(&cfg.disableGasPriceRule, "disable-gas-price-rule", false, "disable the executor min-gas-price validity rule") + + if err := fs.Parse(args); err != nil { + return config{}, err + } + var err error + if cfg.chainID, err = parsePositiveBig("chain-id", *chainID); err != nil { + return config{}, err + } + if cfg.gasPrice, err = parseNonNegativeBig("gas-price-wei", *gasPrice); err != nil { + return config{}, err + } + if cfg.minGasPrice, err = parseNonNegativeBig("min-gas-price-wei", *minGasPrice); err != nil { + return config{}, err + } + if cfg.senderBalance, err = parseNonNegativeBig("sender-balance-wei", *senderBalance); err != nil { + return config{}, err + } + if cfg.transferValue, err = parseNonNegativeBig("transfer-value-wei", *transferValue); err != nil { + return config{}, err + } + if !common.IsHexAddress(*coinbase) { + return config{}, fmt.Errorf("coinbase must be a hex EVM address") + } + cfg.coinbase = common.HexToAddress(*coinbase) + if *recipient != "" { + if !common.IsHexAddress(*recipient) { + return config{}, fmt.Errorf("recipient must be a hex EVM address") + } + addr := common.HexToAddress(*recipient) + cfg.fixedRecipient = &addr + } + cfg.workload = strings.ToLower(strings.TrimSpace(cfg.workload)) + if cfg.workload != "transfer" { + return config{}, fmt.Errorf("unsupported workload %q", cfg.workload) + } + if cfg.txsPerBlock <= 0 { + return config{}, fmt.Errorf("txs-per-block must be positive") + } + if cfg.queueSize <= 0 { + return config{}, fmt.Errorf("queue-size must be positive") + } + if cfg.builders <= 0 { + return config{}, fmt.Errorf("builders must be positive") + } + if cfg.workers <= 0 { + return config{}, fmt.Errorf("workers must be positive") + } + if cfg.executorWorkers <= 0 { + return config{}, fmt.Errorf("executor-workers must be positive") + } + if cfg.targetBlocksPerSec < 0 { + return config{}, fmt.Errorf("target-blocks-per-sec must be non-negative") + } + if cfg.reportInterval < 0 { + return config{}, fmt.Errorf("report-interval must be non-negative") + } + if cfg.txGasLimit == 0 { + return config{}, fmt.Errorf("tx-gas-limit must be positive") + } + if !cfg.disableGasPriceRule && cfg.gasPrice.Cmp(cfg.minGasPrice) < 0 { + return config{}, fmt.Errorf("gas-price-wei must be greater than or equal to min-gas-price-wei unless disable-gas-price-rule is set") + } + requiredBalance := new(big.Int).Mul(new(big.Int).SetUint64(cfg.txGasLimit), cfg.gasPrice) + requiredBalance.Add(requiredBalance, cfg.transferValue) + if cfg.senderBalance.Cmp(requiredBalance) < 0 { + return config{}, fmt.Errorf("sender-balance-wei must cover transfer value plus max gas cost: need at least %s", requiredBalance.String()) + } + return cfg, nil +} + +func defaultExecutorWorkers() int { + workers := runtime.GOMAXPROCS(0) + if workers > 12 { + return 12 + } + return workers +} + +func parsePositiveBig(name, raw string) (*big.Int, error) { + v, err := parseBig(name, raw) + if err != nil { + return nil, err + } + if v.Sign() <= 0 { + return nil, fmt.Errorf("%s must be positive", name) + } + return v, nil +} + +func parseNonNegativeBig(name, raw string) (*big.Int, error) { + v, err := parseBig(name, raw) + if err != nil { + return nil, err + } + if v.Sign() < 0 { + return nil, fmt.Errorf("%s must be non-negative", name) + } + return v, nil +} + +func parseBig(name, raw string) (*big.Int, error) { + v, ok := new(big.Int).SetString(strings.TrimSpace(raw), 10) + if !ok { + return nil, fmt.Errorf("%s must be a base-10 integer", name) + } + return v, nil +} + +func run(cfg config) error { + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + state := newGeneratedState() + workload := newTransferWorkload(cfg, state) + registry := prometheus.NewRegistry() + metrics := newLoadMetrics(registry) + blocks := make(chan blockEnvelope, cfg.queueSize) + + var server *metricsServer + if cfg.metricsAddr != "" { + var err error + server, err = startMetricsServer(cfg.metricsAddr, registry) + if err != nil { + return err + } + defer func() { + if err := server.stop(3 * time.Second); err != nil { + fmt.Fprintf(os.Stderr, "evmonly-loadtest: metrics server shutdown: %v\n", err) + } + }() + fmt.Printf("metrics listening on http://%s/metrics\n", cfg.metricsAddr) + } + + reportCtx, stopReporter := context.WithCancel(ctx) + reportDone := make(chan struct{}) + go func() { + defer close(reportDone) + reportLoop(reportCtx, cfg.reportInterval, metrics, blocks) + }() + + startedAt := time.Now() + group, groupCtx := errgroup.WithContext(ctx) + group.Go(func() error { + defer close(blocks) + return produceBlocks(groupCtx, cfg, workload, blocks, metrics) + }) + for workerID := 0; workerID < cfg.workers; workerID++ { + workerID := workerID + group.Go(func() error { + executor := evmonly.NewExecutor(evmonly.Config{ + MinGasPrice: new(big.Int).Set(cfg.minGasPrice), + DisableGasPriceCheck: cfg.disableGasPriceRule, + OCCWorkers: cfg.executorWorkers, + }, evmonly.WithState(state)) + return executeBlocks(groupCtx, workerID, executor, blocks, &discardStateWriter{}, discardReceiptSink{}, metrics) + }) + } + + err := group.Wait() + stopReporter() + <-reportDone + + if errors.Is(err, context.Canceled) { + err = nil + } + printFinalReport(startedAt, metrics.snapshot()) + return err +} + +func produceBlocks(ctx context.Context, cfg config, workload *transferWorkload, out chan<- blockEnvelope, metrics *loadMetrics) error { + var limiter *rate.Limiter + if cfg.targetBlocksPerSec > 0 { + burst := int(math.Ceil(cfg.targetBlocksPerSec)) + if burst < 1 { + burst = 1 + } + limiter = rate.NewLimiter(rate.Limit(cfg.targetBlocksPerSec), burst) + } + + var nextBlock atomic.Uint64 + group, groupCtx := errgroup.WithContext(ctx) + for builderID := 0; builderID < cfg.builders; builderID++ { + group.Go(func() error { + for { + number := nextBlock.Add(1) + if cfg.blocks != 0 && number > cfg.blocks { + return nil + } + if limiter != nil { + if err := limiter.Wait(groupCtx); err != nil { + return nil + } + } + request, err := workload.buildBlock(groupCtx, number) + if err != nil { + if groupCtx.Err() != nil { + return nil + } + return err + } + block := blockEnvelope{number: number, request: request} + select { + case out <- block: + metrics.recordInput() + case <-groupCtx.Done(): + return nil + } + } + }) + } + return group.Wait() +} + +func executeBlocks( + ctx context.Context, + workerID int, + executor evmonly.BlockExecutor, + blocks <-chan blockEnvelope, + stateWriter evmonly.StateWriter, + receiptSink receiptSink, + metrics *loadMetrics, +) error { + for { + select { + case <-ctx.Done(): + return nil + case block, ok := <-blocks: + if !ok { + return nil + } + result, err := executor.ExecuteBlock(ctx, block.request) + if err != nil { + metrics.recordExecutionError() + if ctx.Err() != nil { + return nil + } + return fmt.Errorf("worker %d execute block %d: %w", workerID, block.number, err) + } + stateWriter.ApplyChangeSet(result.ChangeSet) + receiptSink.StoreReceipts(block.number, result.Receipts) + metrics.recordFinished(len(result.Txs), result.GasUsed) + } + } +} + +type transferWorkload struct { + cfg config + state *generatedState + signer ethtypes.Signer + accountCursor atomic.Uint64 +} + +func newTransferWorkload(cfg config, state *generatedState) *transferWorkload { + return &transferWorkload{ + cfg: cfg, + state: state, + signer: ethtypes.LatestSignerForChainID(cfg.chainID), + } +} + +func (w *transferWorkload) buildBlock(ctx context.Context, number uint64) (evmonly.BlockRequest, error) { + txs := make([][]byte, w.cfg.txsPerBlock) + for i := range txs { + select { + case <-ctx.Done(): + return evmonly.BlockRequest{}, ctx.Err() + default: + } + accountIndex := w.accountCursor.Add(1) + raw, sender, err := w.buildTransferTx(accountIndex) + if err != nil { + return evmonly.BlockRequest{}, err + } + w.state.SetBalance(sender, w.cfg.senderBalance) + txs[i] = raw + } + return evmonly.BlockRequest{ + Context: w.blockContext(number), + Txs: txs, + }, nil +} + +func (w *transferWorkload) buildTransferTx(accountIndex uint64) ([]byte, common.Address, error) { + key, err := deterministicPrivateKey(accountIndex) + if err != nil { + return nil, common.Address{}, err + } + sender := crypto.PubkeyToAddress(key.PublicKey) + recipient := w.recipient(accountIndex) + tx := ethtypes.NewTx(ðtypes.LegacyTx{ + Nonce: 0, + GasPrice: new(big.Int).Set(w.cfg.gasPrice), + Gas: w.cfg.txGasLimit, + To: &recipient, + Value: new(big.Int).Set(w.cfg.transferValue), + }) + signed, err := ethtypes.SignTx(tx, w.signer, key) + if err != nil { + return nil, common.Address{}, err + } + raw, err := signed.MarshalBinary() + if err != nil { + return nil, common.Address{}, err + } + return raw, sender, nil +} + +func (w *transferWorkload) recipient(accountIndex uint64) common.Address { + if w.cfg.fixedRecipient != nil { + return *w.cfg.fixedRecipient + } + return addressFromSeed("sei-evmonly-loadtest-recipient", accountIndex) +} + +func (w *transferWorkload) blockContext(number uint64) evmonly.BlockContext { + return evmonly.BlockContext{ + Number: number, + Time: uint64(time.Now().Unix()), + GasLimit: w.cfg.blockGasLimit, + ChainID: new(big.Int).Set(w.cfg.chainID), + BaseFee: big.NewInt(0), + Coinbase: w.cfg.coinbase, + ParentHash: hashFromSeed("sei-evmonly-loadtest-parent", number-1), + BlockHash: hashFromSeed("sei-evmonly-loadtest-block", number), + PrevRandao: hashFromSeed("sei-evmonly-loadtest-randao", number), + } +} + +func deterministicPrivateKey(index uint64) (*ecdsa.PrivateKey, error) { + var buf [16]byte + binary.BigEndian.PutUint64(buf[:8], index) + for attempt := uint64(0); ; attempt++ { + binary.BigEndian.PutUint64(buf[8:], attempt) + key, err := crypto.ToECDSA(crypto.Keccak256([]byte("sei-evmonly-loadtest-sender"), buf[:])) + if err == nil { + return key, nil + } + if attempt == ^uint64(0) { + break + } + } + return nil, fmt.Errorf("could not derive private key for account %d", index) +} + +func addressFromSeed(prefix string, index uint64) common.Address { + hash := hashFromSeed(prefix, index) + return common.BytesToAddress(hash[12:]) +} + +func hashFromSeed(prefix string, index uint64) common.Hash { + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], index) + return crypto.Keccak256Hash([]byte(prefix), buf[:]) +} + +type generatedState struct { + mu sync.RWMutex + balances map[common.Address]*big.Int + nonces map[common.Address]uint64 + code map[common.Address][]byte + storage map[common.Address]map[common.Hash]common.Hash +} + +var _ evmonly.StateReader = (*generatedState)(nil) + +func newGeneratedState() *generatedState { + return &generatedState{ + balances: map[common.Address]*big.Int{}, + nonces: map[common.Address]uint64{}, + code: map[common.Address][]byte{}, + storage: map[common.Address]map[common.Hash]common.Hash{}, + } +} + +func (s *generatedState) GetBalance(addr common.Address) *big.Int { + s.mu.RLock() + defer s.mu.RUnlock() + if balance, ok := s.balances[addr]; ok && balance != nil { + return new(big.Int).Set(balance) + } + return new(big.Int) +} + +func (s *generatedState) SetBalance(addr common.Address, balance *big.Int) { + s.mu.Lock() + defer s.mu.Unlock() + if balance == nil { + s.balances[addr] = new(big.Int) + return + } + s.balances[addr] = new(big.Int).Set(balance) +} + +func (s *generatedState) GetNonce(addr common.Address) uint64 { + s.mu.RLock() + defer s.mu.RUnlock() + return s.nonces[addr] +} + +func (s *generatedState) SetNonce(addr common.Address, nonce uint64) { + s.mu.Lock() + defer s.mu.Unlock() + s.nonces[addr] = nonce +} + +func (s *generatedState) GetCode(addr common.Address) []byte { + s.mu.RLock() + defer s.mu.RUnlock() + return cloneBytes(s.code[addr]) +} + +func (s *generatedState) SetCode(addr common.Address, code []byte) { + s.mu.Lock() + defer s.mu.Unlock() + s.code[addr] = cloneBytes(code) +} + +func (s *generatedState) GetState(addr common.Address, key common.Hash) common.Hash { + s.mu.RLock() + defer s.mu.RUnlock() + if accountStorage, ok := s.storage[addr]; ok { + return accountStorage[key] + } + return common.Hash{} +} + +func (s *generatedState) SetState(addr common.Address, key common.Hash, value common.Hash) { + s.mu.Lock() + defer s.mu.Unlock() + accountStorage, ok := s.storage[addr] + if !ok { + accountStorage = map[common.Hash]common.Hash{} + s.storage[addr] = accountStorage + } + if value == (common.Hash{}) { + delete(accountStorage, key) + return + } + accountStorage[key] = value +} + +func cloneBytes(v []byte) []byte { + if len(v) == 0 { + return nil + } + return append([]byte(nil), v...) +} + +type discardStateWriter struct{} + +var _ evmonly.StateWriter = (*discardStateWriter)(nil) + +func (*discardStateWriter) ApplyChangeSet(evmonly.StateChangeSet) {} + +type receiptSink interface { + StoreReceipts(height uint64, receipts ethtypes.Receipts) +} + +type discardReceiptSink struct{} + +func (discardReceiptSink) StoreReceipts(uint64, ethtypes.Receipts) {} + +type loadMetrics struct { + inputBlocks atomic.Uint64 + finishedBlocks atomic.Uint64 + finishedTxs atomic.Uint64 + gasConsumed atomic.Uint64 + executionErrors atomic.Uint64 + + inputBlocksTotal prometheus.Counter + finishedBlocksTotal prometheus.Counter + finishedTxsTotal prometheus.Counter + gasConsumedTotal prometheus.Counter + executionErrorsTotal prometheus.Counter + + inputBlockRate prometheus.Gauge + finishedBlockRate prometheus.Gauge + txRate prometheus.Gauge + gasRate prometheus.Gauge + queuedBlocks prometheus.Gauge +} + +type metricsSnapshot struct { + at time.Time + inputBlocks uint64 + finishedBlocks uint64 + finishedTxs uint64 + gasConsumed uint64 + executionErrors uint64 +} + +func newLoadMetrics(registry *prometheus.Registry) *loadMetrics { + m := &loadMetrics{ + inputBlocksTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "evmonly_loadtest_block_input_total", + Help: "Total blocks fed to the EVM-only executor input queue.", + }), + finishedBlocksTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "evmonly_loadtest_block_finished_total", + Help: "Total blocks that finished EVM-only executor execution.", + }), + finishedTxsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "evmonly_loadtest_transactions_finished_total", + Help: "Total transactions that finished EVM-only executor execution.", + }), + gasConsumedTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "evmonly_loadtest_gas_consumed_total", + Help: "Total EVM gas consumed by finished blocks.", + }), + executionErrorsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "evmonly_loadtest_execution_errors_total", + Help: "Total block execution errors returned by the EVM-only executor.", + }), + inputBlockRate: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "evmonly_loadtest_block_input_throughput", + Help: "Most recent measured block input throughput in blocks per second.", + }), + finishedBlockRate: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "evmonly_loadtest_block_finished_throughput", + Help: "Most recent measured block completion throughput in blocks per second.", + }), + txRate: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "evmonly_loadtest_transactions_per_second", + Help: "Most recent measured transaction execution throughput in transactions per second.", + }), + gasRate: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "evmonly_loadtest_gas_consumed_per_second", + Help: "Most recent measured gas consumption throughput in gas per second.", + }), + queuedBlocks: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "evmonly_loadtest_queued_blocks", + Help: "Blocks currently waiting in the executor input queue.", + }), + } + registry.MustRegister( + m.inputBlocksTotal, + m.finishedBlocksTotal, + m.finishedTxsTotal, + m.gasConsumedTotal, + m.executionErrorsTotal, + m.inputBlockRate, + m.finishedBlockRate, + m.txRate, + m.gasRate, + m.queuedBlocks, + ) + return m +} + +func (m *loadMetrics) recordInput() { + m.inputBlocks.Add(1) + m.inputBlocksTotal.Inc() +} + +func (m *loadMetrics) recordFinished(txCount int, gasUsed uint64) { + m.finishedBlocks.Add(1) + m.finishedTxs.Add(uint64(txCount)) + m.gasConsumed.Add(gasUsed) + m.finishedBlocksTotal.Inc() + m.finishedTxsTotal.Add(float64(txCount)) + m.gasConsumedTotal.Add(float64(gasUsed)) +} + +func (m *loadMetrics) recordExecutionError() { + m.executionErrors.Add(1) + m.executionErrorsTotal.Inc() +} + +func (m *loadMetrics) snapshot() metricsSnapshot { + return metricsSnapshot{ + at: time.Now(), + inputBlocks: m.inputBlocks.Load(), + finishedBlocks: m.finishedBlocks.Load(), + finishedTxs: m.finishedTxs.Load(), + gasConsumed: m.gasConsumed.Load(), + executionErrors: m.executionErrors.Load(), + } +} + +func (m *loadMetrics) setRates(prev, curr metricsSnapshot, queued int) { + elapsed := curr.at.Sub(prev.at).Seconds() + if elapsed <= 0 { + return + } + inputRate := float64(curr.inputBlocks-prev.inputBlocks) / elapsed + finishedRate := float64(curr.finishedBlocks-prev.finishedBlocks) / elapsed + txRate := float64(curr.finishedTxs-prev.finishedTxs) / elapsed + gasRate := float64(curr.gasConsumed-prev.gasConsumed) / elapsed + m.inputBlockRate.Set(inputRate) + m.finishedBlockRate.Set(finishedRate) + m.txRate.Set(txRate) + m.gasRate.Set(gasRate) + m.queuedBlocks.Set(float64(queued)) +} + +func reportLoop(ctx context.Context, interval time.Duration, metrics *loadMetrics, blocks <-chan blockEnvelope) { + if interval == 0 { + <-ctx.Done() + return + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + + prev := metrics.snapshot() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + curr := metrics.snapshot() + queued := len(blocks) + metrics.setRates(prev, curr, queued) + elapsed := curr.at.Sub(prev.at).Seconds() + if elapsed <= 0 { + prev = curr + continue + } + fmt.Printf( + "input_blocks/s=%.2f finished_blocks/s=%.2f tx/s=%.2f gas/s=%.2f queued_blocks=%d totals(input_blocks=%d finished_blocks=%d txs=%d gas=%d errors=%d)\n", + float64(curr.inputBlocks-prev.inputBlocks)/elapsed, + float64(curr.finishedBlocks-prev.finishedBlocks)/elapsed, + float64(curr.finishedTxs-prev.finishedTxs)/elapsed, + float64(curr.gasConsumed-prev.gasConsumed)/elapsed, + queued, + curr.inputBlocks, + curr.finishedBlocks, + curr.finishedTxs, + curr.gasConsumed, + curr.executionErrors, + ) + prev = curr + } + } +} + +func printFinalReport(startedAt time.Time, snapshot metricsSnapshot) { + elapsed := snapshot.at.Sub(startedAt).Seconds() + if elapsed <= 0 { + elapsed = 1 + } + fmt.Printf( + "complete elapsed=%s input_blocks=%d finished_blocks=%d txs=%d gas=%d errors=%d avg_input_blocks/s=%.2f avg_finished_blocks/s=%.2f avg_tx/s=%.2f avg_gas/s=%.2f\n", + snapshot.at.Sub(startedAt).Round(time.Millisecond), + snapshot.inputBlocks, + snapshot.finishedBlocks, + snapshot.finishedTxs, + snapshot.gasConsumed, + snapshot.executionErrors, + float64(snapshot.inputBlocks)/elapsed, + float64(snapshot.finishedBlocks)/elapsed, + float64(snapshot.finishedTxs)/elapsed, + float64(snapshot.gasConsumed)/elapsed, + ) +} + +type metricsServer struct { + server *http.Server + done chan error +} + +func startMetricsServer(addr string, registry *prometheus.Registry) (*metricsServer, error) { + listener, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("listen for metrics on %s: %w", addr, err) + } + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) + mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte("ok\n")) + }) + server := &http.Server{ + Addr: addr, + Handler: mux, + ReadHeaderTimeout: 3 * time.Second, + } + ms := &metricsServer{server: server, done: make(chan error, 1)} + go func() { + err := server.Serve(listener) + if errors.Is(err, http.ErrServerClosed) { + err = nil + } + ms.done <- err + }() + return ms, nil +} + +func (s *metricsServer) stop(timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if err := s.server.Shutdown(ctx); err != nil { + return err + } + return <-s.done +} diff --git a/giga/evmonly/cmd/evmonly-loadtest/main_test.go b/giga/evmonly/cmd/evmonly-loadtest/main_test.go new file mode 100644 index 0000000000..43813e30c6 --- /dev/null +++ b/giga/evmonly/cmd/evmonly-loadtest/main_test.go @@ -0,0 +1,71 @@ +package main + +import ( + "context" + "testing" + + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" + + "github.com/sei-protocol/sei-chain/giga/evmonly" +) + +func TestTransferWorkloadExecutesAgainstEVMOnlyExecutor(t *testing.T) { + cfg, err := parseConfig([]string{ + "--metrics-addr=", + "--txs-per-block=4", + }) + require.NoError(t, err) + + state := newGeneratedState() + workload := newTransferWorkload(cfg, state) + request, err := workload.buildBlock(context.Background(), 1) + require.NoError(t, err) + + executor := evmonly.NewExecutor(evmonly.Config{ + MinGasPrice: cfg.minGasPrice, + }, evmonly.WithState(state)) + result, err := executor.ExecuteBlock(context.Background(), request) + require.NoError(t, err) + + require.Len(t, result.Txs, cfg.txsPerBlock) + require.Len(t, result.Receipts, cfg.txsPerBlock) + require.Equal(t, uint64(cfg.txsPerBlock)*cfg.txGasLimit, result.GasUsed) + for _, tx := range result.Txs { + require.Equal(t, ethtypes.ReceiptStatusSuccessful, tx.Status) + require.NoError(t, tx.Err) + } + + writer := &discardStateWriter{} + writer.ApplyChangeSet(result.ChangeSet) + discardReceiptSink{}.StoreReceipts(request.Context.Number, result.Receipts) +} + +func BenchmarkExecuteTransferBlock(b *testing.B) { + cfg, err := parseConfig([]string{ + "--metrics-addr=", + "--txs-per-block=1000", + }) + require.NoError(b, err) + + state := newGeneratedState() + workload := newTransferWorkload(cfg, state) + request, err := workload.buildBlock(context.Background(), 1) + require.NoError(b, err) + executor := evmonly.NewExecutor(evmonly.Config{ + MinGasPrice: cfg.minGasPrice, + }, evmonly.WithState(state)) + + b.ReportAllocs() + b.SetBytes(int64(cfg.txsPerBlock)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + result, err := executor.ExecuteBlock(context.Background(), request) + if err != nil { + b.Fatal(err) + } + if len(result.Txs) != cfg.txsPerBlock { + b.Fatalf("expected %d txs, got %d", cfg.txsPerBlock, len(result.Txs)) + } + } +} From f63312061f083b8f5fd2390c6466967edfc8484a Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Tue, 30 Jun 2026 11:23:54 +0800 Subject: [PATCH 2/8] add prebuilt block loadtest mode --- giga/evmonly/cmd/evmonly-loadtest/README.md | 22 +++ giga/evmonly/cmd/evmonly-loadtest/main.go | 126 +++++++++++++++++- .../evmonly/cmd/evmonly-loadtest/main_test.go | 19 +++ 3 files changed, 166 insertions(+), 1 deletion(-) diff --git a/giga/evmonly/cmd/evmonly-loadtest/README.md b/giga/evmonly/cmd/evmonly-loadtest/README.md index fc3c4a0186..13490d01a4 100644 --- a/giga/evmonly/cmd/evmonly-loadtest/README.md +++ b/giga/evmonly/cmd/evmonly-loadtest/README.md @@ -38,6 +38,26 @@ go run ./giga/evmonly/cmd/evmonly-loadtest \ --queue-size=512 ``` +To isolate executor throughput from block generation, prebuild a bounded run +before starting executor workers: + +```bash +go run ./giga/evmonly/cmd/evmonly-loadtest \ + --metrics-addr= \ + --report-interval=5s \ + --prebuild-blocks \ + --blocks=400 \ + --txs-per-block=5000 \ + --builders=48 \ + --workers=1 \ + --executor-workers=24 \ + --gas-price-wei=0 \ + --min-gas-price-wei=0 \ + --queue-size=512 +``` + +Prebuilding requires `--blocks > 0` and stores every raw block in memory. + The zero gas price/min-gas settings keep the transfer workload focused on the optimistic no-overlap case. Non-zero fees make every transaction update the same coinbase balance, which is a real intra-block conflict. @@ -52,6 +72,8 @@ Useful knobs: - `--queue-size`: buffered blocks ready for workers. The default is `64`. - `--target-blocks-per-sec`: cap block input rate. The default `0` feeds as fast as block generation and the queue allow. +- `--prebuild-blocks`: generate all bounded blocks before starting executor + workers. This separates build throughput from executor throughput. - `--metrics-addr`: Prometheus endpoint. The default is `127.0.0.1:9698`; set it to empty to disable HTTP metrics. - `--report-interval`: stdout rate reporting interval. The default is `5s`. diff --git a/giga/evmonly/cmd/evmonly-loadtest/main.go b/giga/evmonly/cmd/evmonly-loadtest/main.go index 235ea79cab..949a50fa08 100644 --- a/giga/evmonly/cmd/evmonly-loadtest/main.go +++ b/giga/evmonly/cmd/evmonly-loadtest/main.go @@ -67,6 +67,7 @@ type config struct { coinbase common.Address fixedRecipient *common.Address disableGasPriceRule bool + prebuildBlocks bool } type blockEnvelope struct { @@ -112,6 +113,7 @@ func parseConfig(args []string) (config, error) { fs.Uint64Var(&cfg.txGasLimit, "tx-gas-limit", defaultTxGasLimit, "gas limit for each generated transaction") fs.Uint64Var(&cfg.blockGasLimit, "block-gas-limit", 0, "block gas limit; 0 lets the executor use its maximum") fs.BoolVar(&cfg.disableGasPriceRule, "disable-gas-price-rule", false, "disable the executor min-gas-price validity rule") + fs.BoolVar(&cfg.prebuildBlocks, "prebuild-blocks", false, "generate all bounded blocks before starting executor workers") if err := fs.Parse(args); err != nil { return config{}, err @@ -171,6 +173,9 @@ func parseConfig(args []string) (config, error) { if cfg.txGasLimit == 0 { return config{}, fmt.Errorf("tx-gas-limit must be positive") } + if cfg.prebuildBlocks && cfg.blocks == 0 { + return config{}, fmt.Errorf("prebuild-blocks requires --blocks > 0") + } if !cfg.disableGasPriceRule && cfg.gasPrice.Cmp(cfg.minGasPrice) < 0 { return config{}, fmt.Errorf("gas-price-wei must be greater than or equal to min-gas-price-wei unless disable-gas-price-rule is set") } @@ -228,7 +233,6 @@ func run(cfg config) error { workload := newTransferWorkload(cfg, state) registry := prometheus.NewRegistry() metrics := newLoadMetrics(registry) - blocks := make(chan blockEnvelope, cfg.queueSize) var server *metricsServer if cfg.metricsAddr != "" { @@ -245,6 +249,14 @@ func run(cfg config) error { fmt.Printf("metrics listening on http://%s/metrics\n", cfg.metricsAddr) } + if cfg.prebuildBlocks { + return runPrebuilt(ctx, cfg, state, workload, metrics) + } + return runStreaming(ctx, cfg, state, workload, metrics) +} + +func runStreaming(ctx context.Context, cfg config, state *generatedState, workload *transferWorkload, metrics *loadMetrics) error { + blocks := make(chan blockEnvelope, cfg.queueSize) reportCtx, stopReporter := context.WithCancel(ctx) reportDone := make(chan struct{}) go func() { @@ -281,6 +293,52 @@ func run(cfg config) error { return err } +func runPrebuilt(ctx context.Context, cfg config, state *generatedState, workload *transferWorkload, metrics *loadMetrics) error { + prebuildStartedAt := time.Now() + prebuilt, err := prebuildBlockRequests(ctx, cfg, workload) + if err != nil { + return err + } + prebuildElapsed := time.Since(prebuildStartedAt) + printPrebuildReport(prebuildElapsed, prebuilt, cfg.txsPerBlock) + + blocks := make(chan blockEnvelope, cfg.queueSize) + reportCtx, stopReporter := context.WithCancel(ctx) + reportDone := make(chan struct{}) + go func() { + defer close(reportDone) + reportLoop(reportCtx, cfg.reportInterval, metrics, blocks) + }() + + startedAt := time.Now() + group, groupCtx := errgroup.WithContext(ctx) + group.Go(func() error { + defer close(blocks) + return feedPrebuiltBlocks(groupCtx, prebuilt, blocks, metrics) + }) + for workerID := 0; workerID < cfg.workers; workerID++ { + workerID := workerID + group.Go(func() error { + executor := evmonly.NewExecutor(evmonly.Config{ + MinGasPrice: new(big.Int).Set(cfg.minGasPrice), + DisableGasPriceCheck: cfg.disableGasPriceRule, + OCCWorkers: cfg.executorWorkers, + }, evmonly.WithState(state)) + return executeBlocks(groupCtx, workerID, executor, blocks, &discardStateWriter{}, discardReceiptSink{}, metrics) + }) + } + + err = group.Wait() + stopReporter() + <-reportDone + + if errors.Is(err, context.Canceled) { + err = nil + } + printFinalReport(startedAt, metrics.snapshot()) + return err +} + func produceBlocks(ctx context.Context, cfg config, workload *transferWorkload, out chan<- blockEnvelope, metrics *loadMetrics) error { var limiter *rate.Limiter if cfg.targetBlocksPerSec > 0 { @@ -325,6 +383,56 @@ func produceBlocks(ctx context.Context, cfg config, workload *transferWorkload, return group.Wait() } +func prebuildBlockRequests(ctx context.Context, cfg config, workload *transferWorkload) ([]blockEnvelope, error) { + if cfg.blocks > uint64(maxInt()) { + return nil, fmt.Errorf("prebuild-blocks cannot allocate %d blocks on this platform", cfg.blocks) + } + prebuilt := make([]blockEnvelope, int(cfg.blocks)) + var nextBlock atomic.Uint64 + group, groupCtx := errgroup.WithContext(ctx) + for builderID := 0; builderID < cfg.builders; builderID++ { + group.Go(func() error { + for { + number := nextBlock.Add(1) + if number > cfg.blocks { + return nil + } + request, err := workload.buildBlock(groupCtx, number) + if err != nil { + if groupCtx.Err() != nil { + return nil + } + return err + } + prebuilt[number-1] = blockEnvelope{ + number: number, + request: request, + } + } + }) + } + if err := group.Wait(); err != nil { + return nil, err + } + return prebuilt, nil +} + +func feedPrebuiltBlocks(ctx context.Context, prebuilt []blockEnvelope, out chan<- blockEnvelope, metrics *loadMetrics) error { + for _, block := range prebuilt { + select { + case out <- block: + metrics.recordInput() + case <-ctx.Done(): + return nil + } + } + return nil +} + +func maxInt() int { + return int(^uint(0) >> 1) +} + func executeBlocks( ctx context.Context, workerID int, @@ -767,6 +875,22 @@ func printFinalReport(startedAt time.Time, snapshot metricsSnapshot) { ) } +func printPrebuildReport(elapsed time.Duration, blocks []blockEnvelope, txsPerBlock int) { + seconds := elapsed.Seconds() + if seconds <= 0 { + seconds = 1 + } + txCount := len(blocks) * txsPerBlock + fmt.Printf( + "prebuild complete elapsed=%s blocks=%d txs=%d build_blocks/s=%.2f build_tx/s=%.2f\n", + elapsed.Round(time.Millisecond), + len(blocks), + txCount, + float64(len(blocks))/seconds, + float64(txCount)/seconds, + ) +} + type metricsServer struct { server *http.Server done chan error diff --git a/giga/evmonly/cmd/evmonly-loadtest/main_test.go b/giga/evmonly/cmd/evmonly-loadtest/main_test.go index 43813e30c6..880bf0c307 100644 --- a/giga/evmonly/cmd/evmonly-loadtest/main_test.go +++ b/giga/evmonly/cmd/evmonly-loadtest/main_test.go @@ -41,6 +41,25 @@ func TestTransferWorkloadExecutesAgainstEVMOnlyExecutor(t *testing.T) { discardReceiptSink{}.StoreReceipts(request.Context.Number, result.Receipts) } +func TestPrebuildBlocksRequiresBoundedRun(t *testing.T) { + _, err := parseConfig([]string{ + "--prebuild-blocks", + }) + require.ErrorContains(t, err, "prebuild-blocks requires --blocks > 0") +} + +func TestRunPrebuiltBlocks(t *testing.T) { + cfg, err := parseConfig([]string{ + "--metrics-addr=", + "--report-interval=0", + "--prebuild-blocks", + "--blocks=2", + "--txs-per-block=2", + }) + require.NoError(t, err) + require.NoError(t, run(cfg)) +} + func BenchmarkExecuteTransferBlock(b *testing.B) { cfg, err := parseConfig([]string{ "--metrics-addr=", From 4ec8da52cbfd6fcef8d70fc121db60e94e4c07ef Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Tue, 30 Jun 2026 14:11:40 +0800 Subject: [PATCH 3/8] freeze generated loadtest state after prebuild --- giga/evmonly/cmd/evmonly-loadtest/main.go | 38 ++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/giga/evmonly/cmd/evmonly-loadtest/main.go b/giga/evmonly/cmd/evmonly-loadtest/main.go index 949a50fa08..9c79842dc4 100644 --- a/giga/evmonly/cmd/evmonly-loadtest/main.go +++ b/giga/evmonly/cmd/evmonly-loadtest/main.go @@ -299,6 +299,7 @@ func runPrebuilt(ctx context.Context, cfg config, state *generatedState, workloa if err != nil { return err } + state.Freeze() prebuildElapsed := time.Since(prebuildStartedAt) printPrebuildReport(prebuildElapsed, prebuilt, cfg.txsPerBlock) @@ -482,7 +483,7 @@ func newTransferWorkload(cfg config, state *generatedState) *transferWorkload { func (w *transferWorkload) buildBlock(ctx context.Context, number uint64) (evmonly.BlockRequest, error) { txs := make([][]byte, w.cfg.txsPerBlock) - for i := range txs { + for i := 0; i < w.cfg.txsPerBlock; i++ { select { case <-ctx.Done(): return evmonly.BlockRequest{}, ctx.Err() @@ -577,6 +578,7 @@ func hashFromSeed(prefix string, index uint64) common.Hash { type generatedState struct { mu sync.RWMutex + frozen atomic.Bool balances map[common.Address]*big.Int nonces map[common.Address]uint64 code map[common.Address][]byte @@ -585,6 +587,8 @@ type generatedState struct { var _ evmonly.StateReader = (*generatedState)(nil) +var frozenZeroBalance = new(big.Int) + func newGeneratedState() *generatedState { return &generatedState{ balances: map[common.Address]*big.Int{}, @@ -594,7 +598,17 @@ func newGeneratedState() *generatedState { } } +func (s *generatedState) Freeze() { + s.frozen.Store(true) +} + func (s *generatedState) GetBalance(addr common.Address) *big.Int { + if s.frozen.Load() { + if balance, ok := s.balances[addr]; ok && balance != nil { + return balance + } + return frozenZeroBalance + } s.mu.RLock() defer s.mu.RUnlock() if balance, ok := s.balances[addr]; ok && balance != nil { @@ -604,6 +618,7 @@ func (s *generatedState) GetBalance(addr common.Address) *big.Int { } func (s *generatedState) SetBalance(addr common.Address, balance *big.Int) { + s.requireMutable() s.mu.Lock() defer s.mu.Unlock() if balance == nil { @@ -614,30 +629,44 @@ func (s *generatedState) SetBalance(addr common.Address, balance *big.Int) { } func (s *generatedState) GetNonce(addr common.Address) uint64 { + if s.frozen.Load() { + return s.nonces[addr] + } s.mu.RLock() defer s.mu.RUnlock() return s.nonces[addr] } func (s *generatedState) SetNonce(addr common.Address, nonce uint64) { + s.requireMutable() s.mu.Lock() defer s.mu.Unlock() s.nonces[addr] = nonce } func (s *generatedState) GetCode(addr common.Address) []byte { + if s.frozen.Load() { + return cloneBytes(s.code[addr]) + } s.mu.RLock() defer s.mu.RUnlock() return cloneBytes(s.code[addr]) } func (s *generatedState) SetCode(addr common.Address, code []byte) { + s.requireMutable() s.mu.Lock() defer s.mu.Unlock() s.code[addr] = cloneBytes(code) } func (s *generatedState) GetState(addr common.Address, key common.Hash) common.Hash { + if s.frozen.Load() { + if accountStorage, ok := s.storage[addr]; ok { + return accountStorage[key] + } + return common.Hash{} + } s.mu.RLock() defer s.mu.RUnlock() if accountStorage, ok := s.storage[addr]; ok { @@ -647,6 +676,7 @@ func (s *generatedState) GetState(addr common.Address, key common.Hash) common.H } func (s *generatedState) SetState(addr common.Address, key common.Hash, value common.Hash) { + s.requireMutable() s.mu.Lock() defer s.mu.Unlock() accountStorage, ok := s.storage[addr] @@ -661,6 +691,12 @@ func (s *generatedState) SetState(addr common.Address, key common.Hash, value co accountStorage[key] = value } +func (s *generatedState) requireMutable() { + if s.frozen.Load() { + panic("generated state is frozen") + } +} + func cloneBytes(v []byte) []byte { if len(v) == 0 { return nil From 71eba7a0fa1c3f5a4d53a1c738d5007c5ff6d387 Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Tue, 30 Jun 2026 16:12:34 +0800 Subject: [PATCH 4/8] add evmonly OCC fallback metrics --- giga/evmonly/cmd/evmonly-loadtest/main.go | 92 ++++++++++++++- giga/evmonly/executor_test.go | 16 +++ giga/evmonly/occ.go | 130 ++++++++++++++++++++-- giga/evmonly/types.go | 20 ++++ 4 files changed, 244 insertions(+), 14 deletions(-) diff --git a/giga/evmonly/cmd/evmonly-loadtest/main.go b/giga/evmonly/cmd/evmonly-loadtest/main.go index 9c79842dc4..ae6783e494 100644 --- a/giga/evmonly/cmd/evmonly-loadtest/main.go +++ b/giga/evmonly/cmd/evmonly-loadtest/main.go @@ -461,7 +461,7 @@ func executeBlocks( } stateWriter.ApplyChangeSet(result.ChangeSet) receiptSink.StoreReceipts(block.number, result.Receipts) - metrics.recordFinished(len(result.Txs), result.GasUsed) + metrics.recordFinished(len(result.Txs), result.GasUsed, result.OCCStats) } } } @@ -724,12 +724,21 @@ type loadMetrics struct { finishedTxs atomic.Uint64 gasConsumed atomic.Uint64 executionErrors atomic.Uint64 + occAttempts atomic.Uint64 + occFallbacks atomic.Uint64 + occConflicts atomic.Uint64 inputBlocksTotal prometheus.Counter finishedBlocksTotal prometheus.Counter finishedTxsTotal prometheus.Counter gasConsumedTotal prometheus.Counter executionErrorsTotal prometheus.Counter + occAttemptsTotal prometheus.Counter + occFallbacksTotal prometheus.Counter + occConflictsTotal prometheus.Counter + + occFallbackReasonTotal *prometheus.CounterVec + occConflictTotal *prometheus.CounterVec inputBlockRate prometheus.Gauge finishedBlockRate prometheus.Gauge @@ -745,6 +754,9 @@ type metricsSnapshot struct { finishedTxs uint64 gasConsumed uint64 executionErrors uint64 + occAttempts uint64 + occFallbacks uint64 + occConflicts uint64 } func newLoadMetrics(registry *prometheus.Registry) *loadMetrics { @@ -769,6 +781,26 @@ func newLoadMetrics(registry *prometheus.Registry) *loadMetrics { Name: "evmonly_loadtest_execution_errors_total", Help: "Total block execution errors returned by the EVM-only executor.", }), + occAttemptsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "evmonly_loadtest_occ_attempts_total", + Help: "Total blocks executed with optimistic concurrency control.", + }), + occFallbacksTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "evmonly_loadtest_occ_fallbacks_total", + Help: "Total OCC blocks that fell back to sequential execution.", + }), + occConflictsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "evmonly_loadtest_occ_conflicts_total", + Help: "Total OCC conflict accesses observed before sequential fallback.", + }), + occFallbackReasonTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "evmonly_loadtest_occ_fallback_reasons_total", + Help: "OCC fallback count by reason.", + }, []string{"reason"}), + occConflictTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "evmonly_loadtest_occ_conflict_keys_total", + Help: "OCC conflict accesses by access type, state kind, address, and slot.", + }, []string{"access", "kind", "address", "slot"}), inputBlockRate: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "evmonly_loadtest_block_input_throughput", Help: "Most recent measured block input throughput in blocks per second.", @@ -796,6 +828,11 @@ func newLoadMetrics(registry *prometheus.Registry) *loadMetrics { m.finishedTxsTotal, m.gasConsumedTotal, m.executionErrorsTotal, + m.occAttemptsTotal, + m.occFallbacksTotal, + m.occConflictsTotal, + m.occFallbackReasonTotal, + m.occConflictTotal, m.inputBlockRate, m.finishedBlockRate, m.txRate, @@ -810,13 +847,51 @@ func (m *loadMetrics) recordInput() { m.inputBlocksTotal.Inc() } -func (m *loadMetrics) recordFinished(txCount int, gasUsed uint64) { +func (m *loadMetrics) recordFinished(txCount int, gasUsed uint64, occStats evmonly.OCCStats) { m.finishedBlocks.Add(1) m.finishedTxs.Add(uint64(txCount)) m.gasConsumed.Add(gasUsed) m.finishedBlocksTotal.Inc() m.finishedTxsTotal.Add(float64(txCount)) m.gasConsumedTotal.Add(float64(gasUsed)) + m.recordOCC(occStats) +} + +func (m *loadMetrics) recordOCC(stats evmonly.OCCStats) { + if !stats.Attempted { + return + } + m.occAttempts.Add(1) + m.occAttemptsTotal.Inc() + if stats.Fallback { + reason := stats.FallbackReason + if reason == "" { + reason = "unknown" + } + m.occFallbacks.Add(1) + m.occFallbacksTotal.Inc() + m.occFallbackReasonTotal.WithLabelValues(reason).Inc() + } + if stats.ConflictCount == 0 { + return + } + m.occConflicts.Add(stats.ConflictCount) + m.occConflictsTotal.Add(float64(stats.ConflictCount)) + for _, conflict := range stats.ConflictSamples { + m.occConflictTotal.WithLabelValues( + conflict.Access, + conflict.Kind, + conflict.Address.Hex(), + conflictSlotLabel(conflict), + ).Add(float64(conflict.Count)) + } +} + +func conflictSlotLabel(conflict evmonly.OCCConflictCount) string { + if conflict.Kind != "storage" { + return "" + } + return conflict.Slot.Hex() } func (m *loadMetrics) recordExecutionError() { @@ -832,6 +907,9 @@ func (m *loadMetrics) snapshot() metricsSnapshot { finishedTxs: m.finishedTxs.Load(), gasConsumed: m.gasConsumed.Load(), executionErrors: m.executionErrors.Load(), + occAttempts: m.occAttempts.Load(), + occFallbacks: m.occFallbacks.Load(), + occConflicts: m.occConflicts.Load(), } } @@ -874,7 +952,7 @@ func reportLoop(ctx context.Context, interval time.Duration, metrics *loadMetric continue } fmt.Printf( - "input_blocks/s=%.2f finished_blocks/s=%.2f tx/s=%.2f gas/s=%.2f queued_blocks=%d totals(input_blocks=%d finished_blocks=%d txs=%d gas=%d errors=%d)\n", + "input_blocks/s=%.2f finished_blocks/s=%.2f tx/s=%.2f gas/s=%.2f queued_blocks=%d totals(input_blocks=%d finished_blocks=%d txs=%d gas=%d errors=%d occ_attempts=%d occ_fallbacks=%d occ_conflicts=%d)\n", float64(curr.inputBlocks-prev.inputBlocks)/elapsed, float64(curr.finishedBlocks-prev.finishedBlocks)/elapsed, float64(curr.finishedTxs-prev.finishedTxs)/elapsed, @@ -885,6 +963,9 @@ func reportLoop(ctx context.Context, interval time.Duration, metrics *loadMetric curr.finishedTxs, curr.gasConsumed, curr.executionErrors, + curr.occAttempts, + curr.occFallbacks, + curr.occConflicts, ) prev = curr } @@ -897,13 +978,16 @@ func printFinalReport(startedAt time.Time, snapshot metricsSnapshot) { elapsed = 1 } fmt.Printf( - "complete elapsed=%s input_blocks=%d finished_blocks=%d txs=%d gas=%d errors=%d avg_input_blocks/s=%.2f avg_finished_blocks/s=%.2f avg_tx/s=%.2f avg_gas/s=%.2f\n", + "complete elapsed=%s input_blocks=%d finished_blocks=%d txs=%d gas=%d errors=%d occ_attempts=%d occ_fallbacks=%d occ_conflicts=%d avg_input_blocks/s=%.2f avg_finished_blocks/s=%.2f avg_tx/s=%.2f avg_gas/s=%.2f\n", snapshot.at.Sub(startedAt).Round(time.Millisecond), snapshot.inputBlocks, snapshot.finishedBlocks, snapshot.finishedTxs, snapshot.gasConsumed, snapshot.executionErrors, + snapshot.occAttempts, + snapshot.occFallbacks, + snapshot.occConflicts, float64(snapshot.inputBlocks)/elapsed, float64(snapshot.finishedBlocks)/elapsed, float64(snapshot.finishedTxs)/elapsed, diff --git a/giga/evmonly/executor_test.go b/giga/evmonly/executor_test.go index 7487148829..d32a295186 100644 --- a/giga/evmonly/executor_test.go +++ b/giga/evmonly/executor_test.go @@ -121,6 +121,9 @@ func TestExecutorOCCNonConflictingTransfersMatchSequential(t *testing.T) { require.Equal(t, seqResult.GasUsed, occResult.GasUsed) require.Len(t, occResult.Txs, txCount) require.Len(t, occResult.Receipts, txCount) + require.True(t, occResult.OCCStats.Attempted) + require.False(t, occResult.OCCStats.Fallback) + require.Zero(t, occResult.OCCStats.ConflictCount) for i := range txCount { require.Equal(t, seqResult.Txs[i].Hash, occResult.Txs[i].Hash) require.Equal(t, seqResult.Txs[i].Status, occResult.Txs[i].Status) @@ -162,6 +165,19 @@ func TestExecutorOCCConflictingTransfersMatchSequential(t *testing.T) { seqState.ApplyChangeSet(seqResult.ChangeSet) occState.ApplyChangeSet(occResult.ChangeSet) require.Equal(t, seqResult.GasUsed, occResult.GasUsed) + require.True(t, occResult.OCCStats.Attempted) + require.True(t, occResult.OCCStats.Fallback) + require.Equal(t, "conflict", occResult.OCCStats.FallbackReason) + require.Greater(t, occResult.OCCStats.ConflictCount, uint64(0)) + require.NotEmpty(t, occResult.OCCStats.ConflictSamples) + foundRecipientBalanceConflict := false + for _, conflict := range occResult.OCCStats.ConflictSamples { + if conflict.Kind == "balance" && conflict.Address == recipient { + foundRecipientBalanceConflict = true + require.Greater(t, conflict.Count, uint64(0)) + } + } + require.True(t, foundRecipientBalanceConflict) require.Equal(t, seqState.GetBalance(recipient), occState.GetBalance(recipient)) require.Equal(t, big.NewInt(int64(txCount*3)), occState.GetBalance(recipient)) } diff --git a/giga/evmonly/occ.go b/giga/evmonly/occ.go index f37e6b71b7..3bc2af968e 100644 --- a/giga/evmonly/occ.go +++ b/giga/evmonly/occ.go @@ -89,10 +89,18 @@ func (e *Executor) executeBlockOCC(ctx context.Context, req BlockRequest) (*Bloc if err := group.Wait(); err != nil { return nil, err } - if !validateOCCResults(results, gasLimit) { - return e.executeBlockSequential(ctx, req) + validation := validateOCCResults(results, gasLimit) + if !validation.valid { + result, err := e.executeBlockSequential(ctx, req) + if err != nil { + return nil, err + } + result.OCCStats = validation.stats(true) + return result, nil } - return mergeOCCResults(results), nil + result := mergeOCCResults(results) + result.OCCStats = validation.stats(false) + return result, nil } func occChunkSize(txCount int, workers int) int { @@ -155,23 +163,125 @@ func (e *Executor) executeTxSpeculative( }, nil } -func validateOCCResults(results []occTxExecution, gasLimit uint64) bool { +type occValidationResult struct { + valid bool + fallbackReason string + conflictCount uint64 + conflicts map[occConflictAggregationKey]uint64 +} + +type occConflictAggregationKey struct { + access string + kind stateAccessKind + address common.Address + slot common.Hash +} + +const ( + occFallbackReasonConflict = "conflict" + occFallbackReasonGasLimit = "gas_limit" + occFallbackReasonGasOverflow = "gas_overflow" +) + +func validateOCCResults(results []occTxExecution, gasLimit uint64) occValidationResult { writes := newStateAccessIndex() var totalGas uint64 + validation := occValidationResult{valid: true} for _, result := range results { if result.gasUsed > math.MaxUint64-totalGas { - return false + validation.valid = false + validation.fallbackReason = occFallbackReasonGasOverflow + return validation } totalGas += result.gasUsed if totalGas > gasLimit { - return false - } - if writes.conflictsWithAny(result.readSet) || writes.conflictsWithAny(result.writeSet) { - return false + validation.valid = false + validation.fallbackReason = occFallbackReasonGasLimit + return validation } + validation.addConflicts("read", writes, result.readSet) + validation.addConflicts("write", writes, result.writeSet) writes.addAll(result.writeSet) } - return true + if validation.conflictCount > 0 { + validation.valid = false + validation.fallbackReason = occFallbackReasonConflict + } + return validation +} + +func (r *occValidationResult) addConflicts(access string, writes *stateAccessIndex, set map[stateAccessKey]struct{}) { + for key := range set { + if !writes.conflictsWith(key) { + continue + } + if r.conflicts == nil { + r.conflicts = map[occConflictAggregationKey]uint64{} + } + r.conflictCount++ + r.conflicts[occConflictAggregationKey{ + access: access, + kind: key.kind, + address: key.address, + slot: key.slot, + }]++ + } +} + +func (r occValidationResult) stats(fallback bool) OCCStats { + stats := OCCStats{ + Attempted: true, + Fallback: fallback, + FallbackReason: r.fallbackReason, + ConflictCount: r.conflictCount, + } + if len(r.conflicts) == 0 { + return stats + } + keys := make([]occConflictAggregationKey, 0, len(r.conflicts)) + for key := range r.conflicts { + keys = append(keys, key) + } + sort.Slice(keys, func(i, j int) bool { + left, right := keys[i], keys[j] + if left.access != right.access { + return left.access < right.access + } + if left.kind != right.kind { + return left.kind < right.kind + } + if cmp := bytes.Compare(left.address[:], right.address[:]); cmp != 0 { + return cmp < 0 + } + return bytes.Compare(left.slot[:], right.slot[:]) < 0 + }) + for _, key := range keys { + stats.ConflictSamples = append(stats.ConflictSamples, OCCConflictCount{ + Access: key.access, + Kind: key.kind.String(), + Address: key.address, + Slot: key.slot, + Count: r.conflicts[key], + }) + } + return stats +} + +func (k stateAccessKind) String() string { + switch k { + case stateAccessAccount: + return "account" + case stateAccessBalance: + return "balance" + case stateAccessNonce: + return "nonce" + case stateAccessCode: + return "code" + case stateAccessStorage: + return "storage" + default: + return "unknown" + } } func mergeOCCResults(results []occTxExecution) *BlockResult { diff --git a/giga/evmonly/types.go b/giga/evmonly/types.go index 24a9f9ba01..8ee6726961 100644 --- a/giga/evmonly/types.go +++ b/giga/evmonly/types.go @@ -43,10 +43,30 @@ type BlockResult struct { Txs []TxResult Receipts ethtypes.Receipts GasUsed uint64 + OCCStats OCCStats } type ValidatorUpdate = precompiles.ValidatorUpdate +// OCCStats reports optimistic concurrency control behavior for a block. +type OCCStats struct { + Attempted bool + Fallback bool + FallbackReason string + ConflictCount uint64 + ConflictSamples []OCCConflictCount +} + +// OCCConflictCount aggregates conflicts by the access key that forced OCC to +// fall back to sequential execution. +type OCCConflictCount struct { + Access string + Kind string + Address common.Address + Slot common.Hash + Count uint64 +} + // StateChangeSet is the deterministic EVM-native state output for a block. // Values are post-block values, not deltas. type StateChangeSet struct { From b7eaa8aa52a6fe696f7c7720c9e7af1a5f71c103 Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Wed, 1 Jul 2026 11:33:33 +0800 Subject: [PATCH 5/8] add ERC20 transfer loadtest workload --- giga/evmonly/cmd/evmonly-loadtest/main.go | 167 ++++++++++++++++-- .../evmonly/cmd/evmonly-loadtest/main_test.go | 40 +++++ giga/evmonly/occ.go | 5 +- 3 files changed, 196 insertions(+), 16 deletions(-) diff --git a/giga/evmonly/cmd/evmonly-loadtest/main.go b/giga/evmonly/cmd/evmonly-loadtest/main.go index ae6783e494..b74f1456e1 100644 --- a/giga/evmonly/cmd/evmonly-loadtest/main.go +++ b/giga/evmonly/cmd/evmonly-loadtest/main.go @@ -37,13 +37,17 @@ const ( defaultMinGasPriceWei = "1000000000" defaultSenderBalance = "1000000000000000000" defaultTransferValue = "1" + defaultERC20Contract = "0x000000000000000000000000000000000000e20c" defaultMetricsAddr = "127.0.0.1:9698" defaultReportInterval = 5 * time.Second defaultQueueSize = 64 defaultTxGasLimit = 21_000 + defaultERC20TxGasLimit = 100_000 defaultTxsPerBlock = 1_000 defaultWorkerCount = 1 defaultCoinbaseAddress = "0x00000000000000000000000000000000000000cb" + workloadTransfer = "transfer" + workloadERC20Transfer = "erc20-transfer" ) type config struct { @@ -65,6 +69,7 @@ type config struct { txGasLimit uint64 blockGasLimit uint64 coinbase common.Address + erc20Contract common.Address fixedRecipient *common.Address disableGasPriceRule bool prebuildBlocks bool @@ -96,8 +101,9 @@ func parseConfig(args []string) (config, error) { gasPrice := fs.String("gas-price-wei", defaultGasPriceWei, "legacy transaction gas price in wei") minGasPrice := fs.String("min-gas-price-wei", defaultMinGasPriceWei, "executor minimum gas price in wei") senderBalance := fs.String("sender-balance-wei", defaultSenderBalance, "generated sender genesis balance in wei") - transferValue := fs.String("transfer-value-wei", defaultTransferValue, "wei transferred by each generated transaction") + transferValue := fs.String("transfer-value-wei", defaultTransferValue, "wei or token units transferred by each generated transaction") coinbase := fs.String("coinbase", defaultCoinbaseAddress, "block coinbase address") + erc20Contract := fs.String("erc20-contract", defaultERC20Contract, "EVM address for the generated ERC20 transfer contract") recipient := fs.String("recipient", "", "optional fixed transfer recipient; empty creates one recipient per tx") fs.Uint64Var(&cfg.blocks, "blocks", 0, "number of blocks to feed; 0 runs until interrupted") @@ -109,7 +115,7 @@ func parseConfig(args []string) (config, error) { fs.Float64Var(&cfg.targetBlocksPerSec, "target-blocks-per-sec", 0, "input block rate cap; 0 means unlimited") fs.DurationVar(&cfg.reportInterval, "report-interval", defaultReportInterval, "stdout and rate-gauge reporting interval; 0 disables periodic reports") fs.StringVar(&cfg.metricsAddr, "metrics-addr", defaultMetricsAddr, "Prometheus listen address; empty disables HTTP metrics") - fs.StringVar(&cfg.workload, "workload", "transfer", "workload type; currently transfer") + fs.StringVar(&cfg.workload, "workload", workloadTransfer, "workload type: transfer or erc20-transfer") fs.Uint64Var(&cfg.txGasLimit, "tx-gas-limit", defaultTxGasLimit, "gas limit for each generated transaction") fs.Uint64Var(&cfg.blockGasLimit, "block-gas-limit", 0, "block gas limit; 0 lets the executor use its maximum") fs.BoolVar(&cfg.disableGasPriceRule, "disable-gas-price-rule", false, "disable the executor min-gas-price validity rule") @@ -145,10 +151,23 @@ func parseConfig(args []string) (config, error) { addr := common.HexToAddress(*recipient) cfg.fixedRecipient = &addr } + if !common.IsHexAddress(*erc20Contract) { + return config{}, fmt.Errorf("erc20-contract must be a hex EVM address") + } + cfg.erc20Contract = common.HexToAddress(*erc20Contract) cfg.workload = strings.ToLower(strings.TrimSpace(cfg.workload)) - if cfg.workload != "transfer" { + if cfg.workload != workloadTransfer && cfg.workload != workloadERC20Transfer { return config{}, fmt.Errorf("unsupported workload %q", cfg.workload) } + txGasLimitSet := false + fs.Visit(func(f *flag.Flag) { + if f.Name == "tx-gas-limit" { + txGasLimitSet = true + } + }) + if cfg.workload == workloadERC20Transfer && !txGasLimitSet { + cfg.txGasLimit = defaultERC20TxGasLimit + } if cfg.txsPerBlock <= 0 { return config{}, fmt.Errorf("txs-per-block must be positive") } @@ -173,6 +192,9 @@ func parseConfig(args []string) (config, error) { if cfg.txGasLimit == 0 { return config{}, fmt.Errorf("tx-gas-limit must be positive") } + if cfg.transferValue.BitLen() > 256 { + return config{}, fmt.Errorf("transfer-value-wei must fit in uint256") + } if cfg.prebuildBlocks && cfg.blocks == 0 { return config{}, fmt.Errorf("prebuild-blocks requires --blocks > 0") } @@ -180,9 +202,13 @@ func parseConfig(args []string) (config, error) { return config{}, fmt.Errorf("gas-price-wei must be greater than or equal to min-gas-price-wei unless disable-gas-price-rule is set") } requiredBalance := new(big.Int).Mul(new(big.Int).SetUint64(cfg.txGasLimit), cfg.gasPrice) - requiredBalance.Add(requiredBalance, cfg.transferValue) + requiredBalanceReason := "max gas cost" + if cfg.workload == workloadTransfer { + requiredBalance.Add(requiredBalance, cfg.transferValue) + requiredBalanceReason = "transfer value plus max gas cost" + } if cfg.senderBalance.Cmp(requiredBalance) < 0 { - return config{}, fmt.Errorf("sender-balance-wei must cover transfer value plus max gas cost: need at least %s", requiredBalance.String()) + return config{}, fmt.Errorf("sender-balance-wei must cover %s: need at least %s", requiredBalanceReason, requiredBalance.String()) } return cfg, nil } @@ -230,7 +256,10 @@ func run(cfg config) error { defer stop() state := newGeneratedState() - workload := newTransferWorkload(cfg, state) + workload, err := newWorkload(cfg, state) + if err != nil { + return err + } registry := prometheus.NewRegistry() metrics := newLoadMetrics(registry) @@ -255,7 +284,22 @@ func run(cfg config) error { return runStreaming(ctx, cfg, state, workload, metrics) } -func runStreaming(ctx context.Context, cfg config, state *generatedState, workload *transferWorkload, metrics *loadMetrics) error { +type blockWorkload interface { + buildBlock(context.Context, uint64) (evmonly.BlockRequest, error) +} + +func newWorkload(cfg config, state *generatedState) (blockWorkload, error) { + switch cfg.workload { + case workloadTransfer: + return newTransferWorkload(cfg, state), nil + case workloadERC20Transfer: + return newERC20TransferWorkload(cfg, state), nil + default: + return nil, fmt.Errorf("unsupported workload %q", cfg.workload) + } +} + +func runStreaming(ctx context.Context, cfg config, state *generatedState, workload blockWorkload, metrics *loadMetrics) error { blocks := make(chan blockEnvelope, cfg.queueSize) reportCtx, stopReporter := context.WithCancel(ctx) reportDone := make(chan struct{}) @@ -293,7 +337,7 @@ func runStreaming(ctx context.Context, cfg config, state *generatedState, worklo return err } -func runPrebuilt(ctx context.Context, cfg config, state *generatedState, workload *transferWorkload, metrics *loadMetrics) error { +func runPrebuilt(ctx context.Context, cfg config, state *generatedState, workload blockWorkload, metrics *loadMetrics) error { prebuildStartedAt := time.Now() prebuilt, err := prebuildBlockRequests(ctx, cfg, workload) if err != nil { @@ -340,7 +384,7 @@ func runPrebuilt(ctx context.Context, cfg config, state *generatedState, workloa return err } -func produceBlocks(ctx context.Context, cfg config, workload *transferWorkload, out chan<- blockEnvelope, metrics *loadMetrics) error { +func produceBlocks(ctx context.Context, cfg config, workload blockWorkload, out chan<- blockEnvelope, metrics *loadMetrics) error { var limiter *rate.Limiter if cfg.targetBlocksPerSec > 0 { burst := int(math.Ceil(cfg.targetBlocksPerSec)) @@ -384,7 +428,7 @@ func produceBlocks(ctx context.Context, cfg config, workload *transferWorkload, return group.Wait() } -func prebuildBlockRequests(ctx context.Context, cfg config, workload *transferWorkload) ([]blockEnvelope, error) { +func prebuildBlockRequests(ctx context.Context, cfg config, workload blockWorkload) ([]blockEnvelope, error) { if cfg.blocks > uint64(maxInt()) { return nil, fmt.Errorf("prebuild-blocks cannot allocate %d blocks on this platform", cfg.blocks) } @@ -498,7 +542,7 @@ func (w *transferWorkload) buildBlock(ctx context.Context, number uint64) (evmon txs[i] = raw } return evmonly.BlockRequest{ - Context: w.blockContext(number), + Context: blockContext(w.cfg, number), Txs: txs, }, nil } @@ -535,14 +579,107 @@ func (w *transferWorkload) recipient(accountIndex uint64) common.Address { return addressFromSeed("sei-evmonly-loadtest-recipient", accountIndex) } -func (w *transferWorkload) blockContext(number uint64) evmonly.BlockContext { +type erc20TransferWorkload struct { + cfg config + state *generatedState + signer ethtypes.Signer + accountCursor atomic.Uint64 +} + +var ( + erc20TransferSelector = [4]byte{0xa9, 0x05, 0x9c, 0xbb} + // Minimal ERC20-like runtime for transfer(address,uint256), with balances at + // storage slot 0 and a standard Transfer(address,address,uint256) log. + erc20TransferRuntimeCode = common.FromHex("0x60003560e01c63a9059cbb1460145760006000fd5b60243560043533600052600060205260406000208054831060805780548303905580600052600060205260406000208054830190558160005280337fddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef60206000a3600160005260206000f35b60006000fd") +) + +func newERC20TransferWorkload(cfg config, state *generatedState) *erc20TransferWorkload { + state.SetCode(cfg.erc20Contract, erc20TransferRuntimeCode) + return &erc20TransferWorkload{ + cfg: cfg, + state: state, + signer: ethtypes.LatestSignerForChainID(cfg.chainID), + } +} + +func (w *erc20TransferWorkload) buildBlock(ctx context.Context, number uint64) (evmonly.BlockRequest, error) { + txs := make([][]byte, w.cfg.txsPerBlock) + for i := 0; i < w.cfg.txsPerBlock; i++ { + select { + case <-ctx.Done(): + return evmonly.BlockRequest{}, ctx.Err() + default: + } + accountIndex := w.accountCursor.Add(1) + raw, sender, err := w.buildTransferTx(accountIndex) + if err != nil { + return evmonly.BlockRequest{}, err + } + w.state.SetBalance(sender, w.cfg.senderBalance) + w.state.SetState(w.cfg.erc20Contract, erc20BalanceSlot(sender), common.BigToHash(w.cfg.transferValue)) + txs[i] = raw + } + return evmonly.BlockRequest{ + Context: blockContext(w.cfg, number), + Txs: txs, + }, nil +} + +func (w *erc20TransferWorkload) buildTransferTx(accountIndex uint64) ([]byte, common.Address, error) { + key, err := deterministicPrivateKey(accountIndex) + if err != nil { + return nil, common.Address{}, err + } + sender := crypto.PubkeyToAddress(key.PublicKey) + recipient := w.recipient(accountIndex) + tx := ethtypes.NewTx(ðtypes.LegacyTx{ + Nonce: 0, + GasPrice: new(big.Int).Set(w.cfg.gasPrice), + Gas: w.cfg.txGasLimit, + To: &w.cfg.erc20Contract, + Value: new(big.Int), + Data: erc20TransferCalldata(recipient, w.cfg.transferValue), + }) + signed, err := ethtypes.SignTx(tx, w.signer, key) + if err != nil { + return nil, common.Address{}, err + } + raw, err := signed.MarshalBinary() + if err != nil { + return nil, common.Address{}, err + } + return raw, sender, nil +} + +func (w *erc20TransferWorkload) recipient(accountIndex uint64) common.Address { + if w.cfg.fixedRecipient != nil { + return *w.cfg.fixedRecipient + } + return addressFromSeed("sei-evmonly-loadtest-erc20-recipient", accountIndex) +} + +func erc20TransferCalldata(recipient common.Address, amount *big.Int) []byte { + data := make([]byte, 4+32+32) + copy(data[:4], erc20TransferSelector[:]) + copy(data[4+12:36], recipient.Bytes()) + amount.FillBytes(data[36:68]) + return data +} + +func erc20BalanceSlot(owner common.Address) common.Hash { + var encoded [64]byte + copy(encoded[12:32], owner.Bytes()) + return crypto.Keccak256Hash(encoded[:]) +} + +func blockContext(cfg config, number uint64) evmonly.BlockContext { return evmonly.BlockContext{ Number: number, Time: uint64(time.Now().Unix()), - GasLimit: w.cfg.blockGasLimit, - ChainID: new(big.Int).Set(w.cfg.chainID), + GasLimit: cfg.blockGasLimit, + ChainID: new(big.Int).Set(cfg.chainID), BaseFee: big.NewInt(0), - Coinbase: w.cfg.coinbase, + Coinbase: cfg.coinbase, ParentHash: hashFromSeed("sei-evmonly-loadtest-parent", number-1), BlockHash: hashFromSeed("sei-evmonly-loadtest-block", number), PrevRandao: hashFromSeed("sei-evmonly-loadtest-randao", number), diff --git a/giga/evmonly/cmd/evmonly-loadtest/main_test.go b/giga/evmonly/cmd/evmonly-loadtest/main_test.go index 880bf0c307..dae649b3bc 100644 --- a/giga/evmonly/cmd/evmonly-loadtest/main_test.go +++ b/giga/evmonly/cmd/evmonly-loadtest/main_test.go @@ -41,6 +41,46 @@ func TestTransferWorkloadExecutesAgainstEVMOnlyExecutor(t *testing.T) { discardReceiptSink{}.StoreReceipts(request.Context.Number, result.Receipts) } +func TestERC20TransferWorkloadExecutesAgainstEVMOnlyExecutor(t *testing.T) { + cfg, err := parseConfig([]string{ + "--metrics-addr=", + "--workload=erc20-transfer", + "--txs-per-block=4", + "--gas-price-wei=0", + "--min-gas-price-wei=0", + }) + require.NoError(t, err) + require.Equal(t, uint64(defaultERC20TxGasLimit), cfg.txGasLimit) + + state := newGeneratedState() + workload, err := newWorkload(cfg, state) + require.NoError(t, err) + request, err := workload.buildBlock(context.Background(), 1) + require.NoError(t, err) + + executor := evmonly.NewExecutor(evmonly.Config{ + MinGasPrice: cfg.minGasPrice, + OCCWorkers: cfg.executorWorkers, + }, evmonly.WithState(state)) + result, err := executor.ExecuteBlock(context.Background(), request) + require.NoError(t, err) + + require.Len(t, result.Txs, cfg.txsPerBlock) + require.Len(t, result.Receipts, cfg.txsPerBlock) + require.NotEmpty(t, result.ChangeSet.Storage) + require.True(t, result.OCCStats.Attempted) + require.False(t, result.OCCStats.Fallback) + for _, tx := range result.Txs { + require.Equal(t, ethtypes.ReceiptStatusSuccessful, tx.Status) + require.NoError(t, tx.Err) + require.Greater(t, tx.GasUsed, uint64(21_000)) + require.Len(t, tx.Logs, 1) + } + for _, receipt := range result.Receipts { + require.Len(t, receipt.Logs, 1) + } +} + func TestPrebuildBlocksRequiresBoundedRun(t *testing.T) { _, err := parseConfig([]string{ "--prebuild-blocks", diff --git a/giga/evmonly/occ.go b/giga/evmonly/occ.go index 3bc2af968e..fbf1377996 100644 --- a/giga/evmonly/occ.go +++ b/giga/evmonly/occ.go @@ -345,7 +345,10 @@ func (i *stateAccessIndex) conflictsWith(key stateAccessKey) bool { func (i *stateAccessIndex) addAll(set map[stateAccessKey]struct{}) { for key := range set { i.exact[key] = struct{}{} - i.touched[key.address] = struct{}{} + // Exist/Empty account reads depend on account metadata, not storage slots. + if key.kind != stateAccessStorage { + i.touched[key.address] = struct{}{} + } if key.kind == stateAccessAccount { i.account[key.address] = struct{}{} } From 4d62f9fe6ff140d51ba71a579594278a52922a82 Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Wed, 1 Jul 2026 11:57:08 +0800 Subject: [PATCH 6/8] verify ERC20 loadtest token balances --- .../evmonly/cmd/evmonly-loadtest/main_test.go | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/giga/evmonly/cmd/evmonly-loadtest/main_test.go b/giga/evmonly/cmd/evmonly-loadtest/main_test.go index dae649b3bc..defdc3bd44 100644 --- a/giga/evmonly/cmd/evmonly-loadtest/main_test.go +++ b/giga/evmonly/cmd/evmonly-loadtest/main_test.go @@ -4,7 +4,9 @@ import ( "context" "testing" + "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/require" "github.com/sei-protocol/sei-chain/giga/evmonly" @@ -79,6 +81,40 @@ func TestERC20TransferWorkloadExecutesAgainstEVMOnlyExecutor(t *testing.T) { for _, receipt := range result.Receipts { require.Len(t, receipt.Logs, 1) } + + applyGeneratedStateChangeSet(state, result.ChangeSet) + transferWorkload := workload.(*erc20TransferWorkload) + for i := uint64(1); i <= uint64(cfg.txsPerBlock); i++ { + key, err := deterministicPrivateKey(i) + require.NoError(t, err) + sender := crypto.PubkeyToAddress(key.PublicKey) + recipient := transferWorkload.recipient(i) + require.Equal(t, common.Hash{}, state.GetState(cfg.erc20Contract, erc20BalanceSlot(sender))) + require.Equal(t, common.BigToHash(cfg.transferValue), state.GetState(cfg.erc20Contract, erc20BalanceSlot(recipient))) + } +} + +func applyGeneratedStateChangeSet(state *generatedState, changeSet evmonly.StateChangeSet) { + for _, change := range changeSet.Balances { + state.SetBalance(change.Address, change.Balance) + } + for _, change := range changeSet.Nonces { + state.SetNonce(change.Address, change.Nonce) + } + for _, change := range changeSet.Code { + if change.Delete { + state.SetCode(change.Address, nil) + } else { + state.SetCode(change.Address, change.Code) + } + } + for _, change := range changeSet.Storage { + value := change.Value + if change.Delete { + value = common.Hash{} + } + state.SetState(change.Address, change.Key, value) + } } func TestPrebuildBlocksRequiresBoundedRun(t *testing.T) { From 552235cf61a9af33823079df610d3713b30980c6 Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Wed, 1 Jul 2026 12:07:59 +0800 Subject: [PATCH 7/8] add persistent loadtest result sink --- giga/evmonly/cmd/evmonly-loadtest/main.go | 235 +++++++++++++++++- .../evmonly/cmd/evmonly-loadtest/main_test.go | 45 ++++ 2 files changed, 268 insertions(+), 12 deletions(-) diff --git a/giga/evmonly/cmd/evmonly-loadtest/main.go b/giga/evmonly/cmd/evmonly-loadtest/main.go index b74f1456e1..36b3a08c7e 100644 --- a/giga/evmonly/cmd/evmonly-loadtest/main.go +++ b/giga/evmonly/cmd/evmonly-loadtest/main.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "context" "crypto/ecdsa" "encoding/binary" @@ -13,6 +14,7 @@ import ( "net/http" "os" "os/signal" + "path/filepath" "runtime" "strings" "sync" @@ -23,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rlp" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "golang.org/x/sync/errgroup" @@ -44,10 +47,13 @@ const ( defaultTxGasLimit = 21_000 defaultERC20TxGasLimit = 100_000 defaultTxsPerBlock = 1_000 + defaultPersistBuffer = 4 << 20 defaultWorkerCount = 1 defaultCoinbaseAddress = "0x00000000000000000000000000000000000000cb" workloadTransfer = "transfer" workloadERC20Transfer = "erc20-transfer" + resultSinkDiscard = "discard" + resultSinkFile = "file" ) type config struct { @@ -60,6 +66,10 @@ type config struct { targetBlocksPerSec float64 reportInterval time.Duration metricsAddr string + resultSink string + persistDir string + persistSync bool + persistBufferSize int workload string chainID *big.Int gasPrice *big.Int @@ -115,6 +125,10 @@ func parseConfig(args []string) (config, error) { fs.Float64Var(&cfg.targetBlocksPerSec, "target-blocks-per-sec", 0, "input block rate cap; 0 means unlimited") fs.DurationVar(&cfg.reportInterval, "report-interval", defaultReportInterval, "stdout and rate-gauge reporting interval; 0 disables periodic reports") fs.StringVar(&cfg.metricsAddr, "metrics-addr", defaultMetricsAddr, "Prometheus listen address; empty disables HTTP metrics") + fs.StringVar(&cfg.resultSink, "result-sink", resultSinkDiscard, "result sink mode: discard or file") + fs.StringVar(&cfg.persistDir, "persist-dir", "", "directory for --result-sink=file append-only changeset and receipt files") + fs.BoolVar(&cfg.persistSync, "persist-sync", false, "fsync persistent result files after every block; file sink always flushes each block") + fs.IntVar(&cfg.persistBufferSize, "persist-buffer-size", defaultPersistBuffer, "buffer size in bytes for --result-sink=file") fs.StringVar(&cfg.workload, "workload", workloadTransfer, "workload type: transfer or erc20-transfer") fs.Uint64Var(&cfg.txGasLimit, "tx-gas-limit", defaultTxGasLimit, "gas limit for each generated transaction") fs.Uint64Var(&cfg.blockGasLimit, "block-gas-limit", 0, "block gas limit; 0 lets the executor use its maximum") @@ -189,6 +203,16 @@ func parseConfig(args []string) (config, error) { if cfg.reportInterval < 0 { return config{}, fmt.Errorf("report-interval must be non-negative") } + cfg.resultSink = strings.ToLower(strings.TrimSpace(cfg.resultSink)) + if cfg.resultSink != resultSinkDiscard && cfg.resultSink != resultSinkFile { + return config{}, fmt.Errorf("unsupported result-sink %q", cfg.resultSink) + } + if cfg.persistBufferSize <= 0 { + return config{}, fmt.Errorf("persist-buffer-size must be positive") + } + if cfg.resultSink == resultSinkFile && strings.TrimSpace(cfg.persistDir) == "" { + return config{}, fmt.Errorf("persist-dir is required when result-sink=file") + } if cfg.txGasLimit == 0 { return config{}, fmt.Errorf("tx-gas-limit must be positive") } @@ -262,6 +286,10 @@ func run(cfg config) error { } registry := prometheus.NewRegistry() metrics := newLoadMetrics(registry) + sinks, err := newResultSinks(cfg) + if err != nil { + return err + } var server *metricsServer if cfg.metricsAddr != "" { @@ -278,10 +306,20 @@ func run(cfg config) error { fmt.Printf("metrics listening on http://%s/metrics\n", cfg.metricsAddr) } + var runErr error if cfg.prebuildBlocks { - return runPrebuilt(ctx, cfg, state, workload, metrics) + runErr = runPrebuilt(ctx, cfg, state, workload, sinks, metrics) + } else { + runErr = runStreaming(ctx, cfg, state, workload, sinks, metrics) + } + if closeErr := sinks.Close(); closeErr != nil { + if runErr != nil { + fmt.Fprintf(os.Stderr, "evmonly-loadtest: result sink close: %v\n", closeErr) + return runErr + } + return closeErr } - return runStreaming(ctx, cfg, state, workload, metrics) + return runErr } type blockWorkload interface { @@ -299,7 +337,7 @@ func newWorkload(cfg config, state *generatedState) (blockWorkload, error) { } } -func runStreaming(ctx context.Context, cfg config, state *generatedState, workload blockWorkload, metrics *loadMetrics) error { +func runStreaming(ctx context.Context, cfg config, state *generatedState, workload blockWorkload, sinks *resultSinks, metrics *loadMetrics) error { blocks := make(chan blockEnvelope, cfg.queueSize) reportCtx, stopReporter := context.WithCancel(ctx) reportDone := make(chan struct{}) @@ -322,7 +360,7 @@ func runStreaming(ctx context.Context, cfg config, state *generatedState, worklo DisableGasPriceCheck: cfg.disableGasPriceRule, OCCWorkers: cfg.executorWorkers, }, evmonly.WithState(state)) - return executeBlocks(groupCtx, workerID, executor, blocks, &discardStateWriter{}, discardReceiptSink{}, metrics) + return executeBlocks(groupCtx, workerID, executor, blocks, sinks, metrics) }) } @@ -337,7 +375,7 @@ func runStreaming(ctx context.Context, cfg config, state *generatedState, worklo return err } -func runPrebuilt(ctx context.Context, cfg config, state *generatedState, workload blockWorkload, metrics *loadMetrics) error { +func runPrebuilt(ctx context.Context, cfg config, state *generatedState, workload blockWorkload, sinks *resultSinks, metrics *loadMetrics) error { prebuildStartedAt := time.Now() prebuilt, err := prebuildBlockRequests(ctx, cfg, workload) if err != nil { @@ -369,7 +407,7 @@ func runPrebuilt(ctx context.Context, cfg config, state *generatedState, workloa DisableGasPriceCheck: cfg.disableGasPriceRule, OCCWorkers: cfg.executorWorkers, }, evmonly.WithState(state)) - return executeBlocks(groupCtx, workerID, executor, blocks, &discardStateWriter{}, discardReceiptSink{}, metrics) + return executeBlocks(groupCtx, workerID, executor, blocks, sinks, metrics) }) } @@ -483,8 +521,7 @@ func executeBlocks( workerID int, executor evmonly.BlockExecutor, blocks <-chan blockEnvelope, - stateWriter evmonly.StateWriter, - receiptSink receiptSink, + sinks *resultSinks, metrics *loadMetrics, ) error { for { @@ -503,8 +540,14 @@ func executeBlocks( } return fmt.Errorf("worker %d execute block %d: %w", workerID, block.number, err) } - stateWriter.ApplyChangeSet(result.ChangeSet) - receiptSink.StoreReceipts(block.number, result.Receipts) + if err := sinks.StoreChangeSet(block.number, result.ChangeSet); err != nil { + metrics.recordExecutionError() + return fmt.Errorf("worker %d store changeset for block %d: %w", workerID, block.number, err) + } + if err := sinks.StoreReceipts(block.number, result.Receipts); err != nil { + metrics.recordExecutionError() + return fmt.Errorf("worker %d store receipts for block %d: %w", workerID, block.number, err) + } metrics.recordFinished(len(result.Txs), result.GasUsed, result.OCCStats) } } @@ -847,13 +890,181 @@ var _ evmonly.StateWriter = (*discardStateWriter)(nil) func (*discardStateWriter) ApplyChangeSet(evmonly.StateChangeSet) {} +type changeSetSink interface { + StoreChangeSet(height uint64, changeSet evmonly.StateChangeSet) error +} + type receiptSink interface { - StoreReceipts(height uint64, receipts ethtypes.Receipts) + StoreReceipts(height uint64, receipts ethtypes.Receipts) error +} + +type resultSinks struct { + changeSets changeSetSink + receipts receiptSink + close func() error +} + +func newResultSinks(cfg config) (*resultSinks, error) { + switch cfg.resultSink { + case resultSinkDiscard: + return &resultSinks{ + changeSets: discardChangeSetSink{writer: &discardStateWriter{}}, + receipts: discardReceiptSink{}, + }, nil + case resultSinkFile: + return newFileResultSinks(cfg) + default: + return nil, fmt.Errorf("unsupported result-sink %q", cfg.resultSink) + } +} + +func (s *resultSinks) StoreChangeSet(height uint64, changeSet evmonly.StateChangeSet) error { + return s.changeSets.StoreChangeSet(height, changeSet) +} + +func (s *resultSinks) StoreReceipts(height uint64, receipts ethtypes.Receipts) error { + return s.receipts.StoreReceipts(height, receipts) +} + +func (s *resultSinks) Close() error { + if s.close == nil { + return nil + } + return s.close() +} + +type discardChangeSetSink struct { + writer evmonly.StateWriter +} + +func (s discardChangeSetSink) StoreChangeSet(_ uint64, changeSet evmonly.StateChangeSet) error { + s.writer.ApplyChangeSet(changeSet) + return nil } type discardReceiptSink struct{} -func (discardReceiptSink) StoreReceipts(uint64, ethtypes.Receipts) {} +func (discardReceiptSink) StoreReceipts(uint64, ethtypes.Receipts) error { + return nil +} + +type fileResultSinks struct { + changeSetFile *appendRLPFile + receiptFile *appendRLPFile +} + +func newFileResultSinks(cfg config) (*resultSinks, error) { + if err := os.MkdirAll(cfg.persistDir, 0o755); err != nil { + return nil, fmt.Errorf("create persist dir %s: %w", cfg.persistDir, err) + } + files := &fileResultSinks{} + var err error + files.changeSetFile, err = newAppendRLPFile(filepath.Join(cfg.persistDir, "changesets.rlp"), cfg.persistBufferSize, cfg.persistSync) + if err != nil { + return nil, err + } + files.receiptFile, err = newAppendRLPFile(filepath.Join(cfg.persistDir, "receipts.rlp"), cfg.persistBufferSize, cfg.persistSync) + if err != nil { + closeErr := files.changeSetFile.Close() + return nil, errors.Join(err, closeErr) + } + return &resultSinks{ + changeSets: fileChangeSetSink{file: files.changeSetFile}, + receipts: fileReceiptSink{file: files.receiptFile}, + close: files.Close, + }, nil +} + +func (s *fileResultSinks) Close() error { + return errors.Join(s.changeSetFile.Close(), s.receiptFile.Close()) +} + +type fileChangeSetSink struct { + file *appendRLPFile +} + +func (s fileChangeSetSink) StoreChangeSet(height uint64, changeSet evmonly.StateChangeSet) error { + return s.file.WriteRecord(height, changeSet) +} + +type fileReceiptSink struct { + file *appendRLPFile +} + +func (s fileReceiptSink) StoreReceipts(height uint64, receipts ethtypes.Receipts) error { + return s.file.WriteRecord(height, receipts) +} + +type appendRLPFile struct { + mu sync.Mutex + file *os.File + writer *bufio.Writer + syncOnWrite bool + closed bool +} + +func newAppendRLPFile(path string, bufferSize int, syncOnWrite bool) (*appendRLPFile, error) { + file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o644) + if err != nil { + return nil, fmt.Errorf("open persist file %s: %w", path, err) + } + return &appendRLPFile{ + file: file, + writer: bufio.NewWriterSize(file, bufferSize), + syncOnWrite: syncOnWrite, + }, nil +} + +func (f *appendRLPFile) WriteRecord(height uint64, value any) error { + payload, err := rlp.EncodeToBytes(value) + if err != nil { + return fmt.Errorf("encode rlp record for height %d: %w", height, err) + } + var header [16]byte + binary.BigEndian.PutUint64(header[:8], height) + binary.BigEndian.PutUint64(header[8:], uint64(len(payload))) + + f.mu.Lock() + defer f.mu.Unlock() + if f.closed { + return fmt.Errorf("write record for height %d: persist file is closed", height) + } + if _, err := f.writer.Write(header[:]); err != nil { + return fmt.Errorf("write record header for height %d: %w", height, err) + } + if _, err := f.writer.Write(payload); err != nil { + return fmt.Errorf("write record payload for height %d: %w", height, err) + } + if err := f.writer.Flush(); err != nil { + return fmt.Errorf("flush record for height %d: %w", height, err) + } + if f.syncOnWrite { + if err := f.file.Sync(); err != nil { + return fmt.Errorf("sync record for height %d: %w", height, err) + } + } + return nil +} + +func (f *appendRLPFile) sync() error { + if err := f.writer.Flush(); err != nil { + return err + } + if err := f.file.Sync(); err != nil { + return fmt.Errorf("sync persist file: %w", err) + } + return nil +} + +func (f *appendRLPFile) Close() error { + f.mu.Lock() + defer f.mu.Unlock() + if f.closed { + return nil + } + f.closed = true + return errors.Join(f.sync(), f.file.Close()) +} type loadMetrics struct { inputBlocks atomic.Uint64 diff --git a/giga/evmonly/cmd/evmonly-loadtest/main_test.go b/giga/evmonly/cmd/evmonly-loadtest/main_test.go index defdc3bd44..4e71cedea8 100644 --- a/giga/evmonly/cmd/evmonly-loadtest/main_test.go +++ b/giga/evmonly/cmd/evmonly-loadtest/main_test.go @@ -2,11 +2,16 @@ package main import ( "context" + "encoding/binary" + "fmt" + "os" + "path/filepath" "testing" "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rlp" "github.com/stretchr/testify/require" "github.com/sei-protocol/sei-chain/giga/evmonly" @@ -136,6 +141,46 @@ func TestRunPrebuiltBlocks(t *testing.T) { require.NoError(t, run(cfg)) } +func TestRunPrebuiltBlocksWithFileResultSink(t *testing.T) { + dir := t.TempDir() + cfg, err := parseConfig([]string{ + "--metrics-addr=", + "--report-interval=0", + "--prebuild-blocks", + "--blocks=2", + "--txs-per-block=2", + "--gas-price-wei=0", + "--min-gas-price-wei=0", + "--result-sink=file", + "--persist-dir=" + dir, + }) + require.NoError(t, err) + require.NoError(t, run(cfg)) + + var changeSet evmonly.StateChangeSet + height := readPersistedRLPRecord(t, filepath.Join(dir, "changesets.rlp"), &changeSet) + require.Equal(t, uint64(1), height) + require.NotEmpty(t, changeSet.Balances) + + var receipts ethtypes.Receipts + height = readPersistedRLPRecord(t, filepath.Join(dir, "receipts.rlp"), &receipts) + require.Equal(t, uint64(1), height) + require.Len(t, receipts, cfg.txsPerBlock) +} + +func readPersistedRLPRecord(t *testing.T, path string, out any) uint64 { + t.Helper() + data, err := os.ReadFile(path) + require.NoError(t, err) + require.GreaterOrEqual(t, len(data), 16) + height := binary.BigEndian.Uint64(data[:8]) + payloadLen := binary.BigEndian.Uint64(data[8:16]) + require.LessOrEqual(t, payloadLen, uint64(len(data)-16)) + payload := data[16 : 16+payloadLen] + require.NoError(t, rlp.DecodeBytes(payload, out), fmt.Sprintf("decode %s", path)) + return height +} + func BenchmarkExecuteTransferBlock(b *testing.B) { cfg, err := parseConfig([]string{ "--metrics-addr=", From 78319b4b28123f8df1ba84a58dae9b9fbf11d0b7 Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Wed, 1 Jul 2026 12:16:08 +0800 Subject: [PATCH 8/8] clean up persistent loadtest files --- giga/evmonly/cmd/evmonly-loadtest/main.go | 105 +++++++++++++++--- .../evmonly/cmd/evmonly-loadtest/main_test.go | 78 +++++++++++-- 2 files changed, 154 insertions(+), 29 deletions(-) diff --git a/giga/evmonly/cmd/evmonly-loadtest/main.go b/giga/evmonly/cmd/evmonly-loadtest/main.go index 36b3a08c7e..ce506fdd6f 100644 --- a/giga/evmonly/cmd/evmonly-loadtest/main.go +++ b/giga/evmonly/cmd/evmonly-loadtest/main.go @@ -275,7 +275,7 @@ func parseBig(name, raw string) (*big.Int, error) { return v, nil } -func run(cfg config) error { +func run(cfg config) (err error) { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() @@ -290,6 +290,17 @@ func run(cfg config) error { if err != nil { return err } + defer func() { + if closeErr := sinks.Close(); closeErr != nil { + if err != nil { + fmt.Fprintf(os.Stderr, "evmonly-loadtest: result sink close: %v\n", closeErr) + return + } + err = closeErr + } + }() + stopSinkSignalCleanup := cleanupSinksOnContextCancel(ctx, sinks) + defer stopSinkSignalCleanup() var server *metricsServer if cfg.metricsAddr != "" { @@ -306,20 +317,33 @@ func run(cfg config) error { fmt.Printf("metrics listening on http://%s/metrics\n", cfg.metricsAddr) } - var runErr error if cfg.prebuildBlocks { - runErr = runPrebuilt(ctx, cfg, state, workload, sinks, metrics) - } else { - runErr = runStreaming(ctx, cfg, state, workload, sinks, metrics) + return runPrebuilt(ctx, cfg, state, workload, sinks, metrics) } - if closeErr := sinks.Close(); closeErr != nil { - if runErr != nil { - fmt.Fprintf(os.Stderr, "evmonly-loadtest: result sink close: %v\n", closeErr) - return runErr + return runStreaming(ctx, cfg, state, workload, sinks, metrics) +} + +func cleanupSinksOnContextCancel(ctx context.Context, sinks *resultSinks) func() { + cleanupCtx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + select { + case <-ctx.Done(): + if err := sinks.Cleanup(); err != nil { + fmt.Fprintf(os.Stderr, "evmonly-loadtest: result sink cleanup: %v\n", err) + } + case <-cleanupCtx.Done(): } - return closeErr + }() + return func() { + select { + case <-ctx.Done(): + default: + cancel() + } + <-done } - return runErr } type blockWorkload interface { @@ -902,6 +926,7 @@ type resultSinks struct { changeSets changeSetSink receipts receiptSink close func() error + cleanup func() error } func newResultSinks(cfg config) (*resultSinks, error) { @@ -927,10 +952,20 @@ func (s *resultSinks) StoreReceipts(height uint64, receipts ethtypes.Receipts) e } func (s *resultSinks) Close() error { + var closeErr error if s.close == nil { + closeErr = nil + } else { + closeErr = s.close() + } + return errors.Join(closeErr, s.Cleanup()) +} + +func (s *resultSinks) Cleanup() error { + if s.cleanup == nil { return nil } - return s.close() + return s.cleanup() } type discardChangeSetSink struct { @@ -951,32 +986,66 @@ func (discardReceiptSink) StoreReceipts(uint64, ethtypes.Receipts) error { type fileResultSinks struct { changeSetFile *appendRLPFile receiptFile *appendRLPFile + cleanupMu sync.Mutex + paths []string + cleaned map[string]struct{} } func newFileResultSinks(cfg config) (*resultSinks, error) { if err := os.MkdirAll(cfg.persistDir, 0o755); err != nil { return nil, fmt.Errorf("create persist dir %s: %w", cfg.persistDir, err) } - files := &fileResultSinks{} + changeSetPath := filepath.Join(cfg.persistDir, "changesets.rlp") + receiptPath := filepath.Join(cfg.persistDir, "receipts.rlp") + files := &fileResultSinks{ + paths: []string{changeSetPath, receiptPath}, + cleaned: map[string]struct{}{}, + } var err error - files.changeSetFile, err = newAppendRLPFile(filepath.Join(cfg.persistDir, "changesets.rlp"), cfg.persistBufferSize, cfg.persistSync) + files.changeSetFile, err = newAppendRLPFile(changeSetPath, cfg.persistBufferSize, cfg.persistSync) if err != nil { return nil, err } - files.receiptFile, err = newAppendRLPFile(filepath.Join(cfg.persistDir, "receipts.rlp"), cfg.persistBufferSize, cfg.persistSync) + files.receiptFile, err = newAppendRLPFile(receiptPath, cfg.persistBufferSize, cfg.persistSync) if err != nil { - closeErr := files.changeSetFile.Close() - return nil, errors.Join(err, closeErr) + return nil, errors.Join(err, files.Close()) } return &resultSinks{ changeSets: fileChangeSetSink{file: files.changeSetFile}, receipts: fileReceiptSink{file: files.receiptFile}, close: files.Close, + cleanup: files.Cleanup, }, nil } func (s *fileResultSinks) Close() error { - return errors.Join(s.changeSetFile.Close(), s.receiptFile.Close()) + var errs []error + if s.changeSetFile != nil { + errs = append(errs, s.changeSetFile.Close()) + } + if s.receiptFile != nil { + errs = append(errs, s.receiptFile.Close()) + } + errs = append(errs, s.Cleanup()) + return errors.Join(errs...) +} + +func (s *fileResultSinks) Cleanup() error { + s.cleanupMu.Lock() + defer s.cleanupMu.Unlock() + + var errs []error + for _, path := range s.paths { + if _, ok := s.cleaned[path]; ok { + continue + } + if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) { + errs = append(errs, fmt.Errorf("remove persist file %s: %w", path, err)) + continue + } + s.cleaned[path] = struct{}{} + } + return errors.Join(errs...) } type fileChangeSetSink struct { diff --git a/giga/evmonly/cmd/evmonly-loadtest/main_test.go b/giga/evmonly/cmd/evmonly-loadtest/main_test.go index 4e71cedea8..4b20749bdc 100644 --- a/giga/evmonly/cmd/evmonly-loadtest/main_test.go +++ b/giga/evmonly/cmd/evmonly-loadtest/main_test.go @@ -3,7 +3,9 @@ package main import ( "context" "encoding/binary" + "errors" "fmt" + "math/big" "os" "path/filepath" "testing" @@ -141,31 +143,73 @@ func TestRunPrebuiltBlocks(t *testing.T) { require.NoError(t, run(cfg)) } -func TestRunPrebuiltBlocksWithFileResultSink(t *testing.T) { +func TestFileResultSinkWritesRLPRecordsAndCleansUpOnCancel(t *testing.T) { dir := t.TempDir() cfg, err := parseConfig([]string{ "--metrics-addr=", - "--report-interval=0", - "--prebuild-blocks", - "--blocks=2", - "--txs-per-block=2", - "--gas-price-wei=0", - "--min-gas-price-wei=0", "--result-sink=file", "--persist-dir=" + dir, }) require.NoError(t, err) - require.NoError(t, run(cfg)) + sinks, err := newResultSinks(cfg) + require.NoError(t, err) + changePath := filepath.Join(dir, "changesets.rlp") + receiptPath := filepath.Join(dir, "receipts.rlp") + writtenChangeSet := evmonly.StateChangeSet{ + Balances: []evmonly.BalanceChange{{ + Address: common.HexToAddress("0x0000000000000000000000000000000000000001"), + Balance: big.NewInt(7), + }}, + } + writtenReceipts := ethtypes.Receipts{{ + Type: ethtypes.LegacyTxType, + Status: ethtypes.ReceiptStatusSuccessful, + TxHash: common.HexToHash("0x01"), + }} + require.NoError(t, sinks.StoreChangeSet(1, writtenChangeSet)) + require.NoError(t, sinks.StoreReceipts(1, writtenReceipts)) + + requireFileExists(t, changePath) + requireFileExists(t, receiptPath) var changeSet evmonly.StateChangeSet - height := readPersistedRLPRecord(t, filepath.Join(dir, "changesets.rlp"), &changeSet) + height := readPersistedRLPRecord(t, changePath, &changeSet) require.Equal(t, uint64(1), height) require.NotEmpty(t, changeSet.Balances) var receipts ethtypes.Receipts - height = readPersistedRLPRecord(t, filepath.Join(dir, "receipts.rlp"), &receipts) + height = readPersistedRLPRecord(t, receiptPath, &receipts) require.Equal(t, uint64(1), height) - require.Len(t, receipts, cfg.txsPerBlock) + require.Len(t, receipts, 1) + + ctx, cancel := context.WithCancel(context.Background()) + stopCleanup := cleanupSinksOnContextCancel(ctx, sinks) + cancel() + stopCleanup() + requireNoFileExists(t, changePath) + requireNoFileExists(t, receiptPath) + require.NoError(t, sinks.Close()) + requireNoFileExists(t, changePath) + requireNoFileExists(t, receiptPath) +} + +func TestRunPrebuiltBlocksWithFileResultSinkCleansUp(t *testing.T) { + dir := t.TempDir() + cfg, err := parseConfig([]string{ + "--metrics-addr=", + "--report-interval=0", + "--prebuild-blocks", + "--blocks=2", + "--txs-per-block=2", + "--gas-price-wei=0", + "--min-gas-price-wei=0", + "--result-sink=file", + "--persist-dir=" + dir, + }) + require.NoError(t, err) + require.NoError(t, run(cfg)) + requireNoFileExists(t, filepath.Join(dir, "changesets.rlp")) + requireNoFileExists(t, filepath.Join(dir, "receipts.rlp")) } func readPersistedRLPRecord(t *testing.T, path string, out any) uint64 { @@ -181,6 +225,18 @@ func readPersistedRLPRecord(t *testing.T, path string, out any) uint64 { return height } +func requireFileExists(t *testing.T, path string) { + t.Helper() + _, err := os.Stat(path) + require.NoError(t, err) +} + +func requireNoFileExists(t *testing.T, path string) { + t.Helper() + _, err := os.Stat(path) + require.Truef(t, errors.Is(err, os.ErrNotExist), "expected %s to be removed, got %v", path, err) +} + func BenchmarkExecuteTransferBlock(b *testing.B) { cfg, err := parseConfig([]string{ "--metrics-addr=",