From 770f53e626c462ababd2a5bdcaba8fc321ebe376 Mon Sep 17 00:00:00 2001 From: Gabriel Viganotti Date: Tue, 7 Jan 2025 23:09:04 -0300 Subject: [PATCH] feat: poll node status independently from metrics polling --- src/app.rs | 1 + src/bg_tasks.rs | 46 +++++++++++++++++++++++++++++++++++++++------- src/main.rs | 3 +++ src/server_api.rs | 1 + 4 files changed, 44 insertions(+), 7 deletions(-) diff --git a/src/app.rs b/src/app.rs index 8e90794..89a16da 100644 --- a/src/app.rs +++ b/src/app.rs @@ -100,6 +100,7 @@ pub struct ServerGlobalState { pub db_client: super::db_client::DbClient, pub docker_client: super::docker_client::DockerClient, pub latest_bin_version: Arc>>, + pub server_api_hit: Arc>, pub nodes_metrics: Arc>, pub node_status_locked: ImmutableNodeStatus, pub updated_settings_tx: broadcast::Sender, diff --git a/src/bg_tasks.rs b/src/bg_tasks.rs index 90daef7..8c59906 100644 --- a/src/bg_tasks.rs +++ b/src/bg_tasks.rs @@ -36,6 +36,9 @@ const BALANCE_QUERY_TIMEOUT: Duration = Duration::from_secs(10); // Timeout duration when querying metrics from each node. const NODE_METRICS_QUERY_TIMEOUT: Duration = Duration::from_secs(3); +// Frequency to poll node status from Docker engine +const NODE_STATUS_POLLING_FREQ: Duration = Duration::from_secs(5); + const LCD_LABEL_NET_SIZE: &str = "Network size:"; const LCD_LABEL_ACTIVE_NODES: &str = "Active nodes:"; const LCD_LABEL_STORED_RECORDS: &str = "Stored records:"; @@ -57,6 +60,7 @@ struct TasksContext { balances_retrieval: Interval, metrics_pruning: Interval, nodes_metrics_polling: Interval, + nodes_status_polling: Interval, app_settings: AppSettings, } @@ -68,6 +72,7 @@ impl TasksContext { balances_retrieval: interval(settings.rewards_balances_retrieval_freq), metrics_pruning: interval(METRICS_PRUNING_FREQ), nodes_metrics_polling: interval(settings.nodes_metrics_polling_freq), + nodes_status_polling: interval(NODE_STATUS_POLLING_FREQ), app_settings: settings, } } @@ -121,11 +126,13 @@ impl TasksContext { } // Spawn any required background tasks +#[allow(clippy::too_many_arguments)] pub fn spawn_bg_tasks( docker_client: DockerClient, latest_bin_version: Arc>>, nodes_metrics: Arc>, db_client: DbClient, + server_api_hit: Arc>, node_status_locked: ImmutableNodeStatus, mut updated_settings_rx: broadcast::Receiver, settings: AppSettings, @@ -245,6 +252,23 @@ pub fn spawn_bg_tasks( // reset interval to start next period from this instant, // regardless how long the above polling task lasted. ctx.nodes_metrics_polling.reset_after(ctx.nodes_metrics_polling.period()); + }, + _ = ctx.nodes_status_polling.tick() => { + // we poll node status only if a client is currently querying the API, + // and only if the metrics polling is not frequent enough + let api_hit = *server_api_hit.lock().await; + if !api_hit || 2 * ctx.nodes_status_polling.period() > ctx.nodes_metrics_polling.period() { + continue; + } + + *server_api_hit.lock().await = false; + match docker_client.get_containers_list(true).await { + Ok(containers) => for container in containers.into_iter() { + let node_info: NodeInstanceInfo = container.into(); + update_node_metadata(&node_info, &db_client, &node_status_locked).await; + }, + Err(err) => logging::log!("Failed to get containers list: {err}") + } } } } @@ -340,6 +364,20 @@ async fn latest_version_available() -> Option { None } +// Update node medata into local DB cache +async fn update_node_metadata( + node_info: &NodeInstanceInfo, + db_client: &DbClient, + node_status_locked: &ImmutableNodeStatus, +) { + let update_status = !node_status_locked + .is_still_locked(&node_info.container_id) + .await; + db_client + .update_node_metadata(node_info, update_status) + .await; +} + // Fetch up to date information for each active node instance // from nodes' exposed metrics server caching them in global context. async fn update_nodes_info( @@ -386,6 +424,7 @@ async fn update_nodes_info( let mut node_metrics = nodes_metrics.lock().await; node_metrics.store(&node_info.container_id, &metrics).await; node_metrics.update_node_info(&mut node_info); + update_node_metadata(&node_info, db_client, node_status_locked).await; } Ok(Err(err)) => { logging::log!("Failed to fetch metrics from node {node_short_id}: {err}"); @@ -397,13 +436,6 @@ async fn update_nodes_info( } } - let update_status = !node_status_locked - .is_still_locked(&node_info.container_id) - .await; - db_client - .update_node_metadata(&node_info, update_status) - .await; - net_size += node_info.connected_peers.unwrap_or_default() * node_info.net_size.unwrap_or_default(); weights += node_info.connected_peers.unwrap_or_default(); diff --git a/src/main.rs b/src/main.rs index 2a0d30a..0f065f1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -40,12 +40,14 @@ async fn main() { let settings = db_client.get_settings().await; // List of node instaces batches currently in progress let node_instaces_batches = Arc::new(Mutex::new((broadcast::channel(3).0, Vec::new()))); + let server_api_hit = Arc::new(Mutex::new(false)); spawn_bg_tasks( docker_client.clone(), latest_bin_version.clone(), nodes_metrics.clone(), db_client.clone(), + server_api_hit.clone(), node_status_locked.clone(), updated_settings_rx, settings, @@ -56,6 +58,7 @@ async fn main() { db_client, docker_client, latest_bin_version, + server_api_hit, nodes_metrics, node_status_locked, updated_settings_tx, diff --git a/src/server_api.rs b/src/server_api.rs index e6975f5..4adcf94 100644 --- a/src/server_api.rs +++ b/src/server_api.rs @@ -41,6 +41,7 @@ pub async fn nodes_instances() -> Result { let context = expect_context::(); let latest_bin_version = context.latest_bin_version.lock().await.clone(); let containers = context.docker_client.get_containers_list(true).await?; + *context.server_api_hit.lock().await = true; let mut nodes = HashMap::new(); for container in containers {