Skip to content

Commit ff6ba2c

Browse files
committed
Fixed review comments
1 parent c40338b commit ff6ba2c

8 files changed

Lines changed: 78 additions & 30 deletions

File tree

src/example.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
core = Core(config='Local')
1414
print('Hello')
1515

16-
topic = 'gtfs-flex-upload'
16+
topic = 'gtfspathways'
1717
subscription = 'upload-validation-processor-test'
1818
some_other_sub = 'usdufs'
1919

@@ -46,11 +46,12 @@ def process(message):
4646
container = azure_client.get_container(container_name='gtfspathways')
4747

4848
list_of_files = container.list_files()
49-
for single in list_of_files:
50-
print(single.path)
51-
firstFile = list_of_files[0]
52-
# print(firstFile.name+'<><>')
53-
file_content = firstFile.get_body_text()
49+
if len(list_of_files) > 0:
50+
for single in list_of_files:
51+
print(single.path)
52+
firstFile = list_of_files[0]
53+
# print(firstFile.name+'<><>')
54+
file_content = firstFile.get_body_text()
5455

5556
# Creating a text stream
5657
txt = 'foo\nbar\nbaz'

src/python_ms_core/__init__.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
from .core.logger.logger import Logger
23
from .core.logger.local_logger import LocalLogger
34
from .core.topic.topic import Topic
@@ -7,6 +8,8 @@
78
from .core.config.config import CoreConfig, LocalConfig
89

910
LOCAL_ENV = 'LOCAL'
11+
AZURE_ENV = 'AZURE'
12+
1013

1114
class Core:
1215
def __init__(self, config=None):
@@ -20,22 +23,28 @@ def get_logger(self):
2023
logger_config = self.config.logger()
2124
if logger_config.provider.upper() == LOCAL_ENV:
2225
return LocalLogger(config=logger_config)
23-
else:
26+
elif logger_config.provider.upper() == AZURE_ENV:
2427
return Logger(config=logger_config)
28+
else:
29+
logging.error(f'Unimplemented initialization for core {logger_config.provider}')
2530

2631
def get_topic(self, topic_name: str):
2732
topic_config = self.config.topic()
2833
if topic_config.provider.upper() == LOCAL_ENV:
2934
return LocalTopic(config=topic_config, topic_name=topic_name)
30-
else:
35+
elif topic_config.provider.upper() == AZURE_ENV:
3136
return Topic(config=topic_config, topic_name=topic_name)
37+
else:
38+
logging.error(f'Unimplemented initialization for core {topic_config.provider}')
3239

3340
def get_storage_client(self):
3441
storage_config = self.config.storage()
3542
if storage_config.provider.upper() == LOCAL_ENV:
3643
return LocalStorageClient(storage_config)
37-
else:
44+
elif storage_config.provider.upper() == AZURE_ENV:
3845
return AzureStorageClient(storage_config)
46+
else:
47+
logging.error(f'Unimplemented initialization for core {storage_config.provider}')
3948

4049
def __check_health(self):
4150
print('\x1b[32m ------------------------- \x1b[0m')

src/python_ms_core/core/config/config.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,30 +67,34 @@ def storage(self):
6767

6868
class LocalConfig:
6969
def __init__(self):
70-
self.provider = 'local'
70+
self.provider = os.environ.get('PROVIDER', 'local')
71+
self.queue_connection = os.environ.get('QUEUECONNECTION', 'amqp://guest:guest@localhost:5672')
72+
self.queue_name = os.environ.get('LOGGERQUEUE', 'http://localhost:8100')
73+
self.topic_connection = os.environ.get('QUEUECONNECTION', 'amqp://guest:guest@localhost:5672')
74+
self.storage_connection = os.environ.get('STORAGECONNECTION', 'http://localhost:8100')
7175

7276
def logger(self):
7377
return LogerConfig(
7478
provider=self.provider,
75-
con_string='amqp://guest:guest@rabbitmq:5672/',
76-
queue_name='http://localhost:8100'
79+
con_string=self.queue_connection,
80+
queue_name=self.queue_name
7781
)
7882

7983
def queue(self):
8084
return QueueConfig(
8185
provider=self.provider,
82-
con_string='amqp://guest:guest@rabbitmq:5672/',
83-
queue_name='http://localhost:8100'
86+
con_string=self.queue_connection,
87+
queue_name=self.queue_name
8488
)
8589

8690
def topic(self):
8791
return TopicConfig(
8892
provider=self.provider,
89-
con_string='amqp://guest:guest@localhost:5672'
93+
con_string=self.topic_connection
9094
)
9195

9296
def storage(self):
9397
return StorageConfig(
9498
provider=self.provider,
95-
con_string='http://localhost:8100'
99+
con_string=self.storage_connection
96100
)
Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,49 @@
11
from .abstracts.logger_abstract import LoggerAbstract
22
from ..queue.models.queue_message import QueueMessage
3-
from ..queue.queue import Queue
3+
from ..queue.local_queue import LocalQueue
44

55

66
class LocalLogger(LoggerAbstract):
77

88
def __init__(self, config=None):
99
super().__init__(config=config)
10-
self.queue_client = Queue(config)
10+
self.queue_client = LocalQueue(config)
1111

1212
def add_request(self, request_data):
1313
message = QueueMessage.data_from({
1414
'message': 'Add Request',
1515
'messageType': 'addRequest',
1616
'data': request_data
1717
})
18-
self.queue_client.send_local(message)
18+
self.queue_client.send(message)
1919

2020
def info(self, message: str):
2121
msg = QueueMessage.data_from({
2222
'message': message,
2323
'messageType': 'info',
2424
})
25-
self.queue_client.send_local(msg)
25+
self.queue_client.send(msg)
2626

2727
def debug(self, message: str):
2828
msg = QueueMessage.data_from({
2929
'message': message,
3030
'messageType': 'debug',
3131
})
32-
self.queue_client.send_local(msg)
32+
self.queue_client.send(msg)
3333

3434
def warn(self, message: str):
3535
msg = QueueMessage.data_from({
3636
'message': message,
3737
'messageType': 'warn',
3838
})
39-
self.queue_client.send_local(msg)
39+
self.queue_client.send(msg)
4040

4141
def error(self, message: str):
4242
msg = QueueMessage.data_from({
4343
'message': message,
4444
'messageType': 'error',
4545
})
46-
self.queue_client.send_local(msg)
46+
self.queue_client.send(msg)
4747

4848
def record_metric(self, name: str, value: str):
4949
message = QueueMessage.data_from({
@@ -54,4 +54,4 @@ def record_metric(self, name: str, value: str):
5454
'value': value
5555
}
5656
})
57-
self.queue_client.send_local(message)
57+
self.queue_client.send(message)

src/python_ms_core/core/queue/abstracts/queue_abstract.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ def __init__(self, config): pass
88
@abstractmethod
99
def send(self, data=None): pass
1010

11-
@abstractmethod
12-
def send_local(self, data=None): pass
13-
1411
@abstractmethod
1512
def add(self, data): pass
1613

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import json
2+
import requests
3+
from .config.queue_config import Config
4+
from .models.queue_message import QueueMessage
5+
from .abstracts.queue_abstract import QueueAbstract
6+
from ..resource_errors import ExceptionHandler
7+
8+
9+
class LocalQueue(QueueAbstract):
10+
queue = list()
11+
12+
def __init__(self, config):
13+
self.provider = Config(config=config)
14+
15+
@ExceptionHandler.decorated
16+
def send(self, data=None):
17+
if data:
18+
message = QueueMessage.to_dict(data)
19+
url = f'{self.provider.queue_name}/log'
20+
try:
21+
resp = requests.post(url, json=message)
22+
print(resp.status_code)
23+
except Exception as e:
24+
print(e)
25+
print(message)
26+
27+
self.queue = list()
28+
29+
def add(self, data):
30+
if data is not None:
31+
self.queue.insert(0, json.dumps(data))
32+
33+
def remove(self):
34+
if len(self.queue) > 0:
35+
self.queue.pop()
36+
37+
def get_items(self):
38+
return self.queue

src/python_ms_core/core/storage/providers/local/local_file_entity.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def upload(self, upload_stream):
2929
upload_path = f'{self.path}/{self.name}'
3030
upload_relative_path = f'{self.config.connection_string}{self.upload_path}{upload_path}'
3131
requests.post(upload_relative_path, files={'uploadFile': upload_stream})
32-
self._get_remote_url = f'{upload_relative_path}/uploadFile'
32+
self._get_remote_url = upload_relative_path
3333

3434
@ExceptionHandler.decorated
3535
def get_remote_url(self):

src/python_ms_core/core/topic/local_topic.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import json
2-
31
import logging
42
import threading
53
import time
@@ -28,6 +26,7 @@ def __init__(self, config=None, topic_name=None):
2826
self.topic = topic_name
2927
self.config = config
3028
self.client = Config(config=config, topic_name=topic_name)
29+
print(self.client.connection_string)
3130
params = pika.URLParameters(self.client.connection_string)
3231
self.connection = pika.BlockingConnection(params)
3332
self.channel = self.connection.channel()

0 commit comments

Comments
 (0)