From da7b8c564173ae1b94830c52e4f350764d82ed1c Mon Sep 17 00:00:00 2001 From: Mattias Karlsson Date: Wed, 20 May 2026 10:43:07 +0200 Subject: [PATCH] feat(export): add target-presence segment with existing.zip and missing.zip Extend schema-tracking export with a fourth parallel segment that classifies every tracked change (I/U/D + primary key) against the sync target database. Jobs now produce response/existing.zip and response/missing.zip alongside the existing updated/inserted/deleted segment ZIPs. result.json and job status APIs expose existingZipSasUri and missingZipSasUri (nullable for older results). - Add TargetPresence export segment, queue constants, and ProcessExportJobQueues handlers (process, done, error) wired like the three source-based segments - Track TargetPresenceDone on export job table entities and list/status models - Dispatch target-presence work from the main export job fan-out - Finalize jobs only when all four segments complete; require both new ZIP blobs before writing result.json with SAS URIs - Add GetChangeTrackingExportChangeOperationAndPrimaryKeysSelectStatement for CHANGETABLE rows with changeOperation and pk_c0..pk_cN aliases - Implement ProcessTargetPresenceSegmentAsync: stream changes from source, batch (200) PKs into #ExportPkBatch on target via SqlBulkCopy, run paired INNER JOIN / LEFT OUTER JOIN queries for existing vs missing rows, stream compact camelCase JSON into paired ZIP archives - Add SchemaTrackingExportTargetPresenceZipWriter for existing.json / missing.json array entries inside each ZIP - Document target-presence outputs and SAS fields in README --- README.md | 2 +- src/SqlBulkSyncFunction/Constants.cs | 15 + .../Functions/ProcessExportJobQueues.cs | 21 +- .../Helpers/SqlStatementExtensions.cs | 32 ++ .../Schema/Export/ExportJobTableEntity.cs | 5 + .../Schema/Export/ExportJobTableMapper.cs | 1 + .../Export/SchemaTrackingExportJobListItem.cs | 2 + .../SchemaTrackingExportJobListItemResult.cs | 10 +- 
.../Export/SchemaTrackingExportJobResult.cs | 6 +- .../Export/SchemaTrackingExportSegment.cs | 7 +- .../Services/SchemaTrackingExportService.cs | 485 +++++++++++++++++- ...maTrackingExportTargetPresenceZipWriter.cs | 137 +++++ 12 files changed, 691 insertions(+), 32 deletions(-) create mode 100644 src/SqlBulkSyncFunction/Services/SchemaTrackingExportTargetPresenceZipWriter.cs diff --git a/README.md b/README.md index 3148c5f..42edbd0 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ The function is configured through Azure App Settings / Environment variables, y ## Schema tracking export -Asynchronous export of change-tracking data (full rows for inserts/updates, primary keys for deletes) uses the same storage account as `AzureWebJobsStorage`: blob container `export`, table `exportjobs`. +Asynchronous export of change-tracking data (full rows for inserts/updates, primary keys for deletes) uses the same storage account as `AzureWebJobsStorage`: blob container `export`, table `exportjobs`. Completed jobs also include `response/existing.zip` and `response/missing.zip`: each tracked change (I/U/D + primary key) is classified by whether that key exists on the **target** database—existing rows include full **target** column values; missing rows list only operation and primary keys. `result.json` exposes SAS URIs for those ZIPs (`existingZipSasUri`, `missingZipSasUri`) alongside the three segment ZIPs. HTTP routes (Function authorization level): diff --git a/src/SqlBulkSyncFunction/Constants.cs b/src/SqlBulkSyncFunction/Constants.cs index e5268fd..1dbdd6f 100644 --- a/src/SqlBulkSyncFunction/Constants.cs +++ b/src/SqlBulkSyncFunction/Constants.cs @@ -71,6 +71,21 @@ public static class Queues /// Deleted segment only: correlation id when segment processing throws. 
/// public const string ExportJobDeletedError = ExportJobDeleted + "-error"; + + /// + /// Queue for the worker that writes existing.zip and missing.zip (target row present vs absent for each tracked change). + /// + public const string ExportJobTargetPresence = "exportjob-target-presence"; + + /// + /// Signaled when both target-presence ZIPs have been written for an export job. + /// + public const string ExportJobTargetPresenceDone = ExportJobTargetPresence + "-done"; + + /// + /// Target-presence segment: correlation id when processing throws. + /// + public const string ExportJobTargetPresenceError = ExportJobTargetPresence + "-error"; } /// diff --git a/src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs b/src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs index 8ebc2c2..873634c 100644 --- a/src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs +++ b/src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs @@ -19,7 +19,7 @@ SchemaTrackingExportService schemaTrackingExportService ) { /// - /// Fans out a new export job to the three segment queues. + /// Fans out a new export job to the segment queues (updated, inserted, deleted, target presence). /// [Function(nameof(ProcessExportJobQueues) + nameof(DispatchExportJob))] public async Task DispatchExportJob( @@ -64,6 +64,15 @@ public Task ProcessExportDeleted( CancellationToken cancellationToken ) => ProcessSegmentAsync(correlationId, SchemaTrackingExportSegment.Deleted, cancellationToken); + /// + /// Builds existing.zip and missing.zip (target row present vs absent for each tracked change). 
+ /// + [Function(nameof(ProcessExportJobQueues) + nameof(ProcessExportTargetPresence))] + public Task ProcessExportTargetPresence( + [QueueTrigger(Constants.Queues.ExportJobTargetPresence)] string correlationId, + CancellationToken cancellationToken + ) => ProcessSegmentAsync(correlationId, SchemaTrackingExportSegment.TargetPresence, cancellationToken); + /// /// Records completion of the updated segment and finalizes the job when all segments are done. /// @@ -91,6 +100,15 @@ public Task OnExportDeletedDone( CancellationToken cancellationToken ) => OnSegmentDoneAsync(correlationId, SchemaTrackingExportSegment.Deleted, cancellationToken); + /// + /// Records completion of the target-presence segment and finalizes the job when all segments are done. + /// + [Function(nameof(ProcessExportJobQueues) + nameof(OnExportTargetPresenceDone))] + public Task OnExportTargetPresenceDone( + [QueueTrigger(Constants.Queues.ExportJobTargetPresenceDone)] string correlationId, + CancellationToken cancellationToken + ) => OnSegmentDoneAsync(correlationId, SchemaTrackingExportSegment.TargetPresence, cancellationToken); + private async Task ProcessSegmentAsync( string correlationId, SchemaTrackingExportSegment segment, @@ -147,6 +165,7 @@ CancellationToken cancellationToken SchemaTrackingExportSegment.Updated => Constants.Queues.ExportJobUpdatedError, SchemaTrackingExportSegment.Inserted => Constants.Queues.ExportJobInsertedError, SchemaTrackingExportSegment.Deleted => Constants.Queues.ExportJobDeletedError, + SchemaTrackingExportSegment.TargetPresence => Constants.Queues.ExportJobTargetPresenceError, _ => throw new ArgumentOutOfRangeException(nameof(segment), segment, null) }; diff --git a/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs b/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs index 0611105..212c000 100644 --- a/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs +++ b/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs @@ -1,4 +1,5 @@ using 
System; +using System.Globalization; using System.Linq; using SqlBulkSyncFunction.Models.Schema; using SqlBulkSyncFunction.Models.Schema.Export; @@ -87,6 +88,37 @@ FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct """; } + /// + /// Builds a query that returns every change in the version range with SYS_CHANGE_OPERATION as changeOperation (I/U/D) + /// and primary key columns aliased as pk_c0..pk_cN (same order as ). + /// + /// Fully qualified source table name. + /// Primary key columns in a stable order. + /// Parameterized SQL using @FromVersion. + public static string GetChangeTrackingExportChangeOperationAndPrimaryKeysSelectStatement( + string sourceTableName, + Column[] primaryKeyColumns + ) + { + if (primaryKeyColumns.Length == 0) + { + throw new InvalidOperationException($"Missing primary key columns for table {sourceTableName}."); + } + + var pkAliases = string.Join( + ",\r\n ", + primaryKeyColumns.Select( + (column, index) => string.Concat("ct.", column.QuoteName, " AS [pk_c", index.ToString(CultureInfo.InvariantCulture), "]") + ) + ); + + return $""" + SELECT CAST(ct.SYS_CHANGE_OPERATION AS NVARCHAR(1)) AS [changeOperation], + {pkAliases} + FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct + """; + } + /// /// Builds a query for one export segment: insert/update rows join the live table for full column values; /// delete rows return primary key columns from CHANGETABLE only. diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs index f7791cd..1a7d555 100644 --- a/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs @@ -68,6 +68,11 @@ public sealed class ExportJobTableEntity : ITableEntity /// public bool DeletedDone { get; set; } + /// + /// Whether the combined target-presence segment (existing.zip and missing.zip) finished successfully. 
+ /// + public bool TargetPresenceDone { get; set; } + /// /// UTC time when the job was accepted. /// diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs index 0e78b46..08266bf 100644 --- a/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs @@ -23,6 +23,7 @@ public static SchemaTrackingExportJobListItem ToListItem(ExportJobTableEntity en UpdatedDone: entity.UpdatedDone, InsertedDone: entity.InsertedDone, DeletedDone: entity.DeletedDone, + TargetPresenceDone: entity.TargetPresenceDone, Error: entity.Error, Result: result ); diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs index fc2a011..36b9252 100644 --- a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs @@ -16,6 +16,7 @@ namespace SqlBulkSyncFunction.Models.Schema.Export; /// Whether the updated ZIP leg completed. /// Whether the inserted ZIP leg completed. /// Whether the deleted ZIP leg completed. +/// Whether the target-presence ZIP pair (existing.zip / missing.zip) completed. /// Optional error message when status is failed. /// Populated for single-job status when response/result.json exists; otherwise . public record SchemaTrackingExportJobListItem( @@ -28,6 +29,7 @@ public record SchemaTrackingExportJobListItem( bool UpdatedDone, bool InsertedDone, bool DeletedDone, + bool TargetPresenceDone, string? Error, SchemaTrackingExportJobListItemResult? 
Result = null ); diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs index fefa0d6..160cc2e 100644 --- a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs @@ -12,11 +12,15 @@ namespace SqlBulkSyncFunction.Models.Schema.Export; /// Read SAS URI for updated.zip. /// Read SAS URI for inserted.zip. /// Read SAS URI for deleted.zip. +/// Read SAS URI for existing.zip; null when absent from stored result. +/// Read SAS URI for missing.zip; null when absent from stored result. public record SchemaTrackingExportJobListItemResult( DateTimeOffset SasExpires, Uri UpdatedZipSasUri, Uri InsertedZipSasUri, - Uri DeletedZipSasUri + Uri DeletedZipSasUri, + Uri? ExistingZipSasUri, + Uri? MissingZipSasUri ) { /// @@ -31,6 +35,8 @@ Uri DeletedZipSasUri jobResult.SasExpires, jobResult.UpdatedZipSasUri, jobResult.InsertedZipSasUri, - jobResult.DeletedZipSasUri + jobResult.DeletedZipSasUri, + jobResult.ExistingZipSasUri, + jobResult.MissingZipSasUri ); } diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs index ce330ca..76e783f 100644 --- a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs @@ -21,6 +21,8 @@ namespace SqlBulkSyncFunction.Models.Schema.Export; /// Read SAS URI for updated.zip. /// Read SAS URI for inserted.zip. /// Read SAS URI for deleted.zip. +/// Read SAS URI for existing.zip (changes whose PK exists on target, including target column values); null for results written before this feature. 
+/// Read SAS URI for missing.zip (changes whose PK does not exist on target); null for results written before this feature. public record SchemaTrackingExportJobResult( string Area, string Id, @@ -35,5 +37,7 @@ public record SchemaTrackingExportJobResult( DateTimeOffset SasExpires, Uri UpdatedZipSasUri, Uri InsertedZipSasUri, - Uri DeletedZipSasUri + Uri DeletedZipSasUri, + Uri? ExistingZipSasUri = null, + Uri? MissingZipSasUri = null ); diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs index 083b182..3181655 100644 --- a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs +++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs @@ -20,5 +20,10 @@ public enum SchemaTrackingExportSegment /// /// Rows with SYS_CHANGE_OPERATION = N'D' (primary key columns only). /// - Deleted = 2 + Deleted = 2, + + /// + /// Single worker pass: writes existing.zip and missing.zip by comparing each change (I/U/D + PK) to target row existence (cross-connection safe). 
+ /// + TargetPresence = 3 } diff --git a/src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs index 61611cf..8ae018f 100644 --- a/src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs +++ b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Data; +using System.Globalization; using System.Linq; using System.Text.Json; using System.Threading; @@ -16,6 +17,7 @@ using Microsoft.Extensions.Options; using SqlBulkSyncFunction.Helpers; using SqlBulkSyncFunction.Models.Job; +using SqlBulkSyncFunction.Models.Schema; using SqlBulkSyncFunction.Models.Schema.Export; namespace SqlBulkSyncFunction.Services; @@ -41,6 +43,15 @@ TableServiceClient tableServiceClient WriteIndented = false }; + private static readonly JsonSerializerOptions TargetPresenceRowJsonOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull, + WriteIndented = false + }; + + private const int TargetPresenceBatchSize = 200; + private readonly BlobContainerClient _exportContainer = GetOrCreateBlobContainer(blobServiceClient, Constants.Containers.Export); private readonly TableClient _exportJobsTable = GetOrCreateTable(tableServiceClient, Constants.Tables.ExportJobs); @@ -119,7 +130,8 @@ CancellationToken cancellationToken CreatedUtc = utcNow, UpdatedDone = false, InsertedDone = false, - DeletedDone = false + DeletedDone = false, + TargetPresenceDone = false }; _ = await _exportJobsTable.UpsertEntityAsync(entity, TableUpdateMode.Replace, cancellationToken).ConfigureAwait(false); @@ -211,6 +223,7 @@ CancellationToken cancellationToken UpdatedDone: false, InsertedDone: false, DeletedDone: false, + TargetPresenceDone: false, Error: null, Result: listResult ); @@ -245,7 +258,7 @@ CancellationToken cancellationToken } /// - /// 
Dispatches a main-queue message to the three segment queues. + /// Dispatches a main-queue message to the segment queues (updated, inserted, deleted, target presence). /// public async Task DispatchExportJobAsync(string correlationId, CancellationToken cancellationToken) { @@ -269,11 +282,13 @@ await PatchEntityAsync( var qUpdated = GetOrCreateQueue(Constants.Queues.ExportJobUpdated); var qInserted = GetOrCreateQueue(Constants.Queues.ExportJobInserted); var qDeleted = GetOrCreateQueue(Constants.Queues.ExportJobDeleted); + var qTargetPresence = GetOrCreateQueue(Constants.Queues.ExportJobTargetPresence); // Do not pass visibilityTimeout on enqueue: non-zero values hide the message from Dequeue/Peek until the timeout elapses (scheduled/delayed work), not a processing lease. _ = await qUpdated.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false); _ = await qInserted.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false); _ = await qDeleted.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false); + _ = await qTargetPresence.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false); } /// @@ -336,31 +351,46 @@ CancellationToken cancellationToken var targetVersion = targetConn.GetTargetVersion(table.Target); var fromVersion = targetVersion.CurrentVersion < 0 ? 
0L : targetVersion.CurrentVersion; - var sql = SqlStatementExtensions.GetChangeTrackingExportSegmentSelectStatement(table.Source, columns, segment); - - await using var cmd = new SqlCommand(sql, sourceConn) + if (segment == SchemaTrackingExportSegment.TargetPresence) { - CommandTimeout = 3600 - }; - _ = cmd.Parameters.Add(new SqlParameter("@FromVersion", SqlDbType.BigInt) { Value = fromVersion }); + await ProcessTargetPresenceSegmentAsync( + normalized, + table, + columns, + sourceConn, + targetConn, + fromVersion, + cancellationToken + ).ConfigureAwait(false); + } + else + { + var sql = SqlStatementExtensions.GetChangeTrackingExportSegmentSelectStatement(table.Source, columns, segment); - await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken) - .ConfigureAwait(false); + await using var cmd = new SqlCommand(sql, sourceConn) + { + CommandTimeout = 3600 + }; + _ = cmd.Parameters.Add(new SqlParameter("@FromVersion", SqlDbType.BigInt) { Value = fromVersion }); - var (zipPath, jsonPath) = GetZipRelativePath(segment); - var zipBlob = _exportContainer.GetBlobClient($"{normalized}/{zipPath}"); - await using var uploadStream = await zipBlob.OpenWriteAsync( - true, - new BlobOpenWriteOptions - { - HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Zip } - }, - cancellationToken - ) - .ConfigureAwait(false); + await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken) + .ConfigureAwait(false); - await SchemaTrackingExportStreamingZip.WriteReaderToZipAsync(reader, uploadStream, jsonPath, cancellationToken) - .ConfigureAwait(false); + var (zipPath, jsonPath) = GetZipRelativePath(segment); + var zipBlob = _exportContainer.GetBlobClient($"{normalized}/{zipPath}"); + await using var uploadStream = await zipBlob.OpenWriteAsync( + true, + new BlobOpenWriteOptions + { + HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Zip } + }, + 
cancellationToken + ) + .ConfigureAwait(false); + + await SchemaTrackingExportStreamingZip.WriteReaderToZipAsync(reader, uploadStream, jsonPath, cancellationToken) + .ConfigureAwait(false); + } var doneQueueName = GetDoneQueueName(segment); var doneQueue = GetOrCreateQueue(doneQueueName); @@ -454,7 +484,7 @@ private async Task TryFinalizeIfCompleteAsync( CancellationToken cancellationToken ) { - if (!entity.UpdatedDone || !entity.InsertedDone || !entity.DeletedDone) + if (!entity.UpdatedDone || !entity.InsertedDone || !entity.DeletedDone || !entity.TargetPresenceDone) { return; } @@ -470,10 +500,14 @@ CancellationToken cancellationToken var updatedZip = _exportContainer.GetBlobClient($"{correlationId}/response/updated.zip"); var insertedZip = _exportContainer.GetBlobClient($"{correlationId}/response/inserted.zip"); var deletedZip = _exportContainer.GetBlobClient($"{correlationId}/response/deleted.zip"); + var existingZip = _exportContainer.GetBlobClient($"{correlationId}/response/existing.zip"); + var missingZip = _exportContainer.GetBlobClient($"{correlationId}/response/missing.zip"); if (!await updatedZip.ExistsAsync(cancellationToken).ConfigureAwait(false) || !await insertedZip.ExistsAsync(cancellationToken).ConfigureAwait(false) || - !await deletedZip.ExistsAsync(cancellationToken).ConfigureAwait(false)) + !await deletedZip.ExistsAsync(cancellationToken).ConfigureAwait(false) || + !await existingZip.ExistsAsync(cancellationToken).ConfigureAwait(false) || + !await missingZip.ExistsAsync(cancellationToken).ConfigureAwait(false)) { logger.LogWarning("Finalize: missing one or more ZIP blobs for {CorrelationId}", correlationId); return; @@ -483,6 +517,8 @@ CancellationToken cancellationToken var updatedUri = GenerateReadSasUri(updatedZip, sasExpires); var insertedUri = GenerateReadSasUri(insertedZip, sasExpires); var deletedUri = GenerateReadSasUri(deletedZip, sasExpires); + var existingUri = GenerateReadSasUri(existingZip, sasExpires); + var missingUri = 
GenerateReadSasUri(missingZip, sasExpires); var completed = DateTimeOffset.UtcNow; var result = new SchemaTrackingExportJobResult( @@ -499,7 +535,9 @@ CancellationToken cancellationToken SasExpires: sasExpires, UpdatedZipSasUri: updatedUri, InsertedZipSasUri: insertedUri, - DeletedZipSasUri: deletedUri + DeletedZipSasUri: deletedUri, + ExistingZipSasUri: existingUri, + MissingZipSasUri: missingUri ); try @@ -551,6 +589,7 @@ private async Task MarkTableCompletedAsync(SchemaTrackingExportJob job, Cancella entity.UpdatedDone = true; entity.InsertedDone = true; entity.DeletedDone = true; + entity.TargetPresenceDone = true; try { @@ -657,6 +696,7 @@ private static bool IsLegDone(ExportJobTableEntity entity, SchemaTrackingExportS SchemaTrackingExportSegment.Updated => entity.UpdatedDone, SchemaTrackingExportSegment.Inserted => entity.InsertedDone, SchemaTrackingExportSegment.Deleted => entity.DeletedDone, + SchemaTrackingExportSegment.TargetPresence => entity.TargetPresenceDone, _ => false }; @@ -673,6 +713,9 @@ private static void SetLegDone(ExportJobTableEntity entity, SchemaTrackingExport case SchemaTrackingExportSegment.Deleted: entity.DeletedDone = value; break; + case SchemaTrackingExportSegment.TargetPresence: + entity.TargetPresenceDone = value; + break; default: throw new ArgumentOutOfRangeException(nameof(segment)); } @@ -684,6 +727,7 @@ private static string GetDoneQueueName(SchemaTrackingExportSegment segment) SchemaTrackingExportSegment.Updated => Constants.Queues.ExportJobUpdatedDone, SchemaTrackingExportSegment.Inserted => Constants.Queues.ExportJobInsertedDone, SchemaTrackingExportSegment.Deleted => Constants.Queues.ExportJobDeletedDone, + SchemaTrackingExportSegment.TargetPresence => Constants.Queues.ExportJobTargetPresenceDone, _ => throw new ArgumentOutOfRangeException(nameof(segment)) }; @@ -698,9 +742,398 @@ private static (string ZipPath, string JsonPath) GetZipRelativePath(SchemaTracki SchemaTrackingExportSegment.Updated => ("response/updated.zip", 
"updated.json"), SchemaTrackingExportSegment.Inserted => ("response/inserted.zip", "inserted.json"), SchemaTrackingExportSegment.Deleted => ("response/deleted.zip", "deleted.json"), + SchemaTrackingExportSegment.TargetPresence => throw new InvalidOperationException("Target presence uses paired ZIP writers, not a single segment path."), _ => throw new ArgumentOutOfRangeException(nameof(segment)) }; + /// + /// Streams all change rows (I/U/D + PK) from the source, resolves target rows in batches on the target connection, and writes existing.zip / missing.zip. + /// + private async Task ProcessTargetPresenceSegmentAsync( + string normalized, + SyncJobTable table, + Column[] sourceColumns, + SqlConnection sourceConn, + SqlConnection targetConn, + long fromVersion, + CancellationToken cancellationToken + ) + { + var pkColumns = sourceColumns.Where(static c => c.IsPrimary).ToArray(); + if (pkColumns.Length == 0) + { + throw new InvalidOperationException("Target presence export requires at least one primary key column."); + } + + var targetColumns = targetConn.GetColumns(table.Target); + var sql = SqlStatementExtensions.GetChangeTrackingExportChangeOperationAndPrimaryKeysSelectStatement( + table.Source, + pkColumns + ); + + await using var ctCmd = new SqlCommand(sql, sourceConn) { CommandTimeout = 3600 }; + _ = ctCmd.Parameters.Add(new SqlParameter("@FromVersion", SqlDbType.BigInt) { Value = fromVersion }); + + await using var ctReader = await ctCmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken) + .ConfigureAwait(false); + + var existingBlob = _exportContainer.GetBlobClient($"{normalized}/response/existing.zip"); + var missingBlob = _exportContainer.GetBlobClient($"{normalized}/response/missing.zip"); + await using var existingUpload = await existingBlob.OpenWriteAsync( + true, + new BlobOpenWriteOptions { HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Zip } }, + cancellationToken + ) + .ConfigureAwait(false); + await 
using var missingUpload = await missingBlob.OpenWriteAsync( + true, + new BlobOpenWriteOptions { HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Zip } }, + cancellationToken + ) + .ConfigureAwait(false); + + await using var zipWriter = new SchemaTrackingExportTargetPresenceZipWriter( + existingUpload, + missingUpload, + TargetPresenceRowJsonOptions + ); + + var changeOpOrdinal = ctReader.GetOrdinal("changeOperation"); + var pkOrdinals = new int[pkColumns.Length]; + for (var i = 0; i < pkColumns.Length; i++) + { + pkOrdinals[i] = ctReader.GetOrdinal(FormattableString.Invariant($"pk_c{i}")); + } + + var batch = new List<(string Op, Dictionary Pk)>(TargetPresenceBatchSize); + + async Task FlushBatchAsync() + { + if (batch.Count == 0) + { + return; + } + + await WriteTargetPresenceBatchFromJoinedQueryAsync( + targetConn, + table.Target, + targetColumns, + pkColumns, + batch, + zipWriter, + cancellationToken + ).ConfigureAwait(false); + + batch.Clear(); + } + + while (await ctReader.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + var op = ctReader.GetString(changeOpOrdinal); + if (op.Length != 1) + { + op = op.Trim(); + } + + var pk = new Dictionary(StringComparer.OrdinalIgnoreCase); + for (var i = 0; i < pkColumns.Length; i++) + { + var ord = pkOrdinals[i]; + pk[pkColumns[i].Name] = ctReader.IsDBNull(ord) ? null : ctReader.GetValue(ord); + } + + batch.Add((op, pk)); + if (batch.Count >= TargetPresenceBatchSize) + { + await FlushBatchAsync().ConfigureAwait(false); + } + } + + await FlushBatchAsync().ConfigureAwait(false); + } + + /// + /// Stages changeOperation and PKs in a session temp table, then runs one batch returning two result sets: + /// existing rows (INNER JOIN target) and missing rows (LEFT OUTER JOIN with WHERE t.[firstPk] IS NULL). 
+ /// + private static async Task WriteTargetPresenceBatchFromJoinedQueryAsync( + SqlConnection targetConn, + string targetTableName, + Column[] targetColumns, + Column[] pkColumns, + IReadOnlyList<(string Op, Dictionary Pk)> batch, + SchemaTrackingExportTargetPresenceZipWriter zipWriter, + CancellationToken cancellationToken + ) + { + if (batch.Count == 0) + { + return; + } + + const string tempTableName = "#ExportPkBatch"; + + var joinClause = string.Join( + " AND ", + pkColumns.Select(column => string.Concat("t.", column.QuoteName, " = p.", column.QuoteName)) + ); + + var firstPkOnTarget = pkColumns[0]; + + var createPkCols = string.Join( + ",\n ", + pkColumns.Select( + pkCol => + { + var tc = ResolvePkColumnForTarget(pkCol, targetColumns); + var nullability = tc.IsNullable ? "NULL" : "NOT NULL"; + return string.Concat(tc.QuoteName, " ", tc.Type, " ", nullability); + } + ) + ); + + var createSql = FormattableString.Invariant( + $""" + CREATE TABLE {tempTableName} ( + [changeOperation] NCHAR(1) NOT NULL, + {createPkCols} + ); + """); + + var targetSelectList = string.Join( + ",\n ", + targetColumns.Select(column => string.Concat("t.", column.QuoteName, " AS ", column.QuoteName)) + ); + + var existingSql = FormattableString.Invariant( + $""" + SELECT p.[changeOperation], + {targetSelectList} + FROM {tempTableName} AS p + INNER JOIN {targetTableName} AS t ON {joinClause} + """); + + var pkSelectList = string.Join( + ",\n ", + pkColumns.Select(column => string.Concat("p.", column.QuoteName, " AS ", column.QuoteName)) + ); + + var missingSql = FormattableString.Invariant( + $""" + SELECT p.[changeOperation], + {pkSelectList} + FROM {tempTableName} AS p + LEFT OUTER JOIN {targetTableName} AS t ON {joinClause} + WHERE t.{firstPkOnTarget.QuoteName} IS NULL + """); + + var queryBatch = FormattableString.Invariant( + $""" + {existingSql} + ; + {missingSql} + """); + + try + { + await using (var createCmd = new SqlCommand(createSql, targetConn) { CommandTimeout = 3600 }) + 
{ + _ = await createCmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); + } + + using var dataTable = BuildTargetPresenceStagingDataTable(pkColumns, targetColumns, batch); + using var bulk = new SqlBulkCopy(targetConn, SqlBulkCopyOptions.Default, externalTransaction: null) + { + DestinationTableName = tempTableName, + BulkCopyTimeout = 3600 + }; + foreach (DataColumn col in dataTable.Columns) + { + bulk.ColumnMappings.Add(col.ColumnName, col.ColumnName); + } + + await bulk.WriteToServerAsync(dataTable, cancellationToken).ConfigureAwait(false); + + await using var readCmd = new SqlCommand(queryBatch, targetConn) { CommandTimeout = 3600 }; + await using var reader = await readCmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken) + .ConfigureAwait(false); + + var changeOpOrdinal = reader.GetOrdinal("changeOperation"); + var targetOrdinals = new Dictionary(StringComparer.OrdinalIgnoreCase); + foreach (var col in targetColumns) + { + targetOrdinals[col.Name] = reader.GetOrdinal(col.Name); + } + + while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + var op = reader.GetString(changeOpOrdinal); + if (op.Length != 1) + { + op = op.Trim(); + } + + var targetRow = new Dictionary(StringComparer.OrdinalIgnoreCase); + foreach (var col in targetColumns) + { + var ord = targetOrdinals[col.Name]; + targetRow[col.Name] = reader.IsDBNull(ord) ? 
null : reader.GetValue(ord); + } + + var pkRow = new Dictionary(StringComparer.OrdinalIgnoreCase); + foreach (var col in pkColumns) + { + _ = targetRow.TryGetValue(col.Name, out var v); + pkRow[col.Name] = v; + } + + await zipWriter.WriteExistingRowAsync(op, pkRow, targetRow, cancellationToken).ConfigureAwait(false); + } + + if (!await reader.NextResultAsync(cancellationToken).ConfigureAwait(false)) + { + throw new InvalidOperationException("Expected second result set (missing rows) from target-presence batch query."); + } + + var missingChangeOpOrdinal = reader.GetOrdinal("changeOperation"); + var missingPkOrdinals = new int[pkColumns.Length]; + for (var i = 0; i < pkColumns.Length; i++) + { + missingPkOrdinals[i] = reader.GetOrdinal(pkColumns[i].Name); + } + + while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + var op = reader.GetString(missingChangeOpOrdinal); + if (op.Length != 1) + { + op = op.Trim(); + } + + var pkRow = new Dictionary(StringComparer.OrdinalIgnoreCase); + for (var i = 0; i < pkColumns.Length; i++) + { + var ord = missingPkOrdinals[i]; + pkRow[pkColumns[i].Name] = reader.IsDBNull(ord) ? null : reader.GetValue(ord); + } + + await zipWriter.WriteMissingRowAsync(op, pkRow, cancellationToken).ConfigureAwait(false); + } + } + finally + { + try + { + await using var dropCmd = new SqlCommand( + FormattableString.Invariant($"DROP TABLE IF EXISTS {tempTableName};"), + targetConn + ) + { + CommandTimeout = 60 + }; + _ = await dropCmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); + } + catch (SqlException) + { + // Best-effort cleanup; connection may already be aborting. + } + } + } + + private static Column ResolvePkColumnForTarget(Column pkColumn, Column[] targetColumns) + => targetColumns.FirstOrDefault( + t => string.Equals(t.Name, pkColumn.Name, StringComparison.OrdinalIgnoreCase) + ) ?? 
pkColumn; + + private static DataTable BuildTargetPresenceStagingDataTable( + Column[] pkColumns, + Column[] targetColumns, + IReadOnlyList<(string Op, Dictionary Pk)> batch + ) + { + var dt = new DataTable(); + _ = dt.Columns.Add("changeOperation", typeof(string)); + + foreach (var pkCol in pkColumns) + { + var tc = ResolvePkColumnForTarget(pkCol, targetColumns); + var clrType = GetClrTypeForSqlType(tc.Type); + var dc = dt.Columns.Add(pkCol.Name, clrType); + dc.AllowDBNull = tc.IsNullable; + } + + foreach (var (op, pkRow) in batch) + { + var dr = dt.NewRow(); + var opTrimmed = string.IsNullOrWhiteSpace(op) ? "?" : op.Trim(); + dr["changeOperation"] = opTrimmed.Length > 0 ? opTrimmed[..1] : "?"; + foreach (var pkCol in pkColumns) + { + _ = pkRow.TryGetValue(pkCol.Name, out var raw); + dr[pkCol.Name] = NormalizeValueForDataColumn(raw, dt.Columns[pkCol.Name]!.DataType); + } + + dt.Rows.Add(dr); + } + + return dt; + } + + private static object NormalizeValueForDataColumn(object? raw, Type columnClrType) + { + if (raw is null or DBNull) + { + return DBNull.Value; + } + + if (columnClrType.IsInstanceOfType(raw)) + { + return raw; + } + + try + { + return Convert.ChangeType(raw, columnClrType, CultureInfo.InvariantCulture); + } + catch (InvalidCastException) + { + return raw; + } + catch (FormatException) + { + return raw; + } + catch (OverflowException) + { + return raw; + } + } + + private static Type GetClrTypeForSqlType(string sqlType) + { + var trimmed = sqlType.Trim(); + var paren = trimmed.IndexOf('(', StringComparison.Ordinal); + var baseName = (paren >= 0 ? 
trimmed[..paren] : trimmed).Trim().ToLowerInvariant(); + return baseName switch + { + "bigint" => typeof(long), + "int" => typeof(int), + "smallint" => typeof(short), + "tinyint" => typeof(byte), + "bit" => typeof(bool), + "uniqueidentifier" => typeof(Guid), + "float" => typeof(double), + "real" => typeof(float), + "decimal" or "numeric" => typeof(decimal), + "money" or "smallmoney" => typeof(decimal), + "date" or "datetime" or "datetime2" or "smalldatetime" => typeof(DateTime), + "datetimeoffset" => typeof(DateTimeOffset), + "time" => typeof(TimeSpan), + "sql_variant" => typeof(object), + _ => typeof(string) + }; + } + private async Task ReadJobBlobAsync(string correlationId, CancellationToken cancellationToken) { var blob = _exportContainer.GetBlobClient($"{correlationId}/job.json"); diff --git a/src/SqlBulkSyncFunction/Services/SchemaTrackingExportTargetPresenceZipWriter.cs b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportTargetPresenceZipWriter.cs new file mode 100644 index 0000000..f74825c --- /dev/null +++ b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportTargetPresenceZipWriter.cs @@ -0,0 +1,137 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.IO.Compression; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace SqlBulkSyncFunction.Services; + +#nullable enable + +/// +/// Opens two ZIP archives and streams UTF-8 JSON arrays for export rows that exist vs do not exist on the sync target. +/// Disposing finalizes ZIP entries and archives but leaves outer streams open for the caller to flush and dispose. 
///
internal sealed class SchemaTrackingExportTargetPresenceZipWriter : IAsyncDisposable
{
    // JSON array framing. Held as memory (not u8 span literals) so it can be written with WriteAsync.
    private static readonly ReadOnlyMemory<byte> FirstRowPrefix = "\n "u8.ToArray();
    private static readonly ReadOnlyMemory<byte> NextRowPrefix = ",\n "u8.ToArray();
    private static readonly ReadOnlyMemory<byte> ArrayTerminator = "\n]"u8.ToArray();

    private readonly Stream _existingOuter;
    private readonly Stream _missingOuter;
    private readonly ZipArchive _existingZip;
    private readonly ZipArchive _missingZip;
    private readonly Stream _existingEntryStream;
    private readonly Stream _missingEntryStream;
    private readonly JsonSerializerOptions _rowJsonOptions;
    private bool _existingFirst = true;
    private bool _missingFirst = true;
    private bool _disposed;

    /// <summary>
    /// Initializes a writer over two destination streams. Each receives one ZIP archive holding a single
    /// JSON-array entry (<c>existing.json</c> / <c>missing.json</c>); the opening <c>[</c> of both arrays is
    /// written immediately.
    /// </summary>
    /// <param name="existingZipDestination">Writable stream for <c>existing.zip</c>.</param>
    /// <param name="missingZipDestination">Writable stream for <c>missing.zip</c>.</param>
    /// <param name="rowJsonOptions">Serializer options applied to every row object.</param>
    /// <exception cref="ArgumentNullException">Any argument is <see langword="null"/>.</exception>
    public SchemaTrackingExportTargetPresenceZipWriter(
        Stream existingZipDestination,
        Stream missingZipDestination,
        JsonSerializerOptions rowJsonOptions
    )
    {
        ArgumentNullException.ThrowIfNull(existingZipDestination);
        ArgumentNullException.ThrowIfNull(missingZipDestination);
        ArgumentNullException.ThrowIfNull(rowJsonOptions);

        _rowJsonOptions = rowJsonOptions;
        _existingOuter = existingZipDestination;
        _missingOuter = missingZipDestination;

        // leaveOpen: the caller owns the outer streams and disposes them after this writer.
        _existingZip = new ZipArchive(_existingOuter, ZipArchiveMode.Create, leaveOpen: true);
        _missingZip = new ZipArchive(_missingOuter, ZipArchiveMode.Create, leaveOpen: true);
        _existingEntryStream = _existingZip.CreateEntry("existing.json", CompressionLevel.Optimal).Open();
        _missingEntryStream = _missingZip.CreateEntry("missing.json", CompressionLevel.Optimal).Open();
        _existingEntryStream.WriteByte((byte)'[');
        _missingEntryStream.WriteByte((byte)'[');
    }

    /// <summary>
    /// Appends one object to the existing-target array: change operation, primary key map, and target column map.
    /// </summary>
    /// <param name="changeOperation">Change operation value to emit; must not be null or whitespace.</param>
    /// <param name="primaryKey">Primary key column values for the row.</param>
    /// <param name="targetColumns">Target column values for the row (serialized under the <c>Target</c> property).</param>
    /// <param name="cancellationToken">Token observed while writing to the entry stream.</param>
    /// <exception cref="ArgumentException"><paramref name="changeOperation"/> is null, empty, or whitespace.</exception>
    /// <exception cref="ArgumentNullException">A dictionary argument is <see langword="null"/>.</exception>
    /// <exception cref="ObjectDisposedException">The writer has already been disposed.</exception>
    public async Task WriteExistingRowAsync(
        string changeOperation,
        IReadOnlyDictionary<string, object?> primaryKey,
        IReadOnlyDictionary<string, object?> targetColumns,
        CancellationToken cancellationToken
    )
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(changeOperation);
        ArgumentNullException.ThrowIfNull(primaryKey);
        ArgumentNullException.ThrowIfNull(targetColumns);
        ObjectDisposedException.ThrowIf(_disposed, this);

        var isFirstRow = _existingFirst;
        _existingFirst = false;

        var row = new TargetPresenceExistingRow(changeOperation, primaryKey, targetColumns);
        await WriteRowAsync(_existingEntryStream, isFirstRow, row, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Appends one object to the missing-on-target array: change operation and primary key map only.
    /// </summary>
    /// <param name="changeOperation">Change operation value to emit; must not be null or whitespace.</param>
    /// <param name="primaryKey">Primary key column values for the row.</param>
    /// <param name="cancellationToken">Token observed while writing to the entry stream.</param>
    /// <exception cref="ArgumentException"><paramref name="changeOperation"/> is null, empty, or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="primaryKey"/> is <see langword="null"/>.</exception>
    /// <exception cref="ObjectDisposedException">The writer has already been disposed.</exception>
    public async Task WriteMissingRowAsync(
        string changeOperation,
        IReadOnlyDictionary<string, object?> primaryKey,
        CancellationToken cancellationToken
    )
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(changeOperation);
        ArgumentNullException.ThrowIfNull(primaryKey);
        ObjectDisposedException.ThrowIf(_disposed, this);

        var isFirstRow = _missingFirst;
        _missingFirst = false;

        var row = new TargetPresenceMissingRow(changeOperation, primaryKey);
        await WriteRowAsync(_missingEntryStream, isFirstRow, row, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Terminates both JSON arrays, closes the ZIP entries and archives, and flushes the outer streams.
    /// The outer streams are left open. Calling this more than once is a no-op after the first call.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // The two archives are independent: a failure while finalizing one must not skip the other.
        try
        {
            await FinalizeArchiveAsync(_existingEntryStream, _existingZip, _existingOuter).ConfigureAwait(false);
        }
        finally
        {
            await FinalizeArchiveAsync(_missingEntryStream, _missingZip, _missingOuter).ConfigureAwait(false);
        }
    }

    /// <summary>Writes the row separator followed by the serialized row to one entry stream.</summary>
    private async Task WriteRowAsync<TRow>(
        Stream entryStream,
        bool isFirstRow,
        TRow row,
        CancellationToken cancellationToken
    )
    {
        var prefix = isFirstRow ? FirstRowPrefix : NextRowPrefix;
        await entryStream.WriteAsync(prefix, cancellationToken).ConfigureAwait(false);
        await JsonSerializer
            .SerializeAsync(entryStream, row, _rowJsonOptions, cancellationToken)
            .ConfigureAwait(false);
    }

    /// <summary>
    /// Closes one JSON array and its archive. The entry stream and archive are disposed even when an
    /// earlier step throws; the outer stream is flushed only after a clean finalize.
    /// </summary>
    private static async ValueTask FinalizeArchiveAsync(Stream entryStream, ZipArchive archive, Stream outerStream)
    {
        try
        {
            try
            {
                await entryStream.WriteAsync(ArrayTerminator, CancellationToken.None).ConfigureAwait(false);
            }
            finally
            {
                await entryStream.DisposeAsync().ConfigureAwait(false);
            }
        }
        finally
        {
            archive.Dispose();
        }

        await outerStream.FlushAsync(CancellationToken.None).ConfigureAwait(false);
    }

    /// <summary>Row shape for <c>existing.json</c>.</summary>
    private sealed record TargetPresenceExistingRow(
        string ChangeOperation,
        IReadOnlyDictionary<string, object?> PrimaryKey,
        IReadOnlyDictionary<string, object?> Target
    );

    /// <summary>Row shape for <c>missing.json</c>.</summary>
    private sealed record TargetPresenceMissingRow(
        string ChangeOperation,
        IReadOnlyDictionary<string, object?> PrimaryKey
    );
}