From d2f2916ad8fdcc10ae9e7409337fa24c6a07be18 Mon Sep 17 00:00:00 2001 From: timorleph Date: Wed, 29 Jan 2025 16:56:44 +0100 Subject: [PATCH] Censorship resistance test --- Cargo.lock | 4 +- consensus/Cargo.toml | 2 +- consensus/src/testing/behind.rs | 129 ++++++++++++++++++++++++ consensus/src/testing/byzantine.rs | 23 +++-- consensus/src/testing/crash.rs | 23 +++-- consensus/src/testing/crash_recovery.rs | 24 ++++- consensus/src/testing/mod.rs | 3 +- consensus/src/testing/unreliable.rs | 28 +++-- mock/Cargo.toml | 2 +- mock/src/dataio.rs | 9 +- mock/src/lib.rs | 1 + mock/src/network.rs | 66 ++++++++---- 12 files changed, 256 insertions(+), 58 deletions(-) create mode 100644 consensus/src/testing/behind.rs diff --git a/Cargo.lock b/Cargo.lock index d13a5a83..b32baf4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ [[package]] name = "aleph-bft" -version = "0.42.0" +version = "0.42.1" dependencies = [ "aleph-bft-mock", "aleph-bft-rmc", @@ -102,7 +102,7 @@ dependencies = [ [[package]] name = "aleph-bft-mock" -version = "0.15.0" +version = "0.16.0" dependencies = [ "aleph-bft-types", "async-trait", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index e8122373..6278a071 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft" -version = "0.42.0" +version = "0.42.1" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "data-structures", "cryptography", "database"] diff --git a/consensus/src/testing/behind.rs b/consensus/src/testing/behind.rs new file mode 100644 index 00000000..11b0b99a --- /dev/null +++ b/consensus/src/testing/behind.rs @@ -0,0 +1,129 @@ +use std::{ + collections::{HashSet, VecDeque}, + time::{Duration, Instant}, +}; + +use crate::{ + testing::{init_log, spawn_honest_member, HonestMember, NetworkData}, + NodeCount, NodeIndex, SpawnHandle, +}; +use aleph_bft_mock::{DataProvider, NetworkHook, Router, Spawner}; +use futures::StreamExt; +use log::info; + +struct Latency { + who: NodeIndex, + buffer: VecDeque<(Instant, (NetworkData, NodeIndex, NodeIndex))>, +} + +const LATENCY: Duration = Duration::from_millis(300); + +impl Latency { + pub fn new(who: NodeIndex) -> Self { + Latency { + who, + buffer: VecDeque::new(), + } + } + + fn add_message( + &mut self, + data: NetworkData, + sender: NodeIndex, + recipient: NodeIndex, + ) -> Vec<(NetworkData, NodeIndex, NodeIndex)> { + match sender == self.who || recipient == self.who { + true => { + self.buffer + .push_back((Instant::now(), (data, sender, recipient))); + Vec::new() + } + false => vec![(data, sender, recipient)], + } + } + + fn messages_to_send(&mut self) -> Vec<(NetworkData, NodeIndex, NodeIndex)> { + let mut result = Vec::new(); + while !self.buffer.is_empty() { + let (when, msg) = self + .buffer + .pop_front() + .expect("just checked it is not empty"); + if Instant::now().duration_since(when) < LATENCY { + self.buffer.push_front((when, msg)); + break; + } + result.push(msg); + } + result + } +} + +impl NetworkHook for Latency { + fn process_message( + &mut self, + data: NetworkData, + sender: NodeIndex, + recipient: NodeIndex, + ) -> Vec<(NetworkData, NodeIndex, NodeIndex)> { + let mut result = self.add_message(data, sender, recipient); + result.append(&mut self.messages_to_send()); + result + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn delayed_finalized() { + let n_members = NodeCount(7); + let australian = NodeIndex(0); + init_log(); + let spawner = Spawner::new(); + let mut batch_rxs = Vec::new(); + let mut exits = Vec::new(); + let mut handles = Vec::new(); + let (mut net_hub, networks) = Router::new(n_members); + + net_hub.add_hook(Latency::new(australian)); + + spawner.spawn("network-hub", net_hub); + + for (network, _) in networks { + let ix = network.index(); + let HonestMember { + finalization_rx, + exit_tx, + handle, + .. + } = spawn_honest_member( + spawner, + ix, + n_members, + vec![], + DataProvider::new_range(ix.0 * 50, (ix.0 + 1) * 50), + network, + ); + batch_rxs.push(finalization_rx); + exits.push(exit_tx); + handles.push(handle); + } + let to_finalize: HashSet = (0..((n_members.0) * 50)) + .map(|number| number as u32) + .collect(); + + for mut rx in batch_rxs.drain(..) { + let mut to_finalize_local = to_finalize.clone(); + while !to_finalize_local.is_empty() { + let number = rx.next().await.unwrap(); + info!("finalizing {}", number); + assert!(to_finalize_local.remove(&number)); + } + info!("finished one node"); + } + + for exit in exits { + let _ = exit.send(()); + } + for handle in handles { + let _ = handle.await; + } +} diff --git a/consensus/src/testing/byzantine.rs b/consensus/src/testing/byzantine.rs index aa4abe74..93515afc 100644 --- a/consensus/src/testing/byzantine.rs +++ b/consensus/src/testing/byzantine.rs @@ -6,7 +6,9 @@ use crate::{ Hasher, Network as NetworkT, NetworkData as NetworkDataT, NodeCount, NodeIndex, NodeMap, Recipient, Round, SessionId, Signed, SpawnHandle, TaskHandle, }; -use aleph_bft_mock::{Data, Hash64, Hasher64, Keychain, NetworkHook, Router, Spawner}; +use aleph_bft_mock::{ + Data, DataProvider, Hash64, Hasher64, Keychain, NetworkHook, Router, Spawner, +}; use futures::{channel::oneshot, StreamExt}; use log::{debug, error, trace}; use parking_lot::Mutex; @@ -230,7 +232,12 @@ impl AlertHook { } impl NetworkHook for AlertHook { - fn update_state(&mut self, data: &mut NetworkData, sender: NodeIndex, recipient: NodeIndex) { + fn process_message( + &mut self, + data: NetworkData, + sender: NodeIndex, + recipient: NodeIndex, + ) -> Vec<(NetworkData, NodeIndex, NodeIndex)> { use crate::{alerts::AlertMessage::*, network::NetworkDataInner::*}; if let crate::NetworkData(Alert(ForkAlert(_))) = data { *self @@ -239,6 +246,7 @@ impl NetworkHook for AlertHook { .entry((sender, recipient)) .or_insert(0) += 1; } + vec![(data, sender, recipient)] } } @@ -246,14 +254,13 @@ async fn honest_members_agree_on_batches_byzantine( n_members: NodeCount, n_honest: NodeCount, n_batches: usize, - network_reliability: f64, ) { init_log(); let spawner = Spawner::new(); let mut batch_rxs = Vec::new(); let mut exits = Vec::new(); let mut handles = Vec::new(); - let (mut net_hub, networks) = Router::new(n_members, network_reliability); + let (mut net_hub, networks) = Router::new(n_members); let alert_hook = AlertHook::new(); net_hub.add_hook(alert_hook.clone()); @@ -270,7 +277,7 @@ async fn honest_members_agree_on_batches_byzantine( exit_tx, handle, .. - } = spawn_honest_member(spawner, ix, n_members, vec![], network); + } = spawn_honest_member(spawner, ix, n_members, vec![], DataProvider::new(), network); batch_rxs.push(finalization_rx); (exit_tx, handle) }; @@ -317,17 +324,17 @@ async fn honest_members_agree_on_batches_byzantine( #[tokio::test(flavor = "multi_thread")] #[serial] async fn small_byzantine_one_forker() { - honest_members_agree_on_batches_byzantine(4.into(), 3.into(), 5, 1.0).await; + honest_members_agree_on_batches_byzantine(4.into(), 3.into(), 5).await; } #[tokio::test(flavor = "multi_thread")] #[serial] async fn small_byzantine_two_forkers() { - honest_members_agree_on_batches_byzantine(7.into(), 5.into(), 5, 1.0).await; + honest_members_agree_on_batches_byzantine(7.into(), 5.into(), 5).await; } #[tokio::test(flavor = "multi_thread")] #[serial] async fn medium_byzantine_ten_forkers() { - honest_members_agree_on_batches_byzantine(31.into(), 21.into(), 5, 1.0).await; + honest_members_agree_on_batches_byzantine(31.into(), 21.into(), 5).await; } diff --git a/consensus/src/testing/crash.rs b/consensus/src/testing/crash.rs index 26d2f3eb..da7b7b08 100644 --- a/consensus/src/testing/crash.rs +++ b/consensus/src/testing/crash.rs @@ -2,7 +2,7 @@ use crate::{ testing::{init_log, spawn_honest_member, HonestMember}, NodeCount, SpawnHandle, }; -use aleph_bft_mock::{Router, Spawner}; +use aleph_bft_mock::{DataProvider, Router, Spawner, UnreliableHook}; use futures::StreamExt; use serial_test::serial; @@ -10,14 +10,17 @@ async fn honest_members_agree_on_batches( n_members: NodeCount, n_alive: NodeCount, n_batches: usize, - network_reliability: f64, + network_reliability: Option, ) { init_log(); let spawner = Spawner::new(); let mut exits = Vec::new(); let mut handles = Vec::new(); let mut batch_rxs = Vec::new(); - let (net_hub, networks) = Router::new(n_members, network_reliability); + let (mut net_hub, networks) = Router::new(n_members); + if let Some(reliability) = network_reliability { + net_hub.add_hook(UnreliableHook::new(reliability)); + } spawner.spawn("network-hub", net_hub); for (network, _) in networks { @@ -28,7 +31,7 @@ async fn honest_members_agree_on_batches( exit_tx, handle, .. - } = spawn_honest_member(spawner, ix, n_members, vec![], network); + } = spawn_honest_member(spawner, ix, n_members, vec![], DataProvider::new(), network); batch_rxs.push(finalization_rx); exits.push(exit_tx); handles.push(handle); @@ -59,35 +62,35 @@ async fn honest_members_agree_on_batches( #[tokio::test(flavor = "multi_thread")] #[serial] async fn small_honest_all_alive() { - honest_members_agree_on_batches(4.into(), 4.into(), 5, 1.0).await; + honest_members_agree_on_batches(4.into(), 4.into(), 5, None).await; } #[tokio::test(flavor = "multi_thread")] #[serial] async fn small_honest_one_crash() { - honest_members_agree_on_batches(4.into(), 3.into(), 5, 1.0).await; + honest_members_agree_on_batches(4.into(), 3.into(), 5, None).await; } #[tokio::test(flavor = "multi_thread")] #[serial] async fn small_honest_one_crash_unreliable_network() { - honest_members_agree_on_batches(4.into(), 3.into(), 5, 0.9).await; + honest_members_agree_on_batches(4.into(), 3.into(), 5, Some(0.9)).await; } #[tokio::test(flavor = "multi_thread")] #[serial] async fn medium_honest_all_alive() { - honest_members_agree_on_batches(31.into(), 31.into(), 5, 1.0).await; + honest_members_agree_on_batches(31.into(), 31.into(), 5, None).await; } #[tokio::test(flavor = "multi_thread")] #[serial] async fn medium_honest_ten_crashes() { - honest_members_agree_on_batches(31.into(), 21.into(), 5, 1.0).await; + honest_members_agree_on_batches(31.into(), 21.into(), 5, None).await; } #[tokio::test(flavor = "multi_thread")] #[serial] async fn medium_honest_ten_crashes_unreliable_network() { - honest_members_agree_on_batches(31.into(), 21.into(), 5, 0.9).await; + honest_members_agree_on_batches(31.into(), 21.into(), 5, Some(0.9)).await; } diff --git a/consensus/src/testing/crash_recovery.rs b/consensus/src/testing/crash_recovery.rs index 740dae04..ba21d4a7 100644 --- a/consensus/src/testing/crash_recovery.rs +++ b/consensus/src/testing/crash_recovery.rs @@ -3,7 +3,7 @@ use crate::{ units::{UncheckedSignedUnit, Unit, UnitCoord}, NodeCount, NodeIndex, SpawnHandle, TaskHandle, }; -use aleph_bft_mock::{Data, Hasher64, Router, Signature, Spawner}; +use aleph_bft_mock::{Data, DataProvider, Hasher64, Router, Signature, Spawner}; use codec::Decode; use futures::{ channel::{mpsc, oneshot}, @@ -67,7 +67,14 @@ fn connect_nodes( saved_state, exit_tx, handle, - } = spawn_honest_member(*spawner, ix, n_members, vec![], network); + } = spawn_honest_member( + *spawner, + ix, + n_members, + vec![], + DataProvider::new(), + network, + ); ( ix, NodeData { @@ -109,7 +116,14 @@ async fn reconnect_nodes( saved_state, exit_tx, handle, - } = spawn_honest_member(*spawner, *node_id, n_members, saved_units.clone(), network); + } = spawn_honest_member( + *spawner, + *node_id, + n_members, + saved_units.clone(), + DataProvider::new(), + network, + ); reconnected_nodes.push(( *node_id, NodeData { @@ -166,7 +180,7 @@ async fn crashed_nodes_recover(n_members: NodeCount, n_batches: usize) { let n_kill = (n_members - n_members.consensus_threshold()) + 1.into(); let spawner = Spawner::new(); - let (net_hub, networks) = Router::new(n_members, 1.0); + let (net_hub, networks) = Router::new(n_members); spawner.spawn("network-hub", net_hub); let mut node_data = connect_nodes(&spawner, n_members, networks); @@ -239,7 +253,7 @@ async fn saves_units_properly() { let n_batches = 2; let n_members = NodeCount(4); let spawner = Spawner::new(); - let (net_hub, networks) = Router::new(n_members, 1.0); + let (net_hub, networks) = Router::new(n_members); spawner.spawn("network-hub", net_hub); let mut node_data = connect_nodes(&spawner, n_members, networks); diff --git a/consensus/src/testing/mod.rs b/consensus/src/testing/mod.rs index 64cd8c04..14f17106 100644 --- a/consensus/src/testing/mod.rs +++ b/consensus/src/testing/mod.rs @@ -1,4 +1,5 @@ mod alerts; +mod behind; mod byzantine; mod crash; mod crash_recovery; @@ -67,9 +68,9 @@ pub fn spawn_honest_member( node_index: NodeIndex, n_members: NodeCount, units: Vec, + data_provider: DataProvider, network: impl 'static + NetworkT, ) -> HonestMember { - let data_provider = DataProvider::new(); let (finalization_handler, finalization_rx) = FinalizationHandler::new(); let config = gen_config(node_index, n_members, gen_delay_config()); let (exit_tx, exit_rx) = oneshot::channel(); diff --git a/consensus/src/testing/unreliable.rs b/consensus/src/testing/unreliable.rs index 7935f10d..431e514e 100644 --- a/consensus/src/testing/unreliable.rs +++ b/consensus/src/testing/unreliable.rs @@ -5,7 +5,7 @@ use crate::{ units::Unit, Index, NodeCount, NodeIndex, Round, Signed, SpawnHandle, }; -use aleph_bft_mock::{BadSigning, Keychain, NetworkHook, Router, Spawner}; +use aleph_bft_mock::{BadSigning, DataProvider, Keychain, NetworkHook, Router, Spawner}; use futures::StreamExt; use parking_lot::Mutex; use std::sync::Arc; @@ -18,11 +18,16 @@ struct CorruptPacket { } impl NetworkHook for CorruptPacket { - fn update_state(&mut self, data: &mut NetworkData, sender: NodeIndex, recipient: NodeIndex) { + fn process_message( + &mut self, + mut data: NetworkData, + sender: NodeIndex, + recipient: NodeIndex, + ) -> Vec<(NetworkData, NodeIndex, NodeIndex)> { if self.recipient != recipient || self.sender != sender { - return; + return vec![(data, sender, recipient)]; } - if let crate::NetworkData(NetworkDataInner::Units(UnitMessage::NewUnit(us))) = data { + if let crate::NetworkData(NetworkDataInner::Units(UnitMessage::NewUnit(us))) = &mut data { let full_unit = us.clone().into_signable(); let index = full_unit.index(); if full_unit.round() == self.round && full_unit.creator() == self.creator { @@ -30,6 +35,7 @@ impl NetworkHook for CorruptPacket { *us = Signed::sign(full_unit, &bad_keychain).into(); } } + vec![(data, sender, recipient)] } } @@ -41,16 +47,22 @@ struct NoteRequest { } impl NetworkHook for NoteRequest { - fn update_state(&mut self, data: &mut NetworkData, sender: NodeIndex, _: NodeIndex) { + fn process_message( + &mut self, + data: NetworkData, + sender: NodeIndex, + recipient: NodeIndex, + ) -> Vec<(NetworkData, NodeIndex, NodeIndex)> { use NetworkDataInner::Units; use UnitMessage::RequestCoord; if sender == self.sender { - if let crate::NetworkData(Units(RequestCoord(_, co))) = data { + if let crate::NetworkData(Units(RequestCoord(_, co))) = &data { if co.round() == self.round && co.creator() == self.creator { *self.requested.lock() = true; } } } + vec![(data, sender, recipient)] } } @@ -63,7 +75,7 @@ async fn request_missing_coord() { let censoring_node = NodeIndex(1); let censoring_round = 5; - let (mut net_hub, networks) = Router::new(n_members, 1.0); + let (mut net_hub, networks) = Router::new(n_members); net_hub.add_hook(CorruptPacket { recipient: censored_node, sender: censoring_node, @@ -90,7 +102,7 @@ async fn request_missing_coord() { exit_tx, handle, .. - } = spawn_honest_member(spawner, ix, n_members, vec![], network); + } = spawn_honest_member(spawner, ix, n_members, vec![], DataProvider::new(), network); batch_rxs.push(finalization_rx); exits.push(exit_tx); handles.push(handle); diff --git a/mock/Cargo.toml b/mock/Cargo.toml index d24f0812..574fa9ab 100644 --- a/mock/Cargo.toml +++ b/mock/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft-mock" -version = "0.15.0" +version = "0.16.0" edition = "2021" authors = ["Cardinal Cryptography"] documentation = "https://docs.rs/?" diff --git a/mock/src/dataio.rs b/mock/src/dataio.rs index b06f0354..d3791fe4 100644 --- a/mock/src/dataio.rs +++ b/mock/src/dataio.rs @@ -36,6 +36,12 @@ impl DataProvider { n_data: Some(n_data), } } + pub fn new_range(start: usize, end: usize) -> Self { + Self { + counter: start, + n_data: Some(end), + } + } } #[async_trait] @@ -43,13 +49,14 @@ impl DataProviderT for DataProvider { type Output = Data; async fn get_data(&mut self) -> Option { + let result = self.counter as u32; self.counter += 1; if let Some(n_data) = self.n_data { if n_data < self.counter { return None; } } - Some(self.counter as u32) + Some(result) } } diff --git a/mock/src/lib.rs b/mock/src/lib.rs index 2406b39a..8a6e767a 100644 --- a/mock/src/lib.rs +++ b/mock/src/lib.rs @@ -11,5 +11,6 @@ pub use dataio::{Data, DataProvider, FinalizationHandler, Loader, Saver, Stalled pub use hasher::{Hash64, Hasher64}; pub use network::{ Network, NetworkHook, NetworkReceiver, NetworkSender, Peer, ReconnectSender, Router, + UnreliableHook, }; pub use spawner::Spawner; diff --git a/mock/src/network.rs b/mock/src/network.rs index 22bd5f30..6f450736 100644 --- a/mock/src/network.rs +++ b/mock/src/network.rs @@ -80,7 +80,40 @@ pub struct Peer { } pub trait NetworkHook: Send { - fn update_state(&mut self, data: &mut D, sender: NodeIndex, recipient: NodeIndex); + fn process_message( + &mut self, + data: D, + sender: NodeIndex, + recipient: NodeIndex, + ) -> Vec<(D, NodeIndex, NodeIndex)>; +} + +pub struct UnreliableHook { + reliability: f64, +} + +impl UnreliableHook { + // reliability - a number in the range [0, 1], 1.0 means perfect reliability, 0.0 means no message gets through + pub fn new(reliability: f64) -> Self { + UnreliableHook { reliability } + } +} + +impl NetworkHook for UnreliableHook { + fn process_message( + &mut self, + data: D, + sender: NodeIndex, + recipient: NodeIndex, + ) -> Vec<(D, NodeIndex, NodeIndex)> { + let rand_sample = rand::random::(); + if rand_sample > self.reliability { + debug!("Simulated network fail."); + Vec::new() + } else { + vec![(data, sender, recipient)] + } + } } type ReconnectReceiver = UnboundedReceiver<(NodeIndex, oneshot::Sender>)>; @@ -91,7 +124,6 @@ pub struct Router { peer_list: Vec, hook_list: RefCell>>>, peer_reconnect_rx: ReconnectReceiver, - reliability: f64, } impl Debug for Router { @@ -99,7 +131,6 @@ impl Debug for Router { f.debug_struct("Router") .field("peers", &self.peer_list) .field("hook count", &self.hook_list.borrow().len()) - .field("reliability", &self.reliability) .finish() } } @@ -107,8 +138,7 @@ impl Debug for Router { type RouterWithNetworks = (Router, Vec<(Network, ReconnectSender)>); impl Router { - // reliability - a number in the range [0, 1], 1.0 means perfect reliability, 0.0 means no message gets through - pub fn new(n_members: NodeCount, reliability: f64) -> RouterWithNetworks { + pub fn new(n_members: NodeCount) -> RouterWithNetworks { let peer_list = n_members.into_iterator().collect(); let (reconnect_tx, peer_reconnect_rx) = unbounded(); let mut router = Router { @@ -116,7 +146,6 @@ impl Router { peer_list, hook_list: RefCell::new(Vec::new()), peer_reconnect_rx, - reliability, }; let mut networks = Vec::new(); for ix in n_members.into_iterator() { @@ -152,10 +181,6 @@ impl Router { pub fn peer_list(&self) -> Vec { self.peer_list.clone() } - - pub fn reliability(&self) -> f64 { - self.reliability - } } impl Future for Router { @@ -168,8 +193,8 @@ impl Future for Router { loop { // this call is responsible for waking this Future match peer.rx.poll_next_unpin(cx) { - Poll::Ready(Some(msg)) => { - buffer.push((*peer_id, msg)); + Poll::Ready(Some((data, recipient))) => { + buffer.push((data, *peer_id, recipient)); } Poll::Ready(None) => { disconnected_peers.push(*peer_id); @@ -200,17 +225,16 @@ impl Future for Router { } } } - for (sender, (mut data, recipient)) in buffer { - let rand_sample = rand::random::(); - if rand_sample > this.reliability { - debug!("Simulated network fail."); - continue; + let mut new_buffer = Vec::new(); + for hook in this.hook_list.borrow_mut().iter_mut() { + for (data, sender, recipient) in buffer { + new_buffer.append(&mut hook.process_message(data, sender, recipient)); } - + buffer = new_buffer; + new_buffer = Vec::new(); + } + for (data, sender, recipient) in buffer { if let Some(peer) = this.peers.borrow().get(&recipient) { - for hook in this.hook_list.borrow_mut().iter_mut() { - hook.update_state(&mut data, sender, recipient) - } peer.tx.unbounded_send((data, sender)).ok(); } }