diff --git a/Cargo.lock b/Cargo.lock index 882811c668..2ba9772212 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6075,6 +6075,8 @@ dependencies = [ "prost-types 0.12.3", "rand", "serde_json", + "tagged", + "tagged-debug-derive", ] [[package]] @@ -6288,6 +6290,8 @@ dependencies = [ "serde_json", "serde_with", "sha3", + "tagged", + "tagged-debug-derive", "thiserror", "vergen", ] diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 052af80e03..df929d24b5 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -27,6 +27,8 @@ serde_json = { workspace = true, features = [ ] } serde_with = { workspace = true } sha3 = { workspace = true } +tagged = { path = "../tagged" } +tagged-debug-derive = { path = "../tagged-debug-derive" } thiserror = { workspace = true } [dev-dependencies] diff --git a/crates/gateway-types/src/class_definition.rs b/crates/common/src/class_definition.rs similarity index 51% rename from crates/gateway-types/src/class_definition.rs rename to crates/common/src/class_definition.rs index b5e8a30a8b..1806655d03 100644 --- a/crates/gateway-types/src/class_definition.rs +++ b/crates/common/src/class_definition.rs @@ -1,14 +1,16 @@ use std::borrow::Cow; +use std::fmt; use fake::{Dummy, Fake, Faker}; use pathfinder_crypto::Felt; use rand::Rng; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; +use serde_with::serde_as; -use crate::request::contract::{SelectorAndFunctionIndex, SelectorAndOffset}; +use crate::{ByteCodeOffset, EntryPoint}; -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Dummy)] pub enum ClassDefinition<'a> { Sierra(Sierra<'a>), Cairo(Cairo<'a>), @@ -87,3 +89,85 @@ pub struct CairoEntryPoints { #[serde(rename = "CONSTRUCTOR")] pub constructor: Vec, } + +#[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq, Hash, Eq)] +#[serde(deny_unknown_fields)] +pub enum EntryPointType { + #[serde(rename = "EXTERNAL")] + External, + #[serde(rename = "L1_HANDLER")] + L1Handler, + #[serde(rename = "CONSTRUCTOR")] + Constructor, +} + +impl fmt::Display for EntryPointType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use EntryPointType::*; + f.pad(match self { + External => "EXTERNAL", + L1Handler => "L1_HANDLER", + Constructor => "CONSTRUCTOR", + }) + } +} + +#[serde_as] +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct SelectorAndOffset { + pub selector: EntryPoint, + #[serde_as(as = "OffsetSerde")] + pub offset: ByteCodeOffset, +} + +#[derive(serde::Deserialize, serde::Serialize)] +#[serde(untagged)] +pub enum OffsetSerde { + HexStr(Felt), + Decimal(u64), +} + +impl serde_with::SerializeAs for OffsetSerde { + fn serialize_as(source: &ByteCodeOffset, serializer: S) -> Result + where + S: serde::Serializer, + { + use serde::Serialize; + + Felt::serialize(&source.0, serializer) + } +} + +impl<'de> serde_with::DeserializeAs<'de, ByteCodeOffset> for OffsetSerde { + fn deserialize_as(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + use serde::Deserialize; + + let offset = OffsetSerde::deserialize(deserializer)?; + let offset = match offset { + OffsetSerde::HexStr(felt) => felt, + OffsetSerde::Decimal(decimal) => Felt::from_u64(decimal), + }; + Ok(ByteCodeOffset(offset)) + } +} + +impl Dummy for SelectorAndOffset { + fn dummy_with_rng(_: &T, rng: &mut R) -> Self { + Self { + selector: Faker.fake_with_rng(rng), + offset: ByteCodeOffset(Felt::from_u64(rng.gen())), + } + } +} + +/// Descriptor of an entry point in a Sierra class. +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Dummy)] +#[serde(deny_unknown_fields)] +pub struct SelectorAndFunctionIndex { + pub selector: EntryPoint, + pub function_idx: u64, +} diff --git a/crates/common/src/event.rs b/crates/common/src/event.rs index aa504f5581..3942b6f75c 100644 --- a/crates/common/src/event.rs +++ b/crates/common/src/event.rs @@ -4,11 +4,13 @@ use fake::Dummy; use num_bigint::BigUint; use pathfinder_crypto::Felt; use serde_with::serde_conv; +use tagged::Tagged; +use tagged_debug_derive::TaggedDebug; use crate::{ContractAddress, EventData, EventKey}; #[serde_with::serde_as] -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq, Dummy)] +#[derive(Clone, serde::Deserialize, serde::Serialize, PartialEq, Eq, Dummy, TaggedDebug)] #[serde(deny_unknown_fields)] pub struct Event { #[serde_as(as = "Vec")] diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 00a7d66a74..79f415ebf3 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -14,6 +14,7 @@ use pathfinder_crypto::Felt; use primitive_types::H160; use serde::{Deserialize, Serialize}; +pub mod class_definition; pub mod consts; pub mod event; pub mod hash; diff --git a/crates/common/src/state_update.rs b/crates/common/src/state_update.rs index 9fe575d449..ab67e28a6c 100644 --- a/crates/common/src/state_update.rs +++ b/crates/common/src/state_update.rs @@ -27,7 +27,7 @@ pub struct StateUpdate { pub declared_sierra_classes: HashMap, } -#[derive(Default, Debug, Clone, PartialEq)] +#[derive(Default, Debug, Clone, PartialEq, Dummy)] pub struct StateUpdateData { pub contract_updates: HashMap, pub system_contract_updates: HashMap, @@ -312,6 +312,20 @@ impl StateUpdateData { cairo: self.declared_cairo_classes.clone(), } } + + pub fn state_diff_length(&self) -> usize { + let mut len = 0; + self.contract_updates.iter().for_each(|(_, update)| { + len += update.storage.len(); + len += usize::from(update.nonce.is_some()); + len += usize::from(update.class.is_some()); + }); + self.system_contract_updates.iter().for_each(|(_, update)| { + len += update.storage.len(); + }); + len += self.declared_cairo_classes.len() + self.declared_sierra_classes.len(); + len + } } impl From for StateUpdateData { diff --git a/crates/gateway-client/src/lib.rs b/crates/gateway-client/src/lib.rs index 0ef97cc086..35d45b1af9 100644 --- a/crates/gateway-client/src/lib.rs +++ b/crates/gateway-client/src/lib.rs @@ -637,9 +637,9 @@ mod tests { mod add_transaction { use std::collections::HashMap; + use pathfinder_common::class_definition::{EntryPointType, SelectorAndOffset}; use pathfinder_common::ContractAddress; use starknet_gateway_types::request::add_transaction::CairoContractDefinition; - use starknet_gateway_types::request::contract::{EntryPointType, SelectorAndOffset}; use super::*; @@ -739,8 +739,8 @@ mod tests { } mod declare { + use pathfinder_common::class_definition::SelectorAndFunctionIndex; use starknet_gateway_types::request::add_transaction::SierraContractDefinition; - use starknet_gateway_types::request::contract::SelectorAndFunctionIndex; use super::*; diff --git a/crates/gateway-types/src/class_hash.rs b/crates/gateway-types/src/class_hash.rs index f6999556e7..9ec2fb4a8c 100644 --- a/crates/gateway-types/src/class_hash.rs +++ b/crates/gateway-types/src/class_hash.rs @@ -1,12 +1,11 @@ use anyhow::{Context, Error, Result}; +use pathfinder_common::class_definition::EntryPointType::*; use pathfinder_common::{felt_bytes, ClassHash}; use pathfinder_crypto::hash::{HashChain, PoseidonHasher}; use pathfinder_crypto::Felt; use serde::Serialize; use sha3::Digest; -use crate::request::contract::EntryPointType; - #[derive(Debug, PartialEq)] pub enum ComputedClassHash { Cairo(ClassHash), @@ -99,12 +98,15 @@ pub mod from_parts { use std::collections::HashMap; use anyhow::Result; + use pathfinder_common::class_definition::{ + EntryPointType, + SelectorAndOffset, + SierraEntryPoints, + }; use pathfinder_common::ClassHash; use pathfinder_crypto::Felt; use super::json; - use crate::class_definition::SierraEntryPoints; - use crate::request::contract::{EntryPointType, SelectorAndOffset}; pub fn compute_cairo_class_hash( abi: &[u8], @@ -178,8 +180,6 @@ pub mod from_parts { fn compute_cairo_class_hash( mut contract_definition: json::CairoContractDefinition<'_>, ) -> Result { - use EntryPointType::*; - // the other modification is handled by skipping if the attributes vec is empty contract_definition.program.debug_info = None; @@ -390,8 +390,6 @@ fn compute_cairo_class_hash( fn compute_sierra_class_hash( contract_definition: json::SierraContractDefinition<'_>, ) -> Result { - use EntryPointType::*; - if contract_definition.contract_class_version != "0.1.0" { anyhow::bail!("Unsupported Sierra class version"); } @@ -541,7 +539,11 @@ mod json { use std::borrow::Cow; use std::collections::{BTreeMap, HashMap}; - use crate::request::contract::{EntryPointType, SelectorAndFunctionIndex, SelectorAndOffset}; + use pathfinder_common::class_definition::{ + EntryPointType, + SelectorAndFunctionIndex, + SelectorAndOffset, + }; pub enum ContractDefinition<'a> { Cairo(CairoContractDefinition<'a>), diff --git a/crates/gateway-types/src/lib.rs b/crates/gateway-types/src/lib.rs index 7b27df8dc4..3819d5d432 100644 --- a/crates/gateway-types/src/lib.rs +++ b/crates/gateway-types/src/lib.rs @@ -1,4 +1,3 @@ -pub mod class_definition; pub mod class_hash; pub mod error; pub mod reply; diff --git a/crates/gateway-types/src/request.rs b/crates/gateway-types/src/request.rs index 203102bff1..4cf781794c 100644 --- a/crates/gateway-types/src/request.rs +++ b/crates/gateway-types/src/request.rs @@ -119,100 +119,14 @@ impl From for pathfinder_common::BlockId { } } -pub mod contract { - use std::fmt; - - use fake::{Dummy, Fake, Faker}; - use pathfinder_common::{ByteCodeOffset, EntryPoint}; - use pathfinder_crypto::Felt; - use serde_with::serde_as; - - #[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq, Hash, Eq)] - #[serde(deny_unknown_fields)] - pub enum EntryPointType { - #[serde(rename = "EXTERNAL")] - External, - #[serde(rename = "L1_HANDLER")] - L1Handler, - #[serde(rename = "CONSTRUCTOR")] - Constructor, - } - - impl fmt::Display for EntryPointType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - use EntryPointType::*; - f.pad(match self { - External => "EXTERNAL", - L1Handler => "L1_HANDLER", - Constructor => "CONSTRUCTOR", - }) - } - } - - #[serde_as] - #[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq)] - #[serde(deny_unknown_fields)] - pub struct SelectorAndOffset { - pub selector: EntryPoint, - #[serde_as(as = "OffsetSerde")] - pub offset: ByteCodeOffset, - } - - #[derive(serde::Deserialize, serde::Serialize)] - #[serde(untagged)] - pub enum OffsetSerde { - HexStr(Felt), - Decimal(u64), - } - - impl serde_with::SerializeAs for OffsetSerde { - fn serialize_as(source: &ByteCodeOffset, serializer: S) -> Result - where - S: serde::Serializer, - { - use serde::Serialize; - - Felt::serialize(&source.0, serializer) - } - } - - impl<'de> serde_with::DeserializeAs<'de, ByteCodeOffset> for OffsetSerde { - fn deserialize_as(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - use serde::Deserialize; - - let offset = OffsetSerde::deserialize(deserializer)?; - let offset = match offset { - OffsetSerde::HexStr(felt) => felt, - OffsetSerde::Decimal(decimal) => Felt::from_u64(decimal), - }; - Ok(ByteCodeOffset(offset)) - } - } - - impl Dummy for SelectorAndOffset { - fn dummy_with_rng(_: &T, rng: &mut R) -> Self { - Self { - selector: Faker.fake_with_rng(rng), - offset: ByteCodeOffset(Felt::from_u64(rng.gen())), - } - } - } - - /// Descriptor of an entry point in a Sierra class. - #[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Dummy)] - #[serde(deny_unknown_fields)] - pub struct SelectorAndFunctionIndex { - pub selector: EntryPoint, - pub function_idx: u64, - } -} - pub mod add_transaction { use std::collections::HashMap; + use pathfinder_common::class_definition::{ + EntryPointType, + SelectorAndFunctionIndex, + SelectorAndOffset, + }; use pathfinder_common::{ AccountDeploymentDataElem, CasmHash, @@ -227,7 +141,6 @@ pub mod add_transaction { use pathfinder_serde::{CallParamAsDecimalStr, TransactionSignatureElemAsDecimalStr}; use serde_with::serde_as; - use super::contract::{EntryPointType, SelectorAndFunctionIndex, SelectorAndOffset}; use super::{CallParam, ContractAddress, Fee, TransactionSignatureElem}; use crate::reply::transaction::{DataAvailabilityMode, ResourceBounds}; @@ -465,12 +378,11 @@ pub mod add_transaction { } mod byte_code_offset { + use pathfinder_common::class_definition::SelectorAndOffset; use pathfinder_common::macro_prelude::*; use pathfinder_common::ByteCodeOffset; use pathfinder_crypto::Felt; - use crate::request::contract::SelectorAndOffset; - #[test] fn with_hex_offset() { let json = r#"{ diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index f362b28f94..2373aa7863 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -48,7 +48,8 @@ serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } sha2 = { workspace = true } sha3 = { workspace = true } - +tagged = { path = "../tagged" } +tagged-debug-derive = { path = "../tagged-debug-derive" } tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync"] } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/crates/p2p/src/client/conv.rs b/crates/p2p/src/client/conv.rs index ecec8589e1..57ab59c19f 100644 --- a/crates/p2p/src/client/conv.rs +++ b/crates/p2p/src/client/conv.rs @@ -6,6 +6,7 @@ use std::borrow::Cow; use std::io::Read; use anyhow::Context; +use p2p_proto::class::{Cairo0Class, Cairo1Class, Cairo1EntryPoints, SierraEntryPoint}; use p2p_proto::common::{Address, Hash}; use p2p_proto::receipt::execution_resources::BuiltinCounter; use p2p_proto::receipt::{ @@ -19,6 +20,12 @@ use p2p_proto::receipt::{ ReceiptCommon, }; use p2p_proto::transaction::AccountSignature; +use pathfinder_common::class_definition::{ + Cairo, + SelectorAndFunctionIndex, + SelectorAndOffset, + Sierra, +}; use pathfinder_common::event::Event; use pathfinder_common::receipt::{ BuiltinCounters, @@ -895,3 +902,82 @@ impl TryFromDto for SierraDefinition { Ok(Self(sierra)) } } + +impl ToDto for Sierra<'_> { + fn to_dto(self) -> Cairo1Class { + let into_dto = |x: SelectorAndFunctionIndex| SierraEntryPoint { + selector: x.selector.0, + index: x.function_idx, + }; + + let entry_points = Cairo1EntryPoints { + externals: self + .entry_points_by_type + .external + .into_iter() + .map(into_dto) + .collect(), + l1_handlers: self + .entry_points_by_type + .l1_handler + .into_iter() + .map(into_dto) + .collect(), + constructors: self + .entry_points_by_type + .constructor + .into_iter() + .map(into_dto) + .collect(), + }; + + Cairo1Class { + abi: self.abi.to_string(), + program: self.sierra_program, + entry_points, + contract_class_version: self.contract_class_version.into(), + } + } +} + +impl ToDto for Cairo<'_> { + fn to_dto(self) -> Cairo0Class { + let into_dto = |x: SelectorAndOffset| p2p_proto::class::EntryPoint { + selector: x.selector.0, + offset: u64::from_be_bytes( + x.offset.0.as_be_bytes()[24..] + .try_into() + .expect("slice len matches"), + ), + }; + + let mut gzip_encoder = + flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast()); + serde_json::to_writer(&mut gzip_encoder, &self.program).unwrap(); + let program = gzip_encoder.finish().unwrap(); + let program = base64::encode(program); + + Cairo0Class { + abi: self.abi.to_string(), + externals: self + .entry_points_by_type + .external + .into_iter() + .map(into_dto) + .collect(), + l1_handlers: self + .entry_points_by_type + .l1_handler + .into_iter() + .map(into_dto) + .collect(), + constructors: self + .entry_points_by_type + .constructor + .into_iter() + .map(into_dto) + .collect(), + program, + } + } +} diff --git a/crates/p2p/src/client/peer_agnostic.rs b/crates/p2p/src/client/peer_agnostic.rs index 62b4538e11..edb2f9d735 100644 --- a/crates/p2p/src/client/peer_agnostic.rs +++ b/crates/p2p/src/client/peer_agnostic.rs @@ -49,8 +49,15 @@ use pathfinder_common::{ TransactionHash, TransactionIndex, }; +use tagged::Tagged; +use tagged_debug_derive::TaggedDebug; use tokio::sync::RwLock; +#[cfg(test)] +mod fixtures; +#[cfg(test)] +mod tests; + use crate::client::conv::{CairoDefinition, FromDto, SierraDefinition, TryFromDto}; use crate::client::peer_aware; use crate::sync::protocol; @@ -104,7 +111,7 @@ impl> Dummy for PeerData { } } -#[derive(Clone, Debug)] +#[derive(Clone, PartialEq, Dummy, TaggedDebug)] pub enum ClassDefinition { Cairo { block_number: BlockNumber, @@ -333,7 +340,7 @@ impl Client { /// determining if the class was really deployed or replaced__. pub fn state_diff_stream( self, - mut start: BlockNumber, + start: BlockNumber, stop: BlockNumber, state_diff_length_and_commitment_stream: impl futures::Stream< Item = anyhow::Result<(usize, StateDiffCommitment)>, @@ -341,350 +348,89 @@ impl Client { ) -> impl futures::Stream< Item = Result, PeerData>, > { - tracing::trace!(?start, ?stop, "Streaming state diffs"); - - async_stream::try_stream! { - pin_mut!(state_diff_length_and_commitment_stream); - - let mut current_count_outer = None; - let mut current_commitment = Default::default(); - - if start <= stop { - // Loop which refreshes peer set once we exhaust it. - 'outer: loop { - let peers = self + let inner = self.inner.clone(); + let outer = self; + make_state_diff_stream( + start, + stop, + state_diff_length_and_commitment_stream, + move || { + let outer = outer.clone(); + async move { + outer .get_update_peers_with_sync_capability(protocol::StateDiffs::NAME) - .await; - - // Attempt each peer. - 'next_peer: for peer in peers { - let peer_err = |e: anyhow::Error| PeerData::new(peer, e); - let limit = stop.get() - start.get() + 1; - - let request = StateDiffsRequest { - iteration: Iteration { - start: start.get().into(), - direction: Direction::Forward, - limit, - step: 1.into(), - }, - }; - - let mut responses = match self - .inner - .send_state_diffs_sync_request(peer, request) - .await - { - Ok(x) => x, - Err(error) => { - // Failed to establish connection, try next peer. - tracing::debug!(%peer, reason=%error, "State diffs request failed"); - continue 'next_peer; - } - }; - - let mut current_count = match current_count_outer { - // Still the same block - Some(backup) => backup, - // Move to the next block - None => { - let (count, commitment) = state_diff_length_and_commitment_stream - .next() - .await - .with_context(|| { - format!("Stream terminated prematurely at block {start}") - }) - .map_err(peer_err)? - .map_err(peer_err)?; - current_count_outer = Some(count); - current_commitment = commitment; - count - } - }; - - tracing::trace!(block_number=%start, expected_responses=%current_count, "Expecting state diff responses"); - - let mut state_diff = StateUpdateData::default(); - - while let Some(state_diff_response) = responses.next().await { - tracing::trace!(?state_diff_response, "Received response"); - - match state_diff_response { - StateDiffsResponse::ContractDiff(ContractDiff { - address, - nonce, - class_hash, - values, - domain: _, - }) => { - let address = ContractAddress(address.0); - - match current_count.checked_sub(values.len()) { - Some(x) => current_count = x, - None => { - tracing::debug!(%peer, %start, "Too many storage diffs: {} > {}", values.len(), current_count); - // TODO punish the peer - continue 'next_peer; - } - } - - if address == ContractAddress::ONE { - let storage = &mut state_diff - .system_contract_updates - .entry(address) - .or_default() - .storage; - values.into_iter().for_each( - |ContractStoredValue { key, value }| { - storage - .insert(StorageAddress(key), StorageValue(value)); - }, - ); - } else { - let update = &mut state_diff - .contract_updates - .entry(address) - .or_default(); - values.into_iter().for_each( - |ContractStoredValue { key, value }| { - update - .storage - .insert(StorageAddress(key), StorageValue(value)); - }, - ); - - if let Some(nonce) = nonce { - match current_count.checked_sub(1) { - Some(x) => current_count = x, - None => { - tracing::debug!(%peer, %start, "Too many nonce updates"); - // TODO punish the peer - continue 'next_peer; - } - } - - update.nonce = Some(ContractNonce(nonce)); - } - - if let Some(class_hash) = class_hash.map(|x| ClassHash(x.0)) { - match current_count.checked_sub(1) { - Some(x) => current_count = x, - None => { - tracing::debug!(%peer, %start, "Too many deployed contracts"); - // TODO punish the peer - continue 'next_peer; - } - } - - update.class = - Some(ContractClassUpdate::Deploy(class_hash)); - } - } - } - StateDiffsResponse::DeclaredClass(DeclaredClass { - class_hash, - compiled_class_hash, - }) => { - if let Some(compiled_class_hash) = compiled_class_hash { - state_diff.declared_sierra_classes.insert( - SierraHash(class_hash.0), - CasmHash(compiled_class_hash.0), - ); - } else { - state_diff - .declared_cairo_classes - .insert(ClassHash(class_hash.0)); - } - - match current_count.checked_sub(1) { - Some(x) => current_count = x, - None => { - tracing::debug!(%peer, %start, "Too many declared classes"); - // TODO punish the peer - continue 'next_peer; - } - } - } - StateDiffsResponse::Fin => { - if state_diff.is_empty() { - if start == stop { - // We're done, terminate the stream - break 'outer; - } - } else { - tracing::debug!(%peer, "Premature state diff stream Fin"); - // TODO punish the peer - continue 'next_peer; - } - } - }; - - if current_count == 0 { - // All the counters for this block have been exhausted which means - // that the state update for this block is complete. - tracing::trace!(block_number=%start, "State diff received for block"); - - yield PeerData::new( - peer, - (UnverifiedStateUpdateData { - expected_commitment: std::mem::take(&mut current_commitment), - state_diff: std::mem::take(&mut state_diff), - }, start), - ); - - if start < stop { - // Move to the next block - start += 1; - tracing::trace!(next_block=%start, "Moving to next block"); - let (count, commitment) = state_diff_length_and_commitment_stream.next().await - .ok_or_else(|| anyhow::anyhow!("Contract update counts stream terminated prematurely at block {start}")) - .map_err(peer_err)? - .map_err(peer_err)?; - current_count = count; - current_count_outer = Some(current_count); - current_commitment = commitment; - - tracing::trace!(number=%current_count, "Expecting state diff responses"); - } - } - } - } + .await } - } - } + }, + move |peer, request| { + let inner = inner.clone(); + async move { inner.send_state_diffs_sync_request(peer, request).await } + }, + ) } pub fn class_definition_stream( self, - mut start: BlockNumber, + start: BlockNumber, stop: BlockNumber, declared_class_counts_stream: impl futures::Stream>, ) -> impl futures::Stream, PeerData>> { - tracing::trace!(?start, ?stop, "Streaming classes"); - - async_stream::try_stream! { - pin_mut!(declared_class_counts_stream); - - let mut current_count_outer = None; - - if start <= stop { - // Loop which refreshes peer set once we exhaust it. - 'outer: loop { - let peers = self + let inner = self.inner.clone(); + let outer = self; + make_class_definition_stream( + start, + stop, + declared_class_counts_stream, + move || { + let outer = outer.clone(); + async move { + outer .get_update_peers_with_sync_capability(protocol::Classes::NAME) - .await; - - // Attempt each peer. - 'next_peer: for peer in peers { - let peer_err = |e: anyhow::Error| PeerData::new(peer, e); - let limit = stop.get() - start.get() + 1; - - let request = ClassesRequest { - iteration: Iteration { - start: start.get().into(), - direction: Direction::Forward, - limit, - step: 1.into(), - }, - }; - - let mut responses = - match self.inner.send_classes_sync_request(peer, request).await { - Ok(x) => x, - Err(error) => { - // Failed to establish connection, try next peer. - tracing::debug!(%peer, reason=%error, "Classes request failed"); - continue 'next_peer; - } - }; - - let mut current_count = match current_count_outer { - // Still the same block - Some(backup) => backup, - // Move to the next block - None => { - let x = declared_class_counts_stream.next().await - .ok_or_else(|| anyhow::anyhow!("Declared class counts stream terminated prematurely at block {start}")) - .map_err(peer_err)? - .map_err(peer_err)?; - current_count_outer = Some(x); - x - } - }; - - while start <= stop { - tracing::trace!(block_number=%start, expected_classes=%current_count, "Expecting class definition responses"); - - let mut class_definitions = Vec::new(); - - while current_count > 0 { - if let Some(class_definition) = responses.next().await { - match class_definition { - ClassesResponse::Class(p2p_proto::class::Class::Cairo0 { - class, - domain: _, - }) => { - let CairoDefinition(definition) = - CairoDefinition::try_from_dto(class).map_err(peer_err)?; - class_definitions.push(ClassDefinition::Cairo { - block_number: start, - definition, - }); - } - ClassesResponse::Class(p2p_proto::class::Class::Cairo1 { - class, - domain: _, - }) => { - let definition = SierraDefinition::try_from_dto(class).map_err(peer_err)?; - class_definitions.push(ClassDefinition::Sierra { - block_number: start, - sierra_definition: definition.0, - }); - } - ClassesResponse::Fin => { - tracing::debug!(%peer, "Received FIN, continuing with next peer"); - continue 'next_peer; - } - } - - current_count -= 1; - } else { - // Stream closed before receiving all expected classes - tracing::debug!(%peer, "Premature class definition stream termination"); - // TODO punish the peer - continue 'next_peer; - } - } - - tracing::trace!(block_number=%start, "All classes received for block"); - - for class_definition in class_definitions { - yield PeerData::new( - peer, - class_definition, - ); - } - - if start == stop { - break 'outer; - } - - start += 1; - current_count = declared_class_counts_stream.next().await - .ok_or_else(|| anyhow::anyhow!("Declared class counts stream terminated prematurely at block {start}")) - .map_err(peer_err)? - .map_err(peer_err)?; - current_count_outer = Some(current_count); - - tracing::trace!(block_number=%start, expected_classes=%current_count, "Expecting class definition responses"); - } + .await + } + }, + move |peer, request| { + let inner = inner.clone(); + async move { inner.send_classes_sync_request(peer, request).await } + }, + ) + } - break 'outer; - } + /// ### Important + /// + /// Events are grouped by block and by transaction. The order of flattened + /// events in a block is guaranteed to be correct because the event + /// commitment is part of block hash. However the number of events per + /// transaction for __pre 0.13.2__ Starknet blocks is __TRUSTED__ + /// because neither signature nor block hash contain this information. + pub fn event_stream( + self, + start: BlockNumber, + stop: BlockNumber, + event_counts_stream: impl futures::Stream>, + ) -> impl futures::Stream< + Item = Result, PeerData>, + > { + let inner = self.inner.clone(); + let outer = self; + make_event_stream( + start, + stop, + event_counts_stream, + move || { + let outer = outer.clone(); + async move { + outer + .get_update_peers_with_sync_capability(protocol::Events::NAME) + .await } - } - } + }, + move |peer, request| { + let inner = inner.clone(); + async move { inner.send_events_sync_request(peer, request).await } + }, + ) } pub async fn events_for_block( @@ -994,136 +740,6 @@ impl Client { Ok(None) } - - /// ### Important - /// - /// Events are grouped by block and by transaction. The order of flattened - /// events in a block is guaranteed to be correct because the event - /// commitment is part of block hash. However the number of events per - /// transaction for __pre 0.13.2__ Starknet blocks is __TRUSTED__ - /// because neither signature nor block hash contain this information. - pub fn event_stream( - self, - mut start: BlockNumber, - stop: BlockNumber, - event_counts_stream: impl futures::Stream>, - ) -> impl futures::Stream>> { - tracing::trace!(?start, ?stop, "Streaming events"); - - async_stream::try_stream! { - pin_mut!(event_counts_stream); - - let mut current_count_outer = None; - - if start <= stop { - // Loop which refreshes peer set once we exhaust it. - 'outer: loop { - let peers = self - .get_update_peers_with_sync_capability(protocol::Events::NAME) - .await; - - // Attempt each peer. - 'next_peer: for peer in peers { - let limit = stop.get() - start.get() + 1; - - let request = EventsRequest { - iteration: Iteration { - start: start.get().into(), - direction: Direction::Forward, - limit, - step: 1.into(), - }, - }; - - let mut responses = - match self.inner.send_events_sync_request(peer, request).await { - Ok(x) => x, - Err(error) => { - // Failed to establish connection, try next peer. - tracing::debug!(%peer, reason=%error, "Events request failed"); - continue 'next_peer; - } - }; - - // Maintain the current transaction hash to group events by transaction - // This grouping is TRUSTED for pre 0.13.2 Starknet blocks. - let mut current_txn_hash = None; - let mut current_count = match current_count_outer { - // Still the same block - Some(backup) => backup, - // Move to the next block - None => { - let x = event_counts_stream.next().await - .ok_or_else(|| anyhow::anyhow!("Event counts stream terminated prematurely at block {start}"))??; - current_count_outer = Some(x); - x - } - }; - - while start <= stop { - tracing::trace!(block_number=%start, expected_responses=%current_count, "Expecting event responses"); - - let mut events: Vec<(TransactionHash, Vec)> = Vec::new(); - - while current_count > 0 { - if let Some(response) = responses.next().await { - match response { - EventsResponse::Event(event) => { - let txn_hash = TransactionHash(event.transaction_hash.0); - let event = Event::try_from_dto(event)?; - - match current_txn_hash { - Some(x) if x == txn_hash => { - // Same transaction - events.last_mut().expect("not empty").1.push(event); - } - None | Some(_) => { - // New transaction - events.push((txn_hash, vec![event])); - current_txn_hash = Some(txn_hash); - } - } - } - EventsResponse::Fin => { - tracing::debug!(%peer, "Received FIN, continuing with next peer"); - continue 'next_peer; - } - }; - - current_count -= 1; - } else { - // Stream closed before receiving all expected events for this block - tracing::debug!(%peer, block_number=%start, "Premature event stream termination"); - // TODO punish the peer - continue 'next_peer; - } - } - - tracing::trace!(block_number=%start, "All events received for block"); - - yield PeerData::new( - peer, - (start, std::mem::take(&mut events)), - ); - - if start == stop { - break 'outer; - } - - start += 1; - current_count = event_counts_stream.next().await - .ok_or_else(|| anyhow::anyhow!("Event counts stream terminated prematurely at block {start}"))??; - current_count_outer = Some(current_count); - - tracing::trace!(next_block=%start, expected_responses=%current_count, "Moving to next block"); - } - - break 'outer; - } - } - } - } - } } pub fn make_transaction_stream( @@ -1216,6 +832,9 @@ where receipt, }, ) => { + // FIXME + // These conversions should all be infallible OR + // we should move to the next peer when failure occurs let t = TransactionVariant::try_from_dto(transaction) .map_err(peer_err)?; let r = Receipt::try_from(( @@ -1225,16 +844,21 @@ where ), )) .map_err(peer_err)?; + match current_count.checked_sub(1) { - Some(x) => current_count = x, + Some(x) => { + current_count = x; + transactions.push((t, r)); + } None => { - tracing::debug!(%peer, %start, "Too many transactions"); + tracing::debug!(%peer, %start, %stop, "Too many transactions"); // TODO punish the peer - continue 'next_peer; + + // We can only get here in case of the last block, which means that the stream should be terminated + debug_assert!(start == stop); + break 'outer; } } - - transactions.push((t, r)); } TransactionsResponse::Fin => { if current_count == 0 { @@ -1303,45 +927,560 @@ where } } -#[derive(Clone, Debug)] -struct PeersWithCapability { - set: HashMap>, - last_update: std::time::Instant, - timeout: Duration, -} - -impl PeersWithCapability { - pub fn new(timeout: Duration) -> Self { - Self { - set: Default::default(), - last_update: std::time::Instant::now(), - timeout, - } - } +pub fn make_state_diff_stream( + mut start: BlockNumber, + stop: BlockNumber, + state_diff_length_and_commitment_stream: impl futures::Stream< + Item = anyhow::Result<(usize, StateDiffCommitment)>, + >, + get_peers: impl Fn() -> PF, + send_request: impl Fn(PeerId, StateDiffsRequest) -> RF, +) -> impl futures::Stream< + Item = Result, PeerData>, +> +where + PF: std::future::Future>, + RF: std::future::Future< + Output = anyhow::Result>, + >, +{ + tracing::trace!(?start, ?stop, "Streaming state diffs"); - /// Does not clear if elapsed, instead the caller is expected to call - /// [`Self::update`] - pub fn get(&self, capability: &str) -> Option<&HashSet> { - if self.last_update.elapsed() > self.timeout { - None - } else { - self.set.get(capability) - } - } + async_stream::try_stream! { + pin_mut!(state_diff_length_and_commitment_stream); - pub fn update(&mut self, capability: &str, peers: HashSet) { - self.last_update = std::time::Instant::now(); - self.set.insert(capability.to_owned(), peers); - } -} + let mut current_count_outer = None; + let mut current_commitment = Default::default(); -impl Default for PeersWithCapability { + if start <= stop { + // Loop which refreshes peer set once we exhaust it. + 'outer: loop { + let peers = get_peers().await; + + // Attempt each peer. + 'next_peer: for peer in peers { + let peer_err = |e: anyhow::Error| PeerData::new(peer, e); + let limit = stop.get() - start.get() + 1; + + let request = StateDiffsRequest { + iteration: Iteration { + start: start.get().into(), + direction: Direction::Forward, + limit, + step: 1.into(), + }, + }; + + let mut responses = match send_request(peer, request).await + { + Ok(x) => x, + Err(error) => { + // Failed to establish connection, try next peer. + tracing::debug!(%peer, reason=%error, "State diffs request failed"); + continue 'next_peer; + } + }; + + let mut current_count = match current_count_outer { + // Still the same block + Some(backup) => backup, + // Move to the next block + None => { + let (count, commitment) = state_diff_length_and_commitment_stream + .next() + .await + .with_context(|| { + format!("Stream terminated prematurely at block {start}") + }) + .map_err(peer_err)? + .map_err(peer_err)?; + current_count_outer = Some(count); + current_commitment = commitment; + count + } + }; + + tracing::trace!(block_number=%start, expected_responses=%current_count, "Expecting state diff responses"); + + let mut state_diff = StateUpdateData::default(); + + while let Some(state_diff_response) = responses.next().await { + tracing::trace!(?state_diff_response, "Received response"); + + match state_diff_response { + StateDiffsResponse::ContractDiff(ContractDiff { + address, + nonce, + class_hash, + values, + domain: _, + }) => { + let address = ContractAddress(address.0); + + match current_count.checked_sub(values.len()) { + Some(x) => current_count = x, + None => { + tracing::debug!(%peer, %start, "Too many storage diffs: {} > {}", values.len(), current_count); + // TODO punish the peer + + // We can only get here in case of the last block, which means that the stream should be terminated + debug_assert!(start == stop); + break 'outer; + } + } + + if address == ContractAddress::ONE { + let storage = &mut state_diff + .system_contract_updates + .entry(address) + .or_default() + .storage; + values.into_iter().for_each( + |ContractStoredValue { key, value }| { + storage + .insert(StorageAddress(key), StorageValue(value)); + }, + ); + } else { + let update = &mut state_diff + .contract_updates + .entry(address) + .or_default(); + values.into_iter().for_each( + |ContractStoredValue { key, value }| { + update + .storage + .insert(StorageAddress(key), StorageValue(value)); + }, + ); + + if let Some(nonce) = nonce { + match current_count.checked_sub(1) { + Some(x) => current_count = x, + None => { + tracing::debug!(%peer, %start, "Too many nonce updates"); + // TODO punish the peer + + // We can only get here in case of the last block, which means that the stream should be terminated + debug_assert!(start == stop); + break 'outer; + } + } + + update.nonce = Some(ContractNonce(nonce)); + } + + if let Some(class_hash) = class_hash.map(|x| ClassHash(x.0)) { + match current_count.checked_sub(1) { + Some(x) => current_count = x, + None => { + tracing::debug!(%peer, %start, "Too many deployed contracts"); + // TODO punish the peer + + // We can only get here in case of the last block, which means that the stream should be terminated + debug_assert!(start == stop); + break 'outer; + } + } + + update.class = + Some(ContractClassUpdate::Deploy(class_hash)); + } + } + } + StateDiffsResponse::DeclaredClass(DeclaredClass { + class_hash, + compiled_class_hash, + }) => { + if let Some(compiled_class_hash) = compiled_class_hash { + state_diff.declared_sierra_classes.insert( + SierraHash(class_hash.0), + CasmHash(compiled_class_hash.0), + ); + } else { + state_diff + .declared_cairo_classes + .insert(ClassHash(class_hash.0)); + } + + match current_count.checked_sub(1) { + Some(x) => current_count = x, + None => { + tracing::debug!(%peer, %start, "Too many declared classes"); + // TODO punish the peer + + // We can only get here in case of the last block, which means that the stream should be terminated + debug_assert!(start == stop); + break 'outer; + } + } + } + StateDiffsResponse::Fin => { + if current_count == 0 { + if start == stop { + // We're done, terminate the stream + break 'outer; + } + } else { + tracing::debug!(%peer, "Premature state diff stream Fin"); + // TODO punish the peer + continue 'next_peer; + } + } + }; + + if current_count == 0 { + // All the counters for this block have been exhausted which means + // that the state update for this block is complete. + tracing::trace!(block_number=%start, "State diff received for block"); + + yield PeerData::new( + peer, + ( + UnverifiedStateUpdateData { + expected_commitment: std::mem::take(&mut current_commitment), + state_diff: std::mem::take(&mut state_diff), + }, + start + ) + ); + + if start < stop { + // Move to the next block + start += 1; + tracing::trace!(next_block=%start, "Moving to next block"); + let (count, commitment) = state_diff_length_and_commitment_stream.next().await + .ok_or_else(|| anyhow::anyhow!("Contract update counts stream terminated prematurely at block {start}")) + .map_err(peer_err)? + .map_err(peer_err)?; + current_count = count; + current_count_outer = Some(current_count); + current_commitment = commitment; + + tracing::trace!(number=%current_count, "Expecting state diff responses"); + } + } + } + + // TODO punish the peer + // If we reach here, the peer did not send a Fin, so the counter for the current block should be reset + // and we should start from the current block again but from the next peer. + tracing::debug!(%peer, "Fin missing"); + } + } + } + } +} + +pub fn make_class_definition_stream( + mut start: BlockNumber, + stop: BlockNumber, + declared_class_counts_stream: impl futures::Stream>, + get_peers: impl Fn() -> PF, + send_request: impl Fn(PeerId, ClassesRequest) -> RF, +) -> impl futures::Stream, PeerData>> +where + PF: std::future::Future>, + RF: std::future::Future< + Output = anyhow::Result>, + >, +{ + tracing::trace!(?start, ?stop, "Streaming classes"); + + async_stream::try_stream! { + pin_mut!(declared_class_counts_stream); + + let mut current_count_outer = None; + + if start <= stop { + // Loop which refreshes peer set once we exhaust it. + 'outer: loop { + let peers = get_peers().await; + + // Attempt each peer. + 'next_peer: for peer in peers { + let peer_err = |e: anyhow::Error| PeerData::new(peer, e); + let limit = stop.get() - start.get() + 1; + + let request = ClassesRequest { + iteration: Iteration { + start: start.get().into(), + direction: Direction::Forward, + limit, + step: 1.into(), + }, + }; + + let mut responses = + match send_request(peer, request).await { + Ok(x) => x, + Err(error) => { + // Failed to establish connection, try next peer. + tracing::debug!(%peer, reason=%error, "Classes request failed"); + continue 'next_peer; + } + }; + + let mut current_count = match current_count_outer { + // Still the same block + Some(backup) => backup, + // Move to the next block + None => { + let x = declared_class_counts_stream.next().await + .ok_or_else(|| anyhow::anyhow!("Declared class counts stream terminated prematurely at block {start}")) + .map_err(peer_err)? + .map_err(peer_err)?; + current_count_outer = Some(x); + x + } + }; + + while start <= stop { + tracing::trace!(block_number=%start, expected_classes=%current_count, "Expecting class definition responses"); + + let mut class_definitions = Vec::new(); + + while current_count > 0 { + if let Some(class_definition) = responses.next().await { + match class_definition { + ClassesResponse::Class(p2p_proto::class::Class::Cairo0 { + class, + domain: _, + }) => { + let CairoDefinition(definition) = + CairoDefinition::try_from_dto(class).map_err(peer_err)?; + class_definitions.push(ClassDefinition::Cairo { + block_number: start, + definition, + }); + } + ClassesResponse::Class(p2p_proto::class::Class::Cairo1 { + class, + domain: _, + }) => { + let definition = SierraDefinition::try_from_dto(class).map_err(peer_err)?; + class_definitions.push(ClassDefinition::Sierra { + block_number: start, + sierra_definition: definition.0, + }); + } + ClassesResponse::Fin => { + tracing::debug!(%peer, "Received FIN, continuing with next peer"); + continue 'next_peer; + } + } + + current_count -= 1; + } else { + // Stream closed before receiving all expected classes + tracing::debug!(%peer, "Premature class definition stream termination"); + // TODO punish the peer + continue 'next_peer; + } + } + + tracing::trace!(block_number=%start, "All classes received for block"); + + for class_definition in class_definitions { + yield PeerData::new( + peer, + class_definition, + ); + } + + if start == stop { + break 'outer; + } + + start += 1; + current_count = declared_class_counts_stream.next().await + .ok_or_else(|| anyhow::anyhow!("Declared class counts stream terminated prematurely at block {start}")) + .map_err(peer_err)? + .map_err(peer_err)?; + current_count_outer = Some(current_count); + + tracing::trace!(block_number=%start, expected_classes=%current_count, "Expecting class definition responses"); + } + + break 'outer; + } + } + } + } +} + +pub fn make_event_stream( + mut start: BlockNumber, + stop: BlockNumber, + event_counts_stream: impl futures::Stream>, + get_peers: impl Fn() -> PF, + send_request: impl Fn(PeerId, EventsRequest) -> RF, +) -> impl futures::Stream, PeerData>> +where + PF: std::future::Future>, + RF: std::future::Future< + Output = anyhow::Result>, + >, +{ + tracing::trace!(?start, ?stop, "Streaming events"); + + async_stream::try_stream! { + pin_mut!(event_counts_stream); + + let mut current_count_outer = None; + + if start <= stop { + // Loop which refreshes peer set once we exhaust it. + 'outer: loop { + let peers = get_peers().await; + + // Attempt each peer. + 'next_peer: for peer in peers { + let peer_err = |e: anyhow::Error| PeerData::new(peer, e); + let limit = stop.get() - start.get() + 1; + + let request = EventsRequest { + iteration: Iteration { + start: start.get().into(), + direction: Direction::Forward, + limit, + step: 1.into(), + }, + }; + + let mut responses = + match send_request(peer, request).await { + Ok(x) => x, + Err(error) => { + // Failed to establish connection, try next peer. + tracing::debug!(%peer, reason=%error, "Events request failed"); + continue 'next_peer; + } + }; + + // Maintain the current transaction hash to group events by transaction + // This grouping is TRUSTED for pre 0.13.2 Starknet blocks. + let mut current_txn_hash = None; + let mut current_count = match current_count_outer { + // Still the same block + Some(backup) => backup, + // Move to the next block + None => { + let x = event_counts_stream.next().await + .ok_or_else(|| anyhow::anyhow!("Event counts stream terminated prematurely at block {start}")) + .map_err(peer_err)? + .map_err(peer_err)?; + current_count_outer = Some(x); + x + } + }; + + while start <= stop { + tracing::trace!(block_number=%start, expected_responses=%current_count, "Expecting event responses"); + + let mut events: Vec<(TransactionHash, Vec)> = Vec::new(); + + while current_count > 0 { + if let Some(response) = responses.next().await { + match response { + EventsResponse::Event(event) => { + let txn_hash = TransactionHash(event.transaction_hash.0); + let event = Event::try_from_dto(event).map_err(peer_err)?; + + match current_txn_hash { + Some(x) if x == txn_hash => { + // Same transaction + events.last_mut().expect("not empty").1.push(event); + } + None | Some(_) => { + // New transaction + events.push((txn_hash, vec![event])); + current_txn_hash = Some(txn_hash); + } + } + } + EventsResponse::Fin => { + tracing::debug!(%peer, "Received FIN, continuing with next peer"); + continue 'next_peer; + } + }; + + current_count -= 1; + } else { + // Stream closed before receiving all expected events for this block + tracing::debug!(%peer, block_number=%start, "Premature event stream termination"); + // TODO punish the peer + continue 'next_peer; + } + } + + tracing::trace!(block_number=%start, "All events received for block"); + + yield PeerData::new( + peer, + (start, std::mem::take(&mut events)), + ); + + if start == stop { + break 'outer; + } + + start += 1; + current_count = event_counts_stream.next().await + .ok_or_else(|| anyhow::anyhow!("Event counts stream terminated prematurely at block {start}")) + .map_err(peer_err)? + .map_err(peer_err)?; + current_count_outer = Some(current_count); + + tracing::trace!(next_block=%start, expected_responses=%current_count, "Moving to next block"); + } + + break 'outer; + } + } + } + } +} + +#[derive(Clone, Debug)] +struct PeersWithCapability { + set: HashMap>, + last_update: std::time::Instant, + timeout: Duration, +} + +impl PeersWithCapability { + pub fn new(timeout: Duration) -> Self { + Self { + set: Default::default(), + last_update: std::time::Instant::now(), + timeout, + } + } + + /// Does not clear if elapsed, instead the caller is expected to call + /// [`Self::update`] + pub fn get(&self, capability: &str) -> Option<&HashSet> { + if self.last_update.elapsed() > self.timeout { + None + } else { + self.set.get(capability) + } + } + + pub fn update(&mut self, capability: &str, peers: HashSet) { + self.last_update = std::time::Instant::now(); + self.set.insert(capability.to_owned(), peers); + } +} + +impl Default for PeersWithCapability { fn default() -> Self { Self::new(Duration::from_secs(60)) } } -#[derive(Clone, Default, Debug, PartialEq, Eq, Dummy)] +#[derive(Clone, Debug, Default, PartialEq, Eq, Dummy)] pub struct Receipt { pub actual_fee: Fee, pub execution_resources: ExecutionResources, @@ -1372,7 +1511,7 @@ pub struct UnverifiedTransactionData { pub type UnverifiedTransactionDataWithBlockNumber = (UnverifiedTransactionData, BlockNumber); /// For a single block -#[derive(Clone, Debug)] +#[derive(Clone, PartialEq, Dummy, TaggedDebug)] pub struct UnverifiedStateUpdateData { pub expected_commitment: StateDiffCommitment, pub state_diff: StateUpdateData, @@ -1515,252 +1654,3 @@ impl std::fmt::Display for ClassDefinitionsError { } } } - -#[cfg(test)] -mod tests { - use std::collections::VecDeque; - - use fake::{Fake, Faker}; - use futures::channel::mpsc; - use futures::{stream, SinkExt, TryStreamExt}; - use rstest::rstest; - use tagged::Tagged; - use tagged_debug_derive::TaggedDebug; - use tokio::sync::Mutex; - - use super::*; - use crate::client::conv::ToDto; - - #[derive(Clone, PartialEq, TaggedDebug)] - struct TestPeer(PeerId); - - #[derive(Clone, Dummy, PartialEq, TaggedDebug)] - struct TestTxn((TransactionVariant, Receipt)); - - fn peer(tag: i32) -> TestPeer { - Tagged::::get(format!("peer {tag}"), || TestPeer(PeerId::random())).data - } - - fn txn(tag: i32, transaction_index: u64) -> TestTxn { - let x = Tagged::get(format!("txn {tag}"), || { - let mut x = Faker.fake::(); - x.0 .1.transaction_index = TransactionIndex::new_or_panic(transaction_index); - x - }); - x.data - } - - type TestResponse = Result<(TestPeer, Vec, Option), TestPeer>; - - use TransactionsResponse::Fin; - - #[rstest] - #[case::one_peer_1_block( - // Number of blocks - 1, - // Simulated responses from peers - vec![Ok((peer(0), vec![txn(0, 0), txn(1, 1)], Some(Fin)))], - // Expected number of transactions per block - vec![2], - // Expected stream of (peer_id, transactions_for_block) - vec![Ok((peer(0), vec![txn(0, 0), txn(1, 1)]))] - )] - #[case::one_peer_2_blocks( - // Peer gives responses for all blocks in one go - 2, - vec![Ok((peer(0), vec![txn(4, 0), txn(5, 0)], Some(Fin)))], - vec![1, 1], - vec![ - Ok((peer(0), vec![txn(4, 0)])), // block 0 - Ok((peer(0), vec![txn(5, 0)])) // block 1 - ] - )] - #[case::one_peer_2_blocks_in_2_attempts( - // Peer gives a response for the second block after a retry - 2, - vec![ - Ok((peer(0), vec![txn(6, 0)], Some(Fin))), - Ok((peer(0), vec![txn(7, 0)], Some(Fin))) - ], - vec![1, 1], - vec![ - Ok((peer(0), vec![txn(6, 0)])), - Ok((peer(0), vec![txn(7, 0)])) - ] - )] - #[case::two_peers_1_block_per_peer( - 2, - vec![ - Ok((peer(0), vec![txn(8, 0)], Some(Fin))), - Ok((peer(1), vec![txn(9, 0)], Some(Fin))) - ], - vec![1, 1], - vec![ - Ok((peer(0), vec![txn(8, 0)])), - Ok((peer(1), vec![txn(9, 1)])) - ] - )] - #[case::first_peer_premature_eos_with_fin( - 2, - vec![ - // First peer gives full block 0 and half of block 1 - Ok((peer(0), vec![txn(10, 0), txn(11, 0)], Some(Fin))), - Ok((peer(1), vec![txn(11, 0), txn(12, 1)], Some(Fin))) - ], - vec![1, 2], - vec![ - Ok((peer(0), vec![txn(10, 0)])), - Ok((peer(1), vec![txn(11, 0), txn(12, 1)])) - ] - )] - #[case::first_peer_all_txns_in_block_but_no_fin( - 2, - vec![ - // First peer gives full block 0 but no fin - Ok((peer(0), vec![txn(13, 0)], None)), - Ok((peer(1), vec![txn(14, 0)], Some(Fin))) - ], - vec![1, 1], - vec![ - // We assume this block 0 could be correct - Ok((peer(0), vec![txn(13, 0)])), // block 0 - Ok((peer(1), vec![txn(14, 0)])) // block 1 - ] - )] - // The same as above but the first peer gives half of the second block before closing the stream - #[case::first_peer_half_txns_in_block_but_no_fin( - 2, - vec![ - // First peer gives full block 0 and partial block 1 but no fin - Ok((peer(0), vec![txn(15, 0), txn(16, 0)], None)), - Ok((peer(1), vec![txn(16, 0), txn(17, 1)], Some(Fin))) - ], - vec![1, 2], - vec![ - // We assume this block could be correct so we move to the next one - Ok((peer(0), vec![txn(15, 0)])), // block 0 - Ok((peer(1), vec![txn(16, 0), txn(17, 1)])) // block 1 - ] - )] - #[case::count_steam_is_too_short( - 2, - vec![ - // 2 blocks in responses - Ok((peer(0), vec![txn(18, 0)], Some(Fin))), - Ok((peer(0), vec![txn(19, 0)], Some(Fin))) - ], - vec![1], // but only 1 block provided in the count stream - vec![ - Ok((peer(0), vec![txn(18, 0)])), - Err(peer(0)) // the second block is not processed - ] - )] - #[case::response_fails( - 2, - vec![ - Ok((peer(0), vec![txn(20, 0)], Some(Fin))), - Err(peer(0)), - Ok((peer(1), vec![txn(21, 0)], Some(Fin))), - ], - vec![1, 1], - vec![ - Ok((peer(0), vec![txn(20, 0)])), - Ok((peer(1), vec![txn(21, 0)])), - ] - )] - #[test_log::test(tokio::test)] - async fn make_transaction_stream( - #[case] num_blocks: usize, - #[case] responses: Vec, - #[case] num_txns_per_block: Vec, - #[case] expected_stream: Vec), TestPeer>>, - ) { - let _ = env_logger::builder().is_test(true).try_init(); - - let peers = responses - .iter() - .map(|r| match r { - Ok((p, _, _)) => p.0, - Err(p) => p.0, - }) - .collect::>(); - let responses = Arc::new(Mutex::new( - responses - .into_iter() - .map(|r| r.map(|(_, txns, fin)| (txns, fin))) - .collect::>(), - )); - - let get_peers = || async { peers.clone() }; - - let send_request = |peer: PeerId, req: TransactionsRequest| { - let p = TestPeer(peer); - - tracing::trace!(peer=?p, ?req, "Got request"); - - async { - let mut guard = responses.lock().await; - match guard.pop_front() { - Some(Ok((mut txns, fin))) => { - txns.iter_mut() - .enumerate() - .for_each(|(i, TestTxn((_, r)))| { - r.transaction_index = TransactionIndex::new_or_panic(i as u64); - }); - - let (mut tx, rx) = mpsc::channel(txns.len() + 1); - for TestTxn((t, r)) in txns { - tx.send(TransactionsResponse::TransactionWithReceipt( - TransactionWithReceipt { - receipt: (&t, r).to_dto(), - transaction: t.to_dto(), - }, - )) - .await - .unwrap(); - } - if fin.is_some() { - tx.send(TransactionsResponse::Fin).await.unwrap(); - } - Ok(rx) - } - Some(Err(_)) => Err(anyhow::anyhow!("peer failed")), - None => { - panic!("fix your assumed responses") - } - } - } - }; - - let start = BlockNumber::GENESIS; - let stop = start + (num_blocks - 1) as u64; - - let actual = super::make_transaction_stream( - start, - stop, - stream::iter( - num_txns_per_block - .into_iter() - .map(|x| Ok((x, Default::default()))), - ), - get_peers, - send_request, - ) - .map_ok(|x| { - ( - TestPeer(x.peer), - x.data - .0 - .transactions - .into_iter() - .map(TestTxn) - .collect::>(), - ) - }) - .map_err(|x| TestPeer(x.peer)) - .collect::>() - .await; - - pretty_assertions_sorted::assert_eq!(actual, expected_stream); - } -} diff --git a/crates/p2p/src/client/peer_agnostic/fixtures.rs b/crates/p2p/src/client/peer_agnostic/fixtures.rs new file mode 100644 index 0000000000..9be90bf90c --- /dev/null +++ b/crates/p2p/src/client/peer_agnostic/fixtures.rs @@ -0,0 +1,330 @@ +use std::collections::VecDeque; +use std::sync::Arc; + +use fake::{Dummy, Fake, Faker}; +use futures::channel::mpsc; +use futures::SinkExt; +use libp2p::PeerId; +use p2p_proto::class::{Class, ClassesResponse}; +use p2p_proto::common::{Address, Hash, VolitionDomain}; +use p2p_proto::event::EventsResponse; +use p2p_proto::state::{ContractDiff, ContractStoredValue, DeclaredClass, StateDiffsResponse}; +use p2p_proto::transaction::{TransactionWithReceipt, TransactionsResponse}; +use pathfinder_common::event::Event; +use pathfinder_common::state_update::{ContractClassUpdate, ContractUpdate, StateUpdateData}; +use pathfinder_common::transaction::TransactionVariant; +use pathfinder_common::{ + BlockNumber, + CasmHash, + ClassHash, + ContractAddress, + SierraHash, + TransactionHash, + TransactionIndex, +}; +use tagged::Tagged; +use tagged_debug_derive::TaggedDebug; +use tokio::sync::Mutex; + +use super::{ClassDefinition, UnverifiedStateUpdateData}; +use crate::client::conv::{CairoDefinition, SierraDefinition, ToDto, TryFromDto}; +use crate::client::peer_agnostic::Receipt; + +#[derive(Clone, PartialEq, TaggedDebug)] +pub struct TestPeer(pub PeerId); + +#[derive(Clone, Dummy, PartialEq, TaggedDebug)] +pub struct TestTxn { + pub t: TransactionVariant, + pub r: Receipt, +} + +#[derive(Copy, Clone, Dummy, PartialEq, TaggedDebug)] +pub struct TaggedTransactionHash(pub TransactionHash); + +pub type TaggedEventsForBlockByTransaction = + (BlockNumber, Vec<(TaggedTransactionHash, Vec)>); + +impl TestTxn { + pub fn new((t, r): (TransactionVariant, Receipt)) -> Self { + Self { t, r } + } +} + +pub fn peer(tag: i32) -> TestPeer { + tagged::init(); + Tagged::::get(format!("peer {tag}"), || TestPeer(PeerId::random())) + .unwrap() + .data +} + +#[allow(clippy::type_complexity)] +pub fn unzip_fixtures( + responses: Vec), TestPeer>>, +) -> (Vec, Arc, TestPeer>>>>) { + let peers = responses + .iter() + .map(|r| match r { + Ok((p, _)) => p.0, + Err(p) => p.0, + }) + .collect::>(); + let responses = Arc::new(Mutex::new( + responses + .into_iter() + .map(|r| r.map(|(_, responses)| responses)) + .collect::>(), + )); + (peers, responses) +} + +#[allow(clippy::type_complexity)] +pub async fn send_request( + responses: Arc, TestPeer>>>>, +) -> anyhow::Result> { + let mut guard = responses.lock().await; + match guard.pop_front() { + Some(Ok(responses)) => { + let (mut tx, rx) = mpsc::channel(responses.len() + 1); + for r in responses { + tx.send(r).await.unwrap(); + } + Ok(rx) + } + Some(Err(_)) => Err(anyhow::anyhow!("peer failed")), + None => { + panic!("fix your assumed responses") + } + } +} + +pub fn txn_resp(tag: i32, transaction_index: u64) -> TransactionsResponse { + let TestTxn { t, r } = txn(tag, transaction_index); + let resp = TransactionsResponse::TransactionWithReceipt(TransactionWithReceipt { + receipt: (&t, r).to_dto(), + transaction: t.to_dto(), + }); + Tagged::get(format!("txn resp {tag}"), || resp) + .unwrap() + .data +} + +pub fn txn(tag: i32, transaction_index: u64) -> TestTxn { + Tagged::get(format!("txn {tag}"), || { + let mut x = Faker.fake::(); + x.r.transaction_index = TransactionIndex::new_or_panic(transaction_index); + x + }) + .unwrap() + .data +} + +pub fn contract_diff(tag: i32) -> StateDiffsResponse { + let sd = state_diff(tag).state_diff; + let (a, u) = sd + .contract_updates + .into_iter() + .chain(sd.system_contract_updates.into_iter().map(|(a, u)| { + ( + a, + ContractUpdate { + storage: u.storage, + ..Default::default() + }, + ) + })) + .next() + .unwrap(); + StateDiffsResponse::ContractDiff( + Tagged::get(format!("contract diff response {tag}"), || ContractDiff { + address: Address(a.0), + nonce: u.nonce.map(|x| x.0), + class_hash: u.class.map(|x| Hash(x.class_hash().0)), + values: u + .storage + .into_iter() + .map(|(k, v)| ContractStoredValue { + key: k.0, + value: v.0, + }) + .collect(), + domain: VolitionDomain::L1, + }) + .unwrap() + .data, + ) +} + +pub fn declared_class(tag: i32) -> StateDiffsResponse { + let sd = state_diff(tag).state_diff; + let (class_hash, compiled_class_hash) = sd + .declared_sierra_classes + .into_iter() + .map(|(s, c)| (Hash(s.0), Some(Hash(c.0)))) + .chain( + sd.declared_cairo_classes + .into_iter() + .map(|c| (Hash(c.0), None)), + ) + .next() + .unwrap(); + + StateDiffsResponse::DeclaredClass( + Tagged::get(format!("declared class {tag}"), || DeclaredClass { + class_hash, + compiled_class_hash, + }) + .unwrap() + .data, + ) +} + +pub fn state_diff(tag: i32) -> UnverifiedStateUpdateData { + let (declared_cairo_classes, declared_sierra_classes) = match Faker.fake::>() { + Some(x) => ([].into(), [(SierraHash(Faker.fake()), x)].into()), + None => ([ClassHash(Faker.fake())].into(), [].into()), + }; + let (contract_updates, system_contract_updates) = if Faker.fake() { + ( + [( + Faker.fake(), + ContractUpdate { + storage: Faker.fake(), + class: Some(ContractClassUpdate::Deploy(Faker.fake())), + nonce: Faker.fake(), + }, + )] + .into(), + [].into(), + ) + } else { + ([].into(), [(ContractAddress::ONE, Faker.fake())].into()) + }; + Tagged::get(format!("state diff {tag}"), || UnverifiedStateUpdateData { + expected_commitment: Default::default(), + state_diff: StateUpdateData { + contract_updates, + system_contract_updates, + declared_cairo_classes, + declared_sierra_classes, + }, + }) + .unwrap() + .data +} + +pub fn len(tag: i32) -> usize { + state_diff(tag).state_diff.state_diff_length() +} + +pub fn surplus_storage() -> StateDiffsResponse { + StateDiffsResponse::ContractDiff(ContractDiff { + address: Faker.fake(), + nonce: None, + class_hash: None, + values: vec![Faker.fake()], // Must not be empty + domain: Faker.fake(), + }) +} + +pub fn surplus_nonce() -> StateDiffsResponse { + StateDiffsResponse::ContractDiff(ContractDiff { + address: Faker.fake(), + nonce: Some(Faker.fake()), + class_hash: None, + values: vec![], + domain: Faker.fake(), + }) +} + +pub fn surplus_class() -> StateDiffsResponse { + StateDiffsResponse::ContractDiff(ContractDiff { + address: Faker.fake(), + nonce: None, + class_hash: Some(Faker.fake()), + values: vec![], + domain: Faker.fake(), + }) +} + +pub fn class_resp(tag: i32) -> ClassesResponse { + use pathfinder_common::class_definition::ClassDefinition; + let c = Tagged::::get(format!("class response {tag}"), || { + let c = Faker.fake::>(); + match c { + ClassDefinition::Sierra(s) => Class::Cairo1 { + class: s.to_dto(), + domain: 0, + }, + ClassDefinition::Cairo(c) => Class::Cairo0 { + class: c.to_dto(), + domain: 0, + }, + } + }) + .unwrap() + .data; + ClassesResponse::Class(c) +} + +pub fn class(tag: i32, block_number: u64) -> ClassDefinition { + let block_number = BlockNumber::new_or_panic(block_number); + match class_resp(tag) { + ClassesResponse::Class(Class::Cairo0 { class, .. }) => { + Tagged::get(format!("class {tag}"), || ClassDefinition::Cairo { + block_number, + definition: CairoDefinition::try_from_dto(class).unwrap().0, + }) + .unwrap() + .data + } + ClassesResponse::Class(Class::Cairo1 { class, .. }) => { + Tagged::get(format!("class {tag}"), || ClassDefinition::Sierra { + block_number, + sierra_definition: SierraDefinition::try_from_dto(class).unwrap().0, + }) + .unwrap() + .data + } + ClassesResponse::Fin => unreachable!(), + } +} + +pub fn event_resp(ev: i32, txn: i32) -> EventsResponse { + let (_, mut v) = events(vec![(vec![ev], txn)], 0); + let t = v[0].0; + let e = v.pop().unwrap().1.pop().unwrap(); + + let e = p2p_proto::event::Event { + transaction_hash: Hash(t.0 .0), + from_address: e.from_address.0, + keys: e.keys.iter().map(|x| x.0).collect(), + data: e.data.iter().map(|x| x.0).collect(), + }; + + EventsResponse::Event( + Tagged::::get(format!("event response {ev}, txn {txn}"), || e) + .unwrap() + .data, + ) +} + +pub fn events( + events_by_txn: Vec<(Vec, i32)>, + block: u64, +) -> TaggedEventsForBlockByTransaction { + let events_by_txn = events_by_txn + .into_iter() + .map(|(evs, txn)| { + let evs = evs + .into_iter() + .map(|ev| Tagged::get_fake(format!("event {ev}")).unwrap().data) + .collect(); + let t = Tagged::::get_fake(format!("txn hash {txn}")) + .unwrap() + .data; + (t, evs) + }) + .collect(); + (BlockNumber::new_or_panic(block), events_by_txn) +} diff --git a/crates/p2p/src/client/peer_agnostic/tests.rs b/crates/p2p/src/client/peer_agnostic/tests.rs new file mode 100644 index 0000000000..8316ad9549 --- /dev/null +++ b/crates/p2p/src/client/peer_agnostic/tests.rs @@ -0,0 +1,672 @@ +use futures::{stream, TryStreamExt}; +use rstest::rstest; +use ClassesResponse::Fin as ClassFin; +use EventsResponse::Fin as EventFin; +use StateDiffsResponse::Fin as SDFin; +use TransactionsResponse::Fin as TxnFin; + +use super::*; +use crate::client::peer_agnostic::fixtures::*; + +#[rstest] +#[case::one_peer_1_block( + 1, + // Simulated responses + // transaction transaction index + // | | + vec![Ok((peer(0), vec![txn_resp(0, 0), txn_resp(1, 1), TxnFin]))], + // Expected number of transactions per block + vec![2], + // Expected stream + // transaction transaction index + // | | + vec![Ok((peer(0), vec![txn(0, 0), txn(1, 1)]))] +)] +#[case::one_peer_2_blocks( + // Peer gives responses for all blocks in one go + 2, + vec![Ok((peer(0), vec![txn_resp(2, 0), txn_resp(3, 0), TxnFin]))], + vec![1, 1], + vec![ + Ok((peer(0), vec![txn(2, 0)])), // block 0 + Ok((peer(0), vec![txn(3, 0)])) // block 1 + ] +)] +#[case::one_peer_2_blocks_in_2_attempts( + // Peer gives a response for the second block after a retry + 2, + vec![ + Ok((peer(0), vec![txn_resp(4, 0), TxnFin])), + Ok((peer(0), vec![txn_resp(5, 0), TxnFin])) + ], + vec![1, 1], + vec![ + Ok((peer(0), vec![txn(4, 0)])), + Ok((peer(0), vec![txn(5, 0)])) + ] +)] +#[case::two_peers_1_block_per_peer( + 2, + vec![ + // Errors are ignored + Err(peer(1)), + Ok((peer(0), vec![txn_resp(6, 0), TxnFin])), + Err(peer(0)), + Ok((peer(1), vec![txn_resp(7, 0), TxnFin])) + ], + vec![1, 1], + vec![ + Ok((peer(0), vec![txn(6, 0)])), + Ok((peer(1), vec![txn(7, 0)])) + ] +)] +#[case::first_peer_premature_eos_with_fin( + 2, + vec![ + // First peer gives full block 0 and half of block 1 + Ok((peer(0), vec![txn_resp(8, 0), txn_resp(9, 0), TxnFin])), + Ok((peer(1), vec![txn_resp(9, 0), txn_resp(10, 1), TxnFin])) + ], + vec![1, 2], + vec![ + Ok((peer(0), vec![txn(8, 0)])), + Ok((peer(1), vec![txn(9, 0), txn(10, 1)])) + ] +)] +#[case::first_peer_full_block_no_fin( + 2, + vec![ + // First peer gives full block 0 but no fin + Ok((peer(0), vec![txn_resp(11, 0)])), + Ok((peer(1), vec![txn_resp(12, 0), TxnFin])) + ], + vec![1, 1], + vec![ + // We assume this block 0 could be correct + Ok((peer(0), vec![txn(11, 0)])), // block 0 + Ok((peer(1), vec![txn(12, 0)])) // block 1 + ] +)] +// The same as above but the first peer gives half of the second block before closing the +// stream +#[case::first_peer_half_block_no_fin( + 2, + vec![ + // First peer gives full block 0 and partial block 1 but no fin + Ok((peer(0), vec![txn_resp(13, 0), txn_resp(14, 0)])), + Ok((peer(1), vec![txn_resp(14, 0), txn_resp(15, 1), TxnFin])) + ], + vec![1, 2], + vec![ + // We assume this block could be correct so we move to the next one + Ok((peer(0), vec![txn(13, 0)])), // block 0 + Ok((peer(1), vec![txn(14, 0), txn(15, 1)])) // block 1 + ] +)] +#[case::count_steam_is_too_short( + 2, + vec![ + // 2 blocks in responses + Ok((peer(0), vec![txn_resp(16, 0), TxnFin])), + Ok((peer(0), vec![txn_resp(17, 0), TxnFin])) + ], + vec![1], // but only 1 block provided in the count stream + vec![ + Ok((peer(0), vec![txn(16, 0)])), + Err(peer(0)) // the second block is not processed + ] +)] +#[case::too_many_responses( + 1, + vec![Ok((peer(0), vec![txn_resp(18, 0), txn_resp(19, 0), TxnFin]))], + vec![1], + vec![Ok((peer(0), vec![txn(18, 0)]))] +)] +#[case::empty_responses_are_ignored( + 1, + vec![ + Ok((peer(0), vec![])), + Ok((peer(1), vec![txn_resp(20, 0), TxnFin])), + Ok((peer(2), vec![])) + ], + vec![1], + vec![Ok((peer(1), vec![txn(20, 0)]))] +)] +#[test_log::test(tokio::test)] +async fn make_transaction_stream( + #[case] num_blocks: usize, + #[case] responses: Vec), TestPeer>>, + #[case] num_txns_per_block: Vec, + #[case] expected_stream: Vec), TestPeer>>, +) { + let _ = env_logger::builder().is_test(true).try_init(); + let (peers, responses) = unzip_fixtures(responses); + let get_peers = || async { peers.clone() }; + let send_request = + |_: PeerId, _: TransactionsRequest| async { send_request(responses.clone()).await }; + + let start = BlockNumber::GENESIS; + let stop = start + (num_blocks - 1) as u64; + + let actual = super::make_transaction_stream( + start, + stop, + stream::iter( + num_txns_per_block + .into_iter() + .map(|x| Ok((x, Default::default()))), + ), + get_peers, + send_request, + ) + .map_ok(|x| { + ( + TestPeer(x.peer), + x.data + .0 + .transactions + .into_iter() + .map(TestTxn::new) + .collect(), + ) + }) + .map_err(|x| TestPeer(x.peer)) + .collect::>() + .await; + + pretty_assertions_sorted::assert_eq!(actual, expected_stream); +} + +#[rstest] +#[case::one_peer_1_block( + 1, + vec![Ok((peer(0), vec![contract_diff(0), declared_class(0), SDFin]))], + vec![len(0)], + vec![Ok((peer(0), state_diff(0)))] +)] +#[case::one_peer_2_blocks( + 2, + vec![Ok((peer(0), vec![contract_diff(1), declared_class(1), contract_diff(2), declared_class(2), SDFin]))], + vec![len(1), len(2)], + vec![ + Ok((peer(0), state_diff(1))), + Ok((peer(0), state_diff(2))) + ] +)] +#[case::one_peer_2_blocks_in_2_attempts( + // Peer gives a response for the second block after a retry + 2, + vec![ + Ok((peer(0), vec![contract_diff(3), declared_class(3), SDFin])), + Ok((peer(0), vec![contract_diff(4), declared_class(4), SDFin])), + ], + vec![len(3), len(4)], + vec![ + Ok((peer(0), state_diff(3))), + Ok((peer(0), state_diff(4))) + ] +)] +#[case::two_peers_1_block_per_peer( + 2, + vec![ + // Errors are ignored + Err(peer(1)), + Ok((peer(0), vec![contract_diff(5), declared_class(5), SDFin])), + Err(peer(0)), + Ok((peer(1), vec![contract_diff(6), declared_class(6), SDFin])), + ], + vec![len(5), len(6)], + vec![ + Ok((peer(0), state_diff(5))), + Ok((peer(1), state_diff(6))) + ] +)] +#[case::first_peer_premature_eos_with_fin( + 2, + vec![ + // First peer gives full block 0 and half of block 1 + Ok((peer(0), vec![contract_diff(7), declared_class(7), contract_diff(8), SDFin])), + Ok((peer(1), vec![contract_diff(8), declared_class(8), SDFin])) + ], + vec![len(7), len(8)], + vec![ + Ok((peer(0), state_diff(7))), + Ok((peer(1), state_diff(8))) + ] +)] +#[case::first_peer_full_block_no_fin( + 2, + vec![ + // First peer gives full block 0 but no fin + Ok((peer(0), vec![contract_diff(9), declared_class(9)])), + Ok((peer(1), vec![contract_diff(10), declared_class(10), SDFin])) + ], + vec![len(9), len(10)], + vec![ + Ok((peer(0), state_diff(9))), + Ok((peer(1), state_diff(10))) + ] +)] +// The same as above but the first peer gives half of the second block before closing the +// stream +#[case::first_peer_half_block_no_fin( + 2, + vec![ + // First peer gives full block 0 and partial block 1 but no fin + Ok((peer(0), vec![contract_diff(11), declared_class(11), contract_diff(12)])), + Ok((peer(1), vec![contract_diff(12), declared_class(12), SDFin])), + ], + vec![len(11), len(12)], + vec![ + Ok((peer(0), state_diff(11))), + Ok((peer(1), state_diff(12))) + ] +)] +#[case::count_steam_is_too_short( + 2, + vec![ + // 2 blocks in responses + Ok((peer(0), vec![contract_diff(13), declared_class(13), SDFin])), + Ok((peer(0), vec![contract_diff(14), declared_class(14), SDFin])) + ], + vec![len(13)], // but only 1 block provided in the count stream + vec![ + Ok((peer(0), state_diff(13))), + Err(peer(0)) // the second block is not processed + ] +)] +#[case::too_many_responses_storage( + 1, + vec![Ok((peer(0), vec![contract_diff(15), declared_class(15), surplus_storage(), SDFin]))], + vec![len(15)], + vec![Ok((peer(0), state_diff(15)))] +)] +#[case::too_many_responses_nonce( + 1, + vec![Ok((peer(0), vec![contract_diff(16), declared_class(16), surplus_nonce(), SDFin]))], + vec![len(16)], + vec![Ok((peer(0), state_diff(16)))] +)] +#[case::too_many_responses_class( + 1, + vec![Ok((peer(0), vec![contract_diff(17), declared_class(17), surplus_class(), SDFin]))], + vec![len(17)], + vec![Ok((peer(0), state_diff(17)))] +)] +#[case::too_many_responses_declaration( + 1, + vec![Ok((peer(0), vec![contract_diff(18), declared_class(18), declared_class(19), SDFin]))], + vec![len(18)], + vec![Ok((peer(0), state_diff(18)))] +)] +#[case::empty_responses_are_ignored( + 1, + vec![ + Ok((peer(0), vec![])), + Ok((peer(1), vec![contract_diff(20), declared_class(20), SDFin])), + Ok((peer(2), vec![])) + ], + vec![len(20)], + vec![Ok((peer(1), state_diff(20)))] +)] +#[test_log::test(tokio::test)] +async fn make_state_diff_stream( + #[case] num_blocks: usize, + #[case] responses: Vec), TestPeer>>, + #[case] state_diff_len_per_block: Vec, + #[case] expected_stream: Vec>, +) { + let _ = env_logger::builder().is_test(true).try_init(); + let (peers, responses) = unzip_fixtures(responses); + let get_peers = || async { peers.clone() }; + let send_request = + |_: PeerId, _: StateDiffsRequest| async { send_request(responses.clone()).await }; + + let start = BlockNumber::GENESIS; + let stop = start + (num_blocks - 1) as u64; + + let actual = super::make_state_diff_stream( + start, + stop, + stream::iter( + state_diff_len_per_block + .into_iter() + .map(|x| Ok((x, Default::default()))), + ), + get_peers, + send_request, + ) + .map_ok(|x| (TestPeer(x.peer), x.data)) + .map_err(|x| TestPeer(x.peer)) + .collect::>() + .await; + + let expected = expected_stream + .into_iter() + .enumerate() + .map(|(i, x)| x.map(|(p, su)| (p, (su, BlockNumber::new_or_panic(i as u64))))) + .collect::>(); + + pretty_assertions_sorted::assert_eq!(actual, expected); +} + +#[rstest] +#[case::one_peer_1_block( + 1, + // class + // | + vec![Ok((peer(0), vec![class_resp(0), ClassFin]))], + vec![1], + // class block + // | | + vec![Ok((peer(0), class(0, 0)))] +)] +#[case::one_peer_2_blocks( + 2, + vec![Ok((peer(0), vec![class_resp(1), class_resp(2), ClassFin]))], + vec![1, 1], + vec![ + Ok((peer(0), class(1, 0))), + Ok((peer(0), class(2, 1))) + ] +)] +#[case::one_peer_2_blocks_in_2_attempts( + // Peer gives a response for the second block after a retry + 2, + vec![ + Ok((peer(0), vec![class_resp(3), class_resp(4), ClassFin])), + Ok((peer(0), vec![class_resp(5), class_resp(6), ClassFin])), + ], + vec![2, 2], + vec![ + Ok((peer(0), class(3, 0))), + Ok((peer(0), class(4, 0))), + Ok((peer(0), class(5, 1))), + Ok((peer(0), class(6, 1))) + ] +)] +#[case::two_peers_1_block_per_peer( + 2, + vec![ + // Errors are ignored + Err(peer(1)), + Ok((peer(0), vec![class_resp(7), ClassFin])), + Err(peer(0)), + Ok((peer(1), vec![class_resp(8), ClassFin])), + ], + vec![1, 1], + vec![ + Ok((peer(0), class(7, 0))), + Ok((peer(1), class(8, 1))) + ] +)] +#[case::first_peer_premature_eos_with_fin( + 2, + vec![ + // First peer gives full block 0 and half of block 1 + Ok((peer(0), vec![class_resp(9), class_resp(10), ClassFin])), + Ok((peer(1), vec![class_resp(10), class_resp(11), ClassFin])) + ], + vec![1, 2], + vec![ + Ok((peer(0), class(9, 0))), + Ok((peer(1), class(10, 1))), + Ok((peer(1), class(11, 1))) + ] +)] +#[case::first_peer_full_block_no_fin( + 2, + vec![ + // First peer gives full block 0 but no fin + Ok((peer(0), vec![class_resp(12), class_resp(13)])), + Ok((peer(1), vec![class_resp(14), ClassFin])) + ], + vec![2, 1], + vec![ + Ok((peer(0), class(12, 0))), + Ok((peer(0), class(13, 0))), + Ok((peer(1), class(14, 1))), + ] +)] +// The same as above but the first peer gives half of the second block before closing the +// stream +#[case::first_peer_half_block_no_fin( + 2, + vec![ + // First peer gives full block 0 and partial block 1 but no fin + Ok((peer(0), vec![class_resp(15), class_resp(16), class_resp(17)])), + Ok((peer(1), vec![class_resp(16), class_resp(17), class_resp(18), ClassFin])), + ], + vec![1, 3], + vec![ + Ok((peer(0), class(15, 0))), + Ok((peer(1), class(16, 1))), + Ok((peer(1), class(17, 1))), + Ok((peer(1), class(18, 1))), + ] +)] +#[case::count_steam_is_too_short( + 2, + vec![ + // 2 blocks in responses + Ok((peer(0), vec![class_resp(19), ClassFin])), + Ok((peer(0), vec![class_resp(20), ClassFin])) + ], + vec![1], // but only 1 block provided in the count stream + vec![ + Ok((peer(0), class(19, 0))), + Err(peer(0)) // the second block is not processed + ] +)] +#[case::too_many_responses_declaration( + 1, + vec![Ok((peer(0), vec![class_resp(21), class_resp(22), ClassFin]))], + vec![1], + vec![Ok((peer(0), class(21, 0)))] +)] +#[case::empty_responses_are_ignored( + 1, + vec![ + Ok((peer(0), vec![])), + Ok((peer(1), vec![class_resp(22), ClassFin])), + Ok((peer(2), vec![])) + ], + vec![1], + vec![Ok((peer(1), class(22, 0)))] +)] +#[test_log::test(tokio::test)] +async fn make_class_definition_stream( + #[case] num_blocks: usize, + #[case] responses: Vec), TestPeer>>, + #[case] declared_classes_per_block: Vec, + #[case] expected_stream: Vec>, +) { + let _ = env_logger::builder().is_test(true).try_init(); + let (peers, responses) = unzip_fixtures(responses); + let get_peers = || async { peers.clone() }; + let send_request = + |_: PeerId, _: ClassesRequest| async { send_request(responses.clone()).await }; + + let start = BlockNumber::GENESIS; + let stop = start + (num_blocks - 1) as u64; + + let actual = super::make_class_definition_stream( + start, + stop, + stream::iter(declared_classes_per_block.into_iter().map(Ok)), + get_peers, + send_request, + ) + .map_ok(|x| (TestPeer(x.peer), x.data)) + .map_err(|x| TestPeer(x.peer)) + .collect::>() + .await; + + pretty_assertions_sorted::assert_eq!(actual, expected_stream); +} + +#[rstest] +#[case::one_peer_1_block( + 1, + // event transaction + // | | + vec![Ok((peer(0), vec![event_resp(0, 0), event_resp(1, 0), event_resp(2, 2), EventFin]))], + vec![3], + // transaction + // events | block + // / \ | | + vec![Ok((peer(0), events(vec![(vec![0, 1], 0), (vec![2], 2)], 0)))] +)] +#[case::one_peer_2_blocks( + 2, + vec![Ok((peer(0), vec![event_resp(3, 3), event_resp(4, 3), event_resp(5, 5), event_resp(6, 6), EventFin]))], + vec![2, 2], + vec![ + Ok((peer(0), events(vec![(vec![3, 4], 3)], 0))), + Ok((peer(0), events(vec![(vec![5], 5), (vec![6], 6)], 1))) + ] +)] +#[case::one_peer_2_blocks_in_2_attempts( + // Peer gives a response for the second block after a retry + 2, + vec![ + Ok((peer(0), vec![event_resp(7, 7), event_resp(8, 8), EventFin])), + Ok((peer(0), vec![event_resp(9, 9), event_resp(10, 9), EventFin])), + ], + vec![2, 2], + vec![ + Ok((peer(0), events(vec![(vec![7], 7), (vec![8], 8)], 0))), + Ok((peer(0), events(vec![(vec![9, 10], 9)], 1))), + ] +)] +#[case::two_peers_1_block_per_peer( + 2, + vec![ + // Errors are ignored + Err(peer(1)), + Ok((peer(0), vec![event_resp(11, 11), event_resp(12, 11), EventFin])), + Err(peer(0)), + Ok((peer(1), vec![event_resp(13, 13), EventFin])), + ], + vec![2, 1], + vec![ + Ok((peer(0), events(vec![(vec![11, 12], 11)], 0))), + Ok((peer(1), events(vec![(vec![13], 13)], 1))), + ] +)] +#[case::first_peer_premature_eos_with_fin( + 2, + vec![ + // First peer gives full block 0 and half of block 1 + Ok((peer(0), vec![event_resp(14, 14), event_resp(15, 14), event_resp(16, 16), EventFin])), + Ok((peer(1), vec![event_resp(16, 16), event_resp(17, 16), EventFin])), + ], + vec![2, 2], + vec![ + Ok((peer(0), events(vec![(vec![14, 15], 14)], 0))), + Ok((peer(1), events(vec![(vec![16, 17], 16)], 1))), + ] +)] +#[case::first_peer_full_block_no_fin( + 2, + vec![ + // First peer gives full block 0 but no fin + Ok((peer(0), vec![event_resp(18, 18)])), + Ok((peer(1), vec![event_resp(19, 19), EventFin])) + ], + vec![1, 1], + vec![ + Ok((peer(0), events(vec![(vec![18], 18)], 0))), + Ok((peer(1), events(vec![(vec![19], 19)], 1))), + ] +)] +// The same as above but the first peer gives half of the second block before closing the +// stream +#[case::first_peer_half_block_no_fin( + 2, + vec![ + // First peer gives full block 0 and partial block 1 but no fin + Ok((peer(0), vec![event_resp(20, 20), event_resp(21, 21), event_resp(22, 22)])), + Ok((peer(1), vec![event_resp(22, 22), event_resp(23, 22), event_resp(24, 22), EventFin])), + ], + vec![2, 3], + vec![ + Ok((peer(0), events(vec![(vec![20], 20), (vec![21], 21)], 0))), + Ok((peer(1), events(vec![(vec![22, 23, 24], 22)], 1))), + ] +)] +#[case::count_steam_is_too_short( + 2, + vec![ + // 2 blocks in responses + Ok((peer(0), vec![event_resp(25, 25), EventFin])), + Ok((peer(0), vec![event_resp(26, 26), EventFin])) + ], + vec![1], // but only 1 block provided in the count stream + vec![ + Ok((peer(0), events(vec![(vec![25], 25)], 0))), + Err(peer(0)) // the second block is not processed + ] +)] +#[case::too_many_responses( + 1, + vec![Ok((peer(0), vec![event_resp(27, 27), event_resp(28, 27), event_resp(29, 27), EventFin]))], + vec![1], + vec![ + Ok((peer(0), events(vec![(vec![27], 27)], 0))), + ] +)] +#[case::empty_responses_are_ignored( + 1, + vec![ + Ok((peer(0), vec![])), + Ok((peer(0), vec![event_resp(30, 30), EventFin])), + Ok((peer(2), vec![])) + ], + vec![1], + vec![ + Ok((peer(0), events(vec![(vec![30], 30)], 0))) + ] +)] +#[test_log::test(tokio::test)] +async fn make_event_stream( + #[case] num_blocks: usize, + #[case] responses: Vec), TestPeer>>, + #[case] events_per_block: Vec, + #[case] expected_stream: Vec>, +) { + let _ = env_logger::builder().is_test(true).try_init(); + let (peers, responses) = unzip_fixtures(responses); + let get_peers = || async { peers.clone() }; + let send_request = + |_: PeerId, _: EventsRequest| async { send_request(responses.clone()).await }; + + let start = BlockNumber::GENESIS; + let stop = start + (num_blocks - 1) as u64; + + let actual = super::make_event_stream( + start, + stop, + stream::iter(events_per_block.into_iter().map(Ok)), + get_peers, + send_request, + ) + .map_ok(|x| { + ( + TestPeer(x.peer), + ( + x.data.0, + x.data + .1 + .into_iter() + .map(|(t, e)| (TaggedTransactionHash(t), e)) + .collect(), + ), + ) + }) + .map_err(|x| TestPeer(x.peer)) + .collect::>() + .await; + + pretty_assertions_sorted::assert_eq!(actual, expected_stream); +} diff --git a/crates/p2p_proto/Cargo.toml b/crates/p2p_proto/Cargo.toml index e2429f8607..823fb970e0 100644 --- a/crates/p2p_proto/Cargo.toml +++ b/crates/p2p_proto/Cargo.toml @@ -19,6 +19,8 @@ prost = { workspace = true } prost-types = { workspace = true } rand = { workspace = true } serde_json = { workspace = true, features = ["raw_value"] } +tagged = { path = "../tagged" } +tagged-debug-derive = { path = "../tagged-debug-derive" } [dev-dependencies] pretty_assertions_sorted = { workspace = true } diff --git a/crates/p2p_proto/src/class.rs b/crates/p2p_proto/src/class.rs index 5a5ed6d9e7..47ea38be20 100644 --- a/crates/p2p_proto/src/class.rs +++ b/crates/p2p_proto/src/class.rs @@ -2,6 +2,8 @@ use std::fmt::Debug; use fake::{Dummy, Fake, Faker}; use pathfinder_crypto::Felt; +use tagged::Tagged; +use tagged_debug_derive::TaggedDebug; use crate::common::Iteration; use crate::{proto, proto_field, ToProtobuf, TryFromProtobuf}; @@ -71,7 +73,7 @@ impl Dummy for Cairo1Class { } } -#[derive(Debug, Clone, PartialEq, Eq, Dummy)] +#[derive(Clone, PartialEq, Eq, Dummy, TaggedDebug)] pub enum Class { Cairo0 { class: Cairo0Class, domain: u32 }, Cairo1 { class: Cairo1Class, domain: u32 }, diff --git a/crates/p2p_proto/src/event.rs b/crates/p2p_proto/src/event.rs index 6e6a4b4f59..16e4116fe8 100644 --- a/crates/p2p_proto/src/event.rs +++ b/crates/p2p_proto/src/event.rs @@ -1,5 +1,7 @@ use fake::Dummy; use pathfinder_crypto::Felt; +use tagged::Tagged; +use tagged_debug_derive::TaggedDebug; use crate::common::{Hash, Iteration}; use crate::{proto, proto_field, ToProtobuf, TryFromProtobuf}; @@ -19,7 +21,7 @@ pub struct EventsRequest { pub iteration: Iteration, } -#[derive(Debug, Default, Clone, PartialEq, Eq, Dummy)] +#[derive(Default, Clone, PartialEq, Eq, Dummy, TaggedDebug)] pub enum EventsResponse { Event(Event), #[default] diff --git a/crates/p2p_proto/src/state.rs b/crates/p2p_proto/src/state.rs index 00c3f57710..055181aebf 100644 --- a/crates/p2p_proto/src/state.rs +++ b/crates/p2p_proto/src/state.rs @@ -2,6 +2,8 @@ use std::fmt::Debug; use fake::Dummy; use pathfinder_crypto::Felt; +use tagged::Tagged; +use tagged_debug_derive::TaggedDebug; use crate::common::{Address, Hash, Iteration, VolitionDomain}; use crate::{proto, proto_field, ToProtobuf, TryFromProtobuf}; @@ -13,7 +15,7 @@ pub struct ContractStoredValue { pub value: Felt, } -#[derive(Debug, Clone, PartialEq, Eq, ToProtobuf, TryFromProtobuf, Dummy)] +#[derive(Clone, PartialEq, Eq, ToProtobuf, TryFromProtobuf, Dummy, TaggedDebug)] #[protobuf(name = "crate::proto::state::ContractDiff")] pub struct ContractDiff { pub address: Address, @@ -25,7 +27,7 @@ pub struct ContractDiff { pub domain: VolitionDomain, } -#[derive(Debug, Clone, PartialEq, Eq, ToProtobuf, TryFromProtobuf, Dummy)] +#[derive(Clone, PartialEq, Eq, ToProtobuf, TryFromProtobuf, Dummy, TaggedDebug)] #[protobuf(name = "crate::proto::state::DeclaredClass")] pub struct DeclaredClass { pub class_hash: Hash, diff --git a/crates/pathfinder/src/p2p_network/sync_handlers.rs b/crates/pathfinder/src/p2p_network/sync_handlers.rs index 7a93525dfb..e0bbd9d557 100644 --- a/crates/pathfinder/src/p2p_network/sync_handlers.rs +++ b/crates/pathfinder/src/p2p_network/sync_handlers.rs @@ -24,18 +24,14 @@ use p2p_proto::state::{ StateDiffsResponse, }; use p2p_proto::transaction::{TransactionWithReceipt, TransactionsRequest, TransactionsResponse}; -use pathfinder_common::{BlockHash, BlockNumber}; +use pathfinder_common::{class_definition, BlockHash, BlockNumber}; use pathfinder_crypto::Felt; use pathfinder_storage::{Storage, Transaction}; -use starknet_gateway_types::class_definition; use tokio::sync::mpsc; -pub mod conv; #[cfg(test)] mod tests; -use self::conv::{cairo_def_into_dto, sierra_def_into_dto}; - #[cfg(not(test))] const MAX_BLOCKS_COUNT: u64 = 100; @@ -239,7 +235,7 @@ fn get_classes_for_block( let cairo_class = serde_json::from_slice::>(&definition)?; Class::Cairo0 { - class: cairo_def_into_dto(cairo_class), + class: cairo_class.to_dto(), domain: 0, // TODO } } @@ -250,7 +246,7 @@ fn get_classes_for_block( let sierra_class = serde_json::from_slice::>(&sierra)?; Class::Cairo1 { - class: sierra_def_into_dto(sierra_class), + class: sierra_class.to_dto(), domain: 0, // TODO } } diff --git a/crates/pathfinder/src/p2p_network/sync_handlers/conv.rs b/crates/pathfinder/src/p2p_network/sync_handlers/conv.rs deleted file mode 100644 index 4377eb2ec6..0000000000 --- a/crates/pathfinder/src/p2p_network/sync_handlers/conv.rs +++ /dev/null @@ -1,79 +0,0 @@ -//! Workaround for the orphan rule - implement conversion fns for types ourside -//! our crate. -use p2p_proto::class::{Cairo0Class, Cairo1Class, Cairo1EntryPoints, EntryPoint, SierraEntryPoint}; -use starknet_gateway_types::class_definition::{Cairo, Sierra}; -use starknet_gateway_types::request::contract::{SelectorAndFunctionIndex, SelectorAndOffset}; - -pub fn sierra_def_into_dto(sierra: Sierra<'_>) -> Cairo1Class { - let into_dto = |x: SelectorAndFunctionIndex| SierraEntryPoint { - selector: x.selector.0, - index: x.function_idx, - }; - - let entry_points = Cairo1EntryPoints { - externals: sierra - .entry_points_by_type - .external - .into_iter() - .map(into_dto) - .collect(), - l1_handlers: sierra - .entry_points_by_type - .l1_handler - .into_iter() - .map(into_dto) - .collect(), - constructors: sierra - .entry_points_by_type - .constructor - .into_iter() - .map(into_dto) - .collect(), - }; - - Cairo1Class { - abi: sierra.abi.to_string(), - program: sierra.sierra_program, - entry_points, - contract_class_version: sierra.contract_class_version.into(), - } -} - -pub fn cairo_def_into_dto(cairo: Cairo<'_>) -> Cairo0Class { - let into_dto = |x: SelectorAndOffset| EntryPoint { - selector: x.selector.0, - offset: u64::from_be_bytes( - x.offset.0.as_be_bytes()[24..] - .try_into() - .expect("slice len matches"), - ), - }; - - let mut gzip_encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast()); - serde_json::to_writer(&mut gzip_encoder, &cairo.program).unwrap(); - let program = gzip_encoder.finish().unwrap(); - let program = base64::encode(program); - - Cairo0Class { - abi: cairo.abi.to_string(), - externals: cairo - .entry_points_by_type - .external - .into_iter() - .map(into_dto) - .collect(), - l1_handlers: cairo - .entry_points_by_type - .l1_handler - .into_iter() - .map(into_dto) - .collect(), - constructors: cairo - .entry_points_by_type - .constructor - .into_iter() - .map(into_dto) - .collect(), - program, - } -} diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs index f028c6493f..89b5692141 100644 --- a/crates/pathfinder/src/sync/checkpoint.rs +++ b/crates/pathfinder/src/sync/checkpoint.rs @@ -359,11 +359,11 @@ async fn handle_class_stream>>, + stream: impl Stream, PeerData>>, storage: Storage, ) -> Result<(), SyncError> { stream - .map_err(Into::into) + .map_err(|e| e.data.into()) .and_then(|x| events::verify_commitment(x, storage.clone())) .try_chunks(100) .map_err(|e| e.1) @@ -1525,7 +1525,8 @@ mod tests { use crate::state::block_hash::calculate_event_commitment; struct Setup { - pub streamed_events: Vec>>, + pub streamed_events: + Vec, PeerData>>, pub expected_events: Vec)>>, pub storage: Storage, } @@ -1536,7 +1537,7 @@ mod tests { let streamed_events = blocks .iter() .map(|block| { - anyhow::Result::Ok(PeerData::for_tests(( + Result::Ok(PeerData::for_tests(( block.header.header.number, block .transaction_data @@ -1641,7 +1642,9 @@ mod tests { async fn stream_failure() { assert_matches::assert_matches!( handle_event_stream( - stream::once(std::future::ready(Err(anyhow::anyhow!("")))), + stream::once(std::future::ready(Err(PeerData::for_tests( + anyhow::anyhow!("") + )))), StorageBuilder::in_memory().unwrap() ) .await diff --git a/crates/pathfinder/src/sync/class_definitions.rs b/crates/pathfinder/src/sync/class_definitions.rs index b793c0a44b..e432ff1233 100644 --- a/crates/pathfinder/src/sync/class_definitions.rs +++ b/crates/pathfinder/src/sync/class_definitions.rs @@ -8,16 +8,12 @@ use futures::stream::{BoxStream, StreamExt}; use p2p::client::peer_agnostic::ClassDefinition as P2PClassDefinition; use p2p::PeerData; use p2p_proto::transaction; +use pathfinder_common::class_definition::{Cairo, ClassDefinition as GwClassDefinition, Sierra}; use pathfinder_common::state_update::DeclaredClasses; use pathfinder_common::{BlockNumber, CasmHash, ClassHash, SierraHash}; use pathfinder_storage::Storage; use serde_json::de; use starknet_gateway_client::GatewayApi; -use starknet_gateway_types::class_definition::{ - Cairo, - ClassDefinition as GwClassDefinition, - Sierra, -}; use starknet_gateway_types::class_hash::from_parts::{ compute_cairo_class_hash, compute_sierra_class_hash, @@ -443,118 +439,6 @@ impl ProcessStage for Store { } } -pub(super) async fn compile_sierra_to_casm_or_fetch( - peer_data: PeerData, - fgw: SequencerClient, -) -> Result, SyncError> { - let PeerData { - peer, - data: Class { - block_number, - hash, - definition, - }, - } = peer_data; - - let definition = match definition { - ClassDefinition::Cairo(c) => CompiledClassDefinition::Cairo(c), - ClassDefinition::Sierra(sierra_definition) => { - let (casm_definition, sierra_definition) = - tokio::task::spawn_blocking(move || -> (anyhow::Result<_>, _) { - let _span = - tracing::trace_span!("compile_sierra_to_casm_or_fetch", class_hash=%hash) - .entered(); - ( - pathfinder_compiler::compile_to_casm(&sierra_definition) - .context("Compiling Sierra class"), - sierra_definition, - ) - }) - .await - .context("Joining blocking task")?; - - let casm_definition = match casm_definition { - Ok(x) => x, - Err(_) => fgw - .pending_casm_by_hash(hash) - .await - .context("Fetching casm definition from gateway")? - .to_vec(), - }; - - CompiledClassDefinition::Sierra { - sierra_definition, - casm_definition, - } - } - }; - - Ok(PeerData::new( - peer, - CompiledClass { - block_number, - hash, - definition, - }, - )) -} - -pub(super) async fn persist( - storage: Storage, - classes: Vec>, -) -> Result { - tokio::task::spawn_blocking(move || { - let mut connection = storage - .connection() - .context("Creating database connection")?; - let transaction = connection - .transaction() - .context("Creating database transaction")?; - let tail = classes - .last() - .map(|x| x.data.block_number) - .context("No class definitions to persist")?; - - for CompiledClass { - block_number: _, - definition, - hash, - } in classes.into_iter().map(|x| x.data) - { - match definition { - CompiledClassDefinition::Cairo(definition) => { - transaction - .update_cairo_class(hash, &definition) - .context("Updating cairo class definition")?; - } - CompiledClassDefinition::Sierra { - sierra_definition, - casm_definition, - } => { - let casm_hash = transaction - .casm_hash(hash) - .context("Getting casm hash for sierra class")? - .context("Casm hash not found")?; - - transaction - .update_sierra_class( - &SierraHash(hash.0), - &sierra_definition, - &casm_hash, - &casm_definition, - ) - .context("Updating sierra class definition")?; - } - } - } - transaction.commit().context("Committing db transaction")?; - - Ok(tail) - }) - .await - .context("Joining blocking task")? -} - pub struct VerifyClassHashes { pub declarations: BoxStream<'static, DeclaredClasses>, pub tokio_handle: tokio::runtime::Handle, diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index 140974498e..8f71bb1a8c 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -15,6 +15,7 @@ use p2p::client::peer_agnostic::{ UnverifiedTransactionData, }; use p2p::PeerData; +use pathfinder_common::class_definition::ClassDefinition; use pathfinder_common::event::Event; use pathfinder_common::receipt::Receipt; use pathfinder_common::state_update::{DeclaredClasses, StateUpdateData}; @@ -35,7 +36,6 @@ use pathfinder_common::{ }; use pathfinder_storage::Storage; use starknet_gateway_client::GatewayApi; -use starknet_gateway_types::class_definition::ClassDefinition; use tokio_stream::wrappers::ReceiverStream; use super::class_definitions::CompiledClass; diff --git a/crates/rpc/src/v02/types/class.rs b/crates/rpc/src/v02/types/class.rs index 115cc981aa..9a1df8fba4 100644 --- a/crates/rpc/src/v02/types/class.rs +++ b/crates/rpc/src/v02/types/class.rs @@ -101,7 +101,7 @@ impl TryFrom fn try_from(c: CairoContractClass) -> Result { use std::collections::HashMap; - use starknet_gateway_types::request::contract::{EntryPointType, SelectorAndOffset}; + use pathfinder_common::class_definition::{EntryPointType, SelectorAndOffset}; let abi = match c.abi { Some(abi) => Some(serde_json::to_value(abi)?), @@ -149,7 +149,7 @@ impl TryFrom fn try_from(c: SierraContractClass) -> Result { use std::collections::HashMap; - use starknet_gateway_types::request::contract::{EntryPointType, SelectorAndFunctionIndex}; + use pathfinder_common::class_definition::{EntryPointType, SelectorAndFunctionIndex}; let mut entry_points: HashMap> = Default::default(); @@ -299,7 +299,7 @@ impl<'de> serde_with::DeserializeAs<'de, u64> for OffsetSerde { } } -impl From for starknet_gateway_types::request::contract::SelectorAndOffset { +impl From for pathfinder_common::class_definition::SelectorAndOffset { fn from(entry_point: ContractEntryPoint) -> Self { Self { selector: pathfinder_common::EntryPoint(entry_point.selector), @@ -461,9 +461,7 @@ pub struct SierraEntryPoint { pub selector: Felt, } -impl From - for starknet_gateway_types::request::contract::SelectorAndFunctionIndex -{ +impl From for pathfinder_common::class_definition::SelectorAndFunctionIndex { fn from(entry_point: SierraEntryPoint) -> Self { Self { function_idx: entry_point.function_idx, diff --git a/crates/storage/src/fake.rs b/crates/storage/src/fake.rs index 15cd2f9f54..1884500874 100644 --- a/crates/storage/src/fake.rs +++ b/crates/storage/src/fake.rs @@ -111,6 +111,7 @@ pub mod init { use pathfinder_common::test_utils::fake_non_empty_with_rng; use pathfinder_common::transaction::Transaction; use pathfinder_common::{ + class_definition, BlockHash, BlockHeader, BlockNumber, @@ -122,7 +123,6 @@ pub mod init { TransactionIndex, }; use rand::Rng; - use starknet_gateway_types::class_definition; use super::Block; diff --git a/crates/tagged/src/lib.rs b/crates/tagged/src/lib.rs index 8ce07a68bf..f4fe3ba5d6 100644 --- a/crates/tagged/src/lib.rs +++ b/crates/tagged/src/lib.rs @@ -1,3 +1,8 @@ +//! Tagging is meant to be used in tests only, but because `[cfg(test)]` cannot +//! be _exported_ we're using the closest build configuration option within the +//! implementation, which is `[cfg(debug_assertions)]`. As an additional safety +//! measure, the [`tagged::init()`](crate::init()) function must be called +//! before using the `tagged::Tagged` type. use std::any::{Any, TypeId}; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; @@ -29,33 +34,59 @@ static mut LUTS: Lut = None; type LutGuard = MutexGuard<'static, HashMap>>>; -fn lut() -> LutGuard { +/// - Does nothing outside Debug builds. +/// - This function __must__ be called before using the [`Tagged`] type, and if +/// one then wants to see the tags in `std::fmt::Debug` output for each type +/// that derives `tagged_debug_derive::TaggedDebug`. +/// - You need to make sure to call this function __at least__ once. Subsequent +/// calls will have no effect. +pub fn init() { + #[cfg(debug_assertions)] INIT.call_once(|| { unsafe { LUTS = Some(Default::default()); }; }); +} - unsafe { LUTS.as_ref().unwrap().lock().unwrap() } +fn lut() -> Option { + unsafe { LUTS.as_ref() }.map(|luts| luts.lock().unwrap()) } impl Tagged { - pub fn get T>(tag: U, ctor: C) -> Self { - let mut luts = lut(); - let lut = luts.entry(TypeId::of::()).or_default(); - let tag = tag.to_string(); - let data = lut - .entry(tag.clone()) - .or_insert_with(|| Box::new(ctor())) - .downcast_ref::() - .unwrap() - .clone(); - Self { tag, data } + /// Important + /// + /// Use only in Debug builds after calling [`tagged::init`](`crate::init`). + /// Otherwise this function always returns `None`. + pub fn get T>(tag: U, ctor: C) -> Option { + #[cfg(debug_assertions)] + { + let luts = lut(); + luts.map(|mut luts| { + let lut = luts.entry(TypeId::of::()).or_default(); + let tag = tag.to_string(); + let data = lut + .entry(tag.clone()) + .or_insert_with(|| Box::new(ctor())) + .downcast_ref::() + .unwrap() + .clone(); + Self { tag, data } + }) + } + #[cfg(not(debug_assertions))] + { + None + } } } impl + 'static> Tagged { - pub fn get_fake(tag: U) -> Self { + /// Important + /// + /// Use only in Debug builds after calling [`tagged::init`](`crate::init`). + /// Otherwise this function always returns `None`. + pub fn get_fake(tag: U) -> Option { Self::get(tag, || Faker.fake()) } } @@ -64,29 +95,42 @@ impl + 'static> Tagged { pub struct TypeNotFound; impl Tagged { + /// Important + /// + /// Use only in Debug builds after calling [`tagged::init`](`crate::init`). + /// Otherwise this function will error. pub fn from_data(data: &T) -> Result { - let luts = lut(); - let lut = luts.get(&TypeId::of::()); - - match lut { - Some(lut) => { - let tag = lut - .iter() - .find_map(|(k, v)| { - v.downcast_ref::() - .and_then(|u| (u == data).then_some(k.clone())) + #[cfg(debug_assertions)] + { + let luts = lut().ok_or(TypeNotFound)?; + let lut = luts.get(&TypeId::of::()); + + match lut { + Some(lut) => { + let tag = lut + .iter() + .find_map(|(k, v)| { + v.downcast_ref::() + .and_then(|u| (u == data).then_some(k.clone())) + }) + .unwrap_or("value not found".into()); + + Ok(Self { + tag, + data: data.clone(), }) - .unwrap_or("value not found".into()); - - Ok(Self { - tag, - data: data.clone(), - }) + } + None => Err(TypeNotFound), } - None => Err(TypeNotFound), } + #[cfg(not(debug_assertions))] + Err(TypeNotFound) } + /// Important + /// + /// Use only in Debug builds after calling [`tagged::init`](`crate::init`). + /// Otherwise this function will error. pub fn tag(data: &T) -> Result { Self::from_data(data).map(|tagged| tagged.tag) } @@ -100,19 +144,19 @@ mod tests { use super::*; - #[derive(Clone, Default, Dummy, PartialEq, TaggedDebug)] + #[derive(Clone, Copy, Default, Dummy, PartialEq, TaggedDebug)] struct Unit; - #[derive(Clone, Default, Dummy, PartialEq, TaggedDebug)] + #[derive(Clone, Copy, Default, Dummy, PartialEq, TaggedDebug)] struct Tuple(i32, i32); - #[derive(Clone, Default, Dummy, PartialEq, TaggedDebug)] + #[derive(Clone, Copy, Default, Dummy, PartialEq, TaggedDebug)] struct Struct { a: i32, b: i32, } - #[derive(Clone, Default, Dummy, PartialEq, TaggedDebug)] + #[derive(Clone, Copy, Default, Dummy, PartialEq, TaggedDebug)] enum Enum { #[default] A, @@ -123,7 +167,7 @@ mod tests { }, } - #[derive(Clone, Default, Dummy, PartialEq, TaggedDebug)] + #[derive(Clone, Copy, Default, Dummy, PartialEq, TaggedDebug)] struct Complex { u: Unit, t: Tuple, @@ -131,116 +175,77 @@ mod tests { } #[test] - fn lookup_works_correctly() { - #[derive(Clone, Default, Dummy, PartialEq, TaggedDebug)] - struct Foo { - a: i32, - } - - #[derive(Clone, Default, Debug, Dummy, PartialEq)] - struct Unregistered { - u: u32, - } - - // A tag points to the same value - let foo = Tagged::::get_fake("foo"); - let foo2 = Tagged::::get_fake("foo"); - assert_eq!(foo, foo2); - - // Retagging a cached value - let retagged = Tagged::from_data(&foo.data).unwrap(); - assert_eq!(foo, retagged); - - // Retagging an uncached value - assert_eq!( - Tagged::tag(&Faker.fake::()).unwrap(), - "value not found" - ); - - // Retagging an uncached type fails - assert!(Tagged::from_data(&Faker.fake::()).is_err()); - } - - #[test] - fn shown_tag_in_debug_when_type_found() { - // Types are not registered yet, so show default debugs - assert_eq!(format!("{:?}", Unit), "Unit"); - - assert_eq!(format!("{:?}", Tuple(0, 1)), "Tuple(0, 1)"); - - assert_eq!( - format!("{:?}", Struct { a: 0, b: 1 }), - "Struct { a: 0, b: 1 }" - ); - - assert_eq!(format!("{:?}", Enum::A), "A"); - - assert_eq!(format!("{:?}", Enum::B(0, 1)), "B(0, 1)"); - - assert_eq!( - format!("{:?}", Enum::C { a: 2, b: 3 }), - r#"C { a: 2, b: 3 }"# - ); + fn lookup_and_debugs_work_correctly() { + let unit = Unit; + let tuple = Tuple(0, 1); + let stru = Struct { a: 0, b: 1 }; + let enum_unit = Enum::A; + let enum_tuple = Enum::B(0, 1); + let enum_struct = Enum::C { a: 2, b: 3 }; + let complex = Complex { + u: Unit, + t: Tuple(0, 1), + e: Enum::C { a: 2, b: 3 }, + }; + // Global lut is not initialized yet, so show default debugs + assert_eq!(format!("{:?}", unit), "Unit"); + assert_eq!(format!("{:?}", tuple), "Tuple(0, 1)"); + assert_eq!(format!("{:?}", stru), "Struct { a: 0, b: 1 }"); + assert_eq!(format!("{:?}", enum_unit), "A"); + assert_eq!(format!("{:?}", enum_tuple), "B(0, 1)"); + assert_eq!(format!("{:?}", enum_struct), r#"C { a: 2, b: 3 }"#); assert_eq!( - format!( - "{:?}", - Complex { - u: Unit, - t: Tuple(0, 1), - e: Enum::C { a: 2, b: 3 } - } - ), + format!("{:?}", complex), r#"Complex { u: Unit, t: Tuple(0, 1), e: C { a: 2, b: 3 } }"# ); - // Register types, now they should also display the tag they were created with - assert_eq!( - format!("{:?}", Tagged::::get_fake("unit")), - r#"Tagged { tag: "unit", data: Unit { TAG: "unit" } }"# - ); - - assert_eq!( - format!("{:?}", Tagged::::get("tuple", || Tuple(0, 1))), - r#"Tagged { tag: "tuple", data: Tuple("TAG: tuple", 0, 1) }"# - ); - - assert_eq!( - format!( - "{:?}", - Tagged::::get("struct", || Struct { a: 0, b: 1 }) - ), - r#"Tagged { tag: "struct", data: Struct { TAG: "struct", a: 0, b: 1 } }"# - ); - + // Global lut needs to be initialized to cache values and retrieve them + assert!(Tagged::::get("unit", || unit).is_none()); + assert!(Tagged::::get_fake("complex").is_none()); + assert!(Tagged::from_data(&complex).is_err()); + assert!(Tagged::tag(&complex).is_err()); + + // Init global lut + crate::init(); + + // These types are not registered yet, so still show default debugs + assert_eq!(format!("{:?}", unit), "Unit"); + assert_eq!(format!("{:?}", tuple), "Tuple(0, 1)"); + assert_eq!(format!("{:?}", stru), "Struct { a: 0, b: 1 }"); + assert_eq!(format!("{:?}", enum_unit), "A"); + assert_eq!(format!("{:?}", enum_tuple), "B(0, 1)"); + assert_eq!(format!("{:?}", enum_struct), r#"C { a: 2, b: 3 }"#); assert_eq!( - format!("{:?}", Tagged::::get("enum_unit", || Enum::A)), - r#"Tagged { tag: "enum_unit", data: A { TAG: "enum_unit" } }"# + format!("{:?}", complex), + r#"Complex { u: Unit, t: Tuple(0, 1), e: C { a: 2, b: 3 } }"# ); + // Register types by inserting at least one value per type, now the debugs for + // those values should show the tag they were created with + Tagged::::get_fake("unit"); + Tagged::::get("tuple", || tuple); + Tagged::::get("struct", || stru); + Tagged::::get("enum_unit", || enum_unit); + Tagged::::get("enum_tuple", || enum_tuple); + Tagged::::get("enum_struct", || enum_struct); + Tagged::::get("complex", || complex); + + assert_eq!(format!("{:?}", unit), r#"Unit { TAG: "unit" }"#); + assert_eq!(format!("{:?}", tuple), r#"Tuple("TAG: tuple", 0, 1)"#); assert_eq!( - format!("{:?}", Tagged::::get("enum_tuple", || Enum::B(0, 1))), - r#"Tagged { tag: "enum_tuple", data: B("TAG: enum_tuple", 0, 1) }"# + format!("{:?}", stru), + r#"Struct { TAG: "struct", a: 0, b: 1 }"# ); - + assert_eq!(format!("{:?}", enum_unit), r#"A { TAG: "enum_unit" }"#); + assert_eq!(format!("{:?}", enum_tuple), r#"B("TAG: enum_tuple", 0, 1)"#); assert_eq!( - format!( - "{:?}", - Tagged::::get("enum_struct", || Enum::C { a: 2, b: 3 }) - ), - r#"Tagged { tag: "enum_struct", data: C { TAG: "enum_struct", a: 2, b: 3 } }"# + format!("{:?}", enum_struct), + r#"C { TAG: "enum_struct", a: 2, b: 3 }"# ); - assert_eq!( - format!( - "{:?}", - Tagged::::get("complex", || Complex { - u: Unit, - t: Tuple(0, 1), - e: Enum::C { a: 2, b: 3 } - }) - ), - r#"Tagged { tag: "complex", data: Complex { TAG: "complex", u: Unit { TAG: "unit" }, t: Tuple("TAG: tuple", 0, 1), e: C { TAG: "enum_struct", a: 2, b: 3 } } }"# + format!("{:?}", complex), + r#"Complex { TAG: "complex", u: Unit { TAG: "unit" }, t: Tuple("TAG: tuple", 0, 1), e: C { TAG: "enum_struct", a: 2, b: 3 } }"# ); } }