11import json
2- import gc
32import logging
43import multiprocessing as mp
54import os
1312from azure .servicebus import AutoLockRenewer
1413import threading
1514
16- try :
17- import psutil
18- except ImportError : # pragma: no cover - dependency exists in the package, but keep logging resilient.
19- psutil = None
2015
21-
22- logging .basicConfig (format = '%(asctime)s %(levelname)-8s %(message)s' , datefmt = '%Y-%m-%d %H:%M:%S' )
2316logger = logging .getLogger ('AzureTopic' )
24- logger .setLevel (logging .INFO )
2517
2618
2719"""
@@ -59,14 +51,8 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes
5951 self .callback_process_start_method = self ._get_process_start_method ()
6052 self .process_context = self ._get_process_context ()
6153 self .internal_count = 0
62- self .max_renewal_duration = self ._get_positive_int_from_env (
63- 'TOPIC_MAX_LOCK_RENEWAL_DURATION_SECONDS' ,
64- 86400 ,
65- ) # Renew the message upto 1 day
66- self .lock_renewal_margin = self ._get_positive_int_from_env (
67- 'TOPIC_LOCK_RENEWAL_MARGIN_SECONDS' ,
68- 60 ,
69- )
54+ self .max_renewal_duration = 86400 # Renew the message upto 1 day
55+ self .lock_renewal_margin = 60
7056 renewer_max_workers = max (max_concurrent_messages , 2 )
7157 self .lock_renewal = AutoLockRenewer (
7258 max_lock_renewal_duration = self .max_renewal_duration ,
@@ -76,22 +62,9 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes
7662 # The SDK default renews only in the last 10 seconds of the lock window.
7763 # Start earlier so long-running jobs have more headroom for scheduler jitter.
7864 self .lock_renewal ._renew_period = min (self .lock_renewal_margin , self .max_renewal_duration )
79- self .lock_renew_receiver = _LockRenewLoggingReceiver (self )
8065 self .wait_time_for_message = 5
8166 self .thread_lock = threading .Lock ()
8267 self .pending_tasks = []
83- self ._process = None
84- self ._prime_runtime_samplers ()
85- logger .info (
86- 'Configured AzureTopic %s: execution_mode=%s, process_start_method=%s, '
87- 'max_lock_renewal_duration=%s seconds, renew_margin=%s seconds, renewer_max_workers=%s' ,
88- self .topic_name ,
89- self .callback_execution_mode ,
90- self .callback_process_start_method ,
91- self .max_renewal_duration ,
92- self .lock_renewal_margin ,
93- renewer_max_workers ,
94- )
9568
9669
9770 def publish (self , data : QueueMessage ):
@@ -135,7 +108,7 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1):
135108 for message in messages :
136109 execution_task = self ._submit_processing_task (message , callback )
137110 self .lock_renewal .register (
138- self .lock_renew_receiver ,
111+ self .receiver ,
139112 message ,
140113 max_lock_renewal_duration = self .max_renewal_duration ,
141114 on_lock_renew_failure = self ._handle_lock_renew_failure ,
@@ -283,88 +256,13 @@ def _handle_lock_renew_failure(self, renewable, error):
283256 failure_reason = error or getattr (renewable , 'auto_renew_error' , None ) or 'lock expired before renewal could complete'
284257 logger .error (
285258 f'Error renewing lock for message { message_id } : { failure_reason } ; '
286- f'locked_until_utc={ getattr (renewable , "locked_until_utc" , None )} ; '
287- f'runtime_snapshot={ self ._get_runtime_snapshot ()} '
259+ f'locked_until_utc={ getattr (renewable , "locked_until_utc" , None )} '
288260 )
289261
290262 @staticmethod
291263 def _get_message_id (message ):
292264 return getattr (message , 'message_id' , None ) or getattr (message , 'messageId' , 'unknown' )
293265
294- def _prime_runtime_samplers (self ):
295- if psutil is None :
296- return
297- try :
298- self ._process = psutil .Process (os .getpid ())
299- self ._process .cpu_percent (interval = None )
300- psutil .cpu_percent (interval = None )
301- except Exception : # pragma: no cover - best effort diagnostics
302- self ._process = None
303-
304- def _get_runtime_snapshot (self ):
305- return f'{ self ._get_memory_snapshot ()} , { self ._get_cpu_snapshot ()} , { self ._get_gc_snapshot ()} '
306-
307- def _get_memory_snapshot (self ):
308- if self ._process is None :
309- return 'memory=psutil-unavailable'
310- try :
311- memory_info = self ._process .memory_info ()
312- rss_mb = memory_info .rss / (1024 * 1024 )
313- vms_mb = memory_info .vms / (1024 * 1024 )
314- return f'memory=rss_mb={ rss_mb :.2f} , vms_mb={ vms_mb :.2f} , num_threads={ self ._process .num_threads ()} '
315- except Exception as exc : # pragma: no cover - diagnostic fallback
316- return f'memory=unavailable({ exc } )'
317-
318- def _get_cpu_snapshot (self ):
319- if self ._process is None :
320- return 'cpu=psutil-unavailable'
321- try :
322- process_cpu_percent = self ._process .cpu_percent (interval = None )
323- system_cpu_percent = psutil .cpu_percent (interval = None )
324- return (
325- f'cpu=process_percent={ process_cpu_percent :.2f} , '
326- f'system_percent={ system_cpu_percent :.2f} '
327- )
328- except Exception as exc : # pragma: no cover - diagnostic fallback
329- return f'cpu=unavailable({ exc } )'
330-
331- @staticmethod
332- def _get_gc_snapshot ():
333- try :
334- gc_counts = gc .get_count ()
335- gc_thresholds = gc .get_threshold ()
336- gc_stats = gc .get_stats ()
337- generation_summaries = []
338- for generation , stats in enumerate (gc_stats ):
339- generation_summaries .append (
340- 'gen{generation}[collections={collections}, collected={collected}, uncollectable={uncollectable}]' .format (
341- generation = generation ,
342- collections = stats .get ('collections' , 0 ),
343- collected = stats .get ('collected' , 0 ),
344- uncollectable = stats .get ('uncollectable' , 0 ),
345- )
346- )
347- return (
348- f'gc=enabled={ gc .isenabled ()} , counts={ gc_counts } , thresholds={ gc_thresholds } , '
349- f'stats={ "; " .join (generation_summaries )} '
350- )
351- except Exception as exc : # pragma: no cover - diagnostic fallback
352- return f'gc=unavailable({ exc } )'
353-
354- @staticmethod
355- def _get_positive_int_from_env (name , default ):
356- value = os .environ .get (name )
357- if value is None :
358- return default
359- try :
360- parsed = int (value )
361- if parsed > 0 :
362- return parsed
363- except (TypeError , ValueError ):
364- pass
365- logger .warning (f'Invalid value for { name } : { value } . Using default { default } .' )
366- return default
367-
368266 @staticmethod
369267 def _get_callback_execution_mode ():
370268 value = os .environ .get ('TOPIC_CALLBACK_EXECUTION_MODE' , 'process' )
@@ -405,19 +303,6 @@ def _get_process_context(self):
405303 self .callback_execution_mode = 'thread'
406304 return None
407305
408- class _LockRenewLoggingReceiver :
409- def __init__ (self , topic ):
410- self ._topic = topic
411-
412- def renew_message_lock (self , renewable ):
413- logger .info (
414- 'Attempting lock renewal for message %s; locked_until_utc=%s; runtime_snapshot=%s' ,
415- self ._topic ._get_message_id (renewable ),
416- getattr (renewable , 'locked_until_utc' , None ),
417- self ._topic ._get_runtime_snapshot (),
418- )
419- return self ._topic .receiver .renew_message_lock (renewable )
420-
421306
422307def _run_callback_in_subprocess (message_payload , callbackfn , result_connection ):
423308 try :
0 commit comments