Skip to content

Commit

Permalink
feat(cli): skip payment and upload for existing chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Dec 12, 2023
1 parent 1f6365e commit 45c0ed3
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 28 deletions.
18 changes: 18 additions & 0 deletions .github/workflows/memcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ on:
branches: ["*"]

env:
SAFE_DATA_PATH: /home/runner/.local/share/safe
CLIENT_DATA_PATH: /home/runner/.local/share/safe/client
NODE_DATA_PATH: /home/runner/.local/share/safe/node
BOOTSTRAP_NODE_DATA_PATH: /home/runner/.local/share/safe/bootstrap_node
Expand Down Expand Up @@ -101,6 +102,23 @@ jobs:
SN_LOG: "all"
timeout-minutes: 25

# Uploading same file using different client shall not incur any payment neither uploads
# Note rg will throw an error directly in case of failed to find a matching pattern.
- name: Start a different client to upload the same file
run: |
mv $CLIENT_DATA_PATH $SAFE_DATA_PATH/client_first
cargo run --bin faucet --release -- --log-output-dest=data-dir send 5000000 $(cargo run --bin safe --release -- --log-output-dest=data-dir wallet address | tail -n 1) > initial_balance_from_faucet_1.txt
cat initial_balance_from_faucet_1.txt
cat initial_balance_from_faucet_1.txt | tail -n 1 > transfer_hex
cat transfer_hex
cargo run --bin safe --release -- --log-output-dest=data-dir wallet receive --file transfer_hex
cargo run --bin safe --release -- --log-output-dest=data-dir files upload "./the-test-data.zip" --show-holders > second_upload.txt
cat second_upload.txt
rg "New wallet balance: 5000000.000000000" second_upload.txt -c --stats
env:
SN_LOG: "all"
timeout-minutes: 25

- name: Chunks data integrity during nodes churn
run: cargo test --release -p sn_node --test data_with_churn -- --nocapture
env:
Expand Down
23 changes: 18 additions & 5 deletions sn_cli/src/subcommands/files/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ async fn upload_files(
if chunk_manager.is_chunks_empty() {
// make sure we don't have any failed chunks in those
let chunks = chunk_manager.already_put_chunks(&files_path)?;
println!(
"Files upload attempted previously, verifying {} chunks",
chunks.len()
);
let failed_chunks = client.verify_uploaded_chunks(&chunks, batch_size).await?;

// mark the non-failed ones as completed
Expand Down Expand Up @@ -214,6 +218,7 @@ async fn upload_files(

// Task set to add and remove chunks from the chunk manager
let mut uploading_chunks = FuturesUnordered::new();
let mut total_exist_chunks = 0;

for chunks_batch in chunks_batches {
// while we dont have a full batch_size of ongoing uploading_chunks
Expand All @@ -236,18 +241,19 @@ async fn upload_files(
}

// pay for and verify payment... if we don't verify here, chunks uploads will surely fail
match file_api
let skipped_chunks = match file_api
.pay_for_chunks(chunks_batch.iter().map(|(name, _)| *name).collect())
.await
{
Ok((storage_cost, royalties_fees, new_balance)) => {
Ok(((storage_cost, royalties_fees, new_balance), skipped_chunks)) => {
final_balance = new_balance;
total_cost = total_cost
.checked_add(storage_cost)
.ok_or_else(|| eyre!("Unable to add cost to total cost"))?;
total_royalties = total_royalties
.checked_add(royalties_fees)
.ok_or_else(|| eyre!("Unable to add cost to total royalties fees"))?;
skipped_chunks
}
Err(ClientError::Transfers(WalletError::Transfer(
TransfersError::NotEnoughBalance(available, required),
Expand All @@ -259,10 +265,17 @@ async fn upload_files(
}
};

let mut chunks_to_upload = chunks_batch.to_vec();
chunks_to_upload.retain(|(name, _)| !skipped_chunks.contains(name));

total_exist_chunks += skipped_chunks.len();
progress_bar.inc(skipped_chunks.len() as u64);
chunk_manager.mark_completed(skipped_chunks.into_iter());

// upload paid chunks
let upload_tasks = upload_chunks_in_parallel(
&file_api,
chunks_batch.to_vec(),
chunks_to_upload,
verify_store,
&progress_bar,
show_holders,
Expand Down Expand Up @@ -341,8 +354,8 @@ async fn upload_files(
file.flush()?;

let elapsed = format_elapsed_time(now.elapsed());
println!("Uploaded {chunks_to_upload_len} chunks in {elapsed}");
info!("Uploaded {chunks_to_upload_len} chunks in {elapsed}");
println!("Uploaded {chunks_to_upload_len} chunks (with {total_exist_chunks} exist chunks) in {elapsed}");
info!("Uploaded {chunks_to_upload_len} chunks (with {total_exist_chunks} exist chunks) in {elapsed}");

println!("**************************************");
println!("* Payment Details *");
Expand Down
10 changes: 5 additions & 5 deletions sn_client/src/file_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,11 @@ impl Files {
pub async fn pay_for_chunks(
&self,
chunks: Vec<XorName>,
) -> Result<(NanoTokens, NanoTokens, NanoTokens)> {
) -> Result<((NanoTokens, NanoTokens, NanoTokens), Vec<XorName>)> {
let mut wallet_client = self.wallet()?;
info!("Paying for and uploading {:?} chunks", chunks.len());

let (storage_cost, royalties_fees) =
let ((storage_cost, royalties_fees), skipped_chunks) =
wallet_client
.pay_for_storage(chunks.iter().map(|name| {
sn_protocol::NetworkAddress::ChunkAddress(ChunkAddress::new(*name))
Expand All @@ -201,7 +201,7 @@ impl Files {

wallet_client.store_local_wallet()?;
let new_balance = wallet_client.balance();
Ok((storage_cost, royalties_fees, new_balance))
Ok(((storage_cost, royalties_fees, new_balance), skipped_chunks))
}

// --------------------------------------------
Expand Down Expand Up @@ -241,7 +241,7 @@ impl Files {
verify: bool,
) -> Result<(NetworkAddress, NanoTokens, NanoTokens)> {
// initial payment
let (mut storage_cost, mut royalties_fees) = self
let ((mut storage_cost, mut royalties_fees), _skipped) = self
.wallet()?
.pay_for_storage(
chunks
Expand Down Expand Up @@ -285,7 +285,7 @@ impl Files {
info!("Repaying for {:?} chunks, so far paid {storage_cost} (royalties fees: {royalties_fees})", failed_chunks.len());

// Now we pay again or top up, depending on the new current store cost is
let (new_storage_cost, new_royalties_fees) = self
let ((new_storage_cost, new_royalties_fees), _skipped) = self
.wallet()?
.pay_for_storage(failed_chunks.iter().map(|(addr, _path)| {
sn_protocol::NetworkAddress::ChunkAddress(ChunkAddress::new(*addr))
Expand Down
2 changes: 1 addition & 1 deletion sn_client/src/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl ClientRegister {
// Let's check if the user has already paid for this address first
let net_addr = sn_protocol::NetworkAddress::RegisterAddress(addr);
// Let's make the storage payment
(storage_cost, royalties_fees) = wallet_client
((storage_cost, royalties_fees), _) = wallet_client
.pay_for_storage(std::iter::once(net_addr.clone()))
.await?;
let cost = storage_cost
Expand Down
22 changes: 17 additions & 5 deletions sn_client/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl WalletClient {
pub async fn pay_for_storage(
&mut self,
content_addrs: impl Iterator<Item = NetworkAddress>,
) -> WalletResult<(NanoTokens, NanoTokens)> {
) -> WalletResult<((NanoTokens, NanoTokens), Vec<XorName>)> {
let verify_store = true;
let c: Vec<_> = content_addrs.collect();
let mut last_err = "No retries".to_string();
Expand All @@ -166,11 +166,14 @@ impl WalletClient {
Err(WalletError::CouldNotSendMoney(last_err))
}

/// Existing chunks will have a store cost to be Zero.
/// The payement procedure shall be skipped, and the chunk upload as well.
/// Hence the list of existing chunks will be returned.
async fn pay_for_storage_once(
&mut self,
content_addrs: impl Iterator<Item = NetworkAddress>,
verify_store: bool,
) -> WalletResult<(NanoTokens, NanoTokens)> {
) -> WalletResult<((NanoTokens, NanoTokens), Vec<XorName>)> {
// get store cost from network in parrallel
let mut tasks = JoinSet::new();
for content_addr in content_addrs {
Expand All @@ -190,12 +193,18 @@ impl WalletClient {

// collect store costs
let mut cost_map = BTreeMap::default();
let mut skipped_chunks = vec![];
while let Some(res) = tasks.join_next().await {
match res {
Ok((content_addr, Ok(cost))) => {
if let Some(xorname) = content_addr.as_xorname() {
let _ = cost_map.insert(xorname, cost);
debug!("Storecost inserted into payment map for {content_addr:?}");
if cost.1.cost == NanoTokens::zero() {
skipped_chunks.push(xorname);
debug!("Skipped existing chunk {content_addr:?}");
} else {
let _ = cost_map.insert(xorname, cost);
debug!("Storecost inserted into payment map for {content_addr:?}");
}
} else {
warn!("Cannot get store cost for a content that is not a data type: {content_addr:?}");
}
Expand All @@ -214,7 +223,10 @@ impl WalletClient {
info!("Storecosts retrieved");

// pay for records
self.pay_for_records(&cost_map, verify_store).await
Ok((
self.pay_for_records(&cost_map, verify_store).await?,
skipped_chunks,
))
}

/// Send tokens to nodes closest to the data we want to make storage payment for.
Expand Down
17 changes: 7 additions & 10 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,16 +453,13 @@ impl Node {
Query::GetStoreCost(address) => {
trace!("Got GetStoreCost request for {address:?}");

let record_exists = {
if let Some(key) = address.as_record_key() {
match network.is_record_key_present_locally(&key).await {
Ok(res) => res,
Err(error) => {
error!("Problem getting record key's existence: {error:?}");
false
}
}
} else {
let record_exists = match network
.is_record_key_present_locally(&address.to_record_key())
.await
{
Ok(res) => res,
Err(error) => {
error!("Problem getting record key's existence: {error:?}");
false
}
};
Expand Down
4 changes: 2 additions & 2 deletions sn_node/tests/storage_payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ async fn storage_payment_proofs_cached_in_wallet() -> Result<()> {
// let's first pay only for a subset of the addresses
let subset_len = random_content_addrs.len() / 3;
println!("Paying for {subset_len} random addresses...",);
let (storage_cost, royalties_fees) = wallet_client
let ((storage_cost, royalties_fees), _) = wallet_client
.pay_for_storage(random_content_addrs.clone().into_iter().take(subset_len))
.await?;

Expand All @@ -151,7 +151,7 @@ async fn storage_payment_proofs_cached_in_wallet() -> Result<()> {

// now let's request to pay for all addresses, even that we've already paid for a subset of them
let mut wallet_client = WalletClient::new(client.clone(), paying_wallet);
let (storage_cost, royalties_fees) = wallet_client
let ((storage_cost, royalties_fees), _) = wallet_client
.pay_for_storage(random_content_addrs.clone().into_iter())
.await?;
let total_cost = storage_cost
Expand Down

0 comments on commit 45c0ed3

Please sign in to comment.