Skip to content

⚡ Weak broadcast of data proposals in mempool #1547

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/contracts/staking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ borsh = { workspace = true, features = ["derive"] }

risc0-zkvm = { workspace = true, optional = true, features = ['std'] }
client-sdk = { workspace = true, features = ["risc0"], optional = true }
rand.workspace = true

[dev-dependencies]
risc0-zkvm = { workspace = true, features = ['std', 'prove'] }
Expand Down
80 changes: 80 additions & 0 deletions crates/contracts/staking/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::BTreeMap;

use anyhow::Result;
use borsh::{BorshDeserialize, BorshSerialize};
use rand::seq::SliceRandom;
use sdk::{info, BlockHeight, Identity, LaneBytesSize, LaneId, ValidatorPublicKey};
use serde::{Deserialize, Serialize};

Expand All @@ -26,6 +27,36 @@ pub struct Staking {
/// Minimal stake necessary to be part of consensus
pub const MIN_STAKE: u128 = 32;

#[derive(Debug, PartialEq, Eq)]
pub enum CertificateReliability {
None, // _ < f + 1
Weak, // f + 1 <= _ < 2f + 1
Reliable, // 2f + 1 <= _ < 3f + 1
Full, // 3f + 1 <= _
}

// Implémentation de Ord et PartialOrd
impl PartialOrd for CertificateReliability {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for CertificateReliability {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
use CertificateReliability::*;
let rank = |r: &CertificateReliability| -> u8 {
match r {
None => 0,
Weak => 1,
Reliable => 2,
Full => 3,
}
};
rank(self).cmp(&rank(other))
}
}

impl Staking {
pub fn new() -> Self {
Staking {
Expand Down Expand Up @@ -73,6 +104,55 @@ impl Staking {
}
}

/// Returns a random list of validators to add to present validators, to form a weak quorum
pub fn choose_weak_quorum<'a, R>(
&'a self,
present_pubkeys: Vec<&'a ValidatorPublicKey>,
rng: &mut R,
) -> Result<Vec<&'a ValidatorPublicKey>>
where
R: rand::Rng + ?Sized,
{
let mut validators: Vec<&ValidatorPublicKey> = self
.bonded()
.iter()
.filter(|v| !present_pubkeys.contains(v))
.collect();

let mut res: Vec<&ValidatorPublicKey> = vec![];

validators.shuffle(rng);

let mut power: u128 = present_pubkeys
.iter()
.filter_map(|pp| self.get_stake(pp))
.sum();
let f = self.compute_f();

while power < f + 1 && !validators.is_empty() {
let random = validators.remove(0);
power += self.get_stake(random).unwrap_or(0);
res.push(random);
}

Ok(res)
}

pub fn check_reliability(&self, validators: &[ValidatorPublicKey]) -> CertificateReliability {
let f = self.compute_f();
let power = self.compute_voting_power(validators);

if power < f + 1 {
return CertificateReliability::None;
} else if power < 2 * f + 1 {
return CertificateReliability::Weak;
} else if power < 3 * f + 1 {
return CertificateReliability::Reliable;
} else {
return CertificateReliability::Full;
}
}

/// Compute f value
pub fn compute_f(&self) -> u128 {
self.total_bond().div_euclid(3)
Expand Down
15 changes: 15 additions & 0 deletions src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,21 @@ impl Mempool {
Ok(())
}

fn broadcast_weak(&mut self, net_message: MempoolNetMessage) -> Result<()> {
let own_key = self.crypto.validator_pubkey();
let selected: HashSet<ValidatorPublicKey> = self
.staking
.choose_weak_quorum(vec![own_key], &mut rand::thread_rng())
.context("Choosing validators for a weak certificate")?
.into_iter()
.map(|s| s.clone())
.collect();

_ = self.broadcast_only_for_net_message(selected, net_message)?;

Ok(())
}

#[inline(always)]
fn broadcast_net_message(&mut self, net_message: MempoolNetMessage) -> Result<()> {
let enum_variant_name: &'static str = (&net_message).into();
Expand Down
57 changes: 39 additions & 18 deletions src/mempool/own_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{bus::BusClientSender, model::*};
use anyhow::{bail, Context, Result};
use client_sdk::tcp_client::TcpServerMessage;
use futures::StreamExt;
use staking::state::CertificateReliability;
use std::collections::HashSet;
use tracing::{debug, trace};

Expand Down Expand Up @@ -159,8 +160,16 @@ impl super::Mempool {
return Ok(true);
};

self.rebroadcast_data_proposal(&metadata, &dp_hash)
.context("Rebroadcasting oldest DataProposal")
if self
.staking
.check_reliability(metadata.validators().as_slice())
< CertificateReliability::Weak
{
self.rebroadcast_data_proposal(&metadata, &dp_hash)
.context("Rebroadcasting oldest DataProposal")
} else {
Ok(false)
}
}

/// Rebroadcast DataProposal to validators that have not signed it yet.
Expand Down Expand Up @@ -191,26 +200,26 @@ impl super::Mempool {
self.metrics
.dp_disseminations
.add(self.staking.bonded().len() as u64, &[]);
self.broadcast_net_message(MempoolNetMessage::DataProposal(
self.broadcast_weak(MempoolNetMessage::DataProposal(
data_proposal.hashed(),
data_proposal.clone(),
))?;
} else {
// If None, rebroadcast it to every validator that has not yet signed it
let validator_that_has_signed: HashSet<&ValidatorPublicKey> = entry_metadata
let signators: HashSet<&ValidatorPublicKey> = entry_metadata
.signatures
.iter()
.map(|s| &s.signature.validator)
.collect();
let signators: Vec<&ValidatorPublicKey> = signators.into_iter().collect();

// No PoA means we rebroadcast the DataProposal for non present voters
let only_for: HashSet<ValidatorPublicKey> = self
.staking
.bonded()
.iter()
.filter(|pubkey| !validator_that_has_signed.contains(pubkey))
.cloned()
.collect();
let only_for: HashSet<ValidatorPublicKey> = HashSet::from_iter(
self.staking
.choose_weak_quorum(signators, &mut rand::thread_rng())?
.into_iter()
.cloned(),
);

if only_for.is_empty() {
return Ok(false);
Expand Down Expand Up @@ -615,7 +624,7 @@ pub mod test {
ctx.process_new_data_proposal(dp)?;
ctx.timer_tick().await?;

let data_proposal = match ctx.assert_broadcast("DataProposal").await.msg {
let data_proposal = match ctx.assert_broadcast_only_for("DataProposal").await.1.msg {
MempoolNetMessage::DataProposal(_, dp) => dp,
_ => panic!("Expected DataProposal message"),
};
Expand Down Expand Up @@ -767,8 +776,12 @@ pub mod test {
// Récupère les deux DataProposals broadcastées par ctx1
let mut dps = vec![];
for _ in 0..2 {
match ctx1.assert_broadcast("DataProposal").await.msg {
MempoolNetMessage::DataProposal(hash, dp) => dps.push((hash, dp)),
let (set, msg) = ctx1.assert_broadcast_only_for("DataProposal").await;
match msg.msg {
MempoolNetMessage::DataProposal(hash, dp) => {
assert_eq!(set.len(), 1);
dps.push((hash, dp));
}
_ => panic!("Expected DataProposal message"),
}
}
Expand All @@ -786,8 +799,12 @@ pub mod test {
// Récupère les deux DataProposals broadcastées par ctx1
let mut dps = vec![];
for _ in 0..1 {
match ctx1.assert_broadcast("DataProposal").await.msg {
MempoolNetMessage::DataProposal(hash, dp) => dps.push((hash, dp)),
let (set, msg) = ctx1.assert_broadcast_only_for("DataProposal").await;
match msg.msg {
MempoolNetMessage::DataProposal(hash, dp) => {
assert_eq!(set.len(), 1);
dps.push((hash, dp));
}
_ => panic!("Expected DataProposal message"),
}
}
Expand All @@ -801,8 +818,12 @@ pub mod test {
// Récupère les deux DataProposals broadcastées par ctx1
let mut dps = vec![];
for _ in 0..1 {
match ctx1.assert_broadcast("DataProposal").await.msg {
MempoolNetMessage::DataProposal(hash, dp) => dps.push((hash, dp)),
let (set, msg) = ctx1.assert_broadcast_only_for("DataProposal").await;
match msg.msg {
MempoolNetMessage::DataProposal(hash, dp) => {
assert_eq!(set.len(), 1);
dps.push((hash, dp));
}
_ => panic!("Expected DataProposal message"),
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/mempool/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ pub struct LaneEntryMetadata {
pub signatures: Vec<ValidatorDAG>,
}

impl LaneEntryMetadata {
pub fn validators(&self) -> Vec<ValidatorPublicKey> {
self.signatures
.iter()
.map(|s| s.signature.validator.clone())
.collect()
}
}

pub trait Storage {
fn persist(&self) -> Result<()>;

Expand Down
68 changes: 38 additions & 30 deletions src/mempool/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,36 +169,6 @@ impl MempoolTestCtx {
.expect("fail to handle event");
}

#[track_caller]
pub fn assert_broadcast_only_for(
&mut self,
description: &str,
) -> MsgWithHeader<MempoolNetMessage> {
#[allow(clippy::expect_fun_call)]
let rec = self
.out_receiver
.try_recv()
.expect(format!("{description}: No message broadcasted").as_str());

match rec {
OutboundMessage::BroadcastMessageOnlyFor(_, net_msg) => {
if let NetMessage::MempoolMessage(msg) = net_msg {
msg
} else {
println!(
"{description}: Mempool OutboundMessage message is missing, found {net_msg}"
);
self.assert_broadcast_only_for(description)
}
}
_ => {
println!(
"{description}: Broadcast OutboundMessage message is missing, found {rec:?}",
);
self.assert_broadcast_only_for(description)
}
}
}
pub fn assert_send(
&mut self,
to: &ValidatorPublicKey,
Expand Down Expand Up @@ -282,6 +252,44 @@ impl MempoolTestCtx {
})
}

pub fn assert_broadcast_only_for(
&mut self,
description: &str,
) -> Pin<
Box<
dyn Future<
Output = (
HashSet<ValidatorPublicKey>,
MsgWithHeader<MempoolNetMessage>,
),
> + '_,
>,
> {
let description = description.to_string().clone();
Box::pin(async move {
#[allow(clippy::expect_fun_call)]
let rec = tokio::time::timeout(Duration::from_millis(1000), self.out_receiver.recv())
.await
.expect(format!("{description}: No message broadcasted only for").as_str())
.expect(format!("{description}: No message broadcasted only for").as_str());

match rec {
OutboundMessage::BroadcastMessageOnlyFor(validators, net_msg) => {
if let NetMessage::MempoolMessage(msg) = net_msg {
(validators, msg)
} else {
println!("{description}: Mempool OutboundMessage message is missing, found {net_msg}");
self.assert_broadcast_only_for(description.as_str()).await
}
}
_ => {
println!("{description}: BroadcastOnlyFor OutboundMessage message is missing, found {rec:?}");
self.assert_broadcast_only_for(description.as_str()).await
}
}
})
}

pub async fn handle_msg(&mut self, msg: &MsgWithHeader<MempoolNetMessage>, _err: &str) {
debug!("📥 {} Handling message: {:?}", self.name, msg);
self.mempool
Expand Down
Loading
Loading