77from ..resource_errors import ExceptionHandler
88from ..queue .models .queue_message import QueueMessage
99
10- logging .basicConfig (format = '%(asctime)s %(levelname)-8s %(message)s' ,datefmt = '%Y-%m-%d %H:%M:%S' )
10+ logging .basicConfig (format = '%(asctime)s %(levelname)-8s %(message)s' , datefmt = '%Y-%m-%d %H:%M:%S' )
1111logger = logging .getLogger ('Topic' )
1212logger .setLevel (logging .INFO )
1313
1414
1515class Callback :
16- def __init__ (self , fn = None ):
16+ def __init__ (self , fn = None , max_concurrent_messages = 1 ):
1717 self ._function_to_call = fn
18+ self ._max_concurrent_messages = max_concurrent_messages
19+ self ._stop_event = threading .Event ()
20+ self ._renewal_interval = 10 # seconds
21+
22+ def _renew_message_lock (self , message , receiver ):
23+ while not self ._stop_event .is_set ():
24+ try :
25+ time .sleep (self ._renewal_interval )
26+ if not message ._lock_expired :
27+ receiver .renew_message_lock (message )
28+ except Exception as e :
29+ break
1830
1931 # old method to fetch messages. Not used anymore
2032 def messages (self , provider , topic , subscription ):
@@ -33,41 +45,62 @@ def messages(self, provider, topic, subscription):
3345 logger .info ('Completed topic receiver' )
3446
3547 # Sends data to the callback function
36- def process_message (self , message : str ):
37- queue_message = QueueMessage .data_from (message )
48+ def process_message (self , message , receiver ):
49+ queue_message = QueueMessage .data_from (str ( message ) )
3850 self ._function_to_call (queue_message )
39-
51+ receiver .complete_message (message )
52+
4053 # Starts listening to the messages
4154 def start_listening (self , provider , topic , subscription ):
42- with provider .client : # service bus client
55+ with provider .client : # service bus client
4356 logger .info ('Initiating receiver' )
44- topic_receiver = provider .client .get_subscription_receiver (topic , subscription_name = subscription ) # servicebusclientsubscriptionreceiver
45- logger .info ('Done' )
57+ topic_receiver = provider .client .get_subscription_receiver (
58+ topic_name = topic ,
59+ subscription_name = subscription
60+ )
61+ logger .info ('Receiver started' )
4662 with topic_receiver :
4763 while True :
4864 try :
49- for message in topic_receiver :
65+ messages = topic_receiver .receive_messages (
66+ max_message_count = self ._max_concurrent_messages ,
67+ max_wait_time = 5
68+ )
69+ if not messages :
70+ continue
71+
72+ for message in messages :
73+ stop_event = threading .Event ()
74+ lock_renewal_thread = threading .Thread (
75+ target = self ._renew_message_lock ,
76+ args = (message , topic_receiver )
77+ )
78+ lock_renewal_thread .start ()
5079 try :
51- self .process_message (message = str ( message )) # sync call. [By default 1minute ] -> lock renewal for 300 seconds
80+ self .process_message (message , topic_receiver )
5281 except Exception as e :
53- print (f'Error : { e } , Invalid message received : { message } ' )
82+ logger .error (f'Error processing message: { message } . Error: { e } ' )
83+ topic_receiver .abandon_message (message )
84+ logger .info (f'Message { message .message_id } abandoned' )
5485 finally :
55- topic_receiver .complete_message (message )
86+ stop_event .set () # Signal the lock renewal thread to stop
87+ lock_renewal_thread .join ()
5688 except Exception as et :
57- print (f'Error in service bus connection : { et } ' )
89+ logger . error (f'Error in service bus connection: { et } ' )
5890 # Change mode from PEEK_LOCK to RECEIVE_AND_DELETE
59- logger .info ('Topic receiver invalidated ' )
91+ logger .info ('Receiver stopped ' )
6092
6193
6294class Topic (TopicAbstract ):
63- def __init__ (self , config = None , topic_name = None ):
95+ def __init__ (self , config = None , topic_name = None , max_concurrent_messages = 1 ):
6496 self .topic = topic_name
6597 self .provider = Config (config = config , topic_name = topic_name )
98+ self .max_concurrent_messages = max_concurrent_messages
6699
67100 @ExceptionHandler .decorated
68101 def subscribe (self , subscription = None , callback = None ):
69102 if subscription is not None :
70- cb = Callback (callback )
103+ cb = Callback (callback , max_concurrent_messages = self . max_concurrent_messages )
71104 thread = threading .Thread (target = cb .start_listening , args = (self .provider , self .topic , subscription ))
72105 thread .start ()
73106 time .sleep (5 )
0 commit comments