Skip to content

Commit c67c62c

Browse files
fredericcarreAnouar Hassine
authored andcommitted
fix issues with websocket (#80)
Fixing issues with websocket
1 parent 3553658 commit c67c62c

5 files changed

Lines changed: 56 additions & 17 deletions

File tree

ReactiveXComponent/Connection/IXCPublisher.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ namespace ReactiveXComponent.Connection
77
{
88
public interface IXCPublisher : IDisposable
99
{
10+
void SendEvent(string stateMachine, object message, string messageType, Visibility visibility = Visibility.Public);
1011
void SendEvent(string stateMachine, object message, Visibility visibility = Visibility.Public);
1112
void SendEvent(StateMachineRefHeader stateMachineRefHeader, object message, Visibility visibility = Visibility.Public);
1213
List<MessageEventArgs> GetSnapshot(string stateMachine, int timeout = 10000);

ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ public RabbitMqPublisher(string component, IXCConfiguration configuration, IConn
3232

3333
#region IXCPublisher implementation
3434

35-
public void SendEvent(string stateMachine, object message, Visibility visibility = Visibility.Public)
35+
public void SendEvent(string stateMachine, object message, string messageType, Visibility visibility = Visibility.Public)
3636
{
37-
var header = CreateHeader(_component, stateMachine, message, visibility);
37+
var header = CreateHeader(_component, stateMachine, message, messageType, visibility);
3838

3939
if (header == null) return;
4040

@@ -47,6 +47,11 @@ public void SendEvent(string stateMachine, object message, Visibility visibility
4747
Send(message, routingKey, prop);
4848
}
4949

50+
public void SendEvent(string stateMachine, object message, Visibility visibility = Visibility.Public)
51+
{
52+
SendEvent(stateMachine, message, message?.GetType().ToString() ?? string.Empty, visibility);
53+
}
54+
5055
public void SendEvent(StateMachineRefHeader stateMachineRefHeader, object message, Visibility visibility = Visibility.Public)
5156
{
5257
if (stateMachineRefHeader == null) return;
@@ -87,10 +92,9 @@ private void CreatePublisherChannel(IConnection connection)
8792
_publisherChannel.ExchangeDeclare(_exchangeName, ExchangeType.Topic);
8893
}
8994

90-
private Header CreateHeader(string component, string stateMachine, object message, Visibility visibility)
95+
private Header CreateHeader(string component, string stateMachine, object message, string messageType, Visibility visibility)
9196
{
9297
var defaultValue = -1;
93-
var messageType = message?.GetType().ToString() ?? string.Empty;
9498

9599
if (_configuration == null)
96100
{

ReactiveXComponent/WebSocket/WebSocketPublisher.cs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,23 @@ public WebSocketPublisher(string component, IWebSocketClient webSocketClient, IX
2626
#region IXCPublisher implementation
2727

2828
public void SendEvent(string stateMachine, object message, Visibility visibility = Visibility.Public)
29+
{
30+
SendEvent(stateMachine, message, message?.GetType().ToString(), visibility);
31+
}
32+
33+
public void SendEvent(string stateMachine, object message, string messageType, Visibility visibility = Visibility.Public)
2934
{
3035
if (!_webSocketClient.IsOpen) return;
31-
32-
var inputHeader = CreateWebSocketHeader(stateMachine, message, visibility);
36+
37+
var inputHeader = CreateWebSocketHeader(stateMachine, messageType, visibility);
3338
var componentCode = _xcConfiguration.GetComponentCode(_component);
3439
var topic = _xcConfiguration.GetPublisherTopic(_component, stateMachine);
3540
var webSocketRequest = WebSocketMessageHelper.SerializeRequest(
36-
WebSocketCommand.Input,
37-
inputHeader,
38-
message,
39-
componentCode.ToString(),
40-
topic);
41+
WebSocketCommand.Input,
42+
inputHeader,
43+
message,
44+
componentCode.ToString(),
45+
topic);
4146

4247
_webSocketClient.Send(webSocketRequest);
4348
}
@@ -71,15 +76,14 @@ public void GetSnapshotAsync(string stateMachine, Action<List<MessageEventArgs>>
7176

7277
#endregion
7378

74-
private WebSocketEngineHeader CreateWebSocketHeader(string stateMachine, object message, Visibility visibility = Visibility.Public)
79+
private WebSocketEngineHeader CreateWebSocketHeader(string stateMachine, string messageType, Visibility visibility = Visibility.Public)
7580
{
76-
var messageType = message?.GetType();
7781
var webSocketEngineHeader = new WebSocketEngineHeader
7882
{
7983
ComponentCode = _xcConfiguration.GetComponentCode(_component),
8084
StateMachineCode = _xcConfiguration.GetStateMachineCode(_component, stateMachine),
81-
EventCode = _xcConfiguration.GetPublisherEventCode(messageType?.ToString()),
82-
MessageType = messageType?.ToString(),
85+
EventCode = _xcConfiguration.GetPublisherEventCode(messageType),
86+
MessageType = messageType,
8387
PublishTopic = visibility == Visibility.Private && !string.IsNullOrEmpty(_privateCommunicationIdentifier)? _privateCommunicationIdentifier : null
8488
};
8589

ReactiveXComponent/WebSocket/WebSocketXCApiManager.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.IO;
4+
using System.IO.Compression;
35
using System.Reactive;
46
using System.Reactive.Linq;
7+
using System.Text;
58
using System.Threading;
69
using Newtonsoft.Json;
710
using ReactiveXComponent.Common;
@@ -62,8 +65,18 @@ public List<string> GetXCApiList(string requestId = null, TimeSpan ? timeout = n
6265

6366
return result;
6467
}
68+
69+
private string UnzipString(byte[] data)
70+
{
71+
using (var memoryStream = new MemoryStream(data, false))
72+
using (var gzipStream = new GZipStream(memoryStream, CompressionMode.Decompress))
73+
using (var reader = new StreamReader(gzipStream, Encoding.UTF8))
74+
{
75+
return reader.ReadToEnd();
76+
}
77+
}
6578

66-
79+
6780
public string GetXCApi(string apiFullName, string requestId = null, TimeSpan ? timeout = null)
6881
{
6982
var delay = timeout ?? TimeSpan.FromSeconds(10);
@@ -73,10 +86,11 @@ public string GetXCApi(string apiFullName, string requestId = null, TimeSpan ? t
7386
{
7487
if (response.RequestId == requestId && response.ApiFound)
7588
{
76-
result = response.ApiName;
89+
result = UnzipString(Convert.FromBase64String(response.Content));
7790
lockEvent.Set();
7891
}
7992
});
93+
8094
var request = new WebSocketXCApiCommand
8195
{
8296
Command = WebSocketCommand.GetXCApi,

ReactiveXComponent/XComponentApi.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using ReactiveXComponent.Configuration;
44
using ReactiveXComponent.Connection;
55
using ReactiveXComponent.Parser;
6+
using ReactiveXComponent.WebSocket;
67

78
namespace ReactiveXComponent
89
{
@@ -20,6 +21,16 @@ private XComponentApi(Stream xcApiStream, string privateCommunicationIdentifier
2021
_xcConnection = connectionFactory.CreateConnection(connectionType);
2122
}
2223

24+
private XComponentApi(Stream xcApiStream, WebSocketEndpoint webSocketEndpoint, string privateCommunicationIdentifier = null)
25+
{
26+
var parser = new XCApiConfigParser();
27+
var xcConfiguration = new XCConfiguration(parser);
28+
xcConfiguration.Init(xcApiStream);
29+
_xcConnection = new WebSocketConnection(xcConfiguration, webSocketEndpoint, 10000, privateCommunicationIdentifier);
30+
}
31+
32+
33+
2334
public static IXComponentApi CreateFromXCApi(string xcApiFilePath, string privateCommunicationIdentifier = null)
2435
{
2536
if (!File.Exists(xcApiFilePath))
@@ -38,6 +49,11 @@ public static IXComponentApi CreateFromXCApi(Stream xcApiStream, string privateC
3849
return new XComponentApi(xcApiStream, privateCommunicationIdentifier);
3950
}
4051

52+
public static IXComponentApi CreateFromXCApi(Stream xcApiStream, WebSocketEndpoint webSocketEndpoint, string privateCommunicationIdentifier = null)
53+
{
54+
return new XComponentApi(xcApiStream, webSocketEndpoint, privateCommunicationIdentifier);
55+
}
56+
4157
public IXCSession CreateSession(ConfigurationOverrides configurationOverrides = null)
4258
{
4359
return _xcConnection.CreateSession(configurationOverrides);

0 commit comments

Comments
 (0)