From 8a39aedbfc4afccc1655e734e8f7946e4189647e Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Fri, 27 Oct 2023 16:51:07 +0300 Subject: [PATCH] Download next segment while previous segment is being encoded --- .../subspace-farmer/src/single_disk_farm.rs | 16 +-- .../src/single_disk_farm/plotting.rs | 117 +++++++++++++----- 2 files changed, 92 insertions(+), 41 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 9f033f4f15..f88f97d0e7 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -575,7 +575,7 @@ impl SingleDiskFarm { ) -> Result where NC: NodeClient, - PG: PieceGetter + Clone + Send + 'static, + PG: PieceGetter + Clone + Send + Sync + 'static, PosTable: Table, { let SingleDiskFarmOptions { @@ -850,7 +850,7 @@ impl SingleDiskFarm { let (start_sender, mut start_receiver) = broadcast::channel::<()>(1); let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1); let modifying_sector_index = Arc::>>::default(); - let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(0); + let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1); // Some sectors may already be plotted, skip them let sectors_indices_left_to_plot = metadata_header.plotted_sector_count..target_sector_count; @@ -937,7 +937,7 @@ impl SingleDiskFarm { let plotting_options = PlottingOptions { public_key, - node_client, + node_client: &node_client, pieces_in_sector, sector_size, sector_metadata_size, @@ -945,17 +945,17 @@ impl SingleDiskFarm { plot_file, metadata_file, sectors_metadata, - piece_getter, - kzg, - erasure_coding, + piece_getter: &piece_getter, + kzg: &kzg, + erasure_coding: &erasure_coding, handlers, modifying_sector_index, sectors_to_plot_receiver, downloading_semaphore, - encoding_semaphore, + encoding_semaphore: &encoding_semaphore, plotting_thread_pool, replotting_thread_pool, - stop_receiver: stop_receiver.resubscribe(), + stop_receiver: &mut stop_receiver.resubscribe(), }; let plotting_fut = plotting::<_, _, PosTable>(plotting_options); diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 072b37920c..150129f2ba 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -2,6 +2,7 @@ use crate::single_disk_farm::{ BackgroundTaskError, Handlers, PlotMetadataHeader, SectorPlottingDetails, RESERVED_PLOT_METADATA, }; +use crate::utils::AsyncJoinOnDrop; use crate::{node_client, NodeClient}; use async_lock::RwLock; use atomic::Atomic; @@ -27,15 +28,15 @@ use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::file_ext::FileExt; use subspace_farmer_components::plotting; use subspace_farmer_components::plotting::{ - download_sector, encode_sector, DownloadSectorOptions, EncodeSectorOptions, PieceGetter, - PieceGetterRetryPolicy, PlottedSector, + download_sector, encode_sector, DownloadSectorOptions, DownloadedSector, EncodeSectorOptions, + PieceGetter, PieceGetterRetryPolicy, PlottedSector, }; use subspace_farmer_components::sector::SectorMetadataChecksummed; use subspace_proof_of_space::Table; use thiserror::Error; use tokio::runtime::Handle; use tokio::sync::{broadcast, Semaphore}; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, info, trace, warn, Instrument}; const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500); /// Size of the cache of archived segments for the purposes of faster sector expiration checks. @@ -50,6 +51,7 @@ pub(super) struct SectorToPlot { /// Whether this is the last sector queued so far last_queued: bool, acknowledgement_sender: oneshot::Sender<()>, + next_segment_index_hint: Option, } /// Errors that happen during plotting @@ -91,11 +93,14 @@ pub enum PlottingError { /// Failed to create thread pool #[error("Failed to create thread pool: {0}")] FailedToCreateThreadPool(#[from] ThreadPoolBuildError), + /// Background downloading panicked + #[error("Background downloading panicked")] + BackgroundDownloadingPanicked, } -pub(super) struct PlottingOptions { +pub(super) struct PlottingOptions<'a, NC, PG> { pub(super) public_key: PublicKey, - pub(super) node_client: NC, + pub(super) node_client: &'a NC, pub(super) pieces_in_sector: u16, pub(super) sector_size: usize, pub(super) sector_metadata_size: usize, @@ -103,9 +108,9 @@ pub(super) struct PlottingOptions { pub(super) plot_file: Arc, pub(super) metadata_file: File, pub(super) sectors_metadata: Arc>>, - pub(super) piece_getter: PG, - pub(super) kzg: Kzg, - pub(super) erasure_coding: ErasureCoding, + pub(super) piece_getter: &'a PG, + pub(super) kzg: &'a Kzg, + pub(super) erasure_coding: &'a ErasureCoding, pub(super) handlers: Arc, pub(super) modifying_sector_index: Arc>>, pub(super) sectors_to_plot_receiver: mpsc::Receiver, @@ -114,10 +119,10 @@ pub(super) struct PlottingOptions { pub(crate) downloading_semaphore: Arc, /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically /// allow one permit at a time for efficient CPU utilization - pub(crate) encoding_semaphore: Arc, + pub(crate) encoding_semaphore: &'a Semaphore, pub(super) plotting_thread_pool: ThreadPool, pub(super) replotting_thread_pool: ThreadPool, - pub(super) stop_receiver: broadcast::Receiver<()>, + pub(super) stop_receiver: &'a mut broadcast::Receiver<()>, } /// Starts plotting process. @@ -125,11 +130,11 @@ pub(super) struct PlottingOptions { /// NOTE: Returned future is async, but does blocking operations and should be running in dedicated /// thread. pub(super) async fn plotting( - plotting_options: PlottingOptions, + plotting_options: PlottingOptions<'_, NC, PG>, ) -> Result<(), PlottingError> where NC: NodeClient, - PG: PieceGetter + Clone + Send + 'static, + PG: PieceGetter + Clone + Send + Sync + 'static, PosTable: Table, { let PlottingOptions { @@ -152,20 +157,23 @@ where encoding_semaphore, plotting_thread_pool, replotting_thread_pool, - mut stop_receiver, + stop_receiver, } = plotting_options; - let erasure_coding = &erasure_coding; - let encoding_semaphore = Some(encoding_semaphore.as_ref()); - let stop_receiver = &mut stop_receiver; let mut table_generator = PosTable::generator(); - // TODO: Concurrency + + let mut maybe_next_downloaded_sector_fut = + None::>>; while let Some(sector_to_plot) = sectors_to_plot_receiver.next().await { let SectorToPlot { sector_index, progress, last_queued, acknowledgement_sender: _acknowledgement_sender, + // TODO: Remove this hint once we have + // https://github.com/rust-lang/futures-rs/issues/2793 and can + // `sectors_to_plot_receiver.try_peek()` instead + next_segment_index_hint, } = sector_to_plot; trace!(%sector_index, "Preparing to plot sector"); @@ -218,19 +226,55 @@ where break farmer_app_info; }; - let downloaded_sector_fut = download_sector(DownloadSectorOptions { - public_key: &public_key, - sector_index, - piece_getter: &piece_getter, - piece_getter_retry_policy: PieceGetterRetryPolicy::Limited( - PIECE_GETTER_RETRY_NUMBER.get(), - ), - farmer_protocol_info: farmer_app_info.protocol_info, - kzg: &kzg, - pieces_in_sector, - downloading_semaphore: Some(Arc::clone(&downloading_semaphore)), - }); - let downloaded_sector = downloaded_sector_fut.await?; + let downloaded_sector = + if let Some(downloaded_sector_fut) = maybe_next_downloaded_sector_fut.take() { + downloaded_sector_fut + .await + .map_err(|_error| PlottingError::BackgroundDownloadingPanicked)?? + } else { + let downloaded_sector_fut = download_sector(DownloadSectorOptions { + public_key: &public_key, + sector_index, + piece_getter, + piece_getter_retry_policy: PieceGetterRetryPolicy::Limited( + PIECE_GETTER_RETRY_NUMBER.get(), + ), + farmer_protocol_info: farmer_app_info.protocol_info, + kzg, + pieces_in_sector, + downloading_semaphore: Some(Arc::clone(&downloading_semaphore)), + }); + downloaded_sector_fut.await? + }; + + // Initiate downloading of pieces for the next segment index if already known + if let Some(sector_index) = next_segment_index_hint { + let piece_getter = piece_getter.clone(); + let downloading_semaphore = Some(Arc::clone(&downloading_semaphore)); + let kzg = kzg.clone(); + + maybe_next_downloaded_sector_fut.replace(AsyncJoinOnDrop::new( + tokio::spawn( + async move { + let downloaded_sector_fut = download_sector(DownloadSectorOptions { + public_key: &public_key, + sector_index, + piece_getter: &piece_getter, + piece_getter_retry_policy: PieceGetterRetryPolicy::Limited( + PIECE_GETTER_RETRY_NUMBER.get(), + ), + farmer_protocol_info: farmer_app_info.protocol_info, + kzg: &kzg, + pieces_in_sector, + downloading_semaphore, + }); + downloaded_sector_fut.await + } + .in_current_span(), + ), + true, + )); + } // Inform others that this sector is being modified modifying_sector_index.write().await.replace(sector_index); @@ -253,7 +297,7 @@ where pieces_in_sector, sector_output: &mut sector, sector_metadata_output: &mut sector_metadata, - encoding_semaphore, + encoding_semaphore: Some(encoding_semaphore), table_generator: &mut table_generator, }, ); @@ -571,7 +615,8 @@ where NC: NodeClient, { // Finish initial plotting if some sectors were not plotted fully yet - for sector_index in sectors_indices_left_to_plot { + let mut sectors_indices_left_to_plot = sectors_indices_left_to_plot.into_iter().peekable(); + while let Some(sector_index) = sectors_indices_left_to_plot.next() { let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel(); if let Err(error) = sectors_to_plot_sender .send(SectorToPlot { @@ -579,6 +624,7 @@ where progress: sector_index as f32 / target_sector_count as f32 * 100.0, last_queued: sector_index + 1 == target_sector_count, acknowledgement_sender, + next_segment_index_hint: sectors_indices_left_to_plot.peek().copied(), }) .await { @@ -724,7 +770,9 @@ where } let sectors_queued = sector_indices_to_replot.len(); - for (index, sector_index) in sector_indices_to_replot.drain(..).enumerate() { + let mut sector_indices_to_replot = + sector_indices_to_replot.drain(..).enumerate().peekable(); + while let Some((index, sector_index)) = sector_indices_to_replot.next() { let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel(); if let Err(error) = sectors_to_plot_sender .send(SectorToPlot { @@ -732,6 +780,9 @@ where progress: index as f32 / sectors_queued as f32 * 100.0, last_queued: index + 1 == sectors_queued, acknowledgement_sender, + next_segment_index_hint: sector_indices_to_replot + .peek() + .map(|(_index, sector_index)| *sector_index), }) .await {