Skip to content

Commit

Permalink
feat(driver): Turn get_value into an asynchronous step within the s…
Browse files Browse the repository at this point in the history
…tate machine
  • Loading branch information
romac committed Nov 16, 2023
1 parent 8654e59 commit a03adfd
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 107 deletions.
36 changes: 16 additions & 20 deletions Code/driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use malachite_vote::keeper::Message as VoteMessage;
use malachite_vote::keeper::VoteKeeper;
use malachite_vote::Threshold;

use crate::env::Env as DriverEnv;
use crate::event::Event;
use crate::message::Message;
use crate::Error;
Expand All @@ -22,14 +21,12 @@ use crate::Validity;

/// Driver for the state machine of the Malachite consensus engine at a given height.
#[derive(Clone, Debug)]
pub struct Driver<Ctx, Env, PSel>
pub struct Driver<Ctx, PSel>
where
Ctx: Context,
Env: DriverEnv<Ctx>,
PSel: ProposerSelector<Ctx>,
{
pub ctx: Ctx,
pub env: Env,
pub proposer_selector: PSel,

pub height: Ctx::Height,
Expand All @@ -41,15 +38,13 @@ where
pub round_states: BTreeMap<Round, RoundState<Ctx>>,
}

impl<Ctx, Env, PSel> Driver<Ctx, Env, PSel>
impl<Ctx, PSel> Driver<Ctx, PSel>
where
Ctx: Context,
Env: DriverEnv<Ctx>,
PSel: ProposerSelector<Ctx>,
{
pub fn new(
ctx: Ctx,
env: Env,
proposer_selector: PSel,
height: Ctx::Height,
validator_set: Ctx::ValidatorSet,
Expand All @@ -59,7 +54,6 @@ where

Self {
ctx,
env,
proposer_selector,
height,
address,
Expand All @@ -70,10 +64,6 @@ where
}
}

async fn get_value(&self, round: Round) -> Option<Ctx::Value> {
self.env.get_value(self.height.clone(), round).await
}

pub async fn execute(&mut self, msg: Event<Ctx>) -> Result<Option<Message<Ctx>>, Error<Ctx>> {
let round_msg = match self.apply(msg).await? {
Some(msg) => msg,
Expand All @@ -99,6 +89,10 @@ where

RoundMessage::ScheduleTimeout(timeout) => Message::ScheduleTimeout(timeout),

RoundMessage::GetValueAndScheduleTimeout(round, timeout) => {
Message::GetValueAndScheduleTimeout(round, timeout)
}

RoundMessage::Decision(value) => {
// TODO: update the state
Message::Decide(value.round, value.value)
Expand All @@ -111,6 +105,7 @@ where
async fn apply(&mut self, msg: Event<Ctx>) -> Result<Option<RoundMessage<Ctx>>, Error<Ctx>> {
match msg {
Event::NewRound(round) => self.apply_new_round(round).await,
Event::ProposeValue(round, value) => Ok(self.apply_propose_value(round, value).await),
Event::Proposal(proposal, validity) => {
Ok(self.apply_proposal(proposal, validity).await)
}
Expand All @@ -133,14 +128,7 @@ where
.ok_or_else(|| Error::ProposerNotFound(proposer_address.clone()))?;

let event = if proposer.address() == &self.address {
// We are the proposer
// TODO: Schedule propose timeout

let Some(value) = self.get_value(round).await else {
return Err(Error::NoValueToPropose);
};

RoundEvent::NewRoundProposer(value)
RoundEvent::NewRoundProposer
} else {
RoundEvent::NewRound
};
Expand All @@ -155,6 +143,14 @@ where
Ok(self.apply_event(round, event))
}

async fn apply_propose_value(
&mut self,
round: Round,
value: Ctx::Value,
) -> Option<RoundMessage<Ctx>> {
self.apply_event(round, RoundEvent::ProposeValue(value))
}

async fn apply_proposal(
&mut self,
proposal: Ctx::Proposal,
Expand Down
19 changes: 0 additions & 19 deletions Code/driver/src/env.rs

This file was deleted.

10 changes: 10 additions & 0 deletions Code/driver/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,18 @@ pub enum Event<Ctx>
where
Ctx: Context,
{
/// Start a new round
NewRound(Round),

/// Propose a value for the given round
ProposeValue(Round, Ctx::Value),

/// Receive a proposal, of the given validity
Proposal(Ctx::Proposal, Validity),

/// Receive a signed vote
Vote(SignedVote<Ctx>),

/// Receive a timeout
TimeoutElapsed(Timeout),
}
2 changes: 0 additions & 2 deletions Code/driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
extern crate alloc;

mod driver;
mod env;
mod error;
mod event;
mod message;
mod proposer;
mod util;

pub use driver::Driver;
pub use env::Env;
pub use error::Error;
pub use event::Event;
pub use message::Message;
Expand Down
24 changes: 23 additions & 1 deletion Code/driver/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,23 @@ pub enum Message<Ctx>
where
Ctx: Context,
{
/// Start a new round
NewRound(Round),

/// Broadcast a proposal
Propose(Ctx::Proposal),

/// Broadcast a vote for a value
Vote(SignedVote<Ctx>),

/// Decide on a value
Decide(Round, Ctx::Value),

/// Schedule a timeout
ScheduleTimeout(Timeout),
NewRound(Round),

/// Ask for a value to propose and schedule a timeout
GetValueAndScheduleTimeout(Round, Timeout),
}

// NOTE: We have to derive these instances manually, otherwise
Expand All @@ -26,6 +38,9 @@ impl<Ctx: Context> Clone for Message<Ctx> {
Message::Vote(signed_vote) => Message::Vote(signed_vote.clone()),
Message::Decide(round, value) => Message::Decide(*round, value.clone()),
Message::ScheduleTimeout(timeout) => Message::ScheduleTimeout(*timeout),
Message::GetValueAndScheduleTimeout(round, timeout) => {
Message::GetValueAndScheduleTimeout(*round, *timeout)
}
Message::NewRound(round) => Message::NewRound(*round),
}
}
Expand All @@ -39,6 +54,9 @@ impl<Ctx: Context> fmt::Debug for Message<Ctx> {
Message::Vote(signed_vote) => write!(f, "Vote({:?})", signed_vote),
Message::Decide(round, value) => write!(f, "Decide({:?}, {:?})", round, value),
Message::ScheduleTimeout(timeout) => write!(f, "ScheduleTimeout({:?})", timeout),
Message::GetValueAndScheduleTimeout(round, timeout) => {
write!(f, "GetValueAndScheduleTimeout({:?}, {:?})", round, timeout)
}
Message::NewRound(round) => write!(f, "NewRound({:?})", round),
}
}
Expand All @@ -60,6 +78,10 @@ impl<Ctx: Context> PartialEq for Message<Ctx> {
(Message::ScheduleTimeout(timeout), Message::ScheduleTimeout(other_timeout)) => {
timeout == other_timeout
}
(
Message::GetValueAndScheduleTimeout(round, timeout),
Message::GetValueAndScheduleTimeout(other_round, other_timeout),
) => round == other_round && timeout == other_timeout,
(Message::NewRound(round), Message::NewRound(other_round)) => round == other_round,
_ => false,
}
Expand Down
5 changes: 3 additions & 2 deletions Code/round/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ where
Ctx: Context,
{
NewRound, // Start a new round, not as proposer.L20
NewRoundProposer(Ctx::Value), // Start a new round and propose the Value.L14
Proposal(Ctx::Proposal), // Receive a proposal. L22 + L23 (valid)
NewRoundProposer, // Start a new round and wait for a value to propose.L14
ProposeValue(Ctx::Value), // Propose a value.L14
Proposal(Ctx::Proposal), // Receive a proposal. L22 + L23 (valid)
ProposalAndPolkaPrevious(Ctx::Proposal), // Recieved a proposal and a polka value from a previous round. L28 + L29 (valid)
ProposalInvalid, // Receive an invalid proposal. L26 + L32 (invalid)
PolkaValue(ValueId<Ctx>), // Receive +2/3 prevotes for valueId. L44
Expand Down
25 changes: 20 additions & 5 deletions Code/round/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ pub enum Message<Ctx>
where
Ctx: Context,
{
NewRound(Round), // Move to the new round.
Proposal(Ctx::Proposal), // Broadcast the proposal.
Vote(Ctx::Vote), // Broadcast the vote.
ScheduleTimeout(Timeout), // Schedule the timeout.
Decision(RoundValue<Ctx::Value>), // Decide the value.
NewRound(Round), // Move to the new round.
Proposal(Ctx::Proposal), // Broadcast the proposal.
Vote(Ctx::Vote), // Broadcast the vote.
ScheduleTimeout(Timeout), // Schedule the timeout.
GetValueAndScheduleTimeout(Round, Timeout), // Ask for a value and schedule a timeout.
Decision(RoundValue<Ctx::Value>), // Decide the value.
}

impl<Ctx: Context> Message<Ctx> {
Expand Down Expand Up @@ -47,6 +48,10 @@ impl<Ctx: Context> Message<Ctx> {
Message::ScheduleTimeout(Timeout { round, step })
}

pub fn get_value_and_schedule_timeout(round: Round, step: TimeoutStep) -> Self {
Message::GetValueAndScheduleTimeout(round, Timeout { round, step })
}

pub fn decision(round: Round, value: Ctx::Value) -> Self {
Message::Decision(RoundValue { round, value })
}
Expand All @@ -64,6 +69,9 @@ impl<Ctx: Context> Clone for Message<Ctx> {
Message::Proposal(proposal) => Message::Proposal(proposal.clone()),
Message::Vote(vote) => Message::Vote(vote.clone()),
Message::ScheduleTimeout(timeout) => Message::ScheduleTimeout(*timeout),
Message::GetValueAndScheduleTimeout(round, timeout) => {
Message::GetValueAndScheduleTimeout(*round, *timeout)
}
Message::Decision(round_value) => Message::Decision(round_value.clone()),
}
}
Expand All @@ -77,6 +85,9 @@ impl<Ctx: Context> fmt::Debug for Message<Ctx> {
Message::Proposal(proposal) => write!(f, "Proposal({:?})", proposal),
Message::Vote(vote) => write!(f, "Vote({:?})", vote),
Message::ScheduleTimeout(timeout) => write!(f, "ScheduleTimeout({:?})", timeout),
Message::GetValueAndScheduleTimeout(round, timeout) => {
write!(f, "GetValueAndScheduleTimeout({:?}, {:?})", round, timeout)
}
Message::Decision(round_value) => write!(f, "Decision({:?})", round_value),
}
}
Expand All @@ -94,6 +105,10 @@ impl<Ctx: Context> PartialEq for Message<Ctx> {
(Message::ScheduleTimeout(timeout), Message::ScheduleTimeout(other_timeout)) => {
timeout == other_timeout
}
(
Message::GetValueAndScheduleTimeout(round, timeout),
Message::GetValueAndScheduleTimeout(other_round, other_timeout),
) => round == other_round && timeout == other_timeout,
(Message::Decision(round_value), Message::Decision(other_round_value)) => {
round_value == other_round_value
}
Expand Down
20 changes: 18 additions & 2 deletions Code/round/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,16 @@ where

match (state.step, event) {
// From NewRound. Event must be for current round.
(Step::NewRound, Event::NewRoundProposer(value)) if this_round => {
propose(state, data.height, value) // L11/L14
(Step::NewRound, Event::NewRoundProposer) if this_round => {
get_value_and_schedule_timeout_propose(state) // L18
}
(Step::NewRound, Event::NewRound) if this_round => schedule_timeout_propose(state), // L11/L20

// From Propose. Event must be for current round.
(Step::Propose, Event::ProposeValue(value)) if this_round => {
propose(state, data.height, value) // L11/L14
}

(Step::Propose, Event::Proposal(proposal))
if this_round && proposal.pol_round().is_nil() =>
{
Expand Down Expand Up @@ -131,6 +135,18 @@ where
// Propose
//---------------------------------------------------------------------

/// We are the proposer, but don't have a value yet; schedule timeout propose
/// and ask for a value.
///
/// Ref: L18
pub fn get_value_and_schedule_timeout_propose<Ctx>(state: State<Ctx>) -> Transition<Ctx>
where
Ctx: Context,
{
let timeout = Message::get_value_and_schedule_timeout(state.round, TimeoutStep::Propose);
Transition::to(state.with_step(Step::Propose)).with_message(timeout)
}

/// We are the proposer; propose the valid value if it exists,
/// otherwise propose the given value.
///
Expand Down
25 changes: 0 additions & 25 deletions Code/test/src/env.rs

This file was deleted.

2 changes: 0 additions & 2 deletions Code/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

mod context;
mod env;
mod height;
mod proposal;
mod signing;
Expand All @@ -12,7 +11,6 @@ mod value;
mod vote;

pub use crate::context::*;
pub use crate::env::*;
pub use crate::height::*;
pub use crate::proposal::*;
pub use crate::signing::*;
Expand Down
Loading

0 comments on commit a03adfd

Please sign in to comment.