diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs index 49af09578d..26927d8262 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs @@ -398,7 +398,10 @@ where cuda_devices .into_iter() .map(|cuda_device| CudaRecordsEncoder::new(cuda_device, Arc::clone(&global_mutex))) - .collect(), + .collect::>() + .map_err(|error| { + anyhow::anyhow!("Failed to create CUDA records encoder: {error}") + })?, global_mutex, kzg, erasure_coding, @@ -477,7 +480,10 @@ where rocm_devices .into_iter() .map(|rocm_device| RocmRecordsEncoder::new(rocm_device, Arc::clone(&global_mutex))) - .collect(), + .collect::>() + .map_err(|error| { + anyhow::anyhow!("Failed to create ROCm records encoder: {error}") + })?, global_mutex, kzg, erasure_coding, diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index a6736ed127..1379f42025 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -1072,7 +1072,10 @@ where cuda_devices .into_iter() .map(|cuda_device| CudaRecordsEncoder::new(cuda_device, Arc::clone(&global_mutex))) - .collect(), + .collect::>() + .map_err(|error| { + anyhow::anyhow!("Failed to create CUDA records encoder: {error}") + })?, global_mutex, kzg, erasure_coding, @@ -1151,7 +1154,10 @@ where rocm_devices .into_iter() .map(|rocm_device| RocmRecordsEncoder::new(rocm_device, Arc::clone(&global_mutex))) - .collect(), + .collect::>() + .map_err(|error| { + anyhow::anyhow!("Failed to create ROCm records encoder: {error}") + })?, global_mutex, kzg, erasure_coding, diff --git a/crates/subspace-farmer/src/plotter/gpu/cuda.rs b/crates/subspace-farmer/src/plotter/gpu/cuda.rs index bbbedff731..84af098f9d 100644 --- a/crates/subspace-farmer/src/plotter/gpu/cuda.rs +++ b/crates/subspace-farmer/src/plotter/gpu/cuda.rs @@ -2,6 +2,9 @@ use crate::plotter::gpu::GpuRecordsEncoder; use async_lock::Mutex as AsyncMutex; +use parking_lot::Mutex; +use rayon::{current_thread_index, ThreadPool, ThreadPoolBuildError, ThreadPoolBuilder}; +use std::process::exit; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use subspace_core_primitives::pieces::{PieceOffset, Record}; @@ -14,6 +17,7 @@ use subspace_proof_of_space_gpu::cuda::CudaDevice; #[derive(Debug)] pub struct CudaRecordsEncoder { cuda_device: CudaDevice, + thread_pool: ThreadPool, global_mutex: Arc>, } @@ -34,21 +38,46 @@ impl RecordsEncoder for CudaRecordsEncoder { .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?; let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); - for ((piece_offset, record), mut encoded_chunks_used) in (PieceOffset::ZERO..) - .zip(records.iter_mut()) - .zip(sector_contents_map.iter_record_bitfields_mut()) { - // Take mutex briefly to make sure encoding is allowed right now - self.global_mutex.lock_blocking(); + let iter = Mutex::new( + (PieceOffset::ZERO..) + .zip(records.iter_mut()) + .zip(sector_contents_map.iter_record_bitfields_mut()), + ); + let plotting_error = Mutex::new(None::); - let pos_seed = sector_id.derive_evaluation_seed(piece_offset); + self.thread_pool.scope(|scope| { + scope.spawn_broadcast(|_scope, _ctx| loop { + // Take mutex briefly to make sure encoding is allowed right now + self.global_mutex.lock_blocking(); - self.cuda_device - .generate_and_encode_pospace(&pos_seed, record, encoded_chunks_used.iter_mut()) - .map_err(anyhow::Error::msg)?; + // This instead of `while` above because otherwise mutex will be held for the + // duration of the loop and will limit concurrency to 1 record + let Some(((piece_offset, record), mut encoded_chunks_used)) = + iter.lock().next() + else { + return; + }; + let pos_seed = sector_id.derive_evaluation_seed(piece_offset); - if abort_early.load(Ordering::Relaxed) { - break; + if let Err(error) = self.cuda_device.generate_and_encode_pospace( + &pos_seed, + record, + encoded_chunks_used.iter_mut(), + ) { + plotting_error.lock().replace(error); + return; + } + + if abort_early.load(Ordering::Relaxed) { + return; + } + }); + }); + + let plotting_error = plotting_error.lock().take(); + if let Some(error) = plotting_error { + return Err(anyhow::Error::msg(error)); } } @@ -58,10 +87,38 @@ impl RecordsEncoder for CudaRecordsEncoder { impl CudaRecordsEncoder { /// Create new instance - pub fn new(cuda_device: CudaDevice, global_mutex: Arc>) -> Self { - Self { + pub fn new( + cuda_device: CudaDevice, + global_mutex: Arc>, + ) -> Result { + let id = cuda_device.id(); + let thread_name = move |thread_index| format!("cuda-{id}.{thread_index}"); + // TODO: remove this panic handler when rayon logs panic_info + // https://github.com/rayon-rs/rayon/issues/1208 + let panic_handler = move |panic_info| { + if let Some(index) = current_thread_index() { + eprintln!("panic on thread {}: {:?}", thread_name(index), panic_info); + } else { + // We want to guarantee exit, rather than panicking in a panic handler. + eprintln!( + "rayon panic handler called on non-rayon thread: {:?}", + panic_info + ); + } + exit(1); + }; + + let thread_pool = ThreadPoolBuilder::new() + .thread_name(thread_name) + .panic_handler(panic_handler) + // Make sure there is overlap between records, so GPU is almost always busy + .num_threads(2) + .build()?; + + Ok(Self { cuda_device, + thread_pool, global_mutex, - } + }) } } diff --git a/crates/subspace-farmer/src/plotter/gpu/rocm.rs b/crates/subspace-farmer/src/plotter/gpu/rocm.rs index 2aef4c482a..db875b83e1 100644 --- a/crates/subspace-farmer/src/plotter/gpu/rocm.rs +++ b/crates/subspace-farmer/src/plotter/gpu/rocm.rs @@ -2,6 +2,9 @@ use crate::plotter::gpu::GpuRecordsEncoder; use async_lock::Mutex as AsyncMutex; +use parking_lot::Mutex; +use rayon::{current_thread_index, ThreadPool, ThreadPoolBuildError, ThreadPoolBuilder}; +use std::process::exit; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use subspace_core_primitives::pieces::{PieceOffset, Record}; @@ -14,6 +17,7 @@ use subspace_proof_of_space_gpu::rocm::RocmDevice; #[derive(Debug)] pub struct RocmRecordsEncoder { rocm_device: RocmDevice, + thread_pool: ThreadPool, global_mutex: Arc>, } @@ -34,21 +38,46 @@ impl RecordsEncoder for RocmRecordsEncoder { .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?; let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); - for ((piece_offset, record), mut encoded_chunks_used) in (PieceOffset::ZERO..) - .zip(records.iter_mut()) - .zip(sector_contents_map.iter_record_bitfields_mut()) { - // Take mutex briefly to make sure encoding is allowed right now - self.global_mutex.lock_blocking(); + let iter = Mutex::new( + (PieceOffset::ZERO..) + .zip(records.iter_mut()) + .zip(sector_contents_map.iter_record_bitfields_mut()), + ); + let plotting_error = Mutex::new(None::); - let pos_seed = sector_id.derive_evaluation_seed(piece_offset); + self.thread_pool.scope(|scope| { + scope.spawn_broadcast(|_scope, _ctx| loop { + // Take mutex briefly to make sure encoding is allowed right now + self.global_mutex.lock_blocking(); - self.rocm_device - .generate_and_encode_pospace(&pos_seed, record, encoded_chunks_used.iter_mut()) - .map_err(anyhow::Error::msg)?; + // This instead of `while` above because otherwise mutex will be held for the + // duration of the loop and will limit concurrency to 1 record + let Some(((piece_offset, record), mut encoded_chunks_used)) = + iter.lock().next() + else { + return; + }; + let pos_seed = sector_id.derive_evaluation_seed(piece_offset); - if abort_early.load(Ordering::Relaxed) { - break; + if let Err(error) = self.rocm_device.generate_and_encode_pospace( + &pos_seed, + record, + encoded_chunks_used.iter_mut(), + ) { + plotting_error.lock().replace(error); + return; + } + + if abort_early.load(Ordering::Relaxed) { + return; + } + }); + }); + + let plotting_error = plotting_error.lock().take(); + if let Some(error) = plotting_error { + return Err(anyhow::Error::msg(error)); } } @@ -58,10 +87,38 @@ impl RecordsEncoder for RocmRecordsEncoder { impl RocmRecordsEncoder { /// Create new instance - pub fn new(rocm_device: RocmDevice, global_mutex: Arc>) -> Self { - Self { + pub fn new( + rocm_device: RocmDevice, + global_mutex: Arc>, + ) -> Result { + let id = rocm_device.id(); + let thread_name = move |thread_index| format!("rocm-{id}.{thread_index}"); + // TODO: remove this panic handler when rayon logs panic_info + // https://github.com/rayon-rs/rayon/issues/1208 + let panic_handler = move |panic_info| { + if let Some(index) = current_thread_index() { + eprintln!("panic on thread {}: {:?}", thread_name(index), panic_info); + } else { + // We want to guarantee exit, rather than panicking in a panic handler. + eprintln!( + "rayon panic handler called on non-rayon thread: {:?}", + panic_info + ); + } + exit(1); + }; + + let thread_pool = ThreadPoolBuilder::new() + .thread_name(thread_name) + .panic_handler(panic_handler) + // Make sure there is overlap between records, so GPU is almost always busy + .num_threads(2) + .build()?; + + Ok(Self { rocm_device, + thread_pool, global_mutex, - } + }) } }