Skip to content

Commit 6d90159

Browse files
committed
worker upgraded
1 parent 8d2838d commit 6d90159

2 files changed

Lines changed: 30 additions & 77 deletions

File tree

sender/sharded_sender.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ type ShardedSender struct {
2323
mu sync.RWMutex
2424
collector *stats.Collector
2525
logger *stats.Logger
26-
limiter *rate.Limiter // Shared rate limiter for all workers
2726
}
2827

2928
// NewShardedSender creates a new sharded sender with workers for each endpoint
@@ -34,7 +33,7 @@ func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int, limit
3433

3534
workerList := make([]*Worker, len(cfg.Endpoints))
3635
for i, endpoint := range cfg.Endpoints {
37-
workerList[i] = NewWorker(i, cfg.SeiChainID, endpoint, bufferSize, workers, limiter)
36+
workerList[i] = NewWorker(i, cfg.SeiChainID, endpoint, bufferSize, workers)
3837
}
3938

4039
return &ShardedSender{
@@ -46,13 +45,13 @@ func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int, limit
4645
}
4746

4847
// Start initializes and starts all workers
49-
func (s *ShardedSender) Run(ctx context.Context) error {
48+
func (s *ShardedSender) Run(ctx context.Context, limiter *rate.Limiter) error {
5049
s.mu.Lock()
5150
workers := s.workers
5251
s.mu.Unlock()
5352
return service.Run(ctx, func(ctx context.Context, s service.Scope) error {
5453
for _, worker := range workers {
55-
s.Spawn(func() error { return worker.Run(ctx) })
54+
s.Spawn(func() error { return worker.Run(ctx,limiter) })
5655
}
5756
return nil
5857
})

sender/worker.go

Lines changed: 27 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
package sender
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
76
"fmt"
8-
"io"
97
"log"
10-
"net"
11-
"net/http"
128
"time"
139

1410
"github.com/ethereum/go-ethereum"
@@ -36,29 +32,10 @@ type Worker struct {
3632
logger *stats.Logger
3733
workers int
3834
trackReceipts bool
39-
limiter *rate.Limiter // Shared rate limiter for transaction sending
40-
}
41-
42-
func newHttpClient() *http.Client {
43-
return &http.Client{
44-
Timeout: 30 * time.Second,
45-
Transport: &http.Transport{
46-
DialContext: (&net.Dialer{
47-
Timeout: 10 * time.Second,
48-
KeepAlive: 30 * time.Second,
49-
}).DialContext,
50-
MaxIdleConns: 100,
51-
MaxIdleConnsPerHost: 10,
52-
IdleConnTimeout: 90 * time.Second,
53-
TLSHandshakeTimeout: 10 * time.Second,
54-
ExpectContinueTimeout: 1 * time.Second,
55-
DisableKeepAlives: false,
56-
},
57-
}
5835
}
5936

6037
// NewWorker creates a new worker for a specific endpoint
61-
func NewWorker(id int, seiChainID string, endpoint string, bufferSize int, workers int, limiter *rate.Limiter) *Worker {
38+
func NewWorker(id int, seiChainID string, endpoint string, bufferSize int, workers int) *Worker {
6239
w := &Worker{
6340
id: id,
6441
seiChainID: seiChainID,
@@ -67,7 +44,6 @@ func NewWorker(id int, seiChainID string, endpoint string, bufferSize int, worke
6744
sentTxs: make(chan *types.LoadTx, bufferSize),
6845
workers: workers,
6946
trackReceipts: false,
70-
limiter: limiter,
7147
}
7248
meterWorkerQueueLength(w)
7349
return w
@@ -80,12 +56,11 @@ func (w *Worker) SetStatsCollector(collector *stats.Collector, logger *stats.Log
8056
}
8157

8258
// Start begins the worker's processing loop
83-
func (w *Worker) Run(ctx context.Context) error {
59+
func (w *Worker) Run(ctx context.Context, limiter *rate.Limiter) error {
8460
return service.Run(ctx, func(ctx context.Context, s service.Scope) error {
8561
// Start multiple worker goroutines that share the same channel
86-
client := newHttpClient()
8762
for range w.workers {
88-
s.Spawn(func() error { return w.processTransactions(ctx, client) })
63+
s.Spawn(func() error { return w.processTransactions(ctx,limiter) })
8964
}
9065
return w.watchTransactions(ctx)
9166
})
@@ -115,18 +90,21 @@ func (w *Worker) watchTransactions(ctx context.Context) error {
11590
if w.dryRun || !w.trackReceipts {
11691
return nil
11792
}
118-
eth, err := ethclient.Dial(w.endpoint)
93+
94+
// Create a separate ethclient connection for receipt tracking
95+
ethClient, err := ethclient.Dial(w.endpoint)
11996
if err != nil {
12097
return fmt.Errorf("ethclient.Dial(%q): %w", w.endpoint, err)
12198
}
99+
defer ethClient.Close()
122100
for ctx.Err() == nil {
123101
tx, err := utils.Recv(ctx, w.sentTxs)
124102
if err != nil {
125103
return err
126104
}
127105
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
128106
defer cancel()
129-
if err := w.waitForReceipt(ctx, eth, tx); err != nil {
107+
if err := w.waitForReceipt(ctx, ethClient, tx); err != nil {
130108
log.Printf("❌ %v", err)
131109
}
132110
}
@@ -171,35 +149,40 @@ func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx *
171149
}
172150

173151
// processTransactions is the main worker loop that processes transactions
174-
func (w *Worker) processTransactions(ctx context.Context, client *http.Client) error {
152+
func (w *Worker) processTransactions(ctx context.Context, limiter *rate.Limiter) error {
153+
// Dial ethclient for this worker goroutine
154+
ethClient, err := ethclient.Dial(w.endpoint)
155+
if err != nil {
156+
return fmt.Errorf("failed to connect to endpoint %s: %w", w.endpoint, err)
157+
}
158+
defer ethClient.Close()
159+
175160
for ctx.Err() == nil {
176161
// Apply rate limiting before getting the next transaction
177-
if w.limiter != nil {
178-
if !w.limiter.Allow() {
179-
continue
180-
}
162+
if err := limiter.Wait(ctx); err!=nil {
163+
return err
181164
}
182-
183165
tx, err := utils.Recv(ctx, w.txChan)
184166
if err != nil {
185167
return err
186168
}
187169

188170
startTime := time.Now()
189-
err = w.sendTransaction(ctx, client, tx)
171+
// TODO: we cannot afford losing transactions due to nonce gaps.
172+
// Consider retries though.
173+
if err := w.sendTransaction(ctx, ethClient, tx); err != nil {
174+
return fmt.Errorf("w.sendTransaction(): %w", err)
175+
}
190176
// Record statistics if collector is available
191177
if w.collector != nil {
192-
w.collector.RecordTransaction(tx.Scenario.Name, w.endpoint, time.Since(startTime), err == nil)
193-
}
194-
if err != nil {
195-
log.Printf("%v", err)
178+
w.collector.RecordTransaction(tx.Scenario.Name, w.endpoint, time.Since(startTime), false)
196179
}
197180
}
198181
return ctx.Err()
199182
}
200183

201184
// sendTransaction sends a single transaction to the endpoint
202-
func (w *Worker) sendTransaction(ctx context.Context, client *http.Client, tx *types.LoadTx) (_err error) {
185+
func (w *Worker) sendTransaction(ctx context.Context, ethClient *ethclient.Client, tx *types.LoadTx) (_err error) {
203186
defer func(start time.Time) {
204187
metrics.sendLatency.Record(ctx, time.Since(start).Seconds(),
205188
metric.WithAttributes(
@@ -216,40 +199,11 @@ func (w *Worker) sendTransaction(ctx context.Context, client *http.Client, tx *t
216199
return utils.Sleep(ctx, 10*time.Microsecond) // Much faster simulation
217200
}
218201

219-
// Create HTTP request with JSON-RPC payload
220-
req, err := http.NewRequestWithContext(ctx, "POST", w.endpoint, bytes.NewReader(tx.JSONRPCPayload))
221-
if err != nil {
222-
return fmt.Errorf("Worker %d: Failed to create request: %w", w.id, err)
223-
}
224-
225-
// Set headers for JSON-RPC
226-
req.Header.Set("Content-Type", "application/json")
227-
req.Header.Set("Accept", "application/json")
228-
229-
// Send the request
230-
resp, err := client.Do(req)
202+
// Use go-ethereum client to send the transaction
203+
err := ethClient.SendTransaction(ctx, tx.EthTx)
231204
if err != nil {
232205
return fmt.Errorf("Worker %d: Failed to send transaction: %w", w.id, err)
233206
}
234-
defer func() {
235-
// Limit read to prevent memory issues with large responses
236-
_, err = io.CopyN(io.Discard, resp.Body, 64*1024) // Read up to 64KB
237-
if err != nil && err != io.EOF {
238-
log.Printf("Worker %d: Failed to read response body: %v", w.id, err)
239-
// Log but don't fail - this is just for connection reuse
240-
}
241-
242-
// Close response body and handle error
243-
if closeErr := resp.Body.Close(); closeErr != nil {
244-
log.Printf("Worker %d: Failed to close response body: %v", w.id, closeErr)
245-
}
246-
}()
247-
248-
// Check response status
249-
if resp.StatusCode != http.StatusOK {
250-
return fmt.Errorf("Worker %d: HTTP error %d for transaction to %s", w.id, resp.StatusCode, w.endpoint)
251-
}
252-
253207
// Write to sentTxs channel without blocking
254208
select {
255209
case w.sentTxs <- tx:

0 commit comments

Comments
 (0)