|
| 1 | +# SPDX-License-Identifier: MIT |
| 2 | +from dataclasses import dataclass, field |
| 3 | +from typing import Any |
| 4 | +from xml.etree import ElementTree |
| 5 | + |
| 6 | +from .admindata import AdminData |
| 7 | +from .audience import Audience |
| 8 | +from .diaglayers.diaglayer import DiagLayer |
| 9 | +from .element import IdentifiableElement |
| 10 | +from .exceptions import odxrequire |
| 11 | +from .functionalclass import FunctionalClass |
| 12 | +from .inputparam import InputParam |
| 13 | +from .nameditemlist import NamedItemList |
| 14 | +from .negoutputparam import NegOutputParam |
| 15 | +from .odxdoccontext import OdxDocContext |
| 16 | +from .odxlink import OdxLinkDatabase, OdxLinkId, OdxLinkRef |
| 17 | +from .odxtypes import odxstr_to_bool |
| 18 | +from .outputparam import OutputParam |
| 19 | +from .progcode import ProgCode |
| 20 | +from .snrefcontext import SnRefContext |
| 21 | +from .specialdatagroup import SpecialDataGroup |
| 22 | +from .utils import dataclass_fields_asdict |
| 23 | + |
| 24 | + |
| 25 | +@dataclass(kw_only=True) |
| 26 | +class MultipleEcuJob(IdentifiableElement): |
| 27 | + """A multiple ECU job is a diagnostic communication primitive. |
| 28 | +
|
| 29 | + A multiple ECU job is more complex than a diagnostic service and is |
| 30 | + not provided natively by the ECU. In particular, the job is |
| 31 | + defined in external programs which are referenced by the attribute |
| 32 | + `.prog_codes`. |
| 33 | +
|
| 34 | + In contrast to "single ECU jobs", a multiple ECU job only involves |
| 35 | + calls to services provided by more than one ECU. |
| 36 | +
|
| 37 | + Multiple ECU jobs are defined in section 7.3.11 of the ASAM MCD-2 |
| 38 | + standard. |
| 39 | + """ |
| 40 | + |
| 41 | + admin_data: AdminData | None = None |
| 42 | + sdgs: list[SpecialDataGroup] = field(default_factory=list) |
| 43 | + functional_class_refs: list[OdxLinkRef] = field(default_factory=list) |
| 44 | + prog_codes: list[ProgCode] = field(default_factory=list) |
| 45 | + input_params: NamedItemList[InputParam] = field(default_factory=NamedItemList) |
| 46 | + output_params: NamedItemList[OutputParam] = field(default_factory=NamedItemList) |
| 47 | + neg_output_params: NamedItemList[NegOutputParam] = field(default_factory=NamedItemList) |
| 48 | + diag_layer_refs: list[OdxLinkRef] = field(default_factory=list) |
| 49 | + audience: Audience | None = None |
| 50 | + semantic: str | None = None |
| 51 | + is_executable_raw: bool | None = None |
| 52 | + |
| 53 | + @property |
| 54 | + def functional_classes(self) -> NamedItemList[FunctionalClass]: |
| 55 | + return self._functional_classes |
| 56 | + |
| 57 | + @property |
| 58 | + def diag_layers(self) -> NamedItemList[DiagLayer]: |
| 59 | + return self._diag_layers |
| 60 | + |
| 61 | + @property |
| 62 | + def is_executable(self) -> bool: |
| 63 | + return self.is_executable_raw in (True, None) |
| 64 | + |
| 65 | + @staticmethod |
| 66 | + def from_et(et_element: ElementTree.Element, context: OdxDocContext) -> "MultipleEcuJob": |
| 67 | + kwargs = dataclass_fields_asdict(IdentifiableElement.from_et(et_element, context)) |
| 68 | + |
| 69 | + admin_data = None |
| 70 | + if (admin_data_elem := et_element.find("ADMIN-DATA")) is not None: |
| 71 | + admin_data = AdminData.from_et(admin_data_elem, context) |
| 72 | + sdgs = [SpecialDataGroup.from_et(sdge, context) for sdge in et_element.iterfind("SDGS/SDG")] |
| 73 | + functional_class_refs = [ |
| 74 | + odxrequire(OdxLinkRef.from_et(el, context)) |
| 75 | + for el in et_element.iterfind("FUNCT-CLASS-REFS/FUNCT-CLASS-REF") |
| 76 | + ] |
| 77 | + prog_codes = [ |
| 78 | + ProgCode.from_et(pc_elem, context) |
| 79 | + for pc_elem in et_element.iterfind("PROG-CODES/PROG-CODE") |
| 80 | + ] |
| 81 | + |
| 82 | + input_params = NamedItemList([ |
| 83 | + InputParam.from_et(el, context) |
| 84 | + for el in et_element.iterfind("INPUT-PARAMS/INPUT-PARAM") |
| 85 | + ]) |
| 86 | + output_params = NamedItemList([ |
| 87 | + OutputParam.from_et(el, context) |
| 88 | + for el in et_element.iterfind("OUTPUT-PARAMS/OUTPUT-PARAM") |
| 89 | + ]) |
| 90 | + neg_output_params = NamedItemList([ |
| 91 | + NegOutputParam.from_et(el, context) |
| 92 | + for el in et_element.iterfind("NEG-OUTPUT-PARAMS/NEG-OUTPUT-PARAM") |
| 93 | + ]) |
| 94 | + diag_layer_refs = [ |
| 95 | + odxrequire(OdxLinkRef.from_et(el, context)) |
| 96 | + for el in et_element.iterfind("DIAG-LAYER-REFS/DIAG-LAYER-REF") |
| 97 | + ] |
| 98 | + audience = None |
| 99 | + if (aud_elem := et_element.find("AUDIENCE")) is not None: |
| 100 | + audience = Audience.from_et(aud_elem, context) |
| 101 | + |
| 102 | + semantic = et_element.attrib.get("SEMANTIC") |
| 103 | + is_executable_raw = odxstr_to_bool(et_element.attrib.get("IS-EXECUTABLE")) |
| 104 | + |
| 105 | + return MultipleEcuJob( |
| 106 | + admin_data=admin_data, |
| 107 | + sdgs=sdgs, |
| 108 | + functional_class_refs=functional_class_refs, |
| 109 | + prog_codes=prog_codes, |
| 110 | + input_params=input_params, |
| 111 | + output_params=output_params, |
| 112 | + neg_output_params=neg_output_params, |
| 113 | + diag_layer_refs=diag_layer_refs, |
| 114 | + audience=audience, |
| 115 | + semantic=semantic, |
| 116 | + is_executable_raw=is_executable_raw, |
| 117 | + **kwargs) |
| 118 | + |
| 119 | + def _build_odxlinks(self) -> dict[OdxLinkId, Any]: |
| 120 | + result = {self.odx_id: self} |
| 121 | + |
| 122 | + if self.admin_data is not None: |
| 123 | + result.update(self.admin_data._build_odxlinks()) |
| 124 | + for sdg in self.sdgs: |
| 125 | + result.update(sdg._build_odxlinks()) |
| 126 | + for prog_code in self.prog_codes: |
| 127 | + result.update(prog_code._build_odxlinks()) |
| 128 | + for input_param in self.input_params: |
| 129 | + result.update(input_param._build_odxlinks()) |
| 130 | + for output_param in self.output_params: |
| 131 | + result.update(output_param._build_odxlinks()) |
| 132 | + for neg_output_param in self.neg_output_params: |
| 133 | + result.update(neg_output_param._build_odxlinks()) |
| 134 | + if self.audience is not None: |
| 135 | + result.update(self.audience._build_odxlinks()) |
| 136 | + |
| 137 | + return result |
| 138 | + |
| 139 | + def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase) -> None: |
| 140 | + if self.admin_data is not None: |
| 141 | + self.admin_data._resolve_odxlinks(odxlinks) |
| 142 | + for sdg in self.sdgs: |
| 143 | + sdg._resolve_odxlinks(odxlinks) |
| 144 | + for prog_code in self.prog_codes: |
| 145 | + prog_code._resolve_odxlinks(odxlinks) |
| 146 | + for input_param in self.input_params: |
| 147 | + input_param._resolve_odxlinks(odxlinks) |
| 148 | + for output_param in self.output_params: |
| 149 | + output_param._resolve_odxlinks(odxlinks) |
| 150 | + for neg_output_param in self.neg_output_params: |
| 151 | + neg_output_param._resolve_odxlinks(odxlinks) |
| 152 | + if self.audience is not None: |
| 153 | + self.audience._resolve_odxlinks(odxlinks) |
| 154 | + |
| 155 | + self._functional_classes = NamedItemList( |
| 156 | + [odxlinks.resolve(fc_ref, FunctionalClass) for fc_ref in self.functional_class_refs]) |
| 157 | + self._diag_layers = NamedItemList( |
| 158 | + [odxlinks.resolve(dl_ref, DiagLayer) for dl_ref in self.diag_layer_refs]) |
| 159 | + |
| 160 | + def _resolve_snrefs(self, context: SnRefContext) -> None: |
| 161 | + context.multiple_ecu_job = self |
| 162 | + |
| 163 | + if self.admin_data is not None: |
| 164 | + self.admin_data._resolve_snrefs(context) |
| 165 | + for sdg in self.sdgs: |
| 166 | + sdg._resolve_snrefs(context) |
| 167 | + for prog_code in self.prog_codes: |
| 168 | + prog_code._resolve_snrefs(context) |
| 169 | + for input_param in self.input_params: |
| 170 | + input_param._resolve_snrefs(context) |
| 171 | + for output_param in self.output_params: |
| 172 | + output_param._resolve_snrefs(context) |
| 173 | + for neg_output_param in self.neg_output_params: |
| 174 | + neg_output_param._resolve_snrefs(context) |
| 175 | + if self.audience is not None: |
| 176 | + self.audience._resolve_snrefs(context) |
| 177 | + |
| 178 | + context.multiple_ecu_job = None |
0 commit comments