Skip to content

Commit 2aac8d6

Browse files
Implements the support for continuous aggregate policies
1 parent 720eae1 commit 2aac8d6

19 files changed

Lines changed: 1333 additions & 141 deletions
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
using CmdScale.EntityFrameworkCore.TimescaleDB.Configuration.ContinuousAggregatePolicy;
2+
using Microsoft.EntityFrameworkCore.Scaffolding.Metadata;
3+
using static CmdScale.EntityFrameworkCore.TimescaleDB.Design.Scaffolding.ContinuousAggregatePolicyScaffoldingExtractor;
4+
5+
namespace CmdScale.EntityFrameworkCore.TimescaleDB.Design.Scaffolding
6+
{
7+
/// <summary>
8+
/// Applies continuous aggregate policy annotations to scaffolded database views.
9+
/// </summary>
10+
public sealed class ContinuousAggregatePolicyAnnotationApplier : IAnnotationApplier
11+
{
12+
public void ApplyAnnotations(DatabaseTable table, object featureInfo)
13+
{
14+
if (featureInfo is not ContinuousAggregatePolicyInfo info)
15+
{
16+
throw new ArgumentException($"Expected {nameof(ContinuousAggregatePolicyInfo)}, got {featureInfo.GetType().Name}", nameof(featureInfo));
17+
}
18+
19+
// Mark that this continuous aggregate has a refresh policy
20+
table[ContinuousAggregatePolicyAnnotations.HasRefreshPolicy] = true;
21+
22+
// Apply start_offset and end_offset
23+
if (!string.IsNullOrWhiteSpace(info.StartOffset))
24+
{
25+
table[ContinuousAggregatePolicyAnnotations.StartOffset] = info.StartOffset;
26+
}
27+
28+
if (!string.IsNullOrWhiteSpace(info.EndOffset))
29+
{
30+
table[ContinuousAggregatePolicyAnnotations.EndOffset] = info.EndOffset;
31+
}
32+
33+
// Apply schedule_interval
34+
if (!string.IsNullOrWhiteSpace(info.ScheduleInterval))
35+
{
36+
table[ContinuousAggregatePolicyAnnotations.ScheduleInterval] = info.ScheduleInterval;
37+
}
38+
39+
// Apply timezone
40+
if (!string.IsNullOrWhiteSpace(info.Timezone))
41+
{
42+
table[ContinuousAggregatePolicyAnnotations.Timezone] = info.Timezone;
43+
}
44+
45+
// Apply initial_start
46+
if (info.InitialStart.HasValue)
47+
{
48+
table[ContinuousAggregatePolicyAnnotations.InitialStart] = info.InitialStart.Value;
49+
}
50+
51+
// Apply include_tiered_data (only if not null - it's an optional parameter)
52+
if (info.IncludeTieredData.HasValue)
53+
{
54+
table[ContinuousAggregatePolicyAnnotations.IncludeTieredData] = info.IncludeTieredData.Value;
55+
}
56+
57+
// Apply buckets_per_batch (only if different from default value of 1)
58+
if (info.BucketsPerBatch.HasValue && info.BucketsPerBatch.Value != 1)
59+
{
60+
table[ContinuousAggregatePolicyAnnotations.BucketsPerBatch] = info.BucketsPerBatch.Value;
61+
}
62+
63+
// Apply max_batches_per_execution (only if different from default value of 0)
64+
if (info.MaxBatchesPerExecution.HasValue && info.MaxBatchesPerExecution.Value != 0)
65+
{
66+
table[ContinuousAggregatePolicyAnnotations.MaxBatchesPerExecution] = info.MaxBatchesPerExecution.Value;
67+
}
68+
69+
// Apply refresh_newest_first (only if different from default value of true)
70+
if (info.RefreshNewestFirst.HasValue && !info.RefreshNewestFirst.Value)
71+
{
72+
table[ContinuousAggregatePolicyAnnotations.RefreshNewestFirst] = info.RefreshNewestFirst.Value;
73+
}
74+
}
75+
}
76+
}
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
using System.Data;
2+
using System.Data.Common;
3+
using System.Text.Json;
4+
5+
namespace CmdScale.EntityFrameworkCore.TimescaleDB.Design.Scaffolding
6+
{
7+
/// <summary>
8+
/// Extracts continuous aggregate policy metadata from a TimescaleDB database for scaffolding.
9+
/// </summary>
10+
public sealed class ContinuousAggregatePolicyScaffoldingExtractor : ITimescaleFeatureExtractor
11+
{
12+
public sealed record ContinuousAggregatePolicyInfo(
13+
string? StartOffset,
14+
string? EndOffset,
15+
string? ScheduleInterval,
16+
string? Timezone,
17+
DateTime? InitialStart,
18+
bool? IncludeTieredData,
19+
int? BucketsPerBatch,
20+
int? MaxBatchesPerExecution,
21+
bool? RefreshNewestFirst
22+
);
23+
24+
public Dictionary<(string Schema, string TableName), object> Extract(DbConnection connection)
25+
{
26+
bool wasOpen = connection.State == ConnectionState.Open;
27+
if (!wasOpen)
28+
{
29+
connection.Open();
30+
}
31+
32+
try
33+
{
34+
Dictionary<(string, string), ContinuousAggregatePolicyInfo> policies = [];
35+
36+
using (DbCommand command = connection.CreateCommand())
37+
{
38+
// Query continuous aggregate policies from TimescaleDB jobs table
39+
// The config column contains JSONB with start_offset, end_offset, and other policy parameters
40+
command.CommandText = @"
41+
SELECT
42+
ca.view_schema,
43+
ca.view_name,
44+
j.config,
45+
j.schedule_interval::text,
46+
j.initial_start,
47+
j.timezone
48+
FROM timescaledb_information.jobs j
49+
INNER JOIN timescaledb_information.continuous_aggregates ca
50+
ON j.hypertable_schema = ca.materialization_hypertable_schema
51+
AND j.hypertable_name = ca.materialization_hypertable_name
52+
WHERE j.proc_name = 'policy_refresh_continuous_aggregate';";
53+
54+
using DbDataReader reader = command.ExecuteReader();
55+
while (reader.Read())
56+
{
57+
string viewSchema = reader.GetString(0);
58+
string viewName = reader.GetString(1);
59+
string? configJson = reader.IsDBNull(2) ? null : reader.GetString(2);
60+
string? scheduleInterval = reader.IsDBNull(3) ? null : reader.GetString(3);
61+
DateTime? initialStart = reader.IsDBNull(4) ? null : reader.GetDateTime(4);
62+
string? timezone = reader.IsDBNull(5) ? null : reader.GetString(5);
63+
64+
// Parse the JSONB config to extract policy parameters
65+
string? startOffset = null;
66+
string? endOffset = null;
67+
bool? includeTieredData = null;
68+
int? bucketsPerBatch = null;
69+
int? maxBatchesPerExecution = null;
70+
bool? refreshNewestFirst = null;
71+
72+
if (!string.IsNullOrWhiteSpace(configJson))
73+
{
74+
using JsonDocument doc = JsonDocument.Parse(configJson);
75+
JsonElement root = doc.RootElement;
76+
77+
// Extract start_offset
78+
if (root.TryGetProperty("start_offset", out JsonElement startOffsetElement))
79+
{
80+
startOffset = ParseIntervalOrInteger(startOffsetElement);
81+
}
82+
83+
// Extract end_offset
84+
if (root.TryGetProperty("end_offset", out JsonElement endOffsetElement))
85+
{
86+
endOffset = ParseIntervalOrInteger(endOffsetElement);
87+
}
88+
89+
// Extract include_tiered_data (optional)
90+
if (root.TryGetProperty("include_tiered_data", out JsonElement includeTieredDataElement)
91+
&& (includeTieredDataElement.ValueKind == JsonValueKind.True || includeTieredDataElement.ValueKind == JsonValueKind.False))
92+
{
93+
includeTieredData = includeTieredDataElement.GetBoolean();
94+
}
95+
96+
// Extract buckets_per_batch (optional, defaults to 1)
97+
if (root.TryGetProperty("buckets_per_batch", out JsonElement bucketsPerBatchElement)
98+
&& bucketsPerBatchElement.ValueKind == JsonValueKind.Number)
99+
{
100+
bucketsPerBatch = bucketsPerBatchElement.GetInt32();
101+
}
102+
103+
// Extract max_batches_per_execution (optional, defaults to 0)
104+
if (root.TryGetProperty("max_batches_per_execution", out JsonElement maxBatchesElement)
105+
&& maxBatchesElement.ValueKind == JsonValueKind.Number)
106+
{
107+
maxBatchesPerExecution = maxBatchesElement.GetInt32();
108+
}
109+
110+
// Extract refresh_newest_first (optional, defaults to true)
111+
if (root.TryGetProperty("refresh_newest_first", out JsonElement refreshNewestFirstElement)
112+
&& (refreshNewestFirstElement.ValueKind == JsonValueKind.True || refreshNewestFirstElement.ValueKind == JsonValueKind.False))
113+
{
114+
refreshNewestFirst = refreshNewestFirstElement.GetBoolean();
115+
}
116+
}
117+
118+
policies[(viewSchema, viewName)] = new ContinuousAggregatePolicyInfo(
119+
StartOffset: startOffset,
120+
EndOffset: endOffset,
121+
ScheduleInterval: scheduleInterval,
122+
Timezone: timezone,
123+
InitialStart: initialStart,
124+
IncludeTieredData: includeTieredData,
125+
BucketsPerBatch: bucketsPerBatch,
126+
MaxBatchesPerExecution: maxBatchesPerExecution,
127+
RefreshNewestFirst: refreshNewestFirst
128+
);
129+
}
130+
}
131+
132+
// Convert to object dictionary to match interface
133+
return policies.ToDictionary(
134+
kvp => kvp.Key,
135+
kvp => (object)kvp.Value
136+
);
137+
}
138+
finally
139+
{
140+
if (!wasOpen)
141+
{
142+
connection.Close();
143+
}
144+
}
145+
}
146+
147+
/// <summary>
148+
/// Parses an interval or integer value from JSONB.
149+
/// TimescaleDB stores intervals as strings (e.g., "1 mon", "7 days")
150+
/// or integers for integer-based time columns.
151+
/// </summary>
152+
private static string? ParseIntervalOrInteger(JsonElement element)
153+
{
154+
if (element.ValueKind == JsonValueKind.Null)
155+
{
156+
return null;
157+
}
158+
159+
if (element.ValueKind == JsonValueKind.String)
160+
{
161+
string value = element.GetString() ?? string.Empty;
162+
// TimescaleDB stores intervals in PostgreSQL format (e.g., "1 mon", "7 days", "01:00:00")
163+
// We need to normalize these to a format that matches what users would write
164+
return NormalizeInterval(value);
165+
}
166+
167+
if (element.ValueKind == JsonValueKind.Number)
168+
{
169+
// Integer-based time column
170+
return element.GetInt64().ToString();
171+
}
172+
173+
return null;
174+
}
175+
176+
/// <summary>
177+
/// Normalizes PostgreSQL interval format to user-friendly format.
178+
/// </summary>
179+
/// <remarks>
180+
/// PostgreSQL stores intervals in formats like:
181+
/// - "1 mon" for 1 month
182+
/// - "7 days" for 7 days
183+
/// - "01:00:00" for 1 hour
184+
/// We normalize these to match the format users would use in Fluent API:
185+
/// - "1 month"
186+
/// - "7 days"
187+
/// - "1 hour"
188+
/// </remarks>
189+
private static string NormalizeInterval(string pgInterval)
190+
{
191+
if (string.IsNullOrWhiteSpace(pgInterval))
192+
{
193+
return pgInterval;
194+
}
195+
196+
string normalized = pgInterval.Trim();
197+
198+
// Replace "mon" with "month"
199+
normalized = normalized.Replace(" mon", " month");
200+
201+
// Convert time-only intervals (HH:MM:SS) to hour/minute format
202+
if (TimeSpan.TryParse(normalized, out TimeSpan timeSpan))
203+
{
204+
if (timeSpan.TotalMinutes < 60 && timeSpan.Minutes > 0 && timeSpan.Hours == 0)
205+
{
206+
return $"{timeSpan.Minutes} minute{(timeSpan.Minutes > 1 ? "s" : "")}";
207+
}
208+
if (timeSpan.TotalHours < 24 && timeSpan.Hours > 0)
209+
{
210+
return $"{timeSpan.Hours} hour{(timeSpan.Hours > 1 ? "s" : "")}";
211+
}
212+
// For days, use the total days
213+
if (timeSpan.Days > 0)
214+
{
215+
return $"{timeSpan.Days} day{(timeSpan.Days > 1 ? "s" : "")}";
216+
}
217+
}
218+
219+
return normalized;
220+
}
221+
}
222+
}

src/Eftdb.Design/TimescaleCSharpMigrationOperationGenerator.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ protected override void Generate(MigrationOperation operation, IndentedStringBui
1616
HypertableOperationGenerator? hypertableOperationGenerator = null;
1717
ReorderPolicyOperationGenerator? reorderPolicyOperationGenerator = null;
1818
ContinuousAggregateOperationGenerator? continuousAggregateOperationGenerator = null;
19+
ContinuousAggregatePolicyOperationGenerator? continuousAggregatePolicyOperationGenerator = null;
1920

2021
List<string> statements;
2122
bool suppressTransaction = false;
@@ -58,6 +59,16 @@ protected override void Generate(MigrationOperation operation, IndentedStringBui
5859
statements = continuousAggregateOperationGenerator.Generate(dropContinuousAggregate);
5960
break;
6061

62+
case AddContinuousAggregatePolicyOperation addContinuousAggregatePolicy:
63+
continuousAggregatePolicyOperationGenerator ??= new(isDesignTime: true);
64+
statements = continuousAggregatePolicyOperationGenerator.Generate(addContinuousAggregatePolicy);
65+
break;
66+
67+
case RemoveContinuousAggregatePolicyOperation removeContinuousAggregatePolicy:
68+
continuousAggregatePolicyOperationGenerator ??= new(isDesignTime: true);
69+
statements = continuousAggregatePolicyOperationGenerator.Generate(removeContinuousAggregatePolicy);
70+
break;
71+
6172
default:
6273
base.Generate(operation, builder);
6374
return;

src/Eftdb.Design/TimescaleDatabaseModelFactory.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ public class TimescaleDatabaseModelFactory(IDiagnosticsLogger<DbLoggerCategory.S
2020
[
2121
(new HypertableScaffoldingExtractor(), new HypertableAnnotationApplier()),
2222
(new ReorderPolicyScaffoldingExtractor(), new ReorderPolicyAnnotationApplier()),
23-
(new ContinuousAggregateScaffoldingExtractor(), new ContinuousAggregateAnnotationApplier())
23+
(new ContinuousAggregateScaffoldingExtractor(), new ContinuousAggregateAnnotationApplier()),
24+
(new ContinuousAggregatePolicyScaffoldingExtractor(), new ContinuousAggregatePolicyAnnotationApplier())
2425
];
2526

2627
public override DatabaseModel Create(DbConnection connection, DatabaseModelFactoryOptions options)

0 commit comments

Comments
 (0)