Skip to content

Commit

Permalink
Merge pull request #2135 from subspace/async-farming
Browse files Browse the repository at this point in the history
Async farming (auditing)
  • Loading branch information
nazar-pc authored Oct 19, 2023
2 parents 5d06efe + 5f23416 commit 308c6b9
Show file tree
Hide file tree
Showing 14 changed files with 740 additions and 237 deletions.
104 changes: 102 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 59 additions & 33 deletions crates/subspace-farmer-components/src/auditing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use crate::{ReadAt, ReadAtAsync, ReadAtOffset, ReadAtSync};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use rayon::prelude::*;
use std::mem;
use std::pin::pin;
use std::{io, mem};
use subspace_core_primitives::crypto::Scalar;
use subspace_core_primitives::{
Blake3Hash, PublicKey, SBucket, SectorId, SectorIndex, SectorSlotChallenge, SolutionRange,
Expand Down Expand Up @@ -117,9 +118,11 @@ where
sectors_metadata
.par_iter()
.map(|sector_metadata| {
collect_sector_auditing_details(public_key, global_challenge, sector_metadata)
(
collect_sector_auditing_details(public_key, global_challenge, sector_metadata),
sector_metadata,
)
})
.zip(sectors_metadata)
// Read s-buckets of all sectors, map to winning chunks and then to audit results, all in
// parallel
.filter_map(|(sector_auditing_info, sector_metadata)| {
Expand All @@ -128,6 +131,11 @@ where
return None;
}

if sector_auditing_info.s_bucket_audit_size == 0 {
// S-bucket is empty
return None;
}

let sector = plot.offset(
usize::from(sector_metadata.sector_index)
* sector_size(sector_metadata.pieces_in_sector),
Expand Down Expand Up @@ -184,54 +192,72 @@ pub async fn audit_plot_async<'a, Plot>(
where
Plot: ReadAtAsync + 'a,
{
// Create auditing info for all sectors in parallel
sectors_metadata
// Create auditing info for all sectors in parallel and allocate s-buckets
let mut audit_preparation = sectors_metadata
.par_iter()
.map(|sector_metadata| {
(
collect_sector_auditing_details(public_key, global_challenge, sector_metadata),
sector_metadata,
)
})
.collect::<Vec<_>>()
.into_iter()
// Read s-buckets concurrently
.map(|(sector_auditing_info, sector_metadata)| async move {
.filter_map(|(sector_auditing_info, sector_metadata)| {
if maybe_sector_being_modified == Some(sector_metadata.sector_index) {
// Skip sector that is being modified right now
return None;
}

let mut s_bucket = vec![0; sector_auditing_info.s_bucket_audit_size];

let sector = plot.offset(
usize::from(sector_metadata.sector_index)
* sector_size(sector_metadata.pieces_in_sector),
);

if let Err(error) = sector
.read_at(
&mut s_bucket,
sector_auditing_info.s_bucket_audit_offset_in_sector,
)
.await
{
warn!(
%error,
sector_index = %sector_metadata.sector_index,
s_bucket_audit_index = %sector_auditing_info.s_bucket_audit_index,
"Failed read s-bucket",
);

if sector_auditing_info.s_bucket_audit_size == 0 {
// S-bucket is empty
return None;
}

let s_bucket = vec![0; sector_auditing_info.s_bucket_audit_size];

Some((sector_auditing_info, sector_metadata, s_bucket))
})
.collect::<Vec<_>>();

// Read all s-buckets concurrently
let reading_s_buckets_stream = audit_preparation
.iter_mut()
.map(
|(sector_auditing_info, sector_metadata, s_bucket)| async move {
let sector = plot.offset(
usize::from(sector_metadata.sector_index)
* sector_size(sector_metadata.pieces_in_sector),
);

mem::swap(
&mut sector
.read_at(
mem::take(s_bucket),
sector_auditing_info.s_bucket_audit_offset_in_sector,
)
.await?,
s_bucket,
);

Ok::<_, io::Error>(())
},
)
.collect::<FuturesUnordered<_>>()
.filter_map(|value| async move { value })
.collect::<Vec<_>>()
.await
.filter_map(|result| async {
match result {
Ok(()) => None,
Err(error) => Some(error),
}
});
if let Some(error) = pin!(reading_s_buckets_stream).next().await {
warn!(
%error,
"Failed read multiple s-buckets",
);

return Vec::new();
}

audit_preparation
.into_par_iter()
// Map to winning chunks in parallel
.filter_map(|(sector_auditing_info, sector_metadata, s_bucket)| {
Expand Down
Loading

0 comments on commit 308c6b9

Please sign in to comment.