|
1 | 1 | # # SPDX-License-Identifier: BSD-3-Clause
|
2 | 2 | # # Copyright 2024-2025 Intel Corporation
|
3 | 3 | # # Media Communications Mesh
|
| 4 | +import datetime |
4 | 5 | import logging
|
5 | 6 | import os
|
| 7 | +import shutil |
6 | 8 | import time
|
7 | 9 | from typing import Dict
|
8 | 10 |
|
9 | 11 | import pytest
|
10 | 12 | from common.nicctl import Nicctl
|
11 | 13 | from create_pcap_file.ramdisk import RamdiskPreparer
|
12 |
| -from mtl_engine.stash import clear_result_media, remove_result_media |
| 14 | +from mfd_common_libs.custom_logger import add_logging_level |
| 15 | +from mfd_common_libs.log_levels import TEST_FAIL, TEST_INFO, TEST_PASS |
| 16 | +from mtl_engine.const import LOG_FOLDER, TESTCMD_LVL |
| 17 | +from mtl_engine.csv_report import ( |
| 18 | + csv_add_test, |
| 19 | + csv_write_report, |
| 20 | + update_compliance_result, |
| 21 | +) |
| 22 | +from mtl_engine.stash import ( |
| 23 | + clear_issue, |
| 24 | + clear_result_log, |
| 25 | + clear_result_media, |
| 26 | + clear_result_note, |
| 27 | + get_issue, |
| 28 | + get_result_note, |
| 29 | + remove_result_media, |
| 30 | +) |
| 31 | +from pytest_mfd_logging.amber_log_formatter import AmberLogFormatter |
13 | 32 |
|
14 | 33 | logger = logging.getLogger(__name__)
|
15 | 34 | phase_report_key = pytest.StashKey[Dict[str, pytest.CollectReport]]()
|
@@ -88,32 +107,14 @@ def dma_port_list(request):
|
88 | 107 |
|
89 | 108 |
|
90 | 109 | @pytest.fixture(scope="session")
|
91 |
| -def nic_port_list(hosts: dict, mtl_path) -> list: |
| 110 | +def nic_port_list(hosts: dict, mtl_path) -> None: |
92 | 111 | for host in hosts.values():
|
93 |
| - # connection = host.connection |
94 |
| - # connection.enable_sudo() |
95 | 112 | nicctl = Nicctl(mtl_path, host)
|
96 | 113 | if int(host.network_interfaces[0].virtualization.get_current_vfs()) == 0:
|
97 | 114 | vfs = nicctl.create_vfs(host.network_interfaces[0].pci_address)
|
98 | 115 | vfs = nicctl.vfio_list()
|
99 | 116 | # Store VFs on the host object for later use
|
100 | 117 | host.vfs = vfs
|
101 |
| - # connection.disable_sudo() |
102 |
| - |
103 |
| - |
104 |
| -# def nic_port_list(request, media_config, hosts: dict = None) -> list: |
105 |
| -# vfs = [] |
106 |
| -# if hosts: |
107 |
| -# for host in hosts.values(): |
108 |
| -# if hasattr(host, "vfs"): |
109 |
| -# vfs.extend(host.vfs) |
110 |
| -# if vfs: |
111 |
| -# return vfs |
112 |
| -# # Fallback: use --nic parameter |
113 |
| -# nic_option = request.config.getoption("--nic") |
114 |
| -# if nic_option: |
115 |
| -# return [nic.strip() for nic in nic_option.split(",") if nic.strip()] |
116 |
| -# raise RuntimeError("No VFs found and --nic parameter not provided!") |
117 | 118 |
|
118 | 119 |
|
119 | 120 | @pytest.fixture(scope="session")
|
@@ -164,3 +165,94 @@ def pytest_addoption(parser):
|
164 | 165 | parser.addoption("--nic", help="list of PCI IDs of network devices")
|
165 | 166 | parser.addoption("--dma", help="list of PCI IDs of DMA devices")
|
166 | 167 | parser.addoption("--time", help="seconds to run every test (default=15)")
|
| 168 | + |
| 169 | + |
| 170 | +@pytest.fixture(scope="session", autouse=True) |
| 171 | +def log_session(): |
| 172 | + add_logging_level("TESTCMD", TESTCMD_LVL) |
| 173 | + |
| 174 | + today = datetime.datetime.today() |
| 175 | + folder = today.strftime("%Y-%m-%dT%H:%M:%S") |
| 176 | + path = os.path.join(LOG_FOLDER, folder) |
| 177 | + path_symlink = os.path.join(LOG_FOLDER, "latest") |
| 178 | + try: |
| 179 | + os.remove(path_symlink) |
| 180 | + except FileNotFoundError: |
| 181 | + pass |
| 182 | + os.makedirs(path, exist_ok=True) |
| 183 | + os.symlink(folder, path_symlink) |
| 184 | + yield |
| 185 | + shutil.copy("pytest.log", f"{LOG_FOLDER}/latest/pytest.log") |
| 186 | + csv_write_report(f"{LOG_FOLDER}/latest/report.csv") |
| 187 | + |
| 188 | + |
| 189 | +@pytest.fixture(scope="session", autouse=True) |
| 190 | +def compliance_report(request, log_session, test_config): |
| 191 | + """ |
| 192 | + This function is used for compliance check and report. |
| 193 | + """ |
| 194 | + # TODO: Implement compliance check logic. When tcpdump pcap is enabled, at the end of the test session all pcaps |
| 195 | + # shall be send into EBU list. |
| 196 | + # Pcaps shall be stored in the ramdisk, and then moved to the compliance |
| 197 | + # folder or send into EBU list after each test finished and remove it from the ramdisk. |
| 198 | + # Compliance report generation logic goes here after yield. Or in another class / function but triggered here. |
| 199 | + # AFAIK names of pcaps contains test name so it can be matched with result of each test like in code below. |
| 200 | + yield |
| 201 | + if test_config.get("compliance", False): |
| 202 | + logging.info("Compliance mode enabled, updating compliance results") |
| 203 | + for item in request.session.items: |
| 204 | + test_case = item.nodeid |
| 205 | + update_compliance_result(test_case, "Fail") |
| 206 | + |
| 207 | + |
| 208 | +@pytest.fixture(scope="function", autouse=True) |
| 209 | +def log_case(request, caplog: pytest.LogCaptureFixture): |
| 210 | + case_id = request.node.nodeid |
| 211 | + case_folder = os.path.dirname(case_id) |
| 212 | + os.makedirs(os.path.join(LOG_FOLDER, "latest", case_folder), exist_ok=True) |
| 213 | + logfile = os.path.join(LOG_FOLDER, "latest", f"{case_id}.log") |
| 214 | + fh = logging.FileHandler(logfile) |
| 215 | + formatter = request.session.config.pluginmanager.get_plugin( |
| 216 | + "logging-plugin" |
| 217 | + ).formatter |
| 218 | + format = AmberLogFormatter(formatter) |
| 219 | + fh.setFormatter(format) |
| 220 | + fh.setLevel(logging.DEBUG) |
| 221 | + logger = logging.getLogger() |
| 222 | + logger.addHandler(fh) |
| 223 | + clear_result_log() |
| 224 | + clear_issue() |
| 225 | + yield |
| 226 | + report = request.node.stash[phase_report_key] |
| 227 | + if report["setup"].failed: |
| 228 | + logging.log(level=TEST_FAIL, msg=f"Setup failed for {case_id}") |
| 229 | + os.chmod(logfile, 0o4755) |
| 230 | + result = "Fail" |
| 231 | + elif ("call" not in report) or report["call"].failed: |
| 232 | + logging.log(level=TEST_FAIL, msg=f"Test failed for {case_id}") |
| 233 | + os.chmod(logfile, 0o4755) |
| 234 | + result = "Fail" |
| 235 | + elif report["call"].passed: |
| 236 | + logging.log(level=TEST_PASS, msg=f"Test passed for {case_id}") |
| 237 | + os.chmod(logfile, 0o755) |
| 238 | + result = "Pass" |
| 239 | + else: |
| 240 | + logging.log(level=TEST_INFO, msg=f"Test skipped for {case_id}") |
| 241 | + result = "Skip" |
| 242 | + |
| 243 | + logger.removeHandler(fh) |
| 244 | + |
| 245 | + commands = [] |
| 246 | + for record in caplog.get_records("call"): |
| 247 | + if record.levelno == TESTCMD_LVL: |
| 248 | + commands.append(record.message) |
| 249 | + |
| 250 | + csv_add_test( |
| 251 | + test_case=case_id, |
| 252 | + commands="\n".join(commands), |
| 253 | + result=result, |
| 254 | + issue=get_issue(), |
| 255 | + result_note=get_result_note(), |
| 256 | + ) |
| 257 | + |
| 258 | + clear_result_note() |
0 commit comments