From 96e76c4bff311e1b7c7072eed175445041739d03 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 19 Jun 2024 20:27:55 +0200 Subject: [PATCH] Move Protobuf encoding to and decoding from bytes into gossip crates --- code/crates/actors/Cargo.toml | 2 - code/crates/actors/src/consensus.rs | 24 +++--------- code/crates/actors/src/gossip_mempool.rs | 4 +- code/crates/actors/src/mempool.rs | 39 ++++--------------- code/crates/gossip-consensus/Cargo.toml | 3 ++ code/crates/gossip-consensus/src/lib.rs | 15 ++++--- .../src/msg.rs} | 9 +++++ code/crates/gossip-mempool/Cargo.toml | 2 + code/crates/gossip-mempool/src/lib.rs | 16 +++++--- code/crates/gossip-mempool/src/msg.rs | 25 ++++++++++++ 10 files changed, 74 insertions(+), 65 deletions(-) rename code/crates/{actors/src/consensus/network.rs => gossip-consensus/src/msg.rs} (90%) create mode 100644 code/crates/gossip-mempool/src/msg.rs diff --git a/code/crates/actors/Cargo.toml b/code/crates/actors/Cargo.toml index d6a6bf26f..038bdd1d9 100644 --- a/code/crates/actors/Cargo.toml +++ b/code/crates/actors/Cargo.toml @@ -22,8 +22,6 @@ malachite-vote.workspace = true async-trait = { workspace = true } derive-where = { workspace = true } libp2p = { workspace = true } -prost = { workspace = true } -prost-types = { workspace = true } ractor = { workspace = true, features = ["async-trait"] } rand = { workspace = true } tokio = { workspace = true, features = ["full"] } diff --git a/code/crates/actors/src/consensus.rs b/code/crates/actors/src/consensus.rs index c6b3916b3..9ae1e1052 100644 --- a/code/crates/actors/src/consensus.rs +++ b/code/crates/actors/src/consensus.rs @@ -16,7 +16,7 @@ use malachite_common::{ use malachite_driver::Driver; use malachite_driver::Input as DriverInput; use malachite_driver::Output as DriverOutput; -use malachite_gossip_consensus::{Channel, Event as GossipEvent, PeerId}; +use malachite_gossip_consensus::{Channel, Event as GossipEvent, NetworkMsg, PeerId}; use malachite_proto as proto; use malachite_proto::Protobuf; use malachite_vote::ThresholdParams; @@ -26,9 +26,6 @@ use crate::host::{HostMsg, HostRef, LocallyProposedValue, ReceivedProposedValue} use crate::timers::{Config as TimersConfig, Msg as TimersMsg, TimeoutElapsed, Timers, TimersRef}; use crate::util::forward; -mod network; -use network::NetworkMsg; - mod metrics; pub use metrics::Metrics; @@ -210,15 +207,9 @@ where myself: ActorRef>, state: &mut State, ) -> Result<(), ractor::ActorProcessingErr> { - if let GossipEvent::Message(from, _, data) = event { - match NetworkMsg::from_network_bytes(data) { - Ok(msg) => { - self.handle_network_msg(from, msg, myself, state).await?; - } - Err(e) => { - error!(%from, "Failed to decode message: {e}"); - } - } + if let GossipEvent::Message(from, msg) = event { + self.handle_network_msg(from, msg.clone(), myself, state) // FIXME: Clone + .await?; } Ok(()) @@ -819,12 +810,7 @@ where } } - GossipEvent::Message(_, _, data) => { - let Ok(msg) = NetworkMsg::from_network_bytes(data) else { - error!("Failed to decode message"); - return Ok(()); - }; - + GossipEvent::Message(_, msg) => { let Some(msg_height) = msg.msg_height() else { trace!("Received message without height, dropping"); return Ok(()); diff --git a/code/crates/actors/src/gossip_mempool.rs b/code/crates/actors/src/gossip_mempool.rs index 73b9a2295..b28cdff7b 100644 --- a/code/crates/actors/src/gossip_mempool.rs +++ b/code/crates/actors/src/gossip_mempool.rs @@ -12,11 +12,9 @@ use tracing::error; use malachite_common::MempoolTransactionBatch; use malachite_gossip_mempool::handle::CtrlHandle; -use malachite_gossip_mempool::{Channel, Config, Event, PeerId}; +use malachite_gossip_mempool::{Channel, Config, Event, NetworkMsg, PeerId}; use malachite_metrics::SharedRegistry; -use crate::mempool::NetworkMsg; - pub type GossipMempoolRef = ActorRef; pub struct GossipMempool; diff --git a/code/crates/actors/src/mempool.rs b/code/crates/actors/src/mempool.rs index f83db2949..7dc468df3 100644 --- a/code/crates/actors/src/mempool.rs +++ b/code/crates/actors/src/mempool.rs @@ -6,33 +6,15 @@ use async_trait::async_trait; use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, RpcReplyPort}; use rand::distributions::Uniform; use rand::Rng; -use tracing::{error, info, trace}; +use tracing::{info, trace}; use malachite_common::{MempoolTransactionBatch, Transaction, TransactionBatch}; -use malachite_gossip_mempool::{Channel, Event as GossipEvent, PeerId}; +use malachite_gossip_mempool::{Channel, Event as GossipEvent, NetworkMsg, PeerId}; use malachite_node::config::{MempoolConfig, TestConfig}; -use malachite_proto::{Error as ProtoError, Protobuf}; use crate::gossip_mempool::{GossipMempoolRef, Msg as GossipMempoolMsg}; use crate::util::forward; -#[derive(Clone, Debug, PartialEq)] -pub enum NetworkMsg { - TransactionBatch(MempoolTransactionBatch), -} - -impl NetworkMsg { - pub fn from_network_bytes(bytes: &[u8]) -> Result { - Protobuf::from_bytes(bytes).map(NetworkMsg::TransactionBatch) - } - - pub fn to_network_bytes(&self) -> Result, ProtoError> { - match self { - NetworkMsg::TransactionBatch(batch) => batch.to_bytes(), - } - } -} - pub type MempoolRef = ActorRef; pub struct Mempool { @@ -132,17 +114,12 @@ impl Mempool { GossipEvent::PeerDisconnected(peer_id) => { info!("Disconnected from peer {peer_id}"); } - GossipEvent::Message(from, Channel::Mempool, data) => { - trace!(%from, "Received message of size {} bytes", data.len()); - - match NetworkMsg::from_network_bytes(data) { - Ok(msg) => { - self.handle_network_msg(from, msg, myself, state).await?; - } - Err(e) => { - error!("Failed to parse network message: {e}"); - } - } + GossipEvent::Message(from, msg) => { + // TODO: Implement Protobuf on NetworkMsg + // trace!(%from, "Received message of size {} bytes", msg.encoded_len()); + trace!(%from, "Received message"); + self.handle_network_msg(from, msg.clone(), myself, state) // FIXME: Clone + .await?; } } diff --git a/code/crates/gossip-consensus/Cargo.toml b/code/crates/gossip-consensus/Cargo.toml index 3b2fbb12c..4ecf2ddac 100644 --- a/code/crates/gossip-consensus/Cargo.toml +++ b/code/crates/gossip-consensus/Cargo.toml @@ -11,8 +11,11 @@ workspace = true [dependencies] malachite-metrics = { workspace = true } +malachite-proto = { workspace = true } futures = { workspace = true } libp2p = { workspace = true } +prost = { workspace = true } +prost-types = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/code/crates/gossip-consensus/src/lib.rs b/code/crates/gossip-consensus/src/lib.rs index 4227e690c..5af1c2568 100644 --- a/code/crates/gossip-consensus/src/lib.rs +++ b/code/crates/gossip-consensus/src/lib.rs @@ -22,6 +22,9 @@ pub use libp2p::{Multiaddr, PeerId}; pub mod behaviour; pub mod handle; +mod msg; +pub use msg::NetworkMsg; + use behaviour::{Behaviour, NetworkEvent}; use handle::Handle; @@ -92,7 +95,7 @@ impl Config { #[derive(Debug)] pub enum Event { Listening(Multiaddr), - Message(PeerId, Channel, Vec), + Message(PeerId, NetworkMsg), PeerConnected(PeerId), PeerDisconnected(PeerId), } @@ -310,10 +313,12 @@ async fn handle_swarm_event( message.data.len() ); - if let Err(e) = tx_event - .send(Event::Message(peer_id, channel, message.data)) - .await - { + let Ok(network_msg) = NetworkMsg::from_network_bytes(&message.data) else { + error!("Error decoding message {message_id} from {peer_id}: invalid format"); + return ControlFlow::Continue(()); + }; + + if let Err(e) = tx_event.send(Event::Message(peer_id, network_msg)).await { error!("Error sending message to handle: {e}"); return ControlFlow::Break(()); } diff --git a/code/crates/actors/src/consensus/network.rs b/code/crates/gossip-consensus/src/msg.rs similarity index 90% rename from code/crates/actors/src/consensus/network.rs rename to code/crates/gossip-consensus/src/msg.rs index 0fba130d8..11cc63bc9 100644 --- a/code/crates/actors/src/consensus/network.rs +++ b/code/crates/gossip-consensus/src/msg.rs @@ -5,6 +5,8 @@ use malachite_proto::Error as ProtoError; use malachite_proto::Protobuf; use malachite_proto::{SignedBlockPart, SignedProposal, SignedVote}; +use crate::Channel; + #[derive(Clone, Debug, PartialEq)] pub enum NetworkMsg { Vote(SignedVote), @@ -13,6 +15,13 @@ pub enum NetworkMsg { } impl NetworkMsg { + pub fn channel(&self) -> Channel { + match self { + NetworkMsg::Vote(_) | NetworkMsg::Proposal(_) => Channel::Consensus, + NetworkMsg::BlockPart(_) => Channel::BlockParts, + } + } + pub fn from_network_bytes(bytes: &[u8]) -> Result { Protobuf::from_bytes(bytes) } diff --git a/code/crates/gossip-mempool/Cargo.toml b/code/crates/gossip-mempool/Cargo.toml index 868915993..709d482c1 100644 --- a/code/crates/gossip-mempool/Cargo.toml +++ b/code/crates/gossip-mempool/Cargo.toml @@ -10,7 +10,9 @@ publish.workspace = true workspace = true [dependencies] +malachite-common = { workspace = true } malachite-metrics = { workspace = true } +malachite-proto = { workspace = true } futures = { workspace = true } libp2p = { workspace = true } diff --git a/code/crates/gossip-mempool/src/lib.rs b/code/crates/gossip-mempool/src/lib.rs index 01b6f6e1f..018bb5173 100644 --- a/code/crates/gossip-mempool/src/lib.rs +++ b/code/crates/gossip-mempool/src/lib.rs @@ -21,6 +21,10 @@ pub use libp2p::{Multiaddr, PeerId}; pub mod behaviour; pub mod handle; + +mod msg; +pub use msg::NetworkMsg; + use behaviour::{Behaviour, NetworkEvent}; use handle::Handle; @@ -93,7 +97,7 @@ pub struct State { #[derive(Debug)] pub enum Event { Listening(Multiaddr), - Message(PeerId, Channel, Vec), + Message(PeerId, NetworkMsg), PeerConnected(PeerId), PeerDisconnected(PeerId), } @@ -290,10 +294,12 @@ async fn handle_swarm_event( message.data.len() ); - if let Err(e) = tx_event - .send(Event::Message(peer_id, channel, message.data)) - .await - { + let Ok(network_msg) = NetworkMsg::from_network_bytes(&message.data) else { + error!("Error decoding message {message_id} from {peer_id}: invalid format"); + return ControlFlow::Continue(()); + }; + + if let Err(e) = tx_event.send(Event::Message(peer_id, network_msg)).await { error!("Error sending message to handle: {e}"); return ControlFlow::Break(()); } diff --git a/code/crates/gossip-mempool/src/msg.rs b/code/crates/gossip-mempool/src/msg.rs new file mode 100644 index 000000000..2e8e832b6 --- /dev/null +++ b/code/crates/gossip-mempool/src/msg.rs @@ -0,0 +1,25 @@ +use malachite_common::MempoolTransactionBatch; +use malachite_proto::{Error as ProtoError, Protobuf}; + +use crate::Channel; + +#[derive(Clone, Debug, PartialEq)] +pub enum NetworkMsg { + TransactionBatch(MempoolTransactionBatch), +} + +impl NetworkMsg { + pub fn channel(&self) -> Channel { + Channel::Mempool + } + + pub fn from_network_bytes(bytes: &[u8]) -> Result { + Protobuf::from_bytes(bytes).map(NetworkMsg::TransactionBatch) + } + + pub fn to_network_bytes(&self) -> Result, ProtoError> { + match self { + NetworkMsg::TransactionBatch(batch) => batch.to_bytes(), + } + } +}