diff --git a/Cargo.lock b/Cargo.lock index a48fbdc9be..eb50bff62f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -945,6 +945,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "auto-const-array" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f7df18977a1ee03650ee4b31b4aefed6d56bac188760b6e37610400fe8d4bb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.32", +] + [[package]] name = "auto_impl" version = "1.1.0" @@ -3587,6 +3598,19 @@ dependencies = [ "num-traits", ] +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -4197,8 +4221,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -4832,6 +4858,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "io-uring" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460648e47a07a43110fbfa2e0b14afb2be920093c31e5dccc50e49568e099762" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "ip_network" version = "0.4.1" @@ -6386,6 +6422,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "memoffset" version = "0.8.0" @@ -6506,6 +6551,38 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "monoio" +version = "0.1.10-beta.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368d49210ae1190b898486337a614f1d3ab5fcd8a7ca4a32190b83e9c47c4f52" +dependencies = [ + "auto-const-array", + "bytes", + "flume", + "fxhash", + "io-uring", + "libc", + "mio", + "monoio-macros", + "nix 0.26.4", + "pin-project-lite 0.2.13", + "socket2 0.5.4", + "threadpool", + "windows-sys 0.48.0", +] + +[[package]] +name = "monoio-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "176a5f5e69613d9e88337cf2a65e11135332b4efbcc628404a7c555e4452084c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.32", +] + [[package]] name = "multiaddr" version = "0.17.1" @@ -6667,6 +6744,15 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.10", +] + [[package]] name = "netlink-packet-core" version = "0.4.2" @@ -6746,6 +6832,19 @@ dependencies = [ "memoffset 0.6.5", ] +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags 1.3.2", + "cfg-if", + "libc", + "memoffset 0.7.1", + "pin-utils", +] + [[package]] name = "nohash-hasher" version = "0.2.0" @@ -8628,7 +8727,7 @@ dependencies = [ "log", "netlink-packet-route", "netlink-proto", - "nix", + "nix 0.24.3", "thiserror", "tokio", ] @@ -11408,6 +11507,7 @@ dependencies = [ "lru 0.11.1", "memmap2 0.9.0", "mimalloc", + "monoio", "parity-scale-codec", "parking_lot 0.12.1", "prometheus-client 0.21.2", @@ -13517,7 +13617,7 @@ dependencies = [ "lazy_static", "libc", "log", - "nix", + "nix 0.24.3", "rand 0.8.5", "thiserror", "tokio", diff --git a/crates/subspace-farmer-components/src/auditing.rs b/crates/subspace-farmer-components/src/auditing.rs index 3c09bc84b0..366f769001 100644 --- a/crates/subspace-farmer-components/src/auditing.rs +++ b/crates/subspace-farmer-components/src/auditing.rs @@ -4,7 +4,8 @@ use crate::{ReadAt, ReadAtAsync, ReadAtOffset, ReadAtSync}; use futures::stream::FuturesUnordered; use futures::StreamExt; use rayon::prelude::*; -use std::mem; +use std::pin::pin; +use std::{io, mem}; use subspace_core_primitives::crypto::Scalar; use subspace_core_primitives::{ Blake3Hash, PublicKey, SBucket, SectorId, SectorIndex, SectorSlotChallenge, SolutionRange, @@ -117,9 +118,11 @@ where sectors_metadata .par_iter() .map(|sector_metadata| { - collect_sector_auditing_details(public_key, global_challenge, sector_metadata) + ( + collect_sector_auditing_details(public_key, global_challenge, sector_metadata), + sector_metadata, + ) }) - .zip(sectors_metadata) // Read s-buckets of all sectors, map to winning chunks and then to audit results, all in // parallel .filter_map(|(sector_auditing_info, sector_metadata)| { @@ -128,6 +131,11 @@ where return None; } + if sector_auditing_info.s_bucket_audit_size == 0 { + // S-bucket is empty + return None; + } + let sector = plot.offset( usize::from(sector_metadata.sector_index) * sector_size(sector_metadata.pieces_in_sector), @@ -184,8 +192,8 @@ pub async fn audit_plot_async<'a, Plot>( where Plot: ReadAtAsync + 'a, { - // Create auditing info for all sectors in parallel - sectors_metadata + // Create auditing info for all sectors in parallel and allocate s-buckets + let mut audit_preparation = sectors_metadata .par_iter() .map(|sector_metadata| { ( @@ -193,45 +201,63 @@ where sector_metadata, ) }) - .collect::>() - .into_iter() - // Read s-buckets concurrently - .map(|(sector_auditing_info, sector_metadata)| async move { + .filter_map(|(sector_auditing_info, sector_metadata)| { if maybe_sector_being_modified == Some(sector_metadata.sector_index) { // Skip sector that is being modified right now return None; } - let mut s_bucket = vec![0; sector_auditing_info.s_bucket_audit_size]; - - let sector = plot.offset( - usize::from(sector_metadata.sector_index) - * sector_size(sector_metadata.pieces_in_sector), - ); - - if let Err(error) = sector - .read_at( - &mut s_bucket, - sector_auditing_info.s_bucket_audit_offset_in_sector, - ) - .await - { - warn!( - %error, - sector_index = %sector_metadata.sector_index, - s_bucket_audit_index = %sector_auditing_info.s_bucket_audit_index, - "Failed read s-bucket", - ); - + if sector_auditing_info.s_bucket_audit_size == 0 { + // S-bucket is empty return None; } + let s_bucket = vec![0; sector_auditing_info.s_bucket_audit_size]; + Some((sector_auditing_info, sector_metadata, s_bucket)) }) + .collect::>(); + + // Read all s-buckets concurrently + let reading_s_buckets_stream = audit_preparation + .iter_mut() + .map( + |(sector_auditing_info, sector_metadata, s_bucket)| async move { + let sector = plot.offset( + usize::from(sector_metadata.sector_index) + * sector_size(sector_metadata.pieces_in_sector), + ); + + mem::swap( + &mut sector + .read_at( + mem::take(s_bucket), + sector_auditing_info.s_bucket_audit_offset_in_sector, + ) + .await?, + s_bucket, + ); + + Ok::<_, io::Error>(()) + }, + ) .collect::>() - .filter_map(|value| async move { value }) - .collect::>() - .await + .filter_map(|result| async { + match result { + Ok(()) => None, + Err(error) => Some(error), + } + }); + if let Some(error) = pin!(reading_s_buckets_stream).next().await { + warn!( + %error, + "Failed read multiple s-buckets", + ); + + return Vec::new(); + } + + audit_preparation .into_par_iter() // Map to winning chunks in parallel .filter_map(|(sector_auditing_info, sector_metadata, s_bucket)| { diff --git a/crates/subspace-farmer-components/src/lib.rs b/crates/subspace-farmer-components/src/lib.rs index 928307b526..c13f7f4237 100644 --- a/crates/subspace-farmer-components/src/lib.rs +++ b/crates/subspace-farmer-components/src/lib.rs @@ -89,6 +89,43 @@ impl ReadAtSync for ! { } } +/// Container or asynchronously reading bytes using in [`ReadAtAsync`] +#[repr(transparent)] +pub struct AsyncReadBytes(B) +where + B: AsMut<[u8]> + Unpin + 'static; + +impl From> for AsyncReadBytes> { + fn from(value: Vec) -> Self { + Self(value) + } +} + +impl From> for AsyncReadBytes> { + fn from(value: Box<[u8]>) -> Self { + Self(value) + } +} + +impl AsMut<[u8]> for AsyncReadBytes +where + B: AsMut<[u8]> + Unpin + 'static, +{ + fn as_mut(&mut self) -> &mut [u8] { + self.0.as_mut() + } +} + +impl AsyncReadBytes +where + B: AsMut<[u8]> + Unpin + 'static, +{ + /// Extract inner value + pub fn into_inner(self) -> B { + self.0 + } +} + /// Async version of [`ReadAt`], it is neither [`Send`] nor [`Sync`] and is supposed to be used with /// concurrent async combinators pub trait ReadAtAsync { @@ -103,12 +140,19 @@ pub trait ReadAtAsync { } } - /// Fill the buffer by reading bytes at a specific offset - fn read_at(&self, buf: &mut [u8], offset: usize) -> impl Future>; + /// Fill the buffer by reading bytes at a specific offset and return the buffer back + fn read_at(&self, buf: B, offset: usize) -> impl Future> + where + AsyncReadBytes: From, + B: AsMut<[u8]> + Unpin + 'static; } impl ReadAtAsync for ! { - async fn read_at(&self, _buf: &mut [u8], _offset: usize) -> io::Result<()> { + async fn read_at(&self, _buf: B, _offset: usize) -> io::Result + where + AsyncReadBytes: From, + B: AsMut<[u8]> + Unpin + 'static, + { unreachable!("Is never called") } } @@ -196,7 +240,11 @@ impl ReadAtAsync for ReadAtOffset<'_, T> where T: ReadAtAsync, { - async fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { + async fn read_at(&self, buf: B, offset: usize) -> io::Result + where + AsyncReadBytes: From, + B: AsMut<[u8]> + Unpin + 'static, + { self.inner.read_at(buf, offset + self.offset).await } } @@ -205,7 +253,11 @@ impl ReadAtAsync for &ReadAtOffset<'_, T> where T: ReadAtAsync, { - async fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { + async fn read_at(&self, buf: B, offset: usize) -> io::Result + where + AsyncReadBytes: From, + B: AsMut<[u8]> + Unpin + 'static, + { self.inner.read_at(buf, offset + self.offset).await } } diff --git a/crates/subspace-farmer-components/src/proving.rs b/crates/subspace-farmer-components/src/proving.rs index d2b4a88063..a90ad39e1d 100644 --- a/crates/subspace-farmer-components/src/proving.rs +++ b/crates/subspace-farmer-components/src/proving.rs @@ -289,7 +289,8 @@ where sector.read_at(&mut sector_contents_map_bytes, 0)?; } ReadAt::Async(sector) => { - sector.read_at(&mut sector_contents_map_bytes, 0).await?; + sector_contents_map_bytes = + sector.read_at(sector_contents_map_bytes, 0).await?; } } diff --git a/crates/subspace-farmer-components/src/reading.rs b/crates/subspace-farmer-components/src/reading.rs index 543215ab5b..a9ce19dcaf 100644 --- a/crates/subspace-farmer-components/src/reading.rs +++ b/crates/subspace-farmer-components/src/reading.rs @@ -177,17 +177,20 @@ where .map( |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| async move { let mut record_chunk = [0; Scalar::FULL_BYTES]; - sector - .read_at( - &mut record_chunk, - SectorContentsMap::encoded_size(pieces_in_sector) - + chunk_location * Scalar::FULL_BYTES, - ) - .await - .map_err(|error| ReadingError::FailedToReadChunk { - chunk_location, - error, - })?; + record_chunk.copy_from_slice( + §or + .read_at( + vec![0; Scalar::FULL_BYTES], + SectorContentsMap::encoded_size(pieces_in_sector) + + chunk_location * Scalar::FULL_BYTES, + ) + .await + .map_err(|error| ReadingError::FailedToReadChunk { + chunk_location, + error, + })? + ); + // Decode chunk if necessary if encoded_chunk_used { @@ -310,14 +313,14 @@ where let record_metadata_offset = sector_metadata_start + RecordMetadata::encoded_size() * usize::from(piece_offset); - let mut record_metadata_bytes = [0; RecordMetadata::encoded_size()]; + let mut record_metadata_bytes = vec![0; RecordMetadata::encoded_size()]; match sector { ReadAt::Sync(sector) => { sector.read_at(&mut record_metadata_bytes, record_metadata_offset)?; } ReadAt::Async(sector) => { - sector - .read_at(&mut record_metadata_bytes, record_metadata_offset) + record_metadata_bytes = sector + .read_at(record_metadata_bytes, record_metadata_offset) .await?; } } @@ -354,7 +357,7 @@ where sector.read_at(&mut sector_contents_map_bytes, 0)?; } ReadAt::Async(sector) => { - sector.read_at(&mut sector_contents_map_bytes, 0).await?; + sector_contents_map_bytes = sector.read_at(sector_contents_map_bytes, 0).await?; } } diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index f313b99b8e..59296ee0e4 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -60,3 +60,6 @@ zeroize = "1.6.0" [target.'cfg(windows)'.dependencies] memmap2 = "0.9.0" + +[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies] +monoio = { version = "0.1.10-beta.1", features = ["sync"] } diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs index 63079d67f9..51975c7450 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs @@ -2,6 +2,7 @@ use crate::PosTable; use anyhow::anyhow; use clap::Subcommand; use criterion::{black_box, BatchSize, Criterion, Throughput}; +use futures::FutureExt; #[cfg(windows)] use memmap2::Mmap; use parking_lot::Mutex; @@ -11,7 +12,8 @@ use std::path::PathBuf; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::{Record, SolutionRange}; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer::single_disk_farm::farming::{audit_plot, PlotAuditOptions}; +use subspace_farmer::single_disk_farm::farming::sync_fallback::SyncPlotAudit; +use subspace_farmer::single_disk_farm::farming::{PlotAudit, PlotAuditOptions}; use subspace_farmer::single_disk_farm::{SingleDiskFarm, SingleDiskFarmSummary}; use subspace_farmer_components::sector::sector_size; use subspace_proof_of_space::Table; @@ -32,16 +34,16 @@ pub(crate) enum BenchmarkArgs { }, } -pub(crate) async fn benchmark(benchmark_args: BenchmarkArgs) -> anyhow::Result<()> { +pub(crate) fn benchmark(benchmark_args: BenchmarkArgs) -> anyhow::Result<()> { match benchmark_args { BenchmarkArgs::Audit { disk_farm, sample_size, - } => audit(disk_farm, sample_size).await, + } => audit(disk_farm, sample_size), } } -async fn audit(disk_farm: PathBuf, sample_size: usize) -> anyhow::Result<()> { +fn audit(disk_farm: PathBuf, sample_size: usize) -> anyhow::Result<()> { let (single_disk_farm_info, disk_farm) = match SingleDiskFarm::collect_summary(disk_farm) { SingleDiskFarmSummary::Found { info, directory } => (info, directory), SingleDiskFarmSummary::NotFound { directory } => { @@ -72,52 +74,132 @@ async fn audit(disk_farm: PathBuf, sample_size: usize) -> anyhow::Result<()> { let sectors_metadata = SingleDiskFarm::read_all_sectors_metadata(&disk_farm) .map_err(|error| anyhow::anyhow!("Failed to read sectors metadata: {error}"))?; - let plot_file = OpenOptions::new() - .read(true) - .open(disk_farm.join(SingleDiskFarm::PLOT_FILE)) - .map_err(|error| anyhow::anyhow!("Failed to open single disk farm: {error}"))?; - #[cfg(windows)] - let plot_mmap = unsafe { Mmap::map(&plot_file)? }; - let mut criterion = Criterion::default().sample_size(sample_size); - criterion - .benchmark_group("audit") - .throughput(Throughput::Bytes( + { + let mut group = criterion.benchmark_group("audit"); + group.throughput(Throughput::Bytes( sector_size as u64 * sectors_metadata.len() as u64, - )) - .bench_function("plot", |b| { - #[cfg(not(windows))] - let plot = &plot_file; + )); + { + let plot_file = OpenOptions::new() + .read(true) + .open(disk_farm.join(SingleDiskFarm::PLOT_FILE)) + .map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?; #[cfg(windows)] - let plot = &&*plot_mmap; - - b.iter_batched( - rand::random, - |global_challenge| { - let options = PlotAuditOptions:: { - public_key: single_disk_farm_info.public_key(), - reward_address: single_disk_farm_info.public_key(), - slot_info: SlotInfo { - slot_number: 0, - global_challenge, - // No solution will be found, pure audit - solution_range: SolutionRange::MIN, - // No solution will be found, pure audit - voting_solution_range: SolutionRange::MIN, - }, - sectors_metadata: §ors_metadata, - kzg: &kzg, - erasure_coding: &erasure_coding, - plot, - maybe_sector_being_modified: None, - table_generator: &table_generator, - }; - - black_box(audit_plot(black_box(options))) - }, - BatchSize::SmallInput, - ) - }); + let plot_mmap = unsafe { Mmap::map(&plot_file)? }; + + group.bench_function("plot/sync", |b| { + #[cfg(not(windows))] + let plot = &plot_file; + #[cfg(windows)] + let plot = &*plot_mmap; + + let sync_plot_audit = SyncPlotAudit::new(plot); + + b.iter_batched( + rand::random, + |global_challenge| { + let options = PlotAuditOptions:: { + public_key: single_disk_farm_info.public_key(), + reward_address: single_disk_farm_info.public_key(), + slot_info: SlotInfo { + slot_number: 0, + global_challenge, + // No solution will be found, pure audit + solution_range: SolutionRange::MIN, + // No solution will be found, pure audit + voting_solution_range: SolutionRange::MIN, + }, + sectors_metadata: §ors_metadata, + kzg: &kzg, + erasure_coding: &erasure_coding, + maybe_sector_being_modified: None, + table_generator: &table_generator, + }; + + black_box( + sync_plot_audit + .audit(black_box(options)) + .now_or_never() + .unwrap(), + ) + }, + BatchSize::SmallInput, + ) + }); + } + #[cfg(any(target_os = "linux", target_os = "macos"))] + { + use criterion::async_executor::AsyncExecutor; + use monoio::fs::File; + use std::cell::RefCell; + use std::future::Future; + use subspace_farmer::single_disk_farm::farming::monoio::{ + build_monoio_runtime, MonoioFile, MonoioPlotAudit, MonoioRuntime, + }; + + struct MonoioAsyncExecutor<'a>(&'a RefCell); + + impl AsyncExecutor for MonoioAsyncExecutor<'_> { + fn block_on(&self, future: impl Future) -> T { + self.0.borrow_mut().block_on(future) + } + } + + impl<'a> MonoioAsyncExecutor<'a> { + fn new(runtime: &'a RefCell) -> Self { + Self(runtime) + } + } + + /// SATA devices only support 32, for NVMe it is also sufficient at capacities we're + /// working with + const IO_CONCURRENCY: usize = 32; + + let runtime = + RefCell::new(build_monoio_runtime().map_err(|error| { + anyhow::anyhow!("Failed to create monoio runtime: {error}") + })?); + let file = runtime + .borrow_mut() + .block_on(File::open(disk_farm.join(SingleDiskFarm::PLOT_FILE))) + .map_err(|error| anyhow::anyhow!("Failed to open plot with monoio: {error}"))?; + + group.bench_function("plot/monoio", |b| { + let file = MonoioFile::new(&file, IO_CONCURRENCY); + + let monoio_plot_audit = MonoioPlotAudit::new(file); + + b.to_async(MonoioAsyncExecutor::new(&runtime)).iter_batched( + rand::random, + |global_challenge| { + let options = PlotAuditOptions:: { + public_key: single_disk_farm_info.public_key(), + reward_address: single_disk_farm_info.public_key(), + slot_info: SlotInfo { + slot_number: 0, + global_challenge, + // No solution will be found, pure audit + solution_range: SolutionRange::MIN, + // No solution will be found, pure audit + voting_solution_range: SolutionRange::MIN, + }, + sectors_metadata: §ors_metadata, + kzg: &kzg, + erasure_coding: &erasure_coding, + maybe_sector_being_modified: None, + table_generator: &table_generator, + }; + + black_box(monoio_plot_audit.audit(black_box(options))) + }, + BatchSize::SmallInput, + ) + }); + + runtime.borrow_mut().block_on(file.close()).unwrap(); + } + } criterion.final_summary(); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 61b6a852f8..00dbd00910 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -465,11 +465,11 @@ where cache_percentage, downloading_semaphore: Arc::clone(&downloading_semaphore), encoding_semaphore: Arc::clone(&encoding_semaphore), + farm_during_initial_plotting, farming_thread_pool_size, plotting_thread_pool: Arc::clone(&plotting_thread_pool), replotting_thread_pool: Arc::clone(&replotting_thread_pool), plotting_delay: Some(plotting_delay_receiver), - farm_during_initial_plotting, }, disk_farm_index, ); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index 37c37ee362..c4346c016e 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -81,7 +81,7 @@ async fn main() -> anyhow::Result<()> { commands::farm::farm::(farming_args).await?; } Command::Benchmark(benchmark_args) => { - commands::benchmark::benchmark(benchmark_args).await?; + commands::benchmark::benchmark(benchmark_args)?; } Command::Info { disk_farms } => { if disk_farms.is_empty() { diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index ea70709988..62d8b377dd 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -6,6 +6,7 @@ mod plotting; use crate::identity::{Identity, IdentityError}; use crate::node_client::NodeClient; use crate::reward_signing::reward_signing; +use crate::single_disk_farm::farming::sync_fallback::SyncPlotAudit; pub use crate::single_disk_farm::farming::FarmingError; use crate::single_disk_farm::farming::{farming, slot_notification_forwarder, FarmingOptions}; use crate::single_disk_farm::piece_cache::{DiskPieceCache, DiskPieceCacheError}; @@ -261,6 +262,8 @@ pub struct SingleDiskFarmOptions { /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically /// allow one permit at a time for efficient CPU utilization pub encoding_semaphore: Arc, + /// Whether to farm during initial plotting + pub farm_during_initial_plotting: bool, /// Thread pool size used for farming (mostly for blocking I/O, but also for some /// compute-intensive operations during proving) pub farming_thread_pool_size: usize, @@ -272,8 +275,6 @@ pub struct SingleDiskFarmOptions { /// Notification for plotter to start, can be used to delay plotting until some initialization /// has happened externally pub plotting_delay: Option>, - /// Whether to farm during initial plotting - pub farm_during_initial_plotting: bool, } /// Errors happening when trying to create/open single disk farm @@ -1023,11 +1024,23 @@ impl SingleDiskFarm { } } + #[cfg(not(windows))] + let plot_audit = &SyncPlotAudit::new(&*plot_file); + #[cfg(windows)] + let plot_mmap = unsafe { + memmap2::Mmap::map(&*plot_file).map_err(FarmingError::from)? + }; + // On Windows random read is horrible in terms of performance, memory-mapped I/O helps + // TODO: Remove this once https://internals.rust-lang.org/t/introduce-write-all-at-read-exact-at-on-windows/19649 + // or similar exists in standard library + #[cfg(windows)] + let plot_audit = &SyncPlotAudit::new(&*plot_mmap); + let farming_options = FarmingOptions { public_key, reward_address, node_client, - plot_file: &plot_file, + plot_audit, sectors_metadata, kzg, erasure_coding, @@ -1035,7 +1048,7 @@ impl SingleDiskFarm { modifying_sector_index, slot_info_notifications: slot_info_forwarder_receiver, }; - farming::(farming_options).await + farming::(farming_options).await }; handle.block_on(async { diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index 06967e4942..153276ace7 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -1,25 +1,26 @@ +#[cfg(any(target_os = "linux", target_os = "macos"))] +pub mod monoio; +pub mod sync_fallback; + use crate::node_client; use crate::node_client::NodeClient; use crate::single_disk_farm::Handlers; use async_lock::RwLock; use futures::channel::mpsc; -use futures::{FutureExt, StreamExt}; -#[cfg(windows)] -use memmap2::Mmap; +use futures::StreamExt; use parking_lot::Mutex; use rayon::ThreadPoolBuildError; -use std::fs::File; +use std::future::Future; use std::io; use std::sync::Arc; use std::time::Instant; use subspace_core_primitives::crypto::kzg::Kzg; -use subspace_core_primitives::{PosSeed, PublicKey, SectorIndex, Solution, SolutionRange}; +use subspace_core_primitives::{PublicKey, SectorIndex, Solution, SolutionRange}; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer_components::auditing::audit_plot_sync; +use subspace_farmer_components::proving; use subspace_farmer_components::proving::ProvableSolutions; use subspace_farmer_components::sector::SectorMetadataChecksummed; -use subspace_farmer_components::{proving, ReadAtSync}; -use subspace_proof_of_space::{Table, TableGenerator}; +use subspace_proof_of_space::Table; use subspace_rpc_primitives::{SlotInfo, SolutionResponse}; use thiserror::Error; use tracing::{debug, error, info, trace, warn}; @@ -79,11 +80,54 @@ where Ok(()) } -pub(super) struct FarmingOptions<'a, NC> { +/// Plot audit options +pub struct PlotAuditOptions<'a, PosTable> +where + PosTable: Table, +{ + /// Public key of the farm + pub public_key: &'a PublicKey, + /// Reward address to use for solutions + pub reward_address: &'a PublicKey, + /// Slot info for the audit + pub slot_info: SlotInfo, + /// Metadata of all sectors plotted so far + pub sectors_metadata: &'a [SectorMetadataChecksummed], + /// Kzg instance + pub kzg: &'a Kzg, + /// Erasure coding instance + pub erasure_coding: &'a ErasureCoding, + /// Optional sector that is currently being modified (for example replotted) and should not be + /// audited + pub maybe_sector_being_modified: Option, + /// Proof of space table generator + pub table_generator: &'a Mutex, +} + +/// Auditing implementation used by farming +pub trait PlotAudit<'p> { + fn audit<'a, PosTable>( + &'p self, + options: PlotAuditOptions<'a, PosTable>, + ) -> impl Future< + Output = Vec<( + SectorIndex, + impl ProvableSolutions< + Item = Result, proving::ProvingError>, + > + Unpin + + 'a, + )>, + > + where + 'p: 'a, + PosTable: Table; +} + +pub(super) struct FarmingOptions<'a, NC, PA> { pub(super) public_key: PublicKey, pub(super) reward_address: PublicKey, pub(super) node_client: NC, - pub(super) plot_file: &'a File, + pub(super) plot_audit: &'a PA, pub(super) sectors_metadata: Arc>>, pub(super) kzg: Kzg, pub(super) erasure_coding: ErasureCoding, @@ -96,18 +140,19 @@ pub(super) struct FarmingOptions<'a, NC> { /// /// NOTE: Returned future is async, but does blocking operations and should be running in dedicated /// thread. -pub(super) async fn farming( - farming_options: FarmingOptions<'_, NC>, +pub(super) async fn farming<'a, PosTable, NC, PA>( + farming_options: FarmingOptions<'a, NC, PA>, ) -> Result<(), FarmingError> where PosTable: Table, NC: NodeClient, + PA: PlotAudit<'a>, { let FarmingOptions { public_key, reward_address, node_client, - plot_file, + plot_audit, sectors_metadata, kzg, erasure_coding, @@ -124,15 +169,6 @@ where // We assume that each slot is one second let farming_timeout = farmer_app_info.farming_timeout; - #[cfg(not(windows))] - let plot = plot_file; - #[cfg(windows)] - let plot_mmap = unsafe { Mmap::map(plot_file)? }; - // On Windows random read is horrible in terms of performance, memory-mapped I/O helps - // TODO: Remove this once https://internals.rust-lang.org/t/introduce-write-all-at-read-exact-at-on-windows/19649 - // or similar exists in standard library - #[cfg(windows)] - let plot = &&*plot_mmap; let table_generator = Arc::new(Mutex::new(PosTable::generator())); while let Some(slot_info) = slot_info_notifications.next().await { @@ -146,17 +182,18 @@ where let modifying_sector_guard = modifying_sector_index.read().await; let maybe_sector_being_modified = modifying_sector_guard.as_ref().copied(); - audit_plot(PlotAuditOptions:: { - public_key: &public_key, - reward_address: &reward_address, - slot_info, - sectors_metadata: §ors_metadata, - kzg: &kzg, - erasure_coding: &erasure_coding, - plot, - maybe_sector_being_modified, - table_generator: &table_generator, - }) + plot_audit + .audit(PlotAuditOptions:: { + public_key: &public_key, + reward_address: &reward_address, + slot_info, + sectors_metadata: §ors_metadata, + kzg: &kzg, + erasure_coding: &erasure_coding, + maybe_sector_being_modified, + table_generator: &table_generator, + }) + .await }; sectors_solutions.sort_by(|a, b| { @@ -212,97 +249,3 @@ where Ok(()) } - -/// Plot audit options -pub struct PlotAuditOptions<'a, PosTable, File> -where - PosTable: Table, -{ - /// Public key of the farm - pub public_key: &'a PublicKey, - /// Reward address to use for solutions - pub reward_address: &'a PublicKey, - /// Slot info for the audit - pub slot_info: SlotInfo, - /// Metadata of all sectors plotted so far - pub sectors_metadata: &'a [SectorMetadataChecksummed], - /// Kzg instance - pub kzg: &'a Kzg, - /// Erasure coding instance - pub erasure_coding: &'a ErasureCoding, - /// File abstraction corresponding to the plot - pub plot: &'a File, - /// Optional sector that is currently being modified (for example replotted) and should not be - /// audited - pub maybe_sector_being_modified: Option, - /// Proof of space table generator - pub table_generator: &'a Mutex, -} - -pub fn audit_plot<'a, PosTable, S>( - options: PlotAuditOptions<'a, PosTable, S>, -) -> Vec<( - SectorIndex, - impl ProvableSolutions, proving::ProvingError>> + 'a, -)> -where - PosTable: Table, - S: ReadAtSync + 'a, -{ - let PlotAuditOptions { - public_key, - reward_address, - slot_info, - sectors_metadata, - kzg, - erasure_coding, - plot, - maybe_sector_being_modified, - table_generator, - } = options; - - let audit_results = audit_plot_sync( - public_key, - &slot_info.global_challenge, - slot_info.voting_solution_range, - plot, - sectors_metadata, - maybe_sector_being_modified, - ); - - audit_results - .into_iter() - .filter_map(|audit_results| { - let sector_index = audit_results.sector_index; - - let sector_solutions_fut = audit_results.solution_candidates.into_solutions( - reward_address, - kzg, - erasure_coding, - |seed: &PosSeed| table_generator.lock().generate_parallel(seed), - ); - - let sector_solutions = match sector_solutions_fut - .now_or_never() - .expect("Implementation of the sector is currently synchronous; qed") - { - Ok(solutions) => solutions, - Err(error) => { - warn!( - %error, - %sector_index, - "Failed to turn solution candidates into solutions", - ); - - return None; - } - }; - - if sector_solutions.len() == 0 { - return None; - } - - Some((sector_index, sector_solutions)) - }) - .collect() -} diff --git a/crates/subspace-farmer/src/single_disk_farm/farming/monoio.rs b/crates/subspace-farmer/src/single_disk_farm/farming/monoio.rs new file mode 100644 index 0000000000..14585254d5 --- /dev/null +++ b/crates/subspace-farmer/src/single_disk_farm/farming/monoio.rs @@ -0,0 +1,184 @@ +use crate::single_disk_farm::farming::{PlotAudit, PlotAuditOptions}; +use async_lock::Semaphore; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use monoio::blocking::{BlockingTask, ThreadPool}; +use monoio::buf::IoBufMut; +use monoio::fs::File; +use monoio::{LegacyDriver, RuntimeBuilder}; +use std::io; +use subspace_core_primitives::{PosSeed, PublicKey, SectorIndex, Solution}; +use subspace_farmer_components::auditing::audit_plot_async; +use subspace_farmer_components::proving::{ProvableSolutions, ProvingError}; +use subspace_farmer_components::{AsyncReadBytes, ReadAtAsync}; +use subspace_proof_of_space::{Table, TableGenerator}; +use tracing::warn; + +/// Re-export of platform-specific [`monoio`] runtime +#[cfg(target_os = "linux")] +pub type MonoioRuntime = monoio::FusionRuntime; + +/// Build platform-specific [`monoio`] runtime +#[cfg(target_os = "linux")] +pub fn build_monoio_runtime() -> io::Result { + RuntimeBuilder::::new() + .attach_thread_pool(Box::new(RayonThreadPool)) + .build() +} + +/// Re-export of platform-specific [`monoio`] runtime +#[cfg(target_os = "macos")] +pub type MonoioRuntime = monoio::Runtime; + +/// Build platform-specific [`monoio`] runtime +#[cfg(target_os = "macos")] +pub fn build_monoio_runtime() -> io::Result { + RuntimeBuilder::::new() + .attach_thread_pool(Box::new(RayonThreadPool)) + .build() +} + +/// Wrapper to use rayon's thread pool with [`monoio`] +struct RayonThreadPool; + +impl ThreadPool for RayonThreadPool { + fn schedule_task(&self, task: BlockingTask) { + rayon::spawn(move || { + task.run(); + }); + } +} + +struct AsyncReadBytesWrapper(B) +where + B: AsMut<[u8]> + Unpin + 'static; + +unsafe impl IoBufMut for AsyncReadBytesWrapper +where + B: AsMut<[u8]> + Unpin + 'static, +{ + #[inline] + fn write_ptr(&mut self) -> *mut u8 { + self.0.as_mut().as_mut_ptr() + } + + #[inline] + fn bytes_total(&mut self) -> usize { + self.0.as_mut().len() + } + + #[inline] + unsafe fn set_init(&mut self, _: usize) {} +} + +/// Wrapper data structure for readable file used with [`monoio`]-based auditing implementation +pub struct MonoioFile<'a> { + file: &'a File, + semaphore: Semaphore, +} + +impl ReadAtAsync for MonoioFile<'_> { + async fn read_at(&self, buf: B, offset: usize) -> io::Result + where + AsyncReadBytes: From, + B: AsMut<[u8]> + Unpin + 'static, + { + let _permit = self.semaphore.acquire().await; + let (read_result, AsyncReadBytesWrapper(buf)) = self + .file + .read_exact_at(AsyncReadBytesWrapper(buf), offset as u64) + .await; + + read_result.map(|()| buf) + } +} + +impl<'a> MonoioFile<'a> { + pub fn new(file: &'a File, io_concurrency: usize) -> Self { + Self { + file, + semaphore: Semaphore::new(io_concurrency), + } + } +} + +/// Plot auditing asynchronously using [`monoio`] runtime +pub struct MonoioPlotAudit<'f>(MonoioFile<'f>); + +impl<'f> PlotAudit<'f> for MonoioPlotAudit<'f> { + async fn audit<'a, PosTable>( + &'f self, + options: PlotAuditOptions<'a, PosTable>, + ) -> Vec<( + SectorIndex, + impl ProvableSolutions, ProvingError>> + Unpin + 'a, + )> + where + 'f: 'a, + PosTable: Table, + { + let PlotAuditOptions { + public_key, + reward_address, + slot_info, + sectors_metadata, + kzg, + erasure_coding, + maybe_sector_being_modified, + table_generator, + } = options; + + let audit_results_fut = audit_plot_async( + public_key, + &slot_info.global_challenge, + slot_info.voting_solution_range, + &self.0, + sectors_metadata, + maybe_sector_being_modified, + ); + + audit_results_fut + .await + .into_iter() + .map(|audit_results| async move { + let sector_index = audit_results.sector_index; + + let sector_solutions_fut = audit_results.solution_candidates.into_solutions( + reward_address, + kzg, + erasure_coding, + |seed: &PosSeed| table_generator.lock().generate_parallel(seed), + ); + + let sector_solutions = match sector_solutions_fut.await { + Ok(solutions) => solutions, + Err(error) => { + warn!( + %error, + %sector_index, + "Failed to turn solution candidates into solutions", + ); + + return None; + } + }; + + if sector_solutions.len() == 0 { + return None; + } + + Some((sector_index, sector_solutions)) + }) + .collect::>() + .filter_map(|value| async move { value }) + .collect() + .await + } +} + +impl<'f> MonoioPlotAudit<'f> { + /// Create new instance + pub fn new(file: MonoioFile<'f>) -> Self { + Self(file) + } +} diff --git a/crates/subspace-farmer/src/single_disk_farm/farming/sync_fallback.rs b/crates/subspace-farmer/src/single_disk_farm/farming/sync_fallback.rs new file mode 100644 index 0000000000..423786a82a --- /dev/null +++ b/crates/subspace-farmer/src/single_disk_farm/farming/sync_fallback.rs @@ -0,0 +1,96 @@ +use crate::single_disk_farm::farming::{PlotAudit, PlotAuditOptions}; +use futures::FutureExt; +use subspace_core_primitives::{PosSeed, PublicKey, SectorIndex, Solution}; +use subspace_farmer_components::auditing::audit_plot_sync; +use subspace_farmer_components::proving::{ProvableSolutions, ProvingError}; +use subspace_farmer_components::ReadAtSync; +use subspace_proof_of_space::{Table, TableGenerator}; +use tracing::warn; + +/// Plot auditing, default synchronous implementation +pub struct SyncPlotAudit(Plot) +where + Plot: ReadAtSync; + +impl<'p, Plot> PlotAudit<'p> for SyncPlotAudit +where + Plot: ReadAtSync + 'p, +{ + async fn audit<'a, PosTable>( + &'p self, + options: PlotAuditOptions<'a, PosTable>, + ) -> Vec<( + SectorIndex, + impl ProvableSolutions, ProvingError>> + Unpin + 'a, + )> + where + 'p: 'a, + PosTable: Table, + { + let PlotAuditOptions { + public_key, + reward_address, + slot_info, + sectors_metadata, + kzg, + erasure_coding, + maybe_sector_being_modified, + table_generator, + } = options; + + let audit_results = audit_plot_sync( + public_key, + &slot_info.global_challenge, + slot_info.voting_solution_range, + &self.0, + sectors_metadata, + maybe_sector_being_modified, + ); + + audit_results + .into_iter() + .filter_map(|audit_results| { + let sector_index = audit_results.sector_index; + + let sector_solutions_fut = audit_results.solution_candidates.into_solutions( + reward_address, + kzg, + erasure_coding, + |seed: &PosSeed| table_generator.lock().generate_parallel(seed), + ); + + let sector_solutions = match sector_solutions_fut + .now_or_never() + .expect("Implementation of the sector is synchronous here; qed") + { + Ok(solutions) => solutions, + Err(error) => { + warn!( + %error, + %sector_index, + "Failed to turn solution candidates into solutions", + ); + + return None; + } + }; + + if sector_solutions.len() == 0 { + return None; + } + + Some((sector_index, sector_solutions)) + }) + .collect() + } +} + +impl SyncPlotAudit +where + Plot: ReadAtSync, +{ + /// Create new instance + pub fn new(plot: Plot) -> Self { + Self(plot) + } +} diff --git a/crates/subspace-rpc-primitives/src/lib.rs b/crates/subspace-rpc-primitives/src/lib.rs index 75c1bd1e23..615566fe13 100644 --- a/crates/subspace-rpc-primitives/src/lib.rs +++ b/crates/subspace-rpc-primitives/src/lib.rs @@ -42,7 +42,7 @@ pub struct FarmerAppInfo { } /// Information about new slot that just arrived -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Debug, Copy, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct SlotInfo { /// Slot number