Skip to content
64 changes: 64 additions & 0 deletions roborock/devices/rpc/b01_q10_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@

from __future__ import annotations

import asyncio
import logging
from collections.abc import Iterable
from typing import Any

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.devices.transport.mqtt_channel import MqttChannel
from roborock.exceptions import RoborockException
from roborock.protocols.b01_q10_protocol import (
ParamsType,
decode_rpc_response,
encode_mqtt_payload,
)
from roborock.roborock_message import RoborockMessage

_LOGGER = logging.getLogger(__name__)
_TIMEOUT = 10.0


async def send_command(
Expand All @@ -34,3 +40,61 @@ async def send_command(
ex,
)
raise


async def send_decoded_command(
mqtt_channel: MqttChannel,
command: B01_Q10_DP,
params: ParamsType,
expected_dps: Iterable[B01_Q10_DP] | None = None,
) -> dict[B01_Q10_DP, Any]:
"""Send a command and await the first decoded response.

Q10 responses are not correlated with a message id, so we filter on
expected datapoints when provided.
"""
roborock_message = encode_mqtt_payload(command, params)
future: asyncio.Future[dict[B01_Q10_DP, Any]] = asyncio.get_running_loop().create_future()

expected_set = set(expected_dps) if expected_dps is not None else None

def find_response(response_message: RoborockMessage) -> None:
try:
decoded_dps = decode_rpc_response(response_message)
except RoborockException as ex:
_LOGGER.debug(
"Failed to decode B01 Q10 RPC response (expecting %s): %s: %s",
command,
response_message,
ex,
)
return
if expected_set and not any(dps in decoded_dps for dps in expected_set):
return
if not future.done():
future.set_result(decoded_dps)

unsub = await mqtt_channel.subscribe(find_response)

_LOGGER.debug("Sending MQTT message: %s", roborock_message)
try:
await mqtt_channel.publish(roborock_message)
return await asyncio.wait_for(future, timeout=_TIMEOUT)
except TimeoutError as ex:
raise RoborockException(f"B01 Q10 command timed out after {_TIMEOUT}s ({command})") from ex
except RoborockException as ex:
_LOGGER.warning(
"Error sending B01 Q10 decoded command (%s): %s",
command,
ex,
)
raise
except Exception as ex:
_LOGGER.exception(
"Error sending B01 Q10 decoded command (%s): %s",
command,
ex,
)
raise
finally:
unsub()
6 changes: 6 additions & 0 deletions roborock/devices/traits/b01/q10/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
from roborock.devices.transport.mqtt_channel import MqttChannel

from .command import CommandTrait
from .status import StatusTrait
from .vacuum import VacuumTrait

__all__ = [
"Q10PropertiesApi",
"StatusTrait",
]


Expand All @@ -17,12 +19,16 @@ class Q10PropertiesApi(Trait):
command: CommandTrait
"""Trait for sending commands to Q10 devices."""

status: StatusTrait
"""Trait for querying Q10 device status."""

vacuum: VacuumTrait
"""Trait for sending vacuum related commands to Q10 devices."""

def __init__(self, channel: MqttChannel) -> None:
"""Initialize the B01Props API."""
self.command = CommandTrait(channel)
self.status = StatusTrait(channel)
self.vacuum = VacuumTrait(self.command)


Expand Down
71 changes: 71 additions & 0 deletions roborock/devices/traits/b01/q10/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Status trait for Q10 B01 devices."""

from __future__ import annotations

from typing import Any

from roborock.data.b01_q10.b01_q10_code_mappings import (
B01_Q10_DP,
YXDeviceCleanTask,
YXDeviceState,
YXDeviceWorkMode,
YXFanLevel,
)
from roborock.devices.rpc.b01_q10_channel import send_decoded_command
from roborock.devices.transport.mqtt_channel import MqttChannel


class StatusTrait:
"""Trait for requesting and holding Q10 status values."""

def __init__(self, channel: MqttChannel) -> None:
self._channel = channel
self._data: dict[B01_Q10_DP, Any] = {}

@property
def data(self) -> dict[B01_Q10_DP, Any]:
"""Return the latest raw status data."""
return self._data

async def refresh(self) -> dict[B01_Q10_DP, Any]:
"""Refresh status values from the device."""
decoded = await send_decoded_command(
self._channel,
command=B01_Q10_DP.REQUEST_DPS,
params={},
expected_dps={B01_Q10_DP.STATUS, B01_Q10_DP.BATTERY},
)
self._data = decoded
return decoded

@property
def state_code(self) -> int | None:
return self._data.get(B01_Q10_DP.STATUS)

@property
def state(self) -> YXDeviceState | None:
code = self.state_code
return YXDeviceState.from_code_optional(code) if code is not None else None

@property
def battery(self) -> int | None:
return self._data.get(B01_Q10_DP.BATTERY)

@property
def fan_level(self) -> YXFanLevel | None:
value = self._data.get(B01_Q10_DP.FAN_LEVEL)
return YXFanLevel.from_code_optional(value) if value is not None else None

@property
def clean_mode(self) -> YXDeviceWorkMode | None:
value = self._data.get(B01_Q10_DP.CLEAN_MODE)
return YXDeviceWorkMode.from_code_optional(value) if value is not None else None

@property
def clean_task(self) -> YXDeviceCleanTask | None:
value = self._data.get(B01_Q10_DP.CLEAN_TASK_TYPE)
return YXDeviceCleanTask.from_code_optional(value) if value is not None else None

@property
def cleaning_progress(self) -> int | None:
return self._data.get(B01_Q10_DP.CLEANING_PROGRESS)
173 changes: 173 additions & 0 deletions tests/devices/rpc/test_b01_q10_channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""Tests for B01 Q10 channel functions."""

import json
from typing import Any, cast

import pytest

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.devices.rpc.b01_q10_channel import send_command, send_decoded_command
from roborock.exceptions import RoborockException
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol
from tests.fixtures.channel_fixtures import FakeChannel


@pytest.fixture(name="fake_channel")
def fake_channel_fixture() -> FakeChannel:
return FakeChannel()


def build_q10_dps_response(dps: dict[str, Any]) -> RoborockMessage:
"""Build a Q10 MQTT response message with DPS data."""
payload = {"dps": dps}
return RoborockMessage(
protocol=cast(RoborockMessageProtocol, 11), # MQTT protocol for B01 Q10
payload=json.dumps(payload).encode(),
seq=0,
version=b"B01",
)


async def test_send_command(fake_channel: FakeChannel) -> None:
"""Test sending a command without waiting for response."""
await send_command(fake_channel, B01_Q10_DP.START_CLEAN, {"cmd": 1}) # type: ignore[arg-type]

assert len(fake_channel.published_messages) == 1
message = fake_channel.published_messages[0]
assert message.payload is not None
payload_data = json.loads(message.payload.decode())
assert payload_data == {"dps": {"201": {"cmd": 1}}}


async def test_send_decoded_command_basic(fake_channel: FakeChannel) -> None:
"""Test sending a command and receiving a decoded response."""
# Queue a response
fake_channel.response_queue.append(build_q10_dps_response({"121": 5, "122": 100}))

result = await send_decoded_command(
fake_channel, # type: ignore[arg-type]
B01_Q10_DP.REQUEST_DPS,
{},
expected_dps={B01_Q10_DP.STATUS, B01_Q10_DP.BATTERY},
)

assert B01_Q10_DP.STATUS in result
assert B01_Q10_DP.BATTERY in result
assert result[B01_Q10_DP.STATUS] == 5
assert result[B01_Q10_DP.BATTERY] == 100


async def test_send_decoded_command_without_expected_dps(fake_channel: FakeChannel) -> None:
"""Test send_decoded_command accepts any response when expected_dps is None."""
# Queue a response with any DPS
fake_channel.response_queue.append(build_q10_dps_response({"123": 2}))

result = await send_decoded_command(
fake_channel, # type: ignore[arg-type]
B01_Q10_DP.REQUEST_DPS,
{},
expected_dps=None,
)

# Should accept any response
assert B01_Q10_DP.FAN_LEVEL in result
assert result[B01_Q10_DP.FAN_LEVEL] == 2


async def test_send_decoded_command_filters_by_expected_dps(fake_channel: FakeChannel) -> None:
"""Test that send_decoded_command filters by expected DPS."""
# Queue response with expected DPS
fake_channel.response_queue.append(build_q10_dps_response({"121": 5, "122": 100}))

result = await send_decoded_command(
fake_channel, # type: ignore[arg-type]
B01_Q10_DP.REQUEST_DPS,
{},
expected_dps={B01_Q10_DP.STATUS},
)

# Should accept response with expected DPS
assert B01_Q10_DP.STATUS in result
assert result[B01_Q10_DP.STATUS] == 5


async def test_send_decoded_command_timeout(fake_channel: FakeChannel) -> None:
"""Test that send_decoded_command times out when no matching response."""
# Don't queue any response

with pytest.raises(RoborockException, match="B01 Q10 command timed out"):
await send_decoded_command(
fake_channel, # type: ignore[arg-type]
B01_Q10_DP.REQUEST_DPS,
{},
expected_dps={B01_Q10_DP.STATUS},
)


async def test_send_decoded_command_ignores_decode_errors(fake_channel: FakeChannel) -> None:
"""Test that send_decoded_command ignores non-decodable messages."""
# Queue a valid response (invalid responses are ignored by not matching expected_dps)
fake_channel.response_queue.append(build_q10_dps_response({"121": 5, "122": 100}))

result = await send_decoded_command(
fake_channel, # type: ignore[arg-type]
B01_Q10_DP.REQUEST_DPS,
{},
expected_dps={B01_Q10_DP.STATUS},
)

# Should successfully decode and return valid response
assert B01_Q10_DP.STATUS in result


async def test_send_decoded_command_partial_match(fake_channel: FakeChannel) -> None:
"""Test that send_decoded_command accepts response with at least one expected DPS."""
# Queue response with only one of multiple expected DPS
fake_channel.response_queue.append(build_q10_dps_response({"121": 5}))

result = await send_decoded_command(
fake_channel, # type: ignore[arg-type]
B01_Q10_DP.REQUEST_DPS,
{},
expected_dps={B01_Q10_DP.STATUS, B01_Q10_DP.BATTERY},
)

# Should accept response with at least one expected DPS
assert B01_Q10_DP.STATUS in result
assert result[B01_Q10_DP.STATUS] == 5


async def test_send_decoded_command_published_message(fake_channel: FakeChannel) -> None:
"""Test that send_decoded_command publishes the correct message."""
fake_channel.response_queue.append(build_q10_dps_response({"121": 5, "122": 100}))

await send_decoded_command(
fake_channel, # type: ignore[arg-type]
B01_Q10_DP.REQUEST_DPS,
{},
expected_dps={B01_Q10_DP.STATUS},
)

# Check published message
assert len(fake_channel.published_messages) == 1
message = fake_channel.published_messages[0]
assert message.payload is not None
payload_data = json.loads(message.payload.decode())
assert payload_data == {"dps": {"102": {}}}


async def test_send_decoded_command_with_params(fake_channel: FakeChannel) -> None:
"""Test send_decoded_command with command parameters."""
fake_channel.response_queue.append(build_q10_dps_response({"121": 3, "122": 100}))

await send_decoded_command(
fake_channel, # type: ignore[arg-type]
B01_Q10_DP.START_CLEAN,
{"cmd": 1},
expected_dps={B01_Q10_DP.STATUS},
)

message = fake_channel.published_messages[0]
assert message.payload is not None
payload_data = json.loads(message.payload.decode())
assert payload_data == {"dps": {"201": {"cmd": 1}}}
Loading