From 947fc4b149250e0627c6749ad62c359cb42f19b2 Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 26 Feb 2026 01:30:34 -0500 Subject: [PATCH 01/16] Add an extension to db config to support the new influx v2 fields --- lib/db/db.go | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/lib/db/db.go b/lib/db/db.go index d39b980..9233e59 100644 --- a/lib/db/db.go +++ b/lib/db/db.go @@ -2,21 +2,42 @@ package db // Handler is an interface for database access implementations type Handler interface { - // Initialize sets up the database client Initialize(host string, port int) error - // Insert sends the measurement data to the database Insert(measurements MeasurementGroup) error - // CreateQuery generates the database query for measurementGroup CreateQuery(measurements MeasurementGroup) string - // Close closes the database client when done Close() error } +// BatchHandler extends Handler with batch write support +type BatchHandler interface { + Handler + Flush() error +} + +// Config holds all configuration needed to initialize any Handler +// V1 only uses Host/Port. V2 requires URL, Token, Org, Bucket. +type Config struct { + // V1 + Host string + Port int + + // V2 + URL string + Token string + Org string + Bucket string + + // Batching (V2 only) + BatchSize uint // Points to buffer before flushing + FlushInterval uint // Max ms before flushing partial batch + Precision string // "ns", "us", "ms", "s" +} + // MeasurementGroup is a group of measurements to be sent to the database type MeasurementGroup struct { - DatabaseName string // Name of the database - Timestamp int64 // Unix timestamp in nanoseconds - Measurements []Measurement // List of measurements to be sent + DatabaseName string + Timestamp int64 + Measurements []Measurement } // Measurement is a single measurement to be sent to the database From d2186e8a02f80da05454a48aca8ebff5c94ed4d2 Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 26 Feb 2026 01:42:32 -0500 Subject: [PATCH 02/16] Actually just blast the interface and v1. lets just purge rot --- go.mod | 5 +++ go.sum | 15 ++++++++ lib/db/db.go | 46 ++++++----------------- lib/db/influx_v1.go | 90 --------------------------------------------- lib/db/influx_v2.go | 0 5 files changed, 32 insertions(+), 124 deletions(-) delete mode 100644 lib/db/influx_v1.go create mode 100644 lib/db/influx_v2.go diff --git a/go.mod b/go.mod index ef634fb..e27a834 100644 --- a/go.mod +++ b/go.mod @@ -16,13 +16,18 @@ require ( ) require ( + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gdamore/encoding v1.0.0 // indirect + github.com/google/uuid v1.4.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect + github.com/influxdata/influxdb-client-go/v2 v2.14.0 // indirect + github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/oapi-codegen/runtime v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect diff --git a/go.sum b/go.sum index 6853a0c..ed9a957 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= +github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= +github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -14,12 +18,19 @@ github.com/gdamore/tcell/v2 v2.7.1 h1:TiCcmpWHiAU7F0rA2I3S2Y4mmLmO9KHxJ7E1QhYzQb github.com/gdamore/tcell/v2 v2.7.1/go.mod h1:dSXtXTSK0VsW1biw65DZLZ2NKr7j0qP/0J7ONmsraWg= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/influxdata/influxdb-client-go/v2 v2.14.0 h1:AjbBfJuq+QoaXNcrova8smSjwJdUHnwvfjMF71M1iI4= +github.com/influxdata/influxdb-client-go/v2 v2.14.0/go.mod h1:Ahpm3QXKMJslpXl3IftVLVezreAUtBOTZssDrjZEFHI= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -32,6 +43,8 @@ github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZ github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo= +github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -59,10 +72,12 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= diff --git a/lib/db/db.go b/lib/db/db.go index 9233e59..8c4d427 100644 --- a/lib/db/db.go +++ b/lib/db/db.go @@ -1,47 +1,25 @@ package db -// Handler is an interface for database access implementations -type Handler interface { - Initialize(host string, port int) error - Insert(measurements MeasurementGroup) error - CreateQuery(measurements MeasurementGroup) string - Close() error +// MeasurementGroup is a group of measurements to be sent to the database +type MeasurementGroup struct { + DatabaseName string // Maps to InfluxDB v2 measurement name + Timestamp int64 // Unix timestamp in nanoseconds + Measurements []Measurement // List of measurements to be sent } -// BatchHandler extends Handler with batch write support -type BatchHandler interface { - Handler - Flush() error +// Measurement is a single measurement to be sent to the database +type Measurement struct { + Name string + Value string } -// Config holds all configuration needed to initialize any Handler -// V1 only uses Host/Port. V2 requires URL, Token, Org, Bucket. +// Config holds all configuration needed to initialize a DB handler type Config struct { - // V1 - Host string - Port int - - // V2 URL string Token string Org string Bucket string - // Batching (V2 only) - BatchSize uint // Points to buffer before flushing - FlushInterval uint // Max ms before flushing partial batch - Precision string // "ns", "us", "ms", "s" -} - -// MeasurementGroup is a group of measurements to be sent to the database -type MeasurementGroup struct { - DatabaseName string - Timestamp int64 - Measurements []Measurement -} - -// Measurement is a single measurement to be sent to the database -type Measurement struct { - Name string // Name of the measurement - Value string // Value of the measurement + BatchSize uint // Points to buffer before auto-flush (default: 100) + FlushInterval uint // Max ms before flushing a partial batch (default: 1000) } diff --git a/lib/db/influx_v1.go b/lib/db/influx_v1.go deleted file mode 100644 index ecaad1e..0000000 --- a/lib/db/influx_v1.go +++ /dev/null @@ -1,90 +0,0 @@ -package db - -import ( - "fmt" - "net" - - "github.com/AarC10/GSW-V2/lib/logger" - "go.uber.org/zap" -) - -// InfluxDBV1Handler is a DB Handler implementation for InfluxDB v1 -type InfluxDBV1Handler struct { - conn net.UDPConn // UDP connection to InfluxDB - addr string // IP address and port of InfluxDB -} - -// Initialize sets up the InfluxDB UDP connection -func (h *InfluxDBV1Handler) Initialize(host string, port int) error { - h.addr = fmt.Sprintf("%s:%d", host, port) - addr, err := net.ResolveUDPAddr("udp", h.addr) - if err != nil { - return fmt.Errorf("resolving db address: %w", err) - } - - logger.Info("resolved database address", zap.String("url", addr.String())) - - conn, err := net.DialUDP("udp", nil, addr) - if err != nil { - return err - } - - h.conn = *conn - - return nil -} - -// CreateQuery generates InfluxDB query for measurement group -func (h *InfluxDBV1Handler) CreateQuery(measurements MeasurementGroup) string { - return CreateQuery(measurements) -} - -// CreateQuery generates InfluxDB query for measurement group -func CreateQuery(measurements MeasurementGroup) string { - query := measurements.DatabaseName + " " - - for _, measurement := range measurements.Measurements { - query += fmt.Sprintf("%s=%s,", measurement.Name, measurement.Value) - } - - // Don't check if string is empty. We expect the Name and the measurements to be non-empty. - query = query[:len(query)-1] - - // Add timestamp if it exists. Otherwise, Influx will default to current nano time - if measurements.Timestamp != 0 { - query += fmt.Sprintf(" %d", measurements.Timestamp) - } - - query += "\n" - - // TODO: Make a debug logger? - - return query -} - -// Insert sends the measurement group data to InfluxDB using UDP -func (h *InfluxDBV1Handler) Insert(measurements MeasurementGroup) error { - // Generate the InfluxDB line protocol query - query := h.CreateQuery(measurements) - - // Convert the query string to bytes - data := []byte(query) - - // Send the query data over UDP - _, err := h.conn.Write(data) - if err != nil { - return fmt.Errorf("error sending data to InfluxDB over UDP: %w", err) - } - - return nil -} - -// Close closes the InfluxDB UDP client when done -func (h *InfluxDBV1Handler) Close() error { - err := h.conn.Close() - if err != nil { - return fmt.Errorf("error closing InfluxDB UDP client: %w", err) - } - - return nil -} diff --git a/lib/db/influx_v2.go b/lib/db/influx_v2.go new file mode 100644 index 0000000..e69de29 From a02e29309b1e5e69c6a982783f4ddedef3eced2f Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 26 Feb 2026 01:42:49 -0500 Subject: [PATCH 03/16] Revert "Actually just blast the interface and v1. lets just purge rot" Changed my mind actually LOL. Maybe keep v1 for now around support both in case we need fallback. When confirmed v2 works for higher bandwidth application, future people can blast v1 This reverts commit d2186e8a02f80da05454a48aca8ebff5c94ed4d2. --- go.mod | 5 --- go.sum | 15 -------- lib/db/db.go | 46 +++++++++++++++++------ lib/db/influx_v1.go | 90 +++++++++++++++++++++++++++++++++++++++++++++ lib/db/influx_v2.go | 0 5 files changed, 124 insertions(+), 32 deletions(-) create mode 100644 lib/db/influx_v1.go delete mode 100644 lib/db/influx_v2.go diff --git a/go.mod b/go.mod index e27a834..ef634fb 100644 --- a/go.mod +++ b/go.mod @@ -16,18 +16,13 @@ require ( ) require ( - github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gdamore/encoding v1.0.0 // indirect - github.com/google/uuid v1.4.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect - github.com/influxdata/influxdb-client-go/v2 v2.14.0 // indirect - github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect - github.com/oapi-codegen/runtime v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect diff --git a/go.sum b/go.sum index ed9a957..6853a0c 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,3 @@ -github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= -github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= -github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= -github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -18,19 +14,12 @@ github.com/gdamore/tcell/v2 v2.7.1 h1:TiCcmpWHiAU7F0rA2I3S2Y4mmLmO9KHxJ7E1QhYzQb github.com/gdamore/tcell/v2 v2.7.1/go.mod h1:dSXtXTSK0VsW1biw65DZLZ2NKr7j0qP/0J7ONmsraWg= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= -github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/influxdata/influxdb-client-go/v2 v2.14.0 h1:AjbBfJuq+QoaXNcrova8smSjwJdUHnwvfjMF71M1iI4= -github.com/influxdata/influxdb-client-go/v2 v2.14.0/go.mod h1:Ahpm3QXKMJslpXl3IftVLVezreAUtBOTZssDrjZEFHI= -github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= -github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= -github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -43,8 +32,6 @@ github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZ github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= -github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo= -github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -72,12 +59,10 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= -github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= diff --git a/lib/db/db.go b/lib/db/db.go index 8c4d427..9233e59 100644 --- a/lib/db/db.go +++ b/lib/db/db.go @@ -1,25 +1,47 @@ package db -// MeasurementGroup is a group of measurements to be sent to the database -type MeasurementGroup struct { - DatabaseName string // Maps to InfluxDB v2 measurement name - Timestamp int64 // Unix timestamp in nanoseconds - Measurements []Measurement // List of measurements to be sent +// Handler is an interface for database access implementations +type Handler interface { + Initialize(host string, port int) error + Insert(measurements MeasurementGroup) error + CreateQuery(measurements MeasurementGroup) string + Close() error } -// Measurement is a single measurement to be sent to the database -type Measurement struct { - Name string - Value string +// BatchHandler extends Handler with batch write support +type BatchHandler interface { + Handler + Flush() error } -// Config holds all configuration needed to initialize a DB handler +// Config holds all configuration needed to initialize any Handler +// V1 only uses Host/Port. V2 requires URL, Token, Org, Bucket. type Config struct { + // V1 + Host string + Port int + + // V2 URL string Token string Org string Bucket string - BatchSize uint // Points to buffer before auto-flush (default: 100) - FlushInterval uint // Max ms before flushing a partial batch (default: 1000) + // Batching (V2 only) + BatchSize uint // Points to buffer before flushing + FlushInterval uint // Max ms before flushing partial batch + Precision string // "ns", "us", "ms", "s" +} + +// MeasurementGroup is a group of measurements to be sent to the database +type MeasurementGroup struct { + DatabaseName string + Timestamp int64 + Measurements []Measurement +} + +// Measurement is a single measurement to be sent to the database +type Measurement struct { + Name string // Name of the measurement + Value string // Value of the measurement } diff --git a/lib/db/influx_v1.go b/lib/db/influx_v1.go new file mode 100644 index 0000000..ecaad1e --- /dev/null +++ b/lib/db/influx_v1.go @@ -0,0 +1,90 @@ +package db + +import ( + "fmt" + "net" + + "github.com/AarC10/GSW-V2/lib/logger" + "go.uber.org/zap" +) + +// InfluxDBV1Handler is a DB Handler implementation for InfluxDB v1 +type InfluxDBV1Handler struct { + conn net.UDPConn // UDP connection to InfluxDB + addr string // IP address and port of InfluxDB +} + +// Initialize sets up the InfluxDB UDP connection +func (h *InfluxDBV1Handler) Initialize(host string, port int) error { + h.addr = fmt.Sprintf("%s:%d", host, port) + addr, err := net.ResolveUDPAddr("udp", h.addr) + if err != nil { + return fmt.Errorf("resolving db address: %w", err) + } + + logger.Info("resolved database address", zap.String("url", addr.String())) + + conn, err := net.DialUDP("udp", nil, addr) + if err != nil { + return err + } + + h.conn = *conn + + return nil +} + +// CreateQuery generates InfluxDB query for measurement group +func (h *InfluxDBV1Handler) CreateQuery(measurements MeasurementGroup) string { + return CreateQuery(measurements) +} + +// CreateQuery generates InfluxDB query for measurement group +func CreateQuery(measurements MeasurementGroup) string { + query := measurements.DatabaseName + " " + + for _, measurement := range measurements.Measurements { + query += fmt.Sprintf("%s=%s,", measurement.Name, measurement.Value) + } + + // Don't check if string is empty. We expect the Name and the measurements to be non-empty. + query = query[:len(query)-1] + + // Add timestamp if it exists. Otherwise, Influx will default to current nano time + if measurements.Timestamp != 0 { + query += fmt.Sprintf(" %d", measurements.Timestamp) + } + + query += "\n" + + // TODO: Make a debug logger? + + return query +} + +// Insert sends the measurement group data to InfluxDB using UDP +func (h *InfluxDBV1Handler) Insert(measurements MeasurementGroup) error { + // Generate the InfluxDB line protocol query + query := h.CreateQuery(measurements) + + // Convert the query string to bytes + data := []byte(query) + + // Send the query data over UDP + _, err := h.conn.Write(data) + if err != nil { + return fmt.Errorf("error sending data to InfluxDB over UDP: %w", err) + } + + return nil +} + +// Close closes the InfluxDB UDP client when done +func (h *InfluxDBV1Handler) Close() error { + err := h.conn.Close() + if err != nil { + return fmt.Errorf("error closing InfluxDB UDP client: %w", err) + } + + return nil +} diff --git a/lib/db/influx_v2.go b/lib/db/influx_v2.go deleted file mode 100644 index e69de29..0000000 From 7838e8ec0ebf2b9e3b157c2590c9c13368a7693d Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 26 Feb 2026 01:57:24 -0500 Subject: [PATCH 04/16] redo the v2 impl now with the original interface and implement initialization, struct, flush close and insert batch so low hanging fruit --- go.mod | 5 ++ go.sum | 15 ++++++ lib/db/influx_v2.go | 118 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 138 insertions(+) create mode 100644 lib/db/influx_v2.go diff --git a/go.mod b/go.mod index ef634fb..452dcb5 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/eclipse/paho.mqtt.golang v1.5.1 github.com/gdamore/tcell/v2 v2.7.1 github.com/gorilla/websocket v1.5.3 + github.com/influxdata/influxdb-client-go/v2 v2.14.0 github.com/joho/godotenv v1.5.1 github.com/rivo/tview v0.0.0-20250330220935-949945f8d922 github.com/spf13/viper v1.19.0 @@ -16,13 +17,17 @@ require ( ) require ( + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gdamore/encoding v1.0.0 // indirect + github.com/google/uuid v1.4.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect + github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/oapi-codegen/runtime v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect diff --git a/go.sum b/go.sum index 6853a0c..ed9a957 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= +github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= +github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -14,12 +18,19 @@ github.com/gdamore/tcell/v2 v2.7.1 h1:TiCcmpWHiAU7F0rA2I3S2Y4mmLmO9KHxJ7E1QhYzQb github.com/gdamore/tcell/v2 v2.7.1/go.mod h1:dSXtXTSK0VsW1biw65DZLZ2NKr7j0qP/0J7ONmsraWg= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/influxdata/influxdb-client-go/v2 v2.14.0 h1:AjbBfJuq+QoaXNcrova8smSjwJdUHnwvfjMF71M1iI4= +github.com/influxdata/influxdb-client-go/v2 v2.14.0/go.mod h1:Ahpm3QXKMJslpXl3IftVLVezreAUtBOTZssDrjZEFHI= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -32,6 +43,8 @@ github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZ github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo= +github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -59,10 +72,12 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= diff --git a/lib/db/influx_v2.go b/lib/db/influx_v2.go new file mode 100644 index 0000000..84de1ab --- /dev/null +++ b/lib/db/influx_v2.go @@ -0,0 +1,118 @@ +package db + +import ( + "context" + "fmt" + + "github.com/AarC10/GSW-V2/lib/logger" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/influxdata/influxdb-client-go/v2/api/write" + "go.uber.org/zap" +) + +// InfluxDBV2Handler is a BatchHandler implementation for InfluxDB v1 +type InfluxDBV2Handler struct { + client influxdb2.Client + writeAPI api.WriteAPI + org string + bucket string + cfg Config +} + +// Initialize satisfies the Handler interface using host/port only so +// for V2 you most likey want InitializeWithConfig instead. +// This is a wrapper around InitializeWithConfig that fills in the URL and leaves +func (handler *InfluxDBV2Handler) Initialize(host string, port int) error { + return handler.InitializeWithConfig(Config{ + URL: fmt.Sprintf("http://%s:%d", host, port), + Token: "", + Org: "gsw", + Bucket: "gsw", + BatchSize: 100, + FlushInterval: 1000, + Precision: "ns", + }) +} + +// InitializeWithConfig sets up the InfluxDB v2 client with full config +func (handler *InfluxDBV2Handler) InitializeWithConfig(cfg Config) error { + if cfg.BatchSize == 0 { + cfg.BatchSize = 100 + } + if cfg.FlushInterval == 0 { + cfg.FlushInterval = 1000 + } + if cfg.Precision == "" { + cfg.Precision = "ns" + } + + handler.cfg = cfg + handler.org = cfg.Org + handler.bucket = cfg.Bucket + + options := influxdb2.DefaultOptions(). + SetBatchSize(cfg.BatchSize). + SetFlushInterval(cfg.FlushInterval) + + handler.client = influxdb2.NewClientWithOptions(cfg.URL, cfg.Token, options) + handler.writeAPI = handler.client.WriteAPI(cfg.Org, cfg.Bucket) + + go func() { + for err := range handler.writeAPI.Errors() { + logger.Error("InfluxDB V2 async write error", zap.Error(err)) + } + }() + + logger.Info("InfluxDB V2 client initialized", + zap.String("url", cfg.URL), + zap.String("org", cfg.Org), + zap.String("bucket", cfg.Bucket), + zap.Uint("batchSize", cfg.BatchSize), + zap.Uint("flushInterval", cfg.FlushInterval), + ) + return nil +} + +// CreateQuery generates InfluxDB line protocol for a MeasurementGroup +func (handler *InfluxDBV2Handler) CreateQuery(measurements MeasurementGroup) string { + return CreateQuery(measurements) +} + +// Insert writes a MeasurementGroup to InfluxDB v2 as a single point +func (handler *InfluxDBV2Handler) Insert(measurements MeasurementGroup) error { + return nil +} + +// InsertBatch writes multiple MeasurementGroups in one shot and flushes +func (handler *InfluxDBV2Handler) InsertBatch(batch []MeasurementGroup) error { + for _, mg := range batch { + if err := handler.Insert(mg); err != nil { + return err + } + } + return handler.Flush() +} + +// Flush forces all buffered points to be sent immediately +func (handler *InfluxDBV2Handler) Flush() error { + handler.writeAPI.Flush() + return nil +} + +// Close flushes pending writes and closes the client. +func (handler *InfluxDBV2Handler) Close() error { + handler.writeAPI.Flush() + handler.client.Close() + return nil +} + +// BlockingInsert writes a point using the blocking write API. +func (handler *InfluxDBV2Handler) BlockingInsert(ctx context.Context, measurements MeasurementGroup) error { + return nil +} + +// Ensure InfluxDBV2Handler satisfies BatchHandler at compile time +var _ BatchHandler = (*InfluxDBV2Handler)(nil) + +type writePoint = write.Point From f6c6c5f5008f5db2c51a71e6404306d1176ea5b2 Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 26 Feb 2026 02:00:35 -0500 Subject: [PATCH 05/16] Implement remaining insert code. Make sure godocs end in periods .. . . . . . . . . . . . zz z z z z z z z z --- lib/db/influx_v2.go | 65 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 57 insertions(+), 8 deletions(-) diff --git a/lib/db/influx_v2.go b/lib/db/influx_v2.go index 84de1ab..7872af3 100644 --- a/lib/db/influx_v2.go +++ b/lib/db/influx_v2.go @@ -3,6 +3,8 @@ package db import ( "context" "fmt" + "strconv" + "time" "github.com/AarC10/GSW-V2/lib/logger" influxdb2 "github.com/influxdata/influxdb-client-go/v2" @@ -58,6 +60,7 @@ func (handler *InfluxDBV2Handler) InitializeWithConfig(cfg Config) error { handler.client = influxdb2.NewClientWithOptions(cfg.URL, cfg.Token, options) handler.writeAPI = handler.client.WriteAPI(cfg.Org, cfg.Bucket) + // error reporter because that'll totally never happen go func() { for err := range handler.writeAPI.Errors() { logger.Error("InfluxDB V2 async write error", zap.Error(err)) @@ -74,27 +77,51 @@ func (handler *InfluxDBV2Handler) InitializeWithConfig(cfg Config) error { return nil } -// CreateQuery generates InfluxDB line protocol for a MeasurementGroup +// CreateQuery generates InfluxDB line protocol for a MeasurementGroup. +// Reuses the same logic as V1 for consistency. func (handler *InfluxDBV2Handler) CreateQuery(measurements MeasurementGroup) string { return CreateQuery(measurements) } -// Insert writes a MeasurementGroup to InfluxDB v2 as a single point +// Insert writes a MeasurementGroup to InfluxDB v2 as a single point. +// The write is buffered and flushed based on batch size and flush interval. func (handler *InfluxDBV2Handler) Insert(measurements MeasurementGroup) error { + point := influxdb2.NewPointWithMeasurement(measurements.DatabaseName) + + var ts time.Time + if measurements.Timestamp != 0 { + ts = time.Unix(0, measurements.Timestamp) + } else { + ts = time.Now() + } + point.SetTime(ts) + + for _, m := range measurements.Measurements { + // Try numeric first; fall back to string field. + if f, err := strconv.ParseFloat(m.Value, 64); err == nil { + point.AddField(m.Name, f) + } else if i, err := strconv.ParseInt(m.Value, 10, 64); err == nil { + point.AddField(m.Name, i) + } else { + point.AddField(m.Name, m.Value) + } + } + + handler.writeAPI.WritePoint(point) return nil } -// InsertBatch writes multiple MeasurementGroups in one shot and flushes +// InsertBatch writes multiple MeasurementGroups in one shot and flushes. func (handler *InfluxDBV2Handler) InsertBatch(batch []MeasurementGroup) error { - for _, mg := range batch { - if err := handler.Insert(mg); err != nil { + for _, measurementGroup := range batch { + if err := handler.Insert(measurementGroup); err != nil { return err } } return handler.Flush() } -// Flush forces all buffered points to be sent immediately +// Flush forces all buffered points to be sent immediately. func (handler *InfluxDBV2Handler) Flush() error { handler.writeAPI.Flush() return nil @@ -109,10 +136,32 @@ func (handler *InfluxDBV2Handler) Close() error { // BlockingInsert writes a point using the blocking write API. func (handler *InfluxDBV2Handler) BlockingInsert(ctx context.Context, measurements MeasurementGroup) error { - return nil + blockingAPI := handler.client.WriteAPIBlocking(handler.org, handler.bucket) + + point := influxdb2.NewPointWithMeasurement(measurements.DatabaseName) + var timestamp time.Time + if measurements.Timestamp != 0 { + timestamp = time.Unix(0, measurements.Timestamp) + } else { + timestamp = time.Now() + } + point.SetTime(timestamp) + + for _, measurement := range measurements.Measurements { + if floatVal, err := strconv.ParseFloat(measurement.Value, 64); err == nil { + point.AddField(measurement.Name, floatVal) + } else if intVal, err := strconv.ParseInt(measurement.Value, 10, 64); err == nil { + point.AddField(measurement.Name, intVal) + } else { + point.AddField(measurement.Name, measurement.Value) + } + } + + return blockingAPI.WritePoint(ctx, point) } -// Ensure InfluxDBV2Handler satisfies BatchHandler at compile time +// Ensure InfluxDBV2Handler satisfies BatchHandler at compile time. var _ BatchHandler = (*InfluxDBV2Handler)(nil) +// writePoint is a helper implementing write.PointWriter for testing. type writePoint = write.Point From 5127bbeec4afe8be07bb9fb87f80be3468854417 Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 26 Feb 2026 02:00:54 -0500 Subject: [PATCH 06/16] the letters be floating around on the screen --- lib/db/influx_v2.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/db/influx_v2.go b/lib/db/influx_v2.go index 7872af3..b91079f 100644 --- a/lib/db/influx_v2.go +++ b/lib/db/influx_v2.go @@ -88,13 +88,13 @@ func (handler *InfluxDBV2Handler) CreateQuery(measurements MeasurementGroup) str func (handler *InfluxDBV2Handler) Insert(measurements MeasurementGroup) error { point := influxdb2.NewPointWithMeasurement(measurements.DatabaseName) - var ts time.Time + var timestamp time.Time if measurements.Timestamp != 0 { - ts = time.Unix(0, measurements.Timestamp) + timestamp = time.Unix(0, measurements.Timestamp) } else { - ts = time.Now() + timestamp = time.Now() } - point.SetTime(ts) + point.SetTime(timestamp) for _, m := range measurements.Measurements { // Try numeric first; fall back to string field. From c361ddc6ea7d9595402a2af9d7eafe31961eaed5 Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 26 Feb 2026 02:07:55 -0500 Subject: [PATCH 07/16] update db initialization and configs --- cmd/gsw_service.go | 41 ++++++++++++++++++++++------ data/config/gsw_service.yaml.example | 22 +++++++++++---- lib/db/influx_v2.go | 20 ++++++-------- 3 files changed, 58 insertions(+), 25 deletions(-) diff --git a/cmd/gsw_service.go b/cmd/gsw_service.go index e3e61b4..d709c2a 100644 --- a/cmd/gsw_service.go +++ b/cmd/gsw_service.go @@ -96,22 +96,47 @@ func decomInitialize(ctx context.Context) map[int]chan []byte { return channelMap } -func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, host string, port int) error { - dbHandler := db.InfluxDBV1Handler{} - err := dbHandler.Initialize(host, port) - if err != nil { - return err +func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, config *viper.Viper) error { + var handler db.Handler + + if config.IsSet("database_v2_url") { + v2cfg := db.Config{ + URL: config.GetString("database_v2_url"), + Token: config.GetString("database_v2_token"), + Org: config.GetString("database_v2_org"), + Bucket: config.GetString("database_v2_bucket"), + BatchSize: uint(config.GetInt("database_v2_batch_size")), + FlushInterval: uint(config.GetInt("database_v2_flush_interval_ms")), + Precision: config.GetString("database_v2_precision"), + } + h := &db.InfluxDBV2Handler{} + if err := h.InitializeWithConfig(v2cfg); err != nil { + return fmt.Errorf("initializing InfluxDB V2: %w", err) + } + handler = h + logger.Info("Using InfluxDB V2 handler with batching", + zap.Uint("batchSize", v2cfg.BatchSize), + zap.Uint("flushIntervalMs", v2cfg.FlushInterval), + ) + } else { + host := config.GetString("database_host_name") + port := config.GetInt("database_port_number") + h := &db.InfluxDBV1Handler{} + if err := h.Initialize(host, port); err != nil { + return fmt.Errorf("initializing InfluxDB V1: %w", err) + } + handler = h + logger.Info("Using InfluxDB V1 handler (UDP)") } for _, packet := range proc.GswConfig.TelemetryPackets { go func(packet tlm.TelemetryPacket, ch chan []byte) { - proc.DatabaseWriter(&dbHandler, packet, ch) + proc.DatabaseWriter(handler, packet, ch) }(packet, channelMap[packet.Port]) } return nil } - func readConfig() (*viper.Viper, int) { config := viper.New() config.SetConfigName(*configFilepath) @@ -175,7 +200,7 @@ func main() { channelMap := decomInitialize(ctx) if config.IsSet("database_host_name") && config.IsSet("database_port_number") { - err = dbInitialize(ctx, channelMap, config.GetString("database_host_name"), config.GetInt("database_port_number")) + err = dbInitialize(ctx, channelMap, config) if err != nil { logger.Warn("DB Initialization failed, telemetry packets will not be published to the database", zap.Error(err)) } diff --git a/data/config/gsw_service.yaml.example b/data/config/gsw_service.yaml.example index 598017e..48b0f59 100644 --- a/data/config/gsw_service.yaml.example +++ b/data/config/gsw_service.yaml.example @@ -1,9 +1,19 @@ -# path to telemetry definitions +# Path to telemetry definitions telemetry_config: data/config/backplane.yaml -# database defines the hostname and port of an InfluxDB v1 UDP input -database_host_name: localhost -database_port_number: 8089 +# InfluxDB V1 (UDP) +# database_host_name: localhost +# database_port_number: 8089 -# path to gsw_service logging config -logging_config: data/config/logger.yaml +# InfluxDB V2 (batched HTTP) +# If database_v2_url is set, V2 will be used and V1 settings are ignored +database_v2_url: http://localhost:8086 +database_v2_token: your-token-here +database_v2_org: gsw +database_v2_bucket: gsw +database_v2_batch_size: 100 # points buffered before auto-flush +database_v2_flush_interval_ms: 1000 # max ms before flushing partial batch +database_v2_precision: ns # ns | us | ms | s + +# Path to GSW service logging config +logging_config: data/config/logger.yaml \ No newline at end of file diff --git a/lib/db/influx_v2.go b/lib/db/influx_v2.go index b91079f..d249601 100644 --- a/lib/db/influx_v2.go +++ b/lib/db/influx_v2.go @@ -22,8 +22,7 @@ type InfluxDBV2Handler struct { cfg Config } -// Initialize satisfies the Handler interface using host/port only so -// for V2 you most likey want InitializeWithConfig instead. +// Initialize satisfies the Handler interface using host/port only // This is a wrapper around InitializeWithConfig that fills in the URL and leaves func (handler *InfluxDBV2Handler) Initialize(host string, port int) error { return handler.InitializeWithConfig(Config{ @@ -78,9 +77,9 @@ func (handler *InfluxDBV2Handler) InitializeWithConfig(cfg Config) error { } // CreateQuery generates InfluxDB line protocol for a MeasurementGroup. -// Reuses the same logic as V1 for consistency. +// This is literally useless so just return nothing. func (handler *InfluxDBV2Handler) CreateQuery(measurements MeasurementGroup) string { - return CreateQuery(measurements) + return "" } // Insert writes a MeasurementGroup to InfluxDB v2 as a single point. @@ -96,14 +95,13 @@ func (handler *InfluxDBV2Handler) Insert(measurements MeasurementGroup) error { } point.SetTime(timestamp) - for _, m := range measurements.Measurements { - // Try numeric first; fall back to string field. - if f, err := strconv.ParseFloat(m.Value, 64); err == nil { - point.AddField(m.Name, f) - } else if i, err := strconv.ParseInt(m.Value, 10, 64); err == nil { - point.AddField(m.Name, i) + for _, measurement := range measurements.Measurements { + if floatVal, err := strconv.ParseFloat(measurement.Value, 64); err == nil { + point.AddField(measurement.Name, floatVal) + } else if intVal, err := strconv.ParseInt(measurement.Value, 10, 64); err == nil { + point.AddField(measurement.Name, intVal) } else { - point.AddField(m.Name, m.Value) + point.AddField(measurement.Name, measurement.Value) } } From 3febf57bfbeae29fa112f87aa184ec945a487957 Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 26 Feb 2026 02:12:54 -0500 Subject: [PATCH 08/16] blast config checks in main gsw_service since we check in dbInit now --- cmd/gsw_service.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/cmd/gsw_service.go b/cmd/gsw_service.go index d709c2a..2b046d3 100644 --- a/cmd/gsw_service.go +++ b/cmd/gsw_service.go @@ -199,13 +199,9 @@ func main() { defer telemetryConfigCleanup() channelMap := decomInitialize(ctx) - if config.IsSet("database_host_name") && config.IsSet("database_port_number") { - err = dbInitialize(ctx, channelMap, config) - if err != nil { - logger.Warn("DB Initialization failed, telemetry packets will not be published to the database", zap.Error(err)) - } - } else { - logger.Warn("database_host_name or database_port_number is not set, telemetry packets will not be published to the database") + err = dbInitialize(ctx, channelMap, config) + if err != nil { + logger.Warn("DB Initialization failed, telemetry packets will not be published to the database", zap.Error(err)) } // Wait for context cancellation or signal handling From 4e73656c5024b3f680d9199fb0c4ad66713f5cc2 Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 26 Feb 2026 02:36:42 -0500 Subject: [PATCH 09/16] Format and add missing periods i forgot at top of influx v2 godocs --- lib/db/db.go | 10 +++++++--- lib/db/influx_v2.go | 6 +++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/lib/db/db.go b/lib/db/db.go index 9233e59..ec9e564 100644 --- a/lib/db/db.go +++ b/lib/db/db.go @@ -2,9 +2,13 @@ package db // Handler is an interface for database access implementations type Handler interface { + // Initialize sets up the database . Initialize(host string, port int) error + // Insert sends the measurement data to the database. Insert(measurements MeasurementGroup) error + // CreateQuery generates the database query for measurementGroup. CreateQuery(measurements MeasurementGroup) string + // Close closes the database client when done. Close() error } @@ -35,9 +39,9 @@ type Config struct { // MeasurementGroup is a group of measurements to be sent to the database type MeasurementGroup struct { - DatabaseName string - Timestamp int64 - Measurements []Measurement + DatabaseName string // Name of the database + Timestamp int64 // Unix timestamp in nanoseconds + Measurements []Measurement // List of measurements to be sent } // Measurement is a single measurement to be sent to the database diff --git a/lib/db/influx_v2.go b/lib/db/influx_v2.go index d249601..be0e239 100644 --- a/lib/db/influx_v2.go +++ b/lib/db/influx_v2.go @@ -22,8 +22,8 @@ type InfluxDBV2Handler struct { cfg Config } -// Initialize satisfies the Handler interface using host/port only -// This is a wrapper around InitializeWithConfig that fills in the URL and leaves +// Initialize satisfies the Handler interface using host/port only. +// This is a wrapper around InitializeWithConfig that fills in the URL and leaves. func (handler *InfluxDBV2Handler) Initialize(host string, port int) error { return handler.InitializeWithConfig(Config{ URL: fmt.Sprintf("http://%s:%d", host, port), @@ -36,7 +36,7 @@ func (handler *InfluxDBV2Handler) Initialize(host string, port int) error { }) } -// InitializeWithConfig sets up the InfluxDB v2 client with full config +// InitializeWithConfig sets up the InfluxDB v2 client with full config. func (handler *InfluxDBV2Handler) InitializeWithConfig(cfg Config) error { if cfg.BatchSize == 0 { cfg.BatchSize = 100 From a472c9d3da502085d826a769bfcbc481360f82bb Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 26 Feb 2026 02:38:29 -0500 Subject: [PATCH 10/16] fix ci --- lib/db/influx_v2.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/db/influx_v2.go b/lib/db/influx_v2.go index be0e239..f7b9a7f 100644 --- a/lib/db/influx_v2.go +++ b/lib/db/influx_v2.go @@ -9,7 +9,6 @@ import ( "github.com/AarC10/GSW-V2/lib/logger" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/influxdata/influxdb-client-go/v2/api" - "github.com/influxdata/influxdb-client-go/v2/api/write" "go.uber.org/zap" ) @@ -160,6 +159,3 @@ func (handler *InfluxDBV2Handler) BlockingInsert(ctx context.Context, measuremen // Ensure InfluxDBV2Handler satisfies BatchHandler at compile time. var _ BatchHandler = (*InfluxDBV2Handler)(nil) - -// writePoint is a helper implementing write.PointWriter for testing. -type writePoint = write.Point From b7e2d001316866c051aded9201bf7f8ecd513660 Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 26 Mar 2026 15:13:14 -0400 Subject: [PATCH 11/16] Resolve merge conflict compile issues --- cmd/gsw_service.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/cmd/gsw_service.go b/cmd/gsw_service.go index 231d2be..5ed5169 100644 --- a/cmd/gsw_service.go +++ b/cmd/gsw_service.go @@ -99,7 +99,7 @@ func decomInitialize(ctx context.Context, wg *sync.WaitGroup) map[int]chan []byt return channelMap } -func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, host string, port int, wg *sync.WaitGroup) error { +func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, config *viper.Viper, wg *sync.WaitGroup) error { var handler db.Handler if config.IsSet("database_v2_url") { @@ -130,17 +130,18 @@ func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, host stri } handler = h logger.Info("Using InfluxDB V1 handler (UDP)") - + } + for _, packet := range proc.GswConfig.TelemetryPackets { wg.Add(1) go func(packet tlm.TelemetryPacket, ch chan []byte) { defer wg.Done() - proc.DatabaseWriter(ctx, &dbHandler, packet, ch) + proc.DatabaseWriter(ctx, handler, packet, ch) }(packet, channelMap[packet.Port]) } return nil } - + func readConfig() (*viper.Viper, int) { config := viper.New() config.SetConfigName(*configFilepath) @@ -206,12 +207,8 @@ func main() { channelMap := decomInitialize(ctx, &wg) // Start DB writers - if config.IsSet("database_host_name") && config.IsSet("database_port_number") { - if err = dbInitialize(ctx, channelMap, config.GetString("database_host_name"), config.GetInt("database_port_number"), &wg); err != nil { - logger.Warn("DB Initialization failed, telemetry packets will not be published to the database", zap.Error(err)) - } - } else { - logger.Warn("database_host_name or database_port_number is not set, telemetry packets will not be published to the database") + if err = dbInitialize(ctx, channelMap, config, &wg); err != nil { + logger.Warn("DB Initialization failed, telemetry packets will not be published to the database", zap.Error(err)) } // Wait for shutdown signal From 1a3717bc1b44fff461d3b236db6ed0779bffa3a8 Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 26 Mar 2026 16:11:07 -0400 Subject: [PATCH 12/16] separatedb configs --- cmd/gsw_service.go | 10 ++++++---- lib/db/db.go | 20 -------------------- lib/db/influx_v1.go | 13 ++++++++++++- lib/db/influx_v2.go | 19 +++++++++++++++---- 4 files changed, 33 insertions(+), 29 deletions(-) diff --git a/cmd/gsw_service.go b/cmd/gsw_service.go index 5ed5169..d5fef15 100644 --- a/cmd/gsw_service.go +++ b/cmd/gsw_service.go @@ -103,7 +103,7 @@ func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, config *v var handler db.Handler if config.IsSet("database_v2_url") { - v2cfg := db.Config{ + v2cfg := db.InfluxDBV2Config{ URL: config.GetString("database_v2_url"), Token: config.GetString("database_v2_token"), Org: config.GetString("database_v2_org"), @@ -122,10 +122,12 @@ func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, config *v zap.Uint("flushIntervalMs", v2cfg.FlushInterval), ) } else { - host := config.GetString("database_host_name") - port := config.GetInt("database_port_number") + v1cfg := db.InfluxDBV1Config{ + Host: config.GetString("database_host_name"), + Port: config.GetInt("database_port_number"), + } h := &db.InfluxDBV1Handler{} - if err := h.Initialize(host, port); err != nil { + if err := h.InitializeWithConfig(v1cfg); err != nil { return fmt.Errorf("initializing InfluxDB V1: %w", err) } handler = h diff --git a/lib/db/db.go b/lib/db/db.go index ec9e564..1c13629 100644 --- a/lib/db/db.go +++ b/lib/db/db.go @@ -2,8 +2,6 @@ package db // Handler is an interface for database access implementations type Handler interface { - // Initialize sets up the database . - Initialize(host string, port int) error // Insert sends the measurement data to the database. Insert(measurements MeasurementGroup) error // CreateQuery generates the database query for measurementGroup. @@ -18,24 +16,6 @@ type BatchHandler interface { Flush() error } -// Config holds all configuration needed to initialize any Handler -// V1 only uses Host/Port. V2 requires URL, Token, Org, Bucket. -type Config struct { - // V1 - Host string - Port int - - // V2 - URL string - Token string - Org string - Bucket string - - // Batching (V2 only) - BatchSize uint // Points to buffer before flushing - FlushInterval uint // Max ms before flushing partial batch - Precision string // "ns", "us", "ms", "s" -} // MeasurementGroup is a group of measurements to be sent to the database type MeasurementGroup struct { diff --git a/lib/db/influx_v1.go b/lib/db/influx_v1.go index 7a07b0d..2189127 100644 --- a/lib/db/influx_v1.go +++ b/lib/db/influx_v1.go @@ -14,9 +14,20 @@ type InfluxDBV1Handler struct { addr string // IP address and port of InfluxDB } +// InfluxDBV1Config holds the fields needed for UDP writes. +type InfluxDBV1Config struct { + Host string + Port int +} + // Initialize sets up the InfluxDB UDP connection func (h *InfluxDBV1Handler) Initialize(host string, port int) error { - h.addr = fmt.Sprintf("%s:%d", host, port) + return h.InitializeWithConfig(InfluxDBV1Config{Host: host, Port: port}) +} + +// InitializeWithConfig sets up the InfluxDB UDP connection. +func (h *InfluxDBV1Handler) InitializeWithConfig(cfg InfluxDBV1Config) error { + h.addr = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) addr, err := net.ResolveUDPAddr("udp", h.addr) if err != nil { return fmt.Errorf("resolving db address: %w", err) diff --git a/lib/db/influx_v2.go b/lib/db/influx_v2.go index f7b9a7f..f7e62e0 100644 --- a/lib/db/influx_v2.go +++ b/lib/db/influx_v2.go @@ -18,13 +18,24 @@ type InfluxDBV2Handler struct { writeAPI api.WriteAPI org string bucket string - cfg Config + cfg InfluxDBV2Config +} + +// InfluxDBV2Config holds the fields needed by the InfluxDB v2 client. +type InfluxDBV2Config struct { + URL string + Token string + Org string + Bucket string + BatchSize uint + FlushInterval uint + Precision string } // Initialize satisfies the Handler interface using host/port only. // This is a wrapper around InitializeWithConfig that fills in the URL and leaves. func (handler *InfluxDBV2Handler) Initialize(host string, port int) error { - return handler.InitializeWithConfig(Config{ + return handler.InitializeWithConfig(InfluxDBV2Config{ URL: fmt.Sprintf("http://%s:%d", host, port), Token: "", Org: "gsw", @@ -36,7 +47,7 @@ func (handler *InfluxDBV2Handler) Initialize(host string, port int) error { } // InitializeWithConfig sets up the InfluxDB v2 client with full config. -func (handler *InfluxDBV2Handler) InitializeWithConfig(cfg Config) error { +func (handler *InfluxDBV2Handler) InitializeWithConfig(cfg InfluxDBV2Config) error { if cfg.BatchSize == 0 { cfg.BatchSize = 100 } @@ -77,7 +88,7 @@ func (handler *InfluxDBV2Handler) InitializeWithConfig(cfg Config) error { // CreateQuery generates InfluxDB line protocol for a MeasurementGroup. // This is literally useless so just return nothing. -func (handler *InfluxDBV2Handler) CreateQuery(measurements MeasurementGroup) string { +func (handler *InfluxDBV2Handler) CreateQuery(_ MeasurementGroup) string { return "" } From f936d5fc4464adb910d34a43af4bd9b517b785c4 Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 26 Mar 2026 16:20:43 -0400 Subject: [PATCH 13/16] Precision enum and test --- cmd/gsw_service.go | 7 +++- lib/db/influx_v2.go | 58 +++++++++++++++++++++++++++++--- lib/db/influx_v2_test.go | 71 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 131 insertions(+), 5 deletions(-) create mode 100644 lib/db/influx_v2_test.go diff --git a/cmd/gsw_service.go b/cmd/gsw_service.go index d5fef15..37d1254 100644 --- a/cmd/gsw_service.go +++ b/cmd/gsw_service.go @@ -103,6 +103,11 @@ func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, config *v var handler db.Handler if config.IsSet("database_v2_url") { + precision, err := db.ParsePrecision(config.GetString("database_v2_precision")) + if err != nil { + return fmt.Errorf("invalid database_v2_precision: %w", err) + } + v2cfg := db.InfluxDBV2Config{ URL: config.GetString("database_v2_url"), Token: config.GetString("database_v2_token"), @@ -110,7 +115,7 @@ func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, config *v Bucket: config.GetString("database_v2_bucket"), BatchSize: uint(config.GetInt("database_v2_batch_size")), FlushInterval: uint(config.GetInt("database_v2_flush_interval_ms")), - Precision: config.GetString("database_v2_precision"), + Precision: precision, } h := &db.InfluxDBV2Handler{} if err := h.InitializeWithConfig(v2cfg); err != nil { diff --git a/lib/db/influx_v2.go b/lib/db/influx_v2.go index f7e62e0..bb93ddc 100644 --- a/lib/db/influx_v2.go +++ b/lib/db/influx_v2.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "strings" "time" "github.com/AarC10/GSW-V2/lib/logger" @@ -29,7 +30,49 @@ type InfluxDBV2Config struct { Bucket string BatchSize uint FlushInterval uint - Precision string + Precision Precision +} + +// Precision constrains write precision to values supported by InfluxDB. +type Precision string + +const ( + PrecisionNS Precision = "ns" + PrecisionUS Precision = "us" + PrecisionMS Precision = "ms" + PrecisionS Precision = "s" +) + +// ParsePrecision normalizes and validates config precision values. +func ParsePrecision(raw string) (Precision, error) { + switch strings.ToLower(strings.TrimSpace(raw)) { + case "", string(PrecisionNS): + return PrecisionNS, nil + case string(PrecisionUS): + return PrecisionUS, nil + case string(PrecisionMS): + return PrecisionMS, nil + case string(PrecisionS): + return PrecisionS, nil + default: + return "", fmt.Errorf("expected one of ns, us, ms, s") + } +} + +// Duration maps precision values to the influx client write precision option. +func (p Precision) Duration() (time.Duration, error) { + switch p { + case PrecisionNS: + return time.Nanosecond, nil + case PrecisionUS: + return time.Microsecond, nil + case PrecisionMS: + return time.Millisecond, nil + case PrecisionS: + return time.Second, nil + default: + return 0, fmt.Errorf("unsupported precision %q", p) + } } // Initialize satisfies the Handler interface using host/port only. @@ -42,7 +85,7 @@ func (handler *InfluxDBV2Handler) Initialize(host string, port int) error { Bucket: "gsw", BatchSize: 100, FlushInterval: 1000, - Precision: "ns", + Precision: PrecisionNS, }) } @@ -55,7 +98,12 @@ func (handler *InfluxDBV2Handler) InitializeWithConfig(cfg InfluxDBV2Config) err cfg.FlushInterval = 1000 } if cfg.Precision == "" { - cfg.Precision = "ns" + cfg.Precision = PrecisionNS + } + + precision, err := cfg.Precision.Duration() + if err != nil { + return fmt.Errorf("invalid precision: %w", err) } handler.cfg = cfg @@ -64,7 +112,8 @@ func (handler *InfluxDBV2Handler) InitializeWithConfig(cfg InfluxDBV2Config) err options := influxdb2.DefaultOptions(). SetBatchSize(cfg.BatchSize). - SetFlushInterval(cfg.FlushInterval) + SetFlushInterval(cfg.FlushInterval). + SetPrecision(precision) handler.client = influxdb2.NewClientWithOptions(cfg.URL, cfg.Token, options) handler.writeAPI = handler.client.WriteAPI(cfg.Org, cfg.Bucket) @@ -80,6 +129,7 @@ func (handler *InfluxDBV2Handler) InitializeWithConfig(cfg InfluxDBV2Config) err zap.String("url", cfg.URL), zap.String("org", cfg.Org), zap.String("bucket", cfg.Bucket), + zap.String("precision", string(cfg.Precision)), zap.Uint("batchSize", cfg.BatchSize), zap.Uint("flushInterval", cfg.FlushInterval), ) diff --git a/lib/db/influx_v2_test.go b/lib/db/influx_v2_test.go new file mode 100644 index 0000000..1a0e31d --- /dev/null +++ b/lib/db/influx_v2_test.go @@ -0,0 +1,71 @@ +package db + +import ( + "testing" + "time" +) + +func TestParsePrecision(t *testing.T) { + tests := []struct { + name string + input string + want Precision + wantErr bool + }{ + {name: "default empty", input: "", want: PrecisionNS}, + {name: "ns", input: "ns", want: PrecisionNS}, + {name: "uppercase and spaces", input: " MS ", want: PrecisionMS}, + {name: "us", input: "us", want: PrecisionUS}, + {name: "s", input: "s", want: PrecisionS}, + {name: "invalid", input: "minutes", wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParsePrecision(tt.input) + if tt.wantErr { + if err == nil { + t.Fatalf("expected error, got nil") + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got != tt.want { + t.Fatalf("got %q, want %q", got, tt.want) + } + }) + } +} + +func TestPrecisionDuration(t *testing.T) { + tests := []struct { + precision Precision + want time.Duration + wantErr bool + }{ + {precision: PrecisionNS, want: time.Nanosecond}, + {precision: PrecisionUS, want: time.Microsecond}, + {precision: PrecisionMS, want: time.Millisecond}, + {precision: PrecisionS, want: time.Second}, + {precision: Precision("bogus"), wantErr: true}, + } + + for _, tt := range tests { + got, err := tt.precision.Duration() + if tt.wantErr { + if err == nil { + t.Fatalf("expected error for precision %q", tt.precision) + } + continue + } + if err != nil { + t.Fatalf("unexpected error for precision %q: %v", tt.precision, err) + } + if got != tt.want { + t.Fatalf("got %v, want %v", got, tt.want) + } + } +} + From fb44ca1eded41fafc9f781117bfa3e3787eb0613 Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 26 Mar 2026 16:45:39 -0400 Subject: [PATCH 14/16] mess with the influx configs and optionalstuff --- cmd/gsw_service.go | 118 ++++++++++++++++++++------- data/config/gsw_service.yaml.example | 17 ++-- 2 files changed, 99 insertions(+), 36 deletions(-) diff --git a/cmd/gsw_service.go b/cmd/gsw_service.go index 37d1254..000ec82 100644 --- a/cmd/gsw_service.go +++ b/cmd/gsw_service.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "os/signal" + "strings" "sync" "syscall" @@ -21,6 +22,11 @@ import ( _ "net/http/pprof" ) +type resolvedDBConfig struct { + v1 *db.InfluxDBV1Config + v2 *db.InfluxDBV2Config +} + var ( shmDir = flag.String("shm", "/dev/shm", "directory to use for shared memory") configFilepath = flag.String("c", "gsw_service", "name of config file") @@ -99,44 +105,28 @@ func decomInitialize(ctx context.Context, wg *sync.WaitGroup) map[int]chan []byt return channelMap } -func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, config *viper.Viper, wg *sync.WaitGroup) error { +func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, cfg resolvedDBConfig, wg *sync.WaitGroup) error { var handler db.Handler - if config.IsSet("database_v2_url") { - precision, err := db.ParsePrecision(config.GetString("database_v2_precision")) - if err != nil { - return fmt.Errorf("invalid database_v2_precision: %w", err) - } - - v2cfg := db.InfluxDBV2Config{ - URL: config.GetString("database_v2_url"), - Token: config.GetString("database_v2_token"), - Org: config.GetString("database_v2_org"), - Bucket: config.GetString("database_v2_bucket"), - BatchSize: uint(config.GetInt("database_v2_batch_size")), - FlushInterval: uint(config.GetInt("database_v2_flush_interval_ms")), - Precision: precision, - } + if cfg.v2 != nil { h := &db.InfluxDBV2Handler{} - if err := h.InitializeWithConfig(v2cfg); err != nil { + if err := h.InitializeWithConfig(*cfg.v2); err != nil { return fmt.Errorf("initializing InfluxDB V2: %w", err) } handler = h logger.Info("Using InfluxDB V2 handler with batching", - zap.Uint("batchSize", v2cfg.BatchSize), - zap.Uint("flushIntervalMs", v2cfg.FlushInterval), + zap.Uint("batchSize", cfg.v2.BatchSize), + zap.Uint("flushIntervalMs", cfg.v2.FlushInterval), ) - } else { - v1cfg := db.InfluxDBV1Config{ - Host: config.GetString("database_host_name"), - Port: config.GetInt("database_port_number"), - } + } else if cfg.v1 != nil { h := &db.InfluxDBV1Handler{} - if err := h.InitializeWithConfig(v1cfg); err != nil { + if err := h.InitializeWithConfig(*cfg.v1); err != nil { return fmt.Errorf("initializing InfluxDB V1: %w", err) } handler = h logger.Info("Using InfluxDB V1 handler (UDP)") + } else { + return nil } for _, packet := range proc.GswConfig.TelemetryPackets { @@ -149,11 +139,77 @@ func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, config *v return nil } +func resolveDBConfig(config *viper.Viper) (resolvedDBConfig, error) { + v2Map := config.GetStringMap("database_v2") + if len(v2Map) > 0 { + precision, err := db.ParsePrecision(config.GetString("database_v2.precision")) + if err != nil { + return resolvedDBConfig{}, fmt.Errorf("invalid database_v2.precision: %w", err) + } + + v2cfg := db.InfluxDBV2Config{ + URL: config.GetString("database_v2.url"), + Token: config.GetString("database_v2.token"), + Org: config.GetString("database_v2.org"), + Bucket: config.GetString("database_v2.bucket"), + BatchSize: uint(config.GetInt("database_v2.batch_size")), + FlushInterval: uint(config.GetInt("database_v2.flush_interval_ms")), + Precision: precision, + } + + if v2cfg.URL == "" || v2cfg.Org == "" || v2cfg.Bucket == "" { + return resolvedDBConfig{}, errors.New("database_v2.url, database_v2.org, and database_v2.bucket are required when database_v2 is set") + } + + return resolvedDBConfig{v2: &v2cfg}, nil + } + + if config.IsSet("database_v2_url") || config.IsSet("database_v2_org") || config.IsSet("database_v2_bucket") { + logger.Warn("Legacy database_v2_* keys are deprecated; prefer nested database_v2.* config") + + precision, err := db.ParsePrecision(config.GetString("database_v2_precision")) + if err != nil { + return resolvedDBConfig{}, fmt.Errorf("invalid database_v2_precision: %w", err) + } + + v2cfg := db.InfluxDBV2Config{ + URL: config.GetString("database_v2_url"), + Token: config.GetString("database_v2_token"), + Org: config.GetString("database_v2_org"), + Bucket: config.GetString("database_v2_bucket"), + BatchSize: uint(config.GetInt("database_v2_batch_size")), + FlushInterval: uint(config.GetInt("database_v2_flush_interval_ms")), + Precision: precision, + } + + if v2cfg.URL == "" || v2cfg.Org == "" || v2cfg.Bucket == "" { + return resolvedDBConfig{}, errors.New("database_v2_url, database_v2_org, and database_v2_bucket are required when legacy database_v2_* settings are used") + } + + return resolvedDBConfig{v2: &v2cfg}, nil + } + + hostSet := config.IsSet("database_host_name") + portSet := config.IsSet("database_port_number") + if hostSet || portSet { + host := config.GetString("database_host_name") + port := config.GetInt("database_port_number") + if host == "" || port <= 0 { + return resolvedDBConfig{}, errors.New("database_host_name and database_port_number must both be set for InfluxDB V1") + } + v1cfg := db.InfluxDBV1Config{Host: host, Port: port} + return resolvedDBConfig{v1: &v1cfg}, nil + } + + return resolvedDBConfig{}, nil +} + func readConfig() (*viper.Viper, int) { config := viper.New() config.SetConfigName(*configFilepath) config.SetConfigType("yaml") config.SetEnvPrefix("GSW") + config.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) config.AutomaticEnv() config.AddConfigPath("data/config/") err := config.ReadInConfig() @@ -213,9 +269,15 @@ func main() { // Start decom writers channelMap := decomInitialize(ctx, &wg) - // Start DB writers - if err = dbInitialize(ctx, channelMap, config, &wg); err != nil { - logger.Warn("DB Initialization failed, telemetry packets will not be published to the database", zap.Error(err)) + resolvedDB, err := resolveDBConfig(config) + if err != nil { + logger.Warn("Database configuration is invalid; telemetry packets will not be published to the database", zap.Error(err)) + } else if resolvedDB.v1 != nil || resolvedDB.v2 != nil { + if err = dbInitialize(ctx, channelMap, resolvedDB, &wg); err != nil { + logger.Warn("DB initialization failed, telemetry packets will not be published to the database", zap.Error(err)) + } + } else { + logger.Info("No database configuration found; telemetry packets will not be published to the database") } // Wait for shutdown signal diff --git a/data/config/gsw_service.yaml.example b/data/config/gsw_service.yaml.example index 48b0f59..3da31f9 100644 --- a/data/config/gsw_service.yaml.example +++ b/data/config/gsw_service.yaml.example @@ -6,14 +6,15 @@ telemetry_config: data/config/backplane.yaml # database_port_number: 8089 # InfluxDB V2 (batched HTTP) -# If database_v2_url is set, V2 will be used and V1 settings are ignored -database_v2_url: http://localhost:8086 -database_v2_token: your-token-here -database_v2_org: gsw -database_v2_bucket: gsw -database_v2_batch_size: 100 # points buffered before auto-flush -database_v2_flush_interval_ms: 1000 # max ms before flushing partial batch -database_v2_precision: ns # ns | us | ms | s +# If database_v2 is set, V2 will be used and V1 settings are ignored +database_v2: + url: http://localhost:8086 + token: your-token-here + org: gsw + bucket: gsw + batch_size: 100 # points buffered before auto-flush + flush_interval_ms: 1000 # max ms before flushing partial batch + precision: ns # ns | us | ms | s # Path to GSW service logging config logging_config: data/config/logger.yaml \ No newline at end of file From 06b48e50d67d94da4d67d7ed9ea6e16aa2cb509c Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 26 Mar 2026 20:46:56 +0000 Subject: [PATCH 15/16] Applied Formatting Changes During GitHub Build --- lib/db/db.go | 1 - lib/db/influx_v2_test.go | 1 - 2 files changed, 2 deletions(-) diff --git a/lib/db/db.go b/lib/db/db.go index 1c13629..645e2e7 100644 --- a/lib/db/db.go +++ b/lib/db/db.go @@ -16,7 +16,6 @@ type BatchHandler interface { Flush() error } - // MeasurementGroup is a group of measurements to be sent to the database type MeasurementGroup struct { DatabaseName string // Name of the database diff --git a/lib/db/influx_v2_test.go b/lib/db/influx_v2_test.go index 1a0e31d..0ab8caf 100644 --- a/lib/db/influx_v2_test.go +++ b/lib/db/influx_v2_test.go @@ -68,4 +68,3 @@ func TestPrecisionDuration(t *testing.T) { } } } - From 2876eed78c5dcdeb72cf36ed5a64d0c0c9f97995 Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Thu, 2 Apr 2026 19:13:32 -0400 Subject: [PATCH 16/16] address remaining pr comments --- cmd/gsw_service.go | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/cmd/gsw_service.go b/cmd/gsw_service.go index 000ec82..52c8eda 100644 --- a/cmd/gsw_service.go +++ b/cmd/gsw_service.go @@ -164,31 +164,6 @@ func resolveDBConfig(config *viper.Viper) (resolvedDBConfig, error) { return resolvedDBConfig{v2: &v2cfg}, nil } - if config.IsSet("database_v2_url") || config.IsSet("database_v2_org") || config.IsSet("database_v2_bucket") { - logger.Warn("Legacy database_v2_* keys are deprecated; prefer nested database_v2.* config") - - precision, err := db.ParsePrecision(config.GetString("database_v2_precision")) - if err != nil { - return resolvedDBConfig{}, fmt.Errorf("invalid database_v2_precision: %w", err) - } - - v2cfg := db.InfluxDBV2Config{ - URL: config.GetString("database_v2_url"), - Token: config.GetString("database_v2_token"), - Org: config.GetString("database_v2_org"), - Bucket: config.GetString("database_v2_bucket"), - BatchSize: uint(config.GetInt("database_v2_batch_size")), - FlushInterval: uint(config.GetInt("database_v2_flush_interval_ms")), - Precision: precision, - } - - if v2cfg.URL == "" || v2cfg.Org == "" || v2cfg.Bucket == "" { - return resolvedDBConfig{}, errors.New("database_v2_url, database_v2_org, and database_v2_bucket are required when legacy database_v2_* settings are used") - } - - return resolvedDBConfig{v2: &v2cfg}, nil - } - hostSet := config.IsSet("database_host_name") portSet := config.IsSet("database_port_number") if hostSet || portSet {