-
Notifications
You must be signed in to change notification settings - Fork 77
Expand file tree
/
Copy pathbluetooth.py
More file actions
152 lines (127 loc) · 4.69 KB
/
bluetooth.py
File metadata and controls
152 lines (127 loc) · 4.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# nxt.backend.bluetooth module -- Bluetooth backend
# Copyright (C) 2006, 2007 Douglas P Lau
# Copyright (C) 2009 Marcus Wanner
# Copyright (C) 2021 Nicolas Schodet
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
import logging
import struct
import nxt.brick
logger = logging.getLogger(__name__)
# NXT brick RFCOMM port.
PORT = 1
class BluetoothSock:
"""Bluetooth socket connected to a NXT brick."""
#: Block size.
bsize = 118
#: Connection type, used to evaluate latency.
type = "bluetooth"
def __init__(self, bluetooth, host):
self._bluetooth = bluetooth
self._host = host
self._sock = None
def __str__(self):
return f"Bluetooth ({self._host})"
def connect(self):
"""Connect to NXT brick.
:return: Connected brick.
:rtype: Brick
"""
logger.info("connecting via %s", self)
sock = self._bluetooth.BluetoothSocket(self._bluetooth.RFCOMM)
sock.connect((self._host, PORT))
self._sock = sock
return nxt.brick.Brick(self)
def close(self):
"""Close the connection."""
if self._sock is not None:
logger.info("closing %s connection", self)
self._sock.close()
self._sock = None
def send(self, data):
"""Send raw data.
:param bytes data: Data to send.
"""
data = struct.pack("<H", len(data)) + data
logger.debug("send: %s", data.hex())
self._sock.send(data)
def recv(self):
"""Receive raw data.
:return: Received data.
:rtype: bytes
"""
data = self._sock.recv(2)
logger.debug("recv: %s", data.hex())
(plen,) = struct.unpack("<H", data)
data = self._sock.recv(plen)
logger.debug("recv: %s", data.hex())
return data
class Backend:
"""Bluetooth backend."""
def __init__(self, bluetooth):
self._bluetooth = bluetooth
def find(self, host=None, name=None, **kwargs):
"""Find bricks connected using Bluetooth.
:param host: Bluetooth address (example: ``"00:16:53:01:02:03"``).
:type host: str or None
:param name: Brick name (example: ``"NXT"``).
:type name: str or None
:param kwargs: Other parameters are ignored.
:return: Iterator over all found bricks.
:rtype: Iterator[Brick]
"""
if host is not None:
name = None
lookup_names = False
discovered = [host]
else:
lookup_names = name is not None
try:
discovered = self._bluetooth.discover_devices(lookup_names=lookup_names)
except self._bluetooth.BluetoothError as err:
logger.warning("failed to discover Bluetooth devices: %s", err)
logger.debug("error from discover_devices", exc_info=True)
return None
except OSError as err:
logger.warning("failed to discover Bluetooth devices: %s", err)
logger.debug("error from discover_devices", exc_info=True)
return None
for dev in discovered:
if lookup_names:
devhost, devname = dev
else:
devhost, devname = dev, None
if (host is None or devhost == host) and (name is None or devname == name):
if type(devhost) is bytes:
devhost = devhost.decode('utf8')
sock = BluetoothSock(self._bluetooth, devhost)
try:
brick = sock.connect()
except self._bluetooth.BluetoothError:
logger.warning("failed to connect to device %s", sock)
logger.debug("error from connect", exc_info=True)
else:
yield brick
def get_backend():
"""Get an instance of the Bluetooth backend if available.
:return: Bluetooth backend.
:rtype: Backend or None
"""
try:
import bluetooth
except ImportError:
logger.info("no bluetooth module")
return None
except Exception:
logger.info("platform is not supported by bluetooth module")
logger.debug("error from import", exc_info=True)
return None
return Backend(bluetooth)