-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathclient.rb
More file actions
264 lines (228 loc) · 9.33 KB
/
client.rb
File metadata and controls
264 lines (228 loc) · 9.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# frozen_string_literal: false
require 'socket'
require 'openssl'
require 'uri'
require 'timeout'
module SplitIoClient
module SSE
module EventSource
class Client
# Default for the read_timeout: keyword of #initialize; that value is used as the
# IO.select timeout while waiting for stream data.
DEFAULT_READ_TIMEOUT = 70
# Timeout handed to latch.wait in #start while the stream connects.
# NOTE(review): 30_000 reads like milliseconds, but it is passed to the latch
# unchanged - confirm which unit the latch's wait expects.
CONNECT_TIMEOUT = 30_000
# Response code that read_first_event requires before marking the client connected.
OK_CODE = 200
# Raw chunk that process_data skips without parsing.
# NOTE(review): looks like a chunked-encoding frame wrapping an SSE ":keepalive" comment - confirm.
KEEP_ALIVE_RESPONSE = "c\r\n:keepalive\n\n\r\n".freeze
# event_type value that marks a parsed event as an error event.
ERROR_EVENT_TYPE = 'error'.freeze
# Sets up a client in the disconnected state; no socket is opened until #start.
#
# @param config [Object] supplies logger, threads, language, version, machine_ip and machine_name
# @param api_key [String, nil] its last four characters are sent in the SplitSDKClientKey header
# @param telemetry_runtime_producer [#record_streaming_event] receives streaming telemetry
# @param event_parser [#first_event, #parse] turns raw chunks into a response code / events
# @param notification_manager_keeper [#handle_incoming_occupancy_event] receives occupancy events
# @param notification_processor [#process] receives every other non-error event
# @param status_queue [#push] receives status values
# @param read_timeout [Numeric] timeout given to IO.select while waiting for data
def initialize(config, api_key, telemetry_runtime_producer, event_parser,
               notification_manager_keeper, notification_processor, status_queue,
               read_timeout: DEFAULT_READ_TIMEOUT)
  # Connection state: no socket, not connected, first event still pending.
  @socket = nil
  @connected = Concurrent::AtomicBoolean.new(false)
  @first_event = Concurrent::AtomicBoolean.new(true)

  # Settings.
  @config = config
  @api_key = api_key
  @read_timeout = read_timeout

  # Collaborators.
  @telemetry_runtime_producer = telemetry_runtime_producer
  @event_parser = event_parser
  @notification_manager_keeper = notification_manager_keeper
  @notification_processor = notification_processor
  @status_queue = status_queue
end
# Closes the stream socket and marks the client as disconnected.
#
# Does nothing except log when the client is not connected. Any StandardError
# raised while closing is logged and swallowed.
#
# @param status [Object, nil] status forwarded to push_status before the socket is closed
def close(status = nil)
  if connected?
    @config.logger.debug("Closing SSEClient socket")
    push_status(status)
    @connected.make_false

    over_tls = @socket.is_a?(OpenSSL::SSL::SSLSocket)
    # With sync_close set, closing the SSL socket also closes the wrapped IO.
    @socket.sync_close = true if over_tls
    @socket.close
    @config.logger.debug("SSEClient socket state #{@socket.state}") if over_tls
  else
    @config.logger.debug('SSEClient already disconected.')
    nil
  end
rescue StandardError => e
  @config.logger.error("SSEClient close Error: #{e.inspect}")
end
# Opens the stream for +url+ on a background thread and waits for the outcome.
#
# @param url [String] stream URL, parsed with URI() and stored in @uri
# @return [Boolean] true when already running; false when the latch wait times out;
#   otherwise the value of connected? once the latch is released (also returned
#   after a StandardError has been logged)
def start(url)
  if connected?
    @config.logger.debug('SSEClient already running.')
    true
  else
    @uri = URI(url)
    ready = Concurrent::CountDownLatch.new(1)
    connect_thread(ready)
    # The latch is released by the connect thread (first event seen or request failed).
    ready.wait(CONNECT_TIMEOUT) ? connected? : false
  end
rescue StandardError => e
  @config.logger.error("SSEClient start Error: #{e.inspect}")
  connected?
end
# Reports the current value of the @connected flag (set to true by
# read_first_event and to false by close).
#
# @return [Boolean] the flag's current value
def connected?
@connected.value
end
private
# Runs connect_stream on a new thread and registers that thread under
# :connect_stream in @config.threads. Whatever connect_stream returns is
# forwarded to push_status.
#
# @param latch [Object] passed through to connect_stream
# @return [Thread] the thread that was started
def connect_thread(latch)
  registry = @config.threads
  registry[:connect_stream] = Thread.new do
    @config.logger.info('Starting connect_stream thread ...')
    push_status(connect_stream(latch))
    @config.logger.info('connect_stream thread finished.')
  end
end
# Sends the stream request and reads from the socket until the client is
# disconnected or a read fails.
#
# Each iteration waits up to @read_timeout for the socket to become readable,
# reads one chunk, lets read_first_event inspect it and then hands it to
# process_data. The loop keeps running while connected? is true or the first
# event has not been seen yet.
#
# @param latch [#count_down] latch the caller of #start is waiting on
# @return [Object, nil] Constants::PUSH_NONRETRYABLE_ERROR when the request
#   could not be written, Constants::PUSH_RETRYABLE_ERROR on read timeouts and
#   read errors, nil when the loop ends on its own
# @raise [RuntimeError] 'eof exception' when readpartial raises EOFError
def connect_stream(latch)
  return Constants::PUSH_NONRETRYABLE_ERROR unless socket_write(latch)

  while connected? || @first_event.value
    begin
      if IO.select([@socket], nil, nil, @read_timeout)
        begin
          partial_data = @socket.readpartial(10_000)
          read_first_event(partial_data, latch)
          raise 'eof exception' if partial_data == :eof
        rescue IO::WaitReadable => e
          # Nothing to read yet: wait for readability again and repeat the read.
          @config.logger.debug("SSE client IO::WaitReadable transient error: #{e.inspect}")
          IO.select([@socket], nil, nil, @read_timeout)
          retry
        rescue Errno::EAGAIN => e
          @config.logger.debug("SSE client transient error: #{e.inspect}")
          IO.select([@socket], nil, nil, @read_timeout)
          retry
        rescue Errno::ETIMEDOUT => e
          @config.logger.error("SSE read operation timed out!: #{e.inspect}")
          return Constants::PUSH_RETRYABLE_ERROR
        rescue EOFError => e
          # Logged through the configured logger only; this used to be echoed to stdout as well.
          @config.logger.error("SSE read operation EOF Exception!: #{e.inspect}")
          # Raised from inside a rescue clause, so it skips the sibling clauses and
          # reaches the outer RuntimeError handler, which re-raises it to the caller.
          raise 'eof exception'
        rescue Errno::EBADF, IOError => e
          @config.logger.error("SSE read operation EBADF or IOError: #{e.inspect}")
          return Constants::PUSH_RETRYABLE_ERROR
        rescue StandardError => e
          @config.logger.error("SSE read operation StandardError: #{e.inspect}")
          return nil if ENV['SPLITCLIENT_ENV'] == 'test'

          @config.logger.error("Error reading partial data: #{e.inspect}")
          return Constants::PUSH_RETRYABLE_ERROR
        end
      else
        @config.logger.error("SSE read operation timed out, no data available.")
        return Constants::PUSH_RETRYABLE_ERROR
      end
    rescue Errno::EBADF
      @config.logger.debug("SSE socket is not connected (Errno::EBADF)")
      break
    rescue RuntimeError
      raise 'eof exception'
    rescue Exception => e
      # NOTE(review): this also traps non-StandardError exceptions; kept as-is.
      @config.logger.debug("SSE socket is not connected: #{e.inspect}")
      break
    end
    process_data(partial_data)
  end
  @config.logger.info("SSE read operation exited: #{connected?}")
  nil
ensure
  # Release the latch on every exit path. When the stream ends before the first
  # event (read timeout, read error, EOF) nothing else counts it down and #start
  # would stay blocked for the whole CONNECT_TIMEOUT. Counting down a latch that
  # is already at zero is expected to be a no-op.
  latch.count_down
end
# Opens the socket and writes the stream request to it.
#
# Re-arms @first_event first so that the next chunk read is treated as the
# first event. On any StandardError the failure is logged and the latch is
# counted down.
#
# @param latch [#count_down] released when connecting or writing fails
# @return [Boolean] true when the request was written, false otherwise
def socket_write(latch)
  @first_event.make_true
  @socket = socket_connect
  request = build_request(@uri)
  @socket.puts(request)
  true
rescue StandardError => e
  @config.logger.error("Error during connecting to #{@uri.host}. Error: #{e.inspect}")
  latch.count_down
  false
end
# Inspects the first chunk received on a new stream; a no-op for later chunks.
#
# The client is marked connected only when the parser reports OK_CODE and the
# chunk holds no error event. In that case telemetry is recorded and
# Constants::PUSH_CONNECTED is pushed. The latch is counted down either way.
#
# @param data [String] raw chunk read from the socket
# @param latch [#count_down] latch released once the first chunk was inspected
def read_first_event(data, latch)
  return unless @first_event.value

  status_code = @event_parser.first_event(data)
  @config.logger.debug("SSE client first event code: #{status_code}")
  saw_error = @event_parser.parse(data).any? { |evt| evt.event_type == ERROR_EVENT_TYPE }
  @first_event.make_false

  if status_code == OK_CODE && !saw_error
    @connected.make_true
    @config.logger.debug("SSE client first event Connected is true")
    @telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::SSE_CONNECTION_ESTABLISHED, nil)
    push_status(Constants::PUSH_CONNECTED)
  end

  latch.count_down
end
# Opens a TCP connection to @uri and, for https URLs, wraps it in a TLS socket.
#
# NOTE(review): the SSL context is created with SSLContext.new and no further
# setup (no set_params / verify_mode in this method); confirm whether peer
# certificate verification is intended to be enabled here.
# NOTE(review): the handshake waits use IO.select without a timeout.
#
# @return [TCPSocket, OpenSSL::SSL::SSLSocket, nil] the connected socket, or
#   nil when the TLS setup/handshake fails (the failure is logged)
# @raise [StandardError] whatever TCPSocket.new raises when the TCP connection fails
def socket_connect
  tcp_socket = TCPSocket.new(@uri.host, @uri.port)
  return tcp_socket unless @uri.scheme.casecmp('https').zero?

  begin
    ssl_context = OpenSSL::SSL::SSLContext.new
    ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
    ssl_socket.hostname = @uri.host
    begin
      ssl_socket.connect_nonblock
    rescue IO::WaitReadable
      # Handshake needs more input: wait until readable, then continue it.
      IO.select([ssl_socket])
      retry
    rescue IO::WaitWritable
      # Handshake needs to send: wait until writable, then continue it.
      IO.select(nil, [ssl_socket])
      retry
    end
    ssl_socket
  rescue StandardError => e
    @config.logger.error("socket connect error: #{e.inspect}")
    # The TCP connection is already open at this point; close it so a failed
    # handshake does not leak the descriptor.
    tcp_socket.close unless tcp_socket.closed?
    nil
  end
end
# Parses one raw chunk and dispatches every event found in it.
#
# nil chunks and the keep-alive frame are logged but otherwise ignored. Any
# StandardError raised while parsing or dispatching is logged and swallowed.
#
# @param partial_data [String, nil] chunk read from the socket
def process_data(partial_data)
  @config.logger.debug("Event partial data: #{partial_data}")
  ignorable = partial_data.nil? || partial_data == KEEP_ALIVE_RESPONSE
  @event_parser.parse(partial_data).each { |parsed| process_event(parsed) } unless ignorable
rescue StandardError => e
  @config.logger.error("process_data error: #{e.inspect}")
end
# Builds the raw HTTP/1.1 GET request that opens the event stream.
#
# Only the last four characters of the API key are sent (SplitSDKClientKey);
# the header is omitted when no key is set. The finished request is also
# written to the debug log.
#
# @param uri [URI::Generic] target of the stream (request_uri and host are used)
# @return [String] request line and headers separated by CRLF, ending with a blank line
def build_request(uri)
  lines = [
    "GET #{uri.request_uri} HTTP/1.1",
    "Host: #{uri.host}",
    'Accept: text/event-stream',
    "SplitSDKVersion: #{@config.language}-#{@config.version}",
    "SplitSDKMachineIP: #{@config.machine_ip}",
    "SplitSDKMachineName: #{@config.machine_name}"
  ]
  lines << "SplitSDKClientKey: #{@api_key.chars.last(4).join}" unless @api_key.nil?
  # The last header carries the CRLF CRLF that terminates the header section.
  lines << "Cache-Control: no-cache\r\n\r\n"

  request = lines.join("\r\n")
  @config.logger.debug("Request info: #{request}")
  request
end
# Routes a parsed event: error events go to dispatch_error, everything else
# to dispatch_event.
#
# @param event [#event_type] parsed stream event
def process_event(event)
  return dispatch_error(event) if event.event_type == ERROR_EVENT_TYPE

  dispatch_event(event)
end
# Handles an error event: logs it, records telemetry and closes the stream
# according to the error code.
#
# Codes 40140..40149 close with Constants::PUSH_RETRYABLE_ERROR, any other
# code in 40000..49999 closes with Constants::PUSH_NONRETRYABLE_ERROR, and
# codes outside that range leave the connection open.
#
# @param event [#event_type, #data] error event; data['code'] holds the numeric code
def dispatch_error(event)
  @config.logger.error("Event error: #{event.event_type}, #{event.data}")
  code = event.data['code']
  @telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::ABLY_ERROR, code)

  if code.between?(40_140, 40_149)
    close(Constants::PUSH_RETRYABLE_ERROR)
  elsif code.between?(40_000, 49_999)
    close(Constants::PUSH_NONRETRYABLE_ERROR)
  end
end
# Hands a non-error event to its consumer: occupancy events go to the
# notification manager keeper, all others to the notification processor.
#
# @param event [#occupancy?] parsed stream event
def dispatch_event(event)
  return @notification_manager_keeper.handle_incoming_occupancy_event(event) if event.occupancy?

  @notification_processor.process(event)
end
# Publishes a status on the status queue; nil statuses are ignored.
#
# @param status [Object, nil] status value to enqueue
def push_status(status)
  unless status.nil?
    @config.logger.debug("Pushing new sse status: #{status}")
    @status_queue.push(status)
  end
end
end
end
end
end