diff --git a/ractor/Cargo.toml b/ractor/Cargo.toml index 51a8f41a..679c21ee 100644 --- a/ractor/Cargo.toml +++ b/ractor/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor" -version = "0.7.1" +version = "0.7.2" authors = ["Sean Lawlor", "Evan Au", "Dillon George"] description = "A actor framework for Rust" documentation = "https://docs.rs/ractor" diff --git a/ractor/src/actor/mod.rs b/ractor/src/actor/mod.rs index 1bff6fc7..b3efbaee 100644 --- a/ractor/src/actor/mod.rs +++ b/ractor/src/actor/mod.rs @@ -271,6 +271,85 @@ where actor.start(ports, startup_args, Some(supervisor)).await } + /// Spawn an actor instantly, not waiting on the actor's `pre_start` routine. This is helpful + /// for actors where you want access to the send messages into the actor's message queue + /// without waiting on an asynchronous context. + /// + /// **WARNING** Failures in the pre_start routine need to be waited on in the join handle + /// since they will NOT fail the spawn operation in this context + /// + /// * `name`: A name to give the actor. Useful for global referencing or debug printing + /// * `handler` The [Actor] defining the logic for this actor + /// * `startup_args`: Arguements passed to the `pre_start` call of the [Actor] to facilitate startup and + /// initial state creation + /// + /// Returns a [Ok((ActorRef, JoinHandle, SpawnErr>>))] upon successful creation of the + /// message queues, so you can begin sending messages. However the associated [JoinHandle] contains the inner + /// information around if the actor successfully started or not in it's `pre_start` routine. Returns [Err(SpawnErr)] if + /// the actor name is already allocated + #[allow(clippy::type_complexity)] + pub fn spawn_instant( + name: Option, + handler: TActor, + startup_args: TActor::Arguments, + ) -> Result< + ( + ActorRef, + JoinHandle, SpawnErr>>, + ), + SpawnErr, + > { + let (actor, ports) = Self::new(name, handler)?; + let actor_ref = actor.actor_ref.clone(); + let join_op = crate::concurrency::spawn(async move { + let (_, handle) = actor.start(ports, startup_args, None).await?; + Ok(handle) + }); + Ok((actor_ref, join_op)) + } + + /// Spawn an actor instantly with supervision, not waiting on the actor's `pre_start` routine. + /// This is helpful for actors where you want access to the send messages into the actor's + /// message queue without waiting on an asynchronous context. + /// + /// **WARNING** Failures in the pre_start routine need to be waited on in the join handle + /// since they will NOT fail the spawn operation in this context. Additionally the supervision + /// tree will **NOT** be linked until the `pre_start` completes so there is a chance an actor + /// is lost during `pre_start` and not successfully started unless it's specifically handled + /// by the caller by awaiting later. + /// + /// * `name`: A name to give the actor. Useful for global referencing or debug printing + /// * `handler` The [Actor] defining the logic for this actor + /// * `startup_args`: Arguements passed to the `pre_start` call of the [Actor] to facilitate startup and + /// initial state creation + /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor + /// + /// Returns a [Ok((ActorRef, JoinHandle, SpawnErr>>))] upon successful creation of the + /// message queues, so you can begin sending messages. However the associated [JoinHandle] contains the inner + /// information around if the actor successfully started or not in it's `pre_start` routine. Returns [Err(SpawnErr)] if + /// the actor name is already allocated + #[allow(clippy::type_complexity)] + pub fn spawn_linked_instant( + name: Option, + handler: TActor, + startup_args: TActor::Arguments, + supervisor: ActorCell, + ) -> Result< + ( + ActorRef, + JoinHandle, SpawnErr>>, + ), + SpawnErr, + > { + let (actor, ports) = Self::new(name, handler)?; + let actor_ref = actor.actor_ref.clone(); + let join_op = crate::concurrency::spawn(async move { + let (_, handle) = actor.start(ports, startup_args, Some(supervisor)).await?; + Ok(handle) + }); + Ok((actor_ref, join_op)) + } + /// Spawn a REMOTE actor with a supervisor, automatically starting the actor. Only for use /// by `ractor_cluster::node::NodeSession` /// diff --git a/ractor/src/actor/tests/mod.rs b/ractor/src/actor/tests/mod.rs index 89c000f3..5d74db04 100644 --- a/ractor/src/actor/tests/mod.rs +++ b/ractor/src/actor/tests/mod.rs @@ -759,3 +759,60 @@ async fn spawning_local_actor_as_remote_fails() { actor.stop(None); handle.await.expect("Failed to clean stop the actor"); } + +#[crate::concurrency::test] +async fn instant_spawns() { + let counter = Arc::new(AtomicU8::new(0)); + + struct EmptyActor; + #[async_trait::async_trait] + impl Actor for EmptyActor { + type Msg = String; + type State = Arc; + type Arguments = Arc; + async fn pre_start( + &self, + _this_actor: crate::ActorRef, + counter: Arc, + ) -> Result { + // delay startup by some amount + crate::concurrency::sleep(Duration::from_millis(200)).await; + Ok(counter) + } + + async fn handle( + &self, + _this_actor: crate::ActorRef, + _message: String, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + state.fetch_add(1, Ordering::Relaxed); + Ok(()) + } + } + + let (actor, handles) = crate::ActorRuntime::spawn_instant(None, EmptyActor, counter.clone()) + .expect("Failed to instant spawn"); + + for i in 0..10 { + actor + .cast(format!("I = {i}")) + .expect("Actor couldn't receive message!"); + } + + // actor is still starting up + assert_eq!(0, counter.load(Ordering::Relaxed)); + + crate::concurrency::sleep(Duration::from_millis(250)).await; + // actor is started now and processing messages + assert_eq!(10, counter.load(Ordering::Relaxed)); + + // Cleanup + actor.stop(None); + handles + .await + .unwrap() + .expect("Actor's pre_start routine panicked") + .await + .unwrap(); +} diff --git a/ractor/src/actor/tests/supervisor.rs b/ractor/src/actor/tests/supervisor.rs index b8d81986..0533701c 100644 --- a/ractor/src/actor/tests/supervisor.rs +++ b/ractor/src/actor/tests/supervisor.rs @@ -6,7 +6,7 @@ //! Supervisor tests use std::sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicU64, AtomicU8, Ordering}, Arc, }; @@ -14,9 +14,6 @@ use crate::{concurrency::Duration, ActorProcessingErr}; use crate::{Actor, ActorCell, ActorRef, ActorStatus, SupervisionEvent}; -#[cfg(feature = "cluster")] -impl crate::Message for () {} - #[crate::concurrency::test] async fn test_supervision_panic_in_post_startup() { struct Child; @@ -897,5 +894,95 @@ async fn test_killing_a_supervisor_terminates_children() { c_handle.await.expect("Failed to wait for child to die"); } +#[crate::concurrency::test] +async fn instant_supervised_spawns() { + let counter = Arc::new(AtomicU8::new(0)); + + struct EmptySupervisor; + #[async_trait::async_trait] + impl Actor for EmptySupervisor { + type Msg = (); + type State = (); + type Arguments = (); + async fn pre_start(&self, _: ActorRef, _: ()) -> Result<(), ActorProcessingErr> { + Ok(()) + } + + async fn handle_supervisor_evt( + &self, + _: ActorRef, + _: SupervisionEvent, + _: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + Err(From::from( + "Supervision event received when it shouldn't have been!", + )) + } + } + + struct EmptyActor; + #[async_trait::async_trait] + impl Actor for EmptyActor { + type Msg = String; + type State = Arc; + type Arguments = Arc; + async fn pre_start( + &self, + _this_actor: crate::ActorRef, + counter: Arc, + ) -> Result { + // delay startup by some amount + crate::concurrency::sleep(Duration::from_millis(200)).await; + Ok(counter) + } + + async fn handle( + &self, + _this_actor: crate::ActorRef, + _message: String, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + state.fetch_add(1, Ordering::Relaxed); + Ok(()) + } + } + + let (supervisor, shandle) = Actor::spawn(None, EmptySupervisor, ()) + .await + .expect("Failed to startup supervisor"); + let (actor, handles) = crate::ActorRuntime::spawn_linked_instant( + None, + EmptyActor, + counter.clone(), + supervisor.get_cell(), + ) + .expect("Failed to instant spawn"); + + for i in 0..10 { + actor + .cast(format!("I = {i}")) + .expect("Actor couldn't receive message!"); + } + + // actor is still starting up + assert_eq!(0, counter.load(Ordering::Relaxed)); + + crate::concurrency::sleep(Duration::from_millis(250)).await; + // actor is started now and processing messages + assert_eq!(10, counter.load(Ordering::Relaxed)); + + // Cleanup + supervisor.stop(None); + shandle.await.unwrap(); + + actor.stop(None); + handles + .await + .unwrap() + .expect("Actor's pre_start routine panicked") + .await + .unwrap(); +} + // TODO: Still to be tested // 1. terminate_children_after() diff --git a/ractor/src/factory/tests.rs b/ractor/src/factory/tests.rs index 947ef982..f324a4b4 100644 --- a/ractor/src/factory/tests.rs +++ b/ractor/src/factory/tests.rs @@ -25,8 +25,6 @@ struct TestKey { id: u64, } #[cfg(feature = "cluster")] -impl crate::Message for TestKey {} -#[cfg(feature = "cluster")] impl crate::BytesConvertable for TestKey { fn from_bytes(bytes: Vec) -> Self { Self { diff --git a/ractor/src/message.rs b/ractor/src/message.rs index 4faf83dd..3654c219 100644 --- a/ractor/src/message.rs +++ b/ractor/src/message.rs @@ -169,3 +169,27 @@ pub trait Message: Any + Send + Sized + 'static { // since there's no need for an override #[cfg(not(feature = "cluster"))] impl Message for T {} + +// Blanket implementation for basic types which are directly bytes serializable which +// are all to be CAST operations +#[cfg(feature = "cluster")] +impl Message for T { + fn serializable() -> bool { + true + } + + fn serialize(self) -> Result { + Ok(SerializedMessage::Cast { + variant: String::new(), + args: self.into_bytes(), + metadata: None, + }) + } + + fn deserialize(bytes: SerializedMessage) -> Result { + match bytes { + SerializedMessage::Cast { args, .. } => Ok(T::from_bytes(args)), + _ => Err(BoxedDowncastErr), + } + } +} diff --git a/ractor/src/serialization.rs b/ractor/src/serialization.rs index 561ec982..477af13f 100644 --- a/ractor/src/serialization.rs +++ b/ractor/src/serialization.rs @@ -55,6 +55,13 @@ implement_numeric! {u128} implement_numeric! {f32} implement_numeric! {f64} +impl BytesConvertable for () { + fn into_bytes(self) -> Vec { + Vec::new() + } + fn from_bytes(_: Vec) -> Self {} +} + impl BytesConvertable for bool { fn into_bytes(self) -> Vec { if self { diff --git a/ractor_cluster/Cargo.toml b/ractor_cluster/Cargo.toml index 172d677e..ac82fc5c 100644 --- a/ractor_cluster/Cargo.toml +++ b/ractor_cluster/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor_cluster" -version = "0.7.1" +version = "0.7.2" authors = ["Sean Lawlor", "Evan Au", "Dillon George"] description = "Distributed cluster environment of Ractor actors" documentation = "https://docs.rs/ractor" @@ -24,8 +24,8 @@ bytes = { version = "1" } log = "0.4" prost = { version = "0.11" } prost-types = { version = "0.11" } -ractor = { version = "0.7.1", features = ["cluster"], path = "../ractor" } -ractor_cluster_derive = { version = "0.7.1", path = "../ractor_cluster_derive" } +ractor = { version = "0.7.2", features = ["cluster"], path = "../ractor" } +ractor_cluster_derive = { version = "0.7.2", path = "../ractor_cluster_derive" } rand = "0.8" sha2 = "0.10" tokio = { version = "1", features = ["rt", "time", "sync", "macros", "net", "io-util"]} diff --git a/ractor_cluster_derive/Cargo.toml b/ractor_cluster_derive/Cargo.toml index 2886b968..f056f62c 100644 --- a/ractor_cluster_derive/Cargo.toml +++ b/ractor_cluster_derive/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor_cluster_derive" -version = "0.7.1" +version = "0.7.2" authors = ["Sean Lawlor "] description = "Derives for ractor_cluster" license = "MIT"