-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathAzureBlobBodyStorage.cs
More file actions
134 lines (117 loc) · 5.57 KB
/
AzureBlobBodyStorage.cs
File metadata and controls
134 lines (117 loc) · 5.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Auditing.BodyStorage;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Microsoft.Extensions.Logging;
/// <summary>
/// Body storage implementation that uses Azure Blob Storage to store message bodies. Each body is stored as a separate blob,
/// with the content type set as an HTTP header and the message id, body size and expiry recorded as blob metadata.
/// Writes are queued through the batched writer base class and flushed in batches; each blob upload is retried
/// up to <c>MaxRetries</c> times with an exponential back-off between attempts.
/// </summary>
class AzureBlobBodyStorage(
    Channel<BodyWriteItem> channel,
    MongoSettings settings,
    ILogger<AzureBlobBodyStorage> logger)
    : BatchedBodyStorageWriter<BodyWriteItem>(channel, settings, logger), IBodyStorage, IBodyWriter
{
    // Total number of upload attempts per blob (the first try plus retries).
    const int MaxRetries = 3;

    readonly BlobContainerClient containerClient = new(settings.BlobConnectionString, settings.BlobContainerName);

    protected override string WriterName => "Azure Blob body storage writer";

    // Initialization

    /// <summary>
    /// Creates the blob container if it does not exist yet and logs the container name.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the container creation request.</param>
    public async Task Initialize(CancellationToken cancellationToken)
    {
        _ = await containerClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
        logger.LogInformation("Azure Blob body storage initialized. Container: {ContainerName}", containerClient.Name);
    }

    // IBodyWriter

    /// <summary>Always <c>true</c> for this writer.</summary>
    public bool IsEnabled => true;

    /// <summary>
    /// Queues a message body for upload. The body bytes are copied into a new array so the queued item
    /// holds its own copy, independent of the caller's buffer.
    /// </summary>
    /// <param name="id">Identifier of the body; used as the blob name when the item is uploaded.</param>
    /// <param name="contentType">Content type stored on the blob.</param>
    /// <param name="body">The body bytes.</param>
    /// <param name="expiresAt">Expiry timestamp recorded in the blob metadata.</param>
    /// <param name="cancellationToken">Token used to cancel queuing the item.</param>
    public async ValueTask WriteAsync(string id, string contentType, ReadOnlyMemory<byte> body, DateTime expiresAt, CancellationToken cancellationToken)
    {
        await WriteToChannelAsync(new BodyWriteItem
        {
            Id = id,
            ContentType = contentType,
            BodySize = body.Length,
            Body = body.ToArray(),
            ExpiresAt = expiresAt
        }, cancellationToken).ConfigureAwait(false);
    }

    // IBodyStorage

    /// <summary>
    /// No-op: the stream is not read and nothing is stored by this method.
    /// </summary>
    /// <remarks>
    /// NOTE(review): presumably bodies reach blob storage only through <c>WriteAsync</c> and the batched
    /// flush - confirm that no caller relies on this method persisting data.
    /// </remarks>
    public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
        => Task.CompletedTask;

    /// <summary>
    /// Downloads the blob named <paramref name="bodyId"/> as a stream.
    /// </summary>
    /// <param name="bodyId">Blob name to fetch.</param>
    /// <param name="cancellationToken">Token used to cancel the download request.</param>
    /// <returns>
    /// A result with <c>HasResult = true</c> carrying the content stream, content type
    /// (falling back to <c>text/plain</c> when the blob has none), body size and ETag;
    /// or a result with <c>HasResult = false</c> when the service answers with HTTP 404.
    /// </returns>
    public async Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
    {
        var blobClient = containerClient.GetBlobClient(bodyId);

        try
        {
            var response = await blobClient.DownloadStreamingAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
            var details = response.Value.Details;

            // Body size comes from the "bodySize" metadata entry; it stays 0 when the entry is missing or not a number.
            var bodySize = 0;
            if (details.Metadata.TryGetValue("bodySize", out var bodySizeStr))
            {
                _ = int.TryParse(bodySizeStr, out bodySize);
            }

            return new StreamResult
            {
                HasResult = true,
                Stream = response.Value.Content,
                ContentType = details.ContentType ?? "text/plain",
                BodySize = bodySize,
                Etag = details.ETag.ToString()
            };
        }
        catch (RequestFailedException ex) when (ex.Status == 404)
        {
            return new StreamResult { HasResult = false };
        }
    }

    // BatchedBodyStorageWriter

    /// <summary>
    /// Uploads every item of the batch concurrently and completes when all uploads have finished.
    /// </summary>
    /// <param name="batch">Items to upload.</param>
    /// <param name="cancellationToken">Token passed to every upload.</param>
    protected override async Task FlushBatchAsync(List<BodyWriteItem> batch, CancellationToken cancellationToken)
    {
        var uploadTasks = batch.Select(entry => UploadBlobWithRetry(entry, cancellationToken));
        await Task.WhenAll(uploadTasks).ConfigureAwait(false);
    }

    /// <summary>
    /// Uploads a single body as a blob, retrying failed attempts with an exponential back-off (1s, 2s, ...).
    /// A failure on the final attempt is logged and swallowed so one bad blob does not fail the whole batch;
    /// cancellation is propagated to the caller instead of being retried.
    /// </summary>
    /// <param name="entry">The body to upload.</param>
    /// <param name="cancellationToken">Token used to cancel the upload and the back-off delay.</param>
    async Task UploadBlobWithRetry(BodyWriteItem entry, CancellationToken cancellationToken)
    {
        var blobClient = containerClient.GetBlobClient(entry.Id);

        for (var attempt = 1; attempt <= MaxRetries; attempt++)
        {
            try
            {
                using var stream = new MemoryStream(entry.Body);
                var options = new BlobUploadOptions
                {
                    // A body may arrive without a content type (TryFetch has a fallback for that case),
                    // so guard against null instead of failing every attempt with a NullReferenceException.
                    HttpHeaders = new BlobHttpHeaders { ContentType = entry.ContentType?.Trim() },
                    Metadata = new Dictionary<string, string>
                    {
                        ["messageId"] = entry.Id.Trim(),
                        ["bodySize"] = entry.BodySize.ToString(),
                        ["mongoExpiresAt"] = entry.ExpiresAt.ToString("O")
                    }
                };
                _ = await blobClient.UploadAsync(stream, options, cancellationToken).ConfigureAwait(false);
                return;
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Cancellation is not an upload failure: do not log it as one and do not keep retrying.
                // The back-off delay below already surfaces cancellation the same way.
                throw;
            }
            catch (Exception ex) when (attempt < MaxRetries && !cancellationToken.IsCancellationRequested)
            {
                var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt - 1));
                logger.LogWarning(ex, "Failed to upload blob {BlobId} (attempt {Attempt}/{MaxRetries}), retrying in {Delay}s",
                    entry.Id, attempt, MaxRetries, delay.TotalSeconds);
                await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Failed to upload blob {BlobId} after {MaxRetries} attempts", entry.Id, MaxRetries);

                // Give up explicitly so the loop can never continue after the terminal failure was reported.
                return;
            }
        }
    }
}
}