Skip to content

Commit cb3ffea

Browse files
committed
Add tests coverage and doing small minor refactorings
1 parent fde9b91 commit cb3ffea

2 files changed

Lines changed: 169 additions & 142 deletions

File tree

EventSource4Net.Test/MessagesTest.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,29 @@ public void TestDataSentInParts()
144144
Assert.AreEqual(receivedMessages.Count, 1);
145145
Assert.AreEqual(receivedMessages[0].EventType, "test");
146146
Assert.AreEqual(receivedMessages[0].Data, "simple\n");
147+
}
148+
[TestMethod]
149+
public void TestDataSentMixedWithComments()
150+
{
151+
// setup
152+
TestableEventSource es = SetupAndConnect();
153+
154+
// act
155+
es.Start(cts.Token);
156+
stateIsOpen.WaitOrThrow();
157+
response.WriteTestTextToStream(":coment line\r\n");
158+
response.WriteTestTextToStream("Event: test\n");
159+
response.WriteTestTextToStream(":\r");
160+
response.WriteTestTextToStream("DATA: simple\n");
161+
response.WriteTestTextToStream(":line commented\n");
162+
response.WriteTestTextToStream("retry:9\n \n");
163+
eventReceived.WaitOrThrow();
164+
165+
// assert
166+
Assert.AreEqual(receivedMessages.Count, 1);
167+
Assert.AreEqual(receivedMessages[0].EventType, "test");
168+
Assert.AreEqual(receivedMessages[0].Data, "simple\n");
169+
Assert.AreEqual(receivedMessages[0].Retry, 9);
147170
}
148171
[TestMethod]
149172
public void TestMultipleEvents()

EventSource4Net/ConnectedState.cs

Lines changed: 146 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -1,142 +1,146 @@
1-
using System;
2-
using System.Collections.Generic;
3-
using System.Linq;
4-
using System.Text;
5-
using System.Net;
6-
using System.IO;
7-
using System.Threading.Tasks;
8-
using System.Threading;
9-
10-
namespace EventSource4Net
11-
{
12-
class ConnectedState : IConnectionState
13-
{
14-
private static readonly slf4net.ILogger _logger = slf4net.LoggerFactory.GetLogger(typeof(ConnectedState));
15-
16-
private IWebRequesterFactory mWebRequesterFactory;
17-
private ServerSentEvent mSse = null;
18-
private string mRemainingText = string.Empty; // the text that is not ended with a lineending char is saved for next call.
19-
private IServerResponse mResponse;
20-
public EventSourceState State { get { return EventSourceState.OPEN; } }
21-
22-
public ConnectedState(IServerResponse response, IWebRequesterFactory webRequesterFactory)
23-
{
24-
mResponse = response;
25-
mWebRequesterFactory = webRequesterFactory;
26-
}
27-
28-
public Task<IConnectionState> Run(Action<ServerSentEvent> msgReceived, CancellationToken cancelToken)
29-
{
30-
int i = 0;
31-
32-
Task<IConnectionState> t = new Task<IConnectionState>(() =>
33-
{
34-
//using (mResponse)
35-
{
36-
//using (var stream = mResponse.GetResponseStream())
37-
var stream = mResponse.GetResponseStream();
38-
{
39-
byte[] buffer = new byte[1024 * 8];
40-
var taskRead = stream.ReadAsync(buffer, 0, buffer.Length, cancelToken);
41-
42-
try
43-
{
44-
taskRead.Wait(cancelToken);
45-
}
46-
catch (Exception ex)
47-
{
48-
_logger.Trace(ex, "ConnectedState.Run");
49-
}
50-
if (!cancelToken.IsCancellationRequested && !taskRead.IsFaulted)
51-
{
52-
int bytesRead = taskRead.Result;
53-
if (bytesRead > 0) // stream has not reached the end yet
54-
{
55-
//Console.WriteLine("ReadCallback {0} bytesRead", bytesRead);
56-
string text = Encoding.UTF8.GetString(buffer, 0, bytesRead);
57-
text = mRemainingText + text;
58-
string[] lines = StringSplitter.SplitIntoLines(text, out mRemainingText);
59-
foreach (string line in lines)
60-
{
61-
if (cancelToken.IsCancellationRequested) break;
62-
63-
// Dispatch message if empty lne
64-
if (string.IsNullOrEmpty(line.Trim()) && mSse != null)
65-
{
66-
_logger.Trace("Message received");
67-
msgReceived(mSse);
68-
mSse = null;
69-
}
70-
else if (line.StartsWith(":"))
71-
{
72-
// This a comment, just log it.
73-
_logger.Trace("A comment was received: " + line);
74-
}
75-
else
76-
{
77-
string fieldName = String.Empty;
78-
string fieldValue = String.Empty;
79-
if (line.Contains(':'))
80-
{
81-
int index = line.IndexOf(':');
82-
fieldName = line.Substring(0, index);
83-
fieldValue = line.Substring(index + 1).TrimStart();
84-
}
85-
else
86-
fieldName = line;
87-
88-
if (String.Compare(fieldName, "event", true) == 0)
89-
{
90-
mSse = mSse ?? new ServerSentEvent();
91-
mSse.EventType = fieldValue;
92-
}
93-
else if (String.Compare(fieldName, "data", true) == 0)
94-
{
95-
mSse = mSse ?? new ServerSentEvent();
96-
mSse.Data = fieldValue + '\n';
97-
}
98-
else if (String.Compare(fieldName, "id", true) == 0)
99-
{
100-
mSse = mSse ?? new ServerSentEvent();
101-
mSse.LastEventId = fieldValue;
102-
}
103-
else if (String.Compare(fieldName, "retry", true) == 0)
104-
{
105-
int parsedRetry;
106-
if (int.TryParse(fieldValue, out parsedRetry))
107-
{
108-
mSse = mSse ?? new ServerSentEvent();
109-
mSse.Retry = parsedRetry;
110-
}
111-
}
112-
else
113-
{
114-
// Ignore this, just log it
115-
_logger.Warn("A unknown line was received: " + line);
116-
}
117-
}
118-
}
119-
120-
if (!cancelToken.IsCancellationRequested)
121-
return this;
122-
}
123-
else // end of the stream reached
124-
{
125-
_logger.Trace("No bytes read. End of stream.");
126-
}
127-
}
128-
129-
//stream.Dispose()
130-
//stream.Close();
131-
//mResponse.Close();
132-
//mResponse.Dispose();
133-
return new DisconnectedState(mResponse.ResponseUri, mWebRequesterFactory);
134-
}
135-
}
136-
});
137-
138-
t.Start();
139-
return t;
140-
}
141-
}
142-
}
1+
using System;
2+
using System.Text;
3+
using System.Threading.Tasks;
4+
using System.Threading;
5+
6+
namespace EventSource4Net
7+
{
8+
class ConnectedState : IConnectionState
9+
{
10+
private static readonly slf4net.ILogger _logger = slf4net.LoggerFactory.GetLogger(typeof(ConnectedState));
11+
12+
private readonly IWebRequesterFactory mWebRequesterFactory;
13+
private ServerSentEvent mSse;
14+
private string mRemainingText = string.Empty; // the text that is not ended with a lineending char is saved for next call.
15+
private readonly IServerResponse mResponse;
16+
public EventSourceState State { get { return EventSourceState.OPEN; } }
17+
18+
public ConnectedState(IServerResponse response, IWebRequesterFactory webRequesterFactory)
19+
{
20+
mResponse = response;
21+
mWebRequesterFactory = webRequesterFactory;
22+
}
23+
24+
public Task<IConnectionState> Run(Action<ServerSentEvent> msgReceived, CancellationToken cancelToken)
25+
{
26+
int i = 0;
27+
28+
Task<IConnectionState> t = new Task<IConnectionState>(() =>
29+
{
30+
//using (mResponse)
31+
{
32+
//using (var stream = mResponse.GetResponseStream())
33+
var stream = mResponse.GetResponseStream();
34+
{
35+
byte[] buffer = new byte[1024 * 8];
36+
var taskRead = stream.ReadAsync(buffer, 0, buffer.Length, cancelToken);
37+
38+
try
39+
{
40+
taskRead.Wait(cancelToken);
41+
}
42+
catch (Exception ex)
43+
{
44+
_logger.Trace(ex, "ConnectedState.Run");
45+
}
46+
if (!cancelToken.IsCancellationRequested && !taskRead.IsFaulted)
47+
{
48+
int bytesRead = taskRead.Result;
49+
if (bytesRead > 0) // stream has not reached the end yet
50+
{
51+
//Console.WriteLine("ReadCallback {0} bytesRead", bytesRead);
52+
string text = Encoding.UTF8.GetString(buffer, 0, bytesRead);
53+
text = mRemainingText + text;
54+
string[] lines = StringSplitter.SplitIntoLines(text, out mRemainingText);
55+
foreach (string line in lines)
56+
{
57+
if (cancelToken.IsCancellationRequested) break;
58+
59+
// Dispatch message if empty line
60+
var indexOfColon = line.IndexOf(':');
61+
if (string.IsNullOrWhiteSpace(line) && mSse != null)
62+
{
63+
_logger.Trace("Message received");
64+
msgReceived(mSse);
65+
mSse = null;
66+
}
67+
else if (indexOfColon == 0)
68+
{
69+
// This a comment, just log it.
70+
_logger.Trace("A comment was received: " + line);
71+
}
72+
else
73+
{
74+
string fieldName;
75+
string fieldValue;
76+
if (indexOfColon > 0)
77+
{
78+
fieldName = line.Substring(0, indexOfColon);
79+
fieldValue = line.Substring(indexOfColon + 1).TrimStart();
80+
}
81+
else
82+
{
83+
fieldName = line;
84+
fieldValue = string.Empty;
85+
}
86+
87+
if ("event".Equals(fieldName, StringComparison.InvariantCultureIgnoreCase))
88+
{
89+
mSse = mSse ?? new ServerSentEvent();
90+
mSse.EventType = fieldValue;
91+
}
92+
else if ("data".Equals(fieldName, StringComparison.InvariantCultureIgnoreCase))
93+
{
94+
mSse = mSse ?? new ServerSentEvent();
95+
mSse.Data = fieldValue + '\n';
96+
}
97+
else if ("id".Equals(fieldName, StringComparison.InvariantCultureIgnoreCase))
98+
{
99+
mSse = mSse ?? new ServerSentEvent();
100+
mSse.LastEventId = fieldValue;
101+
}
102+
else if ("retry".Equals(fieldName, StringComparison.InvariantCultureIgnoreCase))
103+
{
104+
int parsedRetry;
105+
if (int.TryParse(fieldValue, out parsedRetry))
106+
{
107+
mSse = mSse ?? new ServerSentEvent();
108+
mSse.Retry = parsedRetry;
109+
}
110+
else
111+
{
112+
// Ignore this, just log it
113+
_logger.Warn("Unreadable retry: " + fieldValue);
114+
}
115+
}
116+
else
117+
{
118+
// Ignore this, just log it
119+
_logger.Warn("A unknown line was received: " + line);
120+
}
121+
}
122+
}
123+
124+
if (!cancelToken.IsCancellationRequested)
125+
return this;
126+
}
127+
else // end of the stream reached
128+
{
129+
_logger.Trace("No bytes read. End of stream.");
130+
}
131+
}
132+
133+
//stream.Dispose()
134+
//stream.Close();
135+
//mResponse.Close();
136+
//mResponse.Dispose();
137+
return new DisconnectedState(mResponse.ResponseUri, mWebRequesterFactory);
138+
}
139+
}
140+
});
141+
142+
t.Start();
143+
return t;
144+
}
145+
}
146+
}

0 commit comments

Comments
 (0)