feat: pick cheapest payee using linear pricing curve
maqi committed Jan 2, 2024
1 parent d75a19f commit e815224
Showing 5 changed files with 87 additions and 88 deletions.
19 changes: 18 additions & 1 deletion sn_networking/src/cmd.rs
@@ -108,6 +108,8 @@ pub enum SwarmCmd {
GetLocalStoreCost {
sender: oneshot::Sender<NanoTokens>,
},
/// Notify the node that it received a payment.
PaymentReceived,
/// Get data from the local RecordStore
GetLocalRecord {
key: RecordKey,
@@ -248,6 +250,9 @@ impl Debug for SwarmCmd {
SwarmCmd::GetLocalStoreCost { .. } => {
write!(f, "SwarmCmd::GetLocalStoreCost")
}
SwarmCmd::PaymentReceived => {
write!(f, "SwarmCmd::PaymentReceived")
}
SwarmCmd::GetLocalRecord { key, .. } => {
write!(
f,
@@ -333,7 +338,12 @@ impl SwarmDriver {
if self.is_in_close_range(key, &closest_k_peers) {
Some((key.clone(), record_type.clone()))
} else {
warn!("not in close range for key {key:?}");
// Reduce the log level: around 40% of records will always fall outside the close range,
// because the sender side uses `CLOSE_GROUP_SIZE + 2` when sending its replication list
// to provide an addressing margin. With normally 6 nodes sending such lists at 5-10s
// intervals, this would otherwise accumulate a lot of logs as more records are uploaded.
trace!("not in close range for key {key:?}");
None
}
})
@@ -397,6 +407,13 @@ impl SwarmDriver {

let _res = sender.send(cost);
}
SwarmCmd::PaymentReceived => {
self.swarm
.behaviour_mut()
.kademlia
.store_mut()
.payment_received();
}
SwarmCmd::GetLocalRecord { key, sender } => {
let record = self
.swarm
31 changes: 22 additions & 9 deletions sn_networking/src/lib.rs
@@ -45,7 +45,7 @@ use libp2p::{
multiaddr::Protocol,
Multiaddr, PeerId,
};
use rand::{seq::SliceRandom, Rng};
use rand::Rng;
use sn_protocol::{
error::Error as ProtocolError,
messages::{ChunkProof, Nonce, Query, QueryResponse, Request, Response},
@@ -475,6 +475,12 @@ impl Network {
.map_err(|_e| Error::InternalMsgChannelDropped)
}

/// Notify the node that it received a payment.
pub fn notify_payment_received(&self) -> Result<()> {
self.send_swarm_cmd(SwarmCmd::PaymentReceived)?;
Ok(())
}

/// Get `Record` from the local RecordStore
pub async fn get_local_record(&self, key: &RecordKey) -> Result<Option<Record>> {
let (sender, receiver) = oneshot::channel();
@@ -860,16 +866,25 @@ impl Network {
}
}

/// Given `all_costs` it will return a random one to be selected as the payee.
/// Given `all_costs`, returns the closest / lowest-cost entry,
/// where "closest" requires it to be within the CLOSE_GROUP of nodes.
fn get_fees_from_store_cost_responses(
mut all_costs: Vec<(NetworkAddress, MainPubkey, PaymentQuote)>,
) -> Result<(PeerId, MainPubkey, PaymentQuote)> {
// sort all costs by fee, lowest to highest
// if there's a tie in cost, sort by address
all_costs.sort_by(
|(address_a, _main_key_a, cost_a), (address_b, _main_key_b, cost_b)| match cost_a
.cost
.cmp(&cost_b.cost)
{
std::cmp::Ordering::Equal => address_a.cmp(address_b),
other => other,
},
);

// get the lowest cost
trace!("Got all costs: {all_costs:?}");
// Randomly shuffle all_costs, so that nodes with a high charge due to replication still
// get a chance to be selected. This also avoids privileging nodes with `sided addresses`.
let mut rng = rand::thread_rng();
all_costs.shuffle(&mut rng);

let payee = all_costs
.into_iter()
.next()
@@ -939,7 +954,6 @@ mod tests {
use super::*;
use sn_transfers::PaymentQuote;

#[ignore = "Payee is now randomly selected"]
#[test]
fn test_get_fee_from_store_cost_responses() -> Result<()> {
// for a vec of different costs of CLOSE_GROUP size
@@ -965,7 +979,6 @@
Ok(())
}

#[ignore = "Payee is now randomly selected"]
#[test]
fn test_get_some_fee_from_store_cost_responses_even_if_one_errs_and_sufficient(
) -> eyre::Result<()> {
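
As an aside, the selection above boils down to a deterministic sort-and-take-first. Below is a minimal, self-contained sketch with a hypothetical helper; plain tuples stand in for the project's (NetworkAddress, MainPubkey, PaymentQuote) triple, and this is not the actual implementation:

// Hedged sketch: sort quotes by cost, break ties by address, take the first entry.
fn pick_cheapest(mut quotes: Vec<(u64, String)>) -> Option<(u64, String)> {
    quotes.sort_by(|(cost_a, addr_a), (cost_b, addr_b)| {
        cost_a.cmp(cost_b).then_with(|| addr_a.cmp(addr_b))
    });
    quotes.into_iter().next()
}

fn main() {
    let quotes = vec![(12, "c".to_string()), (10, "b".to_string()), (10, "a".to_string())];
    // Ties at cost 10 resolve by address, so "a" is picked deterministically.
    assert_eq!(pick_cheapest(quotes), Some((10, "a".to_string())));
}
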
113 changes: 35 additions & 78 deletions sn_networking/src/record_store.rs
@@ -51,6 +51,8 @@ pub struct NodeRecordStore {
#[cfg(feature = "open-metrics")]
/// Used to report the number of records held by the store to the metrics server.
record_count_metric: Option<Gauge>,
/// Counts how many times the node got paid.
received_payment_count: usize,
}

/// Configuration for a `DiskBackedRecordStore`.
@@ -89,6 +91,7 @@ impl NodeRecordStore {
distance_range: None,
#[cfg(feature = "open-metrics")]
record_count_metric: None,
received_payment_count: 0,
}
}

@@ -284,13 +287,19 @@ impl NodeRecordStore {
MAX_RECORDS_COUNT
};

let cost = calculate_cost_for_relevant_records(relevant_records_len);
let cost =
calculate_cost_for_relevant_records(relevant_records_len, self.received_payment_count);

// vdash metric (if modified please notify at https://github.com/happybeing/vdash/issues):
info!("Cost is now {cost:?} for {relevant_records_len:?} records stored of {MAX_RECORDS_COUNT:?} max");
NanoTokens::from(cost)
}

/// Notify the node that it received a payment.
pub(crate) fn payment_received(&mut self) {
self.received_payment_count = self.received_payment_count.saturating_add(1);
}

/// Calculate how many records are stored within a distance range
#[allow(clippy::mutable_key_type)]
pub fn get_records_within_distance_range(
@@ -513,36 +522,19 @@ impl RecordStore for ClientRecordStore {
fn remove_provider(&mut self, _key: &Key, _provider: &PeerId) {}
}

/// Cost calculator that increases cost nearing the maximum (MAX_RECORDS_COUNT (2048 at moment of writing)).
/// Table:
/// 1 = 0.000000010
/// 2 = 0.000000010
/// 4 = 0.000000011
/// 8 = 0.000000012
/// 16 = 0.000000014
/// 32 = 0.000000018
/// 64 = 0.000000033
/// 128 = 0.000000111
/// 256 = 0.000001238
/// 512 = 0.000153173
/// 1024 = 2.346196716
/// 1280 = 290.372529764
/// 1536 = 35937.398370712
/// 1792 = 4447723.077333529
/// 2048 = 550463903.051128626 (about 13% of TOTAL_SUPPLY at moment of writing)
fn calculate_cost_for_relevant_records(step: usize) -> u64 {
// Uses a linear growth function, tweaked by `received_payment_count`,
// so that nodes that received too many replication copies can still get paid.
fn calculate_cost_for_relevant_records(step: usize, received_payment_count: usize) -> u64 {
use std::cmp::max;

assert!(
step <= MAX_RECORDS_COUNT,
"step must be <= MAX_RECORDS_COUNT"
);

// Using an exponential growth function: y = ab^x. Here, a is the starting cost and b is the growth factor.
// We want a function that starts with a low cost and only ramps up once we get closer to the maximum.
let a = 0.000_000_010_f64; // This is the starting cost, starting at 10 nanos.
let b = 1.019_f64; // This is a hand-picked number; a low growth factor keeping the cost low for long.
let y = a * b.powf(step as f64);

(y * 1_000_000_000_f64) as u64
let ori_cost = (10 * step) as u64;
let divider = max(1, step / max(1, received_payment_count)) as u64;
max(10, ori_cost / divider)
}
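
For a rough sense of the new curve (a hedged illustration derived from the formula above, not part of the commit): the quote stays at the 10-nano floor while a node holds many records it was never paid for, and climbs toward the full linear price of 10 nanos per record as payments accumulate.

// Illustrative values only, computed from calculate_cost_for_relevant_records above.
assert_eq!(calculate_cost_for_relevant_records(1000, 0), 10);         // never paid: floor price
assert_eq!(calculate_cost_for_relevant_records(1000, 100), 1_000);    // paid for ~10% of held records
assert_eq!(calculate_cost_for_relevant_records(1000, 1000), 10_000);  // paid for every record: full price
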

#[allow(trivial_casts)]
@@ -887,10 +879,9 @@ mod tests {
Ok(())
}

#[ignore = "Too time consuming. Only to be used to simulate affects of different payee slection algorithm."]
#[test]
fn address_distribution_sim() {
// Map of peers and corresponding stats of `(num_of_records, Nano_earned, earned_times)`.
// Map of peers and corresponding stats of `(num_of_records, Nano_earned, received_payment_count)`.
let mut peers: HashMap<PeerId, (usize, u64, usize)> = Default::default();
let mut peers_vec = vec![];

@@ -904,7 +895,7 @@
}

let mut iteration = 0;
let mut total_earned_times = 0;
let mut total_received_payment_count = 0;

loop {
for _ in 0..num_of_chunks_per_itr {
@@ -929,13 +920,12 @@
}
};

let payee = pick_payee_randomly(peers_in_close);
// let payee = pick_cheapest_payee(&peers_in_close, &peers);
let payee = pick_cheapest_payee(&peers_in_close, &peers);

for peer in peers_in_replicate_range.iter() {
let entry = peers.entry(*peer).or_insert((0, 0, 0));
if *peer == payee {
let cost = calculate_cost(entry.0, entry.2);
let cost = calculate_cost_for_relevant_records(entry.0, entry.2);
entry.1 += cost;
entry.2 += 1;
}
@@ -948,7 +938,7 @@
}
}

let mut earned_times = 0;
let mut received_payment_count = 0;
let mut empty_earned_nodes = 0;

let mut min_earned = u64::MAX;
@@ -957,9 +947,9 @@
let mut max_store_cost = 0;

for (_peer_id, stats) in peers.iter() {
let cost = calculate_cost(stats.0, stats.2);
let cost = calculate_cost_for_relevant_records(stats.0, stats.2);
// println!("{peer_id:?}:{stats:?} with storecost to be {cost}");
earned_times += stats.2;
received_payment_count += stats.2;
if stats.1 == 0 {
empty_earned_nodes += 1;
}
@@ -978,29 +968,26 @@
}
}

total_earned_times += num_of_chunks_per_itr;
assert_eq!(total_earned_times, earned_times);
total_received_payment_count += num_of_chunks_per_itr;
assert_eq!(total_received_payment_count, received_payment_count);

println!("After the completion of {iteration} with {num_of_chunks_per_itr} chunks, there is still {empty_earned_nodes} nodes earned nothing");
println!("\t\t with storecost variation of (min {min_store_cost} - max {max_store_cost}), and earned variation of (min {min_earned} - max {max_earned})");

if empty_earned_nodes == 0 {
iteration += 1;

// Execute for 50 iterations, which allows the test to be executed in normal CI runs.
if iteration == 50 {
assert_eq!(0, empty_earned_nodes);
assert!((max_store_cost / min_store_cost) < 40);
assert!((max_earned / min_earned) < 400);
break;
}
iteration += 1;
}

// log_chunks_distribution(&peers);
}

fn calculate_cost(relevant_records: usize, earned_times: usize) -> u64 {
use std::cmp::max;

let ori_cost = calculate_cost_for_relevant_records(relevant_records);
let divident = max(1, relevant_records / max(1, earned_times)) as u64;
max(10, ori_cost / divident)
}

// Split nodes into groups based on their kBucketKey's leading byte of hashed_bytes.
// This results in 256 groups, collecting the number of nodes and chunks that fall into each.
#[allow(dead_code)]
@@ -1024,35 +1011,6 @@
}
}

// num_of_peers = 2000
// After the completion of 0 with 2000 chunks, there is still 786 nodes earned nothing
// After the completion of 1 with 2000 chunks, there is still 356 nodes earned nothing
// After the completion of 2 with 2000 chunks, there is still 162 nodes earned nothing
// After the completion of 3 with 2000 chunks, there is still 73 nodes earned nothing
// After the completion of 4 with 2000 chunks, there is still 36 nodes earned nothing
// After the completion of 5 with 2000 chunks, there is still 23 nodes earned nothing
// After the completion of 6 with 2000 chunks, there is still 15 nodes earned nothing
// After the completion of 7 with 2000 chunks, there is still 9 nodes earned nothing
// After the completion of 8 with 2000 chunks, there is still 7 nodes earned nothing
// After the completion of 9 with 2000 chunks, there is still 3 nodes earned nothing
// After the completion of 10 with 2000 chunks, there is still 3 nodes earned nothing
// After the completion of 11 with 2000 chunks, there is still 2 nodes earned nothing
// After the completion of 12 with 2000 chunks, there is still 2 nodes earned nothing
// After the completion of 13 with 2000 chunks, there is still 2 nodes earned nothing
// After the completion of 14 with 2000 chunks, there is still 1 nodes earned nothing
// After the completion of 15 with 2000 chunks, there is still 0 nodes earned nothing
fn pick_payee_randomly(mut peers_in_close: Vec<PeerId>) -> PeerId {
use rand::{prelude::SliceRandom, thread_rng};

let mut rng = thread_rng();
peers_in_close.shuffle(&mut rng);
if let Some(payee) = peers_in_close.first() {
*payee
} else {
panic!("Cann't find payee among {peers_in_close:?}");
}
}

// After the completion of 0 with 2000 chunks, there is still 875 nodes earned nothing
// After the completion of 1 with 2000 chunks, there is still 475 nodes earned nothing
// After the completion of 2 with 2000 chunks, there is still 314 nodes earned nothing
@@ -1065,7 +1023,6 @@
// After the completion of 119 with 2000 chunks, there is still 56 nodes earned nothing
// After the completion of 120 with 2000 chunks, there is still 56 nodes earned nothing
// After the completion of 121 with 2000 chunks, there is still 56 nodes earned nothing
#[allow(dead_code)]
fn pick_cheapest_payee(
peers_in_close: &Vec<PeerId>,
peers: &HashMap<PeerId, (usize, u64, usize)>,
Expand All @@ -1075,7 +1032,7 @@ mod tests {

for peer in peers_in_close {
if let Some(stats) = peers.get(peer) {
let store_cost = calculate_cost(stats.0, stats.2);
let store_cost = calculate_cost_for_relevant_records(stats.0, stats.2);
if store_cost < cheapest_cost {
cheapest_cost = store_cost;
payee = Some(*peer);
9 changes: 9 additions & 0 deletions sn_networking/src/record_store_api.rs
@@ -122,6 +122,15 @@ impl UnifiedRecordStore {
}
}

pub(crate) fn payment_received(&mut self) {
match self {
Self::Client(_) => {
warn!("Calling payment_received at Client. This should not happen");
}
Self::Node(store) => store.payment_received(),
}
}

pub(crate) fn set_distance_range(&mut self, distance_range: Distance) {
match self {
Self::Client(store) => store.set_distance_range(distance_range),
3 changes: 3 additions & 0 deletions sn_node/src/put_validation.rs
@@ -509,6 +509,9 @@ impl Node {

trace!("Received payment of {received_fee:?} for {pretty_key}");

// Notify `record_store` that the node received a payment.
let _ = self.network.notify_payment_received();

// deposit the CashNotes in our wallet
wallet.deposit_and_store_to_disk(&cash_notes)?;
let new_balance = wallet.balance().as_nano();
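
Putting the pieces together, the payment notification added in this commit travels from validation down to the record store roughly as follows (a condensed sketch assembled from the hunks above, not new code):

// put_validation.rs: after a storage payment is verified
let _ = self.network.notify_payment_received();
// lib.rs: forwards the notification as a swarm command
self.send_swarm_cmd(SwarmCmd::PaymentReceived)?;
// cmd.rs: the swarm driver routes the command to the record store
self.swarm.behaviour_mut().kademlia.store_mut().payment_received();
// record_store.rs: bump the counter that feeds the pricing curve
self.received_payment_count = self.received_payment_count.saturating_add(1);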
