Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uploader optimisation #1889

Merged
merged 3 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -863,8 +863,9 @@ impl Client {
let cash_note_addr = SpendAddress::from_unique_pubkey(&unique_pubkey);
let network_address = NetworkAddress::from_spend_address(cash_note_addr);

trace!("Sending spend {unique_pubkey:?} to the network via put_record, with addr of {cash_note_addr:?}");
let key = network_address.to_record_key();
let pretty_key = PrettyPrintRecordKey::from(&key);
trace!("Sending spend {unique_pubkey:?} to the network via put_record, with addr of {cash_note_addr:?} - {pretty_key:?}");
let record_kind = RecordKind::Spend;
let record = Record {
key,
Expand All @@ -886,9 +887,10 @@ impl Client {
(None, Default::default())
};

// When there is retry on Put side, no need to have a retry on Get
let verification_cfg = GetRecordCfg {
get_quorum: Quorum::Majority,
retry_strategy: Some(RetryStrategy::Balanced),
retry_strategy: None,
target_record: record_to_verify,
expected_holders,
};
Expand Down Expand Up @@ -943,13 +945,14 @@ impl Client {
.await
}

/// This is a similar funcation to `get_spend_from_network` to get a spend from network.
/// Just using different `RetryStrategy` to improve the performance during crawling.
pub async fn crawl_spend_from_network(&self, address: SpendAddress) -> Result<SignedSpend> {
/// Try to peek a spend by just fetching one copy of it.
/// Useful to help decide whether a re-put is necessary, or a spend exists already
/// (client side verification).
pub async fn peek_a_spend(&self, address: SpendAddress) -> Result<SignedSpend> {
self.try_fetch_spend_from_network(
address,
GetRecordCfg {
get_quorum: Quorum::Majority,
get_quorum: Quorum::One,
retry_strategy: None,
target_record: None,
expected_holders: Default::default(),
Expand All @@ -958,21 +961,26 @@ impl Client {
.await
}

/// Try to confirm the Genesis spend doesn't present in the network yet.
/// It shall be quick, and any signle returned copy shall consider as error.
pub async fn is_genesis_spend_present(&self) -> bool {
let genesis_addr = SpendAddress::from_unique_pubkey(&GENESIS_SPEND_UNIQUE_KEY);
/// This is a similar funcation to `get_spend_from_network` to get a spend from network.
/// Just using different `RetryStrategy` to improve the performance during crawling.
pub async fn crawl_spend_from_network(&self, address: SpendAddress) -> Result<SignedSpend> {
self.try_fetch_spend_from_network(
genesis_addr,
address,
GetRecordCfg {
get_quorum: Quorum::One,
get_quorum: Quorum::Majority,
retry_strategy: None,
target_record: None,
expected_holders: Default::default(),
},
)
.await
.is_ok()
}

/// Try to confirm the Genesis spend doesn't present in the network yet.
/// It shall be quick, and any signle returned copy shall consider as error.
pub async fn is_genesis_spend_present(&self) -> bool {
let genesis_addr = SpendAddress::from_unique_pubkey(&GENESIS_SPEND_UNIQUE_KEY);
self.peek_a_spend(genesis_addr).await.is_ok()
}

async fn try_fetch_spend_from_network(
Expand Down
4 changes: 4 additions & 0 deletions sn_client/src/uploader/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,10 @@ impl InnerUploader {
got_a_previous_force_payment = false;
}

let _ = wallet_client
.resend_pending_transaction_blocking_loop()
.await;

let result = match wallet_client.pay_for_records(&cost_map, verify_store).await
{
Ok((storage_cost, royalty_fees)) => {
Expand Down
53 changes: 53 additions & 0 deletions sn_client/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,59 @@ impl WalletClient {
}
}

/// This is a blocking loop in cas there is pending transaction.
/// It will keeps resending the unconfirmed spend infinitely but explictly.
/// Function will only return on success (all unconfirmed spend uploaded),
/// or user chose to manualy, but safely, terminate the procedure.
pub async fn resend_pending_transaction_blocking_loop(&mut self) -> WalletResult<()> {
if !self.wallet.unconfirmed_spend_requests_exist() {
return Ok(());
}
// Wallet shall be all clear to progress forward.
while self.wallet.unconfirmed_spend_requests_exist() {
info!("Pre-Unconfirmed transactions dected, sending again after 30 seconds...");
println!("Pre-Unconfirmed transactions exist, sending again after 30 seconds...");
println!("It's safe to terminate the work, but do remember to retain the unconfirmed_spend file during wallet update.");
println!("Otherwise, you are in risk to make the wallet corrupted.");
// Longer wait as the network will already in heavy duty situation,
// hence try not to give it further burden with short intervaled re-puts.
sleep(Duration::from_secs(30)).await;

// Before re-sending, take a peek of un-confirmed spends first
// Helping user having a better view of what's happening.
let unconfirmed_spends_addrs: Vec<_> = self
.wallet
.unconfirmed_spend_requests()
.iter()
.map(|s| SpendAddress::from_unique_pubkey(&s.spend.unique_pubkey))
.collect();
for addr in unconfirmed_spends_addrs {
match self.client.peek_a_spend(addr).await {
Ok(_) => {
info!("Unconfirmed Spend {addr:?} is find having at least one copy in the network !");
println!(
"Unconfirmed Spend {addr:?} is find at least one copy in the network !"
);
}
Err(err) => {
info!(
"Unconfirmed Spend {addr:?} has no copy in the network yet {err:?} !"
);
println!(
"Unconfirmed Spend {addr:?} has no copy in the network yet {err:?} !"
);
}
}
}

self.resend_pending_transactions(true).await;
}
info!("Wallet is now all cleared, OK to progress further.");
println!("Wallet is now all cleared, OK to progress further.");
eprintln!("WARNING: Closing the client now could corrupt the wallet !");
Ok(())
}

/// Try resending failed transactions multiple times until it succeeds or until we reach max attempts.
async fn resend_pending_transaction_until_success(
&mut self,
Expand Down
46 changes: 46 additions & 0 deletions sn_transfers/src/wallet/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,49 @@ impl WalletApi {
Ok(payments)
}
}

#[cfg(test)]
mod tests {
use super::*;

use crate::{MainSecretKey, NanoTokens, PaymentQuote, Transfer};

#[test]
fn payment_selective() -> Result<()> {
let root_dir = std::env::temp_dir();
let wallet_api = WalletApi::new_from_wallet_dir(&root_dir);

let mut rng = bls::rand::thread_rng();
let chunk_name = XorName::random(&mut rng);

let transfer = Transfer::NetworkRoyalties(vec![]);

let recipient_1 = MainSecretKey::random().main_pubkey();
let payment_details_1 = PaymentDetails {
recipient: recipient_1,
peer_id_bytes: vec![],
transfer: (transfer.clone(), NanoTokens::zero()),
royalties: (transfer.clone(), NanoTokens::zero()),
quote: PaymentQuote::zero(),
};
let _ = wallet_api.insert_payment_transaction(chunk_name, payment_details_1);

let recipient_2 = MainSecretKey::random().main_pubkey();
let payment_details_2 = PaymentDetails {
recipient: recipient_2,
peer_id_bytes: vec![],
transfer: (transfer.clone(), NanoTokens::zero()),
royalties: (transfer, NanoTokens::zero()),
quote: PaymentQuote::zero(),
};
let _ = wallet_api.insert_payment_transaction(chunk_name, payment_details_2.clone());

let recent_payment = wallet_api.get_recent_payment(&chunk_name)?;
assert_eq!(payment_details_2.recipient, recent_payment.recipient);

let recent_payment = wallet_api.get_recent_payment(&chunk_name)?;
assert_eq!(payment_details_2.recipient, recent_payment.recipient);

Ok(())
}
}
Loading