-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTimescaleDbCopyExtensions.cs
More file actions
72 lines (62 loc) · 3.25 KB
/
TimescaleDbCopyExtensions.cs
File metadata and controls
72 lines (62 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
using CmdScale.EntityFrameworkCore.TimescaleDB.Abstractions;
using Npgsql;
namespace CmdScale.EntityFrameworkCore.TimescaleDB
{
public static class TimescaleDbCopyExtensions
{
    /// <summary>
    /// Performs a high-performance bulk copy of a data collection into a PostgreSQL table or TimescaleDB hypertable
    /// using PostgreSQL's binary COPY command with parallel workers.
    /// </summary>
    /// <typeparam name="T">The entity type of the data being inserted.</typeparam>
    /// <param name="data">The collection of data to be inserted. Materialized exactly once, so lazy sources (e.g. EF queries) are safe.</param>
    /// <param name="connectionString">The database connection string. Each worker opens its own connection.</param>
    /// <param name="config">A <see cref="TimescaleCopyConfig{T}"/> object that configures the bulk copy operation, including table name, column mappings, and parallelism. Defaults are used when null.</param>
    /// <param name="cancellationToken">Optional token to cancel the copy operation.</param>
    /// <returns>A <see cref="Task"/> that represents the asynchronous completion of the entire bulk copy operation.</returns>
    public static async Task BulkCopyAsync<T>(
        this IEnumerable<T> data,
        string connectionString,
        TimescaleCopyConfig<T>? config = null,
        CancellationToken cancellationToken = default)
    {
        config ??= new TimescaleCopyConfig<T>();

        // Materialize the source exactly once. The previous implementation called
        // Count() and then Skip()/Take() per worker, re-enumerating a lazy
        // IEnumerable (e.g. an EF Core query) up to NumberOfWorkers + 1 times.
        IReadOnlyList<T> records = data as IReadOnlyList<T> ?? [.. data];
        if (records.Count == 0)
        {
            return; // Nothing to copy; avoid opening connections for no work.
        }

        // Quote and escape identifiers: doubling embedded double-quotes is the
        // standard PostgreSQL escape inside a quoted identifier.
        static string Quote(string identifier) => $"\"{identifier.Replace("\"", "\"\"")}\"";
        string columnList = string.Join(", ", config.ColumnMappings.Keys.Select(Quote));
        string copyCommand = $"COPY {Quote(config.TableName)} ({columnList}) FROM STDIN (FORMAT BINARY)";

        // Clamp to at least one worker so a misconfigured 0 doesn't divide by zero.
        int workerCount = Math.Max(1, config.NumberOfWorkers);
        int workerChunkSize = (int)Math.Ceiling((double)records.Count / workerCount);

        // Each worker copies a contiguous slice; the underlying work is genuinely
        // async I/O, so the tasks are started directly instead of via Task.Run.
        List<Task> tasks = [];
        for (int i = 0; i < workerCount; i++)
        {
            int startIndex = i * workerChunkSize;
            if (startIndex >= records.Count)
            {
                break; // Remaining workers would have empty slices.
            }
            int count = Math.Min(workerChunkSize, records.Count - startIndex);
            tasks.Add(CopyChunkAsync(connectionString, copyCommand, records, startIndex, count, config, cancellationToken));
        }
        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Copies one contiguous slice of <paramref name="records"/> on a dedicated connection,
    /// committing one binary COPY stream per batch of <c>MaxBatchSize</c> rows.
    /// </summary>
    private static async Task CopyChunkAsync<T>(
        string connectionString,
        string copyCommand,
        IReadOnlyList<T> records,
        int startIndex,
        int count,
        TimescaleCopyConfig<T> config,
        CancellationToken cancellationToken)
    {
        await using NpgsqlConnection connection = new(connectionString);
        await connection.OpenAsync(cancellationToken);
        for (int offset = 0; offset < count; offset += config.MaxBatchSize)
        {
            int batchEnd = Math.Min(offset + config.MaxBatchSize, count);
            // Start a binary import stream; CompleteAsync commits the batch.
            await using NpgsqlBinaryImporter writer = await connection.BeginBinaryImportAsync(copyCommand, cancellationToken);
            for (int row = offset; row < batchEnd; row++)
            {
                T item = records[startIndex + row];
                await writer.StartRowAsync(cancellationToken);
                // Write each configured column in the specified order.
                foreach ((Func<T, object?> getter, NpgsqlTypes.NpgsqlDbType dbType) in config.ColumnMappings.Values)
                {
                    await writer.WriteAsync(getter(item), dbType, cancellationToken);
                }
            }
            await writer.CompleteAsync(cancellationToken);
        }
    }
}
}