From 8795eb1df98ad02ea4381deaafb3cb1bfdae96bb Mon Sep 17 00:00:00 2001
From: qima
Date: Tue, 9 Jan 2024 17:28:13 +0800
Subject: [PATCH] chore(client): refactor client upload flow

---
 .github/workflows/merge.yml           |   9 +-
 sn_client/src/error.rs                |   6 +
 sn_client/src/files/upload.rs         | 558 ++++++++++++++++----------
 sn_client/src/wallet.rs               |   4 +-
 sn_networking/src/lib.rs              |  12 +-
 sn_node/tests/verify_data_location.rs |  14 +-
 6 files changed, 377 insertions(+), 226 deletions(-)

diff --git a/.github/workflows/merge.yml b/.github/workflows/merge.yml
index 509bf5c5fd..8aaf1d6d21 100644
--- a/.github/workflows/merge.yml
+++ b/.github/workflows/merge.yml
@@ -575,12 +575,12 @@ jobs:
         run: cargo test --release -p sn_node --features="local-discovery" --test verify_routing_table -- --nocapture
         timeout-minutes: 5

-      - name: Verify the location of the data on the network (4 * 5 mins)
+      - name: Verify the location of the data on the network
        run: cargo test --release -p sn_node --features="local-discovery" --test verify_data_location -- --nocapture
        env:
          CHURN_COUNT: 3
          SN_LOG: "all"
-        timeout-minutes: 30
+        timeout-minutes: 5

      - name: Verify the routing tables of the nodes
        run: cargo test --release -p sn_node --features="local-discovery" --test verify_routing_table -- --nocapture
@@ -703,16 +703,17 @@ jobs:
        run: cargo run --bin safe --release -- --log-output-dest=data-dir files upload "./test_data_1.tar.gz" -r 0
        env:
          SN_LOG: "all"
-        timeout-minutes: 60
+        timeout-minutes: 5

      - name: Wait for certain period
        run: sleep 300
+        timeout-minutes: 6

      - name: Start a client to upload second file
        run: cargo run --bin safe --release -- --log-output-dest=data-dir files upload "./test_data_2.tar.gz" -r 0
        env:
          SN_LOG: "all"
-        timeout-minutes: 60
+        timeout-minutes: 10

      - name: Stop the local network and upload logs
        if: always()
diff --git a/sn_client/src/error.rs b/sn_client/src/error.rs
index 32291e6ffb..34b8978202 100644
--- a/sn_client/src/error.rs
+++ b/sn_client/src/error.rs
@@ -91,4 +91,10 @@ pub enum Error {

     #[error("Error occurred while assembling the downloaded chunks")]
     FailedToAssembleDownloadedChunks,
+
+    #[error("Error occurred when accessing the wallet file")]
+    FailedToAccessWallet,
+
+    #[error("Task completion notification channel closed prematurely")]
+    FailedToReadFromNotificationChannel,
 }
diff --git a/sn_client/src/files/upload.rs b/sn_client/src/files/upload.rs
index 1a2cbf619a..815fd8da26 100644
--- a/sn_client/src/files/upload.rs
+++ b/sn_client/src/files/upload.rs
@@ -11,15 +11,18 @@ use crate::{
     FilesApi, BATCH_SIZE, MAX_UPLOAD_RETRIES,
 };
 use bytes::Bytes;
-use futures::{stream::FuturesUnordered, StreamExt};
 use libp2p::PeerId;
-use sn_protocol::storage::{Chunk, ChunkAddress};
+use sn_networking::PayeeQuote;
+use sn_protocol::{
+    storage::{Chunk, ChunkAddress},
+    NetworkAddress,
+};
 use sn_transfers::NanoTokens;
-use std::{collections::HashSet, path::PathBuf};
-use tokio::{
-    sync::mpsc::{self},
-    task::JoinHandle,
-};
+use std::{
+    collections::{BTreeMap, BTreeSet, HashSet},
+    path::PathBuf,
+};
+use tokio::sync::mpsc::{self};
 use xor_name::XorName;

 /// The maximum number of sequential payment failures before aborting the upload process.
@@ -50,6 +53,20 @@ struct ChunkInfo {
     path: PathBuf,
 }

+#[allow(clippy::large_enum_variant)]
+enum TaskResult {
+    GetStoreCostOK((ChunkInfo, PayeeQuote)),
+    // (store_cost, royalty_fee, new_wallet_balance)
+    MakePaymentsOK(
+        (
+            (NanoTokens, NanoTokens, NanoTokens),
+            Vec<(ChunkInfo, PeerId)>,
+        ),
+    ),
+    UploadChunksOK(XorName),
+    ErrorEntries(Vec<ChunkInfo>),
+}
+
 /// `FilesUpload` provides functionality for uploading chunks with support for retries and queuing.
 /// This struct is not cloneable. To create a new instance with default configuration, use the `new` function.
 /// To modify the configuration, use the provided setter methods (`set_...` functions).
@@ -63,7 +80,6 @@ pub struct FilesUpload {
     api: FilesApi,
     // Uploads
     failed_chunks: HashSet<ChunkInfo>,
-    uploading_chunks: FuturesUnordered<JoinHandle<(ChunkInfo, Result<()>)>>,
     // Upload stats
     upload_storage_cost: NanoTokens,
     upload_royalty_fees: NanoTokens,
@@ -84,7 +100,6 @@ impl FilesUpload {
             max_retries: MAX_UPLOAD_RETRIES,
             api: files_api,
             failed_chunks: Default::default(),
-            uploading_chunks: Default::default(),
             upload_storage_cost: NanoTokens::zero(),
             upload_royalty_fees: NanoTokens::zero(),
             upload_final_balance: NanoTokens::zero(),
@@ -150,33 +165,51 @@ impl FilesUpload {
         self.upload_final_balance
     }

-    /// get the set of failed chunks that could not be uploaded
-    pub fn get_failed_chunks(&self) -> HashSet<XorName> {
-        self.failed_chunks
-            .clone()
-            .into_iter()
-            .map(|chunk_info| chunk_info.name)
-            .collect()
-    }
-
     /// Uploads the provided chunks to the network.
     /// If you want to track the upload progress, use the `get_upload_events` method.
-    pub async fn upload_chunks(&mut self, chunks: Vec<(XorName, PathBuf)>) -> Result<()> {
+    pub async fn upload_chunks(&mut self, mut chunks: Vec<(XorName, PathBuf)>) -> Result<()> {
+        trace!("Uploading {} chunks", chunks.len());
         // make sure we log that the event sender is absent at least once
         self.logged_event_sender_absence = false;

         // clean up the trackers/stats
         self.failed_chunks = Default::default();
-        self.uploading_chunks = Default::default();
         self.upload_storage_cost = NanoTokens::zero();
         self.upload_royalty_fees = NanoTokens::zero();
         self.upload_final_balance = NanoTokens::zero();

-        let result = self.upload(chunks).await;
+        #[allow(unused_assignments)]
+        let mut result = Ok(());
+
+        let mut last_failed = 0;
+        let mut for_failed_chunks = false;
+        loop {
+            result = self.upload(chunks, for_failed_chunks).await;
+
+            // Any error raised from the `upload` function is unrecoverable
+            // and shall terminate the overall upload immediately.
+            if result.is_err() {
+                // send an event indicating that the upload process completed with an error
+                self.send_event(FileUploadEvent::Error);
+                break;
+            }

-        // send an event indicating that the upload process completed with an error
-        if result.is_err() {
-            self.send_event(FileUploadEvent::Error).await?;
+            chunks = self
+                .take_failed_chunks()
+                .iter()
+                .map(|chunk_info| (chunk_info.name, chunk_info.path.clone()))
+                .collect();
+            if chunks.len() == last_failed {
+                // Terminate the flow whenever there is no progress with failed_chunks.
+                // It could be that there was no failure at all (last_failed being 0),
+                // or a payment/network issue rejected all uploads.
+                break;
+            }
+
+            for_failed_chunks = true;
+            last_failed = chunks.len();
+            warn!("Retrying {last_failed} failed chunks ...");
+            println!("Retrying {last_failed} failed chunks ...");
+        }

         // drop the sender to close the channel.
@@ -186,210 +219,173 @@ impl FilesUpload {
         result
     }

-    async fn upload(&mut self, chunks: Vec<(XorName, PathBuf)>) -> Result<()> {
+    /// There are three main task groups to upload chunks:
+    /// 1. Fetch the store_cost of a chunk
+    /// 2. Pay for the chunk based on the fetched store_cost
+    /// 3. Upload the chunk
+    /// The task groups can run in parallel with each other, though a group sometimes
+    /// requires input from the previous one. Within groups 1 and 3, the mini tasks
+    /// can run in parallel with each other; within group 2, the mini tasks have to
+    /// be undertaken sequentially.
+    async fn upload(
+        &mut self,
+        mut chunks: Vec<(XorName, PathBuf)>,
+        for_failed_chunks: bool,
+    ) -> Result<()> {
+        let mut pending_to_pay: Vec<(ChunkInfo, PayeeQuote)> = vec![];
+        let mut pending_to_upload: Vec<(ChunkInfo, PeerId)> = vec![];
+        let mut uploaded = 0;
+        let mut skipped = 0;
+        let mut on_going_get_cost = BTreeSet::new();
+        let mut on_going_pay_for_chunk = BTreeSet::new();
+        let mut on_going_uploadings = BTreeSet::new();
         let mut sequential_payment_fails = 0;

-        let mut chunk_batches = Vec::with_capacity(chunks.len());
-        chunk_batches.extend(
-            chunks
-                .into_iter()
-                .map(|(name, path)| ChunkInfo { name, path }),
-        );
-        let n_batches = {
-            let total_elements = chunk_batches.len();
-            // to get +1 if there is a remainder
-            (total_elements + self.batch_size - 1) / self.batch_size
-        };
-        let mut batch = 1;
-        let chunk_batches = chunk_batches.chunks(self.batch_size);
+        let batch_size = self.batch_size;
+        let total_chunks = chunks.len();

-        for chunks_batch in chunk_batches {
-            trace!("Uploading batch {batch}/{n_batches}");
-            if sequential_payment_fails >= MAX_SEQUENTIAL_PAYMENT_FAILS {
-                return Err(ClientError::SequentialUploadPaymentError);
-            }
-            // if the payment fails, we can continue to the next batch
-            let res = self.handle_chunk_batch(chunks_batch, false).await;
-            batch += 1;
-            match res {
-                Ok(()) => {
-                    trace!("Uploaded batch {batch}/{n_batches}");
-                }
-                Err(err) => match err {
-                    ClientError::CouldNotVerifyTransfer(err) => {
-                        warn!(
-                            "Failed to verify transfer validity in the network. Chunk batch will be retried... {err:?}"
-                        );
-                        println!(
-                            "Failed to verify transfer validity in the network. Chunk batch will be retried..."
-                        );
-                        sequential_payment_fails += 1;
-                        continue;
-                    }
-                    error => {
-                        return Err(error);
-                    }
-                },
-            }
-        }
+        let (get_store_cost_sender, mut get_store_cost_receiver) = mpsc::channel(batch_size);
+        let (paid_chunk_sender, mut paid_chunk_receiver) =
+            mpsc::channel::<TaskResult>(batch_size);
+        let (paying_work_sender, paying_work_receiver) = mpsc::channel(batch_size);
+        let (upload_chunk_sender, mut upload_chunk_receiver) = mpsc::channel(batch_size);

-        // ensure we wait on any remaining uploading_chunks
-        self.progress_uploading_chunks(true).await?;
+        self.spawn_paying_thread(paying_work_receiver, paid_chunk_sender, batch_size);

-        let mut retry_count = 0;
-        let max_retries = self.max_retries;
-        let mut failed_chunks_to_upload = self.take_failed_chunks();
-        while !failed_chunks_to_upload.is_empty() && retry_count < max_retries {
-            warn!(
-                "Retrying failed chunks {:?}, attempt {retry_count}/{max_retries}...",
-                failed_chunks_to_upload.len()
-            );
-            println!(
-                "Retrying failed chunks {:?}, attempt {retry_count}/{max_retries}...",
-                failed_chunks_to_upload.len()
-            );
-            retry_count += 1;
-            let batches = failed_chunks_to_upload.chunks(self.batch_size);
-            for chunks_batch in batches {
-                self.handle_chunk_batch(chunks_batch, true).await?;
+        loop {
+            if uploaded + skipped + self.failed_chunks.len() == total_chunks {
+                // To avoid empty final_balance when all chunks are skipped.
+                self.upload_final_balance = self.api.wallet()?.balance();
+                return Ok(());
             }
-            // ensure we wait on any remaining uploading_chunks w/ drain_all
-            self.progress_uploading_chunks(true).await?;
-
-            // take the new failed chunks
-            failed_chunks_to_upload = self.take_failed_chunks();
-        }
-        Ok(())
-    }
-
-    /// Handles a batch of chunks for upload. This includes paying for the chunks, uploading them,
-    /// and handling any errors that occur during the process.
-    ///
-    /// If `failed_batch` is true, we emit FilesUploadEvent::Uploaded for the skipped_chunks. This is because,
-    /// the failed_batch was already paid for, but could not be verified on the first try.
-    async fn handle_chunk_batch(
-        &mut self,
-        chunks_batch: &[ChunkInfo],
-        failed_batch: bool,
-    ) -> Result<()> {
-        // while we don't have a full batch_size of ongoing uploading_chunks
-        // we can pay for the next batch and carry on
-        self.progress_uploading_chunks(false).await?;
-
-        // pay for and verify payment... if we don't verify here, chunks uploads will surely fail
-        let (payee_map, skipped_chunks) = match self
-            .api
-            .pay_for_chunks(chunks_batch.iter().map(|info| info.name).collect())
-            .await
-        {
-            Ok(((storage_cost, royalty_fees, new_balance), (payee_map, skipped_chunks))) => {
-                // store the stats and emit event too
-                self.upload_storage_cost = self
-                    .upload_storage_cost
-                    .checked_add(storage_cost)
-                    .ok_or(ClientError::TotalPriceTooHigh)?;
-                self.upload_royalty_fees = self
-                    .upload_royalty_fees
-                    .checked_add(royalty_fees)
-                    .ok_or(ClientError::TotalPriceTooHigh)?;
-                self.upload_final_balance = new_balance;
-                self.send_event(FileUploadEvent::PayedForChunks {
-                    storage_cost,
-                    royalty_fees,
-                    new_balance,
-                })
-                .await?;
-                (payee_map, skipped_chunks)
+            if chunks.is_empty() && !on_going_pay_for_chunk.is_empty() {
+                // Fire None to trigger a forced round of making leftover payments.
+ let paying_work_sender_clone = paying_work_sender.clone(); + let _handle = tokio::spawn(async move { + let _ = paying_work_sender_clone.send(None).await; + }); } - Err(err) => return Err(err), - }; - let mut chunks_to_upload = chunks_batch.to_vec(); - // don't reupload skipped chunks - chunks_to_upload.retain(|info| !skipped_chunks.contains(&info.name)); + while !chunks.is_empty() + && on_going_get_cost.len() < batch_size + && pending_to_pay.len() < batch_size + { + if let Some((name, path)) = chunks.pop() { + let _ = on_going_get_cost.insert(name); + self.spawn_get_store_cost_task( + ChunkInfo { name, path }, + get_store_cost_sender.clone(), + ); + } + } - // send update about the existing chunks - for chunk in skipped_chunks { - if failed_batch { - // the chunk was already paid for but might have not been verified on the first try. - self.send_event(FileUploadEvent::Uploaded(ChunkAddress::new(chunk))) - .await?; - } else { - // if during the first try we skip the chunk, then it was already uploaded. - self.send_event(FileUploadEvent::AlreadyExistsInNetwork(ChunkAddress::new( - chunk, - ))) - .await?; + while !pending_to_pay.is_empty() + && on_going_pay_for_chunk.len() < batch_size + && pending_to_upload.len() < batch_size + { + if let Some(to_pay) = pending_to_pay.pop() { + let _ = on_going_pay_for_chunk.insert(to_pay.0.name); + let paying_work_sender_clone = paying_work_sender.clone(); + let _handle = tokio::spawn(async move { + let _ = paying_work_sender_clone.send(Some(to_pay)).await; + }); + } } - } - // upload paid chunks - for chunk_info in chunks_to_upload.into_iter() { - let files_api = self.api.clone(); - let verify_store = self.verify_store; + while !pending_to_upload.is_empty() && on_going_uploadings.len() < batch_size { + if let Some((chunk_info, payee)) = pending_to_upload.pop() { + let _ = on_going_uploadings.insert(chunk_info.name); + self.spawn_upload_chunk_task(chunk_info, payee, upload_chunk_sender.clone()); + } + } - let payee = if let Some(payee) = payee_map - .iter() - .find(|itr| itr.0 == chunk_info.name) - .map(|result| result.1) + let task_result = if let Some(result) = progress_tasks( + &mut get_store_cost_receiver, + &mut paid_chunk_receiver, + &mut upload_chunk_receiver, + ) + .await { - payee + result } else { - error!( - "Cannot find payee of {:?} among the payee_map", - chunk_info.name - ); - continue; + return Err(ClientError::FailedToReadFromNotificationChannel); }; - // Spawn a task for each chunk to be uploaded - let handle = tokio::spawn(Self::upload_chunk( - files_api, - chunk_info, - payee, - verify_store, - )); - - self.progress_uploading_chunks(false).await?; - - self.uploading_chunks.push(handle); - } - - Ok(()) - } - - /// Progresses the uploading of chunks. If the number of ongoing uploading chunks is less than the batch size, - /// it pays for the next batch and continues. If an error occurs during the upload, it will be returned. - /// - /// If `drain_all` is true, will wait for all ongoing uploads to complete before returning. - async fn progress_uploading_chunks(&mut self, drain_all: bool) -> Result<()> { - while drain_all || self.uploading_chunks.len() >= self.batch_size { - if let Some(result) = self.uploading_chunks.next().await { - // bail if we've had any errors so far - match result? 
 {
-                    (chunk_info, Ok(())) => {
+            match task_result {
+                TaskResult::GetStoreCostOK((chunk_info, cost)) => {
+                    let _ = on_going_get_cost.remove(&chunk_info.name);
+                    trace!(
+                        "Upload task got chunk {:?}'s store_cost {:?}",
+                        chunk_info.name,
+                        cost.2
+                    );
+                    if cost.2.cost != NanoTokens::zero() {
+                        pending_to_pay.push((chunk_info, cost));
+                    } else if for_failed_chunks {
+                        // the chunk was already paid for but might not have been verified on the first try.
                         self.send_event(FileUploadEvent::Uploaded(ChunkAddress::new(
                             chunk_info.name,
-                        )))
-                        .await?;
+                        )));
+                        uploaded += 1;
+                    } else {
+                        // if during the first try we skip the chunk, then it was already uploaded.
+                        self.send_event(FileUploadEvent::AlreadyExistsInNetwork(
+                            ChunkAddress::new(chunk_info.name),
+                        ));
+                        skipped += 1;
                     }
-                    (chunk_info, Err(err)) => {
-                        warn!("Failed to upload a chunk: {err}");
-                        self.send_event(FileUploadEvent::FailedToUpload(ChunkAddress::new(
-                            chunk_info.name,
-                        )))
-                        .await?;
-                        // store the failed chunk to be retried later
-                        self.failed_chunks.insert(chunk_info);
+                }
+                TaskResult::MakePaymentsOK((
+                    (storage_cost, royalty_fees, new_balance),
+                    reply_list,
+                )) => {
+                    trace!("Paid {} chunks, with {storage_cost:?} store_cost and {royalty_fees:?} royalty_fees, and new_balance is {new_balance:?}",
+                        reply_list.len());
+                    sequential_payment_fails = 0;
+                    for (chunk_info, _) in reply_list.iter() {
+                        let _ = on_going_pay_for_chunk.remove(&chunk_info.name);
                     }
+                    pending_to_upload.extend(reply_list);
+
+                    self.upload_storage_cost = self
+                        .upload_storage_cost
+                        .checked_add(storage_cost)
+                        .ok_or(ClientError::TotalPriceTooHigh)?;
+                    self.upload_royalty_fees = self
+                        .upload_royalty_fees
+                        .checked_add(royalty_fees)
+                        .ok_or(ClientError::TotalPriceTooHigh)?;
+                    self.upload_final_balance = new_balance;
+                    self.send_event(FileUploadEvent::PayedForChunks {
+                        storage_cost,
+                        royalty_fees,
+                        new_balance,
+                    });
+                }
+                TaskResult::UploadChunksOK(xor_name) => {
+                    trace!("Upload task uploaded chunk {xor_name:?}");
+                    let _ = on_going_uploadings.remove(&xor_name);
+                    uploaded += 1;
+                    self.send_event(FileUploadEvent::Uploaded(ChunkAddress::new(xor_name)));
+                }
+                TaskResult::ErrorEntries(error_list) => {
+                    if error_list.is_empty() {
+                        // An empty error_list indicates an unrecoverable failure to access the wallet.
+                        // The entire upload process shall be terminated.
+                        return Err(ClientError::FailedToAccessWallet);
+                    }
+                    if error_list.len() > 1 {
+                        sequential_payment_fails += 1;
+                        if sequential_payment_fails >= MAX_SEQUENTIAL_PAYMENT_FAILS {
+                            // Too many sequential payment failures indicate an unrecoverable
+                            // problem: the spend txs keep being rejected by the network.
+                            // The entire upload process shall be terminated.
+                            return Err(ClientError::SequentialUploadPaymentError);
+                        }
+                    }
+                    self.failed_chunks.extend(error_list);
                 }
-            } else {
-                // we're finished
-                break;
             }
         }
-        Ok(())
     }

     /// Store chunks from chunk_paths (assuming payments have already been made and are in our local wallet).
@@ -426,16 +422,160 @@ impl FilesUpload {
             .collect()
     }

-    async fn send_event(&mut self, event: FileUploadEvent) -> Result<()> {
+    fn send_event(&mut self, event: FileUploadEvent) {
         if let Some(sender) = self.event_sender.as_ref() {
-            sender.send(event).await.map_err(|err| {
-                error!("Could not send files event due to {err:?}");
-                ClientError::CouldNotSendFilesEvent
-            })?;
+            let sender_clone = sender.clone();
+            let _handle = tokio::spawn(async move {
+                let _ = sender_clone.send(event).await;
+            });
         } else if !self.logged_event_sender_absence {
             info!("FilesUpload upload event sender is not set. Use get_upload_events() if you need to keep track of the progress");
             self.logged_event_sender_absence = true;
         }
-        Ok(())
+    }
+
+    fn spawn_get_store_cost_task(
+        &self,
+        chunk_info: ChunkInfo,
+        get_store_cost_sender: mpsc::Sender<TaskResult>,
+    ) {
+        trace!("spawning a get_store_cost task");
+        let client = self.api.client.clone();
+        let _handle = tokio::spawn(async move {
+            let cost = match client
+                .network
+                .get_store_costs_from_network(NetworkAddress::from_chunk_address(
+                    ChunkAddress::new(chunk_info.name),
+                ))
+                .await
+            {
+                Ok(cost) => {
+                    debug!("Storecosts retrieved for {:?} {cost:?}", chunk_info.name);
+                    TaskResult::GetStoreCostOK((chunk_info, cost))
+                }
+                Err(err) => {
+                    error!(
+                        "Encountered error {err:?} when getting store_cost of {:?}",
+                        chunk_info.name
+                    );
+                    TaskResult::ErrorEntries(vec![chunk_info])
+                }
+            };
+
+            let _ = get_store_cost_sender.send(cost).await;
+        });
+    }
+
+    fn spawn_paying_thread(
+        &self,
+        mut paying_work_receiver: mpsc::Receiver<Option<(ChunkInfo, PayeeQuote)>>,
+        pay_for_chunk_sender: mpsc::Sender<TaskResult>,
+        batch_size: usize,
+    ) {
+        let files_api = self.api.clone();
+        let verify_store = self.verify_store;
+        let _handle = tokio::spawn(async move {
+            trace!("spawning paying thread");
+            let mut wallet_client = match files_api.wallet() {
+                Ok(wallet) => wallet,
+                Err(err) => {
+                    error!("Failed to open wallet: {err:?}");
+                    let _ = pay_for_chunk_sender
+                        .send(TaskResult::ErrorEntries(vec![]))
+                        .await;
+                    return;
+                }
+            };
+            let mut cost_map = BTreeMap::new();
+            let mut chunk_info_map = vec![];
+
+            while let Some(payment) = paying_work_receiver.recv().await {
+                let make_payments = if let Some((chunk_info, quote)) = payment {
+                    let _ = cost_map.insert(chunk_info.name, (quote.1, quote.2));
+                    chunk_info_map.push((chunk_info, quote.0));
+                    cost_map.len() >= batch_size
+                } else {
+                    // a `None` signals that all leftover entries shall be paid for now.
+                    !cost_map.is_empty()
+                };
+
+                if make_payments {
+                    let result = match wallet_client.pay_for_records(&cost_map, verify_store).await
+                    {
+                        Ok((storage_cost, royalty_fees)) => {
+                            trace!("Made payments for {} chunks", cost_map.len());
+                            let reply_list = std::mem::take(&mut chunk_info_map);
+                            TaskResult::MakePaymentsOK((
+                                (storage_cost, royalty_fees, wallet_client.balance()),
+                                reply_list,
+                            ))
+                        }
+                        Err(err) => {
+                            let reply_list: Vec<ChunkInfo> = std::mem::take(&mut chunk_info_map)
+                                .into_iter()
+                                .map(|(chunk_info, _)| chunk_info)
+                                .collect();
+                            error!("When paying {} chunks, got error {err:?}", reply_list.len());
+                            TaskResult::ErrorEntries(reply_list)
+                        }
+                    };
+                    let pay_for_chunk_sender_clone = pay_for_chunk_sender.clone();
+                    let _handle = tokio::spawn(async move {
+                        let _ = pay_for_chunk_sender_clone.send(result).await;
+                    });
+
+                    cost_map = BTreeMap::new();
+                }
+            }
+            trace!("Paying thread terminated");
+        });
+    }
+
+    fn spawn_upload_chunk_task(
+        &self,
+        chunk_info: ChunkInfo,
+        payee: PeerId,
+        upload_chunk_sender: mpsc::Sender<TaskResult>,
+    ) {
+        trace!("spawning upload chunk task");
+        let files_api = self.api.clone();
+        let verify_store = self.verify_store;
+
+        let _handle = tokio::spawn(async move {
+            let (chunk_info, result) =
+                Self::upload_chunk(files_api, chunk_info, payee, verify_store).await;
+
+            debug!(
+                "Chunk {:?} uploaded with result {:?}",
+                chunk_info.name, result
+            );
+            if result.is_ok() {
+                let _ = upload_chunk_sender
+                    .send(TaskResult::UploadChunksOK(chunk_info.name))
+                    .await;
+            } else {
+                let _ = upload_chunk_sender
+                    .send(TaskResult::ErrorEntries(vec![chunk_info]))
+                    .await;
+            }
+        });
+    }
+}
+
+async fn progress_tasks(
+    get_store_cost_receiver: &mut mpsc::Receiver<TaskResult>,
+    paid_chunk_receiver: &mut mpsc::Receiver<TaskResult>,
+    upload_chunk_receiver: &mut mpsc::Receiver<TaskResult>,
+) -> Option<TaskResult> {
+    tokio::select! {
+        get_store_cost_event = get_store_cost_receiver.recv() => {
+            get_store_cost_event
+        }
+        paid_chunk_event = paid_chunk_receiver.recv() => {
+            paid_chunk_event
+        }
+        upload_chunk_event = upload_chunk_receiver.recv() => {
+            upload_chunk_event
+        }
+    }
+}
diff --git a/sn_client/src/wallet.rs b/sn_client/src/wallet.rs
index ce456ff782..fda786e858 100644
--- a/sn_client/src/wallet.rs
+++ b/sn_client/src/wallet.rs
@@ -12,7 +12,7 @@ use super::{error::Result, Client};
 use backoff::{backoff::Backoff, ExponentialBackoff};
 use futures::{future::join_all, TryFutureExt};
 use libp2p::PeerId;
-use sn_networking::GetRecordError;
+use sn_networking::{GetRecordError, PayeeQuote};
 use sn_protocol::NetworkAddress;
 use sn_transfers::{
     CashNote, DerivationIndex, LocalWallet, MainPubkey, NanoTokens, Payment, PaymentQuote,
@@ -168,7 +168,7 @@ impl WalletClient {
     pub async fn get_store_cost_at_address(
         &self,
         address: NetworkAddress,
-    ) -> WalletResult<(PeerId, MainPubkey, PaymentQuote)> {
+    ) -> WalletResult<PayeeQuote> {
         self.client
             .network
             .get_store_costs_from_network(address)
diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs
index 26cda34746..06733dcac3 100644
--- a/sn_networking/src/lib.rs
+++ b/sn_networking/src/lib.rs
@@ -63,6 +63,9 @@ use tokio::sync::{
     oneshot,
 };

+/// The type of quote for a selected payee.
+pub type PayeeQuote = (PeerId, MainPubkey, PaymentQuote);
+
 /// The maximum number of peers to return in a `GetClosestPeers` response.
 /// This is the group size used in safe network protocol to be responsible for
 /// an item in the network.
@@ -337,10 +340,11 @@ impl Network {
     }

     /// Get the store costs from the majority of the closest peers to the provided RecordKey.
+    /// If the record already exists, a cost of zero will be returned.
     pub async fn get_store_costs_from_network(
         &self,
         record_address: NetworkAddress,
-    ) -> Result<(PeerId, MainPubkey, PaymentQuote)> {
+    ) -> Result<PayeeQuote> {
         // The requirement of having at least CLOSE_GROUP_SIZE
         // close nodes will be checked internally automatically.
         let close_nodes = self.get_closest_peers(&record_address, true).await?;
@@ -386,8 +390,8 @@ impl Network {
         });

-        // Ensure we dont have any further out nodes than `close_group_majority()`
-        // This should ensure that if we didnt get all responses from close nodes, we're less likely to be
-        // paying a node that is not in the CLOSE_GROUP
+        // Ensure we don't have any further out nodes than `close_group_majority()`
+        // This should ensure that if we didn't get all responses from close nodes,
+        // we're less likely to be paying a node that is not in the CLOSE_GROUP
         let all_costs = all_costs.into_iter().take(close_group_majority()).collect();

         get_fees_from_store_cost_responses(all_costs)
@@ -777,7 +781,7 @@ impl Network {
 /// Closest requiring it to be within CLOSE_GROUP nodes
 fn get_fees_from_store_cost_responses(
     mut all_costs: Vec<(NetworkAddress, MainPubkey, PaymentQuote)>,
-) -> Result<(PeerId, MainPubkey, PaymentQuote)> {
+) -> Result<PayeeQuote> {
     // sort all costs by fee, lowest to highest
     // if there's a tie in cost, sort by pubkey
     all_costs.sort_by(
diff --git a/sn_node/tests/verify_data_location.rs b/sn_node/tests/verify_data_location.rs
index 93c5fe2665..f7211d3922 100644
--- a/sn_node/tests/verify_data_location.rs
+++ b/sn_node/tests/verify_data_location.rs
@@ -149,12 +149,12 @@ fn query_content_task() {
     let mut rng = OsRng;
     let client = get_gossip_client().await;
     // let's choose a random content to query
-    let rando_addr = ChunkAddress::new(XorName::random(&mut rng));
-    tracing::trace!("Querying content {rando_addr:?}");
+    let random_addr = ChunkAddress::new(XorName::random(&mut rng));
+    tracing::trace!("Querying content {random_addr:?}");

-    if let Err(error) = client.get_chunk(rando_addr, false).await {
+    if let Err(error) = client.get_chunk(random_addr, false).await {
         tracing::error!(
-            "Error querying content rando content (as expected) {rando_addr:?} : {error}"
+            "Error querying random content (as expected) {random_addr:?} : {error}"
         );
     }

@@ -311,6 +311,9 @@ async fn store_chunks(client: Client, chunk_count: usize, wallet_dir: PathBuf) -
     let start = Instant::now();
     let mut rng = OsRng;
     let files_api = FilesApi::new(client, wallet_dir);
+    let mut file_upload = FilesUpload::new(files_api)
+        .set_show_holders(true)
+        .set_verify_store(false);
     let mut uploaded_chunks_count = 0;

     loop {
@@ -340,9 +343,6 @@ async fn store_chunks(client: Client, chunk_count: usize, wallet_dir: PathBuf) -
     let key =
         PrettyPrintRecordKey::from(&RecordKey::new(&head_chunk_addr.xorname())).into_owned();

-    let mut file_upload = FilesUpload::new(files_api.clone())
-        .set_show_holders(true)
-        .set_verify_store(false);
     file_upload.upload_chunks(chunks).await?;
     uploaded_chunks_count += 1;
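
The refactor above replaces the old batch-by-batch `handle_chunk_batch` flow with three task groups (get store cost, pay, upload) joined by channels and drained by `progress_tasks`. The following is a minimal, self-contained sketch of that pipeline shape, not the crate's actual API: chunk names are modelled as plain `u64`s and the network calls are elided.

// Illustrative sketch: three stages (get quote -> pay -> upload) joined by
// bounded mpsc channels and drained by a single select loop. All names and
// types are hypothetical stand-ins for the patch's real ones.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let chunks: Vec<u64> = (0..10).collect();
    let batch_size = 4;
    let total = chunks.len();

    let (quote_tx, mut quote_rx) = mpsc::channel::<u64>(batch_size);
    let (pay_tx, mut pay_rx) = mpsc::channel::<u64>(batch_size);
    let (done_tx, mut done_rx) = mpsc::channel::<u64>(batch_size);

    // Stage 1: fetch a store-cost "quote" for every chunk concurrently.
    for chunk in chunks {
        let quote_tx = quote_tx.clone();
        tokio::spawn(async move {
            // a real implementation would query the network here
            let _ = quote_tx.send(chunk).await;
        });
    }

    let mut uploaded = 0;
    // One event loop reacts to whichever stage finishes next and feeds its
    // output into the following stage, so all three stages stay busy.
    while uploaded < total {
        tokio::select! {
            Some(chunk) = quote_rx.recv() => {
                // Stage 2: pay for the quoted chunk.
                let pay_tx = pay_tx.clone();
                tokio::spawn(async move { let _ = pay_tx.send(chunk).await; });
            }
            Some(chunk) = pay_rx.recv() => {
                // Stage 3: upload the paid-for chunk.
                let done_tx = done_tx.clone();
                tokio::spawn(async move { let _ = done_tx.send(chunk).await; });
            }
            Some(_chunk) = done_rx.recv() => { uploaded += 1; }
        }
    }
    println!("uploaded {uploaded}/{total} chunks");
}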
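
The paying thread has a subtler contract: quotes accumulate in `cost_map` until a full batch is ready, and a `None` message forces payment of whatever is left over. Below is a sketch of just that accumulator, again with hypothetical stand-in types (`u64` for a chunk name and its store cost) rather than `ChunkInfo` and `PayeeQuote`.

// Sketch of the paying thread's batching contract: accumulate quotes until
// `batch_size` is reached, and flush the leftovers when a `None` arrives.
use std::collections::BTreeMap;
use tokio::sync::mpsc;

async fn paying_loop(mut rx: mpsc::Receiver<Option<(u64, u64)>>, batch_size: usize) {
    let mut cost_map: BTreeMap<u64, u64> = BTreeMap::new();
    while let Some(msg) = rx.recv().await {
        let make_payments = match msg {
            Some((chunk_name, cost)) => {
                let _ = cost_map.insert(chunk_name, cost);
                cost_map.len() >= batch_size
            }
            // `None` means no more quotes are coming for now: pay for leftovers.
            None => !cost_map.is_empty(),
        };
        if make_payments {
            // a real implementation would call something like
            // `wallet_client.pay_for_records(&cost_map, verify_store).await` here
            println!("paying for {} chunks in one batch", cost_map.len());
            cost_map.clear();
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    let worker = tokio::spawn(paying_loop(rx, 3));
    for name in 0..7u64 {
        let _ = tx.send(Some((name, 42))).await; // 42: a stand-in store cost
    }
    let _ = tx.send(None).await; // force a round for the final leftover chunk
    drop(tx); // closing the channel lets the paying loop terminate
    let _ = worker.await;
}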
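
Finally, `upload_chunks` no longer retries a fixed number of times; it keeps re-running the upload over only the failed chunks until a round makes no progress. A small sketch of that termination rule, with `upload_round` as a hypothetical stand-in for one full pass of the real upload:

// Sketch of the no-progress retry rule: stop once a round fails exactly as
// many chunks as the previous one, which also covers the "no failures" case
// (0 == 0 on the first comparison).
fn retry_until_no_progress(
    mut pending: Vec<u64>,
    mut upload_round: impl FnMut(&[u64]) -> Vec<u64>,
) {
    let mut last_failed = 0;
    loop {
        let failed = upload_round(&pending);
        if failed.len() == last_failed {
            break; // no progress compared to the previous round
        }
        last_failed = failed.len();
        pending = failed; // retry only the chunks that failed
    }
}

fn main() {
    // hypothetical round: chunks divisible by 4 keep failing, others succeed
    retry_until_no_progress((0..12).collect(), |chunks| {
        chunks.iter().copied().filter(|c| c % 4 == 0).collect()
    });
}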