From 4eabe5e0dddc4cd31ad9dab5645350360d4d36a5 Mon Sep 17 00:00:00 2001 From: maksimryndin Date: Fri, 19 Apr 2024 15:36:36 +0200 Subject: [PATCH] Pvf refactor execute worker errors follow up (#4071) follow up of https://github.com/paritytech/polkadot-sdk/pull/2604 closes https://github.com/paritytech/polkadot-sdk/pull/2604 - [x] take relevant changes from Marcin's PR - [x] extract common duplicate code for workers (low-hanging fruits) ~Some unpassed ci problems are more general and should be fixed in master (see https://github.com/paritytech/polkadot-sdk/pull/4074)~ Proposed labels: **T0-node**, **R0-silent**, **I4-refactor** ----- kusama address: FZXVQLqLbFV2otNXs6BMnNch54CFJ1idpWwjMb3Z8fTLQC6 --------- Co-authored-by: s0me0ne-unkn0wn <48632512+s0me0ne-unkn0wn@users.noreply.github.com> --- Cargo.lock | 6 +- polkadot/node/core/pvf/Cargo.toml | 22 ++- polkadot/node/core/pvf/common/Cargo.toml | 7 +- polkadot/node/core/pvf/common/src/error.rs | 3 + polkadot/node/core/pvf/common/src/execute.rs | 36 ++-- polkadot/node/core/pvf/common/src/lib.rs | 1 + polkadot/node/core/pvf/common/src/pvf.rs | 7 +- .../node/core/pvf/common/src/worker/mod.rs | 84 +++++++- .../node/core/pvf/execute-worker/src/lib.rs | 180 +++++++++--------- .../node/core/pvf/prepare-worker/src/lib.rs | 51 +---- polkadot/node/core/pvf/src/execute/queue.rs | 55 ++++-- .../core/pvf/src/execute/worker_interface.rs | 163 ++++++++-------- polkadot/node/core/pvf/src/host.rs | 5 +- 13 files changed, 333 insertions(+), 287 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7bf5215b6dec..951f2548d34d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7575,9 +7575,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.152" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libflate" @@ -13303,7 +13303,6 @@ dependencies = [ "slotmap", "sp-core", "sp-maybe-compressed-blob", - "sp-wasm-interface 20.0.0", "tempfile", "test-parachain-adder", "test-parachain-halt", @@ -13340,7 +13339,6 @@ name = "polkadot-node-core-pvf-common" version = "7.0.0" dependencies = [ "assert_matches", - "cfg-if", "cpu-time", "futures", "landlock", diff --git a/polkadot/node/core/pvf/Cargo.toml b/polkadot/node/core/pvf/Cargo.toml index a0233d6b7517..8bfe2baa42fd 100644 --- a/polkadot/node/core/pvf/Cargo.toml +++ b/polkadot/node/core/pvf/Cargo.toml @@ -17,8 +17,7 @@ cfg-if = "1.0" futures = "0.3.30" futures-timer = "3.0.2" gum = { package = "tracing-gum", path = "../../gum" } -is_executable = "1.0.1" -libc = "0.2.152" +is_executable = { version = "1.0.1", optional = true } pin-project = "1.0.9" rand = "0.8.5" slotmap = "1.0" @@ -26,7 +25,9 @@ tempfile = "3.3.0" thiserror = { workspace = true } tokio = { version = "1.24.2", features = ["fs", "process"] } -parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] } +parity-scale-codec = { version = "3.6.1", default-features = false, features = [ + "derive", +] } polkadot-parachain-primitives = { path = "../../../parachain" } polkadot-core-primitives = { path = "../../../core-primitives" } @@ -37,14 +38,16 @@ polkadot-node-subsystem = { path = "../../subsystem" } polkadot-primitives = { path = "../../../primitives" } sp-core = { path = "../../../../substrate/primitives/core" } -sp-wasm-interface = { path = "../../../../substrate/primitives/wasm-interface" } -sp-maybe-compressed-blob = { path = "../../../../substrate/primitives/maybe-compressed-blob" } +sp-maybe-compressed-blob = { path = "../../../../substrate/primitives/maybe-compressed-blob", optional = true } polkadot-node-core-pvf-prepare-worker = { path = "prepare-worker", optional = true } polkadot-node-core-pvf-execute-worker = { path = "execute-worker", optional = true } [dev-dependencies] assert_matches = "1.4.0" -criterion = { version = "0.4.0", default-features = false, features = ["async_tokio", "cargo_bench_support"] } +criterion = { version = "0.4.0", default-features = false, features = [ + "async_tokio", + "cargo_bench_support", +] } hex-literal = "0.4.1" polkadot-node-core-pvf-common = { path = "common", features = ["test-utils"] } @@ -57,6 +60,7 @@ adder = { package = "test-parachain-adder", path = "../../../parachain/test-para halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" } [target.'cfg(target_os = "linux")'.dev-dependencies] +libc = "0.2.153" procfs = "0.16.0" rusty-fork = "0.3.0" sc-sysinfo = { path = "../../../../substrate/client/sysinfo" } @@ -70,6 +74,8 @@ ci-only-tests = [] jemalloc-allocator = ["polkadot-node-core-pvf-common/jemalloc-allocator"] # This feature is used to export test code to other crates without putting it in the production build. test-utils = [ - "polkadot-node-core-pvf-execute-worker", - "polkadot-node-core-pvf-prepare-worker", + "dep:is_executable", + "dep:polkadot-node-core-pvf-execute-worker", + "dep:polkadot-node-core-pvf-prepare-worker", + "dep:sp-maybe-compressed-blob", ] diff --git a/polkadot/node/core/pvf/common/Cargo.toml b/polkadot/node/core/pvf/common/Cargo.toml index f3eb9d919aae..e1ce6e79cb99 100644 --- a/polkadot/node/core/pvf/common/Cargo.toml +++ b/polkadot/node/core/pvf/common/Cargo.toml @@ -10,14 +10,16 @@ license.workspace = true workspace = true [dependencies] -cfg-if = "1.0" cpu-time = "1.0.0" futures = "0.3.30" gum = { package = "tracing-gum", path = "../../../gum" } libc = "0.2.152" +nix = { version = "0.27.1", features = ["resource", "sched"] } thiserror = { workspace = true } -parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] } +parity-scale-codec = { version = "3.6.1", default-features = false, features = [ + "derive", +] } polkadot-parachain-primitives = { path = "../../../../parachain" } polkadot-primitives = { path = "../../../../primitives" } @@ -34,7 +36,6 @@ sp-tracing = { path = "../../../../../substrate/primitives/tracing" } [target.'cfg(target_os = "linux")'.dependencies] landlock = "0.3.0" -nix = { version = "0.27.1", features = ["sched"] } [target.'cfg(all(target_os = "linux", target_arch = "x86_64"))'.dependencies] seccompiler = "0.4.0" diff --git a/polkadot/node/core/pvf/common/src/error.rs b/polkadot/node/core/pvf/common/src/error.rs index cf274044456f..adeb40c0b195 100644 --- a/polkadot/node/core/pvf/common/src/error.rs +++ b/polkadot/node/core/pvf/common/src/error.rs @@ -136,6 +136,9 @@ pub enum InternalValidationError { /// Could not find or open compiled artifact file. #[error("validation: could not find or open compiled artifact file: {0}")] CouldNotOpenFile(String), + /// Could not create a pipe between the worker and a child process. + #[error("validation: could not create pipe: {0}")] + CouldNotCreatePipe(String), /// Host could not clear the worker cache after a job. #[error("validation: host could not clear the worker cache ({path:?}) after a job: {err}")] CouldNotClearWorkerDir { diff --git a/polkadot/node/core/pvf/common/src/execute.rs b/polkadot/node/core/pvf/common/src/execute.rs index 18c97b03cbcd..ae6096cacec4 100644 --- a/polkadot/node/core/pvf/common/src/execute.rs +++ b/polkadot/node/core/pvf/common/src/execute.rs @@ -30,35 +30,36 @@ pub struct Handshake { /// The response from the execution worker. #[derive(Debug, Encode, Decode)] -pub enum WorkerResponse { - /// The job completed successfully. - Ok { - /// The result of parachain validation. - result_descriptor: ValidationResult, - /// The amount of CPU time taken by the job. - duration: Duration, - }, - /// The candidate is invalid. - InvalidCandidate(String), - /// Instantiation of the WASM module instance failed during an execution. - /// Possibly related to local issues or dirty node update. May be retried with re-preparation. - RuntimeConstruction(String), +pub struct WorkerResponse { + /// The response from the execute job process. + pub job_response: JobResponse, + /// The amount of CPU time taken by the job. + pub duration: Duration, +} + +/// An error occurred in the worker process. +#[derive(thiserror::Error, Debug, Clone, Encode, Decode)] +pub enum WorkerError { /// The job timed out. + #[error("The job timed out")] JobTimedOut, /// The job process has died. We must kill the worker just in case. /// /// We cannot treat this as an internal error because malicious code may have killed the job. /// We still retry it, because in the non-malicious case it is likely spurious. + #[error("The job process (pid {job_pid}) has died: {err}")] JobDied { err: String, job_pid: i32 }, /// An unexpected error occurred in the job process, e.g. failing to spawn a thread, panic, /// etc. /// /// Because malicious code can cause a job error, we must not treat it as an internal error. We /// still retry it, because in the non-malicious case it is likely spurious. - JobError(String), + #[error("An unexpected error occurred in the job process: {0}")] + JobError(#[from] JobError), /// Some internal error occurred. - InternalError(InternalValidationError), + #[error("An internal error occurred: {0}")] + InternalError(#[from] InternalValidationError), } /// The result of a job on the execution worker. @@ -101,7 +102,7 @@ impl JobResponse { /// An unexpected error occurred in the execution job process. Because this comes from the job, /// which executes untrusted code, this error must likewise be treated as untrusted. That is, we /// cannot raise an internal error based on this. -#[derive(thiserror::Error, Debug, Encode, Decode)] +#[derive(thiserror::Error, Clone, Debug, Encode, Decode)] pub enum JobError { #[error("The job timed out")] TimedOut, @@ -114,4 +115,7 @@ pub enum JobError { CouldNotSpawnThread(String), #[error("An error occurred in the CPU time monitor thread: {0}")] CpuTimeMonitorThread(String), + /// Since the job can return any exit status it wants, we have to treat this as untrusted. + #[error("Unexpected exit status: {0}")] + UnexpectedExitStatus(i32), } diff --git a/polkadot/node/core/pvf/common/src/lib.rs b/polkadot/node/core/pvf/common/src/lib.rs index 15097dbd3af5..0cd928201639 100644 --- a/polkadot/node/core/pvf/common/src/lib.rs +++ b/polkadot/node/core/pvf/common/src/lib.rs @@ -15,6 +15,7 @@ // along with Polkadot. If not, see . //! Contains functionality related to PVFs that is shared by the PVF host and the PVF workers. +#![deny(unused_crate_dependencies)] pub mod error; pub mod execute; diff --git a/polkadot/node/core/pvf/common/src/pvf.rs b/polkadot/node/core/pvf/common/src/pvf.rs index 340dffe07c3f..5f248f49b9a3 100644 --- a/polkadot/node/core/pvf/common/src/pvf.rs +++ b/polkadot/node/core/pvf/common/src/pvf.rs @@ -18,12 +18,7 @@ use crate::prepare::PrepareJobKind; use parity_scale_codec::{Decode, Encode}; use polkadot_parachain_primitives::primitives::ValidationCodeHash; use polkadot_primitives::ExecutorParams; -use std::{ - cmp::{Eq, PartialEq}, - fmt, - sync::Arc, - time::Duration, -}; +use std::{fmt, sync::Arc, time::Duration}; /// A struct that carries the exhaustive set of data to prepare an artifact out of plain /// Wasm binary diff --git a/polkadot/node/core/pvf/common/src/worker/mod.rs b/polkadot/node/core/pvf/common/src/worker/mod.rs index d7c95d9e7047..67e7bece407d 100644 --- a/polkadot/node/core/pvf/common/src/worker/mod.rs +++ b/polkadot/node/core/pvf/common/src/worker/mod.rs @@ -18,10 +18,13 @@ pub mod security; -use crate::{framed_recv_blocking, SecurityStatus, WorkerHandshake, LOG_TARGET}; +use crate::{ + framed_recv_blocking, framed_send_blocking, SecurityStatus, WorkerHandshake, LOG_TARGET, +}; use cpu_time::ProcessTime; use futures::never::Never; -use parity_scale_codec::Decode; +use nix::{errno::Errno, sys::resource::Usage}; +use parity_scale_codec::{Decode, Encode}; use std::{ any::Any, fmt::{self}, @@ -58,8 +61,6 @@ macro_rules! decl_worker_main { $crate::sp_tracing::try_init_simple(); - let worker_pid = std::process::id(); - let args = std::env::args().collect::>(); if args.len() == 1 { print_help($expected_command); @@ -548,6 +549,81 @@ fn recv_worker_handshake(stream: &mut UnixStream) -> io::Result Ok(worker_handshake) } +/// Calculate the total CPU time from the given `usage` structure, returned from +/// [`nix::sys::resource::getrusage`], and calculates the total CPU time spent, including both user +/// and system time. +/// +/// # Arguments +/// +/// - `rusage`: Contains resource usage information. +/// +/// # Returns +/// +/// Returns a `Duration` representing the total CPU time. +pub fn get_total_cpu_usage(rusage: Usage) -> Duration { + let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) + + (rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64; + + return Duration::from_micros(micros) +} + +/// Get a job response. +pub fn recv_child_response( + received_data: &mut io::BufReader<&[u8]>, + context: &'static str, +) -> io::Result +where + T: Decode, +{ + let response_bytes = framed_recv_blocking(received_data)?; + T::decode(&mut response_bytes.as_slice()).map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("{} pvf recv_child_response: decode error: {}", context, e), + ) + }) +} + +pub fn send_result( + stream: &mut UnixStream, + result: Result, + worker_info: &WorkerInfo, +) -> io::Result<()> +where + T: std::fmt::Debug, + E: std::fmt::Debug + std::fmt::Display, + Result: Encode, +{ + if let Err(ref err) = result { + gum::warn!( + target: LOG_TARGET, + ?worker_info, + "worker: error occurred: {}", + err + ); + } + gum::trace!( + target: LOG_TARGET, + ?worker_info, + "worker: sending result to host: {:?}", + result + ); + + framed_send_blocking(stream, &result.encode()).map_err(|err| { + gum::warn!( + target: LOG_TARGET, + ?worker_info, + "worker: error occurred sending result to host: {}", + err + ); + err + }) +} + +pub fn stringify_errno(context: &'static str, errno: Errno) -> String { + format!("{}: {}: {}", context, errno, io::Error::last_os_error()) +} + /// Functionality related to threads spawned by the workers. /// /// The motivation for this module is to coordinate worker threads without using async Rust. diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs index bd7e76010a6d..55f5290bd87e 100644 --- a/polkadot/node/core/pvf/execute-worker/src/lib.rs +++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs @@ -16,6 +16,9 @@ //! Contains the logic for executing PVFs. Used by the polkadot-execute-worker binary. +#![deny(unused_crate_dependencies)] +#![warn(missing_docs)] + pub use polkadot_node_core_pvf_common::{ error::ExecuteError, executor_interface::execute_artifact, }; @@ -36,11 +39,12 @@ use nix::{ use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ error::InternalValidationError, - execute::{Handshake, JobError, JobResponse, JobResult, WorkerResponse}, + execute::{Handshake, JobError, JobResponse, JobResult, WorkerError, WorkerResponse}, executor_interface::params_to_wasmtime_semantics, framed_recv_blocking, framed_send_blocking, worker::{ - cpu_time_monitor_loop, pipe2_cloexec, run_worker, stringify_panic_payload, + cpu_time_monitor_loop, get_total_cpu_usage, pipe2_cloexec, recv_child_response, run_worker, + send_result, stringify_errno, stringify_panic_payload, thread::{self, WaitOutcome}, PipeFd, WorkerInfo, WorkerKind, }, @@ -93,8 +97,14 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, Duration)> { Ok((params, execution_timeout)) } -fn send_response(stream: &mut UnixStream, response: WorkerResponse) -> io::Result<()> { - framed_send_blocking(stream, &response.encode()) +/// Sends an error to the host and returns the original error wrapped in `io::Error`. +macro_rules! map_and_send_err { + ($error:expr, $err_constructor:expr, $stream:expr, $worker_info:expr) => {{ + let err: WorkerError = $err_constructor($error.to_string()).into(); + let io_err = io::Error::new(io::ErrorKind::Other, err.to_string()); + let _ = send_result::($stream, Err(err), $worker_info); + io_err + }}; } /// The entrypoint that the spawned execute worker should start with. @@ -110,8 +120,6 @@ fn send_response(stream: &mut UnixStream, response: WorkerResponse) -> io::Resul /// check is not necessary. /// /// - `worker_version`: see above -/// -/// - `security_status`: contains the detected status of security features. pub fn worker_entrypoint( socket_path: PathBuf, worker_dir_path: PathBuf, @@ -127,13 +135,28 @@ pub fn worker_entrypoint( |mut stream, worker_info, security_status| { let artifact_path = worker_dir::execute_artifact(&worker_info.worker_dir_path); - let Handshake { executor_params } = recv_execute_handshake(&mut stream)?; + let Handshake { executor_params } = + recv_execute_handshake(&mut stream).map_err(|e| { + map_and_send_err!( + e, + InternalValidationError::HostCommunication, + &mut stream, + worker_info + ) + })?; let executor_params: Arc = Arc::new(executor_params); let execute_thread_stack_size = max_stack_size(&executor_params); loop { - let (params, execution_timeout) = recv_request(&mut stream)?; + let (params, execution_timeout) = recv_request(&mut stream).map_err(|e| { + map_and_send_err!( + e, + InternalValidationError::HostCommunication, + &mut stream, + worker_info + ) + })?; gum::debug!( target: LOG_TARGET, ?worker_info, @@ -143,27 +166,34 @@ pub fn worker_entrypoint( ); // Get the artifact bytes. - let compiled_artifact_blob = match std::fs::read(&artifact_path) { - Ok(bytes) => bytes, - Err(err) => { - let response = WorkerResponse::InternalError( - InternalValidationError::CouldNotOpenFile(err.to_string()), - ); - send_response(&mut stream, response)?; - continue - }, - }; - - let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec()?; - - let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) { - Ok(usage) => usage, - Err(errno) => { - let response = internal_error_from_errno("getrusage before", errno); - send_response(&mut stream, response)?; - continue - }, - }; + let compiled_artifact_blob = std::fs::read(&artifact_path).map_err(|e| { + map_and_send_err!( + e, + InternalValidationError::CouldNotOpenFile, + &mut stream, + worker_info + ) + })?; + + let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec().map_err(|e| { + map_and_send_err!( + e, + InternalValidationError::CouldNotCreatePipe, + &mut stream, + worker_info + ) + })?; + + let usage_before = nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) + .map_err(|errno| { + let e = stringify_errno("getrusage before", errno); + map_and_send_err!( + e, + InternalValidationError::Kernel, + &mut stream, + worker_info + ) + })?; let stream_fd = stream.as_raw_fd(); let compiled_artifact_blob = Arc::new(compiled_artifact_blob); @@ -222,7 +252,7 @@ pub fn worker_entrypoint( "worker: sending result to host: {:?}", result ); - send_response(&mut stream, result)?; + send_result(&mut stream, result, worker_info)?; } }, ); @@ -270,7 +300,7 @@ fn handle_clone( worker_info: &WorkerInfo, have_unshare_newuser: bool, usage_before: Usage, -) -> io::Result { +) -> io::Result> { use polkadot_node_core_pvf_common::worker::security; // SAFETY: new process is spawned within a single threaded process. This invariant @@ -301,7 +331,8 @@ fn handle_clone( usage_before, execution_timeout, ), - Err(security::clone::Error::Clone(errno)) => Ok(internal_error_from_errno("clone", errno)), + Err(security::clone::Error::Clone(errno)) => + Ok(Err(internal_error_from_errno("clone", errno))), } } @@ -316,7 +347,7 @@ fn handle_fork( execute_worker_stack_size: usize, worker_info: &WorkerInfo, usage_before: Usage, -) -> io::Result { +) -> io::Result> { // SAFETY: new process is spawned within a single threaded process. This invariant // is enforced by tests. match unsafe { nix::unistd::fork() } { @@ -338,7 +369,7 @@ fn handle_fork( usage_before, execution_timeout, ), - Err(errno) => Ok(internal_error_from_errno("fork", errno)), + Err(errno) => Ok(Err(internal_error_from_errno("fork", errno))), } } @@ -483,11 +514,11 @@ fn handle_parent_process( job_pid: Pid, usage_before: Usage, timeout: Duration, -) -> io::Result { +) -> io::Result> { // the read end will wait until all write ends have been closed, // this drop is necessary to avoid deadlock if let Err(errno) = nix::unistd::close(pipe_write_fd) { - return Ok(internal_error_from_errno("closing pipe write fd", errno)); + return Ok(Err(internal_error_from_errno("closing pipe write fd", errno))); }; // SAFETY: pipe_read_fd is an open and owned file descriptor at this point. @@ -512,7 +543,7 @@ fn handle_parent_process( let usage_after = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) { Ok(usage) => usage, - Err(errno) => return Ok(internal_error_from_errno("getrusage after", errno)), + Err(errno) => return Ok(Err(internal_error_from_errno("getrusage after", errno))), }; // Using `getrusage` is needed to check whether child has timedout since we cannot rely on @@ -530,32 +561,25 @@ fn handle_parent_process( cpu_tv.as_millis(), timeout.as_millis(), ); - return Ok(WorkerResponse::JobTimedOut) + return Ok(Err(WorkerError::JobTimedOut)) } match status { Ok(WaitStatus::Exited(_, exit_status)) => { let mut reader = io::BufReader::new(received_data.as_slice()); - let result = match recv_child_response(&mut reader) { - Ok(result) => result, - Err(err) => return Ok(WorkerResponse::JobError(err.to_string())), - }; + let result = recv_child_response(&mut reader, "execute")?; match result { - Ok(JobResponse::Ok { result_descriptor }) => { + Ok(job_response) => { // The exit status should have been zero if no error occurred. if exit_status != 0 { - return Ok(WorkerResponse::JobError(format!( - "unexpected exit status: {}", - exit_status - ))) + return Ok(Err(WorkerError::JobError(JobError::UnexpectedExitStatus( + exit_status, + )))); } - Ok(WorkerResponse::Ok { result_descriptor, duration: cpu_tv }) + Ok(Ok(WorkerResponse { job_response, duration: cpu_tv })) }, - Ok(JobResponse::InvalidCandidate(err)) => Ok(WorkerResponse::InvalidCandidate(err)), - Ok(JobResponse::RuntimeConstruction(err)) => - Ok(WorkerResponse::RuntimeConstruction(err)), Err(job_error) => { gum::warn!( target: LOG_TARGET, @@ -565,9 +589,9 @@ fn handle_parent_process( job_error, ); if matches!(job_error, JobError::TimedOut) { - Ok(WorkerResponse::JobTimedOut) + Ok(Err(WorkerError::JobTimedOut)) } else { - Ok(WorkerResponse::JobError(job_error.to_string())) + Ok(Err(WorkerError::JobError(job_error.into()))) } }, } @@ -576,50 +600,21 @@ fn handle_parent_process( // // The job gets SIGSYS on seccomp violations, but this signal may have been sent for some // other reason, so we still need to check for seccomp violations elsewhere. - Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) => Ok(WorkerResponse::JobDied { + Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) => Ok(Err(WorkerError::JobDied { err: format!("received signal: {signal:?}"), job_pid: job_pid.as_raw(), - }), - Err(errno) => Ok(internal_error_from_errno("waitpid", errno)), + })), + Err(errno) => Ok(Err(internal_error_from_errno("waitpid", errno))), // It is within an attacker's power to send an unexpected exit status. So we cannot treat // this as an internal error (which would make us abstain), but must vote against. - Ok(unexpected_wait_status) => Ok(WorkerResponse::JobDied { + Ok(unexpected_wait_status) => Ok(Err(WorkerError::JobDied { err: format!("unexpected status from wait: {unexpected_wait_status:?}"), job_pid: job_pid.as_raw(), - }), + })), } } -/// Calculate the total CPU time from the given `usage` structure, returned from -/// [`nix::sys::resource::getrusage`], and calculates the total CPU time spent, including both user -/// and system time. -/// -/// # Arguments -/// -/// - `rusage`: Contains resource usage information. -/// -/// # Returns -/// -/// Returns a `Duration` representing the total CPU time. -fn get_total_cpu_usage(rusage: Usage) -> Duration { - let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) + - (rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64; - - return Duration::from_micros(micros) -} - -/// Get a job response. -fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result { - let response_bytes = framed_recv_blocking(received_data)?; - JobResult::decode(&mut response_bytes.as_slice()).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("execute pvf recv_child_response: decode error: {}", e), - ) - }) -} - /// Write a job response to the pipe and exit process after. /// /// # Arguments @@ -638,15 +633,10 @@ fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! { } } -fn internal_error_from_errno(context: &'static str, errno: Errno) -> WorkerResponse { - WorkerResponse::InternalError(InternalValidationError::Kernel(format!( - "{}: {}: {}", - context, - errno, - io::Error::last_os_error() - ))) +fn internal_error_from_errno(context: &'static str, errno: Errno) -> WorkerError { + WorkerError::InternalError(InternalValidationError::Kernel(stringify_errno(context, errno))) } fn job_error_from_errno(context: &'static str, errno: Errno) -> JobResult { - Err(JobError::Kernel(format!("{}: {}: {}", context, errno, io::Error::last_os_error()))) + Err(JobError::Kernel(stringify_errno(context, errno))) } diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index 82a56107ef53..d1b218f48ae8 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -26,7 +26,6 @@ const LOG_TARGET: &str = "parachain::pvf-prepare-worker"; use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread}; #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop}; -use libc; use nix::{ errno::Errno, sys::{ @@ -48,7 +47,8 @@ use polkadot_node_core_pvf_common::{ prepare::{MemoryStats, PrepareJobKind, PrepareStats, PrepareWorkerSuccess}, pvf::PvfPrepData, worker::{ - cpu_time_monitor_loop, run_worker, stringify_panic_payload, + cpu_time_monitor_loop, get_total_cpu_usage, recv_child_response, run_worker, send_result, + stringify_errno, stringify_panic_payload, thread::{self, spawn_worker_thread, WaitOutcome}, WorkerKind, }, @@ -117,11 +117,6 @@ fn recv_request(stream: &mut UnixStream) -> io::Result { Ok(pvf) } -/// Send a worker response. -fn send_response(stream: &mut UnixStream, result: PrepareWorkerResult) -> io::Result<()> { - framed_send_blocking(stream, &result.encode()) -} - fn start_memory_tracking(fd: RawFd, limit: Option) { unsafe { // SAFETY: Inside the failure handler, the allocator is locked and no allocations or @@ -178,8 +173,6 @@ fn end_memory_tracking() -> isize { /// /// - `worker_version`: see above /// -/// - `security_status`: contains the detected status of security features. -/// /// # Flow /// /// This runs the following in a loop: @@ -233,8 +226,9 @@ pub fn worker_entrypoint( let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) { Ok(usage) => usage, Err(errno) => { - let result = Err(error_from_errno("getrusage before", errno)); - send_response(&mut stream, result)?; + let result: PrepareWorkerResult = + Err(error_from_errno("getrusage before", errno)); + send_result(&mut stream, result, worker_info)?; continue }, }; @@ -294,7 +288,7 @@ pub fn worker_entrypoint( "worker: sending result to host: {:?}", result ); - send_response(&mut stream, result)?; + send_result(&mut stream, result, worker_info)?; } }, ); @@ -666,7 +660,7 @@ fn handle_parent_process( match status { Ok(WaitStatus::Exited(_pid, exit_status)) => { let mut reader = io::BufReader::new(received_data.as_slice()); - let result = recv_child_response(&mut reader) + let result = recv_child_response(&mut reader, "prepare") .map_err(|err| PrepareError::JobError(err.to_string()))?; match result { @@ -726,35 +720,6 @@ fn handle_parent_process( } } -/// Calculate the total CPU time from the given `usage` structure, returned from -/// [`nix::sys::resource::getrusage`], and calculates the total CPU time spent, including both user -/// and system time. -/// -/// # Arguments -/// -/// - `rusage`: Contains resource usage information. -/// -/// # Returns -/// -/// Returns a `Duration` representing the total CPU time. -fn get_total_cpu_usage(rusage: Usage) -> Duration { - let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) + - (rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64; - - return Duration::from_micros(micros) -} - -/// Get a job response. -fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result { - let response_bytes = framed_recv_blocking(received_data)?; - JobResult::decode(&mut response_bytes.as_slice()).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("prepare pvf recv_child_response: decode error: {:?}", e), - ) - }) -} - /// Write a job response to the pipe and exit process after. /// /// # Arguments @@ -774,7 +739,7 @@ fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! { } fn error_from_errno(context: &'static str, errno: Errno) -> PrepareError { - PrepareError::Kernel(format!("{}: {}: {}", context, errno, io::Error::last_os_error())) + PrepareError::Kernel(stringify_errno(context, errno)) } type JobResult = Result; diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs index bdc3c7327b06..af147a2ba227 100644 --- a/polkadot/node/core/pvf/src/execute/queue.rs +++ b/polkadot/node/core/pvf/src/execute/queue.rs @@ -16,7 +16,7 @@ //! A queue that handles requests for PVF execution. -use super::worker_interface::Outcome; +use super::worker_interface::{Error as WorkerInterfaceError, Response as WorkerInterfaceResponse}; use crate::{ artifacts::{ArtifactId, ArtifactPathId}, host::ResultSender, @@ -30,7 +30,10 @@ use futures::{ stream::{FuturesUnordered, StreamExt as _}, Future, FutureExt, }; -use polkadot_node_core_pvf_common::SecurityStatus; +use polkadot_node_core_pvf_common::{ + execute::{JobResponse, WorkerError, WorkerResponse}, + SecurityStatus, +}; use polkadot_primitives::{ExecutorParams, ExecutorParamsHash}; use slotmap::HopSlotMap; use std::{ @@ -133,7 +136,12 @@ impl Workers { enum QueueEvent { Spawn(IdleWorker, WorkerHandle, ExecuteJob), - StartWork(Worker, Outcome, ArtifactId, ResultSender), + StartWork( + Worker, + Result, + ArtifactId, + ResultSender, + ), } type Mux = FuturesUnordered>; @@ -340,23 +348,34 @@ fn handle_worker_spawned( async fn handle_job_finish( queue: &mut Queue, worker: Worker, - outcome: Outcome, + worker_result: Result, artifact_id: ArtifactId, result_tx: ResultSender, ) { - let (idle_worker, result, duration, sync_channel) = match outcome { - Outcome::Ok { result_descriptor, duration, idle_worker } => { + let (idle_worker, result, duration, sync_channel) = match worker_result { + Ok(WorkerInterfaceResponse { + worker_response: + WorkerResponse { job_response: JobResponse::Ok { result_descriptor }, duration }, + idle_worker, + }) => { // TODO: propagate the soft timeout (Some(idle_worker), Ok(result_descriptor), Some(duration), None) }, - Outcome::InvalidCandidate { err, idle_worker } => ( + Ok(WorkerInterfaceResponse { + worker_response: WorkerResponse { job_response: JobResponse::InvalidCandidate(err), .. }, + idle_worker, + }) => ( Some(idle_worker), Err(ValidationError::Invalid(InvalidCandidate::WorkerReportedInvalid(err))), None, None, ), - Outcome::RuntimeConstruction { err, idle_worker } => { + Ok(WorkerInterfaceResponse { + worker_response: + WorkerResponse { job_response: JobResponse::RuntimeConstruction(err), .. }, + idle_worker, + }) => { // The task for artifact removal is executed concurrently with // the message to the host on the execution result. let (result_tx, result_rx) = oneshot::channel(); @@ -376,27 +395,31 @@ async fn handle_job_finish( Some(result_rx), ) }, - Outcome::InternalError { err } => (None, Err(ValidationError::Internal(err)), None, None), + + Err(WorkerInterfaceError::InternalError(err)) | + Err(WorkerInterfaceError::WorkerError(WorkerError::InternalError(err))) => + (None, Err(ValidationError::Internal(err)), None, None), // Either the worker or the job timed out. Kill the worker in either case. Treated as // definitely-invalid, because if we timed out, there's no time left for a retry. - Outcome::HardTimeout => + Err(WorkerInterfaceError::HardTimeout) | + Err(WorkerInterfaceError::WorkerError(WorkerError::JobTimedOut)) => (None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None), // "Maybe invalid" errors (will retry). - Outcome::WorkerIntfErr => ( + Err(WorkerInterfaceError::CommunicationErr(_err)) => ( None, Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)), None, None, ), - Outcome::JobDied { err } => ( + Err(WorkerInterfaceError::WorkerError(WorkerError::JobDied { err, .. })) => ( None, Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))), None, None, ), - Outcome::JobError { err } => ( + Err(WorkerInterfaceError::WorkerError(WorkerError::JobError(err))) => ( None, - Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err))), + Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err.to_string()))), None, None, ), @@ -543,14 +566,14 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) { queue.mux.push( async move { let _timer = execution_timer; - let outcome = super::worker_interface::start_work( + let result = super::worker_interface::start_work( idle, job.artifact.clone(), job.exec_timeout, job.params, ) .await; - QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx) + QueueEvent::StartWork(worker, result, job.artifact.id, job.result_tx) } .boxed(), ); diff --git a/polkadot/node/core/pvf/src/execute/worker_interface.rs b/polkadot/node/core/pvf/src/execute/worker_interface.rs index db81da118d7b..9dcadfb4c2a7 100644 --- a/polkadot/node/core/pvf/src/execute/worker_interface.rs +++ b/polkadot/node/core/pvf/src/execute/worker_interface.rs @@ -29,10 +29,9 @@ use futures_timer::Delay; use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ error::InternalValidationError, - execute::{Handshake, WorkerResponse}, + execute::{Handshake, WorkerError, WorkerResponse}, worker_dir, SecurityStatus, }; -use polkadot_parachain_primitives::primitives::ValidationResult; use polkadot_primitives::ExecutorParams; use std::{path::Path, time::Duration}; use tokio::{io, net::UnixStream}; @@ -69,7 +68,8 @@ pub async fn spawn( gum::warn!( target: LOG_TARGET, worker_pid = %idle_worker.pid, - %err + "failed to send a handshake to the spawned worker: {}", + error ); err })?; @@ -78,39 +78,40 @@ pub async fn spawn( /// Outcome of PVF execution. /// -/// If the idle worker token is not returned, it means the worker must be terminated. -pub enum Outcome { - /// PVF execution completed successfully and the result is returned. The worker is ready for - /// another job. - Ok { result_descriptor: ValidationResult, duration: Duration, idle_worker: IdleWorker }, - /// The candidate validation failed. It may be for example because the wasm execution triggered - /// a trap. Errors related to the preparation process are not expected to be encountered by the - /// execution workers. - InvalidCandidate { err: String, idle_worker: IdleWorker }, - /// The error is probably transient. It may be for example - /// because the artifact was prepared with a Wasmtime version different from the version - /// in the current execution environment. - RuntimeConstruction { err: String, idle_worker: IdleWorker }, +/// PVF execution completed and the result is returned. The worker is ready for +/// another job. +pub struct Response { + /// The response (valid/invalid) from the worker. + pub worker_response: WorkerResponse, + /// Returning the idle worker token means the worker can be reused. + pub idle_worker: IdleWorker, +} +/// The idle worker token is not returned for any of these cases, meaning the worker must be +/// terminated. +/// +/// NOTE: Errors related to the preparation process are not expected to be encountered by the +/// execution workers. +#[derive(thiserror::Error, Debug)] +pub enum Error { /// The execution time exceeded the hard limit. The worker is terminated. + #[error("The communication with the worker exceeded the hard limit")] HardTimeout, /// An I/O error happened during communication with the worker. This may mean that the worker /// process already died. The token is not returned in any case. - WorkerIntfErr, - /// The job process has died. We must kill the worker just in case. - /// - /// We cannot treat this as an internal error because malicious code may have caused this. - JobDied { err: String }, - /// An unexpected error occurred in the job process. - /// - /// Because malicious code can cause a job error, we must not treat it as an internal error. - JobError { err: String }, + #[error("An I/O error happened during communication with the worker: {0}")] + CommunicationErr(#[from] io::Error), + /// The worker reported an error (can be from itself or from the job). The worker should not be + /// reused. + #[error("The worker reported an error: {0}")] + WorkerError(#[from] WorkerError), /// An internal error happened during the validation. Such an error is most likely related to /// some transient glitch. /// /// Should only ever be used for errors independent of the candidate and PVF. Therefore it may /// be a problem with the worker, so we terminate it. - InternalError { err: InternalValidationError }, + #[error("An internal error occurred: {0}")] + InternalError(#[from] InternalValidationError), } /// Given the idle token of a worker and parameters of work, communicates with the worker and @@ -123,7 +124,7 @@ pub async fn start_work( artifact: ArtifactPathId, execution_timeout: Duration, validation_params: Vec, -) -> Outcome { +) -> Result { let IdleWorker { mut stream, pid, worker_dir } = worker; gum::debug!( @@ -136,16 +137,18 @@ pub async fn start_work( ); with_worker_dir_setup(worker_dir, pid, &artifact.path, |worker_dir| async move { - if let Err(error) = send_request(&mut stream, &validation_params, execution_timeout).await { - gum::warn!( - target: LOG_TARGET, - worker_pid = %pid, - validation_code_hash = ?artifact.id.code_hash, - ?error, - "failed to send an execute request", - ); - return Outcome::WorkerIntfErr - } + send_request(&mut stream, &validation_params, execution_timeout).await.map_err( + |error| { + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + validation_code_hash = ?artifact.id.code_hash, + "failed to send an execute request: {}", + error, + ); + Error::InternalError(InternalValidationError::HostCommunication(error.to_string())) + }, + )?; // We use a generous timeout here. This is in addition to the one in the child process, in // case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout @@ -153,12 +156,12 @@ pub async fn start_work( // load, but the CPU resources of the child can only be measured from the parent after the // child process terminates. let timeout = execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR; - let response = futures::select! { - response = recv_response(&mut stream).fuse() => { - match response { - Ok(response) => - handle_response( - response, + let worker_result = futures::select! { + worker_result = recv_result(&mut stream).fuse() => { + match worker_result { + Ok(result) => + handle_result( + result, pid, execution_timeout, ) @@ -168,11 +171,11 @@ pub async fn start_work( target: LOG_TARGET, worker_pid = %pid, validation_code_hash = ?artifact.id.code_hash, - ?error, - "failed to recv an execute response", + "failed to recv an execute result: {}", + error, ); - return Outcome::WorkerIntfErr + return Err(Error::CommunicationErr(error)) }, } }, @@ -183,29 +186,16 @@ pub async fn start_work( validation_code_hash = ?artifact.id.code_hash, "execution worker exceeded lenient timeout for execution, child worker likely stalled", ); - WorkerResponse::JobTimedOut + return Err(Error::HardTimeout) }, }; - match response { - WorkerResponse::Ok { result_descriptor, duration } => Outcome::Ok { - result_descriptor, - duration, - idle_worker: IdleWorker { stream, pid, worker_dir }, - }, - WorkerResponse::InvalidCandidate(err) => Outcome::InvalidCandidate { - err, - idle_worker: IdleWorker { stream, pid, worker_dir }, - }, - WorkerResponse::RuntimeConstruction(err) => Outcome::RuntimeConstruction { - err, + match worker_result { + Ok(worker_response) => Ok(Response { + worker_response, idle_worker: IdleWorker { stream, pid, worker_dir }, - }, - WorkerResponse::JobTimedOut => Outcome::HardTimeout, - WorkerResponse::JobDied { err, job_pid: _ } => Outcome::JobDied { err }, - WorkerResponse::JobError(err) => Outcome::JobError { err }, - - WorkerResponse::InternalError(err) => Outcome::InternalError { err }, + }), + Err(worker_error) => Err(worker_error.into()), } }) .await @@ -215,12 +205,12 @@ pub async fn start_work( /// /// Here we know the artifact exists, but is still located in a temporary file which will be cleared /// by [`with_worker_dir_setup`]. -async fn handle_response( - response: WorkerResponse, +async fn handle_result( + worker_result: Result, worker_pid: u32, execution_timeout: Duration, -) -> WorkerResponse { - if let WorkerResponse::Ok { duration, .. } = response { +) -> Result { + if let Ok(WorkerResponse { duration, .. }) = worker_result { if duration > execution_timeout { // The job didn't complete within the timeout. gum::warn!( @@ -232,11 +222,11 @@ async fn handle_response( ); // Return a timeout error. - return WorkerResponse::JobTimedOut + return Err(WorkerError::JobTimedOut) } } - response + worker_result } /// Create a temporary file for an artifact in the worker cache, execute the given future/closure @@ -249,9 +239,9 @@ async fn with_worker_dir_setup( pid: u32, artifact_path: &Path, f: F, -) -> Outcome +) -> Result where - Fut: futures::Future, + Fut: futures::Future>, F: FnOnce(WorkerDir) -> Fut, { // Cheaply create a hard link to the artifact. The artifact is always at a known location in the @@ -263,16 +253,14 @@ where target: LOG_TARGET, worker_pid = %pid, ?worker_dir, - "failed to clear worker cache after the job: {:?}", + "failed to clear worker cache after the job: {}", err, ); - return Outcome::InternalError { - err: InternalValidationError::CouldNotCreateLink(format!("{:?}", err)), - } + return Err(InternalValidationError::CouldNotCreateLink(format!("{:?}", err)).into()); } let worker_dir_path = worker_dir.path().to_owned(); - let outcome = f(worker_dir).await; + let result = f(worker_dir).await; // Try to clear the worker dir. if let Err(err) = clear_worker_dir_path(&worker_dir_path) { @@ -283,15 +271,14 @@ where "failed to clear worker cache after the job: {:?}", err, ); - return Outcome::InternalError { - err: InternalValidationError::CouldNotClearWorkerDir { - err: format!("{:?}", err), - path: worker_dir_path.to_str().map(String::from), - }, + return Err(InternalValidationError::CouldNotClearWorkerDir { + err: format!("{:?}", err), + path: worker_dir_path.to_str().map(String::from), } + .into()) } - outcome + result } /// Sends a handshake with information specific to the execute worker. @@ -308,12 +295,12 @@ async fn send_request( framed_send(stream, &execution_timeout.encode()).await } -async fn recv_response(stream: &mut UnixStream) -> io::Result { - let response_bytes = framed_recv(stream).await?; - WorkerResponse::decode(&mut response_bytes.as_slice()).map_err(|e| { +async fn recv_result(stream: &mut UnixStream) -> io::Result> { + let result_bytes = framed_recv(stream).await?; + Result::::decode(&mut result_bytes.as_slice()).map_err(|e| { io::Error::new( io::ErrorKind::Other, - format!("execute pvf recv_response: decode error: {:?}", e), + format!("execute pvf recv_result: decode error: {:?}", e), ) }) } diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 247d753d7c44..2d180fc59295 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -959,10 +959,7 @@ pub(crate) mod tests { use crate::{artifacts::generate_artifact_path, PossiblyInvalidError}; use assert_matches::assert_matches; use futures::future::BoxFuture; - use polkadot_node_core_pvf_common::{ - error::PrepareError, - prepare::{PrepareStats, PrepareSuccess}, - }; + use polkadot_node_core_pvf_common::prepare::PrepareStats; const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);