Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.42.0"
version = "0.42.1"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
Expand Down
129 changes: 129 additions & 0 deletions consensus/src/testing/behind.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use std::{
collections::{HashSet, VecDeque},
time::{Duration, Instant},
};

use crate::{
testing::{init_log, spawn_honest_member, HonestMember, NetworkData},
NodeCount, NodeIndex, SpawnHandle,
};
use aleph_bft_mock::{DataProvider, NetworkHook, Router, Spawner};
use futures::StreamExt;
use log::info;

struct Latency {
who: NodeIndex,
buffer: VecDeque<(Instant, (NetworkData, NodeIndex, NodeIndex))>,
}

const LATENCY: Duration = Duration::from_millis(300);

impl Latency {
pub fn new(who: NodeIndex) -> Self {
Latency {
who,
buffer: VecDeque::new(),
}
}

fn add_message(
&mut self,
data: NetworkData,
sender: NodeIndex,
recipient: NodeIndex,
) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
match sender == self.who || recipient == self.who {
true => {
self.buffer
.push_back((Instant::now(), (data, sender, recipient)));
Vec::new()
}
false => vec![(data, sender, recipient)],
}
}

fn messages_to_send(&mut self) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
let mut result = Vec::new();
while !self.buffer.is_empty() {
let (when, msg) = self
.buffer
.pop_front()
.expect("just checked it is not empty");
if Instant::now().duration_since(when) < LATENCY {
self.buffer.push_front((when, msg));
break;
}
result.push(msg);
}
result
}
}

impl NetworkHook<NetworkData> for Latency {
fn process_message(
&mut self,
data: NetworkData,
sender: NodeIndex,
recipient: NodeIndex,
) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
let mut result = self.add_message(data, sender, recipient);
result.append(&mut self.messages_to_send());
result
}
}

#[tokio::test(flavor = "multi_thread")]
async fn delayed_finalized() {
let n_members = NodeCount(7);
let australian = NodeIndex(0);
init_log();
let spawner = Spawner::new();
let mut batch_rxs = Vec::new();
let mut exits = Vec::new();
let mut handles = Vec::new();
let (mut net_hub, networks) = Router::new(n_members);

net_hub.add_hook(Latency::new(australian));

spawner.spawn("network-hub", net_hub);

for (network, _) in networks {
let ix = network.index();
let HonestMember {
finalization_rx,
exit_tx,
handle,
..
} = spawn_honest_member(
spawner,
ix,
n_members,
vec![],
DataProvider::new_range(ix.0 * 50, (ix.0 + 1) * 50),
network,
);
batch_rxs.push(finalization_rx);
exits.push(exit_tx);
handles.push(handle);
}
let to_finalize: HashSet<u32> = (0..((n_members.0) * 50))
.map(|number| number as u32)
.collect();

for mut rx in batch_rxs.drain(..) {
let mut to_finalize_local = to_finalize.clone();
while !to_finalize_local.is_empty() {
let number = rx.next().await.unwrap();
info!("finalizing {}", number);
assert!(to_finalize_local.remove(&number));
}
info!("finished one node");
}

for exit in exits {
let _ = exit.send(());
}
for handle in handles {
let _ = handle.await;
}
}
23 changes: 15 additions & 8 deletions consensus/src/testing/byzantine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use crate::{
Hasher, Network as NetworkT, NetworkData as NetworkDataT, NodeCount, NodeIndex, NodeMap,
Recipient, Round, SessionId, Signed, SpawnHandle, TaskHandle,
};
use aleph_bft_mock::{Data, Hash64, Hasher64, Keychain, NetworkHook, Router, Spawner};
use aleph_bft_mock::{
Data, DataProvider, Hash64, Hasher64, Keychain, NetworkHook, Router, Spawner,
};
use futures::{channel::oneshot, StreamExt};
use log::{debug, error, trace};
use parking_lot::Mutex;
Expand Down Expand Up @@ -230,7 +232,12 @@ impl AlertHook {
}

impl NetworkHook<NetworkData> for AlertHook {
fn update_state(&mut self, data: &mut NetworkData, sender: NodeIndex, recipient: NodeIndex) {
fn process_message(
&mut self,
data: NetworkData,
sender: NodeIndex,
recipient: NodeIndex,
) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
use crate::{alerts::AlertMessage::*, network::NetworkDataInner::*};
if let crate::NetworkData(Alert(ForkAlert(_))) = data {
*self
Expand All @@ -239,21 +246,21 @@ impl NetworkHook<NetworkData> for AlertHook {
.entry((sender, recipient))
.or_insert(0) += 1;
}
vec![(data, sender, recipient)]
}
}

async fn honest_members_agree_on_batches_byzantine(
n_members: NodeCount,
n_honest: NodeCount,
n_batches: usize,
network_reliability: f64,
) {
init_log();
let spawner = Spawner::new();
let mut batch_rxs = Vec::new();
let mut exits = Vec::new();
let mut handles = Vec::new();
let (mut net_hub, networks) = Router::new(n_members, network_reliability);
let (mut net_hub, networks) = Router::new(n_members);

let alert_hook = AlertHook::new();
net_hub.add_hook(alert_hook.clone());
Expand All @@ -270,7 +277,7 @@ async fn honest_members_agree_on_batches_byzantine(
exit_tx,
handle,
..
} = spawn_honest_member(spawner, ix, n_members, vec![], network);
} = spawn_honest_member(spawner, ix, n_members, vec![], DataProvider::new(), network);
batch_rxs.push(finalization_rx);
(exit_tx, handle)
};
Expand Down Expand Up @@ -317,17 +324,17 @@ async fn honest_members_agree_on_batches_byzantine(
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn small_byzantine_one_forker() {
honest_members_agree_on_batches_byzantine(4.into(), 3.into(), 5, 1.0).await;
honest_members_agree_on_batches_byzantine(4.into(), 3.into(), 5).await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn small_byzantine_two_forkers() {
honest_members_agree_on_batches_byzantine(7.into(), 5.into(), 5, 1.0).await;
honest_members_agree_on_batches_byzantine(7.into(), 5.into(), 5).await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn medium_byzantine_ten_forkers() {
honest_members_agree_on_batches_byzantine(31.into(), 21.into(), 5, 1.0).await;
honest_members_agree_on_batches_byzantine(31.into(), 21.into(), 5).await;
}
23 changes: 13 additions & 10 deletions consensus/src/testing/crash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,25 @@ use crate::{
testing::{init_log, spawn_honest_member, HonestMember},
NodeCount, SpawnHandle,
};
use aleph_bft_mock::{Router, Spawner};
use aleph_bft_mock::{DataProvider, Router, Spawner, UnreliableHook};
use futures::StreamExt;
use serial_test::serial;

async fn honest_members_agree_on_batches(
n_members: NodeCount,
n_alive: NodeCount,
n_batches: usize,
network_reliability: f64,
network_reliability: Option<f64>,
) {
init_log();
let spawner = Spawner::new();
let mut exits = Vec::new();
let mut handles = Vec::new();
let mut batch_rxs = Vec::new();
let (net_hub, networks) = Router::new(n_members, network_reliability);
let (mut net_hub, networks) = Router::new(n_members);
if let Some(reliability) = network_reliability {
net_hub.add_hook(UnreliableHook::new(reliability));
}
spawner.spawn("network-hub", net_hub);

for (network, _) in networks {
Expand All @@ -28,7 +31,7 @@ async fn honest_members_agree_on_batches(
exit_tx,
handle,
..
} = spawn_honest_member(spawner, ix, n_members, vec![], network);
} = spawn_honest_member(spawner, ix, n_members, vec![], DataProvider::new(), network);
batch_rxs.push(finalization_rx);
exits.push(exit_tx);
handles.push(handle);
Expand Down Expand Up @@ -59,35 +62,35 @@ async fn honest_members_agree_on_batches(
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn small_honest_all_alive() {
honest_members_agree_on_batches(4.into(), 4.into(), 5, 1.0).await;
honest_members_agree_on_batches(4.into(), 4.into(), 5, None).await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn small_honest_one_crash() {
honest_members_agree_on_batches(4.into(), 3.into(), 5, 1.0).await;
honest_members_agree_on_batches(4.into(), 3.into(), 5, None).await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn small_honest_one_crash_unreliable_network() {
honest_members_agree_on_batches(4.into(), 3.into(), 5, 0.9).await;
honest_members_agree_on_batches(4.into(), 3.into(), 5, Some(0.9)).await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn medium_honest_all_alive() {
honest_members_agree_on_batches(31.into(), 31.into(), 5, 1.0).await;
honest_members_agree_on_batches(31.into(), 31.into(), 5, None).await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn medium_honest_ten_crashes() {
honest_members_agree_on_batches(31.into(), 21.into(), 5, 1.0).await;
honest_members_agree_on_batches(31.into(), 21.into(), 5, None).await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn medium_honest_ten_crashes_unreliable_network() {
honest_members_agree_on_batches(31.into(), 21.into(), 5, 0.9).await;
honest_members_agree_on_batches(31.into(), 21.into(), 5, Some(0.9)).await;
}
24 changes: 19 additions & 5 deletions consensus/src/testing/crash_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
units::{UncheckedSignedUnit, Unit, UnitCoord},
NodeCount, NodeIndex, SpawnHandle, TaskHandle,
};
use aleph_bft_mock::{Data, Hasher64, Router, Signature, Spawner};
use aleph_bft_mock::{Data, DataProvider, Hasher64, Router, Signature, Spawner};
use codec::Decode;
use futures::{
channel::{mpsc, oneshot},
Expand Down Expand Up @@ -67,7 +67,14 @@ fn connect_nodes(
saved_state,
exit_tx,
handle,
} = spawn_honest_member(*spawner, ix, n_members, vec![], network);
} = spawn_honest_member(
*spawner,
ix,
n_members,
vec![],
DataProvider::new(),
network,
);
(
ix,
NodeData {
Expand Down Expand Up @@ -109,7 +116,14 @@ async fn reconnect_nodes(
saved_state,
exit_tx,
handle,
} = spawn_honest_member(*spawner, *node_id, n_members, saved_units.clone(), network);
} = spawn_honest_member(
*spawner,
*node_id,
n_members,
saved_units.clone(),
DataProvider::new(),
network,
);
reconnected_nodes.push((
*node_id,
NodeData {
Expand Down Expand Up @@ -166,7 +180,7 @@ async fn crashed_nodes_recover(n_members: NodeCount, n_batches: usize) {

let n_kill = (n_members - n_members.consensus_threshold()) + 1.into();
let spawner = Spawner::new();
let (net_hub, networks) = Router::new(n_members, 1.0);
let (net_hub, networks) = Router::new(n_members);
spawner.spawn("network-hub", net_hub);

let mut node_data = connect_nodes(&spawner, n_members, networks);
Expand Down Expand Up @@ -239,7 +253,7 @@ async fn saves_units_properly() {
let n_batches = 2;
let n_members = NodeCount(4);
let spawner = Spawner::new();
let (net_hub, networks) = Router::new(n_members, 1.0);
let (net_hub, networks) = Router::new(n_members);
spawner.spawn("network-hub", net_hub);

let mut node_data = connect_nodes(&spawner, n_members, networks);
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/testing/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod alerts;
mod behind;
mod byzantine;
mod crash;
mod crash_recovery;
Expand Down Expand Up @@ -67,9 +68,9 @@ pub fn spawn_honest_member(
node_index: NodeIndex,
n_members: NodeCount,
units: Vec<u8>,
data_provider: DataProvider,
network: impl 'static + NetworkT<NetworkData>,
) -> HonestMember {
let data_provider = DataProvider::new();
let (finalization_handler, finalization_rx) = FinalizationHandler::new();
let config = gen_config(node_index, n_members, gen_delay_config());
let (exit_tx, exit_rx) = oneshot::channel();
Expand Down
Loading
Loading