Skip to content

Commit 7c4a4ad

Browse files
committed
Trying from fix lock issue
1 parent 30e447b commit 7c4a4ad

4 files changed

Lines changed: 282 additions & 8 deletions

File tree

src/python_ms_core/core/topic/azure_topic.py

Lines changed: 140 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import json
22

3+
import gc
34
import logging
5+
import os
46
import time
57
from ..config.config import TopicConfig
68
from ..resource_errors import ExceptionHandler
@@ -12,6 +14,11 @@
1214
import concurrent.futures as cf
1315
import threading
1416

17+
try:
18+
import psutil
19+
except ImportError: # pragma: no cover - dependency exists in the package, but keep logging resilient.
20+
psutil = None
21+
1522

1623
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
1724
logger = logging.getLogger('AzureTopic')
@@ -50,11 +57,37 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes
5057
self.publisher = self.client.get_topic_sender(topic_name=topic_name)
5158
self.executor = ThreadPoolExecutor(max_workers=max_concurrent_messages)
5259
self.internal_count = 0
53-
self.lock_renewal = AutoLockRenewer(max_workers=max_concurrent_messages)
54-
self.max_renewal_duration = 86400 # Renew the message upto 1 day
60+
self.max_renewal_duration = self._get_positive_int_from_env(
61+
'TOPIC_MAX_LOCK_RENEWAL_DURATION_SECONDS',
62+
86400,
63+
) # Renew the message upto 1 day
64+
self.lock_renewal_margin = self._get_positive_int_from_env(
65+
'TOPIC_LOCK_RENEWAL_MARGIN_SECONDS',
66+
60,
67+
)
68+
renewer_max_workers = max(max_concurrent_messages, 2)
69+
self.lock_renewal = AutoLockRenewer(
70+
max_lock_renewal_duration=self.max_renewal_duration,
71+
on_lock_renew_failure=self._handle_lock_renew_failure,
72+
max_workers=renewer_max_workers,
73+
)
74+
# The SDK default renews only in the last 10 seconds of the lock window.
75+
# Start earlier so long-running jobs have more headroom for scheduler jitter.
76+
self.lock_renewal._renew_period = min(self.lock_renewal_margin, self.max_renewal_duration)
77+
self.lock_renew_receiver = _LockRenewLoggingReceiver(self)
5578
self.wait_time_for_message = 5
5679
self.thread_lock = threading.Lock()
5780
self.pending_tasks = []
81+
self._process = None
82+
self._prime_runtime_samplers()
83+
logger.info(
84+
'Configured lock renewal for topic %s: max_lock_renewal_duration=%s seconds, '
85+
'renew_margin=%s seconds, renewer_max_workers=%s',
86+
self.topic_name,
87+
self.max_renewal_duration,
88+
self.lock_renewal_margin,
89+
renewer_max_workers,
90+
)
5891

5992

6093
def publish(self, data: QueueMessage):
@@ -97,7 +130,7 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1):
97130
self.internal_count += len(messages)
98131
for message in messages:
99132
self.lock_renewal.register(
100-
self.receiver,
133+
self.lock_renew_receiver,
101134
message,
102135
max_lock_renewal_duration=self.max_renewal_duration,
103136
on_lock_renew_failure=self._handle_lock_renew_failure,
@@ -170,6 +203,13 @@ def _settle_task(self, x: cf.Future, incoming_message=None):
170203
incoming_message = settled_message
171204
if incoming_message is None:
172205
return
206+
if getattr(incoming_message, '_lock_expired', False):
207+
logger.error(
208+
f'Skipping settlement for message {self._get_message_id(incoming_message)} '
209+
f'because the lock expired at {getattr(incoming_message, "locked_until_utc", None)}. '
210+
f'auto_renew_error={getattr(incoming_message, "auto_renew_error", None)}'
211+
)
212+
return
173213
if is_success:
174214
self.receiver.complete_message(incoming_message)
175215
else:
@@ -183,7 +223,101 @@ def _settle_task(self, x: cf.Future, incoming_message=None):
183223
return
184224

185225
def _handle_lock_renew_failure(self, renewable, error):
    """Log a lock-renewal failure together with a runtime snapshot.

    Args:
        renewable: Object whose lock could not be renewed. Its id is resolved
            through ``_get_message_id``; ``locked_until_utc`` and
            ``auto_renew_error`` are read with ``getattr`` and default to
            ``None`` when absent.
        error: Failure passed in by the caller. When it is falsy the logged
            reason falls back to ``renewable.auto_renew_error`` and then to a
            fixed explanatory text.
    """
    message_id = self._get_message_id(renewable)
    if error:
        failure_reason = error
    else:
        # No explicit error was supplied: use what the renewable recorded,
        # otherwise a generic explanation.
        failure_reason = (
            getattr(renewable, 'auto_renew_error', None)
            or 'lock expired before renewal could complete'
        )
    locked_until = getattr(renewable, 'locked_until_utc', None)
    snapshot = self._get_runtime_snapshot()
    logger.error(
        f'Error renewing lock for message {message_id}: {failure_reason}; '
        f'locked_until_utc={locked_until}; '
        f'runtime_snapshot={snapshot}'
    )
188233

189-
234+
@staticmethod
def _get_message_id(message):
    """Return an identifier for ``message`` that is safe to put in a log line.

    The first truthy value among the ``message_id`` and ``messageId``
    attributes is returned. If neither attribute exists, or both hold a falsy
    value such as ``None`` or ``''``, the literal ``'unknown'`` is returned so
    callers never log an empty id.

    Args:
        message: Any object; missing attributes are tolerated.

    Returns:
        The resolved identifier, or ``'unknown'``.
    """
    for attribute in ('message_id', 'messageId'):
        candidate = getattr(message, attribute, None)
        if candidate:
            return candidate
    return 'unknown'
237+
238+
def _prime_runtime_samplers(self):
    """Create the psutil process handle and take initial CPU readings.

    Does nothing when the module-level ``psutil`` is ``None``. Otherwise a
    ``psutil.Process`` for the current pid is stored on ``self._process`` and
    one process-level and one system-level ``cpu_percent(interval=None)``
    call is made, discarding the results.
    NOTE(review): presumably this gives later ``interval=None`` readings a
    baseline to compare against - confirm against the psutil documentation.

    Any exception resets ``self._process`` to ``None``; diagnostics are
    best effort and must not break construction.
    """
    if psutil is None:
        return
    try:
        self._process = current = psutil.Process(os.getpid())
        current.cpu_percent(interval=None)
        psutil.cpu_percent(interval=None)
    except Exception:  # pragma: no cover - best effort diagnostics
        self._process = None
247+
248+
def _get_runtime_snapshot(self):
    """Return the memory, CPU and GC snapshots joined by ``', '`` in that order."""
    memory_part = self._get_memory_snapshot()
    cpu_part = self._get_cpu_snapshot()
    gc_part = self._get_gc_snapshot()
    return '{}, {}, {}'.format(memory_part, cpu_part, gc_part)
250+
251+
def _get_memory_snapshot(self):
    """Describe the process memory usage and thread count as one string.

    Returns:
        ``'memory=psutil-unavailable'`` when ``self._process`` is ``None``;
        otherwise RSS and VMS (bytes divided by 1024 * 1024, two decimals)
        plus ``num_threads()``; or ``'memory=unavailable(<error>)'`` if any
        of those reads raises.
    """
    proc = self._process
    if proc is None:
        return 'memory=psutil-unavailable'
    bytes_per_mb = 1024 * 1024
    try:
        usage = proc.memory_info()
        resident = usage.rss / bytes_per_mb
        virtual = usage.vms / bytes_per_mb
        return f'memory=rss_mb={resident:.2f}, vms_mb={virtual:.2f}, num_threads={proc.num_threads()}'
    except Exception as exc:  # pragma: no cover - diagnostic fallback
        return f'memory=unavailable({exc})'
261+
262+
def _get_cpu_snapshot(self):
    """Describe process and system CPU utilisation as one string.

    Returns:
        ``'cpu=psutil-unavailable'`` when ``self._process`` is ``None``;
        otherwise the process reading followed by the system-wide reading
        (both ``cpu_percent(interval=None)``, two decimals); or
        ``'cpu=unavailable(<error>)'`` if either read raises.
    """
    proc = self._process
    if proc is None:
        return 'cpu=psutil-unavailable'
    try:
        # The process figure is sampled before the system-wide one.
        own_percent = proc.cpu_percent(interval=None)
        host_percent = psutil.cpu_percent(interval=None)
        return f'cpu=process_percent={own_percent:.2f}, ' + f'system_percent={host_percent:.2f}'
    except Exception as exc:  # pragma: no cover - diagnostic fallback
        return f'cpu=unavailable({exc})'
274+
275+
@staticmethod
def _get_gc_snapshot():
    """Describe the garbage collector state as one string.

    The result lists whether the collector is enabled, the current counts and
    thresholds, and a per-generation summary (collections, collected,
    uncollectable) taken from ``gc.get_stats()``; keys missing from a
    generation's stats are reported as 0. If anything raises,
    ``'gc=unavailable(<error>)'`` is returned instead.
    """
    template = 'gen{generation}[collections={collections}, collected={collected}, uncollectable={uncollectable}]'
    try:
        counts = gc.get_count()
        thresholds = gc.get_threshold()
        per_generation = [
            template.format(
                generation=index,
                collections=entry.get('collections', 0),
                collected=entry.get('collected', 0),
                uncollectable=entry.get('uncollectable', 0),
            )
            for index, entry in enumerate(gc.get_stats())
        ]
        return (
            f'gc=enabled={gc.isenabled()}, counts={counts}, thresholds={thresholds}, '
            f'stats={"; ".join(per_generation)}'
        )
    except Exception as exc:  # pragma: no cover - diagnostic fallback
        return f'gc=unavailable({exc})'
297+
298+
@staticmethod
def _get_positive_int_from_env(name, default):
    """Read a strictly positive integer from the environment.

    Args:
        name: Environment variable to look up.
        default: Value returned when the variable is unset or unusable.

    Returns:
        The parsed integer when the variable is set and parses to a value
        greater than zero. ``default`` when the variable is unset (silently)
        or when it is set but not a positive integer (after logging a
        warning).
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        candidate = int(raw)
    except (TypeError, ValueError):
        candidate = None
    if candidate is not None and candidate > 0:
        return candidate
    logger.warning(f'Invalid value for {name}: {raw}. Using default {default}.')
    return default
311+
312+
class _LockRenewLoggingReceiver:
    """Receiver proxy that logs each lock-renewal attempt before delegating.

    Only ``renew_message_lock`` is provided; the call is forwarded to the
    ``receiver`` attribute of the wrapped topic object at call time.
    """

    def __init__(self, topic):
        # Keep the topic rather than its receiver so the receiver is looked
        # up when a renewal actually happens.
        self._topic = topic

    def renew_message_lock(self, renewable):
        """Log the attempt with a runtime snapshot, then renew via the topic's receiver."""
        owner = self._topic
        logger.info(
            'Attempting lock renewal for message %s; locked_until_utc=%s; runtime_snapshot=%s',
            owner._get_message_id(renewable),
            getattr(renewable, 'locked_until_utc', None),
            owner._get_runtime_snapshot(),
        )
        return owner.receiver.renew_message_lock(renewable)

src/python_ms_core/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '0.0.25'
1+
__version__ = '0.2.5.1'

tests/unit_tests/test_topic/test_azure_topic.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,25 @@
88

99
class TestAzureTopic(unittest.TestCase):
1010

11+
@patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer')
@patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient')
def test_init_sets_long_running_lock_renewal_defaults(self, mock_service_bus_client, mock_auto_lock_renewer):
    """Constructing AzureTopic without env overrides applies the default lock-renewal settings."""
    import os  # local import: only needed here to scrub the environment

    mock_client = MagicMock()
    mock_config = MagicMock(connection_string='Endpoint=sb://test/')
    mock_renewer = MagicMock()

    mock_service_bus_client.from_connection_string.return_value = mock_client
    mock_client.get_topic_sender.return_value = MagicMock()
    mock_auto_lock_renewer.return_value = mock_renewer

    # The constructor reads both variables from the environment. Remove them
    # for the duration of the construction so the asserted defaults do not
    # depend on the machine running the test; patch.dict restores the
    # original environment on exit.
    with patch.dict('os.environ'):
        os.environ.pop('TOPIC_MAX_LOCK_RENEWAL_DURATION_SECONDS', None)
        os.environ.pop('TOPIC_LOCK_RENEWAL_MARGIN_SECONDS', None)
        AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1)

    mock_auto_lock_renewer.assert_called_once()
    _, kwargs = mock_auto_lock_renewer.call_args
    self.assertEqual(kwargs['max_lock_renewal_duration'], 86400)
    # max_concurrent_messages=1 is raised to a floor of two renewer workers.
    self.assertEqual(kwargs['max_workers'], 2)
    self.assertEqual(mock_renewer._renew_period, 60)
29+
1130
@patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer')
1231
@patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient')
1332
def test_subscribe_settles_completed_tasks_on_receiver_loop(self, mock_service_bus_client, mock_auto_lock_renewer):
@@ -16,6 +35,7 @@ def test_subscribe_settles_completed_tasks_on_receiver_loop(self, mock_service_b
1635
mock_message = MagicMock()
1736
mock_future = MagicMock()
1837
mock_config = MagicMock(connection_string='Endpoint=sb://test/')
38+
mock_message._lock_expired = False
1939

2040
mock_service_bus_client.from_connection_string.return_value = mock_client
2141
mock_client.get_topic_sender.return_value = MagicMock()
@@ -48,6 +68,7 @@ def test_settle_task_logs_error_and_releases_slot(self, mock_service_bus_client,
4868
mock_message = MagicMock()
4969
mock_future = MagicMock()
5070
mock_config = MagicMock(connection_string='Endpoint=sb://test/')
71+
mock_message._lock_expired = False
5172

5273
mock_service_bus_client.from_connection_string.return_value = mock_client
5374
mock_client.get_topic_sender.return_value = MagicMock()
@@ -65,6 +86,122 @@ def test_settle_task_logs_error_and_releases_slot(self, mock_service_bus_client,
6586
mock_logger.error.assert_called_once_with('Error in settling message: Mocked settlement failure')
6687
self.assertEqual(topic.internal_count, 0)
6788

89+
@patch('src.python_ms_core.core.topic.azure_topic.logger')
@patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer')
@patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient')
def test_settle_task_skips_expired_message(self, mock_service_bus_client, mock_auto_lock_renewer, mock_logger):
    """A message flagged ``_lock_expired`` is neither completed nor abandoned, only logged."""
    sb_client = MagicMock()
    receiver = MagicMock()
    finished_future = MagicMock()
    config = MagicMock(connection_string='Endpoint=sb://test/')

    # Message whose lock is already marked as expired.
    expired_message = MagicMock()
    expired_message._lock_expired = True
    expired_message.message_id = 'message-1'
    expired_message.locked_until_utc = '2026-03-17T09:39:28Z'
    expired_message.auto_renew_error = None

    mock_service_bus_client.from_connection_string.return_value = sb_client
    sb_client.get_topic_sender.return_value = MagicMock()
    sb_client.get_subscription_receiver.return_value = receiver
    finished_future.result.return_value = [True, expired_message]

    topic = AzureTopic(config=config, topic_name='mock-topic', max_concurrent_messages=1)
    topic.receiver = receiver
    topic.internal_count = 1

    topic._settle_task(finished_future, incoming_message=expired_message)

    expected_log = (
        'Skipping settlement for message message-1 because the lock expired at '
        '2026-03-17T09:39:28Z. auto_renew_error=None'
    )
    receiver.complete_message.assert_not_called()
    receiver.abandon_message.assert_not_called()
    mock_logger.error.assert_called_once_with(expected_log)
    self.assertEqual(topic.internal_count, 0)
121+
122+
@patch('src.python_ms_core.core.topic.azure_topic.logger')
@patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer')
@patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient')
def test_handle_lock_renew_failure_logs_when_sdk_returns_no_error(
    self,
    mock_service_bus_client,
    mock_auto_lock_renewer,
    mock_logger,
):
    """With ``error=None`` and no ``auto_renew_error`` the fallback reason is logged."""
    sb_client = MagicMock()
    config = MagicMock(connection_string='Endpoint=sb://test/')
    failed_message = MagicMock()

    mock_service_bus_client.from_connection_string.return_value = sb_client
    sb_client.get_topic_sender.return_value = MagicMock()
    failed_message.message_id = 'message-1'
    failed_message.locked_until_utc = '2026-03-17T09:39:28Z'
    failed_message.auto_renew_error = None

    # Fixed snapshot text so the expected log line is deterministic.
    snapshot = (
        'memory=rss_mb=128.00, vms_mb=256.00, num_threads=4, '
        'cpu=process_percent=80.00, system_percent=91.00, '
        'gc=enabled=True, counts=(1, 2, 3), thresholds=(700, 10, 10), '
        'stats=gen0[collections=1, collected=2, uncollectable=0]'
    )

    topic = AzureTopic(config=config, topic_name='mock-topic', max_concurrent_messages=1)
    topic._get_runtime_snapshot = MagicMock(return_value=snapshot)

    topic._handle_lock_renew_failure(failed_message, None)

    mock_logger.error.assert_called_once_with(
        'Error renewing lock for message message-1: lock expired before renewal could complete; '
        'locked_until_utc=2026-03-17T09:39:28Z; '
        f'runtime_snapshot={snapshot}'
    )
161+
162+
@patch('src.python_ms_core.core.topic.azure_topic.logger')
@patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer')
@patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient')
def test_lock_renew_attempt_logs_memory_snapshot(
    self,
    mock_service_bus_client,
    mock_auto_lock_renewer,
    mock_logger,
):
    """The logging receiver logs the attempt with the snapshot, then delegates the renewal."""
    sb_client = MagicMock()
    config = MagicMock(connection_string='Endpoint=sb://test/')
    mock_service_bus_client.from_connection_string.return_value = sb_client
    sb_client.get_topic_sender.return_value = MagicMock()
    mock_auto_lock_renewer.return_value = MagicMock()

    # Fixed snapshot text so the expected log arguments are deterministic.
    snapshot = (
        'memory=rss_mb=64.00, vms_mb=128.00, num_threads=3, '
        'cpu=process_percent=55.00, system_percent=70.00, '
        'gc=enabled=True, counts=(4, 5, 6), thresholds=(700, 10, 10), '
        'stats=gen0[collections=3, collected=10, uncollectable=0]'
    )

    topic = AzureTopic(config=config, topic_name='mock-topic', max_concurrent_messages=1)
    topic._get_runtime_snapshot = MagicMock(return_value=snapshot)

    renewed_message = MagicMock()
    renewed_message.message_id = 'message-2'
    renewed_message.locked_until_utc = '2026-03-17T09:39:33Z'
    topic.receiver = MagicMock()

    topic.lock_renew_receiver.renew_message_lock(renewed_message)

    mock_logger.info.assert_any_call(
        'Attempting lock renewal for message %s; locked_until_utc=%s; runtime_snapshot=%s',
        'message-2',
        '2026-03-17T09:39:33Z',
        snapshot,
    )
    topic.receiver.renew_message_lock.assert_called_once_with(renewed_message)
204+
68205

69206
if __name__ == '__main__':
70207
unittest.main()

version.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,4 @@
1-
version = '0.0.25'
1+
version = '0.2.5.1'
2+
lastCommit = '30e447b527d6744b5ce72e9460f21e22675a3126'
3+
lastCommitShort = '30e4'
4+
buildDate = '2026-03-17'

0 commit comments

Comments
 (0)