diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 4e9bccf53d..52e2b93685 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -37,7 +37,6 @@ use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::libp2p::identity::{ed25519, Keypair}; use subspace_networking::libp2p::Multiaddr; use subspace_networking::utils::piece_provider::PieceProvider; -use subspace_networking::utils::random_walking_piece_provider::RandomWalkingPieceProvider; use subspace_proof_of_space::Table; use tempfile::TempDir; use tokio::sync::Semaphore; @@ -398,16 +397,10 @@ where kzg.clone(), segment_commitments_cache, )); - let piece_provider = PieceProvider::new(node.clone(), validator.clone()); - let random_walking_piece_provider = - RandomWalkingPieceProvider::new(node.clone(), validator.clone()); - let piece_getter = Arc::new(FarmerPieceGetter::new( - node.clone(), piece_provider, - random_walking_piece_provider, piece_cache.clone(), node_client.clone(), Arc::clone(&readers_and_pieces), diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs index 02f00999df..b245a43a09 100644 --- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs @@ -3,25 +3,19 @@ use crate::utils::readers_and_pieces::ReadersAndPieces; use crate::NodeClient; use async_trait::async_trait; use parking_lot::Mutex; -use std::collections::HashSet; use std::error::Error; use std::sync::Arc; use subspace_core_primitives::{Piece, PieceIndex}; use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy}; use subspace_networking::libp2p::kad::RecordKey; -use subspace_networking::libp2p::PeerId; use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator, RetryPolicy}; -use subspace_networking::utils::random_walking_piece_provider::RandomWalkingPieceProvider; -use subspace_networking::Node; use tracing::{debug, error, trace}; const MAX_RANDOM_WALK_ROUNDS: usize = 50; pub struct FarmerPieceGetter { - node: Node, piece_provider: PieceProvider, - random_walking_piece_provider: RandomWalkingPieceProvider, piece_cache: PieceCache, node_client: NC, readers_and_pieces: Arc>>, @@ -29,17 +23,13 @@ pub struct FarmerPieceGetter { impl FarmerPieceGetter { pub fn new( - node: Node, piece_provider: PieceProvider, - random_walking_piece_provider: RandomWalkingPieceProvider, piece_cache: PieceCache, node_client: NC, readers_and_pieces: Arc>>, ) -> Self { Self { - node, piece_provider, - random_walking_piece_provider, piece_cache, node_client, readers_and_pieces, @@ -119,47 +109,23 @@ where } // L1 piece acquisition - // TODO: consider using retry policy for L1 lookups as well. - trace!(%piece_index, "Getting piece from DSN L1"); - let connected_peers = HashSet::::from_iter(self.node.connected_peers().await?); - if connected_peers.is_empty() { - debug!(%piece_index, "Cannot acquire piece from DSN L1: no connected peers"); - } else { - for peer_id in connected_peers.iter() { - let maybe_piece = self - .piece_provider - .get_piece_from_peer(*peer_id, piece_index) - .await; - - if maybe_piece.is_some() { - trace!(%piece_index, %peer_id, "DSN L1 lookup succeeded"); - - return Ok(maybe_piece); - } - } - } + trace!(%piece_index, "Getting piece from DSN L1."); - trace!(%piece_index, "Getting piece from DSN L1 using random walk."); - let random_walk_result = self - .random_walking_piece_provider - .get_piece(piece_index, MAX_RANDOM_WALK_ROUNDS) + let archival_storage_search_result = self + .piece_provider + .get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS) .await; - if random_walk_result.is_some() { - trace!(%piece_index, "DSN L1 lookup via random walk succeeded"); + if archival_storage_search_result.is_some() { + trace!(%piece_index, "DSN L1 lookup succeeded"); - return Ok(random_walk_result); + return Ok(archival_storage_search_result); } else { - debug!( - %piece_index, - max_rounds=%MAX_RANDOM_WALK_ROUNDS, - "Cannot acquire piece from DSN L1: random walk failed" - ); + debug!(%piece_index, "Cannot acquire piece from DSN L1"); } debug!( %piece_index, - connected_peers=%connected_peers.len(), "Cannot acquire piece: all methods yielded empty result" ); Ok(None) diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index 09adf161c6..31ecae37a0 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -2,7 +2,6 @@ pub mod multihash; pub mod piece_provider; -pub mod random_walking_piece_provider; pub(crate) mod rate_limiter; #[cfg(test)] mod tests; diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index ed92f74f04..8fae011dd9 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -7,6 +7,7 @@ use backoff::future::retry; use backoff::ExponentialBackoff; use futures::StreamExt; use libp2p::PeerId; +use std::collections::HashSet; use std::error::Error; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; @@ -205,4 +206,134 @@ where None } + + /// Get piece from archival storage (L1). The algorithm tries to get a piece from currently + /// connected peers and falls back to random walking. + pub async fn get_piece_from_archival_storage( + &self, + piece_index: PieceIndex, + max_random_walking_rounds: usize, + ) -> Option { + // TODO: consider using retry policy for L1 lookups as well. + trace!(%piece_index, "Getting piece from archival storage.."); + + let connected_peers = { + let connected_peers = match self.node.connected_peers().await { + Ok(connected_peers) => connected_peers, + Err(err) => { + debug!(%piece_index, ?err, "Cannot get connected peers (DSN L1 lookup)"); + + Default::default() + } + }; + + HashSet::::from_iter(connected_peers) + }; + + if connected_peers.is_empty() { + debug!(%piece_index, "Cannot acquire piece from no connected peers (DSN L1 lookup)"); + } else { + for peer_id in connected_peers.iter() { + let maybe_piece = self.get_piece_from_peer(*peer_id, piece_index).await; + + if maybe_piece.is_some() { + trace!(%piece_index, %peer_id, "DSN L1 lookup from connected peers succeeded"); + + return maybe_piece; + } + } + } + + trace!(%piece_index, "Getting piece from DSN L1 using random walk."); + let random_walk_result = self + .get_piece_by_random_walking(piece_index, max_random_walking_rounds) + .await; + + if random_walk_result.is_some() { + trace!(%piece_index, "DSN L1 lookup via random walk succeeded"); + + return random_walk_result; + } else { + debug!( + %piece_index, + %max_random_walking_rounds, + "Cannot acquire piece from DSN L1: random walk failed" + ); + } + + None + } + + /// Get piece from L1 by random walking + async fn get_piece_by_random_walking( + &self, + piece_index: PieceIndex, + walking_rounds: usize, + ) -> Option { + for round in 0..walking_rounds { + debug!(%piece_index, round, "Random walk round"); + + let result = self + .get_piece_by_random_walking_from_single_round(piece_index, round) + .await; + + if result.is_some() { + return result; + } + } + + debug!(%piece_index, "Random walking piece retrieval failed."); + + None + } + + /// Get piece from L1 by random walking (single round) + async fn get_piece_by_random_walking_from_single_round( + &self, + piece_index: PieceIndex, + round: usize, + ) -> Option { + trace!(%piece_index, "get_piece_by_random_walking round"); + + // Random walk key + let key = PeerId::random(); + + let mut request_batch = self.node.get_requests_batch_handle().await; + let get_closest_peers_result = request_batch.get_closest_peers(key.into()).await; + + match get_closest_peers_result { + Ok(mut get_closest_peers_stream) => { + while let Some(peer_id) = get_closest_peers_stream.next().await { + trace!(%piece_index, %peer_id, %round, "get_closest_peers returned an item"); + + let request_result = request_batch + .send_generic_request(peer_id, PieceByIndexRequest { piece_index }) + .await; + + match request_result { + Ok(PieceByIndexResponse { piece: Some(piece) }) => { + trace!(%peer_id, %piece_index, ?key, %round, "Piece request succeeded."); + + if let Some(validator) = &self.piece_validator { + return validator.validate_piece(peer_id, piece_index, piece).await; + } else { + return Some(piece); + } + } + Ok(PieceByIndexResponse { piece: None }) => { + debug!(%peer_id, %piece_index, ?key, %round, "Piece request returned empty piece."); + } + Err(error) => { + debug!(%peer_id, %piece_index, ?key, %round, ?error, "Piece request failed."); + } + } + } + } + Err(err) => { + warn!(%piece_index, ?key, ?err, %round, "get_closest_peers returned an error"); + } + } + + None + } } diff --git a/crates/subspace-networking/src/utils/random_walking_piece_provider.rs b/crates/subspace-networking/src/utils/random_walking_piece_provider.rs deleted file mode 100644 index 7ac4bb7103..0000000000 --- a/crates/subspace-networking/src/utils/random_walking_piece_provider.rs +++ /dev/null @@ -1,90 +0,0 @@ -//! Provides methods to retrieve pieces from DSN L1 via random walk. - -use crate::utils::piece_provider::PieceValidator; -use crate::{Node, PieceByIndexRequest, PieceByIndexResponse}; -use futures::StreamExt; -use libp2p::PeerId; -use subspace_core_primitives::{Piece, PieceIndex}; -use tracing::{debug, trace, warn}; - -/// Piece provider with cancellation and optional piece validator. -pub struct RandomWalkingPieceProvider { - node: Node, - piece_validator: Option, -} - -impl RandomWalkingPieceProvider -where - PV: PieceValidator, -{ - /// Creates new piece provider. - pub fn new(node: Node, piece_validator: Option) -> Self { - Self { - node, - piece_validator, - } - } - - /// Get piece from L1 - pub async fn get_piece(&self, piece_index: PieceIndex, walking_rounds: usize) -> Option { - for round in 0..walking_rounds { - debug!(%piece_index, round, "Random walk round"); - - let result = self.get_piece_by_random_walking(piece_index).await; - - if result.is_some() { - return result; - } - } - - debug!(%piece_index, "Random walking piece retrieval failed."); - - None - } - - /// Get piece from L1 by random walking - async fn get_piece_by_random_walking(&self, piece_index: PieceIndex) -> Option { - trace!(%piece_index, "get_piece_by_random_walking round"); - - // Random walk key - let key = PeerId::random(); - - let mut request_batch = self.node.get_requests_batch_handle().await; - let get_closest_peers_result = request_batch.get_closest_peers(key.into()).await; - - match get_closest_peers_result { - Ok(mut get_closest_peers_stream) => { - while let Some(peer_id) = get_closest_peers_stream.next().await { - trace!(%piece_index, %peer_id, "get_closest_peers returned an item"); - - let request_result = request_batch - .send_generic_request(peer_id, PieceByIndexRequest { piece_index }) - .await; - - match request_result { - Ok(PieceByIndexResponse { piece: Some(piece) }) => { - trace!(%peer_id, %piece_index, ?key, "Piece request succeeded."); - - if let Some(validator) = &self.piece_validator { - return validator.validate_piece(peer_id, piece_index, piece).await; - } else { - return Some(piece); - } - } - Ok(PieceByIndexResponse { piece: None }) => { - debug!(%peer_id, %piece_index, ?key, "Piece request returned empty piece."); - } - Err(error) => { - debug!(%peer_id, %piece_index, ?key, ?error, "Piece request failed."); - } - } - } - } - Err(err) => { - warn!(%piece_index, ?key, ?err, "get_closest_peers returned an error"); - } - } - - None - } -}