From a03adfdc1b984935bc3f8343b885534747926e75 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 14 Nov 2023 14:18:27 +0100 Subject: [PATCH] feat(driver): Turn `get_value` into an asynchronous step within the state machine --- Code/driver/src/driver.rs | 36 ++++++++--------- Code/driver/src/env.rs | 19 --------- Code/driver/src/event.rs | 10 +++++ Code/driver/src/lib.rs | 2 - Code/driver/src/message.rs | 24 +++++++++++- Code/round/src/events.rs | 5 ++- Code/round/src/message.rs | 25 +++++++++--- Code/round/src/state_machine.rs | 20 +++++++++- Code/test/src/env.rs | 25 ------------ Code/test/src/lib.rs | 2 - Code/test/tests/driver.rs | 68 +++++++++++++++++++-------------- Code/test/tests/round.rs | 10 ++++- 12 files changed, 139 insertions(+), 107 deletions(-) delete mode 100644 Code/driver/src/env.rs delete mode 100644 Code/test/src/env.rs diff --git a/Code/driver/src/driver.rs b/Code/driver/src/driver.rs index d7e044fe4..6741d0259 100644 --- a/Code/driver/src/driver.rs +++ b/Code/driver/src/driver.rs @@ -13,7 +13,6 @@ use malachite_vote::keeper::Message as VoteMessage; use malachite_vote::keeper::VoteKeeper; use malachite_vote::Threshold; -use crate::env::Env as DriverEnv; use crate::event::Event; use crate::message::Message; use crate::Error; @@ -22,14 +21,12 @@ use crate::Validity; /// Driver for the state machine of the Malachite consensus engine at a given height. #[derive(Clone, Debug)] -pub struct Driver +pub struct Driver where Ctx: Context, - Env: DriverEnv, PSel: ProposerSelector, { pub ctx: Ctx, - pub env: Env, pub proposer_selector: PSel, pub height: Ctx::Height, @@ -41,15 +38,13 @@ where pub round_states: BTreeMap>, } -impl Driver +impl Driver where Ctx: Context, - Env: DriverEnv, PSel: ProposerSelector, { pub fn new( ctx: Ctx, - env: Env, proposer_selector: PSel, height: Ctx::Height, validator_set: Ctx::ValidatorSet, @@ -59,7 +54,6 @@ where Self { ctx, - env, proposer_selector, height, address, @@ -70,10 +64,6 @@ where } } - async fn get_value(&self, round: Round) -> Option { - self.env.get_value(self.height.clone(), round).await - } - pub async fn execute(&mut self, msg: Event) -> Result>, Error> { let round_msg = match self.apply(msg).await? { Some(msg) => msg, @@ -99,6 +89,10 @@ where RoundMessage::ScheduleTimeout(timeout) => Message::ScheduleTimeout(timeout), + RoundMessage::GetValueAndScheduleTimeout(round, timeout) => { + Message::GetValueAndScheduleTimeout(round, timeout) + } + RoundMessage::Decision(value) => { // TODO: update the state Message::Decide(value.round, value.value) @@ -111,6 +105,7 @@ where async fn apply(&mut self, msg: Event) -> Result>, Error> { match msg { Event::NewRound(round) => self.apply_new_round(round).await, + Event::ProposeValue(round, value) => Ok(self.apply_propose_value(round, value).await), Event::Proposal(proposal, validity) => { Ok(self.apply_proposal(proposal, validity).await) } @@ -133,14 +128,7 @@ where .ok_or_else(|| Error::ProposerNotFound(proposer_address.clone()))?; let event = if proposer.address() == &self.address { - // We are the proposer - // TODO: Schedule propose timeout - - let Some(value) = self.get_value(round).await else { - return Err(Error::NoValueToPropose); - }; - - RoundEvent::NewRoundProposer(value) + RoundEvent::NewRoundProposer } else { RoundEvent::NewRound }; @@ -155,6 +143,14 @@ where Ok(self.apply_event(round, event)) } + async fn apply_propose_value( + &mut self, + round: Round, + value: Ctx::Value, + ) -> Option> { + self.apply_event(round, RoundEvent::ProposeValue(value)) + } + async fn apply_proposal( &mut self, proposal: Ctx::Proposal, diff --git a/Code/driver/src/env.rs b/Code/driver/src/env.rs deleted file mode 100644 index 2915da387..000000000 --- a/Code/driver/src/env.rs +++ /dev/null @@ -1,19 +0,0 @@ -use alloc::boxed::Box; - -use async_trait::async_trait; - -use malachite_common::{Context, Round}; - -/// Environment for use by the [`Driver`](crate::Driver) to ask -/// for a value to propose and validate proposals. -#[async_trait] -pub trait Env -where - Ctx: Context, -{ - /// Get the value to propose for the given height and round. - /// - /// If `None` is returned, the driver will understand this - /// as an error and will not propose a value. - async fn get_value(&self, height: Ctx::Height, round: Round) -> Option; -} diff --git a/Code/driver/src/event.rs b/Code/driver/src/event.rs index d75dfab5c..c3b411df5 100644 --- a/Code/driver/src/event.rs +++ b/Code/driver/src/event.rs @@ -8,8 +8,18 @@ pub enum Event where Ctx: Context, { + /// Start a new round NewRound(Round), + + /// Propose a value for the given round + ProposeValue(Round, Ctx::Value), + + /// Receive a proposal, of the given validity Proposal(Ctx::Proposal, Validity), + + /// Receive a signed vote Vote(SignedVote), + + /// Receive a timeout TimeoutElapsed(Timeout), } diff --git a/Code/driver/src/lib.rs b/Code/driver/src/lib.rs index 62401f55e..6fba7865f 100644 --- a/Code/driver/src/lib.rs +++ b/Code/driver/src/lib.rs @@ -15,7 +15,6 @@ extern crate alloc; mod driver; -mod env; mod error; mod event; mod message; @@ -23,7 +22,6 @@ mod proposer; mod util; pub use driver::Driver; -pub use env::Env; pub use error::Error; pub use event::Event; pub use message::Message; diff --git a/Code/driver/src/message.rs b/Code/driver/src/message.rs index 89a56333b..fd21fc569 100644 --- a/Code/driver/src/message.rs +++ b/Code/driver/src/message.rs @@ -7,11 +7,23 @@ pub enum Message where Ctx: Context, { + /// Start a new round + NewRound(Round), + + /// Broadcast a proposal Propose(Ctx::Proposal), + + /// Broadcast a vote for a value Vote(SignedVote), + + /// Decide on a value Decide(Round, Ctx::Value), + + /// Schedule a timeout ScheduleTimeout(Timeout), - NewRound(Round), + + /// Ask for a value to propose and schedule a timeout + GetValueAndScheduleTimeout(Round, Timeout), } // NOTE: We have to derive these instances manually, otherwise @@ -26,6 +38,9 @@ impl Clone for Message { Message::Vote(signed_vote) => Message::Vote(signed_vote.clone()), Message::Decide(round, value) => Message::Decide(*round, value.clone()), Message::ScheduleTimeout(timeout) => Message::ScheduleTimeout(*timeout), + Message::GetValueAndScheduleTimeout(round, timeout) => { + Message::GetValueAndScheduleTimeout(*round, *timeout) + } Message::NewRound(round) => Message::NewRound(*round), } } @@ -39,6 +54,9 @@ impl fmt::Debug for Message { Message::Vote(signed_vote) => write!(f, "Vote({:?})", signed_vote), Message::Decide(round, value) => write!(f, "Decide({:?}, {:?})", round, value), Message::ScheduleTimeout(timeout) => write!(f, "ScheduleTimeout({:?})", timeout), + Message::GetValueAndScheduleTimeout(round, timeout) => { + write!(f, "GetValueAndScheduleTimeout({:?}, {:?})", round, timeout) + } Message::NewRound(round) => write!(f, "NewRound({:?})", round), } } @@ -60,6 +78,10 @@ impl PartialEq for Message { (Message::ScheduleTimeout(timeout), Message::ScheduleTimeout(other_timeout)) => { timeout == other_timeout } + ( + Message::GetValueAndScheduleTimeout(round, timeout), + Message::GetValueAndScheduleTimeout(other_round, other_timeout), + ) => round == other_round && timeout == other_timeout, (Message::NewRound(round), Message::NewRound(other_round)) => round == other_round, _ => false, } diff --git a/Code/round/src/events.rs b/Code/round/src/events.rs index a0e1e4bba..544edfc00 100644 --- a/Code/round/src/events.rs +++ b/Code/round/src/events.rs @@ -6,8 +6,9 @@ where Ctx: Context, { NewRound, // Start a new round, not as proposer.L20 - NewRoundProposer(Ctx::Value), // Start a new round and propose the Value.L14 - Proposal(Ctx::Proposal), // Receive a proposal. L22 + L23 (valid) + NewRoundProposer, // Start a new round and wait for a value to propose.L14 + ProposeValue(Ctx::Value), // Propose a value.L14 + Proposal(Ctx::Proposal), // Receive a proposal. L22 + L23 (valid) ProposalAndPolkaPrevious(Ctx::Proposal), // Recieved a proposal and a polka value from a previous round. L28 + L29 (valid) ProposalInvalid, // Receive an invalid proposal. L26 + L32 (invalid) PolkaValue(ValueId), // Receive +2/3 prevotes for valueId. L44 diff --git a/Code/round/src/message.rs b/Code/round/src/message.rs index 877677f9e..fc5d8f8f5 100644 --- a/Code/round/src/message.rs +++ b/Code/round/src/message.rs @@ -8,11 +8,12 @@ pub enum Message where Ctx: Context, { - NewRound(Round), // Move to the new round. - Proposal(Ctx::Proposal), // Broadcast the proposal. - Vote(Ctx::Vote), // Broadcast the vote. - ScheduleTimeout(Timeout), // Schedule the timeout. - Decision(RoundValue), // Decide the value. + NewRound(Round), // Move to the new round. + Proposal(Ctx::Proposal), // Broadcast the proposal. + Vote(Ctx::Vote), // Broadcast the vote. + ScheduleTimeout(Timeout), // Schedule the timeout. + GetValueAndScheduleTimeout(Round, Timeout), // Ask for a value and schedule a timeout. + Decision(RoundValue), // Decide the value. } impl Message { @@ -47,6 +48,10 @@ impl Message { Message::ScheduleTimeout(Timeout { round, step }) } + pub fn get_value_and_schedule_timeout(round: Round, step: TimeoutStep) -> Self { + Message::GetValueAndScheduleTimeout(round, Timeout { round, step }) + } + pub fn decision(round: Round, value: Ctx::Value) -> Self { Message::Decision(RoundValue { round, value }) } @@ -64,6 +69,9 @@ impl Clone for Message { Message::Proposal(proposal) => Message::Proposal(proposal.clone()), Message::Vote(vote) => Message::Vote(vote.clone()), Message::ScheduleTimeout(timeout) => Message::ScheduleTimeout(*timeout), + Message::GetValueAndScheduleTimeout(round, timeout) => { + Message::GetValueAndScheduleTimeout(*round, *timeout) + } Message::Decision(round_value) => Message::Decision(round_value.clone()), } } @@ -77,6 +85,9 @@ impl fmt::Debug for Message { Message::Proposal(proposal) => write!(f, "Proposal({:?})", proposal), Message::Vote(vote) => write!(f, "Vote({:?})", vote), Message::ScheduleTimeout(timeout) => write!(f, "ScheduleTimeout({:?})", timeout), + Message::GetValueAndScheduleTimeout(round, timeout) => { + write!(f, "GetValueAndScheduleTimeout({:?}, {:?})", round, timeout) + } Message::Decision(round_value) => write!(f, "Decision({:?})", round_value), } } @@ -94,6 +105,10 @@ impl PartialEq for Message { (Message::ScheduleTimeout(timeout), Message::ScheduleTimeout(other_timeout)) => { timeout == other_timeout } + ( + Message::GetValueAndScheduleTimeout(round, timeout), + Message::GetValueAndScheduleTimeout(other_round, other_timeout), + ) => round == other_round && timeout == other_timeout, (Message::Decision(round_value), Message::Decision(other_round_value)) => { round_value == other_round_value } diff --git a/Code/round/src/state_machine.rs b/Code/round/src/state_machine.rs index cf51be128..8e28fef7c 100644 --- a/Code/round/src/state_machine.rs +++ b/Code/round/src/state_machine.rs @@ -61,12 +61,16 @@ where match (state.step, event) { // From NewRound. Event must be for current round. - (Step::NewRound, Event::NewRoundProposer(value)) if this_round => { - propose(state, data.height, value) // L11/L14 + (Step::NewRound, Event::NewRoundProposer) if this_round => { + get_value_and_schedule_timeout_propose(state) // L18 } (Step::NewRound, Event::NewRound) if this_round => schedule_timeout_propose(state), // L11/L20 // From Propose. Event must be for current round. + (Step::Propose, Event::ProposeValue(value)) if this_round => { + propose(state, data.height, value) // L11/L14 + } + (Step::Propose, Event::Proposal(proposal)) if this_round && proposal.pol_round().is_nil() => { @@ -131,6 +135,18 @@ where // Propose //--------------------------------------------------------------------- +/// We are the proposer, but don't have a value yet; schedule timeout propose +/// and ask for a value. +/// +/// Ref: L18 +pub fn get_value_and_schedule_timeout_propose(state: State) -> Transition +where + Ctx: Context, +{ + let timeout = Message::get_value_and_schedule_timeout(state.round, TimeoutStep::Propose); + Transition::to(state.with_step(Step::Propose)).with_message(timeout) +} + /// We are the proposer; propose the valid value if it exists, /// otherwise propose the given value. /// diff --git a/Code/test/src/env.rs b/Code/test/src/env.rs deleted file mode 100644 index af79543a4..000000000 --- a/Code/test/src/env.rs +++ /dev/null @@ -1,25 +0,0 @@ -use async_trait::async_trait; - -use malachite_common::Round; -use malachite_driver::Env; - -use crate::{Height, TestContext, Value}; - -pub struct TestEnv { - get_value: Box Option + Send + Sync>, -} - -impl TestEnv { - pub fn new(get_value: impl Fn(Height, Round) -> Option + Send + Sync + 'static) -> Self { - Self { - get_value: Box::new(get_value), - } - } -} - -#[async_trait] -impl Env for TestEnv { - async fn get_value(&self, height: Height, round: Round) -> Option { - (self.get_value)(height, round) - } -} diff --git a/Code/test/src/lib.rs b/Code/test/src/lib.rs index 8e09b3a31..181ccd7ce 100644 --- a/Code/test/src/lib.rs +++ b/Code/test/src/lib.rs @@ -3,7 +3,6 @@ #![cfg_attr(coverage_nightly, feature(coverage_attribute))] mod context; -mod env; mod height; mod proposal; mod signing; @@ -12,7 +11,6 @@ mod value; mod vote; pub use crate::context::*; -pub use crate::env::*; pub use crate::height::*; pub use crate::proposal::*; pub use crate::signing::*; diff --git a/Code/test/tests/driver.rs b/Code/test/tests/driver.rs index 81588032b..33aa4dcd7 100644 --- a/Code/test/tests/driver.rs +++ b/Code/test/tests/driver.rs @@ -2,12 +2,11 @@ use futures::executor::block_on; use rand::rngs::StdRng; use rand::SeedableRng; -use malachite_common::{Round, Timeout}; +use malachite_common::{Round, Timeout, TimeoutStep}; use malachite_driver::{Driver, Error, Event, Message, ProposerSelector, Validity}; use malachite_round::state::{RoundValue, State, Step}; use malachite_test::{ - Address, Height, PrivateKey, Proposal, TestContext, TestEnv, Validator, ValidatorSet, Value, - Vote, + Address, Height, PrivateKey, Proposal, TestContext, Validator, ValidatorSet, Value, Vote, }; struct TestStep { @@ -20,12 +19,13 @@ struct TestStep { fn to_input_msg(output: Message) -> Option> { match output { + Message::NewRound(round) => Some(Event::NewRound(round)), // Let's consider our own proposal to always be valid Message::Propose(p) => Some(Event::Proposal(p, Validity::Valid)), Message::Vote(v) => Some(Event::Vote(v)), Message::Decide(_, _) => None, Message::ScheduleTimeout(_) => None, - Message::NewRound(round) => Some(Event::NewRound(round)), + Message::GetValueAndScheduleTimeout(_, _) => None, } } @@ -64,7 +64,6 @@ fn driver_steps_proposer() { let value = Value::new(9999); let sel = RotateProposer::default(); - let env = TestEnv::new(move |_, _| Some(value)); let mut rng = StdRng::seed_from_u64(0x42); @@ -85,14 +84,31 @@ fn driver_steps_proposer() { let ctx = TestContext::new(my_sk.clone()); let vs = ValidatorSet::new(vec![v1, v2.clone(), v3.clone()]); - let mut driver = Driver::new(ctx, env, sel, Height::new(1), vs, my_addr); + let mut driver = Driver::new(ctx, sel, Height::new(1), vs, my_addr); let proposal = Proposal::new(Height::new(1), Round::new(0), value, Round::new(-1)); let steps = vec![ TestStep { - desc: "Start round 0, we are proposer, propose value", + desc: "Start round 0, we are proposer, ask for a value to propose", input_event: Some(Event::NewRound(Round::new(0))), + expected_output: Some(Message::GetValueAndScheduleTimeout( + Round::new(0), + Timeout::new(Round::new(0), TimeoutStep::Propose), + )), + expected_round: Round::new(0), + new_state: State { + height: Height::new(1), + round: Round::new(0), + step: Step::Propose, + proposal: None, + locked: None, + valid: None, + }, + }, + TestStep { + desc: "Feed a value to propose, propose that value", + input_event: Some(Event::ProposeValue(Round::new(0), value)), expected_output: Some(Message::Propose(proposal.clone())), expected_round: Round::new(0), new_state: State { @@ -272,7 +288,6 @@ fn driver_steps_not_proposer_valid() { let value = Value::new(9999); let sel = RotateProposer::default(); - let env = TestEnv::new(move |_, _| Some(value)); let mut rng = StdRng::seed_from_u64(0x42); @@ -294,7 +309,7 @@ fn driver_steps_not_proposer_valid() { let ctx = TestContext::new(my_sk.clone()); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); - let mut driver = Driver::new(ctx, env, sel, Height::new(1), vs, my_addr); + let mut driver = Driver::new(ctx, sel, Height::new(1), vs, my_addr); let proposal = Proposal::new(Height::new(1), Round::new(0), value, Round::new(-1)); @@ -481,7 +496,6 @@ fn driver_steps_not_proposer_invalid() { let value = Value::new(9999); let sel = RotateProposer::default(); - let env = TestEnv::new(move |_, _| Some(value)); let mut rng = StdRng::seed_from_u64(0x42); @@ -503,7 +517,7 @@ fn driver_steps_not_proposer_invalid() { let ctx = TestContext::new(my_sk.clone()); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); - let mut driver = Driver::new(ctx, env, sel, Height::new(1), vs, my_addr); + let mut driver = Driver::new(ctx, sel, Height::new(1), vs, my_addr); let proposal = Proposal::new(Height::new(1), Round::new(0), value, Round::new(-1)); @@ -628,7 +642,6 @@ fn driver_steps_not_proposer_timeout_multiple_rounds() { let value = Value::new(9999); let sel = RotateProposer::default(); - let env = TestEnv::new(move |_, _| Some(value)); let mut rng = StdRng::seed_from_u64(0x42); @@ -650,7 +663,7 @@ fn driver_steps_not_proposer_timeout_multiple_rounds() { let ctx = TestContext::new(my_sk.clone()); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); - let mut driver = Driver::new(ctx, env, sel, Height::new(1), vs, my_addr); + let mut driver = Driver::new(ctx, sel, Height::new(1), vs, my_addr); let steps = vec![ // Start round 0, we, v3, are not the proposer @@ -842,7 +855,6 @@ fn driver_steps_not_proposer_timeout_multiple_rounds() { #[test] fn driver_steps_no_value_to_propose() { // No value to propose - let env = TestEnv::new(|_, _| None); let mut rng = StdRng::seed_from_u64(0x42); @@ -861,18 +873,22 @@ fn driver_steps_no_value_to_propose() { let sel = FixedProposer::new(v1.address); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); - let mut driver = Driver::new(ctx, env, sel, Height::new(1), vs, my_addr); + let mut driver = Driver::new(ctx, sel, Height::new(1), vs, my_addr); - let output = block_on(driver.execute(Event::NewRound(Round::new(0)))); - assert_eq!(output, Err(Error::NoValueToPropose)); + let output = + block_on(driver.execute(Event::NewRound(Round::new(0)))).expect("execute succeeded"); + + assert_eq!( + output, + Some(Message::GetValueAndScheduleTimeout( + Round::new(0), + Timeout::propose(Round::new(0)) + )) + ); } #[test] fn driver_steps_proposer_not_found() { - let value = Value::new(9999); - - let env = TestEnv::new(move |_, _| Some(value)); - let mut rng = StdRng::seed_from_u64(0x42); let sk1 = PrivateKey::generate(&mut rng); @@ -892,7 +908,7 @@ fn driver_steps_proposer_not_found() { let sel = FixedProposer::new(v1.address); let vs = ValidatorSet::new(vec![v2.clone(), v3.clone()]); - let mut driver = Driver::new(ctx, env, sel, Height::new(1), vs, my_addr); + let mut driver = Driver::new(ctx, sel, Height::new(1), vs, my_addr); let output = block_on(driver.execute(Event::NewRound(Round::new(0)))); assert_eq!(output, Err(Error::ProposerNotFound(v1.address))); @@ -902,8 +918,6 @@ fn driver_steps_proposer_not_found() { fn driver_steps_validator_not_found() { let value = Value::new(9999); - let env = TestEnv::new(move |_, _| Some(value)); - let mut rng = StdRng::seed_from_u64(0x42); let sk1 = PrivateKey::generate(&mut rng); @@ -922,7 +936,7 @@ fn driver_steps_validator_not_found() { // We omit v2 from the validator set let vs = ValidatorSet::new(vec![v1.clone(), v3.clone()]); - let mut driver = Driver::new(ctx, env, sel, Height::new(1), vs, my_addr); + let mut driver = Driver::new(ctx, sel, Height::new(1), vs, my_addr); // Start new round block_on(driver.execute(Event::NewRound(Round::new(0)))).expect("execute succeeded"); @@ -939,8 +953,6 @@ fn driver_steps_validator_not_found() { fn driver_steps_invalid_signature() { let value = Value::new(9999); - let env = TestEnv::new(move |_, _| Some(value)); - let mut rng = StdRng::seed_from_u64(0x42); let sk1 = PrivateKey::generate(&mut rng); @@ -957,7 +969,7 @@ fn driver_steps_invalid_signature() { let sel = FixedProposer::new(v1.address); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); - let mut driver = Driver::new(ctx, env, sel, Height::new(1), vs, my_addr); + let mut driver = Driver::new(ctx, sel, Height::new(1), vs, my_addr); // Start new round block_on(driver.execute(Event::NewRound(Round::new(0)))).expect("execute succeeded"); diff --git a/Code/test/tests/round.rs b/Code/test/tests/round.rs index 34b02d566..2cd077e32 100644 --- a/Code/test/tests/round.rs +++ b/Code/test/tests/round.rs @@ -21,11 +21,19 @@ fn test_propose() { let data = RoundData::new(round, &height, &ADDRESS); - let transition = apply_event(state.clone(), &data, Event::NewRoundProposer(value)); + let transition = apply_event(state.clone(), &data, Event::NewRoundProposer); state.step = Step::Propose; assert_eq!(transition.next_state, state); + assert_eq!( + transition.message.unwrap(), + Message::get_value_and_schedule_timeout(round, TimeoutStep::Propose) + ); + let transition = apply_event(transition.next_state, &data, Event::ProposeValue(value)); + + state.step = Step::Propose; + assert_eq!(transition.next_state, state); assert_eq!( transition.message.unwrap(), Message::proposal(Height::new(10), Round::new(0), Value::new(42), Round::Nil)