Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions examples/somersaultlazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def create_isotp_socket(channel: str | None, rxid: int, txid: int) -> isotp.tpso
return result_socket


async def ecu_send(isotp_socket: isotp.tpsock.socket | None, payload: bytes) -> None:
async def ecu_send(isotp_socket: isotp.tpsock.socket | None, payload: bytes | bytearray) -> None:
"""
ECU sends a message, either in "sterile" or in "live" mode.
"""
Expand All @@ -77,7 +77,7 @@ async def ecu_send(isotp_socket: isotp.tpsock.socket | None, payload: bytes) ->
assert sterile_rx_tester_event is not None

sterile_rx_tester_event.clear()
sterile_rx_tester.append(payload)
sterile_rx_tester.append(bytes(payload))
sterile_rx_tester_event.set()
else:
assert isotp_socket is not None
Expand Down Expand Up @@ -112,7 +112,7 @@ async def ecu_recv(isotp_socket: isotp.tpsock.socket | None) -> bytes:
return await loop.sock_recv(cast(socket, isotp_socket), 4095)


async def tester_send(isotp_socket: isotp.tpsock.socket | None, payload: bytes) -> None:
async def tester_send(isotp_socket: isotp.tpsock.socket | None, payload: bytes | bytearray) -> None:
"""
Tester sends a message, either in "sterile" or in "live" mode.
"""
Expand All @@ -123,7 +123,7 @@ async def tester_send(isotp_socket: isotp.tpsock.socket | None, payload: bytes)
assert isotp_socket is None
assert sterile_rx_ecu_event is not None

sterile_rx_ecu.append(payload)
sterile_rx_ecu.append(bytes(payload))
sterile_rx_ecu_event.set()
else:
assert isotp_socket is not None
Expand Down Expand Up @@ -360,7 +360,7 @@ async def _auto_close_session_task(self) -> None:


async def tester_await_response(isotp_socket: isotp.tpsock.socket | None,
raw_message: bytes,
raw_message: bytes | bytearray,
timeout: float = 0.5) -> bytes | ParameterValueDict:
# await the answer from the server (be aware that the maximum
# length of ISO-TP telegrams over the CAN bus is 4095 bytes)
Expand Down
6 changes: 4 additions & 2 deletions odxtools/cli/browse.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,10 @@ def encode_message_from_string_values(
print(f"The value specified for parameter {inner_param_sn} is not a string")
continue

typed_dict[inner_param_sn] = _convert_string_to_odx_type(
inner_param_value, inner_param.physical_type.base_data_type)
if isinstance(inner_param,
ParameterWithDOP) and inner_param.physical_type is not None:
typed_dict[inner_param_sn] = _convert_string_to_odx_type(
inner_param_value, inner_param.physical_type.base_data_type)
parameter_values[parameter.short_name] = typed_dict
else:
if not isinstance(parameter_value, str):
Expand Down
6 changes: 3 additions & 3 deletions odxtools/cli/compare.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,18 +425,18 @@ def compare_diagnostic_layers(self, dl1: DiagLayer, dl2: DiagLayer) -> ServiceDi
changed_parameters_of_service=services_with_param_changes)
dl1_service_names = [service.short_name for service in dl1.services]

dl1_request_prefixes: list[bytes | None] = [
dl1_request_prefixes: list[bytes | bytearray | None] = [
None if s.request is None else s.request.coded_const_prefix() for s in dl1.services
]
dl2_request_prefixes: list[bytes | None] = [
dl2_request_prefixes: list[bytes | bytearray | None] = [
None if s.request is None else s.request.coded_const_prefix() for s in dl2.services
]

# compare diagnostic services
for service1 in dl1.services:

# check for added diagnostic services
rq_prefix: bytes
rq_prefix: bytes | bytearray
if service1.request is not None:
rq_prefix = service1.request.coded_const_prefix()

Expand Down
2 changes: 1 addition & 1 deletion odxtools/compositecodec.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def composite_codec_get_free_parameters(codec: CompositeCodec) -> list[Parameter


def composite_codec_get_coded_const_prefix(codec: CompositeCodec,
request_prefix: bytes = b'') -> bytes:
request_prefix: bytes = b'') -> bytearray:
encode_state = EncodeState(coded_message=bytearray(), triggering_request=request_prefix)

for param in codec.parameters:
Expand Down
70 changes: 70 additions & 0 deletions odxtools/configdata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass, field
from typing import Any
from xml.etree import ElementTree

from .configrecord import ConfigRecord
from .element import NamedElement
from .nameditemlist import NamedItemList
from .odxdoccontext import OdxDocContext
from .odxlink import OdxLinkDatabase, OdxLinkId
from .snrefcontext import SnRefContext
from .specialdatagroup import SpecialDataGroup
from .utils import dataclass_fields_asdict
from .validbasevariant import ValidBaseVariant


@dataclass(kw_only=True)
class ConfigData(NamedElement):
"""This class represents a CONFIG-DATA."""
valid_base_variants: list[ValidBaseVariant]
config_records: NamedItemList[ConfigRecord]
sdgs: list[SpecialDataGroup] = field(default_factory=list)

@staticmethod
def from_et(et_element: ElementTree.Element, context: OdxDocContext) -> "ConfigData":
kwargs = dataclass_fields_asdict(NamedElement.from_et(et_element, context))

valid_base_variants = [
ValidBaseVariant.from_et(vbv_elem, context)
for vbv_elem in et_element.iterfind("VALID-BASE-VARIANTS/VALID-BASE-VARIANT")
]
config_records = NamedItemList([
ConfigRecord.from_et(cr_elem, context)
for cr_elem in et_element.iterfind("CONFIG-RECORDS/CONFIG-RECORD")
])
sdgs = [SpecialDataGroup.from_et(sdge, context) for sdge in et_element.iterfind("SDGS/SDG")]

return ConfigData(
valid_base_variants=valid_base_variants,
config_records=config_records,
sdgs=sdgs,
**kwargs)

def _build_odxlinks(self) -> dict[OdxLinkId, Any]:
result = {}

for valid_base_variant in self.valid_base_variants:
result.update(valid_base_variant._build_odxlinks())
for config_record in self.config_records:
result.update(config_record._build_odxlinks())
for sdg in self.sdgs:
result.update(sdg._build_odxlinks())

return result

def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase) -> None:
for valid_base_variant in self.valid_base_variants:
valid_base_variant._resolve_odxlinks(odxlinks)
for config_record in self.config_records:
config_record._resolve_odxlinks(odxlinks)
for sdg in self.sdgs:
sdg._resolve_odxlinks(odxlinks)

def _resolve_snrefs(self, context: SnRefContext) -> None:
for valid_base_variant in self.valid_base_variants:
valid_base_variant._resolve_snrefs(context)
for config_record in self.config_records:
config_record._resolve_snrefs(context)
for sdg in self.sdgs:
sdg._resolve_snrefs(context)
57 changes: 57 additions & 0 deletions odxtools/configdatadictionaryspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass, field
from typing import Any
from xml.etree import ElementTree

from .dataobjectproperty import DataObjectProperty
from .nameditemlist import NamedItemList
from .odxdoccontext import OdxDocContext
from .odxlink import OdxLinkDatabase, OdxLinkId
from .snrefcontext import SnRefContext
from .unitspec import UnitSpec


@dataclass(kw_only=True)
class ConfigDataDictionarySpec:
data_object_props: NamedItemList[DataObjectProperty] = field(default_factory=NamedItemList)
unit_spec: UnitSpec | None = None

@staticmethod
def from_et(et_element: ElementTree.Element,
context: OdxDocContext) -> "ConfigDataDictionarySpec":
data_object_props = NamedItemList([
DataObjectProperty.from_et(dop_element, context)
for dop_element in et_element.iterfind("DATA-OBJECT-PROPS/DATA-OBJECT-PROP")
])

if (spec_elem := et_element.find("UNIT-SPEC")) is not None:
unit_spec = UnitSpec.from_et(spec_elem, context)
else:
unit_spec = None

return ConfigDataDictionarySpec(
data_object_props=data_object_props,
unit_spec=unit_spec,
)

def _build_odxlinks(self) -> dict[OdxLinkId, Any]:
odxlinks = {}

for data_object_prop in self.data_object_props:
odxlinks.update(data_object_prop._build_odxlinks())
if self.unit_spec is not None:
odxlinks.update(self.unit_spec._build_odxlinks())

return odxlinks

def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase) -> None:
for data_object_prop in self.data_object_props:
data_object_prop._resolve_odxlinks(odxlinks)
if self.unit_spec is not None:
self.unit_spec._resolve_odxlinks(odxlinks)

def _resolve_snrefs(self, context: SnRefContext) -> None:
for data_object_prop in self.data_object_props:
data_object_prop._resolve_snrefs(context)
if self.unit_spec is not None:
self.unit_spec._resolve_snrefs(context)
18 changes: 18 additions & 0 deletions odxtools/configiditem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from xml.etree import ElementTree

from .configitem import ConfigItem
from .odxdoccontext import OdxDocContext
from .utils import dataclass_fields_asdict


@dataclass(kw_only=True)
class ConfigIdItem(ConfigItem):
"""This class represents a CONFIG-ID-ITEM."""

@staticmethod
def from_et(et_element: ElementTree.Element, context: OdxDocContext) -> "ConfigIdItem":
kwargs = dataclass_fields_asdict(ConfigItem.from_et(et_element, context))

return ConfigIdItem(**kwargs)
85 changes: 85 additions & 0 deletions odxtools/configitem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass, field
from typing import Any
from xml.etree import ElementTree

from .dopbase import DopBase
from .element import NamedElement
from .exceptions import odxrequire
from .odxdoccontext import OdxDocContext
from .odxlink import OdxLinkDatabase, OdxLinkId, OdxLinkRef, resolve_snref
from .snrefcontext import SnRefContext
from .specialdatagroup import SpecialDataGroup
from .utils import dataclass_fields_asdict


@dataclass(kw_only=True)
class ConfigItem(NamedElement):
"""This class represents a CONFIG-ITEM.

CONFIG-ITEM is the base class for CONFIG-ID-ITEM, DATA-ID-ITEM,
OPTION-ITEM, and SYSTEM-ITEM.
"""
byte_position: int | None = None
bit_position: int | None = None

# according to the spec exactly one of the following two
# attributes is not None...x
data_object_prop_ref: OdxLinkRef | None = None
data_object_prop_snref: str | None = None
sdgs: list[SpecialDataGroup] = field(default_factory=list)

@property
def data_object_prop(self) -> DopBase:
return self._data_object_prop

@staticmethod
def from_et(et_element: ElementTree.Element, context: OdxDocContext) -> "ConfigItem":
kwargs = dataclass_fields_asdict(NamedElement.from_et(et_element, context))

byte_position = None
if (byte_pos_elem := et_element.findtext("BYTE-POSITION")) is not None:
byte_position = int(byte_pos_elem)

bit_position = None
if (bit_pos_elem := et_element.findtext("BIT-POSITION")) is not None:
bit_position = int(bit_pos_elem)

data_object_prop_ref = OdxLinkRef.from_et(et_element.find("DATA-OBJECT-PROP-REF"), context)
data_object_prop_snref = None
if (data_object_prop_snref_elem := et_element.find("DATA-OBJECT-PROP-SNREF")) is not None:
data_object_prop_snref = odxrequire(
data_object_prop_snref_elem.attrib.get("SHORT-NAME"))
sdgs = [SpecialDataGroup.from_et(sdge, context) for sdge in et_element.iterfind("SDGS/SDG")]

return ConfigItem(
byte_position=byte_position,
bit_position=bit_position,
data_object_prop_ref=data_object_prop_ref,
data_object_prop_snref=data_object_prop_snref,
sdgs=sdgs,
**kwargs)

def _build_odxlinks(self) -> dict[OdxLinkId, Any]:
result = {}

for sdg in self.sdgs:
result.update(sdg._build_odxlinks())

return result

def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase) -> None:
if self.data_object_prop_ref is not None:
self._data_object_prop = odxlinks.resolve(self.data_object_prop_ref, DopBase)

for sdg in self.sdgs:
sdg._resolve_odxlinks(odxlinks)

def _resolve_snrefs(self, context: SnRefContext) -> None:
if self.data_object_prop_snref is not None:
ddds = odxrequire(context.diag_layer).diag_data_dictionary_spec
self._data_object_prop = resolve_snref(self.data_object_prop_snref,
ddds.all_data_object_properties, DopBase)

for sdg in self.sdgs:
sdg._resolve_snrefs(context)
Loading