Skip to content

Commit 72a8e99

Browse files
committed
basic checks support
1 parent a118e43 commit 72a8e99

9 files changed

Lines changed: 259 additions & 52 deletions

File tree

checks.yaml

Lines changed: 76 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,84 @@
11
version: "1"
22
validations:
3-
- dataset: clickhouse-prod-euw@[ads.my_table_1]
4-
where: "timestamp > '2025-01-01'"
3+
# todo: support arrays of data sets [ ... ]
4+
- dataset: ch-local@nyc_taxi.trips_small
5+
where: "pickup_datetime > '2014-01-01'"
56
checks:
6-
- id: row count between 0 and 100
7+
- id: row_count > 0
8+
description: "data is present" # optional
9+
severity: error # optional (error, warn, info), default "error"
10+
11+
- id: null_count(pickup_ntaname) == 0
12+
description: "no nulls in column" # optional
713
severity: error
8-
type: row_count
9-
params:
10-
# filter: "timestamp > '2023-01-01'"
11-
min: 0
12-
max: 100
13-
14-
- id: "no null values"
15-
description: "Check if table_1 has the correct number of columns" # optional
14+
15+
- id: min(pickup_datetime) < now() - interval 3 day
16+
description: "min check"
1617
severity: error
17-
type: custom
18-
params:
19-
query: SELECT COUNT(*) FROM ${table} WHERE column_name IS NULL
20-
expected: 0
2118

22-
- id: "no duplicates"
23-
description: "Check if table_1 has no duplicates"
19+
# todo: support "between X and Y"
20+
- id: stddevPop(trip_distance) < 100_000
21+
description: "check stddev value"
2422
severity: error
25-
type: no_duplicates
26-
params:
27-
columns: ["column_1", "column_2"]
28-
# filter: "timestamp > '2023-01-01'"
2923

30-
- dataset: pgsql-staging@[public.table_1, public.table_2]
31-
checks:
32-
- id: "row count between 0 and 100"
33-
severity: warn
34-
type: row_count
35-
params:
36-
min: 0
37-
max: 1000
24+
- id: sum(fare_amount) <= 10_000_000
25+
description: "sum of value"
26+
severity: error
27+
28+
- id: raw_query
29+
description: "some raw query description here"
30+
severity: error
31+
query: |
32+
select countIf(trip_distance == 0) > 0 from {{table}}
33+
34+
# - dataset: pgsql-staging@[public.table_1, public.table_2]
35+
# checks:
36+
# - id: "row count between 0 and 100"
37+
# severity: warn
38+
# type: row_count
39+
# params:
40+
# min: 0
41+
# max: 1000
42+
43+
# v1 supported functions:
44+
# ---
45+
# row_count > 10
46+
# null_count(col) == 0
47+
# avg(col) <= 24.2
48+
# max(col) < 1000
49+
# min(col) == 0
50+
# sum(col) > 0
51+
# stddev(col) between 1 and 100_000_000
52+
# custom
53+
54+
# AI anomaly detection
55+
56+
# empties / blanks
57+
# invalid count with filter
58+
# duplicates
59+
# avg_length
60+
# cross
61+
# distribution
62+
# duplicate_count
63+
# duplicate_percent
64+
# failed rows
65+
# freshness
66+
# group by
67+
# group evolution
68+
# invalid_count
69+
# invalid_percent
70+
# max_length
71+
# min_length
72+
# missing_count
73+
# missing_percent
74+
# percentile
75+
# reconciliation
76+
# reference
77+
# schema
78+
# schema evolution
79+
# stddev_pop
80+
# stddev_samp
81+
# user-defined
82+
# variance
83+
# var_pop
84+
# var_samp

cmd/check.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ package cmd
22

33
import (
44
"dbq/internal"
5-
"fmt"
5+
"log"
6+
"strings"
67

78
"github.com/spf13/cobra"
89
)
910

1011
func NewCheckCommand(app internal.DbqApp) *cobra.Command {
1112
var checksFile string
12-
var dataSource string
1313

1414
cmd := &cobra.Command{
1515
Use: "check",
@@ -20,15 +20,36 @@ which outlines the rules and constraints that the data within the dataset should
2020
By automating these checks, you can proactively identify and address data quality issues, ensuring that your datasets meet the required standards for analysis and decision-making.
2121
`,
2222
RunE: func(cmd *cobra.Command, args []string) error {
	log.Printf("Reading checks configuration file: %s \n", checksFile)

	checksCfg, err := internal.LoadChecksConfig(checksFile)
	if err != nil {
		// Abort instead of continuing with a nil config, which would
		// panic below when checksCfg.Validations is dereferenced.
		log.Printf("Failed to read checks configuration: %s", err.Error())
		return err
	}

	for i, ruleSet := range checksCfg.Validations {
		log.Printf("Running check for %s [%d/%d]", ruleSet.Dataset, i+1, len(checksCfg.Validations))

		// Dataset references use the form "<datasource-id>@<db.table>".
		parts := strings.Split(ruleSet.Dataset, "@")
		if len(parts) != 2 {
			log.Printf("Skipping %q: expected format <datasource>@<dataset>", ruleSet.Dataset)
			continue
		}
		dataSourceId := parts[0]
		dataSet := parts[1] // todo: parse list

		dataSource := app.FindDataSourceById(dataSourceId)
		if dataSource == nil {
			// FindDataSourceById returns nil for unknown ids; don't pass
			// a nil pointer down into RunCheck.
			log.Printf("Skipping %q: unknown data source %q", ruleSet.Dataset, dataSourceId)
			continue
		}

		for _, check := range ruleSet.Checks {
			_, err := app.RunCheck(&check, dataSource, dataSet, ruleSet.Where)
			if err != nil {
				log.Printf("Failed to run check: %s", err.Error())
			}
			// todo: act on check result
		}
	}

	return nil
},
2951
}
3052

31-
cmd.Flags().StringVarP(&dataSource, "datasource", "d", "", "Datasource")
3253
cmd.Flags().StringVarP(&checksFile, "checks", "c", "", "Validation checks")
3354
_ = cmd.MarkFlagRequired("checks")
3455

dbq.yaml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ datasources:
99
password: changeme
1010
database: default
1111
datasets:
12-
- default.my_first_table
13-
- default.trips_small
14-
- uk.uk_price_paid
12+
- nyc_taxi.trips_small
1513
- id: pgsql
1614
type: postgres
1715
configuration:

internal/app.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type DbqApp interface {
1717
GetDbqConfig() *DbqConfig
1818
SaveDbqConfig() error
1919
FindDataSourceById(srcId string) *DataSource
20+
RunCheck(check *Check, dataSource *DataSource, dataSet string, defaultWhere string) (string, error)
2021
}
2122

2223
type DbqAppImpl struct {
@@ -94,6 +95,14 @@ func (app *DbqAppImpl) FindDataSourceById(srcId string) *DataSource {
9495
return nil
9596
}
9697

98+
func (app *DbqAppImpl) RunCheck(check *Check, dataSource *DataSource, dataSet string, defaultWhere string) (string, error) {
99+
cnn, err := getDbqConnector(*dataSource)
100+
if err != nil {
101+
return "", err
102+
}
103+
return cnn.RunCheck(check, dataSet, defaultWhere)
104+
}
105+
97106
func initConfig(dbqConfigPath string) *DbqConfig {
98107
v := viper.New()
99108

internal/checks_config.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,17 @@ type ChecksConfig struct {
1111
Validations []Validation `yaml:"validations"`
1212
}
1313

14-
type ConfigDetails struct {
15-
Host string `yaml:"host"`
16-
Port int `yaml:"port"`
17-
Username string `yaml:"username"`
18-
Password string `yaml:"password"`
19-
Database string `yaml:"database,omitempty"`
20-
}
21-
2214
// Validation binds one dataset reference (format "<datasource-id>@<db.table>")
// to the list of checks that should run against it.
type Validation struct {
	Dataset string  `yaml:"dataset"`
	Where   string  `yaml:"where,omitempty"` // Optional where clause
	Checks  []Check `yaml:"checks"`
}

// Check is a single data-quality rule. ID doubles as the rule expression
// (e.g. "row_count > 0", "null_count(col) == 0") or the literal "raw_query",
// in which case Query holds the SQL to execute.
type Check struct {
	ID          string `yaml:"id"`
	Description string `yaml:"description,omitempty"` // Optional
	Severity    string `yaml:"severity,omitempty"`    // Optional (error, warn, info)
	Query       string `yaml:"query,omitempty"`       // Optional raw query
}
3426

3527
func LoadChecksConfig(fileName string) (*ChecksConfig, error) {

internal/clickhouse.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/ClickHouse/clickhouse-go/v2"
88
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
99
"log"
10+
"regexp"
1011
"strings"
1112
"time"
1213
)
@@ -243,6 +244,116 @@ func (c *ClickhouseDbqConnector) ProfileDataSet(dataSet string) (*TableMetrics,
243244
return metrics, nil
244245
}
245246

247+
func (c *ClickhouseDbqConnector) RunCheck(check *Check, dataSet string, defaultWhere string) (string, error) {
248+
if c.cnn == nil {
249+
return "", fmt.Errorf("database connection is not initialized")
250+
}
251+
252+
query, err := generateDataCheckQuery(check, dataSet, defaultWhere)
253+
if err != nil {
254+
return "", fmt.Errorf("failed to generate SQL for check %s (%s): %s", check.ID, dataSet, err.Error())
255+
}
256+
257+
log.Printf("Executing SQL for (%s): %s", check.ID, query)
258+
259+
startTime := time.Now()
260+
rows, err := c.cnn.Query(context.Background(), query)
261+
if err != nil {
262+
return "", fmt.Errorf("failed to query database: %w", err)
263+
}
264+
defer rows.Close()
265+
elapsed := time.Since(startTime).Milliseconds()
266+
267+
for rows.Next() {
268+
var checkPassed bool
269+
if err := rows.Scan(&checkPassed); err != nil {
270+
return "", fmt.Errorf("failed to scan row: %w", err)
271+
}
272+
273+
log.Printf("Result is: %t (%d ms)", checkPassed, elapsed)
274+
log.Printf("---")
275+
}
276+
277+
if err = rows.Err(); err != nil {
278+
return "", fmt.Errorf("error occurred during row iteration: %w", err)
279+
}
280+
281+
return "", nil
282+
}
283+
284+
func generateDataCheckQuery(check *Check, dataSet string, whereClause string) (string, error) {
285+
var sqlQuery string
286+
287+
// handle raw_query first
288+
if check.ID == CheckTypeRawQuery {
289+
if check.Query == "" {
290+
return "", fmt.Errorf("check with id 'raw_query' requires a 'query' field")
291+
}
292+
sqlQuery = strings.ReplaceAll(check.Query, "{{table}}", dataSet)
293+
294+
if whereClause != "" {
295+
// todo: more sophisticated check might be needed
296+
if strings.Contains(strings.ToLower(sqlQuery), " where ") {
297+
sqlQuery = fmt.Sprintf("%s and (%s)", sqlQuery, whereClause)
298+
} else {
299+
sqlQuery = fmt.Sprintf("%s where %s", sqlQuery, whereClause)
300+
}
301+
}
302+
303+
return sqlQuery, nil
304+
}
305+
306+
isAggFunction := startWithAnyOf([]string{
307+
"min", "max", "avg", "stddevPop", "sum",
308+
}, check.ID)
309+
310+
var checkExpression string
311+
switch {
312+
case strings.HasPrefix(check.ID, "row_count"):
313+
// format "row_count <operator> <value>"
314+
parts := strings.Fields(check.ID)
315+
if len(parts) != 3 {
316+
return "", fmt.Errorf("invalid format for row_count check: %s", check.ID)
317+
}
318+
checkExpression = fmt.Sprintf("count() %s %s", parts[1], parts[2])
319+
320+
case strings.HasPrefix(check.ID, "null_count"):
321+
// format "null_count(<column_name>) <operator> <value>"
322+
re := regexp.MustCompile(`null_count\((.*?)\)\s*(==|!=|>|<|>=|<=)\s*(\d+)`)
323+
matches := re.FindStringSubmatch(check.ID)
324+
if len(matches) != 4 {
325+
return "", fmt.Errorf("invalid format for null_count check: %s", check.ID)
326+
}
327+
328+
column := matches[1]
329+
operator := matches[2]
330+
value := matches[3]
331+
checkExpression = fmt.Sprintf("countIf(%s IS NULL) %s %s", column, operator, value)
332+
333+
case isAggFunction:
334+
// format: <func>(<column_name>) <operator> <value>
335+
re := regexp.MustCompile(`^(min|max|avg|stddevPop|sum)\(([^)]+)\)\s+(==|>=|<=|>|<)\s+(.*)$`)
336+
matches := re.FindStringSubmatch(check.ID)
337+
if len(matches) < 4 {
338+
return "", fmt.Errorf("invalid format for aggregation function check: %s", check.ID)
339+
}
340+
checkExpression = fmt.Sprintf("%s", matches[0])
341+
342+
default:
343+
// Assume the ID itself is a valid boolean expression if no specific pattern matches
344+
// This is less robust but covers simple cases.
345+
log.Printf("Warning: Check ID '%s' did not match known patterns. Assuming it's a direct SQL boolean expression.", check.ID)
346+
checkExpression = check.ID
347+
}
348+
349+
sqlQuery = fmt.Sprintf("select %s from %s", checkExpression, dataSet)
350+
if whereClause != "" {
351+
sqlQuery = fmt.Sprintf("%s where %s", sqlQuery, whereClause)
352+
}
353+
354+
return sqlQuery, nil
355+
}
356+
246357
// isNumericCHType checks if a ClickHouse data type string represents a numeric type
247358
// that supports standard aggregate functions like min, max, avg, stddev
248359
func isNumericCHType(dataType string) bool {
@@ -260,3 +371,12 @@ func isStringCHType(dataType string) bool {
260371
return strings.HasPrefix(dataType, "string") ||
261372
strings.HasPrefix(dataType, "fixedstring")
262373
}
374+
375+
// startWithAnyOf reports whether s begins with at least one of the given
// prefixes. An empty or nil prefix list always yields false.
func startWithAnyOf(prefixes []string, s string) bool {
	for i := range prefixes {
		if strings.HasPrefix(s, prefixes[i]) {
			return true
		}
	}
	return false
}

internal/dbq_config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@ type DataSource struct {
1818
Datasets []string `yaml:"datasets"`
1919
}
2020

21+
// ConfigDetails holds the connection settings for a single data source:
// host/port, credentials, and an optional database name.
type ConfigDetails struct {
	Host     string `yaml:"host"`
	Port     int    `yaml:"port"`
	Username string `yaml:"username"`
	Password string `yaml:"password"`
	Database string `yaml:"database,omitempty"` // optional; omitted when empty
}
28+
2129
func LoadDbqSetting(fileName string) (*DbqConfig, error) {
2230
file, err := os.Open(fileName)
2331
defer file.Close()

0 commit comments

Comments
 (0)