29 changes: 23 additions & 6 deletions crates/networking/p2p/sync/trie_rebuild.rs
@@ -21,15 +21,20 @@ use tracing::{info, warn};
use crate::sync::seconds_to_readable;

use super::{
SyncError, MAX_CHANNEL_MESSAGES, MAX_CHANNEL_READS, MAX_PARALLEL_FETCHES,
SHOW_PROGRESS_INTERVAL_DURATION, STATE_TRIE_SEGMENTS_END, STATE_TRIE_SEGMENTS_START,
SyncError, MAX_CHANNEL_MESSAGES, MAX_CHANNEL_READS, SHOW_PROGRESS_INTERVAL_DURATION,
STATE_TRIE_SEGMENTS_END, STATE_TRIE_SEGMENTS_START,
};

/// The storage root used to indicate that the storage to be rebuilt is not complete
/// This will tell the rebuilder to skip storage root validations for this trie
/// The storage should be queued for rebuilding by the sender
pub(crate) const REBUILDER_INCOMPLETE_STORAGE_ROOT: H256 = H256::zero();

/// Max storages to rebuild in parallel
const MAX_PARALLEL_REBUILDS: usize = 15;

const MAX_SNAPSHOT_READS_WITHOUT_COMMIT: usize = 5;
Collaborator:
What are the criteria for this number?

Contributor Author:
Trial and error, mostly. The gist is that fewer commits make the rebuild go faster, but large commits can slow down other processes. I sought the highest value that would not slow down state sync.
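To make that tradeoff concrete, here is a minimal, self-contained sketch of the read-batching idea: a stub `Trie` stands in for the real state/storage trie, and `commit` plays the role of `hash()?` persisting pending nodes. The names, types, and batch source are illustrative assumptions, not the ethrex APIs.

```rust
/// Stub standing in for the real trie; only what the sketch needs.
struct Trie {
    pending: Vec<(Vec<u8>, Vec<u8>)>,
}

impl Trie {
    fn new() -> Self {
        Trie { pending: Vec::new() }
    }

    fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
        self.pending.push((key, value));
    }

    /// Stand-in for `state_trie.hash()?`, which commits pending nodes to the DB.
    fn commit(&mut self) {
        println!("committing {} pending entries", self.pending.len());
        self.pending.clear();
    }
}

/// Commit only after this many snapshot reads (mirrors the constant in the PR).
const MAX_SNAPSHOT_READS_WITHOUT_COMMIT: usize = 5;

fn rebuild(mut read_batch: impl FnMut() -> Vec<(Vec<u8>, Vec<u8>)>) {
    let mut trie = Trie::new();
    let mut reads_since_last_commit = 0;
    loop {
        reads_since_last_commit += 1;
        let batch = read_batch();
        let unfilled = batch.is_empty();
        for (key, value) in batch {
            trie.insert(key, value);
        }
        // Fewer commits make the rebuild faster, but each commit gets larger,
        // which can stall other tasks sharing the DB, so commit every few reads.
        if reads_since_last_commit > MAX_SNAPSHOT_READS_WITHOUT_COMMIT {
            reads_since_last_commit = 0;
            trie.commit();
        }
        if unfilled {
            break;
        }
    }
    // Final commit covers whatever accumulated since the last threshold crossing.
    trie.commit();
}

fn main() {
    // Feed 23 dummy key/value pairs in batches of at most 2.
    let mut remaining = 23u8;
    rebuild(|| {
        let n = remaining.min(2);
        remaining -= n;
        (0..n).map(|i| (vec![i], vec![0u8])).collect()
    });
}
```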

Collaborator:
I wonder whether these values, which might be ideal for Holesky, are also appropriate for the other networks.

Contributor Author:
These values should be the least affected by a change of network: the rebuild happens in the background and only interacts with the DB, so it is not really affected by peer availability or the speed at which we receive data from peers, and we cannot really split it into different processes to scale to a bigger trie size.
They could change along with changes/improvements to the trie caches and the DB.
They are also strong candidates to become customizable via config file (such as in #2331).
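As a purely hypothetical illustration of the "customizable via config file" idea (this is not what #2331 implements; the field names, defaults, and the use of the `serde` and `toml` crates are assumptions):

```rust
// Hypothetical sketch only: how such tunables might be exposed through a config
// file. Requires the `serde` crate with the "derive" feature and the `toml` crate.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(default)]
struct RebuildConfig {
    max_parallel_rebuilds: usize,
    max_snapshot_reads_without_commit: usize,
}

impl Default for RebuildConfig {
    fn default() -> Self {
        // Defaults mirror the constants hard-coded in this PR.
        RebuildConfig {
            max_parallel_rebuilds: 15,
            max_snapshot_reads_without_commit: 5,
        }
    }
}

fn main() {
    // Any field left out of the file falls back to the default above.
    let file_contents = "max_snapshot_reads_without_commit = 10\n";
    let cfg: RebuildConfig = toml::from_str(file_contents).expect("valid config");
    println!("{cfg:?}");
}
```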


/// Represents the permanently ongoing background trie rebuild process
/// This process will be started whenever a state sync is initiated and will be
/// kept alive throughout sync cycles, only stopping once the tries are fully rebuilt or the node is stopped
@@ -159,10 +164,12 @@ async fn rebuild_state_trie_segment(
cancel_token: CancellationToken,
) -> Result<(H256, H256), SyncError> {
let mut state_trie = store.open_state_trie(root);
let mut snapshot_reads_since_last_commit = 0;
loop {
if cancel_token.is_cancelled() {
break;
}
snapshot_reads_since_last_commit += 1;
let mut batch = store.read_account_snapshot(start)?;
// Remove out of bounds elements
batch.retain(|(hash, _)| *hash <= STATE_TRIE_SEGMENTS_END[segment_number]);
@@ -176,7 +183,10 @@
for (hash, account) in batch.iter() {
state_trie.insert(hash.0.to_vec(), account.encode_to_vec())?;
}
root = state_trie.hash()?;
if snapshot_reads_since_last_commit > MAX_SNAPSHOT_READS_WITHOUT_COMMIT {
snapshot_reads_since_last_commit = 0;
state_trie.hash()?;
}
// Return if we have no more snapshot accounts to process for this segemnt
if unfilled_batch {
let state_sync_complete = store
@@ -190,6 +200,7 @@
break;
}
}
root = state_trie.hash()?;
Ok((root, start))
}

@@ -221,7 +232,7 @@ async fn rebuild_storage_trie_in_background(

// Spawn tasks to rebuild current storages
let mut rebuild_tasks = JoinSet::new();
for _ in 0..MAX_PARALLEL_FETCHES {
for _ in 0..MAX_PARALLEL_REBUILDS {
if pending_storages.is_empty() {
break;
}
@@ -253,8 +264,11 @@ async fn rebuild_storage_trie(
) -> Result<(), SyncError> {
let mut start = H256::zero();
let mut storage_trie = store.open_storage_trie(account_hash, *EMPTY_TRIE_HASH);
let mut snapshot_reads_since_last_commit = 0;
loop {
snapshot_reads_since_last_commit += 1;
let batch = store.read_storage_snapshot(account_hash, start).await?;

let unfilled_batch = batch.len() < MAX_SNAPSHOT_READS;
// Update start
if let Some(last) = batch.last() {
@@ -265,9 +279,12 @@
for (key, val) in batch {
storage_trie.insert(key.0.to_vec(), val.encode_to_vec())?;
}
storage_trie.hash()?;
if snapshot_reads_since_last_commit > MAX_SNAPSHOT_READS_WITHOUT_COMMIT {
snapshot_reads_since_last_commit = 0;
storage_trie.hash()?;
}

// Return if we have no more snapshot accounts to process for this segemnt
// Return if we have no more snapshot values to process for this storage
if unfilled_batch {
break;
}