Skip to content

Commit b3b99ce

Browse files
committed
telemetry: consolidate geoprobe-agent metrics into geoprobe package
Move metrics from the shared metrics package into a Metrics struct in the geoprobe package. This replaces package-level promauto globals with explicit dependency injection, adds source/device_pubkey constant labels, and shortens metric names (doublezero_geoprobe_* instead of doublezero_device_geoprobe_agent_*).
1 parent: ad91e7f · commit: b3b99ce

4 files changed

Lines changed: 365 additions & 164 deletions

File tree

controlplane/telemetry/cmd/geoprobe-agent/main.go

Lines changed: 31 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -22,10 +22,10 @@ import (
2222
solanarpc "github.com/gagliardetto/solana-go/rpc"
2323
"github.com/malbeclabs/doublezero/config"
2424
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/geoprobe"
25-
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/metrics"
2625
geolocation "github.com/malbeclabs/doublezero/sdk/geolocation/go"
2726
twamplight "github.com/malbeclabs/doublezero/tools/twamp/pkg/light"
2827
"github.com/malbeclabs/doublezero/tools/twamp/pkg/signed"
28+
"github.com/prometheus/client_golang/prometheus"
2929
"github.com/prometheus/client_golang/prometheus/promhttp"
3030
)
3131

@@ -337,9 +337,11 @@ func main() {
337337
"geoprobe_pubkey", geoProbePubkey,
338338
)
339339

340-
// Set up prometheus metrics server if enabled.
340+
// Set up prometheus metrics.
341+
m := geoprobe.NewMetrics(geoprobe.SourceGeoProbeAgent, geoProbePubkey.String(), prometheus.DefaultRegisterer)
342+
341343
if *metricsEnable {
342-
metrics.GeoProbeBuildInfo.WithLabelValues(version, commit, date).Set(1)
344+
m.BuildInfo.WithLabelValues(version, commit, date).Set(1)
343345
go func() {
344346
listener, err := net.Listen("tcp", *metricsAddr)
345347
if err != nil {
@@ -491,7 +493,7 @@ func main() {
491493

492494
// Run UDP offset listener.
493495
go func() {
494-
runOffsetListener(ctx, log, offsetListener, cache, pState, signedReflector)
496+
runOffsetListener(ctx, log, offsetListener, cache, pState, signedReflector, m)
495497
}()
496498

497499
// Run eviction goroutine.
@@ -552,7 +554,7 @@ func main() {
552554
return
553555
case update := <-parentUpdateCh:
554556
pState.update(update.Authorities)
555-
metrics.GeoProbeParentsDiscovered.Set(float64(len(update.Authorities)))
557+
m.ParentsDiscovered.Set(float64(len(update.Authorities)))
556558
log.Info("Updated parent authorities from discovery",
557559
"totalParents", len(update.Authorities))
558560
}
@@ -585,12 +587,12 @@ func main() {
585587
if pd != nil {
586588
start := time.Now()
587589
pd.Tick(ctx, parentUpdateCh)
588-
metrics.GeoProbeParentDiscoveryDuration.Observe(time.Since(start).Seconds())
590+
m.ParentDiscoveryDuration.Observe(time.Since(start).Seconds())
589591
}
590592
if td != nil {
591593
start := time.Now()
592594
td.Tick(ctx, targetUpdateCh, inboundKeyCh, icmpTargetUpdateCh)
593-
metrics.GeoProbeTargetDiscoveryDuration.Observe(time.Since(start).Seconds())
595+
m.TargetDiscoveryDuration.Observe(time.Since(start).Seconds())
594596
}
595597
}
596598

@@ -621,6 +623,7 @@ func main() {
621623
senderConn: senderConn,
622624
getCurrentSlot: getCurrentSlot,
623625
signedReflector: signedReflector,
626+
metrics: m,
624627
targetUpdateCh: targetUpdateCh,
625628
icmpTargetUpdateCh: icmpTargetUpdateCh,
626629
inboundKeyCh: inboundKeyCh,
@@ -647,6 +650,7 @@ func runOffsetListener(
647650
cache *offsetCache,
648651
parents *parentState,
649652
signedReflector signed.Reflector,
653+
m *geoprobe.Metrics,
650654
) {
651655
log.Info("Starting offset listener", "addr", conn.LocalAddr().String())
652656

@@ -672,7 +676,7 @@ func runOffsetListener(
672676
return
673677
}
674678
log.Warn("Failed to receive offset", "error", err)
675-
metrics.GeoProbeErrors.WithLabelValues(metrics.GeoProbeErrorTypeOffsetReceive).Inc()
679+
m.Errors.WithLabelValues(geoprobe.ErrorTypeOffsetReceive).Inc()
676680
continue
677681
}
678682

@@ -685,7 +689,7 @@ func runOffsetListener(
685689
expectedAuthority, knownParent := parents.getAuthority(offset.SenderPubkey)
686690
if !knownParent {
687691
log.Debug("Rejecting offset from unknown parent", "sender_pubkey", senderPK, "addr", addr)
688-
metrics.GeoProbeOffsetsRejected.WithLabelValues(metrics.GeoProbeRejectUnknownParent).Inc()
692+
m.OffsetsRejected.WithLabelValues(geoprobe.RejectUnknownParent).Inc()
689693
continue
690694
}
691695
if expectedAuthority != offset.AuthorityPubkey {
@@ -694,22 +698,22 @@ func runOffsetListener(
694698
"expected_authority", solana.PublicKeyFromBytes(expectedAuthority[:]).String(),
695699
"actual_authority", authorityPK,
696700
"addr", addr)
697-
metrics.GeoProbeOffsetsRejected.WithLabelValues(metrics.GeoProbeRejectWrongAuthority).Inc()
701+
m.OffsetsRejected.WithLabelValues(geoprobe.RejectWrongAuthority).Inc()
698702
continue
699703
}
700704

701705
// Verify signature chain (top-level and all references).
702706
if err := geoprobe.VerifyOffsetChain(offset); err != nil {
703707
log.Warn("Offset signature verification failed", "authority_pubkey", authorityPK, "addr", addr, "error", err)
704-
metrics.GeoProbeOffsetsRejected.WithLabelValues(metrics.GeoProbeRejectInvalidSignature).Inc()
708+
m.OffsetsRejected.WithLabelValues(geoprobe.RejectInvalidSignature).Inc()
705709
continue
706710
}
707711

708712
log.Debug("signature verification successful", "authority_pubkey", authorityPK)
709713

710714
cache.Put(offset)
711715
signedReflector.SetOffsets(marshalBestOffset(cache))
712-
metrics.GeoProbeOffsetsReceived.Inc()
716+
m.OffsetsReceived.Inc()
713717

714718
log.Debug("Cached DZD offset",
715719
"authority_pubkey", authorityPK,
@@ -734,6 +738,7 @@ type measurementLoop struct {
734738
senderConn *net.UDPConn
735739
getCurrentSlot func(ctx context.Context) (uint64, error)
736740
signedReflector signed.Reflector
741+
metrics *geoprobe.Metrics
737742

738743
targets []geoprobe.ProbeAddress
739744
icmpTargets []geoprobe.ProbeAddress
@@ -810,10 +815,10 @@ func (ml *measurementLoop) run() error {
810815
func(addr geoprobe.ProbeAddress) (uint64, bool) { return ml.pinger.MeasureOne(ml.ctx, addr) },
811816
)
812817
ml.targets = newTargets
813-
metrics.GeoProbeTargetsDiscovered.Set(float64(len(ml.targets)))
818+
ml.metrics.TargetsDiscovered.Set(float64(len(ml.targets)))
814819
ml.log.Info("Updated targets from discovery", "totalTargets", len(ml.targets))
815820
if len(rttData) > 0 {
816-
sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot)
821+
sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot, ml.metrics)
817822
}
818823

819824
case icmpUpdate := <-ml.icmpTargetUpdateCh:
@@ -825,10 +830,10 @@ func (ml *measurementLoop) run() error {
825830
func(addr geoprobe.ProbeAddress) (uint64, bool) { return ml.icmpPinger.MeasureOne(ml.ctx, addr) },
826831
)
827832
ml.icmpTargets = newTargets
828-
metrics.GeoProbeIcmpTargetsDiscovered.Set(float64(len(ml.icmpTargets)))
833+
ml.metrics.IcmpTargetsDiscovered.Set(float64(len(ml.icmpTargets)))
829834
ml.log.Info("Updated ICMP targets from discovery", "totalIcmpTargets", len(ml.icmpTargets))
830835
if len(rttData) > 0 {
831-
sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot)
836+
sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot, ml.metrics)
832837
}
833838

834839
case keyUpdate := <-ml.inboundKeyCh:
@@ -848,7 +853,7 @@ func (ml *measurementLoop) runCycle() {
848853
ml.log.Debug("Starting measurement cycle", "targets", len(ml.targets), "icmpTargets", len(ml.icmpTargets))
849854
start := time.Now()
850855
defer func() {
851-
metrics.GeoProbeMeasurementCycleDuration.Observe(time.Since(start).Seconds())
856+
ml.metrics.MeasurementCycleDuration.Observe(time.Since(start).Seconds())
852857
}()
853858

854859
rttData := make(map[geoprobe.ProbeAddress]uint64)
@@ -857,7 +862,7 @@ func (ml *measurementLoop) runCycle() {
857862
twampResults, err := ml.pinger.MeasureAll(ml.ctx)
858863
if err != nil {
859864
ml.log.Error("Failed to measure TWAMP targets", "error", err)
860-
metrics.GeoProbeErrors.WithLabelValues(metrics.GeoProbeErrorTypeMeasurementCycle).Inc()
865+
ml.metrics.Errors.WithLabelValues(geoprobe.ErrorTypeMeasurementCycle).Inc()
861866
} else {
862867
for k, v := range twampResults {
863868
rttData[k] = v
@@ -868,10 +873,10 @@ func (ml *measurementLoop) runCycle() {
868873
if len(ml.icmpTargets) > 0 {
869874
icmpStart := time.Now()
870875
icmpResults, err := ml.icmpPinger.MeasureAll(ml.ctx)
871-
metrics.GeoProbeIcmpMeasurementCycleDuration.Observe(time.Since(icmpStart).Seconds())
876+
ml.metrics.IcmpMeasurementCycleDuration.Observe(time.Since(icmpStart).Seconds())
872877
if err != nil {
873878
ml.log.Error("Failed to measure ICMP targets", "error", err)
874-
metrics.GeoProbeErrors.WithLabelValues(metrics.GeoProbeErrorTypeIcmpMeasurementCycle).Inc()
879+
ml.metrics.Errors.WithLabelValues(geoprobe.ErrorTypeIcmpMeasurementCycle).Inc()
875880
} else {
876881
for k, v := range icmpResults {
877882
rttData[k] = v
@@ -888,7 +893,7 @@ func (ml *measurementLoop) runCycle() {
888893
ml.log.Debug("target measurement result", "target", addr.Host, "rtt_ms", float64(rttNs)/1000000.0)
889894
}
890895

891-
sent := sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot)
896+
sent := sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot, ml.metrics)
892897

893898
ml.log.Info("Completed measurement cycle",
894899
"measured", len(rttData),
@@ -905,6 +910,7 @@ func sendCompositeOffsets(
905910
signer *geoprobe.OffsetSigner,
906911
senderConn *net.UDPConn,
907912
getCurrentSlot func(ctx context.Context) (uint64, error),
913+
m *geoprobe.Metrics,
908914
) int {
909915
dzdOffset := cache.GetBest()
910916
if dzdOffset == nil {
@@ -915,7 +921,7 @@ func sendCompositeOffsets(
915921
slot, err := getCurrentSlot(ctx)
916922
if err != nil {
917923
log.Error("Failed to get current slot", "error", err)
918-
metrics.GeoProbeErrors.WithLabelValues(metrics.GeoProbeErrorTypeSlotFetch).Inc()
924+
m.Errors.WithLabelValues(geoprobe.ErrorTypeSlotFetch).Inc()
919925
return 0
920926
}
921927

@@ -937,19 +943,19 @@ func sendCompositeOffsets(
937943

938944
if err := signer.SignOffset(&compositeOffset); err != nil {
939945
log.Error("Failed to sign composite offset", "target", addr, "error", err)
940-
metrics.GeoProbeErrors.WithLabelValues(metrics.GeoProbeErrorTypeSignOffset).Inc()
946+
m.Errors.WithLabelValues(geoprobe.ErrorTypeSignOffset).Inc()
941947
continue
942948
}
943949

944950
targetAddr := &net.UDPAddr{IP: net.ParseIP(addr.Host), Port: int(addr.Port)}
945951
if err := geoprobe.SendOffset(senderConn, targetAddr, &compositeOffset); err != nil {
946952
log.Error("Failed to send composite offset", "target", addr, "error", err)
947-
metrics.GeoProbeErrors.WithLabelValues(metrics.GeoProbeErrorTypeSendOffset).Inc()
953+
m.Errors.WithLabelValues(geoprobe.ErrorTypeSendOffset).Inc()
948954
continue
949955
}
950956

951957
sentCount++
952-
metrics.GeoProbeCompositeOffsetsSent.Inc()
958+
m.CompositeOffsetsSent.Inc()
953959
log.Debug("Sent composite offset to target",
954960
"target", addr,
955961
"slot", slot,

0 commit comments

Comments
 (0)