Skip to content

Commit

Permalink
chore(code): Embed Resumable type inside each Effect for creating…
Browse files Browse the repository at this point in the history
… resumption value (#683)

* chore(code): Embed `Resumable` type inside each `Effect` to provide better type-safety for resuming after performing an effect

* Remove the height from the `GetValidatorSet` resumption

* Add a few more doc comments, cleanup
  • Loading branch information
romac authored Dec 16, 2024
1 parent 0a98050 commit 08a19d8
Show file tree
Hide file tree
Showing 16 changed files with 271 additions and 99 deletions.
174 changes: 153 additions & 21 deletions code/crates/consensus/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,36 @@ use crate::input::RequestId;
use crate::types::SignedConsensusMsg;
use crate::ConsensusMsg;

/// Provides a way to construct the appropriate [`Resume`] value to
/// resume execution after handling an [`Effect`].
///
/// Eeach `Effect` embeds a value that implements [`Resumable`]
/// which is used to construct the appropriate [`Resume`] value.
///
/// ## Example
///
/// ```rust,ignore
/// fn effect_handler(effect: Effect<Ctx>) -> Result<Resume<Ctx>, Error> {
/// match effect {
/// Effect::ResetTimeouts(r) => {
/// reset_timeouts();
/// Ok(r.resume_with(()))
/// }
/// Effect::GetValidatorSet(height, r) => {)
/// let validator_set = get_validator_set(height);
/// Ok(r.resume_with(validator_set))
/// }
/// // ...
/// }
/// ```
pub trait Resumable<Ctx: Context> {
/// The value type that will be used to resume execution
type Value;

/// Creates the appropriate [`Resume`] value to resume execution with.
fn resume_with(self, value: Self::Value) -> Resume<Ctx>;
}

/// An effect which may be yielded by a consensus process.
///
/// Effects are handled by the caller using [`process!`][process]
Expand All @@ -21,71 +51,102 @@ where
{
/// Reset all timeouts
/// Resume with: [`Resume::Continue`]
ResetTimeouts,
ResetTimeouts(resume::Continue),

/// Cancel all timeouts
/// Resume with: [`Resume::Continue`]
CancelAllTimeouts,
CancelAllTimeouts(resume::Continue),

/// Cancel a given timeout
/// Resume with: [`Resume::Continue`]
CancelTimeout(Timeout),
CancelTimeout(Timeout, resume::Continue),

/// Schedule a timeout
/// Resume with: [`Resume::Continue`]
ScheduleTimeout(Timeout),
ScheduleTimeout(Timeout, resume::Continue),

/// Consensus is starting a new round with the given proposer
/// Resume with: [`Resume::Continue`]
StartRound(Ctx::Height, Round, Ctx::Address),
StartRound(Ctx::Height, Round, Ctx::Address, resume::Continue),

/// Broadcast a message
/// Resume with: [`Resume::Continue`]
Broadcast(SignedConsensusMsg<Ctx>),
Broadcast(SignedConsensusMsg<Ctx>, resume::Continue),

/// Get a value to propose at the given height and round, within the given timeout
/// Resume with: [`Resume::Continue`]
GetValue(Ctx::Height, Round, Timeout),
GetValue(Ctx::Height, Round, Timeout, resume::Continue),

/// Restream value at the given height, round and valid round
/// Restream the value identified by the given information.
/// Resume with: [`Resume::Continue`]
RestreamValue(Ctx::Height, Round, Round, Ctx::Address, ValueId<Ctx>),
RestreamValue(
/// Height of the value
Ctx::Height,
/// Round of the value
Round,
/// Valid round of the value
Round,
/// Address of the proposer for that value
Ctx::Address,
/// Value ID of the value to restream
ValueId<Ctx>,
/// For resumption
resume::Continue,
),

/// Get the validator set at the given height
/// Resume with: [`Resume::ValidatorSet`]
GetValidatorSet(Ctx::Height),
GetValidatorSet(Ctx::Height, resume::ValidatorSet),

/// Consensus has decided on a value
/// Resume with: [`Resume::Continue`]
Decide { certificate: CommitCertificate<Ctx> },
Decide(CommitCertificate<Ctx>, resume::Continue),

/// Consensus has been stuck in Prevote or Precommit step, ask for vote sets from peers
/// Resume with: [`Resume::Continue`]
GetVoteSet(Ctx::Height, Round),
GetVoteSet(Ctx::Height, Round, resume::Continue),

/// A peer has required our vote set, send the response
SendVoteSetResponse(RequestId, Ctx::Height, Round, VoteSet<Ctx>),
/// Resume with: [`Resume::Continue`]`
SendVoteSetResponse(
RequestId,
Ctx::Height,
Round,
VoteSet<Ctx>,
resume::Continue,
),

/// Persist a consensus message in the Write-Ahead Log for crash recovery
PersistMessage(SignedConsensusMsg<Ctx>),
/// Resume with: [`Resume::Continue`]`
PersistMessage(SignedConsensusMsg<Ctx>, resume::Continue),

/// Persist a timeout in the Write-Ahead Log for crash recovery
PersistTimeout(Timeout),
/// Resume with: [`Resume::Continue`]`
PersistTimeout(Timeout, resume::Continue),

/// Sign a vote with this node's private key
/// Resume with: [`Resume::SignedVote`]
SignVote(Ctx::Vote),
SignVote(Ctx::Vote, resume::SignedVote),

/// Sign a proposal with this node's private key
/// Resume with: [`Resume::SignedProposal`]
SignProposal(Ctx::Proposal),
SignProposal(Ctx::Proposal, resume::SignedProposal),

/// Verify a signature
/// Resume with: [`Resume::SignatureValidity`]
VerifySignature(SignedMessage<Ctx, ConsensusMsg<Ctx>>, PublicKey<Ctx>),
VerifySignature(
SignedMessage<Ctx, ConsensusMsg<Ctx>>,
PublicKey<Ctx>,
resume::SignatureValidity,
),

/// Verify a commit certificate
VerifyCertificate(CommitCertificate<Ctx>, Ctx::ValidatorSet, ThresholdParams),
VerifyCertificate(
CommitCertificate<Ctx>,
Ctx::ValidatorSet,
ThresholdParams,
resume::CertificateValidity,
),
}

/// A value with which the consensus process can be resumed after yielding an [`Effect`].
Expand All @@ -103,8 +164,9 @@ where
/// Resume execution
Continue,

/// Resume execution with an optional validator set at the given height
ValidatorSet(Ctx::Height, Option<Ctx::ValidatorSet>),
/// Resume execution with `Some(Ctx::ValidatorSet)` if a validator set
/// was successfully fetched, or `None` otherwise.
ValidatorSet(Option<Ctx::ValidatorSet>),

/// Resume execution with the validity of the signature
SignatureValidity(bool),
Expand All @@ -118,3 +180,73 @@ where
/// Resume execution with the result of the verification of the [`CommitCertificate`]
CertificateValidity(Result<(), CertificateError<Ctx>>),
}

pub mod resume {
use super::*;

#[derive(Debug, Default)]
pub struct Continue;

impl<Ctx: Context> Resumable<Ctx> for Continue {
type Value = ();

fn resume_with(self, _: ()) -> Resume<Ctx> {
Resume::Continue
}
}

#[derive(Debug, Default)]
pub struct ValidatorSet;

impl<Ctx: Context> Resumable<Ctx> for ValidatorSet {
type Value = Option<Ctx::ValidatorSet>;

fn resume_with(self, value: Self::Value) -> Resume<Ctx> {
Resume::ValidatorSet(value)
}
}

#[derive(Debug, Default)]
pub struct SignatureValidity;

impl<Ctx: Context> Resumable<Ctx> for SignatureValidity {
type Value = bool;

fn resume_with(self, value: Self::Value) -> Resume<Ctx> {
Resume::SignatureValidity(value)
}
}

#[derive(Debug, Default)]
pub struct SignedVote;

impl<Ctx: Context> Resumable<Ctx> for SignedVote {
type Value = SignedMessage<Ctx, Ctx::Vote>;

fn resume_with(self, value: Self::Value) -> Resume<Ctx> {
Resume::SignedVote(value)
}
}

#[derive(Debug, Default)]
pub struct SignedProposal;

impl<Ctx: Context> Resumable<Ctx> for SignedProposal {
type Value = SignedMessage<Ctx, Ctx::Proposal>;

fn resume_with(self, a: Self::Value) -> Resume<Ctx> {
Resume::SignedProposal(a)
}
}

#[derive(Debug, Default)]
pub struct CertificateValidity;

impl<Ctx: Context> Resumable<Ctx> for CertificateValidity {
type Value = Result<(), CertificateError<Ctx>>;

fn resume_with(self, value: Self::Value) -> Resume<Ctx> {
Resume::CertificateValidity(value)
}
}
}
2 changes: 2 additions & 0 deletions code/crates/consensus/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ use crate::effect::Resume;
/// The types of error that can be emitted by the consensus process.
#[derive_where(Debug)]
#[derive(thiserror::Error)]
#[allow(private_interfaces)]
pub enum Error<Ctx>
where
Ctx: Context,
{
/// The consensus process was resumed with a value which
/// does not match the expected type of resume value.
#[allow(private_interfaces)]
#[error("Unexpected resume: {0:?}, expected one of: {1}")]
UnexpectedResume(Resume<Ctx>, &'static str),

Expand Down
5 changes: 4 additions & 1 deletion code/crates/consensus/src/gen.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use genawaiter::sync as gen;
use genawaiter::GeneratorState;

use crate::{Effect, Error, Resume};
use crate::effect::{Effect, Resume};
use crate::error::Error;

pub use gen::Gen;

#[allow(private_interfaces)]
pub type Co<Ctx> = gen::Co<Effect<Ctx>, Resume<Ctx>>;

pub type CoResult<Ctx> = GeneratorState<Effect<Ctx>, Result<(), Error<Ctx>>>;
1 change: 1 addition & 0 deletions code/crates/consensus/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use timeout::on_timeout_elapsed;
use vote::on_vote;
use vote_set::{on_vote_set_request, on_vote_set_response};

#[allow(private_interfaces)]
pub async fn handle<Ctx>(
co: Co<Ctx>,
state: &mut State<Ctx>,
Expand Down
2 changes: 1 addition & 1 deletion code/crates/consensus/src/handle/decide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ where
CommitCertificate::new(height, proposal_round, value.id(), commits)
});

perform!(co, Effect::Decide { certificate });
perform!(co, Effect::Decide(certificate, Default::default()));

// Reinitialize to remove any previous round or equivocating precommits.
// TODO: Revise when evidence module is added.
Expand Down
Loading

0 comments on commit 08a19d8

Please sign in to comment.