Skip to content

Commit

Permalink
chore(metrics): not holding system instance too long
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Jan 14, 2025
1 parent 00c2c94 commit 847eca0
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 77 deletions.
62 changes: 33 additions & 29 deletions ant-logging/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,41 +44,45 @@ struct ProcessMetrics {
// Obtains the system metrics every UPDATE_INTERVAL and logs it.
// The function should be spawned as a task and should be re-run if our main process is restarted.
pub async fn init_metrics(pid: u32) {
let mut sys = System::new_all();
let mut networks = Networks::new_with_refreshed_list();
let pid = Pid::from_u32(pid);

loop {
refresh_metrics(&mut sys, &mut networks, pid);
{
let mut sys = System::new_all();
refresh_metrics(&mut sys, &mut networks, pid);
tokio::time::sleep(Duration::from_millis(300)).await;
refresh_metrics(&mut sys, &mut networks, pid);

let process = match sys.process(pid) {
Some(antnode) => {
let disk_usage = antnode.disk_usage();
let process = ProcessMetrics {
cpu_usage_percent: antnode.cpu_usage(),
memory_used_mb: antnode.memory() / TO_MB,
bytes_read: disk_usage.read_bytes,
bytes_written: disk_usage.written_bytes,
total_mb_read: disk_usage.total_read_bytes / TO_MB,
total_mb_written: disk_usage.total_written_bytes / TO_MB,
};
Some(process)
}
None => {
// antnode with the provided Pid not found
None
}
};
let process = match sys.process(pid) {
Some(antnode) => {
let disk_usage = antnode.disk_usage();
let process = ProcessMetrics {
cpu_usage_percent: antnode.cpu_usage(),
memory_used_mb: antnode.memory() / TO_MB,
bytes_read: disk_usage.read_bytes,
bytes_written: disk_usage.written_bytes,
total_mb_read: disk_usage.total_read_bytes / TO_MB,
total_mb_written: disk_usage.total_written_bytes / TO_MB,
};
Some(process)
}
None => {
// antnode with the provided Pid not found
None
}
};

let system_cpu_usage_percent = sys.global_cpu_usage();
let metrics = Metrics {
physical_cpu_threads: sys.cpus().len(),
system_cpu_usage_percent,
process,
};
match serde_json::to_string(&metrics) {
Ok(metrics) => debug!("{metrics}"),
Err(err) => error!("Metrics error, could not serialize to JSON {err}"),
let system_cpu_usage_percent = sys.global_cpu_usage();
let metrics = Metrics {
physical_cpu_threads: sys.cpus().len(),
system_cpu_usage_percent,
process,
};
match serde_json::to_string(&metrics) {
Ok(metrics) => debug!("{metrics}"),
Err(err) => error!("Metrics error, could not serialize to JSON {err}"),
}
}

tokio::time::sleep(UPDATE_INTERVAL).await;
Expand Down
42 changes: 25 additions & 17 deletions ant-networking/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,27 +246,35 @@ impl NetworkMetricsRecorder {

let pid = Pid::from_u32(std::process::id());
let process_refresh_kind = ProcessRefreshKind::everything().without_disk_usage();
let mut system = System::new_all();
let physical_core_count = system.physical_core_count();

tokio::spawn(async move {
loop {
system.refresh_processes_specifics(
ProcessesToUpdate::Some(&[pid]),
true,
process_refresh_kind,
);
if let (Some(process), Some(core_count)) =
(system.process(pid), physical_core_count)
{
let mem_used =
((process.memory() as f64 / TO_MB as f64) * 10000.0).round() / 10000.0;
let _ = process_memory_used_mb.set(mem_used);
// divide by core_count to get value between 0-100
let cpu_usage = ((process.cpu_usage() as f64 / core_count as f64) * 10000.0)
.round()
/ 10000.0;
let _ = process_cpu_usage_percentage.set(cpu_usage);
let mut system = System::new_all();
let physical_core_count = system.physical_core_count();
system.refresh_processes_specifics(
ProcessesToUpdate::Some(&[pid]),
true,
process_refresh_kind,
);
tokio::time::sleep(Duration::from_millis(300)).await;
system.refresh_processes_specifics(
ProcessesToUpdate::Some(&[pid]),
true,
process_refresh_kind,
);
if let (Some(process), Some(core_count)) =
(system.process(pid), physical_core_count)
{
let mem_used =
((process.memory() as f64 / TO_MB as f64) * 10000.0).round() / 10000.0;
let _ = process_memory_used_mb.set(mem_used);
// divide by core_count to get value between 0-100
let cpu_usage =
((process.cpu_usage() as f64 / core_count as f64) * 10000.0).round()
/ 10000.0;
let _ = process_cpu_usage_percentage.set(cpu_usage);
}
}
sleep(UPDATE_INTERVAL).await;
}
Expand Down
73 changes: 42 additions & 31 deletions ant-node/src/bin/antnode/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,48 +434,59 @@ You can check your reward balance by running:
let process_refresh_kind = ProcessRefreshKind::everything()
.without_disk_usage()
.without_memory();
let mut system = System::new_all();
let physical_core_count = system.physical_core_count();

// Random initial delay between 1 and 5 minutes
let initial_delay =
Duration::from_secs(thread_rng().gen_range(INITIAL_DELAY_MIN_S..=INITIAL_DELAY_MAX_S));
tokio::time::sleep(initial_delay).await;

loop {
system.refresh_processes_specifics(
ProcessesToUpdate::Some(&[pid]),
true,
process_refresh_kind,
);
if let (Some(process), Some(core_count)) = (system.process(pid), physical_core_count) {
// divide by core_count to get value between 0-100
let cpu_usage =
((process.cpu_usage() as f64 / core_count as f64) * 10000.0).round() / 10000.0;
info!(
"Detected process {pid} CPU usage is {cpu_usage:?} (with {core_count} cores)"
{
let mut system = System::new_all();
let physical_core_count = system.physical_core_count();
system.refresh_processes_specifics(
ProcessesToUpdate::Some(&[pid]),
true,
process_refresh_kind,
);
tokio::time::sleep(Duration::from_millis(300)).await;
system.refresh_processes_specifics(
ProcessesToUpdate::Some(&[pid]),
true,
process_refresh_kind,
);
if let (Some(process), Some(core_count)) =
(system.process(pid), physical_core_count)
{
// divide by core_count to get value between 0-100
let cpu_usage = ((process.cpu_usage() as f64 / core_count as f64) * 10000.0)
.round()
/ 10000.0;
info!(
"Detected process {pid} CPU usage is {cpu_usage:?} (with {core_count} cores)"
);

if cpu_usage > CPU_USAGE_THRESHOLD {
high_cpu_count += 1;
} else {
high_cpu_count = 0;
}

if cpu_usage > CPU_USAGE_THRESHOLD {
high_cpu_count += 1;
} else {
high_cpu_count = 0;
}

if high_cpu_count >= HIGH_CPU_CONSECUTIVE_LIMIT {
if let Err(err) = ctrl_tx_clone_cpu
.send(NodeCtrl::Stop {
delay: NODE_STOP_DELAY,
result: StopResult::Success(format!("Excess host CPU %{CPU_USAGE_THRESHOLD} detected for {HIGH_CPU_CONSECUTIVE_LIMIT} consecutive minutes!")),
})
.await
{
error!("Failed to send node control msg to antnode bin main thread: {err}");
if high_cpu_count >= HIGH_CPU_CONSECUTIVE_LIMIT {
if let Err(err) = ctrl_tx_clone_cpu
.send(NodeCtrl::Stop {
delay: NODE_STOP_DELAY,
result: StopResult::Success(format!("Excess host CPU %{CPU_USAGE_THRESHOLD} detected for {HIGH_CPU_CONSECUTIVE_LIMIT} consecutive minutes!")),
})
.await
{
error!("Failed to send node control msg to antnode bin main thread: {err}");
}
break;
}
break;
} else {
error!("Cann't refresh systeminfo of process {pid} with OS core_count of {physical_core_count:?}");
}
} else {
error!("Cann't refresh systeminfo of process {pid} with OS core_count of {physical_core_count:?}");
}
// Add jitter to the interval
let jitter = Duration::from_secs(thread_rng().gen_range(JITTER_MIN_S..=JITTER_MAX_S));
Expand Down

0 comments on commit 847eca0

Please sign in to comment.