Skip to content

Commit fa0831f

Browse files
committed
basic support for schema_checks
1 parent e21b2ee commit fa0831f

11 files changed

Lines changed: 531 additions & 12 deletions

adapters/adapters_utils.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package adapters
22

33
import (
44
"fmt"
5+
"strings"
56

67
"github.com/DataBridgeTech/dbqcore"
78
)
@@ -22,3 +23,11 @@ func createMockParsedCheck(functionName string, parameters []string, operator st
2223
ThresholdValue: thresholdValue,
2324
}
2425
}
26+
27+
// extractDatabaseAndTableFromDataset splits a dataset identifier of the form
// "database.table" into its database (or schema) name and its table name.
//
// It returns an error when dataset does not contain exactly one "." separator,
// or when either side of the separator is empty (for example ".users" or
// "default."). On error both returned names are empty strings.
func extractDatabaseAndTableFromDataset(dataset string) (string, string, error) {
	parts := strings.Split(dataset, ".")
	// An empty component can never name a real database or table; rejecting it
	// here surfaces the misconfiguration instead of letting callers build a
	// metadata query that silently matches nothing.
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", fmt.Errorf("dataset must be in format database.table")
	}
	return parts[0], parts[1], nil
}

adapters/clickhouse_adapter.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,29 @@ func NewClickhouseDbqDataSourceAdapter(cnn driver.Conn, logger *slog.Logger) dbq
4242
}
4343

4444
func (a *ClickhouseDbqDataSourceAdapter) InterpretDataQualityCheck(check *dbqcore.DataQualityCheck, dataset string, whereClause string) (string, error) {
45+
// handle schema checks first
46+
if check.SchemaCheck != nil && check.SchemaCheck.ExpectColumnsOrdered != nil {
47+
database, table, err := extractDatabaseAndTableFromDataset(dataset)
48+
if err != nil {
49+
return "", err
50+
}
51+
52+
expectedColumns := check.SchemaCheck.ExpectColumnsOrdered.ColumnsOrder
53+
columnChecks := make([]string, len(expectedColumns))
54+
for i, col := range expectedColumns {
55+
columnChecks[i] = fmt.Sprintf("(name = '%s' and position = %d)", col, i+1)
56+
}
57+
58+
// count of matching columns in correct positions
59+
sqlQuery := fmt.Sprintf(`select count()
60+
from system.columns
61+
where database = '%s'
62+
and table = '%s'
63+
and (%s)`, database, table, strings.Join(columnChecks, " or "))
64+
65+
return sqlQuery, nil
66+
}
67+
4568
if check.ParsedCheck == nil {
4669
return "", fmt.Errorf("check does not have parsed structure")
4770
}

adapters/clickhouse_adapter_test.go

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -142,45 +142,45 @@ func TestClickhouseAdapter_InterpretDataQualityCheck(t *testing.T) {
142142
name: "raw_query check",
143143
check: &dbqcore.DataQualityCheck{
144144
Expression: "raw_query",
145-
Query: "SELECT count(*) FROM {{dataset}} WHERE status = 'active'",
145+
Query: "select count(*) from {{dataset}} where status = 'active'",
146146
ParsedCheck: createMockParsedCheck("raw_query", []string{}, "", nil),
147147
},
148148
dataset: "users",
149149
whereClause: "",
150-
expectedSQL: "SELECT count(*) FROM users WHERE status = 'active'",
150+
expectedSQL: "select count(*) from users where status = 'active'",
151151
},
152152
{
153153
name: "raw_query check with where clause",
154154
check: &dbqcore.DataQualityCheck{
155155
Expression: "raw_query",
156-
Query: "SELECT count(*) FROM {{dataset}}",
156+
Query: "select count(*) from {{dataset}}",
157157
ParsedCheck: createMockParsedCheck("raw_query", []string{}, "", nil),
158158
},
159159
dataset: "products",
160160
whereClause: "category = 'electronics'",
161-
expectedSQL: "SELECT count(*) FROM products where category = 'electronics'",
161+
expectedSQL: "select count(*) from products where category = 'electronics'",
162162
},
163163
{
164164
name: "raw_query check with existing where clause",
165165
check: &dbqcore.DataQualityCheck{
166166
Expression: "raw_query",
167-
Query: "SELECT AVG(price) FROM {{dataset}} WHERE category = 'books'",
167+
Query: "select avg(price) from {{dataset}} where category = 'books'",
168168
ParsedCheck: createMockParsedCheck("raw_query", []string{}, "", nil),
169169
},
170170
dataset: "products",
171171
whereClause: "active = 1",
172-
expectedSQL: "SELECT AVG(price) FROM products WHERE category = 'books' and (active = 1)",
172+
expectedSQL: "select avg(price) from products where category = 'books' and (active = 1)",
173173
},
174174
{
175175
name: "raw_query check with multiline query",
176176
check: &dbqcore.DataQualityCheck{
177177
Expression: "raw_query",
178-
Query: "SELECT count(*)\nFROM {{dataset}}\nWHERE active = 1",
178+
Query: "select count(*)\nfrom {{dataset}}\nwhere active = 1",
179179
ParsedCheck: createMockParsedCheck("raw_query", []string{}, "", nil),
180180
},
181181
dataset: "orders",
182182
whereClause: "",
183-
expectedSQL: "SELECT count(*) FROM orders WHERE active = 1",
183+
expectedSQL: "select count(*) from orders where active = 1",
184184
},
185185
{
186186
name: "raw_query check missing query field",
@@ -194,6 +194,57 @@ func TestClickhouseAdapter_InterpretDataQualityCheck(t *testing.T) {
194194
expectError: true,
195195
errorMessage: "raw_query check requires a 'query' field",
196196
},
197+
{
198+
name: "expect_columns_ordered check",
199+
check: &dbqcore.DataQualityCheck{
200+
Expression: "expect_columns_ordered",
201+
SchemaCheck: &dbqcore.SchemaCheckConfig{
202+
ExpectColumnsOrdered: &dbqcore.ExpectColumnsOrderedConfig{
203+
ColumnsOrder: []string{"id", "name", "email"},
204+
},
205+
},
206+
},
207+
dataset: "default.users",
208+
whereClause: "",
209+
expectedSQL: `select count()
210+
from system.columns
211+
where database = 'default'
212+
and table = 'users'
213+
and ((name = 'id' and position = 1) or (name = 'name' and position = 2) or (name = 'email' and position = 3))`,
214+
},
215+
{
216+
name: "expect_columns_ordered check with single column",
217+
check: &dbqcore.DataQualityCheck{
218+
Expression: "expect_columns_ordered",
219+
SchemaCheck: &dbqcore.SchemaCheckConfig{
220+
ExpectColumnsOrdered: &dbqcore.ExpectColumnsOrderedConfig{
221+
ColumnsOrder: []string{"id"},
222+
},
223+
},
224+
},
225+
dataset: "analytics.events",
226+
whereClause: "",
227+
expectedSQL: `select count()
228+
from system.columns
229+
where database = 'analytics'
230+
and table = 'events'
231+
and ((name = 'id' and position = 1))`,
232+
},
233+
{
234+
name: "expect_columns_ordered check invalid dataset format",
235+
check: &dbqcore.DataQualityCheck{
236+
Expression: "expect_columns_ordered",
237+
SchemaCheck: &dbqcore.SchemaCheckConfig{
238+
ExpectColumnsOrdered: &dbqcore.ExpectColumnsOrderedConfig{
239+
ColumnsOrder: []string{"id", "name"},
240+
},
241+
},
242+
},
243+
dataset: "invalid_dataset",
244+
whereClause: "",
245+
expectError: true,
246+
errorMessage: "dataset must be in format database.table",
247+
},
197248
{
198249
name: "unknown function fallback",
199250
check: &dbqcore.DataQualityCheck{

adapters/mysql_adapter.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,29 @@ func NewMysqlDbqDataSourceAdapter(db *sql.DB, logger *slog.Logger) dbqcore.DbqDa
4242
}
4343

4444
func (a *MysqlDbqDataSourceAdapter) InterpretDataQualityCheck(check *dbqcore.DataQualityCheck, dataset string, whereClause string) (string, error) {
45+
// Handle schema checks first
46+
if check.SchemaCheck != nil && check.SchemaCheck.ExpectColumnsOrdered != nil {
47+
schema, table, err := extractDatabaseAndTableFromDataset(dataset)
48+
if err != nil {
49+
return "", err
50+
}
51+
52+
expectedColumns := check.SchemaCheck.ExpectColumnsOrdered.ColumnsOrder
53+
columnChecks := make([]string, len(expectedColumns))
54+
for i, col := range expectedColumns {
55+
columnChecks[i] = fmt.Sprintf("(column_name = '%s' and ordinal_position = %d)", col, i+1)
56+
}
57+
58+
// count of matching columns in correct positions
59+
sqlQuery := fmt.Sprintf(`select count(*)
60+
from information_schema.columns
61+
where table_schema = '%s'
62+
and table_name = '%s'
63+
and (%s)`, schema, table, strings.Join(columnChecks, " or "))
64+
65+
return sqlQuery, nil
66+
}
67+
4568
if check.ParsedCheck == nil {
4669
return "", fmt.Errorf("check does not have parsed structure")
4770
}

adapters/mysql_adapter_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,57 @@ func TestMySQLAdapter_InterpretDataQualityCheck(t *testing.T) {
194194
expectError: true,
195195
errorMessage: "raw_query check requires a 'query' field",
196196
},
197+
{
198+
name: "expect_columns_ordered check",
199+
check: &dbqcore.DataQualityCheck{
200+
Expression: "expect_columns_ordered",
201+
SchemaCheck: &dbqcore.SchemaCheckConfig{
202+
ExpectColumnsOrdered: &dbqcore.ExpectColumnsOrderedConfig{
203+
ColumnsOrder: []string{"id", "name", "email"},
204+
},
205+
},
206+
},
207+
dataset: "mydb.users",
208+
whereClause: "",
209+
expectedSQL: `select count(*)
210+
from information_schema.columns
211+
where table_schema = 'mydb'
212+
and table_name = 'users'
213+
and ((column_name = 'id' and ordinal_position = 1) or (column_name = 'name' and ordinal_position = 2) or (column_name = 'email' and ordinal_position = 3))`,
214+
},
215+
{
216+
name: "expect_columns_ordered check with single column",
217+
check: &dbqcore.DataQualityCheck{
218+
Expression: "expect_columns_ordered",
219+
SchemaCheck: &dbqcore.SchemaCheckConfig{
220+
ExpectColumnsOrdered: &dbqcore.ExpectColumnsOrderedConfig{
221+
ColumnsOrder: []string{"id"},
222+
},
223+
},
224+
},
225+
dataset: "testdb.products",
226+
whereClause: "",
227+
expectedSQL: `select count(*)
228+
from information_schema.columns
229+
where table_schema = 'testdb'
230+
and table_name = 'products'
231+
and ((column_name = 'id' and ordinal_position = 1))`,
232+
},
233+
{
234+
name: "expect_columns_ordered check invalid dataset format",
235+
check: &dbqcore.DataQualityCheck{
236+
Expression: "expect_columns_ordered",
237+
SchemaCheck: &dbqcore.SchemaCheckConfig{
238+
ExpectColumnsOrdered: &dbqcore.ExpectColumnsOrderedConfig{
239+
ColumnsOrder: []string{"id", "name"},
240+
},
241+
},
242+
},
243+
dataset: "invalid_dataset",
244+
whereClause: "",
245+
expectError: true,
246+
errorMessage: "dataset must be in format database.table",
247+
},
197248
{
198249
name: "unknown function fallback",
199250
check: &dbqcore.DataQualityCheck{

adapters/postgresql_adapter.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,29 @@ func NewPostgresqlDbqDataSourceAdapter(db *sql.DB, logger *slog.Logger) dbqcore.
4242
}
4343

4444
func (a *PostgresqlDbqDataSourceAdapter) InterpretDataQualityCheck(check *dbqcore.DataQualityCheck, dataset string, whereClause string) (string, error) {
45+
// Handle schema checks first
46+
if check.SchemaCheck != nil && check.SchemaCheck.ExpectColumnsOrdered != nil {
47+
schema, table, err := extractDatabaseAndTableFromDataset(dataset)
48+
if err != nil {
49+
return "", err
50+
}
51+
52+
expectedColumns := check.SchemaCheck.ExpectColumnsOrdered.ColumnsOrder
53+
columnChecks := make([]string, len(expectedColumns))
54+
for i, col := range expectedColumns {
55+
columnChecks[i] = fmt.Sprintf("(column_name = '%s' and ordinal_position = %d)", col, i+1)
56+
}
57+
58+
// Query returns count of matching columns in correct positions
59+
sqlQuery := fmt.Sprintf(`select count(*)
60+
from information_schema.columns
61+
where table_schema = '%s'
62+
and table_name = '%s'
63+
and (%s)`, schema, table, strings.Join(columnChecks, " or "))
64+
65+
return sqlQuery, nil
66+
}
67+
4568
if check.ParsedCheck == nil {
4669
return "", fmt.Errorf("check does not have parsed structure")
4770
}

adapters/postgresql_adapter_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,57 @@ func TestPostgreSQLAdapter_InterpretDataQualityCheck(t *testing.T) {
194194
expectError: true,
195195
errorMessage: "raw_query check requires a 'query' field",
196196
},
197+
{
198+
name: "expect_columns_ordered check",
199+
check: &dbqcore.DataQualityCheck{
200+
Expression: "expect_columns_ordered",
201+
SchemaCheck: &dbqcore.SchemaCheckConfig{
202+
ExpectColumnsOrdered: &dbqcore.ExpectColumnsOrderedConfig{
203+
ColumnsOrder: []string{"id", "name", "email"},
204+
},
205+
},
206+
},
207+
dataset: "public.users",
208+
whereClause: "",
209+
expectedSQL: `select count(*)
210+
from information_schema.columns
211+
where table_schema = 'public'
212+
and table_name = 'users'
213+
and ((column_name = 'id' and ordinal_position = 1) or (column_name = 'name' and ordinal_position = 2) or (column_name = 'email' and ordinal_position = 3))`,
214+
},
215+
{
216+
name: "expect_columns_ordered check with single column",
217+
check: &dbqcore.DataQualityCheck{
218+
Expression: "expect_columns_ordered",
219+
SchemaCheck: &dbqcore.SchemaCheckConfig{
220+
ExpectColumnsOrdered: &dbqcore.ExpectColumnsOrderedConfig{
221+
ColumnsOrder: []string{"id"},
222+
},
223+
},
224+
},
225+
dataset: "mydb.products",
226+
whereClause: "",
227+
expectedSQL: `select count(*)
228+
from information_schema.columns
229+
where table_schema = 'mydb'
230+
and table_name = 'products'
231+
and ((column_name = 'id' and ordinal_position = 1))`,
232+
},
233+
{
234+
name: "expect_columns_ordered check invalid dataset format",
235+
check: &dbqcore.DataQualityCheck{
236+
Expression: "expect_columns_ordered",
237+
SchemaCheck: &dbqcore.SchemaCheckConfig{
238+
ExpectColumnsOrdered: &dbqcore.ExpectColumnsOrderedConfig{
239+
ColumnsOrder: []string{"id", "name"},
240+
},
241+
},
242+
},
243+
dataset: "invalid_dataset",
244+
whereClause: "",
245+
expectError: true,
246+
errorMessage: "dataset must be in format database.table",
247+
},
197248
{
198249
name: "unknown function fallback",
199250
check: &dbqcore.DataQualityCheck{

check_parser.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ var (
5959
}
6060

6161
// reserved for schema checks
62-
schemaScopeFunctions = map[string]bool{}
62+
schemaScopeFunctions = map[string]bool{
63+
// Schema checks are now handled via schema_check config format
64+
}
6365
)
6466

6567
func ParseCheckExpression(expression string) (*CheckExpression, error) {

0 commit comments

Comments
 (0)