diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 447372fa88..6bea0d627f 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -525,8 +525,21 @@ type BackgroundTask = Pin = Arc; type Handler = Bag, A>; +/// Details about sector currently being plotted +pub struct SectorPlottingDetails { + /// Sector index + pub sector_index: SectorIndex, + /// Progress so far in % (not including this sector) + pub progress: f32, + /// Whether sector is being replotted + pub replotting: bool, + /// Whether this is the last sector queued so far + pub last_queued: bool, +} + #[derive(Default, Debug)] struct Handlers { + sector_plotting: Handler, sector_plotted: Handler<(PlottedSector, Option)>, solution: Handler, } @@ -905,7 +918,6 @@ impl SingleDiskFarm { erasure_coding, handlers, modifying_sector_index, - target_sector_count, sectors_to_plot_receiver, plotting_thread_pool, replotting_thread_pool, @@ -1162,9 +1174,11 @@ impl SingleDiskFarm { } /// Subscribe to sector plotting notification - /// - /// Plotting permit is given such that it can be dropped later by the implementation is - /// throttling of the plotting process is desired. + pub fn on_sector_plotting(&self, callback: HandlerFn) -> HandlerId { + self.handlers.sector_plotting.add(callback) + } + + /// Subscribe to notification about plotted sectors pub fn on_sector_plotted( &self, callback: HandlerFn<(PlottedSector, Option)>, diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 34c1d8dc55..0bed39e3f5 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -1,5 +1,6 @@ use crate::single_disk_farm::{ - BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA, + BackgroundTaskError, Handlers, PlotMetadataHeader, SectorPlottingDetails, + RESERVED_PLOT_METADATA, }; use crate::{node_client, NodeClient}; use async_lock::RwLock; @@ -40,6 +41,15 @@ const ARCHIVED_SEGMENTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).expec /// Get piece retry attempts number. const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(4).expect("Not zero; qed"); +pub(super) struct SectorToPlot { + sector_index: SectorIndex, + /// Progress so far in % (not including this sector) + progress: f32, + /// Whether this is the last sector queued so far + last_queued: bool, + acknowledgement_sender: oneshot::Sender<()>, +} + /// Errors that happen during plotting #[derive(Debug, Error)] pub enum PlottingError { @@ -90,8 +100,7 @@ pub(super) struct PlottingOptions { pub(super) erasure_coding: ErasureCoding, pub(super) handlers: Arc, pub(super) modifying_sector_index: Arc>>, - pub(super) target_sector_count: u16, - pub(super) sectors_to_plot_receiver: mpsc::Receiver<(SectorIndex, oneshot::Sender<()>)>, + pub(super) sectors_to_plot_receiver: mpsc::Receiver, pub(super) plotting_thread_pool: Arc, pub(super) replotting_thread_pool: Arc, } @@ -123,7 +132,6 @@ where erasure_coding, handlers, modifying_sector_index, - target_sector_count, mut sectors_to_plot_receiver, plotting_thread_pool, replotting_thread_pool, @@ -131,8 +139,13 @@ where let mut table_generator = PosTable::generator(); // TODO: Concurrency - while let Some((sector_index, _acknowledgement_sender)) = sectors_to_plot_receiver.next().await - { + while let Some(sector_to_plot) = sectors_to_plot_receiver.next().await { + let SectorToPlot { + sector_index, + progress, + last_queued, + acknowledgement_sender: _acknowledgement_sender, + } = sector_to_plot; trace!(%sector_index, "Preparing to plot sector"); let maybe_old_sector_metadata = sectors_metadata @@ -143,11 +156,20 @@ where let replotting = maybe_old_sector_metadata.is_some(); if replotting { - debug!(%sector_index, "Replotting sector"); + info!(%sector_index, "Replotting sector ({progress:.2}% complete)"); } else { - debug!(%sector_index, "Plotting sector"); + info!(%sector_index, "Plotting sector ({progress:.2}% complete)"); } + handlers + .sector_plotting + .call_simple(&SectorPlottingDetails { + sector_index, + progress, + replotting, + last_queued, + }); + // This `loop` is a workaround for edge-case in local setup if expiration is configured to // 1. In that scenario we get replotting notification essentially straight from block import // pipeline of the node, before block is imported. This can result in subsequent request for @@ -272,13 +294,15 @@ where modifying_sector_index.write().await.take(); if replotting { - info!(%sector_index, "Sector replotted successfully"); + debug!(%sector_index, "Sector replotted successfully"); + if last_queued { + info!("Replotting complete"); + } } else { - info!( - %sector_index, - "Sector plotted successfully ({:.2}%)", - (sector_index + 1) as f32 / target_sector_count as f32 * 100.0 - ); + debug!(%sector_index, "Sector plotted successfully"); + if last_queued { + info!("Initial plotting complete"); + } } handlers @@ -297,7 +321,7 @@ pub(super) struct PlottingSchedulerOptions { pub(super) min_sector_lifetime: HistorySize, pub(super) node_client: NC, pub(super) sectors_metadata: Arc>>, - pub(super) sectors_to_plot_sender: mpsc::Sender<(SectorIndex, oneshot::Sender<()>)>, + pub(super) sectors_to_plot_sender: mpsc::Sender, } pub(super) async fn plotting_scheduler( @@ -415,8 +439,8 @@ where async fn pause_plotting_if_node_not_synced( node_client: &NC, - sectors_to_plot_proxy_receiver: mpsc::Receiver<(SectorIndex, oneshot::Sender<()>)>, - mut sectors_to_plot_sender: mpsc::Sender<(SectorIndex, oneshot::Sender<()>)>, + sectors_to_plot_proxy_receiver: mpsc::Receiver, + mut sectors_to_plot_sender: mpsc::Sender, ) -> Result<(), BackgroundTaskError> where NC: NodeClient, @@ -494,7 +518,7 @@ async fn send_plotting_notifications( sectors_metadata: Arc>>, last_archived_segment: &Atomic, mut archived_segments_receiver: mpsc::Receiver<()>, - mut sectors_to_plot_sender: mpsc::Sender<(SectorIndex, oneshot::Sender<()>)>, + mut sectors_to_plot_sender: mpsc::Sender, ) -> Result<(), BackgroundTaskError> where NC: NodeClient, @@ -503,7 +527,12 @@ where for sector_index in sectors_indices_left_to_plot { let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel(); if let Err(error) = sectors_to_plot_sender - .send((sector_index, acknowledgement_sender)) + .send(SectorToPlot { + sector_index, + progress: sector_index as f32 / target_sector_count as f32 * 100.0, + last_queued: sector_index + 1 == target_sector_count, + acknowledgement_sender, + }) .await { warn!(%error, "Failed to send sector index for initial plotting"); @@ -642,10 +671,16 @@ where } } - for sector_index in sector_indices_to_replot.iter() { + let sectors_queued = sector_indices_to_replot.len(); + for (index, sector_index) in sector_indices_to_replot.drain(..).enumerate() { let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel(); if let Err(error) = sectors_to_plot_sender - .send((*sector_index, acknowledgement_sender)) + .send(SectorToPlot { + sector_index, + progress: index as f32 / sectors_queued as f32 * 100.0, + last_queued: index + 1 == sectors_queued, + acknowledgement_sender, + }) .await { warn!(%error, "Failed to send sector index for replotting"); @@ -655,10 +690,8 @@ where // We do not care if message was sent back or sender was just dropped let _ = acknowledgement_receiver.await; - sectors_expire_at.remove(sector_index); + sectors_expire_at.remove(§or_index); } - - sector_indices_to_replot.clear(); } Ok(())