diff --git a/Cargo.lock b/Cargo.lock index 5cac11e595..698ed8c6ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11393,12 +11393,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "std-semaphore" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ae9eec00137a8eed469fb4148acd9fc6ac8c3f9b110f52cd34698c8b5bfa0e" - [[package]] name = "strsim" version = "0.10.0" @@ -11552,7 +11546,6 @@ dependencies = [ "serde_json", "ss58-registry", "static_assertions", - "std-semaphore", "subspace-archiving", "subspace-core-primitives", "subspace-erasure-coding", diff --git a/crates/pallet-subspace/src/mock.rs b/crates/pallet-subspace/src/mock.rs index 6f78cd36fb..79df65025a 100644 --- a/crates/pallet-subspace/src/mock.rs +++ b/crates/pallet-subspace/src/mock.rs @@ -55,7 +55,9 @@ use subspace_core_primitives::{ }; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_sector; -use subspace_farmer_components::plotting::{plot_sector, PieceGetterRetryPolicy}; +use subspace_farmer_components::plotting::{ + plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, +}; use subspace_farmer_components::FarmerProtocolInfo; use subspace_proof_of_space::shim::ShimTable; use subspace_proof_of_space::{Table, TableGenerator}; @@ -454,19 +456,21 @@ pub fn create_signed_vote( let mut plotted_sector_bytes = Vec::new(); let mut plotted_sector_metadata_bytes = Vec::new(); - let plotted_sector = block_on(plot_sector::<_, PosTable>( - &public_key, + let plotted_sector = block_on(plot_sector::(PlotSectorOptions { + public_key: &public_key, sector_index, - archived_history_segment, - PieceGetterRetryPolicy::default(), - &farmer_protocol_info, + piece_getter: archived_history_segment, + piece_getter_retry_policy: PieceGetterRetryPolicy::default(), + farmer_protocol_info: &farmer_protocol_info, kzg, erasure_coding, pieces_in_sector, - &mut plotted_sector_bytes, - &mut plotted_sector_metadata_bytes, - &mut table_generator, - )) + sector_output: &mut plotted_sector_bytes, + sector_metadata_output: &mut plotted_sector_metadata_bytes, + downloading_semaphore: None, + encoding_semaphore: None, + table_generator: &mut table_generator, + })) .unwrap(); let global_challenge = proof_of_time diff --git a/crates/sp-lightclient/src/tests.rs b/crates/sp-lightclient/src/tests.rs index 190d1ccb3f..08dc0d9a4d 100644 --- a/crates/sp-lightclient/src/tests.rs +++ b/crates/sp-lightclient/src/tests.rs @@ -175,7 +175,7 @@ fn valid_header( let mut plotted_sector_bytes = Vec::new(); let mut plotted_sector_metadata_bytes = Vec::new(); - let plotted_sector = block_on(plot_sector::<_, PosTable>( + let plotted_sector = block_on(plot_sector::( &public_key, sector_index, &archived_segment.pieces, diff --git a/crates/subspace-farmer-components/benches/auditing.rs b/crates/subspace-farmer-components/benches/auditing.rs index f6b754072e..03b594f9cc 100644 --- a/crates/subspace-farmer-components/benches/auditing.rs +++ b/crates/subspace-farmer-components/benches/auditing.rs @@ -16,7 +16,9 @@ use subspace_core_primitives::{ use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_sector; use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; -use subspace_farmer_components::plotting::{plot_sector, PieceGetterRetryPolicy, PlottedSector}; +use subspace_farmer_components::plotting::{ + plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, +}; use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, }; @@ -116,19 +118,21 @@ pub fn criterion_benchmark(c: &mut Criterion) { let mut plotted_sector_bytes = Vec::new(); let mut plotted_sector_metadata_bytes = Vec::new(); - let plotted_sector = block_on(plot_sector::<_, PosTable>( - &public_key, + let plotted_sector = block_on(plot_sector::(PlotSectorOptions { + public_key: &public_key, sector_index, - &archived_history_segment, - PieceGetterRetryPolicy::default(), - &farmer_protocol_info, - &kzg, - &erasure_coding, + piece_getter: &archived_history_segment, + piece_getter_retry_policy: PieceGetterRetryPolicy::default(), + farmer_protocol_info: &farmer_protocol_info, + kzg: &kzg, + erasure_coding: &erasure_coding, pieces_in_sector, - &mut plotted_sector_bytes, - &mut plotted_sector_metadata_bytes, - &mut table_generator, - )) + sector_output: &mut plotted_sector_bytes, + sector_metadata_output: &mut plotted_sector_metadata_bytes, + downloading_semaphore: black_box(None), + encoding_semaphore: black_box(None), + table_generator: &mut table_generator, + })) .unwrap(); (plotted_sector, plotted_sector_bytes) diff --git a/crates/subspace-farmer-components/benches/plotting.rs b/crates/subspace-farmer-components/benches/plotting.rs index 5061cbcc34..d11bdb659c 100644 --- a/crates/subspace-farmer-components/benches/plotting.rs +++ b/crates/subspace-farmer-components/benches/plotting.rs @@ -8,7 +8,9 @@ use subspace_core_primitives::crypto::kzg; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::{HistorySize, PublicKey, Record, RecordedHistorySegment}; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer_components::plotting::{plot_sector, PieceGetterRetryPolicy}; +use subspace_farmer_components::plotting::{ + plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, +}; use subspace_farmer_components::sector::sector_size; use subspace_farmer_components::FarmerProtocolInfo; use subspace_proof_of_space::chia::ChiaTable; @@ -65,19 +67,21 @@ fn criterion_benchmark(c: &mut Criterion) { group.throughput(Throughput::Bytes(sector_size as u64)); group.bench_function("in-memory", |b| { b.iter(|| { - block_on(plot_sector::<_, PosTable>( - black_box(&public_key), - black_box(sector_index), - black_box(&archived_history_segment), - black_box(PieceGetterRetryPolicy::default()), - black_box(&farmer_protocol_info), - black_box(&kzg), - black_box(&erasure_coding), - black_box(pieces_in_sector), - black_box(&mut sector_bytes), - black_box(&mut sector_metadata_bytes), - black_box(&mut table_generator), - )) + block_on(plot_sector::(PlotSectorOptions { + public_key: black_box(&public_key), + sector_index: black_box(sector_index), + piece_getter: black_box(&archived_history_segment), + piece_getter_retry_policy: black_box(PieceGetterRetryPolicy::default()), + farmer_protocol_info: black_box(&farmer_protocol_info), + kzg: black_box(&kzg), + erasure_coding: black_box(&erasure_coding), + pieces_in_sector: black_box(pieces_in_sector), + sector_output: black_box(&mut sector_bytes), + sector_metadata_output: black_box(&mut sector_metadata_bytes), + downloading_semaphore: black_box(None), + encoding_semaphore: black_box(None), + table_generator: black_box(&mut table_generator), + })) .unwrap(); }) }); diff --git a/crates/subspace-farmer-components/benches/proving.rs b/crates/subspace-farmer-components/benches/proving.rs index e545594665..bb78b7554d 100644 --- a/crates/subspace-farmer-components/benches/proving.rs +++ b/crates/subspace-farmer-components/benches/proving.rs @@ -17,7 +17,9 @@ use subspace_core_primitives::{ use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_sector; use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; -use subspace_farmer_components::plotting::{plot_sector, PieceGetterRetryPolicy, PlottedSector}; +use subspace_farmer_components::plotting::{ + plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, +}; use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, }; @@ -119,19 +121,21 @@ pub fn criterion_benchmark(c: &mut Criterion) { let mut plotted_sector_bytes = Vec::new(); let mut plotted_sector_metadata_bytes = Vec::new(); - let plotted_sector = block_on(plot_sector::<_, PosTable>( - &public_key, + let plotted_sector = block_on(plot_sector::(PlotSectorOptions { + public_key: &public_key, sector_index, - &archived_history_segment, - PieceGetterRetryPolicy::default(), - &farmer_protocol_info, - &kzg, - &erasure_coding, + piece_getter: &archived_history_segment, + piece_getter_retry_policy: PieceGetterRetryPolicy::default(), + farmer_protocol_info: &farmer_protocol_info, + kzg: &kzg, + erasure_coding: &erasure_coding, pieces_in_sector, - &mut plotted_sector_bytes, - &mut plotted_sector_metadata_bytes, - &mut table_generator, - )) + sector_output: &mut plotted_sector_bytes, + sector_metadata_output: &mut plotted_sector_metadata_bytes, + downloading_semaphore: black_box(None), + encoding_semaphore: black_box(None), + table_generator: &mut table_generator, + })) .unwrap(); (plotted_sector, plotted_sector_bytes) diff --git a/crates/subspace-farmer-components/benches/reading.rs b/crates/subspace-farmer-components/benches/reading.rs index cd40825614..6b1ded117d 100644 --- a/crates/subspace-farmer-components/benches/reading.rs +++ b/crates/subspace-farmer-components/benches/reading.rs @@ -14,7 +14,9 @@ use subspace_core_primitives::{ }; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; -use subspace_farmer_components::plotting::{plot_sector, PieceGetterRetryPolicy, PlottedSector}; +use subspace_farmer_components::plotting::{ + plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, +}; use subspace_farmer_components::reading::read_piece; use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, @@ -113,19 +115,21 @@ pub fn criterion_benchmark(c: &mut Criterion) { let mut plotted_sector_bytes = Vec::new(); let mut plotted_sector_metadata_bytes = Vec::new(); - let plotted_sector = block_on(plot_sector::<_, PosTable>( - &public_key, + let plotted_sector = block_on(plot_sector::(PlotSectorOptions { + public_key: &public_key, sector_index, - &archived_history_segment, - PieceGetterRetryPolicy::default(), - &farmer_protocol_info, - &kzg, - &erasure_coding, + piece_getter: &archived_history_segment, + piece_getter_retry_policy: PieceGetterRetryPolicy::default(), + farmer_protocol_info: &farmer_protocol_info, + kzg: &kzg, + erasure_coding: &erasure_coding, pieces_in_sector, - &mut plotted_sector_bytes, - &mut plotted_sector_metadata_bytes, - &mut table_generator, - )) + sector_output: &mut plotted_sector_bytes, + sector_metadata_output: &mut plotted_sector_metadata_bytes, + downloading_semaphore: black_box(None), + encoding_semaphore: black_box(None), + table_generator: &mut table_generator, + })) .unwrap(); (plotted_sector, plotted_sector_bytes) diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index 6919d62d58..c1ea583c90 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -151,34 +151,74 @@ pub enum PlottingError { }, } -/// Plot a single sector, where `sector` and `sector_metadata` must be positioned correctly at the -/// beginning of the sector (seek to desired offset before calling this function and seek back -/// afterwards if necessary). +/// Options for plotting a sector. /// /// Sector output and sector metadata output should be either empty (in which case they'll be /// resized to correct size automatically) or correctly sized from the beginning or else error will /// be returned. +pub struct PlotSectorOptions<'a, PosTable, PG> +where + PosTable: Table, +{ + /// Public key corresponding to sector + pub public_key: &'a PublicKey, + /// Sector index + pub sector_index: SectorIndex, + /// Getter for pieces of archival history + pub piece_getter: &'a PG, + /// Retry policy for piece getter + pub piece_getter_retry_policy: PieceGetterRetryPolicy, + /// Farmer protocol info + pub farmer_protocol_info: &'a FarmerProtocolInfo, + /// KZG instance + pub kzg: &'a Kzg, + /// Erasure coding instance + pub erasure_coding: &'a ErasureCoding, + /// How many pieces should sector contain + pub pieces_in_sector: u16, + /// Where plotted sector should be written, vector must either be empty (in which case it'll be + /// resized to correct size automatically) or correctly sized from the beginning + pub sector_output: &'a mut Vec, + /// Where plotted sector metadata should be written, vector must either be empty (in which case + /// it'll be resized to correct size automatically) or correctly sized from the beginning + pub sector_metadata_output: &'a mut Vec, + /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory + /// usage of the plotting process, permit will be held until the end of the plotting process + pub downloading_semaphore: Option<&'a Semaphore>, + /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically + /// allow one permit at a time for efficient CPU utilization + pub encoding_semaphore: Option<&'a Semaphore>, + /// Proof of space table generator + pub table_generator: &'a mut PosTable::Generator, +} + +/// Plot a single sector. /// /// NOTE: Even though this function is async, it has blocking code inside and must be running in a /// separate thread in order to prevent blocking an executor. -#[allow(clippy::too_many_arguments)] -pub async fn plot_sector( - public_key: &PublicKey, - sector_index: SectorIndex, - piece_getter: &PG, - piece_getter_retry_policy: PieceGetterRetryPolicy, - farmer_protocol_info: &FarmerProtocolInfo, - kzg: &Kzg, - erasure_coding: &ErasureCoding, - pieces_in_sector: u16, - sector_output: &mut Vec, - sector_metadata_output: &mut Vec, - table_generator: &mut PosTable::Generator, +pub async fn plot_sector( + options: PlotSectorOptions<'_, PosTable, PG>, ) -> Result where - PG: PieceGetter, PosTable: Table, + PG: PieceGetter, { + let PlotSectorOptions { + public_key, + sector_index, + piece_getter, + piece_getter_retry_policy, + farmer_protocol_info, + kzg, + erasure_coding, + pieces_in_sector, + sector_output, + sector_metadata_output, + downloading_semaphore, + encoding_semaphore, + table_generator, + } = options; + if erasure_coding.max_shards() < Record::NUM_S_BUCKETS { return Err(PlottingError::InvalidErasureCodingInstance); } @@ -201,6 +241,11 @@ where }); } + let _downloading_permit = match downloading_semaphore { + Some(downloading_semaphore) => Some(downloading_semaphore.acquire().await), + None => None, + }; + let sector_id = SectorId::new(public_key.hash(), sector_index); let piece_indexes: Vec = (PieceOffset::ZERO..) @@ -263,6 +308,11 @@ where let mut raw_sector = raw_sector.into_inner(); + let _encoding_permit = match encoding_semaphore { + Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await), + None => None, + }; + let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); (PieceOffset::ZERO..) diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index ee1d51a803..4747df2540 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -38,7 +38,6 @@ schnorrkel = "0.9.1" serde = { version = "1.0.183", features = ["derive"] } serde_json = "1.0.106" static_assertions = "1.1.0" -std-semaphore = "0.1.0" ss58-registry = "1.43.0" subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" } subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" } diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 85f41595be..79b0d646ea 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -31,6 +31,7 @@ use subspace_networking::libp2p::identity::{ed25519, Keypair}; use subspace_networking::utils::piece_provider::PieceProvider; use subspace_proof_of_space::Table; use tempfile::TempDir; +use tokio::sync::Semaphore; use tracing::{debug, error, info, info_span, warn}; use zeroize::Zeroizing; @@ -55,6 +56,8 @@ where tmp, mut disk_farms, metrics_endpoints, + sector_downloading_concurrency, + sector_encoding_concurrency, farming_thread_pool_size, plotting_thread_pool_size, replotting_thread_pool_size, @@ -206,6 +209,9 @@ where .build()?, ); + let downloading_semaphore = Arc::new(Semaphore::new(sector_downloading_concurrency.get())); + let encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get())); + // TODO: Check plot and metadata sizes to ensure there is enough space for farmer to not // fail later for (disk_farm_index, disk_farm) in disk_farms.into_iter().enumerate() { @@ -224,6 +230,8 @@ where erasure_coding: erasure_coding.clone(), piece_getter: piece_getter.clone(), cache_percentage, + downloading_semaphore: Arc::clone(&downloading_semaphore), + encoding_semaphore: Arc::clone(&encoding_semaphore), farming_thread_pool_size, plotting_thread_pool: Arc::clone(&plotting_thread_pool), replotting_thread_pool: Arc::clone(&replotting_thread_pool), diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index b30de8504b..636500688a 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -9,7 +9,7 @@ use clap::{Parser, ValueHint}; use ss58::parse_ss58_reward_address; use std::fs; use std::net::SocketAddr; -use std::num::NonZeroU8; +use std::num::{NonZeroU8, NonZeroUsize}; use std::path::PathBuf; use std::str::FromStr; use subspace_core_primitives::PublicKey; @@ -91,6 +91,15 @@ struct FarmingArgs { /// one specified endpoint. Format: 127.0.0.1:8080 #[arg(long, alias = "metrics-endpoint")] metrics_endpoints: Vec, + /// Defines how many sectors farmer will download concurrently, allows to limit memory usage of + /// the plotting process, increasing beyond 2 makes practical sense due to limited networking + /// concurrency and will likely result in slower plotting overall + #[arg(long, default_value = "2")] + sector_downloading_concurrency: NonZeroUsize, + /// Defines how many sectors farmer will encode concurrently, should generally never be set to + /// more than 1 because it will most likely result in slower plotting overall + #[arg(long, default_value = "1")] + sector_encoding_concurrency: NonZeroUsize, /// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some /// compute-intensive operations during proving), defaults to number of CPU cores available in /// the system diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 6bea0d627f..b7e4ceba5f 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -31,13 +31,12 @@ use static_assertions::const_assert; use std::fs::{File, OpenOptions}; use std::future::Future; use std::io::{Seek, SeekFrom}; -use std::num::{NonZeroU16, NonZeroU8}; +use std::num::NonZeroU8; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::{fmt, fs, io, mem, thread}; -use std_semaphore::{Semaphore, SemaphoreGuard}; +use std::{fs, io, mem, thread}; use subspace_core_primitives::crypto::blake3_hash; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::{ @@ -54,7 +53,7 @@ use subspace_proof_of_space::Table; use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse}; use thiserror::Error; use tokio::runtime::Handle; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, Semaphore}; use tracing::{debug, error, info, info_span, trace, warn, Instrument, Span}; use ulid::Ulid; @@ -67,35 +66,6 @@ const RESERVED_PLOT_METADATA: u64 = 1024 * 1024; /// Reserve 1M of space for farm info (for potential future expansion) const RESERVED_FARM_INFO: u64 = 1024 * 1024; -/// Semaphore that limits disk access concurrency in strategic places to the number specified during -/// initialization -#[derive(Clone)] -pub struct SingleDiskSemaphore { - inner: Arc, -} - -impl fmt::Debug for SingleDiskSemaphore { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("SingleDiskSemaphore").finish() - } -} - -impl SingleDiskSemaphore { - /// Create new semaphore for limiting concurrency of the major processes working with the same - /// disk - pub fn new(concurrency: NonZeroU16) -> Self { - Self { - inner: Arc::new(Semaphore::new(concurrency.get() as isize)), - } - } - - /// Acquire access, will block current thread until previously acquired guards are dropped and - /// access is released - pub fn acquire(&self) -> SemaphoreGuard<'_> { - self.inner.access() - } -} - /// An identifier for single disk farm, can be used for in logs, thread names, etc. #[derive( Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From, @@ -286,6 +256,12 @@ pub struct SingleDiskFarmOptions { pub erasure_coding: ErasureCoding, /// Percentage of allocated space dedicated for caching purposes pub cache_percentage: NonZeroU8, + /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory + /// usage of the plotting process, permit will be held until the end of the plotting process + pub downloading_semaphore: Arc, + /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically + /// allow one permit at a time for efficient CPU utilization + pub encoding_semaphore: Arc, /// Thread pool size used for farming (mostly for blocking I/O, but also for some /// compute-intensive operations during proving) pub farming_thread_pool_size: usize, @@ -609,17 +585,14 @@ impl SingleDiskFarm { kzg, erasure_coding, cache_percentage, + downloading_semaphore, + encoding_semaphore, farming_thread_pool_size, plotting_thread_pool, replotting_thread_pool, } = options; fs::create_dir_all(&directory)?; - // TODO: Parametrize concurrency, much higher default due to SSD focus - // TODO: Use this or remove - let _single_disk_semaphore = - SingleDiskSemaphore::new(NonZeroU16::new(10).expect("Not a zero; qed")); - // TODO: Update `Identity` to use more specific error type and remove this `.unwrap()` let identity = Identity::open_or_create(&directory).unwrap(); let public_key = identity.public_key().to_bytes().into(); @@ -919,6 +892,8 @@ impl SingleDiskFarm { handlers, modifying_sector_index, sectors_to_plot_receiver, + downloading_semaphore, + encoding_semaphore, plotting_thread_pool, replotting_thread_pool, }; diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 0bed39e3f5..1f8f7ee874 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -27,12 +27,13 @@ use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::file_ext::FileExt; use subspace_farmer_components::plotting; use subspace_farmer_components::plotting::{ - plot_sector, PieceGetter, PieceGetterRetryPolicy, PlottedSector, + plot_sector, PieceGetter, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, }; use subspace_farmer_components::sector::SectorMetadataChecksummed; use subspace_proof_of_space::Table; use thiserror::Error; use tokio::runtime::Handle; +use tokio::sync::Semaphore; use tracing::{debug, info, trace, warn}; const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500); @@ -101,6 +102,12 @@ pub(super) struct PlottingOptions { pub(super) handlers: Arc, pub(super) modifying_sector_index: Arc>>, pub(super) sectors_to_plot_receiver: mpsc::Receiver, + /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory + /// usage of the plotting process, permit will be held until the end of the plotting process + pub(crate) downloading_semaphore: Arc, + /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically + /// allow one permit at a time for efficient CPU utilization + pub(crate) encoding_semaphore: Arc, pub(super) plotting_thread_pool: Arc, pub(super) replotting_thread_pool: Arc, } @@ -133,6 +140,8 @@ where handlers, modifying_sector_index, mut sectors_to_plot_receiver, + downloading_semaphore, + encoding_semaphore, plotting_thread_pool, replotting_thread_pool, } = plotting_options; @@ -211,22 +220,28 @@ where let piece_getter = piece_getter.clone(); let kzg = kzg.clone(); let erasure_coding = erasure_coding.clone(); + let downloading_semaphore = Arc::clone(&downloading_semaphore); + let encoding_semaphore = Arc::clone(&encoding_semaphore); let plotting_fn = move || { tokio::task::block_in_place(move || { - let plot_sector_fut = plot_sector::<_, PosTable>( - &public_key, + let plot_sector_fut = plot_sector::(PlotSectorOptions { + public_key: &public_key, sector_index, - &piece_getter, - PieceGetterRetryPolicy::Limited(PIECE_GETTER_RETRY_NUMBER.get()), - &farmer_app_info.protocol_info, - &kzg, - &erasure_coding, + piece_getter: &piece_getter, + piece_getter_retry_policy: PieceGetterRetryPolicy::Limited( + PIECE_GETTER_RETRY_NUMBER.get(), + ), + farmer_protocol_info: &farmer_app_info.protocol_info, + kzg: &kzg, + erasure_coding: &erasure_coding, pieces_in_sector, - &mut sector, - &mut sector_metadata, - &mut table_generator, - ); + sector_output: &mut sector, + sector_metadata_output: &mut sector_metadata, + downloading_semaphore: Some(&downloading_semaphore), + encoding_semaphore: Some(&encoding_semaphore), + table_generator: &mut table_generator, + }); Handle::current() .block_on(plot_sector_fut) diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index 502df44d17..dd1e749768 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -39,7 +39,9 @@ use subspace_core_primitives::{ }; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_sector; -use subspace_farmer_components::plotting::{plot_sector, PieceGetterRetryPolicy, PlottedSector}; +use subspace_farmer_components::plotting::{ + plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, +}; use subspace_farmer_components::FarmerProtocolInfo; use subspace_proof_of_space::{Table, TableGenerator}; use subspace_runtime_primitives::opaque::Block; @@ -256,19 +258,21 @@ where min_sector_lifetime: HistorySize::from(NonZeroU64::new(4).unwrap()), }; - let plotted_sector = plot_sector::<_, PosTable>( - &public_key, + let plotted_sector = plot_sector::(PlotSectorOptions { + public_key: &public_key, sector_index, - &archived_segment.pieces, - PieceGetterRetryPolicy::default(), - &farmer_protocol_info, - &kzg, + piece_getter: &archived_segment.pieces, + piece_getter_retry_policy: PieceGetterRetryPolicy::default(), + farmer_protocol_info: &farmer_protocol_info, + kzg: &kzg, erasure_coding, pieces_in_sector, - &mut sector, - &mut sector_metadata, - &mut table_generator, - ) + sector_output: &mut sector, + sector_metadata_output: &mut sector_metadata, + downloading_semaphore: None, + encoding_semaphore: None, + table_generator: &mut table_generator, + }) .await .expect("Plotting one sector in memory must not fail");