Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): expose connected_peers, peers_in_rt and uptime metrics #1896

Merged
merged 6 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sn_networking/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl SwarmDriver {
) -> Option<Interval> {
let (should_bootstrap, new_interval) = self
.bootstrap
.should_we_bootstrap(self.connected_peers as u32, current_bootstrap_interval)
.should_we_bootstrap(self.peers_in_rt as u32, current_bootstrap_interval)
.await;
if should_bootstrap {
self.trigger_network_discovery();
Expand Down
7 changes: 1 addition & 6 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,12 +827,7 @@ impl SwarmDriver {
if *is_bad {
warn!("Cleaning out bad_peer {peer_id:?}");
if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
self.connected_peers = self.connected_peers.saturating_sub(1);
self.send_event(NetworkEvent::PeerRemoved(
*dead_peer.node.key.preimage(),
self.connected_peers,
));
self.log_kbuckets(&peer_id);
self.update_on_peer_removal(*dead_peer.node.key.preimage());
let _ = self.check_for_change_in_our_close_group();
}

Expand Down
4 changes: 2 additions & 2 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ impl NetworkBuilder {
listen_port: self.listen_addr.map(|addr| addr.port()),
is_client,
is_behind_home_network: self.is_behind_home_network,
connected_peers: 0,
peers_in_rt: 0,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field tracks the total number of peers in the routing table (RT) rather than the number of "connections", so it has been renamed for clarity.

bootstrap,
relay_manager,
close_group: Default::default(),
Expand Down Expand Up @@ -637,7 +637,7 @@ pub struct SwarmDriver {
pub(crate) is_behind_home_network: bool,
/// The port that was set by the user
pub(crate) listen_port: Option<u16>,
pub(crate) connected_peers: usize,
pub(crate) peers_in_rt: usize,
pub(crate) bootstrap: ContinuousBootstrap,
pub(crate) relay_manager: RelayManager,
/// The peers that are closer to our PeerId. Includes self.
Expand Down
17 changes: 5 additions & 12 deletions sn_networking/src/event/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use crate::{
driver::PendingGetClosestType, get_quorum_value, GetRecordCfg, GetRecordError, NetworkError,
NetworkEvent, Result, SwarmDriver, CLOSE_GROUP_SIZE,
Result, SwarmDriver, CLOSE_GROUP_SIZE,
};
use itertools::Itertools;
use libp2p::kad::{
Expand Down Expand Up @@ -245,26 +245,19 @@ impl SwarmDriver {
} => {
event_string = "kad_event::RoutingUpdated";
if is_new_peer {
self.connected_peers = self.connected_peers.saturating_add(1);

info!("New peer added to routing table: {peer:?}, now we have #{} connected peers", self.connected_peers);
self.log_kbuckets(&peer);
self.update_on_peer_addition(peer);

// This should only happen once
if self.bootstrap.notify_new_peer() {
info!("Performing the first bootstrap");
self.trigger_network_discovery();
}
self.send_event(NetworkEvent::PeerAdded(peer, self.connected_peers));
}

info!("kad_event::RoutingUpdated {:?}: {peer:?}, is_new_peer: {is_new_peer:?} old_peer: {old_peer:?}", self.connected_peers);
if old_peer.is_some() {
self.connected_peers = self.connected_peers.saturating_sub(1);

info!("kad_event::RoutingUpdated {:?}: {peer:?}, is_new_peer: {is_new_peer:?} old_peer: {old_peer:?}", self.peers_in_rt);
if let Some(old_peer) = old_peer {
info!("Evicted old peer on new peer join: {old_peer:?}");
self.send_event(NetworkEvent::PeerRemoved(peer, self.connected_peers));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were supposed to emit PeerRemoved(old_peer) instead of peer here.

self.log_kbuckets(&peer);
self.update_on_peer_removal(old_peer);
}
let _ = self.check_for_change_in_our_close_group();
}
Expand Down
42 changes: 42 additions & 0 deletions sn_networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,38 @@ impl SwarmDriver {
}
}

/// Bookkeeping to run whenever a peer has been inserted into the routing table.
///
/// In order, this:
/// 1. bumps the locally tracked routing-table peer count (`peers_in_rt`, saturating),
/// 2. logs the addition and refreshes the kbucket log via `log_kbuckets`,
/// 3. emits `NetworkEvent::PeerAdded` carrying the peer and the current count,
/// 4. with the `open-metrics` feature, mirrors the count into the
///    `peers_in_routing_table` gauge when metrics are configured.
pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId) {
    let updated_count = self.peers_in_rt.saturating_add(1);
    self.peers_in_rt = updated_count;

    // NOTE(review): the message says "connected peers", but the number reported is the
    // routing-table peer count. The wording is kept unchanged in case logs are parsed.
    info!(
        "New peer added to routing table: {added_peer:?}, now we have #{} connected peers",
        updated_count
    );
    self.log_kbuckets(&added_peer);

    // Read the counter again here (rather than reusing `updated_count`) so the event
    // always carries whatever value the driver holds at this point.
    let peer_added_event = NetworkEvent::PeerAdded(added_peer, self.peers_in_rt);
    self.send_event(peer_added_event);

    #[cfg(feature = "open-metrics")]
    if let Some(metrics) = &self.network_metrics {
        let gauge_value = self.peers_in_rt as i64;
        metrics.peers_in_routing_table.set(gauge_value);
    }
}

/// Bookkeeping to run whenever a peer has been taken out of the routing table.
///
/// In order, this:
/// 1. decrements the locally tracked routing-table peer count (`peers_in_rt`,
///    saturating at zero),
/// 2. logs the removal and refreshes the kbucket log via `log_kbuckets`,
/// 3. emits `NetworkEvent::PeerRemoved` carrying the peer and the current count,
/// 4. with the `open-metrics` feature, mirrors the count into the
///    `peers_in_routing_table` gauge when metrics are configured.
pub(crate) fn update_on_peer_removal(&mut self, removed_peer: PeerId) {
    let updated_count = self.peers_in_rt.saturating_sub(1);
    self.peers_in_rt = updated_count;

    // NOTE(review): the message says "connected peers", but the number reported is the
    // routing-table peer count. The wording is kept unchanged in case logs are parsed.
    info!(
        "Peer removed from routing table: {removed_peer:?}, now we have #{} connected peers",
        updated_count
    );
    self.log_kbuckets(&removed_peer);

    // Read the counter again here (rather than reusing `updated_count`) so the event
    // always carries whatever value the driver holds at this point.
    let peer_removed_event = NetworkEvent::PeerRemoved(removed_peer, self.peers_in_rt);
    self.send_event(peer_removed_event);

    #[cfg(feature = "open-metrics")]
    if let Some(metrics) = &self.network_metrics {
        let gauge_value = self.peers_in_rt as i64;
        metrics.peers_in_routing_table.set(gauge_value);
    }
}

/// Logs the kbuckets and also records the bucket info.
pub(crate) fn log_kbuckets(&mut self, peer: &PeerId) {
let distance = NetworkAddress::from_peer(self.self_peer_id)
Expand Down Expand Up @@ -301,6 +333,16 @@ impl SwarmDriver {
.set(estimated_network_size as i64);
}

// Just to warn if our tracking goes out of sync with libp2p. Can happen if someone forgets to call
// update_on_peer_addition or update_on_peer_removal when adding or removing a peer.
// Only log every 10th peer to avoid spamming the logs.
if total_peers % 10 == 0 && total_peers != self.peers_in_rt {
warn!(
"Total peers in routing table: {}, but kbucket table has {total_peers} peers",
self.peers_in_rt
);
}

info!("kBucketTable has {index:?} kbuckets {total_peers:?} peers, {kbucket_table_stats:?}, estimated network size: {estimated_network_size:?}");
}

Expand Down
45 changes: 35 additions & 10 deletions sn_networking/src/event/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,15 @@ impl SwarmDriver {
connection_id,
(peer_id, Instant::now() + Duration::from_secs(60)),
);
#[cfg(feature = "open-metrics")]
if let Some(metrics) = &self.network_metrics {
metrics
.open_connections
.set(self.live_connected_peers.len() as i64);
metrics
.connected_peers
.set(self.swarm.connected_peers().count() as i64);
}

if endpoint.is_dialer() {
self.dialed_peers.push(peer_id);
Expand All @@ -396,6 +405,15 @@ impl SwarmDriver {
event_string = "ConnectionClosed";
trace!(%peer_id, ?connection_id, ?cause, num_established, "ConnectionClosed: {}", endpoint_str(&endpoint));
let _ = self.live_connected_peers.remove(&connection_id);
#[cfg(feature = "open-metrics")]
if let Some(metrics) = &self.network_metrics {
metrics
.open_connections
.set(self.live_connected_peers.len() as i64);
metrics
.connected_peers
.set(self.swarm.connected_peers().count() as i64);
}
}
SwarmEvent::OutgoingConnectionError {
connection_id,
Expand Down Expand Up @@ -445,7 +463,7 @@ impl SwarmDriver {
.any(|(_ilog2, peers)| peers.contains(&failed_peer_id));

if is_bootstrap_peer
&& self.connected_peers < self.bootstrap_peers.len()
&& self.peers_in_rt < self.bootstrap_peers.len()
{
warn!("OutgoingConnectionError: On bootstrap peer {failed_peer_id:?}, while still in bootstrap mode, ignoring");
there_is_a_serious_issue = false;
Expand Down Expand Up @@ -514,19 +532,13 @@ impl SwarmDriver {
.kademlia
.remove_peer(&failed_peer_id)
{
self.connected_peers = self.connected_peers.saturating_sub(1);
self.update_on_peer_removal(*dead_peer.node.key.preimage());

self.handle_cmd(SwarmCmd::RecordNodeIssue {
peer_id: failed_peer_id,
issue: crate::NodeIssue::ConnectionIssue,
})?;

self.send_event(NetworkEvent::PeerRemoved(
*dead_peer.node.key.preimage(),
self.connected_peers,
));

self.log_kbuckets(&failed_peer_id);
let _ = self.check_for_change_in_our_close_group();
}
}
Expand Down Expand Up @@ -630,12 +642,16 @@ impl SwarmDriver {
}
}
if let Some(to_be_removed_bootstrap) = shall_removed {
trace!("Bootstrap node {to_be_removed_bootstrap:?} to be replaced by peer {peer_id:?}");
let _entry = self
info!("Bootstrap node {to_be_removed_bootstrap:?} to be replaced by peer {peer_id:?}");
let entry = self
.swarm
.behaviour_mut()
.kademlia
.remove_peer(&to_be_removed_bootstrap);
if let Some(removed_peer) = entry {
self.update_on_peer_removal(*removed_peer.node.key.preimage());
let _ = self.check_for_change_in_our_close_group();
}
}
}

Expand Down Expand Up @@ -687,6 +703,15 @@ impl SwarmDriver {
for (connection_id, peer_id) in shall_removed {
let _ = self.live_connected_peers.remove(&connection_id);
let result = self.swarm.close_connection(connection_id);
#[cfg(feature = "open-metrics")]
if let Some(metrics) = &self.network_metrics {
metrics
.open_connections
.set(self.live_connected_peers.len() as i64);
metrics
.connected_peers
.set(self.swarm.connected_peers().count() as i64);
}
trace!("Removed outdated connection {connection_id:?} to {peer_id:?} with result: {result:?}");
}
}
Expand Down
27 changes: 26 additions & 1 deletion sn_networking/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ pub(crate) struct NetworkMetrics {
libp2p_metrics: Libp2pMetrics,

// metrics from sn_networking
pub(crate) records_stored: Gauge,
pub(crate) connected_peers: Gauge,
pub(crate) estimated_network_size: Gauge,
pub(crate) open_connections: Gauge,
pub(crate) peers_in_routing_table: Gauge,
pub(crate) records_stored: Gauge,
pub(crate) store_cost: Gauge,
#[cfg(feature = "upnp")]
pub(crate) upnp_events: Family<upnp::UpnpEventLabels, Counter>,
Expand All @@ -51,12 +54,31 @@ impl NetworkMetrics {
records_stored.clone(),
);

let connected_peers = Gauge::default();
sub_registry.register(
"connected_peers",
"The number of peers that we are currently connected to",
connected_peers.clone(),
);

let estimated_network_size = Gauge::default();
sub_registry.register(
"estimated_network_size",
"The estimated number of nodes in the network calculated by the peers in our RT",
estimated_network_size.clone(),
);
let open_connections = Gauge::default();
sub_registry.register(
"open_connections",
"The number of active connections to other peers",
open_connections.clone(),
);
let peers_in_routing_table = Gauge::default();
sub_registry.register(
"peers_in_routing_table",
"The total number of peers in our routing table",
peers_in_routing_table.clone(),
);
let store_cost = Gauge::default();
sub_registry.register(
"store_cost",
Expand Down Expand Up @@ -91,6 +113,9 @@ impl NetworkMetrics {
libp2p_metrics,
records_stored,
estimated_network_size,
connected_peers,
open_connections,
peers_in_routing_table,
store_cost,
#[cfg(feature = "upnp")]
upnp_events,
Expand Down
14 changes: 14 additions & 0 deletions sn_node/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use prometheus_client::{
},
registry::Registry,
};
use sn_networking::Instant;

#[derive(Clone)]
pub(crate) struct NodeMetrics {
Expand All @@ -35,6 +36,10 @@ pub(crate) struct NodeMetrics {
// wallet
pub(crate) current_reward_wallet_balance: Gauge,
pub(crate) total_forwarded_rewards: Gauge,

// to track the uptime of the node.
pub(crate) started_instant: Instant,
pub(crate) uptime: Gauge,
}

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
Expand Down Expand Up @@ -109,6 +114,13 @@ impl NodeMetrics {
total_forwarded_rewards.clone(),
);

let uptime = Gauge::default();
sub_registry.register(
"uptime",
"The uptime of the node in seconds",
uptime.clone(),
);

Self {
put_record_ok,
put_record_err,
Expand All @@ -118,6 +130,8 @@ impl NodeMetrics {
peer_removed_from_routing_table,
current_reward_wallet_balance,
total_forwarded_rewards,
started_instant: Instant::now(),
uptime,
}
}

Expand Down
Loading
Loading