Skip to content

Commit

Permalink
Show registered process name in trap warning messages (lunatic-soluti…
Browse files Browse the repository at this point in the history
…ons#187)

* Show registered process name in trap warning messages

* Show all registered names for process when it fails

* Ignore failing doc test

Ignore failing doc test

* Remove spawn doc example
  • Loading branch information
tqwewe authored Feb 7, 2023
1 parent 07454fb commit d428e93
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions crates/lunatic-distributed/src/distributed/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub async fn node_server<T, E>(
key: String,
) -> Result<()>
where
T: ProcessState + ResourceLimiter + DistributedCtx<E> + Send + 'static,
T: ProcessState + ResourceLimiter + DistributedCtx<E> + Send + Sync + 'static,
E: Environment + 'static,
{
let mut quic_server = quic::new_quic_server(socket, &cert, &key, &ca_cert)?;
Expand All @@ -83,7 +83,7 @@ pub async fn handle_message<T, E>(
msg_id: u64,
msg: Request,
) where
T: ProcessState + DistributedCtx<E> + ResourceLimiter + Send + 'static,
T: ProcessState + DistributedCtx<E> + ResourceLimiter + Send + Sync + 'static,
E: Environment + 'static,
{
if let Err(e) = handle_message_err(ctx, send, msg_id, msg).await {
Expand All @@ -98,7 +98,7 @@ async fn handle_message_err<T, E>(
msg: Request,
) -> Result<()>
where
T: ProcessState + DistributedCtx<E> + ResourceLimiter + Send + 'static,
T: ProcessState + DistributedCtx<E> + ResourceLimiter + Send + Sync + 'static,
E: Environment + 'static,
{
match msg {
Expand Down Expand Up @@ -143,7 +143,7 @@ where

async fn handle_spawn<T, E>(ctx: ServerCtx<T, E>, spawn: Spawn) -> Result<Result<u64, ClientError>>
where
T: ProcessState + DistributedCtx<E> + ResourceLimiter + Send + 'static,
T: ProcessState + DistributedCtx<E> + ResourceLimiter + Send + Sync + 'static,
E: Environment + 'static,
{
let Spawn {
Expand Down
6 changes: 3 additions & 3 deletions crates/lunatic-distributed/src/quic/quin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub async fn handle_node_server<T, E>(
ctx: distributed::server::ServerCtx<T, E>,
) -> Result<()>
where
T: ProcessState + ResourceLimiter + DistributedCtx<E> + Send + 'static,
T: ProcessState + ResourceLimiter + DistributedCtx<E> + Send + Sync + 'static,
E: Environment + 'static,
{
while let Some(conn) = quic_server.accept().await {
Expand All @@ -160,7 +160,7 @@ async fn handle_quic_connection_node<T, E>(
conn: Connecting,
) -> Result<()>
where
T: ProcessState + ResourceLimiter + DistributedCtx<E> + Send + 'static,
T: ProcessState + ResourceLimiter + DistributedCtx<E> + Send + Sync + 'static,
E: Environment + 'static,
{
log::info!("New node connection");
Expand Down Expand Up @@ -192,7 +192,7 @@ async fn handle_quic_stream_node<T, E>(
mut send: SendStream,
mut recv: RecvStream,
) where
T: ProcessState + ResourceLimiter + DistributedCtx<E> + Send + 'static,
T: ProcessState + ResourceLimiter + DistributedCtx<E> + Send + Sync + 'static,
E: Environment + 'static,
{
while let Ok(bytes) = recv.receive().await {
Expand Down
18 changes: 16 additions & 2 deletions crates/lunatic-process-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@ pub trait ProcessCtx<S: ProcessState> {
// Register the process APIs to the linker
pub fn register<T>(linker: &mut Linker<T>) -> Result<()>
where
T: ProcessState + ProcessCtx<T> + ErrorCtx + LunaticWasiCtx + Send + ResourceLimiter + 'static,
T: ProcessState
+ ProcessCtx<T>
+ ErrorCtx
+ LunaticWasiCtx
+ Send
+ Sync
+ ResourceLimiter
+ 'static,
for<'a> &'a T: Send,
T::Config: ProcessConfigCtx,
{
Expand Down Expand Up @@ -534,7 +541,14 @@ fn spawn<T>(
id_ptr: u32,
) -> Box<dyn Future<Output = Result<u32>> + Send + '_>
where
T: ProcessState + ProcessCtx<T> + ErrorCtx + LunaticWasiCtx + ResourceLimiter + Send + 'static,
T: ProcessState
+ ProcessCtx<T>
+ ErrorCtx
+ LunaticWasiCtx
+ ResourceLimiter
+ Send
+ Sync
+ 'static,
for<'a> &'a T: Send,
T::Config: ProcessConfigCtx,
{
Expand Down
1 change: 1 addition & 0 deletions crates/lunatic-process/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dashmap = { workspace = true }
log = { workspace = true }
metrics = { workspace = true, optional = true }
serde = { workspace = true }
smallvec = "1.10"
tokio = { workspace = true, features = [
"macros",
"rt-multi-thread",
Expand Down
82 changes: 62 additions & 20 deletions crates/lunatic-process/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use anyhow::{anyhow, Result};
use env::Environment;
use log::{debug, log_enabled, trace, warn, Level};

use smallvec::SmallVec;
use state::ProcessState;
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Expand Down Expand Up @@ -211,6 +213,46 @@ impl Process for WasmProcess {
}
}

/// Enum containing a process name if available, otherwise its ID.
enum NameOrID<'a> {
Names(SmallVec<[&'a str; 2]>),
ID(u64),
}

impl<'a> NameOrID<'a> {
/// Returns names, otherwise id if names is empty.
fn or_id(self, id: u64) -> Self {
match self {
NameOrID::Names(ref names) if !names.is_empty() => self,
_ => NameOrID::ID(id),
}
}
}

impl<'a> std::fmt::Display for NameOrID<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NameOrID::Names(names) => {
for (i, name) in names.iter().enumerate() {
if i > 0 {
write!(f, " / ")?;
}
write!(f, "'{name}'")?;
}
Ok(())
}
NameOrID::ID(id) => write!(f, "{id}"),
}
}
}

impl<'a> FromIterator<&'a str> for NameOrID<'a> {
fn from_iter<T: IntoIterator<Item = &'a str>>(iter: T) -> Self {
let names = SmallVec::from_iter(iter);
NameOrID::Names(names)
}
}

/// Turns a `Future` into a process, enabling signals (e.g. kill).
///
/// This function represents the core execution loop of lunatic processes:
Expand All @@ -235,6 +277,7 @@ pub(crate) async fn new<F, S, R>(
message_mailbox: MessageMailbox,
) -> Result<S>
where
S: ProcessState,
R: Into<ExecutionResult<S>>,
F: Future<Output = R> + Send + 'static,
{
Expand Down Expand Up @@ -339,11 +382,19 @@ where

match result {
Finished::Normal(result) => {
let result = result.into();
let result: ExecutionResult<_> = result.into();

if let Some(failure) = result.failure() {
let registry = result.state().registry().read().await;
let name = registry
.iter()
.filter(|(_, (_, process_id))| process_id == &id)
.map(|(name, _)| name.splitn(4, '/').last().unwrap_or(name.as_str()))
.collect::<NameOrID>()
.or_id(id);
warn!(
"Process {} failed, notifying: {} links {}",
id,
name,
links.len(),
// If the log level is WARN instruct user how to display the stacktrace
if !log_enabled!(Level::Debug) {
Expand All @@ -363,7 +414,7 @@ where
links.iter().for_each(|(_, (proc, tag))| {
proc.send(Signal::LinkDied(id, *tag, DeathReason::Normal));
});
Ok(result.state())
Ok(result.into_state())
}
}
Finished::KillSignal => {
Expand All @@ -389,27 +440,13 @@ pub struct NativeProcess {
}

/// Spawns a process from a closure.
///
/// ## Example:
///
/// ```no_run
/// use std::sync::Arc;
/// let env = Arc::new(lunatic_process::env::LunaticEnvironment::new(1));
/// let _proc = lunatic_process::spawn(env, |_this, mailbox| async move {
/// // Wait on a message with the tag `27`.
/// mailbox.pop(Some(&[27])).await;
/// // TODO: Needs to return ExecutionResult. Probably the `new` function will need to be adjusted
/// Ok(())
/// });
/// ```
pub fn spawn<T, F, K, R>(
env: Arc<dyn Environment>,
func: F,
) -> (JoinHandle<Result<T>>, NativeProcess)
where
T: Send + 'static,
R: Into<ExecutionResult<T>> + 'static,
T: ProcessState + Send + Sync + 'static,
R: Into<ExecutionResult<T>> + Send + 'static,
K: Future<Output = R> + Send + 'static,
F: FnOnce(NativeProcess, MessageMailbox) -> K,
{
Expand Down Expand Up @@ -468,8 +505,13 @@ impl<T> ExecutionResult<T> {
}
}

// Returns the process state reference
pub fn state(&self) -> &T {
&self.state
}

// Returns the process state
pub fn state(self) -> T {
pub fn into_state(self) -> T {
self.state
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/lunatic-process/src/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub async fn spawn_wasm<S>(
link: Option<(Option<i64>, Arc<dyn Process>)>,
) -> Result<(JoinHandle<Result<S>>, Arc<dyn Process>)>
where
S: ProcessState + Send + ResourceLimiter + 'static,
S: ProcessState + Send + Sync + ResourceLimiter + 'static,
{
let id = state.id();
trace!("Spawning process: {}", id);
Expand Down

0 comments on commit d428e93

Please sign in to comment.