Skip to content

Commit eeabc68

Browse files
committed
fix(subscribe): lazy session init and silent restart on watchdog timeout
Defer `httpx.Client()` creation to first use, and map the watchdog timeout to `PNTimeoutCategory` so that the subscribe loop restarts silently instead of announcing an unexpected disconnect.
1 parent 0f60b57 commit eeabc68

3 files changed

Lines changed: 55 additions & 48 deletions

File tree

pubnub/pubnub_asyncio.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ async def main():
7676
from pubnub import utils
7777
from pubnub.enums import PNStatusCategory, PNHeartbeatNotificationOptions, PNOperationType, PNReconnectionPolicy
7878
from pubnub.callbacks import SubscribeCallback, ReconnectionCallback
79-
from pubnub.errors import PNERR_REQUEST_CANCELLED, PNERR_CLIENT_TIMEOUT, PNERR_CONNECTION_ERROR
79+
from pubnub.errors import PNERR_REQUEST_CANCELLED, PNERR_CLIENT_TIMEOUT
8080
from pubnub.exceptions import PubNubAsyncioException, PubNubException
8181

8282
# flake8: noqa
@@ -251,11 +251,11 @@ async def request_future(self, options_func, cancellation_event):
251251
return PubNubAsyncioException(
252252
result=None,
253253
status=options_func().create_status(
254-
PNStatusCategory.PNUnexpectedDisconnectCategory,
254+
PNStatusCategory.PNTimeoutCategory,
255255
None,
256256
None,
257257
exception=PubNubException(
258-
pn_error=PNERR_CONNECTION_ERROR,
258+
pn_error=PNERR_CLIENT_TIMEOUT,
259259
errormsg="Wall-clock deadline exceeded (system sleep detected)"
260260
)
261261
)

pubnub/request_handlers/httpx.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -163,15 +163,26 @@ class HttpxRequestHandler(BaseRequestHandler):
163163
ENDPOINT_THREAD_COUNTER: int = 0
164164

165165
def __init__(self, pubnub):
166-
self.session = httpx.Client()
166+
self._session = None
167+
self._session_lock = threading.Lock()
167168
self._watchdog = WallClockDeadlineWatchdog()
168169

169170
self.pubnub = pubnub
170171

172+
def _ensure_session(self):
173+
"""Return the current httpx.Client, creating one if needed. Thread-safe."""
174+
with self._session_lock:
175+
if self._session is None or self._session.is_closed:
176+
logger.debug("Creating new HTTP session")
177+
self._session = httpx.Client()
178+
return self._session
179+
171180
def close(self):
172181
"""Clean up resources: stop the watchdog thread and close the HTTP session."""
173182
self._watchdog.stop()
174-
self.session.close()
183+
with self._session_lock:
184+
if self._session is not None:
185+
self._session.close()
175186

176187
async def async_request(self, options_func, cancellation_event):
177188
raise NotImplementedError("async_request is not implemented for synchronous handler")
@@ -369,9 +380,7 @@ def _invoke_request(self, p_options, e_options, base_origin):
369380
assert isinstance(p_options, PlatformOptions)
370381
assert isinstance(e_options, RequestOptions)
371382

372-
if self.session.is_closed:
373-
logger.debug("HTTP session was closed (e.g. by wall-clock watchdog), recreating")
374-
self.session = httpx.Client()
383+
session = self._ensure_session()
375384

376385
if base_origin:
377386
url = p_options.pn_config.scheme() + "://" + base_origin + e_options.path
@@ -420,10 +429,10 @@ def _invoke_request(self, p_options, e_options, base_origin):
420429
use_watchdog = e_options.request_timeout is not None and e_options.request_timeout > 30
421430

422431
if use_watchdog:
423-
self._watchdog.set_deadline(self.session, time.time() + e_options.request_timeout)
432+
self._watchdog.set_deadline(session, time.time() + e_options.request_timeout)
424433

425434
try:
426-
res = self.session.request(**args)
435+
res = session.request(**args)
427436
# Safely access response text - read content first for streaming responses
428437
try:
429438
logger.debug("GOT %s" % res.text)
@@ -436,9 +445,8 @@ def _invoke_request(self, p_options, e_options, base_origin):
436445

437446
except httpx.ConnectError as e:
438447
if use_watchdog and self._watchdog.triggered:
439-
self.session = httpx.Client()
440448
raise PubNubException(
441-
pn_error=PNERR_CONNECTION_ERROR,
449+
pn_error=PNERR_CLIENT_TIMEOUT,
442450
errormsg="Wall-clock deadline exceeded (system sleep detected)"
443451
)
444452
raise PubNubException(
@@ -447,9 +455,8 @@ def _invoke_request(self, p_options, e_options, base_origin):
447455
)
448456
except httpx.TimeoutException as e:
449457
if use_watchdog and self._watchdog.triggered:
450-
self.session = httpx.Client()
451458
raise PubNubException(
452-
pn_error=PNERR_CONNECTION_ERROR,
459+
pn_error=PNERR_CLIENT_TIMEOUT,
453460
errormsg="Wall-clock deadline exceeded (system sleep detected)"
454461
)
455462
raise PubNubException(
@@ -469,9 +476,8 @@ def _invoke_request(self, p_options, e_options, base_origin):
469476
)
470477
except Exception as e:
471478
if use_watchdog and self._watchdog.triggered:
472-
self.session = httpx.Client()
473479
raise PubNubException(
474-
pn_error=PNERR_CONNECTION_ERROR,
480+
pn_error=PNERR_CLIENT_TIMEOUT,
475481
errormsg="Wall-clock deadline exceeded (system sleep detected)"
476482
)
477483
raise PubNubException(

tests/unit/test_wall_clock_deadline.py

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,9 @@ def test_close_stops_watchdog(self):
362362
from pubnub.request_handlers.httpx import HttpxRequestHandler
363363

364364
handler = HttpxRequestHandler(MagicMock())
365+
session = handler._ensure_session()
365366
# Start the watchdog by setting a deadline
366-
handler._watchdog.set_deadline(handler.session, time.time() + 300)
367+
handler._watchdog.set_deadline(session, time.time() + 300)
367368
self.assertIsNotNone(handler._watchdog._thread)
368369

369370
handler.close()
@@ -391,11 +392,11 @@ def _make_handler(self):
391392
def test_closed_session_is_recreated_on_next_request(self):
392393
"""After session.close(), the next _invoke_request should recreate the session."""
393394
handler = self._make_handler()
394-
original_session = handler.session
395+
original_session = handler._ensure_session()
395396

396397
# Simulate what the watchdog does
397-
handler.session.close()
398-
self.assertTrue(handler.session.is_closed)
398+
original_session.close()
399+
self.assertTrue(handler._session.is_closed)
399400

400401
# Build minimal request options
401402
from pubnub.structures import RequestOptions, PlatformOptions
@@ -422,17 +423,17 @@ def test_closed_session_is_recreated_on_next_request(self):
422423
except Exception:
423424
pass
424425

425-
self.assertFalse(handler.session.is_closed)
426-
self.assertIsNot(handler.session, original_session)
426+
self.assertFalse(handler._session.is_closed)
427+
self.assertIsNot(handler._session, original_session)
427428

428429
handler.close()
429430

430431
def test_open_session_is_not_recreated(self):
431432
"""An open session should not be replaced."""
432433
handler = self._make_handler()
433-
original_session = handler.session
434+
original_session = handler._ensure_session()
434435

435-
self.assertFalse(handler.session.is_closed)
436+
self.assertFalse(handler._session.is_closed)
436437

437438
from pubnub.structures import RequestOptions, PlatformOptions
438439
p_options = MagicMock(spec=PlatformOptions)
@@ -457,18 +458,18 @@ def test_open_session_is_not_recreated(self):
457458
except Exception:
458459
pass
459460

460-
self.assertIs(handler.session, original_session)
461+
self.assertIs(handler._session, original_session)
461462
handler.close()
462463

463464
def test_watchdog_trigger_with_timeout_exception_recreates_session(self):
464465
"""When watchdog triggers and the request gets TimeoutException, session should be recreated."""
465466
handler = self._make_handler()
466-
original_session = handler.session
467+
original_session = handler._ensure_session()
467468

468469
# Set up watchdog as triggered
469-
handler._watchdog.set_deadline(handler.session, time.time() - 1)
470+
handler._watchdog.set_deadline(original_session, time.time() - 1)
470471
time.sleep(0.3)
471-
self.assertTrue(handler.session.is_closed)
472+
self.assertTrue(original_session.is_closed)
472473

473474
from pubnub.structures import RequestOptions, PlatformOptions
474475
p_options = MagicMock(spec=PlatformOptions)
@@ -493,9 +494,9 @@ def test_watchdog_trigger_with_timeout_exception_recreates_session(self):
493494
except Exception:
494495
pass
495496

496-
# Session should have been recreated (either by is_closed check or watchdog trigger handler)
497-
self.assertFalse(handler.session.is_closed)
498-
self.assertIsNot(handler.session, original_session)
497+
# Session should have been recreated by _ensure_session() since original was closed
498+
self.assertFalse(handler._session.is_closed)
499+
self.assertIsNot(handler._session, original_session)
499500

500501
handler._watchdog.stop()
501502
handler.close()
@@ -509,10 +510,11 @@ def test_reconnection_time_request_works_after_session_close(self):
509510
3. /time/0 calls should detect closed session, recreate it, and eventually succeed
510511
"""
511512
handler = self._make_handler()
513+
original_session = handler._ensure_session()
512514

513515
# Step 1: Watchdog closes the session
514-
handler.session.close()
515-
self.assertTrue(handler.session.is_closed)
516+
original_session.close()
517+
self.assertTrue(handler._session.is_closed)
516518

517519
from pubnub.structures import RequestOptions, PlatformOptions
518520
p_options = MagicMock(spec=PlatformOptions)
@@ -540,21 +542,22 @@ def test_reconnection_time_request_works_after_session_close(self):
540542
# Should be a connection error, NOT "Cannot send a request, as the client has been closed"
541543
self.assertNotIn("client has been closed", str(e).lower())
542544

543-
self.assertFalse(handler.session.is_closed)
545+
self.assertFalse(handler._session.is_closed)
544546
handler.close()
545547

546548

547549
class TestWallClockErrorCategory(unittest.TestCase):
548-
"""Tests that wall-clock sleep timeouts produce PNUnexpectedDisconnectCategory,
549-
not PNTimeoutCategory — so they route through the reconnection manager with
550-
configured retry delays instead of an immediate silent restart.
550+
"""Tests that wall-clock sleep timeouts produce PNTimeoutCategory,
551+
so they silently restart the subscribe loop. If the network is actually
552+
down, the next request will fail with a real connection error that routes
553+
through the reconnection manager with configured retry delays.
551554
"""
552555

553-
def test_watchdog_triggered_produces_connection_error(self):
554-
"""When watchdog triggers during a request, the exception should use PNERR_CONNECTION_ERROR
555-
which maps to PNUnexpectedDisconnectCategory in _build_envelope."""
556+
def test_watchdog_triggered_produces_timeout_error(self):
557+
"""When watchdog triggers during a request, the exception should use PNERR_CLIENT_TIMEOUT
558+
which maps to PNTimeoutCategory in _build_envelope."""
556559
from pubnub.request_handlers.httpx import HttpxRequestHandler
557-
from pubnub.errors import PNERR_CONNECTION_ERROR
560+
from pubnub.errors import PNERR_CLIENT_TIMEOUT
558561
from pubnub.exceptions import PubNubException
559562
from pubnub.structures import RequestOptions, PlatformOptions
560563

@@ -565,7 +568,7 @@ def test_watchdog_triggered_produces_connection_error(self):
565568
mock_session = MagicMock(spec=httpx.Client)
566569
mock_session.is_closed = False
567570
mock_session.request.side_effect = httpx.ReadTimeout("timed out")
568-
handler.session = mock_session
571+
handler._session = mock_session
569572

570573
# Mock watchdog.triggered to return True (simulates watchdog firing during request)
571574
type(handler._watchdog).triggered = property(lambda self: True)
@@ -590,10 +593,8 @@ def test_watchdog_triggered_produces_connection_error(self):
590593
with self.assertRaises(PubNubException) as ctx:
591594
handler._invoke_request(p_options, e_options, 'ps.pndsn.com')
592595

593-
self.assertEqual(ctx.exception._pn_error, PNERR_CONNECTION_ERROR)
596+
self.assertEqual(ctx.exception._pn_error, PNERR_CLIENT_TIMEOUT)
594597
self.assertIn("Wall-clock deadline exceeded", ctx.exception._errormsg)
595-
# Session should have been recreated
596-
self.assertIsNot(handler.session, mock_session)
597598

598599
handler._watchdog.stop()
599600
handler.close()
@@ -636,8 +637,8 @@ def mock_time():
636637

637638
asyncio.run(run_test())
638639

639-
def test_async_wall_clock_timeout_maps_to_unexpected_disconnect(self):
640-
"""WallClockTimeoutError should produce PNUnexpectedDisconnectCategory in request_future."""
640+
def test_async_wall_clock_timeout_maps_to_timeout_category(self):
641+
"""WallClockTimeoutError should produce PNTimeoutCategory in request_future."""
641642
from pubnub.request_handlers.async_httpx import WallClockTimeoutError
642643
from pubnub.pubnub_asyncio import PubNubAsyncio
643644
from pubnub.pnconfiguration import PNConfiguration
@@ -668,7 +669,7 @@ def options_func():
668669
self.assertTrue(result.is_error())
669670
self.assertEqual(
670671
result.status.category,
671-
PNStatusCategory.PNUnexpectedDisconnectCategory
672+
PNStatusCategory.PNTimeoutCategory
672673
)
673674
finally:
674675
await pubnub.stop()

0 commit comments

Comments
 (0)