Skip to content

Commit 4eef2ad

Browse files
authored
Merge pull request #38 from TaskarCenterAtUW/feature-azure-topic-rewrite
Rewrote the azure topic for better control
2 parents cd9409b + ad39c23 commit 4eef2ad

4 files changed

Lines changed: 206 additions & 9 deletions

File tree

README.md

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,33 @@ def process(message):
150150
print(f'Message Received: {message}')
151151

152152
try:
153-
topic_object.subscribe(subscription='subscriptionName')
153+
topic_object.subscribe(subscription='subscriptionName',process)
154154
except Exception as e:
155155
print(e)
156156
```
157157

158+
- Please note that `subscribe` is a blocking call which runs on a loop. Either use a `thread.Thread` to subscribe or an async method to continue with the other parts of the program
159+
160+
eg.
161+
```python
162+
163+
from python_ms_core import Core
164+
import threading
165+
166+
core = Core()
167+
topic_object = topic = Core.get_topic(topic_name='topicName')
168+
169+
def process(message):
170+
print(f'Message Received: {message}')
171+
172+
try:
173+
thread = threading.Thread(topic_object.subscribe, ['subscriptionName',process])
174+
thread.start()
175+
except Exception as e:
176+
print(e)
177+
178+
```
179+
158180
### Storage
159181
For all the azure blobs and other storages, storage components will offer simple ways to upload/download and read the existing data.
160182
```python

src/example.py

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,42 +13,83 @@
1313

1414
core = Core()
1515
print(f'Core version: {Core.__version__}')
16-
topic = 'temp-request'
16+
topic = 'temp-nar'
1717
subscription = 'temp'
1818
some_other_sub = 'temp'
19+
respond_topic = 'temp-response'
20+
# respond_topic_object = core.get_topic(topic_name=respond_topic)
21+
22+
23+
class MessageSender:
24+
def __init__(self, topic_name):
25+
self.topic = core.get_topic(topic_name=topic_name)
26+
27+
def send_message(self, message):
28+
self.topic.publish(data=QueueMessage.data_from(message))
29+
30+
1931

2032

2133
def publish_messages(topic_name):
2234
topic_object = core.get_topic(topic_name=topic_name)
2335
queue_message = QueueMessage.data_from({
2436
'message': str(uuid.uuid4().hex),
25-
'data': {'a': random.randint(60, 120)}
37+
'data': {'a': random.randint(10, 25)}
2638
})
2739
topic_object.publish(data=queue_message)
2840
print('Message Published')
2941

3042

43+
random_exec = 3
44+
exec_count = 0
3145
def long_running_task(sleep_time):
3246
# Simulate a long-running task
47+
global exec_count
48+
exec_count += 1
49+
if exec_count % random_exec == 0:
50+
# throw an exception
51+
raise Exception('Random Exception')
3352
time.sleep(sleep_time)
53+
sender_obj = MessageSender(topic_name=respond_topic)
54+
sender_obj.send_message({'message': 'Task Completed'})
55+
print(f' > Task Completed: {sleep_time}')
3456

3557

3658
def subscribe(topic_name, subscription_name):
3759
def process(message):
3860
print(f'Message Received: {message.data}')
39-
long_running_thread = threading.Thread(target=long_running_task, args=(message.data['a'],))
40-
long_running_thread.start()
41-
long_running_thread.join()
61+
# long_running_thread = threading.Thread(target=long_running_task, args=(message.data['a'],))
62+
# long_running_thread.start()
63+
# long_running_thread.join()
64+
long_running_task(message.data['a'])
4265
print(f' > Message Completed: {message.data}')
4366

44-
topic_object = core.get_topic(topic_name=topic_name)
67+
topic_object = core.get_topic(topic_name=topic_name,max_concurrent_messages=2)
4568
try:
4669
topic_object.subscribe(subscription=subscription_name, callback=process)
4770
except Exception as e:
4871
print(e)
4972

5073

51-
subscribe(topic, subscription)
74+
75+
import argparse
76+
77+
parser = argparse.ArgumentParser(description='Process the core topic.')
78+
parser.add_argument('-m','--mode', type=str, help='Mode of operation (publish/subscribe)', required=True)
79+
parser.add_argument('-s','--size', type=int, help='Number of the messages to send', required=False)
80+
81+
mode = parser.parse_args().mode
82+
size = parser.parse_args().size
83+
if mode == 'publish':
84+
for x in range(size):
85+
print(f'Publishing message {x+1}')
86+
publish_messages(topic_name=topic)
87+
if mode == 'subscribe':
88+
subscribe(topic, subscription)
89+
# subscribe(topic, subscription)
90+
# for x in range(10):
91+
# publish_messages(topic_name=topic)
92+
# subscribe(topic, subscription)
5293
# for x in range(10):
5394
# publish_messages(topic_name=topic)
5495

src/python_ms_core/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from .core.logger.logger import Logger
44
from .core.logger.local_logger import LocalLogger
55
from .core.topic.topic import Topic
6+
from .core.topic.azure_topic import AzureTopic
67
from .core.topic.local_topic import LocalTopic
78
from .core.storage.providers.azure.azure_storage_client import AzureStorageClient
89
from .core.storage.providers.local.local_storage_client import LocalStorageClient
@@ -49,7 +50,7 @@ def get_topic(self, topic_name: str, max_concurrent_messages=os.cpu_count()):
4950
if topic_config.provider.upper() == LOCAL_ENV:
5051
return LocalTopic(config=topic_config, topic_name=topic_name)
5152
elif topic_config.provider.upper() == AZURE_ENV:
52-
return Topic(config=topic_config, topic_name=topic_name, max_concurrent_messages=max_concurrent_messages)
53+
return AzureTopic(config=topic_config, topic_name=topic_name, max_concurrent_messages=max_concurrent_messages)
5354
else:
5455
logging.error(f'Failed to initialize core.get_topic for provider: {topic_config.provider}')
5556

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import json
2+
3+
import logging
4+
import time
5+
from ..config.config import TopicConfig
6+
from ..resource_errors import ExceptionHandler
7+
from concurrent.futures import ThreadPoolExecutor
8+
from .abstract.topic_abstract import TopicAbstract
9+
from ..queue.models.queue_message import QueueMessage
10+
from azure.servicebus import ServiceBusClient, ServiceBusMessage
11+
from azure.servicebus import AutoLockRenewer
12+
import concurrent.futures as cf
13+
import threading
14+
15+
16+
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
17+
logger = logging.getLogger('AzureTopic')
18+
logger.setLevel(logging.INFO)
19+
20+
21+
"""
22+
AzureTopic class represents a topic in Azure Service Bus.
23+
Attributes:
24+
topic (str): The name of the topic.
25+
client (ServiceBusClient): The ServiceBusClient object used to interact with the Service Bus.
26+
max_concurrent_messages (int): The maximum number of concurrent messages to process.
27+
topic_name (str): The name of the topic.
28+
publisher (TopicSender): The TopicSender object used to send messages to the topic.
29+
executor (ThreadPoolExecutor): The ThreadPoolExecutor object used to execute callback functions.
30+
internal_count (int): The internal count of concurrent messages being processed.
31+
lock_renewal (AutoLockRenewer): The AutoLockRenewer object used to renew message locks.
32+
max_renewal_duration (int): The maximum duration in seconds to renew a message lock.
33+
wait_time_for_message (int): The maximum wait time in seconds to receive messages.
34+
Methods:
35+
publish(data: QueueMessage) -> None:
36+
Publishes a message to the topic.
37+
subscribe(subscription: str, callback) -> None:
38+
Subscribes to a subscription of the topic and processes incoming messages.
39+
internal_callback(message, callbackfn) -> ServiceBusMessage:
40+
Internal callback function that processes a message and invokes the callback function.
41+
settle_message(x: cf.Future) -> None:
42+
Sets the message as completed and updates the internal count.
43+
"""
44+
class AzureTopic(TopicAbstract):
45+
def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_messages:int=1):
46+
self.topic = topic_name
47+
self.client = ServiceBusClient.from_connection_string(conn_str=config.connection_string, retry_total=10, retry_backoff_factor=1, retry_backoff_max=30)
48+
self.max_concurrent_messages = max_concurrent_messages
49+
self.topic_name = topic_name
50+
self.publisher = self.client.get_topic_sender(topic_name=topic_name)
51+
self.executor = ThreadPoolExecutor(max_workers=max_concurrent_messages)
52+
self.internal_count = 0
53+
self.lock_renewal = AutoLockRenewer(max_workers=max_concurrent_messages)
54+
self.max_renewal_duration = 86400 # Renew the message upto 1 day
55+
self.wait_time_for_message = 5
56+
self.thread_lock = threading.Lock()
57+
58+
59+
def publish(self, data: QueueMessage):
60+
"""
61+
Publishes a message to the topic.
62+
Args:
63+
data (QueueMessage): The message to publish.
64+
65+
"""
66+
message = QueueMessage.to_dict(data)
67+
self.publisher.send_messages(ServiceBusMessage(json.dumps(message)))
68+
69+
def subscribe(self, subscription: str, callback):
70+
71+
"""
72+
Subscribes to a subscription of the topic and processes incoming messages.
73+
Args:
74+
subscription (str): The name of the subscription to subscribe to.
75+
callback (function): The callback function to invoke for each message.
76+
"""
77+
self.receiver = self.client.get_subscription_receiver(topic_name=self.topic_name, subscription_name=subscription)
78+
while True:
79+
try:
80+
to_receive = (self.max_concurrent_messages - self.internal_count)
81+
if to_receive > 0:
82+
messages = self.receiver.receive_messages(max_message_count=to_receive, max_wait_time=self.wait_time_for_message)
83+
if not messages or len(messages) == 0:
84+
continue
85+
for message in messages:
86+
self.lock_renewal.register(self.receiver, message, max_lock_renewal_duration=self.max_renewal_duration)
87+
execution_task = self.executor.submit(self.internal_callback, message, callback)
88+
execution_task.add_done_callback(lambda x: self.settle_message(x))
89+
else:
90+
time.sleep(self.wait_time_for_message)
91+
except Exception as e:
92+
logger.error(f'Error in receiving messages: {e}')
93+
94+
95+
def internal_callback(self, message, callbackfn):
96+
"""
97+
Internal callback function that processes a message and invokes the callback function.
98+
Args:
99+
message (ServiceBusMessage): The message to process.
100+
callbackfn (function): The callback function to invoke.
101+
Returns:
102+
ServiceBusMessage: The processed message.
103+
"""
104+
try:
105+
with self.thread_lock:
106+
self.internal_count += 1 # thread safe.
107+
queue_message = QueueMessage.data_from(str(message))
108+
callbackfn(queue_message)
109+
return [True,message]
110+
except Exception as e:
111+
logger.error(f'Error in processing message: {e}')
112+
return [False,message]
113+
114+
115+
def settle_message(self, x: cf.Future):
116+
"""
117+
Sets the message as completed and updates the internal count.
118+
Args:
119+
x (cf.Future): The future object representing the message processing.
120+
"""
121+
# Lock the internal count
122+
with self.thread_lock:
123+
self.internal_count -= 1
124+
# Check if the future has an exception
125+
[is_success,incoming_message] = x.result()
126+
if is_success:
127+
self.receiver.complete_message(incoming_message)
128+
else:
129+
print(f'Abandoning message: {incoming_message}')
130+
self.receiver.abandon_message(incoming_message) # send back to the topic
131+
return
132+
133+

0 commit comments

Comments
 (0)