Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ The function is configured through Azure App Settings / Environment variables, y

## Schema tracking export

Asynchronous export of change-tracking data (full rows for inserts/updates, primary keys for deletes) uses the same storage account as `AzureWebJobsStorage`: blob container `export`, table `exportjobs`.
Asynchronous export of change-tracking data (full rows for inserts/updates, primary keys for deletes) uses the same storage account as `AzureWebJobsStorage`: blob container `export`, table `exportjobs`. Completed jobs also include `response/existing.zip` and `response/missing.zip`: each tracked change (I/U/D + primary key) is classified by whether that key exists on the **target** database—existing rows include full **target** column values; missing rows list only operation and primary keys. `result.json` exposes SAS URIs for those ZIPs (`existingZipSasUri`, `missingZipSasUri`) alongside the three segment ZIPs.

HTTP routes (Function authorization level):

Expand Down
15 changes: 15 additions & 0 deletions src/SqlBulkSyncFunction/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,21 @@ public static class Queues
/// Deleted segment only: correlation id when segment processing throws.
/// </summary>
public const string ExportJobDeletedError = ExportJobDeleted + "-error";

/// <summary>
/// Queue for the worker that writes <c>existing.zip</c> and <c>missing.zip</c> (target row present vs absent for each tracked change).
/// </summary>
public const string ExportJobTargetPresence = "exportjob-target-presence";

/// <summary>
/// Signaled when both target-presence ZIPs have been written for an export job.
/// </summary>
public const string ExportJobTargetPresenceDone = ExportJobTargetPresence + "-done";

/// <summary>
/// Target-presence segment: correlation id when processing throws.
/// </summary>
public const string ExportJobTargetPresenceError = ExportJobTargetPresence + "-error";
}

/// <summary>
Expand Down
21 changes: 20 additions & 1 deletion src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ SchemaTrackingExportService schemaTrackingExportService
)
{
/// <summary>
/// Fans out a new export job to the three segment queues.
/// Fans out a new export job to the segment queues (updated, inserted, deleted, target presence).
/// </summary>
[Function(nameof(ProcessExportJobQueues) + nameof(DispatchExportJob))]
public async Task DispatchExportJob(
Expand Down Expand Up @@ -64,6 +64,15 @@ public Task ProcessExportDeleted(
CancellationToken cancellationToken
) => ProcessSegmentAsync(correlationId, SchemaTrackingExportSegment.Deleted, cancellationToken);

/// <summary>
/// Builds <c>existing.zip</c> and <c>missing.zip</c> (target row present vs absent for each tracked change).
/// </summary>
[Function(nameof(ProcessExportJobQueues) + nameof(ProcessExportTargetPresence))]
public Task ProcessExportTargetPresence(
[QueueTrigger(Constants.Queues.ExportJobTargetPresence)] string correlationId,
CancellationToken cancellationToken
) => ProcessSegmentAsync(correlationId, SchemaTrackingExportSegment.TargetPresence, cancellationToken);

/// <summary>
/// Records completion of the updated segment and finalizes the job when all segments are done.
/// </summary>
Expand Down Expand Up @@ -91,6 +100,15 @@ public Task OnExportDeletedDone(
CancellationToken cancellationToken
) => OnSegmentDoneAsync(correlationId, SchemaTrackingExportSegment.Deleted, cancellationToken);

/// <summary>
/// Records completion of the target-presence segment and finalizes the job when all segments are done.
/// </summary>
[Function(nameof(ProcessExportJobQueues) + nameof(OnExportTargetPresenceDone))]
public Task OnExportTargetPresenceDone(
[QueueTrigger(Constants.Queues.ExportJobTargetPresenceDone)] string correlationId,
CancellationToken cancellationToken
) => OnSegmentDoneAsync(correlationId, SchemaTrackingExportSegment.TargetPresence, cancellationToken);

private async Task ProcessSegmentAsync(
string correlationId,
SchemaTrackingExportSegment segment,
Expand Down Expand Up @@ -147,6 +165,7 @@ CancellationToken cancellationToken
SchemaTrackingExportSegment.Updated => Constants.Queues.ExportJobUpdatedError,
SchemaTrackingExportSegment.Inserted => Constants.Queues.ExportJobInsertedError,
SchemaTrackingExportSegment.Deleted => Constants.Queues.ExportJobDeletedError,
SchemaTrackingExportSegment.TargetPresence => Constants.Queues.ExportJobTargetPresenceError,
_ => throw new ArgumentOutOfRangeException(nameof(segment), segment, null)
};

Expand Down
32 changes: 32 additions & 0 deletions src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Globalization;
using System.Linq;
using SqlBulkSyncFunction.Models.Schema;
using SqlBulkSyncFunction.Models.Schema.Export;
Expand Down Expand Up @@ -87,6 +88,37 @@ FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct
""";
}

/// <summary>
/// Builds a query that returns every change in the version range with <c>SYS_CHANGE_OPERATION</c> as <c>changeOperation</c> (<c>I</c>/<c>U</c>/<c>D</c>)
/// and primary key columns aliased as <c>pk_c0</c>..<c>pk_cN</c> (same order as <paramref name="primaryKeyColumns"/>).
/// </summary>
/// <param name="sourceTableName">Fully qualified source table name.</param>
/// <param name="primaryKeyColumns">Primary key columns in a stable order.</param>
/// <returns>Parameterized SQL using <c>@FromVersion</c>.</returns>
public static string GetChangeTrackingExportChangeOperationAndPrimaryKeysSelectStatement(
string sourceTableName,
Column[] primaryKeyColumns
)
{
if (primaryKeyColumns.Length == 0)
{
throw new InvalidOperationException($"Missing primary key columns for table {sourceTableName}.");
}

var pkAliases = string.Join(
",\r\n ",
primaryKeyColumns.Select(
(column, index) => string.Concat("ct.", column.QuoteName, " AS [pk_c", index.ToString(CultureInfo.InvariantCulture), "]")
)
);

return $"""
SELECT CAST(ct.SYS_CHANGE_OPERATION AS NVARCHAR(1)) AS [changeOperation],
{pkAliases}
FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct
""";
}

/// <summary>
/// Builds a query for one export segment: insert/update rows join the live table for full column values;
/// delete rows return primary key columns from <c>CHANGETABLE</c> only.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public sealed class ExportJobTableEntity : ITableEntity
/// </summary>
public bool DeletedDone { get; set; }

/// <summary>
/// Whether the combined target-presence segment (<c>existing.zip</c> and <c>missing.zip</c>) finished successfully.
/// </summary>
public bool TargetPresenceDone { get; set; }

/// <summary>
/// UTC time when the job was accepted.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public static SchemaTrackingExportJobListItem ToListItem(ExportJobTableEntity en
UpdatedDone: entity.UpdatedDone,
InsertedDone: entity.InsertedDone,
DeletedDone: entity.DeletedDone,
TargetPresenceDone: entity.TargetPresenceDone,
Error: entity.Error,
Result: result
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace SqlBulkSyncFunction.Models.Schema.Export;
/// <param name="UpdatedDone">Whether the updated ZIP leg completed.</param>
/// <param name="InsertedDone">Whether the inserted ZIP leg completed.</param>
/// <param name="DeletedDone">Whether the deleted ZIP leg completed.</param>
/// <param name="TargetPresenceDone">Whether the target-presence ZIP pair (<c>existing.zip</c> / <c>missing.zip</c>) completed.</param>
/// <param name="Error">Optional error message when status is failed.</param>
/// <param name="Result">Populated for single-job status when <c>response/result.json</c> exists; otherwise <see langword="null"/>.</param>
public record SchemaTrackingExportJobListItem(
Expand All @@ -28,6 +29,7 @@ public record SchemaTrackingExportJobListItem(
bool UpdatedDone,
bool InsertedDone,
bool DeletedDone,
bool TargetPresenceDone,
string? Error,
SchemaTrackingExportJobListItemResult? Result = null
);
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ namespace SqlBulkSyncFunction.Models.Schema.Export;
/// <param name="UpdatedZipSasUri">Read SAS URI for <c>updated.zip</c>.</param>
/// <param name="InsertedZipSasUri">Read SAS URI for <c>inserted.zip</c>.</param>
/// <param name="DeletedZipSasUri">Read SAS URI for <c>deleted.zip</c>.</param>
/// <param name="ExistingZipSasUri">Read SAS URI for <c>existing.zip</c>; null when absent from stored result.</param>
/// <param name="MissingZipSasUri">Read SAS URI for <c>missing.zip</c>; null when absent from stored result.</param>
public record SchemaTrackingExportJobListItemResult(
DateTimeOffset SasExpires,
Uri UpdatedZipSasUri,
Uri InsertedZipSasUri,
Uri DeletedZipSasUri
Uri DeletedZipSasUri,
Uri? ExistingZipSasUri,
Uri? MissingZipSasUri
)
{
/// <summary>
Expand All @@ -31,6 +35,8 @@ Uri DeletedZipSasUri
jobResult.SasExpires,
jobResult.UpdatedZipSasUri,
jobResult.InsertedZipSasUri,
jobResult.DeletedZipSasUri
jobResult.DeletedZipSasUri,
jobResult.ExistingZipSasUri,
jobResult.MissingZipSasUri
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ namespace SqlBulkSyncFunction.Models.Schema.Export;
/// <param name="UpdatedZipSasUri">Read SAS URI for <c>updated.zip</c>.</param>
/// <param name="InsertedZipSasUri">Read SAS URI for <c>inserted.zip</c>.</param>
/// <param name="DeletedZipSasUri">Read SAS URI for <c>deleted.zip</c>.</param>
/// <param name="ExistingZipSasUri">Read SAS URI for <c>existing.zip</c> (changes whose PK exists on target, including target column values); null for results written before this feature.</param>
/// <param name="MissingZipSasUri">Read SAS URI for <c>missing.zip</c> (changes whose PK does not exist on target); null for results written before this feature.</param>
public record SchemaTrackingExportJobResult(
string Area,
string Id,
Expand All @@ -35,5 +37,7 @@ public record SchemaTrackingExportJobResult(
DateTimeOffset SasExpires,
Uri UpdatedZipSasUri,
Uri InsertedZipSasUri,
Uri DeletedZipSasUri
Uri DeletedZipSasUri,
Uri? ExistingZipSasUri = null,
Uri? MissingZipSasUri = null
);
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,10 @@ public enum SchemaTrackingExportSegment
/// <summary>
/// Rows with <c>SYS_CHANGE_OPERATION = N'D'</c> (primary key columns only).
/// </summary>
Deleted = 2
Deleted = 2,

/// <summary>
/// Single worker pass: writes <c>existing.zip</c> and <c>missing.zip</c> by comparing each change (I/U/D + PK) to target row existence (cross-connection safe).
/// </summary>
TargetPresence = 3
}
Loading
Loading