Skip to content

Commit f2f4056

Browse files
authored
feat(node-framework): New wiring interface (#2384)
## What ❔ ⚠️ No nitpick territory! This PR touches _a lot of code_, and many places there may be improved for sure. Let's focus on fundamentals only. You are free to leave nitpick comments, but please don't block the review on them only. I may or may not fix nitpicks, though will try depending on complexity and capacity, most likely in follow-up PRs. This PR introduces a new interface for `WiringLayer`. Instead of giving direct access to the `ServiceContext`, it now has to define `Input` and `Output` types, which will be fetched from/inserted to the context correspondingly. `WiringLayer::Input` has to implement `FromContext` trait. This trait has implementations for `()`, `T: Resource`, `Option<T: Resource>` , and can be derived. `WiringLayer::Output` has to implement `IntoContext`, which has the same basic implementations, and also has a derive macro. With this approach, all the inputs and outputs can be easily seen for the layer, so that we don't need to worry about docs getting outdated, and also it saves quite some boilerplate when using the framework. Besides, small changes were made where necessary, e.g.: - Consensus layer was split into two, for main and external node. - TxSink layer was split into two, for DB and proxy sinks. - A lot of "wrapper" tasks were removed. - Some convenience impls (e.g. impl `From <WrappedType> to <ResourceType>`). - Shutdown hook was made into a separate entity that implements `IntoContext`. ## Why ❔ Finalization of the framework design. ## Checklist <!-- Check your PR fulfills the following items. --> <!-- For draft PRs check the boxes as you complete them. --> - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zk fmt` and `zk lint`.
1 parent 52a4680 commit f2f4056

File tree

82 files changed

+2266
-1827
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+2266
-1827
lines changed

core/bin/external_node/src/node_builder.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use zksync_node_framework::{
1616
implementations::layers::{
1717
batch_status_updater::BatchStatusUpdaterLayer,
1818
commitment_generator::CommitmentGeneratorLayer,
19-
consensus::{ConsensusLayer, Mode},
19+
consensus::ExternalNodeConsensusLayer,
2020
consistency_checker::ConsistencyCheckerLayer,
2121
healtcheck_server::HealthCheckLayer,
2222
l1_batch_commitment_mode_validation::L1BatchCommitmentModeValidationLayer,
@@ -41,7 +41,7 @@ use zksync_node_framework::{
4141
server::{Web3ServerLayer, Web3ServerOptionalConfig},
4242
tree_api_client::TreeApiClientLayer,
4343
tx_sender::{PostgresStorageCachesConfig, TxSenderLayer},
44-
tx_sink::TxSinkLayer,
44+
tx_sink::ProxySinkLayer,
4545
},
4646
},
4747
service::{ZkStackService, ZkStackServiceBuilder},
@@ -209,11 +209,7 @@ impl ExternalNodeBuilder {
209209
let config = self.config.consensus.clone();
210210
let secrets =
211211
config::read_consensus_secrets().context("config::read_consensus_secrets()")?;
212-
let layer = ConsensusLayer {
213-
mode: Mode::External,
214-
config,
215-
secrets,
216-
};
212+
let layer = ExternalNodeConsensusLayer { config, secrets };
217213
self.node.add_layer(layer);
218214
Ok(self)
219215
}
@@ -359,7 +355,7 @@ impl ExternalNodeBuilder {
359355
)
360356
.with_whitelisted_tokens_for_aa_cache(true);
361357

362-
self.node.add_layer(TxSinkLayer::ProxySink);
358+
self.node.add_layer(ProxySinkLayer);
363359
self.node.add_layer(tx_sender_layer);
364360
Ok(self)
365361
}

core/bin/zksync_server/src/node_builder.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use zksync_node_framework::{
2525
base_token_ratio_provider::BaseTokenRatioProviderLayer,
2626
circuit_breaker_checker::CircuitBreakerCheckerLayer,
2727
commitment_generator::CommitmentGeneratorLayer,
28-
consensus::{ConsensusLayer, Mode as ConsensusMode},
28+
consensus::MainNodeConsensusLayer,
2929
contract_verification_api::ContractVerificationApiLayer,
3030
da_dispatcher::DataAvailabilityDispatcherLayer,
3131
eth_sender::{EthTxAggregatorLayer, EthTxManagerLayer},
@@ -56,7 +56,7 @@ use zksync_node_framework::{
5656
server::{Web3ServerLayer, Web3ServerOptionalConfig},
5757
tree_api_client::TreeApiClientLayer,
5858
tx_sender::{PostgresStorageCachesConfig, TxSenderLayer},
59-
tx_sink::TxSinkLayer,
59+
tx_sink::MasterPoolSinkLayer,
6060
},
6161
},
6262
service::{ZkStackService, ZkStackServiceBuilder},
@@ -280,7 +280,7 @@ impl MainNodeBuilder {
280280
};
281281

282282
// On main node we always use master pool sink.
283-
self.node.add_layer(TxSinkLayer::MasterPoolSink);
283+
self.node.add_layer(MasterPoolSinkLayer);
284284
self.node.add_layer(TxSenderLayer::new(
285285
TxSenderConfig::new(
286286
&sk_config,
@@ -445,10 +445,16 @@ impl MainNodeBuilder {
445445
}
446446

447447
fn add_consensus_layer(mut self) -> anyhow::Result<Self> {
448-
self.node.add_layer(ConsensusLayer {
449-
mode: ConsensusMode::Main,
450-
config: self.consensus_config.clone(),
451-
secrets: self.secrets.consensus.clone(),
448+
self.node.add_layer(MainNodeConsensusLayer {
449+
config: self
450+
.consensus_config
451+
.clone()
452+
.context("Consensus config has to be provided")?,
453+
secrets: self
454+
.secrets
455+
.consensus
456+
.clone()
457+
.context("Consensus secrets have to be provided")?,
452458
});
453459

454460
Ok(self)

core/bin/zksync_tee_prover/src/tee_prover.rs

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
use std::time::Duration;
1+
use std::{fmt, time::Duration};
22

33
use secp256k1::{ecdsa::Signature, Message, PublicKey, Secp256k1, SecretKey};
44
use url::Url;
55
use zksync_basic_types::H256;
66
use zksync_node_framework::{
7-
service::{ServiceContext, StopReceiver},
7+
service::StopReceiver,
88
task::{Task, TaskId},
99
wiring_layer::{WiringError, WiringLayer},
10+
IntoContext,
1011
};
1112
use zksync_prover_interface::inputs::TeeVerifierInput;
1213
use zksync_tee_verifier::Verify;
@@ -15,16 +16,8 @@ use zksync_types::{tee_types::TeeType, L1BatchNumber};
1516
use crate::{api_client::TeeApiClient, error::TeeProverError, metrics::METRICS};
1617

1718
/// Wiring layer for `TeeProver`
18-
///
19-
/// ## Requests resources
20-
///
21-
/// no resources requested
22-
///
23-
/// ## Adds tasks
24-
///
25-
/// - `TeeProver`
2619
#[derive(Debug)]
27-
pub struct TeeProverLayer {
20+
pub(crate) struct TeeProverLayer {
2821
api_url: Url,
2922
signing_key: SecretKey,
3023
attestation_quote_bytes: Vec<u8>,
@@ -47,27 +40,35 @@ impl TeeProverLayer {
4740
}
4841
}
4942

43+
#[derive(Debug, IntoContext)]
44+
pub(crate) struct LayerOutput {
45+
#[context(task)]
46+
pub tee_prover: TeeProver,
47+
}
48+
5049
#[async_trait::async_trait]
5150
impl WiringLayer for TeeProverLayer {
51+
type Input = ();
52+
type Output = LayerOutput;
53+
5254
fn layer_name(&self) -> &'static str {
5355
"tee_prover_layer"
5456
}
5557

56-
async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
57-
let tee_prover_task = TeeProver {
58+
async fn wire(self, _input: Self::Input) -> Result<Self::Output, WiringError> {
59+
let tee_prover = TeeProver {
5860
config: Default::default(),
5961
signing_key: self.signing_key,
6062
public_key: self.signing_key.public_key(&Secp256k1::new()),
6163
attestation_quote_bytes: self.attestation_quote_bytes,
6264
tee_type: self.tee_type,
6365
api_client: TeeApiClient::new(self.api_url),
6466
};
65-
context.add_task(tee_prover_task);
66-
Ok(())
67+
Ok(LayerOutput { tee_prover })
6768
}
6869
}
6970

70-
struct TeeProver {
71+
pub(crate) struct TeeProver {
7172
config: TeeProverConfig,
7273
signing_key: SecretKey,
7374
public_key: PublicKey,
@@ -76,6 +77,17 @@ struct TeeProver {
7677
api_client: TeeApiClient,
7778
}
7879

80+
impl fmt::Debug for TeeProver {
81+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82+
f.debug_struct("TeeProver")
83+
.field("config", &self.config)
84+
.field("public_key", &self.public_key)
85+
.field("attestation_quote_bytes", &self.attestation_quote_bytes)
86+
.field("tee_type", &self.tee_type)
87+
.finish()
88+
}
89+
}
90+
7991
impl TeeProver {
8092
fn verify(
8193
&self,

core/lib/default_da_clients/src/no_da/wiring_layer.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,34 @@ use std::fmt::Debug;
33
use zksync_da_client::DataAvailabilityClient;
44
use zksync_node_framework::{
55
implementations::resources::da_client::DAClientResource,
6-
service::ServiceContext,
76
wiring_layer::{WiringError, WiringLayer},
7+
IntoContext,
88
};
99

1010
use crate::no_da::client::NoDAClient;
1111

1212
#[derive(Debug, Default)]
1313
pub struct NoDAClientWiringLayer;
1414

15+
#[derive(Debug, IntoContext)]
16+
pub struct Output {
17+
pub client: DAClientResource,
18+
}
19+
1520
#[async_trait::async_trait]
1621
impl WiringLayer for NoDAClientWiringLayer {
22+
type Input = ();
23+
type Output = Output;
24+
1725
fn layer_name(&self) -> &'static str {
1826
"no_da_layer"
1927
}
2028

21-
async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
29+
async fn wire(self, _input: Self::Input) -> Result<Self::Output, WiringError> {
2230
let client: Box<dyn DataAvailabilityClient> = Box::new(NoDAClient);
2331

24-
context.insert_resource(DAClientResource(client))?;
25-
26-
Ok(())
32+
Ok(Output {
33+
client: DAClientResource(client),
34+
})
2735
}
2836
}

core/lib/default_da_clients/src/object_store/wiring_layer.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use zksync_config::ObjectStoreConfig;
22
use zksync_da_client::DataAvailabilityClient;
33
use zksync_node_framework::{
44
implementations::resources::da_client::DAClientResource,
5-
service::ServiceContext,
65
wiring_layer::{WiringError, WiringLayer},
6+
IntoContext,
77
};
88

99
use crate::object_store::client::ObjectStoreDAClient;
@@ -19,18 +19,26 @@ impl ObjectStorageClientWiringLayer {
1919
}
2020
}
2121

22+
#[derive(Debug, IntoContext)]
23+
pub struct Output {
24+
pub client: DAClientResource,
25+
}
26+
2227
#[async_trait::async_trait]
2328
impl WiringLayer for ObjectStorageClientWiringLayer {
29+
type Input = ();
30+
type Output = Output;
31+
2432
fn layer_name(&self) -> &'static str {
2533
"object_store_da_layer"
2634
}
2735

28-
async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
36+
async fn wire(self, _input: Self::Input) -> Result<Self::Output, WiringError> {
2937
let client: Box<dyn DataAvailabilityClient> =
3038
Box::new(ObjectStoreDAClient::new(self.config).await?);
3139

32-
context.insert_resource(DAClientResource(client))?;
33-
34-
Ok(())
40+
Ok(Output {
41+
client: DAClientResource(client),
42+
})
3543
}
3644
}

core/node/base_token_adjuster/src/base_token_ratio_provider.rs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
use std::{fmt::Debug, num::NonZeroU64, time::Duration};
1+
use std::{
2+
fmt::Debug,
3+
num::NonZeroU64,
4+
sync::{Arc, RwLock},
5+
time::Duration,
6+
};
27

38
use anyhow::Context;
49
use async_trait::async_trait;
@@ -9,23 +14,23 @@ use zksync_types::fee_model::BaseTokenConversionRatio;
914
const CACHE_UPDATE_INTERVAL: Duration = Duration::from_millis(500);
1015

1116
#[async_trait]
12-
pub trait BaseTokenRatioProvider: Debug + Send + Sync {
17+
pub trait BaseTokenRatioProvider: Debug + Send + Sync + 'static {
1318
fn get_conversion_ratio(&self) -> BaseTokenConversionRatio;
1419
}
1520

1621
#[derive(Debug, Clone)]
1722
pub struct DBBaseTokenRatioProvider {
1823
pub pool: ConnectionPool<Core>,
19-
pub latest_ratio: BaseTokenConversionRatio,
24+
pub latest_ratio: Arc<RwLock<BaseTokenConversionRatio>>,
2025
}
2126

2227
impl DBBaseTokenRatioProvider {
2328
pub async fn new(pool: ConnectionPool<Core>) -> anyhow::Result<Self> {
24-
let mut fetcher = Self {
29+
let fetcher = Self {
2530
pool,
26-
latest_ratio: BaseTokenConversionRatio::default(),
31+
latest_ratio: Arc::default(),
2732
};
28-
fetcher.latest_ratio = fetcher.get_latest_price().await?;
33+
fetcher.update_latest_price().await?;
2934

3035
// TODO(PE-129): Implement latest ratio usability logic.
3136

@@ -36,7 +41,11 @@ impl DBBaseTokenRatioProvider {
3641
Ok(fetcher)
3742
}
3843

39-
pub async fn run(&mut self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
44+
fn get_latest_ratio(&self) -> BaseTokenConversionRatio {
45+
*self.latest_ratio.read().unwrap()
46+
}
47+
48+
pub async fn run(&self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
4049
let mut timer = tokio::time::interval(CACHE_UPDATE_INTERVAL);
4150

4251
while !*stop_receiver.borrow_and_update() {
@@ -45,20 +54,15 @@ impl DBBaseTokenRatioProvider {
4554
_ = stop_receiver.changed() => break,
4655
}
4756

48-
let latest_storage_ratio = self.get_latest_price().await?;
49-
5057
// TODO(PE-129): Implement latest ratio usability logic.
51-
self.latest_ratio = BaseTokenConversionRatio {
52-
numerator: latest_storage_ratio.numerator,
53-
denominator: latest_storage_ratio.denominator,
54-
};
58+
self.update_latest_price().await?;
5559
}
5660

5761
tracing::info!("Stop signal received, base_token_ratio_provider is shutting down");
5862
Ok(())
5963
}
6064

61-
async fn get_latest_price(&self) -> anyhow::Result<BaseTokenConversionRatio> {
65+
async fn update_latest_price(&self) -> anyhow::Result<()> {
6266
let latest_storage_ratio = self
6367
.pool
6468
.connection_tagged("db_base_token_ratio_provider")
@@ -68,28 +72,31 @@ impl DBBaseTokenRatioProvider {
6872
.get_latest_ratio()
6973
.await;
7074

71-
match latest_storage_ratio {
72-
Ok(Some(latest_storage_price)) => Ok(BaseTokenConversionRatio {
75+
let ratio = match latest_storage_ratio {
76+
Ok(Some(latest_storage_price)) => BaseTokenConversionRatio {
7377
numerator: latest_storage_price.numerator,
7478
denominator: latest_storage_price.denominator,
75-
}),
79+
},
7680
Ok(None) => {
7781
// TODO(PE-136): Insert initial ratio from genesis.
7882
// Though the DB should be populated very soon after the server starts, it is possible
7983
// to have no ratios in the DB right after genesis. Having initial ratios in the DB
8084
// from the genesis stage will eliminate this possibility.
8185
tracing::error!("No latest price found in the database. Using default ratio.");
82-
Ok(BaseTokenConversionRatio::default())
86+
BaseTokenConversionRatio::default()
8387
}
8488
Err(err) => anyhow::bail!("Failed to get latest base token ratio: {:?}", err),
85-
}
89+
};
90+
91+
*self.latest_ratio.write().unwrap() = ratio;
92+
Ok(())
8693
}
8794
}
8895

8996
#[async_trait]
9097
impl BaseTokenRatioProvider for DBBaseTokenRatioProvider {
9198
fn get_conversion_ratio(&self) -> BaseTokenConversionRatio {
92-
self.latest_ratio
99+
self.get_latest_ratio()
93100
}
94101
}
95102

core/node/node_framework/examples/main_node.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use zksync_node_framework::{
5050
server::{Web3ServerLayer, Web3ServerOptionalConfig},
5151
tree_api_client::TreeApiClientLayer,
5252
tx_sender::{PostgresStorageCachesConfig, TxSenderLayer},
53-
tx_sink::TxSinkLayer,
53+
tx_sink::MasterPoolSinkLayer,
5454
},
5555
},
5656
service::{ZkStackService, ZkStackServiceBuilder, ZkStackServiceError},
@@ -215,7 +215,7 @@ impl MainNodeBuilder {
215215
let wallets = Wallets::from_env()?;
216216

217217
// On main node we always use master pool sink.
218-
self.node.add_layer(TxSinkLayer::MasterPoolSink);
218+
self.node.add_layer(MasterPoolSinkLayer);
219219
self.node.add_layer(TxSenderLayer::new(
220220
TxSenderConfig::new(
221221
&state_keeper_config,

0 commit comments

Comments
 (0)