Skip to content

Commit 0e805f8

Browse files
feat: implement support for code-first continuous aggregates
1 parent 6ae595e commit 0e805f8

24 files changed

Lines changed: 1043 additions & 72 deletions

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,4 +431,7 @@ FodyWeavers.xsd
431431
CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess/Migrations/
432432

433433
# Ignore all scaffolded models and the DbContext from the DbFirst project
434-
CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess.DbFirst/
434+
CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess.DbFirst/
435+
436+
# AI
437+
CLAUDE.md

CmdScale.EntityFrameworkCore.TimescaleDB.Design/TimescaleCSharpMigrationOperationGenerator.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@ protected override void Generate(MigrationOperation operation, IndentedStringBui
1515

1616
HypertableOperationGenerator? hypertableOperationGenerator = null;
1717
ReorderPolicyOperationGenerator? reorderPolicyOperationGenerator = null;
18+
ContinuousAggregateOperationGenerator? continuousAggregateOperationGenerator = null;
19+
1820
List<string> statements = [];
21+
bool suppressTransaction = false;
1922

2023
switch (operation)
2124
{
@@ -41,12 +44,26 @@ protected override void Generate(MigrationOperation operation, IndentedStringBui
4144
statements = reorderPolicyOperationGenerator.Generate(dropReorder);
4245
break;
4346

47+
case CreateContinuousAggregateOperation createContinuousAggregate:
48+
continuousAggregateOperationGenerator ??= new(isDesignTime: true);
49+
statements = continuousAggregateOperationGenerator.Generate(createContinuousAggregate);
50+
suppressTransaction = true;
51+
break;
52+
case AlterContinuousAggregateOperation alterContinuousAggregate:
53+
continuousAggregateOperationGenerator ??= new(isDesignTime: true);
54+
statements = continuousAggregateOperationGenerator.Generate(alterContinuousAggregate);
55+
break;
56+
case DropContinuousAggregateOperation dropContinuousAggregate:
57+
continuousAggregateOperationGenerator ??= new(isDesignTime: true);
58+
statements = continuousAggregateOperationGenerator.Generate(dropContinuousAggregate);
59+
break;
60+
4461
default:
4562
base.Generate(operation, builder);
4663
break;
4764
}
4865

49-
SqlBuilderHelper.BuildQueryString(statements, builder);
66+
SqlBuilderHelper.BuildQueryString(statements, builder, suppressTransaction);
5067
}
5168

5269
}

CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess/Configurations/TradeAggregateConfiguration.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@ public class TradeAggregateConfiguration : IEntityTypeConfiguration<TradeAggrega
1111
public void Configure(EntityTypeBuilder<TradeAggregate> builder)
1212
{
1313
builder.HasNoKey();
14-
builder.IsContinuousAggregate<TradeAggregate, Trade>("trade_aggregate_view", "1 hour", x => x.Timestamp)
14+
builder.IsContinuousAggregate<TradeAggregate, Trade>("trade_aggregate_view", "1 hour", x => x.Timestamp, true, "7 days")
1515
.AddAggregateFunction(x => x.AveragePrice, x => x.Price, EAggregateFunction.Avg)
1616
.AddAggregateFunction(x => x.MinPrice, x => x.Price, EAggregateFunction.Max)
1717
.AddAggregateFunction(x => x.MaxPrice, x => x.Price, EAggregateFunction.Min)
1818
.AddGroupByColumn(x => x.Exchange)
19-
.AddGroupByColumn("1, 1")
20-
.SetWhereClause(x => x.Ticker == "MCRS");
19+
.AddGroupByColumn("1, 2")
20+
.Where("\"ticker\" = 'MCRS'")
21+
.MaterializedOnly();
2122
}
2223
}
2324
}

CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess/Configurations/TradeConfiguration.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public void Configure(EntityTypeBuilder<Trade> builder)
1414
builder.HasNoKey()
1515
.IsHypertable(x => x.Timestamp)
1616
.WithChunkTimeInterval("1 day");
17+
builder.HasIndex(x => x.Timestamp).HasDatabaseName("Trades_Timestamp_idx");
1718
builder.WithReorderPolicy("Trades_Timestamp_idx", DateTime.Parse("2025-09-23T09:15:19.3905112Z"), "2 days", "10 minutes", -1, "1 minute");
1819
}
1920
}

CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess/Models/DeviceReading.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
namespace CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess.Models
66
{
77
[Hypertable(nameof(Time), ChunkSkipColumns = new[] { "Time" }, ChunkTimeInterval = "1 day", EnableCompression = true)]
8-
[ReorderPolicy("DeviceReadings_Time_idx", InitialStart = "2025-09-23T09:15:19.3905112Z", ScheduleInterval = "1 day", MaxRuntime = "00:00:00", RetryPeriod = "00:05:00", MaxRetries = 3)]
8+
[Index(nameof(Time), Name = "ix_device_readings_time")]
99
[PrimaryKey(nameof(Id), nameof(Time))]
10+
[ReorderPolicy("ix_device_readings_time", InitialStart = "2025-09-23T09:15:19.3905112Z", ScheduleInterval = "1 day", MaxRuntime = "00:00:00", RetryPeriod = "00:05:00", MaxRetries = 3)]
1011
public class DeviceReading
1112
{
1213
public Guid Id { get; set; }
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
using CmdScale.EntityFrameworkCore.TimescaleDB.Abstractions;
2+
using CmdScale.EntityFrameworkCore.TimescaleDB.Configuration.ContinuousAggregate;
3+
using Microsoft.EntityFrameworkCore;
4+
5+
namespace CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess.Models
6+
{
7+
/// <summary>
8+
/// Example continuous aggregate showcasing all possible configuration properties and aggregate functions.
9+
/// This aggregates weather data into daily buckets with various statistical measures.
10+
/// </summary>
11+
[Keyless]
12+
[ContinuousAggregate(
13+
MaterializedViewName = "weather_aggregates",
14+
ParentName = nameof(WeatherData),
15+
ChunkInterval = "1 month",
16+
WithNoData = true,
17+
CreateGroupIndexes = true,
18+
MaterializedOnly = false,
19+
Where = "\"temperature\" > -50 AND \"humidity\" >= 0")]
20+
[TimeBucket("1 day", nameof(WeatherData.Time), GroupBy = true)]
21+
public class WeatherAggregate
22+
{
23+
// Avg aggregate function
24+
[Aggregate(EAggregateFunction.Avg, nameof(WeatherData.Temperature))]
25+
public double AverageTemperature { get; set; }
26+
27+
// Max aggregate function
28+
[Aggregate(EAggregateFunction.Max, nameof(WeatherData.Humidity))]
29+
public double MaxHumidity { get; set; }
30+
31+
// Min aggregate function
32+
[Aggregate(EAggregateFunction.Min, nameof(WeatherData.Humidity))]
33+
public double MinHumidity { get; set; }
34+
35+
// Sum aggregate function
36+
[Aggregate(EAggregateFunction.Sum, nameof(WeatherData.Temperature))]
37+
public double TotalTemperature { get; set; }
38+
39+
// Count aggregate function (using "*" for count all records)
40+
[Aggregate(EAggregateFunction.Cout, "*")]
41+
public int RecordCount { get; set; }
42+
43+
// First aggregate function (gets first temperature value in time bucket)
44+
[Aggregate(EAggregateFunction.First, nameof(WeatherData.Temperature))]
45+
public double FirstTemperature { get; set; }
46+
47+
// Last aggregate function (gets last temperature value in time bucket)
48+
[Aggregate(EAggregateFunction.Last, nameof(WeatherData.Temperature))]
49+
public double LastTemperature { get; set; }
50+
}
51+
}

CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess/Models/WeatherAggregates.cs

Lines changed: 0 additions & 25 deletions
This file was deleted.

CmdScale.EntityFrameworkCore.TimescaleDB.Example.DataAccess/TimescaleContext.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ public class TimescaleContext(DbContextOptions<TimescaleContext> options) : DbCo
1111
public DbSet<OrderStatusEvent> OrderStatusEvents { get; set; }
1212
public DbSet<Trade> Trades { get; set; }
1313
public DbSet<TradeWithId> TradesWithId { get; set; }
14+
public DbSet<TradeAggregate> TradeAggregates { get; set; }
15+
public DbSet<WeatherAggregate> WeatherAggregates { get; set; }
1416

1517
protected override void OnModelCreating(ModelBuilder modelBuilder)
1618
{

CmdScale.EntityFrameworkCore.TimescaleDB.Tests/CmdScale.EntityFrameworkCore.TimescaleDB.Tests.csproj

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,13 @@
1414
<PrivateAssets>all</PrivateAssets>
1515
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
1616
</PackageReference>
17+
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.0">
18+
<PrivateAssets>all</PrivateAssets>
19+
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
20+
</PackageReference>
1721
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="18.0.0" />
1822
<PackageReference Include="Moq" Version="4.20.72" />
23+
<PackageReference Include="Testcontainers.PostgreSql" Version="4.8.1" />
1924
<PackageReference Include="xunit" Version="2.9.3" />
2025
<PackageReference Include="xunit.runner.visualstudio" Version="3.1.5">
2126
<PrivateAssets>all</PrivateAssets>
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
using CmdScale.EntityFrameworkCore.TimescaleDB.Abstractions;
2+
using CmdScale.EntityFrameworkCore.TimescaleDB.Configuration.ContinuousAggregate;
3+
using CmdScale.EntityFrameworkCore.TimescaleDB.Configuration.Hypertable;
4+
using Microsoft.EntityFrameworkCore;
5+
using Testcontainers.PostgreSql;
6+
7+
namespace CmdScale.EntityFrameworkCore.TimescaleDB.Tests.Integration
8+
{
9+
public class ContinuousAggregateIntegrationTests : IAsyncLifetime
10+
{
11+
private PostgreSqlContainer? _container;
12+
private string? _connectionString;
13+
14+
public async Task InitializeAsync()
15+
{
16+
// Start TimescaleDB container
17+
_container = new PostgreSqlBuilder()
18+
.WithImage("timescale/timescaledb:latest-pg16")
19+
.WithDatabase("test_db")
20+
.WithUsername("test_user")
21+
.WithPassword("test_password")
22+
.Build();
23+
24+
await _container.StartAsync();
25+
_connectionString = _container.GetConnectionString();
26+
27+
await using var context = new TestDbContext(_connectionString);
28+
}
29+
30+
public async Task DisposeAsync()
31+
{
32+
if (_container != null)
33+
{
34+
await _container.DisposeAsync();
35+
}
36+
}
37+
38+
[Fact]
39+
public async Task ContinuousAggregate_ShouldAggregateTradeData_Successfully()
40+
{
41+
// Arrange
42+
await using var context = new TestDbContext(_connectionString!);
43+
44+
// Create the database schema (tables, hypertables, and continuous aggregates)
45+
await context.Database.ExecuteSqlRawAsync(@"
46+
CREATE TABLE IF NOT EXISTS ""Trades"" (
47+
""Timestamp"" timestamp with time zone NOT NULL,
48+
""Ticker"" text NOT NULL,
49+
""Price"" numeric NOT NULL,
50+
""Size"" integer NOT NULL,
51+
""Exchange"" text NOT NULL
52+
);
53+
");
54+
55+
// Create hypertable
56+
await context.Database.ExecuteSqlRawAsync(@"
57+
SELECT create_hypertable('""Trades""', 'Timestamp', if_not_exists => TRUE);
58+
SELECT set_chunk_time_interval('public.""Trades""', INTERVAL '1 day');
59+
");
60+
61+
// Create continuous aggregate (must be separate due to transaction requirements)
62+
await context.Database.ExecuteSqlRawAsync(@"
63+
CREATE MATERIALIZED VIEW IF NOT EXISTS trade_aggregate_view
64+
WITH (timescaledb.continuous, timescaledb.create_group_indexes = false, timescaledb.materialized_only = false) AS
65+
SELECT time_bucket('1 hour', ""Timestamp"") AS time_bucket, ""Exchange"", AVG(""Price"") AS ""AveragePrice"", MAX(""Price"") AS ""MaxPrice"", MIN(""Price"") AS ""MinPrice""
66+
FROM ""Trades""
67+
WHERE ""Ticker"" = 'MCRS'
68+
GROUP BY time_bucket, ""Exchange"";
69+
");
70+
71+
// Insert test data using raw SQL (since TestTrade is keyless and can't be tracked)
72+
await context.Database.ExecuteSqlRawAsync(@"
73+
INSERT INTO ""Trades"" (""Timestamp"", ""Ticker"", ""Price"", ""Size"", ""Exchange"")
74+
VALUES
75+
('2025-01-06 10:15:00+00', 'MCRS', 150.50, 100, 'NYSE'),
76+
('2025-01-06 10:30:00+00', 'MCRS', 151.00, 200, 'NYSE'),
77+
('2025-01-06 10:45:00+00', 'MCRS', 149.75, 150, 'NYSE'),
78+
('2025-01-06 10:20:00+00', 'MCRS', 150.00, 180, 'NASDAQ'),
79+
('2025-01-06 10:50:00+00', 'MCRS', 151.50, 220, 'NASDAQ'),
80+
('2025-01-06 11:10:00+00', 'MCRS', 152.00, 300, 'NYSE'),
81+
('2025-01-06 11:25:00+00', 'MCRS', 153.25, 250, 'NYSE'),
82+
('2025-01-06 11:40:00+00', 'MCRS', 151.75, 180, 'NYSE'),
83+
('2025-01-06 10:15:00+00', 'AAPL', 180.00, 100, 'NYSE'),
84+
('2025-01-06 10:30:00+00', 'TSLA', 250.00, 50, 'NASDAQ');
85+
");
86+
87+
// Act - Manually refresh the continuous aggregate
88+
await context.Database.ExecuteSqlRawAsync(
89+
"CALL refresh_continuous_aggregate('public.trade_aggregate_view', NULL, NULL);");
90+
91+
// Query the continuous aggregate
92+
var aggregates = await context.TradeAggregates
93+
.OrderBy(a => a.TimeBucket)
94+
.ThenBy(a => a.Exchange)
95+
.ToListAsync();
96+
97+
// Assert
98+
// We expect 3 aggregates: 2 for hour 10:00 (NYSE + NASDAQ) and 1 for hour 11:00 (NYSE only)
99+
Assert.Equal(3, aggregates.Count);
100+
101+
// Verify Hour 1 - NYSE aggregate
102+
var hour1Nyse = aggregates.First(a =>
103+
a.TimeBucket == new DateTime(2025, 1, 6, 10, 0, 0, DateTimeKind.Utc) &&
104+
a.Exchange == "NYSE");
105+
Assert.Equal(150.4166666666666667m, hour1Nyse.AveragePrice); // (150.50 + 151.00 + 149.75) / 3
106+
Assert.Equal(151.00m, hour1Nyse.MaxPrice);
107+
Assert.Equal(149.75m, hour1Nyse.MinPrice);
108+
109+
// Verify Hour 1 - NASDAQ aggregate
110+
var hour1Nasdaq = aggregates.First(a =>
111+
a.TimeBucket == new DateTime(2025, 1, 6, 10, 0, 0, DateTimeKind.Utc) &&
112+
a.Exchange == "NASDAQ");
113+
Assert.Equal(150.75m, hour1Nasdaq.AveragePrice); // (150.00 + 151.50) / 2
114+
Assert.Equal(151.50m, hour1Nasdaq.MaxPrice);
115+
Assert.Equal(150.00m, hour1Nasdaq.MinPrice);
116+
117+
// Verify Hour 2 - NYSE aggregate
118+
var hour2Nyse = aggregates.First(a =>
119+
a.TimeBucket == new DateTime(2025, 1, 6, 11, 0, 0, DateTimeKind.Utc) &&
120+
a.Exchange == "NYSE");
121+
Assert.Equal(152.3333333333333333m, hour2Nyse.AveragePrice); // (152.00 + 153.25 + 151.75) / 3
122+
Assert.Equal(153.25m, hour2Nyse.MaxPrice);
123+
Assert.Equal(151.75m, hour2Nyse.MinPrice);
124+
125+
// Verify that other tickers (AAPL, TSLA) are NOT in the aggregates
126+
Assert.DoesNotContain(aggregates, a => a.Exchange != "NYSE" && a.Exchange != "NASDAQ");
127+
}
128+
129+
#region Test Models and DbContext
130+
131+
private class TestTrade
132+
{
133+
public DateTime Timestamp { get; set; }
134+
public string Ticker { get; set; } = string.Empty;
135+
public decimal Price { get; set; }
136+
public int Size { get; set; }
137+
public string Exchange { get; set; } = string.Empty;
138+
}
139+
140+
private class TestTradeAggregate
141+
{
142+
public DateTime TimeBucket { get; set; }
143+
public string Exchange { get; set; } = string.Empty;
144+
public decimal AveragePrice { get; set; }
145+
public decimal MaxPrice { get; set; }
146+
public decimal MinPrice { get; set; }
147+
}
148+
149+
private class TestDbContext : DbContext
150+
{
151+
private readonly string _connectionString;
152+
153+
public TestDbContext(string connectionString)
154+
{
155+
_connectionString = connectionString;
156+
}
157+
158+
public DbSet<TestTrade> Trades => Set<TestTrade>();
159+
public DbSet<TestTradeAggregate> TradeAggregates => Set<TestTradeAggregate>();
160+
161+
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
162+
{
163+
optionsBuilder.UseNpgsql(_connectionString)
164+
.UseTimescaleDb();
165+
}
166+
167+
protected override void OnModelCreating(ModelBuilder modelBuilder)
168+
{
169+
// Configure Trade as a hypertable
170+
modelBuilder.Entity<TestTrade>(entity =>
171+
{
172+
entity.ToTable("Trades");
173+
entity.HasNoKey();
174+
entity.IsHypertable(x => x.Timestamp)
175+
.WithChunkTimeInterval("1 day");
176+
});
177+
178+
// Configure TradeAggregate as a continuous aggregate
179+
modelBuilder.Entity<TestTradeAggregate>(entity =>
180+
{
181+
entity.HasNoKey();
182+
entity.IsContinuousAggregate<TestTradeAggregate, TestTrade>(
183+
"trade_aggregate_view",
184+
"1 hour",
185+
x => x.Timestamp)
186+
.AddAggregateFunction(x => x.AveragePrice, x => x.Price, EAggregateFunction.Avg)
187+
.AddAggregateFunction(x => x.MaxPrice, x => x.Price, EAggregateFunction.Max)
188+
.AddAggregateFunction(x => x.MinPrice, x => x.Price, EAggregateFunction.Min)
189+
.AddGroupByColumn(x => x.Exchange)
190+
.Where("\"Ticker\" = 'MCRS'");
191+
192+
// Map properties to view columns
193+
entity.Property(x => x.TimeBucket).HasColumnName("time_bucket");
194+
entity.Property(x => x.Exchange).HasColumnName("Exchange");
195+
entity.Property(x => x.AveragePrice).HasColumnName("AveragePrice");
196+
entity.Property(x => x.MaxPrice).HasColumnName("MaxPrice");
197+
entity.Property(x => x.MinPrice).HasColumnName("MinPrice");
198+
});
199+
}
200+
}
201+
202+
#endregion
203+
}
204+
}

0 commit comments

Comments
 (0)