Skip to content

Commit

Permalink
Move Protobuf encoding to and decoding from bytes into gossip crates
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Jun 19, 2024
1 parent 30b5b3e commit 96e76c4
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 65 deletions.
2 changes: 0 additions & 2 deletions code/crates/actors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ malachite-vote.workspace = true
async-trait = { workspace = true }
derive-where = { workspace = true }
libp2p = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
ractor = { workspace = true, features = ["async-trait"] }
rand = { workspace = true }
tokio = { workspace = true, features = ["full"] }
Expand Down
24 changes: 5 additions & 19 deletions code/crates/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use malachite_common::{
use malachite_driver::Driver;
use malachite_driver::Input as DriverInput;
use malachite_driver::Output as DriverOutput;
use malachite_gossip_consensus::{Channel, Event as GossipEvent, PeerId};
use malachite_gossip_consensus::{Channel, Event as GossipEvent, NetworkMsg, PeerId};
use malachite_proto as proto;
use malachite_proto::Protobuf;
use malachite_vote::ThresholdParams;
Expand All @@ -26,9 +26,6 @@ use crate::host::{HostMsg, HostRef, LocallyProposedValue, ReceivedProposedValue}
use crate::timers::{Config as TimersConfig, Msg as TimersMsg, TimeoutElapsed, Timers, TimersRef};
use crate::util::forward;

mod network;
use network::NetworkMsg;

mod metrics;
pub use metrics::Metrics;

Expand Down Expand Up @@ -210,15 +207,9 @@ where
myself: ActorRef<Msg<Ctx>>,
state: &mut State<Ctx>,
) -> Result<(), ractor::ActorProcessingErr> {
if let GossipEvent::Message(from, _, data) = event {
match NetworkMsg::from_network_bytes(data) {
Ok(msg) => {
self.handle_network_msg(from, msg, myself, state).await?;
}
Err(e) => {
error!(%from, "Failed to decode message: {e}");
}
}
if let GossipEvent::Message(from, msg) = event {
self.handle_network_msg(from, msg.clone(), myself, state) // FIXME: Clone
.await?;
}

Ok(())
Expand Down Expand Up @@ -819,12 +810,7 @@ where
}
}

GossipEvent::Message(_, _, data) => {
let Ok(msg) = NetworkMsg::from_network_bytes(data) else {
error!("Failed to decode message");
return Ok(());
};

GossipEvent::Message(_, msg) => {
let Some(msg_height) = msg.msg_height() else {
trace!("Received message without height, dropping");
return Ok(());
Expand Down
4 changes: 1 addition & 3 deletions code/crates/actors/src/gossip_mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ use tracing::error;

use malachite_common::MempoolTransactionBatch;
use malachite_gossip_mempool::handle::CtrlHandle;
use malachite_gossip_mempool::{Channel, Config, Event, PeerId};
use malachite_gossip_mempool::{Channel, Config, Event, NetworkMsg, PeerId};
use malachite_metrics::SharedRegistry;

use crate::mempool::NetworkMsg;

pub type GossipMempoolRef = ActorRef<Msg>;

pub struct GossipMempool;
Expand Down
39 changes: 8 additions & 31 deletions code/crates/actors/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,15 @@ use async_trait::async_trait;
use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, RpcReplyPort};
use rand::distributions::Uniform;
use rand::Rng;
use tracing::{error, info, trace};
use tracing::{info, trace};

use malachite_common::{MempoolTransactionBatch, Transaction, TransactionBatch};
use malachite_gossip_mempool::{Channel, Event as GossipEvent, PeerId};
use malachite_gossip_mempool::{Channel, Event as GossipEvent, NetworkMsg, PeerId};
use malachite_node::config::{MempoolConfig, TestConfig};
use malachite_proto::{Error as ProtoError, Protobuf};

use crate::gossip_mempool::{GossipMempoolRef, Msg as GossipMempoolMsg};
use crate::util::forward;

#[derive(Clone, Debug, PartialEq)]
pub enum NetworkMsg {
TransactionBatch(MempoolTransactionBatch),
}

impl NetworkMsg {
pub fn from_network_bytes(bytes: &[u8]) -> Result<Self, ProtoError> {
Protobuf::from_bytes(bytes).map(NetworkMsg::TransactionBatch)
}

pub fn to_network_bytes(&self) -> Result<Vec<u8>, ProtoError> {
match self {
NetworkMsg::TransactionBatch(batch) => batch.to_bytes(),
}
}
}

pub type MempoolRef = ActorRef<MempoolMsg>;

pub struct Mempool {
Expand Down Expand Up @@ -132,17 +114,12 @@ impl Mempool {
GossipEvent::PeerDisconnected(peer_id) => {
info!("Disconnected from peer {peer_id}");
}
GossipEvent::Message(from, Channel::Mempool, data) => {
trace!(%from, "Received message of size {} bytes", data.len());

match NetworkMsg::from_network_bytes(data) {
Ok(msg) => {
self.handle_network_msg(from, msg, myself, state).await?;
}
Err(e) => {
error!("Failed to parse network message: {e}");
}
}
GossipEvent::Message(from, msg) => {
// TODO: Implement Protobuf on NetworkMsg
// trace!(%from, "Received message of size {} bytes", msg.encoded_len());
trace!(%from, "Received message");
self.handle_network_msg(from, msg.clone(), myself, state) // FIXME: Clone
.await?;
}
}

Expand Down
3 changes: 3 additions & 0 deletions code/crates/gossip-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ workspace = true

[dependencies]
malachite-metrics = { workspace = true }
malachite-proto = { workspace = true }

futures = { workspace = true }
libp2p = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
15 changes: 10 additions & 5 deletions code/crates/gossip-consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ pub use libp2p::{Multiaddr, PeerId};
pub mod behaviour;
pub mod handle;

mod msg;
pub use msg::NetworkMsg;

use behaviour::{Behaviour, NetworkEvent};
use handle::Handle;

Expand Down Expand Up @@ -92,7 +95,7 @@ impl Config {
#[derive(Debug)]
pub enum Event {
Listening(Multiaddr),
Message(PeerId, Channel, Vec<u8>),
Message(PeerId, NetworkMsg),
PeerConnected(PeerId),
PeerDisconnected(PeerId),
}
Expand Down Expand Up @@ -310,10 +313,12 @@ async fn handle_swarm_event(
message.data.len()
);

if let Err(e) = tx_event
.send(Event::Message(peer_id, channel, message.data))
.await
{
let Ok(network_msg) = NetworkMsg::from_network_bytes(&message.data) else {
error!("Error decoding message {message_id} from {peer_id}: invalid format");
return ControlFlow::Continue(());
};

if let Err(e) = tx_event.send(Event::Message(peer_id, network_msg)).await {
error!("Error sending message to handle: {e}");
return ControlFlow::Break(());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use malachite_proto::Error as ProtoError;
use malachite_proto::Protobuf;
use malachite_proto::{SignedBlockPart, SignedProposal, SignedVote};

use crate::Channel;

#[derive(Clone, Debug, PartialEq)]
pub enum NetworkMsg {
Vote(SignedVote),
Expand All @@ -13,6 +15,13 @@ pub enum NetworkMsg {
}

impl NetworkMsg {
pub fn channel(&self) -> Channel {
match self {
NetworkMsg::Vote(_) | NetworkMsg::Proposal(_) => Channel::Consensus,
NetworkMsg::BlockPart(_) => Channel::BlockParts,
}
}

pub fn from_network_bytes(bytes: &[u8]) -> Result<Self, ProtoError> {
Protobuf::from_bytes(bytes)
}
Expand Down
2 changes: 2 additions & 0 deletions code/crates/gossip-mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ publish.workspace = true
workspace = true

[dependencies]
malachite-common = { workspace = true }
malachite-metrics = { workspace = true }
malachite-proto = { workspace = true }

futures = { workspace = true }
libp2p = { workspace = true }
Expand Down
16 changes: 11 additions & 5 deletions code/crates/gossip-mempool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ pub use libp2p::{Multiaddr, PeerId};

pub mod behaviour;
pub mod handle;

mod msg;
pub use msg::NetworkMsg;

use behaviour::{Behaviour, NetworkEvent};
use handle::Handle;

Expand Down Expand Up @@ -93,7 +97,7 @@ pub struct State {
#[derive(Debug)]
pub enum Event {
Listening(Multiaddr),
Message(PeerId, Channel, Vec<u8>),
Message(PeerId, NetworkMsg),
PeerConnected(PeerId),
PeerDisconnected(PeerId),
}
Expand Down Expand Up @@ -290,10 +294,12 @@ async fn handle_swarm_event(
message.data.len()
);

if let Err(e) = tx_event
.send(Event::Message(peer_id, channel, message.data))
.await
{
let Ok(network_msg) = NetworkMsg::from_network_bytes(&message.data) else {
error!("Error decoding message {message_id} from {peer_id}: invalid format");
return ControlFlow::Continue(());
};

if let Err(e) = tx_event.send(Event::Message(peer_id, network_msg)).await {
error!("Error sending message to handle: {e}");
return ControlFlow::Break(());
}
Expand Down
25 changes: 25 additions & 0 deletions code/crates/gossip-mempool/src/msg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use malachite_common::MempoolTransactionBatch;
use malachite_proto::{Error as ProtoError, Protobuf};

use crate::Channel;

#[derive(Clone, Debug, PartialEq)]
pub enum NetworkMsg {
TransactionBatch(MempoolTransactionBatch),
}

impl NetworkMsg {
pub fn channel(&self) -> Channel {
Channel::Mempool
}

pub fn from_network_bytes(bytes: &[u8]) -> Result<Self, ProtoError> {
Protobuf::from_bytes(bytes).map(NetworkMsg::TransactionBatch)
}

pub fn to_network_bytes(&self) -> Result<Vec<u8>, ProtoError> {
match self {
NetworkMsg::TransactionBatch(batch) => batch.to_bytes(),
}
}
}

0 comments on commit 96e76c4

Please sign in to comment.