Skip to content

Commit

Permalink
chore(node): simplify GetStoreCost flow
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Jan 8, 2024
1 parent bd47019 commit 05da130
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 51 deletions.
15 changes: 13 additions & 2 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ pub enum SwarmCmd {
},
/// GetLocalStoreCost for this node
GetLocalStoreCost {
key: RecordKey,
sender: oneshot::Sender<NanoTokens>,
},
/// Notify the node received a payment.
Expand Down Expand Up @@ -402,8 +403,18 @@ impl SwarmDriver {
trace!("We now have {} pending get record attempts and cached {total_records} fetched copies",
self.pending_get_record.len());
}
SwarmCmd::GetLocalStoreCost { sender } => {
let cost = self.swarm.behaviour_mut().kademlia.store_mut().store_cost();
SwarmCmd::GetLocalStoreCost { key, sender } => {
let record_exists = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.contains(&key);
let cost = if record_exists {
NanoTokens::zero()
} else {
self.swarm.behaviour_mut().kademlia.store_mut().store_cost()
};

let _res = sender.send(cost);
}
Expand Down
4 changes: 2 additions & 2 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,9 +466,9 @@ impl Network {
}

/// Get the cost of storing the next record from the network
pub async fn get_local_storecost(&self) -> Result<NanoTokens> {
pub async fn get_local_storecost(&self, key: RecordKey) -> Result<NanoTokens> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetLocalStoreCost { sender })?;
self.send_swarm_cmd(SwarmCmd::GetLocalStoreCost { key, sender })?;

receiver
.await
Expand Down
23 changes: 8 additions & 15 deletions sn_networking/src/record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,19 +279,12 @@ impl NodeRecordStore {
/// Calculate the cost to store data for our current store state
#[allow(clippy::mutable_key_type)]
pub(crate) fn store_cost(&self) -> NanoTokens {
let relevant_records_len = if let Some(distance_range) = self.distance_range {
let record_keys: HashSet<_> = self.records.keys().cloned().collect();
self.get_records_within_distance_range(&record_keys, distance_range)
} else {
warn!("No distance range set on record store. Returning MAX_RECORDS_COUNT for relevant records in store cost calculation.");
MAX_RECORDS_COUNT
};

let cost =
calculate_cost_for_relevant_records(relevant_records_len, self.received_payment_count);
let stored_records = self.records.len();
let cost = calculate_cost_for_records(stored_records, self.received_payment_count);

// vdash metric (if modified please notify at https://github.com/happybeing/vdash/issues):
info!("Cost is now {cost:?} for {relevant_records_len:?} records stored of {MAX_RECORDS_COUNT:?} max");
info!("Cost is now {cost:?} for {stored_records:?} stored of {MAX_RECORDS_COUNT:?} max, {:?} times got paid.",
self.received_payment_count);
NanoTokens::from(cost)
}

Expand Down Expand Up @@ -524,7 +517,7 @@ impl RecordStore for ClientRecordStore {

// Using a linear growth function, and be tweaked by `received_payment_count`
// to allow nodes receiving too many replication copies can still got paid.
fn calculate_cost_for_relevant_records(step: usize, received_payment_count: usize) -> u64 {
fn calculate_cost_for_records(step: usize, received_payment_count: usize) -> u64 {
use std::cmp::max;

let ori_cost = (10 * step) as u64;
Expand Down Expand Up @@ -920,7 +913,7 @@ mod tests {
for peer in peers_in_replicate_range.iter() {
let entry = peers.entry(*peer).or_insert((0, 0, 0));
if *peer == payee {
let cost = calculate_cost_for_relevant_records(entry.0, entry.2);
let cost = calculate_cost_for_records(entry.0, entry.2);
entry.1 += cost;
entry.2 += 1;
}
Expand All @@ -942,7 +935,7 @@ mod tests {
let mut max_store_cost = 0;

for (_peer_id, stats) in peers.iter() {
let cost = calculate_cost_for_relevant_records(stats.0, stats.2);
let cost = calculate_cost_for_records(stats.0, stats.2);
// println!("{peer_id:?}:{stats:?} with storecost to be {cost}");
received_payment_count += stats.2;
if stats.1 == 0 {
Expand Down Expand Up @@ -1027,7 +1020,7 @@ mod tests {

for peer in peers_in_close {
if let Some(stats) = peers.get(peer) {
let store_cost = calculate_cost_for_relevant_records(stats.0, stats.2);
let store_cost = calculate_cost_for_records(stats.0, stats.2);
if store_cost < cheapest_cost {
cheapest_cost = store_cost;
payee = Some(*peer);
Expand Down
59 changes: 27 additions & 32 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use sn_protocol::{
messages::{ChunkProof, Cmd, CmdResponse, Query, QueryResponse, Response},
NetworkAddress, PrettyPrintRecordKey,
};
use sn_transfers::{CashNoteRedemption, LocalWallet, MainPubkey, MainSecretKey};
use sn_transfers::{CashNoteRedemption, LocalWallet, MainPubkey, MainSecretKey, NanoTokens};
use std::{
net::SocketAddr,
path::PathBuf,
Expand Down Expand Up @@ -451,39 +451,34 @@ impl Node {
let resp: QueryResponse = match query {
Query::GetStoreCost(address) => {
trace!("Got GetStoreCost request for {address:?}");

let record_exists = match network
.is_record_key_present_locally(&address.to_record_key())
.await
{
Ok(res) => res,
Err(error) => {
error!("Problem getting record key's existence: {error:?}");
false
}
};

if record_exists {
QueryResponse::GetStoreCost {
quote: Err(ProtocolError::RecordExists(
PrettyPrintRecordKey::from(&address.to_record_key()).into_owned(),
)),
payment_address,
peer_address: NetworkAddress::from_peer(network.peer_id),
let record_key = address.to_record_key();
let self_id = network.peer_id;

let store_cost = network.get_local_storecost(record_key.clone()).await;

match store_cost {
Ok(cost) => {
if cost == NanoTokens::zero() {
QueryResponse::GetStoreCost {
quote: Err(ProtocolError::RecordExists(
PrettyPrintRecordKey::from(&record_key).into_owned(),
)),
payment_address,
peer_address: NetworkAddress::from_peer(self_id),
}
} else {
QueryResponse::GetStoreCost {
quote: Self::create_quote_for_storecost(network, cost, &address),
payment_address,
peer_address: NetworkAddress::from_peer(self_id),
}
}
}
} else {
let store_cost = network
.get_local_storecost()
.await
.map_err(|_| ProtocolError::GetStoreCostFailed);

QueryResponse::GetStoreCost {
quote: store_cost.and_then(|cost| {
Self::create_quote_for_storecost(network, cost, &address)
}),
Err(_) => QueryResponse::GetStoreCost {
quote: Err(ProtocolError::GetStoreCostFailed),
payment_address,
peer_address: NetworkAddress::from_peer(network.peer_id),
}
peer_address: NetworkAddress::from_peer(self_id),
},
}
}
Query::GetReplicatedRecord { requester, key } => {
Expand Down

0 comments on commit 05da130

Please sign in to comment.