Skip to content

Commit

Permalink
feat: server health call implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Vovke committed Oct 15, 2024
1 parent b2e1bd2 commit 9e3af4b
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 67 deletions.
3 changes: 2 additions & 1 deletion src/chain/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
tracker::ChainWatcher,
},
definitions::{
api_v2::{OrderInfo, Timestamp},
api_v2::{OrderInfo, RpcInfo, Timestamp},
Balance,
},
error::{ChainError, NotHex},
Expand Down Expand Up @@ -89,6 +89,7 @@ pub enum ChainRequest {
WatchAccount(WatchAccount),
Reap(WatchAccount),
Shutdown(oneshot::Sender<()>),
GetConnectedRpcs(oneshot::Sender<Vec<RpcInfo>>),
}

#[derive(Debug)]
Expand Down
126 changes: 82 additions & 44 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Everything related to actual interaction with blockchain
use std::collections::HashMap;

use std::sync::Arc;
use substrate_crypto_light::common::AccountId32;
use tokio::{
sync::{mpsc, oneshot},
Expand All @@ -23,6 +23,7 @@ pub mod rpc;
pub mod tracker;
pub mod utils;

use crate::definitions::api_v2::{Health, RpcInfo, ServerHealth};
use definitions::{ChainRequest, ChainTrackerRequest, WatchAccount};
use tracker::start_chain_watch;

Expand All @@ -35,7 +36,7 @@ const SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(120000);
/// RPC server handle
#[derive(Clone, Debug)]
pub struct ChainManager {
pub tx: tokio::sync::mpsc::Sender<ChainRequest>,
pub tx: mpsc::Sender<ChainRequest>,
}

impl ChainManager {
Expand All @@ -54,6 +55,9 @@ impl ChainManager {

let mut currency_map = HashMap::new();

// Create a channel for receiving RPC status updates
let (rpc_update_tx, mut rpc_update_rx) = mpsc::channel(1024);

// start network monitors
for c in chain {
if c.endpoints.is_empty() {
Expand Down Expand Up @@ -82,61 +86,86 @@ impl ChainManager {
signer.interface(),
task_tracker.clone(),
cancellation_token.clone(),
rpc_update_tx.clone(),
);
}

task_tracker
.clone()
.spawn("Blockchain connections manager", async move {
let mut rpc_statuses: HashMap<(String, String), Health> = HashMap::new();


// start requests engine
while let Some(request) = rx.recv().await {
match request {
ChainRequest::WatchAccount(request) => {
if let Some(chain) = currency_map.get(&request.currency) {
if let Some(receiver) = watch_chain.get(chain) {
let _unused = receiver
.send(ChainTrackerRequest::WatchAccount(request))
.await;
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidChain(chain.to_string())));
loop {
tokio::select! {
Some(request) = rx.recv() => {
match request {
ChainRequest::WatchAccount(request) => {
if let Some(chain) = currency_map.get(&request.currency) {
if let Some(receiver) = watch_chain.get(chain) {
let _unused = receiver
.send(ChainTrackerRequest::WatchAccount(request))
.await;
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidChain(chain.to_string())));
}
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidCurrency(request.currency)));
}
}
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidCurrency(request.currency)));
}
}
ChainRequest::Reap(request) => {
if let Some(chain) = currency_map.get(&request.currency) {
if let Some(receiver) = watch_chain.get(chain) {
let _unused =
receiver.send(ChainTrackerRequest::Reap(request)).await;
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidChain(chain.to_string())));
ChainRequest::Reap(request) => {
if let Some(chain) = currency_map.get(&request.currency) {
if let Some(receiver) = watch_chain.get(chain) {
let _unused =
receiver.send(ChainTrackerRequest::Reap(request)).await;
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidChain(chain.to_string())));
}
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidCurrency(request.currency)));
}
}
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidCurrency(request.currency)));
}
}
ChainRequest::Shutdown(res) => {
for (name, chain) in watch_chain.drain() {
let (tx, rx) = oneshot::channel();
if chain.send(ChainTrackerRequest::Shutdown(tx)).await.is_ok() {
if timeout(SHUTDOWN_TIMEOUT, rx).await.is_err() {
tracing::error!("Chain monitor for {name} took too much time to wind down, probably it was frozen. Discarding it.");
};
ChainRequest::Shutdown(res) => {
for (name, chain) in watch_chain.drain() {
let (tx, rx) = oneshot::channel();
if chain.send(ChainTrackerRequest::Shutdown(tx)).await.is_ok() {
if timeout(SHUTDOWN_TIMEOUT, rx).await.is_err() {
tracing::error!("Chain monitor for {name} took too much time to wind down, probably it was frozen. Discarding it.");
};
}
}
let _ = res.send(());
break;
}
ChainRequest::GetConnectedRpcs(res_tx) => {
// Collect the RpcInfo from rpc_statuses
let connected_rpcs: Vec<RpcInfo> = rpc_statuses.iter().map(|((chain_name, rpc_url), status)| {
RpcInfo {
chain_name: chain_name.clone(),
rpc_url: rpc_url.clone(),
status: *status,
}
}).collect();
let _ = res_tx.send(connected_rpcs);
}
}
let _ = res.send(());
break;
}
Some(rpc_update) = rpc_update_rx.recv() => {
rpc_statuses.insert(
(rpc_update.chain_name.clone(), rpc_update.rpc_url.clone()),
rpc_update.status,
);
}
else => break,
}
}

Expand All @@ -162,6 +191,15 @@ impl ChainManager {
rx.await.map_err(|_| ChainError::MessageDropped)?
}

pub async fn get_connected_rpcs(&self) -> Result<Vec<RpcInfo>, Error> {
let (res_tx, res_rx) = oneshot::channel();
self.tx
.send(ChainRequest::GetConnectedRpcs(res_tx))
.await
.map_err(|_| Error::Fatal)?;
res_rx.await.map_err(|_| Error::Fatal)
}

pub async fn reap(
&self,
id: String,
Expand Down
35 changes: 32 additions & 3 deletions src/chain/tracker.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
//! A tracker that follows individual chain
use std::{collections::HashMap, time::SystemTime};

use frame_metadata::v15::RuntimeMetadataV15;
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use serde_json::Value;
use std::sync::Arc;
use std::{collections::HashMap, time::SystemTime};
use substrate_parser::{AsMetadata, ShortSpecs};
use tokio::{
sync::mpsc,
time::{timeout, Duration},
};
use tokio_util::sync::CancellationToken;

use crate::definitions::api_v2::{Health, RpcInfo};
use crate::{
chain::{
definitions::{BlockHash, ChainTrackerRequest, Invoice},
Expand All @@ -38,6 +39,7 @@ pub fn start_chain_watch(
signer: Signer,
task_tracker: TaskTracker,
cancellation_token: CancellationToken,
rpc_update_tx: mpsc::Sender<RpcInfo>,
) {
task_tracker
.clone()
Expand All @@ -51,9 +53,30 @@ pub fn start_chain_watch(
if shutdown || cancellation_token.is_cancelled() {
break;
}

let _ = rpc_update_tx.send(RpcInfo {
chain_name: chain.name.clone(),
rpc_url: endpoint.clone(),
status: Health::Degraded,
}).await;

if let Ok(client) = WsClientBuilder::default().build(endpoint).await {
let _ = rpc_update_tx.send(RpcInfo {
chain_name: chain.name.clone(),
rpc_url: endpoint.clone(),
status: Health::Ok,
}).await;

// prepare chain
let watcher = match ChainWatcher::prepare_chain(&client, chain.clone(), &mut watched_accounts, endpoint, chain_tx.clone(), state.interface(), task_tracker.clone())
let watcher = match ChainWatcher::prepare_chain(
&client,
chain.clone(),
&mut watched_accounts,
endpoint,
chain_tx.clone(),
state.interface(),
task_tracker.clone(),
)
.await
{
Ok(a) => a,
Expand Down Expand Up @@ -165,6 +188,12 @@ pub fn start_chain_watch(
}
}
}
} else {
let _ = rpc_update_tx.send(RpcInfo {
chain_name: chain.name.clone(),
rpc_url: endpoint.clone(),
status: Health::Critical,
}).await;
}
}
Ok(format!("Chain {} monitor shut down", chain.name).into())
Expand Down
24 changes: 12 additions & 12 deletions src/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ pub mod api_v2 {
pub enum WithdrawalStatus {
Waiting,
Failed,
Forced,
Completed,
}

Expand All @@ -185,24 +186,23 @@ pub mod api_v2 {
pub supported_currencies: HashMap<std::string::String, CurrencyProperties>,
}

#[allow(dead_code)] // TODO: Use this for health response?
#[derive(Debug, Serialize)]
struct ServerHealth {
server_info: ServerInfo,
connected_rpcs: Vec<RpcInfo>,
status: Health,
pub struct ServerHealth {
pub server_info: ServerInfo,
pub connected_rpcs: Vec<RpcInfo>,
pub status: Health,
}

#[derive(Debug, Serialize)]
struct RpcInfo {
rpc_url: String,
chain_name: String,
status: Health,
#[derive(Debug, Serialize, Clone)]
pub struct RpcInfo {
pub rpc_url: String,
pub chain_name: String,
pub status: Health,
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone, PartialEq, Copy)]
#[serde(rename_all = "lowercase")]
enum Health {
pub enum Health {
Ok,
Degraded,
Critical,
Expand Down
14 changes: 10 additions & 4 deletions src/handlers/health.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::definitions::api_v2::ServerStatus;
use crate::definitions::api_v2::{ServerHealth, ServerStatus};
use crate::state::State;
use axum::{extract::State as ExtractState, http::StatusCode, Json};

Expand All @@ -18,12 +18,18 @@ pub async fn status(
}

pub async fn health(
ExtractState(_state): ExtractState<State>,
ExtractState(state): ExtractState<State>,
) -> (
[(axum::http::header::HeaderName, &'static str); 1],
Json<ServerStatus>,
Json<ServerHealth>,
) {
todo!();
match state.server_health().await {
Ok(status) => (
[(axum::http::header::CACHE_CONTROL, "no-store")],
Json(status),
),
Err(_) => panic!("db connection is down, state is lost"),
}
}

pub async fn audit(ExtractState(_state): ExtractState<State>) -> StatusCode {
Expand Down
Loading

0 comments on commit 9e3af4b

Please sign in to comment.