Skip to content

Commit 4dca652

Browse files
Update performance metrics to use constants from clientinfo package
- Refactored metric increment calls in libp2p to utilize constants for peer connections, disconnections, and ping tests. - Enhanced coordination window metrics by adding a mutex for safe access to previous window data across goroutines. - Introduced a cleanup goroutine to ensure the end time of the last coordination window is recorded on shutdown.
1 parent 0a5ca29 commit 4dca652

3 files changed

Lines changed: 91 additions & 51 deletions

File tree

pkg/net/libp2p/libp2p.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sync/atomic"
88
"time"
99

10+
"github.com/keep-network/keep-core/pkg/clientinfo"
1011
"github.com/keep-network/keep-core/pkg/operator"
1112

1213
"github.com/ipfs/go-log"
@@ -594,7 +595,7 @@ func buildNotifiee(libp2pHost host.Host, p *provider) libp2pnet.Notifiee {
594595
SetGauge(name string, value float64)
595596
RecordDuration(name string, duration time.Duration)
596597
})
597-
recorder.IncrementCounter("peer_connections_total", 1)
598+
recorder.IncrementCounter(clientinfo.MetricPeerConnectionsTotal, 1)
598599
}
599600
}
600601

@@ -629,7 +630,7 @@ func buildNotifiee(libp2pHost host.Host, p *provider) libp2pnet.Notifiee {
629630
SetGauge(name string, value float64)
630631
RecordDuration(name string, duration time.Duration)
631632
})
632-
recorder.IncrementCounter("peer_disconnections_total", 1)
633+
recorder.IncrementCounter(clientinfo.MetricPeerDisconnectionsTotal, 1)
633634
}
634635
}
635636

@@ -658,7 +659,7 @@ func executePingTest(
658659
logger.Infof("starting ping test for [%v]", peerMultiaddress)
659660

660661
if metricsRecorder != nil {
661-
metricsRecorder.IncrementCounter("ping_test_total", 1)
662+
metricsRecorder.IncrementCounter(clientinfo.MetricPingTestsTotal, 1)
662663
}
663664

664665
ctx, cancelCtx := context.WithTimeout(
@@ -679,15 +680,15 @@ func executePingTest(
679680
result.Error,
680681
)
681682
if metricsRecorder != nil {
682-
metricsRecorder.IncrementCounter("ping_test_failed_total", 1)
683+
metricsRecorder.IncrementCounter(clientinfo.MetricPingTestFailedTotal, 1)
683684
}
684685
} else if result.Error == nil && result.RTT == 0 {
685686
logger.Warnf(
686687
"peer test for [%v] failed without clear reason",
687688
peerMultiaddress,
688689
)
689690
if metricsRecorder != nil {
690-
metricsRecorder.IncrementCounter("ping_test_failed_total", 1)
691+
metricsRecorder.IncrementCounter(clientinfo.MetricPingTestFailedTotal, 1)
691692
}
692693
} else {
693694
logger.Infof(
@@ -696,15 +697,15 @@ func executePingTest(
696697
result.RTT,
697698
)
698699
if metricsRecorder != nil {
699-
metricsRecorder.IncrementCounter("ping_test_success_total", 1)
700+
metricsRecorder.IncrementCounter(clientinfo.MetricPingTestSuccessTotal, 1)
700701
// Only record duration on successful ping tests
701-
metricsRecorder.RecordDuration("ping_test_duration_seconds", time.Since(startTime))
702+
metricsRecorder.RecordDuration(clientinfo.MetricPingTestDurationSeconds, time.Since(startTime))
702703
}
703704
}
704705
case <-ctx.Done():
705706
logger.Warnf("ping test for [%v] timed out", peerMultiaddress)
706707
if metricsRecorder != nil {
707-
metricsRecorder.IncrementCounter("ping_test_failed_total", 1)
708+
metricsRecorder.IncrementCounter(clientinfo.MetricPingTestFailedTotal, 1)
708709
}
709710
}
710711
}

pkg/tbtc/coordination_window_metrics.go

Lines changed: 65 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,12 @@ func (cwm *coordinationWindowMetrics) recordWindowStart(window *coordinationWind
111111
cwm.mu.Lock()
112112
defer cwm.mu.Unlock()
113113

114+
cwm.initializeWindowIfNeeded(window)
115+
}
116+
117+
// initializeWindowIfNeeded initializes window metrics if they don't exist.
118+
// This function assumes the caller already holds cwm.mu.Lock().
119+
func (cwm *coordinationWindowMetrics) initializeWindowIfNeeded(window *coordinationWindow) {
114120
windowIndex := window.index()
115121
if windowIndex == 0 {
116122
// Invalid window, skip
@@ -152,6 +158,11 @@ func (cwm *coordinationWindowMetrics) recordWindowEnd(window *coordinationWindow
152158
return
153159
}
154160

161+
// Don't overwrite EndTime if it's already been set
162+
if !wm.EndTime.IsZero() {
163+
return
164+
}
165+
155166
wm.EndTime = time.Now()
156167
wm.Duration = wm.EndTime.Sub(wm.StartTime)
157168

@@ -162,9 +173,6 @@ func (cwm *coordinationWindowMetrics) recordWindowEnd(window *coordinationWindow
162173
clientinfo.MetricCoordinationWindowDurationSeconds,
163174
wm.Duration,
164175
)
165-
166-
// Record window-level gauges
167-
cwm.recordWindowGauges(windowIndex, wm)
168176
}
169177
}
170178

@@ -191,7 +199,8 @@ func (cwm *coordinationWindowMetrics) recordWalletCoordination(
191199
wm, exists := cwm.windows[windowIndex]
192200
if !exists {
193201
// Window not initialized, initialize it now
194-
cwm.recordWindowStart(window)
202+
// Note: we already hold the lock, so use the lock-free helper
203+
cwm.initializeWindowIfNeeded(window)
195204
wm = cwm.windows[windowIndex]
196205
}
197206

@@ -246,37 +255,6 @@ func (cwm *coordinationWindowMetrics) recordWalletCoordination(
246255
wm.WalletCoordinationDetails = append(wm.WalletCoordinationDetails, detail)
247256
}
248257

249-
// recordWindowGauges records gauge metrics for a specific window.
250-
func (cwm *coordinationWindowMetrics) recordWindowGauges(
251-
windowIndex uint64,
252-
wm *windowMetrics,
253-
) {
254-
// Record window-level gauges with window index suffix
255-
// These allow tracking individual window metrics
256-
windowSuffix := fmt.Sprintf("_window_%d", windowIndex)
257-
258-
cwm.performanceMetrics.SetGauge(
259-
clientinfo.MetricCoordinationWindowWalletsCoordinated+windowSuffix,
260-
float64(wm.WalletsCoordinated),
261-
)
262-
cwm.performanceMetrics.SetGauge(
263-
clientinfo.MetricCoordinationWindowWalletsSuccessful+windowSuffix,
264-
float64(wm.WalletsSuccessful),
265-
)
266-
cwm.performanceMetrics.SetGauge(
267-
clientinfo.MetricCoordinationWindowWalletsFailed+windowSuffix,
268-
float64(wm.WalletsFailed),
269-
)
270-
cwm.performanceMetrics.SetGauge(
271-
clientinfo.MetricCoordinationWindowTotalFaults+windowSuffix,
272-
float64(wm.TotalFaults),
273-
)
274-
cwm.performanceMetrics.SetGauge(
275-
clientinfo.MetricCoordinationWindowCoordinationBlock+windowSuffix,
276-
float64(wm.CoordinationBlock),
277-
)
278-
}
279-
280258
// GetWindowMetrics returns metrics for a specific window.
281259
func (cwm *coordinationWindowMetrics) GetWindowMetrics(windowIndex uint64) (*windowMetrics, bool) {
282260
cwm.mu.RLock()
@@ -287,9 +265,8 @@ func (cwm *coordinationWindowMetrics) GetWindowMetrics(windowIndex uint64) (*win
287265
return nil, false
288266
}
289267

290-
// Return a copy to avoid race conditions
291-
wmCopy := *wm
292-
return &wmCopy, true
268+
// Return a deep copy to avoid race conditions
269+
return wm.deepCopy(), true
293270
}
294271

295272
// GetRecentWindows returns metrics for the most recent N windows.
@@ -317,12 +294,11 @@ func (cwm *coordinationWindowMetrics) GetRecentWindows(limit int) []*windowMetri
317294
indices = indices[:limit]
318295
}
319296

320-
// Return copies
297+
// Return deep copies
321298
result := make([]*windowMetrics, 0, len(indices))
322299
for _, idx := range indices {
323300
wm := cwm.windows[idx]
324-
wmCopy := *wm
325-
result = append(result, &wmCopy)
301+
result = append(result, wm.deepCopy())
326302
}
327303

328304
return result
@@ -376,8 +352,7 @@ func (cwm *coordinationWindowMetrics) GetSummary() WindowMetricsSummary {
376352
summary.TotalWalletsFailed += wm.WalletsFailed
377353
summary.TotalFaults += wm.TotalFaults
378354

379-
wmCopy := *wm
380-
summary.Windows = append(summary.Windows, &wmCopy)
355+
summary.Windows = append(summary.Windows, wm.deepCopy())
381356
}
382357

383358
return summary
@@ -393,6 +368,53 @@ type WindowMetricsSummary struct {
393368
Windows []*windowMetrics `json:"windows"`
394369
}
395370

371+
// deepCopy creates a deep copy of windowMetrics, properly copying all maps and slices.
372+
func (wm *windowMetrics) deepCopy() *windowMetrics {
373+
if wm == nil {
374+
return nil
375+
}
376+
377+
wmCopy := &windowMetrics{
378+
WindowIndex: wm.WindowIndex,
379+
CoordinationBlock: wm.CoordinationBlock,
380+
StartTime: wm.StartTime,
381+
EndTime: wm.EndTime,
382+
Duration: wm.Duration,
383+
ActivePhaseEndBlock: wm.ActivePhaseEndBlock,
384+
EndBlock: wm.EndBlock,
385+
WalletsCoordinated: wm.WalletsCoordinated,
386+
WalletsSuccessful: wm.WalletsSuccessful,
387+
WalletsFailed: wm.WalletsFailed,
388+
TotalProceduresStarted: wm.TotalProceduresStarted,
389+
TotalProceduresCompleted: wm.TotalProceduresCompleted,
390+
TotalFaults: wm.TotalFaults,
391+
Leaders: make(map[string]uint64, len(wm.Leaders)),
392+
ActionTypes: make(map[string]uint64, len(wm.ActionTypes)),
393+
FaultsByType: make(map[string]uint64, len(wm.FaultsByType)),
394+
FaultsByCulprit: make(map[string]uint64, len(wm.FaultsByCulprit)),
395+
WalletCoordinationDetails: make([]walletCoordinationDetail, len(wm.WalletCoordinationDetails)),
396+
}
397+
398+
// Deep copy maps
399+
for k, v := range wm.Leaders {
400+
wmCopy.Leaders[k] = v
401+
}
402+
for k, v := range wm.ActionTypes {
403+
wmCopy.ActionTypes[k] = v
404+
}
405+
for k, v := range wm.FaultsByType {
406+
wmCopy.FaultsByType[k] = v
407+
}
408+
for k, v := range wm.FaultsByCulprit {
409+
wmCopy.FaultsByCulprit[k] = v
410+
}
411+
412+
// Deep copy slice
413+
copy(wmCopy.WalletCoordinationDetails, wm.WalletCoordinationDetails)
414+
415+
return wmCopy
416+
}
417+
396418
// String returns a string representation of window metrics for logging.
397419
func (wm *windowMetrics) String() string {
398420
return fmt.Sprintf(

pkg/tbtc/node.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,15 +1022,19 @@ func (n *node) runCoordinationLayer(
10221022
coordinationResultChan := make(chan *coordinationResult)
10231023

10241024
// Track the previous window to record its end when a new one starts
1025+
// Use a mutex to safely access from multiple goroutines
1026+
var previousWindowMu sync.Mutex
10251027
var previousWindow *coordinationWindow
10261028

10271029
// Prepare a callback function that will be called every time a new
10281030
// coordination window is detected.
10291031
onWindowFn := func(window *coordinationWindow) {
1032+
previousWindowMu.Lock()
10301033
// Record end of previous window if it exists
10311034
if previousWindow != nil && n.windowMetricsTracker != nil {
10321035
n.windowMetricsTracker.recordWindowEnd(previousWindow)
10331036
}
1037+
previousWindowMu.Unlock()
10341038

10351039
// Track coordination window detection
10361040
if n.performanceMetrics != nil {
@@ -1042,7 +1046,9 @@ func (n *node) runCoordinationLayer(
10421046
n.windowMetricsTracker.recordWindowStart(window)
10431047
}
10441048

1049+
previousWindowMu.Lock()
10451050
previousWindow = window
1051+
previousWindowMu.Unlock()
10461052

10471053
// Fetch all wallets controlled by the node. It is important to
10481054
// get the wallets every time the window is triggered as the
@@ -1085,6 +1091,17 @@ func (n *node) runCoordinationLayer(
10851091
}
10861092
}()
10871093

1094+
// Start a cleanup goroutine to record the end time of the last window on shutdown
1095+
go func() {
1096+
<-ctx.Done()
1097+
// Record end time for the active window if it exists and hasn't been ended yet
1098+
previousWindowMu.Lock()
1099+
if previousWindow != nil && n.windowMetricsTracker != nil {
1100+
n.windowMetricsTracker.recordWindowEnd(previousWindow)
1101+
}
1102+
previousWindowMu.Unlock()
1103+
}()
1104+
10881105
return nil
10891106
}
10901107

0 commit comments

Comments
 (0)