Skip to content

Commit ee28dfe

Browse files
jewei1997Cody Littley
andauthored
add receipt generation to cryptosim (#3047)
## Describe your changes and provide context - Add receipt generation to cryptosim: each simulated ERC20 transfer transaction now produces a realistic receipt with logs, bloom filter, gas fields, and tx type distribution - Add a `RecieptStoreSimulator` that consumes generated receipts and writes them to a parquet store with WAL, flush, rotation, and pruning — matching the real node's write path - Add configurable parquet store settings: `ReceiptKeepRecent`, `ReceiptPruneIntervalSeconds`, `ReceiptMaxBlocksPerFile`, `ReceiptBlockFlushInterval` - Add receipt write metrics: `receipt_block_write_duration_seconds` (histogram), `receipt_channel_depth` (gauge), `receipts_written_total` (counter), `receipt_errors_total` (counter) - Move block building to a background goroutine (`blockBuilder`) so transaction generation doesn't block the main execution loop Set `"GenerateReceipts": true` to enable. See `config/reciept-store.json` for an example config. All receipt store tuning parameters have production-matching defaults and can be left unset. ## Testing performed to validate your change unit tests + running cryptosim simulation --------- Signed-off-by: Cody Littley <cody.littley@seinetwork.io> Co-authored-by: Cody Littley <cody.littley@seinetwork.io>
1 parent 0017345 commit ee28dfe

15 files changed

Lines changed: 1289 additions & 42 deletions

File tree

docker/monitornode/dashboards/cryptosim-dashboard.json

Lines changed: 453 additions & 17 deletions
Large diffs are not rendered by default.

sei-db/ledger_db/receipt/parquet_store.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (s *parquetReceiptStore) SetReceipts(ctx sdk.Context, receipts []ReceiptRec
177177
BlockNumber: blockNumber,
178178
ReceiptBytes: parquet.CopyBytesOrEmpty(receiptBytes),
179179
},
180-
Logs: buildParquetLogRecords(txLogs, blockHash),
180+
Logs: BuildParquetLogRecords(txLogs, blockHash),
181181
ReceiptBytes: parquet.CopyBytesOrEmpty(receiptBytes),
182182
})
183183
}
@@ -308,7 +308,7 @@ func (s *parquetReceiptStore) replayWAL() error {
308308
BlockNumber: blockNumber,
309309
ReceiptBytes: parquet.CopyBytesOrEmpty(receiptBytes),
310310
},
311-
Logs: buildParquetLogRecords(txLogs, blockHash),
311+
Logs: BuildParquetLogRecords(txLogs, blockHash),
312312
}
313313

314314
if err := s.store.ApplyReceiptFromReplay(input); err != nil {
@@ -348,14 +348,14 @@ func truncateReplayWAL(w interface{ TruncateBefore(offset uint64) error }, dropO
348348
return nil
349349
}
350350

351-
func buildParquetLogRecords(logs []*ethtypes.Log, blockHash common.Hash) []parquet.LogRecord {
351+
func BuildParquetLogRecords(logs []*ethtypes.Log, blockHash common.Hash) []parquet.LogRecord {
352352
if len(logs) == 0 {
353353
return nil
354354
}
355355

356356
records := make([]parquet.LogRecord, 0, len(logs))
357357
for _, lg := range logs {
358-
topic0, topic1, topic2, topic3 := extractLogTopics(lg.Topics)
358+
topic0, topic1, topic2, topic3 := ExtractLogTopics(lg.Topics)
359359
rec := parquet.LogRecord{
360360
BlockNumber: lg.BlockNumber,
361361
TxHash: lg.TxHash[:],
@@ -393,7 +393,7 @@ func buildTopicsFromParquetLogResult(lr parquet.LogResult) []common.Hash {
393393
return topicList
394394
}
395395

396-
func extractLogTopics(topics []common.Hash) ([]byte, []byte, []byte, []byte) {
396+
func ExtractLogTopics(topics []common.Hash) ([]byte, []byte, []byte, []byte) {
397397
t0 := make([]byte, 0)
398398
t1 := make([]byte, 0)
399399
t2 := make([]byte, 0)

sei-db/state_db/bench/cryptosim/block.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package cryptosim
22

3-
import "iter"
3+
import (
4+
"iter"
5+
6+
evmtypes "github.com/sei-protocol/sei-chain/x/evm/types"
7+
)
48

59
// A simulated block of transactions.
610
type block struct {
@@ -9,6 +13,9 @@ type block struct {
913
// The transactions in the block.
1014
transactions []*transaction
1115

16+
// If receipt generation is enabled, this will contain the receipts for each transaction in the block.
17+
reciepts []*evmtypes.Receipt
18+
1219
// The block number. This is not currently preserved across benchmark restarts, but otherwise monotonically
1320
// increases as you'd expect.
1421
blockNumber int64
@@ -32,11 +39,18 @@ func NewBlock(
3239
blockNumber int64,
3340
capacity int,
3441
) *block {
42+
43+
var reciepts []*evmtypes.Receipt
44+
if config.GenerateReceipts {
45+
reciepts = make([]*evmtypes.Receipt, 0, capacity)
46+
}
47+
3548
return &block{
3649
config: config,
3750
blockNumber: blockNumber,
3851
transactions: make([]*transaction, 0, capacity),
3952
metrics: metrics,
53+
reciepts: reciepts,
4054
}
4155
}
4256

@@ -56,6 +70,11 @@ func (b *block) AddTransaction(txn *transaction) {
5670
b.transactions = append(b.transactions, txn)
5771
}
5872

73+
// Adds a receipt to the block.
74+
func (b *block) AddReceipt(receipt *evmtypes.Receipt) {
75+
b.reciepts = append(b.reciepts, receipt)
76+
}
77+
5978
// Returns the block number.
6079
func (b *block) BlockNumber() int64 {
6180
return b.blockNumber

sei-db/state_db/bench/cryptosim/block_builder.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,21 @@ func (b *blockBuilder) buildBlock() *block {
6969
continue
7070
}
7171
blk.AddTransaction(txn)
72+
73+
if b.config.GenerateReceipts {
74+
receipt, err := BuildERC20TransferReceiptFromTxn(
75+
b.dataGenerator.Rand(),
76+
b.dataGenerator.FeeCollectionAddress(),
77+
uint64(blk.BlockNumber()), //nolint:gosec
78+
uint32(i), //nolint:gosec
79+
txn,
80+
)
81+
if err != nil {
82+
fmt.Printf("failed to build receipt: %v\n", err)
83+
continue
84+
}
85+
blk.AddReceipt(receipt)
86+
}
7287
}
7388

7489
blk.SetBlockAccountStats(
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"Comment": "For testing with the state store and reciept store both enabled.",
3+
"DataDir": "data",
4+
"MinimumNumberOfColdAccounts": 1000000,
5+
"MinimumNumberOfDormantAccounts": 1000000,
6+
"GenerateReceipts": true
7+
}
8+

sei-db/state_db/bench/cryptosim/cryptosim.go

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/sei-protocol/sei-chain/sei-db/state_db/bench/wrappers"
10+
"golang.org/x/time/rate"
1011
)
1112

1213
const (
@@ -17,8 +18,9 @@ const (
1718

1819
// EVM key sizes (matches sei-db/common/evm).
1920
const (
20-
AddressLen = 20 // EVM address length
21-
SlotLen = 32 // EVM storage slot length
21+
AddressLen = 20 // EVM address length
22+
SlotLen = 32 // EVM storage slot length
23+
StorageKeyLen = AddressLen + SlotLen
2224
)
2325

2426
// The test runner for the cryptosim benchmark.
@@ -74,6 +76,12 @@ type CryptoSim struct {
7476
// This is fixed after initial setup is complete, since we don't currently simulate
7577
// the creation of new ERC20 contracts during the benchmark.
7678
nextERC20ContractID int64
79+
80+
// The channel that holds blocks sent to the receipt store.
81+
recieptsChan chan *block
82+
83+
// Enforces a maximum transaction rate (if enabled).
84+
rateLimiter *rate.Limiter
7785
}
7886

7987
// Creates a new cryptosim benchmark runner.
@@ -137,7 +145,7 @@ func NewCryptoSim(
137145

138146
start := time.Now()
139147

140-
database := NewDatabase(config, db, metrics)
148+
database := NewDatabase(config, db, metrics, 0)
141149

142150
dataGenerator, err := NewDataGenerator(config, database, rand, metrics)
143151
if err != nil {
@@ -147,6 +155,7 @@ func NewCryptoSim(
147155
}
148156
return nil, fmt.Errorf("failed to create data generator: %w", err)
149157
}
158+
database.nextBlockNumber = dataGenerator.InitialNextBlockNumber()
150159
threadCount := int(config.ThreadsPerCore)*runtime.NumCPU() + config.ConstantThreadCount
151160
if threadCount < 1 {
152161
threadCount = 1
@@ -156,7 +165,23 @@ func NewCryptoSim(
156165
executors := make([]*TransactionExecutor, threadCount)
157166
for i := 0; i < threadCount; i++ {
158167
executors[i] = NewTransactionExecutor(
159-
ctx, cancel, database, dataGenerator.FeeCollectionAddress(), config.ExecutorQueueSize, metrics)
168+
ctx, cancel, config, database, dataGenerator.FeeCollectionAddress(), config.ExecutorQueueSize, metrics)
169+
}
170+
171+
var recieptsChan chan *block
172+
if config.GenerateReceipts {
173+
recieptsChan = make(chan *block, config.RecieptChannelCapacity)
174+
_, err := NewRecieptStoreSimulator(ctx, config, recieptsChan, metrics)
175+
if err != nil {
176+
cancel()
177+
return nil, fmt.Errorf("failed to create receipt store simulator: %w", err)
178+
}
179+
metrics.startReceiptChannelDepthSampling(recieptsChan, config.BackgroundMetricsScrapeInterval)
180+
}
181+
182+
var rateLimiter *rate.Limiter
183+
if config.MaxTPS > 0 {
184+
rateLimiter = rate.NewLimiter(rate.Limit(config.MaxTPS), config.TransactionsPerBlock)
160185
}
161186

162187
blockBuilder := NewBlockBuilder(ctx, config, metrics, dataGenerator)
@@ -175,6 +200,8 @@ func NewCryptoSim(
175200
executors: executors,
176201
metrics: metrics,
177202
suspendChan: make(chan bool, 1),
203+
recieptsChan: recieptsChan,
204+
rateLimiter: rateLimiter,
178205
}
179206

180207
database.SetFlushFunc(c.flushExecutors)
@@ -233,7 +260,7 @@ func (c *CryptoSim) setupAccounts() error {
233260
if err != nil {
234261
return fmt.Errorf("failed to create new account: %w", err)
235262
}
236-
c.database.IncrementTransactionCount(1)
263+
c.database.IncrementTransactionCount()
237264
finalized, err := c.database.MaybeFinalizeBlock(
238265
c.dataGenerator.NextAccountID(), c.dataGenerator.NextErc20ContractID())
239266
if err != nil {
@@ -255,7 +282,8 @@ func (c *CryptoSim) setupAccounts() error {
255282
fmt.Printf("Created %s of %s accounts. \n",
256283
int64Commas(c.dataGenerator.NextAccountID()), int64Commas(int64(requiredNumberOfAccounts)))
257284

258-
err := c.database.FinalizeBlock(c.dataGenerator.NextAccountID(), c.dataGenerator.NextErc20ContractID(), true)
285+
err := c.database.FinalizeBlock(
286+
c.dataGenerator.NextAccountID(), c.dataGenerator.NextErc20ContractID(), true)
259287
if err != nil {
260288
return fmt.Errorf("failed to finalize block: %w", err)
261289
}
@@ -290,7 +318,7 @@ func (c *CryptoSim) setupErc20Contracts() error {
290318
break
291319
}
292320

293-
c.database.IncrementTransactionCount(1)
321+
c.database.IncrementTransactionCount()
294322

295323
_, _, err := c.dataGenerator.CreateNewErc20Contract(c.config.Erc20ContractSize, true)
296324
if err != nil {
@@ -320,7 +348,10 @@ func (c *CryptoSim) setupErc20Contracts() error {
320348
fmt.Printf("Created %s of %s simulated ERC20 contracts. \n",
321349
int64Commas(c.dataGenerator.NextErc20ContractID()), int64Commas(int64(c.config.MinimumNumberOfErc20Contracts)))
322350

323-
err := c.database.FinalizeBlock(c.dataGenerator.NextAccountID(), c.dataGenerator.NextErc20ContractID(), true)
351+
err := c.database.FinalizeBlock(
352+
c.dataGenerator.NextAccountID(),
353+
c.dataGenerator.NextErc20ContractID(),
354+
true)
324355
if err != nil {
325356
return fmt.Errorf("failed to finalize block: %w", err)
326357
}
@@ -365,19 +396,38 @@ func (c *CryptoSim) run() {
365396
c.cancel()
366397
return
367398
case blk := <-c.blockBuilder.blocksChan:
399+
c.maybeThrottle()
368400
c.handleNextBlock(blk)
369401
}
370402

371403
c.generateConsoleReport(false)
372404
}
373405
}
374406

407+
// Potentially block for a while if we are throttling the transaction rate.
408+
func (c *CryptoSim) maybeThrottle() {
409+
if c.config.MaxTPS == 0 {
410+
// Throttling is disabled.
411+
return
412+
}
413+
414+
c.metrics.SetMainThreadPhase("throttling")
415+
416+
if err := c.rateLimiter.WaitN(c.ctx, c.config.TransactionsPerBlock); err != nil {
417+
fmt.Printf("failed to wait for rate limit: %v\n", err)
418+
c.cancel()
419+
return
420+
}
421+
}
422+
375423
// Execute and finalize the next block.
376424
func (c *CryptoSim) handleNextBlock(blk *block) {
377425
c.mostRecentBlock = blk
378426
c.metrics.SetMainThreadPhase("send_to_executors")
379427

380-
c.database.IncrementTransactionCount(blk.TransactionCount())
428+
for i := int64(0); i < blk.TransactionCount(); i++ {
429+
c.database.IncrementTransactionCount()
430+
}
381431

382432
for txn := range blk.Iterator() {
383433
c.executors[c.nextExecutorIndex].ScheduleForExecution(txn)
@@ -389,6 +439,15 @@ func (c *CryptoSim) handleNextBlock(blk *block) {
389439
c.cancel()
390440
return
391441
}
442+
443+
if c.config.GenerateReceipts {
444+
select {
445+
case <-c.ctx.Done():
446+
return
447+
case c.recieptsChan <- blk:
448+
}
449+
}
450+
392451
blk.ReportBlockMetrics()
393452
}
394453

sei-db/state_db/bench/cryptosim/cryptosim_config.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,30 @@ type CryptoSimConfig struct {
140140
// The capacity of the channel that holds blocks awaiting execution.
141141
BlockChannelCapacity int
142142

143+
// If true, the benchmark will generate receipts for each transaction in each block and
144+
// feed those receipts into the receipt store.
145+
GenerateReceipts bool
146+
147+
// The capacity of the channel that holds blocks sent to the receipt store.
148+
RecieptChannelCapacity int
149+
150+
// If true, disables simulation of transaction execution, and writes very little to the database. This is
151+
// potentially useful when benchmarking things other than state storage (e.g. the receipt store).
152+
//
153+
// Note that switching execution on after previously running with execution disabled may result in buggy behavior,
154+
// as the benchmark will not be properly maintaining DB state when transaction execution is disabled. In order
155+
// to switch transaction execution back on, it is necessary to delete the on-disk database and start over.
156+
DisableTransactionExecution bool
157+
158+
// If greater than 0, the benchmark will throttle the transaction rate to this value, in hertz.
159+
MaxTPS float64
160+
161+
// Number of recent blocks to keep before pruning parquet files. 0 disables pruning.
162+
ReceiptKeepRecent int64
163+
164+
// Interval in seconds between prune checks. 0 disables pruning.
165+
ReceiptPruneIntervalSeconds int64
166+
143167
// Directory for seilog output files. Independent of DataDir so logs and data
144168
// live in separate trees. Supports ~ expansion and relative paths (resolved
145169
// from cwd). Must be set, there is no default.
@@ -187,6 +211,12 @@ func DefaultCryptoSimConfig() *CryptoSimConfig {
187211
BackgroundMetricsScrapeInterval: 60,
188212
EnableSuspension: true,
189213
BlockChannelCapacity: 8,
214+
GenerateReceipts: false,
215+
RecieptChannelCapacity: 32,
216+
DisableTransactionExecution: false,
217+
MaxTPS: 0,
218+
ReceiptKeepRecent: 100_000,
219+
ReceiptPruneIntervalSeconds: 600,
190220
LogLevel: "info",
191221
}
192222
}
@@ -266,6 +296,12 @@ func (c *CryptoSimConfig) Validate() error {
266296
if c.BlockChannelCapacity < 1 {
267297
return fmt.Errorf("BlockChannelCapacity must be at least 1 (got %d)", c.BlockChannelCapacity)
268298
}
299+
if c.RecieptChannelCapacity < 1 {
300+
return fmt.Errorf("RecieptChannelCapacity must be at least 1 (got %d)", c.RecieptChannelCapacity)
301+
}
302+
if c.MaxTPS < 0 {
303+
return fmt.Errorf("MaxTPS must be non-negative (got %f)", c.MaxTPS)
304+
}
269305
switch strings.ToLower(c.LogLevel) {
270306
case "debug", "info", "warn", "error":
271307
default:

0 commit comments

Comments
 (0)