diff --git a/Cargo.lock b/Cargo.lock index 5a249572e..0667a9f18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1033,6 +1033,17 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "getrandom 0.2.15", + "instant", + "rand 0.8.5", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -3232,6 +3243,7 @@ dependencies = [ "async-std", "async-trait", "atomic_store", + "backoff", "backtrace-on-stack-overflow", "bincode", "chrono", diff --git a/Cargo.toml b/Cargo.toml index dd3674bb4..878cf7a64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ async-compatibility-layer = { version = "1.1", default-features = false, feature async-lock = "3.3.0" async-std = { version = "1.9.0", features = ["unstable", "attributes"] } async-trait = "0.1" +backoff = "0.4" bincode = "1.3" chrono = "0.4" committable = "0.2" diff --git a/src/availability.rs b/src/availability.rs index dfde3700b..f68514817 100644 --- a/src/availability.rs +++ b/src/availability.rs @@ -448,9 +448,7 @@ where mod test { use super::*; use crate::{ - data_source::{ - storage::no_storage, ExtensibleDataSource, Transaction, VersionedDataSource, - }, + data_source::{storage::no_storage, ExtensibleDataSource}, status::StatusDataSource, task::BackgroundTask, testing::{ @@ -870,11 +868,10 @@ mod test { .await; let leaf = LeafQueryData::new(leaf, qc).unwrap(); let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis()); - - let mut tx = data_source.write().await.unwrap(); - tx.insert_leaf(leaf).await.unwrap(); - tx.insert_block(block.clone()).await.unwrap(); - tx.commit().await.unwrap(); + data_source + .append(BlockInfo::new(leaf, Some(block.clone()), None, None)) + .await + .unwrap(); // assert that the store has data before we move on to API requests assert_eq!( diff --git a/src/availability/data_source.rs b/src/availability/data_source.rs index 8a68a4ae6..19a9cbe12 100644 --- a/src/availability/data_source.rs +++ b/src/availability/data_source.rs @@ -17,11 +17,14 @@ use super::{ TransactionHash, TransactionQueryData, VidCommonQueryData, }, }; -use crate::{Payload, VidCommitment, VidShare}; +use crate::{types::HeightIndexed, Payload, VidCommitment, VidShare}; use async_trait::async_trait; use derivative::Derivative; use derive_more::{Display, From}; -use futures::stream::{BoxStream, Stream, StreamExt}; +use futures::{ + future::Future, + stream::{BoxStream, Stream, StreamExt}, +}; use hotshot_types::traits::node_implementation::NodeType; use std::{cmp::Ordering, ops::RangeBounds}; @@ -201,13 +204,51 @@ where } } -#[async_trait] +/// Information about a block. 
+/// +/// This type encapsulates all the information we might have about a decided HotShot block: +/// * The leaf, including a header and consensus metadata +/// * The block itself, which may be missing if this node did not receive a DA proposal for this +/// block +/// * VID common and a unique VID share, which may be missing if this node did not receive a VID +/// share for this block +#[derive(Clone, Debug)] +pub struct BlockInfo<Types: NodeType> { + pub leaf: LeafQueryData<Types>, + pub block: Option<BlockQueryData<Types>>, + pub vid_common: Option<VidCommonQueryData<Types>>, + pub vid_share: Option<VidShare>, +} + +impl<Types: NodeType> From<LeafQueryData<Types>> for BlockInfo<Types> { + fn from(leaf: LeafQueryData<Types>) -> Self { + Self::new(leaf, None, None, None) + } +} + +impl<Types: NodeType> HeightIndexed for BlockInfo<Types> { + fn height(&self) -> u64 { + self.leaf.height() + } +} + +impl<Types: NodeType> BlockInfo<Types> { + pub fn new( + leaf: LeafQueryData<Types>, + block: Option<BlockQueryData<Types>>, + vid_common: Option<VidCommonQueryData<Types>>, + vid_share: Option<VidShare>, + ) -> Self { + Self { + leaf, + block, + vid_common, + vid_share, + } + } +} + pub trait UpdateAvailabilityData<Types: NodeType> { - async fn insert_leaf(&mut self, leaf: LeafQueryData<Types>) -> anyhow::Result<()>; - async fn insert_block(&mut self, block: BlockQueryData<Types>) -> anyhow::Result<()>; - async fn insert_vid( - &mut self, - common: VidCommonQueryData<Types>, - share: Option<VidShare>, - ) -> anyhow::Result<()>; + /// Append information about a new block to the database. + fn append(&self, info: BlockInfo<Types>) -> impl Send + Future<Output = anyhow::Result<()>>; } diff --git a/src/availability/query_data.rs b/src/availability/query_data.rs index 3b88d07d8..80edd3fbd 100644 --- a/src/availability/query_data.rs +++ b/src/availability/query_data.rs @@ -10,7 +10,7 @@ // You should have received a copy of the GNU General Public License along with this program. If not, // see <https://www.gnu.org/licenses/>. -use crate::{types::HeightIndexed, Header, Metadata, Payload, Transaction, VidCommon}; +use crate::{types::HeightIndexed, Header, Metadata, Payload, Transaction, VidCommon, VidShare}; use committable::{Commitment, Committable}; use hotshot_types::{ data::Leaf, @@ -486,6 +486,12 @@ impl<Types: NodeType> HeightIndexed for VidCommonQueryData<Types> { } } +impl<Types: NodeType> HeightIndexed for (VidCommonQueryData<Types>, Option<VidShare>) { + fn height(&self) -> u64 { + self.0.height + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(bound = "")] pub struct TransactionQueryData<Types: NodeType> diff --git a/src/data_source.rs b/src/data_source.rs index dbbb3cb3d..a6dccd337 100644 --- a/src/data_source.rs +++ b/src/data_source.rs @@ -454,10 +454,10 @@ pub mod availability_tests { #[espresso_macros::generic_tests] pub mod persistence_tests { use crate::{ - availability::{BlockQueryData, LeafQueryData, UpdateAvailabilityData}, + availability::{BlockQueryData, LeafQueryData}, data_source::{ - storage::{AvailabilityStorage, NodeStorage}, - Transaction, UpdateDataSource, + storage::{AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage}, + Transaction, }, node::NodeDataSource, testing::{ @@ -475,8 +475,9 @@ pub mod persistence_tests { #[async_std::test] pub async fn test_revert<D: TestableDataSource>() where - for<'a> D::Transaction<'a>: - UpdateDataSource<MockTypes> + AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>, + for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes> + + AvailabilityStorage<MockTypes> + + NodeStorage<MockTypes>, { use hotshot_example_types::node_types::TestVersions; @@ -528,7 +529,7 @@ pub mod persistence_tests { #[async_std::test] pub async fn test_reset<D: TestableDataSource>() where - for<'a> D::Transaction<'a>: UpdateDataSource<MockTypes>, + for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>, { use hotshot_example_types::node_types::TestVersions; @@ -588,8 +589,9 @@ pub mod persistence_tests { #[async_std::test] pub async fn test_drop_tx<D: TestableDataSource>() where -
for<'a> D::Transaction<'a>: - UpdateDataSource + AvailabilityStorage + NodeStorage, + for<'a> D::Transaction<'a>: UpdateAvailabilityStorage + + AvailabilityStorage + + NodeStorage, for<'a> D::ReadOnly<'a>: NodeStorage, { use hotshot_example_types::node_types::TestVersions; @@ -672,10 +674,12 @@ pub mod persistence_tests { pub mod node_tests { use crate::{ availability::{ - BlockQueryData, LeafQueryData, QueryableHeader, UpdateAvailabilityData, - VidCommonQueryData, + BlockInfo, BlockQueryData, LeafQueryData, QueryableHeader, VidCommonQueryData, + }, + data_source::{ + storage::{NodeStorage, UpdateAvailabilityStorage}, + update::Transaction, }, - data_source::{storage::NodeStorage, update::Transaction, UpdateDataSource}, node::{BlockId, SyncStatus, TimeWindowQueryData, WindowStart}, testing::{ consensus::{MockNetwork, TestableDataSource}, @@ -704,7 +708,7 @@ pub mod node_tests { #[async_std::test] pub async fn test_sync_status() where - for<'a> D::Transaction<'a>: UpdateDataSource, + for<'a> D::Transaction<'a>: UpdateAvailabilityStorage, { use hotshot_example_types::node_types::TestVersions; @@ -758,11 +762,7 @@ pub mod node_tests { // Insert a leaf without the corresponding block or VID info, make sure we detect that the // block and VID info are missing. - { - let mut tx = ds.write().await.unwrap(); - tx.insert_leaf(leaves[0].clone()).await.unwrap(); - tx.commit().await.unwrap(); - } + ds.append(leaves[0].clone().into()).await.unwrap(); assert_eq!( ds.sync_status().await.unwrap(), SyncStatus { @@ -776,11 +776,7 @@ pub mod node_tests { // Insert a leaf whose height is not the successor of the previous leaf. We should now // detect that the leaf in between is missing (along with all _three_ corresponding blocks). - { - let mut tx = ds.write().await.unwrap(); - tx.insert_leaf(leaves[2].clone()).await.unwrap(); - tx.commit().await.unwrap(); - } + ds.append(leaves[2].clone().into()).await.unwrap(); assert_eq!( ds.sync_status().await.unwrap(), SyncStatus { @@ -860,10 +856,7 @@ pub mod node_tests { } #[async_std::test] - pub async fn test_counters() - where - for<'a> D::Transaction<'a>: UpdateDataSource, - { + pub async fn test_counters() { use hotshot_example_types::node_types::TestVersions; setup_test(); @@ -914,12 +907,9 @@ pub mod node_tests { .await; *leaf.leaf.block_header_mut() = header.clone(); let block = BlockQueryData::new(header, payload); - { - let mut tx = ds.write().await.unwrap(); - tx.insert_leaf(leaf).await.unwrap(); - tx.insert_block(block).await.unwrap(); - tx.commit().await.unwrap(); - } + ds.append(BlockInfo::new(leaf, Some(block), None, None)) + .await + .unwrap(); total_transactions += 1; total_size += encoded.len(); @@ -960,7 +950,7 @@ pub mod node_tests { #[async_std::test] pub async fn test_vid_monotonicity() where - for<'a> D::Transaction<'a>: UpdateDataSource, + for<'a> D::Transaction<'a>: UpdateAvailabilityStorage, for<'a> D::ReadOnly<'a>: NodeStorage, { use hotshot_example_types::node_types::TestVersions; @@ -981,14 +971,14 @@ pub mod node_tests { ) .await; let common = VidCommonQueryData::new(leaf.header().clone(), disperse.common); - { - let mut tx = ds.write().await.unwrap(); - tx.insert_leaf(leaf).await.unwrap(); - tx.insert_vid(common.clone(), Some(disperse.shares[0].clone())) - .await - .unwrap(); - tx.commit().await.unwrap(); - } + ds.append(BlockInfo::new( + leaf, + None, + Some(common.clone()), + Some(disperse.shares[0].clone()), + )) + .await + .unwrap(); { assert_eq!(ds.get_vid_common(0).await.await, common); diff --git a/src/data_source/extension.rs 
b/src/data_source/extension.rs index d9b286970..bf8f6398d 100644 --- a/src/data_source/extension.rs +++ b/src/data_source/extension.rs @@ -13,7 +13,7 @@ use super::VersionedDataSource; use crate::{ availability::{ - AvailabilityDataSource, BlockId, BlockQueryData, Fetch, LeafId, LeafQueryData, + AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, Fetch, LeafId, LeafQueryData, PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash, TransactionQueryData, UpdateAvailabilityData, VidCommonQueryData, }, @@ -213,27 +213,14 @@ where } } -#[async_trait] impl UpdateAvailabilityData for ExtensibleDataSource where D: UpdateAvailabilityData + Send + Sync, U: Send + Sync, Types: NodeType, { - async fn insert_leaf(&mut self, leaf: LeafQueryData) -> anyhow::Result<()> { - self.data_source.insert_leaf(leaf).await - } - - async fn insert_block(&mut self, block: BlockQueryData) -> anyhow::Result<()> { - self.data_source.insert_block(block).await - } - - async fn insert_vid( - &mut self, - common: VidCommonQueryData, - share: Option, - ) -> anyhow::Result<()> { - self.data_source.insert_vid(common, share).await + async fn append(&self, info: BlockInfo) -> anyhow::Result<()> { + self.data_source.append(info).await } } @@ -417,7 +404,7 @@ where mod impl_testable_data_source { use super::*; use crate::{ - data_source::{Transaction, UpdateDataSource}, + data_source::UpdateDataSource, testing::{ consensus::{DataSourceLifeCycle, TestableDataSource}, mocks::MockTypes, @@ -428,8 +415,7 @@ mod impl_testable_data_source { #[async_trait] impl DataSourceLifeCycle for ExtensibleDataSource where - D: TestableDataSource, - for<'a> D::Transaction<'a>: UpdateDataSource, + D: TestableDataSource + UpdateDataSource, U: Clone + Default + Send + Sync + 'static, { type Storage = D::Storage; @@ -447,9 +433,7 @@ mod impl_testable_data_source { } async fn handle_event(&self, event: &Event) { - let mut tx = self.write().await.unwrap(); - tx.update(event).await.unwrap(); - tx.commit().await.unwrap(); + self.update(event).await; } } } diff --git a/src/data_source/fetching.rs b/src/data_source/fetching.rs index 9f14e1cc5..ef65e8c48 100644 --- a/src/data_source/fetching.rs +++ b/src/data_source/fetching.rs @@ -74,16 +74,17 @@ //! fetched [proactively](#proactive-fetching). 
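For context, here is a minimal sketch of how the retry configuration surface introduced in this file composes. This is illustrative only: `storage` and `provider` stand for hypothetical values satisfying the builder's bounds, and it assumes the `build` method declared below is async and fallible (its signature is not shown in this hunk). The intervals, multiplier, and timeout mirror the defaults installed by `Builder::new`; the randomization factor of 0.1 is an arbitrary illustrative value.

    use std::time::Duration;

    // Hypothetical construction; any storage/provider pair accepted by
    // `Builder::new` works here.
    let data_source = Builder::new(storage, provider)
        .with_min_retry_interval(Duration::from_secs(1))
        .with_max_retry_interval(Duration::from_secs(32))
        .with_retry_multiplier(2.)
        .with_retry_randomization_factor(0.1)
        .with_retry_timeout(Duration::from_secs(64))
        .build()
        .await?;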
use super::{ + notifier::Notifier, storage::{ pruning::{PruneStorage, PrunedHeightStorage}, AvailabilityStorage, ExplorerStorage, MerklizedStateHeightStorage, MerklizedStateStorage, - NodeStorage, + NodeStorage, UpdateAvailabilityStorage, }, - VersionedDataSource, + Transaction, VersionedDataSource, }; use crate::{ availability::{ - AvailabilityDataSource, BlockId, BlockQueryData, Fetch, LeafId, LeafQueryData, + AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, Fetch, LeafId, LeafQueryData, PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash, TransactionQueryData, UpdateAvailabilityData, VidCommonQueryData, }, @@ -103,9 +104,10 @@ use anyhow::Context; use async_lock::Semaphore; use async_std::{sync::Arc, task::sleep}; use async_trait::async_trait; +use backoff::{backoff::Backoff, ExponentialBackoff, ExponentialBackoffBuilder}; use derivative::Derivative; use futures::{ - future::{join_all, BoxFuture, FutureExt}, + future::{join_all, BoxFuture, Future, FutureExt}, stream::{self, BoxStream, Stream, StreamExt}, }; use hotshot_types::traits::{ @@ -126,24 +128,21 @@ use tracing::Instrument; mod block; mod header; mod leaf; -mod notify_storage; mod transaction; mod vid; use self::{ block::PayloadFetcher, leaf::LeafFetcher, - notify_storage::{Heights, Notifiers, NotifyStorage}, transaction::TransactionRequest, vid::{VidCommonFetcher, VidCommonRequest}, }; -pub use notify_storage::Transaction; /// Builder for [`FetchingDataSource`] with configuration. pub struct Builder { storage: S, provider: P, - retry_delay: Option, + backoff: ExponentialBackoffBuilder, rate_limit: Option, range_chunk_size: usize, minor_scan_interval: Duration, @@ -159,10 +158,17 @@ pub struct Builder { impl Builder { /// Construct a new builder with the given storage and fetcher and the default options. pub fn new(storage: S, provider: P) -> Self { + let mut default_backoff = ExponentialBackoffBuilder::default(); + default_backoff + .with_initial_interval(Duration::from_secs(1)) + .with_multiplier(2.) + .with_max_interval(Duration::from_secs(32)) + .with_max_elapsed_time(Some(Duration::from_secs(64))); + Self { storage, provider, - retry_delay: None, + backoff: default_backoff, rate_limit: None, range_chunk_size: 25, // By default, we run minor proactive scans fairly frequently: once every minute. These @@ -184,13 +190,37 @@ impl Builder { } } - /// Set the maximum delay between retries of fetches. - pub fn with_retry_delay(mut self, retry_delay: Duration) -> Self { - self.retry_delay = Some(retry_delay); + /// Set the minimum delay between retries of failed operations. + pub fn with_min_retry_interval(mut self, interval: Duration) -> Self { + self.backoff.with_initial_interval(interval); + self + } + + /// Set the maximum delay between retries of failed operations. + pub fn with_max_retry_interval(mut self, interval: Duration) -> Self { + self.backoff.with_max_interval(interval); + self + } + + /// Set the multiplier for exponential backoff when retrying failed requests. + pub fn with_retry_multiplier(mut self, multiplier: f64) -> Self { + self.backoff.with_multiplier(multiplier); self } - /// Set the maximum delay between retries of fetches. + /// Set the randomization factor for randomized backoff when retrying failed requests. + pub fn with_retry_randomization_factor(mut self, factor: f64) -> Self { + self.backoff.with_randomization_factor(factor); + self + } + + /// Set the maximum time to retry failed operations before giving up. 
+ pub fn with_retry_timeout(mut self, timeout: Duration) -> Self { + self.backoff.with_max_elapsed_time(Some(timeout)); + self + } + + /// Set the maximum number of simultaneous fetches. pub fn with_rate_limit(mut self, with_rate_limit: usize) -> Self { self.rate_limit = Some(with_rate_limit); self @@ -298,7 +328,7 @@ where Header<Types>: QueryableHeader<Types>, S: PruneStorage + VersionedDataSource + HasMetrics + 'static, for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + PrunedHeightStorage + NodeStorage<Types>, - for<'a> S::Transaction<'a>: UpdateAvailabilityData<Types>, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>, P: AvailabilityProvider<Types>, { /// Build a [`FetchingDataSource`] with these options. @@ -368,7 +398,7 @@ where let future = async move { for i in 1.. { tracing::warn!("starting pruner run {i} "); - fetcher.storage.prune().await; + fetcher.prune().await; sleep(cfg.interval()).await; } }; @@ -388,7 +418,7 @@ where Payload<Types>: QueryablePayload<Types>, Header<Types>: QueryableHeader<Types>, S: VersionedDataSource + PruneStorage + HasMetrics + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData<Types>, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>, for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage, P: AvailabilityProvider<Types>, { @@ -439,7 +469,7 @@ where Types: NodeType, { fn as_ref(&self) -> &S { - self.fetcher.storage.as_ref() + &self.fetcher.storage } } @@ -475,7 +505,7 @@ where Types: NodeType, Payload<Types>: QueryablePayload<Types>, S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData<Types>, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>, for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage, P: AvailabilityProvider<Types>, { @@ -556,16 +586,56 @@ where } } +impl<Types, S, P> UpdateAvailabilityData<Types> for FetchingDataSource<Types, S, P> +where + Types: NodeType, + Payload<Types>: QueryablePayload<Types>, + S: VersionedDataSource + 'static, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>, + for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>, + P: AvailabilityProvider<Types>, +{ + async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> { + let height = info.height() as usize; + let fetch_block = info.block.is_none(); + let fetch_vid = info.vid_common.is_none(); + + self.fetcher.store_and_notify(info).await; + + // If data related to this block is missing, try to fetch it. + if fetch_block || fetch_vid { + let mut tx = match self.read().await { + Ok(tx) => tx, + Err(err) => { + tracing::warn!( + height, + "not fetching missing data at decide; could not open transaction: {err:#}" + ); + return Ok(()); + } + }; + if fetch_block { + BlockQueryData::active_fetch(&mut tx, self.fetcher.clone(), height.into()).await; + } + if fetch_vid { + VidCommonQueryData::active_fetch(&mut tx, self.fetcher.clone(), height.into()) + .await; + } + } + Ok(()) + } +} + impl<Types, S, P> VersionedDataSource for FetchingDataSource<Types, S, P> where Types: NodeType, S: VersionedDataSource + Send + Sync, P: Send + Sync, { - type Transaction<'a> = Transaction<'a, Types, S::Transaction<'a>> + type Transaction<'a> = S::Transaction<'a> where Self: 'a; - type ReadOnly<'a> = Transaction<'a, Types, S::ReadOnly<'a>> + type ReadOnly<'a> = S::ReadOnly<'a> where Self: 'a; @@ -584,7 +654,8 @@ struct Fetcher<Types, S, P> where Types: NodeType, { - storage: NotifyStorage<Types, S>, + storage: S, + notifiers: Notifiers<Types>, provider: Arc<P>, payload_fetcher: Arc<PayloadFetcher<Types, S, P>>, leaf_fetcher: Arc<LeafFetcher<Types, S, P>>, @@ -594,6 +665,8 @@ where active_fetch_delay: Duration, // Duration to sleep after each chunk fetched chunk_fetch_delay: Duration, + // Exponential backoff when retrying failed operations. + backoff: ExponentialBackoff, } impl<Types, S, P> VersionedDataSource for Fetcher<Types, S, P> @@ -602,10 +675,10 @@ where S: VersionedDataSource + Send + Sync, P: Send + Sync, { - type Transaction<'a> = Transaction<'a, Types, S::Transaction<'a>> + type Transaction<'a> = S::Transaction<'a> where Self: 'a; - type ReadOnly<'a> = Transaction<'a, Types, S::ReadOnly<'a>> + type ReadOnly<'a> = S::ReadOnly<'a> where Self: 'a; @@ -625,14 +698,14 @@ where for<'a> S::ReadOnly<'a>: PrunedHeightStorage + NodeStorage<Types>, { async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> { + let backoff = builder.backoff.build(); + let mut payload_fetcher = fetching::Fetcher::default(); let mut leaf_fetcher = fetching::Fetcher::default(); let mut vid_common_fetcher = fetching::Fetcher::default(); - if let Some(delay) = builder.retry_delay { - payload_fetcher = payload_fetcher.with_retry_delay(delay); - leaf_fetcher = leaf_fetcher.with_retry_delay(delay); - vid_common_fetcher = vid_common_fetcher.with_retry_delay(delay); - } + payload_fetcher = payload_fetcher.with_backoff(backoff.clone()); + leaf_fetcher = leaf_fetcher.with_backoff(backoff.clone()); + vid_common_fetcher = vid_common_fetcher.with_backoff(backoff.clone()); if let Some(limit) = builder.rate_limit { let permit = Arc::new(Semaphore::new(limit)); @@ -643,7 +716,8 @@ } Ok(Self { - storage: NotifyStorage::new(builder.storage), + storage: builder.storage, + notifiers: Default::default(), provider: Arc::new(builder.provider), payload_fetcher: Arc::new(payload_fetcher), leaf_fetcher: Arc::new(leaf_fetcher), @@ -651,6 +725,7 @@ where range_chunk_size: builder.range_chunk_size, active_fetch_delay: builder.active_fetch_delay, chunk_fetch_delay: builder.chunk_fetch_delay, + backoff, }) } } @@ -660,7 +735,7 @@ where Types: NodeType, Payload<Types>: QueryablePayload<Types>, S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData<Types>, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>, for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage, P: AvailabilityProvider<Types>, { @@ -670,9 +745,17 @@ where R: Into<T::Request> + Send, { let req = req.into(); - // Subscribe to notifications while we run `ok_or_fetch`. This means we won't miss any - // notifications sent in between checking local storage and triggering a fetch if necessary. - let passive = T::passive_fetch(self.storage.notifiers(), req).await; + + // Subscribe to notifications before we check storage for the requested object. This ensures + // that this operation will always eventually succeed as long as the requested object + // actually exists (or will exist). We will either find it in our local storage and succeed + // immediately, or (if it exists) someone will *later* come and add it to storage, at which + // point we will get a notification causing this passive fetch to resolve. + // + // Note the "someone" who later fetches the object and adds it to storage may be an active + // fetch triggered by this very request, in cases where that is possible, but it need not + // be. + let passive = T::passive_fetch(&self.notifiers, req).await; + let mut tx = match self.read().await { Ok(tx) => tx, @@ -760,12 +843,13 @@ where where T: RangedFetchable<Types>, { - // Subscribe to notifications first. This means we won't miss any notifications sent in - // between checking local storage and triggering a fetch if necessary. + // Subscribe to notifications first. As in [`get`](Self::get), this ensures we won't miss + // any notifications sent in between checking local storage and triggering a fetch if + // necessary. let mut passive = join_all( chunk .clone() - .map(|i| T::passive_fetch(self.storage.notifiers(), i.into())), + .map(|i| T::passive_fetch(&self.notifiers, i.into())), ) .await; @@ -901,7 +985,7 @@ where // Trigger an active fetch from a remote provider if possible. if let Some(tx) = tx { - if let Some(heights) = tx.heights().await.ok_or_trace() { + if let Some(heights) = Heights::load(tx).await.ok_or_trace() { if req.might_exist(heights) { T::active_fetch(tx, self.clone(), req).await; } else { @@ -961,7 +1045,7 @@ where continue; } }; - let heights = match tx.heights().await { + let heights = match Heights::load(&mut tx).await { Ok(heights) => heights, Err(err) => { tracing::error!(?backoff, "unable to load heights: {err:#}"); @@ -1076,6 +1160,151 @@ where } } +impl<Types, S, P> Fetcher<Types, S, P> +where + Types: NodeType, + S: VersionedDataSource, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>, +{ + /// Store an object and notify anyone waiting on this object that it is available. + async fn store_and_notify<T>(&self, obj: T) + where + T: Storable<Types>, + { + let try_store = || async { + let mut tx = self.storage.write().await?; + obj.clone().store(&mut tx).await?; + tx.commit().await + }; + + // Store the object in local storage, so we can avoid fetching it in the future. + let mut backoff = self.backoff.clone(); + backoff.reset(); + loop { + let Err(err) = try_store().await else { + break; + }; + // It is unfortunate if this fails, but we can still proceed by notifying with the + // object that we fetched, keeping it in memory. Log the error, retry a few times, and + // eventually move on. + tracing::warn!( + "failed to store fetched {} {}: {err:#}", + T::name(), + obj.height() + ); + + let Some(delay) = backoff.next_backoff() else { + break; + }; + tracing::info!(?delay, "retrying failed operation"); + sleep(delay).await; + } + + // Send a notification about the newly received object. It is important that we do this + // _after_ our attempt to store the object in local storage, otherwise there is a potential + // missed notification deadlock: + // * we send the notification + // * a task calls [`get`](Self::get) or [`get_chunk`](Self::get_chunk), finds that the + // requested object is not in storage, and begins waiting for a notification + // * we store the object. This ensures that no other task will be triggered to fetch it, + // which means no one will ever notify the waiting task. + // + // Note that we send the notification regardless of whether the store actually succeeded or + // not. This is to avoid _another_ subtle deadlock: if we failed to notify just because we + // failed to store, some fetches might not resolve, even though the object in question has + // actually been fetched. This should actually be ok, because as long as the object is not + // in storage, eventually some other task will come along and fetch, store, and notify about + // it. However, this is certainly not ideal, since we could resolve those pending fetches + // right now, and it causes bigger problems when the fetch that fails to resolve is the + // proactive scanner task, who is often the one that would eventually come along and + // re-fetch the object. + // + // The key thing to note is that it does no harm to notify even if we fail to store: at best + // we wake some tasks up sooner; at worst, anyone who misses the notification still + // satisfies the invariant that we only wait on notifications for objects which are not in + // storage, and eventually some other task will come along, find the object missing from + // storage, and re-fetch it. + obj.notify(&self.notifiers).await; + } +} + +impl<Types, S, P> Fetcher<Types, S, P> +where + Types: NodeType, + S: PruneStorage + Sync, +{ + async fn prune(&self) { + // We loop until the whole pruner run is complete + let mut pruner = S::Pruner::default(); + loop { + match self.storage.prune(&mut pruner).await { + Ok(Some(height)) => { + tracing::warn!("Pruned to height {height}"); + } + Ok(None) => { + tracing::warn!("pruner run complete."); + break; + } + Err(e) => { + tracing::error!("pruner run failed: {e:?}"); + break; + } + } + } + } +} + +#[derive(Debug)] +struct Notifiers<Types> +where + Types: NodeType, +{ + block: Notifier<BlockQueryData<Types>>, + leaf: Notifier<LeafQueryData<Types>>, + vid_common: Notifier<VidCommonQueryData<Types>>, +} + +impl<Types> Default for Notifiers<Types> +where + Types: NodeType, +{ + fn default() -> Self { + Self { + block: Notifier::new(), + leaf: Notifier::new(), + vid_common: Notifier::new(), + } + } +} + +#[derive(Clone, Copy, Debug)] +struct Heights { + height: u64, + pruned_height: Option<u64>, +} + +impl Heights { + async fn load<Types, T>(tx: &mut T) -> anyhow::Result<Self> + where + Types: NodeType, + T: NodeStorage<Types> + PrunedHeightStorage + Send, + { + let height = tx.block_height().await.context("loading block height")? as u64; + let pruned_height = tx + .load_pruned_height() + .await + .context("loading pruned height")?; + Ok(Self { + height, + pruned_height, + }) + } + + fn might_exist(self, h: u64) -> bool { + h < self.height && self.pruned_height.map_or(true, |ph| h > ph) + } +} + #[async_trait] impl<Types, S, P, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY> for FetchingDataSource<Types, S, P> @@ -1335,7 +1564,7 @@ where req: Self::Request, ) where S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData<Types>, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>, P: AvailabilityProvider<Types>; /// Wait for someone else to fetch the object. @@ -1367,6 +1596,54 @@ where R: RangeBounds<usize> + Send + 'static; } +/// An object which can be stored in the database. +trait Storable<Types: NodeType>: HeightIndexed + Clone { + /// The name of this type of object, for debugging purposes. + fn name() -> &'static str; + + /// Notify anyone waiting for this object that it has become available. + fn notify(&self, notifiers: &Notifiers<Types>) -> impl Send + Future<Output = ()>; + + /// Store the object in the local database. + fn store( + self, + storage: &mut (impl UpdateAvailabilityStorage<Types> + Send), + ) -> impl Send + Future<Output = anyhow::Result<()>>; +} + +impl<Types: NodeType> Storable<Types> for BlockInfo<Types> { + fn name() -> &'static str { + "block info" + } + + async fn notify(&self, notifiers: &Notifiers<Types>) { + self.leaf.notify(notifiers).await; + + if let Some(block) = &self.block { + block.notify(notifiers).await; + } + if let Some(vid) = &self.vid_common { + vid.notify(notifiers).await; + } + } + + async fn store( + self, + storage: &mut (impl UpdateAvailabilityStorage<Types> + Send), + ) -> anyhow::Result<()> { + self.leaf.store(storage).await?; + + if let Some(block) = self.block { + block.store(storage).await?; + } + if let Some(common) = self.vid_common { + (common, self.vid_share).store(storage).await?; + } + + Ok(()) + } +} + /// Break a range into fixed-size chunks.
fn range_chunks(range: R, chunk_size: usize) -> impl Iterator> where diff --git a/src/data_source/fetching/block.rs b/src/data_source/fetching/block.rs index a494908b4..b9c369b39 100644 --- a/src/data_source/fetching/block.rs +++ b/src/data_source/fetching/block.rs @@ -14,14 +14,15 @@ use super::{ header::{fetch_header_and_then, HeaderCallback}, - AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, NotifyStorage, - RangedFetchable, + AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable, + Storable, }; use crate::{ - availability::{ - BlockId, BlockQueryData, PayloadQueryData, QueryablePayload, UpdateAvailabilityData, + availability::{BlockId, BlockQueryData, PayloadQueryData, QueryablePayload}, + data_source::{ + storage::{AvailabilityStorage, UpdateAvailabilityStorage}, + VersionedDataSource, }, - data_source::{storage::AvailabilityStorage, update::Transaction, VersionedDataSource}, fetching::{ self, request::{self, PayloadRequest}, @@ -87,7 +88,7 @@ where req: Self::Request, ) where S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage, P: AvailabilityProvider, { fetch_header_and_then( @@ -125,6 +126,26 @@ where } } +impl Storable for BlockQueryData +where + Types: NodeType, +{ + fn name() -> &'static str { + "block" + } + + async fn notify(&self, notifiers: &Notifiers) { + notifiers.block.notify(self).await; + } + + async fn store( + self, + storage: &mut (impl UpdateAvailabilityStorage + Send), + ) -> anyhow::Result<()> { + storage.insert_block(self).await + } +} + pub(super) fn fetch_block_with_header( fetcher: Arc>, header: Header, @@ -132,7 +153,7 @@ pub(super) fn fetch_block_with_header( Types: NodeType, Payload: QueryablePayload, S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage, P: AvailabilityProvider, { // Now that we have the header, we only need to retrieve the payload. @@ -151,21 +172,6 @@ pub(super) fn fetch_block_with_header( ); } -async fn store_block( - storage: &NotifyStorage, - block: BlockQueryData, -) -> anyhow::Result<()> -where - Types: NodeType, - Payload: QueryablePayload, - S: VersionedDataSource, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, -{ - let mut tx = storage.write().await?; - tx.insert_block(block).await?; - tx.commit().await -} - #[async_trait] impl Fetchable for PayloadQueryData where @@ -201,7 +207,7 @@ where req: Self::Request, ) where S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage, P: AvailabilityProvider, { // We don't have storage for the payload alone, only the whole block. So if we need to fetch @@ -267,19 +273,12 @@ impl Callback> for PayloadCallback: QueryablePayload, S: 'static + VersionedDataSource, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage, P: AvailabilityProvider, { async fn run(self, payload: Payload) { tracing::info!("fetched payload {:?}", self.header.payload_commitment()); let block = BlockQueryData::new(self.header, payload); - let height = block.height(); - - // Store the block in local storage, so we can avoid fetching it in the future. 
- if let Err(err) = store_block(&self.fetcher.storage, block).await { - // It is unfortunate if this fails, but we can still proceed by returning the block that - // we fetched, keeping it in memory. Simply log the error and move on. - tracing::warn!("failed to store fetched block {height}: {err}"); - } + self.fetcher.store_and_notify(block).await; } } diff --git a/src/data_source/fetching/header.rs b/src/data_source/fetching/header.rs index 1dd5e9f52..fdc235aa2 100644 --- a/src/data_source/fetching/header.rs +++ b/src/data_source/fetching/header.rs @@ -17,8 +17,11 @@ use super::{ vid::fetch_vid_common_with_header, AvailabilityProvider, Fetcher, ResultExt, }; use crate::{ - availability::{BlockId, QueryablePayload, UpdateAvailabilityData}, - data_source::{storage::AvailabilityStorage, update::VersionedDataSource}, + availability::{BlockId, QueryablePayload}, + data_source::{ + storage::{AvailabilityStorage, UpdateAvailabilityStorage}, + update::VersionedDataSource, + }, Header, Payload, }; use anyhow::Context; @@ -75,7 +78,7 @@ where Types: NodeType, Payload: QueryablePayload, S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage, P: AvailabilityProvider, { fn fetcher(&self) -> Arc> { @@ -113,7 +116,7 @@ pub(super) async fn fetch_header_and_then( Types: NodeType, Payload: QueryablePayload, S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage, P: AvailabilityProvider, { // Check if at least the header is available in local storage. If it is, we benefit two ways: diff --git a/src/data_source/fetching/leaf.rs b/src/data_source/fetching/leaf.rs index 5941ab25c..55f01d9c4 100644 --- a/src/data_source/fetching/leaf.rs +++ b/src/data_source/fetching/leaf.rs @@ -14,11 +14,14 @@ use super::{ header::HeaderCallback, AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, - Notifiers, NotifyStorage, RangedFetchable, + Notifiers, RangedFetchable, Storable, }; use crate::{ - availability::{LeafId, LeafQueryData, QueryablePayload, UpdateAvailabilityData}, - data_source::{storage::AvailabilityStorage, update::Transaction, VersionedDataSource}, + availability::{LeafId, LeafQueryData, QueryablePayload}, + data_source::{ + storage::{AvailabilityStorage, UpdateAvailabilityStorage}, + VersionedDataSource, + }, fetching::{self, request, Callback}, types::HeightIndexed, Payload, QueryResult, @@ -80,7 +83,7 @@ where req: Self::Request, ) where S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage, P: AvailabilityProvider, { fetch_leaf_with_callbacks(fetcher, req, None) @@ -102,7 +105,7 @@ pub(super) fn fetch_leaf_with_callbacks( Types: NodeType, Payload: QueryablePayload, S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage, P: AvailabilityProvider, I: IntoIterator> + Send + 'static, I::IntoIter: Send, @@ -125,22 +128,6 @@ pub(super) fn fetch_leaf_with_callbacks( } } -async fn store_leaf( - storage: &NotifyStorage, - leaf: LeafQueryData, -) -> anyhow::Result<()> -where - Types: NodeType, - Payload: QueryablePayload, - S: VersionedDataSource, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, -{ - let mut tx = storage.write().await?; - tx.insert_leaf(leaf).await?; - tx.commit().await?; - Ok(()) -} - #[async_trait] impl RangedFetchable for 
LeafQueryData where @@ -158,6 +145,26 @@ where } } +impl Storable for LeafQueryData +where + Types: NodeType, +{ + fn name() -> &'static str { + "leaf" + } + + async fn notify(&self, notifiers: &Notifiers) { + notifiers.leaf.notify(self).await; + } + + async fn store( + self, + storage: &mut (impl UpdateAvailabilityStorage + Send), + ) -> anyhow::Result<()> { + storage.insert_leaf(self).await + } +} + #[derive(Derivative, From)] #[derivative(Debug(bound = ""))] pub(super) enum LeafCallback { @@ -206,20 +213,14 @@ impl Callback> for LeafCallback: QueryablePayload, S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage, P: AvailabilityProvider, { async fn run(self, leaf: LeafQueryData) { match self { Self::Leaf { fetcher } => { - let height = leaf.height(); - tracing::info!("fetched leaf {height}"); - if let Err(err) = store_leaf(&fetcher.storage, leaf).await { - // It is unfortunate if this fails, but we can still proceed by - // returning the leaf that we fetched, keeping it in memory. - // Simply log the error and move on. - tracing::warn!("failed to store fetched leaf {height}: {err}"); - } + tracing::info!("fetched leaf {}", leaf.height()); + fetcher.store_and_notify(leaf).await; } Self::Continuation { callback } => callback.run(leaf.leaf.block_header().clone()), } diff --git a/src/data_source/fetching/notify_storage.rs b/src/data_source/fetching/notify_storage.rs deleted file mode 100644 index 0b23f463d..000000000 --- a/src/data_source/fetching/notify_storage.rs +++ /dev/null @@ -1,559 +0,0 @@ -// Copyright (c) 2022 Espresso Systems (espressosys.com) -// This file is part of the HotShot Query Service library. -// -// This program is free software: you can redistribute it and/or modify it under the terms of the GNU -// General Public License as published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without -// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -// General Public License for more details. -// You should have received a copy of the GNU General Public License along with this program. If not, -// see . 
- -use crate::{ - availability::{ - BlockId, BlockQueryData, LeafId, LeafQueryData, PayloadQueryData, QueryableHeader, - QueryablePayload, TransactionHash, TransactionQueryData, UpdateAvailabilityData, - VidCommonQueryData, - }, - data_source::{ - notifier::Notifier, - storage::{ - pruning::{PruneStorage, PrunedHeightStorage, PrunerCfg}, - AvailabilityStorage, ExplorerStorage, MerklizedStateHeightStorage, - MerklizedStateStorage, NodeStorage, - }, - update::{self, VersionedDataSource}, - }, - explorer, - merklized_state::{MerklizedState, Snapshot, UpdateStateData}, - node::{SyncStatus, TimeWindowQueryData, WindowStart}, - Header, Payload, QueryResult, VidShare, -}; -use anyhow::Context; -use async_trait::async_trait; -use futures::future::Future; -use hotshot_types::traits::node_implementation::NodeType; -use jf_merkle_tree::{prelude::MerkleProof, MerkleTreeScheme}; -use std::ops::RangeBounds; -use tagged_base64::TaggedBase64; - -#[derive(Debug)] -pub(super) struct NotifyStorage -where - Types: NodeType, -{ - storage: S, - notifiers: Notifiers, -} - -impl AsRef for NotifyStorage -where - Types: NodeType, -{ - fn as_ref(&self) -> &S { - &self.storage - } -} - -impl NotifyStorage -where - Types: NodeType, -{ - pub(super) fn new(storage: S) -> Self { - Self { - storage, - notifiers: Default::default(), - } - } -} - -impl NotifyStorage -where - Types: NodeType, - S: PruneStorage + Sync, -{ - pub(super) async fn prune(&self) { - // We loop until the whole run pruner run is complete - let mut pruner = S::Pruner::default(); - loop { - match self.storage.prune(&mut pruner).await { - Ok(Some(height)) => { - tracing::warn!("Pruned to height {height}"); - } - Ok(None) => { - tracing::warn!("pruner run complete."); - break; - } - Err(e) => { - tracing::error!("pruner run failed: {e:?}"); - break; - } - } - } - } -} - -impl NotifyStorage -where - Types: NodeType, -{ - pub(super) fn notifiers(&self) -> &Notifiers { - &self.notifiers - } -} - -impl NotifyStorage -where - Types: NodeType, - S: PruneStorage + Sync, -{ - pub(super) fn get_pruning_config(&self) -> Option { - self.storage.get_pruning_config() - } -} - -impl VersionedDataSource for NotifyStorage -where - Types: NodeType, - S: VersionedDataSource, -{ - type ReadOnly<'a> = Transaction<'a, Types, S::ReadOnly<'a>> - where - Self: 'a; - - type Transaction<'a> = Transaction<'a, Types, S::Transaction<'a>> - where - Self: 'a; - - async fn read(&self) -> anyhow::Result> { - Ok(Transaction::new(self, self.storage.read().await?)) - } - - async fn write(&self) -> anyhow::Result> { - Ok(Transaction::new(self, self.storage.write().await?)) - } -} - -#[derive(Debug)] -pub struct Transaction<'a, Types, T> -where - Types: NodeType, -{ - inner: T, - notifiers: &'a Notifiers, - - // Pending notifications generated during this transaction. These notifications will be sent out - // after the transaction is committed to storage, which guarantees that anyone who subscribes to - // notifications and then sees that the desired object is _not_ present in storage will - // subsequently get a notification after the object is added to storage. 
- inserted_leaves: Vec>, - inserted_blocks: Vec>, - inserted_vid: Vec>, -} - -impl<'a, Types, T> Transaction<'a, Types, T> -where - Types: NodeType, -{ - fn new(storage: &'a NotifyStorage, inner: T) -> Self { - Self { - inner, - notifiers: &storage.notifiers, - inserted_leaves: Default::default(), - inserted_blocks: Default::default(), - inserted_vid: Default::default(), - } - } -} - -impl<'a, Types, T> AsRef for Transaction<'a, Types, T> -where - Types: NodeType, -{ - fn as_ref(&self) -> &T { - &self.inner - } -} - -impl<'a, Types, T> AsMut for Transaction<'a, Types, T> -where - Types: NodeType, -{ - fn as_mut(&mut self) -> &mut T { - &mut self.inner - } -} - -impl<'a, Types, T> update::Transaction for Transaction<'a, Types, T> -where - Types: NodeType, - T: update::Transaction, -{ - async fn commit(self) -> anyhow::Result<()> { - self.inner.commit().await?; - - // Now that any inserted objects have been added to storage, alert any clients who were - // waiting on these objects. - for leaf in self.inserted_leaves { - self.notifiers.leaf.notify(&leaf).await; - } - for block in self.inserted_blocks { - self.notifiers.block.notify(&block).await; - } - for vid in self.inserted_vid { - self.notifiers.vid_common.notify(&vid).await; - } - - Ok(()) - } - - fn revert(self) -> impl Future + Send { - self.inner.revert() - } -} - -#[async_trait] -impl<'a, Types, T> PrunedHeightStorage for Transaction<'a, Types, T> -where - Types: NodeType, - T: PrunedHeightStorage + Send, -{ - async fn load_pruned_height(&mut self) -> anyhow::Result> { - self.inner.load_pruned_height().await - } -} - -impl<'a, Types, T> Transaction<'a, Types, T> -where - Types: NodeType, - T: PrunedHeightStorage + NodeStorage + Send + Sync, -{ - pub(super) async fn heights(&mut self) -> anyhow::Result { - let height = self.block_height().await.context("loading block height")? 
as u64; - let pruned_height = self - .load_pruned_height() - .await - .context("loading pruned height")?; - Ok(Heights { - height, - pruned_height, - }) - } -} - -#[async_trait] -impl<'a, Types, T, State, const ARITY: usize> MerklizedStateStorage - for Transaction<'a, Types, T> -where - Types: NodeType, - T: MerklizedStateStorage + Send, - State: MerklizedState + 'static, - ::Commitment: Send, -{ - async fn get_path( - &mut self, - snapshot: Snapshot, - key: State::Key, - ) -> QueryResult> { - self.as_mut().get_path(snapshot, key).await - } -} - -#[async_trait] -impl<'a, Types, T> MerklizedStateHeightStorage for Transaction<'a, Types, T> -where - Types: NodeType, - T: MerklizedStateHeightStorage + Send, -{ - async fn get_last_state_height(&mut self) -> QueryResult { - self.as_mut().get_last_state_height().await - } -} - -#[async_trait] -impl<'a, Types, T, State, const ARITY: usize> UpdateStateData - for Transaction<'a, Types, T> -where - Types: NodeType, - State: MerklizedState, - T: UpdateStateData + Send + Sync, -{ - async fn set_last_state_height(&mut self, height: usize) -> anyhow::Result<()> { - self.inner.set_last_state_height(height).await - } - - async fn insert_merkle_nodes( - &mut self, - path: MerkleProof, - traversal_path: Vec, - block_number: u64, - ) -> anyhow::Result<()> { - self.inner - .insert_merkle_nodes(path, traversal_path, block_number) - .await - } -} - -#[async_trait] -impl<'a, Types, T> UpdateAvailabilityData for Transaction<'a, Types, T> -where - Types: NodeType, - Payload: QueryablePayload, - T: UpdateAvailabilityData + Send + Sync, -{ - async fn insert_leaf(&mut self, leaf: LeafQueryData) -> anyhow::Result<()> { - // Store the new leaf. - self.inner.insert_leaf(leaf.clone()).await?; - // Queue a notification about the newly received leaf. - self.inserted_leaves.push(leaf); - Ok(()) - } - - async fn insert_block(&mut self, block: BlockQueryData) -> anyhow::Result<()> { - // Store the new block. - self.inner.insert_block(block.clone()).await?; - // Queue a notification about the newly received block. - self.inserted_blocks.push(block); - Ok(()) - } - - async fn insert_vid( - &mut self, - common: VidCommonQueryData, - share: Option, - ) -> anyhow::Result<()> { - // Store the new VID. - self.inner.insert_vid(common.clone(), share).await?; - // Queue a notification about the newly received VID. - self.inserted_vid.push(common); - Ok(()) - } -} - -/// [`Transaction`] implements [`AvailabilityStorage`], not the richer -/// [`AvailabilityDataSource`](crate::availability::AvailabilityDataSource). -/// -/// Privding the full [`AvailabilityDataSource`](crate::availability::AvailabilityDataSource) -/// interface through a transaction would be ill advised, because read operations through this -/// interface trigger side effects (fetches) that may not be rolled back if the transaction is -/// rolled back, and may also compete for resources being used by the transaction itself. Thus, we -/// only provide [`AvailabilityStorage`], which returns errors if data is not available instead of -/// fetching. 
-#[async_trait] -impl<'a, Types, T> AvailabilityStorage for Transaction<'a, Types, T> -where - Types: NodeType, - Payload: QueryablePayload, - T: AvailabilityStorage, -{ - async fn get_leaf(&mut self, id: LeafId) -> QueryResult> { - self.inner.get_leaf(id).await - } - - async fn get_block(&mut self, id: BlockId) -> QueryResult> { - self.inner.get_block(id).await - } - - async fn get_header(&mut self, id: BlockId) -> QueryResult> { - self.inner.get_header(id).await - } - - async fn get_payload(&mut self, id: BlockId) -> QueryResult> { - self.inner.get_payload(id).await - } - - async fn get_vid_common( - &mut self, - id: BlockId, - ) -> QueryResult> { - self.inner.get_vid_common(id).await - } - - async fn get_leaf_range( - &mut self, - range: R, - ) -> QueryResult>>> - where - R: RangeBounds + Send + 'static, - { - self.inner.get_leaf_range(range).await - } - - async fn get_block_range( - &mut self, - range: R, - ) -> QueryResult>>> - where - R: RangeBounds + Send + 'static, - { - self.inner.get_block_range(range).await - } - - async fn get_payload_range( - &mut self, - range: R, - ) -> QueryResult>>> - where - R: RangeBounds + Send + 'static, - { - self.inner.get_payload_range(range).await - } - - async fn get_vid_common_range( - &mut self, - range: R, - ) -> QueryResult>>> - where - R: RangeBounds + Send + 'static, - { - self.inner.get_vid_common_range(range).await - } - - async fn get_transaction( - &mut self, - hash: TransactionHash, - ) -> QueryResult> { - self.inner.get_transaction(hash).await - } -} - -#[async_trait] -impl<'a, Types, T> NodeStorage for Transaction<'a, Types, T> -where - Types: NodeType, - T: NodeStorage + Send, -{ - async fn block_height(&mut self) -> QueryResult { - self.inner.block_height().await - } - - async fn count_transactions(&mut self) -> QueryResult { - self.inner.count_transactions().await - } - - async fn payload_size(&mut self) -> QueryResult { - self.inner.payload_size().await - } - - async fn vid_share(&mut self, id: ID) -> QueryResult - where - ID: Into> + Send + Sync, - { - self.inner.vid_share(id).await - } - - async fn sync_status(&mut self) -> QueryResult { - self.inner.sync_status().await - } - - async fn get_header_window( - &mut self, - start: impl Into> + Send + Sync, - end: u64, - ) -> QueryResult>> { - self.inner.get_header_window(start, end).await - } -} - -#[async_trait] -impl<'a, Types, T> ExplorerStorage for Transaction<'a, Types, T> -where - Types: NodeType, - Payload: QueryablePayload, - Header: QueryableHeader + explorer::traits::ExplorerHeader, - crate::Transaction: explorer::traits::ExplorerTransaction, - T: ExplorerStorage + Send, -{ - async fn get_block_summaries( - &mut self, - request: explorer::query_data::GetBlockSummariesRequest, - ) -> Result< - Vec>, - explorer::query_data::GetBlockSummariesError, - > { - self.as_mut().get_block_summaries(request).await - } - - async fn get_block_detail( - &mut self, - request: explorer::query_data::BlockIdentifier, - ) -> Result, explorer::query_data::GetBlockDetailError> - { - self.as_mut().get_block_detail(request).await - } - - async fn get_transaction_summaries( - &mut self, - request: explorer::query_data::GetTransactionSummariesRequest, - ) -> Result< - Vec>, - explorer::query_data::GetTransactionSummariesError, - > { - self.as_mut().get_transaction_summaries(request).await - } - - async fn get_transaction_detail( - &mut self, - request: explorer::query_data::TransactionIdentifier, - ) -> Result< - explorer::query_data::TransactionDetailResponse, - 
explorer::query_data::GetTransactionDetailError, - > { - self.as_mut().get_transaction_detail(request).await - } - - async fn get_explorer_summary( - &mut self, - ) -> Result< - explorer::query_data::ExplorerSummary, - explorer::query_data::GetExplorerSummaryError, - > { - self.as_mut().get_explorer_summary().await - } - - async fn get_search_results( - &mut self, - query: TaggedBase64, - ) -> Result< - explorer::query_data::SearchResult, - explorer::query_data::GetSearchResultsError, - > { - self.as_mut().get_search_results(query).await - } -} - -#[derive(Debug)] -pub(super) struct Notifiers -where - Types: NodeType, -{ - pub(super) block: Notifier>, - pub(super) leaf: Notifier>, - pub(super) vid_common: Notifier>, -} - -impl Default for Notifiers -where - Types: NodeType, -{ - fn default() -> Self { - Self { - block: Notifier::new(), - leaf: Notifier::new(), - vid_common: Notifier::new(), - } - } -} - -#[derive(Clone, Copy, Debug)] -pub(super) struct Heights { - pub(super) height: u64, - pub(super) pruned_height: Option, -} - -impl Heights { - pub(super) fn might_exist(self, h: u64) -> bool { - h < self.height && self.pruned_height.map_or(true, |ph| h > ph) - } -} diff --git a/src/data_source/fetching/transaction.rs b/src/data_source/fetching/transaction.rs index b7ab5df0e..9643fa1d9 100644 --- a/src/data_source/fetching/transaction.rs +++ b/src/data_source/fetching/transaction.rs @@ -14,10 +14,11 @@ use super::{AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Notifiers}; use crate::{ - availability::{ - QueryablePayload, TransactionHash, TransactionQueryData, UpdateAvailabilityData, + availability::{QueryablePayload, TransactionHash, TransactionQueryData}, + data_source::{ + storage::{AvailabilityStorage, UpdateAvailabilityStorage}, + update::VersionedDataSource, }, - data_source::{storage::AvailabilityStorage, update::VersionedDataSource}, Payload, QueryResult, }; use async_std::sync::Arc; @@ -67,7 +68,7 @@ where req: Self::Request, ) where S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage, P: AvailabilityProvider, { // We don't actively fetch transactions, because without a satisfying block payload, we have diff --git a/src/data_source/fetching/vid.rs b/src/data_source/fetching/vid.rs index 64d6a5a80..21bdaf86b 100644 --- a/src/data_source/fetching/vid.rs +++ b/src/data_source/fetching/vid.rs @@ -14,15 +14,18 @@ use super::{ header::{fetch_header_and_then, HeaderCallback}, - AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, NotifyStorage, - RangedFetchable, + AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable, + Storable, }; use crate::{ - availability::{BlockId, QueryablePayload, UpdateAvailabilityData, VidCommonQueryData}, - data_source::{storage::AvailabilityStorage, update::Transaction, VersionedDataSource}, + availability::{BlockId, QueryablePayload, VidCommonQueryData}, + data_source::{ + storage::{AvailabilityStorage, UpdateAvailabilityStorage}, + VersionedDataSource, + }, fetching::{self, request, Callback}, types::HeightIndexed, - Header, Payload, QueryResult, VidCommon, + Header, Payload, QueryResult, VidCommon, VidShare, }; use async_std::sync::Arc; use async_trait::async_trait; @@ -87,7 +90,7 @@ where req: Self::Request, ) where S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage, P: AvailabilityProvider, { 
fetch_header_and_then( @@ -125,6 +128,46 @@ where } } +impl Storable for VidCommonQueryData +where + Types: NodeType, +{ + fn name() -> &'static str { + "VID common" + } + + async fn notify(&self, notifiers: &Notifiers) { + notifiers.vid_common.notify(self).await; + } + + async fn store( + self, + storage: &mut (impl UpdateAvailabilityStorage + Send), + ) -> anyhow::Result<()> { + storage.insert_vid(self, None).await + } +} + +impl Storable for (VidCommonQueryData, Option) +where + Types: NodeType, +{ + fn name() -> &'static str { + "VID data" + } + + async fn notify(&self, notifiers: &Notifiers) { + notifiers.vid_common.notify(&self.0).await; + } + + async fn store( + self, + storage: &mut (impl UpdateAvailabilityStorage + Send), + ) -> anyhow::Result<()> { + storage.insert_vid(self.0, self.1).await + } +} + pub(super) fn fetch_vid_common_with_header( fetcher: Arc>, header: Header, @@ -132,7 +175,7 @@ pub(super) fn fetch_vid_common_with_header( Types: NodeType, Payload: QueryablePayload, S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage, P: AvailabilityProvider, { // Now that we have the header, we only need to retrieve the VID common data. @@ -151,22 +194,6 @@ pub(super) fn fetch_vid_common_with_header( ); } -async fn store_vid_common( - storage: &NotifyStorage, - common: VidCommonQueryData, -) -> anyhow::Result<()> -where - Types: NodeType, - Payload: QueryablePayload, - S: VersionedDataSource, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, -{ - let mut tx = storage.write().await?; - tx.insert_vid(common, None).await?; - tx.commit().await?; - Ok(()) -} - #[derive(Derivative)] #[derivative(Debug(bound = ""))] pub(super) struct VidCommonCallback { @@ -199,22 +226,12 @@ impl Callback for VidCommonCallback: QueryablePayload, S: VersionedDataSource + 'static, - for<'a> S::Transaction<'a>: UpdateAvailabilityData, + for<'a> S::Transaction<'a>: UpdateAvailabilityStorage, P: AvailabilityProvider, { async fn run(self, common: VidCommon) { tracing::info!("fetched VID common {:?}", self.header.payload_commitment()); let common = VidCommonQueryData::new(self.header, common); - let height = common.height(); - - // Store the data in local storage, so we can avoid fetching it in the future. - { - if let Err(err) = store_vid_common(&self.fetcher.storage, common).await { - // It is unfortunate if this fails, but we can still proceed by returning - // the block that we fetched, keeping it in memory. Simply log the error and - // move on. - tracing::warn!("failed to store fetched VID common {height}: {err}"); - } - } + self.fetcher.store_and_notify(common).await; } } diff --git a/src/data_source/fs.rs b/src/data_source/fs.rs index 9cea5618f..9d16aeb75 100644 --- a/src/data_source/fs.rs +++ b/src/data_source/fs.rs @@ -138,11 +138,13 @@ pub use super::storage::fs::Transaction; /// let mut events = hotshot.event_stream(); /// while let Some(event) = events.next().await { /// let mut state = state.write().await; -/// let mut tx = state.hotshot_qs.write().await.unwrap(); -/// tx.update(&event).await.unwrap(); -/// // Update other modules' states based on `event`. +/// state.hotshot_qs.update(&event).await; /// +/// // Update other modules' states based on `event`. +/// let mut tx = state.hotshot_qs.write().await.unwrap(); +/// // Do updates /// tx.commit().await.unwrap(); +/// /// // Commit or skip versions for other modules' storage. 
 ///     state.store.commit_version().unwrap();
 /// }
@@ -237,7 +239,7 @@ where
 mod impl_testable_data_source {
     use super::*;
     use crate::{
-        data_source::{Transaction, UpdateDataSource, VersionedDataSource},
+        data_source::UpdateDataSource,
         testing::{consensus::DataSourceLifeCycle, mocks::MockTypes},
     };
     use async_trait::async_trait;
@@ -267,9 +269,7 @@ mod impl_testable_data_source {
         }
 
         async fn handle_event(&self, event: &Event<MockTypes>) {
-            let mut tx = self.write().await.unwrap();
-            tx.update(event).await.unwrap();
-            tx.commit().await.unwrap();
+            self.update(event).await;
         }
     }
 }
diff --git a/src/data_source/sql.rs b/src/data_source/sql.rs
index 8960e15d4..44ef0739b 100644
--- a/src/data_source/sql.rs
+++ b/src/data_source/sql.rs
@@ -225,15 +225,7 @@ impl Config {
 /// create an aggregate struct containing both [`SqlDataSource`] and your additional module
 /// states, as described in the [composition guide](crate#composition). If the additional modules
 /// have data that should live in the same database as the [`SqlDataSource`] data, you can follow
-/// the steps in [custom migrations](#custom-migrations) to accomodate this. When modifying that
-/// data, you can use a [`Transaction`] to atomically synchronize updates to the other modules' data
-/// with updates to the [`SqlDataSource`]. If the additional data is completely independent of
-/// HotShot query service data and does not need to be synchronized, you can also connect to the
-/// database directly to make updates.
-///
-/// In the following example, we compose HotShot query service modules with other application-
-/// specific modules, synchronizing updates using SQL transactions via
-/// [`write`](super::VersionedDataSource::write).
+/// the steps in [custom migrations](#custom-migrations) to accommodate this.
 ///
 /// ```
 /// # use async_std::{sync::Arc, task::spawn};
@@ -275,14 +267,10 @@ impl Config {
 /// spawn(async move {
 ///     let mut events = hotshot.event_stream();
 ///     while let Some(event) = events.next().await {
-///         let mut tx = state.hotshot_qs.write().await.unwrap();
-///         UpdateDataSource::<AppTypes>::update(&mut tx, &event)
-///             .await
-///             .unwrap();
-///         // Update other modules' states based on `event`. Use `tx` to include database
-///         // updates in the same atomic transaction as `update`.
+///         state.hotshot_qs.update(&event).await;
 ///
-///         // Commit all outstanding changes to the entire state at the same time.
+///         let mut tx = state.hotshot_qs.write().await.unwrap();
+///         // Update other modules' states based on `event`, using `tx` to access the database.
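+///         // (Note: `tx` is a separate transaction; `update` has already committed the query
+///         // service's own changes internally.)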
 ///         tx.commit().await.unwrap();
 ///     }
 /// });
@@ -314,7 +302,7 @@ where
 pub mod testing {
     use super::*;
     use crate::{
-        data_source::{Transaction, UpdateDataSource, VersionedDataSource},
+        data_source::UpdateDataSource,
         testing::{consensus::DataSourceLifeCycle, mocks::MockTypes},
     };
     use async_trait::async_trait;
@@ -346,9 +334,7 @@ pub mod testing {
         }
 
         async fn handle_event(&self, event: &Event<MockTypes>) {
-            let mut tx = self.write().await.unwrap();
-            tx.update(event).await.unwrap();
-            tx.commit().await.unwrap();
+            self.update(event).await;
         }
     }
 }
@@ -371,9 +357,13 @@ mod test {
     use super::*;
     use crate::{
         availability::{
-            AvailabilityDataSource, LeafQueryData, UpdateAvailabilityData, VidCommonQueryData,
+            AvailabilityDataSource, BlockInfo, LeafQueryData, UpdateAvailabilityData,
+            VidCommonQueryData,
+        },
+        data_source::{
+            storage::{NodeStorage, UpdateAvailabilityStorage},
+            Transaction, VersionedDataSource,
         },
-        data_source::{storage::NodeStorage, Transaction, VersionedDataSource},
         fetching::provider::NoFetching,
         testing::{consensus::DataSourceLifeCycle, mocks::MockTypes, setup_test},
     };
@@ -404,13 +394,14 @@ mod test {
         )
         .await;
         let common = VidCommonQueryData::new(leaf.header().clone(), disperse.common);
-        let mut tx = ds.write().await.unwrap();
-        tx.insert_leaf(leaf).await.unwrap();
-        tx.insert_vid(common.clone(), None).await.unwrap();
-        tx.commit().await.unwrap();
+        ds.append(BlockInfo::new(leaf, None, Some(common.clone()), None))
+            .await
+            .unwrap();
 
         assert_eq!(ds.get_vid_common(0).await.await, common);
-        ds.read().await.unwrap().vid_share(0).await.unwrap_err();
+        NodeStorage::<MockTypes>::vid_share(&mut ds.read().await.unwrap(), 0)
+            .await
+            .unwrap_err();
 
         // Re-insert the common data with the share.
         let mut tx = ds.write().await.unwrap();
@@ -420,7 +411,9 @@ mod test {
         tx.commit().await.unwrap();
         assert_eq!(ds.get_vid_common(0).await.await, common);
         assert_eq!(
-            ds.read().await.unwrap().vid_share(0).await.unwrap(),
+            NodeStorage::<MockTypes>::vid_share(&mut ds.read().await.unwrap(), 0)
+                .await
+                .unwrap(),
             disperse.shares[0]
         );
     }
diff --git a/src/data_source/storage.rs b/src/data_source/storage.rs
index d7fce9d58..cb3f9cee2 100644
--- a/src/data_source/storage.rs
+++ b/src/data_source/storage.rs
@@ -77,17 +77,21 @@ use crate::{
     Header, Payload, QueryResult, Transaction, VidShare,
 };
 use async_trait::async_trait;
+use futures::future::Future;
 use hotshot_types::traits::node_implementation::NodeType;
 use jf_merkle_tree::prelude::MerkleProof;
 use std::ops::RangeBounds;
 use tagged_base64::TaggedBase64;
 
+pub mod fail_storage;
 pub mod fs;
 mod ledger_log;
 pub mod no_storage;
 pub mod pruning;
 pub mod sql;
 
+#[cfg(any(test, feature = "testing"))]
+pub use fail_storage::FailStorage;
 #[cfg(feature = "file-system-data-source")]
 pub use fs::FileSystemStorage;
 #[cfg(feature = "no-storage")]
@@ -155,6 +159,25 @@ where
     ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>>;
 }
 
+pub trait UpdateAvailabilityStorage<Types>
+where
+    Types: NodeType,
+{
+    fn insert_leaf(
+        &mut self,
+        leaf: LeafQueryData<Types>,
+    ) -> impl Send + Future<Output = anyhow::Result<()>>;
+    fn insert_block(
+        &mut self,
+        block: BlockQueryData<Types>,
+    ) -> impl Send + Future<Output = anyhow::Result<()>>;
+    fn insert_vid(
+        &mut self,
+        common: VidCommonQueryData<Types>,
+        share: Option<VidShare>,
+    ) -> impl Send + Future<Output = anyhow::Result<()>>;
+}
+
 #[async_trait]
 pub trait NodeStorage<Types: NodeType> {
     async fn block_height(&mut self) -> QueryResult<usize>;
diff --git a/src/data_source/storage/fail_storage.rs b/src/data_source/storage/fail_storage.rs
new file mode 100644
index 000000000..f1ef2331d
--- /dev/null
+++ b/src/data_source/storage/fail_storage.rs
@@ -0,0 +1,432 @@
+// Copyright (c) 2022 Espresso Systems (espressosys.com)
+// This file is part of the HotShot Query Service library.
+//
+// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
+// General Public License as published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
+// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+// You should have received a copy of the GNU General Public License along with this program. If not,
+// see <https://www.gnu.org/licenses/>.
+
+#![cfg(any(test, feature = "testing"))]
+
+use super::{
+    pruning::{PruneStorage, PrunedHeightStorage, PrunerCfg, PrunerConfig},
+    AvailabilityStorage, NodeStorage,
+};
+use crate::{
+    availability::{
+        BlockId, BlockQueryData, LeafId, LeafQueryData, PayloadQueryData, QueryablePayload,
+        TransactionHash, TransactionQueryData, VidCommonQueryData,
+    },
+    data_source::{storage::UpdateAvailabilityStorage, update, VersionedDataSource},
+    metrics::PrometheusMetrics,
+    node::{SyncStatus, TimeWindowQueryData, WindowStart},
+    status::HasMetrics,
+    Header, Payload, QueryError, QueryResult, VidShare,
+};
+use async_std::sync::{Arc, Mutex};
+use async_trait::async_trait;
+use futures::future::Future;
+use hotshot_types::traits::node_implementation::NodeType;
+use std::ops::RangeBounds;
+
+#[derive(Clone, Copy, Debug, Default)]
+enum FailureMode {
+    #[default]
+    Never,
+    Once,
+    Always,
+}
+
+impl FailureMode {
+    fn maybe_fail(&mut self) -> QueryResult<()> {
+        match self {
+            Self::Never => return Ok(()),
+            Self::Once => {
+                *self = Self::Never;
+            }
+            Self::Always => {}
+        }
+
+        Err(QueryError::Error {
+            message: "injected error".into(),
+        })
+    }
+}
+
+#[derive(Debug, Default)]
+struct Failer {
+    on_read: FailureMode,
+    on_write: FailureMode,
+    on_commit: FailureMode,
+    on_begin_writable: FailureMode,
+    on_begin_read_only: FailureMode,
+}
+
+/// Storage wrapper for error injection.
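+///
+/// A minimal usage sketch (assuming a connected [`SqlStorage`], as in the tests that use this
+/// wrapper):
+///
+/// ```ignore
+/// let storage = FailStorage::from(SqlStorage::connect(cfg).await?);
+/// storage.fail_one_commit().await; // the next commit returns an injected error
+/// // ... exercise the code under test ...
+/// storage.pass().await; // clear all injected failures
+/// ```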
+#[derive(Clone, Debug)]
+pub struct FailStorage<S> {
+    inner: S,
+    failer: Arc<Mutex<Failer>>,
+}
+
+impl<S> From<S> for FailStorage<S> {
+    fn from(inner: S) -> Self {
+        Self {
+            inner,
+            failer: Default::default(),
+        }
+    }
+}
+
+impl<S> FailStorage<S> {
+    pub async fn fail_reads(&self) {
+        self.failer.lock().await.on_read = FailureMode::Always;
+    }
+
+    pub async fn fail_writes(&self) {
+        self.failer.lock().await.on_write = FailureMode::Always;
+    }
+
+    pub async fn fail_commits(&self) {
+        self.failer.lock().await.on_commit = FailureMode::Always;
+    }
+
+    pub async fn fail_begins_writable(&self) {
+        self.failer.lock().await.on_begin_writable = FailureMode::Always;
+    }
+
+    pub async fn fail_begins_read_only(&self) {
+        self.failer.lock().await.on_begin_read_only = FailureMode::Always;
+    }
+
+    pub async fn fail(&self) {
+        let mut failer = self.failer.lock().await;
+        failer.on_read = FailureMode::Always;
+        failer.on_write = FailureMode::Always;
+        failer.on_commit = FailureMode::Always;
+        failer.on_begin_writable = FailureMode::Always;
+        failer.on_begin_read_only = FailureMode::Always;
+    }
+
+    pub async fn pass_reads(&self) {
+        self.failer.lock().await.on_read = FailureMode::Never;
+    }
+
+    pub async fn pass_writes(&self) {
+        self.failer.lock().await.on_write = FailureMode::Never;
+    }
+
+    pub async fn pass_commits(&self) {
+        self.failer.lock().await.on_commit = FailureMode::Never;
+    }
+
+    pub async fn pass_begins_writable(&self) {
+        self.failer.lock().await.on_begin_writable = FailureMode::Never;
+    }
+
+    pub async fn pass_begins_read_only(&self) {
+        self.failer.lock().await.on_begin_read_only = FailureMode::Never;
+    }
+
+    pub async fn pass(&self) {
+        let mut failer = self.failer.lock().await;
+        failer.on_read = FailureMode::Never;
+        failer.on_write = FailureMode::Never;
+        failer.on_commit = FailureMode::Never;
+        failer.on_begin_writable = FailureMode::Never;
+        failer.on_begin_read_only = FailureMode::Never;
+    }
+
+    pub async fn fail_one_read(&self) {
+        self.failer.lock().await.on_read = FailureMode::Once;
+    }
+
+    pub async fn fail_one_write(&self) {
+        self.failer.lock().await.on_write = FailureMode::Once;
+    }
+
+    pub async fn fail_one_commit(&self) {
+        self.failer.lock().await.on_commit = FailureMode::Once;
+    }
+
+    pub async fn fail_one_begin_writable(&self) {
+        self.failer.lock().await.on_begin_writable = FailureMode::Once;
+    }
+
+    pub async fn fail_one_begin_read_only(&self) {
+        self.failer.lock().await.on_begin_read_only = FailureMode::Once;
+    }
+}
+
+impl<S> VersionedDataSource for FailStorage<S>
+where
+    S: VersionedDataSource,
+{
+    type Transaction<'a> = Transaction<S::Transaction<'a>>
+    where
+        Self: 'a;
+    type ReadOnly<'a> = Transaction<S::ReadOnly<'a>>
+    where
+        Self: 'a;
+
+    async fn write(&self) -> anyhow::Result<<Self as VersionedDataSource>::Transaction<'_>> {
+        self.failer.lock().await.on_begin_writable.maybe_fail()?;
+        Ok(Transaction {
+            inner: self.inner.write().await?,
+            failer: self.failer.clone(),
+        })
+    }
+
+    async fn read(&self) -> anyhow::Result<<Self as VersionedDataSource>::ReadOnly<'_>> {
+        self.failer.lock().await.on_begin_read_only.maybe_fail()?;
+        Ok(Transaction {
+            inner: self.inner.read().await?,
+            failer: self.failer.clone(),
+        })
+    }
+}
+
+impl<S> PrunerConfig for FailStorage<S>
+where
+    S: PrunerConfig,
+{
+    fn set_pruning_config(&mut self, cfg: PrunerCfg) {
+        self.inner.set_pruning_config(cfg);
+    }
+
+    fn get_pruning_config(&self) -> Option<PrunerCfg> {
+        self.inner.get_pruning_config()
+    }
+}
+
+#[async_trait]
+impl<S> PruneStorage for FailStorage<S>
+where
+    S: PruneStorage + Sync,
+{
+    type Pruner = S::Pruner;
+
+    async fn get_disk_usage(&self) -> anyhow::Result<u64> {
+        self.inner.get_disk_usage().await
+    }
+
+    async fn prune(&self, pruner: &mut Self::Pruner) -> anyhow::Result<Option<u64>> {
+        self.inner.prune(pruner).await
+    }
+}
+
+impl<S> HasMetrics for FailStorage<S>
+where
+    S: HasMetrics,
+{
+    fn metrics(&self) -> &PrometheusMetrics {
+        self.inner.metrics()
+    }
+}
+
+#[derive(Debug)]
+pub struct Transaction<T> {
+    inner: T,
+    failer: Arc<Mutex<Failer>>,
+}
+
+impl<T> Transaction<T> {
+    async fn maybe_fail_read(&self) -> QueryResult<()> {
+        self.failer.lock().await.on_read.maybe_fail()
+    }
+
+    async fn maybe_fail_write(&self) -> QueryResult<()> {
+        self.failer.lock().await.on_write.maybe_fail()
+    }
+
+    async fn maybe_fail_commit(&self) -> QueryResult<()> {
+        self.failer.lock().await.on_commit.maybe_fail()
+    }
+}
+
+impl<T> update::Transaction for Transaction<T>
+where
+    T: update::Transaction,
+{
+    async fn commit(self) -> anyhow::Result<()> {
+        self.maybe_fail_commit().await?;
+        self.inner.commit().await
+    }
+
+    fn revert(self) -> impl Future + Send {
+        self.inner.revert()
+    }
+}
+
+#[async_trait]
+impl<Types, T> AvailabilityStorage<Types> for Transaction<T>
+where
+    Types: NodeType,
+    Payload<Types>: QueryablePayload<Types>,
+    T: AvailabilityStorage<Types>,
+{
+    async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
+        self.maybe_fail_read().await?;
+        self.inner.get_leaf(id).await
+    }
+
+    async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
+        self.maybe_fail_read().await?;
+        self.inner.get_block(id).await
+    }
+
+    async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
+        self.maybe_fail_read().await?;
+        self.inner.get_header(id).await
+    }
+
+    async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
+        self.maybe_fail_read().await?;
+        self.inner.get_payload(id).await
+    }
+
+    async fn get_vid_common(
+        &mut self,
+        id: BlockId<Types>,
+    ) -> QueryResult<VidCommonQueryData<Types>> {
+        self.maybe_fail_read().await?;
+        self.inner.get_vid_common(id).await
+    }
+
+    async fn get_leaf_range<R>(
+        &mut self,
+        range: R,
+    ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
+    where
+        R: RangeBounds<usize> + Send + 'static,
+    {
+        self.maybe_fail_read().await?;
+        self.inner.get_leaf_range(range).await
+    }
+
+    async fn get_block_range<R>(
+        &mut self,
+        range: R,
+    ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
+    where
+        R: RangeBounds<usize> + Send + 'static,
+    {
+        self.maybe_fail_read().await?;
+        self.inner.get_block_range(range).await
+    }
+
+    async fn get_payload_range<R>(
+        &mut self,
+        range: R,
+    ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
+    where
+        R: RangeBounds<usize> + Send + 'static,
+    {
+        self.maybe_fail_read().await?;
+        self.inner.get_payload_range(range).await
+    }
+
+    async fn get_vid_common_range<R>(
+        &mut self,
+        range: R,
+    ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
+    where
+        R: RangeBounds<usize> + Send + 'static,
+    {
+        self.maybe_fail_read().await?;
+        self.inner.get_vid_common_range(range).await
+    }
+
+    async fn get_transaction(
+        &mut self,
+        hash: TransactionHash<Types>,
+    ) -> QueryResult<TransactionQueryData<Types>> {
+        self.maybe_fail_read().await?;
+        self.inner.get_transaction(hash).await
+    }
+}
+
+impl<Types, T> UpdateAvailabilityStorage<Types> for Transaction<T>
+where
+    Types: NodeType,
+    Payload<Types>: QueryablePayload<Types>,
+    T: UpdateAvailabilityStorage<Types> + Send + Sync,
+{
+    async fn insert_leaf(&mut self, leaf: LeafQueryData<Types>) -> anyhow::Result<()> {
+        self.maybe_fail_write().await?;
+        self.inner.insert_leaf(leaf).await
+    }
+
+    async fn insert_block(&mut self, block: BlockQueryData<Types>) -> anyhow::Result<()> {
+        self.maybe_fail_write().await?;
+        self.inner.insert_block(block).await
+    }
+
+    async fn insert_vid(
+        &mut self,
+        common: VidCommonQueryData<Types>,
+        share: Option<VidShare>,
+    ) -> anyhow::Result<()> {
+        self.maybe_fail_write().await?;
+        self.inner.insert_vid(common, share).await
+    }
+}
+
+#[async_trait]
+impl<T> PrunedHeightStorage for Transaction<T>
+where
+    T: PrunedHeightStorage + Send + Sync,
+{
+    async fn load_pruned_height(&mut self) -> anyhow::Result<Option<u64>> {
+        self.maybe_fail_read().await?;
+        self.inner.load_pruned_height().await
+    }
+}
+
+#[async_trait]
+impl<Types, T> NodeStorage<Types> for Transaction<T>
+where
+    Types: NodeType,
+    T: NodeStorage<Types> + Send + Sync,
+{
+    async fn block_height(&mut self) -> QueryResult<usize> {
+        self.maybe_fail_read().await?;
+        self.inner.block_height().await
+    }
+
+    async fn count_transactions(&mut self) -> QueryResult<usize> {
+        self.maybe_fail_read().await?;
+        self.inner.count_transactions().await
+    }
+
+    async fn payload_size(&mut self) -> QueryResult<usize> {
+        self.maybe_fail_read().await?;
+        self.inner.payload_size().await
+    }
+
+    async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
+    where
+        ID: Into<BlockId<Types>> + Send + Sync,
+    {
+        self.maybe_fail_read().await?;
+        self.inner.vid_share(id).await
+    }
+
+    async fn sync_status(&mut self) -> QueryResult<SyncStatus> {
+        self.maybe_fail_read().await?;
+        self.inner.sync_status().await
+    }
+
+    async fn get_header_window(
+        &mut self,
+        start: impl Into<WindowStart<Types>> + Send + Sync,
+        end: u64,
+    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
+        self.maybe_fail_read().await?;
+        self.inner.get_header_window(start, end).await
+    }
+}
diff --git a/src/data_source/storage/fs.rs b/src/data_source/storage/fs.rs
index e858afb9a..f1468fa3c 100644
--- a/src/data_source/storage/fs.rs
+++ b/src/data_source/storage/fs.rs
@@ -15,12 +15,12 @@
 use super::{
     ledger_log::{Iter, LedgerLog},
     pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
-    AvailabilityStorage, NodeStorage,
+    AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
 };
 use crate::{
     availability::{
-        data_source::{BlockId, LeafId, UpdateAvailabilityData},
+        data_source::{BlockId, LeafId},
         query_data::{
             BlockHash, BlockQueryData, LeafHash, LeafQueryData, PayloadQueryData, QueryableHeader,
             QueryablePayload, TransactionHash, TransactionQueryData, VidCommonQueryData,
@@ -529,8 +529,7 @@ where
     }
 }
 
-#[async_trait]
-impl<'a, Types: NodeType> UpdateAvailabilityData<Types>
+impl<'a, Types: NodeType> UpdateAvailabilityStorage<Types>
     for Transaction<RwLockWriteGuard<'a, FileSystemStorageInner<Types>>>
 where
     Payload<Types>: QueryablePayload<Types>,
diff --git a/src/data_source/storage/no_storage.rs b/src/data_source/storage/no_storage.rs
index e8a881cbc..9a01df4b0 100644
--- a/src/data_source/storage/no_storage.rs
+++ b/src/data_source/storage/no_storage.rs
@@ -19,9 +19,9 @@ use super::{
 use crate::{
     availability::{
         BlockId, BlockQueryData, LeafId, LeafQueryData, PayloadQueryData, QueryablePayload,
-        TransactionHash, TransactionQueryData, UpdateAvailabilityData, VidCommonQueryData,
+        TransactionHash, TransactionQueryData, VidCommonQueryData,
     },
-    data_source::{update, VersionedDataSource},
+    data_source::{storage::UpdateAvailabilityStorage, update, VersionedDataSource},
     metrics::PrometheusMetrics,
     node::{SyncStatus, TimeWindowQueryData, WindowStart},
     status::HasMetrics,
@@ -178,8 +178,7 @@ where
     }
 }
 
-#[async_trait]
-impl<'a, Types: NodeType> UpdateAvailabilityData<Types> for Transaction<'a>
+impl<'a, Types: NodeType> UpdateAvailabilityStorage<Types> for Transaction<'a>
 where
     Payload<Types>: QueryablePayload<Types>,
{
@@ -245,11 +244,11 @@ where
 pub mod testing {
     use super::*;
     use crate::{
-        availability::{define_api, AvailabilityDataSource, Fetch},
+        availability::{
+            define_api, AvailabilityDataSource, BlockInfo, Fetch, UpdateAvailabilityData,
+        },
         data_source::{
-            fetching,
             storage::sql::{testing::TmpDb, SqlStorage},
-            update::Transaction as _,
             FetchingDataSource, SqlDataSource, UpdateDataSource,
         },
         fetching::provider::{NoFetching, QueryServiceProvider},
@@ -384,15 +383,13 @@ pub mod testing {
        }
 
        async fn handle_event(&self, event: &Event<MockTypes>) {
-            let mut tx = self.write().await.unwrap();
-            tx.update(event).await.unwrap();
-            tx.commit().await.unwrap();
+            self.update(event).await;
        }
    }
 
    pub enum Transaction<'a, T> {
-        Sql(fetching::Transaction<'a, MockTypes, T>),
-        NoStorage(fetching::Transaction<'a, MockTypes, super::Transaction<'a>>),
+        Sql(T),
+        NoStorage(super::Transaction<'a>),
    }
 
    // Now a lot of boilerplate to implement all the traits for [`DataSource`], by dispatching each
@@ -424,6 +421,15 @@ pub mod testing {
        }
    }
 
+    impl UpdateAvailabilityData<MockTypes> for DataSource {
+        async fn append(&self, info: BlockInfo<MockTypes>) -> anyhow::Result<()> {
+            match self {
+                Self::Sql(ds) => ds.append(info).await,
+                Self::NoStorage(ds) => ds.append(info).await,
+            }
+        }
+    }
+
    impl<'a, T> update::Transaction for Transaction<'a, T>
    where
        T: update::Transaction,
@@ -449,10 +455,9 @@ pub mod testing {
        }
    }
 
-    #[async_trait]
-    impl<'a, T> UpdateAvailabilityData<MockTypes> for Transaction<'a, T>
+    impl<'a, T> UpdateAvailabilityStorage<MockTypes> for Transaction<'a, T>
    where
-        T: UpdateAvailabilityData<MockTypes> + Send + Sync,
+        T: UpdateAvailabilityStorage<MockTypes> + Send + Sync,
    {
        async fn insert_leaf(&mut self, leaf: LeafQueryData<MockTypes>) -> anyhow::Result<()> {
            match self {
@@ -596,21 +601,23 @@ pub mod testing {
        async fn block_height(&mut self) -> QueryResult<usize> {
            match self {
                Transaction::Sql(tx) => tx.block_height().await,
-                Transaction::NoStorage(tx) => tx.block_height().await,
+                Transaction::NoStorage(tx) => NodeStorage::<MockTypes>::block_height(tx).await,
            }
        }
 
        async fn count_transactions(&mut self) -> QueryResult<usize> {
            match self {
                Transaction::Sql(tx) => tx.count_transactions().await,
-                Transaction::NoStorage(tx) => tx.count_transactions().await,
+                Transaction::NoStorage(tx) => {
+                    NodeStorage::<MockTypes>::count_transactions(tx).await
+                }
            }
        }
 
        async fn payload_size(&mut self) -> QueryResult<usize> {
            match self {
                Transaction::Sql(tx) => tx.payload_size().await,
-                Transaction::NoStorage(tx) => tx.payload_size().await,
+                Transaction::NoStorage(tx) => NodeStorage::<MockTypes>::payload_size(tx).await,
            }
        }
 
@@ -620,14 +627,14 @@ pub mod testing {
        {
            match self {
                Transaction::Sql(tx) => tx.vid_share(id).await,
-                Transaction::NoStorage(tx) => tx.vid_share(id).await,
+                Transaction::NoStorage(tx) => NodeStorage::<MockTypes>::vid_share(tx, id).await,
            }
        }
 
        async fn sync_status(&mut self) -> QueryResult<SyncStatus> {
            match self {
                Transaction::Sql(tx) => tx.sync_status().await,
-                Transaction::NoStorage(tx) => tx.sync_status().await,
+                Transaction::NoStorage(tx) => NodeStorage::<MockTypes>::sync_status(tx).await,
            }
        }
diff --git a/src/data_source/storage/sql.rs b/src/data_source/storage/sql.rs
index 75920a442..0e4faddd9 100644
--- a/src/data_source/storage/sql.rs
+++ b/src/data_source/storage/sql.rs
@@ -921,8 +921,8 @@ mod test {
     use super::{testing::TmpDb, *};
     use crate::{
-        availability::{LeafQueryData, UpdateAvailabilityData},
-        data_source::storage::pruning::PrunedHeightStorage,
+        availability::LeafQueryData,
+        data_source::storage::{pruning::PrunedHeightStorage, UpdateAvailabilityStorage},
         testing::{mocks::MockTypes, setup_test},
     };
diff --git a/src/data_source/storage/sql/transaction.rs b/src/data_source/storage/sql/transaction.rs
index 472d80327..2c4838f1b 100644
--- a/src/data_source/storage/sql/transaction.rs
+++ b/src/data_source/storage/sql/transaction.rs
@@ -27,10 +27,12 @@ use super::{
 };
 use crate::{
     availability::{
-        BlockQueryData, LeafQueryData, QueryableHeader, QueryablePayload, UpdateAvailabilityData,
-        VidCommonQueryData,
+        BlockQueryData, LeafQueryData, QueryableHeader, QueryablePayload, VidCommonQueryData,
+    },
+    data_source::{
+        storage::{pruning::PrunedHeightStorage, UpdateAvailabilityStorage},
+        update,
+    },
-    data_source::{storage::pruning::PrunedHeightStorage, update},
     merklized_state::{MerklizedState, UpdateStateData},
     types::HeightIndexed,
     Header, Payload, QueryError, VidShare,
@@ -461,8 +463,7 @@ impl Transaction {
     }
 }
 
-#[async_trait]
-impl<Types> UpdateAvailabilityData<Types> for Transaction
+impl<Types> UpdateAvailabilityStorage<Types> for Transaction
 where
     Types: NodeType,
     Payload<Types>: QueryablePayload<Types>,
diff --git a/src/data_source/update.rs b/src/data_source/update.rs
index cbd86c908..7f6af335a 100644
--- a/src/data_source/update.rs
+++ b/src/data_source/update.rs
@@ -13,11 +13,12 @@
 //! A generic algorithm for updating a HotShot Query Service data source with new data.
 use crate::{
     availability::{
-        BlockQueryData, LeafQueryData, QueryablePayload, UpdateAvailabilityData, VidCommonQueryData,
+        BlockInfo, BlockQueryData, LeafQueryData, QueryablePayload, UpdateAvailabilityData,
+        VidCommonQueryData,
     },
-    Leaf, Payload,
+    Leaf, Payload, VidShare,
 };
-use anyhow::Context;
+use anyhow::{ensure, Context};
 use async_trait::async_trait;
 use futures::future::Future;
 use hotshot::types::{Event, EventType};
@@ -55,17 +56,17 @@ pub trait UpdateDataSource<Types: NodeType>: UpdateAvailabilityData<Types> {
     ///
     /// If you want to update the data source with an untrusted event, for example one received from
     /// a peer over the network, you must authenticate it first.
-    async fn update(&mut self, event: &Event<Types>) -> anyhow::Result<()>;
+    async fn update(&self, event: &Event<Types>);
 }
 
 #[async_trait]
 impl<Types: NodeType, T> UpdateDataSource<Types> for T
 where
-    T: UpdateAvailabilityData<Types> + Send,
+    T: UpdateAvailabilityData<Types> + Send + Sync,
     Payload<Types>: QueryablePayload<Types>,
     <Types as NodeType>::InstanceState: Default,
 {
-    async fn update(&mut self, event: &Event<Types>) -> anyhow::Result<()> {
+    async fn update(&self, event: &Event<Types>) {
         if let EventType::Decide { leaf_chain, qc, .. } = &event.event {
             // `qc` justifies the first (most recent) leaf...
             let qcs = once((**qc).clone())
@@ -84,78 +85,86 @@ where
                 },
             ) in qcs.zip(leaf_chain.iter().rev())
             {
-                let leaf_data =
-                    LeafQueryData::new(leaf.clone(), qc.clone()).context("inconsistent leaf")?;
-                self.insert_leaf(leaf_data.clone()).await?;
+                let height = leaf.block_header().block_number();
+                let leaf_data = match LeafQueryData::new(leaf.clone(), qc.clone()) {
+                    Ok(leaf) => leaf,
+                    Err(err) => {
+                        tracing::error!(
+                            height,
+                            ?leaf,
+                            ?qc,
+                            "inconsistent leaf; cannot append leaf information: {err:#}"
+                        );
+                        continue;
+                    }
+                };
 
-                if let Some(vid_share) = vid_share {
-                    self.insert_vid(
-                        VidCommonQueryData::new(
+                let block_data = leaf
+                    .block_payload()
+                    .map(|payload| BlockQueryData::new(leaf.block_header().clone(), payload));
+                if block_data.is_none() {
+                    tracing::info!(height, "block not available at decide");
+                }
+
+                let (vid_common, vid_share) = if let Some(vid_share) = vid_share {
+                    (
+                        Some(VidCommonQueryData::new(
                             leaf.block_header().clone(),
                             vid_share.common.clone(),
-                        ),
+                        )),
                         Some(vid_share.share.clone()),
                     )
-                    .await?;
                 } else if leaf.view_number().u64() == 0 {
                     // HotShot does not run VID in consensus for the genesis block. In this case,
                     // the block payload is guaranteed to always be empty, so VID isn't really
                     // necessary. But for consistency, we will still store the VID dispersal data,
                     // computing it ourselves based on the well-known genesis VID commitment.
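+                    // (`genesis_vid` below recomputes the dispersal from the empty payload and
+                    // checks it against the header's payload commitment.)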
-                    store_genesis_vid(self, leaf).await;
+                    match genesis_vid(leaf) {
+                        Ok((common, share)) => (Some(common), Some(share)),
+                        Err(err) => {
+                            tracing::warn!("failed to compute genesis VID: {err:#}");
+                            (None, None)
+                        }
+                    }
                 } else {
-                    tracing::error!(
-                        "VID info for block {} not available at decide",
-                        leaf.block_header().block_number()
-                    );
+                    (None, None)
+                };
+                if vid_common.is_none() {
+                    tracing::info!(height, "VID not available at decide");
                 }
 
-                if let Some(block) = leaf.block_payload() {
-                    self.insert_block(BlockQueryData::new(leaf.block_header().clone(), block))
-                        .await?;
-                } else {
-                    tracing::error!(
-                        "block {} not available at decide",
-                        leaf.block_header().block_number()
-                    );
+                if let Err(err) = self
+                    .append(BlockInfo::new(leaf_data, block_data, vid_common, vid_share))
+                    .await
+                {
+                    tracing::warn!(height, "failed to append leaf information: {err:#}");
                 }
             }
         }
-        Ok(())
     }
 }
 
-async fn store_genesis_vid<Types: NodeType>(
-    storage: &mut impl UpdateAvailabilityData<Types>,
+fn genesis_vid<Types: NodeType>(
     leaf: &Leaf<Types>,
-) where
+) -> anyhow::Result<(VidCommonQueryData<Types>, VidShare)>
+where
     <Types as NodeType>::InstanceState: Default,
 {
     let payload = Payload::<Types>::empty().0;
     let bytes = payload.encode();
-    match vid_scheme(GENESIS_VID_NUM_STORAGE_NODES).disperse(bytes) {
-        Ok(disperse) if disperse.commit != leaf.block_header().payload_commitment() => {
-            tracing::error!(
-                computed = %disperse.commit,
-                header = %leaf.block_header().payload_commitment(),
-                "computed VID commit for genesis block does not match header",
-            );
-        }
-        Ok(mut disperse) => {
-            if let Err(err) = storage
-                .insert_vid(
-                    VidCommonQueryData::new(leaf.block_header().clone(), disperse.common),
-                    Some(disperse.shares.remove(0)),
-                )
-                .await
-            {
-                tracing::error!(%err, "unable to store genesis VID");
-            }
-        }
-        Err(err) => {
-            tracing::error!(%err, "unable to compute VID dispersal for genesis block");
-        }
-    }
+    let mut disperse = vid_scheme(GENESIS_VID_NUM_STORAGE_NODES)
+        .disperse(bytes)
+        .context("unable to compute VID dispersal for genesis block")?;
+    ensure!(
+        disperse.commit == leaf.block_header().payload_commitment(),
+        "computed VID commit {} for genesis block does not match header commit {}",
+        disperse.commit,
+        leaf.block_header().payload_commitment()
+    );
+    Ok((
+        VidCommonQueryData::new(leaf.block_header().clone(), disperse.common),
+        disperse.shares.remove(0),
+    ))
 }
 
 /// A data source with an atomic transaction-based synchronization interface.
diff --git a/src/fetching.rs b/src/fetching.rs
index 97fd98125..d96286221 100644
--- a/src/fetching.rs
+++ b/src/fetching.rs
@@ -26,9 +26,9 @@ use async_std::{
     sync::{Arc, Mutex},
     task::{sleep, spawn},
 };
+use backoff::{backoff::Backoff, ExponentialBackoff};
 use derivative::Derivative;
 use std::{
-    cmp::min,
     collections::{hash_map::Entry, BTreeSet, HashMap},
     fmt::Debug,
     time::Duration,
@@ -40,25 +40,6 @@ pub mod request;
 pub use provider::Provider;
 pub use request::Request;
 
-// The fastest we will retry failed requests.
-const MIN_RETRY_DELAY: Duration = Duration::from_secs(2);
-// Factor by which to increase the retry delay each failed request.
-//
-// Our backoff strategy is to start with a relatively quick retry delay, but back off very quickly
-// until reaching a maximum delay. This lets us succeed quickly when there is a transient failure in
-// the provider, while limiting spam/failed requests when the provider is down for a long time.
-//
-// Backoff also lets us set a longer maximum delay without affecting optimistic performance, further
-// reducing spam.
-const BACKOFF_FACTOR: u32 = 4;
-
-// The longest we will wait to retry failed requests.
-//
-// Since many of the issues we might encounter when fetching from a peer are of the kind that won't
-// recover immediately, and since we may have many parallel requests for resources and don't want to
-// spam our peers, and since backoff allows us to first try a few times with a faster delay, we can
-// safely wait a long while before retrying failed requests.
-const DEFAULT_RETRY_DELAY: Duration = Duration::from_secs(5 * 60);
 const DEFAULT_RATE_LIMIT: usize = 32;
 
 /// A callback to process the result of a request.
@@ -84,7 +65,7 @@ pub trait LocalCallback<T>: Debug + Ord {
 pub struct Fetcher<T, C> {
     #[derivative(Debug = "ignore")]
     in_progress: Arc<Mutex<HashMap<T, BTreeSet<C>>>>,
-    retry_delay: Duration,
+    backoff: ExponentialBackoff,
     permit: Arc<Semaphore>,
 }
 
@@ -92,15 +73,15 @@ impl<T, C> Default for Fetcher<T, C> {
     fn default() -> Self {
         Self {
             in_progress: Default::default(),
-            retry_delay: DEFAULT_RETRY_DELAY,
+            backoff: Default::default(),
             permit: Arc::new(Semaphore::new(DEFAULT_RATE_LIMIT)),
         }
     }
 }
 
 impl<T, C> Fetcher<T, C> {
-    pub fn with_retry_delay(mut self, retry_delay: Duration) -> Self {
-        self.retry_delay = retry_delay;
+    pub fn with_backoff(mut self, backoff: ExponentialBackoff) -> Self {
+        self.backoff = backoff;
         self
     }
 
@@ -140,7 +121,7 @@ impl<T, C> Fetcher<T, C> {
     {
         let in_progress = self.in_progress.clone();
         let permit = self.permit.clone();
-        let max_retry_delay = self.retry_delay;
+        let mut backoff = self.backoff.clone();
 
         spawn(async move {
             tracing::info!("spawned active fetch for {req:?}");
@@ -166,7 +147,8 @@ impl<T, C> Fetcher<T, C> {
             }
 
             // Now we are responsible for fetching the object, reach out to the provider.
-            let mut delay = min(MIN_RETRY_DELAY, max_retry_delay);
+            backoff.reset();
+            let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
             let res = loop {
                 // Acquire a permit from the semaphore to rate limit the number of concurrent fetch requests
                 let permit = permit.acquire().await;
@@ -189,10 +171,9 @@ impl<T, C> Fetcher<T, C> {
                 drop(permit);
                 sleep(delay).await;
 
-                // Try a few times with a short delay, on the off chance that the problem resolves
-                // quickly. Back off until we eventually reach the maximum delay, which should be
-                // pretty long.
-                delay = min(delay * BACKOFF_FACTOR, max_retry_delay);
+                if let Some(next_delay) = backoff.next_backoff() {
+                    delay = next_delay;
+                }
             };
 
             // Done fetching, remove our lock on the object and execute all callbacks.
diff --git a/src/fetching/provider/any.rs b/src/fetching/provider/any.rs
index 3d0ccf580..209d542e0 100644
--- a/src/fetching/provider/any.rs
+++ b/src/fetching/provider/any.rs
@@ -204,7 +204,7 @@ mod test {
     use super::*;
     use crate::{
         availability::{define_api, AvailabilityDataSource, UpdateAvailabilityData},
-        data_source::{storage::sql::testing::TmpDb, Transaction, VersionedDataSource},
+        data_source::storage::sql::testing::TmpDb,
         fetching::provider::{NoFetching, QueryServiceProvider},
         task::BackgroundTask,
         testing::{
@@ -265,11 +265,10 @@ mod test {
 
     // Give the node a leaf after the range of interest so it learns about the correct block
     // height.
-    let mut tx = data_source.write().await.unwrap();
-    tx.insert_leaf(leaves.last().cloned().unwrap())
+    data_source
+        .append(leaves.last().cloned().unwrap().into())
         .await
         .unwrap();
-    tx.commit().await.unwrap();
 
     tracing::info!("requesting leaf from multiple providers");
     let leaf = data_source
diff --git a/src/fetching/provider/query_service.rs b/src/fetching/provider/query_service.rs
index 15525228d..2e57a2cc1 100644
--- a/src/fetching/provider/query_service.rs
+++ b/src/fetching/provider/query_service.rs
@@ -165,15 +165,17 @@ mod test {
     use crate::{
         api::load_api,
         availability::{
-            define_api, AvailabilityDataSource, Fetch, TransactionQueryData, UpdateAvailabilityData,
+            define_api, AvailabilityDataSource, BlockId, BlockInfo, Fetch, TransactionQueryData,
+            UpdateAvailabilityData,
         },
         data_source::{
             sql::{self, SqlDataSource},
             storage::{
                 pruning::{PrunedHeightStorage, PrunerCfg},
                 sql::testing::TmpDb,
+                AvailabilityStorage, FailStorage, SqlStorage, UpdateAvailabilityStorage,
             },
-            AvailabilityProvider, Transaction, VersionedDataSource,
+            AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
         },
         fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
         node::{data_source::NodeDataSource, SyncStatus},
@@ -338,11 +340,10 @@ mod test {
         // Now we will actually fetch the missing data. First, since our node is not really
         // connected to consensus, we need to give it a leaf after the range of interest so it
         // learns about the correct block height.
-        let mut tx = data_source.write().await.unwrap();
-        tx.insert_leaf(leaves.last().cloned().unwrap())
+        data_source
+            .append(leaves.last().cloned().unwrap().into())
             .await
             .unwrap();
-        tx.commit().await.unwrap();
 
         // Block requests to the provider so that we can verify that without the provider, the node
         // does _not_ get the data.
@@ -487,9 +488,7 @@ mod test {
         let test_leaf = &leaves[0];
 
         // Tell the node about a leaf after the one of interest so it learns about the block height.
-        let mut tx = data_source.write().await.unwrap();
-        tx.insert_leaf(leaves[1].clone()).await.unwrap();
-        tx.commit().await.unwrap();
+        data_source.append(leaves[1].clone().into()).await.unwrap();
 
         // Fetch a leaf and the corresponding block at the same time. This will result in two tasks
         // trying to fetch the same leaf, but one should win and notify the other, which ultimately
@@ -547,11 +546,10 @@ mod test {
 
         // Tell the node about a leaf after the range of interest so it learns about the block
         // height.
-        let mut tx = data_source.write().await.unwrap();
-        tx.insert_leaf(leaves.last().cloned().unwrap())
+        data_source
+            .append(leaves.last().cloned().unwrap().into())
             .await
             .unwrap();
-        tx.commit().await.unwrap();
 
         // All the blocks here are empty, so they have the same payload:
         assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
@@ -614,11 +612,10 @@ mod test {
 
         // Tell the node about a leaf after the range of interest so it learns about the block
         // height.
-        let mut tx = data_source.write().await.unwrap();
-        tx.insert_leaf(finalized_leaves.last().cloned().unwrap())
+        data_source
+            .append(finalized_leaves.last().cloned().unwrap().into())
             .await
             .unwrap();
-        tx.commit().await.unwrap();
 
         // Check the subscriptions.
         let blocks = blocks.take(5).collect::<Vec<_>>().await;
@@ -735,12 +732,10 @@ mod test {
             let leaf = leaves.next().await.unwrap();
             let block = blocks.next().await.unwrap();
-            {
-                let mut tx = data_source.write().await.unwrap();
-                tx.insert_leaf(leaf).await.unwrap();
-                tx.insert_block(block.clone()).await.unwrap();
-                tx.commit().await.unwrap();
-            }
+            data_source
+                .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
+                .await
+                .unwrap();
 
             if block.transaction_by_hash(tx.commit()).is_some() {
                 break block;
@@ -789,7 +784,7 @@ mod test {
         ));
         let data_source = builder(&db, &provider)
             .await
-            .with_retry_delay(Duration::from_secs(1))
+            .with_max_retry_interval(Duration::from_secs(1))
             .build()
             .await
             .unwrap();
@@ -805,11 +800,10 @@ mod test {
         // Give the node a leaf after the range of interest so it learns about the correct block
         // height.
-        let mut tx = data_source.write().await.unwrap();
-        tx.insert_leaf(leaves.last().cloned().unwrap())
+        data_source
+            .append(leaves.last().cloned().unwrap().into())
             .await
             .unwrap();
-        tx.commit().await.unwrap();
 
         // Cause requests to fail temporarily, so we can test retries.
         provider.fail();
@@ -960,7 +954,6 @@ mod test {
             .read()
             .await
             .unwrap()
-            .as_mut()
             .load_pruned_height()
             .await
             .unwrap();
@@ -970,9 +963,7 @@ mod test {
         // Send the last leaf to the disconnected data source so it learns about the height and
         // fetches the missing data.
         let last_leaf = leaves.last().unwrap();
-        let mut tx = data_source.write().await.unwrap();
-        tx.insert_leaf(last_leaf.clone()).await.unwrap();
-        tx.commit().await.unwrap();
+        data_source.append(last_leaf.clone().into()).await.unwrap();
 
         // Trigger a fetch of each leaf so the database gets populated.
         for i in 1..=last_leaf.height() {
@@ -989,7 +980,6 @@ mod test {
             .read()
            .await
            .unwrap()
-            .as_mut()
            .load_pruned_height()
            .await
            .unwrap();
@@ -1022,7 +1012,6 @@ mod test {
            .read()
            .await
            .unwrap()
-            .as_mut()
            .load_pruned_height()
            .await
            .unwrap();
@@ -1031,9 +1020,7 @@ mod test {
        // The node has pruned all of its data including the latest block, so it's forgotten the
        // block height. We need to give it another leaf with some height so it will be willing to
        // fetch.
-        let mut tx = data_source.write().await.unwrap();
-        tx.insert_leaf(last_leaf.clone()).await.unwrap();
-        tx.commit().await.unwrap();
+        data_source.append(last_leaf.clone().into()).await.unwrap();
 
        // Wait for the data to be restored. It should be restored by the next major scan.
        loop {
@@ -1065,4 +1052,235 @@ mod test {
            "{sync_status:?}"
        );
    }
+
+    #[derive(Clone, Copy, Debug)]
+    enum FailureType {
+        Begin,
+        Write,
+        Commit,
+    }
+
+    async fn test_fetch_storage_failure_helper(failure: FailureType) {
+        setup_test();
+
+        // Create the consensus network.
+        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
+
+        // Start a web server that the non-consensus node can use to fetch blocks.
+        let port = pick_unused_port().unwrap();
+        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
+        app.register_module(
+            "availability",
+            define_api(&Default::default(), MockBase::instance()).unwrap(),
+        )
+        .unwrap();
+        network.spawn(
+            "server",
+            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
+        );
+
+        // Start a data source which is not receiving events from consensus, only from a peer.
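+        // (The short retry interval and timeout configured below keep the injected-failure
+        // retries fast in this test.)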
+        let provider = Provider::new(QueryServiceProvider::new(
+            format!("http://localhost:{port}").parse().unwrap(),
+            MockBase::instance(),
+        ));
+        let db = TmpDb::init().await;
+        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
+        let data_source = FetchingDataSource::builder(storage, provider)
+            .disable_proactive_fetching()
+            .with_max_retry_interval(Duration::from_millis(100))
+            .with_retry_timeout(Duration::from_secs(1))
+            .build()
+            .await
+            .unwrap();
+
+        // Start consensus.
+        network.start().await;
+
+        // Wait until a couple of blocks are produced.
+        let leaves = network.data_source().subscribe_leaves(1).await;
+        let leaves = leaves.take(2).collect::<Vec<_>>().await;
+
+        // Send the last leaf to the disconnected data source so it learns about the height.
+        let last_leaf = leaves.last().unwrap();
+        let mut tx = data_source.write().await.unwrap();
+        tx.insert_leaf(last_leaf.clone()).await.unwrap();
+        tx.commit().await.unwrap();
+
+        // Trigger a fetch of the first leaf; it should resolve even if we fail to store the leaf.
+        tracing::info!("fetch with write failure");
+        match failure {
+            FailureType::Begin => data_source.as_ref().fail_begins_writable().await,
+            FailureType::Write => data_source.as_ref().fail_writes().await,
+            FailureType::Commit => data_source.as_ref().fail_commits().await,
+        }
+        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
+        data_source.as_ref().pass().await;
+
+        // We can get the same leaf again, this will again trigger an active fetch since storage
+        // failed the first time.
+        tracing::info!("fetch with write success");
+        let fetch = data_source.get_leaf(1).await;
+        assert!(fetch.is_pending());
+        assert_eq!(leaves[0], fetch.await);
+
+        // Finally, we should have the leaf locally and not need to fetch it.
+        tracing::info!("retrieve from storage");
+        let fetch = data_source.get_leaf(1).await;
+        assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
+    }
+
+    #[async_std::test]
+    async fn test_fetch_storage_failure_on_begin() {
+        test_fetch_storage_failure_helper(FailureType::Begin).await;
+    }
+
+    #[async_std::test]
+    async fn test_fetch_storage_failure_on_write() {
+        test_fetch_storage_failure_helper(FailureType::Write).await;
+    }
+
+    #[async_std::test]
+    async fn test_fetch_storage_failure_on_commit() {
+        test_fetch_storage_failure_helper(FailureType::Commit).await;
+    }
+
+    async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
+        setup_test();
+
+        // Create the consensus network.
+        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
+
+        // Start a web server that the non-consensus node can use to fetch blocks.
+        let port = pick_unused_port().unwrap();
+        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
+        app.register_module(
+            "availability",
+            define_api(&Default::default(), MockBase::instance()).unwrap(),
+        )
+        .unwrap();
+        network.spawn(
+            "server",
+            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
+        );
+
+        // Start a data source which is not receiving events from consensus, only from a peer.
+        let provider = Provider::new(QueryServiceProvider::new(
+            format!("http://localhost:{port}").parse().unwrap(),
+            MockBase::instance(),
+        ));
+        let db = TmpDb::init().await;
+        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
+        let data_source = FetchingDataSource::builder(storage, provider)
+            .disable_proactive_fetching()
+            .with_min_retry_interval(Duration::from_millis(100))
+            .build()
+            .await
+            .unwrap();
+
+        // Start consensus.
+        network.start().await;
+
+        // Wait until a couple of blocks are produced.
+        let leaves = network.data_source().subscribe_leaves(1).await;
+        let leaves = leaves.take(2).collect::<Vec<_>>().await;
+
+        // Send the last leaf to the disconnected data source so it learns about the height.
+        let last_leaf = leaves.last().unwrap();
+        let mut tx = data_source.write().await.unwrap();
+        tx.insert_leaf(last_leaf.clone()).await.unwrap();
+        tx.commit().await.unwrap();
+
+        // Trigger a fetch of the first leaf; it should retry until it successfully stores the leaf.
+        tracing::info!("fetch with write failure");
+        match failure {
+            FailureType::Begin => data_source.as_ref().fail_one_begin_writable().await,
+            FailureType::Write => data_source.as_ref().fail_one_write().await,
+            FailureType::Commit => data_source.as_ref().fail_one_commit().await,
+        }
+        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
+
+        // Check that the leaf ended up in local storage.
+        let mut tx = data_source.read().await.unwrap();
+        assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
+    }
+
+    #[async_std::test]
+    async fn test_fetch_storage_failure_retry_on_begin() {
+        test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
+    }
+
+    #[async_std::test]
+    async fn test_fetch_storage_failure_retry_on_write() {
+        test_fetch_storage_failure_retry_helper(FailureType::Write).await;
+    }
+
+    #[async_std::test]
+    async fn test_fetch_storage_failure_retry_on_commit() {
+        test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
+    }
+
+    #[async_std::test]
+    async fn test_fetch_on_decide() {
+        setup_test();
+
+        // Create the consensus network.
+        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
+
+        // Start a web server that the non-consensus node can use to fetch blocks.
+        let port = pick_unused_port().unwrap();
+        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
+        app.register_module(
+            "availability",
+            define_api(&Default::default(), MockBase::instance()).unwrap(),
+        )
+        .unwrap();
+        network.spawn(
+            "server",
+            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
+        );
+
+        // Start a data source which is not receiving events from consensus.
+        let db = TmpDb::init().await;
+        let provider = Provider::new(QueryServiceProvider::new(
+            format!("http://localhost:{port}").parse().unwrap(),
+            MockBase::instance(),
+        ));
+        let data_source = builder(&db, &provider)
+            .await
+            .with_max_retry_interval(Duration::from_secs(1))
+            .build()
+            .await
+            .unwrap();
+
+        // Start consensus.
+        network.start().await;
+
+        // Wait until a block has been decided.
+        let leaf = network
+            .data_source()
+            .subscribe_leaves(1)
+            .await
+            .next()
+            .await
+            .unwrap();
+
+        // Give the node a decide containing the leaf but no additional information.
+        data_source.append(leaf.clone().into()).await.unwrap();
+
+        // We will eventually retrieve the corresponding block and VID common, triggered by seeing
+        // the leaf.
+        sleep(Duration::from_secs(5)).await;
+
+        // Read the missing data directly from storage (via a database transaction), rather than
+        // going through the data source, so that this request itself does not trigger a fetch.
+        // Thus, this will only work if the data was already fetched, triggered by the leaf.
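+        // (`get_block` and `get_vid_common` here are the raw `AvailabilityStorage` methods
+        // implemented by the transaction itself, so no fetch is spawned on a miss.)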
+        let mut tx = data_source.read().await.unwrap();
+        let id = BlockId::<MockTypes>::from(leaf.height() as usize);
+        let block = tx.get_block(id).await.unwrap();
+        let vid = tx.get_vid_common(id).await.unwrap();
+
+        assert_eq!(block.hash(), leaf.block_hash());
+        assert_eq!(vid.block_hash(), leaf.block_hash());
+    }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 54c142fd1..bc2e235e6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -76,11 +76,8 @@
 //! // Update query data using HotShot events.
 //! let mut events = hotshot.event_stream();
 //! while let Some(event) = events.next().await {
-//!     let mut tx = data_source.write().await?;
-//!
 //!     // Update the query data based on this event.
-//!     tx.update(&event).await?;
-//!     tx.commit().await?;
+//!     data_source.update(&event).await;
 //! }
 //! # Ok(())
 //! # }
@@ -423,7 +420,6 @@ pub use resolvable::Resolvable;
 use async_std::sync::Arc;
 use async_trait::async_trait;
-use data_source::{Transaction as _, UpdateDataSource};
 use derive_more::{Deref, From, Into};
 use futures::{future::BoxFuture, stream::StreamExt};
 use hotshot::types::SystemContextHandle;
@@ -521,13 +517,13 @@ where
     Payload<Types>: availability::QueryablePayload<Types>,
     Header<Types>: availability::QueryableHeader<Types>,
     D: availability::AvailabilityDataSource<Types>
+        + data_source::UpdateDataSource<Types>
         + node::NodeDataSource<Types>
         + status::StatusDataSource
         + data_source::VersionedDataSource
         + Send
         + Sync
         + 'static,
-    for<'a> D::Transaction<'a>: data_source::UpdateDataSource<Types>,
     ApiVer: StaticVersionType + 'static,
 {
     // Create API modules.
@@ -558,9 +554,7 @@ where
     // Update query data using HotShot events.
     while let Some(event) = events.next().await {
         // Update the query data based on this event.
-        let mut tx = data_source.write().await.map_err(Error::internal)?;
-        tx.update(&event).await.map_err(Error::internal)?;
-        tx.commit().await.map_err(Error::internal)?;
+        data_source.update(&event).await;
     }
 
     Ok(())
@@ -571,11 +565,10 @@ mod test {
     use super::*;
     use crate::{
         availability::{
-            AvailabilityDataSource, BlockId, BlockQueryData, Fetch, LeafId, LeafQueryData,
-            PayloadQueryData, TransactionHash, TransactionQueryData, UpdateAvailabilityData,
-            VidCommonQueryData,
+            AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, Fetch, LeafId,
+            LeafQueryData, PayloadQueryData, TransactionHash, TransactionQueryData,
+            UpdateAvailabilityData, VidCommonQueryData,
         },
-        data_source::VersionedDataSource,
         metrics::PrometheusMetrics,
         node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
         status::{HasMetrics, StatusDataSource},
@@ -747,10 +740,10 @@ mod test {
         .await;
         let leaf = LeafQueryData::new(leaf, qc).unwrap();
         let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis());
-        let mut tx = hotshot_qs.write().await.unwrap();
-        tx.insert_leaf(leaf).await.unwrap();
-        tx.insert_block(block).await.unwrap();
-        tx.commit().await.unwrap();
+        hotshot_qs
+            .append(BlockInfo::new(leaf, Some(block), None, None))
+            .await
+            .unwrap();
 
         let module_state =
             RollingLog::create(&mut loader, Default::default(), "module_state", 1024).unwrap();
diff --git a/src/testing/consensus.rs b/src/testing/consensus.rs
index 1fd276024..cdb29bffa 100644
--- a/src/testing/consensus.rs
+++ b/src/testing/consensus.rs
@@ -12,7 +12,7 @@
 use super::mocks::{MockMembership, MockNodeImpl, MockTransaction, MockTypes, MockVersions};
 use crate::{
-    availability::AvailabilityDataSource,
+    availability::{AvailabilityDataSource, UpdateAvailabilityData},
     data_source::{FileSystemDataSource, SqlDataSource, VersionedDataSource},
     fetching::provider::NoFetching,
     node::NodeDataSource,
@@ -334,6 +334,7 @@ pub trait DataSourceLifeCycle: Clone + Send + Sync + Sized + 'static {
 pub trait TestableDataSource:
     DataSourceLifeCycle
     + AvailabilityDataSource<MockTypes>
+    + UpdateAvailabilityData<MockTypes>
     + NodeDataSource<MockTypes>
     + StatusDataSource
     + VersionedDataSource
@@ -343,6 +344,7 @@ impl<T> TestableDataSource for T where
     T: DataSourceLifeCycle
         + AvailabilityDataSource<MockTypes>
+        + UpdateAvailabilityData<MockTypes>
         + NodeDataSource<MockTypes>
         + StatusDataSource
         + VersionedDataSource