Skip to content

Commit

Permalink
Add a facility for making faulty nodes and + integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Mar 5, 2024
1 parent 04fb1fd commit f627267
Show file tree
Hide file tree
Showing 7 changed files with 379 additions and 45 deletions.
1 change: 1 addition & 0 deletions code/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["fmt"] }
ractor = { workspace = true, features = ["async-trait"] }
ractor_actors = { workspace = true, default-features = false, features = ["streams"] }
rand = { workspace = true }

[dev-dependencies]
malachite-test.workspace = true
185 changes: 185 additions & 0 deletions code/node/src/actors/faulty_node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
#![allow(dead_code, unused_variables)]

use std::fmt::Display;
use std::time::Duration;

use ractor::Actor;
use ractor::ActorProcessingErr;
use ractor::ActorRef;

use malachite_common::Context;
use malachite_proto::{self as proto, Protobuf};
use rand::seq::IteratorRandom;
use rand::Rng;
use tracing::warn;

use crate::actors::node::Msg;
use crate::actors::node::Node;
use crate::actors::node::State;

/// Probability with which a [`Fault`] is triggered; passed as-is to `Rng::gen_bool`.
pub type Prob = f64;

/// The set of [`Fault`]s configured for a faulty node.
#[derive(Debug, Clone)]
pub struct Faults {
    /// Candidate faults, consulted for every incoming message.
    faults: Vec<Fault>,
}

impl Faults {
pub fn new(faults: Vec<Fault>) -> Self {
Self { faults }
}

pub fn choose_fault_for_msg<Ctx>(
&self,
msg: &Msg<Ctx>,
rng: &mut dyn rand::RngCore,
) -> Option<&Fault>
where
Ctx: Context,
{
self.faults
.iter()
.filter(|f| f.applies_to_msg(msg))
.choose_stable(rng)
.filter(|fault| fault.is_enabled(rng))
}
}

/// A kind of misbehaviour that can be injected when a message is handled.
///
/// Every variant carries the [`Prob`] with which it fires; the delay variants
/// additionally carry how long to wait before the message is handled.
#[derive(Debug, Clone, Copy)]
pub enum Fault {
    /// Drop a `Msg::GossipEvent` instead of handling it.
    DiscardGossipEvent(Prob),
    /// Wait for the given duration before handling a `Msg::StartHeight`.
    DelayStartHeight(Prob, Duration),
    /// Wait for the given duration before handling a `Msg::MoveToNextHeight`.
    DelayMoveToNextHeight(Prob, Duration),
    /// Wait for the given duration before handling a `Msg::ProposeValue`.
    DelayProposeValue(Prob, Duration),
}

impl Fault {
    /// Rolls the dice for this fault: returns `true` with the probability
    /// stored in the variant.
    ///
    /// NOTE(review): `Rng::gen_bool` is documented to panic when the
    /// probability is outside `[0, 1]`, so callers should only build faults
    /// with valid probabilities.
    pub fn is_enabled(&self, rng: &mut dyn rand::RngCore) -> bool {
        // Every variant stores its probability in the first position.
        let probability = match *self {
            Fault::DiscardGossipEvent(p)
            | Fault::DelayStartHeight(p, _)
            | Fault::DelayMoveToNextHeight(p, _)
            | Fault::DelayProposeValue(p, _) => p,
        };

        rng.gen_bool(probability)
    }

    /// Tells whether this fault targets the kind of message `msg` is.
    pub fn applies_to_msg<Ctx: Context>(&self, msg: &Msg<Ctx>) -> bool {
        matches!(
            (self, msg),
            (Fault::DiscardGossipEvent(_), Msg::GossipEvent(_))
                | (Fault::DelayStartHeight(_, _), Msg::StartHeight(_))
                | (Fault::DelayMoveToNextHeight(_, _), Msg::MoveToNextHeight)
                | (Fault::DelayProposeValue(_, _), Msg::ProposeValue(_, _, _))
        )
    }
}

/// Actor state of a [`FaultyNode`]: the wrapped node's own state plus the
/// random number generator used to decide which faults fire.
pub struct FaultyState<Ctx: Context> {
    /// State produced by the wrapped node's `pre_start`.
    node_state: State<Ctx>,
    /// Source of randomness handed over through [`FaultyArgs`].
    rng: Box<dyn rand::RngCore + Send + Sync>,
}

/// Start-up arguments of the [`FaultyNode`] actor.
pub struct FaultyArgs {
    /// Random number generator that is moved into the actor's state.
    rng: Box<dyn rand::RngCore + Send + Sync>,
}

/// A wrapper around a [`Node`] that may discard or delay some of the
/// messages it receives, according to a configured set of faults.
pub struct FaultyNode<Ctx: Context> {
    /// The wrapped node that messages are forwarded to.
    node: Node<Ctx>,
    /// The faults that may be injected.
    faults: Faults,
}

impl<Ctx> FaultyNode<Ctx>
where
    Ctx: Context,
    Ctx::Height: Display,
    Ctx::Vote: Protobuf<Proto = proto::Vote>,
    Ctx::Proposal: Protobuf<Proto = proto::Proposal>,
{
    /// Wraps `node` so that the given `faults` may be injected into it.
    pub fn new(node: Node<Ctx>, faults: Vec<Fault>) -> Self {
        let faults = Faults::new(faults);
        Self { node, faults }
    }

    /// Builds a faulty node around `node` and spawns it as an unnamed actor.
    ///
    /// `rng` is handed to the actor as its start-up argument. Only the actor
    /// reference is returned; the join handle produced by the spawn is dropped.
    pub async fn spawn(
        node: Node<Ctx>,
        faults: Vec<Fault>,
        rng: Box<dyn rand::RngCore + Send + Sync>,
    ) -> Result<ActorRef<Msg<Ctx>>, ractor::SpawnErr> {
        let args = FaultyArgs { rng };

        Actor::spawn(None, Self::new(node, faults), args)
            .await
            .map(|(actor_ref, _join_handle)| actor_ref)
    }
}

#[ractor::async_trait]
impl<Ctx> Actor for FaultyNode<Ctx>
where
Ctx: Context,
Ctx::Height: Display,
Ctx::Vote: Protobuf<Proto = proto::Vote>,
Ctx::Proposal: Protobuf<Proto = proto::Proposal>,
{
type Msg = Msg<Ctx>;
type State = FaultyState<Ctx>;
type Arguments = FaultyArgs;

async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
args: FaultyArgs,
) -> Result<Self::State, ActorProcessingErr> {
let state = self.node.pre_start(myself, ()).await?;

Ok(FaultyState {
node_state: state,
rng: args.rng,
})
}

async fn handle(
&self,
myself: ActorRef<Msg<Ctx>>,
msg: Msg<Ctx>,
state: &mut FaultyState<Ctx>,
) -> Result<(), ractor::ActorProcessingErr> {
if let Some(fault) = self.faults.choose_fault_for_msg(&msg, &mut state.rng) {
warn!("Injecting fault: {fault:?}");

match (&msg, fault) {
(Msg::GossipEvent(_), Fault::DiscardGossipEvent(_)) => {
// Do nothing
Ok(())
}
(Msg::StartHeight(_), Fault::DelayStartHeight(_, delay)) => {
tokio::time::sleep(*delay).await;
self.node.handle(myself, msg, &mut state.node_state).await
}
(Msg::MoveToNextHeight, Fault::DelayMoveToNextHeight(_, delay)) => {
tokio::time::sleep(*delay).await;
self.node.handle(myself, msg, &mut state.node_state).await
}
(Msg::ProposeValue(_, _, _), Fault::DelayProposeValue(_, delay)) => {
tokio::time::sleep(*delay).await;
self.node.handle(myself, msg, &mut state.node_state).await
}

// Wrong combination of message and fault, just handle the message normally.
// This should never happen, but oh well.
_ => self.node.handle(myself, msg, &mut state.node_state).await,
}
} else {
self.node.handle(myself, msg, &mut state.node_state).await
}
}
}
1 change: 1 addition & 0 deletions code/node/src/actors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod faulty_node;
pub mod node;
pub mod proposal_builder;
pub mod timers;
Expand Down
61 changes: 45 additions & 16 deletions code/node/src/actors/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::fmt::Display;
use std::sync::Arc;
use std::time::Instant;

use malachite_common::ValueId;
use malachite_common::{
Context, Height, NilOrVal, Proposal, Round, SignedProposal, SignedVote, Timeout, TimeoutStep,
Validator, ValidatorSet, Vote, VoteType,
Expand All @@ -16,7 +17,8 @@ use malachite_proto as proto;
use malachite_proto::Protobuf;
use malachite_vote::Threshold;
use ractor::rpc::call_and_forward;
use ractor::{Actor, ActorName, ActorProcessingErr, ActorRef};
use ractor::{Actor, ActorProcessingErr, ActorRef};
use tokio::sync::mpsc;
use tracing::{debug, info, warn};

use crate::actors::proposal_builder::{BuildProposal, ProposedValue};
Expand All @@ -37,6 +39,7 @@ where
timers_config: timers::Config,
gossip: ActorRef<GossipMsg>,
proposal_builder: ActorRef<BuildProposal<Ctx>>,
tx_decision: mpsc::Sender<(Ctx::Height, Round, Ctx::Value)>,
}

pub enum Msg<Ctx: Context> {
Expand All @@ -63,8 +66,6 @@ where
timers: ActorRef<TimerMsg>,
}

pub struct Args;

impl<Ctx> Node<Ctx>
where
Ctx: Context,
Expand All @@ -78,13 +79,15 @@ where
timers_config: timers::Config,
gossip: ActorRef<GossipMsg>,
proposal_builder: ActorRef<BuildProposal<Ctx>>,
tx_decision: mpsc::Sender<(Ctx::Height, Round, Ctx::Value)>,
) -> Self {
Self {
ctx,
params,
timers_config,
gossip,
proposal_builder,
tx_decision,
}
}

Expand All @@ -94,10 +97,18 @@ where
timers_config: timers::Config,
gossip: ActorRef<GossipMsg>,
proposal_builder: ActorRef<BuildProposal<Ctx>>,
tx_decision: mpsc::Sender<(Ctx::Height, Round, Ctx::Value)>,
) -> Result<ActorRef<Msg<Ctx>>, ractor::SpawnErr> {
let node = Self::new(ctx, params, timers_config, gossip, proposal_builder);
let (actor_ref, _) = Actor::spawn(Some(ActorName::from("Node")), node, Args).await?;
let node = Self::new(
ctx,
params,
timers_config,
gossip,
proposal_builder,
tx_decision,
);

let (actor_ref, _) = Actor::spawn(None, node, ()).await?;
Ok(actor_ref)
}

Expand Down Expand Up @@ -253,8 +264,22 @@ where
None
};

let outputs = state.driver.process(input).unwrap();
let outputs = state
.driver
.process(input)
.map_err(|e| format!("Driver failed to process input: {e}"))?;

self.process_driver_outputs(outputs, check_threshold, myself, state)
.await
}

async fn process_driver_outputs(
&self,
outputs: Vec<DriverOutput<Ctx>>,
check_threshold: Option<(VoteType, Round, NilOrVal<ValueId<Ctx>>)>,
myself: ActorRef<Msg<Ctx>>,
state: &mut State<Ctx>,
) -> Result<(), ActorProcessingErr> {
// When we receive a vote, check if we've gotten +2/3 votes for the value we just received a vote for,
// if so then cancel the corresponding timeout.
if let Some((vote_type, round, value)) = check_threshold {
Expand All @@ -263,11 +288,9 @@ where
NilOrVal::Val(value) => Threshold::Value(value),
};

if state
.driver
.votes()
.is_threshold_met(&round, vote_type, threshold.clone())
{
let votes = state.driver.votes();

if votes.is_threshold_met(&round, vote_type, threshold.clone()) {
let timeout = match vote_type {
VoteType::Prevote => Timeout::prevote(round),
VoteType::Precommit => Timeout::precommit(round),
Expand All @@ -279,10 +302,11 @@ where
}

for output in outputs {
match self
let next = self
.handle_driver_output(output, myself.clone(), state)
.await?
{
.await?;

match next {
Next::None => (),

Next::Input(input) => myself.cast(Msg::SendDriverInput(input))?,
Expand Down Expand Up @@ -355,6 +379,11 @@ where
DriverOutput::Decide(round, value) => {
info!("Decided on value {value:?} at round {round}");

let _ = self
.tx_decision
.send((state.driver.height(), round, value.clone()))
.await;

Ok(Next::Decided(round, value))
}

Expand Down Expand Up @@ -412,12 +441,12 @@ where
{
type Msg = Msg<Ctx>;
type State = State<Ctx>;
type Arguments = Args;
type Arguments = ();

async fn pre_start(
&self,
myself: ActorRef<Msg<Ctx>>,
_args: Args,
_args: (),
) -> Result<State<Ctx>, ractor::ActorProcessingErr> {
let (timers, _) =
Timers::spawn_linked(self.timers_config, myself.clone(), myself.get_cell()).await?;
Expand Down
14 changes: 12 additions & 2 deletions code/node/src/util/make_actor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::sync::Arc;
use std::time::Duration;

use malachite_common::Round;
use malachite_gossip::actor::Gossip;
use malachite_test::utils::RotateProposer;
use malachite_test::{Address, Height, PrivateKey, TestContext, ValidatorSet};
use malachite_test::{Address, Height, PrivateKey, TestContext, ValidatorSet, Value};
use tokio::sync::mpsc;

use crate::actors::node::Node;
use crate::actors::proposal_builder::ProposalBuilder;
Expand All @@ -16,6 +18,7 @@ pub async fn make_node_actor(
validator_set: ValidatorSet,
private_key: PrivateKey,
address: Address,
tx_decision: mpsc::Sender<(Height, Round, Value)>,
) -> Node<TestContext> {
let keypair = gossip::Keypair::ed25519_from_bytes(private_key.inner().to_bytes()).unwrap();
let start_height = Height::new(1);
Expand Down Expand Up @@ -48,5 +51,12 @@ pub async fn make_node_actor(
let addr = "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap();
let (gossip, _) = Gossip::spawn(keypair, addr, config).await.unwrap();

Node::new(ctx, params, timers_config, gossip, proposal_builder)
Node::new(
ctx,
params,
timers_config,
gossip,
proposal_builder,
tx_decision,
)
}
Loading

0 comments on commit f627267

Please sign in to comment.