Skip to content

Commit bd80281

Browse files
committed
Improved processing with configured rate limits
1 parent c184c05 commit bd80281

1 file changed

Lines changed: 9 additions & 2 deletions

File tree

tb_device_mqtt.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -491,10 +491,12 @@ def _on_message(self, client, userdata, message):
491491

492492
def _on_decoded_message(self, content, message):
493493
if message.topic.startswith(RPC_REQUEST_TOPIC):
494+
self._messages_rate_limit.increase_rate_limit_counter()
494495
request_id = message.topic[len(RPC_REQUEST_TOPIC):len(message.topic)]
495496
if self.__device_on_server_side_rpc_response:
496497
self.__device_on_server_side_rpc_response(request_id, content)
497498
elif message.topic.startswith(RPC_RESPONSE_TOPIC):
499+
self._messages_rate_limit.increase_rate_limit_counter()
498500
with self._lock:
499501
request_id = int(message.topic[len(RPC_RESPONSE_TOPIC):len(message.topic)])
500502
if self.__device_client_rpc_dict.get(request_id):
@@ -504,6 +506,7 @@ def _on_decoded_message(self, content, message):
504506
if callback is not None:
505507
callback(request_id, content, None)
506508
elif message.topic == ATTRIBUTES_TOPIC:
509+
self._messages_rate_limit.increase_rate_limit_counter()
507510
dict_results = []
508511
with self._lock:
509512
# callbacks for everything
@@ -524,6 +527,7 @@ def _on_decoded_message(self, content, message):
524527
for res in dict_results:
525528
res(content, None)
526529
elif message.topic.startswith(ATTRIBUTES_TOPIC_RESPONSE):
530+
self._messages_rate_limit.increase_rate_limit_counter()
527531
with self._lock:
528532
req_id = int(message.topic[len(ATTRIBUTES_TOPIC + "/response/"):])
529533
# pop callback and use it
@@ -537,6 +541,7 @@ def _on_decoded_message(self, content, message):
537541
callback(content, None)
538542

539543
if message.topic.startswith("v1/devices/me/attributes"):
544+
self._messages_rate_limit.increase_rate_limit_counter()
540545
self.firmware_info = loads(message.payload)
541546
if "/response/" in message.topic:
542547
self.firmware_info = self.firmware_info.get("shared", {}) if isinstance(self.firmware_info, dict) else {}
@@ -734,7 +739,8 @@ def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_lim
734739
else:
735740
log.debug("Waiting for rate limit to be released...")
736741
log_posted = True
737-
sleep(.01)
742+
if limit_reached_check:
743+
sleep(.005)
738744
if waited:
739745
log.debug("Rate limit released, sending data to ThingsBoard...")
740746

@@ -749,7 +755,8 @@ def wait_until_current_queued_messages_processed(self):
749755
log.debug("Waiting for messages to be processed by paho client, current queue size - %r, max inflight messages: %r",
750756
current_out_messages, inflight_messages)
751757
previous_notification_time = int(monotonic())
752-
sleep(.001)
758+
if current_out_messages >= inflight_messages:
759+
sleep(.001)
753760

754761
def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
755762
msg_rate_limit=None, dp_rate_limit=None):

0 commit comments

Comments
 (0)