diff --git a/controlplane/telemetry/cmd/geoprobe-agent/main.go b/controlplane/telemetry/cmd/geoprobe-agent/main.go index c1bad62495..d513264b44 100644 --- a/controlplane/telemetry/cmd/geoprobe-agent/main.go +++ b/controlplane/telemetry/cmd/geoprobe-agent/main.go @@ -624,6 +624,7 @@ func main() { getCurrentSlot: getCurrentSlot, signedReflector: signedReflector, metrics: m, + dnsCache: geoprobe.NewDNSCache(5 * time.Minute), targetUpdateCh: targetUpdateCh, icmpTargetUpdateCh: icmpTargetUpdateCh, inboundKeyCh: inboundKeyCh, @@ -739,9 +740,12 @@ type measurementLoop struct { getCurrentSlot func(ctx context.Context) (uint64, error) signedReflector signed.Reflector metrics *geoprobe.Metrics + dnsCache *geoprobe.DNSCache - targets []geoprobe.ProbeAddress - icmpTargets []geoprobe.ProbeAddress + targets []geoprobe.ProbeAddress + icmpTargets []geoprobe.ProbeAddress + deliveryAddrs map[geoprobe.ProbeAddress]string + icmpDeliveryAddrs map[geoprobe.ProbeAddress]string targetUpdateCh <-chan geoprobe.TargetUpdate icmpTargetUpdateCh <-chan geoprobe.ICMPTargetUpdate @@ -815,10 +819,11 @@ func (ml *measurementLoop) run() error { func(addr geoprobe.ProbeAddress) (uint64, bool) { return ml.pinger.MeasureOne(ml.ctx, addr) }, ) ml.targets = newTargets + ml.deliveryAddrs = update.DeliveryAddrs ml.metrics.TargetsDiscovered.Set(float64(len(ml.targets))) ml.log.Info("Updated targets from discovery", "totalTargets", len(ml.targets)) if len(rttData) > 0 { - sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot, ml.metrics) + ml.sendCompositeOffsets(rttData, ml.deliveryAddrs, nil) } case icmpUpdate := <-ml.icmpTargetUpdateCh: @@ -830,10 +835,15 @@ func (ml *measurementLoop) run() error { func(addr geoprobe.ProbeAddress) (uint64, bool) { return ml.icmpPinger.MeasureOne(ml.ctx, addr) }, ) ml.icmpTargets = newTargets + ml.icmpDeliveryAddrs = icmpUpdate.DeliveryAddrs ml.metrics.IcmpTargetsDiscovered.Set(float64(len(ml.icmpTargets))) ml.log.Info("Updated 
ICMP targets from discovery", "totalIcmpTargets", len(ml.icmpTargets)) if len(rttData) > 0 { - sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot, ml.metrics) + icmpSet := make(map[geoprobe.ProbeAddress]struct{}, len(rttData)) + for addr := range rttData { + icmpSet[addr] = struct{}{} + } + ml.sendCompositeOffsets(rttData, ml.icmpDeliveryAddrs, icmpSet) } case keyUpdate := <-ml.inboundKeyCh: @@ -893,7 +903,21 @@ func (ml *measurementLoop) runCycle() { ml.log.Debug("target measurement result", "target", addr.Host, "rtt_ms", float64(rttNs)/1000000.0) } - sent := sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot, ml.metrics) + // Merge delivery addrs from both target types and build ICMP set. + // Keys are disjoint: outbound targets have TWAMPPort set, ICMP targets have TWAMPPort=0. + mergedDelivery := make(map[geoprobe.ProbeAddress]string, len(ml.deliveryAddrs)+len(ml.icmpDeliveryAddrs)) + for k, v := range ml.deliveryAddrs { + mergedDelivery[k] = v + } + for k, v := range ml.icmpDeliveryAddrs { + mergedDelivery[k] = v + } + icmpSet := make(map[geoprobe.ProbeAddress]struct{}, len(ml.icmpTargets)) + for _, t := range ml.icmpTargets { + icmpSet[t] = struct{}{} + } + + sent := ml.sendCompositeOffsets(rttData, mergedDelivery, icmpSet) ml.log.Info("Completed measurement cycle", "measured", len(rttData), @@ -902,33 +926,52 @@ func (ml *measurementLoop) runCycle() { "total_icmp_targets", len(ml.icmpTargets)) } -func sendCompositeOffsets( - ctx context.Context, - log *slog.Logger, +func (ml *measurementLoop) sendCompositeOffsets( rttData map[geoprobe.ProbeAddress]uint64, - cache *offsetCache, - signer *geoprobe.OffsetSigner, - senderConn *net.UDPConn, - getCurrentSlot func(ctx context.Context) (uint64, error), - m *geoprobe.Metrics, + deliveryAddrs map[geoprobe.ProbeAddress]string, + icmpTargets map[geoprobe.ProbeAddress]struct{}, ) int { - dzdOffset := cache.GetBest() + 
dzdOffset := ml.cache.GetBest() if dzdOffset == nil { - log.Warn("No valid DZD offsets in cache, skipping composite generation") + ml.log.Warn("No valid DZD offsets in cache, skipping composite generation") return 0 } - slot, err := getCurrentSlot(ctx) + slot, err := ml.getCurrentSlot(ml.ctx) if err != nil { - log.Error("Failed to get current slot", "error", err) - m.Errors.WithLabelValues(geoprobe.ErrorTypeSlotFetch).Inc() + ml.log.Error("Failed to get current slot", "error", err) + ml.metrics.Errors.WithLabelValues(geoprobe.ErrorTypeSlotFetch).Inc() return 0 } - log.Debug("fetched current slot", "slot", slot) + ml.log.Debug("fetched current slot", "slot", slot) sentCount := 0 for addr, measuredRttNs := range rttData { + // Determine where to deliver the offset. + var targetAddr *net.UDPAddr + deliveryDest, hasDelivery := deliveryAddrs[addr] + _, isICMP := icmpTargets[addr] + + if hasDelivery { + // Result destination override — resolve (may involve DNS). + resolved, resolveErr := ml.dnsCache.Resolve(deliveryDest) + if resolveErr != nil { + ml.log.Warn("Failed to resolve delivery address, skipping target", + "target", addr, "delivery", deliveryDest, "error", resolveErr) + continue + } + targetAddr = resolved + } else if isICMP { + // ICMP target without a result destination — nothing is listening. + ml.log.Debug("ICMP target has no result destination, skipping offset delivery", + "target", addr.Host) + continue + } else { + // Standard outbound target — send to the measurement address. 
+ targetAddr = &net.UDPAddr{IP: net.ParseIP(addr.Host), Port: int(addr.Port)} + } + compositeOffset := geoprobe.LocationOffset{ Version: geoprobe.LocationOffsetVersion, MeasurementSlot: slot, @@ -941,23 +984,23 @@ func sendCompositeOffsets( References: []geoprobe.LocationOffset{*dzdOffset}, } - if err := signer.SignOffset(&compositeOffset); err != nil { - log.Error("Failed to sign composite offset", "target", addr, "error", err) - m.Errors.WithLabelValues(geoprobe.ErrorTypeSignOffset).Inc() + if err := ml.signer.SignOffset(&compositeOffset); err != nil { + ml.log.Error("Failed to sign composite offset", "target", addr, "error", err) + ml.metrics.Errors.WithLabelValues(geoprobe.ErrorTypeSignOffset).Inc() continue } - targetAddr := &net.UDPAddr{IP: net.ParseIP(addr.Host), Port: int(addr.Port)} - if err := geoprobe.SendOffset(senderConn, targetAddr, &compositeOffset); err != nil { - log.Error("Failed to send composite offset", "target", addr, "error", err) - m.Errors.WithLabelValues(geoprobe.ErrorTypeSendOffset).Inc() + if err := geoprobe.SendOffset(ml.senderConn, targetAddr, &compositeOffset); err != nil { + ml.log.Error("Failed to send composite offset", "target", addr, "error", err) + ml.metrics.Errors.WithLabelValues(geoprobe.ErrorTypeSendOffset).Inc() continue } sentCount++ - m.CompositeOffsetsSent.Inc() - log.Debug("Sent composite offset to target", + ml.metrics.CompositeOffsetsSent.Inc() + ml.log.Debug("Sent composite offset", "target", addr, + "delivery", targetAddr, "slot", slot, "measured_rtt_ns", measuredRttNs, "total_rtt_ns", compositeOffset.RttNs, diff --git a/controlplane/telemetry/internal/geoprobe/dns_cache.go b/controlplane/telemetry/internal/geoprobe/dns_cache.go new file mode 100644 index 0000000000..44c3fec7fa --- /dev/null +++ b/controlplane/telemetry/internal/geoprobe/dns_cache.go @@ -0,0 +1,101 @@ +package geoprobe + +import ( + "fmt" + "net" + "strconv" + "sync" + "time" +) + +// DNSCache resolves host:port strings to *net.UDPAddr, caching DNS 
lookups +// for domain-based hosts with a configurable TTL. +type DNSCache struct { + mu sync.RWMutex + entries map[string]dnsCacheEntry + ttl time.Duration + now func() time.Time // for testing + lookup func(string) ([]string, error) // for testing +} + +type dnsCacheEntry struct { + ip string + expiresAt time.Time +} + +// NewDNSCache creates a DNSCache with the given TTL for cached lookups. +func NewDNSCache(ttl time.Duration) *DNSCache { + return &DNSCache{ + entries: make(map[string]dnsCacheEntry), + ttl: ttl, + now: time.Now, + lookup: net.LookupHost, + } +} + +// Resolve resolves a host:port string to a *net.UDPAddr. +// If the host is an IP address, it is used directly (no caching). +// If the host is a domain name, DNS lookup is performed and the result +// is cached for the configured TTL. +func (c *DNSCache) Resolve(hostPort string) (*net.UDPAddr, error) { + host, portStr, err := net.SplitHostPort(hostPort) + if err != nil { + return nil, fmt.Errorf("invalid address %q: %w", hostPort, err) + } + + port, err := strconv.Atoi(portStr) + if err != nil { + return nil, fmt.Errorf("invalid port in %q: %w", hostPort, err) + } + + // If host is already an IP, validate scope and use directly. + if ip := net.ParseIP(host); ip != nil { + scopeCheck := ProbeAddress{Host: host} + if err := scopeCheck.ValidateScope(); err != nil { + return nil, fmt.Errorf("delivery address rejected: %w", err) + } + return &net.UDPAddr{IP: ip, Port: port}, nil + } + + // Domain name — check cache. + now := c.now() + + c.mu.RLock() + entry, ok := c.entries[host] + c.mu.RUnlock() + + if ok && now.Before(entry.expiresAt) { + return &net.UDPAddr{IP: net.ParseIP(entry.ip), Port: port}, nil + } + + // Cache miss or expired — resolve. 
+ ips, err := c.lookup(host) + if err != nil { + return nil, fmt.Errorf("DNS lookup failed for %q: %w", host, err) + } + if len(ips) == 0 { + return nil, fmt.Errorf("DNS lookup returned no results for %q", host) + } + + resolved := ips[0] + resolvedIP := net.ParseIP(resolved) + if resolvedIP == nil { + return nil, fmt.Errorf("DNS lookup for %q returned unparseable IP %q", host, resolved) + } + + // Validate resolved IP against scope check to prevent DNS rebinding attacks + // (e.g., domain initially resolves to public IP but later rebinds to internal). + scopeCheck := ProbeAddress{Host: resolved} + if err := scopeCheck.ValidateScope(); err != nil { + return nil, fmt.Errorf("DNS-resolved address for %q rejected: %w", host, err) + } + + c.mu.Lock() + c.entries[host] = dnsCacheEntry{ + ip: resolved, + expiresAt: now.Add(c.ttl), + } + c.mu.Unlock() + + return &net.UDPAddr{IP: resolvedIP, Port: port}, nil +} diff --git a/controlplane/telemetry/internal/geoprobe/dns_cache_test.go b/controlplane/telemetry/internal/geoprobe/dns_cache_test.go new file mode 100644 index 0000000000..e65d952460 --- /dev/null +++ b/controlplane/telemetry/internal/geoprobe/dns_cache_test.go @@ -0,0 +1,270 @@ +package geoprobe + +import ( + "fmt" + "sync" + "testing" + "time" +) + +func TestDNSCache_ResolveIPAddress(t *testing.T) { + cache := NewDNSCache(5 * time.Minute) + + addr, err := cache.Resolve("185.199.108.1:9000") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if addr.IP.String() != "185.199.108.1" { + t.Errorf("expected IP 185.199.108.1, got %s", addr.IP) + } + if addr.Port != 9000 { + t.Errorf("expected port 9000, got %d", addr.Port) + } +} + +func TestDNSCache_ResolveIPAddress_NoCaching(t *testing.T) { + lookupCalls := 0 + cache := NewDNSCache(5 * time.Minute) + cache.lookup = func(host string) ([]string, error) { + lookupCalls++ + return []string{"1.2.3.4"}, nil + } + + // IP addresses should not trigger DNS lookup. 
+ for i := 0; i < 3; i++ { + _, err := cache.Resolve("44.0.0.1:8080") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + if lookupCalls != 0 { + t.Errorf("expected 0 DNS lookups for IP address, got %d", lookupCalls) + } +} + +func TestDNSCache_ResolveDomain(t *testing.T) { + cache := NewDNSCache(5 * time.Minute) + cache.lookup = func(host string) ([]string, error) { + if host == "results.example.com" { + return []string{"93.184.216.34"}, nil + } + return nil, fmt.Errorf("unknown host: %s", host) + } + + addr, err := cache.Resolve("results.example.com:9000") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if addr.IP.String() != "93.184.216.34" { + t.Errorf("expected IP 93.184.216.34, got %s", addr.IP) + } + if addr.Port != 9000 { + t.Errorf("expected port 9000, got %d", addr.Port) + } +} + +func TestDNSCache_CachesDNSLookup(t *testing.T) { + lookupCalls := 0 + cache := NewDNSCache(5 * time.Minute) + cache.lookup = func(host string) ([]string, error) { + lookupCalls++ + return []string{"93.184.216.34"}, nil + } + + // First call triggers lookup. + _, err := cache.Resolve("results.example.com:9000") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lookupCalls != 1 { + t.Fatalf("expected 1 lookup call, got %d", lookupCalls) + } + + // Second call should use cache. + _, err = cache.Resolve("results.example.com:9000") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lookupCalls != 1 { + t.Errorf("expected 1 lookup call (cached), got %d", lookupCalls) + } +} + +func TestDNSCache_TTLExpiry(t *testing.T) { + lookupCalls := 0 + now := time.Now() + + cache := NewDNSCache(5 * time.Minute) + cache.now = func() time.Time { return now } + cache.lookup = func(host string) ([]string, error) { + lookupCalls++ + return []string{"93.184.216.34"}, nil + } + + // First lookup. 
+ _, err := cache.Resolve("results.example.com:9000") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lookupCalls != 1 { + t.Fatalf("expected 1 lookup, got %d", lookupCalls) + } + + // Advance past TTL. + now = now.Add(6 * time.Minute) + + // Should trigger new lookup. + _, err = cache.Resolve("results.example.com:9000") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lookupCalls != 2 { + t.Errorf("expected 2 lookups after TTL expiry, got %d", lookupCalls) + } +} + +func TestDNSCache_LookupFailure(t *testing.T) { + cache := NewDNSCache(5 * time.Minute) + cache.lookup = func(host string) ([]string, error) { + return nil, fmt.Errorf("dns: NXDOMAIN") + } + + _, err := cache.Resolve("nonexistent.example.com:9000") + if err == nil { + t.Fatal("expected error for failed DNS lookup") + } +} + +func TestDNSCache_InvalidAddress(t *testing.T) { + cache := NewDNSCache(5 * time.Minute) + + tests := []struct { + name string + address string + }{ + {"no port", "example.com"}, + {"empty", ""}, + {"invalid port", "example.com:abc"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := cache.Resolve(tt.address) + if err == nil { + t.Error("expected error for invalid address") + } + }) + } +} + +func TestDNSCache_ConcurrentAccess(t *testing.T) { + cache := NewDNSCache(5 * time.Minute) + cache.lookup = func(host string) ([]string, error) { + return []string{"93.184.216.34"}, nil + } + + var wg sync.WaitGroup + for i := 0; i < 50; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := cache.Resolve("results.example.com:9000") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + }() + } + wg.Wait() +} + +func TestDNSCache_DifferentDomains(t *testing.T) { + lookupCalls := make(map[string]int) + cache := NewDNSCache(5 * time.Minute) + cache.lookup = func(host string) ([]string, error) { + lookupCalls[host]++ + return []string{"1.2.3.4"}, nil + } + + _, _ = cache.Resolve("a.example.com:9000") + _, _ = 
cache.Resolve("b.example.com:9000") + _, _ = cache.Resolve("a.example.com:9000") // cached + + if lookupCalls["a.example.com"] != 1 { + t.Errorf("expected 1 lookup for a.example.com, got %d", lookupCalls["a.example.com"]) + } + if lookupCalls["b.example.com"] != 1 { + t.Errorf("expected 1 lookup for b.example.com, got %d", lookupCalls["b.example.com"]) + } +} + +func TestDNSCache_DifferentPortsSameDomain(t *testing.T) { + lookupCalls := 0 + cache := NewDNSCache(5 * time.Minute) + cache.lookup = func(host string) ([]string, error) { + lookupCalls++ + return []string{"93.184.216.34"}, nil + } + + addr1, _ := cache.Resolve("results.example.com:9000") + addr2, _ := cache.Resolve("results.example.com:8080") + + // Same domain should only be resolved once. + if lookupCalls != 1 { + t.Errorf("expected 1 lookup for same domain different ports, got %d", lookupCalls) + } + if addr1.Port != 9000 { + t.Errorf("expected port 9000, got %d", addr1.Port) + } + if addr2.Port != 8080 { + t.Errorf("expected port 8080, got %d", addr2.Port) + } +} + +func TestDNSCache_RejectsPrivateIPAddress(t *testing.T) { + cache := NewDNSCache(5 * time.Minute) + + tests := []struct { + name string + address string + }{ + {"loopback", "127.0.0.1:9000"}, + {"private 10/8", "10.0.0.1:9000"}, + {"private 172.16/12", "172.16.0.1:9000"}, + {"private 192.168/16", "192.168.1.1:9000"}, + {"link-local", "169.254.1.1:9000"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := cache.Resolve(tt.address) + if err == nil { + t.Errorf("expected error for non-public IP %s", tt.address) + } + }) + } +} + +func TestDNSCache_RejectsDNSRebindingToPrivateIP(t *testing.T) { + cache := NewDNSCache(5 * time.Minute) + cache.lookup = func(host string) ([]string, error) { + // Simulate DNS rebinding: domain resolves to a private IP. 
+ return []string{"127.0.0.1"}, nil + } + + _, err := cache.Resolve("malicious.example.com:9000") + if err == nil { + t.Error("expected error when DNS resolves to private IP") + } +} + +func TestDNSCache_AcceptsPublicIPAddress(t *testing.T) { + cache := NewDNSCache(5 * time.Minute) + + addr, err := cache.Resolve("44.0.0.1:9000") + if err != nil { + t.Fatalf("unexpected error for public IP: %v", err) + } + if addr.IP.String() != "44.0.0.1" { + t.Errorf("expected 44.0.0.1, got %s", addr.IP) + } +} diff --git a/controlplane/telemetry/internal/geoprobe/target_discovery.go b/controlplane/telemetry/internal/geoprobe/target_discovery.go index 82b29dfbe1..4733e68831 100644 --- a/controlplane/telemetry/internal/geoprobe/target_discovery.go +++ b/controlplane/telemetry/internal/geoprobe/target_discovery.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "net" "sort" "sync/atomic" @@ -19,7 +20,8 @@ type GeolocationUserClient interface { // TargetUpdate contains outbound probe targets discovered from onchain data. type TargetUpdate struct { - Targets []ProbeAddress + Targets []ProbeAddress + DeliveryAddrs map[ProbeAddress]string // measurement target → "host:port" override (empty map = all send to target) } // InboundKeyUpdate contains inbound allowed pubkeys discovered from onchain data. @@ -29,7 +31,8 @@ type InboundKeyUpdate struct { // ICMPTargetUpdate contains outbound ICMP probe targets discovered from onchain data. 
type ICMPTargetUpdate struct { - Targets []ProbeAddress + Targets []ProbeAddress + DeliveryAddrs map[ProbeAddress]string // measurement target → "host:port" override (empty map = no listener) } // targetDiscoveryFullRefreshEvery controls how often a full GeolocationUser scan @@ -57,6 +60,8 @@ type TargetDiscovery struct { cachedTargets []ProbeAddress cachedIcmpTargets []ProbeAddress cachedInboundKeys [][32]byte + cachedOutboundDelivery map[ProbeAddress]string + cachedIcmpDelivery map[ProbeAddress]string tickCount uint64 lastSeenTargetUpdateCount uint32 } @@ -86,7 +91,7 @@ func (d *TargetDiscovery) Tick(ctx context.Context, targetCh chan<- TargetUpdate } func (d *TargetDiscovery) discoverAndSend(ctx context.Context, targetCh chan<- TargetUpdate, keyCh chan<- InboundKeyUpdate, icmpTargetCh chan<- ICMPTargetUpdate) { - targets, icmpTargets, inboundKeys, err := d.discover(ctx) + targets, icmpTargets, inboundKeys, outboundDelivery, icmpDelivery, err := d.discover(ctx) if err != nil { d.log.Warn("Target discovery tick failed", "error", err) return @@ -97,10 +102,11 @@ func (d *TargetDiscovery) discoverAndSend(ctx context.Context, targetCh chan<- T return } - if !probeAddressSlicesEqual(targets, d.cachedTargets) { + if !probeAddressSlicesEqual(targets, d.cachedTargets) || !deliveryAddrsEqual(outboundDelivery, d.cachedOutboundDelivery) { d.cachedTargets = targets + d.cachedOutboundDelivery = outboundDelivery select { - case targetCh <- TargetUpdate{Targets: targets}: + case targetCh <- TargetUpdate{Targets: targets, DeliveryAddrs: outboundDelivery}: default: d.log.Warn("Target update channel full, skipping update") } @@ -115,10 +121,11 @@ func (d *TargetDiscovery) discoverAndSend(ctx context.Context, targetCh chan<- T } } - if !probeAddressSlicesEqual(icmpTargets, d.cachedIcmpTargets) { + if !probeAddressSlicesEqual(icmpTargets, d.cachedIcmpTargets) || !deliveryAddrsEqual(icmpDelivery, d.cachedIcmpDelivery) { d.cachedIcmpTargets = icmpTargets + d.cachedIcmpDelivery = 
icmpDelivery select { - case icmpTargetCh <- ICMPTargetUpdate{Targets: icmpTargets}: + case icmpTargetCh <- ICMPTargetUpdate{Targets: icmpTargets, DeliveryAddrs: icmpDelivery}: default: d.log.Warn("ICMP target update channel full, skipping update") } @@ -126,8 +133,10 @@ func (d *TargetDiscovery) discoverAndSend(ctx context.Context, targetCh chan<- T } // discover performs a single discovery cycle: fetch users, filter, extract targets/keys, -// merge with CLI values. Returns nil, nil, nil, nil when the scan is skipped. -func (d *TargetDiscovery) discover(ctx context.Context) ([]ProbeAddress, []ProbeAddress, [][32]byte, error) { +// merge with CLI values. Returns nil, nil, nil, nil, nil, nil when the scan is skipped. +// The returned delivery maps map measurement target → result destination for targets +// whose user has a non-empty ResultDestination, split by target type. +func (d *TargetDiscovery) discover(ctx context.Context) ([]ProbeAddress, []ProbeAddress, [][32]byte, map[ProbeAddress]string, map[ProbeAddress]string, error) { forceFullRefresh := d.tickCount%targetDiscoveryFullRefreshEvery == 0 d.tickCount++ @@ -136,14 +145,14 @@ func (d *TargetDiscovery) discover(ctx context.Context) ([]ProbeAddress, []Probe if current == d.lastSeenTargetUpdateCount && d.tickCount > 1 { d.log.Debug("GeoProbe target_update_count unchanged, skipping target scan", "targetUpdateCount", current) - return nil, nil, nil, nil + return nil, nil, nil, nil, nil, nil } d.lastSeenTargetUpdateCount = current } users, err := d.client.GetGeolocationUsers(ctx) if err != nil { - return nil, nil, nil, fmt.Errorf("failed to fetch GeolocationUser accounts: %w", err) + return nil, nil, nil, nil, nil, fmt.Errorf("failed to fetch GeolocationUser accounts: %w", err) } var probePKBytes [32]byte @@ -152,6 +161,8 @@ func (d *TargetDiscovery) discover(ctx context.Context) ([]ProbeAddress, []Probe var onchainTargets []ProbeAddress var onchainIcmpTargets []ProbeAddress var onchainKeys [][32]byte + 
outboundDelivery := make(map[ProbeAddress]string) + icmpDelivery := make(map[ProbeAddress]string) seenKeys := make(map[[32]byte]struct{}) for i := range users { @@ -165,6 +176,15 @@ func (d *TargetDiscovery) discover(ctx context.Context) ([]ProbeAddress, []Probe continue } + resultDest := user.ResultDestination + if resultDest != "" { + if _, _, err := net.SplitHostPort(resultDest); err != nil { + d.log.Warn("Skipping invalid result destination", + "user", users[i].Code, "resultDestination", resultDest, "error", err) + resultDest = "" + } + } + for j := range user.Targets { target := &user.Targets[j] @@ -187,6 +207,9 @@ func (d *TargetDiscovery) discover(ctx context.Context) ([]ProbeAddress, []Probe continue } onchainTargets = append(onchainTargets, addr) + if resultDest != "" { + outboundDelivery[addr] = resultDest + } case geolocation.GeoLocationTargetTypeInbound: var key [32]byte @@ -209,6 +232,9 @@ func (d *TargetDiscovery) discover(ctx context.Context) ([]ProbeAddress, []Probe continue } onchainIcmpTargets = append(onchainIcmpTargets, addr) + if resultDest != "" { + icmpDelivery[addr] = resultDest + } } } } @@ -223,9 +249,11 @@ func (d *TargetDiscovery) discover(ctx context.Context) ([]ProbeAddress, []Probe "onchainOutbound", len(onchainTargets), "onchainOutboundIcmp", len(onchainIcmpTargets), "onchainInbound", len(onchainKeys), + "outboundDeliveryOverrides", len(outboundDelivery), + "icmpDeliveryOverrides", len(icmpDelivery), ) - return onchainTargets, onchainIcmpTargets, onchainKeys, nil + return onchainTargets, onchainIcmpTargets, onchainKeys, outboundDelivery, icmpDelivery, nil } // targetToProbeAddress converts a GeolocationTarget to a ProbeAddress. @@ -274,6 +302,19 @@ func probeAddressSlicesEqual(a, b []ProbeAddress) bool { return true } +// deliveryAddrsEqual checks if two delivery address maps are equal. 
+func deliveryAddrsEqual(a, b map[ProbeAddress]string) bool { + if len(a) != len(b) { + return false + } + for k, v := range a { + if bv, ok := b[k]; !ok || bv != v { + return false + } + } + return true +} + // keySlicesEqual checks if two key slices contain the same keys (order-independent). func keySlicesEqual(a, b [][32]byte) bool { if len(a) != len(b) { diff --git a/controlplane/telemetry/internal/geoprobe/target_discovery_test.go b/controlplane/telemetry/internal/geoprobe/target_discovery_test.go index 665cdf68bd..85576ce923 100644 --- a/controlplane/telemetry/internal/geoprobe/target_discovery_test.go +++ b/controlplane/telemetry/internal/geoprobe/target_discovery_test.go @@ -50,6 +50,20 @@ func makeUser(status geolocation.GeolocationUserStatus, payment geolocation.Geol } } +func makeUserWithResultDest(status geolocation.GeolocationUserStatus, payment geolocation.GeolocationPaymentStatus, code string, targets []geolocation.GeolocationTarget, resultDest string) geolocation.KeyedGeolocationUser { + return geolocation.KeyedGeolocationUser{ + Pubkey: solana.NewWallet().PublicKey(), + GeolocationUser: geolocation.GeolocationUser{ + AccountType: geolocation.AccountTypeGeolocationUser, + Status: status, + PaymentStatus: payment, + Code: code, + Targets: targets, + ResultDestination: resultDest, + }, + } +} + func outboundTarget(ip [4]uint8, port uint16, probePK solana.PublicKey) geolocation.GeolocationTarget { return geolocation.GeolocationTarget{ TargetType: geolocation.GeoLocationTargetTypeOutbound, @@ -96,7 +110,7 @@ func TestTargetDiscovery_HappyPath(t *testing.T) { } td := newTestTargetDiscovery(client) - targets, _, keys, err := td.discover(context.Background()) + targets, _, keys, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -122,7 +136,7 @@ func TestTargetDiscovery_StatusFilter(t *testing.T) { } td := newTestTargetDiscovery(client) - targets, _, keys, err := td.discover(context.Background()) + 
targets, _, keys, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -145,7 +159,7 @@ func TestTargetDiscovery_PaymentFilter(t *testing.T) { } td := newTestTargetDiscovery(client) - targets, _, keys, err := td.discover(context.Background()) + targets, _, keys, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -171,7 +185,7 @@ func TestTargetDiscovery_CombinedFilter(t *testing.T) { } td := newTestTargetDiscovery(client) - targets, _, _, err := td.discover(context.Background()) + targets, _, _, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -194,7 +208,7 @@ func TestTargetDiscovery_ProbePKFilter(t *testing.T) { } td := newTestTargetDiscovery(client) - targets, _, _, err := td.discover(context.Background()) + targets, _, _, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -215,7 +229,7 @@ func TestTargetDiscovery_InboundTargets(t *testing.T) { } td := newTestTargetDiscovery(client) - targets, _, keys, err := td.discover(context.Background()) + targets, _, keys, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -245,7 +259,7 @@ func TestTargetDiscovery_MixedTargets(t *testing.T) { } td := newTestTargetDiscovery(client) - targets, _, keys, err := td.discover(context.Background()) + targets, _, keys, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -318,7 +332,7 @@ func TestTargetDiscovery_DeduplicateInboundKeys(t *testing.T) { } td := newTestTargetDiscovery(client) - _, _, keys, err := td.discover(context.Background()) + _, _, keys, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -372,7 +386,7 @@ func TestTargetDiscovery_TargetUpdateCountUnchanged_SkipsScan(t *testing.T) { }) // 
First call (tick 0): always does full scan (forceFullRefresh). - targets, _, _, err := td.discover(context.Background()) + targets, _, _, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -384,7 +398,7 @@ func TestTargetDiscovery_TargetUpdateCountUnchanged_SkipsScan(t *testing.T) { } // Second call: counter unchanged → should skip. - targets, _, keys, err := td.discover(context.Background()) + targets, _, keys, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -417,11 +431,11 @@ func TestTargetDiscovery_TargetUpdateCountChanged_DoesFullScan(t *testing.T) { }) // First call: full scan. - _, _, _, _ = td.discover(context.Background()) + _, _, _, _, _, _ = td.discover(context.Background()) // Change counter, second call should do full scan. counter.Store(6) - targets, _, _, err := td.discover(context.Background()) + targets, _, _, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -456,12 +470,12 @@ func TestTargetDiscovery_ForcedFullRefresh_IgnoresCounter(t *testing.T) { // Tick through to the next forced refresh (every 5th tick). // Tick 0: forced (0 % 5 == 0), tick 1-4: skipped (counter unchanged), tick 5: forced. for i := 0; i < targetDiscoveryFullRefreshEvery; i++ { - _, _, _, _ = td.discover(context.Background()) + _, _, _, _, _, _ = td.discover(context.Background()) } callsBefore := client.calls // Next tick (tick 5): forced full refresh even though counter unchanged. 
- targets, _, _, err := td.discover(context.Background()) + targets, _, _, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -487,7 +501,7 @@ func TestTargetDiscovery_NilProbeTargetUpdateCount_AlwaysScans(t *testing.T) { td := newTestTargetDiscovery(client) for i := 0; i < 3; i++ { - _, _, _, _ = td.discover(context.Background()) + _, _, _, _, _, _ = td.discover(context.Background()) } if client.calls != 3 { t.Errorf("expected 3 RPC calls without ProbeTargetUpdateCount, got %d", client.calls) @@ -519,7 +533,7 @@ func TestTargetDiscovery_RejectsNonPublicOutboundTargets(t *testing.T) { }, } td := newTestTargetDiscovery(client) - targets, _, _, err := td.discover(context.Background()) + targets, _, _, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -541,7 +555,7 @@ func TestTargetDiscovery_OutboundIcmpTargets(t *testing.T) { } td := newTestTargetDiscovery(client) - targets, icmpTargets, keys, err := td.discover(context.Background()) + targets, icmpTargets, keys, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -574,7 +588,7 @@ func TestTargetDiscovery_MixedOutboundAndIcmp(t *testing.T) { } td := newTestTargetDiscovery(client) - targets, icmpTargets, keys, err := td.discover(context.Background()) + targets, icmpTargets, keys, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -606,7 +620,7 @@ func TestTargetDiscovery_OutboundIcmpPrivateIPRejected(t *testing.T) { } td := newTestTargetDiscovery(client) - _, icmpTargets, _, err := td.discover(context.Background()) + _, icmpTargets, _, _, _, err := td.discover(context.Background()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -614,3 +628,264 @@ func TestTargetDiscovery_OutboundIcmpPrivateIPRejected(t *testing.T) { t.Errorf("expected private IP to be rejected, got %d targets", 
len(icmpTargets)) } } + +func TestTargetDiscovery_ResultDestination_OutboundOverride(t *testing.T) { + probePK := testProbePubkey() + client := &mockGeolocationUserClient{ + users: []geolocation.KeyedGeolocationUser{ + makeUserWithResultDest( + geolocation.GeolocationUserStatusActivated, + geolocation.GeolocationPaymentStatusPaid, + "user1", + []geolocation.GeolocationTarget{ + outboundTarget([4]uint8{44, 0, 0, 1}, 9000, probePK), + }, + "185.199.108.1:9000", + ), + }, + } + + td := newTestTargetDiscovery(client) + targets, _, _, delivery, _, err := td.discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(targets) != 1 { + t.Fatalf("expected 1 target, got %d", len(targets)) + } + if len(delivery) != 1 { + t.Fatalf("expected 1 delivery override, got %d", len(delivery)) + } + dest, ok := delivery[targets[0]] + if !ok { + t.Fatal("expected delivery address for target") + } + if dest != "185.199.108.1:9000" { + t.Errorf("expected delivery 185.199.108.1:9000, got %s", dest) + } +} + +func TestTargetDiscovery_NoResultDestination_NoDeliveryOverride(t *testing.T) { + probePK := testProbePubkey() + client := &mockGeolocationUserClient{ + users: []geolocation.KeyedGeolocationUser{ + makeUser( + geolocation.GeolocationUserStatusActivated, + geolocation.GeolocationPaymentStatusPaid, + "user1", + []geolocation.GeolocationTarget{ + outboundTarget([4]uint8{44, 0, 0, 1}, 9000, probePK), + }, + ), + }, + } + + td := newTestTargetDiscovery(client) + targets, _, _, delivery, _, err := td.discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(targets) != 1 { + t.Fatalf("expected 1 target, got %d", len(targets)) + } + if len(delivery) != 0 { + t.Errorf("expected no delivery overrides for user without result destination, got %d", len(delivery)) + } +} + +func TestTargetDiscovery_ResultDestination_ICMPOverride(t *testing.T) { + probePK := testProbePubkey() + client := 
&mockGeolocationUserClient{ + users: []geolocation.KeyedGeolocationUser{ + makeUserWithResultDest( + geolocation.GeolocationUserStatusActivated, + geolocation.GeolocationPaymentStatusPaid, + "user1", + []geolocation.GeolocationTarget{ + outboundIcmpTarget([4]uint8{44, 0, 0, 1}, 9000, probePK), + }, + "results.example.com:9000", + ), + }, + } + + td := newTestTargetDiscovery(client) + _, icmpTargets, _, _, delivery, err := td.discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(icmpTargets) != 1 { + t.Fatalf("expected 1 ICMP target, got %d", len(icmpTargets)) + } + dest, ok := delivery[icmpTargets[0]] + if !ok { + t.Fatal("expected delivery address for ICMP target") + } + if dest != "results.example.com:9000" { + t.Errorf("expected delivery results.example.com:9000, got %s", dest) + } +} + +func TestTargetDiscovery_ResultDestination_MixedUsers(t *testing.T) { + probePK := testProbePubkey() + client := &mockGeolocationUserClient{ + users: []geolocation.KeyedGeolocationUser{ + // User with result destination. + makeUserWithResultDest( + geolocation.GeolocationUserStatusActivated, + geolocation.GeolocationPaymentStatusPaid, + "user-with-dest", + []geolocation.GeolocationTarget{ + outboundTarget([4]uint8{44, 0, 0, 1}, 9000, probePK), + }, + "185.199.108.1:9000", + ), + // User without result destination. 
+ makeUser( + geolocation.GeolocationUserStatusActivated, + geolocation.GeolocationPaymentStatusPaid, + "user-no-dest", + []geolocation.GeolocationTarget{ + outboundTarget([4]uint8{44, 0, 0, 2}, 9001, probePK), + }, + ), + }, + } + + td := newTestTargetDiscovery(client) + targets, _, _, delivery, _, err := td.discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(targets) != 2 { + t.Fatalf("expected 2 targets, got %d", len(targets)) + } + if len(delivery) != 1 { + t.Fatalf("expected 1 delivery override, got %d", len(delivery)) + } + + // Find the target with the delivery override. + addr1 := ProbeAddress{Host: "44.0.0.1", Port: 9000, TWAMPPort: 8925} + addr2 := ProbeAddress{Host: "44.0.0.2", Port: 9001, TWAMPPort: 8925} + + if dest, ok := delivery[addr1]; !ok || dest != "185.199.108.1:9000" { + t.Errorf("expected delivery override for addr1, got ok=%v dest=%q", ok, dest) + } + if _, ok := delivery[addr2]; ok { + t.Error("expected no delivery override for addr2") + } +} + +func TestTargetDiscovery_ResultDestination_DomainName(t *testing.T) { + probePK := testProbePubkey() + client := &mockGeolocationUserClient{ + users: []geolocation.KeyedGeolocationUser{ + makeUserWithResultDest( + geolocation.GeolocationUserStatusActivated, + geolocation.GeolocationPaymentStatusPaid, + "user1", + []geolocation.GeolocationTarget{ + outboundTarget([4]uint8{44, 0, 0, 1}, 9000, probePK), + }, + "results.example.com:9000", + ), + }, + } + + td := newTestTargetDiscovery(client) + targets, _, _, delivery, _, err := td.discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(targets) != 1 { + t.Fatalf("expected 1 target, got %d", len(targets)) + } + dest, ok := delivery[targets[0]] + if !ok { + t.Fatal("expected delivery address for target") + } + if dest != "results.example.com:9000" { + t.Errorf("expected domain delivery address, got %s", dest) + } +} + +func 
TestTargetDiscovery_DeliveryAddrsChangeTriggersUpdate(t *testing.T) { + probePK := testProbePubkey() + client := &mockGeolocationUserClient{ + users: []geolocation.KeyedGeolocationUser{ + makeUserWithResultDest( + geolocation.GeolocationUserStatusActivated, + geolocation.GeolocationPaymentStatusPaid, + "user1", + []geolocation.GeolocationTarget{ + outboundTarget([4]uint8{44, 0, 0, 1}, 9000, probePK), + }, + "185.199.108.1:9000", + ), + }, + } + + td := newTestTargetDiscovery(client) + targetCh := make(chan TargetUpdate, 2) + keyCh := make(chan InboundKeyUpdate, 2) + icmpTargetCh := make(chan ICMPTargetUpdate, 2) + + ctx := context.Background() + + // First call should send update. + td.discoverAndSend(ctx, targetCh, keyCh, icmpTargetCh) + if len(targetCh) != 1 { + t.Fatalf("expected 1 target update after first call, got %d", len(targetCh)) + } + update := <-targetCh + if len(update.DeliveryAddrs) != 1 { + t.Fatalf("expected 1 delivery addr in update, got %d", len(update.DeliveryAddrs)) + } + + // Same data — no update. + td.discoverAndSend(ctx, targetCh, keyCh, icmpTargetCh) + if len(targetCh) != 0 { + t.Errorf("expected no update for unchanged data, got %d", len(targetCh)) + } + + // Change delivery address — should trigger update even though targets are the same. 
+ client.users[0].GeolocationUser.ResultDestination = "44.0.0.99:8080" + td.discoverAndSend(ctx, targetCh, keyCh, icmpTargetCh) + if len(targetCh) != 1 { + t.Fatalf("expected 1 target update after delivery change, got %d", len(targetCh)) + } + update = <-targetCh + addr := ProbeAddress{Host: "44.0.0.1", Port: 9000, TWAMPPort: 8925} + if update.DeliveryAddrs[addr] != "44.0.0.99:8080" { + t.Errorf("expected updated delivery address, got %s", update.DeliveryAddrs[addr]) + } +} + +func TestTargetDiscovery_InvalidResultDestination_Ignored(t *testing.T) { + probePK := testProbePubkey() + client := &mockGeolocationUserClient{ + users: []geolocation.KeyedGeolocationUser{ + makeUserWithResultDest( + geolocation.GeolocationUserStatusActivated, + geolocation.GeolocationPaymentStatusPaid, + "user1", + []geolocation.GeolocationTarget{ + outboundTarget([4]uint8{44, 0, 0, 1}, 9000, probePK), + }, + "not-a-valid-address", // missing port + ), + }, + } + + td := newTestTargetDiscovery(client) + targets, _, _, delivery, _, err := td.discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(targets) != 1 { + t.Fatalf("expected 1 target (address still valid), got %d", len(targets)) + } + if len(delivery) != 0 { + t.Errorf("expected no delivery overrides for invalid result destination, got %d", len(delivery)) + } +} diff --git a/e2e/geoprobe_test.go b/e2e/geoprobe_test.go index 03b8bbf0a7..b78271c3e0 100644 --- a/e2e/geoprobe_test.go +++ b/e2e/geoprobe_test.go @@ -544,11 +544,16 @@ func startGeoprobeTarget(t *testing.T, log *slog.Logger, dn *devnet.Devnet, cyoa env["CLICKHOUSE_TLS_DISABLED"] = "true" } + containerName := "geoprobe-target" + if opts != nil && opts.nameSuffix != "" { + containerName = opts.nameSuffix + } + req := testcontainers.ContainerRequest{ Image: geoprobeImage, - Name: dn.Spec.DeployID + "-geoprobe-target", + Name: dn.Spec.DeployID + "-" + containerName, ConfigModifier: func(cfg *dockercontainer.Config) { - cfg.Hostname = 
"geoprobe-target" + cfg.Hostname = containerName }, Cmd: []string{"doublezero-geoprobe-target", "-twamp-port", "8925", "-udp-port", "8923"}, Env: env, @@ -583,7 +588,7 @@ func startGeoprobeTarget(t *testing.T, log *slog.Logger, dn *devnet.Devnet, cyoa } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - dumpContainerLogs(ctx, containerID, "geoprobe-target") + dumpContainerLogs(ctx, containerID, containerName) }) return containerID @@ -592,6 +597,7 @@ func startGeoprobeTarget(t *testing.T, log *slog.Logger, dn *devnet.Devnet, cyoa type geoprobeTargetOpts struct { clickhouseAddr string clickhousePass string + nameSuffix string // optional: overrides container name/hostname (default: "geoprobe-target") } // waitForTargetOffsetReceived polls the geoprobe-target container logs until they show @@ -836,6 +842,11 @@ func TestE2E_GeoprobeIcmpTargets(t *testing.T) { log.Debug("==> Adding outbound-icmp target") addGeolocationOutboundIcmpTarget(t, dn, "geo-user-01", targetIPStr, 8923, "geoprobe1") + // ICMP targets require a result destination for offset delivery (no UDP listener on the + // measurement target itself). Point it at the target container's UDP listener. + log.Debug("==> Setting result destination for ICMP offset delivery") + setGeolocationUserResultDestination(t, dn, "geo-user-01", targetIPStr+":8923") + // Start agent with CAP_NET_RAW for ICMP probing. log.Debug("==> Starting geoprobe agent (ICMP target discovery)") _ = startGeoprobeAgent(t, log, dn, geoprobeIPStr, geoprobeAccountPK, @@ -877,6 +888,17 @@ func updateGeolocationUserPayment(t *testing.T, dn *devnet.Devnet, code, status require.NoError(t, err, "user update-payment failed: %s", string(output)) } +// setGeolocationUserResultDestination sets the result destination on a GeolocationUser. 
+func setGeolocationUserResultDestination(t *testing.T, dn *devnet.Devnet, userCode, destination string) { + t.Helper() + output, err := dn.Manager.Exec(t.Context(), []string{ + "doublezero-geolocation", "user", "set-result-destination", + "--user", userCode, + "--destination", destination, + }) + require.NoError(t, err, "user set-result-destination failed: %s", string(output)) +} + // addGeolocationOutboundTarget adds an outbound target to a GeolocationUser. func addGeolocationOutboundTarget(t *testing.T, dn *devnet.Devnet, userCode, targetIP string, targetPort int, probeCode string) { t.Helper() @@ -1019,3 +1041,225 @@ func waitForInboundProbeSuccess(t *testing.T, containerID string, timeout time.D return false }, timeout, 5*time.Second, "Expected target-sender log to contain a successful probe pair with valid signatures and offsets") } + +// TestE2E_GeoprobeResultDestination verifies that composite offsets are routed to a +// ResultDestination instead of the measurement target. The test sets up an outbound target +// at one IP and a result destination listener at a different IP, then confirms the offset +// arrives at the result destination. 
+func TestE2E_GeoprobeResultDestination(t *testing.T) {
+	t.Parallel()
+
+	deployID := "dz-e2e-" + t.Name() + "-" + random.ShortID()
+	log := newTestLoggerForTest(t)
+
+	currentDir, err := os.Getwd()
+	require.NoError(t, err)
+	serviceabilityProgramKeypairPath := filepath.Join(currentDir, "data", "serviceability-program-keypair.json")
+
+	minBalanceSOL := 3.0
+	topUpSOL := 5.0
+	dn, err := devnet.New(devnet.DevnetSpec{
+		DeployID:  deployID,
+		DeployDir: t.TempDir(),
+		CYOANetwork: devnet.CYOANetworkSpec{
+			CIDRPrefix: subnetCIDRPrefix,
+		},
+		DeviceTunnelNet: "192.168.99.0/24",
+		Manager: devnet.ManagerSpec{
+			ServiceabilityProgramKeypairPath: serviceabilityProgramKeypairPath,
+		},
+		Funder: devnet.FunderSpec{
+			Verbose:       true,
+			MinBalanceSOL: minBalanceSOL,
+			TopUpSOL:      topUpSOL,
+			Interval:      3 * time.Second,
+		},
+	}, log, dockerClient, subnetAllocator)
+	require.NoError(t, err)
+
+	log.Debug("==> Starting containernet")
+	err = dn.Start(t.Context(), nil)
+	require.NoError(t, err)
+
+	// On failure only, dump the tail of each device's telemetry agent log to stderr.
+	// A fresh context is used because t.Context() is already cancelled when cleanups run.
+	t.Cleanup(func() {
+		if !t.Failed() {
+			return
+		}
+		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+		defer cancel()
+
+		var buf strings.Builder
+		fmt.Fprintf(&buf, "\n=== GEOPROBE RESULT-DEST DIAGNOSTIC DUMP (deploy=%s) ===\n", deployID)
+		for code, device := range dn.Devices {
+			for _, cmd := range []struct {
+				label   string
+				command []string
+			}{
+				{"doublezero-telemetry log (last 200 lines)", []string{"tail", "-200", "/var/log/agents-latest/doublezero-telemetry"}},
+			} {
+				output, err := device.Exec(ctx, cmd.command)
+				if err != nil {
+					fmt.Fprintf(&buf, "\n--- Device %s: %s (ERROR: %v)\n", code, cmd.label, err)
+				} else {
+					fmt.Fprintf(&buf, "\n--- Device %s: %s\n%s", code, cmd.label, string(output))
+				}
+			}
+		}
+		fmt.Fprintf(&buf, "\n=== GEOPROBE RESULT-DEST DIAGNOSTIC DUMP END ===\n")
+		fmt.Fprint(os.Stderr, buf.String())
+	})
+
+	linkNetwork := devnet.NewMiscNetwork(dn, log, "ams-dz01:ams-dz02")
+	_, err = linkNetwork.CreateIfNotExists(t.Context())
+	require.NoError(t, err)
+
+	// newTelemetryKeypair generates a telemetry keypair for the given device code and
+	// writes it to a temp file. It is only called from the test goroutine, so the
+	// require assertions inside it can abort the test safely.
+	newTelemetryKeypair := func(code string) (solana.PublicKey, string) {
+		keypair := solana.NewWallet().PrivateKey
+		keypairJSON, err := json.Marshal(keypair[:])
+		require.NoError(t, err)
+		keypairPath := filepath.Join(t.TempDir(), code+"-telemetry-keypair.json")
+		require.NoError(t, os.WriteFile(keypairPath, keypairJSON, 0600))
+		return keypair.PublicKey(), keypairPath
+	}
+
+	// newDeviceSpec builds the spec shared by both devices; the caller sets the
+	// per-device CYOA host ID afterwards.
+	newDeviceSpec := func(code string, publisherPK solana.PublicKey, keypairPath string) devnet.DeviceSpec {
+		return devnet.DeviceSpec{
+			Code:                         code,
+			Location:                     "ams",
+			Exchange:                     "xams",
+			MetricsPublisherPK:           publisherPK.String(),
+			CYOANetworkAllocatablePrefix: 29,
+			Telemetry: devnet.DeviceTelemetrySpec{
+				Enabled:              true,
+				KeypairPath:          keypairPath,
+				TWAMPListenPort:      862,
+				ProbeInterval:        2 * time.Second,
+				SubmissionInterval:   5 * time.Second,
+				PeersRefreshInterval: 5 * time.Second,
+				Verbose:              true,
+			},
+			AdditionalNetworks: []string{linkNetwork.Name},
+			Interfaces: map[string]string{
+				"Ethernet2": "physical",
+			},
+		}
+	}
+
+	dz1TelemetryKeypairPK, dz1KeypairPath := newTelemetryKeypair("ams-dz01")
+	dz2TelemetryKeypairPK, dz2KeypairPath := newTelemetryKeypair("ams-dz02")
+
+	dz1Spec := newDeviceSpec("ams-dz01", dz1TelemetryKeypairPK, dz1KeypairPath)
+	dz1Spec.CYOANetworkIPHostID = 8
+	// dz2 exists to form a link pair with dz1; it is not otherwise used below.
+	dz2Spec := newDeviceSpec("ams-dz02", dz2TelemetryKeypairPK, dz2KeypairPath)
+	dz2Spec.CYOANetworkIPHostID = 16
+
+	// Add both devices in parallel. The goroutines only record their error: t.FailNow
+	// (and therefore require.*) must run on the test goroutine, so the assertions are
+	// made after wg.Wait().
+	specs := []devnet.DeviceSpec{dz1Spec, dz2Spec}
+	addErrs := make([]error, len(specs))
+	var wg sync.WaitGroup
+	for i, spec := range specs {
+		wg.Add(1)
+		go func(i int, spec devnet.DeviceSpec) {
+			defer wg.Done()
+			_, addErrs[i] = dn.AddDevice(t.Context(), spec)
+		}(i, spec)
+	}
+	wg.Wait()
+	for i, addErr := range addErrs {
+		require.NoError(t, addErr, "adding device %s", specs[i].Code)
+	}
+
+	// requireEventuallyFunded takes t and is expected to fail the test on timeout, so it
+	// is also kept on the test goroutine.
+	requireEventuallyFunded(t, log, dn.Ledger.GetRPCClient(), dz1TelemetryKeypairPK, minBalanceSOL, "dz1 telemetry publisher")
+	requireEventuallyFunded(t, log, dn.Ledger.GetRPCClient(), dz2TelemetryKeypairPK, minBalanceSOL, "dz2 telemetry publisher")
+
+	dz1 := dn.Devices["ams-dz01"]
+	require.NotNil(t, dz1)
+
+	geoprobeHostID := uint32(32)
+	geoprobeIP, err := netutil.DeriveIPFromCIDR(dn.CYOANetwork.SubnetCIDR, geoprobeHostID)
+	require.NoError(t, err)
+	geoprobeIPStr := geoprobeIP.To4().String()
+
+	exchangePK := getExchangePK(t, dn, "xams")
+
+	dz1DevicePK := dz1.ID
+	require.NotEmpty(t, dz1DevicePK)
+
+	log.Debug("==> Creating geoprobe onchain")
+	geoprobeAccountPK := createGeoprobeOnchain(t, dn, "geoprobe1", exchangePK, geoprobeIPStr, dz1TelemetryKeypairPK.String())
+
+	log.Debug("==> Adding dz1 as geoprobe parent")
+	addGeoprobeParent(t, dn, "geoprobe1", dz1DevicePK)
+
+	// Measurement target: geoprobe-target container that the agent probes via TWAMP.
+	measurementHostID := uint32(40)
+	measurementIP, err := netutil.DeriveIPFromCIDR(dn.CYOANetwork.SubnetCIDR, measurementHostID)
+	require.NoError(t, err)
+	measurementIPStr := measurementIP.To4().String()
+
+	// Result destination: separate geoprobe-target container that receives the composite offset.
+	resultDestHostID := uint32(48)
+	resultDestIP, err := netutil.DeriveIPFromCIDR(dn.CYOANetwork.SubnetCIDR, resultDestHostID)
+	require.NoError(t, err)
+	resultDestIPStr := resultDestIP.To4().String()
+
+	log.Debug("==> Starting measurement target container", "ip", measurementIPStr)
+	_ = startGeoprobeTarget(t, log, dn, measurementIPStr, &geoprobeTargetOpts{
+		nameSuffix: "measurement-target",
+	})
+
+	log.Debug("==> Starting result destination container", "ip", resultDestIPStr)
+	resultDestContainerID := startGeoprobeTarget(t, log, dn, resultDestIPStr, &geoprobeTargetOpts{
+		nameSuffix: "result-dest",
+	})
+
+	// Create a GeolocationUser with an outbound target pointing at the measurement container.
+	tokenAccount := solana.NewWallet().PublicKey().String()
+	log.Debug("==> Creating GeolocationUser onchain")
+	createGeolocationUser(t, dn, "geo-user-01", tokenAccount)
+	updateGeolocationUserPayment(t, dn, "geo-user-01", "paid")
+
+	log.Debug("==> Adding outbound target (measurement)")
+	addGeolocationOutboundTarget(t, dn, "geo-user-01", measurementIPStr, 8923, "geoprobe1")
+
+	// Set result destination to the separate container — offsets should land here, not
+	// at the measurement target.
+	log.Debug("==> Setting result destination", "destination", resultDestIPStr+":8923")
+	setGeolocationUserResultDestination(t, dn, "geo-user-01", resultDestIPStr+":8923")
+
+	// Start agent — it will discover the target and result destination from onchain state.
+	log.Debug("==> Starting geoprobe agent")
+	_ = startGeoprobeAgent(t, log, dn, geoprobeIPStr, geoprobeAccountPK,
+		dn.Manager.GeolocationProgramID, dn.Manager.ServiceabilityProgramID,
+		&geoprobeAgentOpts{
+			probeInterval: 5 * time.Second,
+		})
+
+	// --- Outbound flow with result destination ---
+	// Wait for dz1's telemetry agent to discover the geoprobe and successfully probe it.
+	log.Debug("==> Waiting for geoprobe discovery and successful measurement")
+	waitForGeoprobeSuccess(t, dz1, geoprobeIPStr, 180*time.Second)
+
+	// Wait for the result destination container (not the measurement target) to receive
+	// the composite offset. This proves the offset was routed to the alternate address.
+	log.Debug("==> Waiting for result destination to receive composite offset")
+	waitForTargetOffsetReceived(t, resultDestContainerID, 120*time.Second)
+	log.Debug("==> Result destination received valid composite offset — routing verified")
+}