Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(test): Fixes bugs in the lightwalletd integration tests #9052

Merged
merged 18 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pub enum TransactionDownloadVerifyError {
#[error("transaction download / verification was cancelled")]
Cancelled,

#[error("transaction did not pass consensus validation: {0}")]
#[error("transaction did not pass consensus validation, error: {0:?}")]
arya2 marked this conversation as resolved.
Show resolved Hide resolved
Invalid(#[from] zebra_consensus::error::TransactionError),
}

Expand Down
192 changes: 116 additions & 76 deletions zebrad/tests/common/lightwalletd/send_transaction_test.rs
arya2 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@
//! were obtained. This is to ensure that zebra does not reject the transactions because they have
//! already been seen in a block.

use std::{cmp::min, sync::Arc, time::Duration};
use std::{cmp::min, collections::HashSet, sync::Arc};
use tower::BoxError;

use color_eyre::eyre::Result;
use color_eyre::eyre::{eyre, Result};

use zebra_chain::{
parameters::Network::{self, *},
block::Block,
parameters::Network::*,
serialization::ZcashSerialize,
transaction::{self, Transaction},
};
use zebra_node_services::rpc_client::RpcRequestClient;
use zebra_rpc::queue::CHANNEL_AND_QUEUE_CAPACITY;
use zebrad::components::mempool::downloads::MAX_INBOUND_CONCURRENCY;

Expand All @@ -34,10 +37,13 @@ use crate::common::{
lightwalletd::{
can_spawn_lightwalletd_for_rpc, spawn_lightwalletd_for_rpc,
sync::wait_for_zebrad_and_lightwalletd_sync,
wallet_grpc::{self, connect_to_lightwalletd, Empty, Exclude},
wallet_grpc::{
self, compact_tx_streamer_client::CompactTxStreamerClient, connect_to_lightwalletd,
Empty, Exclude,
},
},
sync::LARGE_CHECKPOINT_TIMEOUT,
test_type::TestType::{self, *},
regtest::MiningRpcMethods,
test_type::TestType::*,
};

/// The maximum number of transactions we want to send in the test.
Expand Down Expand Up @@ -85,11 +91,19 @@ pub async fn run() -> Result<()> {
"running gRPC send transaction test using lightwalletd & zebrad",
);

let transactions =
load_transactions_from_future_blocks(network.clone(), test_type, test_name).await?;
let mut count = 0;
let blocks: Vec<Block> =
get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS)
.await?
.into_iter()
.take_while(|block| {
count += block.transactions.len() - 1;
count <= max_sent_transactions()
})
.collect();

tracing::info!(
transaction_count = ?transactions.len(),
blocks_count = ?blocks.len(),
partial_sync_path = ?zebrad_state_path,
"got transactions to send, spawning isolated zebrad...",
);
Expand All @@ -113,6 +127,8 @@ pub async fn run() -> Result<()> {

let zebra_rpc_address = zebra_rpc_address.expect("lightwalletd test must have RPC port");

let zebrad_rpc_client = RpcRequestClient::new(zebra_rpc_address);

tracing::info!(
?test_type,
?zebra_rpc_address,
Expand All @@ -134,7 +150,7 @@ pub async fn run() -> Result<()> {
"spawned lightwalletd connected to zebrad, waiting for them both to sync...",
);

let (_lightwalletd, mut zebrad) = wait_for_zebrad_and_lightwalletd_sync(
let (_lightwalletd, _zebrad) = wait_for_zebrad_and_lightwalletd_sync(
lightwalletd,
lightwalletd_rpc_port,
zebrad,
Expand Down Expand Up @@ -164,15 +180,71 @@ pub async fn run() -> Result<()> {
.await?
.into_inner();

let mut transaction_hashes = HashSet::new();
let mut has_tx_with_shielded_elements = false;
let mut counter = 0;

for block in blocks {
let (has_shielded_elements, count) = send_transactions_from_block(
&mut rpc_client,
&zebrad_rpc_client,
block.clone(),
&mut transaction_hashes,
)
.await?;

has_tx_with_shielded_elements |= has_shielded_elements;
counter += count;

tracing::info!(
height = ?block.coinbase_height(),
"submitting block at height"
);

let submit_block_response = zebrad_rpc_client.submit_block(block).await;
tracing::info!(?submit_block_response, "submitted block");
}

// GetMempoolTx: make sure at least one of the transactions were inserted into the mempool.
assert!(
!has_tx_with_shielded_elements || counter >= 1,
"failed to read v4+ transactions with shielded elements \
from future blocks in mempool via lightwalletd"
);

Ok(())
}

/// Sends non-coinbase transactions from a block to the mempool, verifies that the transactions
/// can be found in the mempool via lightwalletd, and commits the block to Zebra's chainstate.
///
/// Returns the zebrad test child that's handling the RPC requests.

#[tracing::instrument(skip_all)]
async fn send_transactions_from_block(
rpc_client: &mut CompactTxStreamerClient<tonic::transport::Channel>,
zebrad_rpc_client: &RpcRequestClient,
block: Block,
transaction_hashes: &mut HashSet<transaction::Hash>,
) -> Result<(bool, usize)> {
// Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call:
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L482>
//
// So we need to wait much longer than that here.
let sleep_until_lwd_last_mempool_refresh =
tokio::time::sleep(std::time::Duration::from_secs(4));

let transaction_hashes: Vec<transaction::Hash> =
transactions.iter().map(|tx| tx.hash()).collect();
let transactions: Vec<_> = block
.transactions
.iter()
.filter(|tx| !tx.is_coinbase())
.collect();

if transactions.is_empty() {
return Ok((false, 0));
}

transaction_hashes.extend(transactions.iter().map(|tx| tx.hash()));

tracing::info!(
transaction_count = ?transactions.len(),
Expand All @@ -181,7 +253,7 @@ pub async fn run() -> Result<()> {
);

let mut has_tx_with_shielded_elements = false;
for transaction in transactions {
for &transaction in &transactions {
let transaction_hash = transaction.hash();

// See <https://github.com/zcash/lightwalletd/blob/master/parser/transaction.go#L367>
Expand All @@ -195,20 +267,24 @@ pub async fn run() -> Result<()> {

tracing::info!(?transaction_hash, "sending transaction...");

let request = prepare_send_transaction_request(transaction);
let request = prepare_send_transaction_request(transaction.clone());

let response = rpc_client.send_transaction(request).await?.into_inner();
match rpc_client.send_transaction(request).await {
Ok(response) => assert_eq!(response.into_inner(), expected_response),
Err(err) => {
tracing::warn!(?err, "failed to send transaction");
let send_tx_rsp = zebrad_rpc_client
.send_transaction(transaction)
.await
.map_err(|e| eyre!(e));

assert_eq!(response, expected_response);
tracing::warn!(?send_tx_rsp, "failed to send tx twice");
}
};
}

// Check if some transaction is sent to mempool,
// Fails if there are only coinbase transactions in the first 50 future blocks
tracing::info!("waiting for mempool to verify some transactions...");
zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?;
// Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements()
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L537>
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
conradoplg marked this conversation as resolved.
Show resolved Hide resolved
sleep_until_lwd_last_mempool_refresh.await;

tracing::info!("calling GetMempoolTx gRPC to fetch transactions...");
Expand All @@ -217,25 +293,6 @@ pub async fn run() -> Result<()> {
.await?
.into_inner();

// Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead.
// If that happens, we skip the rest of the test.
tracing::info!("checking if lightwalletd has queried the mempool...");

// We need a short timeout here, because sometimes this message is not logged.
zebrad = zebrad.with_timeout(Duration::from_secs(60));
let tx_log =
zebrad.expect_stdout_line_matches("answered mempool request .*req.*=.*TransactionIds");
// Reset the failed timeout and give the rest of the test enough time to finish.
#[allow(unused_assignments)]
{
zebrad = zebrad.with_timeout(LARGE_CHECKPOINT_TIMEOUT);
}

if tx_log.is_err() {
tracing::info!("lightwalletd didn't query the mempool, skipping mempool contents checks");
return Ok(());
}

tracing::info!("checking the mempool contains some of the sent transactions...");
let mut counter = 0;
while let Some(tx) = transactions_stream.message().await? {
Expand All @@ -251,16 +308,6 @@ pub async fn run() -> Result<()> {
counter += 1;
}

// GetMempoolTx: make sure at least one of the transactions were inserted into the mempool.
//
// TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and,
// only check if a transaction from the first block has shielded elements
assert!(
!has_tx_with_shielded_elements || counter >= 1,
"failed to read v4+ transactions with shielded elements from future blocks in mempool via lightwalletd"
);

// TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool.
tracing::info!("calling GetMempoolStream gRPC to fetch transactions...");
let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner();

Expand All @@ -270,32 +317,7 @@ pub async fn run() -> Result<()> {
_counter += 1;
}

Ok(())
}

/// Loads transactions from a few block(s) after the chain tip of the cached state.
///
/// Returns a list of non-coinbase transactions from blocks that have not been finalized to disk
/// in the `ZEBRA_CACHED_STATE_DIR`.
///
/// ## Panics
///
/// If the provided `test_type` doesn't need an rpc server and cached state
#[tracing::instrument]
async fn load_transactions_from_future_blocks(
network: Network,
test_type: TestType,
test_name: &str,
) -> Result<Vec<Arc<Transaction>>> {
let transactions = get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS)
.await?
.into_iter()
.flat_map(|block| block.transactions)
.filter(|transaction| !transaction.is_coinbase())
.take(max_sent_transactions())
.collect();

Ok(transactions)
Ok((has_tx_with_shielded_elements, counter))
}

/// Prepare a request to send to lightwalletd that contains a transaction to be sent.
Expand All @@ -307,3 +329,21 @@ fn prepare_send_transaction_request(transaction: Arc<Transaction>) -> wallet_grp
height: 0,
}
}

trait SendTransactionMethod {
async fn send_transaction(
&self,
transaction: &Arc<Transaction>,
) -> Result<zebra_rpc::methods::SentTransactionHash, BoxError>;
}

impl SendTransactionMethod for RpcRequestClient {
async fn send_transaction(
&self,
transaction: &Arc<Transaction>,
) -> Result<zebra_rpc::methods::SentTransactionHash, BoxError> {
let tx_data = hex::encode(transaction.zcash_serialize_to_vec()?);
self.json_result_from_call("sendrawtransaction", format!(r#"["{tx_data}"]"#))
.await
}
}
2 changes: 1 addition & 1 deletion zebrad/tests/common/lightwalletd/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn wait_for_zebrad_and_lightwalletd_sync<
wait_for_zebrad_mempool: bool,
wait_for_zebrad_tip: bool,
) -> Result<(TestChild<TempDir>, TestChild<P>)> {
let is_zebrad_finished = AtomicBool::new(false);
let is_zebrad_finished = AtomicBool::new(!wait_for_zebrad_tip);
let is_lightwalletd_finished = AtomicBool::new(false);

let is_zebrad_finished = &is_zebrad_finished;
Expand Down
Loading
Loading