Skip to content

Commit 5f8bd1b

Browse files
committed
add user latency tracking
1 parent dc00913 commit 5f8bd1b

8 files changed

Lines changed: 178 additions & 37 deletions

File tree

generator/scenarios/EVMTransfer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package scenarios
22

33
import (
44
"math/big"
5+
"time"
56

67
"github.com/ethereum/go-ethereum/common"
78
ethtypes "github.com/ethereum/go-ethereum/core/types"
@@ -49,7 +50,7 @@ func (s *EVMTransferScenario) CreateTransaction(config *config.LoadConfig, scena
4950
tx := &ethtypes.DynamicFeeTx{
5051
Nonce: scenario.Nonce,
5152
To: &scenario.Receiver,
52-
Value: bigOne,
53+
Value: big.NewInt(time.Now().Unix()),
5354
Gas: 21000, // Standard gas limit for ETH transfer
5455
GasTipCap: big.NewInt(2000000000), // 2 gwei
5556
GasFeeCap: big.NewInt(20000000000), // 20 gwei

generator/utils/utils.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package utils
22

33
import (
44
"math/big"
5+
"time"
56

67
"github.com/ethereum/go-ethereum/accounts/abi/bind"
78
"github.com/ethereum/go-ethereum/common"
@@ -47,6 +48,7 @@ func createTransactOpts(chainID *big.Int, account *loadtypes.Account, gasLimit u
4748
// Set transaction parameters
4849
auth.Nonce = big.NewInt(int64(account.Nonce))
4950
auth.NoSend = noSend
51+
auth.Value = big.NewInt(time.Now().Unix())
5052

5153
auth.GasLimit = gasLimit
5254
auth.GasTipCap = big.NewInt(2000000000) // 2 gwei tip (priority fee)

main.go

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,17 @@ import (
2222
)
2323

2424
var (
25-
configFile string
26-
statsInterval time.Duration
27-
bufferSize int
28-
tps float64
29-
dryRun bool
30-
debug bool
31-
workers int
32-
trackReceipts bool
33-
trackBlocks bool
34-
prewarm bool
25+
configFile string
26+
statsInterval time.Duration
27+
bufferSize int
28+
tps float64
29+
dryRun bool
30+
debug bool
31+
workers int
32+
trackReceipts bool
33+
trackBlocks bool
34+
prewarm bool
35+
trackUserLatency bool
3536
)
3637

3738
var rootCmd = &cobra.Command{
@@ -62,6 +63,7 @@ func init() {
6263
rootCmd.Flags().BoolVarP(&trackReceipts, "track-receipts", "", false, "Track receipts")
6364
rootCmd.Flags().BoolVarP(&trackBlocks, "track-blocks", "", false, "Track blocks")
6465
rootCmd.Flags().BoolVarP(&prewarm, "prewarm", "", false, "Prewarm accounts with self-transactions")
66+
rootCmd.Flags().BoolVarP(&trackUserLatency, "track-user-latency", "", false, "Track user latency")
6567
rootCmd.Flags().IntVarP(&workers, "workers", "w", 1, "Number of workers")
6668

6769
if err := rootCmd.MarkFlagRequired("config"); err != nil {
@@ -83,7 +85,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
8385
// Parse the config file into a config.LoadConfig struct
8486
cfg, err := loadConfig(configFile)
8587
if err != nil {
86-
return fmt.Errorf("Failed to load config: %w", err)
88+
return fmt.Errorf("failed to load config: %w", err)
8789
}
8890

8991
log.Printf("🚀 Starting Sei Chain Load Test v2")
@@ -109,6 +111,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
109111
if prewarm {
110112
log.Printf("📝 Prewarm: enabled")
111113
}
114+
if trackUserLatency {
115+
log.Printf("📝 Track user latency: enabled")
116+
}
112117
log.Println()
113118

114119
// Enable mock deployment in dry-run mode
@@ -124,13 +129,13 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
124129
// Create the generator from the config struct
125130
gen, err := generator.NewConfigBasedGenerator(cfg)
126131
if err != nil {
127-
return fmt.Errorf("Failed to create generator: %w", err)
132+
return fmt.Errorf("failed to create generator: %w", err)
128133
}
129134

130135
// Create the sender from the config struct
131136
snd, err := sender.NewShardedSender(cfg, bufferSize, workers)
132137
if err != nil {
133-
return fmt.Errorf("Failed to create sender: %w", err)
138+
return fmt.Errorf("failed to create sender: %w", err)
134139
}
135140

136141
// Create and start block collector if endpoints are available
@@ -143,6 +148,14 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
143148
})
144149
}
145150

151+
// Create and start user latency tracker if endpoints are available
152+
if len(cfg.Endpoints) > 0 && trackUserLatency {
153+
userLatencyTracker := stats.NewUserLatencyTracker(statsInterval)
154+
s.SpawnBgNamed("user latency tracker", func() error {
155+
return userLatencyTracker.Run(ctx, cfg.Endpoints[0])
156+
})
157+
}
158+
146159
// Enable dry-run mode in sender if specified
147160
if dryRun {
148161
snd.SetDryRun(true)
@@ -187,7 +200,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
187200
// Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions)
188201
if prewarm {
189202
if err := dispatcher.Prewarm(ctx); err != nil {
190-
return fmt.Errorf("Failed to prewarm accounts: %w", err)
203+
return fmt.Errorf("failed to prewarm accounts: %w", err)
191204
}
192205
}
193206

@@ -216,6 +229,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
216229
if trackBlocks {
217230
log.Printf("📝 Track blocks mode: Block data will be collected")
218231
}
232+
if trackUserLatency {
233+
log.Printf("📝 Track user latency mode: User latency will be tracked")
234+
}
219235
log.Print(strings.Repeat("=", 60))
220236

221237
// Main loop - wait for shutdown signal
@@ -240,7 +256,7 @@ func loadConfig(filename string) (*config.LoadConfig, error) {
240256

241257
var cfg config.LoadConfig
242258
if err := json.Unmarshal(data, &cfg); err != nil {
243-
return nil, fmt.Errorf("failed to parse config JSON: %w", err)
259+
return nil, fmt.Errorf("failed to parse config json: %w", err)
244260
}
245261

246262
// Validate configuration

sender/dispatcher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,14 @@ func (d *Dispatcher) RunBatch(ctx context.Context, count int) error {
133133
d.mu.RLock()
134134
limiter := d.limiter
135135
d.mu.RUnlock()
136-
for i := range count {
136+
for i := 0; i < count; i++ {
137137
if err := limiter.Wait(ctx); err != nil {
138138
return err
139139
}
140140
// Generate a transaction
141141
tx, ok := d.generator.Generate()
142142
if !ok {
143-
return fmt.Errorf("Dispatcher: Generator returned nil transaction (batch %d/%d)\n", i+1, count)
143+
return fmt.Errorf("dispatcher: generator returned nil transaction (batch %d/%d)", i+1, count)
144144
}
145145
// Send the transaction
146146
if err := d.sender.Send(ctx, tx); err != nil {

sender/worker.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func NewWorker(id int, endpoint string, bufferSize int, workers int) *Worker {
5757
txChan: make(chan *types.LoadTx, bufferSize),
5858
sentTxs: make(chan *types.LoadTx, bufferSize),
5959
workers: workers,
60-
trackReceipts: true,
60+
trackReceipts: false,
6161
}
6262
}
6363

@@ -187,15 +187,19 @@ func (w *Worker) sendTransaction(ctx context.Context, client *http.Client, tx *t
187187
if err != nil {
188188
return fmt.Errorf("Worker %d: Failed to send transaction: %w", w.id, err)
189189
}
190-
defer resp.Body.Close()
191-
192-
// Always read and discard response body to enable connection reuse
193-
// Limit read to prevent memory issues with large responses
194-
_, err = io.CopyN(io.Discard, resp.Body, 64*1024) // Read up to 64KB
195-
if err != nil && err != io.EOF {
196-
log.Printf("Worker %d: Failed to read response body: %v", w.id, err)
197-
// Log but don't fail - this is just for connection reuse
198-
}
190+
defer func() {
191+
// Limit read to prevent memory issues with large responses
192+
_, err = io.CopyN(io.Discard, resp.Body, 64*1024) // Read up to 64KB
193+
if err != nil && err != io.EOF {
194+
log.Printf("Worker %d: Failed to read response body: %v", w.id, err)
195+
// Log but don't fail - this is just for connection reuse
196+
}
197+
198+
// Close response body and handle error
199+
if closeErr := resp.Body.Close(); closeErr != nil {
200+
log.Printf("Worker %d: Failed to close response body: %v", w.id, closeErr)
201+
}
202+
}()
199203

200204
// Check response status
201205
if resp.StatusCode != http.StatusOK {

stats/logger.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,7 @@ func (l *Logger) logCurrentStats() {
111111
}
112112

113113
// Print overall summary line
114-
log.Printf("[%s] throughput tps=%.2f, txs=%d, latency(avg=%v p50=%v p99=%v max=%v)",
115-
time.Now().Format("15:04:05"),
114+
log.Printf("throughput tps=%.2f, txs=%d, latency(avg=%v p50=%v p99=%v max=%v)",
116115
totalWindowTPS,
117116
totalTxs,
118117
overallAvgLatency.Round(time.Millisecond),
@@ -122,8 +121,7 @@ func (l *Logger) logCurrentStats() {
122121

123122
// Print block statistics if available
124123
if stats.BlockStats != nil && stats.BlockStats.SampleCount > 0 {
125-
log.Printf("[%s] %s",
126-
time.Now().Format("15:04:05"),
124+
log.Printf("%s",
127125
stats.BlockStats.FormatBlockStats())
128126
}
129127

stats/user_latency_tracker.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package stats
2+
3+
import (
4+
"context"
5+
"log"
6+
"math/big"
7+
"sort"
8+
"time"
9+
10+
"github.com/ethereum/go-ethereum/ethclient"
11+
)
12+
13+
// UserLatencyTracker tracks user latency by analyzing block transactions.
// It periodically fetches the chain head and compares the block timestamp
// against per-transaction send timestamps that the load generator embeds
// in the transaction value field (see EVMTransfer.go in this commit).
type UserLatencyTracker struct {
	endpoint string        // RPC endpoint; assigned in Run, not read elsewhere in this file — TODO confirm it is needed
	interval time.Duration // polling cadence between latency samples
}
18+
19+
// NewUserLatencyTracker creates a new user latency tracker
20+
func NewUserLatencyTracker(interval time.Duration) *UserLatencyTracker {
21+
return &UserLatencyTracker{
22+
interval: interval,
23+
}
24+
}
25+
26+
// Run starts the user latency tracking loop
27+
func (ult *UserLatencyTracker) Run(ctx context.Context, endpoint string) error {
28+
ult.endpoint = endpoint
29+
30+
// Create ticker for the configured interval
31+
ticker := time.NewTicker(ult.interval)
32+
defer ticker.Stop()
33+
34+
// Connect to the endpoint
35+
client, err := ethclient.Dial(endpoint)
36+
if err != nil {
37+
return err
38+
}
39+
defer client.Close()
40+
41+
for {
42+
select {
43+
case <-ctx.Done():
44+
return ctx.Err()
45+
case <-ticker.C:
46+
if err := ult.trackLatency(ctx, client); err != nil {
47+
log.Printf("User latency tracker: Error tracking latency: %v", err)
48+
// Continue on error - don't stop the tracker
49+
}
50+
}
51+
}
52+
}
53+
54+
// trackLatency fetches the latest block and calculates user latency statistics
55+
func (ult *UserLatencyTracker) trackLatency(ctx context.Context, client *ethclient.Client) error {
56+
// Get the latest block with transactions
57+
block, err := client.BlockByNumber(ctx, nil)
58+
if err != nil {
59+
return err
60+
}
61+
62+
// Skip if no transactions
63+
txs := block.Transactions()
64+
if len(txs) == 0 {
65+
log.Printf("User latency tracker: Block %d has no transactions", block.NumberU64())
66+
return nil
67+
}
68+
69+
// Calculate latencies for each transaction
70+
var latencies []time.Duration
71+
blockTimestamp := time.Unix(int64(block.Time()), 0)
72+
73+
for i, tx := range txs {
74+
// Extract timestamp from transaction value (set to time.Now().Unix() during creation)
75+
if tx.Value() != nil && tx.Value().Cmp(big.NewInt(0)) > 0 {
76+
txTimestamp := time.Unix(tx.Value().Int64(), 0)
77+
latency := blockTimestamp.Sub(txTimestamp)
78+
79+
// Only include positive latencies (sanity check)
80+
if latency >= 0 {
81+
latencies = append(latencies, latency)
82+
} else {
83+
log.Printf("User latency tracker: Negative latency detected: %v", latency)
84+
}
85+
} else {
86+
log.Printf("User latency tracker: TX %d has nil or zero value", i)
87+
}
88+
}
89+
90+
// Skip logging if no valid latencies
91+
if len(latencies) == 0 {
92+
log.Printf("User latency tracker: No valid latencies found in block %d", block.NumberU64())
93+
return nil
94+
}
95+
96+
// Calculate statistics
97+
sort.Slice(latencies, func(i, j int) bool {
98+
return latencies[i] < latencies[j]
99+
})
100+
101+
minLatency := latencies[0]
102+
maxLatency := latencies[len(latencies)-1]
103+
p50 := latencies[len(latencies)/2]
104+
105+
// Log the summary
106+
log.Printf("user latency height=%d txs=%d min=%v p50=%v max=%v",
107+
block.NumberU64(), len(latencies),
108+
minLatency, p50, maxLatency)
109+
110+
return nil
111+
}

utils/mutex.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type atomicWatch[T any] struct {
4949
}
5050

5151
type AtomicSend[T any] struct {
52-
atomicWatch[T]
52+
*atomicWatch[T]
5353
}
5454

5555
// Store updates the value of the atomic watch.
@@ -67,21 +67,22 @@ func (w *AtomicSend[T]) Update(f func(T) (T, bool)) {
6767
}
6868

6969
func NewAtomicSend[T any](value T) (w AtomicSend[T]) {
70-
w.atomicWatch.ptr.Store(newVersion(value))
70+
w.atomicWatch = &atomicWatch[T]{}
71+
w.ptr.Store(newVersion(value))
7172
// nolint:nakedret
7273
return
7374
}
7475

7576
func (w *AtomicSend[T]) Subscribe() AtomicRecv[T] {
76-
return AtomicRecv[T]{&w.atomicWatch}
77+
return AtomicRecv[T]{w.atomicWatch}
7778
}
7879

7980
// AtomicWatch stores a pointer to an IMMUTABLE value.
8081
// Loading and waiting for updates do NOT require locking.
8182
// TODO(gprusak): remove mutex and rename to AtomicSend,
8283
// this will allow for sharing a mutex across multiple AtomicSenders.
8384
type AtomicWatch[T any] struct {
84-
atomicWatch[T]
85+
*atomicWatch[T]
8586
mu sync.Mutex
8687
}
8788

@@ -97,7 +98,7 @@ func NewAtomicWatch[T any](value T) (w AtomicWatch[T]) {
9798

9899
// Subscribe returns a view-only API of the atomic watch.
99100
func (w *AtomicWatch[T]) Subscribe() AtomicRecv[T] {
100-
return AtomicRecv[T]{&w.atomicWatch}
101+
return AtomicRecv[T]{w.atomicWatch}
101102
}
102103

103104
// Load returns the current value of the atomic watch.
@@ -232,3 +233,11 @@ func (w *Watch[T]) Lock() iter.Seq2[T, *WatchCtrl] {
232233
_ = yield(w.val, &w.ctrl)
233234
}
234235
}
236+
237+
// Set replaces the stored value under the mutex.
// NOTE(review): the original comment claimed watchers are notified, but no
// broadcast/signal is visible in this method — confirm whether callers
// expect a wake-up after Set, or fix the comment at the call sites.
func (w *Mutex[T]) Set(value T) {
	// NOTE(review): the local variable shadows the newVersion function of
	// the same name; consider renaming. Only the wrapped value is kept —
	// the version wrapper itself is discarded immediately.
	newVersion := newVersion(value)
	w.mu.Lock()
	defer w.mu.Unlock()
	w.value = newVersion.value
}

0 commit comments

Comments
 (0)