-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathaggregated_values.go
More file actions
125 lines (105 loc) · 2.46 KB
/
aggregated_values.go
File metadata and controls
125 lines (105 loc) · 2.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package data
import (
"context"
"database/sql"
"fmt"
)
type AggregatedValuesKey struct{}
type AggregatedValues = OverallWeeklyData[Value]
type WeeklyValueData = WeeklyData[Value]
type AggregatedValuesQuery = Query[AggregatedValuesKey]
func (AggregatedValuesKey) Execute(
ctx context.Context,
db *DB,
q AggregatedValuesQuery,
org, repo string,
weeks []string,
team *int64,
) (any, error) {
return db.GetAggregatedValues(ctx, q, org, repo, weeks, team)
}
func (d DB) GetAggregatedValues(
ctx context.Context,
query AggregatedValuesQuery,
namespace string,
repository string,
weeks []string,
team *int64,
) (*AggregatedValues, error) {
queryParamLength := len(weeks)
queryParams := make([]any, queryParamLength)
for i, v := range weeks {
queryParams[i] = v
}
queryParams = append(queryParams, namespace)
queryParams = append(queryParams, repository)
if team != nil {
queryParams = append(queryParams, team)
}
rows, err := d.db.QueryContext(ctx, query.Get(), queryParams...)
if err != nil {
return nil, err
}
defer rows.Close()
result, err := scanAggregatedValuesRows(rows, weeks)
if err != nil {
return nil, err
}
return result, nil
}
func scanAggregatedValuesRows(rows *sql.Rows, weeks []string) (*AggregatedValues, error) {
datasetByWeek := make(map[string]WeeklyValueData)
var aggregated Value
nullWeeksCount := 0
for rows.Next() {
var nullableWeek sql.NullString
var data Value
if err := rows.Scan(
&nullableWeek,
&data.Value,
); err != nil {
return nil, err
}
if nullableWeek.Valid {
datasetByWeek[nullableWeek.String] = WeeklyValueData{
Week: nullableWeek.String,
Data: data,
}
} else {
aggregated = data
nullWeeksCount++
}
if nullWeeksCount > 1 {
return nil, fmt.Errorf("ScanAggregatedValuesRows found more than one aggregate row")
}
}
if err := rows.Err(); err != nil {
return nil, err
}
var weeklies []WeeklyValueData
for _, week := range weeks {
dataPoint, ok := datasetByWeek[week]
if !ok {
dataPoint = WeeklyValueData{
Week: week,
Data: Value{
Value: nil,
},
}
}
weeklies = append(weeklies, dataPoint)
}
return &AggregatedValues{
Overall: aggregated,
Weekly: weeklies,
}, nil
}
func buildQueryAggregatedValues(baseQuery string) AggregatedValuesQuery {
return AggregatedValuesQuery{value: fmt.Sprintf(`
WITH dataset AS (%s)
SELECT NULL as week, SUM(value) AS value FROM dataset
UNION ALL
SELECT week, value FROM dataset;`,
baseQuery,
)}
}