Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

fix(p2p): accept listener connection during bootstrap #484

Merged
merged 6 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/topos-p2p/src/behaviour/grpc/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use tonic::transport::Channel;

use super::{OutboundError, RequestId};

#[allow(unused)]
Freyskeyd marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Debug)]
pub enum Event {
OutboundFailure {
Expand Down
43 changes: 29 additions & 14 deletions crates/topos-p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
};
use futures::Stream;
use libp2p::{
core::upgrade,
core::{transport::MemoryTransport, upgrade},
dns,
identity::Keypair,
kad::store::MemoryStore,
Expand All @@ -30,6 +30,7 @@ use std::{
};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;

pub fn builder<'a>() -> NetworkBuilder<'a> {
NetworkBuilder::default()
Expand All @@ -48,9 +49,16 @@ pub struct NetworkBuilder<'a> {
local_port: Option<u8>,
config: NetworkConfig,
grpc_context: GrpcContext,
memory_transport: bool,
}

impl<'a> NetworkBuilder<'a> {
/// Switches this builder to the in-memory transport.
///
/// Only compiled for tests. Sets the `memory_transport` flag and hands the
/// builder back so calls can be chained.
#[cfg(test)]
pub(crate) fn memory(self) -> Self {
    let mut builder = self;
    builder.memory_transport = true;
    builder
}
pub fn grpc_context(mut self, grpc_context: GrpcContext) -> Self {
self.grpc_context = grpc_context;

Expand Down Expand Up @@ -131,6 +139,7 @@ impl<'a> NetworkBuilder<'a> {

let grpc = grpc::Behaviour::new(self.grpc_context);

debug!("Known peers: {:?}", self.known_peers);
let behaviour = Behaviour {
gossipsub,
peer_info: PeerInfoBehaviour::new(PEER_INFO_PROTOCOL, &peer_key),
Expand All @@ -148,23 +157,29 @@ impl<'a> NetworkBuilder<'a> {
grpc,
};

let transport = {
let multiplex_config = libp2p::yamux::Config::default();

let transport = if self.memory_transport {
MemoryTransport::new()
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&peer_key)?)
.multiplex(multiplex_config)
.timeout(TWO_HOURS)
.boxed()
} else {
let tcp = libp2p::tcp::tokio::Transport::new(Config::default().nodelay(true));
let dns_tcp = dns::tokio::Transport::system(tcp).unwrap();

let tcp = libp2p::tcp::tokio::Transport::new(Config::default().nodelay(true));
dns_tcp.or_transport(tcp)
dns_tcp
.or_transport(tcp)
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&peer_key)?)
.multiplex(multiplex_config)
.timeout(TWO_HOURS)
.boxed()
};

let multiplex_config = libp2p::yamux::Config::default();

let transport = transport
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&peer_key)?)
.multiplex(multiplex_config)
.timeout(TWO_HOURS)
.boxed();

let swarm = Swarm::new(
transport,
behaviour,
Expand Down Expand Up @@ -216,8 +231,8 @@ impl<'a> NetworkBuilder<'a> {
pending_record_requests: HashMap::new(),
shutdown,
health_state: crate::runtime::HealthState {
bootpeer_connection_retries: 3,
successfully_connected_to_bootpeer: if self.known_peers.is_empty() {
bootnode_connection_retries: 3,
successfully_connected_to_bootnode: if self.known_peers.is_empty() {
// Node seems to be a boot node
Some(ConnectionId::new_unchecked(0))
} else {
Expand Down
61 changes: 43 additions & 18 deletions crates/topos-p2p/src/runtime/handle_event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use libp2p::{multiaddr::Protocol, swarm::SwarmEvent};
use libp2p::{core::Endpoint, multiaddr::Protocol, swarm::SwarmEvent};
use tracing::{debug, error, info, warn};

use crate::{error::P2PError, event::ComposedEvent, Event, Runtime};
Expand Down Expand Up @@ -62,13 +62,13 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
error,
} if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
&& self.health_state.dialed_bootpeer.contains(&connection_id) =>
&& self.health_state.dialed_bootnode.contains(&connection_id) =>
{
warn!("Unable to connect to bootpeer {peer_id}: {error:?}");
self.health_state.dialed_bootpeer.remove(&connection_id);
if self.health_state.dialed_bootpeer.is_empty() {
warn!("Unable to connect to bootnode {peer_id}: {error:?}");
self.health_state.dialed_bootnode.remove(&connection_id);
if self.health_state.dialed_bootnode.is_empty() {
// We tried to connect to all bootnodes without success
error!("Unable to connect to any bootnode");
}
Expand Down Expand Up @@ -100,25 +100,49 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
num_established,
concurrent_dial_errors,
established_in,
} if self.health_state.dialed_bootpeer.contains(&connection_id) => {
info!("Successfully connected to bootpeer {peer_id}");
} if self.health_state.dialed_bootnode.contains(&connection_id) => {
info!("Successfully connected to bootnode {peer_id}");
if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
{
self.health_state.successfully_connected_to_bootpeer = Some(connection_id);
_ = self.health_state.dialed_bootpeer.remove(&connection_id);
self.health_state.successfully_connected_to_bootnode = Some(connection_id);
_ = self.health_state.dialed_bootnode.remove(&connection_id);
}
}

SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
peer_id,
endpoint,
connection_id,
..
} => {
info!(
"Connection established with peer {peer_id} as {:?}",
endpoint.to_endpoint()
);
if self
.health_state
.successfully_connected_to_bootnode
.is_none()
&& self.boot_peers.contains(&peer_id)
{
info!(
"Connection established with bootnode {peer_id} as {:?}",
endpoint.to_endpoint()
);

if endpoint.to_endpoint() == Endpoint::Listener {
if let Err(error) = self.swarm.dial(peer_id) {
error!(
"Unable to dial bootnode {peer_id} after incoming connection: \
Freyskeyd marked this conversation as resolved.
Show resolved Hide resolved
{error}"
);
}
}
} else {
info!(
"Connection established with peer {peer_id} as {:?}",
endpoint.to_endpoint()
);
}

if self.swarm.connected_peers().count() >= self.config.minimum_cluster_size {
if let Err(error) = self.swarm.behaviour_mut().gossipsub.subscribe() {
Expand Down Expand Up @@ -164,8 +188,8 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
peer_id: Some(ref peer_id),
connection_id,
} if self.boot_peers.contains(peer_id) => {
info!("Dialing bootpeer {peer_id} on connection: {connection_id}");
self.health_state.dialed_bootpeer.insert(connection_id);
info!("Dialing bootnode {peer_id} on connection: {connection_id}");
self.health_state.dialed_bootnode.insert(connection_id);
}

SwarmEvent::Dialing {
Expand All @@ -185,6 +209,7 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
SwarmEvent::ListenerError { listener_id, error } => {
error!("Unhandled ListenerError {listener_id:?} | {error}")
}

event => {
warn!("Unhandled SwarmEvent: {:?}", event);
}
Expand Down
10 changes: 5 additions & 5 deletions crates/topos-p2p/src/runtime/handle_event/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl EventHandler<Box<Event>> for Runtime {
{
if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
{
warn!(
Expand Down Expand Up @@ -85,11 +85,11 @@ impl EventHandler<Box<Event>> for Runtime {
} if num_remaining == 0
&& self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
&& self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy =>
{
match self.health_state.bootpeer_connection_retries.checked_sub(1) {
match self.health_state.bootnode_connection_retries.checked_sub(1) {
None => {
error!(
"Bootstrap query finished but unable to connect to bootnode, stopping"
Expand All @@ -103,7 +103,7 @@ impl EventHandler<Box<Event>> for Runtime {
{} more times",
new
);
self.health_state.bootpeer_connection_retries = new;
self.health_state.bootnode_connection_retries = new;
}
}
}
Expand All @@ -119,7 +119,7 @@ impl EventHandler<Box<Event>> for Runtime {
} if num_remaining == 0
&& self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_some()
&& self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy =>
{
Expand Down
6 changes: 3 additions & 3 deletions crates/topos-p2p/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ pub(crate) struct HealthState {
/// Indicates if the node is listening on any address
pub(crate) is_listening: bool,
/// List the bootnodes that the node has tried to connect to
pub(crate) dialed_bootpeer: HashSet<ConnectionId>,
pub(crate) dialed_bootnode: HashSet<ConnectionId>,
/// Indicates if the node has successfully connected to a bootnode
pub(crate) successfully_connected_to_bootpeer: Option<ConnectionId>,
pub(crate) successfully_connected_to_bootnode: Option<ConnectionId>,
/// Track the number of remaining retries to connect to any bootnode
pub(crate) bootpeer_connection_retries: usize,
pub(crate) bootnode_connection_retries: usize,
}

impl Runtime {
Expand Down
60 changes: 60 additions & 0 deletions crates/topos-p2p/src/tests/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::time::Duration;

use futures::{future::join_all, FutureExt};
use rstest::rstest;
use test_log::test;
use topos_test_sdk::tce::NodeConfig;
use tracing::Instrument;

/// Bootstraps two in-memory nodes that each list the other as a known peer.
///
/// Both nodes are built with the memory transport and a minimum cluster size
/// of 1, and each receives the other's `(peer_id, addr)` pair as its only
/// known peer. The two `bootstrap` futures are driven concurrently; the test
/// passes only if every one of them resolves to `Ok` before the 5 second
/// timeout expires.
#[rstest]
#[test(tokio::test)]
#[timeout(Duration::from_secs(5))]
async fn two_bootnode_communicating() {
    let bootnode = NodeConfig::memory(2);
    let local = NodeConfig::memory(1);
    // Cross-wire the peers: each node's known-peer list contains only the other node.
    let bootnode_known_peers = vec![(local.peer_id(), local.addr.clone())];
    let local_known_peers = vec![(bootnode.peer_id(), bootnode.addr.clone())];

    let mut handlers = Vec::new();

    // One tracing span per node so interleaved log lines can be told apart.
    let context_local = tracing::info_span!("start_node", "peer_id" = local.peer_id().to_string());

    let context_bootnode =
        tracing::info_span!("start_node", "peer_id" = bootnode.peer_id().to_string());
    handlers.push(
        async move {
            // `_client` is not used by the test; the underscore prefix silences the
            // unused-variable lint while still holding the value until the block ends.
            let (_client, mut stream, runtime) = crate::network::builder()
                .minimum_cluster_size(1)
                .peer_key(local.keypair.clone())
                .listen_addresses(&[local.addr.clone()])
                .known_peers(&local_known_peers)
                .memory()
                .build()
                .await
                .expect("Unable to create p2p network");

            runtime.bootstrap(&mut stream).await
        }
        .instrument(context_local)
        .boxed(),
    );

    handlers.push(
        async move {
            // Same as above: the value is kept for the duration of the bootstrap but never read.
            let (_client, mut stream, runtime) = crate::network::builder()
                .minimum_cluster_size(1)
                .peer_key(bootnode.keypair.clone())
                .listen_addresses(&[bootnode.addr.clone()])
                .known_peers(&bootnode_known_peers)
                .memory()
                .build()
                .await
                .expect("Unable to create p2p network");

            runtime.bootstrap(&mut stream).await
        }
        .instrument(context_bootnode)
        .boxed(),
    );

    // Drive both bootstraps concurrently; every one of them must succeed.
    assert!(join_all(handlers).await.iter().all(Result::is_ok));
}
1 change: 1 addition & 0 deletions crates/topos-p2p/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod behaviour;
mod bootstrap;
mod command;
mod support;
2 changes: 1 addition & 1 deletion crates/topos-test-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ethers.workspace = true
async-trait.workspace = true
futures.workspace = true
lazy_static = { version = "1.4.0" }
libp2p.workspace = true
libp2p = { workspace = true, features = ["macros"] }
proc_macro_sdk = { path = "./proc_macro_sdk/" }
rand.workspace = true
rstest.workspace = true
Expand Down
28 changes: 16 additions & 12 deletions crates/topos-test-sdk/src/p2p/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
use libp2p::{
build_multiaddr,
identity::{self, Keypair},
Multiaddr,
};
use rand::{thread_rng, Rng};

use crate::networking::get_available_port;

pub type Port = u16;

pub fn local_peer(peer_index: u8) -> (Keypair, Port, Multiaddr) {
pub fn local_peer(peer_index: u8, memory_transport: bool) -> (Keypair, Multiaddr) {
let peer_id: Keypair = keypair_from_seed(peer_index);
let port = get_available_port();
let local_listen_addr: Multiaddr = format!(
"/ip4/127.0.0.1/tcp/{}/p2p/{}",
port,
peer_id.public().to_peer_id()
)
.parse()
.unwrap();
let local_listen_addr = if memory_transport {
build_multiaddr![Memory(thread_rng().gen::<u64>())]
} else {
let port = get_available_port();
format!(
"/ip4/127.0.0.1/tcp/{}/p2p/{}",
port,
peer_id.public().to_peer_id()
)
.parse()
.unwrap()
};

(peer_id, port, local_listen_addr)
(peer_id, local_listen_addr)
}

pub fn keypair_from_seed(seed: u8) -> Keypair {
Expand Down
Loading
Loading