Skip to content

Commit

Permalink
Merge pull request #2172 from subspace/downloading/encoding-concurrency
Browse files Browse the repository at this point in the history
Downloading/encoding concurrency
  • Loading branch information
nazar-pc authored Oct 30, 2023
2 parents 3013c16 + 8a39aed commit 31ab035
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 88 deletions.
2 changes: 1 addition & 1 deletion crates/pallet-subspace/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ pub fn create_signed_vote(
sector_index,
piece_getter: archived_history_segment,
piece_getter_retry_policy: PieceGetterRetryPolicy::default(),
farmer_protocol_info: &farmer_protocol_info,
farmer_protocol_info,
kzg,
erasure_coding,
pieces_in_sector,
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer-components/benches/auditing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
sector_index,
piece_getter: &archived_history_segment,
piece_getter_retry_policy: PieceGetterRetryPolicy::default(),
farmer_protocol_info: &farmer_protocol_info,
farmer_protocol_info,
kzg: &kzg,
erasure_coding: &erasure_coding,
pieces_in_sector,
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer-components/benches/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fn criterion_benchmark(c: &mut Criterion) {
sector_index: black_box(sector_index),
piece_getter: black_box(&archived_history_segment),
piece_getter_retry_policy: black_box(PieceGetterRetryPolicy::default()),
farmer_protocol_info: black_box(&farmer_protocol_info),
farmer_protocol_info: black_box(farmer_protocol_info),
kzg: black_box(&kzg),
erasure_coding: black_box(&erasure_coding),
pieces_in_sector: black_box(pieces_in_sector),
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer-components/benches/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
sector_index,
piece_getter: &archived_history_segment,
piece_getter_retry_policy: PieceGetterRetryPolicy::default(),
farmer_protocol_info: &farmer_protocol_info,
farmer_protocol_info,
kzg,
erasure_coding,
pieces_in_sector,
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer-components/benches/reading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
sector_index,
piece_getter: &archived_history_segment,
piece_getter_retry_policy: PieceGetterRetryPolicy::default(),
farmer_protocol_info: &farmer_protocol_info,
farmer_protocol_info,
kzg: &kzg,
erasure_coding: &erasure_coding,
pieces_in_sector,
Expand Down
207 changes: 176 additions & 31 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use subspace_core_primitives::{
use subspace_erasure_coding::ErasureCoding;
use subspace_proof_of_space::{Table, TableGenerator};
use thiserror::Error;
use tokio::sync::Semaphore;
use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore};
use tokio::task::yield_now;
use tracing::{debug, trace, warn};

Expand Down Expand Up @@ -151,6 +151,13 @@ pub enum PlottingError {
/// Lower-level error
error: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// Failed to acquire permit
#[error("Failed to acquire permit: {error}")]
FailedToAcquirePermit {
/// Lower-level error
#[from]
error: AcquireError,
},
}

/// Options for plotting a sector.
Expand All @@ -171,7 +178,7 @@ where
/// Retry policy for piece getter
pub piece_getter_retry_policy: PieceGetterRetryPolicy,
/// Farmer protocol info
pub farmer_protocol_info: &'a FarmerProtocolInfo,
pub farmer_protocol_info: FarmerProtocolInfo,
/// KZG instance
pub kzg: &'a Kzg,
/// Erasure coding instance
Expand All @@ -186,7 +193,7 @@ where
pub sector_metadata_output: &'a mut Vec<u8>,
/// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory
/// usage of the plotting process, permit will be held until the end of the plotting process
pub downloading_semaphore: Option<&'a Semaphore>,
pub downloading_semaphore: Option<Arc<Semaphore>>,
/// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically
/// allow one permit at a time for efficient CPU utilization
pub encoding_semaphore: Option<&'a Semaphore>,
Expand All @@ -196,6 +203,8 @@ where

/// Plot a single sector.
///
/// This is a convenient wrapper around [`download_sector`] and [`encode_sector`] functions.
///
/// NOTE: Even though this function is async, it has blocking code inside and must be running in a
/// separate thread in order to prevent blocking an executor.
pub async fn plot_sector<PosTable, PG>(
Expand All @@ -221,36 +230,91 @@ where
table_generator,
} = options;

if erasure_coding.max_shards() < Record::NUM_S_BUCKETS {
return Err(PlottingError::InvalidErasureCodingInstance);
}
let download_sector_fut = download_sector(DownloadSectorOptions {
public_key,
sector_index,
piece_getter,
piece_getter_retry_policy,
farmer_protocol_info,
kzg,
pieces_in_sector,
downloading_semaphore,
});

let sector_size = sector_size(pieces_in_sector);
encode_sector(
download_sector_fut.await?,
EncodeSectorOptions::<PosTable> {
sector_index,
erasure_coding,
pieces_in_sector,
sector_output,
sector_metadata_output,
encoding_semaphore,
table_generator,
},
)
.await
}

if !sector_output.is_empty() && sector_output.len() != sector_size {
return Err(PlottingError::BadSectorOutputSize {
provided: sector_output.len(),
expected: sector_size,
});
}
/// Opaque sector downloaded and ready for encoding
pub struct DownloadedSector {
sector_id: SectorId,
piece_indices: Vec<PieceIndex>,
raw_sector: RawSector,
farmer_protocol_info: FarmerProtocolInfo,
downloading_permit: Option<OwnedSemaphorePermit>,
}

if !sector_metadata_output.is_empty()
&& sector_metadata_output.len() != SectorMetadataChecksummed::encoded_size()
{
return Err(PlottingError::BadSectorMetadataOutputSize {
provided: sector_metadata_output.len(),
expected: SectorMetadataChecksummed::encoded_size(),
});
}
/// Options for sector downloading
pub struct DownloadSectorOptions<'a, PG> {
/// Public key corresponding to sector
pub public_key: &'a PublicKey,
/// Sector index
pub sector_index: SectorIndex,
/// Getter for pieces of archival history
pub piece_getter: &'a PG,
/// Retry policy for piece getter
pub piece_getter_retry_policy: PieceGetterRetryPolicy,
/// Farmer protocol info
pub farmer_protocol_info: FarmerProtocolInfo,
/// KZG instance
pub kzg: &'a Kzg,
/// How many pieces should sector contain
pub pieces_in_sector: u16,
/// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory
/// usage of the plotting process, permit will be held until the end of the plotting process
pub downloading_semaphore: Option<Arc<Semaphore>>,
}

let _downloading_permit = match downloading_semaphore {
Some(downloading_semaphore) => Some(downloading_semaphore.acquire().await),
/// Download sector for plotting.
///
/// This will identify necessary pieces and download them from DSN, after which they can be encoded
/// and written to the plot.
pub async fn download_sector<PG>(
options: DownloadSectorOptions<'_, PG>,
) -> Result<DownloadedSector, PlottingError>
where
PG: PieceGetter,
{
let DownloadSectorOptions {
public_key,
sector_index,
piece_getter,
piece_getter_retry_policy,
farmer_protocol_info,
kzg,
pieces_in_sector,
downloading_semaphore,
} = options;

let downloading_permit = match downloading_semaphore {
Some(downloading_semaphore) => Some(downloading_semaphore.acquire_owned().await?),
None => None,
};

let sector_id = SectorId::new(public_key.hash(), sector_index);

let piece_indexes: Vec<PieceIndex> = (PieceOffset::ZERO..)
let piece_indices = (PieceOffset::ZERO..)
.take(pieces_in_sector.into())
.map(|piece_offset| {
sector_id.derive_piece_index(
Expand All @@ -261,7 +325,7 @@ where
farmer_protocol_info.recent_history_fraction,
)
})
.collect();
.collect::<Vec<_>>();

// TODO: Downloading and encoding below can happen in parallel, but a bit tricky to implement
// due to sync/async pairing
Expand All @@ -271,13 +335,13 @@ where
{
// This list will be mutated, replacing pieces we have already processed with `None`
let incremental_piece_indices =
Mutex::new(piece_indexes.iter().copied().map(Some).collect::<Vec<_>>());
Mutex::new(piece_indices.iter().copied().map(Some).collect::<Vec<_>>());

retry(default_backoff(), || async {
let mut raw_sector = raw_sector.lock().await;
let mut incremental_piece_indices = incremental_piece_indices.lock().await;

if let Err(error) = download_sector(
if let Err(error) = download_sector_internal(
&mut raw_sector,
piece_getter,
piece_getter_retry_policy,
Expand Down Expand Up @@ -308,10 +372,91 @@ where
.await?;
}

let mut raw_sector = raw_sector.into_inner();
Ok(DownloadedSector {
sector_id,
piece_indices,
raw_sector: raw_sector.into_inner(),
farmer_protocol_info,
downloading_permit,
})
}

/// Options for encoding a sector.
///
/// Sector output and sector metadata output should be either empty (in which case they'll be
/// resized to correct size automatically) or correctly sized from the beginning or else error will
/// be returned.
pub struct EncodeSectorOptions<'a, PosTable>
where
PosTable: Table,
{
/// Sector index
pub sector_index: SectorIndex,
/// Erasure coding instance
pub erasure_coding: &'a ErasureCoding,
/// How many pieces should sector contain
pub pieces_in_sector: u16,
/// Where plotted sector should be written, vector must either be empty (in which case it'll be
/// resized to correct size automatically) or correctly sized from the beginning
pub sector_output: &'a mut Vec<u8>,
/// Where plotted sector metadata should be written, vector must either be empty (in which case
/// it'll be resized to correct size automatically) or correctly sized from the beginning
pub sector_metadata_output: &'a mut Vec<u8>,
/// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically
/// allow one permit at a time for efficient CPU utilization
pub encoding_semaphore: Option<&'a Semaphore>,
/// Proof of space table generator
pub table_generator: &'a mut PosTable::Generator,
}

pub async fn encode_sector<PosTable>(
downloaded_sector: DownloadedSector,
encoding_options: EncodeSectorOptions<'_, PosTable>,
) -> Result<PlottedSector, PlottingError>
where
PosTable: Table,
{
let DownloadedSector {
sector_id,
piece_indices,
mut raw_sector,
farmer_protocol_info,
downloading_permit: _downloading_permit,
} = downloaded_sector;
let EncodeSectorOptions {
sector_index,
erasure_coding,
pieces_in_sector,
sector_output,
sector_metadata_output,
encoding_semaphore,
table_generator,
} = encoding_options;

if erasure_coding.max_shards() < Record::NUM_S_BUCKETS {
return Err(PlottingError::InvalidErasureCodingInstance);
}

let sector_size = sector_size(pieces_in_sector);

if !sector_output.is_empty() && sector_output.len() != sector_size {
return Err(PlottingError::BadSectorOutputSize {
provided: sector_output.len(),
expected: sector_size,
});
}

if !sector_metadata_output.is_empty()
&& sector_metadata_output.len() != SectorMetadataChecksummed::encoded_size()
{
return Err(PlottingError::BadSectorMetadataOutputSize {
provided: sector_metadata_output.len(),
expected: SectorMetadataChecksummed::encoded_size(),
});
}

let _encoding_permit = match encoding_semaphore {
Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await),
Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await?),
None => None,
};

Expand Down Expand Up @@ -484,11 +629,11 @@ where
sector_id,
sector_index,
sector_metadata,
piece_indexes,
piece_indexes: piece_indices,
})
}

async fn download_sector<PG: PieceGetter>(
async fn download_sector_internal<PG: PieceGetter>(
raw_sector: &mut RawSector,
piece_getter: &PG,
piece_getter_retry_policy: PieceGetterRetryPolicy,
Expand Down
16 changes: 8 additions & 8 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ impl SingleDiskFarm {
) -> Result<Self, SingleDiskFarmError>
where
NC: NodeClient,
PG: PieceGetter + Clone + Send + 'static,
PG: PieceGetter + Clone + Send + Sync + 'static,
PosTable: Table,
{
let SingleDiskFarmOptions {
Expand Down Expand Up @@ -850,7 +850,7 @@ impl SingleDiskFarm {
let (start_sender, mut start_receiver) = broadcast::channel::<()>(1);
let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1);
let modifying_sector_index = Arc::<RwLock<Option<SectorIndex>>>::default();
let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(0);
let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1);
// Some sectors may already be plotted, skip them
let sectors_indices_left_to_plot =
metadata_header.plotted_sector_count..target_sector_count;
Expand Down Expand Up @@ -937,25 +937,25 @@ impl SingleDiskFarm {

let plotting_options = PlottingOptions {
public_key,
node_client,
node_client: &node_client,
pieces_in_sector,
sector_size,
sector_metadata_size,
metadata_header,
plot_file,
metadata_file,
sectors_metadata,
piece_getter,
kzg,
erasure_coding,
piece_getter: &piece_getter,
kzg: &kzg,
erasure_coding: &erasure_coding,
handlers,
modifying_sector_index,
sectors_to_plot_receiver,
downloading_semaphore,
encoding_semaphore,
encoding_semaphore: &encoding_semaphore,
plotting_thread_pool,
replotting_thread_pool,
stop_receiver: stop_receiver.resubscribe(),
stop_receiver: &mut stop_receiver.resubscribe(),
};

let plotting_fut = plotting::<_, _, PosTable>(plotting_options);
Expand Down
Loading

0 comments on commit 31ab035

Please sign in to comment.