Skip to content

Commit

Permalink
Merge CAL actor with ProposalBuilder actor and rename to Host a…
Browse files Browse the repository at this point in the history
…ctor (#200)

* Remove CAL actor and move `GetValidatorSet` into `ProposalBuilder`

* Rename `ProposalBuilder` actor to `Host`

* Make `PartStore` context-generic and move into its own module (#201)
  • Loading branch information
romac authored May 31, 2024
1 parent 49759cc commit 1e7cba6
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 197 deletions.
64 changes: 0 additions & 64 deletions code/actors/src/cal.rs

This file was deleted.

40 changes: 14 additions & 26 deletions code/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@ use malachite_proto as proto;
use malachite_proto::Protobuf;
use malachite_vote::{Threshold, ThresholdParams};

use crate::cal::Msg as CALMsg;
use crate::gossip::Msg as GossipMsg;
use crate::proposal_builder::{
LocallyProposedValue, Msg as ProposalBuilderMsg, ReceivedProposedValue,
};
use crate::host::{LocallyProposedValue, Msg as HostMsg, ReceivedProposedValue};
use crate::timers::{Config as TimersConfig, Msg as TimersMsg, TimeoutElapsed, Timers};
use crate::util::forward;

Expand All @@ -55,8 +52,7 @@ where
params: Params<Ctx>,
timers_config: TimersConfig,
gossip: ActorRef<GossipMsg>,
cal: ActorRef<CALMsg<Ctx>>,
proposal_builder: ActorRef<ProposalBuilderMsg<Ctx>>,
host: ActorRef<HostMsg<Ctx>>,
tx_decision: mpsc::Sender<(Ctx::Height, Round, Ctx::Value)>,
}

Expand Down Expand Up @@ -107,17 +103,15 @@ where
params: Params<Ctx>,
timers_config: TimersConfig,
gossip: ActorRef<GossipMsg>,
cal: ActorRef<CALMsg<Ctx>>,
proposal_builder: ActorRef<ProposalBuilderMsg<Ctx>>,
host: ActorRef<HostMsg<Ctx>>,
tx_decision: mpsc::Sender<(Ctx::Height, Round, Ctx::Value)>,
) -> Self {
Self {
ctx,
params,
timers_config,
gossip,
cal,
proposal_builder,
host,
tx_decision,
}
}
Expand All @@ -128,20 +122,11 @@ where
params: Params<Ctx>,
timers_config: TimersConfig,
gossip: ActorRef<GossipMsg>,
cal: ActorRef<CALMsg<Ctx>>,
proposal_builder: ActorRef<ProposalBuilderMsg<Ctx>>,
host: ActorRef<HostMsg<Ctx>>,
tx_decision: mpsc::Sender<(Ctx::Height, Round, Ctx::Value)>,
supervisor: Option<ActorCell>,
) -> Result<ActorRef<Msg<Ctx>>, ractor::SpawnErr> {
let node = Self::new(
ctx,
params,
timers_config,
gossip,
cal,
proposal_builder,
tx_decision,
);
let node = Self::new(ctx, params, timers_config, gossip, host, tx_decision);

let (actor_ref, _) = if let Some(supervisor) = supervisor {
Actor::spawn_linked(None, node, (), supervisor).await?
Expand Down Expand Up @@ -277,7 +262,7 @@ where
}

// TODO - verify that the proposal was signed by the proposer for the height and round, drop otherwise.
self.proposal_builder.cast(ProposalBuilderMsg::BlockPart {
self.host.cast(HostMsg::BlockPart {
block_part: signed_block_part.block_part,
reply_to: myself.clone(),
})?
Expand Down Expand Up @@ -521,8 +506,8 @@ where
// Call `GetValue` on the CAL actor, and forward the reply to the current actor,
// wrapping it in `Msg::ProposeValue`.
call_and_forward(
&self.proposal_builder.get_cell(),
|reply| ProposalBuilderMsg::GetValue {
&self.host.get_cell(),
|reply| HostMsg::GetValue {
height,
round,
timeout_duration,
Expand Down Expand Up @@ -563,8 +548,11 @@ where
height: Ctx::Height,
) -> Result<Ctx::ValidatorSet, ActorProcessingErr> {
let result = self
.cal
.call(|reply| CALMsg::GetValidatorSet { height, reply }, None)
.host
.call(
|reply_to| HostMsg::GetValidatorSet { height, reply_to },
None,
)
.await?;

// TODO: Figure out better way to handle this:
Expand Down
55 changes: 38 additions & 17 deletions code/actors/src/proposal_builder.rs → code/actors/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use malachite_common::{Context, Round};
use malachite_driver::Validity;

use crate::consensus::Msg as ConsensusMsg;
use crate::util::value_builder::test::PartStore;
use crate::util::PartStore;
use crate::util::ValueBuilder;

#[derive_where(Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -50,38 +50,49 @@ pub enum Msg<Ctx: Context> {
GetReceivedValue {
height: Ctx::Height,
round: Round,
reply: RpcReplyPort<Option<ReceivedProposedValue<Ctx>>>,
reply_to: RpcReplyPort<Option<ReceivedProposedValue<Ctx>>>,
},

GetValidatorSet {
height: Ctx::Height,
reply_to: RpcReplyPort<Ctx::ValidatorSet>,
},
}

pub struct State {
part_store: PartStore,
pub struct State<Ctx: Context> {
part_store: PartStore<Ctx>,
validator_set: Ctx::ValidatorSet,
}

pub struct Args {
part_store: PartStore,
pub struct Args<Ctx: Context> {
part_store: PartStore<Ctx>,
validator_set: Ctx::ValidatorSet,
}

pub struct ProposalBuilder<Ctx: Context> {
pub struct Host<Ctx: Context> {
value_builder: Box<dyn ValueBuilder<Ctx>>,
marker: PhantomData<Ctx>,
}

impl<Ctx> ProposalBuilder<Ctx>
impl<Ctx> Host<Ctx>
where
Ctx: Context,
{
pub async fn spawn(
value_builder: Box<dyn ValueBuilder<Ctx>>,
part_store: PartStore,
part_store: PartStore<Ctx>,
validator_set: Ctx::ValidatorSet,
) -> Result<ActorRef<Msg<Ctx>>, ActorProcessingErr> {
let (actor_ref, _) = Actor::spawn(
None,
Self {
value_builder,
marker: PhantomData,
},
Args { part_store },
Args {
part_store,
validator_set,
},
)
.await?;

Expand All @@ -95,7 +106,7 @@ where
timeout_duration: Duration,
address: Ctx::Address,
consensus: ActorRef<ConsensusMsg<Ctx>>,
part_store: &mut PartStore,
part_store: &mut PartStore<Ctx>,
) -> Result<LocallyProposedValue<Ctx>, ActorProcessingErr> {
let value = self
.value_builder
Expand All @@ -120,7 +131,7 @@ where
async fn build_value(
&self,
block_part: Ctx::BlockPart,
part_store: &mut PartStore,
part_store: &mut PartStore<Ctx>,
) -> Result<Option<ReceivedProposedValue<Ctx>>, ActorProcessingErr> {
let value = self
.value_builder
Expand All @@ -136,10 +147,10 @@ where
}

#[async_trait]
impl<Ctx: Context> Actor for ProposalBuilder<Ctx> {
impl<Ctx: Context> Actor for Host<Ctx> {
type Msg = Msg<Ctx>;
type State = State;
type Arguments = Args;
type State = State<Ctx>;
type Arguments = Args<Ctx>;

async fn pre_start(
&self,
Expand All @@ -148,6 +159,7 @@ impl<Ctx: Context> Actor for ProposalBuilder<Ctx> {
) -> Result<Self::State, ActorProcessingErr> {
Ok(State {
part_store: args.part_store,
validator_set: args.validator_set,
})
}

Expand Down Expand Up @@ -195,14 +207,23 @@ impl<Ctx: Context> Actor for ProposalBuilder<Ctx> {
Msg::GetReceivedValue {
height,
round,
reply,
reply_to,
} => {
let value = self
.value_builder
.maybe_received_value(height, round, &mut state.part_store)
.await;

reply.send(value)?;
reply_to.send(value)?;
}

Msg::GetValidatorSet {
height: _,
reply_to,
} => {
// FIXME: This is just a stub
let validator_set = state.validator_set.clone();
reply_to.send(validator_set)?;
}
}

Expand Down
3 changes: 1 addition & 2 deletions code/actors/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
pub mod cal;
pub mod consensus;
pub mod gossip;
pub mod gossip_mempool;
pub mod host;
pub mod mempool;
pub mod node;
pub mod prelude;
pub mod proposal_builder;
pub mod timers;
pub mod util;
15 changes: 5 additions & 10 deletions code/actors/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ use malachite_common::{Context, Round};
use malachite_proto::Protobuf;
use malachite_vote::ThresholdParams;

use crate::cal::Msg as CALMsg;
use crate::consensus::Msg as ConsensusMsg;
use crate::gossip::Msg as GossipMsg;
use crate::gossip_mempool::Msg as GossipMempoolMsg;
use crate::host::Msg as HostMsg;
use crate::mempool::Msg as MempoolMsg;
use crate::proposal_builder::Msg as ProposalBuilderMsg;
use crate::timers::Config as TimersConfig;
use crate::util::ValueBuilder;

Expand Down Expand Up @@ -79,12 +78,11 @@ pub struct Params<Ctx: Context> {
#[allow(dead_code)]
pub struct Node<Ctx: Context> {
ctx: Ctx,
cal: ActorRef<CALMsg<Ctx>>,
gossip: ActorRef<GossipMsg>,
consensus: ActorRef<ConsensusMsg<Ctx>>,
gossip_mempool: ActorRef<GossipMempoolMsg>,
mempool: ActorRef<MempoolMsg>,
proposal_builder: ActorRef<ProposalBuilderMsg<Ctx>>,
host: ActorRef<HostMsg<Ctx>>,
start_height: Ctx::Height,
}

Expand All @@ -97,22 +95,20 @@ where
#[allow(clippy::too_many_arguments)]
pub fn new(
ctx: Ctx,
cal: ActorRef<CALMsg<Ctx>>,
gossip: ActorRef<GossipMsg>,
consensus: ActorRef<ConsensusMsg<Ctx>>,
gossip_mempool: ActorRef<GossipMempoolMsg>,
mempool: ActorRef<MempoolMsg>,
proposal_builder: ActorRef<ProposalBuilderMsg<Ctx>>,
host: ActorRef<HostMsg<Ctx>>,
start_height: Ctx::Height,
) -> Self {
Self {
ctx,
cal,
gossip,
consensus,
gossip_mempool,
mempool,
proposal_builder,
host,
start_height,
}
}
Expand Down Expand Up @@ -143,12 +139,11 @@ where
_args: (),
) -> Result<(), ractor::ActorProcessingErr> {
// Set ourselves as the supervisor of the other actors
self.cal.link(myself.get_cell());
self.gossip.link(myself.get_cell());
self.consensus.link(myself.get_cell());
self.gossip_mempool.link(myself.get_cell());
self.mempool.link(myself.get_cell());
self.proposal_builder.link(myself.get_cell());
self.host.link(myself.get_cell());

Ok(())
}
Expand Down
Loading

0 comments on commit 1e7cba6

Please sign in to comment.