diff --git a/.github/codespell/codespell.ini b/.github/codespell/codespell.ini index 85fa043e4..68615c7c5 100644 --- a/.github/codespell/codespell.ini +++ b/.github/codespell/codespell.ini @@ -1,3 +1,3 @@ [codespell] -skip = ./code/target +skip = ./code/target,./code/Cargo.lock ignore-words = .github/codespell/words.txt diff --git a/.github/codespell/words.txt b/.github/codespell/words.txt index 8b24813a0..3ebd95862 100644 --- a/.github/codespell/words.txt +++ b/.github/codespell/words.txt @@ -1,5 +1,5 @@ crate -shs -ser -numer manuel +numer +ser +shs diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index c9845fced..be853e20f 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -25,10 +25,14 @@ jobs: steps: - name: Checkout uses: actions/checkout@v4 - - uses: actions/setup-node@v3 + - name: Install Protoc + uses: arduino/setup-protoc@v3 + - name: Setup Node + uses: actions/setup-node@v3 with: node-version: "18" - - run: npm install -g @informalsystems/quint + - name: Install Quint + run: npm install -g @informalsystems/quint - name: Setup Rust toolchain uses: actions-rust-lang/setup-rust-toolchain@v1 with: @@ -39,7 +43,15 @@ jobs: - name: Install cargo-llvm-cov uses: taiki-e/install-action@cargo-llvm-cov - name: Generate code coverage - run: cargo llvm-cov nextest --workspace --exclude malachite-itf --all-features --ignore-run-fail --lcov --output-path lcov.info + run: | + cargo llvm-cov nextest \ + --workspace \ + --exclude malachite-itf \ + --ignore-filename-regex node/bin \ + --all-features \ + --ignore-run-fail \ + --lcov \ + --output-path lcov.info - name: Generate text report run: cargo llvm-cov report - name: Upload coverage to Codecov @@ -61,10 +73,14 @@ jobs: steps: - name: Checkout uses: actions/checkout@v4 - - uses: actions/setup-node@v3 + - name: Install Protoc + uses: arduino/setup-protoc@v3 + - name: Setup Node + uses: actions/setup-node@v3 with: node-version: "18" - - run: npm install -g @informalsystems/quint + - name: Install Quint + run: npm install -g @informalsystems/quint - name: Setup Rust toolchain uses: actions-rust-lang/setup-rust-toolchain@v1 with: diff --git a/.github/workflows/mbt.yml b/.github/workflows/mbt.yml index 725550ec4..9af99db13 100644 --- a/.github/workflows/mbt.yml +++ b/.github/workflows/mbt.yml @@ -27,10 +27,14 @@ jobs: steps: - name: Checkout uses: actions/checkout@v4 - - uses: actions/setup-node@v3 + - name: Install Protoc + uses: arduino/setup-protoc@v3 + - name: Setup Node + uses: actions/setup-node@v3 with: node-version: "18" - - run: npm install -g @informalsystems/quint + - name: Install Quint + run: npm install -g @informalsystems/quint - name: Setup Rust toolchain uses: actions-rust-lang/setup-rust-toolchain@v1 - name: Install cargo-nextest diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 67898cedc..0c5d66700 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -31,10 +31,14 @@ jobs: steps: - name: Checkout uses: actions/checkout@v4 - - uses: actions/setup-node@v3 + - name: Install Protoc + uses: arduino/setup-protoc@v3 + - name: Setup Node + uses: actions/setup-node@v3 with: node-version: "18" - - run: npm install -g @informalsystems/quint + - name: Install Quint + run: npm install -g @informalsystems/quint - name: Setup Rust toolchain uses: actions-rust-lang/setup-rust-toolchain@v1 - name: Install cargo-nextest @@ -42,7 +46,7 @@ jobs: - name: Build code run: cargo nextest run --workspace --all-features --no-run - name: Run tests - run: cargo nextest run --workspace --all-features + run: cargo nextest run --workspace --all-features --no-capture clippy: name: Clippy @@ -50,6 +54,8 @@ jobs: steps: - name: Checkout uses: actions/checkout@v4 + - name: Install Protoc + uses: arduino/setup-protoc@v3 - name: Setup Rust toolchain uses: actions-rust-lang/setup-rust-toolchain@v1 with: diff --git a/code/Cargo.toml b/code/Cargo.toml index cfa91257d..29243d39b 100644 --- a/code/Cargo.toml +++ b/code/Cargo.toml @@ -2,9 +2,15 @@ resolver = "2" members = [ + "actors", + "cli", "common", - "driver", + "driver", + "gossip", "itf", + "network", + "node", + "proto", "round", "test", "vote", @@ -18,18 +24,54 @@ license = "Apache-2.0" publish = false rust-version = "1.77" +[workspace.lints.rust] +# unused_crate_dependencies = "warn" + [workspace.dependencies] -derive-where = "1.2.7" -ed25519-consensus = "2.1.0" -futures = "0.3" -glob = "0.3.0" -itf = "0.2.3" -num-bigint = "0.4.4" -num-traits = "0.2.17" -pretty_assertions = "1.4" -rand = { version = "0.8.5", features = ["std_rng"] } -serde = "1.0" -serde_json = "1.0" -serde_with = "3.4" -sha2 = "0.10.8" -signature = "2.1.0" +malachite-actors = { version = "0.1.0", path = "actors" } +malachite-cli = { version = "0.1.0", path = "cli" } +malachite-common = { version = "0.1.0", path = "common" } +malachite-driver = { version = "0.1.0", path = "driver" } +malachite-gossip = { version = "0.1.0", path = "gossip" } +malachite-network = { version = "0.1.0", path = "network" } +malachite-itf = { version = "0.1.0", path = "itf" } +malachite-node = { version = "0.1.0", path = "node" } +malachite-proto = { version = "0.1.0", path = "proto" } +malachite-round = { version = "0.1.0", path = "round" } +malachite-test = { version = "0.1.0", path = "test" } +malachite-vote = { version = "0.1.0", path = "vote" } + +async-trait = "0.1.77" +clap = { version = "4.5.4", features = ["derive"] } +color-eyre = "0.6" +derive-where = "1.2.7" +ed25519-consensus = "2.1.0" +futures = "0.3" +glob = "0.3.0" +humantime-serde = "1.1.1" +itertools = "0.12" +itf = "0.2.3" +libp2p = { version = "0.53.2", features = ["macros", "mdns", "identify", "tokio", "ed25519", "quic", "tls", "gossipsub"] } +libp2p-gossipsub = { version = "0.46.1" } +multiaddr = "0.18.1" +num-bigint = "0.4.4" +num-traits = "0.2.17" +pretty_assertions = "1.4" +prost = "0.12.3" +prost-build = "0.12.3" +prost-types = "0.12.3" +ractor = "0.9.6" +ractor_actors = { version = "0.4.0", default-features= false } +rand = { version = "0.8.5", features = ["std_rng"] } +rand_chacha = "0.3.1" +serde = "1.0" +serde_json = "1.0" +serde_with = "3.7" +sha2 = "0.10.8" +signature = "2.2.0" +thiserror = "1.0" +tokio = "1.36.0" +tokio-stream = "0.1" +toml = "0.8.10" +tracing = "0.1.40" +tracing-subscriber = "0.3.18" diff --git a/code/actors/Cargo.toml b/code/actors/Cargo.toml new file mode 100644 index 000000000..b107cab1d --- /dev/null +++ b/code/actors/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "malachite-actors" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true + +[lints] +workspace = true + +[dependencies] +malachite-common.workspace = true +malachite-driver.workspace = true +malachite-gossip.workspace = true +malachite-network.workspace = true +malachite-node.workspace = true +malachite-proto.workspace = true +malachite-test.workspace = true +malachite-vote.workspace = true + +async-trait = { workspace = true } +libp2p = { workspace = true } +ractor = { workspace = true, features = ["async-trait"] } +rand = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } + +[dev-dependencies] +malachite-test = { workspace = true } +rand_chacha = { workspace = true } +tracing-subscriber = { workspace = true, features = ["fmt"] } diff --git a/code/actors/src/consensus.rs b/code/actors/src/consensus.rs new file mode 100644 index 000000000..fb27eeeff --- /dev/null +++ b/code/actors/src/consensus.rs @@ -0,0 +1,657 @@ +use std::collections::VecDeque; +use std::fmt::Display; +use std::sync::Arc; +use std::time::Instant; + +use async_trait::async_trait; +use ractor::rpc::call_and_forward; +use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef}; +use tokio::sync::mpsc; +use tracing::{debug, info, warn}; + +use malachite_common::{ + Context, Height, NilOrVal, Proposal, Round, SignedProposal, SignedVote, Timeout, TimeoutStep, + Validator, ValidatorSet, ValueId, Vote, VoteType, +}; +use malachite_driver::Input as DriverInput; +use malachite_driver::Output as DriverOutput; +use malachite_driver::Validity; +use malachite_driver::{Driver, ProposerSelector}; +use malachite_gossip::{Channel, Event as GossipEvent}; +use malachite_network::Msg as NetworkMsg; +use malachite_network::PeerId; +use malachite_proto as proto; +use malachite_proto::Protobuf; +use malachite_vote::{Threshold, ThresholdParams}; + +use crate::gossip::Msg as GossipMsg; +use crate::proposal_builder::{BuildProposal, ProposedValue}; +use crate::timers::{Config as TimersConfig, Msg as TimersMsg, TimeoutElapsed, Timers}; +use crate::util::forward; + +pub enum Next { + None, + Input(DriverInput), + Decided(Round, Ctx::Value), +} + +pub struct Params { + pub start_height: Ctx::Height, + pub proposer_selector: Arc>, + pub validator_set: Ctx::ValidatorSet, + pub address: Ctx::Address, + pub threshold_params: ThresholdParams, +} + +pub struct Consensus +where + Ctx: Context, +{ + ctx: Ctx, + params: Params, + timers_config: TimersConfig, + gossip: ActorRef, + proposal_builder: ActorRef>, + tx_decision: mpsc::Sender<(Ctx::Height, Round, Ctx::Value)>, +} + +pub enum Msg { + StartHeight(Ctx::Height), + MoveToHeight(Ctx::Height), + MoveToNextHeight, + GossipEvent(Arc), + TimeoutElapsed(Timeout), + ProposeValue(Ctx::Height, Round, Option), + SendDriverInput(DriverInput), + Decided(Ctx::Height, Round, Ctx::Value), + ProcessDriverOutputs( + Vec>, + Option<(VoteType, Round, NilOrVal>)>, + ), +} + +impl From for Msg { + fn from(msg: TimeoutElapsed) -> Self { + Msg::TimeoutElapsed(msg.timeout()) + } +} + +pub struct State +where + Ctx: Context, +{ + driver: Driver, + timers: ActorRef, + msg_queue: VecDeque>, +} + +impl Consensus +where + Ctx: Context, + Ctx::Vote: Protobuf, + Ctx::Proposal: Protobuf, +{ + pub fn new( + ctx: Ctx, + params: Params, + timers_config: TimersConfig, + gossip: ActorRef, + proposal_builder: ActorRef>, + tx_decision: mpsc::Sender<(Ctx::Height, Round, Ctx::Value)>, + ) -> Self { + Self { + ctx, + params, + timers_config, + gossip, + proposal_builder, + tx_decision, + } + } + + pub async fn spawn( + ctx: Ctx, + params: Params, + timers_config: TimersConfig, + gossip: ActorRef, + proposal_builder: ActorRef>, + tx_decision: mpsc::Sender<(Ctx::Height, Round, Ctx::Value)>, + supervisor: Option, + ) -> Result>, ractor::SpawnErr> { + let node = Self::new( + ctx, + params, + timers_config, + gossip, + proposal_builder, + tx_decision, + ); + + let (actor_ref, _) = if let Some(supervisor) = supervisor { + Actor::spawn_linked(None, node, (), supervisor).await? + } else { + Actor::spawn(None, node, ()).await? + }; + + Ok(actor_ref) + } + + pub async fn handle_gossip_event( + &self, + event: &GossipEvent, + myself: ActorRef>, + state: &mut State, + ) -> Result<(), ractor::ActorProcessingErr> { + match event { + GossipEvent::Listening(addr) => { + info!("Listening on {addr}"); + } + GossipEvent::PeerConnected(peer_id) => { + info!("Connected to peer {peer_id}"); + } + GossipEvent::PeerDisconnected(peer_id) => { + info!("Disconnected from peer {peer_id}"); + } + GossipEvent::Message(from, Channel::Consensus, data) => { + let from = PeerId::new(from.to_string()); + let msg = NetworkMsg::from_network_bytes(data).unwrap(); + + info!("Received message from peer {from}: {msg:?}"); + + self.handle_network_msg(from, msg, myself, state).await?; + } + } + + Ok(()) + } + + pub async fn handle_network_msg( + &self, + from: PeerId, + msg: NetworkMsg, + myself: ActorRef>, + state: &mut State, + ) -> Result<(), ractor::ActorProcessingErr> { + match msg { + NetworkMsg::Vote(signed_vote) => { + let signed_vote = SignedVote::::from_proto(signed_vote).unwrap(); // FIXME + let validator_address = signed_vote.validator_address(); + + info!(%from, %validator_address, "Received vote: {:?}", signed_vote.vote); + + let Some(validator) = self.params.validator_set.get_by_address(validator_address) + else { + warn!(%from, %validator_address, "Received vote from unknown validator"); + return Ok(()); + }; + + if !self + .ctx + .verify_signed_vote(&signed_vote, validator.public_key()) + { + warn!(%from, %validator_address, "Received invalid vote: {signed_vote:?}"); + return Ok(()); + } + + let vote_height = signed_vote.vote.height(); + + if vote_height > state.driver.height() { + warn!( + %from, %validator_address, + "Received vote for height {0} greater than current height {1}, moving to height {0}", + vote_height, state.driver.height(), + ); + + // FIXME: We lose the vote here. We should instead buffer it + // and process it once we moved to the correct height. + // NOTE: We cannot just send the vote via `SendDriverInput` because otherwise + // the vote will reach the driver before it has started the new height. + myself.cast(Msg::MoveToHeight(vote_height))?; + + return Ok(()); + } + + myself.cast(Msg::SendDriverInput(DriverInput::Vote(signed_vote.vote)))?; + } + + NetworkMsg::Proposal(proposal) => { + let signed_proposal = SignedProposal::::from_proto(proposal).unwrap(); + let validator_address = signed_proposal.proposal.validator_address(); + + info!(%from, %validator_address, "Received proposal: {:?}", signed_proposal.proposal); + + let Some(validator) = self.params.validator_set.get_by_address(validator_address) + else { + warn!(%from, %validator_address, "Received proposal from unknown validator"); + return Ok(()); + }; + + let valid = self + .ctx + .verify_signed_proposal(&signed_proposal, validator.public_key()); + + let proposal_height = signed_proposal.proposal.height(); + + if proposal_height > state.driver.height() { + warn!( + %from, %validator_address, + "Received proposal for height {0} greater than current height {1}, moving to height {0}", + proposal_height, state.driver.height(), + ); + + // FIXME: We lose the proposal here. We should instead buffer it + // and process it once we moved to the correct height. + // NOTE: We cannot just send the proposal via `SendDriverInput` because otherwise + // the proposal will reach the driver before it has started the new height. + myself.cast(Msg::MoveToHeight(proposal_height))?; + + return Ok(()); + } + + myself.cast(Msg::SendDriverInput(DriverInput::Proposal( + signed_proposal.proposal, + Validity::from_valid(valid), + )))?; + } + } + + Ok(()) + } + + pub async fn handle_timeout( + &self, + timeout: Timeout, + myself: ActorRef>, + state: &mut State, + ) -> Result<(), ractor::ActorProcessingErr> { + let height = state.driver.height(); + let round = state.driver.round(); + + if timeout.round != round { + debug!( + "Ignoring timeout for round {} at height {}, current round: {round}", + timeout.round, height + ); + + return Ok(()); + } + + info!("{timeout} elapsed at height {height} and round {round}"); + + myself.cast(Msg::SendDriverInput(DriverInput::TimeoutElapsed(timeout)))?; + + if timeout.step == TimeoutStep::Commit { + myself.cast(Msg::MoveToNextHeight)?; + } + + Ok(()) + } + + pub async fn send_driver_input( + &self, + input: DriverInput, + myself: ActorRef>, + state: &mut State, + ) -> Result<(), ractor::ActorProcessingErr> { + match &input { + DriverInput::NewRound(_, _) => { + state.timers.cast(TimersMsg::Reset)?; + } + + DriverInput::ProposeValue(round, _) => state + .timers + .cast(TimersMsg::CancelTimeout(Timeout::propose(*round)))?, + + DriverInput::Proposal(proposal, _) => { + let round = Proposal::::round(proposal); + state + .timers + .cast(TimersMsg::CancelTimeout(Timeout::propose(round)))?; + } + + DriverInput::Vote(_) => (), + DriverInput::TimeoutElapsed(_) => (), + } + + let check_threshold = if let DriverInput::Vote(vote) = &input { + let round = Vote::::round(vote); + let value = Vote::::value(vote); + + Some((vote.vote_type(), round, value.clone())) + } else { + None + }; + + let outputs = state + .driver + .process(input) + .map_err(|e| format!("Driver failed to process input: {e}"))?; + + myself.cast(Msg::ProcessDriverOutputs(outputs, check_threshold))?; + + Ok(()) + } + + async fn process_driver_outputs( + &self, + outputs: Vec>, + check_threshold: Option<(VoteType, Round, NilOrVal>)>, + myself: ActorRef>, + state: &mut State, + ) -> Result<(), ActorProcessingErr> { + // When we receive a vote, check if we've gotten +2/3 votes for the value we just received a vote for, + // if so then cancel the corresponding timeout. + if let Some((vote_type, round, value)) = check_threshold { + let threshold = match value { + NilOrVal::Nil => Threshold::Nil, + NilOrVal::Val(value) => Threshold::Value(value), + }; + + let votes = state.driver.votes(); + + if votes.is_threshold_met(&round, vote_type, threshold.clone()) { + let timeout = match vote_type { + VoteType::Prevote => Timeout::prevote(round), + VoteType::Precommit => Timeout::precommit(round), + }; + + info!("Threshold met for {threshold:?} at round {round}, cancelling {timeout}"); + state.timers.cast(TimersMsg::CancelTimeout(timeout))?; + } + } + + for output in outputs { + let next = self + .handle_driver_output(output, myself.clone(), state) + .await?; + + match next { + Next::None => (), + + Next::Input(input) => myself.cast(Msg::SendDriverInput(input))?, + + Next::Decided(round, value) => { + state + .timers + .cast(TimersMsg::ScheduleTimeout(Timeout::commit(round)))?; + + myself.cast(Msg::Decided(state.driver.height(), round, value))?; + } + } + } + + Ok(()) + } + + async fn handle_driver_output( + &self, + output: DriverOutput, + myself: ActorRef>, + state: &mut State, + ) -> Result, ActorProcessingErr> { + match output { + DriverOutput::NewRound(height, round) => { + info!("New round at height {height}: {round}"); + + Ok(Next::Input(DriverInput::NewRound(height, round))) + } + + DriverOutput::Propose(proposal) => { + info!( + "Proposing value {:?} at round {}", + proposal.value(), + proposal.round() + ); + + let signed_proposal = self.ctx.sign_proposal(proposal); + + // TODO: Refactor to helper method + let proto = signed_proposal.to_proto().unwrap(); // FIXME + let msg = NetworkMsg::Proposal(proto); + let bytes = msg.to_network_bytes().unwrap(); // FIXME + self.gossip + .cast(GossipMsg::Broadcast(Channel::Consensus, bytes))?; + + Ok(Next::Input(DriverInput::Proposal( + signed_proposal.proposal, + Validity::Valid, + ))) + } + + DriverOutput::Vote(vote) => { + info!( + "Voting for value {:?} at round {}", + vote.value(), + vote.round() + ); + + let signed_vote = self.ctx.sign_vote(vote); + + // TODO: Refactor to helper method + let proto = signed_vote.to_proto().unwrap(); // FIXME + let msg = NetworkMsg::Vote(proto); + let bytes = msg.to_network_bytes().unwrap(); // FIXME + self.gossip + .cast(GossipMsg::Broadcast(Channel::Consensus, bytes))?; + + Ok(Next::Input(DriverInput::Vote(signed_vote.vote))) + } + + DriverOutput::Decide(round, value) => { + info!("Decided on value {value:?} at round {round}"); + + let _ = self + .tx_decision + .send((state.driver.height(), round, value.clone())) + .await; + + Ok(Next::Decided(round, value)) + } + + DriverOutput::ScheduleTimeout(timeout) => { + info!("Scheduling {timeout}"); + state.timers.cast(TimersMsg::ScheduleTimeout(timeout))?; + + Ok(Next::None) + } + + DriverOutput::GetValue(height, round, timeout) => { + info!("Requesting value at height {height} and round {round}"); + self.get_value(myself, height, round, timeout).await?; + + Ok(Next::None) + } + } + } + + pub async fn get_value( + &self, + myself: ActorRef>, + height: Ctx::Height, + round: Round, + timeout: Timeout, + ) -> Result<(), ActorProcessingErr> { + let deadline = Instant::now() + self.timers_config.timeout_duration(timeout.step); + + // Call `BuildProposal` on the proposal builder actor, + // and forward the reply to the current actor, wrapping it in `Msg::ProposeValue`. + call_and_forward( + &self.proposal_builder.get_cell(), + |reply| BuildProposal { + height, + round, + deadline, + reply, + }, + myself.get_cell(), + |proposed: ProposedValue| { + Msg::::ProposeValue(proposed.height, proposed.round, proposed.value) + }, + None, + )?; + + Ok(()) + } +} + +#[async_trait] +impl Actor for Consensus +where + Ctx: Context, + Ctx::Height: Display, + Ctx::Vote: Protobuf, + Ctx::Proposal: Protobuf, +{ + type Msg = Msg; + type State = State; + type Arguments = (); + + async fn pre_start( + &self, + myself: ActorRef>, + _args: (), + ) -> Result, ractor::ActorProcessingErr> { + let (timers, _) = + Timers::spawn_linked(self.timers_config, myself.clone(), myself.get_cell()).await?; + + let forward = forward(myself.clone(), Some(myself.get_cell()), Msg::GossipEvent).await?; + self.gossip.cast(GossipMsg::Subscribe(forward))?; + + let driver = Driver::new( + self.ctx.clone(), + self.params.start_height, + self.params.proposer_selector.clone(), + self.params.validator_set.clone(), + self.params.address.clone(), + self.params.threshold_params, + ); + + Ok(State { + driver, + timers, + msg_queue: VecDeque::new(), + }) + } + + #[tracing::instrument( + name = "node", + skip(self, myself, msg, state), + fields( + height = %state.driver.height(), + round = %state.driver.round() + ) + )] + async fn handle( + &self, + myself: ActorRef>, + msg: Msg, + state: &mut State, + ) -> Result<(), ractor::ActorProcessingErr> { + match msg { + Msg::StartHeight(height) => { + info!("Starting height {height}"); + + myself.cast(Msg::SendDriverInput(DriverInput::NewRound( + height, + Round::new(0), + )))?; + + // Drain the pending message queue to process any gossip events that were received + // before the driver started the new height and was still at round Nil. + let pending_msgs = std::mem::take(&mut state.msg_queue); + for msg in pending_msgs { + myself.cast(msg)?; + } + } + + Msg::MoveToNextHeight => { + let height = state.driver.height().increment(); + info!("Moving to next height {height}"); + + myself.cast(Msg::MoveToHeight(height))?; + } + + Msg::MoveToHeight(height) => { + state.timers.cast(TimersMsg::Reset)?; + state.driver.move_to_height(height); + + debug_assert_eq!(state.driver.height(), height); + debug_assert_eq!(state.driver.round(), Round::Nil); + + myself.cast(Msg::StartHeight(height))?; + } + + Msg::ProposeValue(height, round, value) => { + if state.driver.height() != height { + warn!( + "Ignoring proposal for height {height}, current height: {}", + state.driver.height() + ); + + return Ok(()); + } + + if state.driver.round() != round { + warn!( + "Ignoring proposal for round {round}, current round: {}", + state.driver.round() + ); + + return Ok(()); + } + + match value { + Some(value) => myself.cast(Msg::SendDriverInput(DriverInput::ProposeValue( + round, value, + )))?, + + None => warn!( + %height, %round, + "Proposal builder failed to build a value within the deadline" + ), + } + } + + Msg::Decided(height, round, value) => { + info!("Decided on value {value:?} at height {height} and round {round}"); + } + + Msg::GossipEvent(event) => { + if state.driver.round() == Round::Nil { + debug!("Received gossip event at round -1, queuing for later"); + state.msg_queue.push_back(Msg::GossipEvent(event)); + } else { + self.handle_gossip_event(event.as_ref(), myself, state) + .await?; + } + } + + Msg::TimeoutElapsed(timeout) => { + self.handle_timeout(timeout, myself, state).await?; + } + + Msg::SendDriverInput(input) => { + self.send_driver_input(input, myself, state).await?; + } + + Msg::ProcessDriverOutputs(outputs, check_threshold) => { + self.process_driver_outputs(outputs, check_threshold, myself, state) + .await?; + } + } + + Ok(()) + } + + async fn post_stop( + &self, + _myself: ActorRef, + state: &mut State, + ) -> Result<(), ActorProcessingErr> { + info!("Stopping..."); + + state.timers.stop(None); + + Ok(()) + } +} diff --git a/code/actors/src/gossip.rs b/code/actors/src/gossip.rs new file mode 100644 index 000000000..28098049e --- /dev/null +++ b/code/actors/src/gossip.rs @@ -0,0 +1,151 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use libp2p::identity::Keypair; +use libp2p::Multiaddr; +use malachite_gossip::Channel; +use ractor::Actor; +use ractor::ActorCell; +use ractor::ActorProcessingErr; +use ractor::ActorRef; +use tokio::task::JoinHandle; + +use malachite_gossip::handle::CtrlHandle; +use malachite_gossip::Config; +use malachite_gossip::Event; + +pub struct Gossip; + +impl Gossip { + pub async fn spawn( + keypair: Keypair, + addr: Multiaddr, + config: Config, + supervisor: Option, + ) -> Result, ractor::SpawnErr> { + let args = Args { + keypair, + addr, + config, + }; + + let (actor_ref, _) = if let Some(supervisor) = supervisor { + Actor::spawn_linked(None, Self, args, supervisor).await? + } else { + Actor::spawn(None, Self, args).await? + }; + + Ok(actor_ref) + } +} + +pub struct Args { + pub keypair: Keypair, + pub addr: Multiaddr, + pub config: Config, +} + +pub enum State { + Stopped, + Running { + subscribers: Vec>>, + ctrl_handle: CtrlHandle, + recv_task: JoinHandle<()>, + }, +} + +pub enum Msg { + Subscribe(ActorRef>), + Broadcast(Channel, Vec), + + // Internal message + #[doc(hidden)] + NewEvent(Event), +} + +#[async_trait] +impl Actor for Gossip { + type Msg = Msg; + type State = State; + type Arguments = Args; + + async fn pre_start( + &self, + myself: ActorRef, + args: Args, + ) -> Result { + let handle = malachite_gossip::spawn(args.keypair, args.addr, args.config).await?; + let (mut recv_handle, ctrl_handle) = handle.split(); + + let recv_task = tokio::spawn({ + async move { + while let Some(event) = recv_handle.recv().await { + myself.cast(Msg::NewEvent(event)).unwrap(); // FIXME + } + } + }); + + Ok(State::Running { + subscribers: Vec::new(), + ctrl_handle, + recv_task, + }) + } + + async fn post_start( + &self, + _myself: ActorRef, + _state: &mut State, + ) -> Result<(), ActorProcessingErr> { + Ok(()) + } + + async fn handle( + &self, + _myself: ActorRef, + msg: Msg, + state: &mut State, + ) -> Result<(), ActorProcessingErr> { + let State::Running { + subscribers, + ctrl_handle, + .. + } = state + else { + return Ok(()); + }; + + match msg { + Msg::Subscribe(subscriber) => subscribers.push(subscriber), + Msg::Broadcast(channel, data) => ctrl_handle.broadcast(channel, data).await?, + Msg::NewEvent(event) => { + let event = Arc::new(event); + for subscriber in subscribers { + subscriber.cast(Arc::clone(&event))?; + } + } + } + + Ok(()) + } + + async fn post_stop( + &self, + _myself: ActorRef, + state: &mut State, + ) -> Result<(), ActorProcessingErr> { + let state = std::mem::replace(state, State::Stopped); + + if let State::Running { + ctrl_handle, + recv_task, + .. + } = state + { + ctrl_handle.wait_shutdown().await?; + recv_task.await?; + } + + Ok(()) + } +} diff --git a/code/actors/src/lib.rs b/code/actors/src/lib.rs new file mode 100644 index 000000000..5297990a2 --- /dev/null +++ b/code/actors/src/lib.rs @@ -0,0 +1,7 @@ +pub mod consensus; +pub mod gossip; +pub mod node; +pub mod prelude; +pub mod proposal_builder; +pub mod timers; +pub mod util; diff --git a/code/actors/src/node.rs b/code/actors/src/node.rs new file mode 100644 index 000000000..31d8ca73f --- /dev/null +++ b/code/actors/src/node.rs @@ -0,0 +1,146 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use ractor::{Actor, ActorRef}; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +use malachite_common::{Context, Round}; +use malachite_driver::ProposerSelector; +use malachite_node::value_builder::ValueBuilder; +use malachite_proto::Protobuf; +use malachite_vote::ThresholdParams; + +use crate::consensus::{Consensus, Msg as ConsensusMsg, Params as ConsensusParams}; +use crate::gossip::{Gossip, Msg as GossipMsg}; +use crate::proposal_builder::ProposalBuilder; +use crate::timers::Config as TimersConfig; + +pub struct Params { + pub address: Ctx::Address, + pub initial_validator_set: Ctx::ValidatorSet, + pub keypair: malachite_gossip::Keypair, + pub proposer_selector: Arc>, + pub start_height: Ctx::Height, + pub threshold_params: ThresholdParams, + pub timers_config: TimersConfig, + pub value_builder: Box>, + pub tx_decision: mpsc::Sender<(Ctx::Height, Round, Ctx::Value)>, +} + +pub async fn spawn( + ctx: Ctx, + params: Params, +) -> Result<(ActorRef, JoinHandle<()>), ractor::ActorProcessingErr> +where + Ctx: Context, + Ctx::Vote: Protobuf, + Ctx::Proposal: Protobuf, +{ + let proposal_builder = ProposalBuilder::spawn(params.value_builder, None).await?; + + let consensus_params = ConsensusParams { + start_height: params.start_height, + proposer_selector: params.proposer_selector, + validator_set: params.initial_validator_set, + address: params.address, + threshold_params: params.threshold_params, + }; + + let addr = "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(); + let config = malachite_gossip::Config::default(); + let gossip = Gossip::spawn(params.keypair, addr, config, None) + .await + .unwrap(); + + let consensus = Consensus::spawn( + ctx.clone(), + consensus_params, + params.timers_config, + gossip.clone(), + proposal_builder, + params.tx_decision, + None, + ) + .await?; + + let node = Node::new(ctx, gossip, consensus, params.start_height); + let actor = node.spawn().await?; + Ok(actor) +} + +pub struct Node { + #[allow(dead_code)] + ctx: Ctx, + gossip: ActorRef, + consensus: ActorRef>, + start_height: Ctx::Height, +} + +impl Node +where + Ctx: Context, + Ctx::Vote: Protobuf, + Ctx::Proposal: Protobuf, +{ + pub fn new( + ctx: Ctx, + gossip: ActorRef, + consensus: ActorRef>, + start_height: Ctx::Height, + ) -> Self { + Self { + ctx, + gossip, + consensus, + start_height, + } + } + + pub async fn spawn(self) -> Result<(ActorRef, JoinHandle<()>), ractor::SpawnErr> { + Actor::spawn(None, self, ()).await + } +} + +pub enum Msg { + Start, +} + +#[async_trait] +impl Actor for Node +where + Ctx: Context, + Ctx::Vote: Protobuf, + Ctx::Proposal: Protobuf, +{ + type Msg = Msg; + type State = (); + type Arguments = (); + + async fn pre_start( + &self, + myself: ActorRef, + _args: (), + ) -> Result<(), ractor::ActorProcessingErr> { + // Set ourselves as the supervisor of the gossip and consensus actors + self.gossip.link(myself.get_cell()); + self.consensus.link(myself.get_cell()); + + Ok(()) + } + + async fn handle( + &self, + _myself: ActorRef, + msg: Self::Msg, + _state: &mut (), + ) -> Result<(), ractor::ActorProcessingErr> { + match msg { + Msg::Start => self + .consensus + .cast(crate::consensus::Msg::StartHeight(self.start_height))?, + } + + Ok(()) + } +} diff --git a/code/actors/src/prelude.rs b/code/actors/src/prelude.rs new file mode 100644 index 000000000..47973738f --- /dev/null +++ b/code/actors/src/prelude.rs @@ -0,0 +1 @@ +pub use ractor::{Actor, ActorCell, ActorId, ActorName, ActorRef}; diff --git a/code/actors/src/proposal_builder.rs b/code/actors/src/proposal_builder.rs new file mode 100644 index 000000000..adead7ac1 --- /dev/null +++ b/code/actors/src/proposal_builder.rs @@ -0,0 +1,92 @@ +use std::marker::PhantomData; +use std::sync::atomic::AtomicPtr; +use std::time::Instant; + +use async_trait::async_trait; +use ractor::{Actor, ActorCell, ActorRef, RpcReplyPort}; + +use malachite_common::{Context, Round}; +use malachite_node::value_builder::ValueBuilder; + +pub struct BuildProposal { + pub height: Ctx::Height, + pub round: Round, + pub deadline: Instant, + pub reply: RpcReplyPort>, +} + +pub struct ProposedValue { + pub height: Ctx::Height, + pub round: Round, + pub value: Option, +} + +pub struct ProposalBuilder { + builder: Box>, + marker: PhantomData>, +} + +impl ProposalBuilder +where + Ctx: Context, +{ + pub async fn spawn( + builder: Box>, + supervisor: Option, + ) -> Result>, ractor::SpawnErr> { + let this = Self { + builder, + marker: PhantomData, + }; + + let (actor_ref, _) = if let Some(supervisor) = supervisor { + Actor::spawn_linked(None, this, (), supervisor).await? + } else { + Actor::spawn(None, this, ()).await? + }; + + Ok(actor_ref) + } +} + +#[async_trait] +impl Actor for ProposalBuilder +where + Ctx: Context, +{ + type Msg = BuildProposal; + type State = (); + type Arguments = (); + + async fn pre_start( + &self, + _myself: ractor::ActorRef, + _args: (), + ) -> Result<(), ractor::ActorProcessingErr> { + Ok(()) + } + + async fn handle( + &self, + _myself: ractor::ActorRef, + msg: Self::Msg, + _state: &mut Self::State, + ) -> Result<(), ractor::ActorProcessingErr> { + let BuildProposal { + height, + round, + deadline, + reply, + } = msg; + + let value = self.builder.build_proposal(height, deadline).await; + + reply.send(ProposedValue { + height, + round, + value, + })?; + + Ok(()) + } +} diff --git a/code/actors/src/timers.rs b/code/actors/src/timers.rs new file mode 100644 index 000000000..3cdcb484a --- /dev/null +++ b/code/actors/src/timers.rs @@ -0,0 +1,223 @@ +use std::collections::HashMap; +use std::time::Duration; + +use async_trait::async_trait; +use ractor::time::send_after; +use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, MessagingErr}; +use tokio::task::JoinHandle; + +use malachite_common::{Timeout, TimeoutStep}; + +pub use malachite_node::config::TimeoutConfig as Config; + +pub struct TimeoutElapsed(Timeout); + +impl TimeoutElapsed { + pub fn timeout(&self) -> Timeout { + self.0 + } +} + +pub struct Timers { + config: Config, + listener: ActorRef, +} + +impl Timers +where + M: From + ractor::Message, +{ + pub async fn spawn( + config: Config, + listener: ActorRef, + ) -> Result<(ActorRef, JoinHandle<()>), ractor::SpawnErr> { + Actor::spawn(None, Self { config, listener }, ()).await + } + + pub async fn spawn_linked( + config: Config, + listener: ActorRef, + supervisor: ActorCell, + ) -> Result<(ActorRef, JoinHandle<()>), ractor::SpawnErr> { + Actor::spawn_linked(None, Self { config, listener }, (), supervisor).await + } + + pub fn timeout_duration(&self, step: &TimeoutStep) -> Duration { + match step { + TimeoutStep::Propose => self.config.timeout_propose, + TimeoutStep::Prevote => self.config.timeout_prevote, + TimeoutStep::Precommit => self.config.timeout_precommit, + TimeoutStep::Commit => self.config.timeout_commit, + } + } +} + +pub enum Msg { + ScheduleTimeout(Timeout), + CancelTimeout(Timeout), + Reset, + + // Internal messages + #[doc(hidden)] + TimeoutElapsed(Timeout), +} + +type TimerTask = JoinHandle>>; + +#[derive(Default)] +pub struct State { + timers: HashMap, +} + +#[async_trait] +impl Actor for Timers +where + M: From + ractor::Message, +{ + type Msg = Msg; + type State = State; + type Arguments = (); + + async fn pre_start( + &self, + _myself: ActorRef, + _args: (), + ) -> Result { + Ok(State::default()) + } + + async fn handle( + &self, + myself: ActorRef, + msg: Msg, + state: &mut State, + ) -> Result<(), ActorProcessingErr> { + match msg { + Msg::ScheduleTimeout(timeout) => { + let duration = self.timeout_duration(&timeout.step); + let task = send_after(duration, myself.get_cell(), move || { + Msg::TimeoutElapsed(timeout) + }); + + state.timers.insert(timeout, task); + } + + Msg::CancelTimeout(timeout) => { + if let Some(task) = state.timers.remove(&timeout) { + task.abort(); + } + } + + Msg::Reset => { + for (_, task) in state.timers.drain() { + task.abort(); + } + } + + Msg::TimeoutElapsed(timeout) => { + state.timers.remove(&timeout); + self.listener.cast(TimeoutElapsed(timeout).into())?; + } + } + + Ok(()) + } + + async fn post_stop( + &self, + _myself: ActorRef, + state: &mut State, + ) -> Result<(), ActorProcessingErr> { + for (_, task) in state.timers.drain() { + task.abort(); + } + + Ok(()) + } +} + +// #[cfg(test)] +// #[allow(non_upper_case_globals)] +// mod tests { +// use malachite_common::Round; +// +// use super::*; +// +// const config: Config = Config { +// propose_timeout: Duration::from_millis(50), +// prevote_timeout: Duration::from_millis(100), +// precommit_timeout: Duration::from_millis(150), +// commit_timeout: Duration::from_millis(200), +// }; +// +// const fn timeouts() -> (Timeout, Timeout, Timeout) { +// let (r0, r1, r2) = (Round::new(0), Round::new(1), Round::new(2)); +// +// ( +// Timeout::new(r0, TimeoutStep::Propose), +// Timeout::new(r1, TimeoutStep::Prevote), +// Timeout::new(r2, TimeoutStep::Precommit), +// ) +// } +// +// #[tokio::test] +// async fn timers_no_cancel() { +// let (t0, t1, t2) = timeouts(); +// +// let (mut timers, mut rx_timeout_elapsed) = Timers::new(config); +// +// timers.schedule_timeout(t1).await; +// timers.schedule_timeout(t0).await; +// timers.schedule_timeout(t2).await; +// assert_eq!(timers.scheduled().await, 3); +// +// assert_eq!(rx_timeout_elapsed.recv().await.unwrap(), t0); +// assert_eq!(timers.scheduled().await, 2); +// assert_eq!(rx_timeout_elapsed.recv().await.unwrap(), t1); +// assert_eq!(timers.scheduled().await, 1); +// assert_eq!(rx_timeout_elapsed.recv().await.unwrap(), t2); +// assert_eq!(timers.scheduled().await, 0); +// } +// +// #[tokio::test] +// async fn timers_cancel_first() { +// let (t0, t1, t2) = timeouts(); +// +// let (mut timers, mut rx_timeout_elapsed) = Timers::new(config); +// +// timers.schedule_timeout(t0).await; +// timers.schedule_timeout(t1).await; +// timers.schedule_timeout(t2).await; +// assert_eq!(timers.scheduled().await, 3); +// +// timers.cancel_timeout(&t0).await; +// assert_eq!(timers.scheduled().await, 2); +// +// assert_eq!(rx_timeout_elapsed.recv().await.unwrap(), t1); +// assert_eq!(timers.scheduled().await, 1); +// +// assert_eq!(rx_timeout_elapsed.recv().await.unwrap(), t2); +// assert_eq!(timers.scheduled().await, 0); +// } +// +// #[tokio::test] +// async fn timers_cancel_middle() { +// let (t0, t1, t2) = timeouts(); +// +// let (mut timers, mut rx_timeout_elapsed) = Timers::new(config); +// +// timers.schedule_timeout(t2).await; +// timers.schedule_timeout(t1).await; +// timers.schedule_timeout(t0).await; +// assert_eq!(timers.scheduled().await, 3); +// +// assert_eq!(rx_timeout_elapsed.recv().await.unwrap(), t0); +// assert_eq!(timers.scheduled().await, 2); +// +// timers.cancel_timeout(&t1).await; +// assert_eq!(timers.scheduled().await, 1); +// +// assert_eq!(rx_timeout_elapsed.recv().await.unwrap(), t2); +// assert_eq!(timers.scheduled().await, 0); +// } +// } diff --git a/code/actors/src/util/forward.rs b/code/actors/src/util/forward.rs new file mode 100644 index 000000000..1331db8af --- /dev/null +++ b/code/actors/src/util/forward.rs @@ -0,0 +1,66 @@ +use std::marker::PhantomData; +use std::sync::atomic::AtomicPtr; + +use ractor::{Actor, ActorCell, ActorRef, Message}; + +pub struct Forward { + to: ActorRef, + map: F, + _marker: PhantomData>, +} + +#[ractor::async_trait] +impl Actor for Forward +where + A: Message, + B: Message, + F: Fn(A) -> B + Send + Sync + 'static, +{ + type Msg = A; + type State = (); + type Arguments = (); + + async fn pre_start( + &self, + _myself: ActorRef, + _args: (), + ) -> Result<(), ractor::ActorProcessingErr> { + Ok(()) + } + + async fn handle( + &self, + _myself: ActorRef, + msg: A, + _state: &mut (), + ) -> Result<(), ractor::ActorProcessingErr> { + let msg = (self.map)(msg); + self.to.cast(msg)?; + Ok(()) + } +} + +pub async fn forward( + to: ActorRef, + supervisor: Option, + map: F, +) -> Result, ractor::SpawnErr> +where + A: Message, + B: Message, + F: Fn(A) -> B + Send + Sync + 'static, +{ + let actor = Forward { + to, + map: Box::new(map), + _marker: PhantomData, + }; + + let (actor_ref, _) = if let Some(supervisor) = supervisor { + Actor::spawn_linked(None, actor, (), supervisor).await? + } else { + Actor::spawn(None, actor, ()).await? + }; + + Ok(actor_ref) +} diff --git a/code/actors/src/util/make_actor.rs b/code/actors/src/util/make_actor.rs new file mode 100644 index 000000000..07dcfc34f --- /dev/null +++ b/code/actors/src/util/make_actor.rs @@ -0,0 +1,44 @@ +use std::sync::Arc; + +use ractor::ActorRef; +use tokio::sync::mpsc; + +use malachite_common::Round; +use malachite_gossip::Keypair; +use malachite_node::value_builder::test::TestValueBuilder; +use malachite_test::utils::RotateProposer; +use malachite_test::{Address, Height, PrivateKey, TestContext, ValidatorSet, Value}; +use tokio::task::JoinHandle; + +use crate::node::{Msg as NodeMsg, Params as NodeParams}; +use crate::timers::Config as TimersConfig; + +pub async fn make_node_actor( + initial_validator_set: ValidatorSet, + private_key: PrivateKey, + address: Address, + tx_decision: mpsc::Sender<(Height, Round, Value)>, +) -> (ActorRef, JoinHandle<()>) { + let keypair = Keypair::ed25519_from_bytes(private_key.inner().to_bytes()).unwrap(); + let start_height = Height::new(1); + let ctx = TestContext::new(private_key); + let proposer_selector = Arc::new(RotateProposer); + + let value_builder = Box::>::default(); + + let timers_config = TimersConfig::default(); + + let params = NodeParams { + address, + initial_validator_set, + keypair, + proposer_selector, + start_height, + threshold_params: Default::default(), + timers_config, + tx_decision, + value_builder, + }; + + crate::node::spawn(ctx, params).await.unwrap() +} diff --git a/code/actors/src/util/mod.rs b/code/actors/src/util/mod.rs new file mode 100644 index 000000000..e4e0cb8a7 --- /dev/null +++ b/code/actors/src/util/mod.rs @@ -0,0 +1,5 @@ +mod forward; +mod make_actor; + +pub use forward::{forward, Forward}; +pub use make_actor::make_node_actor; diff --git a/code/actors/tests/actor_gossip_n3f0.rs b/code/actors/tests/actor_gossip_n3f0.rs new file mode 100644 index 000000000..bba40019c --- /dev/null +++ b/code/actors/tests/actor_gossip_n3f0.rs @@ -0,0 +1,19 @@ +#![allow(unused_crate_dependencies)] + +#[path = "util.rs"] +mod util; +use util::*; + +#[tokio::test] +pub async fn decide_on_value() { + let nodes = Test::new( + [ + TestNode::correct(5), + TestNode::correct(15), + TestNode::correct(10), + ], + 9, + ); + + run_test(nodes).await +} diff --git a/code/actors/tests/util.rs b/code/actors/tests/util.rs new file mode 100644 index 000000000..ce7c0b019 --- /dev/null +++ b/code/actors/tests/util.rs @@ -0,0 +1,139 @@ +#![allow(dead_code)] + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use tokio::sync::mpsc; +use tokio::time::{sleep, Duration}; +use tracing::{error, info}; + +use malachite_common::{Round, VotingPower}; +use malachite_test::utils::make_validators; +use malachite_test::{Height, PrivateKey, Validator, ValidatorSet, Value}; + +use malachite_actors::node::Msg; +use malachite_actors::util::make_node_actor; + +pub const SEED: u64 = 42; +pub const HEIGHTS: u64 = 3; +pub const START_HEIGHT: Height = Height::new(1); +pub const END_HEIGHT: Height = Height::new(START_HEIGHT.as_u64() + HEIGHTS - 1); +pub const TEST_TIMEOUT: Duration = Duration::from_secs(20); + +pub struct Test { + pub nodes: [TestNode; N], + pub validator_set: ValidatorSet, + pub vals_and_keys: [(Validator, PrivateKey); N], + pub expected_decisions: usize, +} + +impl Test { + pub fn new(nodes: [TestNode; N], expected_decisions: usize) -> Self { + let vals_and_keys = make_validators(Self::voting_powers(&nodes)); + let validators = vals_and_keys.iter().map(|(v, _)| v).cloned(); + let validator_set = ValidatorSet::new(validators); + + Self { + nodes, + validator_set, + vals_and_keys, + expected_decisions, + } + } + + pub fn get(&self, index: usize) -> Option<&TestNode> { + self.nodes.get(index) + } + + pub fn voting_powers(nodes: &[TestNode; N]) -> [VotingPower; N] { + let mut voting_powers = [0; N]; + for (i, node) in nodes.iter().enumerate() { + voting_powers[i] = node.voting_power; + } + voting_powers + } +} + +pub struct TestNode { + pub voting_power: VotingPower, +} + +impl TestNode { + pub fn correct(voting_power: VotingPower) -> Self { + Self { voting_power } + } +} + +pub async fn run_test(test: Test) { + tracing_subscriber::fmt::init(); + + let mut handles = Vec::with_capacity(N); + + for (v, sk) in &test.vals_and_keys { + let (tx_decision, rx_decision) = mpsc::channel(HEIGHTS as usize); + + let node = tokio::spawn(make_node_actor( + test.validator_set.clone(), + sk.clone(), + v.address, + tx_decision, + )); + + handles.push((node, rx_decision)); + } + + sleep(Duration::from_secs(5)).await; + + let mut nodes = Vec::with_capacity(handles.len()); + for (handle, rx) in handles { + let node = handle.await.expect("Error: node failed to start"); + nodes.push((node, rx)); + } + + let mut actors = Vec::with_capacity(nodes.len()); + let mut rxs = Vec::with_capacity(nodes.len()); + + for ((actor, _), rx) in nodes { + actor.cast(Msg::Start).unwrap(); + + actors.push(actor); + rxs.push(rx); + } + + let correct_decisions = Arc::new(AtomicUsize::new(0)); + + for (i, mut rx_decision) in rxs.into_iter().enumerate() { + let i = i + 1; + + let correct_decisions = Arc::clone(&correct_decisions); + + tokio::spawn(async move { + for height in START_HEIGHT.as_u64()..=END_HEIGHT.as_u64() { + let decision = rx_decision.recv().await; + let expected = Some((Height::new(height), Round::new(0), Value::new(40 + height))); + + if decision == expected { + info!("[{height}] {i}/{HEIGHTS} correct decision"); + correct_decisions.fetch_add(1, Ordering::Relaxed); + } else { + error!("[{height}] {i}/{HEIGHTS} incorrect decision: expected {expected:?}, got {decision:?}"); + } + } + }); + } + + tokio::time::sleep(TEST_TIMEOUT).await; + + let correct_decisions = correct_decisions.load(Ordering::Relaxed); + + if correct_decisions != test.expected_decisions { + panic!( + "Not all nodes made correct decisions: {}/{}", + correct_decisions, test.expected_decisions + ); + } + + for actor in actors { + actor.stop_and_wait(None, None).await.unwrap(); + } +} diff --git a/code/cli/Cargo.toml b/code/cli/Cargo.toml new file mode 100644 index 000000000..af728f4ff --- /dev/null +++ b/code/cli/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "malachite-cli" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true + +[lints] +workspace = true + +[dependencies] +malachite-actors.workspace = true +malachite-node.workspace = true +malachite-test.workspace = true + +clap = { workspace = true } +color-eyre = { workspace = true } +itertools = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = ["fmt", "env-filter"] } diff --git a/code/cli/src/logging.rs b/code/cli/src/logging.rs new file mode 100644 index 000000000..205e46498 --- /dev/null +++ b/code/cli/src/logging.rs @@ -0,0 +1,100 @@ +use core::fmt; + +use clap::ValueEnum; +use tracing_subscriber::filter::EnvFilter; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::FmtSubscriber; + +#[derive(Copy, Clone, Debug, PartialEq, Eq, ValueEnum)] +pub enum DebugSection { + Ractor, +} + +#[allow(dead_code)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum LogLevel { + Trace, + Debug, + Info, + Warn, + Error, +} + +impl Default for LogLevel { + fn default() -> Self { + Self::Info + } +} + +impl fmt::Display for LogLevel { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + match self { + LogLevel::Trace => write!(f, "trace"), + LogLevel::Debug => write!(f, "debug"), + LogLevel::Info => write!(f, "info"), + LogLevel::Warn => write!(f, "warn"), + LogLevel::Error => write!(f, "error"), + } + } +} + +pub fn init(log_level: LogLevel, debug_sections: &[DebugSection]) { + color_eyre::install().expect("Failed to install global error handler"); + + let filter = build_tracing_filter(log_level, debug_sections); + + // Construct a tracing subscriber with the supplied filter and enable reloading. + let builder = FmtSubscriber::builder() + .with_target(false) + .with_env_filter(filter) + .with_writer(std::io::stdout) + .with_ansi(enable_ansi()) + .with_thread_ids(false); + + let subscriber = builder.finish(); + subscriber.init(); +} + +/// Check if both stdout and stderr are proper terminal (tty), +/// so that we know whether or not to enable colored output, +/// using ANSI escape codes. If either is not, eg. because +/// stdout is redirected to a file, we don't enable colored output. +pub fn enable_ansi() -> bool { + use std::io::IsTerminal; + std::io::stdout().is_terminal() && std::io::stderr().is_terminal() +} + +/// The relayer crates targeted by the default log level. +const TARGET_CRATES: &[&str] = &["malachite"]; + +/// Build a tracing directive setting the log level for the relayer crates to the +/// given `log_level`. +pub fn default_directive(log_level: LogLevel) -> String { + use itertools::Itertools; + + TARGET_CRATES + .iter() + .map(|&c| format!("{c}={log_level}")) + .join(",") +} + +/// Builds a tracing filter based on the input `log_level`. +/// Enables tracing exclusively for the relayer crates. +/// Returns error if the filter failed to build. +fn build_tracing_filter(default_level: LogLevel, debug_sections: &[DebugSection]) -> EnvFilter { + let mut directive = + std::env::var("RUST_LOG").unwrap_or_else(|_| default_directive(default_level)); + + if debug_sections.contains(&DebugSection::Ractor) { + // Enable debug tracing for the `ractor` crate as well + directive.push_str(",ractor=debug"); + } + + // Build the filter directive + match EnvFilter::try_new(&directive) { + Ok(out) => out, + Err(e) => panic!( + "ERROR: unable to initialize Malachite with log filtering directive {directive:?}: {e}" + ), + } +} diff --git a/code/cli/src/main.rs b/code/cli/src/main.rs new file mode 100644 index 000000000..012f7f5ef --- /dev/null +++ b/code/cli/src/main.rs @@ -0,0 +1,75 @@ +use std::time::Duration; + +use clap::Parser; +use logging::DebugSection; +use malachite_actors::node::Msg; +use malachite_actors::util::make_node_actor; +use malachite_test::utils::make_validators; +use malachite_test::ValidatorSet; + +use tracing::info; + +use crate::logging::LogLevel; + +#[derive(clap::Parser)] +pub struct Args { + #[clap( + short, + long, + help = "Index of this node in the validator set (0, 1, or 2)" + )] + pub index: usize, + + #[clap( + short, + long = "debug", + help = "Enable debug output for the given comma-separated sections", + value_enum, + value_delimiter = ',' + )] + debug: Vec, +} + +const VOTING_POWERS: [u64; 3] = [11, 10, 10]; + +mod logging; + +#[tokio::main(flavor = "current_thread")] +pub async fn main() -> Result<(), Box> { + let args = Args::parse(); + let index = args.index; + + logging::init(LogLevel::Debug, &args.debug); + + let vs = make_validators(VOTING_POWERS); + + let (val, sk) = vs[index].clone(); + let (vs, _): (Vec<_>, Vec<_>) = vs.into_iter().unzip(); + let vs = ValidatorSet::new(vs); + + info!("[{index}] Starting..."); + + let (tx_decision, mut rx_decision) = tokio::sync::mpsc::channel(32); + let (actor, handle) = make_node_actor(vs, sk, val.address, tx_decision).await; + + tokio::spawn({ + let actor = actor.clone(); + async move { + tokio::signal::ctrl_c().await.unwrap(); + info!("[{index}] Shutting down..."); + actor.stop(None); + } + }); + + tokio::time::sleep(Duration::from_secs(1)).await; + + actor.cast(Msg::Start)?; + + while let Some((height, round, value)) = rx_decision.recv().await { + info!("[{index}] Decision at height {height} and round {round}: {value:?}",); + } + + handle.await?; + + Ok(()) +} diff --git a/code/common/Cargo.toml b/code/common/Cargo.toml index 35dbbee70..0c01aad8c 100644 --- a/code/common/Cargo.toml +++ b/code/common/Cargo.toml @@ -9,6 +9,9 @@ license.workspace = true publish.workspace = true rust-version.workspace = true +[lints] +workspace = true + [dependencies] -derive-where.workspace = true -signature.workspace = true +derive-where.workspace = true +signature.workspace = true diff --git a/code/common/src/context.rs b/code/common/src/context.rs index a6d6521d0..51e3241d7 100644 --- a/code/common/src/context.rs +++ b/code/common/src/context.rs @@ -1,13 +1,13 @@ use crate::{ - Address, Height, NilOrVal, Proposal, PublicKey, Round, SignedVote, SigningScheme, Validator, - ValidatorSet, Value, ValueId, Vote, + Address, Height, NilOrVal, Proposal, PublicKey, Round, SignedProposal, SignedVote, + SigningScheme, Validator, ValidatorSet, Value, ValueId, Vote, }; /// This trait allows to abstract over the various datatypes /// that are used in the consensus engine. pub trait Context where - Self: Sized, + Self: Sized + Clone + Send + Sync + 'static, { /// The type of address of a validator. type Address: Address; @@ -33,7 +33,7 @@ where /// The signing scheme used to sign votes. type SigningScheme: SigningScheme; - /// Sign the given vote our private key. + /// Sign the given vote with our private key. fn sign_vote(&self, vote: Self::Vote) -> SignedVote; /// Verify the given vote's signature using the given public key. @@ -43,12 +43,23 @@ where public_key: &PublicKey, ) -> bool; + /// Sign the given proposal with our private key. + fn sign_proposal(&self, proposal: Self::Proposal) -> SignedProposal; + + /// Verify the given proposal's signature using the given public key. + fn verify_signed_proposal( + &self, + signed_proposal: &SignedProposal, + public_key: &PublicKey, + ) -> bool; + /// Build a new proposal for the given value at the given height, round and POL round. fn new_proposal( height: Self::Height, round: Round, value: Self::Value, pol_round: Round, + address: Self::Address, ) -> Self::Proposal; /// Build a new prevote vote by the validator with the given address, diff --git a/code/common/src/height.rs b/code/common/src/height.rs index 5f458c123..3eea96d82 100644 --- a/code/common/src/height.rs +++ b/code/common/src/height.rs @@ -1,4 +1,4 @@ -use core::fmt::Debug; +use core::fmt::{Debug, Display}; /// Defines the requirements for a height type. /// @@ -7,6 +7,9 @@ use core::fmt::Debug; /// A height of 0 represents a chain which has not yet produced a block. pub trait Height where - Self: Default + Copy + Clone + Debug + PartialEq + Eq + PartialOrd + Ord, + Self: + Default + Copy + Clone + Debug + Display + PartialEq + Eq + PartialOrd + Ord + Send + Sync, { + /// Increment the height by one. + fn increment(&self) -> Self; } diff --git a/code/common/src/lib.rs b/code/common/src/lib.rs index 3abfccdcd..184f10e5c 100644 --- a/code/common/src/lib.rs +++ b/code/common/src/lib.rs @@ -12,10 +12,13 @@ #![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::panic))] #![cfg_attr(coverage_nightly, feature(coverage_attribute))] +extern crate alloc; + mod context; mod height; mod proposal; mod round; +mod signed_proposal; mod signed_vote; mod signing; mod timeout; @@ -42,6 +45,7 @@ pub use context::Context; pub use height::Height; pub use proposal::Proposal; pub use round::Round; +pub use signed_proposal::SignedProposal; pub use signed_vote::SignedVote; pub use signing::SigningScheme; pub use timeout::{Timeout, TimeoutStep}; diff --git a/code/common/src/proposal.rs b/code/common/src/proposal.rs index 4adcc3b38..c2ba96160 100644 --- a/code/common/src/proposal.rs +++ b/code/common/src/proposal.rs @@ -5,7 +5,7 @@ use crate::{Context, Round}; /// Defines the requirements for a proposal type. pub trait Proposal where - Self: Clone + Debug + PartialEq + Eq, + Self: Clone + Debug + Eq + Send + Sync + 'static, Ctx: Context, { /// The height for which the proposal is for. @@ -19,4 +19,7 @@ where /// The POL round for which the proposal is for. fn pol_round(&self) -> Round; + + /// Address of the validator who issued this proposal + fn validator_address(&self) -> &Ctx::Address; } diff --git a/code/common/src/round.rs b/code/common/src/round.rs index 713c8954d..5001c66de 100644 --- a/code/common/src/round.rs +++ b/code/common/src/round.rs @@ -1,4 +1,4 @@ -use core::cmp; +use core::{cmp, fmt}; /// A round number. /// @@ -72,6 +72,12 @@ impl Ord for Round { } } +impl fmt::Display for Round { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.as_i64().fmt(f) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/code/common/src/signed_proposal.rs b/code/common/src/signed_proposal.rs new file mode 100644 index 000000000..f4b07ef2f --- /dev/null +++ b/code/common/src/signed_proposal.rs @@ -0,0 +1,29 @@ +use derive_where::derive_where; + +use crate::{Context, Signature}; + +/// A signed proposal, ie. a proposal emitted by a validator and signed by its private key. +#[derive_where(Clone, Debug, PartialEq, Eq)] +pub struct SignedProposal +where + Ctx: Context, +{ + /// The proposal. + pub proposal: Ctx::Proposal, + + /// The signature of the proposal. + pub signature: Signature, +} + +impl SignedProposal +where + Ctx: Context, +{ + /// Create a new signed proposal from the given proposal and signature. + pub fn new(proposal: Ctx::Proposal, signature: Signature) -> Self { + Self { + proposal, + signature, + } + } +} diff --git a/code/common/src/signing.rs b/code/common/src/signing.rs index d91caabef..52d56ba6c 100644 --- a/code/common/src/signing.rs +++ b/code/common/src/signing.rs @@ -1,4 +1,5 @@ -use core::fmt::Debug; +use alloc::vec::Vec; +use core::fmt::{Debug, Display}; use signature::{Keypair, Signer, Verifier}; @@ -14,12 +15,25 @@ pub trait SigningScheme where Self: Clone + Debug + Eq, { + /// Errors that can occur when decoding a signature from a byte array. + type DecodingError: Display; + /// The type of signatures produced by this signing scheme. - type Signature: Clone + Debug + Eq; + type Signature: Clone + Debug + Eq + Send + Sync; /// The type of public keys produced by this signing scheme. - type PublicKey: Clone + Debug + Eq + Verifier; + type PublicKey: Clone + Debug + Eq + Send + Sync + Verifier; /// The type of private keys produced by this signing scheme. - type PrivateKey: Clone + Signer + Keypair; + type PrivateKey: Clone + + Send + + Sync + + Signer + + Keypair; + + /// Decode a signature from a byte array. + fn decode_signature(bytes: &[u8]) -> Result; + + /// Encode a signature to a byte array. + fn encode_signature(signature: &Self::Signature) -> Vec; } diff --git a/code/common/src/timeout.rs b/code/common/src/timeout.rs index 30b3b76d6..2b981efe7 100644 --- a/code/common/src/timeout.rs +++ b/code/common/src/timeout.rs @@ -1,20 +1,25 @@ +use core::fmt; + use crate::Round; /// The round step for which the timeout is for. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] pub enum TimeoutStep { - /// The timeout is for the propose step. + /// Timeout for the propose step. Propose, - /// The timeout is for the prevote step. + /// Timeout for the prevote step. Prevote, - /// The timeout is for the precommit step. + /// Timeout for the precommit step. Precommit, + + /// Timeout for the commit step. + Commit, } /// A timeout for a round step. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] pub struct Timeout { /// The round for which the timeout is for. pub round: Round, @@ -25,22 +30,33 @@ pub struct Timeout { impl Timeout { /// Create a new timeout for the given round and step. - pub fn new(round: Round, step: TimeoutStep) -> Self { + pub const fn new(round: Round, step: TimeoutStep) -> Self { Self { round, step } } /// Create a new timeout for the propose step of the given round. - pub fn propose(round: Round) -> Self { + pub const fn propose(round: Round) -> Self { Self::new(round, TimeoutStep::Propose) } /// Create a new timeout for the prevote step of the given round. - pub fn prevote(round: Round) -> Self { + pub const fn prevote(round: Round) -> Self { Self::new(round, TimeoutStep::Prevote) } /// Create a new timeout for the precommit step of the given round. - pub fn precommit(round: Round) -> Self { + pub const fn precommit(round: Round) -> Self { Self::new(round, TimeoutStep::Precommit) } + + /// Create a new timeout for the commit step of the given round. + pub const fn commit(round: Round) -> Self { + Self::new(round, TimeoutStep::Commit) + } +} + +impl fmt::Display for Timeout { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}Timeout({})", self.step, self.round) + } } diff --git a/code/common/src/validator_set.rs b/code/common/src/validator_set.rs index ccc4dd1f6..390600751 100644 --- a/code/common/src/validator_set.rs +++ b/code/common/src/validator_set.rs @@ -10,14 +10,14 @@ pub type VotingPower = u64; /// Defines the requirements for an address. pub trait Address where - Self: Clone + Debug + Display + Eq + Ord, + Self: Clone + Debug + Display + Eq + Ord + Send + Sync, { } /// Defines the requirements for a validator. pub trait Validator where - Self: Clone + Debug + PartialEq + Eq, + Self: Clone + Debug + PartialEq + Eq + Send + Sync, Ctx: Context, { /// The address of the validator, typically derived from its public key. @@ -35,7 +35,7 @@ where /// A validator set is a collection of validators. pub trait ValidatorSet where - Self: Clone + Debug, + Self: Clone + Debug + Send + Sync, Ctx: Context, { /// The total voting power of the validator set. diff --git a/code/common/src/value.rs b/code/common/src/value.rs index a16ac246e..090bd75dc 100644 --- a/code/common/src/value.rs +++ b/code/common/src/value.rs @@ -56,11 +56,11 @@ impl NilOrVal { /// Defines the requirements for the type of value to decide on. pub trait Value where - Self: Clone + Debug + PartialEq + Eq + PartialOrd + Ord, + Self: Clone + Debug + PartialEq + Eq + PartialOrd + Ord + Send + Sync, { /// The type of the ID of the value. /// Typically a representation of the value with a lower memory footprint. - type Id: Clone + Debug + PartialEq + Eq + PartialOrd + Ord; + type Id: Clone + Debug + PartialEq + Eq + PartialOrd + Ord + Send + Sync; /// The ID of the value. fn id(&self) -> Self::Id; diff --git a/code/common/src/vote.rs b/code/common/src/vote.rs index 572beea45..82f9c77a9 100644 --- a/code/common/src/vote.rs +++ b/code/common/src/vote.rs @@ -18,7 +18,7 @@ pub enum VoteType { /// include information about the validator signing it. pub trait Vote where - Self: Clone + Debug + Eq, + Self: Clone + Debug + Eq + Send + Sync + 'static, Ctx: Context, { /// The height for which the vote is for. diff --git a/code/config.toml b/code/config.toml new file mode 100644 index 000000000..475340ff3 --- /dev/null +++ b/code/config.toml @@ -0,0 +1,38 @@ +####################################################################### +### Main Base Config Options ### +####################################################################### + +# A custom human readable name for this node +moniker = "malachite" + +####################################################### +### P2P Configuration Options ### +####################################################### +[p2p] + +# Address to listen for incoming connections +listen_addr = "/ip4/0.0.0.0/udp/0/quic-v1" +# List of nodes to keep persistent connections to +persistent_peers = [] + +####################################################### +### Consensus Configuration Options ### +####################################################### +[consensus] + +# How long we wait for a proposal block before prevoting nil +timeout_propose = "3s" +# How much timeout_propose increases with each round +timeout_propose_delta = "500ms" +# How long we wait after receiving +2/3 prevotes for “anything” (ie. not a single block or nil) +timeout_prevote = "1s" +# How much the timeout_prevote increases with each round +timeout_prevote_delta = "500ms" +# How long we wait after receiving +2/3 precommits for “anything” (ie. not a single block or nil) +timeout_precommit = "1s" +# How much the timeout_precommit increases with each round +timeout_precommit_delta = "500ms" +# How long we wait after committing a block, before starting on the new +# height (this gives us a chance to receive some more precommits, even +# though we already have +2/3). +timeout_commit = "1s" diff --git a/code/driver/Cargo.toml b/code/driver/Cargo.toml index 331370ee3..21634f65a 100644 --- a/code/driver/Cargo.toml +++ b/code/driver/Cargo.toml @@ -9,6 +9,9 @@ license.workspace = true publish.workspace = true rust-version.workspace = true +[lints] +workspace = true + [dependencies] malachite-common = { version = "0.1.0", path = "../common" } malachite-round = { version = "0.1.0", path = "../round" } diff --git a/code/driver/src/driver.rs b/code/driver/src/driver.rs index 059d22621..dfe1087de 100644 --- a/code/driver/src/driver.rs +++ b/code/driver/src/driver.rs @@ -1,4 +1,4 @@ -use alloc::boxed::Box; +use alloc::sync::Arc; use alloc::vec; use alloc::vec::Vec; use core::fmt; @@ -29,11 +29,14 @@ where pub ctx: Ctx, /// The proposer selector. - pub proposer_selector: Box>, + pub proposer_selector: Arc>, /// The address of the node. pub address: Ctx::Address, + /// Quorum thresholds + pub threshold_params: ThresholdParams, + /// The validator set at the current height pub validator_set: Ctx::ValidatorSet, @@ -61,25 +64,43 @@ where pub fn new( ctx: Ctx, height: Ctx::Height, - proposer_selector: impl ProposerSelector + 'static, + proposer_selector: Arc>, validator_set: Ctx::ValidatorSet, address: Ctx::Address, threshold_params: ThresholdParams, ) -> Self { - let votes = VoteKeeper::new(validator_set.total_voting_power(), threshold_params); + let vote_keeper = VoteKeeper::new(validator_set.total_voting_power(), threshold_params); + let round_state = RoundState::new(height, Round::Nil); Self { ctx, - proposer_selector: Box::new(proposer_selector), + proposer_selector, address, + threshold_params, validator_set, - vote_keeper: votes, - round_state: RoundState::new(height, Round::Nil), + vote_keeper, + round_state, proposal: None, pending_input: None, } } + /// Reset votes, round state, pending input and move to new height. + /// TODO: Allow validator set to change + pub fn move_to_height(&mut self, height: Ctx::Height) { + let vote_keeper = VoteKeeper::new( + self.validator_set.total_voting_power(), + self.threshold_params, + ); + + let round_state = RoundState::new(height, Round::Nil); + + self.vote_keeper = vote_keeper; + self.round_state = round_state; + self.proposal = None; + self.pending_input = None; + } + /// Return the height of the consensus. pub fn height(&self) -> Ctx::Height { self.round_state.height @@ -90,11 +111,20 @@ where self.round_state.round } + /// Return a reference to the votekeper + pub fn votes(&self) -> &VoteKeeper { + &self.vote_keeper + } + /// Return the proposer for the current round. - pub fn get_proposer(&self, round: Round) -> Result<&Ctx::Validator, Error> { + pub fn get_proposer( + &self, + height: Ctx::Height, + round: Round, + ) -> Result<&Ctx::Validator, Error> { let address = self .proposer_selector - .select_proposer(round, &self.validator_set); + .select_proposer(height, round, &self.validator_set); let proposer = self .validator_set @@ -192,6 +222,11 @@ where proposal: Ctx::Proposal, validity: Validity, ) -> Result>, Error> { + // Discard proposals from different heights + if self.height() != proposal.height() { + return Ok(None); + } + let round = proposal.round(); match self.multiplex_proposal(proposal, validity) { @@ -201,6 +236,11 @@ where } fn apply_vote(&mut self, vote: Ctx::Vote) -> Result>, Error> { + // Discard votes from different heights + if self.height() != vote.height() { + return Ok(None); + } + let validator = self .validator_set .get_by_address(vote.validator_address()) @@ -226,6 +266,9 @@ where TimeoutStep::Propose => RoundInput::TimeoutPropose, TimeoutStep::Prevote => RoundInput::TimeoutPrevote, TimeoutStep::Precommit => RoundInput::TimeoutPrecommit, + + // The driver never receives a commit timeout, so we can just ignore it. + TimeoutStep::Commit => return Ok(None), }; self.apply_input(timeout.round, input) @@ -240,7 +283,7 @@ where let round_state = core::mem::take(&mut self.round_state); let current_step = round_state.step; - let proposer = self.get_proposer(round_state.round)?; + let proposer = self.get_proposer(round_state.height, round_state.round)?; let info = Info::new(input_round, &self.address, proposer.address()); // Apply the input to the round state machine diff --git a/code/driver/src/input.rs b/code/driver/src/input.rs index 6d25d5248..b8eaeef82 100644 --- a/code/driver/src/input.rs +++ b/code/driver/src/input.rs @@ -1,9 +1,11 @@ use malachite_common::{Context, Round, Timeout}; +use derive_where::derive_where; + use crate::Validity; /// Events that can be received by the [`Driver`](crate::Driver). -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive_where(Clone, Debug, PartialEq, Eq)] pub enum Input where Ctx: Context, diff --git a/code/driver/src/proposer.rs b/code/driver/src/proposer.rs index eb4abbd4a..89f2d35d0 100644 --- a/code/driver/src/proposer.rs +++ b/code/driver/src/proposer.rs @@ -3,6 +3,7 @@ use malachite_common::{Context, Round}; /// Defines how to select a proposer amongst a validator set for a given round. pub trait ProposerSelector where + Self: Send + Sync, Ctx: Context, { /// Select a proposer from the given validator set for the given round. @@ -13,5 +14,10 @@ where /// # Important /// This function must be deterministic! /// For a given round and validator set, it must always return the same proposer. - fn select_proposer(&self, round: Round, validator_set: &Ctx::ValidatorSet) -> Ctx::Address; + fn select_proposer( + &self, + height: Ctx::Height, + round: Round, + validator_set: &Ctx::ValidatorSet, + ) -> Ctx::Address; } diff --git a/code/driver/src/util.rs b/code/driver/src/util.rs index 9428d6f2c..3c413a7b2 100644 --- a/code/driver/src/util.rs +++ b/code/driver/src/util.rs @@ -12,4 +12,13 @@ impl Validity { pub fn is_valid(self) -> bool { self == Validity::Valid } + + /// Returns `Valid` if given true, `Invalid` if given false. + pub fn from_valid(valid: bool) -> Self { + if valid { + Validity::Valid + } else { + Validity::Invalid + } + } } diff --git a/code/gossip/Cargo.toml b/code/gossip/Cargo.toml new file mode 100644 index 000000000..0247835c6 --- /dev/null +++ b/code/gossip/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "malachite-gossip" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true + +[lints] +workspace = true + +[dependencies] +futures = { workspace = true } +libp2p = { workspace = true } +# libp2p-gossipsub = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } diff --git a/code/gossip/src/behaviour.rs b/code/gossip/src/behaviour.rs new file mode 100644 index 000000000..07bc046f8 --- /dev/null +++ b/code/gossip/src/behaviour.rs @@ -0,0 +1,61 @@ +use libp2p::swarm::NetworkBehaviour; +use libp2p::{gossipsub, identify, mdns}; + +pub use libp2p::identity::Keypair; +pub use libp2p::{Multiaddr, PeerId}; + +use crate::PROTOCOL_VERSION; + +#[derive(NetworkBehaviour)] +#[behaviour(to_swarm = "NetworkEvent")] +pub struct Behaviour { + pub identify: identify::Behaviour, + pub mdns: mdns::tokio::Behaviour, + pub gossipsub: gossipsub::Behaviour, +} + +impl Behaviour { + pub fn new(keypair: &Keypair) -> Self { + Self { + identify: identify::Behaviour::new(identify::Config::new( + PROTOCOL_VERSION.to_string(), + keypair.public(), + )), + mdns: mdns::tokio::Behaviour::new( + mdns::Config::default(), + keypair.public().to_peer_id(), + ) + .unwrap(), + gossipsub: gossipsub::Behaviour::new( + gossipsub::MessageAuthenticity::Signed(keypair.clone()), + gossipsub::Config::default(), + ) + .unwrap(), + } + } +} + +#[derive(Debug)] +pub enum NetworkEvent { + Identify(identify::Event), + Mdns(mdns::Event), + GossipSub(gossipsub::Event), +} + +impl From for NetworkEvent { + fn from(event: identify::Event) -> Self { + Self::Identify(event) + } +} + +impl From for NetworkEvent { + fn from(event: mdns::Event) -> Self { + Self::Mdns(event) + } +} + +impl From for NetworkEvent { + fn from(event: gossipsub::Event) -> Self { + Self::GossipSub(event) + } +} diff --git a/code/gossip/src/handle.rs b/code/gossip/src/handle.rs new file mode 100644 index 000000000..996d4852f --- /dev/null +++ b/code/gossip/src/handle.rs @@ -0,0 +1,87 @@ +use tokio::sync::mpsc; +use tokio::task; + +use crate::{BoxError, Channel, CtrlMsg, Event}; + +pub struct RecvHandle { + rx_event: mpsc::Receiver, +} + +impl RecvHandle { + pub async fn recv(&mut self) -> Option { + self.rx_event.recv().await + } +} + +pub struct CtrlHandle { + tx_ctrl: mpsc::Sender, + task_handle: task::JoinHandle<()>, +} + +impl CtrlHandle { + pub async fn broadcast(&self, channel: Channel, data: Vec) -> Result<(), BoxError> { + self.tx_ctrl.send(CtrlMsg::Broadcast(channel, data)).await?; + Ok(()) + } + + pub async fn wait_shutdown(self) -> Result<(), BoxError> { + self.shutdown().await?; + self.join().await?; + Ok(()) + } + + pub async fn shutdown(&self) -> Result<(), BoxError> { + self.tx_ctrl.send(CtrlMsg::Shutdown).await?; + Ok(()) + } + + pub async fn join(self) -> Result<(), BoxError> { + self.task_handle.await?; + Ok(()) + } +} + +pub struct Handle { + recv: RecvHandle, + ctrl: CtrlHandle, +} + +impl Handle { + pub fn new( + tx_ctrl: mpsc::Sender, + rx_event: mpsc::Receiver, + task_handle: task::JoinHandle<()>, + ) -> Handle { + Self { + recv: RecvHandle { rx_event }, + ctrl: CtrlHandle { + tx_ctrl, + task_handle, + }, + } + } + + pub fn split(self) -> (RecvHandle, CtrlHandle) { + (self.recv, self.ctrl) + } + + pub async fn recv(&mut self) -> Option { + self.recv.recv().await + } + + pub async fn broadcast(&self, channel: Channel, data: Vec) -> Result<(), BoxError> { + self.ctrl.broadcast(channel, data).await + } + + pub async fn wait_shutdown(self) -> Result<(), BoxError> { + self.ctrl.wait_shutdown().await + } + + pub async fn shutdown(&self) -> Result<(), BoxError> { + self.ctrl.shutdown().await + } + + pub async fn join(self) -> Result<(), BoxError> { + self.ctrl.join().await + } +} diff --git a/code/gossip/src/lib.rs b/code/gossip/src/lib.rs new file mode 100644 index 000000000..ed8ee427d --- /dev/null +++ b/code/gossip/src/lib.rs @@ -0,0 +1,280 @@ +use core::fmt; +use std::error::Error; +use std::ops::ControlFlow; +use std::time::Duration; + +use futures::StreamExt; +use libp2p::swarm::{self, SwarmEvent}; +use libp2p::{gossipsub, identify, mdns, SwarmBuilder}; +use tokio::sync::mpsc; +use tracing::{debug, error, error_span, Instrument}; + +pub use libp2p::identity::Keypair; +pub use libp2p::{Multiaddr, PeerId}; + +pub mod behaviour; +pub mod handle; +use behaviour::{Behaviour, NetworkEvent}; +use handle::Handle; + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum Channel { + Consensus, +} + +impl Channel { + pub fn all() -> &'static [Channel] { + &[Channel::Consensus] + } + + pub fn to_topic(self) -> gossipsub::IdentTopic { + gossipsub::IdentTopic::new(self.as_str()) + } + + pub fn topic_hash(&self) -> gossipsub::TopicHash { + self.to_topic().hash() + } + + pub fn as_str(&self) -> &'static str { + match self { + Channel::Consensus => "/consensus", + } + } + + pub fn has_topic(topic_hash: &gossipsub::TopicHash) -> bool { + Self::all() + .iter() + .any(|channel| &channel.topic_hash() == topic_hash) + } + + pub fn from_topic_hash(topic: &gossipsub::TopicHash) -> Option { + match topic.as_str() { + "/consensus" => Some(Channel::Consensus), + _ => None, + } + } +} + +impl fmt::Display for Channel { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.as_str().fmt(f) + } +} + +const PROTOCOL_VERSION: &str = "malachite-gossip/v1beta1"; + +pub type BoxError = Box; + +#[derive(Clone, Debug)] +pub struct Config { + idle_connection_timeout: Duration, +} + +impl Config { + fn apply(self, cfg: swarm::Config) -> swarm::Config { + cfg.with_idle_connection_timeout(self.idle_connection_timeout) + } +} + +impl Default for Config { + fn default() -> Self { + Self { + idle_connection_timeout: Duration::from_secs(30), + } + } +} + +#[derive(Debug)] +pub enum Event { + Listening(Multiaddr), + Message(PeerId, Channel, Vec), + PeerConnected(PeerId), + PeerDisconnected(PeerId), +} + +#[derive(Debug)] +pub enum CtrlMsg { + Broadcast(Channel, Vec), + Shutdown, +} + +pub async fn spawn(keypair: Keypair, addr: Multiaddr, config: Config) -> Result { + let mut swarm = SwarmBuilder::with_existing_identity(keypair) + .with_tokio() + .with_quic() + .with_behaviour(Behaviour::new)? + .with_swarm_config(|cfg| config.apply(cfg)) + .build(); + + for channel in Channel::all() { + swarm + .behaviour_mut() + .gossipsub + .subscribe(&channel.to_topic())?; + } + + swarm.listen_on(addr)?; + + let (tx_event, rx_event) = mpsc::channel(32); + let (tx_ctrl, rx_ctrl) = mpsc::channel(32); + + let peer_id = swarm.local_peer_id(); + let span = error_span!("gossip", peer = %peer_id); + let task_handle = tokio::task::spawn(run(swarm, rx_ctrl, tx_event).instrument(span)); + + Ok(Handle::new(tx_ctrl, rx_event, task_handle)) +} + +async fn run( + mut swarm: swarm::Swarm, + mut rx_ctrl: mpsc::Receiver, + tx_event: mpsc::Sender, +) { + loop { + let result = tokio::select! { + event = swarm.select_next_some() => { + handle_swarm_event(event, &mut swarm, &tx_event).await + } + + Some(ctrl) = rx_ctrl.recv() => { + handle_ctrl_msg(ctrl, &mut swarm).await + } + }; + + match result { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(()) => break, + } + } +} + +async fn handle_ctrl_msg(msg: CtrlMsg, swarm: &mut swarm::Swarm) -> ControlFlow<()> { + match msg { + CtrlMsg::Broadcast(channel, data) => { + let msg_size = data.len(); + + let result = swarm + .behaviour_mut() + .gossipsub + .publish(channel.topic_hash(), data); + + match result { + Ok(message_id) => { + debug!("Broadcasted message {message_id} of {msg_size} bytes"); + } + Err(e) => { + error!("Error broadcasting message: {e}"); + } + } + + ControlFlow::Continue(()) + } + + CtrlMsg::Shutdown => ControlFlow::Break(()), + } +} + +async fn handle_swarm_event( + event: SwarmEvent, + swarm: &mut swarm::Swarm, + tx_event: &mpsc::Sender, +) -> ControlFlow<()> { + match event { + SwarmEvent::NewListenAddr { address, .. } => { + debug!("Node is listening on {address}"); + + if let Err(e) = tx_event.send(Event::Listening(address)).await { + error!("Error sending listening event to handle: {e}"); + return ControlFlow::Break(()); + } + } + + SwarmEvent::Behaviour(NetworkEvent::Identify(identify::Event::Sent { peer_id })) => { + debug!("Sent identity to {peer_id}"); + } + + SwarmEvent::Behaviour(NetworkEvent::Identify(identify::Event::Received { + peer_id, + info: _, + })) => { + debug!("Received identity from {peer_id}"); + } + + SwarmEvent::Behaviour(NetworkEvent::Mdns(mdns::Event::Discovered(peers))) => { + for (peer_id, addr) in peers { + debug!("Discovered peer {peer_id} at {addr}"); + swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); + + // if let Err(e) = tx_event.send(HandleEvent::PeerConnected(peer_id)).await { + // error!("Error sending peer connected event to handle: {e}"); + // return ControlFlow::Break(()); + // } + } + } + + SwarmEvent::Behaviour(NetworkEvent::Mdns(mdns::Event::Expired(peers))) => { + for (peer_id, _addr) in peers { + debug!("Expired peer: {peer_id}"); + swarm + .behaviour_mut() + .gossipsub + .remove_explicit_peer(&peer_id); + + // if let Err(e) = tx_event.send(HandleEvent::PeerDisconnected(peer_id)).await { + // error!("Error sending peer disconnected event to handle: {e}"); + // return ControlFlow::Break(()); + // } + } + } + + SwarmEvent::Behaviour(NetworkEvent::GossipSub(gossipsub::Event::Subscribed { + peer_id, + topic: topic_hash, + })) => { + if !Channel::has_topic(&topic_hash) { + debug!("Peer {peer_id} tried to subscribe to unknown topic: {topic_hash}"); + return ControlFlow::Continue(()); + } + + debug!("Peer {peer_id} subscribed to {topic_hash}"); + + if let Err(e) = tx_event.send(Event::PeerConnected(peer_id)).await { + error!("Error sending peer connected event to handle: {e}"); + return ControlFlow::Break(()); + } + } + + SwarmEvent::Behaviour(NetworkEvent::GossipSub(gossipsub::Event::Message { + propagation_source: peer_id, + message_id, + message, + })) => { + let Some(channel) = Channel::from_topic_hash(&message.topic) else { + debug!( + "Received message {message_id} from {peer_id} on different channel: {}", + message.topic + ); + + return ControlFlow::Continue(()); + }; + + debug!( + "Received message {message_id} from {peer_id} on channel {} of {} bytes", + channel, + message.data.len() + ); + + if let Err(e) = tx_event + .send(Event::Message(peer_id, channel, message.data)) + .await + { + error!("Error sending message to handle: {e}"); + return ControlFlow::Break(()); + } + } + + _ => {} + } + + ControlFlow::Continue(()) +} diff --git a/code/itf/tests/consensus/runner.rs b/code/itf/tests/consensus/runner.rs index f45663d50..ac49d3a71 100644 --- a/code/itf/tests/consensus/runner.rs +++ b/code/itf/tests/consensus/runner.rs @@ -81,6 +81,7 @@ impl ItfRunner for ConsensusRunner { input_round, value_from_model(value).unwrap(), Round::Nil, + *some_other_node, ); (data, Input::Proposal(proposal)) } @@ -92,6 +93,7 @@ impl ItfRunner for ConsensusRunner { actual.round, value_from_model(value).unwrap(), Round::new(*valid_round), + *some_other_node, ); (data, Input::ProposalAndPolkaPrevious(proposal)) } @@ -103,6 +105,7 @@ impl ItfRunner for ConsensusRunner { actual.round, value_from_model(value).unwrap(), Round::Nil, + *some_other_node, ); (data, Input::ProposalAndPolkaCurrent(proposal)) } @@ -115,6 +118,7 @@ impl ItfRunner for ConsensusRunner { input_round, value_from_model(value).unwrap(), Round::Nil, + *some_other_node, ); (data, Input::InvalidProposalAndPolkaPrevious(proposal)) } @@ -126,6 +130,7 @@ impl ItfRunner for ConsensusRunner { actual.round, value_from_model(value).unwrap(), Round::Nil, + *some_other_node, ); (data, Input::ProposalAndPrecommitValue(proposal)) } diff --git a/code/network/Cargo.toml b/code/network/Cargo.toml new file mode 100644 index 000000000..e0853a873 --- /dev/null +++ b/code/network/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "malachite-network" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true + +[lints] +workspace = true + +[dependencies] +malachite-proto.workspace = true + +prost = { workspace = true } +prost-types = { workspace = true } +serde = { workspace = true, features = ["derive"] } diff --git a/code/network/src/lib.rs b/code/network/src/lib.rs new file mode 100644 index 000000000..a14a9cc71 --- /dev/null +++ b/code/network/src/lib.rs @@ -0,0 +1,7 @@ +#![cfg_attr(coverage_nightly, feature(coverage_attribute))] + +mod msg; +mod peer_id; + +pub use msg::Msg; +pub use peer_id::PeerId; diff --git a/code/network/src/msg.rs b/code/network/src/msg.rs new file mode 100644 index 000000000..7a016ba01 --- /dev/null +++ b/code/network/src/msg.rs @@ -0,0 +1,53 @@ +use prost::{Message, Name}; +use prost_types::Any; + +use malachite_proto::Error as ProtoError; +use malachite_proto::Protobuf; +use malachite_proto::{SignedProposal, SignedVote}; + +#[derive(Clone, Debug, PartialEq)] +pub enum Msg { + Vote(SignedVote), + Proposal(SignedProposal), +} + +impl Msg { + pub fn from_network_bytes(bytes: &[u8]) -> Result { + Protobuf::from_bytes(bytes) + } + + pub fn to_network_bytes(&self) -> Result, ProtoError> { + Protobuf::to_bytes(self) + } +} + +impl Protobuf for Msg { + type Proto = Any; + + fn from_proto(proto: Self::Proto) -> Result { + if proto.type_url == SignedVote::type_url() { + let vote = SignedVote::decode(proto.value.as_slice())?; + Ok(Msg::Vote(vote)) + } else if proto.type_url == SignedProposal::type_url() { + let proposal = SignedProposal::decode(proto.value.as_slice())?; + Ok(Msg::Proposal(proposal)) + } else { + Err(ProtoError::UnknownMessageType { + type_url: proto.type_url, + }) + } + } + + fn to_proto(&self) -> Result { + Ok(match self { + Msg::Vote(vote) => Any { + type_url: SignedVote::type_url(), + value: vote.encode_to_vec(), + }, + Msg::Proposal(proposal) => Any { + type_url: SignedProposal::type_url(), + value: proposal.encode_to_vec(), + }, + }) + } +} diff --git a/code/network/src/peer_id.rs b/code/network/src/peer_id.rs new file mode 100644 index 000000000..efa14e8e2 --- /dev/null +++ b/code/network/src/peer_id.rs @@ -0,0 +1,33 @@ +use core::fmt; +use std::convert::Infallible; +use std::str::FromStr; + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct PeerId(String); + +impl PeerId { + pub fn new(id: impl ToString) -> Self { + Self(id.to_string()) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for PeerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.0, f) + } +} + +impl FromStr for PeerId { + type Err = Infallible; + + fn from_str(s: &str) -> Result { + Ok(Self(s.to_string())) + } +} diff --git a/code/node/Cargo.toml b/code/node/Cargo.toml new file mode 100644 index 000000000..c6052367f --- /dev/null +++ b/code/node/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "malachite-node" +description = "Node for running the Malachite consensus engine" + +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true + +[lints] +workspace = true + +[dependencies] +malachite-common.workspace = true +malachite-network.workspace = true +malachite-test.workspace = true + +async-trait = { workspace = true } +derive-where = { workspace = true } +ed25519-consensus = { workspace = true, features = ["serde"] } +humantime-serde = { workspace = true } +multiaddr = { workspace = true } +tokio = { workspace = true, features = ["full"] } +serde = { workspace = true, features = ["derive"] } diff --git a/code/node/src/config.rs b/code/node/src/config.rs new file mode 100644 index 000000000..3f07560d9 --- /dev/null +++ b/code/node/src/config.rs @@ -0,0 +1,99 @@ +use std::time::Duration; + +use malachite_common::TimeoutStep; +use multiaddr::Multiaddr; +use serde::{Deserialize, Serialize}; + +/// Malachite configuration options +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Config { + /// A custom human readable name for this node + pub moniker: String, + /// P2P configuration options + pub p2p: P2pConfig, + /// Consensus configuration options + pub consensus: ConsensusConfig, +} + +/// P2P configuration options +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct P2pConfig { + // Address to listen for incoming connections + pub listen_addr: Multiaddr, + /// List of nodes to keep persistent connections to + pub persistent_peers: Vec, +} + +/// Consensus configuration options +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ConsensusConfig { + #[serde(flatten)] + pub timeouts: TimeoutConfig, +} + +#[derive(Copy, Clone, Debug, Serialize, Deserialize)] +pub struct TimeoutConfig { + /// How long we wait for a proposal block before prevoting nil + #[serde(with = "humantime_serde")] + pub timeout_propose: Duration, + + /// How much timeout_propose increases with each round + #[serde(with = "humantime_serde")] + pub timeout_propose_delta: Duration, + + /// How long we wait after receiving +2/3 prevotes for “anything” (ie. not a single block or nil) + #[serde(with = "humantime_serde")] + pub timeout_prevote: Duration, + + /// How much the timeout_prevote increases with each round + #[serde(with = "humantime_serde")] + pub timeout_prevote_delta: Duration, + + /// How long we wait after receiving +2/3 precommits for “anything” (ie. not a single block or nil) + #[serde(with = "humantime_serde")] + pub timeout_precommit: Duration, + + /// How much the timeout_precommit increases with each round + #[serde(with = "humantime_serde")] + pub timeout_precommit_delta: Duration, + + /// How long we wait after committing a block, before starting on the new + /// height (this gives us a chance to receive some more precommits, even + /// though we already have +2/3). + #[serde(with = "humantime_serde")] + pub timeout_commit: Duration, +} + +impl TimeoutConfig { + pub fn timeout_duration(&self, step: TimeoutStep) -> Duration { + match step { + TimeoutStep::Propose => self.timeout_propose, + TimeoutStep::Prevote => self.timeout_prevote, + TimeoutStep::Precommit => self.timeout_precommit, + TimeoutStep::Commit => self.timeout_commit, + } + } + + pub fn delta_duration(&self, step: TimeoutStep) -> Option { + match step { + TimeoutStep::Propose => Some(self.timeout_propose_delta), + TimeoutStep::Prevote => Some(self.timeout_prevote_delta), + TimeoutStep::Precommit => Some(self.timeout_precommit_delta), + TimeoutStep::Commit => None, + } + } +} + +impl Default for TimeoutConfig { + fn default() -> Self { + Self { + timeout_propose: Duration::from_secs(3), + timeout_propose_delta: Duration::from_millis(500), + timeout_prevote: Duration::from_secs(1), + timeout_prevote_delta: Duration::from_millis(500), + timeout_precommit: Duration::from_secs(1), + timeout_precommit_delta: Duration::from_millis(500), + timeout_commit: Duration::from_secs(1), + } + } +} diff --git a/code/node/src/lib.rs b/code/node/src/lib.rs new file mode 100644 index 000000000..3a9415ca4 --- /dev/null +++ b/code/node/src/lib.rs @@ -0,0 +1,4 @@ +#![cfg_attr(coverage_nightly, feature(coverage_attribute))] + +pub mod config; +pub mod value_builder; diff --git a/code/node/src/value_builder.rs b/code/node/src/value_builder.rs new file mode 100644 index 000000000..03db09995 --- /dev/null +++ b/code/node/src/value_builder.rs @@ -0,0 +1,35 @@ +use std::marker::PhantomData; +use std::time::Instant; + +use async_trait::async_trait; +use derive_where::derive_where; + +use malachite_common::Context; + +#[async_trait] +pub trait ValueBuilder: Send + Sync + 'static { + async fn build_proposal(&self, height: Ctx::Height, deadline: Instant) -> Option; +} + +pub mod test { + use super::*; + + use malachite_test::{Height, TestContext, Value}; + + #[derive_where(Default)] + pub struct TestValueBuilder { + _phantom: PhantomData, + } + + #[async_trait] + impl ValueBuilder for TestValueBuilder { + async fn build_proposal(&self, height: Height, deadline: Instant) -> Option { + let diff = deadline.duration_since(Instant::now()); + let wait = diff / 2; + + tokio::time::sleep(wait).await; + + Some(Value::new(40 + height.as_u64())) + } + } +} diff --git a/code/proto/Cargo.toml b/code/proto/Cargo.toml new file mode 100644 index 000000000..8105ae134 --- /dev/null +++ b/code/proto/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "malachite-proto" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true + +[dependencies] +malachite-common.workspace = true + +prost.workspace = true +prost-types.workspace = true +thiserror.workspace = true + +[build-dependencies] +prost-build.workspace = true diff --git a/code/proto/build.rs b/code/proto/build.rs new file mode 100644 index 000000000..c02bf2f34 --- /dev/null +++ b/code/proto/build.rs @@ -0,0 +1,9 @@ +use std::io::Result; + +fn main() -> Result<()> { + let mut config = prost_build::Config::new(); + config.enable_type_names(); + config.compile_protos(&["src/malachite.proto"], &["src/"])?; + + Ok(()) +} diff --git a/code/proto/src/impls.rs b/code/proto/src/impls.rs new file mode 100644 index 000000000..b0a9061b5 --- /dev/null +++ b/code/proto/src/impls.rs @@ -0,0 +1,87 @@ +use malachite_common::{Context, Round, SignedProposal, SignedVote, SigningScheme, VoteType}; + +use crate::{self as proto, Error, Protobuf}; + +impl Protobuf for Round { + type Proto = proto::Round; + + fn from_proto(proto: Self::Proto) -> Result { + Ok(Round::new(proto.round)) + } + + fn to_proto(&self) -> Result { + Ok(proto::Round { + round: self.as_i64(), + }) + } +} + +impl Protobuf for SignedVote +where + Ctx::Vote: Protobuf, +{ + type Proto = proto::SignedVote; + + fn from_proto(proto: Self::Proto) -> Result { + let vote = proto + .vote + .ok_or_else(|| Error::missing_field::("vote"))?; + + Ok(Self { + vote: Ctx::Vote::from_proto(vote)?, + signature: Ctx::SigningScheme::decode_signature(&proto.signature) + .map_err(|e| Error::Other(format!("Failed to decode signature: {e}")))?, + }) + } + + fn to_proto(&self) -> Result { + Ok(proto::SignedVote { + vote: Some(self.vote.to_proto()?), + signature: Ctx::SigningScheme::encode_signature(&self.signature), + }) + } +} + +impl From for VoteType { + fn from(vote_type: proto::VoteType) -> Self { + match vote_type { + proto::VoteType::Prevote => VoteType::Prevote, + proto::VoteType::Precommit => VoteType::Precommit, + } + } +} + +impl From for proto::VoteType { + fn from(vote_type: VoteType) -> proto::VoteType { + match vote_type { + VoteType::Prevote => proto::VoteType::Prevote, + VoteType::Precommit => proto::VoteType::Precommit, + } + } +} + +impl Protobuf for SignedProposal +where + Ctx::Proposal: Protobuf, +{ + type Proto = proto::SignedProposal; + + fn from_proto(proto: Self::Proto) -> Result { + let proposal = proto + .proposal + .ok_or_else(|| Error::Other("Missing field `proposal`".to_string()))?; + + Ok(Self { + proposal: Ctx::Proposal::from_proto(proposal)?, + signature: Ctx::SigningScheme::decode_signature(&proto.signature) + .map_err(|e| Error::Other(format!("Failed to decode signature: {e}")))?, + }) + } + + fn to_proto(&self) -> Result { + Ok(proto::SignedProposal { + proposal: Some(self.proposal.to_proto()?), + signature: Ctx::SigningScheme::encode_signature(&self.signature), + }) + } +} diff --git a/code/proto/src/lib.rs b/code/proto/src/lib.rs new file mode 100644 index 000000000..7100a2db8 --- /dev/null +++ b/code/proto/src/lib.rs @@ -0,0 +1,68 @@ +use std::convert::Infallible; + +use thiserror::Error; + +use prost::{DecodeError, EncodeError, Message}; + +include!(concat!(env!("OUT_DIR"), "/malachite.rs")); + +mod impls; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Failed to decode Protobuf message")] + Decode(#[from] DecodeError), + + #[error("Failed to encode Protobuf message")] + Encode(#[from] EncodeError), + + #[error("Unable to decode Protobuf message `{type_url}`: missing field `{field}`")] + MissingField { + type_url: String, + field: &'static str, + }, + + #[error("Unknown message type: `{type_url}`")] + UnknownMessageType { type_url: String }, + + #[error("{0}")] + Other(String), +} + +impl Error { + pub fn missing_field(field: &'static str) -> Self { + let type_url = N::full_name(); + Self::MissingField { type_url, field } + } +} + +impl From for Error { + fn from(s: String) -> Self { + Self::Other(s) + } +} + +impl From for Error { + fn from(_: Infallible) -> Self { + unreachable!() + } +} + +pub trait Protobuf: Sized { + type Proto: Message + Default; + + fn from_proto(proto: Self::Proto) -> Result; + + fn to_proto(&self) -> Result; + + fn from_bytes(bytes: &[u8]) -> Result { + let proto = Self::Proto::decode(bytes)?; + let result = Self::from_proto(proto)?; + Ok(result) + } + + fn to_bytes(&self) -> Result, Error> { + let proto = self.to_proto()?; + Ok(proto.encode_to_vec()) + } +} diff --git a/code/proto/src/malachite.proto b/code/proto/src/malachite.proto new file mode 100644 index 000000000..8553e9c27 --- /dev/null +++ b/code/proto/src/malachite.proto @@ -0,0 +1,55 @@ +syntax = "proto3"; + +package malachite; + +message Height { + uint64 value = 1; +} + +message Address { + bytes value = 1; +} + +message Value { + optional bytes value = 2; +} + +message ValueId { + optional bytes value = 1; +} + +message Round { + int64 round = 1; +} + +enum VoteType { + PREVOTE = 0; + PRECOMMIT = 1; +} + +message Vote { + VoteType vote_type = 1; + Height height = 2; + Round round = 3; + ValueId value = 4; + Address validator_address = 5; +} + +message SignedVote { + Vote vote = 1; + bytes signature = 2; +} + +message Proposal { + Height height = 1; + Round round = 2; + Value value = 3; + Round pol_round = 4; + Address validator_address = 5; +} + +message SignedProposal { + Proposal proposal = 1; + bytes signature = 2; +} + diff --git a/code/round/src/output.rs b/code/round/src/output.rs index 46ff981ed..c90972476 100644 --- a/code/round/src/output.rs +++ b/code/round/src/output.rs @@ -39,8 +39,9 @@ impl Output { round: Round, value: Ctx::Value, pol_round: Round, + address: Ctx::Address, ) -> Self { - Output::Proposal(Ctx::new_proposal(height, round, value, pol_round)) + Output::Proposal(Ctx::new_proposal(height, round, value, pol_round, address)) } /// Build a `Vote` output for a prevote. diff --git a/code/round/src/state_machine.rs b/code/round/src/state_machine.rs index 72426f69a..0ea33aba6 100644 --- a/code/round/src/state_machine.rs +++ b/code/round/src/state_machine.rs @@ -84,7 +84,7 @@ where state.round = round; // We are the proposer - propose_valid_or_get_value(state) + propose_valid_or_get_value(state, info.address) } // L11/L20 @@ -104,7 +104,7 @@ where (Step::Propose, Input::ProposeValue(value)) if this_round => { debug_assert!(info.is_proposer()); - propose(state, value) + propose(state, value, info.address) } // L22 with valid proposal @@ -227,7 +227,7 @@ where /// and ask for a value. /// /// Ref: L15-L18 -pub fn propose_valid_or_get_value(state: State) -> Transition +pub fn propose_valid_or_get_value(state: State, address: &Ctx::Address) -> Transition where Ctx: Context, { @@ -239,7 +239,9 @@ where state.round, round_value.value.clone(), pol_round, + address.clone(), ); + Transition::to(state.with_step(Step::Propose)).with_output(proposal) } None => { @@ -257,11 +259,18 @@ where /// otherwise propose the given value. /// /// Ref: L11/L14 -pub fn propose(state: State, value: Ctx::Value) -> Transition +pub fn propose(state: State, value: Ctx::Value, address: &Ctx::Address) -> Transition where Ctx: Context, { - let proposal = Output::proposal(state.height, state.round, value, Round::Nil); + let proposal = Output::proposal( + state.height, + state.round, + value, + Round::Nil, + address.clone(), + ); + Transition::to(state.with_step(Step::Propose)).with_output(proposal) } diff --git a/code/test/Cargo.toml b/code/test/Cargo.toml index 88acea0e5..7558bc3ff 100644 --- a/code/test/Cargo.toml +++ b/code/test/Cargo.toml @@ -10,13 +10,13 @@ license.workspace = true rust-version.workspace = true [dependencies] -malachite-common = { version = "0.1.0", path = "../common" } -malachite-driver = { version = "0.1.0", path = "../driver" } -malachite-round = { version = "0.1.0", path = "../round" } -malachite-vote = { version = "0.1.0", path = "../vote" } - -futures = { workspace = true, features = ["executor"] } +malachite-common.workspace = true +malachite-driver.workspace = true +malachite-round.workspace = true +malachite-vote.workspace = true +malachite-proto.workspace = true +futures = { workspace = true, features = ["executor"] } ed25519-consensus.workspace = true signature.workspace = true rand.workspace = true diff --git a/code/test/src/address.rs b/code/test/src/address.rs new file mode 100644 index 000000000..4103003c6 --- /dev/null +++ b/code/test/src/address.rs @@ -0,0 +1,68 @@ +use core::fmt; + +use malachite_proto as proto; + +use crate::signing::PublicKey; + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct Address([u8; Self::LENGTH]); + +impl Address { + const LENGTH: usize = 20; + + #[cfg_attr(coverage_nightly, coverage(off))] + pub const fn new(value: [u8; Self::LENGTH]) -> Self { + Self(value) + } + + #[cfg_attr(coverage_nightly, coverage(off))] + pub fn from_public_key(public_key: &PublicKey) -> Self { + let hash = public_key.hash(); + let mut address = [0; Self::LENGTH]; + address.copy_from_slice(&hash[..Self::LENGTH]); + Self(address) + } +} + +impl fmt::Display for Address { + #[cfg_attr(coverage_nightly, coverage(off))] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for byte in self.0.iter() { + write!(f, "{:02x}", byte)?; + } + Ok(()) + } +} + +impl fmt::Debug for Address { + #[cfg_attr(coverage_nightly, coverage(off))] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Address({})", self) + } +} + +impl malachite_common::Address for Address {} + +impl malachite_proto::Protobuf for Address { + type Proto = proto::Address; + + fn from_proto(proto: Self::Proto) -> Result { + if proto.value.len() != Self::LENGTH { + return Err(proto::Error::Other(format!( + "Invalid address length: expected {}, got {}", + Self::LENGTH, + proto.value.len() + ))); + } + + let mut address = [0; Self::LENGTH]; + address.copy_from_slice(&proto.value); + Ok(Self(address)) + } + + fn to_proto(&self) -> Result { + Ok(proto::Address { + value: self.0.to_vec(), + }) + } +} diff --git a/code/test/src/context.rs b/code/test/src/context.rs index 9bfbdca95..28a57af72 100644 --- a/code/test/src/context.rs +++ b/code/test/src/context.rs @@ -1,23 +1,29 @@ +use std::sync::Arc; + use malachite_common::Context; use malachite_common::NilOrVal; use malachite_common::Round; +use malachite_common::SignedProposal; use malachite_common::SignedVote; +use crate::address::*; use crate::height::*; use crate::proposal::*; -use crate::signing::{Ed25519, PrivateKey, PublicKey}; +use crate::signing::*; use crate::validator_set::*; use crate::value::*; use crate::vote::*; #[derive(Clone, Debug)] pub struct TestContext { - private_key: PrivateKey, + private_key: Arc, } impl TestContext { pub fn new(private_key: PrivateKey) -> Self { - Self { private_key } + Self { + private_key: Arc::new(private_key), + } } } @@ -44,8 +50,34 @@ impl Context for TestContext { .is_ok() } - fn new_proposal(height: Height, round: Round, value: Value, pol_round: Round) -> Proposal { - Proposal::new(height, round, value, pol_round) + fn sign_proposal(&self, proposal: Self::Proposal) -> SignedProposal { + use signature::Signer; + let signature = self.private_key.sign(&proposal.to_bytes()); + SignedProposal::new(proposal, signature) + } + + fn verify_signed_proposal( + &self, + signed_proposal: &SignedProposal, + public_key: &PublicKey, + ) -> bool { + use signature::Verifier; + public_key + .verify( + &signed_proposal.proposal.to_bytes(), + &signed_proposal.signature, + ) + .is_ok() + } + + fn new_proposal( + height: Height, + round: Round, + value: Value, + pol_round: Round, + address: Address, + ) -> Proposal { + Proposal::new(height, round, value, pol_round, address) } fn new_prevote( diff --git a/code/test/src/height.rs b/code/test/src/height.rs index 3f4a32c3c..e5720756a 100644 --- a/code/test/src/height.rs +++ b/code/test/src/height.rs @@ -1,15 +1,47 @@ +use core::fmt; + +use malachite_proto as proto; + /// A blockchain height -#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] pub struct Height(u64); impl Height { - pub fn new(height: u64) -> Self { + pub const fn new(height: u64) -> Self { Self(height) } - pub fn as_u64(&self) -> u64 { + pub const fn as_u64(&self) -> u64 { self.0 } } -impl malachite_common::Height for Height {} +impl fmt::Display for Height { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl fmt::Debug for Height { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Height({})", self.0) + } +} + +impl malachite_common::Height for Height { + fn increment(&self) -> Self { + Self(self.0 + 1) + } +} + +impl proto::Protobuf for Height { + type Proto = proto::Height; + + fn from_proto(proto: Self::Proto) -> Result { + Ok(Self(proto.value)) + } + + fn to_proto(&self) -> Result { + Ok(proto::Height { value: self.0 }) + } +} diff --git a/code/test/src/lib.rs b/code/test/src/lib.rs index 6c24bcd72..869cfe276 100644 --- a/code/test/src/lib.rs +++ b/code/test/src/lib.rs @@ -2,6 +2,7 @@ #![deny(trivial_casts, trivial_numeric_casts)] #![cfg_attr(coverage_nightly, feature(coverage_attribute))] +mod address; mod context; mod height; mod proposal; @@ -12,6 +13,7 @@ mod vote; pub mod utils; +pub use crate::address::*; pub use crate::context::*; pub use crate::height::*; pub use crate::proposal::*; diff --git a/code/test/src/proposal.rs b/code/test/src/proposal.rs index 407b4be02..ceeff158a 100644 --- a/code/test/src/proposal.rs +++ b/code/test/src/proposal.rs @@ -1,6 +1,7 @@ use malachite_common::Round; +use malachite_proto::{self as proto}; -use crate::{Height, TestContext, Value}; +use crate::{Address, Height, TestContext, Value}; /// A proposal for a value in a round #[derive(Clone, Debug, PartialEq, Eq)] @@ -9,17 +10,29 @@ pub struct Proposal { pub round: Round, pub value: Value, pub pol_round: Round, + pub validator_address: Address, } impl Proposal { - pub fn new(height: Height, round: Round, value: Value, pol_round: Round) -> Self { + pub fn new( + height: Height, + round: Round, + value: Value, + pol_round: Round, + validator_address: Address, + ) -> Self { Self { height, round, value, pol_round, + validator_address, } } + + pub fn to_bytes(&self) -> Vec { + proto::Protobuf::to_bytes(self).unwrap() + } } impl malachite_common::Proposal for Proposal { @@ -38,4 +51,52 @@ impl malachite_common::Proposal for Proposal { fn pol_round(&self) -> Round { self.pol_round } + + fn validator_address(&self) -> &Address { + &self.validator_address + } +} + +impl proto::Protobuf for Proposal { + type Proto = malachite_proto::Proposal; + + fn to_proto(&self) -> Result { + Ok(proto::Proposal { + height: Some(self.height.to_proto()?), + round: Some(self.round.to_proto()?), + value: Some(self.value.to_proto()?), + pol_round: Some(self.pol_round.to_proto()?), + validator_address: Some(self.validator_address.to_proto()?), + }) + } + + fn from_proto(proto: Self::Proto) -> Result { + Ok(Self { + height: Height::from_proto( + proto + .height + .ok_or_else(|| proto::Error::missing_field::("height"))?, + )?, + round: Round::from_proto( + proto + .round + .ok_or_else(|| proto::Error::missing_field::("round"))?, + )?, + value: Value::from_proto( + proto + .value + .ok_or_else(|| proto::Error::missing_field::("value"))?, + )?, + pol_round: Round::from_proto( + proto + .pol_round + .ok_or_else(|| proto::Error::missing_field::("pol_round"))?, + )?, + validator_address: Address::from_proto( + proto.validator_address.ok_or_else(|| { + proto::Error::missing_field::("validator_address") + })?, + )?, + }) + } } diff --git a/code/test/src/signing.rs b/code/test/src/signing.rs index a0564f497..b13be62e7 100644 --- a/code/test/src/signing.rs +++ b/code/test/src/signing.rs @@ -18,9 +18,19 @@ impl Ed25519 { } impl SigningScheme for Ed25519 { + type DecodingError = ed25519_consensus::Error; + type Signature = Signature; type PublicKey = PublicKey; type PrivateKey = PrivateKey; + + fn encode_signature(signature: &Signature) -> Vec { + signature.to_bytes().to_vec() + } + + fn decode_signature(bytes: &[u8]) -> Result { + Signature::try_from(bytes) + } } #[derive(Clone, Debug)] @@ -41,6 +51,11 @@ impl PrivateKey { pub fn public_key(&self) -> PublicKey { PublicKey::new(self.0.verification_key()) } + + #[cfg_attr(coverage_nightly, coverage(off))] + pub fn inner(&self) -> &ed25519_consensus::SigningKey { + &self.0 + } } impl Signer for PrivateKey { @@ -71,6 +86,10 @@ impl PublicKey { hasher.update(self.0.as_bytes()); hasher.finalize().into() } + + pub fn inner(&self) -> &ed25519_consensus::VerificationKey { + &self.0 + } } impl Verifier for PublicKey { diff --git a/code/test/src/utils.rs b/code/test/src/utils.rs index efac6cd8a..ef00c4314 100644 --- a/code/test/src/utils.rs +++ b/code/test/src/utils.rs @@ -13,8 +13,18 @@ use crate::{ pub struct RotateProposer; impl ProposerSelector for RotateProposer { - fn select_proposer(&self, round: Round, validator_set: &ValidatorSet) -> Address { - let proposer_index = round.as_i64() as usize % validator_set.validators.len(); + fn select_proposer( + &self, + height: Height, + round: Round, + validator_set: &ValidatorSet, + ) -> Address { + assert!(round != Round::Nil && round.as_i64() >= 0); + + let height = height.as_u64() as usize; + let round = round.as_i64() as usize; + + let proposer_index = (height - 1 + round) % validator_set.validators.len(); validator_set.validators[proposer_index].address } } @@ -31,7 +41,12 @@ impl FixedProposer { } impl ProposerSelector for FixedProposer { - fn select_proposer(&self, _round: Round, _validator_set: &ValidatorSet) -> Address { + fn select_proposer( + &self, + _height: Height, + _round: Round, + _validator_set: &ValidatorSet, + ) -> Address { self.proposer } } @@ -60,8 +75,13 @@ pub fn new_round_output(round: Round) -> Output { Output::NewRound(Height::new(1), round) } -pub fn proposal_output(round: Round, value: Value, locked_round: Round) -> Output { - let proposal = Proposal::new(Height::new(1), round, value, locked_round); +pub fn proposal_output( + round: Round, + value: Value, + locked_round: Round, + address: Address, +) -> Output { + let proposal = Proposal::new(Height::new(1), round, value, locked_round, address); Output::Propose(proposal) } @@ -70,8 +90,9 @@ pub fn proposal_input( value: Value, locked_round: Round, validity: Validity, + address: Address, ) -> Input { - let proposal = Proposal::new(Height::new(1), round, value, locked_round); + let proposal = Proposal::new(Height::new(1), round, value, locked_round, address); Input::Proposal(proposal, validity) } diff --git a/code/test/src/validator_set.rs b/code/test/src/validator_set.rs index 91b903496..49b9c00f5 100644 --- a/code/test/src/validator_set.rs +++ b/code/test/src/validator_set.rs @@ -1,40 +1,7 @@ -use core::fmt; - use malachite_common::VotingPower; -use crate::{signing::PublicKey, TestContext}; - -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub struct Address([u8; Self::LENGTH]); - -impl Address { - const LENGTH: usize = 20; - - #[cfg_attr(coverage_nightly, coverage(off))] - pub const fn new(value: [u8; Self::LENGTH]) -> Self { - Self(value) - } - - #[cfg_attr(coverage_nightly, coverage(off))] - pub fn from_public_key(public_key: &PublicKey) -> Self { - let hash = public_key.hash(); - let mut address = [0; Self::LENGTH]; - address.copy_from_slice(&hash[..Self::LENGTH]); - Self(address) - } -} - -impl fmt::Display for Address { - #[cfg_attr(coverage_nightly, coverage(off))] - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - for byte in self.0.iter() { - write!(f, "{:02x}", byte)?; - } - Ok(()) - } -} - -impl malachite_common::Address for Address {} +use crate::signing::PublicKey; +use crate::{Address, TestContext}; /// A validator is a public key and voting power #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/code/test/src/value.rs b/code/test/src/value.rs index bed49dbf6..fac05ef74 100644 --- a/code/test/src/value.rs +++ b/code/test/src/value.rs @@ -1,3 +1,5 @@ +use malachite_proto::{self as proto}; + #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Copy)] pub struct ValueId(u64); @@ -17,6 +19,27 @@ impl From for ValueId { } } +impl proto::Protobuf for ValueId { + type Proto = proto::ValueId; + + fn from_proto(proto: Self::Proto) -> Result { + let bytes = proto + .value + .ok_or_else(|| proto::Error::missing_field::("value"))?; + + let bytes = <[u8; 8]>::try_from(bytes) + .map_err(|_| proto::Error::Other("Invalid value length".to_string()))?; + + Ok(ValueId::new(u64::from_be_bytes(bytes))) + } + + fn to_proto(&self) -> Result { + Ok(proto::ValueId { + value: Some(self.0.to_be_bytes().to_vec()), + }) + } +} + /// The value to decide on #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct Value(u64); @@ -42,3 +65,24 @@ impl malachite_common::Value for Value { self.id() } } + +impl proto::Protobuf for Value { + type Proto = proto::Value; + + fn from_proto(proto: Self::Proto) -> Result { + let bytes = proto + .value + .ok_or_else(|| proto::Error::missing_field::("value"))?; + + let bytes = <[u8; 8]>::try_from(bytes) + .map_err(|_| proto::Error::Other("Invalid value length".to_string()))?; + + Ok(Value::new(u64::from_be_bytes(bytes))) + } + + fn to_proto(&self) -> Result { + Ok(proto::Value { + value: Some(self.0.to_be_bytes().to_vec()), + }) + } +} diff --git a/code/test/src/vote.rs b/code/test/src/vote.rs index 31b9a250d..32f1c233c 100644 --- a/code/test/src/vote.rs +++ b/code/test/src/vote.rs @@ -1,6 +1,7 @@ use signature::Signer; use malachite_common::{NilOrVal, Round, SignedVote, VoteType}; +use malachite_proto::{self as proto}; use crate::{Address, Height, PrivateKey, TestContext, ValueId}; @@ -46,21 +47,7 @@ impl Vote { } pub fn to_bytes(&self) -> Vec { - let vtpe = match self.typ { - VoteType::Prevote => 0, - VoteType::Precommit => 1, - }; - - let mut bytes = vec![vtpe]; - bytes.extend_from_slice(&self.round.as_i64().to_be_bytes()); - bytes.extend_from_slice( - self.value - .as_ref() - .map(|v| v.as_u64().to_be_bytes()) - .value_or_default() - .as_slice(), - ); - bytes + proto::Protobuf::to_bytes(self).unwrap() } pub fn signed(self, private_key: &PrivateKey) -> SignedVote { @@ -98,3 +85,45 @@ impl malachite_common::Vote for Vote { &self.validator_address } } + +impl proto::Protobuf for Vote { + type Proto = proto::Vote; + + fn from_proto(proto: Self::Proto) -> Result { + Ok(Self { + typ: VoteType::from(proto.vote_type()), + height: Height::from_proto( + proto + .height + .ok_or_else(|| proto::Error::missing_field::("height"))?, + )?, + round: Round::from_proto( + proto + .round + .ok_or_else(|| proto::Error::missing_field::("round"))?, + )?, + value: match proto.value { + Some(value) => NilOrVal::Val(ValueId::from_proto(value)?), + None => NilOrVal::Nil, + }, + validator_address: Address::from_proto( + proto.validator_address.ok_or_else(|| { + proto::Error::missing_field::("validator_address") + })?, + )?, + }) + } + + fn to_proto(&self) -> Result { + Ok(proto::Vote { + vote_type: proto::VoteType::from(self.typ).into(), + height: Some(self.height.to_proto()?), + round: Some(self.round.to_proto()?), + value: match &self.value { + NilOrVal::Nil => None, + NilOrVal::Val(v) => Some(v.to_proto()?), + }, + validator_address: Some(self.validator_address.to_proto()?), + }) + } +} diff --git a/code/test/tests/driver.rs b/code/test/tests/driver.rs index a8ed5d1ac..309fdb382 100644 --- a/code/test/tests/driver.rs +++ b/code/test/tests/driver.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use malachite_test::utils::{make_validators, FixedProposer, RotateProposer}; use malachite_common::{NilOrVal, Round, Timeout, TimeoutStep}; @@ -34,12 +36,18 @@ fn driver_steps_proposer() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = FixedProposer::new(my_addr); + let sel = Arc::new(FixedProposer::new(my_addr)); let vs = ValidatorSet::new(vec![v1, v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); - let proposal = Proposal::new(Height::new(1), Round::new(0), value, Round::new(-1)); + let proposal = Proposal::new( + Height::new(1), + Round::new(0), + value, + Round::new(-1), + my_addr, + ); let steps = vec![ TestStep { @@ -241,7 +249,7 @@ fn driver_steps_proposer_timeout_get_value() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = FixedProposer::new(my_addr); + let sel = Arc::new(FixedProposer::new(my_addr)); let vs = ValidatorSet::new(vec![v1, v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); @@ -303,12 +311,18 @@ fn driver_steps_not_proposer_valid() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = FixedProposer::new(v1.address); + let sel = Arc::new(FixedProposer::new(v1.address)); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); - let proposal = Proposal::new(Height::new(1), Round::new(0), value, Round::new(-1)); + let proposal = Proposal::new( + Height::new(1), + Round::new(0), + value, + Round::new(-1), + v1.address, + ); let steps = vec![ TestStep { @@ -493,12 +507,18 @@ fn driver_steps_not_proposer_invalid() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = FixedProposer::new(v1.address); + let sel = Arc::new(FixedProposer::new(v1.address)); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); - let proposal = Proposal::new(Height::new(1), Round::new(0), value, Round::new(-1)); + let proposal = Proposal::new( + Height::new(1), + Round::new(0), + value, + Round::new(-1), + v1.address, + ); let steps = vec![ TestStep { @@ -609,13 +629,19 @@ fn driver_steps_not_proposer_other_height() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = FixedProposer::new(v1.address); + let sel = Arc::new(FixedProposer::new(v1.address)); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); // Proposal is for another height - let proposal = Proposal::new(Height::new(2), Round::new(0), value, Round::new(-1)); + let proposal = Proposal::new( + Height::new(2), + Round::new(0), + value, + Round::new(-1), + v1.address, + ); let steps = vec![ TestStep { @@ -662,13 +688,19 @@ fn driver_steps_not_proposer_other_round() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = FixedProposer::new(v1.address); + let sel = Arc::new(FixedProposer::new(v1.address)); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); // Proposal is for another round - let proposal = Proposal::new(Height::new(1), Round::new(1), value, Round::new(-1)); + let proposal = Proposal::new( + Height::new(1), + Round::new(1), + value, + Round::new(-1), + v2.address, + ); let steps = vec![ TestStep { @@ -715,7 +747,7 @@ fn driver_steps_not_proposer_timeout_multiple_rounds() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = FixedProposer::new(v1.address); + let sel = Arc::new(FixedProposer::new(v1.address)); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); @@ -915,7 +947,7 @@ fn driver_steps_no_value_to_propose() { let ctx = TestContext::new(my_sk.clone()); // We are the proposer - let sel = FixedProposer::new(v1.address); + let sel = Arc::new(FixedProposer::new(v1.address)); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); @@ -947,7 +979,7 @@ fn driver_steps_proposer_not_found() { let ctx = TestContext::new(my_sk.clone()); // Proposer is v1, which is not in the validator set - let sel = FixedProposer::new(v1.address); + let sel = Arc::new(FixedProposer::new(v1.address)); let vs = ValidatorSet::new(vec![v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); @@ -968,7 +1000,7 @@ fn driver_steps_validator_not_found() { let ctx = TestContext::new(my_sk.clone()); // Proposer is v1 - let sel = FixedProposer::new(v1.address); + let sel = Arc::new(FixedProposer::new(v1.address)); // We omit v2 from the validator set let vs = ValidatorSet::new(vec![v1.clone(), v3.clone()]); @@ -994,7 +1026,7 @@ fn driver_steps_validator_not_found() { fn driver_steps_skip_round_skip_threshold() { let value = Value::new(9999); - let sel = RotateProposer; + let sel = Arc::new(RotateProposer); let [(v1, _sk1), (v2, _sk2), (v3, sk3)] = make_validators([1, 1, 1]); @@ -1107,7 +1139,7 @@ fn driver_steps_skip_round_skip_threshold() { fn driver_steps_skip_round_quorum_threshold() { let value = Value::new(9999); - let sel = RotateProposer; + let sel = Arc::new(RotateProposer); let [(v1, _sk1), (v2, _sk2), (v3, sk3)] = make_validators([1, 2, 1]); diff --git a/code/test/tests/driver_extra.rs b/code/test/tests/driver_extra.rs index 417d233fa..c41937002 100644 --- a/code/test/tests/driver_extra.rs +++ b/code/test/tests/driver_extra.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use malachite_common::Round; use malachite_driver::{Driver, Input, Output, Validity}; use malachite_round::state::State; @@ -77,7 +79,7 @@ fn driver_steps_decide_current_with_no_locked_no_valid() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = RotateProposer; + let sel = Arc::new(RotateProposer); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); @@ -106,7 +108,13 @@ fn driver_steps_decide_current_with_no_locked_no_valid() { }, TestStep { desc: "Receive proposal", - input: proposal_input(Round::new(0), value, Round::Nil, Validity::Valid), + input: proposal_input( + Round::new(0), + value, + Round::Nil, + Validity::Valid, + v1.address, + ), expected_outputs: vec![decide_output(Round::new(0), value)], expected_round: Round::new(0), new_state: decided_state(Round::new(0), value), @@ -146,7 +154,7 @@ fn driver_steps_decide_previous_with_no_locked_no_valid() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = RotateProposer; + let sel = Arc::new(RotateProposer); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); @@ -203,7 +211,13 @@ fn driver_steps_decide_previous_with_no_locked_no_valid() { }, TestStep { desc: "Receive proposal", - input: proposal_input(Round::new(0), value, Round::Nil, Validity::Valid), + input: proposal_input( + Round::new(0), + value, + Round::Nil, + Validity::Valid, + v1.address, + ), expected_outputs: vec![decide_output(Round::new(0), value)], expected_round: Round::new(1), new_state: decided_state(Round::new(1), value), @@ -243,11 +257,13 @@ fn driver_steps_decide_previous_with_locked_and_valid() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = RotateProposer; + let sel = Arc::new(RotateProposer); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); + let proposal = Proposal::new(Height::new(1), Round::new(0), value, Round::Nil, v1.address); + let steps = vec![ TestStep { desc: "Start round 0, we, v3, are not the proposer, start timeout propose", @@ -279,7 +295,13 @@ fn driver_steps_decide_previous_with_locked_and_valid() { }, TestStep { desc: "Receive proposal, L37-L43", - input: proposal_input(Round::new(0), value, Round::Nil, Validity::Valid), + input: proposal_input( + Round::new(0), + value, + Round::Nil, + Validity::Valid, + v1.address, + ), expected_outputs: vec![precommit_output( Round::new(0), Value::new(9999), @@ -288,7 +310,7 @@ fn driver_steps_decide_previous_with_locked_and_valid() { expected_round: Round::new(0), new_state: precommit_state_with_proposal_and_locked_and_valid( Round::new(0), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + proposal.clone(), ), }, TestStep { @@ -298,7 +320,7 @@ fn driver_steps_decide_previous_with_locked_and_valid() { expected_round: Round::new(1), new_state: new_round_with_proposal_and_locked_and_valid( Round::new(1), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + proposal.clone(), ), }, TestStep { @@ -308,7 +330,7 @@ fn driver_steps_decide_previous_with_locked_and_valid() { expected_round: Round::new(1), new_state: propose_state_with_proposal_and_locked_and_valid( Round::new(1), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + proposal.clone(), ), }, TestStep { @@ -318,7 +340,7 @@ fn driver_steps_decide_previous_with_locked_and_valid() { expected_round: Round::new(1), new_state: propose_state_with_proposal_and_locked_and_valid( Round::new(1), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + proposal.clone(), ), }, TestStep { @@ -328,7 +350,7 @@ fn driver_steps_decide_previous_with_locked_and_valid() { expected_round: Round::new(1), new_state: decided_state_with_proposal_and_locked_and_valid( Round::new(1), - Proposal::new(Height::new(1), Round::new(1), value, Round::Nil), + proposal.clone(), ), }, ]; @@ -368,7 +390,7 @@ fn driver_steps_polka_previous_with_locked() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = RotateProposer; + let sel = Arc::new(RotateProposer); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); @@ -383,7 +405,13 @@ fn driver_steps_polka_previous_with_locked() { }, TestStep { desc: "receive a proposal from v1 - L22 send prevote", - input: proposal_input(Round::new(0), value, Round::Nil, Validity::Valid), + input: proposal_input( + Round::new(0), + value, + Round::Nil, + Validity::Valid, + v1.address, + ), expected_outputs: vec![prevote_output(Round::new(0), &my_addr)], expected_round: Round::new(0), new_state: prevote_state( @@ -408,7 +436,7 @@ fn driver_steps_polka_previous_with_locked() { expected_round: Round::new(0), new_state: precommit_state_with_proposal_and_locked_and_valid( Round::new(0), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + Proposal::new(Height::new(1), Round::new(0), value, Round::Nil, v1.address), ), }, TestStep { @@ -418,27 +446,44 @@ fn driver_steps_polka_previous_with_locked() { expected_round: Round::new(1), new_state: new_round_with_proposal_and_locked_and_valid( Round::new(1), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + Proposal::new(Height::new(1), Round::new(0), value, Round::Nil, v1.address), ), }, TestStep { desc: "start round 1, we are proposer with a valid value, propose it", input: new_round_input(Round::new(1)), - expected_outputs: vec![proposal_output(Round::new(1), value, Round::new(0))], + expected_outputs: vec![proposal_output( + Round::new(1), + value, + Round::new(0), + v2.address, + )], expected_round: Round::new(1), new_state: propose_state_with_proposal_and_locked_and_valid( Round::new(1), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + Proposal::new(Height::new(1), Round::new(0), value, Round::Nil, v2.address), ), }, TestStep { desc: "Receive our own proposal", - input: proposal_input(Round::new(1), value, Round::new(0), Validity::Valid), - expected_outputs: vec![prevote_output(Round::new(1), &my_addr)], + input: proposal_input( + Round::new(1), + value, + Round::new(0), + Validity::Valid, + v1.address, + ), + expected_outputs: vec![prevote_output(Round::new(1), &v2.address)], expected_round: Round::new(1), new_state: prevote_state_with_proposal_and_locked_and_valid( Round::new(1), - Proposal::new(Height::new(1), Round::new(1), value, Round::new(0)), + Proposal::new( + Height::new(1), + Round::new(1), + value, + Round::new(0), + v2.address, + ), ), }, ]; @@ -476,7 +521,7 @@ fn driver_steps_polka_previous_invalid_proposal() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = RotateProposer; + let sel = Arc::new(RotateProposer); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); @@ -526,7 +571,13 @@ fn driver_steps_polka_previous_invalid_proposal() { }, TestStep { desc: "receive an invalid proposal for POL round 0", - input: proposal_input(Round::new(1), value, Round::new(0), Validity::Invalid), + input: proposal_input( + Round::new(1), + value, + Round::new(0), + Validity::Invalid, + v1.address, + ), expected_outputs: vec![prevote_nil_output(Round::new(1), &my_addr)], expected_round: Round::new(1), new_state: prevote_state(Round::new(1)), @@ -567,7 +618,7 @@ fn driver_steps_polka_previous_new_proposal() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = RotateProposer; + let sel = Arc::new(RotateProposer); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); @@ -582,7 +633,13 @@ fn driver_steps_polka_previous_new_proposal() { }, TestStep { desc: "receive a valid proposal for round 0", - input: proposal_input(Round::new(0), value, Round::Nil, Validity::Valid), + input: proposal_input( + Round::new(0), + value, + Round::Nil, + Validity::Valid, + v1.address, + ), expected_outputs: vec![prevote_output(Round::new(0), &my_addr)], expected_round: Round::new(0), new_state: prevote_state(Round::new(0)), @@ -601,7 +658,7 @@ fn driver_steps_polka_previous_new_proposal() { expected_round: Round::new(0), new_state: precommit_state_with_proposal_and_locked_and_valid( Round::new(0), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + Proposal::new(Height::new(1), Round::new(0), value, Round::Nil, v1.address), ), }, TestStep { @@ -611,7 +668,7 @@ fn driver_steps_polka_previous_new_proposal() { expected_round: Round::new(1), new_state: new_round_with_proposal_and_locked_and_valid( Round::new(1), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + Proposal::new(Height::new(1), Round::new(0), value, Round::Nil, v1.address), ), }, TestStep { @@ -621,17 +678,29 @@ fn driver_steps_polka_previous_new_proposal() { expected_round: Round::new(1), new_state: propose_state_with_proposal_and_locked_and_valid( Round::new(1), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + Proposal::new(Height::new(1), Round::new(0), value, Round::Nil, v1.address), ), }, TestStep { desc: "receive a valid proposal for round 1 with different value", - input: proposal_input(Round::new(1), other_value, Round::Nil, Validity::Valid), + input: proposal_input( + Round::new(1), + other_value, + Round::Nil, + Validity::Valid, + v1.address, + ), expected_outputs: vec![prevote_nil_output(Round::new(1), &my_addr)], expected_round: Round::new(1), new_state: prevote_state_with_proposal_and_locked_and_valid( Round::new(1), - Proposal::new(Height::new(1), Round::new(1), value, Round::new(0)), + Proposal::new( + Height::new(1), + Round::new(1), + value, + Round::new(0), + v1.address, + ), ), }, ]; @@ -678,7 +747,7 @@ fn driver_steps_polka_previous_with_no_locked() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = RotateProposer; + let sel = Arc::new(RotateProposer); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); @@ -692,7 +761,7 @@ fn driver_steps_polka_previous_with_no_locked() { new_state: propose_state(Round::new(0)), }, TestStep { - desc: "Timeout propopse, prevote for nil (v2)", + desc: "Timeout propose, prevote for nil (v2)", input: timeout_propose_input(Round::new(0)), expected_outputs: vec![prevote_nil_output(Round::new(0), &my_addr)], expected_round: Round::new(0), @@ -721,13 +790,19 @@ fn driver_steps_polka_previous_with_no_locked() { }, TestStep { desc: "receive a proposal - L36, we don't lock, we set valid", - input: proposal_input(Round::new(0), value, Round::Nil, Validity::Valid), + input: proposal_input( + Round::new(0), + value, + Round::Nil, + Validity::Valid, + v3.address, + ), expected_outputs: vec![], expected_round: Round::new(0), new_state: precommit_state_with_proposal_and_valid( Round::new(0), Round::new(0), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + Proposal::new(Height::new(1), Round::new(0), value, Round::Nil, v3.address), ), }, TestStep { @@ -737,29 +812,40 @@ fn driver_steps_polka_previous_with_no_locked() { expected_round: Round::new(1), new_state: new_round_with_proposal_and_valid( Round::new(1), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + Proposal::new(Height::new(1), Round::new(0), value, Round::Nil, v3.address), ), }, TestStep { desc: "start round 1, we are proposer with a valid value from round 0, propose it", input: new_round_input(Round::new(1)), - expected_outputs: vec![proposal_output(Round::new(1), value, Round::new(0))], + expected_outputs: vec![proposal_output( + Round::new(1), + value, + Round::new(0), + v2.address, + )], expected_round: Round::new(1), new_state: propose_state_with_proposal_and_valid( Round::new(1), Round::new(0), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + Proposal::new(Height::new(1), Round::new(0), value, Round::Nil, v2.address), ), }, TestStep { desc: "Receive our own proposal, prevote nil as we are not locked on the value", - input: proposal_input(Round::new(1), value, Round::new(0), Validity::Valid), + input: proposal_input( + Round::new(1), + value, + Round::new(0), + Validity::Valid, + v2.address, + ), expected_outputs: vec![prevote_nil_output(Round::new(1), &my_addr)], expected_round: Round::new(1), new_state: prevote_state_with_proposal_and_valid( Round::new(1), Round::new(0), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + Proposal::new(Height::new(1), Round::new(0), value, Round::Nil, v2.address), ), }, ]; @@ -787,7 +873,7 @@ fn driver_steps_polka_nil_and_timout_propose() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = RotateProposer; + let sel = Arc::new(RotateProposer); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); @@ -851,12 +937,13 @@ fn driver_steps_polka_value_then_proposal() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = RotateProposer; + let sel = Arc::new(RotateProposer); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); - let steps = vec![ + let steps = + vec![ TestStep { desc: "Start round 0, we, v3, are not the proposer, start timeout propose", input: new_round_input(Round::new(0)), @@ -880,7 +967,7 @@ fn driver_steps_polka_value_then_proposal() { }, TestStep { desc: "receive a proposal from v1 - L22 send prevote", - input: proposal_input(Round::new(0), value, Round::Nil, Validity::Valid), + input: proposal_input(Round::new(0), value, Round::Nil, Validity::Valid, v1.address), expected_outputs: vec![ prevote_output(Round::new(0), &my_addr), precommit_output(Round::new(0), value, &my_addr), @@ -888,7 +975,7 @@ fn driver_steps_polka_value_then_proposal() { expected_round: Round::new(0), new_state: precommit_state_with_proposal_and_locked_and_valid( Round::new(0), - Proposal::new(Height::new(1), Round::new(0), value, Round::Nil), + Proposal::new(Height::new(1), Round::new(0), value, Round::Nil, v1.address), ), }, ]; @@ -918,7 +1005,7 @@ fn driver_steps_polka_any_then_proposal_other() { let height = Height::new(1); let ctx = TestContext::new(my_sk.clone()); - let sel = RotateProposer; + let sel = Arc::new(RotateProposer); let vs = ValidatorSet::new(vec![v1.clone(), v2.clone(), v3.clone()]); let mut driver = Driver::new(ctx, height, sel, vs, my_addr, Default::default()); @@ -947,7 +1034,7 @@ fn driver_steps_polka_any_then_proposal_other() { }, TestStep { desc: "receive a proposal from v1 - L22 send prevote, replay polkaAny, start timeout prevote", - input: proposal_input(Round::new(0), value, Round::Nil, Validity::Valid), + input: proposal_input(Round::new(0), value, Round::Nil, Validity::Valid, v1.address), expected_outputs: vec![ prevote_output(Round::new(0), &my_addr), start_prevote_timer_output(Round::new(0)) diff --git a/code/test/tests/round.rs b/code/test/tests/round.rs index ee2dd22fc..4b8a5b297 100644 --- a/code/test/tests/round.rs +++ b/code/test/tests/round.rs @@ -39,7 +39,13 @@ fn test_propose() { assert_eq!(transition.next_state, state); assert_eq!( transition.output.unwrap(), - Output::proposal(Height::new(10), Round::new(0), Value::new(42), Round::Nil) + Output::proposal( + Height::new(10), + Round::new(0), + Value::new(42), + Round::Nil, + ADDRESS + ) ); } @@ -79,6 +85,7 @@ fn test_prevote() { Round::new(1), value, Round::Nil, + OTHER_ADDRESS, )), ); @@ -106,7 +113,13 @@ fn test_input_message_while_commit_step() { ..Default::default() }; - let proposal = Proposal::new(Height::new(1), Round::new(1), value, Round::Nil); + let proposal = Proposal::new( + Height::new(1), + Round::new(1), + value, + Round::Nil, + OTHER_ADDRESS, + ); let data = Info::new(round, &ADDRESS, &OTHER_ADDRESS);