Skip to content

Commit 2441767

Browse files
committed
Update azure_topic.py
Added thread locking
1 parent 4e3c96f commit 2441767

1 file changed

Lines changed: 8 additions & 2 deletions

File tree

src/python_ms_core/core/topic/azure_topic.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
from azure.servicebus import ServiceBusClient, ServiceBusMessage
1111
from azure.servicebus import AutoLockRenewer
1212
import concurrent.futures as cf
13+
import threading
14+
1315

1416
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
1517
logger = logging.getLogger('AzureTopic')
@@ -51,6 +53,7 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes
5153
self.lock_renewal = AutoLockRenewer(max_workers=max_concurrent_messages)
5254
self.max_renewal_duration = 86400 # Renew the message upto 1 day
5355
self.wait_time_for_message = 5
56+
self.thread_lock = threading.Lock()
5457

5558

5659
def publish(self, data: QueueMessage):
@@ -99,7 +102,8 @@ def internal_callback(self, message, callbackfn):
99102
ServiceBusMessage: The processed message.
100103
"""
101104
try:
102-
self.internal_count += 1 # thread safe.
105+
with self.thread_lock:
106+
self.internal_count += 1 # thread safe.
103107
queue_message = QueueMessage.data_from(str(message))
104108
callbackfn(queue_message)
105109
return [True,message]
@@ -114,7 +118,9 @@ def settle_message(self, x: cf.Future):
114118
Args:
115119
x (cf.Future): The future object representing the message processing.
116120
"""
117-
self.internal_count -= 1 # thread safe.
121+
# Lock the internal count
122+
with self.thread_lock:
123+
self.internal_count -= 1
118124
# Check if the future has an exception
119125
[is_success,incoming_message] = x.result()
120126
if is_success:

0 commit comments

Comments
 (0)