Skip to content

Commit 0a6a86e

Browse files
committed
break from frontend poll if queue is full
1 parent f11bc3f commit 0a6a86e

1 file changed

Lines changed: 6 additions & 1 deletion

File tree

lib/protobuf/rpc/servers/zmq/broker.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def run
5050
while readables_include_frontend && message_count_read_from_frontend < frontend_poll_weight do
5151
message_count_read_from_frontend = message_count_read_from_frontend + 1
5252
process_frontend
53+
break unless local_queue_available? # no need to read frontend just to throw away messages, will prioritize backend when full
5354
@poller.poll_nonblock
5455
readables_include_frontend = @poller.readables.include?(@frontend_socket)
5556
end
@@ -104,6 +105,10 @@ def inproc?
104105
!!@server.try(:inproc?)
105106
end
106107

108+
def local_queue_available?
109+
local_queue.size < local_queue_max_size
110+
end
111+
107112
def local_queue_max_size
108113
@local_queue_max_size ||= [ENV["PB_ZMQ_SERVER_QUEUE_MAX_SIZE"].to_i, 5].max
109114
end
@@ -122,7 +127,7 @@ def process_frontend
122127
address, _, message, *frames = read_from_frontend
123128

124129
if message == ::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE
125-
if local_queue.size < local_queue_max_size
130+
if local_queue_available?
126131
write_to_frontend([address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, ::Protobuf::Rpc::Zmq::WORKERS_AVAILABLE])
127132
else
128133
write_to_frontend([address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE])

0 commit comments

Comments
 (0)