Skip to content

Commit 60ab498

Browse files
committed
support for expect_columns schema_check
1 parent fa0831f commit 60ab498

11 files changed

Lines changed: 456 additions & 65 deletions

adapters/clickhouse_adapter.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,26 +43,45 @@ func NewClickhouseDbqDataSourceAdapter(cnn driver.Conn, logger *slog.Logger) dbq
4343

4444
func (a *ClickhouseDbqDataSourceAdapter) InterpretDataQualityCheck(check *dbqcore.DataQualityCheck, dataset string, whereClause string) (string, error) {
4545
// handle schema checks first
46-
if check.SchemaCheck != nil && check.SchemaCheck.ExpectColumnsOrdered != nil {
46+
if check.SchemaCheck != nil {
4747
database, table, err := extractDatabaseAndTableFromDataset(dataset)
4848
if err != nil {
4949
return "", err
5050
}
5151

52-
expectedColumns := check.SchemaCheck.ExpectColumnsOrdered.ColumnsOrder
53-
columnChecks := make([]string, len(expectedColumns))
54-
for i, col := range expectedColumns {
55-
columnChecks[i] = fmt.Sprintf("(name = '%s' and position = %d)", col, i+1)
52+
if check.SchemaCheck.ExpectColumnsOrdered != nil {
53+
expectedColumns := check.SchemaCheck.ExpectColumnsOrdered.ColumnsOrder
54+
columnChecks := make([]string, len(expectedColumns))
55+
for i, col := range expectedColumns {
56+
columnChecks[i] = fmt.Sprintf("(name = '%s' and position = %d)", col, i+1)
57+
}
58+
59+
// count of matching columns in correct positions
60+
sqlQuery := fmt.Sprintf(`select count()
61+
from system.columns
62+
where database = '%s'
63+
and table = '%s'
64+
and (%s)`, database, table, strings.Join(columnChecks, " or "))
65+
66+
return sqlQuery, nil
5667
}
5768

58-
// count of matching columns in correct positions
59-
sqlQuery := fmt.Sprintf(`select count()
60-
from system.columns
61-
where database = '%s'
62-
and table = '%s'
63-
and (%s)`, database, table, strings.Join(columnChecks, " or "))
69+
if check.SchemaCheck.ExpectColumns != nil {
70+
// Query returns count of matching columns
71+
expectedColumns := check.SchemaCheck.ExpectColumns.Columns
72+
columnChecks := make([]string, len(expectedColumns))
73+
for i, col := range expectedColumns {
74+
columnChecks[i] = fmt.Sprintf("name = '%s'", col)
75+
}
6476

65-
return sqlQuery, nil
77+
sqlQuery := fmt.Sprintf(`select count()
78+
from system.columns
79+
where database = '%s'
80+
and table = '%s'
81+
and (%s)`, database, table, strings.Join(columnChecks, " or "))
82+
83+
return sqlQuery, nil
84+
}
6685
}
6786

6887
if check.ParsedCheck == nil {

adapters/clickhouse_adapter_test.go

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,10 @@ func TestClickhouseAdapter_InterpretDataQualityCheck(t *testing.T) {
207207
dataset: "default.users",
208208
whereClause: "",
209209
expectedSQL: `select count()
210-
from system.columns
211-
where database = 'default'
212-
and table = 'users'
213-
and ((name = 'id' and position = 1) or (name = 'name' and position = 2) or (name = 'email' and position = 3))`,
210+
from system.columns
211+
where database = 'default'
212+
and table = 'users'
213+
and ((name = 'id' and position = 1) or (name = 'name' and position = 2) or (name = 'email' and position = 3))`,
214214
},
215215
{
216216
name: "expect_columns_ordered check with single column",
@@ -225,10 +225,10 @@ func TestClickhouseAdapter_InterpretDataQualityCheck(t *testing.T) {
225225
dataset: "analytics.events",
226226
whereClause: "",
227227
expectedSQL: `select count()
228-
from system.columns
229-
where database = 'analytics'
230-
and table = 'events'
231-
and ((name = 'id' and position = 1))`,
228+
from system.columns
229+
where database = 'analytics'
230+
and table = 'events'
231+
and ((name = 'id' and position = 1))`,
232232
},
233233
{
234234
name: "expect_columns_ordered check invalid dataset format",
@@ -245,6 +245,57 @@ func TestClickhouseAdapter_InterpretDataQualityCheck(t *testing.T) {
245245
expectError: true,
246246
errorMessage: "dataset must be in format database.table",
247247
},
248+
{
249+
name: "expect_columns check with multiple columns",
250+
check: &dbqcore.DataQualityCheck{
251+
Expression: "expect_columns",
252+
SchemaCheck: &dbqcore.SchemaCheckConfig{
253+
ExpectColumns: &dbqcore.ExpectColumnsConfig{
254+
Columns: []string{"event_id", "user_id", "timestamp"},
255+
},
256+
},
257+
},
258+
dataset: "analytics.events",
259+
whereClause: "",
260+
expectedSQL: `select count()
261+
from system.columns
262+
where database = 'analytics'
263+
and table = 'events'
264+
and (name = 'event_id' or name = 'user_id' or name = 'timestamp')`,
265+
},
266+
{
267+
name: "expect_columns check with two columns",
268+
check: &dbqcore.DataQualityCheck{
269+
Expression: "expect_columns",
270+
SchemaCheck: &dbqcore.SchemaCheckConfig{
271+
ExpectColumns: &dbqcore.ExpectColumnsConfig{
272+
Columns: []string{"metric_name", "value"},
273+
},
274+
},
275+
},
276+
dataset: "metrics.timeseries",
277+
whereClause: "",
278+
expectedSQL: `select count()
279+
from system.columns
280+
where database = 'metrics'
281+
and table = 'timeseries'
282+
and (name = 'metric_name' or name = 'value')`,
283+
},
284+
{
285+
name: "expect_columns check invalid dataset format",
286+
check: &dbqcore.DataQualityCheck{
287+
Expression: "expect_columns",
288+
SchemaCheck: &dbqcore.SchemaCheckConfig{
289+
ExpectColumns: &dbqcore.ExpectColumnsConfig{
290+
Columns: []string{"id", "name"},
291+
},
292+
},
293+
},
294+
dataset: "invalid_dataset",
295+
whereClause: "",
296+
expectError: true,
297+
errorMessage: "dataset must be in format database.table",
298+
},
248299
{
249300
name: "unknown function fallback",
250301
check: &dbqcore.DataQualityCheck{

adapters/mysql_adapter.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,26 +43,45 @@ func NewMysqlDbqDataSourceAdapter(db *sql.DB, logger *slog.Logger) dbqcore.DbqDa
4343

4444
func (a *MysqlDbqDataSourceAdapter) InterpretDataQualityCheck(check *dbqcore.DataQualityCheck, dataset string, whereClause string) (string, error) {
4545
// Handle schema checks first
46-
if check.SchemaCheck != nil && check.SchemaCheck.ExpectColumnsOrdered != nil {
46+
if check.SchemaCheck != nil {
4747
schema, table, err := extractDatabaseAndTableFromDataset(dataset)
4848
if err != nil {
4949
return "", err
5050
}
5151

52-
expectedColumns := check.SchemaCheck.ExpectColumnsOrdered.ColumnsOrder
53-
columnChecks := make([]string, len(expectedColumns))
54-
for i, col := range expectedColumns {
55-
columnChecks[i] = fmt.Sprintf("(column_name = '%s' and ordinal_position = %d)", col, i+1)
52+
if check.SchemaCheck.ExpectColumnsOrdered != nil {
53+
expectedColumns := check.SchemaCheck.ExpectColumnsOrdered.ColumnsOrder
54+
columnChecks := make([]string, len(expectedColumns))
55+
for i, col := range expectedColumns {
56+
columnChecks[i] = fmt.Sprintf("(column_name = '%s' and ordinal_position = %d)", col, i+1)
57+
}
58+
59+
// count of matching columns in correct positions
60+
sqlQuery := fmt.Sprintf(`select count(*)
61+
from information_schema.columns
62+
where table_schema = '%s'
63+
and table_name = '%s'
64+
and (%s)`, schema, table, strings.Join(columnChecks, " or "))
65+
66+
return sqlQuery, nil
5667
}
5768

58-
// count of matching columns in correct positions
59-
sqlQuery := fmt.Sprintf(`select count(*)
60-
from information_schema.columns
61-
where table_schema = '%s'
62-
and table_name = '%s'
63-
and (%s)`, schema, table, strings.Join(columnChecks, " or "))
69+
if check.SchemaCheck.ExpectColumns != nil {
70+
// Query returns count of matching columns
71+
expectedColumns := check.SchemaCheck.ExpectColumns.Columns
72+
columnChecks := make([]string, len(expectedColumns))
73+
for i, col := range expectedColumns {
74+
columnChecks[i] = fmt.Sprintf("column_name = '%s'", col)
75+
}
6476

65-
return sqlQuery, nil
77+
sqlQuery := fmt.Sprintf(`select count(*)
78+
from information_schema.columns
79+
where table_schema = '%s'
80+
and table_name = '%s'
81+
and (%s)`, schema, table, strings.Join(columnChecks, " or "))
82+
83+
return sqlQuery, nil
84+
}
6685
}
6786

6887
if check.ParsedCheck == nil {

adapters/mysql_adapter_test.go

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,10 @@ func TestMySQLAdapter_InterpretDataQualityCheck(t *testing.T) {
207207
dataset: "mydb.users",
208208
whereClause: "",
209209
expectedSQL: `select count(*)
210-
from information_schema.columns
211-
where table_schema = 'mydb'
212-
and table_name = 'users'
213-
and ((column_name = 'id' and ordinal_position = 1) or (column_name = 'name' and ordinal_position = 2) or (column_name = 'email' and ordinal_position = 3))`,
210+
from information_schema.columns
211+
where table_schema = 'mydb'
212+
and table_name = 'users'
213+
and ((column_name = 'id' and ordinal_position = 1) or (column_name = 'name' and ordinal_position = 2) or (column_name = 'email' and ordinal_position = 3))`,
214214
},
215215
{
216216
name: "expect_columns_ordered check with single column",
@@ -225,10 +225,10 @@ func TestMySQLAdapter_InterpretDataQualityCheck(t *testing.T) {
225225
dataset: "testdb.products",
226226
whereClause: "",
227227
expectedSQL: `select count(*)
228-
from information_schema.columns
229-
where table_schema = 'testdb'
230-
and table_name = 'products'
231-
and ((column_name = 'id' and ordinal_position = 1))`,
228+
from information_schema.columns
229+
where table_schema = 'testdb'
230+
and table_name = 'products'
231+
and ((column_name = 'id' and ordinal_position = 1))`,
232232
},
233233
{
234234
name: "expect_columns_ordered check invalid dataset format",
@@ -245,6 +245,57 @@ func TestMySQLAdapter_InterpretDataQualityCheck(t *testing.T) {
245245
expectError: true,
246246
errorMessage: "dataset must be in format database.table",
247247
},
248+
{
249+
name: "expect_columns check with multiple columns",
250+
check: &dbqcore.DataQualityCheck{
251+
Expression: "expect_columns",
252+
SchemaCheck: &dbqcore.SchemaCheckConfig{
253+
ExpectColumns: &dbqcore.ExpectColumnsConfig{
254+
Columns: []string{"order_id", "customer_id", "total_amount"},
255+
},
256+
},
257+
},
258+
dataset: "ecommerce.orders",
259+
whereClause: "",
260+
expectedSQL: `select count(*)
261+
from information_schema.columns
262+
where table_schema = 'ecommerce'
263+
and table_name = 'orders'
264+
and (column_name = 'order_id' or column_name = 'customer_id' or column_name = 'total_amount')`,
265+
},
266+
{
267+
name: "expect_columns check with two columns",
268+
check: &dbqcore.DataQualityCheck{
269+
Expression: "expect_columns",
270+
SchemaCheck: &dbqcore.SchemaCheckConfig{
271+
ExpectColumns: &dbqcore.ExpectColumnsConfig{
272+
Columns: []string{"product_id", "price"},
273+
},
274+
},
275+
},
276+
dataset: "shop.products",
277+
whereClause: "",
278+
expectedSQL: `select count(*)
279+
from information_schema.columns
280+
where table_schema = 'shop'
281+
and table_name = 'products'
282+
and (column_name = 'product_id' or column_name = 'price')`,
283+
},
284+
{
285+
name: "expect_columns check invalid dataset format",
286+
check: &dbqcore.DataQualityCheck{
287+
Expression: "expect_columns",
288+
SchemaCheck: &dbqcore.SchemaCheckConfig{
289+
ExpectColumns: &dbqcore.ExpectColumnsConfig{
290+
Columns: []string{"id", "name"},
291+
},
292+
},
293+
},
294+
dataset: "invalid_dataset",
295+
whereClause: "",
296+
expectError: true,
297+
errorMessage: "dataset must be in format database.table",
298+
},
248299
{
249300
name: "unknown function fallback",
250301
check: &dbqcore.DataQualityCheck{

adapters/postgresql_adapter.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,26 +43,45 @@ func NewPostgresqlDbqDataSourceAdapter(db *sql.DB, logger *slog.Logger) dbqcore.
4343

4444
func (a *PostgresqlDbqDataSourceAdapter) InterpretDataQualityCheck(check *dbqcore.DataQualityCheck, dataset string, whereClause string) (string, error) {
4545
// Handle schema checks first
46-
if check.SchemaCheck != nil && check.SchemaCheck.ExpectColumnsOrdered != nil {
46+
if check.SchemaCheck != nil {
4747
schema, table, err := extractDatabaseAndTableFromDataset(dataset)
4848
if err != nil {
4949
return "", err
5050
}
5151

52-
expectedColumns := check.SchemaCheck.ExpectColumnsOrdered.ColumnsOrder
53-
columnChecks := make([]string, len(expectedColumns))
54-
for i, col := range expectedColumns {
55-
columnChecks[i] = fmt.Sprintf("(column_name = '%s' and ordinal_position = %d)", col, i+1)
52+
if check.SchemaCheck.ExpectColumnsOrdered != nil {
53+
expectedColumns := check.SchemaCheck.ExpectColumnsOrdered.ColumnsOrder
54+
columnChecks := make([]string, len(expectedColumns))
55+
for i, col := range expectedColumns {
56+
columnChecks[i] = fmt.Sprintf("(column_name = '%s' and ordinal_position = %d)", col, i+1)
57+
}
58+
59+
// Query returns count of matching columns in correct positions
60+
sqlQuery := fmt.Sprintf(`select count(*)
61+
from information_schema.columns
62+
where table_schema = '%s'
63+
and table_name = '%s'
64+
and (%s)`, schema, table, strings.Join(columnChecks, " or "))
65+
66+
return sqlQuery, nil
5667
}
5768

58-
// Query returns count of matching columns in correct positions
59-
sqlQuery := fmt.Sprintf(`select count(*)
60-
from information_schema.columns
61-
where table_schema = '%s'
62-
and table_name = '%s'
63-
and (%s)`, schema, table, strings.Join(columnChecks, " or "))
69+
if check.SchemaCheck.ExpectColumns != nil {
70+
// Query returns count of matching columns
71+
expectedColumns := check.SchemaCheck.ExpectColumns.Columns
72+
columnChecks := make([]string, len(expectedColumns))
73+
for i, col := range expectedColumns {
74+
columnChecks[i] = fmt.Sprintf("column_name = '%s'", col)
75+
}
6476

65-
return sqlQuery, nil
77+
sqlQuery := fmt.Sprintf(`select count(*)
78+
from information_schema.columns
79+
where table_schema = '%s'
80+
and table_name = '%s'
81+
and (%s)`, schema, table, strings.Join(columnChecks, " or "))
82+
83+
return sqlQuery, nil
84+
}
6685
}
6786

6887
if check.ParsedCheck == nil {

0 commit comments

Comments
 (0)