Skip to content

Commit

Permalink
Process received block parts, send valueId and valid to consensus actor.
Browse files Browse the repository at this point in the history
Multiplex proposal and last block part, pass valid from the latter.

Check if all parts are present when a proposal is received, otherwise
queue.

Implement a temporary store for block parts.
  • Loading branch information
ancazamfir committed May 30, 2024
1 parent f569037 commit 3e3ff5a
Show file tree
Hide file tree
Showing 9 changed files with 357 additions and 89 deletions.
75 changes: 62 additions & 13 deletions code/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use async_trait::async_trait;
use ractor::rpc::{call_and_forward, CallResult};
use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef};
use tokio::sync::mpsc;
use tracing::{debug, info, trace, warn};
use tracing::{debug, error, info, trace, warn};

use malachite_common::{
Context, Height, NilOrVal, Proposal, Round, SignedBlockPart, SignedProposal, SignedVote,
Timeout, TimeoutStep, Validator, ValidatorSet, Value, ValueId, Vote, VoteType,
};
use malachite_driver::Driver;
use malachite_driver::Input as DriverInput;
use malachite_driver::Input::BlockReceived;
use malachite_driver::Output as DriverOutput;
use malachite_driver::Validity;
use malachite_gossip::{Channel, Event as GossipEvent};
Expand All @@ -25,7 +26,9 @@ use malachite_vote::{Threshold, ThresholdParams};

use crate::cal::Msg as CALMsg;
use crate::gossip::Msg as GossipMsg;
use crate::proposal_builder::{Msg as ProposalBuilderMsg, ProposedValue};
use crate::proposal_builder::{
LocallyProposedValue, Msg as ProposalBuilderMsg, ReceivedProposedValue,
};
use crate::timers::{Config as TimersConfig, Msg as TimersMsg, TimeoutElapsed, Timers};
use crate::util::forward;

Expand Down Expand Up @@ -72,6 +75,7 @@ pub enum Msg<Ctx: Context> {
ProposeValue(Ctx::Height, Round, Option<Ctx::Value>),
// The proposal builder has build a new block part, needs to be signed and gossiped by consensus
BuilderBlockPart(Ctx::BlockPart),
BlockReceived(ReceivedProposedValue<Ctx>),
}

impl<Ctx: Context> From<TimeoutElapsed> for Msg<Ctx> {
Expand All @@ -93,7 +97,7 @@ where

impl<Ctx> Consensus<Ctx>
where
Ctx: Context,
Ctx: Context + std::fmt::Debug,
Ctx::Vote: Protobuf<Proto = proto::Vote>,
Ctx::Proposal: Protobuf<Proto = proto::Proposal>,
Ctx::BlockPart: Protobuf<Proto = proto::BlockPart>,
Expand Down Expand Up @@ -214,17 +218,47 @@ where
// TODO - proposals with invalid signatures should be dropped.
// For well signed we should validate the proposal against the block parts (if all received).
// Add `valid()` to Context.
let valid = self
.ctx
.verify_signed_proposal(&signed_proposal, validator.public_key());
let proposal = &signed_proposal.proposal;
let proposal_height = proposal.height();
let proposal_round = proposal.round();

let proposal_height = signed_proposal.proposal.height();
if !self
.ctx
.verify_signed_proposal(&signed_proposal, validator.public_key())
{
error!(
"Received invalid signature for proposal ({}, {}, {:?}",
proposal_height,
proposal_round,
proposal.value()
)
}
assert!(proposal_height == state.driver.height());

myself.cast(Msg::SendDriverInput(DriverInput::Proposal(
signed_proposal.proposal,
Validity::from_valid(valid),
)))?;
let received_block = state
.driver
.received_blocks
.iter()
.find(|&x| x.0 == proposal_height && x.1 == proposal_round);

match received_block {
Some(block) => {
let valid = block.3;
myself.cast(Msg::SendDriverInput(DriverInput::Proposal(
proposal.clone(),
valid,
)))?;
}
None => {
// Store the proposal and wait for all block parts
// TODO - or maybe integrate with receive-proposal() here? will this block until all parts are received?
info!(
"Received proposal {:?} before all block parts --STORING",
proposal
);
state.driver.proposal = Some(proposal.clone());
}
}
}
NetworkMsg::BlockPart(block_part) => {
let signed_block_part = SignedBlockPart::<Ctx>::from_proto(block_part).unwrap();
Expand Down Expand Up @@ -304,6 +338,9 @@ where

DriverInput::Vote(_) => (),
DriverInput::TimeoutElapsed(_) => (),
DriverInput::BlockReceived(..) => {
debug!("Received full block {:?}", input);
}
}

let check_threshold = if let DriverInput::Vote(vote) = &input {
Expand Down Expand Up @@ -491,7 +528,7 @@ where
reply,
},
myself.get_cell(),
|proposed: ProposedValue<Ctx>| {
|proposed: LocallyProposedValue<Ctx>| {
Msg::<Ctx>::ProposeValue(proposed.height, proposed.round, proposed.value)
},
None,
Expand Down Expand Up @@ -543,7 +580,7 @@ where
#[async_trait]
impl<Ctx> Actor for Consensus<Ctx>
where
Ctx: Context,
Ctx: Context + std::fmt::Debug,
Ctx::Height: Display,
Ctx::Vote: Protobuf<Proto = proto::Vote>,
Ctx::Proposal: Protobuf<Proto = proto::Proposal>,
Expand Down Expand Up @@ -742,6 +779,18 @@ where
self.gossip
.cast(GossipMsg::Broadcast(Channel::BlockParts, bytes))?;
}
Msg::BlockReceived(value) => {
info!("BLOCK RECEIVED {:?}", value);

if let Some(v) = value.value {
self.send_driver_input(
BlockReceived(value.height, value.round, v, value.valid),
myself,
state,
)
.await?;
}
}
}

Ok(())
Expand Down
3 changes: 3 additions & 0 deletions code/actors/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::gossip_mempool::Msg as GossipMempoolMsg;
use crate::mempool::Msg as MempoolMsg;
use crate::proposal_builder::Msg as ProposalBuilderMsg;
use crate::timers::Config as TimersConfig;
use crate::util;
use crate::util::ValueBuilder;

pub struct Params<Ctx: Context> {
Expand Down Expand Up @@ -161,9 +162,11 @@ where
) -> Result<(), ractor::ActorProcessingErr> {
match msg {
Msg::Start => {
let part_store = util::value_builder::test::PartStore::default();
self.proposal_builder
.cast(crate::proposal_builder::Msg::Init {
gossip_actor: self.consensus.clone(),
part_store,
})?;
self.mempool.cast(crate::mempool::Msg::Start)?
}
Expand Down
118 changes: 93 additions & 25 deletions code/actors/src/proposal_builder.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,72 @@
use std::fmt::Debug;
use std::time::Duration;
use tracing::info;

use crate::util::value_builder::test::PartStore;
use malachite_common::{Context, Round};
use malachite_driver::Validity;
use ractor::{async_trait, Actor, ActorProcessingErr, ActorRef, RpcReplyPort};

use crate::util::ValueBuilder;

pub struct ProposedValue<Ctx: Context> {
#[derive(Debug)]
pub struct LocallyProposedValue<Ctx: Context> {
pub height: Ctx::Height,
pub round: Round,
pub value: Option<Ctx::Value>, // todo - should we remove?
}

#[derive(Debug)]
pub struct ReceivedProposedValue<Ctx: Context> {
pub validator_address: Ctx::Address,
pub height: Ctx::Height,
pub round: Round,
pub value: Option<Ctx::Value>,
pub valid: Validity,
}

pub enum Msg<Ctx: Context> {
// Initialize the builder state with the gossip actor
Init {
gossip_actor: ActorRef<crate::consensus::Msg<Ctx>>,
part_store: PartStore,
},

// Request for a value from Driver
// Request to build a local block/ value from Driver
GetValue {
height: Ctx::Height,
round: Round,
timeout_duration: Duration,
reply: RpcReplyPort<ProposedValue<Ctx>>,
reply: RpcReplyPort<LocallyProposedValue<Ctx>>,
address: Ctx::Address,
},

// BlockPart received <-- consensus <-- gossip
BlockPart(Ctx::BlockPart),

// Retrieve a block/ value for which all parts have been received
GetReceivedValue {
height: Ctx::Height,
round: Round,
reply: RpcReplyPort<Option<ReceivedProposedValue<Ctx>>>,
},
}

pub struct State<Ctx>
where
Ctx: Context,
{
pub struct State<Ctx: Context> {
gossip_actor: Option<ActorRef<crate::consensus::Msg<Ctx>>>,
part_store: PartStore,
}

pub struct ProposalBuilder<Ctx: Context> {
pub struct ProposalBuilder<Ctx: Context + std::fmt::Debug> {
#[allow(dead_code)]
ctx: Ctx,
value_builder: Box<dyn ValueBuilder<Ctx>>,
}

impl<Ctx: Context> ProposalBuilder<Ctx> {
impl<Ctx> ProposalBuilder<Ctx>
where
Ctx: Context + Debug,
{
pub async fn spawn(
ctx: Ctx,
value_builder: Box<dyn ValueBuilder<Ctx>>,
Expand All @@ -60,34 +83,49 @@ impl<Ctx: Context> ProposalBuilder<Ctx> {
timeout_duration: Duration,
address: Ctx::Address,
gossip_actor: Option<ActorRef<crate::consensus::Msg<Ctx>>>,
) -> Result<ProposedValue<Ctx>, ActorProcessingErr> {
part_store: &mut PartStore,
) -> Result<LocallyProposedValue<Ctx>, ActorProcessingErr> {
let value = self
.value_builder
.build_value_locally(height, round, timeout_duration, address, gossip_actor)
.build_value_locally(
height,
round,
timeout_duration,
address,
gossip_actor,
part_store,
)
.await;

Ok(ProposedValue {
height,
round,
value,
})
match value {
Some(value) => Ok(value),
None => {
todo!()
}
}
}

async fn build_value(
&self,
block_part: Ctx::BlockPart,
) -> Result<Option<ProposedValue<Ctx>>, ActorProcessingErr> {
// TODO
let _ = self
part_store: &mut PartStore,
) -> Result<Option<ReceivedProposedValue<Ctx>>, ActorProcessingErr> {
let value = self
.value_builder
.build_value_from_block_parts(block_part)
.build_value_from_block_parts(block_part, part_store)
.await;
Ok(None)
if value.is_some() {
info!(
"Value Builder received all parts, produced value {:?} for proposal",
value
);
}
Ok(value)
}
}

#[async_trait]
impl<Ctx: Context> Actor for ProposalBuilder<Ctx> {
impl<Ctx: Context + std::fmt::Debug> Actor for ProposalBuilder<Ctx> {
type Msg = Msg<Ctx>;
type State = State<Ctx>;
type Arguments = ();
Expand All @@ -97,7 +135,10 @@ impl<Ctx: Context> Actor for ProposalBuilder<Ctx> {
_myself: ActorRef<Self::Msg>,
_: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(State { gossip_actor: None })
Ok(State {
gossip_actor: None,
part_store: PartStore::new(),
})
}

async fn handle(
Expand All @@ -107,8 +148,12 @@ impl<Ctx: Context> Actor for ProposalBuilder<Ctx> {
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match msg {
Msg::Init { gossip_actor } => {
Msg::Init {
gossip_actor,
part_store,
} => {
state.gossip_actor = Some(gossip_actor);
state.part_store = part_store
}

Msg::GetValue {
Expand All @@ -125,13 +170,36 @@ impl<Ctx: Context> Actor for ProposalBuilder<Ctx> {
timeout_duration,
address,
state.gossip_actor.clone(),
&mut state.part_store,
)
.await?;
reply.send(value)?;
}

Msg::BlockPart(block_part) => {
self.build_value(block_part).await?;
let maybe_block = self.build_value(block_part, &mut state.part_store).await?;
// TODO - Send the proposed value (from blockparts) to Driver
// to be maybe multiplexed with the proposal (from consensus)
if let Some(value_assembled) = maybe_block {
state
.gossip_actor
.as_ref()
.unwrap()
.cast(crate::consensus::Msg::<Ctx>::BlockReceived(value_assembled))
.unwrap();
}
}

Msg::GetReceivedValue {
height,
round,
reply,
} => {
let value = self
.value_builder
.maybe_received_value(height, round, &mut state.part_store)
.await;
reply.send(value)?;
}
}

Expand Down
2 changes: 1 addition & 1 deletion code/actors/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod forward;
mod make_actor;
mod value_builder;
pub(crate) mod value_builder;

pub use forward::{forward, Forward};
pub use make_actor::make_node_actor;
Expand Down
Loading

0 comments on commit 3e3ff5a

Please sign in to comment.