Skip to content

Commit

Permalink
code: Add multiplexer (#106)
Browse files Browse the repository at this point in the history
* Add tests (all failing) for polka occuring in propose step

* Report coverage even when tests fail

* Split event multiplexing into its own function

* Small refactor

* wip: Add mixer on step changes

* Add case in step change mixer for polka value

* Revert misakenly deleted comment

* Process pending inputs automatically after processing an input (#108)

* Cleanup

* Naming

* Multiplexer refactoring (#110)

* Sketch for mixer reorg

* Small cleanup

* No fail fast on tests

* Emit thresholds even when there are no proposals

* Debugging

* Fix emitted inputs when no proposal

* Cleanup

* Restore insertion of proposal

* Remove debug statements

---------

Co-authored-by: Anca Zamfir <anca@informal.systems>

* Only store a single proposal in the driver (#107)

* Only store a single proposal

* Check proposal round matches input round

* Remove dead code

* Turn `multiplex_` functions into methods of the `Driver`

* Some cleanup

* Re-enable `no_std` in driver

---------

Co-authored-by: Anca Zamfir <anca@informal.systems>
  • Loading branch information
romac and ancazamfir authored Dec 8, 2023
1 parent 759dfda commit ccdbe8a
Show file tree
Hide file tree
Showing 10 changed files with 520 additions and 414 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Generate code coverage
run: cargo llvm-cov nextest --workspace --exclude malachite-itf --all-features --lcov --output-path lcov.info
run: cargo llvm-cov nextest --workspace --exclude malachite-itf --all-features --ignore-run-fail --lcov --output-path lcov.info
- name: Generate text report
run: cargo llvm-cov report
- name: Upload coverage to Codecov
Expand Down
4 changes: 2 additions & 2 deletions Code/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[alias]
mbt = "nextest run -p malachite-itf --all-features"
integration = "nextest run --workspace --exclude malachite-itf"
mbt = "nextest run -p malachite-itf --all-features --no-fail-fast"
integration = "nextest run --workspace --exclude malachite-itf --no-fail-fast"
206 changes: 61 additions & 145 deletions Code/driver/src/driver.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
use alloc::boxed::Box;
use alloc::vec;
use alloc::vec::Vec;
use core::fmt;

use malachite_common::{
Context, Proposal, Round, SignedVote, Timeout, TimeoutStep, Validator, ValidatorSet, Value,
Vote, VoteType,
Context, Proposal, Round, SignedVote, Timeout, TimeoutStep, Validator, ValidatorSet, Vote,
};
use malachite_round::input::Input as RoundInput;
use malachite_round::output::Output as RoundOutput;
use malachite_round::state::{State as RoundState, Step};
use malachite_round::state::State as RoundState;
use malachite_round::state_machine::Info;
use malachite_vote::keeper::Output as VoteKeeperOutput;
use malachite_vote::keeper::VoteKeeper;
use malachite_vote::Threshold;
use malachite_vote::ThresholdParams;

use crate::input::Input;
use crate::output::Output;
use crate::proposals::Proposals;
use crate::Error;
use crate::ProposerSelector;
use crate::Validity;
Expand All @@ -32,9 +30,10 @@ where
pub address: Ctx::Address,
pub validator_set: Ctx::ValidatorSet,

pub votes: VoteKeeper<Ctx>,
pub vote_keeper: VoteKeeper<Ctx>,
pub round_state: RoundState<Ctx>,
pub proposals: Proposals<Ctx>,
pub proposal: Option<Ctx::Proposal>,
pub pending_input: Option<(Round, RoundInput<Ctx>)>,
}

impl<Ctx> Driver<Ctx>
Expand All @@ -57,9 +56,10 @@ where
proposer_selector: Box::new(proposer_selector),
address,
validator_set,
votes,
vote_keeper: votes,
round_state: RoundState::default(),
proposals: Proposals::new(),
proposal: None,
pending_input: None,
}
}

Expand All @@ -84,13 +84,33 @@ where
Ok(proposer)
}

pub async fn execute(&mut self, msg: Input<Ctx>) -> Result<Option<Output<Ctx>>, Error<Ctx>> {
pub async fn process(&mut self, msg: Input<Ctx>) -> Result<Vec<Output<Ctx>>, Error<Ctx>> {
let round_output = match self.apply(msg).await? {
Some(msg) => msg,
None => return Ok(None),
None => return Ok(Vec::new()),
};

let output = match round_output {
let output = self.lift_output(round_output);
let mut outputs = vec![output];

self.process_pending(&mut outputs)?;

Ok(outputs)
}

fn process_pending(&mut self, outputs: &mut Vec<Output<Ctx>>) -> Result<(), Error<Ctx>> {
while let Some((round, input)) = self.pending_input.take() {
if let Some(round_output) = self.apply_input(round, input)? {
let output = self.lift_output(round_output);
outputs.push(output);
};
}

Ok(())
}

fn lift_output(&mut self, round_output: RoundOutput<Ctx>) -> Output<Ctx> {
match round_output {
RoundOutput::NewRound(round) => Output::NewRound(self.height().clone(), round),

RoundOutput::Proposal(proposal) => {
Expand All @@ -113,9 +133,7 @@ where
// TODO: update the state
Output::Decide(value.round, value.value)
}
};

Ok(Some(output))
}
}

async fn apply(&mut self, input: Input<Ctx>) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
Expand All @@ -139,6 +157,7 @@ where
} else {
self.round_state = RoundState::new(height, round);
}

self.apply_input(round, RoundInput::NewRound)
}

Expand All @@ -155,94 +174,12 @@ where
proposal: Ctx::Proposal,
validity: Validity,
) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
// Check that there is an ongoing round
if self.round_state.round == Round::Nil {
return Ok(None);
}

// Check that the proposal is for the current height
if self.round_state.height != proposal.height() {
return Ok(None);
}

self.proposals.insert(proposal.clone());
let round = proposal.round();

let polka_for_pol = self.votes.is_threshold_met(
&proposal.pol_round(),
VoteType::Prevote,
Threshold::Value(proposal.value().id()),
);

let polka_previous = proposal.pol_round().is_defined()
&& polka_for_pol
&& proposal.pol_round() < self.round_state.round;

// Handle invalid proposal
if !validity.is_valid() {
if self.round_state.step == Step::Propose {
if proposal.pol_round().is_nil() {
// L26
return self.apply_input(proposal.round(), RoundInput::InvalidProposal);
} else if polka_previous {
// L32
return self.apply_input(
proposal.round(),
RoundInput::InvalidProposalAndPolkaPrevious(proposal),
);
} else {
return Ok(None);
}
} else {
return Ok(None);
}
match self.multiplex_proposal(proposal, validity) {
Some(round_input) => self.apply_input(round, round_input),
None => Ok(None),
}

// We have a valid proposal.
// L49
// TODO - check if not already decided
if self.votes.is_threshold_met(
&proposal.round(),
VoteType::Precommit,
Threshold::Value(proposal.value().id()),
) {
return self.apply_input(
proposal.round(),
RoundInput::ProposalAndPrecommitValue(proposal),
);
}

// If the proposal is for a different round, drop the proposal
if self.round() != proposal.round() {
return Ok(None);
}

let polka_for_current = self.votes.is_threshold_met(
&proposal.round(),
VoteType::Prevote,
Threshold::Value(proposal.value().id()),
);

let polka_current = polka_for_current && self.round_state.step >= Step::Prevote;

// L36
if polka_current {
return self.apply_input(
proposal.round(),
RoundInput::ProposalAndPolkaCurrent(proposal),
);
}

// L28
if self.round_state.step == Step::Propose && polka_previous {
// TODO: Check proposal vr is equal to threshold vr
return self.apply_input(
proposal.round(),
RoundInput::ProposalAndPolkaPrevious(proposal),
);
}

// TODO - Caller needs to store the proposal (valid or not) as the quorum (polka or commits) may be met later
self.apply_input(proposal.round(), RoundInput::Proposal(proposal))
}

fn apply_vote(
Expand All @@ -267,23 +204,20 @@ where
let vote_round = signed_vote.vote.round();
let current_round = self.round();

let Some(vote_output) =
self.votes
.apply_vote(signed_vote.vote, validator.voting_power(), current_round)
else {
let vote_output =
self.vote_keeper
.apply_vote(signed_vote.vote, validator.voting_power(), current_round);

let Some(vote_output) = vote_output else {
return Ok(None);
};

let round_input = match vote_output {
VoteKeeperOutput::PolkaAny => RoundInput::PolkaAny,
VoteKeeperOutput::PolkaNil => RoundInput::PolkaNil,
VoteKeeperOutput::PolkaValue(v) => RoundInput::PolkaValue(v),
VoteKeeperOutput::PrecommitAny => RoundInput::PrecommitAny,
VoteKeeperOutput::PrecommitValue(v) => RoundInput::PrecommitValue(v),
VoteKeeperOutput::SkipRound(r) => RoundInput::SkipRound(r),
};
let round_input = self.multiplex_vote_threshold(vote_output);

self.apply_input(vote_round, round_input)
match round_input {
Some(input) => self.apply_input(vote_round, input),
None => Ok(None),
}
}

fn apply_timeout(&mut self, timeout: Timeout) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
Expand All @@ -303,39 +237,21 @@ where
input: RoundInput<Ctx>,
) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
let round_state = core::mem::take(&mut self.round_state);
let proposer = self.get_proposer(round_state.round)?;

let data = Info::new(input_round, &self.address, proposer.address());
let current_step = round_state.step;

// Multiplex the event with the round state.
let mux_input = match input {
RoundInput::PolkaValue(value_id) => {
let proposal = self.proposals.find(&value_id, |p| p.round() == input_round);

if let Some(proposal) = proposal {
assert_eq!(proposal.value().id(), value_id);
RoundInput::ProposalAndPolkaCurrent(proposal.clone())
} else {
RoundInput::PolkaAny
}
}
let proposer = self.get_proposer(round_state.round)?;
let info = Info::new(input_round, &self.address, proposer.address());

RoundInput::PrecommitValue(value_id) => {
let proposal = self.proposals.find(&value_id, |p| p.round() == input_round);
// Apply the input to the round state machine
let transition = round_state.apply(&info, input);

if let Some(proposal) = proposal {
assert_eq!(proposal.value().id(), value_id);
RoundInput::ProposalAndPrecommitValue(proposal.clone())
} else {
RoundInput::PrecommitAny
}
}
let pending_step = transition.next_state.step;

_ => input,
};
if current_step != pending_step {
let pending_input = self.multiplex_step_change(pending_step, input_round);

// Apply the input to the round state machine
let transition = round_state.apply(&data, mux_input);
self.pending_input = pending_input.map(|input| (input_round, input));
}

// Update state
self.round_state = transition.next_state;
Expand All @@ -354,8 +270,8 @@ where
f.debug_struct("Driver")
.field("address", &self.address)
.field("validator_set", &self.validator_set)
.field("votes", &self.votes)
.field("proposals", &self.proposals.proposals)
.field("votes", &self.vote_keeper)
.field("proposal", &self.proposal)
.field("round_state", &self.round_state)
.finish()
}
Expand Down
2 changes: 1 addition & 1 deletion Code/driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ extern crate alloc;
mod driver;
mod error;
mod input;
mod mux;
mod output;
mod proposals;
mod proposer;
mod util;

Expand Down
Loading

0 comments on commit ccdbe8a

Please sign in to comment.