66
77
88class Topic :
9- def __init__ (self , config = None , topic_name = None ):
9+ def __init__ (self , config = None , topic_name = None , callback = None ):
1010 self .topic = topic_name
11+ self ._callback = callback
12+ self ._messages = []
1113 if config .provider == 'Azure' :
1214 try :
1315 self .azure = AzureServiceBusTopic (topic_name = topic_name )
@@ -19,16 +21,22 @@ def __init__(self, config=None, topic_name=None):
1921 @ExceptionHandler .decorated
2022 def subscribe (self , subscription = None ):
2123 if subscription is not None :
22- messages = []
23- with self .azure .client .get_subscription_receiver (topic_name = self .azure .topic , subscription_name = subscription ) as receiver :
24- received_msgs = receiver .receive_messages (max_message_count = 10 , max_wait_time = 5 )
25- for message in received_msgs :
26- queue_message = QueueMessage .data_from (str (message ))
27- messages .append (queue_message )
28- receiver .complete_message (message )
29- return messages
24+ with self .azure .client :
25+ topic_receiver = self .azure .client .get_subscription_receiver (self .azure .topic , subscription_name = subscription )
26+ with topic_receiver :
27+ for message in topic_receiver :
28+ queue_message = QueueMessage .data_from (str (message ))
29+ self ._messages = queue_message
30+ if self ._callback and callable (self ._callback ):
31+ self ._callback (self )
32+ topic_receiver .complete_message (message )
33+
3034 else :
31- logging .error (f'Unimplemented initialization for core { self .config .provider } , Subscription name is required!' )
35+ logging .error (
36+ f'Unimplemented initialization for core { self .config .provider } , Subscription name is required!' )
37+
38+ def get_messages (self ):
39+ return self ._messages
3240
3341 @ExceptionHandler .decorated
3442 def publish (self , data = None ):
0 commit comments