22using System . Collections . Generic ;
33using System . IO ;
44using System . Net . Http ;
5+ using System . Text ;
56using System . Text . Json ;
67using System . Threading ;
78using System . Threading . Tasks ;
@@ -115,6 +116,7 @@ public class ServerEventsClient : ISSEClient
115116
116117 protected string _host ;
117118 protected int _port ;
119+ protected Uri _sseUri ;
118120 protected int _nodeVersion = 1 ;
119121
120122 public ServerEventsClient ( )
@@ -150,6 +152,31 @@ public ServerEventsClient(string host, int port, int nodeVersion = 2) : this()
150152 _nodeVersion = nodeVersion ;
151153 }
152154
155+ /// <summary>
156+ /// Instantiate the class indicating the full SSE stream URL of a node.
157+ /// Example: https://node.testnet.casper.network/events
158+ /// </summary>
159+ /// <param name="sseUrl">Full URL of the SSE stream.</param>
160+ /// <remarks>Use this constructor when the node is running version 2.x of the Casper protocol.</remarks>
161+ public ServerEventsClient ( string sseUrl ) : this ( )
162+ {
163+ if ( string . IsNullOrWhiteSpace ( sseUrl ) )
164+ throw new ArgumentException ( "SSE URL cannot be null or empty." , nameof ( sseUrl ) ) ;
165+
166+ if ( ! Uri . TryCreate ( sseUrl , UriKind . Absolute , out var uri ) )
167+ throw new ArgumentException ( "SSE URL is not a valid absolute URI." , nameof ( sseUrl ) ) ;
168+
169+ if ( uri . Scheme != Uri . UriSchemeHttp && uri . Scheme != Uri . UriSchemeHttps )
170+ throw new ArgumentException ( "SSE URL must use the HTTP or HTTPS scheme." , nameof ( sseUrl ) ) ;
171+
172+ _sseUri = uri ;
173+ _host = uri . Host ;
174+ _port = uri . IsDefaultPort
175+ ? ( uri . Scheme == Uri . UriSchemeHttps ? 443 : 80 )
176+ : uri . Port ;
177+ _nodeVersion = 2 ;
178+ }
179+
153180 public int NodeVersion
154181 {
155182 get { return _nodeVersion ; }
@@ -300,10 +327,44 @@ public bool IsRunning()
300327 protected virtual HttpClient _getHttpClient ( )
301328 {
302329 var client = new HttpClient ( ) ;
303- client . BaseAddress = new Uri ( $ "http://{ _host } :{ _port } ") ;
330+ client . BaseAddress = _sseUri != null
331+ ? new Uri ( _sseUri . GetLeftPart ( UriPartial . Authority ) )
332+ : new Uri ( $ "http://{ _host } :{ _port } ") ;
304333 return client ;
305334 }
306335
336+ private Uri BuildStreamUri ( HttpClient client , ChannelType channelType , int ? startFrom )
337+ {
338+ UriBuilder uriBuilder ;
339+ if ( _sseUri != null )
340+ {
341+ uriBuilder = new UriBuilder ( _sseUri ) ;
342+ }
343+ else
344+ {
345+ uriBuilder = new UriBuilder ( new Uri ( client . BaseAddress +
346+ $ "events" +
347+ ( _nodeVersion == 1 ? $ "/{ channelType . ToString ( ) . ToLowerInvariant ( ) } " : "" ) ) ) ;
348+ }
349+
350+ var queryBuilder = new StringBuilder ( ) ;
351+ var existingQuery = uriBuilder . Query ;
352+ if ( ! string . IsNullOrWhiteSpace ( existingQuery ) )
353+ {
354+ queryBuilder . Append ( existingQuery . TrimStart ( '?' ) ) ;
355+ if ( queryBuilder . Length > 0 )
356+ queryBuilder . Append ( '&' ) ;
357+ }
358+
359+ if ( startFrom != null && startFrom != int . MaxValue )
360+ queryBuilder . Append ( $ "start_from={ startFrom } ") ;
361+ else
362+ queryBuilder . Append ( "start_from=0" ) ;
363+
364+ uriBuilder . Query = queryBuilder . ToString ( ) ;
365+ return uriBuilder . Uri ;
366+ }
367+
307368 private Task ListenChannelAsync ( ChannelType channelType , int ? startFrom , CancellationToken cancelToken )
308369 {
309370 var task = Task . Run ( async ( ) =>
@@ -317,17 +378,10 @@ private Task ListenChannelAsync(ChannelType channelType, int? startFrom, Cancell
317378 {
318379 try
319380 {
320- var uriBuilder = new UriBuilder ( new Uri ( client . BaseAddress +
321- $ "events" +
322- ( _nodeVersion == 1 ? $ "/{ channelType . ToString ( ) . ToLowerInvariant ( ) } " : "" ) ) ) ;
323-
324- if ( startFrom != null && startFrom != int . MaxValue )
325- uriBuilder . Query = $ "start_from={ startFrom } ";
326- else
327- uriBuilder . Query = $ "start_from={ 0 } ";
328-
381+ var streamUri = BuildStreamUri ( client , channelType , startFrom ) ;
382+
329383 using ( var streamReader =
330- new StreamReader ( await client . GetStreamAsync ( uriBuilder . Uri , cancelToken ) ) )
384+ new StreamReader ( await client . GetStreamAsync ( streamUri , cancelToken ) ) )
331385 {
332386 while ( ! streamReader . EndOfStream && ! cancelToken . IsCancellationRequested )
333387 {
@@ -426,4 +480,4 @@ private void EmitEvent(EventData eventData)
426480 }
427481 }
428482 }
429- }
483+ }
0 commit comments