From 847eca0c8f3a3b7cdeeab199b899c508fa9577c1 Mon Sep 17 00:00:00 2001 From: qima Date: Tue, 14 Jan 2025 21:26:31 +0800 Subject: [PATCH] chore(metrics): not holding system instance too long --- ant-logging/src/metrics.rs | 62 ++++++++++++++------------ ant-networking/src/metrics/mod.rs | 42 +++++++++++------- ant-node/src/bin/antnode/main.rs | 73 ++++++++++++++++++------------- 3 files changed, 100 insertions(+), 77 deletions(-) diff --git a/ant-logging/src/metrics.rs b/ant-logging/src/metrics.rs index 414a43281f..c0a13abc09 100644 --- a/ant-logging/src/metrics.rs +++ b/ant-logging/src/metrics.rs @@ -44,41 +44,45 @@ struct ProcessMetrics { // Obtains the system metrics every UPDATE_INTERVAL and logs it. // The function should be spawned as a task and should be re-run if our main process is restarted. pub async fn init_metrics(pid: u32) { - let mut sys = System::new_all(); let mut networks = Networks::new_with_refreshed_list(); let pid = Pid::from_u32(pid); loop { - refresh_metrics(&mut sys, &mut networks, pid); + { + let mut sys = System::new_all(); + refresh_metrics(&mut sys, &mut networks, pid); + tokio::time::sleep(Duration::from_millis(300)).await; + refresh_metrics(&mut sys, &mut networks, pid); - let process = match sys.process(pid) { - Some(antnode) => { - let disk_usage = antnode.disk_usage(); - let process = ProcessMetrics { - cpu_usage_percent: antnode.cpu_usage(), - memory_used_mb: antnode.memory() / TO_MB, - bytes_read: disk_usage.read_bytes, - bytes_written: disk_usage.written_bytes, - total_mb_read: disk_usage.total_read_bytes / TO_MB, - total_mb_written: disk_usage.total_written_bytes / TO_MB, - }; - Some(process) - } - None => { - // antnode with the provided Pid not found - None - } - }; + let process = match sys.process(pid) { + Some(antnode) => { + let disk_usage = antnode.disk_usage(); + let process = ProcessMetrics { + cpu_usage_percent: antnode.cpu_usage(), + memory_used_mb: antnode.memory() / TO_MB, + bytes_read: disk_usage.read_bytes, + bytes_written: disk_usage.written_bytes, + total_mb_read: disk_usage.total_read_bytes / TO_MB, + total_mb_written: disk_usage.total_written_bytes / TO_MB, + }; + Some(process) + } + None => { + // antnode with the provided Pid not found + None + } + }; - let system_cpu_usage_percent = sys.global_cpu_usage(); - let metrics = Metrics { - physical_cpu_threads: sys.cpus().len(), - system_cpu_usage_percent, - process, - }; - match serde_json::to_string(&metrics) { - Ok(metrics) => debug!("{metrics}"), - Err(err) => error!("Metrics error, could not serialize to JSON {err}"), + let system_cpu_usage_percent = sys.global_cpu_usage(); + let metrics = Metrics { + physical_cpu_threads: sys.cpus().len(), + system_cpu_usage_percent, + process, + }; + match serde_json::to_string(&metrics) { + Ok(metrics) => debug!("{metrics}"), + Err(err) => error!("Metrics error, could not serialize to JSON {err}"), + } } tokio::time::sleep(UPDATE_INTERVAL).await; diff --git a/ant-networking/src/metrics/mod.rs b/ant-networking/src/metrics/mod.rs index 6a411d44f7..eef95e3249 100644 --- a/ant-networking/src/metrics/mod.rs +++ b/ant-networking/src/metrics/mod.rs @@ -246,27 +246,35 @@ impl NetworkMetricsRecorder { let pid = Pid::from_u32(std::process::id()); let process_refresh_kind = ProcessRefreshKind::everything().without_disk_usage(); - let mut system = System::new_all(); - let physical_core_count = system.physical_core_count(); tokio::spawn(async move { loop { - system.refresh_processes_specifics( - ProcessesToUpdate::Some(&[pid]), - true, - process_refresh_kind, - ); - if let (Some(process), Some(core_count)) = - (system.process(pid), physical_core_count) { - let mem_used = - ((process.memory() as f64 / TO_MB as f64) * 10000.0).round() / 10000.0; - let _ = process_memory_used_mb.set(mem_used); - // divide by core_count to get value between 0-100 - let cpu_usage = ((process.cpu_usage() as f64 / core_count as f64) * 10000.0) - .round() - / 10000.0; - let _ = process_cpu_usage_percentage.set(cpu_usage); + let mut system = System::new_all(); + let physical_core_count = system.physical_core_count(); + system.refresh_processes_specifics( + ProcessesToUpdate::Some(&[pid]), + true, + process_refresh_kind, + ); + tokio::time::sleep(Duration::from_millis(300)).await; + system.refresh_processes_specifics( + ProcessesToUpdate::Some(&[pid]), + true, + process_refresh_kind, + ); + if let (Some(process), Some(core_count)) = + (system.process(pid), physical_core_count) + { + let mem_used = + ((process.memory() as f64 / TO_MB as f64) * 10000.0).round() / 10000.0; + let _ = process_memory_used_mb.set(mem_used); + // divide by core_count to get value between 0-100 + let cpu_usage = + ((process.cpu_usage() as f64 / core_count as f64) * 10000.0).round() + / 10000.0; + let _ = process_cpu_usage_percentage.set(cpu_usage); + } } sleep(UPDATE_INTERVAL).await; } diff --git a/ant-node/src/bin/antnode/main.rs b/ant-node/src/bin/antnode/main.rs index a29b9260ff..730d4944ce 100644 --- a/ant-node/src/bin/antnode/main.rs +++ b/ant-node/src/bin/antnode/main.rs @@ -434,8 +434,6 @@ You can check your reward balance by running: let process_refresh_kind = ProcessRefreshKind::everything() .without_disk_usage() .without_memory(); - let mut system = System::new_all(); - let physical_core_count = system.physical_core_count(); // Random initial delay between 1 and 5 minutes let initial_delay = @@ -443,39 +441,52 @@ You can check your reward balance by running: tokio::time::sleep(initial_delay).await; loop { - system.refresh_processes_specifics( - ProcessesToUpdate::Some(&[pid]), - true, - process_refresh_kind, - ); - if let (Some(process), Some(core_count)) = (system.process(pid), physical_core_count) { - // divide by core_count to get value between 0-100 - let cpu_usage = - ((process.cpu_usage() as f64 / core_count as f64) * 10000.0).round() / 10000.0; - info!( - "Detected process {pid} CPU usage is {cpu_usage:?} (with {core_count} cores)" + { + let mut system = System::new_all(); + let physical_core_count = system.physical_core_count(); + system.refresh_processes_specifics( + ProcessesToUpdate::Some(&[pid]), + true, + process_refresh_kind, ); + tokio::time::sleep(Duration::from_millis(300)).await; + system.refresh_processes_specifics( + ProcessesToUpdate::Some(&[pid]), + true, + process_refresh_kind, + ); + if let (Some(process), Some(core_count)) = + (system.process(pid), physical_core_count) + { + // divide by core_count to get value between 0-100 + let cpu_usage = ((process.cpu_usage() as f64 / core_count as f64) * 10000.0) + .round() + / 10000.0; + info!( + "Detected process {pid} CPU usage is {cpu_usage:?} (with {core_count} cores)" + ); + + if cpu_usage > CPU_USAGE_THRESHOLD { + high_cpu_count += 1; + } else { + high_cpu_count = 0; + } - if cpu_usage > CPU_USAGE_THRESHOLD { - high_cpu_count += 1; - } else { - high_cpu_count = 0; - } - - if high_cpu_count >= HIGH_CPU_CONSECUTIVE_LIMIT { - if let Err(err) = ctrl_tx_clone_cpu - .send(NodeCtrl::Stop { - delay: NODE_STOP_DELAY, - result: StopResult::Success(format!("Excess host CPU %{CPU_USAGE_THRESHOLD} detected for {HIGH_CPU_CONSECUTIVE_LIMIT} consecutive minutes!")), - }) - .await - { - error!("Failed to send node control msg to antnode bin main thread: {err}"); + if high_cpu_count >= HIGH_CPU_CONSECUTIVE_LIMIT { + if let Err(err) = ctrl_tx_clone_cpu + .send(NodeCtrl::Stop { + delay: NODE_STOP_DELAY, + result: StopResult::Success(format!("Excess host CPU %{CPU_USAGE_THRESHOLD} detected for {HIGH_CPU_CONSECUTIVE_LIMIT} consecutive minutes!")), + }) + .await + { + error!("Failed to send node control msg to antnode bin main thread: {err}"); + } + break; } - break; + } else { + error!("Cann't refresh systeminfo of process {pid} with OS core_count of {physical_core_count:?}"); } - } else { - error!("Cann't refresh systeminfo of process {pid} with OS core_count of {physical_core_count:?}"); } // Add jitter to the interval let jitter = Duration::from_secs(thread_rng().gen_range(JITTER_MIN_S..=JITTER_MAX_S));