Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 71 additions & 28 deletions controlplane/telemetry/cmd/geoprobe-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ func main() {
getCurrentSlot: getCurrentSlot,
signedReflector: signedReflector,
metrics: m,
dnsCache: geoprobe.NewDNSCache(5 * time.Minute),
targetUpdateCh: targetUpdateCh,
icmpTargetUpdateCh: icmpTargetUpdateCh,
inboundKeyCh: inboundKeyCh,
Expand Down Expand Up @@ -739,9 +740,12 @@ type measurementLoop struct {
getCurrentSlot func(ctx context.Context) (uint64, error)
signedReflector signed.Reflector
metrics *geoprobe.Metrics
dnsCache *geoprobe.DNSCache

targets []geoprobe.ProbeAddress
icmpTargets []geoprobe.ProbeAddress
targets []geoprobe.ProbeAddress
icmpTargets []geoprobe.ProbeAddress
deliveryAddrs map[geoprobe.ProbeAddress]string
icmpDeliveryAddrs map[geoprobe.ProbeAddress]string

targetUpdateCh <-chan geoprobe.TargetUpdate
icmpTargetUpdateCh <-chan geoprobe.ICMPTargetUpdate
Expand Down Expand Up @@ -815,10 +819,11 @@ func (ml *measurementLoop) run() error {
func(addr geoprobe.ProbeAddress) (uint64, bool) { return ml.pinger.MeasureOne(ml.ctx, addr) },
)
ml.targets = newTargets
ml.deliveryAddrs = update.DeliveryAddrs
ml.metrics.TargetsDiscovered.Set(float64(len(ml.targets)))
ml.log.Info("Updated targets from discovery", "totalTargets", len(ml.targets))
if len(rttData) > 0 {
sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot, ml.metrics)
ml.sendCompositeOffsets(rttData, ml.deliveryAddrs, nil)
}

case icmpUpdate := <-ml.icmpTargetUpdateCh:
Expand All @@ -830,10 +835,15 @@ func (ml *measurementLoop) run() error {
func(addr geoprobe.ProbeAddress) (uint64, bool) { return ml.icmpPinger.MeasureOne(ml.ctx, addr) },
)
ml.icmpTargets = newTargets
ml.icmpDeliveryAddrs = icmpUpdate.DeliveryAddrs
ml.metrics.IcmpTargetsDiscovered.Set(float64(len(ml.icmpTargets)))
ml.log.Info("Updated ICMP targets from discovery", "totalIcmpTargets", len(ml.icmpTargets))
if len(rttData) > 0 {
sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot, ml.metrics)
icmpSet := make(map[geoprobe.ProbeAddress]struct{}, len(rttData))
for addr := range rttData {
icmpSet[addr] = struct{}{}
}
ml.sendCompositeOffsets(rttData, ml.icmpDeliveryAddrs, icmpSet)
}

case keyUpdate := <-ml.inboundKeyCh:
Expand Down Expand Up @@ -893,7 +903,21 @@ func (ml *measurementLoop) runCycle() {
ml.log.Debug("target measurement result", "target", addr.Host, "rtt_ms", float64(rttNs)/1000000.0)
}

sent := sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot, ml.metrics)
// Merge delivery addrs from both target types and build ICMP set.
// Keys are disjoint: outbound targets have TWAMPPort set, ICMP targets have TWAMPPort=0.
mergedDelivery := make(map[geoprobe.ProbeAddress]string, len(ml.deliveryAddrs)+len(ml.icmpDeliveryAddrs))
for k, v := range ml.deliveryAddrs {
mergedDelivery[k] = v
}
for k, v := range ml.icmpDeliveryAddrs {
mergedDelivery[k] = v
}
icmpSet := make(map[geoprobe.ProbeAddress]struct{}, len(ml.icmpTargets))
for _, t := range ml.icmpTargets {
icmpSet[t] = struct{}{}
}

sent := ml.sendCompositeOffsets(rttData, mergedDelivery, icmpSet)

ml.log.Info("Completed measurement cycle",
"measured", len(rttData),
Expand All @@ -902,33 +926,52 @@ func (ml *measurementLoop) runCycle() {
"total_icmp_targets", len(ml.icmpTargets))
}

func sendCompositeOffsets(
ctx context.Context,
log *slog.Logger,
func (ml *measurementLoop) sendCompositeOffsets(
rttData map[geoprobe.ProbeAddress]uint64,
cache *offsetCache,
signer *geoprobe.OffsetSigner,
senderConn *net.UDPConn,
getCurrentSlot func(ctx context.Context) (uint64, error),
m *geoprobe.Metrics,
deliveryAddrs map[geoprobe.ProbeAddress]string,
icmpTargets map[geoprobe.ProbeAddress]struct{},
) int {
dzdOffset := cache.GetBest()
dzdOffset := ml.cache.GetBest()
if dzdOffset == nil {
log.Warn("No valid DZD offsets in cache, skipping composite generation")
ml.log.Warn("No valid DZD offsets in cache, skipping composite generation")
return 0
}

slot, err := getCurrentSlot(ctx)
slot, err := ml.getCurrentSlot(ml.ctx)
if err != nil {
log.Error("Failed to get current slot", "error", err)
m.Errors.WithLabelValues(geoprobe.ErrorTypeSlotFetch).Inc()
ml.log.Error("Failed to get current slot", "error", err)
ml.metrics.Errors.WithLabelValues(geoprobe.ErrorTypeSlotFetch).Inc()
return 0
}

log.Debug("fetched current slot", "slot", slot)
ml.log.Debug("fetched current slot", "slot", slot)

sentCount := 0
for addr, measuredRttNs := range rttData {
// Determine where to deliver the offset.
var targetAddr *net.UDPAddr
deliveryDest, hasDelivery := deliveryAddrs[addr]
_, isICMP := icmpTargets[addr]

if hasDelivery {
// Result destination override — resolve (may involve DNS).
resolved, resolveErr := ml.dnsCache.Resolve(deliveryDest)
if resolveErr != nil {
ml.log.Warn("Failed to resolve delivery address, skipping target",
"target", addr, "delivery", deliveryDest, "error", resolveErr)
continue
}
targetAddr = resolved
} else if isICMP {
// ICMP target without a result destination — nothing is listening.
ml.log.Debug("ICMP target has no result destination, skipping offset delivery",
"target", addr.Host)
continue
} else {
// Standard outbound target — send to the measurement address.
targetAddr = &net.UDPAddr{IP: net.ParseIP(addr.Host), Port: int(addr.Port)}
}

compositeOffset := geoprobe.LocationOffset{
Version: geoprobe.LocationOffsetVersion,
MeasurementSlot: slot,
Expand All @@ -941,23 +984,23 @@ func sendCompositeOffsets(
References: []geoprobe.LocationOffset{*dzdOffset},
}

if err := signer.SignOffset(&compositeOffset); err != nil {
log.Error("Failed to sign composite offset", "target", addr, "error", err)
m.Errors.WithLabelValues(geoprobe.ErrorTypeSignOffset).Inc()
if err := ml.signer.SignOffset(&compositeOffset); err != nil {
ml.log.Error("Failed to sign composite offset", "target", addr, "error", err)
ml.metrics.Errors.WithLabelValues(geoprobe.ErrorTypeSignOffset).Inc()
continue
}

targetAddr := &net.UDPAddr{IP: net.ParseIP(addr.Host), Port: int(addr.Port)}
if err := geoprobe.SendOffset(senderConn, targetAddr, &compositeOffset); err != nil {
log.Error("Failed to send composite offset", "target", addr, "error", err)
m.Errors.WithLabelValues(geoprobe.ErrorTypeSendOffset).Inc()
if err := geoprobe.SendOffset(ml.senderConn, targetAddr, &compositeOffset); err != nil {
ml.log.Error("Failed to send composite offset", "target", addr, "error", err)
ml.metrics.Errors.WithLabelValues(geoprobe.ErrorTypeSendOffset).Inc()
continue
}

sentCount++
m.CompositeOffsetsSent.Inc()
log.Debug("Sent composite offset to target",
ml.metrics.CompositeOffsetsSent.Inc()
ml.log.Debug("Sent composite offset",
"target", addr,
"delivery", targetAddr,
"slot", slot,
"measured_rtt_ns", measuredRttNs,
"total_rtt_ns", compositeOffset.RttNs,
Expand Down
101 changes: 101 additions & 0 deletions controlplane/telemetry/internal/geoprobe/dns_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package geoprobe

import (
"fmt"
"net"
"strconv"
"sync"
"time"
)

// DNSCache resolves host:port strings to *net.UDPAddr, caching DNS lookups
// for domain-based hosts with a configurable TTL.
type DNSCache struct {
mu sync.RWMutex
entries map[string]dnsCacheEntry
ttl time.Duration
now func() time.Time // for testing
lookup func(string) ([]string, error) // for testing
}

type dnsCacheEntry struct {
ip string
expiresAt time.Time
}

// NewDNSCache creates a DNSCache with the given TTL for cached lookups.
func NewDNSCache(ttl time.Duration) *DNSCache {
return &DNSCache{
entries: make(map[string]dnsCacheEntry),
ttl: ttl,
now: time.Now,
lookup: net.LookupHost,
}
}

// Resolve resolves a host:port string to a *net.UDPAddr.
// If the host is an IP address, it is used directly (no caching).
// If the host is a domain name, DNS lookup is performed and the result
// is cached for the configured TTL.
func (c *DNSCache) Resolve(hostPort string) (*net.UDPAddr, error) {
host, portStr, err := net.SplitHostPort(hostPort)
if err != nil {
return nil, fmt.Errorf("invalid address %q: %w", hostPort, err)
}

port, err := strconv.Atoi(portStr)
if err != nil {
return nil, fmt.Errorf("invalid port in %q: %w", hostPort, err)
}

// If host is already an IP, validate scope and use directly.
if ip := net.ParseIP(host); ip != nil {
scopeCheck := ProbeAddress{Host: host}
if err := scopeCheck.ValidateScope(); err != nil {
return nil, fmt.Errorf("delivery address rejected: %w", err)
}
return &net.UDPAddr{IP: ip, Port: port}, nil
}

// Domain name — check cache.
now := c.now()

c.mu.RLock()
entry, ok := c.entries[host]
c.mu.RUnlock()

if ok && now.Before(entry.expiresAt) {
return &net.UDPAddr{IP: net.ParseIP(entry.ip), Port: port}, nil
}

// Cache miss or expired — resolve.
ips, err := c.lookup(host)
if err != nil {
return nil, fmt.Errorf("DNS lookup failed for %q: %w", host, err)
}
if len(ips) == 0 {
return nil, fmt.Errorf("DNS lookup returned no results for %q", host)
}

resolved := ips[0]
resolvedIP := net.ParseIP(resolved)
if resolvedIP == nil {
return nil, fmt.Errorf("DNS lookup for %q returned unparseable IP %q", host, resolved)
}

// Validate resolved IP against scope check to prevent DNS rebinding attacks
// (e.g., domain initially resolves to public IP but later rebinds to internal).
scopeCheck := ProbeAddress{Host: resolved}
if err := scopeCheck.ValidateScope(); err != nil {
return nil, fmt.Errorf("DNS-resolved address for %q rejected: %w", host, err)
}

c.mu.Lock()
c.entries[host] = dnsCacheEntry{
ip: resolved,
expiresAt: now.Add(c.ttl),
}
c.mu.Unlock()

return &net.UDPAddr{IP: resolvedIP, Port: port}, nil
}
Loading
Loading