diff --git a/code/Cargo.lock b/code/Cargo.lock index 4ffce89d5..bf8c221ce 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -2439,6 +2439,7 @@ dependencies = [ name = "malachite-blocksync" version = "0.1.0" dependencies = [ + "async-trait", "bytes", "dashmap", "derive-where", diff --git a/code/config.toml b/code/config.toml index 1b11a8634..0c02c77fd 100644 --- a/code/config.toml +++ b/code/config.toml @@ -70,20 +70,34 @@ timeout_commit = "0s" [consensus.p2p] # Address to listen for incoming connections +# Override with MALACHITE__CONSENSUS__P2P__LISTEN_ADDR env variable listen_addr = "/ip4/0.0.0.0/udp/0/quic-v1" # List of nodes to keep persistent connections to +# Override with MALACHITE__CONSENSUS__P2P__PERSISTENT_PEERS env variable persistent_peers = [] # Transport protocol to use for P2P communication # Valid values: # - "tcp": TCP + Noise # - "quic": QUIC +# Override with MALACHITE__CONSENSUS__P2P__TRANSPORT env variable transport = "quic" # Enable the discovery protocol to find more peers +# Override with MALACHITE__CONSENSUS__P2P__DISCOVERY__ENABLED env variable discovery = { enabled = true } +# The maximum size of messages to send over pub-sub +# Must be larger than the maximum block part size. +# Override with MALACHITE__CONSENSUS__P2P__PUBSUB_MAX_SIZE env variable +pubsub_max_size = "4 MiB" + +# The maximum size of messages to send over RPC +# Must be larger than the maximum block size. +# Override with MALACHITE__CONSENSUS__P2P__RPC_MAX_SIZE env variable +rpc_max_size = "10 MiB" + ####################################################### ### Consensus P2P Protocol Configuration Options ### ####################################################### @@ -139,6 +153,10 @@ persistent_peers = [] # - "quic": QUIC transport = "quic" +# These have no effects on the mempool yet +pubsub_max_size = "4 MiB" +rpc_max_size = "10 MiB" + ####################################################### ### Mempool P2P Protocol Configuration Options ### ####################################################### diff --git a/code/crates/blocksync/Cargo.toml b/code/crates/blocksync/Cargo.toml index 1a5204bc2..ede0b1bf0 100644 --- a/code/crates/blocksync/Cargo.toml +++ b/code/crates/blocksync/Cargo.toml @@ -11,6 +11,7 @@ rust-version.workspace = true malachite-common = { workspace = true } malachite-metrics = { workspace = true } +async-trait = { workspace = true } bytes = { workspace = true, features = ["serde"] } dashmap = { workspace = true } derive-where = { workspace = true } diff --git a/code/crates/blocksync/src/behaviour.rs b/code/crates/blocksync/src/behaviour.rs index 636c0da6c..96317d100 100644 --- a/code/crates/blocksync/src/behaviour.rs +++ b/code/crates/blocksync/src/behaviour.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use bytes::Bytes; use displaydoc::Display; use libp2p::metrics::Registry; @@ -5,6 +7,7 @@ use libp2p::request_response::{self as rpc, OutboundRequestId, ProtocolSupport}; use libp2p::swarm::NetworkBehaviour; use libp2p::{PeerId, StreamProtocol}; +use crate::rpc::Codec; use crate::types::{RawRequest, RawResponse, ResponseChannel}; // use crate::Metrics; @@ -12,29 +15,65 @@ use crate::types::{RawRequest, RawResponse, ResponseChannel}; #[derive(NetworkBehaviour)] #[behaviour(to_swarm = "Event")] pub struct Behaviour { - rpc: rpc::cbor::Behaviour, + rpc: rpc::Behaviour, } pub type Event = rpc::Event; +#[derive(Copy, Clone, Debug)] +pub struct Config { + pub request_timeout: Duration, + pub max_request_size: usize, + pub max_response_size: usize, +} + +impl Config { + pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self { + self.request_timeout = request_timeout; + self + } + + pub fn with_max_request_size(mut self, max_request_size: usize) -> Self { + self.max_request_size = max_request_size; + self + } + + pub fn with_max_response_size(mut self, max_response_size: usize) -> Self { + self.max_response_size = max_response_size; + self + } +} + +impl Default for Config { + fn default() -> Self { + Self { + request_timeout: Duration::from_secs(30), + max_request_size: 1024 * 1024, // 1 MiB + max_response_size: 512 * 1024 * 1024, // 512 MiB + } + } +} + impl Behaviour { pub const PROTOCOL: [(StreamProtocol, ProtocolSupport); 1] = [( StreamProtocol::new("/malachite-blocksync/v1beta1"), ProtocolSupport::Full, )]; - pub fn new() -> Self { - let config = rpc::Config::default(); + pub fn new(config: Config) -> Self { + let rpc_config = rpc::Config::default().with_request_timeout(config.request_timeout); + Self { - rpc: rpc::cbor::Behaviour::new(Self::PROTOCOL, config), + rpc: rpc::Behaviour::with_codec(Codec::new(config), Self::PROTOCOL, rpc_config), // metrics: None, } } - pub fn new_with_metrics(_registry: &mut Registry) -> Self { - let config = rpc::Config::default(); + pub fn new_with_metrics(config: Config, _registry: &mut Registry) -> Self { + let rpc_config = rpc::Config::default().with_request_timeout(config.request_timeout); + Self { - rpc: rpc::cbor::Behaviour::new(Self::PROTOCOL, config), + rpc: rpc::Behaviour::with_codec(Codec::new(config), Self::PROTOCOL, rpc_config), // metrics: Some(Metrics::new(registry)), } } @@ -63,6 +102,6 @@ impl core::error::Error for Error {} impl Default for Behaviour { fn default() -> Self { - Self::new() + Self::new(Config::default()) } } diff --git a/code/crates/blocksync/src/lib.rs b/code/crates/blocksync/src/lib.rs index 0827f2a5c..0cbef13c7 100644 --- a/code/crates/blocksync/src/lib.rs +++ b/code/crates/blocksync/src/lib.rs @@ -1,5 +1,5 @@ mod behaviour; -pub use behaviour::{Behaviour, Event}; +pub use behaviour::{Behaviour, Config, Event}; mod codec; pub use codec::NetworkCodec; @@ -16,6 +16,8 @@ pub use types::{ Status, SyncedBlock, }; +mod rpc; + mod macros; #[doc(hidden)] diff --git a/code/crates/blocksync/src/rpc.rs b/code/crates/blocksync/src/rpc.rs new file mode 100644 index 000000000..f3cee3fb9 --- /dev/null +++ b/code/crates/blocksync/src/rpc.rs @@ -0,0 +1,114 @@ +use async_trait::async_trait; +use bytes::Bytes; +use libp2p::futures::{io, AsyncRead, AsyncWrite}; +use libp2p::StreamProtocol; + +use crate::behaviour::Config; +use crate::types::{RawRequest, RawResponse}; + +#[derive(Copy, Clone)] +pub struct Codec { + config: Config, +} + +impl Codec { + pub fn new(config: Config) -> Self { + Self { config } + } +} + +#[async_trait] +impl libp2p::request_response::Codec for Codec { + type Protocol = StreamProtocol; + + type Request = RawRequest; + type Response = RawResponse; + + async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + read_length_prefixed(io, self.config.max_request_size) + .await + .map(RawRequest) + } + + async fn read_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + read_length_prefixed(io, self.config.max_response_size) + .await + .map(RawResponse) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + write_length_prefixed(io, req.0, self.config.max_request_size).await + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + write_length_prefixed(io, res.0, self.config.max_response_size).await + } +} + +const U32_LENGTH: usize = size_of::(); + +async fn write_length_prefixed(dst: &mut T, data: Bytes, max_len: usize) -> io::Result<()> +where + T: AsyncWrite + Unpin + Send, +{ + use io::AsyncWriteExt; + + let len = data.len(); + if len > max_len || len > u32::MAX as usize { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "data too large", + )); + } + + dst.write_all(&(len as u32).to_be_bytes()).await?; + dst.write_all(&data).await?; + dst.flush().await?; + + Ok(()) +} + +async fn read_length_prefixed(src: &mut T, max_len: usize) -> io::Result +where + T: AsyncRead + Unpin + Send, +{ + use io::AsyncReadExt; + + let mut len_bytes = [0u8; U32_LENGTH]; + src.read_exact(&mut len_bytes).await?; + let len = u32::from_be_bytes(len_bytes) as usize; + + if len > max_len { + return Err(io::Error::new(io::ErrorKind::InvalidData, "data too large")); + } + + let mut data = vec![0u8; len]; + src.read_exact(&mut data).await?; + Ok(Bytes::from(data)) +} diff --git a/code/crates/cli/src/cmd/testnet.rs b/code/crates/cli/src/cmd/testnet.rs index 039cec714..09141f5ce 100644 --- a/code/crates/cli/src/cmd/testnet.rs +++ b/code/crates/cli/src/cmd/testnet.rs @@ -242,6 +242,8 @@ pub fn generate_config( enabled: enable_discovery, }, transport, + rpc_max_size: ByteSize::mib(10), + pubsub_max_size: ByteSize::mib(4), }, }, mempool: MempoolConfig { @@ -254,6 +256,8 @@ pub fn generate_config( .collect(), discovery: DiscoveryConfig { enabled: false }, transport, + rpc_max_size: ByteSize::mib(10), + pubsub_max_size: ByteSize::mib(4), }, max_tx_count: 10000, gossip_batch_size: 0, diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index 193497c14..2810286d1 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -88,6 +88,12 @@ pub struct P2pConfig { /// The type of pub-sub protocol to use for consensus pub protocol: PubSubProtocol, + + /// The maximum size of messages to send over pub-sub + pub pubsub_max_size: ByteSize, + + /// The maximum size of messages to send over RPC + pub rpc_max_size: ByteSize, } /// Peer Discovery configuration options diff --git a/code/crates/gossip-consensus/src/behaviour.rs b/code/crates/gossip-consensus/src/behaviour.rs index 6ccafe786..0ba4ac254 100644 --- a/code/crates/gossip-consensus/src/behaviour.rs +++ b/code/crates/gossip-consensus/src/behaviour.rs @@ -14,9 +14,7 @@ use malachite_blocksync as blocksync; use malachite_discovery as discovery; use malachite_metrics::Registry; -use crate::{GossipSubConfig, PubSubProtocol, PROTOCOL}; - -const MAX_TRANSMIT_SIZE: usize = 4 * 1024 * 1024; // 4 MiB +use crate::{Config, GossipSubConfig, PubSubProtocol, PROTOCOL}; #[derive(Debug)] pub enum NetworkEvent { @@ -84,14 +82,14 @@ pub struct Behaviour { pub ping: ping::Behaviour, pub pubsub: Either, pub blocksync: blocksync::Behaviour, - pub request_response: Toggle, + pub discovery: Toggle, } impl discovery::SendRequestResponse for Behaviour { fn send_request(&mut self, peer_id: &PeerId, req: discovery::Request) -> OutboundRequestId { - self.request_response + self.discovery .as_mut() - .expect("Request-response behaviour should be available") + .expect("Discovery behaviour should be available") .send_request(peer_id, req) } @@ -100,9 +98,9 @@ impl discovery::SendRequestResponse for Behaviour { ch: ResponseChannel, rs: discovery::Response, ) -> Result<(), discovery::Response> { - self.request_response + self.discovery .as_mut() - .expect("Request-response behaviour should be available") + .expect("Discovery behaviour should be available") .send_response(ch, rs) } } @@ -116,9 +114,9 @@ fn message_id(message: &gossipsub::Message) -> gossipsub::MessageId { gossipsub::MessageId::new(hasher.finish().to_be_bytes().as_slice()) } -fn gossipsub_config(config: GossipSubConfig) -> gossipsub::Config { +fn gossipsub_config(config: GossipSubConfig, max_transmit_size: usize) -> gossipsub::Config { gossipsub::ConfigBuilder::default() - .max_transmit_size(MAX_TRANSMIT_SIZE) + .max_transmit_size(max_transmit_size) .opportunistic_graft_ticks(3) .heartbeat_interval(Duration::from_secs(1)) .validation_mode(gossipsub::ValidationMode::Strict) @@ -134,12 +132,7 @@ fn gossipsub_config(config: GossipSubConfig) -> gossipsub::Config { } impl Behaviour { - pub fn new_with_metrics( - tpe: PubSubProtocol, - keypair: &Keypair, - discovery: discovery::Config, - registry: &mut Registry, - ) -> Self { + pub fn new_with_metrics(config: &Config, keypair: &Keypair, registry: &mut Registry) -> Self { let identify = identify::Behaviour::new(identify::Config::new( PROTOCOL.to_string(), keypair.public(), @@ -147,11 +140,11 @@ impl Behaviour { let ping = ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(5))); - let pubsub = match tpe { - PubSubProtocol::GossipSub(config) => Either::Left( + let pubsub = match config.protocol { + PubSubProtocol::GossipSub(cfg) => Either::Left( gossipsub::Behaviour::new_with_metrics( gossipsub::MessageAuthenticity::Signed(keypair.clone()), - gossipsub_config(config), + gossipsub_config(cfg, config.pubsub_max_size), registry.sub_registry_with_prefix("gossipsub"), Default::default(), ) @@ -159,23 +152,25 @@ impl Behaviour { ), PubSubProtocol::Broadcast => Either::Right(broadcast::Behaviour::new_with_metrics( broadcast::Config { - max_buf_size: MAX_TRANSMIT_SIZE, + max_buf_size: config.pubsub_max_size, }, registry.sub_registry_with_prefix("broadcast"), )), }; - let blocksync = - blocksync::Behaviour::new_with_metrics(registry.sub_registry_with_prefix("blocksync")); + let blocksync = blocksync::Behaviour::new_with_metrics( + blocksync::Config::default().with_max_response_size(config.rpc_max_size), + registry.sub_registry_with_prefix("blocksync"), + ); - let request_response = Toggle::from(discovery.enabled.then(discovery::new_behaviour)); + let discovery = Toggle::from(config.discovery.enabled.then(discovery::new_behaviour)); Self { identify, ping, pubsub, blocksync, - request_response, + discovery, } } } diff --git a/code/crates/gossip-consensus/src/lib.rs b/code/crates/gossip-consensus/src/lib.rs index 7fa07aa55..cd3717f20 100644 --- a/code/crates/gossip-consensus/src/lib.rs +++ b/code/crates/gossip-consensus/src/lib.rs @@ -96,6 +96,8 @@ pub struct Config { pub idle_connection_timeout: Duration, pub transport: TransportProtocol, pub protocol: PubSubProtocol, + pub rpc_max_size: usize, + pub pubsub_max_size: usize, } impl Config { @@ -175,18 +177,14 @@ pub async fn spawn( )? .with_dns()? .with_bandwidth_metrics(registry) - .with_behaviour(|kp| { - Behaviour::new_with_metrics(config.protocol, kp, config.discovery, registry) - })? + .with_behaviour(|kp| Behaviour::new_with_metrics(&config, kp, registry))? .with_swarm_config(|cfg| config.apply_to_swarm(cfg)) .build()), TransportProtocol::Quic => Ok(builder .with_quic_config(|cfg| config.apply_to_quic(cfg)) .with_dns()? .with_bandwidth_metrics(registry) - .with_behaviour(|kp| { - Behaviour::new_with_metrics(config.protocol, kp, config.discovery, registry) - })? + .with_behaviour(|kp| Behaviour::new_with_metrics(&config, kp, registry))? .with_swarm_config(|cfg| config.apply_to_swarm(cfg)) .build()), } @@ -303,8 +301,8 @@ async fn handle_ctrl_msg( let result = swarm.behaviour_mut().blocksync.send_response(channel, data); match result { - Ok(()) => trace!("Replied to BlockSync request"), - Err(e) => error!("Error replying to BlockSync request: {e}"), + Ok(()) => debug!(%request_id, "Replied to BlockSync request"), + Err(e) => error!(%request_id, "Error replying to BlockSync request: {e}"), } ControlFlow::Continue(()) diff --git a/code/crates/gossip-consensus/test/src/lib.rs b/code/crates/gossip-consensus/test/src/lib.rs index f4ca38602..5ad6f29ce 100644 --- a/code/crates/gossip-consensus/test/src/lib.rs +++ b/code/crates/gossip-consensus/test/src/lib.rs @@ -162,6 +162,8 @@ impl Test { idle_connection_timeout: Duration::from_secs(60), transport: malachite_gossip_consensus::TransportProtocol::Quic, protocol: malachite_gossip_consensus::PubSubProtocol::default(), + rpc_max_size: 10 * 1024 * 1024, // 10 MiB + pubsub_max_size: 4 * 1024 * 1024, // 4 MiB }) } diff --git a/code/crates/starknet/app/src/spawn.rs b/code/crates/starknet/app/src/spawn.rs index 05ff620b6..9ade24d78 100644 --- a/code/crates/starknet/app/src/spawn.rs +++ b/code/crates/starknet/app/src/spawn.rs @@ -175,7 +175,7 @@ async fn spawn_gossip_consensus_actor( enabled: cfg.consensus.p2p.discovery.enabled, ..Default::default() }, - idle_connection_timeout: Duration::from_secs(60), + idle_connection_timeout: Duration::from_secs(15 * 60), transport: match cfg.consensus.p2p.transport { TransportProtocol::Tcp => malachite_gossip_consensus::TransportProtocol::Tcp, TransportProtocol::Quic => malachite_gossip_consensus::TransportProtocol::Quic, @@ -191,6 +191,8 @@ async fn spawn_gossip_consensus_actor( } PubSubProtocol::Broadcast => malachite_gossip_consensus::PubSubProtocol::Broadcast, }, + rpc_max_size: cfg.consensus.p2p.rpc_max_size.as_u64() as usize, + pubsub_max_size: cfg.consensus.p2p.pubsub_max_size.as_u64() as usize, }; let keypair = make_keypair(private_key); @@ -226,7 +228,7 @@ async fn spawn_gossip_mempool_actor( let config_gossip_mempool = GossipMempoolConfig { listen_addr: cfg.mempool.p2p.listen_addr.clone(), persistent_peers: cfg.mempool.p2p.persistent_peers.clone(), - idle_connection_timeout: Duration::from_secs(60), + idle_connection_timeout: Duration::from_secs(15 * 60), transport: match cfg.mempool.p2p.transport { TransportProtocol::Tcp => malachite_gossip_mempool::TransportProtocol::Tcp, TransportProtocol::Quic => malachite_gossip_mempool::TransportProtocol::Quic, diff --git a/code/crates/starknet/host/src/actor.rs b/code/crates/starknet/host/src/actor.rs index 3c3c42909..ae6aa4f3c 100644 --- a/code/crates/starknet/host/src/actor.rs +++ b/code/crates/starknet/host/src/actor.rs @@ -137,12 +137,9 @@ impl StarknetHost { trace!(parts.len = %parts.len(), "Building proposal content from parts"); let extension = self.host.params().vote_extensions.enabled.then(|| { - debug!( - size = %self.host.params().vote_extensions.size, - "Vote extensions are enabled" - ); - let size = self.host.params().vote_extensions.size.as_u64() as usize; + debug!(%size, "Vote extensions are enabled" ); + let mut bytes = vec![0u8; size]; rand::thread_rng().fill_bytes(&mut bytes); @@ -289,22 +286,11 @@ impl Actor for StarknetHost { state.next_stream_id += 1; let mut sequence = 0; - let mut extension_part = None; while let Some(part) = rx_part.recv().await { state.part_store.store(height, round, part.clone()); - if let ProposalPart::Transactions(_) = &part { - if extension_part.is_none() { - extension_part = Some(part.clone()); - } - } - - debug!( - %stream_id, - %sequence, - "Broadcasting proposal part" - ); + debug!(%stream_id, %sequence, "Broadcasting proposal part"); let msg = StreamMessage::new(stream_id, sequence, StreamContent::Data(part.clone())); @@ -320,13 +306,19 @@ impl Actor for StarknetHost { .cast(GossipConsensusMsg::PublishProposalPart(msg))?; let block_hash = rx_hash.await?; - debug!(%block_hash, "Got block"); + debug!(%block_hash, "Assembled block"); let parts = state.part_store.all_parts(height, round); - let extension = extension_part - .and_then(|part| part.as_transactions().and_then(|txs| txs.to_bytes().ok())) - .map(Extension::from); + let extension = self.host.params().vote_extensions.enabled.then(|| { + let size = self.host.params().vote_extensions.size.as_u64() as usize; + debug!(%size, "Vote extensions are enabled"); + + let mut bytes = vec![0u8; size]; + rand::thread_rng().fill_bytes(&mut bytes); + + Extension::from(bytes) + }); if let Some(value) = self.build_value_from_parts(&parts, height, round) { reply_to.send(LocallyProposedValue::new( @@ -466,7 +458,8 @@ impl Actor for StarknetHost { match state.block_store.store.get(&height).cloned() { None => { warn!( - "No block for {height}, available blocks: {}", + %height, + "No block found, available blocks: {}", state.block_store.store_keys().format(", ") ); @@ -479,7 +472,7 @@ impl Actor for StarknetHost { certificate: block.certificate, }; - debug!("Got block at {height}"); + debug!(%height, "Found decided block in store"); reply_to.send(Some(block))?; } } diff --git a/code/crates/starknet/host/src/mempool.rs b/code/crates/starknet/host/src/mempool.rs index ec491216b..1daa408b7 100644 --- a/code/crates/starknet/host/src/mempool.rs +++ b/code/crates/starknet/host/src/mempool.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, VecDeque}; +use std::collections::BTreeMap; use std::sync::Arc; use async_trait::async_trait; @@ -38,14 +38,12 @@ pub enum MempoolMsg { #[allow(dead_code)] pub struct State { - pub msg_queue: VecDeque, pub transactions: BTreeMap, } impl State { pub fn new() -> Self { Self { - msg_queue: VecDeque::new(), transactions: BTreeMap::new(), } } @@ -160,6 +158,7 @@ impl Actor for Mempool { MempoolMsg::GossipEvent, ) .await?; + self.gossip_mempool .cast(GossipMempoolMsg::Subscribe(forward))?; @@ -226,14 +225,15 @@ fn generate_and_broadcast_txes( count: usize, size: usize, config: &MempoolConfig, - state: &mut State, + _state: &mut State, gossip_mempool: &GossipMempoolRef, ) -> Result, ActorProcessingErr> { debug!(%count, %size, "Generating transactions"); let batch_size = std::cmp::min(config.gossip_batch_size, count); + let gossip_enabled = config.gossip_batch_size > 0; - let mut transactions = vec![]; + let mut transactions = Vec::with_capacity(count); let mut tx_batch = Transactions::default(); let mut rng = rand::thread_rng(); @@ -243,15 +243,18 @@ fn generate_and_broadcast_txes( rng.fill_bytes(&mut tx_bytes); let tx = Transaction::new(tx_bytes); - // Add transaction to state - if state.transactions.len() < config.max_tx_count { - state.add_tx(&tx); + if gossip_enabled { + tx_batch.push(tx.clone()); } - tx_batch.push(tx.clone()); + transactions.push(tx); + + // if state.transactions.len() < config.max_tx_count { + // state.add_tx(&tx); + // } // Gossip tx-es to peers in batches - if config.gossip_batch_size > 0 && tx_batch.len() >= batch_size { + if gossip_enabled && tx_batch.len() >= batch_size { let tx_batch = std::mem::take(&mut tx_batch); let Ok(tx_batch_any) = tx_batch.to_any() else { @@ -262,8 +265,6 @@ fn generate_and_broadcast_txes( let mempool_batch = MempoolTransactionBatch::new(tx_batch_any); gossip_mempool.cast(GossipMempoolMsg::BroadcastMsg(mempool_batch))?; } - - transactions.push(tx); } Ok(transactions) diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs index 1d2587e3f..aca03def7 100644 --- a/code/crates/starknet/test/src/lib.rs +++ b/code/crates/starknet/test/src/lib.rs @@ -493,6 +493,8 @@ pub fn make_node_config(test: &Test, i: usize, app: App) -> N .map(|j| transport.multiaddr("127.0.0.1", test.consensus_base_port + j)) .collect(), discovery: DiscoveryConfig { enabled: false }, + rpc_max_size: ByteSize::mib(10), + pubsub_max_size: ByteSize::mib(4), }, }, mempool: MempoolConfig { @@ -505,6 +507,8 @@ pub fn make_node_config(test: &Test, i: usize, app: App) -> N .map(|j| transport.multiaddr("127.0.0.1", test.mempool_base_port + j)) .collect(), discovery: DiscoveryConfig { enabled: false }, + rpc_max_size: ByteSize::mib(10), + pubsub_max_size: ByteSize::mib(4), }, max_tx_count: 10000, gossip_batch_size: 100, diff --git a/code/scripts/spawn.fish b/code/scripts/spawn.fish index 746403fe8..9712f7dc4 100755 --- a/code/scripts/spawn.fish +++ b/code/scripts/spawn.fish @@ -1,22 +1,24 @@ #!/usr/bin/env fish -set -x MALACHITE__CONSENSUS__P2P__PROTOCOL__TYPE "broadcast" -set -x MALACHITE__CONSENSUS__MAX_BLOCK_SIZE "10MiB" -set -x MALACHITE__CONSENSUS__TIMEOUT_PROPOSE "5s" +set -x MALACHITE__CONSENSUS__MAX_BLOCK_SIZE "100 MiB" +set -x MALACHITE__CONSENSUS__P2P__RPC_MAX_SIZE "110 MiB" +set -x MALACHITE__CONSENSUS__P2P__PUBSUB_MAX_SIZE "20 MiB" +set -x MALACHITE__CONSENSUS__P2P__PROTOCOL__TYPE "gossipsub" +set -x MALACHITE__CONSENSUS__TIMEOUT_PROPOSE "10s" set -x MALACHITE__CONSENSUS__TIMEOUT_PREVOTE "3s" set -x MALACHITE__CONSENSUS__TIMEOUT_PRECOMMIT "3s" set -x MALACHITE__CONSENSUS__TIMEOUT_COMMIT "0s" -set -x MALACHITE__MEMPOOL__MAX_TX_COUNT 10000 +set -x MALACHITE__MEMPOOL__MAX_TX_COUNT 1000 set -x MALACHITE__MEMPOOL__GOSSIP_BATCH_SIZE 0 -set -x MALACHITE__TEST__TX_SIZE "1KB" +set -x MALACHITE__TEST__TX_SIZE "10 KiB" set -x MALACHITE__TEST__TXS_PER_PART 1024 set -x MALACHITE__TEST__TIME_ALLOWANCE_FACTOR 0.5 -set -x MALACHITE__TEST__EXEC_TIME_PER_TX "500us" -set -x MALACHITE__TEST__MAX_RETAIN_BLOCKS 100 +set -x MALACHITE__TEST__EXEC_TIME_PER_TX "100us" +set -x MALACHITE__TEST__MAX_RETAIN_BLOCKS 50 set -x MALACHITE__TEST__VOTE_EXTENSIONS__ENABLED false set -x MALACHITE__TEST__VOTE_EXTENSIONS__SIZE "1KiB" set -x MALACHITE__BLOCKSYNC__ENABLED true - +set -x MALACHITE__BLOCKSYNC__REQUEST_TIMEOUT "30s" # This script takes: # - a number of nodes to run as an argument,