-
-
Notifications
You must be signed in to change notification settings - Fork 59
Expand file tree
/
Copy pathutil.py
More file actions
209 lines (161 loc) · 7 KB
/
util.py
File metadata and controls
209 lines (161 loc) · 7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import logging
import os
import re
import sys
from datetime import datetime, timezone
from logging import Logger, StreamHandler
import json_logging
import inspect
def is_env_var_toggle(var_name):
enable_json_setting = str(os.getenv(var_name)).lower()
_env_toggle = enable_json_setting.lower() in ['true', '1', 'y', 'yes']
return _env_toggle
def get_library_logger(logger_name):
"""
:param logger_name: name
:return: logger
"""
# noinspection PyUnresolvedReferences
if logger_name in logging.Logger.manager.loggerDict:
return logging.getLogger(logger_name)
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
# add stdout output in case parent have no handlers
if len(logger.parent.handlers) == 0:
logger.addHandler(StreamHandler(sys.stdout))
return logger
def update_formatter_for_loggers(loggers_iter, formatter):
"""
:param formatter:
:param loggers_iter:
"""
for logger in loggers_iter:
if not isinstance(logger, Logger):
raise RuntimeError("%s is not a logging.Logger instance", logger)
for handler in logger.handlers:
if not isinstance(handler.formatter, formatter):
handler.formatter = formatter()
# noinspection PyPep8
def parse_int(input_int, default):
# noinspection PyBroadException
try:
integer = int(input_int)
except:
integer = default
return integer
def validate_subclass(subclass, superclass):
"""
:param subclass
:param superclass
:return: bool
"""
if not issubclass(subclass, superclass):
raise RuntimeError(str(subclass) + ' is not a subclass of ' + str(superclass))
return True
_epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
def epoch_nano_second(datetime_):
return int((datetime_ - _epoch).total_seconds()) * 1000000000 + datetime_.microsecond * 1000
def iso_time_format(datetime_):
return '%04d-%02d-%02dT%02d:%02d:%02d.%03dZ' % (
datetime_.year, datetime_.month, datetime_.day, datetime_.hour, datetime_.minute, datetime_.second,
int(datetime_.microsecond / 1000))
if hasattr(sys, '_getframe'):
currentframe = lambda _no_of_go_up_level: sys._getframe(_no_of_go_up_level)
else: # pragma: no cover
# noinspection PyBroadException
def currentframe(_no_of_go_up_level):
"""Return the frame object for the caller's stack frame."""
try:
raise Exception
except Exception:
return sys.exc_info()[_no_of_go_up_level - 1].tb_frame.f_back
class RequestUtil:
"""
util for extract request's information
"""
def __new__(cls, *args, **kw):
# make this request util a singleton object
if not hasattr(cls, '_instance'):
request_info_extractor_class = kw['request_info_extractor_class']
response_info_extractor_class = kw['response_info_extractor_class']
validate_subclass(request_info_extractor_class, json_logging.BaseRequestInfoExtractor)
validate_subclass(response_info_extractor_class, json_logging.BaseResponseInfoExtractor)
cls._instance = object.__new__(cls)
cls._instance.request_info_extractor_class = request_info_extractor_class
cls._instance.response_info_extractor_class = response_info_extractor_class
cls._instance.request_adapter = request_info_extractor_class()
cls._instance.response_adapter = response_info_extractor_class()
cls._instance.is_support_global_request_object = request_info_extractor_class.support_global_request_object()
cls._instance.create_correlation_id_if_not_exists = json_logging.CREATE_CORRELATION_ID_IF_NOT_EXISTS
return cls._instance
def get_correlation_id(self, request=None, within_formatter=False):
"""
Gets the correlation id from the header of the request. \
It tries to search from json_logging.CORRELATION_ID_HEADERS list, one by one.\n
If found no value, new id will be generated by default.\n
:param request: request object
:return: correlation id string
"""
if request is None:
if self.is_support_global_request_object:
request = self.request_info_extractor_class.get_current_request()
else:
request = self.get_request_from_call_stack()
if request is None:
return json_logging.EMPTY_VALUE
correlation_id = self.request_adapter.get_correlation_id_in_request_context(request)
if correlation_id is not None:
return correlation_id
correlation_id = self._get_correlation_id_in_request_header(self.request_adapter, request)
if correlation_id is None and self.create_correlation_id_if_not_exists:
correlation_id = str(json_logging.CORRELATION_ID_GENERATOR())
self.request_adapter.set_correlation_id(request, correlation_id)
return correlation_id if correlation_id else json_logging.EMPTY_VALUE
def get_request_from_call_stack(self, within_formatter=False):
"""
:return: get request object from call stack
"""
"""
python 3 call stack frame
00 get_request_from_call_stack [util.py:225]
01 get_correlation_id [util.py:177]
02 format [__init__.py:333]
03 format [__init__.py:830]
04 emit [__init__.py:980]
05 handle [__init__.py:855]
06 callHandlers [__init__.py:1487]
07 handle [__init__.py:1425]
08 _log [__init__.py:1415]
09 info [__init__.py:1279]
10 logging statement
"""
module = inspect.getmodule(inspect.currentframe().f_back)
class_type = self.request_info_extractor_class.get_request_class_type()
no_of_go_up_level = 11 if within_formatter else 1
# FIXME: find out the depth of logging call stack in Python 2.7
f = currentframe(no_of_go_up_level)
while True:
f_locals = f.f_locals
if 'request' in f_locals:
if isinstance(f_locals['request'], class_type):
return f_locals['request']
if 'req' in f_locals:
if isinstance(f_locals['req'], class_type):
return f_locals['req']
for key in f_locals:
if key not in {'request', 'req'} and isinstance(f_locals[key], class_type):
return f_locals[key]
if f.f_back is not None:
f = f.f_back
else:
break
return None
@staticmethod
def _get_correlation_id_in_request_header(request_adapter, request):
for header in json_logging.CORRELATION_ID_HEADERS:
value = request_adapter.get_http_header(request, header)
if value is not None:
return value
return None
def is_not_match_any_pattern(path, patterns):
return all(map(lambda pattern: re.search(pattern, path) is None, patterns))