Skip to content

Commit 1dc8736

Browse files
committed
minor changes
minor changes
1 parent c1d55c2 commit 1dc8736

3 files changed

Lines changed: 94 additions & 55 deletions

File tree

src/example.py

Lines changed: 54 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@
1111
from python_ms_core.core.queue.models.queue_message import QueueMessage
1212
from python_ms_core.core.auth.models.permission_request import PermissionRequest
1313

14-
core = Core(config='Local')
14+
core = Core()
1515
print('Hello')
1616

17-
topic = 'gtfspathways'
18-
subscription = 'upload-validation-processor-test'
17+
topic = 'gtfs-pathways-upload'
18+
subscription = 'log'
1919
some_other_sub = 'usdufs'
2020

2121

@@ -32,6 +32,8 @@ def publish_messages(topic_name):
3232
def subscribe(topic_name, subscription_name):
3333
def process(message):
3434
print(f'Message Received: {message}')
35+
# Spawn and thread process it -> 1 hr no issues
36+
# return
3537

3638
topic_object = core.get_topic(topic_name=topic_name)
3739
try:
@@ -42,52 +44,52 @@ def process(message):
4244

4345
subscribe(topic, subscription)
4446

45-
azure_client = core.get_storage_client()
46-
47-
container = azure_client.get_container(container_name='gtfspathways')
48-
49-
list_of_files = container.list_files()
50-
if len(list_of_files) > 0:
51-
for single in list_of_files:
52-
print(single.path)
53-
firstFile = list_of_files[0]
54-
# print(firstFile.name+'<><>')
55-
file_content = firstFile.get_body_text()
56-
57-
# Creating a text stream
58-
txt = 'foo\nbar\nbaz'
59-
file_like_io = StringIO(txt)
60-
basename = 'sample-file'
61-
suffix = datetime.datetime.now().strftime("%y%m%d_%H%M%S")
62-
filename = '_'.join([basename, suffix])
63-
try:
64-
test_file = container.create_file(f'{filename}.txt')
65-
except Exception as e:
66-
print(e)
67-
print('Start uploading...')
68-
test_file.upload(file_like_io.read())
69-
print(test_file.get_remote_url())
70-
print('Uploaded Successfully')
71-
72-
logger = core.get_logger()
73-
logger.record_metric(name='test', value='test')
74-
75-
publish_messages(topic)
76-
time.sleep(2)
77-
78-
permission_params = PermissionRequest(
79-
user_id='7961d767-a352-464f-95b6-cd1c5189a93c',
80-
org_id='5e339544-3b12-40a5-8acd-78c66d1fa981',
81-
should_satisfy_all=False,
82-
permissions=['poc']
83-
)
84-
85-
try:
86-
auth = core.get_authorizer()
87-
resp = auth.has_permission(request_params=permission_params)
88-
print(resp)
89-
except Exception as e:
90-
print(f'Request failed with Code: {e.status_code}, Message: {e.message}')
91-
print()
92-
93-
os._exit(os.EX_OK)
47+
# azure_client = core.get_storage_client()
48+
49+
# container = azure_client.get_container(container_name='gtfspathways')
50+
51+
# list_of_files = container.list_files()
52+
# if len(list_of_files) > 0:
53+
# for single in list_of_files:
54+
# print(single.path)
55+
# firstFile = list_of_files[0]
56+
# # print(firstFile.name+'<><>')
57+
# file_content = firstFile.get_body_text()
58+
59+
# # Creating a text stream
60+
# txt = 'foo\nbar\nbaz'
61+
# file_like_io = StringIO(txt)
62+
# basename = 'sample-file'
63+
# suffix = datetime.datetime.now().strftime("%y%m%d_%H%M%S")
64+
# filename = '_'.join([basename, suffix])
65+
# try:
66+
# test_file = container.create_file(f'{filename}.txt')
67+
# except Exception as e:
68+
# print(e)
69+
# print('Start uploading...')
70+
# test_file.upload(file_like_io.read())
71+
# print(test_file.get_remote_url())
72+
# print('Uploaded Successfully')
73+
74+
# logger = core.get_logger()
75+
# logger.record_metric(name='test', value='test')
76+
77+
# publish_messages(topic)
78+
# time.sleep(2)
79+
80+
# permission_params = PermissionRequest(
81+
# user_id='7961d767-a352-464f-95b6-cd1c5189a93c',
82+
# org_id='5e339544-3b12-40a5-8acd-78c66d1fa981',
83+
# should_satisfy_all=False,
84+
# permissions=['poc']
85+
# )
86+
87+
# try:
88+
# auth = core.get_authorizer()
89+
# resp = auth.has_permission(request_params=permission_params)
90+
# print(resp)
91+
# except Exception as e:
92+
# print(f'Request failed with Code: {e.status_code}, Message: {e.message}')
93+
# print()
94+
95+
# os._exit(os.EX_OK)
Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,24 @@
11
from azure.servicebus import ServiceBusClient, ServiceBusMessage
2-
2+
import logging
33

44
class Config:
55
def __init__(self, config=None, topic_name=None):
66
self.topic = topic_name
77
self.provider = config.provider
88
self.connection_string = config.connection_string
99
if self.provider.lower() == 'azure':
10+
# The logging levels below may need to be changed based on the logging that you want to suppress.
11+
uamqp_logger = logging.getLogger('uamqp')
12+
uamqp_logger.setLevel(logging.ERROR)
13+
14+
# or even further fine-grained control, suppressing the warnings in uamqp.connection module
15+
uamqp_connection_logger = logging.getLogger('uamqp.connection')
16+
uamqp_connection_logger.setLevel(logging.ERROR)
1017
self.client = ServiceBusClient.from_connection_string(conn_str=self.connection_string, retry_total=10, retry_backoff_factor=1, retry_backoff_max=30)
1118
self.sender = ServiceBusMessage
19+
# receiver = self.client.get_subscription_receiver('','')
20+
# receiver.receive_messages(1,50)
21+
# with receiver:
22+
# for message in receiver:
23+
# print('')
24+
# self.client.get_subscription_receiver()

src/python_ms_core/core/topic/topic.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,19 @@
77
from ..resource_errors import ExceptionHandler
88
from ..queue.models.queue_message import QueueMessage
99

10+
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s',datefmt='%Y-%m-%d %H:%M:%S')
11+
logger = logging.getLogger('Topic')
12+
logger.setLevel(logging.INFO)
13+
1014

1115
class Callback:
1216
def __init__(self, fn=None):
1317
self._function_to_call = fn
1418

1519
def messages(self, provider, topic, subscription):
1620
with provider.client:
17-
topic_receiver = provider.client.get_subscription_receiver(topic, subscription_name=subscription)
21+
topic_receiver = provider.client.get_subscription_receiver(topic, subscription_name=subscription,max_wait_time=200)
22+
logger.info(f'Started receiver for {subscription}')
1823
with topic_receiver:
1924
for message in topic_receiver:
2025
try:
@@ -23,7 +28,26 @@ def messages(self, provider, topic, subscription):
2328
except Exception as e:
2429
print(f'Error: {e}, Invalid message received: {message}')
2530
finally:
31+
print(f'Completing message')
2632
topic_receiver.complete_message(message)
33+
logger.info(f'Completed gathering messages')
34+
logger.info('Completed topic receiver')
35+
36+
def process_message(self, message:str):
37+
queue_message = QueueMessage.data_from(message)
38+
self._function_to_call(queue_message)
39+
40+
def start_listening(self, provider, topic, subscription):
41+
with provider.client: # service bus client
42+
while True:
43+
logger.info('Going into while')
44+
topic_receiver = provider.client.get_subscription_receiver(topic, subscription_name=subscription) # servicebusclientsubscriptionreceiver
45+
with topic_receiver:
46+
for message in topic_receiver:
47+
self.process_message(message=str(message)) # sync call. [By default 1minute ] -> lock renewal for 300 seconds
48+
topic_receiver.complete_message(message) # fails -> peeklock is timedout
49+
# Change mode from PEEK_LOCK to RECEIVE_AND_DELETE
50+
logger.info('Completed topic receiver')
2751

2852

2953
class Topic(TopicAbstract):
@@ -35,7 +59,7 @@ def __init__(self, config=None, topic_name=None):
3559
def subscribe(self, subscription=None, callback=None):
3660
if subscription is not None:
3761
cb = Callback(callback)
38-
thread = threading.Thread(target=cb.messages, args=(self.provider, self.topic, subscription))
62+
thread = threading.Thread(target=cb.start_listening, args=(self.provider, self.topic, subscription))
3963
thread.start()
4064
time.sleep(5)
4165
else:

0 commit comments

Comments
 (0)