From 47f994c025dfbc12238eeb021350d6e60162dcfc Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 10 Jan 2025 10:43:14 +0100 Subject: [PATCH 1/6] feat(test): Add test app based on the example app --- code/Cargo.lock | 54 ++ code/Cargo.toml | 7 +- code/crates/app-channel/src/lib.rs | 2 +- code/crates/app-channel/src/msgs.rs | 3 + code/crates/app-channel/src/run.rs | 61 +- code/crates/app-channel/src/spawn.rs | 3 +- code/crates/app/Cargo.toml | 8 +- code/crates/app/src/lib.rs | 6 +- code/crates/app/src/node.rs | 2 +- code/crates/app/src/spawn.rs | 28 + code/crates/engine/src/node.rs | 9 +- code/crates/engine/src/util/events.rs | 1 + code/crates/starknet/host/src/node.rs | 11 +- code/crates/starknet/host/src/spawn.rs | 12 +- code/crates/starknet/test/src/lib.rs | 25 +- code/crates/test/Cargo.toml | 8 + code/crates/test/app/Cargo.toml | 34 + code/crates/test/app/src/app.rs | 253 ++++++ code/crates/test/app/src/lib.rs | 5 + code/crates/test/app/src/node.rs | 153 ++++ code/crates/test/app/src/state.rs | 354 ++++++++ code/crates/test/app/src/store.rs | 308 +++++++ code/crates/test/app/src/store/keys.rs | 83 ++ code/crates/test/app/src/streaming.rs | 136 ++++ code/crates/test/framework/Cargo.toml | 32 + code/crates/test/framework/src/lib.rs | 770 ++++++++++++++++++ code/crates/test/mempool/src/lib.rs | 2 +- code/crates/test/src/node.rs | 7 +- code/crates/test/tests/it/main.rs | 7 + code/crates/test/tests/it/n3f0.rs | 18 + .../test/tests/it/n3f0_consensus_mode.rs | 50 ++ .../test/tests/it/n3f0_pubsub_protocol.rs | 77 ++ code/crates/test/tests/it/n3f1.rs | 113 +++ code/crates/test/tests/it/value_sync.rs | 202 +++++ code/crates/test/tests/it/vote_sync.rs | 137 ++++ code/crates/test/tests/it/wal.rs | 195 +++++ code/examples/channel/src/node.rs | 12 +- 37 files changed, 3100 insertions(+), 88 deletions(-) create mode 100644 code/crates/test/app/Cargo.toml create mode 100644 code/crates/test/app/src/app.rs create mode 100644 code/crates/test/app/src/lib.rs create mode 100644 code/crates/test/app/src/node.rs create mode 100644 code/crates/test/app/src/state.rs create mode 100644 code/crates/test/app/src/store.rs create mode 100644 code/crates/test/app/src/store/keys.rs create mode 100644 code/crates/test/app/src/streaming.rs create mode 100644 code/crates/test/framework/Cargo.toml create mode 100644 code/crates/test/framework/src/lib.rs create mode 100644 code/crates/test/tests/it/main.rs create mode 100644 code/crates/test/tests/it/n3f0.rs create mode 100644 code/crates/test/tests/it/n3f0_consensus_mode.rs create mode 100644 code/crates/test/tests/it/n3f0_pubsub_protocol.rs create mode 100644 code/crates/test/tests/it/n3f1.rs create mode 100644 code/crates/test/tests/it/value_sync.rs create mode 100644 code/crates/test/tests/it/vote_sync.rs create mode 100644 code/crates/test/tests/it/wal.rs diff --git a/code/Cargo.lock b/code/Cargo.lock index 993505705..a3d6df6bb 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -2094,8 +2094,10 @@ dependencies = [ "informalsystems-malachitebft-peer", "informalsystems-malachitebft-sync", "libp2p-identity", + "ractor", "rand", "serde", + "tokio", "tracing", ] @@ -2454,6 +2456,7 @@ dependencies = [ "async-trait", "base64 0.22.1", "bytes", + "bytesize", "ed25519-consensus", "eyre", "hex", @@ -2466,6 +2469,8 @@ dependencies = [ "informalsystems-malachitebft-proto", "informalsystems-malachitebft-signing-ed25519", "informalsystems-malachitebft-sync", + "informalsystems-malachitebft-test-app", + "informalsystems-malachitebft-test-framework", "prost", "prost-build", "prost-types", @@ -2474,6 +2479,32 @@ dependencies = [ "serde_json", "sha3", "signature", + "tokio", + "tracing", +] + +[[package]] +name = "informalsystems-malachitebft-test-app" +version = "0.0.1" +dependencies = [ + "async-trait", + "bytes", + "color-eyre", + "derive-where", + "eyre", + "informalsystems-malachitebft-app-channel", + "informalsystems-malachitebft-proto", + "informalsystems-malachitebft-test", + "informalsystems-malachitebft-test-cli", + "informalsystems-malachitebft-test-framework", + "prost", + "rand", + "redb", + "serde_json", + "sha3", + "thiserror 2.0.9", + "tokio", + "tracing", ] [[package]] @@ -2500,6 +2531,29 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "informalsystems-malachitebft-test-framework" +version = "0.0.1" +dependencies = [ + "axum", + "bytesize", + "eyre", + "informalsystems-malachitebft-app-channel", + "informalsystems-malachitebft-config", + "informalsystems-malachitebft-core-consensus", + "informalsystems-malachitebft-core-types", + "informalsystems-malachitebft-engine", + "informalsystems-malachitebft-metrics", + "informalsystems-malachitebft-test", + "informalsystems-malachitebft-test-app", + "ractor", + "rand", + "tempfile", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "informalsystems-malachitebft-test-mbt" version = "0.0.1" diff --git a/code/Cargo.toml b/code/Cargo.toml index f63950068..0a6e7386c 100644 --- a/code/Cargo.toml +++ b/code/Cargo.toml @@ -24,14 +24,15 @@ members = [ # Test "crates/test", + "crates/test/app", "crates/test/cli", "crates/test/mbt", "crates/test/mempool", + "crates/test/framework", "crates/network/test", # Starknet "crates/starknet/*", - "crates/starknet/*", # Examples "examples/channel", @@ -64,7 +65,6 @@ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage_nightly)'] } malachitebft-engine = { version = "0.0.1", package = "informalsystems-malachitebft-engine", path = "crates/engine" } malachitebft-app = { version = "0.0.1", package = "informalsystems-malachitebft-app", path = "crates/app" } malachitebft-app-channel = { version = "0.0.1", package = "informalsystems-malachitebft-app-channel", path = "crates/app-channel" } -malachitebft-test-cli = { version = "0.0.1", package = "informalsystems-malachitebft-test-cli", path = "crates/test/cli" } malachitebft-codec = { version = "0.0.1", package = "informalsystems-malachitebft-codec", path = "crates/codec" } malachitebft-config = { version = "0.0.1", package = "informalsystems-malachitebft-config", path = "crates/config" } malachitebft-core-consensus = { version = "0.0.1", package = "informalsystems-malachitebft-core-consensus", path = "crates/core-consensus" } @@ -83,8 +83,11 @@ malachitebft-wal = { version = "0.0.1", package = "informalsystem # Test malachitebft-test = { version = "0.0.1", package = "informalsystems-malachitebft-test", path = "crates/test" } +malachitebft-test-app = { version = "0.0.1", package = "informalsystems-malachitebft-test-app", path = "crates/test/app" } +malachitebft-test-cli = { version = "0.0.1", package = "informalsystems-malachitebft-test-cli", path = "crates/test/cli" } malachitebft-test-mbt = { version = "0.0.1", package = "informalsystems-malachitebft-test-mbt", path = "crates/test/mbt" } malachitebft-test-mempool = { version = "0.0.1", package = "informalsystems-malachitebft-test-mempool", path = "crates/test/mempool" } +malachitebft-test-framework = { version = "0.0.1", package = "informalsystems-malachitebft-test-framework", path = "crates/test/framework" } malachitebft-discovery-test = { version = "0.0.1", package = "informalsystems-malachitebft-discovery-test", path = "crates/network/test" } # Starknet diff --git a/code/crates/app-channel/src/lib.rs b/code/crates/app-channel/src/lib.rs index 37c708f1c..f382394c6 100644 --- a/code/crates/app-channel/src/lib.rs +++ b/code/crates/app-channel/src/lib.rs @@ -19,4 +19,4 @@ mod msgs; pub use msgs::{AppMsg, Channels, ConsensusMsg, NetworkMsg, Reply}; mod run; -pub use run::run; +pub use run::{start_engine, EngineHandle}; diff --git a/code/crates/app-channel/src/msgs.rs b/code/crates/app-channel/src/msgs.rs index 85196f08a..b610040a5 100644 --- a/code/crates/app-channel/src/msgs.rs +++ b/code/crates/app-channel/src/msgs.rs @@ -2,6 +2,7 @@ use std::time::Duration; use bytes::Bytes; use derive_where::derive_where; +use malachitebft_engine::util::events::TxEvent; use tokio::sync::mpsc; use tokio::sync::oneshot; @@ -21,6 +22,8 @@ pub struct Channels { pub consensus: mpsc::Receiver>, /// Channel for sending messages to the networking layer pub network: mpsc::Sender>, + /// Receiver of events, call `subscribe` to receive them + pub events: TxEvent, } /// Messages sent from consensus to the application. diff --git a/code/crates/app-channel/src/run.rs b/code/crates/app-channel/src/run.rs index 44cd85117..921575491 100644 --- a/code/crates/app-channel/src/run.rs +++ b/code/crates/app-channel/src/run.rs @@ -1,30 +1,37 @@ //! Run Malachite consensus with the given configuration and context. //! Provides the application with a channel for receiving messages from consensus. -use std::path::PathBuf; - use eyre::Result; +use malachitebft_app::spawn::{ + spawn_consensus_actor, spawn_node_actor, spawn_sync_actor, spawn_wal_actor, +}; +use malachitebft_engine::node::NodeRef; +use malachitebft_engine::util::events::TxEvent; +use tokio::task::JoinHandle; + +use crate::app; use crate::app::types::codec::{ConsensusCodec, SyncCodec, WalCodec}; use crate::app::types::config::Config as NodeConfig; use crate::app::types::core::Context; use crate::app::types::metrics::{Metrics, SharedRegistry}; use crate::spawn::{spawn_host_actor, spawn_network_actor}; -use crate::{app, Channels}; +use crate::Channels; -use malachitebft_app::{spawn_consensus_actor, spawn_sync_actor, spawn_wal_actor}; -use malachitebft_engine::util::events::TxEvent; +pub struct EngineHandle { + pub actor: NodeRef, + pub handle: JoinHandle<()>, +} #[tracing::instrument("node", skip_all, fields(moniker = %cfg.moniker))] -pub async fn run( +pub async fn start_engine( ctx: Ctx, codec: Codec, node: Node, cfg: NodeConfig, - private_key_file: PathBuf, start_height: Option, initial_validator_set: Ctx::ValidatorSet, -) -> Result> +) -> Result<(Channels, EngineHandle)> where Ctx: Context, Node: app::Node, @@ -37,20 +44,20 @@ where let registry = SharedRegistry::global().with_moniker(cfg.moniker.as_str()); let metrics = Metrics::register(®istry); - let private_key_file = node.load_private_key_file(private_key_file)?; + let private_key_file = node.load_private_key_file()?; let private_key = node.load_private_key(private_key_file); let public_key = node.get_public_key(&private_key); let address = node.get_address(&public_key); let keypair = node.get_keypair(private_key); // Spawn consensus gossip - let (network, network_tx) = + let (network, tx_network) = spawn_network_actor(&cfg, keypair, ®istry, codec.clone()).await?; let wal = spawn_wal_actor(&ctx, codec, &node.get_home_dir(), ®istry).await?; // Spawn the host actor - let (connector, consensus_rx) = spawn_host_actor(metrics.clone()).await?; + let (connector, rx_consensus) = spawn_host_actor(metrics.clone()).await?; let sync = spawn_sync_actor( ctx.clone(), @@ -61,24 +68,36 @@ where ) .await?; + let tx_event = TxEvent::new(); + // Spawn consensus - let _consensus = spawn_consensus_actor( + let consensus = spawn_consensus_actor( start_height, initial_validator_set, address, - ctx, + ctx.clone(), cfg, - network, - connector, - wal, + network.clone(), + connector.clone(), + wal.clone(), sync.clone(), metrics, - TxEvent::new(), + tx_event.clone(), ) .await?; - Ok(Channels { - consensus: consensus_rx, - network: network_tx, - }) + let (node, handle) = spawn_node_actor(ctx, network, consensus, wal, sync, connector).await?; + + let channels = Channels { + consensus: rx_consensus, + network: tx_network, + events: tx_event, + }; + + let handle = EngineHandle { + actor: node, + handle, + }; + + Ok((channels, handle)) } diff --git a/code/crates/app-channel/src/spawn.rs b/code/crates/app-channel/src/spawn.rs index b93521a3f..39889f47d 100644 --- a/code/crates/app-channel/src/spawn.rs +++ b/code/crates/app-channel/src/spawn.rs @@ -3,6 +3,7 @@ use eyre::Result; use tokio::sync::mpsc; +use malachitebft_app as app; use malachitebft_app::types::metrics::SharedRegistry; use malachitebft_app::types::Keypair; use malachitebft_config::Config as NodeConfig; @@ -40,7 +41,7 @@ where { let (tx, mut rx) = mpsc::channel::>(1); - let actor_ref = malachitebft_app::spawn_network_actor(cfg, keypair, registry, codec).await?; + let actor_ref = app::spawn::spawn_network_actor(cfg, keypair, registry, codec).await?; tokio::spawn({ let actor_ref = actor_ref.clone(); diff --git a/code/crates/app/Cargo.toml b/code/crates/app/Cargo.toml index 12f005e71..baa545002 100644 --- a/code/crates/app/Cargo.toml +++ b/code/crates/app/Cargo.toml @@ -13,13 +13,13 @@ readme = "../../../README.md" all-features = true [dependencies] -malachitebft-engine.workspace = true malachitebft-codec.workspace = true -malachitebft-core-types.workspace = true malachitebft-config.workspace = true malachitebft-core-consensus.workspace = true -malachitebft-network.workspace = true +malachitebft-core-types.workspace = true +malachitebft-engine.workspace = true malachitebft-metrics.workspace = true +malachitebft-network.workspace = true malachitebft-peer.workspace = true malachitebft-sync.workspace = true @@ -27,8 +27,10 @@ async-trait = { workspace = true } derive-where = { workspace = true } eyre = { workspace = true } libp2p-identity = { workspace = true } +ractor = { workspace = true } rand = { workspace = true } serde = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } [lints] diff --git a/code/crates/app/src/lib.rs b/code/crates/app/src/lib.rs index d37d86c5a..f76e9a4b9 100644 --- a/code/crates/app/src/lib.rs +++ b/code/crates/app/src/lib.rs @@ -12,10 +12,12 @@ mod node; pub use node::Node; pub mod part_store; +pub mod spawn; pub mod types; -mod spawn; -pub use spawn::{spawn_consensus_actor, spawn_network_actor, spawn_sync_actor, spawn_wal_actor}; +pub mod events { + pub use malachitebft_engine::util::events::TxEvent; +} pub mod streaming { pub use malachitebft_engine::util::streaming::*; diff --git a/code/crates/app/src/node.rs b/code/crates/app/src/node.rs index 8c359ccf7..6fb8153ac 100644 --- a/code/crates/app/src/node.rs +++ b/code/crates/app/src/node.rs @@ -29,7 +29,7 @@ pub trait Node { fn load_private_key(&self, file: Self::PrivateKeyFile) -> PrivateKey; - fn load_private_key_file(&self, path: impl AsRef) -> io::Result; + fn load_private_key_file(&self) -> io::Result; fn make_private_key_file(&self, private_key: PrivateKey) -> Self::PrivateKeyFile; diff --git a/code/crates/app/src/spawn.rs b/code/crates/app/src/spawn.rs index fdbb0df19..019e8f886 100644 --- a/code/crates/app/src/spawn.rs +++ b/code/crates/app/src/spawn.rs @@ -4,11 +4,13 @@ use std::path::Path; use std::time::Duration; use eyre::Result; +use tokio::task::JoinHandle; use tracing::Span; use malachitebft_engine::consensus::{Consensus, ConsensusCodec, ConsensusParams, ConsensusRef}; use malachitebft_engine::host::HostRef; use malachitebft_engine::network::{Network, NetworkRef}; +use malachitebft_engine::node::{Node, NodeRef}; use malachitebft_engine::sync::{Params as SyncParams, Sync, SyncCodec, SyncRef}; use malachitebft_engine::util::events::TxEvent; use malachitebft_engine::wal::{Wal, WalCodec, WalRef}; @@ -20,6 +22,32 @@ use crate::types::metrics::{Metrics, SharedRegistry}; use crate::types::sync; use crate::types::ValuePayload; +pub async fn spawn_node_actor( + ctx: Ctx, + network: NetworkRef, + consensus: ConsensusRef, + wal: WalRef, + sync: Option>, + host: HostRef, +) -> Result<(NodeRef, JoinHandle<()>)> +where + Ctx: Context, +{ + // Spawn the node actor + let node = Node::new( + ctx, + network, + consensus, + wal, + sync, + host, + tracing::Span::current(), + ); + + let (actor_ref, handle) = node.spawn().await?; + Ok((actor_ref, handle)) +} + pub async fn spawn_network_actor( cfg: &NodeConfig, keypair: Keypair, diff --git a/code/crates/engine/src/node.rs b/code/crates/engine/src/node.rs index c8d331873..db8f53d6a 100644 --- a/code/crates/engine/src/node.rs +++ b/code/crates/engine/src/node.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent}; +use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent}; use tokio::task::JoinHandle; use tracing::{error, info, warn}; @@ -20,9 +20,7 @@ pub struct Node { consensus: ConsensusRef, wal: WalRef, sync: Option>, - mempool: ActorCell, host: HostRef, - start_height: Ctx::Height, span: tracing::Span, } @@ -37,9 +35,7 @@ where consensus: ConsensusRef, wal: WalRef, sync: Option>, - mempool: ActorCell, host: HostRef, - start_height: Ctx::Height, span: tracing::Span, ) -> Self { Self { @@ -48,9 +44,7 @@ where consensus, wal, sync, - mempool, host, - start_height, span, } } @@ -77,7 +71,6 @@ where // Set ourselves as the supervisor of the other actors self.network.link(myself.get_cell()); self.consensus.link(myself.get_cell()); - self.mempool.link(myself.get_cell()); self.host.link(myself.get_cell()); self.wal.link(myself.get_cell()); diff --git a/code/crates/engine/src/util/events.rs b/code/crates/engine/src/util/events.rs index 82e739554..d83baa1a6 100644 --- a/code/crates/engine/src/util/events.rs +++ b/code/crates/engine/src/util/events.rs @@ -8,6 +8,7 @@ use malachitebft_core_types::{CommitCertificate, Context, Round, Timeout, ValueO pub type RxEvent = broadcast::Receiver>; +#[derive_where(Clone)] pub struct TxEvent { tx: broadcast::Sender>, } diff --git a/code/crates/starknet/host/src/node.rs b/code/crates/starknet/host/src/node.rs index cc155a9c0..a55ef603c 100644 --- a/code/crates/starknet/host/src/node.rs +++ b/code/crates/starknet/host/src/node.rs @@ -86,11 +86,8 @@ impl Node for StarknetNode { file.private_key } - fn load_private_key_file( - &self, - path: impl AsRef, - ) -> std::io::Result { - let private_key = std::fs::read_to_string(path)?; + fn load_private_key_file(&self) -> std::io::Result { + let private_key = std::fs::read_to_string(&self.private_key_file)?; serde_json::from_str(&private_key).map_err(|e| e.into()) } @@ -117,10 +114,8 @@ impl Node for StarknetNode { let span = tracing::error_span!("node", moniker = %self.config.moniker); let _enter = span.enter(); - let priv_key_file = self.load_private_key_file(self.private_key_file.clone())?; - + let priv_key_file = self.load_private_key_file()?; let private_key = self.load_private_key(priv_key_file); - let genesis = self.load_genesis(self.genesis_file.clone())?; let start_height = self.start_height.map(|height| Height::new(height, 1)); diff --git a/code/crates/starknet/host/src/spawn.rs b/code/crates/starknet/host/src/spawn.rs index b7eaac6b2..87000c1c3 100644 --- a/code/crates/starknet/host/src/spawn.rs +++ b/code/crates/starknet/host/src/spawn.rs @@ -98,17 +98,7 @@ pub async fn spawn_node_actor( .await; // Spawn the node actor - let node = Node::new( - ctx, - network, - consensus, - wal, - sync, - mempool.get_cell(), - host, - start_height, - span, - ); + let node = Node::new(ctx, network, consensus, wal, sync, host, span); let (actor_ref, handle) = node.spawn().await.unwrap(); diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs index c855eced8..72efe10ec 100644 --- a/code/crates/starknet/test/src/lib.rs +++ b/code/crates/starknet/test/src/lib.rs @@ -395,12 +395,10 @@ where { let validator_set = self.validator_set.clone(); - let home_dir = tempfile::TempDir::with_prefix(format!( - "informalsystems-malachitebft-starknet-test-{}", - self.id - )) - .unwrap() - .into_path(); + let home_dir = + tempfile::TempDir::with_prefix(format!("malachitebft-starknet-test-{}", self.id)) + .unwrap() + .into_path(); set.spawn( async move { @@ -471,7 +469,7 @@ async fn run_node( let mut rx_event = tx_event.subscribe(); let rx_event_bg = tx_event.subscribe(); - let (mut actor_ref, mut handle) = spawn_node_actor( + let (mut actor_ref, _handle) = spawn_node_actor( config.clone(), home_dir.clone(), validator_set.clone(), @@ -536,9 +534,7 @@ async fn run_node( sleep(after).await; actor_ref.kill_and_wait(None).await.expect("Node must stop"); - bg.abort(); - handle.abort(); } Step::ResetDb => { @@ -559,7 +555,8 @@ async fn run_node( let new_rx_event_bg = tx_event.subscribe(); info!("Spawning node"); - let (new_actor_ref, new_handle) = spawn_node_actor( + + let (new_actor_ref, _) = spawn_node_actor( config.clone(), home_dir.clone(), validator_set.clone(), @@ -575,7 +572,6 @@ async fn run_node( bg = spawn_bg(new_rx_event_bg); actor_ref = new_actor_ref; - handle = new_handle; rx_event = new_rx_event; } @@ -590,7 +586,6 @@ async fn run_node( } Err(e) => { actor_ref.stop(Some("Test failed".to_string())); - handle.abort(); bg.abort(); return TestResult::Failure(e.to_string()); @@ -603,7 +598,6 @@ async fn run_node( let actual = decisions.load(Ordering::SeqCst); actor_ref.stop(Some("Test is over".to_string())); - handle.abort(); bg.abort(); if expected.check(actual) { @@ -623,7 +617,6 @@ async fn run_node( Step::Fail(reason) => { actor_ref.stop(Some("Test failed".to_string())); - handle.abort(); bg.abort(); return TestResult::Failure(reason); @@ -644,9 +637,9 @@ pub fn init_logging(test_module: &str) { .any(|(k, v)| std::env::var(k).as_deref() == Ok(v)); let directive = if enable_debug { - format!("{test_module}=debug,ractor=error,debug") + format!("{test_module}=debug,ractor=error,informalsystems_malachitebft=debug") } else { - format!("{test_module}=debug,ractor=error,warn") + format!("{test_module}=debug,ractor=error,informalsystems_malachitefbft=warn") }; let filter = EnvFilter::builder().parse(directive).unwrap(); diff --git a/code/crates/test/Cargo.toml b/code/crates/test/Cargo.toml index 1c0d22fd4..8562aca19 100644 --- a/code/crates/test/Cargo.toml +++ b/code/crates/test/Cargo.toml @@ -37,6 +37,14 @@ serde_json = { workspace = true } sha3 = { workspace = true } signature = { workspace = true } +[dev-dependencies] +malachitebft-test-app.workspace = true +malachitebft-test-framework.workspace = true + +bytesize.workspace = true +tokio.workspace = true +tracing.workspace = true + [build-dependencies] prost-build = { workspace = true } diff --git a/code/crates/test/app/Cargo.toml b/code/crates/test/app/Cargo.toml new file mode 100644 index 000000000..15e5b04cc --- /dev/null +++ b/code/crates/test/app/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "informalsystems-malachitebft-test-app" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +rust-version.workspace = true +publish = false + +[dependencies] +async-trait.workspace = true +bytes.workspace = true +color-eyre.workspace = true +derive-where.workspace = true +eyre.workspace = true +prost.workspace = true +rand.workspace = true +redb.workspace = true +serde_json.workspace = true +sha3.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true + +malachitebft-app-channel.workspace = true +malachitebft-proto.workspace = true +malachitebft-test.workspace = true +malachitebft-test-cli.workspace = true + +[dev-dependencies] +malachitebft-test-framework.workspace = true + +[lints] +workspace = true diff --git a/code/crates/test/app/src/app.rs b/code/crates/test/app/src/app.rs new file mode 100644 index 000000000..affc72d21 --- /dev/null +++ b/code/crates/test/app/src/app.rs @@ -0,0 +1,253 @@ +use eyre::eyre; +use tracing::{error, info}; + +use malachitebft_app_channel::app::streaming::StreamContent; +use malachitebft_app_channel::app::types::codec::Codec; +use malachitebft_app_channel::app::types::core::{Round, Validity}; +use malachitebft_app_channel::app::types::sync::RawDecidedValue; +use malachitebft_app_channel::app::types::ProposedValue; +use malachitebft_app_channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg}; +use malachitebft_test::codec::proto::ProtobufCodec; +use malachitebft_test::{Genesis, TestContext}; + +use crate::state::{decode_value, State}; + +pub async fn run( + genesis: Genesis, + state: &mut State, + channels: &mut Channels, +) -> eyre::Result<()> { + while let Some(msg) = channels.consensus.recv().await { + match msg { + // The first message to handle is the `ConsensusReady` message, signaling to the app + // that Malachite is ready to start consensus + AppMsg::ConsensusReady { reply } => { + info!("Consensus is ready"); + + // We can simply respond by telling the engine to start consensus + // at the current height, which is initially 1 + if reply + .send(ConsensusMsg::StartHeight( + state.current_height, + genesis.validator_set.clone(), + )) + .is_err() + { + error!("Failed to send ConsensusReady reply"); + } + } + + // The next message to handle is the `StartRound` message, signaling to the app + // that consensus has entered a new round (including the initial round 0) + AppMsg::StartedRound { + height, + round, + proposer, + } => { + info!(%height, %round, %proposer, "Started round"); + + // We can use that opportunity to update our internal state + state.current_height = height; + state.current_round = round; + state.current_proposer = Some(proposer); + } + + // At some point, we may end up being the proposer for that round, and the engine + // will then ask us for a value to propose to the other validators. + AppMsg::GetValue { + height, + round, + timeout: _, + reply, + } => { + // NOTE: We can ignore the timeout as we are building the value right away. + // If we were let's say reaping as many txes from a mempool and executing them, + // then we would need to respect the timeout and stop at a certain point. + + info!(%height, %round, "Consensus is requesting a value to propose"); + + // Here it is important that, if we have previously built a value for this height and round, + // we send back the very same value. We will not go into details here but this has to do + // with crash recovery and is not strictly necessary in this example app since all our state + // is kept in-memory and therefore is not crash tolerant at all. + if let Some(proposal) = state.get_previously_built_value(height, round).await? { + info!(value = %proposal.value.id(), "Re-using previously built value"); + + if reply.send(proposal).is_err() { + error!("Failed to send GetValue reply"); + } + + return Ok(()); + } + + // If we have not previously built a value for that very same height and round, + // we need to create a new value to propose and send it back to consensus. + let proposal = state.propose_value(height, round).await?; + + // Send it to consensus + if reply.send(proposal.clone()).is_err() { + error!("Failed to send GetValue reply"); + } + + // Now what's left to do is to break down the value to propose into parts, + // and send those parts over the network to our peers, for them to re-assemble the full value. + for stream_message in state.stream_proposal(proposal) { + info!(%height, %round, "Streaming proposal part: {stream_message:?}"); + channels + .network + .send(NetworkMsg::PublishProposalPart(stream_message)) + .await?; + } + + // NOTE: In this tutorial, the value is simply an integer and therefore results in a very small + // message to gossip over the network, but if we were building a real application, + // say building blocks containing thousands of transactions, the proposal would typically only + // carry the block hash and the full block itself would be split into parts in order to + // avoid blowing up the bandwidth requirements by gossiping a single huge message. + } + + // On the receiving end of these proposal parts (ie. when we are not the proposer), + // we need to process these parts and re-assemble the full value. + // To this end, we store each part that we receive and assemble the full value once we + // have all its constituent parts. Then we send that value back to consensus for it to + // consider and vote for or against it (ie. vote `nil`), depending on its validity. + AppMsg::ReceivedProposalPart { from, part, reply } => { + let part_type = match &part.content { + StreamContent::Data(part) => part.get_type(), + StreamContent::Fin(_) => "end of stream", + }; + + info!(%from, %part.sequence, part.type = %part_type, "Received proposal part"); + + let proposed_value = state.received_proposal_part(from, part).await?; + + if reply.send(proposed_value).is_err() { + error!("Failed to send ReceivedProposalPart reply"); + } + } + + // In some cases, e.g. to verify the signature of a vote received at a higher height + // than the one we are at (e.g. because we are lagging behind a little bit), + // the engine may ask us for the validator set at that height. + // + // In our case, our validator set stays constant between heights so we can + // send back the validator set found in our genesis state. + AppMsg::GetValidatorSet { height: _, reply } => { + if reply.send(genesis.validator_set.clone()).is_err() { + error!("Failed to send GetValidatorSet reply"); + } + } + + // After some time, consensus will finally reach a decision on the value + // to commit for the current height, and will notify the application, + // providing it with a commit certificate which contains the ID of the value + // that was decided on as well as the set of commits for that value, + // ie. the precommits together with their (aggregated) signatures. + AppMsg::Decided { certificate, reply } => { + info!( + height = %certificate.height, round = %certificate.round, + value = %certificate.value_id, + "Consensus has decided on value" + ); + + // When that happens, we store the decided value in our store + state.commit(certificate).await?; + + // And then we instruct consensus to start the next height + if reply + .send(ConsensusMsg::StartHeight( + state.current_height, + genesis.validator_set.clone(), + )) + .is_err() + { + error!("Failed to send Decided reply"); + } + } + + // It may happen that our node is lagging behind its peers. In that case, + // a synchronization mechanism will automatically kick to try and catch up to + // our peers. When that happens, some of these peers will send us decided values + // for the heights in between the one we are currently at (included) and the one + // that they are at. When the engine receives such a value, it will forward to the application + // to decode it from its wire format and send back the decoded value to consensus. + AppMsg::ProcessSyncedValue { + height, + round, + proposer, + value_bytes, + reply, + } => { + info!(%height, %round, "Processing synced value"); + + let value = decode_value(value_bytes); + + if reply + .send(ProposedValue { + height, + round, + valid_round: Round::Nil, + proposer, + value, + validity: Validity::Valid, + extension: None, + }) + .is_err() + { + error!("Failed to send ProcessSyncedValue reply"); + } + } + + // If, on the other hand, we are not lagging behind but are instead asked by one of + // our peer to help them catch up because they are the one lagging behind, + // then the engine might ask the application to provide with the value + // that was decided at some lower height. In that case, we fetch it from our store + // and send it to consensus. + AppMsg::GetDecidedValue { height, reply } => { + let decided_value = state.get_decided_value(height).await; + + let raw_decided_value = decided_value.map(|decided_value| RawDecidedValue { + certificate: decided_value.certificate, + value_bytes: ProtobufCodec.encode(&decided_value.value).unwrap(), // FIXME: unwrap + }); + + if reply.send(raw_decided_value).is_err() { + error!("Failed to send GetDecidedValue reply"); + } + } + + // In order to figure out if we can help a peer that is lagging behind, + // the engine may ask us for the height of the earliest available value in our store. + AppMsg::GetHistoryMinHeight { reply } => { + let min_height = state.get_earliest_height().await; + + if reply.send(min_height).is_err() { + error!("Failed to send GetHistoryMinHeight reply"); + } + } + + AppMsg::RestreamProposal { .. } => { + error!("RestreamProposal not implemented"); + } + + AppMsg::PeerJoined { peer_id } => { + info!(%peer_id, "Peer joined our local view of network"); + + // You might want to track connected peers in your state + state.peers.insert(peer_id); + } + + AppMsg::PeerLeft { peer_id } => { + info!(%peer_id, "Peer left our local view of network"); + + // Remove the peer from tracking + state.peers.remove(&peer_id); + } + } + } + + // If we get there, it can only be because the channel we use to receive message + // from consensus has been closed, meaning that the consensus actor has died. + // We can do nothing but return an error here. + Err(eyre!("Consensus channel closed unexpectedly")) +} diff --git a/code/crates/test/app/src/lib.rs b/code/crates/test/app/src/lib.rs new file mode 100644 index 000000000..9341da944 --- /dev/null +++ b/code/crates/test/app/src/lib.rs @@ -0,0 +1,5 @@ +pub mod app; +pub mod node; +pub mod state; +pub mod store; +pub mod streaming; diff --git a/code/crates/test/app/src/node.rs b/code/crates/test/app/src/node.rs new file mode 100644 index 000000000..62bddfb09 --- /dev/null +++ b/code/crates/test/app/src/node.rs @@ -0,0 +1,153 @@ +//! The Application (or Node) definition. The Node trait implements the Consensus context and the +//! cryptographic library used for signing. + +use std::path::{Path, PathBuf}; + +use async_trait::async_trait; +use rand::{CryptoRng, RngCore}; +use tokio::task::JoinHandle; +use tracing::Instrument; + +use malachitebft_app_channel::app::events::TxEvent; +use malachitebft_app_channel::app::types::config::Config; +use malachitebft_app_channel::app::types::core::VotingPower; +use malachitebft_app_channel::app::types::Keypair; +use malachitebft_app_channel::app::Node; +use malachitebft_app_channel::EngineHandle; + +// Use the same types used for integration tests. +// A real application would use its own types and context instead. +use malachitebft_test::codec::proto::ProtobufCodec; +use malachitebft_test::{ + Address, Genesis, Height, PrivateKey, PublicKey, TestContext, Validator, ValidatorSet, +}; + +use crate::state::State; +use crate::store::Store; + +pub struct Handles { + pub app: JoinHandle>, + pub engine: EngineHandle, + pub tx_event: TxEvent, +} + +/// Main application struct implementing the consensus node functionality +#[derive(Clone)] +pub struct App { + pub config: Config, + pub home_dir: PathBuf, + pub validator_set: ValidatorSet, + pub private_key: PrivateKey, + pub start_height: Option, +} + +impl App { + pub async fn start(&self) -> eyre::Result { + let span = tracing::error_span!("node", moniker = %self.config.moniker); + let _guard = span.enter(); + + let public_key = self.get_public_key(&self.private_key); + let address = self.get_address(&public_key); + let genesis = self.make_genesis( + self.validator_set + .validators + .iter() + .map(|v| (v.public_key, v.voting_power)) + .collect(), + ); + + let ctx = TestContext::new(self.private_key.clone()); + let codec = ProtobufCodec; + + let (mut channels, engine_handle) = malachitebft_app_channel::start_engine( + ctx.clone(), + codec, + self.clone(), + self.config.clone(), + self.start_height, + self.validator_set.clone(), + ) + .await?; + + drop(_guard); + + let store = Store::open(self.get_home_dir().join("store.db"))?; + let start_height = self.start_height.unwrap_or_default(); + let mut state = State::new(ctx, address, start_height, store); + + let tx_event = channels.events.clone(); + + let app_handle = tokio::spawn( + async move { crate::app::run(genesis, &mut state, &mut channels).await } + .instrument(span), + ); + + Ok(Handles { + app: app_handle, + engine: engine_handle, + tx_event, + }) + } +} + +#[async_trait] +impl Node for App { + type Context = TestContext; + type Genesis = Genesis; + type PrivateKeyFile = PrivateKey; + + fn get_home_dir(&self) -> PathBuf { + self.home_dir.to_owned() + } + + fn generate_private_key(&self, rng: R) -> PrivateKey + where + R: RngCore + CryptoRng, + { + PrivateKey::generate(rng) + } + + fn get_address(&self, pk: &PublicKey) -> Address { + Address::from_public_key(pk) + } + + fn get_public_key(&self, pk: &PrivateKey) -> PublicKey { + pk.public_key() + } + + fn get_keypair(&self, pk: PrivateKey) -> Keypair { + Keypair::ed25519_from_bytes(pk.inner().to_bytes()).unwrap() + } + + fn load_private_key(&self, file: Self::PrivateKeyFile) -> PrivateKey { + file + } + + fn load_private_key_file(&self) -> std::io::Result { + Ok(self.private_key.clone()) + } + + fn make_private_key_file(&self, private_key: PrivateKey) -> Self::PrivateKeyFile { + private_key + } + + fn load_genesis(&self, path: impl AsRef) -> std::io::Result { + let genesis = std::fs::read_to_string(path)?; + serde_json::from_str(&genesis).map_err(|e| e.into()) + } + + fn make_genesis(&self, validators: Vec<(PublicKey, VotingPower)>) -> Self::Genesis { + let validators = validators + .into_iter() + .map(|(pk, vp)| Validator::new(pk, vp)); + + let validator_set = ValidatorSet::new(validators); + + Genesis { validator_set } + } + + async fn run(self) -> eyre::Result<()> { + let handles = self.start().await?; + handles.app.await? + } +} diff --git a/code/crates/test/app/src/state.rs b/code/crates/test/app/src/state.rs new file mode 100644 index 000000000..d2c521d32 --- /dev/null +++ b/code/crates/test/app/src/state.rs @@ -0,0 +1,354 @@ +//! Internal state of the application. This is a simplified abstract to keep it simple. +//! A regular application would have mempool implemented, a proper database and input methods like RPC. + +use std::collections::HashSet; + +use bytes::Bytes; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use sha3::Digest; +use tracing::{debug, error}; + +use malachitebft_app_channel::app::consensus::ProposedValue; +use malachitebft_app_channel::app::host::LocallyProposedValue; +use malachitebft_app_channel::app::streaming::{StreamContent, StreamMessage}; +use malachitebft_app_channel::app::types::codec::Codec; +use malachitebft_app_channel::app::types::core::{CommitCertificate, Round, Validity}; +use malachitebft_app_channel::app::types::PeerId; +use malachitebft_test::codec::proto::ProtobufCodec; +use malachitebft_test::{ + Address, Height, ProposalData, ProposalFin, ProposalInit, ProposalPart, TestContext, Value, +}; + +use crate::store::{DecidedValue, Store}; +use crate::streaming::{PartStreamsMap, ProposalParts}; + +/// Represents the internal state of the application node +/// Contains information about current height, round, proposals and blocks +pub struct State { + ctx: TestContext, + address: Address, + store: Store, + stream_id: u64, + streams_map: PartStreamsMap, + rng: StdRng, + + pub current_height: Height, + pub current_round: Round, + pub current_proposer: Option
, + pub peers: HashSet, +} + +// Make up a seed for the rng based on our address in +// order for each node to likely propose different values at +// each round. +fn seed_from_address(address: &Address) -> u64 { + address.into_inner().chunks(8).fold(0u64, |acc, chunk| { + let term = chunk.iter().fold(0u64, |acc, &x| { + acc.wrapping_shl(8).wrapping_add(u64::from(x)) + }); + acc.wrapping_add(term) + }) +} + +impl State { + /// Creates a new State instance with the given validator address and starting height + pub fn new(ctx: TestContext, address: Address, height: Height, store: Store) -> Self { + Self { + ctx, + current_height: height, + current_round: Round::new(0), + current_proposer: None, + address, + store, + stream_id: 0, + streams_map: PartStreamsMap::new(), + rng: StdRng::seed_from_u64(seed_from_address(&address)), + peers: HashSet::new(), + } + } + + /// Returns the earliest height available in the state + pub async fn get_earliest_height(&self) -> Height { + self.store + .min_decided_value_height() + .await + .unwrap_or_default() + } + + /// Processes and adds a new proposal to the state if it's valid + /// Returns Some(ProposedValue) if the proposal was accepted, None otherwise + pub async fn received_proposal_part( + &mut self, + from: PeerId, + part: StreamMessage, + ) -> eyre::Result>> { + let sequence = part.sequence; + + // Check if we have a full proposal + let Some(parts) = self.streams_map.insert(from, part) else { + return Ok(None); + }; + + // Check if the proposal is outdated + if parts.height < self.current_height { + debug!( + height = %self.current_height, + round = %self.current_round, + part.height = %parts.height, + part.round = %parts.round, + part.sequence = %sequence, + "Received outdated proposal part, ignoring" + ); + + return Ok(None); + } + + // Re-assemble the proposal from its parts + let value = assemble_value_from_parts(parts); + + self.store.store_undecided_proposal(value.clone()).await?; + + Ok(Some(value)) + } + + /// Retrieves a decided block at the given height + pub async fn get_decided_value(&self, height: Height) -> Option { + self.store.get_decided_value(height).await.ok().flatten() + } + + /// Commits a value with the given certificate, updating internal state + /// and moving to the next height + pub async fn commit( + &mut self, + certificate: CommitCertificate, + ) -> eyre::Result<()> { + let Ok(Some(proposal)) = self + .store + .get_undecided_proposal(certificate.height, certificate.round) + .await + else { + error!( + height = %certificate.height, + "Trying to commit a value that is not decided" + ); + + return Ok(()); // FIXME + }; + + self.store + .store_decided_value(&certificate, proposal.value) + .await?; + + // Prune the store, keep the last 5 heights + let retain_height = Height::new(certificate.height.as_u64().saturating_sub(5)); + self.store.prune(retain_height).await?; + + // Move to next height + self.current_height = self.current_height.increment(); + self.current_round = Round::new(0); + + Ok(()) + } + + /// Retrieves a previously built proposal value for the given height + pub async fn get_previously_built_value( + &self, + height: Height, + round: Round, + ) -> eyre::Result>> { + let Some(proposal) = self.store.get_undecided_proposal(height, round).await? else { + return Ok(None); + }; + + Ok(Some(LocallyProposedValue::new( + proposal.height, + proposal.round, + proposal.value, + proposal.extension.clone(), + ))) + } + + /// Creates a new proposal value for the given height + /// Returns either a previously built proposal or creates a new one + async fn create_proposal( + &mut self, + height: Height, + round: Round, + ) -> eyre::Result> { + assert_eq!(height, self.current_height); + assert_eq!(round, self.current_round); + + // We create a new value. + let value = self.make_value(); + + let proposal = ProposedValue { + height, + round, + valid_round: Round::Nil, + proposer: self.address, // We are the proposer + value, + validity: Validity::Valid, // Our proposals are de facto valid + extension: None, // Vote extension can be added here + }; + + // Insert the new proposal into the undecided proposals. + self.store + .store_undecided_proposal(proposal.clone()) + .await?; + + Ok(proposal) + } + + /// Make up a new value to propose + /// A real application would have a more complex logic here, + /// typically reaping transactions from a mempool and executing them against its state, + /// before computing the merkle root of the new app state. + fn make_value(&mut self) -> Value { + let value = self.rng.gen_range(100..=100000); + Value::new(value) + } + + /// Creates a new proposal value for the given height + /// Returns either a previously built proposal or creates a new one + pub async fn propose_value( + &mut self, + height: Height, + round: Round, + ) -> eyre::Result> { + assert_eq!(height, self.current_height); + assert_eq!(round, self.current_round); + + let proposal = self.create_proposal(height, round).await?; + + Ok(LocallyProposedValue::new( + proposal.height, + proposal.round, + proposal.value, + proposal.extension, + )) + } + + /// Creates a stream message containing a proposal part. + /// Updates internal sequence number and current proposal. + pub fn stream_proposal( + &mut self, + value: LocallyProposedValue, + ) -> impl Iterator> { + let parts = self.value_to_parts(value); + + let stream_id = self.stream_id; + self.stream_id += 1; + + let mut msgs = Vec::with_capacity(parts.len() + 1); + let mut sequence = 0; + + for part in parts { + let msg = StreamMessage::new(stream_id, sequence, StreamContent::Data(part)); + sequence += 1; + msgs.push(msg); + } + + msgs.push(StreamMessage::new( + stream_id, + sequence, + StreamContent::Fin(true), + )); + + msgs.into_iter() + } + + fn value_to_parts(&self, value: LocallyProposedValue) -> Vec { + let mut hasher = sha3::Keccak256::new(); + let mut parts = Vec::new(); + + // Init + // Include metadata about the proposal + { + parts.push(ProposalPart::Init(ProposalInit::new( + value.height, + value.round, + self.address, + ))); + + hasher.update(value.height.as_u64().to_be_bytes().as_slice()); + hasher.update(value.round.as_i64().to_be_bytes().as_slice()); + + if let Some(ext) = &value.extension { + hasher.update(ext.data.as_ref()); + } + } + + // Data + // Include each prime factor of the value as a separate proposal part + { + for factor in factor_value(value.value) { + parts.push(ProposalPart::Data(ProposalData::new(factor))); + + hasher.update(factor.to_be_bytes().as_slice()); + } + } + + // Fin + // Sign the hash of the proposal parts + { + let hash = hasher.finalize().to_vec(); + let signature = self.ctx.signing_provider.sign(&hash); + parts.push(ProposalPart::Fin(ProposalFin::new(signature))); + } + + parts + } +} + +/// Re-assemble a [`ProposedValue`] from its [`ProposalParts`]. +/// +/// This is done by multiplying all the factors in the parts. +fn assemble_value_from_parts(parts: ProposalParts) -> ProposedValue { + let value = parts + .parts + .iter() + .filter_map(|part| part.as_data()) + .fold(1, |acc, data| acc * data.factor); + + ProposedValue { + height: parts.height, + round: parts.round, + valid_round: Round::Nil, + proposer: parts.proposer, + value: Value::new(value), + validity: Validity::Valid, // TODO: Check signature in Fin part + extension: None, + } +} + +/// Decodes a Value from its byte representation using ProtobufCodec +pub fn decode_value(bytes: Bytes) -> Value { + ProtobufCodec.decode(bytes).unwrap() +} + +/// Returns the list of prime factors of the given value +/// +/// In a real application, this would typically split transactions +/// into chunks ino order to reduce bandwidth requirements due +/// to duplication of gossip messages. +fn factor_value(value: Value) -> Vec { + let mut factors = Vec::new(); + let mut n = value.as_u64(); + + let mut i = 2; + while i * i <= n { + if n % i == 0 { + factors.push(i); + n /= i; + } else { + i += 1; + } + } + + if n > 1 { + factors.push(n); + } + + factors +} diff --git a/code/crates/test/app/src/store.rs b/code/crates/test/app/src/store.rs new file mode 100644 index 000000000..5fdf34305 --- /dev/null +++ b/code/crates/test/app/src/store.rs @@ -0,0 +1,308 @@ +use std::ops::RangeBounds; +use std::path::Path; +use std::sync::Arc; + +use bytes::Bytes; +use prost::Message; +use redb::ReadableTable; +use thiserror::Error; +use tracing::error; + +use malachitebft_app_channel::app::types::codec::Codec; +use malachitebft_app_channel::app::types::core::{CommitCertificate, Round}; +use malachitebft_app_channel::app::types::ProposedValue; +use malachitebft_proto::{Error as ProtoError, Protobuf}; +use malachitebft_test::codec::proto as codec; +use malachitebft_test::codec::proto::ProtobufCodec; +use malachitebft_test::proto; +use malachitebft_test::{Height, TestContext, Value}; + +mod keys; +use keys::{HeightKey, UndecidedValueKey}; + +#[derive(Clone, Debug)] +pub struct DecidedValue { + pub value: Value, + pub certificate: CommitCertificate, +} + +fn decode_certificate(bytes: &[u8]) -> Result, ProtoError> { + let proto = proto::CommitCertificate::decode(bytes)?; + codec::decode_certificate(proto) +} + +fn encode_certificate(certificate: &CommitCertificate) -> Result, ProtoError> { + let proto = codec::encode_certificate(certificate)?; + Ok(proto.encode_to_vec()) +} + +#[derive(Debug, Error)] +pub enum StoreError { + #[error("Database error: {0}")] + Database(#[from] redb::DatabaseError), + + #[error("Storage error: {0}")] + Storage(#[from] redb::StorageError), + + #[error("Table error: {0}")] + Table(#[from] redb::TableError), + + #[error("Commit error: {0}")] + Commit(#[from] redb::CommitError), + + #[error("Transaction error: {0}")] + Transaction(#[from] redb::TransactionError), + + #[error("Failed to encode/decode Protobuf: {0}")] + Protobuf(#[from] ProtoError), + + #[error("Failed to join on task: {0}")] + TaskJoin(#[from] tokio::task::JoinError), +} + +const CERTIFICATES_TABLE: redb::TableDefinition> = + redb::TableDefinition::new("certificates"); + +const DECIDED_VALUES_TABLE: redb::TableDefinition> = + redb::TableDefinition::new("decided_values"); + +const UNDECIDED_PROPOSALS_TABLE: redb::TableDefinition> = + redb::TableDefinition::new("undecided_values"); + +struct Db { + db: redb::Database, +} + +impl Db { + fn new(path: impl AsRef) -> Result { + Ok(Self { + db: redb::Database::create(path).map_err(StoreError::Database)?, + }) + } + + fn get_decided_value(&self, height: Height) -> Result, StoreError> { + let tx = self.db.begin_read()?; + let value = { + let table = tx.open_table(DECIDED_VALUES_TABLE)?; + let value = table.get(&height)?; + value.and_then(|value| Value::from_bytes(&value.value()).ok()) + }; + let certificate = { + let table = tx.open_table(CERTIFICATES_TABLE)?; + let value = table.get(&height)?; + value.and_then(|value| decode_certificate(&value.value()).ok()) + }; + + let decided_value = value + .zip(certificate) + .map(|(value, certificate)| DecidedValue { value, certificate }); + + Ok(decided_value) + } + + fn insert_decided_value(&self, decided_value: DecidedValue) -> Result<(), StoreError> { + let height = decided_value.certificate.height; + + let tx = self.db.begin_write()?; + { + let mut values = tx.open_table(DECIDED_VALUES_TABLE)?; + values.insert(height, decided_value.value.to_bytes()?.to_vec())?; + } + { + let mut certificates = tx.open_table(CERTIFICATES_TABLE)?; + certificates.insert(height, encode_certificate(&decided_value.certificate)?)?; + } + tx.commit()?; + + Ok(()) + } + + #[tracing::instrument(skip(self))] + pub fn get_undecided_proposal( + &self, + height: Height, + round: Round, + ) -> Result>, StoreError> { + let tx = self.db.begin_read()?; + let table = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?; + + let value = if let Ok(Some(value)) = table.get(&(height, round)) { + Some( + ProtobufCodec + .decode(Bytes::from(value.value())) + .map_err(StoreError::Protobuf)?, + ) + } else { + None + }; + + Ok(value) + } + + fn insert_undecided_proposal( + &self, + proposal: ProposedValue, + ) -> Result<(), StoreError> { + let key = (proposal.height, proposal.round); + let value = ProtobufCodec.encode(&proposal)?; + let tx = self.db.begin_write()?; + { + let mut table = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?; + table.insert(key, value.to_vec())?; + } + tx.commit()?; + Ok(()) + } + + fn height_range( + &self, + table: &Table, + range: impl RangeBounds, + ) -> Result, StoreError> + where + Table: redb::ReadableTable>, + { + Ok(table + .range(range)? + .flatten() + .map(|(key, _)| key.value()) + .collect::>()) + } + + fn undecided_proposals_range
( + &self, + table: &Table, + range: impl RangeBounds<(Height, Round)>, + ) -> Result, StoreError> + where + Table: redb::ReadableTable>, + { + Ok(table + .range(range)? + .flatten() + .map(|(key, _)| key.value()) + .collect::>()) + } + + fn prune(&self, retain_height: Height) -> Result, StoreError> { + let tx = self.db.begin_write().unwrap(); + let pruned = { + let mut undecided = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?; + let keys = self.undecided_proposals_range(&undecided, ..(retain_height, Round::Nil))?; + for key in keys { + undecided.remove(key)?; + } + + let mut decided = tx.open_table(DECIDED_VALUES_TABLE)?; + let mut certificates = tx.open_table(CERTIFICATES_TABLE)?; + + let keys = self.height_range(&decided, ..retain_height)?; + for key in &keys { + decided.remove(key)?; + certificates.remove(key)?; + } + keys + }; + tx.commit()?; + + Ok(pruned) + } + + fn min_decided_value_height(&self) -> Option { + let tx = self.db.begin_read().unwrap(); + let table = tx.open_table(DECIDED_VALUES_TABLE).unwrap(); + let (key, _) = table.first().ok()??; + Some(key.value()) + } + + // fn max_decided_value_height(&self) -> Option { + // let tx = self.db.begin_read().unwrap(); + // let table = tx.open_table(DECIDED_VALUES_TABLE).unwrap(); + // let (key, _) = table.last().ok()??; + // Some(key.value()) + // } + + fn create_tables(&self) -> Result<(), StoreError> { + let tx = self.db.begin_write()?; + // Implicitly creates the tables if they do not exist yet + let _ = tx.open_table(DECIDED_VALUES_TABLE)?; + let _ = tx.open_table(CERTIFICATES_TABLE)?; + let _ = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?; + tx.commit()?; + Ok(()) + } +} + +#[derive(Clone)] +pub struct Store { + db: Arc, +} + +impl Store { + pub fn open(path: impl AsRef) -> Result { + let db = Db::new(path)?; + db.create_tables()?; + + Ok(Self { db: Arc::new(db) }) + } + + pub async fn min_decided_value_height(&self) -> Option { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.min_decided_value_height()) + .await + .ok() + .flatten() + } + + // pub async fn max_decided_value_height(&self) -> Option { + // let db = Arc::clone(&self.db); + // tokio::task::spawn_blocking(move || db.max_decided_value_height()) + // .await + // .ok() + // .flatten() + // } + + pub async fn get_decided_value( + &self, + height: Height, + ) -> Result, StoreError> { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.get_decided_value(height)).await? + } + + pub async fn store_decided_value( + &self, + certificate: &CommitCertificate, + value: Value, + ) -> Result<(), StoreError> { + let decided_value = DecidedValue { + value, + certificate: certificate.clone(), + }; + + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.insert_decided_value(decided_value)).await? + } + + pub async fn store_undecided_proposal( + &self, + value: ProposedValue, + ) -> Result<(), StoreError> { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.insert_undecided_proposal(value)).await? + } + + pub async fn get_undecided_proposal( + &self, + height: Height, + round: Round, + ) -> Result>, StoreError> { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.get_undecided_proposal(height, round)).await? + } + + pub async fn prune(&self, retain_height: Height) -> Result, StoreError> { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.prune(retain_height)).await? + } +} diff --git a/code/crates/test/app/src/store/keys.rs b/code/crates/test/app/src/store/keys.rs new file mode 100644 index 000000000..e45cc83c8 --- /dev/null +++ b/code/crates/test/app/src/store/keys.rs @@ -0,0 +1,83 @@ +use core::mem::size_of; + +use malachitebft_app_channel::app::types::core::Round; +use malachitebft_test::Height; + +pub type UndecidedValueKey = (HeightKey, RoundKey); + +#[derive(Copy, Clone, Debug)] +pub struct HeightKey; + +impl redb::Value for HeightKey { + type SelfType<'a> = Height; + type AsBytes<'a> = [u8; size_of::()]; + + fn fixed_width() -> Option { + Some(size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + let height = ::from_bytes(data); + + Height::new(height) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + ::as_bytes(&value.as_u64()) + } + + fn type_name() -> redb::TypeName { + redb::TypeName::new("Height") + } +} + +impl redb::Key for HeightKey { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + ::compare(data1, data2) + } +} + +#[derive(Copy, Clone, Debug)] +pub struct RoundKey; + +impl redb::Value for RoundKey { + type SelfType<'a> = Round; + type AsBytes<'a> = [u8; size_of::()]; + + fn fixed_width() -> Option { + Some(size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + let round = ::from_bytes(data); + Round::from(round) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + ::as_bytes(&value.as_i64()) + } + + fn type_name() -> redb::TypeName { + redb::TypeName::new("Round") + } +} + +impl redb::Key for RoundKey { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + ::compare(data1, data2) + } +} diff --git a/code/crates/test/app/src/streaming.rs b/code/crates/test/app/src/streaming.rs new file mode 100644 index 000000000..6792b0a5a --- /dev/null +++ b/code/crates/test/app/src/streaming.rs @@ -0,0 +1,136 @@ +use std::cmp::Ordering; +use std::collections::{BTreeMap, BinaryHeap, HashSet}; + +use malachitebft_app_channel::app::consensus::PeerId; +use malachitebft_app_channel::app::streaming::{Sequence, StreamId, StreamMessage}; +use malachitebft_app_channel::app::types::core::Round; +use malachitebft_test::{Address, Height, ProposalInit, ProposalPart}; + +struct MinSeq(StreamMessage); + +impl PartialEq for MinSeq { + fn eq(&self, other: &Self) -> bool { + self.0.sequence == other.0.sequence + } +} + +impl Eq for MinSeq {} + +impl Ord for MinSeq { + fn cmp(&self, other: &Self) -> Ordering { + other.0.sequence.cmp(&self.0.sequence) + } +} + +impl PartialOrd for MinSeq { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +struct MinHeap(BinaryHeap>); + +impl Default for MinHeap { + fn default() -> Self { + Self(BinaryHeap::new()) + } +} + +impl MinHeap { + fn push(&mut self, msg: StreamMessage) { + self.0.push(MinSeq(msg)); + } + + fn len(&self) -> usize { + self.0.len() + } + + fn drain(&mut self) -> Vec { + self.0 + .drain() + .filter_map(|msg| msg.0.content.into_data()) + .collect() + } +} + +#[derive(Default)] +struct StreamState { + buffer: MinHeap, + init_info: Option, + seen_sequences: HashSet, + total_messages: usize, + fin_received: bool, +} + +impl StreamState { + fn is_done(&self) -> bool { + self.init_info.is_some() && self.fin_received && self.buffer.len() == self.total_messages + } + + fn insert(&mut self, msg: StreamMessage) -> Option { + if msg.is_first() { + self.init_info = msg.content.as_data().and_then(|p| p.as_init()).cloned(); + } + + if msg.is_fin() { + self.fin_received = true; + self.total_messages = msg.sequence as usize + 1; + } + + self.buffer.push(msg); + + if self.is_done() { + let init_info = self.init_info.take()?; + + Some(ProposalParts { + height: init_info.height, + round: init_info.round, + proposer: init_info.proposer, + parts: self.buffer.drain(), + }) + } else { + None + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ProposalParts { + pub height: Height, + pub round: Round, + pub proposer: Address, + pub parts: Vec, +} + +#[derive(Default)] +pub struct PartStreamsMap { + streams: BTreeMap<(PeerId, StreamId), StreamState>, +} + +impl PartStreamsMap { + pub fn new() -> Self { + Self::default() + } + + pub fn insert( + &mut self, + peer_id: PeerId, + msg: StreamMessage, + ) -> Option { + let stream_id = msg.stream_id; + let state = self.streams.entry((peer_id, stream_id)).or_default(); + + if !state.seen_sequences.insert(msg.sequence) { + // We have already seen a message with this sequence number. + return None; + } + + let result = state.insert(msg); + + if state.is_done() { + self.streams.remove(&(peer_id, stream_id)); + } + + result + } +} diff --git a/code/crates/test/framework/Cargo.toml b/code/crates/test/framework/Cargo.toml new file mode 100644 index 000000000..cb4d88a0b --- /dev/null +++ b/code/crates/test/framework/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "informalsystems-malachitebft-test-framework" +publish = false + +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +rust-version.workspace = true + +[dependencies] +malachitebft-engine.workspace = true +malachitebft-core-types.workspace = true +malachitebft-config.workspace = true +malachitebft-core-consensus.workspace = true +malachitebft-metrics.workspace = true +malachitebft-test.workspace = true +malachitebft-test-app.workspace = true +malachitebft-app-channel.workspace = true + +axum.workspace = true +bytesize.workspace = true +eyre.workspace = true +rand.workspace = true +ractor.workspace = true +tokio.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +tempfile.workspace = true + +[lints] +workspace = true diff --git a/code/crates/test/framework/src/lib.rs b/code/crates/test/framework/src/lib.rs new file mode 100644 index 000000000..dcdd5a59b --- /dev/null +++ b/code/crates/test/framework/src/lib.rs @@ -0,0 +1,770 @@ +use core::fmt; +use std::fs::{create_dir_all, remove_dir_all}; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use eyre::bail; +use malachitebft_test_app::node::App; +use rand::rngs::StdRng; +use rand::SeedableRng; +use tokio::task::JoinSet; +use tokio::time::{sleep, Duration}; +use tracing::{debug, error, error_span, info, Instrument}; + +use malachitebft_config::{ + Config as NodeConfig, Config, DiscoveryConfig, LoggingConfig, PubSubProtocol, SyncConfig, + TestConfig, TransportProtocol, +}; +use malachitebft_core_consensus::{SignedConsensusMsg, ValueToPropose}; +use malachitebft_core_types::{SignedVote, VotingPower}; +use malachitebft_engine::util::events::{Event, RxEvent, TxEvent}; +use malachitebft_test::{Height, PrivateKey, TestContext, Validator, ValidatorSet}; + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum Expected { + Exactly(usize), + AtLeast(usize), + AtMost(usize), + LessThan(usize), + GreaterThan(usize), +} + +impl Expected { + pub fn check(&self, actual: usize) -> bool { + match self { + Expected::Exactly(expected) => actual == *expected, + Expected::AtLeast(expected) => actual >= *expected, + Expected::AtMost(expected) => actual <= *expected, + Expected::LessThan(expected) => actual < *expected, + Expected::GreaterThan(expected) => actual > *expected, + } + } +} + +impl fmt::Display for Expected { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Expected::Exactly(n) => write!(f, "exactly {n}"), + Expected::AtLeast(n) => write!(f, "at least {n}"), + Expected::AtMost(n) => write!(f, "at most {n}"), + Expected::LessThan(n) => write!(f, "less than {n}"), + Expected::GreaterThan(n) => write!(f, "greater than {n}"), + } + } +} + +pub struct TestParams { + pub enable_sync: bool, + pub protocol: PubSubProtocol, + pub block_size: ByteSize, + pub tx_size: ByteSize, + pub txs_per_part: usize, + pub vote_extensions: Option, + pub value_payload: ValuePayload, + pub max_retain_blocks: usize, + pub timeout_step: Duration, +} + +impl Default for TestParams { + fn default() -> Self { + Self { + enable_sync: false, + protocol: PubSubProtocol::default(), + block_size: ByteSize::mib(1), + tx_size: ByteSize::kib(1), + txs_per_part: 256, + vote_extensions: None, + value_payload: ValuePayload::default(), + max_retain_blocks: 50, + timeout_step: Duration::from_secs(30), + } + } +} + +impl TestParams { + fn apply_to_config(&self, config: &mut Config) { + config.sync.enabled = self.enable_sync; + config.consensus.p2p.protocol = self.protocol; + config.consensus.max_block_size = self.block_size; + config.consensus.value_payload = self.value_payload; + config.test.tx_size = self.tx_size; + config.test.txs_per_part = self.txs_per_part; + config.test.vote_extensions.enabled = self.vote_extensions.is_some(); + config.test.vote_extensions.size = self.vote_extensions.unwrap_or_default(); + config.test.max_retain_blocks = self.max_retain_blocks; + config.consensus.timeouts.timeout_step = self.timeout_step; + } +} + +pub enum Step { + Crash(Duration), + ResetDb, + Restart(Duration), + WaitUntil(u64), + OnEvent(EventHandler), + Expect(Expected), + Success, + Fail(String), +} + +#[derive(Copy, Clone, Debug)] +pub enum HandlerResult { + WaitForNextEvent, + ContinueTest, +} + +pub type EventHandler = + Box, &mut S) -> Result + Send + Sync>; + +pub type NodeId = usize; + +pub struct TestNode { + pub id: NodeId, + pub voting_power: VotingPower, + pub start_height: Height, + pub start_delay: Duration, + pub steps: Vec>, + pub state: State, +} + +impl TestNode { + pub fn new(id: usize) -> Self + where + State: Default, + { + Self::new_with_state(id, State::default()) + } + + pub fn new_with_state(id: usize, state: State) -> Self { + Self { + id, + voting_power: 1, + start_height: Height::new(1), + start_delay: Duration::from_secs(0), + steps: vec![], + state, + } + } + + pub fn with_state(&mut self, state: State) -> &mut Self { + self.state = state; + self + } + + pub fn with_voting_power(&mut self, power: VotingPower) -> &mut Self { + self.voting_power = power; + self + } + + pub fn start(&mut self) -> &mut Self { + self.start_at(1) + } + + pub fn start_at(&mut self, height: u64) -> &mut Self { + self.start_after(height, Duration::from_secs(0)) + } + + pub fn start_after(&mut self, height: u64, delay: Duration) -> &mut Self { + self.start_height = Height::new(height); + self.start_delay = delay; + self + } + + pub fn crash(&mut self) -> &mut Self { + self.steps.push(Step::Crash(Duration::from_secs(0))); + self + } + + pub fn crash_after(&mut self, duration: Duration) -> &mut Self { + self.steps.push(Step::Crash(duration)); + self + } + + pub fn reset_db(&mut self) -> &mut Self { + self.steps.push(Step::ResetDb); + self + } + + pub fn restart_after(&mut self, delay: Duration) -> &mut Self { + self.steps.push(Step::Restart(delay)); + self + } + + pub fn wait_until(&mut self, height: u64) -> &mut Self { + self.steps.push(Step::WaitUntil(height)); + self + } + + pub fn on_event(&mut self, on_event: F) -> &mut Self + where + F: Fn(Event, &mut State) -> Result + + Send + + Sync + + 'static, + { + self.steps.push(Step::OnEvent(Box::new(on_event))); + self + } + + pub fn expect_wal_replay(&mut self, at_height: u64) -> &mut Self { + self.on_event(move |event, _| { + let Event::WalReplayBegin(height, count) = event else { + return Ok(HandlerResult::WaitForNextEvent); + }; + + info!("Replaying WAL at height {height} with {count} messages"); + + if height.as_u64() != at_height { + bail!("Unexpected WAL replay at height {height}, expected {at_height}") + } + + Ok(HandlerResult::ContinueTest) + }) + } + + pub fn expect_vote_set_request(&mut self, at_height: u64) -> &mut Self { + self.on_event(move |event, _| { + let Event::RequestedVoteSet(height, round) = event else { + return Ok(HandlerResult::WaitForNextEvent); + }; + + info!("Requested vote set for height {height} and round {round}"); + + if height.as_u64() != at_height { + bail!("Unexpected vote set request for height {height}, expected {at_height}") + } + + Ok(HandlerResult::ContinueTest) + }) + } + + pub fn on_proposed_value(&mut self, f: F) -> &mut Self + where + F: Fn(ValueToPropose, &mut State) -> Result + + Send + + Sync + + 'static, + { + self.on_event(move |event, state| { + if let Event::ProposedValue(value) = event { + f(value, state) + } else { + Ok(HandlerResult::WaitForNextEvent) + } + }) + } + + pub fn on_vote(&mut self, f: F) -> &mut Self + where + F: Fn(SignedVote, &mut State) -> Result + + Send + + Sync + + 'static, + { + self.on_event(move |event, state| { + if let Event::Published(SignedConsensusMsg::Vote(vote)) = event { + f(vote, state) + } else { + Ok(HandlerResult::WaitForNextEvent) + } + }) + } + + pub fn expect_decisions(&mut self, expected: Expected) -> &mut Self { + self.steps.push(Step::Expect(expected)); + self + } + + pub fn success(&mut self) -> &mut Self { + self.steps.push(Step::Success); + self + } +} + +fn unique_id() -> usize { + use std::sync::atomic::{AtomicUsize, Ordering}; + static ID: AtomicUsize = AtomicUsize::new(1); + ID.fetch_add(1, Ordering::SeqCst) +} + +pub struct TestBuilder { + nodes: Vec>, +} + +impl Default for TestBuilder { + fn default() -> Self { + Self { nodes: Vec::new() } + } +} + +impl TestBuilder +where + S: Send + Sync + 'static, +{ + pub fn new() -> Self { + Self::default() + } + + pub fn add_node(&mut self) -> &mut TestNode + where + S: Default, + { + let node = TestNode::new(self.nodes.len() + 1); + self.nodes.push(node); + self.nodes.last_mut().unwrap() + } + + pub fn build(self) -> Test { + Test::new(self.nodes) + } +} + +pub struct Test { + pub id: usize, + pub nodes: Vec>, + pub private_keys: Vec, + pub validator_set: ValidatorSet, + pub consensus_base_port: usize, + pub mempool_base_port: usize, + pub metrics_base_port: usize, +} + +impl Test +where + S: Send + Sync + 'static, +{ + pub fn new(nodes: Vec>) -> Self { + let vals_and_keys = make_validators(voting_powers(&nodes)); + let (validators, private_keys): (Vec<_>, Vec<_>) = vals_and_keys.into_iter().unzip(); + let validator_set = ValidatorSet::new(validators); + let id = unique_id(); + let base_port = 20_000 + id * 1000; + + Self { + id, + nodes, + private_keys, + validator_set, + consensus_base_port: base_port, + mempool_base_port: base_port + 100, + metrics_base_port: base_port + 200, + } + } + + pub fn generate_default_configs(&self) -> Vec { + (0..self.nodes.len()) + .map(|i| make_node_config(self, i)) + .collect() + } + + pub fn generate_custom_configs(&self, params: TestParams) -> Vec { + let mut configs = self.generate_default_configs(); + for config in &mut configs { + params.apply_to_config(config); + } + configs + } + + pub async fn run(self, timeout: Duration) { + let configs = self.generate_default_configs(); + self.run_with_config(configs, timeout).await + } + + pub async fn run_with_custom_config(self, timeout: Duration, params: TestParams) { + let configs = self.generate_custom_configs(params); + self.run_with_config(configs, timeout).await + } + + pub async fn run_with_config(self, configs: Vec, timeout: Duration) { + let _span = error_span!("test", id = %self.id).entered(); + + let mut set = JoinSet::new(); + + for ((node, config), private_key) in self + .nodes + .into_iter() + .zip(configs.into_iter()) + .zip(self.private_keys.into_iter()) + { + let validator_set = self.validator_set.clone(); + + let home_dir = tempfile::TempDir::with_prefix(format!( + "informalsystems-malachitebft-starknet-test-{}", + self.id + )) + .unwrap() + .into_path(); + + set.spawn( + async move { + let id = node.id; + let result = run_node(node, home_dir, config, validator_set, private_key).await; + (id, result) + } + .in_current_span(), + ); + } + + let metrics = tokio::spawn(serve_metrics("127.0.0.1:0".parse().unwrap())); + let results = tokio::time::timeout(timeout, set.join_all()).await; + metrics.abort(); + + match results { + Ok(results) => { + check_results(results); + } + Err(_) => { + error!("Test timed out after {timeout:?}"); + std::process::exit(1); + } + } + } +} + +fn check_results(results: Vec<(NodeId, TestResult)>) { + let mut errors = 0; + + for (id, result) in results { + let _span = tracing::error_span!("node", %id).entered(); + match result { + TestResult::Success(reason) => { + info!("Test succeeded: {reason}"); + } + TestResult::Failure(reason) => { + errors += 1; + error!("Test failed: {reason}"); + } + } + } + + if errors > 0 { + error!("Test failed with {errors} errors"); + std::process::exit(1); + } +} + +pub enum TestResult { + Success(String), + Failure(String), +} + +#[tracing::instrument("node", skip_all, fields(id = %node.id))] +async fn run_node( + mut node: TestNode, + home_dir: PathBuf, + config: Config, + validator_set: ValidatorSet, + private_key: PrivateKey, +) -> TestResult { + sleep(node.start_delay).await; + + info!("Spawning node with voting power {}", node.voting_power); + + let app = App { + config, + home_dir: home_dir.clone(), + private_key, + validator_set, + start_height: Some(node.start_height), + }; + + let mut handles = app.start().await.unwrap(); + + let mut rx_event = handles.tx_event.subscribe(); + let rx_event_bg = handles.tx_event.subscribe(); + + let decisions = Arc::new(AtomicUsize::new(0)); + let current_height = Arc::new(AtomicUsize::new(0)); + + let spawn_bg = |mut rx: RxEvent| { + tokio::spawn({ + let decisions = Arc::clone(&decisions); + let current_height = Arc::clone(¤t_height); + + async move { + while let Ok(event) = rx.recv().await { + match &event { + Event::StartedHeight(height) => { + current_height.store(height.as_u64() as usize, Ordering::SeqCst); + } + Event::Decided(_) => { + decisions.fetch_add(1, Ordering::SeqCst); + } + _ => (), + } + + debug!("Event: {event}"); + } + } + .in_current_span() + }) + }; + + let mut bg = spawn_bg(rx_event_bg); + + for step in node.steps { + match step { + Step::WaitUntil(target_height) => { + info!("Waiting until node reaches height {target_height}"); + + 'inner: while let Ok(event) = rx_event.recv().await { + let Event::StartedHeight(height) = event else { + continue; + }; + + info!("Node started height {height}"); + + if height.as_u64() == target_height { + break 'inner; + } + } + } + + Step::Crash(after) => { + let height = current_height.load(Ordering::SeqCst); + + info!("Node will crash at height {height}"); + sleep(after).await; + + handles + .engine + .actor + .kill_and_wait(None) + .await + .expect("Node must stop"); + + bg.abort(); + handles.app.abort(); + handles.engine.handle.abort(); + } + + Step::ResetDb => { + info!("Resetting database"); + + let db_path = home_dir.join("db"); + remove_dir_all(&db_path).expect("Database must be removed"); + create_dir_all(&db_path).expect("Database must be created"); + } + + Step::Restart(after) => { + info!("Node will restart in {after:?}"); + + sleep(after).await; + + let tx_event = TxEvent::new(); + let new_rx_event = tx_event.subscribe(); + let new_rx_event_bg = tx_event.subscribe(); + + info!("Spawning node"); + let new_handles = app.start().await.unwrap(); + + info!("Spawned"); + + bg = spawn_bg(new_rx_event_bg); + handles = new_handles; + rx_event = new_rx_event; + } + + Step::OnEvent(on_event) => { + 'inner: while let Ok(event) = rx_event.recv().await { + match on_event(event, &mut node.state) { + Ok(HandlerResult::WaitForNextEvent) => { + continue 'inner; + } + Ok(HandlerResult::ContinueTest) => { + break 'inner; + } + Err(e) => { + bg.abort(); + handles.engine.actor.stop(Some("Test failed".to_string())); + handles.app.abort(); + handles.engine.handle.abort(); + + return TestResult::Failure(e.to_string()); + } + } + } + } + + Step::Expect(expected) => { + let actual = decisions.load(Ordering::SeqCst); + + bg.abort(); + handles.engine.actor.stop(Some("Test failed".to_string())); + handles.app.abort(); + handles.engine.handle.abort(); + + if expected.check(actual) { + return TestResult::Success(format!( + "Correct number of decisions: got {actual}, expected: {expected}" + )); + } else { + return TestResult::Failure(format!( + "Incorrect number of decisions: got {actual}, expected: {expected}" + )); + } + } + + Step::Success => { + break; + } + + Step::Fail(reason) => { + bg.abort(); + handles.engine.actor.stop(Some("Test failed".to_string())); + handles.app.abort(); + handles.engine.handle.abort(); + + return TestResult::Failure(reason); + } + } + } + + return TestResult::Success("OK".to_string()); +} + +pub fn init_logging(test_module: &str) { + use tracing_subscriber::util::SubscriberInitExt; + use tracing_subscriber::{EnvFilter, FmtSubscriber}; + + let debug_vars = &[("ACTIONS_RUNNER_DEBUG", "true"), ("MALACHITE_DEBUG", "1")]; + let enable_debug = debug_vars + .iter() + .any(|(k, v)| std::env::var(k).as_deref() == Ok(v)); + + let directive = if enable_debug { + format!("{test_module}=debug,ractor=error,debug") + } else { + format!("{test_module}=debug,ractor=error,warn") + }; + + let filter = EnvFilter::builder().parse(directive).unwrap(); + + pub fn enable_ansi() -> bool { + use std::io::IsTerminal; + std::io::stdout().is_terminal() && std::io::stderr().is_terminal() + } + + // Construct a tracing subscriber with the supplied filter and enable reloading. + let builder = FmtSubscriber::builder() + .with_target(false) + .with_env_filter(filter) + .with_test_writer() + .with_ansi(enable_ansi()) + .with_thread_ids(false); + + let subscriber = builder.finish(); + + if let Err(e) = subscriber.try_init() { + eprintln!("Failed to initialize logging: {e}"); + } +} + +use bytesize::ByteSize; + +use malachitebft_config::{ + ConsensusConfig, MempoolConfig, MetricsConfig, P2pConfig, RuntimeConfig, TimeoutConfig, + ValuePayload, +}; + +fn transport_from_env(default: TransportProtocol) -> TransportProtocol { + if let Ok(protocol) = std::env::var("MALACHITE_TRANSPORT") { + TransportProtocol::from_str(&protocol).unwrap_or(default) + } else { + default + } +} + +pub fn make_node_config(test: &Test, i: usize) -> NodeConfig { + let transport = transport_from_env(TransportProtocol::Tcp); + let protocol = PubSubProtocol::default(); + + NodeConfig { + moniker: format!("node-{}", test.nodes[i].id), + logging: LoggingConfig::default(), + consensus: ConsensusConfig { + max_block_size: ByteSize::mib(1), + value_payload: ValuePayload::default(), + timeouts: TimeoutConfig::default(), + p2p: P2pConfig { + transport, + protocol, + discovery: DiscoveryConfig::default(), + listen_addr: transport.multiaddr("127.0.0.1", test.consensus_base_port + i), + persistent_peers: (0..test.nodes.len()) + .filter(|j| i != *j) + .map(|j| transport.multiaddr("127.0.0.1", test.consensus_base_port + j)) + .collect(), + ..Default::default() + }, + }, + mempool: MempoolConfig { + p2p: P2pConfig { + transport, + protocol, + listen_addr: transport.multiaddr("127.0.0.1", test.mempool_base_port + i), + persistent_peers: (0..test.nodes.len()) + .filter(|j| i != *j) + .map(|j| transport.multiaddr("127.0.0.1", test.mempool_base_port + j)) + .collect(), + ..Default::default() + }, + max_tx_count: 10000, + gossip_batch_size: 100, + }, + sync: SyncConfig { + enabled: true, + status_update_interval: Duration::from_secs(2), + request_timeout: Duration::from_secs(5), + }, + metrics: MetricsConfig { + enabled: false, + listen_addr: format!("127.0.0.1:{}", test.metrics_base_port + i) + .parse() + .unwrap(), + }, + runtime: RuntimeConfig::single_threaded(), + test: TestConfig::default(), + } +} + +fn voting_powers(nodes: &[TestNode]) -> Vec { + nodes.iter().map(|node| node.voting_power).collect() +} + +pub fn make_validators(voting_powers: Vec) -> Vec<(Validator, PrivateKey)> { + let mut rng = StdRng::seed_from_u64(0x42); + + let mut validators = Vec::with_capacity(voting_powers.len()); + + for vp in voting_powers { + let sk = PrivateKey::generate(&mut rng); + let val = Validator::new(sk.public_key(), vp); + validators.push((val, sk)); + } + + validators +} + +use axum::routing::get; +use axum::Router; +use tokio::net::TcpListener; + +#[tracing::instrument(name = "metrics", skip_all)] +async fn serve_metrics(listen_addr: SocketAddr) { + let app = Router::new().route("/metrics", get(get_metrics)); + let listener = TcpListener::bind(listen_addr).await.unwrap(); + let address = listener.local_addr().unwrap(); + + async fn get_metrics() -> String { + let mut buf = String::new(); + malachitebft_metrics::export(&mut buf); + buf + } + + info!(%address, "Serving metrics"); + axum::serve(listener, app).await.unwrap(); +} diff --git a/code/crates/test/mempool/src/lib.rs b/code/crates/test/mempool/src/lib.rs index d39034561..32511671e 100644 --- a/code/crates/test/mempool/src/lib.rs +++ b/code/crates/test/mempool/src/lib.rs @@ -163,7 +163,7 @@ pub async fn spawn( let (tx_ctrl, rx_ctrl) = mpsc::channel(32); let peer_id = swarm.local_peer_id(); - let span = error_span!("mempool-network", peer = %peer_id); + let span = error_span!("mempool.network", peer = %peer_id); let task_handle = tokio::task::spawn(run(config, metrics, swarm, rx_ctrl, tx_event).instrument(span)); diff --git a/code/crates/test/src/node.rs b/code/crates/test/src/node.rs index 3c5b7d0e1..4fcbcf3c2 100644 --- a/code/crates/test/src/node.rs +++ b/code/crates/test/src/node.rs @@ -52,11 +52,8 @@ impl Node for TestNode { file } - fn load_private_key_file( - &self, - path: impl AsRef, - ) -> std::io::Result { - let private_key = std::fs::read_to_string(path)?; + fn load_private_key_file(&self) -> std::io::Result { + let private_key = std::fs::read_to_string(&self.private_key_file)?; serde_json::from_str(&private_key).map_err(|e| e.into()) } diff --git a/code/crates/test/tests/it/main.rs b/code/crates/test/tests/it/main.rs new file mode 100644 index 000000000..0638122f1 --- /dev/null +++ b/code/crates/test/tests/it/main.rs @@ -0,0 +1,7 @@ +mod n3f0; +mod n3f0_consensus_mode; +mod n3f0_pubsub_protocol; +mod value_sync; +mod vote_sync; +mod wal; +mod n3f1; diff --git a/code/crates/test/tests/it/n3f0.rs b/code/crates/test/tests/it/n3f0.rs new file mode 100644 index 000000000..17ba5f970 --- /dev/null +++ b/code/crates/test/tests/it/n3f0.rs @@ -0,0 +1,18 @@ +use std::time::Duration; + +use malachitebft_test_framework::{init_logging, TestBuilder}; + +#[tokio::test] +pub async fn all_correct_nodes() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + + test.build().run(Duration::from_secs(30)).await +} diff --git a/code/crates/test/tests/it/n3f0_consensus_mode.rs b/code/crates/test/tests/it/n3f0_consensus_mode.rs new file mode 100644 index 000000000..64b459ed8 --- /dev/null +++ b/code/crates/test/tests/it/n3f0_consensus_mode.rs @@ -0,0 +1,50 @@ +use std::time::Duration; + +use malachitebft_config::ValuePayload; +use malachitebft_test_framework::{init_logging, TestBuilder, TestParams}; + +async fn run_test(params: TestParams) { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + + test.build() + .run_with_custom_config(Duration::from_secs(30), params) + .await +} + +#[tokio::test] +pub async fn parts_only() { + let params = TestParams { + value_payload: ValuePayload::PartsOnly, + ..Default::default() + }; + + run_test(params).await +} + +#[tokio::test] +pub async fn proposal_only() { + let params = TestParams { + value_payload: ValuePayload::ProposalOnly, + ..Default::default() + }; + + run_test(params).await +} + +#[tokio::test] +pub async fn proposal_and_parts() { + let params = TestParams { + value_payload: ValuePayload::ProposalAndParts, + ..Default::default() + }; + + run_test(params).await +} diff --git a/code/crates/test/tests/it/n3f0_pubsub_protocol.rs b/code/crates/test/tests/it/n3f0_pubsub_protocol.rs new file mode 100644 index 000000000..0bbf522e4 --- /dev/null +++ b/code/crates/test/tests/it/n3f0_pubsub_protocol.rs @@ -0,0 +1,77 @@ +use std::time::Duration; + +use bytesize::ByteSize; +use malachitebft_config::{GossipSubConfig, PubSubProtocol}; +use malachitebft_test_framework::{init_logging, TestBuilder, TestParams}; + +async fn run_test(params: TestParams) { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + + test.build() + .run_with_custom_config(Duration::from_secs(30), params) + .await +} + +#[tokio::test] +pub async fn broadcast_custom_config_1ktx() { + let params = TestParams { + enable_sync: false, + protocol: PubSubProtocol::Broadcast, + block_size: ByteSize::kib(1), + tx_size: ByteSize::kib(1), + txs_per_part: 1, + ..Default::default() + }; + + run_test(params).await +} + +#[tokio::test] +pub async fn broadcast_custom_config_2ktx() { + let params = TestParams { + enable_sync: false, + protocol: PubSubProtocol::Broadcast, + block_size: ByteSize::kib(2), + tx_size: ByteSize::kib(2), + txs_per_part: 1, + ..Default::default() + }; + + run_test(params).await +} + +#[tokio::test] +pub async fn gossip_custom_config_1ktx() { + let params = TestParams { + enable_sync: false, + protocol: PubSubProtocol::GossipSub(GossipSubConfig::default()), + block_size: ByteSize::kib(1), + tx_size: ByteSize::kib(1), + txs_per_part: 1, + ..Default::default() + }; + + run_test(params).await +} + +#[tokio::test] +pub async fn gossip_custom_config_2ktx() { + let params = TestParams { + enable_sync: false, + protocol: PubSubProtocol::GossipSub(GossipSubConfig::default()), + block_size: ByteSize::kib(2), + tx_size: ByteSize::kib(2), + txs_per_part: 1, + ..Default::default() + }; + + run_test(params).await +} diff --git a/code/crates/test/tests/it/n3f1.rs b/code/crates/test/tests/it/n3f1.rs new file mode 100644 index 000000000..95558c394 --- /dev/null +++ b/code/crates/test/tests/it/n3f1.rs @@ -0,0 +1,113 @@ +use std::time::Duration; + +use malachitebft_test_framework::{init_logging, TestBuilder}; + +#[tokio::test] +pub async fn proposer_fails_to_start() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node().with_voting_power(1).success(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.build().run(Duration::from_secs(30)).await +} + +#[tokio::test] +pub async fn one_node_fails_to_start() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node().with_voting_power(1).success(); + + test.build().run(Duration::from_secs(30)).await +} + +#[tokio::test] +pub async fn proposer_crashes_at_height_2() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(1) + .start() + .wait_until(2) + .crash() + .success(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.build().run(Duration::from_secs(30)).await +} + +#[tokio::test] +pub async fn one_node_crashes_at_height_3() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(1) + .start() + .wait_until(3) + .crash() + .success(); + + test.build().run(Duration::from_secs(30)).await +} diff --git a/code/crates/test/tests/it/value_sync.rs b/code/crates/test/tests/it/value_sync.rs new file mode 100644 index 000000000..389ff43a8 --- /dev/null +++ b/code/crates/test/tests/it/value_sync.rs @@ -0,0 +1,202 @@ +use std::time::Duration; + +use malachitebft_config::ValuePayload; +use malachitebft_test_framework::{init_logging, TestBuilder, TestParams}; + +pub async fn crash_restart_from_start(params: TestParams) { + init_logging(module_path!()); + + const HEIGHT: u64 = 10; + + let mut test = TestBuilder::<()>::new(); + + // Node 1 starts with 10 voting power. + test.add_node() + .with_voting_power(10) + .start() + // Wait until it reaches height 10 + .wait_until(HEIGHT) + // Record a successful test for this node + .success(); + + // Node 2 starts with 10 voting power, in parallel with node 1 and with the same behaviour + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT) + .success(); + + // Node 3 starts with 5 voting power, in parallel with node 1 and 2. + test.add_node() + .with_voting_power(5) + .start() + // Wait until the node reaches height 2... + .wait_until(2) + // ...and then kills it + .crash() + // Reset the database so that the node has to do Sync from height 1 + .reset_db() + // After that, it waits 5 seconds before restarting the node + .restart_after(Duration::from_secs(5)) + // Wait until the node reached the expected height + .wait_until(HEIGHT) + // Record a successful test for this node + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), // Timeout for the whole test + TestParams { + enable_sync: true, // Enable Sync + ..params + }, + ) + .await +} + +#[tokio::test] +pub async fn crash_restart_from_start_parts_only() { + let params = TestParams { + value_payload: ValuePayload::PartsOnly, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + +#[tokio::test] +pub async fn crash_restart_from_start_proposal_only() { + let params = TestParams { + value_payload: ValuePayload::ProposalOnly, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + +#[tokio::test] +pub async fn crash_restart_from_start_proposal_and_parts() { + let params = TestParams { + value_payload: ValuePayload::ProposalAndParts, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + +#[tokio::test] +pub async fn crash_restart_from_latest() { + init_logging(module_path!()); + + const HEIGHT: u64 = 10; + + let mut test = TestBuilder::<()>::new(); + + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(2) + .crash() + // We do not reset the database so that the node can restart from the latest height + .restart_after(Duration::from_secs(5)) + .wait_until(HEIGHT) + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), + TestParams { + enable_sync: true, + ..Default::default() + }, + ) + .await +} + +#[tokio::test] +pub async fn aggressive_pruning() { + init_logging(module_path!()); + + const HEIGHT: u64 = 15; + + let mut test = TestBuilder::<()>::new(); + + // Node 1 starts with 10 voting power. + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(2) + .crash() + .reset_db() + .restart_after(Duration::from_secs(5)) + .wait_until(HEIGHT) + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), // Timeout for the whole test + TestParams { + enable_sync: true, // Enable Sync + max_retain_blocks: 10, // Prune blocks older than 10 + ..Default::default() + }, + ) + .await +} + +#[tokio::test] +pub async fn start_late() { + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT * 2) + .success(); + + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT * 2) + .success(); + + test.add_node() + .with_voting_power(5) + .start_after(1, Duration::from_secs(10)) + .wait_until(HEIGHT) + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(30), + TestParams { + enable_sync: true, + ..Default::default() + }, + ) + .await +} diff --git a/code/crates/test/tests/it/vote_sync.rs b/code/crates/test/tests/it/vote_sync.rs new file mode 100644 index 000000000..e4b0da00b --- /dev/null +++ b/code/crates/test/tests/it/vote_sync.rs @@ -0,0 +1,137 @@ +use std::time::Duration; + +use malachitebft_config::ValuePayload; +use malachitebft_test_framework::{init_logging, TestBuilder, TestParams}; + +// NOTE: These tests are very similar to the Sync tests, with the difference that +// all nodes have the same voting power and therefore get stuck when one of them dies. + +pub async fn crash_restart_from_start(params: TestParams) { + init_logging(module_path!()); + + const HEIGHT: u64 = 10; + + let mut test = TestBuilder::<()>::new(); + + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + + test.add_node() + .start() + // Wait until the node reaches height 4... + .wait_until(4) + // ...then kill it + .crash() + // Reset the database so that the node has to do Sync from height 1 + .reset_db() + // After that, it waits 5 seconds before restarting the node + .restart_after(Duration::from_secs(5)) + // Expect a vote set request for height 4 + .expect_vote_set_request(4) + // Wait until the node reached the expected height + .wait_until(HEIGHT) + // Record a successful test for this node + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), // Timeout for the whole test + TestParams { + enable_sync: true, // Enable Sync + timeout_step: Duration::from_secs(5), + ..params + }, + ) + .await +} + +#[tokio::test] +pub async fn crash_restart_from_start_parts_only() { + let params = TestParams { + value_payload: ValuePayload::PartsOnly, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + +#[tokio::test] +pub async fn crash_restart_from_start_proposal_only() { + let params = TestParams { + value_payload: ValuePayload::ProposalOnly, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + +#[tokio::test] +pub async fn crash_restart_from_start_proposal_and_parts() { + let params = TestParams { + value_payload: ValuePayload::ProposalAndParts, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + +#[tokio::test] +pub async fn crash_restart_from_latest() { + init_logging(module_path!()); + + const HEIGHT: u64 = 10; + + let mut test = TestBuilder::<()>::new(); + + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node() + .start() + .wait_until(2) + .crash() + // We do not reset the database so that the node can restart from the latest height + .restart_after(Duration::from_secs(5)) + // Expect a vote set request for height 2 + .expect_vote_set_request(2) + .wait_until(HEIGHT) + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), + TestParams { + enable_sync: true, + timeout_step: Duration::from_secs(5), + ..Default::default() + }, + ) + .await +} + +#[tokio::test] +pub async fn start_late() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + let mut test = TestBuilder::<()>::new(); + + test.add_node().start().wait_until(HEIGHT * 2).success(); + test.add_node().start().wait_until(HEIGHT * 2).success(); + test.add_node() + .start_after(1, Duration::from_secs(10)) + // Expect a vote set request for height 1 + .expect_vote_set_request(1) + .wait_until(HEIGHT) + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), + TestParams { + enable_sync: true, + timeout_step: Duration::from_secs(5), + ..Default::default() + }, + ) + .await +} diff --git a/code/crates/test/tests/it/wal.rs b/code/crates/test/tests/it/wal.rs new file mode 100644 index 000000000..55f9b1bd1 --- /dev/null +++ b/code/crates/test/tests/it/wal.rs @@ -0,0 +1,195 @@ +use std::time::Duration; + +use eyre::bail; +use tracing::info; + +use informalsystems_malachitebft_test as malachitebft_test; + +use malachitebft_config::ValuePayload; +use malachitebft_core_consensus::ValueToPropose; +use malachitebft_core_types::SignedVote; +use malachitebft_engine::util::events::Event; +use malachitebft_test::TestContext; +use malachitebft_test_framework::{init_logging, HandlerResult, TestBuilder, TestParams}; + +#[tokio::test] +async fn proposer_crashes_after_proposing_parts_only() { + proposer_crashes_after_proposing(TestParams { + value_payload: ValuePayload::PartsOnly, + ..TestParams::default() + }) + .await +} + +#[tokio::test] +async fn proposer_crashes_after_proposing_proposal_and_parts() { + proposer_crashes_after_proposing(TestParams { + value_payload: ValuePayload::ProposalAndParts, + ..TestParams::default() + }) + .await +} + +#[tokio::test] +async fn proposer_crashes_after_proposing_proposal_only() { + proposer_crashes_after_proposing(TestParams { + value_payload: ValuePayload::ProposalOnly, + ..TestParams::default() + }) + .await +} + +async fn proposer_crashes_after_proposing(params: TestParams) { + init_logging(module_path!()); + + #[derive(Clone, Debug, Default)] + struct State { + first_proposed_value: Option>, + } + + const CRASH_HEIGHT: u64 = 4; + + let mut test = TestBuilder::::new(); + + test.add_node().with_voting_power(10).start().success(); + test.add_node().with_voting_power(10).start().success(); + + test.add_node() + .with_voting_power(40) + .start() + .wait_until(CRASH_HEIGHT) + // Wait until this node proposes a value + .on_event(|event, state| match event { + Event::ProposedValue(value) => { + info!("Proposer proposed block: {:?}", value.value); + state.first_proposed_value = Some(value); + Ok(HandlerResult::ContinueTest) + } + _ => Ok(HandlerResult::WaitForNextEvent), + }) + // Crash right after + .crash() + // Restart after 5 seconds + .restart_after(Duration::from_secs(5)) + // Check that we replay messages from the WAL + .expect_wal_replay(CRASH_HEIGHT) + // Wait until it proposes a value again, while replaying WAL + // Check that it is the same value as the first time + .on_proposed_value(|value, state| { + let Some(first_value) = state.first_proposed_value.as_ref() else { + bail!("Proposer did not propose a block"); + }; + + if first_value.value == value.value { + info!("Proposer re-proposed the same block: {:?}", value.value); + Ok(HandlerResult::ContinueTest) + } else { + bail!( + "Proposer just equivocated: expected {:?}, got {:?}", + first_value.value, + value.value + ) + } + }) + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), + TestParams { + enable_sync: false, + ..params + }, + ) + .await +} + +#[tokio::test] +async fn non_proposer_crashes_after_voting_parts_only() { + non_proposer_crashes_after_voting(TestParams { + value_payload: ValuePayload::PartsOnly, + ..TestParams::default() + }) + .await +} + +#[tokio::test] +async fn non_proposer_crashes_after_voting_proposal_and_parts() { + non_proposer_crashes_after_voting(TestParams { + value_payload: ValuePayload::ProposalAndParts, + ..TestParams::default() + }) + .await +} + +#[tokio::test] +async fn non_proposer_crashes_after_voting_proposal_only() { + non_proposer_crashes_after_voting(TestParams { + value_payload: ValuePayload::ProposalOnly, + ..TestParams::default() + }) + .await +} + +async fn non_proposer_crashes_after_voting(params: TestParams) { + init_logging(module_path!()); + + #[derive(Clone, Debug, Default)] + struct State { + first_vote: Option>, + } + + const CRASH_HEIGHT: u64 = 3; + + let mut test = TestBuilder::::new(); + + test.add_node() + .with_voting_power(40) + .start() + .wait_until(CRASH_HEIGHT) + // Wait until this node proposes a value + .on_vote(|vote, state| { + info!("Non-proposer voted"); + state.first_vote = Some(vote); + + Ok(HandlerResult::ContinueTest) + }) + // Crash right after + .crash() + // Restart after 5 seconds + .restart_after(Duration::from_secs(5)) + // Check that we replay messages from the WAL + .expect_wal_replay(CRASH_HEIGHT) + // Wait until it proposes a value again, while replaying WAL + // Check that it is the same value as the first time + .on_vote(|vote, state| { + let Some(first_vote) = state.first_vote.as_ref() else { + bail!("Non-proposer did not vote") + }; + + if first_vote.value == vote.value { + info!("Non-proposer voted the same way: {first_vote:?}"); + Ok(HandlerResult::ContinueTest) + } else { + bail!( + "Non-proposer just equivocated: expected {:?}, got {:?}", + first_vote.value, + vote.value + ) + } + }) + .success(); + + test.add_node().with_voting_power(10).start().success(); + test.add_node().with_voting_power(10).start().success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), + TestParams { + enable_sync: false, + ..params + }, + ) + .await +} diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index 6f1d7589d..4c311e4d3 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -64,11 +64,8 @@ impl Node for App { file } - fn load_private_key_file( - &self, - path: impl AsRef, - ) -> std::io::Result { - let private_key = std::fs::read_to_string(path)?; + fn load_private_key_file(&self) -> std::io::Result { + let private_key = std::fs::read_to_string(&self.private_key_file)?; serde_json::from_str(&private_key).map_err(|e| e.into()) } @@ -95,7 +92,7 @@ impl Node for App { let span = tracing::error_span!("node", moniker = %self.config.moniker); let _enter = span.enter(); - let private_key_file = self.load_private_key_file(&self.private_key_file)?; + let private_key_file = self.load_private_key_file()?; let private_key = self.load_private_key(private_key_file); let public_key = self.get_public_key(&private_key); let address = self.get_address(&public_key); @@ -106,12 +103,11 @@ impl Node for App { let codec = ProtobufCodec; - let mut channels = malachitebft_app_channel::run( + let (mut channels, _handle) = malachitebft_app_channel::start_engine( ctx.clone(), codec, self.clone(), self.config.clone(), - self.private_key_file.clone(), self.start_height, initial_validator_set, ) From 83c5eedefc8ee2276af66635283efc3ddea5a165 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 10 Jan 2025 13:13:48 +0100 Subject: [PATCH 2/6] Improve logging in tests --- code/crates/engine/src/wal.rs | 13 ++++++++++++- code/crates/test/framework/src/lib.rs | 4 ++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/code/crates/engine/src/wal.rs b/code/crates/engine/src/wal.rs index e55d45b94..3b01fc3dd 100644 --- a/code/crates/engine/src/wal.rs +++ b/code/crates/engine/src/wal.rs @@ -185,6 +185,11 @@ where type Arguments = Args; type State = State; + #[tracing::instrument( + name = "wal.pre_start", + parent = &self.span, + skip_all, + )] async fn pre_start( &self, _myself: WalRef, @@ -196,7 +201,7 @@ where let (tx, rx) = mpsc::channel(100); // Spawn a system thread to perform blocking WAL operations. - let handle = self::thread::spawn(tracing::Span::current(), log, args.codec, rx); + let handle = self::thread::spawn(self.span.clone(), log, args.codec, rx); Ok(State { height: Ctx::Height::default(), @@ -224,6 +229,12 @@ where Ok(()) } + #[tracing::instrument( + name = "wal.post_stop", + parent = &self.span, + skip_all, + fields(height = %state.height), + )] async fn post_stop( &self, _: WalRef, diff --git a/code/crates/test/framework/src/lib.rs b/code/crates/test/framework/src/lib.rs index dcdd5a59b..e38625230 100644 --- a/code/crates/test/framework/src/lib.rs +++ b/code/crates/test/framework/src/lib.rs @@ -636,9 +636,9 @@ pub fn init_logging(test_module: &str) { .any(|(k, v)| std::env::var(k).as_deref() == Ok(v)); let directive = if enable_debug { - format!("{test_module}=debug,ractor=error,debug") + format!("{test_module}=debug,informalsystems_malachitebft=trace,informalsystems_malachitebft_discovery=error,libp2p=warn,ractor=warn") } else { - format!("{test_module}=debug,ractor=error,warn") + format!("{test_module}=debug,informalsystems_malachitebft=info,informalsystems_malachitebft_discovery=error,libp2p=warn,ractor=warn") }; let filter = EnvFilter::builder().parse(directive).unwrap(); From 52dd28d52bb719f486f7262ae58c0a8e1a62a805 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 10 Jan 2025 13:57:57 +0100 Subject: [PATCH 3/6] Better error handling --- code/crates/starknet/host/src/actor.rs | 1 + code/crates/test/app/src/app.rs | 22 ++++++------- code/crates/test/app/src/node.rs | 17 ++++++---- code/crates/test/app/src/state.rs | 43 ++++++++++++++++---------- code/examples/channel/src/state.rs | 15 ++++----- 5 files changed, 56 insertions(+), 42 deletions(-) diff --git a/code/crates/starknet/host/src/actor.rs b/code/crates/starknet/host/src/actor.rs index e47756fbe..1853042bd 100644 --- a/code/crates/starknet/host/src/actor.rs +++ b/code/crates/starknet/host/src/actor.rs @@ -180,6 +180,7 @@ impl Host { value_bytes, reply_to, } => on_process_synced_value(value_bytes, height, round, validator_address, reply_to), + HostMsg::PeerJoined { peer_id } => { debug!(%peer_id, "Peer joined the network"); Ok(()) diff --git a/code/crates/test/app/src/app.rs b/code/crates/test/app/src/app.rs index affc72d21..8656b669e 100644 --- a/code/crates/test/app/src/app.rs +++ b/code/crates/test/app/src/app.rs @@ -181,19 +181,17 @@ pub async fn run( info!(%height, %round, "Processing synced value"); let value = decode_value(value_bytes); + let proposal = ProposedValue { + height, + round, + valid_round: Round::Nil, + proposer, + value, + validity: Validity::Valid, + extension: None, + }; - if reply - .send(ProposedValue { - height, - round, - valid_round: Round::Nil, - proposer, - value, - validity: Validity::Valid, - extension: None, - }) - .is_err() - { + if reply.send(proposal).is_err() { error!("Failed to send ProcessSyncedValue reply"); } } diff --git a/code/crates/test/app/src/node.rs b/code/crates/test/app/src/node.rs index 62bddfb09..e2b1a721d 100644 --- a/code/crates/test/app/src/node.rs +++ b/code/crates/test/app/src/node.rs @@ -9,7 +9,7 @@ use tokio::task::JoinHandle; use tracing::Instrument; use malachitebft_app_channel::app::events::TxEvent; -use malachitebft_app_channel::app::types::config::Config; +use malachitebft_app_channel::app::types::config::Config; // TODO: Move into test app use malachitebft_app_channel::app::types::core::VotingPower; use malachitebft_app_channel::app::types::Keypair; use malachitebft_app_channel::app::Node; @@ -26,7 +26,7 @@ use crate::state::State; use crate::store::Store; pub struct Handles { - pub app: JoinHandle>, + pub app: JoinHandle<()>, pub engine: EngineHandle, pub tx_event: TxEvent, } @@ -71,15 +71,20 @@ impl App { drop(_guard); + let config = self.config.clone(); let store = Store::open(self.get_home_dir().join("store.db"))?; let start_height = self.start_height.unwrap_or_default(); - let mut state = State::new(ctx, address, start_height, store); + let mut state = State::new(ctx, config, address, start_height, store); let tx_event = channels.events.clone(); let app_handle = tokio::spawn( - async move { crate::app::run(genesis, &mut state, &mut channels).await } - .instrument(span), + async move { + if let Err(e) = crate::app::run(genesis, &mut state, &mut channels).await { + tracing::error!("Application has failed with an error: {e}"); + } + } + .instrument(span), ); Ok(Handles { @@ -148,6 +153,6 @@ impl Node for App { async fn run(self) -> eyre::Result<()> { let handles = self.start().await?; - handles.app.await? + handles.app.await.map_err(Into::into) } } diff --git a/code/crates/test/app/src/state.rs b/code/crates/test/app/src/state.rs index d2c521d32..e4dc83356 100644 --- a/code/crates/test/app/src/state.rs +++ b/code/crates/test/app/src/state.rs @@ -4,15 +4,17 @@ use std::collections::HashSet; use bytes::Bytes; +use eyre::eyre; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use sha3::Digest; -use tracing::{debug, error}; +use tracing::debug; use malachitebft_app_channel::app::consensus::ProposedValue; use malachitebft_app_channel::app::host::LocallyProposedValue; use malachitebft_app_channel::app::streaming::{StreamContent, StreamMessage}; use malachitebft_app_channel::app::types::codec::Codec; +use malachitebft_app_channel::app::types::config::Config; // TODO: Move into test app use malachitebft_app_channel::app::types::core::{CommitCertificate, Round, Validity}; use malachitebft_app_channel::app::types::PeerId; use malachitebft_test::codec::proto::ProtobufCodec; @@ -26,17 +28,17 @@ use crate::streaming::{PartStreamsMap, ProposalParts}; /// Represents the internal state of the application node /// Contains information about current height, round, proposals and blocks pub struct State { - ctx: TestContext, - address: Address, - store: Store, - stream_id: u64, - streams_map: PartStreamsMap, - rng: StdRng, - + pub ctx: TestContext, + pub config: Config, + pub address: Address, pub current_height: Height, pub current_round: Round, pub current_proposer: Option
, pub peers: HashSet, + store: Store, + stream_id: u64, + streams_map: PartStreamsMap, + rng: StdRng, } // Make up a seed for the rng based on our address in @@ -53,14 +55,21 @@ fn seed_from_address(address: &Address) -> u64 { impl State { /// Creates a new State instance with the given validator address and starting height - pub fn new(ctx: TestContext, address: Address, height: Height, store: Store) -> Self { + pub fn new( + ctx: TestContext, + config: Config, + address: Address, + height: Height, + store: Store, + ) -> Self { Self { ctx, + config, + address, + store, current_height: height, current_round: Round::new(0), current_proposer: None, - address, - store, stream_id: 0, streams_map: PartStreamsMap::new(), rng: StdRng::seed_from_u64(seed_from_address(&address)), @@ -128,12 +137,12 @@ impl State { .get_undecided_proposal(certificate.height, certificate.round) .await else { - error!( - height = %certificate.height, - "Trying to commit a value that is not decided" - ); - - return Ok(()); // FIXME + return Err(eyre!( + "Trying to commit a value at height {} and round {} that is not decided: {}", + certificate.height, + certificate.round, + certificate.value_id + )); }; self.store diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index d2c521d32..2d67ded4a 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -4,10 +4,11 @@ use std::collections::HashSet; use bytes::Bytes; +use eyre::eyre; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use sha3::Digest; -use tracing::{debug, error}; +use tracing::debug; use malachitebft_app_channel::app::consensus::ProposedValue; use malachitebft_app_channel::app::host::LocallyProposedValue; @@ -128,12 +129,12 @@ impl State { .get_undecided_proposal(certificate.height, certificate.round) .await else { - error!( - height = %certificate.height, - "Trying to commit a value that is not decided" - ); - - return Ok(()); // FIXME + return Err(eyre!( + "Trying to commit a value at height {} and round {} that is not decided: {}", + certificate.height, + certificate.round, + certificate.value_id + )); }; self.store From 32eca8dc5a568725d8c0289d291414e383f50d5c Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 10 Jan 2025 13:59:53 +0100 Subject: [PATCH 4/6] Disable `proposal_only` consensus mode test --- .../starknet/test/src/tests/n3f0_consensus_mode.rs | 11 +++++++---- code/crates/test/tests/it/n3f0_consensus_mode.rs | 11 +++++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/code/crates/starknet/test/src/tests/n3f0_consensus_mode.rs b/code/crates/starknet/test/src/tests/n3f0_consensus_mode.rs index 468a18c6f..3dcdcc6b6 100644 --- a/code/crates/starknet/test/src/tests/n3f0_consensus_mode.rs +++ b/code/crates/starknet/test/src/tests/n3f0_consensus_mode.rs @@ -31,21 +31,24 @@ pub async fn parts_only() { } #[tokio::test] -pub async fn proposal_only() { +pub async fn proposal_and_parts() { let params = TestParams { - value_payload: ValuePayload::ProposalOnly, + value_payload: ValuePayload::ProposalAndParts, ..Default::default() }; run_test(params).await } +// This functionality is not fully implemented yet #[tokio::test] -pub async fn proposal_and_parts() { +#[ignore] +pub async fn proposal_only() { let params = TestParams { - value_payload: ValuePayload::ProposalAndParts, + value_payload: ValuePayload::ProposalOnly, ..Default::default() }; run_test(params).await } + diff --git a/code/crates/test/tests/it/n3f0_consensus_mode.rs b/code/crates/test/tests/it/n3f0_consensus_mode.rs index 64b459ed8..4091a22bd 100644 --- a/code/crates/test/tests/it/n3f0_consensus_mode.rs +++ b/code/crates/test/tests/it/n3f0_consensus_mode.rs @@ -30,21 +30,24 @@ pub async fn parts_only() { } #[tokio::test] -pub async fn proposal_only() { +pub async fn proposal_and_parts() { let params = TestParams { - value_payload: ValuePayload::ProposalOnly, + value_payload: ValuePayload::ProposalAndParts, ..Default::default() }; run_test(params).await } +// This functionality is not fully implemented yet #[tokio::test] -pub async fn proposal_and_parts() { +#[ignore] +pub async fn proposal_only() { let params = TestParams { - value_payload: ValuePayload::ProposalAndParts, + value_payload: ValuePayload::ProposalOnly, ..Default::default() }; run_test(params).await } + From 1d49792cba32c9cebaaa91d26db5e9b7e389d5ec Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 10 Jan 2025 14:37:48 +0100 Subject: [PATCH 5/6] Formatting --- code/crates/starknet/test/src/tests/n3f0_consensus_mode.rs | 1 - code/crates/test/tests/it/main.rs | 2 +- code/crates/test/tests/it/n3f0_consensus_mode.rs | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/code/crates/starknet/test/src/tests/n3f0_consensus_mode.rs b/code/crates/starknet/test/src/tests/n3f0_consensus_mode.rs index 3dcdcc6b6..585a49697 100644 --- a/code/crates/starknet/test/src/tests/n3f0_consensus_mode.rs +++ b/code/crates/starknet/test/src/tests/n3f0_consensus_mode.rs @@ -51,4 +51,3 @@ pub async fn proposal_only() { run_test(params).await } - diff --git a/code/crates/test/tests/it/main.rs b/code/crates/test/tests/it/main.rs index 0638122f1..7d44b8a6a 100644 --- a/code/crates/test/tests/it/main.rs +++ b/code/crates/test/tests/it/main.rs @@ -1,7 +1,7 @@ mod n3f0; mod n3f0_consensus_mode; mod n3f0_pubsub_protocol; +mod n3f1; mod value_sync; mod vote_sync; mod wal; -mod n3f1; diff --git a/code/crates/test/tests/it/n3f0_consensus_mode.rs b/code/crates/test/tests/it/n3f0_consensus_mode.rs index 4091a22bd..379fde7a4 100644 --- a/code/crates/test/tests/it/n3f0_consensus_mode.rs +++ b/code/crates/test/tests/it/n3f0_consensus_mode.rs @@ -50,4 +50,3 @@ pub async fn proposal_only() { run_test(params).await } - From 770f2bdc79012c5e3d9a521f64dffff51d412936 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 10 Jan 2025 14:39:17 +0100 Subject: [PATCH 6/6] Fix test suite --- .github/workflows/rust.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 7f5e9073f..02aca6a76 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -55,6 +55,7 @@ jobs: --all-features \ --no-fail-fast \ --failure-output final \ + --exclude informalsystems-malachitebft-test \ --exclude informalsystems-malachitebft-starknet-test \ --exclude informalsystems-malachitebft-discovery-test @@ -91,7 +92,7 @@ jobs: run: | cargo maelstrom \ --slots 16 \ - --include 'package.match(informalsystems-malachitebft-starknet-test) || package.match(informalsystems-malachitebft-discovery-test)' + --include 'package.match(informalsystems-malachitebft-test) || package.match(informalsystems-malachitebft-starknet-test) || package.match(informalsystems-malachitebft-discovery-test)' clippy: name: Clippy