diff --git a/Cargo.toml b/Cargo.toml index 8eb7b20..3d228de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ tonic = { version = "0.6.2", default-features = false, features = ["tls"], optio tokio = { version = "1", default-features = false, features = ["rt-multi-thread"], optional = true } tower = { version = "0.4", optional = true } tower-http = { version = "0.5", default-features = false, features = ["fs"], optional = true } -url = { version = "2", optional = true } +url = "2" wasm-bindgen = "=0.2.95" wasm-bindgen-futures = "0.4" @@ -69,7 +69,6 @@ ssr = [ "dep:tokio", "dep:tower", "dep:tower-http", - "dep:url", "leptos/ssr", "leptos_meta/ssr", "leptos_router/ssr", diff --git a/migrations/20241122023406_settings.sql b/migrations/20241122023406_settings.sql new file mode 100644 index 0000000..6743876 --- /dev/null +++ b/migrations/20241122023406_settings.sql @@ -0,0 +1,27 @@ +-- How often to check which is the latest version of the node binary. +ALTER TABLE settings ADD COLUMN node_bin_version_polling_freq_secs INTEGER; + +-- How often to query balances from the ledger. +ALTER TABLE settings ADD COLUMN rewards_balances_retrieval_freq_secs INTEGER; + +-- How often to fetch metrics and node info from active/running nodes +ALTER TABLE settings ADD COLUMN nodes_metrics_polling_freq_secs INTEGER; + +-- URL to send queries using RPC to get rewards addresses balances from L2 network. +ALTER TABLE settings ADD COLUMN l2_network_rpc_url TEXT; + +-- ERC20 token contract address. +ALTER TABLE settings ADD COLUMN token_contract_address TEXT; + + +UPDATE settings SET + -- Check latest version of node binary every couple of hours. + node_bin_version_polling_freq_secs = 60 * 60 * 2, + -- Check balances every 15mins. + rewards_balances_retrieval_freq_secs = 60 * 15, + -- Poll nodes metrics every 5 secs. + nodes_metrics_polling_freq_secs = 5, + -- Arbitrum Sepolia testnet. + l2_network_rpc_url = "https://sepolia-rollup.arbitrum.io/rpc", + -- ANT token contract on Arbitrum Sepolia testnet. + token_contract_address = "0xBE1802c27C324a28aeBcd7eeC7D734246C807194"; diff --git a/src/app.rs b/src/app.rs index 3350acd..4ab4742 100644 --- a/src/app.rs +++ b/src/app.rs @@ -18,7 +18,7 @@ use axum::extract::FromRef; #[cfg(feature = "ssr")] use std::{collections::HashSet, sync::Arc}; #[cfg(feature = "ssr")] -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex}; #[cfg(feature = "hydrate")] use gloo_timers::future::TimeoutFuture; @@ -26,7 +26,7 @@ use leptos::*; use leptos_meta::*; use leptos_router::*; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use wasm_bindgen::{prelude::*, JsValue}; #[wasm_bindgen(module = "/public/metamask.js")] @@ -35,10 +35,36 @@ extern "C" { } // Application settings values. -#[derive(Clone, Debug, Default, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct AppSettings { pub nodes_auto_upgrade: bool, - pub nodes_auto_upgrade_delay_secs: u64, + pub nodes_auto_upgrade_delay: Duration, + pub node_bin_version_polling_freq: Duration, + pub nodes_metrics_polling_freq: Duration, + pub rewards_balances_retrieval_freq: Duration, + pub l2_network_rpc_url: String, + pub token_contract_address: String, +} + +impl Default for AppSettings { + fn default() -> Self { + Self { + // Node auto-upgrading is disabled by default. + nodes_auto_upgrade: false, + // Delay 10 secs. between each node being auto-upgraded. + nodes_auto_upgrade_delay: Duration::from_secs(10), + // Check latest version of node binary every couple of hours. + node_bin_version_polling_freq: Duration::from_secs(60 * 60 * 2), + // How often to fetch metrics and node info from active/running nodes + nodes_metrics_polling_freq: Duration::from_secs(5), + // Retrieve balances every 15 mins. + rewards_balances_retrieval_freq: Duration::from_secs(60 * 15), + // Arbitrum Sepolia testnet. + l2_network_rpc_url: "https://sepolia-rollup.arbitrum.io/rpc".to_string(), + // ANT token contract on Arbitrum Sepolia testnet. + token_contract_address: "0xBE1802c27C324a28aeBcd7eeC7D734246C807194".to_string(), + } + } } // Frequency in millis for nodes metrics polling @@ -59,6 +85,7 @@ pub struct ServerGlobalState { pub nodes_metrics: Arc>, pub server_api_hit: Arc>, pub node_status_locked: Arc>>, + pub updated_settings_tx: mpsc::Sender, } // Struct to use client side as a global context/state diff --git a/src/bg_tasks.rs b/src/bg_tasks.rs index 795bef5..1a5328a 100644 --- a/src/bg_tasks.rs +++ b/src/bg_tasks.rs @@ -1,5 +1,5 @@ use super::{ - app::{METRICS_MAX_SIZE_PER_CONTAINER, METRICS_POLLING_FREQ_MILLIS}, + app::{AppSettings, METRICS_MAX_SIZE_PER_CONTAINER}, db_client::DbClient, docker_client::DockerClient, metrics_client::{NodeMetricsClient, NodesMetrics}, @@ -20,36 +20,21 @@ use std::{ }; use tokio::{ select, - sync::Mutex, - time::{interval, sleep, Duration}, + sync::{mpsc, Mutex}, + time::{interval, sleep, timeout, Duration, Interval}, }; - -// TODO: move all the following consts to become part of AppSettings, an keep -// a copy of current settings in memory/ServerGlobalState - -// URL to send queries using RPC to get rewards addresses balances from L2. -const L2_RPC_URL: &str = "https://sepolia-rollup.arbitrum.io/rpc"; - -// ERC20 token contract address. -const ANT_TOKEN_CONTRACT_ADDR: &str = "0xBE1802c27C324a28aeBcd7eeC7D734246C807194"; - -// How often to fetch metrics and node info from nodes -const NODES_METRICS_POLLING_FREQ: Duration = - Duration::from_millis(METRICS_POLLING_FREQ_MILLIS as u64); - -// Check latest version of node binary every couple of hours -const BIN_VERSION_POLLING_FREQ: Duration = Duration::from_secs(60 * 60 * 2); +use url::Url; // How often to perform a metrics pruning in the DB. const METRICS_PRUNING_FREQ: Duration = Duration::from_secs(60 * 60); // every hour. -// How often to query balances from the ledger. -const REWARDS_BALANCES_RETRIEVAL: Duration = Duration::from_secs(60 * 15); // every 15mins. - -// Frequency to pull a new version of the formica image +// Frequency to pull a new version of the formica image. const FORMICA_IMAGE_PULLING_FREQ: Duration = Duration::from_secs(60 * 60 * 6); // every 6 hours. -// Create ERC20 contract instance +// Timeout duration when querying for each rewards balance. +const BALANCE_QUERY_TIMEOUT: Duration = Duration::from_secs(10); + +// ERC20 token contract ABI sol!( #[allow(missing_docs)] #[sol(rpc)] @@ -57,6 +42,76 @@ sol!( "artifacts/token_contract_abi.json" ); +// App settings and set of intervals used to schedule each of the tasks. +struct TasksContext { + formica_image_pulling: Interval, + node_bin_version_check: Interval, + balances_retrieval: Interval, + metrics_pruning: Interval, + nodes_metrics_polling: Interval, + app_settings: AppSettings, +} + +impl TasksContext { + fn from(settings: AppSettings) -> Self { + Self { + formica_image_pulling: interval(FORMICA_IMAGE_PULLING_FREQ), + node_bin_version_check: interval(settings.node_bin_version_polling_freq), + balances_retrieval: interval(settings.rewards_balances_retrieval_freq), + metrics_pruning: interval(METRICS_PRUNING_FREQ), + nodes_metrics_polling: interval(settings.nodes_metrics_polling_freq), + app_settings: settings, + } + } + + fn apply_settings(&mut self, settings: AppSettings) { + logging::log!("Applying new settings values immediataly to bg tasks: {settings:#?}"); + + // helper to create a new interval only if new period differs from current + let update_interval = |target: &mut Interval, new_period: Duration| { + let curr_period = target.period(); + if new_period != curr_period { + *target = interval(new_period); + // reset interval to start next period from this instant + target.reset(); + } + }; + + update_interval( + &mut self.node_bin_version_check, + settings.node_bin_version_polling_freq, + ); + update_interval( + &mut self.balances_retrieval, + settings.rewards_balances_retrieval_freq, + ); + update_interval( + &mut self.nodes_metrics_polling, + settings.nodes_metrics_polling_freq, + ); + self.app_settings = settings; + } + + fn parse_token_addr_and_rpc_url(&self) -> (Option
, Option) { + let addr = match self.app_settings.token_contract_address.parse::
() { + Err(err) => { + logging::log!("Rewards balance check disabled. Invalid configured token contract address: {err}"); + None + } + Ok(token_address) => Some(token_address), + }; + let url = match self.app_settings.l2_network_rpc_url.parse::() { + Err(err) => { + logging::log!("Rewards balance check disabled. Invalid configured RPC URL: {err}"); + None + } + Ok(rpc_url) => Some(rpc_url), + }; + + (addr, url) + } +} + // Spawn any required background tasks pub fn spawn_bg_tasks( docker_client: DockerClient, @@ -65,34 +120,53 @@ pub fn spawn_bg_tasks( db_client: DbClient, server_api_hit: Arc>, node_status_locked: Arc>>, + mut updated_settings_rx: mpsc::Receiver, + settings: AppSettings, ) { - let mut formica_image_pulling = interval(FORMICA_IMAGE_PULLING_FREQ); - let mut node_bin_version_check = interval(BIN_VERSION_POLLING_FREQ); - let mut balances_retrieval = interval(REWARDS_BALANCES_RETRIEVAL); - let mut metrics_pruning = interval(METRICS_PRUNING_FREQ); - let mut nodes_metrics_polling = interval(NODES_METRICS_POLLING_FREQ); - - // we start a counter to stop polling RPC API when there is no active client - let mut poll_rpc_counter = 5; + logging::log!("App settings to use: {settings:#?}"); + let mut ctx = TasksContext::from(settings); + + // we start a count down to stop polling RPC API when there is no active client + let mut poll_rpc_countdown = 5; + + // helper which create a new contract if the new configured values are valid. + let update_token_contract = |ctx: &TasksContext| match ctx.parse_token_addr_and_rpc_url() { + (Some(token_address), Some(rpc_url)) => { + let provider = ProviderBuilder::new().on_http(rpc_url); + let token_contract = TokenContract::new(token_address, provider); + Some(token_contract) + } + _ => None, + }; - // FIXME: remove unwrap calls - let token_address: Address = ANT_TOKEN_CONTRACT_ADDR.parse().unwrap(); - let provider = ProviderBuilder::new().on_http(L2_RPC_URL.parse().unwrap()); - let token_contract = TokenContract::new(token_address, provider); + // Token contract used to query rewards balances. + let mut token_contract = update_token_contract(&ctx); tokio::spawn(async move { loop { select! { - _ = formica_image_pulling.tick() => { - let docker_client_clone = docker_client.clone(); + settings = updated_settings_rx.recv() => { + if let Some(s) = settings { + let prev_addr = ctx.app_settings.token_contract_address.clone(); + let prev_url = ctx.app_settings.l2_network_rpc_url.clone(); + ctx.apply_settings(s); + + if prev_addr != ctx.app_settings.token_contract_address + || prev_url != ctx.app_settings.l2_network_rpc_url { + token_contract = update_token_contract(&ctx); + } + } + }, + _ = ctx.formica_image_pulling.tick() => { + let docker_client = docker_client.clone(); tokio::spawn(async move { logging::log!("Pulling formica node image ..."); - if let Err(err) = docker_client_clone.pull_formica_image().await { + if let Err(err) = docker_client.pull_formica_image().await { logging::log!("Failed to pull node image from the periodic task: {err}"); } }); }, - _ = node_bin_version_check.tick() => { + _ = ctx.node_bin_version_check.tick() => { tokio::spawn(check_node_bin_version( docker_client.clone(), latest_bin_version.clone(), @@ -100,43 +174,46 @@ pub fn spawn_bg_tasks( node_status_locked.clone(), )); }, - _ = balances_retrieval.tick() => { - tokio::spawn(retrieve_current_rewards_balances( - token_contract.clone(), - docker_client.clone(), - db_client.clone() - )); + _ = ctx.balances_retrieval.tick() => match token_contract { + Some(ref contract) => { + tokio::spawn(retrieve_current_rewards_balances( + contract.clone(), + docker_client.clone(), + db_client.clone() + )); + }, + None => logging::log!("Skipping balances retrieval due to invalid settings") }, - _ = metrics_pruning.tick() => { + _ = ctx.metrics_pruning.tick() => { tokio::spawn(prune_metrics( docker_client.clone(), db_client.clone() )); }, - _ = nodes_metrics_polling.tick() => { + _ = ctx.nodes_metrics_polling.tick() => { if *server_api_hit.lock().await { // reset the countdown to five more cycles - poll_rpc_counter = 5; + poll_rpc_countdown = 5; *server_api_hit.lock().await = false; - } else if poll_rpc_counter > 0 { - poll_rpc_counter -= 1; + } else if poll_rpc_countdown > 0 { + poll_rpc_countdown -= 1; } - let poll_rpc_api = poll_rpc_counter > 0; + let poll_rpc_api = poll_rpc_countdown > 0; // we don't spawn a task for this one just in case it's taking // too long to complete and we may start overwhelming the backend // with multiple overlapping tasks being launched. // TODO: update also inactive nodes only the first time to get up to date node status. update_nodes_info( - docker_client.clone(), - nodes_metrics.clone(), - db_client.clone(), - node_status_locked.clone(), + &docker_client, + &nodes_metrics, + &db_client, + &node_status_locked, poll_rpc_api ).await; - // reset timer to start next period from this instant, + // reset interval to start next period from this instant, // regardless how long the above polling task lasted. - nodes_metrics_polling.reset_after(NODES_METRICS_POLLING_FREQ); + ctx.nodes_metrics_polling.reset_after(ctx.nodes_metrics_polling.period()); } } } @@ -183,9 +260,7 @@ async fn check_node_bin_version( logging::log!("Failed to auto-upgrade node binary for node instance {container_id}: {err:?}."); } - let delay = Duration::from_secs( - db_client.get_settings().await.nodes_auto_upgrade_delay_secs, - ); + let delay = db_client.get_settings().await.nodes_auto_upgrade_delay; sleep(delay).await; } Ok(None) => break, // all nodes are up to date @@ -239,10 +314,10 @@ async fn latest_version_available() -> Option { // - Nodes' RPC API to get binary version and peer id. // - Nodes' exposed metrics server to obtain stats. async fn update_nodes_info( - docker_client: DockerClient, - nodes_metrics: Arc>, - db_client: DbClient, - node_status_locked: Arc>>, + docker_client: &DockerClient, + nodes_metrics: &Arc>, + db_client: &DbClient, + node_status_locked: &Arc>>, poll_rpc_api: bool, ) { let containers = match docker_client.get_containers_list(false).await { @@ -350,30 +425,36 @@ async fn retrieve_current_rewards_balances()) { let new_balance = if let Some(balance) = updated_balances.get(&address) { - *balance + balance.to_string() } else { // query the balance to the ERC20 contract logging::log!("Querying rewards balance for node {node_short_id} ..."); - match token_contract.balanceOf(address).call().await { - Ok(balance) => { + match timeout( + BALANCE_QUERY_TIMEOUT, + token_contract.balanceOf(address).call(), + ) + .await + { + Ok(Ok(balance)) => { let balance = balance._0; updated_balances.insert(address, balance); - balance + balance.to_string() } - Err(err) => { + Ok(Err(err)) => { logging::log!( "Failed to query rewards balance for node {node_short_id}: {err}" ); - continue; + "".to_string() + } + Err(_) => { + logging::log!("Timeout ({BALANCE_QUERY_TIMEOUT:?}) while querying rewards balance for node {node_short_id}."); + "".to_string() } } }; db_client - .update_node_metadata_fields( - &node_info.container_id, - &[("balance", &new_balance.to_string())], - ) + .update_node_metadata_fields(&node_info.container_id, &[("balance", &new_balance)]) .await; } else { logging::log!("No valid rewards address set for node {node_short_id}."); diff --git a/src/db_client.rs b/src/db_client.rs index fd5dbab..c04d705 100644 --- a/src/db_client.rs +++ b/src/db_client.rs @@ -18,6 +18,7 @@ use std::{ path::Path, str::FromStr, sync::Arc, + time::Duration, }; use thiserror::Error; use tokio::sync::Mutex; @@ -40,15 +41,11 @@ const DEFAULT_DB_PATH: &str = "./"; struct CachedSettings { nodes_auto_upgrade: bool, nodes_auto_upgrade_delay_secs: u64, -} - -impl Default for CachedSettings { - fn default() -> Self { - Self { - nodes_auto_upgrade: false, - nodes_auto_upgrade_delay_secs: 10, - } - } + node_bin_version_polling_freq_secs: u64, + nodes_metrics_polling_freq_secs: u64, + rewards_balances_retrieval_freq_secs: u64, + l2_network_rpc_url: String, + token_contract_address: String, } // Struct stored on the DB caching nodes metadata. @@ -437,36 +434,55 @@ impl DbClient { .await .map(|s| s.get(0).cloned()) { - Ok(Some(settings)) => AppSettings { - nodes_auto_upgrade: settings.nodes_auto_upgrade, - nodes_auto_upgrade_delay_secs: settings.nodes_auto_upgrade_delay_secs, + Ok(Some(s)) => AppSettings { + nodes_auto_upgrade: s.nodes_auto_upgrade, + nodes_auto_upgrade_delay: Duration::from_secs(s.nodes_auto_upgrade_delay_secs), + node_bin_version_polling_freq: Duration::from_secs( + s.node_bin_version_polling_freq_secs, + ), + nodes_metrics_polling_freq: Duration::from_secs(s.nodes_metrics_polling_freq_secs), + rewards_balances_retrieval_freq: Duration::from_secs( + s.rewards_balances_retrieval_freq_secs, + ), + l2_network_rpc_url: s.l2_network_rpc_url, + token_contract_address: s.token_contract_address, }, Ok(None) => { - logging::log!("No settings found in DB, using defaults."); + logging::log!("No settings found in DB, we'll be using defaults."); AppSettings::default() } Err(err) => { - logging::log!("Sqlite query error on settings: {err}. Using defaults."); + logging::log!("Sqlite query error on settings: {err}. We'll be using defaults."); AppSettings::default() } } } // Update the settings values - pub async fn update_settings(&self, settings: AppSettings) -> Result<(), DbError> { + pub async fn update_settings(&self, settings: &AppSettings) -> Result<(), DbError> { let db_lock = self.db.lock().await; match sqlx::query( "UPDATE settings SET \ nodes_auto_upgrade = ?, \ - nodes_auto_upgrade_delay_secs = ?", + nodes_auto_upgrade_delay_secs = ?, \ + node_bin_version_polling_freq_secs = ?, \ + nodes_metrics_polling_freq_secs = ?, \ + rewards_balances_retrieval_freq_secs = ?, \ + l2_network_rpc_url = ?, \ + token_contract_address = ?", ) .bind(settings.nodes_auto_upgrade) - .bind(settings.nodes_auto_upgrade_delay_secs as i64) + .bind(settings.nodes_auto_upgrade_delay.as_secs() as i64) + .bind(settings.node_bin_version_polling_freq.as_secs() as i64) + .bind(settings.nodes_metrics_polling_freq.as_secs() as i64) + .bind(settings.rewards_balances_retrieval_freq.as_secs() as i64) + .bind(settings.l2_network_rpc_url.clone()) + .bind(settings.token_contract_address.clone()) .execute(&*db_lock) .await { Ok(_) => { - logging::log!("Settings updated in DB cache with: {settings:#?}"); + logging::log!("New app settings updated in DB cache."); Ok(()) } Err(err) => { diff --git a/src/main.rs b/src/main.rs index 69176bd..084fb46 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ async fn main() { use leptos::*; use leptos_axum::{generate_route_list, LeptosRoutes}; use std::{collections::HashSet, sync::Arc}; - use tokio::sync::Mutex; + use tokio::sync::{mpsc, Mutex}; logging::log!("Starting Formicaio v{} ...", env!("CARGO_PKG_VERSION")); @@ -35,6 +35,11 @@ async fn main() { // List of nodes which are currently being upgraded let node_status_locked = Arc::new(Mutex::new(HashSet::new())); + // Channel to send updates on app settings so they can be applied in the bg tasks + let (updated_settings_tx, updated_settings_rx) = mpsc::channel::(3); + // Let's read currently cached settings to use and push it to channel + let settings = db_client.get_settings().await; + spawn_bg_tasks( docker_client.clone(), latest_bin_version.clone(), @@ -42,6 +47,8 @@ async fn main() { db_client.clone(), server_api_hit.clone(), node_status_locked.clone(), + updated_settings_rx, + settings, ); let app_state = ServerGlobalState { @@ -52,6 +59,7 @@ async fn main() { nodes_metrics, server_api_hit, node_status_locked, + updated_settings_tx, }; let app = Router::new() diff --git a/src/server_api.rs b/src/server_api.rs index fe1a737..7223579 100644 --- a/src/server_api.rs +++ b/src/server_api.rs @@ -271,6 +271,7 @@ pub async fn get_settings() -> Result { #[server(UpdateSettings, "/api", "Url", "/update_settings")] pub async fn update_settings(settings: super::app::AppSettings) -> Result<(), ServerFnError> { let context = expect_context::(); - context.db_client.update_settings(settings).await?; + context.db_client.update_settings(&settings).await?; + context.updated_settings_tx.send(settings).await?; Ok(()) } diff --git a/src/settings.rs b/src/settings.rs index 86136e1..c6e1708 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -5,8 +5,10 @@ use super::{ server_api::{get_settings, update_settings}, }; +use alloy::primitives::Address; use leptos::*; -use std::num::ParseIntError; +use std::time::Duration; +use url::Url; #[component] pub fn SettingsView(settings_panel: RwSignal) -> impl IntoView { @@ -59,7 +61,12 @@ pub fn SettingsForm( settings_panel: RwSignal, ) -> impl IntoView { let auto_upgrade = create_rw_signal(false); - let auto_upgrade_delay_secs = create_rw_signal(Ok(0)); + let auto_upgrade_delay = create_rw_signal(Ok(0)); + let bin_version_polling_freq = create_rw_signal(Ok(0)); + let balances_retrieval_freq = create_rw_signal(Ok(0)); + let metrics_polling_freq = create_rw_signal(Ok(0)); + let l2_network_rpc_url = create_rw_signal(Ok("".to_string())); + let token_contract_address = create_rw_signal(Ok("".to_string())); let update_settings_action = create_action(move |settings: &AppSettings| { let settings = settings.clone(); @@ -99,23 +106,79 @@ pub fn SettingsForm( + + + + ().map_err(|err| err.to_string()).map(|_| v) } + /> + ().map_err(|err| err.to_string()).map(|_| v) } + />