chore: return error if client record store is used for node ops
RolandSherwin committed Jan 17, 2025
1 parent 3c7bbc2 commit ee235bc
Showing 7 changed files with 103 additions and 105 deletions.
44 changes: 24 additions & 20 deletions ant-networking/src/cmd.rs
@@ -72,7 +72,7 @@ pub enum LocalSwarmCmd {
/// In case the range is too narrow, returns at lease CLOSE_GROUP_SIZE peers.
GetReplicateCandidates {
data_addr: NetworkAddress,
sender: oneshot::Sender<Vec<PeerId>>,
sender: oneshot::Sender<Result<Vec<PeerId>>>,
},
// Returns up to K_VALUE peers from all the k-buckets from the local Routing Table.
// And our PeerId as well.
@@ -88,11 +88,11 @@ pub enum LocalSwarmCmd {
/// Check if the local RecordStore contains the provided key
RecordStoreHasKey {
key: RecordKey,
sender: oneshot::Sender<bool>,
sender: oneshot::Sender<Result<bool>>,
},
/// Get the Addresses of all the Records held locally
GetAllLocalRecordAddresses {
sender: oneshot::Sender<HashMap<NetworkAddress, ValidationType>>,
sender: oneshot::Sender<Result<HashMap<NetworkAddress, ValidationType>>>,
},
/// Get data from the local RecordStore
GetLocalRecord {
@@ -105,7 +105,7 @@ pub enum LocalSwarmCmd {
key: RecordKey,
data_type: u32,
data_size: usize,
sender: oneshot::Sender<Option<(QuotingMetrics, bool)>>,
sender: oneshot::Sender<Result<(QuotingMetrics, bool)>>,
},
/// Notify the node received a payment.
PaymentReceived,
@@ -593,7 +593,7 @@ impl SwarmDriver {
) = self.kbuckets_status();
let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
let Some((quoting_metrics, is_already_stored)) = self
let (quoting_metrics, is_already_stored) = match self
.swarm
.behaviour_mut()
.kademlia
@@ -603,12 +603,13 @@ impl SwarmDriver {
data_type,
data_size,
Some(estimated_network_size as u64),
)
else {
let _res = sender.send(None);
return Ok(());
) {
Ok(res) => res,
Err(err) => {
let _res = sender.send(Err(err));
return Ok(());
}
};

self.record_metrics(Marker::QuotingMetrics {
quoting_metrics: &quoting_metrics,
});
@@ -647,7 +648,7 @@ impl SwarmDriver {
.retain(|peer_addr| key_address.distance(peer_addr) < boundary_distance);
}

let _res = sender.send(Some((quoting_metrics, is_already_stored)));
let _res = sender.send(Ok((quoting_metrics, is_already_stored)));
}
LocalSwarmCmd::PaymentReceived => {
cmd_string = "PaymentReceived";
@@ -718,7 +719,7 @@ impl SwarmDriver {
.behaviour_mut()
.kademlia
.store_mut()
.get_farthest();
.get_farthest()?;
self.replication_fetcher.set_farthest_on_full(farthest);
}
Err(_) => {
@@ -746,7 +747,7 @@ impl SwarmDriver {
.behaviour_mut()
.kademlia
.store_mut()
.get_farthest_replication_distance()
.get_farthest_replication_distance()?
{
self.replication_fetcher
.set_replication_distance_range(distance);
@@ -1064,7 +1065,7 @@ impl SwarmDriver {
self.last_replication = Some(Instant::now());

let self_addr = NetworkAddress::from_peer(self.self_peer_id);
let mut replicate_targets = self.get_replicate_candidates(&self_addr);
let mut replicate_targets = self.get_replicate_candidates(&self_addr)?;

let now = Instant::now();
self.replication_targets
@@ -1080,7 +1081,7 @@ impl SwarmDriver {
.behaviour_mut()
.kademlia
.store_mut()
.record_addresses_ref()
.record_addresses_ref()?
.values()
.cloned()
.collect();
Expand Down Expand Up @@ -1115,7 +1116,10 @@ impl SwarmDriver {
// Note that:
// * For general replication, replicate candidates shall be the closest to self
// * For replicate fresh records, the replicate candidates shall be the closest to data
pub(crate) fn get_replicate_candidates(&mut self, target: &NetworkAddress) -> Vec<PeerId> {
pub(crate) fn get_replicate_candidates(
&mut self,
target: &NetworkAddress,
) -> Result<Vec<PeerId>> {
// get closest peers from buckets, sorted by increasing distance to the target
let kbucket_key = target.as_kbucket_key();
let closest_k_peers: Vec<PeerId> = self
@@ -1132,21 +1136,21 @@ impl SwarmDriver {
.behaviour_mut()
.kademlia
.store_mut()
.get_farthest_replication_distance()
.get_farthest_replication_distance()?
{
let peers_in_range = get_peers_in_range(&closest_k_peers, target, responsible_range);

if peers_in_range.len() >= CLOSE_GROUP_SIZE {
return peers_in_range;
return Ok(peers_in_range);
}
}

// In case the range is too narrow, fall back to at least CLOSE_GROUP_SIZE peers.
closest_k_peers
Ok(closest_k_peers
.iter()
.take(CLOSE_GROUP_SIZE)
.cloned()
.collect()
.collect())
}
}

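For readers skimming the cmd.rs hunks above, the handler-side shape of the change is: the oneshot channel now carries a Result, so a failing record-store call is forwarded to the requester and the handler bails out early instead of sending None. The following is only a minimal sketch of that pattern, assuming tokio for the oneshot channel; StoreError, get_quote_inputs, and handle_cmd are illustrative stand-ins, not the crate's actual items.

use tokio::sync::oneshot;

#[derive(Debug)]
enum StoreError {
    // Hypothetical stand-in for NetworkError::OperationNotAllowedOnClientRecordStore.
    OperationNotAllowedOnClientRecordStore,
}

/// Stand-in for a record-store call that can now fail on a client-only store.
fn get_quote_inputs(is_client_store: bool) -> Result<(u64, bool), StoreError> {
    if is_client_store {
        Err(StoreError::OperationNotAllowedOnClientRecordStore)
    } else {
        Ok((42, false))
    }
}

/// Handler-side pattern: forward the error through the channel and return early.
fn handle_cmd(sender: oneshot::Sender<Result<(u64, bool), StoreError>>, is_client_store: bool) {
    let (quoting_metrics, is_already_stored) = match get_quote_inputs(is_client_store) {
        Ok(res) => res,
        Err(err) => {
            let _res = sender.send(Err(err));
            return;
        }
    };
    let _res = sender.send(Ok((quoting_metrics, is_already_stored)));
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    handle_cmd(tx, true);
    // The requester now observes the store error instead of a silent None.
    assert!(rx.await.expect("sender not dropped").is_err());
}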
3 changes: 3 additions & 0 deletions ant-networking/src/error.rs
@@ -123,6 +123,9 @@ pub enum NetworkError {
#[error("Record header is incorrect")]
InCorrectRecordHeader,

#[error("The operation is not allowed on a client record store")]
OperationNotAllowedOnClientRecordStore,

// ---------- Chunk Errors
#[error("Failed to verify the ChunkProof with the provided quorum")]
FailedToVerifyChunkProof(NetworkAddress),
12 changes: 7 additions & 5 deletions ant-networking/src/event/request_response.rs
@@ -46,7 +46,7 @@ impl SwarmDriver {
channel: MsgResponder::FromPeer(channel),
});

self.add_keys_to_replication_fetcher(holder, keys);
self.add_keys_to_replication_fetcher(holder, keys)?;
}
Request::Cmd(ant_protocol::messages::Cmd::PeerConsideredAsBad {
detected_by,
@@ -160,12 +160,12 @@ impl SwarmDriver {
&mut self,
sender: NetworkAddress,
incoming_keys: Vec<(NetworkAddress, ValidationType)>,
) {
) -> Result<(), NetworkError> {
let holder = if let Some(peer_id) = sender.as_peer_id() {
peer_id
} else {
warn!("Replication list sender is not a peer_id {sender:?}");
return;
return Ok(());
};

debug!(
@@ -178,7 +178,7 @@
let closest_k_peers = self.get_closest_k_value_local_peers();
if !closest_k_peers.contains(&holder) || holder == self.self_peer_id {
debug!("Holder {holder:?} is self or not in replication range.");
return;
return Ok(());
}

// On receive a replication_list from a close_group peer, we undertake:
Expand All @@ -191,7 +191,7 @@ impl SwarmDriver {
.behaviour_mut()
.kademlia
.store_mut()
.record_addresses_ref();
.record_addresses_ref()?;
let keys_to_fetch = self
.replication_fetcher
.add_keys(holder, incoming_keys, all_keys);
@@ -200,5 +200,7 @@ impl SwarmDriver {
} else {
self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch));
}

Ok(())
}
}
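The request_response.rs change follows the same idea one level up: add_keys_to_replication_fetcher now returns Result<(), NetworkError>, returning Ok(()) for benign skips (unknown sender, out-of-range holder) while letting record-store errors bubble up via `?`. A rough, self-contained sketch of that split, with the error type and record_addresses_ref as illustrative stand-ins for the crate's own items:

#[derive(Debug)]
enum NetworkError {
    OperationNotAllowedOnClientRecordStore,
}

/// Stand-in for the store accessor that can now fail on a client record store.
fn record_addresses_ref(is_client_store: bool) -> Result<Vec<String>, NetworkError> {
    if is_client_store {
        Err(NetworkError::OperationNotAllowedOnClientRecordStore)
    } else {
        Ok(vec!["addr-1".to_string(), "addr-2".to_string()])
    }
}

/// Benign skips return Ok(()); store failures propagate to the caller with `?`.
fn add_keys_to_replication_fetcher(
    sender_is_known_peer: bool,
    is_client_store: bool,
) -> Result<(), NetworkError> {
    if !sender_is_known_peer {
        // Not an error: the replication list simply is not actionable.
        return Ok(());
    }
    let all_keys = record_addresses_ref(is_client_store)?;
    println!("would compare incoming keys against {} local keys", all_keys.len());
    Ok(())
}

fn main() {
    assert!(add_keys_to_replication_fetcher(true, false).is_ok());
    assert!(add_keys_to_replication_fetcher(false, true).is_ok()); // skipped, not an error
    assert!(add_keys_to_replication_fetcher(true, true).is_err()); // store error surfaces
}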
24 changes: 15 additions & 9 deletions ant-networking/src/lib.rs
@@ -276,9 +276,11 @@ impl Network {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetReplicateCandidates { data_addr, sender });

receiver
let candidate = receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
.map_err(|_e| NetworkError::InternalMsgChannelDropped)??;

Ok(candidate)
}

/// Get the Chunk existence proof from the close nodes to the provided chunk address.
@@ -816,7 +818,7 @@ impl Network {
key: RecordKey,
data_type: u32,
data_size: usize,
) -> Result<Option<(QuotingMetrics, bool)>> {
) -> Result<(QuotingMetrics, bool)> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalQuotingMetrics {
key,
@@ -825,9 +827,10 @@ impl Network {
sender,
});

receiver
let quoting_metrics = receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
.map_err(|_e| NetworkError::InternalMsgChannelDropped)??;
Ok(quoting_metrics)
}

/// Notify the node receicced a payment.
@@ -998,9 +1001,11 @@ impl Network {
sender,
});

receiver
let is_present = receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
.map_err(|_e| NetworkError::InternalMsgChannelDropped)??;

Ok(is_present)
}

/// Returns the Addresses of all the locally stored Records
@@ -1010,9 +1015,10 @@ impl Network {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalRecordAddresses { sender });

receiver
let addrs = receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
.map_err(|_e| NetworkError::InternalMsgChannelDropped)??;
Ok(addrs)
}

/// Send `Request` to the given `PeerId` and await for the response. If `self` is the recipient,
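On the public-API side in lib.rs, the awaited value is now nested — Result<Result<T, NetworkError>, RecvError> — which the methods flatten with a double `?`. A minimal sketch of that flattening, assuming tokio's oneshot and an illustrative NetworkError; the function name here is hypothetical:

use tokio::sync::oneshot;

#[derive(Debug)]
enum NetworkError {
    InternalMsgChannelDropped,
    OperationNotAllowedOnClientRecordStore,
}

/// Await the oneshot and flatten both failure layers:
/// a dropped channel, and an error sent back by the swarm driver.
async fn await_has_key_reply(
    receiver: oneshot::Receiver<Result<bool, NetworkError>>,
) -> Result<bool, NetworkError> {
    let is_present = receiver
        .await
        .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;
    Ok(is_present)
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    // Simulate the driver rejecting a node-only operation on a client store.
    let _ = tx.send(Err(NetworkError::OperationNotAllowedOnClientRecordStore));
    // The store error reaches the API caller unchanged.
    assert!(await_has_key_reply(rx).await.is_err());
}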
24 changes: 1 addition & 23 deletions ant-networking/src/record_store.rs
@@ -933,29 +933,7 @@ impl RecordStore for NodeRecordStore {

/// A place holder RecordStore impl for the client that does nothing
#[derive(Default, Debug)]
pub struct ClientRecordStore {
empty_record_addresses: HashMap<Key, (NetworkAddress, ValidationType)>,
}

impl ClientRecordStore {
pub(crate) fn contains(&self, _key: &Key) -> bool {
false
}

pub(crate) fn record_addresses(&self) -> HashMap<NetworkAddress, ValidationType> {
HashMap::new()
}

pub(crate) fn record_addresses_ref(&self) -> &HashMap<Key, (NetworkAddress, ValidationType)> {
&self.empty_record_addresses
}

pub(crate) fn put_verified(&mut self, _r: Record, _record_type: ValidationType) -> Result<()> {
Ok(())
}

pub(crate) fn mark_as_stored(&mut self, _r: Key, _t: ValidationType) {}
}
pub struct ClientRecordStore {}

impl RecordStore for ClientRecordStore {
type RecordsIter<'a> = vec::IntoIter<Cow<'a, Record>>;
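The replacement bodies for ClientRecordStore's node-only helpers are collapsed in this view, so the following is only a guess at the pattern the commit title describes: node-only operations on the client store now return the new OperationNotAllowedOnClientRecordStore error instead of silently answering with empty or false placeholders. The StoreError type, method signatures, and thiserror usage are assumptions for illustration, not the crate's actual API.

use std::collections::HashMap;
use thiserror::Error;

#[derive(Debug, Error)]
enum StoreError {
    #[error("The operation is not allowed on a client record store")]
    OperationNotAllowedOnClientRecordStore,
}

/// Placeholder store used by clients; node-only operations are rejected.
#[derive(Default, Debug)]
struct ClientRecordStore {}

impl ClientRecordStore {
    /// Clients never hold records locally, so membership checks are node-only.
    fn contains(&self, _key: &[u8]) -> Result<bool, StoreError> {
        Err(StoreError::OperationNotAllowedOnClientRecordStore)
    }

    /// Listing locally stored record addresses is likewise node-only.
    fn record_addresses(&self) -> Result<HashMap<String, String>, StoreError> {
        Err(StoreError::OperationNotAllowedOnClientRecordStore)
    }
}

fn main() {
    let store = ClientRecordStore::default();
    // Callers now see an explicit error rather than an empty/false placeholder answer.
    assert!(store.contains(b"some-key").is_err());
    assert!(store.record_addresses().is_err());
}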
Diffs for the remaining two changed files were not loaded in this view.
