Skip to content

Commit

Permalink
Adding support for "instant" spawning (#48)
Browse files Browse the repository at this point in the history
* Adding support for "instant" spawning

This PR adds support for spawning actors "instantly" in that we don't wait on the `pre_start` routine. This is helpful if you want to immediately have access to the `ActorRef<TActor>` so you can begin sending messages to it without blocking on the `pre_start` routine.

**However** if the `pre_start` fails for any reason, it won't be captured unless the specific join handle is handled manually to monitor the spawning error. Documentation updated to reflect this.

New tests covering instant spawning added.

Additionally we're adding a small auto-implementation for types which implement `BytesSerializable` such that they are automatically implementing `ractor::Message` and can be network-sent. This makes most of the built-in numeric types, `()`, `String`s, and vectors of such able to immediately used in a cluster context.

* version bump
  • Loading branch information
slawlor authored Feb 15, 2023
1 parent 9b38fe2 commit 4fe2cda
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 11 deletions.
2 changes: 1 addition & 1 deletion ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.7.1"
version = "0.7.2"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand Down
79 changes: 79 additions & 0 deletions ractor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,85 @@ where
actor.start(ports, startup_args, Some(supervisor)).await
}

/// Spawn an actor instantly, not waiting on the actor's `pre_start` routine. This is helpful
/// for actors where you want access to the send messages into the actor's message queue
/// without waiting on an asynchronous context.
///
/// **WARNING** Failures in the pre_start routine need to be waited on in the join handle
/// since they will NOT fail the spawn operation in this context
///
/// * `name`: A name to give the actor. Useful for global referencing or debug printing
/// * `handler` The [Actor] defining the logic for this actor
/// * `startup_args`: Arguements passed to the `pre_start` call of the [Actor] to facilitate startup and
/// initial state creation
///
/// Returns a [Ok((ActorRef, JoinHandle<Result<JoinHandle<()>, SpawnErr>>))] upon successful creation of the
/// message queues, so you can begin sending messages. However the associated [JoinHandle] contains the inner
/// information around if the actor successfully started or not in it's `pre_start` routine. Returns [Err(SpawnErr)] if
/// the actor name is already allocated
#[allow(clippy::type_complexity)]
pub fn spawn_instant(
name: Option<ActorName>,
handler: TActor,
startup_args: TActor::Arguments,
) -> Result<
(
ActorRef<TActor>,
JoinHandle<Result<JoinHandle<()>, SpawnErr>>,
),
SpawnErr,
> {
let (actor, ports) = Self::new(name, handler)?;
let actor_ref = actor.actor_ref.clone();
let join_op = crate::concurrency::spawn(async move {
let (_, handle) = actor.start(ports, startup_args, None).await?;
Ok(handle)
});
Ok((actor_ref, join_op))
}

/// Spawn an actor instantly with supervision, not waiting on the actor's `pre_start` routine.
/// This is helpful for actors where you want access to the send messages into the actor's
/// message queue without waiting on an asynchronous context.
///
/// **WARNING** Failures in the pre_start routine need to be waited on in the join handle
/// since they will NOT fail the spawn operation in this context. Additionally the supervision
/// tree will **NOT** be linked until the `pre_start` completes so there is a chance an actor
/// is lost during `pre_start` and not successfully started unless it's specifically handled
/// by the caller by awaiting later.
///
/// * `name`: A name to give the actor. Useful for global referencing or debug printing
/// * `handler` The [Actor] defining the logic for this actor
/// * `startup_args`: Arguements passed to the `pre_start` call of the [Actor] to facilitate startup and
/// initial state creation
/// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
///
/// Returns a [Ok((ActorRef, JoinHandle<Result<JoinHandle<()>, SpawnErr>>))] upon successful creation of the
/// message queues, so you can begin sending messages. However the associated [JoinHandle] contains the inner
/// information around if the actor successfully started or not in it's `pre_start` routine. Returns [Err(SpawnErr)] if
/// the actor name is already allocated
#[allow(clippy::type_complexity)]
pub fn spawn_linked_instant(
name: Option<ActorName>,
handler: TActor,
startup_args: TActor::Arguments,
supervisor: ActorCell,
) -> Result<
(
ActorRef<TActor>,
JoinHandle<Result<JoinHandle<()>, SpawnErr>>,
),
SpawnErr,
> {
let (actor, ports) = Self::new(name, handler)?;
let actor_ref = actor.actor_ref.clone();
let join_op = crate::concurrency::spawn(async move {
let (_, handle) = actor.start(ports, startup_args, Some(supervisor)).await?;
Ok(handle)
});
Ok((actor_ref, join_op))
}

/// Spawn a REMOTE actor with a supervisor, automatically starting the actor. Only for use
/// by `ractor_cluster::node::NodeSession`
///
Expand Down
57 changes: 57 additions & 0 deletions ractor/src/actor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,3 +759,60 @@ async fn spawning_local_actor_as_remote_fails() {
actor.stop(None);
handle.await.expect("Failed to clean stop the actor");
}

#[crate::concurrency::test]
async fn instant_spawns() {
let counter = Arc::new(AtomicU8::new(0));

struct EmptyActor;
#[async_trait::async_trait]
impl Actor for EmptyActor {
type Msg = String;
type State = Arc<AtomicU8>;
type Arguments = Arc<AtomicU8>;
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self>,
counter: Arc<AtomicU8>,
) -> Result<Self::State, ActorProcessingErr> {
// delay startup by some amount
crate::concurrency::sleep(Duration::from_millis(200)).await;
Ok(counter)
}

async fn handle(
&self,
_this_actor: crate::ActorRef<Self>,
_message: String,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
state.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}

let (actor, handles) = crate::ActorRuntime::spawn_instant(None, EmptyActor, counter.clone())
.expect("Failed to instant spawn");

for i in 0..10 {
actor
.cast(format!("I = {i}"))
.expect("Actor couldn't receive message!");
}

// actor is still starting up
assert_eq!(0, counter.load(Ordering::Relaxed));

crate::concurrency::sleep(Duration::from_millis(250)).await;
// actor is started now and processing messages
assert_eq!(10, counter.load(Ordering::Relaxed));

// Cleanup
actor.stop(None);
handles
.await
.unwrap()
.expect("Actor's pre_start routine panicked")
.await
.unwrap();
}
95 changes: 91 additions & 4 deletions ractor/src/actor/tests/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@
//! Supervisor tests
use std::sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicU64, AtomicU8, Ordering},
Arc,
};

use crate::{concurrency::Duration, ActorProcessingErr};

use crate::{Actor, ActorCell, ActorRef, ActorStatus, SupervisionEvent};

#[cfg(feature = "cluster")]
impl crate::Message for () {}

#[crate::concurrency::test]
async fn test_supervision_panic_in_post_startup() {
struct Child;
Expand Down Expand Up @@ -897,5 +894,95 @@ async fn test_killing_a_supervisor_terminates_children() {
c_handle.await.expect("Failed to wait for child to die");
}

#[crate::concurrency::test]
async fn instant_supervised_spawns() {
let counter = Arc::new(AtomicU8::new(0));

struct EmptySupervisor;
#[async_trait::async_trait]
impl Actor for EmptySupervisor {
type Msg = ();
type State = ();
type Arguments = ();
async fn pre_start(&self, _: ActorRef<Self>, _: ()) -> Result<(), ActorProcessingErr> {
Ok(())
}

async fn handle_supervisor_evt(
&self,
_: ActorRef<Self>,
_: SupervisionEvent,
_: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
Err(From::from(
"Supervision event received when it shouldn't have been!",
))
}
}

struct EmptyActor;
#[async_trait::async_trait]
impl Actor for EmptyActor {
type Msg = String;
type State = Arc<AtomicU8>;
type Arguments = Arc<AtomicU8>;
async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self>,
counter: Arc<AtomicU8>,
) -> Result<Self::State, ActorProcessingErr> {
// delay startup by some amount
crate::concurrency::sleep(Duration::from_millis(200)).await;
Ok(counter)
}

async fn handle(
&self,
_this_actor: crate::ActorRef<Self>,
_message: String,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
state.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}

let (supervisor, shandle) = Actor::spawn(None, EmptySupervisor, ())
.await
.expect("Failed to startup supervisor");
let (actor, handles) = crate::ActorRuntime::spawn_linked_instant(
None,
EmptyActor,
counter.clone(),
supervisor.get_cell(),
)
.expect("Failed to instant spawn");

for i in 0..10 {
actor
.cast(format!("I = {i}"))
.expect("Actor couldn't receive message!");
}

// actor is still starting up
assert_eq!(0, counter.load(Ordering::Relaxed));

crate::concurrency::sleep(Duration::from_millis(250)).await;
// actor is started now and processing messages
assert_eq!(10, counter.load(Ordering::Relaxed));

// Cleanup
supervisor.stop(None);
shandle.await.unwrap();

actor.stop(None);
handles
.await
.unwrap()
.expect("Actor's pre_start routine panicked")
.await
.unwrap();
}

// TODO: Still to be tested
// 1. terminate_children_after()
2 changes: 0 additions & 2 deletions ractor/src/factory/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ struct TestKey {
id: u64,
}
#[cfg(feature = "cluster")]
impl crate::Message for TestKey {}
#[cfg(feature = "cluster")]
impl crate::BytesConvertable for TestKey {
fn from_bytes(bytes: Vec<u8>) -> Self {
Self {
Expand Down
24 changes: 24 additions & 0 deletions ractor/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,27 @@ pub trait Message: Any + Send + Sized + 'static {
// since there's no need for an override
#[cfg(not(feature = "cluster"))]
impl<T: Any + Send + Sized + 'static> Message for T {}

// Blanket implementation for basic types which are directly bytes serializable which
// are all to be CAST operations
#[cfg(feature = "cluster")]
impl<T: Any + Send + Sized + 'static + crate::serialization::BytesConvertable> Message for T {
fn serializable() -> bool {
true
}

fn serialize(self) -> Result<SerializedMessage, BoxedDowncastErr> {
Ok(SerializedMessage::Cast {
variant: String::new(),
args: self.into_bytes(),
metadata: None,
})
}

fn deserialize(bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr> {
match bytes {
SerializedMessage::Cast { args, .. } => Ok(T::from_bytes(args)),
_ => Err(BoxedDowncastErr),
}
}
}
7 changes: 7 additions & 0 deletions ractor/src/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ implement_numeric! {u128}
implement_numeric! {f32}
implement_numeric! {f64}

impl BytesConvertable for () {
fn into_bytes(self) -> Vec<u8> {
Vec::new()
}
fn from_bytes(_: Vec<u8>) -> Self {}
}

impl BytesConvertable for bool {
fn into_bytes(self) -> Vec<u8> {
if self {
Expand Down
6 changes: 3 additions & 3 deletions ractor_cluster/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster"
version = "0.7.1"
version = "0.7.2"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "Distributed cluster environment of Ractor actors"
documentation = "https://docs.rs/ractor"
Expand All @@ -24,8 +24,8 @@ bytes = { version = "1" }
log = "0.4"
prost = { version = "0.11" }
prost-types = { version = "0.11" }
ractor = { version = "0.7.1", features = ["cluster"], path = "../ractor" }
ractor_cluster_derive = { version = "0.7.1", path = "../ractor_cluster_derive" }
ractor = { version = "0.7.2", features = ["cluster"], path = "../ractor" }
ractor_cluster_derive = { version = "0.7.2", path = "../ractor_cluster_derive" }
rand = "0.8"
sha2 = "0.10"
tokio = { version = "1", features = ["rt", "time", "sync", "macros", "net", "io-util"]}
Expand Down
2 changes: 1 addition & 1 deletion ractor_cluster_derive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster_derive"
version = "0.7.1"
version = "0.7.2"
authors = ["Sean Lawlor <seanlawlor@fb.com>"]
description = "Derives for ractor_cluster"
license = "MIT"
Expand Down

0 comments on commit 4fe2cda

Please sign in to comment.