33"""
44
55# pylint: disable=redefined-outer-name
6- import asyncio
7- import contextlib
8- import enum
96import logging
10- import sys
11- import typing
127
138import pytest
149
15- from testsuite .utils import callinfo
16- from testsuite .utils import compat
17- from testsuite .utils import net as net_utils
10+ from testsuite import logcapture
11+ from testsuite .logcapture import __tracebackhide__ # noqa
1812
1913from ..utils import tskv
2014
2418logger = logging .getLogger (__name__ )
2519
2620
27- class BaseError (Exception ):
28- pass
29-
30-
31- class IncorrectUsageError (BaseError ):
32- pass
33-
34-
35- class ClientConnectTimeoutError (BaseError ):
36- pass
37-
38-
39- class LogLevel (enum .Enum ):
40- TRACE = 0
41- DEBUG = 1
42- INFO = 2
43- WARNING = 3
44- ERROR = 4
45- CRITICAL = 5
46- NONE = 6
47-
48- @classmethod
49- def from_string (cls , level : str ) -> 'LogLevel' :
50- return cls [level .upper ()]
51-
52-
53- class CapturedLogs :
54- def __init__ (self , * , log_level : str ) -> None :
55- self ._log_level = LogLevel .from_string (log_level )
56- self ._logs : typing .List [tskv .TskvRow ] = []
57- self ._subscribers : typing .List = []
58- self ._closed = False
59-
60- def close (self ):
61- self ._closed = True
62-
63- async def publish (self , row : tskv .TskvRow ) -> None :
64- self ._logs .append (row )
65- for query , callback in self ._subscribers :
66- if _match_entry (row , query ):
67- await callback (** row )
68-
69- def select (self , ** query ) -> typing .List [tskv .TskvRow ]:
70- if not self ._closed :
71- raise IncorrectUsageError (
72- 'select() is only supported for closed captures\n Please move select() after context manager body' ,
73- )
74- level = query .get ('level' )
75- if level :
76- log_level = LogLevel [level ]
77- if log_level .value < self ._log_level .value :
78- raise IncorrectUsageError (
79- f'Requested log level={ log_level .name } is lower than service log level { self ._log_level .name } ' ,
80- )
81- result = []
82- for row in self ._logs :
83- if _match_entry (row , query ):
84- result .append (row )
85- return result
86-
87- def subscribe (self , ** query ):
88- if self ._closed :
89- raise IncorrectUsageError (
90- 'subscribe() is not supported for closed captures\n Please move subscribe() into context manager body' ,
91- )
92-
93- def decorator (func ):
94- decorated = callinfo .acallqueue (func )
95- self ._subscribers .append ((query , decorated ))
96- return decorated
97-
98- return decorator
99-
100-
101- class CaptureControl :
102- def __init__ (self , * , log_level : str ):
103- self .default_log_level = log_level
104- self ._capture : typing .Optional [CapturedLogs ] = None
105- self ._tasks = []
106- self ._client_cond = asyncio .Condition ()
107-
108- async def wait_for_client (self , timeout : float = 10.0 ):
109- async def waiter ():
110- async with self ._client_cond :
111- await self ._client_cond .wait_for (lambda : self ._tasks )
112-
113- logger .debug ('Waiting for logcapture client to connect...' )
114- try :
115- await asyncio .wait_for (waiter (), timeout = timeout )
116- except TimeoutError :
117- raise ClientConnectTimeoutError (
118- 'Timedout while waiting for logcapture client to connect' ,
119- )
120-
121- @compat .asynccontextmanager
122- async def start_capture (
123- self ,
124- * ,
125- log_level : typing .Optional [str ] = None ,
126- timeout : float = 10.0 ,
127- ):
128- if self ._capture :
129- yield self ._capture
130- return
131-
132- if not log_level :
133- log_level = self .default_log_level
134-
135- self ._capture = CapturedLogs (log_level = log_level )
136- try :
137- yield self ._capture
138- finally :
139- self ._capture .close ()
140- self ._capture = None
141- if self ._tasks :
142- _ , pending = await asyncio .wait (self ._tasks , timeout = timeout )
143- self ._tasks = []
144- if pending :
145- raise RuntimeError (
146- 'Timedout while waiting for capture task to finish' ,
147- )
148-
149- @compat .asynccontextmanager
150- async def start_server (self , * , sock , loop = None ):
151- extra = {}
152- if sys .version_info < (3 , 8 ):
153- if loop is None :
154- loop = asyncio .get_running_loop ()
155- extra ['loop' ] = loop
156- server = await asyncio .start_server (
157- self ._handle_client ,
158- sock = sock ,
159- ** extra ,
160- )
161- try :
162- yield server
163- finally :
164- server .close ()
165- await server .wait_closed ()
166-
167- async def _handle_client (self , reader , writer ):
168- logger .debug ('logcapture client connected' )
169-
170- async def log_reader (capture : CapturedLogs ):
171- with contextlib .closing (writer ):
172- async for line in reader :
173- row = tskv .parse_line (line .decode ('utf-8' ))
174- await capture .publish (row )
175- await writer .wait_closed ()
176-
177- if not self ._capture :
178- writer .close ()
179- await writer .wait_closed ()
180- else :
181- self ._tasks .append (asyncio .create_task (log_reader (self ._capture )))
182- async with self ._client_cond :
183- self ._client_cond .notify_all ()
184-
185-
18621def pytest_addoption (parser ):
18722 group = parser .getgroup ('logs-capture' )
18823 group .addoption (
@@ -199,40 +34,27 @@ def pytest_addoption(parser):
19934
20035
20136@pytest .fixture (scope = 'session' )
202- def userver_log_capture (_userver_capture_control , _userver_capture_server ):
203- return _userver_capture_control
204-
205-
206- @pytest .fixture (scope = 'session' )
207- def _userver_capture_control (userver_log_level ):
208- return CaptureControl (log_level = userver_log_level )
209-
210-
211- @pytest .fixture (scope = 'session' )
212- def _userver_log_capture_socket (pytestconfig ):
37+ async def userver_log_capture (pytestconfig , userver_log_level ):
21338 host = pytestconfig .option .logs_capture_host
21439 port = pytestconfig .option .logs_capture_port
21540 if pytestconfig .option .service_wait or pytestconfig .option .service_disable :
21641 port = port or DEFAULT_PORT
217- with net_utils .bind_socket (host , port ) as socket :
218- yield socket
219-
22042
221- @pytest .fixture (scope = 'session' )
222- async def _userver_capture_server (
223- _userver_capture_control : CaptureControl ,
224- _userver_log_capture_socket ,
225- ):
226- async with _userver_capture_control .start_server (
227- sock = _userver_log_capture_socket ,
228- ) as server :
43+ server = logcapture .CaptureServer (
44+ log_level = logcapture .LogLevel .from_string (userver_log_level ),
45+ parse_line = _tskv_parse_line ,
46+ )
47+ async with server .start (host = host , port = port ):
22948 yield server
23049
23150
23251@pytest .fixture (scope = 'session' )
233- def _userver_config_logs_capture (_userver_log_capture_socket ):
52+ def _userver_config_logs_capture (userver_log_capture ):
53+ socknames = userver_log_capture .getsocknames ()
54+ assert socknames
55+ sockname = socknames [0 ]
56+
23457 def patch_config (config , _config_vars ) -> None :
235- sockname = _userver_log_capture_socket .getsockname ()
23658 logging_config = config ['components_manager' ]['components' ]['logging' ]
23759 default_logger = logging_config ['loggers' ]['default' ]
23860 # Other formats are not yet supported by log-capture.
@@ -245,12 +67,6 @@ def patch_config(config, _config_vars) -> None:
24567 return patch_config
24668
24769
248- def _match_entry (row : tskv .TskvRow , query ) -> bool :
249- for key , value in query .items ():
250- if row .get (key ) != value :
251- return False
252- return True
253-
254-
255- def __tracebackhide__ (excinfo ):
256- return excinfo .errisinstance (BaseError )
70+ def _tskv_parse_line (rawline : bytes ):
71+ line = rawline .decode ('utf-8' )
72+ return tskv .parse_line (line )
0 commit comments