Skip to content

Commit

Permalink
Merge pull request #2271 from subspace/sc-consensus-subspace-refactoring
Browse files Browse the repository at this point in the history
sc-consensus-subspace refactoring
  • Loading branch information
nazar-pc authored Nov 27, 2023
2 parents 13d714f + 45a8479 commit 2d374fe
Show file tree
Hide file tree
Showing 18 changed files with 1,293 additions and 1,368 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 2 additions & 21 deletions crates/pallet-subspace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ use sp_consensus_subspace::consensus::{is_proof_of_time_valid, verify_solution};
use sp_consensus_subspace::digests::CompatibleDigestItem;
use sp_consensus_subspace::offence::{OffenceDetails, OffenceError, OnOffenceHandler};
use sp_consensus_subspace::{
ChainConstants, EquivocationProof, FarmerPublicKey, FarmerSignature, PotParameters,
PotParametersChange, SignedVote, Vote, WrappedPotOutput,
EquivocationProof, FarmerPublicKey, FarmerSignature, PotParameters, PotParametersChange,
SignedVote, Vote, WrappedPotOutput,
};
use sp_runtime::generic::DigestItem;
use sp_runtime::traits::{BlockNumberProvider, CheckedSub, Hash, One, Zero};
Expand Down Expand Up @@ -1142,25 +1142,6 @@ impl<T: Config> Pallet<T> {

u64::from(archived_segments) * ArchivedHistorySegment::SIZE as u64
}

pub fn chain_constants() -> ChainConstants {
ChainConstants::V0 {
confirmation_depth_k: T::ConfirmationDepthK::get()
.try_into()
.unwrap_or_else(|_| panic!("Block number always fits in BlockNumber; qed")),
block_authoring_delay: T::BlockAuthoringDelay::get(),
era_duration: T::EraDuration::get()
.try_into()
.unwrap_or_else(|_| panic!("Block number always fits in BlockNumber; qed")),
slot_probability: T::SlotProbability::get(),
recent_segments: T::RecentSegments::get(),
recent_history_fraction: (
T::RecentHistoryFraction::get().0,
T::RecentHistoryFraction::get().1,
),
min_sector_lifetime: T::MinSectorLifetime::get(),
}
}
}

impl<T> Pallet<T>
Expand Down
12 changes: 7 additions & 5 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ use lru::LruCache;
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use sc_client_api::{AuxStore, BlockBackend};
use sc_consensus_subspace::archiver::{recreate_genesis_segment, SegmentHeadersStore};
use sc_consensus_subspace::archiver::{
recreate_genesis_segment, ArchivedSegmentNotification, SegmentHeadersStore,
};
use sc_consensus_subspace::notification::SubspaceNotificationStream;
use sc_consensus_subspace::{
ArchivedSegmentNotification, NewSlotNotification, RewardSigningNotification, SubspaceSyncOracle,
use sc_consensus_subspace::slot_worker::{
NewSlotNotification, RewardSigningNotification, SubspaceSyncOracle,
};
use sc_rpc::{DenyUnsafe, SubscriptionTaskExecutor};
use sc_utils::mpsc::TracingUnboundedSender;
Expand Down Expand Up @@ -301,7 +303,6 @@ where
})?;

let farmer_app_info: Result<FarmerAppInfo, ApiError> = try {
let slot_duration = runtime_api.slot_duration(best_hash)?;
let chain_constants = runtime_api.chain_constants(best_hash)?;
let protocol_info = FarmerProtocolInfo {
history_size: runtime_api.history_size(best_hash)?,
Expand All @@ -314,7 +315,8 @@ where
FarmerAppInfo {
genesis_hash,
dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
farming_timeout: slot_duration
farming_timeout: chain_constants
.slot_duration()
.as_duration()
.mul_f64(SlotNumber::from(chain_constants.block_authoring_delay()) as f64),
protocol_info,
Expand Down
2 changes: 1 addition & 1 deletion crates/sc-consensus-subspace/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ targets = ["x86_64-unknown-linux-gnu"]
async-trait = "0.1.73"
codec = { package = "parity-scale-codec", version = "3.6.5", features = ["derive"] }
futures = "0.3.29"
log = "0.4.20"
lru = "0.11.0"
parking_lot = "0.12.1"
rand = "0.8.5"
Expand Down Expand Up @@ -47,6 +46,7 @@ subspace-proof-of-space = { version = "0.1.0", path = "../subspace-proof-of-spac
subspace-verification = { version = "0.1.0", path = "../subspace-verification" }
thiserror = "1.0.48"
tokio = { version = "1.34.0", features = ["sync"] }
tracing = "0.1.37"

[dev-dependencies]
# TODO: Restore in the future, currently tests are mostly broken and useless
Expand Down
79 changes: 41 additions & 38 deletions crates/sc-consensus-subspace/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,19 @@
//! Contains implementation of archiving process in Subspace blockchain that converts blockchain
//! history (blocks) into archived history (pieces).
use crate::{
ArchivedSegmentNotification, BlockImportingNotification, SubspaceLink,
SubspaceNotificationSender, SubspaceSyncOracle,
};
use crate::block_import::BlockImportingNotification;
use crate::slot_worker::SubspaceSyncOracle;
use crate::{SubspaceLink, SubspaceNotificationSender};
use codec::{Decode, Encode};
use futures::StreamExt;
use log::{debug, info, warn};
use parking_lot::Mutex;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use sc_client_api::{AuxStore, Backend as BackendT, BlockBackend, Finalizer, LockImportRun};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO};
use sc_utils::mpsc::tracing_unbounded;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus::SyncOracle;
Expand All @@ -44,17 +42,30 @@ use sp_runtime::traits::{Block as BlockT, CheckedSub, Header, NumberFor, One, Ze
use sp_runtime::{Justifications, Saturating};
use std::error::Error;
use std::future::Future;
use std::num::NonZeroUsize;
use std::slice;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use subspace_archiving::archiver::{Archiver, NewArchivedSegment};
use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::objects::BlockObjectMapping;
use subspace_core_primitives::{BlockNumber, RecordedHistorySegment, SegmentHeader, SegmentIndex};
use tracing::{debug, info, warn};

/// This corresponds to default value of `--max-runtime-instances` in Substrate
const BLOCKS_TO_ARCHIVE_CONCURRENCY: usize = 8;

/// How deep (in segments) should block be in order to be finalized.
///
/// This is required for full nodes to not prune recent history such that keep-up sync in Substrate
/// works even without archival nodes (initial sync will be done from DSN).
///
/// Ideally, we'd decouple pruning from finalization, but it may require invasive changes in
/// Substrate and is not worth it right now.
/// https://github.com/paritytech/substrate/discussions/14359
pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: NonZeroUsize =
NonZeroUsize::new(5).expect("Not zero; qed");

#[derive(Debug)]
struct SegmentHeadersStoreInner<AS> {
aux_store: Arc<AS>,
Expand Down Expand Up @@ -89,10 +100,7 @@ where
let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);
let mut next_key_index = 0;

debug!(
target: "subspace",
"Started loading segment headers into cache"
);
debug!("Started loading segment headers into cache");
while let Some(segment_headers) =
aux_store
.get_aux(&Self::key(next_key_index))?
Expand All @@ -104,10 +112,7 @@ where
cache.extend(segment_headers);
next_key_index += 1;
}
debug!(
target: "subspace",
"Finished loading segment headers into cache"
);
debug!("Finished loading segment headers into cache");

Ok(Self {
inner: Arc::new(SegmentHeadersStoreInner {
Expand Down Expand Up @@ -198,15 +203,16 @@ where
}
}

/// How deep (in segments) should block be in order to be finalized.
///
/// This is required for full nodes to not prune recent history such that keep-up sync in Substrate
/// works even without archival nodes (initial sync will be done from DSN).
///
/// Ideally, we'd decouple pruning from finalization, but it may require invasive changes in
/// Substrate and is not worth it right now.
/// https://github.com/paritytech/substrate/discussions/14359
pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: usize = 5;
/// Notification with block header hash that needs to be signed and sender for signature.
#[derive(Debug, Clone)]
pub struct ArchivedSegmentNotification {
/// Archived segment.
pub archived_segment: Arc<NewArchivedSegment>,
/// Sender that signified the fact of receiving archived segment by farmer.
///
/// This must be used to send a message or else block import pipeline will get stuck.
pub acknowledgement_sender: TracingUnboundedSender<()>,
}

fn find_last_archived_block<Block, Client, AS>(
client: &Client,
Expand Down Expand Up @@ -423,9 +429,8 @@ where
// Continuing from existing initial state
let last_archived_block_number = last_segment_header.last_archived_block().number;
info!(
target: "subspace",
"Last archived block {}",
last_archived_block_number,
%last_archived_block_number,
"Resuming archiver from last archived block",
);

// Set initial value, this is needed in case only genesis block was archived and there
Expand Down Expand Up @@ -464,7 +469,7 @@ where

archiver
} else {
info!(target: "subspace", "Starting archiving from genesis");
info!("Starting archiving from genesis");

Archiver::new(subspace_link.kzg().clone()).expect("Incorrect parameters for archiver")
};
Expand Down Expand Up @@ -497,10 +502,8 @@ where

if let Some(blocks_to_archive_to) = blocks_to_archive_to {
info!(
target: "subspace",
"Archiving already produced blocks {}..={}",
blocks_to_archive_from,
blocks_to_archive_to,
blocks_to_archive_from, blocks_to_archive_to,
);

let thread_pool = ThreadPoolBuilder::new()
Expand Down Expand Up @@ -556,7 +559,6 @@ where
let encoded_block = encode_block(block);

debug!(
target: "subspace",
"Encoded block {} has size of {:.2} kiB",
block_number_to_archive,
encoded_block.len() as f32 / 1024.0
Expand Down Expand Up @@ -621,11 +623,15 @@ fn finalize_block<Block, Backend, Client>(
client
.apply_finality(import_op, hash, None, true)
.map_err(|error| {
warn!(target: "subspace", "Error applying finality to block {:?}: {}", (hash, number), error);
warn!(
"Error applying finality to block {:?}: {}",
(hash, number),
error
);
error
})?;

debug!(target: "subspace", "Finalizing blocks up to ({:?}, {})", number, hash);
debug!("Finalizing blocks up to ({:?}, {})", number, hash);

telemetry!(
telemetry;
Expand Down Expand Up @@ -725,10 +731,8 @@ where
let block_hash_to_archive = block.block.hash();

debug!(
target: "subspace",
"Archiving block {:?} ({})",
block_number_to_archive,
block_hash_to_archive
block_number_to_archive, block_hash_to_archive
);

if parent_block_hash != best_archived_block_hash {
Expand Down Expand Up @@ -762,7 +766,6 @@ where

let encoded_block = encode_block(block);
debug!(
target: "subspace",
"Encoded block {} has size of {:.2} kiB",
block_number_to_archive,
encoded_block.len() as f32 / 1024.0
Expand Down Expand Up @@ -796,7 +799,7 @@ where
segment_headers
.iter()
.flat_map(|(_k, v)| v.iter().rev())
.nth(FINALIZATION_DEPTH_IN_SEGMENTS)
.nth(FINALIZATION_DEPTH_IN_SEGMENTS.get())
.map(|segment_header| segment_header.last_archived_block().number)
};

Expand Down
Loading

0 comments on commit 2d374fe

Please sign in to comment.