Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(example): Add example app using channels #686

Merged
merged 21 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions code/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions code/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ members = [
# Starknet
"crates/starknet/*",
"crates/starknet/*",

# Examples
"examples/channel",
]

[workspace.package]
Expand Down
1 change: 1 addition & 0 deletions code/crates/app-channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ tracing.workspace = true

malachite-app.workspace = true
malachite-engine.workspace = true
malachite-config.workspace = true

[lints]
workspace = true
Expand Down
53 changes: 36 additions & 17 deletions code/crates/app-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,30 @@ use std::time::Duration;

use bytes::Bytes;
use derive_where::derive_where;
use tokio::sync::mpsc;
use tokio::sync::oneshot;

use malachite_engine::consensus::Msg as ConsensusActorMsg;
use malachite_engine::network::Msg as NetworkActorMsg;

use crate::app::types::core::{CommitCertificate, Context, Round, ValueId};
use crate::app::types::streaming::StreamMessage;
use crate::app::types::sync::DecidedValue;
use crate::app::types::{LocallyProposedValue, PeerId, ProposedValue};

pub type Reply<T> = oneshot::Sender<T>;

/// Channels created for application consumption
pub struct Channels<Ctx: Context> {
pub consensus: mpsc::Receiver<AppMsg<Ctx>>,
pub network: mpsc::Sender<NetworkMsg<Ctx>>,
}

/// Messages sent from consensus to the application.
#[derive_where(Debug)]
pub enum AppMsg<Ctx: Context> {
/// Consensus is ready
ConsensusReady {
reply_to: oneshot::Sender<ConsensusMsg<Ctx>>,
},
ConsensusReady { reply: Reply<ConsensusMsg<Ctx>> },

/// Consensus has started a new round.
StartedRound {
Expand All @@ -28,9 +38,8 @@ pub enum AppMsg<Ctx: Context> {
GetValue {
height: Ctx::Height,
round: Round,
timeout_duration: Duration,
address: Ctx::Address,
reply_to: oneshot::Sender<LocallyProposedValue<Ctx>>,
timeout: Duration,
reply: Reply<LocallyProposedValue<Ctx>>,
},

/// Request to restream an existing block/value from Driver
Expand All @@ -43,33 +52,31 @@ pub enum AppMsg<Ctx: Context> {
},

/// Request the earliest block height in the block store
GetEarliestBlockHeight {
reply_to: oneshot::Sender<Ctx::Height>,
},
GetHistoryMinHeight { reply: Reply<Ctx::Height> },

/// ProposalPart received <-- consensus <-- gossip
ReceivedProposalPart {
from: PeerId,
part: StreamMessage<Ctx::ProposalPart>,
reply_to: oneshot::Sender<ProposedValue<Ctx>>,
reply: Reply<ProposedValue<Ctx>>,
},

/// Get the validator set at a given height
GetValidatorSet {
height: Ctx::Height,
reply_to: oneshot::Sender<Ctx::ValidatorSet>,
reply: Reply<Ctx::ValidatorSet>,
},

// Consensus has decided on a value
Decided {
certificate: CommitCertificate<Ctx>,
reply_to: oneshot::Sender<ConsensusMsg<Ctx>>,
reply: Reply<ConsensusMsg<Ctx>>,
},

// Retrieve decided block from the block store
GetDecidedBlock {
GetDecidedValue {
height: Ctx::Height,
reply_to: oneshot::Sender<Option<DecidedValue<Ctx>>>,
reply: Reply<Option<DecidedValue<Ctx>>>,
},

// Synced block
Expand All @@ -78,7 +85,7 @@ pub enum AppMsg<Ctx: Context> {
round: Round,
validator_address: Ctx::Address,
value_bytes: Bytes,
reply_to: oneshot::Sender<ProposedValue<Ctx>>,
reply: Reply<ProposedValue<Ctx>>,
},
}

Expand All @@ -88,8 +95,6 @@ pub enum ConsensusMsg<Ctx: Context> {
StartHeight(Ctx::Height, Ctx::ValidatorSet),
}

use malachite_engine::consensus::Msg as ConsensusActorMsg;

impl<Ctx: Context> From<ConsensusMsg<Ctx>> for ConsensusActorMsg<Ctx> {
fn from(msg: ConsensusMsg<Ctx>) -> ConsensusActorMsg<Ctx> {
match msg {
Expand All @@ -99,3 +104,17 @@ impl<Ctx: Context> From<ConsensusMsg<Ctx>> for ConsensusActorMsg<Ctx> {
}
}
}

/// Messages sent from the application to consensus gossip.
#[derive_where(Debug)]
pub enum NetworkMsg<Ctx: Context> {
PublishProposalPart(StreamMessage<Ctx::ProposalPart>),
}

impl<Ctx: Context> From<NetworkMsg<Ctx>> for NetworkActorMsg<Ctx> {
fn from(msg: NetworkMsg<Ctx>) -> NetworkActorMsg<Ctx> {
match msg {
NetworkMsg::PublishProposalPart(part) => NetworkActorMsg::PublishProposalPart(part),
}
}
}
55 changes: 19 additions & 36 deletions code/crates/app-channel/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,9 @@ where
) -> Result<(), ActorProcessingErr> {
match msg {
HostMsg::ConsensusReady(consensus_ref) => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::ConsensusReady { reply_to: tx })
.await?;
self.sender.send(AppMsg::ConsensusReady { reply }).await?;

consensus_ref.cast(rx.await?.into())?;
}
Expand All @@ -79,19 +77,17 @@ where
HostMsg::GetValue {
height,
round,
timeout: timeout_duration,
address,
timeout,
reply_to,
} => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::GetValue {
height,
round,
timeout_duration,
address,
reply_to: tx,
timeout,
reply,
})
.await?;

Expand All @@ -116,11 +112,11 @@ where
.await?
}

HostMsg::GetEarliestBlockHeight { reply_to } => {
let (tx, rx) = oneshot::channel();
HostMsg::GetHistoryMinHeight { reply_to } => {
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::GetEarliestBlockHeight { reply_to: tx })
.send(AppMsg::GetHistoryMinHeight { reply })
.await?;

reply_to.send(rx.await?)?;
Expand All @@ -131,27 +127,20 @@ where
part,
reply_to,
} => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::ReceivedProposalPart {
from,
part,
reply_to: tx,
})
.send(AppMsg::ReceivedProposalPart { from, part, reply })
.await?;

reply_to.send(rx.await?)?;
}

HostMsg::GetValidatorSet { height, reply_to } => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::GetValidatorSet {
height,
reply_to: tx,
})
.send(AppMsg::GetValidatorSet { height, reply })
.await?;

reply_to.send(rx.await?)?;
Expand All @@ -161,26 +150,20 @@ where
certificate,
consensus: consensus_ref,
} => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::Decided {
certificate,
reply_to: tx,
})
.send(AppMsg::Decided { certificate, reply })
.await?;

consensus_ref.cast(rx.await?.into())?;
}

HostMsg::GetDecidedValue { height, reply_to } => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::GetDecidedBlock {
height,
reply_to: tx,
})
.send(AppMsg::GetDecidedValue { height, reply })
.await?;

reply_to.send(rx.await?)?;
Expand All @@ -193,15 +176,15 @@ where
value_bytes,
reply_to,
} => {
let (tx, rx) = oneshot::channel();
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::ProcessSyncedValue {
height,
round,
validator_address,
value_bytes,
reply_to: tx,
reply,
})
.await?;

Expand Down
2 changes: 1 addition & 1 deletion code/crates/app-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub mod connector;
pub mod spawn;

mod channel;
pub use channel::{AppMsg, ConsensusMsg};
pub use channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg, Reply};

mod run;
pub use run::run;
32 changes: 17 additions & 15 deletions code/crates/app-channel/src/run.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,30 @@
//! Run Malachite consensus with the given configuration and context.
//! Provides the application with a channel for receiving messages from consensus.

use std::path::PathBuf;

use eyre::Result;
use tokio::sync::mpsc;

use crate::app;
use crate::app::types::codec::{ConsensusCodec, SyncCodec, WalCodec};
use crate::app::types::config::Config as NodeConfig;
use crate::app::types::core::Context;
use crate::app::types::metrics::{Metrics, SharedRegistry};
use crate::channel::AppMsg;
use crate::spawn::spawn_host_actor;
use crate::spawn::{spawn_host_actor, spawn_network_actor};
use crate::{app, Channels};

use malachite_app::{
spawn_consensus_actor, spawn_network_actor, spawn_sync_actor, spawn_wal_actor,
};
use malachite_app::{spawn_consensus_actor, spawn_sync_actor, spawn_wal_actor};
use malachite_engine::util::events::TxEvent;

#[tracing::instrument("node", skip_all, fields(moniker = %cfg.moniker))]
pub async fn run<Node, Ctx, Codec>(
cfg: NodeConfig,
start_height: Option<Ctx::Height>,
ctx: Ctx,
codec: Codec,
node: Node,
cfg: NodeConfig,
private_key_file: PathBuf,
start_height: Option<Ctx::Height>,
initial_validator_set: Ctx::ValidatorSet,
) -> Result<mpsc::Receiver<AppMsg<Ctx>>>
) -> Result<Channels<Ctx>>
where
Ctx: Context,
Node: app::Node<Context = Ctx>,
Expand All @@ -38,20 +37,20 @@ where
let registry = SharedRegistry::global().with_moniker(cfg.moniker.as_str());
let metrics = Metrics::register(&registry);

// TODO: Simplify this?
let private_key_file = node.load_private_key_file(node.get_home_dir())?;
let private_key_file = node.load_private_key_file(private_key_file)?;
let private_key = node.load_private_key(private_key_file);
let public_key = node.get_public_key(&private_key);
let address = node.get_address(&public_key);
let keypair = node.get_keypair(private_key);

// Spawn consensus gossip
let network = spawn_network_actor(&cfg, keypair, &registry, codec.clone()).await?;
let (network, network_tx) =
spawn_network_actor(&cfg, keypair, &registry, codec.clone()).await?;

let wal = spawn_wal_actor(&ctx, codec, &node.get_home_dir(), &registry).await?;

// Spawn the host actor
let (connector, rx) = spawn_host_actor(metrics.clone()).await?;
let (connector, consensus_rx) = spawn_host_actor(metrics.clone()).await?;

let sync = spawn_sync_actor(
ctx.clone(),
Expand All @@ -78,5 +77,8 @@ where
)
.await?;

Ok(rx)
Ok(Channels {
consensus: consensus_rx,
network: network_tx,
})
}
Loading
Loading