From 2b4e3b567359d083e94c6f4c68729d17a18a2605 Mon Sep 17 00:00:00 2001 From: Greg Szabo Date: Mon, 16 Dec 2024 10:10:48 -0500 Subject: [PATCH 01/19] initial commit --- code/Cargo.lock | 24 +++ code/Cargo.toml | 3 + code/crates/app-channel/Cargo.toml | 1 + code/crates/app-channel/src/channel.rs | 17 +- code/crates/app-channel/src/lib.rs | 2 +- code/crates/app-channel/src/run.rs | 26 +-- code/crates/app-channel/src/spawn.rs | 48 ++++- code/crates/test/src/codec/testcodec.rs | 1 + code/crates/test/src/height.rs | 8 +- code/crates/test/src/proposal_part.rs | 6 + code/examples/channel/Cargo.toml | 31 +++ code/examples/channel/src/main.rs | 103 ++++++++++ code/examples/channel/src/node.rs | 242 ++++++++++++++++++++++++ code/examples/channel/src/state.rs | 162 ++++++++++++++++ 14 files changed, 652 insertions(+), 22 deletions(-) create mode 100644 code/examples/channel/Cargo.toml create mode 100644 code/examples/channel/src/main.rs create mode 100644 code/examples/channel/src/node.rs create mode 100644 code/examples/channel/src/state.rs diff --git a/code/Cargo.lock b/code/Cargo.lock index d0c2f76d3..345b8d096 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -2803,6 +2803,7 @@ dependencies = [ "eyre", "malachite-actors", "malachite-app", + "malachite-config", "malachite-test", "ractor", "rand", @@ -2931,6 +2932,29 @@ dependencies = [ "malachite-vote", ] +[[package]] +name = "malachite-example-channel" +version = "0.1.0" +dependencies = [ + "async-trait", + "bytes", + "color-eyre", + "eyre", + "libp2p-identity", + "malachite-actors", + "malachite-app", + "malachite-app-channel", + "malachite-cli", + "malachite-common", + "malachite-config", + "malachite-consensus", + "malachite-test", + "rand", + "serde_json", + "sha3", + "tracing", +] + [[package]] name = "malachite-gossip-consensus" version = "0.1.0" diff --git a/code/Cargo.toml b/code/Cargo.toml index 73f9c9e98..2e2c03f05 100644 --- a/code/Cargo.toml +++ b/code/Cargo.toml @@ -34,6 +34,9 @@ members = [ # Starknet "crates/starknet/*", "crates/starknet/*", + + # Examples + "examples/channel", ] [workspace.package] diff --git a/code/crates/app-channel/Cargo.toml b/code/crates/app-channel/Cargo.toml index 5514eea90..57075317e 100644 --- a/code/crates/app-channel/Cargo.toml +++ b/code/crates/app-channel/Cargo.toml @@ -17,6 +17,7 @@ tracing.workspace = true malachite-app.workspace = true malachite-actors.workspace = true +malachite-config.workspace = true [lints] workspace = true diff --git a/code/crates/app-channel/src/channel.rs b/code/crates/app-channel/src/channel.rs index c68ef7b38..7d3bf92b0 100644 --- a/code/crates/app-channel/src/channel.rs +++ b/code/crates/app-channel/src/channel.rs @@ -2,6 +2,7 @@ use std::time::Duration; use bytes::Bytes; use derive_where::derive_where; +use tokio::sync::mpsc; use tokio::sync::oneshot; use crate::app::types::core::{CommitCertificate, Context, Round, ValueId}; @@ -9,6 +10,14 @@ use crate::app::types::streaming::StreamMessage; use crate::app::types::sync::DecidedValue; use crate::app::types::{LocallyProposedValue, PeerId, ProposedValue}; +use malachite_actors::consensus::Msg as ConsensusActorMsg; + +/// Channels created for application consumption +pub struct Channels { + pub consensus: mpsc::Receiver>, + pub consensus_gossip: mpsc::Sender>, +} + /// Messages sent from consensus to the application. #[derive_where(Debug)] pub enum AppMsg { @@ -88,8 +97,6 @@ pub enum ConsensusMsg { StartHeight(Ctx::Height, Ctx::ValidatorSet), } -use malachite_actors::consensus::Msg as ConsensusActorMsg; - impl From> for ConsensusActorMsg { fn from(msg: ConsensusMsg) -> ConsensusActorMsg { match msg { @@ -99,3 +106,9 @@ impl From> for ConsensusActorMsg { } } } + +/// Messages sent from the application to consensus gossip. +#[derive_where(Debug)] +pub enum ConsensusGossipMsg { + PublishProposalPart(StreamMessage), +} diff --git a/code/crates/app-channel/src/lib.rs b/code/crates/app-channel/src/lib.rs index 81e05b561..0549570d6 100644 --- a/code/crates/app-channel/src/lib.rs +++ b/code/crates/app-channel/src/lib.rs @@ -14,7 +14,7 @@ pub mod connector; pub mod spawn; mod channel; -pub use channel::{AppMsg, ConsensusMsg}; +pub use channel::{AppMsg, Channels, ConsensusGossipMsg, ConsensusMsg}; mod run; pub use run::run; diff --git a/code/crates/app-channel/src/run.rs b/code/crates/app-channel/src/run.rs index a46eb3618..ea2e73900 100644 --- a/code/crates/app-channel/src/run.rs +++ b/code/crates/app-channel/src/run.rs @@ -2,20 +2,16 @@ //! Provides the application with a channel for receiving messages from consensus. use eyre::Result; -use tokio::sync::mpsc; -use crate::app; use crate::app::types::codec::{ConsensusCodec, SyncCodec, WalCodec}; use crate::app::types::config::Config as NodeConfig; use crate::app::types::core::Context; use crate::app::types::metrics::{Metrics, SharedRegistry}; -use crate::channel::AppMsg; -use crate::spawn::spawn_host_actor; +use crate::spawn::{spawn_gossip_consensus_actor, spawn_host_actor}; +use crate::{app, Channels}; use malachite_actors::util::events::TxEvent; -use malachite_app::{ - spawn_consensus_actor, spawn_gossip_consensus_actor, spawn_sync_actor, spawn_wal_actor, -}; +use malachite_app::{spawn_consensus_actor, spawn_sync_actor, spawn_wal_actor}; #[tracing::instrument("node", skip_all, fields(moniker = %cfg.moniker))] pub async fn run( @@ -25,7 +21,7 @@ pub async fn run( codec: Codec, node: Node, initial_validator_set: Ctx::ValidatorSet, -) -> Result>> +) -> Result> where Ctx: Context, Node: app::Node, @@ -39,20 +35,23 @@ where let metrics = Metrics::register(®istry); // TODO: Simplify this? - let private_key_file = node.load_private_key_file(node.get_home_dir())?; + let mut config_file = node.get_home_dir().clone(); + config_file.push("config"); + config_file.push("priv_validator_key.json"); + let private_key_file = node.load_private_key_file(config_file)?; let private_key = node.load_private_key(private_key_file); let public_key = node.get_public_key(&private_key); let address = node.get_address(&public_key); let keypair = node.get_keypair(private_key); // Spawn consensus gossip - let gossip_consensus = + let (gossip_consensus, gossipconsensusmsg_tx) = spawn_gossip_consensus_actor(&cfg, keypair, ®istry, codec.clone()).await?; let wal = spawn_wal_actor(&ctx, codec, &node.get_home_dir(), ®istry).await?; // Spawn the host actor - let (connector, rx) = spawn_host_actor(metrics.clone()).await?; + let (connector, appmsg_rx) = spawn_host_actor(metrics.clone()).await?; let sync = spawn_sync_actor( ctx.clone(), @@ -80,5 +79,8 @@ where ) .await?; - Ok(rx) + Ok(Channels { + consensus: appmsg_rx, + consensus_gossip: gossipconsensusmsg_tx, + }) } diff --git a/code/crates/app-channel/src/spawn.rs b/code/crates/app-channel/src/spawn.rs index 58cbd2174..537803d98 100644 --- a/code/crates/app-channel/src/spawn.rs +++ b/code/crates/app-channel/src/spawn.rs @@ -1,14 +1,17 @@ //! Utility functions for spawning the actor system and connecting it to the application. -use eyre::Result; -use tokio::sync::mpsc; - -use malachite_actors::host::HostRef; - use crate::app::types::core::Context; use crate::app::types::metrics::Metrics; -use crate::channel::AppMsg; use crate::connector::Connector; +use crate::{AppMsg, ConsensusGossipMsg}; +use eyre::Result; +use malachite_actors::consensus::ConsensusCodec; +use malachite_actors::gossip_consensus::{GossipConsensusMsg, GossipConsensusRef}; +use malachite_actors::host::HostRef; +use malachite_actors::sync::SyncCodec; +use malachite_app::types::{metrics::SharedRegistry, Keypair}; +use malachite_config::Config as NodeConfig; +use tokio::sync::mpsc; pub async fn spawn_host_actor( metrics: Metrics, @@ -20,3 +23,36 @@ where let actor_ref = Connector::spawn(tx, metrics).await?; Ok((actor_ref, rx)) } + +pub async fn spawn_gossip_consensus_actor( + cfg: &NodeConfig, + keypair: Keypair, + registry: &SharedRegistry, + codec: Codec, +) -> Result<( + GossipConsensusRef, + mpsc::Sender>, +)> +where + Ctx: Context, + Codec: ConsensusCodec, + Codec: SyncCodec, +{ + let (tx, mut rx) = mpsc::channel(1); + + let actor_ref = + malachite_app::spawn_gossip_consensus_actor(cfg, keypair, registry, codec).await?; + let actor_ref_return = actor_ref.clone(); + + tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + match msg { + ConsensusGossipMsg::PublishProposalPart(ppp) => actor_ref + .cast(GossipConsensusMsg::PublishProposalPart(ppp)) + .unwrap(), + } + } + }); + + Ok((actor_ref_return, tx)) +} diff --git a/code/crates/test/src/codec/testcodec.rs b/code/crates/test/src/codec/testcodec.rs index f1d1bcfec..4e5c577fa 100644 --- a/code/crates/test/src/codec/testcodec.rs +++ b/code/crates/test/src/codec/testcodec.rs @@ -9,6 +9,7 @@ use malachite_actors::util::streaming::StreamMessage; use malachite_consensus::SignedConsensusMsg; use malachite_sync::{Request, Response, Status}; +#[derive(Clone)] pub struct TestCodec; impl Codec for TestCodec { diff --git a/code/crates/test/src/height.rs b/code/crates/test/src/height.rs index 12c12c90f..8da82871c 100644 --- a/code/crates/test/src/height.rs +++ b/code/crates/test/src/height.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use crate::proto; /// A blockchain height -#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] pub struct Height(u64); impl Height { @@ -26,6 +26,12 @@ impl Height { } } +impl Default for Height { + fn default() -> Self { + Height(1) + } +} + impl fmt::Display for Height { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f) diff --git a/code/crates/test/src/proposal_part.rs b/code/crates/test/src/proposal_part.rs index e745e08fb..0a9419f49 100644 --- a/code/crates/test/src/proposal_part.rs +++ b/code/crates/test/src/proposal_part.rs @@ -64,6 +64,12 @@ impl Content { pub fn size_bytes(&self) -> usize { self.metadata.size_bytes() } + + pub fn new(block_metadata: &BlockMetadata) -> Self { + Self { + metadata: block_metadata.clone(), + } + } } impl Protobuf for Content { diff --git a/code/examples/channel/Cargo.toml b/code/examples/channel/Cargo.toml new file mode 100644 index 000000000..689eaf17b --- /dev/null +++ b/code/examples/channel/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "malachite-example-channel" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true +rust-version.workspace = true + +[dependencies] +async-trait.workspace = true +bytes.workspace = true +color-eyre.workspace = true +eyre.workspace = true +libp2p-identity.workspace = true +rand.workspace = true +serde_json.workspace = true +sha3.workspace = true +tracing.workspace = true + +malachite-actors.workspace = true +malachite-app.workspace = true +malachite-app-channel.workspace = true +malachite-cli.workspace = true +malachite-common.workspace = true +malachite-config.workspace = true +malachite-consensus.workspace = true +malachite-test.workspace = true + +[lints] +workspace = true diff --git a/code/examples/channel/src/main.rs b/code/examples/channel/src/main.rs new file mode 100644 index 000000000..1f450fcb2 --- /dev/null +++ b/code/examples/channel/src/main.rs @@ -0,0 +1,103 @@ +//! Example application using channels + +mod node; +mod state; + +use eyre::eyre; +use malachite_cli::args::{Args, Commands}; +use malachite_cli::{logging, runtime}; +use malachite_config::load_config; +use node::App; +use tracing::{error, info, trace}; + +/// main function parses the command-line arguments, loads configuration, initializes logging +/// and runs the application object. +fn main() -> color_eyre::Result<()> { + color_eyre::install().expect("Failed to install global error handler"); + + // Load command-line arguments and possible configuration file. + let args = Args::new(); + + // Load configuration file if it exists. Some commands do not require a configuration file. + let opt_config_file_path = args + .get_config_file_path() + .map_err(|error| eyre!("Failed to get configuration file path: {:?}", error)); + let opt_config = opt_config_file_path.and_then(|path| { + load_config(&path, None) + .map_err(|error| eyre!("Failed to load configuration file: {:?}", error)) + }); + + // Override logging configuration (if exists) with optional command-line parameters. + let mut logging = opt_config.as_ref().map(|c| c.logging).unwrap_or_default(); + if let Some(log_level) = args.log_level { + logging.log_level = log_level; + } + if let Some(log_format) = args.log_format { + logging.log_format = log_format; + } + + // This is a drop guard responsible for flushing any remaining logs when the program terminates. + // It must be assigned to a binding that is not _, as _ will result in the guard being dropped immediately. + let _guard = logging::init(logging.log_level, logging.log_format); + + trace!("Command-line parameters: {args:?}"); + + // Create the application object. + let node = &App { + home_dir: args.get_home_dir()?, + config: Default::default(), // placeholder, because `init` and `testnet` has no valid configuration file. + genesis_file: args.get_genesis_file_path()?, + private_key_file: args.get_priv_validator_key_file_path()?, + start_height: Default::default(), // placeholder, because start_height is only valid in StartCmd. + }; + + // Parse the input command. + match &args.command { + Commands::Start(cmd) => { + // Build configuration from valid configuration file and command-line parameters. + let mut config = opt_config + .map_err(|error| error!(%error, "Failed to load configuration.")) + .unwrap(); + config.logging = logging; + let runtime = config.runtime; + let metrics = if config.metrics.enabled { + Some(config.metrics.clone()) + } else { + None + }; + + info!( + file = %args.get_config_file_path().unwrap_or_default().display(), + "Loaded configuration", + ); + trace!(?config, "Configuration"); + + // Redefine the node with the valid configuration. + let node = &App { + home_dir: args.get_home_dir()?, + config, + genesis_file: args.get_genesis_file_path()?, + private_key_file: args.get_priv_validator_key_file_path()?, + start_height: cmd.start_height, + }; + + // Define the runtime. If you are not interested in a custom runtime configuration, + // you can use the #[async_trait] attribute on the main function. + let rt = runtime::build_runtime(runtime)?; + rt.block_on(cmd.run(node, metrics)) + .map_err(|error| eyre!("Failed to run start command {:?}", error)) + } + Commands::Init(cmd) => cmd + .run( + node, + &args.get_config_file_path()?, + &args.get_genesis_file_path()?, + &args.get_priv_validator_key_file_path()?, + logging, + ) + .map_err(|error| eyre!("Failed to run init command {:?}", error)), + Commands::Testnet(cmd) => cmd + .run(node, &args.get_home_dir()?, logging) + .map_err(|error| eyre!("Failed to run testnet command {:?}", error)), + } +} diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs new file mode 100644 index 000000000..31c96f3c2 --- /dev/null +++ b/code/examples/channel/src/node.rs @@ -0,0 +1,242 @@ +//! The Application (or Node) definition. The Node trait implements the Consensus context and the +//! cryptographic library used for signing. + +use std::path::{Path, PathBuf}; + +use async_trait::async_trait; +use rand::{CryptoRng, RngCore}; +use tracing::{debug, error}; + +use malachite_app::Node; +use malachite_app_channel::{run, AppMsg, ConsensusGossipMsg, ConsensusMsg}; +use malachite_common::{Round, Validity, VotingPower}; +use malachite_config::Config; +use malachite_consensus::ProposedValue; +use malachite_test::{ + Address, Genesis, Height, PrivateKey, PublicKey, TestCodec, TestContext, Validator, + ValidatorSet, +}; + +use crate::state::State; +use libp2p_identity::Keypair; + +#[derive(Clone)] +pub struct App { + pub config: Config, + pub home_dir: PathBuf, + pub genesis_file: PathBuf, + pub private_key_file: PathBuf, + pub start_height: Option, +} + +#[async_trait] +impl Node for App { + type Context = TestContext; + type Genesis = Genesis; + type PrivateKeyFile = PrivateKey; + + fn get_home_dir(&self) -> PathBuf { + self.home_dir.to_owned() + } + + fn generate_private_key(&self, rng: R) -> PrivateKey + where + R: RngCore + CryptoRng, + { + PrivateKey::generate(rng) + } + + fn get_address(&self, pk: &PublicKey) -> Address { + Address::from_public_key(pk) + } + + fn get_public_key(&self, pk: &PrivateKey) -> PublicKey { + pk.public_key() + } + + fn get_keypair(&self, pk: PrivateKey) -> Keypair { + Keypair::ed25519_from_bytes(pk.inner().to_bytes()).unwrap() + } + + fn load_private_key(&self, file: Self::PrivateKeyFile) -> PrivateKey { + file + } + + fn load_private_key_file( + &self, + path: impl AsRef, + ) -> std::io::Result { + let private_key = std::fs::read_to_string(path)?; + serde_json::from_str(&private_key).map_err(|e| e.into()) + } + + fn make_private_key_file(&self, private_key: PrivateKey) -> Self::PrivateKeyFile { + private_key + } + + fn load_genesis(&self, path: impl AsRef) -> std::io::Result { + let genesis = std::fs::read_to_string(path)?; + serde_json::from_str(&genesis).map_err(|e| e.into()) + } + + fn make_genesis(&self, validators: Vec<(PublicKey, VotingPower)>) -> Self::Genesis { + let validators = validators + .into_iter() + .map(|(pk, vp)| Validator::new(pk, vp)); + + let validator_set = ValidatorSet::new(validators); + + Genesis { validator_set } + } + + async fn run(&self) -> eyre::Result<()> { + let span = tracing::error_span!("node", moniker = %self.config.moniker); + let _enter = span.enter(); + + let priv_key_file = self.load_private_key_file(self.private_key_file.clone())?; + let private_key = self.load_private_key(priv_key_file); + let address = self.get_address(&self.get_public_key(&private_key)); + let ctx = TestContext::new(private_key); + + let genesis = self.load_genesis(self.genesis_file.clone())?; + + let start_height = self.start_height.map(Height::new); + + let codec = TestCodec; + + let mut channels = run( + self.config.clone(), + start_height, + ctx, + codec, + self.clone(), + genesis.validator_set.clone(), + ) + .await?; + + let mut state = State::new(address, start_height.unwrap_or_default()); + + loop { + match channels.consensus.recv().await { + Some(msg) => match msg { + AppMsg::ConsensusReady { reply_to } => { + debug!("Consensus is ready to run"); + if reply_to + .send(ConsensusMsg::StartHeight( + state.current_height, + genesis.validator_set.clone(), + )) + .is_err() + { + error!("Failed to send ConsensusReady reply"); + } + } + AppMsg::StartedRound { + height, + round, + proposer, + } => { + state.current_height = height; + state.current_round = round; + state.current_proposer = Some(proposer); + } + AppMsg::GetValue { + height, + round: _, + timeout_duration: _, + address: _, + reply_to, + } => { + let value = state.get_locally_proposed_value(&height); + // Send it to consensus + if reply_to.send(value.clone()).is_err() { + error!("Failed to send GetValue reply"); + } + + let stream_message = state.create_broadcast_message(value); + // Broadcast it to others. Old messages need not be broadcast. + channels + .consensus_gossip + .send(ConsensusGossipMsg::PublishProposalPart(stream_message)) + .await?; + } + AppMsg::GetEarliestBlockHeight { reply_to } => { + error!("GetEarliestBlockHeight"); + if reply_to.send(state.get_earliest_height()).is_err() { + error!("Failed to send GetEarliestBlockHeight reply"); + } + } + AppMsg::ReceivedProposalPart { + from: _, + part, + reply_to, + } => { + let proposed_value = state.add_proposal(part); + if reply_to.send(proposed_value).is_err() { + error!("Failed to send ReceivedProposalPart reply"); + } + } + AppMsg::GetValidatorSet { + height: _, + reply_to, + } => { + if reply_to.send(genesis.validator_set.clone()).is_err() { + error!("Failed to send GetValidatorSet reply"); + } + } + AppMsg::Decided { + certificate, + reply_to, + } => { + state.commit_block(certificate); + if reply_to + .send(ConsensusMsg::StartHeight( + state.current_height, + genesis.validator_set.clone(), + )) + .is_err() + { + error!("Failed to send Decided reply"); + } + } + AppMsg::GetDecidedBlock { height, reply_to } => { + let block = state.get_block(&height).map(|o| (*o).clone()); + if reply_to.send(block).is_err() { + error!("Failed to send GetDecidedBlock reply"); + } + } + AppMsg::ProcessSyncedValue { + height, + round, + validator_address, + value_bytes: _, + reply_to, + } => { + // Instead of bothering proto-decoding the value_bytes, we will just fake a value. + let value = state.create_fake_proposal_value(&height); + if reply_to + .send(ProposedValue { + height, + round, + valid_round: Round::Nil, + validator_address, + value, + validity: Validity::Valid, + extension: None, + }) + .is_err() + { + error!("Failed to send ProcessSyncedBlock reply"); + } + } + AppMsg::RestreamValue { .. } => { + unimplemented!("RestreamValue"); + } + }, + None => { + error!("Channel is closed.") + } + } + } + } +} diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs new file mode 100644 index 000000000..6e859a7f9 --- /dev/null +++ b/code/examples/channel/src/state.rs @@ -0,0 +1,162 @@ +//! Internal state of the application. This is a simplified abstract to keep it simple. +//! A regular application would have mempool implemented, a proper database and input methods like RPC. + +use bytes::Bytes; +use malachite_actors::host::LocallyProposedValue; +use malachite_actors::util::streaming::{StreamContent, StreamMessage}; +use malachite_app::types::sync::DecidedValue; +use malachite_common::{CommitCertificate, Round, Validity}; +use malachite_consensus::ProposedValue; +use malachite_test::{Address, BlockMetadata, Content, Height, ProposalPart, TestContext, Value}; +use std::collections::HashMap; + +pub struct State { + pub current_height: Height, + pub current_round: Round, + pub current_proposer: Option
, + earliest_height: Height, + address: Address, + sequence: u64, + undecided_proposals: HashMap>, + decided_proposals: HashMap>, + blocks: HashMap>, + current_proposal: Option>, +} + +impl State { + pub fn new(address: Address, height: Height) -> Self { + Self { + earliest_height: height, + current_height: height, + current_round: Round::new(0), + current_proposer: None, + address, + sequence: 0, + undecided_proposals: HashMap::new(), + decided_proposals: HashMap::new(), + blocks: HashMap::new(), + current_proposal: None, + } + } + + pub fn get_earliest_height(&self) -> Height { + self.earliest_height + } + + pub fn create_fake_proposal_value(&self, height: &Height) -> Value { + use sha3::Digest; + let mut hasher = sha3::Keccak256::new(); + hasher.update(height.as_u64().to_le_bytes()); + let hash = hasher.finalize().to_vec(); + let simplified_hash = 255 * 255 * 255 * hash[0] as u64 + + 255 * 255 * hash[1] as u64 + + 255 * hash[2] as u64 + + hash[3] as u64; + Value::new(simplified_hash) + } + + pub fn add_proposal( + &mut self, + stream_message: StreamMessage, + ) -> ProposedValue { + if let StreamContent::Data(proposal_part) = stream_message.content { + if proposal_part.height > self.current_height + || proposal_part.height == self.current_height + && proposal_part.round >= self.current_round + { + assert!(proposal_part.fin); // we only implemented 1 part === 1 proposal + let value = self.create_fake_proposal_value(&proposal_part.height); + let proposal = ProposedValue { + height: proposal_part.height, + round: proposal_part.round, + valid_round: Round::Nil, + validator_address: proposal_part.validator_address, + value, + validity: Validity::Valid, + extension: None, + }; + self.undecided_proposals + .insert(proposal_part.height, proposal.clone()); + return proposal; + } + } + panic!("Invalid proposal"); + } + + pub fn get_block(&self, height: &Height) -> Option<&DecidedValue> { + self.blocks.get(height) + } + + pub fn commit_block(&mut self, certificate: CommitCertificate) { + // Sort out proposals + for (height, value) in self.undecided_proposals.clone() { + if height > self.current_height { + continue; + } + if height == self.current_height { + self.decided_proposals.insert(height, value); + } + self.undecided_proposals.remove(&height); + } + + // Commit block transactions to "database" + // Todo: retrieve all transactions from block parts + let value_bytes = Bytes::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + self.blocks.insert( + self.current_height, + DecidedValue { + value_bytes, + certificate, + }, + ); + + // Move to next height + self.current_height = self.current_height.increment(); + self.current_round = Round::new(0); + } + + pub fn get_previously_built_value( + &self, + height: &Height, + ) -> Option<&ProposedValue> { + self.undecided_proposals.get(height) + } + + pub fn get_locally_proposed_value(&self, height: &Height) -> LocallyProposedValue { + if let Some(proposal) = self.get_previously_built_value(height) { + // We have an old value to send back. + LocallyProposedValue::new(proposal.height, proposal.round, proposal.value, None) + } else { + assert_eq!(height.as_u64(), self.current_height.as_u64()); + // We create a new value. + let value = Value::new(42); // TODO: get value + LocallyProposedValue::new(self.current_height, self.current_round, value, None) + } + } + + pub fn create_broadcast_message( + &mut self, + value: LocallyProposedValue, + ) -> StreamMessage { + // TODO: create proof properly. + let fake_proof = [ + self.current_height.as_u64().to_le_bytes().to_vec(), + self.current_round.as_u32().unwrap().to_le_bytes().to_vec(), + ] + .concat(); + let content = Content::new(&BlockMetadata::new(fake_proof, value.value)); + let proposal_part = ProposalPart::new( + self.current_height, + self.current_round, + self.sequence, + self.address, + content, + true, // each proposal part is a full proposal + ); + let stream_content = StreamContent::Data(proposal_part); + let msg = StreamMessage::new(self.sequence, self.sequence, stream_content); + self.sequence += 1; + self.current_proposal = Some(msg.clone()); + msg + } +} From d7ee2e42853660a0a8078ecc729c380067f33b92 Mon Sep 17 00:00:00 2001 From: Greg Szabo Date: Mon, 16 Dec 2024 10:40:26 -0500 Subject: [PATCH 02/19] use protobuf values instead of fakes --- code/Cargo.lock | 1 + code/examples/channel/Cargo.toml | 1 + code/examples/channel/src/node.rs | 6 +++--- code/examples/channel/src/state.rs | 34 ++++++++++++++++++------------ 4 files changed, 26 insertions(+), 16 deletions(-) diff --git a/code/Cargo.lock b/code/Cargo.lock index 345b8d096..1275ed7f5 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -2948,6 +2948,7 @@ dependencies = [ "malachite-common", "malachite-config", "malachite-consensus", + "malachite-proto", "malachite-test", "rand", "serde_json", diff --git a/code/examples/channel/Cargo.toml b/code/examples/channel/Cargo.toml index 689eaf17b..7aecc49fe 100644 --- a/code/examples/channel/Cargo.toml +++ b/code/examples/channel/Cargo.toml @@ -25,6 +25,7 @@ malachite-cli.workspace = true malachite-common.workspace = true malachite-config.workspace = true malachite-consensus.workspace = true +malachite-proto.workspace = true malachite-test.workspace = true [lints] diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index 31c96f3c2..a4186f6f3 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -17,6 +17,7 @@ use malachite_test::{ ValidatorSet, }; +use crate::state; use crate::state::State; use libp2p_identity::Keypair; @@ -209,11 +210,10 @@ impl Node for App { height, round, validator_address, - value_bytes: _, + value_bytes, reply_to, } => { - // Instead of bothering proto-decoding the value_bytes, we will just fake a value. - let value = state.create_fake_proposal_value(&height); + let value = state::value_from_vec(value_bytes.to_vec()); if reply_to .send(ProposedValue { height, diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index 6e859a7f9..c003ba037 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -7,9 +7,18 @@ use malachite_actors::util::streaming::{StreamContent, StreamMessage}; use malachite_app::types::sync::DecidedValue; use malachite_common::{CommitCertificate, Round, Validity}; use malachite_consensus::ProposedValue; +use malachite_proto::Protobuf; use malachite_test::{Address, BlockMetadata, Content, Height, ProposalPart, TestContext, Value}; use std::collections::HashMap; +// Todo: implement better values +pub fn value_from_vec(vec: Vec) -> Value { + assert!(vec.len() >= 8); + let mut bytes = [0; 8]; + bytes.copy_from_slice(&vec); + Value::new(u64::from_le_bytes(bytes)) +} + pub struct State { pub current_height: Height, pub current_round: Round, @@ -43,18 +52,6 @@ impl State { self.earliest_height } - pub fn create_fake_proposal_value(&self, height: &Height) -> Value { - use sha3::Digest; - let mut hasher = sha3::Keccak256::new(); - hasher.update(height.as_u64().to_le_bytes()); - let hash = hasher.finalize().to_vec(); - let simplified_hash = 255 * 255 * 255 * hash[0] as u64 - + 255 * 255 * hash[1] as u64 - + 255 * hash[2] as u64 - + hash[3] as u64; - Value::new(simplified_hash) - } - pub fn add_proposal( &mut self, stream_message: StreamMessage, @@ -65,7 +62,18 @@ impl State { && proposal_part.round >= self.current_round { assert!(proposal_part.fin); // we only implemented 1 part === 1 proposal - let value = self.create_fake_proposal_value(&proposal_part.height); + let value = value_from_vec( + proposal_part + .content + .to_proto() + .unwrap() + .metadata + .unwrap() + .value + .unwrap() + .value() + .to_vec(), + ); let proposal = ProposedValue { height: proposal_part.height, round: proposal_part.round, From a5c77e1068e5ce7a831b8305847d7db60abf11e8 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 16 Dec 2024 17:11:41 +0100 Subject: [PATCH 03/19] Cleanup --- code/examples/channel/src/node.rs | 228 +++++++++++++++-------------- code/examples/channel/src/state.rs | 4 + 2 files changed, 121 insertions(+), 111 deletions(-) diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index a4186f6f3..bbf55cbeb 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -4,6 +4,8 @@ use std::path::{Path, PathBuf}; use async_trait::async_trait; +use eyre::eyre; +use libp2p_identity::Keypair; use rand::{CryptoRng, RngCore}; use tracing::{debug, error}; @@ -19,7 +21,6 @@ use malachite_test::{ use crate::state; use crate::state::State; -use libp2p_identity::Keypair; #[derive(Clone)] pub struct App { @@ -117,126 +118,131 @@ impl Node for App { let mut state = State::new(address, start_height.unwrap_or_default()); - loop { - match channels.consensus.recv().await { - Some(msg) => match msg { - AppMsg::ConsensusReady { reply_to } => { - debug!("Consensus is ready to run"); - if reply_to - .send(ConsensusMsg::StartHeight( - state.current_height, - genesis.validator_set.clone(), - )) - .is_err() - { - error!("Failed to send ConsensusReady reply"); - } - } - AppMsg::StartedRound { - height, - round, - proposer, - } => { - state.current_height = height; - state.current_round = round; - state.current_proposer = Some(proposer); + while let Some(msg) = channels.consensus.recv().await { + match msg { + AppMsg::ConsensusReady { reply_to } => { + debug!("Consensus is ready to run"); + if reply_to + .send(ConsensusMsg::StartHeight( + state.current_height, + genesis.validator_set.clone(), + )) + .is_err() + { + error!("Failed to send ConsensusReady reply"); } - AppMsg::GetValue { - height, - round: _, - timeout_duration: _, - address: _, - reply_to, - } => { - let value = state.get_locally_proposed_value(&height); - // Send it to consensus - if reply_to.send(value.clone()).is_err() { - error!("Failed to send GetValue reply"); - } - - let stream_message = state.create_broadcast_message(value); - // Broadcast it to others. Old messages need not be broadcast. - channels - .consensus_gossip - .send(ConsensusGossipMsg::PublishProposalPart(stream_message)) - .await?; - } - AppMsg::GetEarliestBlockHeight { reply_to } => { - error!("GetEarliestBlockHeight"); - if reply_to.send(state.get_earliest_height()).is_err() { - error!("Failed to send GetEarliestBlockHeight reply"); - } + } + + AppMsg::StartedRound { + height, + round, + proposer, + } => { + state.current_height = height; + state.current_round = round; + state.current_proposer = Some(proposer); + } + + AppMsg::GetValue { + height, + round: _, + timeout_duration: _, + address: _, + reply_to, + } => { + let value = state.get_locally_proposed_value(&height); + // Send it to consensus + if reply_to.send(value.clone()).is_err() { + error!("Failed to send GetValue reply"); } - AppMsg::ReceivedProposalPart { - from: _, - part, - reply_to, - } => { - let proposed_value = state.add_proposal(part); - if reply_to.send(proposed_value).is_err() { - error!("Failed to send ReceivedProposalPart reply"); - } + + let stream_message = state.create_broadcast_message(value); + // Broadcast it to others. Old messages need not be broadcast. + channels + .consensus_gossip + .send(ConsensusGossipMsg::PublishProposalPart(stream_message)) + .await?; + } + + AppMsg::GetEarliestBlockHeight { reply_to } => { + if reply_to.send(state.get_earliest_height()).is_err() { + error!("Failed to send GetEarliestBlockHeight reply"); } - AppMsg::GetValidatorSet { - height: _, - reply_to, - } => { - if reply_to.send(genesis.validator_set.clone()).is_err() { - error!("Failed to send GetValidatorSet reply"); - } + } + + AppMsg::ReceivedProposalPart { + from: _, + part, + reply_to, + } => { + let proposed_value = state.add_proposal(part); + if reply_to.send(proposed_value).is_err() { + error!("Failed to send ReceivedProposalPart reply"); } - AppMsg::Decided { - certificate, - reply_to, - } => { - state.commit_block(certificate); - if reply_to - .send(ConsensusMsg::StartHeight( - state.current_height, - genesis.validator_set.clone(), - )) - .is_err() - { - error!("Failed to send Decided reply"); - } + } + + AppMsg::GetValidatorSet { + height: _, + reply_to, + } => { + if reply_to.send(genesis.validator_set.clone()).is_err() { + error!("Failed to send GetValidatorSet reply"); } - AppMsg::GetDecidedBlock { height, reply_to } => { - let block = state.get_block(&height).map(|o| (*o).clone()); - if reply_to.send(block).is_err() { - error!("Failed to send GetDecidedBlock reply"); - } + } + + AppMsg::Decided { + certificate, + reply_to, + } => { + state.commit_block(certificate); + if reply_to + .send(ConsensusMsg::StartHeight( + state.current_height, + genesis.validator_set.clone(), + )) + .is_err() + { + error!("Failed to send Decided reply"); } - AppMsg::ProcessSyncedValue { - height, - round, - validator_address, - value_bytes, - reply_to, - } => { - let value = state::value_from_vec(value_bytes.to_vec()); - if reply_to - .send(ProposedValue { - height, - round, - valid_round: Round::Nil, - validator_address, - value, - validity: Validity::Valid, - extension: None, - }) - .is_err() - { - error!("Failed to send ProcessSyncedBlock reply"); - } + } + + AppMsg::GetDecidedBlock { height, reply_to } => { + let block = state.get_block(&height).map(|o| (*o).clone()); + if reply_to.send(block).is_err() { + error!("Failed to send GetDecidedBlock reply"); } - AppMsg::RestreamValue { .. } => { - unimplemented!("RestreamValue"); + } + + AppMsg::ProcessSyncedValue { + height, + round, + validator_address, + value_bytes, + reply_to, + } => { + let value = state::value_from_vec(value_bytes.to_vec()); + if reply_to + .send(ProposedValue { + height, + round, + valid_round: Round::Nil, + validator_address, + value, + validity: Validity::Valid, + extension: None, + }) + .is_err() + { + error!("Failed to send ProcessSyncedBlock reply"); } - }, - None => { - error!("Channel is closed.") + } + + AppMsg::RestreamValue { .. } => { + unimplemented!("RestreamValue"); } } } + + Err(eyre!("Consensus channel closed unexpectedly")) } } diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index c003ba037..e64ae25aa 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -62,6 +62,7 @@ impl State { && proposal_part.round >= self.current_round { assert!(proposal_part.fin); // we only implemented 1 part === 1 proposal + let value = value_from_vec( proposal_part .content @@ -152,7 +153,9 @@ impl State { self.current_round.as_u32().unwrap().to_le_bytes().to_vec(), ] .concat(); + let content = Content::new(&BlockMetadata::new(fake_proof, value.value)); + let proposal_part = ProposalPart::new( self.current_height, self.current_round, @@ -161,6 +164,7 @@ impl State { content, true, // each proposal part is a full proposal ); + let stream_content = StreamContent::Data(proposal_part); let msg = StreamMessage::new(self.sequence, self.sequence, stream_content); self.sequence += 1; From 2c131b378bbce64d82c1780538d40397834b686a Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 16 Dec 2024 17:43:29 +0100 Subject: [PATCH 04/19] Add `Codec` for `Value` --- code/Cargo.lock | 1 - code/crates/test/src/codec/testcodec.rs | 21 +++++++++++++++++---- code/crates/test/src/proposal_part.rs | 2 +- code/examples/channel/Cargo.toml | 1 - 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/code/Cargo.lock b/code/Cargo.lock index 1275ed7f5..345b8d096 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -2948,7 +2948,6 @@ dependencies = [ "malachite-common", "malachite-config", "malachite-consensus", - "malachite-proto", "malachite-test", "rand", "serde_json", diff --git a/code/crates/test/src/codec/testcodec.rs b/code/crates/test/src/codec/testcodec.rs index 4e5c577fa..b82a412df 100644 --- a/code/crates/test/src/codec/testcodec.rs +++ b/code/crates/test/src/codec/testcodec.rs @@ -1,17 +1,30 @@ use bytes::Bytes; use malachite_codec::Codec; -use crate::codec::types::{ - RawRequest, RawResponse, RawSignedConsensusMsg, RawStatus, RawStreamMessage, -}; -use crate::{ProposalPart, TestContext}; use malachite_actors::util::streaming::StreamMessage; use malachite_consensus::SignedConsensusMsg; use malachite_sync::{Request, Response, Status}; +use crate::codec::types::{ + RawRequest, RawResponse, RawSignedConsensusMsg, RawStatus, RawStreamMessage, +}; +use crate::{ProposalPart, TestContext, Value}; + #[derive(Clone)] pub struct TestCodec; +impl Codec for TestCodec { + type Error = serde_json::Error; + + fn decode(&self, bytes: Bytes) -> Result { + serde_json::from_slice(&bytes) + } + + fn encode(&self, msg: &Value) -> Result { + serde_json::to_vec(&msg).map(Bytes::from) + } +} + impl Codec for TestCodec { type Error = serde_json::Error; diff --git a/code/crates/test/src/proposal_part.rs b/code/crates/test/src/proposal_part.rs index 0a9419f49..76a288003 100644 --- a/code/crates/test/src/proposal_part.rs +++ b/code/crates/test/src/proposal_part.rs @@ -57,7 +57,7 @@ impl Protobuf for BlockMetadata { #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Content { - metadata: BlockMetadata, + pub metadata: BlockMetadata, } impl Content { diff --git a/code/examples/channel/Cargo.toml b/code/examples/channel/Cargo.toml index 7aecc49fe..689eaf17b 100644 --- a/code/examples/channel/Cargo.toml +++ b/code/examples/channel/Cargo.toml @@ -25,7 +25,6 @@ malachite-cli.workspace = true malachite-common.workspace = true malachite-config.workspace = true malachite-consensus.workspace = true -malachite-proto.workspace = true malachite-test.workspace = true [lints] From dd7c8d0c3d22dc412394ae606ce2393a70ea6dab Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 16 Dec 2024 17:45:43 +0100 Subject: [PATCH 05/19] Ensure values always match and store proposed value in state --- code/examples/channel/src/node.rs | 27 +++++-- code/examples/channel/src/state.rs | 121 +++++++++++++++++------------ 2 files changed, 90 insertions(+), 58 deletions(-) diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index bbf55cbeb..a75a05dfd 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -9,6 +9,7 @@ use libp2p_identity::Keypair; use rand::{CryptoRng, RngCore}; use tracing::{debug, error}; +use malachite_app::types::LocallyProposedValue; use malachite_app::Node; use malachite_app_channel::{run, AppMsg, ConsensusGossipMsg, ConsensusMsg}; use malachite_common::{Round, Validity, VotingPower}; @@ -19,8 +20,7 @@ use malachite_test::{ ValidatorSet, }; -use crate::state; -use crate::state::State; +use crate::state::{decode_value, State}; #[derive(Clone)] pub struct App { @@ -150,13 +150,22 @@ impl Node for App { address: _, reply_to, } => { - let value = state.get_locally_proposed_value(&height); + let proposal = state.propose_value(&height); + + let value = LocallyProposedValue::new( + proposal.height, + proposal.round, + proposal.value, + proposal.extension, + ); + // Send it to consensus if reply_to.send(value.clone()).is_err() { error!("Failed to send GetValue reply"); } let stream_message = state.create_broadcast_message(value); + // Broadcast it to others. Old messages need not be broadcast. channels .consensus_gossip @@ -175,9 +184,10 @@ impl Node for App { part, reply_to, } => { - let proposed_value = state.add_proposal(part); - if reply_to.send(proposed_value).is_err() { - error!("Failed to send ReceivedProposalPart reply"); + if let Some(proposed_value) = state.add_proposal(part) { + if reply_to.send(proposed_value).is_err() { + error!("Failed to send ReceivedProposalPart reply"); + } } } @@ -207,7 +217,7 @@ impl Node for App { } AppMsg::GetDecidedBlock { height, reply_to } => { - let block = state.get_block(&height).map(|o| (*o).clone()); + let block = state.get_block(&height).cloned(); if reply_to.send(block).is_err() { error!("Failed to send GetDecidedBlock reply"); } @@ -220,7 +230,8 @@ impl Node for App { value_bytes, reply_to, } => { - let value = state::value_from_vec(value_bytes.to_vec()); + let value = decode_value(value_bytes); + if reply_to .send(ProposedValue { height, diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index e64ae25aa..b70001806 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -1,22 +1,27 @@ //! Internal state of the application. This is a simplified abstract to keep it simple. //! A regular application would have mempool implemented, a proper database and input methods like RPC. +use std::collections::HashMap; + use bytes::Bytes; +use tracing::error; + use malachite_actors::host::LocallyProposedValue; use malachite_actors::util::streaming::{StreamContent, StreamMessage}; +use malachite_app::types::codec::Codec; use malachite_app::types::sync::DecidedValue; use malachite_common::{CommitCertificate, Round, Validity}; use malachite_consensus::ProposedValue; -use malachite_proto::Protobuf; -use malachite_test::{Address, BlockMetadata, Content, Height, ProposalPart, TestContext, Value}; -use std::collections::HashMap; +use malachite_test::{ + Address, BlockMetadata, Content, Height, ProposalPart, TestCodec, TestContext, Value, +}; -// Todo: implement better values -pub fn value_from_vec(vec: Vec) -> Value { - assert!(vec.len() >= 8); - let mut bytes = [0; 8]; - bytes.copy_from_slice(&vec); - Value::new(u64::from_le_bytes(bytes)) +pub fn decode_value(bytes: Bytes) -> Value { + TestCodec.decode(bytes).unwrap() +} + +pub fn encode_value(value: &Value) -> Bytes { + TestCodec.encode(value).unwrap() } pub struct State { @@ -55,41 +60,37 @@ impl State { pub fn add_proposal( &mut self, stream_message: StreamMessage, - ) -> ProposedValue { - if let StreamContent::Data(proposal_part) = stream_message.content { - if proposal_part.height > self.current_height - || proposal_part.height == self.current_height - && proposal_part.round >= self.current_round - { - assert!(proposal_part.fin); // we only implemented 1 part === 1 proposal - - let value = value_from_vec( - proposal_part - .content - .to_proto() - .unwrap() - .metadata - .unwrap() - .value - .unwrap() - .value() - .to_vec(), - ); - let proposal = ProposedValue { - height: proposal_part.height, - round: proposal_part.round, - valid_round: Round::Nil, - validator_address: proposal_part.validator_address, - value, - validity: Validity::Valid, - extension: None, - }; - self.undecided_proposals - .insert(proposal_part.height, proposal.clone()); - return proposal; - } + ) -> Option> { + let StreamContent::Data(proposal_part) = stream_message.content else { + error!("Invalid proposal: {:?}", stream_message.content); + return None; + }; + + if proposal_part.height > self.current_height + || proposal_part.height == self.current_height + && proposal_part.round >= self.current_round + { + assert!(proposal_part.fin); // we only implemented 1 part === 1 proposal + + let value = proposal_part.content.metadata.value(); + + let proposal = ProposedValue { + height: proposal_part.height, + round: proposal_part.round, + valid_round: Round::Nil, + validator_address: proposal_part.validator_address, + value, + validity: Validity::Valid, + extension: None, + }; + + self.undecided_proposals + .insert(proposal_part.height, proposal.clone()); + + Some(proposal) + } else { + None } - panic!("Invalid proposal"); } pub fn get_block(&self, height: &Height) -> Option<&DecidedValue> { @@ -102,15 +103,19 @@ impl State { if height > self.current_height { continue; } - if height == self.current_height { + + if height == certificate.height { self.decided_proposals.insert(height, value); } + self.undecided_proposals.remove(&height); } // Commit block transactions to "database" - // Todo: retrieve all transactions from block parts - let value_bytes = Bytes::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + // TODO: retrieve all transactions from block parts + let value = self.decided_proposals.get(&certificate.height).unwrap(); + let value_bytes = encode_value(&value.value); + self.blocks.insert( self.current_height, DecidedValue { @@ -131,15 +136,29 @@ impl State { self.undecided_proposals.get(height) } - pub fn get_locally_proposed_value(&self, height: &Height) -> LocallyProposedValue { + pub fn propose_value(&mut self, height: &Height) -> ProposedValue { if let Some(proposal) = self.get_previously_built_value(height) { - // We have an old value to send back. - LocallyProposedValue::new(proposal.height, proposal.round, proposal.value, None) + proposal.clone() } else { assert_eq!(height.as_u64(), self.current_height.as_u64()); + // We create a new value. let value = Value::new(42); // TODO: get value - LocallyProposedValue::new(self.current_height, self.current_round, value, None) + + let proposal = ProposedValue { + height: *height, + round: self.current_round, + valid_round: Round::Nil, + validator_address: self.address, + value, + validity: Validity::Valid, + extension: None, + }; + + // Insert the new proposal into the undecided proposals. + self.undecided_proposals.insert(*height, proposal.clone()); + + proposal } } @@ -167,8 +186,10 @@ impl State { let stream_content = StreamContent::Data(proposal_part); let msg = StreamMessage::new(self.sequence, self.sequence, stream_content); + self.sequence += 1; self.current_proposal = Some(msg.clone()); + msg } } From e69164164ac1168ca0256d2bee35a1d232e4563a Mon Sep 17 00:00:00 2001 From: Greg Szabo Date: Mon, 16 Dec 2024 13:31:10 -0500 Subject: [PATCH 06/19] consolidating dependencies --- code/Cargo.lock | 6 ------ code/crates/app/src/lib.rs | 16 ++++++++++++++++ code/crates/cli/src/lib.rs | 4 ++++ code/examples/channel/Cargo.toml | 6 ------ code/examples/channel/src/main.rs | 2 +- code/examples/channel/src/node.rs | 10 +++++----- code/examples/channel/src/state.rs | 12 ++++++------ 7 files changed, 32 insertions(+), 24 deletions(-) diff --git a/code/Cargo.lock b/code/Cargo.lock index 345b8d096..779a2dc0e 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -2941,17 +2941,11 @@ dependencies = [ "color-eyre", "eyre", "libp2p-identity", - "malachite-actors", - "malachite-app", "malachite-app-channel", "malachite-cli", - "malachite-common", - "malachite-config", - "malachite-consensus", "malachite-test", "rand", "serde_json", - "sha3", "tracing", ] diff --git a/code/crates/app/src/lib.rs b/code/crates/app/src/lib.rs index efa7a476c..618f51e56 100644 --- a/code/crates/app/src/lib.rs +++ b/code/crates/app/src/lib.rs @@ -17,3 +17,19 @@ mod spawn; pub use spawn::{ spawn_consensus_actor, spawn_gossip_consensus_actor, spawn_sync_actor, spawn_wal_actor, }; + +pub mod streaming { + pub use malachite_actors::util::streaming::*; +} + +pub mod host { + pub use malachite_actors::host::LocallyProposedValue; +} + +pub mod consensus { + pub use malachite_consensus::*; +} + +pub mod common { + pub use malachite_common::*; +} diff --git a/code/crates/cli/src/lib.rs b/code/crates/cli/src/lib.rs index 915c68328..bb0f200b9 100644 --- a/code/crates/cli/src/lib.rs +++ b/code/crates/cli/src/lib.rs @@ -6,3 +6,7 @@ pub mod logging; pub mod metrics; pub mod new; pub mod runtime; + +pub mod config { + pub use malachite_config::*; +} diff --git a/code/examples/channel/Cargo.toml b/code/examples/channel/Cargo.toml index 689eaf17b..e9dad8894 100644 --- a/code/examples/channel/Cargo.toml +++ b/code/examples/channel/Cargo.toml @@ -15,16 +15,10 @@ eyre.workspace = true libp2p-identity.workspace = true rand.workspace = true serde_json.workspace = true -sha3.workspace = true tracing.workspace = true -malachite-actors.workspace = true -malachite-app.workspace = true malachite-app-channel.workspace = true malachite-cli.workspace = true -malachite-common.workspace = true -malachite-config.workspace = true -malachite-consensus.workspace = true malachite-test.workspace = true [lints] diff --git a/code/examples/channel/src/main.rs b/code/examples/channel/src/main.rs index 1f450fcb2..8325410c5 100644 --- a/code/examples/channel/src/main.rs +++ b/code/examples/channel/src/main.rs @@ -5,8 +5,8 @@ mod state; use eyre::eyre; use malachite_cli::args::{Args, Commands}; +use malachite_cli::config::load_config; use malachite_cli::{logging, runtime}; -use malachite_config::load_config; use node::App; use tracing::{error, info, trace}; diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index a75a05dfd..9bb689d2b 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -9,12 +9,12 @@ use libp2p_identity::Keypair; use rand::{CryptoRng, RngCore}; use tracing::{debug, error}; -use malachite_app::types::LocallyProposedValue; -use malachite_app::Node; +use malachite_app_channel::app::common::{Round, Validity, VotingPower}; +use malachite_app_channel::app::consensus::ProposedValue; +use malachite_app_channel::app::types::LocallyProposedValue; +use malachite_app_channel::app::Node; use malachite_app_channel::{run, AppMsg, ConsensusGossipMsg, ConsensusMsg}; -use malachite_common::{Round, Validity, VotingPower}; -use malachite_config::Config; -use malachite_consensus::ProposedValue; +use malachite_cli::config::Config; use malachite_test::{ Address, Genesis, Height, PrivateKey, PublicKey, TestCodec, TestContext, Validator, ValidatorSet, diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index b70001806..53e6ee0f7 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -6,12 +6,12 @@ use std::collections::HashMap; use bytes::Bytes; use tracing::error; -use malachite_actors::host::LocallyProposedValue; -use malachite_actors::util::streaming::{StreamContent, StreamMessage}; -use malachite_app::types::codec::Codec; -use malachite_app::types::sync::DecidedValue; -use malachite_common::{CommitCertificate, Round, Validity}; -use malachite_consensus::ProposedValue; +use malachite_app_channel::app::common::{CommitCertificate, Round, Validity}; +use malachite_app_channel::app::consensus::ProposedValue; +use malachite_app_channel::app::host::LocallyProposedValue; +use malachite_app_channel::app::streaming::{StreamContent, StreamMessage}; +use malachite_app_channel::app::types::codec::Codec; +use malachite_app_channel::app::types::sync::DecidedValue; use malachite_test::{ Address, BlockMetadata, Content, Height, ProposalPart, TestCodec, TestContext, Value, }; From b04bc864b419c88694dde4e2aa4ce5fa78508182 Mon Sep 17 00:00:00 2001 From: Greg Szabo Date: Mon, 16 Dec 2024 13:47:49 -0500 Subject: [PATCH 07/19] colsolidating more dependencies --- code/Cargo.lock | 2 -- code/crates/app/src/lib.rs | 4 ++++ code/crates/cli/Cargo.toml | 4 +--- code/crates/cli/src/metrics.rs | 3 ++- code/crates/cli/src/new.rs | 2 +- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/code/Cargo.lock b/code/Cargo.lock index 779a2dc0e..6e367823d 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -2822,9 +2822,7 @@ dependencies = [ "directories", "itertools 0.13.0", "malachite-app", - "malachite-common", "malachite-config", - "malachite-metrics", "rand", "serde_json", "thiserror 2.0.6", diff --git a/code/crates/app/src/lib.rs b/code/crates/app/src/lib.rs index 618f51e56..e198f274d 100644 --- a/code/crates/app/src/lib.rs +++ b/code/crates/app/src/lib.rs @@ -33,3 +33,7 @@ pub mod consensus { pub mod common { pub use malachite_common::*; } + +pub mod metrics { + pub use malachite_metrics::*; +} diff --git a/code/crates/cli/Cargo.toml b/code/crates/cli/Cargo.toml index ef54e4060..4b6a4e18d 100644 --- a/code/crates/cli/Cargo.toml +++ b/code/crates/cli/Cargo.toml @@ -10,9 +10,7 @@ publish.workspace = true workspace = true [dependencies] -malachite-common.workspace = true -malachite-metrics.workspace = true -malachite-config.workspace = true +malachite-config.workspace = true malachite-app.workspace = true axum = { workspace = true } diff --git a/code/crates/cli/src/metrics.rs b/code/crates/cli/src/metrics.rs index 89e1cfad6..f03297974 100644 --- a/code/crates/cli/src/metrics.rs +++ b/code/crates/cli/src/metrics.rs @@ -3,6 +3,7 @@ use axum::Router; use tokio::net::TcpListener; use tracing::info; +use malachite_app::metrics::export; use malachite_config::MetricsConfig; #[tracing::instrument(name = "metrics", skip_all)] @@ -16,6 +17,6 @@ pub async fn serve(config: MetricsConfig) { async fn get_metrics() -> String { let mut buf = String::new(); - malachite_metrics::export(&mut buf); + export(&mut buf); buf } diff --git a/code/crates/cli/src/new.rs b/code/crates/cli/src/new.rs index ed76951e6..9a8c0e8a9 100644 --- a/code/crates/cli/src/new.rs +++ b/code/crates/cli/src/new.rs @@ -6,8 +6,8 @@ use rand::prelude::StdRng; use rand::rngs::OsRng; use rand::{seq::IteratorRandom, Rng, SeedableRng}; +use malachite_app::common::{PrivateKey, PublicKey}; use malachite_app::Node; -use malachite_common::{PrivateKey, PublicKey}; use malachite_config::*; const MIN_VOTING_POWER: u64 = 1; From 4b8d054771527123b5a8dd7db81c99c83d0cb465 Mon Sep 17 00:00:00 2001 From: Greg Szabo Date: Mon, 16 Dec 2024 16:34:01 -0500 Subject: [PATCH 08/19] fmt --- code/crates/app-channel/src/run.rs | 6 ++---- code/crates/app-channel/src/spawn.rs | 14 +++++--------- code/examples/channel/src/node.rs | 4 ++-- code/examples/channel/src/state.rs | 2 +- 4 files changed, 10 insertions(+), 16 deletions(-) diff --git a/code/crates/app-channel/src/run.rs b/code/crates/app-channel/src/run.rs index 525eb564a..9c7432c0f 100644 --- a/code/crates/app-channel/src/run.rs +++ b/code/crates/app-channel/src/run.rs @@ -7,12 +7,10 @@ use crate::app::types::codec::{ConsensusCodec, SyncCodec, WalCodec}; use crate::app::types::config::Config as NodeConfig; use crate::app::types::core::Context; use crate::app::types::metrics::{Metrics, SharedRegistry}; -use crate::spawn::{spawn_network_actor, spawn_host_actor}; +use crate::spawn::{spawn_host_actor, spawn_network_actor}; use crate::{app, Channels}; -use malachite_app::{ - spawn_consensus_actor, spawn_sync_actor, spawn_wal_actor, -}; +use malachite_app::{spawn_consensus_actor, spawn_sync_actor, spawn_wal_actor}; use malachite_engine::util::events::TxEvent; #[tracing::instrument("node", skip_all, fields(moniker = %cfg.moniker))] diff --git a/code/crates/app-channel/src/spawn.rs b/code/crates/app-channel/src/spawn.rs index 6ee68556a..98661aaff 100644 --- a/code/crates/app-channel/src/spawn.rs +++ b/code/crates/app-channel/src/spawn.rs @@ -8,12 +8,12 @@ use crate::app::types::metrics::Metrics; use crate::connector::Connector; use crate::{AppMsg, ConsensusGossipMsg}; +use malachite_app::types::{metrics::SharedRegistry, Keypair}; +use malachite_config::Config as NodeConfig; use malachite_engine::consensus::ConsensusCodec; -use malachite_engine::network::{NetworkMsg, NetworkRef}; use malachite_engine::host::HostRef; +use malachite_engine::network::{NetworkMsg, NetworkRef}; use malachite_engine::sync::SyncCodec; -use malachite_app::types::{metrics::SharedRegistry, Keypair}; -use malachite_config::Config as NodeConfig; pub async fn spawn_host_actor( metrics: Metrics, @@ -31,10 +31,7 @@ pub async fn spawn_network_actor( keypair: Keypair, registry: &SharedRegistry, codec: Codec, -) -> Result<( - NetworkRef, - mpsc::Sender>, -)> +) -> Result<(NetworkRef, mpsc::Sender>)> where Ctx: Context, Codec: ConsensusCodec, @@ -42,8 +39,7 @@ where { let (tx, mut rx) = mpsc::channel(1); - let actor_ref = - malachite_app::spawn_network_actor(cfg, keypair, registry, codec).await?; + let actor_ref = malachite_app::spawn_network_actor(cfg, keypair, registry, codec).await?; let actor_ref_return = actor_ref.clone(); tokio::spawn(async move { diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index 7cb7797c9..14c187311 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -9,16 +9,16 @@ use libp2p_identity::Keypair; use rand::{CryptoRng, RngCore}; use tracing::{debug, error}; -use malachite_app_channel::app::types::core::{Round, Validity, VotingPower}; use malachite_app_channel::app::consensus::ProposedValue; +use malachite_app_channel::app::types::core::{Round, Validity, VotingPower}; use malachite_app_channel::app::types::LocallyProposedValue; use malachite_app_channel::app::Node; use malachite_app_channel::{run, AppMsg, ConsensusGossipMsg, ConsensusMsg}; -use malachite_test_cli::config::Config; use malachite_test::{ Address, Genesis, Height, PrivateKey, PublicKey, TestCodec, TestContext, Validator, ValidatorSet, }; +use malachite_test_cli::config::Config; use crate::state::{decode_value, State}; diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index be46b488c..15ec8ba5a 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -6,11 +6,11 @@ use std::collections::HashMap; use bytes::Bytes; use tracing::error; -use malachite_app_channel::app::types::core::{CommitCertificate, Round, Validity}; use malachite_app_channel::app::consensus::ProposedValue; use malachite_app_channel::app::host::LocallyProposedValue; use malachite_app_channel::app::streaming::{StreamContent, StreamMessage}; use malachite_app_channel::app::types::codec::Codec; +use malachite_app_channel::app::types::core::{CommitCertificate, Round, Validity}; use malachite_app_channel::app::types::sync::DecidedValue; use malachite_test::{ Address, BlockMetadata, Content, Height, ProposalPart, TestCodec, TestContext, Value, From 7c46c7e127b47a26f2660ee4ecb3b92ed6d03b88 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 17 Dec 2024 08:35:33 +0100 Subject: [PATCH 09/19] Move JSON codec into `codec::json` module --- .../src/codec/{testcodec.rs => json/mod.rs} | 22 +++++++++---------- .../test/src/codec/{types.rs => json/raw.rs} | 0 code/crates/test/src/codec/mod.rs | 3 +-- code/crates/test/src/lib.rs | 4 +--- code/examples/channel/src/node.rs | 6 ++--- code/examples/channel/src/state.rs | 9 ++++---- 6 files changed, 20 insertions(+), 24 deletions(-) rename code/crates/test/src/codec/{testcodec.rs => json/mod.rs} (85%) rename code/crates/test/src/codec/{types.rs => json/raw.rs} (100%) diff --git a/code/crates/test/src/codec/testcodec.rs b/code/crates/test/src/codec/json/mod.rs similarity index 85% rename from code/crates/test/src/codec/testcodec.rs rename to code/crates/test/src/codec/json/mod.rs index 6b25542b1..eb19934e4 100644 --- a/code/crates/test/src/codec/testcodec.rs +++ b/code/crates/test/src/codec/json/mod.rs @@ -5,15 +5,15 @@ use malachite_consensus::SignedConsensusMsg; use malachite_engine::util::streaming::StreamMessage; use malachite_sync::{Request, Response, Status}; -use crate::codec::types::{ - RawRequest, RawResponse, RawSignedConsensusMsg, RawStatus, RawStreamMessage, -}; +mod raw; +use raw::{RawRequest, RawResponse, RawSignedConsensusMsg, RawStatus, RawStreamMessage}; + use crate::{ProposalPart, TestContext, Value}; #[derive(Clone)] -pub struct TestCodec; +pub struct JsonCodec; -impl Codec for TestCodec { +impl Codec for JsonCodec { type Error = serde_json::Error; fn decode(&self, bytes: Bytes) -> Result { @@ -25,7 +25,7 @@ impl Codec for TestCodec { } } -impl Codec for TestCodec { +impl Codec for JsonCodec { type Error = serde_json::Error; fn decode(&self, bytes: Bytes) -> Result { @@ -37,7 +37,7 @@ impl Codec for TestCodec { } } -impl Codec> for TestCodec { +impl Codec> for JsonCodec { type Error = serde_json::Error; fn decode(&self, bytes: Bytes) -> Result, Self::Error> { @@ -49,7 +49,7 @@ impl Codec> for TestCodec { } } -impl Codec> for TestCodec { +impl Codec> for JsonCodec { type Error = serde_json::Error; fn decode(&self, bytes: Bytes) -> Result, Self::Error> { @@ -61,7 +61,7 @@ impl Codec> for TestCodec { } } -impl Codec> for TestCodec { +impl Codec> for JsonCodec { type Error = serde_json::Error; fn decode(&self, bytes: Bytes) -> Result, Self::Error> { @@ -73,7 +73,7 @@ impl Codec> for TestCodec { } } -impl Codec> for TestCodec { +impl Codec> for JsonCodec { type Error = serde_json::Error; fn decode(&self, bytes: Bytes) -> Result, Self::Error> { @@ -85,7 +85,7 @@ impl Codec> for TestCodec { } } -impl Codec> for TestCodec { +impl Codec> for JsonCodec { type Error = serde_json::Error; fn decode(&self, bytes: Bytes) -> Result, Self::Error> { diff --git a/code/crates/test/src/codec/types.rs b/code/crates/test/src/codec/json/raw.rs similarity index 100% rename from code/crates/test/src/codec/types.rs rename to code/crates/test/src/codec/json/raw.rs diff --git a/code/crates/test/src/codec/mod.rs b/code/crates/test/src/codec/mod.rs index 61fafb4f6..22fdbb38c 100644 --- a/code/crates/test/src/codec/mod.rs +++ b/code/crates/test/src/codec/mod.rs @@ -1,2 +1 @@ -pub mod testcodec; -pub(crate) mod types; +pub mod json; diff --git a/code/crates/test/src/lib.rs b/code/crates/test/src/lib.rs index b6a6e595c..6c6e34524 100644 --- a/code/crates/test/src/lib.rs +++ b/code/crates/test/src/lib.rs @@ -5,10 +5,8 @@ #![cfg_attr(coverage_nightly, feature(coverage_attribute))] mod address; -mod codec; mod context; mod genesis; - mod height; mod node; mod proposal; @@ -18,12 +16,12 @@ mod validator_set; mod value; mod vote; +pub mod codec; pub mod proposer_selector; pub mod proto; pub mod utils; pub use crate::address::*; -pub use crate::codec::testcodec::*; pub use crate::context::*; pub use crate::genesis::*; pub use crate::height::*; diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index 14c187311..aaddbb74b 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -14,9 +14,9 @@ use malachite_app_channel::app::types::core::{Round, Validity, VotingPower}; use malachite_app_channel::app::types::LocallyProposedValue; use malachite_app_channel::app::Node; use malachite_app_channel::{run, AppMsg, ConsensusGossipMsg, ConsensusMsg}; +use malachite_test::codec::json::JsonCodec; use malachite_test::{ - Address, Genesis, Height, PrivateKey, PublicKey, TestCodec, TestContext, Validator, - ValidatorSet, + Address, Genesis, Height, PrivateKey, PublicKey, TestContext, Validator, ValidatorSet, }; use malachite_test_cli::config::Config; @@ -104,7 +104,7 @@ impl Node for App { let start_height = self.start_height.map(Height::new); - let codec = TestCodec; + let codec = JsonCodec; let mut channels = run( self.config.clone(), diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index 15ec8ba5a..eccfd0b9f 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -12,16 +12,15 @@ use malachite_app_channel::app::streaming::{StreamContent, StreamMessage}; use malachite_app_channel::app::types::codec::Codec; use malachite_app_channel::app::types::core::{CommitCertificate, Round, Validity}; use malachite_app_channel::app::types::sync::DecidedValue; -use malachite_test::{ - Address, BlockMetadata, Content, Height, ProposalPart, TestCodec, TestContext, Value, -}; +use malachite_test::codec::json::JsonCodec; +use malachite_test::{Address, BlockMetadata, Content, Height, ProposalPart, TestContext, Value}; pub fn decode_value(bytes: Bytes) -> Value { - TestCodec.decode(bytes).unwrap() + JsonCodec.decode(bytes).unwrap() } pub fn encode_value(value: &Value) -> Bytes { - TestCodec.encode(value).unwrap() + JsonCodec.encode(value).unwrap() } pub struct State { From 4d95d875076c774e2a995af90c92f077c87e7e63 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 17 Dec 2024 10:04:45 +0100 Subject: [PATCH 10/19] Use Protobuf codec instead of JSON --- code/crates/sync/src/types.rs | 9 + code/crates/test/build.rs | 10 +- .../proto/{test.proto => consensus.proto} | 42 +- code/crates/test/proto/sync.proto | 87 ++++ code/crates/test/src/address.rs | 2 +- code/crates/test/src/codec/json/mod.rs | 2 +- code/crates/test/src/codec/mod.rs | 1 + code/crates/test/src/codec/proto/mod.rs | 464 ++++++++++++++++++ code/crates/test/src/height.rs | 8 +- code/crates/test/src/proposal.rs | 8 +- code/crates/test/src/proposal_part.rs | 29 +- code/crates/test/src/value.rs | 8 +- code/crates/test/src/vote.rs | 8 +- code/examples/channel/src/node.rs | 4 +- code/examples/channel/src/state.rs | 8 +- 15 files changed, 632 insertions(+), 58 deletions(-) rename code/crates/test/proto/{test.proto => consensus.proto} (57%) create mode 100644 code/crates/test/proto/sync.proto create mode 100644 code/crates/test/src/codec/proto/mod.rs diff --git a/code/crates/sync/src/types.rs b/code/crates/sync/src/types.rs index 1a99f18c8..95fc5508f 100644 --- a/code/crates/sync/src/types.rs +++ b/code/crates/sync/src/types.rs @@ -79,6 +79,15 @@ pub struct DecidedValue { pub certificate: CommitCertificate, } +impl DecidedValue { + pub fn new(value_bytes: Bytes, certificate: CommitCertificate) -> Self { + Self { + value_bytes, + certificate, + } + } +} + #[derive(Clone, Debug)] pub enum RawMessage { Request { diff --git a/code/crates/test/build.rs b/code/crates/test/build.rs index 892e2b47d..ab2ba15bf 100644 --- a/code/crates/test/build.rs +++ b/code/crates/test/build.rs @@ -1,9 +1,17 @@ use std::io::Result; fn main() -> Result<()> { + let protos = &["proto/consensus.proto", "proto/sync.proto"]; + + for proto in protos { + println!("cargo:rerun-if-changed={proto}"); + } + let mut config = prost_build::Config::new(); config.enable_type_names(); - config.compile_protos(&["proto/test.proto"], &["proto"])?; + config.bytes(["."]); + + config.compile_protos(protos, &["proto"])?; Ok(()) } diff --git a/code/crates/test/proto/test.proto b/code/crates/test/proto/consensus.proto similarity index 57% rename from code/crates/test/proto/test.proto rename to code/crates/test/proto/consensus.proto index c84cd8ce3..83e620b92 100644 --- a/code/crates/test/proto/test.proto +++ b/code/crates/test/proto/consensus.proto @@ -2,12 +2,6 @@ syntax = "proto3"; package test; -import "google/protobuf/any.proto"; - -message Height { - uint64 value = 1; -} - message Address { bytes value = 1; } @@ -27,30 +21,41 @@ enum VoteType { message Vote { VoteType vote_type = 1; - Height height = 2; + uint64 height = 2; uint32 round = 3; ValueId value = 4; Address validator_address = 5; } +message SignedMessage { + oneof message { + Proposal proposal = 1; + Vote vote = 2; + } + Signature signature = 3; +} + message Proposal { - Height height = 1; + uint64 height = 1; uint32 round = 2; Value value = 3; optional uint32 pol_round = 4; Address validator_address = 5; } +message Signature { + bytes bytes = 1; +} + message ProposalPart { - Height height = 1; + uint64 height = 1; uint32 round = 2; uint64 sequence = 3; Address validator_address = 4; - google.protobuf.Any content = 5; + Content content = 5; bool fin = 6; } - message BlockMetadata { bytes proof = 1; Value value = 2; @@ -60,3 +65,18 @@ message Content { BlockMetadata metadata = 91; } +message Extension { + bytes data = 1; + Signature signature = 2; +} + +message StreamMessage { + uint64 stream_id = 1; + uint64 sequence = 2; + oneof content { + // Serialized content. + bytes data = 3; + // Fin must be set to true. + bool fin = 4; + } +} diff --git a/code/crates/test/proto/sync.proto b/code/crates/test/proto/sync.proto new file mode 100644 index 000000000..04a31d64a --- /dev/null +++ b/code/crates/test/proto/sync.proto @@ -0,0 +1,87 @@ +syntax = "proto3"; + +import "consensus.proto"; + +package test; + +message PeerId { + bytes id = 1; +} + +message Status { + PeerId peer_id = 1; + uint64 height = 2; + uint64 earliest_height = 3; +} + +message ValueRequest { + uint64 height = 1; +} + +message ValueResponse { + uint64 height = 1; + SyncedValue value = 2; +} + +message SyncedValue { + bytes value_bytes = 1; + CommitCertificate certificate = 2; +} + +message CommitSignature { + // TODO: Add flag (no vote, nil, value?) + Address validator_address = 1; + Signature signature = 2; + optional Extension extension = 3; +} + +message AggregatedSignature { + repeated CommitSignature signatures = 1; +} + +message CommitCertificate { + uint64 height = 1; + uint32 round = 2; + ValueId value_id = 3; + AggregatedSignature aggregated_signature = 4; +} + +message ProposedValue { + uint64 height = 1; + uint32 round = 2; + optional uint32 valid_round = 3; + Address proposer = 4; + Value value = 5; + bool validity = 6; + optional Extension extension = 7; +} + +message VoteSetRequest { + uint64 height = 1; + uint32 round = 2; +} + +message VoteSetResponse { + uint64 height = 1; + uint32 round = 2; + VoteSet vote_set = 3; +} + +message VoteSet { + repeated SignedMessage signed_votes = 1; +} + +message SyncRequest { + oneof request { + ValueRequest value_request = 1; + VoteSetRequest vote_set_request = 2; + } +} + +message SyncResponse { + oneof response { + ValueResponse value_response = 1; + VoteSetResponse vote_set_response = 2; + } +} + diff --git a/code/crates/test/src/address.rs b/code/crates/test/src/address.rs index b16e2d729..5f776bcb5 100644 --- a/code/crates/test/src/address.rs +++ b/code/crates/test/src/address.rs @@ -75,7 +75,7 @@ impl Protobuf for Address { fn to_proto(&self) -> Result { Ok(proto::Address { - value: self.0.to_vec(), + value: self.0.to_vec().into(), }) } } diff --git a/code/crates/test/src/codec/json/mod.rs b/code/crates/test/src/codec/json/mod.rs index eb19934e4..7af4cb019 100644 --- a/code/crates/test/src/codec/json/mod.rs +++ b/code/crates/test/src/codec/json/mod.rs @@ -10,7 +10,7 @@ use raw::{RawRequest, RawResponse, RawSignedConsensusMsg, RawStatus, RawStreamMe use crate::{ProposalPart, TestContext, Value}; -#[derive(Clone)] +#[derive(Copy, Clone, Debug)] pub struct JsonCodec; impl Codec for JsonCodec { diff --git a/code/crates/test/src/codec/mod.rs b/code/crates/test/src/codec/mod.rs index 22fdbb38c..b59aa7f07 100644 --- a/code/crates/test/src/codec/mod.rs +++ b/code/crates/test/src/codec/mod.rs @@ -1 +1,2 @@ pub mod json; +pub mod proto; diff --git a/code/crates/test/src/codec/proto/mod.rs b/code/crates/test/src/codec/proto/mod.rs new file mode 100644 index 000000000..28a66fa23 --- /dev/null +++ b/code/crates/test/src/codec/proto/mod.rs @@ -0,0 +1,464 @@ +use bytes::Bytes; +use prost::Message; + +use malachite_app::streaming::{StreamContent, StreamMessage}; +use malachite_codec::Codec; +use malachite_consensus::SignedConsensusMsg; +use malachite_core_types::{ + AggregatedSignature, CommitCertificate, CommitSignature, Extension, Round, SignedExtension, + SignedProposal, SignedVote, VoteSet, +}; +use malachite_proto::{Error as ProtoError, Protobuf}; +use malachite_signing_ed25519::Signature; +use malachite_sync::{self as sync, PeerId}; + +use crate::proto; +use crate::{Address, Height, Proposal, ProposalPart, TestContext, Value, ValueId, Vote}; + +#[derive(Copy, Clone, Debug)] +pub struct ProtobufCodec; + +impl Codec for ProtobufCodec { + type Error = ProtoError; + + fn decode(&self, bytes: Bytes) -> Result { + Protobuf::from_bytes(&bytes) + } + + fn encode(&self, msg: &Value) -> Result { + Protobuf::to_bytes(msg) + } +} + +impl Codec for ProtobufCodec { + type Error = ProtoError; + + fn decode(&self, bytes: Bytes) -> Result { + Protobuf::from_bytes(&bytes) + } + + fn encode(&self, msg: &ProposalPart) -> Result { + Protobuf::to_bytes(msg) + } +} + +impl Codec> for ProtobufCodec { + type Error = ProtoError; + + fn decode(&self, bytes: Bytes) -> Result, Self::Error> { + let proto = proto::SignedMessage::decode(bytes.as_ref())?; + + let signature = proto + .signature + .ok_or_else(|| ProtoError::missing_field::("signature")) + .and_then(decode_signature)?; + + let proto_message = proto + .message + .ok_or_else(|| ProtoError::missing_field::("message"))?; + + match proto_message { + proto::signed_message::Message::Proposal(proto) => { + let proposal = Proposal::from_proto(proto)?; + Ok(SignedConsensusMsg::Proposal(SignedProposal::new( + proposal, signature, + ))) + } + proto::signed_message::Message::Vote(vote) => { + let vote = Vote::from_proto(vote)?; + Ok(SignedConsensusMsg::Vote(SignedVote::new(vote, signature))) + } + } + } + + fn encode(&self, msg: &SignedConsensusMsg) -> Result { + match msg { + SignedConsensusMsg::Vote(vote) => { + let proto = proto::SignedMessage { + message: Some(proto::signed_message::Message::Vote( + vote.message.to_proto()?, + )), + signature: Some(encode_signature(&vote.signature)), + }; + Ok(Bytes::from(proto.encode_to_vec())) + } + SignedConsensusMsg::Proposal(proposal) => { + let proto = proto::SignedMessage { + message: Some(proto::signed_message::Message::Proposal( + proposal.message.to_proto()?, + )), + signature: Some(encode_signature(&proposal.signature)), + }; + Ok(Bytes::from(proto.encode_to_vec())) + } + } + } +} + +impl Codec> for ProtobufCodec { + type Error = ProtoError; + + fn decode(&self, bytes: Bytes) -> Result, Self::Error> { + let proto = proto::StreamMessage::decode(bytes.as_ref())?; + + let proto_content = proto + .content + .ok_or_else(|| ProtoError::missing_field::("content"))?; + + let content = match proto_content { + proto::stream_message::Content::Data(data) => { + StreamContent::Data(ProposalPart::from_bytes(&data)?) + } + proto::stream_message::Content::Fin(end) => StreamContent::Fin(end), + }; + + Ok(StreamMessage { + stream_id: proto.stream_id, + sequence: proto.sequence, + content, + }) + } + + fn encode(&self, msg: &StreamMessage) -> Result { + let proto = proto::StreamMessage { + stream_id: msg.stream_id, + sequence: msg.sequence, + content: match &msg.content { + StreamContent::Data(data) => { + Some(proto::stream_message::Content::Data(data.to_bytes())) + } + StreamContent::Fin(end) => Some(proto::stream_message::Content::Fin(*end)), + }, + }; + + Ok(Bytes::from(proto.encode_to_vec())) + } +} + +impl Codec> for ProtobufCodec { + type Error = ProtoError; + + fn decode(&self, bytes: Bytes) -> Result, Self::Error> { + let proto = proto::Status::decode(bytes.as_ref())?; + + let proto_peer_id = proto + .peer_id + .ok_or_else(|| ProtoError::missing_field::("peer_id"))?; + + Ok(sync::Status { + peer_id: PeerId::from_bytes(proto_peer_id.id.as_ref()).unwrap(), + height: Height::new(proto.height), + history_min_height: Height::new(proto.earliest_height), + }) + } + + fn encode(&self, msg: &sync::Status) -> Result { + let proto = proto::Status { + peer_id: Some(proto::PeerId { + id: Bytes::from(msg.peer_id.to_bytes()), + }), + height: msg.height.as_u64(), + earliest_height: msg.history_min_height.as_u64(), + }; + + Ok(Bytes::from(proto.encode_to_vec())) + } +} + +impl Codec> for ProtobufCodec { + type Error = ProtoError; + + fn decode(&self, bytes: Bytes) -> Result, Self::Error> { + let proto = proto::SyncRequest::decode(bytes.as_ref())?; + let request = proto + .request + .ok_or_else(|| ProtoError::missing_field::("request"))?; + + match request { + proto::sync_request::Request::ValueRequest(req) => Ok(sync::Request::ValueRequest( + sync::ValueRequest::new(Height::new(req.height)), + )), + proto::sync_request::Request::VoteSetRequest(req) => Ok(sync::Request::VoteSetRequest( + sync::VoteSetRequest::new(Height::new(req.height), Round::new(req.round)), + )), + } + } + + fn encode(&self, msg: &sync::Request) -> Result { + let proto = match msg { + sync::Request::ValueRequest(req) => proto::SyncRequest { + request: Some(proto::sync_request::Request::ValueRequest( + proto::ValueRequest { + height: req.height.as_u64(), + }, + )), + }, + sync::Request::VoteSetRequest(req) => proto::SyncRequest { + request: Some(proto::sync_request::Request::VoteSetRequest( + proto::VoteSetRequest { + height: req.height.as_u64(), + round: req.round.as_u32().unwrap(), + }, + )), + }, + }; + + Ok(Bytes::from(proto.encode_to_vec())) + } +} + +impl Codec> for ProtobufCodec { + type Error = ProtoError; + + fn decode(&self, bytes: Bytes) -> Result, Self::Error> { + decode_sync_response(proto::SyncResponse::decode(bytes)?) + } + + fn encode(&self, response: &sync::Response) -> Result { + encode_sync_response(response).map(|proto| proto.encode_to_vec().into()) + } +} + +fn decode_sync_response( + proto_response: proto::SyncResponse, +) -> Result, ProtoError> { + let response = proto_response + .response + .ok_or_else(|| ProtoError::missing_field::("messages"))?; + + let response = match response { + proto::sync_response::Response::ValueResponse(value_response) => { + sync::Response::ValueResponse(sync::ValueResponse::new( + Height::new(value_response.height), + value_response.value.map(decode_synced_value).transpose()?, + )) + } + proto::sync_response::Response::VoteSetResponse(vote_set_response) => { + let height = Height::new(vote_set_response.height); + let round = Round::new(vote_set_response.round); + let vote_set = vote_set_response + .vote_set + .ok_or_else(|| ProtoError::missing_field::("vote_set"))?; + + sync::Response::VoteSetResponse(sync::VoteSetResponse::new( + height, + round, + decode_vote_set(vote_set)?, + )) + } + }; + Ok(response) +} + +fn encode_sync_response( + response: &sync::Response, +) -> Result { + let proto = match response { + sync::Response::ValueResponse(value_response) => proto::SyncResponse { + response: Some(proto::sync_response::Response::ValueResponse( + proto::ValueResponse { + height: value_response.height.as_u64(), + value: value_response + .value + .as_ref() + .map(encode_synced_value) + .transpose()?, + }, + )), + }, + sync::Response::VoteSetResponse(vote_set_response) => proto::SyncResponse { + response: Some(proto::sync_response::Response::VoteSetResponse( + proto::VoteSetResponse { + height: vote_set_response.height.as_u64(), + round: vote_set_response + .round + .as_u32() + .expect("round should not be nil"), + vote_set: Some(encode_vote_set(&vote_set_response.vote_set)?), + }, + )), + }, + }; + + Ok(proto) +} + +fn encode_synced_value( + synced_value: &sync::DecidedValue, +) -> Result { + Ok(proto::SyncedValue { + value_bytes: synced_value.value_bytes.clone(), + certificate: Some(encode_certificate(&synced_value.certificate)?), + }) +} + +fn decode_synced_value( + proto: proto::SyncedValue, +) -> Result, ProtoError> { + let certificate = proto + .certificate + .ok_or_else(|| ProtoError::missing_field::("certificate"))?; + + Ok(sync::DecidedValue { + value_bytes: proto.value_bytes, + certificate: decode_certificate(certificate)?, + }) +} + +fn decode_certificate( + certificate: proto::CommitCertificate, +) -> Result, ProtoError> { + let value_id = certificate + .value_id + .ok_or_else(|| ProtoError::missing_field::("value_id")) + .and_then(ValueId::from_proto)?; + + let aggregated_signature = certificate + .aggregated_signature + .ok_or_else(|| { + ProtoError::missing_field::("aggregated_signature") + }) + .and_then(decode_aggregated_signature)?; + + let certificate = CommitCertificate { + height: Height::new(certificate.height), + round: Round::new(certificate.round), + value_id, + aggregated_signature, + }; + + Ok(certificate) +} + +fn encode_certificate( + certificate: &CommitCertificate, +) -> Result { + Ok(proto::CommitCertificate { + height: certificate.height.as_u64(), + round: certificate.round.as_u32().expect("round should not be nil"), + value_id: Some(certificate.value_id.to_proto()?), + aggregated_signature: Some(encode_aggregate_signature( + &certificate.aggregated_signature, + )?), + }) +} + +fn decode_aggregated_signature( + signature: proto::AggregatedSignature, +) -> Result, ProtoError> { + let signatures = signature + .signatures + .into_iter() + .map(|s| { + let signature = s + .signature + .ok_or_else(|| ProtoError::missing_field::("signature")) + .and_then(decode_signature)?; + + let address = s + .validator_address + .ok_or_else(|| { + ProtoError::missing_field::("validator_address") + }) + .and_then(Address::from_proto)?; + + let extension = s.extension.map(decode_extension).transpose()?; + + Ok(CommitSignature { + address, + signature, + extension, + }) + }) + .collect::, ProtoError>>()?; + + Ok(AggregatedSignature { signatures }) +} + +fn encode_aggregate_signature( + aggregated_signature: &AggregatedSignature, +) -> Result { + let signatures = aggregated_signature + .signatures + .iter() + .map(|s| { + Ok(proto::CommitSignature { + validator_address: Some(s.address.to_proto()?), + signature: Some(encode_signature(&s.signature)), + extension: s.extension.as_ref().map(encode_extension).transpose()?, + }) + }) + .collect::>()?; + + Ok(proto::AggregatedSignature { signatures }) +} + +fn decode_extension(ext: proto::Extension) -> Result, ProtoError> { + let extension = Extension::from(ext.data); + let signature = ext + .signature + .ok_or_else(|| ProtoError::missing_field::("signature")) + .and_then(decode_signature)?; + + Ok(SignedExtension::new(extension, signature)) +} + +fn encode_extension(ext: &SignedExtension) -> Result { + Ok(proto::Extension { + data: ext.message.data.clone(), + signature: Some(encode_signature(&ext.signature)), + }) +} + +fn encode_vote_set(vote_set: &VoteSet) -> Result { + Ok(proto::VoteSet { + signed_votes: vote_set + .votes + .iter() + .map(encode_vote) + .collect::, _>>()?, + }) +} + +fn encode_vote(vote: &SignedVote) -> Result { + Ok(proto::SignedMessage { + message: Some(proto::signed_message::Message::Vote( + vote.message.to_proto()?, + )), + signature: Some(encode_signature(&vote.signature)), + }) +} + +fn decode_vote_set(vote_set: proto::VoteSet) -> Result, ProtoError> { + Ok(VoteSet { + votes: vote_set + .signed_votes + .into_iter() + .filter_map(decode_vote) + .collect(), + }) +} + +fn decode_vote(msg: proto::SignedMessage) -> Option> { + let signature = msg.signature?; + let vote = match msg.message { + Some(proto::signed_message::Message::Vote(v)) => Some(v), + _ => None, + }?; + + let signature = decode_signature(signature).ok()?; + let vote = Vote::from_proto(vote).ok()?; + Some(SignedVote::new(vote, signature)) +} + +fn encode_signature(signature: &Signature) -> proto::Signature { + proto::Signature { + bytes: Bytes::copy_from_slice(signature.to_bytes().as_ref()), + } +} + +fn decode_signature(signature: proto::Signature) -> Result { + let bytes = <[u8; 64]>::try_from(signature.bytes.as_ref()) + .map_err(|_| ProtoError::Other("Invalid signature length".to_string()))?; + Ok(Signature::from_bytes(bytes)) +} diff --git a/code/crates/test/src/height.rs b/code/crates/test/src/height.rs index b940698d5..066cbc903 100644 --- a/code/crates/test/src/height.rs +++ b/code/crates/test/src/height.rs @@ -2,8 +2,6 @@ use core::fmt; use malachite_proto::{Error as ProtoError, Protobuf}; use serde::{Deserialize, Serialize}; -use crate::proto; - /// A blockchain height #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] pub struct Height(u64); @@ -59,13 +57,13 @@ impl malachite_core_types::Height for Height { } impl Protobuf for Height { - type Proto = proto::Height; + type Proto = u64; fn from_proto(proto: Self::Proto) -> Result { - Ok(Self(proto.value)) + Ok(Self(proto)) } fn to_proto(&self) -> Result { - Ok(proto::Height { value: self.0 }) + Ok(self.0) } } diff --git a/code/crates/test/src/proposal.rs b/code/crates/test/src/proposal.rs index e085301b9..92e2129c7 100644 --- a/code/crates/test/src/proposal.rs +++ b/code/crates/test/src/proposal.rs @@ -68,7 +68,7 @@ impl Protobuf for Proposal { #[cfg_attr(coverage_nightly, coverage(off))] fn to_proto(&self) -> Result { Ok(Self::Proto { - height: Some(self.height.to_proto()?), + height: self.height.to_proto()?, round: self.round.as_u32().expect("round should not be nil"), value: Some(self.value.to_proto()?), pol_round: self.pol_round.as_u32(), @@ -79,11 +79,7 @@ impl Protobuf for Proposal { #[cfg_attr(coverage_nightly, coverage(off))] fn from_proto(proto: Self::Proto) -> Result { Ok(Self { - height: Height::from_proto( - proto - .height - .ok_or_else(|| ProtoError::missing_field::("height"))?, - )?, + height: Height::from_proto(proto.height)?, round: Round::new(proto.round), value: Value::from_proto( proto diff --git a/code/crates/test/src/proposal_part.rs b/code/crates/test/src/proposal_part.rs index 3f40e814d..d65d1e360 100644 --- a/code/crates/test/src/proposal_part.rs +++ b/code/crates/test/src/proposal_part.rs @@ -1,20 +1,19 @@ -use std::sync::Arc; - use bytes::Bytes; +use serde::{Deserialize, Serialize}; + use malachite_core_types::Round; use malachite_proto::{Error as ProtoError, Protobuf}; -use serde::{Deserialize, Serialize}; use crate::{Address, Height, TestContext, Value}; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct BlockMetadata { - proof: Vec, + proof: Bytes, value: Value, } impl BlockMetadata { - pub fn new(proof: Vec, value: Value) -> Self { + pub fn new(proof: Bytes, value: Value) -> Self { Self { proof, value } } @@ -111,7 +110,7 @@ pub struct ProposalPart { #[serde(with = "RoundDef")] pub round: Round, pub sequence: u64, - pub content: Arc, + pub content: Content, pub validator_address: Address, pub fin: bool, } @@ -129,7 +128,7 @@ impl ProposalPart { height, round, sequence, - content: Arc::new(content), + content, validator_address, fin, } @@ -164,18 +163,14 @@ impl Protobuf for ProposalPart { #[cfg_attr(coverage_nightly, coverage(off))] fn from_proto(proto: Self::Proto) -> Result { Ok(Self { - height: Height::from_proto( - proto - .height - .ok_or_else(|| ProtoError::missing_field::("height"))?, - )?, + height: Height::from_proto(proto.height)?, round: Round::new(proto.round), sequence: proto.sequence, - content: Arc::new(Content::from_any( - &proto + content: Content::from_proto( + proto .content .ok_or_else(|| ProtoError::missing_field::("content"))?, - )?), + )?, validator_address: Address::from_proto( proto .validator_address @@ -188,10 +183,10 @@ impl Protobuf for ProposalPart { #[cfg_attr(coverage_nightly, coverage(off))] fn to_proto(&self) -> Result { Ok(crate::proto::ProposalPart { - height: Some(self.height.to_proto()?), + height: self.height.to_proto()?, round: self.round.as_u32().expect("round should not be nil"), sequence: self.sequence, - content: Some(self.content.to_any()?), + content: Some(self.content.to_proto()?), validator_address: Some(self.validator_address.to_proto()?), fin: self.fin, }) diff --git a/code/crates/test/src/value.rs b/code/crates/test/src/value.rs index 2b0078774..0f7094985 100644 --- a/code/crates/test/src/value.rs +++ b/code/crates/test/src/value.rs @@ -39,7 +39,7 @@ impl Protobuf for ValueId { .ok_or_else(|| ProtoError::missing_field::("value"))?; let len = bytes.len(); - let bytes = <[u8; 8]>::try_from(bytes).map_err(|_| { + let bytes = <[u8; 8]>::try_from(bytes.as_ref()).map_err(|_| { ProtoError::Other(format!( "Invalid value length, got {len} bytes expected {}", u64::BITS / 8 @@ -52,7 +52,7 @@ impl Protobuf for ValueId { #[cfg_attr(coverage_nightly, coverage(off))] fn to_proto(&self) -> Result { Ok(proto::ValueId { - value: Some(self.0.to_be_bytes().to_vec()), + value: Some(self.0.to_be_bytes().to_vec().into()), }) } } @@ -97,7 +97,7 @@ impl Protobuf for Value { .ok_or_else(|| ProtoError::missing_field::("value"))?; let len = bytes.len(); - let bytes = <[u8; 8]>::try_from(bytes).map_err(|_| { + let bytes = <[u8; 8]>::try_from(bytes.as_ref()).map_err(|_| { ProtoError::Other(format!( "Invalid value length, got {len} bytes expected {}", u64::BITS / 8 @@ -110,7 +110,7 @@ impl Protobuf for Value { #[cfg_attr(coverage_nightly, coverage(off))] fn to_proto(&self) -> Result { Ok(proto::Value { - value: Some(self.0.to_be_bytes().to_vec()), + value: Some(self.0.to_be_bytes().to_vec().into()), }) } } diff --git a/code/crates/test/src/vote.rs b/code/crates/test/src/vote.rs index bddeca694..060263e59 100644 --- a/code/crates/test/src/vote.rs +++ b/code/crates/test/src/vote.rs @@ -100,11 +100,7 @@ impl Protobuf for Vote { fn from_proto(proto: Self::Proto) -> Result { Ok(Self { typ: decode_votetype(proto.vote_type()), - height: Height::from_proto( - proto - .height - .ok_or_else(|| ProtoError::missing_field::("height"))?, - )?, + height: Height::from_proto(proto.height)?, round: Round::new(proto.round), value: match proto.value { Some(value) => NilOrVal::Val(ValueId::from_proto(value)?), @@ -123,7 +119,7 @@ impl Protobuf for Vote { fn to_proto(&self) -> Result { Ok(Self::Proto { vote_type: encode_votetype(self.typ).into(), - height: Some(self.height.to_proto()?), + height: self.height.to_proto()?, round: self.round.as_u32().expect("round should not be nil"), value: match &self.value { NilOrVal::Nil => None, diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index aaddbb74b..7a2adfdf3 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -14,7 +14,7 @@ use malachite_app_channel::app::types::core::{Round, Validity, VotingPower}; use malachite_app_channel::app::types::LocallyProposedValue; use malachite_app_channel::app::Node; use malachite_app_channel::{run, AppMsg, ConsensusGossipMsg, ConsensusMsg}; -use malachite_test::codec::json::JsonCodec; +use malachite_test::codec::proto::ProtobufCodec; use malachite_test::{ Address, Genesis, Height, PrivateKey, PublicKey, TestContext, Validator, ValidatorSet, }; @@ -104,7 +104,7 @@ impl Node for App { let start_height = self.start_height.map(Height::new); - let codec = JsonCodec; + let codec = ProtobufCodec; let mut channels = run( self.config.clone(), diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index eccfd0b9f..8e7f4bdaf 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -12,15 +12,15 @@ use malachite_app_channel::app::streaming::{StreamContent, StreamMessage}; use malachite_app_channel::app::types::codec::Codec; use malachite_app_channel::app::types::core::{CommitCertificate, Round, Validity}; use malachite_app_channel::app::types::sync::DecidedValue; -use malachite_test::codec::json::JsonCodec; +use malachite_test::codec::proto::ProtobufCodec; use malachite_test::{Address, BlockMetadata, Content, Height, ProposalPart, TestContext, Value}; pub fn decode_value(bytes: Bytes) -> Value { - JsonCodec.decode(bytes).unwrap() + ProtobufCodec.decode(bytes).unwrap() } pub fn encode_value(value: &Value) -> Bytes { - JsonCodec.encode(value).unwrap() + ProtobufCodec.encode(value).unwrap() } pub struct State { @@ -172,7 +172,7 @@ impl State { ] .concat(); - let content = Content::new(&BlockMetadata::new(fake_proof, value.value)); + let content = Content::new(&BlockMetadata::new(fake_proof.into(), value.value)); let proposal_part = ProposalPart::new( self.current_height, From 530694bc4d7f4b29e7d499d23103c197bb0de209 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 17 Dec 2024 10:05:30 +0100 Subject: [PATCH 11/19] Disable JSON codec for now --- code/crates/test/src/codec/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code/crates/test/src/codec/mod.rs b/code/crates/test/src/codec/mod.rs index b59aa7f07..da95c4b14 100644 --- a/code/crates/test/src/codec/mod.rs +++ b/code/crates/test/src/codec/mod.rs @@ -1,2 +1,2 @@ -pub mod json; +// pub mod json; pub mod proto; From 3a89be33dd67d99eff1265a0b66357b92d067f64 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 17 Dec 2024 10:18:15 +0100 Subject: [PATCH 12/19] Cleanup --- code/crates/app-channel/src/run.rs | 13 +++++------ code/crates/app/src/node.rs | 2 +- code/crates/starknet/app/src/main.rs | 2 +- code/crates/starknet/host/src/node.rs | 2 +- code/crates/test/cli/src/cmd/start.rs | 10 ++------- code/crates/test/src/node.rs | 2 +- code/examples/channel/src/main.rs | 32 +++++++++++++-------------- code/examples/channel/src/node.rs | 22 +++++++++--------- 8 files changed, 39 insertions(+), 46 deletions(-) diff --git a/code/crates/app-channel/src/run.rs b/code/crates/app-channel/src/run.rs index 9c7432c0f..5762fccec 100644 --- a/code/crates/app-channel/src/run.rs +++ b/code/crates/app-channel/src/run.rs @@ -1,6 +1,8 @@ //! Run Malachite consensus with the given configuration and context. //! Provides the application with a channel for receiving messages from consensus. +use std::path::PathBuf; + use eyre::Result; use crate::app::types::codec::{ConsensusCodec, SyncCodec, WalCodec}; @@ -15,11 +17,12 @@ use malachite_engine::util::events::TxEvent; #[tracing::instrument("node", skip_all, fields(moniker = %cfg.moniker))] pub async fn run( - cfg: NodeConfig, - start_height: Option, ctx: Ctx, codec: Codec, node: Node, + cfg: NodeConfig, + private_key_file: PathBuf, + start_height: Option, initial_validator_set: Ctx::ValidatorSet, ) -> Result> where @@ -34,11 +37,7 @@ where let registry = SharedRegistry::global().with_moniker(cfg.moniker.as_str()); let metrics = Metrics::register(®istry); - // TODO: Simplify this? - let mut config_file = node.get_home_dir().clone(); - config_file.push("config"); - config_file.push("priv_validator_key.json"); - let private_key_file = node.load_private_key_file(config_file)?; + let private_key_file = node.load_private_key_file(private_key_file)?; let private_key = node.load_private_key(private_key_file); let public_key = node.get_public_key(&private_key); let address = node.get_address(&public_key); diff --git a/code/crates/app/src/node.rs b/code/crates/app/src/node.rs index 4532a7fa1..8c359ccf7 100644 --- a/code/crates/app/src/node.rs +++ b/code/crates/app/src/node.rs @@ -41,5 +41,5 @@ pub trait Node { validators: Vec<(PublicKey, VotingPower)>, ) -> Self::Genesis; - async fn run(&self) -> eyre::Result<()>; + async fn run(self) -> eyre::Result<()>; } diff --git a/code/crates/starknet/app/src/main.rs b/code/crates/starknet/app/src/main.rs index bdd1f4dc6..7fd8b8b71 100644 --- a/code/crates/starknet/app/src/main.rs +++ b/code/crates/starknet/app/src/main.rs @@ -65,7 +65,7 @@ pub fn main() -> color_eyre::Result<()> { trace!(?config, "Configuration"); // Redefine the node with the valid configuration. - let node = &StarknetNode { + let node = StarknetNode { home_dir: args.get_home_dir().unwrap(), config, genesis_file: args.get_genesis_file_path().unwrap(), diff --git a/code/crates/starknet/host/src/node.rs b/code/crates/starknet/host/src/node.rs index 23bd0fa52..32ec54ff6 100644 --- a/code/crates/starknet/host/src/node.rs +++ b/code/crates/starknet/host/src/node.rs @@ -113,7 +113,7 @@ impl Node for StarknetNode { Genesis { validator_set } } - async fn run(&self) -> eyre::Result<()> { + async fn run(self) -> eyre::Result<()> { let span = tracing::error_span!("node", moniker = %self.config.moniker); let _enter = span.enter(); diff --git a/code/crates/test/cli/src/cmd/start.rs b/code/crates/test/cli/src/cmd/start.rs index 701037281..8ccec3874 100644 --- a/code/crates/test/cli/src/cmd/start.rs +++ b/code/crates/test/cli/src/cmd/start.rs @@ -14,10 +14,7 @@ pub struct StartCmd { } impl StartCmd { - pub async fn run(&self, node: &N, metrics: Option) -> eyre::Result<()> - where - N: Node, - { + pub async fn run(&self, node: impl Node, metrics: Option) -> eyre::Result<()> { info!("Node is starting..."); start(node, metrics).await?; @@ -29,10 +26,7 @@ impl StartCmd { } /// start command to run a node. -pub async fn start(node: &N, metrics: Option) -> eyre::Result<()> -where - N: Node, -{ +pub async fn start(node: impl Node, metrics: Option) -> eyre::Result<()> { // Enable Prometheus if let Some(metrics) = metrics { tokio::spawn(metrics::serve(metrics.clone())); diff --git a/code/crates/test/src/node.rs b/code/crates/test/src/node.rs index 33c6aecf4..bdc2ca121 100644 --- a/code/crates/test/src/node.rs +++ b/code/crates/test/src/node.rs @@ -79,7 +79,7 @@ impl Node for TestNode { Genesis { validator_set } } - async fn run(&self) -> eyre::Result<()> { + async fn run(self) -> eyre::Result<()> { unimplemented!() } } diff --git a/code/examples/channel/src/main.rs b/code/examples/channel/src/main.rs index 24172c2a7..301827d62 100644 --- a/code/examples/channel/src/main.rs +++ b/code/examples/channel/src/main.rs @@ -22,6 +22,7 @@ fn main() -> color_eyre::Result<()> { let opt_config_file_path = args .get_config_file_path() .map_err(|error| eyre!("Failed to get configuration file path: {:?}", error)); + let opt_config = opt_config_file_path.and_then(|path| { load_config(&path, None) .map_err(|error| eyre!("Failed to load configuration file: {:?}", error)) @@ -43,7 +44,7 @@ fn main() -> color_eyre::Result<()> { trace!("Command-line parameters: {args:?}"); // Create the application object. - let node = &App { + let mut node = App { home_dir: args.get_home_dir()?, config: Default::default(), // placeholder, because `init` and `testnet` has no valid configuration file. genesis_file: args.get_genesis_file_path()?, @@ -58,28 +59,22 @@ fn main() -> color_eyre::Result<()> { let mut config = opt_config .map_err(|error| error!(%error, "Failed to load configuration.")) .unwrap(); + config.logging = logging; + let runtime = config.runtime; - let metrics = if config.metrics.enabled { - Some(config.metrics.clone()) - } else { - None - }; + + let metrics = config.metrics.enabled.then(|| config.metrics.clone()); info!( file = %args.get_config_file_path().unwrap_or_default().display(), "Loaded configuration", ); + trace!(?config, "Configuration"); - // Redefine the node with the valid configuration. - let node = &App { - home_dir: args.get_home_dir()?, - config, - genesis_file: args.get_genesis_file_path()?, - private_key_file: args.get_priv_validator_key_file_path()?, - start_height: cmd.start_height, - }; + // Set the config + node.config = config; // Define the runtime. If you are not interested in a custom runtime configuration, // you can use the #[async_trait] attribute on the main function. @@ -87,20 +82,23 @@ fn main() -> color_eyre::Result<()> { rt.block_on(cmd.run(node, metrics)) .map_err(|error| eyre!("Failed to run start command {:?}", error)) } + Commands::Init(cmd) => cmd .run( - node, + &node, &args.get_config_file_path()?, &args.get_genesis_file_path()?, &args.get_priv_validator_key_file_path()?, logging, ) .map_err(|error| eyre!("Failed to run init command {:?}", error)), + Commands::Testnet(cmd) => cmd - .run(node, &args.get_home_dir()?, logging) + .run(&node, &args.get_home_dir()?, logging) .map_err(|error| eyre!("Failed to run testnet command {:?}", error)), + Commands::DistributedTestnet(cmd) => cmd - .run(node, &args.get_home_dir()?, logging) + .run(&node, &args.get_home_dir()?, logging) .map_err(|error| eyre!("Failed to run distributed testnet command {:?}", error)), } } diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index 7a2adfdf3..a0d733efc 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -13,7 +13,7 @@ use malachite_app_channel::app::consensus::ProposedValue; use malachite_app_channel::app::types::core::{Round, Validity, VotingPower}; use malachite_app_channel::app::types::LocallyProposedValue; use malachite_app_channel::app::Node; -use malachite_app_channel::{run, AppMsg, ConsensusGossipMsg, ConsensusMsg}; +use malachite_app_channel::{AppMsg, ConsensusGossipMsg, ConsensusMsg}; use malachite_test::codec::proto::ProtobufCodec; use malachite_test::{ Address, Genesis, Height, PrivateKey, PublicKey, TestContext, Validator, ValidatorSet, @@ -91,28 +91,30 @@ impl Node for App { Genesis { validator_set } } - async fn run(&self) -> eyre::Result<()> { + async fn run(self) -> eyre::Result<()> { let span = tracing::error_span!("node", moniker = %self.config.moniker); let _enter = span.enter(); - let priv_key_file = self.load_private_key_file(self.private_key_file.clone())?; - let private_key = self.load_private_key(priv_key_file); - let address = self.get_address(&self.get_public_key(&private_key)); + let private_key_file = self.load_private_key_file(&self.private_key_file)?; + let private_key = self.load_private_key(private_key_file); + let public_key = self.get_public_key(&private_key); + let address = self.get_address(&public_key); let ctx = TestContext::new(private_key); let genesis = self.load_genesis(self.genesis_file.clone())?; - + let initial_validator_set = genesis.validator_set.clone(); let start_height = self.start_height.map(Height::new); let codec = ProtobufCodec; - let mut channels = run( - self.config.clone(), - start_height, + let mut channels = malachite_app_channel::run( ctx, codec, self.clone(), - genesis.validator_set.clone(), + self.config.clone(), + self.private_key_file.clone(), + start_height, + initial_validator_set, ) .await?; From f3bb926569e404fece9a25c00a3b9d5fed77d243 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 17 Dec 2024 10:23:21 +0100 Subject: [PATCH 13/19] Rename and cleanup --- code/crates/app-channel/src/channel.rs | 13 ++++++++-- code/crates/app-channel/src/lib.rs | 2 +- code/crates/app-channel/src/run.rs | 8 +++--- code/crates/app-channel/src/spawn.rs | 34 +++++++++++++------------- code/examples/channel/src/node.rs | 6 ++--- 5 files changed, 36 insertions(+), 27 deletions(-) diff --git a/code/crates/app-channel/src/channel.rs b/code/crates/app-channel/src/channel.rs index 23bdf1371..9ab57be0c 100644 --- a/code/crates/app-channel/src/channel.rs +++ b/code/crates/app-channel/src/channel.rs @@ -2,6 +2,7 @@ use std::time::Duration; use bytes::Bytes; use derive_where::derive_where; +use malachite_engine::network; use tokio::sync::mpsc; use tokio::sync::oneshot; @@ -15,7 +16,7 @@ use malachite_engine::consensus::Msg as ConsensusActorMsg; /// Channels created for application consumption pub struct Channels { pub consensus: mpsc::Receiver>, - pub consensus_gossip: mpsc::Sender>, + pub network: mpsc::Sender>, } /// Messages sent from consensus to the application. @@ -109,6 +110,14 @@ impl From> for ConsensusActorMsg { /// Messages sent from the application to consensus gossip. #[derive_where(Debug)] -pub enum ConsensusGossipMsg { +pub enum NetworkMsg { PublishProposalPart(StreamMessage), } + +impl From> for network::NetworkMsg { + fn from(msg: NetworkMsg) -> network::NetworkMsg { + match msg { + NetworkMsg::PublishProposalPart(part) => network::NetworkMsg::PublishProposalPart(part), + } + } +} diff --git a/code/crates/app-channel/src/lib.rs b/code/crates/app-channel/src/lib.rs index 0549570d6..9aaadad78 100644 --- a/code/crates/app-channel/src/lib.rs +++ b/code/crates/app-channel/src/lib.rs @@ -14,7 +14,7 @@ pub mod connector; pub mod spawn; mod channel; -pub use channel::{AppMsg, Channels, ConsensusGossipMsg, ConsensusMsg}; +pub use channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg}; mod run; pub use run::run; diff --git a/code/crates/app-channel/src/run.rs b/code/crates/app-channel/src/run.rs index 5762fccec..73e912f8a 100644 --- a/code/crates/app-channel/src/run.rs +++ b/code/crates/app-channel/src/run.rs @@ -44,13 +44,13 @@ where let keypair = node.get_keypair(private_key); // Spawn consensus gossip - let (network, network_msg_tx) = + let (network, network_tx) = spawn_network_actor(&cfg, keypair, ®istry, codec.clone()).await?; let wal = spawn_wal_actor(&ctx, codec, &node.get_home_dir(), ®istry).await?; // Spawn the host actor - let (connector, appmsg_rx) = spawn_host_actor(metrics.clone()).await?; + let (connector, consensus_rx) = spawn_host_actor(metrics.clone()).await?; let sync = spawn_sync_actor( ctx.clone(), @@ -78,7 +78,7 @@ where .await?; Ok(Channels { - consensus: appmsg_rx, - consensus_gossip: network_msg_tx, + consensus: consensus_rx, + network: network_tx, }) } diff --git a/code/crates/app-channel/src/spawn.rs b/code/crates/app-channel/src/spawn.rs index 98661aaff..10787a8f2 100644 --- a/code/crates/app-channel/src/spawn.rs +++ b/code/crates/app-channel/src/spawn.rs @@ -3,18 +3,18 @@ use eyre::Result; use tokio::sync::mpsc; -use crate::app::types::core::Context; -use crate::app::types::metrics::Metrics; -use crate::connector::Connector; -use crate::{AppMsg, ConsensusGossipMsg}; - use malachite_app::types::{metrics::SharedRegistry, Keypair}; use malachite_config::Config as NodeConfig; use malachite_engine::consensus::ConsensusCodec; use malachite_engine::host::HostRef; -use malachite_engine::network::{NetworkMsg, NetworkRef}; +use malachite_engine::network::NetworkRef; use malachite_engine::sync::SyncCodec; +use crate::app::types::core::Context; +use crate::app::types::metrics::Metrics; +use crate::connector::Connector; +use crate::{AppMsg, NetworkMsg}; + pub async fn spawn_host_actor( metrics: Metrics, ) -> Result<(HostRef, mpsc::Receiver>)> @@ -31,26 +31,26 @@ pub async fn spawn_network_actor( keypair: Keypair, registry: &SharedRegistry, codec: Codec, -) -> Result<(NetworkRef, mpsc::Sender>)> +) -> Result<(NetworkRef, mpsc::Sender>)> where Ctx: Context, Codec: ConsensusCodec, Codec: SyncCodec, { - let (tx, mut rx) = mpsc::channel(1); + let (tx, mut rx) = mpsc::channel::>(1); let actor_ref = malachite_app::spawn_network_actor(cfg, keypair, registry, codec).await?; - let actor_ref_return = actor_ref.clone(); - - tokio::spawn(async move { - while let Some(msg) = rx.recv().await { - match msg { - ConsensusGossipMsg::PublishProposalPart(ppp) => actor_ref - .cast(NetworkMsg::PublishProposalPart(ppp)) - .unwrap(), + + tokio::spawn({ + let actor_ref = actor_ref.clone(); + async move { + while let Some(msg) = rx.recv().await { + if let Err(e) = actor_ref.cast(msg.into()) { + tracing::error!("Failed to send message to network actor: {e}"); + } } } }); - Ok((actor_ref_return, tx)) + Ok((actor_ref, tx)) } diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index a0d733efc..eb4965763 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -13,7 +13,7 @@ use malachite_app_channel::app::consensus::ProposedValue; use malachite_app_channel::app::types::core::{Round, Validity, VotingPower}; use malachite_app_channel::app::types::LocallyProposedValue; use malachite_app_channel::app::Node; -use malachite_app_channel::{AppMsg, ConsensusGossipMsg, ConsensusMsg}; +use malachite_app_channel::{AppMsg, ConsensusMsg, NetworkMsg}; use malachite_test::codec::proto::ProtobufCodec; use malachite_test::{ Address, Genesis, Height, PrivateKey, PublicKey, TestContext, Validator, ValidatorSet, @@ -170,8 +170,8 @@ impl Node for App { // Broadcast it to others. Old messages need not be broadcast. channels - .consensus_gossip - .send(ConsensusGossipMsg::PublishProposalPart(stream_message)) + .network + .send(NetworkMsg::PublishProposalPart(stream_message)) .await?; } From 32a85e97529c40a18c9fd84680db5f864b6cda0a Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 17 Dec 2024 11:07:26 +0100 Subject: [PATCH 14/19] Cleanup --- code/crates/app-channel/src/channel.rs | 12 ++++++------ code/crates/app-channel/src/spawn.rs | 3 ++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/code/crates/app-channel/src/channel.rs b/code/crates/app-channel/src/channel.rs index 9ab57be0c..f972949db 100644 --- a/code/crates/app-channel/src/channel.rs +++ b/code/crates/app-channel/src/channel.rs @@ -2,17 +2,17 @@ use std::time::Duration; use bytes::Bytes; use derive_where::derive_where; -use malachite_engine::network; use tokio::sync::mpsc; use tokio::sync::oneshot; +use malachite_engine::consensus::Msg as ConsensusActorMsg; +use malachite_engine::network::Msg as NetworkActorMsg; + use crate::app::types::core::{CommitCertificate, Context, Round, ValueId}; use crate::app::types::streaming::StreamMessage; use crate::app::types::sync::DecidedValue; use crate::app::types::{LocallyProposedValue, PeerId, ProposedValue}; -use malachite_engine::consensus::Msg as ConsensusActorMsg; - /// Channels created for application consumption pub struct Channels { pub consensus: mpsc::Receiver>, @@ -114,10 +114,10 @@ pub enum NetworkMsg { PublishProposalPart(StreamMessage), } -impl From> for network::NetworkMsg { - fn from(msg: NetworkMsg) -> network::NetworkMsg { +impl From> for NetworkActorMsg { + fn from(msg: NetworkMsg) -> NetworkActorMsg { match msg { - NetworkMsg::PublishProposalPart(part) => network::NetworkMsg::PublishProposalPart(part), + NetworkMsg::PublishProposalPart(part) => NetworkActorMsg::PublishProposalPart(part), } } } diff --git a/code/crates/app-channel/src/spawn.rs b/code/crates/app-channel/src/spawn.rs index 10787a8f2..1c978611f 100644 --- a/code/crates/app-channel/src/spawn.rs +++ b/code/crates/app-channel/src/spawn.rs @@ -3,7 +3,8 @@ use eyre::Result; use tokio::sync::mpsc; -use malachite_app::types::{metrics::SharedRegistry, Keypair}; +use malachite_app::types::metrics::SharedRegistry; +use malachite_app::types::Keypair; use malachite_config::Config as NodeConfig; use malachite_engine::consensus::ConsensusCodec; use malachite_engine::host::HostRef; From 9ad2a041b6cff58336814ab5c7d30bba3e8a54c0 Mon Sep 17 00:00:00 2001 From: "Romain Ruetschi (aider)" Date: Tue, 17 Dec 2024 12:13:00 +0100 Subject: [PATCH 15/19] Add doc comments to public definitions --- code/examples/channel/src/main.rs | 10 ++++++++-- code/examples/channel/src/node.rs | 1 + code/examples/channel/src/state.rs | 16 ++++++++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/code/examples/channel/src/main.rs b/code/examples/channel/src/main.rs index 301827d62..c42c30aae 100644 --- a/code/examples/channel/src/main.rs +++ b/code/examples/channel/src/main.rs @@ -10,8 +10,14 @@ use malachite_test_cli::{logging, runtime}; use node::App; use tracing::{error, info, trace}; -/// main function parses the command-line arguments, loads configuration, initializes logging -/// and runs the application object. +/// Main entry point for the application +/// +/// This function: +/// - Parses command-line arguments +/// - Loads configuration from file +/// - Initializes logging system +/// - Sets up error handling +/// - Creates and runs the application node fn main() -> color_eyre::Result<()> { color_eyre::install().expect("Failed to install global error handler"); diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index eb4965763..be70d33f5 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -22,6 +22,7 @@ use malachite_test_cli::config::Config; use crate::state::{decode_value, State}; +/// Main application struct implementing the consensus node functionality #[derive(Clone)] pub struct App { pub config: Config, diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index 8e7f4bdaf..74bd521b7 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -15,14 +15,18 @@ use malachite_app_channel::app::types::sync::DecidedValue; use malachite_test::codec::proto::ProtobufCodec; use malachite_test::{Address, BlockMetadata, Content, Height, ProposalPart, TestContext, Value}; +/// Decodes a Value from its byte representation using ProtobufCodec pub fn decode_value(bytes: Bytes) -> Value { ProtobufCodec.decode(bytes).unwrap() } +/// Encodes a Value into its byte representation using ProtobufCodec pub fn encode_value(value: &Value) -> Bytes { ProtobufCodec.encode(value).unwrap() } +/// Represents the internal state of the application node +/// Contains information about current height, round, proposals and blocks pub struct State { pub current_height: Height, pub current_round: Round, @@ -37,6 +41,7 @@ pub struct State { } impl State { + /// Creates a new State instance with the given validator address and starting height pub fn new(address: Address, height: Height) -> Self { Self { earliest_height: height, @@ -52,10 +57,13 @@ impl State { } } + /// Returns the earliest height available in the state pub fn get_earliest_height(&self) -> Height { self.earliest_height } + /// Processes and adds a new proposal to the state if it's valid + /// Returns Some(ProposedValue) if the proposal was accepted, None otherwise pub fn add_proposal( &mut self, stream_message: StreamMessage, @@ -92,10 +100,13 @@ impl State { } } + /// Retrieves a decided block at the given height pub fn get_block(&self, height: &Height) -> Option<&DecidedValue> { self.blocks.get(height) } + /// Commits a block with the given certificate, updating internal state + /// and moving to the next height pub fn commit_block(&mut self, certificate: CommitCertificate) { // Sort out proposals for (height, value) in self.undecided_proposals.clone() { @@ -128,6 +139,7 @@ impl State { self.current_round = Round::new(0); } + /// Retrieves a previously built proposal value for the given height pub fn get_previously_built_value( &self, height: &Height, @@ -135,6 +147,8 @@ impl State { self.undecided_proposals.get(height) } + /// Creates a new proposal value for the given height + /// Returns either a previously built proposal or creates a new one pub fn propose_value(&mut self, height: &Height) -> ProposedValue { if let Some(proposal) = self.get_previously_built_value(height) { proposal.clone() @@ -161,6 +175,8 @@ impl State { } } + /// Creates a broadcast message containing a proposal part + /// Updates internal sequence number and current proposal pub fn create_broadcast_message( &mut self, value: LocallyProposedValue, From acdc5a74c02701b293e2b367d6ae1b88aed4b182 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 17 Dec 2024 12:31:41 +0100 Subject: [PATCH 16/19] Extract `run` function in its own module --- code/examples/channel/src/app.rs | 155 +++++++++++++++++++++++++++++ code/examples/channel/src/main.rs | 3 +- code/examples/channel/src/node.rs | 152 ++-------------------------- code/examples/channel/src/state.rs | 1 + 4 files changed, 165 insertions(+), 146 deletions(-) create mode 100644 code/examples/channel/src/app.rs diff --git a/code/examples/channel/src/app.rs b/code/examples/channel/src/app.rs new file mode 100644 index 000000000..da9f739f0 --- /dev/null +++ b/code/examples/channel/src/app.rs @@ -0,0 +1,155 @@ +use eyre::eyre; +use tracing::{debug, error}; + +use malachite_app_channel::app::host::LocallyProposedValue; +use malachite_app_channel::app::types::core::{Round, Validity}; +use malachite_app_channel::app::types::ProposedValue; +use malachite_app_channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg}; +use malachite_test::{Genesis, TestContext}; + +use crate::state::{decode_value, State}; + +pub async fn run( + genesis: Genesis, + state: &mut State, + channels: &mut Channels, +) -> eyre::Result<()> { + while let Some(msg) = channels.consensus.recv().await { + match msg { + AppMsg::ConsensusReady { reply_to } => { + debug!("Consensus is ready"); + + if reply_to + .send(ConsensusMsg::StartHeight( + state.current_height, + genesis.validator_set.clone(), + )) + .is_err() + { + error!("Failed to send ConsensusReady reply"); + } + } + + AppMsg::StartedRound { + height, + round, + proposer, + } => { + state.current_height = height; + state.current_round = round; + state.current_proposer = Some(proposer); + } + + AppMsg::GetValue { + height, + round: _, + timeout_duration: _, + address: _, + reply_to, + } => { + let proposal = state.propose_value(&height); + + let value = LocallyProposedValue::new( + proposal.height, + proposal.round, + proposal.value, + proposal.extension, + ); + + // Send it to consensus + if reply_to.send(value.clone()).is_err() { + error!("Failed to send GetValue reply"); + } + + let stream_message = state.create_broadcast_message(value); + + // Broadcast it to others. Old messages need not be broadcast. + channels + .network + .send(NetworkMsg::PublishProposalPart(stream_message)) + .await?; + } + + AppMsg::GetEarliestBlockHeight { reply_to } => { + if reply_to.send(state.get_earliest_height()).is_err() { + error!("Failed to send GetEarliestBlockHeight reply"); + } + } + + AppMsg::ReceivedProposalPart { + from: _, + part, + reply_to, + } => { + if let Some(proposed_value) = state.add_proposal(part) { + if reply_to.send(proposed_value).is_err() { + error!("Failed to send ReceivedProposalPart reply"); + } + } + } + + AppMsg::GetValidatorSet { + height: _, + reply_to, + } => { + if reply_to.send(genesis.validator_set.clone()).is_err() { + error!("Failed to send GetValidatorSet reply"); + } + } + + AppMsg::Decided { + certificate, + reply_to, + } => { + state.commit_block(certificate); + if reply_to + .send(ConsensusMsg::StartHeight( + state.current_height, + genesis.validator_set.clone(), + )) + .is_err() + { + error!("Failed to send Decided reply"); + } + } + + AppMsg::GetDecidedBlock { height, reply_to } => { + let block = state.get_block(&height).cloned(); + if reply_to.send(block).is_err() { + error!("Failed to send GetDecidedBlock reply"); + } + } + + AppMsg::ProcessSyncedValue { + height, + round, + validator_address, + value_bytes, + reply_to, + } => { + let value = decode_value(value_bytes); + + if reply_to + .send(ProposedValue { + height, + round, + valid_round: Round::Nil, + validator_address, + value, + validity: Validity::Valid, + extension: None, + }) + .is_err() + { + error!("Failed to send ProcessSyncedBlock reply"); + } + } + + AppMsg::RestreamValue { .. } => { + unimplemented!("RestreamValue"); + } + } + } + + Err(eyre!("Consensus channel closed unexpectedly")) +} diff --git a/code/examples/channel/src/main.rs b/code/examples/channel/src/main.rs index c42c30aae..2c27dfdd5 100644 --- a/code/examples/channel/src/main.rs +++ b/code/examples/channel/src/main.rs @@ -1,5 +1,6 @@ //! Example application using channels +mod app; mod node; mod state; @@ -11,7 +12,7 @@ use node::App; use tracing::{error, info, trace}; /// Main entry point for the application -/// +/// /// This function: /// - Parses command-line arguments /// - Loads configuration from file diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index be70d33f5..9f0da976d 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -4,23 +4,21 @@ use std::path::{Path, PathBuf}; use async_trait::async_trait; -use eyre::eyre; use libp2p_identity::Keypair; use rand::{CryptoRng, RngCore}; -use tracing::{debug, error}; -use malachite_app_channel::app::consensus::ProposedValue; -use malachite_app_channel::app::types::core::{Round, Validity, VotingPower}; -use malachite_app_channel::app::types::LocallyProposedValue; +use malachite_app_channel::app::types::config::Config; +use malachite_app_channel::app::types::core::VotingPower; use malachite_app_channel::app::Node; -use malachite_app_channel::{AppMsg, ConsensusMsg, NetworkMsg}; + +// Use the same types used for integration tests. +// A real application would use its own types and context instead. use malachite_test::codec::proto::ProtobufCodec; use malachite_test::{ Address, Genesis, Height, PrivateKey, PublicKey, TestContext, Validator, ValidatorSet, }; -use malachite_test_cli::config::Config; -use crate::state::{decode_value, State}; +use crate::state::State; /// Main application struct implementing the consensus node functionality #[derive(Clone)] @@ -121,142 +119,6 @@ impl Node for App { let mut state = State::new(address, start_height.unwrap_or_default()); - while let Some(msg) = channels.consensus.recv().await { - match msg { - AppMsg::ConsensusReady { reply_to } => { - debug!("Consensus is ready to run"); - if reply_to - .send(ConsensusMsg::StartHeight( - state.current_height, - genesis.validator_set.clone(), - )) - .is_err() - { - error!("Failed to send ConsensusReady reply"); - } - } - - AppMsg::StartedRound { - height, - round, - proposer, - } => { - state.current_height = height; - state.current_round = round; - state.current_proposer = Some(proposer); - } - - AppMsg::GetValue { - height, - round: _, - timeout_duration: _, - address: _, - reply_to, - } => { - let proposal = state.propose_value(&height); - - let value = LocallyProposedValue::new( - proposal.height, - proposal.round, - proposal.value, - proposal.extension, - ); - - // Send it to consensus - if reply_to.send(value.clone()).is_err() { - error!("Failed to send GetValue reply"); - } - - let stream_message = state.create_broadcast_message(value); - - // Broadcast it to others. Old messages need not be broadcast. - channels - .network - .send(NetworkMsg::PublishProposalPart(stream_message)) - .await?; - } - - AppMsg::GetEarliestBlockHeight { reply_to } => { - if reply_to.send(state.get_earliest_height()).is_err() { - error!("Failed to send GetEarliestBlockHeight reply"); - } - } - - AppMsg::ReceivedProposalPart { - from: _, - part, - reply_to, - } => { - if let Some(proposed_value) = state.add_proposal(part) { - if reply_to.send(proposed_value).is_err() { - error!("Failed to send ReceivedProposalPart reply"); - } - } - } - - AppMsg::GetValidatorSet { - height: _, - reply_to, - } => { - if reply_to.send(genesis.validator_set.clone()).is_err() { - error!("Failed to send GetValidatorSet reply"); - } - } - - AppMsg::Decided { - certificate, - reply_to, - } => { - state.commit_block(certificate); - if reply_to - .send(ConsensusMsg::StartHeight( - state.current_height, - genesis.validator_set.clone(), - )) - .is_err() - { - error!("Failed to send Decided reply"); - } - } - - AppMsg::GetDecidedBlock { height, reply_to } => { - let block = state.get_block(&height).cloned(); - if reply_to.send(block).is_err() { - error!("Failed to send GetDecidedBlock reply"); - } - } - - AppMsg::ProcessSyncedValue { - height, - round, - validator_address, - value_bytes, - reply_to, - } => { - let value = decode_value(value_bytes); - - if reply_to - .send(ProposedValue { - height, - round, - valid_round: Round::Nil, - validator_address, - value, - validity: Validity::Valid, - extension: None, - }) - .is_err() - { - error!("Failed to send ProcessSyncedBlock reply"); - } - } - - AppMsg::RestreamValue { .. } => { - unimplemented!("RestreamValue"); - } - } - } - - Err(eyre!("Consensus channel closed unexpectedly")) + crate::app::run(genesis, &mut state, &mut channels).await } } diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index 74bd521b7..c0f846c29 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -31,6 +31,7 @@ pub struct State { pub current_height: Height, pub current_round: Round, pub current_proposer: Option
, + earliest_height: Height, address: Address, sequence: u64, From 0d5a9bedf5ebcb9fbfcd395502148c5169ca156b Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 17 Dec 2024 12:57:23 +0100 Subject: [PATCH 17/19] Remove `GetValue::address` field --- code/crates/engine/src/consensus.rs | 9 ++++----- code/crates/engine/src/host.rs | 1 - code/crates/starknet/host/src/actor.rs | 1 - 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/code/crates/engine/src/consensus.rs b/code/crates/engine/src/consensus.rs index eb7c8169a..2a2707957 100644 --- a/code/crates/engine/src/consensus.rs +++ b/code/crates/engine/src/consensus.rs @@ -668,17 +668,16 @@ where myself: &ActorRef>, height: Ctx::Height, round: Round, - timeout_duration: Duration, + timeout: Duration, ) -> Result<(), ActorProcessingErr> { // Call `GetValue` on the Host actor, and forward the reply // to the current actor, wrapping it in `Msg::ProposeValue`. self.host.call_and_forward( - |reply| HostMsg::GetValue { + |reply_to| HostMsg::GetValue { height, round, - timeout: timeout_duration, - address: self.params.address.clone(), - reply_to: reply, + timeout, + reply_to, }, myself, |proposed: LocallyProposedValue| { diff --git a/code/crates/engine/src/host.rs b/code/crates/engine/src/host.rs index e272de7a6..1a3173b1d 100644 --- a/code/crates/engine/src/host.rs +++ b/code/crates/engine/src/host.rs @@ -62,7 +62,6 @@ pub enum HostMsg { height: Ctx::Height, round: Round, timeout: Duration, - address: Ctx::Address, reply_to: RpcReplyPort>, }, diff --git a/code/crates/starknet/host/src/actor.rs b/code/crates/starknet/host/src/actor.rs index bc9d28616..c25ac25ff 100644 --- a/code/crates/starknet/host/src/actor.rs +++ b/code/crates/starknet/host/src/actor.rs @@ -134,7 +134,6 @@ impl Host { height, round, timeout, - address: _, reply_to, } => on_get_value(state, &self.network, height, round, timeout, reply_to).await, From 30de7eafffdec78fd942fd80274db4b594a8f780 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 17 Dec 2024 12:57:31 +0100 Subject: [PATCH 18/19] Cleanup --- code/crates/app-channel/src/channel.rs | 27 ++++++------ code/crates/app-channel/src/connector.rs | 53 ++++++++--------------- code/crates/app-channel/src/lib.rs | 2 +- code/examples/channel/src/app.rs | 55 ++++++++++++------------ code/examples/channel/src/state.rs | 2 +- 5 files changed, 59 insertions(+), 80 deletions(-) diff --git a/code/crates/app-channel/src/channel.rs b/code/crates/app-channel/src/channel.rs index f972949db..a73767fe7 100644 --- a/code/crates/app-channel/src/channel.rs +++ b/code/crates/app-channel/src/channel.rs @@ -13,6 +13,8 @@ use crate::app::types::streaming::StreamMessage; use crate::app::types::sync::DecidedValue; use crate::app::types::{LocallyProposedValue, PeerId, ProposedValue}; +pub type Reply = oneshot::Sender; + /// Channels created for application consumption pub struct Channels { pub consensus: mpsc::Receiver>, @@ -23,9 +25,7 @@ pub struct Channels { #[derive_where(Debug)] pub enum AppMsg { /// Consensus is ready - ConsensusReady { - reply_to: oneshot::Sender>, - }, + ConsensusReady { reply: Reply> }, /// Consensus has started a new round. StartedRound { @@ -38,9 +38,8 @@ pub enum AppMsg { GetValue { height: Ctx::Height, round: Round, - timeout_duration: Duration, - address: Ctx::Address, - reply_to: oneshot::Sender>, + timeout: Duration, + reply: Reply>, }, /// Request to restream an existing block/value from Driver @@ -53,33 +52,31 @@ pub enum AppMsg { }, /// Request the earliest block height in the block store - GetEarliestBlockHeight { - reply_to: oneshot::Sender, - }, + GetEarliestBlockHeight { reply: Reply }, /// ProposalPart received <-- consensus <-- gossip ReceivedProposalPart { from: PeerId, part: StreamMessage, - reply_to: oneshot::Sender>, + reply: Reply>, }, /// Get the validator set at a given height GetValidatorSet { height: Ctx::Height, - reply_to: oneshot::Sender, + reply: Reply, }, // Consensus has decided on a value Decided { certificate: CommitCertificate, - reply_to: oneshot::Sender>, + reply: Reply>, }, // Retrieve decided block from the block store - GetDecidedBlock { + GetDecidedValue { height: Ctx::Height, - reply_to: oneshot::Sender>>, + reply: Reply>>, }, // Synced block @@ -88,7 +85,7 @@ pub enum AppMsg { round: Round, validator_address: Ctx::Address, value_bytes: Bytes, - reply_to: oneshot::Sender>, + reply: Reply>, }, } diff --git a/code/crates/app-channel/src/connector.rs b/code/crates/app-channel/src/connector.rs index 6a8186bad..264767912 100644 --- a/code/crates/app-channel/src/connector.rs +++ b/code/crates/app-channel/src/connector.rs @@ -53,11 +53,9 @@ where ) -> Result<(), ActorProcessingErr> { match msg { HostMsg::ConsensusReady(consensus_ref) => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); - self.sender - .send(AppMsg::ConsensusReady { reply_to: tx }) - .await?; + self.sender.send(AppMsg::ConsensusReady { reply }).await?; consensus_ref.cast(rx.await?.into())?; } @@ -79,19 +77,17 @@ where HostMsg::GetValue { height, round, - timeout: timeout_duration, - address, + timeout, reply_to, } => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); self.sender .send(AppMsg::GetValue { height, round, - timeout_duration, - address, - reply_to: tx, + timeout, + reply, }) .await?; @@ -117,10 +113,10 @@ where } HostMsg::GetEarliestBlockHeight { reply_to } => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); self.sender - .send(AppMsg::GetEarliestBlockHeight { reply_to: tx }) + .send(AppMsg::GetEarliestBlockHeight { reply }) .await?; reply_to.send(rx.await?)?; @@ -131,27 +127,20 @@ where part, reply_to, } => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); self.sender - .send(AppMsg::ReceivedProposalPart { - from, - part, - reply_to: tx, - }) + .send(AppMsg::ReceivedProposalPart { from, part, reply }) .await?; reply_to.send(rx.await?)?; } HostMsg::GetValidatorSet { height, reply_to } => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); self.sender - .send(AppMsg::GetValidatorSet { - height, - reply_to: tx, - }) + .send(AppMsg::GetValidatorSet { height, reply }) .await?; reply_to.send(rx.await?)?; @@ -161,26 +150,20 @@ where certificate, consensus: consensus_ref, } => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); self.sender - .send(AppMsg::Decided { - certificate, - reply_to: tx, - }) + .send(AppMsg::Decided { certificate, reply }) .await?; consensus_ref.cast(rx.await?.into())?; } HostMsg::GetDecidedValue { height, reply_to } => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); self.sender - .send(AppMsg::GetDecidedBlock { - height, - reply_to: tx, - }) + .send(AppMsg::GetDecidedValue { height, reply }) .await?; reply_to.send(rx.await?)?; @@ -193,7 +176,7 @@ where value_bytes, reply_to, } => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); self.sender .send(AppMsg::ProcessSyncedValue { @@ -201,7 +184,7 @@ where round, validator_address, value_bytes, - reply_to: tx, + reply, }) .await?; diff --git a/code/crates/app-channel/src/lib.rs b/code/crates/app-channel/src/lib.rs index 9aaadad78..4ac58aaf1 100644 --- a/code/crates/app-channel/src/lib.rs +++ b/code/crates/app-channel/src/lib.rs @@ -14,7 +14,7 @@ pub mod connector; pub mod spawn; mod channel; -pub use channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg}; +pub use channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg, Reply}; mod run; pub use run::run; diff --git a/code/examples/channel/src/app.rs b/code/examples/channel/src/app.rs index da9f739f0..eb302109a 100644 --- a/code/examples/channel/src/app.rs +++ b/code/examples/channel/src/app.rs @@ -1,5 +1,5 @@ use eyre::eyre; -use tracing::{debug, error}; +use tracing::{error, info}; use malachite_app_channel::app::host::LocallyProposedValue; use malachite_app_channel::app::types::core::{Round, Validity}; @@ -16,10 +16,10 @@ pub async fn run( ) -> eyre::Result<()> { while let Some(msg) = channels.consensus.recv().await { match msg { - AppMsg::ConsensusReady { reply_to } => { - debug!("Consensus is ready"); + AppMsg::ConsensusReady { reply } => { + info!("Consensus is ready"); - if reply_to + if reply .send(ConsensusMsg::StartHeight( state.current_height, genesis.validator_set.clone(), @@ -35,6 +35,8 @@ pub async fn run( round, proposer, } => { + info!(%height, %round, %proposer, "Started round"); + state.current_height = height; state.current_round = round; state.current_proposer = Some(proposer); @@ -42,11 +44,12 @@ pub async fn run( AppMsg::GetValue { height, - round: _, - timeout_duration: _, - address: _, - reply_to, + round, + timeout: _, + reply, } => { + info!(%height, %round, "Get value"); + let proposal = state.propose_value(&height); let value = LocallyProposedValue::new( @@ -57,7 +60,7 @@ pub async fn run( ); // Send it to consensus - if reply_to.send(value.clone()).is_err() { + if reply.send(value.clone()).is_err() { error!("Failed to send GetValue reply"); } @@ -70,8 +73,8 @@ pub async fn run( .await?; } - AppMsg::GetEarliestBlockHeight { reply_to } => { - if reply_to.send(state.get_earliest_height()).is_err() { + AppMsg::GetEarliestBlockHeight { reply } => { + if reply.send(state.get_earliest_height()).is_err() { error!("Failed to send GetEarliestBlockHeight reply"); } } @@ -79,30 +82,25 @@ pub async fn run( AppMsg::ReceivedProposalPart { from: _, part, - reply_to, + reply, } => { if let Some(proposed_value) = state.add_proposal(part) { - if reply_to.send(proposed_value).is_err() { + if reply.send(proposed_value).is_err() { error!("Failed to send ReceivedProposalPart reply"); } } } - AppMsg::GetValidatorSet { - height: _, - reply_to, - } => { - if reply_to.send(genesis.validator_set.clone()).is_err() { + AppMsg::GetValidatorSet { height: _, reply } => { + if reply.send(genesis.validator_set.clone()).is_err() { error!("Failed to send GetValidatorSet reply"); } } - AppMsg::Decided { - certificate, - reply_to, - } => { + AppMsg::Decided { certificate, reply } => { state.commit_block(certificate); - if reply_to + + if reply .send(ConsensusMsg::StartHeight( state.current_height, genesis.validator_set.clone(), @@ -113,9 +111,10 @@ pub async fn run( } } - AppMsg::GetDecidedBlock { height, reply_to } => { - let block = state.get_block(&height).cloned(); - if reply_to.send(block).is_err() { + AppMsg::GetDecidedValue { height, reply } => { + let decided_value = state.get_decided_value(&height).cloned(); + + if reply.send(decided_value).is_err() { error!("Failed to send GetDecidedBlock reply"); } } @@ -125,11 +124,11 @@ pub async fn run( round, validator_address, value_bytes, - reply_to, + reply, } => { let value = decode_value(value_bytes); - if reply_to + if reply .send(ProposedValue { height, round, diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index c0f846c29..4b0fb9383 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -102,7 +102,7 @@ impl State { } /// Retrieves a decided block at the given height - pub fn get_block(&self, height: &Height) -> Option<&DecidedValue> { + pub fn get_decided_value(&self, height: &Height) -> Option<&DecidedValue> { self.blocks.get(height) } From e7e4b20844736debac3cf2063afcc084438efb84 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 17 Dec 2024 12:59:02 +0100 Subject: [PATCH 19/19] Rename `GetEarliestBlockHeight` to `GetHistoryMinHeight` --- code/crates/app-channel/src/channel.rs | 2 +- code/crates/app-channel/src/connector.rs | 4 ++-- code/crates/engine/src/consensus.rs | 2 +- code/crates/engine/src/host.rs | 2 +- code/crates/engine/src/sync.rs | 4 ++-- code/crates/starknet/host/src/actor.rs | 4 +--- code/examples/channel/src/app.rs | 4 ++-- 7 files changed, 10 insertions(+), 12 deletions(-) diff --git a/code/crates/app-channel/src/channel.rs b/code/crates/app-channel/src/channel.rs index a73767fe7..7c1cab637 100644 --- a/code/crates/app-channel/src/channel.rs +++ b/code/crates/app-channel/src/channel.rs @@ -52,7 +52,7 @@ pub enum AppMsg { }, /// Request the earliest block height in the block store - GetEarliestBlockHeight { reply: Reply }, + GetHistoryMinHeight { reply: Reply }, /// ProposalPart received <-- consensus <-- gossip ReceivedProposalPart { diff --git a/code/crates/app-channel/src/connector.rs b/code/crates/app-channel/src/connector.rs index 264767912..62a6b9eb9 100644 --- a/code/crates/app-channel/src/connector.rs +++ b/code/crates/app-channel/src/connector.rs @@ -112,11 +112,11 @@ where .await? } - HostMsg::GetEarliestBlockHeight { reply_to } => { + HostMsg::GetHistoryMinHeight { reply_to } => { let (reply, rx) = oneshot::channel(); self.sender - .send(AppMsg::GetEarliestBlockHeight { reply }) + .send(AppMsg::GetHistoryMinHeight { reply }) .await?; reply_to.send(rx.await?)?; diff --git a/code/crates/engine/src/consensus.rs b/code/crates/engine/src/consensus.rs index 2a2707957..8bd63e3bd 100644 --- a/code/crates/engine/src/consensus.rs +++ b/code/crates/engine/src/consensus.rs @@ -708,7 +708,7 @@ where } async fn get_history_min_height(&self) -> Result { - ractor::call!(self.host, |reply_to| HostMsg::GetEarliestBlockHeight { + ractor::call!(self.host, |reply_to| HostMsg::GetHistoryMinHeight { reply_to }) .map_err(|e| eyre!("Failed to get earliest block height: {e:?}").into()) diff --git a/code/crates/engine/src/host.rs b/code/crates/engine/src/host.rs index 1a3173b1d..c8e5569fc 100644 --- a/code/crates/engine/src/host.rs +++ b/code/crates/engine/src/host.rs @@ -75,7 +75,7 @@ pub enum HostMsg { }, /// Request the earliest block height in the block store - GetEarliestBlockHeight { reply_to: RpcReplyPort }, + GetHistoryMinHeight { reply_to: RpcReplyPort }, /// ProposalPart received <-- consensus <-- gossip ReceivedProposalPart { diff --git a/code/crates/engine/src/sync.rs b/code/crates/engine/src/sync.rs index 1577b9a84..fe594d84b 100644 --- a/code/crates/engine/src/sync.rs +++ b/code/crates/engine/src/sync.rs @@ -204,10 +204,10 @@ where } async fn get_history_min_height(&self) -> Result { - ractor::call!(self.host, |reply_to| HostMsg::GetEarliestBlockHeight { + ractor::call!(self.host, |reply_to| HostMsg::GetHistoryMinHeight { reply_to }) - .map_err(|e| eyre!("Failed to get earliest block height: {e:?}").into()) + .map_err(|e| eyre!("Failed to get earliest history height: {e:?}").into()) } async fn handle_effect( diff --git a/code/crates/starknet/host/src/actor.rs b/code/crates/starknet/host/src/actor.rs index c25ac25ff..f60f5ec88 100644 --- a/code/crates/starknet/host/src/actor.rs +++ b/code/crates/starknet/host/src/actor.rs @@ -126,9 +126,7 @@ impl Host { proposer, } => on_started_round(state, height, round, proposer).await, - HostMsg::GetEarliestBlockHeight { reply_to } => { - on_get_history_min_height(state, reply_to) - } + HostMsg::GetHistoryMinHeight { reply_to } => on_get_history_min_height(state, reply_to), HostMsg::GetValue { height, diff --git a/code/examples/channel/src/app.rs b/code/examples/channel/src/app.rs index eb302109a..3800a4965 100644 --- a/code/examples/channel/src/app.rs +++ b/code/examples/channel/src/app.rs @@ -73,9 +73,9 @@ pub async fn run( .await?; } - AppMsg::GetEarliestBlockHeight { reply } => { + AppMsg::GetHistoryMinHeight { reply } => { if reply.send(state.get_earliest_height()).is_err() { - error!("Failed to send GetEarliestBlockHeight reply"); + error!("Failed to send GetHistoryMinHeight reply"); } }