Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(code): Do not cancel prevote/precommit timeout when threshold is reached #208

Merged
merged 2 commits into from
Jun 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 7 additions & 46 deletions code/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use tokio::sync::mpsc;
use tracing::{debug, error, info, trace, warn};

use malachite_common::{
Context, Height, NilOrVal, Proposal, Round, SignedBlockPart, SignedProposal, SignedVote,
Timeout, TimeoutStep, Validator, ValidatorSet, Value, ValueId, Vote, VoteType,
Context, Height, Proposal, Round, SignedBlockPart, SignedProposal, SignedVote, Timeout,
TimeoutStep, Validator, ValidatorSet, Value, Vote,
};
use malachite_driver::Driver;
use malachite_driver::Input as DriverInput;
Expand All @@ -22,7 +22,7 @@ use malachite_network::Msg as NetworkMsg;
use malachite_network::PeerId;
use malachite_proto as proto;
use malachite_proto::Protobuf;
use malachite_vote::{Threshold, ThresholdParams};
use malachite_vote::ThresholdParams;

use crate::gossip::Msg as GossipMsg;
use crate::host::{LocallyProposedValue, Msg as HostMsg, ReceivedProposedValue};
Expand Down Expand Up @@ -63,10 +63,7 @@ pub enum Msg<Ctx: Context> {
TimeoutElapsed(Timeout),
SendDriverInput(DriverInput<Ctx>),
Decided(Ctx::Height, Round, Ctx::Value),
ProcessDriverOutputs(
Vec<DriverOutput<Ctx>>,
Option<(VoteType, Round, NilOrVal<ValueId<Ctx>>)>,
),
ProcessDriverOutputs(Vec<DriverOutput<Ctx>>),
// The proposal builder has built a value and can be used in a new proposal consensus message
ProposeValue(Ctx::Height, Round, Option<Ctx::Value>),
// The proposal builder has build a new block part, needs to be signed and gossiped by consensus
Expand Down Expand Up @@ -330,57 +327,22 @@ where
}
}

let check_threshold = if let DriverInput::Vote(vote) = &input {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I always felt like this was a big hack when I added it, so I am glad we can actually likely remove it altogether.

let round = Vote::<Ctx>::round(vote);
let value = Vote::<Ctx>::value(vote);

Some((vote.vote_type(), round, value.clone()))
} else {
None
};

let outputs = state
.driver
.process(input)
.map_err(|e| format!("Driver failed to process input: {e}"))?;

myself.cast(Msg::ProcessDriverOutputs(outputs, check_threshold))?;
myself.cast(Msg::ProcessDriverOutputs(outputs))?;

Ok(())
}

async fn process_driver_outputs(
&self,
outputs: Vec<DriverOutput<Ctx>>,
check_threshold: Option<(VoteType, Round, NilOrVal<ValueId<Ctx>>)>,
myself: ActorRef<Msg<Ctx>>,
state: &mut State<Ctx>,
) -> Result<(), ActorProcessingErr> {
// When we receive a vote, check if we've gotten +2/3 votes for the value we just received a vote for,
// if so then cancel the corresponding timeout.
if let Some((vote_type, round, value)) = check_threshold {
let threshold = match value {
NilOrVal::Nil => Threshold::Nil,
NilOrVal::Val(value) => Threshold::Value(value),
};

let votes = state.driver.votes();

if votes.is_threshold_met(&round, vote_type, threshold.clone()) {
let timeout = match vote_type {
VoteType::Prevote => Timeout::prevote(round),
VoteType::Precommit => Timeout::precommit(round),
};

info!("Threshold met for {threshold:?} at round {round}, cancelling {timeout}");
// TODO - check on this. For L47 (PrecommitAny) the spec says:
// upon 2f + 1 (PRECOMMIT, hp, roundp, *) for the first time do
// schedule OnTimeoutPrecommit(hp , roundp) to be executed after timeoutPrecommit(roundp)
// If we cancel the timeout we will not move to next round
state.timers.cast(TimersMsg::CancelTimeout(timeout))?;
}
}

for output in outputs {
let next = self
.handle_driver_output(output, myself.clone(), state)
Expand Down Expand Up @@ -757,9 +719,8 @@ where
self.send_driver_input(input, myself, state).await?;
}

Msg::ProcessDriverOutputs(outputs, check_threshold) => {
self.process_driver_outputs(outputs, check_threshold, myself, state)
.await?;
Msg::ProcessDriverOutputs(outputs) => {
self.process_driver_outputs(outputs, myself, state).await?;
}

Msg::BuilderBlockPart(block_part) => {
Expand Down
Loading