Skip to content

Commit 6f144d0

Browse files
authored
Add ramping loadtests to test cluster (#15)
This adds a ramping loadtest that can be used to test progressive levels of load on a loadtest cluster to identify the point at which we experience failure to meet SLO (which is p90 block times of 1s in this case). This can be run with any scenario as specified, and we can configure later CI to run loadtests for varying scenarios to evaluate TPS under different load profiles.
1 parent 7a15860 commit 6f144d0

11 files changed

Lines changed: 756 additions & 15 deletions

config/settings.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type Settings struct {
2020
TrackBlocks bool `json:"trackBlocks"`
2121
TrackUserLatency bool `json:"trackUserLatency"`
2222
Prewarm bool `json:"prewarm"`
23+
RampUp bool `json:"rampUp"`
2324
}
2425

2526
// DefaultSettings returns the default configuration values
@@ -35,6 +36,7 @@ func DefaultSettings() Settings {
3536
TrackBlocks: false,
3637
TrackUserLatency: false,
3738
Prewarm: false,
39+
RampUp: false,
3840
}
3941
}
4042

@@ -52,6 +54,7 @@ func InitializeViper(cmd *cobra.Command) error {
5254
"prewarm": "prewarm",
5355
"trackUserLatency": "track-user-latency",
5456
"workers": "workers",
57+
"rampUp": "ramp-up",
5558
}
5659

5760
for viperKey, flagName := range flagBindings {
@@ -72,7 +75,7 @@ func InitializeViper(cmd *cobra.Command) error {
7275
viper.SetDefault("prewarm", defaults.Prewarm)
7376
viper.SetDefault("trackUserLatency", defaults.TrackUserLatency)
7477
viper.SetDefault("workers", defaults.Workers)
75-
78+
viper.SetDefault("rampUp", defaults.RampUp)
7679
return nil
7780
}
7881

@@ -103,5 +106,6 @@ func ResolveSettings() Settings {
103106
TrackBlocks: viper.GetBool("trackBlocks"),
104107
TrackUserLatency: viper.GetBool("trackUserLatency"),
105108
Prewarm: viper.GetBool("prewarm"),
109+
RampUp: viper.GetBool("rampUp"),
106110
}
107111
}

config/settings_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ func TestArgumentPrecedence(t *testing.T) {
9090
cmd.Flags().Bool("prewarm", false, "Prewarm")
9191
cmd.Flags().Bool("track-user-latency", false, "Track user latency")
9292
cmd.Flags().Int("buffer-size", 0, "Buffer size")
93+
cmd.Flags().Bool("ramp-up", false, "Ramp up loadtest")
9394

9495
// Parse CLI args
9596
if len(tt.cliArgs) > 0 {
@@ -128,6 +129,7 @@ func TestDefaultSettings(t *testing.T) {
128129
TrackBlocks: false,
129130
TrackUserLatency: false,
130131
Prewarm: false,
132+
RampUp: false,
131133
}
132134

133135
if defaults != expected {

main.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ var rootCmd = &cobra.Command{
3636
Short: "Sei Chain Load Test v2",
3737
Long: `A load test generator for Sei Chain.
3838
39-
Supports both contract and non-contract scenarios with factory
40-
and weighted scenario selection mechanisms. Features sharded sending
39+
Supports both contract and non-contract scenarios with factory
40+
and weighted scenario selection mechanisms. Features sharded sending
4141
to multiple endpoints with account pooling management.
4242
43-
Use --dry-run to test configuration and view transaction details
43+
Use --dry-run to test configuration and view transaction details
4444
without actually sending requests or deploying contracts.`,
4545
Run: func(cmd *cobra.Command, args []string) {
4646
if err := runLoadTest(context.Background(), cmd, args); err != nil {
@@ -63,6 +63,7 @@ func init() {
6363
rootCmd.Flags().IntP("workers", "w", 0, "Number of workers")
6464
rootCmd.Flags().IntP("nodes", "n", 0, "Number of nodes/endpoints to use (0 = use all)")
6565
rootCmd.Flags().String("metricsListenAddr", "0.0.0.0:9090", "The ip:port on which to export prometheus metrics.")
66+
rootCmd.Flags().Bool("ramp-up", false, "Ramp up loadtest")
6667

6768
// Initialize Viper with proper error handling
6869
if err := config.InitializeViper(rootCmd); err != nil {
@@ -148,6 +149,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
148149
// Create statistics collector and logger
149150
collector := stats.NewCollector()
150151
logger := stats.NewLogger(collector, settings.StatsInterval, settings.Debug)
152+
var ramper *sender.Ramper
151153

152154
err = service.Run(ctx, func(ctx context.Context, s service.Scope) error {
153155
// Create the generator from the config struct
@@ -159,9 +161,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
159161
// Create shared rate limiter for all workers if TPS is specified
160162
var sharedLimiter *rate.Limiter
161163
if settings.TPS > 0 {
162-
// Convert TPS to interval: 1/tps seconds = (1/tps) * 1e9 nanoseconds
163-
intervalNs := int64((1.0 / settings.TPS) * 1e9)
164-
sharedLimiter = rate.NewLimiter(rate.Every(time.Duration(intervalNs)), 1)
164+
sharedLimiter = rate.NewLimiter(rate.Limit(settings.TPS), 1)
165165
log.Printf("📈 Rate limiting enabled: %.2f TPS shared across all workers", settings.TPS)
166166
} else {
167167
// No rate limiting
@@ -184,6 +184,20 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
184184
})
185185
}
186186

187+
if settings.RampUp {
188+
ramperBlockCollector := stats.NewBlockCollector(cfg.SeiChainID)
189+
s.SpawnBgNamed("ramper block collector", func() error {
190+
return ramperBlockCollector.Run(ctx, cfg.Endpoints[0])
191+
})
192+
193+
ramper = sender.NewRamper(
194+
sender.NewRampCurveStep(100, 100, 120*time.Second, 30*time.Second),
195+
ramperBlockCollector,
196+
sharedLimiter,
197+
)
198+
s.SpawnBgNamed("ramper", func() error { return ramper.Run(ctx) })
199+
}
200+
187201
// Create and start user latency tracker if endpoints are available
188202
if len(cfg.Endpoints) > 0 && settings.TrackUserLatency {
189203
userLatencyTracker := stats.NewUserLatencyTracker(settings.StatsInterval)
@@ -274,6 +288,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
274288
})
275289
// Print final statistics
276290
logger.LogFinalStats()
291+
if settings.RampUp && ramper != nil {
292+
ramper.LogFinalStats()
293+
}
277294
log.Printf("👋 Shutdown complete")
278295
return err
279296
}

profiles/evm_transfer.json

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
{
2+
"chainId": 713714,
3+
"seiChainId": "sei-chain",
4+
"endpoints": [
5+
"http://127.0.0.1:8545"
6+
],
7+
"accounts": {
8+
"count": 5000,
9+
"newAccountRate": 0.0
10+
},
11+
"scenarios": [
12+
{
13+
"name": "EVMTransfer",
14+
"weight": 1
15+
}
16+
],
17+
"workers": 1,
18+
"tps": 0,
19+
"statsInterval": "5s",
20+
"bufferSize": 1000,
21+
"dryRun": false,
22+
"debug": false,
23+
"trackReceipts": false,
24+
"trackBlocks": false,
25+
"trackUserLatency": false,
26+
"prewarm": false
27+
}

profiles/local_docker.json

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"chainId": 713714,
3+
"seiChainId": "local-docker",
4+
"endpoints": [
5+
"http://127.0.0.1:8545",
6+
"http://127.0.0.1:8547",
7+
"http://127.0.0.1:8549",
8+
"http://127.0.0.1:8551"
9+
],
10+
"accounts": {
11+
"count": 5000,
12+
"newAccountRate": 0.0
13+
},
14+
"scenarios": [
15+
{
16+
"name": "ERC20",
17+
"weight": 1
18+
}
19+
],
20+
"workers": 1,
21+
"tps": 0,
22+
"statsInterval": "5s",
23+
"bufferSize": 1000,
24+
"dryRun": false,
25+
"debug": false,
26+
"trackReceipts": false,
27+
"trackBlocks": false,
28+
"trackUserLatency": false,
29+
"prewarm": false
30+
}

sender/ramper.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
package sender
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"time"
9+
10+
"github.com/sei-protocol/sei-load/stats"
11+
"github.com/sei-protocol/sei-load/utils/service"
12+
"golang.org/x/time/rate"
13+
)
14+
15+
// This will manage the ramping process for the loadtest
16+
// Ramping loadtest will begin at the StartTps and spend LoadTime at each step, ending when we violate the chain SLO of
17+
// 1 block per second over a given ramp period (as measured in the back half of the ramp time)
18+
// If we successfully pass a given TPS, we will pause for PauseTime, and then start the next step.
19+
// If we fail to pass a given TPS, we will stop the loadtest.
20+
21+
// ErrRampTestFailedSLO is returned by Ramper.Run when the chain violates the
// block-time SLO at the current ramp step. Compare with errors.Is.
var ErrRampTestFailedSLO = errors.New("ramp test failed SLO")
22+
23+
// FormatRampStats renders a human-readable summary combining the ramp curve's
// progress (highest passed TPS) with the block stats of the last completed
// SLO window.
// NOTE(review): lastWindowStats is written by the WatchSLO goroutine without
// synchronization; confirm this is only called after the ramp has stopped.
func (r *Ramper) FormatRampStats() string {
	return fmt.Sprintf(`
─────────────────────────────────────────
RAMP STATISTICS
─────────────────────────────────────────
Ramp Curve Stats:
%s
─────────────────────────────────────────
Window Block Stats:
%s
─────────────────────────────────────────`,
		r.rampCurve.GetCurveStats(), r.lastWindowStats.FormatBlockStats())
}
36+
37+
// Ramper owns the ramping loadtest loop: it adjusts the shared rate limiter
// along a RampCurve while a background watcher checks the chain's block-time SLO.
type Ramper struct {
	sharedLimiter   *rate.Limiter            // limiter shared with the send workers; ramping changes its limit
	blockCollector  stats.BlockStatsProvider // source of windowed block-time stats for SLO checks
	currentTps      float64                  // most recent TPS target applied to the limiter
	startTime       time.Time                // time origin for the ramp curve; set when Run starts
	rampCurve       RampCurve                // schedule mapping elapsed time -> target TPS
	lastWindowStats stats.BlockStats         // stats of the last completed SLO window (written by the WatchSLO goroutine)
}
45+
46+
// RampCurve maps elapsed time in the ramp period to a target TPS.
// Implementations may be stateful (see RampCurveStep).
type RampCurve interface {
	// GetTPS returns the target TPS at elapsed time t since the ramp began.
	GetTPS(t time.Duration) float64
	// GetCurveStats returns a human-readable summary of ramp progress.
	GetCurveStats() string
}
51+
52+
func NewRamper(rampCurve RampCurve, blockCollector stats.BlockStatsProvider, sharedLimiter *rate.Limiter) *Ramper {
53+
sharedLimiter.SetLimit(rate.Limit(1)) // reset limiter to 1
54+
return &Ramper{
55+
sharedLimiter: sharedLimiter,
56+
blockCollector: blockCollector,
57+
rampCurve: rampCurve,
58+
}
59+
}
60+
61+
func (r *Ramper) UpdateTPS() {
62+
timeSinceStart := time.Since(r.startTime)
63+
r.currentTps = r.rampCurve.GetTPS(timeSinceStart)
64+
r.sharedLimiter.SetLimit(rate.Limit(r.currentTps))
65+
}
66+
67+
func (r *Ramper) LogFinalStats() {
68+
log.Printf("Final Ramp stats: \n%s", r.FormatRampStats())
69+
}
70+
71+
// WatchSLO starts a background goroutine that evaluates the chain SLO every
// 100ms over rolling 30-second windows. The returned channel receives one
// value (then closes) when the SLO is violated; it also closes on context
// cancellation, so receivers should treat a bare close as "no violation".
func (r *Ramper) WatchSLO(ctx context.Context) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		// Closing ch signals the watcher has exited, whether or not a
		// violation was sent first.
		defer close(ch)

		log.Println("🔍 Ramping watching chain SLO with 30s windows, checking every 100ms")

		// Two separate timers: frequent SLO checks and window resets
		sloCheckTicker := time.NewTicker(100 * time.Millisecond)
		windowResetTicker := time.NewTicker(30 * time.Second)
		defer sloCheckTicker.Stop()
		defer windowResetTicker.Stop()

		// Reset window stats at the start so measurements exclude any
		// blocks observed before the ramp began.
		r.blockCollector.ResetWindowStats()

		for {
			select {
			case <-ctx.Done():
				return
			case <-sloCheckTicker.C:
				// Check SLO every 100ms: p90 block time within the current
				// window must stay at or under 1s.
				p90BlockTime := r.blockCollector.GetWindowBlockTimePercentile(90)
				if p90BlockTime > 1*time.Second {
					log.Printf("❌ SLO violated: 90th percentile block time %v exceeds 1s threshold", p90BlockTime)
					// Send the violation, but never block forever: bail out
					// if the context is cancelled before anyone receives.
					select {
					case ch <- struct{}{}:
					case <-ctx.Done():
					}
					return
				}
			case <-windowResetTicker.C:
				// Reset window stats every 30 seconds for fresh measurements
				log.Printf("🔄 Resetting SLO window stats (30s period)")
				// save last window stats so final reporting can show the most
				// recent completed window
				// NOTE(review): this write races with FormatRampStats reads
				// if they happen concurrently — confirm call ordering.
				r.lastWindowStats = r.blockCollector.GetWindowBlockStats()
				r.blockCollector.ResetWindowStats()
			}
		}
	}()
	return ch
}
114+
115+
// Start initializes and starts all workers
116+
func (r *Ramper) Run(ctx context.Context) error {
117+
return service.Run(ctx, func(ctx context.Context, s service.Scope) error {
118+
// TODO: Implement ramping logic
119+
r.startTime = time.Now()
120+
sloChan := r.WatchSLO(ctx)
121+
tpsUpdateTicker := time.NewTicker(100 * time.Millisecond)
122+
for ctx.Err() == nil {
123+
124+
select {
125+
case <-sloChan:
126+
r.sharedLimiter.SetLimit(rate.Limit(1))
127+
log.Printf("❌ Ramping failed to pass SLO, stopping loadtest, failure window blockstats:")
128+
log.Println(r.blockCollector.GetWindowBlockStats().FormatBlockStats())
129+
return ErrRampTestFailedSLO
130+
case <-tpsUpdateTicker.C:
131+
r.UpdateTPS()
132+
case <-ctx.Done():
133+
return ctx.Err()
134+
}
135+
}
136+
return ctx.Err()
137+
})
138+
}
139+
140+
type RampCurveStep struct {
141+
StartTps float64
142+
IncrementTps float64
143+
LoadInterval time.Duration
144+
RecoveryInterval time.Duration
145+
Step int
146+
CurrentTPS float64
147+
}
148+
149+
func NewRampCurveStep(startTps float64, incrementTps float64, loadInterval time.Duration, recoveryInterval time.Duration) *RampCurveStep {
150+
return &RampCurveStep{
151+
StartTps: startTps,
152+
IncrementTps: incrementTps,
153+
LoadInterval: loadInterval,
154+
RecoveryInterval: recoveryInterval,
155+
Step: 0,
156+
CurrentTPS: startTps,
157+
}
158+
}
159+
160+
func (r *RampCurveStep) GetStartTps() float64 {
161+
return r.StartTps
162+
}
163+
164+
func (r *RampCurveStep) GetIncrementTps() float64 {
165+
return r.IncrementTps
166+
}
167+
168+
func (r *RampCurveStep) GetTPS(t time.Duration) float64 {
169+
// figure out where we are in the load interval
170+
cycleInterval := r.LoadInterval + r.RecoveryInterval
171+
cycleProgress := t % cycleInterval
172+
173+
// if we're in the recovery interval, return 1.00 (close to 0 but doesn't fully block the limiter)
174+
if cycleProgress > r.LoadInterval {
175+
return 1.00
176+
}
177+
178+
cycleNumber := int(t / cycleInterval)
179+
180+
// this means we're in a new step, so we need to update step and TPS
181+
if cycleNumber > r.Step {
182+
r.Step = cycleNumber
183+
newTps := r.StartTps + r.IncrementTps*float64(r.Step)
184+
log.Printf("📈 Ramping to %f TPS for %v", newTps, r.LoadInterval)
185+
r.CurrentTPS = newTps
186+
return newTps
187+
}
188+
189+
return r.CurrentTPS
190+
}
191+
192+
// this should return the highest target TPS that is PRIOR to the current step
193+
func (r *RampCurveStep) GetCurveStats() string {
194+
step := r.Step - 1
195+
if step < 0 {
196+
return "no ramp curve stats available"
197+
}
198+
return fmt.Sprintf("Highest Passed TPS: %.2f", r.StartTps+r.IncrementTps*float64(step))
199+
}

0 commit comments

Comments
 (0)