diff --git a/Cargo.toml b/Cargo.toml index abe195024..2bdf80ffb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,8 +77,8 @@ derive_more = "0.99" either = "1.10" futures = "0.3" hotshot = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.36" } -hotshot-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.36" } hotshot-testing = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.36" } +hotshot-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.36" } itertools = "0.12.1" jf-primitives = { git = "https://github.com/EspressoSystems/jellyfish", tag = "0.4.3" } prometheus = "0.13" @@ -135,6 +135,6 @@ generic-array = "0.14" hotshot-example-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.36" } portpicker = "0.1" rand = "0.8" +reqwest = "0.12.3" spin_sleep = "1.2" tempfile = "3.10" -reqwest = "0.12.3" diff --git a/flake.nix b/flake.nix index 4dd02f2fd..822491363 100644 --- a/flake.nix +++ b/flake.nix @@ -111,7 +111,7 @@ RUST_SRC_PATH = "${rustToolchain}/lib/rustlib/src/rust/library"; RUST_BACKTRACE = 1; RUST_LOG = "info"; - RUSTFLAGS=" --cfg async_executor_impl=\"async-std\" --cfg async_channel_impl=\"async-std\""; + RUSTFLAGS=" --cfg async_executor_impl=\"async-std\" --cfg async_channel_impl=\"async-std\" --cfg hotshot_example"; # Use a distinct target dir for builds from within nix shells. 
CARGO_TARGET_DIR = "target/nix"; in { diff --git a/src/data_source/extension.rs b/src/data_source/extension.rs index 67722141d..20df80ae7 100644 --- a/src/data_source/extension.rs +++ b/src/data_source/extension.rs @@ -28,7 +28,6 @@ use crate::{ }; use async_trait::async_trait; use hotshot_types::traits::node_implementation::NodeType; -use jf_primitives::merkle_tree::prelude::MerklePath; use jf_primitives::merkle_tree::prelude::MerkleProof; use std::ops::RangeBounds; @@ -298,6 +297,14 @@ where ) -> QueryResult> { self.data_source.get_path(snapshot, key).await } + + async fn keys(&self, snapshot: Snapshot) -> QueryResult> { + self.data_source.keys(snapshot).await + } + + async fn get_snapshot(&self, snapshot: Snapshot) -> QueryResult { + self.data_source.get_snapshot(snapshot).await + } } #[async_trait] @@ -325,7 +332,7 @@ where { async fn insert_merkle_nodes( &mut self, - path: MerklePath, + path: MerkleProof, traversal_path: Vec, block_number: u64, ) -> QueryResult<()> { diff --git a/src/data_source/fetching.rs b/src/data_source/fetching.rs index 38709cb78..32f75736d 100644 --- a/src/data_source/fetching.rs +++ b/src/data_source/fetching.rs @@ -97,8 +97,6 @@ use crate::{ Header, Payload, QueryResult, VidShare, }; use anyhow::Context; -use jf_primitives::merkle_tree::prelude::MerkleProof; - use async_std::{ sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, task::sleep, @@ -111,7 +109,7 @@ use futures::{ stream::{self, BoxStream, Stream, StreamExt}, }; use hotshot_types::traits::node_implementation::NodeType; -use jf_primitives::merkle_tree::{prelude::MerklePath, MerkleTreeScheme}; +use jf_primitives::merkle_tree::{prelude::MerkleProof, MerkleTreeScheme}; use std::{ cmp::min, @@ -560,7 +558,7 @@ where { async fn insert_merkle_nodes( &mut self, - path: MerklePath, + path: MerkleProof, traversal_path: Vec, block_number: u64, ) -> QueryResult<()> { @@ -597,6 +595,26 @@ where .get_path(snapshot, key) .await } + + async fn keys(&self, snapshot: Snapshot) -> 
QueryResult> { + self.fetcher + .storage + .read() + .await + .storage + .keys(snapshot) + .await + } + + async fn get_snapshot(&self, snapshot: Snapshot) -> QueryResult { + self.fetcher + .storage + .read() + .await + .storage + .get_snapshot(snapshot) + .await + } } #[async_trait] diff --git a/src/data_source/storage/sql.rs b/src/data_source/storage/sql.rs index e479f61d1..93d67b62e 100644 --- a/src/data_source/storage/sql.rs +++ b/src/data_source/storage/sql.rs @@ -12,18 +12,26 @@ #![cfg(feature = "sql-data-source")] -use std::{ - borrow::Cow, - cmp::min, - collections::{HashMap, HashSet}, - fmt::Display, - ops::{Bound, RangeBounds}, - pin::Pin, - str::FromStr, - time::Duration, +use super::{pruning::PrunedHeightStorage, AvailabilityStorage}; +use crate::{ + availability::{ + BlockId, BlockQueryData, LeafId, LeafQueryData, PayloadQueryData, QueryableHeader, + QueryablePayload, TransactionHash, TransactionIndex, UpdateAvailabilityData, + VidCommonQueryData, + }, + data_source::{ + storage::pruning::{PruneStorage, PrunerCfg, PrunerConfig}, + VersionedDataSource, + }, + merklized_state::{ + MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot, + UpdateStateData, + }, + node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart}, + task::BackgroundTask, + types::HeightIndexed, + Header, Leaf, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult, VidShare, }; - -pub use anyhow::Error; use ark_serialize::{CanonicalDeserialize, CanonicalSerialize, SerializationError}; use async_std::{net::ToSocketAddrs, sync::Arc, task::sleep}; use async_trait::async_trait; @@ -42,52 +50,38 @@ use hotshot_types::{ node_implementation::NodeType, }, }; -use jf_primitives::merkle_tree::prelude::MerkleProof; - -use std::fmt::Debug; -use tokio_postgres::types::{private::BytesMut, to_sql_checked, FromSql, Type}; -// This needs to be reexported so that we can reference it by absolute path relative to this crate -// in the expansion of 
`include_migrations`, even when `include_migrations` is invoked from another -// crate which doesn't have `include_dir` as a dependency. -pub use include_dir::include_dir; use itertools::{izip, Itertools}; +use jf_primitives::merkle_tree::{ + prelude::{MerkleNode, MerkleProof}, + DigestAlgorithm, MerkleCommitment, ToTraversalPath, +}; use postgres_native_tls::TlsConnector; -pub use refinery::Migration; use snafu::OptionExt; -pub use tokio_postgres as postgres; +use std::{ + borrow::Cow, + cmp::min, + collections::{HashMap, HashSet, VecDeque}, + fmt::{Debug, Display}, + ops::{Bound, RangeBounds}, + pin::Pin, + str::FromStr, + time::Duration, +}; use tokio_postgres::{ config::Host, tls::TlsConnect, - types::{BorrowToSql, ToSql}, + types::{private::BytesMut, to_sql_checked, BorrowToSql, FromSql, ToSql, Type}, Client, NoTls, Row, ToStatement, }; -use super::{pruning::PrunedHeightStorage, AvailabilityStorage}; +pub use anyhow::Error; +// This needs to be reexported so that we can reference it by absolute path relative to this crate +// in the expansion of `include_migrations`, even when `include_migrations` is invoked from another +// crate which doesn't have `include_dir` as a dependency. 
pub use crate::include_migrations; -use jf_primitives::merkle_tree::{ - prelude::{MerkleNode, MerklePath}, - DigestAlgorithm, MerkleCommitment, ToTraversalPath, -}; - -use crate::{ - availability::{ - BlockId, BlockQueryData, LeafId, LeafQueryData, PayloadQueryData, QueryableHeader, - QueryablePayload, TransactionHash, TransactionIndex, UpdateAvailabilityData, - VidCommonQueryData, - }, - data_source::{ - storage::pruning::{PruneStorage, PrunerCfg, PrunerConfig}, - VersionedDataSource, - }, - merklized_state::{ - MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot, - UpdateStateData, - }, - node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart}, - task::BackgroundTask, - types::HeightIndexed, - Header, Leaf, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult, VidShare, -}; +pub use include_dir::include_dir; +pub use refinery::Migration; +pub use tokio_postgres as postgres; /// Embed migrations from the given directory into the current binary. 
/// @@ -1205,27 +1199,33 @@ impl, const ARITY: usize> { async fn insert_merkle_nodes( &mut self, - path: MerklePath, + proof: MerkleProof, traversal_path: Vec, block_number: u64, ) -> QueryResult<()> { + let pos = proof.pos; + let path = proof.proof; + let name = State::state_type(); let block_number = block_number as i64; let mut traversal_path = traversal_path.iter(); let txn = self.transaction().await?; - // All the nodes are collected here, They depend on the hash ids which are returned after hashes are upserted in the db + // All the nodes are collected here, They depend on the hash ids which are returned after + // hashes are upserted in the db let mut nodes = Vec::new(); for node in path.iter() { match node { MerkleNode::Empty => { let ltree_path = LTree::from(traversal_path.clone()); + let index = serde_json::to_value(pos.clone()).map_err(ParseError::Serde)?; nodes.push(( Node { pos: ltree_path, + index: Some(index), ..Default::default() }, [0_u8; 32].to_vec(), @@ -1251,8 +1251,8 @@ impl, const ARITY: usize> nodes.push(( Node { pos: ltree_path, - index, - entry, + index: Some(index), + entry: Some(entry), ..Default::default() }, leaf_commit, @@ -1306,9 +1306,10 @@ impl, const ARITY: usize> .collect(); // insert internal node + let ltree_path = LTree::from(traversal_path.clone()); nodes.push(( Node { - pos: LTree::from(traversal_path.clone()), + pos: ltree_path, children: Some(children_hash_ids), children_bitvec: Some(children_bitvec), ..Default::default() @@ -1383,44 +1384,13 @@ where ) -> QueryResult> { let state_type = State::state_type(); let tree_height = State::tree_height(); - let header_state_commitment_field = State::header_state_commitment_field(); // Get the traversal path of the index let traversal_path = State::Key::to_traversal_path(&key, tree_height) .into_iter() .map(|x| x as i64) .collect::>(); - - let (created, merkle_commitment) = match snapshot { - Snapshot::Commit(commit) => { - // Get the block height using the merkle commitment. 
- let query = self - .query_one( - &format!( - "SELECT height FROM Header where - data->>'{header_state_commitment_field}' = $1" - ), - &[&commit.to_string()], - ) - .await?; - - (query.get(0), commit.digest()) - } - Snapshot::Index(created) => { - let created = created as i64; - // Check if header exists - // For Snapshot::Commit the entry is already checked when retrieving the block height - let row = self.query_one(&format!( - "SELECT data->>'{header_state_commitment_field}' as root_commmitment from HEADER where height = $1"), - [sql_param(&created)], - ) - .await?; - let commit: String = row.get(0); - let commit: State::Commit = - serde_json::from_value(commit.into()).map_err(ParseError::Serde)?; - (created, commit.digest()) - } - }; + let (created, merkle_commitment) = self.snapshot_info(snapshot).await?; // Get all the nodes in the path to the index. // Order by pos DESC is to return nodes from the leaf to the root @@ -1464,7 +1434,7 @@ where .try_collect() .await?; - let mut proof_path = Vec::new(); + let mut proof_path = VecDeque::with_capacity(State::tree_height()); for Node { hash_id, children, @@ -1475,10 +1445,12 @@ where } in nodes.iter() { { - let value = hashes.get(hash_id).ok_or(QueryError::NotFound)?; - // If the row has children and no index then its a ForgettenSubtree - match (children, children_bitvec, index.is_null()) { - (Some(children), Some(children_bitvec), true) => { + let value = hashes.get(hash_id).ok_or(QueryError::Error { + message: format!("node's value references non-existent hash {hash_id}"), + })?; + match (children, children_bitvec, index, entry) { + // If the row has children then its a branch + (Some(children), Some(children_bitvec), None, None) => { let mut children = children.iter(); // Reconstruct the Children MerkleNodes from storage. @@ -1487,9 +1459,14 @@ where .iter() .map(|bit| { if bit { - let value = hashes - .get(children.next().ok_or(QueryError::NotFound)?) 
- .ok_or(QueryError::NotFound)?; + let hash_id = children.next().ok_or(QueryError::Error { + message: "node has fewer children than set bits".into(), + })?; + let value = hashes.get(hash_id).ok_or(QueryError::Error { + message: format!( + "node's child references non-existent hash {hash_id}" + ), + })?; Ok(Arc::new(MerkleNode::ForgettenSubtree { value: State::T::deserialize_compressed(value.as_slice()) .map_err(ParseError::Deserialize)?, @@ -1500,15 +1477,15 @@ where }) .collect::>>()?; // Use the Children merkle nodes to reconstruct the branch node - proof_path.push(MerkleNode::Branch { + proof_path.push_back(MerkleNode::Branch { value: State::T::deserialize_compressed(value.as_slice()) .map_err(ParseError::Deserialize)?, children: child_nodes, }); } - // No children but if there is an index then its a leaf - (None, None, false) => { - proof_path.push(MerkleNode::Leaf { + // If it has an entry, it's a leaf + (None, None, Some(index), Some(entry)) => { + proof_path.push_back(MerkleNode::Leaf { value: State::T::deserialize_compressed(value.as_slice()) .map_err(ParseError::Deserialize)?, pos: serde_json::from_value(index.clone()) @@ -1517,9 +1494,9 @@ where .map_err(ParseError::Serde)?, }); } - // No children and index then its an Empty Node - (None, None, true) => { - proof_path.push(MerkleNode::Empty); + // Otherwise, it's empty. + (None, None, Some(_), None) => { + proof_path.push_back(MerkleNode::Empty); } _ => { return Err(QueryError::Error { @@ -1531,15 +1508,18 @@ where } // Reconstruct the merkle commitment from the path - let init = match proof_path.first() { - Some(MerkleNode::Leaf { value, .. }) => *value, - Some(MerkleNode::Empty) => State::T::default(), - Some(_) => { - return Err(QueryError::Error { - message: "Missing State ".to_string(), - }) + let init = if let Some(MerkleNode::Leaf { value, .. 
}) = proof_path.front() { + *value + } else { + // If the path ends in a branch (or, as a special case, if the path and thus the entire + // tree is empty), we are looking up an entry that is not present in the tree. We always + // store all the nodes on all the paths to all the entries in the tree, so the only + // nodes we could be missing are empty nodes from unseen entries. Thus, we can + // reconstruct what the path should be by prepending empty nodes. + while proof_path.len() <= State::tree_height() { + proof_path.push_front(MerkleNode::Empty); } - None => return Err(QueryError::Missing), + State::T::default() }; let commitment_from_path = traversal_path .iter() @@ -1559,13 +1539,24 @@ where .collect::>>()?; if data[*branch as usize] != val { - return Err(QueryError::Error { - message: "Some earlier state was used to calculate root commitment" - .to_string(), - }); + // This can only happen if data is missing: we have an old version of + // one of the nodes in the path, which is why it is not matching up with + // its parent. 
+ tracing::warn!( + ?key, + parent = ?data[*branch as usize], + child = ?val, + branch = %*branch, + %created, + %merkle_commitment, + "missing data in merklized state; parent-child mismatch", + ); + return Err(QueryError::Missing); } - State::Digest::digest(&data).map_err(|_| QueryError::Missing) + State::Digest::digest(&data).map_err(|err| QueryError::Error { + message: format!("failed to update digest: {err:#}"), + }) } MerkleNode::Empty => Ok(init), _ => Err(QueryError::Error { @@ -1574,19 +1565,67 @@ where } })?; - if commitment_from_path != merkle_commitment { + if commitment_from_path != merkle_commitment.digest() { return Err(QueryError::Error { message: - "Commitment calcuated from merkle path does not match the commitment in the header" - .to_string(), + format!("Commitment calcuated from merkle path ({commitment_from_path:?}) does not match the commitment in the header ({:?})", merkle_commitment.digest()), }); } Ok(MerkleProof { pos: key, - proof: proof_path, + proof: proof_path.into(), }) } + + async fn keys(&self, snapshot: Snapshot) -> QueryResult> { + let state_type = State::state_type(); + + // Identify the snapshot. + let created = self.snapshot_info(snapshot).await?.0; + + // Get all the nodes which correspond to an entry, ie have a non-NULL index field. + let rows = self + .query( + &format!( + "SELECT DISTINCT ON (pos) index + FROM {state_type} + WHERE created <= $1 AND index IS NOT NULL + ORDER BY pos, created DESC;" + ), + [sql_param(&created)], + ) + .await?; + rows.map(|row| { + let row = row.map_err(|err| QueryError::Error { + message: format!("failed to fetch key: {err:#}"), + })?; + serde_json::from_value(row.get("index")).map_err(|err| QueryError::Error { + message: format!("failed to deserialize key: {err:#}"), + }) + }) + .try_collect() + .await + } + + async fn get_snapshot(&self, snapshot: Snapshot) -> QueryResult { + // Identify the snapshot. 
+ let commit = self.snapshot_info(snapshot).await?.1; + + // Create a completely sparse snapshot from the header, as a starting point. + let mut state = State::from_commitment(commit); + // Remember each path into the tree. + for key in self.keys(snapshot).await? { + let path = self.get_path(snapshot, key.clone()).await?; + state + .insert_path(key.clone(), &path) + .map_err(|err| QueryError::Error { + message: format!("invalid path for key {key:?}: {err:#}"), + })?; + } + + Ok(state) + } } #[async_trait] @@ -1615,6 +1654,66 @@ impl MerklizedStateHeightPersistence for SqlStorage { } } +/// Low-level Merklized state operations. +impl SqlStorage { + /// Get information identifying a [`Snapshot`]. + /// + /// If the given snapshot is known to the database, this function returns + /// * The block height at which the snapshot was created + /// * A digest of the Merkle commitment to the snapshotted state + async fn snapshot_info( + &self, + snapshot: Snapshot, + ) -> QueryResult<(i64, State::Commit)> + where + Types: NodeType, + State: MerklizedState, + { + let header_state_commitment_field = State::header_state_commitment_field(); + + let (created, commit) = match snapshot { + Snapshot::Commit(commit) => { + // Get the block height using the merkle commitment. + let query = self + .query_one( + &format!( + "SELECT height FROM Header where + data->>'{header_state_commitment_field}' = $1" + ), + &[&commit.to_string()], + ) + .await?; + + (query.get(0), commit) + } + Snapshot::Index(created) => { + let created = created as i64; + let row = self + .query_one( + &format!( + "SELECT data->>'{header_state_commitment_field}' AS root_commmitment + FROM header + WHERE height = $1" + ), + [sql_param(&created)], + ) + .await?; + let commit: String = row.get(0); + let commit = serde_json::from_value(commit.into()).map_err(ParseError::Serde)?; + (created, commit) + } + }; + + // Make sure the requested snapshot is up to date. 
+ let height = self.get_last_state_height().await?; + if height < (created as usize) { + return Err(QueryError::NotFound); + } + + Ok((created, commit)) + } +} + /// Represents a Hash table row struct HashTableRow { /// Hash id to be used by the state table to save space @@ -1687,8 +1786,8 @@ struct Node { hash_id: i32, children: Option>, children_bitvec: Option, - index: serde_json::Value, - entry: serde_json::Value, + index: Option, + entry: Option, } impl Node { @@ -1731,22 +1830,26 @@ impl TryFrom for Node { fn try_from(row: Row) -> Result { Ok(Self { pos: row.try_get(0).map_err(|e| QueryError::Error { - message: format!("failed to get column pos {e}"), + message: format!("failed to get column pos: {e}"), })?, created: row.try_get(1).map_err(|e| QueryError::Error { - message: format!("failed to get column created {e}"), + message: format!("failed to get column created: {e}"), })?, hash_id: row.try_get(2).map_err(|e| QueryError::Error { - message: format!("failed to get column hash_id {e}"), + message: format!("failed to get column hash_id: {e}"), })?, children: row.try_get(3).map_err(|e| QueryError::Error { - message: format!("failed to get column children {e}"), + message: format!("failed to get column children: {e}"), })?, children_bitvec: row.try_get(4).map_err(|e| QueryError::Error { - message: format!("failed to get column children bitmap {e}"), + message: format!("failed to get column children bitmap: {e}"), + })?, + index: row.try_get(5).map_err(|e| QueryError::Error { + message: format!("failed to get column index: {e}"), + })?, + entry: row.try_get(6).map_err(|e| QueryError::Error { + message: format!("failed to get column entry: {e}"), })?, - index: row.try_get(5).unwrap_or(serde_json::Value::Null), - entry: row.try_get(6).unwrap_or(serde_json::Value::Null), }) } } @@ -2631,8 +2734,10 @@ pub mod testing { mod test { use hotshot_example_types::state_types::TestInstanceState; - use jf_primitives::merkle_tree::universal_merkle_tree::UniversalMerkleTree; 
- use jf_primitives::merkle_tree::UniversalMerkleTreeScheme; + use jf_primitives::merkle_tree::{ + universal_merkle_tree::UniversalMerkleTree, LookupResult, UniversalMerkleTreeScheme, + }; + use rand::{seq::IteratorRandom, RngCore}; use super::{testing::TmpDb, *}; @@ -2888,7 +2993,8 @@ mod test { let mut storage = SqlStorage::connect(db.config()).await.unwrap(); // define a test tree - let mut test_tree: UniversalMerkleTree<_, _, _, 8, _> = MockMerkleTree::new(3); + let mut test_tree: UniversalMerkleTree<_, _, _, 8, _> = + MockMerkleTree::new(MockMerkleTree::tree_height()); let block_height = 1; // insert some entries into the tree and the header table @@ -2917,13 +3023,15 @@ mod test { >::insert_merkle_nodes( &mut storage, - proof.proof.clone(), + proof.clone(), traversal_path.clone(), block_height as u64, ) .await .expect("failed to insert nodes"); } + // update saved state height + storage.set_last_state_height(block_height).await.unwrap(); //Get the path and check if it matches the lookup for i in 0..27 { @@ -2974,12 +3082,14 @@ mod test { >::insert_merkle_nodes( &mut storage, - proof_bh_2.proof.clone(), + proof_bh_2.clone(), traversal_path.clone(), 2, ) .await .expect("failed to insert nodes"); + // update saved state height + storage.set_last_state_height(2).await.unwrap(); // Find all the nodes of Index 0 in table let ltree_path = LTree::from(traversal_path.iter()); @@ -3039,7 +3149,7 @@ mod test { let mut storage = SqlStorage::connect(db.config()).await.unwrap(); // define a test tree - let mut test_tree = MockMerkleTree::new(3); + let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height()); let block_height = 1; //insert an entry into the tree test_tree.update(0, 0).unwrap(); @@ -3066,12 +3176,14 @@ mod test { // insert merkle nodes >::insert_merkle_nodes( &mut storage, - proof_before_remove.proof.clone(), + proof_before_remove.clone(), traversal_path.clone(), block_height as u64, ) .await .expect("failed to insert nodes"); + // update saved 
state height + storage.set_last_state_height(block_height).await.unwrap(); // the path from the db and and tree should match let merkle_path = >::get_path( &storage, @@ -3097,7 +3209,7 @@ mod test { >::insert_merkle_nodes( &mut storage, - proof_after_remove.proof.clone(), + proof_after_remove.clone(), traversal_path.clone(), 2_u64, ) @@ -3115,6 +3227,8 @@ mod test { ) .await .unwrap(); + // update saved state height + storage.set_last_state_height(2).await.unwrap(); // Get non membership proof let non_membership_path = >::get_path( @@ -3144,6 +3258,62 @@ mod test { assert_eq!(proof_bh_1, proof_before_remove, "merkle paths dont match"); } + #[async_std::test] + async fn test_merklized_state_non_membership_proof_unseen_entry() { + setup_test(); + + let db = TmpDb::init().await; + let mut storage = SqlStorage::connect(db.config()).await.unwrap(); + + // define a test tree + let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height()); + + // For each case (where the root is empty, a leaf, and a branch) test getting a + // non-membership proof for an entry node the database has never seen. + for i in 0..=2 { + tracing::info!(i, ?test_tree, "testing non-membership proof"); + + // Insert a dummy header + storage + .query_opt( + "INSERT INTO HEADER VALUES ($1, $2, 't', 0, $3)", + [ + sql_param(&(i as i64)), + sql_param(&format!("hash{i}")), + sql_param(&serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()})), + ], + ) + .await + .unwrap(); + // update saved state height + storage.set_last_state_height(i).await.unwrap(); + + // get a non-membership proof for a never-before-seen node. + let proof = MerklizedStateDataSource::get_path( + &storage, + Snapshot::::Index(i as u64), + 100, + ) + .await + .unwrap(); + assert_eq!(proof.elem(), None); + assert!(test_tree.non_membership_verify(100, proof).unwrap()); + + // insert an additional node into the tree. 
+ test_tree.update(i, i).unwrap(); + let (_, proof) = test_tree.lookup(i).expect_ok().unwrap(); + let traversal_path = ToTraversalPath::<8>::to_traversal_path(&i, test_tree.height()); + UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes( + &mut storage, + proof, + traversal_path, + (i + 1) as u64, + ) + .await + .expect("failed to insert nodes"); + } + } + #[async_std::test] async fn test_merklized_storage_with_commit() { // This test insert a merkle path into the database and queries the path using the merkle commitment @@ -3153,7 +3323,7 @@ mod test { let mut storage = SqlStorage::connect(db.config()).await.unwrap(); // define a test tree - let mut test_tree = MockMerkleTree::new(3); + let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height()); let block_height = 1; //insert an entry into the tree test_tree.update(0, 0).unwrap(); @@ -3180,12 +3350,14 @@ mod test { // insert merkle nodes >::insert_merkle_nodes( &mut storage, - proof.proof.clone(), + proof.clone(), traversal_path.clone(), block_height as u64, ) .await .expect("failed to insert nodes"); + // update saved state height + storage.set_last_state_height(block_height).await.unwrap(); let merkle_proof = >::get_path( @@ -3214,7 +3386,7 @@ mod test { let mut storage = SqlStorage::connect(db.config()).await.unwrap(); // define a test tree - let mut test_tree = MockMerkleTree::new(3); + let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height()); let block_height = 1; //insert an entry into the tree @@ -3241,12 +3413,14 @@ mod test { // insert merkle nodes >::insert_merkle_nodes( &mut storage, - proof.proof.clone(), + proof.clone(), traversal_path.clone(), block_height as u64, ) .await .expect("failed to insert nodes"); + // update saved state height + storage.set_last_state_height(block_height).await.unwrap(); } test_tree.update(1, 100).unwrap(); @@ -3258,7 +3432,7 @@ mod test { // insert merkle nodes >::insert_merkle_nodes( &mut storage, - proof.proof.clone(), + 
proof.clone(), traversal_path.clone(), block_height as u64, ) @@ -3318,7 +3492,7 @@ mod test { .unwrap(); >::insert_merkle_nodes( &mut storage, - proof.proof.clone(), + proof.clone(), traversal_path.clone(), 2_u64, ) @@ -3348,4 +3522,245 @@ mod test { assert!(merkle_path.is_err()); } + + #[async_std::test] + async fn test_merklized_state_snapshot() { + setup_test(); + + let db = TmpDb::init().await; + let mut storage = SqlStorage::connect(db.config()).await.unwrap(); + + // Define a test tree + let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height()); + + // We will sample random keys as u32. This is a value that is not a valid u32 and thus is a + // key we will never insert into the tree. + const RESERVED_KEY: usize = (u32::MAX as usize) + 1; + + // Randomly insert and delete some entries. For each entry we insert, we also keep track of + // whether the entry should be in the tree using a HashMap. + #[tracing::instrument(skip(tree, expected))] + fn randomize(tree: &mut MockMerkleTree, expected: &mut HashMap>) { + let mut rng = rand::thread_rng(); + tracing::info!("randomizing tree"); + + for _ in 0..50 { + // We flip a coin to decide whether to insert or delete, unless the tree is empty, + // in which case we can only insert. + if !expected.values().any(|v| v.is_some()) || rng.next_u32() % 2 == 0 { + // Insert. + let key = rng.next_u32() as usize; + let val = rng.next_u32() as usize; + tracing::info!(key, val, "inserting"); + + tree.update(key, val).unwrap(); + expected.insert(key, Some(val)); + } else { + // Delete. + let key = expected + .iter() + .filter_map(|(k, v)| if v.is_some() { Some(k) } else { None }) + .choose(&mut rng) + .unwrap(); + tracing::info!(key, "deleting"); + + tree.remove(key).unwrap(); + expected.insert(*key, None); + } + } + } + + // Commit the tree to storage. 
+ #[tracing::instrument(skip(storage, tree, expected))] + async fn store( + storage: &mut SqlStorage, + tree: &MockMerkleTree, + expected: &HashMap>, + block_height: u64, + ) { + tracing::info!("persisting tree"); + + for key in expected.keys() { + let proof = match tree.universal_lookup(key) { + LookupResult::Ok(_, proof) => proof, + LookupResult::NotFound(proof) => proof, + LookupResult::NotInMemory => panic!("failed to find key {key}"), + }; + let traversal_path = ToTraversalPath::<8>::to_traversal_path(key, tree.height()); + UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes( + storage, + proof, + traversal_path, + block_height, + ) + .await + .unwrap(); + } + // insert the header with merkle commitment + storage + .query_opt( + "INSERT INTO HEADER VALUES ($1, $2, 'hash', 0, $3)", + [ + sql_param(&(block_height as i64)), + sql_param(&format!("hash{block_height}")), + sql_param(&serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(tree.commitment()).unwrap()})), + ], + ) + .await + .unwrap(); + storage + .set_last_state_height(block_height as usize) + .await + .unwrap(); + storage.commit().await.unwrap(); + } + + #[tracing::instrument(skip(storage, tree, expected))] + async fn validate( + storage: &SqlStorage, + tree: &MockMerkleTree, + expected: &HashMap>, + block_height: u64, + ) { + tracing::info!("validating snapshot"); + + // Check that we can get a correct path for each key that we touched. 
+ let snapshot = Snapshot::<_, MockMerkleTree, 8>::Index(block_height); + let loaded = storage.get_snapshot(snapshot).await.unwrap(); + for (key, val) in expected { + let proof = match loaded.universal_lookup(key) { + LookupResult::Ok(_, proof) => proof, + LookupResult::NotFound(proof) => proof, + LookupResult::NotInMemory => panic!("failed to find key {key}"), + }; + assert_eq!(proof, storage.get_path(snapshot, *key).await.unwrap()); + assert_eq!(val.as_ref(), proof.elem()); + // Check path is valid for test_tree + if val.is_some() { + MockMerkleTree::verify(tree.commitment().digest(), key, proof) + .unwrap() + .unwrap(); + } else { + assert!(tree.non_membership_verify(key, proof).unwrap()); + } + } + + // Check that we can even get a non-membership proof for a key that we never touched. + let proof = match loaded.universal_lookup(RESERVED_KEY) { + LookupResult::Ok(_, proof) => proof, + LookupResult::NotFound(proof) => proof, + LookupResult::NotInMemory => panic!("failed to find reserved key {RESERVED_KEY}"), + }; + assert_eq!( + proof, + storage.get_path(snapshot, RESERVED_KEY).await.unwrap() + ); + assert_eq!(proof.elem(), None); + // Check path is valid for test_tree + assert!(tree.non_membership_verify(RESERVED_KEY, proof).unwrap()); + } + + // Create a randomized Merkle tree. + let mut expected = HashMap::>::new(); + randomize(&mut test_tree, &mut expected); + + // Commit the randomized tree to storage. + store(&mut storage, &test_tree, &expected, 1).await; + validate(&storage, &test_tree, &expected, 1).await; + + // Make random edits and commit another snapshot. + let mut expected2 = expected.clone(); + let mut test_tree2 = test_tree.clone(); + randomize(&mut test_tree2, &mut expected2); + store(&mut storage, &test_tree2, &expected2, 2).await; + validate(&storage, &test_tree2, &expected2, 2).await; + + // Ensure the original snapshot is still valid. 
+ validate(&storage, &test_tree, &expected, 1).await; + } + + #[async_std::test] + async fn test_merklized_state_missing_leaf() { + // Check that if a leaf is missing but its ancestors are present/key is in the tree, we + // catch it rather than interpreting the entry as an empty node by default. Note that this + // scenario should be impossible in normal usage, since we never store or delete partial + // paths. But we should never return an invalid proof even in extreme cases like database + // corruption. + setup_test(); + + for tree_size in 1..=3 { + let db = TmpDb::init().await; + let mut storage = SqlStorage::connect(db.config()).await.unwrap(); + + // Define a test tree + let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height()); + for i in 0..tree_size { + test_tree.update(i, i).unwrap(); + } + + // Insert a header with the tree commitment. + storage + .query_opt( + "INSERT INTO HEADER VALUES (0, 'hash', 'hash', 0, $1)", + [ + sql_param(&serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()})), + ], + ) + .await + .unwrap(); + + // Insert Merkle nodes. + for i in 0..tree_size { + let proof = test_tree.lookup(i).expect_ok().unwrap().1; + let traversal_path = + ToTraversalPath::<8>::to_traversal_path(&i, test_tree.height()); + UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes( + &mut storage, + proof, + traversal_path, + 0, + ) + .await + .unwrap(); + } + storage.set_last_state_height(0).await.unwrap(); + storage.commit().await.unwrap(); + + // Test that we can get all the entries. + let snapshot = Snapshot::::Index(0); + for i in 0..tree_size { + let proof = test_tree.lookup(i).expect_ok().unwrap().1; + assert_eq!(proof, storage.get_path(snapshot, i).await.unwrap()); + assert_eq!(*proof.elem().unwrap(), i); + } + + // Now delete the leaf node for the last entry we inserted, corrupting the database. 
+ let index = serde_json::to_value(tree_size - 1).unwrap(); + storage + .transaction() + .await + .unwrap() + .execute_one_with_retries( + &format!( + "DELETE FROM {} WHERE index = $1", + MockMerkleTree::state_type() + ), + [index], + ) + .await + .unwrap(); + storage.commit().await.unwrap(); + + // Test that we can still get the entries we didn't delete. + for i in 0..tree_size - 1 { + let proof = test_tree.lookup(i).expect_ok().unwrap().1; + assert_eq!(proof, storage.get_path(snapshot, i).await.unwrap()); + assert_eq!(*proof.elem().unwrap(), i); + } + + // Looking up the entry we deleted fails, rather than return an invalid path. + let err = storage.get_path(snapshot, tree_size - 1).await.unwrap_err(); + assert!(matches!(err, QueryError::Missing)); + } + } } diff --git a/src/merklized_state/data_source.rs b/src/merklized_state/data_source.rs index b4234f379..8743dc9ec 100644 --- a/src/merklized_state/data_source.rs +++ b/src/merklized_state/data_source.rs @@ -22,11 +22,9 @@ use derivative::Derivative; use derive_more::Display; use hotshot_types::traits::node_implementation::NodeType; -use jf_primitives::merkle_tree::prelude::MerkleProof; -use jf_primitives::merkle_tree::DigestAlgorithm; use jf_primitives::merkle_tree::{ - prelude::MerklePath, Element, Index, MerkleCommitment, MerkleTreeScheme, NodeValue, - ToTraversalPath, + prelude::MerkleProof, DigestAlgorithm, Element, ForgetableMerkleTreeScheme, Index, + MerkleCommitment, NodeValue, ToTraversalPath, }; use serde::{de::DeserializeOwned, Serialize}; use std::fmt::Display; @@ -50,6 +48,25 @@ where snapshot: Snapshot, key: State::Key, ) -> QueryResult>; + + /// List all keys in the given snapshot of the state. + /// + /// This includes all keys _known to_ the snapshot, or in other words all keys for which + /// [`get_path`](Self::get_path) will successfully return a [`MerkleProof`]. 
In particular this + /// may include keys which are _not present_ in the Merklized state, but whose absence is + /// specifically known to the database, such as keys which were inserted in a previous snapshot + /// and then deleted. For these keys, [`get_path`](Self::get_path) will return a non-membership + /// proof; for all other keys it will return a membership proof. + async fn keys(&self, snapshot: Snapshot) -> QueryResult>; + + /// Load a complete snapshot of the given state. + /// + /// The result is a complete Merkle tree, which can be queried for any entry, present or absent, + /// and will always successfully return either a presence or absence proof. + /// + /// This function may be extremely expensive, so should be used infrequently, for example at + /// initialization time only. + async fn get_snapshot(&self, snapshot: Snapshot) -> QueryResult; } /// This trait defines methods for updating the storage with the merkle tree state. @@ -59,7 +76,7 @@ pub trait UpdateStateData, { async fn insert_merkle_nodes( &mut self, - path: MerklePath, + path: MerkleProof, traversal_path: Vec, block_number: u64, ) -> QueryResult<()>; @@ -72,16 +89,18 @@ pub trait MerklizedStateHeightPersistence { } type StateCommitment = >::Commit; + +/// Snapshot can be queried by block height (index) or merkle tree commitment #[derive(Derivative, Display)] #[derivative(Ord = "feature_allow_slow_enum")] #[derivative( + Copy(bound = ""), Debug(bound = ""), PartialEq(bound = ""), Eq(bound = ""), Ord(bound = ""), Hash(bound = "") )] -// Snapshot can be queried by block height (index) or merkle tree commitment pub enum Snapshot, const ARITY: usize> { #[display(fmt = "{_0}")] Commit(StateCommitment), @@ -89,6 +108,14 @@ pub enum Snapshot, const ARITY: Index(u64), } +impl, Types: NodeType, const ARITY: usize> Clone + for Snapshot +{ + fn clone(&self) -> Self { + *self + } +} + impl, Types: NodeType, const ARITY: usize> PartialOrd for Snapshot { @@ -100,7 +127,7 @@ impl, Types: NodeType, 
const ARITY: usize> Parti /// This trait should be implemented by the MerkleTree that the module is initialized for. /// It defines methods utilized by the module. pub trait MerklizedState: - MerkleTreeScheme + Send + Sync + Clone + 'static + ForgetableMerkleTreeScheme + Send + Sync + Clone + 'static where Types: NodeType, { @@ -139,4 +166,11 @@ where /// Get the height of the tree fn tree_height() -> usize; + + /// Insert a forgotten path into the tree. + fn insert_path( + &mut self, + key: Self::Key, + proof: &MerkleProof, + ) -> anyhow::Result<()>; } diff --git a/src/testing/mocks.rs b/src/testing/mocks.rs index 9ec5280be..bab8ceb04 100644 --- a/src/testing/mocks.rs +++ b/src/testing/mocks.rs @@ -33,8 +33,9 @@ use hotshot_types::{ }; use jf_primitives::merkle_tree::{ - prelude::{Sha3Digest, Sha3Node}, + prelude::{MerkleProof, Sha3Digest, Sha3Node}, universal_merkle_tree::UniversalMerkleTree, + ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, }; use serde::{Deserialize, Serialize}; use std::ops::Range; @@ -134,6 +135,18 @@ impl MerklizedState for MockMerkleTree { } fn tree_height() -> usize { - 3 + 12 + } + + fn insert_path( + &mut self, + key: Self::Key, + proof: &MerkleProof, + ) -> anyhow::Result<()> { + match proof.elem() { + Some(elem) => self.remember(key, elem, proof)?, + None => self.non_membership_remember(key, proof)?, + } + Ok(()) } }