diff --git a/code/Cargo.toml b/code/Cargo.toml index 835213861..cebfd3c34 100644 --- a/code/Cargo.toml +++ b/code/Cargo.toml @@ -55,6 +55,7 @@ ed25519-consensus = "2.1.0" futures = "0.3" glob = "0.3.0" hex = { version = "0.4.3", features = ["serde"] } +humantime = "2.1.0" humantime-serde = "1.1.1" itertools = "0.13" itf = "0.2.3" diff --git a/code/actors/src/util/make_actor.rs b/code/actors/src/util/make_actor.rs index 598792646..67f340a22 100644 --- a/code/actors/src/util/make_actor.rs +++ b/code/actors/src/util/make_actor.rs @@ -48,7 +48,11 @@ pub async fn make_node_actor( let ctx = TestContext::new(validator_pk.clone()); // Spawn the host actor - let value_builder = Box::new(TestValueBuilder::::new(mempool.clone())); + let value_builder = Box::new(TestValueBuilder::::new( + mempool.clone(), + cfg.test.into(), + )); + let host = Host::spawn( value_builder, PartStore::new(), diff --git a/code/actors/src/util/value_builder.rs b/code/actors/src/util/value_builder.rs index a25227858..920cb0f08 100644 --- a/code/actors/src/util/value_builder.rs +++ b/code/actors/src/util/value_builder.rs @@ -37,38 +37,55 @@ pub trait ValueBuilder: Send + Sync + 'static { } pub mod test { - // TODO - parameterize - // If based on the propose_timeout and the constants below we end up with more than 300 parts then consensus - // is never reached in a round and we keep moving to the next one. - const NUM_TXES_PER_PART: u64 = 400; - const TIME_ALLOWANCE_FACTOR: f32 = 0.5; - const EXEC_TIME_MICROSEC_PER_PART: u64 = 100000; use std::marker::PhantomData; + use malachite_node::config::TestConfig; + use ractor::ActorRef; + use malachite_common::Context; use malachite_driver::Validity; use malachite_test::{ Address, BlockMetadata, BlockPart, Content, Height, TestContext, TransactionBatch, Value, }; - use ractor::ActorRef; + + use crate::mempool::Msg as MempoolMsg; use super::*; + #[derive(Copy, Clone, Debug)] + pub struct TestParams { + pub txs_per_part: u64, + pub time_allowance_factor: f32, + pub exec_time_per_part: Duration, + } + + impl From for TestParams { + fn from(cfg: TestConfig) -> Self { + Self { + txs_per_part: cfg.txs_per_part, + time_allowance_factor: cfg.time_allowance_factor, + exec_time_per_part: cfg.exec_time_per_part, + } + } + } + #[derive(Clone)] pub struct TestValueBuilder { _phantom: PhantomData, - tx_streamer: ActorRef, + tx_streamer: ActorRef, + params: TestParams, } impl TestValueBuilder where Ctx: Context, { - pub fn new(tx_streamer: ActorRef) -> Self { + pub fn new(tx_streamer: ActorRef, params: TestParams) -> Self { Self { _phantom: Default::default(), tx_streamer, + params, } } } @@ -85,7 +102,7 @@ pub mod test { part_store: &mut PartStore, ) -> Option> { let now = Instant::now(); - let deadline = now + timeout_duration.mul_f32(TIME_ALLOWANCE_FACTOR); + let deadline = now + timeout_duration.mul_f32(self.params.time_allowance_factor); let expiration_time = now + timeout_duration; let mut tx_batch = vec![]; @@ -103,9 +120,9 @@ pub mod test { let mut txes = self .tx_streamer .call( - |reply| crate::mempool::Msg::TxStream { + |reply| MempoolMsg::TxStream { height: height.as_u64(), - num_txes: NUM_TXES_PER_PART, + num_txes: self.params.txs_per_part, reply, }, None, @@ -134,7 +151,7 @@ pub mod test { .unwrap(); // Simulate execution - tokio::time::sleep(Duration::from_micros(EXEC_TIME_MICROSEC_PER_PART)).await; + tokio::time::sleep(self.params.exec_time_per_part).await; tx_batch.append(&mut txes); sequence += 1; @@ -200,7 +217,7 @@ pub mod test { // Simulate Tx execution and proof verification (assumes success) // TODO - add config knob for invalid blocks - tokio::time::sleep(Duration::from_micros(EXEC_TIME_MICROSEC_PER_PART)).await; + tokio::time::sleep(self.params.exec_time_per_part).await; // Get the "last" part, the one with highest sequence. // Block parts may not be received in order. diff --git a/code/actors/tests/util.rs b/code/actors/tests/util.rs index 65712b249..74009148c 100644 --- a/code/actors/tests/util.rs +++ b/code/actors/tests/util.rs @@ -149,6 +149,7 @@ pub async fn run_test(test: Test) { .collect(), }, }, + test: Default::default(), }; let node = tokio::spawn(make_node_actor( diff --git a/code/cli/Cargo.toml b/code/cli/Cargo.toml index 2850ebd3e..30a61025b 100644 --- a/code/cli/Cargo.toml +++ b/code/cli/Cargo.toml @@ -17,6 +17,7 @@ malachite-test.workspace = true clap = { workspace = true, features = ["derive", "env"] } color-eyre = { workspace = true } directories = { workspace = true } +humantime = { workspace = true } itertools = { workspace = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } diff --git a/code/cli/src/args.rs b/code/cli/src/args.rs index e9bf4ca9c..96c7e9bf6 100644 --- a/code/cli/src/args.rs +++ b/code/cli/src/args.rs @@ -113,7 +113,9 @@ impl Args { pub fn load_config(&self) -> Result { let config_file = self.get_config_file_path()?; info!("Loading configuration from {:?}", config_file.display()); - load_toml_file(&config_file) + let mut config = load_toml_file(&config_file)?; + override_config_from_env(&mut config)?; + Ok(config) } /// load_genesis returns the validator set from the genesis file @@ -155,6 +157,49 @@ where .wrap_err_with(|| eyre!("Failed to load configuration at {}", file.display(),)) } +fn override_config_from_env(config: &mut Config) -> Result<()> { + use std::env; + + if let Ok(timeout_propose) = env::var("MALACHITE__CONSENSUS__TIMEOUT_PROPOSE") { + config.consensus.timeouts.timeout_propose = humantime::parse_duration(&timeout_propose) + .wrap_err("Invalid MALACHITE__CONSENSUS__TIMEOUT_PROPOSE")?; + } + + if let Ok(timeout_prevote) = env::var("MALACHITE__CONSENSUS__TIMEOUT_PREVOTE") { + config.consensus.timeouts.timeout_prevote = humantime::parse_duration(&timeout_prevote) + .wrap_err("Invalid MALACHITE__CONSENSUS__TIMEOUT_PREVOTE")?; + } + + if let Ok(timeout_precommit) = env::var("MALACHITE__CONSENSUS__TIMEOUT_PRECOMMIT") { + config.consensus.timeouts.timeout_precommit = humantime::parse_duration(&timeout_precommit) + .wrap_err("Invalid MALACHITE__CONSENSUS__TIMEOUT_PRECOMMIT")?; + } + + if let Ok(timeout_commit) = env::var("MALACHITE__CONSENSUS__TIMEOUT_COMMIT") { + config.consensus.timeouts.timeout_commit = humantime::parse_duration(&timeout_commit) + .wrap_err("Invalid MALACHITE__CONSENSUS__TIMEOUT_COMMIT")?; + } + + if let Ok(txs_per_part) = env::var("MALACHITE__TEST__TXS_PER_PART") { + config.test.txs_per_part = txs_per_part + .parse() + .wrap_err("Invalid MALACHITE__TEST__TXS_PER_PART")?; + } + + if let Ok(time_allowance_factor) = env::var("MALACHITE__TEST__TIME_ALLOWANCE_FACTOR") { + config.test.time_allowance_factor = time_allowance_factor + .parse() + .wrap_err("Invalid MALACHITE__TEST__TIME_ALLOWANCE_FACTOR")?; + } + + if let Ok(exec_time_per_part) = env::var("MALACHITE__TEST__EXEC_TIME_PER_PART") { + config.test.exec_time_per_part = humantime::parse_duration(&exec_time_per_part) + .wrap_err("Invalid MALACHITE__TEST__EXEC_TIME_PER_PART")?; + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/code/cli/src/cmd/testnet.rs b/code/cli/src/cmd/testnet.rs index 71d7f2638..d0b6a55ad 100644 --- a/code/cli/src/cmd/testnet.rs +++ b/code/cli/src/cmd/testnet.rs @@ -131,5 +131,6 @@ pub fn generate_config(index: usize, total: usize) -> Config { .collect(), }, }, + test: Default::default(), } } diff --git a/code/config.toml b/code/config.toml index 1dbe39467..bab280325 100644 --- a/code/config.toml +++ b/code/config.toml @@ -50,3 +50,10 @@ listen_addr = "/ip4/0.0.0.0/udp/0/quic-v1" # List of nodes to keep persistent connections to persistent_peers = [] +[test] +# Override with MALACHITE__TEST__TXS_PER_PART env variable +txs_per_part = 400 +# Override with MALACHITE__TEST__TIME_ALLOWANCE_FACTOR env variable +time_allowance_factor = 0.5 +# Override with MALACHITE__TEST__EXEC_TIME_PER_PART env variable +exec_time_per_part = 1000 diff --git a/code/node/src/config.rs b/code/node/src/config.rs index f9e397f07..04f3641be 100644 --- a/code/node/src/config.rs +++ b/code/node/src/config.rs @@ -9,10 +9,16 @@ use serde::{Deserialize, Serialize}; pub struct Config { /// A custom human-readable name for this node pub moniker: String, + /// Consensus configuration options pub consensus: ConsensusConfig, + /// Mempool configuration options pub mempool: MempoolConfig, + + /// Test configuration + #[serde(default)] + pub test: TestConfig, } /// P2P configuration options @@ -109,3 +115,21 @@ impl Default for TimeoutConfig { } } } + +#[derive(Copy, Clone, Debug, Serialize, Deserialize)] +pub struct TestConfig { + pub txs_per_part: u64, + pub time_allowance_factor: f32, + #[serde(with = "humantime_serde")] + pub exec_time_per_part: Duration, +} + +impl Default for TestConfig { + fn default() -> Self { + Self { + txs_per_part: 400, + time_allowance_factor: 0.5, + exec_time_per_part: Duration::from_micros(100000), + } + } +}