Skip to content

Commit f0abb23

Browse files
authored
Snap sync tx broadcaster (#4077)
**Motivation** <!-- Why does this pull request exist? What are its goals? --> **Description** <!-- A clear and concise general description of the changes this PR introduces --> <!-- Link to issues: Resolves #111, Resolves #222 --> Closes #issue_number
1 parent b9ddefa commit f0abb23

File tree

5 files changed

+176
-14
lines changed

5 files changed

+176
-14
lines changed

crates/blockchain/mempool.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use ethrex_storage::error::StoreError;
1919

2020
#[derive(Debug, Default)]
2121
pub struct Mempool {
22+
broadcast_pool: RwLock<HashSet<H256>>,
2223
transaction_pool: RwLock<HashMap<H256, MempoolTransaction>>,
2324
blobs_bundle_pool: Mutex<HashMap<H256, BlobsBundle>>,
2425
txs_by_sender_nonce: RwLock<BTreeMap<(H160, u64), H256>>,
@@ -42,10 +43,38 @@ impl Mempool {
4243
.write()
4344
.map_err(|error| StoreError::MempoolWriteLock(error.to_string()))?
4445
.insert(hash, transaction);
46+
self.broadcast_pool
47+
.write()
48+
.map_err(|error| StoreError::MempoolWriteLock(error.to_string()))?
49+
.insert(hash);
4550

4651
Ok(())
4752
}
4853

54+
pub fn get_txs_for_broadcast(&self) -> Result<Vec<MempoolTransaction>, StoreError> {
55+
let txs = self.transaction_pool.read()
56+
.map_err(|error| StoreError::MempoolReadLock(error.to_string()))
57+
.and_then(|pool| {
58+
Ok(pool.iter()
59+
.filter_map(|(hash, tx)| {
60+
if !self.broadcast_pool.read().ok()?.contains(hash) {
61+
None
62+
} else {
63+
Some(tx.clone())
64+
}
65+
}).collect::<Vec<_>>())
66+
})?;
67+
Ok(txs)
68+
}
69+
70+
pub fn clear_broadcasted_txs(&self) {
71+
self.broadcast_pool
72+
.write()
73+
.map_err(|error| StoreError::MempoolWriteLock(error.to_string()))
74+
.map(|mut pool| pool.clear())
75+
.unwrap_or_default();
76+
}
77+
4978
/// Add a blobs bundle to the pool by its blob transaction hash
5079
pub fn add_blobs_bundle(
5180
&self,
@@ -75,6 +104,10 @@ impl Mempool {
75104
.transaction_pool
76105
.write()
77106
.map_err(|error| StoreError::MempoolWriteLock(error.to_string()))?;
107+
let mut broadcast_pool = self
108+
.broadcast_pool
109+
.write()
110+
.map_err(|error| StoreError::MempoolWriteLock(error.to_string()))?;
78111
if let Some(tx) = tx_pool.get(hash) {
79112
if matches!(tx.tx_type(), TxType::EIP4844) {
80113
self.blobs_bundle_pool
@@ -88,6 +121,7 @@ impl Mempool {
88121
.map_err(|error| StoreError::MempoolWriteLock(error.to_string()))?
89122
.remove(&(tx.sender(), tx.nonce()));
90123
tx_pool.remove(hash);
124+
broadcast_pool.remove(hash);
91125
};
92126

93127
Ok(())

crates/networking/p2p/network.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,12 @@ use crate::{
22
discv4::{
33
server::{DiscoveryServer, DiscoveryServerError},
44
side_car::{DiscoverySideCar, DiscoverySideCarError},
5-
},
6-
kademlia::Kademlia,
7-
metrics::METRICS,
8-
rlpx::{
5+
}, kademlia::Kademlia, metrics::METRICS, rlpx::{
96
connection::server::{RLPxConnBroadcastSender, RLPxConnection},
107
initiator::{RLPxInitiator, RLPxInitiatorError},
118
l2::l2_connection::P2PBasedContext,
129
message::Message,
13-
},
14-
types::{Node, NodeRecord},
10+
}, tx_broadcaster::{TxBroadcaster, TxBroadcasterError}, types::{Node, NodeRecord}
1511
};
1612
use ethrex_blockchain::Blockchain;
1713
use ethrex_storage::Store;
@@ -86,6 +82,8 @@ pub enum NetworkError {
8682
DiscoverySideCarError(#[from] DiscoverySideCarError),
8783
#[error("Failed to start RLPx Initiator: {0}")]
8884
RLPxInitiatorError(#[from] RLPxInitiatorError),
85+
#[error("Failed to start Tx Broadcaster: {0}")]
86+
TxBroadcasterError(#[from] TxBroadcasterError),
8987
}
9088

9189
pub fn peer_table() -> Kademlia {
@@ -128,6 +126,12 @@ pub async fn start_network(context: P2PContext, bootnodes: Vec<Node>) -> Result<
128126
error!("Failed to start RLPx Initiator: {e}");
129127
})?;
130128

129+
TxBroadcaster::spawn(context.table.clone(), context.blockchain.clone())
130+
.await
131+
.inspect_err(|e| {
132+
error!("Failed to start Tx Broadcaster: {e}");
133+
})?;
134+
131135
context.tracker.spawn(serve_p2p_requests(context.clone()));
132136

133137
Ok(())

crates/networking/p2p/p2p.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub mod sync;
99
pub mod sync_manager;
1010
pub mod types;
1111
pub mod utils;
12+
pub mod tx_broadcaster;
1213

1314
pub use network::periodically_show_peer_stats;
1415
pub use network::start_network;

crates/networking/p2p/rlpx/connection/server.rs

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ use crate::{
6969
};
7070

7171
const PING_INTERVAL: Duration = Duration::from_secs(10);
72-
const TX_BROADCAST_INTERVAL: Duration = Duration::from_millis(500);
72+
const TX_BROADCAST_INTERVAL: Duration = Duration::from_millis(20000);
7373
const BLOCK_RANGE_UPDATE_INTERVAL: Duration = Duration::from_secs(60);
7474
// Soft limit for the number of transaction hashes sent in a single NewPooledTransactionHashes message as per [the spec](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#newpooledtransactionhashes-0x080)
7575
const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT: usize = 4096;
@@ -172,7 +172,7 @@ pub enum CastMessage {
172172
PeerMessage(Message),
173173
BackendMessage(Message),
174174
SendPing,
175-
SendNewPooledTxHashes,
175+
SendNewPooledTxHashes(Vec<MempoolTransaction>),
176176
BlockRangeUpdate,
177177
BroadcastMessage(task::Id, Arc<Message>),
178178
L2(L2Cast),
@@ -296,8 +296,8 @@ impl GenServer for RLPxConnection {
296296
Self::CastMsg::SendPing => {
297297
send(&mut established_state, Message::Ping(PingMessage {})).await
298298
}
299-
Self::CastMsg::SendNewPooledTxHashes => {
300-
send_new_pooled_tx_hashes(&mut established_state).await
299+
Self::CastMsg::SendNewPooledTxHashes(txs) => {
300+
send_new_pooled_tx_hashes(&mut established_state, txs).await
301301
}
302302
Self::CastMsg::BroadcastMessage(id, msg) => {
303303
log_peer_debug(
@@ -418,14 +418,14 @@ where
418418
log_peer_debug(&state.node, "Peer connection initialized.");
419419

420420
// Send transactions transaction hashes from mempool at connection start
421-
send_new_pooled_tx_hashes(state).await?;
421+
send_all_pooled_tx_hashes(state).await?;
422422

423423
// Periodic broadcast check repeated events.
424-
send_interval(
424+
/*send_interval(
425425
TX_BROADCAST_INTERVAL,
426426
handle.clone(),
427427
CastMessage::SendNewPooledTxHashes,
428-
);
428+
);*/
429429

430430
// Periodic Pings repeated events.
431431
send_interval(PING_INTERVAL, handle.clone(), CastMessage::SendPing);
@@ -467,7 +467,39 @@ where
467467
Ok(())
468468
}
469469

470-
async fn send_new_pooled_tx_hashes(state: &mut Established) -> Result<(), RLPxError> {
470+
async fn send_new_pooled_tx_hashes(state: &mut Established, txs: Vec<MempoolTransaction>) -> Result<(), RLPxError> {
471+
if SUPPORTED_ETH_CAPABILITIES
472+
.iter()
473+
.any(|cap| state.capabilities.contains(cap))
474+
{
475+
if !txs.is_empty() {
476+
for tx_chunk in txs.chunks(NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT) {
477+
let tx_count = tx_chunk.len();
478+
let mut txs_to_send = Vec::with_capacity(tx_count);
479+
for tx in tx_chunk {
480+
txs_to_send.push((**tx).clone());
481+
state.broadcasted_txs.insert(tx.hash());
482+
}
483+
484+
send(
485+
state,
486+
Message::NewPooledTransactionHashes(NewPooledTransactionHashes::new(
487+
txs_to_send,
488+
&state.blockchain,
489+
)?),
490+
)
491+
.await?;
492+
log_peer_debug(
493+
&state.node,
494+
&format!("Sent {tx_count} transaction hashes to peer"),
495+
);
496+
}
497+
}
498+
}
499+
Ok(())
500+
}
501+
502+
async fn send_all_pooled_tx_hashes(state: &mut Established) -> Result<(), RLPxError> {
471503
if SUPPORTED_ETH_CAPABILITIES
472504
.iter()
473505
.any(|cap| state.capabilities.contains(cap))
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
use std::{sync::Arc, time::Duration};
2+
3+
use ethrex_blockchain::Blockchain;
4+
use spawned_concurrency::{messages::Unused, tasks::{send_interval, CastResponse, GenServer}};
5+
use tracing::{debug, error, info};
6+
7+
use crate::{kademlia::Kademlia, rlpx::connection::server::CastMessage};
8+
9+
#[derive(Debug, Clone)]
10+
pub struct TxBroadcaster {
11+
kademlia: Kademlia,
12+
blockchain: Arc<Blockchain>,
13+
}
14+
15+
#[derive(Debug, Clone)]
16+
pub enum InMessage {
17+
BroadcastTxs
18+
}
19+
20+
#[derive(Debug, Clone)]
21+
pub enum OutMessage {
22+
Done,
23+
}
24+
25+
26+
impl TxBroadcaster {
27+
pub async fn spawn(
28+
kademlia: Kademlia,
29+
blockchain: Arc<Blockchain>,
30+
) -> Result<(), TxBroadcasterError> {
31+
info!("Starting Transaction Broadcaster");
32+
33+
let state = TxBroadcaster {
34+
kademlia,
35+
blockchain,
36+
};
37+
38+
let mut server = state.clone().start();
39+
40+
send_interval(
41+
Duration::from_secs(1),
42+
server.clone(),
43+
InMessage::BroadcastTxs,
44+
);
45+
46+
Ok(())
47+
}
48+
49+
async fn broadcast_txs(&self) -> Result<(), TxBroadcasterError> {
50+
let txs_to_broadcast = self.blockchain.mempool.get_txs_for_broadcast().map_err(|_| TxBroadcasterError::Broadcast)?;
51+
let peers = self.kademlia.get_peer_channels(&[]).await;
52+
for (peer_id, mut peer_channels) in peers {
53+
peer_channels.connection.cast(CastMessage::SendNewPooledTxHashes(
54+
txs_to_broadcast.clone(),
55+
)).await.unwrap_or_else(|err| {
56+
error!(peer_id = %format!("{:#x}", peer_id), err = ?err, "Failed to send new pooled tx hashes");
57+
});
58+
}
59+
self.blockchain.mempool.clear_broadcasted_txs();
60+
Ok(())
61+
}
62+
}
63+
64+
impl GenServer for TxBroadcaster {
65+
type CallMsg = Unused;
66+
type CastMsg = InMessage;
67+
type OutMsg = OutMessage;
68+
type Error = TxBroadcasterError;
69+
70+
async fn handle_cast(
71+
&mut self,
72+
message: Self::CastMsg,
73+
handle: &spawned_concurrency::tasks::GenServerHandle<Self>,
74+
) -> CastResponse {
75+
match message {
76+
Self::CastMsg::BroadcastTxs => {
77+
debug!(received = "BroadcastTxs");
78+
79+
self.broadcast_txs().await;
80+
81+
CastResponse::NoReply
82+
}
83+
}
84+
}
85+
}
86+
87+
#[derive(Debug, thiserror::Error)]
88+
pub enum TxBroadcasterError {
89+
#[error("Failed to broadcast transactions")]
90+
Broadcast
91+
}

0 commit comments

Comments
 (0)