diff --git a/ant-networking/src/cmd.rs b/ant-networking/src/cmd.rs index 3f7008bdf8..a4a88d18dc 100644 --- a/ant-networking/src/cmd.rs +++ b/ant-networking/src/cmd.rs @@ -72,7 +72,7 @@ pub enum LocalSwarmCmd { /// In case the range is too narrow, returns at lease CLOSE_GROUP_SIZE peers. GetReplicateCandidates { data_addr: NetworkAddress, - sender: oneshot::Sender>, + sender: oneshot::Sender>>, }, // Returns up to K_VALUE peers from all the k-buckets from the local Routing Table. // And our PeerId as well. @@ -88,11 +88,11 @@ pub enum LocalSwarmCmd { /// Check if the local RecordStore contains the provided key RecordStoreHasKey { key: RecordKey, - sender: oneshot::Sender, + sender: oneshot::Sender>, }, /// Get the Addresses of all the Records held locally GetAllLocalRecordAddresses { - sender: oneshot::Sender>, + sender: oneshot::Sender>>, }, /// Get data from the local RecordStore GetLocalRecord { @@ -105,7 +105,7 @@ pub enum LocalSwarmCmd { key: RecordKey, data_type: u32, data_size: usize, - sender: oneshot::Sender>, + sender: oneshot::Sender>, }, /// Notify the node received a payment. PaymentReceived, @@ -593,7 +593,7 @@ impl SwarmDriver { ) = self.kbuckets_status(); let estimated_network_size = Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets); - let Some((quoting_metrics, is_already_stored)) = self + let (quoting_metrics, is_already_stored) = match self .swarm .behaviour_mut() .kademlia @@ -603,12 +603,13 @@ impl SwarmDriver { data_type, data_size, Some(estimated_network_size as u64), - ) - else { - let _res = sender.send(None); - return Ok(()); + ) { + Ok(res) => res, + Err(err) => { + let _res = sender.send(Err(err)); + return Ok(()); + } }; - self.record_metrics(Marker::QuotingMetrics { quoting_metrics: "ing_metrics, }); @@ -647,7 +648,7 @@ impl SwarmDriver { .retain(|peer_addr| key_address.distance(peer_addr) < boundary_distance); } - let _res = sender.send(Some((quoting_metrics, is_already_stored))); + let _res = sender.send(Ok((quoting_metrics, is_already_stored))); } LocalSwarmCmd::PaymentReceived => { cmd_string = "PaymentReceived"; @@ -718,7 +719,7 @@ impl SwarmDriver { .behaviour_mut() .kademlia .store_mut() - .get_farthest(); + .get_farthest()?; self.replication_fetcher.set_farthest_on_full(farthest); } Err(_) => { @@ -746,7 +747,7 @@ impl SwarmDriver { .behaviour_mut() .kademlia .store_mut() - .get_farthest_replication_distance() + .get_farthest_replication_distance()? { self.replication_fetcher .set_replication_distance_range(distance); @@ -1064,7 +1065,7 @@ impl SwarmDriver { self.last_replication = Some(Instant::now()); let self_addr = NetworkAddress::from_peer(self.self_peer_id); - let mut replicate_targets = self.get_replicate_candidates(&self_addr); + let mut replicate_targets = self.get_replicate_candidates(&self_addr)?; let now = Instant::now(); self.replication_targets @@ -1080,7 +1081,7 @@ impl SwarmDriver { .behaviour_mut() .kademlia .store_mut() - .record_addresses_ref() + .record_addresses_ref()? .values() .cloned() .collect(); @@ -1115,7 +1116,10 @@ impl SwarmDriver { // Note that: // * For general replication, replicate candidates shall be the closest to self // * For replicate fresh records, the replicate candidates shall be the closest to data - pub(crate) fn get_replicate_candidates(&mut self, target: &NetworkAddress) -> Vec { + pub(crate) fn get_replicate_candidates( + &mut self, + target: &NetworkAddress, + ) -> Result> { // get closest peers from buckets, sorted by increasing distance to the target let kbucket_key = target.as_kbucket_key(); let closest_k_peers: Vec = self @@ -1132,21 +1136,21 @@ impl SwarmDriver { .behaviour_mut() .kademlia .store_mut() - .get_farthest_replication_distance() + .get_farthest_replication_distance()? { let peers_in_range = get_peers_in_range(&closest_k_peers, target, responsible_range); if peers_in_range.len() >= CLOSE_GROUP_SIZE { - return peers_in_range; + return Ok(peers_in_range); } } // In case the range is too narrow, fall back to at least CLOSE_GROUP_SIZE peers. - closest_k_peers + Ok(closest_k_peers .iter() .take(CLOSE_GROUP_SIZE) .cloned() - .collect() + .collect()) } } diff --git a/ant-networking/src/error.rs b/ant-networking/src/error.rs index ee066a850c..66c4b75086 100644 --- a/ant-networking/src/error.rs +++ b/ant-networking/src/error.rs @@ -123,6 +123,9 @@ pub enum NetworkError { #[error("Record header is incorrect")] InCorrectRecordHeader, + #[error("The operation is not allowed on a client record store")] + OperationNotAllowedOnClientRecordStore, + // ---------- Chunk Errors #[error("Failed to verify the ChunkProof with the provided quorum")] FailedToVerifyChunkProof(NetworkAddress), diff --git a/ant-networking/src/event/request_response.rs b/ant-networking/src/event/request_response.rs index 4d8de2131f..eb4596d981 100644 --- a/ant-networking/src/event/request_response.rs +++ b/ant-networking/src/event/request_response.rs @@ -46,7 +46,7 @@ impl SwarmDriver { channel: MsgResponder::FromPeer(channel), }); - self.add_keys_to_replication_fetcher(holder, keys); + self.add_keys_to_replication_fetcher(holder, keys)?; } Request::Cmd(ant_protocol::messages::Cmd::PeerConsideredAsBad { detected_by, @@ -160,12 +160,12 @@ impl SwarmDriver { &mut self, sender: NetworkAddress, incoming_keys: Vec<(NetworkAddress, ValidationType)>, - ) { + ) -> Result<(), NetworkError> { let holder = if let Some(peer_id) = sender.as_peer_id() { peer_id } else { warn!("Replication list sender is not a peer_id {sender:?}"); - return; + return Ok(()); }; debug!( @@ -178,7 +178,7 @@ impl SwarmDriver { let closest_k_peers = self.get_closest_k_value_local_peers(); if !closest_k_peers.contains(&holder) || holder == self.self_peer_id { debug!("Holder {holder:?} is self or not in replication range."); - return; + return Ok(()); } // On receive a replication_list from a close_group peer, we undertake: @@ -191,7 +191,7 @@ impl SwarmDriver { .behaviour_mut() .kademlia .store_mut() - .record_addresses_ref(); + .record_addresses_ref()?; let keys_to_fetch = self .replication_fetcher .add_keys(holder, incoming_keys, all_keys); @@ -200,5 +200,7 @@ impl SwarmDriver { } else { self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch)); } + + Ok(()) } } diff --git a/ant-networking/src/lib.rs b/ant-networking/src/lib.rs index 568f599559..1232c9be00 100644 --- a/ant-networking/src/lib.rs +++ b/ant-networking/src/lib.rs @@ -276,9 +276,11 @@ impl Network { let (sender, receiver) = oneshot::channel(); self.send_local_swarm_cmd(LocalSwarmCmd::GetReplicateCandidates { data_addr, sender }); - receiver + let candidate = receiver .await - .map_err(|_e| NetworkError::InternalMsgChannelDropped) + .map_err(|_e| NetworkError::InternalMsgChannelDropped)??; + + Ok(candidate) } /// Get the Chunk existence proof from the close nodes to the provided chunk address. @@ -816,7 +818,7 @@ impl Network { key: RecordKey, data_type: u32, data_size: usize, - ) -> Result> { + ) -> Result<(QuotingMetrics, bool)> { let (sender, receiver) = oneshot::channel(); self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalQuotingMetrics { key, @@ -825,9 +827,10 @@ impl Network { sender, }); - receiver + let quoting_metrics = receiver .await - .map_err(|_e| NetworkError::InternalMsgChannelDropped) + .map_err(|_e| NetworkError::InternalMsgChannelDropped)??; + Ok(quoting_metrics) } /// Notify the node receicced a payment. @@ -998,9 +1001,11 @@ impl Network { sender, }); - receiver + let is_present = receiver .await - .map_err(|_e| NetworkError::InternalMsgChannelDropped) + .map_err(|_e| NetworkError::InternalMsgChannelDropped)??; + + Ok(is_present) } /// Returns the Addresses of all the locally stored Records @@ -1010,9 +1015,10 @@ impl Network { let (sender, receiver) = oneshot::channel(); self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalRecordAddresses { sender }); - receiver + let addrs = receiver .await - .map_err(|_e| NetworkError::InternalMsgChannelDropped) + .map_err(|_e| NetworkError::InternalMsgChannelDropped)??; + Ok(addrs) } /// Send `Request` to the given `PeerId` and await for the response. If `self` is the recipient, diff --git a/ant-networking/src/record_store.rs b/ant-networking/src/record_store.rs index 91dd8e2c43..14727a27fb 100644 --- a/ant-networking/src/record_store.rs +++ b/ant-networking/src/record_store.rs @@ -933,29 +933,7 @@ impl RecordStore for NodeRecordStore { /// A place holder RecordStore impl for the client that does nothing #[derive(Default, Debug)] -pub struct ClientRecordStore { - empty_record_addresses: HashMap, -} - -impl ClientRecordStore { - pub(crate) fn contains(&self, _key: &Key) -> bool { - false - } - - pub(crate) fn record_addresses(&self) -> HashMap { - HashMap::new() - } - - pub(crate) fn record_addresses_ref(&self) -> &HashMap { - &self.empty_record_addresses - } - - pub(crate) fn put_verified(&mut self, _r: Record, _record_type: ValidationType) -> Result<()> { - Ok(()) - } - - pub(crate) fn mark_as_stored(&mut self, _r: Key, _t: ValidationType) {} -} +pub struct ClientRecordStore {} impl RecordStore for ClientRecordStore { type RecordsIter<'a> = vec::IntoIter>; diff --git a/ant-networking/src/record_store_api.rs b/ant-networking/src/record_store_api.rs index 20228c2449..88d44735b4 100644 --- a/ant-networking/src/record_store_api.rs +++ b/ant-networking/src/record_store_api.rs @@ -7,13 +7,11 @@ // permissions and limitations relating to use of the SAFE Network Software. #![allow(clippy::mutable_key_type)] // for the Bytes in NetworkAddress +use crate::error::{NetworkError, Result}; use crate::record_store::{ClientRecordStore, NodeRecordStore}; use ant_evm::{QuotingMetrics, U256}; use ant_protocol::{storage::ValidationType, NetworkAddress}; -use libp2p::kad::{ - store::{RecordStore, Result}, - ProviderRecord, Record, RecordKey, -}; +use libp2p::kad::{store::RecordStore, ProviderRecord, Record, RecordKey}; use std::{borrow::Cow, collections::HashMap}; pub enum UnifiedRecordStore { @@ -32,7 +30,7 @@ impl RecordStore for UnifiedRecordStore { } } - fn put(&mut self, r: Record) -> Result<()> { + fn put(&mut self, r: Record) -> libp2p::kad::store::Result<()> { match self { Self::Client(store) => store.put(r), Self::Node(store) => store.put(r), @@ -53,7 +51,7 @@ impl RecordStore for UnifiedRecordStore { } } - fn add_provider(&mut self, record: ProviderRecord) -> Result<()> { + fn add_provider(&mut self, record: ProviderRecord) -> libp2p::kad::store::Result<()> { match self { Self::Client(store) => store.add_provider(record), Self::Node(store) => store.add_provider(record), @@ -83,32 +81,48 @@ impl RecordStore for UnifiedRecordStore { } impl UnifiedRecordStore { - pub(crate) fn contains(&self, key: &RecordKey) -> bool { + pub(crate) fn contains(&self, key: &RecordKey) -> Result { match self { - Self::Client(store) => store.contains(key), - Self::Node(store) => store.contains(key), + Self::Client(_) => { + error!("Calling 'contains' at Client. This should not happen"); + Err(NetworkError::OperationNotAllowedOnClientRecordStore) + } + Self::Node(store) => Ok(store.contains(key)), } } - pub(crate) fn record_addresses(&self) -> HashMap { + pub(crate) fn record_addresses(&self) -> Result> { match self { - Self::Client(store) => store.record_addresses(), - Self::Node(store) => store.record_addresses(), + Self::Client(_) => { + error!("Calling record_addresses at Client. This should not happen"); + Err(NetworkError::OperationNotAllowedOnClientRecordStore) + } + Self::Node(store) => Ok(store.record_addresses()), } } pub(crate) fn record_addresses_ref( &self, - ) -> &HashMap { + ) -> Result<&HashMap> { match self { - Self::Client(store) => store.record_addresses_ref(), - Self::Node(store) => store.record_addresses_ref(), + Self::Client(_) => { + error!("Calling record_addresses_ref at Client. This should not happen"); + Err(NetworkError::OperationNotAllowedOnClientRecordStore) + } + Self::Node(store) => Ok(store.record_addresses_ref()), } } - pub(crate) fn put_verified(&mut self, r: Record, record_type: ValidationType) -> Result<()> { + pub(crate) fn put_verified( + &mut self, + r: Record, + record_type: ValidationType, + ) -> libp2p::kad::store::Result<()> { match self { - Self::Client(store) => store.put_verified(r, record_type), + Self::Client(_) => { + error!("Calling put_verified at Client. This should not happen"); + Ok(()) + } Self::Node(store) => store.put_verified(r, record_type), } } @@ -121,53 +135,50 @@ impl UnifiedRecordStore { data_type: u32, data_size: usize, network_size: Option, - ) -> Option<(QuotingMetrics, bool)> { + ) -> Result<(QuotingMetrics, bool)> { match self { Self::Client(_) => { - warn!("Calling quoting metrics calculation at Client. This should not happen"); - None - } - Self::Node(store) => { - Some(store.quoting_metrics(key, data_type, data_size, network_size)) + error!("Calling quoting_metrics at Client. This should not happen"); + Err(NetworkError::OperationNotAllowedOnClientRecordStore) } + Self::Node(store) => Ok(store.quoting_metrics(key, data_type, data_size, network_size)), } } pub(crate) fn payment_received(&mut self) { match self { Self::Client(_) => { - warn!("Calling payment_received at Client. This should not happen"); + error!("Calling payment_received at Client. This should not happen"); } Self::Node(store) => store.payment_received(), } } - pub(crate) fn get_farthest_replication_distance(&self) -> Option { + pub(crate) fn get_farthest_replication_distance(&self) -> Result> { match self { - Self::Client(_store) => { - warn!("Calling get_distance_range at Client. This should not happen"); - None + Self::Client(_) => { + error!( + "Calling get_farthest_replication_distance at Client. This should not happen" + ); + Err(NetworkError::OperationNotAllowedOnClientRecordStore) } - Self::Node(store) => store.get_responsible_distance_range(), + Self::Node(store) => Ok(store.get_responsible_distance_range()), } } pub(crate) fn set_distance_range(&mut self, distance: U256) { match self { - Self::Client(_store) => { - warn!("Calling set_distance_range at Client. This should not happen"); + Self::Client(_) => { + error!("Calling set_distance_range at Client. This should not happen"); } Self::Node(store) => store.set_responsible_distance_range(distance), } } - pub(crate) fn get_farthest(&self) -> Option { + pub(crate) fn get_farthest(&self) -> Result> { match self { - Self::Client(_store) => { - warn!("Calling get_farthest at Client. This should not happen"); - None - } - Self::Node(store) => store.get_farthest(), + Self::Client(_) => Err(NetworkError::OperationNotAllowedOnClientRecordStore), + Self::Node(store) => Ok(store.get_farthest()), } } @@ -176,7 +187,9 @@ impl UnifiedRecordStore { /// (to be done after writes are finalised) pub(crate) fn mark_as_stored(&mut self, k: RecordKey, record_type: ValidationType) { match self { - Self::Client(store) => store.mark_as_stored(k, record_type), + Self::Client(_) => { + error!("Calling mark_as_stored at Client. This should not happen"); + } Self::Node(store) => store.mark_as_stored(k, record_type), }; } @@ -184,7 +197,7 @@ impl UnifiedRecordStore { pub(crate) fn cleanup_irrelevant_records(&mut self) { match self { Self::Client(_store) => { - warn!("Calling cleanup_irrelevant_records at Client. This should not happen"); + error!("Calling cleanup_irrelevant_records at Client. This should not happen"); } Self::Node(store) => store.cleanup_irrelevant_records(), } diff --git a/ant-node/src/node.rs b/ant-node/src/node.rs index 9b7e4b8d26..81395821f4 100644 --- a/ant-node/src/node.rs +++ b/ant-node/src/node.rs @@ -601,7 +601,7 @@ impl Node { }; match maybe_quoting_metrics { - Ok(Some((quoting_metrics, is_already_stored))) => { + Ok((quoting_metrics, is_already_stored)) => { if is_already_stored { QueryResponse::GetStoreQuote { quote: Err(ProtocolError::RecordExists( @@ -623,14 +623,6 @@ impl Node { } } } - Ok(None) => { - error!("Quoting metrics not found for {key:?}. This might be because we are using a ClientRecordStore??. This should not happen"); - QueryResponse::GetStoreQuote { - quote: Err(ProtocolError::GetStoreQuoteFailed), - peer_address: NetworkAddress::from_peer(self_id), - storage_proofs, - } - } Err(err) => { warn!("GetStoreQuote failed for {key:?}: {err}"); QueryResponse::GetStoreQuote {