diff --git a/crates/contracts/staking/src/state.rs b/crates/contracts/staking/src/state.rs index 2e4ae191a..c0383e755 100644 --- a/crates/contracts/staking/src/state.rs +++ b/crates/contracts/staking/src/state.rs @@ -138,10 +138,17 @@ impl Staking { Ok(res) } - pub fn check_reliability(&self, validators: &[ValidatorPublicKey]) -> CertificateReliability { - let f = self.compute_f(); + pub fn validators_reliability<'a>( + &self, + validators: impl Iterator, + ) -> CertificateReliability { let power = self.compute_voting_power(validators); + self.power_reliability(power) + } + + pub fn power_reliability(&self, power: u128) -> CertificateReliability { + let f = self.compute_f(); if power < f + 1 { return CertificateReliability::None; } else if power < 2 * f + 1 { @@ -158,9 +165,12 @@ impl Staking { self.total_bond().div_euclid(3) } - pub fn compute_voting_power(&self, validators: &[ValidatorPublicKey]) -> u128 { + pub fn compute_voting_power<'a>( + &self, + validators: impl Iterator, + ) -> u128 { // Deduplicate validators before computing voting power - let mut unique_validators = validators.to_vec(); + let mut unique_validators: Vec<&'a ValidatorPublicKey> = validators.collect(); unique_validators.sort(); unique_validators.dedup(); unique_validators diff --git a/src/consensus.rs b/src/consensus.rs index 313eaa144..44a77544c 100644 --- a/src/consensus.rs +++ b/src/consensus.rs @@ -31,7 +31,7 @@ use role_follower::FollowerState; use role_leader::LeaderState; use role_timeout::TimeoutRoleState; use serde::{Deserialize, Serialize}; -use staking::state::{Staking, MIN_STAKE}; +use staking::state::{CertificateReliability, Staking, MIN_STAKE}; use std::ops::Deref; use std::ops::DerefMut; use std::time::Duration; @@ -397,31 +397,28 @@ impl Consensus { // This helpfully ignores any signatures that would not be actually part of the consensus // since those would have voting power 0. // TODO: should we reject such messages? - let voting_power = self + let reliability = self .bft_round_state .staking - .compute_voting_power(quorum_certificate.validators.as_slice()); - - let f = self.bft_round_state.staking.compute_f(); + .validators_reliability(quorum_certificate.validators.iter()); trace!( - "📩 Slot {} validated votes: {} / {} ({} validators for a total bond = {})", + "📩 Slot {} reliability: {reliability:?} ({} validators for a total bond = {})", self.bft_round_state.slot, - voting_power, - 2 * f + 1, self.bft_round_state.staking.bonded().len(), self.bft_round_state.staking.total_bond() ); // Verify enough validators signed - if voting_power < 2 * f + 1 { + if reliability >= CertificateReliability::Reliable { + Ok(()) + } else { bail!( - "Quorum Certificate does not contain enough voting power ({} < {})", - voting_power, - 2 * f + 1 + "Quorum Certificate does not contain enough voting power ({:?} < {:?})", + reliability, + CertificateReliability::Reliable ); } - Ok(()) } /// Connect to all validators & ask to be part of consensus diff --git a/src/consensus/role_follower.rs b/src/consensus/role_follower.rs index f2ba27d1e..cff7e8708 100644 --- a/src/consensus/role_follower.rs +++ b/src/consensus/role_follower.rs @@ -1,5 +1,6 @@ use anyhow::{bail, Context, Result}; use borsh::{BorshDeserialize, BorshSerialize}; +use staking::state::CertificateReliability; use std::collections::BTreeMap; use tracing::{debug, info, trace, warn}; @@ -343,8 +344,6 @@ impl Consensus { /// - Each DataProposal associated with a validator must have received sufficient signatures. /// - The aggregated signatures for each DataProposal must be valid. fn verify_poda(&mut self, consensus_proposal: &ConsensusProposal) -> Result<()> { - let f = self.bft_round_state.staking.compute_f(); - trace!( "verify poda with staking: {:#?}", self.bft_round_state.staking @@ -362,10 +361,10 @@ impl Consensus { } for (lane_id, data_proposal_hash, lane_size, poda_sig) in &consensus_proposal.cut { - let voting_power = self + let reliability = self .bft_round_state .staking - .compute_voting_power(poda_sig.validators.as_slice()); + .validators_reliability(poda_sig.validators.iter()); // Check that this is a known lane. // TODO: this prevents ever deleting lane which may or may not be desirable. @@ -408,10 +407,10 @@ impl Consensus { } trace!("consensus_proposal: {:#?}", consensus_proposal); - trace!("voting_power: {voting_power} < {f} + 1"); + trace!("votes reliability: {reliability:?}"); // Verify that DataProposal received enough votes - if voting_power < f + 1 { + if reliability < CertificateReliability::Weak { bail!( "PoDA for lane {lane_id} does not have enough validators that signed his DataProposal" ); diff --git a/src/consensus/role_leader.rs b/src/consensus/role_leader.rs index 2d9fea8b9..7d7779bfb 100644 --- a/src/consensus/role_leader.rs +++ b/src/consensus/role_leader.rs @@ -6,10 +6,10 @@ use crate::{ bus::command_response::CmdRespClient, consensus::{role_follower::follower_state, *}, mempool::QueryNewCut, - model::{Hashed, ValidatorPublicKey}, + model::Hashed, }; use hyle_model::{utils::TimestampMs, ConsensusProposal, ConsensusStakingAction}; -use staking::state::MIN_STAKE; +use staking::state::{CertificateReliability, MIN_STAKE}; use tokio::sync::broadcast; use tracing::{debug, error, trace}; @@ -280,33 +280,25 @@ impl Consensus { .insert(prepare_vote); // Get matching vote count - let validated_votes = self - .bft_round_state - .leader - .prepare_votes - .iter() - .map(|signed_message| signed_message.signature.validator.clone()) - .collect::>(); - let votes_power = self - .bft_round_state - .staking - .compute_voting_power(&validated_votes); - let voting_power = votes_power + self.get_own_voting_power(); - - // Waits for at least n-f = 2f+1 matching PrepareVote messages - let f = self.bft_round_state.staking.compute_f(); + let reliability = self.bft_round_state.staking.validators_reliability( + self.bft_round_state + .leader + .prepare_votes + .iter() + .map(|signed_message| &signed_message.signature.validator) + .chain(std::iter::once(self.crypto.validator_pubkey())), + ); debug!( - "📩 Slot {} validated votes: {} / {} ({} validators for a total bond = {})", + "📩 Slot {} reliability {:?} ({} validators for a total bond = {})", self.bft_round_state.slot, - voting_power, - 2 * f + 1, + reliability, self.bft_round_state.staking.bonded().len(), self.bft_round_state.staking.total_bond() ); - if voting_power > 2 * f { + if reliability >= CertificateReliability::Reliable { // Get all received signatures let aggregates: &Vec<&PrepareVote> = &self.bft_round_state.leader.prepare_votes.iter().collect(); @@ -386,33 +378,25 @@ impl Consensus { return Ok(()); } - // Compute voting power so far and hope for >= 2f+1 - let confirmed_ack_validators = self - .bft_round_state - .leader - .confirm_ack - .iter() - .map(|signed_message| signed_message.signature.validator.clone()) - .collect::>(); - - let confirmed_power = self - .bft_round_state - .staking - .compute_voting_power(&confirmed_ack_validators); - let voting_power = confirmed_power + self.get_own_voting_power(); - - let f = self.bft_round_state.staking.compute_f(); + // Check certificate is reliable (>= 2f+1) + let reliability = self.bft_round_state.staking.validators_reliability( + self.bft_round_state + .leader + .confirm_ack + .iter() + .map(|signed_message| &signed_message.signature.validator) + .chain(std::iter::once(self.crypto.validator_pubkey())), + ); debug!( - "✅ Slot {} confirmed acks: {} / {} ({} validators for a total bond = {})", + "✅ Slot {} reliability {:?} ({} validators for a total bond = {})", self.bft_round_state.slot, - voting_power, - 2 * f + 1, + reliability, self.bft_round_state.staking.bonded().len(), self.bft_round_state.staking.total_bond() ); - if voting_power > 2 * f { + if reliability >= CertificateReliability::Reliable { // Get all signatures received and change ValidatorPublicKey for ValidatorPubKey let aggregates: &Vec<&ConfirmAck> = &self.bft_round_state.leader.confirm_ack.iter().collect(); diff --git a/src/consensus/role_timeout.rs b/src/consensus/role_timeout.rs index cbf9cc5c7..43bde649a 100644 --- a/src/consensus/role_timeout.rs +++ b/src/consensus/role_timeout.rs @@ -1,5 +1,6 @@ use anyhow::{bail, Context, Result}; use borsh::{BorshDeserialize, BorshSerialize}; +use staking::state::CertificateReliability; use std::{collections::HashSet, time::Duration}; use tracing::{debug, info, trace, warn}; @@ -270,8 +271,6 @@ impl Consensus { return Ok(()); } - let f = self.bft_round_state.staking.compute_f(); - // At this point we must select both NIL and QC timeouts. let (mut relevant_timeout_messages, mut tc_kinds) = self .store @@ -292,12 +291,13 @@ impl Consensus { let mut len = relevant_timeout_messages.len(); + let f = self.bft_round_state.staking.compute_f(); // TODO: rework function to avoid cloning let mut voting_power = self.store.bft_round_state.staking.compute_voting_power( - &relevant_timeout_messages + (&relevant_timeout_messages) .iter() - .map(|s| s.signature.validator.clone()) - .collect::>(), + .map(|s| &s.signature.validator) + .filter(|v| v != &self.crypto.validator_pubkey()), ); info!( @@ -305,10 +305,8 @@ impl Consensus { ); // Count requests and if f+1 requests, and not already part of it, join the mutiny - if voting_power > f - && !relevant_timeout_messages - .iter() - .any(|s| &s.signature.validator == self.crypto.validator_pubkey()) + if self.bft_round_state.staking.power_reliability(voting_power) + >= CertificateReliability::Weak { info!("Joining timeout mutiny!"); @@ -361,7 +359,8 @@ impl Consensus { } // Create TC if applicable - if voting_power > 2 * f + if self.bft_round_state.staking.power_reliability(voting_power) + >= CertificateReliability::Reliable && !matches!( self.bft_round_state.timeout.state, TimeoutState::CertificateEmitted diff --git a/src/mempool/own_lane.rs b/src/mempool/own_lane.rs index d72fe2f95..98bfc4855 100644 --- a/src/mempool/own_lane.rs +++ b/src/mempool/own_lane.rs @@ -42,26 +42,21 @@ impl super::Mempool { .add_signatures(&lane_id, &data_proposal_hash, std::iter::once(vdag))?; // Compute voting power of all signers to check if the DataProposal received enough votes - let validators: Vec = signatures - .iter() - .map(|s| s.signature.validator.clone()) - .collect(); - let old_voting_power = self.staking.compute_voting_power( - validators + + let old_reliability = self.staking.validators_reliability( + signatures .iter() - .filter(|v| *v != &validator) - .cloned() - .collect::>() - .as_slice(), + .map(|s| &s.signature.validator) + .filter(|v| *v != &validator), ); - let new_voting_power = self.staking.compute_voting_power(validators.as_slice()); - let f = self.staking.compute_f(); + + let new_reliability = self + .staking + .validators_reliability(signatures.iter().map(|s| &s.signature.validator)); + // Only send the message if voting power exceeds f, 2 * f or is exactly 3 * f + 1 // This garentees that the message is sent only once per threshold - if old_voting_power < f && new_voting_power >= f - || old_voting_power < 2 * f && new_voting_power >= 2 * f - || new_voting_power > 3 * f - { + if old_reliability != new_reliability { self.broadcast_net_message(MempoolNetMessage::PoDAUpdate( data_proposal_hash, signatures, @@ -160,10 +155,7 @@ impl super::Mempool { return Ok(true); }; - if self - .staking - .check_reliability(metadata.validators().as_slice()) - < CertificateReliability::Weak + if self.staking.validators_reliability(metadata.validators()) < CertificateReliability::Weak { self.rebroadcast_data_proposal(&metadata, &dp_hash) .context("Rebroadcasting oldest DataProposal") diff --git a/src/mempool/storage.rs b/src/mempool/storage.rs index 62a732aa4..2ca975800 100644 --- a/src/mempool/storage.rs +++ b/src/mempool/storage.rs @@ -5,13 +5,11 @@ use futures::{Stream, StreamExt}; use hyle_crypto::BlstCrypto; use hyle_model::{DataSized, LaneId}; use serde::{Deserialize, Serialize}; -use staking::state::Staking; +use staking::state::{CertificateReliability, Staking}; use std::{future::Future, vec}; use tracing::{error, trace}; -use crate::model::{ - Cut, DataProposal, DataProposalHash, Hashed, PoDA, SignedByValidator, ValidatorPublicKey, -}; +use crate::model::{Cut, DataProposal, DataProposalHash, Hashed, PoDA, ValidatorPublicKey}; use super::ValidatorDAG; @@ -44,11 +42,8 @@ pub struct LaneEntryMetadata { } impl LaneEntryMetadata { - pub fn validators(&self) -> Vec { - self.signatures - .iter() - .map(|s| s.signature.validator.clone()) - .collect() + pub fn validators(&self) -> impl Iterator { + self.signatures.iter().map(|s| &s.signature.validator) } } @@ -126,27 +121,23 @@ pub trait Storage { return Ok(None); }; - let filtered: Vec<&SignedByValidator<(DataProposalHash, LaneBytesSize)>> = le - .signatures - .iter() - .filter(|s| bonded_validators.contains(&s.signature.validator)) - .collect(); - - // Compute voting power from the filtered validator set. - let filtered_validators: Vec = filtered - .iter() - .map(|s| s.signature.validator.clone()) - .collect(); + let filtered = || { + le.signatures + .iter() + .filter(|s| bonded_validators.contains(&s.signature.validator)) + }; - // TODO: take by reference to avoid cloning above - let voting_power = staking.compute_voting_power(&filtered_validators); - let f = staking.compute_f(); + let reliability = + staking.validators_reliability(filtered().map(|s| &s.signature.validator)); - trace!("Checking for sufficient voting power: {voting_power} > {f} ?"); + trace!("Checking for weak reliability {reliability:?} ?"); // Enough votes: aggregate into PoDA and return. - if voting_power > f { - match BlstCrypto::aggregate((current.clone(), le.cumul_size), &filtered) { + if reliability >= CertificateReliability::Weak { + match BlstCrypto::aggregate( + (current.clone(), le.cumul_size), + &filtered().collect::>().as_slice(), + ) { Ok(poda) => { return Ok(Some((current, le.cumul_size, poda.signature))); } @@ -429,7 +420,9 @@ mod tests { use crate::mempool::storage_memory::LanesStorage; use assertables::assert_none; use futures::StreamExt; - use hyle_model::{DataSized, Identity, Signature, Transaction, ValidatorSignature}; + use hyle_model::{ + DataSized, Identity, Signature, SignedByValidator, Transaction, ValidatorSignature, + }; use staking::state::Staking; fn setup_storage() -> LanesStorage {