Skip to content

Commit 7c06b60

Browse files
Merge pull request #3 from sei-protocol/steven/add-user-latency-tracking
Add user latency tracking
2 parents dc00913 + 3d572e1 commit 7c06b60

8 files changed

Lines changed: 160 additions & 32 deletions

File tree

generator/scenarios/EVMTransfer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package scenarios
22

33
import (
44
"math/big"
5+
"time"
56

67
"github.com/ethereum/go-ethereum/common"
78
ethtypes "github.com/ethereum/go-ethereum/core/types"
@@ -49,7 +50,7 @@ func (s *EVMTransferScenario) CreateTransaction(config *config.LoadConfig, scena
4950
tx := &ethtypes.DynamicFeeTx{
5051
Nonce: scenario.Nonce,
5152
To: &scenario.Receiver,
52-
Value: bigOne,
53+
Value: big.NewInt(time.Now().Unix()),
5354
Gas: 21000, // Standard gas limit for ETH transfer
5455
GasTipCap: big.NewInt(2000000000), // 2 gwei
5556
GasFeeCap: big.NewInt(20000000000), // 20 gwei

generator/utils/utils.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package utils
22

33
import (
44
"math/big"
5+
"time"
56

67
"github.com/ethereum/go-ethereum/accounts/abi/bind"
78
"github.com/ethereum/go-ethereum/common"
@@ -47,6 +48,7 @@ func createTransactOpts(chainID *big.Int, account *loadtypes.Account, gasLimit u
4748
// Set transaction parameters
4849
auth.Nonce = big.NewInt(int64(account.Nonce))
4950
auth.NoSend = noSend
51+
auth.Value = big.NewInt(time.Now().Unix())
5052

5153
auth.GasLimit = gasLimit
5254
auth.GasTipCap = big.NewInt(2000000000) // 2 gwei tip (priority fee)

main.go

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,17 @@ import (
2222
)
2323

2424
var (
25-
configFile string
26-
statsInterval time.Duration
27-
bufferSize int
28-
tps float64
29-
dryRun bool
30-
debug bool
31-
workers int
32-
trackReceipts bool
33-
trackBlocks bool
34-
prewarm bool
25+
configFile string
26+
statsInterval time.Duration
27+
bufferSize int
28+
tps float64
29+
dryRun bool
30+
debug bool
31+
workers int
32+
trackReceipts bool
33+
trackBlocks bool
34+
prewarm bool
35+
trackUserLatency bool
3536
)
3637

3738
var rootCmd = &cobra.Command{
@@ -62,6 +63,7 @@ func init() {
6263
rootCmd.Flags().BoolVarP(&trackReceipts, "track-receipts", "", false, "Track receipts")
6364
rootCmd.Flags().BoolVarP(&trackBlocks, "track-blocks", "", false, "Track blocks")
6465
rootCmd.Flags().BoolVarP(&prewarm, "prewarm", "", false, "Prewarm accounts with self-transactions")
66+
rootCmd.Flags().BoolVarP(&trackUserLatency, "track-user-latency", "", false, "Track user latency")
6567
rootCmd.Flags().IntVarP(&workers, "workers", "w", 1, "Number of workers")
6668

6769
if err := rootCmd.MarkFlagRequired("config"); err != nil {
@@ -83,7 +85,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
8385
// Parse the config file into a config.LoadConfig struct
8486
cfg, err := loadConfig(configFile)
8587
if err != nil {
86-
return fmt.Errorf("Failed to load config: %w", err)
88+
return fmt.Errorf("failed to load config: %w", err)
8789
}
8890

8991
log.Printf("🚀 Starting Sei Chain Load Test v2")
@@ -109,6 +111,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
109111
if prewarm {
110112
log.Printf("📝 Prewarm: enabled")
111113
}
114+
if trackUserLatency {
115+
log.Printf("📝 Track user latency: enabled")
116+
}
112117
log.Println()
113118

114119
// Enable mock deployment in dry-run mode
@@ -124,13 +129,13 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
124129
// Create the generator from the config struct
125130
gen, err := generator.NewConfigBasedGenerator(cfg)
126131
if err != nil {
127-
return fmt.Errorf("Failed to create generator: %w", err)
132+
return fmt.Errorf("failed to create generator: %w", err)
128133
}
129134

130135
// Create the sender from the config struct
131136
snd, err := sender.NewShardedSender(cfg, bufferSize, workers)
132137
if err != nil {
133-
return fmt.Errorf("Failed to create sender: %w", err)
138+
return fmt.Errorf("failed to create sender: %w", err)
134139
}
135140

136141
// Create and start block collector if endpoints are available
@@ -143,6 +148,14 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
143148
})
144149
}
145150

151+
// Create and start user latency tracker if endpoints are available
152+
if len(cfg.Endpoints) > 0 && trackUserLatency {
153+
userLatencyTracker := stats.NewUserLatencyTracker(statsInterval)
154+
s.SpawnBgNamed("user latency tracker", func() error {
155+
return userLatencyTracker.Run(ctx, cfg.Endpoints[0])
156+
})
157+
}
158+
146159
// Enable dry-run mode in sender if specified
147160
if dryRun {
148161
snd.SetDryRun(true)
@@ -187,7 +200,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
187200
// Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions)
188201
if prewarm {
189202
if err := dispatcher.Prewarm(ctx); err != nil {
190-
return fmt.Errorf("Failed to prewarm accounts: %w", err)
203+
return fmt.Errorf("failed to prewarm accounts: %w", err)
191204
}
192205
}
193206

@@ -216,6 +229,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
216229
if trackBlocks {
217230
log.Printf("📝 Track blocks mode: Block data will be collected")
218231
}
232+
if trackUserLatency {
233+
log.Printf("📝 Track user latency mode: User latency will be tracked")
234+
}
219235
log.Print(strings.Repeat("=", 60))
220236

221237
// Main loop - wait for shutdown signal
@@ -240,7 +256,7 @@ func loadConfig(filename string) (*config.LoadConfig, error) {
240256

241257
var cfg config.LoadConfig
242258
if err := json.Unmarshal(data, &cfg); err != nil {
243-
return nil, fmt.Errorf("failed to parse config JSON: %w", err)
259+
return nil, fmt.Errorf("failed to parse config json: %w", err)
244260
}
245261

246262
// Validate configuration

sender/dispatcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func (d *Dispatcher) RunBatch(ctx context.Context, count int) error {
140140
// Generate a transaction
141141
tx, ok := d.generator.Generate()
142142
if !ok {
143-
return fmt.Errorf("Dispatcher: Generator returned nil transaction (batch %d/%d)\n", i+1, count)
143+
return fmt.Errorf("dispatcher: generator returned nil transaction (batch %d/%d)", i+1, count)
144144
}
145145
// Send the transaction
146146
if err := d.sender.Send(ctx, tx); err != nil {

sender/worker.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func NewWorker(id int, endpoint string, bufferSize int, workers int) *Worker {
5757
txChan: make(chan *types.LoadTx, bufferSize),
5858
sentTxs: make(chan *types.LoadTx, bufferSize),
5959
workers: workers,
60-
trackReceipts: true,
60+
trackReceipts: false,
6161
}
6262
}
6363

@@ -187,15 +187,19 @@ func (w *Worker) sendTransaction(ctx context.Context, client *http.Client, tx *t
187187
if err != nil {
188188
return fmt.Errorf("Worker %d: Failed to send transaction: %w", w.id, err)
189189
}
190-
defer resp.Body.Close()
191-
192-
// Always read and discard response body to enable connection reuse
193-
// Limit read to prevent memory issues with large responses
194-
_, err = io.CopyN(io.Discard, resp.Body, 64*1024) // Read up to 64KB
195-
if err != nil && err != io.EOF {
196-
log.Printf("Worker %d: Failed to read response body: %v", w.id, err)
197-
// Log but don't fail - this is just for connection reuse
198-
}
190+
defer func() {
191+
// Limit read to prevent memory issues with large responses
192+
_, err = io.CopyN(io.Discard, resp.Body, 64*1024) // Read up to 64KB
193+
if err != nil && err != io.EOF {
194+
log.Printf("Worker %d: Failed to read response body: %v", w.id, err)
195+
// Log but don't fail - this is just for connection reuse
196+
}
197+
198+
// Close response body and handle error
199+
if closeErr := resp.Body.Close(); closeErr != nil {
200+
log.Printf("Worker %d: Failed to close response body: %v", w.id, closeErr)
201+
}
202+
}()
199203

200204
// Check response status
201205
if resp.StatusCode != http.StatusOK {

stats/logger.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,7 @@ func (l *Logger) logCurrentStats() {
111111
}
112112

113113
// Print overall summary line
114-
log.Printf("[%s] throughput tps=%.2f, txs=%d, latency(avg=%v p50=%v p99=%v max=%v)",
115-
time.Now().Format("15:04:05"),
114+
log.Printf("throughput tps=%.2f, txs=%d, latency(avg=%v p50=%v p99=%v max=%v)",
116115
totalWindowTPS,
117116
totalTxs,
118117
overallAvgLatency.Round(time.Millisecond),
@@ -122,8 +121,7 @@ func (l *Logger) logCurrentStats() {
122121

123122
// Print block statistics if available
124123
if stats.BlockStats != nil && stats.BlockStats.SampleCount > 0 {
125-
log.Printf("[%s] %s",
126-
time.Now().Format("15:04:05"),
124+
log.Printf("%s",
127125
stats.BlockStats.FormatBlockStats())
128126
}
129127

stats/user_latency_tracker.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package stats
2+
3+
import (
4+
"context"
5+
"log"
6+
"math/big"
7+
"sort"
8+
"time"
9+
10+
"github.com/ethereum/go-ethereum/ethclient"
11+
12+
"github.com/sei-protocol/sei-load/utils"
13+
)
14+
15+
// UserLatencyTracker tracks user latency by analyzing block transactions
16+
type UserLatencyTracker struct {
17+
interval time.Duration
18+
}
19+
20+
// NewUserLatencyTracker creates a new user latency tracker
21+
func NewUserLatencyTracker(interval time.Duration) *UserLatencyTracker {
22+
return &UserLatencyTracker{
23+
interval: interval,
24+
}
25+
}
26+
27+
// Run starts the user latency tracking loop
28+
func (ult *UserLatencyTracker) Run(ctx context.Context, endpoint string) error {
29+
// Create ticker for the configured interval
30+
ticker := time.NewTicker(ult.interval)
31+
32+
// Connect to the endpoint
33+
client, err := ethclient.Dial(endpoint)
34+
if err != nil {
35+
return err
36+
}
37+
defer client.Close()
38+
39+
for {
40+
if _, err := utils.Recv(ctx, ticker.C); err != nil {
41+
return err
42+
}
43+
if err := ult.trackLatency(ctx, client); err != nil {
44+
log.Printf("User latency tracker: Error tracking latency: %v", err)
45+
// Continue on error - don't stop the tracker
46+
}
47+
}
48+
}
49+
50+
// trackLatency fetches the latest block and calculates user latency statistics
51+
func (ult *UserLatencyTracker) trackLatency(ctx context.Context, client *ethclient.Client) error {
52+
// Get the latest block with transactions
53+
block, err := client.BlockByNumber(ctx, nil)
54+
if err != nil {
55+
return err
56+
}
57+
58+
// Skip if no transactions
59+
txs := block.Transactions()
60+
if len(txs) == 0 {
61+
log.Printf("User latency tracker: Block %d has no transactions", block.NumberU64())
62+
return nil
63+
}
64+
65+
// Calculate latencies for each transaction
66+
var latencies []time.Duration
67+
blockTimestamp := time.Unix(int64(block.Time()), 0)
68+
69+
for i, tx := range txs {
70+
// Extract timestamp from transaction value (set to time.Now().Unix() during creation)
71+
if tx.Value() != nil && tx.Value().Cmp(big.NewInt(0)) > 0 {
72+
txTimestamp := time.Unix(tx.Value().Int64(), 0)
73+
latency := blockTimestamp.Sub(txTimestamp)
74+
75+
// Only include positive latencies (sanity check)
76+
if latency >= 0 {
77+
latencies = append(latencies, latency)
78+
} else {
79+
log.Printf("User latency tracker: Negative latency detected: %v", latency)
80+
}
81+
} else {
82+
log.Printf("User latency tracker: TX %d has nil or zero value", i)
83+
}
84+
}
85+
86+
// Skip logging if no valid latencies
87+
if len(latencies) == 0 {
88+
log.Printf("User latency tracker: No valid latencies found in block %d", block.NumberU64())
89+
return nil
90+
}
91+
92+
// Calculate statistics
93+
sort.Slice(latencies, func(i, j int) bool {
94+
return latencies[i] < latencies[j]
95+
})
96+
97+
minLatency := latencies[0]
98+
maxLatency := latencies[len(latencies)-1]
99+
p50 := latencies[len(latencies)/2]
100+
101+
// Log the summary
102+
log.Printf("user latency height=%d txs=%d min=%v p50=%v max=%v",
103+
block.NumberU64(), len(latencies),
104+
minLatency, p50, maxLatency)
105+
106+
return nil
107+
}

utils/mutex.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (w *AtomicSend[T]) Update(f func(T) (T, bool)) {
6767
}
6868

6969
func NewAtomicSend[T any](value T) (w AtomicSend[T]) {
70-
w.atomicWatch.ptr.Store(newVersion(value))
70+
w.ptr.Store(newVersion(value))
7171
// nolint:nakedret
7272
return
7373
}

0 commit comments

Comments
 (0)