Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

code: Multiplexer refactoring #110

Merged
merged 9 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Code/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[alias]
mbt = "nextest run -p malachite-itf --all-features"
integration = "nextest run --workspace --exclude malachite-itf"
mbt = "nextest run -p malachite-itf --all-features --no-fail-fast"
integration = "nextest run --workspace --exclude malachite-itf --no-fail-fast"
132 changes: 24 additions & 108 deletions Code/driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ use alloc::boxed::Box;
use core::fmt;

use malachite_common::{
Context, Proposal, Round, SignedVote, Timeout, TimeoutStep, Validator, ValidatorSet, Value,
Vote, VoteType,
Context, Proposal, Round, SignedVote, Timeout, TimeoutStep, Validator, ValidatorSet, Vote,
};
use malachite_round::input::Input as RoundInput;
use malachite_round::output::Output as RoundOutput;
use malachite_round::state::{State as RoundState, Step};
use malachite_round::state::State as RoundState;
use malachite_round::state_machine::Info;
use malachite_vote::keeper::Output as VoteKeeperOutput;
use malachite_vote::keeper::VoteKeeper;
use malachite_vote::Threshold;
use malachite_vote::ThresholdParams;

use crate::input::Input;
Expand Down Expand Up @@ -160,6 +157,7 @@ where
} else {
self.round_state = RoundState::new(height, round);
}

self.apply_input(round, RoundInput::NewRound)
}

Expand All @@ -176,94 +174,18 @@ where
proposal: Ctx::Proposal,
validity: Validity,
) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
// Check that there is an ongoing round
if self.round_state.round == Round::Nil {
return Ok(None);
}

// Check that the proposal is for the current height
if self.round_state.height != proposal.height() {
return Ok(None);
}

self.proposals.insert(proposal.clone());

let polka_for_pol = self.votes.is_threshold_met(
&proposal.pol_round(),
VoteType::Prevote,
Threshold::Value(proposal.value().id()),
);

let polka_previous = proposal.pol_round().is_defined()
&& polka_for_pol
&& proposal.pol_round() < self.round_state.round;

// Handle invalid proposal
if !validity.is_valid() {
if self.round_state.step == Step::Propose {
if proposal.pol_round().is_nil() {
// L26
return self.apply_input(proposal.round(), RoundInput::InvalidProposal);
} else if polka_previous {
// L32
return self.apply_input(
proposal.round(),
RoundInput::InvalidProposalAndPolkaPrevious(proposal),
);
} else {
return Ok(None);
}
} else {
return Ok(None);
}
}

// We have a valid proposal.
// L49
// TODO - check if not already decided
if self.votes.is_threshold_met(
&proposal.round(),
VoteType::Precommit,
Threshold::Value(proposal.value().id()),
let round = proposal.round();

match mux::multiplex_proposal(
&self.round_state,
&self.votes,
&mut self.proposals,
proposal,
validity,
) {
return self.apply_input(
proposal.round(),
RoundInput::ProposalAndPrecommitValue(proposal),
);
Some(round_input) => self.apply_input(round, round_input),
None => Ok(None),
}

// If the proposal is for a different round, drop the proposal
if self.round() != proposal.round() {
return Ok(None);
}

let polka_for_current = self.votes.is_threshold_met(
&proposal.round(),
VoteType::Prevote,
Threshold::Value(proposal.value().id()),
);

let polka_current = polka_for_current && self.round_state.step >= Step::Prevote;

// L36
if polka_current {
return self.apply_input(
proposal.round(),
RoundInput::ProposalAndPolkaCurrent(proposal),
);
}

// L28
if self.round_state.step == Step::Propose && polka_previous {
// TODO: Check proposal vr is equal to threshold vr
return self.apply_input(
proposal.round(),
RoundInput::ProposalAndPolkaPrevious(proposal),
);
}

// TODO - Caller needs to store the proposal (valid or not) as the quorum (polka or commits) may be met later
self.apply_input(proposal.round(), RoundInput::Proposal(proposal))
}

fn apply_vote(
Expand All @@ -288,23 +210,20 @@ where
let vote_round = signed_vote.vote.round();
let current_round = self.round();

let Some(vote_output) =
let vote_output =
self.votes
.apply_vote(signed_vote.vote, validator.voting_power(), current_round)
else {
.apply_vote(signed_vote.vote, validator.voting_power(), current_round);

let Some(vote_output) = vote_output else {
return Ok(None);
};

let round_input = match vote_output {
VoteKeeperOutput::PolkaAny => RoundInput::PolkaAny,
VoteKeeperOutput::PolkaNil => RoundInput::PolkaNil,
VoteKeeperOutput::PolkaValue(v) => RoundInput::PolkaValue(v),
VoteKeeperOutput::PrecommitAny => RoundInput::PrecommitAny,
VoteKeeperOutput::PrecommitValue(v) => RoundInput::PrecommitValue(v),
VoteKeeperOutput::SkipRound(r) => RoundInput::SkipRound(r),
};
let round_input = mux::multiplex_on_vote_threshold(vote_output, &self.proposals);

self.apply_input(vote_round, round_input)
match round_input {
Some(input) => self.apply_input(vote_round, input),
None => Ok(None),
}
}

fn apply_timeout(&mut self, timeout: Timeout) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
Expand All @@ -329,11 +248,8 @@ where
let proposer = self.get_proposer(round_state.round)?;
let info = Info::new(input_round, &self.address, proposer.address());

// Multiplex the proposal if we have one already for the input round
let mux_input = mux::multiplex_proposal(input, input_round, &self.proposals);

// Apply the input to the round state machine
let transition = round_state.apply(&info, mux_input);
let transition = round_state.apply(&info, input);

let pending_step = transition.next_state.step;

Expand All @@ -345,7 +261,7 @@ where
&self.proposals,
);

dbg!(&pending_input);
println!("multiplex_on_step_change: {pending_input:?}");

self.pending_input = pending_input.map(|input| (input_round, input));
}
Expand Down
151 changes: 123 additions & 28 deletions Code/driver/src/mux.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,147 @@
use malachite_common::ValueId;
use malachite_common::{Context, Proposal, Round, Value, VoteType};
use malachite_round::input::Input as RoundInput;
use malachite_round::state::State as RoundState;
use malachite_round::state::Step;
use malachite_vote::keeper::Output as VoteKeeperOutput;
use malachite_vote::keeper::VoteKeeper;
use malachite_vote::Threshold;

use crate::proposals::Proposals;
use crate::Validity;

pub fn multiplex_proposal<Ctx>(
input: RoundInput<Ctx>,
input_round: Round,
proposals: &Proposals<Ctx>,
) -> RoundInput<Ctx>
round_state: &RoundState<Ctx>,
votekeeper: &VoteKeeper<Ctx>,
proposals: &mut Proposals<Ctx>,
proposal: Ctx::Proposal,
validity: Validity,
) -> Option<RoundInput<Ctx>>
where
Ctx: Context,
{
match input {
// Check if we have a proposal for the input round,
// if so, send `ProposalAndPolkaCurrent` instead of `PolkaAny`
// to the state machine.
RoundInput::PolkaValue(value_id) => {
let proposal = proposals.find(&value_id, |p| p.round() == input_round);

if let Some(proposal) = proposal {
assert_eq!(proposal.value().id(), value_id);
RoundInput::ProposalAndPolkaCurrent(proposal.clone())
// Check that there is an ongoing round
if round_state.round == Round::Nil {
return None;
}

// Check that the proposal is for the current height
if round_state.height != proposal.height() {
return None;
}

// Store the proposal
proposals.insert(proposal.clone());

let polka_for_pol = votekeeper.is_threshold_met(
&proposal.pol_round(),
VoteType::Prevote,
Threshold::Value(proposal.value().id()),
);

let polka_previous = proposal.pol_round().is_defined()
&& polka_for_pol
&& proposal.pol_round() < round_state.round;

// Handle invalid proposal
if !validity.is_valid() {
if round_state.step == Step::Propose {
if proposal.pol_round().is_nil() {
// L26
return Some(RoundInput::InvalidProposal);
} else if polka_previous {
// L32
return Some(RoundInput::InvalidProposalAndPolkaPrevious(
proposal.clone(),
));
} else {
RoundInput::PolkaAny
return None;
}
} else {
return None;
}
}

// Check if we have a proposal for the input round,
// if so, send `ProposalAndPrecommitValue` instead of `PrecommitAny`.
RoundInput::PrecommitValue(value_id) => {
let proposal = proposals.find(&value_id, |p| p.round() == input_round);
// We have a valid proposal.
// L49
// TODO - check if not already decided
if votekeeper.is_threshold_met(
&proposal.round(),
VoteType::Precommit,
Threshold::Value(proposal.value().id()),
) {
return Some(RoundInput::ProposalAndPrecommitValue(proposal.clone()));
}

if let Some(proposal) = proposal {
assert_eq!(proposal.value().id(), value_id);
RoundInput::ProposalAndPrecommitValue(proposal.clone())
} else {
RoundInput::PrecommitAny
// If the proposal is for a different round, drop the proposal
if round_state.round != proposal.round() {
return None;
}

let polka_for_current = votekeeper.is_threshold_met(
&proposal.round(),
VoteType::Prevote,
Threshold::Value(proposal.value().id()),
);

let polka_current = polka_for_current && round_state.step >= Step::Prevote;

// L36
if polka_current {
return Some(RoundInput::ProposalAndPolkaCurrent(proposal));
}

// L28
if round_state.step == Step::Propose && polka_previous {
// TODO: Check proposal vr is equal to threshold vr
return Some(RoundInput::ProposalAndPolkaPrevious(proposal));
}

Some(RoundInput::Proposal(proposal))
}

pub fn multiplex_on_vote_threshold<Ctx>(
new_threshold: VoteKeeperOutput<ValueId<Ctx>>,
proposals: &Proposals<Ctx>,
) -> Option<RoundInput<Ctx>>
where
Ctx: Context,
{
let proposal = proposals.all().next();

if let Some(proposal) = proposal {
match new_threshold {
VoteKeeperOutput::PolkaAny => Some(RoundInput::PolkaAny),
VoteKeeperOutput::PolkaNil => Some(RoundInput::PolkaNil),
VoteKeeperOutput::PolkaValue(v) => {
if v == proposal.value().id() {
Some(RoundInput::ProposalAndPolkaCurrent(proposal.clone()))
} else {
Some(RoundInput::PolkaAny)
}
}
VoteKeeperOutput::PrecommitAny => Some(RoundInput::PrecommitAny),
VoteKeeperOutput::PrecommitValue(v) => {
if v == proposal.value().id() {
Some(RoundInput::ProposalAndPrecommitValue(proposal.clone()))
} else {
Some(RoundInput::PrecommitAny)
}
}
VoteKeeperOutput::SkipRound(r) => Some(RoundInput::SkipRound(r)),
}
} else {
match new_threshold {
VoteKeeperOutput::PolkaAny => Some(RoundInput::PolkaAny),
VoteKeeperOutput::PolkaNil => Some(RoundInput::PolkaNil),
VoteKeeperOutput::PolkaValue(_) => Some(RoundInput::PolkaAny),
VoteKeeperOutput::PrecommitAny => Some(RoundInput::PrecommitAny),
VoteKeeperOutput::PrecommitValue(_) => Some(RoundInput::PrecommitAny),
VoteKeeperOutput::SkipRound(r) => Some(RoundInput::SkipRound(r)),
}

// Otherwise, just pass the input through.
_ => input,
}
}

pub fn multiplex_on_step_change<Ctx>(
pending_step: Step,
round: Round,
Expand All @@ -61,7 +157,6 @@ where
Step::Prevote => {
// TODO: What to do if multiple proposals?
let proposal = proposals.all().next();
dbg!(&proposal);

if has_polka_nil(votekeeper, round) {
Some(RoundInput::PolkaNil)
Expand Down
Loading
Loading