|
2 | 2 | # Copyright (c) 2022 MBition GmbH
|
3 | 3 |
|
4 | 4 |
|
| 5 | +from typing import List, Tuple, Union |
| 6 | + |
| 7 | +from ..exceptions import EncodeError, DecodeError |
5 | 8 | from ..globals import logger
|
| 9 | +from ..odxtypes import DataType |
6 | 10 |
|
7 | 11 | from .compumethodbase import CompuMethod
|
| 12 | +from .limit import IntervalType, Limit |
8 | 13 |
|
9 | 14 |
|
10 | 15 | class TabIntpCompuMethod(CompuMethod):
|
| 16 | + """ |
| 17 | + A compu method of type Tab Interpolated is used for linear interpolation. |
| 18 | +
|
| 19 | + A `TabIntpCompuMethod` is defined by a set of points. Each point is an (internal, physical) value pair. |
| 20 | + When converting from internal to physical or vice-versa, the result is linearly interpolated. |
| 21 | +
|
| 22 | + The function defined by a `TabIntpCompuMethod` is similar to the one of a `ScaleLinearCompuMethod` with the following differences: |
| 23 | +
|
| 24 | + * `TabIntpCompuMethod`s are always continuous whereas `ScaleLinearCompuMethod` might have jumps |
| 25 | + * `TabIntpCompuMethod`s are always invertible: Even if the linear interpolation is not monotonic, the first matching interval is taken. |
| 26 | +
|
| 27 | + Refer to ASAM MCD-2 D (ODX) Specification, section 7.3.6.6.8 for details. |
| 28 | +
|
| 29 | + Examples |
| 30 | + -------- |
| 31 | +
|
| 32 | + Create a TabIntpCompuMethod defined by the points (0, -1), (10, 1), (30, 2):: |
| 33 | +
|
| 34 | + method = TabIntpCompuMethod( |
| 35 | + internal_type=DataType.A_UINT32, |
| 36 | + physical_type=DataType.A_UINT32, |
| 37 | + internal_points=[0, 10, 30], |
| 38 | + physical_points=[-1, 1, 2] |
| 39 | + ) |
| 40 | +
|
| 41 | + Note that the points are given as two lists. The equivalent odx definition is:: |
| 42 | +
|
| 43 | + <COMPU-METHOD> |
| 44 | + <CATEGORY>TAB-INTP</CATEGORY> |
| 45 | + <COMPU-INTERNAL-TO-PHYS> |
| 46 | + <COMPU-SCALES> |
| 47 | + <COMPU-SCALE> |
| 48 | + <LOWER-LIMIT INTERVAL-TYPE = "CLOSED">0</LOWER-LIMIT> |
| 49 | + <COMPU-CONST> |
| 50 | + <V>-1</V> |
| 51 | + </COMPU-CONST> |
| 52 | + </COMPU-SCALE> |
| 53 | + <COMPU-SCALE> |
| 54 | + <LOWER-LIMIT INTERVAL-TYPE = "CLOSED">10</LOWER-LIMIT> |
| 55 | + <COMPU-CONST> |
| 56 | + <V>1</V> |
| 57 | + </COMPU-CONST> |
| 58 | + </COMPU-SCALE> |
| 59 | + <COMPU-SCALE> |
| 60 | + <LOWER-LIMIT INTERVAL-TYPE = "CLOSED">30</LOWER-LIMIT> |
| 61 | + <COMPU-CONST> |
| 62 | + <V>2</V> |
| 63 | + </COMPU-CONST> |
| 64 | + </COMPU-SCALE> |
| 65 | + </COMPU-SCALES> |
| 66 | + </COMPU-INTERNAL-TO-PHYS> |
| 67 | + </COMPU-METHOD> |
| 68 | +
|
| 69 | + """ |
| 70 | + |
11 | 71 | def __init__(self,
|
12 |
| - internal_type, |
13 |
| - physical_type): |
| 72 | + internal_type: Union[DataType, str], |
| 73 | + physical_type: Union[DataType, str], |
| 74 | + internal_points: List[Union[float, int]], |
| 75 | + physical_points: List[Union[float, int]]): |
14 | 76 | super().__init__(internal_type, physical_type, "TAB-INTP")
|
15 |
| - logger.debug("Created table interpolation compu method!") |
16 |
| - logger.warning( |
17 |
| - "TODO: Implement table interpolation compu method properly!") |
18 | 77 |
|
19 |
| - def convert_physical_to_internal(self, physical_value): |
20 |
| - return self.internal_type.make_from(physical_value) |
| 78 | + self.internal_points = internal_points |
| 79 | + self.physical_points = physical_points |
| 80 | + |
| 81 | + self._physical_lower_limit = Limit( |
| 82 | + min(physical_points), IntervalType.CLOSED) |
| 83 | + self._physical_upper_limit = Limit( |
| 84 | + max(physical_points), IntervalType.CLOSED) |
| 85 | + |
| 86 | + logger.debug("Created compu method of type tab interpolated !") |
| 87 | + self._assert_validity() |
| 88 | + |
| 89 | + @property |
| 90 | + def physical_lower_limit(self) -> Limit: |
| 91 | + return self._physical_lower_limit |
| 92 | + |
| 93 | + @property |
| 94 | + def physical_upper_limit(self) -> Limit: |
| 95 | + return self._physical_upper_limit |
| 96 | + |
| 97 | + def _assert_validity(self) -> None: |
| 98 | + assert len(self.internal_points) == len(self.physical_points) |
| 99 | + |
| 100 | + assert self.internal_type in [DataType.A_INT32, DataType.A_UINT32, |
| 101 | + DataType.A_FLOAT32, DataType.A_FLOAT64], \ |
| 102 | + ("Internal data type of tab-intp compumethod must be one of" |
| 103 | + " [DataType.A_INT32, DataType.A_UINT32, DataType.A_FLOAT32, DataType.A_FLOAT64]") |
| 104 | + assert self.physical_type in [DataType.A_INT32, DataType.A_UINT32, |
| 105 | + DataType.A_FLOAT32, DataType.A_FLOAT64], \ |
| 106 | + ("Physical data type of tab-intp compumethod must be one of" |
| 107 | + " [DataType.A_INT32, DataType.A_UINT32, DataType.A_FLOAT32, DataType.A_FLOAT64]") |
| 108 | + |
| 109 | + def _piecewise_linear_interpolate(self, |
| 110 | + x: Union[int, float], |
| 111 | + points: List[Tuple[Union[int, float], Union[int, float]]]) \ |
| 112 | + -> Union[float, None]: |
| 113 | + for ((x0, y0), (x1, y1)) in zip(points[:-1], points[1:]): |
| 114 | + if x0 <= x and x <= x1: |
| 115 | + return y0 + (x - x0) * (y1 - y0) / (x1 - x0) |
| 116 | + |
| 117 | + return None |
| 118 | + |
| 119 | + def convert_physical_to_internal(self, physical_value: Union[int, float]) -> Union[int, float]: |
| 120 | + reference_points = list(zip( |
| 121 | + self.physical_points, self.internal_points)) |
| 122 | + result = self._piecewise_linear_interpolate( |
| 123 | + physical_value, reference_points) |
| 124 | + |
| 125 | + if result is None: |
| 126 | + raise EncodeError(f"Internal value {physical_value} must be inside the range" |
| 127 | + f" [{min(self.physical_points)}, {max(self.physical_points)}]") |
| 128 | + res = self.internal_type.make_from(result) |
| 129 | + assert isinstance(res, (int, float)) |
| 130 | + return res |
| 131 | + |
| 132 | + def convert_internal_to_physical(self, internal_value: Union[int, float]) -> Union[int, float]: |
| 133 | + reference_points = list(zip( |
| 134 | + self.internal_points, self.physical_points)) |
| 135 | + result = self._piecewise_linear_interpolate( |
| 136 | + internal_value, reference_points) |
21 | 137 |
|
22 |
| - def convert_internal_to_physical(self, internal_value): |
23 |
| - return self.physical_type.make_from(internal_value) |
| 138 | + if result is None: |
| 139 | + raise DecodeError(f"Internal value {internal_value} must be inside the range" |
| 140 | + f" [{min(self.internal_points)}, {max(self.internal_points)}]") |
| 141 | + res = self.physical_type.make_from(result) |
| 142 | + assert isinstance(res, (int, float)) |
| 143 | + return res |
24 | 144 |
|
25 |
| - def is_valid_physical_value(self, physical_value): |
26 |
| - return True |
| 145 | + def is_valid_physical_value(self, physical_value: Union[int, float]) -> bool: |
| 146 | + return min(self.physical_points) <= physical_value and physical_value <= max(self.physical_points) |
27 | 147 |
|
28 |
| - def is_valid_internal_value(self, internal_value): |
29 |
| - return True |
| 148 | + def is_valid_internal_value(self, internal_value: Union[int, float]) -> bool: |
| 149 | + return min(self.internal_points) <= internal_value and internal_value <= max(self.internal_points) |
0 commit comments