Support graceful and quick farmer shutdown in the middle of plotting process #2100

Merged · 3 commits · Oct 14, 2023

156 changes: 78 additions & 78 deletions crates/subspace-farmer-components/src/plotting.rs
@@ -26,6 +26,7 @@ use subspace_erasure_coding::ErasureCoding;
use subspace_proof_of_space::{Quality, Table, TableGenerator};
use thiserror::Error;
use tokio::sync::Semaphore;
use tokio::task::yield_now;
use tracing::{debug, trace, warn};

const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;
@@ -315,87 +316,86 @@ where

let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);

(PieceOffset::ZERO..)
for ((piece_offset, record), mut encoded_chunks_used) in (PieceOffset::ZERO..)
.zip(raw_sector.records.iter_mut())
.zip(sector_contents_map.iter_record_bitfields_mut())
// TODO: Ideally, we'd use parallelism here, but using `.par_bridge()` causes Chia table
// derivation to only use a single thread, which slows everything to essentially
// single-threaded
.for_each(|((piece_offset, record), mut encoded_chunks_used)| {
// Derive PoSpace table (use parallel mode because multiple tables concurrently will use
// too much RAM)
let pos_table = table_generator.generate_parallel(
&sector_id.derive_evaluation_seed(piece_offset, farmer_protocol_info.history_size),
);

let source_record_chunks = record
.iter()
.map(|scalar_bytes| {
Scalar::try_from(scalar_bytes).expect(
"Piece getter must returns valid pieces of history that contain \
proper scalar bytes; qed",
)
})
.collect::<Vec<_>>();
// Erasure code source record chunks
let parity_record_chunks = erasure_coding.extend(&source_record_chunks).expect(
"Instance was verified to be able to work with this many values earlier; qed",
);

// For every erasure coded chunk check if there is quality present, if so then encode
// with PoSpace quality bytes and set corresponding `quality_present` bit to `true`
let num_successfully_encoded_chunks = (SBucket::ZERO..=SBucket::MAX)
.zip(
source_record_chunks
.iter()
.zip(&parity_record_chunks)
.flat_map(|(a, b)| [a, b]),
{
// Derive PoSpace table (use parallel mode because multiple tables concurrently will use
// too much RAM)
let pos_table = table_generator.generate_parallel(
&sector_id.derive_evaluation_seed(piece_offset, farmer_protocol_info.history_size),
);

let source_record_chunks = record
.iter()
.map(|scalar_bytes| {
Scalar::try_from(scalar_bytes).expect(
"Piece getter must returns valid pieces of history that contain proper \
scalar bytes; qed",
)
.zip(encoded_chunks_used.iter_mut())
.filter_map(|((s_bucket, record_chunk), mut encoded_chunk_used)| {
let quality = pos_table.find_quality(s_bucket.into())?;

*encoded_chunk_used = true;

Some(
Simd::from(record_chunk.to_bytes())
^ Simd::from(quality.create_proof().hash()),
)
})
// Make sure above filter function (and corresponding `encoded_chunk_used` update)
// happen at most as many times as there is number of chunks in the record,
// otherwise `n+1` iterations could happen and update extra `encoded_chunk_used`
// unnecessarily causing issues down the line
.take(record.iter().count())
.zip(record.iter_mut())
// Write encoded chunk back so we can reuse original allocation
.map(|(input_chunk, output_chunk)| {
*output_chunk = input_chunk.to_array();
})
.count();

// In some cases there is not enough PoSpace qualities available, in which case we add
// remaining number of unencoded erasure coded record chunks to the end
source_record_chunks
.iter()
.zip(&parity_record_chunks)
.flat_map(|(a, b)| [a, b])
.zip(encoded_chunks_used.iter())
// Skip chunks that were used previously
.filter_map(|(record_chunk, encoded_chunk_used)| {
if *encoded_chunk_used {
None
} else {
Some(record_chunk)
}
})
// First `num_successfully_encoded_chunks` chunks are encoded
.zip(record.iter_mut().skip(num_successfully_encoded_chunks))
// Write necessary number of unencoded chunks at the end
.for_each(|(input_chunk, output_chunk)| {
*output_chunk = input_chunk.to_bytes();
});
});
})
.collect::<Vec<_>>();
// Erasure code source record chunks
let parity_record_chunks = erasure_coding
.extend(&source_record_chunks)
.expect("Instance was verified to be able to work with this many values earlier; qed");

// For every erasure coded chunk check if there is quality present, if so then encode
// with PoSpace quality bytes and set corresponding `quality_present` bit to `true`
let num_successfully_encoded_chunks = (SBucket::ZERO..=SBucket::MAX)
.zip(
source_record_chunks
.iter()
.zip(&parity_record_chunks)
.flat_map(|(a, b)| [a, b]),
)
.zip(encoded_chunks_used.iter_mut())
.filter_map(|((s_bucket, record_chunk), mut encoded_chunk_used)| {
let quality = pos_table.find_quality(s_bucket.into())?;

*encoded_chunk_used = true;

Some(
Simd::from(record_chunk.to_bytes()) ^ Simd::from(quality.create_proof().hash()),
)
})
// Make sure above filter function (and corresponding `encoded_chunk_used` update)
// happen at most as many times as there is number of chunks in the record,
// otherwise `n+1` iterations could happen and update extra `encoded_chunk_used`
// unnecessarily causing issues down the line
.take(record.iter().count())
.zip(record.iter_mut())
// Write encoded chunk back so we can reuse original allocation
.map(|(input_chunk, output_chunk)| {
*output_chunk = input_chunk.to_array();
})
.count();

// In some cases there is not enough PoSpace qualities available, in which case we add
// remaining number of unencoded erasure coded record chunks to the end
source_record_chunks
.iter()
.zip(&parity_record_chunks)
.flat_map(|(a, b)| [a, b])
.zip(encoded_chunks_used.iter())
// Skip chunks that were used previously
.filter_map(|(record_chunk, encoded_chunk_used)| {
if *encoded_chunk_used {
None
} else {
Some(record_chunk)
}
})
// First `num_successfully_encoded_chunks` chunks are encoded
.zip(record.iter_mut().skip(num_successfully_encoded_chunks))
// Write necessary number of unencoded chunks at the end
.for_each(|(input_chunk, output_chunk)| {
*output_chunk = input_chunk.to_bytes();
});

// Give a chance to interrupt plotting if necessary in between pieces
yield_now().await
}

sector_output.resize(sector_size, 0);
sector_metadata_output.resize(SectorMetadataChecksummed::encoded_size(), 0);
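
The change above replaces the `.for_each` iterator chain with a plain `for` loop and ends each iteration with `tokio::task::yield_now().await`, so the otherwise CPU-bound plotting future gains one await point per piece at which it can be dropped and cancelled. A minimal sketch of that pattern, using hypothetical names (`encode_piece`, `encode_all_pieces`) rather than the farmer's real types:

```rust
use tokio::task::yield_now;

// Hypothetical stand-in for the expensive per-piece PoSpace encoding work.
fn encode_piece(piece: &[u8]) -> Vec<u8> {
    piece.iter().map(|byte| byte.wrapping_mul(31)).collect()
}

async fn encode_all_pieces(pieces: &[Vec<u8>]) -> Vec<Vec<u8>> {
    let mut encoded = Vec::with_capacity(pieces.len());

    for piece in pieces {
        // CPU-heavy section: no await points while a single piece is encoded
        encoded.push(encode_piece(piece));

        // Give a chance to interrupt plotting in between pieces: if this future
        // is dropped at this await point (e.g. because a shutdown branch of a
        // `select!` wins), no further pieces are processed.
        yield_now().await;
    }

    encoded
}
```

Without the yield, the loop would never return control to the executor once polled, so a racing shutdown signal could not take effect until the entire sector was encoded.
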
83 changes: 49 additions & 34 deletions crates/subspace-farmer/src/single_disk_farm.rs
@@ -19,9 +19,8 @@ use async_lock::RwLock;
use derive_more::{Display, From};
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::{mpsc, oneshot};
use futures::future::{select, Either};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::{select, FutureExt, StreamExt};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use rayon::prelude::*;
@@ -873,17 +872,16 @@ impl SingleDiskFarm {
let _tokio_handle_guard = handle.enter();
let _span_guard = span.enter();

// Initial plotting
let initial_plotting_fut = async move {
let plotting_fut = async move {
if start_receiver.recv().await.is_err() {
// Dropped before starting
return Ok(());
return;
}

if let Some(plotting_delay) = plotting_delay {
if plotting_delay.await.is_err() {
// Dropped before resolving
return Ok(());
return;
}
}

@@ -907,22 +905,30 @@
encoding_semaphore,
plotting_thread_pool,
replotting_thread_pool,
stop_receiver: stop_receiver.resubscribe(),
};
plotting::<_, _, PosTable>(plotting_options).await
};

let initial_plotting_result = handle.block_on(select(
Box::pin(initial_plotting_fut),
Box::pin(stop_receiver.recv()),
));
let plotting_fut = plotting::<_, _, PosTable>(plotting_options);

if let Either::Left((Err(error), _)) = initial_plotting_result {
if let Some(error_sender) = error_sender.lock().take() {
if let Err(error) = error_sender.send(error.into()) {
error!(%error, "Plotting failed to send error to background task");
select! {
plotting_result = plotting_fut.fuse() => {
if let Err(error) = plotting_result
&& let Some(error_sender) = error_sender.lock().take()
&& let Err(error) = error_sender.send(error.into())
{
error!(
%error,
"Plotting failed to send error to background task"
);
}
}
_ = stop_receiver.recv().fuse() => {
// Nothing, just exit
}
}
}
};

handle.block_on(plotting_fut);
}
})?;

@@ -1015,21 +1021,24 @@ impl SingleDiskFarm {
farming::<PosTable, _>(farming_options).await
};

let farming_result = handle.block_on(select(
Box::pin(farming_fut),
Box::pin(stop_receiver.recv()),
));

if let Either::Left((Err(error), _)) = farming_result {
if let Some(error_sender) = error_sender.lock().take() {
if let Err(error) = error_sender.send(error.into()) {
error!(
%error,
"Farming failed to send error to background task",
);
handle.block_on(async {
select! {
farming_result = farming_fut.fuse() => {
if let Err(error) = farming_result
&& let Some(error_sender) = error_sender.lock().take()
&& let Err(error) = error_sender.send(error.into())
{
error!(
%error,
"Farming failed to send error to background task",
);
}
}
_ = stop_receiver.recv().fuse() => {
// Nothing, just exit
}
}
}
});
})
}
})?;
@@ -1052,10 +1061,16 @@ impl SingleDiskFarm {
move || {
let _tokio_handle_guard = handle.enter();

handle.block_on(select(
Box::pin(reading_fut),
Box::pin(stop_receiver.recv()),
));
handle.block_on(async {
select! {
_ = reading_fut.fuse() => {
// Nothing, just exit
}
_ = stop_receiver.recv().fuse() => {
// Nothing, just exit
}
}
});
}
})?;

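
The diff above swaps `futures::future::select` + `Either` for the `futures::select!` macro: the long-running future and `stop_receiver.recv()` are `.fuse()`d and raced, and whichever branch finishes first decides how the background thread exits. A simplified, hypothetical sketch of that racing pattern (the `run_until_stopped` name and the sleep stand-in are illustrative, not from the PR):

```rust
use std::time::Duration;

use futures::{select, FutureExt};
use tokio::sync::broadcast;

async fn run_until_stopped(mut stop_receiver: broadcast::Receiver<()>) {
    // Stand-in for the long-running plotting/farming future
    let work_fut = async {
        tokio::time::sleep(Duration::from_secs(3600)).await;
        Ok::<(), std::io::Error>(())
    };

    select! {
        // `select!` requires `FusedFuture`, hence the `.fuse()` calls
        work_result = work_fut.fuse() => {
            if let Err(error) = work_result {
                eprintln!("background task failed: {error}");
            }
        }
        _ = stop_receiver.recv().fuse() => {
            // Stop signal received; `work_fut` is dropped here and the thread exits
        }
    }
}
```

Because each farm thread gets its own `broadcast::Receiver` via `resubscribe()`, a single message on the corresponding sender reaches the plotting, farming, and reading tasks at once.
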
37 changes: 28 additions & 9 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
@@ -33,7 +33,7 @@ use subspace_farmer_components::sector::SectorMetadataChecksummed;
use subspace_proof_of_space::Table;
use thiserror::Error;
use tokio::runtime::Handle;
use tokio::sync::Semaphore;
use tokio::sync::{broadcast, Semaphore};
use tracing::{debug, info, trace, warn};

const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
Expand Down Expand Up @@ -78,6 +78,9 @@ pub enum PlottingError {
/// Lower-level error
error: node_client::Error,
},
/// Farm is shutting down
#[error("Farm is shutting down")]
FarmIsShuttingDown,
/// I/O error occurred
#[error("I/O error: {0}")]
Io(#[from] io::Error),
@@ -110,6 +113,7 @@ pub(super) struct PlottingOptions<NC, PG> {
pub(crate) encoding_semaphore: Arc<Semaphore>,
pub(super) plotting_thread_pool: Arc<ThreadPool>,
pub(super) replotting_thread_pool: Arc<ThreadPool>,
pub(super) stop_receiver: broadcast::Receiver<()>,
}

/// Starts plotting process.
@@ -144,6 +148,7 @@ where
encoding_semaphore,
plotting_thread_pool,
replotting_thread_pool,
stop_receiver,
} = plotting_options;

let mut table_generator = PosTable::generator();
@@ -222,6 +227,7 @@ where
let erasure_coding = erasure_coding.clone();
let downloading_semaphore = Arc::clone(&downloading_semaphore);
let encoding_semaphore = Arc::clone(&encoding_semaphore);
let mut stop_receiver = stop_receiver.resubscribe();

let plotting_fn = move || {
tokio::task::block_in_place(move || {
@@ -243,19 +249,32 @@ where
table_generator: &mut table_generator,
});

Handle::current()
.block_on(plot_sector_fut)
.map(|plotted_sector| {
(sector, sector_metadata, table_generator, plotted_sector)
})
let plotted_sector = Handle::current().block_on(async {
select! {
plotting_result = Box::pin(plot_sector_fut).fuse() => {
plotting_result.map_err(PlottingError::from)
}
_ = stop_receiver.recv().fuse() => {
Err(PlottingError::FarmIsShuttingDown)
}
}
})?;

Ok((sector, sector_metadata, table_generator, plotted_sector))
})
};

if replotting {
replotting_thread_pool.install(plotting_fn)?
let plotting_result = if replotting {
replotting_thread_pool.install(plotting_fn)
} else {
plotting_thread_pool.install(plotting_fn)?
plotting_thread_pool.install(plotting_fn)
};

if matches!(plotting_result, Err(PlottingError::FarmIsShuttingDown)) {
return Ok(());
}

plotting_result?
};

plot_file.write_all_at(&sector, (sector_index as usize * sector_size) as u64)?;
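
With `stop_receiver` threaded into `PlottingOptions`, a shutdown that arrives while a sector is being encoded surfaces as the new `PlottingError::FarmIsShuttingDown` variant, and the sector loop treats that variant as a clean exit rather than a failure. A condensed, hypothetical sketch of that flow (the `plot_sector_stub`, `plot_one_sector`, and `plotting_loop` names are illustrative, not the crate's API):

```rust
use futures::{select, FutureExt};
use tokio::sync::broadcast;

#[derive(Debug, thiserror::Error)]
enum PlottingError {
    /// Farm is shutting down
    #[error("Farm is shutting down")]
    FarmIsShuttingDown,
    /// I/O error occurred
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
}

// Stand-in for the real sector plotting future
async fn plot_sector_stub() -> Result<Vec<u8>, std::io::Error> {
    Ok(vec![0u8; 32])
}

async fn plot_one_sector(
    stop_receiver: &mut broadcast::Receiver<()>,
) -> Result<Vec<u8>, PlottingError> {
    select! {
        plotting_result = plot_sector_stub().fuse() => {
            plotting_result.map_err(PlottingError::from)
        }
        _ = stop_receiver.recv().fuse() => {
            // Shutdown won the race: abandon the sector and say so
            Err(PlottingError::FarmIsShuttingDown)
        }
    }
}

async fn plotting_loop(
    sectors_to_plot: u16,
    mut stop_receiver: broadcast::Receiver<()>,
) -> Result<(), PlottingError> {
    for _sector_index in 0..sectors_to_plot {
        let plotting_result = plot_one_sector(&mut stop_receiver).await;

        // Shutdown is not an error from the caller's point of view
        if matches!(plotting_result, Err(PlottingError::FarmIsShuttingDown)) {
            return Ok(());
        }

        let _sector = plotting_result?;
        // ... write the sector to disk, update metadata, notify listeners, etc.
    }

    Ok(())
}
```

Combined with the `yield_now()` calls inside sector encoding, this is why a stop request can take effect between pieces rather than only after a whole sector has been plotted.
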