Skip to content

Commit

Permalink
fix clippy and test
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Sep 25, 2023
1 parent 2e8ad16 commit ed710c6
Show file tree
Hide file tree
Showing 12 changed files with 50 additions and 54 deletions.
10 changes: 4 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ ckb-build-info = { path = "util/build-info", version = "= 0.112.0-pre" }
[dependencies]
ckb-build-info = { path = "util/build-info", version = "= 0.112.0-pre" }
ckb-bin = { path = "ckb-bin", version = "= 0.112.0-pre" }
tokio = { version = "1", features = ["full", "tracing"] }
ckb-async-runtime = { path = "util/runtime", version = "= 0.112.0-pre" }


[dev-dependencies]

Expand Down
3 changes: 1 addition & 2 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ ckb-app-config = { path = "../util/app-config", version = "= 0.112.0-pre" }
ckb-constant = { path = "../util/constant", version = "= 0.112.0-pre" }
jsonrpc-core = "18.0"
serde_json = "1.0"
#jsonrpc-utils = { version = "0.2.0", features = ["server", "macros", "axum"] }
jsonrpc-utils = { git = "https://github.com/chenyukang/jsonrpc-utils.git", branch = "exit-signal", features = ["server", "macros", "axum"] }
jsonrpc-utils = { version = "0.2.0", features = ["server", "macros", "axum"] }
ckb-jsonrpc-types = { path = "../util/jsonrpc-types", version = "= 0.112.0-pre" }
ckb-verification = { path = "../verification", version = "= 0.112.0-pre" }
ckb-verification-traits = { path = "../verification/traits", version = "= 0.112.0-pre" }
Expand Down
3 changes: 1 addition & 2 deletions rpc/src/module/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1563,7 +1563,7 @@ pub trait ChainRpc {
since = "0.109.0",
note = "Please use the RPC method [`get_fee_rate_statistics`](#tymethod.get_fee_rate_statistics) instead"
)]
#[rpc(name = "deprecated.get_fee_rate_statics")]
#[rpc(name = "get_fee_rate_statics")]
fn get_fee_rate_statics(&self, target: Option<Uint64>) -> Result<Option<FeeRateStatistics>>;

/// Returns the fee_rate statistics of confirmed blocks on the chain
Expand Down Expand Up @@ -1619,7 +1619,6 @@ const DEFAULT_HEADER_VERBOSITY_LEVEL: u32 = 1;
const DEFAULT_GET_TRANSACTION_VERBOSITY_LEVEL: u32 = 2;

#[async_trait]

impl ChainRpc for ChainRpcImpl {
fn get_block(
&self,
Expand Down
1 change: 0 additions & 1 deletion rpc/src/module/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub trait DebugRpc {
pub(crate) struct DebugRpcImpl {}

#[async_trait]

impl DebugRpc for DebugRpcImpl {
fn jemalloc_profiling_dump(&self) -> Result<String> {
let timestamp = time::SystemTime::now()
Expand Down
1 change: 0 additions & 1 deletion rpc/src/module/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ pub(crate) use self::miner::MinerRpcImpl;
pub(crate) use self::net::NetRpcImpl;
pub(crate) use self::pool::PoolRpcImpl;
pub(crate) use self::stats::StatsRpcImpl;
//pub(crate) use self::subscription::SubscriptionSession;
pub(crate) use self::subscription::SubscriptionRpcImpl;
pub(crate) use self::test::IntegrationTestRpcImpl;

Expand Down
19 changes: 9 additions & 10 deletions rpc/src/module/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(missing_docs)]

use async_trait::async_trait;
use ckb_async_runtime::Handle;
use ckb_jsonrpc_types::Topic;
Expand All @@ -7,8 +9,6 @@ use jsonrpc_core::Result;
use jsonrpc_utils::{pub_sub::PublishMsg, rpc};
use tokio::sync::broadcast;

const SUBSCRIBER_NAME: &str = "TcpSubscription";

/// RPC Module Subscription that CKB node will push new messages to subscribers.
///
/// RPC subscriptions require a full duplex connection. CKB offers such connections in the form of
Expand Down Expand Up @@ -140,7 +140,7 @@ pub trait SubscriptionRpc {
/// {
/// "id": 42,
/// "jsonrpc": "2.0",
/// "result": "0xf3ec7c262bcd8f8656975a5fbf6571a5"
/// "result": "0xf3"
/// }
/// ```
/// Unsubscribe Request
Expand All @@ -150,8 +150,8 @@ pub trait SubscriptionRpc {
/// "jsonrpc": "2.0",
/// "method": "unsubscribe",
/// "params": [
/// "0xf3ec7c262bcd8f8656975a5fbf6571a5"
/// ]
/// "0xf3"
/// ]
/// }
///
///
Expand Down Expand Up @@ -189,18 +189,17 @@ impl SubscriptionRpc for SubscriptionRpcImpl {
Topic::RejectedTransaction => self.new_reject_transaction_sender.clone(),
};
Ok(Box::pin(async_stream::stream! {
loop {
match tx.clone().subscribe().recv().await {
Ok(msg) => yield msg,
Err(_) => break,
}
while let Ok(msg) = tx.clone().subscribe().recv().await {
yield msg;
}
}))
}
}

impl SubscriptionRpcImpl {
pub async fn new(notify_controller: NotifyController, handle: Handle) -> Self {
const SUBSCRIBER_NAME: &str = "TcpSubscription";

let mut new_block_receiver = notify_controller
.subscribe_new_block(SUBSCRIBER_NAME.to_string())
.await;
Expand Down
56 changes: 33 additions & 23 deletions rpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ impl RpcServer {
) -> Self {
let rpc = Arc::new(io_handler);

let http_address = Self::start_server(rpc.clone(), config.listen_address.to_owned()).await;
let http_address = Self::start_server(&rpc, config.listen_address.to_owned()).await;
info!("Listen HTTP RPCServer on address {}", http_address);

let ws_address = if let Some(addr) = config.ws_listen_address {
let local_addr = Self::start_server(rpc.clone(), addr).await;
let local_addr = Self::start_server(&rpc, addr).await;
info!("Listen WebSocket RPCServer on address {}", local_addr);
Some(local_addr)
} else {
Expand All @@ -65,30 +65,37 @@ impl RpcServer {
.with_channel_size(4)
.with_keep_alive(true)
.with_keep_alive_duration(Duration::from_secs(60))
.with_pipeline_size(4)
.with_exit_signal(new_tokio_exit_rx());
.with_pipeline_size(4);

let exit_signal: CancellationToken = new_tokio_exit_rx();
tokio::select! {
_ = async {
while let Ok((stream, _)) = listener.accept().await {
let rpc = Arc::clone(&rpc);
let stream_config = stream_config.clone();
let codec = codec.clone();
tokio::spawn(async move {
let (r, w) = stream.into_split();
let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
let w = FramedWrite::new(w, codec).with(|msg| async move {
Ok::<_, LinesCodecError>(match msg {
StreamMsg::Str(msg) => msg,
_ => "".into(),
})
while let Ok((stream, _)) = listener.accept().await {
let rpc = Arc::clone(&rpc);
let stream_config = stream_config.clone();
let codec = codec.clone();
tokio::spawn(async move {
let (r, w) = stream.into_split();
let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
let w = FramedWrite::new(w, codec).with(|msg| async move {
Ok::<_, LinesCodecError>(match msg {
StreamMsg::Str(msg) => msg,
_ => "".into(),
})
});
tokio::pin!(w);
let exit_signal: CancellationToken = new_tokio_exit_rx();
tokio::select! {
result = serve_stream_sink(&rpc, w, r, stream_config) => {
if let Err(err) = result {
info!("TCP RPCServer error: {:?}", err);
}
}
_ = exit_signal.cancelled() => {}
}
});
tokio::pin!(w);
drop(serve_stream_sink(&rpc, w, r, stream_config).await);
});
}
} => {},
}
} => {},
_ = exit_signal.cancelled() => {
info!("TCP RPCServer stopped");
}
Expand All @@ -103,7 +110,10 @@ impl RpcServer {
}
}

async fn start_server(rpc: Arc<MetaIoHandler<Option<Session>>>, address: String) -> SocketAddr {
async fn start_server(
rpc: &Arc<MetaIoHandler<Option<Session>>>,
address: String,
) -> SocketAddr {
let stream_config = StreamServerConfig::default()
.with_channel_size(4)
.with_pipeline_size(4);
Expand All @@ -116,7 +126,7 @@ impl RpcServer {
let app = Router::new()
.route("/", method_router.clone())
.route("/*path", method_router)
.layer(Extension(Arc::clone(&rpc)))
.layer(Extension(Arc::clone(rpc)))
.layer(Extension(ws_config))
.layer(TimeoutLayer::new(Duration::from_secs(30)));

Expand Down
3 changes: 1 addition & 2 deletions rpc/src/service_builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![allow(deprecated)]
//use crate::module::SubscriptionSession;
use crate::module::{
add_alert_rpc_methods, add_chain_rpc_methods, add_debug_rpc_methods,
add_experiment_rpc_methods, add_indexer_rpc_methods, add_integration_test_rpc_methods,
Expand Down Expand Up @@ -260,7 +259,7 @@ impl<'a> ServiceBuilder<'a> {
/// Builds the RPC methods handler used in the RPC server.
pub fn build(self) -> IoHandler {
let mut io_handler = self.io_handler;
io_handler.add_method("ping", |_| async move { Ok("pong".into()) });
io_handler.add_method("ping", |_| async { Ok("pong".into()) });
io_handler
}
}
Expand Down
1 change: 0 additions & 1 deletion rpc/src/tests/examples.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,6 @@ fn test_rpc_examples() {
println!("Test RPC Example {}", example.request);
around_rpc_example(&suite, example);
}
eprintln!("finished ....");
}

fn replace_rpc_response<T>(example: &RpcTestExample, response: &mut RpcTestResponse)
Expand Down
3 changes: 1 addition & 2 deletions util/launcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ tempfile.workspace = true
quote = "1.0.27"
async-trait = "0.1"
http-body = "0.4.5"
#jsonrpc-utils = { version = "0.2.0", features = ["server", "macros", "axum"] }
jsonrpc-utils = { git = "https://github.com/chenyukang/jsonrpc-utils.git", branch = "exit-signal", features = ["server", "macros", "axum"] }
jsonrpc-utils = { version = "0.2.0", features = ["server", "macros", "axum"] }

[dev-dependencies]
ckb-systemtime = {path = "../systemtime", version = "= 0.112.0-pre", features = ["enable_faketime"] }
Expand Down
1 change: 0 additions & 1 deletion util/launcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,6 @@ impl Launcher {
&self.args.config.db,
&self.args.config.indexer,
)
//.enable_subscription(shared.clone())
.enable_debug();
builder.enable_subscription(shared.clone()).await;
let io_handler = builder.build();
Expand Down

0 comments on commit ed710c6

Please sign in to comment.