Skip to content

Commit

Permalink
Merge pull request #718 from EspressoSystems/jb/missed-notification
Browse files Browse the repository at this point in the history
Improve interaction between storage and fetching notifications
  • Loading branch information
jbearer authored Oct 30, 2024
2 parents bf491a1 + ba15807 commit 7e12d6c
Show file tree
Hide file tree
Showing 28 changed files with 1,417 additions and 990 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ async-compatibility-layer = { version = "1.1", default-features = false, feature
async-lock = "3.3.0"
async-std = { version = "1.9.0", features = ["unstable", "attributes"] }
async-trait = "0.1"
backoff = "0.4"
bincode = "1.3"
chrono = "0.4"
committable = "0.2"
Expand Down
13 changes: 5 additions & 8 deletions src/availability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,7 @@ where
mod test {
use super::*;
use crate::{
data_source::{
storage::no_storage, ExtensibleDataSource, Transaction, VersionedDataSource,
},
data_source::{storage::no_storage, ExtensibleDataSource},
status::StatusDataSource,
task::BackgroundTask,
testing::{
Expand Down Expand Up @@ -870,11 +868,10 @@ mod test {
.await;
let leaf = LeafQueryData::new(leaf, qc).unwrap();
let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis());

let mut tx = data_source.write().await.unwrap();
tx.insert_leaf(leaf).await.unwrap();
tx.insert_block(block.clone()).await.unwrap();
tx.commit().await.unwrap();
data_source
.append(BlockInfo::new(leaf, Some(block.clone()), None, None))
.await
.unwrap();

// assert that the store has data before we move on to API requests
assert_eq!(
Expand Down
61 changes: 51 additions & 10 deletions src/availability/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ use super::{
TransactionHash, TransactionQueryData, VidCommonQueryData,
},
};
use crate::{Payload, VidCommitment, VidShare};
use crate::{types::HeightIndexed, Payload, VidCommitment, VidShare};
use async_trait::async_trait;
use derivative::Derivative;
use derive_more::{Display, From};
use futures::stream::{BoxStream, Stream, StreamExt};
use futures::{
future::Future,
stream::{BoxStream, Stream, StreamExt},
};
use hotshot_types::traits::node_implementation::NodeType;
use std::{cmp::Ordering, ops::RangeBounds};

Expand Down Expand Up @@ -201,13 +204,51 @@ where
}
}

#[async_trait]
/// Information about a block.
///
/// This type encapsulate all the information we might have about a decided HotShot block:
/// * The leaf, including a header and consensus metadata
/// * The block itself, which may be missing if this node did not receive a DA proposal for this
/// block
/// * VID common and a unique VID share, which may be missing if this node did not receive a VID
/// share for this block
#[derive(Clone, Debug)]
pub struct BlockInfo<Types: NodeType> {
pub leaf: LeafQueryData<Types>,
pub block: Option<BlockQueryData<Types>>,
pub vid_common: Option<VidCommonQueryData<Types>>,
pub vid_share: Option<VidShare>,
}

impl<Types: NodeType> From<LeafQueryData<Types>> for BlockInfo<Types> {
fn from(leaf: LeafQueryData<Types>) -> Self {
Self::new(leaf, None, None, None)
}
}

impl<Types: NodeType> HeightIndexed for BlockInfo<Types> {
fn height(&self) -> u64 {
self.leaf.height()
}
}

impl<Types: NodeType> BlockInfo<Types> {
pub fn new(
leaf: LeafQueryData<Types>,
block: Option<BlockQueryData<Types>>,
vid_common: Option<VidCommonQueryData<Types>>,
vid_share: Option<VidShare>,
) -> Self {
Self {
leaf,
block,
vid_common,
vid_share,
}
}
}

pub trait UpdateAvailabilityData<Types: NodeType> {
async fn insert_leaf(&mut self, leaf: LeafQueryData<Types>) -> anyhow::Result<()>;
async fn insert_block(&mut self, block: BlockQueryData<Types>) -> anyhow::Result<()>;
async fn insert_vid(
&mut self,
common: VidCommonQueryData<Types>,
share: Option<VidShare>,
) -> anyhow::Result<()>;
/// Append information about a new block to the database.
fn append(&self, info: BlockInfo<Types>) -> impl Send + Future<Output = anyhow::Result<()>>;
}
8 changes: 7 additions & 1 deletion src/availability/query_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.

use crate::{types::HeightIndexed, Header, Metadata, Payload, Transaction, VidCommon};
use crate::{types::HeightIndexed, Header, Metadata, Payload, Transaction, VidCommon, VidShare};
use committable::{Commitment, Committable};
use hotshot_types::{
data::Leaf,
Expand Down Expand Up @@ -486,6 +486,12 @@ impl<Types: NodeType> HeightIndexed for VidCommonQueryData<Types> {
}
}

impl<Types: NodeType> HeightIndexed for (VidCommonQueryData<Types>, Option<VidShare>) {
fn height(&self) -> u64 {
self.0.height
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(bound = "")]
pub struct TransactionQueryData<Types: NodeType>
Expand Down
72 changes: 31 additions & 41 deletions src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,10 @@ pub mod availability_tests {
#[espresso_macros::generic_tests]
pub mod persistence_tests {
use crate::{
availability::{BlockQueryData, LeafQueryData, UpdateAvailabilityData},
availability::{BlockQueryData, LeafQueryData},
data_source::{
storage::{AvailabilityStorage, NodeStorage},
Transaction, UpdateDataSource,
storage::{AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage},
Transaction,
},
node::NodeDataSource,
testing::{
Expand All @@ -475,8 +475,9 @@ pub mod persistence_tests {
#[async_std::test]
pub async fn test_revert<D: TestableDataSource>()
where
for<'a> D::Transaction<'a>:
UpdateDataSource<MockTypes> + AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
+ AvailabilityStorage<MockTypes>
+ NodeStorage<MockTypes>,
{
use hotshot_example_types::node_types::TestVersions;

Expand Down Expand Up @@ -528,7 +529,7 @@ pub mod persistence_tests {
#[async_std::test]
pub async fn test_reset<D: TestableDataSource>()
where
for<'a> D::Transaction<'a>: UpdateDataSource<MockTypes>,
for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
{
use hotshot_example_types::node_types::TestVersions;

Expand Down Expand Up @@ -588,8 +589,9 @@ pub mod persistence_tests {
#[async_std::test]
pub async fn test_drop_tx<D: TestableDataSource>()
where
for<'a> D::Transaction<'a>:
UpdateDataSource<MockTypes> + AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
+ AvailabilityStorage<MockTypes>
+ NodeStorage<MockTypes>,
for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
{
use hotshot_example_types::node_types::TestVersions;
Expand Down Expand Up @@ -672,10 +674,12 @@ pub mod persistence_tests {
pub mod node_tests {
use crate::{
availability::{
BlockQueryData, LeafQueryData, QueryableHeader, UpdateAvailabilityData,
VidCommonQueryData,
BlockInfo, BlockQueryData, LeafQueryData, QueryableHeader, VidCommonQueryData,
},
data_source::{
storage::{NodeStorage, UpdateAvailabilityStorage},
update::Transaction,
},
data_source::{storage::NodeStorage, update::Transaction, UpdateDataSource},
node::{BlockId, SyncStatus, TimeWindowQueryData, WindowStart},
testing::{
consensus::{MockNetwork, TestableDataSource},
Expand Down Expand Up @@ -704,7 +708,7 @@ pub mod node_tests {
#[async_std::test]
pub async fn test_sync_status<D: TestableDataSource>()
where
for<'a> D::Transaction<'a>: UpdateDataSource<MockTypes>,
for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
{
use hotshot_example_types::node_types::TestVersions;

Expand Down Expand Up @@ -758,11 +762,7 @@ pub mod node_tests {

// Insert a leaf without the corresponding block or VID info, make sure we detect that the
// block and VID info are missing.
{
let mut tx = ds.write().await.unwrap();
tx.insert_leaf(leaves[0].clone()).await.unwrap();
tx.commit().await.unwrap();
}
ds.append(leaves[0].clone().into()).await.unwrap();
assert_eq!(
ds.sync_status().await.unwrap(),
SyncStatus {
Expand All @@ -776,11 +776,7 @@ pub mod node_tests {

// Insert a leaf whose height is not the successor of the previous leaf. We should now
// detect that the leaf in between is missing (along with all _three_ corresponding blocks).
{
let mut tx = ds.write().await.unwrap();
tx.insert_leaf(leaves[2].clone()).await.unwrap();
tx.commit().await.unwrap();
}
ds.append(leaves[2].clone().into()).await.unwrap();
assert_eq!(
ds.sync_status().await.unwrap(),
SyncStatus {
Expand Down Expand Up @@ -860,10 +856,7 @@ pub mod node_tests {
}

#[async_std::test]
pub async fn test_counters<D: TestableDataSource>()
where
for<'a> D::Transaction<'a>: UpdateDataSource<MockTypes>,
{
pub async fn test_counters<D: TestableDataSource>() {
use hotshot_example_types::node_types::TestVersions;

setup_test();
Expand Down Expand Up @@ -914,12 +907,9 @@ pub mod node_tests {
.await;
*leaf.leaf.block_header_mut() = header.clone();
let block = BlockQueryData::new(header, payload);
{
let mut tx = ds.write().await.unwrap();
tx.insert_leaf(leaf).await.unwrap();
tx.insert_block(block).await.unwrap();
tx.commit().await.unwrap();
}
ds.append(BlockInfo::new(leaf, Some(block), None, None))
.await
.unwrap();

total_transactions += 1;
total_size += encoded.len();
Expand Down Expand Up @@ -960,7 +950,7 @@ pub mod node_tests {
#[async_std::test]
pub async fn test_vid_monotonicity<D: TestableDataSource>()
where
for<'a> D::Transaction<'a>: UpdateDataSource<MockTypes>,
for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
{
use hotshot_example_types::node_types::TestVersions;
Expand All @@ -981,14 +971,14 @@ pub mod node_tests {
)
.await;
let common = VidCommonQueryData::new(leaf.header().clone(), disperse.common);
{
let mut tx = ds.write().await.unwrap();
tx.insert_leaf(leaf).await.unwrap();
tx.insert_vid(common.clone(), Some(disperse.shares[0].clone()))
.await
.unwrap();
tx.commit().await.unwrap();
}
ds.append(BlockInfo::new(
leaf,
None,
Some(common.clone()),
Some(disperse.shares[0].clone()),
))
.await
.unwrap();

{
assert_eq!(ds.get_vid_common(0).await.await, common);
Expand Down
28 changes: 6 additions & 22 deletions src/data_source/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
use super::VersionedDataSource;
use crate::{
availability::{
AvailabilityDataSource, BlockId, BlockQueryData, Fetch, LeafId, LeafQueryData,
AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, Fetch, LeafId, LeafQueryData,
PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash, TransactionQueryData,
UpdateAvailabilityData, VidCommonQueryData,
},
Expand Down Expand Up @@ -213,27 +213,14 @@ where
}
}

#[async_trait]
impl<D, U, Types> UpdateAvailabilityData<Types> for ExtensibleDataSource<D, U>
where
D: UpdateAvailabilityData<Types> + Send + Sync,
U: Send + Sync,
Types: NodeType,
{
async fn insert_leaf(&mut self, leaf: LeafQueryData<Types>) -> anyhow::Result<()> {
self.data_source.insert_leaf(leaf).await
}

async fn insert_block(&mut self, block: BlockQueryData<Types>) -> anyhow::Result<()> {
self.data_source.insert_block(block).await
}

async fn insert_vid(
&mut self,
common: VidCommonQueryData<Types>,
share: Option<VidShare>,
) -> anyhow::Result<()> {
self.data_source.insert_vid(common, share).await
async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
self.data_source.append(info).await
}
}

Expand Down Expand Up @@ -417,7 +404,7 @@ where
mod impl_testable_data_source {
use super::*;
use crate::{
data_source::{Transaction, UpdateDataSource},
data_source::UpdateDataSource,
testing::{
consensus::{DataSourceLifeCycle, TestableDataSource},
mocks::MockTypes,
Expand All @@ -428,8 +415,7 @@ mod impl_testable_data_source {
#[async_trait]
impl<D, U> DataSourceLifeCycle for ExtensibleDataSource<D, U>
where
D: TestableDataSource,
for<'a> D::Transaction<'a>: UpdateDataSource<MockTypes>,
D: TestableDataSource + UpdateDataSource<MockTypes>,
U: Clone + Default + Send + Sync + 'static,
{
type Storage = D::Storage;
Expand All @@ -447,9 +433,7 @@ mod impl_testable_data_source {
}

async fn handle_event(&self, event: &Event<MockTypes>) {
let mut tx = self.write().await.unwrap();
tx.update(event).await.unwrap();
tx.commit().await.unwrap();
self.update(event).await;
}
}
}
Expand Down
Loading

0 comments on commit 7e12d6c

Please sign in to comment.