Skip to content

Commit 9aaf130

Browse files
committed
fix(asyncio): fix error propagation in async request path
Ensure `PubNubAsyncioException` always carries a valid `PNStatus` with error data instead of `None`.

fix(asyncio): fix `PubNubAsyncioException.__str__` crash
Handle cases where `status` or `error_data` is `None` instead of raising `AttributeError`.

fix(event-engine): fix error type checks in effects
Match `PubNubAsyncioException`, which is what `request_future` actually returns on failure.

fix(event-engine): fix give-up logic for unlimited retries
Handle `-1` (unlimited) correctly: `attempts > -1` was always `True`, causing an immediate give-up.

fix(event-engine): initialize heartbeat max retry attempts
Use the delay class defaults instead of the config value, which could be `None` and cause a `TypeError` on comparison.

fix(event-engine): add missing return after `heartbeat` give-up
Prevent falling through to start a heartbeat after deciding to give up.

fix(request-handlers): use explicit `httpx.Timeout` object
Set all four timeout fields explicitly instead of a 2-tuple that left write and pool unset.

fix(request-handlers): enforce wall-clock deadline to survive system sleep
On macOS and Linux, `time.monotonic()` does not advance during system sleep, causing socket and `asyncio` timeouts (310s subscribe) to stall for hours of wall-clock time. Add `time.time()`-based deadline checks that detect sleep and cancel stale requests within ~5s of wake.

fix(asyncio): replace `asyncio.wait_for` with wall-clock-aware loop
Use `asyncio.wait()` with periodic `time.time()` checks instead of a single monotonic-based `wait_for()`, yielding to the event loop between checks.

fix(native-threads): add `WallClockDeadlineWatchdog`
A persistent single daemon thread monitors `time.time()` every 5s and closes the `httpx` session when the wall-clock deadline passes, interrupting the blocking socket read. It tracks deadlines per calling thread so concurrent requests (e.g., subscribe + publish) don't interfere. Only armed for long-timeout requests (>30s). The session is recreated for subsequent requests.

test(wall-clock-deadline): add unit tests for sleep detection
Cover both `asyncio` and threads paths: simulated clock jumps, normal passthrough, clean watchdog shutdown, per-thread deadline isolation, concurrent request independence, cleanup, and exception propagation.

test(native-threads): add try/finally cleanup to subscribe tests
Ensure `pubnub.stop()` always runs to prevent non-daemon threads from blocking process exit.

test(native-threads): fix flaky where_now and here_now tests
Enable presence heartbeat and use unique channel names so presence registers on the server.

test(file-upload): fix shared state leak in file upload tests
Restore `cipher_key` after use in `send_file` and pass it explicitly to `download_file`.

test(message-actions): use unique channel names
Avoid collisions with stale data from prior test runs.
1 parent 23abadc commit 9aaf130

15 files changed

Lines changed: 1046 additions & 308 deletions

File tree

pubnub/event_engine/effects.py

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from pubnub.endpoints.presence.leave import Leave
77
from pubnub.endpoints.pubsub.subscribe import Subscribe
88
from pubnub.enums import PNReconnectionPolicy
9-
from pubnub.exceptions import PubNubException
9+
from pubnub.exceptions import PubNubAsyncioException, PubNubException
1010
from pubnub.features import feature_enabled
1111
from pubnub.models.server.subscribe import SubscribeMessage
1212
from pubnub.pubnub import PubNub
@@ -80,9 +80,10 @@ async def handshake_async(self, channels, groups, stop_event, timetoken: int = 0
8080
request.timetoken(0)
8181
response = await request.future()
8282

83-
if isinstance(response, Exception):
83+
if isinstance(response, PubNubAsyncioException):
8484
self.logger.warning(f'Handshake failed: {str(response)}')
85-
handshake_failure = events.HandshakeFailureEvent(response, 1, timetoken=timetoken)
85+
reason = response.status.error_data if response.status and response.status.error_data else str(response)
86+
handshake_failure = events.HandshakeFailureEvent(reason, 1, timetoken=timetoken)
8687
self.event_engine.trigger(handshake_failure)
8788
elif response.status.error:
8889
self.logger.warning(f'Handshake failed: {response.status.error_data.__dict__}')
@@ -184,8 +185,15 @@ def calculate_reconnection_delay(self, attempts):
184185

185186
return delay
186187

188+
def _should_give_up(self, attempts):
189+
if self.reconnection_policy is PNReconnectionPolicy.NONE:
190+
return True
191+
if self.max_retry_attempts == -1:
192+
return False
193+
return attempts > self.max_retry_attempts
194+
187195
def run(self):
188-
if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts:
196+
if self._should_give_up(self.invocation.attempts):
189197
self.give_up(reason=self.invocation.reason, attempt=self.invocation.attempts)
190198
else:
191199
attempts = self.invocation.attempts
@@ -214,9 +222,10 @@ async def delayed_reconnect_async(self, delay, attempt):
214222

215223
response = await request.future()
216224

217-
if isinstance(response, PubNubException):
225+
if isinstance(response, PubNubAsyncioException):
218226
self.logger.warning(f'Reconnect failed: {str(response)}')
219-
self.failure(str(response), attempt, self.get_timetoken())
227+
reason = response.status.error_data if response.status and response.status.error_data else str(response)
228+
self.failure(reason, attempt, self.get_timetoken())
220229

221230
elif response.status.error:
222231
self.logger.warning(f'Reconnect failed: {response.status.error_data.__dict__}')
@@ -302,10 +311,11 @@ async def heartbeat(self, channels, groups, stop_event):
302311

303312
response = await request.future()
304313

305-
if isinstance(response, PubNubException):
314+
if isinstance(response, PubNubAsyncioException):
306315
self.logger.warning(f'Heartbeat failed: {str(response)}')
316+
reason = response.status.error_data if response.status and response.status.error_data else str(response)
307317
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
308-
reason=response.status.error_data, attempt=1))
318+
reason=reason, attempt=1))
309319
elif response.status and response.status.error:
310320
self.logger.warning(f'Heartbeat failed: {response.status.error_data.__dict__}')
311321
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
@@ -345,18 +355,36 @@ async def leave(self, channels, groups, stop_event):
345355
leave_request = Leave(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event)
346356
leave = await leave_request.future()
347357

348-
if leave.status.error:
349-
self.logger.warning(f'Heartbeat failed: {leave.status.error_data.__dict__}')
358+
if isinstance(leave, PubNubAsyncioException):
359+
self.logger.warning(f'Leave failed: {str(leave)}')
360+
elif leave.status and leave.status.error:
361+
self.logger.warning(f'Leave failed: {leave.status.error_data.__dict__}')
350362

351363

352364
class HeartbeatDelayedEffect(Effect):
353365
def __init__(self, pubnub_instance, event_engine_instance,
354366
invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation]) -> None:
355367
super().__init__(pubnub_instance, event_engine_instance, invocation)
356368
self.reconnection_policy = pubnub_instance.config.reconnect_policy
357-
self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries
358369
self.interval = pubnub_instance.config.reconnection_interval
359370

371+
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
372+
self.max_retry_attempts = ExponentialDelay.MAX_RETRIES
373+
elif self.reconnection_policy is PNReconnectionPolicy.LINEAR:
374+
self.max_retry_attempts = LinearDelay.MAX_RETRIES
375+
else:
376+
self.max_retry_attempts = 0
377+
378+
if pubnub_instance.config.maximum_reconnection_retries is not None:
379+
self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries
380+
381+
def _should_give_up(self, attempts):
382+
if self.reconnection_policy is PNReconnectionPolicy.NONE:
383+
return True
384+
if self.max_retry_attempts == -1:
385+
return False
386+
return attempts > self.max_retry_attempts
387+
360388
def calculate_reconnection_delay(self, attempts):
361389
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
362390
delay = ExponentialDelay.calculate(attempts)
@@ -368,23 +396,19 @@ def calculate_reconnection_delay(self, attempts):
368396
return delay
369397

370398
def run(self):
371-
if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts:
399+
if self._should_give_up(self.invocation.attempts):
372400
self.event_engine.trigger(events.HeartbeatGiveUpEvent(channels=self.invocation.channels,
373401
groups=self.invocation.groups,
374402
reason=self.invocation.reason,
375403
attempt=self.invocation.attempts))
404+
return
376405

377406
if hasattr(self.pubnub, 'event_loop'):
378407
self.stop_event = self.get_new_stop_event()
379408
self.run_async(self.heartbeat(channels=self.invocation.channels, groups=self.invocation.groups,
380409
attempt=self.invocation.attempts, stop_event=self.stop_event))
381410

382411
async def heartbeat(self, channels, groups, attempt, stop_event):
383-
if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts:
384-
self.event_engine.trigger(events.HeartbeatGiveUpEvent(channels=self.invocation.channels,
385-
groups=self.invocation.groups,
386-
reason=self.invocation.reason,
387-
attempt=self.invocation.attempts))
388412

389413
channels = list(filter(lambda ch: not ch.endswith('-pnpres'), self.invocation.channels))
390414
groups = list(filter(lambda gr: not gr.endswith('-pnpres'), self.invocation.groups))
@@ -395,12 +419,13 @@ async def heartbeat(self, channels, groups, attempt, stop_event):
395419
await asyncio.sleep(delay)
396420

397421
response = await request.future()
398-
if isinstance(response, PubNubException):
422+
if isinstance(response, PubNubAsyncioException):
399423
self.logger.warning(f'Heartbeat failed: {str(response)}')
424+
reason = response.status.error_data if response.status and response.status.error_data else str(response)
400425
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
401-
reason=response.status.error_data,
426+
reason=reason,
402427
attempt=attempt))
403-
elif response.status.error:
428+
elif response.status and response.status.error:
404429
self.logger.warning(f'Heartbeat failed: {response.status.error_data.__dict__}')
405430
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
406431
reason=response.status.error_data,

pubnub/exceptions.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ def __init__(self, result, status):
4343
self.status = status
4444

4545
def __str__(self):
46-
return str(self.status.error_data.exception)
46+
if self.status and hasattr(self.status, 'error_data') and self.status.error_data:
47+
return str(self.status.error_data.exception)
48+
return f"PubNubAsyncioException(result={self.result}, status={self.status})"
4749

4850
@staticmethod
4951
def is_error():

pubnub/pubnub.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -218,10 +218,14 @@ def stop(self):
218218
Raises:
219219
Exception: If subscription manager is not enabled
220220
"""
221-
if self._subscription_manager is not None:
222-
self._subscription_manager.stop()
223-
else:
224-
raise Exception("Subscription manager is not enabled for this instance")
221+
try:
222+
if self._subscription_manager is not None:
223+
self._subscription_manager.stop()
224+
else:
225+
raise Exception("Subscription manager is not enabled for this instance")
226+
finally:
227+
if hasattr(self._request_handler, 'close'):
228+
self._request_handler.close()
225229

226230
def request_deferred(self, options_func):
227231
raise NotImplementedError

pubnub/pubnub_asyncio.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ async def main():
6262
from pubnub.event_engine.models import events, states
6363

6464
from pubnub.models.consumer.common import PNStatus
65+
from pubnub.models.consumer.pn_error_data import PNErrorData
6566
from pubnub.dtos import SubscribeOperation, UnsubscribeOperation
6667
from pubnub.event_engine.statemachine import StateMachine
6768
from pubnub.endpoints.presence.heartbeat import Heartbeat
@@ -234,9 +235,17 @@ async def request_future(self, options_func, cancellation_event):
234235
res = await self._request_handler.async_request(options_func, cancellation_event)
235236
return res
236237
except PubNubException as e:
238+
if e.status is not None:
239+
status = e.status
240+
else:
241+
status = PNStatus()
242+
status.category = PNStatusCategory.PNBadRequestCategory
243+
status.error = True
244+
status.error_data = PNErrorData(str(e), e)
245+
status.status_code = e._status_code if e._status_code != 0 else None
237246
return PubNubAsyncioException(
238247
result=None,
239-
status=e.status
248+
status=status
240249
)
241250
except asyncio.TimeoutError:
242251
return PubNubAsyncioException(

pubnub/request_handlers/async_httpx.py

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from asyncio import Event
22
import asyncio
33
import logging
4+
import time
45
import httpx
56
import json # noqa # pylint: disable=W0611
67
import urllib
@@ -56,6 +57,39 @@ def sync_request(self, **_):
5657
def threaded_request(self, **_):
5758
raise NotImplementedError("threaded_request is not implemented for asyncio handler")
5859

60+
WALL_CLOCK_CHECK_INTERVAL = 5.0
61+
62+
async def _request_with_wall_clock_deadline(self, request_arguments, timeout):
63+
"""Execute an HTTP request with wall-clock deadline enforcement.
64+
65+
On macOS and Linux, time.monotonic() (and thus asyncio timeouts, socket timeouts)
66+
does not advance during system sleep. A 310-second subscribe timeout can take hours
67+
of wall-clock time if the machine sleeps. This method uses time.time() (wall clock)
68+
to enforce the deadline regardless of sleep, while yielding to the event loop between checks.
69+
"""
70+
if timeout is None:
71+
return await self._session.request(**request_arguments)
72+
73+
wall_deadline = time.time() + timeout
74+
request_task = asyncio.ensure_future(self._session.request(**request_arguments))
75+
76+
try:
77+
while True:
78+
remaining = wall_deadline - time.time()
79+
if remaining <= 0:
80+
request_task.cancel()
81+
raise asyncio.TimeoutError("Wall-clock deadline exceeded")
82+
83+
done, _ = await asyncio.wait(
84+
{request_task},
85+
timeout=min(self.WALL_CLOCK_CHECK_INTERVAL, remaining)
86+
)
87+
if done:
88+
return request_task.result()
89+
except BaseException:
90+
request_task.cancel()
91+
raise
92+
5993
async def async_request(self, options_func, cancellation_event):
6094
"""
6195
Query string should be provided as a manually serialized and encoded string.
@@ -103,7 +137,10 @@ async def async_request(self, options_func, cancellation_event):
103137
'headers': request_headers,
104138
'url': full_url,
105139
'follow_redirects': options.allow_redirects,
106-
'timeout': (options.connect_timeout, options.request_timeout),
140+
'timeout': httpx.Timeout(connect=options.connect_timeout,
141+
read=options.request_timeout,
142+
write=options.connect_timeout,
143+
pool=options.connect_timeout),
107144
}
108145
if options.is_post() or options.is_patch():
109146
request_arguments['content'] = options.data
@@ -112,9 +149,8 @@ async def async_request(self, options_func, cancellation_event):
112149
try:
113150
if not self._session:
114151
await self.create_session()
115-
response = await asyncio.wait_for(
116-
self._session.request(**request_arguments),
117-
options.request_timeout
152+
response = await self._request_with_wall_clock_deadline(
153+
request_arguments, options.request_timeout
118154
)
119155
except (asyncio.TimeoutError, asyncio.CancelledError):
120156
raise

0 commit comments

Comments
 (0)