Skip to content

Commit

Permalink
feat(code): Add ValueBuilder test params to the config (#206)
Browse files Browse the repository at this point in the history
These values can be overriden using the following environment variables:
- Override with MALACHITE__TEST__TXS_PER_PART
- MALACHITE__TEST__TIME_ALLOWANCE_FACTOR
- MALACHITE__TEST__EXEC_TIME_PER_PART

Timeouts can also be overriden using the following environment variables:
- MALACHITE__CONSENSUS__TIMEOUT_PROPOSE
- MALACHITE__CONSENSUS__TIMEOUT_PREVOTE
- MALACHITE__CONSENSUS__TIMEOUT_PRECOMMIT
- MALACHITE__CONSENSUS__TIMEOUT_COMMIT
  • Loading branch information
romac authored Jun 5, 2024
1 parent 2ae7f78 commit 16b589f
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 16 deletions.
1 change: 1 addition & 0 deletions code/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ ed25519-consensus = "2.1.0"
futures = "0.3"
glob = "0.3.0"
hex = { version = "0.4.3", features = ["serde"] }
humantime = "2.1.0"
humantime-serde = "1.1.1"
itertools = "0.13"
itf = "0.2.3"
Expand Down
6 changes: 5 additions & 1 deletion code/actors/src/util/make_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ pub async fn make_node_actor(
let ctx = TestContext::new(validator_pk.clone());

// Spawn the host actor
let value_builder = Box::new(TestValueBuilder::<TestContext>::new(mempool.clone()));
let value_builder = Box::new(TestValueBuilder::<TestContext>::new(
mempool.clone(),
cfg.test.into(),
));

let host = Host::spawn(
value_builder,
PartStore::new(),
Expand Down
45 changes: 31 additions & 14 deletions code/actors/src/util/value_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,55 @@ pub trait ValueBuilder<Ctx: Context>: Send + Sync + 'static {
}

pub mod test {
// TODO - parameterize
// If based on the propose_timeout and the constants below we end up with more than 300 parts then consensus
// is never reached in a round and we keep moving to the next one.
const NUM_TXES_PER_PART: u64 = 400;
const TIME_ALLOWANCE_FACTOR: f32 = 0.5;
const EXEC_TIME_MICROSEC_PER_PART: u64 = 100000;

use std::marker::PhantomData;

use malachite_node::config::TestConfig;
use ractor::ActorRef;

use malachite_common::Context;
use malachite_driver::Validity;
use malachite_test::{
Address, BlockMetadata, BlockPart, Content, Height, TestContext, TransactionBatch, Value,
};
use ractor::ActorRef;

use crate::mempool::Msg as MempoolMsg;

use super::*;

#[derive(Copy, Clone, Debug)]
pub struct TestParams {
pub txs_per_part: u64,
pub time_allowance_factor: f32,
pub exec_time_per_part: Duration,
}

impl From<TestConfig> for TestParams {
fn from(cfg: TestConfig) -> Self {
Self {
txs_per_part: cfg.txs_per_part,
time_allowance_factor: cfg.time_allowance_factor,
exec_time_per_part: cfg.exec_time_per_part,
}
}
}

#[derive(Clone)]
pub struct TestValueBuilder<Ctx: Context> {
_phantom: PhantomData<Ctx>,
tx_streamer: ActorRef<crate::mempool::Msg>,
tx_streamer: ActorRef<MempoolMsg>,
params: TestParams,
}

impl<Ctx> TestValueBuilder<Ctx>
where
Ctx: Context,
{
pub fn new(tx_streamer: ActorRef<crate::mempool::Msg>) -> Self {
pub fn new(tx_streamer: ActorRef<MempoolMsg>, params: TestParams) -> Self {
Self {
_phantom: Default::default(),
tx_streamer,
params,
}
}
}
Expand All @@ -85,7 +102,7 @@ pub mod test {
part_store: &mut PartStore<TestContext>,
) -> Option<LocallyProposedValue<TestContext>> {
let now = Instant::now();
let deadline = now + timeout_duration.mul_f32(TIME_ALLOWANCE_FACTOR);
let deadline = now + timeout_duration.mul_f32(self.params.time_allowance_factor);
let expiration_time = now + timeout_duration;

let mut tx_batch = vec![];
Expand All @@ -103,9 +120,9 @@ pub mod test {
let mut txes = self
.tx_streamer
.call(
|reply| crate::mempool::Msg::TxStream {
|reply| MempoolMsg::TxStream {
height: height.as_u64(),
num_txes: NUM_TXES_PER_PART,
num_txes: self.params.txs_per_part,
reply,
},
None,
Expand Down Expand Up @@ -134,7 +151,7 @@ pub mod test {
.unwrap();

// Simulate execution
tokio::time::sleep(Duration::from_micros(EXEC_TIME_MICROSEC_PER_PART)).await;
tokio::time::sleep(self.params.exec_time_per_part).await;
tx_batch.append(&mut txes);

sequence += 1;
Expand Down Expand Up @@ -200,7 +217,7 @@ pub mod test {

// Simulate Tx execution and proof verification (assumes success)
// TODO - add config knob for invalid blocks
tokio::time::sleep(Duration::from_micros(EXEC_TIME_MICROSEC_PER_PART)).await;
tokio::time::sleep(self.params.exec_time_per_part).await;

// Get the "last" part, the one with highest sequence.
// Block parts may not be received in order.
Expand Down
1 change: 1 addition & 0 deletions code/actors/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ pub async fn run_test<const N: usize>(test: Test<N>) {
.collect(),
},
},
test: Default::default(),
};

let node = tokio::spawn(make_node_actor(
Expand Down
1 change: 1 addition & 0 deletions code/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ malachite-test.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
color-eyre = { workspace = true }
directories = { workspace = true }
humantime = { workspace = true }
itertools = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
Expand Down
47 changes: 46 additions & 1 deletion code/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ impl Args {
pub fn load_config(&self) -> Result<Config> {
let config_file = self.get_config_file_path()?;
info!("Loading configuration from {:?}", config_file.display());
load_toml_file(&config_file)
let mut config = load_toml_file(&config_file)?;
override_config_from_env(&mut config)?;
Ok(config)
}

/// load_genesis returns the validator set from the genesis file
Expand Down Expand Up @@ -155,6 +157,49 @@ where
.wrap_err_with(|| eyre!("Failed to load configuration at {}", file.display(),))
}

fn override_config_from_env(config: &mut Config) -> Result<()> {
use std::env;

if let Ok(timeout_propose) = env::var("MALACHITE__CONSENSUS__TIMEOUT_PROPOSE") {
config.consensus.timeouts.timeout_propose = humantime::parse_duration(&timeout_propose)
.wrap_err("Invalid MALACHITE__CONSENSUS__TIMEOUT_PROPOSE")?;
}

if let Ok(timeout_prevote) = env::var("MALACHITE__CONSENSUS__TIMEOUT_PREVOTE") {
config.consensus.timeouts.timeout_prevote = humantime::parse_duration(&timeout_prevote)
.wrap_err("Invalid MALACHITE__CONSENSUS__TIMEOUT_PREVOTE")?;
}

if let Ok(timeout_precommit) = env::var("MALACHITE__CONSENSUS__TIMEOUT_PRECOMMIT") {
config.consensus.timeouts.timeout_precommit = humantime::parse_duration(&timeout_precommit)
.wrap_err("Invalid MALACHITE__CONSENSUS__TIMEOUT_PRECOMMIT")?;
}

if let Ok(timeout_commit) = env::var("MALACHITE__CONSENSUS__TIMEOUT_COMMIT") {
config.consensus.timeouts.timeout_commit = humantime::parse_duration(&timeout_commit)
.wrap_err("Invalid MALACHITE__CONSENSUS__TIMEOUT_COMMIT")?;
}

if let Ok(txs_per_part) = env::var("MALACHITE__TEST__TXS_PER_PART") {
config.test.txs_per_part = txs_per_part
.parse()
.wrap_err("Invalid MALACHITE__TEST__TXS_PER_PART")?;
}

if let Ok(time_allowance_factor) = env::var("MALACHITE__TEST__TIME_ALLOWANCE_FACTOR") {
config.test.time_allowance_factor = time_allowance_factor
.parse()
.wrap_err("Invalid MALACHITE__TEST__TIME_ALLOWANCE_FACTOR")?;
}

if let Ok(exec_time_per_part) = env::var("MALACHITE__TEST__EXEC_TIME_PER_PART") {
config.test.exec_time_per_part = humantime::parse_duration(&exec_time_per_part)
.wrap_err("Invalid MALACHITE__TEST__EXEC_TIME_PER_PART")?;
}

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions code/cli/src/cmd/testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,6 @@ pub fn generate_config(index: usize, total: usize) -> Config {
.collect(),
},
},
test: Default::default(),
}
}
7 changes: 7 additions & 0 deletions code/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,10 @@ listen_addr = "/ip4/0.0.0.0/udp/0/quic-v1"
# List of nodes to keep persistent connections to
persistent_peers = []

[test]
# Override with MALACHITE__TEST__TXS_PER_PART env variable
txs_per_part = 400
# Override with MALACHITE__TEST__TIME_ALLOWANCE_FACTOR env variable
time_allowance_factor = 0.5
# Override with MALACHITE__TEST__EXEC_TIME_PER_PART env variable
exec_time_per_part = 1000
24 changes: 24 additions & 0 deletions code/node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@ use serde::{Deserialize, Serialize};
pub struct Config {
/// A custom human-readable name for this node
pub moniker: String,

/// Consensus configuration options
pub consensus: ConsensusConfig,

/// Mempool configuration options
pub mempool: MempoolConfig,

/// Test configuration
#[serde(default)]
pub test: TestConfig,
}

/// P2P configuration options
Expand Down Expand Up @@ -109,3 +115,21 @@ impl Default for TimeoutConfig {
}
}
}

#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct TestConfig {
pub txs_per_part: u64,
pub time_allowance_factor: f32,
#[serde(with = "humantime_serde")]
pub exec_time_per_part: Duration,
}

impl Default for TestConfig {
fn default() -> Self {
Self {
txs_per_part: 400,
time_allowance_factor: 0.5,
exec_time_per_part: Duration::from_micros(100000),
}
}
}

0 comments on commit 16b589f

Please sign in to comment.