Skip to content

Commit 78fbdc1

Browse files
authored
Write txs to a load txs directory for direct block loading (#25)
This generates a configured level of load into blocks written to a directory to be loaded by a node for direct block loading in testing
1 parent 052ec99 commit 78fbdc1

7 files changed

Lines changed: 274 additions & 11 deletions

File tree

config/settings.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ type Settings struct {
2424
Prewarm bool `json:"prewarm,omitempty"`
2525
RampUp bool `json:"rampUp,omitempty"`
2626
ReportPath string `json:"reportPath,omitempty"`
27+
TxsDir string `json:"txsDir,omitempty"`
28+
TargetGas uint64 `json:"targetGas,omitempty"`
29+
NumBlocksToWrite int `json:"numBlocksToWrite,omitempty"`
2730
}
2831

2932
// DefaultSettings returns the default configuration values
@@ -41,6 +44,9 @@ func DefaultSettings() Settings {
4144
Prewarm: false,
4245
RampUp: false,
4346
ReportPath: "",
47+
TxsDir: "",
48+
TargetGas: 10_000_000,
49+
NumBlocksToWrite: 100,
4450
}
4551
}
4652

@@ -60,6 +66,9 @@ func InitializeViper(cmd *cobra.Command) error {
6066
"workers": "workers",
6167
"rampUp": "ramp-up",
6268
"reportPath": "report-path",
69+
"txsDir": "txs-dir",
70+
"targetGas": "target-gas",
71+
"numBlocksToWrite": "num-blocks-to-write",
6372
}
6473

6574
for viperKey, flagName := range flagBindings {
@@ -82,6 +91,9 @@ func InitializeViper(cmd *cobra.Command) error {
8291
viper.SetDefault("workers", defaults.Workers)
8392
viper.SetDefault("rampUp", defaults.RampUp)
8493
viper.SetDefault("reportPath", defaults.ReportPath)
94+
viper.SetDefault("txsDir", defaults.TxsDir)
95+
viper.SetDefault("targetGas", defaults.TargetGas)
96+
viper.SetDefault("numBlocksToWrite", defaults.NumBlocksToWrite)
8597
return nil
8698
}
8799

@@ -120,5 +132,8 @@ func ResolveSettings() Settings {
120132
Prewarm: viper.GetBool("prewarm"),
121133
RampUp: viper.GetBool("rampUp"),
122134
ReportPath: viper.GetString("reportPath"),
135+
TxsDir: viper.GetString("txsDir"),
136+
TargetGas: viper.GetUint64("targetGas"),
137+
NumBlocksToWrite: viper.GetInt("numBlocksToWrite"),
123138
}
124139
}

config/settings_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ func TestArgumentPrecedence(t *testing.T) {
9393
cmd.Flags().Int("buffer-size", 0, "Buffer size")
9494
cmd.Flags().Bool("ramp-up", false, "Ramp up loadtest")
9595
cmd.Flags().String("report-path", "", "Report path")
96+
cmd.Flags().String("txs-dir", "", "Txs dir")
97+
cmd.Flags().Uint64("target-gas", 0, "Target gas")
98+
cmd.Flags().Int("num-blocks-to-write", 0, "Number of blocks to write")
9699

97100
// Parse CLI args
98101
if len(tt.cliArgs) > 0 {
@@ -133,6 +136,9 @@ func TestDefaultSettings(t *testing.T) {
133136
Prewarm: false,
134137
RampUp: false,
135138
ReportPath: "",
139+
TxsDir: "",
140+
TargetGas: 10_000_000,
141+
NumBlocksToWrite: 100,
136142
}
137143

138144
if defaults != expected {

main.go

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"syscall"
1313
"time"
1414

15+
"github.com/ethereum/go-ethereum/ethclient"
1516
"github.com/prometheus/client_golang/prometheus/promhttp"
1617
"github.com/spf13/cobra"
1718
"go.opentelemetry.io/otel"
@@ -65,6 +66,9 @@ func init() {
6566
rootCmd.Flags().String("metricsListenAddr", "0.0.0.0:9090", "The ip:port on which to export prometheus metrics.")
6667
rootCmd.Flags().Bool("ramp-up", false, "Ramp up loadtest")
6768
rootCmd.Flags().String("report-path", "", "Path to save the report")
69+
rootCmd.Flags().String("txs-dir", "", "Path to save the transactions")
70+
rootCmd.Flags().Uint64("target-gas", 10_000_000, "Target gas per block")
71+
rootCmd.Flags().Int("num-blocks-to-write", 100, "Number of blocks to write")
6872

6973
// Initialize Viper with proper error handling
7074
if err := config.InitializeViper(rootCmd); err != nil {
@@ -169,12 +173,6 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
169173
sharedLimiter = rate.NewLimiter(rate.Inf, 1)
170174
}
171175

172-
// Create the sender from the config struct
173-
snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers, sharedLimiter)
174-
if err != nil {
175-
return fmt.Errorf("failed to create sender: %w", err)
176-
}
177-
178176
// Create and start block collector if endpoints are available
179177
var blockCollector *stats.BlockCollector
180178
if len(cfg.Endpoints) > 0 && settings.TrackBlocks {
@@ -207,6 +205,12 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
207205
})
208206
}
209207

208+
// Create the sender from the config struct
209+
snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers, sharedLimiter)
210+
if err != nil {
211+
return fmt.Errorf("failed to create sender: %w", err)
212+
}
213+
210214
// Enable dry-run mode in sender if specified
211215
if settings.DryRun {
212216
snd.SetDryRun(true)
@@ -225,7 +229,25 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
225229
snd.SetStatsCollector(collector, logger)
226230

227231
// Create dispatcher
228-
dispatcher := sender.NewDispatcher(gen, snd)
232+
var dispatcher *sender.Dispatcher
233+
if settings.TxsDir != "" {
234+
// get latest height
235+
ethclient, err := ethclient.Dial(cfg.Endpoints[0])
236+
if err != nil {
237+
return fmt.Errorf("failed to create ethclient: %w", err)
238+
}
239+
latestHeight, err := ethclient.BlockNumber(ctx)
240+
if err != nil {
241+
return fmt.Errorf("failed to get latest height: %w", err)
242+
}
243+
numBlocksToWrite := settings.NumBlocksToWrite
244+
writerHeight := latestHeight + 10 // some buffer
245+
log.Printf("🔍 Latest height: %d, writer start height: %d", latestHeight, writerHeight)
246+
writer := sender.NewTxsWriter(settings.TargetGas, settings.TxsDir, writerHeight, uint64(numBlocksToWrite))
247+
dispatcher = sender.NewDispatcher(gen, writer)
248+
} else {
249+
dispatcher = sender.NewDispatcher(gen, snd)
250+
}
229251

230252
// Set statistics collector for dispatcher
231253
dispatcher.SetStatsCollector(collector)
@@ -239,10 +261,11 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
239261
log.Printf("📝 Prewarm mode: Accounts will be prewarmed")
240262
}
241263

242-
// Start the sender (starts all workers)
243-
s.SpawnBgNamed("sender", func() error { return snd.Run(ctx) })
244-
log.Printf("✅ Connected to %d endpoints", snd.GetNumShards())
245-
264+
if settings.TxsDir == "" {
265+
// Start the sender (starts all workers)
266+
s.SpawnBgNamed("sender", func() error { return snd.Run(ctx) })
267+
log.Printf("✅ Connected to %d endpoints", snd.GetNumShards())
268+
}
246269
// Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions)
247270
if settings.Prewarm {
248271
if err := dispatcher.Prewarm(ctx); err != nil {

profiles/conflict.json

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"chainId": 713714,
3+
"seiChainId": "sei-chain",
4+
"endpoints": [
5+
"http://127.0.0.1:8545"
6+
],
7+
"accounts": {
8+
"count": 5000,
9+
"newAccountRate": 0.0
10+
},
11+
"scenarios": [
12+
{
13+
"name": "ERC20Conflict",
14+
"weight": 1
15+
}
16+
],
17+
"settings": {
18+
"workers": 1,
19+
"tps": 0,
20+
"statsInterval": "5s",
21+
"bufferSize": 1000,
22+
"dryRun": false,
23+
"debug": false,
24+
"trackReceipts": false,
25+
"trackBlocks": false,
26+
"trackUserLatency": false,
27+
"prewarm": false,
28+
"rampUp": false
29+
}
30+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{
2+
"chainId": 713714,
3+
"seiChainId": "sei-chain",
4+
"endpoints": [
5+
"http://127.0.0.1:8545"
6+
],
7+
"accounts": {
8+
"count": 5000,
9+
"newAccountRate": 0.0
10+
},
11+
"scenarios": [
12+
{
13+
"name": "EVMTransfer",
14+
"weight": 1
15+
}
16+
],
17+
"settings": {
18+
"workers": 1,
19+
"tps": 0,
20+
"statsInterval": "5s",
21+
"bufferSize": 1000,
22+
"dryRun": false,
23+
"debug": false,
24+
"trackReceipts": false,
25+
"trackBlocks": false,
26+
"trackUserLatency": false,
27+
"prewarm": false,
28+
"rampUp": false,
29+
"txsDir": "/root/load_txs",
30+
"targetGas": 30000000,
31+
"numBlocksToWrite": 1000
32+
}
33+
}

sender/writer.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package sender
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"log"
8+
"os"
9+
"path/filepath"
10+
11+
"github.com/sei-protocol/sei-load/types"
12+
)
13+
14+
// implements `Send`
15+
16+
type TxsWriter struct {
17+
gasPerBlock uint64
18+
nextHeight uint64
19+
txsDir string
20+
blocksGenerated uint64
21+
numBlocks uint64
22+
23+
bufferGas uint64
24+
txBuffer []*types.LoadTx
25+
}
26+
27+
func NewTxsWriter(gasPerBlock uint64, txsDir string, startHeight uint64, numBlocks uint64) *TxsWriter {
28+
// what height to start at?
29+
return &TxsWriter{
30+
gasPerBlock: gasPerBlock,
31+
nextHeight: startHeight,
32+
txsDir: txsDir,
33+
blocksGenerated: 0,
34+
numBlocks: numBlocks,
35+
36+
bufferGas: 0,
37+
txBuffer: make([]*types.LoadTx, 0),
38+
}
39+
}
40+
41+
// Send writes the transaction to the writer
42+
func (w *TxsWriter) Send(ctx context.Context, tx *types.LoadTx) error {
43+
// if bwe would exceed gasPerBlock, flush
44+
if w.bufferGas+tx.EthTx.Gas() > w.gasPerBlock {
45+
if err := w.Flush(); err != nil {
46+
return err
47+
}
48+
}
49+
50+
// add to buffer
51+
w.txBuffer = append(w.txBuffer, tx)
52+
w.bufferGas += tx.EthTx.Gas()
53+
return nil
54+
}
55+
56+
type TxWriteData struct {
57+
TxPayloads [][]byte `json:"tx_payloads"`
58+
}
59+
60+
func (w *TxsWriter) Flush() error {
61+
defer func() {
62+
// clear buffer and reset bufferGas and increment nextHeight
63+
w.txBuffer = make([]*types.LoadTx, 0)
64+
w.bufferGas = 0
65+
w.nextHeight++
66+
w.blocksGenerated++
67+
}()
68+
// write to dir `~/load_txs`
69+
// make dir if it doesn't exist
70+
err := os.MkdirAll(w.txsDir, 0755)
71+
if err != nil {
72+
return err
73+
}
74+
txsFile := filepath.Join(w.txsDir, fmt.Sprintf("%d_txs.json", w.nextHeight))
75+
txData := TxWriteData{
76+
TxPayloads: make([][]byte, 0),
77+
}
78+
for _, tx := range w.txBuffer {
79+
txData.TxPayloads = append(txData.TxPayloads, tx.Payload)
80+
}
81+
82+
txDataJSON, err := json.Marshal(txData)
83+
if err != nil {
84+
return err
85+
}
86+
87+
if err := os.WriteFile(txsFile, txDataJSON, 0644); err != nil {
88+
return err
89+
}
90+
91+
log.Printf("Flushed %d transactions to %s", len(w.txBuffer), txsFile)
92+
93+
if w.blocksGenerated >= w.numBlocks {
94+
return fmt.Errorf("reached max number of blocks: %d", w.numBlocks)
95+
}
96+
97+
return nil
98+
}

sender/writer_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package sender
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/sei-protocol/sei-load/config"
8+
"github.com/sei-protocol/sei-load/generator"
9+
"github.com/sei-protocol/sei-load/generator/scenarios"
10+
"github.com/sei-protocol/sei-load/types"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestTxsWriter_Flush(t *testing.T) {
15+
// two evm transfer txs
16+
writer := NewTxsWriter(42000, "/tmp", 1, 1)
17+
18+
loadConfig := &config.LoadConfig{
19+
ChainID: 7777,
20+
}
21+
22+
sharedAccounts := types.NewAccountPool(&types.AccountConfig{
23+
Accounts: types.GenerateAccounts(10),
24+
NewAccountRate: 0.0,
25+
})
26+
27+
evmScenario := scenarios.CreateScenario(config.Scenario{
28+
Name: "EVMTransfer",
29+
Weight: 1,
30+
})
31+
evmScenario.Deploy(loadConfig, sharedAccounts.NextAccount())
32+
33+
generator := generator.NewScenarioGenerator(sharedAccounts, evmScenario)
34+
35+
txs := generator.GenerateN(3)
36+
37+
err := writer.Send(context.Background(), txs[0])
38+
require.NoError(t, err)
39+
require.Equal(t, uint64(1), writer.nextHeight)
40+
require.Equal(t, uint64(21000), writer.bufferGas)
41+
require.Len(t, writer.txBuffer, 1)
42+
require.Equal(t, txs[0], writer.txBuffer[0])
43+
44+
err = writer.Send(context.Background(), txs[1])
45+
require.NoError(t, err)
46+
require.Equal(t, uint64(1), writer.nextHeight)
47+
require.Equal(t, uint64(42000), writer.bufferGas)
48+
require.Len(t, writer.txBuffer, 2)
49+
require.Equal(t, txs[1], writer.txBuffer[1])
50+
51+
err = writer.Send(context.Background(), txs[2])
52+
require.NoError(t, err)
53+
// now should be flushed and have the new tx
54+
require.Equal(t, uint64(2), writer.nextHeight)
55+
require.Equal(t, uint64(21000), writer.bufferGas)
56+
require.Len(t, writer.txBuffer, 1)
57+
58+
}

0 commit comments

Comments
 (0)