From bcd8c4b3fe012365298ec63b31e9fcf8bd0f898f Mon Sep 17 00:00:00 2001
From: Roland Sherwin <RolandSherwin@protonmail.com>
Date: Fri, 17 Jan 2025 16:51:52 +0530
Subject: [PATCH] feat(metrics): expose health metrics and don't disconnect on
 low success rate

---
 ant-networking/src/driver.rs        |   8 +++
 ant-networking/src/metrics/mod.rs   |   8 +++
 ant-networking/src/relay_manager.rs | 102 +++++++---------------------
 3 files changed, 39 insertions(+), 79 deletions(-)

diff --git a/ant-networking/src/driver.rs b/ant-networking/src/driver.rs
index bb1637a099..0f2271098b 100644
--- a/ant-networking/src/driver.rs
+++ b/ant-networking/src/driver.rs
@@ -704,6 +704,14 @@ impl NetworkBuilder {
         // Enable relay manager for nodes behind home network
         let relay_manager = if !is_client && self.is_behind_home_network {
             let relay_manager = RelayManager::new(peer_id);
+            #[cfg(feature = "open-metrics")]
+            let mut relay_manager = relay_manager;
+            #[cfg(feature = "open-metrics")]
+            if let Some(metrics_recorder) = &metrics_recorder {
+                relay_manager.set_reservation_health_metrics(
+                    metrics_recorder.relay_reservation_health.clone(),
+                );
+            }
             Some(relay_manager)
         } else {
             info!("Relay manager is disabled for this node.");
diff --git a/ant-networking/src/metrics/mod.rs b/ant-networking/src/metrics/mod.rs
index 9b94190973..7c0fbd9235 100644
--- a/ant-networking/src/metrics/mod.rs
+++ b/ant-networking/src/metrics/mod.rs
@@ -46,6 +46,7 @@ pub(crate) struct NetworkMetricsRecorder {
     pub(crate) open_connections: Gauge,
     pub(crate) peers_in_routing_table: Gauge,
     pub(crate) records_stored: Gauge,
+    pub(crate) relay_reservation_health: Gauge<f64, AtomicU64>,
 
     // quoting metrics
     relevant_records: Gauge,
@@ -86,6 +87,12 @@ impl NetworkMetricsRecorder {
             "The number of records stored locally",
             records_stored.clone(),
         );
+        let relay_reservation_health = Gauge::<f64, AtomicU64>::default();
+        sub_registry.register(
+            "relay_reservation_health",
+            "The average health of all the relay reservation connections. Value is between 0-1",
+            relay_reservation_health.clone(),
+        );
 
         let connected_peers = Gauge::default();
         sub_registry.register(
@@ -216,6 +223,7 @@ impl NetworkMetricsRecorder {
             estimated_network_size,
             connected_peers,
             open_connections,
+            relay_reservation_health,
             peers_in_routing_table,
             relevant_records,
             max_records,
diff --git a/ant-networking/src/relay_manager.rs b/ant-networking/src/relay_manager.rs
index ed933e24d0..6c491c37e3 100644
--- a/ant-networking/src/relay_manager.rs
+++ b/ant-networking/src/relay_manager.rs
@@ -12,7 +12,11 @@ use libp2p::{
     core::transport::ListenerId, multiaddr::Protocol, swarm::ConnectionId, Multiaddr, PeerId,
     StreamProtocol, Swarm,
 };
+#[cfg(feature = "open-metrics")]
+use prometheus_client::metrics::gauge::Gauge;
 use rand::Rng;
+#[cfg(feature = "open-metrics")]
+use std::sync::atomic::AtomicU64;
 use std::{
     collections::{btree_map::Entry, BTreeMap, HashMap, HashSet, VecDeque},
     time::{Instant, SystemTime},
@@ -66,6 +70,9 @@ struct RelayReservationHealth {
     incoming_connections_from_remote_peer: BTreeMap<PeerId, ConnectionId>,
     /// A rolling window of reservation score per relay server.
     reservation_score: BTreeMap<PeerId, ReservationStat>,
+    /// To track the avg health of all the reservations.
+    #[cfg(feature = "open-metrics")]
+    relay_reservation_health_metric: Option<Gauge<f64, AtomicU64>>,
 }
 
 #[derive(Debug, Default, Clone)]
@@ -91,23 +98,6 @@ impl ReservationStat {
             success as f64 / (success + error) as f64
         }
     }
-
-    fn is_faulty(&self) -> bool {
-        let success = self.stat.iter().filter(|s| **s).count();
-        let error = self.stat.len() - success;
-
-        // Give the relay server a chance to prove itself
-        if success + error < 30 {
-            return false;
-        }
-
-        // Still give the relay server a chance to prove itself
-        if error + error < 100 {
-            return self.success_rate() < 0.5;
-        }
-
-        self.success_rate() < 0.9
-    }
 }
 
 impl RelayManager {
@@ -122,6 +112,11 @@
         }
     }
 
+    #[cfg(feature = "open-metrics")]
+    pub(crate) fn set_reservation_health_metrics(&mut self, gauge: Gauge<f64, AtomicU64>) {
+        self.reservation_health.relay_reservation_health_metric = Some(gauge);
+    }
+
     /// Should we keep this peer alive? Closing a connection to that peer would remove that server from the listen addr.
     pub(crate) fn keep_alive_peer(&self, peer_id: &PeerId) -> bool {
         self.connected_relay_servers.contains_key(peer_id)
@@ -164,8 +159,6 @@
         swarm: &mut Swarm<NodeBehaviour>,
         bad_nodes: &BadNodes,
     ) {
-        self.remove_faulty_relay_servers(swarm);
-
        if self.connected_relay_servers.len() >= MAX_CONCURRENT_RELAY_CONNECTIONS
             || self.relay_server_candidates.is_empty()
         {
@@ -284,52 +277,6 @@
         }
     }
 
-    /// Remove the faulty relay server.
-    fn remove_faulty_relay_servers(&mut self, swarm: &mut Swarm<NodeBehaviour>) {
-        let faulty_relay_servers = self
-            .reservation_health
-            .reservation_score
-            .iter()
-            .filter(|(_, stat)| stat.is_faulty())
-            .map(|(peer_id, stat)| (*peer_id, stat.clone()))
-            .collect_vec();
-
-        for (relay_server, score) in faulty_relay_servers {
-            let Some(listener_id) =
-                self.relayed_listener_id_map
-                    .iter()
-                    .find_map(|(id, id_peer)| {
-                        if *id_peer == relay_server {
-                            Some(*id)
-                        } else {
-                            None
-                        }
-                    })
-            else {
-                error!("Could not find the listener id for the relay server {relay_server:?}");
-                continue;
-            };
-
-            info!(
-                "Removing faulty relay server: {relay_server:?} on {listener_id:?} with score: {}",
-                score.success_rate()
-            );
-            debug!("Removing faulty relay server {relay_server:?} on {listener_id:?}, {score:?}");
-
-            let result = swarm.remove_listener(listener_id);
-            info!("Result of removing listener: {result:?}");
-
-            self.on_listener_closed(&listener_id, swarm);
-
-            self.reservation_health
-                .reservation_score
-                .remove(&relay_server);
-        }
-
-        self.reservation_health
-            .cleanup_stats(&self.connected_relay_servers);
-    }
-
     /// Track the incoming connections to monitor the health of a reservation.
     pub(crate) fn on_incoming_connection(
         &mut self,
@@ -577,22 +524,19 @@
                 .remove(&from_peer);
         }
 
-        self.log_reservation_score();
-    }
-
-    /// Clean up the stats for relay servers that we are no longer connected to.
-    fn cleanup_stats(&mut self, connected_relay_servers: &BTreeMap<PeerId, Multiaddr>) {
-        let mut to_remove = Vec::new();
-        for (relay_server, _) in self.reservation_score.iter() {
-            if !connected_relay_servers.contains_key(relay_server) {
-                to_remove.push(*relay_server);
-            }
+        #[cfg(feature = "open-metrics")]
+        if let Some(metric) = &self.relay_reservation_health_metric {
+            // calculate avg health of all the reservations
+            let avg_health = self
+                .reservation_score
+                .values()
+                .map(|stat| stat.success_rate())
+                .sum::<f64>()
+                / self.reservation_score.len() as f64;
+            metric.set(avg_health);
         }
 
-        for relay_server in to_remove {
-            debug!("Removing {relay_server:?} from the reservation_score as we are no longer connected to it.");
-            self.reservation_score.remove(&relay_server);
-        }
+        self.log_reservation_score();
     }
 
     fn log_reservation_score(&self) {