Skip to content

Commit 6552bfb

Browse files
committed
Update topic.py
Refactored the code that listens to the subscription.
1 parent 1dc8736 commit 6552bfb

1 file changed

Lines changed: 14 additions & 6 deletions

File tree

src/python_ms_core/core/topic/topic.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ class Callback:
1616
def __init__(self, fn=None):
1717
self._function_to_call = fn
1818

19+
# old method to fetch messages. Not used anymore
1920
def messages(self, provider, topic, subscription):
2021
with provider.client:
21-
topic_receiver = provider.client.get_subscription_receiver(topic, subscription_name=subscription,max_wait_time=200)
22+
topic_receiver = provider.client.get_subscription_receiver(topic, subscription_name=subscription)
2223
logger.info(f'Started receiver for {subscription}')
2324
with topic_receiver:
2425
for message in topic_receiver:
@@ -33,21 +34,28 @@ def messages(self, provider, topic, subscription):
3334
logger.info(f'Completed gathering messages')
3435
logger.info('Completed topic receiver')
3536

37+
# Sends data to the callback function
3638
def process_message(self, message:str):
3739
queue_message = QueueMessage.data_from(message)
3840
self._function_to_call(queue_message)
39-
41+
42+
# Starts listening to the messages
4043
def start_listening(self, provider, topic, subscription):
4144
with provider.client: # service bus client
4245
while True:
43-
logger.info('Going into while')
46+
logger.info('Initiatig receiver')
4447
topic_receiver = provider.client.get_subscription_receiver(topic, subscription_name=subscription) # servicebusclientsubscriptionreceiver
4548
with topic_receiver:
4649
for message in topic_receiver:
47-
self.process_message(message=str(message)) # sync call. [By default 1minute ] -> lock renewal for 300 seconds
48-
topic_receiver.complete_message(message) # fails -> peeklock is timedout
50+
try:
51+
self.process_message(message=str(message)) # sync call. [By default 1minute ] -> lock renewal for 300 seconds
52+
topic_receiver.complete_message(message)
53+
except Exception as e:
54+
print(f'Error : {e}, Invalid message received : {message}')
55+
finally:
56+
topic_receiver.complete_message(message)
4957
# Change mode from PEEK_LOCK to RECEIVE_AND_DELETE
50-
logger.info('Completed topic receiver')
58+
logger.info('Topic receiver invalidated')
5159

5260

5361
class Topic(TopicAbstract):

0 commit comments

Comments
 (0)