|
1 | 1 | import json |
2 | 2 | import logging |
3 | | -import threading |
4 | 3 | import time |
5 | 4 | from .config.topic_config import Config |
6 | | -from .abstract.topic_abstract import TopicAbstract |
7 | 5 | from ..resource_errors import ExceptionHandler |
| 6 | +from concurrent.futures import ThreadPoolExecutor |
| 7 | +from .abstract.topic_abstract import TopicAbstract |
8 | 8 | from ..queue.models.queue_message import QueueMessage |
9 | 9 |
|
10 | | -logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s',datefmt='%Y-%m-%d %H:%M:%S') |
| 10 | +logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S') |
11 | 11 | logger = logging.getLogger('Topic') |
12 | 12 | logger.setLevel(logging.INFO) |
13 | 13 |
|
14 | 14 |
|
15 | 15 | class Callback: |
16 | | - def __init__(self, fn=None): |
| 16 | + def __init__(self, fn=None, max_concurrent_messages=1): |
17 | 17 | self._function_to_call = fn |
| 18 | + self._max_concurrent_messages = max_concurrent_messages |
| 19 | + self._renewal_interval = 30 # seconds |
| 20 | + self.message_processing = 0 |
| 21 | + self.executor = ThreadPoolExecutor(max_workers=max_concurrent_messages) |
| 22 | + |
| 23 | + def _renew_message_lock(self, message, receiver): |
| 24 | + while True: |
| 25 | + try: |
| 26 | + time.sleep(self._renewal_interval) |
| 27 | + if not message._lock_expired: |
| 28 | + receiver.renew_message_lock(message) |
| 29 | + except Exception as e: |
| 30 | + break |
18 | 31 |
|
19 | | - # old method to fetch messages. Not used anymore |
20 | | - def messages(self, provider, topic, subscription): |
21 | | - with provider.client: |
22 | | - topic_receiver = provider.client.get_subscription_receiver(topic, subscription_name=subscription) |
23 | | - logger.info(f'Started receiver for {subscription}') |
24 | | - with topic_receiver: |
25 | | - for message in topic_receiver: |
26 | | - try: |
27 | | - queue_message = QueueMessage.data_from(str(message)) |
28 | | - self._function_to_call(queue_message) |
29 | | - except Exception as e: |
30 | | - print(f'Error: {e}, Invalid message received: {message}') |
31 | | - finally: |
32 | | - topic_receiver.complete_message(message) |
33 | | - logger.info('Completed topic receiver') |
34 | | - |
35 | 32 | # Sends data to the callback function |
36 | | - def process_message(self, message:str): |
37 | | - queue_message = QueueMessage.data_from(message) |
| 33 | + def process_message(self, message, receiver): |
| 34 | + queue_message = QueueMessage.data_from(str(message)) |
| 35 | + secondary_executor = ThreadPoolExecutor(max_workers=1) |
| 36 | + if not message._lock_expired: |
| 37 | + secondary_executor.submit(self._renew_message_lock, message, receiver) |
38 | 38 | self._function_to_call(queue_message) |
39 | | - |
| 39 | + |
40 | 40 | # Starts listening to the messages |
41 | 41 | def start_listening(self, provider, topic, subscription): |
42 | | - with provider.client: # service bus client |
| 42 | + with provider.client: # service bus client |
43 | 43 | logger.info('Initiating receiver') |
44 | | - topic_receiver = provider.client.get_subscription_receiver(topic, subscription_name=subscription) # servicebusclientsubscriptionreceiver |
45 | | - logger.info('Done') |
| 44 | + topic_receiver = provider.client.get_subscription_receiver( |
| 45 | + topic_name=topic, |
| 46 | + subscription_name=subscription |
| 47 | + ) |
| 48 | + logger.info('Receiver started') |
46 | 49 | with topic_receiver: |
47 | 50 | while True: |
48 | | - try: |
49 | | - for message in topic_receiver: |
50 | | - try: |
51 | | - self.process_message(message=str(message)) # sync call. [By default 1minute ] -> lock renewal for 300 seconds |
52 | | - except Exception as e: |
53 | | - print(f'Error : {e}, Invalid message received : {message}') |
54 | | - finally: |
55 | | - topic_receiver.complete_message(message) |
56 | | - except Exception as et: |
57 | | - print(f'Error in service bus connection : {et}') |
58 | | - # Change mode from PEEK_LOCK to RECEIVE_AND_DELETE |
59 | | - logger.info('Topic receiver invalidated') |
| 51 | + available_slots = self._max_concurrent_messages - self.message_processing |
| 52 | + if available_slots > 0: |
| 53 | + try: |
| 54 | + messages = topic_receiver.receive_messages( |
| 55 | + max_message_count=available_slots, |
| 56 | + max_wait_time=5 |
| 57 | + ) |
| 58 | + if not messages: |
| 59 | + continue |
| 60 | + for message in messages: |
| 61 | + self.message_processing += 1 |
| 62 | + self.executor.submit(self._process_message_in_thread, message, topic_receiver) |
| 63 | + except Exception as et: |
| 64 | + logger.error(f'Error in service bus connection: {et}') |
| 65 | + else: |
| 66 | + time.sleep(10) # Short sleep to prevent tight loop if no slots available |
| 67 | + |
| 68 | + logger.info('Receiver stopped') |
| 69 | + |
| 70 | + def _process_message_in_thread(self, message, topic_receiver): |
| 71 | + try: |
| 72 | + self.process_message(message=message, receiver=topic_receiver) |
| 73 | + except Exception as e: |
| 74 | + logger.error(f'Error: {e}, Invalid message received: {message}') |
| 75 | + finally: |
| 76 | + try: |
| 77 | + topic_receiver.complete_message(message) # Mark the message as complete |
| 78 | + except Exception as err: |
| 79 | + logger.error(f'Error completing the message: {err}') |
| 80 | + self.message_processing -= 1 |
60 | 81 |
|
61 | 82 |
|
62 | 83 | class Topic(TopicAbstract): |
63 | | - def __init__(self, config=None, topic_name=None): |
| 84 | + def __init__(self, config=None, topic_name=None, max_concurrent_messages=1): |
64 | 85 | self.topic = topic_name |
65 | 86 | self.provider = Config(config=config, topic_name=topic_name) |
| 87 | + self.max_concurrent_messages = max_concurrent_messages |
66 | 88 |
|
67 | 89 | @ExceptionHandler.decorated |
68 | 90 | def subscribe(self, subscription=None, callback=None): |
69 | 91 | if subscription is not None: |
70 | | - cb = Callback(callback) |
71 | | - thread = threading.Thread(target=cb.start_listening, args=(self.provider, self.topic, subscription)) |
72 | | - thread.start() |
73 | | - time.sleep(5) |
| 92 | + cb = Callback(callback, max_concurrent_messages=self.max_concurrent_messages) |
| 93 | + cb.start_listening(self.provider, self.topic, subscription) |
74 | 94 | else: |
75 | 95 | logging.error( |
76 | 96 | f'Unimplemented initialize for core {self.provider.provider}, Subscription name is required!') |
|
0 commit comments