-
Notifications
You must be signed in to change notification settings - Fork 90
Expand file tree
/
Copy pathbatcher.py
More file actions
119 lines (96 loc) · 4.29 KB
/
batcher.py
File metadata and controls
119 lines (96 loc) · 4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# Copyright (c) 2023 EPAM Systems
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
"""This module contains a helper class to automate packaging separate Log entries into log batches."""
import logging
import threading
from typing import Any, Generic, Optional, TypeVar
from reportportal_client.core.rp_requests import AsyncRPRequestLog, RPRequestLog
from reportportal_client.logs import MAX_LOG_BATCH_PAYLOAD_SIZE, MAX_LOG_BATCH_SIZE
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Covariant type variable for the log request type stored by LogBatcher; bound to RPRequestLog.
T_co = TypeVar("T_co", bound="RPRequestLog", covariant=True)
class LogBatcher(Generic[T_co]):
    """Log packaging class to automate compiling separate Log entries into log batches.

    The class accepts the maximum number of log entries in desired batches and maximum batch size to conform
    with maximum request size limits, configured on servers. The class implementation is thread-safe.
    """

    entry_num: int
    payload_limit: int
    _lock: threading.Lock
    _batch: list[T_co]
    _payload_size: int

    def __init__(
        self,
        entry_num: int = MAX_LOG_BATCH_SIZE,
        payload_limit: int = MAX_LOG_BATCH_PAYLOAD_SIZE,
    ) -> None:
        """Initialize the batcher instance with empty batch and specific limits.

        :param entry_num: maximum numbers of entries in a Log batch
        :param payload_limit: maximum batch size in bytes
        """
        self.entry_num = entry_num
        self.payload_limit = payload_limit
        self._lock = threading.Lock()
        self._batch = []
        self._payload_size = 0

    def _append(self, size: int, log_req: RPRequestLog) -> Optional[list[RPRequestLog]]:
        """Add a log request of a known size to the internal batch and return the batch if a limit is hit.

        :param size: payload size of the log request, compared against the payload limit
        :param log_req: log request object
        :return: a batch or None
        """
        with self._lock:
            # The new entry would reach the payload limit: hand over what has been collected so far and
            # start the next batch with this entry. An entry arriving at an empty batch is always
            # accepted, even if it reaches the limit on its own, so it is never dropped.
            if self._payload_size + size >= self.payload_limit and self._batch:
                batch = self._batch
                self._batch = [log_req]
                self._payload_size = size
                return batch

            self._batch.append(log_req)
            self._payload_size += size
            if len(self._batch) < self.entry_num:
                return None

            # Entry count limit reached: return the full batch and start over with an empty one.
            batch = self._batch
            self._batch = []
            self._payload_size = 0
            return batch

    def append(self, log_req: RPRequestLog) -> Optional[list[RPRequestLog]]:
        """Add a log request object to internal batch and return the batch if it's full.

        :param log_req: log request object
        :return: a batch or None
        """
        return self._append(log_req.multipart_size, log_req)

    async def append_async(self, log_req: AsyncRPRequestLog) -> Optional[list[AsyncRPRequestLog]]:
        """Add a log request object to internal batch and return the batch if it's full.

        :param log_req: log request object
        :return: a batch or None
        """
        # The size of an async log request is awaited before the lock is taken inside _append.
        return self._append(await log_req.multipart_size, log_req)

    def flush(self) -> Optional[list[T_co]]:
        """Immediately return everything that is left in the internal batch.

        :return: a batch or None
        """
        # Cheap check without the lock to skip locking when there is nothing to flush.
        if not self._batch:
            return None
        with self._lock:
            # Check again: another thread may have emptied the batch before the lock was acquired.
            if not self._batch:
                return None
            batch = self._batch
            self._batch = []
            self._payload_size = 0
            return batch

    def __getstate__(self) -> dict[str, Any]:
        """Control object pickling and return object fields as Dictionary.

        :return: object state dictionary
        :rtype: dict
        """
        state = self.__dict__.copy()
        # Don't pickle the '_lock' field: lock objects cannot be pickled, __setstate__ creates a new one
        state.pop("_lock", None)
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Control object pickling, receives object state as Dictionary.

        :param dict state: object state dictionary
        """
        self.__dict__.update(state)
        # The lock is excluded from the pickled state, so every restored instance gets its own lock
        self._lock = threading.Lock()