Skip to content

Commit b8ed011

Browse files
committed
basic datasets import for clickhouse
1 parent b621a68 commit b8ed011

9 files changed

Lines changed: 103 additions & 84 deletions

File tree

cmd/check.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ to quickly create a Cobra application.`,
2525
if dataSource != "" {
2626
fmt.Println("Data source is not empty: " + dataSource)
2727
}
28-
app.GetDbqConfig()
2928
return nil
3029
},
3130
}

cmd/import.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package cmd
22

33
import (
44
"dbq/internal"
5-
"fmt"
5+
"log"
66

77
"github.com/spf13/cobra"
88
)
@@ -24,16 +24,21 @@ to quickly create a Cobra application.`,
2424
RunE: func(cmd *cobra.Command, args []string) error {
2525
datasets, err := app.ImportDatasets(dataSource, filter)
2626
if err != nil {
27-
fmt.Println("Failed to fetch datasets: " + err.Error())
27+
log.Println("Failed to fetch datasets: " + err.Error())
2828
return nil
2929
}
3030

31-
for _, dataset := range datasets {
32-
fmt.Println("Imported dataset: ", dataset)
33-
}
34-
31+
log.Printf("Found %d datasets to import: %v\n", len(datasets), datasets)
3532
if updateCfg {
36-
fmt.Println("Updating dbq config...")
33+
ds := app.FindDataSourceById(dataSource)
34+
if ds != nil {
35+
ds.Datasets = datasets
36+
err := app.SaveDbqConfig()
37+
if err != nil {
38+
return err
39+
}
40+
log.Println("dbq config has been updated")
41+
}
3742
}
3843

3944
return nil

cmd/ping.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ to quickly create a Cobra application.`,
2929
}
3030

3131
cmd.Flags().StringVarP(&dataSource, "datasource", "d", "", "Datasource")
32-
//_ = cmd.MarkFlagRequired("dataSource")
3332

3433
return cmd
3534
}

dbq.yaml

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
1-
version: 1
1+
version: "1"
22
datasources:
3-
- id: clickhouse-prod
4-
type: clickhouse
5-
configuration:
6-
host: 0.0.0.0:19000
7-
port: 19000
8-
username: default
9-
password: changeme
10-
database: default
11-
datasets:
12-
- default.my_table_1
13-
- default.my_table_2
14-
- ads.my_table_1
15-
- kpi.some_table
16-
- id: pgsql-staging
17-
type: postgres
18-
configuration:
19-
host: 127.0.0.1
20-
port: 9004
21-
username: simple
22-
password: simple_pass
23-
datasets:
24-
- public.table_1
3+
- id: ch-local
4+
type: clickhouse
5+
configuration:
6+
host: 0.0.0.0:19000
7+
port: 19000
8+
username: default
9+
password: changeme
10+
database: default
11+
datasets:
12+
- default.my_first_table
13+
- default.table_2
14+
- default.trips_small
15+
- id: pgsql
16+
type: postgres
17+
configuration:
18+
host: 127.0.0.1
19+
port: 9004
20+
username: simple
21+
password: simple_pass
22+
datasets:
23+
- public.table_1

internal/app.go

Lines changed: 62 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,32 +3,65 @@ package internal
33
import (
44
"errors"
55
"fmt"
6+
"github.com/spf13/cobra"
7+
"github.com/spf13/viper"
8+
"gopkg.in/yaml.v3"
9+
"log"
10+
"os"
611
"strings"
712
)
813

914
type DbqApp interface {
1015
PingDataSource(srcId string) error
1116
ImportDatasets(srcId string, filter string) ([]string, error)
1217
GetDbqConfig() *DbqConfig
18+
SaveDbqConfig() error
19+
FindDataSourceById(srcId string) *DataSource
1320
}
1421

1522
type DbqAppImpl struct {
16-
dbqConfig *DbqConfig
23+
dbqConfigPath string
24+
dbqConfig *DbqConfig
1725
}
1826

19-
func NewDbqApp(dbqConfig *DbqConfig) DbqApp {
20-
return &DbqAppImpl{dbqConfig: dbqConfig}
27+
func NewDbqApp(dbqConfigPath string) DbqApp {
28+
v := viper.New()
29+
30+
if dbqConfigPath != "" {
31+
v.SetConfigFile(dbqConfigPath)
32+
} else {
33+
home, err := os.UserHomeDir()
34+
cobra.CheckErr(err)
35+
v.AddConfigPath(home)
36+
v.SetConfigType("yaml")
37+
v.SetConfigName(".dbq.yaml")
38+
}
39+
40+
v.AutomaticEnv()
41+
if err := v.ReadInConfig(); err != nil {
42+
cobra.CheckErr(err)
43+
}
44+
45+
var dbqConfig DbqConfig
46+
if err := v.Unmarshal(&dbqConfig); err != nil {
47+
cobra.CheckErr(err)
48+
}
49+
50+
return &DbqAppImpl{
51+
dbqConfigPath: dbqConfigPath,
52+
dbqConfig: &dbqConfig,
53+
}
2154
}
2255

2356
func (app *DbqAppImpl) PingDataSource(srcId string) error {
24-
var dataSource = findDataSourceById(srcId, app.dbqConfig.DataSources)
57+
var dataSource = app.FindDataSourceById(srcId)
2558

2659
cnn, err := getDbqConnector(*dataSource)
2760
if err != nil {
2861
return err
2962
}
3063

31-
fmt.Println("Pinging datasource: " + dataSource.ID)
64+
log.Println("Pinging datasource: " + dataSource.ID)
3265
err = cnn.Ping()
3366
if err != nil {
3467
return err
@@ -38,7 +71,7 @@ func (app *DbqAppImpl) PingDataSource(srcId string) error {
3871
}
3972

4073
func (app *DbqAppImpl) ImportDatasets(srcId string, filter string) ([]string, error) {
41-
var dataSource = findDataSourceById(srcId, app.dbqConfig.DataSources)
74+
var dataSource = app.FindDataSourceById(srcId)
4275
cnn, err := getDbqConnector(*dataSource)
4376
if err != nil {
4477
return []string{}, err
@@ -51,6 +84,29 @@ func (app *DbqAppImpl) GetDbqConfig() *DbqConfig {
5184
return app.dbqConfig
5285
}
5386

87+
func (app *DbqAppImpl) SaveDbqConfig() error {
88+
updatedYaml, err := yaml.Marshal(app.dbqConfig)
89+
if err != nil {
90+
return err
91+
}
92+
93+
err = os.WriteFile(app.dbqConfigPath, updatedYaml, 0644)
94+
if err != nil {
95+
return err
96+
}
97+
98+
return nil
99+
}
100+
101+
func (app *DbqAppImpl) FindDataSourceById(srcId string) *DataSource {
102+
for i := range app.dbqConfig.DataSources {
103+
if app.dbqConfig.DataSources[i].ID == srcId {
104+
return &app.dbqConfig.DataSources[i]
105+
}
106+
}
107+
return nil
108+
}
109+
54110
func getDbqConnector(ds DataSource) (DbqConnector, error) {
55111
dsType := strings.ToLower(ds.Type)
56112
switch dsType {
@@ -60,12 +116,3 @@ func getDbqConnector(ds DataSource) (DbqConnector, error) {
60116
return nil, errors.New(fmt.Sprintf("Data source type '%s' is not supported.", dsType))
61117
}
62118
}
63-
64-
func findDataSourceById(srcId string, dataSources []DataSource) *DataSource {
65-
for _, src := range dataSources {
66-
if src.ID == srcId {
67-
return &src
68-
}
69-
}
70-
return nil
71-
}
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package internal
22

33
import (
4-
"fmt"
54
"gopkg.in/yaml.v3"
5+
"log"
66
"os"
77
)
88

@@ -36,14 +36,14 @@ func LoadChecksConfig(fileName string) (*ChecksConfig, error) {
3636
file, err := os.Open(fileName)
3737
defer file.Close()
3838
if err != nil {
39-
fmt.Printf("Error opening file: %v\n", err)
39+
log.Printf("Error opening file: %v\n", err)
4040
return nil, err
4141
}
4242

4343
var cfg ChecksConfig
4444
decoder := yaml.NewDecoder(file)
4545
if err := decoder.Decode(&cfg); err != nil {
46-
fmt.Printf("Error decoding YAML: %v\n", err)
46+
log.Printf("Error decoding YAML: %v\n", err)
4747
return nil, err
4848
}
4949

internal/clickhouse.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"github.com/ClickHouse/clickhouse-go/v2"
77
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
8-
"log"
98
)
109

1110
type ClickhouseDbqConnector struct {
@@ -63,7 +62,6 @@ func (c *ClickhouseDbqConnector) ImportDatasets() ([]string, error) {
6362
return nil, fmt.Errorf("error occurred during row iteration: %w", err)
6463
}
6564

66-
log.Printf("Found %d datasets (tables).\n", len(datasets))
6765
return datasets, nil
6866
}
6967

internal/dbq_config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package internal
22

33
import (
4-
"fmt"
54
"gopkg.in/yaml.v3"
5+
"log"
66
"os"
77
)
88

@@ -22,14 +22,14 @@ func LoadDbqSetting(fileName string) (*DbqConfig, error) {
2222
file, err := os.Open(fileName)
2323
defer file.Close()
2424
if err != nil {
25-
fmt.Printf("Error opening file: %v\n", err)
25+
log.Printf("Error opening file: %v\n", err)
2626
return nil, err
2727
}
2828

2929
var settings DbqConfig
3030
decoder := yaml.NewDecoder(file)
3131
if err := decoder.Decode(&settings); err != nil {
32-
fmt.Printf("Error decoding YAML: %v\n", err)
32+
log.Printf("Error decoding YAML: %v\n", err)
3333
return nil, err
3434
}
3535

main.go

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"dbq/internal"
99
"github.com/spf13/cobra"
1010
"github.com/spf13/pflag"
11-
"github.com/spf13/viper"
1211
"os"
1312
)
1413

@@ -21,35 +20,8 @@ func main() {
2120
cobra.CheckErr(err)
2221
}
2322

24-
dbqConfig := initSettings(*dbqConfigFile)
25-
app := internal.NewDbqApp(dbqConfig)
23+
app := internal.NewDbqApp(*dbqConfigFile)
2624

2725
cmd.AddCommands(app)
2826
cmd.Execute()
2927
}
30-
31-
func initSettings(dbqConfigPath string) *internal.DbqConfig {
32-
v := viper.New()
33-
34-
if dbqConfigPath != "" {
35-
v.SetConfigFile(dbqConfigPath)
36-
} else {
37-
home, err := os.UserHomeDir()
38-
cobra.CheckErr(err)
39-
v.AddConfigPath(home)
40-
v.SetConfigType("yaml")
41-
v.SetConfigName(".dbq.yaml")
42-
}
43-
44-
v.AutomaticEnv()
45-
if err := v.ReadInConfig(); err != nil {
46-
cobra.CheckErr(err)
47-
}
48-
49-
var settings internal.DbqConfig
50-
if err := v.Unmarshal(&settings); err != nil {
51-
cobra.CheckErr(err)
52-
}
53-
54-
return &settings
55-
}

0 commit comments

Comments
 (0)