diff --git a/code/crates/actors/src/consensus.rs b/code/crates/actors/src/consensus.rs index 4916c1f9c..2015ba871 100644 --- a/code/crates/actors/src/consensus.rs +++ b/code/crates/actors/src/consensus.rs @@ -533,6 +533,7 @@ where match output { DriverOutput::NewRound(height, round) => { info!("Starting round {round} at height {height}"); + self.metrics.round.set(round.as_i64()); let validator_set = &state.driver.validator_set; let proposer = self.get_proposer(height, round, validator_set).await?; @@ -545,6 +546,12 @@ where ))) } + DriverOutput::MovedToStep(height, round, step) => { + info!("Moved to step {step:?} at height {height} and round {round}"); + + Ok(Next::None) + } + DriverOutput::Propose(proposal) => { info!( "Proposing value with id: {}, at round {}", @@ -766,7 +773,8 @@ where self.metrics.block_start(); let round = Round::new(0); - info!("Starting height {height} at round {round}"); + info!("Starting new height {height}"); + self.metrics.height.set(height.as_u64() as i64); let validator_set = &state.driver.validator_set; let proposer = self.get_proposer(height, round, validator_set).await?; diff --git a/code/crates/actors/src/consensus/metrics.rs b/code/crates/actors/src/consensus/metrics.rs index 0f49bbf14..9e986c7c4 100644 --- a/code/crates/actors/src/consensus/metrics.rs +++ b/code/crates/actors/src/consensus/metrics.rs @@ -37,6 +37,12 @@ pub struct Inner { /// Number of connected peers, ie. for each consensus node, how many peers is it connected to) pub connected_peers: Gauge, + /// Current height + pub height: Gauge, + + /// Current round + pub round: Gauge, + /// Internal state for measuring time taken to finalize a block instant_block_started: Arc, } @@ -46,11 +52,13 @@ impl Metrics { Self(Arc::new(Inner { finalized_blocks: Counter::default(), finalized_txes: Counter::default(), - time_per_block: Histogram::new(linear_buckets(0.0, 1.0, 20)), + time_per_block: Histogram::new(linear_buckets(1.0, 0.1, 20)), block_tx_count: Histogram::new(linear_buckets(0.0, 32.0, 128)), block_size_bytes: Histogram::new(linear_buckets(0.0, 64.0 * 1024.0, 128)), rounds_per_block: Histogram::new(linear_buckets(0.0, 1.0, 20)), connected_peers: Gauge::default(), + height: Gauge::default(), + round: Gauge::default(), instant_block_started: Arc::new(AtomicInstant::empty()), })) } @@ -100,6 +108,18 @@ impl Metrics { "Number of connected peers, ie. for each consensus node, how many peers is it connected to", metrics.connected_peers.clone(), ); + + registry.register( + "height", + "Current height", + metrics.height.clone(), + ); + + registry.register( + "round", + "Current round", + metrics.round.clone(), + ); }); metrics diff --git a/code/crates/driver/src/driver.rs b/code/crates/driver/src/driver.rs index 69326c51f..ff47c3a7c 100644 --- a/code/crates/driver/src/driver.rs +++ b/code/crates/driver/src/driver.rs @@ -129,15 +129,18 @@ where /// Process the given input, returning the outputs to be broadcast to the network. pub fn process(&mut self, msg: Input) -> Result>, Error> { - let round_output = match self.apply(msg)? { - Some(msg) => msg, - None => return Ok(Vec::new()), - }; + let round_outputs = self.apply(msg)?; + + if round_outputs.is_empty() { + return Ok(vec![]); + } let mut outputs = vec![]; - // Lift the round state machine output to one or more driver outputs - self.lift_output(round_output, &mut outputs); + // Lift the round state machine outputs to one or more driver outputs + for round_output in round_outputs { + self.lift_output(round_output, &mut outputs); + } // Apply the pending inputs, if any, and lift their outputs self.process_pending(&mut outputs)?; @@ -148,9 +151,9 @@ where /// Process the pending input, if any. fn process_pending(&mut self, outputs: &mut Vec>) -> Result<(), Error> { while let Some((round, input)) = self.pending_input.take() { - if let Some(round_output) = self.apply_input(round, input)? { + for round_output in self.apply_input(round, input)? { self.lift_output(round_output, outputs); - }; + } } Ok(()) @@ -173,11 +176,15 @@ where } RoundOutput::Decision(value) => outputs.push(Output::Decide(value.round, value.value)), + + RoundOutput::MovedToStep(height, round, step) => { + outputs.push(Output::MovedToStep(height, round, step)) + } } } /// Apply the given input to the state machine, returning the output, if any. - fn apply(&mut self, input: Input) -> Result>, Error> { + fn apply(&mut self, input: Input) -> Result>, Error> { match input { Input::NewRound(height, round, proposer) => { self.apply_new_round(height, round, proposer) @@ -194,7 +201,7 @@ where height: Ctx::Height, round: Round, proposer: Ctx::Address, - ) -> Result>, Error> { + ) -> Result>, Error> { if self.height() == height { // If it's a new round for same height, just reset the round, keep the valid and locked values self.round_state.round = round; @@ -212,7 +219,7 @@ where &mut self, round: Round, value: Ctx::Value, - ) -> Result>, Error> { + ) -> Result>, Error> { self.apply_input(round, RoundInput::ProposeValue(value)) } @@ -220,7 +227,7 @@ where &mut self, proposal: Ctx::Proposal, validity: Validity, - ) -> Result>, Error> { + ) -> Result>, Error> { if self.height() != proposal.height() { return Err(Error::InvalidProposalHeight { proposal_height: proposal.height(), @@ -232,11 +239,11 @@ where match self.multiplex_proposal(proposal, validity) { Some(round_input) => self.apply_input(round, round_input), - None => Ok(None), + None => Ok(Vec::new()), } } - fn apply_vote(&mut self, vote: Ctx::Vote) -> Result>, Error> { + fn apply_vote(&mut self, vote: Ctx::Vote) -> Result>, Error> { if self.height() != vote.height() { return Err(Error::InvalidVoteHeight { vote_height: vote.height(), @@ -257,21 +264,21 @@ where .apply_vote(vote, validator.voting_power(), current_round); let Some(vote_output) = vote_output else { - return Ok(None); + return Ok(Vec::new()); }; let round_input = self.multiplex_vote_threshold(vote_output); self.apply_input(vote_round, round_input) } - fn apply_timeout(&mut self, timeout: Timeout) -> Result>, Error> { + fn apply_timeout(&mut self, timeout: Timeout) -> Result>, Error> { let input = match timeout.step { TimeoutStep::Propose => RoundInput::TimeoutPropose, TimeoutStep::Prevote => RoundInput::TimeoutPrevote, TimeoutStep::Precommit => RoundInput::TimeoutPrecommit, // The driver never receives a commit timeout, so we can just ignore it. - TimeoutStep::Commit => return Ok(None), + TimeoutStep::Commit => return Ok(Vec::new()), }; self.apply_input(timeout.round, input) @@ -282,7 +289,7 @@ where &mut self, input_round: Round, input: RoundInput, - ) -> Result>, Error> { + ) -> Result>, Error> { let round_state = core::mem::take(&mut self.round_state); let current_step = round_state.step; @@ -292,19 +299,31 @@ where // Apply the input to the round state machine let transition = round_state.apply(&info, input); + // Initialize outputs with the output of the transition + let mut outputs = Vec::from_iter(transition.output); + + // Check if we need to change step let pending_step = transition.next_state.step; if current_step != pending_step { - let pending_input = self.multiplex_step_change(pending_step, input_round); + if let Some(pending_input) = self.multiplex_step_change(pending_step, input_round) { + self.pending_input = Some((input_round, pending_input)); + } else { + self.pending_input = None; + } - self.pending_input = pending_input.map(|input| (input_round, input)); + outputs.push(RoundOutput::MovedToStep( + transition.next_state.height, + transition.next_state.round, + pending_step, + )); } // Update state self.round_state = transition.next_state; - // Return output, if any - Ok(transition.output) + // Return outputs, if any + Ok(outputs) } } diff --git a/code/crates/driver/src/output.rs b/code/crates/driver/src/output.rs index 3fc85f281..e4a028fec 100644 --- a/code/crates/driver/src/output.rs +++ b/code/crates/driver/src/output.rs @@ -1,6 +1,7 @@ use derive_where::derive_where; use malachite_common::{Context, Round, Timeout}; +use malachite_round::state::Step; /// Messages emitted by the [`Driver`](crate::Driver) #[derive_where(Clone, Debug, PartialEq, Eq)] @@ -26,4 +27,7 @@ where /// Ask for a value at the given height, round. /// The timeout tells the proposal builder how long it has to build a value. GetValue(Ctx::Height, Round, Timeout), + + /// The round state machine has moved to a new step + MovedToStep(Ctx::Height, Round, Step), } diff --git a/code/crates/gossip-consensus/src/behaviour.rs b/code/crates/gossip-consensus/src/behaviour.rs index fb3561e7b..0853d3dad 100644 --- a/code/crates/gossip-consensus/src/behaviour.rs +++ b/code/crates/gossip-consensus/src/behaviour.rs @@ -17,6 +17,7 @@ pub struct Behaviour { pub identify: identify::Behaviour, pub gossipsub: gossipsub::Behaviour, } + fn message_id(message: &gossipsub::Message) -> gossipsub::MessageId { let hash = blake3::hash(&message.data); gossipsub::MessageId::from(hash.as_bytes().to_vec()) @@ -28,12 +29,12 @@ fn gossipsub_config() -> gossipsub::Config { .opportunistic_graft_ticks(3) .heartbeat_interval(Duration::from_secs(1)) .validation_mode(gossipsub::ValidationMode::Strict) - .history_gossip(50) + .history_gossip(3) + .history_length(5) .mesh_n_high(12) .mesh_n_low(4) .mesh_outbound_min(2) .mesh_n(6) - .history_length(500) .message_id_fn(message_id) .build() .unwrap() diff --git a/code/crates/gossip-mempool/src/behaviour.rs b/code/crates/gossip-mempool/src/behaviour.rs index cc7778fb3..f9f09de39 100644 --- a/code/crates/gossip-mempool/src/behaviour.rs +++ b/code/crates/gossip-mempool/src/behaviour.rs @@ -30,12 +30,12 @@ fn gossipsub_config() -> gossipsub::Config { .opportunistic_graft_ticks(3) .heartbeat_interval(Duration::from_secs(1)) .validation_mode(gossipsub::ValidationMode::Strict) - .history_gossip(50) + .history_gossip(3) + .history_length(5) .mesh_n_high(12) .mesh_n_low(4) .mesh_outbound_min(2) .mesh_n(6) - .history_length(500) .message_id_fn(message_id) .build() .unwrap() diff --git a/code/crates/round/src/output.rs b/code/crates/round/src/output.rs index c90972476..c62b3c7bf 100644 --- a/code/crates/round/src/output.rs +++ b/code/crates/round/src/output.rs @@ -4,7 +4,7 @@ use derive_where::derive_where; use malachite_common::{Context, NilOrVal, Round, Timeout, TimeoutStep, ValueId}; -use crate::state::RoundValue; +use crate::state::{RoundValue, Step}; /// Output of the round state machine. #[derive_where(Clone, Debug, PartialEq, Eq)] @@ -30,6 +30,9 @@ where /// Decide the value. Decision(RoundValue), + + /// The round state machine as moved to a new step + MovedToStep(Ctx::Height, Round, Step), } impl Output {