From 25c2d138786ee00120b4cd93294c619de6a9299d Mon Sep 17 00:00:00 2001 From: danceratopz Date: Wed, 18 Jun 2025 05:27:54 +0200 Subject: [PATCH 1/5] feat(consume): add initial implementation of consume enginex Also removes a file that was unintentionally added in #1718. --- src/cli/pytest_commands/consume.py | 18 +- src/cli/pytest_commands/processors.py | 6 + src/pytest_plugins/consume/consume.py | 11 +- .../consume/hive_engine_test/__init__.py | 1 - .../consume/hive_simulators_reorg/__init__.py | 1 - .../consume/simulators/engine/__init__.py | 2 +- .../consume/simulators/engine/conftest.py | 30 +- .../consume/simulators/enginex/__init__.py | 1 + .../consume/simulators/enginex/conftest.py | 54 +++ .../simulators/helpers/client_wrapper.py | 425 ++++++++++++++++++ .../consume/simulators/multi_test_client.py | 160 +++++++ .../consume/simulators/rlp/__init__.py | 2 +- .../simulator_logic/test_via_engine.py | 27 +- .../consume/simulators/single_test_client.py | 2 +- whitelist.txt | 3 +- 15 files changed, 703 insertions(+), 40 deletions(-) delete mode 100644 src/pytest_plugins/consume/hive_engine_test/__init__.py delete mode 100644 src/pytest_plugins/consume/hive_simulators_reorg/__init__.py create mode 100644 src/pytest_plugins/consume/simulators/enginex/__init__.py create mode 100644 src/pytest_plugins/consume/simulators/enginex/conftest.py create mode 100644 src/pytest_plugins/consume/simulators/helpers/client_wrapper.py create mode 100644 src/pytest_plugins/consume/simulators/multi_test_client.py diff --git a/src/cli/pytest_commands/consume.py b/src/cli/pytest_commands/consume.py index 5e14413c80b..15b23afa0bf 100644 --- a/src/cli/pytest_commands/consume.py +++ b/src/cli/pytest_commands/consume.py @@ -44,9 +44,13 @@ def get_command_logic_test_paths(command_name: str, is_hive: bool) -> List[Path] command_logic_test_paths = [ base_path / "simulators" / "simulator_logic" / f"test_via_{cmd}.py" for cmd in commands ] - elif command_name in ["engine", "rlp"]: + elif command_name in ["engine", "enginex"]: command_logic_test_paths = [ - base_path / "simulators" / "simulator_logic" / f"test_via_{command_name}.py" + base_path / "simulators" / "simulator_logic" / "test_via_engine.py" + ] + elif command_name == "rlp": + command_logic_test_paths = [ + base_path / "simulators" / "simulator_logic" / "test_via_rlp.py" ] elif command_name == "direct": command_logic_test_paths = [base_path / "direct" / "test_via_direct.py"] @@ -103,13 +107,19 @@ def rlp() -> None: @consume_command(is_hive=True) def engine() -> None: - """Client consumes via the Engine API.""" + """Client consumes Engine Fixtures via the Engine API.""" + pass + + +@consume_command(is_hive=True) +def enginex() -> None: + """Client consumes Engine X Fixtures via the Engine API.""" pass @consume_command(is_hive=True) def hive() -> None: - """Client consumes via all available hive methods (rlp, engine).""" + """Client consumes via rlp & engine hive methods.""" pass diff --git a/src/cli/pytest_commands/processors.py b/src/cli/pytest_commands/processors.py index e7948962704..71a13152a5f 100644 --- a/src/cli/pytest_commands/processors.py +++ b/src/cli/pytest_commands/processors.py @@ -101,6 +101,12 @@ def process_args(self, args: List[str]) -> List[str]: if self.command_name == "engine": modified_args.extend(["-p", "pytest_plugins.consume.simulators.engine.conftest"]) + elif self.command_name == "enginex": + modified_args.extend(["-p", "pytest_plugins.consume.simulators.enginex.conftest"]) + if ( + self._has_parallelism_flag(args) or "-n" in 
modified_args + ) and "--dist" not in modified_args: + modified_args.extend(["--dist=loadgroup"]) elif self.command_name == "rlp": modified_args.extend(["-p", "pytest_plugins.consume.simulators.rlp.conftest"]) else: diff --git a/src/pytest_plugins/consume/consume.py b/src/pytest_plugins/consume/consume.py index ffe2cecb9e1..d070143af2a 100644 --- a/src/pytest_plugins/consume/consume.py +++ b/src/pytest_plugins/consume/consume.py @@ -495,11 +495,18 @@ def pytest_generate_tests(metafunc): if test_case.format.format_name not in metafunc.config._supported_fixture_formats: continue fork_markers = get_relative_fork_markers(test_case.fork, strict_mode=False) + + # Append pre_hash (first 8 chars) to test ID for easier selection with --sim.limit + test_id = test_case.id + if hasattr(test_case, "pre_hash") and test_case.pre_hash: + test_id = f"{test_case.id}[{test_case.pre_hash[:8]}]" + param = pytest.param( test_case, - id=test_case.id, + id=test_id, marks=[getattr(pytest.mark, m) for m in fork_markers] - + [getattr(pytest.mark, test_case.format.format_name)], + + [getattr(pytest.mark, test_case.format.format_name)] + + [pytest.mark.xdist_group(name=test_case.pre_hash)], ) param_list.append(param) diff --git a/src/pytest_plugins/consume/hive_engine_test/__init__.py b/src/pytest_plugins/consume/hive_engine_test/__init__.py deleted file mode 100644 index 2d1322a5332..00000000000 --- a/src/pytest_plugins/consume/hive_engine_test/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Hive engine test consumer plugin.""" diff --git a/src/pytest_plugins/consume/hive_simulators_reorg/__init__.py b/src/pytest_plugins/consume/hive_simulators_reorg/__init__.py deleted file mode 100644 index 59ca949d150..00000000000 --- a/src/pytest_plugins/consume/hive_simulators_reorg/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Hive simulators reorganization consumer plugin.""" diff --git a/src/pytest_plugins/consume/simulators/engine/__init__.py b/src/pytest_plugins/consume/simulators/engine/__init__.py index 2cb194bb7d7..d0b290cdd44 100644 --- a/src/pytest_plugins/consume/simulators/engine/__init__.py +++ b/src/pytest_plugins/consume/simulators/engine/__init__.py @@ -1 +1 @@ -"""Consume Engine test functions.""" +"""The consume engine simulator.""" diff --git a/src/pytest_plugins/consume/simulators/engine/conftest.py b/src/pytest_plugins/consume/simulators/engine/conftest.py index e7cda8ffa7d..e01a0fbdc65 100644 --- a/src/pytest_plugins/consume/simulators/engine/conftest.py +++ b/src/pytest_plugins/consume/simulators/engine/conftest.py @@ -1,8 +1,4 @@ -""" -Pytest fixtures for the `consume engine` simulator. - -Configures the hive back-end & EL clients for each individual test execution. -""" +"""Pytest plugin for the `consume engine` simulator.""" import io from typing import Mapping @@ -29,6 +25,18 @@ def pytest_configure(config): config._supported_fixture_formats = [BlockchainEngineFixture.format_name] +@pytest.fixture(scope="module") +def test_suite_name() -> str: + """The name of the hive test suite used in this simulator.""" + return "eest/consume-engine" + + +@pytest.fixture(scope="module") +def test_suite_description() -> str: + """The description of the hive test suite used in this simulator.""" + return "Execute blockchain tests against clients using the Engine API." 
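
The `xdist_group` mark added in `pytest_generate_tests` above is what the injected `--dist=loadgroup` flag keys on: pytest-xdist schedules every test that shares a group name on the same worker, so a single client started for a pre-allocation group can serve all of that group's tests. A minimal standalone sketch of the pairing (hypothetical test names, not part of this patch):

import pytest

# Run with: pytest -n 4 --dist=loadgroup
# Both tests share an xdist group name, so they are guaranteed to execute
# on the same xdist worker and can reuse the same started client.

@pytest.mark.xdist_group(name="f1a2b3c4")
def test_first_in_group():
    assert True

@pytest.mark.xdist_group(name="f1a2b3c4")
def test_second_in_group():
    assert True
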
+ + @pytest.fixture(scope="function") def engine_rpc(client: Client, client_exception_mapper: ExceptionMapper | None) -> EngineRPC: """Initialize engine RPC client for the execution client under test.""" @@ -42,18 +50,6 @@ def engine_rpc(client: Client, client_exception_mapper: ExceptionMapper | None) return EngineRPC(f"http://{client.ip}:8551") -@pytest.fixture(scope="module") -def test_suite_name() -> str: - """The name of the hive test suite used in this simulator.""" - return "eest/consume-engine" - - -@pytest.fixture(scope="module") -def test_suite_description() -> str: - """The description of the hive test suite used in this simulator.""" - return "Execute blockchain tests against clients using the Engine API." - - @pytest.fixture(scope="function") def client_files(buffered_genesis: io.BufferedReader) -> Mapping[str, io.BufferedReader]: """Define the files that hive will start the client with.""" diff --git a/src/pytest_plugins/consume/simulators/enginex/__init__.py b/src/pytest_plugins/consume/simulators/enginex/__init__.py new file mode 100644 index 00000000000..4ffa1ec0af5 --- /dev/null +++ b/src/pytest_plugins/consume/simulators/enginex/__init__.py @@ -0,0 +1 @@ +"""The consume enginex simulator.""" diff --git a/src/pytest_plugins/consume/simulators/enginex/conftest.py b/src/pytest_plugins/consume/simulators/enginex/conftest.py new file mode 100644 index 00000000000..058415e422f --- /dev/null +++ b/src/pytest_plugins/consume/simulators/enginex/conftest.py @@ -0,0 +1,54 @@ +""" +Pytest fixtures for the `consume enginex` simulator. + +Configures the hive back-end & EL clients for test execution with BlockchainEngineXFixtures. +""" + +import pytest +from hive.client import Client + +from ethereum_test_exceptions import ExceptionMapper +from ethereum_test_fixtures import BlockchainEngineXFixture +from ethereum_test_rpc import EngineRPC + +pytest_plugins = ( + "pytest_plugins.pytest_hive.pytest_hive", + "pytest_plugins.consume.simulators.base", + "pytest_plugins.consume.simulators.multi_test_client", + "pytest_plugins.consume.simulators.test_case_description", + "pytest_plugins.consume.simulators.timing_data", + "pytest_plugins.consume.simulators.exceptions", +) + + +def pytest_configure(config): + """Set the supported fixture formats for the engine simulator.""" + config._supported_fixture_formats = [BlockchainEngineXFixture.format_name] + + +@pytest.fixture(scope="module") +def test_suite_name() -> str: + """The name of the hive test suite used in this simulator.""" + return "eest/consume-enginex" + + +@pytest.fixture(scope="module") +def test_suite_description() -> str: + """The description of the hive test suite used in this simulator.""" + return ( + "Execute blockchain tests against clients using the Engine API with " + "pre-allocation group optimization using Engine X fixtures." 
+ ) + + +@pytest.fixture(scope="function") +def engine_rpc(client: Client, client_exception_mapper: ExceptionMapper | None) -> EngineRPC: + """Initialize engine RPC client for the execution client under test.""" + if client_exception_mapper: + return EngineRPC( + f"http://{client.ip}:8551", + response_validation_context={ + "exception_mapper": client_exception_mapper, + }, + ) + return EngineRPC(f"http://{client.ip}:8551") diff --git a/src/pytest_plugins/consume/simulators/helpers/client_wrapper.py b/src/pytest_plugins/consume/simulators/helpers/client_wrapper.py new file mode 100644 index 00000000000..c82c6fd3ed3 --- /dev/null +++ b/src/pytest_plugins/consume/simulators/helpers/client_wrapper.py @@ -0,0 +1,425 @@ +"""Client wrapper classes for managing client lifecycle in engine simulators.""" + +import io +import json +import logging +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Dict, Optional, cast + +from hive.client import Client, ClientType + +from ethereum_test_base_types import Number, to_json +from ethereum_test_fixtures import BlockchainFixtureCommon +from ethereum_test_fixtures.pre_alloc_groups import PreAllocGroup +from ethereum_test_forks import Fork +from pytest_plugins.consume.simulators.helpers.ruleset import ruleset + +logger = logging.getLogger(__name__) + + +class ClientWrapper(ABC): + """ + Abstract base class for managing client instances in engine simulators. + + This class encapsulates the common logic for generating genesis configurations, + environment variables, and client files needed to start a client. + """ + + def __init__(self, client_type: ClientType): + """ + Initialize the client wrapper. + + Args: + client_type: The type of client to manage + + """ + self.client_type = client_type + self.client: Optional[Client] = None + self._is_started = False + self.test_count = 0 + + @abstractmethod + def _get_fork(self) -> Fork: + """Get the fork for this client.""" + pass + + @abstractmethod + def _get_chain_id(self) -> int: + """Get the chain ID for this client.""" + pass + + @abstractmethod + def _get_pre_alloc(self) -> dict: + """Get the pre-allocation for this client.""" + pass + + @abstractmethod + def _get_genesis_header(self) -> dict: + """Get the genesis header for this client.""" + pass + + def get_genesis_config(self) -> dict: + """ + Get the genesis configuration for this client. + + Returns: + Genesis configuration dict + + """ + # Convert genesis header to JSON format + genesis = self._get_genesis_header() + + # Convert pre-allocation to JSON format + alloc = self._get_pre_alloc() + + # NOTE: nethermind requires account keys without '0x' prefix + genesis["alloc"] = {k.replace("0x", ""): v for k, v in alloc.items()} + + return genesis + + def get_environment(self) -> dict: + """ + Get the environment variables for this client. + + Returns: + Environment variables dict + + """ + fork = self._get_fork() + chain_id = self._get_chain_id() + + assert fork in ruleset, f"fork '{fork}' missing in hive ruleset" + + # Set check live port for engine simulator + check_live_port = 8551 # Engine API port + + return { + "HIVE_CHAIN_ID": str(Number(chain_id)), + "HIVE_FORK_DAO_VOTE": "1", + "HIVE_NODETYPE": "full", + "HIVE_CHECK_LIVE_PORT": str(check_live_port), + **{k: f"{v:d}" for k, v in ruleset[fork].items()}, + } + + def get_client_files(self) -> dict: + """ + Get the client files dict needed for start_client(). 
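+
+        A fresh buffered genesis stream is built on every call; hive consumes
+        it when starting the client.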
+ + Returns: + Dict with genesis.json file + + """ + # Create buffered genesis file + genesis_config = self.get_genesis_config() + genesis_json = json.dumps(genesis_config) + genesis_bytes = genesis_json.encode("utf-8") + buffered_genesis = io.BufferedReader(cast(io.RawIOBase, io.BytesIO(genesis_bytes))) + + return {"/genesis.json": buffered_genesis} + + def set_client(self, client: Client) -> None: + """ + Set the client instance after it has been started. + + Args: + client: The started client instance + + """ + if self._is_started: + raise RuntimeError(f"Client {self.client_type.name} is already set") + + self.client = client + self._is_started = True + logger.info(f"Client ({self.client_type.name}) registered") + + def increment_test_count(self) -> None: + """Increment the count of tests that have used this client.""" + self.test_count += 1 + logger.debug(f"Test count for {self.client_type.name}: {self.test_count}") + + def stop(self) -> None: + """Mark the client as stopped.""" + if self._is_started: + logger.info( + f"Marking client ({self.client_type.name}) as stopped after {self.test_count} " + "tests." + ) + self.client = None + self._is_started = False + + @property + def is_running(self) -> bool: + """Check if the client is currently running.""" + return self._is_started and self.client is not None + + +class RestartClient(ClientWrapper): + """ + Client wrapper for the restart simulator where clients restart for each test. + + This class manages clients that are started and stopped for each individual test, + providing complete isolation between test executions. + """ + + def __init__(self, client_type: ClientType, fixture: BlockchainFixtureCommon): + """ + Initialize a restart client wrapper. + + Args: + client_type: The type of client to manage + fixture: The blockchain fixture for this test + + """ + super().__init__(client_type) + self.fixture = fixture + + def _get_fork(self) -> Fork: + """Get the fork from the fixture.""" + return self.fixture.fork + + def _get_chain_id(self) -> int: + """Get the chain ID from the fixture config.""" + return self.fixture.config.chain_id + + def _get_pre_alloc(self) -> dict: + """Get the pre-allocation from the fixture.""" + return to_json(self.fixture.pre) + + def _get_genesis_header(self) -> dict: + """Get the genesis header from the fixture.""" + return to_json(self.fixture.genesis) + + +class MultiTestClient(ClientWrapper): + """ + Client wrapper for multi-test execution where clients are used across tests. + + This class manages clients that are reused across multiple tests in the same + pre-allocation group. + """ + + def __init__( + self, + pre_hash: str, + client_type: ClientType, + pre_alloc_group: PreAllocGroup, + ): + """ + Initialize a multi-test client wrapper. 
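+
+        Unlike `RestartClient`, the wrapped client is kept alive and reused by
+        every test that shares this pre-allocation group's `pre_hash`.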
+ + Args: + pre_hash: The hash identifying the pre-allocation group + client_type: The type of client to manage + pre_alloc_group: The pre-allocation group data for this group + + """ + super().__init__(client_type) + self.pre_hash = pre_hash + self.pre_alloc_group = pre_alloc_group + + def _get_fork(self) -> Fork: + """Get the fork from the pre-allocation group.""" + return self.pre_alloc_group.fork + + def _get_chain_id(self) -> int: + """Get the chain ID from the pre-allocation group environment.""" + # TODO: Environment doesn't have chain_id field - see work_in_progress.md + return 1 + + def _get_pre_alloc(self) -> dict: + """Get the pre-allocation from the pre-allocation group.""" + return to_json(self.pre_alloc_group.pre) + + def _get_genesis_header(self) -> dict: + """Get the genesis header from the pre-allocation group.""" + return self.pre_alloc_group.genesis.model_dump(by_alias=True) + + def set_client(self, client: Client) -> None: + """Override to log with pre_hash information.""" + if self._is_started: + raise RuntimeError(f"Client for pre-allocation group {self.pre_hash} is already set") + + self.client = client + self._is_started = True + logger.info( + f"Multi-test client ({self.client_type.name}) registered for pre-allocation group " + f"{self.pre_hash}" + ) + + def stop(self) -> None: + """Override to log with pre_hash information.""" + if self._is_started: + logger.info( + f"Marking multi-test client ({self.client_type.name}) for pre-allocation group " + f"{self.pre_hash} as stopped after {self.test_count} tests" + ) + self.client = None + self._is_started = False + + +class MultiTestClientManager: + """ + Singleton manager for coordinating multi-test clients across test execution. + + This class tracks all multi-test clients by their preHash and ensures proper + lifecycle management including cleanup at session end. + """ + + _instance: Optional["MultiTestClientManager"] = None + _initialized: bool + + def __new__(cls) -> "MultiTestClientManager": + """Ensure only one instance of MultiTestClientManager exists.""" + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self) -> None: + """Initialize the manager if not already initialized.""" + if hasattr(self, "_initialized") and self._initialized: + return + + self.multi_test_clients: Dict[str, MultiTestClient] = {} + self.pre_alloc_path: Optional[Path] = None + self._initialized = True + logger.info("MultiTestClientManager initialized") + + def set_pre_alloc_path(self, path: Path) -> None: + """ + Set the path to the pre-allocation files directory. + + Args: + path: Path to the directory containing pre-allocation JSON files + + """ + self.pre_alloc_path = path + logger.debug(f"Pre-alloc path set to: {path}") + + def load_pre_alloc_group(self, pre_hash: str) -> PreAllocGroup: + """ + Load the pre-allocation group for a given preHash. 
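+
+        The group is read from "{pre_hash}.json" under the configured
+        pre-alloc path and validated as a `PreAllocGroup` model.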
+ + Args: + pre_hash: The hash identifying the pre-allocation group + + Returns: + The loaded PreAllocGroup + + Raises: + RuntimeError: If pre-alloc path is not set + FileNotFoundError: If pre-allocation file is not found + + """ + if self.pre_alloc_path is None: + raise RuntimeError("Pre-alloc path not set in MultiTestClientManager") + + pre_alloc_file = self.pre_alloc_path / f"{pre_hash}.json" + if not pre_alloc_file.exists(): + raise FileNotFoundError(f"Pre-allocation file not found: {pre_alloc_file}") + + return PreAllocGroup.model_validate_json(pre_alloc_file.read_text()) + + def get_or_create_multi_test_client( + self, + pre_hash: str, + client_type: ClientType, + ) -> MultiTestClient: + """ + Get an existing MultiTestClient or create a new one for the given preHash. + + This method doesn't start the actual client - that's done by HiveTestSuite. + It just manages the MultiTestClient wrapper objects. + + Args: + pre_hash: The hash identifying the pre-allocation group + client_type: The type of client that will be started + + Returns: + The MultiTestClient wrapper instance + + """ + # Check if we already have a MultiTestClient for this preHash + if pre_hash in self.multi_test_clients: + multi_test_client = self.multi_test_clients[pre_hash] + if multi_test_client.is_running: + logger.debug(f"Found existing MultiTestClient for pre-allocation group {pre_hash}") + return multi_test_client + else: + # MultiTestClient exists but isn't running, remove it + logger.warning( + f"Found stopped MultiTestClient for pre-allocation group {pre_hash}, removing" + ) + del self.multi_test_clients[pre_hash] + + # Load the pre-allocation group for this group + pre_alloc_group = self.load_pre_alloc_group(pre_hash) + + # Create new MultiTestClient wrapper + multi_test_client = MultiTestClient( + pre_hash=pre_hash, + client_type=client_type, + pre_alloc_group=pre_alloc_group, + ) + + # Track the MultiTestClient + self.multi_test_clients[pre_hash] = multi_test_client + + logger.info( + f"Created new MultiTestClient wrapper for pre-allocation group {pre_hash} " + f"(total tracked clients: {len(self.multi_test_clients)})" + ) + + return multi_test_client + + def get_client_for_test(self, pre_hash: str) -> Optional[Client]: + """ + Get the actual client instance for a test with the given preHash. 
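+
+        As a side effect, increments the group's test count when a running
+        client is found.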
+ + Args: + pre_hash: The hash identifying the pre-allocation group + + Returns: + The client instance if available, None otherwise + + """ + if pre_hash in self.multi_test_clients: + multi_test_client = self.multi_test_clients[pre_hash] + if multi_test_client.is_running: + multi_test_client.increment_test_count() + return multi_test_client.client + return None + + def stop_all_clients(self) -> None: + """Mark all multi-test clients as stopped.""" + logger.info(f"Marking all {len(self.multi_test_clients)} multi-test clients as stopped") + + for pre_hash, multi_test_client in list(self.multi_test_clients.items()): + try: + multi_test_client.stop() + except Exception as e: + logger.error( + f"Error stopping MultiTestClient for pre-allocation group {pre_hash}: {e}" + ) + finally: + del self.multi_test_clients[pre_hash] + + logger.info("All MultiTestClient wrappers cleared") + + def get_client_count(self) -> int: + """Get the number of tracked multi-test clients.""" + return len(self.multi_test_clients) + + def get_test_counts(self) -> Dict[str, int]: + """Get test counts for each multi-test client.""" + return { + pre_hash: client.test_count for pre_hash, client in self.multi_test_clients.items() + } + + def reset(self) -> None: + """Reset the manager, clearing all state.""" + self.stop_all_clients() + self.multi_test_clients.clear() + self.pre_alloc_path = None + logger.info("MultiTestClientManager reset") diff --git a/src/pytest_plugins/consume/simulators/multi_test_client.py b/src/pytest_plugins/consume/simulators/multi_test_client.py new file mode 100644 index 00000000000..b12bcc0b699 --- /dev/null +++ b/src/pytest_plugins/consume/simulators/multi_test_client.py @@ -0,0 +1,160 @@ +"""Common pytest fixtures for simulators with multi-test client architecture.""" + +import io +import json +import logging +from typing import Dict, Generator, Mapping, cast + +import pytest +from hive.client import Client, ClientType +from hive.testing import HiveTestSuite + +from ethereum_test_base_types import to_json +from ethereum_test_fixtures import BlockchainEngineXFixture +from ethereum_test_fixtures.blockchain import FixtureHeader +from ethereum_test_fixtures.pre_alloc_groups import PreAllocGroup +from pytest_plugins.consume.consume import FixturesSource +from pytest_plugins.consume.simulators.helpers.ruleset import ( + ruleset, # TODO: generate dynamically +) +from pytest_plugins.filler.fixture_output import FixtureOutput + +from .helpers.client_wrapper import MultiTestClientManager +from .helpers.timing import TimingData + +logger = logging.getLogger(__name__) + + +@pytest.fixture(scope="session") +def pre_alloc_group_cache() -> Dict[str, PreAllocGroup]: + """Cache for pre-allocation groups to avoid reloading from disk.""" + return {} + + +@pytest.fixture(scope="function") +def pre_alloc_group( + fixture: BlockchainEngineXFixture, + fixtures_source: FixturesSource, + pre_alloc_group_cache: Dict[str, PreAllocGroup], +) -> PreAllocGroup: + """Load the pre-allocation group for the current test case.""" + pre_hash = fixture.pre_hash + + # Check cache first + if pre_hash in pre_alloc_group_cache: + return pre_alloc_group_cache[pre_hash] + + # Load from disk + if fixtures_source.is_stdin: + raise ValueError("Pre-allocation groups require file-based fixture input.") + + # Look for pre-allocation group file using FixtureOutput path structure + fixture_output = FixtureOutput(output_path=fixtures_source.path) + pre_alloc_path = fixture_output.pre_alloc_groups_folder_path / f"{pre_hash}.json" + if not 
pre_alloc_path.exists(): + raise FileNotFoundError(f"Pre-allocation group file not found: {pre_alloc_path}") + + # Load and cache + with open(pre_alloc_path) as f: + pre_alloc_group_obj = PreAllocGroup.model_validate_json(f.read()) + + pre_alloc_group_cache[pre_hash] = pre_alloc_group_obj + return pre_alloc_group_obj + + +def create_environment(pre_alloc_group: PreAllocGroup, check_live_port: int) -> dict: + """Define environment using PreAllocGroup data.""" + fork = pre_alloc_group.fork + assert fork in ruleset, f"fork '{fork}' missing in hive ruleset" + return { + "HIVE_CHAIN_ID": "1", # TODO: Environment doesn't have chain_id - see work_in_progress.md + "HIVE_FORK_DAO_VOTE": "1", + "HIVE_NODETYPE": "full", + "HIVE_CHECK_LIVE_PORT": str(check_live_port), + **{k: f"{v:d}" for k, v in ruleset[fork].items()}, + } + + +def client_files(pre_alloc_group: PreAllocGroup) -> Mapping[str, io.BufferedReader]: + """Define the files that hive will start the client with.""" + genesis = to_json(pre_alloc_group.genesis) # type: ignore + alloc = to_json(pre_alloc_group.pre) + + # NOTE: nethermind requires account keys without '0x' prefix + genesis["alloc"] = {k.replace("0x", ""): v for k, v in alloc.items()} + + genesis_json = json.dumps(genesis) + genesis_bytes = genesis_json.encode("utf-8") + buffered_genesis = io.BufferedReader(cast(io.RawIOBase, io.BytesIO(genesis_bytes))) + + files = {} + files["/genesis.json"] = buffered_genesis + return files + + +@pytest.fixture(scope="session") +def multi_test_client_manager() -> Generator[MultiTestClientManager, None, None]: + """Provide singleton MultiTestClientManager with session cleanup.""" + manager = MultiTestClientManager() + try: + yield manager + finally: + logger.info("Cleaning up multi-test clients at session end...") + manager.stop_all_clients() + + +@pytest.fixture(scope="function") +def genesis_header(pre_alloc_group: PreAllocGroup) -> FixtureHeader: + """Provide the genesis header from the pre-allocation group.""" + return pre_alloc_group.genesis # type: ignore + + +@pytest.fixture(scope="function") +def client( + test_suite: HiveTestSuite, + client_type: ClientType, + total_timing_data: TimingData, + fixture: BlockchainEngineXFixture, + pre_alloc_group: PreAllocGroup, + multi_test_client_manager: MultiTestClientManager, + fixtures_source: FixturesSource, +) -> Generator[Client, None, None]: + """Initialize or reuse multi-test client for the test group.""" + logger.info("🔥 MULTI-TEST CLIENT FIXTURE CALLED - Using multi-test client architecture!") + pre_hash = fixture.pre_hash + + # Set pre-alloc path in manager if not already set + if multi_test_client_manager.pre_alloc_path is None: + fixture_output = FixtureOutput(output_path=fixtures_source.path) + multi_test_client_manager.set_pre_alloc_path(fixture_output.pre_alloc_groups_folder_path) + + # Check for existing client + existing_client = multi_test_client_manager.get_client_for_test(pre_hash) + if existing_client is not None: + logger.info(f"Reusing multi-test client for pre-allocation group {pre_hash}") + yield existing_client + return + + # Start new multi-test client + logger.info(f"Starting multi-test client for pre-allocation group {pre_hash}") + + with total_timing_data.time("Start multi-test client"): + hive_client = test_suite.start_client( + client_type=client_type, + environment=create_environment(pre_alloc_group, 8551), + files=client_files(pre_alloc_group), + ) + + assert hive_client is not None, ( + f"Failed to start multi-test client for pre-allocation group {pre_hash}" + ) + + # 
Register with manager + multi_test_client = multi_test_client_manager.get_or_create_multi_test_client( + pre_hash=pre_hash, + client_type=client_type, + ) + multi_test_client.set_client(hive_client) + + logger.info(f"Multi-test client ready for pre-allocation group {pre_hash}") + yield hive_client diff --git a/src/pytest_plugins/consume/simulators/rlp/__init__.py b/src/pytest_plugins/consume/simulators/rlp/__init__.py index a76490fc945..f157db34172 100644 --- a/src/pytest_plugins/consume/simulators/rlp/__init__.py +++ b/src/pytest_plugins/consume/simulators/rlp/__init__.py @@ -1 +1 @@ -"""Consume RLP test functions.""" +"""The consume rlp simulator.""" diff --git a/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py b/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py index 26e95719c56..82b1f7d89b6 100644 --- a/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py +++ b/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py @@ -7,8 +7,11 @@ import time +import pytest + from ethereum_test_exceptions import UndefinedException -from ethereum_test_fixtures import BlockchainEngineFixture +from ethereum_test_fixtures import BlockchainEngineFixture, BlockchainEngineXFixture +from ethereum_test_fixtures.blockchain import FixtureHeader from ethereum_test_rpc import EngineRPC, EthRPC from ethereum_test_rpc.types import ForkchoiceState, JSONRPCError, PayloadStatusEnum @@ -28,27 +31,29 @@ def __init__(self, *args: object) -> None: logger.fail(str(self)) +@pytest.mark.usefixtures("hive_test") def test_blockchain_via_engine( timing_data: TimingData, eth_rpc: EthRPC, engine_rpc: EngineRPC, - fixture: BlockchainEngineFixture, + fixture: BlockchainEngineFixture | BlockchainEngineXFixture, + genesis_header: FixtureHeader, strict_exception_matching: bool, ): """ - 1. Check the client genesis block hash matches `fixture.genesis.block_hash`. + 1. Check the client genesis block hash matches `genesis.block_hash`. 2. Execute the test case fixture blocks against the client under test using the `engine_newPayloadVX` method from the Engine API. 3. For valid payloads a forkchoice update is performed to finalize the chain. """ - # Send a initial forkchoice update + # Send an initial forkchoice update with timing_data.time("Initial forkchoice update"): logger.info("Sending initial forkchoice update to genesis block...") delay = 0.5 for attempt in range(3): forkchoice_response = engine_rpc.forkchoice_updated( forkchoice_state=ForkchoiceState( - head_block_hash=fixture.genesis.block_hash, + head_block_hash=genesis_header.block_hash, ), payload_attributes=None, version=fixture.payloads[0].forkchoice_updated_version, @@ -72,14 +77,14 @@ def test_blockchain_via_engine( with timing_data.time("Get genesis block"): logger.info("Calling getBlockByNumber to get genesis block...") - genesis_block = eth_rpc.get_block_by_number(0) - if genesis_block["hash"] != str(fixture.genesis.block_hash): - expected = fixture.genesis.block_hash - got = genesis_block["hash"] + client_genesis_response = eth_rpc.get_block_by_number(0) + if client_genesis_response["hash"] != str(genesis_header.block_hash): + expected = genesis_header.block_hash + got = client_genesis_response["hash"] logger.fail(f"Genesis block hash mismatch. 
Expected: {expected}, Got: {got}") raise GenesisBlockMismatchExceptionError( - expected_header=fixture.genesis, - got_genesis_block=genesis_block, + expected_header=genesis_header, + got_genesis_block=client_genesis_response, ) with timing_data.time("Payloads execution") as total_payload_timing: diff --git a/src/pytest_plugins/consume/simulators/single_test_client.py b/src/pytest_plugins/consume/simulators/single_test_client.py index 6836c94a6fe..e7967c33248 100644 --- a/src/pytest_plugins/consume/simulators/single_test_client.py +++ b/src/pytest_plugins/consume/simulators/single_test_client.py @@ -57,7 +57,7 @@ def buffered_genesis(client_genesis: dict) -> io.BufferedReader: @pytest.fixture(scope="function") def genesis_header(fixture: BlockchainFixtureCommon) -> FixtureHeader: - """Provide the genesis header from the shared pre-state group.""" + """Provide the genesis header from the pre-allocation group.""" return fixture.genesis # type: ignore diff --git a/whitelist.txt b/whitelist.txt index 777f990dd82..9a74204dffc 100644 --- a/whitelist.txt +++ b/whitelist.txt @@ -1057,7 +1057,8 @@ Typecheck autoformat Typechecking groupstats -SharedPreStateGroup +enginex +loadgroup qube aspell codespell From d9bd4ee664ea21ab0fa54c4b6d8062c9b084adda Mon Sep 17 00:00:00 2001 From: danceratopz Date: Wed, 18 Jun 2025 08:16:27 +0200 Subject: [PATCH 2/5] feat(consume): track tests by pre-alloc group for client cleanup --- .../consume/simulators/enginex/conftest.py | 53 ++++++ .../simulators/helpers/client_wrapper.py | 88 ++++++++- .../simulators/helpers/test_tracker.py | 177 ++++++++++++++++++ .../consume/simulators/multi_test_client.py | 21 ++- 4 files changed, 332 insertions(+), 7 deletions(-) create mode 100644 src/pytest_plugins/consume/simulators/helpers/test_tracker.py diff --git a/src/pytest_plugins/consume/simulators/enginex/conftest.py b/src/pytest_plugins/consume/simulators/enginex/conftest.py index 058415e422f..75b3767f4b9 100644 --- a/src/pytest_plugins/consume/simulators/enginex/conftest.py +++ b/src/pytest_plugins/consume/simulators/enginex/conftest.py @@ -4,6 +4,8 @@ Configures the hive back-end & EL clients for test execution with BlockchainEngineXFixtures. """ +import logging + import pytest from hive.client import Client @@ -11,6 +13,8 @@ from ethereum_test_fixtures import BlockchainEngineXFixture from ethereum_test_rpc import EngineRPC +logger = logging.getLogger(__name__) + pytest_plugins = ( "pytest_plugins.pytest_hive.pytest_hive", "pytest_plugins.consume.simulators.base", @@ -18,6 +22,7 @@ "pytest_plugins.consume.simulators.test_case_description", "pytest_plugins.consume.simulators.timing_data", "pytest_plugins.consume.simulators.exceptions", + "pytest_plugins.consume.simulators.helpers.test_tracker", ) @@ -41,6 +46,54 @@ def test_suite_description() -> str: ) +def pytest_collection_modifyitems(session, config, items): + """ + Build pre-allocation group test counts during collection phase. + + This hook analyzes all collected test items to determine how many tests + belong to each pre-allocation group, enabling automatic client cleanup + when all tests in a group are complete. 
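+
+    Counts are keyed by each test case's `pre_hash`; they are either pushed
+    into the session's test tracker directly or stashed on the session for
+    the tracker fixture to pick up later.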
+ """ + # Only process items for enginex simulator + if not hasattr(config, "_supported_fixture_formats"): + return + + if BlockchainEngineXFixture.format_name not in config._supported_fixture_formats: + return + + # Get the test tracker from session if available + test_tracker = getattr(session, "_pre_alloc_group_test_tracker", None) + if test_tracker is None: + # Tracker will be created later by the fixture, store counts on session for now + group_counts = {} + for item in items: + if hasattr(item, "callspec") and "test_case" in item.callspec.params: + test_case = item.callspec.params["test_case"] + if hasattr(test_case, "pre_hash"): + pre_hash = test_case.pre_hash + group_counts[pre_hash] = group_counts.get(pre_hash, 0) + 1 + + # Store on session for later retrieval by test_tracker fixture + session._pre_alloc_group_counts = group_counts + logger.info( + f"Collected {len(group_counts)} pre-allocation groups with tests: {dict(group_counts)}" + ) + else: + # Update tracker directly if it exists + group_counts = {} + for item in items: + if hasattr(item, "callspec") and "test_case" in item.callspec.params: + test_case = item.callspec.params["test_case"] + if hasattr(test_case, "pre_hash"): + pre_hash = test_case.pre_hash + group_counts[pre_hash] = group_counts.get(pre_hash, 0) + 1 + + for pre_hash, count in group_counts.items(): + test_tracker.set_group_test_count(pre_hash, count) + + logger.info(f"Updated test tracker with {len(group_counts)} pre-allocation groups") + + @pytest.fixture(scope="function") def engine_rpc(client: Client, client_exception_mapper: ExceptionMapper | None) -> EngineRPC: """Initialize engine RPC client for the execution client under test.""" diff --git a/src/pytest_plugins/consume/simulators/helpers/client_wrapper.py b/src/pytest_plugins/consume/simulators/helpers/client_wrapper.py index c82c6fd3ed3..74cdf4afc79 100644 --- a/src/pytest_plugins/consume/simulators/helpers/client_wrapper.py +++ b/src/pytest_plugins/consume/simulators/helpers/client_wrapper.py @@ -15,6 +15,8 @@ from ethereum_test_forks import Fork from pytest_plugins.consume.simulators.helpers.ruleset import ruleset +from .test_tracker import PreAllocGroupTestTracker + logger = logging.getLogger(__name__) @@ -248,12 +250,22 @@ def set_client(self, client: Client) -> None: ) def stop(self) -> None: - """Override to log with pre_hash information.""" + """Override to log with pre_hash information and actually stop the client.""" if self._is_started: logger.info( - f"Marking multi-test client ({self.client_type.name}) for pre-allocation group " - f"{self.pre_hash} as stopped after {self.test_count} tests" + f"Stopping multi-test client ({self.client_type.name}) for pre-allocation group " + f"{self.pre_hash} after {self.test_count} tests" ) + # Actually stop the Hive client + if self.client is not None: + try: + self.client.stop() + logger.debug(f"Hive client stopped for pre-allocation group {self.pre_hash}") + except Exception as e: + logger.error( + f"Error stopping Hive client for pre-allocation group {self.pre_hash}: {e}" + ) + self.client = None self._is_started = False @@ -283,6 +295,7 @@ def __init__(self) -> None: self.multi_test_clients: Dict[str, MultiTestClient] = {} self.pre_alloc_path: Optional[Path] = None + self.test_tracker: Optional["PreAllocGroupTestTracker"] = None self._initialized = True logger.info("MultiTestClientManager initialized") @@ -297,6 +310,17 @@ def set_pre_alloc_path(self, path: Path) -> None: self.pre_alloc_path = path logger.debug(f"Pre-alloc path set to: {path}") + def 
set_test_tracker(self, test_tracker: "PreAllocGroupTestTracker") -> None: + """ + Set the test tracker for automatic client cleanup. + + Args: + test_tracker: The test tracker instance + + """ + self.test_tracker = test_tracker + logger.debug("Test tracker set for automatic client cleanup") + def load_pre_alloc_group(self, pre_hash: str) -> PreAllocGroup: """ Load the pre-allocation group for a given preHash. @@ -373,12 +397,15 @@ def get_or_create_multi_test_client( return multi_test_client - def get_client_for_test(self, pre_hash: str) -> Optional[Client]: + def get_client_for_test( + self, pre_hash: str, test_id: Optional[str] = None + ) -> Optional[Client]: """ Get the actual client instance for a test with the given preHash. Args: pre_hash: The hash identifying the pre-allocation group + test_id: Optional test ID for completion tracking Returns: The client instance if available, None otherwise @@ -391,6 +418,58 @@ def get_client_for_test(self, pre_hash: str) -> Optional[Client]: return multi_test_client.client return None + def mark_test_completed(self, pre_hash: str, test_id: str) -> None: + """ + Mark a test as completed and trigger automatic client cleanup if appropriate. + + Args: + pre_hash: The hash identifying the pre-allocation group + test_id: The unique test identifier + + """ + if self.test_tracker is None: + logger.debug("No test tracker available, skipping completion tracking") + return + + # Mark test as completed in tracker + is_group_complete = self.test_tracker.mark_test_completed(pre_hash, test_id) + + if is_group_complete: + # All tests in this pre-allocation group are complete + self._auto_stop_client_if_complete(pre_hash) + + def _auto_stop_client_if_complete(self, pre_hash: str) -> None: + """ + Automatically stop the client for a pre-allocation group if all tests are complete. 
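+
+        Stopping here releases the hive client as soon as its group finishes,
+        rather than waiting for session teardown.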
+ + Args: + pre_hash: The hash identifying the pre-allocation group + + """ + if pre_hash not in self.multi_test_clients: + logger.debug(f"No client found for pre-allocation group {pre_hash}") + return + + multi_test_client = self.multi_test_clients[pre_hash] + if not multi_test_client.is_running: + logger.debug(f"Client for pre-allocation group {pre_hash} is already stopped") + return + + # Stop the client and remove from tracking + logger.info( + f"Auto-stopping client for pre-allocation group {pre_hash} - " + f"all tests completed ({multi_test_client.test_count} tests executed)" + ) + + try: + multi_test_client.stop() + except Exception as e: + logger.error(f"Error auto-stopping client for pre-allocation group {pre_hash}: {e}") + finally: + # Remove from tracking to free memory + del self.multi_test_clients[pre_hash] + logger.debug(f"Removed completed client from tracking: {pre_hash}") + def stop_all_clients(self) -> None: """Mark all multi-test clients as stopped.""" logger.info(f"Marking all {len(self.multi_test_clients)} multi-test clients as stopped") @@ -422,4 +501,5 @@ def reset(self) -> None: self.stop_all_clients() self.multi_test_clients.clear() self.pre_alloc_path = None + self.test_tracker = None logger.info("MultiTestClientManager reset") diff --git a/src/pytest_plugins/consume/simulators/helpers/test_tracker.py b/src/pytest_plugins/consume/simulators/helpers/test_tracker.py new file mode 100644 index 00000000000..2498584c81e --- /dev/null +++ b/src/pytest_plugins/consume/simulators/helpers/test_tracker.py @@ -0,0 +1,177 @@ +"""Test tracking utilities for pre-allocation group lifecycle management.""" + +import logging +from dataclasses import dataclass, field +from typing import Dict, Set + +import pytest + +logger = logging.getLogger(__name__) + + +@dataclass +class PreAllocGroupTestTracker: + """ + Tracks test execution progress per pre-allocation group. + + This class enables automatic client cleanup by monitoring when all tests + in a pre-allocation group have completed execution. + """ + + group_test_counts: Dict[str, int] = field(default_factory=dict) + """Total number of tests per pre-allocation group (pre_hash -> count).""" + + group_completed_tests: Dict[str, Set[str]] = field(default_factory=dict) + """Completed test IDs per pre-allocation group (pre_hash -> {test_ids}).""" + + def set_group_test_count(self, pre_hash: str, total_tests: int) -> None: + """ + Set the total number of tests for a pre-allocation group. + + Args: + pre_hash: The pre-allocation group hash + total_tests: Total number of tests in this group + + """ + if pre_hash in self.group_test_counts: + existing_count = self.group_test_counts[pre_hash] + if existing_count != total_tests: + logger.warning( + f"Pre-allocation group {pre_hash} test count mismatch: " + f"existing={existing_count}, new={total_tests}" + ) + + self.group_test_counts[pre_hash] = total_tests + if pre_hash not in self.group_completed_tests: + self.group_completed_tests[pre_hash] = set() + + logger.debug(f"Set test count for pre-allocation group {pre_hash}: {total_tests}") + + def mark_test_completed(self, pre_hash: str, test_id: str) -> bool: + """ + Mark a test as completed for the given pre-allocation group. 
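+
+        Completed test IDs are stored per group in a set, so marking the same
+        test twice is idempotent.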
+ + Args: + pre_hash: The pre-allocation group hash + test_id: The unique test identifier + + Returns: + True if all tests in the pre-allocation group are now complete + + """ + if pre_hash not in self.group_completed_tests: + self.group_completed_tests[pre_hash] = set() + + # Avoid double-counting the same test + if test_id in self.group_completed_tests[pre_hash]: + logger.debug(f"Test {test_id} already marked as completed for group {pre_hash}") + return self.is_group_complete(pre_hash) + + self.group_completed_tests[pre_hash].add(test_id) + completed_count = len(self.group_completed_tests[pre_hash]) + total_count = self.group_test_counts.get(pre_hash, 0) + + logger.debug( + f"Test {test_id} completed for pre-allocation group {pre_hash} " + f"({completed_count}/{total_count})" + ) + + is_complete = self.is_group_complete(pre_hash) + if is_complete: + logger.info( + f"All tests completed for pre-allocation group {pre_hash} " + f"({completed_count}/{total_count}) - ready for client cleanup" + ) + + return is_complete + + def is_group_complete(self, pre_hash: str) -> bool: + """ + Check if all tests in a pre-allocation group have completed. + + Args: + pre_hash: The pre-allocation group hash + + Returns: + True if all tests in the group are complete + + """ + if pre_hash not in self.group_test_counts: + logger.warning(f"No test count found for pre-allocation group {pre_hash}") + return False + + total_count = self.group_test_counts[pre_hash] + completed_count = len(self.group_completed_tests.get(pre_hash, set())) + + return completed_count >= total_count + + def get_completion_status(self, pre_hash: str) -> tuple[int, int]: + """ + Get completion status for a pre-allocation group. + + Args: + pre_hash: The pre-allocation group hash + + Returns: + Tuple of (completed_count, total_count) + + """ + total_count = self.group_test_counts.get(pre_hash, 0) + completed_count = len(self.group_completed_tests.get(pre_hash, set())) + return completed_count, total_count + + def get_all_completion_status(self) -> Dict[str, tuple[int, int]]: + """ + Get completion status for all tracked pre-allocation groups. + + Returns: + Dict mapping pre_hash to (completed_count, total_count) + + """ + return { + pre_hash: self.get_completion_status(pre_hash) for pre_hash in self.group_test_counts + } + + def reset_group(self, pre_hash: str) -> None: + """ + Reset tracking data for a specific pre-allocation group. + + Args: + pre_hash: The pre-allocation group hash to reset + + """ + if pre_hash in self.group_test_counts: + del self.group_test_counts[pre_hash] + if pre_hash in self.group_completed_tests: + del self.group_completed_tests[pre_hash] + logger.debug(f"Reset tracking data for pre-allocation group {pre_hash}") + + def reset_all(self) -> None: + """Reset all tracking data.""" + self.group_test_counts.clear() + self.group_completed_tests.clear() + logger.debug("Reset all test tracking data") + + +@pytest.fixture(scope="session") +def pre_alloc_group_test_tracker(request) -> PreAllocGroupTestTracker: + """ + Session-scoped test tracker for pre-allocation group lifecycle management. + + This fixture provides a centralized way to track test completion across + all pre-allocation groups during a pytest session. 
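+
+    The tracker is attached to the session so collection hooks can update it
+    directly; counts gathered before the fixture ran are loaded from
+    `session._pre_alloc_group_counts`.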
+ """ + tracker = PreAllocGroupTestTracker() + + # Store tracker on session for access by collection hooks + request.session._pre_alloc_group_test_tracker = tracker + + # Load pre-collected group counts if available + if hasattr(request.session, "_pre_alloc_group_counts"): + group_counts = request.session._pre_alloc_group_counts + for pre_hash, count in group_counts.items(): + tracker.set_group_test_count(pre_hash, count) + logger.info(f"Loaded test counts for {len(group_counts)} pre-allocation groups") + + logger.info("Pre-allocation group test tracker initialized") + return tracker diff --git a/src/pytest_plugins/consume/simulators/multi_test_client.py b/src/pytest_plugins/consume/simulators/multi_test_client.py index b12bcc0b699..0cd9fabf92f 100644 --- a/src/pytest_plugins/consume/simulators/multi_test_client.py +++ b/src/pytest_plugins/consume/simulators/multi_test_client.py @@ -118,21 +118,32 @@ def client( pre_alloc_group: PreAllocGroup, multi_test_client_manager: MultiTestClientManager, fixtures_source: FixturesSource, + pre_alloc_group_test_tracker, + request, ) -> Generator[Client, None, None]: """Initialize or reuse multi-test client for the test group.""" logger.info("🔥 MULTI-TEST CLIENT FIXTURE CALLED - Using multi-test client architecture!") pre_hash = fixture.pre_hash + test_id = request.node.nodeid # Set pre-alloc path in manager if not already set if multi_test_client_manager.pre_alloc_path is None: fixture_output = FixtureOutput(output_path=fixtures_source.path) multi_test_client_manager.set_pre_alloc_path(fixture_output.pre_alloc_groups_folder_path) + # Set test tracker in manager if not already set + if multi_test_client_manager.test_tracker is None: + multi_test_client_manager.set_test_tracker(pre_alloc_group_test_tracker) + # Check for existing client - existing_client = multi_test_client_manager.get_client_for_test(pre_hash) + existing_client = multi_test_client_manager.get_client_for_test(pre_hash, test_id) if existing_client is not None: logger.info(f"Reusing multi-test client for pre-allocation group {pre_hash}") - yield existing_client + try: + yield existing_client + finally: + # Mark test as completed when fixture teardown occurs + multi_test_client_manager.mark_test_completed(pre_hash, test_id) return # Start new multi-test client @@ -157,4 +168,8 @@ def client( multi_test_client.set_client(hive_client) logger.info(f"Multi-test client ready for pre-allocation group {pre_hash}") - yield hive_client + try: + yield hive_client + finally: + # Mark test as completed when fixture teardown occurs + multi_test_client_manager.mark_test_completed(pre_hash, test_id) From 50a37e6b4c220104c1bcd8919b7686db91a18c06 Mon Sep 17 00:00:00 2001 From: danceratopz Date: Wed, 18 Jun 2025 08:27:07 +0200 Subject: [PATCH 3/5] feat(consume): add `--enginex-fcu-frequency` option --- .../consume/simulators/enginex/conftest.py | 41 +++++++- .../simulators/helpers/test_tracker.py | 99 +++++++++++++++++++ .../simulator_logic/test_via_engine.py | 42 +++++++- 3 files changed, 178 insertions(+), 4 deletions(-) diff --git a/src/pytest_plugins/consume/simulators/enginex/conftest.py b/src/pytest_plugins/consume/simulators/enginex/conftest.py index 75b3767f4b9..79408c76143 100644 --- a/src/pytest_plugins/consume/simulators/enginex/conftest.py +++ b/src/pytest_plugins/consume/simulators/enginex/conftest.py @@ -26,10 +26,29 @@ ) +def pytest_addoption(parser): + """Add enginex-specific command line options.""" + enginex_group = parser.getgroup("enginex", "EngineX simulator options") + 
enginex_group.addoption( + "--enginex-fcu-frequency", + action="store", + type=int, + default=0, + help=( + "Control forkchoice update frequency for enginex simulator. " + "0=disable FCUs (default), 1=FCU every test, N=FCU every Nth test per " + "pre-allocation group." + ), + ) + + def pytest_configure(config): - """Set the supported fixture formats for the engine simulator.""" + """Set the supported fixture formats and store enginex configuration.""" config._supported_fixture_formats = [BlockchainEngineXFixture.format_name] + # Store FCU frequency on config for access by fixtures + config.enginex_fcu_frequency = config.getoption("--enginex-fcu-frequency", 1) + @pytest.fixture(scope="module") def test_suite_name() -> str: @@ -105,3 +124,23 @@ def engine_rpc(client: Client, client_exception_mapper: ExceptionMapper | None) }, ) return EngineRPC(f"http://{client.ip}:8551") + + +@pytest.fixture(scope="session") +def fcu_frequency_tracker(request): + """ + Session-scoped FCU frequency tracker for enginex simulator. + + This fixture is imported from test_tracker module and configured + with the --enginex-fcu-frequency command line option. + """ + # Import here to avoid circular imports + from ..helpers.test_tracker import FCUFrequencyTracker + + # Get FCU frequency from pytest config (set by command line argument) + fcu_frequency = getattr(request.config, "enginex_fcu_frequency", 1) + + tracker = FCUFrequencyTracker(fcu_frequency=fcu_frequency) + logger.info(f"FCU frequency tracker initialized with frequency: {fcu_frequency}") + + return tracker diff --git a/src/pytest_plugins/consume/simulators/helpers/test_tracker.py b/src/pytest_plugins/consume/simulators/helpers/test_tracker.py index 2498584c81e..778d7c1b2aa 100644 --- a/src/pytest_plugins/consume/simulators/helpers/test_tracker.py +++ b/src/pytest_plugins/consume/simulators/helpers/test_tracker.py @@ -175,3 +175,102 @@ def pre_alloc_group_test_tracker(request) -> PreAllocGroupTestTracker: logger.info("Pre-allocation group test tracker initialized") return tracker + + +@dataclass +class FCUFrequencyTracker: + """ + Tracks forkchoice update frequency per pre-allocation group. + + This class enables controlling how often forkchoice updates are performed + during test execution on a per-pre-allocation-group basis. + """ + + fcu_frequency: int + """Frequency of FCU operations (0=disabled, 1=every test, N=every Nth test).""" + + group_test_counters: Dict[str, int] = field(default_factory=dict) + """Test counters per pre-allocation group (pre_hash -> count).""" + + def should_perform_fcu(self, pre_hash: str) -> bool: + """ + Check if forkchoice update should be performed for this test. + + Args: + pre_hash: The pre-allocation group hash + + Returns: + True if FCU should be performed for this test + + """ + if self.fcu_frequency == 0: + logger.debug(f"FCU disabled for pre-allocation group {pre_hash} (frequency=0)") + return False + + current_count = self.group_test_counters.get(pre_hash, 0) + should_perform = (current_count % self.fcu_frequency) == 0 + + logger.debug( + f"FCU decision for pre-allocation group {pre_hash}: " + f"perform={should_perform} (test_count={current_count}, " + f"frequency={self.fcu_frequency})" + ) + + return should_perform + + def increment_test_count(self, pre_hash: str) -> None: + """ + Increment test counter for pre-allocation group. 
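+
+        Called once per executed test, after the FCU decision for that test
+        has already been taken.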
+ + Args: + pre_hash: The pre-allocation group hash + + """ + current_count = self.group_test_counters.get(pre_hash, 0) + new_count = current_count + 1 + self.group_test_counters[pre_hash] = new_count + + logger.debug( + f"Incremented test count for pre-allocation group {pre_hash}: " + f"{current_count} -> {new_count}" + ) + + def get_test_count(self, pre_hash: str) -> int: + """ + Get current test count for pre-allocation group. + + Args: + pre_hash: The pre-allocation group hash + + Returns: + Current test count for the group + + """ + return self.group_test_counters.get(pre_hash, 0) + + def get_all_test_counts(self) -> Dict[str, int]: + """ + Get test counts for all tracked pre-allocation groups. + + Returns: + Dict mapping pre_hash to test count + + """ + return dict(self.group_test_counters) + + def reset_group(self, pre_hash: str) -> None: + """ + Reset test counter for a specific pre-allocation group. + + Args: + pre_hash: The pre-allocation group hash to reset + + """ + if pre_hash in self.group_test_counters: + del self.group_test_counters[pre_hash] + logger.debug(f"Reset test counter for pre-allocation group {pre_hash}") + + def reset_all(self) -> None: + """Reset all test counters.""" + self.group_test_counters.clear() + logger.debug("Reset all FCU frequency test counters") diff --git a/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py b/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py index 82b1f7d89b6..e580cc81dfc 100644 --- a/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py +++ b/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py @@ -39,14 +39,36 @@ def test_blockchain_via_engine( fixture: BlockchainEngineFixture | BlockchainEngineXFixture, genesis_header: FixtureHeader, strict_exception_matching: bool, + fcu_frequency_tracker=None, # Optional for enginex simulator + request=None, # For accessing test info ): """ 1. Check the client genesis block hash matches `genesis.block_hash`. 2. Execute the test case fixture blocks against the client under test using the `engine_newPayloadVX` method from the Engine API. - 3. For valid payloads a forkchoice update is performed to finalize the chain. + 3. For valid payloads a forkchoice update is performed to finalize the chain + (controlled by FCU frequency for enginex simulator). 
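+
+    With `--enginex-fcu-frequency=N`, the decision uses the pre-increment
+    counter (`test_count % N == 0`), so the 1st, (N+1)-th, (2N+1)-th, ... test
+    executed in each pre-allocation group performs forkchoice updates.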
""" - # Send an initial forkchoice update + # Determine if we should perform forkchoice updates based on frequency tracker + should_perform_fcus = True # Default behavior for engine simulator + pre_hash = None + + if fcu_frequency_tracker is not None and hasattr(fixture, "pre_hash"): + # EngineX simulator with forkchoice update frequency control + pre_hash = fixture.pre_hash + should_perform_fcus = fcu_frequency_tracker.should_perform_fcu(pre_hash) + + logger.info( + f"Forkchoice update frequency check for pre-allocation group {pre_hash}: " + f"perform_fcu={should_perform_fcus} " + f"(frequency={fcu_frequency_tracker.fcu_frequency}, " + f"test_count={fcu_frequency_tracker.get_test_count(pre_hash)})" + ) + + # Always increment the test counter at the start for proper tracking + if fcu_frequency_tracker is not None and pre_hash is not None: + fcu_frequency_tracker.increment_test_count(pre_hash) + # Send a initial forkchoice update with timing_data.time("Initial forkchoice update"): logger.info("Sending initial forkchoice update to genesis block...") delay = 0.5 @@ -155,7 +177,7 @@ def test_blockchain_via_engine( f"Unexpected error code: {e.code}, expected: {payload.error_code}" ) from e - if payload.valid(): + if payload.valid() and should_perform_fcus: with payload_timing.time( f"engine_forkchoiceUpdatedV{payload.forkchoice_updated_version}" ): @@ -176,4 +198,18 @@ def test_blockchain_via_engine( f"unexpected status: want {PayloadStatusEnum.VALID}," f" got {forkchoice_response.payload_status.status}" ) + elif payload.valid() and not should_perform_fcus: + logger.info( + f"Skipping forkchoice update for payload {i + 1} due to frequency setting " + f"(pre-allocation group: {pre_hash})" + ) logger.info("All payloads processed successfully.") + + # Log final FCU frequency statistics for enginex simulator + if fcu_frequency_tracker is not None and pre_hash is not None: + final_count = fcu_frequency_tracker.get_test_count(pre_hash) + logger.info( + f"Test completed for pre-allocation group {pre_hash}. " + f"Total tests in group: {final_count}, " + f"FCU frequency: {fcu_frequency_tracker.fcu_frequency}" + ) From 776b315768e524adf7ccb4f921421521d7a59ff6 Mon Sep 17 00:00:00 2001 From: danceratopz Date: Fri, 20 Jun 2025 09:58:58 +0200 Subject: [PATCH 4/5] feat(consume): add `--enginex-max-group-size` to load-balance xdist MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This create pre_hash subgroups w/max size for better xdist balancing. - Replace function name check with fixture format detection to properly distinguish between engine and enginex simulators. Both use the same test function name but process different fixture formats: - Engine simulator: "blockchain_test_engine" → standard parametrization - EngineX simulator: "blockchain_test_engine_x" → enhanced parametrization with xdist group splitting and load balancing - Fixes unit test failures by checking for 'test_case' parameter presence instead of maintaining an allowlist of function names. 
---
 src/pytest_plugins/consume/consume.py         | 206 ++++++++++-
 .../consume/simulators/enginex/conftest.py    |  59 +++-
 .../simulators/helpers/client_wrapper.py      | 162 ++++++---
 .../simulators/helpers/test_tracker.py        | 168 ++++-----
 .../consume/simulators/multi_test_client.py   |  25 +-
 .../simulator_logic/test_via_engine.py        | 102 +++---
 .../tests/test_group_identifier_container.py  | 319 ++++++++++++++++++
 7 files changed, 822 insertions(+), 219 deletions(-)
 create mode 100644 src/pytest_plugins/consume/tests/test_group_identifier_container.py

diff --git a/src/pytest_plugins/consume/consume.py b/src/pytest_plugins/consume/consume.py
index d070143af2a..3906fb63bfc 100644
--- a/src/pytest_plugins/consume/consume.py
+++ b/src/pytest_plugins/consume/consume.py
@@ -1,12 +1,20 @@
-"""A pytest plugin providing common functionality for consuming test fixtures."""
+"""
+A pytest plugin providing common functionality for consuming test fixtures.
+
+Features:
+- Downloads and caches test fixtures from various sources (local, URL, release).
+- Manages test case generation from fixture files.
+- Provides xdist load balancing for large pre-allocation groups (enginex simulator).
+"""
+
+import logging
 import re
 import sys
 import tarfile
 from dataclasses import dataclass
 from io import BytesIO
 from pathlib import Path
-from typing import List, Optional, Tuple
+from typing import Dict, List, Optional, Tuple
 from urllib.parse import urlparse
 
 import platformdirs
@@ -22,11 +30,118 @@
 from .releases import ReleaseTag, get_release_page_url, get_release_url, is_release_url, is_url
 
+logger = logging.getLogger(__name__)
+
 CACHED_DOWNLOADS_DIRECTORY = (
     Path(platformdirs.user_cache_dir("ethereum-execution-spec-tests")) / "cached_downloads"
 )
 
 
+class XDistGroupMapper:
+    """
+    Maps test cases to xdist groups, splitting large pre-allocation groups into sub-groups.
+
+    This class helps improve load balancing when using pytest-xdist with --dist=loadgroup
+    by breaking up large pre-allocation groups (e.g., 1000+ tests) into smaller virtual
+    sub-groups. Each sub-group is still pinned to a single worker, but tests from
+    different sub-groups of the same pre-allocation group may run on different
+    workers, each with its own client.
+    """
+
+    def __init__(self, max_group_size: int = 100):
+        """Initialize the mapper with a maximum group size."""
+        self.max_group_size = max_group_size
+        self.group_sizes: Dict[str, int] = {}
+        self.test_to_subgroup: Dict[str, int] = {}
+        self._built = False
+
+    def build_mapping(self, test_cases: TestCases) -> None:
+        """
+        Build the mapping of test cases to sub-groups.
+
+        This analyzes all test cases and determines which pre-allocation groups
+        need to be split into sub-groups based on the max_group_size.
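+
+        For example (a sketch, assuming max_group_size=100): a group of 250
+        tests is split into three sub-groups of 100, 100 and 50 tests
+        (indices 0, 1 and 2), assigned in the order the test cases are seen.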
+        """
+        if self._built:
+            return
+
+        # Count tests per pre-allocation group
+        for test_case in test_cases:
+            if hasattr(test_case, "pre_hash") and test_case.pre_hash:
+                pre_hash = test_case.pre_hash
+                self.group_sizes[pre_hash] = self.group_sizes.get(pre_hash, 0) + 1
+
+        # Assign sub-groups for large groups
+        group_counters: Dict[str, int] = {}
+        for test_case in test_cases:
+            if hasattr(test_case, "pre_hash") and test_case.pre_hash:
+                pre_hash = test_case.pre_hash
+                group_size = self.group_sizes[pre_hash]
+
+                if group_size <= self.max_group_size:
+                    # Small group, no sub-group needed
+                    self.test_to_subgroup[test_case.id] = 0
+                else:
+                    # Large group, assign to sub-groups in contiguous blocks of
+                    # max_group_size tests
+                    counter = group_counters.get(pre_hash, 0)
+                    sub_group = counter // self.max_group_size
+                    self.test_to_subgroup[test_case.id] = sub_group
+                    group_counters[pre_hash] = counter + 1
+
+        self._built = True
+
+        # Log summary of large groups
+        large_groups = [
+            (pre_hash, size)
+            for pre_hash, size in self.group_sizes.items()
+            if size > self.max_group_size
+        ]
+        if large_groups:
+            logger.info(
+                f"Found {len(large_groups)} pre-allocation groups larger than "
+                f"{self.max_group_size} tests that will be split for better load balancing"
+            )
+
+    def get_xdist_group_name(self, test_case) -> str:
+        """
+        Get the xdist group name for a test case.
+
+        For small groups, returns the pre_hash as-is.
+        For large groups, returns "{pre_hash}:{sub_group_index}".
+        """
+        if not hasattr(test_case, "pre_hash") or not test_case.pre_hash:
+            # No pre_hash, use test ID as fallback
+            return test_case.id
+
+        pre_hash = test_case.pre_hash
+        group_size = self.group_sizes.get(pre_hash, 0)
+
+        if group_size <= self.max_group_size:
+            # Small group, use pre_hash as-is
+            return pre_hash
+
+        # Large group, include sub-group index
+        sub_group = self.test_to_subgroup.get(test_case.id, 0)
+        return f"{pre_hash}:{sub_group}"
+
+    def get_split_statistics(self) -> Dict[str, Dict[str, int]]:
+        """
+        Get statistics about how groups were split.
+
+        Returns a dict with information about each pre-allocation group
+        and how many sub-groups it was split into.
+        """
+        stats = {}
+        for pre_hash, size in self.group_sizes.items():
+            if size > self.max_group_size:
+                num_subgroups = (size + self.max_group_size - 1) // self.max_group_size
+                stats[pre_hash] = {
+                    "total_tests": size,
+                    "num_subgroups": num_subgroups,
+                    "average_tests_per_subgroup": size // num_subgroups,
+                }
+        return stats
+
+
 def default_input() -> str:
     """
     Directory (default) to consume generated test fixtures from.
     Defined as a
@@ -419,6 +534,28 @@ def pytest_configure(config):  # noqa: D103
     index = IndexFile.model_validate_json(index_file.read_text())
     config.test_cases = index.test_cases
 
+    # Create XDistGroupMapper for enginex simulator if enginex options are present
+    try:
+        max_group_size = config.getoption("--enginex-max-group-size", None)
+        if max_group_size is not None:
+            config.xdist_group_mapper = XDistGroupMapper(max_group_size)
+            config.xdist_group_mapper.build_mapping(config.test_cases)
+
+            split_stats = config.xdist_group_mapper.get_split_statistics()
+            if split_stats and config.option.verbose >= 1:
+                rich.print("[bold yellow]Pre-allocation group splitting for load balancing:[/]")
+                for pre_hash, stats in split_stats.items():
+                    rich.print(
+                        f"  Group {pre_hash[:8]}: {stats['total_tests']} tests → "
+                        f"{stats['num_subgroups']} sub-groups "
+                        f"(~{stats['average_tests_per_subgroup']} tests each)"
+                    )
+                rich.print(f"  Max group size: {max_group_size}")
+        else:
+            config.xdist_group_mapper = None
+    except ValueError:
+        config.xdist_group_mapper = None
+
     for fixture_format in BaseFixture.formats.values():
         config.addinivalue_line(
             "markers",
@@ -485,29 +622,70 @@ def pytest_generate_tests(metafunc):
     """
     Generate test cases for every test fixture in all the JSON fixture files within the
     specified fixtures directory, or read from stdin if the directory is 'stdin'.
+
+    This function only applies to test functions that declare a `test_case`
+    parameter, to avoid conflicts with other consume simulators.
     """
     if "cache" in sys.argv:
         return
 
+    # Only apply to functions that have a 'test_case' parameter (consume test functions)
+    if "test_case" not in metafunc.fixturenames:
+        return
+
     test_cases = metafunc.config.test_cases
+    xdist_group_mapper = getattr(metafunc.config, "xdist_group_mapper", None)
     param_list = []
+
+    # Check if this is an enginex simulator (has enginex-specific enhancements)
+    is_enginex_function = (
+        hasattr(metafunc.config, "_supported_fixture_formats")
+        and "blockchain_test_engine_x" in metafunc.config._supported_fixture_formats
+    )
     for test_case in test_cases:
-        if test_case.format.format_name not in metafunc.config._supported_fixture_formats:
+        # Check if _supported_fixture_formats is set; if it is not, allow all formats
+        supported_formats = getattr(metafunc.config, "_supported_fixture_formats", None)
+        if supported_formats and test_case.format.format_name not in supported_formats:
             continue
+
         fork_markers = get_relative_fork_markers(test_case.fork, strict_mode=False)
 
-        # Append pre_hash (first 8 chars) to test ID for easier selection with --sim.limit
+        # Basic test ID and markers (used by all consume tests)
         test_id = test_case.id
-        if hasattr(test_case, "pre_hash") and test_case.pre_hash:
-            test_id = f"{test_case.id}[{test_case.pre_hash[:8]}]"
-
-        param = pytest.param(
-            test_case,
-            id=test_id,
-            marks=[getattr(pytest.mark, m) for m in fork_markers]
-            + [getattr(pytest.mark, test_case.format.format_name)]
-            + [pytest.mark.xdist_group(name=test_case.pre_hash)],
-        )
+        markers = [getattr(pytest.mark, m) for m in fork_markers] + [
+            getattr(pytest.mark, test_case.format.format_name)
+        ]
+
+        # Apply enginex-specific enhancements only for enginex functions
+        if is_enginex_function:
+            # Determine xdist group name for enginex load balancing
+            if xdist_group_mapper and hasattr(test_case, "pre_hash") and test_case.pre_hash:
+                # Use the mapper to get potentially split group name
+                xdist_group_name = xdist_group_mapper.get_xdist_group_name(test_case)
+            elif hasattr(test_case, "pre_hash") and
test_case.pre_hash:
+                # No mapper available, use pre_hash directly
+                xdist_group_name = test_case.pre_hash
+            else:
+                # No pre_hash, use test ID
+                xdist_group_name = test_case.id
+
+            # Create enhanced test ID showing the xdist group name for easier identification
+            if hasattr(test_case, "pre_hash") and test_case.pre_hash:
+                # Show first 8 chars of xdist group name (includes sub-group if split)
+                group_display = (
+                    xdist_group_name[:8] if len(xdist_group_name) > 8 else xdist_group_name
+                )
+                # If it's a split group (contains ':'), show that clearly
+                if ":" in xdist_group_name:
+                    # Extract sub-group number for display
+                    pre_hash_part, sub_group = xdist_group_name.split(":", 1)
+                    group_display = f"{pre_hash_part[:8]}:{sub_group}"
+                test_id = f"{test_case.id}[{group_display}]"
+
+            # Add xdist group marker for load balancing
+            markers.append(pytest.mark.xdist_group(name=xdist_group_name))
+
+        param = pytest.param(test_case, id=test_id, marks=markers)
         param_list.append(param)
 
     metafunc.parametrize("test_case", param_list)
diff --git a/src/pytest_plugins/consume/simulators/enginex/conftest.py b/src/pytest_plugins/consume/simulators/enginex/conftest.py
index 79408c76143..5d29a312d0f 100644
--- a/src/pytest_plugins/consume/simulators/enginex/conftest.py
+++ b/src/pytest_plugins/consume/simulators/enginex/conftest.py
@@ -31,6 +31,7 @@ def pytest_addoption(parser):
     enginex_group = parser.getgroup("enginex", "EngineX simulator options")
     enginex_group.addoption(
         "--enginex-fcu-frequency",
+        dest="enginex_fcu_frequency",
         action="store",
         type=int,
         default=0,
@@ -40,15 +41,23 @@ def pytest_addoption(parser):
             "pre-allocation group."
         ),
     )
+    enginex_group.addoption(
+        "--enginex-max-group-size",
+        dest="enginex_max_group_size",
+        action="store",
+        type=int,
+        default=100,
+        help=(
+            "Maximum number of tests per xdist group. Large pre-allocation groups will be "
+            "split into virtual sub-groups to improve load balancing. Default: 100."
+        ),
+    )
 
 
 def pytest_configure(config):
     """Set the supported fixture formats and store enginex configuration."""
     config._supported_fixture_formats = [BlockchainEngineXFixture.format_name]
 
-    # Store FCU frequency on config for access by fixtures
-    config.enginex_fcu_frequency = config.getoption("--enginex-fcu-frequency", 1)
-
 
 @pytest.fixture(scope="module")
 def test_suite_name() -> str:
@@ -67,11 +76,11 @@ def test_suite_description() -> str:
 
 def pytest_collection_modifyitems(session, config, items):
     """
-    Build pre-allocation group test counts during collection phase.
+    Build group test counts during collection phase.
 
     This hook analyzes all collected test items to determine how many tests
-    belong to each pre-allocation group, enabling automatic client cleanup
-    when all tests in a group are complete.
+    belong to each group (pre-allocation groups or xdist subgroups), enabling
+    automatic client cleanup when all tests in a group are complete.
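+
+    For example (hypothetical hashes), the resulting counts may look like
+    {"0xabc...": 12, "0xdef...:0": 100, "0xdef...:1": 37} when a large group
+    has been split into xdist sub-groups.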
""" # Only process items for enginex simulator if not hasattr(config, "_supported_fixture_formats"): @@ -89,14 +98,22 @@ def pytest_collection_modifyitems(session, config, items): if hasattr(item, "callspec") and "test_case" in item.callspec.params: test_case = item.callspec.params["test_case"] if hasattr(test_case, "pre_hash"): - pre_hash = test_case.pre_hash - group_counts[pre_hash] = group_counts.get(pre_hash, 0) + 1 + # Get group identifier from xdist marker if available + group_identifier = None + for marker in item.iter_markers("xdist_group"): + if hasattr(marker, "kwargs") and "name" in marker.kwargs: + group_identifier = marker.kwargs["name"] + break + + # Fallback to pre_hash if no xdist marker (sequential execution) + if group_identifier is None: + group_identifier = test_case.pre_hash + + group_counts[group_identifier] = group_counts.get(group_identifier, 0) + 1 # Store on session for later retrieval by test_tracker fixture session._pre_alloc_group_counts = group_counts - logger.info( - f"Collected {len(group_counts)} pre-allocation groups with tests: {dict(group_counts)}" - ) + logger.info(f"Collected {len(group_counts)} groups with tests: {dict(group_counts)}") else: # Update tracker directly if it exists group_counts = {} @@ -104,13 +121,23 @@ def pytest_collection_modifyitems(session, config, items): if hasattr(item, "callspec") and "test_case" in item.callspec.params: test_case = item.callspec.params["test_case"] if hasattr(test_case, "pre_hash"): - pre_hash = test_case.pre_hash - group_counts[pre_hash] = group_counts.get(pre_hash, 0) + 1 + # Get group identifier from xdist marker if available + group_identifier = None + for marker in item.iter_markers("xdist_group"): + if hasattr(marker, "kwargs") and "name" in marker.kwargs: + group_identifier = marker.kwargs["name"] + break + + # Fallback to pre_hash if no xdist marker (sequential execution) + if group_identifier is None: + group_identifier = test_case.pre_hash + + group_counts[group_identifier] = group_counts.get(group_identifier, 0) + 1 - for pre_hash, count in group_counts.items(): - test_tracker.set_group_test_count(pre_hash, count) + for group_identifier, count in group_counts.items(): + test_tracker.set_group_test_count(group_identifier, count) - logger.info(f"Updated test tracker with {len(group_counts)} pre-allocation groups") + logger.info(f"Updated test tracker with {len(group_counts)} groups") @pytest.fixture(scope="function") diff --git a/src/pytest_plugins/consume/simulators/helpers/client_wrapper.py b/src/pytest_plugins/consume/simulators/helpers/client_wrapper.py index 74cdf4afc79..ee0674379d6 100644 --- a/src/pytest_plugins/consume/simulators/helpers/client_wrapper.py +++ b/src/pytest_plugins/consume/simulators/helpers/client_wrapper.py @@ -20,6 +20,69 @@ logger = logging.getLogger(__name__) +def get_group_identifier_from_request(request, pre_hash: str) -> str: + """ + Determine the appropriate group identifier for client tracking. 
+ + For xdist execution: Uses xdist group name (includes subgroup suffix if split) + For sequential execution: Uses pre_hash directly + + Args: + request: The pytest request object containing test metadata + pre_hash: The pre-allocation group hash + + Returns: + Group identifier string to use for client tracking + + """ + # Check if this test has an xdist_group marker (indicates xdist execution) + xdist_group_marker = None + iter_markers = getattr(request.node, "iter_markers", None) + if iter_markers is None: + return pre_hash + + for marker in iter_markers("xdist_group"): + xdist_group_marker = marker + break + + if ( + xdist_group_marker + and hasattr(xdist_group_marker, "kwargs") + and "name" in xdist_group_marker.kwargs + ): + group_identifier = xdist_group_marker.kwargs["name"] + logger.debug(f"Using xdist group identifier: {group_identifier}") + return group_identifier + + # Fallback to pre_hash for sequential execution or when no xdist marker is found + logger.debug(f"Using pre_hash as group identifier: {pre_hash}") + return pre_hash + + +def extract_pre_hash_from_group_identifier(group_identifier: str) -> str: + """ + Extract the pre_hash from a group identifier. + + For xdist subgroups: Removes the subgroup suffix (e.g., "0x123:0" -> "0x123") + For sequential: Returns as-is (group_identifier == pre_hash) + + Args: + group_identifier: The group identifier string + + Returns: + The pre_hash without any subgroup suffix + + """ + if ":" in group_identifier: + # Split subgroup format: "pre_hash:subgroup_index" + pre_hash = group_identifier.split(":", 1)[0] + logger.debug(f"Extracted pre_hash {pre_hash} from group identifier {group_identifier}") + return pre_hash + + # No subgroup suffix, return as-is + return group_identifier + + class ClientWrapper(ABC): """ Abstract base class for managing client instances in engine simulators. @@ -274,8 +337,9 @@ class MultiTestClientManager: """ Singleton manager for coordinating multi-test clients across test execution. - This class tracks all multi-test clients by their preHash and ensures proper - lifecycle management including cleanup at session end. + This class tracks all multi-test clients by their group identifier and ensures proper + lifecycle management including cleanup at session end. Group identifiers can be + either pre_hash (for sequential execution) or xdist group names (for parallel execution). """ _instance: Optional["MultiTestClientManager"] = None @@ -293,7 +357,7 @@ def __init__(self) -> None: if hasattr(self, "_initialized") and self._initialized: return - self.multi_test_clients: Dict[str, MultiTestClient] = {} + self.multi_test_clients: Dict[str, MultiTestClient] = {} # group_identifier -> client self.pre_alloc_path: Optional[Path] = None self.test_tracker: Optional["PreAllocGroupTestTracker"] = None self._initialized = True @@ -321,12 +385,12 @@ def set_test_tracker(self, test_tracker: "PreAllocGroupTestTracker") -> None: self.test_tracker = test_tracker logger.debug("Test tracker set for automatic client cleanup") - def load_pre_alloc_group(self, pre_hash: str) -> PreAllocGroup: + def load_pre_alloc_group(self, group_identifier: str) -> PreAllocGroup: """ - Load the pre-allocation group for a given preHash. + Load the pre-allocation group for a given group identifier. 
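+
+        Note: a split sub-group identifier ("<pre_hash>:<n>") loads the same
+        pre-allocation file as the plain pre_hash ("<pre_hash>.json").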
Args: - pre_hash: The hash identifying the pre-allocation group + group_identifier: The group identifier (pre_hash or xdist group name) Returns: The loaded PreAllocGroup @@ -339,6 +403,8 @@ def load_pre_alloc_group(self, pre_hash: str) -> PreAllocGroup: if self.pre_alloc_path is None: raise RuntimeError("Pre-alloc path not set in MultiTestClientManager") + # Extract pre_hash from group identifier (handles subgroups) + pre_hash = extract_pre_hash_from_group_identifier(group_identifier) pre_alloc_file = self.pre_alloc_path / f"{pre_hash}.json" if not pre_alloc_file.exists(): raise FileNotFoundError(f"Pre-allocation file not found: {pre_alloc_file}") @@ -347,38 +413,41 @@ def load_pre_alloc_group(self, pre_hash: str) -> PreAllocGroup: def get_or_create_multi_test_client( self, - pre_hash: str, + group_identifier: str, client_type: ClientType, ) -> MultiTestClient: """ - Get an existing MultiTestClient or create a new one for the given preHash. + Get an existing MultiTestClient or create a new one for the given group identifier. This method doesn't start the actual client - that's done by HiveTestSuite. It just manages the MultiTestClient wrapper objects. Args: - pre_hash: The hash identifying the pre-allocation group + group_identifier: The group identifier (pre_hash or xdist group name) client_type: The type of client that will be started Returns: The MultiTestClient wrapper instance """ - # Check if we already have a MultiTestClient for this preHash - if pre_hash in self.multi_test_clients: - multi_test_client = self.multi_test_clients[pre_hash] + # Check if we already have a MultiTestClient for this group identifier + if group_identifier in self.multi_test_clients: + multi_test_client = self.multi_test_clients[group_identifier] if multi_test_client.is_running: - logger.debug(f"Found existing MultiTestClient for pre-allocation group {pre_hash}") + logger.debug(f"Found existing MultiTestClient for group {group_identifier}") return multi_test_client else: # MultiTestClient exists but isn't running, remove it logger.warning( - f"Found stopped MultiTestClient for pre-allocation group {pre_hash}, removing" + f"Found stopped MultiTestClient for group {group_identifier}, removing" ) - del self.multi_test_clients[pre_hash] + del self.multi_test_clients[group_identifier] # Load the pre-allocation group for this group - pre_alloc_group = self.load_pre_alloc_group(pre_hash) + pre_alloc_group = self.load_pre_alloc_group(group_identifier) + + # Extract pre_hash for the MultiTestClient constructor + pre_hash = extract_pre_hash_from_group_identifier(group_identifier) # Create new MultiTestClient wrapper multi_test_client = MultiTestClient( @@ -387,43 +456,43 @@ def get_or_create_multi_test_client( pre_alloc_group=pre_alloc_group, ) - # Track the MultiTestClient - self.multi_test_clients[pre_hash] = multi_test_client + # Track the MultiTestClient by group identifier + self.multi_test_clients[group_identifier] = multi_test_client logger.info( - f"Created new MultiTestClient wrapper for pre-allocation group {pre_hash} " - f"(total tracked clients: {len(self.multi_test_clients)})" + f"Created new MultiTestClient wrapper for group {group_identifier} " + f"(pre_hash: {pre_hash}, total tracked clients: {len(self.multi_test_clients)})" ) return multi_test_client def get_client_for_test( - self, pre_hash: str, test_id: Optional[str] = None + self, group_identifier: str, test_id: Optional[str] = None ) -> Optional[Client]: """ - Get the actual client instance for a test with the given preHash. 
+ Get the actual client instance for a test with the given group identifier. Args: - pre_hash: The hash identifying the pre-allocation group + group_identifier: The group identifier (pre_hash or xdist group name) test_id: Optional test ID for completion tracking Returns: The client instance if available, None otherwise """ - if pre_hash in self.multi_test_clients: - multi_test_client = self.multi_test_clients[pre_hash] + if group_identifier in self.multi_test_clients: + multi_test_client = self.multi_test_clients[group_identifier] if multi_test_client.is_running: multi_test_client.increment_test_count() return multi_test_client.client return None - def mark_test_completed(self, pre_hash: str, test_id: str) -> None: + def mark_test_completed(self, group_identifier: str, test_id: str) -> None: """ Mark a test as completed and trigger automatic client cleanup if appropriate. Args: - pre_hash: The hash identifying the pre-allocation group + group_identifier: The group identifier (pre_hash or xdist group name) test_id: The unique test identifier """ @@ -432,57 +501,55 @@ def mark_test_completed(self, pre_hash: str, test_id: str) -> None: return # Mark test as completed in tracker - is_group_complete = self.test_tracker.mark_test_completed(pre_hash, test_id) + is_group_complete = self.test_tracker.mark_test_completed(group_identifier, test_id) if is_group_complete: - # All tests in this pre-allocation group are complete - self._auto_stop_client_if_complete(pre_hash) + # All tests in this group are complete + self._auto_stop_client_if_complete(group_identifier) - def _auto_stop_client_if_complete(self, pre_hash: str) -> None: + def _auto_stop_client_if_complete(self, group_identifier: str) -> None: """ - Automatically stop the client for a pre-allocation group if all tests are complete. + Automatically stop the client for a group if all tests are complete. 
Args: - pre_hash: The hash identifying the pre-allocation group + group_identifier: The group identifier (pre_hash or xdist group name) """ - if pre_hash not in self.multi_test_clients: - logger.debug(f"No client found for pre-allocation group {pre_hash}") + if group_identifier not in self.multi_test_clients: + logger.debug(f"No client found for group {group_identifier}") return - multi_test_client = self.multi_test_clients[pre_hash] + multi_test_client = self.multi_test_clients[group_identifier] if not multi_test_client.is_running: - logger.debug(f"Client for pre-allocation group {pre_hash} is already stopped") + logger.debug(f"Client for group {group_identifier} is already stopped") return # Stop the client and remove from tracking logger.info( - f"Auto-stopping client for pre-allocation group {pre_hash} - " + f"Auto-stopping client for group {group_identifier} - " f"all tests completed ({multi_test_client.test_count} tests executed)" ) try: multi_test_client.stop() except Exception as e: - logger.error(f"Error auto-stopping client for pre-allocation group {pre_hash}: {e}") + logger.error(f"Error auto-stopping client for group {group_identifier}: {e}") finally: # Remove from tracking to free memory - del self.multi_test_clients[pre_hash] - logger.debug(f"Removed completed client from tracking: {pre_hash}") + del self.multi_test_clients[group_identifier] + logger.debug(f"Removed completed client from tracking: {group_identifier}") def stop_all_clients(self) -> None: """Mark all multi-test clients as stopped.""" logger.info(f"Marking all {len(self.multi_test_clients)} multi-test clients as stopped") - for pre_hash, multi_test_client in list(self.multi_test_clients.items()): + for group_identifier, multi_test_client in list(self.multi_test_clients.items()): try: multi_test_client.stop() except Exception as e: - logger.error( - f"Error stopping MultiTestClient for pre-allocation group {pre_hash}: {e}" - ) + logger.error(f"Error stopping MultiTestClient for group {group_identifier}: {e}") finally: - del self.multi_test_clients[pre_hash] + del self.multi_test_clients[group_identifier] logger.info("All MultiTestClient wrappers cleared") @@ -493,7 +560,8 @@ def get_client_count(self) -> int: def get_test_counts(self) -> Dict[str, int]: """Get test counts for each multi-test client.""" return { - pre_hash: client.test_count for pre_hash, client in self.multi_test_clients.items() + group_identifier: client.test_count + for group_identifier, client in self.multi_test_clients.items() } def reset(self) -> None: diff --git a/src/pytest_plugins/consume/simulators/helpers/test_tracker.py b/src/pytest_plugins/consume/simulators/helpers/test_tracker.py index 778d7c1b2aa..aff97f77a85 100644 --- a/src/pytest_plugins/consume/simulators/helpers/test_tracker.py +++ b/src/pytest_plugins/consume/simulators/helpers/test_tracker.py @@ -12,139 +12,143 @@ @dataclass class PreAllocGroupTestTracker: """ - Tracks test execution progress per pre-allocation group. + Tracks test execution progress per test group. This class enables automatic client cleanup by monitoring when all tests - in a pre-allocation group have completed execution. + in a group have completed execution. Groups can be either pre-allocation + groups (sequential execution) or xdist subgroups (parallel execution). 
""" group_test_counts: Dict[str, int] = field(default_factory=dict) - """Total number of tests per pre-allocation group (pre_hash -> count).""" + """Total number of tests per group (group_identifier -> count).""" group_completed_tests: Dict[str, Set[str]] = field(default_factory=dict) - """Completed test IDs per pre-allocation group (pre_hash -> {test_ids}).""" + """Completed test IDs per group (group_identifier -> {test_ids}).""" - def set_group_test_count(self, pre_hash: str, total_tests: int) -> None: + def set_group_test_count(self, group_identifier: str, total_tests: int) -> None: """ - Set the total number of tests for a pre-allocation group. + Set the total number of tests for a group. Args: - pre_hash: The pre-allocation group hash + group_identifier: The group identifier (pre_hash or xdist group name) total_tests: Total number of tests in this group """ - if pre_hash in self.group_test_counts: - existing_count = self.group_test_counts[pre_hash] + if group_identifier in self.group_test_counts: + existing_count = self.group_test_counts[group_identifier] if existing_count != total_tests: logger.warning( - f"Pre-allocation group {pre_hash} test count mismatch: " + f"Group {group_identifier} test count mismatch: " f"existing={existing_count}, new={total_tests}" ) - self.group_test_counts[pre_hash] = total_tests - if pre_hash not in self.group_completed_tests: - self.group_completed_tests[pre_hash] = set() + self.group_test_counts[group_identifier] = total_tests + if group_identifier not in self.group_completed_tests: + self.group_completed_tests[group_identifier] = set() - logger.debug(f"Set test count for pre-allocation group {pre_hash}: {total_tests}") + logger.debug(f"Set test count for group {group_identifier}: {total_tests}") - def mark_test_completed(self, pre_hash: str, test_id: str) -> bool: + def mark_test_completed(self, group_identifier: str, test_id: str) -> bool: """ - Mark a test as completed for the given pre-allocation group. + Mark a test as completed for the given group. 
Args: - pre_hash: The pre-allocation group hash + group_identifier: The group identifier (pre_hash or xdist group name) test_id: The unique test identifier Returns: - True if all tests in the pre-allocation group are now complete + True if all tests in the group are now complete """ - if pre_hash not in self.group_completed_tests: - self.group_completed_tests[pre_hash] = set() + if group_identifier not in self.group_completed_tests: + self.group_completed_tests[group_identifier] = set() # Avoid double-counting the same test - if test_id in self.group_completed_tests[pre_hash]: - logger.debug(f"Test {test_id} already marked as completed for group {pre_hash}") - return self.is_group_complete(pre_hash) + if test_id in self.group_completed_tests[group_identifier]: + logger.debug( + f"Test {test_id} already marked as completed for group {group_identifier}" + ) + return self.is_group_complete(group_identifier) - self.group_completed_tests[pre_hash].add(test_id) - completed_count = len(self.group_completed_tests[pre_hash]) - total_count = self.group_test_counts.get(pre_hash, 0) + self.group_completed_tests[group_identifier].add(test_id) + completed_count = len(self.group_completed_tests[group_identifier]) + total_count = self.group_test_counts.get(group_identifier, 0) logger.debug( - f"Test {test_id} completed for pre-allocation group {pre_hash} " + f"Test {test_id} completed for group {group_identifier} " f"({completed_count}/{total_count})" ) - is_complete = self.is_group_complete(pre_hash) + is_complete = self.is_group_complete(group_identifier) if is_complete: logger.info( - f"All tests completed for pre-allocation group {pre_hash} " + f"All tests completed for group {group_identifier} " f"({completed_count}/{total_count}) - ready for client cleanup" ) return is_complete - def is_group_complete(self, pre_hash: str) -> bool: + def is_group_complete(self, group_identifier: str) -> bool: """ - Check if all tests in a pre-allocation group have completed. + Check if all tests in a group have completed. Args: - pre_hash: The pre-allocation group hash + group_identifier: The group identifier (pre_hash or xdist group name) Returns: True if all tests in the group are complete """ - if pre_hash not in self.group_test_counts: - logger.warning(f"No test count found for pre-allocation group {pre_hash}") + if group_identifier not in self.group_test_counts: + logger.warning(f"No test count found for group {group_identifier}") return False - total_count = self.group_test_counts[pre_hash] - completed_count = len(self.group_completed_tests.get(pre_hash, set())) + total_count = self.group_test_counts[group_identifier] + completed_count = len(self.group_completed_tests.get(group_identifier, set())) return completed_count >= total_count - def get_completion_status(self, pre_hash: str) -> tuple[int, int]: + def get_completion_status(self, group_identifier: str) -> tuple[int, int]: """ - Get completion status for a pre-allocation group. + Get completion status for a group. 
Args: - pre_hash: The pre-allocation group hash + group_identifier: The group identifier (pre_hash or xdist group name) Returns: Tuple of (completed_count, total_count) """ - total_count = self.group_test_counts.get(pre_hash, 0) - completed_count = len(self.group_completed_tests.get(pre_hash, set())) + total_count = self.group_test_counts.get(group_identifier, 0) + completed_count = len(self.group_completed_tests.get(group_identifier, set())) return completed_count, total_count def get_all_completion_status(self) -> Dict[str, tuple[int, int]]: """ - Get completion status for all tracked pre-allocation groups. + Get completion status for all tracked groups. Returns: - Dict mapping pre_hash to (completed_count, total_count) + Dict mapping group_identifier to (completed_count, total_count) """ return { - pre_hash: self.get_completion_status(pre_hash) for pre_hash in self.group_test_counts + group_identifier: self.get_completion_status(group_identifier) + for group_identifier in self.group_test_counts } - def reset_group(self, pre_hash: str) -> None: + def reset_group(self, group_identifier: str) -> None: """ - Reset tracking data for a specific pre-allocation group. + Reset tracking data for a specific group. Args: - pre_hash: The pre-allocation group hash to reset + group_identifier: The group identifier to reset """ - if pre_hash in self.group_test_counts: - del self.group_test_counts[pre_hash] - if pre_hash in self.group_completed_tests: - del self.group_completed_tests[pre_hash] - logger.debug(f"Reset tracking data for pre-allocation group {pre_hash}") + if group_identifier in self.group_test_counts: + del self.group_test_counts[group_identifier] + if group_identifier in self.group_completed_tests: + del self.group_completed_tests[group_identifier] + logger.debug(f"Reset tracking data for group {group_identifier}") def reset_all(self) -> None: """Reset all tracking data.""" @@ -169,9 +173,9 @@ def pre_alloc_group_test_tracker(request) -> PreAllocGroupTestTracker: # Load pre-collected group counts if available if hasattr(request.session, "_pre_alloc_group_counts"): group_counts = request.session._pre_alloc_group_counts - for pre_hash, count in group_counts.items(): - tracker.set_group_test_count(pre_hash, count) - logger.info(f"Loaded test counts for {len(group_counts)} pre-allocation groups") + for group_identifier, count in group_counts.items(): + tracker.set_group_test_count(group_identifier, count) + logger.info(f"Loaded test counts for {len(group_counts)} groups") logger.info("Pre-allocation group test tracker initialized") return tracker @@ -180,95 +184,95 @@ def pre_alloc_group_test_tracker(request) -> PreAllocGroupTestTracker: @dataclass class FCUFrequencyTracker: """ - Tracks forkchoice update frequency per pre-allocation group. + Tracks forkchoice update frequency per group. This class enables controlling how often forkchoice updates are performed - during test execution on a per-pre-allocation-group basis. + during test execution on a per-group basis (supporting both pre-allocation + groups and xdist subgroups). 
""" fcu_frequency: int """Frequency of FCU operations (0=disabled, 1=every test, N=every Nth test).""" group_test_counters: Dict[str, int] = field(default_factory=dict) - """Test counters per pre-allocation group (pre_hash -> count).""" + """Test counters per group (group_identifier -> count).""" - def should_perform_fcu(self, pre_hash: str) -> bool: + def should_perform_fcu(self, group_identifier: str) -> bool: """ Check if forkchoice update should be performed for this test. Args: - pre_hash: The pre-allocation group hash + group_identifier: The group identifier (pre_hash or xdist group name) Returns: True if FCU should be performed for this test """ if self.fcu_frequency == 0: - logger.debug(f"FCU disabled for pre-allocation group {pre_hash} (frequency=0)") + logger.debug(f"FCU disabled for group {group_identifier} (frequency=0)") return False - current_count = self.group_test_counters.get(pre_hash, 0) + current_count = self.group_test_counters.get(group_identifier, 0) should_perform = (current_count % self.fcu_frequency) == 0 logger.debug( - f"FCU decision for pre-allocation group {pre_hash}: " + f"FCU decision for group {group_identifier}: " f"perform={should_perform} (test_count={current_count}, " f"frequency={self.fcu_frequency})" ) return should_perform - def increment_test_count(self, pre_hash: str) -> None: + def increment_test_count(self, group_identifier: str) -> None: """ - Increment test counter for pre-allocation group. + Increment test counter for group. Args: - pre_hash: The pre-allocation group hash + group_identifier: The group identifier (pre_hash or xdist group name) """ - current_count = self.group_test_counters.get(pre_hash, 0) + current_count = self.group_test_counters.get(group_identifier, 0) new_count = current_count + 1 - self.group_test_counters[pre_hash] = new_count + self.group_test_counters[group_identifier] = new_count logger.debug( - f"Incremented test count for pre-allocation group {pre_hash}: " - f"{current_count} -> {new_count}" + f"Incremented test count for group {group_identifier}: {current_count} -> {new_count}" ) - def get_test_count(self, pre_hash: str) -> int: + def get_test_count(self, group_identifier: str) -> int: """ - Get current test count for pre-allocation group. + Get current test count for group. Args: - pre_hash: The pre-allocation group hash + group_identifier: The group identifier (pre_hash or xdist group name) Returns: Current test count for the group """ - return self.group_test_counters.get(pre_hash, 0) + return self.group_test_counters.get(group_identifier, 0) def get_all_test_counts(self) -> Dict[str, int]: """ - Get test counts for all tracked pre-allocation groups. + Get test counts for all tracked groups. Returns: - Dict mapping pre_hash to test count + Dict mapping group_identifier to test count """ return dict(self.group_test_counters) - def reset_group(self, pre_hash: str) -> None: + def reset_group(self, group_identifier: str) -> None: """ - Reset test counter for a specific pre-allocation group. + Reset test counter for a specific group. 
Args: - pre_hash: The pre-allocation group hash to reset + group_identifier: The group identifier to reset """ - if pre_hash in self.group_test_counters: - del self.group_test_counters[pre_hash] - logger.debug(f"Reset test counter for pre-allocation group {pre_hash}") + if group_identifier in self.group_test_counters: + del self.group_test_counters[group_identifier] + logger.debug(f"Reset test counter for group {group_identifier}") def reset_all(self) -> None: """Reset all test counters.""" diff --git a/src/pytest_plugins/consume/simulators/multi_test_client.py b/src/pytest_plugins/consume/simulators/multi_test_client.py index 0cd9fabf92f..1b18828e053 100644 --- a/src/pytest_plugins/consume/simulators/multi_test_client.py +++ b/src/pytest_plugins/consume/simulators/multi_test_client.py @@ -19,7 +19,10 @@ ) from pytest_plugins.filler.fixture_output import FixtureOutput -from .helpers.client_wrapper import MultiTestClientManager +from .helpers.client_wrapper import ( + MultiTestClientManager, + get_group_identifier_from_request, +) from .helpers.timing import TimingData logger = logging.getLogger(__name__) @@ -126,6 +129,10 @@ def client( pre_hash = fixture.pre_hash test_id = request.node.nodeid + # Determine the appropriate group identifier for this test + group_identifier = get_group_identifier_from_request(request, pre_hash) + logger.info(f"Using group identifier: {group_identifier} (pre_hash: {pre_hash})") + # Set pre-alloc path in manager if not already set if multi_test_client_manager.pre_alloc_path is None: fixture_output = FixtureOutput(output_path=fixtures_source.path) @@ -136,18 +143,18 @@ def client( multi_test_client_manager.set_test_tracker(pre_alloc_group_test_tracker) # Check for existing client - existing_client = multi_test_client_manager.get_client_for_test(pre_hash, test_id) + existing_client = multi_test_client_manager.get_client_for_test(group_identifier, test_id) if existing_client is not None: - logger.info(f"Reusing multi-test client for pre-allocation group {pre_hash}") + logger.info(f"Reusing multi-test client for group {group_identifier}") try: yield existing_client finally: # Mark test as completed when fixture teardown occurs - multi_test_client_manager.mark_test_completed(pre_hash, test_id) + multi_test_client_manager.mark_test_completed(group_identifier, test_id) return # Start new multi-test client - logger.info(f"Starting multi-test client for pre-allocation group {pre_hash}") + logger.info(f"Starting multi-test client for group {group_identifier}") with total_timing_data.time("Start multi-test client"): hive_client = test_suite.start_client( @@ -157,19 +164,19 @@ def client( ) assert hive_client is not None, ( - f"Failed to start multi-test client for pre-allocation group {pre_hash}" + f"Failed to start multi-test client for group {group_identifier}" ) # Register with manager multi_test_client = multi_test_client_manager.get_or_create_multi_test_client( - pre_hash=pre_hash, + group_identifier=group_identifier, client_type=client_type, ) multi_test_client.set_client(hive_client) - logger.info(f"Multi-test client ready for pre-allocation group {pre_hash}") + logger.info(f"Multi-test client ready for group {group_identifier}") try: yield hive_client finally: # Mark test as completed when fixture teardown occurs - multi_test_client_manager.mark_test_completed(pre_hash, test_id) + multi_test_client_manager.mark_test_completed(group_identifier, test_id) diff --git a/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py 
b/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py index e580cc81dfc..0c9bb840d7c 100644 --- a/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py +++ b/src/pytest_plugins/consume/simulators/simulator_logic/test_via_engine.py @@ -51,63 +51,63 @@ def test_blockchain_via_engine( """ # Determine if we should perform forkchoice updates based on frequency tracker should_perform_fcus = True # Default behavior for engine simulator - pre_hash = None + group_identifier = None if fcu_frequency_tracker is not None and hasattr(fixture, "pre_hash"): # EngineX simulator with forkchoice update frequency control - pre_hash = fixture.pre_hash - should_perform_fcus = fcu_frequency_tracker.should_perform_fcu(pre_hash) + # Use group identifier for tracking (supports both sequential and xdist execution) + from ..helpers.client_wrapper import get_group_identifier_from_request + + group_identifier = get_group_identifier_from_request(request, fixture.pre_hash) + should_perform_fcus = fcu_frequency_tracker.should_perform_fcu(group_identifier) logger.info( - f"Forkchoice update frequency check for pre-allocation group {pre_hash}: " + f"Forkchoice update frequency check for group {group_identifier}: " f"perform_fcu={should_perform_fcus} " f"(frequency={fcu_frequency_tracker.fcu_frequency}, " - f"test_count={fcu_frequency_tracker.get_test_count(pre_hash)})" + f"test_count={fcu_frequency_tracker.get_test_count(group_identifier)})" ) # Always increment the test counter at the start for proper tracking - if fcu_frequency_tracker is not None and pre_hash is not None: - fcu_frequency_tracker.increment_test_count(pre_hash) - # Send a initial forkchoice update - with timing_data.time("Initial forkchoice update"): - logger.info("Sending initial forkchoice update to genesis block...") - delay = 0.5 - for attempt in range(3): - forkchoice_response = engine_rpc.forkchoice_updated( - forkchoice_state=ForkchoiceState( - head_block_hash=genesis_header.block_hash, - ), - payload_attributes=None, - version=fixture.payloads[0].forkchoice_updated_version, - ) - status = forkchoice_response.payload_status.status - logger.info(f"Initial forkchoice update response attempt {attempt + 1}: {status}") - if status != PayloadStatusEnum.SYNCING: - break - if attempt < 2: - time.sleep(delay) - delay *= 2 - - if forkchoice_response.payload_status.status != PayloadStatusEnum.VALID: - logger.error( - f"Client failed to initialize properly after 3 attempts, " - f"final status: {forkchoice_response.payload_status.status}" - ) - raise LoggedError( - f"unexpected status on forkchoice updated to genesis: {forkchoice_response}" - ) - - with timing_data.time("Get genesis block"): - logger.info("Calling getBlockByNumber to get genesis block...") - client_genesis_response = eth_rpc.get_block_by_number(0) - if client_genesis_response["hash"] != str(genesis_header.block_hash): - expected = genesis_header.block_hash - got = client_genesis_response["hash"] - logger.fail(f"Genesis block hash mismatch. 
Expected: {expected}, Got: {got}")
-            raise GenesisBlockMismatchExceptionError(
-                expected_header=genesis_header,
-                got_genesis_block=client_genesis_response,
-            )
+    if fcu_frequency_tracker is not None and group_identifier is not None:
+        fcu_frequency_tracker.increment_test_count(group_identifier)
+
+    if not isinstance(fixture, BlockchainEngineXFixture):
+        # The initial forkchoice update to genesis is skipped for the enginex simulator
+        with timing_data.time("Initial forkchoice update"):
+            logger.info("Sending initial forkchoice update to genesis block...")
+            delay = 0.5
+            for attempt in range(3):
+                forkchoice_response = engine_rpc.forkchoice_updated(
+                    forkchoice_state=ForkchoiceState(
+                        head_block_hash=genesis_header.block_hash,
+                    ),
+                    payload_attributes=None,
+                    version=fixture.payloads[0].forkchoice_updated_version,
+                )
+                status = forkchoice_response.payload_status.status
+                logger.info(f"Initial forkchoice update response attempt {attempt + 1}: {status}")
+                if status != PayloadStatusEnum.SYNCING:
+                    break
+                if attempt < 2:
+                    time.sleep(delay)
+                    delay *= 2
+            if forkchoice_response.payload_status.status != PayloadStatusEnum.VALID:
+                raise LoggedError(
+                    f"unexpected status on forkchoice updated to genesis: {forkchoice_response}"
+                )
+
+        with timing_data.time("Get genesis block"):
+            logger.info("Calling getBlockByNumber to get genesis block...")
+            client_genesis_response = eth_rpc.get_block_by_number(0)
+            if client_genesis_response["hash"] != str(genesis_header.block_hash):
+                expected = genesis_header.block_hash
+                got = client_genesis_response["hash"]
+                logger.fail(f"Genesis block hash mismatch. Expected: {expected}, Got: {got}")
+                raise GenesisBlockMismatchExceptionError(
+                    expected_header=genesis_header,
+                    got_genesis_block=client_genesis_response,
+                )
 
     with timing_data.time("Payloads execution") as total_payload_timing:
         logger.info(f"Starting execution of {len(fixture.payloads)} payloads...")
@@ -201,15 +201,15 @@ def test_blockchain_via_engine(
         elif payload.valid() and not should_perform_fcus:
             logger.info(
                 f"Skipping forkchoice update for payload {i + 1} due to frequency setting "
-                f"(pre-allocation group: {pre_hash})"
+                f"(group: {group_identifier})"
             )
     logger.info("All payloads processed successfully.")
 
     # Log final FCU frequency statistics for enginex simulator
-    if fcu_frequency_tracker is not None and pre_hash is not None:
-        final_count = fcu_frequency_tracker.get_test_count(pre_hash)
+    if fcu_frequency_tracker is not None and group_identifier is not None:
+        final_count = fcu_frequency_tracker.get_test_count(group_identifier)
         logger.info(
-            f"Test completed for pre-allocation group {pre_hash}. "
+            f"Test completed for group {group_identifier}. "
             f"Total tests in group: {final_count}, "
            f"FCU frequency: {fcu_frequency_tracker.fcu_frequency}"
         )
diff --git a/src/pytest_plugins/consume/tests/test_group_identifier_container.py b/src/pytest_plugins/consume/tests/test_group_identifier_container.py
new file mode 100644
index 00000000000..3285f4432aa
--- /dev/null
+++ b/src/pytest_plugins/consume/tests/test_group_identifier_container.py
@@ -0,0 +1,319 @@
+"""
+Unit tests for test group identifier container cleanup.
+
+This module tests the container cleanup to ensure proper client lifecycle
+management for both sequential and xdist execution modes.
+
+The test specifically addresses a regression introduced when subgroup splitting was
Previously, each subgroup would create separate containers +for the same pre-allocation group, leading to container count explosion +(e.g., 24-25 containers instead of the expected 8 with 8 workers). +""" + +from unittest.mock import Mock + +import pytest + +from pytest_plugins.consume.simulators.helpers.client_wrapper import ( + extract_pre_hash_from_group_identifier, + get_group_identifier_from_request, +) + + +class TestGroupIdentifierDetection: + """Test group identifier detection for different execution modes.""" + + def test_sequential_execution_no_xdist_marker(self): + """Test group identifier detection for sequential execution (no xdist marker).""" + # Setup: Mock request with no xdist markers + request_mock = Mock() + request_mock.node.iter_markers = Mock(return_value=[]) + + pre_hash = "0x479393be6619d67f" + + # Execute + group_id = get_group_identifier_from_request(request_mock, pre_hash) + + # Verify: Should use pre_hash directly for sequential execution + assert group_id == pre_hash + + def test_xdist_execution_with_subgroup(self): + """Test group identifier detection for xdist execution with subgroups.""" + # Setup: Mock request with xdist marker containing subgroup + xdist_marker = Mock() + xdist_marker.kwargs = {"name": "0x479393be6619d67f:2"} + + request_mock = Mock() + request_mock.node.iter_markers = Mock(return_value=[xdist_marker]) + + pre_hash = "0x479393be6619d67f" + + # Execute + group_id = get_group_identifier_from_request(request_mock, pre_hash) + + # Verify: Should use xdist group name (with subgroup suffix) + assert group_id == "0x479393be6619d67f:2" + + def test_xdist_execution_without_subgroup(self): + """Test group identifier detection for xdist execution without subgroups.""" + # Setup: Mock request with xdist marker without subgroup + xdist_marker = Mock() + xdist_marker.kwargs = {"name": "0x479393be6619d67f"} + + request_mock = Mock() + request_mock.node.iter_markers = Mock(return_value=[xdist_marker]) + + pre_hash = "0x479393be6619d67f" + + # Execute + group_id = get_group_identifier_from_request(request_mock, pre_hash) + + # Verify: Should use xdist group name (same as pre_hash) + assert group_id == pre_hash + + def test_missing_iter_markers_method(self): + """Test fallback when request.node doesn't have iter_markers method.""" + # Setup: Mock request without iter_markers method + request_mock = Mock() + del request_mock.node.iter_markers # Remove the method + + pre_hash = "0x479393be6619d67f" + + # Execute + group_id = get_group_identifier_from_request(request_mock, pre_hash) + + # Verify: Should fallback to pre_hash + assert group_id == pre_hash + + def test_xdist_marker_without_name_kwargs(self): + """Test handling of xdist marker without proper name kwargs.""" + # Setup: Mock request with malformed xdist marker + xdist_marker = Mock() + xdist_marker.kwargs = {} # No 'name' key + + request_mock = Mock() + request_mock.node.iter_markers = Mock(return_value=[xdist_marker]) + + pre_hash = "0x479393be6619d67f" + + # Execute + group_id = get_group_identifier_from_request(request_mock, pre_hash) + + # Verify: Should fallback to pre_hash + assert group_id == pre_hash + + +class TestPreHashExtraction: + """Test pre_hash extraction from group identifiers.""" + + def test_extract_from_non_subgroup_identifier(self): + """Test extraction from group identifier without subgroup.""" + group_id = "0x479393be6619d67f" + + extracted = extract_pre_hash_from_group_identifier(group_id) + + assert extracted == group_id + + def test_extract_from_subgroup_identifier(self): + 
"""Test extraction from group identifier with subgroup.""" + group_id = "0x479393be6619d67f:2" + expected = "0x479393be6619d67f" + + extracted = extract_pre_hash_from_group_identifier(group_id) + + assert extracted == expected + + def test_extract_with_multiple_colons(self): + """Test extraction with multiple colons (edge case).""" + group_id = "0x479393be6619d67f:2:extra:data" + expected = "0x479393be6619d67f" + + extracted = extract_pre_hash_from_group_identifier(group_id) + + assert extracted == expected + + def test_extract_from_empty_string(self): + """Test extraction from empty string.""" + group_id = "" + + extracted = extract_pre_hash_from_group_identifier(group_id) + + assert extracted == "" + + def test_extract_with_colon_only(self): + """Test extraction with colon only.""" + group_id = ":" + expected = "" + + extracted = extract_pre_hash_from_group_identifier(group_id) + + assert extracted == expected + + +class TestContainerIsolationScenario: + """Test the key scenario that fixes the container cleanup regression.""" + + def test_subgroup_container_isolation(self): + """Test that subgroups get separate container tracking.""" + # Setup: Simulate large pre-allocation group split into subgroups + pre_hash = "0x479393be6619d67f" + subgroups = [f"{pre_hash}:{i}" for i in range(5)] + + # Simulate container creation using group identifiers + containers = {} + for subgroup in subgroups: + container_key = subgroup # Key change: use subgroup as container key + extracted_pre_hash = extract_pre_hash_from_group_identifier(subgroup) + + containers[container_key] = { + "group_identifier": subgroup, + "pre_hash": extracted_pre_hash, + "tests_completed": 0, + "total_tests": 400, + } + + # Verify: Each subgroup gets its own container tracking + assert len(containers) == 5 + + # Verify: All containers reference the same pre-allocation file + for container in containers.values(): + assert container["pre_hash"] == pre_hash + + # Verify: Each container has unique group identifier + group_identifiers = [c["group_identifier"] for c in containers.values()] + assert len(set(group_identifiers)) == 5 # All unique + + def test_subgroup_cleanup_isolation(self): + """Test that subgroup cleanup is isolated to completed groups only.""" + # Setup: Multiple subgroups with different completion states + pre_hash = "0x479393be6619d67f" + containers = { + f"{pre_hash}:0": {"tests_completed": 400, "total_tests": 400}, # Complete + f"{pre_hash}:1": {"tests_completed": 200, "total_tests": 400}, # Partial + f"{pre_hash}:2": {"tests_completed": 0, "total_tests": 400}, # Not started + } + + # Simulate cleanup detection + completed_containers = [ + k for k, v in containers.items() if v["tests_completed"] >= v["total_tests"] + ] + + # Verify: Only completed subgroup is marked for cleanup + assert len(completed_containers) == 1 + assert completed_containers[0] == f"{pre_hash}:0" + + def test_sequential_vs_xdist_behavior(self): + """Test that sequential and xdist modes result in different container strategies.""" + pre_hash = "0x479393be6619d67f" + + # Sequential execution: single container for entire pre-allocation group + sequential_containers = {pre_hash: {"total_tests": 2000}} + + # XDist execution: multiple containers for subgroups + xdist_containers = {f"{pre_hash}:{i}": {"total_tests": 400} for i in range(5)} + + # Verify: Different container strategies + assert len(sequential_containers) == 1 # Single container + assert len(xdist_containers) == 5 # Multiple containers + + # Verify: Same total test count + 
sequential_total = sum(c["total_tests"] for c in sequential_containers.values()) + xdist_total = sum(c["total_tests"] for c in xdist_containers.values()) + assert sequential_total == xdist_total == 2000 + + +class TestRegressionScenario: + """Test the specific regression scenario that was reported.""" + + def test_container_count_regression_fix(self): + """ + Test that the fix prevents the container count regression. + + Before fix: 8 workers × 3 subgroups = 24-25 containers + After fix: Max 8 containers (1 per worker, different subgroups) + """ + # Setup: Simulate 8 workers with subgroups distributed across them + pre_hash = "0x479393be6619d67f" + num_workers = 8 + + # Before fix: Each worker could create containers for different subgroups + # This would lead to multiple containers per pre_hash across workers + before_fix_containers = {} + for worker in range(num_workers): + for subgroup in range(3): # 3 subgroups + # Old key: pre_hash (same for all subgroups) + # This caused multiple containers for same pre_hash + old_key = pre_hash + container_id = f"worker_{worker}_subgroup_{subgroup}" + before_fix_containers[container_id] = {"key": old_key} + + # After fix: Each subgroup gets unique container key + after_fix_containers = {} + for worker in range(num_workers): + # Each worker handles one subgroup (distributed by xdist) + subgroup = worker % 3 # Distribute subgroups across workers + new_key = f"{pre_hash}:{subgroup}" + container_id = f"worker_{worker}" + after_fix_containers[container_id] = {"key": new_key} + + # Verify: Fix reduces container proliferation + # Before: 24 containers (8 workers × 3 subgroups) + assert len(before_fix_containers) == 24 + + # After: 8 containers (1 per worker) + assert len(after_fix_containers) == 8 + + # Verify: Unique container keys in fixed version + after_fix_keys = [c["key"] for c in after_fix_containers.values()] + unique_keys = set(after_fix_keys) + assert len(unique_keys) <= 3 # At most one container per subgroup + + +@pytest.mark.parametrize( + "execution_mode,expected_containers", + [ + ("sequential", 1), # Single container for entire pre-allocation group + ("xdist_small", 1), # Small group, no splitting needed + ("xdist_large", 5), # Large group, split into 5 subgroups + ], +) +def test_container_strategy_by_execution_mode(execution_mode, expected_containers): + """Test container strategy varies by execution mode and group size.""" + pre_hash = "0x479393be6619d67f" + + if execution_mode == "sequential": + # Sequential: Always use pre_hash as container key + container_keys = [pre_hash] + elif execution_mode == "xdist_small": + # Small xdist group: No subgroup splitting + container_keys = [pre_hash] + elif execution_mode == "xdist_large": + # Large xdist group: Split into subgroups + container_keys = [f"{pre_hash}:{i}" for i in range(5)] + + assert len(container_keys) == expected_containers + + +class TestEdgeCases: + """Test edge cases and error conditions.""" + + def test_none_request_handling(self): + """Test handling of None request parameter.""" + with pytest.raises(AttributeError): + get_group_identifier_from_request(None, "0x123") + + def test_empty_pre_hash(self): + """Test handling of empty pre_hash.""" + request_mock = Mock() + request_mock.node.iter_markers = Mock(return_value=[]) + + group_id = get_group_identifier_from_request(request_mock, "") + assert group_id == "" + + def test_none_pre_hash(self): + """Test handling of None pre_hash.""" + request_mock = Mock() + request_mock.node.iter_markers = Mock(return_value=[]) + + group_id = 
get_group_identifier_from_request(request_mock, None) + assert group_id is None From cdf62c00992278505fc7fe6575fb3d3b7a8baaf6 Mon Sep 17 00:00:00 2001 From: Mario Vega Date: Thu, 3 Jul 2025 22:08:23 +0000 Subject: [PATCH 5/5] fix(consume): Register client with test --- src/pytest_plugins/consume/simulators/multi_test_client.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/pytest_plugins/consume/simulators/multi_test_client.py b/src/pytest_plugins/consume/simulators/multi_test_client.py index 1b18828e053..51eecc9505c 100644 --- a/src/pytest_plugins/consume/simulators/multi_test_client.py +++ b/src/pytest_plugins/consume/simulators/multi_test_client.py @@ -7,7 +7,7 @@ import pytest from hive.client import Client, ClientType -from hive.testing import HiveTestSuite +from hive.testing import HiveTest, HiveTestSuite from ethereum_test_base_types import to_json from ethereum_test_fixtures import BlockchainEngineXFixture @@ -115,6 +115,7 @@ def genesis_header(pre_alloc_group: PreAllocGroup) -> FixtureHeader: @pytest.fixture(scope="function") def client( test_suite: HiveTestSuite, + hive_test: HiveTest, client_type: ClientType, total_timing_data: TimingData, fixture: BlockchainEngineXFixture, @@ -146,6 +147,7 @@ def client( existing_client = multi_test_client_manager.get_client_for_test(group_identifier, test_id) if existing_client is not None: logger.info(f"Reusing multi-test client for group {group_identifier}") + hive_test.register_shared_client(existing_client) try: yield existing_client finally: @@ -173,6 +175,7 @@ def client( client_type=client_type, ) multi_test_client.set_client(hive_client) + hive_test.register_shared_client(hive_client) logger.info(f"Multi-test client ready for group {group_identifier}") try: