Skip to content

Commit c0257ec

Browse files
committed
Merge pull request #244 from localshred/abrandoned/broker_probs
Abrandoned/broker probs
2 parents a7c8203 + b9fb942 commit c0257ec

5 files changed

Lines changed: 52 additions & 19 deletions

File tree

.rubocop.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ Lint/EndAlignment:
66
Lint/Loop:
77
Enabled: false
88

9+
Metrics/ClassLength:
10+
Enabled: false
11+
912
Style/CaseIndentation:
1013
IndentWhenRelativeTo: end
1114

lib/protobuf/rpc/connectors/zmq.rb

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,25 @@ def send_request
5959
# Private Instance methods
6060
#
6161
def check_available_rcv_timeout
62-
@check_available_rcv_timeout ||= [ENV["PB_ZMQ_CLIENT_CHECK_AVAILABLE_RCV_TIMEOUT"].to_i, 200].max
62+
@check_available_rcv_timeout ||= begin
63+
case
64+
when ENV.key?("PB_ZMQ_CLIENT_CHECK_AVAILABLE_RCV_TIMEOUT") then
65+
ENV["PB_ZMQ_CLIENT_CHECK_AVAILABLE_RCV_TIMEOUT"].to_i
66+
else
67+
200 # ms
68+
end
69+
end
6370
end
6471

6572
def check_available_snd_timeout
66-
@check_available_snd_timeout ||= [ENV["PB_ZMQ_CLIENT_CHECK_AVAILABLE_SND_TIMEOUT"].to_i, 200].max
73+
@check_available_snd_timeout ||= begin
74+
case
75+
when ENV.key?("PB_ZMQ_CLIENT_CHECK_AVAILABLE_SND_TIMEOUT") then
76+
ENV["PB_ZMQ_CLIENT_CHECK_AVAILABLE_SND_TIMEOUT"].to_i
77+
else
78+
200 # ms
79+
end
80+
end
6781
end
6882

6983
def close_connection

lib/protobuf/rpc/servers/zmq/broker.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ def initialize(server)
2323

2424
def run
2525
@idle_workers = []
26+
@running = true
2627

2728
loop do
2829
process_local_queue
@@ -39,10 +40,11 @@ def run
3940
end
4041
ensure
4142
teardown
43+
@running = false
4244
end
4345

4446
def running?
45-
@server.running?
47+
@running && @server.running?
4648
end
4749

4850
private

lib/protobuf/rpc/servers/zmq/server.rb

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ def broadcast_beacons?
7676
!brokerless? && options[:broadcast_beacons]
7777
end
7878

79+
def broadcast_busy?
80+
broadcast_beacons? && options[:broadcast_busy]
81+
end
82+
7983
def broadcast_flatline
8084
flatline = ::Protobuf::Rpc::DynamicDiscovery::Beacon.new(
8185
:beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::FLATLINE,
@@ -176,15 +180,11 @@ def reaping_interval
176180

177181
def run
178182
@running = true
179-
180-
start_broker unless brokerless?
181-
start_missing_workers
182-
183183
yield if block_given? # runs on startup
184184
wait_for_shutdown_signal
185185
broadcast_flatline if broadcast_beacons?
186186
Thread.pass until reap_dead_workers.empty?
187-
@broker.join unless brokerless?
187+
@broker_thread.join unless brokerless?
188188
ensure
189189
@running = false
190190
teardown
@@ -246,14 +246,13 @@ def wait_for_shutdown_signal
246246
loop do
247247
break if IO.select([@shutdown_r], nil, nil, timeout)
248248

249-
if reap_dead_workers?
250-
reap_dead_workers
251-
start_missing_workers
252-
end
249+
start_broker unless brokerless?
250+
reap_dead_workers if reap_dead_workers?
251+
start_missing_workers
253252

254253
next unless broadcast_heartbeat?
255254

256-
if options[:broadcast_busy] && all_workers_busy?
255+
if broadcast_busy? && all_workers_busy?
257256
broadcast_flatline
258257
else
259258
broadcast_heartbeat
@@ -285,15 +284,29 @@ def init_zmq_context
285284
end
286285

287286
def start_broker
288-
@broker = Thread.new(self) do |server|
289-
::Protobuf::Rpc::Zmq::Broker.new(server).run
287+
return if @broker && @broker.running? && !@broker_thread.stop?
288+
if @broker && !@broker.running?
289+
broadcast_flatline if broadcast_busy?
290+
@broker_thread.join if @broker_thread
291+
init_zmq_context # need a new context to restart the broker
292+
end
293+
294+
@broker = ::Protobuf::Rpc::Zmq::Broker.new(self)
295+
@broker_thread = Thread.new(@broker) do |broker|
296+
begin
297+
broker.run
298+
rescue => e
299+
message = "Broker failed: #{e.inspect}\n #{e.backtrace.join($INPUT_RECORD_SEPARATOR)}"
300+
$stderr.puts(message)
301+
logger.error { message }
302+
end
290303
end
291304
end
292305

293306
def start_worker
294-
@workers << Thread.new(self) do |server|
307+
@workers << Thread.new(self, @broker) do |server, broker|
295308
begin
296-
::Protobuf::Rpc::Zmq::Worker.new(server).run
309+
::Protobuf::Rpc::Zmq::Worker.new(server, broker).run
297310
rescue => e
298311
message = "Worker failed: #{e.inspect}\n #{e.backtrace.join($INPUT_RECORD_SEPARATOR)}"
299312
$stderr.puts(message)

lib/protobuf/rpc/servers/zmq/worker.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ class Worker
1212
##
1313
# Constructor
1414
#
15-
def initialize(server)
15+
def initialize(server, broker)
1616
@server = server
17+
@broker = broker
1718

1819
init_zmq_context
1920
init_backend_socket
@@ -61,7 +62,7 @@ def run
6162
end
6263

6364
def running?
64-
@server.running?
65+
@broker.running? && @server.running?
6566
end
6667

6768
private

0 commit comments

Comments
 (0)