Skip to content

Commit d947cac

Browse files
committed
ws client hardening
1 parent b89b0da commit d947cac

7 files changed

Lines changed: 137 additions & 9 deletions

File tree

README.md

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ A comprehensive Python package to interact with the Mist Cloud APIs, built from
3232
- [Examples](#examples)
3333
- [WebSocket Streaming](#websocket-streaming)
3434
- [Connection Parameters](#connection-parameters)
35-
- [Callbacks](#callbacks)
35+
- [Methods](#methods)
3636
- [Available Channels](#available-channels)
3737
- [Usage Patterns](#usage-patterns)
3838
- [Async Usage](#async-usage)
@@ -588,6 +588,7 @@ All channel classes accept the following optional keyword arguments:
588588
| `auto_reconnect` | `bool` | `False` | Automatically reconnect on transient failures using exponential backoff. |
589589
| `max_reconnect_attempts` | `int` | `5` | Maximum number of reconnect attempts before giving up. |
590590
| `reconnect_backoff` | `float` | `2.0` | Base backoff delay in seconds. Doubles after each failed attempt (2s, 4s, 8s, ...). Resets on successful reconnection. |
591+
| `queue_maxsize` | `int` | `0` | Maximum messages buffered in the internal queue for `receive()`. `0` means unbounded. When set, the receive thread blocks if the queue is full, providing backpressure for high-frequency streams. |
591592

592593
```python
593594
ws = mistapi.websockets.sites.DeviceStatsEvents(
@@ -600,15 +601,18 @@ ws = mistapi.websockets.sites.DeviceStatsEvents(
600601
ws.connect()
601602
```
602603

603-
### Callbacks
604+
### Methods
604605

605606
| Method | Signature | Description |
606607
|--------|-----------|-------------|
607-
| `ws.on_open(cb)` | `cb()` | Called when the connection is established |
608-
| `ws.on_message(cb)` | `cb(data: dict)` | Called for every incoming message |
609-
| `ws.on_error(cb)` | `cb(error: Exception)` | Called on WebSocket errors |
610-
| `ws.on_close(cb)` | `cb(status_code: int, msg: str)` | Called when the connection closes |
611-
| `ws.ready()` | `-> bool \| None` | Returns `True` if the connection is open and ready |
608+
| `ws.on_open(cb)` | `cb()` | Register callback for connection established |
609+
| `ws.on_message(cb)` | `cb(data: dict)` | Register callback for incoming messages. Mutually exclusive with `receive()`. |
610+
| `ws.on_error(cb)` | `cb(error: Exception)` | Register callback for WebSocket errors |
611+
| `ws.on_close(cb)` | `cb(code: int, msg: str)` | Register callback for connection close. Safe to call `connect()` from within. |
612+
| `ws.connect(run_in_background)` | | Open the connection. `True` (default) runs in a daemon thread; `False` blocks. |
613+
| `ws.disconnect(wait, timeout)` | | Close the connection. `wait=True` blocks until the background thread finishes. |
614+
| `ws.receive()` | `-> Generator[dict]` | Blocking generator yielding messages. Mutually exclusive with `on_message`. |
615+
| `ws.ready()` | `-> bool` | Returns `True` if the connection is open and ready |
612616

613617
### Available Channels
614618

src/mistapi/websockets/__ws_client.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"""
1414

1515
import json
16+
import logging
1617
import queue
1718
import ssl
1819
import threading
@@ -23,6 +24,10 @@
2324

2425
from mistapi.__logger import logger
2526

27+
# Prevent websocket-client from logging HTTP headers (which contain API
28+
# tokens) at DEBUG level.
29+
logging.getLogger("websocket").setLevel(logging.WARNING)
30+
2631
if TYPE_CHECKING:
2732
from mistapi import APISession
2833

@@ -48,6 +53,7 @@ def __init__(
4853
auto_reconnect: bool = False,
4954
max_reconnect_attempts: int = 5,
5055
reconnect_backoff: float = 2.0,
56+
queue_maxsize: int = 0,
5157
) -> None:
5258
if max_reconnect_attempts < 0:
5359
raise ValueError("max_reconnect_attempts must be >= 0")
@@ -64,7 +70,7 @@ def __init__(
6470
self._lock = threading.Lock()
6571
self._ws: websocket.WebSocketApp | None = None
6672
self._thread: threading.Thread | None = None
67-
self._queue: queue.Queue[dict | None] = queue.Queue()
73+
self._queue: queue.Queue[dict | None] = queue.Queue(maxsize=queue_maxsize)
6874
self._connected = (
6975
threading.Event()
7076
) # tracks whether the WebSocket connection is currently open

src/mistapi/websockets/location.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,11 @@ class BleAssetsEvents(_MistWebsocket):
3939
Maximum number of reconnect attempts before giving up.
4040
reconnect_backoff : float, default 2.0
4141
Base backoff delay in seconds. Doubles after each failed attempt.
42-
42+
queue_maxsize : int, default 0
43+
Maximum number of messages buffered in the internal queue for the
44+
``receive()`` generator. ``0`` means unbounded. When set, the
45+
websocket-client receive thread blocks if the queue is full,
46+
providing backpressure for high-frequency streams.
4347
4448
EXAMPLE
4549
-----------
@@ -76,6 +80,7 @@ def __init__(
7680
auto_reconnect: bool = False,
7781
max_reconnect_attempts: int = 5,
7882
reconnect_backoff: float = 2.0,
83+
queue_maxsize: int = 0,
7984
) -> None:
8085
channels = [f"/sites/{site_id}/stats/maps/{mid}/assets" for mid in map_ids]
8186
super().__init__(
@@ -86,6 +91,7 @@ def __init__(
8691
auto_reconnect=auto_reconnect,
8792
max_reconnect_attempts=max_reconnect_attempts,
8893
reconnect_backoff=reconnect_backoff,
94+
queue_maxsize=queue_maxsize,
8995
)
9096

9197

@@ -113,6 +119,11 @@ class ConnectedClientsEvents(_MistWebsocket):
113119
Maximum number of reconnect attempts before giving up.
114120
reconnect_backoff : float, default 2.0
115121
Base backoff delay in seconds. Doubles after each failed attempt.
122+
queue_maxsize : int, default 0
123+
Maximum number of messages buffered in the internal queue for the
124+
``receive()`` generator. ``0`` means unbounded. When set, the
125+
websocket-client receive thread blocks if the queue is full,
126+
providing backpressure for high-frequency streams.
116127
117128
EXAMPLE
118129
-----------
@@ -149,6 +160,7 @@ def __init__(
149160
auto_reconnect: bool = False,
150161
max_reconnect_attempts: int = 5,
151162
reconnect_backoff: float = 2.0,
163+
queue_maxsize: int = 0,
152164
) -> None:
153165
channels = [f"/sites/{site_id}/stats/maps/{mid}/clients" for mid in map_ids]
154166
super().__init__(
@@ -159,6 +171,7 @@ def __init__(
159171
auto_reconnect=auto_reconnect,
160172
max_reconnect_attempts=max_reconnect_attempts,
161173
reconnect_backoff=reconnect_backoff,
174+
queue_maxsize=queue_maxsize,
162175
)
163176

164177

@@ -186,6 +199,11 @@ class SdkClientsEvents(_MistWebsocket):
186199
Maximum number of reconnect attempts before giving up.
187200
reconnect_backoff : float, default 2.0
188201
Base backoff delay in seconds. Doubles after each failed attempt.
202+
queue_maxsize : int, default 0
203+
Maximum number of messages buffered in the internal queue for the
204+
``receive()`` generator. ``0`` means unbounded. When set, the
205+
websocket-client receive thread blocks if the queue is full,
206+
providing backpressure for high-frequency streams.
189207
190208
EXAMPLE
191209
-----------
@@ -222,6 +240,7 @@ def __init__(
222240
auto_reconnect: bool = False,
223241
max_reconnect_attempts: int = 5,
224242
reconnect_backoff: float = 2.0,
243+
queue_maxsize: int = 0,
225244
) -> None:
226245
channels = [f"/sites/{site_id}/stats/maps/{mid}/sdkclients" for mid in map_ids]
227246
super().__init__(
@@ -232,6 +251,7 @@ def __init__(
232251
auto_reconnect=auto_reconnect,
233252
max_reconnect_attempts=max_reconnect_attempts,
234253
reconnect_backoff=reconnect_backoff,
254+
queue_maxsize=queue_maxsize,
235255
)
236256

237257

@@ -259,6 +279,11 @@ class UnconnectedClientsEvents(_MistWebsocket):
259279
Maximum number of reconnect attempts before giving up.
260280
reconnect_backoff : float, default 2.0
261281
Base backoff delay in seconds. Doubles after each failed attempt.
282+
queue_maxsize : int, default 0
283+
Maximum number of messages buffered in the internal queue for the
284+
``receive()`` generator. ``0`` means unbounded. When set, the
285+
websocket-client receive thread blocks if the queue is full,
286+
providing backpressure for high-frequency streams.
262287
263288
EXAMPLE
264289
-----------
@@ -295,6 +320,7 @@ def __init__(
295320
auto_reconnect: bool = False,
296321
max_reconnect_attempts: int = 5,
297322
reconnect_backoff: float = 2.0,
323+
queue_maxsize: int = 0,
298324
) -> None:
299325
channels = [
300326
f"/sites/{site_id}/stats/maps/{mid}/unconnected_clients" for mid in map_ids
@@ -307,6 +333,7 @@ def __init__(
307333
auto_reconnect=auto_reconnect,
308334
max_reconnect_attempts=max_reconnect_attempts,
309335
reconnect_backoff=reconnect_backoff,
336+
queue_maxsize=queue_maxsize,
310337
)
311338

312339

@@ -334,6 +361,11 @@ class DiscoveredBleAssetsEvents(_MistWebsocket):
334361
Maximum number of reconnect attempts before giving up.
335362
reconnect_backoff : float, default 2.0
336363
Base backoff delay in seconds. Doubles after each failed attempt.
364+
queue_maxsize : int, default 0
365+
Maximum number of messages buffered in the internal queue for the
366+
``receive()`` generator. ``0`` means unbounded. When set, the
367+
websocket-client receive thread blocks if the queue is full,
368+
providing backpressure for high-frequency streams.
337369
338370
EXAMPLE
339371
-----------
@@ -370,6 +402,7 @@ def __init__(
370402
auto_reconnect: bool = False,
371403
max_reconnect_attempts: int = 5,
372404
reconnect_backoff: float = 2.0,
405+
queue_maxsize: int = 0,
373406
) -> None:
374407
channels = [
375408
f"/sites/{site_id}/stats/maps/{mid}/discovered_assets" for mid in map_ids
@@ -382,4 +415,5 @@ def __init__(
382415
auto_reconnect=auto_reconnect,
383416
max_reconnect_attempts=max_reconnect_attempts,
384417
reconnect_backoff=reconnect_backoff,
418+
queue_maxsize=queue_maxsize,
385419
)

src/mistapi/websockets/orgs.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ class InsightsEvents(_MistWebsocket):
3737
Maximum number of reconnect attempts before giving up.
3838
reconnect_backoff : float, default 2.0
3939
Base backoff delay in seconds. Doubles after each failed attempt.
40+
queue_maxsize : int, default 0
41+
Maximum number of messages buffered in the internal queue for the
42+
``receive()`` generator. ``0`` means unbounded. When set, the
43+
websocket-client receive thread blocks if the queue is full,
44+
providing backpressure for high-frequency streams.
4045
4146
EXAMPLE
4247
-----------
@@ -72,6 +77,7 @@ def __init__(
7277
auto_reconnect: bool = False,
7378
max_reconnect_attempts: int = 5,
7479
reconnect_backoff: float = 2.0,
80+
queue_maxsize: int = 0,
7581
) -> None:
7682
super().__init__(
7783
mist_session,
@@ -81,6 +87,7 @@ def __init__(
8187
auto_reconnect=auto_reconnect,
8288
max_reconnect_attempts=max_reconnect_attempts,
8389
reconnect_backoff=reconnect_backoff,
90+
queue_maxsize=queue_maxsize,
8491
)
8592

8693

@@ -106,6 +113,11 @@ class MxEdgesStatsEvents(_MistWebsocket):
106113
Maximum number of reconnect attempts before giving up.
107114
reconnect_backoff : float, default 2.0
108115
Base backoff delay in seconds. Doubles after each failed attempt.
116+
queue_maxsize : int, default 0
117+
Maximum number of messages buffered in the internal queue for the
118+
``receive()`` generator. ``0`` means unbounded. When set, the
119+
websocket-client receive thread blocks if the queue is full,
120+
providing backpressure for high-frequency streams.
109121
110122
EXAMPLE
111123
-----------
@@ -141,6 +153,7 @@ def __init__(
141153
auto_reconnect: bool = False,
142154
max_reconnect_attempts: int = 5,
143155
reconnect_backoff: float = 2.0,
156+
queue_maxsize: int = 0,
144157
) -> None:
145158
super().__init__(
146159
mist_session,
@@ -150,6 +163,7 @@ def __init__(
150163
auto_reconnect=auto_reconnect,
151164
max_reconnect_attempts=max_reconnect_attempts,
152165
reconnect_backoff=reconnect_backoff,
166+
queue_maxsize=queue_maxsize,
153167
)
154168

155169

@@ -175,6 +189,11 @@ class MxEdgesEvents(_MistWebsocket):
175189
Maximum number of reconnect attempts before giving up.
176190
reconnect_backoff : float, default 2.0
177191
Base backoff delay in seconds. Doubles after each failed attempt.
192+
queue_maxsize : int, default 0
193+
Maximum number of messages buffered in the internal queue for the
194+
``receive()`` generator. ``0`` means unbounded. When set, the
195+
websocket-client receive thread blocks if the queue is full,
196+
providing backpressure for high-frequency streams.
178197
179198
EXAMPLE
180199
-----------
@@ -210,6 +229,7 @@ def __init__(
210229
auto_reconnect: bool = False,
211230
max_reconnect_attempts: int = 5,
212231
reconnect_backoff: float = 2.0,
232+
queue_maxsize: int = 0,
213233
) -> None:
214234
super().__init__(
215235
mist_session,
@@ -219,4 +239,5 @@ def __init__(
219239
auto_reconnect=auto_reconnect,
220240
max_reconnect_attempts=max_reconnect_attempts,
221241
reconnect_backoff=reconnect_backoff,
242+
queue_maxsize=queue_maxsize,
222243
)

src/mistapi/websockets/session.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ class SessionWithUrl(_MistWebsocket):
4444
Maximum number of reconnect attempts before giving up.
4545
reconnect_backoff : float, default 2.0
4646
Base backoff delay in seconds. Doubles after each failed attempt.
47+
queue_maxsize : int, default 0
48+
Maximum number of messages buffered in the internal queue for the
49+
``receive()`` generator. ``0`` means unbounded. When set, the
50+
websocket-client receive thread blocks if the queue is full,
51+
providing backpressure for high-frequency streams.
4752
4853
EXAMPLE
4954
-----------
@@ -79,6 +84,7 @@ def __init__(
7984
auto_reconnect: bool = False,
8085
max_reconnect_attempts: int = 5,
8186
reconnect_backoff: float = 2.0,
87+
queue_maxsize: int = 0,
8288
) -> None:
8389
if not url.startswith("wss://"):
8490
raise ValueError("url must use the wss:// scheme")
@@ -91,6 +97,7 @@ def __init__(
9197
auto_reconnect=auto_reconnect,
9298
max_reconnect_attempts=max_reconnect_attempts,
9399
reconnect_backoff=reconnect_backoff,
100+
queue_maxsize=queue_maxsize,
94101
)
95102

96103
def _build_ws_url(self) -> str:

0 commit comments

Comments
 (0)