Skip to content

Commit 47656be

Browse files
committed
xdd: Implement option to load object dictionary from XDD file.
Signed-off-by: Taras Zaporozhets <zaporozhets.taras@gmail.com>
1 parent 9685880 commit 47656be

File tree

5 files changed

+1914
-2
lines changed

5 files changed

+1914
-2
lines changed

canopen/objectdictionary/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def import_od(
7676
source: Union[str, TextIO, None],
7777
node_id: Optional[int] = None,
7878
) -> ObjectDictionary:
79-
"""Parse an EDS, DCF, or EPF file.
79+
"""Parse an EDS, DCF, EPF or XDD file.
8080
8181
:param source:
8282
The path to object dictionary file, a file like object, or an EPF XML tree.
@@ -106,9 +106,12 @@ def import_od(
106106
elif suffix == ".epf":
107107
from canopen.objectdictionary import epf
108108
return epf.import_epf(source)
109+
elif suffix == ".xdd":
110+
from canopen.objectdictionary import xdd
111+
return xdd.import_xdd(source, node_id)
109112
else:
110113
doc_type = suffix[1:]
111-
allowed = ", ".join(["eds", "dcf", "epf"])
114+
allowed = ", ".join(["eds", "dcf", "epf", "xdd"])
112115
raise ValueError(
113116
f"Cannot import from the {doc_type!r} format; "
114117
f"supported formats: {allowed}"

canopen/objectdictionary/xdd.py

Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
import functools
2+
import logging
3+
import os
4+
import re
5+
import xml.etree.ElementTree as etree
6+
from typing import Union, Optional
7+
from canopen.objectdictionary import (
8+
ODArray,
9+
ODRecord,
10+
ODVariable,
11+
ObjectDictionary,
12+
datatypes,
13+
objectcodes,
14+
)
15+
16+
logger = logging.getLogger(__name__)
17+
18+
autoint = functools.partial(int, base=0)
19+
hex = functools.partial(int, base=16)
20+
21+
def import_xdd(
22+
xdd: Union[etree.Element, str, bytes, os.PathLike],
23+
node_id: Optional[int],
24+
) -> ObjectDictionary:
25+
od = ObjectDictionary()
26+
if etree.iselement(xdd):
27+
root = xdd
28+
else:
29+
root = etree.parse(xdd).getroot()
30+
31+
if node_id is None:
32+
device_commissioning = root.find('.//{*}DeviceCommissioning')
33+
if device_commissioning is not None:
34+
od.node_id = int(device_commissioning['nodeID'], 0)
35+
else:
36+
od.node_id = None
37+
else:
38+
od.node_id = node_id
39+
40+
_add_device_information(od, root)
41+
_add_object_list(od, root)
42+
_add_dummy_objects(od, root)
43+
return od
44+
45+
46+
def _add_device_information(
47+
od: ObjectDictionary,
48+
root: etree.Element
49+
):
50+
device_identity = root.find('.//{*}DeviceIdentity')
51+
if device_identity is not None:
52+
for src_prop, dst_prop, f in [
53+
("vendorName", "vendor_name", str),
54+
("vendorID", "vendor_number", hex),
55+
("productName", "product_name", str),
56+
("productID", "product_number", hex),
57+
]:
58+
val = device_identity.find(f'{{*}}{src_prop}')
59+
if val is not None and val.text:
60+
setattr(od.device_information, dst_prop, f(val.text))
61+
62+
general_features = root.find('.//{*}CANopenGeneralFeatures')
63+
if general_features is not None:
64+
for src_prop, dst_prop, f, default in [
65+
# properties without default value (default=None) are required
66+
("granularity", "granularity", autoint, None),
67+
("nrOfRxPDO", "nr_of_RXPDO", autoint, "0"),
68+
("nrOfTxPDO", "nr_of_TXPDO", autoint, "0"),
69+
("bootUpSlave", "simple_boot_up_slave", bool, 0),
70+
]:
71+
val = general_features.get(src_prop, default)
72+
if val is None:
73+
raise ValueError(f"Missing required '{src_prop}' property in XDD file")
74+
setattr(od.device_information, dst_prop, f(val))
75+
76+
77+
baud_rate = root.find('.//{*}PhysicalLayer/{*}baudRate')
78+
for baud in baud_rate:
79+
try:
80+
rate = int(baud.get("value").replace(' Kbps', ''), 10) * 1000
81+
od.device_information.allowed_baudrates.add(rate)
82+
except (ValueError, TypeError):
83+
pass
84+
85+
if default_baud := baud_rate.get('defaultValue', None):
86+
try:
87+
od.bitrate = int(default_baud.replace(' Kbps', ''), 10) * 1000
88+
except (ValueError, TypeError):
89+
pass
90+
91+
92+
def _add_object_list(
93+
od: ObjectDictionary,
94+
root: etree.Element
95+
):
96+
# Process all CANopen objects in the file
97+
for obj in root.findall('.//{*}CANopenObjectList/{*}CANopenObject'):
98+
name = obj.get('name', '')
99+
index = int(obj.get('index', '0'), 16)
100+
object_type = int(obj.get('objectType', '0'))
101+
sub_number = obj.get('subNumber')
102+
103+
# Simple variable
104+
if object_type == objectcodes.VAR:
105+
unique_id_ref = obj.get('uniqueIDRef', None)
106+
parameters = root.find(f'.//{{*}}parameter[@uniqueID="{unique_id_ref}"]')
107+
108+
var = _build_variable(parameters, od.node_id, name, index)
109+
_set_parameters_from_xdd_canopen_object(od.node_id, var, obj)
110+
od.add_object(var)
111+
112+
# Array
113+
elif object_type == objectcodes.ARRAY and sub_number:
114+
array = ODArray(name, index)
115+
for sub_obj in obj:
116+
sub_name = sub_obj.get('name', '')
117+
sub_index = int(sub_obj.get('subIndex'), 16)
118+
sub_unique_id = sub_obj.get('uniqueIDRef', None)
119+
sub_parameters = root.find(f'.//{{*}}parameter[@uniqueID="{sub_unique_id}"]')
120+
121+
sub_var = _build_variable(sub_parameters, od.node_id, sub_name, index, sub_index)
122+
_set_parameters_from_xdd_canopen_object(od.node_id, sub_var, sub_obj)
123+
array.add_member(sub_var)
124+
od.add_object(array)
125+
126+
# Record/Struct
127+
elif object_type == objectcodes.RECORD and sub_number:
128+
record = ODRecord(name, index)
129+
for sub_obj in obj:
130+
sub_name = sub_obj.get('name', '')
131+
sub_index = int(sub_obj.get('subIndex'), 16)
132+
sub_unique_id = sub_obj.get('uniqueIDRef', None)
133+
sub_parameters = root.find(f'.//{{*}}parameter[@uniqueID="{sub_unique_id}"]')
134+
sub_var = _build_variable(sub_parameters, od.node_id, sub_name, index, sub_index)
135+
_set_parameters_from_xdd_canopen_object(od.node_id, sub_var, sub_obj)
136+
record.add_member(sub_var)
137+
od.add_object(record)
138+
139+
140+
def _add_dummy_objects(
141+
od: ObjectDictionary,
142+
root: etree.Element
143+
):
144+
dummy_section = root.find('.//{*}ApplicationLayers/{*}dummyUsage')
145+
for dummy in dummy_section:
146+
p = dummy.get('entry').split('=')
147+
key = p[0]
148+
value = int(p[1], 10)
149+
index = int(key.replace('Dummy', ''), 10)
150+
if value == 1:
151+
var = ODVariable(key, index, 0)
152+
var.data_type = index
153+
var.access_type = "const"
154+
od.add_object(var)
155+
156+
157+
def _set_parameters_from_xdd_canopen_object(
158+
node_id: Optional[int],
159+
dst: ODVariable,
160+
src: etree.Element
161+
):
162+
# PDO mapping of the object, optional, string
163+
# Valid values:
164+
# * no - not mappable
165+
# * default - mapped by default
166+
# * optional - optionally mapped
167+
# * TPDO - may be mapped into TPDO only
168+
# * RPDO - may be mapped into RPDO only
169+
pdo_mapping = src.get('PDOmapping', 'no')
170+
dst.pdo_mappable = pdo_mapping != 'no'
171+
172+
# Name of the object, optional, string
173+
if var_name := src.get('name', None):
174+
dst.name = var_name
175+
176+
# CANopen data type (two hex digits), optional
177+
# data_type matches canopen library, no conversion needed
178+
if var_data_type := src.get('dataType', None):
179+
try:
180+
dst.data_type = int(var_data_type, 16)
181+
except (ValueError, TypeError):
182+
pass
183+
184+
# Access type of the object; valid values, optional, string
185+
# * const - read access only; the value is not changing
186+
# * ro - read access only
187+
# * wo - write access only
188+
# * rw - both read and write access
189+
# strings match with access_type in canopen library, no conversion needed
190+
if access_type := src.get('accessType', None):
191+
dst.access_type = access_type
192+
193+
# Low limit of the parameter value, optional, string
194+
if min_value := src.get('lowLimit', None):
195+
try:
196+
dst.min = _convert_variable(node_id, dst.data_type, min_value)
197+
except (ValueError, TypeError):
198+
pass
199+
200+
# High limit of the parameter value, optional, string
201+
if max_value := src.get('highLimit', None):
202+
try:
203+
dst.max = _convert_variable(node_id, dst.data_type, max_value)
204+
except (ValueError, TypeError):
205+
pass
206+
207+
# Default value of the object, optional, string
208+
if default_value := src.get('defaultValue', None):
209+
try:
210+
dst.default_raw = default_value
211+
if '$NODEID' in dst.default_raw:
212+
dst.relative = True
213+
dst.default = _convert_variable(node_id, dst.data_type, dst.default_raw)
214+
except (ValueError, TypeError):
215+
pass
216+
217+
218+
def _build_variable(
219+
par_tree: Optional[etree.Element],
220+
node_id: Optional[int],
221+
name: str,
222+
index: int,
223+
subindex: int = 0
224+
) -> ODVariable:
225+
var = ODVariable(name, index, subindex)
226+
# Set default parameters
227+
var.default_raw = None
228+
var.access_type = 'ro'
229+
if par_tree is None:
230+
return var
231+
232+
var.description = par_tree.get('description', '')
233+
234+
# Extract data type
235+
data_types = {
236+
'BOOL': datatypes.BOOLEAN,
237+
'SINT': datatypes.INTEGER8,
238+
'INT': datatypes.INTEGER16,
239+
'DINT': datatypes.INTEGER32,
240+
'LINT': datatypes.INTEGER64,
241+
'USINT': datatypes.UNSIGNED8,
242+
'UINT': datatypes.UNSIGNED16,
243+
'UDINT': datatypes.UNSIGNED32,
244+
'ULINT': datatypes.UNSIGNED32,
245+
'REAL': datatypes.REAL32,
246+
'LREAL': datatypes.REAL64,
247+
'STRING': datatypes.VISIBLE_STRING,
248+
'BITSTRING': datatypes.DOMAIN,
249+
'WSTRING': datatypes.UNICODE_STRING
250+
}
251+
252+
for k, v in data_types.items():
253+
if par_tree.find(f'{{*}}{k}') is not None:
254+
var.data_type = v
255+
256+
# Extract access type
257+
if access_type_str := par_tree.get('access', None):
258+
# Defines which operations are valid for the parameter:
259+
# * const - read access only; the value is not changing
260+
# * read - read access only (default value)
261+
# * write - write access only
262+
# * readWrite - both read and write access
263+
# * readWriteInput - both read and write access, but represents process input data
264+
# * readWriteOutput - both read and write access, but represents process output data
265+
# * noAccess - access denied
266+
access_types = {
267+
'const': 'const',
268+
'read': 'ro',
269+
'write': 'wo',
270+
'readWrite': 'rw',
271+
'readWriteInput': 'rw',
272+
'readWriteOutput': 'rw',
273+
'noAccess': 'const',
274+
}
275+
var.access_type = access_types.get(access_type_str)
276+
277+
# Extract default value
278+
default_value = par_tree.find('{*}defaultValue')
279+
if default_value is not None:
280+
try:
281+
var.default_raw = default_value.get('value')
282+
if '$NODEID' in var.default_raw:
283+
var.relative = True
284+
var.default = _convert_variable(node_id, var.data_type, var.default_raw)
285+
except (ValueError, TypeError):
286+
pass
287+
288+
# Extract allowed values range
289+
min_value = par_tree.find('{*}allowedValues/{*}range/{*}minValue')
290+
if min_value is not None:
291+
try:
292+
var.min = _convert_variable(node_id, var.data_type, min_value.get('value'))
293+
except (ValueError, TypeError):
294+
pass
295+
296+
max_value = par_tree.find('{*}allowedValues/{*}range/{*}maxValue')
297+
if max_value is not None:
298+
try:
299+
var.max = _convert_variable(node_id, var.data_type, max_value.get('value'))
300+
except (ValueError, TypeError):
301+
pass
302+
return var
303+
304+
305+
def _calc_bit_length(
306+
data_type: int
307+
) -> int:
308+
if data_type == datatypes.INTEGER8:
309+
return 8
310+
elif data_type == datatypes.INTEGER16:
311+
return 16
312+
elif data_type == datatypes.INTEGER32:
313+
return 32
314+
elif data_type == datatypes.INTEGER64:
315+
return 64
316+
else:
317+
raise ValueError(f"Invalid data_type '{data_type}', expecting a signed integer data_type.")
318+
319+
320+
def _signed_int_from_hex(
321+
hex_str: str,
322+
bit_length: int
323+
) -> int:
324+
number = int(hex_str, 0)
325+
max_value = (1 << (bit_length - 1)) - 1
326+
327+
if number > max_value:
328+
return number - (1 << bit_length)
329+
else:
330+
return number
331+
332+
333+
def _convert_variable(
334+
node_id: Optional[int],
335+
var_type: int,
336+
value: str
337+
) -> Optional[Union[bytes, str, float, int]]:
338+
if var_type in (datatypes.OCTET_STRING, datatypes.DOMAIN):
339+
return bytes.fromhex(value)
340+
elif var_type in (datatypes.VISIBLE_STRING, datatypes.UNICODE_STRING):
341+
return str(value)
342+
elif var_type in datatypes.FLOAT_TYPES:
343+
return float(value)
344+
else:
345+
# COB-ID can contain '$NODEID+' so replace this with node_id before converting
346+
value = value.replace(" ", "").upper()
347+
if '$NODEID' in value:
348+
if node_id is None:
349+
logger.warn("Cannot convert value with $NODEID, skipping conversion")
350+
return None
351+
else:
352+
return int(re.sub(r'\+?\$NODEID\+?', '', value), 0) + node_id
353+
else:
354+
if var_type in datatypes.SIGNED_TYPES:
355+
return _signed_int_from_hex(value, _calc_bit_length(var_type))
356+
else:
357+
return int(value, 0)

0 commit comments

Comments
 (0)