diff --git a/cmd/gsw_service.go b/cmd/gsw_service.go index b734216..52c8eda 100644 --- a/cmd/gsw_service.go +++ b/cmd/gsw_service.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "os/signal" + "strings" "sync" "syscall" @@ -21,6 +22,11 @@ import ( _ "net/http/pprof" ) +type resolvedDBConfig struct { + v1 *db.InfluxDBV1Config + v2 *db.InfluxDBV2Config +} + var ( shmDir = flag.String("shm", "/dev/shm", "directory to use for shared memory") configFilepath = flag.String("c", "gsw_service", "name of config file") @@ -99,27 +105,86 @@ func decomInitialize(ctx context.Context, wg *sync.WaitGroup) map[int]chan []byt return channelMap } -func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, host string, port int, wg *sync.WaitGroup) error { - dbHandler := db.InfluxDBV1Handler{} - if err := dbHandler.Initialize(host, port); err != nil { - return err +func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, cfg resolvedDBConfig, wg *sync.WaitGroup) error { + var handler db.Handler + + if cfg.v2 != nil { + h := &db.InfluxDBV2Handler{} + if err := h.InitializeWithConfig(*cfg.v2); err != nil { + return fmt.Errorf("initializing InfluxDB V2: %w", err) + } + handler = h + logger.Info("Using InfluxDB V2 handler with batching", + zap.Uint("batchSize", cfg.v2.BatchSize), + zap.Uint("flushIntervalMs", cfg.v2.FlushInterval), + ) + } else if cfg.v1 != nil { + h := &db.InfluxDBV1Handler{} + if err := h.InitializeWithConfig(*cfg.v1); err != nil { + return fmt.Errorf("initializing InfluxDB V1: %w", err) + } + handler = h + logger.Info("Using InfluxDB V1 handler (UDP)") + } else { + return nil } for _, packet := range proc.GswConfig.TelemetryPackets { wg.Add(1) go func(packet tlm.TelemetryPacket, ch chan []byte) { defer wg.Done() - proc.DatabaseWriter(ctx, &dbHandler, packet, ch) + proc.DatabaseWriter(ctx, handler, packet, ch) }(packet, channelMap[packet.Port]) } return nil } +func resolveDBConfig(config *viper.Viper) (resolvedDBConfig, error) { + v2Map := config.GetStringMap("database_v2") + if len(v2Map) > 0 { + precision, err := db.ParsePrecision(config.GetString("database_v2.precision")) + if err != nil { + return resolvedDBConfig{}, fmt.Errorf("invalid database_v2.precision: %w", err) + } + + v2cfg := db.InfluxDBV2Config{ + URL: config.GetString("database_v2.url"), + Token: config.GetString("database_v2.token"), + Org: config.GetString("database_v2.org"), + Bucket: config.GetString("database_v2.bucket"), + BatchSize: uint(config.GetInt("database_v2.batch_size")), + FlushInterval: uint(config.GetInt("database_v2.flush_interval_ms")), + Precision: precision, + } + + if v2cfg.URL == "" || v2cfg.Org == "" || v2cfg.Bucket == "" { + return resolvedDBConfig{}, errors.New("database_v2.url, database_v2.org, and database_v2.bucket are required when database_v2 is set") + } + + return resolvedDBConfig{v2: &v2cfg}, nil + } + + hostSet := config.IsSet("database_host_name") + portSet := config.IsSet("database_port_number") + if hostSet || portSet { + host := config.GetString("database_host_name") + port := config.GetInt("database_port_number") + if host == "" || port <= 0 { + return resolvedDBConfig{}, errors.New("database_host_name and database_port_number must both be set for InfluxDB V1") + } + v1cfg := db.InfluxDBV1Config{Host: host, Port: port} + return resolvedDBConfig{v1: &v1cfg}, nil + } + + return resolvedDBConfig{}, nil +} + func readConfig() (*viper.Viper, int) { config := viper.New() config.SetConfigName(*configFilepath) config.SetConfigType("yaml") config.SetEnvPrefix("GSW") + config.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) config.AutomaticEnv() config.AddConfigPath("data/config/") err := config.ReadInConfig() @@ -179,13 +244,15 @@ func main() { // Start decom writers channelMap := decomInitialize(ctx, &wg) - // Start DB writers - if config.IsSet("database_host_name") && config.IsSet("database_port_number") { - if err = dbInitialize(ctx, channelMap, config.GetString("database_host_name"), config.GetInt("database_port_number"), &wg); err != nil { - logger.Warn("DB Initialization failed, telemetry packets will not be published to the database", zap.Error(err)) + resolvedDB, err := resolveDBConfig(config) + if err != nil { + logger.Warn("Database configuration is invalid; telemetry packets will not be published to the database", zap.Error(err)) + } else if resolvedDB.v1 != nil || resolvedDB.v2 != nil { + if err = dbInitialize(ctx, channelMap, resolvedDB, &wg); err != nil { + logger.Warn("DB initialization failed, telemetry packets will not be published to the database", zap.Error(err)) } } else { - logger.Warn("database_host_name or database_port_number is not set, telemetry packets will not be published to the database") + logger.Info("No database configuration found; telemetry packets will not be published to the database") } // Wait for shutdown signal diff --git a/data/config/gsw_service.yaml.example b/data/config/gsw_service.yaml.example index 598017e..3da31f9 100644 --- a/data/config/gsw_service.yaml.example +++ b/data/config/gsw_service.yaml.example @@ -1,9 +1,20 @@ -# path to telemetry definitions +# Path to telemetry definitions telemetry_config: data/config/backplane.yaml -# database defines the hostname and port of an InfluxDB v1 UDP input -database_host_name: localhost -database_port_number: 8089 +# InfluxDB V1 (UDP) +# database_host_name: localhost +# database_port_number: 8089 -# path to gsw_service logging config -logging_config: data/config/logger.yaml +# InfluxDB V2 (batched HTTP) +# If database_v2 is set, V2 will be used and V1 settings are ignored +database_v2: + url: http://localhost:8086 + token: your-token-here + org: gsw + bucket: gsw + batch_size: 100 # points buffered before auto-flush + flush_interval_ms: 1000 # max ms before flushing partial batch + precision: ns # ns | us | ms | s + +# Path to GSW service logging config +logging_config: data/config/logger.yaml \ No newline at end of file diff --git a/go.mod b/go.mod index 3f26ec7..efc2bef 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/gdamore/tcell/v2 v2.7.1 github.com/google/gopacket v1.1.19 github.com/gorilla/websocket v1.5.3 + github.com/influxdata/influxdb-client-go/v2 v2.14.0 github.com/joho/godotenv v1.5.1 github.com/rivo/tview v0.0.0-20250330220935-949945f8d922 github.com/spf13/viper v1.19.0 @@ -17,13 +18,17 @@ require ( ) require ( + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gdamore/encoding v1.0.0 // indirect + github.com/google/uuid v1.4.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect + github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/oapi-codegen/runtime v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect diff --git a/go.sum b/go.sum index d3621b8..657ea55 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= +github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= +github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -14,14 +18,21 @@ github.com/gdamore/tcell/v2 v2.7.1 h1:TiCcmpWHiAU7F0rA2I3S2Y4mmLmO9KHxJ7E1QhYzQb github.com/gdamore/tcell/v2 v2.7.1/go.mod h1:dSXtXTSK0VsW1biw65DZLZ2NKr7j0qP/0J7ONmsraWg= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/influxdata/influxdb-client-go/v2 v2.14.0 h1:AjbBfJuq+QoaXNcrova8smSjwJdUHnwvfjMF71M1iI4= +github.com/influxdata/influxdb-client-go/v2 v2.14.0/go.mod h1:Ahpm3QXKMJslpXl3IftVLVezreAUtBOTZssDrjZEFHI= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -34,6 +45,8 @@ github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZ github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo= +github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -61,10 +74,12 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= diff --git a/lib/db/db.go b/lib/db/db.go index d39b980..645e2e7 100644 --- a/lib/db/db.go +++ b/lib/db/db.go @@ -2,16 +2,20 @@ package db // Handler is an interface for database access implementations type Handler interface { - // Initialize sets up the database client - Initialize(host string, port int) error - // Insert sends the measurement data to the database + // Insert sends the measurement data to the database. Insert(measurements MeasurementGroup) error - // CreateQuery generates the database query for measurementGroup + // CreateQuery generates the database query for measurementGroup. CreateQuery(measurements MeasurementGroup) string - // Close closes the database client when done + // Close closes the database client when done. Close() error } +// BatchHandler extends Handler with batch write support +type BatchHandler interface { + Handler + Flush() error +} + // MeasurementGroup is a group of measurements to be sent to the database type MeasurementGroup struct { DatabaseName string // Name of the database diff --git a/lib/db/influx_v1.go b/lib/db/influx_v1.go index 7a07b0d..2189127 100644 --- a/lib/db/influx_v1.go +++ b/lib/db/influx_v1.go @@ -14,9 +14,20 @@ type InfluxDBV1Handler struct { addr string // IP address and port of InfluxDB } +// InfluxDBV1Config holds the fields needed for UDP writes. +type InfluxDBV1Config struct { + Host string + Port int +} + // Initialize sets up the InfluxDB UDP connection func (h *InfluxDBV1Handler) Initialize(host string, port int) error { - h.addr = fmt.Sprintf("%s:%d", host, port) + return h.InitializeWithConfig(InfluxDBV1Config{Host: host, Port: port}) +} + +// InitializeWithConfig sets up the InfluxDB UDP connection. +func (h *InfluxDBV1Handler) InitializeWithConfig(cfg InfluxDBV1Config) error { + h.addr = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) addr, err := net.ResolveUDPAddr("udp", h.addr) if err != nil { return fmt.Errorf("resolving db address: %w", err) diff --git a/lib/db/influx_v2.go b/lib/db/influx_v2.go new file mode 100644 index 0000000..bb93ddc --- /dev/null +++ b/lib/db/influx_v2.go @@ -0,0 +1,222 @@ +package db + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" + + "github.com/AarC10/GSW-V2/lib/logger" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api" + "go.uber.org/zap" +) + +// InfluxDBV2Handler is a BatchHandler implementation for InfluxDB v1 +type InfluxDBV2Handler struct { + client influxdb2.Client + writeAPI api.WriteAPI + org string + bucket string + cfg InfluxDBV2Config +} + +// InfluxDBV2Config holds the fields needed by the InfluxDB v2 client. +type InfluxDBV2Config struct { + URL string + Token string + Org string + Bucket string + BatchSize uint + FlushInterval uint + Precision Precision +} + +// Precision constrains write precision to values supported by InfluxDB. +type Precision string + +const ( + PrecisionNS Precision = "ns" + PrecisionUS Precision = "us" + PrecisionMS Precision = "ms" + PrecisionS Precision = "s" +) + +// ParsePrecision normalizes and validates config precision values. +func ParsePrecision(raw string) (Precision, error) { + switch strings.ToLower(strings.TrimSpace(raw)) { + case "", string(PrecisionNS): + return PrecisionNS, nil + case string(PrecisionUS): + return PrecisionUS, nil + case string(PrecisionMS): + return PrecisionMS, nil + case string(PrecisionS): + return PrecisionS, nil + default: + return "", fmt.Errorf("expected one of ns, us, ms, s") + } +} + +// Duration maps precision values to the influx client write precision option. +func (p Precision) Duration() (time.Duration, error) { + switch p { + case PrecisionNS: + return time.Nanosecond, nil + case PrecisionUS: + return time.Microsecond, nil + case PrecisionMS: + return time.Millisecond, nil + case PrecisionS: + return time.Second, nil + default: + return 0, fmt.Errorf("unsupported precision %q", p) + } +} + +// Initialize satisfies the Handler interface using host/port only. +// This is a wrapper around InitializeWithConfig that fills in the URL and leaves. +func (handler *InfluxDBV2Handler) Initialize(host string, port int) error { + return handler.InitializeWithConfig(InfluxDBV2Config{ + URL: fmt.Sprintf("http://%s:%d", host, port), + Token: "", + Org: "gsw", + Bucket: "gsw", + BatchSize: 100, + FlushInterval: 1000, + Precision: PrecisionNS, + }) +} + +// InitializeWithConfig sets up the InfluxDB v2 client with full config. +func (handler *InfluxDBV2Handler) InitializeWithConfig(cfg InfluxDBV2Config) error { + if cfg.BatchSize == 0 { + cfg.BatchSize = 100 + } + if cfg.FlushInterval == 0 { + cfg.FlushInterval = 1000 + } + if cfg.Precision == "" { + cfg.Precision = PrecisionNS + } + + precision, err := cfg.Precision.Duration() + if err != nil { + return fmt.Errorf("invalid precision: %w", err) + } + + handler.cfg = cfg + handler.org = cfg.Org + handler.bucket = cfg.Bucket + + options := influxdb2.DefaultOptions(). + SetBatchSize(cfg.BatchSize). + SetFlushInterval(cfg.FlushInterval). + SetPrecision(precision) + + handler.client = influxdb2.NewClientWithOptions(cfg.URL, cfg.Token, options) + handler.writeAPI = handler.client.WriteAPI(cfg.Org, cfg.Bucket) + + // error reporter because that'll totally never happen + go func() { + for err := range handler.writeAPI.Errors() { + logger.Error("InfluxDB V2 async write error", zap.Error(err)) + } + }() + + logger.Info("InfluxDB V2 client initialized", + zap.String("url", cfg.URL), + zap.String("org", cfg.Org), + zap.String("bucket", cfg.Bucket), + zap.String("precision", string(cfg.Precision)), + zap.Uint("batchSize", cfg.BatchSize), + zap.Uint("flushInterval", cfg.FlushInterval), + ) + return nil +} + +// CreateQuery generates InfluxDB line protocol for a MeasurementGroup. +// This is literally useless so just return nothing. +func (handler *InfluxDBV2Handler) CreateQuery(_ MeasurementGroup) string { + return "" +} + +// Insert writes a MeasurementGroup to InfluxDB v2 as a single point. +// The write is buffered and flushed based on batch size and flush interval. +func (handler *InfluxDBV2Handler) Insert(measurements MeasurementGroup) error { + point := influxdb2.NewPointWithMeasurement(measurements.DatabaseName) + + var timestamp time.Time + if measurements.Timestamp != 0 { + timestamp = time.Unix(0, measurements.Timestamp) + } else { + timestamp = time.Now() + } + point.SetTime(timestamp) + + for _, measurement := range measurements.Measurements { + if floatVal, err := strconv.ParseFloat(measurement.Value, 64); err == nil { + point.AddField(measurement.Name, floatVal) + } else if intVal, err := strconv.ParseInt(measurement.Value, 10, 64); err == nil { + point.AddField(measurement.Name, intVal) + } else { + point.AddField(measurement.Name, measurement.Value) + } + } + + handler.writeAPI.WritePoint(point) + return nil +} + +// InsertBatch writes multiple MeasurementGroups in one shot and flushes. +func (handler *InfluxDBV2Handler) InsertBatch(batch []MeasurementGroup) error { + for _, measurementGroup := range batch { + if err := handler.Insert(measurementGroup); err != nil { + return err + } + } + return handler.Flush() +} + +// Flush forces all buffered points to be sent immediately. +func (handler *InfluxDBV2Handler) Flush() error { + handler.writeAPI.Flush() + return nil +} + +// Close flushes pending writes and closes the client. +func (handler *InfluxDBV2Handler) Close() error { + handler.writeAPI.Flush() + handler.client.Close() + return nil +} + +// BlockingInsert writes a point using the blocking write API. +func (handler *InfluxDBV2Handler) BlockingInsert(ctx context.Context, measurements MeasurementGroup) error { + blockingAPI := handler.client.WriteAPIBlocking(handler.org, handler.bucket) + + point := influxdb2.NewPointWithMeasurement(measurements.DatabaseName) + var timestamp time.Time + if measurements.Timestamp != 0 { + timestamp = time.Unix(0, measurements.Timestamp) + } else { + timestamp = time.Now() + } + point.SetTime(timestamp) + + for _, measurement := range measurements.Measurements { + if floatVal, err := strconv.ParseFloat(measurement.Value, 64); err == nil { + point.AddField(measurement.Name, floatVal) + } else if intVal, err := strconv.ParseInt(measurement.Value, 10, 64); err == nil { + point.AddField(measurement.Name, intVal) + } else { + point.AddField(measurement.Name, measurement.Value) + } + } + + return blockingAPI.WritePoint(ctx, point) +} + +// Ensure InfluxDBV2Handler satisfies BatchHandler at compile time. +var _ BatchHandler = (*InfluxDBV2Handler)(nil) diff --git a/lib/db/influx_v2_test.go b/lib/db/influx_v2_test.go new file mode 100644 index 0000000..0ab8caf --- /dev/null +++ b/lib/db/influx_v2_test.go @@ -0,0 +1,70 @@ +package db + +import ( + "testing" + "time" +) + +func TestParsePrecision(t *testing.T) { + tests := []struct { + name string + input string + want Precision + wantErr bool + }{ + {name: "default empty", input: "", want: PrecisionNS}, + {name: "ns", input: "ns", want: PrecisionNS}, + {name: "uppercase and spaces", input: " MS ", want: PrecisionMS}, + {name: "us", input: "us", want: PrecisionUS}, + {name: "s", input: "s", want: PrecisionS}, + {name: "invalid", input: "minutes", wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParsePrecision(tt.input) + if tt.wantErr { + if err == nil { + t.Fatalf("expected error, got nil") + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got != tt.want { + t.Fatalf("got %q, want %q", got, tt.want) + } + }) + } +} + +func TestPrecisionDuration(t *testing.T) { + tests := []struct { + precision Precision + want time.Duration + wantErr bool + }{ + {precision: PrecisionNS, want: time.Nanosecond}, + {precision: PrecisionUS, want: time.Microsecond}, + {precision: PrecisionMS, want: time.Millisecond}, + {precision: PrecisionS, want: time.Second}, + {precision: Precision("bogus"), wantErr: true}, + } + + for _, tt := range tests { + got, err := tt.precision.Duration() + if tt.wantErr { + if err == nil { + t.Fatalf("expected error for precision %q", tt.precision) + } + continue + } + if err != nil { + t.Fatalf("unexpected error for precision %q: %v", tt.precision, err) + } + if got != tt.want { + t.Fatalf("got %v, want %v", got, tt.want) + } + } +}