Skip to content

Commit 9c812a5

Browse files
committed
Introduces ThreadPoolExecutor and updated unit test cases
1 parent 16753dc commit 9c812a5

2 files changed

Lines changed: 34 additions & 29 deletions

File tree

src/python_ms_core/core/topic/topic.py

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import json
22
import logging
3-
import threading
43
import time
54
from .config.topic_config import Config
6-
from .abstract.topic_abstract import TopicAbstract
75
from ..resource_errors import ExceptionHandler
6+
from concurrent.futures import ThreadPoolExecutor
7+
from .abstract.topic_abstract import TopicAbstract
88
from ..queue.models.queue_message import QueueMessage
99

1010
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
@@ -18,6 +18,7 @@ def __init__(self, fn=None, max_concurrent_messages=1):
1818
self._max_concurrent_messages = max_concurrent_messages
1919
self._renewal_interval = 30 # seconds
2020
self.message_processing = 0
21+
self.executor = ThreadPoolExecutor(max_workers=max_concurrent_messages)
2122

2223
def _renew_message_lock(self, message, receiver):
2324
while True:
@@ -31,12 +32,9 @@ def _renew_message_lock(self, message, receiver):
3132
# Sends data to the callback function
3233
def process_message(self, message, receiver):
3334
queue_message = QueueMessage.data_from(str(message))
35+
secondary_executor = ThreadPoolExecutor(max_workers=1)
3436
if not message._lock_expired:
35-
lock_renewal_thread = threading.Thread(
36-
target=self._renew_message_lock,
37-
args=(message, receiver)
38-
)
39-
lock_renewal_thread.start()
37+
secondary_executor.submit(self._renew_message_lock, message, receiver)
4038
self._function_to_call(queue_message)
4139

4240
# Starts listening to the messages
@@ -61,9 +59,7 @@ def start_listening(self, provider, topic, subscription):
6159
continue
6260
for message in messages:
6361
self.message_processing += 1
64-
threading.Thread(
65-
target=self._process_message_in_thread,
66-
args=(message, topic_receiver,)).start()
62+
self.executor.submit(self._process_message_in_thread, message, topic_receiver)
6763
except Exception as et:
6864
logger.error(f'Error in service bus connection: {et}')
6965
else:
@@ -94,9 +90,7 @@ def __init__(self, config=None, topic_name=None, max_concurrent_messages=1):
9490
def subscribe(self, subscription=None, callback=None):
9591
if subscription is not None:
9692
cb = Callback(callback, max_concurrent_messages=self.max_concurrent_messages)
97-
thread = threading.Thread(target=cb.start_listening, args=(self.provider, self.topic, subscription))
98-
thread.start()
99-
time.sleep(5)
93+
cb.start_listening(self.provider, self.topic, subscription)
10094
else:
10195
logging.error(
10296
f'Unimplemented initialize for core {self.provider.provider}, Subscription name is required!')

tests/unit_tests/test_topic/test_topic.py

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,33 @@ def test_subscribe_with_subscription(self):
1919
mock_callback = MagicMock()
2020
mock_provider = MagicMock()
2121
mock_client = MagicMock()
22-
mock_thread = MagicMock()
23-
24-
with patch('threading.Thread', return_value=mock_thread) as mock_thread_class:
25-
topic = Topic(config=self.mock_config, topic_name='mock_topic')
26-
topic.provider = mock_provider
27-
mock_provider.client = mock_client
28-
29-
with patch.object(Callback, 'start_listening', return_value=None) as mock_start_listening:
30-
topic.subscribe(subscription='mock_subscription', callback=mock_callback)
31-
32-
# Assertions on the objects
33-
mock_thread.start.assert_called_once()
34-
mock_thread_class.assert_called_once_with(
35-
target=mock_start_listening,
36-
args=(mock_provider, 'mock_topic', 'mock_subscription')
37-
)
22+
23+
topic = Topic(config=self.mock_config, topic_name='mock_topic')
24+
topic.provider = mock_provider
25+
mock_provider.client = mock_client
26+
27+
with patch.object(Callback, 'start_listening', return_value=None) as mock_start_listening:
28+
topic.subscribe(subscription='mock_subscription', callback=mock_callback)
29+
30+
# Assertions on the objects
31+
mock_start_listening.assert_called_once_with(mock_provider, 'mock_topic', 'mock_subscription')
32+
33+
def test_subscribe_without_subscription(self):
34+
mock_callback = MagicMock()
35+
mock_provider = MagicMock()
36+
mock_client = MagicMock()
37+
38+
topic = Topic(config=self.mock_config, topic_name='mock_topic')
39+
topic.provider = mock_provider
40+
mock_provider.client = mock_client
41+
42+
with patch('logging.error') as mock_logging_error:
43+
topic.subscribe(subscription=None, callback=mock_callback)
44+
45+
# Assertions
46+
mock_logging_error.assert_called_once_with(
47+
f'Unimplemented initialize for core {mock_provider.provider}, Subscription name is required!'
48+
)
3849

3950
def test_publish(self):
4051
data = {'test': random.randint(0, 1000)}

0 commit comments

Comments
 (0)