@@ -11,16 +11,18 @@ class Callback:
1111 def __init__ (self , fn = None ):
1212 self ._function_to_call = fn
1313
14- def messages (self , client ):
15- for message in client :
16- try :
17- queue_message = QueueMessage .data_from (str (message ))
18- self ._function_to_call (queue_message )
19- except Exception as e :
20- print (f'Error: { e } , Invalid message received: { message } ' )
21- finally :
22- client .complete_message (message )
23-
14+ def messages (self , provider , topic , subscription ):
15+ with provider .client :
16+ topic_receiver = provider .client .get_subscription_receiver (topic , subscription_name = subscription )
17+ with topic_receiver :
18+ for message in topic_receiver :
19+ try :
20+ queue_message = QueueMessage .data_from (str (message ))
21+ self ._function_to_call (queue_message )
22+ except Exception as e :
23+ print (f'Error: { e } , Invalid message received: { message } ' )
24+ finally :
25+ topic_receiver .complete_message (message )
2426
2527class Topic :
2628 def __init__ (self , config = None , topic_name = None ):
@@ -37,14 +39,9 @@ def __init__(self, config=None, topic_name=None):
3739 def subscribe (self , subscription = None , callback = None ):
3840 if subscription is not None :
3941 cb = Callback (callback )
40- with self .provider .client :
41- topic_receiver = self .provider .client .get_subscription_receiver (self .provider .topic ,
42- subscription_name = subscription )
43- with topic_receiver :
44- if callback and callable (callback ):
45- thread = threading .Thread (target = cb .messages , args = (topic_receiver ,))
46- thread .start ()
47- time .sleep (5 )
42+ thread = threading .Thread (target = cb .messages , args = (self .provider ,self .topic , subscription ))
43+ thread .start ()
44+ time .sleep (5 )
4845 else :
4946 logging .error (
5047 f'Unimplemented initialization for core { self .provider .provider } , Subscription name is required!' )
0 commit comments