Skip to content

Commit 236bcd2

Browse files
committed
Merge pull request #218 from localshred/abrandoned/config_zmq
Abrandoned/config zmq
2 parents c5d73e9 + eeb4e29 commit 236bcd2

2 files changed

Lines changed: 51 additions & 9 deletions

File tree

lib/protobuf/rpc/connectors/zmq.rb

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,7 @@ def create_socket
7575

7676
if socket # Make sure the context builds the socket
7777
socket.setsockopt(::ZMQ::LINGER, 0)
78-
79-
logger.debug { sign_message("Establishing connection: #{server_uri}") }
8078
zmq_error_check(socket.connect(server_uri), :socket_connect)
81-
logger.debug { sign_message("Connection established to #{server_uri}") }
8279

8380
if first_alive_load_balance?
8481
begin
@@ -110,7 +107,7 @@ def error?
110107
# to the host and port in the options
111108
#
112109
def lookup_server_uri
113-
15.times do
110+
server_lookup_attempts.times do
114111
service_directory.all_listings_for(service).each do |listing|
115112
host = listing.try(:address)
116113
port = listing.try(:port)
@@ -121,7 +118,7 @@ def lookup_server_uri
121118
port = options[:port]
122119
return "tcp://#{host}:#{port}" if host_alive?(host)
123120

124-
sleep (5.0/100.0)
121+
sleep (1.0/100.0)
125122
end
126123

127124
raise "Host not found for service #{service}"
@@ -131,7 +128,7 @@ def host_alive?(host)
131128
return true unless ping_port_enabled?
132129

133130
if (last_response = self.class.ping_port_responses[host])
134-
if (Time.now.to_i - last_response[:at]) <= 2
131+
if (Time.now.to_i - last_response[:at]) <= host_alive_check_interval
135132
return last_response[:ping_port_open]
136133
end
137134
end
@@ -144,6 +141,10 @@ def host_alive?(host)
144141
ping_port_open
145142
end
146143

144+
def host_alive_check_interval
145+
@host_alive_check_interval ||= [ENV["PB_ZMQ_CLIENT_HOST_ALIVE_CHECK_INTERVAL"].to_i, 1].max
146+
end
147+
147148
def ping_port_open?(host)
148149
socket = TCPSocket.new(host, ping_port.to_i)
149150
socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)
@@ -197,6 +198,10 @@ def send_request_with_timeout(timeout, attempt = 0)
197198
logger.debug { sign_message("Socket closed") }
198199
end
199200

201+
def server_lookup_attempts
202+
@server_lookup_attempts ||= [ENV["PB_ZMQ_CLIENT_SERVER_LOOKUP_ATTEMPTS"].to_i, 5].max
203+
end
204+
200205
# The service we're attempting to connect to
201206
#
202207
def service

lib/protobuf/rpc/servers/zmq/broker.rb

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ def run
3333
# Something went wrong
3434
break if rc == -1
3535

36-
process_backend if @poller.readables.include?(@backend_socket)
36+
check_and_process_backend
3737
process_local_queue # Fair ordering so queued requests get in before new requests
38-
process_frontend if @poller.readables.include?(@frontend_socket)
38+
check_and_process_frontend
3939
end
4040
ensure
4141
teardown
@@ -47,6 +47,39 @@ def running?
4747

4848
private
4949

50+
def backend_poll_weight
51+
@backend_poll_weight ||= [ENV["PB_ZMQ_SERVER_BACKEND_POLL_WEIGHT"].to_i, 1].max
52+
end
53+
54+
def check_and_process_backend
55+
readables_include_backend = @poller.readables.include?(@backend_socket)
56+
message_count_read_from_backend = 0
57+
58+
while readables_include_backend && message_count_read_from_backend < backend_poll_weight do
59+
message_count_read_from_backend += 1
60+
process_backend
61+
@poller.poll_nonblock
62+
readables_include_backend = @poller.readables.include?(@backend_socket)
63+
end
64+
end
65+
66+
def check_and_process_frontend
67+
readables_include_frontend = @poller.readables.include?(@frontend_socket)
68+
message_count_read_from_frontend = 0
69+
70+
while readables_include_frontend && message_count_read_from_frontend < frontend_poll_weight do
71+
message_count_read_from_frontend += 1
72+
process_frontend
73+
break unless local_queue_available? # no need to read frontend just to throw away messages, will prioritize backend when full
74+
@poller.poll_nonblock
75+
readables_include_frontend = @poller.readables.include?(@frontend_socket)
76+
end
77+
end
78+
79+
def frontend_poll_weight
80+
@frontend_poll_weight ||= [ENV["PB_ZMQ_SERVER_FRONTEND_POLL_WEIGHT"].to_i, 1].max
81+
end
82+
5083
def init_backend_socket
5184
@backend_socket = @zmq_context.socket(ZMQ::ROUTER)
5285
zmq_error_check(@backend_socket.bind(@server.backend_uri))
@@ -79,6 +112,10 @@ def inproc?
79112
!!@server.try(:inproc?)
80113
end
81114

115+
def local_queue_available?
116+
local_queue.size < local_queue_max_size
117+
end
118+
82119
def local_queue_max_size
83120
@local_queue_max_size ||= [ENV["PB_ZMQ_SERVER_QUEUE_MAX_SIZE"].to_i, 5].max
84121
end
@@ -97,7 +134,7 @@ def process_frontend
97134
address, _, message, *frames = read_from_frontend
98135

99136
if message == ::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE
100-
if local_queue.size < local_queue_max_size
137+
if local_queue_available?
101138
write_to_frontend([address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, ::Protobuf::Rpc::Zmq::WORKERS_AVAILABLE])
102139
else
103140
write_to_frontend([address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE])

0 commit comments

Comments
 (0)