-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathContinuousAggregateOperationGenerator.cs
More file actions
233 lines (200 loc) · 9.47 KB
/
ContinuousAggregateOperationGenerator.cs
File metadata and controls
233 lines (200 loc) · 9.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
using CmdScale.EntityFrameworkCore.TimescaleDB.Operations;
using System.Text;
namespace CmdScale.EntityFrameworkCore.TimescaleDB.Generators
{
public class ContinuousAggregateOperationGenerator
{
private readonly string quoteString = "\"";
private readonly SqlBuilderHelper sqlHelper;
public ContinuousAggregateOperationGenerator(bool isDesignTime = false)
{
if (isDesignTime)
{
quoteString = "\"\"";
}
sqlHelper = new SqlBuilderHelper(quoteString);
}
public List<string> Generate(CreateContinuousAggregateOperation operation)
{
string qualifiedIdentifier = sqlHelper.QualifiedIdentifier(operation.MaterializedViewName, operation.Schema);
string parentQualifiedIdentifier = sqlHelper.QualifiedIdentifier(operation.ParentName, operation.Schema);
List<string> statements = [];
// Build WITH options
List<string> withOptions =
[
"timescaledb.continuous",
$"timescaledb.create_group_indexes = {operation.CreateGroupIndexes.ToString().ToLower()}",
$"timescaledb.materialized_only = {operation.MaterializedOnly.ToString().ToLower()}"
];
// Add optional chunk_interval if specified
if (!string.IsNullOrEmpty(operation.ChunkInterval))
{
withOptions.Add($"timescaledb.chunk_interval = '{operation.ChunkInterval}'");
}
// Build the SELECT list
List<string> selectList = [];
// Add time_bucket column
string timeBucketColumn = $"{quoteString}{operation.TimeBucketSourceColumn}{quoteString}";
string timeBucketWidthSql = $"'{operation.TimeBucketWidth}'";
selectList.Add($"time_bucket({timeBucketWidthSql}, {timeBucketColumn}) AS time_bucket");
// Add GROUP BY columns to SELECT (only actual columns, not SQL expressions)
foreach (string groupByColumn in operation.GroupByColumns)
{
// Check if it's a raw SQL expression or a column name
bool isRawSqlExpression = groupByColumn.Contains(',') || groupByColumn.Contains('(') || groupByColumn.Contains(' ');
if (!isRawSqlExpression)
{
selectList.Add($"{quoteString}{groupByColumn}{quoteString}");
}
}
// Build aggregate functions
foreach (string aggInfo in operation.AggregateFunctions)
{
string[] parts = aggInfo.Split(':');
if (parts.Length != 3)
{
// Skip malformed string
continue;
}
string alias = parts[0];
string functionEnumString = parts[1];
string sourceColumn = parts[2];
string sqlFunction = GetSqlAggregateFunction(functionEnumString);
string quotedSourceColumn = $"{quoteString}{sourceColumn}{quoteString}";
string quotedAlias = $"{quoteString}{alias}{quoteString}";
string aggregateExpression;
// Handle special TimescaleDB aggregates 'first' and 'last'
// which require (value_column, time_column)
if (sqlFunction == "first" || sqlFunction == "last")
{
aggregateExpression = $"{sqlFunction}({quotedSourceColumn}, {timeBucketColumn})";
}
else
{
aggregateExpression = $"{sqlFunction}({quotedSourceColumn})";
}
selectList.Add($"{aggregateExpression} AS {quotedAlias}");
}
// Build the GROUP BY list
List<string> groupByList = [];
if (operation.TimeBucketGroupBy)
{
groupByList.Add("time_bucket");
}
// Add group by columns
foreach (string groupByColumn in operation.GroupByColumns)
{
if (groupByColumn.Contains(',') || groupByColumn.Contains('(') || groupByColumn.Contains(' '))
{
// It's a raw SQL expression, use as-is
groupByList.Add(groupByColumn);
}
else
{
// It's a column name, quote it
groupByList.Add($"{quoteString}{groupByColumn}{quoteString}");
}
}
// Build the complete CREATE MATERIALIZED VIEW statement as a single string
StringBuilder sqlBuilder = new();
sqlBuilder.Append($"CREATE MATERIALIZED VIEW {qualifiedIdentifier}");
sqlBuilder.AppendLine();
sqlBuilder.Append($"WITH ({string.Join(", ", withOptions)}) AS");
sqlBuilder.AppendLine();
sqlBuilder.Append($"SELECT {string.Join(", ", selectList)}");
sqlBuilder.AppendLine();
sqlBuilder.Append($"FROM {parentQualifiedIdentifier}");
// Add WHERE clause if specified
if (!string.IsNullOrWhiteSpace(operation.WhereClause))
{
string whereClause = operation.WhereClause.Replace("\"", quoteString);
sqlBuilder.AppendLine();
sqlBuilder.Append($"WHERE {whereClause}");
}
// Add GROUP BY clause
if (groupByList.Count > 0)
{
sqlBuilder.AppendLine();
sqlBuilder.Append($"GROUP BY {string.Join(", ", groupByList)}");
}
// Add WITH [NO] DATA
if (operation.WithNoData)
{
sqlBuilder.AppendLine();
sqlBuilder.Append("WITH NO DATA");
}
sqlBuilder.Append(';');
statements.Add(sqlBuilder.ToString());
return statements;
}
public List<string> Generate(AlterContinuousAggregateOperation operation)
{
string qualifiedIdentifier = sqlHelper.QualifiedIdentifier(operation.MaterializedViewName, operation.Schema);
List<string> statements = [];
// Check for ChunkInterval change
// Note: TimescaleDB continuous aggregates only support SET for chunk_interval, not RESET
if (operation.ChunkInterval != operation.OldChunkInterval)
{
// Only generate SQL if we have a valid new value to set
// We cannot RESET chunk_interval as TimescaleDB doesn't support it
if (!string.IsNullOrEmpty(operation.ChunkInterval))
{
string chunkIntervalSql = $"'{operation.ChunkInterval}'";
statements.Add($"ALTER MATERIALIZED VIEW {qualifiedIdentifier} SET (timescaledb.chunk_interval = {chunkIntervalSql});");
}
else if (!string.IsNullOrEmpty(operation.OldChunkInterval))
{
// Special case: If new value is null/empty but old value exists,
// restore the old value instead of trying to RESET (which is unsupported)
string chunkIntervalSql = $"'{operation.OldChunkInterval}'";
statements.Add($"ALTER MATERIALIZED VIEW {qualifiedIdentifier} SET (timescaledb.chunk_interval = {chunkIntervalSql});");
}
}
// Check for CreateGroupIndexes change
if (operation.CreateGroupIndexes != operation.OldCreateGroupIndexes)
{
string createGroupIndexesValue = operation.CreateGroupIndexes.ToString().ToLower();
statements.Add($"ALTER MATERIALIZED VIEW {qualifiedIdentifier} SET (timescaledb.create_group_indexes = {createGroupIndexesValue});");
}
// Check for MaterializedOnly change
if (operation.MaterializedOnly != operation.OldMaterializedOnly)
{
string materializedOnlyValue = operation.MaterializedOnly.ToString().ToLower();
statements.Add($"ALTER MATERIALIZED VIEW {qualifiedIdentifier} SET (timescaledb.materialized_only = {materializedOnlyValue});");
}
return statements;
}
public List<string> Generate(DropContinuousAggregateOperation operation)
{
string qualifiedIdentifier = sqlHelper.QualifiedIdentifier(operation.MaterializedViewName, operation.Schema);
List<string> statements = [];
statements.Add($"DROP MATERIALIZED VIEW IF EXISTS {qualifiedIdentifier};");
return statements;
}
/// <summary>
/// Translates the string representation of EAggregateFunction into a SQL function.
/// </summary>
private static string GetSqlAggregateFunction(string functionEnumString)
{
switch (functionEnumString)
{
case "Avg":
return "AVG";
case "Max":
return "MAX";
case "Min":
return "MIN";
case "Sum":
return "SUM";
case "Count":
return "COUNT";
case "First":
return "first";
case "Last":
return "last";
default:
throw new NotSupportedException($"The aggregate function '{functionEnumString}' is not supported by the generator.");
}
}
}
}