use crate::{
    alerts::{Alert, ForkingNotification},
    collection::Salt,
    consensus::LOG_TARGET,
    dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest},
    dissemination::{Addressed, DisseminationMessage, Responder, TaskManager, TaskManagerStatus},
    extension::Ordering,
    units::{UncheckedSignedUnit, Unit, UnitStore, UnitStoreStatus, Validator},
    Data, DelayConfig, Hasher, MultiKeychain, NodeIndex, UnitFinalizationHandler,
};
use log::{debug, trace};
use std::{
    cmp::max,
    fmt::{Display, Formatter, Result as FmtResult},
    time::Duration,
};

/// The main logic of the consensus, minus all the asynchronous components.
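///
/// A rough sketch of how a driving loop might use this handler; the runner, network,
/// backup and alerter live outside this module and are only assumed here:
///
/// ```ignore
/// // A unit arrives from the network.
/// let ConsensusResult { units, alerts, messages } = consensus.process_incoming_unit(unit);
/// // `units` go to backup saving, `alerts` to the alerter, `messages` to the network.
///
/// // The backup confirms a unit has been saved to disk.
/// if let Some(message) = consensus.on_unit_backup_saved(unit) {
///     // send `message` to the network
/// }
///
/// // Periodically, after `next_tick` has elapsed, trigger the ready tasks.
/// let messages = consensus.trigger_tasks();
/// ```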
pub struct Consensus<UFH, MK>
where
    UFH: UnitFinalizationHandler,
    MK: MultiKeychain,
{
    store: UnitStore<DagUnit<UFH::Hasher, UFH::Data, MK>>,
    dag: Dag<UFH::Hasher, UFH::Data, MK>,
    responder: Responder<UFH::Hasher, UFH::Data, MK>,
    ordering: Ordering<MK, UFH>,
    task_manager: TaskManager<UFH::Hasher>,
}

/// The status of the consensus, for logging purposes.
pub struct Status<H: Hasher> {
    task_manager_status: TaskManagerStatus<H>,
    dag_status: DagStatus,
    store_status: UnitStoreStatus,
}

impl<H: Hasher> Status<H> {
    fn short_report(&self) -> String {
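        // How far the locally reconstructed units lag behind the highest round we have seen.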
        let rounds_behind = max(self.dag_status.top_round(), self.store_status.top_round())
            - self.store_status.top_round();
        match rounds_behind {
            (0..=2) => "healthy".to_string(),
            (3..) => format!("behind by {rounds_behind} rounds"),
        }
    }
}

impl<H: Hasher> Display for Status<H> {
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        write!(f, "{}", self.short_report())?;
        write!(f, ";reconstructed DAG: {}", self.store_status)?;
        write!(f, ";additional information: {}", self.dag_status)?;
        write!(f, ";task manager: {}", self.task_manager_status)?;
        Ok(())
    }
}

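/// A dissemination message together with information about where it should be sent.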
type AddressedDisseminationMessage<H, D, S> = Addressed<DisseminationMessage<H, D, S>>;

/// The result of some operation within the consensus: either other components should be
/// informed about it, or messages should be sent to the network.
pub struct ConsensusResult<H: Hasher, D: Data, MK: MultiKeychain> {
    /// Units that should be sent for backup saving.
    pub units: Vec<DagUnit<H, D, MK>>,
    /// Alerts that should be sent to the alerting component.
    pub alerts: Vec<Alert<H, D, MK::Signature>>,
    /// Messages that should be sent to other committee members.
    pub messages: Vec<AddressedDisseminationMessage<H, D, MK::Signature>>,
}

impl<H: Hasher, D: Data, MK: MultiKeychain> ConsensusResult<H, D, MK> {
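    /// An empty result: nothing to save, alert about, or send.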
    fn noop() -> Self {
        ConsensusResult {
            units: Vec::new(),
            alerts: Vec::new(),
            messages: Vec::new(),
        }
    }
}

impl<UFH, MK> Consensus<UFH, MK>
where
    UFH: UnitFinalizationHandler,
    MK: MultiKeychain,
{
    /// Create a new Consensus.
    pub fn new(
        keychain: MK,
        validator: Validator<MK>,
        finalization_handler: UFH,
        delay_config: DelayConfig,
    ) -> Self {
        let n_members = keychain.node_count();
        let index = keychain.index();
        Consensus {
            store: UnitStore::new(n_members),
            dag: Dag::new(validator),
            responder: Responder::new(keychain),
            ordering: Ordering::new(finalization_handler),
            task_manager: TaskManager::new(index, n_members, delay_config),
        }
    }

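    /// Handle the result of DAG processing: schedule any follow-up requests with the task
    /// manager, then trigger the tasks that are ready and collect everything that should be
    /// passed on to other components.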
    fn handle_dag_result(
        &mut self,
        result: DagResult<UFH::Hasher, UFH::Data, MK>,
    ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
        let DagResult {
            units,
            alerts,
            requests,
        } = result;
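        // Follow-up requests, e.g. for unknown parents, get scheduled with the task manager.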
        for request in requests {
            self.task_manager.add_request(request);
        }
        let messages = self.trigger_tasks();
        ConsensusResult {
            units,
            alerts,
            messages,
        }
    }

    /// Process a unit received (usually) from the network.
    pub fn process_incoming_unit(
        &mut self,
        unit: UncheckedSignedUnit<UFH::Hasher, UFH::Data, MK::Signature>,
    ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
        let result = self.dag.add_unit(unit, &self.store);
        self.handle_dag_result(result)
    }

    /// Process a request received from the network.
    pub fn process_request(
        &mut self,
        request: ReconstructionRequest<UFH::Hasher>,
        node_id: NodeIndex,
    ) -> Option<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
        match self.responder.handle_request(request, &self.store) {
            Ok(response) => Some(Addressed::addressed_to(response.into(), node_id)),
            Err(err) => {
                debug!(target: LOG_TARGET, "Not answering request from node {:?}: {}.", node_id, err);
                None
            }
        }
    }

    /// Process a parents response.
    pub fn process_parents(
        &mut self,
        u_hash: <UFH::Hasher as Hasher>::Hash,
        parents: Vec<UncheckedSignedUnit<UFH::Hasher, UFH::Data, MK::Signature>>,
    ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
        if self.store.unit(&u_hash).is_some() {
            trace!(target: LOG_TARGET, "We got parents response but already imported the unit.");
            return ConsensusResult::noop();
        }
        let result = self.dag.add_parents(u_hash, parents, &self.store);
        self.handle_dag_result(result)
    }

    /// Process a newest unit request.
    pub fn process_newest_unit_request(
        &mut self,
        salt: Salt,
        node_id: NodeIndex,
    ) -> AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature> {
        Addressed::addressed_to(
            self.responder
                .handle_newest_unit_request(node_id, salt, &self.store)
                .into(),
            node_id,
        )
    }

    /// Process a forking notification.
    pub fn process_forking_notification(
        &mut self,
        notification: ForkingNotification<UFH::Hasher, UFH::Data, MK::Signature>,
    ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
        let result = self
            .dag
            .process_forking_notification(notification, &self.store);
        self.handle_dag_result(result)
    }

    /// What to do once a unit has been securely backed up on disk.
    pub fn on_unit_backup_saved(
        &mut self,
        unit: DagUnit<UFH::Hasher, UFH::Data, MK>,
    ) -> Option<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
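        // The unit is now persisted, so add it to the store, mark it as no longer being
        // processed in the DAG, pass it to the ordering, and schedule its dissemination.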
        let unit_hash = unit.hash();
        self.store.insert(unit.clone());
        self.dag.finished_processing(&unit_hash);
        self.ordering.add_unit(unit.clone());
        self.task_manager.add_unit(&unit)
    }

    /// When should `trigger_tasks` be called next.
    pub fn next_tick(&self) -> Duration {
        self.task_manager.next_tick()
    }

    /// Trigger all the ready tasks and get all the messages that should be sent now.
    pub fn trigger_tasks(
        &mut self,
    ) -> Vec<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
        self.task_manager
            .trigger_tasks(&self.store, self.dag.processing_units())
    }

    /// The status of the consensus handler, for logging purposes.
    pub fn status(&self) -> Status<UFH::Hasher> {
        Status {
            dag_status: self.dag.status(),
            store_status: self.store.status(),
            task_manager_status: self.task_manager.status(),
        }
    }
}