Skip to content

Commit 7c03219

Browse files
authored
fix(l1): validate incoming payloads even when the node is syncing. (#2426)
**Motivation** We should be able to do payload validations even when the node is in a sync process (except if it's snap sync). **Description** - Refactored some code to make it flatter - Removed early return when the node is syncing - minor renames for clarity sake.
1 parent b9cb992 commit 7c03219

File tree

9 files changed

+103
-114
lines changed

9 files changed

+103
-114
lines changed

.github/workflows/pr-main_l1.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,11 @@ jobs:
160160
ethrex_flags: ""
161161
- name: "Cancun Engine tests"
162162
simulation: ethereum/engine
163-
test_pattern: "engine-cancun/Blob Transactions On Block 1|Blob Transaction Ordering, Single|Blob Transaction Ordering, Multiple Accounts|Replace Blob Transactions|Parallel Blob Transactions|ForkchoiceUpdatedV3|ForkchoiceUpdatedV2|ForkchoiceUpdated Version|GetPayload|NewPayloadV3 After Cancun|NewPayloadV3 Before Cancun|NewPayloadV3 Versioned Hashes|Incorrect BlobGasUsed|ParentHash equals BlockHash|RPC:|in ForkchoiceState|Unknown SafeBlockHash|Unknown FinalizedBlockHash|Unique|Re-Execute Payload|Multiple New Payloads|NewPayload with|Build Payload with|Re-org to Previously|Safe Re-Org to Side Chain|Transaction Re-Org|Re-Org Back into Canonical Chain, Depth=5|Suggested Fee Recipient Test|PrevRandao Opcode|Fork ID: *|Invalid Missing Ancestor Syncing ReOrg|Request Blob Pooled Transactions"
163+
test_pattern: "engine-cancun/Blob Transactions On Block 1|Blob Transaction Ordering, Single|Blob Transaction Ordering, Multiple Accounts|Replace Blob Transactions|Parallel Blob Transactions|ForkchoiceUpdatedV3|ForkchoiceUpdatedV2|ForkchoiceUpdated Version|GetPayload|NewPayloadV3 After Cancun|NewPayloadV3 Before Cancun|NewPayloadV3 Versioned Hashes|Incorrect BlobGasUsed|ParentHash equals BlockHash|RPC:|in ForkchoiceState|Unknown SafeBlockHash|Unknown FinalizedBlockHash|Unique|Re-Execute Payload|Multiple New Payloads|NewPayload with|Build Payload with|Re-org to Previously|Safe Re-Org to Side Chain|Transaction Re-Org|Re-Org Back into Canonical Chain, Depth=5|Suggested Fee Recipient Test|PrevRandao Opcode|Fork ID: *|Invalid Missing Ancestor Syncing ReOrg|Request Blob Pooled Transactions|Invalid NewPayload, Incomplete Transactions|Invalid P9|Invalid P10|Re-Org Back to Canonical Chain*|Invalid PayloadAttributes*|Invalid NewPayload, VersionedHashes|Invalid NewPayload, Incomplete VersionedHashes|Invalid NewPayload, Extra VersionedHashes|Bad Hash on NewPayload|Unknown HeadBlockHash|In-Order Consecutive Payload Execution|Valid NewPayload->ForkchoiceUpdated|Invalid NewPayload, ParentHash|Syncing=False|Payload Build after New Invalid Payload"
164164
ethrex_flags: ""
165165
- name: "Paris Engine tests"
166166
simulation: ethereum/engine
167-
test_pattern: "engine-api/RPC|Re-org to Previously Validated Sidechain Payload|Re-Org Back into Canonical Chain, Depth=5|Safe Re-Org|Transaction Re-Org|Inconsistent|Suggested Fee|PrevRandao Opcode Transactions|Fork ID|Unknown SafeBlockHash|Unknown FinalizedBlockHash|Unique Payload ID|Re-Execute Payload|Multiple New Payloads|NewPayload with|Payload Build|Build Payload"
167+
test_pattern: "engine-api/RPC|Re-org to Previously Validated Sidechain Payload|Re-Org Back into Canonical Chain, Depth=5|Safe Re-Org|Transaction Re-Org|Inconsistent|Suggested Fee|PrevRandao Opcode Transactions|Fork ID|Unknown SafeBlockHash|Unknown FinalizedBlockHash|Unique Payload ID|Re-Execute Payload|Multiple New Payloads|NewPayload with|Payload Build|Build Payload|Invalid PayloadAttributes|Unknown HeadBlockHash|Valid NewPayload->ForkchoiceUpdated|Invalid NewPayload, ParentHash|Bad Hash on NewPayload|Invalid P9|In-Order Consecutive Payload Execution|Re-Org Back to Canonical Chain"
168168
ethrex_flags: ""
169169
- name: "Engine withdrawal tests"
170170
simulation: ethereum/engine

crates/networking/p2p/sync_manager.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::{
1818
sync::{SyncMode, Syncer},
1919
};
2020

21+
#[derive(Debug)]
2122
pub enum SyncStatus {
2223
Active(SyncMode),
2324
Inactive,
@@ -62,7 +63,7 @@ impl SyncManager {
6263
.await
6364
.is_ok_and(|res| res.is_some())
6465
{
65-
sync_manager.start_sync().await;
66+
sync_manager.start_sync();
6667
}
6768
sync_manager
6869
}
@@ -100,15 +101,17 @@ impl SyncManager {
100101
/// Attempts to sync to the last received fcu head
101102
/// Will do nothing if the syncer is already involved in a sync process
102103
/// If the sync process would require multiple sync cycles (such as snap sync), starts all required sync cycles until the sync is complete
103-
pub async fn start_sync(&self) {
104+
pub fn start_sync(&self) {
104105
let syncer = self.syncer.clone();
105106
let store = self.store.clone();
106-
let Ok(Some(current_head)) = self.store.get_latest_canonical_block_hash().await else {
107-
tracing::error!("Failed to fecth latest canonical block, unable to sync");
108-
return;
109-
};
110107
let sync_head = self.last_fcu_head.clone();
108+
111109
tokio::spawn(async move {
110+
let Ok(Some(current_head)) = store.get_latest_canonical_block_hash().await else {
111+
tracing::error!("Failed to fecth latest canonical block, unable to sync");
112+
return;
113+
};
114+
112115
// If we can't get hold of the syncer, then it means that there is an active sync in process
113116
let Ok(mut syncer) = syncer.try_lock() else {
114117
return;
@@ -147,7 +150,7 @@ impl SyncManager {
147150
}
148151

149152
/// Returns the syncer's current syncmode (either snap or full)
150-
fn sync_mode(&self) -> SyncMode {
153+
pub fn sync_mode(&self) -> SyncMode {
151154
if self.snap_enabled.load(Ordering::Relaxed) {
152155
SyncMode::Snap
153156
} else {

crates/networking/rpc/engine/fork_choice.rs

Lines changed: 43 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use ethrex_blockchain::{
55
payload::{create_payload, BuildPayloadArgs},
66
};
77
use ethrex_common::types::BlockHeader;
8-
use ethrex_p2p::sync_manager::SyncStatus;
8+
use ethrex_p2p::sync::SyncMode;
99
use serde_json::Value;
1010
use tracing::{debug, info, warn};
1111

@@ -211,63 +211,53 @@ async fn handle_forkchoice(
211211
fork_choice_state.finalized_block_hash
212212
);
213213

214-
// Update fcu head in syncer
215-
context.syncer.set_head(fork_choice_state.head_block_hash);
216-
217-
let fork_choice_res = if let Some(latest_valid_hash) = context
214+
if let Some(latest_valid_hash) = context
218215
.storage
219216
.get_latest_valid_ancestor(fork_choice_state.head_block_hash)
220217
.await?
221218
{
222-
warn!(
223-
"Invalid fork choice state. Reason: Invalid ancestor {:#x}",
224-
latest_valid_hash
225-
);
226-
Err(InvalidForkChoice::InvalidAncestor(latest_valid_hash))
227-
} else {
228-
// Check parent block hash in invalid_ancestors (if head block exists)
229-
let head_block = context
230-
.storage
231-
.get_block_header_by_hash(fork_choice_state.head_block_hash)?;
232-
let check_parent = if let Some(head_block) = head_block {
233-
debug!(
234-
"Checking parent for invalid ancestor {}",
235-
head_block.parent_hash
236-
);
237-
context
238-
.storage
239-
.get_latest_valid_ancestor(head_block.parent_hash)
240-
.await
241-
.ok()
242-
.flatten()
243-
} else {
244-
None
245-
};
219+
return Ok((
220+
None,
221+
ForkChoiceResponse::from(PayloadStatus::invalid_with(
222+
latest_valid_hash,
223+
InvalidForkChoice::InvalidAncestor(latest_valid_hash).to_string(),
224+
)),
225+
));
226+
}
246227

247-
// Check head block hash in invalid_ancestors
248-
if let Some(latest_valid_hash) = check_parent {
249-
Err(InvalidForkChoice::InvalidAncestor(latest_valid_hash))
250-
} else {
251-
// Check if there is an ongoing sync before applying the forkchoice
252-
match context.syncer.status()? {
253-
// Apply current fork choice
254-
SyncStatus::Inactive => {
255-
// All checks passed, apply fork choice
256-
apply_fork_choice(
257-
&context.storage,
258-
fork_choice_state.head_block_hash,
259-
fork_choice_state.safe_block_hash,
260-
fork_choice_state.finalized_block_hash,
261-
)
262-
.await
263-
}
264-
// Restart sync if needed
265-
_ => Err(InvalidForkChoice::Syncing),
266-
}
228+
// Check parent block hash in invalid_ancestors (if head block exists)
229+
if let Some(head_block) = context
230+
.storage
231+
.get_block_header_by_hash(fork_choice_state.head_block_hash)?
232+
{
233+
if let Some(latest_valid_hash) = context
234+
.storage
235+
.get_latest_valid_ancestor(head_block.parent_hash)
236+
.await?
237+
{
238+
return Ok((
239+
None,
240+
ForkChoiceResponse::from(PayloadStatus::invalid_with(
241+
latest_valid_hash,
242+
InvalidForkChoice::InvalidAncestor(latest_valid_hash).to_string(),
243+
)),
244+
));
267245
}
268-
};
246+
}
269247

270-
match fork_choice_res {
248+
if context.syncer.sync_mode() == SyncMode::Snap {
249+
context.syncer.set_head(fork_choice_state.head_block_hash);
250+
return Ok((None, PayloadStatus::syncing().into()));
251+
}
252+
253+
match apply_fork_choice(
254+
&context.storage,
255+
fork_choice_state.head_block_hash,
256+
fork_choice_state.safe_block_hash,
257+
fork_choice_state.finalized_block_hash,
258+
)
259+
.await
260+
{
271261
Ok(head) => {
272262
// Remove included transactions from the mempool after we accept the fork choice
273263
// TODO(#797): The remove of transactions from the mempool could be incomplete (i.e. REORGS)
@@ -318,7 +308,8 @@ async fn handle_forkchoice(
318308
.update_sync_status(false)
319309
.await
320310
.map_err(|e| RpcErr::Internal(e.to_string()))?;
321-
context.syncer.start_sync().await;
311+
context.syncer.set_head(fork_choice_state.head_block_hash);
312+
context.syncer.start_sync();
322313
ForkChoiceResponse::from(PayloadStatus::syncing())
323314
}
324315
InvalidForkChoice::Disconnected(_, _) | InvalidForkChoice::ElementNotFound(_) => {

crates/networking/rpc/engine/payload.rs

Lines changed: 39 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use ethrex_common::types::payload::PayloadBundle;
44
use ethrex_common::types::requests::{compute_requests_hash, EncodedRequests};
55
use ethrex_common::types::{Block, BlockBody, BlockHash, BlockNumber, Fork};
66
use ethrex_common::{H256, U256};
7-
use ethrex_p2p::sync_manager::SyncStatus;
7+
use ethrex_p2p::sync::SyncMode;
88
use serde_json::Value;
9-
use tracing::{debug, error, info, warn};
9+
use tracing::{error, info, warn};
1010

1111
use crate::rpc::{RpcApiContext, RpcHandler};
1212
use crate::types::payload::{
@@ -34,7 +34,9 @@ impl RpcHandler for NewPayloadV1Request {
3434

3535
async fn handle(&self, context: RpcApiContext) -> Result<Value, RpcErr> {
3636
validate_execution_payload_v1(&self.payload)?;
37-
handle_new_payload_v1_v2(&self.payload, context).await
37+
let block = get_block_from_payload(&self.payload, None, None)?;
38+
let payload_status = handle_new_payload_v1_v2(&self.payload, block, context).await?;
39+
serde_json::to_value(payload_status).map_err(|error| RpcErr::Internal(error.to_string()))
3840
}
3941
}
4042

@@ -58,7 +60,9 @@ impl RpcHandler for NewPayloadV2Request {
5860
validate_execution_payload_v1(&self.payload)?;
5961
}
6062

61-
handle_new_payload_v1_v2(&self.payload, context).await
63+
let block = get_block_from_payload(&self.payload, None, None)?;
64+
let payload_status = handle_new_payload_v1_v2(&self.payload, block, context).await?;
65+
serde_json::to_value(payload_status).map_err(|error| RpcErr::Internal(error.to_string()))
6266
}
6367
}
6468

@@ -572,36 +576,29 @@ async fn validate_ancestors(
572576
Ok(None)
573577
}
574578

575-
// TODO: We need to check why we return a Result<Value, RpcErr> here instead of a Result<PayloadStatus, RpcErr> as in v3.
576579
async fn handle_new_payload_v1_v2(
577580
payload: &ExecutionPayload,
581+
block: Block,
578582
context: RpcApiContext,
579-
) -> Result<Value, RpcErr> {
580-
let block = get_block_from_payload(payload, None, None)?;
581-
582-
// Check sync status
583-
match context.syncer.status()? {
584-
SyncStatus::Active(_) => {
585-
return serde_json::to_value(PayloadStatus::syncing())
586-
.map_err(|error| RpcErr::Internal(error.to_string()));
587-
}
588-
SyncStatus::Inactive => {}
589-
}
590-
583+
) -> Result<PayloadStatus, RpcErr> {
591584
// Validate block hash
592585
if let Err(RpcErr::Internal(error_msg)) = validate_block_hash(payload, &block) {
593-
return serde_json::to_value(PayloadStatus::invalid_with_err(&error_msg))
594-
.map_err(|error| RpcErr::Internal(error.to_string()));
586+
return Ok(PayloadStatus::invalid_with_err(&error_msg));
595587
}
596588

597589
// Check for invalid ancestors
598590
if let Some(status) = validate_ancestors(&block, &context).await? {
599-
return serde_json::to_value(status).map_err(|error| RpcErr::Internal(error.to_string()));
591+
return Ok(status);
592+
}
593+
594+
if context.syncer.sync_mode() == SyncMode::Snap {
595+
warn!("Snap sync in progress, skipping new payload validation");
596+
return Ok(PayloadStatus::syncing());
600597
}
601598

602599
// All checks passed, execute payload
603-
let payload_status = execute_payload(&block, &context).await?;
604-
serde_json::to_value(payload_status).map_err(|error| RpcErr::Internal(error.to_string()))
600+
let payload_status = try_execute_payload(&block, &context).await?;
601+
Ok(payload_status)
605602
}
606603

607604
async fn handle_new_payload_v3(
@@ -610,23 +607,6 @@ async fn handle_new_payload_v3(
610607
block: Block,
611608
expected_blob_versioned_hashes: Vec<H256>,
612609
) -> Result<PayloadStatus, RpcErr> {
613-
// Ignore incoming
614-
// Check sync status
615-
match context.syncer.status()? {
616-
SyncStatus::Active(_) => return Ok(PayloadStatus::syncing()),
617-
SyncStatus::Inactive => {}
618-
}
619-
620-
// Validate block hash
621-
if let Err(RpcErr::Internal(error_msg)) = validate_block_hash(payload, &block) {
622-
return Ok(PayloadStatus::invalid_with_err(&error_msg));
623-
}
624-
625-
// Check for invalid ancestors
626-
if let Some(status) = validate_ancestors(&block, &context).await? {
627-
return Ok(status);
628-
}
629-
630610
// V3 specific: validate blob hashes
631611
let blob_versioned_hashes: Vec<H256> = block
632612
.body
@@ -641,8 +621,7 @@ async fn handle_new_payload_v3(
641621
));
642622
}
643623

644-
// All checks passed, execute payload
645-
execute_payload(&block, &context).await
624+
handle_new_payload_v1_v2(payload, block, context).await
646625
}
647626

648627
// Elements of the list MUST be ordered by request_type in ascending order.
@@ -684,11 +663,13 @@ fn validate_block_hash(payload: &ExecutionPayload, block: &Block) -> Result<(),
684663
"Invalid block hash. Expected {actual_block_hash:#x}, got {block_hash:#x}"
685664
)));
686665
}
687-
debug!("Block hash {block_hash} is valid");
688666
Ok(())
689667
}
690668

691-
async fn execute_payload(block: &Block, context: &RpcApiContext) -> Result<PayloadStatus, RpcErr> {
669+
async fn try_execute_payload(
670+
block: &Block,
671+
context: &RpcApiContext,
672+
) -> Result<PayloadStatus, RpcErr> {
692673
let block_hash = block.hash();
693674
let storage = &context.storage;
694675
// Return the valid message directly if we have it.
@@ -698,6 +679,9 @@ async fn execute_payload(block: &Block, context: &RpcApiContext) -> Result<Paylo
698679

699680
// Execute and store the block
700681
info!("Executing payload with block hash: {block_hash:#x}");
682+
683+
// TODO: this is not correct, the block being validated it no necesarily a descendant
684+
// of the latest canonical block
701685
let latest_valid_hash = context
702686
.storage
703687
.get_latest_canonical_block_hash()
@@ -707,7 +691,18 @@ async fn execute_payload(block: &Block, context: &RpcApiContext) -> Result<Paylo
707691
))?;
708692

709693
match context.blockchain.add_block(block).await {
710-
Err(ChainError::ParentNotFound) => Ok(PayloadStatus::syncing()),
694+
Err(ChainError::ParentNotFound) => {
695+
// Start sync
696+
context
697+
.storage
698+
.update_sync_status(false)
699+
.await
700+
.map_err(|e| RpcErr::Internal(e.to_string()))?;
701+
context.syncer.set_head(block_hash);
702+
context.syncer.start_sync();
703+
704+
Ok(PayloadStatus::syncing())
705+
}
711706
// Under the current implementation this is not possible: we always calculate the state
712707
// transition of any new payload as long as the parent is present. If we received the
713708
// parent payload but it was stashed, then new payload would stash this one too, with a

crates/storage/api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
318318

319319
async fn is_synced(&self) -> Result<bool, StoreError>;
320320

321-
async fn update_sync_status(&self, status: bool) -> Result<(), StoreError>;
321+
async fn update_sync_status(&self, is_synced: bool) -> Result<(), StoreError>;
322322

323323
/// Write an account batch into the current state snapshot
324324
async fn write_snapshot_account_batch(

crates/storage/store.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,8 +1006,8 @@ impl Store {
10061006
pub async fn is_synced(&self) -> Result<bool, StoreError> {
10071007
self.engine.is_synced().await
10081008
}
1009-
pub async fn update_sync_status(&self, status: bool) -> Result<(), StoreError> {
1010-
self.engine.update_sync_status(status).await
1009+
pub async fn update_sync_status(&self, is_synced: bool) -> Result<(), StoreError> {
1010+
self.engine.update_sync_status(is_synced).await
10111011
}
10121012

10131013
/// Write an account batch into the current state snapshot

crates/storage/store_db/in_memory.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -557,8 +557,8 @@ impl StoreEngine for Store {
557557
Ok(self.inner().chain_data.is_synced)
558558
}
559559

560-
async fn update_sync_status(&self, status: bool) -> Result<(), StoreError> {
561-
self.inner().chain_data.is_synced = status;
560+
async fn update_sync_status(&self, is_synced: bool) -> Result<(), StoreError> {
561+
self.inner().chain_data.is_synced = is_synced;
562562
Ok(())
563563
}
564564

crates/storage/store_db/libmdbx.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -768,8 +768,8 @@ impl StoreEngine for Store {
768768
}
769769
}
770770

771-
async fn update_sync_status(&self, status: bool) -> Result<(), StoreError> {
772-
self.write::<ChainData>(ChainDataIndex::IsSynced, status.encode_to_vec())
771+
async fn update_sync_status(&self, is_synced: bool) -> Result<(), StoreError> {
772+
self.write::<ChainData>(ChainDataIndex::IsSynced, is_synced.encode_to_vec())
773773
.await
774774
}
775775

0 commit comments

Comments
 (0)