Skip to content

Commit 01d81e3

Browse files
committed
Merge branch 'snap_sync' into snap_sync_fix_ci
2 parents 41c9dc8 + b4f85bb commit 01d81e3

File tree

3 files changed

+63
-5
lines changed

3 files changed

+63
-5
lines changed

cmd/ethrex/initializers.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ pub async fn init_network(
191191
signer,
192192
peer_table.clone(),
193193
store,
194-
blockchain,
194+
blockchain.clone(),
195195
get_client_version(),
196196
based_context,
197197
);
@@ -200,7 +200,10 @@ pub async fn init_network(
200200
.await
201201
.expect("Network starts");
202202

203-
tracker.spawn(ethrex_p2p::periodically_show_peer_stats());
203+
tracker.spawn(ethrex_p2p::periodically_show_peer_stats(
204+
blockchain,
205+
peer_table.peers.clone(),
206+
));
204207
}
205208

206209
#[cfg(feature = "dev")]

crates/networking/p2p/network.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,24 @@ use crate::{
33
server::{DiscoveryServer, DiscoveryServerError},
44
side_car::{DiscoverySideCar, DiscoverySideCarError},
55
},
6-
kademlia::Kademlia,
6+
kademlia::{Kademlia, PeerData},
77
metrics::METRICS,
88
rlpx::{
99
connection::server::{RLPxConnBroadcastSender, RLPxConnection},
1010
initiator::{RLPxInitiator, RLPxInitiatorError},
1111
l2::l2_connection::P2PBasedContext,
1212
message::Message,
13+
p2p::SUPPORTED_SNAP_CAPABILITIES,
1314
},
1415
tx_broadcaster::{TxBroadcaster, TxBroadcasterError},
1516
types::{Node, NodeRecord},
1617
};
1718
use ethrex_blockchain::Blockchain;
19+
use ethrex_common::H256;
1820
use ethrex_storage::Store;
1921
use secp256k1::SecretKey;
2022
use std::{
23+
collections::BTreeMap,
2124
io,
2225
net::SocketAddr,
2326
sync::Arc,
@@ -175,9 +178,20 @@ fn listener(tcp_addr: SocketAddr) -> Result<TcpListener, io::Error> {
175178
tcp_socket.listen(50)
176179
}
177180

178-
pub async fn periodically_show_peer_stats() {
181+
pub async fn periodically_show_peer_stats(
182+
blockchain: Arc<Blockchain>,
183+
peers: Arc<Mutex<BTreeMap<H256, PeerData>>>,
184+
) {
185+
periodically_show_peer_stats_during_syncing(blockchain).await;
186+
periodically_show_peer_stats_after_sync(peers).await;
187+
}
188+
189+
pub async fn periodically_show_peer_stats_during_syncing(blockchain: Arc<Blockchain>) {
179190
let start = std::time::Instant::now();
180191
loop {
192+
if blockchain.is_synced() {
193+
return;
194+
}
181195
let metrics_enabled = *METRICS.enabled.lock().await;
182196
// Show the metrics only when these are enabled
183197
if !metrics_enabled {
@@ -424,6 +438,33 @@ bytecodes progress: {bytecodes_download_progress} (total: {bytecodes_to_download
424438
}
425439
}
426440

441+
/// Shows the amount of connected peers, active peers, and peers suitable for snap sync on a set interval
442+
pub async fn periodically_show_peer_stats_after_sync(peers: Arc<Mutex<BTreeMap<H256, PeerData>>>) {
443+
const INTERVAL_DURATION: tokio::time::Duration = tokio::time::Duration::from_secs(60);
444+
let mut interval = tokio::time::interval(INTERVAL_DURATION);
445+
loop {
446+
// clone peers to keep the lock short
447+
let peers: Vec<PeerData> = peers.lock().await.values().cloned().collect();
448+
let active_peers = peers
449+
.iter()
450+
.filter(|peer| -> bool { peer.channels.as_ref().is_some() })
451+
.count();
452+
let snap_active_peers = peers
453+
.iter()
454+
.filter(|peer| -> bool {
455+
peer.channels.as_ref().is_some()
456+
&& SUPPORTED_SNAP_CAPABILITIES
457+
.iter()
458+
.any(|cap| peer.supported_capabilities.contains(cap))
459+
})
460+
.count();
461+
info!(
462+
"Snap Peers: {snap_active_peers} / Total Peers: {active_peers}"
463+
);
464+
interval.tick().await;
465+
}
466+
}
467+
427468
fn format_duration(duration: Duration) -> String {
428469
let total_seconds = duration.as_secs();
429470
let hours = total_seconds / 3600;

crates/networking/p2p/rlpx/connection/server.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ use ethrex_common::{
1010
H256,
1111
types::{MempoolTransaction, Transaction},
1212
};
13-
use ethrex_storage::Store;
13+
use ethrex_storage::{Store, error::StoreError};
14+
use ethrex_trie::TrieError;
1415
use futures::{SinkExt as _, Stream, stream::SplitSink};
1516
use rand::random;
1617
use secp256k1::{PublicKey, SecretKey};
@@ -327,6 +328,19 @@ impl GenServer for RLPxConnection {
327328
);
328329
return CastResponse::Stop;
329330
}
331+
RLPxError::StoreError(StoreError::Trie(TrieError::InconsistentTree)) => {
332+
if established_state.blockchain.is_synced() {
333+
log_peer_error(
334+
&established_state.node,
335+
&format!("Error handling cast message: {e}"),
336+
);
337+
} else {
338+
log_peer_debug(
339+
&established_state.node,
340+
&format!("Error handling cast message: {e}"),
341+
);
342+
}
343+
}
330344
_ => {
331345
log_peer_warn(
332346
&established_state.node,

0 commit comments

Comments
 (0)