Skip to content

Commit 3c9c9c4

Browse files
committed
basic profiling support
1 parent 2063c6b commit 3c9c9c4

6 files changed

Lines changed: 263 additions & 11 deletions

File tree

cmd/profile.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ package cmd
22

33
import (
44
"dbq/internal"
5-
"fmt"
6-
5+
"encoding/json"
76
"github.com/spf13/cobra"
7+
"log"
88
)
99

1010
func NewProfileCommand(app internal.DbqApp) *cobra.Command {
@@ -21,7 +21,39 @@ Cobra is a CLI library for Go that empowers applications.
2121
This application is a tool to generate the needed files
2222
to quickly create a Cobra application.`,
2323
RunE: func(cmd *cobra.Command, args []string) error {
24-
fmt.Printf("profiling %s in %s\n", dataSet, dataSource)
24+
var dataSetsToProfile []string
25+
if dataSet != "" {
26+
dataSetsToProfile = append(dataSetsToProfile, dataSet)
27+
} else {
28+
ds := app.FindDataSourceById(dataSource)
29+
if ds != nil {
30+
for _, curDataSet := range ds.Datasets {
31+
dataSetsToProfile = append(dataSetsToProfile, curDataSet)
32+
}
33+
}
34+
}
35+
36+
profileResults := &internal.ProfileResultOutput{
37+
Profiles: make(map[string]*internal.TableMetrics),
38+
}
39+
40+
for _, curDataSet := range dataSetsToProfile {
41+
metrics, err := app.ProfileDataSourceById(dataSource, curDataSet)
42+
if err != nil {
43+
log.Printf("Failed to profile %s: %s\n", curDataSet, err)
44+
} else {
45+
profileResults.Profiles[curDataSet] = metrics
46+
}
47+
}
48+
49+
jsonData, err := json.Marshal(profileResults)
50+
if err != nil {
51+
log.Fatalf("Failed to marshal metrics to JSON: %v", err)
52+
}
53+
54+
// todo: handle empty tables
55+
log.Println(string(jsonData))
56+
2557
return nil
2658
},
2759
}

dbq.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ datasources:
1010
database: default
1111
datasets:
1212
- default.my_first_table
13-
- default.table_2
1413
- default.trips_small
1514
- id: pgsql
1615
type: postgres

internal/app.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
type DbqApp interface {
1515
PingDataSource(srcId string) error
1616
ImportDatasets(srcId string, filter string) ([]string, error)
17+
ProfileDataSourceById(srcId string, dataset string) (*TableMetrics, error)
1718
GetDbqConfig() *DbqConfig
1819
SaveDbqConfig() error
1920
FindDataSourceById(srcId string) *DataSource
@@ -56,7 +57,16 @@ func (app *DbqAppImpl) ImportDatasets(srcId string, filter string) ([]string, er
5657
return []string{}, err
5758
}
5859

59-
return cnn.ImportDatasets(filter)
60+
return cnn.ImportDataSets(filter)
61+
}
62+
63+
func (app *DbqAppImpl) ProfileDataSourceById(srcId string, dataset string) (*TableMetrics, error) {
64+
var dataSource = app.FindDataSourceById(srcId)
65+
cnn, err := getDbqConnector(*dataSource)
66+
if err != nil {
67+
return nil, err
68+
}
69+
return cnn.ProfileDataSet(dataset)
6070
}
6171

6272
func (app *DbqAppImpl) GetDbqConfig() *DbqConfig {

internal/clickhouse.go

Lines changed: 178 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ package internal
22

33
import (
44
"context"
5+
"database/sql"
56
"fmt"
67
"github.com/ClickHouse/clickhouse-go/v2"
78
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
9+
"log"
810
"strings"
11+
"time"
912
)
1013

1114
type ClickhouseDbqConnector struct {
@@ -34,7 +37,7 @@ func (c *ClickhouseDbqConnector) Ping() error {
3437
return c.cnn.Ping(context.Background())
3538
}
3639

37-
func (c *ClickhouseDbqConnector) ImportDatasets(filter string) ([]string, error) {
40+
func (c *ClickhouseDbqConnector) ImportDataSets(filter string) ([]string, error) {
3841
if c.cnn == nil {
3942
return nil, fmt.Errorf("database connection is not initialized")
4043
}
@@ -73,3 +76,177 @@ func (c *ClickhouseDbqConnector) ImportDatasets(filter string) ([]string, error)
7376

7477
return datasets, nil
7578
}
79+
80+
// ProfileDataSet todo: optimize queries
81+
func (c *ClickhouseDbqConnector) ProfileDataSet(dataSet string) (*TableMetrics, error) {
82+
startTime := time.Now()
83+
ctx := context.Background()
84+
85+
var databaseName, tableName string
86+
parts := strings.SplitN(dataSet, ".", 2)
87+
if len(parts) == 2 {
88+
databaseName = strings.TrimSpace(parts[0])
89+
tableName = strings.TrimSpace(parts[1])
90+
}
91+
92+
log.Printf("Calculating metrics for table: %s", dataSet)
93+
94+
metrics := &TableMetrics{
95+
ProfiledAt: time.Now().Unix(),
96+
TableName: tableName,
97+
DatabaseName: databaseName,
98+
Columns: make(map[string]*ColumnMetrics),
99+
}
100+
101+
// Total Row Count
102+
log.Printf("Fetching total row count...")
103+
err := c.cnn.QueryRow(ctx, fmt.Sprintf("SELECT count() FROM %s", dataSet)).Scan(&metrics.TotalRows)
104+
if err != nil {
105+
return nil, fmt.Errorf("failed to get total row count for %s: %w", dataSet, err)
106+
}
107+
log.Printf("Total rows: %d", metrics.TotalRows)
108+
109+
// Get Column Information (Name and Type)
110+
log.Printf("Fetching column information...")
111+
columnQuery := `
112+
SELECT name, type
113+
FROM system.columns
114+
WHERE database = ? AND table = ?
115+
ORDER BY position`
116+
rows, err := c.cnn.Query(ctx, columnQuery, databaseName, tableName)
117+
if err != nil {
118+
return nil, fmt.Errorf("failed to query system.columns for %s.%s: %w", databaseName, tableName, err)
119+
}
120+
defer rows.Close()
121+
122+
var columnsToProcess []struct {
123+
Name string
124+
Type string
125+
}
126+
for rows.Next() {
127+
var colName, colType string
128+
if err := rows.Scan(&colName, &colType); err != nil {
129+
return nil, fmt.Errorf("failed to scan column info: %w", err)
130+
}
131+
columnsToProcess = append(columnsToProcess, struct {
132+
Name string
133+
Type string
134+
}{Name: colName, Type: colType})
135+
}
136+
if err = rows.Err(); err != nil {
137+
return nil, fmt.Errorf("error iterating column info rows: %w", err)
138+
}
139+
rows.Close()
140+
141+
if len(columnsToProcess) == 0 {
142+
log.Printf("Warning: No columns found for table %s. Returning basic info.", dataSet)
143+
metrics.ProfilingDurationMs = time.Since(startTime).Milliseconds()
144+
return metrics, nil
145+
}
146+
147+
log.Printf("Found %d columns to process.", len(columnsToProcess))
148+
149+
// Calculate Metrics per Column
150+
for _, col := range columnsToProcess {
151+
colStartTime := time.Now()
152+
log.Printf("Processing column: %s (Type: %s)", col.Name, col.Type)
153+
colMetrics := &ColumnMetrics{
154+
ColumnName: col.Name,
155+
DataType: col.Type,
156+
}
157+
158+
// a) Null Count (all types)
159+
nullQuery := fmt.Sprintf("SELECT count() FROM %s WHERE %s IS NULL", dataSet, col.Name)
160+
err = c.cnn.QueryRow(ctx, nullQuery).Scan(&colMetrics.NullCount)
161+
if err != nil {
162+
// Log error but continue if possible, maybe column type doesn't support NULL checks easily?
163+
log.Printf("Warning: Failed to get NULL count for column %s: %v", col.Name, err)
164+
}
165+
166+
// b) Blank Count (String types only)
167+
if isStringCHType(col.Type) {
168+
blankQuery := fmt.Sprintf("SELECT count() FROM %s WHERE empty(%s)", dataSet, col.Name)
169+
// Alternative: SELECT countIf(%s = '') FROM %s
170+
var blankCount uint64
171+
err = c.cnn.QueryRow(ctx, blankQuery).Scan(&blankCount)
172+
if err != nil {
173+
log.Printf("Warning: Failed to get blank count for string column %s: %v", col.Name, err)
174+
colMetrics.BlankCount = sql.NullInt64{Valid: false}
175+
} else {
176+
colMetrics.BlankCount = sql.NullInt64{Int64: int64(blankCount), Valid: true}
177+
}
178+
}
179+
180+
// c) Numeric Metrics (Numeric types only)
181+
if isNumericCHType(col.Type) {
182+
// Use Nullable aggregates to handle cases where all values are NULL or table is empty
183+
// Use toFloat64 to ensure results are float64 for consistency, handle potential overflows if needed
184+
numericQuery := fmt.Sprintf(`
185+
SELECT
186+
min(%s),
187+
max(%s),
188+
avg(%s),
189+
stddevPop(%s)
190+
FROM %s`, col.Name, col.Name, col.Name, col.Name, dataSet)
191+
192+
err = c.cnn.QueryRow(ctx, numericQuery).Scan(
193+
&colMetrics.MinValue,
194+
&colMetrics.MaxValue,
195+
&colMetrics.AvgValue,
196+
&colMetrics.StddevValue,
197+
)
198+
199+
if err != nil {
200+
log.Printf("Warning: Failed to get numeric aggregates for column %s: %v", col.Name, err)
201+
// invalidate potentially partially scanned results
202+
colMetrics.MinValue = sql.NullFloat64{Valid: false}
203+
colMetrics.MaxValue = sql.NullFloat64{Valid: false}
204+
colMetrics.AvgValue = sql.NullFloat64{Valid: false}
205+
colMetrics.StddevValue = sql.NullFloat64{Valid: false}
206+
}
207+
}
208+
209+
// d) Most Frequent Value (all types - using topK)
210+
// topK(1) returns an array, we need to extract the first element if it exists.
211+
// It handles NULL correctly. CAST to String for consistent retrieval.
212+
// Note: If the most frequent value is NULL, it should be represented correctly by sql.NullString
213+
mfvQuery := fmt.Sprintf("SELECT CAST(arrayElement(topK(1)(%s), 1), 'Nullable(String)') FROM %s", col.Name, dataSet)
214+
err = c.cnn.QueryRow(ctx, mfvQuery).Scan(&colMetrics.MostFrequentValue)
215+
if err != nil {
216+
// This can happen if the table is empty or all values are NULL
217+
if strings.Contains(err.Error(), "empty result") || strings.Contains(err.Error(), "Illegal type") {
218+
log.Printf("Info: No most frequent value found or calculable for column %s (possibly empty or all NULLs).", col.Name)
219+
colMetrics.MostFrequentValue = sql.NullString{Valid: false} // Ensure it's marked invalid
220+
} else {
221+
log.Printf("Warning: Failed to get most frequent value for column %s: %v", col.Name, err)
222+
colMetrics.MostFrequentValue = sql.NullString{Valid: false} // Mark as invalid on other errors too
223+
}
224+
}
225+
226+
metrics.Columns[col.Name] = colMetrics
227+
log.Printf("Finished column: %s in %s", col.Name, time.Since(colStartTime))
228+
}
229+
230+
metrics.ProfilingDurationMs = time.Since(startTime).Milliseconds()
231+
log.Printf("Finished calculating all metrics for %s in %d ms", dataSet, metrics.ProfilingDurationMs)
232+
233+
return metrics, nil
234+
}
235+
236+
// isNumericCHType reports whether a ClickHouse data type supports the numeric
// aggregate functions used by ProfileDataSet (min, max, avg, stddevPop).
//
// Wrapper types such as Nullable(Int64) or LowCardinality(Float32) are
// unwrapped before matching — the original prefix check saw only the wrapper
// name and misclassified every Nullable numeric column. Interval types
// (IntervalDay, ...) are excluded explicitly because they would otherwise
// false-positive on the "int" prefix.
func isNumericCHType(dataType string) bool {
	t := strings.ToLower(strings.TrimSpace(dataType))
	for {
		switch {
		case strings.HasPrefix(t, "nullable(") && strings.HasSuffix(t, ")"):
			t = strings.TrimSuffix(strings.TrimPrefix(t, "nullable("), ")")
		case strings.HasPrefix(t, "lowcardinality(") && strings.HasSuffix(t, ")"):
			t = strings.TrimSuffix(strings.TrimPrefix(t, "lowcardinality("), ")")
		case strings.HasPrefix(t, "interval"):
			return false
		default:
			return strings.HasPrefix(t, "int") ||
				strings.HasPrefix(t, "uint") ||
				strings.HasPrefix(t, "float") ||
				strings.HasPrefix(t, "decimal")
		}
	}
}
246+
247+
// isStringCHType reports whether a ClickHouse data type is a string type
// (String or FixedString), for which ProfileDataSet computes a blank count.
//
// Wrapper types such as Nullable(String) or LowCardinality(String) are
// unwrapped before matching — the original prefix check saw only the wrapper
// name and misclassified every wrapped string column.
func isStringCHType(dataType string) bool {
	t := strings.ToLower(strings.TrimSpace(dataType))
	for {
		switch {
		case strings.HasPrefix(t, "nullable(") && strings.HasSuffix(t, ")"):
			t = strings.TrimSuffix(strings.TrimPrefix(t, "nullable("), ")")
		case strings.HasPrefix(t, "lowcardinality(") && strings.HasSuffix(t, ")"):
			t = strings.TrimSuffix(strings.TrimPrefix(t, "lowcardinality("), ")")
		default:
			return strings.HasPrefix(t, "string") ||
				strings.HasPrefix(t, "fixedstring")
		}
	}
}

internal/dbq_connector.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,36 @@
11
package internal
22

3+
import (
4+
"database/sql"
5+
)
6+
37
type DbqConnector interface {
48
Ping() error
5-
ImportDatasets(filter string) ([]string, error)
9+
ImportDataSets(filter string) ([]string, error)
10+
ProfileDataSet(dataSet string) (*TableMetrics, error)
11+
}
12+
13+
type ColumnMetrics struct {
14+
ColumnName string `json:"column_name"`
15+
DataType string `json:"data_type"`
16+
NullCount uint64 `json:"null_count"`
17+
BlankCount sql.NullInt64 `json:"blank_count,omitempty"` // Applicable only for String types
18+
MinValue sql.NullFloat64 `json:"min_value,omitempty"` // Numeric only
19+
MaxValue sql.NullFloat64 `json:"max_value,omitempty"` // Numeric only
20+
AvgValue sql.NullFloat64 `json:"avg_value,omitempty"` // Numeric only
21+
StddevValue sql.NullFloat64 `json:"stddev_value,omitempty"` // Numeric only (Population StdDev)
22+
MostFrequentValue sql.NullString `json:"most_frequent_value,omitempty"` // Using NullString to handle NULL as most frequent
23+
}
24+
25+
type TableMetrics struct {
26+
ProfiledAt int64 `json:"profiled_at"`
27+
TableName string `json:"table_name"`
28+
DatabaseName string `json:"database_name"`
29+
TotalRows uint64 `json:"total_rows"`
30+
Columns map[string]*ColumnMetrics `json:"columns"`
31+
ProfilingDurationMs int64 `json:"profiling_duration_ms"`
32+
}
33+
34+
type ProfileResultOutput struct {
35+
Profiles map[string]*TableMetrics `json:"profiles"`
636
}

readme.md

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@ It is designed to be easy to use and integrate into your existing workflow.
1515
- [ ] config parser
1616
- [ ] clickhouse support
1717
- [x] ping
18-
- [ ] import datasets
19-
- [ ] profile dataset
18+
- [x] import datasets
19+
- [x] profile dataset
20+
- [x] rows in table
21+
- [x] min, max, avg, stddev for numeric columns
22+
- [x] count of nulls and blanks
23+
- [x] most frequent value in column
2024
- [ ] run checks
2125
- [ ] implement support for custom sql check
22-
- [ ] implement aliases for common checks based on raw sql check
23-
- [ ] explore and auto import data sets
26+
- [ ] implement aliases for common checks based on raw sql check
2427
- [ ] basic cross validation (dataset is defined)
2528
- [ ] fix cmd descriptions
2629
- [ ] review todos
@@ -30,6 +33,7 @@ It is designed to be easy to use and integrate into your existing workflow.
3033
- config validation
3134
- add postgres support
3235
- CLI for adding more checks
36+
- schema changes checks
3337

3438

3539
---

0 commit comments

Comments
 (0)