Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Dec 17, 2024
1 parent 0d5a9be commit 30de7ea
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 80 deletions.
27 changes: 12 additions & 15 deletions code/crates/app-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use crate::app::types::streaming::StreamMessage;
use crate::app::types::sync::DecidedValue;
use crate::app::types::{LocallyProposedValue, PeerId, ProposedValue};

pub type Reply<T> = oneshot::Sender<T>;

/// Channels created for application consumption
pub struct Channels<Ctx: Context> {
pub consensus: mpsc::Receiver<AppMsg<Ctx>>,
Expand All @@ -23,9 +25,7 @@ pub struct Channels<Ctx: Context> {
#[derive_where(Debug)]
pub enum AppMsg<Ctx: Context> {
/// Consensus is ready
ConsensusReady {
reply_to: oneshot::Sender<ConsensusMsg<Ctx>>,
},
ConsensusReady { reply: Reply<ConsensusMsg<Ctx>> },

/// Consensus has started a new round.
StartedRound {
Expand All @@ -38,9 +38,8 @@ pub enum AppMsg<Ctx: Context> {
GetValue {
height: Ctx::Height,
round: Round,
timeout_duration: Duration,
address: Ctx::Address,
reply_to: oneshot::Sender<LocallyProposedValue<Ctx>>,
timeout: Duration,
reply: Reply<LocallyProposedValue<Ctx>>,
},

/// Request to restream an existing block/value from Driver
Expand All @@ -53,33 +52,31 @@ pub enum AppMsg<Ctx: Context> {
},

/// Request the earliest block height in the block store
GetEarliestBlockHeight {
reply_to: oneshot::Sender<Ctx::Height>,
},
GetEarliestBlockHeight { reply: Reply<Ctx::Height> },

/// ProposalPart received <-- consensus <-- gossip
ReceivedProposalPart {
from: PeerId,
part: StreamMessage<Ctx::ProposalPart>,
reply_to: oneshot::Sender<ProposedValue<Ctx>>,
reply: Reply<ProposedValue<Ctx>>,
},

/// Get the validator set at a given height
GetValidatorSet {
height: Ctx::Height,
reply_to: oneshot::Sender<Ctx::ValidatorSet>,
reply: Reply<Ctx::ValidatorSet>,
},

// Consensus has decided on a value
Decided {
certificate: CommitCertificate<Ctx>,
reply_to: oneshot::Sender<ConsensusMsg<Ctx>>,
reply: Reply<ConsensusMsg<Ctx>>,
},

// Retrieve decided block from the block store
GetDecidedBlock {
GetDecidedValue {
height: Ctx::Height,
reply_to: oneshot::Sender<Option<DecidedValue<Ctx>>>,
reply: Reply<Option<DecidedValue<Ctx>>>,
},

// Synced block
Expand All @@ -88,7 +85,7 @@ pub enum AppMsg<Ctx: Context> {
round: Round,
validator_address: Ctx::Address,
value_bytes: Bytes,
reply_to: oneshot::Sender<ProposedValue<Ctx>>,
reply: Reply<ProposedValue<Ctx>>,
},
}

Expand Down
53 changes: 18 additions & 35 deletions code/crates/app-channel/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,9 @@ where
) -> Result<(), ActorProcessingErr> {
match msg {
HostMsg::ConsensusReady(consensus_ref) => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::ConsensusReady { reply_to: tx })
.await?;
self.sender.send(AppMsg::ConsensusReady { reply }).await?;

consensus_ref.cast(rx.await?.into())?;
}
Expand All @@ -79,19 +77,17 @@ where
HostMsg::GetValue {
height,
round,
timeout: timeout_duration,
address,
timeout,
reply_to,
} => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::GetValue {
height,
round,
timeout_duration,
address,
reply_to: tx,
timeout,
reply,
})
.await?;

Expand All @@ -117,10 +113,10 @@ where
}

HostMsg::GetEarliestBlockHeight { reply_to } => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::GetEarliestBlockHeight { reply_to: tx })
.send(AppMsg::GetEarliestBlockHeight { reply })
.await?;

reply_to.send(rx.await?)?;
Expand All @@ -131,27 +127,20 @@ where
part,
reply_to,
} => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::ReceivedProposalPart {
from,
part,
reply_to: tx,
})
.send(AppMsg::ReceivedProposalPart { from, part, reply })
.await?;

reply_to.send(rx.await?)?;
}

HostMsg::GetValidatorSet { height, reply_to } => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::GetValidatorSet {
height,
reply_to: tx,
})
.send(AppMsg::GetValidatorSet { height, reply })
.await?;

reply_to.send(rx.await?)?;
Expand All @@ -161,26 +150,20 @@ where
certificate,
consensus: consensus_ref,
} => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::Decided {
certificate,
reply_to: tx,
})
.send(AppMsg::Decided { certificate, reply })
.await?;

consensus_ref.cast(rx.await?.into())?;
}

HostMsg::GetDecidedValue { height, reply_to } => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::GetDecidedBlock {
height,
reply_to: tx,
})
.send(AppMsg::GetDecidedValue { height, reply })
.await?;

reply_to.send(rx.await?)?;
Expand All @@ -193,15 +176,15 @@ where
value_bytes,
reply_to,
} => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::ProcessSyncedValue {
height,
round,
validator_address,
value_bytes,
reply_to: tx,
reply,
})
.await?;

Expand Down
2 changes: 1 addition & 1 deletion code/crates/app-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub mod connector;
pub mod spawn;

mod channel;
pub use channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg};
pub use channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg, Reply};

mod run;
pub use run::run;
55 changes: 27 additions & 28 deletions code/examples/channel/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use eyre::eyre;
use tracing::{debug, error};
use tracing::{error, info};

use malachite_app_channel::app::host::LocallyProposedValue;
use malachite_app_channel::app::types::core::{Round, Validity};
Expand All @@ -16,10 +16,10 @@ pub async fn run(
) -> eyre::Result<()> {
while let Some(msg) = channels.consensus.recv().await {
match msg {
AppMsg::ConsensusReady { reply_to } => {
debug!("Consensus is ready");
AppMsg::ConsensusReady { reply } => {
info!("Consensus is ready");

if reply_to
if reply
.send(ConsensusMsg::StartHeight(
state.current_height,
genesis.validator_set.clone(),
Expand All @@ -35,18 +35,21 @@ pub async fn run(
round,
proposer,
} => {
info!(%height, %round, %proposer, "Started round");

state.current_height = height;
state.current_round = round;
state.current_proposer = Some(proposer);
}

AppMsg::GetValue {
height,
round: _,
timeout_duration: _,
address: _,
reply_to,
round,
timeout: _,
reply,
} => {
info!(%height, %round, "Get value");

let proposal = state.propose_value(&height);

let value = LocallyProposedValue::new(
Expand All @@ -57,7 +60,7 @@ pub async fn run(
);

// Send it to consensus
if reply_to.send(value.clone()).is_err() {
if reply.send(value.clone()).is_err() {
error!("Failed to send GetValue reply");
}

Expand All @@ -70,39 +73,34 @@ pub async fn run(
.await?;
}

AppMsg::GetEarliestBlockHeight { reply_to } => {
if reply_to.send(state.get_earliest_height()).is_err() {
AppMsg::GetEarliestBlockHeight { reply } => {
if reply.send(state.get_earliest_height()).is_err() {
error!("Failed to send GetEarliestBlockHeight reply");
}
}

AppMsg::ReceivedProposalPart {
from: _,
part,
reply_to,
reply,
} => {
if let Some(proposed_value) = state.add_proposal(part) {
if reply_to.send(proposed_value).is_err() {
if reply.send(proposed_value).is_err() {
error!("Failed to send ReceivedProposalPart reply");
}
}
}

AppMsg::GetValidatorSet {
height: _,
reply_to,
} => {
if reply_to.send(genesis.validator_set.clone()).is_err() {
AppMsg::GetValidatorSet { height: _, reply } => {
if reply.send(genesis.validator_set.clone()).is_err() {
error!("Failed to send GetValidatorSet reply");
}
}

AppMsg::Decided {
certificate,
reply_to,
} => {
AppMsg::Decided { certificate, reply } => {
state.commit_block(certificate);
if reply_to

if reply
.send(ConsensusMsg::StartHeight(
state.current_height,
genesis.validator_set.clone(),
Expand All @@ -113,9 +111,10 @@ pub async fn run(
}
}

AppMsg::GetDecidedBlock { height, reply_to } => {
let block = state.get_block(&height).cloned();
if reply_to.send(block).is_err() {
AppMsg::GetDecidedValue { height, reply } => {
let decided_value = state.get_decided_value(&height).cloned();

if reply.send(decided_value).is_err() {
error!("Failed to send GetDecidedBlock reply");
}
}
Expand All @@ -125,11 +124,11 @@ pub async fn run(
round,
validator_address,
value_bytes,
reply_to,
reply,
} => {
let value = decode_value(value_bytes);

if reply_to
if reply
.send(ProposedValue {
height,
round,
Expand Down
2 changes: 1 addition & 1 deletion code/examples/channel/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl State {
}

/// Retrieves a decided block at the given height
pub fn get_block(&self, height: &Height) -> Option<&DecidedValue<TestContext>> {
pub fn get_decided_value(&self, height: &Height) -> Option<&DecidedValue<TestContext>> {
self.blocks.get(height)
}

Expand Down

0 comments on commit 30de7ea

Please sign in to comment.