Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable "connected peers" and "peer info" protocols. #2295

Merged
merged 4 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 34 additions & 27 deletions crates/subspace-networking/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,16 @@ mod tests;
use crate::protocols::autonat_wrapper::{
Behaviour as AutonatWrapper, Config as AutonatWrapperConfig,
};
use crate::protocols::connected_peers::{
Behaviour as ConnectedPeersBehaviour, Config as ConnectedPeersConfig,
Event as ConnectedPeersEvent,
};
use crate::protocols::peer_info::{
Behaviour as PeerInfoBehaviour, Config as PeerInfoConfig, Event as PeerInfoEvent,
};
use crate::protocols::connected_peers::Config as ConnectedPeersConfig;
use crate::protocols::peer_info::Event as PeerInfoEvent;
use crate::protocols::request_response::request_response_factory::{
Event as RequestResponseEvent, RequestHandler, RequestResponseFactoryBehaviour,
};
use crate::protocols::reserved_peers::{
Behaviour as ReservedPeersBehaviour, Config as ReservedPeersConfig, Event as ReservedPeersEvent,
};
use crate::protocols::subspace_connection_limits::Behaviour as ConnectionLimitsBehaviour;
use crate::PeerInfoProvider;
use crate::{PeerInfoConfig, PeerInfoProvider};
use derive_more::From;
use libp2p::allow_block_list::{Behaviour as AllowBlockListBehaviour, BlockedPeers};
use libp2p::autonat::Event as AutonatEvent;
Expand Down Expand Up @@ -54,12 +49,20 @@ pub(crate) struct BehaviorConfig<RecordStore> {
pub(crate) connection_limits: ConnectionLimits,
/// The configuration for the [`ReservedPeersBehaviour`].
pub(crate) reserved_peers: ReservedPeersConfig,
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
/// The configuration for the [`PeerInfo`] protocol.
pub(crate) peer_info_config: PeerInfoConfig,
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
/// Provides peer-info for local peer.
pub(crate) peer_info_provider: Option<PeerInfoProvider>,
/// The configuration for the [`ConnectedPeers`] protocol (general instance).
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
pub(crate) general_connected_peers_config: Option<ConnectedPeersConfig>,
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
/// The configuration for the [`ConnectedPeers`] protocol (special instance).
pub(crate) special_connected_peers_config: Option<ConnectedPeersConfig>,
/// Autonat configuration.
Expand All @@ -85,11 +88,12 @@ pub(crate) struct Behavior<RecordStore> {
pub(crate) request_response: RequestResponseFactoryBehaviour,
pub(crate) block_list: BlockListBehaviour,
pub(crate) reserved_peers: ReservedPeersBehaviour,
pub(crate) peer_info: Toggle<PeerInfoBehaviour>,
pub(crate) general_connected_peers:
Toggle<ConnectedPeersBehaviour<GeneralConnectedPeersInstance>>,
pub(crate) special_connected_peers:
Toggle<ConnectedPeersBehaviour<SpecialConnectedPeersInstance>>,
// TODO: Restore or remove connected peer later
// pub(crate) peer_info: Toggle<PeerInfoBehaviour>,
// pub(crate) general_connected_peers:
// Toggle<ConnectedPeersBehaviour<GeneralConnectedPeersInstance>>,
// pub(crate) special_connected_peers:
// Toggle<ConnectedPeersBehaviour<SpecialConnectedPeersInstance>>,
pub(crate) autonat: AutonatWrapper,
}

Expand All @@ -116,9 +120,10 @@ where
})
.into();

let peer_info = config
.peer_info_provider
.map(|provider| PeerInfoBehaviour::new(config.peer_info_config, provider));
// TODO: Restore or remove connected peer later
// let peer_info = config
// .peer_info_provider
// .map(|provider| PeerInfoBehaviour::new(config.peer_info_config, provider));

Self {
connection_limits: ConnectionLimitsBehaviour::new(config.connection_limits),
Expand All @@ -133,15 +138,16 @@ where
.expect("RequestResponse protocols registration failed."),
block_list: BlockListBehaviour::default(),
reserved_peers: ReservedPeersBehaviour::new(config.reserved_peers),
peer_info: peer_info.into(),
general_connected_peers: config
.general_connected_peers_config
.map(ConnectedPeersBehaviour::new)
.into(),
special_connected_peers: config
.special_connected_peers_config
.map(ConnectedPeersBehaviour::new)
.into(),
// TODO: Restore or remove connected peer later
//peer_info: peer_info.into(),
// general_connected_peers: config
// .general_connected_peers_config
// .map(ConnectedPeersBehaviour::new)
// .into(),
// special_connected_peers: config
// .special_connected_peers_config
// .map(ConnectedPeersBehaviour::new)
// .into(),
autonat: AutonatWrapper::new(config.autonat),
}
}
Expand All @@ -158,7 +164,8 @@ pub(crate) enum Event {
VoidEventStub(VoidEvent),
ReservedPeers(ReservedPeersEvent),
PeerInfo(PeerInfoEvent),
GeneralConnectedPeers(ConnectedPeersEvent<GeneralConnectedPeersInstance>),
SpecialConnectedPeers(ConnectedPeersEvent<SpecialConnectedPeersInstance>),
// TODO: Restore or remove connected peer later
// GeneralConnectedPeers(ConnectedPeersEvent<GeneralConnectedPeersInstance>),
// SpecialConnectedPeers(ConnectedPeersEvent<SpecialConnectedPeersInstance>),
Autonat(AutonatEvent),
}
132 changes: 70 additions & 62 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ use crate::behavior::persistent_parameters::{
append_p2p_suffix, remove_p2p_suffix, KnownPeersRegistry, PeerAddressRemovedEvent,
PEERS_ADDRESSES_BATCH_SIZE,
};
use crate::behavior::{
Behavior, Event, GeneralConnectedPeersInstance, SpecialConnectedPeersInstance,
};
use crate::behavior::{Behavior, Event};
use crate::constructor;
use crate::constructor::temporary_bans::TemporaryBans;
use crate::constructor::{ConnectedPeersHandler, LocalOnlyRecordStore};
use crate::protocols::connected_peers::Event as ConnectedPeersEvent;
use crate::protocols::peer_info::{Event as PeerInfoEvent, PeerInfoSuccess};
use crate::protocols::request_response::request_response_factory::{
Event as RequestResponseEvent, IfDisconnected,
Expand Down Expand Up @@ -132,10 +129,16 @@ where
peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
/// Defines protocol version for the network peers. Affects network partition.
protocol_version: String,
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
/// Defines whether we maintain a persistent connection for common peers.
general_connection_decision_handler: Option<ConnectedPeersHandler>,
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
/// Defines whether we maintain a persistent connection for special peers.
special_connection_decision_handler: Option<ConnectedPeersHandler>,
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
/// Randomness generator used for choosing Kademlia addresses.
rng: StdRng,
/// Addresses to bootstrap Kademlia network
Expand Down Expand Up @@ -454,12 +457,13 @@ where
SwarmEvent::Behaviour(Event::PeerInfo(event)) => {
self.handle_peer_info_event(event).await;
}
SwarmEvent::Behaviour(Event::GeneralConnectedPeers(event)) => {
self.handle_general_connected_peers_event(event).await;
}
SwarmEvent::Behaviour(Event::SpecialConnectedPeers(event)) => {
self.handle_special_connected_peers_event(event).await;
}
// TODO: Restore or remove connected peer later
// SwarmEvent::Behaviour(Event::GeneralConnectedPeers(event)) => {
// self.handle_general_connected_peers_event(event).await;
// }
// SwarmEvent::Behaviour(Event::SpecialConnectedPeers(event)) => {
// self.handle_special_connected_peers_event(event).await;
// }
SwarmEvent::Behaviour(Event::Autonat(event)) => {
self.handle_autonat_event(event).await;
}
Expand Down Expand Up @@ -1178,61 +1182,63 @@ where
});
}

if let Some(general_connected_peers) =
self.swarm.behaviour_mut().general_connected_peers.as_mut()
{
let keep_alive = self
.general_connection_decision_handler
.as_ref()
.map(|handler| handler(&peer_info))
.unwrap_or(false);

general_connected_peers.update_keep_alive_status(event.peer_id, keep_alive);
}

if let Some(special_connected_peers) =
self.swarm.behaviour_mut().special_connected_peers.as_mut()
{
let special_keep_alive = self
.special_connection_decision_handler
.as_ref()
.map(|handler| handler(&peer_info))
.unwrap_or(false);

special_connected_peers.update_keep_alive_status(event.peer_id, special_keep_alive);
}
// TODO: Restore or remove connected peer later
// if let Some(general_connected_peers) =
// self.swarm.behaviour_mut().general_connected_peers.as_mut()
// {
// let keep_alive = self
// .general_connection_decision_handler
// .as_ref()
// .map(|handler| handler(&peer_info))
// .unwrap_or(false);
//
// general_connected_peers.update_keep_alive_status(event.peer_id, keep_alive);
// }
//
// if let Some(special_connected_peers) =
// self.swarm.behaviour_mut().special_connected_peers.as_mut()
// {
// let special_keep_alive = self
// .special_connection_decision_handler
// .as_ref()
// .map(|handler| handler(&peer_info))
// .unwrap_or(false);
//
// special_connected_peers.update_keep_alive_status(event.peer_id, special_keep_alive);
// }
}
}

async fn handle_general_connected_peers_event(
&mut self,
event: ConnectedPeersEvent<GeneralConnectedPeersInstance>,
) {
trace!(?event, "General connected peers event.");

let peers = self.get_peers_to_dial().await;

if let Some(general_connected_peers) =
self.swarm.behaviour_mut().general_connected_peers.as_mut()
{
general_connected_peers.add_peers_to_dial(&peers);
}
}

async fn handle_special_connected_peers_event(
&mut self,
event: ConnectedPeersEvent<SpecialConnectedPeersInstance>,
) {
trace!(?event, "Special connected peers event.");

let peers = self.get_peers_to_dial().await;

if let Some(special_connected_peers) =
self.swarm.behaviour_mut().special_connected_peers.as_mut()
{
special_connected_peers.add_peers_to_dial(&peers);
}
}
// TODO: Restore or remove connected peer later
// async fn handle_general_connected_peers_event(
// &mut self,
// event: ConnectedPeersEvent<GeneralConnectedPeersInstance>,
// ) {
// trace!(?event, "General connected peers event.");
//
// let peers = self.get_peers_to_dial().await;
//
// if let Some(general_connected_peers) =
// self.swarm.behaviour_mut().general_connected_peers.as_mut()
// {
// general_connected_peers.add_peers_to_dial(&peers);
// }
// }
//
// async fn handle_special_connected_peers_event(
// &mut self,
// event: ConnectedPeersEvent<SpecialConnectedPeersInstance>,
// ) {
// trace!(?event, "Special connected peers event.");
//
// let peers = self.get_peers_to_dial().await;
//
// if let Some(special_connected_peers) =
// self.swarm.behaviour_mut().special_connected_peers.as_mut()
// {
// special_connected_peers.add_peers_to_dial(&peers);
// }
// }

async fn handle_autonat_event(&mut self, event: AutonatEvent) {
trace!(?event, "Autonat event received.");
Expand Down Expand Up @@ -1575,6 +1581,8 @@ where
}
}

// TODO: Restore or remove connected peer later
#[allow(dead_code)]
async fn get_peers_to_dial(&mut self) -> Vec<PeerAddress> {
let mut result_peers =
Vec::with_capacity(KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE + PEERS_ADDRESSES_BATCH_SIZE);
Expand Down
3 changes: 3 additions & 0 deletions crates/subspace-networking/src/protocols/connected_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
//! attempts, and manages a cache for candidates for permanent connections. It maintains
//! a single connection for each peer. Multiple protocol instances could be instantiated.

//! TODO: Restore or remove connected peer later
#![allow(dead_code)]

mod handler;

#[cfg(test)]
Expand Down
3 changes: 3 additions & 0 deletions crates/subspace-networking/src/protocols/peer_info.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//! TODO: Restore or remove connected peer later
#![allow(dead_code)]

mod handler;
mod protocol;

Expand Down
54 changes: 28 additions & 26 deletions crates/subspace-service/src/sync_from_dsn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const MIN_OFFLINE_PERIOD: Duration = Duration::from_secs(60);
#[derive(Debug)]
enum NotificationReason {
NoImportedBlocks,
// TODO: Restore or remove connected peer later
#[allow(dead_code)]
WentOnlineSubspace,
WentOnlineSubstrate,
}
Expand Down Expand Up @@ -97,38 +99,38 @@ where

async fn create_observer<Block, Client>(
network_service: &NetworkService<Block, <Block as BlockT>::Hash>,
node: &Node,
_node: &Node,
client: &Client,
notifications_sender: mpsc::Sender<NotificationReason>,
) where
Block: BlockT,
Client: BlockchainEvents<Block> + Send + Sync + 'static,
{
// Separate reactive observer for Subspace networking that is not a future
let _handler_id = node.on_num_established_peer_connections_change({
// Assuming node is offline by default
let last_online = Atomic::new(None::<Instant>);
let notifications_sender = notifications_sender.clone();

Arc::new(move |&new_connections| {
let is_online = new_connections > 0;
let was_online = last_online
.load(Ordering::AcqRel)
.map(|last_online| last_online.elapsed() < MIN_OFFLINE_PERIOD)
.unwrap_or_default();

if is_online && !was_online {
// Doesn't matter if sending failed here
let _ = notifications_sender
.clone()
.try_send(NotificationReason::WentOnlineSubspace);
}

if is_online {
last_online.store(Some(Instant::now()), Ordering::Release);
}
})
});
// // Separate reactive observer for Subspace networking that is not a future
// let _handler_id = node.on_num_established_peer_connections_change({
// // Assuming node is offline by default
// let last_online = Atomic::new(None::<Instant>);
// let notifications_sender = notifications_sender.clone();
//
// Arc::new(move |&new_connections| {
// let is_online = new_connections > 0;
// let was_online = last_online
// .load(Ordering::AcqRel)
// .map(|last_online| last_online.elapsed() < MIN_OFFLINE_PERIOD)
// .unwrap_or_default();
//
// if is_online && !was_online {
// // Doesn't matter if sending failed here
// let _ = notifications_sender
// .clone()
// .try_send(NotificationReason::WentOnlineSubspace);
// }
//
// if is_online {
// last_online.store(Some(Instant::now()), Ordering::Release);
// }
// })
// });
futures::select! {
_ = create_imported_blocks_observer(client, notifications_sender.clone()).fuse() => {
// Runs indefinitely
Expand Down
Loading