Skip to content

Commit d9cf4b8

Browse files
Copilot and cgillum authored
Fix connection contention by reusing aiohttp ClientSession (#592)
* Implement ClientSession reuse with proper lifecycle management --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: cgillum <2704139+cgillum@users.noreply.github.com> Co-authored-by: Chris Gillum <cgillum@microsoft.com> Co-authored-by: Chris Gillum <cgillum@gmail.com>
1 parent fd9d26b commit d9cf4b8

2 files changed

Lines changed: 388 additions & 9 deletions

File tree

azure/durable_functions/models/utils/http_utils.py

Lines changed: 101 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,82 @@
1-
from typing import Any, List, Union
1+
from typing import Any, List, Union, Optional
2+
import asyncio
23

34
import aiohttp
45

56

7+
# Shared session and the asyncio lock that serializes its creation and teardown
# across coroutines (asyncio.Lock is not thread-safe)
8+
_client_session: Optional[aiohttp.ClientSession] = None
9+
_session_lock: asyncio.Lock = asyncio.Lock()
10+
11+
12+
async def _get_session() -> aiohttp.ClientSession:
    """Return the module-wide ClientSession, building it when needed.

    A session is created on first use and again whenever the cached one
    is found to be closed.

    Returns
    -------
    aiohttp.ClientSession
        The shared client session, configured with a 240-second total
        timeout and a connection pool capped at 30 connections.
    """
    global _client_session

    cached = _client_session
    if cached is not None and not cached.closed:
        # Fast path: a live session is already cached, so skip the lock.
        return cached

    async with _session_lock:
        # Another coroutine may have created the session while this one
        # was waiting for the lock, so test again before building one.
        if _client_session is None or _client_session.closed:
            _client_session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(
                    total=240,        # whole request, connect through body read
                    sock_connect=10,  # opening a new socket to the peer
                    sock_read=None,   # no per-read limit; `total` still applies
                ),
                connector=aiohttp.TCPConnector(
                    limit=30,                    # simultaneous connections, whole pool
                    limit_per_host=30,           # simultaneous connections per endpoint
                    enable_cleanup_closed=True,  # abort transports that never finish closing
                ),
            )

    return _client_session
47+
48+
49+
async def _handle_request_error():
    """Discard the shared session after a failed request.

    Delegates to ``_close_session`` so the lock-protected teardown lives
    in one place instead of being duplicated here. Once the session has
    been closed and cleared, the next ``_get_session`` call builds a new
    one; this handles cases where the remote host process recycles.
    """
    await _close_session()
61+
62+
63+
async def _close_session() -> None:
    """Close the shared ClientSession and clear the module-level reference.

    Does nothing when no session is cached or the cached one is already
    closed. The reference is cleared even if closing raises.

    Note: there is no worker shutdown hook available to call this at
    exit; process shutdown cleans up any remaining resources.
    """
    global _client_session

    async with _session_lock:
        session = _client_session
        if session is None or session.closed:
            return

        try:
            await session.close()
        finally:
            # Drop the reference whether or not close() succeeded.
            _client_session = None
78+
79+
680
async def post_async_request(url: str,
781
data: Any = None,
882
trace_parent: str = None,
@@ -25,19 +99,25 @@ async def post_async_request(url: str,
2599
[int, Any]
26100
Tuple with the Response status code and the data returned from the request
27101
"""
28-
async with aiohttp.ClientSession() as session:
29-
headers = {}
30-
if trace_parent:
31-
headers["traceparent"] = trace_parent
32-
if trace_state:
33-
headers["tracestate"] = trace_state
102+
session = await _get_session()
103+
headers = {}
104+
if trace_parent:
105+
headers["traceparent"] = trace_parent
106+
if trace_state:
107+
headers["tracestate"] = trace_state
108+
109+
try:
34110
async with session.post(url, json=data, headers=headers) as response:
35111
# We disable aiohttp's input type validation
36112
# as the server may respond with alternative
37113
# data encodings. This is potentially unsafe.
38114
# More here: https://docs.aiohttp.org/en/stable/client_advanced.html
39115
data = await response.json(content_type=None)
40116
return [response.status, data]
117+
except (aiohttp.ClientError, asyncio.TimeoutError):
118+
# On connection errors, close and recreate session for next request
119+
await _handle_request_error()
120+
raise
41121

42122

43123
async def get_async_request(url: str) -> List[Any]:
@@ -53,12 +133,18 @@ async def get_async_request(url: str) -> List[Any]:
53133
[int, Any]
54134
Tuple with the Response status code and the data returned from the request
55135
"""
56-
async with aiohttp.ClientSession() as session:
136+
session = await _get_session()
137+
138+
try:
57139
async with session.get(url) as response:
58140
data = await response.json(content_type=None)
59141
if data is None:
60142
data = ""
61143
return [response.status, data]
144+
except (aiohttp.ClientError, asyncio.TimeoutError):
145+
# On connection errors, close and recreate session for next request
146+
await _handle_request_error()
147+
raise
62148

63149

64150
async def delete_async_request(url: str) -> List[Union[int, Any]]:
@@ -74,7 +160,13 @@ async def delete_async_request(url: str) -> List[Union[int, Any]]:
74160
[int, Any]
75161
Tuple with the Response status code and the data returned from the request
76162
"""
77-
async with aiohttp.ClientSession() as session:
163+
session = await _get_session()
164+
165+
try:
78166
async with session.delete(url) as response:
79167
data = await response.json(content_type=None)
80168
return [response.status, data]
169+
except (aiohttp.ClientError, asyncio.TimeoutError):
170+
# On connection errors, close and recreate session for next request
171+
await _handle_request_error()
172+
raise

0 commit comments

Comments
 (0)