diff --git a/.gitignore b/.gitignore index 6703b5f1..b0df99ae 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,8 @@ hass.py __pycache__ custom_components/loxone/homeassistant .vscode + + +.DS_Store + +.coverage \ No newline at end of file diff --git a/.pytest.ini b/.pytest.ini new file mode 100644 index 00000000..939f3e4e --- /dev/null +++ b/.pytest.ini @@ -0,0 +1,12 @@ +[pytest] +; logging +log_level = WARNING +log_format = %(asctime)s %(levelname)s %(filename)s:%(funcName)s %(message)s +log_date_format = %Y-%m-%d %H:%M:%S + +; log file +log_file_level = INFO +log_file = ./custom_components/test/.test_log/fuzzing.log + +; execution +addopts = --no-summary \ No newline at end of file diff --git a/custom_components/test/.test_log/.gitkeep b/custom_components/test/.test_log/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/custom_components/test/fuzzing/fuzzer_overview.puml b/custom_components/test/fuzzing/fuzzer_overview.puml new file mode 100644 index 00000000..53f9e27e --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_overview.puml @@ -0,0 +1,304 @@ +@startuml fuzzer_overview +allow_mixing + +'Web server to creat UML: +' https://www.plantuml.com/plantuml/uml/SyfFKj2rKt3CoKnELR1Io4ZDoSa70000 +' copy the UML and generate SVG + +''''''''''''''''''''''''''''''''''''''' +'Fuzzer'''''''''''''''''''''''''''''''' +''''''''''''''''''''''''''''''''''''''' +abstract class "Fuzzer" as fuzzer << abstract class >> #LightSkyBlue { + -- + - __init__(self): void + + fuzz(): list +} +''''''''''''''''''''''''''''''''''''''' +class "ValuePoolFuzzer" as VPFuzzer << class >> #LightSkyBlue { + - __logger + - __value_pool: ValuePool + - __grammar_fuzzer: GrammarFuzzer + -- + - __init__(self): void + - __get_fuzzing_pool(self, value_pools: list[list], param_combi: int): list[list] + + fuzz(self, types: list, param_combi: int): list[list] +} +class "ValuePool" as VPool << class >> { + - __UINT_POOL: list + - __INT_POOL: list + - __FLOAT_POOL: list + - __STRING_POOL: list + - __BOOL_POOL: list + - __BYTE_POOL: list + - __LIST_POOL: list + - __DICT_POOL: list + - __DATE_POOL: list + - __ALL_VALUES_POOL: list + -- + - __init__(self): void + + get_uint(self): list + + get_int(self): list + + get_float(self): list + + get_int(self): list + + get_uint(self): list + + get_float(self): list + + get_string(self): list + + get_bool(self): list + + get_byte(self): list + + get_list(self): list + + get_dict(self): list + + get_date(self): list + + get_all_values(self): list +} +''''''''''''''''''''''''''''''''''''''' +class "GeneratorFuzzer" as GFuzzer << class >> #LightSkyBlue { + - _value_pool_fuzzer: ValuePoolFuzzer + - _param_types: defaultdict + - _param_types_file: str + - _mode: int + -- + - __init__(self, value_pool_fuzzer: ValuePoolFuzzer, mode: int = 1): None + - _load_param_types(self): None + - _save_param_types(self): None + - _assign_param_types(self, class_name: str, method: Callable): Dict[int, str] + - _get_param_types(self, class_name: str, method: Callable): Dict[int, str] + - _generate_method_sequence(self, cls: Type, start_methods: List[str], max_sequence_length: int): List[str] + - _to_hashable_with_marker(self, data: Any): Union[Tuple[str, frozenset], Any] + - _to_original_from_marker(self, data_with_marker: Union[Tuple[str, frozenset], Any]): Any + + fuzz(self, cls: Type, start_methods: List[str], max_sequence_length: int, num_sequences: int, max_param_combi: int = 2): List[List[Tuple[str, Tuple[Any]]]] +} +''''''''''''''''''''''''''''''''''''''' +class "CostGrammarType" as CGType << class >> { + + MIN: int + + MAX: int + -- +} +class "GrammarFuzzer" as GrFuzzer << class >> #LightSkyBlue { + - _NON_TERMINAL_REGEX: str + -- + - __init__(self): void + - __convert_to_cost_grammar(self, grammar: Grammar, conversion_type: CostGrammarType): tuple + - __convert_to_trackable_grammar(self, grammar: Grammar): tuple + - __compose_min_cost(self, head: Element, given_cost_grammar: Annotated_Grammar): str + - __compose_max_cost(self, head: Element, given_cost_grammar: Annotated_Grammar, applications: int, max_applications: int): str + - __is_grammar_covered(self, trackable_grammar: Annotated_Grammar, trackable_non_terminals: Annotated_Non_Terminals): bool + + fuzz_min_cost(self, grammar: Grammar, start_symbol: Element): str + + fuzz_max_cost( self, grammar: Grammar, start_symbol: Element, max_rule_applications: int): str + + fuzz_grammar_coverage(self, grammar: Grammar, start_symbol: Element): list +} +entity "grammar_pool" as gr_pool << grammars >> { + grammar_ipv4: Grammar + grammar_controls_json: Grammar + grammar_loxconfig_rooms_cats_json: Grammar +} +''''''''''''''''''''''''''''''''''''''' +class "MutationalFuzzer" as MFuzzer << class >> #LightSkyBlue { + - __logger + - __multiplier: list[int] + -- + - __init__(self): void + - __delete_random_char(self, s: str): str + - __insert_random_char(self, s: str): str + - __flip_random_char(self, s: str): str + - __get_random_float(self): float + - __check_inf(self, number: float): float + - __add_random_number(self, number: float): float + - __sub_random_number(self, number: float): float + - __mult_random_number(self, number: float): float + - __div_random_number(self, number: float): float + + fuzz(self, seed: list, rounds: int = 1): list[list] + + fuzz_failed(self, seed: dict, rounds: int = 1): list[list] +} +''''''''''''''''''''''''''''''''''''''' +'Runner'''''''''''''''''''''''''''''''' +''''''''''''''''''''''''''''''''''''''' +''''''''''''''''''''''''''''''''''''''' +class "GreyBoxFuzzer" as GBFuzzer << class >> #LightSkyBlue { + - __RANGE_RANDOM_INT: int = 9 + - __RANGE_RANDOM_STRING: int = 100 + - __data_type_creator: DataTypeCreator + -- + - __init__(self): void + + fuzz(self, seed_template: list, seed_specification: list = None, amount_seeds: int = 100): list[Seed] +} +''''''''''''''''''''''''''''''''''''''' +class "GreyBoxRunner" as GBRunner << class >> #LightGreen { + - __seed_manager = SeedManager() + - __mutator = Mutator() + -- + - __init__(self): void + + run(self, function: Callable, seed_population: List[Seed], amount_runs: int = 10000): list + - __hash_md5(self, branch_covered: str): str + - __store_hashed_branch(self, hashed_branch: str): void + - __mutate(self, seed: Seed): void +} +''''''''''''''''''''''''''''''''''''''' +class "SeedManager" as SeedManager << class >> { + - __power_energy: int = 2 + -- + - __init__(self): void + + select_seed(self, seed_population: List[Seed]): Seed + + adjust_energy(self, seed: Seed, branch_dict: dict, hashed_branch: str): void + + get_normalized_energy(self, seed_population: List[Seed]): list +} +''''''''''''''''''''''''''''''''''''''' +class "Seed" as Seed << class >> { + - energy: int = 0 + - seed_values: list = [] + -- + - __init__(self, energy: int = 0, seed_values: list = []): void +} +''''''''''''''''''''''''''''''''''''''' +class "DataTypeCreator" as DataTypeCreator << class >> { + - __MAX_INT: int = (1 << 31) - 1 + - __MAX_UINT: uint = (1 << 32) - 1 + -- + - __init__(self): void + - create_int(self, amount_digits: int = 10, random_creation: bool = True): int + - create_float(self, amount_digits: int = 10, random_creation: bool = True): float + - create_string_only_letters(self, amount_chars: int): int + - create_string_special_characters(self, amount_chars: int): str +} +''''''''''''''''''''''''''''''''''''''' +abstract class "Runner" as runner << abstract class >> #LightGreen { + -- + - __init__(self): void + + run(): int +} +''''''''''''''''''''''''''''''''''''''' +class "ParamRunner" as PRunner << class >> #LightGreen { + - __logger + -- + - __init__(self): void + + run(self, function: function, param_set: list): dict + + limit_param_set(self, param_set: list, runs: int): list +} +''''''''''''''''''''''''''''''''''''''' +class "GeneratorRunner" as GRunner << class >> #LightGreen { + - _logger: logging.logger + -- + - __init__(self): None + + run(self, cls: Type, sequences: List[List[Tuple[str, Tuple[Any]]]]): List[int] +} +''''''''''''''''''''''''''''''''''''''' +'Testcases''''''''''''''''''''''''''''' +''''''''''''''''''''''''''''''''''''''' +entity "test_vp_on_helpers.py" as test_VP_H << test cases >> #LightGray { + logger + value_pool_fuzzer: ValuePoolFuzzer + value_pool_fuzzer: ValuePoolFuzzer + param_runner: ParamRunner + -- + test_map_range(): None + test_hass_to_lox(): None + test_lox_to_hass(): None + test_lox2lox_mapped(): None + test_lox2hass_mapped(): None + test_to_hass_color_temp(): None + test_to_loxone_color_temp(): None + test_get_room_name_from_room_uuid(): None + test_get_cat_name_from_cat_uuid(): None + test_add_room_and_cat_to_value_values(): None + test_get_miniserver_type(): None + test_get_all(): None +} +''''''''''''''''''''''''''''''''''''''' +entity "test_gen_on_pyLoxone.py" as test_GEN_pyLoxone << test cases >> #LightGray { + logger + value_pool_fuzzer: ValuePoolFuzzer + generator_fuzzer: GeneratorFuzzer + generator_runner: GeneratorRunner + -- + test_api_LxToken(): None +} +''''''''''''''''''''''''''''''''''''''' +entity "test_gen_on_dummy.py" as test_GEN_dummy << test cases >> #LightGray { + logger + value_pool_fuzzer: ValuePoolFuzzer + generator_fuzzer: GeneratorFuzzer + generator_runner: GeneratorRunner + -- + test_DummyClass(): None +} +''''''''''''''''''''''''''''''''''''''' +entity "test_mut_on_helpers.py" as test_MUT << test cases >> #LightGray { + logger + mutational_fuzzer: MutationalFuzzer + grammar_fuzzer: GrammarFuzzer + param_runner: ParamRunner + -- + test_map_range(): None + test_hass_to_lox(): None + test_lox_to_hass(): None + test_lox2lox_mapped(): None + test_lox2hass_mapped(): None + test_to_hass_color_temp(): None + test_to_loxone_color_temp(): None + test_get_room_name_from_room_uuid(): None + test_get_cat_name_from_cat_uuid(): None + test_add_room_and_cat_to_value_values(): None + test_get_miniserver_type(): None + test_get_all(): None +} +''''''''''''''''''''''''''''''''''''''' +entity "test_grey_box_on_helpers.py" as test_GBox << test case >> #LightGray { + logger + grey_box_fuzzer = GreyBoxFuzzer + grey_box_runner = GreyBoxRunner + -- + test_map_range(): None + test_hass_to_lox(): None + test_lox_to_hass(): None + test_lox2lox_mapped(): None + test_lox2hass_mapped(): None + test_to_hass_color_temp(): None + test_to_loxone_color_temp(): None + test_get_miniserver_type(): None +} +''''''''''''''''''''''''''''''''''''''' +'Fuzzer'''''''''''''''''''''''''''''''' +''''''''''''''''''''''''''''''''''''''' +fuzzer <|-- VPFuzzer: inherits from < +VPool "1"<--* VPFuzzer: has access to < +GrFuzzer "1"<--* VPFuzzer: has access to < +gr_pool "1"<-- VPFuzzer: imports < +fuzzer <|-- GFuzzer: inherits from < +fuzzer <|-- MFuzzer: inherits from < +fuzzer <|-- GBFuzzer: inherits from < +GBFuzzer o-- DataTypeCreator: aggregates > +GBFuzzer --> Seed: uses > +''''''''''''''''''''''''''''''''''''''' +'Runner'''''''''''''''''''''''''''''''' +''''''''''''''''''''''''''''''''''''''' +runner <|-- PRunner: inherits from < +runner <|-- GBRunner: inherits from < +runner <|-- GRunner: inherits from < +GBRunner o-- SeedManager: aggregates > +GBRunner --> Seed: uses > +''''''''''''''''''''''''''''''''''''''' +'Other classes''''''''''''''''''''''''' +''''''''''''''''''''''''''''''''''''''' +SeedManager --> Seed: uses > +DataTypeCreator --> Seed: uses > +''''''''''''''''''''''''''''''''''''''' +'Testcases''''''''''''''''''''''''''''' +''''''''''''''''''''''''''''''''''''''' +VPFuzzer "1"<-- test_VP_H: imports < +PRunner "1"<-- test_VP_H: imports < +''''''''''''''''''''''''''''''''''''''' +CGType "1"<--* GrFuzzer: needs a < +''''''''''''''''''''''''''''''''''''''' +GFuzzer "1"<--* test_GEN_dummy: needs a < +GRunner "1"<--* test_GEN_dummy: needs a < +VPFuzzer "1"<--* test_GEN_dummy: needs a < +GFuzzer "1"<--* test_GEN_pyLoxone: needs a < +GRunner "1"<--* test_GEN_pyLoxone: needs a < +VPFuzzer "1"<--* test_GEN_pyLoxone: needs a < +''''''''''''''''''''''''''''''''''''''' +VPFuzzer "1"<-- test_MUT: imports < +MFuzzer "1"<-- test_MUT: imports < +GrFuzzer "1"<-- test_MUT: imports < +PRunner "1"<-- test_MUT: imports < +GBFuzzer "1"<--* test_GBox: needs a < +GBRunner "1"<--* test_GBox: needs a < +@enduml \ No newline at end of file diff --git a/custom_components/test/fuzzing/fuzzer_overview.svg b/custom_components/test/fuzzing/fuzzer_overview.svg new file mode 100644 index 00000000..0db4dfe7 --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_overview.svg @@ -0,0 +1 @@ +«abstract class»Fuzzer__init__(self): voidfuzz(): list«class»ValuePoolFuzzer__logger__value_pool: ValuePool__grammar_fuzzer: GrammarFuzzer__init__(self): void__get_fuzzing_pool(self, value_pools: list[list], param_combi: int): list[list]fuzz(self, types: list, param_combi: int): list[list]«class»ValuePool__UINT_POOL: list__INT_POOL: list__FLOAT_POOL: list__STRING_POOL: list__BOOL_POOL: list__BYTE_POOL: list__LIST_POOL: list__DICT_POOL: list__DATE_POOL: list__ALL_VALUES_POOL: list__init__(self): voidget_uint(self): listget_int(self): listget_float(self): listget_int(self): listget_uint(self): listget_float(self): listget_string(self): listget_bool(self): listget_byte(self): listget_list(self): listget_dict(self): listget_date(self): listget_all_values(self): list«class»GeneratorFuzzer_value_pool_fuzzer: ValuePoolFuzzer_param_types: defaultdict_param_types_file: str_mode: int__init__(self, value_pool_fuzzer: ValuePoolFuzzer, mode: int = 1): None_load_param_types(self): None_save_param_types(self): None_assign_param_types(self, class_name: str, method: Callable): Dict[int, str]_get_param_types(self, class_name: str, method: Callable): Dict[int, str]_generate_method_sequence(self, cls: Type, start_methods: List[str], max_sequence_length: int): List[str]_to_hashable_with_marker(self, data: Any): Union[Tuple[str, frozenset], Any]_to_original_from_marker(self, data_with_marker: Union[Tuple[str, frozenset], Any]): Anyfuzz(self, cls: Type, start_methods: List[str], max_sequence_length: int, num_sequences: int, max_param_combi: int = 2): List[List[Tuple[str, Tuple[Any]]]]«class»CostGrammarTypeMIN: intMAX: int«class»GrammarFuzzer_NON_TERMINAL_REGEX: str__init__(self): void__convert_to_cost_grammar(self, grammar: Grammar, conversion_type: CostGrammarType): tuple__convert_to_trackable_grammar(self, grammar: Grammar): tuple__compose_min_cost(self, head: Element, given_cost_grammar: Annotated_Grammar): str__compose_max_cost(self, head: Element, given_cost_grammar: Annotated_Grammar, applications: int, max_applications: int): str__is_grammar_covered(self, trackable_grammar: Annotated_Grammar, trackable_non_terminals: Annotated_Non_Terminals): boolfuzz_min_cost(self, grammar: Grammar, start_symbol: Element): strfuzz_max_cost( self, grammar: Grammar, start_symbol: Element, max_rule_applications: int): strfuzz_grammar_coverage(self, grammar: Grammar, start_symbol: Element): list«grammars»grammar_poolgrammar_ipv4: Grammargrammar_controls_json: Grammargrammar_loxconfig_rooms_cats_json: Grammar«class»MutationalFuzzer__logger__multiplier: list[int]__init__(self): void__delete_random_char(self, s: str): str__insert_random_char(self, s: str): str__flip_random_char(self, s: str): str__get_random_float(self): float__check_inf(self, number: float): float__add_random_number(self, number: float): float__sub_random_number(self, number: float): float__mult_random_number(self, number: float): float__div_random_number(self, number: float): floatfuzz(self, seed: list, rounds: int = 1): list[list]fuzz_failed(self, seed: dict, rounds: int = 1): list[list]«class»GreyBoxFuzzer__RANGE_RANDOM_INT: int = 9__RANGE_RANDOM_STRING: int = 100__data_type_creator: DataTypeCreator__init__(self): voidfuzz(self, seed_template: list, seed_specification: list = None, amount_seeds: int = 100): list[Seed]«class»GreyBoxRunner__seed_manager = SeedManager()__mutator = Mutator()__init__(self): voidrun(self, function: Callable, seed_population: List[Seed], amount_runs: int = 10000): list__hash_md5(self, branch_covered: str): str__store_hashed_branch(self, hashed_branch: str): void__mutate(self, seed: Seed): void«class»SeedManager__power_energy: int = 2__init__(self): voidselect_seed(self, seed_population: List[Seed]): Seedadjust_energy(self, seed: Seed, branch_dict: dict, hashed_branch: str): voidget_normalized_energy(self, seed_population: List[Seed]): list«class»Seedenergy: int = 0seed_values: list = []__init__(self, energy: int = 0, seed_values: list = []): void«class»DataTypeCreator__MAX_INT: int = (1 << 31) - 1__MAX_UINT: uint = (1 << 32) - 1__init__(self): voidcreate_int(self, amount_digits: int = 10, random_creation: bool = True): intcreate_float(self, amount_digits: int = 10, random_creation: bool = True): floatcreate_string_only_letters(self, amount_chars: int): intcreate_string_special_characters(self, amount_chars: int): str«abstract class»Runner__init__(self): voidrun(): int«class»ParamRunner__logger__init__(self): voidrun(self, function: function, param_set: list): dictlimit_param_set(self, param_set: list, runs: int): list«class»GeneratorRunner_logger: logging.logger__init__(self): Nonerun(self, cls: Type, sequences: List[List[Tuple[str, Tuple[Any]]]]): List[int]«test cases»test_vp_on_helpers.pyloggervalue_pool_fuzzer: ValuePoolFuzzervalue_pool_fuzzer: ValuePoolFuzzerparam_runner: ParamRunnertest_map_range(): Nonetest_hass_to_lox(): Nonetest_lox_to_hass(): Nonetest_lox2lox_mapped(): Nonetest_lox2hass_mapped(): Nonetest_to_hass_color_temp(): Nonetest_to_loxone_color_temp(): Nonetest_get_room_name_from_room_uuid(): Nonetest_get_cat_name_from_cat_uuid(): Nonetest_add_room_and_cat_to_value_values(): Nonetest_get_miniserver_type(): Nonetest_get_all(): None«test cases»test_gen_on_pyLoxone.pyloggervalue_pool_fuzzer: ValuePoolFuzzergenerator_fuzzer: GeneratorFuzzergenerator_runner: GeneratorRunnertest_api_LxToken(): None«test cases»test_gen_on_dummy.pyloggervalue_pool_fuzzer: ValuePoolFuzzergenerator_fuzzer: GeneratorFuzzergenerator_runner: GeneratorRunnertest_DummyClass(): None«test cases»test_mut_on_helpers.pyloggermutational_fuzzer: MutationalFuzzergrammar_fuzzer: GrammarFuzzerparam_runner: ParamRunnertest_map_range(): Nonetest_hass_to_lox(): Nonetest_lox_to_hass(): Nonetest_lox2lox_mapped(): Nonetest_lox2hass_mapped(): Nonetest_to_hass_color_temp(): Nonetest_to_loxone_color_temp(): Nonetest_get_room_name_from_room_uuid(): Nonetest_get_cat_name_from_cat_uuid(): Nonetest_add_room_and_cat_to_value_values(): Nonetest_get_miniserver_type(): Nonetest_get_all(): None«test case»test_grey_box_on_helpers.pyloggergrey_box_fuzzer = GreyBoxFuzzergrey_box_runner = GreyBoxRunnertest_map_range(): Nonetest_hass_to_lox(): Nonetest_lox_to_hass(): Nonetest_lox2lox_mapped(): Nonetest_lox2hass_mapped(): Nonetest_to_hass_color_temp(): Nonetest_to_loxone_color_temp(): Nonetest_get_miniserver_type(): Noneinherits fromhas access to1has access to1imports1inherits frominherits frominherits fromaggregatesusesinherits frominherits frominherits fromaggregatesusesusesusesimports1imports1needs a1needs a1needs a1needs a1needs a1needs a1needs a1imports1imports1imports1imports1needs a1needs a1 \ No newline at end of file diff --git a/custom_components/test/fuzzing/fuzzer_utils/Fuzzer.py b/custom_components/test/fuzzing/fuzzer_utils/Fuzzer.py new file mode 100644 index 00000000..0d2ebf68 --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_utils/Fuzzer.py @@ -0,0 +1,15 @@ +from abc import ABC, abstractmethod + + +class Fuzzer(ABC): + """Abstract class""" + + @abstractmethod + def __init__(self): + """Abstract method, must be overloaded by the corresponding fuzzer.""" + pass + + @abstractmethod + def fuzz(self) -> list: + """Abstract method, must be overloaded by the corresponding fuzzer.""" + pass diff --git a/custom_components/test/fuzzing/fuzzer_utils/GeneratorFuzzer.py b/custom_components/test/fuzzing/fuzzer_utils/GeneratorFuzzer.py new file mode 100644 index 00000000..30d8bea9 --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_utils/GeneratorFuzzer.py @@ -0,0 +1,287 @@ +import csv +import random +import inspect +import logging +from collections import defaultdict +from typing import Dict, List, Set, Tuple, Type, Callable, Any, Union +from custom_components.test.fuzzing.fuzzer_utils.Fuzzer import Fuzzer +from custom_components.test.fuzzing.fuzzer_utils import ValuePoolFuzzer + +class GeneratorFuzzer(Fuzzer): + def __init__(self, value_pool_fuzzer: ValuePoolFuzzer, mode: int = 1): + """ + Initialize the GeneratorFuzzer. + + :param value_pool_fuzzer: Instance of ValuePoolFuzzer to generate parameter values. + :type value_pool_fuzzer: ValuePoolFuzzer + :param mode: Operating mode (1: use CSV, 2: no CSV but use specified types, 3: use only random types). + :type mode: int + """ + self._value_pool_fuzzer: ValuePoolFuzzer = value_pool_fuzzer + self._param_types: defaultdict = defaultdict(lambda: defaultdict(dict)) + self._param_types_file: str = "./custom_components/test/fuzzing/generators/param_types.csv" + self._mode: int = mode + if self._mode == 1: + self._load_param_types() + + def _load_param_types(self) -> None: + """ + Load parameter types from a CSV file into the _param_types dictionary. + """ + try: + with open(self._param_types_file, mode='r') as file: + reader = csv.reader(file) + for row in reader: + row: List[str] + class_name: str + method_name: str + param_index: str + param_type: str + class_name, method_name, param_index, param_type = row + self._param_types[class_name][method_name][int(param_index)] = param_type + except FileNotFoundError: + pass + + def _save_param_types(self) -> None: + """ + Save the _param_types dictionary to a CSV file. + """ + with open(self._param_types_file, mode='w') as file: + writer = csv.writer(file) + for class_name, methods in self._param_types.items(): + class_name: str + methods: defaultdict + for method_name, params in methods.items(): + method_name: str + params: dict + for param_index, param_type in params.items(): + param_index: int + param_type: str + writer.writerow([class_name, method_name, param_index, param_type]) + + def _assign_param_types(self, class_name: str, method: Callable) -> Dict[int, str]: + """ + Assign types to parameters from the method signature. If not specified, the parameter type will be randomly assigned. + + :param class_name: Name of the class containing the method. + :type class_name: str + :param method: The method for which parameter types are being assigned. + :type method: Callable + :return: Dictionary of parameter indices and their assigned types. + :rtype: Dict[int, str] + """ + param_types: Dict[int, str] = {} + # recognized types without string as string needs special handling + recognized_types = {'int', 'uint', 'float', 'bool', 'byte', 'list', 'dict', 'date'} + # Get the parameters of the method + parameters = inspect.signature(method).parameters.items() + + # Filter out the "self" parameter + filtered_parameters: List[Tuple[str, inspect.Parameter]] = [ + (param_name, param) for param_name, param in parameters if param_name != "self" + ] + + # Assign the param type for each parameter according to the mode we are using + for i, (param_name, param) in enumerate(filtered_parameters): + i: int + param_name: str + param: inspect.Parameter + assigned_param_type: str + if param.annotation == param.empty or self._mode == 3: + assigned_param_type = random.choice( + ['INT', 'UINT', 'FLOAT', 'STRING', 'BOOL', 'BYTE', 'LIST', 'DICT', 'DATE'] + ) + logging.warning( + f"Randomly assigned type '{assigned_param_type}' for parameter {i+1} ({param_name}) in {class_name}.{method.__name__}" + ) + elif isinstance(param.annotation, str): + if param.annotation == 'str': + assigned_param_type = 'STRING' + elif param.annotation in recognized_types: + assigned_param_type = param.annotation.upper() + else: + assigned_param_type = random.choice( + ['INT', 'UINT', 'FLOAT', 'STRING', 'BOOL', 'BYTE', 'LIST', 'DICT', 'DATE'] + ) + logging.warning( + f"Unknown parameter type '{param.annotation}' of parameter {i+1} ({param_name}) in {class_name}.{method.__name__}. Randomly assigned '{assigned_param_type}'" + ) + elif isinstance(param.annotation, type): + if param.annotation.__name__ == 'str': + assigned_param_type = 'STRING' + elif param.annotation.__name__ in recognized_types: + assigned_param_type = param.annotation.__name__.upper() + else: + assigned_param_type = random.choice( + ['INT', 'UINT', 'FLOAT', 'STRING', 'BOOL', 'BYTE', 'LIST', 'DICT', 'DATE'] + ) + logging.warning( + f"Unknown parameter type '{param.annotation}' of parameter {i+1} ({param_name}) in {class_name}.{method.__name__}. Randomly assigned '{assigned_param_type}'" + ) + else: + logging.error( + f"Unhandeled way of declaring parameter type of parameter {i+1} ({param_name}) in {class_name}.{method.__name__}." + ) + param_types[i] = assigned_param_type + if self._mode == 1: + self._param_types[class_name][method.__name__] = param_types + self._save_param_types() + return param_types + + + def _get_param_types(self, class_name: str, method: Callable) -> Dict[int, str]: + """ + Get the parameter types for a method based on the operating mode. + + :param class_name: Name of the class containing the method. + :type class_name: str + :param method: The method for which parameter types are being retrieved. + :type method: Callable + :return: Dictionary of parameter indices and their types. + :rtype: Dict[int, str] + """ + if ( + self._mode == 1 + and class_name in self._param_types + and method.__name__ in self._param_types[class_name] + ): + return self._param_types[class_name][method.__name__] + return self._assign_param_types(class_name, method) + + def _generate_method_sequence( + self, cls: Type, start_methods: List[str], max_sequence_length: int + ) -> List[str]: + """ + Generate a random method sequence with a length of up to max_sequence_length, starting with one of the start methods. + + :param cls: The class for which the method sequence is being generated. + :type cls: Type + :param start_methods: List of method names that can start the sequence. + :type start_methods: List[str] + :param max_sequence_length: The maximum length of the method sequence. + :type max_sequence_length: int + :return: List of method names forming the sequence. + :rtype: List[str] + """ + methods: List[str] = [ + method + for method in dir(cls) + if callable(getattr(cls, method)) and not method.startswith("__") + ] + start_method: str = random.choice(start_methods) + sequence: List[str] = [start_method] + for _ in range(random.randint(1, max_sequence_length - 1)): + next_method: str = random.choice(methods) + sequence.append(next_method) + return sequence + + def _to_hashable_with_marker(self, data: Any) -> Union[Tuple[str, frozenset], Any]: + """ + Convert a list or a dictionary to a hashable form (frozenset) with a type marker, or return other data types unchanged. + + :param data: The data to be converted to a frozenset form. + :type data: Any + :return: A tuple containing the type marker and the frozenset if 'data' is a list or a dictionary, otherwise 'data' itself. + :rtype: Union[Tuple[str, frozenset], Any] + """ + if isinstance(data, list): + return ('list', frozenset(data)) + elif isinstance(data, dict): + return ('dict', frozenset(data.items())) + else: + # If data is neither a list nor a dict, return it as is + return data + + def _to_original_from_marker( + self, data_with_marker: Union[Tuple[str, frozenset], Any] + ) -> Any: + """ + Convert data back to its original form from a hashable form (frozenset) with a type marker, or return other data types unchanged. + + :param data_with_marker: The data to be converted back to its original form. + :type data_with_marker: Union[Tuple[str, frozenset], Any] + :return: The original data form if 'data_with_marker' had a type marker, otherwise 'data_with_marker' itself. + :rtype: Any + """ + # Check if data_with_marker is a tuple and has a type marker + if isinstance(data_with_marker, tuple) and data_with_marker[0] in ('list', 'dict'): + data_type: str + data: frozenset + data_type, data = data_with_marker + if data_type == 'list': + return list(data) + elif data_type == 'dict': + return dict(data) + else: + # If there is no type marker, return the data as is + return data_with_marker + + def fuzz( + self, + cls: Type, + start_methods: List[str], + max_sequence_length: int, + num_sequences: int, + max_param_combi: int = 2 + ) -> List[List[Tuple[str, Tuple[Any]]]]: + """ + Generate unique method sequences with parameters. A sequence is unique if either the methods are chained differently or any value of any parameter differs from an otherwise identical sequence. + + :param cls: The class for which the method sequences are being generated. + :type cls: Type + :param start_methods: List of method names that can start the sequence. + :type start_methods: List[str] + :param max_sequence_length: The maximum length of the method sequences. + :type max_sequence_length: int + :param num_sequences: The number of unique sequences to generate. + :type num_sequences: int + :param max_param_combi: Maximum number of parameter combinations. If a method takes less parameters the combinations will be decreased accordingly. + :type param_combi: int + :return: List of unique method sequences with parameters. + :rtype: List[List[Tuple[str, Tuple[Any]]]] + example return: + [ + [('method1', (param1, param2)), ('method2', (param3, param4))], + [('method3', (param5)), ('method4', (param6, param7))] + ] + """ + existing_sequences: Set[Tuple[str, Tuple[Any]]] = set() + while len(existing_sequences) < num_sequences: + sequence: List[str] = self._generate_method_sequence(cls, start_methods, max_sequence_length) + param_sequences: List[Tuple[str, Tuple[Any]]] = [] + for method_name in sequence: + method_name: str + param_combi: int = max_param_combi + method: Callable = getattr(cls, method_name) + param_types: Dict[int, str] = self._get_param_types(cls.__name__, method) + if len(param_types) != 0: + # make sure we don't request more combinations than parameters available + if max_param_combi > len(param_types): + param_combi = len(param_types) + # Generate parameter sets using the value pool fuzzer + param_set: List[Tuple[Any]] = self._value_pool_fuzzer.fuzz( + types=[param_types[i] for i in range(len(param_types))], param_combi=param_combi + ) + # Select a random parameter set from the generated combinations + random_param_set: Tuple[Any] = random.choice(param_set) + param_sequences.append((method_name, random_param_set)) + else: + # don't generate any param_sets if we don't need any - would raise error when calling _value_pool_fuzzer.fuzz + param_sequences.append((method_name, {})) # empty dict to indicate no parameters needed + # convert sequence list to tuple to make it hashable to be able to add it to existing_sequences set + sequence_tuple: Tuple[Tuple[str, Tuple[Union[Tuple[str, frozenset], Any]]]] = tuple( + ( + method_name, + tuple(self._to_hashable_with_marker(param) for param in params) + ) + for method_name, params in param_sequences + ) + existing_sequences.add(sequence_tuple) # add the sequence_tuple to set to make sure we only have unique sequences + # Return the sequences but revert the types back to their original + return [ + [ + (method_name, tuple(self._to_original_from_marker(param) for param in params)) + for method_name, params in seq_tuple + ] + for seq_tuple in existing_sequences + ] diff --git a/custom_components/test/fuzzing/fuzzer_utils/GeneratorRunner.py b/custom_components/test/fuzzing/fuzzer_utils/GeneratorRunner.py new file mode 100644 index 00000000..b5dc5c61 --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_utils/GeneratorRunner.py @@ -0,0 +1,59 @@ +import logging +from typing import List, Tuple, Type, Any, Callable + +from custom_components.test.fuzzing.fuzzer_utils.Runner import Runner + +class GeneratorRunner(Runner): + """Generator runner class, inherits from the abstract runner class.""" + + def __init__(self): + """Constructor for GeneratorRunner.""" + self._logger = logging.getLogger(__name__) + + def run(self, cls: Type, sequences: List[List[Tuple[str, Tuple[Any]]]]) -> List[int]: + """ + Run the generated sequences on the given class. + + :param cls: The class on which the sequences will be run. + :type cls: Type + :param sequences: List of method sequences with parameters to be executed. + :type sequences: List[List[Tuple[str, Tuple[Any]]]] + :return: List containing the number of passed and failed tests. + :rtype: List[int] + """ + num_passed_tests: int = 0 + num_failed_tests: int = 0 + + for sequence in sequences: + sequence: List[Tuple[str, Tuple[Any]]] + failed_test: bool = False + err: Exception = None + method_name: str + param_set: Tuple[Any] + method_name, param_set = sequence[0] + if method_name == '__init__': + instance = cls(*param_set) # Initialize the Class with the intended parameters for __init__ + sequence = sequence[1:] # Remove __init__ method from the sequence as we have already called it + else: + instance = cls() # Create an instance of the class where the __init__ method does not require parameters + for method_name, param_set in sequence: + method: Callable = getattr(instance, method_name) + num_params: int = len(param_set) + self._logger.info(f"Running {method_name} with {num_params} parameters") + + try: + method(*param_set) # Run the method with the given parameters + except Exception as e: + e: Exception + err = e + failed_test = True + break + + if not failed_test: + self._logger.info(f"Test passed with sequence: {sequence}") + num_passed_tests += 1 + else: + self._logger.error(f"Test failed with sequence: {sequence}. Exception: {err}") + num_failed_tests += 1 + + return [num_passed_tests, num_failed_tests] diff --git a/custom_components/test/fuzzing/fuzzer_utils/GrammarFuzzer.py b/custom_components/test/fuzzing/fuzzer_utils/GrammarFuzzer.py new file mode 100644 index 00000000..fed278d4 --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_utils/GrammarFuzzer.py @@ -0,0 +1,288 @@ +import math +from typing import Dict, List, Tuple +import re +from enum import Enum +from random import sample + +Element = str +Grammar = Dict[Element, List[Element]] + +Annotated_Element = Tuple[Element, int] +Annotated_Non_Terminals = Dict[Element, int] +Annotated_Grammar = Dict[Element, List[Annotated_Element]] + + +class CostGrammarType(Enum): + MIN = 1 + MAX = 2 + + +class GrammarFuzzer: + _NON_TERMINAL_REGEX = re.compile(r"<.*?>") + + def __init__(self) -> None: + """constructor""" + pass + + def __convert_to_cost_grammar( + self, grammar: Grammar, conversion_type: CostGrammarType + ) -> Tuple[Annotated_Grammar, Annotated_Non_Terminals]: + """Converts a common grammar to an annotated cost grammar""" + cost_grammar: Annotated_Grammar = {} + annotated_non_terminals: Annotated_Non_Terminals = {} + + def convert_rule(head: Element, body: List[Element]) -> None: + """Recursively converts a rule to an annotated cost rule""" + if head in annotated_non_terminals: + return + + annotated_elements: List[Annotated_Element] = [] + + is_inf = False + + for element in body: + cost = 0 + non_terminals = re.findall(self._NON_TERMINAL_REGEX, element) + + if head in non_terminals: + is_inf = True + annotated_non_terminals[head] = math.inf + + for non_terminal in non_terminals: + if non_terminal not in annotated_non_terminals: + convert_rule(non_terminal, grammar[non_terminal]) + cost += annotated_non_terminals[non_terminal] + + annotated_elements.append((element, cost)) + + cost_grammar[head] = annotated_elements + + if not is_inf: + annotated_non_terminals[head] = ( + min(annotated_elements, key=lambda x: x[1])[1] + 1 + if conversion_type == CostGrammarType.MIN + else max(annotated_elements, key=lambda x: x[1])[1] + 1 + ) + + for key, value in grammar.items(): + convert_rule(key, value) + + return cost_grammar, annotated_non_terminals + + def __convert_to_trackable_grammar( + self, grammar: Grammar + ) -> Tuple[Annotated_Grammar, Annotated_Non_Terminals]: + """Converts a common grammar to an annotated trackable grammar""" + trackable_grammar: Annotated_Grammar = {} + trackable_non_terminals: Annotated_Non_Terminals = {} + + def convert_rule(head: Element, body: List[Element]) -> None: + """Recursively converts a rule to an annotated trackable rule""" + if head in trackable_non_terminals: + return + + trackable_elements: List[Annotated_Element] = [] + + for element in body: + non_terminals = re.findall(self._NON_TERMINAL_REGEX, element) + + for non_terminal in non_terminals: + if non_terminal not in trackable_non_terminals: + trackable_non_terminals[head] = 0 + + trackable_elements.append((element, 0)) + + trackable_grammar[head] = trackable_elements + + for key, value in grammar.items(): + convert_rule(key, value) + + return trackable_grammar, trackable_non_terminals + + def __compose_min_cost( + self, head: Element, given_cost_grammar: Annotated_Grammar + ) -> str: + """Derives the first minimum cost value of a given annotated grammar.""" + min_tuple: Annotated_Element = min(given_cost_grammar[head], key=lambda x: x[1]) + is_non_terminal = ( + True if re.findall(self._NON_TERMINAL_REGEX, min_tuple[0]) else False + ) + + if not is_non_terminal: + return min_tuple[0] + else: + non_terminals = re.findall(self._NON_TERMINAL_REGEX, min_tuple[0]) + replacements = iter( + list( + map( + lambda element: self.__compose_min_cost( + element, given_cost_grammar + ), + non_terminals, + ) + ) + ) + result = re.sub( + self._NON_TERMINAL_REGEX, + lambda element: next(replacements), + min_tuple[0], + ) + return result + + def fuzz_min_cost(self, grammar: Grammar, start_symbol: Element) -> str: + """Derives the first minimum cost value of a given grammar.""" + cost_grammar: Annotated_Grammar + cost_grammar, _ = self.__convert_to_cost_grammar(grammar, CostGrammarType.MIN) + + return self.__compose_min_cost(start_symbol, cost_grammar) + + def __compose_max_cost( + self, + head: Element, + given_cost_grammar: Annotated_Grammar, + applications: int, + max_applications: int, + ) -> str: + """Derives a maximum cost value of a given grammar.""" + + if applications == max_applications: + min_tuple: Annotated_Element = min( + sample(given_cost_grammar[head], len(given_cost_grammar[head])), key=lambda x: x[1] + ) + is_non_terminal = ( + True if re.findall(self._NON_TERMINAL_REGEX, min_tuple[0]) else False + ) + + if not is_non_terminal: + return min_tuple[0] + else: + non_terminals = re.findall(self._NON_TERMINAL_REGEX, min_tuple[0]) + replacements = iter( + list( + map( + lambda element: self.__compose_max_cost( + element, + given_cost_grammar, + applications, + max_applications, + ), + non_terminals, + ) + ) + ) + result = re.sub( + self._NON_TERMINAL_REGEX, + lambda element: next(replacements), + min_tuple[0], + ) + return result + else: + max_tuple: Annotated_Element = max( + given_cost_grammar[head], key=lambda x: x[1] + ) + is_non_terminal = ( + True if re.findall(self._NON_TERMINAL_REGEX, max_tuple[0]) else False + ) + + if not is_non_terminal: + return max_tuple[0] + else: + non_terminals = re.findall(self._NON_TERMINAL_REGEX, max_tuple[0]) + replacements = iter( + list( + map( + lambda element: self.__compose_max_cost( + element, + given_cost_grammar, + applications + 1, + max_applications, + ), + non_terminals, + ) + ) + ) + result = re.sub( + self._NON_TERMINAL_REGEX, + lambda element: next(replacements), + max_tuple[0], + ) + return result + + def fuzz_max_cost( + self, grammar: Grammar, start_symbol: Element, max_rule_applications: int + ) -> str: + """Derives the first maximum cost value of a given grammar.""" + cost_grammar: Annotated_Grammar + cost_grammar, _ = self.__convert_to_cost_grammar(grammar, CostGrammarType.MAX) + + return self.__compose_max_cost( + start_symbol, cost_grammar, 0, max_rule_applications + ) + + def __is_grammar_covered( + self, + trackable_grammar: Annotated_Grammar, + trackable_non_terminals: Annotated_Non_Terminals, + ) -> bool: + """Checks whether a given grammar is completely covered.""" + for non_terminal in trackable_non_terminals: + if non_terminal[1] == 0: + return False + + for rule in trackable_grammar: + for element in trackable_grammar[rule]: + if element[1] == 0: + return False + + return True + + def fuzz_grammar_coverage( + self, grammar: Grammar, start_symbol: Element + ) -> List[str]: + """Derives values until each production rule is fully covered.""" + trackable_grammar: Annotated_Grammar + trackable_non_terminals: Annotated_Non_Terminals + trackable_grammar, trackable_non_terminals = ( + self.__convert_to_trackable_grammar(grammar) + ) + + fuzzed_values: List[str] = [] + + def generate_value(head: Element): + """Recursively generates values until each production rule is fully covered.""" + min_tuple: Annotated_Element = min( + trackable_grammar[head], key=lambda x: x[1] + ) + is_non_terminal = ( + True if re.findall(self._NON_TERMINAL_REGEX, min_tuple[0]) else False + ) + trackable_grammar[head] = list( + map( + lambda element: ( + element + if element[0] != min_tuple[0] + else (element[0], element[1] + 1) + ), + trackable_grammar[head], + ) + ) + + if not is_non_terminal: + return min_tuple[0] + else: + trackable_non_terminals[head] = min_tuple[1] + 1 + non_terminals = re.findall(self._NON_TERMINAL_REGEX, min_tuple[0]) + replacements = iter( + list(map(lambda element: generate_value(element), non_terminals)) + ) + result = re.sub( + self._NON_TERMINAL_REGEX, + lambda element: next(replacements), + min_tuple[0], + ) + return result + + while not self.__is_grammar_covered(trackable_grammar, trackable_non_terminals): + fuzzed_values.append(generate_value(start_symbol)) + + return fuzzed_values diff --git a/custom_components/test/fuzzing/fuzzer_utils/GreyBoxFuzzer.py b/custom_components/test/fuzzing/fuzzer_utils/GreyBoxFuzzer.py new file mode 100644 index 00000000..c5bffaa3 --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_utils/GreyBoxFuzzer.py @@ -0,0 +1,81 @@ +from custom_components.test.fuzzing.fuzzer_utils.Fuzzer import Fuzzer +from custom_components.test.fuzzing.fuzzer_utils.fuzzer_tools.Seed import Seed +from custom_components.test.fuzzing.fuzzer_utils.fuzzer_tools.DataTypeCreator import DataTypeCreator +from typing import List +import random + +class GreyBoxFuzzer(Fuzzer): + """GreyBox fuzzer class, inherits from the abstract fuzzer class.""" + + __RANGE_RANDOM_STRING = 100 + __data_type_creator = DataTypeCreator() + + def __init__(self): + """initialize GreyBoxFuzzer""" + print("Initialize GreyBoxFuzzer") + + def fuzz(self, + seed_template: list, + seed_specification: list = None, + amount_seeds: int = 100) -> List[Seed]: + """Returns a population of seeds with specific values based on the seed template and seed specifiction. + + This function takes two lists 'seed_template' and 'seed_specification' and creates seeds. + The number of seeds is specified by 'amount_seeds'. A list of the random seeds is returned. + + :param seed_template: The seed_template is a list of input types for the function. + The entries must correspond to the valid function parameters of the function to be tested. + Valid inputs are "INT", "FLOAT", "STRING" and "BOOLEAN". + e.g.: ["INT", "FLOAT", "STRING", "BOOLEAN"] + :type seed_template: list + + :param seed_specification: A list that provides the number of digits for each data type in seed_template. + If a random data type is to be initialised anyway, this must be marked with an 'r'. + Defalault value ist random for every data type. + E.g.: [5, 2, 'r'] + :type seed_specification: list + + :param amount_seeds: Amount of seeds which will be created. + :type amount_seeds: int + + :return: Returns a list indicating how many tests were successful and how many failed. + :rtype: list + """ + + # Throw exception if seed_specification and seed_template aren't the same length + if len(seed_template) != len(seed_specification): + raise ValueError("Length of seed_template and seed_specification must be the same length.") + + seed_population = [] + + for _ in range(amount_seeds): + param_set = [] + for seed_spec, data_type in zip(seed_specification, seed_template): + if data_type == "INT": + if seed_spec == 'r': + param_set.append(self.__data_type_creator.create_int(seed_spec,True)) + else: + param_set.append(self.__data_type_creator.create_int(seed_spec,False)) + + elif data_type == "FLOAT": + if seed_spec == 'r': + param_set.append(self.__data_type_creator.create_float(seed_spec,True)) + else: + param_set.append(self.__data_type_creator.create_float(seed_spec,False)) + + elif data_type == "STRING": + if seed_spec == 'r': + seed_spec = random.randint(1, self.__RANGE_RANDOM_STRING) + rand_val = random.randint(0,1) + if rand_val == 0: + param_set.append(self.__data_type_creator.create_string_only_letters(seed_spec)) + elif rand_val == 1: + param_set.append(self.__data_type_creator.create_string_special_characters(seed_spec)) + + elif data_type == "BOOL": + param_set.append(random.choice([True, False])) + + seed = Seed(1, param_set) + seed_population.append(seed) + + return seed_population \ No newline at end of file diff --git a/custom_components/test/fuzzing/fuzzer_utils/GreyBoxRunner.py b/custom_components/test/fuzzing/fuzzer_utils/GreyBoxRunner.py new file mode 100644 index 00000000..5aef74e8 --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_utils/GreyBoxRunner.py @@ -0,0 +1,132 @@ +import logging +import inspect +import coverage +import hashlib +import random +from typing import Callable, List + +from custom_components.test.fuzzing.fuzzer_utils.Runner import Runner +from custom_components.test.fuzzing.fuzzer_utils.fuzzer_tools.Seed import Seed, SeedManager +from custom_components.test.fuzzing.fuzzer_utils.MutationalFuzzer import MutationalBlackBoxFuzzer + +class GreyBoxRunner(Runner): + """Greybox runner class, inherits from the abstract runner class.""" + + __logger = None + __seed_manager = None + __mutationalFuzzer = None + path_dict = {} + + def __init__(self): + """constructor""" + self.__logger = logging.getLogger(__name__) + self.__seed_manager = SeedManager() + self.__mutationalFuzzer = MutationalBlackBoxFuzzer() + + def run(self, function: Callable, seed_population: List[Seed], amount_runs: int = 10000) -> list: + """Executes all transferred parameter sets for the transferred function. + + :param function: The passed function which is to be fuzzed. + :type function: Callable + + :param seed_population: A list with seeds. A seed is a set of parameters. + :type seed_population: list + + :param amount_runs: The number of times the function is to be tested. + :param amount_runs: int + + :return: Returns a dict with 2 keys, + the key 'passed_tests' contains the number of passed tests, + the key 'failed_tests' contains the number of failed tests. + :rtype: dict + """ + path_dict = {} + path_counter = 0 + + sig = inspect.signature(function) + num_params = len(sig.parameters) + self.__logger.debug(f"The given functions needs {str(num_params)} parameters") + + test_results = { + "passed_tests": 0, + "failed_tests": 0, + } + + for generation in range(0, amount_runs): + # get seed for the test + seed = self.__seed_manager.select_seed(seed_population) + + # Mutate seed values + self.__mutate(seed) + + cov = coverage.Coverage(branch=True) + cov.start() + try: + function(*seed.seed_values) + cov.stop() + cov.save() + test_results["passed_tests"] += 1 + self.__logger.debug(f"Test {generation} passed with parameters: {seed.seed_values}") + except Exception as e: + cov.stop() + cov.save() + test_results["failed_tests"] += 1 + self.__logger.error(f"Test {generation} failed with parameters: {seed.seed_values}.") + self.__logger.error(f"Exception: {e}") + + # check path coverage + data = cov.get_data() + filename = next(iter(data.measured_files())) + path_covered = data.arcs(filename) + + # Create hash of path + hashed_path = self.__hash_md5(str(path_covered)) + + # Check if a new path was covered + if hashed_path not in self.path_dict: + self.__logger.debug(f"Newly covered pathes: {path_covered}") + seed_population.append(seed) + path_counter += 1 + + + # store hash in path_dict + path_dict = self.__store_hashed_path(hashed_path, path_dict) + + # Adjust energy of seed + self.__seed_manager.adjust_energy(seed, self.path_dict, hashed_path) + + self.__logger.debug("\n##### Covert pathes #####\n") + self.__logger.debug(f"In total there were {path_counter} pathes discovered") + + return test_results + + def __hash_md5(self, path_covered: str) -> str: + md5_hash = hashlib.md5() + md5_hash.update(path_covered.encode('utf-8')) + return md5_hash.hexdigest() + + def __store_hashed_path(self, hashed_path: str, path_dict: dict) -> dict: + if hashed_path in self.path_dict: + self.path_dict[hashed_path] += 1 + else: + self.path_dict[hashed_path] = 1 + + return path_dict + + def __mutate(self, seed: Seed): + """Mutates one of the seed values. + + This function takes a seed and mutates one of the seed values of it. + + :param seed: A seed consists of a list of seed_values. + :type seed: Seed + """ + amount_values = len(seed.seed_values) + random_index = random.choice(range(0,amount_values)) + seed_value = seed.seed_values[random_index] + seed.seed_values[random_index] = self.__mutationalFuzzer.fuzz([seed_value],2)[1][0] + + + + + diff --git a/custom_components/test/fuzzing/fuzzer_utils/MutationalFuzzer.py b/custom_components/test/fuzzing/fuzzer_utils/MutationalFuzzer.py new file mode 100644 index 00000000..d7beb87b --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_utils/MutationalFuzzer.py @@ -0,0 +1,395 @@ +import logging +import random +import math + +from custom_components.test.fuzzing.fuzzer_utils.Fuzzer import Fuzzer + + +class MutationalBlackBoxFuzzer(Fuzzer): + """Mutational fuzzer class, inherits from the abstract fuzzer class.""" + + __logger = None + __multiplier: list[int] = [] + + def __init__(self): + """Constructor get the logger.""" + self.__logger = logging.getLogger(__name__) + + i: int = 10 + while i <= 10000000000000000: + self.__multiplier.append(i) + i *= 10 + + def __delete_random_char(self, string: str) -> str: + """Returns string with a random character deleted. + + This function takes a string `string` as input and returns a new string + with one random character removed from it. + + :param string: Any string from which a character is to be removed. + :type string: str + + :return: Returns the input string `string` with one randomly chosen character deleted. + :rtype: str + """ + if string == "": + # If the string is empty, there's no character to delete, so return the empty string. + return string + + # Generate a random integer position within the range of the string's indices. + pos = random.randint(0, len(string) - 1) + + # Create a new string by excluding the character at the random position. + # This is done by concatenating the substring before the random position and + # the substring after the random position. + return string[:pos] + string[pos + 1 :] + + def __insert_random_char(self, string: str) -> str: + """Returns string with a random character inserted. + + This function takes a string `string` as input and returns a new string + with a random character inserted at a random position within the string. + + :param string: Any string where a character is to be inserted. + :type string: str + + :return: Returns the input string `string` with one randomly chosen character inserted at a random position. + :rtype: str + """ + # Generate a random position within the range of the string's length (including the end of the string). + pos = random.randint(0, len(string)) + + # Generate a random character from the ASCII range 32 to 126 (printable characters). + random_character = chr(random.randrange(32, 127)) + + # Create a new string by inserting the random character at the random position. + # This is done by concatenating the substring before the random position, the random character, + # and the substring after the random position. + return string[:pos] + random_character + string[pos:] + + def __flip_random_char(self, string: str) -> str: + """Returns string with a random bit flipped in a random position. + + This function takes a string `string` as input and returns a new string + where one randomly chosen character has one of its bits flipped + at a random bit position. + + :param string: Any string where a character's bit is to be flipped. + :type string: str + + :return: Returns the input string `string` with one character's bit flipped at a random position. + :rtype: str + """ + if string == "": + # If the string is empty, there's no character to flip, so return the empty string. + return string + + # Generate a random integer position within the range of the string's indices. + pos = random.randint(0, len(string) - 1) + + # Get the character at the randomly chosen position. + c = string[pos] + + # Generate a random bit position between 0 and 6 (since we are assuming 7-bit ASCII characters). + bit = 1 << random.randint(0, 6) + + # Flip the bit at the generated bit position using XOR. + new_c = chr(ord(c) ^ bit) + + # Create a new string by replacing the character at the random position with the new character. + # This is done by concatenating the substring before the random position, the new character, + # and the substring after the random position. + return string[:pos] + new_c + string[pos + 1 :] + + def __get_random_float(self) -> float: + """Returns a random float value modified by a randomly chosen multiplier. + + This function generates a random float value between 0.0 and 1.0, and then + multiplies it by a randomly selected value from the list `self.__multiplier`. + + :return: A random positiv float value. + :rtype: float + """ + # Generate a random float between 0.0 and 1.0. + random_float = random.random() + + # Multiply the random float by a randomly chosen multiplier from the list `self.__multiplier`. + random_float *= random.choice(self.__multiplier) + + # Return the modified random float. + return random_float + + def __check_inf(self, number: float) -> float: + """Checks if the number is infinite and replaces it with a random value if true. + + This function takes a floating-point number `number` as input. If the number is + positive or negative infinity, it replaces the number with a random value between + 0.0 and 1.0. It also logs this replacement. + + :param number: The number to check for infinity. + :type number: float + + :return: Returns the original number if it is not finite; otherwise, returns a random value between 0.0 and 1.0. + :rtype: float + """ + if math.isinf(number): + # If the number is infinite, replace it with a random value between 0.0 and 1.0. + number = random.random() + self.__logger.debug( + "The return value would be - or + INF, set it to a random value between 0.0 and 1.0" + ) + + # Return the potentially modified number. + return number + + def __add_random_number(self, number: float) -> float: + """Returns the input number with a random float added. + + This function takes a floating-point number `number` as input and adds + a random float to it. The random float is obtained from the private method + `__get_random_float`. + + :param number: The number to which a random float will be added. + :type number: float + + :return: Returns the input number `number` with an added random float, + after ensuring the result is not infinite using the `__check_inf` method. + :rtype: float + """ + number += self.__get_random_float() + + # Check if the resulting number is infinite. + return self.__check_inf(number) + + def __sub_random_number(self, number: float) -> float: + """Subtracts a random float from the given number. + + This function takes a float `number` as input and subtracts a randomly + generated float from it. The resulting number is then checked for + infinity values using the `__check_inf` method. + + :param number: The input number from which a random float will be subtracted. + :type number: float + + :return: Returns the resulting number after subtracting a random float and checking for infinity. + :rtype: float + """ + number -= self.__get_random_float() + + # Check if the resulting number is infinite. + return self.__check_inf(number) + + def __mult_random_number(self, number: float) -> float: + """Returns the result of multiplying the input number by a random float. + + This function takes a floating-point number `number` as input and returns + a new floating-point number which is the result of multiplying the input + number by a randomly generated float. It also checks if the result is + infinite. + + :param number: A floating-point number to be multiplied by a random float. + :type number: float + + :return: Returns the input number multiplied by a random float, + after checking if the result is infinite. + :rtype: float + """ + number *= self.__get_random_float() + + # Check if the resulting number is infinite. + return self.__check_inf(number) + + def __div_random_number(self, number: float) -> float: + """Divides the input number by a randomly generated float. + + This function takes a float `number` as input and divides it by + a random float generated by the `__get_random_float` method. It then + returns the result of this division after checking for infinity. + + :param number: The float number to be divided. + :type number: float + + :return: Returns the input number divided by a random float. + :rtype: float + """ + number /= self.__get_random_float() + + # Check if the resulting number is infinite. + return self.__check_inf(number) + + def fuzz( + self, + seed: list, + rounds: int = 1, + ) -> list[list]: + """The function returns a param_set for a ParamRunner. + The seed is changed randomly in any number of rounds (defined by rounds). + + :param seed: The seed is a list of starting values. + The entries must correspond to the valid function parameters of the function to be tested. + For example, if the function under test expects an int, float and a string, a possible seed would be [256, 128.5, "demo"]. + Only int, float or strings are permitted. + :type seed: list with any number of int double or strings + :param rounds: Specifies how many param_set's are to be supplied for the runner. The default is 1. + :type rounds: int + + :return: Returns a list of lists. Each list in the list is a test case for the ParamRunner. + :rtype: list + """ + ### Check types of the input seed ############################################### + for type in seed: + if isinstance(type, int): + self.__logger.debug(str(type) + " is instance of int.") + elif isinstance(type, float): + self.__logger.debug(str(type) + " is instance of float.") + elif isinstance(type, str): + self.__logger.debug(str(type) + " is instance of str.") + else: + self.__logger.error( + str(type) + " is not a instance of int, float or str." + ) + raise TypeError( + str(type) + + " is not a instance of int, float or str." + + " The MutationalFuzzer can only fuzz these types!" + ) + ################################################################################# + + result_list: list[list] = [] + # Add seed as first param set + result_list.append(seed) + self.__logger.debug("Creat new param_set: " + str(seed) + " in round: 0") + + current_round: int = 1 + next_param_set: list = [] + while current_round < rounds: + next_param_set = [] + + last_param_set = result_list[-1] + + for value in last_param_set: + # Check if the last value was a string. + if isinstance(value, str): + # Choose randomly one fuzz function. + random_case = random.randint(0, 2) + match random_case: + case 0: + next_param_set.append(self.__delete_random_char(value)) + case 1: + next_param_set.append(self.__insert_random_char(value)) + case 2: + next_param_set.append(self.__flip_random_char(value)) + case default: + self.__logger.warning( + "The fuzz mode " + + str(random_case) + + " is not specified. Use the __flip_random_char() function" + ) + next_param_set.append(self.__flip_random_char(value)) + + # Check if the last value was a int or float. + elif isinstance(value, int) or isinstance(value, float): + # Choose randomly one fuzz function. + random_case = random.randint(0, 3) + match random_case: + case 0: + # If an int is required, cast the float. + if isinstance(value, int): + next_param_set.append( + int(self.__add_random_number(value)) + ) + + else: + next_param_set.append(self.__add_random_number(value)) + case 1: + # If an int is required, cast the float. + if isinstance(value, int): + next_param_set.append( + int(self.__sub_random_number(value)) + ) + + else: + next_param_set.append(self.__sub_random_number(value)) + case 2: + # If an int is required, cast the float. + if isinstance(value, int): + next_param_set.append( + int(self.__mult_random_number(value)) + ) + + else: + next_param_set.append(self.__mult_random_number(value)) + case 3: + # If an int is required, cast the float. + if isinstance(value, int): + next_param_set.append( + int(self.__div_random_number(value)) + ) + + else: + next_param_set.append(self.__div_random_number(value)) + case default: + self.__logger.warning( + "The fuzz mode " + + str(random_case) + + " is not specified. Use the __int_add_random_() function" + ) + # If an int is required, cast the float. + if isinstance(value, int): + next_param_set.append( + int(self.__add_random_number(value)) + ) + + else: + next_param_set.append(self.__add_random_number(value)) + + else: + self.__logger.error( + str(value) + + " is not a instance of int, float or str." + + " Keep value, value is not longer fuzzed." + ) + next_param_set.append(value) + + self.__logger.debug( + "Creat new param_set: " + + str(next_param_set) + + " in round: " + + str(current_round) + ) + result_list.append(next_param_set) + current_round += 1 + + self.__logger.debug("Generated param_set: " + str(result_list)) + return result_list + + def fuzz_failed( + self, + seed: dict, + rounds: int = 1, + ) -> list[list]: + """The function returns a param_set for a ParamRunner. + The seed is changed randomly in any number of rounds (defined by rounds). + In contrast to the fuzz() function, the result dict of the run() function + of the ParamRunner is passed as a seed. + The param_set that failed in the test case is fuzzed again according to the number defined in the parameter rounds. + + :param seed: The seed is the result of the fuzz() function of the ParamRunner. + :type seed: dict + :param rounds: Specifies how many param_set's are to be supplied for the runner. The default is 1. + :type rounds: int + + :return: Returns a list of lists. Each list in the list is a test case for the ParamRunner. + :rtype: list + """ + # Get failed parameter of the result dict. + failed_params: dict = seed.get("failed_params", {}) + result_list: list[list] = [] + + # call for every failed set the fuzz function. + for param_list in failed_params.values(): + param_sets = self.fuzz(param_list, rounds) + for param_set in param_sets: + result_list.append(param_set) + + return result_list diff --git a/custom_components/test/fuzzing/fuzzer_utils/ParamRunner.py b/custom_components/test/fuzzing/fuzzer_utils/ParamRunner.py new file mode 100644 index 00000000..b04c3d74 --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_utils/ParamRunner.py @@ -0,0 +1,117 @@ +import logging +import inspect +import random + +from custom_components.test.fuzzing.fuzzer_utils.Runner import Runner + + +class ParamRunner(Runner): + """Paramter runner class, inherits from the abstract runner class.""" + + __logger = None + + def __init__(self): + """constructor""" + self.__logger = logging.getLogger(__name__) + + def run(self, function, param_set: list) -> dict: + """Executes all transferred parameter sets for the transferred function. + + :param function: The passed function which is to be fuzzed. + :type function: function + :param param_set: The parameter set transferred from the fuzzer. + :type param_set: list + + :return: Returns a dict with 3 keys, + the key 'passed_tests' contains the number of passed tests, + the key 'failed_tests' contains the number of failed tests, + and the last key 'failed_params' contains a dict with every failed param_set (compare example). + :rtype: dict + + .. code-block:: python + { + "passed_tests": 10, + "failed_tests": 1, + "failed_params": { + "1": [1.0,2,"xxx"], + "2": [1.0,223,"demo"] + } + } + """ + + sig = inspect.signature(function) + num_params = len(sig.parameters) + self.__logger.debug(f"The given functions needs {str(num_params)} parameters") + + test_results = { + "passed_tests": 0, + "failed_tests": 0, + "failed_params": {}, + } + + for index, param in enumerate(param_set): + try: + function(*param) + test_results["passed_tests"] += 1 + self.__logger.debug(f"Test {index} passed with parameters: {param}") + except Exception as e: + test_results["failed_tests"] += 1 + test_results["failed_params"][str(index)] = param + self.__logger.error(f"Test {index} failed with parameters: {param}.") + self.__logger.error(f"Exception: {e}") + + if test_results["failed_tests"] > 0: + self.__logger.error( + "Summary: " + + str(test_results["failed_tests"]) + + " of " + + str(test_results["failed_tests"] + test_results["passed_tests"]) + + " param_sets failed for the function " + + str(function.__name__) + ) + else: + self.__logger.info( + "Summary: All " + + str(test_results["passed_tests"]) + + " param_sets passed for the function " + + str(function.__name__) + ) + + return test_results + + def limit_param_set(self, param_set: list, runs: int) -> list: + """Generates a specific selection of an individual value pool. A list of lists is returned with a specified number of elements. + + :param param_set: The value pool as list of lists. + :type param_nr: list + :param runs: Number of elements selected from the value pool. + :type types: int + + :return: A random selection of certain elements from the parameter set. + :rtype: list + """ + + # Validate input parameters + if not isinstance(param_set, list): + self.__logger.error("Param_set must be of type list.") + raise TypeError("Param_set must be of type list.") + if len(param_set) == 0: + self.__logger.error("Length of param_set must be greater then 0.") + raise ValueError("Length of param_set must be greater then 0.") + if not isinstance(runs, int) or runs <= 0: + self.__logger.error( + "Runs must be of type int and greater than 0. Parameter set is returned unchanged." + ) + return param_set + + # Selection of random elements from param_set if the number of runs is smaller than the number of elements in param_set + if runs > len(param_set): + self.__logger.info( + "Length of param_set is smaller than the value of runs. Returned param_set unchanged." + ) + return param_set + else: + self.__logger.info( + f"Decresed elements in param_set from {len(param_set)} to {runs}" + ) + return random.sample(param_set, runs) diff --git a/custom_components/test/fuzzing/fuzzer_utils/Runner.py b/custom_components/test/fuzzing/fuzzer_utils/Runner.py new file mode 100644 index 00000000..815c281e --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_utils/Runner.py @@ -0,0 +1,15 @@ +from abc import ABC, abstractmethod + + +class Runner(ABC): + """Abstract class""" + + @abstractmethod + def __init__(self): + """Abstract method, must be overloaded by the corresponding fuzzer.""" + pass + + @abstractmethod + def run(self): + """Abstract method, must be overloaded by the corresponding fuzzer.""" + pass diff --git a/custom_components/test/fuzzing/fuzzer_utils/ValuePool.py b/custom_components/test/fuzzing/fuzzer_utils/ValuePool.py new file mode 100644 index 00000000..c67a8da9 --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_utils/ValuePool.py @@ -0,0 +1,154 @@ +import sys +import datetime + + +class ValuePool: + """Provides value pool for value-pool based fuzzing approches.""" + + __UINT_POOL = [] + __INT_POOL = [] + __FLOAT_POOL = [] + __STRING_POOL = [] + __BOOL_POOL = [] + __BYTE_POOL = [] + __LIST_POOL = [] + __DICT_POOL = [] + __DATE_POOL = [] + __ALL_VALUES_POOL = [] + + def __init__(self) -> None: + """constructor + Set values for pools. + + sys.maxsize: An integer giving the maximum value a variable of type Py_ssize_t can take. It's usually 2^31 - 1 on a 32-bit platform and 2^63 - 1 on a 64-bit platform. + """ + # set values for __UINT_POOL + self.__UINT_POOL = [ + 0, + 1, + 257, + sys.maxsize, + sys.maxsize * sys.maxsize, + ] + + # set values for __INT_POOL + self.__INT_POOL = [ + sys.maxsize * -sys.maxsize, + -sys.maxsize, + -257, + -1, + ] + self.__UINT_POOL + + # set values for __FLOAT_POOL + self.__FLOAT_POOL = [ + sys.maxsize * -sys.maxsize * 0.5, + -sys.maxsize * 0.5, + -257.0, + -1.0, + 0.0, + 1.0, + 257.0, + sys.maxsize * 0.5, + sys.maxsize * sys.maxsize * 0.5, + ] + [x * 1.1 for x in self.__INT_POOL] + + # set values for __STRING_POOL + self.__STRING_POOL = [ + "", + "a", + "abc", + " " * 100, # long string of spaces + "special_characters_!@#$%^&*()", + "üñîçødê", + "a" * 1000, # very long string + ] + + # set values for __BOOL_POOL + self.__BOOL_POOL = [ + None, + True, + False, + 0, + 1, + ] + + # set values for __BYTE_POOL + self.__BYTE_POOL = [ + b"", + b"\x00", + b"abc", + bytes(range(256)), # all possible byte values + ] + + # set values for __LIST_POOL + self.__LIST_POOL = [ + None, + [], + [1, 2, 3], + ["a", "b", "c"], + [True, False, None], + list(range(100)), # long list + ] + + # set values for __DICT_POOL + # a dict is represented as a string and need to be loaded as a json in the testcase + self.__DICT_POOL = [ + # None, # test cases can today not handel a None value, the JSON bib can load a NONE value as a JSON + "{}", + '{"key": "value"}', + '{"int": 1, "float": 1.0, "str": "string"}', + '{"0": 0, "1": 1, "2": 2, "3": 3, "4": 4, "5": 5, "6": 6, "7": 7, "8": 8, "9": 9}', + ] + + # set values for __DATE_POOL + self.__DATE_POOL = [ + None, + datetime.datetime.min, + datetime.datetime.max, + datetime.datetime.now(), + datetime.datetime(2000, 1, 1), + datetime.datetime(1970, 1, 1), + ] + + # create a pool with all unique values + self.__ALL_VALUES_POOL = ( + self.__INT_POOL + + self.__UINT_POOL + + self.__FLOAT_POOL + + self.__STRING_POOL + + self.__BOOL_POOL + + self.__BYTE_POOL + + self.__LIST_POOL + + self.__DICT_POOL + + self.__DATE_POOL + ) + + def get_uint(self) -> list: + return self.__UINT_POOL + + def get_int(self) -> list: + return self.__INT_POOL + + def get_float(self) -> list: + return self.__FLOAT_POOL + + def get_string(self) -> list: + return self.__STRING_POOL + + def get_bool(self) -> list: + return self.__BOOL_POOL + + def get_byte(self) -> list: + return self.__BYTE_POOL + + def get_list(self) -> list: + return self.__LIST_POOL + + def get_dict(self) -> list: + return self.__DICT_POOL + + def get_date(self) -> list: + return self.__DATE_POOL + + def get_all_values(self) -> list: + return self.__ALL_VALUES_POOL diff --git a/custom_components/test/fuzzing/fuzzer_utils/ValuePoolFuzzer.py b/custom_components/test/fuzzing/fuzzer_utils/ValuePoolFuzzer.py new file mode 100644 index 00000000..2dc3736d --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_utils/ValuePoolFuzzer.py @@ -0,0 +1,186 @@ +import logging +import itertools +import json + +from custom_components.test.fuzzing.fuzzer_utils.Fuzzer import Fuzzer +from custom_components.test.fuzzing.fuzzer_utils.GrammarFuzzer import GrammarFuzzer +from custom_components.test.fuzzing.fuzzer_utils.ValuePool import ValuePool +from custom_components.test.fuzzing.fuzzer_utils.grammar_pool import grammar_controls_json, grammar_ipv4, \ + grammar_loxconfig_rooms_cats_json + + +class ValuePoolFuzzer(Fuzzer): + """Value pool fuzzer class, inherits from the abstract fuzzer class.""" + + __logger = None + + __value_pool: ValuePool = None + __grammar_fuzzer: GrammarFuzzer = None + + def __init__(self): + """constructor""" + self.__logger = logging.getLogger(__name__) + self.__value_pool = ValuePool() + self.__grammar_fuzzer = GrammarFuzzer() + + def __get_fuzzing_pool( + self, value_pools: list[list], param_combi: int + ) -> list[list]: + """ + Generates combinations of the values in the provided lists. + + :param value_pools: A list containing inner lists of values. + :type lists: list of lists + :param param_combi: Number of parameters to be combined. + :type param_combi: int + + :return: A list with combinations of values, ready for fuzzing + :rtype: list of lists + """ + value_pool_limited: list[list] = [] + i: int = 0 + while i < param_combi: + value_pool_limited.append(value_pools[i]) + i += 1 + + return_lists: list[list] = [ + list(t) for t in itertools.product(*value_pool_limited) + ] + + if param_combi > 1: + # Adjust the shift to evenly distribute each element + for m in range(param_combi, len(value_pools)): + pool: list = value_pools[m] + pool_index: int = 0 + cnt_return_lists: int = 0 + # Distribute the additional pools over the already generated lists + while cnt_return_lists < len(return_lists): + if pool_index >= len(pool): + pool_index = 0 + # Calculate the index-shift for even distribution + if cnt_return_lists % len(value_pools[1]) == 0: + pool_index = (cnt_return_lists // len(value_pools[1])) * (m - param_combi + 1) + if pool_index >= len(pool): + pool_index = m - param_combi + if pool_index >= len(pool): + pool_index = 0 + return_lists[cnt_return_lists].append(pool[pool_index]) + pool_index += 1 + cnt_return_lists += 1 + else: + # Create return_lists with one element from the first value pool + return_lists = [[item] for item in value_pools[0]] + # Add elements from the other value pools + vp_index: int = 1 + while vp_index < len(value_pools): + pool = value_pools[vp_index] + return_lists_index: int = 0 + pool_index: int = 0 + while return_lists_index < len(return_lists): + + # If the pool_index is greater than the length of the current pool, start from the beginning + if pool_index >= len(pool): + pool_index = 0 + return_lists[return_lists_index].append(pool[pool_index]) + return_lists_index += 1 + pool_index += 1 + + vp_index += 1 + + return return_lists + + def fuzz(self, types: list = ["INT"], param_combi: int = 1) -> list: + """ + Generates an individual value pool for fuzzing based on the parameters. + + :param types: A list of required data types. + :type types: list, defaults to ["INT"] + :param param_combi: Maximum number of parameter combinations. + :type param_combi: int, defaults to 1 + + + :raises ValueError: If length of types list is not positive. + :raises ValueError: If param_combi is not between 1 and len(types). + + :return: The value pool for fuzzing. + :rtype: list of lists + """ + + # Validate input parameters + if len(types) <= 0: + self.__logger.error("Length of types list must be positive.") + raise ValueError("Length of types list must be positive.") + if param_combi <= 0 or param_combi > len(types): + self.__logger.error("param_combi must be between 1 and len(types).") + raise ValueError("param_combi must be between 1 and len(types).") + + # Get the value pools for the valid types + valid_types = { + "INT": self.__value_pool.get_int(), + "UINT": self.__value_pool.get_uint(), + "FLOAT": self.__value_pool.get_float(), + "STRING": self.__value_pool.get_string(), + "BOOL": self.__value_pool.get_bool(), + "BYTE": self.__value_pool.get_byte(), + "LIST": self.__value_pool.get_list(), + "DICT": self.__value_pool.get_dict(), + "DATE": self.__value_pool.get_date(), + "ALL": self.__value_pool.get_all_values(), + "GRAMMAR_IPV4_MIN": [ + self.__grammar_fuzzer.fuzz_min_cost(grammar_ipv4, "") + ], + "GRAMMAR_IPV4_MAX": [ + self.__grammar_fuzzer.fuzz_max_cost(grammar_ipv4, "", 2) + ], + "GRAMMAR_IPV4_COV": self.__grammar_fuzzer.fuzz_grammar_coverage( + grammar_ipv4, "" + ), + "GRAMMAR_CONTROLS_JSON_MIN": [ + json.loads(self.__grammar_fuzzer.fuzz_min_cost(grammar_controls_json, "")), ], + "GRAMMAR_CONTROLS_JSON_MAX": [ + json.loads(self.__grammar_fuzzer.fuzz_max_cost(grammar_controls_json, "", 6)), ], + "GRAMMAR_CONTROLS_JSON_COV": list(map(lambda x: json.loads(x), + self.__grammar_fuzzer.fuzz_grammar_coverage(grammar_controls_json, + ""))), + "GRAMMAR_LOXCONFIG_ROOMS_CATS_JSON_MIN": [ + json.loads(self.__grammar_fuzzer.fuzz_min_cost(grammar_loxconfig_rooms_cats_json, ""))], + "GRAMMAR_LOXCONFIG_ROOMS_CATS_JSON_MAX": [ + json.loads(self.__grammar_fuzzer.fuzz_max_cost(grammar_loxconfig_rooms_cats_json, "", 6))], + "GRAMMAR_LOXCONFIG_ROOMS_CATS_JSON_COV": list(map(lambda x: json.loads(x), + self.__grammar_fuzzer.fuzz_grammar_coverage( + grammar_loxconfig_rooms_cats_json, + ""))), + } + + data: list = [] + + for type in types: + # Check whether requested types are valid. + if type not in valid_types: + self.__logger.error("Invalid type " + str(type) + " specified.") + raise ValueError(f"Invalid type '{type}' specified.") + else: + # Creating list of the value_pool lists provided in types + data.append(valid_types[type]) + + # Sort the value pools by length in descending order + sorted_indices = sorted( + range(len(data)), key=lambda i: len(data[i]), reverse=True + ) + value_pools: list[list] = [data[i] for i in sorted_indices] + value_pools = self.__get_fuzzing_pool(value_pools, param_combi) + # Sort the resulting lists back into the original order + result: list[list] = [] + + value_pool_index: int = 0 + while value_pool_index < len(value_pools): + reordered_list: list = [] + vp: list = value_pools[value_pool_index] + m: int = 0 + while m < len(vp): + reordered_list.append(vp[sorted_indices.index(m)]) + m += 1 + value_pool_index += 1 + result.append(reordered_list) + + return result diff --git a/custom_components/test/fuzzing/fuzzer_utils/fuzzer_tools/DataTypeCreator.py b/custom_components/test/fuzzing/fuzzer_utils/fuzzer_tools/DataTypeCreator.py new file mode 100644 index 00000000..323bb598 --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_utils/fuzzer_tools/DataTypeCreator.py @@ -0,0 +1,115 @@ +import random +import string + +class DataTypeCreator: + + __MAX_INT = (1 << 31) - 1 + + def __init__(self): + """initialize DataTypeCreator""" + + def create_int(self, amount_digits: int = 10, random_creation: bool = True) -> int: + """Returns an int value with a certain number of digits. + + This function takes a value 'amount_digits' and returns an integer with this amount of digits. + + :param amount_digits: Amount of digits the integer should have + :type amount_digits: int + + :param random_creation: True: The int will be created random. False: The int will be amount_digits long. + :type random_creation: boolean + + :return: Returns an integer with a certain amount of digits. + :rtype: int + """ + if random_creation == True: + random_seed_value = random.randint(-self.__MAX_INT, self.__MAX_INT) + return random_seed_value + else: + seed_value = '' + for digit in range(amount_digits): + if digit == 0: + # Decide if negative of positive int + rand_val = random.randint(0,1) + if rand_val == 0: + seed_value += '-' + # First digit should not be a 0 + rand_val = str(random.randint(1,9)) + seed_value += rand_val + else: + rand_val = str(random.randint(0,9)) + seed_value += rand_val + + # cast to int type and append to seed + if digit == amount_digits-1: + return int(seed_value) + + def create_float(self, amount_digits: int, random_creation: bool = True) -> int: + """Returns an int value with a certain number of digits. + + This function takes a value 'amount_digits' and returns an float with this amount of digits. + + :param amount_digits: Amount of digits the float should have + :type amount_digits: int + + :param random_creation: True: The float will be created random. False: The float will be amount_digits long. + :type random_creation: boolean + + :return: Returns an float with a certain amount of digits. + :rtype: float + """ + return random.uniform(-1000,1000) + + def create_string_only_letters(self, amount_chars: int) -> int: + """Returns an string with a certain number of chars. + + This function takes a value 'amount_chars' and returns an string with this amount of chars. + + :param amount_chars: Amount of chars the string should have + :type amount_chars: int + + :param random_creation: True: The string will be created random. False: The string will be amount_digits long. + :type random_creation: boolean + + :return: Returns an string with a certain amount of chars. + :rtype: string + """ + seed_value = '' + for character in range(amount_chars): + random_letter = random.choice(string.ascii_letters) + + seed_value += random_letter + + # cast to int type and append to seed + if character == amount_chars-1: + return seed_value + + def create_string_special_characters(self, amount_chars: int) -> str: + """Returns an string with a certain number of chars. + + This function takes a value 'amount_chars' and returns an string with this amount of chars. + The string includes uppercase and lowercase letters and special charakters. + Special charakters = "!@#$%^&*()_+-=[]{}|;:',.<>?/`~". + + :param amount_chars: Amount of chars the string should have + :type amount_chars: int + + :return: Returns an string with a certain amount of chars. + :rtype: string + """ + special_characters = "!@#$%^&*()_+-=[]{}|;:',.<>?/`~" + seed_value = '' + for character in range(amount_chars): + rand_value = random.randint(0,4) + if rand_value == 0: + random_letter = random.choice(special_characters) + else: + random_letter = random.choice(string.ascii_letters) + + seed_value += random_letter + + # cast to int type and append to seed + if character == amount_chars-1: + return seed_value + + diff --git a/custom_components/test/fuzzing/fuzzer_utils/fuzzer_tools/Seed.py b/custom_components/test/fuzzing/fuzzer_utils/fuzzer_tools/Seed.py new file mode 100644 index 00000000..d100eaf5 --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_utils/fuzzer_tools/Seed.py @@ -0,0 +1,86 @@ +from typing import List +import random +import copy + +class Seed: + + energy = 0 + seed_values = [] + + def __init__(self, energy: int = 0, seed_values: list = []): + """initialize PowerSchedule""" + self.energy = energy + self.seed_values = seed_values + + +class SeedManager: + __power_energy = 2 + + def __init__(self): + """initialize PowerSchedule""" + + def select_seed(self, seed_population: List[Seed]) -> Seed: + """Selects a seed based on their energy. + + This function selects a seed. + The higher the energy of a seed, the more likely it is that a seed will be selected. + + :param seed_population: A list with seeds. A seed is a set of parameters. + :type seed_population: list + + :return: Returns a single seed. + :rtype: Seed + """ + normalized_energy = self.get_normalized_energy(seed_population) + random_value = random.uniform(0,1) + for index, normalized_energy_val in enumerate(normalized_energy): + if random_value <= normalized_energy_val: + seed = copy.deepcopy(seed_population[index]) + break + + return seed + + def adjust_energy(self, seed: Seed, branch_dict: dict, hashed_branch: str): + """Adjusts the energy of a given seed. + + This function changes the energy of a seed based on how many times the branch was executed. + The formula for the adustment is: e = 1 / number_path_exercised + The number_path_exercised is the number of the how many times the path was seen in total. + + :param seed: A seed with a value and energy attribute. + :type seed: Seed + + :param branch_dict: A dictionary with hashes of the paths and a value of how many times the path was exercised. + :type branch_dict: dict + + :param hashed_branch: A hash of a path. + :type hashed_branch: str + + :return: Returns a single seed. + :rtype: Seed + """ + number_path_exercised = branch_dict[hashed_branch] + seed.energy = 1 / (number_path_exercised ** self.__power_energy) + + def get_normalized_energy(self, seed_population: List[Seed]) -> list: + total_energy = 0 + for seed in seed_population: + total_energy += seed.energy + + normalized_energy = [] + + for index, seed in enumerate(seed_population): + if index == 0: + normalized_energy.append(seed.energy / total_energy) + else: + normalized_energy.append(normalized_energy[index-1] + (seed.energy / total_energy)) + + return normalized_energy + + + + + + + + diff --git a/custom_components/test/fuzzing/fuzzer_utils/grammar_pool.py b/custom_components/test/fuzzing/fuzzer_utils/grammar_pool.py new file mode 100644 index 00000000..d69be11a --- /dev/null +++ b/custom_components/test/fuzzing/fuzzer_utils/grammar_pool.py @@ -0,0 +1,31 @@ +from custom_components.test.fuzzing.fuzzer_utils.GrammarFuzzer import Grammar + +grammar_ipv4: Grammar = { + "": ["..."], + "": ["<3Digits>", "<2Digits>", ""], + "<3Digits>": ["2<2DigitsR>", "1"], + "<2Digits>": [""], + "<2DigitsR>": ["55", "5", ""], + "": ["0", ""], + "": ["1", "2", "3", "4", "5", "6", "7", "8", "9"], + "": ["0", "1", "2", "3", "4"] +} + +grammar_controls_json: Grammar = { + "": ["{ \"controls\" : { } }"], + "": ["", ", "], + "": ["\"\": { }", ", \"\": { }"], + "": ["\"type\": \"\""], + "": [""], + "": ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", + "v", "w", "x", "y", "z"], +} + +grammar_loxconfig_rooms_cats_json: Grammar = { + "": ["{ \"rooms\": { }, \"cats\": { } }"], + "": ["", ", "], + "": ["\"\": { }", ", \"\": { }"], + "": ["\"name\": \"\""], + "": ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", + "v", "w", "x", "y", "z"], +} diff --git a/custom_components/test/fuzzing/generators/DummyClass.py b/custom_components/test/fuzzing/generators/DummyClass.py new file mode 100644 index 00000000..87485868 --- /dev/null +++ b/custom_components/test/fuzzing/generators/DummyClass.py @@ -0,0 +1,70 @@ +class DummyClass: + def __init__(self, value: int): + """ + Initialize the class with an integer value. + :param value: The initial integer value of the class instance. + """ + self.value = value + + def increment(self, amount: int) -> int: + """ + Increment the value by a given integer amount. + :param amount: The integer amount to increment the value by. + :return: The new integer value after incrementing. + """ + self.value += amount + return self.value + + def multiply(self, factor: float) -> float: + """ + Multiply the value by a given float factor. + :param factor: The float factor to multiply the value by. + :return: The new float value after multiplication. + """ + self.value *= factor + return self.value + + def reset(self) -> None: + """ + Reset the value to zero. + """ + self.value = 0 + + def echo(self, message: str) -> str: + """ + Return the given string message. + :param message: The string message to be returned. + :return: The string message provided as input. + """ + return message + + def divide(self, divisor: float) -> float: + """ + Divide the value by the given float divisor. + :param divisor: The float divisor to divide the value by. + :return: The new float value after division. + :raises ValueError: If the divisor is zero. + """ + if divisor == 0: + raise ValueError("Cannot divide by zero") + self.value /= divisor + return self.value + + def add_and_multiply(self, addend: int, multiplier: float) -> float: + """ + Add an integer to the value and then multiply it by a float. + :param addend: The integer number to add to the value. + :param multiplier: The float number to multiply the value by after adding. + :return: The new float value after addition and multiplication. + """ + self.value += addend + self.value *= multiplier + return self.value + + def process_list(self, data_list: list) -> int: + """ + Process a list of integers and return their sum. + :param data_list: A list of integers to be processed. + :return: The sum of the integers in the list. + """ + return sum(data_list) diff --git a/custom_components/test/fuzzing/generators/param_types.csv b/custom_components/test/fuzzing/generators/param_types.csv new file mode 100644 index 00000000..e4384691 --- /dev/null +++ b/custom_components/test/fuzzing/generators/param_types.csv @@ -0,0 +1,60 @@ +LoxoneRoomControllerV2,__init__,0,INT +LoxoneRoomControllerV2,add_to_platform_start,0,BYTE +LoxoneRoomControllerV2,add_to_platform_start,1,BOOL +LoxoneRoomControllerV2,add_to_platform_start,2,UINT +LoxoneRoomControllerV2,async_handle_set_preset_mode_service,0,STRING +LoxoneRoomControllerV2,async_update_ha_state,0,BOOL +LoxoneRoomControllerV2,async_set_temperature,0,INT +LoxoneRoomControllerV2,schedule_update_ha_state,0,BOOL +LoxoneRoomControllerV2,_name_internal,0,BOOL +LoxoneRoomControllerV2,_name_internal,1,BYTE +LoxoneRoomControllerV2,get_hassjob_type,0,STRING +LoxoneRoomControllerV2,async_handle_set_fan_mode_service,0,STRING +LoxoneRoomControllerV2,async_handle_set_swing_mode_service,0,STRING +LoxoneRoomControllerV2,_get_format,0,BOOL +LoxoneRoomControllerV2,__async_remove_impl,0,BOOL +LoxoneRoomControllerV2,async_set_context,0,INT +LoxoneRoomControllerV2,async_schedule_update_ha_state,0,BOOL +LoxoneRoomControllerV2,async_device_update,0,BOOL +LoxoneRoomControllerV2,set_fan_mode,0,STRING +LoxoneRoomControllerV2,async_set_hvac_mode,0,DATE +LoxoneRoomControllerV2,_stringify_state,0,BOOL +LoxoneRoomControllerV2,_async_process_registry_update_or_remove,0,BYTE +LoxoneRoomControllerV2,get_mode_from_id,0,UINT +LoxoneRoomControllerV2,_substitute_name_placeholders,0,STRING +LoxoneRoomControllerV2,async_set_preset_mode,0,STRING +LoxoneRoomControllerV2,_device_class_name_helper,0,INT +LoxoneRoomControllerV2,set_preset_mode,0,STRING +LoxoneRoomControllerV2,async_remove,0,BOOL +LoxoneRoomControllerV2,async_set_fan_mode,0,STRING +LoxoneRoomControllerV2,async_set_swing_mode,0,STRING +LoxoneRoomControllerV2,async_on_remove,0,BOOL +LoxoneRoomControllerV2,set_temperature,0,INT +LoxoneRoomControllerV2,event_handler,0,INT +LoxoneRoomControllerV2,async_set_humidity,0,INT +LoxoneRoomControllerV2,set_swing_mode,0,STRING +LoxoneRoomControllerV2,_async_device_registry_updated,0,BOOL +LoxoneRoomControllerV2,_report_deprecated_supported_features_values,0,INT +LoxoneRoomControllerV2,_async_registry_updated,0,BOOL +LoxoneRoomControllerV2,set_humidity,0,INT +LoxoneRoomControllerV2,set_hvac_mode,0,STRING +LoxoneRoomControllerV2,async_request_call,0,DICT +LoxoneRoomControllerV2,_clean_unit,0,UINT +LoxoneRoomControllerV2,get_state_value,0,UINT +LoxoneRoomControllerV2,_valid_mode_or_raise,0,LIST +LoxoneRoomControllerV2,_valid_mode_or_raise,1,STRING +LoxoneRoomControllerV2,_valid_mode_or_raise,2,BYTE +DummyClass,__init__,0,INT +DummyClass,add_and_multiply,0,INT +DummyClass,add_and_multiply,1,FLOAT +DummyClass,increment,0,INT +DummyClass,divide,0,FLOAT +DummyClass,echo,0,STRING +DummyClass,process_list,0,LIST +DummyClass,multiply,0,FLOAT +LxToken,__init__,0,STRING +LxToken,__init__,1,BYTE +LxToken,__init__,2,UINT +LxToken,set_token,0,BYTE +LxToken,set_hash_alg,0,BYTE +LxToken,set_valid_until,0,INT diff --git a/custom_components/test/fuzzing/generators/test_gen_on_dummy.py b/custom_components/test/fuzzing/generators/test_gen_on_dummy.py new file mode 100644 index 00000000..dbc28537 --- /dev/null +++ b/custom_components/test/fuzzing/generators/test_gen_on_dummy.py @@ -0,0 +1,38 @@ +import pytest +import logging + +from custom_components.test.fuzzing.fuzzer_utils.ValuePoolFuzzer import ValuePoolFuzzer +from custom_components.test.fuzzing.fuzzer_utils.GeneratorFuzzer import GeneratorFuzzer +from custom_components.test.fuzzing.fuzzer_utils.GeneratorRunner import GeneratorRunner +from custom_components.test.fuzzing.generators.DummyClass import DummyClass + +# Logger setup +logger = logging.getLogger(__name__) + +# Fuzzing and Runner setup +value_pool_fuzzer = ValuePoolFuzzer() +generator_fuzzer = GeneratorFuzzer(value_pool_fuzzer, 1) +generator_runner = GeneratorRunner() + +@pytest.mark.timeout(300) +def test_DummyClass() -> None: + logger.info("Start of DummyClass test.") + + # Define the start methods and parameters for fuzzing + start_methods = ['__init__'] + max_sequence_length = 4 + num_sequences = 10 + + # Generate fuzzed sequences + generated_sequences = generator_fuzzer.fuzz(DummyClass, start_methods, max_sequence_length, num_sequences) + + # Run the sequences on DummyClass + results = generator_runner.run(DummyClass, generated_sequences) + + # Extract passed and failed test counts + passed_tests, failed_tests = results + + # Log the results + logger.info(f"Passed tests: {passed_tests}, Failed tests: {failed_tests}") + + logger.info("DummyClass test finished.") \ No newline at end of file diff --git a/custom_components/test/fuzzing/generators/test_gen_on_pyLoxone.py b/custom_components/test/fuzzing/generators/test_gen_on_pyLoxone.py new file mode 100644 index 00000000..049fb3d2 --- /dev/null +++ b/custom_components/test/fuzzing/generators/test_gen_on_pyLoxone.py @@ -0,0 +1,39 @@ +import pytest +import logging + +from custom_components.test.fuzzing.fuzzer_utils.ValuePoolFuzzer import ValuePoolFuzzer +from custom_components.test.fuzzing.fuzzer_utils.GeneratorFuzzer import GeneratorFuzzer +from custom_components.test.fuzzing.fuzzer_utils.GeneratorRunner import GeneratorRunner +from custom_components.loxone.climate import LoxoneRoomControllerV2 +from custom_components.loxone.api import LxToken + +# Logger setup +logger = logging.getLogger(__name__) + +# Fuzzing and Runner setup +value_pool_fuzzer = ValuePoolFuzzer() +generator_fuzzer = GeneratorFuzzer(value_pool_fuzzer, 2) +generator_runner = GeneratorRunner() + +@pytest.mark.timeout(300) +def test_api_LxToken() -> None: + logger.info("Start test of LxToken from api.py.") + + # Define the start methods and parameters for fuzzing + start_methods = ['__init__'] + max_sequence_length = 8 + num_sequences = 50 + + # Generate fuzzed sequences + generated_sequences = generator_fuzzer.fuzz(LxToken, start_methods, max_sequence_length, num_sequences) + + # Run the sequences on LoxoneRoomControllerV2 + results = generator_runner.run(LxToken, generated_sequences) + + # Extract passed and failed test counts + passed_tests, failed_tests = results + + # Log the results + logger.info(f"Passed tests: {passed_tests}, Failed tests: {failed_tests}") + + logger.info("LxToken test finished.") diff --git a/custom_components/test/fuzzing/grey_box/example1_grey_box_fuzzer.py b/custom_components/test/fuzzing/grey_box/example1_grey_box_fuzzer.py new file mode 100644 index 00000000..97a1687f --- /dev/null +++ b/custom_components/test/fuzzing/grey_box/example1_grey_box_fuzzer.py @@ -0,0 +1,71 @@ +from custom_components.test.fuzzing.fuzzer_utils.GreyBoxFuzzer import GreyBoxFuzzer +from custom_components.test.fuzzing.fuzzer_utils.GreyBoxRunner import GreyBoxRunner +from custom_components.test.fuzzing.fuzzer_utils.fuzzer_tools.Seed import Seed + +from custom_components.loxone.helpers import ( + map_range, + hass_to_lox, + lox_to_hass, + lox2lox_mapped, + lox2hass_mapped, + to_hass_color_temp, + to_loxone_color_temp, + get_room_name_from_room_uuid, + get_cat_name_from_cat_uuid, + add_room_and_cat_to_value_values, + get_miniserver_type, + get_all, +) + +def path_coverage_function(f: float, s: str, i: int) -> str: + if f > 0.0: + if len(s) > 5: + if i % 2 == 0: + return "Path 1: f > 0.0, len(s) > 5, i is even" + else: + return "Path 2: f > 0.0, len(s) > 5, i is odd" + else: + if i % 2 == 0: + return "Path 3: f > 0.0, len(s) <= 5, i is even" + else: + return "Path 4: f > 0.0, len(s) <= 5, i is odd" + else: + if len(s) > 5: + if i % 2 == 0: + return "Path 5: f <= 0.0, len(s) > 5, i is even" + else: + return "Path 6: f <= 0.0, len(s) > 5, i is odd" + else: + if i % 2 == 0: + return "Path 7: f <= 0.0, len(s) <= 5, i is even" + else: + raise Exception() + + + +grey_box_fuzzer = GreyBoxFuzzer() +grey_box_runner = GreyBoxRunner() + +# seed specification + +amount_seeds = 10 +seed_template = ["FLOAT", "STRING", "INT"] +seed_specification = ['r','r','r'] + + +# create a population with fuzzer +seed_population = grey_box_fuzzer.fuzz(seed_template, seed_specification, 10) + +# Print seeds in population +print("##### Population #####") +for i in seed_population: + print(i.seed_values) + +print("\n##### Execute Tests #####\n") + +test_results = grey_box_runner.run(path_coverage_function, seed_population, 2000) + +print("\n##### Test restults #####\n") +print(f"Tests passed: {test_results['passed_tests']}") +print(f"Tests failed: {test_results['failed_tests']}") + diff --git a/custom_components/test/fuzzing/grey_box/example2_grey_box_fuzzer.py b/custom_components/test/fuzzing/grey_box/example2_grey_box_fuzzer.py new file mode 100644 index 00000000..21c2bc9e --- /dev/null +++ b/custom_components/test/fuzzing/grey_box/example2_grey_box_fuzzer.py @@ -0,0 +1,60 @@ +from custom_components.test.fuzzing.fuzzer_utils.GreyBoxFuzzer import GreyBoxFuzzer +from custom_components.test.fuzzing.fuzzer_utils.GreyBoxRunner import GreyBoxRunner +from custom_components.test.fuzzing.fuzzer_utils.fuzzer_tools.Seed import Seed + +from custom_components.loxone.helpers import ( + map_range, + hass_to_lox, + lox_to_hass, + lox2lox_mapped, + lox2hass_mapped, + to_hass_color_temp, + to_loxone_color_temp, + get_room_name_from_room_uuid, + get_cat_name_from_cat_uuid, + add_room_and_cat_to_value_values, + get_miniserver_type, + get_all, +) + +# Function to test the grey box fuzzer +def crashme(s: str) -> None: + cnt = 0 + if len(s) > 0 and s[0] == 'b': + cnt += 1 + if len(s) > 1 and s[1] == 'a': + cnt += 1 + if len(s) > 2 and s[2] == 'd': + cnt += 1 + if len(s) > 3 and s[3] == '!': + cnt += 1 + if cnt >= 3: + raise Exception() + + +grey_box_fuzzer = GreyBoxFuzzer() +grey_box_runner = GreyBoxRunner() + +# seed specification + +amount_seeds = 10 +seed_template = ["STRING"] +seed_specification = [4] + + +# create a population with fuzzer +seed_population = grey_box_fuzzer.fuzz(seed_template, seed_specification, 10) + +# Print seeds in population +print("##### Population #####") +for i in seed_population: + print(i.seed_values) + +print("\n##### Execute Tests #####\n") + +test_results = grey_box_runner.run(crashme, seed_population, 10) + + +print("\n##### Test restults #####\n") +print(f"Tests passed: {test_results['passed_tests']}") +print(f"Tests failed: {test_results['failed_tests']}") diff --git a/custom_components/test/fuzzing/grey_box/test_grey_box_on_helpers.py b/custom_components/test/fuzzing/grey_box/test_grey_box_on_helpers.py new file mode 100644 index 00000000..09ed027b --- /dev/null +++ b/custom_components/test/fuzzing/grey_box/test_grey_box_on_helpers.py @@ -0,0 +1,164 @@ +from custom_components.test.fuzzing.fuzzer_utils.GreyBoxFuzzer import GreyBoxFuzzer +from custom_components.test.fuzzing.fuzzer_utils.GreyBoxRunner import GreyBoxRunner +from custom_components.test.fuzzing.fuzzer_utils.fuzzer_tools.Seed import Seed +import pytest +import logging + +from custom_components.loxone.helpers import ( + map_range, + hass_to_lox, + lox_to_hass, + lox2lox_mapped, + lox2hass_mapped, + to_hass_color_temp, + to_loxone_color_temp, + get_miniserver_type, +) + +logger = logging.getLogger(__name__) +grey_box_fuzzer: GreyBoxFuzzer = GreyBoxFuzzer() +grey_box_runner: GreyBoxRunner = GreyBoxRunner() + + + + + +# Function to test the grey box fuzzer +def demo_function(s: str) -> None: + cnt = 0 + if len(s) > 0 and s[0] == 'b': + cnt += 1 + if len(s) > 1 and s[1] == 'a': + cnt += 1 + if len(s) > 2 and s[2] == 'd': + cnt += 1 + if len(s) > 3 and s[3] == '!': + cnt += 1 + if cnt >= 3: + raise Exception() + + +@pytest.mark.skipif(False, reason="Not skiped!") +def test_crashme() -> None: + logger.info("Start of test_crashme() test.") + seed_template = ["STRING"] + seed_specification = [4] + seed_population: list[Seed] + result: dict + + seed_population = grey_box_fuzzer.fuzz(seed_template, seed_specification, 20) + result = grey_box_runner.run(demo_function, seed_population, 100) + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skiped!") +def test_map_range() -> None: + logger.info("Start of test_map_range() test.") + seed_template = ["FLOAT", "FLOAT", "FLOAT", "FLOAT", "FLOAT"] + seed_specification = ['r', 'r', 'r', 'r', 'r'] + seed_population: list[Seed] + result: dict + + seed_population = grey_box_fuzzer.fuzz(seed_template, seed_specification, 20) + result = grey_box_runner.run(map_range, seed_population, 100) + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skiped!") +def test_hass_to_lox() -> None: + logger.info("Start of test_hass_to_lox() test.") + seed_template = ["FLOAT"] + seed_specification = ['r'] + seed_population: list[Seed] + result: dict + + seed_population = grey_box_fuzzer.fuzz(seed_template, seed_specification, 20) + result = grey_box_runner.run(hass_to_lox, seed_population, 100) + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skiped!") +def test_lox_to_hass() -> None: + logger.info("Start of test_lox_to_hass() test.") + seed_template = ["FLOAT"] + seed_specification = ['r'] + seed_population: list[Seed] + result: dict + + seed_population = grey_box_fuzzer.fuzz(seed_template, seed_specification, 20) + result = grey_box_runner.run(lox_to_hass, seed_population, 100) + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skiped!") +def test_lox2lox_mapped() -> None: + logger.info("Start of test_lox2lox_mapped() test.") + seed_template = ["FLOAT", "FLOAT", "FLOAT"] + seed_specification = ['r', 'r', 'r'] + seed_population: list[Seed] + result: dict + + seed_population = grey_box_fuzzer.fuzz(seed_template, seed_specification, 20) + result = grey_box_runner.run(lox2lox_mapped, seed_population, 100) + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skiped!") +def test_lox2hass_mapped() -> None: + logger.info("Start of test_lox2hass_mapped() test.") + seed_template = ["FLOAT", "FLOAT", "FLOAT"] + seed_specification = ['r', 'r', 'r'] + seed_population: list[Seed] + result: dict + + seed_population = grey_box_fuzzer.fuzz(seed_template, seed_specification, 20) + result = grey_box_runner.run(lox2hass_mapped, seed_population, 100) + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skiped!") +def test_to_hass_color_temp() -> None: + logger.info("Start of test_to_hass_color_temp() test.") + seed_template = ["FLOAT"] + seed_specification = ['r'] + seed_population: list[Seed] + result: dict + + seed_population = grey_box_fuzzer.fuzz(seed_template, seed_specification, 20) + result = grey_box_runner.run(to_hass_color_temp, seed_population, 100) + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skiped!") +def test_to_loxone_color_temp() -> None: + logger.info("Start of test_to_loxone_color_temp() test.") + seed_template = ["FLOAT"] + seed_specification = ['r'] + seed_population: list[Seed] + result: dict + + seed_population = grey_box_fuzzer.fuzz(seed_template, seed_specification, 20) + result = grey_box_runner.run(to_loxone_color_temp, seed_population, 100) + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skiped!") +def test_get_miniserver_type() -> None: + logger.info("Start of test_get_miniserver_type() test.") + seed_template = ["INT"] + seed_specification = ['r'] + seed_population: list[Seed] + result: dict + + seed_population = grey_box_fuzzer.fuzz(seed_template, seed_specification, 20) + result = grey_box_runner.run(get_miniserver_type, seed_population, 100) + + assert result["failed_tests"] == 0 diff --git a/custom_components/test/fuzzing/mutation/test_mut_on_helpers.py b/custom_components/test/fuzzing/mutation/test_mut_on_helpers.py new file mode 100644 index 00000000..14bf9056 --- /dev/null +++ b/custom_components/test/fuzzing/mutation/test_mut_on_helpers.py @@ -0,0 +1,365 @@ +import pytest +import logging +import random +import json + +from custom_components.loxone.helpers import ( + map_range, + hass_to_lox, + lox_to_hass, + lox2lox_mapped, + lox2hass_mapped, + to_hass_color_temp, + to_loxone_color_temp, + get_room_name_from_room_uuid, + get_cat_name_from_cat_uuid, + add_room_and_cat_to_value_values, + get_miniserver_type, + get_all, +) +from custom_components.test.fuzzing.fuzzer_utils.MutationalFuzzer import ( + MutationalBlackBoxFuzzer, +) +from custom_components.test.fuzzing.fuzzer_utils.GrammarFuzzer import GrammarFuzzer +from custom_components.test.fuzzing.fuzzer_utils.ValuePoolFuzzer import ValuePoolFuzzer +from custom_components.test.fuzzing.fuzzer_utils.grammar_pool import ( + grammar_controls_json, + grammar_loxconfig_rooms_cats_json, +) +from custom_components.test.fuzzing.fuzzer_utils.ParamRunner import ParamRunner + + +logger = logging.getLogger(__name__) + +mutational_fuzzer: MutationalBlackBoxFuzzer = MutationalBlackBoxFuzzer() +grammar_fuzzer: GrammarFuzzer = GrammarFuzzer() +value_pool_fuzzer: ValuePoolFuzzer = ValuePoolFuzzer() +param_runner: ParamRunner = ParamRunner() + + +@pytest.mark.skipif(False, reason="Not skipped!") +def test_map_range() -> None: + logger.info("Start of test_map_range() test.") + param_set: list[list] + result: dict + + param_set = mutational_fuzzer.fuzz([0.0, 0.0, 0.0, 0.0, 0.0], 100000) + result = param_runner.run(map_range, param_set) + + if result["failed_tests"] != 0: + param_set = mutational_fuzzer.fuzz_failed(result, 20) + result = param_runner.run(map_range, param_set) + + logger.info("test_map_range() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +def test_hass_to_lox() -> None: + logger.info("Start of test_hass_to_lox() test.") + param_set: list[list] + result: dict + + param_set = mutational_fuzzer.fuzz([0.0], 100000) + result = param_runner.run(hass_to_lox, param_set) + + if result["failed_tests"] != 0: + param_set = mutational_fuzzer.fuzz_failed(result, 20) + result = param_runner.run(hass_to_lox, param_set) + + logger.info("test_hass_to_lox() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +def test_lox_to_hass() -> None: + logger.info("Start of test_lox_to_hass() test.") + param_set: list[list] + result: dict + + param_set = mutational_fuzzer.fuzz([0.0], 100000) + result = param_runner.run(lox_to_hass, param_set) + + if result["failed_tests"] != 0: + param_set = mutational_fuzzer.fuzz_failed(result, 20) + result = param_runner.run(lox_to_hass, param_set) + + logger.info("test_lox_to_hass() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +def test_lox2lox_mapped() -> None: + logger.info("Start of test_lox2lox_mapped() test.") + param_set: list[list] + result: dict + + param_set = mutational_fuzzer.fuzz([0.0, 0.0, 0.0], 100000) + result = param_runner.run(lox2lox_mapped, param_set) + + if result["failed_tests"] != 0: + param_set = mutational_fuzzer.fuzz_failed(result, 20) + result = param_runner.run(lox2lox_mapped, param_set) + + logger.info("test_lox2lox_mapped() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +def test_lox2hass_mapped() -> None: + logger.info("Start of test_lox2hass_mapped() test.") + param_set: list[list] + result: dict + + param_set = mutational_fuzzer.fuzz([0.0, 0.0, 0.0], 100000) + result = param_runner.run(lox2hass_mapped, param_set) + + if result["failed_tests"] != 0: + param_set = mutational_fuzzer.fuzz_failed(result, 20) + result = param_runner.run(lox2hass_mapped, param_set) + + logger.info("test_lox2hass_mapped() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +def test_to_hass_color_temp() -> None: + logger.info("Start of test_to_hass_color_temp() test.") + param_set: list[list] + result: dict + + param_set = mutational_fuzzer.fuzz([0.0], 100000) + result = param_runner.run(to_hass_color_temp, param_set) + + if result["failed_tests"] != 0: + param_set = mutational_fuzzer.fuzz_failed(result, 20) + result = param_runner.run(to_hass_color_temp, param_set) + + logger.info("test_to_hass_color_temp() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +def test_to_loxone_color_temp() -> None: + logger.info("Start of test_to_loxone_color_temp() test.") + param_set: list[list] + result: dict + + param_set = mutational_fuzzer.fuzz([0.0], 100000) + result = param_runner.run(to_loxone_color_temp, param_set) + + if result["failed_tests"] != 0: + param_set = mutational_fuzzer.fuzz_failed(result, 20) + result = param_runner.run(to_loxone_color_temp, param_set) + + logger.info("test_to_loxone_color_temp() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +def test_get_miniserver_type() -> None: + logger.info("Start of test_get_miniserver_type() test.") + param_set: list[list] + result: dict + + param_set = mutational_fuzzer.fuzz([0], 100000) + result = param_runner.run(get_miniserver_type, param_set) + + if result["failed_tests"] != 0: + param_set = mutational_fuzzer.fuzz_failed(result, 20) + result = param_runner.run(get_miniserver_type, param_set) + + logger.info("test_get_miniserver_type() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +@pytest.mark.timeout(300) +def test_get_room_name_from_room_uuid() -> None: + logger.info("Start of test_get_room_name_from_room_uuid() test.") + param_set: list[list] + result: dict + + # get a list of valid grammar outputs + full_grammar_cov: list[str] + full_grammar_cov = grammar_fuzzer.fuzz_grammar_coverage( + grammar_loxconfig_rooms_cats_json, "" + ) + + # choose randomly on grammar string + random_valid_grammar_string: str + random_valid_grammar_string = random.choice(full_grammar_cov) + + # mutate seed + param_set = mutational_fuzzer.fuzz([random_valid_grammar_string, "a"], 100) + + # function under test needs a json object + for set in param_set: + # is param after mutation still a valid json? + try: + # Yes -> load as json + set[0] = json.loads(set[0]) + except: + # No -> use random default value from grammar + logger.debug( + f"{set[0]} is not longer a valid json, replaced it with value from grammar." + ) + set[0] = json.loads(random.choice(full_grammar_cov)) + + result = param_runner.run(get_room_name_from_room_uuid, param_set) + + logger.info("test_get_room_name_from_room_uuid() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +@pytest.mark.timeout(300) +def test_get_cat_name_from_cat_uuid() -> None: + logger.info("Start of test_get_cat_name_from_cat_uuid() test.") + param_set: list[list] + result: dict + + # get a list of valid grammar outputs + full_grammar_cov: list[str] + full_grammar_cov = grammar_fuzzer.fuzz_grammar_coverage( + grammar_loxconfig_rooms_cats_json, "" + ) + + # choose randomly on grammar string + random_valid_grammar_string: str + random_valid_grammar_string = random.choice(full_grammar_cov) + + # mutate seed + param_set = mutational_fuzzer.fuzz([random_valid_grammar_string, "a"], 100) + + # function under test needs a json object + for set in param_set: + # is param after mutation still a valid json? + try: + # Yes -> load as json + set[0] = json.loads(set[0]) + except: + # No -> use random default value from grammar + logger.debug( + f"{set[0]} is not longer a valid json, replaced it with value from grammar." + ) + set[0] = json.loads(random.choice(full_grammar_cov)) + + result = param_runner.run(get_cat_name_from_cat_uuid, param_set) + + logger.info("test_get_cat_name_from_cat_uuid() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +@pytest.mark.timeout(300) +def test_add_room_and_cat_to_value_values() -> None: + logger.info("Start of test_add_room_and_cat_to_value_values() test.") + param_set: list[list] + result: dict + + # get a list of valid grammar outputs + full_grammar_cov: list[str] + full_grammar_cov = grammar_fuzzer.fuzz_grammar_coverage( + grammar_loxconfig_rooms_cats_json, "" + ) + + # choose randomly on grammar string + random_valid_grammar_string: str + random_valid_grammar_string = random.choice(full_grammar_cov) + + random_valid_grammar_string_2: list[list] = value_pool_fuzzer.fuzz(["DICT"], 1) + random_valid_grammar_string_2: list = random.choice(random_valid_grammar_string_2) + random_valid_grammar_string_2: dict = random_valid_grammar_string_2[0] + random_valid_grammar_string_2: str = str(random_valid_grammar_string_2) + + # mutate seed + param_set = mutational_fuzzer.fuzz( + [random_valid_grammar_string, random_valid_grammar_string_2], 100 + ) + + # function under test needs a json object + for set in param_set: + # is param after mutation still a valid json? + try: + # Yes -> load as json + set[0] = json.loads(set[0]) + except: + # No -> use random default value from grammar + logger.debug( + f"{set[0]} is not longer a valid json, replaced it with value from grammar." + ) + set[0] = json.loads(random.choice(full_grammar_cov)) + + try: + # Yes -> load as json + logger.debug(f"{set[1]} is type of {type(set[1]).__name__}.") + if set[1].isdigit(): + logger.debug(f"{set[1]} is not a valid JSON and type of {type(set[1]).__name__}.") + set[1] = json.loads('{"set[1]":123') + else: + set[1] = json.loads(set[1]) + logger.debug(f"{set[1]} is a valid JSON and type of {type(set[1]).__name__}.") + except: + # No -> use default value from value pool + logger.debug( + f"{set[1]} is not longer a valid json, replaced it with {random_valid_grammar_string_2}." + ) + set[1] = json.loads(random_valid_grammar_string_2) + + logger.debug(f"{set[0]} and {set[1]} is a param_set.") + + result = param_runner.run(add_room_and_cat_to_value_values, param_set) + + logger.info("test_add_room_and_cat_to_value_values() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +def test_get_all() -> None: + logger.info("Start of test_get_all() test.") + param_set: list[list] + result: dict + + # get a list of valid grammar outputs + full_grammar_cov: list[str] + full_grammar_cov = grammar_fuzzer.fuzz_grammar_coverage( + grammar_controls_json, "" + ) + + # choose randomly on grammar string + random_valid_grammar_string: str + random_valid_grammar_string = random.choice(full_grammar_cov) + + # mutate seed + param_set = mutational_fuzzer.fuzz([random_valid_grammar_string, "a"], 100) + + # function under test needs a json object + for set in param_set: + # is param after mutation still a valid json? + try: + # Yes -> load as json + set[0] = json.loads(set[0]) + except: + # No -> use random default value from grammar + logger.debug( + f"{set[0]} is not longer a valid json, replaced it with value from grammar." + ) + set[0] = json.loads(random.choice(full_grammar_cov)) + + result = param_runner.run(get_all, param_set) + + logger.info("test_get_all() test finished.") + + assert result["failed_tests"] == 0 diff --git a/custom_components/test/fuzzing/readme.md b/custom_components/test/fuzzing/readme.md new file mode 100644 index 00000000..ff0a32fa --- /dev/null +++ b/custom_components/test/fuzzing/readme.md @@ -0,0 +1,76 @@ +# Fuzzing +## What is Fuzzing +Fuzzing, or fuzz testing, is a software testing technique used to find vulnerabilities and bugs by inputting large amounts of random data, called "fuzz," into a program. +The goal is to induce unexpected behavior, crashes, or memory leaks, thereby revealing security issues and flaws that might not be detected through traditional testing methods. +By systematically feeding malformed or semi-random data to the software, fuzzing helps developers identify and fix critical vulnerabilities, enhancing the overall robustness and security of the application. + +## Why are we fuzzing? +We are 5 students, and we have to fuzz an open source project for a grade bonus. +So here we are! + +## Start to fuzz +### Setup +1. create a virtual environment: + +You can create the virtual environment in the project's root directory (recommended) or any other directory of your choice. +```shell +python -m venv venv +``` +2. activate the virtual environment: + +Windows +```shell +.\venv\Scripts\activate +``` + +Linux and macOS +```shell +source venv/bin/activate +``` + +3. install the following packages: +```shell +pip install pytest +pip install pytest-timeout +pip install homeassistant +pip install numpy +pip install coverage +``` +4. you maybe have to tell python were to find the `PyLoxone` project + +Windows +- Go to the Windows menu and search for "Environment Variables". +- Select “Advanced system settings”. +- In the “System Properties” window, click the “Environment Variables” button. +- Click the “New” button in the top half of the dialog, to make a new user variable. +- Name the variable `PYTHONPATH` and set its value to the path of your code directory. Click "OK" and "OK" again to save. + +Linux and macOS +```shell +export PYTHONPATH=$PYTHONPATH:/path/to/PyLoxone +``` +### Run +- start the execution in the root of the repo +```shell +cd /path/to/PyLoxone +``` +- run `pytest` +```shell +pytest +``` +- if you only want to run a single test file, you can enter the path to the file: +```shell +pytest custom_components/test/path/to/test_file.py +``` + +## Fuzzer layout (UML) +![fuzzer_overview](fuzzer_overview.svg) + +# Vulnerabilities found +## `helpers.py` +### `map_range()` +- A possible 0 division is not checked or intercepted. +- If `in_max` and `in_min` are equal. + +### `get_all()` +- If the key `controls` or `type` is not in the `json_data: dict` the function crashes. \ No newline at end of file diff --git a/custom_components/test/fuzzing/value_pools/test_vp_on_helpers.py b/custom_components/test/fuzzing/value_pools/test_vp_on_helpers.py new file mode 100644 index 00000000..2b5bc0f1 --- /dev/null +++ b/custom_components/test/fuzzing/value_pools/test_vp_on_helpers.py @@ -0,0 +1,160 @@ +import pytest +import logging +import json + +from custom_components.loxone.helpers import ( + map_range, + hass_to_lox, + lox_to_hass, + lox2lox_mapped, + lox2hass_mapped, + to_hass_color_temp, + to_loxone_color_temp, + get_room_name_from_room_uuid, + get_cat_name_from_cat_uuid, + add_room_and_cat_to_value_values, + get_miniserver_type, + get_all, +) +from custom_components.test.fuzzing.fuzzer_utils.ValuePoolFuzzer import ValuePoolFuzzer +from custom_components.test.fuzzing.fuzzer_utils.ParamRunner import ParamRunner + +logger = logging.getLogger(__name__) + +value_pool_fuzzer = ValuePoolFuzzer() +param_runner = ParamRunner() + + +@pytest.mark.skipif(False, reason="Not skipped!") +def test_map_range() -> None: + logger.info("Start of map_range() test.") + param_set = value_pool_fuzzer.fuzz(["FLOAT", "FLOAT", "FLOAT", "FLOAT", "FLOAT"], 3) + param_set = param_runner.limit_param_set(param_set, 50000) + result = param_runner.run(map_range, param_set) + logger.info("map_range() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +@pytest.mark.timeout(300) +def test_hass_to_lox() -> None: + logger.info("Start of hass_to_lox() test.") + param_set = value_pool_fuzzer.fuzz(["FLOAT"], 1) + result = param_runner.run(hass_to_lox, param_set) + logger.info("hass_to_lox() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +@pytest.mark.timeout(300) +def test_lox_to_hass() -> None: + logger.info("Start of lox_to_hass() test.") + param_set = value_pool_fuzzer.fuzz(["FLOAT"], 1) + result = param_runner.run(lox_to_hass, param_set) + logger.info("lox_to_hass() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +@pytest.mark.timeout(300) +def test_lox2lox_mapped() -> None: + logger.info("Start of lox2lox_mapped() test.") + param_set = value_pool_fuzzer.fuzz(["FLOAT", "FLOAT", "FLOAT"], 2) + result = param_runner.run(lox2lox_mapped, param_set) + logger.info("lox2lox_mapped() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +@pytest.mark.timeout(300) +def test_lox2hass_mapped() -> None: + logger.info("Start of lox2hass_mapped() test.") + param_set = value_pool_fuzzer.fuzz(["FLOAT", "FLOAT", "FLOAT"], 2) + result = param_runner.run(lox2hass_mapped, param_set) + logger.info("lox2hass_mapped() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +@pytest.mark.timeout(300) +def test_to_hass_color_temp() -> None: + logger.info("Start of to_hass_color_temp() test.") + param_set = value_pool_fuzzer.fuzz(["FLOAT"], 1) + result = param_runner.run(to_hass_color_temp, param_set) + logger.info("to_hass_color_temp() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +@pytest.mark.timeout(300) +def test_to_loxone_color_temp() -> None: + logger.info("Start of to_loxone_color_temp() test.") + param_set = value_pool_fuzzer.fuzz(["FLOAT"], 1) + result = param_runner.run(to_loxone_color_temp, param_set) + logger.info("to_loxone_color_temp() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +@pytest.mark.timeout(300) +def test_get_room_name_from_room_uuid() -> None: + logger.info("Start of get_room_name_from_room_uuid() test.") + param_set = value_pool_fuzzer.fuzz(["GRAMMAR_LOXCONFIG_ROOMS_CATS_JSON_COV", "STRING"], 2) + result = param_runner.run(get_room_name_from_room_uuid, param_set) + logger.info("get_room_name_from_room_uuid() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +@pytest.mark.timeout(300) +def test_get_cat_name_from_cat_uuid() -> None: + logger.info("Start of get_cat_name_from_cat_uuid() test.") + param_set = value_pool_fuzzer.fuzz(["GRAMMAR_LOXCONFIG_ROOMS_CATS_JSON_COV", "STRING"], 2) + result = param_runner.run(get_cat_name_from_cat_uuid, param_set) + logger.info("get_cat_name_from_cat_uuid() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +@pytest.mark.timeout(300) +def test_add_room_and_cat_to_value_values() -> None: + logger.info("Start of add_room_and_cat_to_value_values() test.") + param_set = value_pool_fuzzer.fuzz(["GRAMMAR_LOXCONFIG_ROOMS_CATS_JSON_COV", "DICT"], 2) + # function under test needs a json object + for set in param_set: + set[1] = json.loads(set[1]) + + result = param_runner.run(add_room_and_cat_to_value_values, param_set) + logger.info("add_room_and_cat_to_value_values() test finished.") + + assert result["failed_tests"] == 0 + + +@pytest.mark.skipif(False, reason="Not skipped!") +@pytest.mark.timeout(300) +def test_get_miniserver_type() -> None: + logger.info("Start of get_miniserver_type() test.") + param_set = value_pool_fuzzer.fuzz(["INT"], 1) + result = param_runner.run(get_miniserver_type, param_set) + logger.info("get_miniserver_type() test finished.") + + assert result["failed_tests"] == 0 + +@pytest.mark.skipif(False, reason="Not skipped!") +@pytest.mark.timeout(300) +def test_get_all() -> None: + logger.info("Start of get_all() test.") + param_set = value_pool_fuzzer.fuzz(["GRAMMAR_CONTROLS_JSON_COV", "STRING"], 2) + result = param_runner.run(get_all, param_set) + logger.info("get_all() test finished.") + + assert result["failed_tests"] == 0