From 0258a647ca2561278662eb5502d207199dc19238 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 30 Sep 2024 18:48:26 +0200 Subject: [PATCH] chore(code): Cleanup `malachite-consensus` (#432) * Rename `GossipMsg` to `SignedConsensusMsg` * Hide some modules from documentation * Add missing doc comment * Hide internal `perform!` macro * Fix link * Rename `Msg` to `Input` * Fix link * Rename `msg_queue` to `input_queue` --- code/crates/actors/src/consensus.rs | 34 ++++++++----- code/crates/actors/src/gossip_consensus.rs | 10 ++-- code/crates/actors/src/util/codec.rs | 6 +-- code/crates/consensus/src/effect.rs | 6 +-- code/crates/consensus/src/handle.rs | 49 +++++++++++-------- .../crates/consensus/src/{msg.rs => input.rs} | 7 +-- code/crates/consensus/src/lib.rs | 7 ++- code/crates/consensus/src/macros.rs | 19 +++---- code/crates/consensus/src/params.rs | 1 + code/crates/consensus/src/state.rs | 10 ++-- code/crates/consensus/src/types.rs | 10 ++-- code/crates/starknet/app/src/codec.rs | 14 +++--- .../crates/test/tests/full_proposal_keeper.rs | 12 ++--- 13 files changed, 104 insertions(+), 81 deletions(-) rename code/crates/consensus/src/{msg.rs => input.rs} (85%) diff --git a/code/crates/actors/src/consensus.rs b/code/crates/actors/src/consensus.rs index 3e347d75c..a251b563d 100644 --- a/code/crates/actors/src/consensus.rs +++ b/code/crates/actors/src/consensus.rs @@ -57,7 +57,7 @@ pub enum Msg { ReceivedProposedValue(ProposedValue), } -type InnerMsg = malachite_consensus::Msg; +type ConsensusInput = malachite_consensus::Input; impl From> for Msg { fn from(msg: TimeoutElapsed) -> Self { @@ -168,14 +168,14 @@ where Ok(actor_ref) } - async fn process_msg( + async fn process_input( &self, myself: &ActorRef>, state: &mut State, - msg: InnerMsg, + input: ConsensusInput, ) -> Result<(), ActorProcessingErr> { malachite_consensus::process!( - msg: msg, + input: input, state: &mut state.consensus, metrics: &self.metrics, with: effect => { @@ -194,7 +194,11 @@ where Msg::StartHeight(height) => { let validator_set = self.get_validator_set(height).await?; let result = self - .process_msg(&myself, state, InnerMsg::StartHeight(height, validator_set)) + .process_input( + &myself, + state, + ConsensusInput::StartHeight(height, validator_set), + ) .await; if let Err(e) = result { @@ -206,7 +210,11 @@ where Msg::ProposeValue(height, round, value) => { let result = self - .process_msg(&myself, state, InnerMsg::ProposeValue(height, round, value)) + .process_input( + &myself, + state, + ConsensusInput::ProposeValue(height, round, value), + ) .await; if let Err(e) = result { @@ -245,10 +253,10 @@ where let validator_set = self.get_validator_set(height).await?; let result = self - .process_msg( + .process_input( &myself, state, - InnerMsg::StartHeight(height, validator_set), + ConsensusInput::StartHeight(height, validator_set), ) .await; @@ -273,7 +281,9 @@ where } GossipEvent::Vote(from, vote) => { - if let Err(e) = self.process_msg(&myself, state, InnerMsg::Vote(vote)).await + if let Err(e) = self + .process_input(&myself, state, ConsensusInput::Vote(vote)) + .await { error!(%from, "Error when processing vote: {e:?}"); } @@ -283,7 +293,7 @@ where GossipEvent::Proposal(from, proposal) => { if let Err(e) = self - .process_msg(&myself, state, InnerMsg::Proposal(proposal)) + .process_input(&myself, state, ConsensusInput::Proposal(proposal)) .await { error!(%from, "Error when processing proposal: {e:?}"); @@ -372,7 +382,7 @@ where } let result = self - .process_msg(&myself, state, InnerMsg::TimeoutElapsed(timeout)) + .process_input(&myself, state, ConsensusInput::TimeoutElapsed(timeout)) .await; if let Err(e) = result { @@ -384,7 +394,7 @@ where Msg::ReceivedProposedValue(value) => { let result = self - .process_msg(&myself, state, InnerMsg::ReceivedProposedValue(value)) + .process_input(&myself, state, ConsensusInput::ReceivedProposedValue(value)) .await; if let Err(e) = result { diff --git a/code/crates/actors/src/gossip_consensus.rs b/code/crates/actors/src/gossip_consensus.rs index b0fe3846f..3f4f3a6c1 100644 --- a/code/crates/actors/src/gossip_consensus.rs +++ b/code/crates/actors/src/gossip_consensus.rs @@ -9,7 +9,7 @@ use tokio::task::JoinHandle; use tracing::{debug, error, error_span, Instrument}; use malachite_common::{Context, SignedProposal, SignedVote}; -use malachite_consensus::GossipMsg; +use malachite_consensus::SignedConsensusMsg; use malachite_gossip_consensus::handle::CtrlHandle; use malachite_gossip_consensus::{Channel, Config, Event, Multiaddr, PeerId}; use malachite_metrics::SharedRegistry; @@ -98,8 +98,8 @@ pub enum Msg { /// Subscribe this actor to receive gossip events Subscribe(ActorRef>), - /// Broadcast a gossip message - BroadcastMsg(GossipMsg), + /// Broadcast a signed consensus message + BroadcastMsg(SignedConsensusMsg), /// Broadcast a proposal part BroadcastProposalPart(StreamMessage), @@ -225,8 +225,8 @@ where }; let event = match msg { - GossipMsg::Vote(vote) => GossipEvent::Vote(from, vote), - GossipMsg::Proposal(proposal) => GossipEvent::Proposal(from, proposal), + SignedConsensusMsg::Vote(vote) => GossipEvent::Vote(from, vote), + SignedConsensusMsg::Proposal(proposal) => GossipEvent::Proposal(from, proposal), }; self.publish(event, subscribers); diff --git a/code/crates/actors/src/util/codec.rs b/code/crates/actors/src/util/codec.rs index 3e612dd34..78e0a31e6 100644 --- a/code/crates/actors/src/util/codec.rs +++ b/code/crates/actors/src/util/codec.rs @@ -1,5 +1,5 @@ use malachite_common::Context; -use malachite_consensus::GossipMsg; +use malachite_consensus::SignedConsensusMsg; use malachite_gossip_consensus::Bytes; use malachite_proto::Protobuf; @@ -8,8 +8,8 @@ use super::streaming::StreamMessage; pub trait NetworkCodec: Sync + Send + 'static { type Error: std::error::Error + Send + Sync + 'static; - fn decode_msg(bytes: Bytes) -> Result, Self::Error>; - fn encode_msg(msg: GossipMsg) -> Result; + fn decode_msg(bytes: Bytes) -> Result, Self::Error>; + fn encode_msg(msg: SignedConsensusMsg) -> Result; fn decode_stream_msg(bytes: Bytes) -> Result, Self::Error> where diff --git a/code/crates/consensus/src/effect.rs b/code/crates/consensus/src/effect.rs index 493491f17..109f56f6e 100644 --- a/code/crates/consensus/src/effect.rs +++ b/code/crates/consensus/src/effect.rs @@ -2,11 +2,11 @@ use derive_where::derive_where; use malachite_common::*; -use crate::types::GossipMsg; +use crate::types::SignedConsensusMsg; /// An effect which may be yielded by a consensus process. /// -/// Effects are handled by the caller using [`process`][process]. +/// Effects are handled by the caller using [`process!`][process]. /// After that the consensus computation is then resumed. /// /// [process]: crate::process @@ -32,7 +32,7 @@ where StartRound(Ctx::Height, Round, Ctx::Address), /// Broadcast a message - Broadcast(GossipMsg), + Broadcast(SignedConsensusMsg), /// Get a value to propose at the given height and round, within the given timeout GetValue(Ctx::Height, Round, Timeout), diff --git a/code/crates/consensus/src/handle.rs b/code/crates/consensus/src/handle.rs index 9aa808e60..19d400031 100644 --- a/code/crates/consensus/src/handle.rs +++ b/code/crates/consensus/src/handle.rs @@ -9,22 +9,22 @@ use malachite_metrics::Metrics; use crate::effect::Effect; use crate::error::Error; use crate::gen::Co; -use crate::msg::Msg; +use crate::input::Input; use crate::perform; use crate::state::State; -use crate::types::{GossipMsg, ProposedValue}; +use crate::types::{ProposedValue, SignedConsensusMsg}; use crate::util::pretty::{PrettyProposal, PrettyVal, PrettyVote}; pub async fn handle( co: Co, state: &mut State, metrics: &Metrics, - msg: Msg, + input: Input, ) -> Result<(), Error> where Ctx: Context, { - handle_msg(&co, state, metrics, msg).await + handle_msg(&co, state, metrics, input).await } #[async_recursion] @@ -32,22 +32,22 @@ async fn handle_msg( co: &Co, state: &mut State, metrics: &Metrics, - msg: Msg, + input: Input, ) -> Result<(), Error> where Ctx: Context, { - match msg { - Msg::StartHeight(height, vs) => { + match input { + Input::StartHeight(height, vs) => { reset_and_start_height(co, state, metrics, height, vs).await } - Msg::Vote(vote) => on_vote(co, state, metrics, vote).await, - Msg::Proposal(proposal) => on_proposal(co, state, metrics, proposal).await, - Msg::ProposeValue(height, round, value) => { + Input::Vote(vote) => on_vote(co, state, metrics, vote).await, + Input::Proposal(proposal) => on_proposal(co, state, metrics, proposal).await, + Input::ProposeValue(height, round, value) => { propose_value(co, state, metrics, height, round, value).await } - Msg::TimeoutElapsed(timeout) => on_timeout_elapsed(co, state, metrics, timeout).await, - Msg::ReceivedProposedValue(block) => { + Input::TimeoutElapsed(timeout) => on_timeout_elapsed(co, state, metrics, timeout).await, + Input::ReceivedProposedValue(block) => { on_received_proposed_value(co, state, metrics, block).await } } @@ -117,7 +117,7 @@ async fn replay_pending_msgs( where Ctx: Context, { - let pending_msgs = std::mem::take(&mut state.msg_queue); + let pending_msgs = std::mem::take(&mut state.input_queue); debug!("Replaying {} messages", pending_msgs.len()); for pending_msg in pending_msgs { @@ -268,7 +268,7 @@ where perform!( co, - Effect::Broadcast(GossipMsg::Proposal(signed_proposal.clone())) + Effect::Broadcast(SignedConsensusMsg::Proposal(signed_proposal.clone())) ); on_proposal(co, state, metrics, signed_proposal).await @@ -284,7 +284,10 @@ where let signed_vote = state.ctx.sign_vote(vote); - perform!(co, Effect::Broadcast(GossipMsg::Vote(signed_vote.clone()),)); + perform!( + co, + Effect::Broadcast(SignedConsensusMsg::Vote(signed_vote.clone())) + ); apply_driver_input(co, state, metrics, DriverInput::Vote(signed_vote)).await } @@ -482,7 +485,7 @@ where "Received vote at round -1, queuing for later" ); - state.msg_queue.push_back(Msg::Vote(signed_vote)); + state.input_queue.push_back(Input::Vote(signed_vote)); return Ok(()); } @@ -494,7 +497,7 @@ where "Received vote for higher height, queuing for later" ); - state.msg_queue.push_back(Msg::Vote(signed_vote)); + state.input_queue.push_back(Input::Vote(signed_vote)); return Ok(()); } @@ -568,13 +571,17 @@ where // Drop all others. if state.driver.round() == Round::Nil { debug!("Received proposal at round -1, queuing for later"); - state.msg_queue.push_back(Msg::Proposal(signed_proposal)); + state + .input_queue + .push_back(Input::Proposal(signed_proposal)); return Ok(()); } if state.driver.height() < proposal_height { debug!("Received proposal for higher height, queuing for later"); - state.msg_queue.push_back(Msg::Proposal(signed_proposal)); + state + .input_queue + .push_back(Input::Proposal(signed_proposal)); return Ok(()); } @@ -675,8 +682,8 @@ where if state.driver.height() < proposed_value.height { debug!("Received value for higher height, queuing for later"); state - .msg_queue - .push_back(Msg::ReceivedProposedValue(proposed_value)); + .input_queue + .push_back(Input::ReceivedProposedValue(proposed_value)); return Ok(()); } diff --git a/code/crates/consensus/src/msg.rs b/code/crates/consensus/src/input.rs similarity index 85% rename from code/crates/consensus/src/msg.rs rename to code/crates/consensus/src/input.rs index 5b006ec8c..581cc6108 100644 --- a/code/crates/consensus/src/msg.rs +++ b/code/crates/consensus/src/input.rs @@ -1,10 +1,11 @@ -use crate::ProposedValue; use derive_where::derive_where; use malachite_common::{Context, Round, SignedProposal, SignedVote, Timeout}; -/// Messages that can be handled by the consensus process +use crate::types::ProposedValue; + +/// Inputs to be handled by the consensus process. #[derive_where(Clone, Debug, PartialEq, Eq)] -pub enum Msg +pub enum Input where Ctx: Context, { diff --git a/code/crates/consensus/src/lib.rs b/code/crates/consensus/src/lib.rs index 29c1aa86d..5f866434c 100644 --- a/code/crates/consensus/src/lib.rs +++ b/code/crates/consensus/src/lib.rs @@ -1,5 +1,5 @@ -mod msg; -pub use msg::Msg; +mod input; +pub use input::Input; mod state; pub use state::State; @@ -21,11 +21,14 @@ mod handle; mod macros; mod util; +// Only used in macros #[doc(hidden)] pub mod gen; +// Only used in macros #[doc(hidden)] pub use handle::handle; +// Only used internally, but needs to be exposed for tests #[doc(hidden)] pub use full_proposal::{FullProposal, FullProposalKeeper}; diff --git a/code/crates/consensus/src/macros.rs b/code/crates/consensus/src/macros.rs index 3f2664480..a9c5cee41 100644 --- a/code/crates/consensus/src/macros.rs +++ b/code/crates/consensus/src/macros.rs @@ -1,22 +1,23 @@ -/// Process a message and handle the emitted effects. +/// Process an input and handle the emitted effects. /// /// # Example /// /// ```rust,ignore -/// /// malachite_consensus::process!( -/// // Message to process -/// msg: msg, -/// // Consensus state and metrics -/// state: &mut state, metrics: &metrics, +/// // Input to process +/// input: input, +/// // Consensus state +/// state: &mut state, +/// // Metrics +/// metrics: &metrics, /// // Effect handler -/// on: effect => handle_effect(myself, &mut timers, &mut timeouts, effect).await +/// on: effect => handle_effect(effect).await /// ) /// ``` #[macro_export] macro_rules! process { - (msg: $msg:expr, state: $state:expr, metrics: $metrics:expr, with: $effect:ident => $handle:expr) => {{ - let mut gen = $crate::gen::Gen::new(|co| $crate::handle(co, $state, $metrics, $msg)); + (input: $input:expr, state: $state:expr, metrics: $metrics:expr, with: $effect:ident => $handle:expr) => {{ + let mut gen = $crate::gen::Gen::new(|co| $crate::handle(co, $state, $metrics, $input)); let mut co_result = gen.resume_with(()); diff --git a/code/crates/consensus/src/params.rs b/code/crates/consensus/src/params.rs index ce2ab7478..f5d66acf1 100644 --- a/code/crates/consensus/src/params.rs +++ b/code/crates/consensus/src/params.rs @@ -3,6 +3,7 @@ use malachite_common::Context; pub use malachite_driver::ThresholdParams; +/// Consensus parameters. #[derive_where(Clone, Debug)] pub struct Params { pub start_height: Ctx::Height, diff --git a/code/crates/consensus/src/state.rs b/code/crates/consensus/src/state.rs index e5f3cc203..580015ce2 100644 --- a/code/crates/consensus/src/state.rs +++ b/code/crates/consensus/src/state.rs @@ -4,12 +4,12 @@ use malachite_common::*; use malachite_driver::Driver; use crate::error::Error; -use crate::msg::Msg; +use crate::input::Input; use crate::Params; use crate::ProposedValue; use crate::{FullProposal, FullProposalKeeper}; -/// The state maintained by consensus for processing a [`Msg`][crate::msg::Msg]. +/// The state maintained by consensus for processing a [`Input`][crate::Input]. pub struct State where Ctx: Context, @@ -20,9 +20,9 @@ where /// Driver for the per-round consensus state machine pub driver: Driver, - /// A queue of gossip events that were received before the + /// A queue of inputs that were received before the /// driver started the new height and was still at round Nil. - pub msg_queue: VecDeque>, + pub input_queue: VecDeque>, /// The proposals to decide on. pub full_proposal_keeper: FullProposalKeeper, @@ -50,7 +50,7 @@ where Self { ctx, driver, - msg_queue: Default::default(), + input_queue: Default::default(), full_proposal_keeper: Default::default(), signed_precommits: Default::default(), decision: Default::default(), diff --git a/code/crates/consensus/src/types.rs b/code/crates/consensus/src/types.rs index a3e5cb7dc..793034c96 100644 --- a/code/crates/consensus/src/types.rs +++ b/code/crates/consensus/src/types.rs @@ -5,18 +5,18 @@ use malachite_common::{Context, Proposal, Round, SignedProposal, SignedVote, Val pub use libp2p_identity::PeerId; pub use multiaddr::Multiaddr; -/// A message that can be broadcast by the gossip layer +/// A signed consensus message, ie. a signed vote or a signed proposal. #[derive_where(Clone, Debug, PartialEq, Eq)] -pub enum GossipMsg { +pub enum SignedConsensusMsg { Vote(SignedVote), Proposal(SignedProposal), } -impl GossipMsg { +impl SignedConsensusMsg { pub fn msg_height(&self) -> Ctx::Height { match self { - GossipMsg::Vote(msg) => msg.height(), - GossipMsg::Proposal(msg) => msg.height(), + SignedConsensusMsg::Vote(msg) => msg.height(), + SignedConsensusMsg::Proposal(msg) => msg.height(), } } } diff --git a/code/crates/starknet/app/src/codec.rs b/code/crates/starknet/app/src/codec.rs index 1c19d3c07..ad7640000 100644 --- a/code/crates/starknet/app/src/codec.rs +++ b/code/crates/starknet/app/src/codec.rs @@ -4,7 +4,7 @@ use prost::Message; use malachite_actors::util::codec::NetworkCodec; use malachite_actors::util::streaming::{StreamContent, StreamMessage}; use malachite_common::{SignedProposal, SignedVote}; -use malachite_consensus::GossipMsg; +use malachite_consensus::SignedConsensusMsg; use malachite_proto::{Error as ProtoError, Protobuf}; use malachite_starknet_host::mock::context::MockContext; use malachite_starknet_host::types::Vote; @@ -17,7 +17,7 @@ pub struct ProtobufCodec; impl NetworkCodec for ProtobufCodec { type Error = ProtoError; - fn decode_msg(bytes: Bytes) -> Result, Self::Error> { + fn decode_msg(bytes: Bytes) -> Result, Self::Error> { let proto = ConsensusMessage::decode(bytes)?; let proto_signature = proto @@ -32,20 +32,20 @@ impl NetworkCodec for ProtobufCodec { match message { Messages::Vote(v) => { - Vote::from_proto(v).map(|v| GossipMsg::Vote(SignedVote::new(v, signature))) + Vote::from_proto(v).map(|v| SignedConsensusMsg::Vote(SignedVote::new(v, signature))) } Messages::Proposal(p) => p2p::Proposal::from_proto(p) - .map(|p| GossipMsg::Proposal(SignedProposal::new(p, signature))), + .map(|p| SignedConsensusMsg::Proposal(SignedProposal::new(p, signature))), } } - fn encode_msg(msg: GossipMsg) -> Result { + fn encode_msg(msg: SignedConsensusMsg) -> Result { let message = match msg { - GossipMsg::Vote(v) => ConsensusMessage { + SignedConsensusMsg::Vote(v) => ConsensusMessage { messages: Some(Messages::Vote(v.to_proto()?)), signature: Some(v.signature.to_proto()?), }, - GossipMsg::Proposal(p) => ConsensusMessage { + SignedConsensusMsg::Proposal(p) => ConsensusMessage { messages: Some(Messages::Proposal(p.to_proto()?)), signature: Some(p.signature.to_proto()?), }, diff --git a/code/crates/test/tests/full_proposal_keeper.rs b/code/crates/test/tests/full_proposal_keeper.rs index de98af33e..1cadb0060 100644 --- a/code/crates/test/tests/full_proposal_keeper.rs +++ b/code/crates/test/tests/full_proposal_keeper.rs @@ -1,6 +1,6 @@ use malachite_actors::host::ProposedValue; use malachite_common::{Context, Round, SignedProposal, Validity}; -use malachite_consensus::{FullProposalKeeper, Msg}; +use malachite_consensus::{FullProposalKeeper, Input}; use malachite_test::utils::validators::make_validators; use malachite_test::{Address, Proposal, Value}; use malachite_test::{Height, TestContext}; @@ -32,7 +32,7 @@ macro_rules! prop { macro_rules! prop_msg { ($co:expr, $a:expr, $r:expr, $v:expr, $vr: expr) => { - Msg::Proposal(signed_proposal_pol( + Input::Proposal(signed_proposal_pol( $co, Height::new(1), Round::new($r), @@ -57,7 +57,7 @@ macro_rules! value { macro_rules! val_msg { ( $a:expr, $r:expr, $v:expr, $val: expr) => { - Msg::ReceivedProposedValue(ProposedValue { + Input::ReceivedProposedValue(ProposedValue { height: Height::new(1), round: Round::new($r), value: Value::new($v), @@ -86,7 +86,7 @@ macro_rules! props_for_value { // - fps_for_value - full proposals expected for a given ProposedValue struct Test { desc: &'static str, - input: Vec>, + input: Vec>, some_fp_for_rv: Vec<(i64, u64)>, none_fp_for_rv: Vec<(i64, u64)>, fps_for_value: (ProposedValue, Vec>), @@ -261,8 +261,8 @@ fn full_proposal_keeper_tests() { for m in s.input { match m { - Msg::Proposal(p) => keeper.store_proposal(p), - Msg::ReceivedProposedValue(v) => keeper.store_value(&v), + Input::Proposal(p) => keeper.store_proposal(p), + Input::ReceivedProposedValue(v) => keeper.store_value(&v), _ => continue, } }