Skip to content

Commit a32e4dd

Browse files
committed
Updated splitted messages processing to avoid queue size error
1 parent bd80281 commit a32e4dd

1 file changed

Lines changed: 62 additions & 29 deletions

File tree

tb_device_mqtt.py

Lines changed: 62 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from enum import Enum
3030

3131
from paho.mqtt.reasoncodes import ReasonCodes
32+
from paho.mqtt.client import MQTT_ERR_QUEUE_SIZE
3233

3334
from orjson import dumps, loads, JSONDecodeError
3435

@@ -148,8 +149,13 @@ def mid(self):
148149

149150
def get(self):
150151
if isinstance(self.message_info, list):
151-
for info in self.message_info:
152-
info.wait_for_publish(timeout=1)
152+
try:
153+
for info in self.message_info:
154+
info.wait_for_publish(timeout=1)
155+
except Exception as e:
156+
global log
157+
log = logging.getLogger('tb_connection')
158+
log.error("Error while waiting for publish: %s", e)
153159
else:
154160
self.message_info.wait_for_publish(timeout=1)
155161
return self.rc()
@@ -345,6 +351,7 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
345351
self.__device_sub_dict = {}
346352
self.__device_client_rpc_dict = {}
347353
self.__attr_request_number = 0
354+
self.__error_logged = 0
348355
self.max_payload_size = max_payload_size
349356
self.service_configuration_callback = self.on_service_configuration
350357
telemetry_rate_limit, telemetry_dp_rate_limit = RateLimit.get_rate_limits_by_host(self.__host,
@@ -357,7 +364,7 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
357364
self._telemetry_dp_rate_limit = RateLimit(telemetry_dp_rate_limit, "Rate limit for telemetry data points")
358365
self.max_inflight_messages_set(self._telemetry_rate_limit.get_minimal_limit())
359366
self.__attrs_request_timeout = {}
360-
self.__timeout_thread = Thread(target=self.__timeout_check)
367+
self.__timeout_thread = Thread(target=self.__timeout_check, name="Timeout check thread")
361368
self.__timeout_thread.daemon = True
362369
self.__timeout_thread.start()
363370
self._client.on_connect = self._on_connect
@@ -671,6 +678,8 @@ def request_service_configuration(self, callback):
671678
self.send_rpc_call("getSessionLimits", {"timeout": 5000}, callback)
672679

673680
def on_service_configuration(self, _, response, *args, **kwargs):
681+
global log
682+
log = logging.getLogger('tb_connection')
674683
if "error" in response:
675684
log.warning("Timeout while waiting for service configuration!, session will use default configuration.")
676685
self.rate_limits_received = True
@@ -694,6 +703,7 @@ def on_service_configuration(self, _, response, *args, **kwargs):
694703
self._telemetry_rate_limit.get_minimal_limit(),
695704
service_config.get('maxInflightMessages', 100)) * 80 / 100)
696705
self.max_inflight_messages_set(max_inflight_messages)
706+
self.max_queued_messages_set(max_inflight_messages)
697707
if service_config.get('maxPayloadSize'):
698708
self.max_payload_size = int(int(service_config.get('maxPayloadSize')) * 80 / 100)
699709
log.info("Service configuration was successfully retrieved and applied.")
@@ -744,19 +754,30 @@ def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_lim
744754
if waited:
745755
log.debug("Rate limit released, sending data to ThingsBoard...")
746756

747-
def wait_until_current_queued_messages_processed(self):
757+
def _wait_until_current_queued_messages_processed(self):
748758
previous_notification_time = 0
749759
current_out_messages = len(self._client._out_messages) * 2
750760
inflight_messages = self._client._max_inflight_messages or 5
761+
logger = None
762+
waiting_started = int(monotonic())
763+
connection_was_lost = False
764+
timeout_for_break = 600
765+
751766
if current_out_messages > 0:
752767
while current_out_messages >= inflight_messages and not self.stopped:
753768
current_out_messages = len(self._client._out_messages)
754769
if int(monotonic()) - previous_notification_time > 5 and current_out_messages > inflight_messages:
755-
log.debug("Waiting for messages to be processed by paho client, current queue size - %r, max inflight messages: %r",
770+
if logger is None:
771+
logger = logging.getLogger('tb_connection')
772+
logger.debug("Waiting for messages to be processed by paho client, current queue size - %r, max inflight messages: %r",
756773
current_out_messages, inflight_messages)
757774
previous_notification_time = int(monotonic())
775+
if not self.is_connected():
776+
connection_was_lost = True
758777
if current_out_messages >= inflight_messages:
759778
sleep(.001)
779+
if int(monotonic()) - waiting_started > timeout_for_break and not connection_was_lost:
780+
break
760781

761782
def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
762783
msg_rate_limit=None, dp_rate_limit=None):
@@ -834,32 +855,44 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
834855
for part in split_messages:
835856
if not part:
836857
continue
837-
dp_rate_limit.increase_rate_limit_counter(part['datapoints'])
838-
rate_limited = self._wait_for_rate_limit_released(timeout,
839-
message_rate_limit=msg_rate_limit,
840-
dp_rate_limit=dp_rate_limit,
841-
amount=part['datapoints'])
842-
if rate_limited:
843-
return rate_limited
844-
msg_rate_limit.increase_rate_limit_counter()
845-
kwargs["payload"] = dumps(part['message'])
846-
self.wait_until_current_queued_messages_processed()
847-
if not self.stopped:
848-
if device is not None:
849-
log.debug("Device: %s, Sending message to topic: %s ", device, topic)
850-
if part['datapoints'] > 0:
851-
log.debug("Sending message with %i datapoints", part['datapoints'])
852-
log.debug("Message payload: %r", kwargs["payload"])
853-
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
854-
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
855-
else:
856-
log.debug("Sending message with %r", kwargs["payload"])
857-
log.debug("Message payload: %r", kwargs["payload"])
858-
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
859-
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
860-
results.append(self._client.publish(**kwargs))
858+
self.__send_split_message(results, part, kwargs, timeout, device, msg_rate_limit, dp_rate_limit, topic)
861859
return TBPublishInfo(results)
862860

861+
def __send_split_message(self, results, part, kwargs, timeout, device, msg_rate_limit, dp_rate_limit,
862+
topic):
863+
dp_rate_limit.increase_rate_limit_counter(part['datapoints'])
864+
rate_limited = self._wait_for_rate_limit_released(timeout,
865+
message_rate_limit=msg_rate_limit,
866+
dp_rate_limit=dp_rate_limit,
867+
amount=part['datapoints'])
868+
if rate_limited:
869+
return rate_limited
870+
msg_rate_limit.increase_rate_limit_counter()
871+
kwargs["payload"] = dumps(part['message'])
872+
self._wait_until_current_queued_messages_processed()
873+
if not self.stopped:
874+
if device is not None:
875+
log.debug("Device: %s, Sending message to topic: %s ", device, topic)
876+
if part['datapoints'] > 0:
877+
log.debug("Sending message with %i datapoints", part['datapoints'])
878+
log.debug("Message payload: %r", kwargs["payload"])
879+
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
880+
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
881+
else:
882+
log.debug("Sending message with %r", kwargs["payload"])
883+
log.debug("Message payload: %r", kwargs["payload"])
884+
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
885+
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
886+
result = self._client.publish(**kwargs)
887+
if result.rc == MQTT_ERR_QUEUE_SIZE:
888+
while not self.stopped and result.rc == MQTT_ERR_QUEUE_SIZE:
889+
if int(monotonic()) - self.__error_logged > 10:
890+
log.warning("Queue size exceeded, waiting for messages to be processed by paho client.")
891+
self.__error_logged = int(monotonic())
892+
sleep(.01) # Give some time for paho to process messages
893+
result = self._client.publish(**kwargs)
894+
results.append(result)
895+
863896
def _subscribe_to_topic(self, topic, qos=None, timeout=DEFAULT_TIMEOUT):
864897
if qos is None:
865898
qos = self.quality_of_service

0 commit comments

Comments
 (0)