Skip to content

Commit

Permalink
Resolve cyclic dependency between Consensus and ProposalBuilder actor
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed May 30, 2024
1 parent 9dd8c69 commit ade91a2
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 82 deletions.
7 changes: 5 additions & 2 deletions code/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,10 @@ where
return Ok(());
}

self.proposal_builder
.cast(ProposalBuilderMsg::BlockPart(signed_block_part.block_part))?
self.proposal_builder.cast(ProposalBuilderMsg::BlockPart {
block_part: signed_block_part.block_part,
reply_to: myself.clone(),
})?
}
}

Expand Down Expand Up @@ -525,6 +527,7 @@ where
round,
timeout_duration,
address: self.params.address.clone(),
consensus: myself.clone(),
reply,
},
myself.get_cell(),
Expand Down
13 changes: 1 addition & 12 deletions code/actors/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::gossip_mempool::Msg as GossipMempoolMsg;
use crate::mempool::Msg as MempoolMsg;
use crate::proposal_builder::Msg as ProposalBuilderMsg;
use crate::timers::Config as TimersConfig;
use crate::util;
use crate::util::ValueBuilder;

pub struct Params<Ctx: Context> {
Expand Down Expand Up @@ -161,17 +160,7 @@ where
_state: &mut (),
) -> Result<(), ractor::ActorProcessingErr> {
match msg {
Msg::Start => {
let part_store = util::value_builder::test::PartStore::default();

self.proposal_builder
.cast(crate::proposal_builder::Msg::Init {
consensus: self.consensus.clone(),
part_store,
})?;

self.mempool.cast(crate::mempool::Msg::Start)?
}
Msg::Start => self.mempool.cast(crate::mempool::Msg::Start)?,
}

Ok(())
Expand Down
76 changes: 35 additions & 41 deletions code/actors/src/proposal_builder.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::fmt::Debug;
use std::marker::PhantomData;
use std::time::Duration;
use tracing::info;

use derive_where::derive_where;
use ractor::{async_trait, Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
use tracing::info;

use malachite_common::{Context, Round};
use malachite_driver::Validity;
Expand All @@ -30,23 +30,21 @@ pub struct ReceivedProposedValue<Ctx: Context> {
}

pub enum Msg<Ctx: Context> {
// Initialize the builder state with the gossip actor
Init {
consensus: ActorRef<ConsensusMsg<Ctx>>,
part_store: PartStore,
},

// Request to build a local block/ value from Driver
GetValue {
height: Ctx::Height,
round: Round,
timeout_duration: Duration,
reply: RpcReplyPort<LocallyProposedValue<Ctx>>,
consensus: ActorRef<ConsensusMsg<Ctx>>,
address: Ctx::Address,
reply: RpcReplyPort<LocallyProposedValue<Ctx>>,
},

// BlockPart received <-- consensus <-- gossip
BlockPart(Ctx::BlockPart),
BlockPart {
block_part: Ctx::BlockPart,
reply_to: ActorRef<ConsensusMsg<Ctx>>,
},

// Retrieve a block/ value for which all parts have been received
GetReceivedValue {
Expand All @@ -56,31 +54,34 @@ pub enum Msg<Ctx: Context> {
},
}

pub struct State<Ctx: Context> {
consensus: Option<ActorRef<ConsensusMsg<Ctx>>>,
pub struct State {
part_store: PartStore,
}

pub struct Args {
part_store: PartStore,
}

pub struct ProposalBuilder<Ctx: Context> {
_ctx: Ctx,
value_builder: Box<dyn ValueBuilder<Ctx>>,
marker: PhantomData<Ctx>,
}

impl<Ctx> ProposalBuilder<Ctx>
where
Ctx: Context + Debug,
Ctx: Context,
{
pub async fn spawn(
ctx: Ctx,
value_builder: Box<dyn ValueBuilder<Ctx>>,
part_store: PartStore,
) -> Result<ActorRef<Msg<Ctx>>, ActorProcessingErr> {
let (actor_ref, _) = Actor::spawn(
None,
Self {
_ctx: ctx,
value_builder,
marker: PhantomData,
},
(),
Args { part_store },
)
.await?;

Expand All @@ -93,7 +94,7 @@ where
round: Round,
timeout_duration: Duration,
address: Ctx::Address,
gossip_actor: Option<ActorRef<ConsensusMsg<Ctx>>>,
consensus: ActorRef<ConsensusMsg<Ctx>>,
part_store: &mut PartStore,
) -> Result<LocallyProposedValue<Ctx>, ActorProcessingErr> {
let value = self
Expand All @@ -103,7 +104,7 @@ where
round,
timeout_duration,
address,
gossip_actor,
consensus,
part_store,
)
.await;
Expand Down Expand Up @@ -135,19 +136,18 @@ where
}

#[async_trait]
impl<Ctx: Context + std::fmt::Debug> Actor for ProposalBuilder<Ctx> {
impl<Ctx: Context> Actor for ProposalBuilder<Ctx> {
type Msg = Msg<Ctx>;
type State = State<Ctx>;
type Arguments = ();
type State = State;
type Arguments = Args;

async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_: Self::Arguments,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(State {
consensus: None,
part_store: PartStore::new(),
part_store: args.part_store,
})
}

Expand All @@ -158,18 +158,11 @@ impl<Ctx: Context + std::fmt::Debug> Actor for ProposalBuilder<Ctx> {
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match msg {
Msg::Init {
consensus,
part_store,
} => {
state.consensus = Some(consensus);
state.part_store = part_store
}

Msg::GetValue {
height,
round,
timeout_duration,
consensus,
reply,
address,
} => {
Expand All @@ -179,23 +172,23 @@ impl<Ctx: Context + std::fmt::Debug> Actor for ProposalBuilder<Ctx> {
round,
timeout_duration,
address,
state.consensus.clone(),
consensus,
&mut state.part_store,
)
.await?;

reply.send(value)?;
}

Msg::BlockPart(block_part) => {
Msg::BlockPart {
block_part,
reply_to,
} => {
let maybe_block = self.build_value(block_part, &mut state.part_store).await?;

// Send the proposed value (from blockparts) to consensus/ Driver
if let Some(value_assembled) = maybe_block {
state
.consensus
.as_ref()
.unwrap()
.cast(ConsensusMsg::<Ctx>::BlockReceived(value_assembled))
.unwrap();
reply_to.cast(ConsensusMsg::BlockReceived(value_assembled))?;
}
}

Expand All @@ -208,6 +201,7 @@ impl<Ctx: Context + std::fmt::Debug> Actor for ProposalBuilder<Ctx> {
.value_builder
.maybe_received_value(height, round, &mut state.part_store)
.await;

reply.send(value)?;
}
}
Expand Down
10 changes: 5 additions & 5 deletions code/actors/src/util/make_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::mempool::Mempool;
use crate::node::{Msg as NodeMsg, Msg, Node};
use crate::proposal_builder::ProposalBuilder;
use crate::timers::Config as TimersConfig;
use crate::util::value_builder::test::PartStore;
use crate::util::TestValueBuilder;

pub async fn make_node_actor(
Expand Down Expand Up @@ -43,12 +44,11 @@ pub async fn make_node_actor(
.await
.unwrap();

// Spawn the proposal builder
let builder = TestValueBuilder::<TestContext>::new(mempool.clone());
let value_builder = Box::new(builder.clone());

let ctx = TestContext::new(validator_pk.clone());
let proposal_builder = ProposalBuilder::spawn(ctx.clone(), value_builder)

// Spawn the proposal builder
let value_builder = Box::new(TestValueBuilder::<TestContext>::new(mempool.clone()));
let proposal_builder = ProposalBuilder::spawn(value_builder, PartStore::new())
.await
.unwrap();

Expand Down
42 changes: 20 additions & 22 deletions code/actors/src/util/value_builder.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use async_trait::async_trait;
use ractor::ActorRef;
use std::marker::PhantomData;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use ractor::ActorRef;
use tracing::{error, info, trace};

use malachite_common::{Context, Round};

use crate::consensus::Msg as ConsensusMsg;
use crate::proposal_builder::{LocallyProposedValue, ReceivedProposedValue};
use crate::util::value_builder::test::PartStore;
use malachite_common::{Context, Round};

#[async_trait]
pub trait ValueBuilder<Ctx: Context>: Send + Sync + 'static {
Expand All @@ -16,7 +19,7 @@ pub trait ValueBuilder<Ctx: Context>: Send + Sync + 'static {
round: Round,
timeout_duration: Duration,
address: Ctx::Address,
gossip_actor: Option<ActorRef<crate::consensus::Msg<Ctx>>>,
consensus: ActorRef<ConsensusMsg<Ctx>>,
part_store: &mut PartStore,
) -> Option<LocallyProposedValue<Ctx>>;

Expand Down Expand Up @@ -77,16 +80,16 @@ pub mod test {
round: Round,
timeout_duration: Duration,
validator_address: Address,
gossip_actor: Option<ActorRef<crate::consensus::Msg<TestContext>>>,
consensus: ActorRef<ConsensusMsg<TestContext>>,
part_store: &mut PartStore,
) -> Option<LocallyProposedValue<TestContext>> {
let mut result = None;
let now = Instant::now();
let deadline = now + timeout_duration.mul_f32(TIME_ALLOWANCE_FACTOR);
let expiration_time = now + timeout_duration;

let mut tx_batch = vec![];
let mut sequence = 1;
let mut result = None;

loop {
trace!(
Expand All @@ -95,6 +98,7 @@ pub mod test {
round,
sequence
);

let mut txes = self
.tx_streamer
.call(
Expand Down Expand Up @@ -124,12 +128,8 @@ pub mod test {

part_store.store(block_part.clone());

gossip_actor
.as_ref()
.unwrap()
.cast(crate::consensus::Msg::<TestContext>::BuilderBlockPart(
block_part.clone(),
))
consensus
.cast(ConsensusMsg::BuilderBlockPart(block_part.clone()))
.unwrap();

// Simulate execution
Expand All @@ -138,21 +138,22 @@ pub mod test {

sequence += 1;

if Instant::now().gt(&expiration_time) {
error!(
"Value Builder started at {:?} but failed to complete by expiration time {:?}", now, expiration_time);
if Instant::now() > expiration_time {
error!( "Value Builder started at {now:?} but failed to complete by expiration time {expiration_time:?}");
result = None;
break;
}

if Instant::now().gt(&deadline) {
if Instant::now() > deadline {
// Create, store and gossip the BlockMetadata in a BlockPart
let value = Value::new_from_transactions(tx_batch.clone());

result = Some(LocallyProposedValue {
height,
round,
value: Some(value),
});

let block_part = BlockPart::new(
height,
round,
Expand All @@ -166,19 +167,16 @@ pub mod test {

part_store.store(block_part.clone());

gossip_actor
.as_ref()
.unwrap()
.cast(crate::consensus::Msg::<TestContext>::BuilderBlockPart(
block_part.clone(),
))
consensus
.cast(ConsensusMsg::BuilderBlockPart(block_part.clone()))
.unwrap();

info!(
"Value Builder created a block with {} tx-es, block hash (consensus value) {:?} ",
tx_batch.len(),
result
);

break;
}
}
Expand Down

0 comments on commit ade91a2

Please sign in to comment.