Skip to content

Commit

Permalink
fix orphan issues
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Oct 21, 2023
1 parent dc95426 commit 44f294b
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 23 deletions.
3 changes: 3 additions & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,9 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(DeclaredWrongCyclesChunk),
Box::new(DeclaredWrongCyclesAndRelayAgain),
Box::new(OrphanTxAccepted),
Box::new(TxPoolOrphanNormal),
Box::new(TxPoolOrphanReverse),
Box::new(TxPoolOrphanUnordered),
Box::new(OrphanTxRejected),
Box::new(GetRawTxPool),
Box::new(PoolReconcile),
Expand Down
23 changes: 20 additions & 3 deletions test/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,13 @@ impl Node {
self.submit_transaction(&self.new_transaction_spend_tip_cellbase())
}

// generate a transaction which spend tip block's cellbase and capacity
pub fn new_transaction_with_capacity(&self, capacity: Capacity) -> TransactionView {
let block = self.get_tip_block();
let cellbase = &block.transactions()[0];
self.new_transaction_with_since_capacity(cellbase.hash(), 0, capacity)
}

// generate a transaction which spend tip block's cellbase
pub fn new_transaction_spend_tip_cellbase(&self) -> TransactionView {
let block = self.get_tip_block();
Expand Down Expand Up @@ -522,11 +529,12 @@ impl Node {
self.new_transaction_with_since_capacity(hash, since, capacity_bytes!(100))
}

pub fn new_transaction_with_since_capacity(
pub fn new_transaction_with_capacity_and_index(
&self,
hash: Byte32,
since: u64,
capacity: Capacity,
index: u32,
since: u64,
) -> TransactionView {
let always_success_cell_dep = self.always_success_cell_dep();
let always_success_script = self.always_success_script();
Expand All @@ -540,10 +548,19 @@ impl Node {
.build(),
)
.output_data(Default::default())
.input(CellInput::new(OutPoint::new(hash, 0), since))
.input(CellInput::new(OutPoint::new(hash, index), since))
.build()
}

pub fn new_transaction_with_since_capacity(
&self,
hash: Byte32,
since: u64,
capacity: Capacity,
) -> TransactionView {
self.new_transaction_with_capacity_and_index(hash, capacity, 0, since)
}

pub fn new_always_failure_transaction(&self, hash: Byte32) -> TransactionView {
let always_failure_cell_dep = self.always_failure_cell_dep();
let always_failure_script = self.always_failure_script();
Expand Down
195 changes: 195 additions & 0 deletions test/src/specs/tx_pool/orphan_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ use crate::utils::wait_until;
use crate::{Net, Node, Spec};
use ckb_jsonrpc_types::Status;
use ckb_network::SupportProtocols;
use ckb_types::core::{capacity_bytes, Capacity, TransactionBuilder, TransactionView};
use ckb_types::packed::CellOutputBuilder;
use ckb_types::{
packed::{CellInput, OutPoint},
prelude::*,
};

const ALWAYS_SUCCESS_SCRIPT_CYCLE: u64 = 537;
// always_failure, as the name implies, so it doesn't matter what the cycles are
Expand Down Expand Up @@ -97,3 +103,192 @@ impl Spec for OrphanTxRejected {
assert!(matches!(ret.tx_status.status, Status::Rejected));
}
}

fn build_tx_chain(
node0: &Node,
) -> (
Net,
(
TransactionView,
TransactionView,
TransactionView,
TransactionView,
TransactionView,
TransactionView,
),
) {
node0.mine_until_out_bootstrap_period();
let parent = node0.new_transaction_with_capacity(capacity_bytes!(800));

let script = node0.always_success_script();
let new_output1 = CellOutputBuilder::default()
.capacity(capacity_bytes!(200).pack())
.lock(script.clone())
.build();
let new_output2 = new_output1.clone();
let new_output3 = new_output1.clone();

let tx1 = parent
.as_advanced_builder()
.set_inputs(vec![CellInput::new(OutPoint::new(parent.hash(), 0), 0)])
.set_outputs(vec![new_output1, new_output2, new_output3])
.set_outputs_data(vec![Default::default(); 3])
.build();

let tx11 =
node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 0, 0);
let tx12 =
node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 1, 0);
let tx13 =
node0.new_transaction_with_capacity_and_index(tx1.hash(), capacity_bytes!(100), 2, 0);

let cell_dep = node0.always_success_cell_dep();
let final_output = CellOutputBuilder::default()
.capacity(capacity_bytes!(80).pack())
.lock(script.clone())
.build();
let final_tx = TransactionBuilder::default()
.cell_dep(cell_dep)
.set_inputs(vec![
CellInput::new(OutPoint::new(tx11.hash(), 0), 0),
CellInput::new(OutPoint::new(tx12.hash(), 0), 0),
CellInput::new(OutPoint::new(tx13.hash(), 0), 0),
])
.set_outputs(vec![final_output])
.set_outputs_data(vec![Default::default(); 1])
.build();

let mut net = Net::new(
"orphan_tx_test",
node0.consensus(),
vec![SupportProtocols::RelayV3],
);
net.connect(node0);

return (net, (parent, tx1, tx11, tx12, tx13, final_tx));
}

fn run_replay_tx(
net: &Net,
node0: &Node,
tx: TransactionView,
orphan_tx_cnt: u64,
pending_cnt: u64,
) -> bool {
relay_tx(&net, node0, tx, ALWAYS_SUCCESS_SCRIPT_CYCLE);
let result = wait_until(5, || {
let tx_pool_info = node0.get_tip_tx_pool_info();
tx_pool_info.orphan.value() == orphan_tx_cnt && tx_pool_info.pending.value() == pending_cnt
});
result
}

pub struct TxPoolOrphanNormal;
impl Spec for TxPoolOrphanNormal {
fn run(&self, nodes: &mut Vec<Node>) {
let node0 = &nodes[0];
let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(&node0);

assert!(
run_replay_tx(&net, node0, parent, 0, 1),
"parent sended expect nothing in orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx1, 0, 2),
"tx1 is send expect nothing in orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx11, 0, 3),
"tx11 is send expect nothing in orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx12, 0, 4),
"tx12 is send expect nothing in orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx13, 0, 5),
"tx13 is send expect nothing in orphan pool"
);
assert!(
run_replay_tx(&net, node0, final_tx, 0, 6),
"final_tx is send expect nothing in orphan pool"
);
}
}

pub struct TxPoolOrphanReverse;
impl Spec for TxPoolOrphanReverse {
fn run(&self, nodes: &mut Vec<Node>) {
let node0 = &nodes[0];
let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(&node0);

assert!(
run_replay_tx(&net, node0, final_tx, 1, 0),
"expect final_tx is in orphan pool"
);

assert!(
run_replay_tx(&net, node0, tx13, 2, 0),
"tx13 in orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx12, 3, 0),
"tx12 is in orphan pool"
);
assert!(run_replay_tx(&net, node0, tx11, 4, 0), "tx11 is in orphan");

assert!(run_replay_tx(&net, node0, tx1, 5, 0), "tx1 is in orphan");

assert!(
run_replay_tx(&net, node0, parent, 0, 6),
"all is in pending"
);
}
}

pub struct TxPoolOrphanUnordered;
impl Spec for TxPoolOrphanUnordered {
fn run(&self, nodes: &mut Vec<Node>) {
let node0 = &nodes[0];
let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(&node0);

assert!(
run_replay_tx(&net, node0, final_tx, 1, 0),
"expect final_tx is in orphan pool"
);

assert!(
run_replay_tx(&net, node0, tx11, 2, 0),
"tx11 in orphan pool"
);
let tx12_clone = tx12.clone();
assert!(
run_replay_tx(&net, node0, tx12, 3, 0),
"tx12 is in orphan pool"
);

// set tx12_clone with rpc
let ret = node0
.rpc_client()
.send_transaction_result(tx12_clone.data().into());
assert!(ret
.err()
.unwrap()
.to_string()
.contains("already exist in transaction_pool"));

assert!(
run_replay_tx(&net, node0, parent, 3, 1),
"parent is sent, should be in pending without change orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx1, 1, 4),
"tx1 is send, orphan pool only contains final_tx"
);

assert!(
run_replay_tx(&net, node0, tx13, 0, 6),
"tx13 is send, orphan pool is empty"
);
}
}
5 changes: 3 additions & 2 deletions tx-pool/src/component/orphan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,10 @@ impl OrphanPool {
self.limit_size();
}

pub fn find_by_previous(&self, tx: &TransactionView) -> Option<ProposalShortId> {
pub fn find_by_previous(&self, tx: &TransactionView) -> Vec<ProposalShortId> {
tx.output_pts()
.iter()
.find_map(|out_point| self.by_out_point.get(out_point).cloned())
.filter_map(|out_point| self.by_out_point.get(out_point).cloned())
.collect::<Vec<_>>()
}
}
40 changes: 22 additions & 18 deletions tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,12 @@ impl TxPoolService {
// non contextual verify first
self.non_contextual_verify(&tx, None)?;

if self.chunk_contains(&tx).await || self.orphan_contains(&tx).await {
if self.chunk_contains(&tx).await {
return Err(Reject::Duplicated(tx.hash()));
}

if self.orphan_contains(&tx).await {
debug!("reject tx {} already in orphan pool", tx.hash());
return Err(Reject::Duplicated(tx.hash()));
}

Expand Down Expand Up @@ -419,15 +424,12 @@ impl TxPoolService {
.add_orphan_tx(tx, peer, declared_cycle)
}

pub(crate) async fn find_orphan_by_previous(
&self,
tx: &TransactionView,
) -> Option<OrphanEntry> {
pub(crate) async fn find_orphan_by_previous(&self, tx: &TransactionView) -> Vec<OrphanEntry> {
let orphan = self.orphan.read().await;
if let Some(id) = orphan.find_by_previous(tx) {
return orphan.get(&id).cloned();
}
None
let ids = orphan.find_by_previous(tx);
ids.iter()
.map(|id| orphan.get(id).cloned().unwrap())
.collect::<Vec<_>>()
}

pub(crate) async fn remove_orphan_tx(&self, id: &ProposalShortId) {
Expand All @@ -439,12 +441,13 @@ impl TxPoolService {
orphan_queue.push_back(tx.clone());

while let Some(previous) = orphan_queue.pop_front() {
if let Some(orphan) = self.find_orphan_by_previous(&previous).await {
let orphans = self.find_orphan_by_previous(&previous).await;
for orphan in orphans.into_iter() {
if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
debug!(
"process_orphan {} add to chunk, find previous from {}",
"process_orphan {} add to chunk, find previous from {}",
orphan.tx.hash(),
tx.hash(),
orphan.tx.hash()
);
self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
self.chunk
Expand Down Expand Up @@ -475,19 +478,20 @@ impl TxPoolService {
});
debug!(
"process_orphan {} success, find previous from {}",
tx.hash(),
orphan.tx.hash()
orphan.tx.hash(),
tx.hash()
);
self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
orphan_queue.push_back(orphan.tx);
}
Err(reject) => {
debug!(
"process_orphan {} reject {}, find previous from {}",
tx.hash(),
orphan.tx.hash(),
reject,
orphan.tx.hash()
tx.hash(),
);

if !is_missing_input(&reject) {
self.remove_orphan_tx(&orphan.tx.proposal_short_id()).await;
if reject.is_malformed_tx() {
Expand All @@ -503,7 +507,6 @@ impl TxPoolService {
self.put_recent_reject(&orphan.tx.hash(), &reject).await;
}
}
break;
}
}
}
Expand Down Expand Up @@ -877,7 +880,8 @@ impl TxPoolService {
let mut ids = vec![];
for tx in txs {
ids.push(tx.proposal_short_id());
if let Some(orphan) = self.find_orphan_by_previous(&tx).await {
let orphans = self.find_orphan_by_previous(&tx).await;
for orphan in orphans.into_iter() {
ids.push(orphan.tx.proposal_short_id());
}
}
Expand Down

0 comments on commit 44f294b

Please sign in to comment.