Skip to content

Commit 93c4658

Browse files
committed
scope pooled clients per event loop
1 parent 82fefc6 commit 93c4658

1 file changed

Lines changed: 71 additions & 25 deletions

File tree

src/bubble_data_api_client/pool.py

Lines changed: 71 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import atexit
55
import threading
6+
import weakref
67
from collections.abc import AsyncIterator
78
from contextlib import asynccontextmanager
89

@@ -12,47 +13,86 @@
1213
from bubble_data_api_client.exceptions import ConfigurationError
1314
from bubble_data_api_client.http_client import httpx_client_factory
1415

15-
# global client pool keyed by config
16-
_clients: dict[tuple[str, str], httpx.AsyncClient] = {}
16+
# type aliases
17+
_ConfigKey = tuple[str, str]
18+
_LoopClientMap = weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, httpx.AsyncClient]
19+
20+
# global client pool: config_key → { loop → client }
21+
# WeakKeyDictionary auto-removes entries when the loop is garbage collected
22+
_clients: dict[_ConfigKey, _LoopClientMap] = {}
1723
_lock = threading.Lock()
1824

1925

20-
def _make_client_key(config: BubbleConfig) -> tuple[str, str]:
26+
def _make_client_key(config: BubbleConfig) -> _ConfigKey:
2127
"""Generate a unique key for client pooling based on config."""
2228
return (config["data_api_root_url"], config["api_key"])
2329

2430

31+
def _create_client_from_config(config: BubbleConfig) -> httpx.AsyncClient:
32+
"""Create a new httpx client from config."""
33+
base_url = config["data_api_root_url"]
34+
if not base_url:
35+
raise ConfigurationError("data_api_root_url")
36+
api_key = config["api_key"]
37+
if not api_key:
38+
raise ConfigurationError("api_key")
39+
return httpx_client_factory(base_url=base_url, api_key=api_key)
40+
41+
2542
def get_client() -> httpx.AsyncClient:
26-
"""Get or create a client for the current config. Thread-safe."""
43+
"""Get or create a client for the current config and event loop. Thread-safe.
44+
45+
Each (config, event_loop) pair gets its own client. When an event loop is
46+
garbage collected, its associated clients are automatically removed.
47+
48+
If called outside an async context (no running loop), returns a fresh
49+
uncached client as a fallback.
50+
"""
2751
config = get_config()
28-
key = _make_client_key(config)
2952

30-
# fast path: no lock if client exists
31-
if key in _clients:
32-
return _clients[key]
53+
try:
54+
current_loop = asyncio.get_running_loop()
55+
except RuntimeError:
56+
# no running loop - return uncached client
57+
# it will bind to whatever loop it's eventually used in
58+
return _create_client_from_config(config)
59+
60+
key = _make_client_key(config)
3361

34-
# slow path: acquire lock for creation
3562
with _lock:
36-
# double-check after acquiring lock
3763
if key not in _clients:
38-
base_url = config["data_api_root_url"]
39-
if not base_url:
40-
raise ConfigurationError("data_api_root_url")
41-
api_key = config["api_key"]
42-
if not api_key:
43-
raise ConfigurationError("api_key")
44-
_clients[key] = httpx_client_factory(base_url=base_url, api_key=api_key)
45-
return _clients[key]
64+
_clients[key] = weakref.WeakKeyDictionary()
65+
66+
loop_clients = _clients[key]
67+
68+
# return cached client if it exists and is still open
69+
if current_loop in loop_clients and not loop_clients[current_loop].is_closed:
70+
return loop_clients[current_loop]
71+
72+
# create new client (either missing or was closed externally)
73+
loop_clients[current_loop] = _create_client_from_config(config)
74+
return loop_clients[current_loop]
4675

4776

4877
async def close_clients() -> None:
49-
"""Close all clients in the pool. Thread-safe. Safe to call multiple times."""
78+
"""Close all clients for the current event loop. Thread-safe. Safe to call multiple times.
79+
80+
Only closes clients bound to the calling loop. Clients for other loops are
81+
left alone (they should be closed by their respective loops, or will be
82+
garbage collected when those loops die).
83+
"""
84+
current_loop = asyncio.get_running_loop()
85+
5086
with _lock:
51-
clients_to_close = list(_clients.values())
52-
_clients.clear()
87+
clients_to_close: list[httpx.AsyncClient] = [
88+
loop_clients.pop(current_loop) for loop_clients in _clients.values() if current_loop in loop_clients
89+
]
5390

5491
for client in clients_to_close:
55-
await client.aclose()
92+
try:
93+
await client.aclose()
94+
except Exception:
95+
pass # best-effort cleanup, continue with remaining clients
5696

5797

5898
@asynccontextmanager
@@ -65,9 +105,12 @@ async def client_scope() -> AsyncIterator[None]:
65105

66106

67107
def _atexit_cleanup() -> None:
68-
"""Best-effort cleanup at interpreter exit."""
108+
"""Best-effort cleanup of all clients at interpreter exit."""
69109
with _lock:
70-
clients_to_close = list(_clients.values())
110+
# collect all clients from all config/loop combinations
111+
clients_to_close: list[httpx.AsyncClient] = []
112+
for loop_clients in _clients.values():
113+
clients_to_close.extend(loop_clients.values())
71114
_clients.clear()
72115

73116
if not clients_to_close:
@@ -92,7 +135,10 @@ def _atexit_cleanup() -> None:
92135
# no running loop, create one and close all clients
93136
async def _close_all() -> None:
94137
for client in clients_to_close:
95-
await client.aclose()
138+
try:
139+
await client.aclose()
140+
except Exception:
141+
pass
96142

97143
try:
98144
asyncio.run(_close_all())

0 commit comments

Comments
 (0)