Skip to content

Commit

Permalink
Abort prometheus worker on farmer exit. (#2104)
Browse files Browse the repository at this point in the history
* farmer: Abort prometheus worker on app exit.

* farmer: Introduce PrometheusTaskDropHelper.

* farmer: Add join handle and future wrappers.

* farmer: Refactor AsyncJoinOnDrop
  • Loading branch information
shamil-gadelshin authored Oct 17, 2023
1 parent dc05c3f commit 487ae10
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 17 deletions.
13 changes: 9 additions & 4 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter;
use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator;
use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer::utils::ss58::parse_ss58_reward_address;
use subspace_farmer::utils::{run_future_in_dedicated_thread, tokio_rayon_spawn_handler};
use subspace_farmer::utils::{
run_future_in_dedicated_thread, tokio_rayon_spawn_handler, AsyncJoinOnDrop,
};
use subspace_farmer::{Identity, NodeClient, NodeRpcClient};
use subspace_farmer_components::plotting::PlottedSector;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
Expand Down Expand Up @@ -358,14 +360,17 @@ where
)?
};

if metrics_endpoints_are_specified {
let _prometheus_worker = if metrics_endpoints_are_specified {
let prometheus_task = start_prometheus_metrics_server(
metrics_endpoints,
RegistryAdapter::Libp2p(metrics_registry),
)?;

let _prometheus_worker = tokio::spawn(prometheus_task);
}
let join_handle = tokio::spawn(prometheus_task);
Some(AsyncJoinOnDrop::new(join_handle, true))
} else {
None
};

let kzg = Kzg::new(embedded_kzg_settings());
let erasure_coding = ErasureCoding::new(
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/piece_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ impl PieceCache {
}
});

match AsyncJoinOnDrop::new(maybe_piece_fut).await {
match AsyncJoinOnDrop::new(maybe_piece_fut, false).await {
Ok(maybe_piece) => maybe_piece,
Err(error) => {
error!(%error, ?key, "Piece reading task failed");
Expand Down
39 changes: 27 additions & 12 deletions crates/subspace-farmer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ mod tests;

use futures::channel::oneshot;
use futures::channel::oneshot::Canceled;
use futures::future::{Either, Fuse, FusedFuture};
use futures::FutureExt;
use futures::future::Either;
use rayon::ThreadBuilder;
use std::future::Future;
use std::ops::Deref;
Expand All @@ -20,31 +19,47 @@ use tokio::task;
use tracing::debug;

/// Joins async join handle on drop
pub(crate) struct AsyncJoinOnDrop<T>(Option<Fuse<task::JoinHandle<T>>>);
pub struct AsyncJoinOnDrop<T> {
handle: Option<task::JoinHandle<T>>,
abort_on_drop: bool,
}

impl<T> Drop for AsyncJoinOnDrop<T> {
fn drop(&mut self) {
let handle = self.0.take().expect("Always called exactly once; qed");
if !handle.is_terminated() {
task::block_in_place(move || {
let _ = Handle::current().block_on(handle);
});
if let Some(handle) = self.handle.take() {
if self.abort_on_drop {
handle.abort();
}

if !handle.is_finished() {
task::block_in_place(move || {
let _ = Handle::current().block_on(handle);
});
}
}
}
}

impl<T> AsyncJoinOnDrop<T> {
// Create new instance
pub(crate) fn new(handle: task::JoinHandle<T>) -> Self {
Self(Some(handle.fuse()))
/// Create new instance.
pub fn new(handle: task::JoinHandle<T>, abort_on_drop: bool) -> Self {
Self {
handle: Some(handle),
abort_on_drop,
}
}
}

impl<T> Future for AsyncJoinOnDrop<T> {
type Output = Result<T, task::JoinError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(self.0.as_mut().expect("Only dropped in Drop impl; qed")).poll(cx)
Pin::new(
self.handle
.as_mut()
.expect("Only dropped in Drop impl; qed"),
)
.poll(cx)
}
}

Expand Down

0 comments on commit 487ae10

Please sign in to comment.