Skip to content

Commit

Permalink
Merge branch 'main' into remove-old-protocols
Browse files Browse the repository at this point in the history
# Conflicts:
#	crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs
#	crates/subspace-networking/examples/benchmark.rs
#	crates/subspace-networking/examples/random-walker.rs
#	crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs
#	crates/subspace-networking/src/constructor.rs
#	crates/subspace-networking/src/node_runner.rs
#	crates/subspace-service/src/dsn.rs
  • Loading branch information
shamil-gadelshin committed Dec 15, 2023
2 parents 3bb2da3 + 3f3a82a commit bc8af4b
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 56 deletions.
19 changes: 10 additions & 9 deletions crates/sc-proof-of-time/src/source/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,13 +419,21 @@ where
return;
}

// This sorts from lowest reputation to highest
potentially_matching_proofs.sort_by_cached_key(|(_proof, peer_ids)| {
peer_ids
.iter()
.map(|peer_id| network.peer_reputation(peer_id))
.max()
});

// If we have too many unique proofs to verify it might be cheaper to prove it ourselves
let correct_proof = if potentially_matching_proofs.len() < EXPECTED_POT_VERIFICATION_SPEEDUP
{
let mut correct_proof = None;

// Verify all proofs
for (proof, _senders) in &potentially_matching_proofs {
// Verify all proofs, starting with those sent by most reputable peers
for (proof, _senders) in potentially_matching_proofs.iter().rev() {
if pot_verifier.verify_checkpoints(
proof.seed,
proof.slot_iterations,
Expand All @@ -438,13 +446,6 @@ where

correct_proof
} else {
// This sorts from lowest reputation to highest
potentially_matching_proofs.sort_by_cached_key(|(_proof, peer_ids)| {
peer_ids
.iter()
.map(|peer_id| network.peer_reputation(peer_id))
.max()
});
// Last proof includes peer with the highest reputation
let (proof, _senders) = potentially_matching_proofs
.last()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use lru::LruCache;
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use std::fs;
use std::net::SocketAddr;
use std::num::{NonZeroU8, NonZeroUsize};
Expand Down Expand Up @@ -348,9 +349,11 @@ where

let (piece_cache, piece_cache_worker) = PieceCache::new(node_client.clone(), peer_id);

// Metrics
let mut prometheus_metrics_registry = Registry::default();
let metrics_endpoints_are_specified = !metrics_endpoints.is_empty();

let (node, mut node_runner, metrics_registry) = {
let (node, mut node_runner) = {
if dsn.bootstrap_nodes.is_empty() {
dsn.bootstrap_nodes = farmer_app_info.dsn_bootstrap_nodes.clone();
}
Expand All @@ -363,14 +366,14 @@ where
Arc::downgrade(&readers_and_pieces),
node_client.clone(),
piece_cache.clone(),
metrics_endpoints_are_specified,
metrics_endpoints_are_specified.then_some(&mut prometheus_metrics_registry),
)?
};

let _prometheus_worker = if metrics_endpoints_are_specified {
let prometheus_task = start_prometheus_metrics_server(
metrics_endpoints,
RegistryAdapter::Libp2p(metrics_registry),
RegistryAdapter::Libp2p(prometheus_metrics_registry),
)?;

let join_handle = tokio::spawn(prometheus_task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer::{NodeClient, NodeRpcClient, KNOWN_PEERS_CACHE_SIZE};
use subspace_networking::libp2p::identity::Keypair;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::libp2p::metrics::Metrics;
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::strip_peer_id;
Expand Down Expand Up @@ -47,8 +46,8 @@ pub(super) fn configure_dsn(
weak_readers_and_pieces: Weak<Mutex<Option<ReadersAndPieces>>>,
node_client: NodeRpcClient,
piece_cache: PieceCache,
initialize_metrics: bool,
) -> Result<(Node, NodeRunner<PieceCache>, Registry), anyhow::Error> {
prometheus_metrics_registry: Option<&mut Registry>,
) -> Result<(Node, NodeRunner<PieceCache>), anyhow::Error> {
let networking_parameters_registry = KnownPeersManager::new(KnownPeersManagerConfig {
path: Some(base_path.join("known_addresses.bin").into_boxed_path()),
ignore_peer_list: strip_peer_id(bootstrap_nodes.clone())
Expand All @@ -60,11 +59,12 @@ pub(super) fn configure_dsn(
})
.map(Box::new)?;

// Metrics
let mut metrics_registry = Registry::default();
let metrics = initialize_metrics.then(|| Metrics::new(&mut metrics_registry));

let default_config = Config::new(protocol_prefix, keypair, piece_cache.clone());
let default_config = Config::new(
protocol_prefix,
keypair,
piece_cache.clone(),
prometheus_metrics_registry,
);
let config = Config {
reserved_peers,
listen_on,
Expand Down Expand Up @@ -182,7 +182,6 @@ pub(super) fn configure_dsn(
bootstrap_addresses: bootstrap_nodes,
kademlia_mode: KademliaMode::Dynamic,
external_addresses,
metrics,
disable_bootstrap_on_start,
..default_config
};
Expand All @@ -202,7 +201,7 @@ pub(super) fn configure_dsn(
.detach();

// Consider returning HandlerId instead of each `detach()` calls for other usages.
(node, node_runner, metrics_registry)
(node, node_runner)
})
.map_err(Into::into)
}
2 changes: 1 addition & 1 deletion crates/subspace-networking/examples/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ pub async fn configure_dsn(
) -> Node {
let keypair = Keypair::generate_ed25519();

let default_config = Config::new(protocol_prefix, keypair, ());
let default_config = Config::new(protocol_prefix, keypair, (), None);

let config = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-networking/examples/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() {
let config_1 = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
metrics: Some(metrics),
libp2p_metrics: Some(metrics),
..Config::default()
};
let (node_1, mut node_runner_1) = subspace_networking::construct(config_1).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-networking/examples/random-walker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ async fn configure_dsn(
) -> Node {
let keypair = Keypair::generate_ed25519();

let default_config = Config::new(protocol_prefix, keypair, ());
let default_config = Config::new(protocol_prefix, keypair, (), None);

let config = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
Expand Down
33 changes: 18 additions & 15 deletions crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use clap::Parser;
use futures::{select, FutureExt};
use libp2p::identity::ed25519::Keypair;
use libp2p::kad::Mode;
use libp2p::metrics::Metrics;
use libp2p::{identity, Multiaddr, PeerId};
use prometheus_client::registry::Registry;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -153,19 +152,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
let keypair = identity::Keypair::from(decoded_keypair);

// Metrics
let mut metric_registry = Registry::default();
let metrics_endpoints_are_specified = !metrics_endpoints.is_empty();
let metrics =
metrics_endpoints_are_specified.then(|| Metrics::new(&mut metric_registry));

let prometheus_task = metrics_endpoints_are_specified
.then(|| {
start_prometheus_metrics_server(
metrics_endpoints,
RegistryAdapter::Libp2p(metric_registry),
)
})
.transpose()?;
let mut metrics_registry = Registry::default();
let dsn_metrics_registry =
metrics_endpoints_are_specified.then_some(&mut metrics_registry);

let known_peers_registry_config = KnownPeersManagerConfig {
enable_known_peers_source: false,
Expand Down Expand Up @@ -193,10 +183,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
bootstrap_addresses: bootstrap_nodes,
kademlia_mode: KademliaMode::Static(Mode::Server),
external_addresses,
metrics,
networking_parameters_registry: known_peers_registry.boxed(),

..Config::new(protocol_version.to_string(), keypair, ())
..Config::new(
protocol_version.to_string(),
keypair,
(),
dsn_metrics_registry,
)
};
let (node, mut node_runner) =
subspace_networking::construct(config).expect("Networking stack creation failed.");
Expand All @@ -214,6 +208,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
.detach();

info!("Subspace Bootstrap Node started");

let prometheus_task = metrics_endpoints_are_specified
.then(|| {
start_prometheus_metrics_server(
metrics_endpoints,
RegistryAdapter::Libp2p(metrics_registry),
)
})
.transpose()?;
if let Some(prometheus_task) = prometheus_task {
select! {
_ = node_runner.run().fuse() => {},
Expand Down
31 changes: 26 additions & 5 deletions crates/subspace-networking/src/constructor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::protocols::request_response::request_response_factory::RequestHandler
use crate::protocols::reserved_peers::Config as ReservedPeersConfig;
use crate::shared::Shared;
use crate::utils::rate_limiter::RateLimiter;
use crate::utils::strip_peer_id;
use crate::utils::{strip_peer_id, SubspaceMetrics};
use backoff::{ExponentialBackoff, SystemClock};
use futures::channel::mpsc;
use libp2p::autonat::Config as AutonatConfig;
Expand All @@ -32,6 +32,7 @@ use libp2p::multiaddr::Protocol;
use libp2p::yamux::Config as YamuxConfig;
use libp2p::{identity, Multiaddr, PeerId, StreamProtocol, SwarmBuilder, TransportError};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use std::borrow::Cow;
use std::iter::Empty;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -229,8 +230,10 @@ pub struct Config<LocalRecordProvider> {
pub temporary_bans_cache_size: NonZeroUsize,
/// Backoff policy for temporary banning of unreachable peers.
pub temporary_ban_backoff: ExponentialBackoff,
/// Optional external prometheus metrics. None will disable metrics gathering.
pub metrics: Option<Metrics>,
/// Optional libp2p prometheus metrics. None will disable metrics gathering.
pub libp2p_metrics: Option<Metrics>,
/// Internal prometheus metrics. None will disable metrics gathering.
pub metrics: Option<SubspaceMetrics>,
/// Defines protocol version for the network peers. Affects network partition.
pub protocol_version: String,
/// Addresses to bootstrap Kademlia network
Expand Down Expand Up @@ -258,7 +261,12 @@ impl Default for Config<()> {
let ed25519_keypair = identity::ed25519::Keypair::generate();
let keypair = identity::Keypair::from(ed25519_keypair);

Self::new(DEFAULT_NETWORK_PROTOCOL_VERSION.to_string(), keypair, ())
Self::new(
DEFAULT_NETWORK_PROTOCOL_VERSION.to_string(),
keypair,
(),
None,
)
}
}

Expand All @@ -271,7 +279,17 @@ where
protocol_version: String,
keypair: identity::Keypair,
local_records_provider: LocalRecordProvider,
prometheus_registry: Option<&mut Registry>,
) -> Self {
let (libp2p_metrics, metrics) = prometheus_registry
.map(|registry| {
(
Some(Metrics::new(registry)),
Some(SubspaceMetrics::new(registry)),
)
})
.unwrap_or((None, None));

let mut kademlia = KademliaConfig::default();
kademlia
.set_query_timeout(KADEMLIA_QUERY_TIMEOUT)
Expand Down Expand Up @@ -344,7 +362,8 @@ where
max_pending_outgoing_connections: SWARM_MAX_PENDING_OUTGOING_CONNECTIONS,
temporary_bans_cache_size: TEMPORARY_BANS_CACHE_SIZE,
temporary_ban_backoff,
metrics: None,
libp2p_metrics,
metrics,
protocol_version,
bootstrap_addresses: Vec::new(),
kademlia_mode: KademliaMode::Static(Mode::Client),
Expand Down Expand Up @@ -407,6 +426,7 @@ where
max_pending_outgoing_connections,
temporary_bans_cache_size,
temporary_ban_backoff,
libp2p_metrics,
metrics,
protocol_version,
bootstrap_addresses,
Expand Down Expand Up @@ -569,6 +589,7 @@ where
networking_parameters_registry,
reserved_peers: strip_peer_id(reserved_peers).into_iter().collect(),
temporary_bans,
libp2p_metrics,
metrics,
protocol_version,
bootstrap_addresses,
Expand Down
23 changes: 18 additions & 5 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::protocols::request_response::request_response_factory::{
};
use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
use crate::utils::rate_limiter::RateLimiterPermit;
use crate::utils::{is_global_address_or_dns, strip_peer_id};
use crate::utils::{is_global_address_or_dns, strip_peer_id, SubspaceMetrics};
use async_mutex::Mutex as AsyncMutex;
use bytes::Bytes;
use event_listener_primitives::HandlerId;
Expand Down Expand Up @@ -116,8 +116,10 @@ where
reserved_peers: HashMap<PeerId, Multiaddr>,
/// Temporarily banned peers.
temporary_bans: Arc<Mutex<TemporaryBans>>,
/// Prometheus metrics.
metrics: Option<Metrics>,
/// Libp2p Prometheus metrics.
libp2p_metrics: Option<Metrics>,
/// Subspace Prometheus metrics.
metrics: Option<SubspaceMetrics>,
/// Mapping from specific peer to ip addresses
peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
/// Defines protocol version for the network peers. Affects network partition.
Expand Down Expand Up @@ -159,7 +161,8 @@ where
pub(crate) networking_parameters_registry: Box<dyn KnownPeersRegistry>,
pub(crate) reserved_peers: HashMap<PeerId, Multiaddr>,
pub(crate) temporary_bans: Arc<Mutex<TemporaryBans>>,
pub(crate) metrics: Option<Metrics>,
pub(crate) libp2p_metrics: Option<Metrics>,
pub(crate) metrics: Option<SubspaceMetrics>,
pub(crate) protocol_version: String,
pub(crate) bootstrap_addresses: Vec<Multiaddr>,
pub(crate) disable_bootstrap_on_start: bool,
Expand All @@ -180,6 +183,7 @@ where
mut networking_parameters_registry,
reserved_peers,
temporary_bans,
libp2p_metrics,
metrics,
protocol_version,
bootstrap_addresses,
Expand Down Expand Up @@ -216,6 +220,7 @@ where
networking_parameters_registry,
reserved_peers,
temporary_bans,
libp2p_metrics,
metrics,
peer_ip_addresses: HashMap::new(),
protocol_version,
Expand Down Expand Up @@ -512,6 +517,10 @@ where
if num_established.get() == 1 {
shared.handlers.connected_peer.call_simple(&peer_id);
}

if let Some(metrics) = self.metrics.as_mut() {
metrics.inc_established_connections()
}
}
SwarmEvent::ConnectionClosed {
peer_id,
Expand Down Expand Up @@ -556,6 +565,10 @@ where
if num_established == 0 {
shared.handlers.disconnected_peer.call_simple(&peer_id);
}

if let Some(metrics) = self.metrics.as_mut() {
metrics.dec_established_connections()
};
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
if let Some(peer_id) = &peer_id {
Expand Down Expand Up @@ -1448,7 +1461,7 @@ where
}

fn register_event_metrics(&mut self, swarm_event: &SwarmEvent<Event>) {
if let Some(ref mut metrics) = self.metrics {
if let Some(ref mut metrics) = self.libp2p_metrics {
match swarm_event {
SwarmEvent::Behaviour(Event::Ping(ping_event)) => {
metrics.record(ping_event);
Expand Down
Loading

0 comments on commit bc8af4b

Please sign in to comment.