diff --git a/tests/validation/mtl_engine/GstreamerApp.py b/tests/validation/mtl_engine/GstreamerApp.py index 764304386..50bf5eb3c 100755 --- a/tests/validation/mtl_engine/GstreamerApp.py +++ b/tests/validation/mtl_engine/GstreamerApp.py @@ -5,6 +5,7 @@ import logging import os import time +import re from mtl_engine.RxTxApp import prepare_tcpdump @@ -294,6 +295,144 @@ def setup_gstreamer_st40p_rx_pipeline( return pipeline_command +def create_gstreamer_test_config( + test_type: str, + tx_host=None, + rx_host=None, + **kwargs +) -> dict: + """ + Create a standardized GStreamer test configuration following RxTxApp pattern. + + :param test_type: Type of test (st20p, st30, st40p) + :param tx_host: TX host object + :param rx_host: RX host object + :param kwargs: Additional configuration parameters + :return: Configuration dictionary + """ + base_config = { + "test_type": test_type, + "tx_host": tx_host, + "rx_host": rx_host, + "tx_dev_ip": "192.168.96.3", + "rx_dev_ip": "192.168.96.2", + "multicast_ip": "239.168.85.20", + "test_time": kwargs.get("test_time", 30), + "queues": kwargs.get("queues", 4), + "payload_type": kwargs.get("payload_type", 112), + } + + # Add test-type specific configuration + if test_type == "st20p": + base_config.update({ + "udp_port": 20000, + "width": kwargs.get("width", 1920), + "height": kwargs.get("height", 1080), + "framerate": kwargs.get("framerate", "p25"), + "format": kwargs.get("format", "YUV422PLANAR10LE"), + }) + elif test_type == "st30": + base_config.update({ + "udp_port": 30000, + "audio_format": kwargs.get("audio_format", "s16le"), + "channels": kwargs.get("channels", 2), + "sampling": kwargs.get("sampling", 48000), + }) + elif test_type == "st40p": + base_config.update({ + "udp_port": 40000, + "tx_framebuff_cnt": kwargs.get("tx_framebuff_cnt", 3), + "tx_fps": kwargs.get("tx_fps", 25), + "tx_did": kwargs.get("tx_did", 0x41), + "tx_sdid": kwargs.get("tx_sdid", 0x01), + "timeout": kwargs.get("timeout", 40000), + }) + + # Add any additional parameters from kwargs + for key, value in kwargs.items(): + if key not in base_config: + base_config[key] = value + + return base_config + + +def execute_gstreamer_test_from_config( + config: dict, + build: str, + input_file: str, + is_dual: bool = False +) -> bool: + """ + Execute GStreamer test from configuration, following RxTxApp pattern. + + :param config: Test configuration dictionary + :param build: Build path + :param input_file: Input file path + :param is_dual: Whether this is a dual-host test + :return: True if test passed, False otherwise + """ + test_type = config["test_type"] + + if is_dual: + if test_type == "st20p": + return execute_dual_st20p_test( + build=build, + tx_nic_port=config["tx_host"].vfs[0], + rx_nic_port=config["rx_host"].vfs[0], + input_path=input_file, + width=config["width"], + height=config["height"], + framerate=config["framerate"], + format=config["format"], + payload_type=config["payload_type"], + queues=config["queues"], + test_time=config["test_time"], + tx_host=config["tx_host"], + rx_host=config["rx_host"], + capture_cfg=config.get("capture_cfg"), + ) + elif test_type == "st30": + return execute_dual_st30_test( + build=build, + tx_nic_port=config["tx_host"].vfs[0], + rx_nic_port=config["rx_host"].vfs[0], + input_path=input_file, + payload_type=config["payload_type"], + queues=config["queues"], + audio_format=config["audio_format"], + channels=config["channels"], + sampling=config["sampling"], + test_time=config["test_time"], + tx_host=config["tx_host"], + rx_host=config["rx_host"], + capture_cfg=config.get("capture_cfg"), + ) + elif test_type == "st40p": + return execute_dual_st40_test( + build=build, + tx_nic_port=config["tx_host"].vfs[0], + rx_nic_port=config["rx_host"].vfs[0], + input_path=input_file, + payload_type=config["payload_type"], + queues=config["queues"], + tx_framebuff_cnt=config["tx_framebuff_cnt"], + tx_fps=config["tx_fps"], + tx_did=config["tx_did"], + tx_sdid=config["tx_sdid"], + timeout=config["timeout"], + test_time=config["test_time"], + tx_host=config["tx_host"], + rx_host=config["rx_host"], + capture_cfg=config.get("capture_cfg"), + ) + else: + # Single host test execution would go here + # Currently not implemented but could be added for consistency + raise NotImplementedError("Single host GStreamer tests not implemented in config pattern") + + return False + + def execute_test( build: str, tx_command: dict, @@ -516,3 +655,585 @@ def audio_format_change(file_format, rx_side: bool = False): return 16 else: return 24 + + +def create_dual_connection_params( + tx_dev_port: str, + rx_dev_port: str, + payload_type: str, + tx_dev_ip: str = "192.168.96.3", + rx_dev_ip: str = "192.168.96.2", + ip: str = "239.168.85.20", + udp_port: int = 20000, +) -> tuple: + """Create connection parameters for dual host test""" + tx_params = { + "dev-port": tx_dev_port, + "dev-ip": tx_dev_ip, + "ip": ip, + "udp-port": udp_port, + "payload-type": payload_type, + } + + rx_params = { + "dev-port": rx_dev_port, + "dev-ip": rx_dev_ip, + "ip": ip, + "udp-port": udp_port, + "payload-type": payload_type, + } + + return tx_params, rx_params + + +def _update_pipeline_for_dual_host( + command: list, + is_tx: bool, + tx_dev_ip: str = "192.168.96.3", + rx_dev_ip: str = "192.168.96.2", + multicast_ip: str = "239.168.85.20" +) -> list: + """ + Update a single-host pipeline command for dual-host networking. + + :param command: Original pipeline command list + :param is_tx: Whether this is a TX pipeline + :param tx_dev_ip: TX host device IP + :param rx_dev_ip: RX host device IP + :param multicast_ip: Multicast IP for communication + :return: Updated command list + """ + updated_command = command.copy() + + # Find and replace IP configuration for dual host setup + for i, arg in enumerate(updated_command): + if arg.startswith("dev-ip="): + if is_tx: + updated_command[i] = f"dev-ip={tx_dev_ip}" + else: + updated_command[i] = f"dev-ip={rx_dev_ip}" + elif arg.startswith("ip="): + # For dual host, use multicast IP + updated_command[i] = f"ip={multicast_ip}" + + return updated_command + + +def get_case_id() -> str: + """Get test case ID from environment""" + case_id = os.environ.get("PYTEST_CURRENT_TEST", "gstreamer_test") + # Extract the test function name and parameters + full_case = case_id[: case_id.rfind("(") - 1] if "(" in case_id else case_id + # Get the test name after the last :: + test_name = full_case.split("::")[-1] + return test_name + + +def sanitize_filename(name: str) -> str: + """Replace unsafe characters with underscores""" + import re + return re.sub(r"[^A-Za-z0-9_.-]", "_", name) + + +def log_to_file(message: str, host, build: str): + """Log message to a file on the remote host""" + import time + + test_name = sanitize_filename(get_case_id()) + timestamp = time.strftime("%Y%m%d_%H%M%S") + log_file = f"{build}/tests/{test_name}_{timestamp}_gstreamer.log" + + from .execute import run + + # Ensure parent directory exists + parent_dir = os.path.dirname(log_file) + run(f"mkdir -p {parent_dir}", host=host) + + # Append to file with timestamp + log_timestamp = time.strftime("%Y-%m-%d %H:%M:%S") + log_entry = f"[{log_timestamp}] {message}\n" + + try: + remote_conn = host.connection + f = remote_conn.path(log_file) + + if f.exists(): + current_content = f.read_text() + f.write_text(current_content + log_entry, encoding="utf-8") + else: + f.write_text(log_entry, encoding="utf-8") + except Exception as e: + logger.warning(f"Could not write to log file {log_file}: {e}") + + +def create_empty_output_files(output_suffix: str, number_of_files: int = 1, host=None, build: str = "") -> list: + """Create empty output files on remote host""" + output_files = [] + + # Create a timestamp for uniqueness + timestamp = time.strftime("%Y%m%d_%H%M%S") + test_name = sanitize_filename(get_case_id()) + + for i in range(number_of_files): + output_file = f"{build}/tests/{test_name}_{timestamp}_out_{i}.{output_suffix}" + output_files.append(output_file) + + try: + remote_conn = host.connection + f = remote_conn.path(output_file) + f.touch() + except Exception as e: + logger.warning(f"Could not create output file {output_file}: {e}") + + return output_files + + +def execute_dual_test( + build: str, + tx_command: list, + rx_command: list, + input_file: str, + output_file: str, + test_time: int = 30, + tx_host=None, + rx_host=None, + sleep_interval: int = 5, + tx_first: bool = True, + capture_cfg=None, +): + """ + Execute GStreamer dual test with separate TX and RX hosts. + + :param build: Build path on the remote hosts + :param tx_command: TX pipeline command list + :param rx_command: RX pipeline command list + :param input_file: Input file path on TX host + :param output_file: Output file path on RX host + :param test_time: Test duration in seconds + :param tx_host: TX host object with connection + :param rx_host: RX host object with connection + :param sleep_interval: Sleep interval between starting TX and RX + :param tx_first: Whether to start TX first + :param capture_cfg: Capture configuration for tcpdump + :return: True if test passed, False otherwise + """ + case_id = os.environ.get("PYTEST_CURRENT_TEST", "gstreamer_dual_test") + case_id = case_id[: case_id.rfind("(") - 1] if "(" in case_id else case_id + + logger.info(f"TX Host: {tx_host}") + logger.info(f"RX Host: {rx_host}") + logger.info(f"TX Command: {' '.join(tx_command)}") + logger.info(f"RX Command: {' '.join(rx_command)}") + + log_to_file(f"TX Host: {tx_host}", rx_host, build) + log_to_file(f"RX Host: {rx_host}", rx_host, build) + log_to_file(f"TX Command: {' '.join(tx_command)}", tx_host, build) + log_to_file(f"RX Command: {' '.join(rx_command)}", rx_host, build) + + tx_process = None + rx_process = None + # Use RX host for tcpdump capture + tcpdump = prepare_tcpdump(capture_cfg, rx_host) + + try: + if tx_first: + # Start TX pipeline first on TX host + logger.info("Starting TX pipeline on TX host...") + tx_process = run( + " ".join(tx_command), + cwd=build, + timeout=test_time + 60, + testcmd=True, + host=tx_host, + background=True, + enable_sudo=True, + ) + time.sleep(sleep_interval) + + # Start RX pipeline on RX host + logger.info("Starting RX pipeline on RX host...") + rx_process = run( + " ".join(rx_command), + cwd=build, + timeout=test_time + 60, + testcmd=True, + host=rx_host, + background=True, + enable_sudo=True, + ) + else: + # Start RX pipeline first on RX host + logger.info("Starting RX pipeline on RX host...") + rx_process = run( + " ".join(rx_command), + cwd=build, + timeout=test_time + 60, + testcmd=True, + host=rx_host, + background=True, + enable_sudo=True, + ) + time.sleep(sleep_interval) + + # Start TX pipeline on TX host + logger.info("Starting TX pipeline on TX host...") + tx_process = run( + " ".join(tx_command), + cwd=build, + timeout=test_time + 60, + testcmd=True, + host=tx_host, + background=True, + enable_sudo=True, + ) + + # Start tcpdump after pipelines are running + if tcpdump: + logger.info("Starting tcpdump capture...") + tcpdump.capture(capture_time=capture_cfg.get("capture_time", test_time)) + + # Let the test run for the specified duration + logger.info(f"Running test for {test_time} seconds...") + time.sleep(test_time) + + # Terminate processes gracefully + logger.info("Terminating processes...") + if tx_process: + try: + tx_process.terminate() + except Exception: + pass + if rx_process: + try: + rx_process.terminate() + except Exception: + pass + + # Wait a bit for termination + time.sleep(2) + + # Get output after processes have been terminated + try: + if rx_process and hasattr(rx_process, "stdout_text"): + output_rx = rx_process.stdout_text.splitlines() + for line in output_rx: + logger.info(f"RX Output: {line}") + log_to_file(f"RX Output:\n{rx_process.stdout_text}", rx_host, build) + except Exception: + logger.info("Could not retrieve RX output") + + try: + if tx_process and hasattr(tx_process, "stdout_text"): + output_tx = tx_process.stdout_text.splitlines() + for line in output_tx: + logger.info(f"TX Output: {line}") + log_to_file(f"TX Output:\n{tx_process.stdout_text}", tx_host, build) + except Exception: + logger.info("Could not retrieve TX output") + + except Exception as e: + log_fail(f"Error during dual test execution: {e}") + raise + finally: + # Ensure processes are terminated + if tx_process: + try: + tx_process.terminate() + tx_process.wait(timeout=10) + except Exception: + pass + if rx_process: + try: + rx_process.terminate() + rx_process.wait(timeout=10) + except Exception: + pass + if tcpdump: + tcpdump.stop() + + # Compare files for validation + file_compare = compare_dual_files(input_file, output_file, tx_host, rx_host) + logger.info(f"File comparison: {file_compare}") + + return file_compare + + +def compare_dual_files(input_file: str, output_file: str, tx_host, rx_host) -> bool: + """Compare input file on TX host with output file on RX host""" + try: + # Get input file hash from TX host + from .execute import run + + input_md5_proc = run(f"md5sum {input_file}", host=tx_host) + if input_md5_proc.return_code != 0: + log_fail(f"Could not get MD5 of input file: {input_file}") + return False + input_hash = input_md5_proc.stdout_text.split()[0] + + # Get input file size from TX host + input_stat_proc = run(f"stat -c '%s' {input_file}", host=tx_host) + if input_stat_proc.return_code != 0: + log_fail(f"Could not get size of input file: {input_file}") + return False + input_size = int(input_stat_proc.stdout_text.strip()) + + # Get output file hash from RX host + output_md5_proc = run(f"md5sum {output_file}", host=rx_host) + if output_md5_proc.return_code != 0: + log_fail(f"Could not get MD5 of output file: {output_file}") + return False + output_hash = output_md5_proc.stdout_text.split()[0] + + # Get output file size from RX host + output_stat_proc = run(f"stat -c '%s' {output_file}", host=rx_host) + if output_stat_proc.return_code != 0: + log_fail(f"Could not get size of output file: {output_file}") + return False + output_size = int(output_stat_proc.stdout_text.strip()) + + logger.info(f"Input file size: {input_size}") + logger.info(f"Output file size: {output_size}") + logger.info(f"Input file hash: {input_hash}") + logger.info(f"Output file hash: {output_hash}") + + if input_size != output_size: + log_fail("File sizes are different") + return False + + if input_hash != output_hash: + log_fail("File hashes are different") + return False + + return True + + except Exception as e: + log_fail(f"Error comparing files: {e}") + return False + + +def execute_dual_st20p_test( + build: str, + tx_nic_port: str, + rx_nic_port: str, + input_path: str, + width: int, + height: int, + framerate: str, + format: str, + payload_type: int, + queues: int = 1, + test_time: int = 30, + tx_host=None, + rx_host=None, + tx_framebuff_num: int = None, + rx_framebuff_num: int = None, + tx_fps: int = None, + rx_fps: int = None, + capture_cfg=None, +): + """ + Execute ST20P dual test with GStreamer TX and RX on separate hosts. + """ + # Create output file on RX host + output_files = create_empty_output_files("yuv", 1, rx_host, build) + output_path = output_files[0] + + # Setup TX pipeline using single host function + tx_command = setup_gstreamer_st20p_tx_pipeline( + build=build, + nic_port_list=tx_nic_port, + input_path=input_path, + width=width, + height=height, + framerate=framerate, + format=format, + tx_payload_type=payload_type, + tx_queues=queues, + tx_framebuff_num=tx_framebuff_num, + tx_fps=tx_fps, + ) + + # Setup RX pipeline using single host function + rx_command = setup_gstreamer_st20p_rx_pipeline( + build=build, + nic_port_list=rx_nic_port, + output_path=output_path, + width=width, + height=height, + framerate=framerate, + format=format, + rx_payload_type=payload_type, + rx_queues=queues, + rx_framebuff_num=rx_framebuff_num, + rx_fps=rx_fps, + ) + + # Update commands for dual host networking + tx_command = _update_pipeline_for_dual_host(tx_command, is_tx=True) + rx_command = _update_pipeline_for_dual_host(rx_command, is_tx=False) + + # Execute dual test + result = execute_dual_test( + build=build, + tx_command=tx_command, + rx_command=rx_command, + input_file=input_path, + output_file=output_path, + test_time=test_time, + tx_host=tx_host, + rx_host=rx_host, + capture_cfg=capture_cfg, + ) + + # Clean up output files + try: + from .execute import run + run(f"rm -f {output_path}", host=rx_host) + logger.info(f"Removed output file: {output_path}") + except Exception as e: + logger.info(f"Could not remove output file: {e}") + + return result + + +def execute_dual_st30_test( + build: str, + tx_nic_port: str, + rx_nic_port: str, + input_path: str, + payload_type: int, + queues: int = 1, + audio_format: str = "s16le", + channels: int = 2, + sampling: int = 48000, + test_time: int = 30, + tx_host=None, + rx_host=None, + capture_cfg=None, +): + # Create output file on RX host + output_files = create_empty_output_files("pcm", 1, rx_host, build) + output_path = output_files[0] + + # Setup TX pipeline using single host function + tx_command = setup_gstreamer_st30_tx_pipeline( + build=build, + nic_port_list=tx_nic_port, + input_path=input_path, + tx_payload_type=payload_type, + tx_queues=queues, + audio_format=audio_format, + channels=channels, + sampling=sampling, + ) + + # Setup RX pipeline using single host function + rx_command = setup_gstreamer_st30_rx_pipeline( + build=build, + nic_port_list=rx_nic_port, + output_path=output_path, + rx_payload_type=payload_type, + rx_queues=queues, + rx_audio_format=audio_format_change(audio_format, rx_side=True), + rx_channels=channels, + rx_sampling=sampling, + ) + + # Update commands for dual host networking + tx_command = _update_pipeline_for_dual_host(tx_command, is_tx=True) + rx_command = _update_pipeline_for_dual_host(rx_command, is_tx=False) + + # Execute dual test + result = execute_dual_test( + build=build, + tx_command=tx_command, + rx_command=rx_command, + input_file=input_path, + output_file=output_path, + test_time=test_time, + tx_host=tx_host, + rx_host=rx_host, + capture_cfg=capture_cfg, + ) + + # Clean up output files + try: + from .execute import run + run(f"rm -f {output_path}", host=rx_host) + logger.info(f"Removed output file: {output_path}") + except Exception as e: + logger.info(f"Could not remove output file: {e}") + + return result + + +def execute_dual_st40_test( + build: str, + tx_nic_port: str, + rx_nic_port: str, + input_path: str, + payload_type: int, + queues: int = 1, + tx_framebuff_cnt: int = 3, + tx_fps: int = 25, + tx_did: int = 0x41, + tx_sdid: int = 0x01, + timeout: int = 40000, + test_time: int = 30, + tx_host=None, + rx_host=None, + capture_cfg=None, +): + # Create output file on RX host + output_files = create_empty_output_files("anc", 1, rx_host, build) + output_path = output_files[0] + + # Setup TX pipeline using single host function + tx_command = setup_gstreamer_st40p_tx_pipeline( + build=build, + nic_port_list=tx_nic_port, + input_path=input_path, + tx_payload_type=payload_type, + tx_queues=queues, + tx_framebuff_cnt=tx_framebuff_cnt, + tx_fps=tx_fps, + tx_did=tx_did, + tx_sdid=tx_sdid, + ) + + # Setup RX pipeline using single host function + rx_command = setup_gstreamer_st40p_rx_pipeline( + build=build, + nic_port_list=rx_nic_port, + output_path=output_path, + rx_payload_type=payload_type, + rx_queues=queues, + timeout=timeout, + ) + + # Update commands for dual host networking + tx_command = _update_pipeline_for_dual_host(tx_command, is_tx=True) + rx_command = _update_pipeline_for_dual_host(rx_command, is_tx=False) + + # Execute dual test + result = execute_dual_test( + build=build, + tx_command=tx_command, + rx_command=rx_command, + input_file=input_path, + output_file=output_path, + test_time=test_time, + tx_host=tx_host, + rx_host=rx_host, + capture_cfg=capture_cfg, + ) + + # Clean up output files + try: + from .execute import run + run(f"rm -f {output_path}", host=rx_host) + logger.info(f"Removed output file: {output_path}") + except Exception as e: + logger.info(f"Could not remove output file: {e}") + + return result diff --git a/tests/validation/mtl_engine/RxTxApp.py b/tests/validation/mtl_engine/RxTxApp.py index 8524aabe2..423ebf7fc 100755 --- a/tests/validation/mtl_engine/RxTxApp.py +++ b/tests/validation/mtl_engine/RxTxApp.py @@ -1482,7 +1482,7 @@ def add_dual_interfaces( tx_config["interfaces"][0]["name"] = tx_nic_port_list[0] # Configure RX host interface only - rx_config["interfaces"][0]["name"] = rx_nic_port_list[1] + rx_config["interfaces"][0]["name"] = rx_nic_port_list[0] if test_mode == "unicast": tx_config["interfaces"][0]["ip"] = unicast_ip_dict["tx_interfaces"] @@ -1673,20 +1673,27 @@ def execute_dual_test( tx_config = config["tx_config"] rx_config = config["rx_config"] + tx_config_json = json.dumps(tx_config, indent=4) + rx_config_json = json.dumps(rx_config, indent=4) + # Log test start logger.info(f"Starting dual RxTxApp test: {get_case_id()}") log_to_file(f"Starting dual RxTxApp test: {get_case_id()}", tx_host, build) log_to_file(f"Starting dual RxTxApp test: {get_case_id()}", rx_host, build) - log_to_file(f"TX config: {json.dumps(tx_config, indent=4)}", tx_host, build) - log_to_file(f"RX config: {json.dumps(rx_config, indent=4)}", rx_host, build) + log_to_file(f"TX config: {tx_config_json}", tx_host, build) + log_to_file(f"RX config: {rx_config_json}", rx_host, build) # Prepare TX config + tx_config_file = f"{build}/tests/tx_config.json" tx_f = tx_host.connection.path(build, "tests", "tx_config.json") - tx_f.write_text(tx_config, encoding="utf-8") + tx_json_content = tx_config_json.replace('"', '\\"') + tx_f.write_text(tx_json_content) # Prepare RX config - rx_f = tx_host.connection.path(build, "tests", "rx_config.json") - rx_f.write_text(rx_config, encoding="utf-8") + rx_config_file = f"{build}/tests/rx_config.json" + rx_f = rx_host.connection.path(build, "tests", "rx_config.json") + rx_json_content = rx_config_json.replace('"', '\\"') + rx_f.write_text(rx_json_content) # Adjust test_time for high-res/fps/replicas if ( @@ -1710,8 +1717,8 @@ def execute_dual_test( if ptp: base_command += " --ptp" - tx_command = f"{base_command} --config_file {tx_config}" - rx_command = f"{base_command} --config_file {rx_config}" + tx_command = f"{base_command} --config_file {tx_config_file}" + rx_command = f"{base_command} --config_file {rx_config_file}" logger.info(f"TX Command: {tx_command}") logger.info(f"RX Command: {rx_command}") diff --git a/tests/validation/mtl_engine/ffmpeg_app.py b/tests/validation/mtl_engine/ffmpeg_app.py index 9697fd134..7d1b5d7e1 100755 --- a/tests/validation/mtl_engine/ffmpeg_app.py +++ b/tests/validation/mtl_engine/ffmpeg_app.py @@ -113,17 +113,21 @@ def execute_test( if not multiple_sessions: output_files = create_empty_output_files(output_format, 1, host, build) rx_cmd = ( - f"ffmpeg -p_port {nic_port_list[0]} -p_sip {ip_dict['rx_interfaces']} " - f"-p_rx_ip {ip_dict['rx_sessions']} -udp_port 20000 -payload_type 112 " - f"-fps {fps} -pix_fmt yuv422p10le -video_size {video_size} " - f"-f mtl_st20p -i k {ffmpeg_rx_f_flag} {output_files[0]} -y" + f"ffmpeg -p_port {nic_port_list[0]} " + f"-p_sip {ip_dict['rx_interfaces']} " + f"-p_rx_ip {ip_dict['rx_sessions']} -udp_port 20000 " + f"-payload_type 112 -fps {fps} -pix_fmt yuv422p10le " + f"-video_size {video_size} -f mtl_st20p -i k " + f"{ffmpeg_rx_f_flag} {output_files[0]} -y" ) if tx_is_ffmpeg: tx_cmd = ( - f"ffmpeg -video_size {video_size} -f rawvideo -pix_fmt yuv422p10le " - f"-i {video_url} -filter:v fps={fps} -p_port {nic_port_list[1]} " - f"-p_sip {ip_dict['tx_interfaces']} -p_tx_ip {ip_dict['tx_sessions']} " - f"-udp_port 20000 -payload_type 112 -f mtl_st20p -" + f"ffmpeg -video_size {video_size} -f rawvideo " + f"-pix_fmt yuv422p10le -i {video_url} " + f"-filter:v fps={fps} -p_port {nic_port_list[1]} " + f"-p_sip {ip_dict['tx_interfaces']} " + f"-p_tx_ip {ip_dict['tx_sessions']} -udp_port 20000 " + f"-payload_type 112 -f mtl_st20p -" ) else: # tx is rxtxapp tx_config_file = generate_rxtxapp_tx_config( @@ -134,21 +138,25 @@ def execute_test( output_files = create_empty_output_files(output_format, 2, host, build) rx_cmd = ( f"ffmpeg -p_sip {ip_dict['rx_interfaces']} " - f"-p_port {nic_port_list[0]} -p_rx_ip {ip_dict['rx_sessions']} " - f"-udp_port 20000 -payload_type 112 -fps {fps} -pix_fmt yuv422p10le " + f"-p_port {nic_port_list[0]} " + f"-p_rx_ip {ip_dict['rx_sessions']} -udp_port 20000 " + f"-payload_type 112 -fps {fps} -pix_fmt yuv422p10le " f"-video_size {video_size} -f mtl_st20p -i 1 " - f"-p_port {nic_port_list[0]} -p_rx_ip {ip_dict['rx_sessions']} " - f"-udp_port 20002 -payload_type 112 -fps {fps} -pix_fmt yuv422p10le " + f"-p_port {nic_port_list[0]} " + f"-p_rx_ip {ip_dict['rx_sessions']} -udp_port 20002 " + f"-payload_type 112 -fps {fps} -pix_fmt yuv422p10le " f"-video_size {video_size} -f mtl_st20p -i 2 " f"-map 0:0 {ffmpeg_rx_f_flag} {output_files[0]} -y " f"-map 1:0 {ffmpeg_rx_f_flag} {output_files[1]} -y" ) if tx_is_ffmpeg: tx_cmd = ( - f"ffmpeg -video_size {video_size} -f rawvideo -pix_fmt yuv422p10le " - f"-i {video_url} -filter:v fps={fps} -p_port {nic_port_list[1]} " - f"-p_sip {ip_dict['tx_interfaces']} -p_tx_ip {ip_dict['tx_sessions']} " - f"-udp_port 20000 -payload_type 112 -f mtl_st20p -" + f"ffmpeg -video_size {video_size} -f rawvideo " + f"-pix_fmt yuv422p10le -i {video_url} " + f"-filter:v fps={fps} -p_port {nic_port_list[1]} " + f"-p_sip {ip_dict['tx_interfaces']} " + f"-p_tx_ip {ip_dict['tx_sessions']} -udp_port 20000 " + f"-payload_type 112 -f mtl_st20p -" ) else: # tx is rxtxapp tx_config_file = generate_rxtxapp_tx_config( @@ -993,3 +1001,571 @@ def decode_video_format_to_st20p(video_format: str) -> tuple: else: log_fail(f"Invalid video format: {video_format}") return None + + +def execute_dual_test( + test_time: int, + build: str, + tx_host, + rx_host, + type_: str, + video_format: str, + pg_format: str, + video_url: str, + output_format: str, + multiple_sessions: bool = False, + tx_is_ffmpeg: bool = True, + capture_cfg=None, +): + # Initialize logging for this test + init_test_logging() + + case_id = os.environ.get("PYTEST_CURRENT_TEST", "ffmpeg_test") + case_id = case_id[: case_id.rfind("(") - 1] if "(" in case_id else case_id + + tx_nic_port_list = tx_host.vfs + rx_nic_port_list = rx_host.vfs + video_size, fps = decode_video_format_16_9(video_format) + match output_format: + case "yuv": + ffmpeg_rx_f_flag = "-f rawvideo" + case "h264": + ffmpeg_rx_f_flag = "-c:v libopenh264" + if not multiple_sessions: + output_files = create_empty_output_files(output_format, 1, rx_host, build) + rx_cmd = ( + f"ffmpeg -p_port {rx_nic_port_list[0]} -p_sip {ip_dict['rx_interfaces']} " + f"-p_rx_ip {ip_dict['rx_sessions']} -udp_port 20000 -payload_type 112 " + f"-fps {fps} -pix_fmt yuv422p10le -video_size {video_size} " + f"-f mtl_st20p -i k {ffmpeg_rx_f_flag} {output_files[0]} -y" + ) + if tx_is_ffmpeg: + tx_cmd = ( + f"ffmpeg -video_size {video_size} -f rawvideo -pix_fmt yuv422p10le " + f"-i {video_url} -filter:v fps={fps} -p_port {tx_nic_port_list[0]} " + f"-p_sip {ip_dict['tx_interfaces']} -p_tx_ip {ip_dict['tx_sessions']} " + f"-udp_port 20000 -payload_type 112 -f mtl_st20p -" + ) + else: # tx is rxtxapp + tx_config_file = generate_rxtxapp_tx_config( + tx_nic_port_list[0], video_format, video_url, tx_host, build + ) + tx_cmd = f"{RXTXAPP_PATH} --config_file {tx_config_file}" + else: # multiple sessions + output_files = create_empty_output_files(output_format, 2, rx_host, build) + rx_cmd = ( + f"ffmpeg -p_sip {ip_dict['rx_interfaces']} " + f"-p_port {rx_nic_port_list[0]} -p_rx_ip {ip_dict['rx_sessions']} " + f"-udp_port 20000 -payload_type 112 -fps {fps} -pix_fmt yuv422p10le " + f"-video_size {video_size} -f mtl_st20p -i 1 " + f"-p_port {rx_nic_port_list[0]} -p_rx_ip {ip_dict['rx_sessions']} " + f"-udp_port 20002 -payload_type 112 -fps {fps} -pix_fmt yuv422p10le " + f"-video_size {video_size} -f mtl_st20p -i 2 " + f"-map 0:0 {ffmpeg_rx_f_flag} {output_files[0]} -y " + f"-map 1:0 {ffmpeg_rx_f_flag} {output_files[1]} -y" + ) + if tx_is_ffmpeg: + tx_cmd = ( + f"ffmpeg -video_size {video_size} -f rawvideo -pix_fmt yuv422p10le " + f"-i {video_url} -filter:v fps={fps} -p_port {tx_nic_port_list[0]} " + f"-p_sip {ip_dict['tx_interfaces']} -p_tx_ip {ip_dict['tx_sessions']} " + f"-udp_port 20000 -payload_type 112 -f mtl_st20p -" + ) + else: # tx is rxtxapp + tx_config_file = generate_rxtxapp_tx_config( + tx_nic_port_list[0], video_format, video_url, tx_host, build, True + ) + tx_cmd = f"{RXTXAPP_PATH} --config_file {tx_config_file}" + + logger.info(f"TX Host: {tx_host}") + logger.info(f"RX Host: {rx_host}") + logger.info(f"RX Command: {rx_cmd}") + logger.info(f"TX Command: {tx_cmd}") + log_to_file(f"TX Host: {tx_host}", rx_host, build) + log_to_file(f"RX Host: {rx_host}", rx_host, build) + log_to_file(f"RX Command: {rx_cmd}", rx_host, build) + log_to_file(f"TX Command: {tx_cmd}", tx_host, build) + + rx_proc = None + tx_proc = None + # Use RX host for tcpdump capture + tcpdump = prepare_tcpdump(capture_cfg, rx_host) + + try: + # Start RX pipeline first on RX host + logger.info("Starting RX pipeline on RX host...") + rx_proc = run( + rx_cmd, + cwd=build, + timeout=test_time + 60, + testcmd=True, + host=rx_host, + background=True, + enable_sudo=True, + ) + time.sleep(2) + + # Start TX pipeline on TX host + logger.info("Starting TX pipeline on TX host...") + tx_proc = run( + tx_cmd, + cwd=build, + timeout=test_time + 60, + testcmd=True, + host=tx_host, + background=True, + enable_sudo=True, + ) + # Start tcpdump after pipelines are running + if tcpdump: + logger.info("Starting tcpdump capture...") + tcpdump.capture(capture_time=capture_cfg.get("capture_time", test_time)) + + # Let the test run for the specified duration + logger.info(f"Running test for {test_time} seconds...") + time.sleep(test_time) + + logger.info("Terminating processes...") + if tx_proc: + try: + tx_proc.terminate() + except Exception: + pass + if rx_proc: + try: + rx_proc.terminate() + except Exception: + pass + # Wait a bit for termination + time.sleep(2) + # Get output after processes have been terminated + try: + rx_output = f"RX Output:\n{rx_proc.stdout_text}" + log_to_file(rx_output, rx_host, build) + except Exception: + logger.info("Could not retrieve RX output") + try: + tx_output = f"TX Output:\n{tx_proc.stdout_text}" + log_to_file(tx_output, tx_host, build) + except Exception: + logger.info("Could not retrieve TX output") + except Exception as e: + log_fail(f"Error during test execution: {e}") + # Terminate processes immediately on error + if tx_proc: + try: + tx_proc.terminate() + except Exception: + pass + if rx_proc: + try: + rx_proc.terminate() + except Exception: + pass + raise + finally: + # Ensure processes are terminated with force kill if needed + if tx_proc: + try: + tx_proc.terminate() + tx_proc.wait(timeout=5) + except Exception: + try: + # Force kill if terminate didn't work + tx_proc.kill() + tx_proc.wait(timeout=5) + except Exception: + pass + if rx_proc: + try: + rx_proc.terminate() + rx_proc.wait(timeout=5) + except Exception: + try: + # Force kill if terminate didn't work + rx_proc.kill() + rx_proc.wait(timeout=5) + except Exception: + pass + if tcpdump: + tcpdump.stop() + passed = False + match output_format: + case "yuv": + passed = check_output_video_yuv(output_files[0], rx_host, build, video_url) + case "h264": + passed = check_output_video_h264( + output_files[0], video_size, rx_host, build, video_url + ) + # Clean up output files after validation + try: + for output_file in output_files: + run(f"rm -f {output_file}", host=rx_host) + logger.info(f"Removed output file: {output_file}") + except Exception as e: + logger.info(f"Could not remove output files: {e}") + if not passed: + log_fail("test failed") + return passed + + +def execute_dual_test_rgb24( + test_time: int, + build: str, + tx_host, + rx_host, + type_: str, + video_format: str, + pg_format: str, + video_url: str, + capture_cfg=None, +): + # Initialize logging for this test + init_test_logging() + + # Use separate NIC port lists for TX and RX hosts + tx_nic_port_list = tx_host.vfs + rx_nic_port_list = rx_host.vfs + video_size, fps = decode_video_format_16_9(video_format) + + logger.info( + f"Creating RX config for RGB24 dual test with video_format: {video_format}" + ) + log_to_file( + f"Creating RX config for RGB24 dual test with video_format: {video_format}", + rx_host, + build, + ) + try: + rx_config_file = generate_rxtxapp_rx_config( + rx_nic_port_list[0], video_format, rx_host, build + ) + logger.info(f"Successfully created RX config file: {rx_config_file}") + log_to_file( + f"Successfully created RX config file: {rx_config_file}", rx_host, build + ) + except Exception as e: + log_fail(f"Failed to create RX config file: {e}") + log_to_file(f"Failed to create RX config file: {e}", rx_host, build) + return False + + rx_cmd = f"{RXTXAPP_PATH} --config_file {rx_config_file} --test_time {test_time}" + tx_cmd = ( + f"ffmpeg -stream_loop -1 -video_size {video_size} -f rawvideo -pix_fmt rgb24 " + f"-i {video_url} -filter:v fps={fps} -p_port {tx_nic_port_list[0]} " + f"-p_sip {ip_dict['tx_interfaces']} -p_tx_ip {ip_dict['tx_sessions']} " + f"-udp_port 20000 -payload_type 112 -f mtl_st20p -" + ) + + logger.info(f"TX Host: {tx_host}") + logger.info(f"RX Host: {rx_host}") + logger.info(f"RX Command: {rx_cmd}") + logger.info(f"TX Command: {tx_cmd}") + log_to_file(f"TX Host: {tx_host}", rx_host, build) + log_to_file(f"RX Host: {rx_host}", rx_host, build) + log_to_file(f"RX Command: {rx_cmd}", rx_host, build) + log_to_file(f"TX Command: {tx_cmd}", tx_host, build) + + rx_proc = None + tx_proc = None + # Use RX host for tcpdump capture + tcpdump = prepare_tcpdump(capture_cfg, rx_host) + + try: + # Start RX pipeline first on RX host + logger.info("Starting RX pipeline on RX host...") + rx_proc = run( + rx_cmd, + cwd=build, + timeout=test_time + 60, + testcmd=True, + host=rx_host, + background=True, + enable_sudo=True, + ) + time.sleep(5) + + # Start TX pipeline on TX host + logger.info("Starting TX pipeline on TX host...") + tx_proc = run( + tx_cmd, + cwd=build, + timeout=test_time + 60, + testcmd=True, + host=tx_host, + background=True, + enable_sudo=True, + ) + + # Start tcpdump after pipelines are running + if tcpdump: + logger.info("Starting tcpdump capture...") + tcpdump.capture(capture_time=capture_cfg.get("capture_time", test_time)) + + logger.info( + f"Waiting for RX process to complete (test_time: {test_time} seconds)..." + ) + rx_proc.wait() + logger.info("RX process completed") + + # Terminate TX process after RX completes + logger.info("Terminating TX process...") + if tx_proc: + try: + tx_proc.terminate() + tx_proc.wait(timeout=5) + logger.info("TX process terminated successfully") + except Exception: + try: + tx_proc.kill() + tx_proc.wait(timeout=5) + logger.info("TX process killed") + except Exception: + logger.info("Could not terminate TX process") + + rx_output = "" + try: + rx_output = rx_proc.stdout_text + log_to_file(f"RX Output:\n{rx_output}", rx_host, build) + logger.info("RX output captured successfully") + except Exception as e: + logger.info(f"Error retrieving RX output: {e}") + log_to_file(f"Error retrieving RX output: {e}", rx_host, build) + + try: + log_to_file(f"TX Output:\n{tx_proc.stdout_text}", tx_host, build) + logger.info("TX output captured successfully") + except Exception as e: + log_to_file(f"Error retrieving TX output: {e}", tx_host, build) + + except Exception as e: + log_fail(f"Error during test execution: {e}") + # Terminate processes immediately on error + if tx_proc: + try: + tx_proc.terminate() + except Exception: + pass + if rx_proc: + try: + rx_proc.terminate() + except Exception: + pass + raise + finally: + # Final cleanup - ensure processes are terminated + if tx_proc: + try: + tx_proc.terminate() + tx_proc.wait(timeout=3) + except Exception: + try: + tx_proc.kill() + tx_proc.wait(timeout=3) + except Exception: + pass + if rx_proc: + try: + rx_proc.terminate() + rx_proc.wait(timeout=3) + except Exception: + try: + rx_proc.kill() + rx_proc.wait(timeout=3) + except Exception: + pass + if tcpdump: + tcpdump.stop() + + if not check_output_rgb24(rx_output, 1): + log_fail("rx video sessions failed") + return False + time.sleep(5) + return True + + +def execute_dual_test_rgb24_multiple( + test_time: int, + build: str, + tx_host, + rx_host, + type_: str, + video_format_list: list, + pg_format: str, + video_url_list: list, + capture_cfg=None, +): + # Initialize logging for this test + init_test_logging() + + # Use separate NIC port lists for TX and RX hosts + tx_nic_port_list = tx_host.vfs + rx_nic_port_list = rx_host.vfs + video_size_1, fps_1 = decode_video_format_16_9(video_format_list[0]) + video_size_2, fps_2 = decode_video_format_16_9(video_format_list[1]) + + logger.info( + f"Creating RX config for RGB24 multiple dual test with video_formats: {video_format_list}" + ) + log_to_file( + f"Creating RX config for RGB24 multiple dual test with video_formats: {video_format_list}", + rx_host, + build, + ) + try: + rx_config_file = generate_rxtxapp_rx_config_multiple( + rx_nic_port_list[:2], video_format_list, rx_host, build, True + ) + logger.info(f"Successfully created RX config file: {rx_config_file}") + log_to_file( + f"Successfully created RX config file: {rx_config_file}", rx_host, build + ) + except Exception as e: + log_fail(f"Failed to create RX config file: {e}") + log_to_file(f"Failed to create RX config file: {e}", rx_host, build) + return False + + rx_cmd = f"{RXTXAPP_PATH} --config_file {rx_config_file} --test_time {test_time}" + tx_1_cmd = ( + f"ffmpeg -stream_loop -1 -video_size {video_size_1} -f rawvideo -pix_fmt rgb24 " + f"-i {video_url_list[0]} -filter:v fps={fps_1} -p_port {tx_nic_port_list[0]} " + f"-p_sip {ip_dict_rgb24_multiple['p_sip_1']} " + f"-p_tx_ip {ip_dict_rgb24_multiple['p_tx_ip_1']} " + f"-udp_port 20000 -payload_type 112 -f mtl_st20p -" + ) + tx_2_cmd = ( + f"ffmpeg -stream_loop -1 -video_size {video_size_2} -f rawvideo -pix_fmt rgb24 " + f"-i {video_url_list[1]} -filter:v fps={fps_2} -p_port {tx_nic_port_list[1]} " + f"-p_sip {ip_dict_rgb24_multiple['p_sip_2']} " + f"-p_tx_ip {ip_dict_rgb24_multiple['p_tx_ip_2']} " + f"-udp_port 20000 -payload_type 112 -f mtl_st20p -" + ) + + logger.info(f"TX Host: {tx_host}") + logger.info(f"RX Host: {rx_host}") + logger.info(f"RX Command: {rx_cmd}") + logger.info(f"TX1 Command: {tx_1_cmd}") + logger.info(f"TX2 Command: {tx_2_cmd}") + log_to_file(f"TX Host: {tx_host}", rx_host, build) + log_to_file(f"RX Host: {rx_host}", rx_host, build) + log_to_file(f"RX Command: {rx_cmd}", rx_host, build) + log_to_file(f"TX1 Command: {tx_1_cmd}", tx_host, build) + log_to_file(f"TX2 Command: {tx_2_cmd}", tx_host, build) + + rx_proc = None + tx_1_proc = None + tx_2_proc = None + # Use RX host for tcpdump capture + tcpdump = prepare_tcpdump(capture_cfg, rx_host) + + try: + # Start RX pipeline first on RX host + logger.info("Starting RX pipeline on RX host...") + rx_proc = run( + rx_cmd, + cwd=build, + timeout=test_time + 60, + testcmd=True, + host=rx_host, + background=True, + enable_sudo=True, + ) + time.sleep(5) + + # Start TX pipelines on TX host + logger.info("Starting TX pipelines on TX host...") + tx_1_proc = run( + tx_1_cmd, + cwd=build, + timeout=test_time + 60, + testcmd=True, + host=tx_host, + background=True, + enable_sudo=True, + ) + tx_2_proc = run( + tx_2_cmd, + cwd=build, + timeout=test_time + 60, + testcmd=True, + host=tx_host, + background=True, + enable_sudo=True, + ) + + # Start tcpdump after pipelines are running + if tcpdump: + logger.info("Starting tcpdump capture...") + tcpdump.capture(capture_time=capture_cfg.get("capture_time", test_time)) + + logger.info(f"Waiting for RX process (test_time: {test_time} seconds)...") + rx_proc.wait() + logger.info("RX process completed") + + # Terminate TX processes after RX completes + logger.info("Terminating TX processes...") + for proc in [tx_1_proc, tx_2_proc]: + if proc: + try: + proc.terminate() + proc.wait(timeout=5) + logger.info("TX process terminated successfully") + except Exception: + try: + proc.kill() + proc.wait(timeout=5) + logger.info("TX process killed") + except Exception: + logger.info("Could not terminate TX process") + + rx_output = "" + try: + rx_output = rx_proc.stdout_text + log_to_file(f"RX Output:\n{rx_output}", rx_host, build) + logger.info("RX output captured successfully") + except Exception as e: + logger.info(f"Error retrieving RX output: {e}") + log_to_file(f"Error retrieving RX output: {e}", rx_host, build) + + try: + log_to_file(f"TX1 Output:\n{tx_1_proc.stdout_text}", tx_host, build) + logger.info("TX1 output captured successfully") + except Exception as e: + logger.info(f"Error retrieving TX1 output: {e}") + + try: + log_to_file(f"TX2 Output:\n{tx_2_proc.stdout_text}", tx_host, build) + logger.info("TX2 output captured successfully") + except Exception as e: + logger.info(f"Error retrieving TX2 output: {e}") + except Exception as e: + log_fail(f"Error during test execution: {e}") + # Terminate processes immediately on error + for proc in [tx_1_proc, tx_2_proc, rx_proc]: + if proc: + try: + proc.terminate() + except Exception: + pass + raise + finally: + # Final cleanup - ensure processes are terminated + for proc in [tx_1_proc, tx_2_proc, rx_proc]: + if proc: + try: + proc.terminate() + proc.wait(timeout=3) + except Exception: + try: + proc.kill() + proc.wait(timeout=3) + except Exception: + pass + if tcpdump: + tcpdump.stop() + + if not check_output_rgb24(rx_output, 2): + log_fail("rx video session failed") + return False + time.sleep(5) + return True diff --git a/tests/validation/tests/dual/ffmpeg/__init__.py b/tests/validation/tests/dual/ffmpeg/__init__.py new file mode 100755 index 000000000..e69de29bb diff --git a/tests/validation/tests/dual/ffmpeg/test_rx_ffmpeg_tx_ffmpeg_dual.py b/tests/validation/tests/dual/ffmpeg/test_rx_ffmpeg_tx_ffmpeg_dual.py new file mode 100755 index 000000000..95d96cd4a --- /dev/null +++ b/tests/validation/tests/dual/ffmpeg/test_rx_ffmpeg_tx_ffmpeg_dual.py @@ -0,0 +1,59 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import os + +import pytest +from mtl_engine import ffmpeg_app +from mtl_engine.media_files import yuv_files + + +@pytest.mark.parametrize( + "video_format, test_time_multipler,", + [ + ("i1080p25", 1), + ("i1080p50", 1), + ("i1080p60", 2), + ("i2160p60", 3), + ], +) +@pytest.mark.parametrize("output_format", ["yuv", "h264"]) +def test_rx_ffmpeg_tx_ffmpeg_dual( + hosts, + test_time, + build, + media, + nic_port_list, + video_format, + test_time_multipler, + output_format, + test_config, + prepare_ramdisk, +): + # Get TX and RX hosts + host_list = list(hosts.values()) + if len(host_list) < 2: + pytest.skip("Dual tests require at least 2 hosts") + + tx_host = host_list[0] + rx_host = host_list[1] + + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = ( + f"test_rx_ffmpeg_tx_ffmpeg_dual_{video_format}_{output_format}" + ) + + video_file = yuv_files[video_format] + + ffmpeg_app.execute_dual_test( + test_time=test_time * test_time_multipler, + build=build, + tx_host=tx_host, + rx_host=rx_host, + type_="frame", + video_format=video_format, + pg_format=video_file["format"], + video_url=os.path.join(media, video_file["filename"]), + output_format=output_format, + capture_cfg=capture_cfg, + ) diff --git a/tests/validation/tests/dual/ffmpeg/test_rx_ffmpeg_tx_ffmpeg_rgb24_dual.py b/tests/validation/tests/dual/ffmpeg/test_rx_ffmpeg_tx_ffmpeg_rgb24_dual.py new file mode 100755 index 000000000..56ebfe24d --- /dev/null +++ b/tests/validation/tests/dual/ffmpeg/test_rx_ffmpeg_tx_ffmpeg_rgb24_dual.py @@ -0,0 +1,55 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import os + +import pytest +from mtl_engine import ffmpeg_app +from mtl_engine.media_files import yuv_files + + +@pytest.mark.parametrize( + "video_format, test_time_mutlipler", + [ + ("i1080p25", 1), + ("i1080p30", 1), + ("i1080p60", 2), + ("i2160p30", 2), + ("i2160p60", 3), + ], +) +def test_rx_ffmpeg_tx_ffmpeg_rgb24_dual( + hosts, + test_time, + build, + media, + nic_port_list, + video_format, + test_time_mutlipler, + test_config, + prepare_ramdisk, +): + # Get TX and RX hosts + host_list = list(hosts.values()) + if len(host_list) < 2: + pytest.skip("Dual tests require at least 2 hosts") + + tx_host = host_list[0] + rx_host = host_list[1] + + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = f"test_rx_ffmpeg_tx_ffmpeg_rgb24_dual_{video_format}" + + video_file = yuv_files[video_format] + + ffmpeg_app.execute_dual_test_rgb24( + test_time=test_time * test_time_mutlipler, + build=build, + tx_host=tx_host, + rx_host=rx_host, + type_="frame", + video_format=video_format, + pg_format=video_file["format"], + video_url=os.path.join(media, video_file["filename"]), + capture_cfg=capture_cfg, + ) diff --git a/tests/validation/tests/dual/ffmpeg/test_rx_ffmpeg_tx_ffmpeg_rgb24_multiple_dual.py b/tests/validation/tests/dual/ffmpeg/test_rx_ffmpeg_tx_ffmpeg_rgb24_multiple_dual.py new file mode 100755 index 000000000..e831afd33 --- /dev/null +++ b/tests/validation/tests/dual/ffmpeg/test_rx_ffmpeg_tx_ffmpeg_rgb24_multiple_dual.py @@ -0,0 +1,64 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import os + +import pytest +from mtl_engine import ffmpeg_app +from mtl_engine.media_files import yuv_files + + +@pytest.mark.parametrize( + "video_format_1, video_format_2, test_time_mutlipler", + [ + ("i1080p25", "i1080p25", 2), + ("i1080p30", "i1080p30", 2), + ("i1080p60", "i1080p60", 4), + ("i1080p60", "i1080p50", 4), + ("i1080p50", "i1080p30", 3), + ("i1080p25", "i1080p50", 3), + ("i1080p25", "i1080p60", 3), + ], +) +def test_rx_ffmpeg_tx_ffmpeg_rgb24_multiple_dual( + hosts, + test_time, + build, + media, + nic_port_list, + video_format_1, + video_format_2, + test_time_mutlipler, + test_config, + prepare_ramdisk, +): + # Get TX and RX hosts + host_list = list(hosts.values()) + if len(host_list) < 2: + pytest.skip("Dual tests require at least 2 hosts") + + tx_host = host_list[0] + rx_host = host_list[1] + + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = ( + f"test_rx_ffmpeg_tx_ffmpeg_rgb24_multiple_dual_{video_format_1}_{video_format_2}" + ) + + video_file_1 = yuv_files[video_format_1] + video_file_2 = yuv_files[video_format_2] + + ffmpeg_app.execute_dual_test_rgb24_multiple( + test_time=test_time * test_time_mutlipler, + build=build, + tx_host=tx_host, + rx_host=rx_host, + type_="frame", + video_format_list=[video_format_1, video_format_2], + pg_format=video_file_1["format"], + video_url_list=[ + os.path.join(media, video_file_1["filename"]), + os.path.join(media, video_file_2["filename"]), + ], + capture_cfg=capture_cfg, + ) diff --git a/tests/validation/tests/dual/ffmpeg/test_rx_ffmpeg_tx_rxtxapp_dual.py b/tests/validation/tests/dual/ffmpeg/test_rx_ffmpeg_tx_rxtxapp_dual.py new file mode 100755 index 000000000..2ff984aae --- /dev/null +++ b/tests/validation/tests/dual/ffmpeg/test_rx_ffmpeg_tx_rxtxapp_dual.py @@ -0,0 +1,66 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import os + +import pytest +from mtl_engine import ffmpeg_app +from mtl_engine.media_files import yuv_files + + +@pytest.mark.parametrize( + "video_format, multiple_sessions, test_time_multipler", + [ + ("i1080p25", False, 1), + ("i1080p30", False, 1), + ("i1080p60", False, 2), + ("i2160p25", False, 2), + ("i2160p30", False, 2), + ("i2160p60", False, 2), + ("i1080p25", True, 3), + ("i1080p30", True, 3), + ], +) +@pytest.mark.parametrize("output_format", ["yuv", "h264"]) +def test_rx_ffmpeg_tx_rxtxapp_dual( + hosts, + test_time, + build, + media, + nic_port_list, + video_format, + multiple_sessions, + test_time_multipler, + output_format, + test_config, + prepare_ramdisk, +): + # Get TX and RX hosts + host_list = list(hosts.values()) + if len(host_list) < 2: + pytest.skip("Dual tests require at least 2 hosts") + + tx_host = host_list[0] + rx_host = host_list[1] + + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = ( + f"test_rx_ffmpeg_tx_rxtxapp_dual_{video_format}_{output_format}_{multiple_sessions}_{test_time_multipler}" + ) + + video_file = yuv_files[video_format] + + ffmpeg_app.execute_dual_test( + test_time=test_time * test_time_multipler, + build=build, + tx_host=tx_host, + rx_host=rx_host, + type_="frame", + video_format=video_format, + pg_format=video_file["format"], + video_url=os.path.join(media, video_file["filename"]), + output_format=output_format, + multiple_sessions=multiple_sessions, + tx_is_ffmpeg=False, # This test uses RxTxApp for TX + capture_cfg=capture_cfg, + ) diff --git a/tests/validation/tests/dual/gstreamer/__init__.py b/tests/validation/tests/dual/gstreamer/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/validation/tests/dual/gstreamer/anc_format/__init__.py b/tests/validation/tests/dual/gstreamer/anc_format/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/validation/tests/dual/gstreamer/anc_format/test_anc_format_dual.py b/tests/validation/tests/dual/gstreamer/anc_format/test_anc_format_dual.py new file mode 100644 index 000000000..71e489252 --- /dev/null +++ b/tests/validation/tests/dual/gstreamer/anc_format/test_anc_format_dual.py @@ -0,0 +1,70 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import os + +import mtl_engine.media_creator as media_create +import pytest +from mtl_engine import GstreamerApp + + +@pytest.mark.parametrize("fps", [24, 25, 30, 50, 60]) +@pytest.mark.parametrize("file_size_kb", [10, 100]) +@pytest.mark.parametrize("framebuff", [3]) +def test_st40p_fps_size_dual( + hosts, + build, + media, + nic_port_list, + file_size_kb, + fps, + framebuff, + test_time, + test_config, + prepare_ramdisk, +): + """Test GStreamer ST40P ANC format in dual host configuration.""" + # Get TX and RX hosts + host_list = list(hosts.values()) + if len(host_list) < 2: + pytest.skip("Dual tests require at least 2 hosts") + + tx_host = host_list[0] + rx_host = host_list[1] + + # Create input file on TX host + input_file_path = media_create.create_text_file( + size_kb=file_size_kb, + output_path=os.path.join(media, "test_anc.txt"), + host=tx_host, + ) + + capture_cfg = dict(test_config.get("capture_cfg", {})) if test_config else {} + capture_cfg["test_name"] = ( + f"test_st40p_fps_size_dual_{fps}_{file_size_kb}kb_{framebuff}" + ) + + try: + result = GstreamerApp.execute_dual_st40_test( + build=build, + tx_nic_port=tx_host.vfs[0], + rx_nic_port=rx_host.vfs[0], + input_path=input_file_path, + payload_type=113, + queues=4, + tx_framebuff_cnt=framebuff, + tx_fps=fps, + tx_did=67, + tx_sdid=2, + timeout=40000, + test_time=test_time, + tx_host=tx_host, + rx_host=rx_host, + capture_cfg=capture_cfg, + ) + + assert result, f"GStreamer dual ST40P test failed for fps={fps}, size={file_size_kb}KB" + + finally: + # Remove the input file on TX host + media_create.remove_file(input_file_path, host=tx_host) diff --git a/tests/validation/tests/dual/gstreamer/audio_format/__init__.py b/tests/validation/tests/dual/gstreamer/audio_format/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/validation/tests/dual/gstreamer/audio_format/test_audio_format_dual.py b/tests/validation/tests/dual/gstreamer/audio_format/test_audio_format_dual.py new file mode 100644 index 000000000..b7708c571 --- /dev/null +++ b/tests/validation/tests/dual/gstreamer/audio_format/test_audio_format_dual.py @@ -0,0 +1,74 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import os + +import mtl_engine.media_creator as media_create +import pytest +from mtl_engine import GstreamerApp + + +@pytest.mark.parametrize("audio_format", ["s8", "s16le", "s24le"]) +@pytest.mark.parametrize("audio_channel", [1, 2]) +@pytest.mark.parametrize("audio_rate", [44100, 48000, 96000]) +def test_audio_format_dual( + hosts, + build, + media, + nic_port_list, + audio_format, + audio_channel, + audio_rate, + test_time, + test_config, + prepare_ramdisk, +): + """Test GStreamer ST30 audio format in dual host configuration.""" + # Get TX and RX hosts + host_list = list(hosts.values()) + if len(host_list) < 2: + pytest.skip("Dual tests require at least 2 hosts") + + tx_host = host_list[0] + rx_host = host_list[1] + + # Create input file on TX host + input_file_path = os.path.join(media, "test_audio.pcm") + + media_create.create_audio_file_sox( + sample_rate=audio_rate, + channels=audio_channel, + bit_depth=GstreamerApp.audio_format_change(audio_format), + duration=10, + frequency=440, + output_path=input_file_path, + host=tx_host, + ) + + capture_cfg = dict(test_config.get("capture_cfg", {})) if test_config else {} + capture_cfg["test_name"] = ( + f"test_audio_format_dual_{audio_format}_{audio_channel}_{audio_rate}" + ) + + try: + result = GstreamerApp.execute_dual_st30_test( + build=build, + tx_nic_port=tx_host.vfs[0], + rx_nic_port=rx_host.vfs[0], + input_path=input_file_path, + payload_type=111, + queues=4, + audio_format=audio_format, + channels=audio_channel, + sampling=audio_rate, + test_time=test_time, + tx_host=tx_host, + rx_host=rx_host, + capture_cfg=capture_cfg, + ) + + assert result, f"GStreamer dual audio format test failed for format {audio_format}" + + finally: + # Remove the input file on TX host + media_create.remove_file(input_file_path, host=tx_host) diff --git a/tests/validation/tests/dual/gstreamer/video_format/__init__.py b/tests/validation/tests/dual/gstreamer/video_format/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/validation/tests/dual/gstreamer/video_format/test_video_format_dual.py b/tests/validation/tests/dual/gstreamer/video_format/test_video_format_dual.py new file mode 100644 index 000000000..f88093b1d --- /dev/null +++ b/tests/validation/tests/dual/gstreamer/video_format/test_video_format_dual.py @@ -0,0 +1,72 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import os + +import mtl_engine.media_creator as media_create +import pytest +from mtl_engine import GstreamerApp +from mtl_engine.media_files import gstreamer_formats + + +@pytest.mark.parametrize("file", gstreamer_formats.keys()) +def test_video_format_dual( + hosts, + build, + media, + nic_port_list, + file, + test_time, + test_config, + prepare_ramdisk, +): + """ + Test GStreamer ST20P video format in dual host configuration using single host setup functions. + This test now reuses single host pipeline setup functions with dual host networking updates. + """ + video_file = gstreamer_formats[file] + + # Get TX and RX hosts + host_list = list(hosts.values()) + if len(host_list) < 2: + pytest.skip("Dual tests require at least 2 hosts") + + tx_host = host_list[0] + rx_host = host_list[1] + + # Create input file on TX host + input_file_path = media_create.create_video_file( + width=video_file["width"], + height=video_file["height"], + framerate=video_file["fps"], + format=GstreamerApp.video_format_change(video_file["format"]), + media_path=media, + host=tx_host, + ) + + capture_cfg = dict(test_config.get("capture_cfg", {})) if test_config else {} + capture_cfg["test_name"] = f"test_video_format_dual_{file}" + + try: + result = GstreamerApp.execute_dual_st20p_test( + build=build, + tx_nic_port=tx_host.vfs[0], + rx_nic_port=rx_host.vfs[0], + input_path=input_file_path, + width=video_file["width"], + height=video_file["height"], + framerate=video_file["fps"], + format=GstreamerApp.video_format_change(video_file["format"]), + payload_type=112, + queues=4, + test_time=test_time, + tx_host=tx_host, + rx_host=rx_host, + capture_cfg=capture_cfg, + ) + + assert result, f"GStreamer dual video format test failed for format {file}" + + finally: + # Remove the input file on TX host + media_create.remove_file(input_file_path, host=tx_host) diff --git a/tests/validation/tests/dual/gstreamer/video_resolution/__init__.py b/tests/validation/tests/dual/gstreamer/video_resolution/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/validation/tests/dual/gstreamer/video_resolution/test_video_resolution_dual.py b/tests/validation/tests/dual/gstreamer/video_resolution/test_video_resolution_dual.py new file mode 100644 index 000000000..340f217ab --- /dev/null +++ b/tests/validation/tests/dual/gstreamer/video_resolution/test_video_resolution_dual.py @@ -0,0 +1,84 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import os + +import mtl_engine.media_creator as media_create +import pytest +from mtl_engine import GstreamerApp +from mtl_engine.media_files import yuv_files +from tests.xfail import SDBQ1971_conversion_v210_720p_error + + +@pytest.mark.parametrize( + "file", + [ + pytest.param(f, marks=pytest.mark.smoke) if f == "i1080p59" else f + for f in yuv_files.keys() + ], +) +def test_video_resolutions_dual( + hosts, + build, + media, + nic_port_list, + file, + request, + test_time, + test_config, + prepare_ramdisk, +): + """Test GStreamer ST20P video resolution in dual host configuration.""" + video_file = yuv_files[file] + video_file["format"] = "v210" + + # Get TX and RX hosts + host_list = list(hosts.values()) + if len(host_list) < 2: + pytest.skip("Dual tests require at least 2 hosts") + + tx_host = host_list[0] + rx_host = host_list[1] + + SDBQ1971_conversion_v210_720p_error( + video_format=video_file["format"], + resolution_width=video_file["height"], + request=request, + ) + + # Create input file on TX host + input_file_path = media_create.create_video_file( + width=video_file["width"], + height=video_file["height"], + framerate=video_file["fps"], + format=GstreamerApp.video_format_change(video_file["format"]), + media_path=media, + host=tx_host, + ) + + capture_cfg = dict(test_config.get("capture_cfg", {})) if test_config else {} + capture_cfg["test_name"] = f"test_video_resolutions_dual_{file}" + + try: + result = GstreamerApp.execute_dual_st20p_test( + build=build, + tx_nic_port=tx_host.vfs[0], + rx_nic_port=rx_host.vfs[0], + input_path=input_file_path, + width=video_file["width"], + height=video_file["height"], + framerate=video_file["fps"], + format=GstreamerApp.video_format_change(video_file["format"]), + payload_type=112, + queues=4, + test_time=test_time, + tx_host=tx_host, + rx_host=rx_host, + capture_cfg=capture_cfg, + ) + + assert result, f"GStreamer dual video resolution test failed for resolution {file}" + + finally: + # Remove the input file on TX host + media_create.remove_file(input_file_path, host=tx_host)