Skip to content

Commit f095b4a

Browse files
authored
feat(en): Resume incomplete snapshot in snapshot creator in more cases (#2886)
## What ❔

Resumes an incomplete snapshot in the snapshot creator if the creator config doesn't specify an L1 batch.

## Why ❔

This effectively reverts the relevant changes from #2256. It makes the snapshot creator resilient by default without additional setup, at the cost of parallel creator jobs working incorrectly (unless they all specify L1 batches).

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
1 parent 438c820 commit f095b4a

File tree

4 files changed

+101
-42
lines changed

4 files changed

+101
-42
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/bin/snapshots_creator/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,4 @@ futures.workspace = true
2929

3030
[dev-dependencies]
3131
rand.workspace = true
32+
test-casing.workspace = true

core/bin/snapshots_creator/src/creator.rs

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -291,44 +291,46 @@ impl SnapshotCreator {
291291
.get_sealed_l1_batch_number()
292292
.await?;
293293
let sealed_l1_batch_number = sealed_l1_batch_number.context("No L1 batches in Postgres")?;
294-
let requested_l1_batch_number = if let Some(l1_batch_number) = config.l1_batch_number {
294+
let (requested_l1_batch_number, existing_snapshot) = if let Some(l1_batch_number) =
295+
config.l1_batch_number
296+
{
295297
anyhow::ensure!(
296298
l1_batch_number <= sealed_l1_batch_number,
297299
"Requested a snapshot for L1 batch #{l1_batch_number} that doesn't exist in Postgres (latest L1 batch: {sealed_l1_batch_number})"
298300
);
299-
l1_batch_number
301+
302+
let existing_snapshot = master_conn
303+
.snapshots_dal()
304+
.get_snapshot_metadata(l1_batch_number)
305+
.await?;
306+
(l1_batch_number, existing_snapshot)
300307
} else {
301308
// We subtract 1 so that after restore, EN node has at least one L1 batch to fetch.
302309
anyhow::ensure!(
303310
sealed_l1_batch_number != L1BatchNumber(0),
304311
"Cannot create snapshot when only the genesis L1 batch is present in Postgres"
305312
);
306-
sealed_l1_batch_number - 1
307-
};
313+
let requested_l1_batch_number = sealed_l1_batch_number - 1;
308314

309-
let existing_snapshot = master_conn
310-
.snapshots_dal()
311-
.get_snapshot_metadata(requested_l1_batch_number)
312-
.await?;
315+
// Continue creating a pending snapshot if it exists, even if it doesn't correspond to the latest L1 batch.
316+
// OTOH, a completed snapshot does not matter, unless it corresponds to `requested_l1_batch_number` (in which case it doesn't need to be created again).
317+
let existing_snapshot = master_conn
318+
.snapshots_dal()
319+
.get_newest_snapshot_metadata()
320+
.await?
321+
.filter(|snapshot| {
322+
!snapshot.is_complete() || snapshot.l1_batch_number == requested_l1_batch_number
323+
});
324+
(requested_l1_batch_number, existing_snapshot)
325+
};
313326
drop(master_conn);
314327

315328
match existing_snapshot {
316329
Some(snapshot) if snapshot.is_complete() => {
317330
tracing::info!("Snapshot for the requested L1 batch is complete: {snapshot:?}");
318331
Ok(None)
319332
}
320-
Some(snapshot) if config.l1_batch_number.is_some() => {
321-
Ok(Some(SnapshotProgress::from_existing_snapshot(&snapshot)))
322-
}
323-
Some(snapshot) => {
324-
// Unless creating a snapshot for a specific L1 batch is requested, we never continue an existing snapshot, even if it's incomplete.
325-
// This it to make running multiple snapshot creator instances in parallel easier to reason about.
326-
tracing::warn!(
327-
"Snapshot at expected L1 batch #{requested_l1_batch_number} exists, but is incomplete: {snapshot:?}. If you need to resume creating it, \
328-
specify the L1 batch number in the snapshot creator config"
329-
);
330-
Ok(None)
331-
}
333+
Some(snapshot) => Ok(Some(SnapshotProgress::from_existing_snapshot(&snapshot))),
332334
None => {
333335
Self::initialize_snapshot_progress(
334336
config,

core/bin/snapshots_creator/src/tests.rs

Lines changed: 77 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::{
1010
};
1111

1212
use rand::{thread_rng, Rng};
13+
use test_casing::test_casing;
1314
use zksync_config::SnapshotsCreatorConfig;
1415
use zksync_dal::{Connection, CoreDal};
1516
use zksync_object_store::{MockObjectStore, ObjectStore};
@@ -64,6 +65,15 @@ impl HandleEvent for TestEventListener {
6465
}
6566
}
6667

68+
#[derive(Debug)]
69+
struct UnreachableEventListener;
70+
71+
impl HandleEvent for UnreachableEventListener {
72+
fn on_chunk_started(&self) -> TestBehavior {
73+
unreachable!("should not be reached");
74+
}
75+
}
76+
6777
impl SnapshotCreator {
6878
fn for_tests(blob_store: Arc<dyn ObjectStore>, pool: ConnectionPool<Core>) -> Self {
6979
Self {
@@ -80,6 +90,13 @@ impl SnapshotCreator {
8090
..self
8191
}
8292
}
93+
94+
fn panic_on_chunk_start(self) -> Self {
95+
Self {
96+
event_listener: Box::new(UnreachableEventListener),
97+
..self
98+
}
99+
}
83100
}
84101

85102
#[derive(Debug)]
@@ -431,8 +448,9 @@ async fn persisting_snapshot_logs_for_v0_snapshot() {
431448
assert_eq!(actual_logs, expected_outputs.storage_logs);
432449
}
433450

451+
#[test_casing(2, [false, true])]
434452
#[tokio::test]
435-
async fn recovery_workflow() {
453+
async fn recovery_workflow(specify_batch_after_recovery: bool) {
436454
let pool = ConnectionPool::<Core>::test_pool().await;
437455
let mut rng = thread_rng();
438456
let object_store = MockObjectStore::arc();
@@ -462,29 +480,9 @@ async fn recovery_workflow() {
462480
let actual_deps: HashSet<_> = factory_deps.into_iter().collect();
463481
assert_eq!(actual_deps, expected_outputs.deps);
464482

465-
// Check that the creator does nothing unless it's requested to create a new snapshot.
466-
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
467-
.stop_after_chunk_count(2)
468-
.run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT)
469-
.await
470-
.unwrap();
471-
let snapshot_metadata = conn
472-
.snapshots_dal()
473-
.get_snapshot_metadata(snapshot_l1_batch_number)
474-
.await
475-
.unwrap()
476-
.expect("No snapshot metadata");
477-
assert!(
478-
snapshot_metadata
479-
.storage_logs_filepaths
480-
.iter()
481-
.all(Option::is_none),
482-
"{snapshot_metadata:?}"
483-
);
484-
485483
// Process 2 storage log chunks, then stop.
486484
let recovery_config = SnapshotsCreatorConfig {
487-
l1_batch_number: Some(snapshot_l1_batch_number),
485+
l1_batch_number: specify_batch_after_recovery.then_some(snapshot_l1_batch_number),
488486
..SEQUENTIAL_TEST_CONFIG
489487
};
490488
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
@@ -510,11 +508,68 @@ async fn recovery_workflow() {
510508

511509
// Process the remaining chunks.
512510
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
511+
.run(recovery_config.clone(), MIN_CHUNK_COUNT)
512+
.await
513+
.unwrap();
514+
515+
assert_storage_logs(&*object_store, snapshot_l1_batch_number, &expected_outputs).await;
516+
517+
// Check that the snapshot is not created anew after it is completed.
518+
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
519+
.panic_on_chunk_start()
513520
.run(recovery_config, MIN_CHUNK_COUNT)
514521
.await
515522
.unwrap();
516523

524+
let snapshot_metadata = conn
525+
.snapshots_dal()
526+
.get_snapshot_metadata(snapshot_l1_batch_number)
527+
.await
528+
.unwrap()
529+
.expect("No snapshot metadata");
530+
assert!(snapshot_metadata.is_complete(), "{snapshot_metadata:#?}");
531+
}
532+
533+
#[tokio::test]
534+
async fn recovery_workflow_with_new_l1_batch() {
535+
let pool = ConnectionPool::<Core>::test_pool().await;
536+
let mut rng = thread_rng();
537+
let object_store = MockObjectStore::arc();
538+
let mut conn = pool.connection().await.unwrap();
539+
let expected_outputs = prepare_postgres(&mut rng, &mut conn, 10).await;
540+
541+
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
542+
.stop_after_chunk_count(2)
543+
.run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT)
544+
.await
545+
.unwrap();
546+
547+
let snapshot_l1_batch_number = L1BatchNumber(8);
548+
let snapshot_metadata = conn
549+
.snapshots_dal()
550+
.get_snapshot_metadata(snapshot_l1_batch_number)
551+
.await
552+
.unwrap()
553+
.expect("No snapshot metadata");
554+
assert!(!snapshot_metadata.is_complete(), "{snapshot_metadata:#?}");
555+
556+
let new_logs = gen_storage_logs(&mut thread_rng(), 50);
557+
create_l1_batch(&mut conn, snapshot_l1_batch_number + 2, &new_logs).await;
558+
559+
// The old snapshot should be completed.
560+
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
561+
.run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT)
562+
.await
563+
.unwrap();
517564
assert_storage_logs(&*object_store, snapshot_l1_batch_number, &expected_outputs).await;
565+
566+
let snapshot_metadata = conn
567+
.snapshots_dal()
568+
.get_snapshot_metadata(snapshot_l1_batch_number)
569+
.await
570+
.unwrap()
571+
.expect("No snapshot metadata");
572+
assert!(snapshot_metadata.is_complete(), "{snapshot_metadata:#?}");
518573
}
519574

520575
#[tokio::test]

0 commit comments

Comments
 (0)