Skip to content

Commit

Permalink
Implement TCP-based broadcast network
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Feb 15, 2024
1 parent 1f8700a commit 51b7503
Show file tree
Hide file tree
Showing 10 changed files with 372 additions and 3 deletions.
3 changes: 3 additions & 0 deletions code/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"common",
"driver",
"itf",
"node",
"round",
"test",
"vote",
Expand All @@ -31,3 +32,5 @@ serde_json = "1.0"
serde_with = "3.4"
sha2 = "0.10.8"
signature = "2.1.0"
tokio = "1.35.1"
tokio-stream = "0.1"
2 changes: 1 addition & 1 deletion code/common/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
/// that are used in the consensus engine.
pub trait Context
where
Self: Sized,
Self: Sized + Send + Sync + 'static,
{
/// The type of address of a validator.
type Address: Address;
Expand Down
2 changes: 1 addition & 1 deletion code/common/src/proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{Context, Round};
/// Defines the requirements for a proposal type.
pub trait Proposal<Ctx>
where
Self: Clone + Debug + PartialEq + Eq,
Self: Clone + Debug + Eq + Send + Sync + 'static,
Ctx: Context,
{
/// The height for which the proposal is for.
Expand Down
2 changes: 1 addition & 1 deletion code/common/src/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub enum VoteType {
/// include information about the validator signing it.
pub trait Vote<Ctx>
where
Self: Clone + Debug + Eq,
Self: Clone + Debug + Eq + Send + Sync + 'static,
Ctx: Context,
{
/// The height for which the vote is for.
Expand Down
22 changes: 22 additions & 0 deletions code/node/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "malachite-node"
description = "Node for running the Malachite consensus engine"

version.workspace = true
edition.workspace = true
repository.workspace = true
license.workspace = true
publish.workspace = true

[dependencies]
malachite-common = { version = "0.1.0", path = "../common" }
malachite-driver = { version = "0.1.0", path = "../driver" }
malachite-round = { version = "0.1.0", path = "../round" }
malachite-vote = { version = "0.1.0", path = "../vote" }

futures = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true }

[dev-dependencies]
malachite-test = { version = "0.1.0", path = "../test" }
2 changes: 2 additions & 0 deletions code/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod network;
pub mod node;
19 changes: 19 additions & 0 deletions code/node/src/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use malachite_common::Context;

pub mod broadcast;
mod msg;

pub use self::msg::Msg;

#[allow(async_fn_in_trait)]
pub trait Network<Ctx: Context> {
async fn recv(&mut self) -> Option<Msg<Ctx>>;
async fn broadcast(&mut self, msg: Msg<Ctx>);

async fn broadcast_vote(&mut self, vote: Ctx::Vote) {
self.broadcast(Msg::Vote(vote)).await
}
async fn broadcast_proposal(&mut self, proposal: Ctx::Proposal) {
self.broadcast(Msg::Proposal(proposal)).await
}
}
251 changes: 251 additions & 0 deletions code/node/src/network/broadcast.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
use core::fmt;
use std::fmt::Debug;
use std::net::SocketAddr;

use futures::channel::oneshot;
use malachite_common::Context;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc};

use super::Msg;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PeerId(String);

impl fmt::Display for PeerId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}

pub enum PeerEvent<Ctx: Context> {
ConnectToPeer(PeerInfo, oneshot::Sender<()>),
Broadcast(Msg<Ctx>, oneshot::Sender<()>),
}

impl<Ctx: Context> Debug for PeerEvent<Ctx> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PeerEvent::ConnectToPeer(peer_info, _) => {
write!(f, "ConnectToPeer({peer_info:?})")
}
PeerEvent::Broadcast(msg, _) => {
write!(f, "Broadcast({msg:?})")
}
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PeerInfo {
pub id: PeerId,
pub addr: SocketAddr,
}

pub struct Peer<Ctx: Context> {
id: PeerId,
addr: SocketAddr,
_marker: std::marker::PhantomData<Ctx>,
}

impl<Ctx: Context> Peer<Ctx> {
pub fn new(info: PeerInfo) -> Self {
Self {
id: info.id,
addr: info.addr,
_marker: std::marker::PhantomData,
}
}

pub async fn run(self) -> Handle<Ctx> {
let (tx_peer_event, mut rx_peer_event) = mpsc::channel::<PeerEvent<Ctx>>(16);
let (tx_msg, rx_msg) = mpsc::channel::<(PeerId, Msg<Ctx>)>(16);
let (tx_broadcast_to_peers, _) = broadcast::channel::<(PeerId, Msg<Ctx>)>(16);
let (tx_spawned, rx_spawned) = oneshot::channel();

tokio::spawn(listen(self.id.clone(), self.addr, tx_spawned, tx_msg));

let id = self.id.clone();
tokio::spawn(async move {
while let Some(event) = rx_peer_event.recv().await {
match event {
PeerEvent::ConnectToPeer(peer_info, done) => {
connect_to_peer(id.clone(), peer_info, done, &tx_broadcast_to_peers).await;
}

PeerEvent::Broadcast(msg, done) => {
println!("[{id}] Broadcasting message: {msg:?}");
tx_broadcast_to_peers.send((id.clone(), msg)).unwrap();
done.send(()).unwrap();
}
}
}
});

rx_spawned.await.unwrap();

Handle {
peer_id: self.id,
rx_msg,
tx_peer_event,
}
}
}

async fn connect_to_peer<Ctx: Context>(
id: PeerId,
peer_info: PeerInfo,
done: oneshot::Sender<()>,
per_peer_tx: &broadcast::Sender<(PeerId, Msg<Ctx>)>,
) {
println!("[{id}] Connecting to {peer_info:?}...");

let mut stream = TcpStream::connect(peer_info.addr).await.unwrap();
done.send(()).unwrap();

let mut per_peer_rx = per_peer_tx.subscribe();

tokio::spawn(async move {
loop {
let (from, msg) = per_peer_rx.recv().await.unwrap();
if from == peer_info.id {
continue;
}

println!("[{id}] Sending message to {peer_info:?}: {msg:?}");

let bytes = msg.as_bytes();
stream.write_u32(bytes.len() as u32).await.unwrap();
stream.write_all(&bytes).await.unwrap();
stream.flush().await.unwrap();
}
});
}

async fn listen<Ctx: Context>(
id: PeerId,
addr: SocketAddr,
tx_spawned: oneshot::Sender<()>,
tx_msg: mpsc::Sender<(PeerId, Msg<Ctx>)>,
) -> ! {
let listener = TcpListener::bind(addr).await.unwrap();
println!("[{id}] Listening on {addr}...");

tx_spawned.send(()).unwrap();

loop {
let (mut socket, _) = listener.accept().await.unwrap();

println!(
"[{id}] Accepted connection from {peer}...",
peer = socket.peer_addr().unwrap()
);

let id = id.clone();
let tx_msg = tx_msg.clone();

tokio::spawn(async move {
let len = socket.read_u32().await.unwrap();
let mut buf = vec![0; len as usize];
socket.read_exact(&mut buf).await.unwrap();
let msg: Msg<Ctx> = Msg::from_bytes(&buf);

println!(
"[{id}] Received message from {peer}: {msg:?}",
peer = socket.peer_addr().unwrap(),
);

tx_msg.send((id.clone(), msg)).await.unwrap(); // FIXME
});
}
}

pub struct Handle<Ctx: Context> {
peer_id: PeerId,
rx_msg: mpsc::Receiver<(PeerId, Msg<Ctx>)>,
tx_peer_event: mpsc::Sender<PeerEvent<Ctx>>,
}

impl<Ctx: Context> Handle<Ctx> {
pub fn peer_id(&self) -> &PeerId {
&self.peer_id
}

pub async fn recv(&mut self) -> Option<(PeerId, Msg<Ctx>)> {
self.rx_msg.recv().await
}

pub async fn broadcast(&self, msg: Msg<Ctx>) {
let (tx_done, rx_done) = oneshot::channel();

self.tx_peer_event
.send(PeerEvent::Broadcast(msg, tx_done))
.await
.unwrap();

rx_done.await.unwrap();
}

pub async fn connect_to_peer(&self, peer_info: PeerInfo) {
let (tx_done, rx_done) = oneshot::channel();

self.tx_peer_event
.send(PeerEvent::ConnectToPeer(peer_info, tx_done))
.await
.unwrap();

rx_done.await.unwrap();
}
}

#[cfg(test)]
mod tests {
use super::*;
use malachite_test::TestContext;

#[tokio::test]
async fn test_peer() {
let peer1_id = PeerId("peer-1".to_string());
let peer1_info = PeerInfo {
id: peer1_id.clone(),
addr: "127.0.0.1:12001".parse().unwrap(),
};

let peer2_id = PeerId("peer-2".to_string());
let peer2_info = PeerInfo {
id: peer2_id.clone(),
addr: "127.0.0.1:12002".parse().unwrap(),
};

let peer3_id = PeerId("peer-3".to_string());
let peer3_info = PeerInfo {
id: peer3_id.clone(),
addr: "127.0.0.1:12003".parse().unwrap(),
};

let peer1: Peer<TestContext> = Peer::new(peer1_info.clone());
let peer2: Peer<TestContext> = Peer::new(peer2_info.clone());
let peer3: Peer<TestContext> = Peer::new(peer3_info.clone());

let handle1 = peer1.run().await;
let mut handle2 = peer2.run().await;
let mut handle3 = peer3.run().await;

handle1.connect_to_peer(peer2_info.clone()).await;
handle1.connect_to_peer(peer3_info.clone()).await;

handle2.connect_to_peer(peer1_info.clone()).await;
handle2.connect_to_peer(peer3_info.clone()).await;

handle3.connect_to_peer(peer1_info.clone()).await;
handle3.connect_to_peer(peer2_info.clone()).await;

handle1.broadcast(Msg::Dummy(1)).await;

let msg2 = handle2.recv().await.unwrap();
dbg!(&msg2);
let msg3 = handle3.recv().await.unwrap();
dbg!(&msg3);
}
}
Loading

0 comments on commit 51b7503

Please sign in to comment.