Skip to content

Commit

Permalink
chore(code): Cleanup malachite-consensus (#432)
Browse files Browse the repository at this point in the history
* Rename `GossipMsg` to `SignedConsensusMsg`

* Hide some modules from documentation

* Add missing doc comment

* Hide internal `perform!` macro

* Fix link

* Rename `Msg` to `Input`

* Fix link

* Rename `msg_queue` to `input_queue`
  • Loading branch information
romac authored Sep 30, 2024
1 parent 095973e commit 0258a64
Show file tree
Hide file tree
Showing 13 changed files with 104 additions and 81 deletions.
34 changes: 22 additions & 12 deletions code/crates/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub enum Msg<Ctx: Context> {
ReceivedProposedValue(ProposedValue<Ctx>),
}

type InnerMsg<Ctx> = malachite_consensus::Msg<Ctx>;
type ConsensusInput<Ctx> = malachite_consensus::Input<Ctx>;

impl<Ctx: Context> From<TimeoutElapsed<Timeout>> for Msg<Ctx> {
fn from(msg: TimeoutElapsed<Timeout>) -> Self {
Expand Down Expand Up @@ -168,14 +168,14 @@ where
Ok(actor_ref)
}

async fn process_msg(
async fn process_input(
&self,
myself: &ActorRef<Msg<Ctx>>,
state: &mut State<Ctx>,
msg: InnerMsg<Ctx>,
input: ConsensusInput<Ctx>,
) -> Result<(), ActorProcessingErr> {
malachite_consensus::process!(
msg: msg,
input: input,
state: &mut state.consensus,
metrics: &self.metrics,
with: effect => {
Expand All @@ -194,7 +194,11 @@ where
Msg::StartHeight(height) => {
let validator_set = self.get_validator_set(height).await?;
let result = self
.process_msg(&myself, state, InnerMsg::StartHeight(height, validator_set))
.process_input(
&myself,
state,
ConsensusInput::StartHeight(height, validator_set),
)
.await;

if let Err(e) = result {
Expand All @@ -206,7 +210,11 @@ where

Msg::ProposeValue(height, round, value) => {
let result = self
.process_msg(&myself, state, InnerMsg::ProposeValue(height, round, value))
.process_input(
&myself,
state,
ConsensusInput::ProposeValue(height, round, value),
)
.await;

if let Err(e) = result {
Expand Down Expand Up @@ -245,10 +253,10 @@ where
let validator_set = self.get_validator_set(height).await?;

let result = self
.process_msg(
.process_input(
&myself,
state,
InnerMsg::StartHeight(height, validator_set),
ConsensusInput::StartHeight(height, validator_set),
)
.await;

Expand All @@ -273,7 +281,9 @@ where
}

GossipEvent::Vote(from, vote) => {
if let Err(e) = self.process_msg(&myself, state, InnerMsg::Vote(vote)).await
if let Err(e) = self
.process_input(&myself, state, ConsensusInput::Vote(vote))
.await
{
error!(%from, "Error when processing vote: {e:?}");
}
Expand All @@ -283,7 +293,7 @@ where

GossipEvent::Proposal(from, proposal) => {
if let Err(e) = self
.process_msg(&myself, state, InnerMsg::Proposal(proposal))
.process_input(&myself, state, ConsensusInput::Proposal(proposal))
.await
{
error!(%from, "Error when processing proposal: {e:?}");
Expand Down Expand Up @@ -372,7 +382,7 @@ where
}

let result = self
.process_msg(&myself, state, InnerMsg::TimeoutElapsed(timeout))
.process_input(&myself, state, ConsensusInput::TimeoutElapsed(timeout))
.await;

if let Err(e) = result {
Expand All @@ -384,7 +394,7 @@ where

Msg::ReceivedProposedValue(value) => {
let result = self
.process_msg(&myself, state, InnerMsg::ReceivedProposedValue(value))
.process_input(&myself, state, ConsensusInput::ReceivedProposedValue(value))
.await;

if let Err(e) = result {
Expand Down
10 changes: 5 additions & 5 deletions code/crates/actors/src/gossip_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::task::JoinHandle;
use tracing::{debug, error, error_span, Instrument};

use malachite_common::{Context, SignedProposal, SignedVote};
use malachite_consensus::GossipMsg;
use malachite_consensus::SignedConsensusMsg;
use malachite_gossip_consensus::handle::CtrlHandle;
use malachite_gossip_consensus::{Channel, Config, Event, Multiaddr, PeerId};
use malachite_metrics::SharedRegistry;
Expand Down Expand Up @@ -98,8 +98,8 @@ pub enum Msg<Ctx: Context> {
/// Subscribe this actor to receive gossip events
Subscribe(ActorRef<GossipEvent<Ctx>>),

/// Broadcast a gossip message
BroadcastMsg(GossipMsg<Ctx>),
/// Broadcast a signed consensus message
BroadcastMsg(SignedConsensusMsg<Ctx>),

/// Broadcast a proposal part
BroadcastProposalPart(StreamMessage<Ctx::ProposalPart>),
Expand Down Expand Up @@ -225,8 +225,8 @@ where
};

let event = match msg {
GossipMsg::Vote(vote) => GossipEvent::Vote(from, vote),
GossipMsg::Proposal(proposal) => GossipEvent::Proposal(from, proposal),
SignedConsensusMsg::Vote(vote) => GossipEvent::Vote(from, vote),
SignedConsensusMsg::Proposal(proposal) => GossipEvent::Proposal(from, proposal),
};

self.publish(event, subscribers);
Expand Down
6 changes: 3 additions & 3 deletions code/crates/actors/src/util/codec.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use malachite_common::Context;
use malachite_consensus::GossipMsg;
use malachite_consensus::SignedConsensusMsg;
use malachite_gossip_consensus::Bytes;
use malachite_proto::Protobuf;

Expand All @@ -8,8 +8,8 @@ use super::streaming::StreamMessage;
pub trait NetworkCodec<Ctx: Context>: Sync + Send + 'static {
type Error: std::error::Error + Send + Sync + 'static;

fn decode_msg(bytes: Bytes) -> Result<GossipMsg<Ctx>, Self::Error>;
fn encode_msg(msg: GossipMsg<Ctx>) -> Result<Bytes, Self::Error>;
fn decode_msg(bytes: Bytes) -> Result<SignedConsensusMsg<Ctx>, Self::Error>;
fn encode_msg(msg: SignedConsensusMsg<Ctx>) -> Result<Bytes, Self::Error>;

fn decode_stream_msg<T>(bytes: Bytes) -> Result<StreamMessage<T>, Self::Error>
where
Expand Down
6 changes: 3 additions & 3 deletions code/crates/consensus/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use derive_where::derive_where;

use malachite_common::*;

use crate::types::GossipMsg;
use crate::types::SignedConsensusMsg;

/// An effect which may be yielded by a consensus process.
///
/// Effects are handled by the caller using [`process`][process].
/// Effects are handled by the caller using [`process!`][process].
/// After that the consensus computation is then resumed.
///
/// [process]: crate::process
Expand All @@ -32,7 +32,7 @@ where
StartRound(Ctx::Height, Round, Ctx::Address),

/// Broadcast a message
Broadcast(GossipMsg<Ctx>),
Broadcast(SignedConsensusMsg<Ctx>),

/// Get a value to propose at the given height and round, within the given timeout
GetValue(Ctx::Height, Round, Timeout),
Expand Down
49 changes: 28 additions & 21 deletions code/crates/consensus/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,45 +9,45 @@ use malachite_metrics::Metrics;
use crate::effect::Effect;
use crate::error::Error;
use crate::gen::Co;
use crate::msg::Msg;
use crate::input::Input;
use crate::perform;
use crate::state::State;
use crate::types::{GossipMsg, ProposedValue};
use crate::types::{ProposedValue, SignedConsensusMsg};
use crate::util::pretty::{PrettyProposal, PrettyVal, PrettyVote};

pub async fn handle<Ctx>(
co: Co<Ctx>,
state: &mut State<Ctx>,
metrics: &Metrics,
msg: Msg<Ctx>,
input: Input<Ctx>,
) -> Result<(), Error<Ctx>>
where
Ctx: Context,
{
handle_msg(&co, state, metrics, msg).await
handle_msg(&co, state, metrics, input).await
}

#[async_recursion]
async fn handle_msg<Ctx>(
co: &Co<Ctx>,
state: &mut State<Ctx>,
metrics: &Metrics,
msg: Msg<Ctx>,
input: Input<Ctx>,
) -> Result<(), Error<Ctx>>
where
Ctx: Context,
{
match msg {
Msg::StartHeight(height, vs) => {
match input {
Input::StartHeight(height, vs) => {
reset_and_start_height(co, state, metrics, height, vs).await
}
Msg::Vote(vote) => on_vote(co, state, metrics, vote).await,
Msg::Proposal(proposal) => on_proposal(co, state, metrics, proposal).await,
Msg::ProposeValue(height, round, value) => {
Input::Vote(vote) => on_vote(co, state, metrics, vote).await,
Input::Proposal(proposal) => on_proposal(co, state, metrics, proposal).await,
Input::ProposeValue(height, round, value) => {
propose_value(co, state, metrics, height, round, value).await
}
Msg::TimeoutElapsed(timeout) => on_timeout_elapsed(co, state, metrics, timeout).await,
Msg::ReceivedProposedValue(block) => {
Input::TimeoutElapsed(timeout) => on_timeout_elapsed(co, state, metrics, timeout).await,
Input::ReceivedProposedValue(block) => {
on_received_proposed_value(co, state, metrics, block).await
}
}
Expand Down Expand Up @@ -117,7 +117,7 @@ async fn replay_pending_msgs<Ctx>(
where
Ctx: Context,
{
let pending_msgs = std::mem::take(&mut state.msg_queue);
let pending_msgs = std::mem::take(&mut state.input_queue);
debug!("Replaying {} messages", pending_msgs.len());

for pending_msg in pending_msgs {
Expand Down Expand Up @@ -268,7 +268,7 @@ where

perform!(
co,
Effect::Broadcast(GossipMsg::Proposal(signed_proposal.clone()))
Effect::Broadcast(SignedConsensusMsg::Proposal(signed_proposal.clone()))
);

on_proposal(co, state, metrics, signed_proposal).await
Expand All @@ -284,7 +284,10 @@ where

let signed_vote = state.ctx.sign_vote(vote);

perform!(co, Effect::Broadcast(GossipMsg::Vote(signed_vote.clone()),));
perform!(
co,
Effect::Broadcast(SignedConsensusMsg::Vote(signed_vote.clone()))
);

apply_driver_input(co, state, metrics, DriverInput::Vote(signed_vote)).await
}
Expand Down Expand Up @@ -482,7 +485,7 @@ where
"Received vote at round -1, queuing for later"
);

state.msg_queue.push_back(Msg::Vote(signed_vote));
state.input_queue.push_back(Input::Vote(signed_vote));
return Ok(());
}

Expand All @@ -494,7 +497,7 @@ where
"Received vote for higher height, queuing for later"
);

state.msg_queue.push_back(Msg::Vote(signed_vote));
state.input_queue.push_back(Input::Vote(signed_vote));
return Ok(());
}

Expand Down Expand Up @@ -568,13 +571,17 @@ where
// Drop all others.
if state.driver.round() == Round::Nil {
debug!("Received proposal at round -1, queuing for later");
state.msg_queue.push_back(Msg::Proposal(signed_proposal));
state
.input_queue
.push_back(Input::Proposal(signed_proposal));
return Ok(());
}

if state.driver.height() < proposal_height {
debug!("Received proposal for higher height, queuing for later");
state.msg_queue.push_back(Msg::Proposal(signed_proposal));
state
.input_queue
.push_back(Input::Proposal(signed_proposal));
return Ok(());
}

Expand Down Expand Up @@ -675,8 +682,8 @@ where
if state.driver.height() < proposed_value.height {
debug!("Received value for higher height, queuing for later");
state
.msg_queue
.push_back(Msg::ReceivedProposedValue(proposed_value));
.input_queue
.push_back(Input::ReceivedProposedValue(proposed_value));
return Ok(());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::ProposedValue;
use derive_where::derive_where;
use malachite_common::{Context, Round, SignedProposal, SignedVote, Timeout};

/// Messages that can be handled by the consensus process
use crate::types::ProposedValue;

/// Inputs to be handled by the consensus process.
#[derive_where(Clone, Debug, PartialEq, Eq)]
pub enum Msg<Ctx>
pub enum Input<Ctx>
where
Ctx: Context,
{
Expand Down
7 changes: 5 additions & 2 deletions code/crates/consensus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod msg;
pub use msg::Msg;
mod input;
pub use input::Input;

mod state;
pub use state::State;
Expand All @@ -21,11 +21,14 @@ mod handle;
mod macros;
mod util;

// Only used in macros
#[doc(hidden)]
pub mod gen;

// Only used in macros
#[doc(hidden)]
pub use handle::handle;

// Only used internally, but needs to be exposed for tests
#[doc(hidden)]
pub use full_proposal::{FullProposal, FullProposalKeeper};
19 changes: 10 additions & 9 deletions code/crates/consensus/src/macros.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
/// Process a message and handle the emitted effects.
/// Process an input and handle the emitted effects.
///
/// # Example
///
/// ```rust,ignore
///
/// malachite_consensus::process!(
/// // Message to process
/// msg: msg,
/// // Consensus state and metrics
/// state: &mut state, metrics: &metrics,
/// // Input to process
/// input: input,
/// // Consensus state
/// state: &mut state,
/// // Metrics
/// metrics: &metrics,
/// // Effect handler
/// on: effect => handle_effect(myself, &mut timers, &mut timeouts, effect).await
/// on: effect => handle_effect(effect).await
/// )
/// ```
#[macro_export]
macro_rules! process {
(msg: $msg:expr, state: $state:expr, metrics: $metrics:expr, with: $effect:ident => $handle:expr) => {{
let mut gen = $crate::gen::Gen::new(|co| $crate::handle(co, $state, $metrics, $msg));
(input: $input:expr, state: $state:expr, metrics: $metrics:expr, with: $effect:ident => $handle:expr) => {{
let mut gen = $crate::gen::Gen::new(|co| $crate::handle(co, $state, $metrics, $input));

let mut co_result = gen.resume_with(());

Expand Down
1 change: 1 addition & 0 deletions code/crates/consensus/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use malachite_common::Context;

pub use malachite_driver::ThresholdParams;

/// Consensus parameters.
#[derive_where(Clone, Debug)]
pub struct Params<Ctx: Context> {
pub start_height: Ctx::Height,
Expand Down
Loading

0 comments on commit 0258a64

Please sign in to comment.