Skip to content

Commit a809b81

Browse files
Merge pull request #34 from cmdscale/feature/continuous_aggregate_policies
Adds support for continous aggregate policies
2 parents 679355c + a019ee6 commit a809b81

34 files changed

Lines changed: 7843 additions & 161 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,7 @@ samples/Eftdb.Samples.DatabaseFirst/**/*.cs
436436
# AI
437437
CLAUDE.md
438438
.claude
439+
tmpclaude-*
439440

440441
# Code coverage report
441442
coverage
Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,30 @@
1-
# EF Core Database-First Example with TimescaleDB
1+
# EF Core Database-First Example with TimescaleDB
22

33
This project demonstrates how to use the **Database-First** approach with [TimescaleDB](https://www.timescale.com/) using the `CmdScale.EntityFrameworkCore.TimescaleDB` package.
44

55
---
66

7-
## 📦 Required NuGet Packages
7+
## Required NuGet Packages
88

99
Ensure the following package is installed in your project:
1010

1111
- `CmdScale.EntityFrameworkCore.TimescaleDB.Design`
1212

1313
---
1414

15-
## 🛠️ Scaffold DbContext and Models
15+
## Scaffold DbContext and Models
1616

1717
Use the following command to scaffold the `DbContext` and entity classes from an existing TimescaleDB database:
1818

1919
```bash
20-
dotnet ef dbcontext scaffold
21-
"Host=localhost;Database=cmdscale-ef-timescaledb;Username=timescale_admin;Password=R#!kro#GP43ra8Ae"
22-
CmdScale.EntityFrameworkCore.TimescaleDB.Design
23-
--output-dir Models
24-
--schema public
25-
--context-dir .
26-
--context MyTimescaleDbContext
27-
--project CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess.DbFirst
20+
dotnet ef dbcontext scaffold \
21+
"Host=localhost;Database=cmdscale-ef-timescaledb;Username=timescale_admin;Password=R#!kro#GP43ra8Ae" \
22+
CmdScale.EntityFrameworkCore.TimescaleDB.Design \
23+
--output-dir Models \
24+
--schema public \
25+
--context-dir . \
26+
--context MyTimescaleDbContext \
27+
--project samples/Eftdb.Samples.DatabaseFirst
2828
```
2929

3030
This command will:
@@ -37,20 +37,20 @@ This command will:
3737
3838
---
3939

40-
## 📁 Project Structure
40+
## Project Structure
4141

4242
```text
43-
CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess.DbFirst/
44-
45-
├── Models/ # Auto-generated entity models
46-
└── MyTimescaleDbContext.cs # Auto-generated DbContext
43+
samples/Eftdb.Samples.DatabaseFirst/
44+
|
45+
+-- Models/ # Auto-generated entity models
46+
+-- MyTimescaleDbContext.cs # Auto-generated DbContext
4747
```
4848

4949
---
5050

51-
## 🐳 Docker
51+
## Docker
5252

53-
- A `docker-compose.yml` file is available in the **Solution Items** to spin up a TimescaleDB container for local development:
53+
- A `docker-compose.yml` file is available at the repository root to spin up a TimescaleDB container for local development:
5454

5555
```bash
5656
docker-compose up -d
@@ -60,7 +60,7 @@ CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess.DbFirst/
6060

6161
---
6262

63-
## 📚 Resources
63+
## Resources
6464

6565
- [Entity Framework Core Documentation](https://learn.microsoft.com/en-us/ef/core/)
6666
- [TimescaleDB Documentation](https://docs.timescale.com/)

samples/Eftdb.Samples.Shared/Configurations/TradeAggregateConfiguration.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using CmdScale.EntityFrameworkCore.TimescaleDB.Abstractions;
22
using CmdScale.EntityFrameworkCore.TimescaleDB.Configuration.ContinuousAggregate;
3+
using CmdScale.EntityFrameworkCore.TimescaleDB.Configuration.ContinuousAggregatePolicy;
34
using CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess.Models;
45
using Microsoft.EntityFrameworkCore;
56
using Microsoft.EntityFrameworkCore.Metadata.Builders;
@@ -18,7 +19,9 @@ public void Configure(EntityTypeBuilder<TradeAggregate> builder)
1819
.AddGroupByColumn(x => x.Exchange)
1920
.AddGroupByColumn("1, 2")
2021
.Where("\"ticker\" = 'MCRS'")
21-
.MaterializedOnly();
22+
.MaterializedOnly()
23+
.WithRefreshPolicy(startOffset: "7 days", endOffset: "1 hour", scheduleInterval: "1 hour")
24+
.WithRefreshNewestFirst(true);
2225
}
2326
}
2427
}

samples/Eftdb.Samples.Shared/Models/WeatherAggregate.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using CmdScale.EntityFrameworkCore.TimescaleDB.Abstractions;
22
using CmdScale.EntityFrameworkCore.TimescaleDB.Configuration.ContinuousAggregate;
3+
using CmdScale.EntityFrameworkCore.TimescaleDB.Configuration.ContinuousAggregatePolicy;
34
using Microsoft.EntityFrameworkCore;
45

56
namespace CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess.Models
@@ -18,6 +19,11 @@ namespace CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess.Models
1819
MaterializedOnly = false,
1920
Where = "\"temperature\" > -50 AND \"humidity\" >= 0")]
2021
[TimeBucket("1 day", nameof(WeatherData.Time), GroupBy = true)]
22+
[ContinuousAggregatePolicy(
23+
StartOffset = "30 days",
24+
EndOffset = "1 day",
25+
ScheduleInterval = "1 hour",
26+
RefreshNewestFirst = true)]
2127
public class WeatherAggregate
2228
{
2329
// Avg aggregate function
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
using CmdScale.EntityFrameworkCore.TimescaleDB.Configuration.ContinuousAggregatePolicy;
2+
using Microsoft.EntityFrameworkCore.Scaffolding.Metadata;
3+
using static CmdScale.EntityFrameworkCore.TimescaleDB.Design.Scaffolding.ContinuousAggregatePolicyScaffoldingExtractor;
4+
5+
namespace CmdScale.EntityFrameworkCore.TimescaleDB.Design.Scaffolding
6+
{
7+
/// <summary>
8+
/// Applies continuous aggregate policy annotations to scaffolded database views.
9+
/// </summary>
10+
public sealed class ContinuousAggregatePolicyAnnotationApplier : IAnnotationApplier
11+
{
12+
public void ApplyAnnotations(DatabaseTable table, object featureInfo)
13+
{
14+
if (featureInfo is not ContinuousAggregatePolicyInfo info)
15+
{
16+
throw new ArgumentException($"Expected {nameof(ContinuousAggregatePolicyInfo)}, got {featureInfo.GetType().Name}", nameof(featureInfo));
17+
}
18+
19+
// Mark that this continuous aggregate has a refresh policy
20+
table[ContinuousAggregatePolicyAnnotations.HasRefreshPolicy] = true;
21+
22+
// Apply start_offset and end_offset
23+
if (!string.IsNullOrWhiteSpace(info.StartOffset))
24+
{
25+
table[ContinuousAggregatePolicyAnnotations.StartOffset] = info.StartOffset;
26+
}
27+
28+
if (!string.IsNullOrWhiteSpace(info.EndOffset))
29+
{
30+
table[ContinuousAggregatePolicyAnnotations.EndOffset] = info.EndOffset;
31+
}
32+
33+
// Apply schedule_interval
34+
if (!string.IsNullOrWhiteSpace(info.ScheduleInterval))
35+
{
36+
table[ContinuousAggregatePolicyAnnotations.ScheduleInterval] = info.ScheduleInterval;
37+
}
38+
39+
// Apply initial_start
40+
if (info.InitialStart.HasValue)
41+
{
42+
table[ContinuousAggregatePolicyAnnotations.InitialStart] = info.InitialStart.Value;
43+
}
44+
45+
// Apply include_tiered_data (only if not null - it's an optional parameter)
46+
if (info.IncludeTieredData.HasValue)
47+
{
48+
table[ContinuousAggregatePolicyAnnotations.IncludeTieredData] = info.IncludeTieredData.Value;
49+
}
50+
51+
// Apply buckets_per_batch (only if different from default value of 1)
52+
if (info.BucketsPerBatch.HasValue && info.BucketsPerBatch.Value != 1)
53+
{
54+
table[ContinuousAggregatePolicyAnnotations.BucketsPerBatch] = info.BucketsPerBatch.Value;
55+
}
56+
57+
// Apply max_batches_per_execution (only if different from default value of 0)
58+
if (info.MaxBatchesPerExecution.HasValue && info.MaxBatchesPerExecution.Value != 0)
59+
{
60+
table[ContinuousAggregatePolicyAnnotations.MaxBatchesPerExecution] = info.MaxBatchesPerExecution.Value;
61+
}
62+
63+
// Apply refresh_newest_first (only if different from default value of true)
64+
if (info.RefreshNewestFirst.HasValue && !info.RefreshNewestFirst.Value)
65+
{
66+
table[ContinuousAggregatePolicyAnnotations.RefreshNewestFirst] = info.RefreshNewestFirst.Value;
67+
}
68+
}
69+
}
70+
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
using System.Data;
2+
using System.Data.Common;
3+
using System.Text.Json;
4+
5+
namespace CmdScale.EntityFrameworkCore.TimescaleDB.Design.Scaffolding
6+
{
7+
/// <summary>
8+
/// Extracts continuous aggregate policy metadata from a TimescaleDB database for scaffolding.
9+
/// </summary>
10+
public sealed class ContinuousAggregatePolicyScaffoldingExtractor : ITimescaleFeatureExtractor
11+
{
12+
public sealed record ContinuousAggregatePolicyInfo(
13+
string? StartOffset,
14+
string? EndOffset,
15+
string? ScheduleInterval,
16+
DateTime? InitialStart,
17+
bool? IncludeTieredData,
18+
int? BucketsPerBatch,
19+
int? MaxBatchesPerExecution,
20+
bool? RefreshNewestFirst
21+
);
22+
23+
public Dictionary<(string Schema, string TableName), object> Extract(DbConnection connection)
24+
{
25+
bool wasOpen = connection.State == ConnectionState.Open;
26+
if (!wasOpen)
27+
{
28+
connection.Open();
29+
}
30+
31+
try
32+
{
33+
Dictionary<(string, string), ContinuousAggregatePolicyInfo> policies = [];
34+
35+
using (DbCommand command = connection.CreateCommand())
36+
{
37+
// Query continuous aggregate policies from TimescaleDB jobs table
38+
command.CommandText = @"
39+
SELECT
40+
ca.user_view_schema,
41+
ca.user_view_name,
42+
j.config,
43+
j.schedule_interval::text,
44+
j.initial_start
45+
FROM timescaledb_information.jobs j
46+
INNER JOIN _timescaledb_catalog.continuous_agg ca
47+
ON (j.config->>'mat_hypertable_id')::integer = ca.mat_hypertable_id
48+
WHERE j.proc_name = 'policy_refresh_continuous_aggregate';";
49+
50+
using DbDataReader reader = command.ExecuteReader();
51+
while (reader.Read())
52+
{
53+
string viewSchema = reader.GetString(0);
54+
string viewName = reader.GetString(1);
55+
string? configJson = reader.IsDBNull(2) ? null : reader.GetString(2);
56+
string? scheduleInterval = reader.IsDBNull(3) ? null : reader.GetString(3);
57+
DateTime? initialStart = reader.IsDBNull(4) ? null : reader.GetDateTime(4);
58+
59+
// Parse the JSONB config to extract policy parameters
60+
string? startOffset = null;
61+
string? endOffset = null;
62+
bool? includeTieredData = null;
63+
int? bucketsPerBatch = null;
64+
int? maxBatchesPerExecution = null;
65+
bool? refreshNewestFirst = null;
66+
67+
if (!string.IsNullOrWhiteSpace(configJson))
68+
{
69+
using JsonDocument doc = JsonDocument.Parse(configJson);
70+
JsonElement root = doc.RootElement;
71+
72+
// Extract start_offset
73+
if (root.TryGetProperty("start_offset", out JsonElement startOffsetElement))
74+
{
75+
startOffset = IntervalParsingHelper.ParseIntervalOrInteger(startOffsetElement);
76+
}
77+
78+
// Extract end_offset
79+
if (root.TryGetProperty("end_offset", out JsonElement endOffsetElement))
80+
{
81+
endOffset = IntervalParsingHelper.ParseIntervalOrInteger(endOffsetElement);
82+
}
83+
84+
// Extract include_tiered_data (optional)
85+
if (root.TryGetProperty("include_tiered_data", out JsonElement includeTieredDataElement)
86+
&& (includeTieredDataElement.ValueKind == JsonValueKind.True || includeTieredDataElement.ValueKind == JsonValueKind.False))
87+
{
88+
includeTieredData = includeTieredDataElement.GetBoolean();
89+
}
90+
91+
// Extract buckets_per_batch (optional, defaults to 1)
92+
if (root.TryGetProperty("buckets_per_batch", out JsonElement bucketsPerBatchElement)
93+
&& bucketsPerBatchElement.ValueKind == JsonValueKind.Number)
94+
{
95+
bucketsPerBatch = bucketsPerBatchElement.GetInt32();
96+
}
97+
98+
// Extract max_batches_per_execution (optional, defaults to 0)
99+
if (root.TryGetProperty("max_batches_per_execution", out JsonElement maxBatchesElement)
100+
&& maxBatchesElement.ValueKind == JsonValueKind.Number)
101+
{
102+
maxBatchesPerExecution = maxBatchesElement.GetInt32();
103+
}
104+
105+
// Extract refresh_newest_first (optional, defaults to true)
106+
if (root.TryGetProperty("refresh_newest_first", out JsonElement refreshNewestFirstElement)
107+
&& (refreshNewestFirstElement.ValueKind == JsonValueKind.True || refreshNewestFirstElement.ValueKind == JsonValueKind.False))
108+
{
109+
refreshNewestFirst = refreshNewestFirstElement.GetBoolean();
110+
}
111+
}
112+
113+
policies[(viewSchema, viewName)] = new ContinuousAggregatePolicyInfo(
114+
StartOffset: startOffset,
115+
EndOffset: endOffset,
116+
ScheduleInterval: scheduleInterval,
117+
InitialStart: initialStart,
118+
IncludeTieredData: includeTieredData,
119+
BucketsPerBatch: bucketsPerBatch,
120+
MaxBatchesPerExecution: maxBatchesPerExecution,
121+
RefreshNewestFirst: refreshNewestFirst
122+
);
123+
}
124+
}
125+
126+
// Convert to object dictionary to match interface
127+
return policies.ToDictionary(
128+
kvp => kvp.Key,
129+
kvp => (object)kvp.Value
130+
);
131+
}
132+
finally
133+
{
134+
if (!wasOpen)
135+
{
136+
connection.Close();
137+
}
138+
}
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)