Skip to content

Commit 8be5813

Browse files
committed
fix
1 parent 6d90159 commit 8be5813

4 files changed

Lines changed: 8 additions & 12 deletions

File tree

main.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -160,17 +160,14 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
160160
}
161161

162162
// Create shared rate limiter for all workers if TPS is specified
163-
var sharedLimiter *rate.Limiter
163+
sharedLimiter := rate.NewLimiter(rate.Inf, 1) // no limit by default
164164
if settings.TPS > 0 {
165165
sharedLimiter = rate.NewLimiter(rate.Limit(settings.TPS), 1)
166166
log.Printf("📈 Rate limiting enabled: %.2f TPS shared across all workers", settings.TPS)
167-
} else {
168-
// No rate limiting
169-
sharedLimiter = rate.NewLimiter(rate.Inf, 1)
170167
}
171168

172169
// Create the sender from the config struct
173-
snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers, sharedLimiter)
170+
snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers)
174171
if err != nil {
175172
return fmt.Errorf("failed to create sender: %w", err)
176173
}
@@ -240,7 +237,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
240237
}
241238

242239
// Start the sender (starts all workers)
243-
s.SpawnBgNamed("sender", func() error { return snd.Run(ctx) })
240+
s.SpawnBgNamed("sender", func() error { return snd.Run(ctx, sharedLimiter) })
244241
log.Printf("✅ Connected to %d endpoints", snd.GetNumShards())
245242

246243
// Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions)

profiles/profiles_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func TestProfilesAlignment(t *testing.T) {
8080
// Use a decoder with DisallowUnknownFields to catch any extra fields
8181
decoder := json.NewDecoder(strings.NewReader(string(data)))
8282
decoder.DisallowUnknownFields()
83-
83+
8484
var strictConfig config.LoadConfig
8585
if err := decoder.Decode(&strictConfig); err != nil {
8686
t.Errorf("Profile %s contains unexpected/unaligned fields: %v", file.Name(), err)

sender/sharded_sender.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type ShardedSender struct {
2626
}
2727

2828
// NewShardedSender creates a new sharded sender with workers for each endpoint
29-
func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int, limiter *rate.Limiter) (*ShardedSender, error) {
29+
func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int) (*ShardedSender, error) {
3030
if len(cfg.Endpoints) == 0 {
3131
return nil, fmt.Errorf("no endpoints configured")
3232
}
@@ -40,7 +40,6 @@ func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int, limit
4040
workers: workerList,
4141
numShards: len(cfg.Endpoints),
4242
bufferSize: bufferSize,
43-
limiter: limiter,
4443
}, nil
4544
}
4645

@@ -51,7 +50,7 @@ func (s *ShardedSender) Run(ctx context.Context, limiter *rate.Limiter) error {
5150
s.mu.Unlock()
5251
return service.Run(ctx, func(ctx context.Context, s service.Scope) error {
5352
for _, worker := range workers {
54-
s.Spawn(func() error { return worker.Run(ctx,limiter) })
53+
s.Spawn(func() error { return worker.Run(ctx, limiter) })
5554
}
5655
return nil
5756
})

sender/worker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (w *Worker) Run(ctx context.Context, limiter *rate.Limiter) error {
6060
return service.Run(ctx, func(ctx context.Context, s service.Scope) error {
6161
// Start multiple worker goroutines that share the same channel
6262
for range w.workers {
63-
s.Spawn(func() error { return w.processTransactions(ctx,limiter) })
63+
s.Spawn(func() error { return w.processTransactions(ctx, limiter) })
6464
}
6565
return w.watchTransactions(ctx)
6666
})
@@ -159,7 +159,7 @@ func (w *Worker) processTransactions(ctx context.Context, limiter *rate.Limiter)
159159

160160
for ctx.Err() == nil {
161161
// Apply rate limiting before getting the next transaction
162-
if err := limiter.Wait(ctx); err!=nil {
162+
if err := limiter.Wait(ctx); err != nil {
163163
return err
164164
}
165165
tx, err := utils.Recv(ctx, w.txChan)

0 commit comments

Comments
 (0)