From c6ebc158057f140b8d959c70943d34ef8679591c Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 26 Oct 2023 17:42:31 +0300 Subject: [PATCH 1/7] Handle permit acquisition error in `plot_sector` --- crates/subspace-farmer-components/src/plotting.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index 718047d4db..08b10db9eb 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -26,7 +26,7 @@ use subspace_core_primitives::{ use subspace_erasure_coding::ErasureCoding; use subspace_proof_of_space::{Table, TableGenerator}; use thiserror::Error; -use tokio::sync::Semaphore; +use tokio::sync::{AcquireError, Semaphore}; use tokio::task::yield_now; use tracing::{debug, trace, warn}; @@ -151,6 +151,13 @@ pub enum PlottingError { /// Lower-level error error: Box, }, + /// Failed to acquire permit + #[error("Failed to acquire permit: {error}")] + FailedToAcquirePermit { + /// Lower-level error + #[from] + error: AcquireError, + }, } /// Options for plotting a sector. @@ -244,7 +251,7 @@ where } let _downloading_permit = match downloading_semaphore { - Some(downloading_semaphore) => Some(downloading_semaphore.acquire().await), + Some(downloading_semaphore) => Some(downloading_semaphore.acquire().await?), None => None, }; @@ -311,7 +318,7 @@ where let mut raw_sector = raw_sector.into_inner(); let _encoding_permit = match encoding_semaphore { - Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await), + Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await?), None => None, }; From 4957fb4f560826d928df2e70749b5ebc2d6dd14f Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 26 Oct 2023 17:53:36 +0300 Subject: [PATCH 2/7] Extract public `download_sector` function now used by `plot_sector` for future reusability (the change is just a refactoring, no logic changes) --- .../src/plotting.rs | 211 ++++++++++++------ 1 file changed, 142 insertions(+), 69 deletions(-) diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index 08b10db9eb..d3d29121f7 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -26,7 +26,7 @@ use subspace_core_primitives::{ use subspace_erasure_coding::ErasureCoding; use subspace_proof_of_space::{Table, TableGenerator}; use thiserror::Error; -use tokio::sync::{AcquireError, Semaphore}; +use tokio::sync::{AcquireError, Semaphore, SemaphorePermit}; use tokio::task::yield_now; use tracing::{debug, trace, warn}; @@ -250,72 +250,22 @@ where }); } - let _downloading_permit = match downloading_semaphore { - Some(downloading_semaphore) => Some(downloading_semaphore.acquire().await?), - None => None, - }; - - let sector_id = SectorId::new(public_key.hash(), sector_index); - - let piece_indexes: Vec = (PieceOffset::ZERO..) - .take(pieces_in_sector.into()) - .map(|piece_offset| { - sector_id.derive_piece_index( - piece_offset, - farmer_protocol_info.history_size, - farmer_protocol_info.max_pieces_in_sector, - farmer_protocol_info.recent_segments, - farmer_protocol_info.recent_history_fraction, - ) - }) - .collect(); - - // TODO: Downloading and encoding below can happen in parallel, but a bit tricky to implement - // due to sync/async pairing - - let raw_sector = Mutex::new(RawSector::new(pieces_in_sector)); - - { - // This list will be mutated, replacing pieces we have already processed with `None` - let incremental_piece_indices = - Mutex::new(piece_indexes.iter().copied().map(Some).collect::>()); - - retry(default_backoff(), || async { - let mut raw_sector = raw_sector.lock().await; - let mut incremental_piece_indices = incremental_piece_indices.lock().await; - - if let Err(error) = download_sector( - &mut raw_sector, - piece_getter, - piece_getter_retry_policy, - kzg, - &mut incremental_piece_indices, - ) - .await - { - let retrieved_pieces = incremental_piece_indices - .iter() - .filter(|maybe_piece_index| maybe_piece_index.is_none()) - .count(); - warn!( - %sector_index, - %error, - %pieces_in_sector, - %retrieved_pieces, - "Sector plotting attempt failed, will retry later" - ); - - return Err(BackoffError::transient(error)); - } - - debug!(%sector_index, "Sector downloaded successfully"); - - Ok(()) - }) - .await?; - } - - let mut raw_sector = raw_sector.into_inner(); + let download_sector_fut = download_sector(DownloadSectorOptions { + public_key, + sector_index, + piece_getter, + piece_getter_retry_policy, + farmer_protocol_info, + kzg, + pieces_in_sector, + downloading_semaphore, + }); + let DownloadedSector { + sector_id, + piece_indices, + mut raw_sector, + downloading_permit: _downloading_permit, + } = download_sector_fut.await?; let _encoding_permit = match encoding_semaphore { Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await?), @@ -491,11 +441,134 @@ where sector_id, sector_index, sector_metadata, - piece_indexes, + piece_indexes: piece_indices, + }) +} + +/// Opaque sector downloaded and ready for encoding +pub struct DownloadedSector<'a> { + sector_id: SectorId, + piece_indices: Vec, + raw_sector: RawSector, + downloading_permit: Option>, +} + +/// Options for sector downloading +pub struct DownloadSectorOptions<'a, PG> { + /// Public key corresponding to sector + pub public_key: &'a PublicKey, + /// Sector index + pub sector_index: SectorIndex, + /// Getter for pieces of archival history + pub piece_getter: &'a PG, + /// Retry policy for piece getter + pub piece_getter_retry_policy: PieceGetterRetryPolicy, + /// Farmer protocol info + pub farmer_protocol_info: &'a FarmerProtocolInfo, + /// KZG instance + pub kzg: &'a Kzg, + /// How many pieces should sector contain + pub pieces_in_sector: u16, + /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory + /// usage of the plotting process, permit will be held until the end of the plotting process + pub downloading_semaphore: Option<&'a Semaphore>, +} + +/// Download sector for plotting. +/// +/// This will identify necessary pieces and download them from DSN, after which they can be encoded +/// and written to the plot. +pub async fn download_sector( + options: DownloadSectorOptions<'_, PG>, +) -> Result, PlottingError> +where + PG: PieceGetter, +{ + let DownloadSectorOptions { + public_key, + sector_index, + piece_getter, + piece_getter_retry_policy, + farmer_protocol_info, + kzg, + pieces_in_sector, + downloading_semaphore, + } = options; + + let downloading_permit = match downloading_semaphore { + Some(downloading_semaphore) => Some(downloading_semaphore.acquire().await?), + None => None, + }; + + let sector_id = SectorId::new(public_key.hash(), sector_index); + + let piece_indices = (PieceOffset::ZERO..) + .take(pieces_in_sector.into()) + .map(|piece_offset| { + sector_id.derive_piece_index( + piece_offset, + farmer_protocol_info.history_size, + farmer_protocol_info.max_pieces_in_sector, + farmer_protocol_info.recent_segments, + farmer_protocol_info.recent_history_fraction, + ) + }) + .collect::>(); + + // TODO: Downloading and encoding below can happen in parallel, but a bit tricky to implement + // due to sync/async pairing + + let raw_sector = Mutex::new(RawSector::new(pieces_in_sector)); + + { + // This list will be mutated, replacing pieces we have already processed with `None` + let incremental_piece_indices = + Mutex::new(piece_indices.iter().copied().map(Some).collect::>()); + + retry(default_backoff(), || async { + let mut raw_sector = raw_sector.lock().await; + let mut incremental_piece_indices = incremental_piece_indices.lock().await; + + if let Err(error) = download_sector_internal( + &mut raw_sector, + piece_getter, + piece_getter_retry_policy, + kzg, + &mut incremental_piece_indices, + ) + .await + { + let retrieved_pieces = incremental_piece_indices + .iter() + .filter(|maybe_piece_index| maybe_piece_index.is_none()) + .count(); + warn!( + %sector_index, + %error, + %pieces_in_sector, + %retrieved_pieces, + "Sector plotting attempt failed, will retry later" + ); + + return Err(BackoffError::transient(error)); + } + + debug!(%sector_index, "Sector downloaded successfully"); + + Ok(()) + }) + .await?; + } + + Ok(DownloadedSector { + sector_id, + piece_indices, + raw_sector: raw_sector.into_inner(), + downloading_permit, }) } -async fn download_sector( +async fn download_sector_internal( raw_sector: &mut RawSector, piece_getter: &PG, piece_getter_retry_policy: PieceGetterRetryPolicy, From fb4d2cc6aca8d0c9bebf2309ae6eeefab7eb758a Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Fri, 27 Oct 2023 12:36:52 +0300 Subject: [PATCH 3/7] Extract public `encode_sector` function now used by `plot_sector` for future reusability (the change is just a refactoring, no logic changes) --- .../src/plotting.rs | 345 +++++++++++------- 1 file changed, 205 insertions(+), 140 deletions(-) diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index d3d29121f7..a82dbcbffe 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -203,6 +203,8 @@ where /// Plot a single sector. /// +/// This is a convenient wrapper around [`download_sector`] and [`encode_sector`] functions. +/// /// NOTE: Even though this function is async, it has blocking code inside and must be running in a /// separate thread in order to prevent blocking an executor. pub async fn plot_sector( @@ -228,6 +230,209 @@ where table_generator, } = options; + let download_sector_fut = download_sector(DownloadSectorOptions { + public_key, + sector_index, + piece_getter, + piece_getter_retry_policy, + farmer_protocol_info, + kzg, + pieces_in_sector, + downloading_semaphore, + }); + + encode_sector( + download_sector_fut.await?, + EncodeSectorOptions:: { + sector_index, + erasure_coding, + pieces_in_sector, + sector_output, + sector_metadata_output, + encoding_semaphore, + table_generator, + }, + ) + .await +} + +/// Opaque sector downloaded and ready for encoding +pub struct DownloadedSector<'a> { + sector_id: SectorId, + piece_indices: Vec, + raw_sector: RawSector, + farmer_protocol_info: &'a FarmerProtocolInfo, + downloading_permit: Option>, +} + +/// Options for sector downloading +pub struct DownloadSectorOptions<'a, PG> { + /// Public key corresponding to sector + pub public_key: &'a PublicKey, + /// Sector index + pub sector_index: SectorIndex, + /// Getter for pieces of archival history + pub piece_getter: &'a PG, + /// Retry policy for piece getter + pub piece_getter_retry_policy: PieceGetterRetryPolicy, + /// Farmer protocol info + pub farmer_protocol_info: &'a FarmerProtocolInfo, + /// KZG instance + pub kzg: &'a Kzg, + /// How many pieces should sector contain + pub pieces_in_sector: u16, + /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory + /// usage of the plotting process, permit will be held until the end of the plotting process + pub downloading_semaphore: Option<&'a Semaphore>, +} + +/// Download sector for plotting. +/// +/// This will identify necessary pieces and download them from DSN, after which they can be encoded +/// and written to the plot. +pub async fn download_sector( + options: DownloadSectorOptions<'_, PG>, +) -> Result, PlottingError> +where + PG: PieceGetter, +{ + let DownloadSectorOptions { + public_key, + sector_index, + piece_getter, + piece_getter_retry_policy, + farmer_protocol_info, + kzg, + pieces_in_sector, + downloading_semaphore, + } = options; + + let downloading_permit = match downloading_semaphore { + Some(downloading_semaphore) => Some(downloading_semaphore.acquire().await?), + None => None, + }; + + let sector_id = SectorId::new(public_key.hash(), sector_index); + + let piece_indices = (PieceOffset::ZERO..) + .take(pieces_in_sector.into()) + .map(|piece_offset| { + sector_id.derive_piece_index( + piece_offset, + farmer_protocol_info.history_size, + farmer_protocol_info.max_pieces_in_sector, + farmer_protocol_info.recent_segments, + farmer_protocol_info.recent_history_fraction, + ) + }) + .collect::>(); + + // TODO: Downloading and encoding below can happen in parallel, but a bit tricky to implement + // due to sync/async pairing + + let raw_sector = Mutex::new(RawSector::new(pieces_in_sector)); + + { + // This list will be mutated, replacing pieces we have already processed with `None` + let incremental_piece_indices = + Mutex::new(piece_indices.iter().copied().map(Some).collect::>()); + + retry(default_backoff(), || async { + let mut raw_sector = raw_sector.lock().await; + let mut incremental_piece_indices = incremental_piece_indices.lock().await; + + if let Err(error) = download_sector_internal( + &mut raw_sector, + piece_getter, + piece_getter_retry_policy, + kzg, + &mut incremental_piece_indices, + ) + .await + { + let retrieved_pieces = incremental_piece_indices + .iter() + .filter(|maybe_piece_index| maybe_piece_index.is_none()) + .count(); + warn!( + %sector_index, + %error, + %pieces_in_sector, + %retrieved_pieces, + "Sector plotting attempt failed, will retry later" + ); + + return Err(BackoffError::transient(error)); + } + + debug!(%sector_index, "Sector downloaded successfully"); + + Ok(()) + }) + .await?; + } + + Ok(DownloadedSector { + sector_id, + piece_indices, + raw_sector: raw_sector.into_inner(), + farmer_protocol_info, + downloading_permit, + }) +} + +/// Options for encoding a sector. +/// +/// Sector output and sector metadata output should be either empty (in which case they'll be +/// resized to correct size automatically) or correctly sized from the beginning or else error will +/// be returned. +pub struct EncodeSectorOptions<'a, PosTable> +where + PosTable: Table, +{ + /// Sector index + pub sector_index: SectorIndex, + /// Erasure coding instance + pub erasure_coding: &'a ErasureCoding, + /// How many pieces should sector contain + pub pieces_in_sector: u16, + /// Where plotted sector should be written, vector must either be empty (in which case it'll be + /// resized to correct size automatically) or correctly sized from the beginning + pub sector_output: &'a mut Vec, + /// Where plotted sector metadata should be written, vector must either be empty (in which case + /// it'll be resized to correct size automatically) or correctly sized from the beginning + pub sector_metadata_output: &'a mut Vec, + /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically + /// allow one permit at a time for efficient CPU utilization + pub encoding_semaphore: Option<&'a Semaphore>, + /// Proof of space table generator + pub table_generator: &'a mut PosTable::Generator, +} + +pub async fn encode_sector( + downloaded_sector: DownloadedSector<'_>, + encoding_options: EncodeSectorOptions<'_, PosTable>, +) -> Result +where + PosTable: Table, +{ + let DownloadedSector { + sector_id, + piece_indices, + mut raw_sector, + farmer_protocol_info, + downloading_permit: _downloading_permit, + } = downloaded_sector; + let EncodeSectorOptions { + sector_index, + erasure_coding, + pieces_in_sector, + sector_output, + sector_metadata_output, + encoding_semaphore, + table_generator, + } = encoding_options; + if erasure_coding.max_shards() < Record::NUM_S_BUCKETS { return Err(PlottingError::InvalidErasureCodingInstance); } @@ -250,23 +455,6 @@ where }); } - let download_sector_fut = download_sector(DownloadSectorOptions { - public_key, - sector_index, - piece_getter, - piece_getter_retry_policy, - farmer_protocol_info, - kzg, - pieces_in_sector, - downloading_semaphore, - }); - let DownloadedSector { - sector_id, - piece_indices, - mut raw_sector, - downloading_permit: _downloading_permit, - } = download_sector_fut.await?; - let _encoding_permit = match encoding_semaphore { Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await?), None => None, @@ -445,129 +633,6 @@ where }) } -/// Opaque sector downloaded and ready for encoding -pub struct DownloadedSector<'a> { - sector_id: SectorId, - piece_indices: Vec, - raw_sector: RawSector, - downloading_permit: Option>, -} - -/// Options for sector downloading -pub struct DownloadSectorOptions<'a, PG> { - /// Public key corresponding to sector - pub public_key: &'a PublicKey, - /// Sector index - pub sector_index: SectorIndex, - /// Getter for pieces of archival history - pub piece_getter: &'a PG, - /// Retry policy for piece getter - pub piece_getter_retry_policy: PieceGetterRetryPolicy, - /// Farmer protocol info - pub farmer_protocol_info: &'a FarmerProtocolInfo, - /// KZG instance - pub kzg: &'a Kzg, - /// How many pieces should sector contain - pub pieces_in_sector: u16, - /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory - /// usage of the plotting process, permit will be held until the end of the plotting process - pub downloading_semaphore: Option<&'a Semaphore>, -} - -/// Download sector for plotting. -/// -/// This will identify necessary pieces and download them from DSN, after which they can be encoded -/// and written to the plot. -pub async fn download_sector( - options: DownloadSectorOptions<'_, PG>, -) -> Result, PlottingError> -where - PG: PieceGetter, -{ - let DownloadSectorOptions { - public_key, - sector_index, - piece_getter, - piece_getter_retry_policy, - farmer_protocol_info, - kzg, - pieces_in_sector, - downloading_semaphore, - } = options; - - let downloading_permit = match downloading_semaphore { - Some(downloading_semaphore) => Some(downloading_semaphore.acquire().await?), - None => None, - }; - - let sector_id = SectorId::new(public_key.hash(), sector_index); - - let piece_indices = (PieceOffset::ZERO..) - .take(pieces_in_sector.into()) - .map(|piece_offset| { - sector_id.derive_piece_index( - piece_offset, - farmer_protocol_info.history_size, - farmer_protocol_info.max_pieces_in_sector, - farmer_protocol_info.recent_segments, - farmer_protocol_info.recent_history_fraction, - ) - }) - .collect::>(); - - // TODO: Downloading and encoding below can happen in parallel, but a bit tricky to implement - // due to sync/async pairing - - let raw_sector = Mutex::new(RawSector::new(pieces_in_sector)); - - { - // This list will be mutated, replacing pieces we have already processed with `None` - let incremental_piece_indices = - Mutex::new(piece_indices.iter().copied().map(Some).collect::>()); - - retry(default_backoff(), || async { - let mut raw_sector = raw_sector.lock().await; - let mut incremental_piece_indices = incremental_piece_indices.lock().await; - - if let Err(error) = download_sector_internal( - &mut raw_sector, - piece_getter, - piece_getter_retry_policy, - kzg, - &mut incremental_piece_indices, - ) - .await - { - let retrieved_pieces = incremental_piece_indices - .iter() - .filter(|maybe_piece_index| maybe_piece_index.is_none()) - .count(); - warn!( - %sector_index, - %error, - %pieces_in_sector, - %retrieved_pieces, - "Sector plotting attempt failed, will retry later" - ); - - return Err(BackoffError::transient(error)); - } - - debug!(%sector_index, "Sector downloaded successfully"); - - Ok(()) - }) - .await?; - } - - Ok(DownloadedSector { - sector_id, - piece_indices, - raw_sector: raw_sector.into_inner(), - downloading_permit, - }) -} - async fn download_sector_internal( raw_sector: &mut RawSector, piece_getter: &PG, From ec40dffcf9b589b1741e315a964a11dfb679c434 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Fri, 27 Oct 2023 15:14:58 +0300 Subject: [PATCH 4/7] Use new separate functions to download and to encode plotted sector --- .../src/single_disk_farm/plotting.rs | 49 +++++++++++-------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 3b6146f6a4..785cf6c9c2 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -27,7 +27,8 @@ use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::file_ext::FileExt; use subspace_farmer_components::plotting; use subspace_farmer_components::plotting::{ - plot_sector, PieceGetter, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, + download_sector, encode_sector, DownloadSectorOptions, EncodeSectorOptions, PieceGetter, + PieceGetterRetryPolicy, PlottedSector, }; use subspace_farmer_components::sector::SectorMetadataChecksummed; use subspace_proof_of_space::Table; @@ -214,6 +215,20 @@ where break farmer_app_info; }; + let downloaded_sector_fut = download_sector(DownloadSectorOptions { + public_key: &public_key, + sector_index, + piece_getter: &piece_getter, + piece_getter_retry_policy: PieceGetterRetryPolicy::Limited( + PIECE_GETTER_RETRY_NUMBER.get(), + ), + farmer_protocol_info: &farmer_app_info.protocol_info, + kzg: &kzg, + pieces_in_sector, + downloading_semaphore: Some(&downloading_semaphore), + }); + let downloaded_sector = downloaded_sector_fut.await?; + // Inform others that this sector is being modified modifying_sector_index.write().await.replace(sector_index); @@ -225,32 +240,24 @@ where let mut sector = Vec::new(); let mut sector_metadata = Vec::new(); - let piece_getter = piece_getter.clone(); - let kzg = kzg.clone(); let erasure_coding = erasure_coding.clone(); - let downloading_semaphore = Arc::clone(&downloading_semaphore); let encoding_semaphore = Arc::clone(&encoding_semaphore); let mut stop_receiver = stop_receiver.resubscribe(); let plotting_fn = move || { tokio::task::block_in_place(move || { - let plot_sector_fut = plot_sector::(PlotSectorOptions { - public_key: &public_key, - sector_index, - piece_getter: &piece_getter, - piece_getter_retry_policy: PieceGetterRetryPolicy::Limited( - PIECE_GETTER_RETRY_NUMBER.get(), - ), - farmer_protocol_info: &farmer_app_info.protocol_info, - kzg: &kzg, - erasure_coding: &erasure_coding, - pieces_in_sector, - sector_output: &mut sector, - sector_metadata_output: &mut sector_metadata, - downloading_semaphore: Some(&downloading_semaphore), - encoding_semaphore: Some(&encoding_semaphore), - table_generator: &mut table_generator, - }); + let plot_sector_fut = encode_sector::( + downloaded_sector, + EncodeSectorOptions { + sector_index, + erasure_coding: &erasure_coding, + pieces_in_sector, + sector_output: &mut sector, + sector_metadata_output: &mut sector_metadata, + encoding_semaphore: Some(&encoding_semaphore), + table_generator: &mut table_generator, + }, + ); let plotted_sector = Handle::current().block_on(async { select! { From 199ee341f836d72e20f9f5199783f55251a5e40c Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Fri, 27 Oct 2023 15:23:09 +0300 Subject: [PATCH 5/7] Refactor `subspace-farmer-components` API a bit for future changes --- crates/pallet-subspace/src/mock.rs | 2 +- .../benches/auditing.rs | 2 +- .../benches/plotting.rs | 2 +- .../benches/proving.rs | 2 +- .../benches/reading.rs | 2 +- .../src/plotting.rs | 22 +++++++++---------- .../src/single_disk_farm/plotting.rs | 4 ++-- test/subspace-test-client/src/lib.rs | 2 +- 8 files changed, 19 insertions(+), 19 deletions(-) diff --git a/crates/pallet-subspace/src/mock.rs b/crates/pallet-subspace/src/mock.rs index 1285c0704e..aec709821e 100644 --- a/crates/pallet-subspace/src/mock.rs +++ b/crates/pallet-subspace/src/mock.rs @@ -461,7 +461,7 @@ pub fn create_signed_vote( sector_index, piece_getter: archived_history_segment, piece_getter_retry_policy: PieceGetterRetryPolicy::default(), - farmer_protocol_info: &farmer_protocol_info, + farmer_protocol_info, kzg, erasure_coding, pieces_in_sector, diff --git a/crates/subspace-farmer-components/benches/auditing.rs b/crates/subspace-farmer-components/benches/auditing.rs index 5d650cfba4..7d13f6d617 100644 --- a/crates/subspace-farmer-components/benches/auditing.rs +++ b/crates/subspace-farmer-components/benches/auditing.rs @@ -122,7 +122,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { sector_index, piece_getter: &archived_history_segment, piece_getter_retry_policy: PieceGetterRetryPolicy::default(), - farmer_protocol_info: &farmer_protocol_info, + farmer_protocol_info, kzg: &kzg, erasure_coding: &erasure_coding, pieces_in_sector, diff --git a/crates/subspace-farmer-components/benches/plotting.rs b/crates/subspace-farmer-components/benches/plotting.rs index e5d305773b..ef8c9c6899 100644 --- a/crates/subspace-farmer-components/benches/plotting.rs +++ b/crates/subspace-farmer-components/benches/plotting.rs @@ -73,7 +73,7 @@ fn criterion_benchmark(c: &mut Criterion) { sector_index: black_box(sector_index), piece_getter: black_box(&archived_history_segment), piece_getter_retry_policy: black_box(PieceGetterRetryPolicy::default()), - farmer_protocol_info: black_box(&farmer_protocol_info), + farmer_protocol_info: black_box(farmer_protocol_info), kzg: black_box(&kzg), erasure_coding: black_box(&erasure_coding), pieces_in_sector: black_box(pieces_in_sector), diff --git a/crates/subspace-farmer-components/benches/proving.rs b/crates/subspace-farmer-components/benches/proving.rs index d392907fcb..97f6b9543d 100644 --- a/crates/subspace-farmer-components/benches/proving.rs +++ b/crates/subspace-farmer-components/benches/proving.rs @@ -129,7 +129,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { sector_index, piece_getter: &archived_history_segment, piece_getter_retry_policy: PieceGetterRetryPolicy::default(), - farmer_protocol_info: &farmer_protocol_info, + farmer_protocol_info, kzg, erasure_coding, pieces_in_sector, diff --git a/crates/subspace-farmer-components/benches/reading.rs b/crates/subspace-farmer-components/benches/reading.rs index 00cd4019ea..64174421b5 100644 --- a/crates/subspace-farmer-components/benches/reading.rs +++ b/crates/subspace-farmer-components/benches/reading.rs @@ -122,7 +122,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { sector_index, piece_getter: &archived_history_segment, piece_getter_retry_policy: PieceGetterRetryPolicy::default(), - farmer_protocol_info: &farmer_protocol_info, + farmer_protocol_info, kzg: &kzg, erasure_coding: &erasure_coding, pieces_in_sector, diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index a82dbcbffe..9cf7695504 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -26,7 +26,7 @@ use subspace_core_primitives::{ use subspace_erasure_coding::ErasureCoding; use subspace_proof_of_space::{Table, TableGenerator}; use thiserror::Error; -use tokio::sync::{AcquireError, Semaphore, SemaphorePermit}; +use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore}; use tokio::task::yield_now; use tracing::{debug, trace, warn}; @@ -178,7 +178,7 @@ where /// Retry policy for piece getter pub piece_getter_retry_policy: PieceGetterRetryPolicy, /// Farmer protocol info - pub farmer_protocol_info: &'a FarmerProtocolInfo, + pub farmer_protocol_info: FarmerProtocolInfo, /// KZG instance pub kzg: &'a Kzg, /// Erasure coding instance @@ -193,7 +193,7 @@ where pub sector_metadata_output: &'a mut Vec, /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory /// usage of the plotting process, permit will be held until the end of the plotting process - pub downloading_semaphore: Option<&'a Semaphore>, + pub downloading_semaphore: Option>, /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically /// allow one permit at a time for efficient CPU utilization pub encoding_semaphore: Option<&'a Semaphore>, @@ -257,12 +257,12 @@ where } /// Opaque sector downloaded and ready for encoding -pub struct DownloadedSector<'a> { +pub struct DownloadedSector { sector_id: SectorId, piece_indices: Vec, raw_sector: RawSector, - farmer_protocol_info: &'a FarmerProtocolInfo, - downloading_permit: Option>, + farmer_protocol_info: FarmerProtocolInfo, + downloading_permit: Option, } /// Options for sector downloading @@ -276,14 +276,14 @@ pub struct DownloadSectorOptions<'a, PG> { /// Retry policy for piece getter pub piece_getter_retry_policy: PieceGetterRetryPolicy, /// Farmer protocol info - pub farmer_protocol_info: &'a FarmerProtocolInfo, + pub farmer_protocol_info: FarmerProtocolInfo, /// KZG instance pub kzg: &'a Kzg, /// How many pieces should sector contain pub pieces_in_sector: u16, /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory /// usage of the plotting process, permit will be held until the end of the plotting process - pub downloading_semaphore: Option<&'a Semaphore>, + pub downloading_semaphore: Option>, } /// Download sector for plotting. @@ -292,7 +292,7 @@ pub struct DownloadSectorOptions<'a, PG> { /// and written to the plot. pub async fn download_sector( options: DownloadSectorOptions<'_, PG>, -) -> Result, PlottingError> +) -> Result where PG: PieceGetter, { @@ -308,7 +308,7 @@ where } = options; let downloading_permit = match downloading_semaphore { - Some(downloading_semaphore) => Some(downloading_semaphore.acquire().await?), + Some(downloading_semaphore) => Some(downloading_semaphore.acquire_owned().await?), None => None, }; @@ -410,7 +410,7 @@ where } pub async fn encode_sector( - downloaded_sector: DownloadedSector<'_>, + downloaded_sector: DownloadedSector, encoding_options: EncodeSectorOptions<'_, PosTable>, ) -> Result where diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 785cf6c9c2..87da170cac 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -222,10 +222,10 @@ where piece_getter_retry_policy: PieceGetterRetryPolicy::Limited( PIECE_GETTER_RETRY_NUMBER.get(), ), - farmer_protocol_info: &farmer_app_info.protocol_info, + farmer_protocol_info: farmer_app_info.protocol_info, kzg: &kzg, pieces_in_sector, - downloading_semaphore: Some(&downloading_semaphore), + downloading_semaphore: Some(Arc::clone(&downloading_semaphore)), }); let downloaded_sector = downloaded_sector_fut.await?; diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index b1fc434823..4226fda030 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -259,7 +259,7 @@ where sector_index, piece_getter: &archived_segment.pieces, piece_getter_retry_policy: PieceGetterRetryPolicy::default(), - farmer_protocol_info: &farmer_protocol_info, + farmer_protocol_info, kzg: &kzg, erasure_coding, pieces_in_sector, From fe73d99a90c50d79888753629a8b46168e755259 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Fri, 27 Oct 2023 16:07:26 +0300 Subject: [PATCH 6/7] Simplify plotting function without extra `move`s --- .../src/single_disk_farm/plotting.rs | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 87da170cac..072b37920c 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -152,8 +152,11 @@ where encoding_semaphore, plotting_thread_pool, replotting_thread_pool, - stop_receiver, + mut stop_receiver, } = plotting_options; + let erasure_coding = &erasure_coding; + let encoding_semaphore = Some(encoding_semaphore.as_ref()); + let stop_receiver = &mut stop_receiver; let mut table_generator = PosTable::generator(); // TODO: Concurrency @@ -237,24 +240,20 @@ where let plotted_sector; (sector, sector_metadata, table_generator, plotted_sector) = { - let mut sector = Vec::new(); - let mut sector_metadata = Vec::new(); + let plotting_fn = || { + tokio::task::block_in_place(|| { + let mut sector = Vec::new(); + let mut sector_metadata = Vec::new(); - let erasure_coding = erasure_coding.clone(); - let encoding_semaphore = Arc::clone(&encoding_semaphore); - let mut stop_receiver = stop_receiver.resubscribe(); - - let plotting_fn = move || { - tokio::task::block_in_place(move || { let plot_sector_fut = encode_sector::( downloaded_sector, EncodeSectorOptions { sector_index, - erasure_coding: &erasure_coding, + erasure_coding, pieces_in_sector, sector_output: &mut sector, sector_metadata_output: &mut sector_metadata, - encoding_semaphore: Some(&encoding_semaphore), + encoding_semaphore, table_generator: &mut table_generator, }, ); From 8a39aedbfc4afccc1655e734e8f7946e4189647e Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Fri, 27 Oct 2023 16:51:07 +0300 Subject: [PATCH 7/7] Download next segment while previous segment is being encoded --- .../subspace-farmer/src/single_disk_farm.rs | 16 +-- .../src/single_disk_farm/plotting.rs | 117 +++++++++++++----- 2 files changed, 92 insertions(+), 41 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 9f033f4f15..f88f97d0e7 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -575,7 +575,7 @@ impl SingleDiskFarm { ) -> Result where NC: NodeClient, - PG: PieceGetter + Clone + Send + 'static, + PG: PieceGetter + Clone + Send + Sync + 'static, PosTable: Table, { let SingleDiskFarmOptions { @@ -850,7 +850,7 @@ impl SingleDiskFarm { let (start_sender, mut start_receiver) = broadcast::channel::<()>(1); let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1); let modifying_sector_index = Arc::>>::default(); - let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(0); + let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1); // Some sectors may already be plotted, skip them let sectors_indices_left_to_plot = metadata_header.plotted_sector_count..target_sector_count; @@ -937,7 +937,7 @@ impl SingleDiskFarm { let plotting_options = PlottingOptions { public_key, - node_client, + node_client: &node_client, pieces_in_sector, sector_size, sector_metadata_size, @@ -945,17 +945,17 @@ impl SingleDiskFarm { plot_file, metadata_file, sectors_metadata, - piece_getter, - kzg, - erasure_coding, + piece_getter: &piece_getter, + kzg: &kzg, + erasure_coding: &erasure_coding, handlers, modifying_sector_index, sectors_to_plot_receiver, downloading_semaphore, - encoding_semaphore, + encoding_semaphore: &encoding_semaphore, plotting_thread_pool, replotting_thread_pool, - stop_receiver: stop_receiver.resubscribe(), + stop_receiver: &mut stop_receiver.resubscribe(), }; let plotting_fut = plotting::<_, _, PosTable>(plotting_options); diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 072b37920c..150129f2ba 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -2,6 +2,7 @@ use crate::single_disk_farm::{ BackgroundTaskError, Handlers, PlotMetadataHeader, SectorPlottingDetails, RESERVED_PLOT_METADATA, }; +use crate::utils::AsyncJoinOnDrop; use crate::{node_client, NodeClient}; use async_lock::RwLock; use atomic::Atomic; @@ -27,15 +28,15 @@ use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::file_ext::FileExt; use subspace_farmer_components::plotting; use subspace_farmer_components::plotting::{ - download_sector, encode_sector, DownloadSectorOptions, EncodeSectorOptions, PieceGetter, - PieceGetterRetryPolicy, PlottedSector, + download_sector, encode_sector, DownloadSectorOptions, DownloadedSector, EncodeSectorOptions, + PieceGetter, PieceGetterRetryPolicy, PlottedSector, }; use subspace_farmer_components::sector::SectorMetadataChecksummed; use subspace_proof_of_space::Table; use thiserror::Error; use tokio::runtime::Handle; use tokio::sync::{broadcast, Semaphore}; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, info, trace, warn, Instrument}; const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500); /// Size of the cache of archived segments for the purposes of faster sector expiration checks. @@ -50,6 +51,7 @@ pub(super) struct SectorToPlot { /// Whether this is the last sector queued so far last_queued: bool, acknowledgement_sender: oneshot::Sender<()>, + next_segment_index_hint: Option, } /// Errors that happen during plotting @@ -91,11 +93,14 @@ pub enum PlottingError { /// Failed to create thread pool #[error("Failed to create thread pool: {0}")] FailedToCreateThreadPool(#[from] ThreadPoolBuildError), + /// Background downloading panicked + #[error("Background downloading panicked")] + BackgroundDownloadingPanicked, } -pub(super) struct PlottingOptions { +pub(super) struct PlottingOptions<'a, NC, PG> { pub(super) public_key: PublicKey, - pub(super) node_client: NC, + pub(super) node_client: &'a NC, pub(super) pieces_in_sector: u16, pub(super) sector_size: usize, pub(super) sector_metadata_size: usize, @@ -103,9 +108,9 @@ pub(super) struct PlottingOptions { pub(super) plot_file: Arc, pub(super) metadata_file: File, pub(super) sectors_metadata: Arc>>, - pub(super) piece_getter: PG, - pub(super) kzg: Kzg, - pub(super) erasure_coding: ErasureCoding, + pub(super) piece_getter: &'a PG, + pub(super) kzg: &'a Kzg, + pub(super) erasure_coding: &'a ErasureCoding, pub(super) handlers: Arc, pub(super) modifying_sector_index: Arc>>, pub(super) sectors_to_plot_receiver: mpsc::Receiver, @@ -114,10 +119,10 @@ pub(super) struct PlottingOptions { pub(crate) downloading_semaphore: Arc, /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically /// allow one permit at a time for efficient CPU utilization - pub(crate) encoding_semaphore: Arc, + pub(crate) encoding_semaphore: &'a Semaphore, pub(super) plotting_thread_pool: ThreadPool, pub(super) replotting_thread_pool: ThreadPool, - pub(super) stop_receiver: broadcast::Receiver<()>, + pub(super) stop_receiver: &'a mut broadcast::Receiver<()>, } /// Starts plotting process. @@ -125,11 +130,11 @@ pub(super) struct PlottingOptions { /// NOTE: Returned future is async, but does blocking operations and should be running in dedicated /// thread. pub(super) async fn plotting( - plotting_options: PlottingOptions, + plotting_options: PlottingOptions<'_, NC, PG>, ) -> Result<(), PlottingError> where NC: NodeClient, - PG: PieceGetter + Clone + Send + 'static, + PG: PieceGetter + Clone + Send + Sync + 'static, PosTable: Table, { let PlottingOptions { @@ -152,20 +157,23 @@ where encoding_semaphore, plotting_thread_pool, replotting_thread_pool, - mut stop_receiver, + stop_receiver, } = plotting_options; - let erasure_coding = &erasure_coding; - let encoding_semaphore = Some(encoding_semaphore.as_ref()); - let stop_receiver = &mut stop_receiver; let mut table_generator = PosTable::generator(); - // TODO: Concurrency + + let mut maybe_next_downloaded_sector_fut = + None::>>; while let Some(sector_to_plot) = sectors_to_plot_receiver.next().await { let SectorToPlot { sector_index, progress, last_queued, acknowledgement_sender: _acknowledgement_sender, + // TODO: Remove this hint once we have + // https://github.com/rust-lang/futures-rs/issues/2793 and can + // `sectors_to_plot_receiver.try_peek()` instead + next_segment_index_hint, } = sector_to_plot; trace!(%sector_index, "Preparing to plot sector"); @@ -218,19 +226,55 @@ where break farmer_app_info; }; - let downloaded_sector_fut = download_sector(DownloadSectorOptions { - public_key: &public_key, - sector_index, - piece_getter: &piece_getter, - piece_getter_retry_policy: PieceGetterRetryPolicy::Limited( - PIECE_GETTER_RETRY_NUMBER.get(), - ), - farmer_protocol_info: farmer_app_info.protocol_info, - kzg: &kzg, - pieces_in_sector, - downloading_semaphore: Some(Arc::clone(&downloading_semaphore)), - }); - let downloaded_sector = downloaded_sector_fut.await?; + let downloaded_sector = + if let Some(downloaded_sector_fut) = maybe_next_downloaded_sector_fut.take() { + downloaded_sector_fut + .await + .map_err(|_error| PlottingError::BackgroundDownloadingPanicked)?? + } else { + let downloaded_sector_fut = download_sector(DownloadSectorOptions { + public_key: &public_key, + sector_index, + piece_getter, + piece_getter_retry_policy: PieceGetterRetryPolicy::Limited( + PIECE_GETTER_RETRY_NUMBER.get(), + ), + farmer_protocol_info: farmer_app_info.protocol_info, + kzg, + pieces_in_sector, + downloading_semaphore: Some(Arc::clone(&downloading_semaphore)), + }); + downloaded_sector_fut.await? + }; + + // Initiate downloading of pieces for the next segment index if already known + if let Some(sector_index) = next_segment_index_hint { + let piece_getter = piece_getter.clone(); + let downloading_semaphore = Some(Arc::clone(&downloading_semaphore)); + let kzg = kzg.clone(); + + maybe_next_downloaded_sector_fut.replace(AsyncJoinOnDrop::new( + tokio::spawn( + async move { + let downloaded_sector_fut = download_sector(DownloadSectorOptions { + public_key: &public_key, + sector_index, + piece_getter: &piece_getter, + piece_getter_retry_policy: PieceGetterRetryPolicy::Limited( + PIECE_GETTER_RETRY_NUMBER.get(), + ), + farmer_protocol_info: farmer_app_info.protocol_info, + kzg: &kzg, + pieces_in_sector, + downloading_semaphore, + }); + downloaded_sector_fut.await + } + .in_current_span(), + ), + true, + )); + } // Inform others that this sector is being modified modifying_sector_index.write().await.replace(sector_index); @@ -253,7 +297,7 @@ where pieces_in_sector, sector_output: &mut sector, sector_metadata_output: &mut sector_metadata, - encoding_semaphore, + encoding_semaphore: Some(encoding_semaphore), table_generator: &mut table_generator, }, ); @@ -571,7 +615,8 @@ where NC: NodeClient, { // Finish initial plotting if some sectors were not plotted fully yet - for sector_index in sectors_indices_left_to_plot { + let mut sectors_indices_left_to_plot = sectors_indices_left_to_plot.into_iter().peekable(); + while let Some(sector_index) = sectors_indices_left_to_plot.next() { let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel(); if let Err(error) = sectors_to_plot_sender .send(SectorToPlot { @@ -579,6 +624,7 @@ where progress: sector_index as f32 / target_sector_count as f32 * 100.0, last_queued: sector_index + 1 == target_sector_count, acknowledgement_sender, + next_segment_index_hint: sectors_indices_left_to_plot.peek().copied(), }) .await { @@ -724,7 +770,9 @@ where } let sectors_queued = sector_indices_to_replot.len(); - for (index, sector_index) in sector_indices_to_replot.drain(..).enumerate() { + let mut sector_indices_to_replot = + sector_indices_to_replot.drain(..).enumerate().peekable(); + while let Some((index, sector_index)) = sector_indices_to_replot.next() { let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel(); if let Err(error) = sectors_to_plot_sender .send(SectorToPlot { @@ -732,6 +780,9 @@ where progress: index as f32 / sectors_queued as f32 * 100.0, last_queued: index + 1 == sectors_queued, acknowledgement_sender, + next_segment_index_hint: sector_indices_to_replot + .peek() + .map(|(_index, sector_index)| *sector_index), }) .await {