Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion roborock/mqtt/roborock_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
_LOGGER = logging.getLogger(__name__)
_MQTT_LOGGER = logging.getLogger(f"{__name__}.aiomqtt")

CLIENT_KEEPALIVE = datetime.timedelta(seconds=60)
CLIENT_KEEPALIVE = datetime.timedelta(seconds=45)
TOPIC_KEEPALIVE = datetime.timedelta(seconds=60)

# Exponential backoff parameters
Expand Down Expand Up @@ -175,6 +175,16 @@ async def _run_reconnect_loop(self, start_future: asyncio.Future[None] | None) -
if self._stop:
_LOGGER.debug("MQTT session closed, stopping retry loop")
return
if not self._client_subscribed_topics and not self._listeners.keys():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this implementation looks good. One thought is this behavior could live in LazyMqttSession. I can see that on one hand (a) it could be easier to make a more efficient "wake up" method in the lazy session if the logic is simpler (e.g. using an asyncio.Event to do wake up when a new subscriber is added) but (b) might make it more complex to have an "API" between the two classes exposed for this. Just sharing a thought but not need to act on it.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably a good idea, I don't think i'll act on it right now, but we can see how things end up looking with future work with the mqtt connection and how we handle the subscriptions

_LOGGER.debug("MQTT session disconnected with no active subscriptions, deferring reconnect")
self._diagnostics.increment("reconnect_deferred")
while not self._stop and not self._client_subscribed_topics and not self._listeners.keys():
await asyncio.sleep(0.1)
if self._stop:
_LOGGER.debug("MQTT session closed while waiting for active subscriptions")
return
self._backoff = MIN_BACKOFF_INTERVAL
continue
_LOGGER.info("MQTT session disconnected, retrying in %s seconds", self._backoff.total_seconds())
self._diagnostics.increment("reconnect_wait")
await asyncio.sleep(self._backoff.total_seconds())
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/__snapshots__/test_mqtt_session.ambr
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# serializer version: 1
# name: test_session_e2e_publish_message
[mqtt >]
00000000 10 21 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.!..MQTT...<....|
00000000 10 21 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.!..MQTT...-....|
00000010 08 75 73 65 72 6e 61 6d 65 00 08 70 61 73 73 77 |.username..passw|
00000020 6f 72 64 |ord|
[mqtt <]
Expand All @@ -15,7 +15,7 @@
# ---
# name: test_session_e2e_receive_message
[mqtt >]
00000000 10 21 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.!..MQTT...<....|
00000000 10 21 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.!..MQTT...-....|
00000010 08 75 73 65 72 6e 61 6d 65 00 08 70 61 73 73 77 |.username..passw|
00000020 6f 72 64 |ord|
[mqtt <]
Expand Down
29 changes: 29 additions & 0 deletions tests/mqtt/test_roborock_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,37 @@ def succeed_then_fail_unauthorized() -> Any:
session = await create_mqtt_session(params)
assert session.connected

# Keep an active subscription so reconnect attempts are not deferred.
await session.subscribe("topic-1", Subscriber().append)

try:
async with asyncio.timeout(10):
assert await unauthorized.wait()
finally:
await session.close()


async def test_session_defers_reconnect_when_idle() -> None:
"""Test that reconnects are deferred when there are no active subscriptions."""

session = RoborockMqttSession(FAKE_PARAMS)
start_future: asyncio.Future[None] = asyncio.Future()
connect_attempts = 0

async def fake_run_connection(start: asyncio.Future[None] | None) -> None:
nonlocal connect_attempts
connect_attempts += 1
if start and not start.done():
start.set_result(None)

with patch.object(session, "_run_connection", side_effect=fake_run_connection):
reconnect_task = asyncio.create_task(session._run_reconnect_loop(start_future))
Comment thread
Lash-L marked this conversation as resolved.
Outdated
try:
await start_future
await asyncio.sleep(0.1)
assert connect_attempts == 1
assert session._diagnostics.as_dict().get("reconnect_deferred", 0) >= 1
Comment thread
Lash-L marked this conversation as resolved.
Outdated
finally:
session._stop = True
reconnect_task.cancel()
await asyncio.gather(reconnect_task, return_exceptions=True)
Loading