Skip to content

Commit

Permalink
Delay farming to after initial plotting
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Oct 14, 2023
1 parent 014ee55 commit 155e05e
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ where
metrics_endpoints,
sector_downloading_concurrency,
sector_encoding_concurrency,
farm_during_initial_plotting,
farming_thread_pool_size,
plotting_thread_pool_size,
replotting_thread_pool_size,
Expand Down Expand Up @@ -241,6 +242,7 @@ where
plotting_thread_pool: Arc::clone(&plotting_thread_pool),
replotting_thread_pool: Arc::clone(&replotting_thread_pool),
plotting_delay: Some(plotting_delay_receiver),
farm_during_initial_plotting,
},
disk_farm_index,
);
Expand Down
6 changes: 6 additions & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ struct FarmingArgs {
/// more than 1 because it will most likely result in slower plotting overall
#[arg(long, default_value = "1")]
sector_encoding_concurrency: NonZeroUsize,
/// Allows to enable farming during initial plotting. Not used by default because plotting is so
/// intense on CPU and memory that farming will likely not work properly, yet it will
/// significantly impact plotting speed, delaying the time when farming can actually work
/// properly.
#[arg(long)]
farm_during_initial_plotting: bool,
/// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some
/// compute-intensive operations during proving), defaults to number of CPU cores available in
/// the system
Expand Down
18 changes: 18 additions & 0 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ pub struct SingleDiskFarmOptions<NC, PG> {
/// Notification for plotter to start, can be used to delay plotting until some initialization
/// has happened externally
pub plotting_delay: Option<oneshot::Receiver<()>>,
/// Whether to farm during initial plotting
pub farm_during_initial_plotting: bool,
}

/// Errors happening when trying to create/open single disk farm
Expand Down Expand Up @@ -594,6 +596,7 @@ impl SingleDiskFarm {
plotting_thread_pool,
replotting_thread_pool,
plotting_delay,
farm_during_initial_plotting,
} = options;
fs::create_dir_all(&directory)?;

Expand Down Expand Up @@ -853,6 +856,13 @@ impl SingleDiskFarm {
let sectors_indices_left_to_plot =
metadata_header.plotted_sector_count..target_sector_count;

let (farming_delay_sender, delay_farmer_receiver) = if farm_during_initial_plotting {
(None, None)
} else {
let (sender, receiver) = oneshot::channel();
(Some(sender), Some(receiver))
};

let span = info_span!("single_disk_farm", %disk_farm_index);

let plotting_join_handle = thread::Builder::new()
Expand Down Expand Up @@ -935,6 +945,7 @@ impl SingleDiskFarm {
node_client: node_client.clone(),
sectors_metadata: Arc::clone(&sectors_metadata),
sectors_to_plot_sender,
initial_plotting_finished: farming_delay_sender,
};
tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options)));

Expand Down Expand Up @@ -999,6 +1010,13 @@ impl SingleDiskFarm {
return Ok(());
}

if let Some(farming_delay) = delay_farmer_receiver {
if farming_delay.await.is_err() {
// Dropped before resolving
return Ok(());
}
}

let farming_options = FarmingOptions {
public_key,
reward_address,
Expand Down
9 changes: 9 additions & 0 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ pub(super) struct PlottingSchedulerOptions<NC> {
pub(super) node_client: NC,
pub(super) sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
pub(super) sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
pub(super) initial_plotting_finished: Option<oneshot::Sender<()>>,
}

pub(super) async fn plotting_scheduler<NC>(
Expand All @@ -354,6 +355,7 @@ where
node_client,
sectors_metadata,
sectors_to_plot_sender,
initial_plotting_finished,
} = plotting_scheduler_options;

// Create a proxy channel with atomically updatable last archived segment that
Expand Down Expand Up @@ -400,6 +402,7 @@ where
&last_archived_segment,
archived_segments_receiver,
sectors_to_plot_proxy_sender,
initial_plotting_finished,
);

select! {
Expand Down Expand Up @@ -534,6 +537,7 @@ async fn send_plotting_notifications<NC>(
last_archived_segment: &Atomic<SegmentHeader>,
mut archived_segments_receiver: mpsc::Receiver<()>,
mut sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
initial_plotting_finished: Option<oneshot::Sender<()>>,
) -> Result<(), BackgroundTaskError>
where
NC: NodeClient,
Expand All @@ -558,6 +562,11 @@ where
let _ = acknowledgement_receiver.await;
}

if let Some(initial_plotting_finished) = initial_plotting_finished {
// Doesn't matter if receiver is still around
let _ = initial_plotting_finished.send(());
}

let mut sectors_expire_at = HashMap::with_capacity(usize::from(target_sector_count));

let mut sector_indices_to_replot = Vec::new();
Expand Down

0 comments on commit 155e05e

Please sign in to comment.