-
Notifications
You must be signed in to change notification settings - Fork 352
Expand file tree
/
Copy path__init__.py
More file actions
152 lines (121 loc) · 4.73 KB
/
__init__.py
File metadata and controls
152 lines (121 loc) · 4.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import logging
from canopen import node
from canopen.pdo.base import PdoBase, PdoMap, PdoMaps, PdoVariable
from canopen.sdo import SdoAbortedError
# Names exported when a caller does a star-import of this module.
__all__ = [
"PdoBase",
"PdoMap",
"PdoMaps",
"PdoVariable",
"PDO",
"RPDO",
"TPDO",
]
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class PDO(PdoBase):
    """PDO class kept for backwards compatibility.

    Exposes the maps of an RPDO and a TPDO object side by side (``rx`` and
    ``tx``) and additionally merges them into a single ``map`` dictionary
    keyed by the object dictionary index of each PDO's mapping record.

    :param node: Parent node for this object.
    :param rpdo: RPDO object holding the Receive PDO mappings
    :param tpdo: TPDO object holding the Transmit PDO mappings
    """

    def __init__(self, node, rpdo, tpdo):
        super().__init__(node)
        self.rx = rpdo.map
        self.tx = tpdo.map

        self.map = {}
        # The rx/tx maps are keyed starting at 1, while the mapping records
        # start at 0x1600 (receive PDOs) and 0x1A00 (transmit PDOs); key 1
        # therefore corresponds to the base index, hence the "- 1".
        for key, value in self.rx.items():
            self.map[0x1600 + (key - 1)] = value
        for key, value in self.tx.items():
            self.map[0x1A00 + (key - 1)] = value
class RPDO(PdoBase):
    """Receive PDOs: data sent from elsewhere on the network to this node.

    Properties 0x1400 to 0x1403 | Mapping 0x1600 to 0x1603.

    :param object node: Parent node for this object.
    """

    def __init__(self, node):
        super().__init__(node)
        self.map = PdoMaps(0x1400, 0x1600, self, 0x200)
        logger.debug('RPDO Map as %d', len(self.map))

    def stop(self):
        """Stop transmission of every RPDO in the map.

        :raise TypeError: If the node associated with the PDO is not a
            remote node and therefore does not support this function.
        """
        # Guard first so the happy path stays un-nested.
        if not isinstance(self.node, node.RemoteNode):
            raise TypeError('The node type does not support this function.')
        for rpdo_map in self.map.values():
            rpdo_map.stop()
class TPDO(PdoBase):
    """Transmit PDO to broadcast data from the represented node to the network.

    Properties 0x1800 to 0x1803 | Mapping 0x1A00 to 0x1A03.

    :param object node: Parent node for this object.
    """

    def __init__(self, node):
        super().__init__(node)
        self.map = PdoMaps(0x1800, 0x1A00, self, 0x180)
        logger.debug('TPDO Map as %d', len(self.map))

    def _require_local_node(self):
        """Raise :class:`TypeError` unless the parent node is a ``LocalNode``."""
        if not isinstance(self.node, node.LocalNode):
            raise TypeError("The node type does not support this function.")

    def start(self, period: float):
        """Start transmission of all TPDOs.

        :param float period: Transmission period in seconds.
        :raises TypeError: Exception is thrown if the node associated with the
            PDO does not support this function.
        """
        self._require_local_node()
        for pdo in self.map.values():
            pdo.start(period)

    def pack_data(self, data: bytearray, variable: PdoVariable) -> bytearray:
        """Pack the stored value of ``variable`` into ``data`` if one is available.

        The variable's bit length and bit offset are converted to whole bytes
        by integer division, so variables shorter than 8 bits are skipped.
        ``data`` is modified in place and also returned. It is left untouched
        when the node's data store holds no value for the variable or when
        reading the value is aborted.

        :param bytearray data: PDO payload buffer to write into.
        :param PdoVariable variable: Variable to pack.
        :returns: The same ``data`` object that was passed in.
        :raises ValueError: If the variable is longer than 64 bits.
        :raises IndexError: If the stored value is shorter than the mapped
            length or the variable does not fit into ``data``. Nothing is
            written in that case.
        """
        length_bytes = variable.length // 8
        offset_bytes = variable.offset // 8
        if length_bytes > 8:
            raise ValueError("Data length is greater than 64 bits.")
        if length_bytes == 0:
            return data

        stored = self.node.data_store.get(variable.index, {}).get(variable.subindex)
        if stored is None:
            # Nothing has been stored for this object; keep the old payload.
            return data

        # Only the read itself can abort, so keep the try body to that call.
        try:
            variable_data = self.node.get_data(variable.index, variable.subindex)
        except SdoAbortedError:
            logger.warning(
                "Failed to get data from index: 0x%X, subindex: 0x%X",
                variable.index,
                variable.subindex,
            )
            return data

        # Validate before writing so a size mismatch can never leave the
        # payload half-updated.
        end_bytes = offset_bytes + length_bytes
        if len(variable_data) < length_bytes or end_bytes > len(data):
            raise IndexError(
                "Cannot pack 0x%X:%d: need %d byte(s) at byte offset %d, but the "
                "stored value has %d byte(s) and the PDO buffer has %d byte(s)"
                % (
                    variable.index,
                    variable.subindex,
                    length_bytes,
                    offset_bytes,
                    len(variable_data),
                    len(data),
                )
            )

        # reverse the stored bytes to be able to be transmitted properly
        sanitized_data = variable_data[::-1]
        # Equal-length slice assignment: the buffer size never changes.
        data[offset_bytes:end_bytes] = sanitized_data[:length_bytes]
        return data

    def update(self):
        """Update the data of all TPDOs.

        Every variable of every mapped PDO is re-packed from the node's data
        store into the PDO's payload before the PDO itself is updated.

        :raises TypeError: Exception is thrown if the node associated with the
            PDO does not support this function.
        """
        self._require_local_node()
        for pdo in self.map.values():
            for variable in pdo:
                self.pack_data(pdo.data, variable)
            pdo.update()

    def stop(self):
        """Stop transmission of all TPDOs.

        :raise TypeError: Exception is thrown if the node associated with the
            PDO does not support this function.
        """
        self._require_local_node()
        for pdo in self.map.values():
            pdo.stop()
# Compatibility alias: ``Variable`` refers to the very same class as
# ``PdoVariable``, so code using the ``Variable`` name keeps working.
Variable = PdoVariable