From 44f294b4db545d1d2395d68acd595f8a9e2f6ebd Mon Sep 17 00:00:00 2001 From: yukang Date: Fri, 20 Oct 2023 22:56:27 +0800 Subject: [PATCH] fix orphan issues --- test/src/main.rs | 3 + test/src/node.rs | 23 +++- test/src/specs/tx_pool/orphan_tx.rs | 195 ++++++++++++++++++++++++++++ tx-pool/src/component/orphan.rs | 5 +- tx-pool/src/process.rs | 40 +++--- 5 files changed, 243 insertions(+), 23 deletions(-) diff --git a/test/src/main.rs b/test/src/main.rs index c35dcada8e3..d01cc53f8b8 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -426,6 +426,9 @@ fn all_specs() -> Vec> { Box::new(DeclaredWrongCyclesChunk), Box::new(DeclaredWrongCyclesAndRelayAgain), Box::new(OrphanTxAccepted), + Box::new(TxPoolOrphanNormal), + Box::new(TxPoolOrphanReverse), + Box::new(TxPoolOrphanUnordered), Box::new(OrphanTxRejected), Box::new(GetRawTxPool), Box::new(PoolReconcile), diff --git a/test/src/node.rs b/test/src/node.rs index 143367a1481..ce5748c3ba5 100644 --- a/test/src/node.rs +++ b/test/src/node.rs @@ -344,6 +344,13 @@ impl Node { self.submit_transaction(&self.new_transaction_spend_tip_cellbase()) } + // generate a transaction which spend tip block's cellbase and capacity + pub fn new_transaction_with_capacity(&self, capacity: Capacity) -> TransactionView { + let block = self.get_tip_block(); + let cellbase = &block.transactions()[0]; + self.new_transaction_with_since_capacity(cellbase.hash(), 0, capacity) + } + // generate a transaction which spend tip block's cellbase pub fn new_transaction_spend_tip_cellbase(&self) -> TransactionView { let block = self.get_tip_block(); @@ -522,11 +529,12 @@ impl Node { self.new_transaction_with_since_capacity(hash, since, capacity_bytes!(100)) } - pub fn new_transaction_with_since_capacity( + pub fn new_transaction_with_capacity_and_index( &self, hash: Byte32, - since: u64, capacity: Capacity, + index: u32, + since: u64, ) -> TransactionView { let always_success_cell_dep = self.always_success_cell_dep(); let always_success_script = self.always_success_script(); @@ -540,10 +548,19 @@ impl Node { .build(), ) .output_data(Default::default()) - .input(CellInput::new(OutPoint::new(hash, 0), since)) + .input(CellInput::new(OutPoint::new(hash, index), since)) .build() } + pub fn new_transaction_with_since_capacity( + &self, + hash: Byte32, + since: u64, + capacity: Capacity, + ) -> TransactionView { + self.new_transaction_with_capacity_and_index(hash, capacity, 0, since) + } + pub fn new_always_failure_transaction(&self, hash: Byte32) -> TransactionView { let always_failure_cell_dep = self.always_failure_cell_dep(); let always_failure_script = self.always_failure_script(); diff --git a/test/src/specs/tx_pool/orphan_tx.rs b/test/src/specs/tx_pool/orphan_tx.rs index 49d0901d6c1..f41f6e478c3 100644 --- a/test/src/specs/tx_pool/orphan_tx.rs +++ b/test/src/specs/tx_pool/orphan_tx.rs @@ -3,6 +3,12 @@ use crate::utils::wait_until; use crate::{Net, Node, Spec}; use ckb_jsonrpc_types::Status; use ckb_network::SupportProtocols; +use ckb_types::core::{capacity_bytes, Capacity, TransactionBuilder, TransactionView}; +use ckb_types::packed::CellOutputBuilder; +use ckb_types::{ + packed::{CellInput, OutPoint}, + prelude::*, +}; const ALWAYS_SUCCESS_SCRIPT_CYCLE: u64 = 537; // always_failure, as the name implies, so it doesn't matter what the cycles are @@ -97,3 +103,192 @@ impl Spec for OrphanTxRejected { assert!(matches!(ret.tx_status.status, Status::Rejected)); } } + +fn build_tx_chain( + node0: &Node, +) -> ( + Net, + ( + TransactionView, + TransactionView, + TransactionView, + TransactionView, + TransactionView, + TransactionView, + ), +) { + node0.mine_until_out_bootstrap_period(); + let parent = node0.new_transaction_with_capacity(capacity_bytes!(800)); + + let script = node0.always_success_script(); + let new_output1 = CellOutputBuilder::default() + .capacity(capacity_bytes!(200).pack()) + .lock(script.clone()) + .build(); + let new_output2 = new_output1.clone(); + let new_output3 = new_output1.clone(); + + let tx1 = parent + .as_advanced_builder() + .set_inputs(vec![CellInput::new(OutPoint::new(parent.hash(), 0), 0)]) + .set_outputs(vec![new_output1, new_output2, new_output3]) + .set_outputs_data(vec![Default::default(); 3]) + .build(); + + let tx11 = + node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 0, 0); + let tx12 = + node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 1, 0); + let tx13 = + node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 2, 0); + + let cell_dep = node0.always_success_cell_dep(); + let final_output = CellOutputBuilder::default() + .capacity(capacity_bytes!(80).pack()) + .lock(script.clone()) + .build(); + let final_tx = TransactionBuilder::default() + .cell_dep(cell_dep) + .set_inputs(vec![ + CellInput::new(OutPoint::new(tx11.hash(), 0), 0), + CellInput::new(OutPoint::new(tx12.hash(), 0), 0), + CellInput::new(OutPoint::new(tx13.hash(), 0), 0), + ]) + .set_outputs(vec![final_output]) + .set_outputs_data(vec![Default::default(); 1]) + .build(); + + let mut net = Net::new( + "orphan_tx_test", + node0.consensus(), + vec![SupportProtocols::RelayV3], + ); + net.connect(node0); + + return (net, (parent, tx1, tx11, tx12, tx13, final_tx)); +} + +fn run_replay_tx( + net: &Net, + node0: &Node, + tx: TransactionView, + orphan_tx_cnt: u64, + pending_cnt: u64, +) -> bool { + relay_tx(&net, node0, tx, ALWAYS_SUCCESS_SCRIPT_CYCLE); + let result = wait_until(5, || { + let tx_pool_info = node0.get_tip_tx_pool_info(); + tx_pool_info.orphan.value() == orphan_tx_cnt && tx_pool_info.pending.value() == pending_cnt + }); + result +} + +pub struct TxPoolOrphanNormal; +impl Spec for TxPoolOrphanNormal { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(&node0); + + assert!( + run_replay_tx(&net, node0, parent, 0, 1), + "parent sended expect nothing in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx1, 0, 2), + "tx1 is send expect nothing in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx11, 0, 3), + "tx11 is send expect nothing in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx12, 0, 4), + "tx12 is send expect nothing in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx13, 0, 5), + "tx13 is send expect nothing in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, final_tx, 0, 6), + "final_tx is send expect nothing in orphan pool" + ); + } +} + +pub struct TxPoolOrphanReverse; +impl Spec for TxPoolOrphanReverse { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(&node0); + + assert!( + run_replay_tx(&net, node0, final_tx, 1, 0), + "expect final_tx is in orphan pool" + ); + + assert!( + run_replay_tx(&net, node0, tx13, 2, 0), + "tx13 in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx12, 3, 0), + "tx12 is in orphan pool" + ); + assert!(run_replay_tx(&net, node0, tx11, 4, 0), "tx11 is in orphan"); + + assert!(run_replay_tx(&net, node0, tx1, 5, 0), "tx1 is in orphan"); + + assert!( + run_replay_tx(&net, node0, parent, 0, 6), + "all is in pending" + ); + } +} + +pub struct TxPoolOrphanUnordered; +impl Spec for TxPoolOrphanUnordered { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(&node0); + + assert!( + run_replay_tx(&net, node0, final_tx, 1, 0), + "expect final_tx is in orphan pool" + ); + + assert!( + run_replay_tx(&net, node0, tx11, 2, 0), + "tx11 in orphan pool" + ); + let tx12_clone = tx12.clone(); + assert!( + run_replay_tx(&net, node0, tx12, 3, 0), + "tx12 is in orphan pool" + ); + + // set tx12_clone with rpc + let ret = node0 + .rpc_client() + .send_transaction_result(tx12_clone.data().into()); + assert!(ret + .err() + .unwrap() + .to_string() + .contains("already exist in transaction_pool")); + + assert!( + run_replay_tx(&net, node0, parent, 3, 1), + "parent is sent, should be in pending without change orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx1, 1, 4), + "tx1 is send, orphan pool only contains final_tx" + ); + + assert!( + run_replay_tx(&net, node0, tx13, 0, 6), + "tx13 is send, orphan pool is empty" + ); + } +} diff --git a/tx-pool/src/component/orphan.rs b/tx-pool/src/component/orphan.rs index f0551a14867..6e2c06959a1 100644 --- a/tx-pool/src/component/orphan.rs +++ b/tx-pool/src/component/orphan.rs @@ -135,9 +135,10 @@ impl OrphanPool { self.limit_size(); } - pub fn find_by_previous(&self, tx: &TransactionView) -> Option { + pub fn find_by_previous(&self, tx: &TransactionView) -> Vec { tx.output_pts() .iter() - .find_map(|out_point| self.by_out_point.get(out_point).cloned()) + .filter_map(|out_point| self.by_out_point.get(out_point).cloned()) + .collect::>() } } diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index bcc44de6ae4..e4edf883cce 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -238,7 +238,12 @@ impl TxPoolService { // non contextual verify first self.non_contextual_verify(&tx, None)?; - if self.chunk_contains(&tx).await || self.orphan_contains(&tx).await { + if self.chunk_contains(&tx).await { + return Err(Reject::Duplicated(tx.hash())); + } + + if self.orphan_contains(&tx).await { + debug!("reject tx {} already in orphan pool", tx.hash()); return Err(Reject::Duplicated(tx.hash())); } @@ -419,15 +424,12 @@ impl TxPoolService { .add_orphan_tx(tx, peer, declared_cycle) } - pub(crate) async fn find_orphan_by_previous( - &self, - tx: &TransactionView, - ) -> Option { + pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec { let orphan = self.orphan.read().await; - if let Some(id) = orphan.find_by_previous(tx) { - return orphan.get(&id).cloned(); - } - None + let ids = orphan.find_by_previous(tx); + ids.iter() + .map(|id| orphan.get(id).cloned().unwrap()) + .collect::>() } pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) { @@ -439,12 +441,13 @@ impl TxPoolService { orphan_queue.push_back(tx.clone()); while let Some(previous) = orphan_queue.pop_front() { - if let Some(orphan) = self.find_orphan_by_previous(&previous).await { + let orphans = self.find_orphan_by_previous(&previous).await; + for orphan in orphans.into_iter() { if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles { debug!( - "process_orphan {} add to chunk, find previous from {}", + "process_orphan {} add to chunk, find previous from {}", + orphan.tx.hash(), tx.hash(), - orphan.tx.hash() ); self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await; self.chunk @@ -475,8 +478,8 @@ impl TxPoolService { }); debug!( "process_orphan {} success, find previous from {}", - tx.hash(), - orphan.tx.hash() + orphan.tx.hash(), + tx.hash() ); self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await; orphan_queue.push_back(orphan.tx); @@ -484,10 +487,11 @@ impl TxPoolService { Err(reject) => { debug!( "process_orphan {} reject {}, find previous from {}", - tx.hash(), + orphan.tx.hash(), reject, - orphan.tx.hash() + tx.hash(), ); + if !is_missing_input(&reject) { self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await; if reject.is_malformed_tx() { @@ -503,7 +507,6 @@ impl TxPoolService { self.put_recent_reject(&orphan.tx.hash(), &reject).await; } } - break; } } } @@ -877,7 +880,8 @@ impl TxPoolService { let mut ids = vec![]; for tx in txs { ids.push(tx.proposal_short_id()); - if let Some(orphan) = self.find_orphan_by_previous(&tx).await { + let orphans = self.find_orphan_by_previous(&tx).await; + for orphan in orphans.into_iter() { ids.push(orphan.tx.proposal_short_id()); } }