From 26e68399fd142990c131bd81ddd9403f8379b04d Mon Sep 17 00:00:00 2001 From: yukang Date: Tue, 9 May 2023 20:09:51 +0800 Subject: [PATCH 01/31] move alert and chain to jsonrpc_utils --- Cargo.lock | 244 ++++++++++++++++++++++++++++++++- ckb-bin/src/subcommand/run.rs | 7 +- rpc/Cargo.toml | 7 + rpc/src/lib.rs | 5 +- rpc/src/module/alert.rs | 13 +- rpc/src/module/chain.rs | 101 +++++++------- rpc/src/module/mod.rs | 9 +- rpc/src/module/subscription.rs | 20 +-- rpc/src/module/test.rs | 40 ++++-- rpc/src/server.rs | 142 +++++++------------ rpc/src/service_builder.rs | 46 +++---- util/launcher/Cargo.toml | 11 ++ util/launcher/src/lib.rs | 78 ++++++----- 13 files changed, 484 insertions(+), 239 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 587c3196c2..2bbcc3093e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -162,6 +162,59 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fb79c228270dcf2426e74864cabc94babb5dbab01a4314e702d2f16540e1591" +dependencies = [ + "async-trait", + "axum-core", + "base64 0.21.5", + "bitflags 1.3.2", + "bytes 1.5.0", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sha1", + "sync_wrapper", + "tokio", + "tokio-tungstenite", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes 1.5.0", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -177,6 +230,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.5" @@ -545,7 +604,7 @@ dependencies = [ name = "ckb-bin" version = "0.113.0-pre" dependencies = [ - "base64", + "base64 0.21.5", "ckb-app-config", "ckb-async-runtime", "ckb-build-info", @@ -867,12 +926,14 @@ dependencies = [ name = "ckb-launcher" version = "0.113.0-pre" dependencies = [ + "async-trait", "ckb-app-config", "ckb-async-runtime", "ckb-block-filter", "ckb-build-info", "ckb-chain", "ckb-channel", + "ckb-hash", "ckb-jsonrpc-types", "ckb-light-client-protocol-server", "ckb-logger", @@ -884,10 +945,17 @@ dependencies = [ "ckb-shared", "ckb-store", "ckb-sync", + "ckb-systemtime", "ckb-tx-pool", "ckb-types", "ckb-verification", "ckb-verification-traits", + "http-body", + "jsonrpc-utils", + "num_cpus", + "once_cell", + "quote", + "tempfile", ] [[package]] @@ -1043,7 +1111,7 @@ dependencies = [ name = "ckb-miner" version = "0.113.0-pre" dependencies = [ - "base64", + "base64 0.21.5", "ckb-app-config", "ckb-async-runtime", "ckb-channel", @@ -1250,6 +1318,8 @@ dependencies = [ name = "ckb-rpc" version = "0.113.0-pre" dependencies = [ + "async-trait", + "axum", "ckb-app-config", "ckb-chain", "ckb-chain-spec", @@ -1278,6 +1348,7 @@ dependencies = [ "ckb-util", "ckb-verification", "ckb-verification-traits", + "futures-util", "itertools 0.11.0", "jsonrpc-core", "jsonrpc-derive", @@ -1285,13 +1356,16 @@ dependencies = [ "jsonrpc-pubsub", "jsonrpc-server-utils", "jsonrpc-tcp-server", + "jsonrpc-utils", "jsonrpc-ws-server", "pretty_assertions", + "quote", "reqwest", "serde", "serde_json", "tempfile", "tokio", + "tokio-util 0.7.10", ] [[package]] @@ -2584,6 +2658,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "httparse" version = "1.8.0" @@ -2934,6 +3014,36 @@ dependencies = [ "tower-service", ] +[[package]] +name = "jsonrpc-utils" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f82ed5157e86d6cd293f7d72d6af1000e8b8dbd8e4dd4493d71d0ffb48226348" +dependencies = [ + "anyhow", + "axum", + "futures-core", + "futures-util", + "hex", + "jsonrpc-core", + "jsonrpc-utils-macros", + "pin-project-lite", + "rand 0.8.5", + "serde_json", + "tokio", +] + +[[package]] +name = "jsonrpc-utils-macros" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8727d2c8bb2833f22c4306879a5cf4cc4a8659170c211e9b523b86aab654f823" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "jsonrpc-ws-server" version = "18.0.0" @@ -3063,6 +3173,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matchit" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" + [[package]] name = "memchr" version = "2.6.4" @@ -3600,6 +3716,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -4014,7 +4150,7 @@ version = "0.11.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" dependencies = [ - "base64", + "base64 0.21.5", "bytes 1.5.0", "encoding_rs", "futures-core", @@ -4141,6 +4277,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "rustversion" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06" + [[package]] name = "rusty-fork" version = "0.3.0" @@ -4372,6 +4514,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7f05c1d5476066defcdfacce1f52fc3cae3af1d3089727100c02ae92e5abbe0" +dependencies = [ + "serde", +] + [[package]] name = "serde_plain" version = "0.3.0" @@ -4550,7 +4701,7 @@ version = "9.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da7a2b3c2bc9693bcb40870c4e9b5bf0d79f9cb46273321bf855ec513e919082" dependencies = [ - "base64", + "base64 0.21.5", "digest 0.10.7", "hex", "miette", @@ -4601,6 +4752,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "tempfile" version = "3.8.0" @@ -4878,6 +5035,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.6.10" @@ -4929,6 +5098,47 @@ dependencies = [ "serde", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858" +dependencies = [ + "bitflags 1.3.2", + "bytes 1.5.0", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -4941,6 +5151,7 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-core", ] @@ -5005,6 +5216,25 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "tungstenite" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" +dependencies = [ + "base64 0.13.1", + "byteorder", + "bytes 1.5.0", + "http", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -5111,6 +5341,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.1" diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index fd6355af33..12dbc4b3da 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -1,6 +1,6 @@ use crate::helper::deadlock_detection; use ckb_app_config::{ExitCode, RunArgs}; -use ckb_async_runtime::Handle; +use ckb_async_runtime::{tokio, Handle}; use ckb_build_info::Version; use ckb_launcher::Launcher; use ckb_logger::info; @@ -45,12 +45,15 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), launcher.start_block_filter(&shared); - let (network_controller, _rpc_server) = launcher.start_network_and_rpc( + let rt = tokio::runtime::Runtime::new().unwrap(); + + let network_controller = launcher.start_network_and_rpc( &shared, chain_controller.clone(), miner_enable, pack.take_relay_tx_receiver(), ); + let network_controller = rt.block_on(network_controller); let tx_pool_builder = pack.take_tx_pool_builder(); tx_pool_builder.start(network_controller.clone()); diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index eaf36a7eed..3052cced23 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -45,6 +45,13 @@ ckb-pow = { path = "../pow", version = "= 0.113.0-pre" } ckb-indexer = { path = "../util/indexer", version = "= 0.113.0-pre" } itertools.workspace = true tokio = "1" +quote = "1.0.27" +async-trait = "0.1" +jsonrpc-utils = { version = "0.2.0", features = ["server", "macros", "axum"] } +axum = "0.6.1" +tokio-util = { version = "0.7.3", features = ["codec"] } +futures-util = { version = "0.3.21"} + [dev-dependencies] reqwest = { version = "=0.11.20", features = ["blocking", "json"] } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index ebbd2227e4..9dbb12bbb0 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -10,9 +10,12 @@ pub mod module; #[cfg(test)] mod tests; +use jsonrpc_core::MetaIoHandler; +use jsonrpc_utils::pub_sub::Session; + pub use crate::error::RPCError; pub use crate::server::RpcServer; pub use crate::service_builder::ServiceBuilder; #[doc(hidden)] -pub type IoHandler = jsonrpc_pubsub::PubSubHandler>; +pub type IoHandler = MetaIoHandler>; \ No newline at end of file diff --git a/rpc/src/module/alert.rs b/rpc/src/module/alert.rs index d1c6ce7762..a7ea069be6 100644 --- a/rpc/src/module/alert.rs +++ b/rpc/src/module/alert.rs @@ -6,8 +6,10 @@ use ckb_network_alert::{notifier::Notifier as AlertNotifier, verifier::Verifier use ckb_types::{packed, prelude::*}; use ckb_util::Mutex; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; use std::sync::Arc; +use jsonrpc_utils::rpc; +use async_trait::async_trait; + /// RPC Module Alert for network alerts. /// @@ -15,7 +17,8 @@ use std::sync::Arc; /// /// The alerts must be signed by 2-of-4 signatures, where the public keys are hard-coded in the source code /// and belong to early CKB developers. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait AlertRpc { /// Sends an alert. /// @@ -67,9 +70,10 @@ pub trait AlertRpc { /// } /// ``` #[rpc(name = "send_alert")] - fn send_alert(&self, alert: Alert) -> Result<()>; + async fn send_alert(&self, alert: Alert) -> Result<()>; } +#[derive(Clone)] pub(crate) struct AlertRpcImpl { network_controller: NetworkController, verifier: Arc, @@ -90,8 +94,9 @@ impl AlertRpcImpl { } } +#[async_trait] impl AlertRpc for AlertRpcImpl { - fn send_alert(&self, alert: Alert) -> Result<()> { + async fn send_alert(&self, alert: Alert) -> Result<()> { let alert: packed::Alert = alert.into(); let now_ms = ckb_systemtime::unix_time_as_millis(); let notice_until: u64 = alert.raw().notice_until().unpack(); diff --git a/rpc/src/module/chain.rs b/rpc/src/module/chain.rs index fdbab35d67..829c2be1c6 100644 --- a/rpc/src/module/chain.rs +++ b/rpc/src/module/chain.rs @@ -26,9 +26,10 @@ use ckb_types::{ use ckb_verification::ScriptVerifier; use ckb_verification::TxVerifyEnv; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; use std::collections::HashSet; use std::sync::Arc; +use jsonrpc_utils::rpc; +use async_trait::async_trait; /// RPC Module Chain for methods related to the canonical chain. /// @@ -52,7 +53,8 @@ use std::sync::Arc; /// * it is found as an output in any transaction in the [canonical chain](#canonical-chain), /// and /// * it is not found as an input in any transaction in the canonical chain. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait ChainRpc { /// Returns the information about a block by hash. /// @@ -178,7 +180,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_block")] - fn get_block( + async fn get_block( &self, block_hash: H256, verbosity: Option, @@ -312,7 +314,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_block_by_number")] - fn get_block_by_number( + async fn get_block_by_number( &self, block_number: BlockNumber, verbosity: Option, @@ -391,7 +393,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_header")] - fn get_header( + async fn get_header( &self, block_hash: H256, verbosity: Option, @@ -472,7 +474,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_header_by_number")] - fn get_header_by_number( + async fn get_header_by_number( &self, block_number: BlockNumber, verbosity: Option, @@ -526,7 +528,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_block_filter")] - fn get_block_filter(&self, block_hash: H256) -> Result>; + async fn get_block_filter(&self, block_hash: H256) -> Result>; /// Returns the information about a transaction requested by transaction hash. /// @@ -648,7 +650,7 @@ pub trait ChainRpc { /// ``` /// #[rpc(name = "get_transaction")] - fn get_transaction( + async fn get_transaction( &self, tx_hash: H256, verbosity: Option, @@ -696,7 +698,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_block_hash")] - fn get_block_hash(&self, block_number: BlockNumber) -> Result>; + async fn get_block_hash(&self, block_number: BlockNumber) -> Result>; /// Returns the header with the highest block number in the [canonical chain](#canonical-chain). /// @@ -762,7 +764,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_tip_header")] - fn get_tip_header(&self, verbosity: Option) -> Result>; + async fn get_tip_header(&self, verbosity: Option) -> Result>; /// Returns the status of a cell. The RPC returns extra information if it is a [live cell](#live-cell). /// @@ -828,7 +830,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_live_cell")] - fn get_live_cell(&self, out_point: OutPoint, with_data: bool) -> Result; + async fn get_live_cell(&self, out_point: OutPoint, with_data: bool) -> Result; /// Returns the highest block number in the [canonical chain](#canonical-chain). /// @@ -858,7 +860,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_tip_block_number")] - fn get_tip_block_number(&self) -> Result; + async fn get_tip_block_number(&self) -> Result; /// Returns the epoch with the highest number in the [canonical chain](#canonical-chain). /// @@ -894,7 +896,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_current_epoch")] - fn get_current_epoch(&self) -> Result; + async fn get_current_epoch(&self) -> Result; /// Returns the epoch in the [canonical chain](#canonical-chain) with the specific epoch number. /// @@ -940,7 +942,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_epoch_by_number")] - fn get_epoch_by_number(&self, epoch_number: EpochNumber) -> Result>; + async fn get_epoch_by_number(&self, epoch_number: EpochNumber) -> Result>; /// Returns increased issuance, miner reward, and the total transaction fee of a block. /// @@ -1004,7 +1006,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_block_economic_state")] - fn get_block_economic_state(&self, block_hash: H256) -> Result>; + async fn get_block_economic_state(&self, block_hash: H256) -> Result>; /// Returns a Merkle proof that transactions are included in a block. /// @@ -1045,7 +1047,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_transaction_proof")] - fn get_transaction_proof( + async fn get_transaction_proof( &self, tx_hashes: Vec, block_hash: Option, @@ -1091,7 +1093,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "verify_transaction_proof")] - fn verify_transaction_proof(&self, tx_proof: TransactionProof) -> Result>; + async fn verify_transaction_proof(&self, tx_proof: TransactionProof) -> Result>; /// Returns a Merkle proof of transactions' witness included in a block. /// @@ -1137,7 +1139,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_transaction_and_witness_proof")] - fn get_transaction_and_witness_proof( + async fn get_transaction_and_witness_proof( &self, tx_hashes: Vec, block_hash: Option, @@ -1188,7 +1190,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "verify_transaction_and_witness_proof")] - fn verify_transaction_and_witness_proof( + async fn verify_transaction_and_witness_proof( &self, tx_proof: TransactionAndWitnessProof, ) -> Result>; @@ -1302,7 +1304,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_fork_block")] - fn get_fork_block( + async fn get_fork_block( &self, block_hash: H256, verbosity: Option, @@ -1396,7 +1398,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_consensus")] - fn get_consensus(&self) -> Result; + async fn get_consensus(&self) -> Result; /// Returns the past median time by block hash. /// @@ -1436,7 +1438,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_block_median_time")] - fn get_block_median_time(&self, block_hash: H256) -> Result>; + async fn get_block_median_time(&self, block_hash: H256) -> Result>; /// `estimate_cycles` run a transaction and return the execution consumed cycles. /// @@ -1515,7 +1517,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "estimate_cycles")] - fn estimate_cycles(&self, tx: Transaction) -> Result; + async fn estimate_cycles(&self, tx: Transaction) -> Result; /// Returns the fee_rate statistics of confirmed blocks on the chain /// @@ -1561,7 +1563,7 @@ pub trait ChainRpc { note = "Please use the RPC method [`get_fee_rate_statistics`](#tymethod.get_fee_rate_statistics) instead" )] #[rpc(name = "get_fee_rate_statics")] - fn get_fee_rate_statics(&self, target: Option) -> Result>; + async fn get_fee_rate_statics(&self, target: Option) -> Result>; /// Returns the fee_rate statistics of confirmed blocks on the chain /// @@ -1603,9 +1605,10 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_fee_rate_statistics")] - fn get_fee_rate_statistics(&self, target: Option) -> Result>; + async fn get_fee_rate_statistics(&self, target: Option) -> Result>; } +#[derive(Clone)] pub(crate) struct ChainRpcImpl { pub shared: Shared, } @@ -1614,8 +1617,10 @@ const DEFAULT_BLOCK_VERBOSITY_LEVEL: u32 = 2; const DEFAULT_HEADER_VERBOSITY_LEVEL: u32 = 1; const DEFAULT_GET_TRANSACTION_VERBOSITY_LEVEL: u32 = 2; +#[async_trait] + impl ChainRpc for ChainRpcImpl { - fn get_block( + async fn get_block( &self, block_hash: H256, verbosity: Option, @@ -1627,7 +1632,7 @@ impl ChainRpc for ChainRpcImpl { self.get_block_by_hash(&snapshot, &block_hash, verbosity, with_cycles) } - fn get_block_by_number( + async fn get_block_by_number( &self, block_number: BlockNumber, verbosity: Option, @@ -1653,7 +1658,7 @@ impl ChainRpc for ChainRpcImpl { ret } - fn get_header( + async fn get_header( &self, block_hash: H256, verbosity: Option, @@ -1680,7 +1685,7 @@ impl ChainRpc for ChainRpcImpl { } } - fn get_header_by_number( + async fn get_header_by_number( &self, block_number: BlockNumber, verbosity: Option, @@ -1715,7 +1720,7 @@ impl ChainRpc for ChainRpcImpl { }) } - fn get_block_filter(&self, block_hash: H256) -> Result> { + async fn get_block_filter(&self, block_hash: H256) -> Result> { let store = self.shared.store(); let block_hash = block_hash.pack(); if !store.is_main_chain(&block_hash) { @@ -1732,7 +1737,7 @@ impl ChainRpc for ChainRpcImpl { })) } - fn get_transaction( + async fn get_transaction( &self, tx_hash: H256, verbosity: Option, @@ -1765,7 +1770,7 @@ impl ChainRpc for ChainRpcImpl { } } - fn get_block_hash(&self, block_number: BlockNumber) -> Result> { + async fn get_block_hash(&self, block_number: BlockNumber) -> Result> { Ok(self .shared .snapshot() @@ -1773,7 +1778,7 @@ impl ChainRpc for ChainRpcImpl { .map(|h| h.unpack())) } - fn get_tip_header(&self, verbosity: Option) -> Result> { + async fn get_tip_header(&self, verbosity: Option) -> Result> { let verbosity = verbosity .map(|v| v.value()) .unwrap_or(DEFAULT_HEADER_VERBOSITY_LEVEL); @@ -1790,13 +1795,13 @@ impl ChainRpc for ChainRpcImpl { } } - fn get_current_epoch(&self) -> Result { + async fn get_current_epoch(&self) -> Result { Ok(EpochView::from_ext( self.shared.snapshot().epoch_ext().pack(), )) } - fn get_epoch_by_number(&self, epoch_number: EpochNumber) -> Result> { + async fn get_epoch_by_number(&self, epoch_number: EpochNumber) -> Result> { let snapshot = self.shared.snapshot(); Ok(snapshot .get_epoch_index(epoch_number.into()) @@ -1807,7 +1812,7 @@ impl ChainRpc for ChainRpcImpl { })) } - fn get_live_cell(&self, out_point: OutPoint, with_data: bool) -> Result { + async fn get_live_cell(&self, out_point: OutPoint, with_data: bool) -> Result { let cell_status = self .shared .snapshot() @@ -1816,11 +1821,11 @@ impl ChainRpc for ChainRpcImpl { Ok(cell_status.into()) } - fn get_tip_block_number(&self) -> Result { + async fn get_tip_block_number(&self) -> Result { Ok(self.shared.snapshot().tip_header().number().into()) } - fn get_block_economic_state(&self, block_hash: H256) -> Result> { + async fn get_block_economic_state(&self, block_hash: H256) -> Result> { let snapshot = self.shared.snapshot(); let block_number = if let Some(block_number) = snapshot.get_block_number(&block_hash.pack()) @@ -1888,7 +1893,7 @@ impl ChainRpc for ChainRpcImpl { })) } - fn get_transaction_proof( + async fn get_transaction_proof( &self, tx_hashes: Vec, block_hash: Option, @@ -1910,7 +1915,7 @@ impl ChainRpc for ChainRpcImpl { }) } - fn verify_transaction_proof(&self, tx_proof: TransactionProof) -> Result> { + async fn verify_transaction_proof(&self, tx_proof: TransactionProof) -> Result> { let snapshot = self.shared.snapshot(); snapshot @@ -1953,7 +1958,7 @@ impl ChainRpc for ChainRpcImpl { }) } - fn get_transaction_and_witness_proof( + async fn get_transaction_and_witness_proof( &self, tx_hashes: Vec, block_hash: Option, @@ -1977,7 +1982,7 @@ impl ChainRpc for ChainRpcImpl { }) } - fn verify_transaction_and_witness_proof( + async fn verify_transaction_and_witness_proof( &self, tx_proof: TransactionAndWitnessProof, ) -> Result> { @@ -2049,7 +2054,7 @@ impl ChainRpc for ChainRpcImpl { }) } - fn get_fork_block( + async fn get_fork_block( &self, block_hash: H256, verbosity: Option, @@ -2077,12 +2082,12 @@ impl ChainRpc for ChainRpcImpl { } } - fn get_consensus(&self) -> Result { + async fn get_consensus(&self) -> Result { let consensus = self.shared.consensus().clone(); Ok(consensus.into()) } - fn get_block_median_time(&self, block_hash: H256) -> Result> { + async fn get_block_median_time(&self, block_hash: H256) -> Result> { let block_hash = block_hash.pack(); let snapshot = self.shared.snapshot(); if !snapshot.is_main_chain(&block_hash) { @@ -2096,17 +2101,17 @@ impl ChainRpc for ChainRpcImpl { Ok(Some(median_time.into())) } - fn estimate_cycles(&self, tx: Transaction) -> Result { + async fn estimate_cycles(&self, tx: Transaction) -> Result { let tx: packed::Transaction = tx.into(); CyclesEstimator::new(&self.shared).run(tx) } - fn get_fee_rate_statics(&self, target: Option) -> Result> { + async fn get_fee_rate_statics(&self, target: Option) -> Result> { Ok(FeeRateCollector::new(self.shared.snapshot().as_ref()) .statistics(target.map(Into::into))) } - fn get_fee_rate_statistics(&self, target: Option) -> Result> { + async fn get_fee_rate_statistics(&self, target: Option) -> Result> { Ok(FeeRateCollector::new(self.shared.snapshot().as_ref()) .statistics(target.map(Into::into))) } diff --git a/rpc/src/module/mod.rs b/rpc/src/module/mod.rs index e55ac1bde0..fb40cbb187 100644 --- a/rpc/src/module/mod.rs +++ b/rpc/src/module/mod.rs @@ -119,7 +119,7 @@ mod miner; mod net; pub(crate) mod pool; mod stats; -mod subscription; +//mod subscription; mod test; pub(crate) use self::alert::AlertRpcImpl; @@ -131,10 +131,12 @@ pub(crate) use self::miner::MinerRpcImpl; pub(crate) use self::net::NetRpcImpl; pub(crate) use self::pool::PoolRpcImpl; pub(crate) use self::stats::StatsRpcImpl; -pub(crate) use self::subscription::{SubscriptionRpcImpl, SubscriptionSession}; +//pub(crate) use self::subscription::SubscriptionSession; pub(crate) use self::test::IntegrationTestRpcImpl; +pub use self::alert::add_alert_rpc_methods; pub use self::alert::AlertRpc; +pub use self::chain::add_chain_rpc_methods; pub use self::chain::ChainRpc; pub use self::debug::DebugRpc; pub use self::experiment::ExperimentRpc; @@ -143,5 +145,6 @@ pub use self::miner::MinerRpc; pub use self::net::NetRpc; pub use self::pool::PoolRpc; pub use self::stats::StatsRpc; -pub use self::subscription::SubscriptionRpc; +pub use self::test::add_integration_test_rpc_methods; +//pub use self::subscription::SubscriptionRpc; pub use self::test::IntegrationTestRpc; diff --git a/rpc/src/module/subscription.rs b/rpc/src/module/subscription.rs index c5ca5e5160..84d1c62213 100644 --- a/rpc/src/module/subscription.rs +++ b/rpc/src/module/subscription.rs @@ -1,12 +1,17 @@ +use async_trait::async_trait; use ckb_jsonrpc_types::Topic; use ckb_notify::NotifyController; use jsonrpc_core::{Metadata, Result}; -use jsonrpc_derive::rpc; +//use jsonrpc_derive::rpc; use jsonrpc_pubsub::{ typed::{Sink, Subscriber}, - PubSubMetadata, Session, SubscriptionId, + SubscriptionId, }; +use jsonrpc_utils::rpc; + +use jsonrpc_utils::{axum_utils::handle_jsonrpc, pub_sub::Session}; + use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; @@ -18,7 +23,7 @@ use tokio::runtime::Handle; const SUBSCRIBER_NAME: &str = "TcpSubscription"; -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct SubscriptionSession { pub(crate) subscription_ids: Arc>>, pub(crate) session: Arc, @@ -35,12 +40,6 @@ impl SubscriptionSession { impl Metadata for SubscriptionSession {} -impl PubSubMetadata for SubscriptionSession { - fn session(&self) -> Option> { - Some(Arc::clone(&self.session)) - } -} - /// RPC Module Subscription that CKB node will push new messages to subscribers. /// /// RPC subscriptions require a full duplex connection. CKB offers such connections in the form of @@ -78,7 +77,8 @@ impl PubSubMetadata for SubscriptionSession { /// socket.send(`{"id": 2, "jsonrpc": "2.0", "method": "unsubscribe", "params": ["0x0"]}`) /// ``` #[allow(clippy::needless_return)] -#[rpc(server)] +#[rpc] +#[async_trait] pub trait SubscriptionRpc { /// Context to implement the subscription RPC. type Metadata; diff --git a/rpc/src/module/test.rs b/rpc/src/module/test.rs index d56c85239f..85fbc0340d 100644 --- a/rpc/src/module/test.rs +++ b/rpc/src/module/test.rs @@ -1,4 +1,5 @@ use crate::error::RPCError; +use async_trait::async_trait; use ckb_chain::chain::ChainController; use ckb_dao::DaoCalculator; use ckb_jsonrpc_types::{Block, BlockTemplate, Byte32, EpochNumberWithFraction, Transaction}; @@ -20,12 +21,13 @@ use ckb_types::{ }; use ckb_verification_traits::Switch; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; use std::collections::HashSet; use std::sync::Arc; /// RPC for Integration Test. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait IntegrationTestRpc { /// process block without any block verification. /// @@ -104,7 +106,11 @@ pub trait IntegrationTestRpc { /// } /// ``` #[rpc(name = "process_block_without_verify")] - fn process_block_without_verify(&self, data: Block, broadcast: bool) -> Result>; + async fn process_block_without_verify( + &self, + data: Block, + broadcast: bool, + ) -> Result>; /// Truncate chain to specified tip hash, can only truncate less then 50000 blocks each time. /// @@ -137,7 +143,7 @@ pub trait IntegrationTestRpc { /// } /// ``` #[rpc(name = "truncate")] - fn truncate(&self, target_tip_hash: H256) -> Result<()>; + async fn truncate(&self, target_tip_hash: H256) -> Result<()>; /// Generate block(with verification) and broadcast the block. /// @@ -166,7 +172,7 @@ pub trait IntegrationTestRpc { /// } /// ``` #[rpc(name = "generate_block")] - fn generate_block(&self) -> Result; + async fn generate_block(&self) -> Result; /// Generate epochs during development, can be useful for scenarios /// like testing DAO-related functionalities. @@ -287,7 +293,7 @@ pub trait IntegrationTestRpc { /// } /// ``` #[rpc(name = "notify_transaction")] - fn notify_transaction(&self, transaction: Transaction) -> Result; + async fn notify_transaction(&self, transaction: Transaction) -> Result; /// Generate block with block template, attach calculated dao field to build new block, /// @@ -392,7 +398,7 @@ pub trait IntegrationTestRpc { /// } /// ``` #[rpc(name = "generate_block_with_template")] - fn generate_block_with_template(&self, block_template: BlockTemplate) -> Result; + async fn generate_block_with_template(&self, block_template: BlockTemplate) -> Result; /// Return calculated dao field according to specified block template. /// @@ -495,17 +501,23 @@ pub trait IntegrationTestRpc { /// } /// ``` #[rpc(name = "calculate_dao_field")] - fn calculate_dao_field(&self, block_template: BlockTemplate) -> Result; + async fn calculate_dao_field(&self, block_template: BlockTemplate) -> Result; } +#[derive(Clone)] pub(crate) struct IntegrationTestRpcImpl { pub network_controller: NetworkController, pub shared: Shared, pub chain: ChainController, } +#[async_trait] impl IntegrationTestRpc for IntegrationTestRpcImpl { - fn process_block_without_verify(&self, data: Block, broadcast: bool) -> Result> { + async fn process_block_without_verify( + &self, + data: Block, + broadcast: bool, + ) -> Result> { let block: packed::Block = data.into(); let block: Arc = Arc::new(block.into_view()); let ret = self @@ -530,7 +542,7 @@ impl IntegrationTestRpc for IntegrationTestRpcImpl { } } - fn truncate(&self, target_tip_hash: H256) -> Result<()> { + async fn truncate(&self, target_tip_hash: H256) -> Result<()> { let header = { let snapshot = self.shared.snapshot(); let header = snapshot @@ -562,7 +574,7 @@ impl IntegrationTestRpc for IntegrationTestRpcImpl { Ok(()) } - fn generate_block(&self) -> Result { + async fn generate_block(&self) -> Result { let tx_pool = self.shared.tx_pool_controller(); let block_template = tx_pool .get_block_template(None, None, None) @@ -611,8 +623,8 @@ impl IntegrationTestRpc for IntegrationTestRpcImpl { Ok(tx_hash.unpack()) } - fn generate_block_with_template(&self, block_template: BlockTemplate) -> Result { - let dao_field = self.calculate_dao_field(block_template.clone())?; + async fn generate_block_with_template(&self, block_template: BlockTemplate) -> Result { + let dao_field = self.calculate_dao_field(block_template.clone()).await?; let mut update_dao_template = block_template; update_dao_template.dao = dao_field; @@ -620,7 +632,7 @@ impl IntegrationTestRpc for IntegrationTestRpcImpl { self.process_and_announce_block(block) } - fn calculate_dao_field(&self, block_template: BlockTemplate) -> Result { + async fn calculate_dao_field(&self, block_template: BlockTemplate) -> Result { let snapshot: &Snapshot = &self.shared.snapshot(); let consensus = snapshot.consensus(); let parent_header = snapshot diff --git a/rpc/src/server.rs b/rpc/src/server.rs index c778f961c3..775c204647 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -1,20 +1,17 @@ -use crate::module::{SubscriptionRpc, SubscriptionRpcImpl, SubscriptionSession}; use crate::IoHandler; use ckb_app_config::RpcConfig; -use ckb_logger::info; use ckb_notify::NotifyController; -use jsonrpc_pubsub::Session; -use jsonrpc_server_utils::cors::AccessControlAllowOrigin; -use jsonrpc_server_utils::hosts::DomainsValidation; -use std::net::{SocketAddr, ToSocketAddrs}; +use futures_util::{SinkExt, TryStreamExt}; +use jsonrpc_core::MetaIoHandler; +use jsonrpc_utils::axum_utils::jsonrpc_router; +use jsonrpc_utils::stream::{serve_stream_sink, StreamMsg, StreamServerConfig}; +use std::sync::Arc; +use tokio::net::TcpListener; use tokio::runtime::Handle; +use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec, LinesCodecError}; #[doc(hidden)] -pub struct RpcServer { - pub(crate) http: jsonrpc_http_server::Server, - pub(crate) _tcp: Option, - pub(crate) _ws: Option, -} +pub struct RpcServer {} impl RpcServer { /// Creates an RPC server. @@ -24,95 +21,56 @@ impl RpcServer { /// * `config` - RPC config options. /// * `io_handler` - RPC methods handler. See [ServiceBuilder](../service_builder/struct.ServiceBuilder.html). /// * `notify_controller` - Controller emitting notifications. - pub fn new( + pub async fn start_jsonrpc_server( config: RpcConfig, io_handler: IoHandler, notify_controller: &NotifyController, handle: Handle, - ) -> RpcServer { - let http = jsonrpc_http_server::ServerBuilder::new(io_handler.clone()) - .cors(DomainsValidation::AllowOnly(vec![ - AccessControlAllowOrigin::Null, - AccessControlAllowOrigin::Any, - ])) - .event_loop_executor(handle.clone()) - .max_request_body_size(config.max_request_body_size) - .health_api(("/ping", "ping")) - .start_http( - &config - .listen_address - .to_socket_addrs() - .expect("config listen_address parsed") - .next() - .expect("config listen_address parsed"), - ) - .expect("Start Jsonrpc HTTP service"); - info!( - "Listen HTTP RPC server on address {}", - config.listen_address - ); - - let _tcp = config - .tcp_listen_address - .as_ref() - .map(|tcp_listen_address| { - let subscription_rpc_impl = - SubscriptionRpcImpl::new(notify_controller.clone(), handle.clone()); - let mut handler = io_handler.clone(); - if config.subscription_enable() { - handler.extend_with(subscription_rpc_impl.to_delegate()); - } - let tcp_server = jsonrpc_tcp_server::ServerBuilder::with_meta_extractor( - handler, - |context: &jsonrpc_tcp_server::RequestContext| { - Some(SubscriptionSession::new(Session::new( - context.sender.clone(), - ))) - }, - ) - .start( - &tcp_listen_address - .to_socket_addrs() - .expect("config tcp_listen_address parsed") - .next() - .expect("config tcp_listen_address parsed"), - ) - .expect("Start Jsonrpc TCP service"); - info!("Listen TCP RPC server on address {}", tcp_listen_address); - - tcp_server - }); + ) -> Result<(), String> { + let rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2); - let _ws = config.ws_listen_address.as_ref().map(|ws_listen_address| { - let subscription_rpc_impl = SubscriptionRpcImpl::new(notify_controller.clone(), handle); - let mut handler = io_handler.clone(); - if config.subscription_enable() { - handler.extend_with(subscription_rpc_impl.to_delegate()); - } - let ws_server = jsonrpc_ws_server::ServerBuilder::with_meta_extractor( - handler, - |context: &jsonrpc_ws_server::RequestContext| { - Some(SubscriptionSession::new(Session::new(context.sender()))) - }, - ) - .start( - &ws_listen_address - .to_socket_addrs() - .expect("config ws_listen_address parsed") - .next() - .expect("config ws_listen_address parsed"), - ) - .expect("Start Jsonrpc WebSocket service"); - info!("Listen WS RPC server on address {}", ws_listen_address); + let rpc = Arc::new(rpc); + let stream_config = StreamServerConfig::default() + .with_channel_size(4) + .with_pipeline_size(4); - ws_server + // HTTP and WS server. + let ws_config = stream_config.clone().with_keep_alive(true); + let app = jsonrpc_router("/", rpc.clone(), ws_config); + // You can use additional tower-http middlewares to add e.g. CORS. + let http = tokio::spawn(async move { + axum::Server::bind(&"0.0.0.0:3000".parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); }); - RpcServer { http, _tcp, _ws } - } + // TCP server. + + // TCP server with line delimited json codec. + // + // You can also use other transports (e.g. TLS, unix socket) and codecs + // (e.g. netstring, JSON splitter). + let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap(); + let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024); + while let Ok((s, _)) = listener.accept().await { + let rpc = rpc.clone(); + let stream_config = stream_config.clone(); + let codec = codec.clone(); + tokio::spawn(async move { + let (r, w) = s.into_split(); + let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str); + let w = FramedWrite::new(w, codec).with(|msg| async move { + Ok::<_, LinesCodecError>(match msg { + StreamMsg::Str(msg) => msg, + _ => "".into(), + }) + }); + tokio::pin!(w); + drop(serve_stream_sink(&rpc, w, r, stream_config).await); + }); + } - /// Gets the HTTP RPC endpoint. - pub fn http_address(&self) -> &SocketAddr { - self.http.address() + Ok(()) } } diff --git a/rpc/src/service_builder.rs b/rpc/src/service_builder.rs index 19372ce8a0..749d45014e 100644 --- a/rpc/src/service_builder.rs +++ b/rpc/src/service_builder.rs @@ -1,10 +1,11 @@ #![allow(deprecated)] use crate::error::RPCError; -use crate::module::SubscriptionSession; +//use crate::module::SubscriptionSession; use crate::module::{ - AlertRpc, AlertRpcImpl, ChainRpc, ChainRpcImpl, DebugRpc, DebugRpcImpl, ExperimentRpc, - ExperimentRpcImpl, IndexerRpc, IndexerRpcImpl, IntegrationTestRpc, IntegrationTestRpcImpl, - MinerRpc, MinerRpcImpl, NetRpc, NetRpcImpl, PoolRpc, PoolRpcImpl, StatsRpc, StatsRpcImpl, + add_alert_rpc_methods, add_chain_rpc_methods, add_integration_test_rpc_methods, AlertRpcImpl, + ChainRpcImpl, DebugRpc, DebugRpcImpl, ExperimentRpc, ExperimentRpcImpl, IndexerRpc, + IndexerRpcImpl, IntegrationTestRpcImpl, MinerRpc, MinerRpcImpl, NetRpc, NetRpcImpl, PoolRpc, + PoolRpcImpl, StatsRpc, StatsRpcImpl, }; use crate::IoHandler; use ckb_app_config::{DBConfig, IndexerConfig, RpcConfig}; @@ -17,6 +18,7 @@ use ckb_shared::shared::Shared; use ckb_sync::SyncShared; use ckb_types::packed::Script; use ckb_util::Mutex; +use jsonrpc_core::MetaIoHandler; use jsonrpc_core::RemoteProcedure; use std::sync::Arc; @@ -26,6 +28,7 @@ const DEPRECATED_RPC_PREFIX: &str = "deprecated."; pub struct ServiceBuilder<'a> { config: &'a RpcConfig, io_handler: IoHandler, + rpc_hander: MetaIoHandler>, } impl<'a> ServiceBuilder<'a> { @@ -34,20 +37,20 @@ impl<'a> ServiceBuilder<'a> { Self { config, io_handler: IoHandler::default(), + rpc_hander: MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2), } } /// Mounts methods from module Chain if it is enabled in the config. pub fn enable_chain(mut self, shared: Shared) -> Self { - let rpc_methods = ChainRpcImpl { shared }.to_delegate(); if self.config.chain_enable() { - self.add_methods(rpc_methods); - } else { - self.update_disabled_methods("Chain", rpc_methods); + let methods = ChainRpcImpl { shared }; + add_chain_rpc_methods(&mut self.rpc_hander, methods); } self } + /* /// Mounts methods from module Pool if it is enabled in the config. pub fn enable_pool( mut self, @@ -147,24 +150,21 @@ impl<'a> ServiceBuilder<'a> { network_controller: NetworkController, chain: ChainController, ) -> Self { - let rpc_methods = IntegrationTestRpcImpl { - shared: shared.clone(), - network_controller, - chain, - } - .to_delegate(); - if self.config.integration_test_enable() { // IntegrationTest only on Dummy PoW chain + /* assert_eq!( shared.consensus().pow, Pow::Dummy, "Only run integration test on Dummy PoW chain" ); - - self.add_methods(rpc_methods); - } else { - self.update_disabled_methods("IntegrationTest", rpc_methods); + */ + let methods = IntegrationTestRpcImpl { + shared: shared.clone(), + network_controller, + chain, + }; + add_integration_test_rpc_methods(&mut self.rpc_hander, methods); } self } @@ -176,12 +176,9 @@ impl<'a> ServiceBuilder<'a> { alert_notifier: Arc>, network_controller: NetworkController, ) -> Self { - let rpc_methods = - AlertRpcImpl::new(alert_verifier, alert_notifier, network_controller).to_delegate(); if self.config.alert_enable() { - self.add_methods(rpc_methods); - } else { - self.update_disabled_methods("Alert", rpc_methods); + let methods = AlertRpcImpl::new(alert_verifier, alert_notifier, network_controller); + add_alert_rpc_methods(&mut self.rpc_hander, methods); } self } @@ -252,6 +249,7 @@ impl<'a> ServiceBuilder<'a> { } })); } + */ /// Builds the RPC methods handler used in the RPC server. pub fn build(self) -> IoHandler { diff --git a/util/launcher/Cargo.toml b/util/launcher/Cargo.toml index 2c4d2b7dba..3c1a932785 100644 --- a/util/launcher/Cargo.toml +++ b/util/launcher/Cargo.toml @@ -32,6 +32,17 @@ ckb-channel = { path = "../channel", version = "= 0.113.0-pre" } ckb-tx-pool = { path = "../../tx-pool", version = "= 0.113.0-pre" } ckb-light-client-protocol-server = { path = "../light-client-protocol-server", version = "= 0.113.0-pre" } ckb-block-filter = { path = "../../block-filter", version = "= 0.113.0-pre" } +ckb-hash = { path = "../hash", version = "= 0.113.0-pre" } +num_cpus = "1.10" +once_cell = "1.8.0" +tempfile.workspace = true +quote = "1.0.27" +async-trait = "0.1" +http-body = "0.4.5" +jsonrpc-utils = { version = "0.2.0", features = ["server", "macros", "axum"] } + +[dev-dependencies] +ckb-systemtime = {path = "../systemtime", version = "= 0.113.0-pre", features = ["enable_faketime"] } [features] with_sentry = [ "ckb-sync/with_sentry", "ckb-network/with_sentry", "ckb-app-config/with_sentry" ] diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index c6ff62f5d7..e804d8d721 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -30,6 +30,9 @@ use ckb_tx_pool::service::TxVerificationResult; use ckb_types::prelude::*; use ckb_verification::GenesisVerifier; use ckb_verification_traits::Verifier; +use jsonrpc_utils::{ + axum_utils::jsonrpc_router, pub_sub::PublishMsg, rpc, stream::StreamServerConfig, +}; use std::sync::Arc; const SECP256K1_BLAKE160_SIGHASH_ALL_ARG_LEN: usize = 20; @@ -253,13 +256,13 @@ impl Launcher { } /// Start network service and rpc serve - pub fn start_network_and_rpc( + pub async fn start_network_and_rpc( &self, shared: &Shared, chain_controller: ChainController, miner_enable: bool, relay_tx_receiver: Receiver, - ) -> (NetworkController, RpcServer) { + ) -> NetworkController { let sync_shared = Arc::new(SyncShared::with_tmpdir( shared.clone(), self.args.config.network.sync.clone(), @@ -375,47 +378,48 @@ impl Launcher { .expect("Start network service failed"); let rpc_config = self.adjust_rpc_config(); - let builder = ServiceBuilder::new(&rpc_config) - .enable_chain(shared.clone()) - .enable_pool( - shared.clone(), - rpc_config - .extra_well_known_lock_scripts - .iter() - .map(|script| script.clone().into()) - .collect(), - rpc_config - .extra_well_known_type_scripts - .iter() - .map(|script| script.clone().into()) - .collect(), - ) - .enable_miner( - shared.clone(), - network_controller.clone(), - chain_controller.clone(), - miner_enable, - ) - .enable_net(network_controller.clone(), sync_shared) - .enable_stats(shared.clone(), Arc::clone(&alert_notifier)) - .enable_experiment(shared.clone()) - .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) - .enable_alert(alert_verifier, alert_notifier, network_controller.clone()) - .enable_indexer( - shared.clone(), - &self.args.config.db, - &self.args.config.indexer, - ) - .enable_debug(); + let builder = ServiceBuilder::new(&rpc_config).enable_chain(shared.clone()); + // .enable_pool( + // shared.clone(), + // rpc_config + // .extra_well_known_lock_scripts + // .iter() + // .map(|script| script.clone().into()) + // .collect(), + // rpc_config + // .extra_well_known_type_scripts + // .iter() + // .map(|script| script.clone().into()) + // .collect(), + // ) + // .enable_miner( + // shared.clone(), + // network_controller.clone(), + // chain_controller.clone(), + // miner_enable, + // ) + // .enable_net(network_controller.clone(), sync_shared) + // .enable_stats(shared.clone(), Arc::clone(&alert_notifier)) + // .enable_experiment(shared.clone()) + // .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) + // .enable_alert(alert_verifier, alert_notifier, network_controller.clone()) + // .enable_indexer( + // shared.clone(), + // &self.args.config.db, + // &self.args.config.indexer, + // ) + //.enable_debug(); let io_handler = builder.build(); - let rpc_server = RpcServer::new( + RpcServer::start_jsonrpc_server( rpc_config.clone(), io_handler, shared.notify_controller(), self.async_handle.clone().into_inner(), - ); + ) + .await + .expect("Start rpc server failed"); - (network_controller, rpc_server) + network_controller } } From c3eaec2f1e2895b0069f1056ac471f96b99d6cb9 Mon Sep 17 00:00:00 2001 From: yukang Date: Wed, 20 Sep 2023 04:55:04 +0800 Subject: [PATCH 02/31] make the ping router works --- ckb-bin/src/lib.rs | 7 +++-- ckb-bin/src/subcommand/run.rs | 49 ++++++++++++++++++++++++----------- rpc/src/module/alert.rs | 12 ++++++--- rpc/src/module/test.rs | 2 +- rpc/src/server.rs | 13 +++++----- rpc/src/service_builder.rs | 12 +++++---- src/main.rs | 4 ++- util/launcher/src/lib.rs | 14 +++++----- 8 files changed, 70 insertions(+), 43 deletions(-) diff --git a/ckb-bin/src/lib.rs b/ckb-bin/src/lib.rs index 825a95b097..efbdb5e482 100644 --- a/ckb-bin/src/lib.rs +++ b/ckb-bin/src/lib.rs @@ -6,7 +6,7 @@ mod setup_guard; mod subcommand; use ckb_app_config::{cli, ExitCode, Setup}; -use ckb_async_runtime::new_global_runtime; +use ckb_async_runtime::Handle; use ckb_build_info::Version; use ckb_logger::info; use ckb_network::tokio; @@ -25,7 +25,7 @@ pub(crate) const LOG_TARGET_SENTRY: &str = "sentry"; /// /// * `version` - The version is passed in so the bin crate can collect the version without trigger /// re-linking. -pub fn run_app(version: Version) -> Result<(), ExitCode> { +pub async fn run_app(version: Version, mut handle: Handle) -> Result<(), ExitCode> { // Always print backtrace on panic. ::std::env::set_var("RUST_BACKTRACE", "full"); @@ -58,14 +58,13 @@ pub fn run_app(version: Version) -> Result<(), ExitCode> { .expect("SubcommandRequiredElseHelp"); let is_silent_logging = is_silent_logging(cmd); - let (mut handle, mut handle_stop_rx, _runtime) = new_global_runtime(); let setup = Setup::from_matches(bin_name, cmd, matches)?; let _guard = SetupGuard::from_setup(&setup, &version, handle.clone(), is_silent_logging)?; raise_fd_limit(); let ret = match cmd { - cli::CMD_RUN => subcommand::run(setup.run(matches)?, version, handle.clone()), + cli::CMD_RUN => subcommand::run(setup.run(matches)?, version, handle.clone()).await, cli::CMD_MINER => subcommand::miner(setup.miner(matches)?, handle.clone()), cli::CMD_REPLAY => subcommand::replay(setup.replay(matches)?, handle.clone()), cli::CMD_EXPORT => subcommand::export(setup.export(matches)?, handle.clone()), diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index 12dbc4b3da..1b5bc48a97 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -1,6 +1,9 @@ use crate::helper::deadlock_detection; use ckb_app_config::{ExitCode, RunArgs}; -use ckb_async_runtime::{tokio, Handle}; +use ckb_async_runtime::{ + tokio::{self, spawn}, + Handle, +}; use ckb_build_info::Version; use ckb_launcher::Launcher; use ckb_logger::info; @@ -8,11 +11,20 @@ use ckb_stop_handler::{broadcast_exit_signals, wait_all_ckb_services_exit}; use ckb_types::core::cell::setup_system_cell_cache; -pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> { +// pub fn run( +// args: RunArgs, +// version: Version, +// async_handle: Handle, +// runtime: Runtime, +// ) -> Result<(), ExitCode> { +// //runtime.spawn_blocking(|| run_inner(args, version, async_handle)); +// runtime.block_on(run_inner(args, version, async_handle)) +// } + +pub async fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> { deadlock_detection(); info!("ckb version: {}", version); - let mut launcher = Launcher::new(args, version, async_handle); let block_assembler_config = launcher.sanitize_block_assembler_config()?; @@ -45,18 +57,25 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), launcher.start_block_filter(&shared); - let rt = tokio::runtime::Runtime::new().unwrap(); - - let network_controller = launcher.start_network_and_rpc( - &shared, - chain_controller.clone(), - miner_enable, - pack.take_relay_tx_receiver(), - ); - let network_controller = rt.block_on(network_controller); - - let tx_pool_builder = pack.take_tx_pool_builder(); - tx_pool_builder.start(network_controller.clone()); + let rpc_task = spawn(async move { + let network_controller = launcher + .start_network_and_rpc( + &shared, + chain_controller.clone(), + miner_enable, + pack.take_relay_tx_receiver(), + ) + .await; + eprintln!("network_controller begin to run ...."); + eprintln!("end network_controller run ...."); + + let tx_pool_builder = pack.take_tx_pool_builder(); + tx_pool_builder.start(network_controller.clone()); + }); + + tokio::select! { + _ = rpc_task => {}, + }; ctrlc::set_handler(|| { info!("Trapped exit signal, exiting..."); diff --git a/rpc/src/module/alert.rs b/rpc/src/module/alert.rs index a7ea069be6..a01ece7707 100644 --- a/rpc/src/module/alert.rs +++ b/rpc/src/module/alert.rs @@ -1,4 +1,5 @@ use crate::error::RPCError; +use async_trait::async_trait; use ckb_jsonrpc_types::Alert; use ckb_logger::error; use ckb_network::{NetworkController, SupportProtocols}; @@ -6,10 +7,8 @@ use ckb_network_alert::{notifier::Notifier as AlertNotifier, verifier::Verifier use ckb_types::{packed, prelude::*}; use ckb_util::Mutex; use jsonrpc_core::Result; -use std::sync::Arc; use jsonrpc_utils::rpc; -use async_trait::async_trait; - +use std::sync::Arc; /// RPC Module Alert for network alerts. /// @@ -71,6 +70,9 @@ pub trait AlertRpc { /// ``` #[rpc(name = "send_alert")] async fn send_alert(&self, alert: Alert) -> Result<()>; + + #[rpc(name = "hello")] + async fn hello(&self) -> Result; } #[derive(Clone)] @@ -126,4 +128,8 @@ impl AlertRpc for AlertRpcImpl { )), } } + + async fn hello(&self) -> Result { + Ok(format!("Hello, Yukang!")) + } } diff --git a/rpc/src/module/test.rs b/rpc/src/module/test.rs index 85fbc0340d..5390585ea8 100644 --- a/rpc/src/module/test.rs +++ b/rpc/src/module/test.rs @@ -611,7 +611,7 @@ impl IntegrationTestRpc for IntegrationTestRpcImpl { Ok(current_epoch.full_value().into()) } - fn notify_transaction(&self, tx: Transaction) -> Result { + async fn notify_transaction(&self, tx: Transaction) -> Result { let tx: packed::Transaction = tx.into(); let tx: core::TransactionView = tx.into_view(); let tx_pool = self.shared.tx_pool_controller(); diff --git a/rpc/src/server.rs b/rpc/src/server.rs index 775c204647..d6dbbae889 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -22,36 +22,35 @@ impl RpcServer { /// * `io_handler` - RPC methods handler. See [ServiceBuilder](../service_builder/struct.ServiceBuilder.html). /// * `notify_controller` - Controller emitting notifications. pub async fn start_jsonrpc_server( - config: RpcConfig, io_handler: IoHandler, notify_controller: &NotifyController, handle: Handle, ) -> Result<(), String> { - let rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2); - - let rpc = Arc::new(rpc); + let rpc = Arc::new(io_handler); let stream_config = StreamServerConfig::default() .with_channel_size(4) .with_pipeline_size(4); // HTTP and WS server. let ws_config = stream_config.clone().with_keep_alive(true); - let app = jsonrpc_router("/", rpc.clone(), ws_config); + let app = jsonrpc_router("/rpc", rpc.clone(), ws_config); // You can use additional tower-http middlewares to add e.g. CORS. let http = tokio::spawn(async move { - axum::Server::bind(&"0.0.0.0:3000".parse().unwrap()) + axum::Server::bind(&"0.0.0.0:8114".parse().unwrap()) .serve(app.into_make_service()) .await .unwrap(); }); + eprintln!("started http ..........."); + // TCP server. // TCP server with line delimited json codec. // // You can also use other transports (e.g. TLS, unix socket) and codecs // (e.g. netstring, JSON splitter). - let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap(); + let listener = TcpListener::bind("0.0.0.0:8116").await.unwrap(); let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024); while let Ok((s, _)) = listener.accept().await { let rpc = rpc.clone(); diff --git a/rpc/src/service_builder.rs b/rpc/src/service_builder.rs index 749d45014e..f9c90e5341 100644 --- a/rpc/src/service_builder.rs +++ b/rpc/src/service_builder.rs @@ -36,7 +36,7 @@ impl<'a> ServiceBuilder<'a> { pub fn new(config: &'a RpcConfig) -> Self { Self { config, - io_handler: IoHandler::default(), + io_handler: IoHandler::with_compatibility(jsonrpc_core::Compatibility::V2), rpc_hander: MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2), } } @@ -45,7 +45,7 @@ impl<'a> ServiceBuilder<'a> { pub fn enable_chain(mut self, shared: Shared) -> Self { if self.config.chain_enable() { let methods = ChainRpcImpl { shared }; - add_chain_rpc_methods(&mut self.rpc_hander, methods); + add_chain_rpc_methods(&mut self.io_handler, methods); } self } @@ -167,7 +167,7 @@ impl<'a> ServiceBuilder<'a> { add_integration_test_rpc_methods(&mut self.rpc_hander, methods); } self - } + }*/ /// Mounts methods from module Alert if it is enabled in the config. pub fn enable_alert( @@ -177,12 +177,14 @@ impl<'a> ServiceBuilder<'a> { network_controller: NetworkController, ) -> Self { if self.config.alert_enable() { + eprintln!("enable_alert ............."); let methods = AlertRpcImpl::new(alert_verifier, alert_notifier, network_controller); add_alert_rpc_methods(&mut self.rpc_hander, methods); } self } + /* /// Mounts methods from module Debug if it is enabled in the config. pub fn enable_debug(mut self) -> Self { if self.config.debug_enable() { @@ -254,8 +256,8 @@ impl<'a> ServiceBuilder<'a> { /// Builds the RPC methods handler used in the RPC server. pub fn build(self) -> IoHandler { let mut io_handler = self.io_handler; - io_handler.add_sync_method("ping", |_| Ok("pong".into())); - + io_handler.add_method("@ping", |_| async move { Ok("pong".into()) }); + eprintln!("build ............."); io_handler } } diff --git a/src/main.rs b/src/main.rs index 15f623d055..24fa39c5dd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,10 +5,12 @@ use ckb_build_info::Version; #[cfg(all(not(target_env = "msvc"), not(target_os = "macos")))] #[global_allocator] static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; +use ckb_async_runtime::new_global_runtime; fn main() { let version = get_version(); - if let Some(exit_code) = run_app(version).err() { + let (handle, _handle_stop_rx, runtime) = new_global_runtime(); + if let Some(exit_code) = runtime.block_on(run_app(version, handle)).err() { ::std::process::exit(exit_code.into()); } } diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index e804d8d721..3f48d29a62 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -20,7 +20,8 @@ use ckb_network::{ use ckb_network_alert::alert_relayer::AlertRelayer; use ckb_proposal_table::ProposalTable; use ckb_resource::Resource; -use ckb_rpc::{RpcServer, ServiceBuilder}; +use ckb_rpc::RpcServer; +use ckb_rpc::ServiceBuilder; use ckb_shared::Shared; use ckb_shared::shared_builder::{SharedBuilder, SharedPackage}; @@ -187,9 +188,9 @@ impl Launcher { &self, block_assembler_config: Option, ) -> Result<(Shared, SharedPackage), ExitCode> { - self.async_handle.block_on(observe_listen_port_occupancy( - &self.args.config.network.listen_addresses, - ))?; + // self.async_handle.block_on(observe_listen_port_occupancy( + // &self.args.config.network.listen_addresses, + // ))?; let shared_builder = SharedBuilder::new( &self.args.config.bin_name, @@ -378,7 +379,7 @@ impl Launcher { .expect("Start network service failed"); let rpc_config = self.adjust_rpc_config(); - let builder = ServiceBuilder::new(&rpc_config).enable_chain(shared.clone()); + let builder = ServiceBuilder::new(&rpc_config).enable_chain(shared.clone()) // .enable_pool( // shared.clone(), // rpc_config @@ -402,7 +403,7 @@ impl Launcher { // .enable_stats(shared.clone(), Arc::clone(&alert_notifier)) // .enable_experiment(shared.clone()) // .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) - // .enable_alert(alert_verifier, alert_notifier, network_controller.clone()) + .enable_alert(alert_verifier, alert_notifier, network_controller.clone()); // .enable_indexer( // shared.clone(), // &self.args.config.db, @@ -412,7 +413,6 @@ impl Launcher { let io_handler = builder.build(); RpcServer::start_jsonrpc_server( - rpc_config.clone(), io_handler, shared.notify_controller(), self.async_handle.clone().into_inner(), From ab2ed6cfb9157bba1bb9fed9144b871b5e2f649c Mon Sep 17 00:00:00 2001 From: yukang Date: Wed, 20 Sep 2023 12:23:44 +0800 Subject: [PATCH 03/31] fix rpc api router issues --- Cargo.lock | 2 ++ rpc/Cargo.toml | 1 + rpc/src/module/debug.rs | 30 ++++++++++++++------ rpc/src/module/miner.rs | 16 +++++++---- rpc/src/module/mod.rs | 2 ++ rpc/src/server.rs | 21 ++++++++++---- rpc/src/service_builder.rs | 57 ++++++++++++-------------------------- util/launcher/src/lib.rs | 21 ++++++-------- 8 files changed, 78 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2bbcc3093e..27ecec2e3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1366,6 +1366,7 @@ dependencies = [ "tempfile", "tokio", "tokio-util 0.7.10", + "tower-http", ] [[package]] @@ -5128,6 +5129,7 @@ dependencies = [ "http-body", "http-range-header", "pin-project-lite", + "tokio", "tower", "tower-layer", "tower-service", diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 3052cced23..f809109b38 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -51,6 +51,7 @@ jsonrpc-utils = { version = "0.2.0", features = ["server", "macros", "axum"] } axum = "0.6.1" tokio-util = { version = "0.7.3", features = ["codec"] } futures-util = { version = "0.3.21"} +tower-http = { version = "0.3.5", features = ["timeout"] } [dev-dependencies] diff --git a/rpc/src/module/debug.rs b/rpc/src/module/debug.rs index eabf49d87d..dd0bdd1f40 100644 --- a/rpc/src/module/debug.rs +++ b/rpc/src/module/debug.rs @@ -1,15 +1,16 @@ +use async_trait::async_trait; use ckb_jsonrpc_types::{ExtraLoggerConfig, MainLoggerConfig}; use ckb_logger_service::Logger; use jsonrpc_core::{Error, ErrorCode::InternalError, Result}; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; use std::time; - /// RPC Module Debug for internal RPC methods. /// /// **This module is for CKB developers and will not guarantee compatibility.** The methods here /// will be changed or removed without advanced notification. -#[rpc(server)] #[doc(hidden)] +#[rpc] +#[async_trait] pub trait DebugRpc { /// Dumps jemalloc memory profiling information into a file. /// @@ -17,10 +18,10 @@ pub trait DebugRpc { /// /// The RPC returns the path to the dumped file on success or returns an error on failure. #[rpc(name = "jemalloc_profiling_dump")] - fn jemalloc_profiling_dump(&self) -> Result; + async fn jemalloc_profiling_dump(&self) -> Result; /// Changes main logger config options while CKB is running. #[rpc(name = "update_main_logger")] - fn update_main_logger(&self, config: MainLoggerConfig) -> Result<()>; + async fn update_main_logger(&self, config: MainLoggerConfig) -> Result<()>; /// Sets logger config options for extra loggers. /// /// CKB nodes allow setting up extra loggers. These loggers will have their own log files and @@ -32,13 +33,20 @@ pub trait DebugRpc { /// * `config_opt` - Adds a new logger or update an existing logger when this is not null. /// Removes the logger when this is null. #[rpc(name = "set_extra_logger")] - fn set_extra_logger(&self, name: String, config_opt: Option) -> Result<()>; + async fn set_extra_logger( + &self, + name: String, + config_opt: Option, + ) -> Result<()>; } +#[derive(Clone)] pub(crate) struct DebugRpcImpl {} +#[async_trait] + impl DebugRpc for DebugRpcImpl { - fn jemalloc_profiling_dump(&self) -> Result { + async fn jemalloc_profiling_dump(&self) -> Result { let timestamp = time::SystemTime::now() .duration_since(time::SystemTime::UNIX_EPOCH) .unwrap() @@ -54,7 +62,7 @@ impl DebugRpc for DebugRpcImpl { } } - fn update_main_logger(&self, config: MainLoggerConfig) -> Result<()> { + async fn update_main_logger(&self, config: MainLoggerConfig) -> Result<()> { let MainLoggerConfig { filter, to_stdout, @@ -71,7 +79,11 @@ impl DebugRpc for DebugRpcImpl { }) } - fn set_extra_logger(&self, name: String, config_opt: Option) -> Result<()> { + async fn set_extra_logger( + &self, + name: String, + config_opt: Option, + ) -> Result<()> { if let Err(err) = Logger::check_extra_logger_name(&name) { return Err(Error { code: InternalError, diff --git a/rpc/src/module/miner.rs b/rpc/src/module/miner.rs index a334e88180..3b96bb0e37 100644 --- a/rpc/src/module/miner.rs +++ b/rpc/src/module/miner.rs @@ -1,4 +1,5 @@ use crate::error::RPCError; +use async_trait::async_trait; use ckb_chain::chain::ChainController; use ckb_jsonrpc_types::{Block, BlockTemplate, Uint64, Version}; use ckb_logger::{debug, error, info, warn}; @@ -9,7 +10,7 @@ use ckb_types::{core, packed, prelude::*, H256}; use ckb_verification::HeaderVerifier; use ckb_verification_traits::Verifier; use jsonrpc_core::{Error, Result}; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; use std::collections::HashSet; use std::fmt::Debug; use std::sync::Arc; @@ -18,7 +19,8 @@ use std::sync::Arc; /// /// A miner gets a template from CKB, optionally selects transactions, resolves the PoW puzzle, and /// submits the found new block. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait MinerRpc { /// Returns block template for miners. /// @@ -131,7 +133,7 @@ pub trait MinerRpc { /// } /// ``` #[rpc(name = "get_block_template")] - fn get_block_template( + async fn get_block_template( &self, bytes_limit: Option, proposals_limit: Option, @@ -220,17 +222,19 @@ pub trait MinerRpc { /// } /// ``` #[rpc(name = "submit_block")] - fn submit_block(&self, work_id: String, block: Block) -> Result; + async fn submit_block(&self, work_id: String, block: Block) -> Result; } +#[derive(Clone)] pub(crate) struct MinerRpcImpl { pub network_controller: NetworkController, pub shared: Shared, pub chain: ChainController, } +#[async_trait] impl MinerRpc for MinerRpcImpl { - fn get_block_template( + async fn get_block_template( &self, bytes_limit: Option, proposals_limit: Option, @@ -252,7 +256,7 @@ impl MinerRpc for MinerRpcImpl { }) } - fn submit_block(&self, work_id: String, block: Block) -> Result { + async fn submit_block(&self, work_id: String, block: Block) -> Result { let block: packed::Block = block.into(); let block: Arc = Arc::new(block.into_view()); let header = block.header(); diff --git a/rpc/src/module/mod.rs b/rpc/src/module/mod.rs index fb40cbb187..f79cbbb84a 100644 --- a/rpc/src/module/mod.rs +++ b/rpc/src/module/mod.rs @@ -138,9 +138,11 @@ pub use self::alert::add_alert_rpc_methods; pub use self::alert::AlertRpc; pub use self::chain::add_chain_rpc_methods; pub use self::chain::ChainRpc; +pub use self::debug::add_debug_rpc_methods; pub use self::debug::DebugRpc; pub use self::experiment::ExperimentRpc; pub use self::indexer::IndexerRpc; +pub use self::miner::add_miner_rpc_methods; pub use self::miner::MinerRpc; pub use self::net::NetRpc; pub use self::pool::PoolRpc; diff --git a/rpc/src/server.rs b/rpc/src/server.rs index d6dbbae889..ade1630297 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -1,14 +1,17 @@ use crate::IoHandler; -use ckb_app_config::RpcConfig; +use axum::routing::post; +use axum::{Extension, Router}; use ckb_notify::NotifyController; use futures_util::{SinkExt, TryStreamExt}; -use jsonrpc_core::MetaIoHandler; -use jsonrpc_utils::axum_utils::jsonrpc_router; +use jsonrpc_utils::axum_utils::{handle_jsonrpc, handle_jsonrpc_ws}; +use jsonrpc_utils::pub_sub::Session; use jsonrpc_utils::stream::{serve_stream_sink, StreamMsg, StreamServerConfig}; use std::sync::Arc; +use std::time::Duration; use tokio::net::TcpListener; use tokio::runtime::Handle; use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec, LinesCodecError}; +use tower_http::timeout::TimeoutLayer; #[doc(hidden)] pub struct RpcServer {} @@ -32,10 +35,18 @@ impl RpcServer { .with_pipeline_size(4); // HTTP and WS server. + let method_router = + post(handle_jsonrpc::>).get(handle_jsonrpc_ws::>); let ws_config = stream_config.clone().with_keep_alive(true); - let app = jsonrpc_router("/rpc", rpc.clone(), ws_config); + let app = Router::new() + .route("/", method_router.clone()) + .route("/*path", method_router) + .layer(Extension(rpc.clone())) + .layer(Extension(ws_config)) + .layer(TimeoutLayer::new(Duration::from_secs(30))); + // You can use additional tower-http middlewares to add e.g. CORS. - let http = tokio::spawn(async move { + let _http = tokio::spawn(async move { axum::Server::bind(&"0.0.0.0:8114".parse().unwrap()) .serve(app.into_make_service()) .await diff --git a/rpc/src/service_builder.rs b/rpc/src/service_builder.rs index f9c90e5341..a804564d57 100644 --- a/rpc/src/service_builder.rs +++ b/rpc/src/service_builder.rs @@ -1,11 +1,11 @@ #![allow(deprecated)] -use crate::error::RPCError; //use crate::module::SubscriptionSession; use crate::module::{ - add_alert_rpc_methods, add_chain_rpc_methods, add_integration_test_rpc_methods, AlertRpcImpl, - ChainRpcImpl, DebugRpc, DebugRpcImpl, ExperimentRpc, ExperimentRpcImpl, IndexerRpc, - IndexerRpcImpl, IntegrationTestRpcImpl, MinerRpc, MinerRpcImpl, NetRpc, NetRpcImpl, PoolRpc, - PoolRpcImpl, StatsRpc, StatsRpcImpl, + add_alert_rpc_methods, add_chain_rpc_methods, add_debug_rpc_methods, + add_integration_test_rpc_methods, add_miner_rpc_methods, AlertRpcImpl, ChainRpcImpl, DebugRpc, + DebugRpcImpl, ExperimentRpc, ExperimentRpcImpl, IndexerRpc, IndexerRpcImpl, + IntegrationTestRpcImpl, MinerRpc, MinerRpcImpl, NetRpc, NetRpcImpl, PoolRpc, PoolRpcImpl, + StatsRpc, StatsRpcImpl, }; use crate::IoHandler; use ckb_app_config::{DBConfig, IndexerConfig, RpcConfig}; @@ -28,7 +28,6 @@ const DEPRECATED_RPC_PREFIX: &str = "deprecated."; pub struct ServiceBuilder<'a> { config: &'a RpcConfig, io_handler: IoHandler, - rpc_hander: MetaIoHandler>, } impl<'a> ServiceBuilder<'a> { @@ -37,7 +36,6 @@ impl<'a> ServiceBuilder<'a> { Self { config, io_handler: IoHandler::with_compatibility(jsonrpc_core::Compatibility::V2), - rpc_hander: MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2), } } @@ -71,6 +69,7 @@ impl<'a> ServiceBuilder<'a> { } self } + */ /// Mounts methods from module Miner if `enable` is `true` and it is enabled in the config. pub fn enable_miner( @@ -84,16 +83,16 @@ impl<'a> ServiceBuilder<'a> { shared, chain, network_controller, - } - .to_delegate(); + }; if enable && self.config.miner_enable() { - self.add_methods(rpc_methods); + add_miner_rpc_methods(&mut self.io_handler, rpc_methods); } else { - self.update_disabled_methods("Miner", rpc_methods); + //self.update_disabled_methods("Miner", rpc_methods); } self } + /* /// Mounts methods from module Net if it is enabled in the config. pub fn enable_net( mut self, @@ -142,6 +141,7 @@ impl<'a> ServiceBuilder<'a> { } self } + */ /// Mounts methods from module Integration if it is enabled in the config. pub fn enable_integration_test( @@ -164,10 +164,10 @@ impl<'a> ServiceBuilder<'a> { network_controller, chain, }; - add_integration_test_rpc_methods(&mut self.rpc_hander, methods); + add_integration_test_rpc_methods(&mut self.io_handler, methods); } self - }*/ + } /// Mounts methods from module Alert if it is enabled in the config. pub fn enable_alert( @@ -179,20 +179,21 @@ impl<'a> ServiceBuilder<'a> { if self.config.alert_enable() { eprintln!("enable_alert ............."); let methods = AlertRpcImpl::new(alert_verifier, alert_notifier, network_controller); - add_alert_rpc_methods(&mut self.rpc_hander, methods); + add_alert_rpc_methods(&mut self.io_handler, methods); } self } - /* /// Mounts methods from module Debug if it is enabled in the config. pub fn enable_debug(mut self) -> Self { if self.config.debug_enable() { - self.io_handler.extend_with(DebugRpcImpl {}.to_delegate()); + //self.io_handler.extend_with(DebugRpcImpl {}.to_delegate()); + add_debug_rpc_methods(&mut self.io_handler, DebugRpcImpl {}); } self } + /* /// Mounts methods from module Indexer if it is enabled in the config. pub fn enable_indexer( mut self, @@ -227,30 +228,6 @@ impl<'a> ServiceBuilder<'a> { ) }); } - - fn add_methods(&mut self, rpc_methods: I) - where - I: IntoIterator>)>, - { - let enable_deprecated_rpc = self.config.enable_deprecated_rpc; - self.io_handler - .extend_with(rpc_methods.into_iter().map(|(name, method)| { - if let Some(deprecated_method_name) = name.strip_prefix(DEPRECATED_RPC_PREFIX) { - ( - deprecated_method_name.to_owned(), - if enable_deprecated_rpc { - method - } else { - RemoteProcedure::Method(Arc::new(|_param, _meta| async { - Err(RPCError::rpc_method_is_deprecated()) - })) - }, - ) - } else { - (name, method) - } - })); - } */ /// Builds the RPC methods handler used in the RPC server. diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index 3f48d29a62..8be0d9757b 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -31,9 +31,6 @@ use ckb_tx_pool::service::TxVerificationResult; use ckb_types::prelude::*; use ckb_verification::GenesisVerifier; use ckb_verification_traits::Verifier; -use jsonrpc_utils::{ - axum_utils::jsonrpc_router, pub_sub::PublishMsg, rpc, stream::StreamServerConfig, -}; use std::sync::Arc; const SECP256K1_BLAKE160_SIGHASH_ALL_ARG_LEN: usize = 20; @@ -393,23 +390,23 @@ impl Launcher { // .map(|script| script.clone().into()) // .collect(), // ) - // .enable_miner( - // shared.clone(), - // network_controller.clone(), - // chain_controller.clone(), - // miner_enable, - // ) + .enable_miner( + shared.clone(), + network_controller.clone(), + chain_controller.clone(), + miner_enable, + ) // .enable_net(network_controller.clone(), sync_shared) // .enable_stats(shared.clone(), Arc::clone(&alert_notifier)) // .enable_experiment(shared.clone()) - // .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) - .enable_alert(alert_verifier, alert_notifier, network_controller.clone()); + .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) + .enable_alert(alert_verifier, alert_notifier, network_controller.clone()) // .enable_indexer( // shared.clone(), // &self.args.config.db, // &self.args.config.indexer, // ) - //.enable_debug(); + .enable_debug(); let io_handler = builder.build(); RpcServer::start_jsonrpc_server( From 5a372b4a17732c69330ccf75e85042ab7598e401 Mon Sep 17 00:00:00 2001 From: yukang Date: Wed, 20 Sep 2023 17:29:08 +0800 Subject: [PATCH 04/31] migrate more rpc apis to jsonrpc_utils --- rpc/src/lib.rs | 2 +- rpc/src/module/experiment.rs | 16 +++++---- rpc/src/module/indexer.rs | 24 ++++++++------ rpc/src/module/mod.rs | 5 +++ rpc/src/module/net.rs | 48 ++++++++++++++------------- rpc/src/module/pool.rs | 32 ++++++++++-------- rpc/src/module/stats.rs | 16 +++++---- rpc/src/server.rs | 4 +-- rpc/src/service_builder.rs | 55 ++++++++++++------------------- util/indexer/src/service.rs | 1 + util/launcher/src/lib.rs | 63 ++++++++++++++++++------------------ 11 files changed, 139 insertions(+), 127 deletions(-) diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 9dbb12bbb0..812df193ee 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -18,4 +18,4 @@ pub use crate::server::RpcServer; pub use crate::service_builder::ServiceBuilder; #[doc(hidden)] -pub type IoHandler = MetaIoHandler>; \ No newline at end of file +pub type IoHandler = MetaIoHandler>; diff --git a/rpc/src/module/experiment.rs b/rpc/src/module/experiment.rs index 0486c9009a..6f8491b600 100644 --- a/rpc/src/module/experiment.rs +++ b/rpc/src/module/experiment.rs @@ -1,5 +1,6 @@ use crate::error::RPCError; use crate::module::chain::CyclesEstimator; +use async_trait::async_trait; use ckb_dao::DaoCalculator; use ckb_jsonrpc_types::{ Capacity, DaoWithdrawingCalculationKind, EstimateCycles, OutPoint, Transaction, @@ -8,14 +9,15 @@ use ckb_shared::{shared::Shared, Snapshot}; use ckb_store::ChainStore; use ckb_types::{core, packed, prelude::*}; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; /// RPC Module Experiment for experimenting methods. /// /// **EXPERIMENTAL warning** /// /// The methods here may be removed or changed in future releases without prior notifications. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait ExperimentRpc { /// Dry run a transaction and return the execution cycles. /// @@ -98,7 +100,7 @@ pub trait ExperimentRpc { note = "Please use the RPC method [`estimate_cycles`](#tymethod.estimate_cycles) instead" )] #[rpc(name = "dry_run_transaction")] - fn dry_run_transaction(&self, tx: Transaction) -> Result; + async fn dry_run_transaction(&self, tx: Transaction) -> Result; /// Calculates the maximum withdrawal one can get, given a referenced DAO cell, and /// a withdrawing block hash. @@ -155,24 +157,26 @@ pub trait ExperimentRpc { /// } /// ``` #[rpc(name = "calculate_dao_maximum_withdraw")] - fn calculate_dao_maximum_withdraw( + async fn calculate_dao_maximum_withdraw( &self, out_point: OutPoint, kind: DaoWithdrawingCalculationKind, ) -> Result; } +#[derive(Clone)] pub(crate) struct ExperimentRpcImpl { pub shared: Shared, } +#[async_trait] impl ExperimentRpc for ExperimentRpcImpl { - fn dry_run_transaction(&self, tx: Transaction) -> Result { + async fn dry_run_transaction(&self, tx: Transaction) -> Result { let tx: packed::Transaction = tx.into(); CyclesEstimator::new(&self.shared).run(tx) } - fn calculate_dao_maximum_withdraw( + async fn calculate_dao_maximum_withdraw( &self, out_point: OutPoint, kind: DaoWithdrawingCalculationKind, diff --git a/rpc/src/module/indexer.rs b/rpc/src/module/indexer.rs index ea7578b9e3..7d6cfcbd9b 100644 --- a/rpc/src/module/indexer.rs +++ b/rpc/src/module/indexer.rs @@ -1,14 +1,16 @@ use crate::error::RPCError; +use async_trait::async_trait; use ckb_indexer::IndexerHandle; use ckb_jsonrpc_types::{ IndexerCell, IndexerCellsCapacity, IndexerOrder, IndexerPagination, IndexerSearchKey, IndexerTip, IndexerTx, JsonBytes, Uint32, }; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; /// RPC Module Indexer. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait IndexerRpc { /// Returns the indexed tip /// @@ -41,7 +43,7 @@ pub trait IndexerRpc { /// } /// ``` #[rpc(name = "get_indexer_tip")] - fn get_indexer_tip(&self) -> Result>; + async fn get_indexer_tip(&self) -> Result>; /// Returns the live cells collection by the lock or type script. /// @@ -389,7 +391,7 @@ pub trait IndexerRpc { /// } /// ``` #[rpc(name = "get_cells")] - fn get_cells( + async fn get_cells( &self, search_key: IndexerSearchKey, order: IndexerOrder, @@ -804,7 +806,7 @@ pub trait IndexerRpc { /// } /// ``` #[rpc(name = "get_transactions")] - fn get_transactions( + async fn get_transactions( &self, search_key: IndexerSearchKey, order: IndexerOrder, @@ -867,12 +869,13 @@ pub trait IndexerRpc { /// } /// ``` #[rpc(name = "get_cells_capacity")] - fn get_cells_capacity( + async fn get_cells_capacity( &self, search_key: IndexerSearchKey, ) -> Result>; } +#[derive(Clone)] pub(crate) struct IndexerRpcImpl { pub(crate) handle: IndexerHandle, } @@ -883,14 +886,15 @@ impl IndexerRpcImpl { } } +#[async_trait] impl IndexerRpc for IndexerRpcImpl { - fn get_indexer_tip(&self) -> Result> { + async fn get_indexer_tip(&self) -> Result> { self.handle .get_indexer_tip() .map_err(|e| RPCError::custom(RPCError::Indexer, e)) } - fn get_cells( + async fn get_cells( &self, search_key: IndexerSearchKey, order: IndexerOrder, @@ -902,7 +906,7 @@ impl IndexerRpc for IndexerRpcImpl { .map_err(|e| RPCError::custom(RPCError::Indexer, e)) } - fn get_transactions( + async fn get_transactions( &self, search_key: IndexerSearchKey, order: IndexerOrder, @@ -914,7 +918,7 @@ impl IndexerRpc for IndexerRpcImpl { .map_err(|e| RPCError::custom(RPCError::Indexer, e)) } - fn get_cells_capacity( + async fn get_cells_capacity( &self, search_key: IndexerSearchKey, ) -> Result> { diff --git a/rpc/src/module/mod.rs b/rpc/src/module/mod.rs index f79cbbb84a..0764848246 100644 --- a/rpc/src/module/mod.rs +++ b/rpc/src/module/mod.rs @@ -140,12 +140,17 @@ pub use self::chain::add_chain_rpc_methods; pub use self::chain::ChainRpc; pub use self::debug::add_debug_rpc_methods; pub use self::debug::DebugRpc; +pub use self::experiment::add_experiment_rpc_methods; pub use self::experiment::ExperimentRpc; +pub use self::indexer::add_indexer_rpc_methods; pub use self::indexer::IndexerRpc; pub use self::miner::add_miner_rpc_methods; pub use self::miner::MinerRpc; +pub use self::net::add_net_rpc_methods; pub use self::net::NetRpc; +pub use self::pool::add_pool_rpc_methods; pub use self::pool::PoolRpc; +pub use self::stats::add_stats_rpc_methods; pub use self::stats::StatsRpc; pub use self::test::add_integration_test_rpc_methods; //pub use self::subscription::SubscriptionRpc; diff --git a/rpc/src/module/net.rs b/rpc/src/module/net.rs index bef8e6d91c..602b2b85d6 100644 --- a/rpc/src/module/net.rs +++ b/rpc/src/module/net.rs @@ -1,4 +1,5 @@ use crate::error::RPCError; +use async_trait::async_trait; use ckb_jsonrpc_types::{ BannedAddr, LocalNode, LocalNodeProtocol, NodeAddress, PeerSyncState, RemoteNode, RemoteNodeProtocol, SyncState, Timestamp, @@ -7,14 +8,15 @@ use ckb_network::{extract_peer_id, multiaddr::Multiaddr, NetworkController}; use ckb_sync::SyncShared; use ckb_systemtime::unix_time_as_millis; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; use std::sync::Arc; const MAX_ADDRS: usize = 50; const DEFAULT_BAN_DURATION: u64 = 24 * 60 * 60 * 1000; // 1 day /// RPC Module Net for P2P network. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait NetRpc { /// Returns the local node information. /// @@ -74,7 +76,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "local_node_info")] - fn local_node_info(&self) -> Result; + async fn local_node_info(&self) -> Result; /// Returns the connected peers' information. /// @@ -218,7 +220,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "get_peers")] - fn get_peers(&self) -> Result>; + async fn get_peers(&self) -> Result>; /// Returns all banned IPs/Subnets. /// @@ -252,7 +254,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "get_banned_addresses")] - fn get_banned_addresses(&self) -> Result>; + async fn get_banned_addresses(&self) -> Result>; /// Clears all banned IPs/Subnets. /// @@ -279,7 +281,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "clear_banned_addresses")] - fn clear_banned_addresses(&self) -> Result<()>; + async fn clear_banned_addresses(&self) -> Result<()>; /// Inserts or deletes an IP/Subnet from the banned list /// @@ -328,7 +330,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "set_ban")] - fn set_ban( + async fn set_ban( &self, address: String, command: String, @@ -371,7 +373,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "sync_state")] - fn sync_state(&self) -> Result; + async fn sync_state(&self) -> Result; /// Disable/enable all p2p network activity /// @@ -404,7 +406,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "set_network_active")] - fn set_network_active(&self, state: bool) -> Result<()>; + async fn set_network_active(&self, state: bool) -> Result<()>; /// Attempts to add a node to the peers list and try connecting to it. /// @@ -464,7 +466,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "add_node")] - fn add_node(&self, peer_id: String, address: String) -> Result<()>; + async fn add_node(&self, peer_id: String, address: String) -> Result<()>; /// Attempts to remove a node from the peers list and try disconnecting from it. /// @@ -501,7 +503,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "remove_node")] - fn remove_node(&self, peer_id: String) -> Result<()>; + async fn remove_node(&self, peer_id: String) -> Result<()>; /// Requests that a ping is sent to all connected peers, to measure ping time. /// @@ -528,16 +530,18 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "ping_peers")] - fn ping_peers(&self) -> Result<()>; + async fn ping_peers(&self) -> Result<()>; } +#[derive(Clone)] pub(crate) struct NetRpcImpl { pub network_controller: NetworkController, pub sync_shared: Arc, } +#[async_trait] impl NetRpc for NetRpcImpl { - fn local_node_info(&self) -> Result { + async fn local_node_info(&self) -> Result { Ok(LocalNode { version: self.network_controller.version().to_owned(), node_id: self.network_controller.node_id(), @@ -565,7 +569,7 @@ impl NetRpc for NetRpcImpl { }) } - fn get_peers(&self) -> Result> { + async fn get_peers(&self) -> Result> { let peers: Vec = self .network_controller .connected_peers() @@ -652,7 +656,7 @@ impl NetRpc for NetRpcImpl { Ok(peers) } - fn get_banned_addresses(&self) -> Result> { + async fn get_banned_addresses(&self) -> Result> { Ok(self .network_controller .get_banned_addrs() @@ -666,12 +670,12 @@ impl NetRpc for NetRpcImpl { .collect()) } - fn clear_banned_addresses(&self) -> Result<()> { + async fn clear_banned_addresses(&self) -> Result<()> { self.network_controller.clear_banned_addrs(); Ok(()) } - fn set_ban( + async fn set_ban( &self, address: String, command: String, @@ -709,7 +713,7 @@ impl NetRpc for NetRpcImpl { } } - fn sync_state(&self) -> Result { + async fn sync_state(&self) -> Result { let chain = self.sync_shared.active_chain(); let state = chain.shared().state(); let (fast_time, normal_time, low_time) = state.read_inflight_blocks().division_point(); @@ -729,12 +733,12 @@ impl NetRpc for NetRpcImpl { Ok(sync_state) } - fn set_network_active(&self, state: bool) -> Result<()> { + async fn set_network_active(&self, state: bool) -> Result<()> { self.network_controller.set_active(state); Ok(()) } - fn add_node(&self, peer_id: String, address: String) -> Result<()> { + async fn add_node(&self, peer_id: String, address: String) -> Result<()> { if let Ok(multiaddr) = address.parse::() { if extract_peer_id(&multiaddr).is_some() { self.network_controller.add_node(multiaddr) @@ -745,14 +749,14 @@ impl NetRpc for NetRpcImpl { Ok(()) } - fn remove_node(&self, peer_id: String) -> Result<()> { + async fn remove_node(&self, peer_id: String) -> Result<()> { if let Ok(id) = peer_id.parse() { self.network_controller.remove_node(&id) } Ok(()) } - fn ping_peers(&self) -> Result<()> { + async fn ping_peers(&self) -> Result<()> { self.network_controller.ping_peers(); Ok(()) } diff --git a/rpc/src/module/pool.rs b/rpc/src/module/pool.rs index cdad05f947..05e11ef3e7 100644 --- a/rpc/src/module/pool.rs +++ b/rpc/src/module/pool.rs @@ -1,4 +1,5 @@ use crate::error::RPCError; +use async_trait::async_trait; use ckb_chain_spec::consensus::Consensus; use ckb_constant::hardfork::{mainnet, testnet}; use ckb_jsonrpc_types::{ @@ -9,11 +10,12 @@ use ckb_shared::shared::Shared; use ckb_types::{core, packed, prelude::*, H256}; use ckb_verification::{Since, SinceMetric}; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; use std::sync::Arc; /// RPC Module Pool for transaction memory pool. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait PoolRpc { /// Submits a new transaction into the transaction pool. If the transaction is already in the /// pool, rebroadcast it to peers. @@ -101,7 +103,7 @@ pub trait PoolRpc { /// } /// ``` #[rpc(name = "send_transaction")] - fn send_transaction( + async fn send_transaction( &self, tx: Transaction, outputs_validator: Option, @@ -142,7 +144,7 @@ pub trait PoolRpc { /// } /// ``` #[rpc(name = "remove_transaction")] - fn remove_transaction(&self, tx_hash: H256) -> Result; + async fn remove_transaction(&self, tx_hash: H256) -> Result; /// Returns the transaction pool information. /// @@ -182,7 +184,7 @@ pub trait PoolRpc { /// } /// ``` #[rpc(name = "tx_pool_info")] - fn tx_pool_info(&self) -> Result; + async fn tx_pool_info(&self) -> Result; /// Removes all transactions from the transaction pool. /// @@ -209,7 +211,7 @@ pub trait PoolRpc { /// } /// ``` #[rpc(name = "clear_tx_pool")] - fn clear_tx_pool(&self) -> Result<()>; + async fn clear_tx_pool(&self) -> Result<()>; /// Returns all transaction ids in tx pool as a json array of string transaction ids. /// ## Params @@ -253,7 +255,7 @@ pub trait PoolRpc { /// } /// ``` #[rpc(name = "get_raw_tx_pool")] - fn get_raw_tx_pool(&self, verbose: Option) -> Result; + async fn get_raw_tx_pool(&self, verbose: Option) -> Result; /// Query and returns the details of a transaction in the pool, only for trouble shooting /// ## Params @@ -321,9 +323,10 @@ pub trait PoolRpc { /// } /// ``` #[rpc(name = "tx_pool_ready")] - fn tx_pool_ready(&self) -> Result; + async fn tx_pool_ready(&self) -> Result; } +#[derive(Clone)] pub(crate) struct PoolRpcImpl { shared: Shared, well_known_lock_scripts: Vec, @@ -428,13 +431,14 @@ fn build_well_known_type_scripts(chain_spec_name: &str) -> Vec { }).expect("checked json str").into_iter().map(Into::into).collect() } +#[async_trait] impl PoolRpc for PoolRpcImpl { - fn tx_pool_ready(&self) -> Result { + async fn tx_pool_ready(&self) -> Result { let tx_pool = self.shared.tx_pool_controller(); Ok(tx_pool.service_started()) } - fn send_transaction( + async fn send_transaction( &self, tx: Transaction, outputs_validator: Option, @@ -477,7 +481,7 @@ impl PoolRpc for PoolRpcImpl { } } - fn remove_transaction(&self, tx_hash: H256) -> Result { + async fn remove_transaction(&self, tx_hash: H256) -> Result { let tx_pool = self.shared.tx_pool_controller(); tx_pool.remove_local_tx(tx_hash.pack()).map_err(|e| { @@ -486,7 +490,7 @@ impl PoolRpc for PoolRpcImpl { }) } - fn tx_pool_info(&self) -> Result { + async fn tx_pool_info(&self) -> Result { let tx_pool = self.shared.tx_pool_controller(); let get_tx_pool_info = tx_pool.get_tx_pool_info(); if let Err(e) = get_tx_pool_info { @@ -499,7 +503,7 @@ impl PoolRpc for PoolRpcImpl { Ok(tx_pool_info.into()) } - fn clear_tx_pool(&self) -> Result<()> { + async fn clear_tx_pool(&self) -> Result<()> { let snapshot = Arc::clone(&self.shared.snapshot()); let tx_pool = self.shared.tx_pool_controller(); tx_pool @@ -509,7 +513,7 @@ impl PoolRpc for PoolRpcImpl { Ok(()) } - fn get_raw_tx_pool(&self, verbose: Option) -> Result { + async fn get_raw_tx_pool(&self, verbose: Option) -> Result { let tx_pool = self.shared.tx_pool_controller(); let raw = if verbose.unwrap_or(false) { diff --git a/rpc/src/module/stats.rs b/rpc/src/module/stats.rs index b4162b0e40..c9591cda9f 100644 --- a/rpc/src/module/stats.rs +++ b/rpc/src/module/stats.rs @@ -1,3 +1,4 @@ +use async_trait::async_trait; use ckb_jsonrpc_types::{AlertMessage, ChainInfo, DeploymentInfo, DeploymentPos, DeploymentsInfo}; use ckb_network_alert::notifier::Notifier as AlertNotifier; use ckb_shared::shared::Shared; @@ -5,12 +6,13 @@ use ckb_traits::HeaderFieldsProvider; use ckb_types::prelude::Unpack; use ckb_util::Mutex; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; use std::collections::BTreeMap; use std::sync::Arc; /// RPC Module Stats for getting various statistic data. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait StatsRpc { /// Returns statistics about the chain. /// @@ -51,7 +53,7 @@ pub trait StatsRpc { /// } /// ``` #[rpc(name = "get_blockchain_info")] - fn get_blockchain_info(&self) -> Result; + async fn get_blockchain_info(&self) -> Result; /// Returns statistics about the chain. /// @@ -96,16 +98,18 @@ pub trait StatsRpc { /// } /// ``` #[rpc(name = "get_deployments_info")] - fn get_deployments_info(&self) -> Result; + async fn get_deployments_info(&self) -> Result; } +#[derive(Clone)] pub(crate) struct StatsRpcImpl { pub shared: Shared, pub alert_notifier: Arc>, } +#[async_trait] impl StatsRpc for StatsRpcImpl { - fn get_blockchain_info(&self) -> Result { + async fn get_blockchain_info(&self) -> Result { let chain = self.shared.consensus().id.clone(); let (tip_header, median_time) = { let snapshot = self.shared.snapshot(); @@ -147,7 +151,7 @@ impl StatsRpc for StatsRpcImpl { }) } - fn get_deployments_info(&self) -> Result { + async fn get_deployments_info(&self) -> Result { let snapshot = self.shared.snapshot(); let deployments: BTreeMap = self .shared diff --git a/rpc/src/server.rs b/rpc/src/server.rs index ade1630297..f83e197728 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -26,8 +26,8 @@ impl RpcServer { /// * `notify_controller` - Controller emitting notifications. pub async fn start_jsonrpc_server( io_handler: IoHandler, - notify_controller: &NotifyController, - handle: Handle, + _notify_controller: &NotifyController, + _handle: Handle, ) -> Result<(), String> { let rpc = Arc::new(io_handler); let stream_config = StreamServerConfig::default() diff --git a/rpc/src/service_builder.rs b/rpc/src/service_builder.rs index a804564d57..4d0b540401 100644 --- a/rpc/src/service_builder.rs +++ b/rpc/src/service_builder.rs @@ -2,12 +2,12 @@ //use crate::module::SubscriptionSession; use crate::module::{ add_alert_rpc_methods, add_chain_rpc_methods, add_debug_rpc_methods, - add_integration_test_rpc_methods, add_miner_rpc_methods, AlertRpcImpl, ChainRpcImpl, DebugRpc, - DebugRpcImpl, ExperimentRpc, ExperimentRpcImpl, IndexerRpc, IndexerRpcImpl, - IntegrationTestRpcImpl, MinerRpc, MinerRpcImpl, NetRpc, NetRpcImpl, PoolRpc, PoolRpcImpl, - StatsRpc, StatsRpcImpl, + add_experiment_rpc_methods, add_indexer_rpc_methods, add_integration_test_rpc_methods, + add_miner_rpc_methods, add_net_rpc_methods, add_pool_rpc_methods, add_stats_rpc_methods, + AlertRpcImpl, ChainRpcImpl, DebugRpcImpl, ExperimentRpcImpl, IndexerRpcImpl, + IntegrationTestRpcImpl, MinerRpcImpl, NetRpcImpl, PoolRpcImpl, StatsRpcImpl, }; -use crate::IoHandler; +use crate::{IoHandler, RPCError}; use ckb_app_config::{DBConfig, IndexerConfig, RpcConfig}; use ckb_chain::chain::ChainController; use ckb_indexer::IndexerService; @@ -18,8 +18,6 @@ use ckb_shared::shared::Shared; use ckb_sync::SyncShared; use ckb_types::packed::Script; use ckb_util::Mutex; -use jsonrpc_core::MetaIoHandler; -use jsonrpc_core::RemoteProcedure; use std::sync::Arc; const DEPRECATED_RPC_PREFIX: &str = "deprecated."; @@ -48,7 +46,6 @@ impl<'a> ServiceBuilder<'a> { self } - /* /// Mounts methods from module Pool if it is enabled in the config. pub fn enable_pool( mut self, @@ -60,16 +57,14 @@ impl<'a> ServiceBuilder<'a> { shared, extra_well_known_lock_scripts, extra_well_known_type_scripts, - ) - .to_delegate(); + ); if self.config.pool_enable() { - self.add_methods(rpc_methods); + add_pool_rpc_methods(&mut self.io_handler, rpc_methods); } else { - self.update_disabled_methods("Pool", rpc_methods); + //self.update_disabled_methods("Pool", rpc_methods); } self } - */ /// Mounts methods from module Miner if `enable` is `true` and it is enabled in the config. pub fn enable_miner( @@ -92,7 +87,6 @@ impl<'a> ServiceBuilder<'a> { self } - /* /// Mounts methods from module Net if it is enabled in the config. pub fn enable_net( mut self, @@ -102,12 +96,11 @@ impl<'a> ServiceBuilder<'a> { let rpc_methods = NetRpcImpl { network_controller, sync_shared, - } - .to_delegate(); + }; if self.config.net_enable() { - self.add_methods(rpc_methods); + add_net_rpc_methods(&mut self.io_handler, rpc_methods); } else { - self.update_disabled_methods("Net", rpc_methods); + //self.update_disabled_methods("Net", rpc_methods); } self } @@ -121,27 +114,25 @@ impl<'a> ServiceBuilder<'a> { let rpc_methods = StatsRpcImpl { shared, alert_notifier, - } - .to_delegate(); + }; if self.config.stats_enable() { - self.add_methods(rpc_methods); + add_stats_rpc_methods(&mut self.io_handler, rpc_methods); } else { - self.update_disabled_methods("Stats", rpc_methods); + //self.update_disabled_methods("Stats", rpc_methods); } self } /// Mounts methods from module Experiment if it is enabled in the config. pub fn enable_experiment(mut self, shared: Shared) -> Self { - let rpc_methods = ExperimentRpcImpl { shared }.to_delegate(); + let rpc_methods = ExperimentRpcImpl { shared }; if self.config.experiment_enable() { - self.add_methods(rpc_methods); + add_experiment_rpc_methods(&mut self.io_handler, rpc_methods); } else { - self.update_disabled_methods("Experiment", rpc_methods); + //self.update_disabled_methods("Experiment", rpc_methods); } self } - */ /// Mounts methods from module Integration if it is enabled in the config. pub fn enable_integration_test( @@ -152,13 +143,11 @@ impl<'a> ServiceBuilder<'a> { ) -> Self { if self.config.integration_test_enable() { // IntegrationTest only on Dummy PoW chain - /* assert_eq!( shared.consensus().pow, Pow::Dummy, "Only run integration test on Dummy PoW chain" ); - */ let methods = IntegrationTestRpcImpl { shared: shared.clone(), network_controller, @@ -177,7 +166,6 @@ impl<'a> ServiceBuilder<'a> { network_controller: NetworkController, ) -> Self { if self.config.alert_enable() { - eprintln!("enable_alert ............."); let methods = AlertRpcImpl::new(alert_verifier, alert_notifier, network_controller); add_alert_rpc_methods(&mut self.io_handler, methods); } @@ -193,7 +181,6 @@ impl<'a> ServiceBuilder<'a> { self } - /* /// Mounts methods from module Indexer if it is enabled in the config. pub fn enable_indexer( mut self, @@ -203,12 +190,12 @@ impl<'a> ServiceBuilder<'a> { ) -> Self { let indexer = IndexerService::new(db_config, indexer_config, shared.async_handle().clone()); let indexer_handle = indexer.handle(); - let rpc_methods = IndexerRpcImpl::new(indexer_handle).to_delegate(); + let rpc_methods = IndexerRpcImpl::new(indexer_handle); if self.config.indexer_enable() { start_indexer(&shared, indexer, indexer_config.index_tx_pool); - self.add_methods(rpc_methods); + add_indexer_rpc_methods(&mut self.io_handler, rpc_methods); } else { - self.update_disabled_methods("Indexer", rpc_methods); + //self.update_disabled_methods("Indexer", rpc_methods); } self } @@ -228,13 +215,11 @@ impl<'a> ServiceBuilder<'a> { ) }); } - */ /// Builds the RPC methods handler used in the RPC server. pub fn build(self) -> IoHandler { let mut io_handler = self.io_handler; io_handler.add_method("@ping", |_| async move { Ok("pong".into()) }); - eprintln!("build ............."); io_handler } } diff --git a/util/indexer/src/service.rs b/util/indexer/src/service.rs index ad99825bb9..2dacb88475 100644 --- a/util/indexer/src/service.rs +++ b/util/indexer/src/service.rs @@ -253,6 +253,7 @@ impl IndexerService { /// /// The handle is internally reference-counted and can be freely cloned. /// A handle can be obtained using the IndexerService::handle method. +#[derive(Clone)] pub struct IndexerHandle { pub(crate) store: RocksdbStore, pub(crate) pool: Option>>, diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index 8be0d9757b..45d13fe177 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -376,37 +376,38 @@ impl Launcher { .expect("Start network service failed"); let rpc_config = self.adjust_rpc_config(); - let builder = ServiceBuilder::new(&rpc_config).enable_chain(shared.clone()) - // .enable_pool( - // shared.clone(), - // rpc_config - // .extra_well_known_lock_scripts - // .iter() - // .map(|script| script.clone().into()) - // .collect(), - // rpc_config - // .extra_well_known_type_scripts - // .iter() - // .map(|script| script.clone().into()) - // .collect(), - // ) - .enable_miner( - shared.clone(), - network_controller.clone(), - chain_controller.clone(), - miner_enable, - ) - // .enable_net(network_controller.clone(), sync_shared) - // .enable_stats(shared.clone(), Arc::clone(&alert_notifier)) - // .enable_experiment(shared.clone()) - .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) - .enable_alert(alert_verifier, alert_notifier, network_controller.clone()) - // .enable_indexer( - // shared.clone(), - // &self.args.config.db, - // &self.args.config.indexer, - // ) - .enable_debug(); + let builder = ServiceBuilder::new(&rpc_config) + .enable_chain(shared.clone()) + .enable_pool( + shared.clone(), + rpc_config + .extra_well_known_lock_scripts + .iter() + .map(|script| script.clone().into()) + .collect(), + rpc_config + .extra_well_known_type_scripts + .iter() + .map(|script| script.clone().into()) + .collect(), + ) + .enable_miner( + shared.clone(), + network_controller.clone(), + chain_controller.clone(), + miner_enable, + ) + .enable_net(network_controller.clone(), sync_shared) + .enable_stats(shared.clone(), Arc::clone(&alert_notifier)) + .enable_experiment(shared.clone()) + .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) + .enable_alert(alert_verifier, alert_notifier, network_controller.clone()) + .enable_indexer( + shared.clone(), + &self.args.config.db, + &self.args.config.indexer, + ) + .enable_debug(); let io_handler = builder.build(); RpcServer::start_jsonrpc_server( From 1b517dfd1a80fcbe162f37183e52fcb0c758d559 Mon Sep 17 00:00:00 2001 From: yukang Date: Wed, 20 Sep 2023 18:33:43 +0800 Subject: [PATCH 05/31] fix deprecated issues --- ckb-bin/src/lib.rs | 7 +- ckb-bin/src/subcommand/run.rs | 50 ++++++------- rpc/src/module/alert.rs | 7 -- rpc/src/module/chain.rs | 44 +++++++++--- rpc/src/service_builder.rs | 132 ++++++++++++++++++++++++---------- src/main.rs | 4 +- util/launcher/src/lib.rs | 6 +- 7 files changed, 154 insertions(+), 96 deletions(-) diff --git a/ckb-bin/src/lib.rs b/ckb-bin/src/lib.rs index efbdb5e482..825a95b097 100644 --- a/ckb-bin/src/lib.rs +++ b/ckb-bin/src/lib.rs @@ -6,7 +6,7 @@ mod setup_guard; mod subcommand; use ckb_app_config::{cli, ExitCode, Setup}; -use ckb_async_runtime::Handle; +use ckb_async_runtime::new_global_runtime; use ckb_build_info::Version; use ckb_logger::info; use ckb_network::tokio; @@ -25,7 +25,7 @@ pub(crate) const LOG_TARGET_SENTRY: &str = "sentry"; /// /// * `version` - The version is passed in so the bin crate can collect the version without trigger /// re-linking. -pub async fn run_app(version: Version, mut handle: Handle) -> Result<(), ExitCode> { +pub fn run_app(version: Version) -> Result<(), ExitCode> { // Always print backtrace on panic. ::std::env::set_var("RUST_BACKTRACE", "full"); @@ -58,13 +58,14 @@ pub async fn run_app(version: Version, mut handle: Handle) -> Result<(), ExitCod .expect("SubcommandRequiredElseHelp"); let is_silent_logging = is_silent_logging(cmd); + let (mut handle, mut handle_stop_rx, _runtime) = new_global_runtime(); let setup = Setup::from_matches(bin_name, cmd, matches)?; let _guard = SetupGuard::from_setup(&setup, &version, handle.clone(), is_silent_logging)?; raise_fd_limit(); let ret = match cmd { - cli::CMD_RUN => subcommand::run(setup.run(matches)?, version, handle.clone()).await, + cli::CMD_RUN => subcommand::run(setup.run(matches)?, version, handle.clone()), cli::CMD_MINER => subcommand::miner(setup.miner(matches)?, handle.clone()), cli::CMD_REPLAY => subcommand::replay(setup.replay(matches)?, handle.clone()), cli::CMD_EXPORT => subcommand::export(setup.export(matches)?, handle.clone()), diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index 1b5bc48a97..012d38d0cd 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -11,21 +11,11 @@ use ckb_stop_handler::{broadcast_exit_signals, wait_all_ckb_services_exit}; use ckb_types::core::cell::setup_system_cell_cache; -// pub fn run( -// args: RunArgs, -// version: Version, -// async_handle: Handle, -// runtime: Runtime, -// ) -> Result<(), ExitCode> { -// //runtime.spawn_blocking(|| run_inner(args, version, async_handle)); -// runtime.block_on(run_inner(args, version, async_handle)) -// } - -pub async fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> { +pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> { deadlock_detection(); info!("ckb version: {}", version); - let mut launcher = Launcher::new(args, version, async_handle); + let mut launcher = Launcher::new(args, version, async_handle.clone()); let block_assembler_config = launcher.sanitize_block_assembler_config()?; let miner_enable = block_assembler_config.is_some(); @@ -57,26 +47,26 @@ pub async fn run(args: RunArgs, version: Version, async_handle: Handle) -> Resul launcher.start_block_filter(&shared); - let rpc_task = spawn(async move { - let network_controller = launcher - .start_network_and_rpc( - &shared, - chain_controller.clone(), - miner_enable, - pack.take_relay_tx_receiver(), - ) - .await; - eprintln!("network_controller begin to run ...."); - eprintln!("end network_controller run ...."); - - let tx_pool_builder = pack.take_tx_pool_builder(); - tx_pool_builder.start(network_controller.clone()); + async_handle.block_on(async move { + let rpc_task = spawn(async move { + let network_controller = launcher + .start_network_and_rpc( + &shared, + chain_controller.clone(), + miner_enable, + pack.take_relay_tx_receiver(), + ) + .await; + + let tx_pool_builder = pack.take_tx_pool_builder(); + tx_pool_builder.start(network_controller.clone()); + }); + + tokio::select! { + _ = rpc_task => {}, + } }); - tokio::select! { - _ = rpc_task => {}, - }; - ctrlc::set_handler(|| { info!("Trapped exit signal, exiting..."); broadcast_exit_signals(); diff --git a/rpc/src/module/alert.rs b/rpc/src/module/alert.rs index a01ece7707..7f9f22bd86 100644 --- a/rpc/src/module/alert.rs +++ b/rpc/src/module/alert.rs @@ -70,9 +70,6 @@ pub trait AlertRpc { /// ``` #[rpc(name = "send_alert")] async fn send_alert(&self, alert: Alert) -> Result<()>; - - #[rpc(name = "hello")] - async fn hello(&self) -> Result; } #[derive(Clone)] @@ -128,8 +125,4 @@ impl AlertRpc for AlertRpcImpl { )), } } - - async fn hello(&self) -> Result { - Ok(format!("Hello, Yukang!")) - } } diff --git a/rpc/src/module/chain.rs b/rpc/src/module/chain.rs index 829c2be1c6..c1f504f684 100644 --- a/rpc/src/module/chain.rs +++ b/rpc/src/module/chain.rs @@ -1,5 +1,6 @@ use crate::error::RPCError; use crate::util::FeeRateCollector; +use async_trait::async_trait; use ckb_jsonrpc_types::{ BlockEconomicState, BlockFilter, BlockNumber, BlockResponse, BlockView, CellWithStatus, Consensus, EpochNumber, EpochView, EstimateCycles, FeeRateStatistics, HeaderView, OutPoint, @@ -26,10 +27,9 @@ use ckb_types::{ use ckb_verification::ScriptVerifier; use ckb_verification::TxVerifyEnv; use jsonrpc_core::Result; +use jsonrpc_utils::rpc; use std::collections::HashSet; use std::sync::Arc; -use jsonrpc_utils::rpc; -use async_trait::async_trait; /// RPC Module Chain for methods related to the canonical chain. /// @@ -764,7 +764,8 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_tip_header")] - async fn get_tip_header(&self, verbosity: Option) -> Result>; + async fn get_tip_header(&self, verbosity: Option) + -> Result>; /// Returns the status of a cell. The RPC returns extra information if it is a [live cell](#live-cell). /// @@ -1006,7 +1007,10 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_block_economic_state")] - async fn get_block_economic_state(&self, block_hash: H256) -> Result>; + async fn get_block_economic_state( + &self, + block_hash: H256, + ) -> Result>; /// Returns a Merkle proof that transactions are included in a block. /// @@ -1562,8 +1566,11 @@ pub trait ChainRpc { since = "0.109.0", note = "Please use the RPC method [`get_fee_rate_statistics`](#tymethod.get_fee_rate_statistics) instead" )] - #[rpc(name = "get_fee_rate_statics")] - async fn get_fee_rate_statics(&self, target: Option) -> Result>; + #[rpc(name = "deprecated.get_fee_rate_statics")] + async fn get_fee_rate_statics( + &self, + target: Option, + ) -> Result>; /// Returns the fee_rate statistics of confirmed blocks on the chain /// @@ -1605,7 +1612,10 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_fee_rate_statistics")] - async fn get_fee_rate_statistics(&self, target: Option) -> Result>; + async fn get_fee_rate_statistics( + &self, + target: Option, + ) -> Result>; } #[derive(Clone)] @@ -1778,7 +1788,10 @@ impl ChainRpc for ChainRpcImpl { .map(|h| h.unpack())) } - async fn get_tip_header(&self, verbosity: Option) -> Result> { + async fn get_tip_header( + &self, + verbosity: Option, + ) -> Result> { let verbosity = verbosity .map(|v| v.value()) .unwrap_or(DEFAULT_HEADER_VERBOSITY_LEVEL); @@ -1825,7 +1838,10 @@ impl ChainRpc for ChainRpcImpl { Ok(self.shared.snapshot().tip_header().number().into()) } - async fn get_block_economic_state(&self, block_hash: H256) -> Result> { + async fn get_block_economic_state( + &self, + block_hash: H256, + ) -> Result> { let snapshot = self.shared.snapshot(); let block_number = if let Some(block_number) = snapshot.get_block_number(&block_hash.pack()) @@ -2106,12 +2122,18 @@ impl ChainRpc for ChainRpcImpl { CyclesEstimator::new(&self.shared).run(tx) } - async fn get_fee_rate_statics(&self, target: Option) -> Result> { + async fn get_fee_rate_statics( + &self, + target: Option, + ) -> Result> { Ok(FeeRateCollector::new(self.shared.snapshot().as_ref()) .statistics(target.map(Into::into))) } - async fn get_fee_rate_statistics(&self, target: Option) -> Result> { + async fn get_fee_rate_statistics( + &self, + target: Option, + ) -> Result> { Ok(FeeRateCollector::new(self.shared.snapshot().as_ref()) .statistics(target.map(Into::into))) } diff --git a/rpc/src/service_builder.rs b/rpc/src/service_builder.rs index 4d0b540401..ee7ac5b6a9 100644 --- a/rpc/src/service_builder.rs +++ b/rpc/src/service_builder.rs @@ -18,6 +18,9 @@ use ckb_shared::shared::Shared; use ckb_sync::SyncShared; use ckb_types::packed::Script; use ckb_util::Mutex; +use jsonrpc_core::MetaIoHandler; +use jsonrpc_core::RemoteProcedure; +use jsonrpc_utils::pub_sub::Session; use std::sync::Arc; const DEPRECATED_RPC_PREFIX: &str = "deprecated."; @@ -39,9 +42,13 @@ impl<'a> ServiceBuilder<'a> { /// Mounts methods from module Chain if it is enabled in the config. pub fn enable_chain(mut self, shared: Shared) -> Self { + let mut meta_io = MetaIoHandler::default(); if self.config.chain_enable() { let methods = ChainRpcImpl { shared }; - add_chain_rpc_methods(&mut self.io_handler, methods); + add_chain_rpc_methods(&mut meta_io, methods); + self.add_methods(meta_io); + } else { + self.update_disabled_methods("Chain", meta_io); } self } @@ -53,15 +60,17 @@ impl<'a> ServiceBuilder<'a> { extra_well_known_lock_scripts: Vec