Skip to content

Commit 8075bcc

Browse files
committed
# Version 0.0.21
### New Features and Enhancements - **Message Lock Renewal:** Implemented a mechanism to automatically renew message locks during processing. This ensures that messages remain active and are not returned to the queue for reprocessing while they are being handled. - **Concurrent Message Processing:** Enhanced the system to process messages concurrently using a number of worker threads equal to the number of available CPU cores by default. Users can override this default by specifying the `max_concurrent_messages` parameter, for example, `core.get_topic(topic_name=topic_name, max_concurrent_messages=10)`. This optimization leverages system resources for improved performance and throughput. - **Completion Acknowledgement:** Updated the processing flow to wait until message processing is fully completed before sending the acknowledgement of message completion. This change ensures reliable processing and accurate message handling. - **Version Tracking:** Introduced a `version.py` file to maintain and track the package version. This addition facilitates version control and package management. - **Unit Test Updates:** Updated unit test cases to cover the new features and enhancements, ensuring robust testing and quality assurance. - **Documentation Update:** Updated the README file to reflect the new features and enhancements, providing clearer guidance and information for users.
1 parent b9c307c commit 8075bcc

6 files changed

Lines changed: 144 additions & 66 deletions

File tree

CHANGELOG.md

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
# Change log
22

3-
# 0.0.21
4-
- Introduced renew message lock, which ensures that message is alive while processing and not returned to the queue to be reprocessed.
5-
- Added max concurrent messages login which can handle multiple requests at once
6-
- Added version.py file to maintain the package version
7-
- Updated readme
3+
# Version 0.0.21
4+
### New Features and Enhancements
5+
- **Message Lock Renewal:** Implemented a mechanism to automatically renew message locks during processing. This ensures that messages remain active and are not returned to the queue for reprocessing while they are being handled.
6+
- **Concurrent Message Processing:** Enhanced the system to process messages concurrently using a number of worker threads equal to the number of available CPU cores by default. Users can override this default by specifying the `max_concurrent_messages` parameter, for example, `core.get_topic(topic_name=topic_name, max_concurrent_messages=10)`. This optimization leverages system resources for improved performance and throughput.
7+
- **Completion Acknowledgement:** Updated the processing flow to wait until message processing is fully completed before sending the acknowledgement of message completion. This change ensures reliable processing and accurate message handling.
8+
- **Version Tracking:** Introduced a `version.py` file to maintain and track the package version. This addition facilitates version control and package management.
9+
- **Unit Test Updates:** Updated unit test cases to cover the new features and enhancements, ensuring robust testing and quality assurance.
10+
- **Documentation Update:** Updated the README file to reflect the new features and enhancements, providing clearer guidance and information for users.
11+
12+
813

914
# 0.0.18
1015
- Adds extra checks in service bus for retries

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,10 @@ Topic can be accessed by the core method `get_topic`. This method takes two para
112112
from python_ms_core import Core
113113

114114
core = Core()
115-
topic = core.get_topic(topic_name='topicName')
115+
topic = core.get_topic(topic_name='topicName') # By default, process messages concurrently which are available CPU cores
116+
topic = core.get_topic(topic_name='topicName', max_concurrent_messages=10) # Process 10 messages concurrently
117+
118+
116119

117120
```
118121

src/example.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,41 @@
11
# Testing code
2-
import sys
32
import os
43
import time
54
import uuid
65
import random
7-
import datetime
8-
from io import BytesIO, StringIO
6+
import threading
97

108
from python_ms_core import Core
119
from python_ms_core.core.queue.models.queue_message import QueueMessage
12-
from python_ms_core.core.auth.models.permission_request import PermissionRequest
13-
14-
1510

1611
core = Core()
1712
print(f'Core version: {Core.__version__}')
18-
topic = 'gtfs-pathways-upload'
19-
subscription = 'log'
20-
some_other_sub = 'usdufs'
13+
topic = 'temp-request'
14+
subscription = 'temp'
15+
some_other_sub = 'temp'
2116

2217

2318
def publish_messages(topic_name):
2419
topic_object = core.get_topic(topic_name=topic_name)
2520
queue_message = QueueMessage.data_from({
2621
'message': str(uuid.uuid4().hex),
27-
'data': {'a': random.randint(0, 1000)}
22+
'data': {'a': random.randint(60, 120)}
2823
})
2924
topic_object.publish(data=queue_message)
3025
print('Message Published')
3126

27+
def long_running_task(sleep_time):
28+
# Simulate a long-running task
29+
time.sleep(sleep_time)
30+
3231

3332
def subscribe(topic_name, subscription_name):
3433
def process(message):
35-
print(f'Message Received: {message}')
36-
# Spawn and thread process it -> 1 hr no issues
37-
# return
34+
print(f'Message Received: {message.data}')
35+
long_running_thread = threading.Thread(target=long_running_task, args=(message.data['a'],))
36+
long_running_thread.start()
37+
long_running_thread.join()
38+
print(f' > Message Completed: {message.data}')
3839

3940
topic_object = core.get_topic(topic_name=topic_name)
4041
try:
@@ -44,6 +45,8 @@ def process(message):
4445

4546

4647
# subscribe(topic, subscription)
48+
# for x in range(10):
49+
# publish_messages(topic_name=topic)
4750

4851
# azure_client = core.get_storage_client()
4952

src/python_ms_core/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
import logging
23
from .core.logger.logger import Logger
34
from .core.logger.local_logger import LocalLogger
@@ -43,7 +44,7 @@ def get_logger(self):
4344
else:
4445
logging.error(f'Failed to initialize core.get_logger for provider: {logger_config.provider}')
4546

46-
def get_topic(self, topic_name: str, max_concurrent_messages=1):
47+
def get_topic(self, topic_name: str, max_concurrent_messages=os.cpu_count()):
4748
topic_config = self.config.topic()
4849
if topic_config.provider.upper() == LOCAL_ENV:
4950
return LocalTopic(config=topic_config, topic_name=topic_name)

src/python_ms_core/core/topic/topic.py

Lines changed: 39 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,28 @@ class Callback:
1616
def __init__(self, fn=None, max_concurrent_messages=1):
1717
self._function_to_call = fn
1818
self._max_concurrent_messages = max_concurrent_messages
19-
self._stop_event = threading.Event()
20-
self._renewal_interval = 10 # seconds
19+
self._renewal_interval = 30 # seconds
20+
self.message_processing = 0
2121

2222
def _renew_message_lock(self, message, receiver):
23-
while not self._stop_event.is_set():
23+
while True:
2424
try:
2525
time.sleep(self._renewal_interval)
2626
if not message._lock_expired:
2727
receiver.renew_message_lock(message)
2828
except Exception as e:
2929
break
3030

31-
# old method to fetch messages. Not used anymore
32-
def messages(self, provider, topic, subscription):
33-
with provider.client:
34-
topic_receiver = provider.client.get_subscription_receiver(topic, subscription_name=subscription)
35-
logger.info(f'Started receiver for {subscription}')
36-
with topic_receiver:
37-
for message in topic_receiver:
38-
try:
39-
queue_message = QueueMessage.data_from(str(message))
40-
self._function_to_call(queue_message)
41-
except Exception as e:
42-
print(f'Error: {e}, Invalid message received: {message}')
43-
finally:
44-
topic_receiver.complete_message(message)
45-
logger.info('Completed topic receiver')
46-
4731
# Sends data to the callback function
4832
def process_message(self, message, receiver):
4933
queue_message = QueueMessage.data_from(str(message))
34+
if not message._lock_expired:
35+
lock_renewal_thread = threading.Thread(
36+
target=self._renew_message_lock,
37+
args=(message, receiver)
38+
)
39+
lock_renewal_thread.start()
5040
self._function_to_call(queue_message)
51-
receiver.complete_message(message)
5241

5342
# Starts listening to the messages
5443
def start_listening(self, provider, topic, subscription):
@@ -61,35 +50,39 @@ def start_listening(self, provider, topic, subscription):
6150
logger.info('Receiver started')
6251
with topic_receiver:
6352
while True:
64-
try:
65-
messages = topic_receiver.receive_messages(
66-
max_message_count=self._max_concurrent_messages,
67-
max_wait_time=5
68-
)
69-
if not messages:
70-
continue
71-
72-
for message in messages:
73-
stop_event = threading.Event()
74-
lock_renewal_thread = threading.Thread(
75-
target=self._renew_message_lock,
76-
args=(message, topic_receiver)
53+
available_slots = self._max_concurrent_messages - self.message_processing
54+
if available_slots > 0:
55+
try:
56+
messages = topic_receiver.receive_messages(
57+
max_message_count=available_slots,
58+
max_wait_time=5
7759
)
78-
lock_renewal_thread.start()
79-
try:
80-
self.process_message(message, topic_receiver)
81-
except Exception as e:
82-
logger.error(f'Error processing message: {message}. Error: {e}')
83-
topic_receiver.abandon_message(message)
84-
logger.info(f'Message {message.message_id} abandoned')
85-
finally:
86-
stop_event.set() # Signal the lock renewal thread to stop
87-
lock_renewal_thread.join()
88-
except Exception as et:
89-
logger.error(f'Error in service bus connection: {et}')
90-
# Change mode from PEEK_LOCK to RECEIVE_AND_DELETE
60+
if not messages:
61+
continue
62+
for message in messages:
63+
self.message_processing += 1
64+
threading.Thread(
65+
target=self._process_message_in_thread,
66+
args=(message, topic_receiver,)).start()
67+
except Exception as et:
68+
logger.error(f'Error in service bus connection: {et}')
69+
else:
70+
time.sleep(10) # Short sleep to prevent tight loop if no slots available
71+
9172
logger.info('Receiver stopped')
9273

74+
def _process_message_in_thread(self, message, topic_receiver):
75+
try:
76+
self.process_message(message=message, receiver=topic_receiver)
77+
except Exception as e:
78+
logger.error(f'Error: {e}, Invalid message received: {message}')
79+
finally:
80+
try:
81+
topic_receiver.complete_message(message) # Mark the message as complete
82+
except Exception as err:
83+
logger.error(f'Error completing the message: {err}')
84+
self.message_processing -= 1
85+
9386

9487
class Topic(TopicAbstract):
9588
def __init__(self, config=None, topic_name=None, max_concurrent_messages=1):

tests/unit_tests/test_topic/test_topic.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,78 @@ def test_publish(self):
6161
mock_sender.send_messages.assert_called_once()
6262

6363

64+
class TestCallback(unittest.TestCase):
65+
66+
def setUp(self):
67+
self.mock_function = MagicMock()
68+
self.callback = Callback(fn=self.mock_function, max_concurrent_messages=2)
69+
self.mock_message = MagicMock()
70+
self.mock_receiver = MagicMock()
71+
self.mock_provider = MagicMock()
72+
self.mock_topic_receiver = MagicMock()
73+
self.mock_provider.client.get_subscription_receiver.return_value = self.mock_topic_receiver
74+
self.mock_topic_receiver.receive_messages.return_value = [self.mock_message]
75+
76+
def test_renew_message_lock(self):
77+
self.mock_message._lock_expired = False
78+
self.callback._renewal_interval = 0 # For fast testing
79+
with patch.object(self.mock_receiver, 'renew_message_lock') as mock_renew_message_lock:
80+
with patch('time.sleep', side_effect=[None, Exception()]):
81+
self.callback._renew_message_lock(self.mock_message, self.mock_receiver)
82+
mock_renew_message_lock.assert_called_with(self.mock_message)
83+
84+
@patch('threading.Thread')
85+
def test_process_message_lock_expired(self, mock_thread_class):
86+
self.mock_message._lock_expired = True
87+
with patch.object(QueueMessage, 'data_from', return_value=QueueMessage()):
88+
self.callback.process_message(self.mock_message, self.mock_receiver)
89+
self.mock_function.assert_called_once()
90+
mock_thread_class.assert_not_called()
91+
92+
@patch('threading.Thread')
93+
def test_process_message(self, mock_thread_class):
94+
self.mock_message._lock_expired = False
95+
with patch.object(QueueMessage, 'data_from', return_value=QueueMessage()):
96+
self.callback.process_message(self.mock_message, self.mock_receiver)
97+
self.mock_function.assert_called_once()
98+
mock_thread_class.assert_called_once()
99+
100+
def test_process_message_in_thread_success(self):
101+
with patch.object(self.callback, 'process_message') as mock_process_message:
102+
self.callback.message_processing = 1
103+
self.callback._process_message_in_thread(self.mock_message, self.mock_receiver)
104+
mock_process_message.assert_called_once_with(message=self.mock_message, receiver=self.mock_receiver)
105+
self.mock_receiver.complete_message.assert_called_once_with(self.mock_message)
106+
self.assertEqual(self.callback.message_processing, 0)
107+
108+
def test_process_message_in_thread_process_message_error(self):
109+
with patch.object(self.callback, 'process_message', side_effect=Exception("Mocked Exception")):
110+
self.callback.message_processing = 1
111+
self.callback._process_message_in_thread(self.mock_message, self.mock_receiver)
112+
self.mock_receiver.complete_message.assert_called_once_with(self.mock_message)
113+
self.assertEqual(self.callback.message_processing, 0)
114+
115+
def test_process_message_in_thread_complete_message_error(self):
116+
with patch.object(self.callback, 'process_message'):
117+
self.mock_receiver.complete_message.side_effect = Exception("Mocked Exception")
118+
self.callback.message_processing = 1
119+
self.callback._process_message_in_thread(self.mock_message, self.mock_receiver)
120+
self.mock_receiver.complete_message.assert_called_once_with(self.mock_message)
121+
self.assertEqual(self.callback.message_processing, 0)
122+
123+
@patch('src.python_ms_core.core.topic.topic.logger')
124+
def test_process_message_in_thread_both_errors(self, mock_logger):
125+
with patch.object(self.callback, 'process_message', side_effect=Exception("Mocked Process Exception")):
126+
self.mock_receiver.complete_message.side_effect = Exception("Mocked Complete Exception")
127+
self.callback.message_processing = 1
128+
self.callback._process_message_in_thread(self.mock_message, self.mock_receiver)
129+
self.mock_receiver.complete_message.assert_called_once_with(self.mock_message)
130+
self.assertEqual(self.callback.message_processing, 0)
131+
self.assertEqual(mock_logger.error.call_count, 2)
132+
mock_logger.error.assert_any_call(
133+
f'Error: Mocked Process Exception, Invalid message received: {self.mock_message}')
134+
mock_logger.error.assert_any_call(f'Error completing the message: Mocked Complete Exception')
135+
136+
64137
if __name__ == '__main__':
65138
unittest.main()

0 commit comments

Comments
 (0)