Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement an abstraction that opens file for auditing multiple times, once for every auditing rayon thread #2151

Merged
merged 1 commit into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,5 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
ulid = { version = "1.0.0", features = ["serde"] }
zeroize = "1.6.0"

[target.'cfg(windows)'.dependencies]
memmap2 = "0.9.0"

[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies]
monoio = { version = "0.1.10-beta.1", features = ["sync"] }
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ use anyhow::anyhow;
use clap::Subcommand;
use criterion::{black_box, BatchSize, Criterion, Throughput};
use futures::FutureExt;
#[cfg(windows)]
use memmap2::Mmap;
use parking_lot::Mutex;
use std::fs::OpenOptions;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
use subspace_core_primitives::{Record, SolutionRange};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer::single_disk_farm::farming::rayon_files::RayonFiles;
use subspace_farmer::single_disk_farm::farming::sync_fallback::SyncPlotAudit;
use subspace_farmer::single_disk_farm::farming::{PlotAudit, PlotAuditOptions};
use subspace_farmer::single_disk_farm::{SingleDiskFarm, SingleDiskFarmSummary};
Expand Down Expand Up @@ -81,20 +80,52 @@ fn audit(disk_farm: PathBuf, sample_size: usize) -> anyhow::Result<()> {
sector_size as u64 * sectors_metadata.len() as u64,
));
{
let plot_file = OpenOptions::new()
let plot = OpenOptions::new()
.read(true)
.open(disk_farm.join(SingleDiskFarm::PLOT_FILE))
.map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?;
#[cfg(windows)]
let plot_mmap = unsafe { Mmap::map(&plot_file)? };

group.bench_function("plot/sync", |b| {
#[cfg(not(windows))]
let plot = &plot_file;
#[cfg(windows)]
let plot = &*plot_mmap;
group.bench_function("plot/sync/single", |b| {
let sync_plot_audit = SyncPlotAudit::new(&plot);

let sync_plot_audit = SyncPlotAudit::new(plot);
b.iter_batched(
rand::random,
|global_challenge| {
let options = PlotAuditOptions::<PosTable> {
public_key: single_disk_farm_info.public_key(),
reward_address: single_disk_farm_info.public_key(),
slot_info: SlotInfo {
slot_number: 0,
global_challenge,
// No solution will be found, pure audit
solution_range: SolutionRange::MIN,
// No solution will be found, pure audit
voting_solution_range: SolutionRange::MIN,
},
sectors_metadata: &sectors_metadata,
kzg: &kzg,
erasure_coding: &erasure_coding,
maybe_sector_being_modified: None,
table_generator: &table_generator,
};

black_box(
sync_plot_audit
.audit(black_box(options))
.now_or_never()
.unwrap(),
)
},
BatchSize::SmallInput,
)
});
}
{
let plot = RayonFiles::open(&disk_farm.join(SingleDiskFarm::PLOT_FILE))
.map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?;

group.bench_function("plot/sync/rayon", |b| {
let sync_plot_audit = SyncPlotAudit::new(&plot);

b.iter_batched(
rand::random,
Expand Down
15 changes: 3 additions & 12 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod plotting;
use crate::identity::{Identity, IdentityError};
use crate::node_client::NodeClient;
use crate::reward_signing::reward_signing;
use crate::single_disk_farm::farming::rayon_files::RayonFiles;
use crate::single_disk_farm::farming::sync_fallback::SyncPlotAudit;
pub use crate::single_disk_farm::farming::FarmingError;
use crate::single_disk_farm::farming::{farming, slot_notification_forwarder, FarmingOptions};
Expand Down Expand Up @@ -971,7 +972,6 @@ impl SingleDiskFarm {
let farming_join_handle = thread::Builder::new()
.name(format!("farming-{disk_farm_index}"))
.spawn({
let plot_file = Arc::clone(&plot_file);
let handle = handle.clone();
let erasure_coding = erasure_coding.clone();
let handlers = Arc::clone(&handlers);
Expand Down Expand Up @@ -1024,17 +1024,8 @@ impl SingleDiskFarm {
}
}

#[cfg(not(windows))]
let plot_audit = &SyncPlotAudit::new(&*plot_file);
#[cfg(windows)]
let plot_mmap = unsafe {
memmap2::Mmap::map(&*plot_file).map_err(FarmingError::from)?
};
// On Windows random read is horrible in terms of performance, memory-mapped I/O helps
// TODO: Remove this once https://internals.rust-lang.org/t/introduce-write-all-at-read-exact-at-on-windows/19649
// or similar exists in standard library
#[cfg(windows)]
let plot_audit = &SyncPlotAudit::new(&*plot_mmap);
let plot = RayonFiles::open(&directory.join(Self::PLOT_FILE))?;
let plot_audit = &SyncPlotAudit::new(&plot);

let farming_options = FarmingOptions {
public_key,
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/single_disk_farm/farming.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#[cfg(any(target_os = "linux", target_os = "macos"))]
pub mod monoio;
pub mod rayon_files;
pub mod sync_fallback;

use crate::node_client;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::fs::{File, OpenOptions};
use std::io;
use std::path::Path;
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::ReadAtSync;

/// Wrapper data structure for multiple files to be used with [`rayon`] thread pool, where the same
/// file is opened multiple times, once for each thread.
#[derive(Debug)]
pub struct RayonFiles {
    /// Independently opened handles to the same file; reads pick the entry whose position equals
    /// the index of the calling rayon worker thread.
    files: Vec<File>,
}

impl ReadAtSync for RayonFiles {
    /// Reads into `buf` at `offset` using the file handle that belongs to the calling rayon
    /// worker thread.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] when called from outside of a rayon worker thread, when there is
    /// no file handle stored for the current worker index, or when the underlying read fails.
    fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
        // The worker index doubles as the index into `files`
        let worker_index = match rayon::current_thread_index() {
            Some(index) => index,
            None => {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "Reads must be called from rayon worker thread",
                ));
            }
        };

        match self.files.get(worker_index) {
            Some(file) => file.read_at(buf, offset),
            None => Err(io::Error::new(
                io::ErrorKind::Other,
                "No files entry for this rayon thread",
            )),
        }
    }
}

impl ReadAtSync for &RayonFiles {
    /// Forwards the read to the [`ReadAtSync`] implementation of the referenced [`RayonFiles`].
    fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
        // Fully-qualified call makes it explicit that this delegates to the by-value
        // implementation rather than calling itself
        <RayonFiles as ReadAtSync>::read_at(*self, buf, offset)
    }
}

impl RayonFiles {
    /// Opens the file at `path` for reading once per thread of the current [`rayon`] thread
    /// pool, advising random access both on the open options and on each opened file.
    ///
    /// # Errors
    ///
    /// Returns the first [`io::Error`] hit while opening a handle or applying the access advice;
    /// handles opened before the failure are dropped.
    pub fn open(path: &Path) -> io::Result<Self> {
        let threads = rayon::current_num_threads();
        let mut files = Vec::with_capacity(threads);

        for _ in 0..threads {
            let handle = OpenOptions::new()
                .read(true)
                .advise_random_access()
                .open(path)?;
            handle.advise_random_access()?;

            files.push(handle);
        }

        Ok(Self { files })
    }
}
Loading