Skip to content

Commit b7d2319

Browse files
Add global limiter (#9)
1 parent 72d85ab commit b7d2319

4 files changed

Lines changed: 34 additions & 50 deletions

File tree

main.go

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313

1414
"github.com/spf13/cobra"
15+
"golang.org/x/time/rate"
1516

1617
"github.com/sei-protocol/sei-load/config"
1718
"github.com/sei-protocol/sei-load/generator"
@@ -25,20 +26,6 @@ var (
2526
configFile string
2627
)
2728

28-
// ResolvedSettings holds the final resolved settings after applying precedence
29-
type ResolvedSettings struct {
30-
Workers int
31-
TPS float64
32-
StatsInterval time.Duration
33-
BufferSize int
34-
DryRun bool
35-
Debug bool
36-
TrackReceipts bool
37-
TrackBlocks bool
38-
TrackUserLatency bool
39-
Prewarm bool
40-
}
41-
4229
var rootCmd = &cobra.Command{
4330
Use: "seiload",
4431
Short: "Sei Chain Load Test v2",
@@ -149,8 +136,20 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
149136
return fmt.Errorf("failed to create generator: %w", err)
150137
}
151138

139+
// Create shared rate limiter for all workers if TPS is specified
140+
var sharedLimiter *rate.Limiter
141+
if settings.TPS > 0 {
142+
// Convert TPS to interval: 1/tps seconds = (1/tps) * 1e9 nanoseconds
143+
intervalNs := int64((1.0 / settings.TPS) * 1e9)
144+
sharedLimiter = rate.NewLimiter(rate.Every(time.Duration(intervalNs)), 1)
145+
log.Printf("📈 Rate limiting enabled: %.2f TPS shared across all workers", settings.TPS)
146+
} else {
147+
// No rate limiting
148+
sharedLimiter = rate.NewLimiter(rate.Inf, 1)
149+
}
150+
152151
// Create the sender from the config struct
153-
snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers)
152+
snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers, sharedLimiter)
154153
if err != nil {
155154
return fmt.Errorf("failed to create sender: %w", err)
156155
}
@@ -192,11 +191,6 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
192191

193192
// Create dispatcher
194193
dispatcher := sender.NewDispatcher(gen, snd)
195-
if settings.TPS > 0 {
196-
// Convert TPS to interval: 1/tps seconds = (1/tps) * 1e9 nanoseconds
197-
intervalNs := int64((1.0 / settings.TPS) * 1e9)
198-
dispatcher.SetRateLimit(time.Duration(intervalNs))
199-
}
200194

201195
// Set statistics collector for dispatcher
202196
dispatcher.SetStatsCollector(collector)

sender/dispatcher.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@ package sender
33
import (
44
"context"
55
"fmt"
6-
"golang.org/x/time/rate"
76
"log"
87
"sync"
9-
"time"
108

119
"github.com/sei-protocol/sei-load/generator"
1210
"github.com/sei-protocol/sei-load/stats"
@@ -19,9 +17,6 @@ type Dispatcher struct {
1917
prewarmGen utils.Option[generator.Generator] // Optional prewarm generator
2018
sender TxSender
2119

22-
// Configuration
23-
limiter *rate.Limiter
24-
2520
// Statistics
2621
totalSent uint64
2722
mu sync.RWMutex
@@ -33,17 +28,9 @@ func NewDispatcher(gen generator.Generator, sender TxSender) *Dispatcher {
3328
return &Dispatcher{
3429
generator: gen,
3530
sender: sender,
36-
limiter: rate.NewLimiter(rate.Inf, 1), // No rate limiting by default
3731
}
3832
}
3933

40-
// SetRateLimit sets the minimum time between transaction generations
41-
func (d *Dispatcher) SetRateLimit(duration time.Duration) {
42-
d.mu.Lock()
43-
defer d.mu.Unlock()
44-
d.limiter = rate.NewLimiter(rate.Every(duration), 1)
45-
}
46-
4734
// SetStatsCollector sets the statistics collector for this dispatcher
4835
func (d *Dispatcher) SetStatsCollector(collector *stats.Collector) {
4936
d.mu.Lock()
@@ -100,14 +87,7 @@ func (d *Dispatcher) Prewarm(ctx context.Context) error {
10087

10188
// Start begins the dispatcher's transaction generation and sending loop
10289
func (d *Dispatcher) Run(ctx context.Context) error {
103-
d.mu.RLock()
104-
limiter := d.limiter
105-
d.mu.RUnlock()
106-
10790
for {
108-
if err := limiter.Wait(ctx); err != nil {
109-
return err
110-
}
11191
// Generate a transaction from main generator
11292
tx, ok := d.generator.Generate()
11393
if !ok {
@@ -130,13 +110,7 @@ func (d *Dispatcher) RunBatch(ctx context.Context, count int) error {
130110
if count <= 0 {
131111
return fmt.Errorf("count must be positive")
132112
}
133-
d.mu.RLock()
134-
limiter := d.limiter
135-
d.mu.RUnlock()
136113
for i := range count {
137-
if err := limiter.Wait(ctx); err != nil {
138-
return err
139-
}
140114
// Generate a transaction
141115
tx, ok := d.generator.Generate()
142116
if !ok {

sender/sharded_sender.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"fmt"
66
"sync"
77

8+
"golang.org/x/time/rate"
9+
810
"github.com/sei-protocol/sei-load/config"
911
"github.com/sei-protocol/sei-load/stats"
1012
"github.com/sei-protocol/sei-load/types"
@@ -21,23 +23,25 @@ type ShardedSender struct {
2123
mu sync.RWMutex
2224
collector *stats.Collector
2325
logger *stats.Logger
26+
limiter *rate.Limiter // Shared rate limiter for all workers
2427
}
2528

2629
// NewShardedSender creates a new sharded sender with workers for each endpoint
27-
func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int) (*ShardedSender, error) {
30+
func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int, limiter *rate.Limiter) (*ShardedSender, error) {
2831
if len(cfg.Endpoints) == 0 {
2932
return nil, fmt.Errorf("no endpoints configured")
3033
}
3134

3235
workerList := make([]*Worker, len(cfg.Endpoints))
3336
for i, endpoint := range cfg.Endpoints {
34-
workerList[i] = NewWorker(i, endpoint, bufferSize, workers)
37+
workerList[i] = NewWorker(i, endpoint, bufferSize, workers, limiter)
3538
}
3639

3740
return &ShardedSender{
3841
workers: workerList,
3942
numShards: len(cfg.Endpoints),
4043
bufferSize: bufferSize,
44+
limiter: limiter,
4145
}, nil
4246
}
4347

sender/worker.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7-
"github.com/ethereum/go-ethereum/ethclient"
87
"io"
98
"log"
109
"net"
1110
"net/http"
1211
"time"
1312

13+
"github.com/ethereum/go-ethereum/ethclient"
14+
"golang.org/x/time/rate"
15+
1416
"github.com/sei-protocol/sei-load/stats"
1517
"github.com/sei-protocol/sei-load/types"
1618
"github.com/sei-protocol/sei-load/utils"
@@ -29,6 +31,7 @@ type Worker struct {
2931
logger *stats.Logger
3032
workers int
3133
trackReceipts bool
34+
limiter *rate.Limiter // Shared rate limiter for transaction sending
3235
}
3336

3437
func newHttpClient() *http.Client {
@@ -50,14 +53,15 @@ func newHttpClient() *http.Client {
5053
}
5154

5255
// NewWorker creates a new worker for a specific endpoint
53-
func NewWorker(id int, endpoint string, bufferSize int, workers int) *Worker {
56+
func NewWorker(id int, endpoint string, bufferSize int, workers int, limiter *rate.Limiter) *Worker {
5457
return &Worker{
5558
id: id,
5659
endpoint: endpoint,
5760
txChan: make(chan *types.LoadTx, bufferSize),
5861
sentTxs: make(chan *types.LoadTx, bufferSize),
5962
workers: workers,
6063
trackReceipts: false,
64+
limiter: limiter,
6165
}
6266
}
6367

@@ -152,6 +156,14 @@ func (w *Worker) processTransactions(ctx context.Context, client *http.Client) e
152156
if err != nil {
153157
return err
154158
}
159+
160+
// Apply rate limiting before sending the transaction
161+
if w.limiter != nil {
162+
if err := w.limiter.Wait(ctx); err != nil {
163+
return err
164+
}
165+
}
166+
155167
startTime := time.Now()
156168
err = w.sendTransaction(ctx, client, tx)
157169
// Record statistics if collector is available

0 commit comments

Comments
 (0)