diff --git a/code/crates/app-channel/src/channel.rs b/code/crates/app-channel/src/channel.rs index f972949db..a73767fe7 100644 --- a/code/crates/app-channel/src/channel.rs +++ b/code/crates/app-channel/src/channel.rs @@ -13,6 +13,8 @@ use crate::app::types::streaming::StreamMessage; use crate::app::types::sync::DecidedValue; use crate::app::types::{LocallyProposedValue, PeerId, ProposedValue}; +pub type Reply = oneshot::Sender; + /// Channels created for application consumption pub struct Channels { pub consensus: mpsc::Receiver>, @@ -23,9 +25,7 @@ pub struct Channels { #[derive_where(Debug)] pub enum AppMsg { /// Consensus is ready - ConsensusReady { - reply_to: oneshot::Sender>, - }, + ConsensusReady { reply: Reply> }, /// Consensus has started a new round. StartedRound { @@ -38,9 +38,8 @@ pub enum AppMsg { GetValue { height: Ctx::Height, round: Round, - timeout_duration: Duration, - address: Ctx::Address, - reply_to: oneshot::Sender>, + timeout: Duration, + reply: Reply>, }, /// Request to restream an existing block/value from Driver @@ -53,33 +52,31 @@ pub enum AppMsg { }, /// Request the earliest block height in the block store - GetEarliestBlockHeight { - reply_to: oneshot::Sender, - }, + GetEarliestBlockHeight { reply: Reply }, /// ProposalPart received <-- consensus <-- gossip ReceivedProposalPart { from: PeerId, part: StreamMessage, - reply_to: oneshot::Sender>, + reply: Reply>, }, /// Get the validator set at a given height GetValidatorSet { height: Ctx::Height, - reply_to: oneshot::Sender, + reply: Reply, }, // Consensus has decided on a value Decided { certificate: CommitCertificate, - reply_to: oneshot::Sender>, + reply: Reply>, }, // Retrieve decided block from the block store - GetDecidedBlock { + GetDecidedValue { height: Ctx::Height, - reply_to: oneshot::Sender>>, + reply: Reply>>, }, // Synced block @@ -88,7 +85,7 @@ pub enum AppMsg { round: Round, validator_address: Ctx::Address, value_bytes: Bytes, - reply_to: oneshot::Sender>, + reply: Reply>, }, } diff --git a/code/crates/app-channel/src/connector.rs b/code/crates/app-channel/src/connector.rs index 6a8186bad..264767912 100644 --- a/code/crates/app-channel/src/connector.rs +++ b/code/crates/app-channel/src/connector.rs @@ -53,11 +53,9 @@ where ) -> Result<(), ActorProcessingErr> { match msg { HostMsg::ConsensusReady(consensus_ref) => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); - self.sender - .send(AppMsg::ConsensusReady { reply_to: tx }) - .await?; + self.sender.send(AppMsg::ConsensusReady { reply }).await?; consensus_ref.cast(rx.await?.into())?; } @@ -79,19 +77,17 @@ where HostMsg::GetValue { height, round, - timeout: timeout_duration, - address, + timeout, reply_to, } => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); self.sender .send(AppMsg::GetValue { height, round, - timeout_duration, - address, - reply_to: tx, + timeout, + reply, }) .await?; @@ -117,10 +113,10 @@ where } HostMsg::GetEarliestBlockHeight { reply_to } => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); self.sender - .send(AppMsg::GetEarliestBlockHeight { reply_to: tx }) + .send(AppMsg::GetEarliestBlockHeight { reply }) .await?; reply_to.send(rx.await?)?; @@ -131,27 +127,20 @@ where part, reply_to, } => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); self.sender - .send(AppMsg::ReceivedProposalPart { - from, - part, - reply_to: tx, - }) + .send(AppMsg::ReceivedProposalPart { from, part, reply }) .await?; reply_to.send(rx.await?)?; } HostMsg::GetValidatorSet { height, reply_to } => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); self.sender - .send(AppMsg::GetValidatorSet { - height, - reply_to: tx, - }) + .send(AppMsg::GetValidatorSet { height, reply }) .await?; reply_to.send(rx.await?)?; @@ -161,26 +150,20 @@ where certificate, consensus: consensus_ref, } => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); self.sender - .send(AppMsg::Decided { - certificate, - reply_to: tx, - }) + .send(AppMsg::Decided { certificate, reply }) .await?; consensus_ref.cast(rx.await?.into())?; } HostMsg::GetDecidedValue { height, reply_to } => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); self.sender - .send(AppMsg::GetDecidedBlock { - height, - reply_to: tx, - }) + .send(AppMsg::GetDecidedValue { height, reply }) .await?; reply_to.send(rx.await?)?; @@ -193,7 +176,7 @@ where value_bytes, reply_to, } => { - let (tx, rx) = oneshot::channel(); + let (reply, rx) = oneshot::channel(); self.sender .send(AppMsg::ProcessSyncedValue { @@ -201,7 +184,7 @@ where round, validator_address, value_bytes, - reply_to: tx, + reply, }) .await?; diff --git a/code/crates/app-channel/src/lib.rs b/code/crates/app-channel/src/lib.rs index 9aaadad78..4ac58aaf1 100644 --- a/code/crates/app-channel/src/lib.rs +++ b/code/crates/app-channel/src/lib.rs @@ -14,7 +14,7 @@ pub mod connector; pub mod spawn; mod channel; -pub use channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg}; +pub use channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg, Reply}; mod run; pub use run::run; diff --git a/code/examples/channel/src/app.rs b/code/examples/channel/src/app.rs index da9f739f0..eb302109a 100644 --- a/code/examples/channel/src/app.rs +++ b/code/examples/channel/src/app.rs @@ -1,5 +1,5 @@ use eyre::eyre; -use tracing::{debug, error}; +use tracing::{error, info}; use malachite_app_channel::app::host::LocallyProposedValue; use malachite_app_channel::app::types::core::{Round, Validity}; @@ -16,10 +16,10 @@ pub async fn run( ) -> eyre::Result<()> { while let Some(msg) = channels.consensus.recv().await { match msg { - AppMsg::ConsensusReady { reply_to } => { - debug!("Consensus is ready"); + AppMsg::ConsensusReady { reply } => { + info!("Consensus is ready"); - if reply_to + if reply .send(ConsensusMsg::StartHeight( state.current_height, genesis.validator_set.clone(), @@ -35,6 +35,8 @@ pub async fn run( round, proposer, } => { + info!(%height, %round, %proposer, "Started round"); + state.current_height = height; state.current_round = round; state.current_proposer = Some(proposer); @@ -42,11 +44,12 @@ pub async fn run( AppMsg::GetValue { height, - round: _, - timeout_duration: _, - address: _, - reply_to, + round, + timeout: _, + reply, } => { + info!(%height, %round, "Get value"); + let proposal = state.propose_value(&height); let value = LocallyProposedValue::new( @@ -57,7 +60,7 @@ pub async fn run( ); // Send it to consensus - if reply_to.send(value.clone()).is_err() { + if reply.send(value.clone()).is_err() { error!("Failed to send GetValue reply"); } @@ -70,8 +73,8 @@ pub async fn run( .await?; } - AppMsg::GetEarliestBlockHeight { reply_to } => { - if reply_to.send(state.get_earliest_height()).is_err() { + AppMsg::GetEarliestBlockHeight { reply } => { + if reply.send(state.get_earliest_height()).is_err() { error!("Failed to send GetEarliestBlockHeight reply"); } } @@ -79,30 +82,25 @@ pub async fn run( AppMsg::ReceivedProposalPart { from: _, part, - reply_to, + reply, } => { if let Some(proposed_value) = state.add_proposal(part) { - if reply_to.send(proposed_value).is_err() { + if reply.send(proposed_value).is_err() { error!("Failed to send ReceivedProposalPart reply"); } } } - AppMsg::GetValidatorSet { - height: _, - reply_to, - } => { - if reply_to.send(genesis.validator_set.clone()).is_err() { + AppMsg::GetValidatorSet { height: _, reply } => { + if reply.send(genesis.validator_set.clone()).is_err() { error!("Failed to send GetValidatorSet reply"); } } - AppMsg::Decided { - certificate, - reply_to, - } => { + AppMsg::Decided { certificate, reply } => { state.commit_block(certificate); - if reply_to + + if reply .send(ConsensusMsg::StartHeight( state.current_height, genesis.validator_set.clone(), @@ -113,9 +111,10 @@ pub async fn run( } } - AppMsg::GetDecidedBlock { height, reply_to } => { - let block = state.get_block(&height).cloned(); - if reply_to.send(block).is_err() { + AppMsg::GetDecidedValue { height, reply } => { + let decided_value = state.get_decided_value(&height).cloned(); + + if reply.send(decided_value).is_err() { error!("Failed to send GetDecidedBlock reply"); } } @@ -125,11 +124,11 @@ pub async fn run( round, validator_address, value_bytes, - reply_to, + reply, } => { let value = decode_value(value_bytes); - if reply_to + if reply .send(ProposedValue { height, round, diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index c0f846c29..4b0fb9383 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -102,7 +102,7 @@ impl State { } /// Retrieves a decided block at the given height - pub fn get_block(&self, height: &Height) -> Option<&DecidedValue> { + pub fn get_decided_value(&self, height: &Height) -> Option<&DecidedValue> { self.blocks.get(height) }