From a9894d904a957a9394331b614bd9f7471bb75101 Mon Sep 17 00:00:00 2001 From: timorleph Date: Mon, 24 Mar 2025 14:17:44 +0100 Subject: [PATCH 1/2] Make initial unit collection more independent --- Cargo.lock | 2 +- consensus/Cargo.toml | 2 +- .../collection.rs => collection/mod.rs} | 215 +++++++++++------- consensus/src/dissemination/mod.rs | 42 +--- consensus/src/dissemination/responder.rs | 49 ++-- consensus/src/dissemination/task.rs | 4 +- consensus/src/lib.rs | 1 + consensus/src/member.rs | 68 ++---- consensus/src/network/unit.rs | 2 +- consensus/src/runway/mod.rs | 76 ++++--- 10 files changed, 239 insertions(+), 222 deletions(-) rename consensus/src/{runway/collection.rs => collection/mod.rs} (74%) diff --git a/Cargo.lock b/Cargo.lock index e69b163c..8d3d99d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ [[package]] name = "aleph-bft" -version = "0.43.0" +version = "0.43.1" dependencies = [ "aleph-bft-mock", "aleph-bft-rmc", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 6bb09f37..21b4afc4 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft" -version = "0.43.0" +version = "0.43.1" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "data-structures", "cryptography", "database"] diff --git a/consensus/src/runway/collection.rs b/consensus/src/collection/mod.rs similarity index 74% rename from consensus/src/runway/collection.rs rename to consensus/src/collection/mod.rs index 0b48b237..c9be16e6 100644 --- a/consensus/src/runway/collection.rs +++ b/consensus/src/collection/mod.rs @@ -1,8 +1,9 @@ use crate::{ - dissemination::DisseminationRequest, + config::DelaySchedule, + dissemination::{Addressed, DisseminationMessage}, units::{UncheckedSignedUnit, Unit, ValidationError, Validator}, - Data, Hasher, Keychain, NodeCount, NodeIndex, NodeMap, Receiver, Round, Sender, Signable, - Signature, SignatureError, UncheckedSigned, + Data, Hasher, Keychain, NodeCount, NodeIndex, NodeMap, Receiver, Recipient, Round, Sender, + Signable, Signature, SignatureError, UncheckedSigned, }; use codec::{Decode, Encode}; use futures::{channel::oneshot, FutureExt, StreamExt}; @@ -16,6 +17,8 @@ use std::{ time::Duration, }; +const LOG_TARGET: &str = "AlephBFT-collection"; + /// Salt uniquely identifying an initial unit collection instance. pub type Salt = u64; @@ -134,19 +137,16 @@ pub struct Collection<'a, MK: Keychain> { impl<'a, MK: Keychain> Collection<'a, MK> { /// Create a new collection instance ready to collect responses. /// The returned salt should be used to initiate newest unit requests. - pub fn new(keychain: &'a MK, validator: &'a Validator) -> (Self, Salt) { + pub fn new(keychain: &'a MK, validator: &'a Validator) -> Self { let salt = generate_salt(); let mut collected_starting_rounds = NodeMap::with_size(keychain.node_count()); collected_starting_rounds.insert(keychain.index(), 0); - ( - Collection { - keychain, - validator, - collected_starting_rounds, - salt, - }, + Collection { + keychain, + validator, + collected_starting_rounds, salt, - ) + } } fn index(&self) -> NodeIndex { @@ -178,7 +178,7 @@ impl<'a, MK: Keychain> Collection<'a, MK> { .get(response.responder) .unwrap_or(&round); if current_round != round { - debug!(target: "AlephBFT-runway", "Node {} responded with starting unit {}, but now says {}", response.responder.0, current_round, round); + debug!(target: LOG_TARGET, "Node {} responded with starting unit {}, but now says {}", response.responder.0, current_round, round); } self.collected_starting_rounds .insert(response.responder, max(current_round, round)); @@ -194,6 +194,25 @@ impl<'a, MK: Keychain> Collection<'a, MK> { self.collected_starting_rounds.size().consensus_threshold() } + fn missing_responders(&self) -> Vec { + self.collected_starting_rounds + .size() + .into_iterator() + .filter(|node_id| self.collected_starting_rounds.get(*node_id).is_none()) + .map(Recipient::Node) + .collect() + } + + /// Returns a request addressed to the appropriate nodes. + pub fn prepare_request( + &self, + ) -> Addressed> { + Addressed::new( + DisseminationMessage::NewestUnitRequest(self.index(), self.salt()), + self.missing_responders(), + ) + } + /// The current status of the collection. pub fn status(&self) -> Status { use Status::*; @@ -218,8 +237,9 @@ type ResponsesFromNetwork = UncheckedSigned< pub struct IO<'a, H: Hasher, D: Data, MK: Keychain> { round_for_creator: oneshot::Sender, responses_from_network: Receiver>, - resolved_requests: Sender>, + requests_for_network: Sender>>, collection: Collection<'a, MK>, + request_delay: DelaySchedule, } impl<'a, H: Hasher, D: Data, MK: Keychain> IO<'a, H, D, MK> { @@ -227,35 +247,37 @@ impl<'a, H: Hasher, D: Data, MK: Keychain> IO<'a, H, D, MK> { pub fn new( round_for_creator: oneshot::Sender, responses_from_network: Receiver>, - resolved_requests: Sender>, + requests_for_network: Sender>>, collection: Collection<'a, MK>, + request_delay: DelaySchedule, ) -> Self { IO { round_for_creator, responses_from_network, - resolved_requests, + requests_for_network, collection, + request_delay, } } fn finish(self, round: Round) { if self.round_for_creator.send(round).is_err() { - error!(target: "AlephBFT-runway", "unable to send starting round to creator"); + error!(target: LOG_TARGET, "unable to send starting round to creator"); } - if let Err(e) = self - .resolved_requests - .unbounded_send(DisseminationRequest::NewestUnit( - self.collection.index(), - self.collection.salt(), - )) - { - warn!(target: "AlephBFT-runway", "unable to send resolved request: {}", e); - } - info!(target: "AlephBFT-runway", "Finished initial unit collection with status: {:?}", self.collection.status()); + info!(target: LOG_TARGET, "Finished initial unit collection with status: {:?}", self.collection.status()); } fn status_report(&self) { - info!(target: "AlephBFT-runway", "Initial unit collection status report: status - {:?}, collected starting rounds - {}", self.collection.status(), self.collection.collected_starting_rounds); + info!(target: LOG_TARGET, "Initial unit collection status report: status - {:?}, collected starting rounds - {}", self.collection.status(), self.collection.collected_starting_rounds); + } + + fn send_request(&self) { + if let Err(e) = self + .requests_for_network + .unbounded_send(self.collection.prepare_request()) + { + warn!(target: LOG_TARGET, "unable to send request: {}", e); + } } /// Run the initial unit collection until it sends the initial round. @@ -267,14 +289,18 @@ impl<'a, H: Hasher, D: Data, MK: Keychain> IO<'a, H, D, MK> { let status_ticker_delay = Duration::from_secs(10); let mut status_ticker = Delay::new(status_ticker_delay).fuse(); + let mut request_counter = 0; + let mut request_ticker = Delay::new((self.request_delay)(request_counter)).fuse(); + self.send_request(); + loop { futures::select! { response = self.responses_from_network.next() => { let response = match response { Some(response) => response, None => { - warn!(target: "AlephBFT-runway", "Response channel closed."); - info!(target: "AlephBFT-runway", "Finished initial unit collection with status: {:?}", self.collection.status()); + warn!(target: LOG_TARGET, "Response channel closed."); + info!(target: LOG_TARGET, "Finished initial unit collection with status: {:?}", self.collection.status()); return; } }; @@ -288,13 +314,13 @@ impl<'a, H: Hasher, D: Data, MK: Keychain> IO<'a, H, D, MK> { self.finish(round); return; }, - Err(e) => warn!(target: "AlephBFT-runway", "Received wrong newest unit response: {}", e), + Err(e) => warn!(target: LOG_TARGET, "Received wrong newest unit response: {}", e), } }, _ = catch_up_delay => match self.collection.status() { Pending => { delay_passed = true; - debug!(target: "AlephBFT-runway", "Catch up delay passed."); + debug!(target: LOG_TARGET, "Catch up delay passed."); self.status_report(); }, Ready(round) | Finished(round) => { @@ -302,6 +328,11 @@ impl<'a, H: Hasher, D: Data, MK: Keychain> IO<'a, H, D, MK> { return; }, }, + _ = &mut request_ticker => { + request_counter += 1; + request_ticker = Delay::new((self.request_delay)(request_counter)).fuse(); + self.send_request(); + }, _ = &mut status_ticker => { self.status_report(); status_ticker = Delay::new(status_ticker_delay).fuse(); @@ -315,10 +346,11 @@ impl<'a, H: Hasher, D: Data, MK: Keychain> IO<'a, H, D, MK> { mod tests { use super::{ Collection as GenericCollection, Error, NewestUnitResponse as GenericNewestUnitResponse, - Salt, Status::*, + Status::*, }; use crate::{ creation::Creator as GenericCreator, + dissemination::DisseminationMessage, units::{ FullUnit as GenericFullUnit, PreUnit as GenericPreUnit, UncheckedSignedUnit as GenericUncheckedSignedUnit, Validator as GenericValidator, @@ -347,9 +379,12 @@ mod tests { fn create_responses<'a, R: Iterator)>>( presponses: R, - salt: Salt, - requester: NodeIndex, + request: DisseminationMessage, ) -> Vec { + let (requester, salt) = match request { + DisseminationMessage::NewestUnitRequest(requester, salt) => (requester, salt), + _ => panic!("Cannot create newest unit response for a non-request."), + }; let mut result = Vec::new(); for (keychain, maybe_unit) in presponses { let response = NewestUnitResponse::new(requester, keychain.index(), maybe_unit, salt); @@ -376,67 +411,78 @@ mod tests { let max_round = 2; let keychain = Keychain::new(n_members, creator_id); let validator = Validator::new(session_id, keychain, max_round); - let (collection, _) = Collection::new(&keychain, &validator); + let collection = Collection::new(&keychain, &validator); assert_eq!(collection.status(), Pending); + assert_eq!( + collection + .prepare_request::() + .recipients() + .len(), + n_members.0 - 1 + ); } #[test] fn pending_with_too_few_messages() { let n_members = NodeCount(7); - let creator_id = NodeIndex(0); let session_id = 0; let max_round = 2; let keychains = keychain_set(n_members); let keychain = &keychains[0]; let validator = Validator::new(session_id, *keychain, max_round); - let (mut collection, salt) = Collection::new(keychain, &validator); - let responses = create_responses( - keychains.iter().skip(1).take(3).zip(repeat(None)), - salt, - creator_id, - ); + let mut collection = Collection::new(keychain, &validator); + let request = collection.prepare_request().message().clone(); + let responses = + create_responses(keychains.iter().skip(1).take(3).zip(repeat(None)), request); for response in responses { assert_eq!(collection.on_newest_response(response), Ok(Pending)); } assert_eq!(collection.status(), Pending); + assert_eq!( + collection + .prepare_request::() + .recipients() + .len(), + 3 + ); } #[test] fn pending_with_repeated_messages() { let n_members = NodeCount(7); - let creator_id = NodeIndex(0); let session_id = 0; let max_round = 2; let keychains = keychain_set(n_members); let keychain = &keychains[0]; let validator = Validator::new(session_id, *keychain, max_round); - let (mut collection, salt) = Collection::new(keychain, &validator); - let responses = create_responses( - repeat(&keychains[1]).take(43).zip(repeat(None)), - salt, - creator_id, - ); + let mut collection = Collection::new(keychain, &validator); + let request = collection.prepare_request().message().clone(); + let responses = create_responses(repeat(&keychains[1]).take(43).zip(repeat(None)), request); for response in responses { assert_eq!(collection.on_newest_response(response), Ok(Pending)); } assert_eq!(collection.status(), Pending); + assert_eq!( + collection + .prepare_request::() + .recipients() + .len(), + 5 + ); } #[test] fn ready_with_just_enough_messages() { let n_members = NodeCount(7); - let creator_id = NodeIndex(0); let session_id = 0; let max_round = 2; let keychains = keychain_set(n_members); let keychain = &keychains[0]; let validator = Validator::new(session_id, *keychain, max_round); - let (mut collection, salt) = Collection::new(keychain, &validator); - let responses = create_responses( - keychains.iter().skip(1).take(4).zip(repeat(None)), - salt, - creator_id, - ); + let mut collection = Collection::new(keychain, &validator); + let request = collection.prepare_request().message().clone(); + let responses = + create_responses(keychains.iter().skip(1).take(4).zip(repeat(None)), request); for response in responses.iter().take(3) { assert_eq!(collection.on_newest_response(response.clone()), Ok(Pending)); } @@ -445,6 +491,13 @@ mod tests { Ok(Ready(0)) ); assert_eq!(collection.status(), Ready(0)); + assert_eq!( + collection + .prepare_request::() + .recipients() + .len(), + 2 + ); } #[test] @@ -457,7 +510,8 @@ mod tests { let keychain = &keychains[0]; let creator = Creator::new(creator_id, n_members); let validator = Validator::new(session_id, *keychain, max_round); - let (mut collection, salt) = Collection::new(keychain, &validator); + let mut collection = Collection::new(keychain, &validator); + let request = collection.prepare_request().message().clone(); let preunit = creator.create_unit(0).expect("Creation should succeed."); let unit = preunit_to_unchecked_signed_unit(preunit, session_id, keychain); let responses = create_responses( @@ -465,8 +519,7 @@ mod tests { .iter() .skip(1) .zip(repeat(None).take(5).chain(once(Some(unit)))), - salt, - creator_id, + request, ); for response in responses.iter().take(3) { assert_eq!(collection.on_newest_response(response.clone()), Ok(Pending)); @@ -487,24 +540,31 @@ mod tests { #[test] fn detects_salt_mismatch() { let n_members = NodeCount(7); - let creator_id = NodeIndex(0); let session_id = 0; let max_round = 2; let keychains = keychain_set(n_members); let keychain = &keychains[0]; let validator = Validator::new(session_id, *keychain, max_round); - let (mut collection, salt) = Collection::new(keychain, &validator); - let other_salt = salt + 1; + let mut collection = Collection::new(keychain, &validator); + let request = collection + .prepare_request::() + .message() + .clone(); + let wrong_salt_request = match request { + DisseminationMessage::NewestUnitRequest(requester, salt) => { + DisseminationMessage::NewestUnitRequest(requester, salt + 1) + } + _ => unreachable!("Just created the above variant."), + }; let responses = create_responses( keychains.iter().skip(1).zip(repeat(None)), - other_salt, - creator_id, + wrong_salt_request, ); for response in responses { - assert_eq!( + assert!(matches!( collection.on_newest_response(response), - Err(Error::SaltMismatch(salt, other_salt)) - ); + Err(Error::SaltMismatch(_, _)) + )); } assert_eq!(collection.status(), Pending); } @@ -520,14 +580,11 @@ mod tests { let keychain = &keychains[0]; let creator = Creator::new(creator_id, n_members); let validator = Validator::new(session_id, *keychain, max_round); - let (mut collection, salt) = Collection::new(keychain, &validator); + let mut collection = Collection::new(keychain, &validator); + let request = collection.prepare_request().message().clone(); let preunit = creator.create_unit(0).expect("Creation should succeed."); let unit = preunit_to_unchecked_signed_unit(preunit, wrong_session_id, keychain); - let responses = create_responses( - keychains.iter().skip(1).zip(repeat(Some(unit))), - salt, - creator_id, - ); + let responses = create_responses(keychains.iter().skip(1).zip(repeat(Some(unit))), request); for response in responses { match collection.on_newest_response(response) { Err(Error::InvalidUnit(_)) => (), @@ -540,7 +597,6 @@ mod tests { #[test] fn detects_foreign_unit() { let n_members = NodeCount(7); - let creator_id = NodeIndex(0); let other_creator_id = NodeIndex(1); let session_id = 0; let max_round = 2; @@ -548,14 +604,11 @@ mod tests { let keychain = &keychains[0]; let creator = Creator::new(other_creator_id, n_members); let validator = Validator::new(session_id, *keychain, max_round); - let (mut collection, salt) = Collection::new(keychain, &validator); + let mut collection = Collection::new(keychain, &validator); + let request = collection.prepare_request().message().clone(); let preunit = creator.create_unit(0).expect("Creation should succeed."); let unit = preunit_to_unchecked_signed_unit(preunit, session_id, &keychains[1]); - let responses = create_responses( - keychains.iter().skip(1).zip(repeat(Some(unit))), - salt, - creator_id, - ); + let responses = create_responses(keychains.iter().skip(1).zip(repeat(Some(unit))), request); for response in responses { assert_eq!( collection.on_newest_response(response), diff --git a/consensus/src/dissemination/mod.rs b/consensus/src/dissemination/mod.rs index 8bc3a4a4..467aaa83 100644 --- a/consensus/src/dissemination/mod.rs +++ b/consensus/src/dissemination/mod.rs @@ -1,7 +1,7 @@ use crate::{ + collection::{NewestUnitResponse, Salt}, dag::Request as ReconstructionRequest, network::UnitMessage, - runway::{NewestUnitResponse, Salt}, units::UncheckedSignedUnit, Data, Hasher, NodeIndex, Recipient, Signature, UncheckedSigned, }; @@ -57,21 +57,6 @@ impl Addressed { } } -/// Possible requests for information from other nodes. -#[derive(Eq, PartialEq, Debug, Clone)] -pub enum DisseminationRequest { - /// A request for unit information in normal operation. - Unit(ReconstructionRequest), - /// Request for what the specified node thinks about our newest unit. - NewestUnit(NodeIndex, Salt), -} - -impl From> for DisseminationRequest { - fn from(request: ReconstructionRequest) -> Self { - DisseminationRequest::Unit(request) - } -} - /// Responses to requests. #[derive(Eq, PartialEq, Debug, Clone)] pub enum DisseminationResponse { @@ -89,9 +74,11 @@ pub enum DisseminationMessage { /// Unit, either broadcast or in response to a coord request. Unit(UncheckedSignedUnit), /// Request coming from the specified node for something. - Request(NodeIndex, DisseminationRequest), + Request(NodeIndex, ReconstructionRequest), /// Response to a parent request. ParentsResponse(H::Hash, Vec>), + /// Initial unit collection request. + NewestUnitRequest(NodeIndex, Salt), /// Response to initial unit collection. NewestUnitResponse(UncheckedSigned, S>), } @@ -104,15 +91,13 @@ impl From> match message { UnitMessage::Unit(u) => Unit(u), UnitMessage::CoordRequest(node_id, coord) => { - Request(node_id, ReconstructionRequest::Coord(coord).into()) + Request(node_id, ReconstructionRequest::Coord(coord)) } UnitMessage::ParentsRequest(node_id, hash) => { - Request(node_id, ReconstructionRequest::ParentsOf(hash).into()) + Request(node_id, ReconstructionRequest::ParentsOf(hash)) } UnitMessage::ParentsResponse(h, units) => ParentsResponse(h, units), - UnitMessage::NewestRequest(node_id, salt) => { - Request(node_id, DisseminationRequest::NewestUnit(node_id, salt)) - } + UnitMessage::NewestRequest(node_id, salt) => NewestUnitRequest(node_id, salt), UnitMessage::NewestResponse(response) => NewestUnitResponse(response), } } @@ -125,17 +110,14 @@ impl From> use DisseminationMessage::*; match message { Unit(u) => UnitMessage::Unit(u), - Request(node_id, DisseminationRequest::Unit(ReconstructionRequest::Coord(coord))) => { + Request(node_id, ReconstructionRequest::Coord(coord)) => { UnitMessage::CoordRequest(node_id, coord) } - Request( - node_id, - DisseminationRequest::Unit(ReconstructionRequest::ParentsOf(hash)), - ) => UnitMessage::ParentsRequest(node_id, hash), - ParentsResponse(h, units) => UnitMessage::ParentsResponse(h, units), - Request(node_id, DisseminationRequest::NewestUnit(_, salt)) => { - UnitMessage::NewestRequest(node_id, salt) + Request(node_id, ReconstructionRequest::ParentsOf(hash)) => { + UnitMessage::ParentsRequest(node_id, hash) } + ParentsResponse(h, units) => UnitMessage::ParentsResponse(h, units), + NewestUnitRequest(node_id, salt) => UnitMessage::NewestRequest(node_id, salt), NewestUnitResponse(response) => UnitMessage::NewestResponse(response), } } diff --git a/consensus/src/dissemination/responder.rs b/consensus/src/dissemination/responder.rs index 9f36ab93..419b66fb 100644 --- a/consensus/src/dissemination/responder.rs +++ b/consensus/src/dissemination/responder.rs @@ -1,7 +1,7 @@ use crate::{ + collection::{NewestUnitResponse, Salt}, dag::{DagUnit, Request}, - dissemination::{DisseminationRequest, DisseminationResponse}, - runway::{NewestUnitResponse, Salt}, + dissemination::DisseminationResponse, units::{UnitCoord, UnitStore, UnitWithParents, WrappedUnit}, Data, Hasher, MultiKeychain, NodeIndex, Signed, }; @@ -91,18 +91,25 @@ impl Responder { /// aren't able to help. pub fn handle_request( &self, - request: DisseminationRequest, + request: Request, units: &UnitStore>, ) -> Result, Error> { - use DisseminationRequest::*; + use Request::*; match request { - Unit(unit_request) => match unit_request { - Request::Coord(coord) => self.on_request_coord(coord, units), - Request::ParentsOf(hash) => self.on_request_parents(hash, units), - }, - NewestUnit(node_id, salt) => Ok(self.on_request_newest(node_id, salt, units)), + Coord(coord) => self.on_request_coord(coord, units), + ParentsOf(hash) => self.on_request_parents(hash, units), } } + + /// Handle an incoming request for the newest unit of a given node we know of. + pub fn handle_newest_unit_request( + &self, + requester: NodeIndex, + salt: Salt, + units: &UnitStore>, + ) -> DisseminationResponse { + self.on_request_newest(requester, salt, units) + } } #[cfg(test)] @@ -111,7 +118,7 @@ mod test { dag::Request, dissemination::{ responder::{Error, Responder}, - DisseminationRequest, DisseminationResponse, + DisseminationResponse, }, units::{ random_full_parent_reconstrusted_units_up_to, TestingDagUnit, Unit, UnitCoord, @@ -142,7 +149,7 @@ mod test { fn empty_fails_to_respond_to_coords() { let (responder, store, _) = setup(); let coord = UnitCoord::new(0, NodeIndex(1)); - let request = Request::Coord(coord).into(); + let request = Request::Coord(coord); match responder.handle_request(request, &store) { Ok(response) => panic!("Unexpected response: {:?}.", response), Err(err) => assert_eq!(err, Error::NoCanonicalAt(coord)), @@ -160,7 +167,7 @@ mod test { .last() .expect("the round has at least one unit") .hash(); - let request = Request::ParentsOf(hash).into(); + let request = Request::ParentsOf(hash); match responder.handle_request(request, &store) { Ok(response) => panic!("Unexpected response: {:?}.", response), Err(err) => assert_eq!(err, Error::UnknownUnit(hash)), @@ -171,10 +178,7 @@ mod test { fn empty_newest_responds_with_no_units() { let (responder, store, keychains) = setup(); let requester = NodeIndex(1); - let request = DisseminationRequest::NewestUnit(requester, rand::random()); - let response = responder - .handle_request(request, &store) - .expect("newest unit requests always get a response"); + let response = responder.handle_newest_unit_request(requester, rand::random(), &store); match response { DisseminationResponse::NewestUnit(newest_unit_response) => { let checked_newest_unit_response = newest_unit_response @@ -205,7 +209,7 @@ mod test { store.insert(unit.clone()); } } - let request = Request::Coord(coord).into(); + let request = Request::Coord(coord); let response = responder .handle_request(request, &store) .expect("should successfully respond"); @@ -237,7 +241,7 @@ mod test { store.insert(unit.clone()); } } - let request = Request::Coord(coord).into(); + let request = Request::Coord(coord); match responder.handle_request(request, &store) { Ok(response) => panic!("Unexpected response: {:?}.", response), Err(err) => assert_eq!(err, Error::NoCanonicalAt(coord)), @@ -261,7 +265,7 @@ mod test { .last() .expect("the round has at least one unit") .clone(); - let request = Request::ParentsOf(requested_unit.hash()).into(); + let request = Request::ParentsOf(requested_unit.hash()); let response = responder .handle_request(request, &store) .expect("should successfully respond"); @@ -295,7 +299,7 @@ mod test { .last() .expect("the round has at least one unit") .hash(); - let request = Request::ParentsOf(hash).into(); + let request = Request::ParentsOf(hash); match responder.handle_request(request, &store) { Ok(response) => panic!("Unexpected response: {:?}.", response), Err(err) => assert_eq!(err, Error::UnknownUnit(hash)), @@ -314,10 +318,7 @@ mod test { } } let requester = NodeIndex(1); - let request = DisseminationRequest::NewestUnit(requester, rand::random()); - let response = responder - .handle_request(request, &store) - .expect("newest unit requests always get a response"); + let response = responder.handle_newest_unit_request(requester, rand::random(), &store); match response { DisseminationResponse::NewestUnit(newest_unit_response) => { newest_unit_response diff --git a/consensus/src/dissemination/task.rs b/consensus/src/dissemination/task.rs index fb7e2f00..0afa5fe9 100644 --- a/consensus/src/dissemination/task.rs +++ b/consensus/src/dissemination/task.rs @@ -1,4 +1,4 @@ -use crate::{dag::Request, runway::Salt, units::UncheckedSignedUnit, Data, Hasher, Signature}; +use crate::{dag::Request, units::UncheckedSignedUnit, Data, Hasher, Signature}; /// Task that needs to be performed to ensure successful unit dissemination, either requesting or broadcasting a unit. #[derive(Eq, PartialEq, Debug, Clone)] @@ -8,6 +8,4 @@ pub enum DisseminationTask { /// Broadcast a unit. /// TODO(A0-4567): This should soon only contain the hash. Broadcast(UncheckedSignedUnit), - /// TODO(A0-4569): Only here temporarily, will soon be factored out. - RequestNewest(Salt), } diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 70b9ce23..6e42c7b6 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -3,6 +3,7 @@ //! gives appropriate access to the set of available data that we need to make consensus on. mod alerts; +mod collection; mod config; mod creation; mod dag; diff --git a/consensus/src/member.rs b/consensus/src/member.rs index 88e3ef8f..78003123 100644 --- a/consensus/src/member.rs +++ b/consensus/src/member.rs @@ -1,6 +1,6 @@ use crate::{ dag::Request as ReconstructionRequest, - dissemination::{Addressed, DisseminationMessage, DisseminationRequest, DisseminationTask}, + dissemination::{Addressed, DisseminationMessage, DisseminationTask}, handle_task_termination, network::{Hub as NetworkHub, NetworkData, UnitMessage}, runway::{self, NetworkIO, RunwayIO, RunwayNotificationOut}, @@ -161,14 +161,12 @@ where use DisseminationTask::*; let mut count_coord_request: usize = 0; let mut count_parents_request: usize = 0; - let mut count_request_newest: usize = 0; let mut count_rebroadcast: usize = 0; for task in self.task_queue.iter().map(|st| &st.task) { match task { Request(ReconstructionRequest::Coord(_)) => count_coord_request += 1, Request(ReconstructionRequest::ParentsOf(_)) => count_parents_request += 1, Broadcast(_) => count_rebroadcast += 1, - RequestNewest(_) => count_request_newest += 1, } } let long_time_pending_tasks: Vec<_> = self @@ -180,8 +178,8 @@ where write!(f, "task queue content: ")?; write!( f, - "CoordRequest - {}, ParentsRequest - {}, UnitBroadcast - {}, RequestNewest - {}", - count_coord_request, count_parents_request, count_rebroadcast, count_request_newest, + "CoordRequest - {}, ParentsRequest - {}, UnitBroadcast - {}", + count_coord_request, count_parents_request, count_rebroadcast, )?; if !self.not_resolved_coords.is_empty() { write!( @@ -228,13 +226,12 @@ where task_queue: TaskQueue>, not_resolved_parents: HashSet, not_resolved_coords: HashSet, - newest_unit_resolved: bool, peers: Vec, unit_messages_for_network: Sender<(UnitMessage, Recipient)>, unit_messages_from_network: Receiver>, notifications_for_runway: Sender>, notifications_from_runway: Receiver>, - resolved_requests: Receiver>, + resolved_requests: Receiver>, exiting: bool, top_units: NodeMap, } @@ -251,7 +248,7 @@ where unit_messages_from_network: Receiver>, notifications_for_runway: Sender>, notifications_from_runway: Receiver>, - resolved_requests: Receiver>, + resolved_requests: Receiver>, ) -> Self { let n_members = config.n_members(); let peers = (0..n_members.0) @@ -265,7 +262,6 @@ where task_queue: TaskQueue::new(), not_resolved_parents: HashSet::new(), not_resolved_coords: HashSet::new(), - newest_unit_resolved: false, peers, unit_messages_for_network, unit_messages_from_network, @@ -325,12 +321,6 @@ where self.trigger_tasks(); } - fn on_request_newest(&mut self, salt: u64) { - self.task_queue - .schedule_now(RepeatableTask::new(DisseminationTask::RequestNewest(salt))); - self.trigger_tasks(); - } - fn trigger_tasks(&mut self) { while let Some(mut task) = self.task_queue.pop_due_task() { match self.task_details(&task.task, task.counter) { @@ -396,14 +386,9 @@ where use DisseminationTask::*; match task { Request(request) => { - DisseminationMessage::Request(self.index(), (*request).clone().into()).into() + DisseminationMessage::Request(self.index(), (*request).clone()).into() } Broadcast(unit) => DisseminationMessage::Unit(unit.clone()).into(), - RequestNewest(salt) => DisseminationMessage::Request( - self.index(), - DisseminationRequest::NewestUnit(self.index(), *salt), - ) - .into(), } } @@ -421,7 +406,6 @@ where )) } Broadcast(_) => vec![Recipient::Everyone], - RequestNewest(_) => vec![Recipient::Everyone], } } @@ -434,7 +418,6 @@ where Request(ReconstructionRequest::ParentsOf(hash)) => { self.not_resolved_parents.contains(hash) } - RequestNewest(_) => !self.newest_unit_resolved, Broadcast(unit) => { Some(&unit.as_signable().round()) == self.top_units.get(unit.as_signable().creator()) @@ -464,24 +447,20 @@ where Request(ReconstructionRequest::ParentsOf(_)) => { (self.config.delay_config().parent_request_delay)(counter) } - RequestNewest(_) => (self.config.delay_config().newest_request_delay)(counter), } } fn on_unit_message_from_units(&mut self, message: RunwayNotificationOut) { + use ReconstructionRequest::*; + use RunwayNotificationOut::*; match message { - RunwayNotificationOut::NewSelfUnit(u) => self.on_create(u), - RunwayNotificationOut::NewAnyUnit(u) => self.on_unit_discovered(u), - RunwayNotificationOut::Request(request) => match request { - DisseminationRequest::Unit(ReconstructionRequest::Coord(coord)) => { - self.on_request_coord(coord) - } - DisseminationRequest::Unit(ReconstructionRequest::ParentsOf(u_hash)) => { - self.on_request_parents(u_hash) - } - DisseminationRequest::NewestUnit(_, salt) => self.on_request_newest(salt), + NewSelfUnit(u) => self.on_create(u), + NewAnyUnit(u) => self.on_unit_discovered(u), + Request(request) => match request { + Coord(coord) => self.on_request_coord(coord), + ParentsOf(u_hash) => self.on_request_parents(u_hash), }, - RunwayNotificationOut::Message(message) => self.send_unit_message(message.into()), + Message(message) => self.send_unit_message(message.into()), } } @@ -514,15 +493,12 @@ where event = self.resolved_requests.next() => match event { Some(request) => match request { - DisseminationRequest::Unit(ReconstructionRequest::Coord(coord)) => { + ReconstructionRequest::Coord(coord) => { self.not_resolved_coords.remove(&coord); }, - DisseminationRequest::Unit(ReconstructionRequest::ParentsOf(u_hash)) => { + ReconstructionRequest::ParentsOf(u_hash) => { self.not_resolved_parents.remove(&u_hash); }, - DisseminationRequest::NewestUnit(..) => { - self.newest_unit_resolved = true; - } }, None => { error!(target: "AlephBFT-member", "{:?} Resolved-requests stream from Runway closed.", self.index()); @@ -785,18 +761,6 @@ mod tests { assert_eq!(delay, Duration::from_millis(133)); } - #[test] - fn delay_for_newest_request() { - let mut delay_config = gen_delay_config(); - delay_config.newest_request_delay = Arc::new(|t| Duration::from_millis(123 + t as u64)); - - let member = mock_member(NodeIndex(7), NodeCount(20), delay_config); - - let delay = member.delay(&DisseminationTask::RequestNewest(12345), 10); - - assert_eq!(delay, Duration::from_millis(133)); - } - #[test] fn recipients_for_coord_request() { let node_ix = NodeIndex(7); diff --git a/consensus/src/network/unit.rs b/consensus/src/network/unit.rs index d226e693..594cbf2e 100644 --- a/consensus/src/network/unit.rs +++ b/consensus/src/network/unit.rs @@ -1,5 +1,5 @@ use crate::{ - runway::NewestUnitResponse, + collection::NewestUnitResponse, units::{UncheckedSignedUnit, UnitCoord}, Data, Hasher, NodeIndex, Signature, UncheckedSigned, }; diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index 355fdb06..6ae47575 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -1,8 +1,10 @@ use crate::{ alerts::{Alert, ForkingNotification, NetworkMessage}, + collection::{Collection, NewestUnitResponse, IO as CollectionIO}, + config::DelaySchedule, creation, dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest}, - dissemination::{Addressed, DisseminationMessage, DisseminationRequest, Responder}, + dissemination::{Addressed, DisseminationMessage, Responder}, extension::Ordering, handle_task_termination, units::{ @@ -29,19 +31,14 @@ use std::{ time::Duration, }; -mod collection; - use crate::backup::{BackupLoader, BackupSaver}; -#[cfg(feature = "initial_unit_collection")] -use collection::{Collection, IO as CollectionIO}; -pub use collection::{NewestUnitResponse, Salt}; pub(crate) enum RunwayNotificationOut { /// A new unit was generated by this runway NewSelfUnit(UncheckedSignedUnit), /// A new unit was generated by this runway or imported from outside and added to the DAG NewAnyUnit(UncheckedSignedUnit), - Request(DisseminationRequest), + Request(ReconstructionRequest), /// An addressed message that should just be send over the network. /// TODO(A0-4382): This is the only part of this type that should eventually exist. Message(Addressed>), @@ -52,6 +49,14 @@ type CollectionResponse = UncheckedSigned< ::Signature, >; +type AddressedDisseminationMessage = Addressed< + DisseminationMessage< + ::Hasher, + ::Data, + ::Signature, + >, +>; + struct Runway where FH: UnitFinalizationHandler, @@ -69,7 +74,8 @@ where unit_messages_from_network: Receiver>, unit_messages_for_network: Sender>, responses_for_collection: Sender>, - resolved_requests: Sender>, + messages_from_collection: Receiver>, + resolved_requests: Sender>, parents_for_creator: Sender>, backup_units_for_saver: Sender>, backup_units_from_saver: Receiver>, @@ -170,8 +176,9 @@ struct RunwayConfig { Receiver>, unit_messages_for_network: Sender>, responses_for_collection: Sender>, + messages_from_collection: Receiver>, parents_for_creator: Sender>, - resolved_requests: Sender>, + resolved_requests: Sender>, new_units_from_creation: Receiver>, } @@ -200,6 +207,7 @@ where unit_messages_from_network, unit_messages_for_network, responses_for_collection, + messages_from_collection, parents_for_creator, resolved_requests, new_units_from_creation, @@ -225,6 +233,7 @@ where backup_units_for_saver, backup_units_from_saver, responses_for_collection, + messages_from_collection, new_units_from_creation, exiting: false, } @@ -288,6 +297,14 @@ where trace!(target: "AlephBFT-runway", "{:?} Response parents received {:?}.", self.index(), u_hash); self.on_parents_response(u_hash, parents) } + NewestUnitRequest(node_id, salt) => { + let response = + self.responder + .handle_newest_unit_request(node_id, salt, &self.store); + self.send_message_for_network(RunwayNotificationOut::Message( + Addressed::addressed_to(response, node_id).into(), + )) + } NewestUnitResponse(response) => { trace!(target: "AlephBFT-runway", "{:?} Response newest unit received from {:?}.", self.index(), response.index()); let res = self.responses_for_collection.unbounded_send(response); @@ -300,7 +317,7 @@ where fn resolve_missing_coord(&mut self, coord: &UnitCoord) { if self.missing_coords.remove(coord) { - self.send_resolved_request_notification(ReconstructionRequest::Coord(*coord).into()); + self.send_resolved_request_notification(ReconstructionRequest::Coord(*coord)); } } @@ -329,9 +346,7 @@ where fn resolve_missing_parents(&mut self, u_hash: &::Hash) { if self.missing_parents.remove(u_hash) { - self.send_resolved_request_notification( - ReconstructionRequest::ParentsOf(*u_hash).into(), - ); + self.send_resolved_request_notification(ReconstructionRequest::ParentsOf(*u_hash)); } } @@ -351,7 +366,7 @@ where } }; if perform_request { - self.send_message_for_network(RunwayNotificationOut::Request(request.into())); + self.send_message_for_network(RunwayNotificationOut::Request(request)); } } @@ -405,7 +420,7 @@ where fn send_resolved_request_notification( &mut self, - notification: DisseminationRequest, + notification: ReconstructionRequest, ) { if self.resolved_requests.unbounded_send(notification).is_err() { warn!(target: "AlephBFT-runway", "{:?} resolved_requests channel should be open", self.index()); @@ -487,6 +502,13 @@ where } }, + message = self.messages_from_collection.next() => { + match message { + Some(message) => self.send_message_for_network(RunwayNotificationOut::Message(message)), + None => debug!("Messages from collection ended."), + } + }, + _ = &mut status_ticker => { self.status_report(); status_ticker = Delay::new(status_ticker_delay).fuse(); @@ -514,32 +536,26 @@ pub(crate) struct NetworkIO { pub(crate) alert_messages_from_network: Receiver>, pub(crate) unit_messages_for_network: Sender>, pub(crate) unit_messages_from_network: Receiver>, - pub(crate) resolved_requests: Sender>, + pub(crate) resolved_requests: Sender>, } #[cfg(feature = "initial_unit_collection")] fn initial_unit_collection<'a, H: Hasher, D: Data, MK: MultiKeychain>( keychain: &'a MK, validator: &'a Validator, - unit_messages_for_network: &Sender>, + messages_for_network: Sender>>, unit_collection_sender: oneshot::Sender, responses_from_runway: Receiver>, - resolved_requests: Sender>, + request_delay: DelaySchedule, ) -> Result + 'a, ()> { - let (collection, salt) = Collection::new(keychain, validator); - let notification = - RunwayNotificationOut::Request(DisseminationRequest::NewestUnit(keychain.index(), salt)); - - if let Err(e) = unit_messages_for_network.unbounded_send(notification) { - error!(target: "AlephBFT-runway", "Unable to send the newest unit request: {}", e); - return Err(()); - }; + let collection = Collection::new(keychain, validator); let collection = CollectionIO::new( unit_collection_sender, responses_from_runway, - resolved_requests, + messages_for_network, collection, + request_delay, ); Ok(collection.run()) } @@ -717,14 +733,15 @@ pub(crate) async fn run( .fuse(); pin_mut!(backup_loading_handle); + let (messages_for_us, messages_from_collection) = mpsc::unbounded(); #[cfg(feature = "initial_unit_collection")] let starting_round_handle = match initial_unit_collection( &keychain, &validator, - &network_io.unit_messages_for_network, + messages_for_us, unit_collections_sender, responses_from_runway, - network_io.resolved_requests.clone(), + config.delay_config().newest_request_delay.clone(), ) { Ok(handle) => handle.fuse(), Err(_) => return, @@ -748,6 +765,7 @@ pub(crate) async fn run( unit_messages_for_network: network_io.unit_messages_for_network, parents_for_creator, responses_for_collection, + messages_from_collection, resolved_requests: network_io.resolved_requests, new_units_from_creation, }; From 4b1b3fe60cec877f69a7428798d42c9212639f0c Mon Sep 17 00:00:00 2001 From: timorleph Date: Tue, 25 Mar 2025 16:55:03 +0100 Subject: [PATCH 2/2] Clearer complement calculation --- Cargo.lock | 10 +++++----- README.md | 2 +- consensus/Cargo.toml | 6 +++--- consensus/src/collection/mod.rs | 6 +++--- crypto/Cargo.toml | 2 +- crypto/src/node.rs | 6 ++++++ mock/Cargo.toml | 4 ++-- rmc/Cargo.toml | 6 +++--- types/Cargo.toml | 4 ++-- 9 files changed, 26 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8d3d99d6..ad91034e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ [[package]] name = "aleph-bft" -version = "0.43.1" +version = "0.44.0" dependencies = [ "aleph-bft-mock", "aleph-bft-rmc", @@ -51,7 +51,7 @@ dependencies = [ [[package]] name = "aleph-bft-crypto" -version = "0.9.1" +version = "0.10.0" dependencies = [ "async-trait", "bit-vec", @@ -102,7 +102,7 @@ dependencies = [ [[package]] name = "aleph-bft-mock" -version = "0.16.0" +version = "0.17.0" dependencies = [ "aleph-bft-types", "async-trait", @@ -116,7 +116,7 @@ dependencies = [ [[package]] name = "aleph-bft-rmc" -version = "0.14.0" +version = "0.15.0" dependencies = [ "aleph-bft-crypto", "aleph-bft-mock", @@ -132,7 +132,7 @@ dependencies = [ [[package]] name = "aleph-bft-types" -version = "0.14.0" +version = "0.15.0" dependencies = [ "aleph-bft-crypto", "async-trait", diff --git a/README.md b/README.md index a4d3cec0..ca0093db 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ More details are available [in the book][reference-link-implementation-details]. - Import AlephBFT in your crate ```toml [dependencies] - aleph-bft = "^0.43" + aleph-bft = "^0.44" ``` - The main entry point is the `run_session` function, which returns a Future that runs the consensus algorithm. diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 21b4afc4..6a5f93ce 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft" -version = "0.43.1" +version = "0.44.0" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "data-structures", "cryptography", "database"] @@ -13,8 +13,8 @@ readme = "../README.md" description = "AlephBFT is an asynchronous and Byzantine fault tolerant consensus protocol aimed at ordering arbitrary messages (transactions). It has been designed to continuously operate even in the harshest conditions: with no bounds on message-delivery delays and in the presence of malicious actors. This makes it an excellent fit for blockchain-related applications." [dependencies] -aleph-bft-rmc = { path = "../rmc", version = "0.14" } -aleph-bft-types = { path = "../types", version = "0.14" } +aleph-bft-rmc = { path = "../rmc", version = "0.15" } +aleph-bft-types = { path = "../types", version = "0.15" } anyhow = "1.0" async-trait = "0.1" codec = { package = "parity-scale-codec", version = "3.0", default-features = false, features = ["derive"] } diff --git a/consensus/src/collection/mod.rs b/consensus/src/collection/mod.rs index c9be16e6..caecde88 100644 --- a/consensus/src/collection/mod.rs +++ b/consensus/src/collection/mod.rs @@ -196,9 +196,9 @@ impl<'a, MK: Keychain> Collection<'a, MK> { fn missing_responders(&self) -> Vec { self.collected_starting_rounds - .size() - .into_iterator() - .filter(|node_id| self.collected_starting_rounds.get(*node_id).is_none()) + .to_subset() + .complement() + .elements() .map(Recipient::Node) .collect() } diff --git a/crypto/Cargo.toml b/crypto/Cargo.toml index 41f1e80f..f36c59f1 100644 --- a/crypto/Cargo.toml +++ b/crypto/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft-crypto" -version = "0.9.1" +version = "0.10.0" edition = "2021" authors = ["Cardinal Cryptography"] documentation = "https://docs.rs/?" diff --git a/crypto/src/node.rs b/crypto/src/node.rs index 677e64c7..794d8927 100644 --- a/crypto/src/node.rs +++ b/crypto/src/node.rs @@ -237,6 +237,12 @@ impl NodeSubset { .filter_map(|(i, b)| if b { Some(i.into()) } else { None }) } + pub fn complement(&self) -> NodeSubset { + let mut result = self.0.clone(); + result.negate(); + NodeSubset(result) + } + pub fn len(&self) -> usize { self.elements().count() } diff --git a/mock/Cargo.toml b/mock/Cargo.toml index 574fa9ab..14bad586 100644 --- a/mock/Cargo.toml +++ b/mock/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft-mock" -version = "0.16.0" +version = "0.17.0" edition = "2021" authors = ["Cardinal Cryptography"] documentation = "https://docs.rs/?" @@ -11,7 +11,7 @@ readme = "./README.md" description = "Mock implementations of traits required by the aleph-bft package. Do NOT use outside of testing!" [dependencies] -aleph-bft-types = { path = "../types", version = "0.14" } +aleph-bft-types = { path = "../types", version = "0.15" } async-trait = "0.1" codec = { package = "parity-scale-codec", version = "3.0", default-features = false, features = ["derive"] } futures = "0.3" diff --git a/rmc/Cargo.toml b/rmc/Cargo.toml index ba17756c..c686bdc9 100644 --- a/rmc/Cargo.toml +++ b/rmc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft-rmc" -version = "0.14.0" +version = "0.15.0" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "cryptography"] @@ -13,8 +13,8 @@ readme = "./README.md" description = "Reliable MultiCast - a primitive for Reliable Broadcast protocol." [dependencies] -aleph-bft-crypto = { path = "../crypto", version = "0.9" } -aleph-bft-types = { path = "../types", version = "0.14" } +aleph-bft-crypto = { path = "../crypto", version = "0.10" } +aleph-bft-types = { path = "../types", version = "0.15" } async-trait = "0.1" codec = { package = "parity-scale-codec", version = "3.0", default-features = false, features = ["derive"] } futures = "0.3" diff --git a/types/Cargo.toml b/types/Cargo.toml index 42e897bc..0202436f 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft-types" -version = "0.14.0" +version = "0.15.0" edition = "2021" authors = ["Cardinal Cryptography"] documentation = "https://docs.rs/?" @@ -11,7 +11,7 @@ readme = "./README.md" description = "Traits that need to be implemented by the user of the aleph-bft package." [dependencies] -aleph-bft-crypto = { path = "../crypto", version = "0.9" } +aleph-bft-crypto = { path = "../crypto", version = "0.10" } async-trait = "0.1" codec = { package = "parity-scale-codec", version = "3.0", default-features = false, features = ["derive"] } futures = "0.3"