Skip to content

Commit d6c1188

Browse files
committed
improve test data init efficiency
- send test data block by block to each worker as the master runner receives them
1 parent 8541fa4 commit d6c1188

File tree

2 files changed, with 73 additions and 67 deletions.

chainbench/test_data/blockchain.py

Lines changed: 12 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -125,7 +125,7 @@ def __init__(self) -> None:
125125
self._logger.debug("Locked")
126126

127127
def init_http_client(self, host_url: str) -> None:
128-
self._client = HttpClient(host_url)
128+
self._client = HttpClient(host_url, timeout=60)
129129
self._logger.debug("Host: %s", host_url)
130130

131131
@property
@@ -144,16 +144,18 @@ def client(self) -> HttpClient:
144144
raise RuntimeError("HTTP Client is not initialized")
145145
return self._client
146146

147+
def release_lock(self) -> None:
148+
self._logger.debug("Releasing lock")
149+
self._lock.release()
150+
self._logger.debug("Lock released")
151+
147152
def close(self) -> None:
148153
if self._client is not None:
149154
self._client.close()
150155

151156
def wait(self) -> None:
152157
self._lock.wait()
153158

154-
def is_valid_block(self, block: B) -> bool:
155-
raise NotImplementedError
156-
157159
def fetch_block(self, block_number: BlockNumber) -> B:
158160
raise NotImplementedError
159161

@@ -162,7 +164,7 @@ def fetch_latest_block(self) -> B:
162164
raise NotImplementedError
163165

164166
@retry(reraise=True, stop=stop_after_attempt(5))
165-
def _fetch_random_block(self, block_numbers: list[BlockNumber]) -> B:
167+
def fetch_random_block(self, block_numbers: list[BlockNumber]) -> B:
166168
rng = get_rng()
167169
while True:
168170
block_number = self.data.block_range.get_random_block_number(rng)
@@ -173,62 +175,21 @@ def _fetch_random_block(self, block_numbers: list[BlockNumber]) -> B:
173175
def _get_start_and_end_blocks(self, parsed_options: Namespace) -> BlockRange:
174176
raise NotImplementedError
175177

176-
def _get_data_from_blockchain(self, parsed_options: Namespace) -> None:
178+
def get_block_from_data(self, data: dict[str, t.Any] | str) -> B:
179+
raise NotImplementedError
180+
181+
def init_data(self, parsed_options: Namespace) -> None:
177182
size: Size = Sizes.get_size(parsed_options.size) if parsed_options.size != "None" else self.DEFAULT_SIZE
178183
print(f"Test data size: {size.label}")
179184
self._logger.info(f"Test data size: {size.label}")
180185
self._data = BlockchainData(size)
181186
self.data.block_range = self._get_start_and_end_blocks(parsed_options)
182187

183-
if parsed_options.use_latest_blocks:
184-
print(f"Using latest {size.blocks_len} blocks as test data")
185-
self._logger.info(f"Using latest {size.blocks_len} blocks as test data")
186-
for block_number in range(self.data.block_range.start, self.data.block_range.end + 1):
187-
try:
188-
block = self.fetch_block(block_number)
189-
except (BlockNotFoundError, InvalidBlockError):
190-
block = self.fetch_latest_block()
191-
self.data.push_block(block)
192-
print(self.data.stats(), end="\r")
193-
else:
194-
print(self.data.stats(), end="\r")
195-
print("\n") # new line after progress display upon exiting loop
196-
else:
197-
while size.blocks_len > len(self.data.blocks):
198-
try:
199-
block = self._fetch_random_block(self.data.block_numbers)
200-
except (BlockNotFoundError, InvalidBlockError):
201-
continue
202-
self.data.push_block(block)
203-
print(self.data.stats(), end="\r")
204-
else:
205-
print(self.data.stats(), end="\r")
206-
print("\n") # new line after progress display upon exiting loop
207-
208-
def get_block_from_data(self, data: dict[str, t.Any] | str) -> B:
209-
raise NotImplementedError
210-
211-
def _get_data_from_json(self, json_data: str) -> None:
188+
def init_data_from_json(self, json_data: str) -> None:
212189
data: dict[str, t.Any] = json.loads(json_data)
213190
size = Size(**data["size"])
214191
self._data = BlockchainData(size)
215192
self.data.block_range = BlockRange(**data["block_range"])
216-
self.data.blocks = [self.get_block_from_data(block) for block in data["blocks"]]
217-
self.data.block_numbers = data["block_numbers"]
218-
219-
def _update_data(self, init_function: t.Callable, *args, **kwargs) -> None:
220-
self._logger.info("Updating data")
221-
init_function(*args, **kwargs)
222-
self._logger.debug("Data: %s", self.data.to_json())
223-
self._logger.info("Data updated. Releasing lock")
224-
self._lock.release()
225-
self._logger.info("Lock released")
226-
227-
def init_data_from_blockchain(self, parsed_options: Namespace) -> None:
228-
self._update_data(self._get_data_from_blockchain, parsed_options)
229-
230-
def init_data_from_json(self, json_data: str) -> None:
231-
self._update_data(self._get_data_from_json, json_data)
232193

233194
@staticmethod
234195
def get_random_bool(rng: RNG | None = None) -> bool:

chainbench/util/event.py

Lines changed: 61 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,12 @@ def cli_custom_arguments(parser: LocustArgumentParser):
4646
)
4747

4848

49+
def send_msg_to_workers(master_runner: MasterRunner, msg_type: str, data: dict[str, t.Any]):
50+
for i, worker in enumerate(master_runner.clients):
51+
master_runner.send_message(msg_type, {"data": (data, i)}, worker)
52+
logger.debug(f"{msg_type} sent to worker {i}")
53+
54+
4955
def setup_test_data(environment: Environment, msg: Message, **kwargs):
5056
# Fired when the worker receives a message of type 'test_data'
5157
test_data: dict[str, t.Any] = msg.data["data"][0]
@@ -61,15 +67,25 @@ def setup_test_data(environment: Environment, msg: Message, **kwargs):
6167
user.test_data.init_network(chain_id)
6268
else:
6369
raise AttributeError(f"{user} class does not have 'test_data' attribute")
64-
environment.runner.send_message("acknowledge_data", {"data": f"Test data received by worker {worker_index}"})
65-
logger.info("Test Data received from master")
70+
environment.runner.send_message(
71+
"acknowledge_data", {"data": f"Initial test data received by worker {worker_index}"}
72+
)
73+
logger.info("Initial test data received from master")
6674

6775

6876
def on_acknowledge(msg: Message, **kwargs):
6977
# Fired when the master receives a message of type 'acknowledge_data'
7078
print(msg.data["data"])
7179

7280

81+
def on_release(environment: Environment, msg: Message, **kwargs):
82+
# Fired when the worker receives a message of type 'release_lock'
83+
if isinstance(environment.runner, WorkerRunner):
84+
for user in environment.runner.user_classes:
85+
if hasattr(user, "test_data"):
86+
user.test_data.release_lock()
87+
88+
7389
def get_block_worker(master_runner: MasterRunner):
7490
logger.info("Getting blocks in real time.")
7591
active_users: list[t.Type[User]] = []
@@ -104,15 +120,13 @@ def get_block_worker(master_runner: MasterRunner):
104120
blocks[test_data_class_name] = block.to_json()
105121
print(f"Block {block.block_number} fetched and updated in test data")
106122
if len(blocks) > 0:
107-
for i, worker in enumerate(master_runner.clients):
108-
master_runner.send_message("block_data", {"data": blocks}, worker)
109-
logger.info(f"Block data is sent to worker {i}")
123+
send_msg_to_workers(master_runner, "block_data", blocks)
110124
time.sleep(1)
111125

112126

113127
def on_receive_block(environment: Environment, msg: Message, **kwargs):
114128
# Fired when the worker receives a message of type 'block_data'
115-
blocks: dict[str, t.Any] = msg.data["data"]
129+
blocks: dict[str, t.Any] = msg.data["data"][0]
116130

117131
if isinstance(environment.runner, WorkerRunner):
118132
for user in environment.runner.user_classes:
@@ -121,7 +135,7 @@ def on_receive_block(environment: Environment, msg: Message, **kwargs):
121135
block: Block = user.test_data.get_block_from_data(blocks[test_data_class_name])
122136
if block.block_number not in user.test_data.data.block_numbers:
123137
user.test_data.data.push_block(block)
124-
logger.debug(f"Block {block.block_number} received from master and updated in test data")
138+
logger.info(f"Block {block.block_number} received from master and updated in test data")
125139

126140

127141
# Listener for the init event
@@ -138,8 +152,7 @@ def on_init(environment: Environment, **_kwargs):
138152
logger.info("I'm a worker. Running tests for %s", host_under_test)
139153
environment.runner.register_message("test_data", setup_test_data)
140154
environment.runner.register_message("block_data", on_receive_block)
141-
142-
test_data: dict[str, t.Any] = {}
155+
environment.runner.register_message("release_lock", on_release)
143156

144157
if isinstance(environment.runner, MasterRunner):
145158
# Print master details to the log
@@ -156,6 +169,7 @@ def on_init(environment: Environment, **_kwargs):
156169
raise exit(1)
157170
time.sleep(1)
158171
try:
172+
test_data: dict[str, t.Any] = {}
159173
for user in environment.runner.user_classes:
160174
if not hasattr(user, "test_data"):
161175
raise AttributeError(f"{user} class does not have 'test_data' attribute")
@@ -174,19 +188,50 @@ def on_init(environment: Environment, **_kwargs):
174188
print(f"Target endpoint network is {user_test_data.network.name}")
175189
test_data["chain_id"] = {test_data_class_name: chain_id}
176190
if environment.parsed_options:
177-
user_test_data.init_data_from_blockchain(environment.parsed_options)
191+
user_test_data.init_data(environment.parsed_options)
178192
test_data[test_data_class_name] = user_test_data.data.to_json()
193+
send_msg_to_workers(environment.runner, "test_data", test_data)
194+
print("Fetching blocks...")
195+
if environment.parsed_options.use_latest_blocks:
196+
print(f"Using latest {user_test_data.data.size.blocks_len} blocks as test data")
197+
logger.info(f"Using latest {user_test_data.data.size.blocks_len} blocks as test data")
198+
for block_number in range(
199+
user_test_data.data.block_range.start, user_test_data.data.block_range.end + 1
200+
):
201+
try:
202+
block = user_test_data.fetch_block(block_number)
203+
except (BlockNotFoundError, InvalidBlockError):
204+
block = user_test_data.fetch_latest_block()
205+
user_test_data.data.push_block(block)
206+
block_data = {test_data_class_name: block.to_json()}
207+
send_msg_to_workers(environment.runner, "block_data", block_data)
208+
print(user_test_data.data.stats(), end="\r")
209+
else:
210+
print(user_test_data.data.stats(), end="\r")
211+
print("\n") # new line after progress display upon exiting loop
212+
else:
213+
while user_test_data.data.size.blocks_len > len(user_test_data.data.blocks):
214+
try:
215+
block = user_test_data.fetch_random_block(user_test_data.data.block_numbers)
216+
except (BlockNotFoundError, InvalidBlockError):
217+
continue
218+
user_test_data.data.push_block(block)
219+
block_data = {test_data_class_name: block.to_json()}
220+
send_msg_to_workers(environment.runner, "block_data", block_data)
221+
print(user_test_data.data.stats(), end="\r")
222+
else:
223+
print(user_test_data.data.stats(), end="\r")
224+
print("\n") # new line after progress display upon exiting loop
225+
logger.info("Test data is ready")
226+
send_msg_to_workers(environment.runner, "release_lock", {})
227+
user_test_data.release_lock()
179228
except Exception as e:
180-
logger.error(f"Failed to update test data: {e.__class__.__name__}: {e}. Exiting...")
181-
print(f"Failed to update test data:\n {e.__class__.__name__}: {e}. Exiting...")
229+
logger.error(f"Failed to init test data: {e.__class__.__name__}: {e}. Exiting...")
230+
print(f"Failed to init test data:\n {e.__class__.__name__}: {e}. Exiting...")
182231
logger.debug(traceback.format_exc())
183232
environment.runner.quit()
184233
raise exit(1)
185234
else:
186-
logger.info("Test data is ready")
187-
for i, worker in enumerate(environment.runner.clients):
188-
environment.runner.send_message("test_data", {"data": (test_data, i)}, worker)
189-
logger.info(f"Test data is sent to worker {i}")
190235
if environment.web_ui:
191236
print(f"Web UI started at: " f"http://{environment.runner.master_bind_host}:8089")
192237
logger.info(f"Web UI started at: " f"http://{environment.runner.master_bind_host}:8089")

0 commit comments

Comments
 (0)