Skip to content

Commit

Permalink
Refactor piece acquisition from archival storage.
Browse files Browse the repository at this point in the history
  • Loading branch information
shamil-gadelshin committed Dec 20, 2023
1 parent 1a0038f commit 9d6a432
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_networking::libp2p::identity::{ed25519, Keypair};
use subspace_networking::libp2p::Multiaddr;
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_networking::utils::random_walking_piece_provider::RandomWalkingPieceProvider;
use subspace_proof_of_space::Table;
use tempfile::TempDir;
use tokio::sync::Semaphore;
Expand Down Expand Up @@ -398,16 +397,10 @@ where
kzg.clone(),
segment_commitments_cache,
));

let piece_provider = PieceProvider::new(node.clone(), validator.clone());

let random_walking_piece_provider =
RandomWalkingPieceProvider::new(node.clone(), validator.clone());

let piece_getter = Arc::new(FarmerPieceGetter::new(
node.clone(),
piece_provider,
random_walking_piece_provider,
piece_cache.clone(),
node_client.clone(),
Arc::clone(&readers_and_pieces),
Expand Down
50 changes: 8 additions & 42 deletions crates/subspace-farmer/src/utils/farmer_piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,33 @@ use crate::utils::readers_and_pieces::ReadersAndPieces;
use crate::NodeClient;
use async_trait::async_trait;
use parking_lot::Mutex;
use std::collections::HashSet;
use std::error::Error;
use std::sync::Arc;
use subspace_core_primitives::{Piece, PieceIndex};
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator, RetryPolicy};
use subspace_networking::utils::random_walking_piece_provider::RandomWalkingPieceProvider;
use subspace_networking::Node;
use tracing::{debug, error, trace};

const MAX_RANDOM_WALK_ROUNDS: usize = 50;

pub struct FarmerPieceGetter<PV, NC> {
node: Node,
piece_provider: PieceProvider<PV>,
random_walking_piece_provider: RandomWalkingPieceProvider<PV>,
piece_cache: PieceCache,
node_client: NC,
readers_and_pieces: Arc<Mutex<Option<ReadersAndPieces>>>,
}

impl<PV, NC> FarmerPieceGetter<PV, NC> {
pub fn new(
node: Node,
piece_provider: PieceProvider<PV>,
random_walking_piece_provider: RandomWalkingPieceProvider<PV>,
piece_cache: PieceCache,
node_client: NC,
readers_and_pieces: Arc<Mutex<Option<ReadersAndPieces>>>,
) -> Self {
Self {
node,
piece_provider,
random_walking_piece_provider,
piece_cache,
node_client,
readers_and_pieces,
Expand Down Expand Up @@ -119,47 +109,23 @@ where
}

// L1 piece acquisition
// TODO: consider using retry policy for L1 lookups as well.
trace!(%piece_index, "Getting piece from DSN L1");
let connected_peers = HashSet::<PeerId>::from_iter(self.node.connected_peers().await?);
if connected_peers.is_empty() {
debug!(%piece_index, "Cannot acquire piece from DSN L1: no connected peers");
} else {
for peer_id in connected_peers.iter() {
let maybe_piece = self
.piece_provider
.get_piece_from_peer(*peer_id, piece_index)
.await;

if maybe_piece.is_some() {
trace!(%piece_index, %peer_id, "DSN L1 lookup succeeded");

return Ok(maybe_piece);
}
}
}
trace!(%piece_index, "Getting piece from DSN L1.");

trace!(%piece_index, "Getting piece from DSN L1 using random walk.");
let random_walk_result = self
.random_walking_piece_provider
.get_piece(piece_index, MAX_RANDOM_WALK_ROUNDS)
let archival_storage_search_result = self
.piece_provider
.get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS)
.await;

if random_walk_result.is_some() {
trace!(%piece_index, "DSN L1 lookup via random walk succeeded");
if archival_storage_search_result.is_some() {
trace!(%piece_index, "DSN L1 lookup succeeded");

return Ok(random_walk_result);
return Ok(archival_storage_search_result);
} else {
debug!(
%piece_index,
max_rounds=%MAX_RANDOM_WALK_ROUNDS,
"Cannot acquire piece from DSN L1: random walk failed"
);
debug!(%piece_index, "Cannot acquire piece from DSN L1");
}

debug!(
%piece_index,
connected_peers=%connected_peers.len(),
"Cannot acquire piece: all methods yielded empty result"
);
Ok(None)
Expand Down
1 change: 0 additions & 1 deletion crates/subspace-networking/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
pub mod multihash;
pub mod piece_provider;
pub mod random_walking_piece_provider;
pub(crate) mod rate_limiter;
#[cfg(test)]
mod tests;
Expand Down
131 changes: 131 additions & 0 deletions crates/subspace-networking/src/utils/piece_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use backoff::future::retry;
use backoff::ExponentialBackoff;
use futures::StreamExt;
use libp2p::PeerId;
use std::collections::HashSet;
use std::error::Error;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
Expand Down Expand Up @@ -205,4 +206,134 @@ where

None
}

/// Get piece from archival storage (L1). The algorithm tries to get a piece from currently
/// connected peers and falls back to random walking.
pub async fn get_piece_from_archival_storage(
&self,
piece_index: PieceIndex,
max_random_walking_rounds: usize,
) -> Option<Piece> {
// TODO: consider using retry policy for L1 lookups as well.
trace!(%piece_index, "Getting piece from archival storage..");

let connected_peers = {
let connected_peers = match self.node.connected_peers().await {
Ok(connected_peers) => connected_peers,
Err(err) => {
debug!(%piece_index, ?err, "Cannot get connected peers (DSN L1 lookup)");

Default::default()
}
};

HashSet::<PeerId>::from_iter(connected_peers)
};

if connected_peers.is_empty() {
debug!(%piece_index, "Cannot acquire piece from no connected peers (DSN L1 lookup)");
} else {
for peer_id in connected_peers.iter() {
let maybe_piece = self.get_piece_from_peer(*peer_id, piece_index).await;

if maybe_piece.is_some() {
trace!(%piece_index, %peer_id, "DSN L1 lookup from connected peers succeeded");

return maybe_piece;
}
}
}

trace!(%piece_index, "Getting piece from DSN L1 using random walk.");
let random_walk_result = self
.get_piece_by_random_walking(piece_index, max_random_walking_rounds)
.await;

if random_walk_result.is_some() {
trace!(%piece_index, "DSN L1 lookup via random walk succeeded");

return random_walk_result;
} else {
debug!(
%piece_index,
%max_random_walking_rounds,
"Cannot acquire piece from DSN L1: random walk failed"
);
}

None
}

/// Get piece from L1 by random walking
async fn get_piece_by_random_walking(
&self,
piece_index: PieceIndex,
walking_rounds: usize,
) -> Option<Piece> {
for round in 0..walking_rounds {
debug!(%piece_index, round, "Random walk round");

let result = self
.get_piece_by_random_walking_from_single_round(piece_index, round)
.await;

if result.is_some() {
return result;
}
}

debug!(%piece_index, "Random walking piece retrieval failed.");

None
}

/// Get piece from L1 by random walking (single round)
async fn get_piece_by_random_walking_from_single_round(
&self,
piece_index: PieceIndex,
round: usize,
) -> Option<Piece> {
trace!(%piece_index, "get_piece_by_random_walking round");

// Random walk key
let key = PeerId::random();

let mut request_batch = self.node.get_requests_batch_handle().await;
let get_closest_peers_result = request_batch.get_closest_peers(key.into()).await;

match get_closest_peers_result {
Ok(mut get_closest_peers_stream) => {
while let Some(peer_id) = get_closest_peers_stream.next().await {
trace!(%piece_index, %peer_id, %round, "get_closest_peers returned an item");

let request_result = request_batch
.send_generic_request(peer_id, PieceByIndexRequest { piece_index })
.await;

match request_result {
Ok(PieceByIndexResponse { piece: Some(piece) }) => {
trace!(%peer_id, %piece_index, ?key, %round, "Piece request succeeded.");

if let Some(validator) = &self.piece_validator {
return validator.validate_piece(peer_id, piece_index, piece).await;
} else {
return Some(piece);
}
}
Ok(PieceByIndexResponse { piece: None }) => {
debug!(%peer_id, %piece_index, ?key, %round, "Piece request returned empty piece.");
}
Err(error) => {
debug!(%peer_id, %piece_index, ?key, %round, ?error, "Piece request failed.");
}
}
}
}
Err(err) => {
warn!(%piece_index, ?key, ?err, %round, "get_closest_peers returned an error");
}
}

None
}
}

This file was deleted.

0 comments on commit 9d6a432

Please sign in to comment.