diff --git a/ant-networking/src/cmd.rs b/ant-networking/src/cmd.rs index b56febac14..4be0e0a0ad 100644 --- a/ant-networking/src/cmd.rs +++ b/ant-networking/src/cmd.rs @@ -153,6 +153,10 @@ pub enum LocalSwarmCmd { AddNetworkDensitySample { distance: Distance, }, + /// Forward per-peer health scores (collected from the storage challenge) to the replication_fetcher + NotifyPeerScores { + peer_scores: Vec<(PeerId, bool)>, + }, } /// Commands to send to the Swarm @@ -312,6 +316,9 @@ impl Debug for LocalSwarmCmd { LocalSwarmCmd::AddNetworkDensitySample { distance } => { write!(f, "LocalSwarmCmd::AddNetworkDensitySample({distance:?})") } + LocalSwarmCmd::NotifyPeerScores { peer_scores } => { + write!(f, "LocalSwarmCmd::NotifyPeerScores({peer_scores:?})") + } } } } @@ -932,6 +939,10 @@ impl SwarmDriver { cmd_string = "AddNetworkDensitySample"; self.network_density_samples.add(distance); } + LocalSwarmCmd::NotifyPeerScores { peer_scores } => { + cmd_string = "NotifyPeerScores"; + self.replication_fetcher.add_peer_scores(peer_scores); + } } self.log_handling(cmd_string.to_string(), start.elapsed()); diff --git a/ant-networking/src/event/request_response.rs b/ant-networking/src/event/request_response.rs index 4d8de2131f..97991de5e5 100644 --- a/ant-networking/src/event/request_response.rs +++ b/ant-networking/src/event/request_response.rs @@ -46,7 +46,7 @@ impl SwarmDriver { channel: MsgResponder::FromPeer(channel), }); - self.add_keys_to_replication_fetcher(holder, keys); + self.add_keys_to_replication_fetcher(holder, keys, false); } Request::Cmd(ant_protocol::messages::Cmd::PeerConsideredAsBad { detected_by, @@ -160,6 +160,7 @@ impl SwarmDriver { &mut self, sender: NetworkAddress, incoming_keys: Vec<(NetworkAddress, ValidationType)>, + is_fresh_replicate: bool, ) { let holder = if let Some(peer_id) = sender.as_peer_id() { peer_id @@ -192,9 +193,9 @@ impl SwarmDriver { .kademlia .store_mut() .record_addresses_ref(); - let keys_to_fetch = self - .replication_fetcher - .add_keys(holder, 
incoming_keys, all_keys); + let keys_to_fetch = + self.replication_fetcher + .add_keys(holder, incoming_keys, all_keys, is_fresh_replicate); if keys_to_fetch.is_empty() { debug!("no waiting keys to fetch from the network"); } else { diff --git a/ant-networking/src/lib.rs b/ant-networking/src/lib.rs index c3184156ed..b0ff8fa09c 100644 --- a/ant-networking/src/lib.rs +++ b/ant-networking/src/lib.rs @@ -1104,6 +1104,10 @@ impl Network { self.send_local_swarm_cmd(LocalSwarmCmd::AddNetworkDensitySample { distance }) } + pub fn notify_peer_scores(&self, peer_scores: Vec<(PeerId, bool)>) { + self.send_local_swarm_cmd(LocalSwarmCmd::NotifyPeerScores { peer_scores }) + } + /// Helper to send NetworkSwarmCmd fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) { send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd); diff --git a/ant-networking/src/replication_fetcher.rs b/ant-networking/src/replication_fetcher.rs index a4a16abe41..82c3e0390e 100644 --- a/ant-networking/src/replication_fetcher.rs +++ b/ant-networking/src/replication_fetcher.rs @@ -8,7 +8,7 @@ #![allow(clippy::mutable_key_type)] use crate::time::spawn; -use crate::{event::NetworkEvent, time::Instant}; +use crate::{event::NetworkEvent, time::Instant, CLOSE_GROUP_SIZE}; use ant_evm::U256; use ant_protocol::{ convert_distance_to_u256, storage::ValidationType, NetworkAddress, PrettyPrintRecordKey, @@ -17,7 +17,7 @@ use libp2p::{ kad::{KBucketDistance as Distance, RecordKey, K_VALUE}, PeerId, }; -use std::collections::{hash_map::Entry, BTreeSet, HashMap}; +use std::collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque}; use tokio::{sync::mpsc, time::Duration}; // Max parallel fetches that can be undertaken at the same time. 
@@ -50,6 +50,13 @@ pub(crate) struct ReplicationFetcher { /// used when the node is full, but we still have "close" data coming in /// that is _not_ closer than our farthest max record farthest_acceptable_distance: Option<Distance>, + /// Scoring of peers collected from storage_challenge. + /// To be a trustworthy replication source, the peer's two latest scores must both be healthy. + peers_scores: HashMap<PeerId, (VecDeque<bool>, Instant)>, + /// During startup, when the knowledge of peers scoring hasn't been built up, + /// only records that got a `majority` of replicated-in copies shall be trusted. + /// This is the temp container to accumulate those initial replicated-in records. + initial_replicates: HashMap<(NetworkAddress, ValidationType), HashSet<PeerId>>, } impl ReplicationFetcher { @@ -62,6 +69,8 @@ impl ReplicationFetcher { event_sender, distance_range: None, farthest_acceptable_distance: None, + peers_scores: HashMap::new(), + initial_replicates: HashMap::new(), } } @@ -73,97 +82,50 @@ impl ReplicationFetcher { // Adds the non existing incoming keys from the peer to the fetcher. // Returns the next set of keys that has to be fetched from the peer/network. // - // Note: the `incoming_keys` shall already got filter for existence. + // Note: for `fresh_replicate`, the verification is on payment and is undertaken by the caller. + // Hence here it shall always be considered as valid to fetch. 
pub(crate) fn add_keys( &mut self, holder: PeerId, incoming_keys: Vec<(NetworkAddress, ValidationType)>, locally_stored_keys: &HashMap<RecordKey, (NetworkAddress, ValidationType)>, + is_fresh_replicate: bool, ) -> Vec<(PeerId, RecordKey)> { - // Pre-calculate self_address since it's used multiple times - let self_address = NetworkAddress::from_peer(self.self_peer_id); - let total_incoming_keys = incoming_keys.len(); - - // Avoid multiple allocations by using with_capacity - let mut new_incoming_keys = Vec::with_capacity(incoming_keys.len()); - let mut keys_to_fetch = Vec::new(); - let mut out_of_range_keys = Vec::new(); - - // Single pass filtering instead of multiple retain() calls - for (addr, record_type) in incoming_keys { - let key = addr.to_record_key(); - - // Skip if locally stored or already pending fetch - if locally_stored_keys.contains_key(&key) - || self - .to_be_fetched - .contains_key(&(key.clone(), record_type.clone(), holder)) - { - continue; - } - - // Check distance constraints - if let Some(farthest_distance) = self.farthest_acceptable_distance { - if self_address.distance(&addr) > farthest_distance { - out_of_range_keys.push(addr); - continue; - } - } - - new_incoming_keys.push((addr, record_type)); - } + let candidates = if is_fresh_replicate { + incoming_keys + .into_iter() + .map(|(addr, val_type)| (holder, addr, val_type)) + .collect() + } else { + self.valid_candidates(&holder, incoming_keys, locally_stored_keys) + }; // Remove any outdated entries in `to_be_fetched` self.remove_stored_keys(locally_stored_keys); - - // Special case for single new key - if new_incoming_keys.len() == 1 { - let (record_address, record_type) = new_incoming_keys[0].clone(); - - let new_data_key = (record_address.to_record_key(), record_type); - - if let Entry::Vacant(entry) = self.on_going_fetches.entry(new_data_key.clone()) { - let (record_key, _record_type) = new_data_key; - keys_to_fetch.push((holder, record_key)); - let _ = entry.insert((holder, Instant::now() + FETCH_TIMEOUT)); - } - - // To avoid 
later on un-necessary actions. - new_incoming_keys.clear(); - } - self.to_be_fetched .retain(|_, time_out| *time_out > Instant::now()); - let mut out_of_range_keys = vec![]; - // Filter out those out_of_range ones among the incoming_keys. - if let Some(ref distance_range) = self.distance_range { - new_incoming_keys.retain(|(addr, _record_type)| { - let is_in_range = - convert_distance_to_u256(&self_address.distance(addr)) <= *distance_range; - if !is_in_range { - out_of_range_keys.push(addr.clone()); - } - is_in_range - }); - } - - if !out_of_range_keys.is_empty() { - info!("Among {total_incoming_keys} incoming replications from {holder:?}, found {} out of range", out_of_range_keys.len()); - } - - // add in-range AND non existing keys to the fetcher - new_incoming_keys + let mut keys_to_fetch = vec![]; + // add valid, in-range AND non existing keys to the fetcher + candidates .into_iter() - .for_each(|(addr, record_type)| { - let _ = self - .to_be_fetched - .entry((addr.to_record_key(), record_type, holder)) - .or_insert(Instant::now() + PENDING_TIMEOUT); + .for_each(|(peer_id, addr, record_type)| { + if is_fresh_replicate { + // Fresh replicates shall always get prioritized: fetch right away if not already on-going. + let new_data_key = (addr.to_record_key(), record_type); + if let Entry::Vacant(entry) = self.on_going_fetches.entry(new_data_key) { + keys_to_fetch.push((holder, addr.to_record_key())); + let _ = entry.insert((holder, Instant::now() + FETCH_TIMEOUT)); + } + } else { + let _ = self + .to_be_fetched + .entry((addr.to_record_key(), record_type, peer_id)) + .or_insert(Instant::now() + PENDING_TIMEOUT); + } }); keys_to_fetch.extend(self.next_keys_to_fetch()); - keys_to_fetch } @@ -327,6 +289,196 @@ impl ReplicationFetcher { .collect() } + // Record peers' healthy status after the storage challenge. 
+ pub(crate) fn add_peer_scores(&mut self, scores: Vec<(PeerId, bool)>) { + for (peer_id, is_healthy) in scores { + let (peer_scores, last_seen) = self + .peers_scores + .entry(peer_id) + .or_insert((VecDeque::new(), Instant::now())); + peer_scores.push_back(is_healthy); + if peer_scores.len() > 2 { + let _ = peer_scores.pop_front(); + } + *last_seen = Instant::now(); + } + + // Once got enough scoring knowledge, the `majority` approach shall no longer be used. + if self.had_enough_scoring_knowledge() { + self.initial_replicates.clear(); + } + + // Pruning to avoid infinite growing, only keep the recent 20. + // One call may add several peers, hence evict the oldest repeatedly until within the limit. + while self.peers_scores.len() > 20 { + let oldest_peer = self + .peers_scores + .iter() + .min_by_key(|(_peer_id, (_peer_scores, last_seen))| *last_seen) + .map(|(peer_id, _)| *peer_id); + if let Some(oldest_peer) = oldest_peer { + let _ = self.peers_scores.remove(&oldest_peer); + } + } + } + + // Among the incoming keys, figure out those: + // * not already stored + // * not on pending + // * within the range + // * from valid source peer + fn valid_candidates( + &mut self, + holder: &PeerId, + incoming_keys: Vec<(NetworkAddress, ValidationType)>, + locally_stored_keys: &HashMap<RecordKey, (NetworkAddress, ValidationType)>, + ) -> Vec<(PeerId, NetworkAddress, ValidationType)> { + match self.is_peer_trustworthy(holder) { + Some(true) => { + let new_incoming_keys = + self.in_range_new_keys(holder, incoming_keys, locally_stored_keys); + new_incoming_keys + .into_iter() + .map(|(addr, val_type)| (*holder, addr, val_type)) + .collect() + } + Some(false) => vec![], + None => { + // Whenever we had enough scoring knowledge of peers, + // we shall no longer use the `majority copies` approach. + // This can prevent malicious neighbouring farming targeting existing nodes. + if self.had_enough_scoring_knowledge() { + // The replication source is probably a `new peer`. + // Just wait for the scoring knowledge to be built up. 
+ return vec![]; + } + let new_incoming_keys = + self.in_range_new_keys(holder, incoming_keys, locally_stored_keys); + self.initial_majority_replicates(holder, new_incoming_keys) + } + } + } + + fn had_enough_scoring_knowledge(&self) -> bool { + self.peers_scores + .values() + .filter(|(scores, _last_seen)| scores.len() > 1) + .count() + >= CLOSE_GROUP_SIZE + } + + // Accumulates initial replicates while there is not enough knowledge of peers' scores. + // Returns the entries that reached majority copies, one per reporting peer. + fn initial_majority_replicates( + &mut self, + holder: &PeerId, + incoming_keys: Vec<(NetworkAddress, ValidationType)>, + ) -> Vec<(PeerId, NetworkAddress, ValidationType)> { + let mut majorities = vec![]; + for addr_val_type in incoming_keys { + let peers = self + .initial_replicates + .entry(addr_val_type.clone()) + .or_default(); + let _ = peers.insert(*holder); + if peers.len() > CLOSE_GROUP_SIZE / 2 { + majorities.push(addr_val_type); + } + } + + let mut result = vec![]; + for addr_val_type in majorities { + if let Some(peers) = self.initial_replicates.remove(&addr_val_type) { + for peer in peers { + result.push((peer, addr_val_type.0.clone(), addr_val_type.1.clone())); + } + } + } + + result + } + + // Among the incoming keys, figure out those: + // * not already stored + // * not on pending + // * within the range + fn in_range_new_keys( + &mut self, + holder: &PeerId, + incoming_keys: Vec<(NetworkAddress, ValidationType)>, + locally_stored_keys: &HashMap<RecordKey, (NetworkAddress, ValidationType)>, + ) -> Vec<(NetworkAddress, ValidationType)> { + // Pre-calculate self_address since it's used multiple times + let self_address = NetworkAddress::from_peer(self.self_peer_id); + let total_incoming_keys = incoming_keys.len(); + + // Avoid multiple allocations by using with_capacity + let mut new_incoming_keys = Vec::with_capacity(incoming_keys.len()); + let mut out_of_range_keys = Vec::new(); + + // Single pass filtering instead of multiple retain() calls + for (addr, record_type) in incoming_keys { + 
let key = addr.to_record_key(); + + // Skip if locally stored or already pending fetch + if locally_stored_keys.contains_key(&key) + || self + .to_be_fetched + .contains_key(&(key.clone(), record_type.clone(), *holder)) + { + continue; + } + + // Check distance constraints + if let Some(farthest_distance) = self.farthest_acceptable_distance { + if self_address.distance(&addr) > farthest_distance { + out_of_range_keys.push(addr); + continue; + } + } + + new_incoming_keys.push((addr, record_type)); + } + + // Filter out those out_of_range ones among the incoming_keys. + if let Some(ref distance_range) = self.distance_range { + new_incoming_keys.retain(|(addr, _record_type)| { + let is_in_range = + convert_distance_to_u256(&self_address.distance(addr)) <= *distance_range; + if !is_in_range { + out_of_range_keys.push(addr.clone()); + } + is_in_range + }); + } + + if !out_of_range_keys.is_empty() { + info!("Among {total_incoming_keys} incoming replications from {holder:?}, found {} out of range", out_of_range_keys.len()); + } + + new_incoming_keys + } + + // Check whether the peer is a trustworthy replication source. + // * Some(true) : peer is trustworthy + // * Some(false) : peer is not trustworthy + // * None : not enough knowledge (fewer than two scores) to tell + fn is_peer_trustworthy(&self, holder: &PeerId) -> Option<bool> { + if let Some((scores, _last_seen)) = self.peers_scores.get(holder) { + if scores.len() > 1 { + let is_healthy = scores.iter().filter(|is_health| **is_health).count() > 1; + if !is_healthy { + info!("Peer {holder:?} is not a trustworthy replication source, as bearing scores of {scores:?}"); + } + Some(is_healthy) + } else { + None + } + } else { + None + } + } + // Just remove outdated entries in `on_going_fetch`, indicates a failure to fetch from network. // The node then considered to be in trouble and: // 1, the pending_entries from that node shall be removed from `to_be_fetched` list. 
@@ -433,8 +585,16 @@ mod tests { incoming_keys.push((key, ValidationType::Chunk)); }); - let keys_to_fetch = - replication_fetcher.add_keys(PeerId::random(), incoming_keys, &locally_stored_keys); + let replication_src = PeerId::random(); + replication_fetcher.add_peer_scores(vec![(replication_src, true)]); + replication_fetcher.add_peer_scores(vec![(replication_src, true)]); + + let keys_to_fetch = replication_fetcher.add_keys( + replication_src, + incoming_keys, + &locally_stored_keys, + false, + ); assert_eq!(keys_to_fetch.len(), MAX_PARALLEL_FETCH); // we should not fetch anymore keys @@ -443,22 +603,24 @@ mod tests { let random_data: Vec<u8> = (0..50).map(|_| rand::random::<u8>()).collect(); let key_2 = NetworkAddress::from_record_key(&RecordKey::from(random_data)); let keys_to_fetch = replication_fetcher.add_keys( - PeerId::random(), + replication_src, vec![ (key_1, ValidationType::Chunk), (key_2, ValidationType::Chunk), ], &locally_stored_keys, + false, ); assert!(keys_to_fetch.is_empty()); - // List with length of 1 will be considered as `new data` and to be fetched immediately + // Fresh replication shall be fetched immediately let random_data: Vec<u8> = (0..50).map(|_| rand::random::<u8>()).collect(); let key = NetworkAddress::from_record_key(&RecordKey::from(random_data)); let keys_to_fetch = replication_fetcher.add_keys( - PeerId::random(), + replication_src, vec![(key, ValidationType::Chunk)], &locally_stored_keys, + true, ); assert!(!keys_to_fetch.is_empty()); @@ -502,8 +664,16 @@ mod tests { incoming_keys.push((key, ValidationType::Chunk)); }); - let keys_to_fetch = - replication_fetcher.add_keys(PeerId::random(), incoming_keys, &Default::default()); + let replication_src = PeerId::random(); + replication_fetcher.add_peer_scores(vec![(replication_src, true)]); + replication_fetcher.add_peer_scores(vec![(replication_src, true)]); + + let keys_to_fetch = replication_fetcher.add_keys( + replication_src, + incoming_keys, + &Default::default(), + false, + ); assert_eq!( 
keys_to_fetch.len(), replication_fetcher.on_going_fetches.len(), diff --git a/ant-node/src/node.rs b/ant-node/src/node.rs index 81395821f4..3110c5d7fa 100644 --- a/ant-node/src/node.rs +++ b/ant-node/src/node.rs @@ -940,20 +940,27 @@ impl Node { }); } + let mut peer_scores = vec![]; while let Some(res) = tasks.join_next().await { match res { Ok((peer_id, score)) => { - if score < MIN_ACCEPTABLE_HEALTHY_SCORE { + // A peer is healthy only when its score reaches the minimum acceptable threshold. + let is_healthy = score >= MIN_ACCEPTABLE_HEALTHY_SCORE; + if !is_healthy { info!("Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."); // TODO: shall the challenge failure immediately triggers the node to be removed? network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck); } + peer_scores.push((peer_id, is_healthy)); } Err(e) => { info!("StorageChallenge task completed with error {e:?}"); } } } + if !peer_scores.is_empty() { + network.notify_peer_scores(peer_scores); + } info!( "Completed node StorageChallenge against neighbours in {:?}!",