diff --git a/README.md b/README.md
index 3148c5f..42edbd0 100644
--- a/README.md
+++ b/README.md
@@ -63,7 +63,7 @@ The function is configured through Azure App Settings / Environment variables, y
## Schema tracking export
-Asynchronous export of change-tracking data (full rows for inserts/updates, primary keys for deletes) uses the same storage account as `AzureWebJobsStorage`: blob container `export`, table `exportjobs`.
+Asynchronous export of change-tracking data (full rows for inserts/updates, primary keys for deletes) uses the same storage account as `AzureWebJobsStorage`: blob container `export`, table `exportjobs`. Completed jobs also include `response/existing.zip` and `response/missing.zip`: each tracked change (I/U/D + primary key) is classified by whether that key exists on the **target** database—existing rows include full **target** column values; missing rows list only operation and primary keys. `result.json` exposes SAS URIs for those ZIPs (`existingZipSasUri`, `missingZipSasUri`) alongside the three segment ZIPs.
HTTP routes (Function authorization level):
diff --git a/src/SqlBulkSyncFunction/Constants.cs b/src/SqlBulkSyncFunction/Constants.cs
index e5268fd..1dbdd6f 100644
--- a/src/SqlBulkSyncFunction/Constants.cs
+++ b/src/SqlBulkSyncFunction/Constants.cs
@@ -71,6 +71,21 @@ public static class Queues
/// Deleted segment only: correlation id when segment processing throws.
///
public const string ExportJobDeletedError = ExportJobDeleted + "-error";
+
+ /// <summary>
+ /// Queue for the worker that writes existing.zip and missing.zip (target row present vs absent for each tracked change).
+ /// </summary>
+ public const string ExportJobTargetPresence = "exportjob-target-presence";
+
+ /// <summary>
+ /// Signaled when both target-presence ZIPs have been written for an export job.
+ /// </summary>
+ public const string ExportJobTargetPresenceDone = ExportJobTargetPresence + "-done";
+
+ /// <summary>
+ /// Error queue for the target-presence segment: carries the correlation id when processing throws.
+ /// </summary>
+ public const string ExportJobTargetPresenceError = ExportJobTargetPresence + "-error";
}
///
diff --git a/src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs b/src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs
index 8ebc2c2..873634c 100644
--- a/src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs
+++ b/src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs
@@ -19,7 +19,7 @@ SchemaTrackingExportService schemaTrackingExportService
)
{
///
- /// Fans out a new export job to the three segment queues.
+ /// Fans out a new export job to the segment queues (updated, inserted, deleted, target presence).
///
[Function(nameof(ProcessExportJobQueues) + nameof(DispatchExportJob))]
public async Task DispatchExportJob(
@@ -64,6 +64,15 @@ public Task ProcessExportDeleted(
CancellationToken cancellationToken
) => ProcessSegmentAsync(correlationId, SchemaTrackingExportSegment.Deleted, cancellationToken);
+ /// <summary>Queue-triggered worker that builds existing.zip and missing.zip (target row present vs absent for each tracked change).</summary>
+ /// <param name="correlationId">Queue message body; forwarded unchanged to ProcessSegmentAsync.</param>
+ /// <param name="cancellationToken">Cancellation token forwarded to ProcessSegmentAsync.</param>
+ [Function(nameof(ProcessExportJobQueues) + nameof(ProcessExportTargetPresence))]
+ public Task ProcessExportTargetPresence(
+ [QueueTrigger(Constants.Queues.ExportJobTargetPresence)] string correlationId,
+ CancellationToken cancellationToken
+ ) => ProcessSegmentAsync(correlationId, SchemaTrackingExportSegment.TargetPresence, cancellationToken);
+
///
/// Records completion of the updated segment and finalizes the job when all segments are done.
///
@@ -91,6 +100,15 @@ public Task OnExportDeletedDone(
CancellationToken cancellationToken
) => OnSegmentDoneAsync(correlationId, SchemaTrackingExportSegment.Deleted, cancellationToken);
+ /// <summary>Records completion of the target-presence segment and finalizes the job when all segments are done.</summary>
+ /// <param name="correlationId">Queue message body; forwarded unchanged to OnSegmentDoneAsync.</param>
+ /// <param name="cancellationToken">Cancellation token forwarded to OnSegmentDoneAsync.</param>
+ [Function(nameof(ProcessExportJobQueues) + nameof(OnExportTargetPresenceDone))]
+ public Task OnExportTargetPresenceDone(
+ [QueueTrigger(Constants.Queues.ExportJobTargetPresenceDone)] string correlationId,
+ CancellationToken cancellationToken
+ ) => OnSegmentDoneAsync(correlationId, SchemaTrackingExportSegment.TargetPresence, cancellationToken);
+
private async Task ProcessSegmentAsync(
string correlationId,
SchemaTrackingExportSegment segment,
@@ -147,6 +165,7 @@ CancellationToken cancellationToken
SchemaTrackingExportSegment.Updated => Constants.Queues.ExportJobUpdatedError,
SchemaTrackingExportSegment.Inserted => Constants.Queues.ExportJobInsertedError,
SchemaTrackingExportSegment.Deleted => Constants.Queues.ExportJobDeletedError,
+ SchemaTrackingExportSegment.TargetPresence => Constants.Queues.ExportJobTargetPresenceError,
_ => throw new ArgumentOutOfRangeException(nameof(segment), segment, null)
};
diff --git a/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs b/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs
index 0611105..212c000 100644
--- a/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs
+++ b/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs
@@ -1,4 +1,5 @@
using System;
+using System.Globalization;
using System.Linq;
using SqlBulkSyncFunction.Models.Schema;
using SqlBulkSyncFunction.Models.Schema.Export;
@@ -87,6 +88,37 @@ FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct
""";
}
+ /// <summary>
+ /// Builds a query that returns every change after @FromVersion with SYS_CHANGE_OPERATION as changeOperation (I/U/D)
+ /// and primary key columns aliased as pk_c0..pk_cN (same order as <paramref name="primaryKeyColumns"/>).</summary>
+ /// <param name="sourceTableName">Fully qualified source table name; interpolated into the SQL text as-is.</param>
+ /// <param name="primaryKeyColumns">Primary key columns in a stable order; must not be empty.</param>
+ /// <returns>Parameterized SQL using @FromVersion.</returns>
+ /// <exception cref="InvalidOperationException">Thrown when <paramref name="primaryKeyColumns"/> is empty.</exception>
+ public static string GetChangeTrackingExportChangeOperationAndPrimaryKeysSelectStatement(
+ string sourceTableName,
+ Column[] primaryKeyColumns
+ )
+ {
+ if (primaryKeyColumns.Length == 0)
+ {
+ throw new InvalidOperationException($"Missing primary key columns for table {sourceTableName}.");
+ }
+
+ var pkAliases = string.Join(
+ ",\r\n ",
+ primaryKeyColumns.Select(
+ (column, index) => string.Concat("ct.", column.QuoteName, " AS [pk_c", index.ToString(CultureInfo.InvariantCulture), "]")
+ )
+ );
+
+ return $"""
+ SELECT CAST(ct.SYS_CHANGE_OPERATION AS NVARCHAR(1)) AS [changeOperation],
+ {pkAliases}
+ FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct
+ """;
+ }
+
///
/// Builds a query for one export segment: insert/update rows join the live table for full column values;
/// delete rows return primary key columns from CHANGETABLE only.
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs
index f7791cd..1a7d555 100644
--- a/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs
@@ -68,6 +68,11 @@ public sealed class ExportJobTableEntity : ITableEntity
///
public bool DeletedDone { get; set; }
+ /// <summary>
+ /// Whether the combined target-presence segment (existing.zip and missing.zip) finished successfully.
+ /// </summary>
+ public bool TargetPresenceDone { get; set; }
+
///
/// UTC time when the job was accepted.
///
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs
index 0e78b46..08266bf 100644
--- a/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs
@@ -23,6 +23,7 @@ public static SchemaTrackingExportJobListItem ToListItem(ExportJobTableEntity en
UpdatedDone: entity.UpdatedDone,
InsertedDone: entity.InsertedDone,
DeletedDone: entity.DeletedDone,
+ TargetPresenceDone: entity.TargetPresenceDone,
Error: entity.Error,
Result: result
);
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs
index fc2a011..36b9252 100644
--- a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs
@@ -16,6 +16,7 @@ namespace SqlBulkSyncFunction.Models.Schema.Export;
/// Whether the updated ZIP leg completed.
/// Whether the inserted ZIP leg completed.
/// Whether the deleted ZIP leg completed.
+/// <param name="TargetPresenceDone">Whether the target-presence ZIP pair (existing.zip / missing.zip) completed.</param>
/// Optional error message when status is failed.
/// Populated for single-job status when response/result.json exists; otherwise .
public record SchemaTrackingExportJobListItem(
@@ -28,6 +29,7 @@ public record SchemaTrackingExportJobListItem(
bool UpdatedDone,
bool InsertedDone,
bool DeletedDone,
+ bool TargetPresenceDone,
string? Error,
SchemaTrackingExportJobListItemResult? Result = null
);
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs
index fefa0d6..160cc2e 100644
--- a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs
@@ -12,11 +12,15 @@ namespace SqlBulkSyncFunction.Models.Schema.Export;
/// Read SAS URI for updated.zip.
/// Read SAS URI for inserted.zip.
/// Read SAS URI for deleted.zip.
+/// <param name="ExistingZipSasUri">Read SAS URI for existing.zip; null when absent from stored result.</param>
+/// <param name="MissingZipSasUri">Read SAS URI for missing.zip; null when absent from stored result.</param>
public record SchemaTrackingExportJobListItemResult(
DateTimeOffset SasExpires,
Uri UpdatedZipSasUri,
Uri InsertedZipSasUri,
- Uri DeletedZipSasUri
+ Uri DeletedZipSasUri,
+ Uri? ExistingZipSasUri,
+ Uri? MissingZipSasUri
)
{
///
@@ -31,6 +35,8 @@ Uri DeletedZipSasUri
jobResult.SasExpires,
jobResult.UpdatedZipSasUri,
jobResult.InsertedZipSasUri,
- jobResult.DeletedZipSasUri
+ jobResult.DeletedZipSasUri,
+ jobResult.ExistingZipSasUri,
+ jobResult.MissingZipSasUri
);
}
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs
index ce330ca..76e783f 100644
--- a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs
@@ -21,6 +21,8 @@ namespace SqlBulkSyncFunction.Models.Schema.Export;
/// Read SAS URI for updated.zip.
/// Read SAS URI for inserted.zip.
/// Read SAS URI for deleted.zip.
+/// <param name="ExistingZipSasUri">Read SAS URI for existing.zip (changes whose PK exists on target, including target column values); null for results written before this feature.</param>
+/// <param name="MissingZipSasUri">Read SAS URI for missing.zip (changes whose PK does not exist on target); null for results written before this feature.</param>
public record SchemaTrackingExportJobResult(
string Area,
string Id,
@@ -35,5 +37,7 @@ public record SchemaTrackingExportJobResult(
DateTimeOffset SasExpires,
Uri UpdatedZipSasUri,
Uri InsertedZipSasUri,
- Uri DeletedZipSasUri
+ Uri DeletedZipSasUri,
+ Uri? ExistingZipSasUri = null,
+ Uri? MissingZipSasUri = null
);
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs
index 083b182..3181655 100644
--- a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs
@@ -20,5 +20,10 @@ public enum SchemaTrackingExportSegment
///
/// Rows with SYS_CHANGE_OPERATION = N'D' (primary key columns only).
///
- Deleted = 2
+ Deleted = 2,
+
+ /// <summary>
+ /// Single worker pass: writes existing.zip and missing.zip by comparing each change (I/U/D + PK) to target row existence (cross-connection safe).
+ /// </summary>
+ TargetPresence = 3
}
diff --git a/src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs
index 61611cf..8ae018f 100644
--- a/src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs
+++ b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Data;
+using System.Globalization;
using System.Linq;
using System.Text.Json;
using System.Threading;
@@ -16,6 +17,7 @@
using Microsoft.Extensions.Options;
using SqlBulkSyncFunction.Helpers;
using SqlBulkSyncFunction.Models.Job;
+using SqlBulkSyncFunction.Models.Schema;
using SqlBulkSyncFunction.Models.Schema.Export;
namespace SqlBulkSyncFunction.Services;
@@ -41,6 +43,15 @@ TableServiceClient tableServiceClient
WriteIndented = false
};
+ private static readonly JsonSerializerOptions TargetPresenceRowJsonOptions = new() // passed to the target-presence ZIP writer for serializing rows
+ {
+ PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
+ DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull, // null-valued properties are omitted
+ WriteIndented = false
+ };
+
+ private const int TargetPresenceBatchSize = 200; // number of change rows staged per round-trip to the target
+
private readonly BlobContainerClient _exportContainer = GetOrCreateBlobContainer(blobServiceClient, Constants.Containers.Export);
private readonly TableClient _exportJobsTable = GetOrCreateTable(tableServiceClient, Constants.Tables.ExportJobs);
@@ -119,7 +130,8 @@ CancellationToken cancellationToken
CreatedUtc = utcNow,
UpdatedDone = false,
InsertedDone = false,
- DeletedDone = false
+ DeletedDone = false,
+ TargetPresenceDone = false
};
_ = await _exportJobsTable.UpsertEntityAsync(entity, TableUpdateMode.Replace, cancellationToken).ConfigureAwait(false);
@@ -211,6 +223,7 @@ CancellationToken cancellationToken
UpdatedDone: false,
InsertedDone: false,
DeletedDone: false,
+ TargetPresenceDone: false,
Error: null,
Result: listResult
);
@@ -245,7 +258,7 @@ CancellationToken cancellationToken
}
///
- /// Dispatches a main-queue message to the three segment queues.
+ /// Dispatches a main-queue message to the segment queues (updated, inserted, deleted, target presence).
///
public async Task DispatchExportJobAsync(string correlationId, CancellationToken cancellationToken)
{
@@ -269,11 +282,13 @@ await PatchEntityAsync(
var qUpdated = GetOrCreateQueue(Constants.Queues.ExportJobUpdated);
var qInserted = GetOrCreateQueue(Constants.Queues.ExportJobInserted);
var qDeleted = GetOrCreateQueue(Constants.Queues.ExportJobDeleted);
+ var qTargetPresence = GetOrCreateQueue(Constants.Queues.ExportJobTargetPresence);
// Do not pass visibilityTimeout on enqueue: non-zero values hide the message from Dequeue/Peek until the timeout elapses (scheduled/delayed work), not a processing lease.
_ = await qUpdated.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false);
_ = await qInserted.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false);
_ = await qDeleted.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false);
+ _ = await qTargetPresence.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false);
}
///
@@ -336,31 +351,46 @@ CancellationToken cancellationToken
var targetVersion = targetConn.GetTargetVersion(table.Target);
var fromVersion = targetVersion.CurrentVersion < 0 ? 0L : targetVersion.CurrentVersion;
- var sql = SqlStatementExtensions.GetChangeTrackingExportSegmentSelectStatement(table.Source, columns, segment);
-
- await using var cmd = new SqlCommand(sql, sourceConn)
+ if (segment == SchemaTrackingExportSegment.TargetPresence)
{
- CommandTimeout = 3600
- };
- _ = cmd.Parameters.Add(new SqlParameter("@FromVersion", SqlDbType.BigInt) { Value = fromVersion });
+ await ProcessTargetPresenceSegmentAsync(
+ normalized,
+ table,
+ columns,
+ sourceConn,
+ targetConn,
+ fromVersion,
+ cancellationToken
+ ).ConfigureAwait(false);
+ }
+ else
+ {
+ var sql = SqlStatementExtensions.GetChangeTrackingExportSegmentSelectStatement(table.Source, columns, segment);
- await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken)
- .ConfigureAwait(false);
+ await using var cmd = new SqlCommand(sql, sourceConn)
+ {
+ CommandTimeout = 3600
+ };
+ _ = cmd.Parameters.Add(new SqlParameter("@FromVersion", SqlDbType.BigInt) { Value = fromVersion });
- var (zipPath, jsonPath) = GetZipRelativePath(segment);
- var zipBlob = _exportContainer.GetBlobClient($"{normalized}/{zipPath}");
- await using var uploadStream = await zipBlob.OpenWriteAsync(
- true,
- new BlobOpenWriteOptions
- {
- HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Zip }
- },
- cancellationToken
- )
- .ConfigureAwait(false);
+ await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken)
+ .ConfigureAwait(false);
- await SchemaTrackingExportStreamingZip.WriteReaderToZipAsync(reader, uploadStream, jsonPath, cancellationToken)
- .ConfigureAwait(false);
+ var (zipPath, jsonPath) = GetZipRelativePath(segment);
+ var zipBlob = _exportContainer.GetBlobClient($"{normalized}/{zipPath}");
+ await using var uploadStream = await zipBlob.OpenWriteAsync(
+ true,
+ new BlobOpenWriteOptions
+ {
+ HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Zip }
+ },
+ cancellationToken
+ )
+ .ConfigureAwait(false);
+
+ await SchemaTrackingExportStreamingZip.WriteReaderToZipAsync(reader, uploadStream, jsonPath, cancellationToken)
+ .ConfigureAwait(false);
+ }
var doneQueueName = GetDoneQueueName(segment);
var doneQueue = GetOrCreateQueue(doneQueueName);
@@ -454,7 +484,7 @@ private async Task TryFinalizeIfCompleteAsync(
CancellationToken cancellationToken
)
{
- if (!entity.UpdatedDone || !entity.InsertedDone || !entity.DeletedDone)
+ if (!entity.UpdatedDone || !entity.InsertedDone || !entity.DeletedDone || !entity.TargetPresenceDone)
{
return;
}
@@ -470,10 +500,14 @@ CancellationToken cancellationToken
var updatedZip = _exportContainer.GetBlobClient($"{correlationId}/response/updated.zip");
var insertedZip = _exportContainer.GetBlobClient($"{correlationId}/response/inserted.zip");
var deletedZip = _exportContainer.GetBlobClient($"{correlationId}/response/deleted.zip");
+ var existingZip = _exportContainer.GetBlobClient($"{correlationId}/response/existing.zip");
+ var missingZip = _exportContainer.GetBlobClient($"{correlationId}/response/missing.zip");
if (!await updatedZip.ExistsAsync(cancellationToken).ConfigureAwait(false) ||
!await insertedZip.ExistsAsync(cancellationToken).ConfigureAwait(false) ||
- !await deletedZip.ExistsAsync(cancellationToken).ConfigureAwait(false))
+ !await deletedZip.ExistsAsync(cancellationToken).ConfigureAwait(false) ||
+ !await existingZip.ExistsAsync(cancellationToken).ConfigureAwait(false) ||
+ !await missingZip.ExistsAsync(cancellationToken).ConfigureAwait(false))
{
logger.LogWarning("Finalize: missing one or more ZIP blobs for {CorrelationId}", correlationId);
return;
@@ -483,6 +517,8 @@ CancellationToken cancellationToken
var updatedUri = GenerateReadSasUri(updatedZip, sasExpires);
var insertedUri = GenerateReadSasUri(insertedZip, sasExpires);
var deletedUri = GenerateReadSasUri(deletedZip, sasExpires);
+ var existingUri = GenerateReadSasUri(existingZip, sasExpires);
+ var missingUri = GenerateReadSasUri(missingZip, sasExpires);
var completed = DateTimeOffset.UtcNow;
var result = new SchemaTrackingExportJobResult(
@@ -499,7 +535,9 @@ CancellationToken cancellationToken
SasExpires: sasExpires,
UpdatedZipSasUri: updatedUri,
InsertedZipSasUri: insertedUri,
- DeletedZipSasUri: deletedUri
+ DeletedZipSasUri: deletedUri,
+ ExistingZipSasUri: existingUri,
+ MissingZipSasUri: missingUri
);
try
@@ -551,6 +589,7 @@ private async Task MarkTableCompletedAsync(SchemaTrackingExportJob job, Cancella
entity.UpdatedDone = true;
entity.InsertedDone = true;
entity.DeletedDone = true;
+ entity.TargetPresenceDone = true;
try
{
@@ -657,6 +696,7 @@ private static bool IsLegDone(ExportJobTableEntity entity, SchemaTrackingExportS
SchemaTrackingExportSegment.Updated => entity.UpdatedDone,
SchemaTrackingExportSegment.Inserted => entity.InsertedDone,
SchemaTrackingExportSegment.Deleted => entity.DeletedDone,
+ SchemaTrackingExportSegment.TargetPresence => entity.TargetPresenceDone,
_ => false
};
@@ -673,6 +713,9 @@ private static void SetLegDone(ExportJobTableEntity entity, SchemaTrackingExport
case SchemaTrackingExportSegment.Deleted:
entity.DeletedDone = value;
break;
+ case SchemaTrackingExportSegment.TargetPresence:
+ entity.TargetPresenceDone = value;
+ break;
default:
throw new ArgumentOutOfRangeException(nameof(segment));
}
@@ -684,6 +727,7 @@ private static string GetDoneQueueName(SchemaTrackingExportSegment segment)
SchemaTrackingExportSegment.Updated => Constants.Queues.ExportJobUpdatedDone,
SchemaTrackingExportSegment.Inserted => Constants.Queues.ExportJobInsertedDone,
SchemaTrackingExportSegment.Deleted => Constants.Queues.ExportJobDeletedDone,
+ SchemaTrackingExportSegment.TargetPresence => Constants.Queues.ExportJobTargetPresenceDone,
_ => throw new ArgumentOutOfRangeException(nameof(segment))
};
@@ -698,9 +742,398 @@ private static (string ZipPath, string JsonPath) GetZipRelativePath(SchemaTracki
SchemaTrackingExportSegment.Updated => ("response/updated.zip", "updated.json"),
SchemaTrackingExportSegment.Inserted => ("response/inserted.zip", "inserted.json"),
SchemaTrackingExportSegment.Deleted => ("response/deleted.zip", "deleted.json"),
+ SchemaTrackingExportSegment.TargetPresence => throw new InvalidOperationException("Target presence uses paired ZIP writers, not a single segment path."),
_ => throw new ArgumentOutOfRangeException(nameof(segment))
};
+ /// <summary>Streams all change rows (I/U/D + PK) from the source, resolves target rows in batches on the target connection, and writes existing.zip / missing.zip.</summary>
+ /// <param name="normalized">Blob path prefix of the job; the ZIPs are written under <c>{normalized}/response/</c>.</param>
+ /// <param name="fromVersion">Value bound to <c>@FromVersion</c> in the change-tracking query.</param>
+ private async Task ProcessTargetPresenceSegmentAsync(
+ string normalized,
+ SyncJobTable table,
+ Column[] sourceColumns,
+ SqlConnection sourceConn,
+ SqlConnection targetConn,
+ long fromVersion,
+ CancellationToken cancellationToken
+ )
+ {
+ var pkColumns = sourceColumns.Where(static c => c.IsPrimary).ToArray(); // only primary key columns are selected from the change table
+ if (pkColumns.Length == 0)
+ {
+ throw new InvalidOperationException("Target presence export requires at least one primary key column.");
+ }
+
+ var targetColumns = targetConn.GetColumns(table.Target);
+ var sql = SqlStatementExtensions.GetChangeTrackingExportChangeOperationAndPrimaryKeysSelectStatement(
+ table.Source,
+ pkColumns
+ );
+
+ await using var ctCmd = new SqlCommand(sql, sourceConn) { CommandTimeout = 3600 };
+ _ = ctCmd.Parameters.Add(new SqlParameter("@FromVersion", SqlDbType.BigInt) { Value = fromVersion });
+
+ await using var ctReader = await ctCmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken) // sequential access: columns are read in ordinal order below
+ .ConfigureAwait(false);
+
+ var existingBlob = _exportContainer.GetBlobClient($"{normalized}/response/existing.zip");
+ var missingBlob = _exportContainer.GetBlobClient($"{normalized}/response/missing.zip");
+ await using var existingUpload = await existingBlob.OpenWriteAsync(
+ true,
+ new BlobOpenWriteOptions { HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Zip } },
+ cancellationToken
+ )
+ .ConfigureAwait(false);
+ await using var missingUpload = await missingBlob.OpenWriteAsync(
+ true,
+ new BlobOpenWriteOptions { HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Zip } },
+ cancellationToken
+ )
+ .ConfigureAwait(false);
+
+ await using var zipWriter = new SchemaTrackingExportTargetPresenceZipWriter(
+ existingUpload,
+ missingUpload,
+ TargetPresenceRowJsonOptions
+ );
+
+ var changeOpOrdinal = ctReader.GetOrdinal("changeOperation");
+ var pkOrdinals = new int[pkColumns.Length];
+ for (var i = 0; i < pkColumns.Length; i++)
+ {
+ pkOrdinals[i] = ctReader.GetOrdinal(FormattableString.Invariant($"pk_c{i}")); // aliases produced by the select statement builder
+ }
+
+ var batch = new List<(string Op, Dictionary Pk)>(TargetPresenceBatchSize); // changes waiting for the next round-trip to the target
+
+ async Task FlushBatchAsync()
+ {
+ if (batch.Count == 0)
+ {
+ return;
+ }
+
+ await WriteTargetPresenceBatchFromJoinedQueryAsync(
+ targetConn,
+ table.Target,
+ targetColumns,
+ pkColumns,
+ batch,
+ zipWriter,
+ cancellationToken
+ ).ConfigureAwait(false);
+
+ batch.Clear(); // the same list instance is reused for the next batch
+ }
+
+ while (await ctReader.ReadAsync(cancellationToken).ConfigureAwait(false))
+ {
+ var op = ctReader.GetString(changeOpOrdinal);
+ if (op.Length != 1)
+ {
+ op = op.Trim(); // tolerate padded operation codes
+ }
+
+ var pk = new Dictionary(StringComparer.OrdinalIgnoreCase);
+ for (var i = 0; i < pkColumns.Length; i++)
+ {
+ var ord = pkOrdinals[i];
+ pk[pkColumns[i].Name] = ctReader.IsDBNull(ord) ? null : ctReader.GetValue(ord);
+ }
+
+ batch.Add((op, pk));
+ if (batch.Count >= TargetPresenceBatchSize)
+ {
+ await FlushBatchAsync().ConfigureAwait(false);
+ }
+ }
+
+ await FlushBatchAsync().ConfigureAwait(false); // flush the final, possibly partial, batch
+ }
+
+ /// <summary>
+ /// Stages changeOperation and PKs in a session temp table, then runs one batch returning two result sets:
+ /// existing rows (INNER JOIN target) and missing rows (LEFT OUTER JOIN with WHERE t.[firstPk] IS NULL).
+ /// The temp table is dropped in a finally block; the drop ignores <paramref name="cancellationToken"/> so cleanup still runs after cancellation.</summary>
+ private static async Task WriteTargetPresenceBatchFromJoinedQueryAsync(
+ SqlConnection targetConn,
+ string targetTableName,
+ Column[] targetColumns,
+ Column[] pkColumns,
+ IReadOnlyList<(string Op, Dictionary Pk)> batch,
+ SchemaTrackingExportTargetPresenceZipWriter zipWriter,
+ CancellationToken cancellationToken
+ )
+ {
+ if (batch.Count == 0)
+ {
+ return;
+ }
+
+ const string tempTableName = "#ExportPkBatch";
+
+ var joinClause = string.Join(
+ " AND ",
+ pkColumns.Select(column => string.Concat("t.", column.QuoteName, " = p.", column.QuoteName))
+ );
+
+ var firstPkOnTarget = pkColumns[0]; // a NULL in this column after the LEFT JOIN marks a change without a target row
+
+ var createPkCols = string.Join(
+ ",\n ",
+ pkColumns.Select(
+ pkCol =>
+ {
+ var tc = ResolvePkColumnForTarget(pkCol, targetColumns);
+ var nullability = tc.IsNullable ? "NULL" : "NOT NULL";
+ return string.Concat(tc.QuoteName, " ", tc.Type, " ", nullability);
+ }
+ )
+ );
+
+ var createSql = FormattableString.Invariant(
+ $"""
+ CREATE TABLE {tempTableName} (
+ [changeOperation] NCHAR(1) NOT NULL,
+ {createPkCols}
+ );
+ """);
+
+ var targetSelectList = string.Join(
+ ",\n ",
+ targetColumns.Select(column => string.Concat("t.", column.QuoteName, " AS ", column.QuoteName))
+ );
+
+ var existingSql = FormattableString.Invariant(
+ $"""
+ SELECT p.[changeOperation],
+ {targetSelectList}
+ FROM {tempTableName} AS p
+ INNER JOIN {targetTableName} AS t ON {joinClause}
+ """);
+
+ var pkSelectList = string.Join(
+ ",\n ",
+ pkColumns.Select(column => string.Concat("p.", column.QuoteName, " AS ", column.QuoteName))
+ );
+
+ var missingSql = FormattableString.Invariant(
+ $"""
+ SELECT p.[changeOperation],
+ {pkSelectList}
+ FROM {tempTableName} AS p
+ LEFT OUTER JOIN {targetTableName} AS t ON {joinClause}
+ WHERE t.{firstPkOnTarget.QuoteName} IS NULL
+ """);
+
+ var queryBatch = FormattableString.Invariant(
+ $"""
+ {existingSql}
+ ;
+ {missingSql}
+ """);
+
+ try
+ {
+ await using (var createCmd = new SqlCommand(createSql, targetConn) { CommandTimeout = 3600 })
+ {
+ _ = await createCmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
+ }
+
+ using var dataTable = BuildTargetPresenceStagingDataTable(pkColumns, targetColumns, batch);
+ using var bulk = new SqlBulkCopy(targetConn, SqlBulkCopyOptions.Default, externalTransaction: null)
+ {
+ DestinationTableName = tempTableName,
+ BulkCopyTimeout = 3600
+ };
+ foreach (DataColumn col in dataTable.Columns)
+ {
+ bulk.ColumnMappings.Add(col.ColumnName, col.ColumnName);
+ }
+
+ await bulk.WriteToServerAsync(dataTable, cancellationToken).ConfigureAwait(false);
+
+ await using var readCmd = new SqlCommand(queryBatch, targetConn) { CommandTimeout = 3600 };
+ await using var reader = await readCmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken)
+ .ConfigureAwait(false);
+
+ var changeOpOrdinal = reader.GetOrdinal("changeOperation");
+ var targetOrdinals = new Dictionary(StringComparer.OrdinalIgnoreCase);
+ foreach (var col in targetColumns)
+ {
+ targetOrdinals[col.Name] = reader.GetOrdinal(col.Name);
+ }
+
+ while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) // first result set: changes whose key exists on the target
+ {
+ var op = reader.GetString(changeOpOrdinal);
+ if (op.Length != 1)
+ {
+ op = op.Trim();
+ }
+
+ var targetRow = new Dictionary(StringComparer.OrdinalIgnoreCase);
+ foreach (var col in targetColumns)
+ {
+ var ord = targetOrdinals[col.Name];
+ targetRow[col.Name] = reader.IsDBNull(ord) ? null : reader.GetValue(ord);
+ }
+
+ var pkRow = new Dictionary(StringComparer.OrdinalIgnoreCase);
+ foreach (var col in pkColumns)
+ {
+ _ = targetRow.TryGetValue(col.Name, out var v); // PK values are taken from the target row just read
+ pkRow[col.Name] = v;
+ }
+
+ await zipWriter.WriteExistingRowAsync(op, pkRow, targetRow, cancellationToken).ConfigureAwait(false);
+ }
+
+ if (!await reader.NextResultAsync(cancellationToken).ConfigureAwait(false)) // advance to the second result set (missing rows)
+ {
+ throw new InvalidOperationException("Expected second result set (missing rows) from target-presence batch query.");
+ }
+
+ var missingChangeOpOrdinal = reader.GetOrdinal("changeOperation");
+ var missingPkOrdinals = new int[pkColumns.Length];
+ for (var i = 0; i < pkColumns.Length; i++)
+ {
+ missingPkOrdinals[i] = reader.GetOrdinal(pkColumns[i].Name);
+ }
+
+ while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
+ {
+ var op = reader.GetString(missingChangeOpOrdinal);
+ if (op.Length != 1)
+ {
+ op = op.Trim();
+ }
+
+ var pkRow = new Dictionary(StringComparer.OrdinalIgnoreCase);
+ for (var i = 0; i < pkColumns.Length; i++)
+ {
+ var ord = missingPkOrdinals[i];
+ pkRow[pkColumns[i].Name] = reader.IsDBNull(ord) ? null : reader.GetValue(ord);
+ }
+
+ await zipWriter.WriteMissingRowAsync(op, pkRow, cancellationToken).ConfigureAwait(false);
+ }
+ }
+ finally
+ {
+ try
+ {
+ await using var dropCmd = new SqlCommand(
+ FormattableString.Invariant($"DROP TABLE IF EXISTS {tempTableName};"),
+ targetConn
+ )
+ {
+ CommandTimeout = 60
+ };
+ _ = await dropCmd.ExecuteNonQueryAsync(CancellationToken.None).ConfigureAwait(false); // cleanup must not be skipped (or throw) because the caller's token is already cancelled
+ }
+ catch (Exception ex) when (ex is SqlException or InvalidOperationException) // a closed/broken connection must not mask the original failure
+ {
+ // Best-effort cleanup; connection may already be aborting.
+ }
+ }
+ }
+
+ /// <summary>Returns the first target column whose name equals the PK column name (ordinal, ignoring case); falls back to <paramref name="pkColumn"/> when none matches.</summary>
+ private static Column ResolvePkColumnForTarget(Column pkColumn, Column[] targetColumns)
+ => Array.Find(targetColumns, candidate => string.Equals(candidate.Name, pkColumn.Name, StringComparison.OrdinalIgnoreCase))
+ ?? pkColumn;
+
+ /// <summary>Builds the in-memory staging table (changeOperation plus one column per primary key) that is bulk-copied into the session temp table.</summary>
+ private static DataTable BuildTargetPresenceStagingDataTable(
+ Column[] pkColumns,
+ Column[] targetColumns,
+ IReadOnlyList<(string Op, Dictionary Pk)> batch
+ )
+ {
+ var dt = new DataTable();
+ _ = dt.Columns.Add("changeOperation", typeof(string));
+
+ foreach (var pkCol in pkColumns)
+ {
+ var tc = ResolvePkColumnForTarget(pkCol, targetColumns);
+ var clrType = GetClrTypeForSqlType(tc.Type);
+ var dc = dt.Columns.Add(tc.Name, clrType); // use the target-side name: the temp table is created from it and SqlBulkCopy matches destination names case-sensitively
+ dc.AllowDBNull = tc.IsNullable;
+ }
+
+ foreach (var (op, pkRow) in batch)
+ {
+ var dr = dt.NewRow();
+ dr["changeOperation"] = string.IsNullOrWhiteSpace(op) ? "?" : op.Trim()[..1]; // first character of the operation code; "?" when blank
+ foreach (var pkCol in pkColumns)
+ {
+ _ = pkRow.TryGetValue(pkCol.Name, out var raw);
+ dr[pkCol.Name] = NormalizeValueForDataColumn(raw, dt.Columns[pkCol.Name]!.DataType); // DataTable resolves column names case-insensitively
+ }
+
+ dt.Rows.Add(dr);
+ }
+
+ return dt;
+ }
+
+ /// <summary>
+ /// Coerces a value into the CLR type declared for its staging <see cref="DataColumn"/>.
+ /// </summary>
+ /// <param name="raw">Value to store; may be null or <see cref="DBNull"/>.</param>
+ /// <param name="columnClrType">CLR type of the destination column.</param>
+ /// <returns>
+ /// <see cref="DBNull.Value"/> for null input, the value itself when it already is an instance of the column type,
+ /// otherwise the invariant-culture conversion; when that conversion fails the unconverted value is returned.
+ /// </returns>
+ private static object NormalizeValueForDataColumn(object? raw, Type columnClrType)
+ {
+ if (raw is null or DBNull)
+ {
+ return DBNull.Value;
+ }
+
+ if (columnClrType.IsInstanceOfType(raw))
+ {
+ return raw;
+ }
+
+ try
+ {
+ return Convert.ChangeType(raw, columnClrType, CultureInfo.InvariantCulture);
+ }
+ catch (Exception ex) when (ex is InvalidCastException or FormatException or OverflowException)
+ {
+ return raw; // conversion failed: hand the original value back unchanged
+ }
+ }
+
+ /// <summary>Maps a SQL type declaration (e.g. "varbinary(16)") to the CLR type of the staging DataTable column; unrecognized types map to string.</summary>
+ private static Type GetClrTypeForSqlType(string sqlType)
+ {
+ var paren = sqlType.IndexOf('(', StringComparison.Ordinal);
+ var baseName = (paren >= 0 ? sqlType[..paren] : sqlType).Trim().ToLowerInvariant(); // drop any "(length)" / "(precision, scale)" suffix
+ return baseName switch
+ {
+ "bigint" => typeof(long),
+ "int" => typeof(int),
+ "smallint" => typeof(short),
+ "tinyint" => typeof(byte),
+ "bit" => typeof(bool),
+ "uniqueidentifier" => typeof(Guid),
+ "float" => typeof(double),
+ "real" => typeof(float),
+ "decimal" or "numeric" or "money" or "smallmoney" => typeof(decimal),
+ "date" or "datetime" or "datetime2" or "smalldatetime" => typeof(DateTime),
+ "datetimeoffset" => typeof(DateTimeOffset),
+ "time" => typeof(TimeSpan),
+ "binary" or "varbinary" => typeof(byte[]), // a string column would store a byte[] key as the text "System.Byte[]"
+ "sql_variant" => typeof(object),
+ _ => typeof(string)
+ };
+ }
+
private async Task ReadJobBlobAsync(string correlationId, CancellationToken cancellationToken)
{
var blob = _exportContainer.GetBlobClient($"{correlationId}/job.json");
diff --git a/src/SqlBulkSyncFunction/Services/SchemaTrackingExportTargetPresenceZipWriter.cs b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportTargetPresenceZipWriter.cs
new file mode 100644
index 0000000..f74825c
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportTargetPresenceZipWriter.cs
@@ -0,0 +1,137 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.IO.Compression;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace SqlBulkSyncFunction.Services;
+
+#nullable enable
+
+/// <summary>
+/// Opens two ZIP archives and streams UTF-8 JSON arrays for export rows that exist vs do not exist on the sync target.
+/// Each archive holds a single entry (<c>existing.json</c> / <c>missing.json</c>) whose array is opened in the
+/// constructor and closed on disposal. Disposing finalizes ZIP entries and archives and flushes the outer streams,
+/// but leaves the outer streams open for the caller to dispose.
+/// </summary>
+internal sealed class SchemaTrackingExportTargetPresenceZipWriter : IAsyncDisposable
+{
+    private readonly Stream _existingOuter;
+    private readonly Stream _missingOuter;
+    private readonly ZipArchive _existingZip;
+    private readonly ZipArchive _missingZip;
+    private readonly Stream _existingEntryStream;
+    private readonly Stream _missingEntryStream;
+    private readonly JsonSerializerOptions _rowJsonOptions;
+
+    // True until the first element is written to the respective array; decides whether a comma is emitted.
+    private bool _existingFirst = true;
+    private bool _missingFirst = true;
+
+    // Set by the first DisposeAsync call so that later calls are no-ops and late writes are rejected.
+    private bool _disposed;
+
+    /// <summary>
+    /// Initializes a writer over two destination streams, creating one JSON-array entry in each archive
+    /// (<c>existing.json</c> / <c>missing.json</c>) and writing the opening bracket of both arrays.
+    /// </summary>
+    /// <param name="existingZipDestination">Writable stream for existing.zip.</param>
+    /// <param name="missingZipDestination">Writable stream for missing.zip.</param>
+    /// <param name="rowJsonOptions">Serializer options applied to each row object.</param>
+    /// <exception cref="ArgumentNullException">Any argument is <see langword="null"/>.</exception>
+    public SchemaTrackingExportTargetPresenceZipWriter(
+        Stream existingZipDestination,
+        Stream missingZipDestination,
+        JsonSerializerOptions rowJsonOptions
+    )
+    {
+        ArgumentNullException.ThrowIfNull(existingZipDestination);
+        ArgumentNullException.ThrowIfNull(missingZipDestination);
+        ArgumentNullException.ThrowIfNull(rowJsonOptions);
+
+        _rowJsonOptions = rowJsonOptions;
+        _existingOuter = existingZipDestination;
+        _missingOuter = missingZipDestination;
+
+        // leaveOpen: the caller keeps ownership of the destination streams.
+        _existingZip = new ZipArchive(_existingOuter, ZipArchiveMode.Create, leaveOpen: true);
+        _missingZip = new ZipArchive(_missingOuter, ZipArchiveMode.Create, leaveOpen: true);
+        _existingEntryStream = _existingZip.CreateEntry("existing.json", CompressionLevel.Optimal).Open();
+        _missingEntryStream = _missingZip.CreateEntry("missing.json", CompressionLevel.Optimal).Open();
+        _existingEntryStream.WriteByte((byte)'[');
+        _missingEntryStream.WriteByte((byte)'[');
+    }
+
+    /// <summary>
+    /// Appends one object to the existing-target array: change operation, primary key map, and full target column map.
+    /// </summary>
+    /// <param name="changeOperation">Change operation of the tracked row; must not be null or whitespace.</param>
+    /// <param name="primaryKey">Primary key map of the tracked row.</param>
+    /// <param name="targetColumns">Target column map of the tracked row.</param>
+    /// <param name="cancellationToken">Token passed to the JSON serializer.</param>
+    /// <exception cref="ArgumentException"><paramref name="changeOperation"/> is null, empty or whitespace.</exception>
+    /// <exception cref="ArgumentNullException">A map argument is <see langword="null"/>.</exception>
+    /// <exception cref="ObjectDisposedException">The writer has already been disposed.</exception>
+    public async Task WriteExistingRowAsync(
+        string changeOperation,
+        IReadOnlyDictionary primaryKey,
+        IReadOnlyDictionary targetColumns,
+        CancellationToken cancellationToken
+    )
+    {
+        ArgumentException.ThrowIfNullOrWhiteSpace(changeOperation);
+        ArgumentNullException.ThrowIfNull(primaryKey);
+        ArgumentNullException.ThrowIfNull(targetColumns);
+        ObjectDisposedException.ThrowIf(_disposed, this);
+
+        var isFirstElement = _existingFirst;
+        _existingFirst = false;
+        WriteElementSeparator(_existingEntryStream, isFirstElement);
+
+        var row = new TargetPresenceExistingRow(changeOperation, primaryKey, targetColumns);
+        await JsonSerializer
+            .SerializeAsync(_existingEntryStream, row, _rowJsonOptions, cancellationToken)
+            .ConfigureAwait(false);
+    }
+
+    /// <summary>
+    /// Appends one object to the missing-on-target array: change operation and primary key map only.
+    /// </summary>
+    /// <param name="changeOperation">Change operation of the tracked row; must not be null or whitespace.</param>
+    /// <param name="primaryKey">Primary key map of the tracked row.</param>
+    /// <param name="cancellationToken">Token passed to the JSON serializer.</param>
+    /// <exception cref="ArgumentException"><paramref name="changeOperation"/> is null, empty or whitespace.</exception>
+    /// <exception cref="ArgumentNullException"><paramref name="primaryKey"/> is <see langword="null"/>.</exception>
+    /// <exception cref="ObjectDisposedException">The writer has already been disposed.</exception>
+    public async Task WriteMissingRowAsync(
+        string changeOperation,
+        IReadOnlyDictionary primaryKey,
+        CancellationToken cancellationToken
+    )
+    {
+        ArgumentException.ThrowIfNullOrWhiteSpace(changeOperation);
+        ArgumentNullException.ThrowIfNull(primaryKey);
+        ObjectDisposedException.ThrowIf(_disposed, this);
+
+        var isFirstElement = _missingFirst;
+        _missingFirst = false;
+        WriteElementSeparator(_missingEntryStream, isFirstElement);
+
+        var row = new TargetPresenceMissingRow(changeOperation, primaryKey);
+        await JsonSerializer
+            .SerializeAsync(_missingEntryStream, row, _rowJsonOptions, cancellationToken)
+            .ConfigureAwait(false);
+    }
+
+    /// <summary>
+    /// Closes both JSON arrays, finalizes both ZIP archives and flushes the destination streams.
+    /// Calling this more than once is safe; only the first call does any work.
+    /// </summary>
+    public async ValueTask DisposeAsync()
+    {
+        if (_disposed)
+        {
+            return;
+        }
+
+        _disposed = true;
+
+        // The archives are independent: a failure on the first must not prevent finalizing the second.
+        try
+        {
+            await CloseArchiveAsync(_existingEntryStream, _existingZip, _existingOuter).ConfigureAwait(false);
+        }
+        finally
+        {
+            await CloseArchiveAsync(_missingEntryStream, _missingZip, _missingOuter).ConfigureAwait(false);
+        }
+    }
+
+    /// <summary>
+    /// Writes the text that precedes an array element: a line break for the first element,
+    /// a comma plus line break for every later one.
+    /// </summary>
+    private static void WriteElementSeparator(Stream entryStream, bool isFirstElement)
+    {
+        if (isFirstElement)
+        {
+            entryStream.Write("\n "u8);
+        }
+        else
+        {
+            entryStream.Write(",\n "u8);
+        }
+    }
+
+    /// <summary>
+    /// Terminates the JSON array of one archive, closes its entry and the archive, then flushes the destination stream.
+    /// </summary>
+    private static async ValueTask CloseArchiveAsync(Stream entryStream, ZipArchive archive, Stream outer)
+    {
+        try
+        {
+            entryStream.Write("\n]"u8);
+            await entryStream.DisposeAsync().ConfigureAwait(false);
+        }
+        finally
+        {
+            // Dispose the archive even when closing the entry failed.
+            archive.Dispose();
+        }
+
+        await outer.FlushAsync(CancellationToken.None).ConfigureAwait(false);
+    }
+
+    /// <summary>Element shape of the existing-target array.</summary>
+    private sealed record TargetPresenceExistingRow(
+        string ChangeOperation,
+        IReadOnlyDictionary PrimaryKey,
+        IReadOnlyDictionary Target
+    );
+
+    /// <summary>Element shape of the missing-on-target array.</summary>
+    private sealed record TargetPresenceMissingRow(
+        string ChangeOperation,
+        IReadOnlyDictionary PrimaryKey
+    );
+}