Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

code: Add multiplexer #106

Merged
merged 17 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Generate code coverage
run: cargo llvm-cov nextest --workspace --exclude malachite-itf --all-features --lcov --output-path lcov.info
run: cargo llvm-cov nextest --workspace --exclude malachite-itf --all-features --ignore-run-fail --lcov --output-path lcov.info
- name: Generate text report
run: cargo llvm-cov report
- name: Upload coverage to Codecov
Expand Down
72 changes: 42 additions & 30 deletions Code/driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use malachite_vote::Threshold;
use malachite_vote::ThresholdParams;

use crate::input::Input;
use crate::mixer;
use crate::output::Output;
use crate::proposals::Proposals;
use crate::Error;
Expand All @@ -35,6 +36,7 @@ where
pub votes: VoteKeeper<Ctx>,
pub round_state: RoundState<Ctx>,
pub proposals: Proposals<Ctx>,
pub pending_input: Option<(Round, RoundInput<Ctx>)>,
}

impl<Ctx> Driver<Ctx>
Expand All @@ -60,6 +62,7 @@ where
votes,
round_state: RoundState::default(),
proposals: Proposals::new(),
pending_input: None,
}
}

Expand All @@ -84,13 +87,32 @@ where
Ok(proposer)
}

// TODO: Rename to `process`
pub async fn execute(&mut self, msg: Input<Ctx>) -> Result<Option<Output<Ctx>>, Error<Ctx>> {
let round_output = match self.apply(msg).await? {
Some(msg) => msg,
None => return Ok(None),
};

let output = match round_output {
let output = self.round_output_to_output(round_output);
Ok(Some(output))
}

pub fn process_pending(&mut self) -> Result<Vec<Output<Ctx>>, Error<Ctx>> {
let mut outputs = Vec::new();

while let Some((round, input)) = self.pending_input.take() {
if let Some(round_output) = self.apply_input(round, input)? {
let output = self.round_output_to_output(round_output);
outputs.push(output);
};
}

Ok(outputs)
}

fn round_output_to_output(&mut self, round_output: RoundOutput<Ctx>) -> Output<Ctx> {
match round_output {
RoundOutput::NewRound(round) => Output::NewRound(self.height().clone(), round),

RoundOutput::Proposal(proposal) => {
Expand All @@ -113,9 +135,7 @@ where
// TODO: update the state
Output::Decide(value.round, value.value)
}
};

Ok(Some(output))
}
}

async fn apply(&mut self, input: Input<Ctx>) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
Expand Down Expand Up @@ -303,39 +323,31 @@ where
input: RoundInput<Ctx>,
) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
let round_state = core::mem::take(&mut self.round_state);
let proposer = self.get_proposer(round_state.round)?;
let current_step = round_state.step;

let data = Info::new(input_round, &self.address, proposer.address());
let proposer = self.get_proposer(round_state.round)?;
let info = Info::new(input_round, &self.address, proposer.address());

// Multiplex the event with the round state.
let mux_input = match input {
RoundInput::PolkaValue(value_id) => {
let proposal = self.proposals.find(&value_id, |p| p.round() == input_round);
// Multiplex the proposal if we have one already for the input round
let mux_input = mixer::multiplex_proposal(input, input_round, &self.proposals);

if let Some(proposal) = proposal {
assert_eq!(proposal.value().id(), value_id);
RoundInput::ProposalAndPolkaCurrent(proposal.clone())
} else {
RoundInput::PolkaAny
}
}
// Apply the input to the round state machine
let transition = round_state.apply(&info, mux_input);

RoundInput::PrecommitValue(value_id) => {
let proposal = self.proposals.find(&value_id, |p| p.round() == input_round);
let pending_step = transition.next_state.step;

if let Some(proposal) = proposal {
assert_eq!(proposal.value().id(), value_id);
RoundInput::ProposalAndPrecommitValue(proposal.clone())
} else {
RoundInput::PrecommitAny
}
}
if current_step != pending_step {
let pending_input = mixer::multiplex_on_step_change(
pending_step,
input_round,
&self.votes,
&self.proposals,
);

_ => input,
};
dbg!(&pending_input);

// Apply the input to the round state machine
let transition = round_state.apply(&data, mux_input);
self.pending_input = pending_input.map(|input| (input_round, input));
}

// Update state
self.round_state = transition.next_state;
Expand Down
3 changes: 2 additions & 1 deletion Code/driver/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Driver for the state machine of the Malachite consensus engine

#![no_std]
// #![no_std]
romac marked this conversation as resolved.
Show resolved Hide resolved
#![forbid(unsafe_code)]
#![deny(unused_crate_dependencies, trivial_casts, trivial_numeric_casts)]
#![warn(
Expand All @@ -17,6 +17,7 @@ extern crate alloc;
mod driver;
mod error;
mod input;
mod mixer;
romac marked this conversation as resolved.
Show resolved Hide resolved
mod output;
mod proposals;
mod proposer;
Expand Down
96 changes: 96 additions & 0 deletions Code/driver/src/mixer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use malachite_common::{Context, Proposal, Round, Value, VoteType};
use malachite_round::input::Input as RoundInput;
use malachite_round::state::Step;
use malachite_vote::keeper::VoteKeeper;
use malachite_vote::Threshold;

use crate::proposals::Proposals;

pub fn multiplex_proposal<Ctx>(
input: RoundInput<Ctx>,
romac marked this conversation as resolved.
Show resolved Hide resolved
input_round: Round,
proposals: &Proposals<Ctx>,
) -> RoundInput<Ctx>
where
Ctx: Context,
{
match input {
// Check if we have a proposal for the input round,
// if so, send `ProposalAndPolkaCurrent` instead of `PolkaAny`
// to the state machine.
RoundInput::PolkaValue(value_id) => {
let proposal = proposals.find(&value_id, |p| p.round() == input_round);

if let Some(proposal) = proposal {
assert_eq!(proposal.value().id(), value_id);
RoundInput::ProposalAndPolkaCurrent(proposal.clone())
} else {
RoundInput::PolkaAny
}
}

// Check if we have a proposal for the input round,
// if so, send `ProposalAndPrecommitValue` instead of `PrecommitAny`.
RoundInput::PrecommitValue(value_id) => {
let proposal = proposals.find(&value_id, |p| p.round() == input_round);

if let Some(proposal) = proposal {
assert_eq!(proposal.value().id(), value_id);
RoundInput::ProposalAndPrecommitValue(proposal.clone())
} else {
RoundInput::PrecommitAny
}
}

// Otherwise, just pass the input through.
_ => input,
}
}
pub fn multiplex_on_step_change<Ctx>(
pending_step: Step,
round: Round,
votekeeper: &VoteKeeper<Ctx>,
proposals: &Proposals<Ctx>,
) -> Option<RoundInput<Ctx>>
where
Ctx: Context,
{
match pending_step {
Step::NewRound => None, // Some(RoundInput::NewRound),

Step::Prevote => {
// TODO: What to do if multiple proposals?
let proposal = proposals.all().next();
dbg!(&proposal);

// TODO: Cleanup
let has_polka_value = || {
proposal.and_then(|p| {
if votekeeper.is_threshold_met(
&round,
VoteType::Prevote,
Threshold::Value(p.value().id()),
) {
Some(p)
} else {
None
}
})
};

if votekeeper.is_threshold_met(&round, VoteType::Prevote, Threshold::Nil) {
Some(RoundInput::PolkaNil)
} else if let Some(proposal) = has_polka_value() {
Some(RoundInput::ProposalAndPolkaCurrent(proposal.clone()))
} else if votekeeper.is_threshold_met(&round, VoteType::Prevote, Threshold::Any) {
Some(RoundInput::PolkaAny)
} else {
None
}
}

Step::Propose => None,
Step::Precommit => None,
Step::Commit => None,
}
}
4 changes: 4 additions & 0 deletions Code/driver/src/proposals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ where
self.proposals.entry(value_id).or_default().push(proposal);
}

pub fn all(&self) -> impl Iterator<Item = &Ctx::Proposal> {
self.proposals.values().flatten()
}

pub fn find(
&self,
value_id: &ValueId<Ctx>,
Expand Down
12 changes: 10 additions & 2 deletions Code/test/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ pub fn prevote_input(addr: &Address, sk: &PrivateKey) -> Input<TestContext> {
)
}

pub fn prevote_nil_input(addr: &Address, sk: &PrivateKey) -> Input<TestContext> {
Input::Vote(Vote::new_prevote(Height::new(1), Round::new(0), None, *addr).signed(sk))
}

pub fn prevote_input_at(round: Round, addr: &Address, sk: &PrivateKey) -> Input<TestContext> {
let value = Value::new(9999);

Expand All @@ -126,9 +130,13 @@ pub fn precommit_output(
))
}

pub fn precommit_nil_output(addr: &Address, sk: &PrivateKey) -> Option<Output<TestContext>> {
pub fn precommit_nil_output(
round: Round,
addr: &Address,
sk: &PrivateKey,
) -> Option<Output<TestContext>> {
Some(Output::Vote(
Vote::new_precommit(Height::new(1), Round::new(0), None, *addr).signed(sk),
Vote::new_precommit(Height::new(1), round, None, *addr).signed(sk),
))
}

Expand Down
Loading
Loading