Skip to content

Commit 3731102

Browse files
committed
Simplify BatchExecutor and its factory
1 parent 52b0871 commit 3731102

File tree

21 files changed

+110
-331
lines changed

21 files changed

+110
-331
lines changed

core/lib/vm_executor/src/batch/executor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio::{
77
task::JoinHandle,
88
};
99
use zksync_multivm::interface::{
10-
executor::{BatchExecutor, StandardOutputs},
10+
executor::BatchExecutor,
1111
storage::{ReadStorage, StorageView},
1212
BatchTransactionExecutionResult, FinishedL1Batch, L2BlockEnv,
1313
};
@@ -68,7 +68,7 @@ impl<S: ReadStorage> MainBatchExecutor<S> {
6868
}
6969

7070
#[async_trait]
71-
impl<S> BatchExecutor<StandardOutputs<S>> for MainBatchExecutor<S>
71+
impl<S> BatchExecutor<S> for MainBatchExecutor<S>
7272
where
7373
S: ReadStorage + Send + 'static,
7474
{

core/lib/vm_executor/src/batch/factory.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use once_cell::sync::OnceCell;
55
use tokio::sync::mpsc;
66
use zksync_multivm::{
77
interface::{
8-
executor::{BatchExecutorFactory, StandardOutputs},
8+
executor::{BatchExecutor, BatchExecutorFactory},
99
storage::{ReadStorage, StorageView},
1010
BatchTransactionExecutionResult, ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv,
1111
L2BlockEnv, SystemEnv, VmInterface, VmInterfaceHistoryEnabled,
@@ -57,15 +57,12 @@ impl MainBatchExecutorFactory {
5757
}
5858

5959
impl<S: ReadStorage + Send + 'static> BatchExecutorFactory<S> for MainBatchExecutorFactory {
60-
type Outputs = StandardOutputs<S>;
61-
type Executor = MainBatchExecutor<S>;
62-
6360
fn init_batch(
6461
&mut self,
6562
storage: S,
6663
l1_batch_params: L1BatchEnv,
6764
system_env: SystemEnv,
68-
) -> Box<Self::Executor> {
65+
) -> Box<dyn BatchExecutor<S>> {
6966
// Since we process `BatchExecutor` commands one-by-one (the next command is never enqueued
7067
// until a previous command is processed), capacity 1 is enough for the commands channel.
7168
let (commands_sender, commands_receiver) = mpsc::channel(1);

core/lib/vm_interface/src/executor.rs

Lines changed: 8 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! High-level executor traits.
22
3-
use std::{fmt, marker::PhantomData};
3+
use std::fmt;
44

55
use async_trait::async_trait;
66
use zksync_types::Transaction;
@@ -10,65 +10,28 @@ use crate::{
1010
SystemEnv,
1111
};
1212

13-
/// [`BatchExecutor`] outputs.
14-
pub trait BatchExecutorOutputs {
15-
/// Output from processing a single transaction in a batch.
16-
type Tx: 'static + Send;
17-
/// Output from finalizing a batch.
18-
type Batch: 'static + Send;
19-
}
20-
21-
/// Marker type for "standard" batch executor outputs.
22-
#[derive(Debug)]
23-
pub struct StandardOutputs<S>(PhantomData<S>);
24-
25-
impl<S: Send + 'static> BatchExecutorOutputs for StandardOutputs<S> {
26-
type Tx = BatchTransactionExecutionResult;
27-
type Batch = (FinishedL1Batch, StorageView<S>);
28-
}
29-
3013
/// Factory of [`BatchExecutor`]s.
3114
pub trait BatchExecutorFactory<S: Send + 'static>: 'static + Send + fmt::Debug {
32-
/// Outputs produced by executors instantiated by this factory.
33-
type Outputs: BatchExecutorOutputs;
34-
/// Executor instantiated by this factory.
35-
type Executor: BatchExecutor<Self::Outputs> + ?Sized;
36-
3715
/// Initializes an executor for a batch with the specified params and using the provided storage.
3816
fn init_batch(
3917
&mut self,
4018
storage: S,
4119
l1_batch_params: L1BatchEnv,
4220
system_env: SystemEnv,
43-
) -> Box<Self::Executor>;
44-
}
45-
46-
impl<S, T> BatchExecutorFactory<S> for Box<T>
47-
where
48-
S: Send + 'static,
49-
T: BatchExecutorFactory<S> + ?Sized,
50-
{
51-
type Outputs = T::Outputs;
52-
type Executor = T::Executor;
53-
54-
fn init_batch(
55-
&mut self,
56-
storage: S,
57-
l1_batch_params: L1BatchEnv,
58-
system_env: SystemEnv,
59-
) -> Box<Self::Executor> {
60-
(**self).init_batch(storage, l1_batch_params, system_env)
61-
}
21+
) -> Box<dyn BatchExecutor<S>>;
6222
}
6323

6424
/// Handle for executing a single L1 batch.
6525
///
6626
/// The handle is parametric by the transaction execution output in order to be able to represent different
6727
/// levels of abstraction.
6828
#[async_trait]
69-
pub trait BatchExecutor<Out: BatchExecutorOutputs>: 'static + Send + fmt::Debug {
29+
pub trait BatchExecutor<S>: 'static + Send + fmt::Debug {
7030
/// Executes a transaction.
71-
async fn execute_tx(&mut self, tx: Transaction) -> anyhow::Result<Out::Tx>;
31+
async fn execute_tx(
32+
&mut self,
33+
tx: Transaction,
34+
) -> anyhow::Result<BatchTransactionExecutionResult>;
7235

7336
/// Rolls back the last executed transaction.
7437
async fn rollback_last_tx(&mut self) -> anyhow::Result<()>;
@@ -77,48 +40,5 @@ pub trait BatchExecutor<Out: BatchExecutorOutputs>: 'static + Send + fmt::Debug
7740
async fn start_next_l2_block(&mut self, env: L2BlockEnv) -> anyhow::Result<()>;
7841

7942
/// Finished the current L1 batch.
80-
async fn finish_batch(self: Box<Self>) -> anyhow::Result<Out::Batch>;
81-
}
82-
83-
/// Boxed [`BatchExecutorFactory`]. Can be constructed from any executor using [`box_batch_executor_factory()`].
84-
pub type BoxBatchExecutorFactory<S, O = StandardOutputs<S>> =
85-
Box<dyn BatchExecutorFactory<S, Outputs = O, Executor = dyn BatchExecutor<O>>>;
86-
87-
/// Trait object for [`BatchExecutor`] with [`StandardOutputs`].
88-
pub type DynBatchExecutor<S> = dyn BatchExecutor<StandardOutputs<S>>;
89-
90-
/// Wrapper for a [`BatchExecutorFactory`] erasing returned executors.
91-
struct ErasedFactory<T>(T);
92-
93-
impl<T: fmt::Debug> fmt::Debug for ErasedFactory<T> {
94-
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
95-
fmt::Debug::fmt(&self.0, formatter)
96-
}
97-
}
98-
99-
impl<S, T> BatchExecutorFactory<S> for ErasedFactory<T>
100-
where
101-
S: Send + 'static,
102-
T: BatchExecutorFactory<S, Executor: Sized>,
103-
{
104-
type Outputs = T::Outputs;
105-
type Executor = dyn BatchExecutor<T::Outputs>;
106-
107-
fn init_batch(
108-
&mut self,
109-
storage: S,
110-
l1_batch_params: L1BatchEnv,
111-
system_env: SystemEnv,
112-
) -> Box<Self::Executor> {
113-
self.0.init_batch(storage, l1_batch_params, system_env)
114-
}
115-
}
116-
117-
/// Boxes the provided executor factory so that it doesn't have an ambiguous associated type.
118-
pub fn box_batch_executor_factory<S, T>(executor: T) -> BoxBatchExecutorFactory<S, T::Outputs>
119-
where
120-
S: Send + 'static,
121-
T: BatchExecutorFactory<S, Executor: Sized>,
122-
{
123-
Box::new(ErasedFactory(executor))
43+
async fn finish_batch(self: Box<Self>) -> anyhow::Result<(FinishedL1Batch, StorageView<S>)>;
12444
}

core/node/consensus/src/testonly.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,13 @@ use zksync_node_sync::{
3030
};
3131
use zksync_node_test_utils::{create_l1_batch_metadata, l1_batch_metadata_to_commitment_artifacts};
3232
use zksync_state_keeper::{
33-
executor::{MainBatchExecutorFactory, MainStateKeeperExecutorFactory},
33+
executor::MainBatchExecutorFactory,
3434
io::{IoCursor, L1BatchParams, L2BlockParams},
3535
seal_criteria::NoopSealer,
36-
testonly::{fund, l1_transaction, l2_transaction, MockBatchExecutor},
36+
testonly::{
37+
fund, l1_transaction, l2_transaction, test_batch_executor::MockReadStorageFactory,
38+
MockBatchExecutor,
39+
},
3740
AsyncRocksdbCache, OutputHandler, StateKeeperPersistence, TreeWritesPersistence,
3841
ZkSyncStateKeeper,
3942
};
@@ -591,8 +594,6 @@ impl StateKeeperRunner {
591594

592595
s.spawn_bg({
593596
let executor_factory = MainBatchExecutorFactory::new(false, false);
594-
let executor_factory =
595-
MainStateKeeperExecutorFactory::new(executor_factory, Arc::new(async_cache));
596597
let stop_recv = stop_recv.clone();
597598
async {
598599
ZkSyncStateKeeper::new(
@@ -602,6 +603,7 @@ impl StateKeeperRunner {
602603
OutputHandler::new(Box::new(persistence.with_tx_insertion()))
603604
.with_handler(Box::new(self.sync_state.clone())),
604605
Arc::new(NoopSealer),
606+
Arc::new(async_cache),
605607
)
606608
.run()
607609
.await
@@ -683,6 +685,7 @@ impl StateKeeperRunner {
683685
.with_handler(Box::new(tree_writes_persistence))
684686
.with_handler(Box::new(self.sync_state.clone())),
685687
Arc::new(NoopSealer),
688+
Arc::new(MockReadStorageFactory),
686689
)
687690
.run()
688691
.await

core/node/node_framework/src/implementations/layers/state_keeper/mod.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,14 @@
11
use std::sync::Arc;
22

33
use anyhow::Context;
4+
pub use zksync_state::RocksdbStorageOptions;
45
use zksync_state::{AsyncCatchupTask, OwnedStorage, ReadStorageFactory};
56
use zksync_state_keeper::{
6-
executor::MainStateKeeperExecutorFactory, seal_criteria::ConditionalSealer, AsyncRocksdbCache,
7-
OutputHandler, StateKeeperIO, ZkSyncStateKeeper,
7+
seal_criteria::ConditionalSealer, AsyncRocksdbCache, OutputHandler, StateKeeperIO,
8+
ZkSyncStateKeeper,
89
};
910
use zksync_storage::RocksDB;
10-
11-
pub mod external_io;
12-
pub mod main_batch_executor;
13-
pub mod mempool_io;
14-
pub mod output_handler;
15-
16-
// Public re-export to not require the user to directly depend on `zksync_state`.
17-
pub use zksync_state::RocksdbStorageOptions;
18-
use zksync_vm_executor::interface::BoxBatchExecutorFactory;
11+
use zksync_vm_executor::interface::BatchExecutorFactory;
1912

2013
use crate::{
2114
implementations::resources::{
@@ -31,6 +24,11 @@ use crate::{
3124
FromContext, IntoContext,
3225
};
3326

27+
pub mod external_io;
28+
pub mod main_batch_executor;
29+
pub mod mempool_io;
30+
pub mod output_handler;
31+
3432
/// Wiring layer for the state keeper.
3533
#[derive(Debug)]
3634
pub struct StateKeeperLayer {
@@ -126,7 +124,7 @@ impl WiringLayer for StateKeeperLayer {
126124
#[derive(Debug)]
127125
pub struct StateKeeperTask {
128126
io: Box<dyn StateKeeperIO>,
129-
executor_factory: BoxBatchExecutorFactory<OwnedStorage>,
127+
executor_factory: Box<dyn BatchExecutorFactory<OwnedStorage>>,
130128
output_handler: OutputHandler,
131129
sealer: Arc<dyn ConditionalSealer>,
132130
storage_factory: Arc<dyn ReadStorageFactory>,
@@ -139,14 +137,13 @@ impl Task for StateKeeperTask {
139137
}
140138

141139
async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
142-
let executor_factory =
143-
MainStateKeeperExecutorFactory::new(self.executor_factory, self.storage_factory);
144140
let state_keeper = ZkSyncStateKeeper::new(
145141
stop_receiver.0,
146142
self.io,
147-
Box::new(executor_factory),
143+
self.executor_factory,
148144
self.output_handler,
149145
self.sealer,
146+
self.storage_factory,
150147
);
151148
state_keeper.run().await
152149
}

core/node/node_framework/src/implementations/layers/vm_runner/bwip.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use zksync_config::configs::vm_runner::BasicWitnessInputProducerConfig;
22
use zksync_types::L2ChainId;
3-
use zksync_vm_executor::{batch::MainBatchExecutorFactory, interface::box_batch_executor_factory};
3+
use zksync_vm_executor::batch::MainBatchExecutorFactory;
44
use zksync_vm_runner::{
55
impls::{BasicWitnessInputProducer, BasicWitnessInputProducerIo},
66
ConcurrentOutputHandlerFactoryTask, StorageSyncTask,
@@ -81,7 +81,7 @@ impl WiringLayer for BasicWitnessInputProducerLayer {
8181
let (basic_witness_input_producer, tasks) = BasicWitnessInputProducer::new(
8282
connection_pool,
8383
object_store.0,
84-
box_batch_executor_factory(batch_executor),
84+
Box::new(batch_executor),
8585
self.config.db_path,
8686
self.zksync_network_id,
8787
self.config.first_processed_batch,

core/node/node_framework/src/implementations/resources/state_keeper.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ use std::sync::Arc;
22

33
use zksync_state::OwnedStorage;
44
use zksync_state_keeper::{seal_criteria::ConditionalSealer, OutputHandler, StateKeeperIO};
5-
use zksync_vm_executor::interface::{
6-
box_batch_executor_factory, BatchExecutorFactory, BoxBatchExecutorFactory, StandardOutputs,
7-
};
5+
use zksync_vm_executor::interface::BatchExecutorFactory;
86

97
use crate::resource::{Resource, Unique};
108

@@ -28,7 +26,7 @@ impl<T: StateKeeperIO> From<T> for StateKeeperIOResource {
2826
/// A resource that provides [`BatchExecutorFactory`] implementation to the service.
2927
/// This resource is unique, e.g. it's expected to be consumed by a single service.
3028
#[derive(Debug, Clone)]
31-
pub struct BatchExecutorResource(pub Unique<BoxBatchExecutorFactory<OwnedStorage>>);
29+
pub struct BatchExecutorResource(pub Unique<Box<dyn BatchExecutorFactory<OwnedStorage>>>);
3230

3331
impl Resource for BatchExecutorResource {
3432
fn name() -> String {
@@ -38,10 +36,10 @@ impl Resource for BatchExecutorResource {
3836

3937
impl<T> From<T> for BatchExecutorResource
4038
where
41-
T: BatchExecutorFactory<OwnedStorage, Outputs = StandardOutputs<OwnedStorage>, Executor: Sized>,
39+
T: BatchExecutorFactory<OwnedStorage>,
4240
{
4341
fn from(executor: T) -> Self {
44-
Self(Unique::new(box_batch_executor_factory(executor)))
42+
Self(Unique::new(Box::new(executor)))
4543
}
4644
}
4745

core/node/node_sync/src/tests.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use zksync_node_test_utils::{
1111
create_l1_batch_metadata, create_l2_transaction, prepare_recovery_snapshot,
1212
};
1313
use zksync_state_keeper::{
14-
executor::MainStateKeeperExecutorFactory,
1514
io::{L1BatchParams, L2BlockParams},
1615
seal_criteria::NoopSealer,
1716
testonly::test_batch_executor::{MockReadStorageFactory, TestBatchExecutorBuilder},
@@ -122,20 +121,18 @@ impl StateKeeperHandles {
122121
.unwrap();
123122

124123
let (stop_sender, stop_receiver) = watch::channel(false);
125-
let mut batch_executor_base = TestBatchExecutorBuilder::default();
124+
let mut batch_executor = TestBatchExecutorBuilder::default();
126125
for &tx_hashes_in_l1_batch in tx_hashes {
127-
batch_executor_base.push_successful_transactions(tx_hashes_in_l1_batch);
126+
batch_executor.push_successful_transactions(tx_hashes_in_l1_batch);
128127
}
129128

130129
let state_keeper = ZkSyncStateKeeper::new(
131130
stop_receiver,
132131
Box::new(io),
133-
Box::new(MainStateKeeperExecutorFactory::new(
134-
batch_executor_base,
135-
Arc::new(MockReadStorageFactory),
136-
)),
132+
Box::new(batch_executor),
137133
output_handler,
138134
Arc::new(NoopSealer),
135+
Arc::new(MockReadStorageFactory),
139136
);
140137

141138
Self {

0 commit comments

Comments
 (0)