Skip to content

Commit ca55def

Browse files
authored
Merge branch 'cybertec-postgresql:master' into enhance-metric-form
2 parents e2080f0 + d01389d commit ca55def

12 files changed

Lines changed: 446 additions & 243 deletions

File tree

cmd/pgwatch/version.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,21 @@ import "fmt"
44

55
// version output variables
66
var (
7-
commit = "unknown"
8-
version = "unknown"
9-
date = "unknown"
10-
dbapi = "00824"
7+
commit = "unknown"
8+
version = "unknown"
9+
date = "unknown"
10+
configSchema = "00824"
11+
sinkSchema = "01180"
1112
)
1213

1314
func printVersion() {
1415
fmt.Printf(`
1516
Version info:
16-
Version: %s
17-
DB Schema: %s
18-
Git Commit: %s
19-
Built: %s
17+
Version: %s
18+
Config Schema: %s
19+
Sink Schema: %s
20+
Git Commit: %s
21+
Built: %s
2022
21-
`, version, dbapi, commit, date)
23+
`, version, configSchema, sinkSchema, commit, date)
2224
}

grafana/postgres/v12/4-tables-overview.json

Lines changed: 63 additions & 131 deletions
Large diffs are not rendered by default.

internal/cmdopts/cmdconfig.go

Lines changed: 52 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cmdopts
33
import (
44
"context"
55
"errors"
6+
"fmt"
67

78
"github.com/cybertec-postgresql/pgwatch/v5/internal/db"
89
"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
@@ -87,46 +88,62 @@ type ConfigUpgradeCommand struct {
8788
// Execute upgrades the configuration schema.
8889
func (cmd *ConfigUpgradeCommand) Execute([]string) (err error) {
8990
opts := cmd.owner
90-
if err = opts.ValidateConfig(); err != nil {
91-
return
91+
// For upgrade command, validate that at least one component is specified
92+
if len(opts.Sources.Sources)+len(opts.Metrics.Metrics)+len(opts.Sinks.Sinks) == 0 {
93+
opts.CompleteCommand(ExitCodeConfigError)
94+
return errors.New("at least one of --sources, --metrics, or --sink must be specified")
9295
}
96+
9397
ctx := context.Background()
94-
// Upgrade metrics/sources configuration if it's postgres
95-
if opts.IsPgConnStr(opts.Metrics.Metrics) && opts.IsPgConnStr(opts.Sources.Sources) {
96-
err = opts.InitMetricReader(ctx)
97-
if err != nil {
98-
opts.CompleteCommand(ExitCodeConfigError)
99-
return
98+
99+
f := func(uri string, newMigratorFunc func() (any, error)) error {
100+
if uri == "" {
101+
return nil
100102
}
101-
if m, ok := opts.MetricsReaderWriter.(db.Migrator); ok {
102-
err = m.Migrate()
103-
if err != nil {
104-
opts.CompleteCommand(ExitCodeConfigError)
105-
return
106-
}
103+
if !opts.IsPgConnStr(uri) {
104+
return fmt.Errorf("cannot upgrade storage %s: %w", uri, errors.ErrUnsupported)
107105
}
108-
} else {
109-
opts.CompleteCommand(ExitCodeConfigError)
110-
return errors.New("configuration storage does not support upgrade")
111-
}
112-
// Upgrade sinks configuration if it's postgres
113-
if len(opts.Sinks.Sinks) > 0 {
114-
opts.SinksWriter, err = sinks.NewSinkWriter(ctx, &opts.Sinks)
115-
if err != nil {
116-
opts.CompleteCommand(ExitCodeConfigError)
117-
return
106+
m, initErr := newMigratorFunc()
107+
if initErr != nil {
108+
return initErr
118109
}
119-
if m, ok := opts.SinksWriter.(db.Migrator); ok {
120-
err = m.Migrate()
121-
if err != nil {
122-
opts.CompleteCommand(ExitCodeConfigError)
123-
return
124-
}
125-
} else {
126-
opts.CompleteCommand(ExitCodeConfigError)
127-
return errors.New("sink storage does not support upgrade")
110+
return m.(db.Migrator).Migrate()
111+
112+
}
113+
114+
err = f(opts.Sources.Sources, func() (any, error) {
115+
return sources.NewPostgresSourcesReaderWriter(ctx, opts.Sources.Sources)
116+
})
117+
118+
err = errors.Join(err, f(opts.Metrics.Metrics, func() (any, error) {
119+
return metrics.NewPostgresMetricReaderWriter(ctx, opts.Metrics.Metrics)
120+
}))
121+
122+
for _, uri := range opts.Sinks.Sinks {
123+
err = errors.Join(err, f(uri, func() (any, error) {
124+
return sinks.NewPostgresSinkMigrator(ctx, uri)
125+
}))
126+
}
127+
128+
if err == nil {
129+
opts.CompleteCommand(ExitCodeOK)
130+
return nil
131+
}
132+
133+
// Check if all errors are ErrUnsupported
134+
allUnsupported := true
135+
for _, e := range err.(interface{ Unwrap() []error }).Unwrap() {
136+
if !errors.Is(e, errors.ErrUnsupported) {
137+
allUnsupported = false
138+
break
128139
}
140+
fmt.Fprintln(opts.OutputWriter, e)
129141
}
130-
opts.CompleteCommand(ExitCodeOK)
131-
return
142+
143+
if allUnsupported {
144+
opts.CompleteCommand(ExitCodeOK)
145+
return nil
146+
}
147+
opts.CompleteCommand(ExitCodeConfigError)
148+
return err
132149
}

internal/cmdopts/cmdconfig_integration_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cmdopts
22

33
import (
44
"context"
5+
"io"
56
"testing"
67
"time"
78

@@ -29,16 +30,15 @@ func TestConfigUpgrade_VerifyNoCircularDependency(t *testing.T) {
2930
require.NoError(t, err)
3031

3132
ctx := context.Background()
33+
34+
// Test with only metrics and sinks (sources reader doesn't implement Migrator)
3235
opts := &Options{
33-
Metrics: metrics.CmdOpts{Metrics: connStr},
34-
Sources: sources.CmdOpts{Sources: connStr, Refresh: 120, MaxParallelConnectionsPerDb: 1},
35-
Sinks: sinks.CmdOpts{BatchingDelay: time.Second},
36+
Metrics: metrics.CmdOpts{Metrics: connStr},
37+
Sinks: sinks.CmdOpts{Sinks: []string{connStr}, RetentionInterval: "30 days", BatchingDelay: time.Second, PartitionInterval: "1 week", MaintenanceInterval: "12 hours"},
38+
OutputWriter: io.Discard,
3639
}
3740

38-
// This is the key test: config upgrade should work even on empty database
39-
// (or database needing migrations). Before the fix, this would fail with
40-
// circular dependency because InitConfigReaders would fail, preventing
41-
// the upgrade from running
41+
// Config upgrade should work on metrics and sinks
4242
cmd := ConfigUpgradeCommand{owner: opts}
4343
err = cmd.Execute(nil)
4444
assert.NoError(t, err)
@@ -48,7 +48,7 @@ func TestConfigUpgrade_VerifyNoCircularDependency(t *testing.T) {
4848
opts2 := &Options{
4949
Metrics: metrics.CmdOpts{Metrics: connStr},
5050
Sources: sources.CmdOpts{Sources: connStr, Refresh: 120, MaxParallelConnectionsPerDb: 1},
51-
Sinks: sinks.CmdOpts{BatchingDelay: time.Second},
51+
Sinks: sinks.CmdOpts{Sinks: []string{connStr}},
5252
}
5353
err = opts2.InitConfigReaders(ctx)
5454
assert.NoError(t, err)

0 commit comments

Comments
 (0)