Skip to content

Commit e3f5034

Browse files
committed
Add max_receivable_messages to topic subscribe
Introduce an optional max_receivable_messages parameter to TopicAbstract.subscribe and implement support in AzureTopic and LocalTopic. AzureTopic now tracks per-receiver local_received_messages, limits receive batch sizes accordingly, and stops once the requested max is reached. Update example to use 'test-response', disable the random exception, fix task sleep to 5s, and lower concurrency to 1 with subscribe(..., max_receivable_messages=1). Bump package version to 0.0.24.
1 parent 2a4587c commit e3f5034

5 files changed

Lines changed: 22 additions & 13 deletions

File tree

src/example.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
topic = 'temp-nar'
1717
subscription = 'temp'
1818
some_other_sub = 'temp'
19-
respond_topic = 'temp-response'
19+
respond_topic = 'test-response'
2020
# respond_topic_object = core.get_topic(topic_name=respond_topic)
2121

2222

@@ -45,10 +45,11 @@ def publish_messages(topic_name):
4545
def long_running_task(sleep_time):
4646
# Simulate a long-running task
4747
global exec_count
48-
exec_count += 1
49-
if exec_count % random_exec == 0:
50-
# throw an exception
51-
raise Exception('Random Exception')
48+
exec_count += 1
49+
# Throw exceptions at random
50+
# if exec_count % random_exec == 0:
51+
# # throw an exception
52+
# raise Exception('Random Exception')
5253
time.sleep(sleep_time)
5354
sender_obj = MessageSender(topic_name=respond_topic)
5455
sender_obj.send_message({'message': 'Task Completed'})
@@ -61,12 +62,13 @@ def process(message):
6162
# long_running_thread = threading.Thread(target=long_running_task, args=(message.data['a'],))
6263
# long_running_thread.start()
6364
# long_running_thread.join()
64-
long_running_task(message.data['a'])
65+
# long_running_task(message.data['a'])
66+
long_running_task(5) # 5 seconds for each task
6567
print(f' > Message Completed: {message.data}')
6668

67-
topic_object = core.get_topic(topic_name=topic_name,max_concurrent_messages=2)
69+
topic_object = core.get_topic(topic_name=topic_name,max_concurrent_messages=1)
6870
try:
69-
topic_object.subscribe(subscription=subscription_name, callback=process)
71+
topic_object.subscribe(subscription=subscription_name, callback=process, max_receivable_messages=1)
7072
except Exception as e:
7173
print(e)
7274

src/python_ms_core/core/topic/abstract/topic_abstract.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ class TopicAbstract(ABC):
66
def __init__(self, config=None, topic_name=None): pass
77

88
@abstractmethod
9-
def subscribe(self, subscription=None, callback=None): pass
9+
def subscribe(self, subscription=None, callback=None, max_receivable_messages=-1): pass
1010

1111
@abstractmethod
12-
def publish(self, data=None): pass
12+
def publish(self, data=None): pass

src/python_ms_core/core/topic/azure_topic.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def publish(self, data: QueueMessage):
6666
message = QueueMessage.to_dict(data)
6767
self.publisher.send_messages(ServiceBusMessage(json.dumps(message)))
6868

69-
def subscribe(self, subscription: str, callback):
69+
def subscribe(self, subscription: str, callback, max_receivable_messages=-1):
7070

7171
"""
7272
Subscribes to a subscription of the topic and processes incoming messages.
@@ -75,17 +75,24 @@ def subscribe(self, subscription: str, callback):
7575
callback (function): The callback function to invoke for each message.
7676
"""
7777
self.receiver = self.client.get_subscription_receiver(topic_name=self.topic_name, subscription_name=subscription)
78+
self.receiver.local_received_messages = 0
7879
while True:
7980
try:
8081
to_receive = (self.max_concurrent_messages - self.internal_count)
82+
total_messages_to_recieve_more = max_receivable_messages - self.receiver.local_received_messages
83+
if max_receivable_messages > 0:
84+
to_receive = min(to_receive, total_messages_to_recieve_more)
8185
if to_receive > 0:
8286
messages = self.receiver.receive_messages(max_message_count=to_receive, max_wait_time=self.wait_time_for_message)
8387
if not messages or len(messages) == 0:
8488
continue
89+
self.receiver.local_received_messages += len(messages)
8590
for message in messages:
8691
self.lock_renewal.register(self.receiver, message, max_lock_renewal_duration=self.max_renewal_duration)
8792
execution_task = self.executor.submit(self.internal_callback, message, callback)
8893
execution_task.add_done_callback(lambda x: self.settle_message(x))
94+
if self.receiver.local_received_messages >= max_receivable_messages and max_receivable_messages > 0: # Break if the messages are more than max_receivable_messages
95+
break
8996
else:
9097
time.sleep(self.wait_time_for_message)
9198
except Exception as e:

src/python_ms_core/core/topic/local_topic.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def __init__(self, config=None, topic_name=None):
3232
self.channel.exchange_declare(exchange=self.topic, exchange_type='fanout')
3333

3434
@ExceptionHandler.decorated
35-
def subscribe(self, subscription=None, callback=None):
35+
def subscribe(self, subscription=None, callback=None, max_receivable_messages=-1):
3636

3737
if subscription is not None:
3838
if self.connection.is_open:

src/python_ms_core/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '0.0.23'
1+
__version__ = '0.0.24'

0 commit comments

Comments
 (0)