Skip to content

Commit ff0bf73

Browse files
author
Anouar Hassine
committed
Enhancing the way we get typed message from received event
1 parent a10349e commit ff0bf73

7 files changed

Lines changed: 110 additions & 21 deletions

File tree

ReactiveXComponent/Common/MessageEventArgs.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using Newtonsoft.Json.Linq;
2-
using ReactiveXComponent.Common;
32
using ReactiveXComponent.Serializer;
43

54
namespace ReactiveXComponent.Common
@@ -8,25 +7,28 @@ namespace ReactiveXComponent.Common
87

98
public class MessageEventArgs : EventArgs
109
{
11-
public MessageEventArgs(StateMachineRefHeader stateMachineRefHeader, object messageReceived)
10+
public MessageEventArgs(StateMachineRefHeader stateMachineRefHeader, object messageReceived, SerializationType serializationType)
1211
{
1312
StateMachineRefHeader = stateMachineRefHeader;
1413
MessageReceived = messageReceived;
14+
SerializationType = serializationType;
1515
}
1616

1717
public StateMachineRefHeader StateMachineRefHeader { get; }
1818

1919
public object MessageReceived { get; }
2020

21-
public T GetMessage<T>(SerializationType serializationType) where T : class
21+
public SerializationType SerializationType { get; }
22+
23+
public T GetMessage<T>() where T : class
2224
{
23-
if (serializationType == SerializationType.Binary)
25+
if (SerializationType == SerializationType.Binary)
2426
{
2527
return MessageReceived as T;
2628
}
2729

28-
if (serializationType == SerializationType.Json
29-
|| serializationType == SerializationType.Bson)
30+
if (SerializationType == SerializationType.Json
31+
|| SerializationType == SerializationType.Bson)
3032
{
3133
var jResult = MessageReceived as JObject;
3234
return jResult?.ToObject<T>();

ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class RabbitMqSnapshotManager : IDisposable
2424
private readonly string _privateCommunicationIdentifier;
2525
private readonly ISerializer _serializer;
2626
private readonly string _component;
27+
private SerializationType _serializationType;
2728

2829
private readonly ConcurrentDictionary<SubscriptionKey, RabbitMqSubscriberInfos> _subscribers;
2930

@@ -40,6 +41,7 @@ public RabbitMqSnapshotManager(IConnection connection, string component, IXCConf
4041
_serializer = serializer;
4142
_privateCommunicationIdentifier = privateCommunicationIdentifier;
4243
_subscribers = new ConcurrentDictionary<SubscriptionKey, RabbitMqSubscriberInfos>();
44+
InitSerializationType();
4345
CreateSnapshotChannel(connection);
4446
InitObservableCollection();
4547
}
@@ -246,7 +248,7 @@ private void DispatchMessage(BasicDeliverEventArgs basicAckEventArgs)
246248

247249
var message = Encoding.UTF8.GetString(decompressedMessage.ToArray());
248250

249-
var msgEventArgs = new MessageEventArgs(stateMachineRefHeader, message);
251+
var msgEventArgs = new MessageEventArgs(stateMachineRefHeader, message, _serializationType);
250252

251253
OnSnapshotReceived(msgEventArgs);
252254
}
@@ -266,7 +268,7 @@ private void OnSnapshotReceived(MessageEventArgs e)
266268
StateCode = element.StateCode
267269
};
268270

269-
var messageEventArgs = new MessageEventArgs(stateMachineRefHeader, element.PublicMember);
271+
var messageEventArgs = new MessageEventArgs(stateMachineRefHeader, element.PublicMember, _serializationType);
270272
stateMachineInstances.Add(messageEventArgs);
271273
}
272274
SnapshotReceived?.Invoke(this, stateMachineInstances);
@@ -289,6 +291,25 @@ private void UnsubscribeSnapshot(string stateMachine)
289291
rabbitMqSubscriberInfos.Channel.Close();
290292
}
291293

294+
private void InitSerializationType()
295+
{
296+
var serialization = _xcConfiguration.GetSerializationType();
297+
298+
switch (serialization)
299+
{
300+
case XCApiTags.Binary:
301+
_serializationType = SerializationType.Binary;
302+
break;
303+
case XCApiTags.Json:
304+
_serializationType = SerializationType.Json;
305+
break;
306+
case XCApiTags.Bson:
307+
_serializationType = SerializationType.Bson;
308+
break;
309+
default:
310+
throw new XCSerializationException("Serialization type not supported");
311+
}
312+
}
292313

293314
#region IDisposable implementation
294315

ReactiveXComponent/RabbitMq/RabbitMqSubscriber.cs

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public class RabbitMqSubscriber : IXCSubscriber
2020
private readonly string _privateCommunicationIdentifier;
2121
private readonly ISerializer _serializer;
2222
private readonly string _component;
23+
private SerializationType _serializationType;
2324

2425
private readonly ConcurrentDictionary<SubscriptionKey, RabbitMqSubscriberInfos> _subscribersDico;
2526
private readonly ConcurrentDictionary<StreamSubscriptionKey, IDisposable> _streamSubscriptionsDico;
@@ -36,6 +37,7 @@ public RabbitMqSubscriber(string component, IXCConfiguration xcConfiguration, IC
3637
_streamSubscriptionsDico = new ConcurrentDictionary<StreamSubscriptionKey, IDisposable>();
3738
_privateCommunicationIdentifier = privateCommunicationIdentifier;
3839
_serializer = serializer;
40+
InitSerializationType();
3941
InitObservableCollection();
4042
}
4143

@@ -129,8 +131,8 @@ private void InitSubscriber(string stateMachine, string privateCommunicationIden
129131
bool createExchangeChannel = false;
130132
RabbitMqSubscriberInfos rabbitMqSubscriberInfos = null;
131133

132-
EventHandler<BasicDeliverEventArgs> handler = (o, basicAckEventArgs) =>
133-
{
134+
EventHandler<BasicDeliverEventArgs> handler = (o, basicAckEventArgs) =>
135+
{
134136
var stateMachineRefHeader = RabbitMqHeaderConverter.ConvertStateMachineRefHeader(basicAckEventArgs.BasicProperties.Headers);
135137

136138
if (stateMachineRefHeader.StateMachineCode == stateMachineCode)
@@ -139,9 +141,9 @@ private void InitSubscriber(string stateMachine, string privateCommunicationIden
139141

140142
var obj = _serializer.Deserialize(new MemoryStream(basicAckEventArgs.Body));
141143

142-
var msgEventArgs = new MessageEventArgs(stateMachineRefHeader, obj);
144+
var msgEventArgs = new MessageEventArgs(stateMachineRefHeader, obj, _serializationType);
143145

144-
OnMessageReceived(msgEventArgs);
146+
OnMessageReceived(msgEventArgs);
145147
}
146148
};
147149

@@ -184,7 +186,7 @@ private void InitSubscriber(string stateMachine, string privateCommunicationIden
184186
{
185187
// Update the existing subscription for that routing key to subscribe the new handler..
186188
rabbitMqSubscriberInfos.AddHandler(handler);
187-
}
189+
}
188190
}
189191
}
190192
catch (OperationInterruptedException e)
@@ -193,6 +195,26 @@ private void InitSubscriber(string stateMachine, string privateCommunicationIden
193195
}
194196
}
195197

198+
private void InitSerializationType()
199+
{
200+
var serialization = _xcConfiguration.GetSerializationType();
201+
202+
switch (serialization)
203+
{
204+
case XCApiTags.Binary:
205+
_serializationType = SerializationType.Binary;
206+
break;
207+
case XCApiTags.Json:
208+
_serializationType = SerializationType.Json;
209+
break;
210+
case XCApiTags.Bson:
211+
_serializationType = SerializationType.Bson;
212+
break;
213+
default:
214+
throw new XCSerializationException("Serialization type not supported");
215+
}
216+
}
217+
196218
private void CreateExchangeChannel(string exchangeName, string routingKey, out IModel channel, out EventingBasicConsumer subscriber)
197219
{
198220
if (_connection == null || !_connection.IsOpen)

ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using Newtonsoft.Json;
88
using ReactiveXComponent.Common;
99
using ReactiveXComponent.Configuration;
10+
using ReactiveXComponent.Serializer;
1011

1112
namespace ReactiveXComponent.WebSocket
1213
{
@@ -18,6 +19,7 @@ public class WebSocketSnapshotManager : IDisposable
1819
private readonly IXCConfiguration _xcConfiguration;
1920
private readonly ConcurrentDictionary<SubscriptionKey, EventHandler<WebSocketMessageEventArgs>> _subscriptions;
2021
private readonly ConcurrentDictionary<SubscriptionKey, IDisposable> _streamSubscriptionsDico;
22+
private SerializationType _serializationType;
2123

2224
private event EventHandler<List<MessageEventArgs>> SnapshotReceived;
2325

@@ -31,6 +33,7 @@ public WebSocketSnapshotManager(string component, IWebSocketClient webSocketClie
3133
_privateCommunicationIdentifier = privateCommunicationIdentifier;
3234
_subscriptions = new ConcurrentDictionary<SubscriptionKey, EventHandler<WebSocketMessageEventArgs>>();
3335
_streamSubscriptionsDico = new ConcurrentDictionary<SubscriptionKey, IDisposable>();
36+
InitSerializationType();
3437

3538
_snapshotStream = Observable.FromEvent<EventHandler<List<MessageEventArgs>>, List<MessageEventArgs>>(
3639
handler => (sender, e) => handler(e),
@@ -133,7 +136,7 @@ private void CreateSnapshotReplyHandler(string replyTopic, out EventHandler<WebS
133136
StateCode = element.StateCode
134137
};
135138

136-
var messageEventArgs = new MessageEventArgs(stateMachineRefHeader, element.PublicMember);
139+
var messageEventArgs = new MessageEventArgs(stateMachineRefHeader, element.PublicMember, _serializationType);
137140
stateMachineInstances.Add(messageEventArgs);
138141
}
139142

@@ -191,6 +194,26 @@ private void UnsubscribeAll()
191194
_streamSubscriptionsDico.Clear();
192195
}
193196

197+
private void InitSerializationType()
198+
{
199+
var serialization = _xcConfiguration.GetSerializationType();
200+
201+
switch (serialization)
202+
{
203+
case XCApiTags.Binary:
204+
_serializationType = SerializationType.Binary;
205+
break;
206+
case XCApiTags.Json:
207+
_serializationType = SerializationType.Json;
208+
break;
209+
case XCApiTags.Bson:
210+
_serializationType = SerializationType.Bson;
211+
break;
212+
default:
213+
throw new XCSerializationException("Serialization type not supported");
214+
}
215+
}
216+
194217
#region IDisposable implementation
195218

196219
private bool _disposed;

ReactiveXComponent/WebSocket/WebSocketSubscriber.cs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
using System;
22
using System.Collections.Concurrent;
3-
using System.Collections.Generic;
4-
using System.Linq;
53
using System.Reactive.Linq;
6-
using System.Text;
7-
using System.Threading.Tasks;
84
using ReactiveXComponent.Common;
95
using ReactiveXComponent.Configuration;
106
using ReactiveXComponent.Connection;
@@ -18,6 +14,7 @@ public class WebSocketSubscriber : IXCSubscriber
1814
private readonly IWebSocketClient _webSocketClient;
1915
private readonly IXCConfiguration _xcConfiguration;
2016
private readonly string _privateCommunicationIdentifier;
17+
private SerializationType _serializationType;
2118

2219
private readonly ConcurrentDictionary<SubscriptionKey, EventHandler<WebSocketMessageEventArgs>> _subscriptionsDico;
2320
private readonly ConcurrentDictionary<StreamSubscriptionKey, IDisposable> _streamSubscriptionsDico;
@@ -30,6 +27,7 @@ public WebSocketSubscriber(string component, IWebSocketClient webSocketClient, I
3027
_webSocketClient = webSocketClient;
3128
_xcConfiguration = xcConfiguration;
3229
_privateCommunicationIdentifier = privateCommunicationIdentifier;
30+
InitSerializationType();
3331

3432
_subscriptionsDico = new ConcurrentDictionary<SubscriptionKey, EventHandler<WebSocketMessageEventArgs>>();
3533
_streamSubscriptionsDico = new ConcurrentDictionary<StreamSubscriptionKey, IDisposable>();
@@ -143,7 +141,7 @@ private void InitSubscription(string stateMachine, bool isPrivate = false)
143141
{
144142
var receivedObject = WebSocketMessageHelper.DeserializeString(receivedPacket.JsonMessage);
145143

146-
var message = new MessageEventArgs(stateMachineRefHeader, receivedObject);
144+
var message = new MessageEventArgs(stateMachineRefHeader, receivedObject, _serializationType);
147145

148146
MessageReceived?.Invoke(this, message);
149147
}
@@ -244,6 +242,26 @@ private T GetOptionalValue<T>(T? optionalValue) where T : struct
244242
return optionalValue ?? default(T);
245243
}
246244

245+
private void InitSerializationType()
246+
{
247+
var serialization = _xcConfiguration.GetSerializationType();
248+
249+
switch (serialization)
250+
{
251+
case XCApiTags.Binary:
252+
_serializationType = SerializationType.Binary;
253+
break;
254+
case XCApiTags.Json:
255+
_serializationType = SerializationType.Json;
256+
break;
257+
case XCApiTags.Bson:
258+
_serializationType = SerializationType.Bson;
259+
break;
260+
default:
261+
throw new XCSerializationException("Serialization type not supported");
262+
}
263+
}
264+
247265
#region IDisposable implementation
248266

249267
private bool _disposed;

ReactiveXComponentTest/Common/MessageEventArgsCastTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ public void GetMessageTest(SerializationType serializationType)
2727
stream.Position = 0;
2828

2929
var deserializedObject = serializer.Deserialize(stream);
30-
var messageEventArgs = new MessageEventArgs(null, deserializedObject);
31-
var resultObject = messageEventArgs.GetMessage<TestObject>(serializationType);
30+
var messageEventArgs = new MessageEventArgs(null, deserializedObject, serializationType);
31+
var resultObject = messageEventArgs.GetMessage<TestObject>();
3232
Check.That(resultObject).IsNotNull();
3333
Check.That(resultObject).IsEqualTo(testObject);
3434
}

ReactiveXComponentTest/WebSocket/WebSocketTests.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public void SubscriberTest()
3636
xcConfiguration.GetStateMachineCode(componentName, stateMachineName).Returns(x => stateMachineCode);
3737
xcConfiguration.GetSubscriberTopic(componentName, stateMachineName).Returns(x => subscriberPublicTopic);
3838
xcConfiguration.GetSnapshotTopic(componentName).Returns(x => snapshotTopic);
39+
xcConfiguration.GetSerializationType().Returns(XCApiTags.Json);
3940

4041
var webSocketClient = Substitute.For<IWebSocketClient>();
4142
webSocketClient.IsOpen.Returns(true);
@@ -191,6 +192,7 @@ public void PublisherSendMessageTest(bool isPrivate, bool withStateMachineRef)
191192
xcConfiguration.GetStateMachineCode(componentName, stateMachineName).Returns(x => stateMachineCode);
192193
xcConfiguration.GetPublisherTopic(componentName, stateMachineName).Returns(x => publisherTopic);
193194
xcConfiguration.GetPublisherEventCode("System.String").Returns(9);
195+
xcConfiguration.GetSerializationType().Returns(XCApiTags.Json);
194196

195197
var webSocketClient = Substitute.For<IWebSocketClient>();
196198
webSocketClient.IsOpen.Returns(true);
@@ -270,6 +272,7 @@ public void SnapshotTest()
270272
xcConfiguration.GetStateMachineCode(componentName, stateMachineName).Returns(x => stateMachineCode);
271273
xcConfiguration.GetPublisherTopic(componentName, stateMachineName).Returns(x => publisherTopic);
272274
xcConfiguration.GetSnapshotTopic(componentName).Returns(x => snapshotTopic);
275+
xcConfiguration.GetSerializationType().Returns(XCApiTags.Json);
273276

274277
var snapshotReplyTopic = string.Empty;
275278
var webSocketClient = Substitute.For<IWebSocketClient>();

0 commit comments

Comments
 (0)