Skip to content

Commit 8ff347e

Browse files
committed
add global request and response filter
1 parent f5cb6ab commit 8ff347e

4 files changed

Lines changed: 83 additions & 17 deletions

File tree

README.md

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ The [AzureBusServer](src\ServiceStack.AzureServiceBus\AzureBusServer.cs) has the
4747
- `int` **RetryCount** - How many times a message should be retried before sending to the DLQ.
4848
- `string` **connectionString** - The connection string to the Azure Service Bus namespace
4949
- `IMessageFactory` **MessageFactory** - the MQ Message Factory used by this MQ Server
50+
- `Func<IMessage, IMessage>` **RequestFilter** - Execute global transformation or custom logic before a request is processed. Must be thread-safe.
51+
- `Func<object, object>` **ResponseFilter** - Execute global transformation or custom logic on the response. Must be thread-safe.
5052
- `Action<QueueDescription>` **CreateQueueFilter** - A filter to customize the options Azure Queues are created/updated with.
5153
- `Action<string, BrokeredMessage>` **GetMessageFilter** - Called every time a message is received.
5254
- `Action<string, BrokeredMessage, IMessage>` **PublishMessageFilter** - Called every time a message gets published.
@@ -66,9 +68,7 @@ mqServer.RegisterHandler<Hello>(m => { .. }, noOfThreads:4);
6668

6769
> Behind the scenes, it delegates the work to Azure Service Bus [event-driven message pump](https://docs.microsoft.com/en-us/dotnet/api/microsoft.servicebus.messaging.messagereceiver.onmessage?view=azureservicebus-4.1.1#Microsoft_ServiceBus_Messaging_MessageReceiver_OnMessage_System_Action_Microsoft_ServiceBus_Messaging_BrokeredMessage__Microsoft_ServiceBus_Messaging_OnMessageOptions_).
6870
69-
### Filters
70-
71-
#### Create Queue Filter
71+
### Create Queue Filter
7272

7373
To modify the options a queue gets created with, provide a `CreateQueueFilter` filter and modify the `QueueDescription`.
7474

@@ -83,7 +83,7 @@ For instance, to change the default TTL and have messages expire automatically a
8383

8484
If the queue already exists, any change to the queue options will result in an update.
8585

86-
#### Message Filters
86+
### Message Filters
8787

8888
There are optional `PublishMessageFilter` and `GetMessageFilter` callbacks which can be used to intercept outgoing and incoming messages. The Type name of the message body that was published is available in the `Label` property, e.g:
8989

@@ -111,5 +111,4 @@ Note that `brokeredMsg` parameter of `GetMessageFilter` when explicitly retrievi
111111
## Upcoming Features
112112

113113
- [ ] queue whitelisting
114-
- [ ] request and response global filter
115114
- [ ] error handler

src/ServiceStack.AzureServiceBus/AzureBusServer.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,18 @@ public Action<string, BrokeredMessage, IMessage> PublishMessageFilter
7272
set => messageFactory.PublishMessageFilter = value;
7373
}
7474

75+
/// <summary>
76+
/// Execute global transformation or custom logic before a request is processed.
77+
/// Must be thread-safe.
78+
/// </summary>
79+
public Func<IMessage, IMessage> RequestFilter { get; set; }
80+
81+
/// <summary>
82+
/// Execute global transformation or custom logic on the response.
83+
/// Must be thread-safe.
84+
/// </summary>
85+
public Func<object, object> ResponseFilter { get; set; }
86+
7587
private int status;
7688

7789
public AzureBusServer(string connectionString): this(new AzureBusMessageFactory(connectionString))
@@ -142,6 +154,8 @@ protected IMessageHandlerFactory CreateMessageHandlerFactory<T>(Func<IMessage<T>
142154
{
143155
return new MessageHandlerFactory<T>(this, processMessageFn, processExceptionEx)
144156
{
157+
RequestFilter = RequestFilter,
158+
ResponseFilter = ResponseFilter,
145159
RetryCount = RetryCount,
146160
};
147161
}

tests/ServiceStack.AzureServiceBus.Tests/AzureBusServerTests.cs

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,16 @@ public class Hello : IReturn<HelloResponse>
155155
{
156156
public string Name { get; set; }
157157
}
158-
public class HelloNull : IReturn<HelloResponse>
158+
public class HelloNull1 : IReturn<HelloResponse>
159159
{
160160
public string Name { get; set; }
161161
}
162+
163+
public class HelloNull2 : IReturn<HelloResponse>
164+
{
165+
public string Name { get; set; }
166+
}
167+
162168
public class HelloResponse
163169
{
164170
public string Result { get; set; }
@@ -307,8 +313,11 @@ public async Task Messages_with_null_Response_is_published_to_OutMQ()
307313
int msgsReceived = 0;
308314
using (var mqServer = CreateMqServer())
309315
{
310-
await mqServer.MessageFactory.PurgeQueueAsync<HelloNull>();
311-
mqServer.RegisterHandler<HelloNull>(m =>
316+
await mqServer.MessageFactory.PurgeQueuesAsync(
317+
QueueNames<HelloNull1>.In,
318+
QueueNames<HelloNull1>.Out
319+
);
320+
mqServer.RegisterHandler<HelloNull1>(m =>
312321
{
313322
Interlocked.Increment(ref msgsReceived);
314323
return null;
@@ -318,12 +327,12 @@ public async Task Messages_with_null_Response_is_published_to_OutMQ()
318327

319328
using (var mqClient = mqServer.CreateMessageQueueClient())
320329
{
321-
mqClient.Publish(new HelloNull { Name = "Into the Void" });
330+
mqClient.Publish(new HelloNull1 { Name = "Into the Void" });
322331

323-
var msg = mqClient.Get<HelloNull>(QueueNames<HelloNull>.Out, TimeSpan.FromSeconds(10));
332+
var msg = mqClient.Get<HelloNull1>(QueueNames<HelloNull1>.Out, TimeSpan.FromSeconds(10));
324333
Assert.That(msg, Is.Not.Null);
325334

326-
HelloNull response = msg.GetBody();
335+
HelloNull1 response = msg.GetBody();
327336

328337
Thread.Sleep(100);
329338

@@ -339,8 +348,11 @@ public async Task Messages_with_null_Response_is_published_to_ReplyMQ()
339348
int msgsReceived = 0;
340349
using (var mqServer = CreateMqServer())
341350
{
342-
await mqServer.MessageFactory.PurgeQueuesAsync(QueueNames<HelloNull>.In);
343-
mqServer.RegisterHandler<HelloNull>(m =>
351+
await mqServer.MessageFactory.PurgeQueuesAsync(
352+
QueueNames<HelloNull2>.In,
353+
QueueNames<HelloNull2>.Out
354+
);
355+
mqServer.RegisterHandler<HelloNull2>(m =>
344356
{
345357
Interlocked.Increment(ref msgsReceived);
346358
return null;
@@ -351,16 +363,16 @@ public async Task Messages_with_null_Response_is_published_to_ReplyMQ()
351363
using (var mqClient = mqServer.CreateMessageQueueClient())
352364
{
353365
var replyMq = mqClient.GetTempQueueName();
354-
mqClient.Publish(new Message<HelloNull>(new HelloNull { Name = "Into the Void" })
366+
mqClient.Publish(new Message<HelloNull2>(new HelloNull2 { Name = "Into the Void" })
355367
{
356368
ReplyTo = replyMq
357369
});
358370

359-
var msg = mqClient.Get<HelloNull>(replyMq, TimeSpan.FromSeconds(10));
371+
var msg = mqClient.Get<HelloNull2>(replyMq, TimeSpan.FromSeconds(10));
360372

361-
await Task.Delay(200);
373+
await Task.Delay(100);
362374

363-
HelloNull response = msg.GetBody();
375+
HelloNull2 response = msg.GetBody();
364376
Assert.That(response.Name, Is.EqualTo("Into the Void"));
365377
Assert.That(msgsReceived, Is.EqualTo(1));
366378
}

tests/ServiceStack.AzureServiceBus.Tests/MqServerIntroTests.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,5 +417,46 @@ await Task.WhenAll(
417417
appHost.Resolve<IMessageService>().Dispose();
418418
}
419419
}
420+
421+
[Test]
422+
public async Task Global_request_and_response_get_called()
423+
{
424+
using (var mqServer = CreateMqServer())
425+
{
426+
await Task.WhenAll(
427+
mqServer.MessageFactory.PurgeQueueAsync<HelloIntro>(),
428+
mqServer.MessageFactory.PurgeQueuesAsync(QueueNames<HelloIntroResponse>.In));
429+
430+
var azureServer = mqServer as AzureBusServer;
431+
azureServer.RequestFilter = msg =>
432+
{
433+
msg.Meta["prefix"] = "Global";
434+
return msg;
435+
};
436+
437+
azureServer.ResponseFilter = response =>
438+
{
439+
if (response is HelloIntroResponse helloResp)
440+
{
441+
helloResp.Result += "!!!";
442+
}
443+
444+
return response;
445+
};
446+
447+
mqServer.RegisterHandler<HelloIntro>(m =>
448+
new HelloIntroResponse { Result = $"Hello, {m.Meta["prefix"]} {m.GetBody().Name}".Fmt() });
449+
mqServer.Start();
450+
451+
using (var mqClient = mqServer.CreateMessageQueueClient())
452+
{
453+
mqClient.Publish(new HelloIntro { Name = "World" });
454+
455+
IMessage<HelloIntroResponse> responseMsg = mqClient.Get<HelloIntroResponse>(QueueNames<HelloIntroResponse>.In);
456+
mqClient.Ack(responseMsg);
457+
Assert.That(responseMsg.GetBody().Result, Is.EqualTo("Hello, Global World!!!"));
458+
}
459+
}
460+
}
420461
}
421462
}

0 commit comments

Comments
 (0)