Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions odxtools/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .ecuconfig import EcuConfig
from .exceptions import odxraise, odxrequire
from .flash import Flash
from .multipleecujobspec import MultipleEcuJobSpec
from .nameditemlist import NamedItemList
from .odxdoccontext import OdxDocContext
from .odxlink import DocType, OdxDocFragment, OdxLinkDatabase, OdxLinkId
Expand All @@ -43,6 +44,7 @@ def __init__(self) -> None:
self._comparam_specs = NamedItemList[ComparamSpec]()
self._ecu_configs = NamedItemList[EcuConfig]()
self._flashs = NamedItemList[Flash]()
self._multiple_ecu_job_specs = NamedItemList[MultipleEcuJobSpec]()
self._short_name = "odx_database"

def add_pdx_file(self, pdx_file: Union[str, "PathLike[Any]", IO[bytes], ZipFile]) -> None:
Expand Down Expand Up @@ -123,6 +125,10 @@ def add_xml_tree(self, root: ElementTree.Element) -> None:
elif category_tag == "FLASH":
context = OdxDocContext(model_version, (OdxDocFragment(category_sn, DocType.FLASH),))
self._flashs.append(Flash.from_et(category_et, context))
elif category_tag == "MULTIPLE-ECU-JOB-SPEC":
context = OdxDocContext(model_version,
(OdxDocFragment(category_sn, DocType.MULTIPLE_ECU_JOB_SPEC),))
self._multiple_ecu_job_specs.append(MultipleEcuJobSpec.from_et(category_et, context))

def refresh(self) -> None:
# Create wrapper objects
Expand Down Expand Up @@ -160,6 +166,9 @@ def refresh(self) -> None:
for flash in self.flashs:
flash._resolve_odxlinks(self._odxlinks)

for multiple_ecu_job_spec in self.multiple_ecu_job_specs:
multiple_ecu_job_spec._resolve_odxlinks(self._odxlinks)

# resolve short name references for containers which do not do
# inheritance (we can call directly call _resolve_snrefs())
context = SnRefContext()
Expand All @@ -176,6 +185,8 @@ def refresh(self) -> None:
ecu_config._finalize_init(self, self._odxlinks)
for flash in self.flashs:
flash._finalize_init(self, self._odxlinks)
for multiple_ecu_job_spec in self.multiple_ecu_job_specs:
multiple_ecu_job_spec._finalize_init(self, self._odxlinks)

for subset in self.comparam_subsets:
subset._resolve_snrefs(context)
Expand All @@ -187,6 +198,8 @@ def refresh(self) -> None:
ecu_config._resolve_snrefs(context)
for flash in self.flashs:
flash._resolve_snrefs(context)
for multiple_ecu_job_spec in self.multiple_ecu_job_specs:
multiple_ecu_job_spec._resolve_snrefs(context)

def _build_odxlinks(self) -> dict[OdxLinkId, Any]:
result: dict[OdxLinkId, Any] = {}
Expand All @@ -205,6 +218,8 @@ def _build_odxlinks(self) -> dict[OdxLinkId, Any]:

for flash in self.flashs:
result.update(flash._build_odxlinks())
for multiple_ecu_job_spec in self.multiple_ecu_job_specs:
result.update(multiple_ecu_job_spec._build_odxlinks())

return result

Expand Down Expand Up @@ -287,6 +302,10 @@ def ecu_configs(self) -> NamedItemList[EcuConfig]:
def flashs(self) -> NamedItemList[Flash]:
return self._flashs

@property
def multiple_ecu_job_specs(self) -> NamedItemList[MultipleEcuJobSpec]:
return self._multiple_ecu_job_specs

def __repr__(self) -> str:
return f"Database(model_version={self.model_version}, " \
f"protocols={[x.short_name for x in self.protocols]}, " \
Expand Down
9 changes: 1 addition & 8 deletions odxtools/diaglayers/diaglayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,7 @@ def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase) -> None:
if self.import_refs:
imported_links: dict[OdxLinkId, Any] = {}
for import_ref in self.import_refs:
imported_dl = odxlinks.resolve(import_ref, DiagLayer)

odxassert(
imported_dl.variant_type == DiagLayerType.ECU_SHARED_DATA,
f"Tried to import references from diagnostic layer "
f"'{imported_dl.short_name}' of type {imported_dl.variant_type.value}. "
f"Only ECU-SHARED-DATA layers may be referenced using the "
f"IMPORT-REF mechanism")
imported_dl = odxlinks.resolve(import_ref)

# TODO: ensure that the imported diagnostic layer has
# not been referenced in any PARENT-REF of the current
Expand Down
178 changes: 178 additions & 0 deletions odxtools/multipleecujob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass, field
from typing import Any
from xml.etree import ElementTree

from .admindata import AdminData
from .audience import Audience
from .diaglayers.diaglayer import DiagLayer
from .element import IdentifiableElement
from .exceptions import odxrequire
from .functionalclass import FunctionalClass
from .inputparam import InputParam
from .nameditemlist import NamedItemList
from .negoutputparam import NegOutputParam
from .odxdoccontext import OdxDocContext
from .odxlink import OdxLinkDatabase, OdxLinkId, OdxLinkRef
from .odxtypes import odxstr_to_bool
from .outputparam import OutputParam
from .progcode import ProgCode
from .snrefcontext import SnRefContext
from .specialdatagroup import SpecialDataGroup
from .utils import dataclass_fields_asdict


@dataclass(kw_only=True)
class MultipleEcuJob(IdentifiableElement):
"""A multiple ECU job is a diagnostic communication primitive.

A multiple ECU job is more complex than a diagnostic service and is
not provided natively by the ECU. In particular, the job is
defined in external programs which are referenced by the attribute
`.prog_codes`.

In contrast to "single ECU jobs", a multiple ECU job only involves
calls to services provided by more than one ECU.

Multiple ECU jobs are defined in section 7.3.11 of the ASAM MCD-2
standard.
"""

admin_data: AdminData | None = None
sdgs: list[SpecialDataGroup] = field(default_factory=list)
functional_class_refs: list[OdxLinkRef] = field(default_factory=list)
prog_codes: list[ProgCode] = field(default_factory=list)
input_params: NamedItemList[InputParam] = field(default_factory=NamedItemList)
output_params: NamedItemList[OutputParam] = field(default_factory=NamedItemList)
neg_output_params: NamedItemList[NegOutputParam] = field(default_factory=NamedItemList)
diag_layer_refs: list[OdxLinkRef] = field(default_factory=list)
audience: Audience | None = None
semantic: str | None = None
is_executable_raw: bool | None = None

@property
def functional_classes(self) -> NamedItemList[FunctionalClass]:
return self._functional_classes

@property
def diag_layers(self) -> NamedItemList[DiagLayer]:
return self._diag_layers

@property
def is_executable(self) -> bool:
return self.is_executable_raw in (True, None)

@staticmethod
def from_et(et_element: ElementTree.Element, context: OdxDocContext) -> "MultipleEcuJob":
kwargs = dataclass_fields_asdict(IdentifiableElement.from_et(et_element, context))

admin_data = None
if (admin_data_elem := et_element.find("ADMIN-DATA")) is not None:
admin_data = AdminData.from_et(admin_data_elem, context)
sdgs = [SpecialDataGroup.from_et(sdge, context) for sdge in et_element.iterfind("SDGS/SDG")]
functional_class_refs = [
odxrequire(OdxLinkRef.from_et(el, context))
for el in et_element.iterfind("FUNCT-CLASS-REFS/FUNCT-CLASS-REF")
]
prog_codes = [
ProgCode.from_et(pc_elem, context)
for pc_elem in et_element.iterfind("PROG-CODES/PROG-CODE")
]

input_params = NamedItemList([
InputParam.from_et(el, context)
for el in et_element.iterfind("INPUT-PARAMS/INPUT-PARAM")
])
output_params = NamedItemList([
OutputParam.from_et(el, context)
for el in et_element.iterfind("OUTPUT-PARAMS/OUTPUT-PARAM")
])
neg_output_params = NamedItemList([
NegOutputParam.from_et(el, context)
for el in et_element.iterfind("NEG-OUTPUT-PARAMS/NEG-OUTPUT-PARAM")
])
diag_layer_refs = [
odxrequire(OdxLinkRef.from_et(el, context))
for el in et_element.iterfind("DIAG-LAYER-REFS/DIAG-LAYER-REF")
]
audience = None
if (aud_elem := et_element.find("AUDIENCE")) is not None:
audience = Audience.from_et(aud_elem, context)

semantic = et_element.attrib.get("SEMANTIC")
is_executable_raw = odxstr_to_bool(et_element.attrib.get("IS-EXECUTABLE"))

return MultipleEcuJob(
admin_data=admin_data,
sdgs=sdgs,
functional_class_refs=functional_class_refs,
prog_codes=prog_codes,
input_params=input_params,
output_params=output_params,
neg_output_params=neg_output_params,
diag_layer_refs=diag_layer_refs,
audience=audience,
semantic=semantic,
is_executable_raw=is_executable_raw,
**kwargs)

def _build_odxlinks(self) -> dict[OdxLinkId, Any]:
result = {self.odx_id: self}

if self.admin_data is not None:
result.update(self.admin_data._build_odxlinks())
for sdg in self.sdgs:
result.update(sdg._build_odxlinks())
for prog_code in self.prog_codes:
result.update(prog_code._build_odxlinks())
for input_param in self.input_params:
result.update(input_param._build_odxlinks())
for output_param in self.output_params:
result.update(output_param._build_odxlinks())
for neg_output_param in self.neg_output_params:
result.update(neg_output_param._build_odxlinks())
if self.audience is not None:
result.update(self.audience._build_odxlinks())

return result

def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase) -> None:
if self.admin_data is not None:
self.admin_data._resolve_odxlinks(odxlinks)
for sdg in self.sdgs:
sdg._resolve_odxlinks(odxlinks)
for prog_code in self.prog_codes:
prog_code._resolve_odxlinks(odxlinks)
for input_param in self.input_params:
input_param._resolve_odxlinks(odxlinks)
for output_param in self.output_params:
output_param._resolve_odxlinks(odxlinks)
for neg_output_param in self.neg_output_params:
neg_output_param._resolve_odxlinks(odxlinks)
if self.audience is not None:
self.audience._resolve_odxlinks(odxlinks)

self._functional_classes = NamedItemList(
[odxlinks.resolve(fc_ref, FunctionalClass) for fc_ref in self.functional_class_refs])
self._diag_layers = NamedItemList(
[odxlinks.resolve(dl_ref, DiagLayer) for dl_ref in self.diag_layer_refs])

def _resolve_snrefs(self, context: SnRefContext) -> None:
context.multiple_ecu_job = self

if self.admin_data is not None:
self.admin_data._resolve_snrefs(context)
for sdg in self.sdgs:
sdg._resolve_snrefs(context)
for prog_code in self.prog_codes:
prog_code._resolve_snrefs(context)
for input_param in self.input_params:
input_param._resolve_snrefs(context)
for output_param in self.output_params:
output_param._resolve_snrefs(context)
for neg_output_param in self.neg_output_params:
neg_output_param._resolve_snrefs(context)
if self.audience is not None:
self.audience._resolve_snrefs(context)

context.multiple_ecu_job = None
Loading