Skip to content

feat(consume): add consume enginex simulator #1765

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/cli/pytest_commands/consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ def get_command_logic_test_paths(command_name: str, is_hive: bool) -> List[Path]
command_logic_test_paths = [
base_path / "simulators" / "simulator_logic" / f"test_via_{cmd}.py" for cmd in commands
]
elif command_name in ["engine", "rlp"]:
elif command_name in ["engine", "enginex"]:
command_logic_test_paths = [
base_path / "simulators" / "simulator_logic" / f"test_via_{command_name}.py"
base_path / "simulators" / "simulator_logic" / "test_via_engine.py"
]
elif command_name == "rlp":
command_logic_test_paths = [
base_path / "simulators" / "simulator_logic" / "test_via_rlp.py"
]
elif command_name == "direct":
command_logic_test_paths = [base_path / "direct" / "test_via_direct.py"]
Expand Down Expand Up @@ -103,13 +107,19 @@ def rlp() -> None:

@consume_command(is_hive=True)
def engine() -> None:
"""Client consumes via the Engine API."""
"""Client consumes Engine Fixtures via the Engine API."""
pass


@consume_command(is_hive=True)
def enginex() -> None:
    """Client consumes Engine X Fixtures via the Engine API."""
    # Body intentionally empty; presumably `consume_command` supplies the
    # actual behaviour for this entry point — confirm against the decorator.


@consume_command(is_hive=True)
def hive() -> None:
"""Client consumes via all available hive methods (rlp, engine)."""
"""Client consumes via rlp & engine hive methods."""
pass


Expand Down
6 changes: 6 additions & 0 deletions src/cli/pytest_commands/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ def process_args(self, args: List[str]) -> List[str]:

if self.command_name == "engine":
modified_args.extend(["-p", "pytest_plugins.consume.simulators.engine.conftest"])
elif self.command_name == "enginex":
modified_args.extend(["-p", "pytest_plugins.consume.simulators.enginex.conftest"])
if (
self._has_parallelism_flag(args) or "-n" in modified_args
) and "--dist" not in modified_args:
modified_args.extend(["--dist=loadgroup"])
elif self.command_name == "rlp":
modified_args.extend(["-p", "pytest_plugins.consume.simulators.rlp.conftest"])
else:
Expand Down
203 changes: 194 additions & 9 deletions src/pytest_plugins/consume/consume.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
"""A pytest plugin providing common functionality for consuming test fixtures."""
"""
A pytest plugin providing common functionality for consuming test fixtures.

Features:
- Downloads and caches test fixtures from various sources (local, URL, release).
- Manages test case generation from fixture files.
- Provides xdist load balancing for large pre-allocation groups (enginex simulator).
"""

import logging
import re
import sys
import tarfile
from dataclasses import dataclass
from io import BytesIO
from pathlib import Path
from typing import List, Optional, Tuple
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse

import platformdirs
Expand All @@ -22,11 +30,118 @@

from .releases import ReleaseTag, get_release_page_url, get_release_url, is_release_url, is_url

logger = logging.getLogger(__name__)

CACHED_DOWNLOADS_DIRECTORY = (
Path(platformdirs.user_cache_dir("ethereum-execution-spec-tests")) / "cached_downloads"
)


class XDistGroupMapper:
    """
    Map test cases to xdist group names, splitting large pre-allocation groups.

    Test cases that share a ``pre_hash`` form a pre-allocation group. Groups
    with more than ``max_group_size`` tests are cut into consecutive chunks of
    at most ``max_group_size`` tests; each chunk gets its own group name
    (``"{pre_hash}:{index}"``) so that a scheduler working on group names, such
    as pytest-xdist's ``--dist=loadgroup``, can balance the chunks separately.
    Groups at or below the limit keep the bare ``pre_hash`` as their name.
    """

    def __init__(self, max_group_size: int = 100):
        """
        Initialize the mapper with a maximum group size.

        Raises:
            ValueError: If ``max_group_size`` is smaller than 1. Such a value
                would otherwise cause a division by zero (or negative
                sub-group indices) when the mapping is built.

        """
        if max_group_size < 1:
            raise ValueError(
                f"max_group_size must be a positive integer, got {max_group_size!r}"
            )
        self.max_group_size = max_group_size
        # Number of test cases seen per pre_hash.
        self.group_sizes: Dict[str, int] = {}
        # Test case id -> index of the chunk it was assigned to.
        self.test_to_subgroup: Dict[str, int] = {}
        self._built = False

    def build_mapping(self, test_cases: TestCases) -> None:
        """
        Build the mapping of test cases to sub-groups.

        Test cases without a truthy ``pre_hash`` attribute are ignored. The
        input is traversed exactly once, so one-shot iterables work as well.
        Calling this method again after a successful build is a no-op.
        """
        if self._built:
            return

        for test_case in test_cases:
            pre_hash = getattr(test_case, "pre_hash", None)
            if not pre_hash:
                continue
            # Zero-based position of this test within its pre-allocation group.
            position = self.group_sizes.get(pre_hash, 0)
            self.group_sizes[pre_hash] = position + 1
            # Consecutive chunks of max_group_size tests share a sub-group.
            # For groups that never exceed the limit this is always 0, so no
            # separate small-group branch is required.
            self.test_to_subgroup[test_case.id] = position // self.max_group_size

        self._built = True

        # Log a summary of the groups that were split.
        large_group_count = sum(
            1 for size in self.group_sizes.values() if size > self.max_group_size
        )
        if large_group_count:
            # Same logger object as a module-level logging.getLogger(__name__).
            logging.getLogger(__name__).info(
                "Found %d pre-allocation groups larger than %d tests "
                "that will be split for better load balancing",
                large_group_count,
                self.max_group_size,
            )

    def get_xdist_group_name(self, test_case) -> str:
        """
        Get the xdist group name for a test case.

        Returns the test case id when there is no truthy ``pre_hash``, the
        bare ``pre_hash`` for groups within the size limit, and
        ``"{pre_hash}:{sub_group_index}"`` for groups that were split.
        """
        pre_hash = getattr(test_case, "pre_hash", None)
        if not pre_hash:
            # No pre_hash, use test ID as fallback
            return test_case.id

        if self.group_sizes.get(pre_hash, 0) <= self.max_group_size:
            # Small (or unknown) group, use pre_hash as-is
            return pre_hash

        # Large group, include sub-group index (0 if the id was never mapped)
        sub_group = self.test_to_subgroup.get(test_case.id, 0)
        return f"{pre_hash}:{sub_group}"

    def get_split_statistics(self) -> Dict[str, Dict[str, int]]:
        """
        Get statistics about how groups were split.

        Returns a dict keyed by ``pre_hash`` containing, for every group larger
        than ``max_group_size``, its ``total_tests``, ``num_subgroups`` and the
        floor-divided ``average_tests_per_subgroup``. Groups that were not
        split are omitted.
        """
        stats: Dict[str, Dict[str, int]] = {}
        for pre_hash, size in self.group_sizes.items():
            if size <= self.max_group_size:
                continue
            # Ceiling division: the last chunk may be only partially filled.
            num_subgroups = (size + self.max_group_size - 1) // self.max_group_size
            stats[pre_hash] = {
                "total_tests": size,
                "num_subgroups": num_subgroups,
                "average_tests_per_subgroup": size // num_subgroups,
            }
        return stats


def default_input() -> str:
"""
Directory (default) to consume generated test fixtures from. Defined as a
Expand Down Expand Up @@ -419,6 +534,28 @@ def pytest_configure(config): # noqa: D103
index = IndexFile.model_validate_json(index_file.read_text())
config.test_cases = index.test_cases

# Create XDistGroupMapper for enginex simulator if enginex options are present
try:
max_group_size = config.getoption("--enginex-max-group-size", None)
if max_group_size is not None:
config.xdist_group_mapper = XDistGroupMapper(max_group_size)
config.xdist_group_mapper.build_mapping(config.test_cases)

split_stats = config.xdist_group_mapper.get_split_statistics()
if split_stats and config.option.verbose >= 1:
rich.print("[bold yellow]Pre-allocation group splitting for load balancing:[/]")
for pre_hash, stats in split_stats.items():
rich.print(
f" Group {pre_hash[:8]}: {stats['total_tests']} tests → "
f"{stats['num_subgroups']} sub-groups "
f"(~{stats['average_tests_per_subgroup']} tests each)"
)
rich.print(f" Max group size: {max_group_size}")
else:
config.xdist_group_mapper = None
except ValueError:
config.xdist_group_mapper = None

for fixture_format in BaseFixture.formats.values():
config.addinivalue_line(
"markers",
Expand Down Expand Up @@ -485,22 +622,70 @@ def pytest_generate_tests(metafunc):
"""
Generate test cases for every test fixture in all the JSON fixture files
within the specified fixtures directory, or read from stdin if the directory is 'stdin'.

Test functions that do not declare a `test_case` parameter are left untouched
to avoid conflicts with other consume simulators.
"""
if "cache" in sys.argv:
return

# Only apply to functions that have a 'test_case' parameter (consume test functions)
if "test_case" not in metafunc.fixturenames:
return

test_cases = metafunc.config.test_cases
xdist_group_mapper = getattr(metafunc.config, "xdist_group_mapper", None)
param_list = []

# Check if this is an enginex simulator (has enginex-specific enhancements)
is_enginex_function = (
hasattr(metafunc.config, "_supported_fixture_formats")
and "blockchain_test_engine_x" in metafunc.config._supported_fixture_formats
)
for test_case in test_cases:
if test_case.format.format_name not in metafunc.config._supported_fixture_formats:
# Check if _supported_fixture_formats is set, if not allow all formats
supported_formats = getattr(metafunc.config, "_supported_fixture_formats", None)
if supported_formats and test_case.format.format_name not in supported_formats:
continue

fork_markers = get_relative_fork_markers(test_case.fork, strict_mode=False)
param = pytest.param(
test_case,
id=test_case.id,
marks=[getattr(pytest.mark, m) for m in fork_markers]
+ [getattr(pytest.mark, test_case.format.format_name)],
)

# Basic test ID and markers (used by all consume tests)
test_id = test_case.id
markers = [getattr(pytest.mark, m) for m in fork_markers] + [
getattr(pytest.mark, test_case.format.format_name)
]

# Apply enginex-specific enhancements only for enginex functions
if is_enginex_function:
# Determine xdist group name for enginex load balancing
if xdist_group_mapper and hasattr(test_case, "pre_hash") and test_case.pre_hash:
# Use the mapper to get potentially split group name
xdist_group_name = xdist_group_mapper.get_xdist_group_name(test_case)
elif hasattr(test_case, "pre_hash") and test_case.pre_hash:
# No mapper configured, use pre_hash directly
xdist_group_name = test_case.pre_hash
else:
# No pre_hash, use test ID
xdist_group_name = test_case.id

# Create enhanced test ID showing the xdist group name for easier identification
if hasattr(test_case, "pre_hash") and test_case.pre_hash:
# Show first 8 chars of xdist group name (includes sub-group if split)
group_display = (
xdist_group_name[:8] if len(xdist_group_name) > 8 else xdist_group_name
)
# If it's a split group (contains ':'), show that clearly
if ":" in xdist_group_name:
# Extract sub-group number for display
pre_hash_part, sub_group = xdist_group_name.split(":", 1)
group_display = f"{pre_hash_part[:8]}:{sub_group}"
test_id = f"{test_case.id}[{group_display}]"

# Add xdist group marker for load balancing
markers.append(pytest.mark.xdist_group(name=xdist_group_name))

param = pytest.param(test_case, id=test_id, marks=markers)
param_list.append(param)

metafunc.parametrize("test_case", param_list)
Expand Down
1 change: 0 additions & 1 deletion src/pytest_plugins/consume/hive_engine_test/__init__.py

This file was deleted.

This file was deleted.

2 changes: 1 addition & 1 deletion src/pytest_plugins/consume/simulators/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
"""Consume Engine test functions."""
"""The consume engine simulator."""
30 changes: 13 additions & 17 deletions src/pytest_plugins/consume/simulators/engine/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
"""
Pytest fixtures for the `consume engine` simulator.

Configures the hive back-end & EL clients for each individual test execution.
"""
"""Pytest plugin for the `consume engine` simulator."""

import io
from typing import Mapping
Expand All @@ -29,6 +25,18 @@ def pytest_configure(config):
config._supported_fixture_formats = [BlockchainEngineFixture.format_name]


@pytest.fixture(scope="module")
def test_suite_name() -> str:
    """Return the name of the hive test suite used by this simulator."""
    suite_name = "eest/consume-engine"
    return suite_name


@pytest.fixture(scope="module")
def test_suite_description() -> str:
    """Return the description of the hive test suite used by this simulator."""
    description = "Execute blockchain tests against clients using the Engine API."
    return description


@pytest.fixture(scope="function")
def engine_rpc(client: Client, client_exception_mapper: ExceptionMapper | None) -> EngineRPC:
"""Initialize engine RPC client for the execution client under test."""
Expand All @@ -42,18 +50,6 @@ def engine_rpc(client: Client, client_exception_mapper: ExceptionMapper | None)
return EngineRPC(f"http://{client.ip}:8551")


@pytest.fixture(scope="module")
def test_suite_name() -> str:
"""The name of the hive test suite used in this simulator."""
return "eest/consume-engine"


@pytest.fixture(scope="module")
def test_suite_description() -> str:
"""The description of the hive test suite used in this simulator."""
return "Execute blockchain tests against clients using the Engine API."


@pytest.fixture(scope="function")
def client_files(buffered_genesis: io.BufferedReader) -> Mapping[str, io.BufferedReader]:
"""Define the files that hive will start the client with."""
Expand Down
1 change: 1 addition & 0 deletions src/pytest_plugins/consume/simulators/enginex/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""The consume enginex simulator."""
Loading
Loading