Skip to content

Commit 7452e8a

Browse files
committed
add configuration ENV vars for the values that have been hard-coded
in the connector for zmq and the broker for zmq the poll weight influences how many messages will be pulled from the polling socket before it moves on to check the other socket, this is currently 2/1 weighted in favor of the backend socket (the work complete) Also made a configuration value of the number of attempts to make on getting the server uri on the client side
1 parent c5d73e9 commit 7452e8a

2 files changed

Lines changed: 38 additions & 8 deletions

File tree

lib/protobuf/rpc/connectors/zmq.rb

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,7 @@ def create_socket
7575

7676
if socket # Make sure the context builds the socket
7777
socket.setsockopt(::ZMQ::LINGER, 0)
78-
79-
logger.debug { sign_message("Establishing connection: #{server_uri}") }
8078
zmq_error_check(socket.connect(server_uri), :socket_connect)
81-
logger.debug { sign_message("Connection established to #{server_uri}") }
8279

8380
if first_alive_load_balance?
8481
begin
@@ -110,7 +107,7 @@ def error?
110107
# to the host and port in the options
111108
#
112109
def lookup_server_uri
113-
15.times do
110+
server_lookup_attempts.times do
114111
service_directory.all_listings_for(service).each do |listing|
115112
host = listing.try(:address)
116113
port = listing.try(:port)
@@ -121,7 +118,7 @@ def lookup_server_uri
121118
port = options[:port]
122119
return "tcp://#{host}:#{port}" if host_alive?(host)
123120

124-
sleep (5.0/100.0)
121+
sleep (1.0/100.0)
125122
end
126123

127124
raise "Host not found for service #{service}"
@@ -131,7 +128,7 @@ def host_alive?(host)
131128
return true unless ping_port_enabled?
132129

133130
if (last_response = self.class.ping_port_responses[host])
134-
if (Time.now.to_i - last_response[:at]) <= 2
131+
if (Time.now.to_i - last_response[:at]) <= host_alive_check_interval
135132
return last_response[:ping_port_open]
136133
end
137134
end
@@ -144,6 +141,10 @@ def host_alive?(host)
144141
ping_port_open
145142
end
146143

144+
def host_alive_check_interval
145+
@host_alive_check_interval ||= [ENV["PB_ZMQ_CLIENT_HOST_ALIVE_CHECK_INTERVAL"].to_i, 1].max
146+
end
147+
147148
def ping_port_open?(host)
148149
socket = TCPSocket.new(host, ping_port.to_i)
149150
socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)
@@ -197,6 +198,10 @@ def send_request_with_timeout(timeout, attempt = 0)
197198
logger.debug { sign_message("Socket closed") }
198199
end
199200

201+
def server_lookup_attempts
202+
@server_lookup_attempts ||= [ENV["PB_ZMQ_CLIENT_SERVER_LOOKUP_ATTEMPTS"].to_i, 5].max
203+
end
204+
200205
# The service we're attempting to connect to
201206
#
202207
def service

lib/protobuf/rpc/servers/zmq/broker.rb

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,26 @@ def run
3333
# Something went wrong
3434
break if rc == -1
3535

36-
process_backend if @poller.readables.include?(@backend_socket)
36+
readables_include_backend = @poller.readables.include?(@backend_socket)
37+
message_count_read_from_backend = 0
38+
39+
while readables_include_backend && message_count_read_from_backend < backend_poll_weight do
40+
message_count_read_from_backend = message_count_read_from_backend + 1
41+
process_backend
42+
@poller.poll_nonblock
43+
readables_include_backend = @poller.readables.include?(@backend_socket)
44+
end
45+
3746
process_local_queue # Fair ordering so queued requests get in before new requests
38-
process_frontend if @poller.readables.include?(@frontend_socket)
47+
readables_include_frontend = @poller.readables.include?(@frontend_socket)
48+
message_count_read_from_frontend = 0
49+
50+
while readables_include_frontend && message_count_read_from_frontend < frontend_poll_weight do
51+
message_count_read_from_frontend = message_count_read_from_frontend + 1
52+
process_frontend
53+
@poller.poll_nonblock
54+
readables_include_frontend = @poller.readables.include?(@frontend_socket)
55+
end
3956
end
4057
ensure
4158
teardown
@@ -47,6 +64,14 @@ def running?
4764

4865
private
4966

67+
def backend_poll_weight
68+
@backend_poll_weight ||= [ENV["PB_ZMQ_SERVER_BACKEND_POLL_WEIGHT"].to_i, 2].max
69+
end
70+
71+
def frontend_poll_weight
72+
@frontend_poll_weight ||= [ENV["PB_ZMQ_SERVER_FRONTEND_POLL_WEIGHT"].to_i, 1].max
73+
end
74+
5075
def init_backend_socket
5176
@backend_socket = @zmq_context.socket(ZMQ::ROUTER)
5277
zmq_error_check(@backend_socket.bind(@server.backend_uri))

0 commit comments

Comments
 (0)