Skip to content

Commit 3359047

Browse files
committed
Rewrote the azure topic for better control
- Rewrote the azure topic for better control and ease of operation
1 parent cd9409b commit 3359047

3 files changed

Lines changed: 165 additions & 8 deletions

File tree

src/example.py

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,28 @@
1313

1414
core = Core()
1515
print(f'Core version: {Core.__version__}')
16-
topic = 'temp-request'
16+
topic = 'temp-nar'
1717
subscription = 'temp'
1818
some_other_sub = 'temp'
19+
respond_topic = 'temp-response'
20+
# respond_topic_object = core.get_topic(topic_name=respond_topic)
21+
22+
23+
class MessageSender:
24+
def __init__(self, topic_name):
25+
self.topic = core.get_topic(topic_name=topic_name)
26+
27+
def send_message(self, message):
28+
self.topic.publish(data=QueueMessage.data_from(message))
29+
30+
1931

2032

2133
def publish_messages(topic_name):
2234
topic_object = core.get_topic(topic_name=topic_name)
2335
queue_message = QueueMessage.data_from({
2436
'message': str(uuid.uuid4().hex),
25-
'data': {'a': random.randint(60, 120)}
37+
'data': {'a': random.randint(10, 25)}
2638
})
2739
topic_object.publish(data=queue_message)
2840
print('Message Published')
@@ -31,24 +43,46 @@ def publish_messages(topic_name):
3143
def long_running_task(sleep_time):
3244
# Simulate a long-running task
3345
time.sleep(sleep_time)
46+
sender_obj = MessageSender(topic_name=respond_topic)
47+
sender_obj.send_message({'message': 'Task Completed'})
48+
print(f' > Task Completed: {sleep_time}')
3449

3550

3651
def subscribe(topic_name, subscription_name):
3752
def process(message):
3853
print(f'Message Received: {message.data}')
39-
long_running_thread = threading.Thread(target=long_running_task, args=(message.data['a'],))
40-
long_running_thread.start()
41-
long_running_thread.join()
54+
# long_running_thread = threading.Thread(target=long_running_task, args=(message.data['a'],))
55+
# long_running_thread.start()
56+
# long_running_thread.join()
57+
long_running_task(message.data['a'])
4258
print(f' > Message Completed: {message.data}')
4359

44-
topic_object = core.get_topic(topic_name=topic_name)
60+
topic_object = core.get_topic(topic_name=topic_name,max_concurrent_messages=2)
4561
try:
4662
topic_object.subscribe(subscription=subscription_name, callback=process)
4763
except Exception as e:
4864
print(e)
4965

5066

51-
subscribe(topic, subscription)
67+
68+
import argparse
69+
70+
parser = argparse.ArgumentParser(description='Process the core topic.')
71+
parser.add_argument('-m','--mode', type=str, help='Mode of operation (publish/subscribe)', required=True)
72+
parser.add_argument('-s','--size', type=int, help='Number of the messages to send', required=False)
73+
74+
mode = parser.parse_args().mode
75+
size = parser.parse_args().size
76+
if mode == 'publish':
77+
for x in range(size):
78+
print(f'Publishing message {x+1}')
79+
publish_messages(topic_name=topic)
80+
if mode == 'subscribe':
81+
subscribe(topic, subscription)
82+
# subscribe(topic, subscription)
83+
# for x in range(10):
84+
# publish_messages(topic_name=topic)
85+
# subscribe(topic, subscription)
5286
# for x in range(10):
5387
# publish_messages(topic_name=topic)
5488

src/python_ms_core/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from .core.logger.logger import Logger
44
from .core.logger.local_logger import LocalLogger
55
from .core.topic.topic import Topic
6+
from .core.topic.azure_topic import AzureTopic
67
from .core.topic.local_topic import LocalTopic
78
from .core.storage.providers.azure.azure_storage_client import AzureStorageClient
89
from .core.storage.providers.local.local_storage_client import LocalStorageClient
@@ -49,7 +50,7 @@ def get_topic(self, topic_name: str, max_concurrent_messages=os.cpu_count()):
4950
if topic_config.provider.upper() == LOCAL_ENV:
5051
return LocalTopic(config=topic_config, topic_name=topic_name)
5152
elif topic_config.provider.upper() == AZURE_ENV:
52-
return Topic(config=topic_config, topic_name=topic_name, max_concurrent_messages=max_concurrent_messages)
53+
return AzureTopic(config=topic_config, topic_name=topic_name, max_concurrent_messages=max_concurrent_messages)
5354
else:
5455
logging.error(f'Failed to initialize core.get_topic for provider: {topic_config.provider}')
5556

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import json
2+
3+
import logging
4+
import time
5+
from ..config.config import TopicConfig
6+
from ..resource_errors import ExceptionHandler
7+
from concurrent.futures import ThreadPoolExecutor
8+
from .abstract.topic_abstract import TopicAbstract
9+
from ..queue.models.queue_message import QueueMessage
10+
from azure.servicebus import ServiceBusClient, ServiceBusMessage
11+
from azure.servicebus import AutoLockRenewer
12+
import concurrent.futures as cf
13+
14+
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
15+
logger = logging.getLogger('AzureTopic')
16+
logger.setLevel(logging.INFO)
17+
18+
19+
"""
20+
AzureTopic class represents a topic in Azure Service Bus.
21+
Attributes:
22+
topic (str): The name of the topic.
23+
client (ServiceBusClient): The ServiceBusClient object used to interact with the Service Bus.
24+
max_concurrent_messages (int): The maximum number of concurrent messages to process.
25+
topic_name (str): The name of the topic.
26+
publisher (TopicSender): The TopicSender object used to send messages to the topic.
27+
executor (ThreadPoolExecutor): The ThreadPoolExecutor object used to execute callback functions.
28+
internal_count (int): The internal count of concurrent messages being processed.
29+
lock_renewal (AutoLockRenewer): The AutoLockRenewer object used to renew message locks.
30+
max_renewal_duration (int): The maximum duration in seconds to renew a message lock.
31+
wait_time_for_message (int): The maximum wait time in seconds to receive messages.
32+
Methods:
33+
publish(data: QueueMessage) -> None:
34+
Publishes a message to the topic.
35+
subscribe(subscription: str, callback) -> None:
36+
Subscribes to a subscription of the topic and processes incoming messages.
37+
internal_callback(message, callbackfn) -> ServiceBusMessage:
38+
Internal callback function that processes a message and invokes the callback function.
39+
settle_message(x: cf.Future) -> None:
40+
Sets the message as completed and updates the internal count.
41+
"""
42+
class AzureTopic(TopicAbstract):
43+
def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_messages:int=1):
44+
self.topic = topic_name
45+
self.client = ServiceBusClient.from_connection_string(conn_str=config.connection_string, retry_total=10, retry_backoff_factor=1, retry_backoff_max=30)
46+
self.max_concurrent_messages = max_concurrent_messages
47+
self.topic_name = topic_name
48+
self.publisher = self.client.get_topic_sender(topic_name=topic_name)
49+
self.executor = ThreadPoolExecutor(max_workers=max_concurrent_messages)
50+
self.internal_count = 0
51+
self.lock_renewal = AutoLockRenewer(max_workers=4)
52+
self.max_renewal_duration = 86400 # Renew the message upto 1 day
53+
self.wait_time_for_message = 5
54+
55+
56+
def publish(self, data: QueueMessage):
57+
"""
58+
Publishes a message to the topic.
59+
Args:
60+
data (QueueMessage): The message to publish.
61+
62+
"""
63+
message = QueueMessage.to_dict(data)
64+
self.publisher.send_messages(ServiceBusMessage(json.dumps(message)))
65+
66+
def subscribe(self, subscription: str, callback):
67+
68+
"""
69+
Subscribes to a subscription of the topic and processes incoming messages.
70+
Args:
71+
subscription (str): The name of the subscription to subscribe to.
72+
callback (function): The callback function to invoke for each message.
73+
"""
74+
self.receiver = self.client.get_subscription_receiver(topic_name=self.topic_name, subscription_name=subscription)
75+
while True:
76+
try:
77+
to_receive = (self.max_concurrent_messages - self.internal_count)
78+
if to_receive > 0:
79+
messages = self.receiver.receive_messages(max_message_count=to_receive, max_wait_time=self.wait_time_for_message)
80+
if not messages or len(messages) == 0:
81+
continue
82+
for message in messages:
83+
self.lock_renewal.register(self.receiver, message, max_lock_renewal_duration=self.max_renewal_duration)
84+
execution_task = self.executor.submit(self.internal_callback, message, callback)
85+
execution_task.add_done_callback(lambda x: self.settle_message(x))
86+
else:
87+
time.sleep(self.wait_time_for_message)
88+
except Exception as e:
89+
logger.error(f'Error in receiving messages: {e}')
90+
91+
92+
def internal_callback(self, message, callbackfn):
93+
"""
94+
Internal callback function that processes a message and invokes the callback function.
95+
Args:
96+
message (ServiceBusMessage): The message to process.
97+
callbackfn (function): The callback function to invoke.
98+
Returns:
99+
ServiceBusMessage: The processed message.
100+
"""
101+
try:
102+
self.internal_count += 1
103+
queue_message = QueueMessage.data_from(str(message))
104+
callbackfn(queue_message)
105+
except Exception as e:
106+
logger.error(f'Error in processing message: {e}')
107+
finally:
108+
return message
109+
110+
111+
def settle_message(self, x: cf.Future):
112+
"""
113+
Sets the message as completed and updates the internal count.
114+
Args:
115+
x (cf.Future): The future object representing the message processing.
116+
"""
117+
self.internal_count -= 1
118+
incoming_message = x.result()
119+
self.receiver.complete_message(incoming_message)
120+
return
121+
122+

0 commit comments

Comments
 (0)