-
Notifications
You must be signed in to change notification settings - Fork 350
Expand file tree
/
Copy pathlocal.py
More file actions
157 lines (131 loc) · 5.22 KB
/
local.py
File metadata and controls
157 lines (131 loc) · 5.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from __future__ import annotations
import logging
from typing import Dict, Union
import canopen.network
from canopen import objectdictionary
from canopen.emcy import EmcyProducer
from canopen.nmt import NmtSlave
from canopen.node.base import BaseNode
from canopen.objectdictionary import ObjectDictionary
from canopen.pdo import PDO, RPDO, TPDO
from canopen.sdo import SdoAbortedError, SdoServer
logger = logging.getLogger(__name__)
class LocalNode(BaseNode):
def __init__(
self,
node_id: int,
object_dictionary: Union[ObjectDictionary, str],
):
super(LocalNode, self).__init__(node_id, object_dictionary)
self.data_store: Dict[int, Dict[int, bytes]] = {}
self._read_callbacks = []
self._write_callbacks = []
self.sdo = SdoServer(0x600 + self.id, 0x580 + self.id, self)
self.tpdo = TPDO(self)
self.rpdo = RPDO(self)
self.pdo = PDO(self, self.rpdo, self.tpdo)
self.nmt = NmtSlave(self.id, self)
# Let self.nmt handle writes for 0x1017
self.add_write_callback(self.nmt.on_write)
self.emcy = EmcyProducer(0x80 + self.id)
def associate_network(self, network: canopen.network.Network):
self.network = network
self.sdo.network = network
self.tpdo.network = network
self.rpdo.network = network
self.nmt.network = network
self.emcy.network = network
network.subscribe(self.sdo.rx_cobid, self.sdo.on_request)
network.subscribe(0, self.nmt.on_command)
def remove_network(self) -> None:
self.network.unsubscribe(self.sdo.rx_cobid, self.sdo.on_request)
self.network.unsubscribe(0, self.nmt.on_command)
self.stop_pdo_services()
self.network = canopen.network._UNINITIALIZED_NETWORK
self.sdo.network = canopen.network._UNINITIALIZED_NETWORK
self.tpdo.network = canopen.network._UNINITIALIZED_NETWORK
self.rpdo.network = canopen.network._UNINITIALIZED_NETWORK
self.nmt.network = canopen.network._UNINITIALIZED_NETWORK
self.emcy.network = canopen.network._UNINITIALIZED_NETWORK
def add_read_callback(self, callback):
self._read_callbacks.append(callback)
def add_write_callback(self, callback):
self._write_callbacks.append(callback)
def start_pdo_services(
self, period: float, enable_rpdo: bool = True, enable_tpdo: bool = True
):
"""
Start the PDO related services of the node.
:param period: Service interval in seconds.
:param enable_rpdo: Enable RPDO service.
:param enable_tpdo: Enable TPDO service.
"""
if enable_rpdo:
self.rpdo.read(from_od=True, subscribe=True)
if enable_tpdo:
self.tpdo.read(from_od=True, subscribe=False)
self.tpdo.start(period=period)
def stop_pdo_services(self):
"""
Stop the PDO related services of the node.
"""
self.rpdo.unsubscribe()
self.tpdo.stop()
def get_data(
self, index: int, subindex: int, check_readable: bool = False
) -> bytes:
obj = self._find_object(index, subindex)
if check_readable and not obj.readable:
raise SdoAbortedError(0x06010001)
# Try callback
for callback in self._read_callbacks:
result = callback(index=index, subindex=subindex, od=obj)
if result is not None:
return obj.encode_raw(result)
# Try stored data
try:
return self.data_store[index][subindex]
except KeyError:
# Try ParameterValue in EDS
if obj.value is not None:
return obj.encode_raw(obj.value)
# Try default value
if obj.default is not None:
return obj.encode_raw(obj.default)
# Resource not available
logger.info("Resource unavailable for 0x%04X:%02X", index, subindex)
raise SdoAbortedError(0x060A0023)
def set_data(
self,
index: int,
subindex: int,
data: bytes,
check_writable: bool = False,
) -> None:
obj = self._find_object(index, subindex)
if check_writable and not obj.writable:
raise SdoAbortedError(0x06010002)
# Check length matches type (length of od variable is in bits)
if obj.data_type in objectdictionary.NUMBER_TYPES and (
not 8 * len(data) == len(obj)
):
raise SdoAbortedError(0x06070010)
# Try callbacks
for callback in self._write_callbacks:
callback(index=index, subindex=subindex, od=obj, data=data)
# Store data
self.data_store.setdefault(index, {})
self.data_store[index][subindex] = bytes(data)
self.tpdo.update()
def _find_object(self, index, subindex):
if index not in self.object_dictionary:
# Index does not exist
raise SdoAbortedError(0x06020000)
obj = self.object_dictionary[index]
if not isinstance(obj, objectdictionary.ODVariable):
# Group or array
if subindex not in obj:
# Subindex does not exist
raise SdoAbortedError(0x06090011)
obj = obj[subindex]
return obj