diff --git a/README.md b/README.md
index da32523..3148c5f 100644
--- a/README.md
+++ b/README.md
@@ -61,6 +61,15 @@ The function is configured through Azure App Settings / Environment variables, y
>
> `Custom` schedule is default if no scheduled is specified in sync job configuration.
+## Schema tracking export
+
+Asynchronous export of change-tracking data (full rows for inserts/updates, primary keys for deletes) uses the same storage account as `AzureWebJobsStorage`: blob container `export`, table `exportjobs`.
+
+HTTP routes (Function authorization level):
+
+- `POST /api/config/{area}/{id}/schema/tracking/{tableId}/export` — JSON body `{ "author", "referenceId", "purpose" }`; returns `202 Accepted` with `Location` pointing at status.
+- `GET /api/config/{area}/{id}/schema/tracking/{tableId}/export/status/{*correlationId}` — same item shape as list; includes optional `result` when `response/result.json` exists.
+- `GET /api/config/{area}/{id}/schema/tracking/{tableId}/export/status` — list jobs for that table partition.
## Development resources
diff --git a/global.json b/global.json
index 5406078..380e804 100644
--- a/global.json
+++ b/global.json
@@ -1,6 +1,6 @@
{
"sdk": {
- "version": "10.0.201",
+ "version": "10.0.203",
"rollForward": "latestFeature",
"allowPrerelease": false
}
diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index 18b7254..7e31499 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -9,16 +9,17 @@
-
-
-
-
+
+
+
+
-
+
-
+
+
-
+
\ No newline at end of file
diff --git a/src/SqlBulkSyncFunction/Constants.cs b/src/SqlBulkSyncFunction/Constants.cs
index 860d692..e5268fd 100644
--- a/src/SqlBulkSyncFunction/Constants.cs
+++ b/src/SqlBulkSyncFunction/Constants.cs
@@ -21,7 +21,58 @@ public static class Queues
/// Sync progress queue name.
///
public const string SyncJobProgressQueue = "syncjobprogress";
+
+ ///
+ /// Main queue for schema tracking data export jobs (message body: full correlation id path).
+ ///
+ public const string ExportJob = "exportjob";
+
+ ///
+ /// Queue for export workers that build the updated-rows ZIP from change tracking.
+ ///
+ public const string ExportJobUpdated = "exportjob-updated";
+
+ ///
+ /// Queue for export workers that build the inserted-rows ZIP from change tracking.
+ ///
+ public const string ExportJobInserted = "exportjob-inserted";
+
+ ///
+ /// Queue for export workers that build the deleted-rows ZIP from change tracking.
+ ///
+ public const string ExportJobDeleted = "exportjob-deleted";
+
+ ///
+ /// Signaled when the updated ZIP has been written for an export job.
+ ///
+ public const string ExportJobUpdatedDone = ExportJobUpdated + "-done";
+
+ ///
+ /// Signaled when the inserted ZIP has been written for an export job.
+ ///
+ public const string ExportJobInsertedDone = ExportJobInserted + "-done";
+
+ ///
+ /// Signaled when the deleted ZIP has been written for an export job.
+ ///
+ public const string ExportJobDeletedDone = ExportJobDeleted + "-done";
+
+ ///
+ /// Updated segment only: message body is the correlation id when ProcessExportSegmentAsync throws during processing (SQL/ZIP/blob). Invalid queue bodies are rejected at the trigger with ArgumentException.ThrowIfNullOrEmpty.
+ ///
+ public const string ExportJobUpdatedError = ExportJobUpdated + "-error";
+
+ ///
+ /// Inserted segment only: correlation id when segment processing throws.
+ ///
+ public const string ExportJobInsertedError = ExportJobInserted + "-error";
+
+ ///
+ /// Deleted segment only: correlation id when segment processing throws.
+ ///
+ public const string ExportJobDeletedError = ExportJobDeleted + "-error";
}
+
///
/// NCRONTAB expressions and configuration keys for timer triggers.
///
@@ -63,6 +114,22 @@ public static class Containers
/// Blob container for per-job monitoring aggregates (written by the aggregation timer).
///
public const string Monitor = "monitor";
+
+ ///
+ /// Blob container for schema tracking export requests, jobs, response ZIPs, and result metadata.
+ ///
+ public const string Export = "export";
+ }
+
+ ///
+ /// Azure Table Storage table names used by the function app.
+ ///
+ public static class Tables
+ {
+ ///
+ /// Table storing export job state (partition: area_id_tableId, row: export job guid).
+ ///
+ public const string ExportJobs = "exportjobs";
}
///
@@ -72,5 +139,8 @@ public static class BlobContentTypes
{
/// JSON documents (UTF-8).
public const string Json = "application/json; charset=utf-8";
+
+ /// ZIP archives.
+ public const string Zip = "application/zip";
}
}
diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.SchemaTrackingExport.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.SchemaTrackingExport.cs
new file mode 100644
index 0000000..8ddc65f
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.SchemaTrackingExport.cs
@@ -0,0 +1,161 @@
+using System;
+using System.Globalization;
+using System.Linq;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.AspNetCore.Http;
+using Microsoft.AspNetCore.Mvc;
+using Microsoft.Azure.Functions.Worker;
+using SqlBulkSyncFunction.Models.Schema.Export;
+
+namespace SqlBulkSyncFunction.Functions;
+
+#nullable enable
+
+public partial class GetSyncJobConfig
+{
+ private static readonly JsonSerializerOptions ExportJsonOptions = new()
+ {
+ PropertyNameCaseInsensitive = true,
+ PropertyNamingPolicy = JsonNamingPolicy.CamelCase
+ };
+
+ ///
+ /// Accepts a schema tracking data export request, persists metadata to blob and table storage, and enqueues processing.
+ ///
+ [Function(nameof(GetSyncJobConfig) + nameof(PostSchemaTrackingExport))]
+ public async Task PostSchemaTrackingExport(
+ [HttpTrigger(
+ AuthorizationLevel.Function,
+ "post",
+ Route = "config/{area}/{id}/schema/tracking/{tableId}/export"
+ )]
+ HttpRequest req,
+ string area,
+ string id,
+ string tableId,
+ CancellationToken cancellationToken
+ )
+ {
+ ArgumentNullException.ThrowIfNull(req);
+
+ SchemaTrackingExportRequestBody? body;
+ try
+ {
+ body = await JsonSerializer
+ .DeserializeAsync(req.Body, ExportJsonOptions, cancellationToken)
+ .ConfigureAwait(false);
+ }
+ catch (JsonException)
+ {
+ return new BadRequestObjectResult("Invalid JSON body.");
+ }
+
+ if (body == null)
+ {
+ return new BadRequestObjectResult("Body is required.");
+ }
+
+ var result = await schemaTrackingExportService
+ .TryCreateExportJobAsync(area, id, tableId, body, cancellationToken)
+ .ConfigureAwait(false);
+
+ if (result.Code == ExportJobCreateResultCode.ValidationFailed)
+ {
+ return new BadRequestObjectResult("author, referenceId, and purpose must be non-empty strings.");
+ }
+
+ if (result.Code == ExportJobCreateResultCode.NotFound || result.Job == null)
+ {
+ return new NotFoundResult();
+ }
+
+ var location = BuildExportStatusLocation(req, area, id, tableId, result.Job.CorrelationId);
+ return new AcceptedResult(location: location, value: result.Job);
+ }
+
+ ///
+ /// Returns detailed status for a single export job when is present in the path,
+ /// or lists export jobs for the table when the URL ends at .../export/status (catch-all binds empty).
+ ///
+ ///
+ /// A separate route without {*correlationId} would never win in the host: the catch-all matches the same URL with an empty remainder,
+ /// and returns null for an empty id, producing 404. List behavior is therefore handled here.
+ ///
+ [Function(nameof(GetSyncJobConfig) + nameof(GetSchemaTrackingExportStatus))]
+ public async Task GetSchemaTrackingExportStatus(
+ [HttpTrigger(
+ AuthorizationLevel.Function,
+ "get",
+ Route = "config/{area}/{id}/schema/tracking/{tableId}/export/status/{*correlationId}"
+ )]
+ HttpRequest req,
+ string area,
+ string id,
+ string tableId,
+ string correlationId,
+ CancellationToken cancellationToken
+ )
+ {
+ ArgumentNullException.ThrowIfNull(req);
+
+ if (string.IsNullOrWhiteSpace(correlationId))
+ {
+ if (string.IsNullOrWhiteSpace(area) ||
+ string.IsNullOrWhiteSpace(id) ||
+ string.IsNullOrWhiteSpace(tableId) ||
+ syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) != true ||
+ jobConfig == null ||
+ !StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig.Area) ||
+ jobConfig.Tables == null ||
+ !jobConfig.Tables.TryGetValue(tableId, out _))
+ {
+ return new NotFoundResult();
+ }
+
+ var items = await schemaTrackingExportService
+ .ListExportJobsAsync(area, id, tableId, cancellationToken)
+ .ConfigureAwait(false);
+
+ return new OkObjectResult(items);
+ }
+
+ var status = await schemaTrackingExportService
+ .TryGetExportStatusAsync(area, id, tableId, correlationId, cancellationToken)
+ .ConfigureAwait(false);
+
+ if (status == null)
+ {
+ return new NotFoundResult();
+ }
+
+ return new OkObjectResult(status);
+ }
+
+ private static string BuildExportStatusLocation(
+ HttpRequest req,
+ string area,
+ string jobId,
+ string tableId,
+ string correlationId
+ )
+ {
+ var encodedCorrelation = string.Join(
+ "/",
+ correlationId.Split('/', StringSplitOptions.RemoveEmptyEntries).Select(Uri.EscapeDataString)
+ );
+ var pathBase = req.PathBase.Value?.TrimEnd('/') ?? string.Empty;
+ var apiRoot = string.IsNullOrEmpty(pathBase) ? "/api" : pathBase;
+ var path = string.Format(
+ CultureInfo.InvariantCulture,
+ "{0}/config/{1}/{2}/schema/tracking/{3}/export/status/{4}",
+ apiRoot,
+ Uri.EscapeDataString(area),
+ Uri.EscapeDataString(jobId),
+ Uri.EscapeDataString(tableId),
+ encodedCorrelation
+ );
+ return string.Format(CultureInfo.InvariantCulture, "{0}://{1}{2}", req.Scheme, req.Host.Value, path);
+ }
+}
diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs
index 56a89f4..a53fa19 100644
--- a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs
+++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs
@@ -8,7 +8,8 @@ namespace SqlBulkSyncFunction.Functions;
public partial class GetSyncJobConfig(
ILogger logger,
IOptions syncJobsConfig,
- ITokenCacheService tokenCacheService
+ ITokenCacheService tokenCacheService,
+ SchemaTrackingExportService schemaTrackingExportService
)
{
}
diff --git a/src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs b/src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs
new file mode 100644
index 0000000..8ebc2c2
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Functions/ProcessExportJobQueues.cs
@@ -0,0 +1,157 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Azure.Storage.Queues;
+using Microsoft.Azure.Functions.Worker;
+using Microsoft.Extensions.Logging;
+using SqlBulkSyncFunction.Models.Schema.Export;
+using SqlBulkSyncFunction.Services;
+
+namespace SqlBulkSyncFunction.Functions;
+
+///
+/// Queue-triggered handlers for schema tracking export orchestration (dispatch, segment ZIPs, finalize).
+///
+public sealed class ProcessExportJobQueues(
+ ILogger logger,
+ QueueServiceClient queueServiceClient,
+ SchemaTrackingExportService schemaTrackingExportService
+ )
+{
+ ///
+ /// Fans out a new export job to the three segment queues.
+ ///
+ [Function(nameof(ProcessExportJobQueues) + nameof(DispatchExportJob))]
+ public async Task DispatchExportJob(
+ [QueueTrigger(Constants.Queues.ExportJob)] string correlationId,
+ CancellationToken cancellationToken
+ )
+ {
+ ArgumentException.ThrowIfNullOrEmpty(correlationId);
+
+ using (logger.BeginScope("CorrelationId={CorrelationId}", correlationId))
+ {
+ await schemaTrackingExportService
+ .DispatchExportJobAsync(correlationId, cancellationToken)
+ .ConfigureAwait(false);
+ }
+ }
+
+ ///
+ /// Builds the updated-rows ZIP for an export job.
+ ///
+ [Function(nameof(ProcessExportJobQueues) + nameof(ProcessExportUpdated))]
+ public Task ProcessExportUpdated(
+ [QueueTrigger(Constants.Queues.ExportJobUpdated)] string correlationId,
+ CancellationToken cancellationToken
+ ) => ProcessSegmentAsync(correlationId, SchemaTrackingExportSegment.Updated, cancellationToken);
+
+ ///
+ /// Builds the inserted-rows ZIP for an export job.
+ ///
+ [Function(nameof(ProcessExportJobQueues) + nameof(ProcessExportInserted))]
+ public Task ProcessExportInserted(
+ [QueueTrigger(Constants.Queues.ExportJobInserted)] string correlationId,
+ CancellationToken cancellationToken
+ ) => ProcessSegmentAsync(correlationId, SchemaTrackingExportSegment.Inserted, cancellationToken);
+
+ ///
+ /// Builds the deleted-rows ZIP (primary keys) for an export job.
+ ///
+ [Function(nameof(ProcessExportJobQueues) + nameof(ProcessExportDeleted))]
+ public Task ProcessExportDeleted(
+ [QueueTrigger(Constants.Queues.ExportJobDeleted)] string correlationId,
+ CancellationToken cancellationToken
+ ) => ProcessSegmentAsync(correlationId, SchemaTrackingExportSegment.Deleted, cancellationToken);
+
+ ///
+ /// Records completion of the updated segment and finalizes the job when all segments are done.
+ ///
+ [Function(nameof(ProcessExportJobQueues) + nameof(OnExportUpdatedDone))]
+ public Task OnExportUpdatedDone(
+ [QueueTrigger(Constants.Queues.ExportJobUpdatedDone)] string correlationId,
+ CancellationToken cancellationToken
+ ) => OnSegmentDoneAsync(correlationId, SchemaTrackingExportSegment.Updated, cancellationToken);
+
+ ///
+ /// Records completion of the inserted segment and finalizes the job when all segments are done.
+ ///
+ [Function(nameof(ProcessExportJobQueues) + nameof(OnExportInsertedDone))]
+ public Task OnExportInsertedDone(
+ [QueueTrigger(Constants.Queues.ExportJobInsertedDone)] string correlationId,
+ CancellationToken cancellationToken
+ ) => OnSegmentDoneAsync(correlationId, SchemaTrackingExportSegment.Inserted, cancellationToken);
+
+ ///
+ /// Records completion of the deleted segment and finalizes the job when all segments are done.
+ ///
+ [Function(nameof(ProcessExportJobQueues) + nameof(OnExportDeletedDone))]
+ public Task OnExportDeletedDone(
+ [QueueTrigger(Constants.Queues.ExportJobDeletedDone)] string correlationId,
+ CancellationToken cancellationToken
+ ) => OnSegmentDoneAsync(correlationId, SchemaTrackingExportSegment.Deleted, cancellationToken);
+
+ private async Task ProcessSegmentAsync(
+ string correlationId,
+ SchemaTrackingExportSegment segment,
+ CancellationToken cancellationToken
+ )
+ {
+ ArgumentException.ThrowIfNullOrEmpty(correlationId);
+
+ using (logger.BeginScope("CorrelationId={CorrelationId}, Segment={Segment}", correlationId, segment))
+ {
+ try
+ {
+ await schemaTrackingExportService
+ .ProcessExportSegmentAsync(correlationId, segment, cancellationToken)
+ .ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(
+ ex,
+ "Export segment {Segment} failed for {CorrelationId}; sending message to segment error queue.",
+ segment,
+ correlationId
+ );
+ await EnqueueSegmentErrorAsync(correlationId, segment, cancellationToken).ConfigureAwait(false);
+ }
+ }
+ }
+
+ private async Task OnSegmentDoneAsync(
+ string correlationId,
+ SchemaTrackingExportSegment segment,
+ CancellationToken cancellationToken
+ )
+ {
+ ArgumentException.ThrowIfNullOrEmpty(correlationId);
+
+ using (logger.BeginScope("CorrelationId={CorrelationId}, Segment={Segment}", correlationId, segment))
+ {
+ await schemaTrackingExportService
+ .OnExportSegmentDoneAsync(correlationId, segment, cancellationToken)
+ .ConfigureAwait(false);
+ }
+ }
+
+ private async Task EnqueueSegmentErrorAsync(
+ string correlationId,
+ SchemaTrackingExportSegment segment,
+ CancellationToken cancellationToken
+ )
+ {
+ var queueName = segment switch
+ {
+ SchemaTrackingExportSegment.Updated => Constants.Queues.ExportJobUpdatedError,
+ SchemaTrackingExportSegment.Inserted => Constants.Queues.ExportJobInsertedError,
+ SchemaTrackingExportSegment.Deleted => Constants.Queues.ExportJobDeletedError,
+ _ => throw new ArgumentOutOfRangeException(nameof(segment), segment, null)
+ };
+
+ var queue = queueServiceClient.GetQueueClient(queueName);
+ _ = await queue.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
+ _ = await queue.SendMessageAsync(correlationId, cancellationToken: cancellationToken).ConfigureAwait(false);
+ }
+}
diff --git a/src/SqlBulkSyncFunction/Helpers/SchemaTrackingExportTableKeys.cs b/src/SqlBulkSyncFunction/Helpers/SchemaTrackingExportTableKeys.cs
new file mode 100644
index 0000000..a7b87a5
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Helpers/SchemaTrackingExportTableKeys.cs
@@ -0,0 +1,25 @@
+using System.Globalization;
+using SqlBulkSyncFunction.Services;
+
+namespace SqlBulkSyncFunction.Helpers;
+
+///
+/// Builds Azure Table Storage partition keys for export job entities.
+///
+public static class SchemaTrackingExportTableKeys
+{
+ ///
+ /// Builds partition key {area}_{id}_{tableId} with per-segment sanitization compatible with Azure Table key rules.
+ ///
+ /// Job area segment.
+ /// Job configuration id segment.
+ /// Table mapping id segment.
+ /// Composite partition key string.
+ public static string GetPartitionKey(string area, string jobId, string tableId)
+ => string.Format(
+ CultureInfo.InvariantCulture,
+ "{0}_{1}_{2}",
+ SyncMonitoringAggregationService.SanitizeBlobPathSegment(area),
+ SyncMonitoringAggregationService.SanitizeBlobPathSegment(jobId),
+ SyncMonitoringAggregationService.SanitizeBlobPathSegment(tableId));
+}
diff --git a/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs b/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs
index ef7f8ce..0611105 100644
--- a/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs
+++ b/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs
@@ -1,6 +1,7 @@
using System;
using System.Linq;
using SqlBulkSyncFunction.Models.Schema;
+using SqlBulkSyncFunction.Models.Schema.Export;
namespace SqlBulkSyncFunction.Helpers;
@@ -86,6 +87,69 @@ FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct
""";
}
+ ///
+ /// Builds a query for one export segment: insert/update rows join the live table for full column values;
+ /// delete rows return primary key columns from CHANGETABLE only.
+ /// SYS_CHANGE_OPERATION is not projected (segment already implies I/U/D) to reduce payload size.
+ ///
+ /// Fully qualified source table name.
+ /// Source columns (explicit list; no *).
+ /// Which change operation to return.
+ /// Parameterized SQL using @FromVersion.
+ public static string GetChangeTrackingExportSegmentSelectStatement(
+ string sourceTableName,
+ Column[] columns,
+ SchemaTrackingExportSegment segment
+ )
+ {
+ var primaryKeyColumns = columns.Where(column => column.IsPrimary).ToArray();
+ if (primaryKeyColumns.Length == 0)
+ {
+ throw new InvalidOperationException($"Missing primary key columns for table {sourceTableName}.");
+ }
+
+ var operationFilter = segment switch
+ {
+ SchemaTrackingExportSegment.Updated => "N'U'",
+ SchemaTrackingExportSegment.Inserted => "N'I'",
+ SchemaTrackingExportSegment.Deleted => "N'D'",
+ _ => throw new ArgumentOutOfRangeException(nameof(segment))
+ };
+
+ if (segment == SchemaTrackingExportSegment.Deleted)
+ {
+ var primarySelect = string.Join(
+ ",\r\n ",
+ primaryKeyColumns.Select(column => string.Concat("ct.", column.QuoteName, " AS ", column.QuoteName))
+ );
+
+ return $"""
+ SELECT {primarySelect}
+ FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct
+ WHERE ct.SYS_CHANGE_OPERATION = {operationFilter}
+ """;
+ }
+
+ var pkJoin = string.Join(
+ " AND\r\n ",
+ primaryKeyColumns.Select(
+ column => string.Concat("ct.", column.QuoteName, " = t.", column.QuoteName)
+ )
+ );
+
+ var tableColumns = string.Join(
+ ",\r\n ",
+ columns.Select(column => string.Concat("t.", column.QuoteName, " AS ", column.QuoteName))
+ );
+
+ return $"""
+ SELECT {tableColumns}
+ FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct
+ INNER JOIN {sourceTableName} AS t ON {pkJoin}
+ WHERE ct.SYS_CHANGE_OPERATION = {operationFilter}
+ """;
+ }
+
public static string GetDropStatement(this string tableName)
=> $"""
IF OBJECT_ID('{tableName}') IS NOT NULL
@@ -262,7 +326,7 @@ CONSTRAINT [PK_{3}] PRIMARY KEY CLUSTERED
column => column.QuoteName
)
),
- Guid.NewGuid()
+ Guid.CreateVersion7()
);
return statement;
}
@@ -295,7 +359,7 @@ CONSTRAINT [PK_{2}] PRIMARY KEY CLUSTERED
)
)
),
- Guid.NewGuid(),
+ Guid.CreateVersion7(),
string.Join(
",\r\n ",
tableSchema.Columns
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobCreateResult.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobCreateResult.cs
new file mode 100644
index 0000000..5b69bc3
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobCreateResult.cs
@@ -0,0 +1,33 @@
+using System;
+
+namespace SqlBulkSyncFunction.Models.Schema.Export;
+
+#nullable enable
+
+///
+/// Outcome of attempting to create a schema tracking export job.
+///
+/// Whether creation succeeded or why it was rejected.
+/// Populated when is .
+public record ExportJobCreateResult(ExportJobCreateResultCode Code, SchemaTrackingExportJob? Job);
+
+///
+/// Result codes for .
+///
+public enum ExportJobCreateResultCode
+{
+ ///
+ /// Job was persisted and enqueued.
+ ///
+ Created = 0,
+
+ ///
+ /// Request body failed validation (missing or whitespace fields).
+ ///
+ ValidationFailed = 1,
+
+ ///
+ /// Job configuration or table mapping was not found for the route.
+ ///
+ NotFound = 2
+}
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs
new file mode 100644
index 0000000..f7791cd
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableEntity.cs
@@ -0,0 +1,85 @@
+using System;
+using Azure.Data.Tables;
+
+namespace SqlBulkSyncFunction.Models.Schema.Export;
+
+#nullable enable
+
+///
+/// Azure Table Storage entity for schema tracking export job coordination.
+/// Partition key: area_id_tableId (sanitized segments). Row key: export job id (version 7 GUID, n format, no dashes).
+///
+public sealed class ExportJobTableEntity : ITableEntity
+{
+ ///
+ public string PartitionKey { get; set; } = string.Empty;
+
+ ///
+ public string RowKey { get; set; } = string.Empty;
+
+ ///
+ public DateTimeOffset? Timestamp { get; set; }
+
+ ///
+ public Azure.ETag ETag { get; set; }
+
+ ///
+ /// Full correlation id path used as blob prefix and in queue messages.
+ ///
+ public string CorrelationId { get; set; } = string.Empty;
+
+ ///
+ /// Sync job area (configuration).
+ ///
+ public string Area { get; set; } = string.Empty;
+
+ ///
+ /// Sync job configuration id.
+ ///
+ public string JobId { get; set; } = string.Empty;
+
+ ///
+ /// Configured table mapping id.
+ ///
+ public string TableId { get; set; } = string.Empty;
+
+ ///
+ /// Reference id from the export request (denormalized for list queries without reading job.json).
+ ///
+ public string ReferenceId { get; set; } = string.Empty;
+
+ ///
+ /// Serialized value name.
+ ///
+ public string Status { get; set; } = nameof(SchemaTrackingExportJobStatus.Pending);
+
+ ///
+ /// Whether the updated ZIP segment finished successfully.
+ ///
+ public bool UpdatedDone { get; set; }
+
+ ///
+ /// Whether the inserted ZIP segment finished successfully.
+ ///
+ public bool InsertedDone { get; set; }
+
+ ///
+ /// Whether the deleted ZIP segment finished successfully.
+ ///
+ public bool DeletedDone { get; set; }
+
+ ///
+ /// UTC time when the job was accepted.
+ ///
+ public DateTimeOffset CreatedUtc { get; set; }
+
+ ///
+ /// UTC time when the job completed successfully; null if not completed.
+ ///
+ public DateTimeOffset? CompletedUtc { get; set; }
+
+ ///
+ /// Optional failure message.
+ ///
+ public string? Error { get; set; }
+}
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs
new file mode 100644
index 0000000..0e78b46
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/ExportJobTableMapper.cs
@@ -0,0 +1,42 @@
+namespace SqlBulkSyncFunction.Models.Schema.Export;
+
+#nullable enable
+
+///
+/// Maps rows to API DTOs shared by list and single-job status endpoints.
+///
+public static class ExportJobTableMapper
+{
+ ///
+ /// Maps a table row to ; is set only when loading result.json for detail responses.
+ ///
+ /// Persisted export job row.
+ /// Optional SAS ZIP URIs for status detail when result.json was loaded.
+ public static SchemaTrackingExportJobListItem ToListItem(ExportJobTableEntity entity, SchemaTrackingExportJobListItemResult? result = null)
+ => new(
+ CorrelationId: entity.CorrelationId,
+ ExportJobId: entity.RowKey,
+ Status: ParseStatus(entity.Status),
+ ReferenceId: entity.ReferenceId ?? string.Empty,
+ Created: entity.CreatedUtc,
+ Completed: entity.CompletedUtc,
+ UpdatedDone: entity.UpdatedDone,
+ InsertedDone: entity.InsertedDone,
+ DeletedDone: entity.DeletedDone,
+ Error: entity.Error,
+ Result: result
+ );
+
+ ///
+ /// Parses the persisted status string from table storage into .
+ ///
+ /// Raw value.
+ public static SchemaTrackingExportJobStatus ParseStatus(string? status)
+ => status switch
+ {
+ nameof(SchemaTrackingExportJobStatus.Running) => SchemaTrackingExportJobStatus.Running,
+ nameof(SchemaTrackingExportJobStatus.Completed) => SchemaTrackingExportJobStatus.Completed,
+ nameof(SchemaTrackingExportJobStatus.Failed) => SchemaTrackingExportJobStatus.Failed,
+ _ => SchemaTrackingExportJobStatus.Pending
+ };
+}
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJob.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJob.cs
new file mode 100644
index 0000000..319a04e
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJob.cs
@@ -0,0 +1,29 @@
+using System;
+
+namespace SqlBulkSyncFunction.Models.Schema.Export;
+
+#nullable enable
+
+///
+/// Metadata for an accepted schema tracking export job, stored as job.json under the export blob prefix.
+///
+/// Configured sync job area.
+/// Configured sync job identifier.
+/// Configured table mapping identifier.
+/// Blob path prefix segments for this job (date/hour/minute + version 7 id).
+/// Version 7 GUID in n format (no dashes); same value as Azure Table row key.
+/// Reference from the export request.
+/// Author from the export request.
+/// Purpose from the export request.
+/// UTC creation time when the job was accepted.
+public record SchemaTrackingExportJob(
+ string Area,
+ string Id,
+ string TableId,
+ string CorrelationId,
+ string ExportJobId,
+ string ReferenceId,
+ string Author,
+ string Purpose,
+ DateTimeOffset Created
+);
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs
new file mode 100644
index 0000000..fc2a011
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItem.cs
@@ -0,0 +1,33 @@
+using System;
+
+namespace SqlBulkSyncFunction.Models.Schema.Export;
+
+#nullable enable
+
+///
+/// Export job row returned by GET .../export/status (list) and GET .../export/status/{correlationId} (detail); is set only on detail when finalized.
+///
+/// Full correlation path used for blobs and deep links.
+/// Export job id (n-format version 7 GUID); same as Azure Table row key.
+/// Current job status.
+/// Reference from the export request.
+/// UTC creation time.
+/// UTC completion time when finished; if not completed.
+/// Whether the updated ZIP leg completed.
+/// Whether the inserted ZIP leg completed.
+/// Whether the deleted ZIP leg completed.
+/// Optional error message when status is failed.
+/// Populated for single-job status when response/result.json exists; otherwise .
+public record SchemaTrackingExportJobListItem(
+ string CorrelationId,
+ string ExportJobId,
+ SchemaTrackingExportJobStatus Status,
+ string ReferenceId,
+ DateTimeOffset Created,
+ DateTimeOffset? Completed,
+ bool UpdatedDone,
+ bool InsertedDone,
+ bool DeletedDone,
+ string? Error,
+ SchemaTrackingExportJobListItemResult? Result = null
+);
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs
new file mode 100644
index 0000000..fefa0d6
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobListItemResult.cs
@@ -0,0 +1,36 @@
+using System;
+
+namespace SqlBulkSyncFunction.Models.Schema.Export;
+
+#nullable enable
+
+///
+/// SAS download details exposed on for status responses.
+/// Subset of (blob result.json still stores the full record).
+///
+/// UTC time when the returned SAS URLs expire (aligned with SAS token).
+/// Read SAS URI for updated.zip.
+/// Read SAS URI for inserted.zip.
+/// Read SAS URI for deleted.zip.
+public record SchemaTrackingExportJobListItemResult(
+ DateTimeOffset SasExpires,
+ Uri UpdatedZipSasUri,
+ Uri InsertedZipSasUri,
+ Uri DeletedZipSasUri
+)
+{
+ ///
+ /// Maps a deserialized to the API subset, or returns when is .
+ ///
+ /// Full result from result.json, or .
+ /// List-item result with SAS URIs only, or .
+ public static SchemaTrackingExportJobListItemResult? FromJobResult(SchemaTrackingExportJobResult? jobResult)
+ => jobResult == null
+ ? null
+ : new SchemaTrackingExportJobListItemResult(
+ jobResult.SasExpires,
+ jobResult.UpdatedZipSasUri,
+ jobResult.InsertedZipSasUri,
+ jobResult.DeletedZipSasUri
+ );
+}
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs
new file mode 100644
index 0000000..ce330ca
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobResult.cs
@@ -0,0 +1,39 @@
+using System;
+
+namespace SqlBulkSyncFunction.Models.Schema.Export;
+
+#nullable enable
+
+///
+/// Final export outcome written to response/result.json, including time-limited read URLs for ZIP responses.
+///
+/// Configured sync job area.
+/// Configured sync job identifier.
+/// Configured table mapping identifier.
+/// Blob path prefix for this job.
+/// Export job id (n format); same as Azure Table row key.
+/// Reference from the export request.
+/// Author from the export request.
+/// Purpose from the export request.
+/// UTC time when the job was accepted.
+/// UTC time when all segments and result metadata finished.
+/// UTC time when the returned SAS URLs expire (aligned with SAS token).
+/// Read SAS URI for updated.zip.
+/// Read SAS URI for inserted.zip.
+/// Read SAS URI for deleted.zip.
+public record SchemaTrackingExportJobResult(
+ string Area,
+ string Id,
+ string TableId,
+ string CorrelationId,
+ string ExportJobId,
+ string ReferenceId,
+ string Author,
+ string Purpose,
+ DateTimeOffset Created,
+ DateTimeOffset Completed,
+ DateTimeOffset SasExpires,
+ Uri UpdatedZipSasUri,
+ Uri InsertedZipSasUri,
+ Uri DeletedZipSasUri
+);
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobStatus.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobStatus.cs
new file mode 100644
index 0000000..91921f6
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportJobStatus.cs
@@ -0,0 +1,32 @@
+using System.Text.Json.Serialization;
+
+namespace SqlBulkSyncFunction.Models.Schema.Export;
+
+#nullable enable
+
+///
+/// High-level lifecycle state for a schema tracking export job persisted in Azure Table Storage.
+///
+[JsonConverter(typeof(JsonStringEnumConverter))]
+public enum SchemaTrackingExportJobStatus
+{
+ ///
+ /// Job was accepted; work may not have started yet.
+ ///
+ Pending = 0,
+
+ ///
+ /// Dispatcher has enqueued segment workers or segment work is in progress.
+ ///
+ Running = 1,
+
+ ///
+ /// All segments completed and result metadata was written.
+ ///
+ Completed = 2,
+
+ ///
+ /// A fatal error occurred during export or finalize.
+ ///
+ Failed = 3
+}
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportRequestBody.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportRequestBody.cs
new file mode 100644
index 0000000..1e789c2
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportRequestBody.cs
@@ -0,0 +1,15 @@
+namespace SqlBulkSyncFunction.Models.Schema.Export;
+
+#nullable enable
+
+///
+/// Request body accepted by POST .../schema/tracking/{tableId}/export to justify and trace a sensitive data export.
+///
+/// Person or system initiating the export (required, non-whitespace).
+/// External ticket or reference identifier (required, non-whitespace).
+/// Business justification for exporting table data (required, non-whitespace).
+public record SchemaTrackingExportRequestBody(
+ string Author,
+ string ReferenceId,
+ string Purpose
+);
diff --git a/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs
new file mode 100644
index 0000000..083b182
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Schema/Export/SchemaTrackingExportSegment.cs
@@ -0,0 +1,24 @@
+namespace SqlBulkSyncFunction.Models.Schema.Export;
+
+#nullable enable
+
+///
+/// Identifies which change-tracking operation bucket an export worker or ZIP file belongs to.
+///
+public enum SchemaTrackingExportSegment
+{
+ ///
+ /// Rows with SYS_CHANGE_OPERATION = N'U'.
+ ///
+ Updated = 0,
+
+ ///
+ /// Rows with SYS_CHANGE_OPERATION = N'I'.
+ ///
+ Inserted = 1,
+
+ ///
+ /// Rows with SYS_CHANGE_OPERATION = N'D' (primary key columns only).
+ ///
+ Deleted = 2
+}
diff --git a/src/SqlBulkSyncFunction/Program.cs b/src/SqlBulkSyncFunction/Program.cs
index 023d338..5272e1d 100644
--- a/src/SqlBulkSyncFunction/Program.cs
+++ b/src/SqlBulkSyncFunction/Program.cs
@@ -1,3 +1,5 @@
+using System;
+using Azure.Data.Tables;
using Azure.Monitor.OpenTelemetry.Exporter;
using Microsoft.Azure.Functions.Worker.OpenTelemetry;
using Microsoft.Extensions.Azure;
@@ -33,6 +35,14 @@
.AddSingleton()
.AddSingleton()
.AddSingleton()
+ .AddSingleton(
+ static _ =>
+ {
+ var connectionString = Environment.GetEnvironmentVariable("AzureWebJobsStorage")
+ ?? throw new InvalidOperationException("AzureWebJobsStorage is not set.");
+ return new TableServiceClient(connectionString);
+ })
+ .AddSingleton()
.AddAzureClients(
static az => {
var connectionString = System.Environment.GetEnvironmentVariable("AzureWebJobsStorage");
diff --git a/src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs
new file mode 100644
index 0000000..61611cf
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportService.cs
@@ -0,0 +1,779 @@
+using System;
+using System.Collections.Generic;
+using System.Data;
+using System.Linq;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using Azure;
+using Azure.Data.Tables;
+using Azure.Storage.Blobs;
+using Azure.Storage.Blobs.Models;
+using Azure.Storage.Queues;
+using Azure.Storage.Sas;
+using Microsoft.Data.SqlClient;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using SqlBulkSyncFunction.Helpers;
+using SqlBulkSyncFunction.Models.Job;
+using SqlBulkSyncFunction.Models.Schema.Export;
+
+namespace SqlBulkSyncFunction.Services;
+
+#nullable enable
+
+///
+/// Coordinates schema tracking export jobs (blobs, queues, table state, and segment processing).
+///
+public sealed class SchemaTrackingExportService(
+ ILogger logger,
+ IOptions syncJobsConfig,
+ ITokenCacheService tokenCacheService,
+ BlobServiceClient blobServiceClient,
+ QueueServiceClient queueServiceClient,
+ TableServiceClient tableServiceClient
+ )
+{
+ private static readonly JsonSerializerOptions JsonOptions = new()
+ {
+ PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
+ PropertyNameCaseInsensitive = true,
+ WriteIndented = false
+ };
+
+ private readonly BlobContainerClient _exportContainer = GetOrCreateBlobContainer(blobServiceClient, Constants.Containers.Export);
+ private readonly TableClient _exportJobsTable = GetOrCreateTable(tableServiceClient, Constants.Tables.ExportJobs);
+
+ ///
+ /// Validates configuration, persists request and job blobs, creates the table row, and enqueues the main export queue.
+ ///
+ public async Task TryCreateExportJobAsync(
+ string area,
+ string jobId,
+ string tableId,
+ SchemaTrackingExportRequestBody request,
+ CancellationToken cancellationToken
+ )
+ {
+ if (string.IsNullOrWhiteSpace(request.Author) ||
+ string.IsNullOrWhiteSpace(request.ReferenceId) ||
+ string.IsNullOrWhiteSpace(request.Purpose))
+ {
+ return new ExportJobCreateResult(ExportJobCreateResultCode.ValidationFailed, null);
+ }
+
+ if (!TryResolveJobTable(area, jobId, tableId))
+ {
+ return new ExportJobCreateResult(ExportJobCreateResultCode.NotFound, null);
+ }
+
+ var utcNow = DateTimeOffset.UtcNow;
+ var jobGuid = Guid.CreateVersion7();
+ var exportJobId = jobGuid.ToString("n");
+ var correlationId = FormattableString.Invariant(
+ $"{utcNow.Year:0000}/{utcNow.Month:00}/{utcNow.Day:00}/{utcNow.Hour:00}/{utcNow.Minute:00}/{exportJobId}");
+
+ var job = new SchemaTrackingExportJob(
+ Area: area,
+ Id: jobId,
+ TableId: tableId,
+ CorrelationId: correlationId,
+ ExportJobId: exportJobId,
+ ReferenceId: request.ReferenceId,
+ Author: request.Author,
+ Purpose: request.Purpose,
+ Created: utcNow
+ );
+
+ var requestBlob = _exportContainer.GetBlobClient($"{correlationId}/request.json");
+ var jobBlob = _exportContainer.GetBlobClient($"{correlationId}/job.json");
+
+ _ = await requestBlob.UploadAsync(
+ BinaryData.FromObjectAsJson(request, JsonOptions),
+ new BlobUploadOptions
+ {
+ HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Json }
+ },
+ cancellationToken
+ ).ConfigureAwait(false);
+
+ _ = await jobBlob.UploadAsync(
+ BinaryData.FromObjectAsJson(job, JsonOptions),
+ new BlobUploadOptions
+ {
+ HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Json }
+ },
+ cancellationToken
+ ).ConfigureAwait(false);
+
+ var entity = new ExportJobTableEntity
+ {
+ PartitionKey = SchemaTrackingExportTableKeys.GetPartitionKey(area, jobId, tableId),
+ RowKey = exportJobId,
+ CorrelationId = correlationId,
+ Area = area,
+ JobId = jobId,
+ TableId = tableId,
+ ReferenceId = request.ReferenceId,
+ Status = nameof(SchemaTrackingExportJobStatus.Pending),
+ CreatedUtc = utcNow,
+ UpdatedDone = false,
+ InsertedDone = false,
+ DeletedDone = false
+ };
+
+ _ = await _exportJobsTable.UpsertEntityAsync(entity, TableUpdateMode.Replace, cancellationToken).ConfigureAwait(false);
+
+ var mainQueue = GetOrCreateQueue(Constants.Queues.ExportJob);
+ _ = await mainQueue.SendMessageAsync(correlationId, cancellationToken: cancellationToken).ConfigureAwait(false);
+
+ return new ExportJobCreateResult(ExportJobCreateResultCode.Created, job);
+ }
+
+ ///
+ /// Loads job and table state for a single correlation id under the given route context (same DTO as ).
+ ///
+ public async Task TryGetExportStatusAsync(
+ string area,
+ string jobId,
+ string tableId,
+ string correlationId,
+ CancellationToken cancellationToken
+ )
+ {
+ if (!TryResolveJobTable(area, jobId, tableId))
+ {
+ return null;
+ }
+
+ var normalizedCorrelation = NormalizeCorrelationId(correlationId);
+ if (string.IsNullOrEmpty(normalizedCorrelation))
+ {
+ return null;
+ }
+
+ var job = await ReadJobBlobAsync(normalizedCorrelation, cancellationToken).ConfigureAwait(false);
+ if (job == null)
+ {
+ return null;
+ }
+
+ if (!PartitionMatchesRoute(job, area, jobId, tableId))
+ {
+ return null;
+ }
+
+ var partitionKey = SchemaTrackingExportTableKeys.GetPartitionKey(area, jobId, tableId);
+ var rowKey = job.ExportJobId;
+
+ ExportJobTableEntity? entity;
+ try
+ {
+ var response = await _exportJobsTable
+ .GetEntityAsync(partitionKey, rowKey, cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+ entity = response.Value;
+ }
+ catch (RequestFailedException ex) when (ex.Status == 404)
+ {
+ entity = null;
+ }
+
+ var resultBlob = _exportContainer.GetBlobClient($"{normalizedCorrelation}/response/result.json");
+ SchemaTrackingExportJobResult? result = null;
+ if (await resultBlob.ExistsAsync(cancellationToken).ConfigureAwait(false))
+ {
+ try
+ {
+ var download = await resultBlob.DownloadContentAsync(cancellationToken).ConfigureAwait(false);
+ result = download.Value.Content.ToObjectFromJson(JsonOptions);
+ }
+ catch (Exception ex)
+ {
+ logger.LogWarning(ex, "Failed to deserialize result.json for {CorrelationId}", normalizedCorrelation);
+ }
+ }
+
+ var listResult = SchemaTrackingExportJobListItemResult.FromJobResult(result);
+
+ if (entity != null)
+ {
+ return ExportJobTableMapper.ToListItem(entity, listResult);
+ }
+
+ return new SchemaTrackingExportJobListItem(
+ CorrelationId: job.CorrelationId,
+ ExportJobId: job.ExportJobId,
+ Status: ExportJobTableMapper.ParseStatus(null),
+ ReferenceId: job.ReferenceId,
+ Created: job.Created,
+ Completed: null,
+ UpdatedDone: false,
+ InsertedDone: false,
+ DeletedDone: false,
+ Error: null,
+ Result: listResult
+ );
+ }
+
+ ///
+ /// Lists export jobs for the partition derived from , , and .
+ ///
+ public async Task> ListExportJobsAsync(
+ string area,
+ string jobId,
+ string tableId,
+ CancellationToken cancellationToken
+ )
+ {
+ if (!TryResolveJobTable(area, jobId, tableId))
+ {
+ return [];
+ }
+
+ var partitionKey = SchemaTrackingExportTableKeys.GetPartitionKey(area, jobId, tableId);
+ var filter = FormattableString.Invariant($"PartitionKey eq '{EscapeODataString(partitionKey)}'");
+ return await _exportJobsTable
+ .QueryAsync(
+ filter,
+ cancellationToken: cancellationToken
+ )
+ .Select(static e => ExportJobTableMapper.ToListItem(e))
+ .OrderByDescending(static x => x.Created)
+ .ToArrayAsync(cancellationToken)
+ .ConfigureAwait(false);
+ }
+
+ ///
+ /// Dispatches a main-queue message to the three segment queues.
+ ///
+ public async Task DispatchExportJobAsync(string correlationId, CancellationToken cancellationToken)
+ {
+ var normalized = NormalizeCorrelationId(correlationId);
+ var job = await ReadJobBlobAsync(normalized, cancellationToken).ConfigureAwait(false);
+ if (job == null)
+ {
+ logger.LogWarning("Dispatch skipped: missing job.json for {CorrelationId}", normalized);
+ return;
+ }
+
+ var partitionKey = SchemaTrackingExportTableKeys.GetPartitionKey(job.Area, job.Id, job.TableId);
+ var rowKey = job.ExportJobId;
+ await PatchEntityAsync(
+ partitionKey,
+ rowKey,
+ static e => e.Status = nameof(SchemaTrackingExportJobStatus.Running),
+ cancellationToken
+ ).ConfigureAwait(false);
+
+ var qUpdated = GetOrCreateQueue(Constants.Queues.ExportJobUpdated);
+ var qInserted = GetOrCreateQueue(Constants.Queues.ExportJobInserted);
+ var qDeleted = GetOrCreateQueue(Constants.Queues.ExportJobDeleted);
+
+ // Do not pass visibilityTimeout on enqueue: non-zero values hide the message from Dequeue/Peek until the timeout elapses (scheduled/delayed work), not a processing lease.
+ _ = await qUpdated.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false);
+ _ = await qInserted.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false);
+ _ = await qDeleted.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false);
+ }
+
+ ///
+ /// Builds one segment ZIP from SQL change tracking and notifies the corresponding done queue.
+ ///
+ public async Task ProcessExportSegmentAsync(
+ string correlationId,
+ SchemaTrackingExportSegment segment,
+ CancellationToken cancellationToken
+ )
+ {
+ var normalized = NormalizeCorrelationId(correlationId);
+ var job = await ReadJobBlobAsync(normalized, cancellationToken).ConfigureAwait(false);
+ if (job == null)
+ {
+ logger.LogWarning("Segment worker: missing job for {CorrelationId}", normalized);
+ return;
+ }
+
+ if (!syncJobsConfig.Value.Jobs.TryGetValue(job.Id, out var jobConfig) ||
+ !string.Equals(jobConfig.Area, job.Area, StringComparison.OrdinalIgnoreCase))
+ {
+ await MarkJobFailedAsync(job, "Job configuration not found or area mismatch.", cancellationToken).ConfigureAwait(false);
+ return;
+ }
+
+ var syncJob = jobConfig.ToSyncJob(
+ scheduleCorrelationId: null,
+ tokenCache: await tokenCacheService.GetTokenCache(jobConfig).ConfigureAwait(false),
+ timestamp: DateTimeOffset.UtcNow,
+ expires: DateTimeOffset.UtcNow.AddMinutes(4),
+ id: job.Id,
+ schedule: nameof(jobConfig.Manual),
+ seed: false
+ );
+
+ var table = (syncJob.Tables ?? []).FirstOrDefault(
+ t => string.Equals(t.Id, job.TableId, StringComparison.OrdinalIgnoreCase)
+ );
+ if (table == null)
+ {
+ await MarkJobFailedAsync(job, "Table mapping not found.", cancellationToken).ConfigureAwait(false);
+ return;
+ }
+
+ try
+ {
+ await using var sourceConn = new SqlConnection(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken };
+ await using var targetConn = new SqlConnection(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken };
+ await sourceConn.OpenAsync(cancellationToken).ConfigureAwait(false);
+ await targetConn.OpenAsync(cancellationToken).ConfigureAwait(false);
+
+ targetConn.EnsureSyncSchemaAndTableExists(
+ FormattableString.Invariant($"config/{job.Id}/{job.Area}/schema/tracking/{job.TableId}"),
+ logger
+ );
+
+ var columns = sourceConn.GetColumns(table.Source);
+ var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns);
+ var targetVersion = targetConn.GetTargetVersion(table.Target);
+ var fromVersion = targetVersion.CurrentVersion < 0 ? 0L : targetVersion.CurrentVersion;
+
+ var sql = SqlStatementExtensions.GetChangeTrackingExportSegmentSelectStatement(table.Source, columns, segment);
+
+ await using var cmd = new SqlCommand(sql, sourceConn)
+ {
+ CommandTimeout = 3600
+ };
+ _ = cmd.Parameters.Add(new SqlParameter("@FromVersion", SqlDbType.BigInt) { Value = fromVersion });
+
+ await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken)
+ .ConfigureAwait(false);
+
+ var (zipPath, jsonPath) = GetZipRelativePath(segment);
+ var zipBlob = _exportContainer.GetBlobClient($"{normalized}/{zipPath}");
+ await using var uploadStream = await zipBlob.OpenWriteAsync(
+ true,
+ new BlobOpenWriteOptions
+ {
+ HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Zip }
+ },
+ cancellationToken
+ )
+ .ConfigureAwait(false);
+
+ await SchemaTrackingExportStreamingZip.WriteReaderToZipAsync(reader, uploadStream, jsonPath, cancellationToken)
+ .ConfigureAwait(false);
+
+ var doneQueueName = GetDoneQueueName(segment);
+ var doneQueue = GetOrCreateQueue(doneQueueName);
+ _ = await doneQueue.SendMessageAsync(normalized, cancellationToken: cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, "Export segment {Segment} failed for {CorrelationId}", segment, normalized);
+ await MarkJobFailedAsync(job, ex.Message, cancellationToken).ConfigureAwait(false);
+ throw;
+ }
+ }
+
+ ///
+ /// Records segment completion and finalizes the job when all segments are done.
+ ///
+ public async Task OnExportSegmentDoneAsync(
+ string correlationId,
+ SchemaTrackingExportSegment segment,
+ CancellationToken cancellationToken
+ )
+ {
+ var normalized = NormalizeCorrelationId(correlationId);
+ var job = await ReadJobBlobAsync(normalized, cancellationToken).ConfigureAwait(false);
+ if (job == null)
+ {
+ logger.LogWarning(
+ "Segment done handler: missing job.json for {CorrelationId} (check queue message body / encoding vs blob path).",
+ normalized
+ );
+ return;
+ }
+
+ var partitionKey = SchemaTrackingExportTableKeys.GetPartitionKey(job.Area, job.Id, job.TableId);
+ var rowKey = job.ExportJobId;
+
+ for (var attempt = 0; attempt < 12; attempt++)
+ {
+ Response response;
+ try
+ {
+ response = await _exportJobsTable
+ .GetEntityAsync(partitionKey, rowKey, cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+ }
+ catch (RequestFailedException ex) when (ex.Status == 404)
+ {
+ logger.LogWarning("Finalize: missing table entity for {CorrelationId}", normalized);
+ return;
+ }
+
+ var entity = response.Value;
+ if (IsLegDone(entity, segment))
+ {
+ var alreadyRefreshed = await _exportJobsTable
+ .GetEntityAsync(partitionKey, rowKey, cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+ await TryFinalizeIfCompleteAsync(job, alreadyRefreshed.Value, cancellationToken).ConfigureAwait(false);
+ return;
+ }
+
+ SetLegDone(entity, segment, true);
+
+ try
+ {
+ _ = await _exportJobsTable.UpdateEntityAsync(
+ entity,
+ response.Value.ETag,
+ TableUpdateMode.Merge,
+ cancellationToken
+ ).ConfigureAwait(false);
+ }
+ catch (RequestFailedException ex) when (ex.Status == 412)
+ {
+ continue;
+ }
+
+ var refreshed = await _exportJobsTable
+ .GetEntityAsync(partitionKey, rowKey, cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+ await TryFinalizeIfCompleteAsync(job, refreshed.Value, cancellationToken).ConfigureAwait(false);
+ return;
+ }
+
+ logger.LogWarning("Finalize: exhausted optimistic retries for {CorrelationId}", normalized);
+ }
+
+ private async Task TryFinalizeIfCompleteAsync(
+ SchemaTrackingExportJob job,
+ ExportJobTableEntity entity,
+ CancellationToken cancellationToken
+ )
+ {
+ if (!entity.UpdatedDone || !entity.InsertedDone || !entity.DeletedDone)
+ {
+ return;
+ }
+
+ var correlationId = NormalizeCorrelationId(job.CorrelationId);
+ var resultBlob = _exportContainer.GetBlobClient($"{correlationId}/response/result.json");
+ if (await resultBlob.ExistsAsync(cancellationToken).ConfigureAwait(false))
+ {
+ await MarkTableCompletedAsync(job, cancellationToken).ConfigureAwait(false);
+ return;
+ }
+
+ var updatedZip = _exportContainer.GetBlobClient($"{correlationId}/response/updated.zip");
+ var insertedZip = _exportContainer.GetBlobClient($"{correlationId}/response/inserted.zip");
+ var deletedZip = _exportContainer.GetBlobClient($"{correlationId}/response/deleted.zip");
+
+ if (!await updatedZip.ExistsAsync(cancellationToken).ConfigureAwait(false) ||
+ !await insertedZip.ExistsAsync(cancellationToken).ConfigureAwait(false) ||
+ !await deletedZip.ExistsAsync(cancellationToken).ConfigureAwait(false))
+ {
+ logger.LogWarning("Finalize: missing one or more ZIP blobs for {CorrelationId}", correlationId);
+ return;
+ }
+
+ var sasExpires = DateTimeOffset.UtcNow.AddDays(7);
+ var updatedUri = GenerateReadSasUri(updatedZip, sasExpires);
+ var insertedUri = GenerateReadSasUri(insertedZip, sasExpires);
+ var deletedUri = GenerateReadSasUri(deletedZip, sasExpires);
+
+ var completed = DateTimeOffset.UtcNow;
+ var result = new SchemaTrackingExportJobResult(
+ Area: job.Area,
+ Id: job.Id,
+ TableId: job.TableId,
+ CorrelationId: job.CorrelationId,
+ ExportJobId: job.ExportJobId,
+ ReferenceId: job.ReferenceId,
+ Author: job.Author,
+ Purpose: job.Purpose,
+ Created: job.Created,
+ Completed: completed,
+ SasExpires: sasExpires,
+ UpdatedZipSasUri: updatedUri,
+ InsertedZipSasUri: insertedUri,
+ DeletedZipSasUri: deletedUri
+ );
+
+ try
+ {
+ _ = await resultBlob.UploadAsync(
+ BinaryData.FromObjectAsJson(result, JsonOptions),
+ new BlobUploadOptions
+ {
+ HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Json },
+ Conditions = new BlobRequestConditions { IfNoneMatch = ETag.All }
+ },
+ cancellationToken
+ ).ConfigureAwait(false);
+ }
+ catch (RequestFailedException ex) when (ex.Status == 412)
+ {
+ logger.LogInformation("result.json already created for {CorrelationId}", correlationId);
+ }
+
+ await MarkTableCompletedAsync(job, cancellationToken).ConfigureAwait(false);
+ }
+
+ private async Task MarkTableCompletedAsync(SchemaTrackingExportJob job, CancellationToken cancellationToken)
+ {
+ var partitionKey = SchemaTrackingExportTableKeys.GetPartitionKey(job.Area, job.Id, job.TableId);
+ var rowKey = job.ExportJobId;
+ for (var attempt = 0; attempt < 12; attempt++)
+ {
+ Response response;
+ try
+ {
+ response = await _exportJobsTable
+ .GetEntityAsync(partitionKey, rowKey, cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+ }
+ catch (RequestFailedException ex) when (ex.Status == 404)
+ {
+ return;
+ }
+
+ var entity = response.Value;
+ if (string.Equals(entity.Status, nameof(SchemaTrackingExportJobStatus.Completed), StringComparison.Ordinal))
+ {
+ return;
+ }
+
+ entity.Status = nameof(SchemaTrackingExportJobStatus.Completed);
+ entity.CompletedUtc = DateTimeOffset.UtcNow;
+ entity.UpdatedDone = true;
+ entity.InsertedDone = true;
+ entity.DeletedDone = true;
+
+ try
+ {
+ _ = await _exportJobsTable.UpdateEntityAsync(
+ entity,
+ response.Value.ETag,
+ TableUpdateMode.Merge,
+ cancellationToken
+ ).ConfigureAwait(false);
+ return;
+ }
+ catch (RequestFailedException ex) when (ex.Status == 412)
+ {
+ continue;
+ }
+ }
+ }
+
+ private async Task MarkJobFailedAsync(SchemaTrackingExportJob job, string error, CancellationToken cancellationToken)
+ {
+ var partitionKey = SchemaTrackingExportTableKeys.GetPartitionKey(job.Area, job.Id, job.TableId);
+ var rowKey = job.ExportJobId;
+ for (var attempt = 0; attempt < 8; attempt++)
+ {
+ try
+ {
+ var response = await _exportJobsTable
+ .GetEntityAsync(partitionKey, rowKey, cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+ var entity = response.Value;
+ entity.Status = nameof(SchemaTrackingExportJobStatus.Failed);
+ entity.Error = error;
+ _ = await _exportJobsTable.UpdateEntityAsync(
+ entity,
+ response.Value.ETag,
+ TableUpdateMode.Merge,
+ cancellationToken
+ ).ConfigureAwait(false);
+ return;
+ }
+ catch (RequestFailedException ex) when (ex.Status == 404)
+ {
+ return;
+ }
+ catch (RequestFailedException ex) when (ex.Status == 412)
+ {
+ continue;
+ }
+ }
+ }
+
+ private async Task PatchEntityAsync(
+ string partitionKey,
+ string rowKey,
+ Action patch,
+ CancellationToken cancellationToken
+ )
+ {
+ for (var attempt = 0; attempt < 8; attempt++)
+ {
+ try
+ {
+ var response = await _exportJobsTable
+ .GetEntityAsync(partitionKey, rowKey, cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+ var entity = response.Value;
+ patch(entity);
+ _ = await _exportJobsTable.UpdateEntityAsync(
+ entity,
+ response.Value.ETag,
+ TableUpdateMode.Merge,
+ cancellationToken
+ ).ConfigureAwait(false);
+ return;
+ }
+ catch (RequestFailedException ex) when (ex.Status == 404)
+ {
+ return;
+ }
+ catch (RequestFailedException ex) when (ex.Status == 412)
+ {
+ continue;
+ }
+ }
+ }
+
+ private static Uri GenerateReadSasUri(BlobClient blob, DateTimeOffset expiresOn)
+ {
+ var sas = new BlobSasBuilder
+ {
+ Resource = "b",
+ BlobContainerName = blob.BlobContainerName,
+ BlobName = blob.Name,
+ StartsOn = DateTimeOffset.UtcNow.AddMinutes(-5),
+ ExpiresOn = expiresOn
+ };
+ sas.SetPermissions(BlobSasPermissions.Read);
+ return blob.GenerateSasUri(sas);
+ }
+
+ private static bool IsLegDone(ExportJobTableEntity entity, SchemaTrackingExportSegment segment)
+ => segment switch
+ {
+ SchemaTrackingExportSegment.Updated => entity.UpdatedDone,
+ SchemaTrackingExportSegment.Inserted => entity.InsertedDone,
+ SchemaTrackingExportSegment.Deleted => entity.DeletedDone,
+ _ => false
+ };
+
+ private static void SetLegDone(ExportJobTableEntity entity, SchemaTrackingExportSegment segment, bool value)
+ {
+ switch (segment)
+ {
+ case SchemaTrackingExportSegment.Updated:
+ entity.UpdatedDone = value;
+ break;
+ case SchemaTrackingExportSegment.Inserted:
+ entity.InsertedDone = value;
+ break;
+ case SchemaTrackingExportSegment.Deleted:
+ entity.DeletedDone = value;
+ break;
+ default:
+ throw new ArgumentOutOfRangeException(nameof(segment));
+ }
+ }
+
+ private static string GetDoneQueueName(SchemaTrackingExportSegment segment)
+ => segment switch
+ {
+ SchemaTrackingExportSegment.Updated => Constants.Queues.ExportJobUpdatedDone,
+ SchemaTrackingExportSegment.Inserted => Constants.Queues.ExportJobInsertedDone,
+ SchemaTrackingExportSegment.Deleted => Constants.Queues.ExportJobDeletedDone,
+ _ => throw new ArgumentOutOfRangeException(nameof(segment))
+ };
+
+ ///
+ /// Blob path under the correlation prefix for the segment ZIP, and the single JSON entry name inside that ZIP.
+ ///
+ /// Export segment.
+ /// ZipPath matches finalize/SAS blob names (response/*.zip); JsonPath is the inner entry file name.
+ private static (string ZipPath, string JsonPath) GetZipRelativePath(SchemaTrackingExportSegment segment)
+ => segment switch
+ {
+ SchemaTrackingExportSegment.Updated => ("response/updated.zip", "updated.json"),
+ SchemaTrackingExportSegment.Inserted => ("response/inserted.zip", "inserted.json"),
+ SchemaTrackingExportSegment.Deleted => ("response/deleted.zip", "deleted.json"),
+ _ => throw new ArgumentOutOfRangeException(nameof(segment))
+ };
+
+ private async Task ReadJobBlobAsync(string correlationId, CancellationToken cancellationToken)
+ {
+ var blob = _exportContainer.GetBlobClient($"{correlationId}/job.json");
+ if (!await blob.ExistsAsync(cancellationToken).ConfigureAwait(false))
+ {
+ return null;
+ }
+
+ var content = await blob.DownloadContentAsync(cancellationToken).ConfigureAwait(false);
+ return content.Value.Content.ToObjectFromJson(JsonOptions);
+ }
+
+ private bool TryResolveJobTable(string area, string jobId, string tableId)
+ {
+ var jobs = syncJobsConfig.Value.Jobs;
+ if (string.IsNullOrWhiteSpace(area) ||
+ string.IsNullOrWhiteSpace(jobId) ||
+ string.IsNullOrWhiteSpace(tableId) ||
+ jobs == null ||
+ !jobs.TryGetValue(jobId, out var jc) ||
+ jc == null ||
+ !string.Equals(jc.Area, area, StringComparison.OrdinalIgnoreCase) ||
+ jc.Tables == null ||
+ !jc.Tables.TryGetValue(tableId, out var sourceTableName) ||
+ string.IsNullOrWhiteSpace(sourceTableName))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ private static bool PartitionMatchesRoute(SchemaTrackingExportJob job, string area, string jobId, string tableId)
+ => string.Equals(job.Area, area, StringComparison.OrdinalIgnoreCase)
+ && string.Equals(job.Id, jobId, StringComparison.OrdinalIgnoreCase)
+ && string.Equals(job.TableId, tableId, StringComparison.OrdinalIgnoreCase);
+
+ private static string NormalizeCorrelationId(string correlationId)
+ {
+ if (string.IsNullOrWhiteSpace(correlationId))
+ {
+ return string.Empty;
+ }
+
+ var s = correlationId.Trim().Replace('\\', '/').Trim('/');
+ if (s.Contains("..", StringComparison.Ordinal))
+ {
+ return string.Empty;
+ }
+
+ return s;
+ }
+
+ private static string EscapeODataString(string value) => value.Replace("'", "''", StringComparison.Ordinal);
+
+ private static BlobContainerClient GetOrCreateBlobContainer(BlobServiceClient client, string name)
+ {
+ var c = client.GetBlobContainerClient(name);
+ _ = c.CreateIfNotExists(PublicAccessType.None);
+ return c;
+ }
+
+ private static TableClient GetOrCreateTable(TableServiceClient client, string tableName)
+ {
+ var t = client.GetTableClient(tableName);
+ _ = t.CreateIfNotExists();
+ return t;
+ }
+
+ private QueueClient GetOrCreateQueue(string queueName)
+ {
+ var q = queueServiceClient.GetQueueClient(queueName);
+ _ = q.CreateIfNotExists();
+ return q;
+ }
+}
diff --git a/src/SqlBulkSyncFunction/Services/SchemaTrackingExportStreamingZip.cs b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportStreamingZip.cs
new file mode 100644
index 0000000..0d0d6e4
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Services/SchemaTrackingExportStreamingZip.cs
@@ -0,0 +1,110 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.IO.Compression;
+using System.Text.Json;
+using System.Text.Json.Serialization;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Data.SqlClient;
+
+namespace SqlBulkSyncFunction.Services;
+
+#nullable enable
+
+///
+/// Streams SQL rows into a JSON array inside a single ZIP entry written to (e.g. Azure Blob OpenWriteAsync), avoiding a temp file and full in-memory buffering of the export.
+///
+public static class SchemaTrackingExportStreamingZip
+{
+ private static readonly JsonSerializerOptions RowSerializerOptions = new()
+ {
+ WriteIndented = false,
+ DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
+ };
+
+ ///
+ /// Reads forward-only from and writes a JSON array into one ZIP entry named on .
+ ///
+ /// Open data reader; disposed by caller.
+ /// Writable stream for the ZIP payload (e.g. block blob staged write). Must remain open until this method returns; caller disposes it.
+ /// Entry name inside the archive (e.g. inserted.json).
+ /// Cancellation token.
+ public static async Task WriteReaderToZipAsync(
+ SqlDataReader reader,
+ Stream zipDestination,
+ string zipEntryFileName,
+ CancellationToken cancellationToken
+ )
+ {
+ ArgumentNullException.ThrowIfNull(reader);
+ ArgumentNullException.ThrowIfNull(zipDestination);
+ ArgumentException.ThrowIfNullOrWhiteSpace(zipEntryFileName);
+ if (!zipDestination.CanWrite)
+ {
+ throw new ArgumentException("Stream must be writable.", nameof(zipDestination));
+ }
+
+ using var bufferedStream = new BufferedStream(zipDestination, 8192);
+ using var zipArchive = new ZipArchive(bufferedStream, ZipArchiveMode.Create, leaveOpen: true);
+ await WriteReaderAsIndentedJsonArrayAsync(reader, zipArchive, zipEntryFileName, cancellationToken).ConfigureAwait(false);
+ await bufferedStream.FlushAsync(cancellationToken).ConfigureAwait(false);
+ }
+
+ ///
+ /// Creates a single ZIP entry and streams a UTF-8 JSON array: opening bracket, one compact object per SQL row (commas between rows), closing bracket.
+ ///
+ /// Forward-only SQL reader; disposed by caller.
+ /// ZIP archive already opened in create mode.
+ /// Name of the entry inside the archive (e.g. inserted.json).
+ /// Cancellation token.
+ private static async Task WriteReaderAsIndentedJsonArrayAsync(
+ SqlDataReader reader,
+ ZipArchive zipArchive,
+ string zipEntryFileName,
+ CancellationToken cancellationToken
+ )
+ {
+ // One deflated entry; JSON is built incrementally so the full export is not held in memory.
+ var entry = zipArchive.CreateEntry(zipEntryFileName, CompressionLevel.Optimal);
+ await using var entryStream = entry.Open();
+
+ // Outer JSON array: '[' then rows; each row is one JsonSerializer object (no extra array indent options).
+ entryStream.WriteByte(0x5b); // '['
+ var rowBuffer = new Dictionary(StringComparer.OrdinalIgnoreCase);
+ var firstRow = true;
+ while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
+ {
+ // Human-friendly separation between top-level array elements (objects stay compact via RowSerializerOptions).
+ if (firstRow)
+ {
+ firstRow = false;
+ entryStream.Write("\n "u8);
+ }
+ else
+ {
+ entryStream.Write(",\n "u8);
+ }
+
+ // Reuse one dictionary per row to avoid allocating a new map for every record.
+ rowBuffer.Clear();
+ for (var i = 0; i < reader.FieldCount; i++)
+ {
+ // Skip SQL NULLs: omit properties entirely (smaller JSON than null fields).
+ if (reader.IsDBNull(i))
+ {
+ continue;
+ }
+
+ rowBuffer.Add(reader.GetName(i), reader.GetValue(i));
+ }
+
+ await JsonSerializer
+ .SerializeAsync(entryStream, rowBuffer, RowSerializerOptions, cancellationToken)
+ .ConfigureAwait(false);
+ }
+
+ // Closing bracket: ']' with trailing newline for human readability.
+ entryStream.Write("\n]"u8);
+ }
+}
diff --git a/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj b/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj
index 20a1160..14b5cca 100644
--- a/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj
+++ b/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj
@@ -23,6 +23,7 @@
+