@@ -48,7 +48,7 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes
4848 self .publisher = self .client .get_topic_sender (topic_name = topic_name )
4949 self .executor = ThreadPoolExecutor (max_workers = max_concurrent_messages )
5050 self .internal_count = 0
51- self .lock_renewal = AutoLockRenewer (max_workers = 4 )
51+ self .lock_renewal = AutoLockRenewer (max_workers = max_concurrent_messages )
5252 self .max_renewal_duration = 86400 # Renew the message upto 1 day
5353 self .wait_time_for_message = 5
5454
@@ -99,24 +99,29 @@ def internal_callback(self, message, callbackfn):
9999 ServiceBusMessage: The processed message.
100100 """
101101 try :
102- self .internal_count += 1
102+ self .internal_count += 1 # thread safe.
103103 queue_message = QueueMessage .data_from (str (message ))
104104 callbackfn (queue_message )
105+ return [True ,message ]
105106 except Exception as e :
106107 logger .error (f'Error in processing message: { e } ' )
107- finally :
108- return message
109-
108+ return [False ,message ]
109+
110110
111111 def settle_message (self , x : cf .Future ):
112112 """
113113 Sets the message as completed and updates the internal count.
114114 Args:
115115 x (cf.Future): The future object representing the message processing.
116116 """
117- self .internal_count -= 1
118- incoming_message = x .result ()
119- self .receiver .complete_message (incoming_message )
117+ self .internal_count -= 1 # thread safe.
118+ # Check if the future has an exception
119+ [is_success ,incoming_message ] = x .result ()
120+ if is_success :
121+ self .receiver .complete_message (incoming_message )
122+ else :
123+ print (f'Abandoning message: { incoming_message } ' )
124+ self .receiver .abandon_message (incoming_message ) # send back to the topic
120125 return
121126
122127
0 commit comments