Skip to content

Commit 253eb4f

Browse files
committed
Separate out the synchronous logic from consensus
1 parent 9b99569 commit 253eb4f

File tree

5 files changed

+286
-151
lines changed

5 files changed

+286
-151
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

consensus/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "aleph-bft"
3-
version = "0.45.3"
3+
version = "0.45.4"
44
edition = "2021"
55
authors = ["Cardinal Cryptography"]
66
categories = ["algorithms", "data-structures", "cryptography", "database"]

consensus/src/consensus/handler.rs

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
use crate::{
2+
alerts::{Alert, ForkingNotification},
3+
collection::Salt,
4+
consensus::LOG_TARGET,
5+
dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest},
6+
dissemination::{Addressed, DisseminationMessage, Responder, TaskManager, TaskManagerStatus},
7+
extension::Ordering,
8+
units::{UncheckedSignedUnit, Unit, UnitStore, UnitStoreStatus, Validator},
9+
Data, DelayConfig, Hasher, MultiKeychain, NodeIndex, UnitFinalizationHandler,
10+
};
11+
use log::{debug, trace};
12+
use std::{
13+
cmp::max,
14+
fmt::{Display, Formatter, Result as FmtResult},
15+
time::Duration,
16+
};
17+
18+
/// The main logic of the consensus, minus all the asynchronous components.
19+
pub struct Consensus<UFH, MK>
20+
where
21+
UFH: UnitFinalizationHandler,
22+
MK: MultiKeychain,
23+
{
24+
store: UnitStore<DagUnit<UFH::Hasher, UFH::Data, MK>>,
25+
dag: Dag<UFH::Hasher, UFH::Data, MK>,
26+
responder: Responder<UFH::Hasher, UFH::Data, MK>,
27+
ordering: Ordering<MK, UFH>,
28+
task_manager: TaskManager<UFH::Hasher>,
29+
}
30+
31+
/// The status of the consensus, for logging purposes.
32+
pub struct Status<H: Hasher> {
33+
task_manager_status: TaskManagerStatus<H>,
34+
dag_status: DagStatus,
35+
store_status: UnitStoreStatus,
36+
}
37+
38+
impl<H: Hasher> Status<H> {
39+
fn short_report(&self) -> String {
40+
let rounds_behind = max(self.dag_status.top_round(), self.store_status.top_round())
41+
- self.store_status.top_round();
42+
match rounds_behind {
43+
(0..=2) => "healthy".to_string(),
44+
(3..) => format!("behind by {rounds_behind} rounds"),
45+
}
46+
}
47+
}
48+
49+
impl<H: Hasher> Display for Status<H> {
50+
fn fmt(&self, f: &mut Formatter) -> FmtResult {
51+
write!(f, "{}", self.short_report())?;
52+
write!(f, ";reconstructed DAG: {}", self.store_status)?;
53+
write!(f, ";additional information: {}", self.dag_status)?;
54+
write!(f, ";task manager: {}", self.task_manager_status)?;
55+
Ok(())
56+
}
57+
}
58+
59+
type AddressedDisseminationMessage<H, D, MK> = Addressed<DisseminationMessage<H, D, MK>>;
60+
61+
/// The result of some operation within the consensus, requiring either other components should get
62+
/// informed about it, or messages should be sent to the network.
63+
pub struct ConsensusResult<H: Hasher, D: Data, MK: MultiKeychain> {
64+
/// Units that should be sent for backup saving.
65+
pub units: Vec<DagUnit<H, D, MK>>,
66+
/// Alerts that should be sent to the alerting component.
67+
pub alerts: Vec<Alert<H, D, MK::Signature>>,
68+
/// Messages that should be sent to other committee members.
69+
pub messages: Vec<AddressedDisseminationMessage<H, D, MK::Signature>>,
70+
}
71+
72+
impl<H: Hasher, D: Data, MK: MultiKeychain> ConsensusResult<H, D, MK> {
73+
fn noop() -> Self {
74+
ConsensusResult {
75+
units: Vec::new(),
76+
alerts: Vec::new(),
77+
messages: Vec::new(),
78+
}
79+
}
80+
}
81+
82+
impl<UFH, MK> Consensus<UFH, MK>
83+
where
84+
UFH: UnitFinalizationHandler,
85+
MK: MultiKeychain,
86+
{
87+
/// Create a new Consensus.
88+
pub fn new(
89+
keychain: MK,
90+
validator: Validator<MK>,
91+
finalization_handler: UFH,
92+
delay_config: DelayConfig,
93+
) -> Self {
94+
let n_members = keychain.node_count();
95+
let index = keychain.index();
96+
Consensus {
97+
store: UnitStore::new(n_members),
98+
dag: Dag::new(validator),
99+
responder: Responder::new(keychain),
100+
ordering: Ordering::new(finalization_handler),
101+
task_manager: TaskManager::new(index, n_members, delay_config),
102+
}
103+
}
104+
105+
fn handle_dag_result(
106+
&mut self,
107+
result: DagResult<UFH::Hasher, UFH::Data, MK>,
108+
) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
109+
let DagResult {
110+
units,
111+
alerts,
112+
requests,
113+
} = result;
114+
for request in requests {
115+
self.task_manager.add_request(request);
116+
}
117+
let messages = self.trigger_tasks();
118+
ConsensusResult {
119+
units,
120+
alerts,
121+
messages,
122+
}
123+
}
124+
125+
/// Process a unit received (usually) from the network.
126+
pub fn process_incoming_unit(
127+
&mut self,
128+
unit: UncheckedSignedUnit<UFH::Hasher, UFH::Data, MK::Signature>,
129+
) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
130+
let result = self.dag.add_unit(unit, &self.store);
131+
self.handle_dag_result(result)
132+
}
133+
134+
/// Process a request received from the network.
135+
pub fn process_request(
136+
&mut self,
137+
request: ReconstructionRequest<UFH::Hasher>,
138+
node_id: NodeIndex,
139+
) -> Option<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
140+
match self.responder.handle_request(request, &self.store) {
141+
Ok(response) => Some(Addressed::addressed_to(response.into(), node_id)),
142+
Err(err) => {
143+
debug!(target: LOG_TARGET, "Not answering request from node {:?}: {}.", node_id, err);
144+
None
145+
}
146+
}
147+
}
148+
149+
/// Process a parents response.
150+
pub fn process_parents(
151+
&mut self,
152+
u_hash: <UFH::Hasher as Hasher>::Hash,
153+
parents: Vec<UncheckedSignedUnit<UFH::Hasher, UFH::Data, MK::Signature>>,
154+
) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
155+
if self.store.unit(&u_hash).is_some() {
156+
trace!(target: LOG_TARGET, "We got parents response but already imported the unit.");
157+
return ConsensusResult::noop();
158+
}
159+
let result = self.dag.add_parents(u_hash, parents, &self.store);
160+
self.handle_dag_result(result)
161+
}
162+
163+
/// Process a newest unit request.
164+
pub fn process_newest_unit_request(
165+
&mut self,
166+
salt: Salt,
167+
node_id: NodeIndex,
168+
) -> AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature> {
169+
Addressed::addressed_to(
170+
self.responder
171+
.handle_newest_unit_request(node_id, salt, &self.store)
172+
.into(),
173+
node_id,
174+
)
175+
}
176+
177+
/// Process a forking notification.
178+
pub fn process_forking_notification(
179+
&mut self,
180+
notification: ForkingNotification<UFH::Hasher, UFH::Data, MK::Signature>,
181+
) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
182+
let result = self
183+
.dag
184+
.process_forking_notification(notification, &self.store);
185+
self.handle_dag_result(result)
186+
}
187+
188+
/// What to do once a unit has been securely backed up on disk.
189+
pub fn on_unit_backup_saved(
190+
&mut self,
191+
unit: DagUnit<UFH::Hasher, UFH::Data, MK>,
192+
) -> Option<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
193+
let unit_hash = unit.hash();
194+
self.store.insert(unit.clone());
195+
self.dag.finished_processing(&unit_hash);
196+
self.ordering.add_unit(unit.clone());
197+
self.task_manager.add_unit(&unit)
198+
}
199+
200+
/// When should `trigger_tasks` be called next.
201+
pub fn next_tick(&self) -> Duration {
202+
self.task_manager.next_tick()
203+
}
204+
205+
/// Trigger all the ready tasks and get all the messages that should be sent now.
206+
pub fn trigger_tasks(
207+
&mut self,
208+
) -> Vec<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
209+
self.task_manager
210+
.trigger_tasks(&self.store, self.dag.processing_units())
211+
}
212+
213+
/// The status of the consensus handler, for logging purposes.
214+
pub fn status(&self) -> Status<UFH::Hasher> {
215+
Status {
216+
dag_status: self.dag.status(),
217+
store_status: self.store.status(),
218+
task_manager_status: self.task_manager.status(),
219+
}
220+
}
221+
}

consensus/src/consensus/mod.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ use crate::{
22
alerts::{Handler as AlertHandler, Service as AlertService, IO as AlertIO},
33
backup::{BackupLoader, BackupSaver},
44
collection::initial_unit_collection,
5-
consensus::service::{Config as ServiceConfig, Service},
5+
consensus::{
6+
handler::Consensus,
7+
service::{Service, IO as ConsensusIO},
8+
},
69
creation, handle_task_termination,
710
interface::LocalIO,
811
network::{Hub as NetworkHub, NetworkData},
@@ -16,6 +19,7 @@ use futures::{
1619
};
1720
use log::{debug, error, info};
1821

22+
mod handler;
1923
mod service;
2024

2125
const LOG_TARGET: &str = "AlephBFT-consensus";
@@ -185,13 +189,16 @@ pub async fn run_session<
185189
pin_mut!(starting_round_handle);
186190
debug!(target: LOG_TARGET, "Initial unit collection spawned.");
187191

188-
//TODO: just put the Service here
189192
debug!(target: LOG_TARGET, "Spawning consensus service.");
193+
let consensus = Consensus::new(
194+
keychain.clone(),
195+
validator.clone(),
196+
finalization_handler,
197+
config.delay_config().clone(),
198+
);
190199
let service_handle = spawn_handle
191200
.spawn_essential("consensus/service", {
192-
let service_config = ServiceConfig {
193-
delay_config: config.delay_config().clone(),
194-
finalization_handler,
201+
let consensus_io = ConsensusIO {
195202
backup_units_for_saver,
196203
backup_units_from_saver,
197204
alerts_for_alerter,
@@ -203,9 +210,7 @@ pub async fn run_session<
203210
new_units_from_creator,
204211
};
205212
let service_terminator = terminator.add_offspring_connection("service");
206-
let validator = validator.clone();
207-
let keychain = keychain.clone();
208-
let service = Service::new(service_config, keychain, validator);
213+
let service = Service::new(consensus, consensus_io);
209214

210215
async move { service.run(loaded_units, service_terminator).await }
211216
})

0 commit comments

Comments
 (0)