Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(code): Add MovedToStep output to the driver for consensus to track step time #259

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion code/crates/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ where
match output {
DriverOutput::NewRound(height, round) => {
info!("Starting round {round} at height {height}");
self.metrics.round.set(round.as_i64());

let validator_set = &state.driver.validator_set;
let proposer = self.get_proposer(height, round, validator_set).await?;
Expand All @@ -545,6 +546,12 @@ where
)))
}

DriverOutput::MovedToStep(height, round, step) => {
info!("Moved to step {step:?} at height {height} and round {round}");

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is where one would track how long a step took.

Ok(Next::None)
}

DriverOutput::Propose(proposal) => {
info!(
"Proposing value with id: {}, at round {}",
Expand Down Expand Up @@ -766,7 +773,8 @@ where
self.metrics.block_start();

let round = Round::new(0);
info!("Starting height {height} at round {round}");
info!("Starting new height {height}");
self.metrics.height.set(height.as_u64() as i64);

let validator_set = &state.driver.validator_set;
let proposer = self.get_proposer(height, round, validator_set).await?;
Expand Down
22 changes: 21 additions & 1 deletion code/crates/actors/src/consensus/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ pub struct Inner {
/// Number of connected peers, ie. for each consensus node, how many peers is it connected to)
pub connected_peers: Gauge,

/// Current height
pub height: Gauge,

/// Current round
pub round: Gauge,

/// Internal state for measuring time taken to finalize a block
instant_block_started: Arc<AtomicInstant>,
}
Expand All @@ -46,11 +52,13 @@ impl Metrics {
Self(Arc::new(Inner {
finalized_blocks: Counter::default(),
finalized_txes: Counter::default(),
time_per_block: Histogram::new(linear_buckets(0.0, 1.0, 20)),
time_per_block: Histogram::new(linear_buckets(1.0, 0.1, 20)),
block_tx_count: Histogram::new(linear_buckets(0.0, 32.0, 128)),
block_size_bytes: Histogram::new(linear_buckets(0.0, 64.0 * 1024.0, 128)),
rounds_per_block: Histogram::new(linear_buckets(0.0, 1.0, 20)),
connected_peers: Gauge::default(),
height: Gauge::default(),
round: Gauge::default(),
instant_block_started: Arc::new(AtomicInstant::empty()),
}))
}
Expand Down Expand Up @@ -100,6 +108,18 @@ impl Metrics {
"Number of connected peers, ie. for each consensus node, how many peers is it connected to",
metrics.connected_peers.clone(),
);

registry.register(
"height",
"Current height",
metrics.height.clone(),
);

registry.register(
"round",
"Current round",
metrics.round.clone(),
);
});

metrics
Expand Down
63 changes: 41 additions & 22 deletions code/crates/driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,18 @@ where

/// Process the given input, returning the outputs to be broadcast to the network.
pub fn process(&mut self, msg: Input<Ctx>) -> Result<Vec<Output<Ctx>>, Error<Ctx>> {
let round_output = match self.apply(msg)? {
Some(msg) => msg,
None => return Ok(Vec::new()),
};
let round_outputs = self.apply(msg)?;

if round_outputs.is_empty() {
return Ok(vec![]);
}

let mut outputs = vec![];

// Lift the round state machine output to one or more driver outputs
self.lift_output(round_output, &mut outputs);
// Lift the round state machine outputs to one or more driver outputs
for round_output in round_outputs {
self.lift_output(round_output, &mut outputs);
}

// Apply the pending inputs, if any, and lift their outputs
self.process_pending(&mut outputs)?;
Expand All @@ -148,9 +151,9 @@ where
/// Process the pending input, if any.
fn process_pending(&mut self, outputs: &mut Vec<Output<Ctx>>) -> Result<(), Error<Ctx>> {
while let Some((round, input)) = self.pending_input.take() {
if let Some(round_output) = self.apply_input(round, input)? {
for round_output in self.apply_input(round, input)? {
self.lift_output(round_output, outputs);
};
}
}

Ok(())
Expand All @@ -173,11 +176,15 @@ where
}

RoundOutput::Decision(value) => outputs.push(Output::Decide(value.round, value.value)),

RoundOutput::MovedToStep(height, round, step) => {
outputs.push(Output::MovedToStep(height, round, step))
}
}
}

/// Apply the given input to the state machine, returning the output, if any.
fn apply(&mut self, input: Input<Ctx>) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
fn apply(&mut self, input: Input<Ctx>) -> Result<Vec<RoundOutput<Ctx>>, Error<Ctx>> {
match input {
Input::NewRound(height, round, proposer) => {
self.apply_new_round(height, round, proposer)
Expand All @@ -194,7 +201,7 @@ where
height: Ctx::Height,
round: Round,
proposer: Ctx::Address,
) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
) -> Result<Vec<RoundOutput<Ctx>>, Error<Ctx>> {
if self.height() == height {
// If it's a new round for same height, just reset the round, keep the valid and locked values
self.round_state.round = round;
Expand All @@ -212,15 +219,15 @@ where
&mut self,
round: Round,
value: Ctx::Value,
) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
) -> Result<Vec<RoundOutput<Ctx>>, Error<Ctx>> {
self.apply_input(round, RoundInput::ProposeValue(value))
}

fn apply_proposal(
&mut self,
proposal: Ctx::Proposal,
validity: Validity,
) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
) -> Result<Vec<RoundOutput<Ctx>>, Error<Ctx>> {
if self.height() != proposal.height() {
return Err(Error::InvalidProposalHeight {
proposal_height: proposal.height(),
Expand All @@ -232,11 +239,11 @@ where

match self.multiplex_proposal(proposal, validity) {
Some(round_input) => self.apply_input(round, round_input),
None => Ok(None),
None => Ok(Vec::new()),
}
}

fn apply_vote(&mut self, vote: Ctx::Vote) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
fn apply_vote(&mut self, vote: Ctx::Vote) -> Result<Vec<RoundOutput<Ctx>>, Error<Ctx>> {
if self.height() != vote.height() {
return Err(Error::InvalidVoteHeight {
vote_height: vote.height(),
Expand All @@ -257,21 +264,21 @@ where
.apply_vote(vote, validator.voting_power(), current_round);

let Some(vote_output) = vote_output else {
return Ok(None);
return Ok(Vec::new());
};

let round_input = self.multiplex_vote_threshold(vote_output);
self.apply_input(vote_round, round_input)
}

fn apply_timeout(&mut self, timeout: Timeout) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
fn apply_timeout(&mut self, timeout: Timeout) -> Result<Vec<RoundOutput<Ctx>>, Error<Ctx>> {
let input = match timeout.step {
TimeoutStep::Propose => RoundInput::TimeoutPropose,
TimeoutStep::Prevote => RoundInput::TimeoutPrevote,
TimeoutStep::Precommit => RoundInput::TimeoutPrecommit,

// The driver never receives a commit timeout, so we can just ignore it.
TimeoutStep::Commit => return Ok(None),
TimeoutStep::Commit => return Ok(Vec::new()),
};

self.apply_input(timeout.round, input)
Expand All @@ -282,7 +289,7 @@ where
&mut self,
input_round: Round,
input: RoundInput<Ctx>,
) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
) -> Result<Vec<RoundOutput<Ctx>>, Error<Ctx>> {
let round_state = core::mem::take(&mut self.round_state);
let current_step = round_state.step;

Expand All @@ -292,19 +299,31 @@ where
// Apply the input to the round state machine
let transition = round_state.apply(&info, input);

// Initialize outputs with the output of the transition
let mut outputs = Vec::from_iter(transition.output);

// Check if we need to change step
let pending_step = transition.next_state.step;

if current_step != pending_step {
let pending_input = self.multiplex_step_change(pending_step, input_round);
if let Some(pending_input) = self.multiplex_step_change(pending_step, input_round) {
self.pending_input = Some((input_round, pending_input));
} else {
self.pending_input = None;
}

self.pending_input = pending_input.map(|input| (input_round, input));
outputs.push(RoundOutput::MovedToStep(
transition.next_state.height,
transition.next_state.round,
pending_step,
));
}

// Update state
self.round_state = transition.next_state;

// Return output, if any
Ok(transition.output)
// Return outputs, if any
Ok(outputs)
}
}

Expand Down
4 changes: 4 additions & 0 deletions code/crates/driver/src/output.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use derive_where::derive_where;

use malachite_common::{Context, Round, Timeout};
use malachite_round::state::Step;

/// Messages emitted by the [`Driver`](crate::Driver)
#[derive_where(Clone, Debug, PartialEq, Eq)]
Expand All @@ -26,4 +27,7 @@ where
/// Ask for a value at the given height, round.
/// The timeout tells the proposal builder how long it has to build a value.
GetValue(Ctx::Height, Round, Timeout),

/// The round state machine has moved to a new step
MovedToStep(Ctx::Height, Round, Step),
}
5 changes: 3 additions & 2 deletions code/crates/gossip-consensus/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct Behaviour {
pub identify: identify::Behaviour,
pub gossipsub: gossipsub::Behaviour,
}

fn message_id(message: &gossipsub::Message) -> gossipsub::MessageId {
let hash = blake3::hash(&message.data);
gossipsub::MessageId::from(hash.as_bytes().to_vec())
Expand All @@ -28,12 +29,12 @@ fn gossipsub_config() -> gossipsub::Config {
.opportunistic_graft_ticks(3)
.heartbeat_interval(Duration::from_secs(1))
.validation_mode(gossipsub::ValidationMode::Strict)
.history_gossip(50)
.history_gossip(3)
.history_length(5)
.mesh_n_high(12)
.mesh_n_low(4)
.mesh_outbound_min(2)
.mesh_n(6)
.history_length(500)
.message_id_fn(message_id)
.build()
.unwrap()
Expand Down
4 changes: 2 additions & 2 deletions code/crates/gossip-mempool/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ fn gossipsub_config() -> gossipsub::Config {
.opportunistic_graft_ticks(3)
.heartbeat_interval(Duration::from_secs(1))
.validation_mode(gossipsub::ValidationMode::Strict)
.history_gossip(50)
.history_gossip(3)
.history_length(5)
.mesh_n_high(12)
.mesh_n_low(4)
.mesh_outbound_min(2)
.mesh_n(6)
.history_length(500)
.message_id_fn(message_id)
.build()
.unwrap()
Expand Down
5 changes: 4 additions & 1 deletion code/crates/round/src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use derive_where::derive_where;

use malachite_common::{Context, NilOrVal, Round, Timeout, TimeoutStep, ValueId};

use crate::state::RoundValue;
use crate::state::{RoundValue, Step};

/// Output of the round state machine.
#[derive_where(Clone, Debug, PartialEq, Eq)]
Expand All @@ -30,6 +30,9 @@ where

/// Decide the value.
Decision(RoundValue<Ctx::Value>),

/// The round state machine as moved to a new step
MovedToStep(Ctx::Height, Round, Step),
}

impl<Ctx: Context> Output<Ctx> {
Expand Down
Loading