Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Downloading/encoding concurrency #2172

Merged
merged 7 commits into from
Oct 30, 2023
2 changes: 1 addition & 1 deletion crates/pallet-subspace/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ pub fn create_signed_vote(
sector_index,
piece_getter: archived_history_segment,
piece_getter_retry_policy: PieceGetterRetryPolicy::default(),
farmer_protocol_info: &farmer_protocol_info,
farmer_protocol_info,
kzg,
erasure_coding,
pieces_in_sector,
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer-components/benches/auditing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
sector_index,
piece_getter: &archived_history_segment,
piece_getter_retry_policy: PieceGetterRetryPolicy::default(),
farmer_protocol_info: &farmer_protocol_info,
farmer_protocol_info,
kzg: &kzg,
erasure_coding: &erasure_coding,
pieces_in_sector,
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer-components/benches/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fn criterion_benchmark(c: &mut Criterion) {
sector_index: black_box(sector_index),
piece_getter: black_box(&archived_history_segment),
piece_getter_retry_policy: black_box(PieceGetterRetryPolicy::default()),
farmer_protocol_info: black_box(&farmer_protocol_info),
farmer_protocol_info: black_box(farmer_protocol_info),
kzg: black_box(&kzg),
erasure_coding: black_box(&erasure_coding),
pieces_in_sector: black_box(pieces_in_sector),
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer-components/benches/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
sector_index,
piece_getter: &archived_history_segment,
piece_getter_retry_policy: PieceGetterRetryPolicy::default(),
farmer_protocol_info: &farmer_protocol_info,
farmer_protocol_info,
kzg,
erasure_coding,
pieces_in_sector,
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer-components/benches/reading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
sector_index,
piece_getter: &archived_history_segment,
piece_getter_retry_policy: PieceGetterRetryPolicy::default(),
farmer_protocol_info: &farmer_protocol_info,
farmer_protocol_info,
kzg: &kzg,
erasure_coding: &erasure_coding,
pieces_in_sector,
Expand Down
207 changes: 176 additions & 31 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use subspace_core_primitives::{
use subspace_erasure_coding::ErasureCoding;
use subspace_proof_of_space::{Table, TableGenerator};
use thiserror::Error;
use tokio::sync::Semaphore;
use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore};
use tokio::task::yield_now;
use tracing::{debug, trace, warn};

Expand Down Expand Up @@ -151,6 +151,13 @@ pub enum PlottingError {
/// Lower-level error
error: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// Failed to acquire permit
#[error("Failed to acquire permit: {error}")]
FailedToAcquirePermit {
/// Lower-level error
#[from]
error: AcquireError,
},
}

/// Options for plotting a sector.
Expand All @@ -171,7 +178,7 @@ where
/// Retry policy for piece getter
pub piece_getter_retry_policy: PieceGetterRetryPolicy,
/// Farmer protocol info
pub farmer_protocol_info: &'a FarmerProtocolInfo,
pub farmer_protocol_info: FarmerProtocolInfo,
/// KZG instance
pub kzg: &'a Kzg,
/// Erasure coding instance
Expand All @@ -186,7 +193,7 @@ where
pub sector_metadata_output: &'a mut Vec<u8>,
/// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory
/// usage of the plotting process, permit will be held until the end of the plotting process
pub downloading_semaphore: Option<&'a Semaphore>,
pub downloading_semaphore: Option<Arc<Semaphore>>,
/// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically
/// allow one permit at a time for efficient CPU utilization
pub encoding_semaphore: Option<&'a Semaphore>,
Expand All @@ -196,6 +203,8 @@ where

/// Plot a single sector.
///
/// This is a convenient wrapper around [`download_sector`] and [`encode_sector`] functions.
///
/// NOTE: Even though this function is async, it has blocking code inside and must be running in a
/// separate thread in order to prevent blocking an executor.
pub async fn plot_sector<PosTable, PG>(
Expand All @@ -221,36 +230,91 @@ where
table_generator,
} = options;

if erasure_coding.max_shards() < Record::NUM_S_BUCKETS {
return Err(PlottingError::InvalidErasureCodingInstance);
}
let download_sector_fut = download_sector(DownloadSectorOptions {
public_key,
sector_index,
piece_getter,
piece_getter_retry_policy,
farmer_protocol_info,
kzg,
pieces_in_sector,
downloading_semaphore,
});

let sector_size = sector_size(pieces_in_sector);
encode_sector(
download_sector_fut.await?,
EncodeSectorOptions::<PosTable> {
sector_index,
erasure_coding,
pieces_in_sector,
sector_output,
sector_metadata_output,
encoding_semaphore,
table_generator,
},
)
.await
}

if !sector_output.is_empty() && sector_output.len() != sector_size {
return Err(PlottingError::BadSectorOutputSize {
provided: sector_output.len(),
expected: sector_size,
});
}
/// Opaque sector downloaded and ready for encoding
pub struct DownloadedSector {
sector_id: SectorId,
piece_indices: Vec<PieceIndex>,
raw_sector: RawSector,
farmer_protocol_info: FarmerProtocolInfo,
downloading_permit: Option<OwnedSemaphorePermit>,
}

if !sector_metadata_output.is_empty()
&& sector_metadata_output.len() != SectorMetadataChecksummed::encoded_size()
{
return Err(PlottingError::BadSectorMetadataOutputSize {
provided: sector_metadata_output.len(),
expected: SectorMetadataChecksummed::encoded_size(),
});
}
/// Options for sector downloading
pub struct DownloadSectorOptions<'a, PG> {
/// Public key corresponding to sector
pub public_key: &'a PublicKey,
/// Sector index
pub sector_index: SectorIndex,
/// Getter for pieces of archival history
pub piece_getter: &'a PG,
/// Retry policy for piece getter
pub piece_getter_retry_policy: PieceGetterRetryPolicy,
/// Farmer protocol info
pub farmer_protocol_info: FarmerProtocolInfo,
/// KZG instance
pub kzg: &'a Kzg,
/// How many pieces should sector contain
pub pieces_in_sector: u16,
/// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory
/// usage of the plotting process, permit will be held until the end of the plotting process
pub downloading_semaphore: Option<Arc<Semaphore>>,
}

let _downloading_permit = match downloading_semaphore {
Some(downloading_semaphore) => Some(downloading_semaphore.acquire().await),
/// Download sector for plotting.
///
/// This will identify necessary pieces and download them from DSN, after which they can be encoded
/// and written to the plot.
pub async fn download_sector<PG>(
options: DownloadSectorOptions<'_, PG>,
) -> Result<DownloadedSector, PlottingError>
where
PG: PieceGetter,
{
let DownloadSectorOptions {
public_key,
sector_index,
piece_getter,
piece_getter_retry_policy,
farmer_protocol_info,
kzg,
pieces_in_sector,
downloading_semaphore,
} = options;

let downloading_permit = match downloading_semaphore {
Some(downloading_semaphore) => Some(downloading_semaphore.acquire_owned().await?),
None => None,
};

let sector_id = SectorId::new(public_key.hash(), sector_index);

let piece_indexes: Vec<PieceIndex> = (PieceOffset::ZERO..)
let piece_indices = (PieceOffset::ZERO..)
.take(pieces_in_sector.into())
.map(|piece_offset| {
sector_id.derive_piece_index(
Expand All @@ -261,7 +325,7 @@ where
farmer_protocol_info.recent_history_fraction,
)
})
.collect();
.collect::<Vec<_>>();

// TODO: Downloading and encoding below can happen in parallel, but a bit tricky to implement
// due to sync/async pairing
Expand All @@ -271,13 +335,13 @@ where
{
// This list will be mutated, replacing pieces we have already processed with `None`
let incremental_piece_indices =
Mutex::new(piece_indexes.iter().copied().map(Some).collect::<Vec<_>>());
Mutex::new(piece_indices.iter().copied().map(Some).collect::<Vec<_>>());

retry(default_backoff(), || async {
let mut raw_sector = raw_sector.lock().await;
let mut incremental_piece_indices = incremental_piece_indices.lock().await;

if let Err(error) = download_sector(
if let Err(error) = download_sector_internal(
&mut raw_sector,
piece_getter,
piece_getter_retry_policy,
Expand Down Expand Up @@ -308,10 +372,91 @@ where
.await?;
}

let mut raw_sector = raw_sector.into_inner();
Ok(DownloadedSector {
sector_id,
piece_indices,
raw_sector: raw_sector.into_inner(),
farmer_protocol_info,
downloading_permit,
})
}

/// Options for encoding a sector.
///
/// Sector output and sector metadata output should be either empty (in which case they'll be
/// resized to correct size automatically) or correctly sized from the beginning or else error will
/// be returned.
pub struct EncodeSectorOptions<'a, PosTable>
where
PosTable: Table,
{
/// Sector index
pub sector_index: SectorIndex,
/// Erasure coding instance
pub erasure_coding: &'a ErasureCoding,
/// How many pieces should sector contain
pub pieces_in_sector: u16,
/// Where plotted sector should be written, vector must either be empty (in which case it'll be
/// resized to correct size automatically) or correctly sized from the beginning
pub sector_output: &'a mut Vec<u8>,
/// Where plotted sector metadata should be written, vector must either be empty (in which case
/// it'll be resized to correct size automatically) or correctly sized from the beginning
pub sector_metadata_output: &'a mut Vec<u8>,
/// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically
/// allow one permit at a time for efficient CPU utilization
pub encoding_semaphore: Option<&'a Semaphore>,
/// Proof of space table generator
pub table_generator: &'a mut PosTable::Generator,
}

pub async fn encode_sector<PosTable>(
downloaded_sector: DownloadedSector,
encoding_options: EncodeSectorOptions<'_, PosTable>,
) -> Result<PlottedSector, PlottingError>
where
PosTable: Table,
{
let DownloadedSector {
sector_id,
piece_indices,
mut raw_sector,
farmer_protocol_info,
downloading_permit: _downloading_permit,
} = downloaded_sector;
let EncodeSectorOptions {
sector_index,
erasure_coding,
pieces_in_sector,
sector_output,
sector_metadata_output,
encoding_semaphore,
table_generator,
} = encoding_options;

if erasure_coding.max_shards() < Record::NUM_S_BUCKETS {
return Err(PlottingError::InvalidErasureCodingInstance);
}

let sector_size = sector_size(pieces_in_sector);

if !sector_output.is_empty() && sector_output.len() != sector_size {
return Err(PlottingError::BadSectorOutputSize {
provided: sector_output.len(),
expected: sector_size,
});
}

if !sector_metadata_output.is_empty()
&& sector_metadata_output.len() != SectorMetadataChecksummed::encoded_size()
{
return Err(PlottingError::BadSectorMetadataOutputSize {
provided: sector_metadata_output.len(),
expected: SectorMetadataChecksummed::encoded_size(),
});
}

let _encoding_permit = match encoding_semaphore {
Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await),
Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await?),
None => None,
};

Expand Down Expand Up @@ -484,11 +629,11 @@ where
sector_id,
sector_index,
sector_metadata,
piece_indexes,
piece_indexes: piece_indices,
})
}

async fn download_sector<PG: PieceGetter>(
async fn download_sector_internal<PG: PieceGetter>(
raw_sector: &mut RawSector,
piece_getter: &PG,
piece_getter_retry_policy: PieceGetterRetryPolicy,
Expand Down
16 changes: 8 additions & 8 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ impl SingleDiskFarm {
) -> Result<Self, SingleDiskFarmError>
where
NC: NodeClient,
PG: PieceGetter + Clone + Send + 'static,
PG: PieceGetter + Clone + Send + Sync + 'static,
PosTable: Table,
{
let SingleDiskFarmOptions {
Expand Down Expand Up @@ -850,7 +850,7 @@ impl SingleDiskFarm {
let (start_sender, mut start_receiver) = broadcast::channel::<()>(1);
let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1);
let modifying_sector_index = Arc::<RwLock<Option<SectorIndex>>>::default();
let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(0);
let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1);
NingLin-P marked this conversation as resolved.
Show resolved Hide resolved
// Some sectors may already be plotted, skip them
let sectors_indices_left_to_plot =
metadata_header.plotted_sector_count..target_sector_count;
Expand Down Expand Up @@ -937,25 +937,25 @@ impl SingleDiskFarm {

let plotting_options = PlottingOptions {
public_key,
node_client,
node_client: &node_client,
pieces_in_sector,
sector_size,
sector_metadata_size,
metadata_header,
plot_file,
metadata_file,
sectors_metadata,
piece_getter,
kzg,
erasure_coding,
piece_getter: &piece_getter,
kzg: &kzg,
erasure_coding: &erasure_coding,
handlers,
modifying_sector_index,
sectors_to_plot_receiver,
downloading_semaphore,
encoding_semaphore,
encoding_semaphore: &encoding_semaphore,
plotting_thread_pool,
replotting_thread_pool,
stop_receiver: stop_receiver.resubscribe(),
stop_receiver: &mut stop_receiver.resubscribe(),
};

let plotting_fut = plotting::<_, _, PosTable>(plotting_options);
Expand Down
Loading
Loading