Skip to content

Commit

Permalink
Merge pull request #77 from TheButlah/runtime-agnostic-codecs
Browse files Browse the repository at this point in the history
Made code (mostly) runtime and transport independent
  • Loading branch information
Alexei-Kornienko authored Oct 13, 2020
2 parents 9acd3f0 + 437aa36 commit dce2c13
Show file tree
Hide file tree
Showing 14 changed files with 213 additions and 108 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ criterion = "0.3"
bench = false

[[bench]]
name = "pub_sub"
harness = false
name = "pub_sub"
harness = false
bench = false # Don't actually benchmark this, until we fix it

[[bench]]
Expand Down
34 changes: 34 additions & 0 deletions src/codec/framed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! General types and traits to facilitate compatibility across async runtimes
use crate::codec::ZmqCodec;

// We use dynamic dispatch to avoid complicated generics and simplify things
type Inner = futures_codec::Framed<Box<dyn Frameable>, ZmqCodec>;

// Enables us to have multiple bounds on the dyn trait in `InnerFramed`
pub trait Frameable: futures::AsyncWrite + futures::AsyncRead + Unpin + Send {}
impl<T> Frameable for T where T: futures::AsyncWrite + futures::AsyncRead + Unpin + Send {}

/// Equivalent to [`futures_codec::Framed<T, ZmqCodec>`] or
/// [`tokio_util::codec::Framed`]
pub(crate) struct FramedIo(Inner);
impl FramedIo {
pub fn new(frameable: Box<dyn Frameable>) -> Self {
let inner = futures_codec::Framed::new(frameable, ZmqCodec::new());
Self(inner)
}
}

impl std::ops::Deref for FramedIo {
type Target = Inner;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl std::ops::DerefMut for FramedIo {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
2 changes: 2 additions & 0 deletions src/codec/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
mod command;
mod error;
mod framed;
mod greeting;
mod mechanism;
mod zmq_codec;

pub(crate) use command::{ZmqCommand, ZmqCommandName};
pub(crate) use error::{CodecError, CodecResult};
pub(crate) use framed::FramedIo;
pub(crate) use greeting::ZmqGreeting;
pub(crate) use zmq_codec::ZmqCodec;

Expand Down
22 changes: 13 additions & 9 deletions src/dealer_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ use async_trait::async_trait;
use dashmap::DashMap;
use futures::channel::{mpsc, oneshot};
use futures::lock::Mutex;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::SinkExt;
use futures_codec::Framed;
use std::collections::HashMap;
use std::convert::TryInto;
use std::sync::Arc;
use tokio::net::TcpStream;

use crate::codec::FramedIo;
use crate::codec::*;
use crate::endpoint::{Endpoint, TryIntoEndpoint};
use crate::error::*;
use crate::message::*;
use crate::util::*;
use crate::{util, MultiPeer, Socket, SocketBackend};
use crate::transport;
use crate::util::{self, Peer, PeerIdentity};
use crate::{MultiPeer, Socket, SocketBackend};
use crate::{SocketType, ZmqResult};
use futures::stream::{FuturesUnordered, StreamExt};
use std::collections::HashMap;

struct RouterSocketBackend {
pub(crate) peers: Arc<DashMap<PeerIdentity, Peer>>,
Expand Down Expand Up @@ -97,8 +97,12 @@ impl Socket for RouterSocket {

async fn bind(&mut self, endpoint: impl TryIntoEndpoint + 'async_trait) -> ZmqResult<Endpoint> {
let endpoint = endpoint.try_into()?;
let (endpoint, stop_handle) =
util::start_accepting_connections(endpoint, self.backend.clone()).await?;
let Endpoint::Tcp(host, port) = endpoint;

let cloned_backend = self.backend.clone();
let cback = move |result| util::peer_connected(result, cloned_backend.clone());
let (endpoint, stop_handle) = transport::tcp::begin_accept(host, port, cback).await?;

self.binds.insert(endpoint.clone(), stop_handle);
Ok(endpoint)
}
Expand Down Expand Up @@ -159,7 +163,7 @@ impl RouterSocket {
}

pub struct DealerSocket {
pub(crate) _inner: Framed<TcpStream, ZmqCodec>,
pub(crate) _inner: FramedIo,
}

impl DealerSocket {
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use crate::ZmqMessage;

use thiserror::Error;

pub type ZmqResult<T> = Result<T, ZmqError>;

#[derive(Error, Debug)]
pub enum ZmqError {
#[error(transparent)]
Expand Down
29 changes: 14 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
#![recursion_limit = "1024"]
#[macro_use]
extern crate enum_primitive_derive;
use num_traits::ToPrimitive;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use std::convert::TryFrom;
use std::fmt::{Debug, Display};

use futures_codec::Framed;

mod codec;
mod dealer_router;
Expand All @@ -20,21 +10,30 @@ mod r#pub;
mod rep;
mod req;
mod sub;
mod transport;
pub mod util;

use crate::codec::*;
pub use crate::dealer_router::*;
pub use crate::endpoint::{Endpoint, Host, Transport, TryIntoEndpoint};
pub use crate::error::ZmqError;
pub use crate::error::{ZmqError, ZmqResult};
pub use crate::r#pub::*;
pub use crate::rep::*;
pub use crate::req::*;
pub use crate::sub::*;
use crate::util::*;
pub use message::*;
use std::collections::HashMap;

pub type ZmqResult<T> = Result<T, ZmqError>;
use crate::codec::*;
use crate::util::*;

#[macro_use]
extern crate enum_primitive_derive;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use num_traits::ToPrimitive;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt::{Debug, Display};

#[derive(Clone, Copy, Debug, PartialEq, Primitive)]
pub enum SocketType {
Expand Down
14 changes: 10 additions & 4 deletions src/pub.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::codec::*;
use crate::endpoint::{Endpoint, TryIntoEndpoint};
use crate::message::*;
use crate::transport;
use crate::util::*;
use crate::{util, MultiPeer, NonBlockingSend, Socket, SocketBackend, SocketType, ZmqResult};

use async_trait::async_trait;
use dashmap::DashMap;
use futures::channel::{mpsc, oneshot};
Expand Down Expand Up @@ -145,8 +147,12 @@ impl Socket for PubSocket {

async fn bind(&mut self, endpoint: impl TryIntoEndpoint + 'async_trait) -> ZmqResult<Endpoint> {
let endpoint = endpoint.try_into()?;
let (endpoint, stop_handle) =
util::start_accepting_connections(endpoint, self.backend.clone()).await?;
let Endpoint::Tcp(host, port) = endpoint;

let cloned_backend = self.backend.clone();
let cback = move |result| util::peer_connected(result, cloned_backend.clone());
let (endpoint, stop_handle) = transport::tcp::begin_accept(host, port, cback).await?;

self.binds.insert(endpoint.clone(), stop_handle);
Ok(endpoint)
}
Expand All @@ -155,8 +161,8 @@ impl Socket for PubSocket {
let endpoint = endpoint.try_into()?;
let Endpoint::Tcp(host, port) = endpoint;

let raw_socket = tokio::net::TcpStream::connect((host.to_string().as_str(), port)).await?;
util::peer_connected(raw_socket, self.backend.clone()).await;
let connect_result = transport::tcp::connect(host, port).await;
util::peer_connected(connect_result, self.backend.clone()).await;
Ok(())
}

Expand Down
18 changes: 12 additions & 6 deletions src/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ use crate::codec::*;
use crate::endpoint::{Endpoint, TryIntoEndpoint};
use crate::error::*;
use crate::fair_queue::FairQueue;
use crate::transport;
use crate::util::FairQueueProcessor;
use crate::*;
use crate::{util, SocketType, ZmqResult};

use async_trait::async_trait;
use dashmap::DashMap;
use futures_util::sink::SinkExt;
use futures::SinkExt;
use futures::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::stream::StreamExt;

struct RepPeer {
pub(crate) _identity: PeerIdentity,
Expand Down Expand Up @@ -66,8 +68,12 @@ impl Socket for RepSocket {

async fn bind(&mut self, endpoint: impl TryIntoEndpoint + 'async_trait) -> ZmqResult<Endpoint> {
let endpoint = endpoint.try_into()?;
let (endpoint, stop_handle) =
util::start_accepting_connections(endpoint, self.backend.clone()).await?;
let Endpoint::Tcp(host, port) = endpoint;

let cloned_backend = self.backend.clone();
let cback = move |result| util::peer_connected(result, cloned_backend.clone());
let (endpoint, stop_handle) = transport::tcp::begin_accept(host, port, cback).await?;

self.binds.insert(endpoint.clone(), stop_handle);
Ok(endpoint)
}
Expand All @@ -76,8 +82,8 @@ impl Socket for RepSocket {
let endpoint = endpoint.try_into()?;
let Endpoint::Tcp(host, port) = endpoint;

let raw_socket = tokio::net::TcpStream::connect((host.to_string().as_str(), port)).await?;
util::peer_connected(raw_socket, self.backend.clone()).await;
let connect_result = transport::tcp::connect(host, port).await;
util::peer_connected(connect_result, self.backend.clone()).await;
Ok(())
}

Expand Down
17 changes: 11 additions & 6 deletions src/req.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use crate::codec::*;
use crate::endpoint::{Endpoint, TryIntoEndpoint};
use crate::error::*;
use crate::transport;
use crate::util::{self, Peer, PeerIdentity};
use crate::*;
use crate::{SocketType, ZmqResult};

use async_trait::async_trait;
use crossbeam::queue::SegQueue;
use dashmap::DashMap;
use futures::channel::{mpsc, oneshot};
use futures::lock::Mutex;
use futures_util::sink::SinkExt;
use futures::{SinkExt, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::stream::StreamExt;

struct ReqSocketBackend {
pub(crate) peers: DashMap<PeerIdentity, Peer>,
Expand Down Expand Up @@ -124,8 +125,12 @@ impl Socket for ReqSocket {

async fn bind(&mut self, endpoint: impl TryIntoEndpoint + 'async_trait) -> ZmqResult<Endpoint> {
let endpoint = endpoint.try_into()?;
let (endpoint, stop_handle) =
util::start_accepting_connections(endpoint, self.backend.clone()).await?;
let Endpoint::Tcp(host, port) = endpoint;

let cloned_backend = self.backend.clone();
let cback = move |result| util::peer_connected(result, cloned_backend.clone());
let (endpoint, stop_handle) = transport::tcp::begin_accept(host, port, cback).await?;

self.binds.insert(endpoint.clone(), stop_handle);
Ok(endpoint)
}
Expand All @@ -134,8 +139,8 @@ impl Socket for ReqSocket {
let endpoint = endpoint.try_into()?;
let Endpoint::Tcp(host, port) = endpoint;

let raw_socket = tokio::net::TcpStream::connect((host.to_string().as_str(), port)).await?;
util::peer_connected(raw_socket, self.backend.clone()).await;
let connect_result = transport::tcp::connect(host, port).await;
util::peer_connected(connect_result, self.backend.clone()).await;
Ok(())
}

Expand Down
18 changes: 11 additions & 7 deletions src/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use crate::codec::*;
use crate::endpoint::{Endpoint, TryIntoEndpoint};
use crate::fair_queue::FairQueue;
use crate::message::*;
use crate::transport;
use crate::util::*;
use crate::{util, BlockingRecv, MultiPeer, Socket, SocketBackend, SocketType, ZmqResult};

use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use dashmap::DashMap;
use futures::channel::{mpsc, oneshot};
use futures::SinkExt;
use futures::StreamExt;

use futures::{SinkExt, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -147,8 +147,12 @@ impl Socket for SubSocket {

async fn bind(&mut self, endpoint: impl TryIntoEndpoint + 'async_trait) -> ZmqResult<Endpoint> {
let endpoint = endpoint.try_into()?;
let (endpoint, stop_handle) =
util::start_accepting_connections(endpoint, self.backend.clone()).await?;
let Endpoint::Tcp(host, port) = endpoint;

let cloned_backend = self.backend.clone();
let cback = move |result| util::peer_connected(result, cloned_backend.clone());
let (endpoint, stop_handle) = transport::tcp::begin_accept(host, port, cback).await?;

self.binds.insert(endpoint.clone(), stop_handle);
Ok(endpoint)
}
Expand All @@ -157,8 +161,8 @@ impl Socket for SubSocket {
let endpoint = endpoint.try_into()?;
let Endpoint::Tcp(host, port) = endpoint;

let raw_socket = tokio::net::TcpStream::connect((host.to_string().as_str(), port)).await?;
util::peer_connected(raw_socket, self.backend.clone()).await;
let connect_result = transport::tcp::connect(host, port).await;
util::peer_connected(connect_result, self.backend.clone()).await;
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod tcp;
30 changes: 30 additions & 0 deletions src/transport/tcp/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// TODO: Conditionally compile things
mod tokio;

use self::tokio as tk;
use crate::codec::FramedIo;
use crate::endpoint::{Endpoint, Host, Port};
use crate::ZmqResult;

use std::net::SocketAddr;

pub(crate) async fn connect(host: Host, port: Port) -> ZmqResult<(FramedIo, SocketAddr)> {
tk::connect(host, port).await
}

/// Spawns an async task that listens for tcp connections at the provided
/// address.
///
/// `cback` will be invoked when a tcp connection is accepted. If the result was
/// `Ok`, we get a tuple containing the framed raw socket, along with the ip
/// address of the remote connection accepted.
pub(crate) async fn begin_accept<T>(
host: Host,
port: Port,
cback: impl Fn(ZmqResult<(FramedIo, SocketAddr)>) -> T + Send + 'static,
) -> ZmqResult<(Endpoint, futures::channel::oneshot::Sender<bool>)>
where
T: std::future::Future<Output = ()> + Send + 'static,
{
tk::begin_accept(host, port, cback).await
}
Loading

0 comments on commit dce2c13

Please sign in to comment.