Skip to content

Commit a840c77

Browse files
committed
revert
1 parent 8be5813 commit a840c77

4 files changed

Lines changed: 86 additions & 35 deletions

File tree

main.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,14 +160,17 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
160160
}
161161

162162
// Create shared rate limiter for all workers if TPS is specified
163-
sharedLimiter := rate.NewLimiter(rate.Inf, 1) // no limit by default
163+
var sharedLimiter *rate.Limiter
164164
if settings.TPS > 0 {
165165
sharedLimiter = rate.NewLimiter(rate.Limit(settings.TPS), 1)
166166
log.Printf("📈 Rate limiting enabled: %.2f TPS shared across all workers", settings.TPS)
167+
} else {
168+
// No rate limiting
169+
sharedLimiter = rate.NewLimiter(rate.Inf, 1)
167170
}
168171

169172
// Create the sender from the config struct
170-
snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers)
173+
snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers, sharedLimiter)
171174
if err != nil {
172175
return fmt.Errorf("failed to create sender: %w", err)
173176
}
@@ -237,7 +240,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
237240
}
238241

239242
// Start the sender (starts all workers)
240-
s.SpawnBgNamed("sender", func() error { return snd.Run(ctx, sharedLimiter) })
243+
s.SpawnBgNamed("sender", func() error { return snd.Run(ctx) })
241244
log.Printf("✅ Connected to %d endpoints", snd.GetNumShards())
242245

243246
// Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions)

profiles/profiles_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func TestProfilesAlignment(t *testing.T) {
8080
// Use a decoder with DisallowUnknownFields to catch any extra fields
8181
decoder := json.NewDecoder(strings.NewReader(string(data)))
8282
decoder.DisallowUnknownFields()
83-
83+
8484
var strictConfig config.LoadConfig
8585
if err := decoder.Decode(&strictConfig); err != nil {
8686
t.Errorf("Profile %s contains unexpected/unaligned fields: %v", file.Name(), err)

sender/sharded_sender.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,34 +23,36 @@ type ShardedSender struct {
2323
mu sync.RWMutex
2424
collector *stats.Collector
2525
logger *stats.Logger
26+
limiter *rate.Limiter // Shared rate limiter for all workers
2627
}
2728

2829
// NewShardedSender creates a new sharded sender with workers for each endpoint
29-
func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int) (*ShardedSender, error) {
30+
func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int, limiter *rate.Limiter) (*ShardedSender, error) {
3031
if len(cfg.Endpoints) == 0 {
3132
return nil, fmt.Errorf("no endpoints configured")
3233
}
3334

3435
workerList := make([]*Worker, len(cfg.Endpoints))
3536
for i, endpoint := range cfg.Endpoints {
36-
workerList[i] = NewWorker(i, cfg.SeiChainID, endpoint, bufferSize, workers)
37+
workerList[i] = NewWorker(i, cfg.SeiChainID, endpoint, bufferSize, workers, limiter)
3738
}
3839

3940
return &ShardedSender{
4041
workers: workerList,
4142
numShards: len(cfg.Endpoints),
4243
bufferSize: bufferSize,
44+
limiter: limiter,
4345
}, nil
4446
}
4547

4648
// Start initializes and starts all workers
47-
func (s *ShardedSender) Run(ctx context.Context, limiter *rate.Limiter) error {
49+
func (s *ShardedSender) Run(ctx context.Context) error {
4850
s.mu.Lock()
4951
workers := s.workers
5052
s.mu.Unlock()
5153
return service.Run(ctx, func(ctx context.Context, s service.Scope) error {
5254
for _, worker := range workers {
53-
s.Spawn(func() error { return worker.Run(ctx, limiter) })
55+
s.Spawn(func() error { return worker.Run(ctx) })
5456
}
5557
return nil
5658
})

sender/worker.go

Lines changed: 73 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package sender
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
67
"fmt"
8+
"io"
79
"log"
10+
"net"
11+
"net/http"
812
"time"
913

1014
"github.com/ethereum/go-ethereum"
@@ -32,10 +36,29 @@ type Worker struct {
3236
logger *stats.Logger
3337
workers int
3438
trackReceipts bool
39+
limiter *rate.Limiter // Shared rate limiter for transaction sending
40+
}
41+
42+
func newHttpClient() *http.Client {
43+
return &http.Client{
44+
Timeout: 30 * time.Second,
45+
Transport: &http.Transport{
46+
DialContext: (&net.Dialer{
47+
Timeout: 10 * time.Second,
48+
KeepAlive: 30 * time.Second,
49+
}).DialContext,
50+
MaxIdleConns: 100,
51+
MaxIdleConnsPerHost: 10,
52+
IdleConnTimeout: 90 * time.Second,
53+
TLSHandshakeTimeout: 10 * time.Second,
54+
ExpectContinueTimeout: 1 * time.Second,
55+
DisableKeepAlives: false,
56+
},
57+
}
3558
}
3659

3760
// NewWorker creates a new worker for a specific endpoint
38-
func NewWorker(id int, seiChainID string, endpoint string, bufferSize int, workers int) *Worker {
61+
func NewWorker(id int, seiChainID string, endpoint string, bufferSize int, workers int, limiter *rate.Limiter) *Worker {
3962
w := &Worker{
4063
id: id,
4164
seiChainID: seiChainID,
@@ -44,6 +67,7 @@ func NewWorker(id int, seiChainID string, endpoint string, bufferSize int, worke
4467
sentTxs: make(chan *types.LoadTx, bufferSize),
4568
workers: workers,
4669
trackReceipts: false,
70+
limiter: limiter,
4771
}
4872
meterWorkerQueueLength(w)
4973
return w
@@ -56,11 +80,12 @@ func (w *Worker) SetStatsCollector(collector *stats.Collector, logger *stats.Log
5680
}
5781

5882
// Start begins the worker's processing loop
59-
func (w *Worker) Run(ctx context.Context, limiter *rate.Limiter) error {
83+
func (w *Worker) Run(ctx context.Context) error {
6084
return service.Run(ctx, func(ctx context.Context, s service.Scope) error {
6185
// Start multiple worker goroutines that share the same channel
86+
client := newHttpClient()
6287
for range w.workers {
63-
s.Spawn(func() error { return w.processTransactions(ctx, limiter) })
88+
s.Spawn(func() error { return w.processTransactions(ctx, client) })
6489
}
6590
return w.watchTransactions(ctx)
6691
})
@@ -90,21 +115,18 @@ func (w *Worker) watchTransactions(ctx context.Context) error {
90115
if w.dryRun || !w.trackReceipts {
91116
return nil
92117
}
93-
94-
// Create a separate ethclient connection for receipt tracking
95-
ethClient, err := ethclient.Dial(w.endpoint)
118+
eth, err := ethclient.Dial(w.endpoint)
96119
if err != nil {
97120
return fmt.Errorf("ethclient.Dial(%q): %w", w.endpoint, err)
98121
}
99-
defer ethClient.Close()
100122
for ctx.Err() == nil {
101123
tx, err := utils.Recv(ctx, w.sentTxs)
102124
if err != nil {
103125
return err
104126
}
105127
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
106128
defer cancel()
107-
if err := w.waitForReceipt(ctx, ethClient, tx); err != nil {
129+
if err := w.waitForReceipt(ctx, eth, tx); err != nil {
108130
log.Printf("❌ %v", err)
109131
}
110132
}
@@ -149,40 +171,35 @@ func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx *
149171
}
150172

151173
// processTransactions is the main worker loop that processes transactions
152-
func (w *Worker) processTransactions(ctx context.Context, limiter *rate.Limiter) error {
153-
// Dial ethclient for this worker goroutine
154-
ethClient, err := ethclient.Dial(w.endpoint)
155-
if err != nil {
156-
return fmt.Errorf("failed to connect to endpoint %s: %w", w.endpoint, err)
157-
}
158-
defer ethClient.Close()
159-
174+
func (w *Worker) processTransactions(ctx context.Context, client *http.Client) error {
160175
for ctx.Err() == nil {
161176
// Apply rate limiting before getting the next transaction
162-
if err := limiter.Wait(ctx); err != nil {
163-
return err
177+
if w.limiter != nil {
178+
if !w.limiter.Allow() {
179+
continue
180+
}
164181
}
182+
165183
tx, err := utils.Recv(ctx, w.txChan)
166184
if err != nil {
167185
return err
168186
}
169187

170188
startTime := time.Now()
171-
// TODO: we cannot afford losing transactions due to nonce gaps.
172-
// Consider retries though.
173-
if err := w.sendTransaction(ctx, ethClient, tx); err != nil {
174-
return fmt.Errorf("w.sendTransaction(): %w", err)
175-
}
189+
err = w.sendTransaction(ctx, client, tx)
176190
// Record statistics if collector is available
177191
if w.collector != nil {
178-
w.collector.RecordTransaction(tx.Scenario.Name, w.endpoint, time.Since(startTime), false)
192+
w.collector.RecordTransaction(tx.Scenario.Name, w.endpoint, time.Since(startTime), err == nil)
193+
}
194+
if err != nil {
195+
log.Printf("%v", err)
179196
}
180197
}
181198
return ctx.Err()
182199
}
183200

184201
// sendTransaction sends a single transaction to the endpoint
185-
func (w *Worker) sendTransaction(ctx context.Context, ethClient *ethclient.Client, tx *types.LoadTx) (_err error) {
202+
func (w *Worker) sendTransaction(ctx context.Context, client *http.Client, tx *types.LoadTx) (_err error) {
186203
defer func(start time.Time) {
187204
metrics.sendLatency.Record(ctx, time.Since(start).Seconds(),
188205
metric.WithAttributes(
@@ -199,11 +216,40 @@ func (w *Worker) sendTransaction(ctx context.Context, ethClient *ethclient.Clien
199216
return utils.Sleep(ctx, 10*time.Microsecond) // Much faster simulation
200217
}
201218

202-
// Use go-ethereum client to send the transaction
203-
err := ethClient.SendTransaction(ctx, tx.EthTx)
219+
// Create HTTP request with JSON-RPC payload
220+
req, err := http.NewRequestWithContext(ctx, "POST", w.endpoint, bytes.NewReader(tx.JSONRPCPayload))
221+
if err != nil {
222+
return fmt.Errorf("Worker %d: Failed to create request: %w", w.id, err)
223+
}
224+
225+
// Set headers for JSON-RPC
226+
req.Header.Set("Content-Type", "application/json")
227+
req.Header.Set("Accept", "application/json")
228+
229+
// Send the request
230+
resp, err := client.Do(req)
204231
if err != nil {
205232
return fmt.Errorf("Worker %d: Failed to send transaction: %w", w.id, err)
206233
}
234+
defer func() {
235+
// Limit read to prevent memory issues with large responses
236+
_, err = io.CopyN(io.Discard, resp.Body, 64*1024) // Read up to 64KB
237+
if err != nil && err != io.EOF {
238+
log.Printf("Worker %d: Failed to read response body: %v", w.id, err)
239+
// Log but don't fail - this is just for connection reuse
240+
}
241+
242+
// Close response body and handle error
243+
if closeErr := resp.Body.Close(); closeErr != nil {
244+
log.Printf("Worker %d: Failed to close response body: %v", w.id, closeErr)
245+
}
246+
}()
247+
248+
// Check response status
249+
if resp.StatusCode != http.StatusOK {
250+
return fmt.Errorf("Worker %d: HTTP error %d for transaction to %s", w.id, resp.StatusCode, w.endpoint)
251+
}
252+
207253
// Write to sentTxs channel without blocking
208254
select {
209255
case w.sentTxs <- tx:

0 commit comments

Comments
 (0)