Skip to content

Commit

Permalink
Switch from tokio.rs to async_std
Browse files Browse the repository at this point in the history
  • Loading branch information
bkolobara committed Sep 10, 2021
1 parent 6de5ca8 commit e4b5ce9
Show file tree
Hide file tree
Showing 14 changed files with 422 additions and 115 deletions.
331 changes: 326 additions & 5 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ uuid = { version = "0.8", features = ["v4"] }
anyhow = "1.0"
clap = "3.0.0-beta.2"
lazy_static = "1.4"
tokio = { version = "1.7", features = ["rt-multi-thread", "macros", "sync", "net", "time", "io-util"] }
tokio = { version = "1.7", features = ["macros", "net"] }
async-std = { version = "^1.0", features = ["attributes", "unstable"] }
wasmtime = { version = "0.29", git = "https://github.com/bytecodealliance/wasmtime.git", branch = "main" }
wasmtime-wasi = { version = "0.29", git = "https://github.com/bytecodealliance/wasmtime.git", branch = "main" }
wasmparser = "0.79"
Expand All @@ -34,6 +35,7 @@ semver = "1"
wat = "1.0"
wabt = "0.10"
pretty_assertions = "0.7"
tokio = { version = "1.7", features = ["rt-multi-thread"] }
criterion = { version = "0.3", features = ["async_tokio"] }

[build-dependencies]
Expand Down
7 changes: 6 additions & 1 deletion benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ fn criterion_benchmark(c: &mut Criterion) {

c.bench_function("spawn process", |b| {
b.to_async(&rt).iter(|| async {
module.spawn("hello", Vec::new(), None).await.unwrap();
module
.spawn("hello", Vec::new(), None)
.await
.unwrap()
.0
.await;
});
});

Expand Down
4 changes: 2 additions & 2 deletions src/api/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ fn send_receive_skip_search(
.or_trap("lunatic::message::send_receive_skip_search")?;
process.send(Signal::Message(message));
if let Some(message) = tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
_ = async_std::task::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
message = caller.data_mut().message_mailbox.pop_skip_search(tag) => Some(message)
} {
// Put the message into the scratch area
Expand Down Expand Up @@ -550,7 +550,7 @@ fn receive(
tag => Some(tag),
};
if let Some(message) = tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
_ = async_std::task::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
message = caller.data_mut().message_mailbox.pop(tag) => Some(message)
} {
let result = match message {
Expand Down
2 changes: 1 addition & 1 deletion src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ fn namespace_matches_filter(namespace: &str, name: &str, namespace_filter: &[Str
}

mod tests {
#[tokio::test]
#[async_std::test]
async fn import_filter_signature_matches() {
use crate::{EnvConfig, Environment};

Expand Down
40 changes: 12 additions & 28 deletions src/api/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ use std::convert::TryInto;
use std::future::Future;
use std::io::IoSlice;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
use async_std::io::{ReadExt, WriteExt};
use async_std::net::{TcpListener, TcpStream};
use wasmtime::{Caller, FuncType, Linker, ValType};
use wasmtime::{Memory, Trap};

Expand Down Expand Up @@ -211,7 +209,7 @@ fn resolve(
let name = std::str::from_utf8(buffer.as_slice()).or_trap("lunatic::network::resolve")?;
// Check for timeout during lookup
let return_ = if let Some(result) = tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
_ = async_std::task::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
result = tokio::net::lookup_host(name) => Some(result)
} {
let (iter_or_error_id, result) = match result {
Expand Down Expand Up @@ -463,11 +461,7 @@ fn tcp_accept(

let (tcp_stream_or_error_id, peer_addr_iter, result) = match tcp_listener.accept().await {
Ok((stream, socket_addr)) => {
let stream_id = caller
.data_mut()
.resources
.tcp_streams
.add(Arc::new(Mutex::new(stream)));
let stream_id = caller.data_mut().resources.tcp_streams.add(stream);
let dns_iter_id = caller
.data_mut()
.resources
Expand Down Expand Up @@ -539,18 +533,11 @@ fn tcp_connect(
)?;

if let Some(result) = tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
_ = async_std::task::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
result = TcpStream::connect(socket_addr) => Some(result)
} {
let (stream_or_error_id, result) = match result {
Ok(stream) => (
caller
.data_mut()
.resources
.tcp_streams
.add(Arc::new(Mutex::new(stream))),
0,
),
Ok(stream) => (caller.data_mut().resources.tcp_streams.add(stream), 0),
Err(error) => (caller.data_mut().errors.add(error.into()), 1),
};

Expand Down Expand Up @@ -660,17 +647,17 @@ fn tcp_write_vectored(
.collect();
let vec_slices = vec_slices?;

let stream_mutex = caller
let mut stream = caller
.data()
.resources
.tcp_streams
.get(stream_id)
.or_trap("lunatic::network::tcp_write_vectored")?
.clone();
let mut stream = stream_mutex.lock().await;

// Check for timeout
if let Some(result) = tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
_ = async_std::task::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
result = stream.write_vectored(vec_slices.as_slice()) => Some(result)
} {
let (opaque, return_) = match result {
Expand Down Expand Up @@ -722,16 +709,14 @@ fn tcp_read(
opaque_ptr: u32,
) -> Box<dyn Future<Output = Result<u32, Trap>> + Send + '_> {
Box::new(async move {
let stream_mutex = caller
let mut stream = caller
.data()
.resources
.tcp_streams
.get(stream_id)
.or_trap("lunatic::network::tcp_read")?
.clone();

let mut stream = stream_mutex.lock().await;

let memory = get_memory(&mut caller)?;
let buffer = memory
.data_mut(&mut caller)
Expand All @@ -740,7 +725,7 @@ fn tcp_read(

// Check for timeout first
if let Some(result) = tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
_ = async_std::task::sleep(Duration::from_millis(timeout as u64)), if timeout != 0 => None,
result = stream.read(buffer) => Some(result)
} {
let (opaque, return_) = match result {
Expand Down Expand Up @@ -783,14 +768,13 @@ fn tcp_flush(
error_id_ptr: u32,
) -> Box<dyn Future<Output = Result<u32, Trap>> + Send + '_> {
Box::new(async move {
let stream_mutex = caller
let mut stream = caller
.data()
.resources
.tcp_streams
.get(stream_id)
.or_trap("lunatic::network::tcp_flush")?
.clone();
let mut stream = stream_mutex.lock().await;

let (error_id, result) = match stream.flush().await {
Ok(()) => (0, 0),
Expand Down
8 changes: 4 additions & 4 deletions src/api/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ fn clone_process(mut caller: Caller<ProcessState>, process_id: u64) -> Result<u6
//% Suspend process for `millis`.
fn sleep_ms(_: Caller<ProcessState>, millis: u64) -> Box<dyn Future<Output = ()> + Send + '_> {
Box::new(async move {
tokio::time::sleep(Duration::from_millis(millis)).await;
async_std::task::sleep(Duration::from_millis(millis)).await;
})
}

Expand All @@ -771,7 +771,7 @@ fn die_when_link_dies(mut caller: Caller<ProcessState>, trap: u32) {
caller
.data_mut()
.signal_mailbox
.send(Signal::DieWhenLinkDies(trap != 0))
.try_send(Signal::DieWhenLinkDies(trap != 0))
.expect("The signal is sent to itself and the receiver must exist at this point");
}

Expand Down Expand Up @@ -847,7 +847,7 @@ fn link(mut caller: Caller<ProcessState>, tag: i64, process_id: u64) -> Result<(
caller
.data_mut()
.signal_mailbox
.send(Signal::Link(tag, process))
.try_send(Signal::Link(tag, process))
.expect("The signal is sent to itself and the receiver must exist at this point");
Ok(())
}
Expand Down Expand Up @@ -878,7 +878,7 @@ fn unlink(mut caller: Caller<ProcessState>, process_id: u64) -> Result<(), Trap>
caller
.data_mut()
.signal_mailbox
.send(Signal::UnLink(process))
.try_send(Signal::UnLink(process))
.expect("The signal is sent to itself and the receiver must exist at this point");
Ok(())
}
Expand Down
5 changes: 2 additions & 3 deletions src/environment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::Result;
use lazy_static::lazy_static;
use tokio::task;
use wasmtime::{Config, Engine, InstanceAllocationStrategy, Linker, OptLevel, ProfilingStrategy};

use super::config::EnvConfig;
Expand Down Expand Up @@ -77,13 +76,13 @@ impl Environment {
let env = self.clone();
let new_module = patch_module(&data, self.config.plugins())?;
// The compilation of a module is a CPU intensive tasks and can take some time.
let module = task::spawn_blocking(move || {
let module = async_std::task::spawn_blocking(move || {
match wasmtime::Module::new(env.engine(), new_module.as_slice()) {
Ok(wasmtime_module) => Ok(Module::new(data, env, wasmtime_module)),
Err(err) => Err(err),
}
})
.await??;
.await?;
Ok(module)
}

Expand Down
6 changes: 3 additions & 3 deletions src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ mod tests {

use super::{Message, MessageMailbox};

#[tokio::test]
#[async_std::test]
async fn no_tag_signal_message() {
let mailbox = MessageMailbox::default();
let message = Message::Signal(None);
Expand All @@ -163,7 +163,7 @@ mod tests {
}
}

#[tokio::test]
#[async_std::test]
async fn tag_signal_message() {
let mailbox = MessageMailbox::default();
let tag = Some(1337);
Expand All @@ -173,7 +173,7 @@ mod tests {
assert_eq!(message.tag(), tag);
}

#[tokio::test]
#[async_std::test]
async fn selective_receive_tag_signal_message() {
let mailbox = MessageMailbox::default();
let tag1 = Some(1);
Expand Down
39 changes: 17 additions & 22 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use clap::{crate_version, App, Arg, ArgSettings};
use anyhow::{Context, Result};
use lunatic_runtime::{EnvConfig, Environment};

fn main() -> Result<()> {
#[async_std::main]
async fn main() -> Result<()> {
// Init logger
env_logger::init();
// Parse command line arguments
Expand Down Expand Up @@ -60,25 +61,19 @@ fn main() -> Result<()> {
}
let env = Environment::new(config)?;

let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;

rt.block_on(async {
// Spawn main process
let path = args.value_of("wasm").unwrap();
let path = Path::new(path);
let module = fs::read(path)?;
let module = env.create_module(module).await?;
let (task, _) = module
.spawn("_start", Vec::new(), None)
.await
.context(format!(
"Failed to spawn process from {}::_start()",
path.to_string_lossy()
))?;
// Wait on the main process to finish
task.await?;
Ok(())
})
// Spawn main process
let path = args.value_of("wasm").unwrap();
let path = Path::new(path);
let module = fs::read(path)?;
let module = env.create_module(module).await?;
let (task, _) = module
.spawn("_start", Vec::new(), None)
.await
.context(format!(
"Failed to spawn process from {}::_start()",
path.to_string_lossy()
))?;
// Wait on the main process to finish
task.await;
Ok(())
}
10 changes: 5 additions & 5 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
sync::Arc,
};

use tokio::{net::TcpStream, sync::Mutex};
use async_std::net::TcpStream;

use crate::Process;

Expand Down Expand Up @@ -65,7 +65,7 @@ impl DataMessage {
}

/// Adds a TCP stream to the message and returns the index of it inside of the message
pub fn add_tcp_stream(&mut self, tcp_stream: Arc<Mutex<TcpStream>>) -> usize {
pub fn add_tcp_stream(&mut self, tcp_stream: TcpStream) -> usize {
self.resources.push(Resource::TcpStream(tcp_stream));
self.resources.len() - 1
}
Expand Down Expand Up @@ -94,7 +94,7 @@ impl DataMessage {
///
/// If the index is out of bound or the resource is not a tcp stream the function will return
/// None.
pub fn take_tcp_stream(&mut self, index: usize) -> Option<Arc<Mutex<TcpStream>>> {
pub fn take_tcp_stream(&mut self, index: usize) -> Option<TcpStream> {
if let Some(resource_ref) = self.resources.get_mut(index) {
let resource = std::mem::replace(resource_ref, Resource::None);
match resource {
Expand Down Expand Up @@ -147,12 +147,12 @@ impl Read for DataMessage {
}
}

/// A resource ([`WasmProcess`](crate::WasmProcess), [`TcpStream`](tokio::net::TcpStream),
/// A resource ([`WasmProcess`](crate::WasmProcess), [`TcpStream`](async_std::net::TcpStream),
/// ...) that is attached to a [`DataMessage`].
pub enum Resource {
None,
Process(Arc<dyn Process>),
TcpStream(Arc<Mutex<TcpStream>>),
TcpStream(TcpStream),
}

impl Debug for Resource {
Expand Down
Loading

0 comments on commit e4b5ce9

Please sign in to comment.