Skip to content

Commit fe0d9a3

Browse files
authored
Remove hard cap and honor configured retry intervals (#231)
fix(reconnection): remove hard cap and honor configured retry intervals The retry limit was silently clamped to the policy default, configured intervals were ignored by delay calculators, and the async reconnection loop never checked the limit or advanced its counter — fix all three and add a `maximum_reconnection_interval` config option. refactor(publish): remove client-side publish sequence number (`seqn`) `seqn` is not required by PubNub REST API, so remove `PublishSequenceManager`, all its subclasses, `MAX_SEQUENCE`, and `seqn` injection.
1 parent a61ef5d commit fe0d9a3

78 files changed

Lines changed: 1022 additions & 737 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.pubnub.yml

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: python
2-
version: 10.6.2
2+
version: 10.6.3
33
schema: 1
44
scm: github.com/pubnub/python
55
sdks:
@@ -18,7 +18,7 @@ sdks:
1818
distributions:
1919
- distribution-type: library
2020
distribution-repository: package
21-
package-name: pubnub-10.6.2
21+
package-name: pubnub-10.6.3
2222
location: https://pypi.org/project/pubnub/
2323
supported-platforms:
2424
supported-operating-systems:
@@ -94,8 +94,8 @@ sdks:
9494
-
9595
distribution-type: library
9696
distribution-repository: git release
97-
package-name: pubnub-10.6.2
98-
location: https://github.com/pubnub/python/releases/download/10.6.2/pubnub-10.6.2.tar.gz
97+
package-name: pubnub-10.6.3
98+
location: https://github.com/pubnub/python/releases/download/10.6.3/pubnub-10.6.3.tar.gz
9999
supported-platforms:
100100
supported-operating-systems:
101101
Linux:
@@ -169,6 +169,15 @@ sdks:
169169
license-url: https://github.com/encode/httpx/blob/master/LICENSE.md
170170
is-required: Required
171171
changelog:
172+
- date: 2026-04-20
173+
version: 10.6.3
174+
changes:
175+
- type: bug
176+
text: "The retry limit was silently clamped to the policy default, configured intervals were ignored by delay calculators, and the async reconnection loop never checked the limit or advanced its counter — fix all three and add a `maximum_reconnection_interval` config option."
177+
- type: improvement
178+
text: "`seqn` is not required by PubNub REST API, so remove `PublishSequenceManager`, all its subclasses, `MAX_SEQUENCE`, and `seqn` injection."
179+
- type: improvement
180+
text: "Cover `LinearDelay`, `ExponentialDelay`, `ReconnectionManager`, `ReconnectEffect`, and `HeartbeatDelayedEffect` with deterministic assertions for default, custom, and edge cases."
172181
- date: 2026-03-26
173182
version: 10.6.2
174183
changes:

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
## 10.6.3
2+
April 20 2026
3+
4+
#### Fixed
5+
- The retry limit was silently clamped to the policy default, configured intervals were ignored by delay calculators, and the async reconnection loop never checked the limit or advanced its counter — fix all three and add a `maximum_reconnection_interval` config option.
6+
7+
#### Modified
8+
- `seqn` is not required by PubNub REST API, so remove `PublishSequenceManager`, all its subclasses, `MAX_SEQUENCE`, and `seqn` injection.
9+
- Cover `LinearDelay`, `ExponentialDelay`, `ReconnectionManager`, `ReconnectEffect`, and `HeartbeatDelayedEffect` with deterministic assertions for default, custom, and edge cases.
10+
111
## 10.6.2
212
March 26 2026
313

pubnub/event_engine/effects.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ def __init__(self, pubnub_instance, event_engine_instance,
154154
super().__init__(pubnub_instance, event_engine_instance, invocation)
155155
self.reconnection_policy = pubnub_instance.config.reconnect_policy
156156
self.interval = pubnub_instance.config.reconnection_interval
157+
self.maximum_interval = pubnub_instance.config.maximum_reconnection_interval
157158

158159
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
159160
self.max_retry_attempts = ExponentialDelay.MAX_RETRIES
@@ -177,11 +178,10 @@ def success(self, timetoken: str, region: Optional[int] = None, **kwargs):
177178

178179
def calculate_reconnection_delay(self, attempts):
179180
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
180-
delay = ExponentialDelay.calculate(attempts)
181-
elif self.interval is None:
182-
delay = LinearDelay.calculate(attempts)
181+
delay = ExponentialDelay.calculate(
182+
attempts, minimum_delay=self.interval, maximum_delay=self.maximum_interval)
183183
else:
184-
delay = self.interval
184+
delay = LinearDelay.calculate(attempts, delay=self.interval)
185185

186186
return delay
187187

@@ -367,6 +367,7 @@ def __init__(self, pubnub_instance, event_engine_instance,
367367
super().__init__(pubnub_instance, event_engine_instance, invocation)
368368
self.reconnection_policy = pubnub_instance.config.reconnect_policy
369369
self.interval = pubnub_instance.config.reconnection_interval
370+
self.maximum_interval = pubnub_instance.config.maximum_reconnection_interval
370371

371372
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
372373
self.max_retry_attempts = ExponentialDelay.MAX_RETRIES
@@ -387,11 +388,10 @@ def _should_give_up(self, attempts):
387388

388389
def calculate_reconnection_delay(self, attempts):
389390
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
390-
delay = ExponentialDelay.calculate(attempts)
391-
elif self.interval is None:
392-
delay = LinearDelay.calculate(attempts)
391+
delay = ExponentialDelay.calculate(
392+
attempts, minimum_delay=self.interval, maximum_delay=self.maximum_interval)
393393
else:
394-
delay = self.interval
394+
delay = LinearDelay.calculate(attempts, delay=self.interval)
395395

396396
return delay
397397

pubnub/managers.py

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,6 @@
1919
logger = logging.getLogger("pubnub")
2020

2121

22-
class PublishSequenceManager:
23-
def __init__(self, provided_max_sequence):
24-
self.max_sequence = provided_max_sequence
25-
self.next_sequence = 0
26-
27-
@abstractmethod
28-
def get_next_sequence(self):
29-
if self.max_sequence == self.next_sequence:
30-
self.next_sequence = 1
31-
else:
32-
self.next_sequence += 1
33-
return self.next_sequence
34-
35-
3622
class BasePathManager(object):
3723
MAX_SUBDOMAIN = 20
3824
DEFAULT_SUBDOMAIN = "pubsub"
@@ -64,12 +50,14 @@ def set_reconnection_listener(self, reconnection_callback):
6450
def _recalculate_interval(self):
6551
policy = self._pubnub.config.reconnect_policy
6652
interval = self._pubnub.config.reconnection_interval
67-
if policy == PNReconnectionPolicy.LINEAR and interval is not None:
68-
self._timer_interval = interval
69-
elif policy == PNReconnectionPolicy.LINEAR:
70-
self._timer_interval = LinearDelay.calculate(self._connection_errors)
53+
if policy == PNReconnectionPolicy.LINEAR:
54+
self._timer_interval = LinearDelay.calculate(self._connection_errors, delay=interval)
7155
else:
72-
self._timer_interval = ExponentialDelay.calculate(self._connection_errors)
56+
self._timer_interval = ExponentialDelay.calculate(
57+
self._connection_errors,
58+
minimum_delay=interval,
59+
maximum_delay=self._pubnub.config.maximum_reconnection_interval
60+
)
7361

7462
def _retry_limit_reached(self):
7563
user_limit = self._pubnub.config.maximum_reconnection_retries
@@ -83,7 +71,7 @@ def _retry_limit_reached(self):
8371
policy_limit = (LinearDelay.MAX_RETRIES if policy == PNReconnectionPolicy.LINEAR
8472
else ExponentialDelay.MAX_RETRIES)
8573
if user_limit is not None:
86-
return self._connection_errors >= min(user_limit, policy_limit)
74+
return self._connection_errors >= user_limit
8775
return self._connection_errors > policy_limit
8876

8977
@abstractmethod
@@ -101,8 +89,9 @@ class LinearDelay:
10189
MAX_RETRIES = 10
10290

10391
@classmethod
104-
def calculate(cls, attempt: int):
105-
return cls.INTERVAL + round(random.random(), 3)
92+
def calculate(cls, attempt: int, delay=None):
93+
base = delay if delay is not None else cls.INTERVAL
94+
return base + round(random.random(), 3)
10695

10796

10897
class ExponentialDelay:
@@ -112,8 +101,10 @@ class ExponentialDelay:
112101
MAX_BACKOFF = 150
113102

114103
@classmethod
115-
def calculate(cls, attempt: int) -> int:
116-
return min(cls.MAX_BACKOFF, cls.MIN_DELAY * (2 ** attempt)) + round(random.random(), 3)
104+
def calculate(cls, attempt: int, minimum_delay=None, maximum_delay=None) -> float:
105+
min_delay = minimum_delay if minimum_delay is not None else cls.MIN_DELAY
106+
max_backoff = maximum_delay if maximum_delay is not None else cls.MAX_BACKOFF
107+
return min(max_backoff, min_delay * (2 ** attempt)) + round(random.random(), 3)
117108

118109

119110
class StateManager:

pubnub/pnconfiguration.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ def __init__(self,
4141
self.heartbeat_notification_options = PNHeartbeatNotificationOptions.FAILURES
4242
self.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL
4343
self.maximum_reconnection_retries = None # -1 means unlimited/ 0 means no retries
44-
self.reconnection_interval = None # if None is left the default value from LinearDelay is used
44+
self.reconnection_interval = None # if None is left the default value from LinearDelay/ExponentialDelay is used
45+
self.maximum_reconnection_interval = None # if None the default value from ExponentialDelay is used
4546
self.daemon = False
4647
self.use_random_initialization_vector = True
4748
self.suppress_leave_events = False

pubnub/pubnub.py

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
- PubNub: Main class for interacting with PubNub services
99
- NativeSubscriptionManager: Handles channel subscriptions and message processing
1010
- NativeReconnectionManager: Manages network reconnection strategies
11-
- NativePublishSequenceManager: Manages message sequence numbers for publishing
1211
- SubscribeListener: Helper class for handling subscription events
1312
- NonSubscribeListener: Helper class for handling non-subscription operations
1413
@@ -40,7 +39,7 @@
4039
- The SDK uses multiple threads for different operations
4140
- SubscribeMessageWorker runs in a daemon thread
4241
- Heartbeat and reconnection timers run in separate threads
43-
- Thread-safe implementations for sequence management and message queuing
42+
- Thread-safe implementations for message queuing
4443
4544
Error Handling:
4645
- Automatic retry mechanisms for failed operations
@@ -70,7 +69,7 @@
7069
from pubnub.endpoints.presence.leave import Leave
7170
from pubnub.endpoints.pubsub.subscribe import Subscribe
7271
from pubnub.enums import PNStatusCategory, PNHeartbeatNotificationOptions, PNOperationType, PNReconnectionPolicy
73-
from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager
72+
from pubnub.managers import SubscriptionManager, ReconnectionManager
7473
from pubnub.models.consumer.common import PNStatus
7574
from pubnub.pnconfiguration import PNConfiguration
7675
from pubnub.pubnub_core import PubNubCore
@@ -124,8 +123,6 @@ def __init__(self, config: PNConfiguration, *, custom_request_handler: Type[Base
124123
if self.config.enable_subscribe:
125124
self._subscription_manager = NativeSubscriptionManager(self)
126125

127-
self._publish_sequence_manager = PublishSequenceManager(PubNubCore.MAX_SEQUENCE)
128-
129126
def sdk_platform(self) -> str:
130127
"""Get the SDK platform identifier.
131128
@@ -205,12 +202,7 @@ def request_async(self, endpoint_name, endpoint_call_options, callback, cancella
205202
)
206203

207204
def merge_in_params(self, options):
208-
params_to_merge_in = {}
209-
210-
if options.operation_type == PNOperationType.PNPublishOperation:
211-
params_to_merge_in['seqn'] = self._publish_sequence_manager.get_next_sequence()
212-
213-
options.merge_params_in(params_to_merge_in)
205+
options.merge_params_in({})
214206

215207
def stop(self):
216208
"""Stop all subscriptions and clean up resources.
@@ -305,21 +297,6 @@ def stop_heartbeat_timer(self):
305297
self._timer.cancel()
306298

307299

308-
class NativePublishSequenceManager(PublishSequenceManager):
309-
def __init__(self, provided_max_sequence):
310-
super(NativePublishSequenceManager, self).__init__(provided_max_sequence)
311-
self._lock = threading.Lock()
312-
313-
def get_next_sequence(self):
314-
with self._lock:
315-
if self.max_sequence == self.next_sequence:
316-
self.next_sequence = 1
317-
else:
318-
self.next_sequence += 1
319-
320-
return self.next_sequence
321-
322-
323300
class NativeSubscriptionManager(SubscriptionManager):
324301
"""Manages channel subscriptions and message processing.
325302

pubnub/pubnub_asyncio.py

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
- AsyncioSubscriptionManager: Async implementation of subscription handling
1010
- EventEngineSubscriptionManager: Event-driven subscription management
1111
- AsyncioReconnectionManager: Async network reconnection handling
12-
- AsyncioPublishSequenceManager: Async message sequence management
13-
1412
Features:
1513
- Asynchronous publish/subscribe messaging
1614
- Non-blocking network operations
@@ -72,7 +70,7 @@ async def main():
7270
from pubnub.request_handlers.base import BaseRequestHandler
7371
from pubnub.request_handlers.async_httpx import AsyncHttpxRequestHandler, WallClockTimeoutError
7472
from pubnub.workers import SubscribeMessageWorker
75-
from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager
73+
from pubnub.managers import SubscriptionManager, ReconnectionManager
7674
from pubnub import utils
7775
from pubnub.enums import PNStatusCategory, PNHeartbeatNotificationOptions, PNOperationType, PNReconnectionPolicy
7876
from pubnub.callbacks import SubscribeCallback, ReconnectionCallback
@@ -153,8 +151,6 @@ def __init__(self, config, custom_event_loop=None, subscription_manager=None, *,
153151
if self.config.enable_subscribe:
154152
self._subscription_manager = subscription_manager(self)
155153

156-
self._publish_sequence_manager = AsyncioPublishSequenceManager(self.event_loop, PubNubCore.MAX_SEQUENCE)
157-
158154
@property
159155
def _connector(self):
160156
return self._request_handler._connector
@@ -317,6 +313,13 @@ async def _register_heartbeat_timer(self):
317313
reconnection attempt based on the current state.
318314
"""
319315
while True:
316+
if self._retry_limit_reached():
317+
logger.warning("Reconnection retry limit reached. Disconnecting.")
318+
disconnect_status = PNStatus()
319+
disconnect_status.category = PNStatusCategory.PNDisconnectedCategory
320+
self._pubnub._subscription_manager._listener_manager.announce_status(disconnect_status)
321+
break
322+
320323
self._recalculate_interval()
321324
await asyncio.sleep(self._timer_interval)
322325
logger.debug("reconnect loop at: %s" % utils.datetime_now())
@@ -327,9 +330,8 @@ async def _register_heartbeat_timer(self):
327330
self._callback.on_reconnect()
328331
break
329332
except Exception:
330-
if self._pubnub.config.reconnect_policy == PNReconnectionPolicy.EXPONENTIAL:
331-
logger.debug("reconnect interval increment at: %s" % utils.datetime_now())
332-
self._connection_errors += 1
333+
logger.debug("reconnect interval increment at: %s" % utils.datetime_now())
334+
self._connection_errors += 1
333335

334336
def start_polling(self):
335337
"""Start the reconnection polling process."""
@@ -345,22 +347,6 @@ def stop_polling(self):
345347
self._task.cancel()
346348

347349

348-
class AsyncioPublishSequenceManager(PublishSequenceManager):
349-
def __init__(self, ioloop, provided_max_sequence):
350-
super(AsyncioPublishSequenceManager, self).__init__(provided_max_sequence)
351-
self._lock = asyncio.Lock()
352-
self._event_loop = ioloop
353-
354-
async def get_next_sequence(self):
355-
async with self._lock:
356-
if self.max_sequence == self.next_sequence:
357-
self.next_sequence = 1
358-
else:
359-
self.next_sequence += 1
360-
361-
return self.next_sequence
362-
363-
364350
class AsyncioSubscriptionManager(SubscriptionManager):
365351
"""Manages channel subscriptions and message processing.
366352

pubnub/pubnub_core.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ class PubNubCore:
165165
SDK_NAME: str = "PubNub-Python"
166166

167167
TIMESTAMP_DIVIDER: int = 1000
168-
MAX_SEQUENCE: int = 65535
169168

170169
__metaclass__ = ABCMeta
171170
__crypto: Optional[PubNubCryptoModule] = None
@@ -190,7 +189,6 @@ def __init__(self, config: PNConfiguration) -> None:
190189
}
191190

192191
self._subscription_manager = None
193-
self._publish_sequence_manager = None
194192
self._base_path_manager = BasePathManager(config)
195193
self._token_manager = TokenManager()
196194
self._subscription_registry = PNSubscriptionRegistry(self)

pubnub/request_handlers/async_aiohttp.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from asyncio import Event
88
from pubnub import utils
9-
from pubnub.enums import PNOperationType, PNStatusCategory
9+
from pubnub.enums import PNStatusCategory
1010
from pubnub.errors import PNERR_CLIENT_ERROR, PNERR_JSON_DECODING_FAILED, PNERR_SERVER_ERROR
1111
from pubnub.exceptions import PubNubException
1212
from pubnub.models.envelopes import AsyncioEnvelope
@@ -73,12 +73,7 @@ async def async_request(self, options_func, cancellation_event):
7373
create_status = options.create_status
7474
create_exception = options.create_exception
7575

76-
params_to_merge_in = {}
77-
78-
if options.operation_type == PNOperationType.PNPublishOperation:
79-
params_to_merge_in['seqn'] = await self.pubnub._publish_sequence_manager.get_next_sequence()
80-
81-
options.merge_params_in(params_to_merge_in)
76+
options.merge_params_in({})
8277

8378
if options.use_base_path:
8479
url = utils.build_url(self.pubnub.config.scheme(), self.pubnub.base_origin, options.path,

pubnub/request_handlers/async_httpx.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import urllib
88

99
from pubnub import utils
10-
from pubnub.enums import PNOperationType, PNStatusCategory
10+
from pubnub.enums import PNStatusCategory
1111
from pubnub.errors import PNERR_CLIENT_ERROR, PNERR_JSON_DECODING_FAILED, PNERR_SERVER_ERROR
1212
from pubnub.exceptions import PubNubException
1313
from pubnub.models.envelopes import AsyncioEnvelope
@@ -115,12 +115,7 @@ async def async_request(self, options_func, cancellation_event):
115115
create_status = options.create_status
116116
create_exception = options.create_exception
117117

118-
params_to_merge_in = {}
119-
120-
if options.operation_type == PNOperationType.PNPublishOperation:
121-
params_to_merge_in['seqn'] = await self.pubnub._publish_sequence_manager.get_next_sequence()
122-
123-
options.merge_params_in(params_to_merge_in)
118+
options.merge_params_in({})
124119

125120
if options.use_base_path:
126121
url = utils.build_url(self.pubnub.config.scheme(), self.pubnub.base_origin, options.path,

0 commit comments

Comments
 (0)