Merge pull request #2085 from subspace/better-plotting-replotting-progress-tracking

Better plotting/replotting progress
nazar-pc authored Oct 11, 2023
2 parents 4ec9e5f + b238c29 commit 1c68f46
Showing 2 changed files with 75 additions and 28 deletions.
22 changes: 18 additions & 4 deletions crates/subspace-farmer/src/single_disk_farm.rs
@@ -525,8 +525,21 @@ type BackgroundTask = Pin<Box<dyn Future<Output = Result<(), BackgroundTaskError
type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
type Handler<A> = Bag<HandlerFn<A>, A>;

/// Details about sector currently being plotted
pub struct SectorPlottingDetails {
/// Sector index
pub sector_index: SectorIndex,
/// Progress so far in % (not including this sector)
pub progress: f32,
/// Whether sector is being replotted
pub replotting: bool,
/// Whether this is the last sector queued so far
pub last_queued: bool,
}

#[derive(Default, Debug)]
struct Handlers {
sector_plotting: Handler<SectorPlottingDetails>,
sector_plotted: Handler<(PlottedSector, Option<PlottedSector>)>,
solution: Handler<SolutionResponse>,
}
@@ -905,7 +918,6 @@ impl SingleDiskFarm {
erasure_coding,
handlers,
modifying_sector_index,
target_sector_count,
sectors_to_plot_receiver,
plotting_thread_pool,
replotting_thread_pool,
@@ -1162,9 +1174,11 @@ impl SingleDiskFarm {
}

/// Subscribe to sector plotting notification
///
/// Plotting permit is given such that it can be dropped later by the implementation if
/// throttling of the plotting process is desired.
pub fn on_sector_plotting(&self, callback: HandlerFn<SectorPlottingDetails>) -> HandlerId {
self.handlers.sector_plotting.add(callback)
}

/// Subscribe to notification about plotted sectors
pub fn on_sector_plotted(
&self,
callback: HandlerFn<(PlottedSector, Option<PlottedSector>)>,
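For context, here is a minimal consumer sketch (not part of the commit) of the subscription added above. It assumes an already-constructed SingleDiskFarm and that SectorPlottingDetails and HandlerId are importable; the exact import paths are assumptions, only the method signature comes from the diff.

use std::sync::Arc;

// Import paths below are assumptions; `SingleDiskFarm`, `SectorPlottingDetails`,
// and `HandlerId` themselves appear in the diff above.
use event_listener_primitives::HandlerId;
use subspace_farmer::single_disk_farm::{SectorPlottingDetails, SingleDiskFarm};

// Hypothetical helper that logs every plotting/replotting notification.
fn log_plotting_progress(farm: &SingleDiskFarm) -> HandlerId {
    // The callback receives `&SectorPlottingDetails`, matching
    // `HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>`.
    farm.on_sector_plotting(Arc::new(|details: &SectorPlottingDetails| {
        let action = if details.replotting { "Replotting" } else { "Plotting" };
        println!(
            "{action} sector {} ({:.2}% complete, last queued: {})",
            details.sector_index, details.progress, details.last_queued
        );
    }))
    // The subscription stays active for as long as the returned `HandlerId`
    // is kept alive by the caller.
}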
81 changes: 57 additions & 24 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
@@ -1,5 +1,6 @@
use crate::single_disk_farm::{
BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA,
BackgroundTaskError, Handlers, PlotMetadataHeader, SectorPlottingDetails,
RESERVED_PLOT_METADATA,
};
use crate::{node_client, NodeClient};
use async_lock::RwLock;
@@ -40,6 +41,15 @@ const ARCHIVED_SEGMENTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).expec
/// Number of retry attempts when getting a piece.
const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(4).expect("Not zero; qed");

pub(super) struct SectorToPlot {
sector_index: SectorIndex,
/// Progress so far in % (not including this sector)
progress: f32,
/// Whether this is the last sector queued so far
last_queued: bool,
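/// Sender used to acknowledge this request; the scheduler awaits the paired receiver
/// and does not care whether a message is sent back or the sender is simply dropped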
acknowledgement_sender: oneshot::Sender<()>,
}

/// Errors that happen during plotting
#[derive(Debug, Error)]
pub enum PlottingError {
@@ -90,8 +100,7 @@ pub(super) struct PlottingOptions<NC, PG> {
pub(super) erasure_coding: ErasureCoding,
pub(super) handlers: Arc<Handlers>,
pub(super) modifying_sector_index: Arc<RwLock<Option<SectorIndex>>>,
pub(super) target_sector_count: u16,
pub(super) sectors_to_plot_receiver: mpsc::Receiver<(SectorIndex, oneshot::Sender<()>)>,
pub(super) sectors_to_plot_receiver: mpsc::Receiver<SectorToPlot>,
pub(super) plotting_thread_pool: Arc<ThreadPool>,
pub(super) replotting_thread_pool: Arc<ThreadPool>,
}
@@ -123,16 +132,20 @@ where
erasure_coding,
handlers,
modifying_sector_index,
target_sector_count,
mut sectors_to_plot_receiver,
plotting_thread_pool,
replotting_thread_pool,
} = plotting_options;

let mut table_generator = PosTable::generator();
// TODO: Concurrency
while let Some((sector_index, _acknowledgement_sender)) = sectors_to_plot_receiver.next().await
{
while let Some(sector_to_plot) = sectors_to_plot_receiver.next().await {
let SectorToPlot {
sector_index,
progress,
last_queued,
acknowledgement_sender: _acknowledgement_sender,
} = sector_to_plot;
trace!(%sector_index, "Preparing to plot sector");

let maybe_old_sector_metadata = sectors_metadata
@@ -143,11 +156,20 @@ where
let replotting = maybe_old_sector_metadata.is_some();

if replotting {
debug!(%sector_index, "Replotting sector");
info!(%sector_index, "Replotting sector ({progress:.2}% complete)");
} else {
debug!(%sector_index, "Plotting sector");
info!(%sector_index, "Plotting sector ({progress:.2}% complete)");
}

handlers
.sector_plotting
.call_simple(&SectorPlottingDetails {
sector_index,
progress,
replotting,
last_queued,
});

// This `loop` is a workaround for an edge case in local setups where expiration is configured
// to 1. In that scenario we get a replotting notification essentially straight from the block
// import pipeline of the node, before the block is imported. This can result in a subsequent request for
@@ -272,13 +294,15 @@ where
modifying_sector_index.write().await.take();

if replotting {
info!(%sector_index, "Sector replotted successfully");
debug!(%sector_index, "Sector replotted successfully");
if last_queued {
info!("Replotting complete");
}
} else {
info!(
%sector_index,
"Sector plotted successfully ({:.2}%)",
(sector_index + 1) as f32 / target_sector_count as f32 * 100.0
);
debug!(%sector_index, "Sector plotted successfully");
if last_queued {
info!("Initial plotting complete");
}
}

handlers
@@ -297,7 +321,7 @@ pub(super) struct PlottingSchedulerOptions<NC> {
pub(super) min_sector_lifetime: HistorySize,
pub(super) node_client: NC,
pub(super) sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
pub(super) sectors_to_plot_sender: mpsc::Sender<(SectorIndex, oneshot::Sender<()>)>,
pub(super) sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
}

pub(super) async fn plotting_scheduler<NC>(
@@ -415,8 +439,8 @@ where

async fn pause_plotting_if_node_not_synced<NC>(
node_client: &NC,
sectors_to_plot_proxy_receiver: mpsc::Receiver<(SectorIndex, oneshot::Sender<()>)>,
mut sectors_to_plot_sender: mpsc::Sender<(SectorIndex, oneshot::Sender<()>)>,
sectors_to_plot_proxy_receiver: mpsc::Receiver<SectorToPlot>,
mut sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
) -> Result<(), BackgroundTaskError>
where
NC: NodeClient,
@@ -494,7 +518,7 @@ async fn send_plotting_notifications<NC>(
sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
last_archived_segment: &Atomic<SegmentHeader>,
mut archived_segments_receiver: mpsc::Receiver<()>,
mut sectors_to_plot_sender: mpsc::Sender<(SectorIndex, oneshot::Sender<()>)>,
mut sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
) -> Result<(), BackgroundTaskError>
where
NC: NodeClient,
@@ -503,7 +527,12 @@ where
for sector_index in sectors_indices_left_to_plot {
let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
if let Err(error) = sectors_to_plot_sender
.send((sector_index, acknowledgement_sender))
.send(SectorToPlot {
sector_index,
progress: sector_index as f32 / target_sector_count as f32 * 100.0,
last_queued: sector_index + 1 == target_sector_count,
acknowledgement_sender,
})
.await
{
warn!(%error, "Failed to send sector index for initial plotting");
@@ -642,10 +671,16 @@ where
}
}

for sector_index in sector_indices_to_replot.iter() {
let sectors_queued = sector_indices_to_replot.len();
for (index, sector_index) in sector_indices_to_replot.drain(..).enumerate() {
let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
if let Err(error) = sectors_to_plot_sender
.send((*sector_index, acknowledgement_sender))
.send(SectorToPlot {
sector_index,
progress: index as f32 / sectors_queued as f32 * 100.0,
last_queued: index + 1 == sectors_queued,
acknowledgement_sender,
})
.await
{
warn!(%error, "Failed to send sector index for replotting");
@@ -655,10 +690,8 @@ where
// We do not care if message was sent back or sender was just dropped
let _ = acknowledgement_receiver.await;

sectors_expire_at.remove(sector_index);
sectors_expire_at.remove(&sector_index);
}

sector_indices_to_replot.clear();
}

Ok(())
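To make the new progress math concrete, the following standalone sketch (all names hypothetical, not from the codebase) mirrors the formula used at both send call sites: progress is the percentage of sectors queued before the current one, and last_queued marks the final sector of a batch.

// Illustrative only: `progress_percent` is a hypothetical helper, but the
// arithmetic matches `sector_index as f32 / target_sector_count as f32 * 100.0`
// (initial plotting) and `index as f32 / sectors_queued as f32 * 100.0`
// (replotting) from the diff above.
fn progress_percent(queued_before: usize, total: usize) -> f32 {
    // Percentage of sectors queued before the current one, so the first
    // sector reports 0.00%.
    queued_before as f32 / total as f32 * 100.0
}

fn main() {
    let total = 4;
    for index in 0..total {
        let last_queued = index + 1 == total;
        println!(
            "sector {index}: {:.2}% complete, last_queued: {last_queued}",
            progress_percent(index, total)
        );
    }
    // Prints 0.00%, 25.00%, 50.00%, 75.00%; "Initial plotting complete" or
    // "Replotting complete" is logged only once the last queued sector finishes.
}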
