Skip to content

Commit

Permalink
feat(metrics): expose health metrics and don't disconnect on low success rate
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Jan 17, 2025
1 parent 1ce8fc6 commit bcd8c4b
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 79 deletions.
8 changes: 8 additions & 0 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,14 @@ impl NetworkBuilder {
// Enable relay manager for nodes behind home network
let relay_manager = if !is_client && self.is_behind_home_network {
let relay_manager = RelayManager::new(peer_id);
#[cfg(feature = "open-metrics")]
let mut relay_manager = relay_manager;
#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = &metrics_recorder {
relay_manager.set_reservation_health_metrics(
metrics_recorder.relay_reservation_health.clone(),
);
}
Some(relay_manager)
} else {
info!("Relay manager is disabled for this node.");
Expand Down
8 changes: 8 additions & 0 deletions ant-networking/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub(crate) struct NetworkMetricsRecorder {
pub(crate) open_connections: Gauge,
pub(crate) peers_in_routing_table: Gauge,
pub(crate) records_stored: Gauge,
pub(crate) relay_reservation_health: Gauge<f64, AtomicU64>,

// quoting metrics
relevant_records: Gauge,
Expand Down Expand Up @@ -86,6 +87,12 @@ impl NetworkMetricsRecorder {
"The number of records stored locally",
records_stored.clone(),
);
let relay_reservation_health = Gauge::<f64, AtomicU64>::default();
sub_registry.register(
"relay_reservation_health",
"The average health of all the relay reservation connections. Value is between 0-1",
relay_reservation_health.clone(),
);

let connected_peers = Gauge::default();
sub_registry.register(
Expand Down Expand Up @@ -216,6 +223,7 @@ impl NetworkMetricsRecorder {
estimated_network_size,
connected_peers,
open_connections,
relay_reservation_health,
peers_in_routing_table,
relevant_records,
max_records,
Expand Down
102 changes: 23 additions & 79 deletions ant-networking/src/relay_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ use libp2p::{
core::transport::ListenerId, multiaddr::Protocol, swarm::ConnectionId, Multiaddr, PeerId,
StreamProtocol, Swarm,
};
#[cfg(feature = "open-metrics")]
use prometheus_client::metrics::gauge::Gauge;
use rand::Rng;
#[cfg(feature = "open-metrics")]
use std::sync::atomic::AtomicU64;
use std::{
collections::{btree_map::Entry, BTreeMap, HashMap, HashSet, VecDeque},
time::{Instant, SystemTime},
Expand Down Expand Up @@ -66,6 +70,9 @@ struct RelayReservationHealth {
incoming_connections_from_remote_peer: BTreeMap<PeerId, ConnectionsFromPeer>,
/// A rolling window of reservation score per relay server.
reservation_score: BTreeMap<PeerId, ReservationStat>,
/// To track the avg health of all the reservations.
#[cfg(feature = "open-metrics")]
relay_reservation_health_metric: Option<Gauge<f64, AtomicU64>>,
}

#[derive(Debug, Default, Clone)]
Expand All @@ -91,23 +98,6 @@ impl ReservationStat {
success as f64 / (success + error) as f64
}
}

fn is_faulty(&self) -> bool {
let success = self.stat.iter().filter(|s| **s).count();
let error = self.stat.len() - success;

// Give the relay server a chance to prove itself
if success + error < 30 {
return false;
}

// Still give the relay server a chance to prove itself
if error + error < 100 {
return self.success_rate() < 0.5;
}

self.success_rate() < 0.9
}
}

impl RelayManager {
Expand All @@ -122,6 +112,11 @@ impl RelayManager {
}
}

#[cfg(feature = "open-metrics")]
pub(crate) fn set_reservation_health_metrics(&mut self, gauge: Gauge<f64, AtomicU64>) {
self.reservation_health.relay_reservation_health_metric = Some(gauge);
}

/// Should we keep this peer alive? Closing a connection to that peer would remove that server from the listen addr.
pub(crate) fn keep_alive_peer(&self, peer_id: &PeerId) -> bool {
self.connected_relay_servers.contains_key(peer_id)
Expand Down Expand Up @@ -164,8 +159,6 @@ impl RelayManager {
swarm: &mut Swarm<NodeBehaviour>,
bad_nodes: &BadNodes,
) {
self.remove_faulty_relay_servers(swarm);

if self.connected_relay_servers.len() >= MAX_CONCURRENT_RELAY_CONNECTIONS
|| self.relay_server_candidates.is_empty()
{
Expand Down Expand Up @@ -284,52 +277,6 @@ impl RelayManager {
}
}

    /// Remove the faulty relay server.
    ///
    /// Scans the per-server reservation scores, and for every server whose
    /// stats report it as faulty: closes its relayed listener on the swarm,
    /// runs the listener-closed bookkeeping, and drops its score entry.
    /// Finally prunes stats for servers we are no longer connected to.
    fn remove_faulty_relay_servers(&mut self, swarm: &mut Swarm<NodeBehaviour>) {
        // Collect (peer, stat) pairs first so we can mutate `self` while iterating.
        let faulty_relay_servers = self
            .reservation_health
            .reservation_score
            .iter()
            .filter(|(_, stat)| stat.is_faulty())
            .map(|(peer_id, stat)| (*peer_id, stat.clone()))
            .collect_vec();

        for (relay_server, score) in faulty_relay_servers {
            // Reverse-lookup the ListenerId for this relay server; the map is
            // keyed by listener id, so we scan for the matching peer.
            let Some(listener_id) =
                self.relayed_listener_id_map
                    .iter()
                    .find_map(|(id, id_peer)| {
                        if *id_peer == relay_server {
                            Some(*id)
                        } else {
                            None
                        }
                    })
            else {
                // No listener recorded for this server; nothing to close.
                error!("Could not find the listener id for the relay server {relay_server:?}");
                continue;
            };

            info!(
                "Removing faulty relay server: {relay_server:?} on {listener_id:?} with score: {}",
                score.success_rate()
            );
            debug!("Removing faulty relay server {relay_server:?} on {listener_id:?}, {score:?}");

            let result = swarm.remove_listener(listener_id);
            info!("Result of removing listener: {result:?}");

            // Update our own state (connected servers, candidates, listener map)
            // as if the swarm had reported the listener closed.
            self.on_listener_closed(&listener_id, swarm);

            // Forget the score so a later reconnect starts with a clean slate.
            self.reservation_health
                .reservation_score
                .remove(&relay_server);
        }

        // Drop stats for any servers we are no longer connected to.
        self.reservation_health
            .cleanup_stats(&self.connected_relay_servers);
    }

/// Track the incoming connections to monitor the health of a reservation.
pub(crate) fn on_incoming_connection(
&mut self,
Expand Down Expand Up @@ -577,22 +524,19 @@ impl RelayReservationHealth {
.remove(&from_peer);
}

self.log_reservation_score();
}

/// Clean up the stats for relay servers that we are no longer connected to.
fn cleanup_stats(&mut self, connected_relay_servers: &BTreeMap<PeerId, Multiaddr>) {
let mut to_remove = Vec::new();
for (relay_server, _) in self.reservation_score.iter() {
if !connected_relay_servers.contains_key(relay_server) {
to_remove.push(*relay_server);
}
#[cfg(feature = "open-metrics")]
if let Some(metric) = &self.relay_reservation_health_metric {
// calculate avg health of all the reservations
let avg_health = self
.reservation_score
.values()
.map(|stat| stat.success_rate())
.sum::<f64>()
/ self.reservation_score.len() as f64;
metric.set(avg_health);
}

for relay_server in to_remove {
debug!("Removing {relay_server:?} from the reservation_score as we are no longer connected to it.");
self.reservation_score.remove(&relay_server);
}
self.log_reservation_score();
}

fn log_reservation_score(&self) {
Expand Down

0 comments on commit bcd8c4b

Please sign in to comment.