-
Notifications
You must be signed in to change notification settings - Fork 66
Expand file tree
/
Copy pathhttp_utils.py
More file actions
172 lines (140 loc) · 5.42 KB
/
http_utils.py
File metadata and controls
172 lines (140 loc) · 5.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from typing import Any, List, Union, Optional
import asyncio
import aiohttp
# Module-level state shared by the session helpers below: the lazily created
# aiohttp session and the lock that guards creating and closing it.
# NOTE: asyncio.Lock serializes coroutines on an event loop; it does not
# synchronize OS threads (the asyncio docs describe it as not thread-safe).
_client_session: Optional[aiohttp.ClientSession] = None
_session_lock: asyncio.Lock = asyncio.Lock()
async def _get_session() -> aiohttp.ClientSession:
    """Return the module-wide ClientSession, building it on first use.

    A replacement session is also built when the previous one has been closed.

    Returns
    -------
    aiohttp.ClientSession
        The shared client session with configured timeout and connection pooling.
    """
    global _client_session

    # Fast path: an open session already exists, so the lock is not needed.
    current = _client_session
    if current is not None and not current.closed:
        return current

    async with _session_lock:
        # Another coroutine may have built the session while this one was
        # waiting for the lock, so look again before creating one.
        if _client_session is None or _client_session.closed:
            # Timeout and connector settings are tuned for localhost IPC.
            _client_session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(
                    total=240,        # 4-minute budget for slow operations
                    sock_connect=10,  # connecting over localhost is fast
                    sock_read=None,   # no separate read limit; total applies
                ),
                connector=aiohttp.TCPConnector(
                    limit=30,                    # cap on pooled connections
                    limit_per_host=30,           # cap per host
                    enable_cleanup_closed=True,  # clean up closed connections
                ),
            )
    return _client_session
async def _handle_request_error() -> None:
    """Drop the shared session after a failed request.

    Closes the current ClientSession (if one is open) and clears the
    module-level reference, so the next ``_get_session()`` call builds a
    fresh session. This handles cases where the remote host process recycles.

    The teardown is delegated to ``_close_session()`` rather than repeated
    here, so there is a single implementation of the close-and-reset logic.
    ``_close_session()`` acquires ``_session_lock`` itself, so the caller
    must not already hold that lock.
    """
    await _close_session()
async def _close_session() -> None:
    """Close the shared ClientSession if one is currently open.

    Does nothing when no session exists or when it is already closed.
    Otherwise the session is closed and the module-level reference is
    cleared, even if closing raises.

    Note: no worker shutdown hook is available to call this at exit;
    process shutdown will clean up all resources automatically.
    """
    global _client_session
    async with _session_lock:
        session = _client_session
        if session is None or session.closed:
            return
        try:
            await session.close()
        finally:
            # Forget the session regardless of whether close() succeeded.
            _client_session = None
async def post_async_request(url: str,
                             data: Any = None,
                             trace_parent: Optional[str] = None,
                             trace_state: Optional[str] = None) -> List[Union[int, Any]]:
    """Post request with the data provided to the url provided.

    Parameters
    ----------
    url: str
        url to make the post to
    data: Any
        object to post; sent as the JSON body of the request
    trace_parent: Optional[str]
        traceparent header to send with the request; omitted when empty
    trace_state: Optional[str]
        tracestate header to send with the request; omitted when empty

    Returns
    -------
    [int, Any]
        Two-element list with the response status code and the JSON-decoded
        response body

    Raises
    ------
    aiohttp.ClientError, asyncio.TimeoutError
        Re-raised after the shared session has been closed and reset, so
        that the next request starts from a fresh session.
    """
    session = await _get_session()

    headers = {}
    if trace_parent:
        headers["traceparent"] = trace_parent
    if trace_state:
        headers["tracestate"] = trace_state

    try:
        async with session.post(url, json=data, headers=headers) as response:
            # We disable aiohttp's input type validation
            # as the server may respond with alternative
            # data encodings. This is potentially unsafe.
            # More here: https://docs.aiohttp.org/en/stable/client_advanced.html
            # The decoded body gets its own name so the ``data`` argument
            # is not rebound.
            response_data = await response.json(content_type=None)
            return [response.status, response_data]
    except (aiohttp.ClientError, asyncio.TimeoutError):
        # On connection errors, close and recreate session for next request
        await _handle_request_error()
        raise
async def get_async_request(url: str) -> List[Any]:
    """Fetch the url with a GET and decode the response body as JSON.

    Parameters
    ----------
    url: str
        url to get the data from

    Returns
    -------
    [int, Any]
        Two-element list with the response status code and the decoded
        body; a body that decodes to None is reported as an empty string

    Raises
    ------
    aiohttp.ClientError, asyncio.TimeoutError
        Re-raised after the shared session has been closed and reset.
    """
    session = await _get_session()
    try:
        async with session.get(url) as resp:
            # content_type=None skips aiohttp's Content-Type validation.
            body = await resp.json(content_type=None)
            status = resp.status
    except (asyncio.TimeoutError, aiohttp.ClientError):
        # Discard the shared session so the next request builds a new one.
        await _handle_request_error()
        raise
    return [status, "" if body is None else body]
async def delete_async_request(url: str) -> List[Union[int, Any]]:
    """Send a DELETE to the url and decode the response body as JSON.

    Parameters
    ----------
    url: str
        url to delete the data from

    Returns
    -------
    [int, Any]
        Two-element list with the response status code and the decoded body

    Raises
    ------
    aiohttp.ClientError, asyncio.TimeoutError
        Re-raised after the shared session has been closed and reset.
    """
    session = await _get_session()
    try:
        async with session.delete(url) as resp:
            # content_type=None skips aiohttp's Content-Type validation.
            body = await resp.json(content_type=None)
            status = resp.status
    except (asyncio.TimeoutError, aiohttp.ClientError):
        # Discard the shared session so the next request builds a new one.
        await _handle_request_error()
        raise
    return [status, body]