Skip to content

Feature/wss support #101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions chainbench/profile/ethereum/subscriptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from chainbench.user import WssJrpcUser
from chainbench.user.protocol.ethereum import EthSubscribe


class EthSubscriptions(WssJrpcUser):
subscriptions = [
EthSubscribe(["newHeads"]),
# logs subscription for approve method signature
EthSubscribe(["logs", {"topics": ["0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925"]}]),
EthSubscribe(["newPendingTransactions"]),
]

def get_notification_name(self, parsed_response: dict):
return self.get_subscription(
subscription_id=parsed_response["params"]["subscription"]
).subscribe_rpc_call.params[0]
2 changes: 1 addition & 1 deletion chainbench/profile/solana/get_program_accounts_shark.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from locust import task

from chainbench.user.http import RpcCall
from chainbench.user.jsonrpc import RpcCall
from chainbench.user.protocol.solana import SolanaUser


Expand Down
2 changes: 1 addition & 1 deletion chainbench/profile/solana/get_program_accounts_stake.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from locust import task

from chainbench.user.http import RpcCall
from chainbench.user.jsonrpc import RpcCall
from chainbench.user.protocol.solana import SolanaUser


Expand Down
7 changes: 4 additions & 3 deletions chainbench/test_data/blockchain.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import json
import logging
import typing as t
from argparse import Namespace
from dataclasses import dataclass

import orjson as json
from gevent.lock import Semaphore as GeventSemaphore
from orjson.orjson import OPT_SORT_KEYS
from tenacity import retry, stop_after_attempt

from chainbench.util.http import HttpClient
Expand Down Expand Up @@ -46,7 +47,7 @@ class Block:
block_number: BlockNumber

def to_json(self) -> str:
return json.dumps(self.__dict__)
return json.dumps(self.__dict__).decode("utf-8")


@dataclass
Expand Down Expand Up @@ -91,7 +92,7 @@ def __init__(self, size: Size, start: BlockNumber = 0, end: BlockNumber = 0):
self.block_numbers: list[BlockNumber] = []

def to_json(self) -> str:
return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True)
return json.dumps(self, default=lambda o: o.__dict__, option=OPT_SORT_KEYS).decode("utf-8")

def push_block(self, block: B) -> None:
if block.block_number in self.block_numbers:
Expand Down
2 changes: 1 addition & 1 deletion chainbench/test_data/ethereum.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import json
import logging
import typing as t
from argparse import Namespace
from dataclasses import dataclass

import orjson as json
from tenacity import retry, stop_after_attempt, wait_fixed

from chainbench.test_data.blockchain import (
Expand Down
3 changes: 2 additions & 1 deletion chainbench/test_data/evm.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import json
import logging
import typing as t
from argparse import Namespace
from dataclasses import dataclass

import orjson as json

from chainbench.util.rng import RNG, get_rng

from .blockchain import (
Expand Down
2 changes: 1 addition & 1 deletion chainbench/test_data/solana.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import json
import logging
import typing as t
from argparse import Namespace
from dataclasses import dataclass

import orjson as json
from tenacity import retry, stop_after_attempt

from chainbench.util.rng import RNG, get_rng
Expand Down
3 changes: 2 additions & 1 deletion chainbench/test_data/starknet.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import json
import logging
import typing as t

import orjson as json

from .blockchain import (
Account,
BlockHash,
Expand Down
2 changes: 1 addition & 1 deletion chainbench/tools/discovery/rpc.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import json
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator

import orjson as json
from tenacity import retry, retry_if_exception_type, wait_exponential

from chainbench.util.http import HttpClient, HttpErrorLevel
Expand Down
7 changes: 5 additions & 2 deletions chainbench/user/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
from chainbench.util.event import setup_event_listeners

from .common import get_subclass_tasks
from .http import HttpUser, JsonRpcUser
from .http import HttpUser
from .jsonrpc import JrpcHttpUser
from .wss import WssJrpcUser

# importing plugins here as all profiles depend on it
import locust_plugins # isort: skip # noqa
Expand All @@ -13,8 +15,9 @@
"EthBeaconUser",
"EvmUser",
"HttpUser",
"JsonRpcUser",
"JrpcHttpUser",
"SolanaUser",
"StarkNetUser",
"WssJrpcUser",
"get_subclass_tasks",
]
200 changes: 2 additions & 198 deletions chainbench/user/http.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,11 @@
import json
import logging
import random
import typing as t

from locust import FastHttpUser, TaskSet, tag, task
from locust import FastHttpUser, TaskSet
from locust.contrib.fasthttp import ResponseContextManager

from chainbench.test_data import TestData
from chainbench.util.rng import RNGManager


class RpcCall:
def __init__(self, method: str, params: list[t.Any] | dict | None = None) -> None:
self.method = method
self.params = params


def expand_rpc_calls(rpc_calls_weighted: dict[t.Callable[[], RpcCall], int]) -> list[RpcCall]:
rpc_call_methods_weighted: dict[RpcCall, int] = {}
for rpc_call_method, weight in rpc_calls_weighted.items():
rpc_call_methods_weighted[rpc_call_method()] = weight

expanded_rpc_calls: list[RpcCall] = expand_to_list(rpc_call_methods_weighted)
return expanded_rpc_calls


def expand_to_list(items_weighted: dict[t.Any, int] | list[t.Any | tuple[t.Any, int]]) -> list[t.Any]:
expanded_items_list: list[t.Any] = []
if isinstance(items_weighted, dict):
items_weighted = list(items_weighted.items())

if isinstance(items_weighted, list):
for rpc_call in items_weighted:
if isinstance(rpc_call, tuple):
rpc_call, count = rpc_call
for _ in range(count):
expanded_items_list.append(rpc_call)
else:
expanded_items_list.append(rpc_call)

return expanded_items_list
from chainbench.util.jsonrpc import expand_to_list


class HttpUser(FastHttpUser):
Expand All @@ -48,7 +14,6 @@ class HttpUser(FastHttpUser):
abstract = True
test_data: TestData = TestData()
logger = logging.getLogger(__name__)
rng = RNGManager()

connection_timeout = 120
network_timeout = 360
Expand Down Expand Up @@ -119,164 +84,3 @@ def get(self, name: str, params: t.Optional[dict] = None, path: str = "") -> Res
with self.client.request("GET", path, params=params, name=name, catch_response=True) as response:
self.check_http_error(response)
return response


class JsonRpcUser(HttpUser):
"""Extension of HttpUser to provide JsonRPC support."""

abstract = True
rpc_path = ""
rpc_error_code_exclusions: list[int] = []
rpc_calls: dict[t.Callable, int] = {} # To be populated in the subclass load profile
calls_per_batch = 10 # default requests to include in a batch request

def __init__(self, environment: t.Any):
self.calls_per_batch = environment.parsed_options.batch_size
super().__init__(environment)

@tag("single")
@task
def rpc_call_task(self) -> None:
self.method_to_task_function(self.environment.parsed_options.method)(self)

@tag("batch")
@task
def batch_rpc_call_task(self) -> None:
rpc_calls = {getattr(self, method.__name__): weight for method, weight in self.rpc_calls.items()}
self.make_random_batch_rpc_call(
rpc_calls,
calls_per_batch=self.calls_per_batch,
)

@tag("batch_single")
@task
def batch_single_rpc_call_task(self) -> None:
rpc_call: RpcCall = self.method_to_rpc_call(self.environment.parsed_options.method)(self)
rpc_calls = [rpc_call for _ in range(self.calls_per_batch)]
self.make_batch_rpc_call(
rpc_calls,
)

@classmethod
def method_to_rpc_call(cls, method: str) -> t.Callable:
method_name = cls.method_to_function_name(method)
return getattr(cls, method_name)

def check_json_rpc_response(self, response: ResponseContextManager, name: str) -> None:
CHUNK_SIZE = 1024
if response.text is None:
self.logger.error(f"Response for {name} is empty")
response.failure(f"Response for {name} is empty")
return
data = response.text[:CHUNK_SIZE]
if "jsonrpc" not in data:
self.logger.error(f"Response for {name} is not a JSON-RPC: {response.text}")
response.failure(f"Response for {name} is not a JSON-RPC")
return

if "error" in data:
response_js: list | dict = response.json()
if isinstance(response_js, dict):
response_js = [response_js]
if isinstance(response_js, list):
for response_js_item in response_js:
if "error" in response_js_item:
if "code" in response_js_item["error"]:
self.logger.error(f"Response for {name} has a JSON-RPC error: {response.text}")
if response_js_item["error"]["code"] not in self.rpc_error_code_exclusions:
response.failure(
f"Response for {name} has a JSON-RPC error {response_js_item['error']['code']} - "
f"{response_js_item['error']['message']}"
)
return
response.failure("Unspecified JSON-RPC error")
self.logger.error(f"Unspecified JSON-RPC error: {response.text}")
return
# TODO: handle multiple errors in batch response properly

if "result" not in data:
response.failure(f"Response for {name} call has no result")
self.logger.error(f"Response for {name} call has no result: {response.text}")

def make_rpc_call(
self,
rpc_call: RpcCall | None = None,
method: str | None = None,
params: list[t.Any] | dict | None = None,
name: str = "",
path: str = "",
) -> None:
"""Make a JSON-RPC call."""
if rpc_call is not None:
method = rpc_call.method
params = rpc_call.params

if name == "" and method is not None:
name = method

with self.client.request(
"POST", self.rpc_path + path, json=generate_request_body(method, params), name=name, catch_response=True
) as response:
self.check_http_error(response)
self.check_json_rpc_response(response, name=name)

def make_batch_rpc_call(self, rpc_calls: list[RpcCall], name: str = "", path: str = "") -> None:
"""Make a Batch JSON-RPC call."""

if name == "":
name = f"Batch RPC ({len(rpc_calls)})"

headers = {"Content-Type": "application/json", "accept": "application/json"}

with self.client.request(
"POST",
self.rpc_path + path,
data=generate_batch_request_body(rpc_calls),
name=name,
catch_response=True,
headers=headers,
) as response:
self.check_http_error(response)
self.check_json_rpc_response(response, name=name)

def make_random_batch_rpc_call(
self,
weighted_rpc_calls: dict[t.Callable[[], RpcCall], int],
calls_per_batch: int,
name: str = "",
path: str = "",
) -> None:
"""Make a Batch JSON-RPC call."""
rpc_calls: list[RpcCall] = expand_rpc_calls(weighted_rpc_calls)
random_rpc_calls: list[RpcCall] = random.choices(rpc_calls, k=calls_per_batch)

self.make_batch_rpc_call(random_rpc_calls, name=name, path=path)


def generate_request_body(
method: str | None = None, params: list | dict | None = None, request_id: int | None = None, version: str = "2.0"
) -> dict:
"""Generate a JSON-RPC request body."""

if params is None:
params = []

if request_id is None:
request_id = random.randint(1, 100000000)

return {
"jsonrpc": version,
"method": method,
"params": params,
"id": request_id,
}


def generate_batch_request_body(rpc_calls: list[RpcCall], version: str = "2.0") -> str:
"""Generate a batch JSON-RPC request body."""
return json.dumps(
[
generate_request_body(rpc_calls[i].method, rpc_calls[i].params, request_id=i, version=version)
for i in range(1, len(rpc_calls))
]
)
Loading