Skip to content

Commit

Permalink
Merge pull request #11 from fluiderson/background-scanning
Browse files Browse the repository at this point in the history
Background balance checking
  • Loading branch information
Slesarew authored Mar 3, 2024
2 parents 342f853 + ca28e20 commit a320d02
Show file tree
Hide file tree
Showing 8 changed files with 425 additions and 136 deletions.
47 changes: 31 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ categories = ["finance"]

[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["full"] }
anyhow = "1"
env_logger = "0.10"
log = "0.4"
Expand Down
2 changes: 1 addition & 1 deletion shoot.sh
Original file line number Diff line number Diff line change
@@ -1 +1 @@
curl 127.0.0.1:16726/recipient/0xd43593c715fdd31c61141abd04a99fd6822c8558854ccde39a5684e7a56da27d/order/1337/price/100000000000
curl 127.0.0.1:16726/order/1337/price/1
10 changes: 10 additions & 0 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,16 @@ impl ReadInvoices<'_> {
.get(AsRef::<[u8; 32]>::as_ref(account))
.context("failed to get an invoice from the database")
}

pub fn iter(
&self,
) -> Result<impl Iterator<Item = Result<(AccessGuard<'_, &[u8; 32]>, AccessGuard<'_, Invoice>)>>>
{
self.0
.iter()
.context("failed to get the invoices iterator")
.map(|iter| iter.map(|item| item.context("failed to get an invoice from the iterator")))
}
}

pub struct WriteTransaction<'db>(redb::WriteTransaction<'db>);
Expand Down
110 changes: 52 additions & 58 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use anyhow::{Context, Result};
use anyhow::{Context, Error, Result};
use database::Database;
use env_logger::{Builder, Env};
use environment_variables::*;
use log::LevelFilter;
use rpc::Processor;
use std::{
env::{self, VarError},
sync::Arc,
future::Future,
};
use subxt::{
config::{
Expand All @@ -21,9 +21,9 @@ use subxt::{
};
use tokio::{
signal,
sync::watch::{Receiver, Sender},
task::JoinSet,
sync::mpsc::{self, UnboundedSender},
};
use tokio_util::{sync::CancellationToken, task::TaskTracker};

mod database;
mod rpc;
Expand All @@ -43,7 +43,7 @@ pub mod environment_variables {
pub const DESTINATION: &str = "KALATORI_DESTINATION";
}

pub const DEFAULT_RPC: &str = "wss://rpc.polkadot.io";
pub const DEFAULT_RPC: &str = "wss://westend-rpc.polkadot.io";
pub const DEFAULT_DATABASE: &str = "database.redb";
pub const DATABASE_VERSION: Version = 0;

Expand Down Expand Up @@ -175,66 +175,63 @@ pub async fn main() -> Result<()> {
env!("CARGO_PKG_AUTHORS")
);

let shutdown_notification = Arc::new(Sender::new(false));
let shutdown_notification = CancellationToken::new();
let (error_tx, mut error_rx) = mpsc::unbounded_channel();

let (api_config, endpoint_properties, updater) =
rpc::prepare(endpoint, decimals, shutdown_notification.subscribe())
rpc::prepare(endpoint, decimals, shutdown_notification.clone())
.await
.context("failed to prepare the node module")?;

let (database, last_saved_block) =
Database::initialise(database_path, override_rpc, pair, endpoint_properties, destination)
.context("failed to initialise the database module")?;

let processor = Processor::new(
api_config,
database.clone(),
shutdown_notification.subscribe(),
let (database, last_saved_block) = Database::initialise(
database_path,
override_rpc,
pair,
endpoint_properties,
destination,
)
.context("failed to initialise the RPC module")?;
.context("failed to initialise the database module")?;

let processor = Processor::new(api_config, database.clone(), shutdown_notification.clone())
.context("failed to initialise the RPC module")?;

let server = server::new(shutdown_notification.subscribe(), host, database)
let server = server::new(shutdown_notification.clone(), host, database)
.await
.context("failed to initialise the server module")?;

let mut join_set = JoinSet::new();
let task_tracker = TaskTracker::new();

join_set.spawn(shutdown_listener(
shutdown_notification.clone(),
shutdown_notification.subscribe(),
));
join_set.spawn(updater.ignite());
join_set.spawn(processor.ignite(last_saved_block));
join_set.spawn(server);
task_tracker.close();

while let Some(task) = join_set.join_next().await {
let result = task.context("failed to shutdown a loop")?;
task_tracker.spawn(shutdown(
shutdown_listener(shutdown_notification.clone()),
error_tx.clone(),
));
task_tracker.spawn(shutdown(updater.ignite(), error_tx.clone()));
task_tracker.spawn(shutdown(
processor.ignite(last_saved_block, task_tracker.clone(), error_tx.clone()),
error_tx,
));
task_tracker.spawn(server);

match result {
Ok(shutdown_message) => log::info!("{shutdown_message}"),
Err(error) => {
log::error!("Received a fatal error!\n{error:?}");
while let Some(error) = error_rx.recv().await {
log::error!("Received a fatal error!\n{error:?}");

if !*shutdown_notification.borrow() {
log::info!("Initialising the shutdown...");
if !shutdown_notification.is_cancelled() {
log::info!("Initialising the shutdown...");

shutdown_notification
.send(true)
.with_context(|| unexpected_closure_of_notification_channel("shutdown"))?;
}
}
shutdown_notification.cancel();
}
}

task_tracker.wait().await;

log::info!("Goodbye!");

Ok(())
}

async fn shutdown_listener(
shutdown_notification_sender: Arc<Sender<bool>>,
mut shutdown_notification_receiver: Receiver<bool>,
) -> Result<&'static str> {
async fn shutdown_listener(shutdown_notification: CancellationToken) -> Result<&'static str> {
tokio::select! {
biased;
signal = signal::ctrl_c() => {
Expand All @@ -245,25 +242,22 @@ async fn shutdown_listener(

log::info!("Received the shutdown signal. Initialising the shutdown...");

process_shutdown_notification(shutdown_notification_sender.send(true), "send")
shutdown_notification.cancel();

Ok("The shutdown signal listener is shut down.")
}
notification = shutdown_notification_receiver.changed() => {
process_shutdown_notification(notification, "receive")
() = shutdown_notification.cancelled() => {
Ok("The shutdown signal listener is shut down.")
}
}
}

fn process_shutdown_notification<E>(
result: impl Context<(), E>,
kind: &str,
) -> Result<&'static str> {
result
.with_context(|| {
unexpected_closure_of_notification_channel(&format!("shutdown listener ({kind})"))
})
.map(|()| "The shutdown signal listener is shut down.")
}

fn unexpected_closure_of_notification_channel(loop_name: &str) -> String {
format!("unexpected closed shutdown notification channel in the {loop_name} loop")
async fn shutdown(
task: impl Future<Output = Result<&'static str>>,
error_tx: UnboundedSender<Error>,
) {
match task.await {
Ok(shutdown_message) => log::info!("{shutdown_message}"),
Err(error) => error_tx.send(error).unwrap(),
}
}
Loading

0 comments on commit a320d02

Please sign in to comment.