diff --git a/rust/Cargo.lock b/rust/Cargo.lock index a41b145c..a5e912b6 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1187,6 +1187,7 @@ dependencies = [ "serde_json", "serde_tuple", "tempfile", + "yastl", ] [[package]] diff --git a/rust/Cargo.toml b/rust/Cargo.toml index fce2fea9..17fb80cc 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -46,13 +46,14 @@ serde = "1.0.117" serde_tuple = "0.5" safer-ffi = { version = "0.0.7", features = ["proc_macros"] } filecoin-proofs-api = { version = "16.1", default-features = false } +yastl = "0.1.2" [dev-dependencies] memmap2 = "0.5" tempfile = "3.0.8" [features] -default = ["cuda", "multicore-sdr" ] +default = ["cuda", "multicore-sdr"] blst-portable = ["bls-signatures/blst-portable", "blstrs/portable"] cuda = ["filecoin-proofs-api/cuda", "rust-gpu-tools/cuda", "fvm2/cuda", "fvm3/cuda", "fvm4/cuda"] cuda-supraseal = ["filecoin-proofs-api/cuda-supraseal", "rust-gpu-tools/cuda", "fvm3/cuda-supraseal", "fvm4/cuda-supraseal"] diff --git a/rust/src/fvm/engine.rs b/rust/src/fvm/engine.rs index 2b1aa8c0..7a03fb2b 100644 --- a/rust/src/fvm/engine.rs +++ b/rust/src/fvm/engine.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::ops::RangeInclusive; use std::sync::{Arc, Mutex}; use anyhow::anyhow; @@ -18,7 +17,7 @@ use super::externs::CgoExterns; use super::types::*; // Generic executor; uses the current (v3) engine types -pub trait CgoExecutor { +pub trait CgoExecutor: Send { fn execute_message( &mut self, msg: Message, @@ -65,9 +64,6 @@ pub struct MultiEngineContainer { engines: Mutex>>, } -const LOTUS_FVM_CONCURRENCY_ENV_NAME: &str = "LOTUS_FVM_CONCURRENCY"; -const VALID_CONCURRENCY_RANGE: RangeInclusive = 1..=128; - impl TryFrom for EngineVersion { type Error = anyhow::Error; fn try_from(value: u32) -> Result { @@ -81,41 +77,6 @@ impl TryFrom for EngineVersion { } impl MultiEngineContainer { - /// Constructs a new multi-engine container with the default concurrency (4). - pub fn new() -> MultiEngineContainer { - Self::with_concurrency(4) - } - - /// Constructs a new multi-engine container with the concurrency specified in the - /// `LOTUS_FVM_CONCURRENCY` environment variable. - pub fn new_env() -> MultiEngineContainer { - let valosstr = match std::env::var_os(LOTUS_FVM_CONCURRENCY_ENV_NAME) { - Some(v) => v, - None => return Self::new(), - }; - let valstr = match valosstr.to_str() { - Some(s) => s, - None => { - log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value"); - return Self::new(); - } - }; - let concurrency: u32 = match valstr.parse() { - Ok(v) => v, - Err(e) => { - log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value: {e}"); - return Self::new(); - } - }; - if !VALID_CONCURRENCY_RANGE.contains(&concurrency) { - log::error!( - "{LOTUS_FVM_CONCURRENCY_ENV_NAME} must be in the range {VALID_CONCURRENCY_RANGE:?}, not {concurrency}" - ); - return Self::new(); - } - Self::with_concurrency(concurrency) - } - pub fn with_concurrency(concurrency: u32) -> MultiEngineContainer { MultiEngineContainer { engines: Mutex::new(HashMap::new()), @@ -146,12 +107,6 @@ impl MultiEngineContainer { } } -impl Default for MultiEngineContainer { - fn default() -> MultiEngineContainer { - MultiEngineContainer::new() - } -} - // fvm v4 implementation mod v4 { use anyhow::anyhow; @@ -160,10 +115,7 @@ mod v4 { use fvm4::call_manager::DefaultCallManager as DefaultCallManager4; use fvm4::engine::{EnginePool as EnginePool4, MultiEngine as MultiEngine4}; - use fvm4::executor::{ - ApplyKind, ApplyRet, DefaultExecutor as DefaultExecutor4, - ThreadedExecutor as ThreadedExecutor4, - }; + use fvm4::executor::{ApplyKind, ApplyRet, DefaultExecutor as DefaultExecutor4}; use fvm4::kernel::filecoin::DefaultFilecoinKernel as DefaultFilecoinKernel4; use fvm4::machine::{DefaultMachine as DefaultMachine4, NetworkConfig}; use fvm4_shared::{chainid::ChainID, clock::ChainEpoch, message::Message}; @@ -175,14 +127,13 @@ mod v4 { use super::Config; type CgoMachine4 = DefaultMachine4; - type BaseExecutor4 = DefaultExecutor4>>; - type CgoExecutor4 = ThreadedExecutor4; + type CgoExecutor4 = DefaultExecutor4>>; fn new_executor( engine_pool: EnginePool4, machine: CgoMachine4, ) -> anyhow::Result { - Ok(ThreadedExecutor4(BaseExecutor4::new(engine_pool, machine)?)) + CgoExecutor4::new(engine_pool, machine) } impl CgoExecutor for CgoExecutor4 { @@ -254,8 +205,7 @@ mod v3 { }; use fvm3::engine::{EnginePool as EnginePool3, MultiEngine as MultiEngine3}; use fvm3::executor::{ - ApplyFailure as ApplyFailure3, ApplyKind as ApplyKind3, - DefaultExecutor as DefaultExecutor3, ThreadedExecutor as ThreadedExecutor3, + ApplyFailure as ApplyFailure3, ApplyKind as ApplyKind3, DefaultExecutor as DefaultExecutor3, }; use fvm3::machine::{DefaultMachine as DefaultMachine3, NetworkConfig as NetworkConfig3}; use fvm3::trace::ExecutionEvent as ExecutionEvent3; @@ -284,14 +234,13 @@ mod v3 { use super::Config; type CgoMachine3 = DefaultMachine3; - type BaseExecutor3 = DefaultExecutor3>>; - type CgoExecutor3 = ThreadedExecutor3; + type CgoExecutor3 = DefaultExecutor3>>; fn new_executor( engine_pool: EnginePool3, machine: CgoMachine3, ) -> anyhow::Result { - Ok(ThreadedExecutor3(BaseExecutor3::new(engine_pool, machine)?)) + CgoExecutor3::new(engine_pool, machine) } impl CgoExecutor for CgoExecutor3 { @@ -533,8 +482,7 @@ mod v2 { backtrace::Cause as Cause2, DefaultCallManager as DefaultCallManager2, }; use fvm2::executor::{ - ApplyFailure as ApplyFailure2, ApplyKind as ApplyKind2, - DefaultExecutor as DefaultExecutor2, ThreadedExecutor as ThreadedExecutor2, + ApplyFailure as ApplyFailure2, ApplyKind as ApplyKind2, DefaultExecutor as DefaultExecutor2, }; use fvm2::machine::{ DefaultMachine as DefaultMachine2, MultiEngine as MultiEngine2, @@ -565,11 +513,10 @@ mod v2 { use super::Config; type CgoMachine2 = DefaultMachine2; - type BaseExecutor2 = DefaultExecutor2>>; - type CgoExecutor2 = ThreadedExecutor2; + type CgoExecutor2 = DefaultExecutor2>>; fn new_executor(machine: CgoMachine2) -> CgoExecutor2 { - ThreadedExecutor2(BaseExecutor2::new(machine)) + CgoExecutor2::new(machine) } fn bytes_to_block(bytes: RawBytes) -> Option { diff --git a/rust/src/fvm/machine.rs b/rust/src/fvm/machine.rs index 87437d26..517419d9 100644 --- a/rust/src/fvm/machine.rs +++ b/rust/src/fvm/machine.rs @@ -1,5 +1,6 @@ use std::borrow::Cow; use std::convert::{TryFrom, TryInto}; +use std::ops::RangeInclusive; use anyhow::{anyhow, Context}; use cid::Cid; @@ -27,8 +28,54 @@ use super::types::*; use crate::destructor; use crate::util::types::{catch_panic_response, catch_panic_response_no_default, Result}; +const STACK_SIZE: usize = 64 << 20; // 64MiB + lazy_static! { - static ref ENGINES: MultiEngineContainer = MultiEngineContainer::new_env(); + static ref CONCURRENCY: u32 = get_concurrency(); + static ref ENGINES: MultiEngineContainer = MultiEngineContainer::with_concurrency(*CONCURRENCY); + static ref THREAD_POOL: yastl::Pool = yastl::Pool::with_config( + *CONCURRENCY as usize, + yastl::ThreadConfig::new() + .prefix("fvm") + .stack_size(STACK_SIZE) + ); +} + +const LOTUS_FVM_CONCURRENCY_ENV_NAME: &str = "LOTUS_FVM_CONCURRENCY"; +const VALID_CONCURRENCY_RANGE: RangeInclusive = 1..=256; + +fn available_parallelism() -> u32 { + std::thread::available_parallelism() + .map(usize::from) + .unwrap_or(8) as u32 +} + +fn get_concurrency() -> u32 { + let valosstr = match std::env::var_os(LOTUS_FVM_CONCURRENCY_ENV_NAME) { + Some(v) => v, + None => return available_parallelism(), + }; + let valstr = match valosstr.to_str() { + Some(s) => s, + None => { + log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value"); + return available_parallelism(); + } + }; + let concurrency: u32 = match valstr.parse() { + Ok(v) => v, + Err(e) => { + log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value: {e}"); + return available_parallelism(); + } + }; + if !VALID_CONCURRENCY_RANGE.contains(&concurrency) { + log::error!( + "{LOTUS_FVM_CONCURRENCY_ENV_NAME} must be in the range {VALID_CONCURRENCY_RANGE:?}, not {concurrency}" + ); + return available_parallelism(); + } + concurrency } #[allow(clippy::too_many_arguments)] @@ -181,6 +228,23 @@ fn create_fvm_debug_machine( ) } +fn with_new_stack(name: &str, pool: &yastl::Pool, callback: F) -> repr_c::Box> +where + T: Sized + Default + Send, + F: FnOnce() -> anyhow::Result + std::panic::UnwindSafe + Send, +{ + let mut res = None; + pool.scoped(|scope| scope.execute(|| res = Some(catch_panic_response(name, callback)))); + + res.unwrap_or_else(|| { + repr_c::Box::new(Result::err( + format!("failed to schedule {name}") + .into_bytes() + .into_boxed_slice(), + )) + }) +} + #[ffi_export] fn fvm_machine_execute_message( executor: &'_ InnerFvmMachine, @@ -188,7 +252,8 @@ fn fvm_machine_execute_message( chain_len: u64, apply_kind: u64, /* 0: Explicit, _: Implicit */ ) -> repr_c::Box> { - catch_panic_response("fvm_machine_execute_message", || { + // Execute in the thread-pool because we need a 64MiB stack. + with_new_stack("fvm_machine_execute_message", &THREAD_POOL, || { let apply_kind = if apply_kind == 0 { ApplyKind::Explicit } else {