Skip to content

Commit

Permalink
Download next segment while previous segment is being encoded
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Oct 27, 2023
1 parent fe73d99 commit 8a39aed
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 41 deletions.
16 changes: 8 additions & 8 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ impl SingleDiskFarm {
) -> Result<Self, SingleDiskFarmError>
where
NC: NodeClient,
PG: PieceGetter + Clone + Send + 'static,
PG: PieceGetter + Clone + Send + Sync + 'static,
PosTable: Table,
{
let SingleDiskFarmOptions {
Expand Down Expand Up @@ -850,7 +850,7 @@ impl SingleDiskFarm {
let (start_sender, mut start_receiver) = broadcast::channel::<()>(1);
let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1);
let modifying_sector_index = Arc::<RwLock<Option<SectorIndex>>>::default();
let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(0);
let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1);
// Some sectors may already be plotted, skip them
let sectors_indices_left_to_plot =
metadata_header.plotted_sector_count..target_sector_count;
Expand Down Expand Up @@ -937,25 +937,25 @@ impl SingleDiskFarm {

let plotting_options = PlottingOptions {
public_key,
node_client,
node_client: &node_client,
pieces_in_sector,
sector_size,
sector_metadata_size,
metadata_header,
plot_file,
metadata_file,
sectors_metadata,
piece_getter,
kzg,
erasure_coding,
piece_getter: &piece_getter,
kzg: &kzg,
erasure_coding: &erasure_coding,
handlers,
modifying_sector_index,
sectors_to_plot_receiver,
downloading_semaphore,
encoding_semaphore,
encoding_semaphore: &encoding_semaphore,
plotting_thread_pool,
replotting_thread_pool,
stop_receiver: stop_receiver.resubscribe(),
stop_receiver: &mut stop_receiver.resubscribe(),
};

let plotting_fut = plotting::<_, _, PosTable>(plotting_options);
Expand Down
117 changes: 84 additions & 33 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::single_disk_farm::{
BackgroundTaskError, Handlers, PlotMetadataHeader, SectorPlottingDetails,
RESERVED_PLOT_METADATA,
};
use crate::utils::AsyncJoinOnDrop;
use crate::{node_client, NodeClient};
use async_lock::RwLock;
use atomic::Atomic;
Expand All @@ -27,15 +28,15 @@ use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::file_ext::FileExt;
use subspace_farmer_components::plotting;
use subspace_farmer_components::plotting::{
download_sector, encode_sector, DownloadSectorOptions, EncodeSectorOptions, PieceGetter,
PieceGetterRetryPolicy, PlottedSector,
download_sector, encode_sector, DownloadSectorOptions, DownloadedSector, EncodeSectorOptions,
PieceGetter, PieceGetterRetryPolicy, PlottedSector,
};
use subspace_farmer_components::sector::SectorMetadataChecksummed;
use subspace_proof_of_space::Table;
use thiserror::Error;
use tokio::runtime::Handle;
use tokio::sync::{broadcast, Semaphore};
use tracing::{debug, info, trace, warn};
use tracing::{debug, info, trace, warn, Instrument};

const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
/// Size of the cache of archived segments for the purposes of faster sector expiration checks.
Expand All @@ -50,6 +51,7 @@ pub(super) struct SectorToPlot {
/// Whether this is the last sector queued so far
last_queued: bool,
acknowledgement_sender: oneshot::Sender<()>,
next_segment_index_hint: Option<SectorIndex>,
}

/// Errors that happen during plotting
Expand Down Expand Up @@ -91,21 +93,24 @@ pub enum PlottingError {
/// Failed to create thread pool
#[error("Failed to create thread pool: {0}")]
FailedToCreateThreadPool(#[from] ThreadPoolBuildError),
/// Background downloading panicked
#[error("Background downloading panicked")]
BackgroundDownloadingPanicked,
}

pub(super) struct PlottingOptions<NC, PG> {
pub(super) struct PlottingOptions<'a, NC, PG> {
pub(super) public_key: PublicKey,
pub(super) node_client: NC,
pub(super) node_client: &'a NC,
pub(super) pieces_in_sector: u16,
pub(super) sector_size: usize,
pub(super) sector_metadata_size: usize,
pub(super) metadata_header: PlotMetadataHeader,
pub(super) plot_file: Arc<File>,
pub(super) metadata_file: File,
pub(super) sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
pub(super) piece_getter: PG,
pub(super) kzg: Kzg,
pub(super) erasure_coding: ErasureCoding,
pub(super) piece_getter: &'a PG,
pub(super) kzg: &'a Kzg,
pub(super) erasure_coding: &'a ErasureCoding,
pub(super) handlers: Arc<Handlers>,
pub(super) modifying_sector_index: Arc<RwLock<Option<SectorIndex>>>,
pub(super) sectors_to_plot_receiver: mpsc::Receiver<SectorToPlot>,
Expand All @@ -114,22 +119,22 @@ pub(super) struct PlottingOptions<NC, PG> {
pub(crate) downloading_semaphore: Arc<Semaphore>,
/// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically
/// allow one permit at a time for efficient CPU utilization
pub(crate) encoding_semaphore: Arc<Semaphore>,
pub(crate) encoding_semaphore: &'a Semaphore,
pub(super) plotting_thread_pool: ThreadPool,
pub(super) replotting_thread_pool: ThreadPool,
pub(super) stop_receiver: broadcast::Receiver<()>,
pub(super) stop_receiver: &'a mut broadcast::Receiver<()>,
}

/// Starts plotting process.
///
/// NOTE: Returned future is async, but does blocking operations and should be running in dedicated
/// thread.
pub(super) async fn plotting<NC, PG, PosTable>(
plotting_options: PlottingOptions<NC, PG>,
plotting_options: PlottingOptions<'_, NC, PG>,
) -> Result<(), PlottingError>
where
NC: NodeClient,
PG: PieceGetter + Clone + Send + 'static,
PG: PieceGetter + Clone + Send + Sync + 'static,
PosTable: Table,
{
let PlottingOptions {
Expand All @@ -152,20 +157,23 @@ where
encoding_semaphore,
plotting_thread_pool,
replotting_thread_pool,
mut stop_receiver,
stop_receiver,
} = plotting_options;
let erasure_coding = &erasure_coding;
let encoding_semaphore = Some(encoding_semaphore.as_ref());
let stop_receiver = &mut stop_receiver;

let mut table_generator = PosTable::generator();
// TODO: Concurrency

let mut maybe_next_downloaded_sector_fut =
None::<AsyncJoinOnDrop<Result<DownloadedSector, plotting::PlottingError>>>;
while let Some(sector_to_plot) = sectors_to_plot_receiver.next().await {
let SectorToPlot {
sector_index,
progress,
last_queued,
acknowledgement_sender: _acknowledgement_sender,
// TODO: Remove this hint once we have
// https://github.com/rust-lang/futures-rs/issues/2793 and can
// `sectors_to_plot_receiver.try_peek()` instead
next_segment_index_hint,
} = sector_to_plot;
trace!(%sector_index, "Preparing to plot sector");

Expand Down Expand Up @@ -218,19 +226,55 @@ where
break farmer_app_info;
};

let downloaded_sector_fut = download_sector(DownloadSectorOptions {
public_key: &public_key,
sector_index,
piece_getter: &piece_getter,
piece_getter_retry_policy: PieceGetterRetryPolicy::Limited(
PIECE_GETTER_RETRY_NUMBER.get(),
),
farmer_protocol_info: farmer_app_info.protocol_info,
kzg: &kzg,
pieces_in_sector,
downloading_semaphore: Some(Arc::clone(&downloading_semaphore)),
});
let downloaded_sector = downloaded_sector_fut.await?;
let downloaded_sector =
if let Some(downloaded_sector_fut) = maybe_next_downloaded_sector_fut.take() {
downloaded_sector_fut
.await
.map_err(|_error| PlottingError::BackgroundDownloadingPanicked)??
} else {
let downloaded_sector_fut = download_sector(DownloadSectorOptions {
public_key: &public_key,
sector_index,
piece_getter,
piece_getter_retry_policy: PieceGetterRetryPolicy::Limited(
PIECE_GETTER_RETRY_NUMBER.get(),
),
farmer_protocol_info: farmer_app_info.protocol_info,
kzg,
pieces_in_sector,
downloading_semaphore: Some(Arc::clone(&downloading_semaphore)),
});
downloaded_sector_fut.await?
};

// Initiate downloading of pieces for the next segment index if already known
if let Some(sector_index) = next_segment_index_hint {
let piece_getter = piece_getter.clone();
let downloading_semaphore = Some(Arc::clone(&downloading_semaphore));
let kzg = kzg.clone();

maybe_next_downloaded_sector_fut.replace(AsyncJoinOnDrop::new(
tokio::spawn(
async move {
let downloaded_sector_fut = download_sector(DownloadSectorOptions {
public_key: &public_key,
sector_index,
piece_getter: &piece_getter,
piece_getter_retry_policy: PieceGetterRetryPolicy::Limited(
PIECE_GETTER_RETRY_NUMBER.get(),
),
farmer_protocol_info: farmer_app_info.protocol_info,
kzg: &kzg,
pieces_in_sector,
downloading_semaphore,
});
downloaded_sector_fut.await
}
.in_current_span(),
),
true,
));
}

// Inform others that this sector is being modified
modifying_sector_index.write().await.replace(sector_index);
Expand All @@ -253,7 +297,7 @@ where
pieces_in_sector,
sector_output: &mut sector,
sector_metadata_output: &mut sector_metadata,
encoding_semaphore,
encoding_semaphore: Some(encoding_semaphore),
table_generator: &mut table_generator,
},
);
Expand Down Expand Up @@ -571,14 +615,16 @@ where
NC: NodeClient,
{
// Finish initial plotting if some sectors were not plotted fully yet
for sector_index in sectors_indices_left_to_plot {
let mut sectors_indices_left_to_plot = sectors_indices_left_to_plot.into_iter().peekable();
while let Some(sector_index) = sectors_indices_left_to_plot.next() {
let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
if let Err(error) = sectors_to_plot_sender
.send(SectorToPlot {
sector_index,
progress: sector_index as f32 / target_sector_count as f32 * 100.0,
last_queued: sector_index + 1 == target_sector_count,
acknowledgement_sender,
next_segment_index_hint: sectors_indices_left_to_plot.peek().copied(),
})
.await
{
Expand Down Expand Up @@ -724,14 +770,19 @@ where
}

let sectors_queued = sector_indices_to_replot.len();
for (index, sector_index) in sector_indices_to_replot.drain(..).enumerate() {
let mut sector_indices_to_replot =
sector_indices_to_replot.drain(..).enumerate().peekable();
while let Some((index, sector_index)) = sector_indices_to_replot.next() {
let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
if let Err(error) = sectors_to_plot_sender
.send(SectorToPlot {
sector_index,
progress: index as f32 / sectors_queued as f32 * 100.0,
last_queued: index + 1 == sectors_queued,
acknowledgement_sender,
next_segment_index_hint: sector_indices_to_replot
.peek()
.map(|(_index, sector_index)| *sector_index),
})
.await
{
Expand Down

0 comments on commit 8a39aed

Please sign in to comment.