Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions roborock/devices/rpc/b01_q7_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@

from roborock.devices.transport.mqtt_channel import MqttChannel
from roborock.exceptions import RoborockException
from roborock.protocols.b01_q7_protocol import B01_VERSION, Q7RequestMessage, decode_rpc_response, encode_mqtt_payload
from roborock.protocols.b01_q7_protocol import (
B01_VERSION,
MapKey,
Q7RequestMessage,
decode_map_payload,
decode_rpc_response,
encode_mqtt_payload,
)
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -127,18 +134,32 @@ def find_response(response_message: RoborockMessage) -> DecodedB01Response | Non
raise


async def send_map_command(mqtt_channel: MqttChannel, request_message: Q7RequestMessage) -> bytes:
"""Send map upload command and wait for MAP_RESPONSE payload bytes.
class MapRpcChannel:
"""RPC channel for map-related commands on B01/Q7 devices."""

This stays separate from ``send_decoded_command()`` because map uploads arrive as
raw ``MAP_RESPONSE`` payload bytes instead of a decoded RPC ``data`` payload.
"""
def __init__(self, mqtt_channel: MqttChannel, map_key: MapKey) -> None:
self._mqtt_channel = mqtt_channel
self._map_key = map_key

try:
return await _send_command(
mqtt_channel,
request_message,
response_matcher=lambda response_message: _matches_map_response(response_message, version=B01_VERSION),
)
except TimeoutError as ex:
raise RoborockException(f"B01 map command timed out after {_TIMEOUT}s ({request_message})") from ex
async def send_map_command(self, request_message: Q7RequestMessage) -> bytes:
"""Send a map upload command and return decoded SCMap bytes.

This publishes the request and waits for a matching ``MAP_RESPONSE`` message
with the correct protocol version. The raw ``MAP_RESPONSE`` payload bytes are
then decoded/inflated via :func:`decode_map_payload` using this channel's
``map_key``, and the resulting SCMap bytes are returned.

The returned value is the decoded map data bytes suitable for passing to the
map parser library, not the raw MQTT ``MAP_RESPONSE`` payload bytes.
"""

try:
raw_payload = await _send_command(
self._mqtt_channel,
request_message,
response_matcher=lambda response_message: _matches_map_response(response_message, version=B01_VERSION),
)
except TimeoutError as ex:
raise RoborockException(f"B01 map command timed out after {_TIMEOUT}s ({request_message})") from ex

return decode_map_payload(raw_payload, map_key=self._map_key)
20 changes: 14 additions & 6 deletions roborock/devices/traits/b01/q7/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
SCWindMapping,
WaterLevelMapping,
)
from roborock.devices.rpc.b01_q7_channel import send_decoded_command
from roborock.devices.rpc.b01_q7_channel import MapRpcChannel, send_decoded_command
from roborock.devices.traits import Trait
from roborock.devices.transport.mqtt_channel import MqttChannel
from roborock.protocols.b01_q7_protocol import B01_Q7_DPS, CommandType, ParamsType, Q7RequestMessage
from roborock.exceptions import RoborockException
from roborock.protocols.b01_q7_protocol import B01_Q7_DPS, CommandType, ParamsType, Q7RequestMessage, create_map_key
from roborock.roborock_message import RoborockB01Props
from roborock.roborock_typing import RoborockB01Q7Methods

Expand Down Expand Up @@ -51,9 +52,12 @@ class Q7PropertiesApi(Trait):
map_content: MapContentTrait
"""Trait for fetching parsed current map content."""

def __init__(self, channel: MqttChannel, *, device: HomeDataDevice, product: HomeDataProduct) -> None:
def __init__(
self, channel: MqttChannel, map_rpc_channel: MapRpcChannel, device: HomeDataDevice, product: HomeDataProduct
) -> None:
"""Initialize the Q7 API."""
self._channel = channel
self._map_rpc_channel = map_rpc_channel
self._device = device
self._product = product

Expand All @@ -63,9 +67,8 @@ def __init__(self, channel: MqttChannel, *, device: HomeDataDevice, product: Hom
self.clean_summary = CleanSummaryTrait(channel)
self.map = MapTrait(channel)
self.map_content = MapContentTrait(
self._map_rpc_channel,
self.map,
serial=device.sn,
model=product.model,
)

async def query_values(self, props: list[RoborockB01Props]) -> B01Props | None:
Expand Down Expand Up @@ -173,4 +176,9 @@ async def send(self, command: CommandType, params: ParamsType) -> Any:

def create(product: HomeDataProduct, device: HomeDataDevice, channel: MqttChannel) -> Q7PropertiesApi:
"""Create traits for B01 Q7 devices."""
return Q7PropertiesApi(channel, device=device, product=product)
if device.sn is None or product.model is None:
raise RoborockException(
f"Device serial number and product model are required (sn:: {device.sn}, model: {product.model})"
)
map_rpc_channel = MapRpcChannel(channel, map_key=create_map_key(serial=device.sn, model=product.model))
return Q7PropertiesApi(channel, device=device, product=product, map_rpc_channel=map_rpc_channel)
34 changes: 6 additions & 28 deletions roborock/devices/traits/b01/q7/map.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
"""Map trait for B01 Q7 devices."""

import asyncio

from roborock.data import Q7MapList
from roborock.devices.rpc.b01_q7_channel import send_decoded_command, send_map_command
from roborock.devices.rpc.b01_q7_channel import send_decoded_command
from roborock.devices.traits import Trait
from roborock.devices.transport.mqtt_channel import MqttChannel
from roborock.exceptions import RoborockException
Expand All @@ -12,14 +10,15 @@


class MapTrait(Q7MapList, Trait):
"""Map retrieval + map metadata helpers for Q7 devices."""
"""Map trait for B01/Q7 devices, responsible for fetching and caching map list metadata.

The MapContent is fetched from the MapContent trait, which relies on this trait to determine the
current map ID to fetch.
"""

def __init__(self, channel: MqttChannel) -> None:
super().__init__()
self._channel = channel
# Map uploads are serialized per-device to avoid response cross-wiring.
self._map_command_lock = asyncio.Lock()
self._loaded = False

async def refresh(self) -> None:
"""Refresh cached map list metadata from the device."""
Expand All @@ -36,24 +35,3 @@ async def refresh(self) -> None:
raise RoborockException(f"Failed to decode map list response: {response!r}")

self.map_list = parsed.map_list
self._loaded = True

async def _get_map_payload(self, *, map_id: int) -> bytes:
"""Fetch raw map payload bytes for the given map id."""
request = Q7RequestMessage(
dps=B01_Q7_DPS,
command=RoborockB01Q7Methods.UPLOAD_BY_MAPID,
params={"map_id": map_id},
)
async with self._map_command_lock:
return await send_map_command(self._channel, request)

async def get_current_map_payload(self) -> bytes:
"""Fetch raw map payload bytes for the currently selected map."""
if not self._loaded:
await self.refresh()

map_id = self.current_map_id
if map_id is None:
raise RoborockException(f"Unable to determine map_id from map list response: {self!r}")
return await self._get_map_payload(map_id=map_id)
54 changes: 28 additions & 26 deletions roborock/devices/traits/b01/q7/map_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@
For B01/Q7 devices, the underlying raw map payload is retrieved via `MapTrait`.
"""

import asyncio
from dataclasses import dataclass

from vacuum_map_parser_base.map_data import MapData

from roborock.data import RoborockBase
from roborock.devices.rpc.b01_q7_channel import MapRpcChannel
from roborock.devices.traits import Trait
from roborock.exceptions import RoborockException
from roborock.map.b01_map_parser import B01MapParser, B01MapParserConfig
from roborock.protocols.b01_q7_protocol import B01_Q7_DPS, Q7RequestMessage
from roborock.roborock_typing import RoborockB01Q7Methods

from .map import MapTrait

Expand Down Expand Up @@ -51,38 +55,38 @@ class MapContentTrait(MapContent, Trait):

def __init__(
self,
map_rpc_channel: MapRpcChannel,
map_trait: MapTrait,
*,
serial: str,
model: str,
map_parser_config: B01MapParserConfig | None = None,
) -> None:
super().__init__()
self._map_rpc_channel = map_rpc_channel
self._map_trait = map_trait
self._serial = serial
self._model = model
self._map_parser = B01MapParser(map_parser_config)
# Map uploads are serialized per-device to avoid response cross-wiring.
self._map_command_lock = asyncio.Lock()

async def refresh(self) -> None:
"""Fetch, decode, and parse the current map payload."""
raw_payload = await self._map_trait.get_current_map_payload()
parsed = self.parse_map_content(raw_payload)
self.image_content = parsed.image_content
self.map_data = parsed.map_data
self.raw_api_response = parsed.raw_api_response

def parse_map_content(self, response: bytes) -> MapContent:
"""Parse map content from raw bytes.

This mirrors the v1 trait behavior so cached map payload bytes can be
reparsed without going back to the device.
"""Fetch, decode, and parse the current map payload.

This relies on the Map Trait already having fetched the map list metadata
so it can determine the current map_id.
"""
# Users must call first
if (map_id := self._map_trait.current_map_id) is None:
raise RoborockException("Unable to determine current map ID")

Comment on lines +73 to +79
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MapContentTrait.refresh() now fails immediately when current_map_id is None, which means callers must remember to call await map.refresh() first (a behavior change from the previous get_current_map_payload() flow). Consider auto-refreshing the map list here (e.g., call await self._map_trait.refresh() when current_map_id is None) and only raising if the map list is still empty/unusable, or make the exception message explicitly instruct the caller to refresh map metadata first.

Suggested change
This relies on the Map Trait already having fetched the map list metadata
so it can determine the current map_id.
"""
# Users must call first
if (map_id := self._map_trait.current_map_id) is None:
raise RoborockException("Unable to determine current map ID")
This primarily relies on the Map Trait having fetched the map list
metadata so it can determine the current map_id, but will fall back to
auto-refreshing the map list if needed.
"""
map_id = self._map_trait.current_map_id
if map_id is None:
# Attempt to auto-refresh map metadata so we can determine map_id.
await self._map_trait.refresh()
map_id = self._map_trait.current_map_id
if map_id is None:
raise RoborockException(
"Unable to determine current map ID even after refreshing map "
"metadata; ensure at least one map exists on the device."
)

Copilot uses AI. Check for mistakes.
request = Q7RequestMessage(
dps=B01_Q7_DPS,
command=RoborockB01Q7Methods.UPLOAD_BY_MAPID,
params={"map_id": map_id},
)
async with self._map_command_lock:
raw_payload = await self._map_rpc_channel.send_map_command(request)

try:
parsed_data = self._map_parser.parse(
response,
serial=self._serial,
model=self._model,
)
parsed_data = self._map_parser.parse(raw_payload)
except RoborockException:
raise
except Exception as ex:
Expand All @@ -91,8 +95,6 @@ def parse_map_content(self, response: bytes) -> MapContent:
if parsed_data.image_content is None:
raise RoborockException("Failed to render B01 map image")

return MapContent(
image_content=parsed_data.image_content,
map_data=parsed_data.map_data,
raw_api_response=response,
)
self.image_content = parsed_data.image_content
self.map_data = parsed_data.map_data
self.raw_api_response = raw_payload
70 changes: 8 additions & 62 deletions roborock/map/b01_map_parser.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,19 @@
"""Module for parsing B01/Q7 map content.

Observed Q7 `MAP_RESPONSE` payloads follow this decode pipeline:
- base64-encoded ASCII
- AES-ECB encrypted with the derived map key
- PKCS7 padded
- ASCII hex for a zlib-compressed SCMap payload

The inner SCMap blob is parsed with protobuf messages generated from
`roborock/map/proto/b01_scmap.proto`.
"""

import base64
import binascii
import hashlib
import io
import zlib
from dataclasses import dataclass

from Crypto.Cipher import AES
from google.protobuf.message import DecodeError, Message
from google.protobuf.message import DecodeError
from PIL import Image
from vacuum_map_parser_base.config.image_config import ImageConfig
from vacuum_map_parser_base.map_data import ImageData, MapData

from roborock.exceptions import RoborockException
from roborock.map.proto.b01_scmap_pb2 import RobotMap # type: ignore[attr-defined]
from roborock.protocol import Utils

from .map_parser import ParsedMapData

Expand All @@ -46,10 +34,9 @@ class B01MapParser:
def __init__(self, config: B01MapParserConfig | None = None) -> None:
self._config = config or B01MapParserConfig()

def parse(self, raw_payload: bytes, *, serial: str, model: str) -> ParsedMapData:
"""Parse a raw MAP_RESPONSE payload and return a PNG + MapData."""
inflated = _decode_b01_map_payload(raw_payload, serial=serial, model=model)
parsed = _parse_scmap_payload(inflated)
def parse(self, payload: bytes) -> ParsedMapData:
"""Parse an inflated SCMap payload and return a PNG + MapData."""
parsed = _parse_scmap_payload(payload)
size_x, size_y, grid = _extract_grid(parsed)
room_names = _extract_room_names(parsed)

Expand Down Expand Up @@ -78,54 +65,13 @@ def parse(self, raw_payload: bytes, *, serial: str, model: str) -> ParsedMapData
)


def _derive_map_key(serial: str, model: str) -> bytes:
"""Derive the B01/Q7 map decrypt key from serial + model."""
model_suffix = model.split(".")[-1]
model_key = (model_suffix + "0" * 16)[:16].encode()
material = f"{serial}+{model_suffix}+{serial}".encode()
encrypted = Utils.encrypt_ecb(material, model_key)
md5 = hashlib.md5(base64.b64encode(encrypted), usedforsecurity=False).hexdigest()
return md5[8:24].encode()


def _decode_base64_payload(raw_payload: bytes) -> bytes:
blob = raw_payload.strip()
padded = blob + b"=" * (-len(blob) % 4)
try:
return base64.b64decode(padded, validate=True)
except binascii.Error as err:
raise RoborockException("Failed to decode B01 map payload") from err


def _decode_b01_map_payload(raw_payload: bytes, *, serial: str, model: str) -> bytes:
"""Decode raw B01 `MAP_RESPONSE` payload into inflated SCMap bytes."""
# TODO: Move this lower-level B01 transport decode under `roborock.protocols`
# so this module only handles SCMap parsing/rendering.
encrypted_payload = _decode_base64_payload(raw_payload)
if len(encrypted_payload) % AES.block_size != 0:
raise RoborockException("Unexpected encrypted B01 map payload length")

map_key = _derive_map_key(serial, model)

try:
compressed_hex = Utils.decrypt_ecb(encrypted_payload, map_key).decode("ascii")
compressed_payload = bytes.fromhex(compressed_hex)
return zlib.decompress(compressed_payload)
except (ValueError, UnicodeDecodeError, zlib.error) as err:
raise RoborockException("Failed to decode B01 map payload") from err


def _parse_proto(blob: bytes, message: Message, *, context: str) -> None:
try:
message.ParseFromString(blob)
except DecodeError as err:
raise RoborockException(f"Failed to parse {context}") from err


def _parse_scmap_payload(payload: bytes) -> RobotMap:
"""Parse inflated SCMap bytes into a generated protobuf message."""
parsed = RobotMap()
_parse_proto(payload, parsed, context="B01 SCMap")
try:
parsed.ParseFromString(payload)
except DecodeError as err:
raise RoborockException("Failed to parse B01 SCMap") from err
return parsed


Expand Down
Loading
Loading