From ade91a2f6846e3531f51520ee3b8920e3bd1ec80 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 30 May 2024 16:46:57 +0200 Subject: [PATCH] Resolve cyclic dependency between Consensus and ProposalBuilder actor --- code/actors/src/consensus.rs | 7 ++- code/actors/src/node.rs | 13 +---- code/actors/src/proposal_builder.rs | 76 ++++++++++++--------------- code/actors/src/util/make_actor.rs | 10 ++-- code/actors/src/util/value_builder.rs | 42 +++++++-------- 5 files changed, 66 insertions(+), 82 deletions(-) diff --git a/code/actors/src/consensus.rs b/code/actors/src/consensus.rs index 918fedf1c..4016d0692 100644 --- a/code/actors/src/consensus.rs +++ b/code/actors/src/consensus.rs @@ -277,8 +277,10 @@ where return Ok(()); } - self.proposal_builder - .cast(ProposalBuilderMsg::BlockPart(signed_block_part.block_part))? + self.proposal_builder.cast(ProposalBuilderMsg::BlockPart { + block_part: signed_block_part.block_part, + reply_to: myself.clone(), + })? } } @@ -525,6 +527,7 @@ where round, timeout_duration, address: self.params.address.clone(), + consensus: myself.clone(), reply, }, myself.get_cell(), diff --git a/code/actors/src/node.rs b/code/actors/src/node.rs index 6c8ef7dfe..d66b8ff08 100644 --- a/code/actors/src/node.rs +++ b/code/actors/src/node.rs @@ -14,7 +14,6 @@ use crate::gossip_mempool::Msg as GossipMempoolMsg; use crate::mempool::Msg as MempoolMsg; use crate::proposal_builder::Msg as ProposalBuilderMsg; use crate::timers::Config as TimersConfig; -use crate::util; use crate::util::ValueBuilder; pub struct Params { @@ -161,17 +160,7 @@ where _state: &mut (), ) -> Result<(), ractor::ActorProcessingErr> { match msg { - Msg::Start => { - let part_store = util::value_builder::test::PartStore::default(); - - self.proposal_builder - .cast(crate::proposal_builder::Msg::Init { - consensus: self.consensus.clone(), - part_store, - })?; - - self.mempool.cast(crate::mempool::Msg::Start)? - } + Msg::Start => self.mempool.cast(crate::mempool::Msg::Start)?, } Ok(()) diff --git a/code/actors/src/proposal_builder.rs b/code/actors/src/proposal_builder.rs index f53f46eb5..691400600 100644 --- a/code/actors/src/proposal_builder.rs +++ b/code/actors/src/proposal_builder.rs @@ -1,9 +1,9 @@ -use std::fmt::Debug; +use std::marker::PhantomData; use std::time::Duration; -use tracing::info; use derive_where::derive_where; use ractor::{async_trait, Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; +use tracing::info; use malachite_common::{Context, Round}; use malachite_driver::Validity; @@ -30,23 +30,21 @@ pub struct ReceivedProposedValue { } pub enum Msg { - // Initialize the builder state with the gossip actor - Init { - consensus: ActorRef>, - part_store: PartStore, - }, - // Request to build a local block/ value from Driver GetValue { height: Ctx::Height, round: Round, timeout_duration: Duration, - reply: RpcReplyPort>, + consensus: ActorRef>, address: Ctx::Address, + reply: RpcReplyPort>, }, // BlockPart received <-- consensus <-- gossip - BlockPart(Ctx::BlockPart), + BlockPart { + block_part: Ctx::BlockPart, + reply_to: ActorRef>, + }, // Retrieve a block/ value for which all parts have been received GetReceivedValue { @@ -56,31 +54,34 @@ pub enum Msg { }, } -pub struct State { - consensus: Option>>, +pub struct State { + part_store: PartStore, +} + +pub struct Args { part_store: PartStore, } pub struct ProposalBuilder { - _ctx: Ctx, value_builder: Box>, + marker: PhantomData, } impl ProposalBuilder where - Ctx: Context + Debug, + Ctx: Context, { pub async fn spawn( - ctx: Ctx, value_builder: Box>, + part_store: PartStore, ) -> Result>, ActorProcessingErr> { let (actor_ref, _) = Actor::spawn( None, Self { - _ctx: ctx, value_builder, + marker: PhantomData, }, - (), + Args { part_store }, ) .await?; @@ -93,7 +94,7 @@ where round: Round, timeout_duration: Duration, address: Ctx::Address, - gossip_actor: Option>>, + consensus: ActorRef>, part_store: &mut PartStore, ) -> Result, ActorProcessingErr> { let value = self @@ -103,7 +104,7 @@ where round, timeout_duration, address, - gossip_actor, + consensus, part_store, ) .await; @@ -135,19 +136,18 @@ where } #[async_trait] -impl Actor for ProposalBuilder { +impl Actor for ProposalBuilder { type Msg = Msg; - type State = State; - type Arguments = (); + type State = State; + type Arguments = Args; async fn pre_start( &self, _myself: ActorRef, - _: Self::Arguments, + args: Self::Arguments, ) -> Result { Ok(State { - consensus: None, - part_store: PartStore::new(), + part_store: args.part_store, }) } @@ -158,18 +158,11 @@ impl Actor for ProposalBuilder { state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { match msg { - Msg::Init { - consensus, - part_store, - } => { - state.consensus = Some(consensus); - state.part_store = part_store - } - Msg::GetValue { height, round, timeout_duration, + consensus, reply, address, } => { @@ -179,23 +172,23 @@ impl Actor for ProposalBuilder { round, timeout_duration, address, - state.consensus.clone(), + consensus, &mut state.part_store, ) .await?; + reply.send(value)?; } - Msg::BlockPart(block_part) => { + Msg::BlockPart { + block_part, + reply_to, + } => { let maybe_block = self.build_value(block_part, &mut state.part_store).await?; + // Send the proposed value (from blockparts) to consensus/ Driver if let Some(value_assembled) = maybe_block { - state - .consensus - .as_ref() - .unwrap() - .cast(ConsensusMsg::::BlockReceived(value_assembled)) - .unwrap(); + reply_to.cast(ConsensusMsg::BlockReceived(value_assembled))?; } } @@ -208,6 +201,7 @@ impl Actor for ProposalBuilder { .value_builder .maybe_received_value(height, round, &mut state.part_store) .await; + reply.send(value)?; } } diff --git a/code/actors/src/util/make_actor.rs b/code/actors/src/util/make_actor.rs index d04b654df..ca036aa33 100644 --- a/code/actors/src/util/make_actor.rs +++ b/code/actors/src/util/make_actor.rs @@ -15,6 +15,7 @@ use crate::mempool::Mempool; use crate::node::{Msg as NodeMsg, Msg, Node}; use crate::proposal_builder::ProposalBuilder; use crate::timers::Config as TimersConfig; +use crate::util::value_builder::test::PartStore; use crate::util::TestValueBuilder; pub async fn make_node_actor( @@ -43,12 +44,11 @@ pub async fn make_node_actor( .await .unwrap(); - // Spawn the proposal builder - let builder = TestValueBuilder::::new(mempool.clone()); - let value_builder = Box::new(builder.clone()); - let ctx = TestContext::new(validator_pk.clone()); - let proposal_builder = ProposalBuilder::spawn(ctx.clone(), value_builder) + + // Spawn the proposal builder + let value_builder = Box::new(TestValueBuilder::::new(mempool.clone())); + let proposal_builder = ProposalBuilder::spawn(value_builder, PartStore::new()) .await .unwrap(); diff --git a/code/actors/src/util/value_builder.rs b/code/actors/src/util/value_builder.rs index f21c3002c..97de47574 100644 --- a/code/actors/src/util/value_builder.rs +++ b/code/actors/src/util/value_builder.rs @@ -1,12 +1,15 @@ -use async_trait::async_trait; -use ractor::ActorRef; use std::marker::PhantomData; use std::time::{Duration, Instant}; + +use async_trait::async_trait; +use ractor::ActorRef; use tracing::{error, info, trace}; +use malachite_common::{Context, Round}; + +use crate::consensus::Msg as ConsensusMsg; use crate::proposal_builder::{LocallyProposedValue, ReceivedProposedValue}; use crate::util::value_builder::test::PartStore; -use malachite_common::{Context, Round}; #[async_trait] pub trait ValueBuilder: Send + Sync + 'static { @@ -16,7 +19,7 @@ pub trait ValueBuilder: Send + Sync + 'static { round: Round, timeout_duration: Duration, address: Ctx::Address, - gossip_actor: Option>>, + consensus: ActorRef>, part_store: &mut PartStore, ) -> Option>; @@ -77,16 +80,16 @@ pub mod test { round: Round, timeout_duration: Duration, validator_address: Address, - gossip_actor: Option>>, + consensus: ActorRef>, part_store: &mut PartStore, ) -> Option> { - let mut result = None; let now = Instant::now(); let deadline = now + timeout_duration.mul_f32(TIME_ALLOWANCE_FACTOR); let expiration_time = now + timeout_duration; let mut tx_batch = vec![]; let mut sequence = 1; + let mut result = None; loop { trace!( @@ -95,6 +98,7 @@ pub mod test { round, sequence ); + let mut txes = self .tx_streamer .call( @@ -124,12 +128,8 @@ pub mod test { part_store.store(block_part.clone()); - gossip_actor - .as_ref() - .unwrap() - .cast(crate::consensus::Msg::::BuilderBlockPart( - block_part.clone(), - )) + consensus + .cast(ConsensusMsg::BuilderBlockPart(block_part.clone())) .unwrap(); // Simulate execution @@ -138,21 +138,22 @@ pub mod test { sequence += 1; - if Instant::now().gt(&expiration_time) { - error!( - "Value Builder started at {:?} but failed to complete by expiration time {:?}", now, expiration_time); + if Instant::now() > expiration_time { + error!( "Value Builder started at {now:?} but failed to complete by expiration time {expiration_time:?}"); result = None; break; } - if Instant::now().gt(&deadline) { + if Instant::now() > deadline { // Create, store and gossip the BlockMetadata in a BlockPart let value = Value::new_from_transactions(tx_batch.clone()); + result = Some(LocallyProposedValue { height, round, value: Some(value), }); + let block_part = BlockPart::new( height, round, @@ -166,12 +167,8 @@ pub mod test { part_store.store(block_part.clone()); - gossip_actor - .as_ref() - .unwrap() - .cast(crate::consensus::Msg::::BuilderBlockPart( - block_part.clone(), - )) + consensus + .cast(ConsensusMsg::BuilderBlockPart(block_part.clone())) .unwrap(); info!( @@ -179,6 +176,7 @@ pub mod test { tx_batch.len(), result ); + break; } }