@@ -66,7 +66,7 @@ def publish(self, data: QueueMessage):
6666 message = QueueMessage .to_dict (data )
6767 self .publisher .send_messages (ServiceBusMessage (json .dumps (message )))
6868
69- def subscribe (self , subscription : str , callback ):
69+ def subscribe (self , subscription : str , callback , max_receivable_messages = - 1 ):
7070
7171 """
7272 Subscribes to a subscription of the topic and processes incoming messages.
@@ -75,17 +75,24 @@ def subscribe(self, subscription: str, callback):
7575 callback (function): The callback function to invoke for each message.
7676 """
7777 self .receiver = self .client .get_subscription_receiver (topic_name = self .topic_name , subscription_name = subscription )
78+ self .receiver .local_received_messages = 0
7879 while True :
7980 try :
8081 to_receive = (self .max_concurrent_messages - self .internal_count )
82+ total_messages_to_recieve_more = max_receivable_messages - self .receiver .local_received_messages
83+ if max_receivable_messages > 0 :
84+ to_receive = min (to_receive , total_messages_to_recieve_more )
8185 if to_receive > 0 :
8286 messages = self .receiver .receive_messages (max_message_count = to_receive , max_wait_time = self .wait_time_for_message )
8387 if not messages or len (messages ) == 0 :
8488 continue
89+ self .receiver .local_received_messages += len (messages )
8590 for message in messages :
8691 self .lock_renewal .register (self .receiver , message , max_lock_renewal_duration = self .max_renewal_duration )
8792 execution_task = self .executor .submit (self .internal_callback , message , callback )
8893 execution_task .add_done_callback (lambda x : self .settle_message (x ))
94+ if self .receiver .local_received_messages >= max_receivable_messages and max_receivable_messages > 0 : # Break if the messages are more than max_receivable_messages
95+ break
8996 else :
9097 time .sleep (self .wait_time_for_message )
9198 except Exception as e :
0 commit comments