diff --git a/rust/.env.example b/rust/.env.example deleted file mode 100644 index 3b888555..00000000 --- a/rust/.env.example +++ /dev/null @@ -1,3 +0,0 @@ -HOST="0.0.0.0" -PORT="3000" -APIBARA_API_KEY= diff --git a/rust/Cargo.lock b/rust/Cargo.lock index d2c02cef..16ebfc01 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -5299,6 +5299,7 @@ dependencies = [ "envy", "futures-util", "hyper 0.14.30", + "lazy_static", "opentelemetry 0.24.0", "pragma-utils", "prometheus", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 2a2532ba..0a5d2442 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -73,6 +73,7 @@ axum-macros = { version = "0.4.1" } ya-gcp = { version = "0.11.3", features = ["storage"] } rusoto_s3 = "0.48.0" rusoto_core = "0.48.0" +lazy_static = "1.5.0" # Apibara DNA (indexing) apibara-core = { git = "https://github.com/apibara/dna", rev = "9caa385" } diff --git a/rust/rust-toolchain.toml b/rust/rust-toolchain.toml index cc34bc04..8c9c3a25 100644 --- a/rust/rust-toolchain.toml +++ b/rust/rust-toolchain.toml @@ -1,5 +1,5 @@ [toolchain] -channel = "1.80.0" +channel = "1.81.0" components = ["rustfmt", "clippy", "rust-analyzer"] targets = ["wasm32-unknown-unknown"] profile = "minimal" diff --git a/rust/theoros/Cargo.toml b/rust/theoros/Cargo.toml index 667ee0d3..9a9efdca 100644 --- a/rust/theoros/Cargo.toml +++ b/rust/theoros/Cargo.toml @@ -33,5 +33,6 @@ ya-gcp = { workspace = true } rusoto_s3 = { workspace = true } rusoto_core = { workspace = true } alloy = { workspace = true } +lazy_static = { workspace = true } pragma-utils = { path = "../pragma-utils" } diff --git a/rust/theoros/openapi.json b/rust/theoros/openapi.json index 03411612..c557c844 100644 --- a/rust/theoros/openapi.json +++ b/rust/theoros/openapi.json @@ -1 +1,141 @@ -{"openapi":"3.0.3","info":{"title":"theoros","description":"","license":{"name":""},"version":"0.1.0"},"paths":{"/v1/calldata":{"get":{"tags":["crate::handlers::get_calldata"],"operationId":"get_calldata","parameters":[],"responses":{"200":{"description":"Get the calldata","content":{"application/json":{"schema":{"type":"array","items":{"$ref":"#/components/schemas/GetCalldataResponse"}}}}}}}}},"components":{"schemas":{"GetCalldataError":{"type":"string","enum":["InternalServerError","DatabaseConnection"]},"GetCalldataQuery":{"type":"object"},"GetCalldataResponse":{"type":"object","required":["hash"],"properties":{"hash":{"type":"string"}}}},"responses":{"GetCalldataResponse":{"description":"","content":{"application/json":{"schema":{"type":"object","required":["hash"],"properties":{"hash":{"type":"string"}}}}}}}},"tags":[{"name":"theoros","description":"Theoros - the Pragma Consultant"}]} \ No newline at end of file +{ + "openapi": "3.0.3", + "info": { + "title": "theoros", + "description": "", + "license": { + "name": "" + }, + "version": "0.1.0" + }, + "paths": { + "/v1/calldata": { + "get": { + "tags": [ + "crate::handlers::get_data_feeds" + ], + "operationId": "get_data_feeds", + "parameters": [], + "responses": { + "200": { + "description": "Get all the available data feeds", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/GetDataFeedsResponse" + } + } + } + } + } + } + } + }, + "/v1/calldata/{data_feed_id}": { + "get": { + "tags": [ + "crate::handlers::get_calldata" + ], + "operationId": "get_calldata", + "parameters": [], + "responses": { + "200": { + "description": "Constructs the calldata used to update the data feed id specified", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/GetCalldataResponse" + } + } + } + } + } + } + } + } + }, + "components": { + "schemas": { + "GetCalldataError": { + "type": "string", + "enum": [ + "InternalServerError", + "DatabaseConnection" + ] + }, + "GetCalldataQuery": { + "type": "object" + }, + "GetCalldataResponse": { + "type": "object", + "required": [ + "hash" + ], + "properties": { + "hash": { + "type": "string" + } + } + }, + "GetDataFeedsError": { + "type": "string", + "enum": [ + "InternalServerError", + "DatabaseConnection" + ] + }, + "GetDataFeedsQuery": { + "type": "object" + }, + "GetDataFeedsResponse": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "responses": { + "GetCalldataResponse": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "hash" + ], + "properties": { + "hash": { + "type": "string" + } + } + } + } + } + }, + "GetDataFeedsResponse": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "type": "string" + } + } + } + } + } + } + }, + "tags": [ + { + "name": "theoros", + "description": "Theoros - the Pragma Consultant" + } + ] +} \ No newline at end of file diff --git a/rust/theoros/src/config.rs b/rust/theoros/src/config.rs deleted file mode 100644 index a269b33d..00000000 --- a/rust/theoros/src/config.rs +++ /dev/null @@ -1,60 +0,0 @@ -use serde::Deserialize; -use tokio::sync::OnceCell; - -#[derive(Default, Debug, Deserialize, PartialEq, Clone)] -#[serde(rename_all = "lowercase")] -pub enum Mode { - #[default] - Dev, - Production, -} - -#[derive(Default, Debug, Deserialize, Clone)] -pub struct ModeConfig { - mode: Mode, -} - -#[derive(Debug, Deserialize, Clone)] -pub struct ServerConfig { - host: String, - port: u16, -} - -impl Default for ServerConfig { - fn default() -> Self { - Self { host: "0.0.0.0".to_string(), port: 3000 } - } -} - -#[derive(Default, Debug, Deserialize, Clone)] -pub struct Config { - mode: ModeConfig, - server: ServerConfig, -} - -impl Config { - pub fn is_production_mode(&self) -> bool { - self.mode.mode == Mode::Production - } - - pub fn server_host(&self) -> &str { - &self.server.host - } - - pub fn server_port(&self) -> u16 { - self.server.port - } -} - -pub static CONFIG: OnceCell = OnceCell::const_new(); - -async fn init_config() -> Config { - let mode_config = envy::from_env::().unwrap_or_default(); - let server_config = envy::from_env::().unwrap_or_default(); - - Config { mode: mode_config, server: server_config } -} - -pub async fn config() -> &'static Config { - CONFIG.get_or_init(init_config).await -} diff --git a/rust/theoros/src/errors/data_feeds_error.rs b/rust/theoros/src/errors/data_feeds_error.rs new file mode 100644 index 00000000..cef60486 --- /dev/null +++ b/rust/theoros/src/errors/data_feeds_error.rs @@ -0,0 +1,25 @@ +use axum::{http::StatusCode, response::IntoResponse, Json}; +use serde_json::json; +use utoipa::ToSchema; + +#[derive(Debug, thiserror::Error, ToSchema)] +#[allow(unused)] +pub enum GetDataFeedsError { + #[error("internal server error")] + InternalServerError, + #[error("could not establish a connection with the database")] + DatabaseConnection, +} + +impl IntoResponse for GetDataFeedsError { + fn into_response(self) -> axum::response::Response { + let (status, err_msg) = match self { + Self::DatabaseConnection => { + (StatusCode::SERVICE_UNAVAILABLE, "Could not establish a connection with the Database".to_string()) + } + _ => (StatusCode::INTERNAL_SERVER_ERROR, String::from("Internal server error")), + }; + (status, Json(json!({"resource":"Calldata", "message": err_msg, "happened_at" : chrono::Utc::now() }))) + .into_response() + } +} diff --git a/rust/theoros/src/errors/mod.rs b/rust/theoros/src/errors/mod.rs index e2683d2f..7593cae5 100644 --- a/rust/theoros/src/errors/mod.rs +++ b/rust/theoros/src/errors/mod.rs @@ -1,5 +1,7 @@ pub mod app_error; pub mod calldata_error; +pub mod data_feeds_error; pub use app_error::AppError; pub use calldata_error::GetCalldataError; +pub use data_feeds_error::GetDataFeedsError; diff --git a/rust/theoros/src/handlers/get_calldata.rs b/rust/theoros/src/handlers/get_calldata.rs index 2f82db2d..da613a41 100644 --- a/rust/theoros/src/handlers/get_calldata.rs +++ b/rust/theoros/src/handlers/get_calldata.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize}; use utoipa::{IntoParams, ToResponse, ToSchema}; use crate::errors::GetCalldataError; +use crate::extractors::PathExtractor; use crate::AppState; #[derive(Default, Deserialize, IntoParams, ToSchema)] @@ -16,10 +17,13 @@ pub struct GetCalldataResponse { #[utoipa::path( get, - // TODO: path - path = "/v1/calldata", + path = "/v1/calldata/{data_feed_id}", responses( - (status = 200, description = "Get the calldata", body = [GetCalldataResponse]) + ( + status = 200, + description = "Constructs the calldata used to update the data feed id specified", + body = [GetCalldataResponse] + ) ), params( GetCalldataQuery @@ -27,8 +31,9 @@ pub struct GetCalldataResponse { )] pub async fn get_calldata( State(_state): State, + PathExtractor(data_feed_id): PathExtractor, Query(_params): Query, ) -> Result, GetCalldataError> { - tracing::info!("Received get calldata request"); + tracing::info!("Received get calldata request for feed: {data_feed_id}"); Ok(Json(GetCalldataResponse::default())) } diff --git a/rust/theoros/src/handlers/get_data_feeds.rs b/rust/theoros/src/handlers/get_data_feeds.rs new file mode 100644 index 00000000..3c37817e --- /dev/null +++ b/rust/theoros/src/handlers/get_data_feeds.rs @@ -0,0 +1,32 @@ +use axum::extract::{Query, State}; +use axum::Json; +use serde::{Deserialize, Serialize}; +use utoipa::{IntoParams, ToResponse, ToSchema}; + +use crate::errors::GetDataFeedsError; +use crate::AppState; + +#[derive(Default, Deserialize, IntoParams, ToSchema)] +pub struct GetDataFeedsQuery {} + +#[derive(Debug, Default, Serialize, Deserialize, ToResponse, ToSchema)] +pub struct GetDataFeedsResponse(pub Vec); + +#[utoipa::path( + get, + // TODO: path + path = "/v1/calldata", + responses( + (status = 200, description = "Get all the available data feeds", body = [GetDataFeedsResponse]) + ), + params( + GetDataFeedsQuery + ), +)] +pub async fn get_data_feeds( + State(_state): State, + Query(_params): Query, +) -> Result, GetDataFeedsError> { + tracing::info!("Received get calldata request"); + Ok(Json(GetDataFeedsResponse::default())) +} diff --git a/rust/theoros/src/handlers/mod.rs b/rust/theoros/src/handlers/mod.rs index a9881305..9fd03629 100644 --- a/rust/theoros/src/handlers/mod.rs +++ b/rust/theoros/src/handlers/mod.rs @@ -1,2 +1,3 @@ pub mod get_calldata; +pub mod get_data_feeds; pub mod subscribe_to_calldata; diff --git a/rust/theoros/src/main.rs b/rust/theoros/src/main.rs index 01b68e06..a0397be6 100644 --- a/rust/theoros/src/main.rs +++ b/rust/theoros/src/main.rs @@ -1,69 +1,77 @@ -mod config; mod errors; mod extractors; mod handlers; +mod rpc; mod services; +mod storage; mod types; use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Result}; use prometheus::Registry; +use starknet::core::types::Felt; +use storage::TheorosStorage; use tracing::Level; +use url::Url; use pragma_utils::{ services::{Service, ServiceGroup}, tracing::init_tracing, }; -use types::StarknetRpc; -use url::Url; -use crate::{ - config::config, - services::{ApiService, IndexerService, MetricsService}, - types::EventStorage, -}; +use rpc::StarknetRpc; +use services::{ApiService, IndexerService, MetricsService}; -// TODO: Config those +// TODO: Everything below here should be configurable, either via CLI or config file. +// See: https://github.com/astraly-labs/pragma-monorepo/issues/17 const APP_NAME: &str = "theoros"; const LOG_LEVEL: Level = Level::INFO; const METRICS_PORT: u16 = 8080; -const EVENTS_MEM_SIZE: usize = 10; - const MADARA_RPC_URL: &str = "https://free-rpc.nethermind.io/sepolia-juno"; -const APIBARA_DNA_URL: &str = "https://mainnet.starknet.a5a.ch"; // TODO: Should be Pragma X DNA url +const APIBARA_DNA_URL: &str = "https://sepolia.starknet.a5a.ch"; // TODO: Should be Pragma X DNA url + +const SERVER_HOST: &str = "0.0.0.0"; +const SERVER_PORT: u16 = 3000; + +const PRAGMA_WRAPPER_CONTRACT_ADDRESS: Felt = Felt::ZERO; +const HYPERLANE_CORE_CONTRACT_ADDRESS: Felt = Felt::ZERO; #[derive(Clone)] pub struct AppState { pub rpc_client: Arc, - pub event_storage: Arc, + pub storage: Arc, pub metrics_registry: Registry, // already wrapped into an Arc } #[tokio::main] #[tracing::instrument] async fn main() -> Result<()> { - dotenvy::dotenv()?; - let config = config().await; - init_tracing(APP_NAME, LOG_LEVEL)?; - let metrics_service = MetricsService::new(false, METRICS_PORT)?; + // New RPC client let rpc_url: Url = MADARA_RPC_URL.parse()?; let rpc_client = StarknetRpc::new(rpc_url); - // TODO: state should contains the rpc_client to interact with a Madara node + // New storage + initialization + let theoros_storage = + TheorosStorage::from_rpc_state(&rpc_client, &PRAGMA_WRAPPER_CONTRACT_ADDRESS, &HYPERLANE_CORE_CONTRACT_ADDRESS) + .await?; + + // Theoros metrics + let metrics_service = MetricsService::new(false, METRICS_PORT)?; + let state = AppState { rpc_client: Arc::new(rpc_client), - event_storage: Arc::new(EventStorage::new(EVENTS_MEM_SIZE)), + storage: Arc::new(theoros_storage), metrics_registry: metrics_service.registry(), }; - let apibara_api_key = std::env::var("APIBARA_API_KEY")?; // TODO: Should be in CLI + let apibara_api_key = std::env::var("APIBARA_API_KEY").context("APIBARA_API_KEY not found.")?; let indexer_service = IndexerService::new(state.clone(), APIBARA_DNA_URL, apibara_api_key)?; - let api_service = ApiService::new(state.clone(), config.server_host(), config.server_port()); + let api_service = ApiService::new(state.clone(), SERVER_HOST, SERVER_PORT); let theoros = ServiceGroup::default().with(metrics_service).with(indexer_service).with(api_service); theoros.start_and_drive_to_end().await?; diff --git a/rust/theoros/src/rpc/hyperlane.rs b/rust/theoros/src/rpc/hyperlane.rs new file mode 100644 index 00000000..e5eb2660 --- /dev/null +++ b/rust/theoros/src/rpc/hyperlane.rs @@ -0,0 +1,18 @@ +use starknet::core::types::Felt; + +#[async_trait::async_trait] +pub trait HyperlaneCalls { + /// Retrieves the announced storage locations as a [Vec] from the + /// hyperlane core contract. + /// The strings are all storage locations path. + async fn get_announced_storage_locations( + &self, + hyperlane_core_address: &Felt, + validators: &[Felt], + ) -> anyhow::Result>; + + /// Retrieves all the announced validators as a [Vec] from the hyperlane + /// core contract. + /// The felts are validators addresses. + async fn get_announced_validators(&self, hyperlane_core_address: &Felt) -> anyhow::Result>; +} diff --git a/rust/theoros/src/rpc/mod.rs b/rust/theoros/src/rpc/mod.rs new file mode 100644 index 00000000..53d6ca55 --- /dev/null +++ b/rust/theoros/src/rpc/mod.rs @@ -0,0 +1,60 @@ +pub mod hyperlane; +pub mod pragma_wrapper; + +pub use hyperlane::*; +pub use pragma_wrapper::*; + +use pragma_utils::conversions::starknet::felt_vec_to_vec_string; +use starknet::{ + core::types::{BlockId, BlockTag, Felt, FunctionCall}, + macros::selector, + providers::{jsonrpc::HttpTransport, JsonRpcClient, Provider}, +}; +use url::Url; + +pub struct StarknetRpc(JsonRpcClient); + +impl StarknetRpc { + pub fn new(rpc_url: Url) -> Self { + Self(JsonRpcClient::new(HttpTransport::new(rpc_url))) + } +} + +#[async_trait::async_trait] +impl HyperlaneCalls for StarknetRpc { + async fn get_announced_storage_locations( + &self, + hyperlane_core_address: &Felt, + validators: &[Felt], + ) -> anyhow::Result> { + let mut calldata = vec![Felt::from(validators.len())]; + calldata.extend(validators); + + let call = FunctionCall { + contract_address: *hyperlane_core_address, + entry_point_selector: selector!("get_announced_storage_locations"), + calldata, + }; + + let response = self.0.call(call, BlockId::Tag(BlockTag::Pending)).await?; + let storage_locations = felt_vec_to_vec_string(&response)?; + Ok(storage_locations) + } + + async fn get_announced_validators(&self, hyperlane_core_address: &Felt) -> anyhow::Result> { + let call = FunctionCall { + contract_address: *hyperlane_core_address, + entry_point_selector: selector!("get_announced_validators"), + calldata: vec![], + }; + let response = self.0.call(call, BlockId::Tag(BlockTag::Pending)).await?; + Ok(response) + } +} + +#[async_trait::async_trait] +impl PragmaWrapperCalls for StarknetRpc { + async fn get_data_feeds(&self, _pragma_wrapper_address: &Felt) -> anyhow::Result> { + Ok(vec![]) + } +} diff --git a/rust/theoros/src/rpc/pragma_wrapper.rs b/rust/theoros/src/rpc/pragma_wrapper.rs new file mode 100644 index 00000000..16182e65 --- /dev/null +++ b/rust/theoros/src/rpc/pragma_wrapper.rs @@ -0,0 +1,8 @@ +use starknet::core::types::Felt; + +#[allow(unused)] +#[async_trait::async_trait] +pub trait PragmaWrapperCalls { + /// Retrieves all the available data feeds from the Pragma oracle Wrapper. + async fn get_data_feeds(&self, pragma_wrapper_address: &Felt) -> anyhow::Result>; +} diff --git a/rust/theoros/src/services/api/docs.rs b/rust/theoros/src/services/api/docs.rs index 028a95d9..9662b5f1 100644 --- a/rust/theoros/src/services/api/docs.rs +++ b/rust/theoros/src/services/api/docs.rs @@ -1,6 +1,7 @@ use std::path::PathBuf; use anyhow::Result; +use serde_json::to_string_pretty; use utoipa::OpenApi; use utoipauto::utoipauto; @@ -15,7 +16,9 @@ pub struct ApiDoc; impl ApiDoc { pub fn generate_openapi_json(output_path: PathBuf) -> Result<()> { - let json = ApiDoc::openapi().to_json()?; + let openapi = ApiDoc::openapi(); + let json = to_string_pretty(&openapi)?; + if let Some(parent) = output_path.parent() { std::fs::create_dir_all(parent)?; } diff --git a/rust/theoros/src/services/api/mod.rs b/rust/theoros/src/services/api/mod.rs index 63cd528e..002d224a 100644 --- a/rust/theoros/src/services/api/mod.rs +++ b/rust/theoros/src/services/api/mod.rs @@ -31,8 +31,7 @@ impl ApiService { #[async_trait::async_trait] impl Service for ApiService { async fn start(&mut self, join_set: &mut JoinSet>) -> anyhow::Result<()> { - // Uncomment this line below in order to generate the OpenAPI specs in the theoros folder - // ApiDoc::generate_openapi_json("./theoros".into())?; + ApiDoc::generate_openapi_json("./theoros".into())?; let host = self.host.to_owned(); let port = self.port; diff --git a/rust/theoros/src/services/api/router.rs b/rust/theoros/src/services/api/router.rs index 91e6c351..99e021d3 100644 --- a/rust/theoros/src/services/api/router.rs +++ b/rust/theoros/src/services/api/router.rs @@ -2,10 +2,12 @@ use axum::http::StatusCode; use axum::response::IntoResponse; use axum::routing::get; use axum::Router; + use utoipa::OpenApi as OpenApiT; use utoipa_swagger_ui::SwaggerUi; use crate::handlers::get_calldata::get_calldata; +use crate::handlers::get_data_feeds::get_data_feeds; use crate::AppState; pub fn api_router(state: AppState) -> Router { @@ -13,7 +15,8 @@ pub fn api_router(state: AppState) -> Router { Router::new() .route("/health", get(health)) .merge(SwaggerUi::new("/v1/docs").url("/v1/docs/openapi.json", open_api)) - .nest("/v1", calldata_route(state.clone())) + .nest("/v1", calldata_routes(state.clone())) + .nest("/v1", data_feeds_routes(state.clone())) .fallback(handler_404) } @@ -25,8 +28,10 @@ async fn handler_404() -> impl IntoResponse { (StatusCode::NOT_FOUND, "The requested resource was not found") } -// TODO: way better route names - -fn calldata_route(state: AppState) -> Router { +fn calldata_routes(state: AppState) -> Router { Router::new().route("/calldata", get(get_calldata)).with_state(state) } + +fn data_feeds_routes(state: AppState) -> Router { + Router::new().route("/data_feeds", get(get_data_feeds).with_state(state)) +} diff --git a/rust/theoros/src/services/indexer/mod.rs b/rust/theoros/src/services/indexer/mod.rs index f937750b..7fde9710 100644 --- a/rust/theoros/src/services/indexer/mod.rs +++ b/rust/theoros/src/services/indexer/mod.rs @@ -1,25 +1,30 @@ use std::str::FromStr; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, bail, Context, Result}; use apibara_core::{ node::v1alpha2::DataFinality, - starknet::v1alpha2::{Block, Event, Filter, HeaderFilter}, + starknet::v1alpha2::{Block, Event, FieldElement, Filter, HeaderFilter}, }; use apibara_sdk::{configuration, ClientBuilder, Configuration, DataMessage, Uri}; use futures_util::TryStreamExt; -use starknet::core::types::Felt; +use starknet::core::utils::get_selector_from_name; use tokio::task::JoinSet; use pragma_utils::{conversions::apibara::felt_as_apibara_field, services::Service}; -use crate::{types::hyperlane::DispatchEvent, AppState}; +use crate::{ + types::hyperlane::{DispatchEvent, FromStarknetEventData, ValidatorAnnouncementEvent}, + AppState, HYPERLANE_CORE_CONTRACT_ADDRESS, +}; -// TODO: depends on the host machine - should be configurable +// TODO: Everything below here should be configurable, either via CLI or config file. +// See: https://github.com/astraly-labs/pragma-monorepo/issues/17 const INDEXING_STREAM_CHUNK_SIZE: usize = 256; - -// TODO: Config those -const HYPERLANE_CORE_CONTRACT_ADDRESS: Felt = Felt::ZERO; -const DISPATCH_EVENT_SELECTOR: Felt = Felt::ZERO; +lazy_static::lazy_static! { + pub static ref F_HYPERLANE_CORE_CONTRACT_ADDRESS: FieldElement = felt_as_apibara_field(&HYPERLANE_CORE_CONTRACT_ADDRESS); + pub static ref DISPATCH_EVENT_SELECTOR: FieldElement = felt_as_apibara_field(&get_selector_from_name("Dispatch").unwrap()); + pub static ref VALIDATOR_ANNOUNCEMENT_SELECTOR: FieldElement = felt_as_apibara_field(&get_selector_from_name("ValidatorAnnouncement").unwrap()); +} #[derive(Clone)] pub struct IndexerService { @@ -42,20 +47,9 @@ impl Service for IndexerService { } } -// TODO: We will probably need to also index the [ValidatorAnnouncement] events -// from the Hyperlane core contract. -// -// Goal is to track if validators send checkpoints to a new location that isn't tracked -// yet. -// For now, we just do the first call to [get_announced_storage_locations]. - impl IndexerService { pub fn new(state: AppState, apibara_uri: &str, apibara_api_key: String) -> Result { let uri = Uri::from_str(apibara_uri)?; - // TODO: should be a config - let pragma_oracle_contract = felt_as_apibara_field(&HYPERLANE_CORE_CONTRACT_ADDRESS); - // TODO: should be a config - let dispatch_event_selector = felt_as_apibara_field(&DISPATCH_EVENT_SELECTOR); let stream_config = Configuration::::default() .with_finality(DataFinality::DataStatusPending) @@ -64,8 +58,13 @@ impl IndexerService { .with_header(HeaderFilter::weak()) .add_event(|event| { event - .with_from_address(pragma_oracle_contract.clone()) - .with_keys(vec![dispatch_event_selector.clone()]) + .with_from_address(F_HYPERLANE_CORE_CONTRACT_ADDRESS.clone()) + .with_keys(vec![DISPATCH_EVENT_SELECTOR.clone()]) + }) + .add_event(|event| { + event + .with_from_address(F_HYPERLANE_CORE_CONTRACT_ADDRESS.clone()) + .with_keys(vec![VALIDATOR_ANNOUNCEMENT_SELECTOR.clone()]) }) .build() }); @@ -92,7 +91,7 @@ impl IndexerService { match stream.try_next().await { Ok(Some(response)) => self.process_batch(response).await?, Ok(None) => continue, - Err(e) => return Err(anyhow!("Error while streaming indexed batch: {}", e)), + Err(e) => bail!("Error while streaming indexed batch: {}", e), } } } @@ -103,30 +102,44 @@ impl IndexerService { DataMessage::Data { cursor: _, end_cursor: _, finality: _, batch } => { for block in batch { for event in block.events.into_iter().filter_map(|e| e.event) { + if event.from_address.is_none() { + continue; + } self.decode_and_store_event(event).await?; } } } DataMessage::Invalidate { cursor } => match cursor { - Some(c) => { - return Err(anyhow!("Received an invalidate request data at {}", &c.order_key)); - } - None => { - return Err(anyhow!("Invalidate request without cursor provided")); - } + Some(c) => bail!("Received an invalidate request data at {}", &c.order_key), + None => bail!("Invalidate request without cursor provided"), }, DataMessage::Heartbeat => {} } Ok(()) } - /// Converts the [Event] into a [DispatchEvent] and stores it into the event_storage. + /// Decodes a starknet [Event] into either a: + /// * [DispatchEvent] and stores it into the events storage, + /// * [ValidatorAnnouncementEvent] and stores it into the validators storage. async fn decode_and_store_event(&self, event: Event) -> Result<()> { - if event.from_address.is_none() { - return Ok(()); + let event_selector = event.keys.first().context("No event selector")?; + + match event_selector { + selector if selector == &*DISPATCH_EVENT_SELECTOR => { + tracing::info!("Received a DispatchEvent"); + let dispatch_event = DispatchEvent::from_starknet_event_data(event.data.into_iter()) + .context("Failed to parse DispatchEvent")?; + self.state.storage.dispatch_events.add(dispatch_event).await; + } + selector if selector == &*VALIDATOR_ANNOUNCEMENT_SELECTOR => { + tracing::info!("Received a ValidatorAnnouncementEvent"); + let validator_announcement_event = + ValidatorAnnouncementEvent::from_starknet_event_data(event.data.into_iter()) + .context("Failed to parse ValidatorAnnouncementEvent")?; + self.state.storage.validators.add_from_announcement_event(validator_announcement_event).await?; + } + _ => panic!("Unexpected event selector - should never happen."), } - let dispatch_event = DispatchEvent::from_event_data(event.data)?; - self.state.event_storage.add(dispatch_event).await; Ok(()) } } diff --git a/rust/theoros/src/storage/event_storage.rs b/rust/theoros/src/storage/event_storage.rs new file mode 100644 index 00000000..f4557258 --- /dev/null +++ b/rust/theoros/src/storage/event_storage.rs @@ -0,0 +1,63 @@ +use std::collections::VecDeque; +use std::sync::Arc; +use tokio::sync::RwLock; + +const DEFAULT_STORAGE_MAX_SIZE: usize = 16; + +/// FIFO Buffer of fixed size used to store events. +/// The first element is the latest. +#[derive(Debug)] +pub struct EventStorage { + events: Arc>>, + max_size: usize, +} + +impl EventStorage { + /// Creates a new `EventStorage` with the specified maximum size. + pub fn new(max_size: usize) -> Self { + Self { events: Arc::new(RwLock::new(VecDeque::with_capacity(max_size))), max_size } + } + + /// Adds a new event to the front of the queue, removing the oldest if necessary. + pub async fn add(&self, event: T) { + let mut events = self.events.write().await; + events.push_front(event); + if events.len() > self.max_size { + events.pop_back(); + } + } + + /// Returns the latest event, if any. + pub async fn latest(&self) -> Option { + self.events.read().await.front().cloned() + } + + /// Returns all events as a vector. + pub async fn all(&self) -> Vec { + self.events.read().await.iter().cloned().collect() + } +} + +impl Default for EventStorage { + fn default() -> Self { + Self::new(DEFAULT_STORAGE_MAX_SIZE) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_event_storage() { + let storage = EventStorage::new(3); + + storage.add(1).await; + storage.add(2).await; + storage.add(3).await; + storage.add(4).await; + + assert_eq!(storage.latest().await, Some(4)); + assert_eq!(storage.all().await, vec![4, 3, 2]); + } +} diff --git a/rust/theoros/src/storage/mod.rs b/rust/theoros/src/storage/mod.rs new file mode 100644 index 00000000..f450feb7 --- /dev/null +++ b/rust/theoros/src/storage/mod.rs @@ -0,0 +1,44 @@ +pub mod event_storage; +pub mod validators_storage; + +pub use event_storage::*; +pub use validators_storage::*; + +use std::collections::HashSet; + +use starknet::core::types::Felt; + +use crate::{ + rpc::{HyperlaneCalls, PragmaWrapperCalls, StarknetRpc}, + types::hyperlane::DispatchEvent, +}; + +/// Theoros storage that contains: +/// * a set of all available data feeds, +/// * an events storage containing the most recents [DispatchEvent] events indexed, +/// * a mapping of all the validators and their fetchers. +#[derive(Default)] +pub struct TheorosStorage { + pub data_feeds: HashSet, + pub validators: ValidatorsStorage, + pub dispatch_events: EventStorage, +} + +impl TheorosStorage { + pub async fn from_rpc_state( + rpc_client: &StarknetRpc, + pragma_wrapper_address: &Felt, + hyperlane_address: &Felt, + ) -> anyhow::Result { + let mut theoros_storage = TheorosStorage::default(); + + let initial_validators = rpc_client.get_announced_validators(hyperlane_address).await?; + let initial_locations = + rpc_client.get_announced_storage_locations(hyperlane_address, &initial_validators).await?; + theoros_storage.validators.fill_with_initial_state(initial_validators, initial_locations).await?; + + let supported_data_feeds = rpc_client.get_data_feeds(pragma_wrapper_address).await?; + theoros_storage.data_feeds = supported_data_feeds.into_iter().collect(); + Ok(theoros_storage) + } +} diff --git a/rust/theoros/src/storage/validators_storage.rs b/rust/theoros/src/storage/validators_storage.rs new file mode 100644 index 00000000..04e31e63 --- /dev/null +++ b/rust/theoros/src/storage/validators_storage.rs @@ -0,0 +1,54 @@ +use std::collections::HashMap; +use std::str::FromStr; + +use anyhow::bail; +use starknet::core::types::Felt; +use tokio::sync::RwLock; + +use crate::types::hyperlane::{CheckpointFetcherConf, ValidatorAnnouncementEvent}; + +/// Contains a mapping between the validators and their fetchers used to +/// retrieve checkpoints. +#[derive(Debug, Default)] +pub struct ValidatorsStorage(RwLock>); + +impl ValidatorsStorage { + pub fn new() -> Self { + Self(RwLock::new(HashMap::default())) + } + + /// Fills the [HashMap] with the initial state fetched from the RPC. + pub async fn fill_with_initial_state(&self, validators: Vec, locations: Vec) -> anyhow::Result<()> { + if validators.len() != locations.len() { + bail!("⛔ Validators and locations vectors must have the same length"); + } + + let mut all_fetchers = self.0.write().await; + + for (validator, location) in validators.into_iter().zip(locations.into_iter()) { + let fetcher = CheckpointFetcherConf::from_str(&location)?; + all_fetchers.insert(validator, fetcher); + } + + Ok(()) + } + + /// Adds or updates the [CheckpointFetcherConf] for the given validator + pub async fn add(&self, validator: Felt, fetcher: CheckpointFetcherConf) -> anyhow::Result<()> { + let mut all_fetchers = self.0.write().await; + all_fetchers.insert(validator, fetcher); + Ok(()) + } + + /// Adds or updates the [CheckpointFetcherConf] for the given validator from a [ValidatorAnnouncementEvent] + pub async fn add_from_announcement_event(&self, event: ValidatorAnnouncementEvent) -> anyhow::Result<()> { + let validator: Felt = event.validator.into(); + let fetcher = CheckpointFetcherConf::from_str(&event.storage_location)?; + self.add(validator, fetcher).await + } + + /// Returns all the fetchers for each validator + pub async fn all(&self) -> HashMap { + self.0.read().await.clone() + } +} diff --git a/rust/theoros/src/types/event_storage.rs b/rust/theoros/src/types/event_storage.rs deleted file mode 100644 index f53234d5..00000000 --- a/rust/theoros/src/types/event_storage.rs +++ /dev/null @@ -1,34 +0,0 @@ -use std::collections::VecDeque; - -use tokio::sync::RwLock; - -use crate::types::dispatch_event::DispatchEvent; - -/// FIFO Buffer of fixed size used to store DispatchEvent from our -/// oracle contract ; the first element being the latest. -pub struct EventStorage { - dispatches: RwLock>, - max_size: usize, -} - -impl EventStorage { - pub fn new(max_size: usize) -> Self { - Self { dispatches: RwLock::new(VecDeque::with_capacity(max_size)), max_size } - } - - pub async fn add(&self, dispatch: DispatchEvent) { - let mut dispatches = self.dispatches.write().await; - dispatches.push_front(dispatch); - if dispatches.len() > self.max_size { - dispatches.pop_back(); - } - } - - pub async fn latest(&self) -> Option { - self.dispatches.read().await.front().cloned() - } - - pub async fn all(&self) -> Vec { - self.dispatches.read().await.iter().cloned().collect() - } -} diff --git a/rust/theoros/src/types/hyperlane/checkpoints/mod.rs b/rust/theoros/src/types/hyperlane/checkpoint.rs similarity index 95% rename from rust/theoros/src/types/hyperlane/checkpoints/mod.rs rename to rust/theoros/src/types/hyperlane/checkpoint.rs index d27b9e35..1e6babdd 100644 --- a/rust/theoros/src/types/hyperlane/checkpoints/mod.rs +++ b/rust/theoros/src/types/hyperlane/checkpoint.rs @@ -1,11 +1,16 @@ -pub mod fetcher; - -pub use fetcher::*; - use serde::{Deserialize, Serialize}; use alloy::primitives::U256; +/// A Hyperlane (checkpoint, messageId) tuple +#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub struct CheckpointWithMessageId { + /// existing Hyperlane checkpoint struct + pub checkpoint: Checkpoint, + /// hash of message emitted from mailbox checkpoint.index + pub message_id: U256, +} + /// An Hyperlane checkpoint #[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] pub struct Checkpoint { @@ -18,12 +23,3 @@ pub struct Checkpoint { /// The index of the checkpoint pub index: u32, } - -/// A Hyperlane (checkpoint, messageId) tuple -#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] -pub struct CheckpointWithMessageId { - /// existing Hyperlane checkpoint struct - pub checkpoint: Checkpoint, - /// hash of message emitted from mailbox checkpoint.index - pub message_id: U256, -} diff --git a/rust/theoros/src/types/hyperlane/storages/gcs_storage.rs b/rust/theoros/src/types/hyperlane/checkpoint_fetchers/gcs.rs similarity index 94% rename from rust/theoros/src/types/hyperlane/storages/gcs_storage.rs rename to rust/theoros/src/types/hyperlane/checkpoint_fetchers/gcs.rs index 39226bb7..cb930c38 100644 --- a/rust/theoros/src/types/hyperlane/storages/gcs_storage.rs +++ b/rust/theoros/src/types/hyperlane/checkpoint_fetchers/gcs.rs @@ -6,7 +6,7 @@ use anyhow::Result; use async_trait::async_trait; use ya_gcp::{storage::StorageClient, AuthFlow, ClientBuilder, ClientBuilderConfig}; -use crate::types::{CheckpointFetcher, CheckpointWithMessageId, StorageLocation}; +use crate::types::hyperlane::{CheckpointFetcher, CheckpointWithMessageId}; #[allow(unused)] const ANNOUNCEMENT_KEY: &str = "gcsAnnouncementKey"; @@ -66,7 +66,7 @@ impl CheckpointFetcher for GcsStorageClient { Ok(Some(serde_json::from_slice(res.as_ref())?)) } - fn announcement_location(&self) -> StorageLocation { + fn announcement_location(&self) -> String { format!("gs://{}/{}", &self.bucket, ANNOUNCEMENT_KEY) } } diff --git a/rust/theoros/src/types/hyperlane/storages/local.rs b/rust/theoros/src/types/hyperlane/checkpoint_fetchers/local.rs similarity index 90% rename from rust/theoros/src/types/hyperlane/storages/local.rs rename to rust/theoros/src/types/hyperlane/checkpoint_fetchers/local.rs index bea3dbad..bb45641f 100644 --- a/rust/theoros/src/types/hyperlane/storages/local.rs +++ b/rust/theoros/src/types/hyperlane/checkpoint_fetchers/local.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use anyhow::{Context, Result}; use async_trait::async_trait; -use crate::types::{CheckpointFetcher, CheckpointWithMessageId, StorageLocation}; +use crate::types::hyperlane::{CheckpointFetcher, CheckpointWithMessageId}; #[allow(unused)] #[derive(Debug, Clone)] @@ -40,7 +40,7 @@ impl CheckpointFetcher for LocalStorage { Ok(Some(checkpoint)) } - fn announcement_location(&self) -> StorageLocation { + fn announcement_location(&self) -> String { format!("file://{}", self.path.to_str().unwrap()) } } diff --git a/rust/theoros/src/types/hyperlane/checkpoints/fetcher.rs b/rust/theoros/src/types/hyperlane/checkpoint_fetchers/mod.rs similarity index 95% rename from rust/theoros/src/types/hyperlane/checkpoints/fetcher.rs rename to rust/theoros/src/types/hyperlane/checkpoint_fetchers/mod.rs index 72dd5301..0f6e4dce 100644 --- a/rust/theoros/src/types/hyperlane/checkpoints/fetcher.rs +++ b/rust/theoros/src/types/hyperlane/checkpoint_fetchers/mod.rs @@ -1,3 +1,7 @@ +pub mod gcs; +pub mod local; +pub mod s3; + // Source: // https://github.com/hyperlane-xyz/hyperlane-monorepo/blob/3e90734310fb1ca9a607ce3d334015fa7aaa9208/rust/hyperlane-base/src/settings/checkpoint_syncer.rs#L14 @@ -10,16 +14,14 @@ use core::str::FromStr; use rusoto_core::Region; use ya_gcp::{AuthFlow, ServiceAccountAuth}; -use crate::types::{ - gcs_storage::{GcsStorageClientBuilder, GCS_SERVICE_ACCOUNT_KEY, GCS_USER_SECRET}, +use crate::types::hyperlane::{ + gcs::{GcsStorageClientBuilder, GCS_SERVICE_ACCOUNT_KEY, GCS_USER_SECRET}, local::LocalStorage, s3::S3Storage, }; use super::CheckpointWithMessageId; -pub type StorageLocation = String; - #[allow(unused)] /// A generic trait to read/write Checkpoints offchain #[async_trait] @@ -27,10 +29,10 @@ pub trait CheckpointFetcher: Debug + Send + Sync { /// Attempt to fetch the signed (checkpoint, messageId) tuple at this index async fn fetch(&self, index: u32) -> Result>; /// Return the announcement storage location for this syncer - fn announcement_location(&self) -> StorageLocation; + fn announcement_location(&self) -> String; } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, Hash, PartialEq)] pub enum CheckpointFetcherConf { /// A local checkpoint syncer LocalStorage { diff --git a/rust/theoros/src/types/hyperlane/storages/s3.rs b/rust/theoros/src/types/hyperlane/checkpoint_fetchers/s3.rs similarity index 97% rename from rust/theoros/src/types/hyperlane/storages/s3.rs rename to rust/theoros/src/types/hyperlane/checkpoint_fetchers/s3.rs index f6e837a5..35dd7361 100644 --- a/rust/theoros/src/types/hyperlane/storages/s3.rs +++ b/rust/theoros/src/types/hyperlane/checkpoint_fetchers/s3.rs @@ -14,7 +14,7 @@ use tokio::time::timeout; use pragma_utils::http::http_client_with_timeout; -use crate::types::{CheckpointFetcher, CheckpointWithMessageId, StorageLocation}; +use crate::types::hyperlane::{CheckpointFetcher, CheckpointWithMessageId}; /// The timeout for S3 requests. Rusoto doesn't offer timeout configuration /// out of the box, so S3 requests must be wrapped with a timeout. @@ -112,7 +112,7 @@ impl CheckpointFetcher for S3Storage { .map_err(Into::into) } - fn announcement_location(&self) -> StorageLocation { + fn announcement_location(&self) -> String { match self.folder.as_deref() { None | Some("") => format!("s3://{}/{}", self.bucket, self.region.name()), Some(folder_str) => { diff --git a/rust/theoros/src/types/hyperlane/dispatch_event.rs b/rust/theoros/src/types/hyperlane/events/dispatch_event.rs similarity index 84% rename from rust/theoros/src/types/hyperlane/dispatch_event.rs rename to rust/theoros/src/types/hyperlane/events/dispatch_event.rs index 98c73b7a..19f3cd48 100644 --- a/rust/theoros/src/types/hyperlane/dispatch_event.rs +++ b/rust/theoros/src/types/hyperlane/events/dispatch_event.rs @@ -5,6 +5,8 @@ use starknet::core::types::U256; use pragma_utils::conversions::apibara::FromFieldBytes; +use super::FromStarknetEventData; + #[derive(Debug, Clone)] #[allow(unused)] pub struct DispatchEvent { @@ -14,32 +16,32 @@ pub struct DispatchEvent { pub message: DispatchMessage, } -impl DispatchEvent { - // Creates a Dispatch from a Dispatch Event data, which is: - // 0. (felt) sender address - // 1. (felt) destination chain id - // 2. (felt:low; felt:high) recipient address - // 3. message - // a. header => - // - felt => version, - // - felt => nonce, - // - felt => origin, - // - felt => sender_low, - // - felt => sender_high, - // - felt => destination, - // - felt => recipient_low, - // - felt => recipient_high, - // b. body: - // - felt => nbr data_feeds updated - // - update (per data_feed) => - // - felt => data_type (given it, we know update_size) - // - felt => ID - // - felt => price - // - felt => decimals - // - felt => timestamp - // - felt => sources_aggregated - // - IF FUTURE: felt => expiration_timestamp - pub fn from_event_data(data: Vec) -> Result { +// Creates a Dispatch from a Dispatch starknet event data, which is: +// 0. (felt) sender address +// 1. (felt) destination chain id +// 2. (felt:low; felt:high) recipient address +// 3. message +// a. header => +// - felt => version, +// - felt => nonce, +// - felt => origin, +// - felt => sender_low, +// - felt => sender_high, +// - felt => destination, +// - felt => recipient_low, +// - felt => recipient_high, +// b. body: +// - felt => nbr data_feeds updated +// - update (per data_feed) => +// - felt => data_type (given it, we know update_size) +// - felt => ID +// - felt => price +// - felt => decimals +// - felt => timestamp +// - felt => sources_aggregated +// - IF FUTURE: felt => expiration_timestamp +impl FromStarknetEventData for DispatchEvent { + fn from_starknet_event_data(data: impl Iterator) -> Result { let mut data = data.into_iter(); let sender = U256::from_words( @@ -54,8 +56,8 @@ impl DispatchEvent { u128::from_field_bytes(data.next().context("Missing recipient part 2")?.to_bytes()), ); - let header = DispatchMessageHeader::from_event_data(&mut data.by_ref().take(HEADER_SIZE))?; - let body = DispatchMessageBody::from_event_data(&mut data)?; + let header = DispatchMessageHeader::from_starknet_event_data(&mut data.by_ref().take(HEADER_SIZE))?; + let body = DispatchMessageBody::from_starknet_event_data(&mut data)?; let message = DispatchMessage { header, body }; let dispatch = Self { sender, destination_domain, recipient_address, message }; @@ -83,8 +85,8 @@ pub struct DispatchMessageHeader { pub recipient: U256, } -impl DispatchMessageHeader { - pub fn from_event_data(mut data: impl Iterator) -> Result { +impl FromStarknetEventData for DispatchMessageHeader { + fn from_starknet_event_data(mut data: impl Iterator) -> Result { Ok(Self { version: u8::from_field_bytes(data.next().context("Missing version")?.to_bytes()), nonce: u32::from_field_bytes(data.next().context("Missing nonce")?.to_bytes()), @@ -109,14 +111,14 @@ pub struct DispatchMessageBody { pub updates: Vec, } -impl DispatchMessageBody { - pub fn from_event_data(mut data: impl Iterator) -> Result { +impl FromStarknetEventData for DispatchMessageBody { + fn from_starknet_event_data(mut data: impl Iterator) -> Result { let nb_updated = u32::from_field_bytes(data.next().context("Missing number of updates")?.to_bytes()); let mut updates = Vec::with_capacity(nb_updated as usize); for _ in 0..nb_updated { - let update = DispatchUpdate::from_event_data(&mut data).context("Failed to parse update")?; + let update = DispatchUpdate::from_starknet_event_data(&mut data).context("Failed to parse update")?; updates.push(update); } @@ -155,14 +157,14 @@ pub enum DispatchUpdate { Future(FutureUpdate), } -impl DispatchUpdate { - pub fn from_event_data(mut data: impl Iterator) -> Result { +impl FromStarknetEventData for DispatchUpdate { + fn from_starknet_event_data(mut data: impl Iterator) -> Result { let data_type = UpdateType::from_u8(u8::from_field_bytes(data.next().context("Missing data type")?.to_bytes()))?; match data_type { - UpdateType::Spot => Ok(DispatchUpdate::Spot(SpotUpdate::from_event_data(&mut data)?)), - UpdateType::Future => Ok(DispatchUpdate::Future(FutureUpdate::from_event_data(&mut data)?)), + UpdateType::Spot => Ok(DispatchUpdate::Spot(SpotUpdate::from_starknet_event_data(&mut data)?)), + UpdateType::Future => Ok(DispatchUpdate::Future(FutureUpdate::from_starknet_event_data(&mut data)?)), } } } @@ -177,8 +179,8 @@ pub struct SpotUpdate { pub num_sources_aggregated: u32, } -impl SpotUpdate { - pub fn from_event_data(mut data: impl Iterator) -> Result { +impl FromStarknetEventData for SpotUpdate { + fn from_starknet_event_data(mut data: impl Iterator) -> Result { let feed_id = U256::from_words( u128::from_field_bytes(data.next().context("Missing feed ID part 1")?.to_bytes()), u128::from_field_bytes(data.next().context("Missing feed ID part 2")?.to_bytes()), @@ -208,8 +210,8 @@ pub struct FutureUpdate { pub num_sources_aggregated: u32, } -impl FutureUpdate { - pub fn from_event_data(mut data: impl Iterator) -> Result { +impl FromStarknetEventData for FutureUpdate { + fn from_starknet_event_data(mut data: impl Iterator) -> Result { let feed_id = U256::from_words( u128::from_field_bytes(data.next().context("Missing feed ID part 1")?.to_bytes()), u128::from_field_bytes(data.next().context("Missing feed ID part 2")?.to_bytes()), @@ -265,7 +267,7 @@ mod tests { create_field_element(5), // num_sources_aggregated ]; - let dispatch_event = DispatchEvent::from_event_data(event_data).unwrap(); + let dispatch_event = DispatchEvent::from_starknet_event_data(event_data.into_iter()).unwrap(); assert_eq!(dispatch_event.sender, U256::from(1_u32)); assert_eq!(dispatch_event.destination_domain, 2); @@ -308,7 +310,7 @@ mod tests { create_field_element(0), // recipient part 2 ]; - let header = DispatchMessageHeader::from_event_data(header_data.into_iter()).unwrap(); + let header = DispatchMessageHeader::from_starknet_event_data(header_data.into_iter()).unwrap(); assert_eq!(header.version, 1); assert_eq!(header.nonce, 2); @@ -339,7 +341,7 @@ mod tests { create_field_element(1234567892), // expiration_timestamp ]; - let body = DispatchMessageBody::from_event_data(body_data.into_iter()).unwrap(); + let body = DispatchMessageBody::from_starknet_event_data(body_data.into_iter()).unwrap(); assert_eq!(body.nb_updated, 2); assert_eq!(body.updates.len(), 2); @@ -391,8 +393,8 @@ mod tests { create_field_element(1234567892), // expiration_timestamp ]; - let spot_update = DispatchUpdate::from_event_data(spot_data.into_iter()).unwrap(); - let future_update = DispatchUpdate::from_event_data(future_data.into_iter()).unwrap(); + let spot_update = DispatchUpdate::from_starknet_event_data(spot_data.into_iter()).unwrap(); + let future_update = DispatchUpdate::from_starknet_event_data(future_data.into_iter()).unwrap(); match spot_update { DispatchUpdate::Spot(update) => { diff --git a/rust/theoros/src/types/hyperlane/events/mod.rs b/rust/theoros/src/types/hyperlane/events/mod.rs new file mode 100644 index 00000000..de3334a1 --- /dev/null +++ b/rust/theoros/src/types/hyperlane/events/mod.rs @@ -0,0 +1,11 @@ +pub mod dispatch_event; +pub mod validator_announcement_event; + +pub use dispatch_event::*; +pub use validator_announcement_event::*; + +use apibara_core::starknet::v1alpha2::FieldElement; + +pub trait FromStarknetEventData: Sized { + fn from_starknet_event_data(data: impl Iterator) -> anyhow::Result; +} diff --git a/rust/theoros/src/types/hyperlane/events/validator_announcement_event.rs b/rust/theoros/src/types/hyperlane/events/validator_announcement_event.rs new file mode 100644 index 00000000..1809087e --- /dev/null +++ b/rust/theoros/src/types/hyperlane/events/validator_announcement_event.rs @@ -0,0 +1,27 @@ +use anyhow::Context; +use apibara_core::starknet::v1alpha2::FieldElement; +use starknet::core::types::{EthAddress, Felt}; + +use pragma_utils::conversions::{apibara::apibara_field_as_felt, starknet::FeltVecToString}; + +use super::FromStarknetEventData; + +#[derive(Debug, Clone)] +#[allow(unused)] +pub struct ValidatorAnnouncementEvent { + pub validator: EthAddress, + pub storage_location: String, +} + +impl FromStarknetEventData for ValidatorAnnouncementEvent { + fn from_starknet_event_data(mut data: impl Iterator) -> anyhow::Result { + let validator_as_felt = apibara_field_as_felt(&data.next().context("Missing validator")?); + let validator = EthAddress::from_felt(&validator_as_felt).context("Invalid validator ETH address")?; + + let storage_location_felts: Vec = data.map(|f| apibara_field_as_felt(&f)).collect(); + let storage_location = storage_location_felts.to_string(); + + let new_event = Self { validator, storage_location }; + Ok(new_event) + } +} diff --git a/rust/theoros/src/types/hyperlane/mod.rs b/rust/theoros/src/types/hyperlane/mod.rs index 3e39281d..88e7ecf7 100644 --- a/rust/theoros/src/types/hyperlane/mod.rs +++ b/rust/theoros/src/types/hyperlane/mod.rs @@ -1,8 +1,7 @@ -pub mod checkpoints; -pub mod dispatch_event; -pub mod storages; +pub mod checkpoint; +pub mod checkpoint_fetchers; +pub mod events; -pub use checkpoints::*; -pub use dispatch_event::*; -#[allow(unused)] -pub use storages::*; +pub use checkpoint::*; +pub use checkpoint_fetchers::*; +pub use events::*; diff --git a/rust/theoros/src/types/hyperlane/storages/mod.rs b/rust/theoros/src/types/hyperlane/storages/mod.rs deleted file mode 100644 index 0fe72696..00000000 --- a/rust/theoros/src/types/hyperlane/storages/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod gcs_storage; -pub mod local; -pub mod s3; diff --git a/rust/theoros/src/types/mod.rs b/rust/theoros/src/types/mod.rs index ee73f386..5d95fdad 100644 --- a/rust/theoros/src/types/mod.rs +++ b/rust/theoros/src/types/mod.rs @@ -1,7 +1 @@ -pub mod event_storage; pub mod hyperlane; -pub mod starknet_rpc; - -pub use event_storage::*; -pub use hyperlane::*; -pub use starknet_rpc::*; diff --git a/rust/theoros/src/types/starknet_rpc.rs b/rust/theoros/src/types/starknet_rpc.rs deleted file mode 100644 index a52beaa1..00000000 --- a/rust/theoros/src/types/starknet_rpc.rs +++ /dev/null @@ -1,44 +0,0 @@ -use anyhow::Result; -use pragma_utils::conversions::starknet::felt_vec_to_vec_string; -use starknet::{ - core::types::{BlockId, BlockTag, EthAddress, Felt, FunctionCall}, - macros::selector, - providers::{jsonrpc::HttpTransport, JsonRpcClient, Provider}, -}; -use url::Url; - -use super::StorageLocation; - -pub struct StarknetRpc(JsonRpcClient); - -impl StarknetRpc { - pub fn new(rpc_url: Url) -> Self { - Self(JsonRpcClient::new(HttpTransport::new(rpc_url))) - } - - /// Retrieves a [Vec] of [StorageLocation] from the hyperlane core contract. - pub async fn get_announced_storage_locations( - &self, - hyperlane_core_address: &str, - validators: &[EthAddress], - ) -> Result> { - let hyperlane_core_address = Felt::from_hex(hyperlane_core_address)?; - - let validators: Vec = validators.iter().cloned().map(Felt::from).collect(); - - let mut calldata = vec![Felt::from(validators.len())]; - calldata.extend(validators); - - let call = FunctionCall { - contract_address: hyperlane_core_address, - entry_point_selector: selector!("get_announced_storage_locations"), - calldata, - }; - - let response = self.0.call(call, BlockId::Tag(BlockTag::Pending)).await?; - let storage_locations = felt_vec_to_vec_string(&response)?; - Ok(storage_locations) - } - - // TODO: get_announced_validators -}