From 6c8a3946aecd19e910fc1a55faf793405cb056f4 Mon Sep 17 00:00:00 2001 From: yukang Date: Fri, 27 Oct 2023 10:16:11 +0800 Subject: [PATCH] fix orphan pool issue for long pending tx --- spec/src/consensus.rs | 6 +- sync/src/types/mod.rs | 6 +- test/src/main.rs | 4 + test/src/node.rs | 23 ++- test/src/specs/tx_pool/orphan_tx.rs | 251 ++++++++++++++++++++++++++ tx-pool/src/component/orphan.rs | 61 +++++-- tx-pool/src/component/tests/mod.rs | 1 + tx-pool/src/component/tests/orphan.rs | 58 ++++++ tx-pool/src/pool.rs | 1 + tx-pool/src/process.rs | 65 ++++--- 10 files changed, 424 insertions(+), 52 deletions(-) create mode 100644 tx-pool/src/component/tests/orphan.rs diff --git a/spec/src/consensus.rs b/spec/src/consensus.rs index 0f059d1dfb..1627c483f6 100644 --- a/spec/src/consensus.rs +++ b/spec/src/consensus.rs @@ -60,8 +60,10 @@ pub(crate) const GENESIS_EPOCH_LENGTH: u64 = 1_000; // o_ideal = 1/40 = 2.5% pub(crate) const DEFAULT_ORPHAN_RATE_TARGET: (u32, u32) = (1, 40); -const MAX_BLOCK_INTERVAL: u64 = 48; // 48s -const MIN_BLOCK_INTERVAL: u64 = 8; // 8s +/// max block interval, 48 seconds +pub const MAX_BLOCK_INTERVAL: u64 = 48; +/// min block interval, 8 seconds +pub const MIN_BLOCK_INTERVAL: u64 = 8; /// cycles of a typical two-in-two-out tx. pub const TWO_IN_TWO_OUT_CYCLES: Cycle = 3_500_000; diff --git a/sync/src/types/mod.rs b/sync/src/types/mod.rs index 0417021488..3171878242 100644 --- a/sync/src/types/mod.rs +++ b/sync/src/types/mod.rs @@ -4,7 +4,7 @@ use crate::utils::is_internal_db_error; use crate::{Status, StatusCode, FAST_INDEX, LOW_INDEX, NORMAL_INDEX, TIME_TRACE_SIZE}; use ckb_app_config::SyncConfig; use ckb_chain::chain::ChainController; -use ckb_chain_spec::consensus::Consensus; +use ckb_chain_spec::consensus::{Consensus, MAX_BLOCK_INTERVAL, MIN_BLOCK_INTERVAL}; use ckb_channel::Receiver; use ckb_constant::sync::{ BLOCK_DOWNLOAD_TIMEOUT, HEADERS_DOWNLOAD_HEADERS_PER_SECOND, HEADERS_DOWNLOAD_INSPECT_WINDOW, @@ -103,8 +103,8 @@ impl ChainSyncState { fn tip_synced(&mut self) { let now = unix_time_as_millis(); - // use avg block interval: (MAX_BLOCK_INTERVAL + MIN_BLOCK_INTERVAL) / 2 = 28 - self.headers_sync_state = HeadersSyncState::TipSynced(now + 28000); + let avg_interval = (MAX_BLOCK_INTERVAL + MIN_BLOCK_INTERVAL) / 2; + self.headers_sync_state = HeadersSyncState::TipSynced(now + avg_interval * 1000); } fn started(&self) -> bool { diff --git a/test/src/main.rs b/test/src/main.rs index 39f9e87008..030617b27c 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -426,6 +426,10 @@ fn all_specs() -> Vec> { Box::new(DeclaredWrongCyclesChunk), Box::new(DeclaredWrongCyclesAndRelayAgain), Box::new(OrphanTxAccepted), + Box::new(TxPoolOrphanNormal), + Box::new(TxPoolOrphanReverse), + Box::new(TxPoolOrphanUnordered), + Box::new(TxPoolOrphanDoubleSpend), Box::new(OrphanTxRejected), Box::new(GetRawTxPool), Box::new(PoolReconcile), diff --git a/test/src/node.rs b/test/src/node.rs index 5ef6167312..65bee5851a 100644 --- a/test/src/node.rs +++ b/test/src/node.rs @@ -346,6 +346,13 @@ impl Node { self.submit_transaction(&self.new_transaction_spend_tip_cellbase()) } + // generate a transaction which spend tip block's cellbase and capacity + pub fn new_transaction_with_capacity(&self, capacity: Capacity) -> TransactionView { + let block = self.get_tip_block(); + let cellbase = &block.transactions()[0]; + self.new_transaction_with_since_capacity(cellbase.hash(), 0, capacity) + } + // generate a transaction which spend tip block's cellbase pub fn new_transaction_spend_tip_cellbase(&self) -> TransactionView { let block = self.get_tip_block(); @@ -539,11 +546,12 @@ impl Node { self.new_transaction_with_since_capacity(hash, since, capacity_bytes!(100)) } - pub fn new_transaction_with_since_capacity( + pub fn new_transaction_with_capacity_and_index( &self, hash: Byte32, - since: u64, capacity: Capacity, + index: u32, + since: u64, ) -> TransactionView { let always_success_cell_dep = self.always_success_cell_dep(); let always_success_script = self.always_success_script(); @@ -557,10 +565,19 @@ impl Node { .build(), ) .output_data(Default::default()) - .input(CellInput::new(OutPoint::new(hash, 0), since)) + .input(CellInput::new(OutPoint::new(hash, index), since)) .build() } + pub fn new_transaction_with_since_capacity( + &self, + hash: Byte32, + since: u64, + capacity: Capacity, + ) -> TransactionView { + self.new_transaction_with_capacity_and_index(hash, capacity, 0, since) + } + pub fn new_always_failure_transaction(&self, hash: Byte32) -> TransactionView { let always_failure_cell_dep = self.always_failure_cell_dep(); let always_failure_script = self.always_failure_script(); diff --git a/test/src/specs/tx_pool/orphan_tx.rs b/test/src/specs/tx_pool/orphan_tx.rs index 49d0901d6c..de6f29766b 100644 --- a/test/src/specs/tx_pool/orphan_tx.rs +++ b/test/src/specs/tx_pool/orphan_tx.rs @@ -3,6 +3,12 @@ use crate::utils::wait_until; use crate::{Net, Node, Spec}; use ckb_jsonrpc_types::Status; use ckb_network::SupportProtocols; +use ckb_types::core::{capacity_bytes, Capacity, TransactionBuilder, TransactionView}; +use ckb_types::packed::CellOutputBuilder; +use ckb_types::{ + packed::{CellInput, OutPoint}, + prelude::*, +}; const ALWAYS_SUCCESS_SCRIPT_CYCLE: u64 = 537; // always_failure, as the name implies, so it doesn't matter what the cycles are @@ -97,3 +103,248 @@ impl Spec for OrphanTxRejected { assert!(matches!(ret.tx_status.status, Status::Rejected)); } } + +// construct a tx chain with such structure: +// +// parent +// | +// tx1 +// / | \ +// tx11 tx12 tx13 +// \ | / +// final_tx +// +fn build_tx_chain( + node0: &Node, +) -> ( + Net, + ( + TransactionView, + TransactionView, + TransactionView, + TransactionView, + TransactionView, + TransactionView, + ), +) { + node0.mine_until_out_bootstrap_period(); + let parent = node0.new_transaction_with_capacity(capacity_bytes!(800)); + + let script = node0.always_success_script(); + let new_output1 = CellOutputBuilder::default() + .capacity(capacity_bytes!(200).pack()) + .lock(script.clone()) + .build(); + let new_output2 = new_output1.clone(); + let new_output3 = new_output1.clone(); + + let tx1 = parent + .as_advanced_builder() + .set_inputs(vec![CellInput::new(OutPoint::new(parent.hash(), 0), 0)]) + .set_outputs(vec![new_output1, new_output2, new_output3]) + .set_outputs_data(vec![Default::default(); 3]) + .build(); + + let tx11 = + node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 0, 0); + let tx12 = + node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 1, 0); + let tx13 = + node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 2, 0); + + let cell_dep = node0.always_success_cell_dep(); + let final_output = CellOutputBuilder::default() + .capacity(capacity_bytes!(80).pack()) + .lock(script) + .build(); + let final_tx = TransactionBuilder::default() + .cell_dep(cell_dep) + .set_inputs(vec![ + CellInput::new(OutPoint::new(tx11.hash(), 0), 0), + CellInput::new(OutPoint::new(tx12.hash(), 0), 0), + CellInput::new(OutPoint::new(tx13.hash(), 0), 0), + ]) + .set_outputs(vec![final_output]) + .set_outputs_data(vec![Default::default(); 1]) + .build(); + + let mut net = Net::new( + "orphan_tx_test", + node0.consensus(), + vec![SupportProtocols::RelayV3], + ); + net.connect(node0); + + (net, (parent, tx1, tx11, tx12, tx13, final_tx)) +} + +fn run_replay_tx( + net: &Net, + node0: &Node, + tx: TransactionView, + orphan_tx_cnt: u64, + pending_cnt: u64, +) -> bool { + relay_tx(net, node0, tx, ALWAYS_SUCCESS_SCRIPT_CYCLE); + + wait_until(5, || { + let tx_pool_info = node0.get_tip_tx_pool_info(); + tx_pool_info.orphan.value() == orphan_tx_cnt && tx_pool_info.pending.value() == pending_cnt + }) +} + +pub struct TxPoolOrphanNormal; +impl Spec for TxPoolOrphanNormal { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(node0); + + assert!( + run_replay_tx(&net, node0, parent, 0, 1), + "parent sended expect nothing in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx1, 0, 2), + "tx1 is sent expect nothing in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx11, 0, 3), + "tx11 is sent expect nothing in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx12, 0, 4), + "tx12 is sent expect nothing in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx13, 0, 5), + "tx13 is sent expect nothing in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, final_tx, 0, 6), + "final_tx is sent expect nothing in orphan pool" + ); + } +} + +pub struct TxPoolOrphanReverse; +impl Spec for TxPoolOrphanReverse { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(node0); + + assert!( + run_replay_tx(&net, node0, final_tx, 1, 0), + "expect final_tx is in orphan pool" + ); + + assert!( + run_replay_tx(&net, node0, tx13, 2, 0), + "tx13 in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx12, 3, 0), + "tx12 is in orphan pool" + ); + assert!(run_replay_tx(&net, node0, tx11, 4, 0), "tx11 is in orphan"); + + assert!(run_replay_tx(&net, node0, tx1, 5, 0), "tx1 is in orphan"); + + assert!( + run_replay_tx(&net, node0, parent, 0, 6), + "all is in pending" + ); + } +} + +pub struct TxPoolOrphanUnordered; +impl Spec for TxPoolOrphanUnordered { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(node0); + + assert!( + run_replay_tx(&net, node0, final_tx, 1, 0), + "expect final_tx is in orphan pool" + ); + + assert!( + run_replay_tx(&net, node0, tx11, 2, 0), + "tx11 in orphan pool" + ); + let tx12_clone = tx12.clone(); + assert!( + run_replay_tx(&net, node0, tx12, 3, 0), + "tx12 is in orphan pool" + ); + + // set tx12_clone with rpc + let ret = node0 + .rpc_client() + .send_transaction_result(tx12_clone.data().into()); + assert!(ret + .err() + .unwrap() + .to_string() + .contains("already exist in transaction_pool")); + + assert!( + run_replay_tx(&net, node0, parent, 3, 1), + "parent is sent, should be in pending without change orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx1, 1, 4), + "tx1 is sent, orphan pool only contains final_tx" + ); + + assert!( + run_replay_tx(&net, node0, tx13, 0, 6), + "tx13 is sent, orphan pool is empty" + ); + } +} + +pub struct TxPoolOrphanDoubleSpend; +impl Spec for TxPoolOrphanDoubleSpend { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + node0.mine_until_out_bootstrap_period(); + let parent = node0.new_transaction_with_capacity(capacity_bytes!(800)); + + let script = node0.always_success_script(); + let new_output1 = CellOutputBuilder::default() + .capacity(capacity_bytes!(200).pack()) + .lock(script.clone()) + .build(); + let new_output2 = new_output1.clone(); + let new_output3 = new_output1.clone(); + + let tx1 = parent + .as_advanced_builder() + .set_inputs(vec![CellInput::new(OutPoint::new(parent.hash(), 0), 0)]) + .set_outputs(vec![new_output1, new_output2, new_output3]) + .set_outputs_data(vec![Default::default(); 3]) + .build(); + + let tx11 = + node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 0, 0); + let tx12 = + node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(120), 0, 0); + + let mut net = Net::new( + "orphan_tx_test", + node0.consensus(), + vec![SupportProtocols::RelayV3], + ); + net.connect(node0); + + assert!( + run_replay_tx(&net, node0, tx11, 1, 0), + "tx11 in orphan pool" + ); + + assert!( + run_replay_tx(&net, node0, tx12, 1, 0), + "tx12 is not in orphan pool" + ); + } +} diff --git a/tx-pool/src/component/orphan.rs b/tx-pool/src/component/orphan.rs index 6cc386a0dc..2a11630ce8 100644 --- a/tx-pool/src/component/orphan.rs +++ b/tx-pool/src/component/orphan.rs @@ -1,14 +1,18 @@ +use ckb_chain_spec::consensus::MAX_BLOCK_INTERVAL; use ckb_logger::{debug, trace}; use ckb_network::PeerIndex; +use ckb_types::packed::Byte32; use ckb_types::{ core::{Cycle, TransactionView}, packed::{OutPoint, ProposalShortId}, }; use ckb_util::shrink_to_fit; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; const SHRINK_THRESHOLD: usize = 100; -pub(crate) const ORPHAN_TX_EXPIRE_TIME: u64 = 2 * 48; // double block interval + +/// 100 max block interval +pub(crate) const ORPHAN_TX_EXPIRE_TIME: u64 = 100 * MAX_BLOCK_INTERVAL; pub(crate) const DEFAULT_MAX_ORPHAN_TRANSACTIONS: usize = 100; #[derive(Debug, Clone)] @@ -37,7 +41,7 @@ impl Entry { #[derive(Default, Debug, Clone)] pub(crate) struct OrphanPool { pub(crate) entries: HashMap, - pub(crate) by_out_point: HashMap, + pub(crate) by_out_point: HashMap>, } impl OrphanPool { @@ -58,7 +62,7 @@ impl OrphanPool { self.entries.contains_key(id) } - pub fn shrink_to_fit(&mut self) { + fn shrink_to_fit(&mut self) { shrink_to_fit!(self.entries, SHRINK_THRESHOLD); shrink_to_fit!(self.by_out_point, SHRINK_THRESHOLD); } @@ -69,8 +73,11 @@ impl OrphanPool { pub fn remove_orphan_tx(&mut self, id: &ProposalShortId) -> Option { self.entries.remove(id).map(|entry| { + debug!("remove orphan tx {}", entry.tx.hash()); for out_point in entry.tx.input_pts_iter() { - self.by_out_point.remove(&out_point); + self.by_out_point + .get_mut(&out_point) + .map(|set| set.remove(id)); } entry }) @@ -83,7 +90,7 @@ impl OrphanPool { self.shrink_to_fit(); } - pub fn limit_size(&mut self) -> usize { + fn limit_size(&mut self) -> Vec { let now = ckb_systemtime::unix_time().as_secs(); let expires: Vec<_> = self .entries @@ -98,46 +105,60 @@ impl OrphanPool { .cloned() .collect(); - let mut evicted = expires.len(); + let mut evicted_txs = vec![]; for id in expires { - self.remove_orphan_tx(&id); + if let Some(entry) = self.remove_orphan_tx(&id) { + evicted_txs.push(entry.tx.hash()); + } } while self.len() > DEFAULT_MAX_ORPHAN_TRANSACTIONS { - evicted += 1; // Evict a random orphan: let id = self.entries.keys().next().cloned().expect("bound checked"); - self.remove_orphan_tx(&id); + if let Some(entry) = self.remove_orphan_tx(&id) { + evicted_txs.push(entry.tx.hash()); + } } - if evicted > 0 { - trace!("OrphanTxPool full, evicted {} tx", evicted); + if !evicted_txs.is_empty() { + trace!("OrphanTxPool full, evicted {} tx", evicted_txs.len()); self.shrink_to_fit(); } - evicted + evicted_txs } - pub fn add_orphan_tx(&mut self, tx: TransactionView, peer: PeerIndex, declared_cycle: Cycle) { + pub fn add_orphan_tx( + &mut self, + tx: TransactionView, + peer: PeerIndex, + declared_cycle: Cycle, + ) -> Vec { if self.entries.contains_key(&tx.proposal_short_id()) { - return; + return vec![]; } - debug!("add_orphan_tx {}", tx.hash()); + debug!("add_orphan_tx {}", tx.hash()); self.entries.insert( tx.proposal_short_id(), Entry::new(tx.clone(), peer, declared_cycle), ); for out_point in tx.input_pts_iter() { - self.by_out_point.insert(out_point, tx.proposal_short_id()); + self.by_out_point + .entry(out_point) + .or_insert_with(HashSet::default) + .insert(tx.proposal_short_id()); } - self.limit_size(); + + self.limit_size() } - pub fn find_by_previous(&self, tx: &TransactionView) -> Option { + pub fn find_by_previous(&self, tx: &TransactionView) -> Vec<&ProposalShortId> { tx.output_pts() .iter() - .find_map(|out_point| self.by_out_point.get(out_point).cloned()) + .filter_map(|out_point| self.by_out_point.get(out_point)) + .flatten() + .collect::>() } } diff --git a/tx-pool/src/component/tests/mod.rs b/tx-pool/src/component/tests/mod.rs index fb851e4855..13ba389e82 100644 --- a/tx-pool/src/component/tests/mod.rs +++ b/tx-pool/src/component/tests/mod.rs @@ -1,5 +1,6 @@ mod chunk; mod entry; +mod orphan; mod pending; mod proposed; mod recent_reject; diff --git a/tx-pool/src/component/tests/orphan.rs b/tx-pool/src/component/tests/orphan.rs new file mode 100644 index 0000000000..bfd5e00cda --- /dev/null +++ b/tx-pool/src/component/tests/orphan.rs @@ -0,0 +1,58 @@ +use crate::component::orphan::OrphanPool; +use crate::component::tests::util::build_tx; +use ckb_types::packed::Byte32; + +#[test] +fn test_orphan() { + let tx1 = build_tx(vec![(&Byte32::zero(), 1), (&Byte32::zero(), 2)], 1); + let mut orphan = OrphanPool::new(); + assert_eq!(orphan.len(), 0); + assert!(!orphan.contains_key(&tx1.proposal_short_id())); + + orphan.add_orphan_tx(tx1.clone(), 0.into(), 0); + assert_eq!(orphan.len(), 1); + + orphan.add_orphan_tx(tx1.clone(), 0.into(), 0); + assert_eq!(orphan.len(), 1); + + let tx2 = build_tx(vec![(&tx1.hash(), 0)], 1); + orphan.add_orphan_tx(tx2.clone(), 0.into(), 0); + assert_eq!(orphan.len(), 2); + + orphan.remove_orphan_tx(&tx1.proposal_short_id()); + assert_eq!(orphan.len(), 1); + orphan.remove_orphan_tx(&tx2.proposal_short_id()); + assert_eq!(orphan.len(), 0); +} + +#[test] +fn test_orphan_duplicated() { + let tx1 = build_tx(vec![(&Byte32::zero(), 1), (&Byte32::zero(), 2)], 3); + let mut orphan = OrphanPool::new(); + + let tx2 = build_tx(vec![(&tx1.hash(), 0)], 1); + let tx3 = build_tx(vec![(&tx2.hash(), 0)], 1); + let tx4 = build_tx(vec![(&tx3.hash(), 0), (&tx1.hash(), 1)], 1); + let tx5 = build_tx(vec![(&tx1.hash(), 0)], 2); + orphan.add_orphan_tx(tx1.clone(), 0.into(), 0); + orphan.add_orphan_tx(tx2.clone(), 0.into(), 0); + orphan.add_orphan_tx(tx3.clone(), 0.into(), 0); + orphan.add_orphan_tx(tx4.clone(), 0.into(), 0); + orphan.add_orphan_tx(tx5.clone(), 0.into(), 0); + assert_eq!(orphan.len(), 5); + + let txs = orphan.find_by_previous(&tx2); + assert_eq!(txs.len(), 1); + + let txs = orphan.find_by_previous(&tx1); + assert_eq!(txs.len(), 3); + assert!(txs.contains(&&tx2.proposal_short_id())); + assert!(txs.contains(&&tx4.proposal_short_id())); + assert!(txs.contains(&&tx5.proposal_short_id())); + + orphan.remove_orphan_tx(&tx4.proposal_short_id()); + let txs = orphan.find_by_previous(&tx1); + assert_eq!(txs.len(), 2); + assert!(txs.contains(&&tx2.proposal_short_id())); + assert!(txs.contains(&&tx5.proposal_short_id())); +} diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index cb7d207aaa..20e3028e32 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -203,6 +203,7 @@ impl TxPool { ) { for tx in txs { let tx_hash = tx.hash(); + debug!("try remove_committed_tx {}", tx_hash); self.remove_committed_tx(tx, callbacks); self.committed_txs_hash_cache diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index d05498e99f..e9be766d56 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -277,7 +277,12 @@ impl TxPoolService { // non contextual verify first self.non_contextual_verify(&tx, None)?; - if self.chunk_contains(&tx).await || self.orphan_contains(&tx).await { + if self.chunk_contains(&tx).await { + return Err(Reject::Duplicated(tx.hash())); + } + + if self.orphan_contains(&tx).await { + debug!("reject tx {} already in orphan pool", tx.hash()); return Err(Reject::Duplicated(tx.hash())); } @@ -460,38 +465,46 @@ impl TxPoolService { peer: PeerIndex, declared_cycle: Cycle, ) { - self.orphan + let evicted_txs = self + .orphan .write() .await - .add_orphan_tx(tx, peer, declared_cycle) + .add_orphan_tx(tx, peer, declared_cycle); + // for any evicted orphan tx, we should send reject to relayer + // so that we mark it as `unknown` in filter + for tx_hash in evicted_txs { + self.send_result_to_relayer(TxVerificationResult::Reject { tx_hash }); + } } - pub(crate) async fn find_orphan_by_previous( - &self, - tx: &TransactionView, - ) -> Option { + pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec { let orphan = self.orphan.read().await; - if let Some(id) = orphan.find_by_previous(tx) { - return orphan.get(&id).cloned(); - } - None + orphan + .find_by_previous(tx) + .iter() + .filter_map(|id| orphan.get(id).cloned()) + .collect::>() } pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) { self.orphan.write().await.remove_orphan_tx(id); } + /// Remove all orphans which are resolved by the given transaction + /// the process is like a breath first search, if there is a cycle in `orphan_queue`, + /// `_process_tx` will return `Reject` since we have checked duplicated tx pub(crate) async fn process_orphan_tx(&self, tx: &TransactionView) { let mut orphan_queue: VecDeque = VecDeque::new(); orphan_queue.push_back(tx.clone()); while let Some(previous) = orphan_queue.pop_front() { - if let Some(orphan) = self.find_orphan_by_previous(&previous).await { + let orphans = self.find_orphan_by_previous(&previous).await; + for orphan in orphans.into_iter() { if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles { debug!( - "process_orphan {} add to chunk, find previous from {}", + "process_orphan {} add to chunk, find previous from {}", + orphan.tx.hash(), tx.hash(), - orphan.tx.hash() ); self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await; self.chunk @@ -522,8 +535,8 @@ impl TxPoolService { }); debug!( "process_orphan {} success, find previous from {}", - tx.hash(), - orphan.tx.hash() + orphan.tx.hash(), + tx.hash() ); self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await; orphan_queue.push_back(orphan.tx); @@ -531,10 +544,11 @@ impl TxPoolService { Err(reject) => { debug!( "process_orphan {} reject {}, find previous from {}", - tx.hash(), + orphan.tx.hash(), reject, - orphan.tx.hash() + tx.hash(), ); + if !is_missing_input(&reject) { self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await; if reject.is_malformed_tx() { @@ -554,7 +568,6 @@ impl TxPoolService { self.put_recent_reject(&orphan.tx.hash(), &reject).await; } } - break; } } } @@ -914,17 +927,21 @@ impl TxPoolService { } } - { - let mut orphan = self.orphan.write().await; - orphan.remove_orphan_txs(attached.iter().map(|tx| tx.proposal_short_id())); - } - + self.remove_orphan_txs_by_attach(&attached).await; { let mut chunk = self.chunk.write().await; chunk.remove_chunk_txs(attached.iter().map(|tx| tx.proposal_short_id())); } } + async fn remove_orphan_txs_by_attach<'a>(&self, txs: &LinkedHashSet) { + for tx in txs.iter() { + self.process_orphan_tx(tx).await; + } + let mut orphan = self.orphan.write().await; + orphan.remove_orphan_txs(txs.iter().map(|tx| tx.proposal_short_id())); + } + fn readd_detached_tx( &self, tx_pool: &mut TxPool,