diff --git a/canopen/objectdictionary/__init__.py b/canopen/objectdictionary/__init__.py index 40ab8fbf..4ef21c1b 100644 --- a/canopen/objectdictionary/__init__.py +++ b/canopen/objectdictionary/__init__.py @@ -209,6 +209,8 @@ def __init__(self, name: str, index: int): self.storage_location: Optional[str] = None self.subindices: dict[int, ODVariable] = {} self.names: dict[str, ODVariable] = {} + #: Key-Value pairs not defined by the standard + self.custom_options: dict[str, str] = {} def __repr__(self) -> str: return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index)}>" @@ -270,6 +272,8 @@ def __init__(self, name: str, index: int): self.storage_location: Optional[str] = None self.subindices: dict[int, ODVariable] = {} self.names: dict[str, ODVariable] = {} + #: Key-Value pairs not defined by the standard + self.custom_options: dict[str, str] = {} def __repr__(self) -> str: return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index)}>" @@ -290,6 +294,8 @@ def __getitem__(self, subindex: Union[int, str]) -> ODVariable: "bit_definitions", "storage_location"): if attr in template.__dict__: var.__dict__[attr] = template.__dict__[attr] + if "custom_options" in template.__dict__: + var.__dict__["custom_options"] = template.__dict__["custom_options"].copy() else: raise KeyError(f"Could not find subindex {pretty_index(None, subindex)}") return var @@ -380,6 +386,8 @@ def __init__(self, name: str, index: int, subindex: int = 0): self.storage_location: Optional[str] = None #: Can this variable be mapped to a PDO self.pdo_mappable = False + #: Key-Value pairs not defined by the standard + self.custom_options: dict[str, str] = {} def __repr__(self) -> str: subindex = self.subindex if isinstance(self.parent, (ODRecord, ODArray)) else None diff --git a/canopen/objectdictionary/eds.py b/canopen/objectdictionary/eds.py index d47a3019..959b7f45 100644 --- a/canopen/objectdictionary/eds.py +++ b/canopen/objectdictionary/eds.py @@ -22,6 +22,7 @@ logger = logging.getLogger(__name__) + def import_eds(source, node_id): eds = RawConfigParser(inline_comment_prefixes=(';',)) eds.optionxform = str @@ -133,20 +134,22 @@ def import_eds(source, node_id): od.add_object(var) elif object_type == objectcodes.ARRAY and eds.has_option(section, "CompactSubObj"): arr = ODArray(name, index) - last_subindex = ODVariable( - "Number of entries", index, 0) + last_subindex = ODVariable("Number of entries", index, 0) last_subindex.data_type = datatypes.UNSIGNED8 arr.add_member(last_subindex) arr.add_member(build_variable(eds, section, node_id, object_type, index, 1)) arr.storage_location = storage_location + arr.custom_options = _get_custom_options(eds, section) od.add_object(arr) elif object_type == objectcodes.ARRAY: arr = ODArray(name, index) arr.storage_location = storage_location + arr.custom_options = _get_custom_options(eds, section) od.add_object(arr) elif object_type == objectcodes.RECORD: record = ODRecord(name, index) record.storage_location = storage_location + record.custom_options = _get_custom_options(eds, section) od.add_object(record) continue @@ -257,6 +260,25 @@ def _revert_variable(var_type, value): else: return f"0x{value:02X}" +_STANDARD_OPTIONS = { + "ObjectType", "ParameterName", "DataType", "AccessType", + "PDOMapping", "LowLimit", "HighLimit", "DefaultValue", + "ParameterValue", "Factor", "Description", "Unit", + "StorageLocation", "CompactSubObj", + # CiA 306 fields parsed explicitly: + "SubNumber", + # ObjFlags and Denotation are intentionally absent: they are not yet + # parsed by this codebase, so they flow through custom_options and + # survive round-trips. Proper first-class support is tracked in #654. +} + +def _get_custom_options(eds, section): + custom_options = {} + for option, value in eds.items(section): + if option not in _STANDARD_OPTIONS: + custom_options[option] = value + return custom_options + def build_variable( eds: RawConfigParser, @@ -350,6 +372,8 @@ def build_variable( var.unit = eds.get(section, "Unit") except ValueError: pass + + var.custom_options = _get_custom_options(eds, section) return var @@ -359,6 +383,8 @@ def copy_variable(eds, section, subindex, src_var): # It is only the name and subindex that varies var.name = name var.subindex = subindex + # Give the copy its own custom_options dict to avoid shared-state mutations + var.custom_options = src_var.custom_options.copy() return var @@ -425,12 +451,19 @@ def export_variable(var, eds): if getattr(var, 'unit', '') != '': eds.set(section, "Unit", var.unit) + for option, value in var.custom_options.items(): + if option not in _STANDARD_OPTIONS: + eds.set(section, option, str(value)) + def export_record(var, eds): section = f"{var.index:04X}" export_common(var, eds, section) eds.set(section, "SubNumber", f"0x{len(var.subindices):X}") ot = objectcodes.RECORD if isinstance(var, ODRecord) else objectcodes.ARRAY eds.set(section, "ObjectType", f"0x{ot:X}") + for option, value in var.custom_options.items(): + if option not in _STANDARD_OPTIONS: + eds.set(section, option, str(value)) for i in var: export_variable(var[i], eds) diff --git a/test/sample.eds b/test/sample.eds index ad00a12e..3e5e3bbc 100644 --- a/test/sample.eds +++ b/test/sample.eds @@ -1018,6 +1018,36 @@ Factor=ERROR Description= Unit= +[3061] +ParameterName=Object with custom options +ObjectType=0x7 +DataType=0x0007 +AccessType=rw +PDOMapping=0 +Category=Motor +Offset=100 + +[3062] +ParameterName=Record with custom options +SubNumber=0x2 +ObjectType=0x9 +RecordTag=vendor_specific + +[3062sub0] +ParameterName=Highest subindex +ObjectType=0x7 +DataType=0x0005 +AccessType=ro +DefaultValue=0x01 +PDOMapping=0 + +[3062sub1] +ParameterName=Value +ObjectType=0x7 +DataType=0x0007 +AccessType=rw +PDOMapping=0 + [3063] ParameterName=DOMAIN object ObjectType=0x2 diff --git a/test/test_eds.py b/test/test_eds.py index 7a19ffeb..295c3c26 100644 --- a/test/test_eds.py +++ b/test/test_eds.py @@ -251,6 +251,54 @@ def test_roundtrip_domain_objects(self): self.assertTrue(od2[0x3063].is_domain) self.assertTrue(od2[0x3064][1].is_domain) + def test_reading_custom_options(self): + # custom options (unknown EDS keys) are collected in custom_options dict + var = self.od[0x3061] + self.assertIsInstance(var, canopen.objectdictionary.ODVariable) + self.assertEqual(var.custom_options, {'Category': 'Motor', 'Offset': '100'}) + + def test_custom_options_standard_keys_excluded(self): + # Standard CiA 306 keys must NOT appear in custom_options + var = self.od[0x3061] + for key in ('ParameterName', 'ObjectType', 'DataType', 'AccessType', 'PDOMapping'): + self.assertNotIn(key, var.custom_options, + f"Standard key {key!r} must not be in custom_options") + + def test_custom_options_empty_for_standard_object(self): + # Objects without extra keys must have an empty custom_options dict + var = self.od['Producer heartbeat time'] + self.assertEqual(var.custom_options, {}) + + def test_custom_options_record(self): + # custom_options is read for ODRecord container objects too + record = self.od[0x3062] + self.assertIsInstance(record, canopen.objectdictionary.ODRecord) + self.assertEqual(record.custom_options, {'RecordTag': 'vendor_specific'}) + # sub-entries without extra keys have empty custom_options + self.assertEqual(record[1].custom_options, {}) + + def test_roundtrip_custom_options(self): + # custom_options survive an EDS export/import round-trip + import io + with io.StringIO() as dest: + canopen.export_od(self.od, dest, 'eds') + dest.name = 'mock.eds' + dest.seek(0) + od2 = canopen.import_od(dest) + self.assertEqual(od2[0x3061].custom_options, {'Category': 'Motor', 'Offset': '100'}) + self.assertEqual(od2[0x3062].custom_options, {'RecordTag': 'vendor_specific'}) + + def test_roundtrip_custom_options_not_duplicated_as_standard(self): + # After round-trip the re-imported object must not contain standard keys + import io + with io.StringIO() as dest: + canopen.export_od(self.od, dest, 'eds') + dest.name = 'mock.eds' + dest.seek(0) + od2 = canopen.import_od(dest) + for key in ('ParameterName', 'ObjectType', 'DataType', 'AccessType', 'PDOMapping'): + self.assertNotIn(key, od2[0x3061].custom_options) + def test_comments(self): self.assertEqual(self.od.comments,