Skip to content

Commit bdeb411

Browse files
authored
fix(state-keeper): ensure unsealed batch is present during IO init (#3071)
## What ❔ Ensures unsealed L1 batch is present in the DB even if we start with re-execution. ## Why ❔ Leftover bug after #2846 ## Checklist <!-- Check your PR fulfills the following items. --> <!-- For draft PRs check the boxes as you complete them. --> - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk_supervisor fmt` and `zk_supervisor lint`.
1 parent 0841f1e commit bdeb411

File tree

8 files changed

+158
-61
lines changed

8 files changed

+158
-61
lines changed

core/lib/dal/src/blocks_dal.rs

Lines changed: 58 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -578,11 +578,14 @@ impl BlocksDal<'_, '_> {
578578
/// null or set to default value for the corresponding type).
579579
pub async fn insert_l1_batch(
580580
&mut self,
581-
number: L1BatchNumber,
582-
timestamp: u64,
583-
protocol_version: Option<ProtocolVersionId>,
584-
fee_address: Address,
585-
batch_fee_input: BatchFeeInput,
581+
unsealed_batch_header: UnsealedL1BatchHeader,
582+
) -> DalResult<()> {
583+
Self::insert_l1_batch_inner(unsealed_batch_header, self.storage).await
584+
}
585+
586+
async fn insert_l1_batch_inner(
587+
unsealed_batch_header: UnsealedL1BatchHeader,
588+
conn: &mut Connection<'_, Core>,
586589
) -> DalResult<()> {
587590
sqlx::query!(
588591
r#"
@@ -625,21 +628,51 @@ impl BlocksDal<'_, '_> {
625628
FALSE
626629
)
627630
"#,
628-
i64::from(number.0),
629-
timestamp as i64,
630-
protocol_version.map(|v| v as i32),
631-
fee_address.as_bytes(),
632-
batch_fee_input.l1_gas_price() as i64,
633-
batch_fee_input.fair_l2_gas_price() as i64,
634-
batch_fee_input.fair_pubdata_price() as i64,
631+
i64::from(unsealed_batch_header.number.0),
632+
unsealed_batch_header.timestamp as i64,
633+
unsealed_batch_header.protocol_version.map(|v| v as i32),
634+
unsealed_batch_header.fee_address.as_bytes(),
635+
unsealed_batch_header.fee_input.l1_gas_price() as i64,
636+
unsealed_batch_header.fee_input.fair_l2_gas_price() as i64,
637+
unsealed_batch_header.fee_input.fair_pubdata_price() as i64,
635638
)
636639
.instrument("insert_l1_batch")
637-
.with_arg("number", &number)
638-
.execute(self.storage)
640+
.with_arg("number", &unsealed_batch_header.number)
641+
.execute(conn)
639642
.await?;
640643
Ok(())
641644
}
642645

646+
pub async fn ensure_unsealed_l1_batch_exists(
647+
&mut self,
648+
unsealed_batch: UnsealedL1BatchHeader,
649+
) -> anyhow::Result<()> {
650+
let mut transaction = self.storage.start_transaction().await?;
651+
let unsealed_batch_fetched = Self::get_unsealed_l1_batch_inner(&mut transaction).await?;
652+
653+
match unsealed_batch_fetched {
654+
None => {
655+
tracing::info!(
656+
"Unsealed batch #{} could not be found; inserting",
657+
unsealed_batch.number
658+
);
659+
Self::insert_l1_batch_inner(unsealed_batch, &mut transaction).await?;
660+
}
661+
Some(unsealed_batch_fetched) => {
662+
if unsealed_batch_fetched.number != unsealed_batch.number {
663+
anyhow::bail!(
664+
"fetched unsealed L1 batch #{} does not conform to expected L1 batch #{}",
665+
unsealed_batch_fetched.number,
666+
unsealed_batch.number
667+
)
668+
}
669+
}
670+
}
671+
672+
transaction.commit().await?;
673+
Ok(())
674+
}
675+
643676
/// Marks provided L1 batch as sealed and populates it with all the runtime information.
644677
///
645678
/// Errors if the batch does not exist.
@@ -744,6 +777,12 @@ impl BlocksDal<'_, '_> {
744777
}
745778

746779
pub async fn get_unsealed_l1_batch(&mut self) -> DalResult<Option<UnsealedL1BatchHeader>> {
780+
Self::get_unsealed_l1_batch_inner(self.storage).await
781+
}
782+
783+
async fn get_unsealed_l1_batch_inner(
784+
conn: &mut Connection<'_, Core>,
785+
) -> DalResult<Option<UnsealedL1BatchHeader>> {
747786
let batch = sqlx::query_as!(
748787
UnsealedStorageL1Batch,
749788
r#"
@@ -761,8 +800,8 @@ impl BlocksDal<'_, '_> {
761800
NOT is_sealed
762801
"#,
763802
)
764-
.instrument("get_last_committed_to_eth_l1_batch")
765-
.fetch_optional(self.storage)
803+
.instrument("get_unsealed_l1_batch")
804+
.fetch_optional(conn)
766805
.await?;
767806

768807
Ok(batch.map(|b| b.into()))
@@ -2621,11 +2660,7 @@ impl BlocksDal<'_, '_> {
26212660

26222661
pub async fn insert_mock_l1_batch(&mut self, header: &L1BatchHeader) -> anyhow::Result<()> {
26232662
self.insert_l1_batch(
2624-
header.number,
2625-
header.timestamp,
2626-
header.protocol_version,
2627-
header.fee_address,
2628-
BatchFeeInput::pubdata_independent(100, 100, 100),
2663+
header.to_unsealed_header(BatchFeeInput::pubdata_independent(100, 100, 100)),
26292664
)
26302665
.await?;
26312666
self.mark_l1_batch_as_sealed(
@@ -2940,11 +2975,7 @@ mod tests {
29402975
};
29412976
conn.blocks_dal()
29422977
.insert_l1_batch(
2943-
header.number,
2944-
header.timestamp,
2945-
header.protocol_version,
2946-
header.fee_address,
2947-
BatchFeeInput::pubdata_independent(100, 100, 100),
2978+
header.to_unsealed_header(BatchFeeInput::pubdata_independent(100, 100, 100)),
29482979
)
29492980
.await
29502981
.unwrap();
@@ -2958,11 +2989,7 @@ mod tests {
29582989
predicted_gas += predicted_gas;
29592990
conn.blocks_dal()
29602991
.insert_l1_batch(
2961-
header.number,
2962-
header.timestamp,
2963-
header.protocol_version,
2964-
header.fee_address,
2965-
BatchFeeInput::pubdata_independent(100, 100, 100),
2992+
header.to_unsealed_header(BatchFeeInput::pubdata_independent(100, 100, 100)),
29662993
)
29672994
.await
29682995
.unwrap();

core/lib/types/src/block.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,18 @@ pub struct L1BatchHeader {
6868
pub fee_address: Address,
6969
}
7070

71+
impl L1BatchHeader {
72+
pub fn to_unsealed_header(&self, fee_input: BatchFeeInput) -> UnsealedL1BatchHeader {
73+
UnsealedL1BatchHeader {
74+
number: self.number,
75+
timestamp: self.timestamp,
76+
protocol_version: self.protocol_version,
77+
fee_address: self.fee_address,
78+
fee_input,
79+
}
80+
}
81+
}
82+
7183
#[derive(Debug, Clone, PartialEq)]
7284
pub struct UnsealedL1BatchHeader {
7385
pub number: L1BatchNumber,

core/lib/vm_interface/src/types/inputs/l1_batch_env.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use serde::{Deserialize, Serialize};
2-
use zksync_types::{fee_model::BatchFeeInput, Address, L1BatchNumber, H256};
2+
use zksync_types::{
3+
block::UnsealedL1BatchHeader, fee_model::BatchFeeInput, Address, L1BatchNumber,
4+
ProtocolVersionId, H256,
5+
};
36

47
use super::L2BlockEnv;
58

@@ -21,3 +24,18 @@ pub struct L1BatchEnv {
2124
pub enforced_base_fee: Option<u64>,
2225
pub first_l2_block: L2BlockEnv,
2326
}
27+
28+
impl L1BatchEnv {
29+
pub fn into_unsealed_header(
30+
self,
31+
protocol_version: Option<ProtocolVersionId>,
32+
) -> UnsealedL1BatchHeader {
33+
UnsealedL1BatchHeader {
34+
number: self.number,
35+
timestamp: self.timestamp,
36+
protocol_version,
37+
fee_address: self.fee_account,
38+
fee_input: self.fee_input,
39+
}
40+
}
41+
}

core/node/genesis/src/lib.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -419,13 +419,7 @@ pub async fn create_genesis_l1_batch(
419419
.await?;
420420
transaction
421421
.blocks_dal()
422-
.insert_l1_batch(
423-
genesis_l1_batch_header.number,
424-
genesis_l1_batch_header.timestamp,
425-
genesis_l1_batch_header.protocol_version,
426-
genesis_l1_batch_header.fee_address,
427-
batch_fee_input,
428-
)
422+
.insert_l1_batch(genesis_l1_batch_header.to_unsealed_header(batch_fee_input))
429423
.await?;
430424
transaction
431425
.blocks_dal()

core/node/node_sync/src/external_io.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use zksync_state_keeper::{
1515
updates::UpdatesManager,
1616
};
1717
use zksync_types::{
18+
block::UnsealedL1BatchHeader,
1819
protocol_upgrade::ProtocolUpgradeTx,
1920
protocol_version::{ProtocolSemanticVersion, VersionPatch},
2021
L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256,
@@ -200,6 +201,14 @@ impl StateKeeperIO for ExternalIO {
200201
cursor.l1_batch
201202
)
202203
})?;
204+
storage
205+
.blocks_dal()
206+
.ensure_unsealed_l1_batch_exists(
207+
l1_batch_env
208+
.clone()
209+
.into_unsealed_header(Some(system_env.version)),
210+
)
211+
.await?;
203212
let data = load_pending_batch(&mut storage, system_env, l1_batch_env)
204213
.await
205214
.with_context(|| {
@@ -241,13 +250,13 @@ impl StateKeeperIO for ExternalIO {
241250
.connection()
242251
.await?
243252
.blocks_dal()
244-
.insert_l1_batch(
245-
cursor.l1_batch,
246-
params.first_l2_block.timestamp,
247-
None,
248-
params.operator_address,
249-
params.fee_input,
250-
)
253+
.insert_l1_batch(UnsealedL1BatchHeader {
254+
number: cursor.l1_batch,
255+
timestamp: params.first_l2_block.timestamp,
256+
protocol_version: None,
257+
fee_address: params.operator_address,
258+
fee_input: params.fee_input,
259+
})
251260
.await?;
252261
return Ok(Some(params));
253262
}

core/node/state_keeper/src/io/mempool.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use zksync_mempool::L2TxFilter;
1414
use zksync_multivm::{interface::Halt, utils::derive_base_fee_and_gas_per_pubdata};
1515
use zksync_node_fee_model::BatchFeeModelInputProvider;
1616
use zksync_types::{
17-
protocol_upgrade::ProtocolUpgradeTx, utils::display_timestamp, Address, L1BatchNumber,
18-
L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, U256,
17+
block::UnsealedL1BatchHeader, protocol_upgrade::ProtocolUpgradeTx, utils::display_timestamp,
18+
Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, U256,
1919
};
2020
// TODO (SMA-1206): use seconds instead of milliseconds.
2121
use zksync_utils::time::millis_since_epoch;
@@ -133,6 +133,15 @@ impl StateKeeperIO for MempoolIO {
133133
gas_per_pubdata: gas_per_pubdata as u32,
134134
};
135135

136+
storage
137+
.blocks_dal()
138+
.ensure_unsealed_l1_batch_exists(
139+
l1_batch_env
140+
.clone()
141+
.into_unsealed_header(Some(system_env.version)),
142+
)
143+
.await?;
144+
136145
Ok((
137146
cursor,
138147
Some(PendingBatchData {
@@ -219,13 +228,13 @@ impl StateKeeperIO for MempoolIO {
219228
.connection()
220229
.await?
221230
.blocks_dal()
222-
.insert_l1_batch(
223-
cursor.l1_batch,
231+
.insert_l1_batch(UnsealedL1BatchHeader {
232+
number: cursor.l1_batch,
224233
timestamp,
225-
Some(protocol_version),
226-
self.fee_account,
227-
self.filter.fee_input,
228-
)
234+
protocol_version: Some(protocol_version),
235+
fee_address: self.fee_account,
236+
fee_input: self.filter.fee_input,
237+
})
229238
.await?;
230239

231240
return Ok(Some(L1BatchParams {

core/node/state_keeper/src/io/persistence.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -456,13 +456,7 @@ mod tests {
456456
.await
457457
.unwrap()
458458
.blocks_dal()
459-
.insert_l1_batch(
460-
l1_batch_env.number,
461-
l1_batch_env.timestamp,
462-
None,
463-
l1_batch_env.fee_account,
464-
l1_batch_env.fee_input,
465-
)
459+
.insert_l1_batch(l1_batch_env.into_unsealed_header(None))
466460
.await
467461
.unwrap();
468462

core/node/state_keeper/src/io/tests/mod.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,3 +606,37 @@ async fn continue_unsealed_batch_on_restart(commitment_mode: L1BatchCommitmentMo
606606

607607
assert_eq!(old_l1_batch_params, new_l1_batch_params);
608608
}
609+
610+
#[test_casing(2, COMMITMENT_MODES)]
611+
#[tokio::test]
612+
async fn insert_unsealed_batch_on_init(commitment_mode: L1BatchCommitmentMode) {
613+
let connection_pool = ConnectionPool::<Core>::test_pool().await;
614+
let mut tester = Tester::new(commitment_mode);
615+
tester.genesis(&connection_pool).await;
616+
let fee_input = BatchFeeInput::pubdata_independent(55, 555, 5555);
617+
let tx_result = tester
618+
.insert_l2_block(&connection_pool, 1, 5, fee_input)
619+
.await;
620+
tester
621+
.insert_sealed_batch(&connection_pool, 1, &[tx_result])
622+
.await;
623+
// Pre-insert L2 block without its unsealed L1 batch counterpart
624+
tester.set_timestamp(2);
625+
tester
626+
.insert_l2_block(&connection_pool, 2, 5, fee_input)
627+
.await;
628+
629+
let (mut mempool, _) = tester.create_test_mempool_io(connection_pool.clone()).await;
630+
// Initialization is supposed to recognize that the current L1 batch is not present in the DB and
631+
// insert it itself.
632+
let (cursor, _) = mempool.initialize().await.unwrap();
633+
634+
// Make sure we are able to fetch the newly inserted batch's params
635+
let l1_batch_params = mempool
636+
.wait_for_new_batch_params(&cursor, Duration::from_secs(10))
637+
.await
638+
.unwrap()
639+
.expect("no batch params generated");
640+
assert_eq!(l1_batch_params.fee_input, fee_input);
641+
assert_eq!(l1_batch_params.first_l2_block.timestamp, 2);
642+
}

0 commit comments

Comments
 (0)