Skip to content

Commit 2930052

Browse files
committed
DXCC and LoTW data handling fixes. This closes #8 and closes #10 for good!
1 parent 1eab127 commit 2930052

6 files changed

Lines changed: 246 additions & 129 deletions

File tree

cmd/dxcluster-client/main.go

Lines changed: 168 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66
"log"
7-
"net"
87
"net/http"
98
"os"
109
"os/signal"
@@ -142,13 +141,13 @@ func RunApplication(ctx context.Context, args []string) int {
142141
logging.Info("Configuration loaded. WebPort: %d, MaxCache: %d, DataDir: %s", cfg.WebPort, cfg.MaxCache, cfg.DataDir)
143142

144143
// Debug configuration details
145-
logging.Debug("DX Config Debug: DXCHost='%s', DXCPort='%s', DXCCallsign='%s', DXCPassword='%s'", cfg.DXCHost, cfg.DXCPort, cfg.DXCCallsign, cfg.DXCPassword)
146144
logging.Debug("CLUSTERS JSON: '%s'", cfg.RawClustersJSON)
145+
logging.Debug("Global Callsign: '%s'", cfg.Callsign)
147146

148147
if len(cfg.Clusters) > 0 {
149148
logging.Info("DX Clusters configured: %d", len(cfg.Clusters))
150149
for i, cluster := range cfg.Clusters {
151-
logging.Info(" Cluster %d: %s (%s:%s) callsign=%s", i+1, cluster.ClusterName, cluster.Host, cluster.Port, cluster.Callsign)
150+
logging.Info(" Cluster %d: %s:%s callsign=%s", i+1, cluster.Host, cluster.Port, cluster.Callsign)
152151
}
153152
} else {
154153
logging.Warn("No DX Clusters configured.")
@@ -198,15 +197,33 @@ func RunApplication(ctx context.Context, args []string) int {
198197
if err != nil {
199198
log.Fatalf("FATAL: Failed to initialize LoTW client: %v\n", err)
200199
}
201-
lotwClient.StartUpdater(ctx) // Start periodic updates
202-
logging.Info("LoTW client initialized.")
200+
201+
// Perform initial synchronous LoTW data check and download BEFORE starting spot sources
202+
logging.Info("Checking LoTW data status...")
203+
lastLoTWUpdate, err := lotwClient.GetLastDownloadTime(ctx)
204+
if err != nil {
205+
logging.Warn("Failed to check LoTW last update time: %v", err)
206+
}
207+
needsLoTWUpdate := lastLoTWUpdate.IsZero() || time.Since(lastLoTWUpdate) >= cfg.LoTWUpdateInterval
208+
if needsLoTWUpdate {
209+
logging.Info("LoTW data needs update, downloading now (this may take a moment)...")
210+
if err := lotwClient.FetchAndStoreUsers(ctx); err != nil {
211+
logging.Error("Initial LoTW data fetch failed: %v", err)
212+
log.Fatalf("FATAL: Cannot start without LoTW data\n")
213+
}
214+
logging.Info("LoTW data downloaded and loaded successfully.")
215+
} else {
216+
logging.Info("LoTW data is up to date (last updated: %s)", lastLoTWUpdate.Format(time.RFC3339))
217+
}
218+
219+
// Now start background periodic updater
220+
lotwClient.StartUpdater(ctx)
221+
logging.Info("LoTW client ready. Background updater started for periodic checks.")
203222
defer func() {
204223
logging.Info("Deferred: stopping LoTW updater...")
205224
lotwClient.StopUpdater()
206225
logging.Info("Deferred: LoTW updater stopped.")
207-
}()
208-
209-
// --- 4. Initialize DXCC Client ---
226+
}() // --- 4. Initialize DXCC Client ---
210227
dxccDBClient, err := db.NewSQLiteClient(cfg.DataDir, dxcc.DBFileName)
211228
if err != nil {
212229
log.Fatalf("FATAL: Failed to initialize DXCC DB client: %v\n", err)
@@ -217,15 +234,48 @@ func RunApplication(ctx context.Context, args []string) int {
217234
if err != nil {
218235
log.Fatalf("FATAL: Failed to initialize DXCC client: %v\n", err)
219236
}
220-
dxccClient.StartUpdater(ctx) // Start periodic updates
221-
logging.Info("DXCC client initialized.")
237+
238+
// Perform initial synchronous DXCC data check and download BEFORE starting spot sources
239+
logging.Info("Checking DXCC data status...")
240+
lastDXCCUpdate, err := dxccClient.GetLastDownloadTime(ctx)
241+
if err != nil {
242+
logging.Warn("Failed to check DXCC last update time: %v", err)
243+
}
244+
needsDXCCUpdate := lastDXCCUpdate.IsZero() || time.Since(lastDXCCUpdate) >= cfg.DXCCUpdateInterval
245+
if needsDXCCUpdate {
246+
logging.Info("DXCC data needs update, downloading now (this may take a moment)...")
247+
dxccClient.FetchAndStoreData(ctx)
248+
logging.Info("DXCC data downloaded and loaded successfully.")
249+
} else {
250+
logging.Info("DXCC data is up to date (last updated: %s)", lastDXCCUpdate.Format(time.RFC3339))
251+
// Load in-memory maps from database NOW to ensure they're ready before spots arrive
252+
if err := dxccClient.LoadMapsFromDB(ctx); err != nil {
253+
log.Fatalf("FATAL: Failed to load DXCC maps from database: %v\n", err)
254+
}
255+
// Verify maps are actually populated (database might be empty despite recent update timestamp)
256+
if len(dxccClient.PrefixesMap) == 0 {
257+
logging.Warn("DXCC database is empty despite recent update timestamp. Forcing download now...")
258+
dxccClient.FetchAndStoreData(ctx)
259+
logging.Info("DXCC data downloaded and loaded successfully after empty database detection.")
260+
}
261+
}
262+
263+
// Now start background periodic updater
264+
dxccClient.StartUpdater(ctx)
265+
logging.Info("DXCC client ready. Background updater started for periodic checks.")
222266
defer func() {
223267
logging.Info("Deferred: closing DXCC client (stopping updater)...")
224268
dxccClient.Close()
225269
logging.Info("Deferred: DXCC client closed.")
226270
}()
227271

228-
// --- 5. Initialize POTA Client (if enabled) ---
272+
// ═══════════════════════════════════════════════════════════════════════════
273+
// CRITICAL BARRIER: DXCC and LoTW data are now FULLY LOADED and ready.
274+
// ═══════════════════════════════════════════════════════════════════════════
275+
// Spots can now be enriched immediately upon arrival from any source.
276+
logging.Info("DXCC and LoTW data loaded. Ready to initialize spot sources and HTTP API.")
277+
278+
// --- 6. Initialize POTA Client (if enabled) ---
229279
var potaClient *pota.Client
230280
if cfg.EnablePOTA {
231281
potaClient, err = pota.NewClient(ctx, *cfg, rdb)
@@ -244,53 +294,97 @@ func RunApplication(ctx context.Context, args []string) int {
244294
}()
245295
}
246296

247-
// --- 6. Initialize DX Cluster Clients ---
297+
// --- 6. Initialize DX Cluster Clients (but don't connect yet) ---
248298
dxClusterClients := make([]*cluster.Client, 0, len(cfg.Clusters))
249-
dxClusterNames := make([]string, 0, len(cfg.Clusters))
299+
dxClusterHosts := make([]string, 0, len(cfg.Clusters))
250300
for _, cc := range cfg.Clusters {
251301
client, err := cluster.NewClient(cc)
252302
if err != nil {
253303
logging.Warn("Failed to create DX Cluster client for %s:%s: %v. Skipping this cluster.", cc.Host, cc.Port, err)
254304
continue
255305
}
256306
dxClusterClients = append(dxClusterClients, client)
257-
dxClusterNames = append(dxClusterNames, cc.ClusterName)
258-
client.Connect(ctx) // Start connection and reconnection loop
259-
// Log a concise one-line message showing the configured channel buffer
307+
dxClusterHosts = append(dxClusterHosts, cc.Host)
308+
// DON'T call Connect() yet - we'll do that after HTTP API is ready
260309
buf := cap(client.SpotChan)
261-
logging.Info("DX Cluster client for %s (%s:%s) initialized and connecting. channel_buffer=%d", cc.ClusterName, cc.Host, cc.Port, buf)
310+
logging.Info("DX Cluster client for %s:%s created. channel_buffer=%d", cc.Host, cc.Port, buf)
262311
}
263312
if len(dxClusterClients) == 0 && !cfg.EnablePOTA {
264313
logging.Error("No active DX Cluster connections and POTA is disabled. Exiting as there are no spot sources.")
265314
return 2
266315
}
267316

268-
// --- 7. Spot Aggregation & Enrichment ---
317+
// --- 7. Create Spot Cache and Set up HTTP API BEFORE connecting spot sources ---
269318
centralSpotCache := newSpotCache(cfg.MaxCache)
319+
320+
router := gin.Default()
321+
router.SetTrustedProxies(nil) // To prevent "x-forwarded-for" issues if not behind a proxy
322+
323+
// Middleware to handle BaseURL prefix if configured
324+
if cfg.BaseURL != "/" && cfg.BaseURL != "" {
325+
router.Group(cfg.BaseURL).Use(func(c *gin.Context) {
326+
// Trim base URL from request path for internal routing
327+
c.Request.URL.Path = strings.TrimPrefix(c.Request.URL.Path, cfg.BaseURL)
328+
c.Next()
329+
}).GET("/healthz", func(c *gin.Context) {
330+
c.Status(http.StatusOK)
331+
})
332+
apiGroup := router.Group(cfg.BaseURL)
333+
setupAPIRoutes(apiGroup, centralSpotCache, dxccClient, lotwClient)
334+
} else {
335+
router.GET("/healthz", func(c *gin.Context) {
336+
c.Status(http.StatusOK)
337+
})
338+
setupAPIRoutes(router.Group("/"), centralSpotCache, dxccClient, lotwClient)
339+
}
340+
341+
srv := &http.Server{
342+
Addr: fmt.Sprintf(":%d", cfg.WebPort),
343+
Handler: router,
344+
}
345+
346+
// Start HTTP server in goroutine
347+
go func() {
348+
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
349+
log.Fatalf("FATAL: HTTP server failed: %v\n", err)
350+
}
351+
}()
352+
logging.Info("HTTP API listening on :%d (BaseURL: %s)", cfg.WebPort, cfg.BaseURL)
353+
354+
// ═══════════════════════════════════════════════════════════════════════════
355+
// NOW connect DX clusters and start POTA polling - everything is ready!
356+
// ═══════════════════════════════════════════════════════════════════════════
357+
358+
for i, client := range dxClusterClients {
359+
client.Connect(ctx) // Start connection and reconnection loop
360+
logging.Info("DX Cluster client %s:%s connecting...", dxClusterHosts[i], cfg.Clusters[i].Port)
361+
}
362+
363+
// --- 8. Spot Aggregation & Enrichment ---
270364
spotChannels := make([]<-chan spot.Spot, 0) // Collect all spot output channels
271365

272366
// Add DX Cluster spot channels (convert cluster.Spot -> unified spot.Spot)
273367
for i, client := range dxClusterClients {
274-
clusterName := dxClusterNames[i]
368+
clusterHost := dxClusterHosts[i]
275369
// Buffer the forwarder channel so brief startup ordering doesn't drop
276370
// spots that are emitted before the merge goroutines are scheduled.
277371
ch := make(chan spot.Spot, 8)
278372
// Forwarder goroutine: convert cluster.Spot to spot.Spot and send on ch
279-
go func(c *cluster.Client, out chan<- spot.Spot, clusterName string) {
373+
go func(c *cluster.Client, out chan<- spot.Spot, clusterHost string) {
280374
defer close(out)
281375
forwardedCount := 0
282376
for {
283377
select {
284378
case <-ctx.Done():
285-
logging.Info("Cluster forwarder [%s] shutting down. Forwarded %d spots.", clusterName, forwardedCount)
379+
logging.Info("Cluster forwarder [%s] shutting down. Forwarded %d spots.", clusterHost, forwardedCount)
286380
return
287381
case s, ok := <-c.SpotChan:
288382
if !ok {
289-
logging.Info("Cluster forwarder [%s] spot channel closed. Forwarded %d spots.", clusterName, forwardedCount)
383+
logging.Info("Cluster forwarder [%s] spot channel closed. Forwarded %d spots.", clusterHost, forwardedCount)
290384
return
291385
}
292386
forwardedCount++
293-
logging.Info("CLUSTER [%s] RAW SPOT #%d: %s -> %s @ %.3f kHz - %s", clusterName, forwardedCount, s.Spotter, s.Spotted, s.Frequency, s.Message)
387+
logging.Info("CLUSTER [%s] RAW SPOT #%d: %s -> %s @ %.3f kHz - %s", clusterHost, forwardedCount, s.Spotter, s.Spotted, s.Frequency, s.Message)
294388

295389
// Send into the buffered forwarder channel. If the application is
296390
// shutting down, exit promptly via ctx.Done.
@@ -303,13 +397,13 @@ func RunApplication(ctx context.Context, args []string) int {
303397
When: s.When,
304398
Source: s.Source,
305399
}:
306-
logging.Info("CLUSTER [%s] FORWARDED SPOT #%d to aggregator", clusterName, forwardedCount)
400+
logging.Info("CLUSTER [%s] FORWARDED SPOT #%d to aggregator", clusterHost, forwardedCount)
307401
case <-ctx.Done():
308402
return
309403
}
310404
}
311405
}
312-
}(client, ch, clusterName)
406+
}(client, ch, clusterHost)
313407

314408
spotChannels = append(spotChannels, ch)
315409

@@ -376,6 +470,15 @@ func RunApplication(ctx context.Context, args []string) int {
376470
// Verbose tracing for spot pipeline; enabled by setting DX_API_VERBOSE_SPOT_PIPELINE=1
377471
verbose := os.Getenv("DX_API_VERBOSE_SPOT_PIPELINE") == "1" || strings.ToLower(os.Getenv("DX_API_VERBOSE_SPOT_PIPELINE")) == "true"
378472
spotCount := 0
473+
474+
// Duplicate detection: track spotter+spotted pairs with timestamp
475+
type spotKey struct {
476+
spotter string
477+
spotted string
478+
}
479+
recentSpots := make(map[spotKey]time.Time)
480+
const dedupeWindow = 30 * time.Second
481+
379482
for {
380483
select {
381484
case <-ctx.Done():
@@ -386,6 +489,43 @@ func RunApplication(ctx context.Context, args []string) int {
386489
logging.Info("Spot aggregation: merged channel closed, exiting goroutine. Total spots processed: %d", spotCount)
387490
return
388491
}
492+
493+
// Validate required fields - reject spots with missing callsigns
494+
if strings.TrimSpace(receivedSpot.Spotter) == "" {
495+
logging.Warn("SPOT REJECTED: missing spotter callsign. spotted=%q freq=%.3f source=%s", receivedSpot.Spotted, receivedSpot.Frequency, receivedSpot.Source)
496+
continue
497+
}
498+
if strings.TrimSpace(receivedSpot.Spotted) == "" {
499+
logging.Warn("SPOT REJECTED: missing spotted callsign. spotter=%q freq=%.3f source=%s", receivedSpot.Spotter, receivedSpot.Frequency, receivedSpot.Source)
500+
continue
501+
}
502+
503+
// Check for duplicate spot (same spotter+spotted within 30 seconds)
504+
key := spotKey{
505+
spotter: receivedSpot.Spotter,
506+
spotted: receivedSpot.Spotted,
507+
}
508+
now := time.Now()
509+
510+
// Clean up old entries from the map (older than 30 seconds)
511+
for k, t := range recentSpots {
512+
if now.Sub(t) > dedupeWindow {
513+
delete(recentSpots, k)
514+
}
515+
}
516+
517+
// Check if this is a duplicate
518+
if lastSeen, exists := recentSpots[key]; exists {
519+
if now.Sub(lastSeen) < dedupeWindow {
520+
logging.Debug("DUPLICATE SPOT FILTERED: %s -> %s @ %.3f kHz [%s] (seen %.1f seconds ago)",
521+
receivedSpot.Spotter, receivedSpot.Spotted, receivedSpot.Frequency, receivedSpot.Source, now.Sub(lastSeen).Seconds())
522+
continue
523+
}
524+
}
525+
526+
// Record this spot
527+
recentSpots[key] = now
528+
389529
spotCount++
390530
logging.Info("SPOT #%d RECEIVED: %s -> %s @ %.3f kHz [%s] - %s", spotCount, receivedSpot.Spotter, receivedSpot.Spotted, receivedSpot.Frequency, receivedSpot.Source, receivedSpot.Message)
391531
if verbose {
@@ -404,7 +544,7 @@ func RunApplication(ctx context.Context, args []string) int {
404544
continue
405545
}
406546
centralSpotCache.AddSpot(enrichedSpot)
407-
logging.Info("SPOT #%d CACHED (enriched): %s -> %s @ %.3f kHz [%s] Band=%s", spotCount, enrichedSpot.Spotter, enrichedSpot.Spotted, enrichedSpot.Frequency, enrichedSpot.Source, enrichedSpot.Band)
547+
logging.Info("SPOT #%d CACHED (enriched): %s -> %s @ %.3fMHz (%s) [%s]", spotCount, enrichedSpot.Spotter, enrichedSpot.Spotted, enrichedSpot.Frequency, enrichedSpot.Band, enrichedSpot.Source)
408548
if verbose {
409549
logging.Debug("Aggregator added enriched spot from %s (spotter=%s)", enrichedSpot.Source, enrichedSpot.Spotter)
410550
}
@@ -421,50 +561,6 @@ func RunApplication(ctx context.Context, args []string) int {
421561
logging.Info("POTA polling started (deferred).")
422562
}
423563

424-
// --- 8. Set up HTTP API (Gin) ---
425-
router := gin.Default()
426-
router.SetTrustedProxies(nil) // To prevent "x-forwarded-for" issues if not behind a proxy
427-
428-
// Middleware to handle BaseURL prefix if configured
429-
if cfg.BaseURL != "/" && cfg.BaseURL != "" {
430-
router.Group(cfg.BaseURL).Use(func(c *gin.Context) {
431-
// Trim base URL from request path for internal routing
432-
c.Request.URL.Path = strings.TrimPrefix(c.Request.URL.Path, cfg.BaseURL)
433-
c.Next()
434-
}).GET("/healthz", func(c *gin.Context) {
435-
c.Status(http.StatusOK)
436-
})
437-
apiGroup := router.Group(cfg.BaseURL)
438-
setupAPIRoutes(apiGroup, centralSpotCache, dxccClient, lotwClient)
439-
} else {
440-
router.GET("/healthz", func(c *gin.Context) {
441-
c.Status(http.StatusOK)
442-
})
443-
setupAPIRoutes(router.Group("/"), centralSpotCache, dxccClient, lotwClient)
444-
}
445-
446-
srv := &http.Server{
447-
Addr: fmt.Sprintf(":%d", cfg.WebPort),
448-
Handler: router,
449-
}
450-
451-
// Try to listen synchronously so we can fail fast on bind errors. Tests expect
452-
// RunApplication to return quickly on failure rather than hang while other
453-
// goroutines continue running.
454-
addr := fmt.Sprintf(":%d", cfg.WebPort)
455-
ln, err := net.Listen("tcp", addr)
456-
if err != nil {
457-
log.Printf("FATAL: HTTP server failed to listen on %s: %v", addr, err)
458-
return 3
459-
}
460-
logging.Info("HTTP API listening on %s (BaseURL: %s)", addr, cfg.BaseURL)
461-
// Serve with the acquired listener in a goroutine. Serve will return when Shutdown is called.
462-
go func() {
463-
if err := srv.Serve(ln); err != nil && err != http.ErrServerClosed {
464-
logging.Error("HTTP server exited with error: %v", err)
465-
}
466-
}()
467-
468564
// --- 9. Graceful Shutdown ---
469565
quit := make(chan os.Signal, 1)
470566
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
@@ -494,8 +590,8 @@ func RunApplication(ctx context.Context, args []string) int {
494590
// a stuck client Close() won't hang the entire shutdown process.
495591
for i, c := range dxClusterClients {
496592
name := "<unknown>"
497-
if i < len(dxClusterNames) {
498-
name = dxClusterNames[i]
593+
if i < len(dxClusterHosts) {
594+
name = dxClusterHosts[i]
499595
}
500596
logging.Info("Closing DX Cluster client %s...", name)
501597
done := make(chan struct{})

0 commit comments

Comments
 (0)