Skip to content

Commit

Permalink
farmer: Add random walk piece acquisition.
Browse files Browse the repository at this point in the history
  • Loading branch information
shamil-gadelshin committed Dec 18, 2023
1 parent f07d067 commit 1a0038f
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 24 deletions.
22 changes: 13 additions & 9 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_networking::libp2p::identity::{ed25519, Keypair};
use subspace_networking::libp2p::Multiaddr;
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_networking::utils::random_walking_piece_provider::RandomWalkingPieceProvider;
use subspace_proof_of_space::Table;
use tempfile::TempDir;
use tokio::sync::Semaphore;
Expand Down Expand Up @@ -390,20 +391,23 @@ where
.map_err(|error| anyhow::anyhow!(error))?;
// TODO: Consider introducing and using global in-memory segment header cache (this comment is
// in multiple files)
let segment_commitments_cache = Mutex::new(LruCache::new(RECORDS_ROOTS_CACHE_SIZE));
let piece_provider = PieceProvider::new(
let segment_commitments_cache = Arc::new(Mutex::new(LruCache::new(RECORDS_ROOTS_CACHE_SIZE)));
let validator = Some(SegmentCommitmentPieceValidator::new(
node.clone(),
Some(SegmentCommitmentPieceValidator::new(
node.clone(),
node_client.clone(),
kzg.clone(),
segment_commitments_cache,
)),
);
node_client.clone(),
kzg.clone(),
segment_commitments_cache,
));

let piece_provider = PieceProvider::new(node.clone(), validator.clone());

let random_walking_piece_provider =
RandomWalkingPieceProvider::new(node.clone(), validator.clone());

let piece_getter = Arc::new(FarmerPieceGetter::new(
node.clone(),
piece_provider,
random_walking_piece_provider,
piece_cache.clone(),
node_client.clone(),
Arc::clone(&readers_and_pieces),
Expand Down
48 changes: 35 additions & 13 deletions crates/subspace-farmer/src/utils/farmer_piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator, RetryPolicy};
use subspace_networking::utils::random_walking_piece_provider::RandomWalkingPieceProvider;
use subspace_networking::Node;
use tracing::{debug, error, trace};

const MAX_RANDOM_WALK_ROUNDS: usize = 50;

pub struct FarmerPieceGetter<PV, NC> {
node: Node,
piece_provider: PieceProvider<PV>,
random_walking_piece_provider: RandomWalkingPieceProvider<PV>,
piece_cache: PieceCache,
node_client: NC,
readers_and_pieces: Arc<Mutex<Option<ReadersAndPieces>>>,
Expand All @@ -27,13 +31,15 @@ impl<PV, NC> FarmerPieceGetter<PV, NC> {
pub fn new(
node: Node,
piece_provider: PieceProvider<PV>,
random_walking_piece_provider: RandomWalkingPieceProvider<PV>,
piece_cache: PieceCache,
node_client: NC,
readers_and_pieces: Arc<Mutex<Option<ReadersAndPieces>>>,
) -> Self {
Self {
node,
piece_provider,
random_walking_piece_provider,
piece_cache,
node_client,
readers_and_pieces,
Expand Down Expand Up @@ -118,21 +124,37 @@ where
let connected_peers = HashSet::<PeerId>::from_iter(self.node.connected_peers().await?);
if connected_peers.is_empty() {
debug!(%piece_index, "Cannot acquire piece from DSN L1: no connected peers");

return Ok(None);
} else {
for peer_id in connected_peers.iter() {
let maybe_piece = self
.piece_provider
.get_piece_from_peer(*peer_id, piece_index)
.await;

if maybe_piece.is_some() {
trace!(%piece_index, %peer_id, "DSN L1 lookup succeeded");

return Ok(maybe_piece);
}
}
}

for peer_id in connected_peers.iter() {
let maybe_piece = self
.piece_provider
.get_piece_from_peer(*peer_id, piece_index)
.await;

if maybe_piece.is_some() {
trace!(%piece_index, %peer_id, "DSN L1 lookup succeeded");

return Ok(maybe_piece);
}
trace!(%piece_index, "Getting piece from DSN L1 using random walk.");
let random_walk_result = self
.random_walking_piece_provider
.get_piece(piece_index, MAX_RANDOM_WALK_ROUNDS)
.await;

if random_walk_result.is_some() {
trace!(%piece_index, "DSN L1 lookup via random walk succeeded");

return Ok(random_walk_result);
} else {
debug!(
%piece_index,
max_rounds=%MAX_RANDOM_WALK_ROUNDS,
"Cannot acquire piece from DSN L1: random walk failed"
);
}

debug!(
Expand Down
6 changes: 4 additions & 2 deletions crates/subspace-farmer/src/utils/piece_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::NodeClient;
use async_trait::async_trait;
use lru::LruCache;
use parking_lot::Mutex;
use std::sync::Arc;
use subspace_archiving::archiver::is_piece_valid;
use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::{Piece, PieceIndex, SegmentCommitment, SegmentIndex};
Expand All @@ -10,19 +11,20 @@ use subspace_networking::utils::piece_provider::PieceValidator;
use subspace_networking::Node;
use tracing::{error, warn};

#[derive(Clone)]
pub struct SegmentCommitmentPieceValidator<NC> {
dsn_node: Node,
node_client: NC,
kzg: Kzg,
segment_commitment_cache: Mutex<LruCache<SegmentIndex, SegmentCommitment>>,
segment_commitment_cache: Arc<Mutex<LruCache<SegmentIndex, SegmentCommitment>>>,
}

impl<NC> SegmentCommitmentPieceValidator<NC> {
pub fn new(
dsn_node: Node,
node_client: NC,
kzg: Kzg,
segment_commitment_cache: Mutex<LruCache<SegmentIndex, SegmentCommitment>>,
segment_commitment_cache: Arc<Mutex<LruCache<SegmentIndex, SegmentCommitment>>>,
) -> Self {
Self {
dsn_node,
Expand Down

0 comments on commit 1a0038f

Please sign in to comment.