11import json
2-
32import gc
43import logging
4+ import multiprocessing as mp
55import os
66import time
7+ import traceback
78from ..config .config import TopicConfig
8- from ..resource_errors import ExceptionHandler
99from concurrent .futures import ThreadPoolExecutor
1010from .abstract .topic_abstract import TopicAbstract
1111from ..queue .models .queue_message import QueueMessage
1212from azure .servicebus import ServiceBusClient , ServiceBusMessage
1313from azure .servicebus import AutoLockRenewer
14- import concurrent .futures as cf
1514import threading
1615
1716try :
@@ -56,6 +55,9 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes
5655 self .topic_name = topic_name
5756 self .publisher = self .client .get_topic_sender (topic_name = topic_name )
5857 self .executor = ThreadPoolExecutor (max_workers = max_concurrent_messages )
58+ self .callback_execution_mode = self ._get_callback_execution_mode ()
59+ self .callback_process_start_method = self ._get_process_start_method ()
60+ self .process_context = self ._get_process_context ()
5961 self .internal_count = 0
6062 self .max_renewal_duration = self ._get_positive_int_from_env (
6163 'TOPIC_MAX_LOCK_RENEWAL_DURATION_SECONDS' ,
@@ -81,9 +83,11 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes
8183 self ._process = None
8284 self ._prime_runtime_samplers ()
8385 logger .info (
84- 'Configured lock renewal for topic %s: max_lock_renewal_duration =%s seconds , '
85- 'renew_margin=%s seconds, renewer_max_workers=%s' ,
86+ 'Configured AzureTopic %s: execution_mode =%s, process_start_method=%s , '
87+ 'max_lock_renewal_duration=%s seconds, renew_margin=%s seconds, renewer_max_workers=%s' ,
8688 self .topic_name ,
89+ self .callback_execution_mode ,
90+ self .callback_process_start_method ,
8791 self .max_renewal_duration ,
8892 self .lock_renewal_margin ,
8993 renewer_max_workers ,
@@ -129,13 +133,13 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1):
129133 with self .thread_lock :
130134 self .internal_count += len (messages )
131135 for message in messages :
136+ execution_task = self ._submit_processing_task (message , callback )
132137 self .lock_renewal .register (
133138 self .lock_renew_receiver ,
134139 message ,
135140 max_lock_renewal_duration = self .max_renewal_duration ,
136141 on_lock_renew_failure = self ._handle_lock_renew_failure ,
137142 )
138- execution_task = self .executor .submit (self .internal_callback , message , callback )
139143 self .pending_tasks .append ((execution_task , message ))
140144 else :
141145 if len (self .pending_tasks ) > 0 :
@@ -146,27 +150,64 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1):
146150 logger .error (f'Error in receiving messages: { e } ' )
147151
148152
def internal_callback(self, message_payload, callbackfn):
    """
    Decode a raw payload and hand it to the user callback, reporting the outcome.

    Args:
        message_payload (str): The serialized message body; it is parsed with
            QueueMessage.data_from before the callback sees it.
        callbackfn (function): The callback invoked with the decoded message.
    Returns:
        dict: {'success': True, 'error': None} when the callback returns
        normally, otherwise {'success': False, 'error': <formatted traceback>}.
    """
    try:
        callbackfn(QueueMessage.data_from(message_payload))
    except Exception as exc:
        # Failures are reported as data rather than raised so the caller can
        # decide how to settle the message.
        trace_lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
        return {'success': False, 'error': ''.join(trace_lines).strip()}
    return {'success': True, 'error': None}
165171
166172
def settle_message(self, x):
    """Settle the message behind task ``x`` by delegating to ``_settle_task``.

    Args:
        x: The task object representing the message processing.
    Returns:
        Whatever ``_settle_task`` returns for that task.
    """
    outcome = self._settle_task(x)
    return outcome
169175
def _submit_processing_task(self, message, callback):
    """Start processing ``message`` and return the task tracking it.

    The message is reduced to ``str(message)`` before it is handed to a
    worker. In 'process' execution mode a subprocess is tried first; if
    starting it raises, a warning is logged and the work is submitted to the
    thread executor instead. Any other mode goes straight to the thread path.

    Args:
        message: The received message to process.
        callback: The callback to run against the decoded message.
    Returns:
        The task object produced by the process or thread submit helper.
    """
    payload_text = str(message)
    if self.callback_execution_mode != 'process':
        return self._submit_thread_task(payload_text, callback)

    try:
        return self._submit_process_task(payload_text, callback)
    except Exception as start_error:
        logger.warning(
            'Falling back to thread execution for message %s because process start failed: %s',
            self._get_message_id(message),
            start_error,
        )
    return self._submit_thread_task(payload_text, callback)
188+
def _submit_thread_task(self, message_payload, callback):
    """Queue ``internal_callback`` on the thread executor.

    Args:
        message_payload (str): The message payload to process.
        callback: The callback to run against the decoded message.
    Returns:
        _FutureExecutionTask: Wrapper around the executor's future.
    """
    return _FutureExecutionTask(
        self.executor.submit(self.internal_callback, message_payload, callback)
    )
192+
def _submit_process_task(self, message_payload, callback):
    """Run the callback in a dedicated subprocess and return its task wrapper.

    A one-way pipe carries the worker's status payload back to this process.

    Args:
        message_payload (str): The message payload to process.
        callback: The callback to run against the decoded message.
    Returns:
        _ProcessExecutionTask: Wrapper holding the started process and the
        receiving end of the pipe.
    Raises:
        RuntimeError: If no process context is configured.
        Exception: Whatever ``Process.start()`` raises; both pipe ends are
            closed before it is re-raised.
    """
    context = self.process_context
    if context is None:
        raise RuntimeError('Process execution mode is not available for this environment.')

    reader, writer = context.Pipe(duplex=False)
    worker = context.Process(
        target=_run_callback_in_subprocess,
        args=(message_payload, callback, writer),
    )
    try:
        worker.start()
    except Exception:
        reader.close()
        writer.close()
        raise
    else:
        # The worker owns the sending end now; drop this process's copy.
        writer.close()
    return _ProcessExecutionTask(worker, reader)
210+
170211 def _get_receivable_count (self , max_receivable_messages = - 1 ):
171212 with self .thread_lock :
172213 available_slots = self .max_concurrent_messages - self .internal_count
@@ -178,8 +219,14 @@ def _get_receivable_count(self, max_receivable_messages=-1):
def _wait_for_pending_tasks(self, timeout=0.5):
    """Wait briefly for any pending task to finish, then settle finished ones.

    Args:
        timeout (float): Maximum number of seconds to wait for the first
            task to report done. A value <= 0 skips waiting and settles
            immediately.
    Returns:
        None. Does nothing at all when there are no pending tasks.
    """
    if not self.pending_tasks:
        return
    if timeout > 0:
        # Monotonic clock: the deadline must not move if the system wall
        # clock is adjusted while we are polling.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if any(task.done() for task, _ in self.pending_tasks):
                break
            # Poll in short slices, never sleeping past the deadline.
            time.sleep(min(0.1, max(deadline - time.monotonic(), 0)))
    self._settle_completed_tasks()
184231
185232 def _settle_completed_tasks (self ):
@@ -191,16 +238,21 @@ def _settle_completed_tasks(self):
191238 remaining_tasks .append ((future , incoming_message ))
192239 self .pending_tasks = remaining_tasks
193240
194- def _settle_task (self , x : cf . Future , incoming_message = None ):
241+ def _settle_task (self , x , incoming_message = None ):
195242 """
196243 Sets the message as completed and updates the internal count.
197244 Args:
198- x (cf.Future) : The future object representing the message processing.
245+ x: The task object representing the message processing.
199246 """
200247 try :
201- [is_success , settled_message ] = x .result ()
202- if settled_message is not None :
203- incoming_message = settled_message
248+ task_result = x .result ()
249+ except Exception as e :
250+ task_result = {
251+ 'success' : False ,
252+ 'error' : f'Callback worker exited before returning a result: { e } ' ,
253+ }
254+
255+ try :
204256 if incoming_message is None :
205257 return
206258 if getattr (incoming_message , '_lock_expired' , False ):
@@ -210,11 +262,15 @@ def _settle_task(self, x: cf.Future, incoming_message=None):
210262 f'auto_renew_error={ getattr (incoming_message , "auto_renew_error" , None )} '
211263 )
212264 return
213- if is_success :
265+ if task_result . get ( 'success' ) :
214266 self .receiver .complete_message (incoming_message )
215267 else :
216- logger .info (f'Abandoning message: { incoming_message } ' )
217- self .receiver .abandon_message (incoming_message ) # send back to the topic
268+ logger .error (
269+ 'Processing failed for message %s: %s' ,
270+ self ._get_message_id (incoming_message ),
271+ task_result .get ('error' , 'unknown processing failure' ),
272+ )
273+ self .receiver .abandon_message (incoming_message )
218274 except Exception as e :
219275 logger .error (f'Error in settling message: { e } ' )
220276 finally :
@@ -309,6 +365,46 @@ def _get_positive_int_from_env(name, default):
309365 logger .warning (f'Invalid value for { name } : { value } . Using default { default } .' )
310366 return default
311367
@staticmethod
def _get_callback_execution_mode():
    """Read the callback execution mode from TOPIC_CALLBACK_EXECUTION_MODE.

    The value is trimmed and lower-cased before validation.

    Returns:
        str: 'process' or 'thread'. Defaults to 'process' when the variable
        is unset, and also (with a warning) when it holds any other value.
    """
    raw_value = os.environ.get('TOPIC_CALLBACK_EXECUTION_MODE', 'process')
    mode = str(raw_value).strip().lower()
    if mode not in ('process', 'thread'):
        logger.warning(
            'Invalid value for TOPIC_CALLBACK_EXECUTION_MODE: %s. Using default process.',
            raw_value,
        )
        return 'process'
    return mode
379+
@staticmethod
def _get_process_start_method():
    """Pick the multiprocessing start method for callback subprocesses.

    The default is 'fork' when the platform offers it; otherwise the start
    method already selected for the interpreter, or failing that the first
    method the platform lists. TOPIC_CALLBACK_PROCESS_START_METHOD overrides
    the default when it names an available method (trimmed, lower-cased).

    Returns:
        str: The chosen start method. An unavailable configured value is
        logged as a warning and replaced by the default.
    """
    available_methods = mp.get_all_start_methods()
    if 'fork' in available_methods:
        # NOTE(review): fork is used from a process that also runs worker
        # threads; confirm the callbacks tolerate that before relying on it.
        default_method = 'fork'
    else:
        # allow_none=True only reads the current setting. Without it the call
        # fixes the global start method as a side effect, which would make a
        # later mp.set_start_method() by the host application fail.
        default_method = mp.get_start_method(allow_none=True) or available_methods[0]

    configured_method = os.environ.get('TOPIC_CALLBACK_PROCESS_START_METHOD', default_method)
    normalized_method = str(configured_method).strip().lower()
    if normalized_method in available_methods:
        return normalized_method
    logger.warning(
        'Invalid value for TOPIC_CALLBACK_PROCESS_START_METHOD: %s. Using default %s.',
        configured_method,
        default_method,
    )
    return default_method
394+
def _get_process_context(self):
    """Build the multiprocessing context used to launch callback workers.

    Returns:
        The context for ``self.callback_process_start_method`` when the
        execution mode is 'process'; otherwise None. If the start method is
        rejected with ValueError, a warning is logged, the execution mode is
        switched to 'thread' and None is returned.
    """
    if self.callback_execution_mode == 'process':
        try:
            return mp.get_context(self.callback_process_start_method)
        except ValueError:
            logger.warning(
                'Process start method %s is unavailable. Falling back to thread execution.',
                self.callback_process_start_method,
            )
            # Downgrade so later submissions take the thread path.
            self.callback_execution_mode = 'thread'
    return None
407+
312408class _LockRenewLoggingReceiver :
313409 def __init__ (self , topic ):
314410 self ._topic = topic
@@ -321,3 +417,56 @@ def renew_message_lock(self, renewable):
321417 self ._topic ._get_runtime_snapshot (),
322418 )
323419 return self ._topic .receiver .renew_message_lock (renewable )
420+
421+
def _run_callback_in_subprocess(message_payload, callbackfn, result_connection):
    """Subprocess entry point: run the callback and report its status over a pipe.

    Args:
        message_payload (str): The message payload, parsed with
            QueueMessage.data_from before the callback is invoked.
        callbackfn (function): The callback to invoke with the decoded message.
        result_connection: Connection on which exactly one status dict is
            sent ({'success': bool, 'error': str or None}); it is closed
            before returning.
    """
    try:
        callbackfn(QueueMessage.data_from(message_payload))
        result_connection.send({'success': True, 'error': None})
    except BaseException as exc:  # pragma: no cover - exercised through the parent process wrapper
        # BaseException (not just Exception) so every failure in the worker is
        # reported as a status payload.
        failure_trace = ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__))
        result_connection.send({'success': False, 'error': failure_trace.strip()})
    finally:
        result_connection.close()
434+
435+
class _FutureExecutionTask:
    """Task wrapper exposing ``done()`` / ``result()`` over an executor future."""

    def __init__(self, future):
        # The wrapped future; every call below is forwarded to it.
        self._future = future

    def done(self):
        """Return the wrapped future's ``done()`` value."""
        finished = self._future.done()
        return finished

    def result(self):
        """Return the wrapped future's ``result()``, propagating anything it raises."""
        outcome = self._future.result()
        return outcome
445+
446+
class _ProcessExecutionTask:
    """Task wrapper exposing ``done()`` / ``result()`` over a callback subprocess.

    The worker reports its status dict through ``result_connection``; this
    class reads that payload, reaps the process and caches the outcome.
    """

    def __init__(self, process, result_connection):
        self._process = process
        self._result_connection = result_connection
        # Cached status dict; None until result() has run once.
        self._result = None

    def done(self):
        """Return True once a result is cached, readable, or the worker has exited."""
        if self._result is not None:
            return True
        try:
            # A readable pipe counts as done: a worker blocked in send() on a
            # payload larger than the pipe buffer stays alive until we read it.
            if self._result_connection.poll():
                return True
        except (EOFError, OSError):
            # Connection unusable; result() will turn this into a failure payload.
            return True
        return not self._process.is_alive()

    def result(self):
        """Return the worker's status dict, waiting for the worker if needed.

        Returns:
            dict: The payload sent by the worker, or a failure payload naming
            the exit code when the worker ended without sending one. The value
            is cached, so repeated calls return the same object.
        """
        if self._result is not None:
            return self._result

        connection = self._result_connection
        payload = None
        try:
            # Read before joining: joining first can deadlock when the worker
            # is still blocked writing a large payload into the pipe.
            while self._process.is_alive() and not connection.poll(0.1):
                pass
            if connection.poll():
                payload = connection.recv()
        except (EOFError, OSError):
            # poll() also reports True at end-of-file, so a worker that died
            # without sending surfaces here rather than as "nothing to read".
            payload = None
        finally:
            connection.close()

        # Reap the worker so exitcode is populated.
        self._process.join()
        if payload is None:
            payload = {
                'success': False,
                'error': f'Callback worker exited with code {self._process.exitcode} without returning a result.',
            }
        self._result = payload
        return self._result
0 commit comments