Skip to content

Commit

Permalink
Support the async-std runtime as an alternative to tokio. (#173)
Browse files Browse the repository at this point in the history
  • Loading branch information
slawlor authored Oct 12, 2023
1 parent 1ae8d86 commit c593b4c
Show file tree
Hide file tree
Showing 23 changed files with 698 additions and 314 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ jobs:
- name: Run the default tests
package: ractor
# flags:
- name: Test ractor in async_std
package: ractor
flags: --features async-std
- name: Test ractor with the `cluster` feature
package: ractor
flags: -F cluster
Expand Down
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"rust-analyzer.showUnlinkedFileNotification": false
}
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ members = [
"ractor_cluster_integration_tests",
"xtask"
]
resolver = "2"
12 changes: 5 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ The minimum supported Rust version (MSRV) of `ractor` is `1.64`

## Features

`ractor` exposes a single feature currently, namely:
`ractor` exposes the following features:

1. `cluster`, which exposes various functionality required for `ractor_cluster` to set up and manage a cluster of actors over a network link. This is work-in-progress and is being tracked in [#16](https://github.com/slawlor/ractor/issues/16).
2. `async-std`, which enables usage of `async-std`'s asynchronous runtime instead of the `tokio` runtime. **However** `tokio` remains a dependency because we utilize the messaging synchronization primatives from `tokio` regardless of runtime as they are not specific to the `tokio` runtime. This work is tracked in [#173](https://github.com/slawlor/ractor/pull/173).

## Working with Actors

Expand Down Expand Up @@ -183,11 +184,8 @@ will be supported by `ractor`. There are 4 concurrent message types, which are l

1. Signals: Signals are the highest-priority of all and will interrupt the actor wherever processing currently is (this includes terminating async work). There
is only 1 signal today, which is `Signal::Kill`, and it immediately terminates all work. This includes message processing or supervision event processing.
2. Stop: There is also a pre-defined stop signal. You can give a "stop reason" if you want, but it's optional. Stop is a graceful exit, meaning currently executing async
work will complete, and on the next message processing iteration Stop will take priority over future supervision events or regular messages. It will **not** terminate
currently executing work, regardless of the provided reason.
3. SupervisionEvent: Supervision events are messages from child actors to their supervisors in the event of their startup, death, and/or unhandled panic. Supervision events
are how an actor's supervisor(s) are notified of events of their children and can handle lifetime events for them.
2. Stop: There is also the pre-defined stop signal. You can give a "stop reason" if you want, but it's optional. Stop is a graceful exit, meaning currently executing async work will complete, and on the next message processing iteration Stop will take priority over future supervision events or regular messages. It will **not** terminate currently executing work, regardless of the provided reason.
3. SupervisionEvent: Supervision events are messages from child actors to their supervisors in the event of their startup, death, and/or unhandled panic. Supervision events are how an actor's supervisor(parent) or peer monitors are notified of events of their children/peers and can handle lifetime events for them.
4. Messages: Regular, user-defined, messages are the last channel of communication to actors. They are the lowest priority of the 4 message types and denote general actor work. The first
3 messages types (signals, stop, supervision) are generally quiet unless it's a lifecycle event for the actor, but this channel is the "work" channel doing what your actor wants to do!

Expand Down Expand Up @@ -255,7 +253,7 @@ enum MyBasicMessageType {
}
```

which adds a significant amount of underlying boilerplate (take a look yourself with `cargo expand`!) for the implementation. But the short answer is, each enum variant needs to serialize to a byte array of arguments, a variant name, and if it's an RPC give a port that receives a byte array and de-serialize the reply back. Each of the types inside of either the arguments or reply type need to implement the ```ractor_cluster::BytesConvertable``` trait which just says this value can be written to a byte array and decoded from a byte array. If you're using `prost` for your message type definitions (protobuf), we have a macro to auto-implement this for your types.
which adds a significant amount of underlying boilerplate (take a look yourself with `cargo expand`) for the implementation. But the short answer is, each enum variant needs to serialize to a byte array of arguments, a variant name, and if it's an RPC give a port that receives a byte array and de-serialize the reply back. Each of the types inside of either the arguments or reply type need to implement the ```ractor_cluster::BytesConvertable``` trait which just says this value can be written to a byte array and decoded from a byte array. If you're using `prost` for your message type definitions (protobuf), we have a macro to auto-implement this for your types.

```rust
ractor_cluster::derive_serialization_for_prost_type! {MyProtobufType}
Expand Down
14 changes: 6 additions & 8 deletions ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.9.2"
version = "0.9.3"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand All @@ -14,14 +14,10 @@ categories = ["actor", "erlang"]
rust-version = "1.64"

[features]
# WIP
# tokio_runtime = ["tokio/time"]
# async_std_runtime = ["async-std"]

# default = ["tokio_runtime"]
# default = ["async_std_runtime"]

### Other features
cluster = []

# default = ["async-std"]
default = []

[dependencies]
Expand All @@ -31,8 +27,10 @@ dashmap = "5"
futures = "0.3"
once_cell = "1"
rand = "0.8"

# Tracing feature requires --cfg=tokio_unstable
tokio = { version = "1", features = ["sync", "time", "rt", "macros", "tracing"] }
async-std = { version = "1", features = ["attributes"], optional = true}
tracing = { version = "0.1", features = ["attributes"] }

[dev-dependencies]
Expand Down
225 changes: 173 additions & 52 deletions ractor/benches/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,42 +50,80 @@ fn create_actors(c: &mut Criterion) {
let large = 10000;

let id = format!("Creation of {small} actors");
#[cfg(not(feature = "async-std"))]
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
#[cfg(feature = "async-std")]
let _ = async_std::task::block_on(async {});
c.bench_function(&id, move |b| {
b.iter_batched(
|| {},
|()| {
runtime.block_on(async move {
let mut handles = vec![];
for _ in 0..small {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
handles.push(handler);
}
handles
})
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move {
let mut handles = vec![];
for _ in 0..small {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
handles.push(handler);
}
handles
})
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
let mut handles = vec![];
for _ in 0..small {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
handles.push(handler);
}
handles
})
}
},
BatchSize::PerIteration,
);
});

let id = format!("Creation of {large} actors");
#[cfg(not(feature = "async-std"))]
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
#[cfg(feature = "async-std")]
let _ = async_std::task::block_on(async {});
c.bench_function(&id, move |b| {
b.iter_batched(
|| {},
|()| {
runtime.block_on(async move {
let mut handles = vec![];
for _ in 0..large {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
handles.push(handler);
}
handles
})
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move {
let mut handles = vec![];
for _ in 0..large {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
handles.push(handler);
}
handles
})
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
let mut handles = vec![];
for _ in 0..large {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
handles.push(handler);
}
handles
})
}
},
BatchSize::PerIteration,
);
Expand All @@ -97,47 +135,104 @@ fn schedule_work(c: &mut Criterion) {
let large = 1000;

let id = format!("Waiting on {small} actors to process first message");
#[cfg(not(feature = "async-std"))]
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
#[cfg(feature = "async-std")]
let _ = async_std::task::block_on(async {});
c.bench_function(&id, move |b| {
b.iter_batched(
|| {
runtime.block_on(async move {
let mut join_set = tokio::task::JoinSet::new();

for _ in 0..small {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
join_set.spawn(handler);
}
join_set
})
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move {
let mut join_set = ractor::concurrency::JoinSet::new();

for _ in 0..small {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
join_set.spawn(handler);
}
join_set
})
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
let mut join_set = ractor::concurrency::JoinSet::new();

for _ in 0..small {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
join_set.spawn(handler);
}
join_set
})
}
},
|mut handles| {
runtime.block_on(async move { while handles.join_next().await.is_some() {} })
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move { while handles.join_next().await.is_some() {} })
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
while handles.join_next().await.is_some() {}
})
}
},
BatchSize::PerIteration,
);
});

let id = format!("Waiting on {large} actors to process first message");
#[cfg(not(feature = "async-std"))]
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
#[cfg(feature = "async-std")]
let _ = async_std::task::block_on(async {});
c.bench_function(&id, move |b| {
b.iter_batched(
|| {
runtime.block_on(async move {
let mut join_set = tokio::task::JoinSet::new();
for _ in 0..large {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
join_set.spawn(handler);
}
join_set
})
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move {
let mut join_set = ractor::concurrency::JoinSet::new();
for _ in 0..large {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
join_set.spawn(handler);
}
join_set
})
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
let mut join_set = ractor::concurrency::JoinSet::new();
for _ in 0..large {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
join_set.spawn(handler);
}
join_set
})
}
},
|mut handles| {
runtime.block_on(async move { while handles.join_next().await.is_some() {} })
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move { while handles.join_next().await.is_some() {} })
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
while handles.join_next().await.is_some() {}
})
}
},
BatchSize::PerIteration,
);
Expand Down Expand Up @@ -186,21 +281,47 @@ fn process_messages(c: &mut Criterion) {
}

let id = format!("Waiting on {NUM_MSGS} messages to be processed");
#[cfg(not(feature = "async-std"))]
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
#[cfg(feature = "async-std")]
let _ = async_std::task::block_on(async {});
c.bench_function(&id, move |b| {
b.iter_batched(
|| {
runtime.block_on(async move {
let (_, handle) = Actor::spawn(None, MessagingActor { num_msgs: NUM_MSGS }, ())
.await
.expect("Failed to create test actor");
handle
})
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move {
let (_, handle) =
Actor::spawn(None, MessagingActor { num_msgs: NUM_MSGS }, ())
.await
.expect("Failed to create test actor");
handle
})
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
let (_, handle) =
Actor::spawn(None, MessagingActor { num_msgs: NUM_MSGS }, ())
.await
.expect("Failed to create test actor");
handle
})
}
},
|handle| {
runtime.block_on(async move {
let _ = handle.await;
})
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move {
let _ = handle.await;
})
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
let _ = handle.await;
})
}
},
BatchSize::PerIteration,
);
Expand Down
2 changes: 1 addition & 1 deletion ractor/examples/philosophers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ async fn main() {
let time_slice = Duration::from_millis(10);
let run_time = Duration::from_secs(5);

let philosopher_names = vec![
let philosopher_names = [
"Confucius",
"Descartes",
"Benjamin Franklin",
Expand Down
Loading

0 comments on commit c593b4c

Please sign in to comment.