From 487ae106bdae74007dc9c92a10d9d057bd1913c2 Mon Sep 17 00:00:00 2001 From: shamil-gadelshin Date: Tue, 17 Oct 2023 16:09:43 +0700 Subject: [PATCH] Abort prometheus worker on farmer exit. (#2104) * farmer: Abort prometheus worker on app exit. * farmer: Introduce PrometheusTaskDropHelper. * farmer: Add join handle and future wrappers. * farmer: Refactor AsyncJoinOnDrop --- .../src/bin/subspace-farmer/commands/farm.rs | 13 +++++-- crates/subspace-farmer/src/piece_cache.rs | 2 +- crates/subspace-farmer/src/utils.rs | 39 +++++++++++++------ 3 files changed, 37 insertions(+), 17 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 3b59dde084..61b6a852f8 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -29,7 +29,9 @@ use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter; use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator; use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; use subspace_farmer::utils::ss58::parse_ss58_reward_address; -use subspace_farmer::utils::{run_future_in_dedicated_thread, tokio_rayon_spawn_handler}; +use subspace_farmer::utils::{ + run_future_in_dedicated_thread, tokio_rayon_spawn_handler, AsyncJoinOnDrop, +}; use subspace_farmer::{Identity, NodeClient, NodeRpcClient}; use subspace_farmer_components::plotting::PlottedSector; use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; @@ -358,14 +360,17 @@ where )? }; - if metrics_endpoints_are_specified { + let _prometheus_worker = if metrics_endpoints_are_specified { let prometheus_task = start_prometheus_metrics_server( metrics_endpoints, RegistryAdapter::Libp2p(metrics_registry), )?; - let _prometheus_worker = tokio::spawn(prometheus_task); - } + let join_handle = tokio::spawn(prometheus_task); + Some(AsyncJoinOnDrop::new(join_handle, true)) + } else { + None + }; let kzg = Kzg::new(embedded_kzg_settings()); let erasure_coding = ErasureCoding::new( diff --git a/crates/subspace-farmer/src/piece_cache.rs b/crates/subspace-farmer/src/piece_cache.rs index 1cc5a4c543..f98b832409 100644 --- a/crates/subspace-farmer/src/piece_cache.rs +++ b/crates/subspace-farmer/src/piece_cache.rs @@ -709,7 +709,7 @@ impl PieceCache { } }); - match AsyncJoinOnDrop::new(maybe_piece_fut).await { + match AsyncJoinOnDrop::new(maybe_piece_fut, false).await { Ok(maybe_piece) => maybe_piece, Err(error) => { error!(%error, ?key, "Piece reading task failed"); diff --git a/crates/subspace-farmer/src/utils.rs b/crates/subspace-farmer/src/utils.rs index 6fd4f0a8ed..d7cbf238f4 100644 --- a/crates/subspace-farmer/src/utils.rs +++ b/crates/subspace-farmer/src/utils.rs @@ -7,8 +7,7 @@ mod tests; use futures::channel::oneshot; use futures::channel::oneshot::Canceled; -use futures::future::{Either, Fuse, FusedFuture}; -use futures::FutureExt; +use futures::future::Either; use rayon::ThreadBuilder; use std::future::Future; use std::ops::Deref; @@ -20,23 +19,34 @@ use tokio::task; use tracing::debug; /// Joins async join handle on drop -pub(crate) struct AsyncJoinOnDrop(Option>>); +pub struct AsyncJoinOnDrop { + handle: Option>, + abort_on_drop: bool, +} impl Drop for AsyncJoinOnDrop { fn drop(&mut self) { - let handle = self.0.take().expect("Always called exactly once; qed"); - if !handle.is_terminated() { - task::block_in_place(move || { - let _ = Handle::current().block_on(handle); - }); + if let Some(handle) = self.handle.take() { + if self.abort_on_drop { + handle.abort(); + } + + if !handle.is_finished() { + task::block_in_place(move || { + let _ = Handle::current().block_on(handle); + }); + } } } } impl AsyncJoinOnDrop { - // Create new instance - pub(crate) fn new(handle: task::JoinHandle) -> Self { - Self(Some(handle.fuse())) + /// Create new instance. + pub fn new(handle: task::JoinHandle, abort_on_drop: bool) -> Self { + Self { + handle: Some(handle), + abort_on_drop, + } } } @@ -44,7 +54,12 @@ impl Future for AsyncJoinOnDrop { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(self.0.as_mut().expect("Only dropped in Drop impl; qed")).poll(cx) + Pin::new( + self.handle + .as_mut() + .expect("Only dropped in Drop impl; qed"), + ) + .poll(cx) } }