33using System . Linq ;
44using System . Threading ;
55using System . Threading . Tasks ;
6- using Azure . Messaging . ServiceBus ;
76using Ev . ServiceBus . Abstractions ;
8- using Ev . ServiceBus . Abstractions . Extensions ;
9- using Ev . ServiceBus . Abstractions . MessageReception ;
10- using Ev . ServiceBus . Diagnostics ;
11- using Ev . ServiceBus . Management ;
12- using Microsoft . Extensions . Options ;
137
148namespace Ev . ServiceBus . Dispatch ;
159
1610public class DispatchSender : IDispatchSender
1711{
18- private const int MaxMessagePerSend = 100 ;
19- private readonly IMessagePayloadSerializer _messagePayloadSerializer ;
20- private readonly ServiceBusRegistry _dispatchRegistry ;
21- private readonly ServiceBusRegistry _registry ;
22- private readonly IMessageMetadataAccessor _messageMetadataAccessor ;
23- private readonly IEnumerable < IDispatchExtender > _dispatchCustomizers ;
24- private readonly ServiceBusOptions _serviceBusOptions ;
12+ private readonly ServiceBusMessageFactory _messageFactory ;
13+ private readonly ServiceBusMessageSender _serviceBusMessageSender ;
2514
2615 public DispatchSender (
27- ServiceBusRegistry registry ,
28- IMessagePayloadSerializer messagePayloadSerializer ,
29- ServiceBusRegistry dispatchRegistry ,
30- IMessageMetadataAccessor messageMetadataAccessor ,
31- IEnumerable < IDispatchExtender > dispatchCustomizers ,
32- IOptions < ServiceBusOptions > serviceBusOptions )
16+ ServiceBusMessageFactory messageFactory ,
17+ ServiceBusMessageSender serviceBusMessageSender )
3318 {
34- _registry = registry ;
35- _messagePayloadSerializer = messagePayloadSerializer ;
36- _dispatchRegistry = dispatchRegistry ;
37- _messageMetadataAccessor = messageMetadataAccessor ;
38- _dispatchCustomizers = dispatchCustomizers ;
39- _serviceBusOptions = serviceBusOptions . Value ;
19+ _messageFactory = messageFactory ;
20+ _serviceBusMessageSender = serviceBusMessageSender ;
4021 }
4122
4223 /// <inheritdoc />
@@ -50,20 +31,14 @@ public async Task SendDispatch(object messagePayload, CancellationToken token =
5031 /// <inheritdoc />
5132 public async Task SendDispatch ( Abstractions . Dispatch messagePayload , CancellationToken token = default )
5233 {
53- var dispatches = CreateMessagesToSend ( [ messagePayload ] ) ;
34+ var dispatches = _messageFactory . CreateMessagesToSend ( [ messagePayload ] ) ;
5435
55- foreach ( var messagePerResource in dispatches )
56- {
57- var message = messagePerResource . Messages . Single ( ) ;
36+ var messagePerResource = dispatches . Single ( ) ;
5837
59- await messagePerResource . Sender . SendMessageAsync ( message . Message , token ) ;
60- ServiceBusMeter . IncrementSentCounter (
61- 1 ,
62- messagePerResource . Sender . ClientType . ToString ( ) ,
63- messagePerResource . Sender . Name ,
64- message . Message . ApplicationProperties [ UserProperties . PayloadTypeIdProperty ] ? . ToString ( )
65- ) ;
66- }
38+ await _serviceBusMessageSender . SendMessages (
39+ messagePerResource . ResourceId ,
40+ messagePerResource . Messages ,
41+ token ) ;
6742 }
6843
6944 /// <inheritdoc />
@@ -86,48 +61,10 @@ public async Task SendDispatches(IEnumerable<Abstractions.Dispatch> messagePaylo
8661 throw new ArgumentNullException ( nameof ( messagePayloads ) ) ;
8762 }
8863
89- var dispatches = CreateMessagesToSend ( messagePayloads ) ;
64+ var dispatches = _messageFactory . CreateMessagesToSend ( messagePayloads ) ;
9065 foreach ( var messagesPerResource in dispatches )
9166 {
92- await BatchAndSendMessages ( messagesPerResource , token , async ( sender , batch ) =>
93- {
94- await sender . SendMessagesAsync ( batch , token ) ;
95- } ) ;
96- }
97- }
98-
99- private async Task BatchAndSendMessages ( MessagesPerResource dispatches , CancellationToken token , Func < IMessageSender , ServiceBusMessageBatch , Task > senderAction )
100- {
101- var batches = new List < ServiceBusMessageBatch > ( ) ;
102- var batch = await dispatches . Sender . CreateMessageBatchAsync ( token ) ;
103- batches . Add ( batch ) ;
104- foreach ( var messageToSend in dispatches . Messages )
105- {
106- ServiceBusMeter . IncrementSentCounter (
107- 1 ,
108- dispatches . Sender . ClientType . ToString ( ) ,
109- dispatches . Sender . Name ,
110- messageToSend . Message . ApplicationProperties [ UserProperties . PayloadTypeIdProperty ] ? . ToString ( )
111- ) ;
112-
113- if ( batch . TryAddMessage ( messageToSend . Message ) )
114- {
115- continue ;
116- }
117- batch = await dispatches . Sender . CreateMessageBatchAsync ( token ) ;
118- batches . Add ( batch ) ;
119- if ( batch . TryAddMessage ( messageToSend . Message ) )
120- {
121- continue ;
122- }
123-
124- throw new ArgumentOutOfRangeException ( "A message is too big to fit in a single batch" ) ;
125- }
126-
127- foreach ( var pageMessages in batches )
128- {
129- await senderAction . Invoke ( dispatches . Sender , pageMessages ) ;
130- pageMessages . Dispose ( ) ;
67+ await _serviceBusMessageSender . SendMessages ( messagesPerResource . ResourceId , messagesPerResource . Messages , token ) ;
13168 }
13269 }
13370
@@ -151,132 +88,10 @@ public async Task ScheduleDispatches(IEnumerable<Abstractions.Dispatch> messageP
15188 throw new ArgumentNullException ( nameof ( messagePayloads ) ) ;
15289 }
15390
154- var dispatches = CreateMessagesToSend ( messagePayloads ) ;
91+ var dispatches = _messageFactory . CreateMessagesToSend ( messagePayloads ) ;
15592 foreach ( var messagesPerResource in dispatches )
15693 {
157- await PaginateAndSendMessages ( messagesPerResource , async ( sender , page ) =>
158- {
159- await sender . ScheduleMessagesAsync ( page , scheduledEnqueueTime , token ) ;
160- } ) ;
161- }
162- }
163-
164- private async Task PaginateAndSendMessages ( MessagesPerResource dispatches , Func < IMessageSender , IEnumerable < ServiceBusMessage > , Task > senderAction )
165- {
166- var paginatedMessages = dispatches . Messages . Select ( o => o . Message )
167- . Select ( ( x , i ) => new
168- {
169- Item = x ,
170- Index = i
171- } )
172- . GroupBy ( x => x . Index / MaxMessagePerSend , x => x . Item ) ;
173-
174- foreach ( var pageMessages in paginatedMessages )
175- {
176- foreach ( var message in pageMessages )
177- {
178- ServiceBusMeter . IncrementSentCounter (
179- 1 ,
180- dispatches . Sender . ClientType . ToString ( ) ,
181- dispatches . Sender . Name ,
182- message . ApplicationProperties [ UserProperties . PayloadTypeIdProperty ] ? . ToString ( )
183- ) ;
184- }
185-
186- await senderAction . Invoke ( dispatches . Sender , pageMessages . Select ( m => m ) . ToArray ( ) ) ;
187- }
188- }
189-
190- private class MessagesPerResource
191- {
192- public MessageToSend [ ] Messages { get ; set ; }
193- public ClientType ClientType { get ; set ; }
194- public string ResourceId { get ; set ; }
195- public IMessageSender Sender { get ; set ; }
196- }
197-
198- private class MessageToSend
199- {
200- public MessageToSend ( ServiceBusMessage message , MessageDispatchRegistration registration )
201- {
202- Message = message ;
203- Registration = registration ;
204- }
205-
206- public ServiceBusMessage Message { get ; }
207- public MessageDispatchRegistration Registration { get ; }
208- }
209-
210- private MessagesPerResource [ ] CreateMessagesToSend ( IEnumerable < Abstractions . Dispatch > messagePayloads )
211- {
212- var dispatches =
213- (
214- from dispatch in messagePayloads
215- // the same dispatch can be published to several senders
216- let registrations = _dispatchRegistry . GetDispatchRegistrations ( dispatch . Payload . GetType ( ) )
217- from eventPublicationRegistration in registrations
218- let message = CreateMessage ( eventPublicationRegistration , dispatch )
219- select new MessageToSend ( message , eventPublicationRegistration )
220- )
221- . ToArray ( ) ;
222-
223- var messagesPerResource = (
224- from dispatch in dispatches
225- group dispatch by new { dispatch . Registration . Options . ClientType , dispatch . Registration . Options . ResourceId } into gr
226- let sender = _registry . GetMessageSender ( gr . Key . ClientType , gr . Key . ResourceId )
227- select new MessagesPerResource ( )
228- {
229- Messages = gr . ToArray ( ) ,
230- ClientType = gr . Key . ClientType ,
231- ResourceId = gr . Key . ResourceId ,
232- Sender = sender
233- } ) . ToArray ( ) ;
234-
235- return messagesPerResource ;
236- }
237-
238- private ServiceBusMessage CreateMessage (
239- MessageDispatchRegistration registration ,
240- Abstractions . Dispatch dispatch )
241- {
242- var result = _messagePayloadSerializer . SerializeBody ( dispatch . Payload ) ;
243- var message = MessageHelper . CreateMessage ( result . ContentType , result . Body , registration . PayloadTypeId ) ;
244-
245- dispatch . ApplicationProperties . Remove ( UserProperties . PayloadTypeIdProperty ) ;
246- foreach ( var dispatchApplicationProperty in dispatch . ApplicationProperties )
247- {
248- message . ApplicationProperties [ dispatchApplicationProperty . Key ] = dispatchApplicationProperty . Value ;
249- }
250-
251- message . SessionId = dispatch . SessionId ;
252-
253- var originalCorrelationId = _messageMetadataAccessor . Metadata ? . CorrelationId ?? Guid . NewGuid ( ) . ToString ( ) ;
254- message . CorrelationId = dispatch . CorrelationId ?? originalCorrelationId ;
255-
256- var originalIsolationKey = _messageMetadataAccessor . Metadata ? . ApplicationProperties . GetIsolationKey ( ) ;
257- message . SetIsolationKey ( originalIsolationKey ?? _serviceBusOptions . Settings . IsolationSettings . IsolationKey ) ;
258-
259- var originalIsolationApps = _messageMetadataAccessor . Metadata ? . ApplicationProperties . GetIsolationApps ( ) ?? [ ] ;
260- message . SetIsolationApps ( originalIsolationApps ) ;
261-
262- if ( dispatch . DiagnosticId != null )
263- {
264- message . SetDiagnosticIdIfIsNot ( dispatch . DiagnosticId ) ;
265- }
266- if ( ! string . IsNullOrWhiteSpace ( dispatch . MessageId ) )
267- {
268- message . MessageId = dispatch . MessageId ;
269- }
270-
271- foreach ( var customizer in registration . OutgoingMessageCustomizers )
272- {
273- customizer ? . Invoke ( message , dispatch . Payload ) ;
274- }
275-
276- foreach ( var dispatchCustomizer in _dispatchCustomizers )
277- {
278- dispatchCustomizer . ExtendDispatch ( message , dispatch . Payload ) ;
94+ await _serviceBusMessageSender . ScheduleMessages ( messagesPerResource , scheduledEnqueueTime , token ) ;
27995 }
280- return message ;
28196 }
28297}
0 commit comments