Skip to content

Commit

Permalink
feat: check node rewards balance as soon as it has been instantiated
Browse files Browse the repository at this point in the history
  • Loading branch information
bochaco committed Jan 15, 2025
1 parent 88acff9 commit 6bd5893
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 125 deletions.
2 changes: 1 addition & 1 deletion deploy/umbrelos/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ services:
devices:
- /dev:/dev
extra_hosts:
- "host.docker.internal:host-gateway"
- host.docker.internal:host-gateway
container_name: formicaio-apps-formicaio_formicaio_1
13 changes: 12 additions & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use super::{
stats::AggregatedStatsView,
};

#[cfg(feature = "ssr")]
use super::docker_msgs::Container;
#[cfg(feature = "ssr")]
use axum::extract::FromRef;
#[cfg(feature = "hydrate")]
Expand Down Expand Up @@ -97,6 +99,15 @@ pub const NODES_LIST_POLLING_FREQ_MILLIS: u64 = 5_500;
// i.e. the home-network cannot be disabled for any node instantiated in current deployment.
const HOME_NETWORK_ONLY: &str = "HOME_NETWORK_ONLY";

// Type of actions that can be requested to the bg jobs.
#[cfg(feature = "ssr")]
#[derive(Clone, Debug)]
pub enum BgTasksCmds {
ApplySettings(AppSettings),
CheckBalanceFor(Container),
CheckAllBalances,
}

#[cfg(feature = "ssr")]
#[derive(Clone, FromRef, Debug)]
pub struct ServerGlobalState {
Expand All @@ -107,7 +118,7 @@ pub struct ServerGlobalState {
pub server_api_hit: Arc<Mutex<bool>>,
pub nodes_metrics: Arc<Mutex<super::metrics_client::NodesMetrics>>,
pub node_status_locked: ImmutableNodeStatus,
pub updated_settings_tx: broadcast::Sender<AppSettings>,
pub bg_tasks_cmds_tx: broadcast::Sender<BgTasksCmds>,
pub node_instaces_batches: Arc<
Mutex<(
broadcast::Sender<()>,
Expand Down
194 changes: 120 additions & 74 deletions src/bg_tasks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::{
app::{AppSettings, ImmutableNodeStatus, METRICS_MAX_SIZE_PER_CONTAINER},
app::{AppSettings, BgTasksCmds, ImmutableNodeStatus, METRICS_MAX_SIZE_PER_CONTAINER},
db_client::DbClient,
docker_client::DockerClient,
docker_msgs::Container,
lcd::display_stats_on_lcd,
metrics_client::{NodeMetricsClient, NodesMetrics},
node_instance::NodeInstanceInfo,
Expand Down Expand Up @@ -104,25 +105,6 @@ impl TasksContext {
);
self.app_settings = settings;
}

fn parse_token_addr_and_rpc_url(&self) -> (Option<Address>, Option<Url>) {
let addr = match self.app_settings.token_contract_address.parse::<Address>() {
Err(err) => {
logging::log!("Rewards balance check disabled. Invalid configured token contract address: {err}");
None
}
Ok(token_address) => Some(token_address),
};
let url = match self.app_settings.l2_network_rpc_url.parse::<Url>() {
Err(err) => {
logging::log!("Rewards balance check disabled. Invalid configured RPC URL: {err}");
None
}
Ok(rpc_url) => Some(rpc_url),
};

(addr, url)
}
}

// Spawn any required background tasks
Expand All @@ -134,25 +116,12 @@ pub fn spawn_bg_tasks(
db_client: DbClient,
server_api_hit: Arc<Mutex<bool>>,
node_status_locked: ImmutableNodeStatus,
mut updated_settings_rx: broadcast::Receiver<AppSettings>,
bg_tasks_cmds_tx: broadcast::Sender<BgTasksCmds>,
settings: AppSettings,
) {
logging::log!("App settings to use: {settings:#?}");
let mut ctx = TasksContext::from(settings);

// helper which create a new contract if the new configured values are valid.
let update_token_contract = |ctx: &TasksContext| match ctx.parse_token_addr_and_rpc_url() {
(Some(token_address), Some(rpc_url)) => {
let provider = ProviderBuilder::new().on_http(rpc_url);
let token_contract = TokenContract::new(token_address, provider);
Some(token_contract)
}
_ => None,
};

// Token contract used to query rewards balances.
let mut token_contract = update_token_contract(&ctx);

let stats: HashMap<String, String> = [(
"Formicaio".to_string(),
format!("v{}", env!("CARGO_PKG_VERSION")),
Expand All @@ -165,19 +134,26 @@ pub fn spawn_bg_tasks(
if ctx.app_settings.lcd_display_enabled {
tokio::spawn(display_stats_on_lcd(
ctx.app_settings.clone(),
updated_settings_rx.resubscribe(),
bg_tasks_cmds_tx.subscribe(),
lcd_stats.clone(),
));
}

// Spawn task which checks address balances as requested on the provided channel
tokio::spawn(balance_checker_task(
ctx.app_settings.clone(),
docker_client.clone(),
db_client.clone(),
lcd_stats.clone(),
bg_tasks_cmds_tx.subscribe(),
));

tokio::spawn(async move {
let mut bg_tasks_cmds_rx = bg_tasks_cmds_tx.subscribe();
loop {
select! {
settings = updated_settings_rx.recv() => {
if let Ok(s) = settings {
let prev_addr = ctx.app_settings.token_contract_address.clone();
let prev_url = ctx.app_settings.l2_network_rpc_url.clone();

settings = bg_tasks_cmds_rx.recv() => {
if let Ok(BgTasksCmds::ApplySettings(s)) = settings {
if s.lcd_display_enabled && (!ctx.app_settings.lcd_display_enabled
|| ctx.app_settings.lcd_device != s.lcd_device
|| ctx.app_settings.lcd_addr != s.lcd_addr)
Expand All @@ -187,18 +163,12 @@ pub fn spawn_bg_tasks(
// perhaps we need websockets for errors like this one.
tokio::spawn(display_stats_on_lcd(
s.clone(),
updated_settings_rx.resubscribe(),
bg_tasks_cmds_tx.subscribe(),
lcd_stats.clone()
));
}

ctx.apply_settings(s);

if prev_addr != ctx.app_settings.token_contract_address
|| prev_url != ctx.app_settings.l2_network_rpc_url
{
token_contract = update_token_contract(&ctx);
}
}
},
_ = ctx.formica_image_pulling.tick() => {
Expand All @@ -218,16 +188,8 @@ pub fn spawn_bg_tasks(
node_status_locked.clone()
));
},
_ = ctx.balances_retrieval.tick() => match token_contract {
Some(ref contract) => {
tokio::spawn(retrieve_current_rewards_balances(
contract.clone(),
docker_client.clone(),
db_client.clone(),
lcd_stats.clone()
));
},
None => logging::log!("Skipping balances retrieval due to invalid settings")
_ = ctx.balances_retrieval.tick() => {
let _ = bg_tasks_cmds_tx.send(BgTasksCmds::CheckAllBalances);
},
_ = ctx.metrics_pruning.tick() => {
tokio::spawn(prune_metrics(
Expand Down Expand Up @@ -507,28 +469,115 @@ async fn prune_metrics(docker_client: DockerClient, db_client: DbClient) {
}
}

async fn retrieve_current_rewards_balances<T: Transport + Clone, P: Provider<T, N>, N: Network>(
token_contract: TokenContract::TokenContractInstance<T, P, N>,
async fn balance_checker_task(
settings: AppSettings,
docker_client: DockerClient,
db_client: DbClient,
lcd_stats: Arc<Mutex<HashMap<String, String>>>,
mut bg_tasks_cmds_rx: broadcast::Receiver<BgTasksCmds>,
) {
// cache retrieved rewards balances to not query more than once per reward address
let mut updated_balances = HashMap::<Address, U256>::new();
// helper which creates a new contract if the new configured values are valid.
let update_token_contract = |contract_addr: &str, rpc_url: &str| {
let addr = match contract_addr.parse::<Address>() {
Err(err) => {
logging::log!("Rewards balance check disabled. Invalid configured token contract address: {err}");
None
}
Ok(token_address) => Some(token_address),
};
let url = match rpc_url.parse::<Url>() {
Err(err) => {
logging::log!("Rewards balance check disabled. Invalid configured RPC URL: {err}");
None
}
Ok(rpc_url) => Some(rpc_url),
};

let containers = match docker_client.get_containers_list(true).await {
Ok(containers) if !containers.is_empty() => containers,
Err(err) => {
logging::log!("Failed to get containers list: {err}");
remove_lcd_stats(&lcd_stats, &[LCD_LABEL_BALANCE]).await;
return;
}
_ => {
remove_lcd_stats(&lcd_stats, &[LCD_LABEL_BALANCE]).await;
return;
match (addr, url) {
(Some(token_address), Some(rpc_url)) => {
let provider = ProviderBuilder::new().on_http(rpc_url);
let token_contract = TokenContract::new(token_address, provider);
Some(token_contract)
}
_ => None,
}
};

// Token contract used to query rewards balances.
let mut token_contract = update_token_contract(
&settings.token_contract_address,
&settings.l2_network_rpc_url,
);

let mut prev_addr = settings.token_contract_address;
let mut prev_url = settings.l2_network_rpc_url;

// cache retrieved rewards balances to not query more than once per address
let mut updated_balances = HashMap::<Address, U256>::new();

loop {
match bg_tasks_cmds_rx.recv().await {
Ok(BgTasksCmds::ApplySettings(s)) => {
if prev_addr != s.token_contract_address || prev_url != s.l2_network_rpc_url {
token_contract =
update_token_contract(&s.token_contract_address, &s.l2_network_rpc_url);
prev_addr = s.token_contract_address;
prev_url = s.l2_network_rpc_url;
}
}
Ok(BgTasksCmds::CheckBalanceFor(container)) => {
if let Some(ref token_contract) = token_contract {
retrieve_current_balances(
[container],
token_contract,
&db_client,
&mut updated_balances,
)
.await;

let balance: U256 = updated_balances.values().sum();
update_lcd_stats(&lcd_stats, &[(LCD_LABEL_BALANCE, balance.to_string())]).await;
}
}
Ok(BgTasksCmds::CheckAllBalances) => {
if let Some(ref token_contract) = token_contract {
updated_balances.clear();
let containers = match docker_client.get_containers_list(true).await {
Ok(containers) if !containers.is_empty() => containers,
Err(err) => {
logging::log!("Failed to get containers list: {err}");
remove_lcd_stats(&lcd_stats, &[LCD_LABEL_BALANCE]).await;
continue;
}
_ => {
remove_lcd_stats(&lcd_stats, &[LCD_LABEL_BALANCE]).await;
continue;
}
};

retrieve_current_balances(
containers,
token_contract,
&db_client,
&mut updated_balances,
)
.await;

let balance: U256 = updated_balances.values().sum();
update_lcd_stats(&lcd_stats, &[(LCD_LABEL_BALANCE, balance.to_string())]).await;
}
}
Err(_) => {}
}
}
}

async fn retrieve_current_balances<T: Transport + Clone, P: Provider<T, N>, N: Network>(
containers: impl IntoIterator<Item = Container>,
token_contract: &TokenContract::TokenContractInstance<T, P, N>,
db_client: &DbClient,
updated_balances: &mut HashMap<Address, U256>,
) {
for container in containers.into_iter() {
let node_info: NodeInstanceInfo = container.into();
let node_short_id = node_info.short_container_id();
Expand Down Expand Up @@ -573,9 +622,6 @@ async fn retrieve_current_rewards_balances<T: Transport + Clone, P: Provider<T,
logging::log!("No valid rewards address set for node {node_short_id}.");
}
}

let balance: U256 = updated_balances.values().sum();
update_lcd_stats(&lcd_stats, &[(LCD_LABEL_BALANCE, balance.to_string())]).await;
}

// Helper to add/update stats to be disaplyed on external LCD device
Expand Down
8 changes: 4 additions & 4 deletions src/docker_msgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::{
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct Container {
pub Id: ContainerId,
Expand Down Expand Up @@ -64,19 +64,19 @@ impl From<Container> for NodeInstanceInfo {
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct Networks {
pub Networks: HashMap<String, Network>,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct Network {
pub IPAddress: String,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct Port {
pub IP: Option<String>,
Expand Down
8 changes: 4 additions & 4 deletions src/lcd.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::app::AppSettings;
use super::app::{AppSettings, BgTasksCmds};

use eyre::eyre;
use i2cdev::{
Expand Down Expand Up @@ -51,7 +51,7 @@ fn setup_i2c(settings: &AppSettings) -> Result<Display<Pcf8574>, eyre::Error> {
// Watch the stats and display them on the external LCD device.
pub async fn display_stats_on_lcd(
settings: AppSettings,
mut updated_settings_rx: broadcast::Receiver<AppSettings>,
mut bg_tasks_cmds_rx: broadcast::Receiver<BgTasksCmds>,
stats: Arc<Mutex<HashMap<String, String>>>,
) {
let cur_lcd_device = settings.lcd_device.clone();
Expand All @@ -74,8 +74,8 @@ pub async fn display_stats_on_lcd(

loop {
select! {
settings = updated_settings_rx.recv() => {
if let Ok(s) = settings {
settings = bg_tasks_cmds_rx.recv() => {
if let Ok(BgTasksCmds::ApplySettings(s)) = settings {
if !s.lcd_display_enabled
|| cur_lcd_device != s.lcd_device
|| cur_lcd_addr != s.lcd_addr
Expand Down
Loading

0 comments on commit 6bd5893

Please sign in to comment.