Skip to content

Commit

Permalink
feat(code/blocksync): Add support for large blocks to BlockSync (#508)
Browse files Browse the repository at this point in the history
* Disable idle connection timeout

* Rename variable

* Improve blocksync logging

* Fix for vote extension being added even when disabled

* Use custom length-prefixed codec instead of CBOR codec with hardcoded limit

* Allow configuring the maxium size of RPC and PubSub messages

* Cleanup mempool

* Fix CI

* Fix config file

* Fix clippy warnings

* Fix config file

* Re-add idle connection timeout
  • Loading branch information
romac authored Nov 4, 2024
1 parent 93f6f5d commit 1d800c3
Show file tree
Hide file tree
Showing 16 changed files with 268 additions and 86 deletions.
1 change: 1 addition & 0 deletions code/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions code/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,34 @@ timeout_commit = "0s"
[consensus.p2p]

# Address to listen for incoming connections
# Override with MALACHITE__CONSENSUS__P2P__LISTEN_ADDR env variable
listen_addr = "/ip4/0.0.0.0/udp/0/quic-v1"

# List of nodes to keep persistent connections to
# Override with MALACHITE__CONSENSUS__P2P__PERSISTENT_PEERS env variable
persistent_peers = []

# Transport protocol to use for P2P communication
# Valid values:
# - "tcp": TCP + Noise
# - "quic": QUIC
# Override with MALACHITE__CONSENSUS__P2P__TRANSPORT env variable
transport = "quic"

# Enable the discovery protocol to find more peers
# Override with MALACHITE__CONSENSUS__P2P__DISCOVERY__ENABLED env variable
discovery = { enabled = true }

# The maximum size of messages to send over pub-sub
# Must be larger than the maximum block part size.
# Override with MALACHITE__CONSENSUS__P2P__PUBSUB_MAX_SIZE env variable
pubsub_max_size = "4 MiB"

# The maximum size of messages to send over RPC
# Must be larger than the maximum block size.
# Override with MALACHITE__CONSENSUS__P2P__RPC_MAX_SIZE env variable
rpc_max_size = "10 MiB"

#######################################################
### Consensus P2P Protocol Configuration Options ###
#######################################################
Expand Down Expand Up @@ -139,6 +153,10 @@ persistent_peers = []
# - "quic": QUIC
transport = "quic"

# These have no effects on the mempool yet
pubsub_max_size = "4 MiB"
rpc_max_size = "10 MiB"

#######################################################
### Mempool P2P Protocol Configuration Options ###
#######################################################
Expand Down
1 change: 1 addition & 0 deletions code/crates/blocksync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rust-version.workspace = true
malachite-common = { workspace = true }
malachite-metrics = { workspace = true }

async-trait = { workspace = true }
bytes = { workspace = true, features = ["serde"] }
dashmap = { workspace = true }
derive-where = { workspace = true }
Expand Down
55 changes: 47 additions & 8 deletions code/crates/blocksync/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,79 @@
use std::time::Duration;

use bytes::Bytes;
use displaydoc::Display;
use libp2p::metrics::Registry;
use libp2p::request_response::{self as rpc, OutboundRequestId, ProtocolSupport};
use libp2p::swarm::NetworkBehaviour;
use libp2p::{PeerId, StreamProtocol};

use crate::rpc::Codec;
use crate::types::{RawRequest, RawResponse, ResponseChannel};

// use crate::Metrics;

#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "Event")]
pub struct Behaviour {
rpc: rpc::cbor::Behaviour<RawRequest, RawResponse>,
rpc: rpc::Behaviour<Codec>,
}

pub type Event = rpc::Event<RawRequest, RawResponse>;

#[derive(Copy, Clone, Debug)]
pub struct Config {
pub request_timeout: Duration,
pub max_request_size: usize,
pub max_response_size: usize,
}

impl Config {
pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
self.request_timeout = request_timeout;
self
}

pub fn with_max_request_size(mut self, max_request_size: usize) -> Self {
self.max_request_size = max_request_size;
self
}

pub fn with_max_response_size(mut self, max_response_size: usize) -> Self {
self.max_response_size = max_response_size;
self
}
}

impl Default for Config {
fn default() -> Self {
Self {
request_timeout: Duration::from_secs(30),
max_request_size: 1024 * 1024, // 1 MiB
max_response_size: 512 * 1024 * 1024, // 512 MiB
}
}
}

impl Behaviour {
pub const PROTOCOL: [(StreamProtocol, ProtocolSupport); 1] = [(
StreamProtocol::new("/malachite-blocksync/v1beta1"),
ProtocolSupport::Full,
)];

pub fn new() -> Self {
let config = rpc::Config::default();
pub fn new(config: Config) -> Self {
let rpc_config = rpc::Config::default().with_request_timeout(config.request_timeout);

Self {
rpc: rpc::cbor::Behaviour::new(Self::PROTOCOL, config),
rpc: rpc::Behaviour::with_codec(Codec::new(config), Self::PROTOCOL, rpc_config),
// metrics: None,
}
}

pub fn new_with_metrics(_registry: &mut Registry) -> Self {
let config = rpc::Config::default();
pub fn new_with_metrics(config: Config, _registry: &mut Registry) -> Self {
let rpc_config = rpc::Config::default().with_request_timeout(config.request_timeout);

Self {
rpc: rpc::cbor::Behaviour::new(Self::PROTOCOL, config),
rpc: rpc::Behaviour::with_codec(Codec::new(config), Self::PROTOCOL, rpc_config),
// metrics: Some(Metrics::new(registry)),
}
}
Expand Down Expand Up @@ -63,6 +102,6 @@ impl core::error::Error for Error {}

impl Default for Behaviour {
fn default() -> Self {
Self::new()
Self::new(Config::default())
}
}
4 changes: 3 additions & 1 deletion code/crates/blocksync/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod behaviour;
pub use behaviour::{Behaviour, Event};
pub use behaviour::{Behaviour, Config, Event};

mod codec;
pub use codec::NetworkCodec;
Expand All @@ -16,6 +16,8 @@ pub use types::{
Status, SyncedBlock,
};

mod rpc;

mod macros;

#[doc(hidden)]
Expand Down
114 changes: 114 additions & 0 deletions code/crates/blocksync/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use async_trait::async_trait;
use bytes::Bytes;
use libp2p::futures::{io, AsyncRead, AsyncWrite};
use libp2p::StreamProtocol;

use crate::behaviour::Config;
use crate::types::{RawRequest, RawResponse};

#[derive(Copy, Clone)]
pub struct Codec {
config: Config,
}

impl Codec {
pub fn new(config: Config) -> Self {
Self { config }
}
}

#[async_trait]
impl libp2p::request_response::Codec for Codec {
type Protocol = StreamProtocol;

type Request = RawRequest;
type Response = RawResponse;

async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
read_length_prefixed(io, self.config.max_request_size)
.await
.map(RawRequest)
}

async fn read_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
read_length_prefixed(io, self.config.max_response_size)
.await
.map(RawResponse)
}

async fn write_request<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
req: Self::Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
write_length_prefixed(io, req.0, self.config.max_request_size).await
}

async fn write_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
res: Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
write_length_prefixed(io, res.0, self.config.max_response_size).await
}
}

const U32_LENGTH: usize = size_of::<u32>();

async fn write_length_prefixed<T>(dst: &mut T, data: Bytes, max_len: usize) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
use io::AsyncWriteExt;

let len = data.len();
if len > max_len || len > u32::MAX as usize {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"data too large",
));
}

dst.write_all(&(len as u32).to_be_bytes()).await?;
dst.write_all(&data).await?;
dst.flush().await?;

Ok(())
}

async fn read_length_prefixed<T>(src: &mut T, max_len: usize) -> io::Result<Bytes>
where
T: AsyncRead + Unpin + Send,
{
use io::AsyncReadExt;

let mut len_bytes = [0u8; U32_LENGTH];
src.read_exact(&mut len_bytes).await?;
let len = u32::from_be_bytes(len_bytes) as usize;

if len > max_len {
return Err(io::Error::new(io::ErrorKind::InvalidData, "data too large"));
}

let mut data = vec![0u8; len];
src.read_exact(&mut data).await?;
Ok(Bytes::from(data))
}
4 changes: 4 additions & 0 deletions code/crates/cli/src/cmd/testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ pub fn generate_config(
enabled: enable_discovery,
},
transport,
rpc_max_size: ByteSize::mib(10),
pubsub_max_size: ByteSize::mib(4),
},
},
mempool: MempoolConfig {
Expand All @@ -254,6 +256,8 @@ pub fn generate_config(
.collect(),
discovery: DiscoveryConfig { enabled: false },
transport,
rpc_max_size: ByteSize::mib(10),
pubsub_max_size: ByteSize::mib(4),
},
max_tx_count: 10000,
gossip_batch_size: 0,
Expand Down
6 changes: 6 additions & 0 deletions code/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ pub struct P2pConfig {

/// The type of pub-sub protocol to use for consensus
pub protocol: PubSubProtocol,

/// The maximum size of messages to send over pub-sub
pub pubsub_max_size: ByteSize,

/// The maximum size of messages to send over RPC
pub rpc_max_size: ByteSize,
}

/// Peer Discovery configuration options
Expand Down
Loading

0 comments on commit 1d800c3

Please sign in to comment.