Skip to content

Commit fc2dfd8

Browse files
committed
feat: add process time histogram metrics
1 parent e865741 commit fc2dfd8

11 files changed

Lines changed: 141 additions & 34 deletions

File tree

app/app.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/sprintertech/sprinter-signing/chains/evm/calls/events"
3535
evmListener "github.com/sprintertech/sprinter-signing/chains/evm/listener"
3636
evmMessage "github.com/sprintertech/sprinter-signing/chains/evm/message"
37+
"github.com/sprintertech/sprinter-signing/metrics"
3738

3839
lifiConfig "github.com/sprintertech/lifi-solver/pkg/config"
3940
"github.com/sprintertech/sprinter-signing/chains/lighter"
@@ -44,7 +45,6 @@ import (
4445
"github.com/sprintertech/sprinter-signing/config"
4546
"github.com/sprintertech/sprinter-signing/jobs"
4647
"github.com/sprintertech/sprinter-signing/keyshare"
47-
"github.com/sprintertech/sprinter-signing/metrics"
4848
"github.com/sprintertech/sprinter-signing/price"
4949
"github.com/sprintertech/sprinter-signing/protocol/across"
5050
"github.com/sprintertech/sprinter-signing/protocol/lifi"
@@ -109,16 +109,8 @@ func Run() error {
109109
panicOnError(err)
110110
log.Info().Str("peerID", host.ID().String()).Msg("Successfully created libp2p host")
111111

112-
communication := p2p.NewCommunication(host, "p2p/sprinter")
113-
electorFactory := elector.NewCoordinatorElectorFactory(host, configuration.RelayerConfig.BullyConfig)
114-
coordinator := tss.NewCoordinator(host, communication, electorFactory)
115-
116-
db, err := lvldb.NewLvlDB(viper.GetString(config.BlockstoreFlagName))
117-
if err != nil {
118-
panicOnError(err)
119-
}
120-
blockstore := store.NewBlockStore(db)
121-
keyshareStore := keyshare.NewECDSAKeyshareStore(configuration.RelayerConfig.MpcConfig.KeysharePath)
112+
ctx, cancel := context.WithCancel(context.Background())
113+
defer cancel()
122114

123115
mp, err := observability.InitMetricProvider(context.Background(), configuration.RelayerConfig.OpenTelemetryCollectorURL)
124116
panicOnError(err)
@@ -127,22 +119,30 @@ func Run() error {
127119
log.Error().Msgf("Error shutting down meter provider: %v", err)
128120
}
129121
}()
122+
sygmaMetrics, err := metrics.NewSprinterMetrics(ctx, mp.Meter("relayer-metric-provider"), configuration.RelayerConfig.Env, configuration.RelayerConfig.Id, Version)
123+
if err != nil {
124+
panic(err)
125+
}
130126

131-
ctx, cancel := context.WithCancel(context.Background())
132-
defer cancel()
127+
communication := p2p.NewCommunication(host, "p2p/sprinter")
128+
electorFactory := elector.NewCoordinatorElectorFactory(host, configuration.RelayerConfig.BullyConfig)
129+
coordinator := tss.NewCoordinator(host, communication, sygmaMetrics, electorFactory)
133130

134-
sygmaMetrics, err := metrics.NewSygmaMetrics(ctx, mp.Meter("relayer-metric-provider"), configuration.RelayerConfig.Env, configuration.RelayerConfig.Id, Version)
131+
db, err := lvldb.NewLvlDB(viper.GetString(config.BlockstoreFlagName))
135132
if err != nil {
136-
panic(err)
133+
panicOnError(err)
137134
}
135+
blockstore := store.NewBlockStore(db)
136+
keyshareStore := keyshare.NewECDSAKeyshareStore(configuration.RelayerConfig.MpcConfig.KeysharePath)
137+
138138
msgChan := make(chan []*message.Message)
139139
sigChn := make(chan interface{})
140140

141141
priceAPI := price.NewCoinmarketcapAPI(
142142
configuration.RelayerConfig.CoinmarketcapConfig.Url,
143143
configuration.RelayerConfig.CoinmarketcapConfig.ApiKey)
144144

145-
signatureCache := cache.NewSignatureCache(communication)
145+
signatureCache := cache.NewSignatureCache(communication, sygmaMetrics)
146146
go signatureCache.Watch(ctx, sigChn)
147147

148148
supportedChains := make(map[uint64]struct{})

cache/signature.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,25 @@ const (
1616
SIGNATURE_TTL = time.Minute * 10
1717
)
1818

19+
// Metrics is the metrics hook accepted by NewSignatureCache and stored on
// SignatureCache.
type Metrics interface {
20+
// EndProcess is invoked by SignatureCache.Watch with the signature ID
// immediately after that signature has been written to the cache.
EndProcess(sessionID string)
21+
}
22+
1923
// SignatureCache keeps signature bytes keyed by their ID in a TTL cache and
// reports to a Metrics hook whenever a signature is stored.
type SignatureCache struct {
2024
// sigCache maps a signature ID to the raw signature bytes; the cache is
// constructed with SIGNATURE_TTL as its TTL.
sigCache *ttlcache.Cache[string, []byte]
2125
// comm is the communication layer handed to NewSignatureCache.
// NOTE(review): its use is outside the visible hunks — presumably the
// source of broadcast signature messages; confirm in Watch.
comm comm.Communication
26+
// metrics receives EndProcess(ID) after a signature is cached.
metrics Metrics
2227
}
2328

24-
func NewSignatureCache(c comm.Communication) *SignatureCache {
29+
func NewSignatureCache(c comm.Communication, metrics Metrics) *SignatureCache {
2530
cache := ttlcache.New(
2631
ttlcache.WithTTL[string, []byte](SIGNATURE_TTL),
2732
)
2833

2934
sc := &SignatureCache{
3035
sigCache: cache,
3136
comm: c,
37+
metrics: metrics,
3238
}
3339

3440
go cache.Start()
@@ -43,7 +49,7 @@ func (s *SignatureCache) Subscribe(ctx context.Context, id string, sigChannel ch
4349
return
4450
}
4551

46-
ping := time.Tick(time.Millisecond * 250)
52+
ping := time.Tick(time.Millisecond * 50)
4753
for {
4854
select {
4955
case <-ctx.Done():
@@ -81,6 +87,7 @@ func (s *SignatureCache) Watch(ctx context.Context, sigChn chan interface{}) {
8187
{
8288
sig := sig.(signing.EcdsaSignature)
8389
s.sigCache.Set(sig.ID, sig.Signature, ttlcache.DefaultTTL)
90+
s.metrics.EndProcess(sig.ID)
8491
}
8592
case msg := <-msgChn:
8693
{
@@ -92,6 +99,7 @@ func (s *SignatureCache) Watch(ctx context.Context, sigChn chan interface{}) {
9299

93100
log.Debug().Msgf("Received signature for ID: %s", msg.ID)
94101
s.sigCache.Set(msg.ID, msg.Signature, ttlcache.DefaultTTL)
102+
s.metrics.EndProcess(msg.ID)
95103
}
96104
case <-ctx.Done():
97105
{

cache/signature_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
mock_communication "github.com/sprintertech/sprinter-signing/comm/mock"
1111
"github.com/sprintertech/sprinter-signing/tss/ecdsa/signing"
1212
"github.com/sprintertech/sprinter-signing/tss/message"
13+
mock_tss "github.com/sprintertech/sprinter-signing/tss/mock"
1314
"github.com/stretchr/testify/suite"
1415
"go.uber.org/mock/gomock"
1516
)
@@ -21,6 +22,7 @@ type SignatureCacheTestSuite struct {
2122
ctx context.Context
2223

2324
mockCommunication *mock_communication.MockCommunication
25+
mockMetrics *mock_tss.MockMetrics
2426
cancel context.CancelFunc
2527
sigChn chan interface{}
2628
msgChn chan *comm.WrappedMessage
@@ -46,7 +48,8 @@ func (s *SignatureCacheTestSuite) SetupTest() {
4648
s.cancel = cancel
4749
s.ctx = ctx
4850

49-
s.sc = cache.NewSignatureCache(s.mockCommunication)
51+
s.mockMetrics = mock_tss.NewMockMetrics(gomock.NewController(s.T()))
52+
s.sc = cache.NewSignatureCache(s.mockCommunication, s.mockMetrics)
5053
go s.sc.Watch(s.ctx, s.sigChn)
5154
time.Sleep(time.Millisecond * 100)
5255
}
@@ -65,6 +68,7 @@ func (s *SignatureCacheTestSuite) Test_Signature_ValidSignatureResult() {
6568
Signature: []byte("signature"),
6669
ID: "signatureID",
6770
}
71+
s.mockMetrics.EXPECT().EndProcess(expectedSig.ID)
6872
s.sigChn <- expectedSig
6973
time.Sleep(time.Millisecond * 100)
7074

@@ -79,6 +83,7 @@ func (s *SignatureCacheTestSuite) Test_Signature_ValidMessage() {
7983
Signature: []byte("signature"),
8084
ID: "signatureID",
8185
}
86+
s.mockMetrics.EXPECT().EndProcess(expectedSig.ID)
8287
wMsgBytes, _ := message.MarshalSignatureMessage(expectedSig.ID, expectedSig.Signature)
8388
wMsg := &comm.WrappedMessage{
8489
Payload: wMsgBytes,
@@ -98,6 +103,7 @@ func (s *SignatureCacheTestSuite) Test_Subscribe_ValidMessage_EarlyExit() {
98103
Signature: []byte("signature"),
99104
ID: "signatureID",
100105
}
106+
s.mockMetrics.EXPECT().EndProcess(expectedSig.ID)
101107
wMsgBytes, _ := message.MarshalSignatureMessage(expectedSig.ID, expectedSig.Signature)
102108
wMsg := &comm.WrappedMessage{
103109
Payload: wMsgBytes,
@@ -122,6 +128,7 @@ func (s *SignatureCacheTestSuite) Test_Subscribe_ValidMessage() {
122128
Signature: []byte("signature"),
123129
ID: "signatureID",
124130
}
131+
s.mockMetrics.EXPECT().EndProcess(expectedSig.ID)
125132
wMsgBytes, _ := message.MarshalSignatureMessage(expectedSig.ID, expectedSig.Signature)
126133
wMsg := &comm.WrappedMessage{
127134
Payload: wMsgBytes,

metrics/metrics.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ import (
1111
api "go.opentelemetry.io/otel/metric"
1212
)
1313

14-
type SygmaMetrics struct {
14+
type SprinterMetrics struct {
1515
*observability.RelayerMetrics
1616
*MpcMetrics
1717
*HostMetrics
1818
}
1919

2020
// NewSprinterMetrics creates an instance of metrics
21-
func NewSygmaMetrics(ctx context.Context, meter api.Meter, env, relayerID, version string) (*SygmaMetrics, error) {
21+
func NewSprinterMetrics(ctx context.Context, meter api.Meter, env, relayerID, version string) (*SprinterMetrics, error) {
2222
attributes := []attribute.KeyValue{attribute.String("relayerid", relayerID), attribute.String("env", env), attribute.String("version", version)}
2323
opts := api.WithAttributes(attributes...)
2424
relayerMetrics, err := observability.NewRelayerMetrics(ctx, meter, attributes...)
@@ -36,7 +36,7 @@ func NewSygmaMetrics(ctx context.Context, meter api.Meter, env, relayerID, versi
3636
return nil, err
3737
}
3838

39-
return &SygmaMetrics{
39+
return &SprinterMetrics{
4040
RelayerMetrics: relayerMetrics,
4141
MpcMetrics: mpcMetrics,
4242
HostMetrics: hostMetrics,

metrics/mpc.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package metrics
22

33
import (
44
"context"
5+
"sync"
6+
"time"
57

68
"github.com/libp2p/go-libp2p/core/peer"
9+
"github.com/rs/zerolog/log"
710
"go.opentelemetry.io/otel/metric"
811
)
912

@@ -12,6 +15,10 @@ type MpcMetrics struct {
1215
availableRelayersGauge metric.Int64ObservableGauge
1316
totalRelayerCount *int64
1417
availableRelayerCount *int64
18+
19+
sessionTimeHistogram metric.Float64Histogram
20+
sessionStartTime map[string]time.Time
21+
histogramMutex sync.Mutex
1522
}
1623

1724
// NewMpcMetrics initializes metrics related to the MPC set
@@ -41,15 +48,40 @@ func NewMpcMetrics(ctx context.Context, meter metric.Meter, opts metric.Measurem
4148
return nil, err
4249
}
4350

51+
sessionTimeHistogram, err := meter.Float64Histogram("relayer.SessionTime")
52+
4453
return &MpcMetrics{
4554
totalRelayersGauge: totalRelayersGauge,
4655
availableRelayersGauge: availableRelayersGauge,
4756
totalRelayerCount: totalRelayerCount,
4857
availableRelayerCount: availableRelayerCount,
58+
sessionTimeHistogram: sessionTimeHistogram,
59+
sessionStartTime: make(map[string]time.Time),
60+
histogramMutex: sync.Mutex{},
4961
}, nil
5062
}
5163

5264
// TrackRelayerStatus refreshes the relayer counters from the given peer
// sets: the total count becomes len(all) and the available count becomes
// len(all) minus len(unavailable).
func (m *MpcMetrics) TrackRelayerStatus(unavailable peer.IDSlice, all peer.IDSlice) {
	total := len(all)
	down := len(unavailable)

	*m.totalRelayerCount = int64(total)
	*m.availableRelayerCount = int64(total - down)
}
68+
69+
func (m *MpcMetrics) StartProcess(sessionID string) {
70+
m.histogramMutex.Lock()
71+
defer m.histogramMutex.Unlock()
72+
73+
m.sessionStartTime[sessionID] = time.Now()
74+
}
75+
76+
func (m *MpcMetrics) EndProcess(sessionID string) {
77+
m.histogramMutex.Lock()
78+
defer m.histogramMutex.Unlock()
79+
80+
startTime, ok := m.sessionStartTime[sessionID]
81+
if !ok {
82+
log.Warn().Msgf("Session start time with ID %s not found", sessionID)
83+
return
84+
}
85+
86+
m.sessionTimeHistogram.Record(context.Background(), time.Since(startTime).Seconds())
87+
}

tss/coordinator.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,19 @@ type TssProcess interface {
3737
Timeout() time.Duration
3838
}
3939

40+
// Metrics is the metrics hook accepted by NewCoordinator and stored on
// Coordinator.
type Metrics interface {
41+
// StartProcess is invoked by Coordinator.Execute with the session ID
// right after the session is marked as pending.
StartProcess(sessionID string)
42+
// EndProcess is the counterpart of StartProcess.
// NOTE(review): no call site is visible in the coordinator hunks shown
// here — confirm where the coordinator side ends a session.
EndProcess(sessionID string)
43+
}
44+
4045
type Coordinator struct {
4146
host host.Host
4247
communication comm.Communication
4348
electorFactory *elector.CoordinatorElectorFactory
4449

4550
pendingProcesses map[string]bool
4651
processLock sync.Mutex
52+
metrics Metrics
4753

4854
CoordinatorTimeout time.Duration
4955
InitiatePeriod time.Duration
@@ -52,12 +58,14 @@ type Coordinator struct {
5258
func NewCoordinator(
5359
host host.Host,
5460
communication comm.Communication,
61+
metrics Metrics,
5562
electorFactory *elector.CoordinatorElectorFactory,
5663
) *Coordinator {
5764
return &Coordinator{
5865
host: host,
5966
communication: communication,
6067
electorFactory: electorFactory,
68+
metrics: metrics,
6169

6270
pendingProcesses: make(map[string]bool),
6371

@@ -79,6 +87,7 @@ func (c *Coordinator) Execute(ctx context.Context, tssProcesses []TssProcess, re
7987
c.processLock.Lock()
8088
c.pendingProcesses[sessionID] = true
8189
c.processLock.Unlock()
90+
c.metrics.StartProcess(sessionID)
8291

8392
ctx, cancel := context.WithCancel(ctx)
8493
p := pool.New().WithContext(ctx).WithCancelOnError()

tss/ecdsa/keygen/keygen_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"testing"
99
"time"
1010

11-
"github.com/golang/mock/gomock"
1211
"github.com/libp2p/go-libp2p/core/peer"
1312
"github.com/sourcegraph/conc/pool"
1413
"github.com/sprintertech/sprinter-signing/comm"
@@ -17,6 +16,7 @@ import (
1716
"github.com/sprintertech/sprinter-signing/tss/ecdsa/keygen"
1817
tsstest "github.com/sprintertech/sprinter-signing/tss/test"
1918
"github.com/stretchr/testify/suite"
19+
"go.uber.org/mock/gomock"
2020
)
2121

2222
type KeygenTestSuite struct {
@@ -40,7 +40,7 @@ func (s *KeygenTestSuite) Test_ValidKeygenProcess() {
4040
communicationMap[host.ID()] = &communication
4141
keygen := keygen.NewKeygen("keygen", s.Threshold, host, &communication, s.MockECDSAStorer)
4242
electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig)
43-
coordinator := tss.NewCoordinator(host, &communication, electorFactory)
43+
coordinator := tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)
4444
coordinators = append(coordinators, coordinator)
4545
processes = append(processes, keygen)
4646
}
@@ -76,7 +76,7 @@ func (s *KeygenTestSuite) Test_KeygenTimeout() {
7676
communicationMap[host.ID()] = &communication
7777
keygen := keygen.NewKeygen("keygen2", s.Threshold, host, &communication, s.MockECDSAStorer)
7878
electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig)
79-
coordinator := tss.NewCoordinator(host, &communication, electorFactory)
79+
coordinator := tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)
8080
keygen.TssTimeout = time.Millisecond
8181
coordinators = append(coordinators, coordinator)
8282
processes = append(processes, keygen)

tss/ecdsa/resharing/resharing_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"fmt"
99
"testing"
1010

11-
"github.com/golang/mock/gomock"
1211
"github.com/libp2p/go-libp2p/core/host"
1312
"github.com/libp2p/go-libp2p/core/peer"
1413
"github.com/libp2p/go-libp2p/core/peerstore"
@@ -20,6 +19,7 @@ import (
2019
"github.com/sprintertech/sprinter-signing/tss/ecdsa/resharing"
2120
tsstest "github.com/sprintertech/sprinter-signing/tss/test"
2221
"github.com/stretchr/testify/suite"
22+
"go.uber.org/mock/gomock"
2323
)
2424

2525
type ResharingTestSuite struct {
@@ -60,7 +60,7 @@ func (s *ResharingTestSuite) Test_ValidResharingProcess_OldAndNewSubset() {
6060
s.MockECDSAStorer.EXPECT().StoreKeyshare(gomock.Any()).Return(nil)
6161
resharing := resharing.NewResharing("resharing2", 1, host, &communication, s.MockECDSAStorer)
6262
electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig)
63-
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory))
63+
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory))
6464
processes = append(processes, resharing)
6565
}
6666
tsstest.SetupCommunication(communicationMap)
@@ -107,7 +107,7 @@ func (s *ResharingTestSuite) Test_ValidResharingProcess_RemovePeer() {
107107
s.MockECDSAStorer.EXPECT().StoreKeyshare(gomock.Any()).Return(nil)
108108
resharing := resharing.NewResharing("resharing2", 1, host, &communication, s.MockECDSAStorer)
109109
electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig)
110-
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory))
110+
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory))
111111
processes = append(processes, resharing)
112112
}
113113
tsstest.SetupCommunication(communicationMap)
@@ -157,7 +157,7 @@ func (s *ResharingTestSuite) Test_InvalidResharingProcess_InvalidOldThreshold_Le
157157
s.MockECDSAStorer.EXPECT().GetKeyshare().Return(share, nil)
158158
resharing := resharing.NewResharing("resharing3", 1, host, &communication, s.MockECDSAStorer)
159159
electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig)
160-
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory))
160+
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory))
161161
processes = append(processes, resharing)
162162
}
163163
tsstest.SetupCommunication(communicationMap)
@@ -206,7 +206,7 @@ func (s *ResharingTestSuite) Test_InvalidResharingProcess_InvalidOldThreshold_Bi
206206
s.MockECDSAStorer.EXPECT().GetKeyshare().Return(share, nil)
207207
resharing := resharing.NewResharing("resharing4", 1, host, &communication, s.MockECDSAStorer)
208208
electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig)
209-
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory))
209+
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory))
210210
processes = append(processes, resharing)
211211
}
212212
tsstest.SetupCommunication(communicationMap)

0 commit comments

Comments
 (0)