Skip to content

Commit

Permalink
code refactor on rpc server
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Sep 25, 2023
1 parent a52dd36 commit 4d970f1
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 83 deletions.
120 changes: 62 additions & 58 deletions rpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use axum::routing::post;
use axum::{Extension, Router};
use ckb_app_config::RpcConfig;
use ckb_logger::info;
use ckb_notify::NotifyController;

use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken};
use futures_util::{SinkExt, TryStreamExt};
use jsonrpc_core::MetaIoHandler;
Expand All @@ -14,7 +14,7 @@ use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::runtime::Handle;

use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec, LinesCodecError};
use tower_http::timeout::TimeoutLayer;

Expand All @@ -33,13 +33,7 @@ impl RpcServer {
///
/// * `config` - RPC config options.
/// * `io_handler` - RPC methods handler. See [ServiceBuilder](../service_builder/struct.ServiceBuilder.html).
/// * `notify_controller` - Controller emitting notifications.
pub async fn new(
config: RpcConfig,
io_handler: IoHandler,
_handle: Handle,
_notify_controller: &NotifyController,
) -> Self {
pub async fn new(config: RpcConfig, io_handler: IoHandler) -> Self {
let rpc = Arc::new(io_handler);

let http_address = Self::start_server(&rpc, config.listen_address.to_owned()).await;
Expand All @@ -53,55 +47,13 @@ impl RpcServer {
None
};

// TCP server with line delimited json codec.
let mut tcp_address = None;
if let Some(tcp_listen_address) = config.tcp_listen_address {
let listener = TcpListener::bind(tcp_listen_address).await.unwrap();
tcp_address = listener.local_addr().ok();
info!("listen TCP RPCServer on address {:?}", tcp_address.unwrap());
tokio::spawn(async move {
let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
let stream_config = StreamServerConfig::default()
.with_channel_size(4)
.with_keep_alive(true)
.with_keep_alive_duration(Duration::from_secs(60))
.with_pipeline_size(4);

let exit_signal: CancellationToken = new_tokio_exit_rx();
tokio::select! {
_ = async {
while let Ok((stream, _)) = listener.accept().await {
let rpc = Arc::clone(&rpc);
let stream_config = stream_config.clone();
let codec = codec.clone();
tokio::spawn(async move {
let (r, w) = stream.into_split();
let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
let w = FramedWrite::new(w, codec).with(|msg| async move {
Ok::<_, LinesCodecError>(match msg {
StreamMsg::Str(msg) => msg,
_ => "".into(),
})
});
tokio::pin!(w);
let exit_signal: CancellationToken = new_tokio_exit_rx();
tokio::select! {
result = serve_stream_sink(&rpc, w, r, stream_config) => {
if let Err(err) = result {
info!("TCP RPCServer error: {:?}", err);
}
}
_ = exit_signal.cancelled() => {}
}
});
}
} => {},
_ = exit_signal.cancelled() => {
info!("TCP RPCServer stopped");
}
}
});
}
let tcp_address = if let Some(addr) = config.tcp_listen_address {
let local_addr = Self::start_tcp_server(rpc, addr).await;
info!("Listen TCP RPCServer on address {}", local_addr);
Some(local_addr)
} else {
None
};

Self {
http_address,
Expand Down Expand Up @@ -152,4 +104,56 @@ impl RpcServer {
});
rx_addr.await.unwrap()
}

async fn start_tcp_server(
rpc: Arc<MetaIoHandler<Option<Session>>>,
tcp_listen_address: String,
) -> SocketAddr {
// TCP server with line delimited json codec.
let listener = TcpListener::bind(tcp_listen_address).await.unwrap();
let tcp_address = listener.local_addr().unwrap();
tokio::spawn(async move {
let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
let stream_config = StreamServerConfig::default()
.with_channel_size(4)
.with_keep_alive(true)
.with_keep_alive_duration(Duration::from_secs(60))
.with_pipeline_size(4);

let exit_signal: CancellationToken = new_tokio_exit_rx();
tokio::select! {
_ = async {
while let Ok((stream, _)) = listener.accept().await {
let rpc = Arc::clone(&rpc);
let stream_config = stream_config.clone();
let codec = codec.clone();
tokio::spawn(async move {
let (r, w) = stream.into_split();
let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
let w = FramedWrite::new(w, codec).with(|msg| async move {
Ok::<_, LinesCodecError>(match msg {
StreamMsg::Str(msg) => msg,
_ => "".into(),
})
});
tokio::pin!(w);
let exit_signal: CancellationToken = new_tokio_exit_rx();
tokio::select! {
result = serve_stream_sink(&rpc, w, r, stream_config) => {
if let Err(err) = result {
info!("TCP RPCServer error: {:?}", err);
}
}
_ = exit_signal.cancelled() => {}
}
});
}
} => {},
_ = exit_signal.cancelled() => {
info!("TCP RPCServer stopped");
}
}
});
tcp_address
}
}
10 changes: 1 addition & 9 deletions rpc/src/tests/examples.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,15 +264,7 @@ fn setup_rpc_test_suite(height: u64) -> RpcTestSuite {
let io_handler = builder.build();
let shared_clone = shared.clone();
let handler = shared_clone.async_handle().clone();
let rpc_server = handler.block_on(async move {
RpcServer::new(
rpc_config,
io_handler,
shared_clone.async_handle().clone().into_inner(),
shared_clone.notify_controller(),
)
.await
});
let rpc_server = handler.block_on(async move { RpcServer::new(rpc_config, io_handler).await });

let rpc_client = reqwest::blocking::Client::new();
let rpc_uri = format!(
Expand Down
10 changes: 1 addition & 9 deletions rpc/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,7 @@ fn setup(consensus: Consensus) -> RpcTestSuite {

let shared_clone = shared.clone();
let handler = shared_clone.async_handle().clone();
let rpc_server = handler.block_on(async move {
RpcServer::new(
rpc_config,
io_handler,
shared_clone.async_handle().clone().into_inner(),
shared_clone.notify_controller(),
)
.await
});
let rpc_server = handler.block_on(async move { RpcServer::new(rpc_config, io_handler).await });

let rpc_client = Client::new();
let rpc_uri = format!(
Expand Down
8 changes: 1 addition & 7 deletions util/launcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,13 +421,7 @@ impl Launcher {
builder.enable_subscription(shared.clone()).await;
let io_handler = builder.build();

RpcServer::new(
rpc_config,
io_handler,
self.async_handle.clone().into_inner(),
shared.notify_controller(),
)
.await;
RpcServer::new(rpc_config, io_handler).await;

network_controller
}
Expand Down

0 comments on commit 4d970f1

Please sign in to comment.