Skip to content

Commit 30e447b

Browse files
committed
Feature fix lock issue
1 parent 779729a commit 30e447b

6 files changed

Lines changed: 152 additions & 25 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Change log
22

3+
# Version 0.0.25
4+
### Fixes
5+
- **Azure Topic Settlement Stability:** Moved Azure Service Bus message settlement back onto the receiver-owning loop instead of settling from worker callback threads. This keeps receive and complete/abandon operations on the same receiver flow for long-running jobs.
6+
- **Receiver Slot Tracking:** Reserved and released in-flight message slots on the receive loop so concurrency limits remain accurate while messages are still processing.
7+
- **Lock Renewal Diagnostics:** Added logging for Service Bus lock-renew failures to make long-running lock-loss issues visible before final settlement.
8+
39
# Version 0.0.23
410
- Updated unit test cases pipeline
511
- Added support to upload test cases results on Azure blob
@@ -109,4 +115,4 @@ Reference task:
109115
- Added classes and methods
110116
- Topic
111117
- publish
112-
- subscribe
118+
- subscribe

MANIFEST.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
include requirements.txt

src/python_ms_core/core/topic/azure_topic.py

Lines changed: 72 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes
5454
self.max_renewal_duration = 86400 # Renew the message upto 1 day
5555
self.wait_time_for_message = 5
5656
self.thread_lock = threading.Lock()
57+
self.pending_tasks = []
5758

5859

5960
def publish(self, data: QueueMessage):
@@ -78,23 +79,36 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1):
7879
self.receiver.local_received_messages = 0
7980
while True:
8081
try:
81-
to_receive = (self.max_concurrent_messages - self.internal_count)
82-
total_messages_to_recieve_more = max_receivable_messages - self.receiver.local_received_messages
83-
if max_receivable_messages > 0:
84-
to_receive = min(to_receive, total_messages_to_recieve_more)
82+
self._settle_completed_tasks()
83+
to_receive = self._get_receivable_count(max_receivable_messages=max_receivable_messages)
84+
if max_receivable_messages > 0 and self.receiver.local_received_messages >= max_receivable_messages:
85+
if len(self.pending_tasks) == 0:
86+
break
87+
self._wait_for_pending_tasks(timeout=0.5)
88+
continue
8589
if to_receive > 0:
8690
messages = self.receiver.receive_messages(max_message_count=to_receive, max_wait_time=self.wait_time_for_message)
8791
if not messages or len(messages) == 0:
92+
if len(self.pending_tasks) > 0:
93+
self._wait_for_pending_tasks(timeout=0.5)
8894
continue
8995
self.receiver.local_received_messages += len(messages)
96+
with self.thread_lock:
97+
self.internal_count += len(messages)
9098
for message in messages:
91-
self.lock_renewal.register(self.receiver, message, max_lock_renewal_duration=self.max_renewal_duration)
99+
self.lock_renewal.register(
100+
self.receiver,
101+
message,
102+
max_lock_renewal_duration=self.max_renewal_duration,
103+
on_lock_renew_failure=self._handle_lock_renew_failure,
104+
)
92105
execution_task = self.executor.submit(self.internal_callback, message, callback)
93-
execution_task.add_done_callback(lambda x: self.settle_message(x))
94-
if self.receiver.local_received_messages >= max_receivable_messages and max_receivable_messages > 0: # Break if the messages are more than max_receivable_messages
95-
break
106+
self.pending_tasks.append((execution_task, message))
96107
else:
97-
time.sleep(self.wait_time_for_message)
108+
if len(self.pending_tasks) > 0:
109+
self._wait_for_pending_tasks(timeout=0.5)
110+
else:
111+
time.sleep(self.wait_time_for_message)
98112
except Exception as e:
99113
logger.error(f'Error in receiving messages: {e}')
100114

@@ -109,8 +123,6 @@ def internal_callback(self, message, callbackfn):
109123
ServiceBusMessage: The processed message.
110124
"""
111125
try:
112-
with self.thread_lock:
113-
self.internal_count += 1 # thread safe.
114126
queue_message = QueueMessage.data_from(str(message))
115127
callbackfn(queue_message)
116128
return [True,message]
@@ -120,21 +132,58 @@ def internal_callback(self, message, callbackfn):
120132

121133

122134
def settle_message(self, x: cf.Future):
135+
return self._settle_task(x)
136+
137+
def _get_receivable_count(self, max_receivable_messages=-1):
138+
with self.thread_lock:
139+
available_slots = self.max_concurrent_messages - self.internal_count
140+
if max_receivable_messages > 0:
141+
remaining_messages = max_receivable_messages - self.receiver.local_received_messages
142+
available_slots = min(available_slots, remaining_messages)
143+
return max(available_slots, 0)
144+
145+
def _wait_for_pending_tasks(self, timeout=0.5):
146+
if len(self.pending_tasks) == 0:
147+
return
148+
futures = [future for future, _ in self.pending_tasks]
149+
cf.wait(futures, timeout=timeout, return_when=cf.FIRST_COMPLETED)
150+
self._settle_completed_tasks()
151+
152+
def _settle_completed_tasks(self):
153+
remaining_tasks = []
154+
for future, incoming_message in self.pending_tasks:
155+
if future.done():
156+
self._settle_task(future, incoming_message=incoming_message)
157+
else:
158+
remaining_tasks.append((future, incoming_message))
159+
self.pending_tasks = remaining_tasks
160+
161+
def _settle_task(self, x: cf.Future, incoming_message=None):
123162
"""
124163
Sets the message as completed and updates the internal count.
125164
Args:
126165
x (cf.Future): The future object representing the message processing.
127166
"""
128-
# Lock the internal count
129-
with self.thread_lock:
130-
self.internal_count -= 1
131-
# Check if the future has an exception
132-
[is_success,incoming_message] = x.result()
133-
if is_success:
134-
self.receiver.complete_message(incoming_message)
135-
else:
136-
print(f'Abandoning message: {incoming_message}')
137-
self.receiver.abandon_message(incoming_message) # send back to the topic
138-
return
167+
try:
168+
[is_success, settled_message] = x.result()
169+
if settled_message is not None:
170+
incoming_message = settled_message
171+
if incoming_message is None:
172+
return
173+
if is_success:
174+
self.receiver.complete_message(incoming_message)
175+
else:
176+
logger.info(f'Abandoning message: {incoming_message}')
177+
self.receiver.abandon_message(incoming_message) # send back to the topic
178+
except Exception as e:
179+
logger.error(f'Error in settling message: {e}')
180+
finally:
181+
with self.thread_lock:
182+
self.internal_count = max(self.internal_count - 1, 0)
183+
return
184+
185+
def _handle_lock_renew_failure(self, renewable, error):
186+
message_id = getattr(renewable, 'message_id', None) or getattr(renewable, 'messageId', 'unknown')
187+
logger.error(f'Error renewing lock for message {message_id}: {error}')
139188

140-
189+

src/python_ms_core/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '0.0.24'
1+
__version__ = '0.0.25'
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import unittest
2+
import concurrent.futures as cf
3+
from unittest.mock import MagicMock, patch
4+
5+
from src.python_ms_core.core.topic.azure_topic import AzureTopic
6+
from src.python_ms_core.core.queue.models.queue_message import QueueMessage
7+
8+
9+
class TestAzureTopic(unittest.TestCase):
10+
11+
@patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer')
12+
@patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient')
13+
def test_subscribe_settles_completed_tasks_on_receiver_loop(self, mock_service_bus_client, mock_auto_lock_renewer):
14+
mock_client = MagicMock()
15+
mock_receiver = MagicMock()
16+
mock_message = MagicMock()
17+
mock_future = MagicMock()
18+
mock_config = MagicMock(connection_string='Endpoint=sb://test/')
19+
20+
mock_service_bus_client.from_connection_string.return_value = mock_client
21+
mock_client.get_topic_sender.return_value = MagicMock()
22+
mock_client.get_subscription_receiver.return_value = mock_receiver
23+
mock_receiver.receive_messages.side_effect = [[mock_message]]
24+
25+
topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1)
26+
callback = MagicMock()
27+
28+
def submit_side_effect(fn, *args, **kwargs):
29+
mock_future = cf.Future()
30+
mock_future.set_result(fn(*args, **kwargs))
31+
return mock_future
32+
33+
topic.executor.submit = MagicMock(side_effect=submit_side_effect)
34+
with patch.object(QueueMessage, 'data_from', return_value=QueueMessage()):
35+
topic.subscribe(subscription='mock-subscription', callback=callback, max_receivable_messages=1)
36+
37+
callback.assert_called_once()
38+
mock_auto_lock_renewer.return_value.register.assert_called_once()
39+
mock_receiver.complete_message.assert_called_once_with(mock_message)
40+
mock_receiver.abandon_message.assert_not_called()
41+
42+
@patch('src.python_ms_core.core.topic.azure_topic.logger')
43+
@patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer')
44+
@patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient')
45+
def test_settle_task_logs_error_and_releases_slot(self, mock_service_bus_client, mock_auto_lock_renewer, mock_logger):
46+
mock_client = MagicMock()
47+
mock_receiver = MagicMock()
48+
mock_message = MagicMock()
49+
mock_future = MagicMock()
50+
mock_config = MagicMock(connection_string='Endpoint=sb://test/')
51+
52+
mock_service_bus_client.from_connection_string.return_value = mock_client
53+
mock_client.get_topic_sender.return_value = MagicMock()
54+
mock_client.get_subscription_receiver.return_value = mock_receiver
55+
mock_future.result.return_value = [True, mock_message]
56+
mock_receiver.complete_message.side_effect = Exception('Mocked settlement failure')
57+
58+
topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1)
59+
topic.receiver = mock_receiver
60+
topic.internal_count = 1
61+
62+
topic._settle_task(mock_future, incoming_message=mock_message)
63+
64+
mock_receiver.complete_message.assert_called_once_with(mock_message)
65+
mock_logger.error.assert_called_once_with('Error in settling message: Mocked settlement failure')
66+
self.assertEqual(topic.internal_count, 0)
67+
68+
69+
if __name__ == '__main__':
70+
unittest.main()

version.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
version = '0.0.25'

0 commit comments

Comments
 (0)