1+ using System ;
2+ using System . Collections . Concurrent ;
3+ using System . Collections . Generic ;
4+ using System . IO ;
5+ using System . Linq ;
6+ using System . Net . Sockets ;
7+ using System . Threading ;
8+ using System . Threading . Tasks ;
9+ using TS3QueryLib . Core . Common ;
10+ using TS3QueryLib . Core . Common . Responses ;
11+
12+ namespace TS3QueryLib . Core
13+ {
14+ public class AwaitableQueryDispatcher : IQueryDispatcher
15+ {
16+ #region Events
17+
18+ /// <summary>
19+ /// Raised when the connection to the server was closed
20+ /// </summary>
21+ public event EventHandler < EventArgs < string > > ConnectionClosed ;
22+
23+ public bool IsDisposed { get ; private set ; }
24+ public int ? LastServerConnectionHandlerId { get ; private set ; }
25+
26+ /// <summary>
27+ /// Raised when a notification was received
28+ /// </summary>
29+ public event EventHandler < EventArgs < string > > NotificationReceived ;
30+
31+ /// <summary>
32+ /// Raised when a ban was detected
33+ /// </summary>
34+ public event EventHandler < EventArgs < SimpleResponse > > BanDetected ;
35+
36+ #endregion
37+
38+ #region Properties
39+
40+ public string Host { get ; }
41+ public int Port { get ; }
42+ protected TimeSpan ? KeepAliveInterval { get ; }
43+ public bool Connected { get ; protected set ; }
44+
45+ private List < string > ReceivedLines { get ; } = new List < string > ( ) ;
46+ private bool AtLeastOneResponseReceived { get ; set ; }
47+ private ConcurrentQueue < string > MessageResponses { get ; } = new ConcurrentQueue < string > ( ) ;
48+ private Task ReadLoopTask { get ; set ; }
49+ private Task KeepAliveTask { get ; set ; }
50+
51+ private TcpClient Client { get ; set ; }
52+ private StreamReader ClientReader { get ; set ; }
53+ private StreamWriter ClientWriter { get ; set ; }
54+ private NetworkStream ClientStream { get ; set ; }
55+ protected SynchronizationContext SyncContext { get ; set ; }
56+
57+ #endregion
58+
59+ #region Constructor
60+
61+ /// <summary>
62+ /// Creates an instance of the current class
63+ /// </summary>
64+ /// <param name="host">The host to connect to</param>
65+ /// <param name="port">The port to connect to</param>
66+ /// <param name="keepAliveInterval">The keep alive interval used to send heart beats in a specific interval to the server to not get timed out (disconnected)</param>
67+ /// <param name="synchronizationContext">The synchronization context on which to raise events.</param>
68+ public AwaitableQueryDispatcher ( string host = null , ushort ? port = null , TimeSpan ? keepAliveInterval = null , SynchronizationContext synchronizationContext = null )
69+ {
70+ Host = host ?? "localhost" ;
71+ Port = port ?? 10011 ;
72+ KeepAliveInterval = keepAliveInterval ;
73+ SyncContext = synchronizationContext ?? SynchronizationContext . Current ;
74+ }
75+
76+ #endregion
77+
78+ #region Public Methods
79+
80+ public ConnectResponse Connect ( )
81+ {
82+ return AsyncHelper . RunSync ( ConnectAsync ) ;
83+ }
84+
85+ public async Task < ConnectResponse > ConnectAsync ( )
86+ {
87+ if ( Client != null )
88+ return new ConnectResponse ( message : "Already connected!" ) ;
89+
90+ Client = new TcpClient ( ) ;
91+ await Client . ConnectAsync ( Host , Port ) . ConfigureAwait ( false ) ;
92+
93+ if ( ! Client . Connected )
94+ throw new IOException ( $ "Could not connect to { Host } on port { Port } .") ;
95+
96+ ReceivedLines . Clear ( ) ;
97+ AtLeastOneResponseReceived = false ;
98+ ClientStream = Client . GetStream ( ) ;
99+ ClientReader = new StreamReader ( ClientStream ) ;
100+ ClientWriter = new StreamWriter ( ClientStream ) { NewLine = "\n " } ;
101+
102+ string message = await ReadLineAsync ( ) . ConfigureAwait ( false ) ;
103+
104+ QueryType queryType ;
105+
106+ if ( message . StartsWith ( "TS3" , StringComparison . OrdinalIgnoreCase ) )
107+ {
108+ queryType = QueryType . Server ;
109+ }
110+ else if ( message . StartsWith ( "TS3 Client" , StringComparison . OrdinalIgnoreCase ) )
111+ {
112+ queryType = QueryType . Client ;
113+ }
114+ else
115+ {
116+ string statusMessage = $ "Invalid greeting received: { message } ";
117+ DisconnectForced ( statusMessage ) ;
118+ return new ConnectResponse ( statusMessage ) ;
119+ }
120+
121+ Connected = true ;
122+ ReadLoopTask = Task . Factory . StartNew ( ReadLoop , TaskCreationOptions . LongRunning ) ;
123+ KeepAliveTask = Task . Factory . StartNew ( KeepAliveLoop , TaskCreationOptions . LongRunning ) ;
124+ return new ConnectResponse ( message , queryType , true ) ;
125+ }
126+
127+ public string Send ( string messageToSend )
128+ {
129+ return AsyncHelper . RunSync ( ( ) => SendAsync ( messageToSend ) ) ;
130+ }
131+
132+ public async Task < string > SendAsync ( string messageToSend )
133+ {
134+ await SendAsync ( ClientWriter , messageToSend ) ;
135+
136+ do
137+ {
138+ if ( MessageResponses . TryDequeue ( out var result ) )
139+ return result ;
140+
141+ await Task . Delay ( TimeSpan . FromMilliseconds ( 10 ) ) . ConfigureAwait ( false ) ;
142+ } while ( Connected ) ;
143+
144+ return null ;
145+ }
146+
147+ protected static async Task SendAsync ( StreamWriter writer , string messageToSend )
148+ {
149+ await writer . WriteLineAsync ( messageToSend ) . ConfigureAwait ( false ) ;
150+ await writer . FlushAsync ( ) . ConfigureAwait ( false ) ;
151+ }
152+
153+ public void Disconnect ( )
154+ {
155+ DisconnectForced ( ) ;
156+ }
157+
158+ public void Dispose ( )
159+ {
160+ Dispose ( true ) ;
161+ GC . SuppressFinalize ( this ) ;
162+ }
163+
164+ public string Dispatch ( string commandText )
165+ {
166+ return Send ( commandText ) ;
167+ }
168+
169+ #endregion
170+
171+ #region Non Public Methods
172+
173+ protected async void KeepAliveLoop ( )
174+ {
175+ while ( Client != null && KeepAliveInterval . HasValue )
176+ {
177+ await Task . Delay ( KeepAliveInterval . Value ) ;
178+ await SendAsync ( ClientWriter , "\n " ) ;
179+ }
180+ }
181+
182+ protected async void ReadLoop ( )
183+ {
184+ while ( Client != null && Client . Connected )
185+ {
186+ string message = await ReadLineAsync ( false ) . ConfigureAwait ( false ) ;
187+
188+ if ( message == null )
189+ continue ;
190+
191+ if ( message . StartsWith ( "error" , StringComparison . CurrentCultureIgnoreCase ) )
192+ {
193+ if ( ! AtLeastOneResponseReceived )
194+ {
195+ AtLeastOneResponseReceived = true ;
196+ // Remove welcome messages after connect
197+ ReceivedLines . Clear ( ) ;
198+ }
199+
200+ string responseText = string . Join ( "\r \n " , ReceivedLines . Concat ( new [ ] { message } ) ) ;
201+ MessageResponses . Enqueue ( responseText ) ;
202+ ReceivedLines . Clear ( ) ;
203+
204+ SimpleResponse response = SimpleResponse . Parse ( responseText ) ;
205+
206+ if ( response . IsBanned )
207+ {
208+ BanDetected ? . Invoke ( this , new EventArgs < SimpleResponse > ( response ) ) ;
209+ DisconnectForced ( "Banned!" ) ;
210+ return ;
211+ }
212+ }
213+ else if ( message . StartsWith ( "notify" , StringComparison . CurrentCultureIgnoreCase ) )
214+ {
215+ ThreadPool . QueueUserWorkItem ( OnNotificationReceived , message ) ;
216+ }
217+ else
218+ {
219+ if ( ! AtLeastOneResponseReceived )
220+ {
221+ const string LastServerConnectionHandlerIdText = "selected schandlerid=" ;
222+
223+ if ( message . StartsWith ( LastServerConnectionHandlerIdText , StringComparison . InvariantCultureIgnoreCase ) && int . TryParse ( message . Substring ( LastServerConnectionHandlerIdText . Length ) . Trim ( ) , out int handlerId ) )
224+ LastServerConnectionHandlerId = handlerId ;
225+ }
226+
227+ ReceivedLines . Add ( message ) ;
228+ }
229+ }
230+ }
231+
232+ protected async Task < string > ReadLineAsync ( bool throwOnEmptyMessage = true )
233+ {
234+ string message = await ClientReader . ReadLineAsync ( ) . ConfigureAwait ( false ) ;
235+
236+ if ( message != null )
237+ return message ;
238+
239+ DisconnectForced ( "Empty message received from server." ) ;
240+
241+ if ( throwOnEmptyMessage )
242+ throw new InvalidOperationException ( "Received no message. Socket got disconnected." ) ;
243+
244+ return null ;
245+ }
246+
247+ protected void OnNotificationReceived ( object notificationText )
248+ {
249+ if ( NotificationReceived != null )
250+ SyncContext . PostEx ( p => NotificationReceived ( ( ( object [ ] ) p ) [ 0 ] , new EventArgs < string > ( Convert . ToString ( ( ( object [ ] ) p ) [ 1 ] ) ) ) , new [ ] { this , notificationText } ) ;
251+ }
252+
253+ private void DisconnectForced ( string reason = null )
254+ {
255+ bool clientWasConnected = Client ? . Connected == true ;
256+ ReceivedLines . Clear ( ) ;
257+ Client ? . Close ( ) ;
258+ ClientStream ? . Dispose ( ) ;
259+ ClientReader ? . Dispose ( ) ;
260+ ClientWriter ? . Dispose ( ) ;
261+
262+ Client = null ;
263+ ClientStream = null ;
264+ ClientReader = null ;
265+ ClientWriter = null ;
266+
267+ Connected = false ;
268+ ReadLoopTask = null ;
269+ KeepAliveTask = null ;
270+
271+ if ( clientWasConnected )
272+ ConnectionClosed ? . Invoke ( this , new EventArgs < string > ( reason ) ) ;
273+ }
274+
275+ protected virtual void Dispose ( bool disposing )
276+ {
277+ if ( ! IsDisposed )
278+ {
279+ if ( disposing )
280+ DisconnectForced ( ) ;
281+ }
282+
283+ IsDisposed = true ;
284+ }
285+
286+ #endregion
287+
288+ #region Embedded Types
289+
290+ public class ConnectResponse
291+ {
292+ public string Greeting { get ; }
293+ public QueryType ? QueryType { get ; }
294+
295+ public bool Success { get ; }
296+ public string Message { get ; set ; }
297+
298+ public ConnectResponse ( string greeting = null , QueryType ? queryType = null , bool success = false , string message = null )
299+ {
300+ Greeting = greeting ;
301+ QueryType = queryType ;
302+ Success = success ;
303+ Message = message ;
304+ }
305+ }
306+
307+ #endregion
308+ }
309+ }
0 commit comments