diff --git a/Cargo.lock b/Cargo.lock index 0cdfdd76..a4f195bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -467,6 +467,12 @@ dependencies = [ "ws_stream_wasm", ] +[[package]] +name = "anyhow" +version = "1.0.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25bdb32cbbdce2b519a9cd7df3a678443100e265d5e25ca763b7572a5104f5f3" + [[package]] name = "ark-bn254" version = "0.4.0" @@ -1478,8 +1484,10 @@ dependencies = [ "alloy-contract", "alloy-primitives", "alloy-provider", + "alloy-rpc-types", "alloy-sol-types", "alloy-transport-ws", + "anyhow", "eigensdk-client-avsregistry", "eigensdk-contract-bindings", "eigensdk-crypto-bls", @@ -1491,6 +1499,7 @@ dependencies = [ "futures-util", "thiserror", "tokio", + "tracing", ] [[package]] @@ -2540,6 +2549,17 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "info-operator-service" +version = "0.0.1-alpha" +dependencies = [ + "alloy-primitives", + "eigensdk-client-avsregistry", + "eigensdk-services-operatorsinfo", + "tokio", + "tracing", +] + [[package]] name = "inout" version = "0.1.3" diff --git a/Cargo.toml b/Cargo.toml index a448e105..cdfc07c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,9 @@ members = [ "crates/chainio/clients/avsregistry/", "crates/services/operatorsinfo/", "crates/types/", "crates/metrics/", -"crates/types/"] +"crates/types/", +"examples/info-operator-service/" +] resolver = "2" @@ -62,6 +64,7 @@ eigensdk-metrics-collectors-rpc-calls = {version = "0.0.1-alpha",path = "crates/ eigensdk-services-avsregistry = {path = "crates/services/avsregistry"} eigensdk-services-bls_aggregation = {path = "crates/services/bls_aggregation"} eigensdk-services-operatorsinfo = {path = "crates/services/operatorsinfo"} +info-operator-service = {path = "examples/info-operator-service"} tokio = {version = "1.37.0" , features = ["test-util", "full","sync"] } futures-util = "0.3.30" thiserror = "1.0" diff --git a/crates/chainio/clients/avsregistry/src/reader.rs b/crates/chainio/clients/avsregistry/src/reader.rs index 4963c0c0..bbb4932e 100644 --- a/crates/chainio/clients/avsregistry/src/reader.rs +++ b/crates/chainio/clients/avsregistry/src/reader.rs @@ -3,6 +3,7 @@ use alloy_primitives::{Address, Bytes, FixedBytes, B256, U256}; use alloy_provider::{Provider, ProviderBuilder}; use alloy_rpc_types::Filter; use alloy_sol_types::sol; +use ark_ff::Zero; use eigensdk_types::operator::{bitmap_to_quorum_ids, BLSApkRegistry, OperatorPubKeys}; use num_bigint::BigInt; use std::collections::HashMap; @@ -36,7 +37,6 @@ sol!( "../../../../crates/contracts/bindings/utils/json/OperatorStateRetriever.json" ); -use BLSApkRegistry::{G1Point, G2Point}; /// Avs Registry chainreader #[derive(Debug, Clone)] pub struct AvsRegistryChainReader { @@ -52,7 +52,7 @@ trait AvsRegistryReader { } impl AvsRegistryChainReader { - fn new( + pub fn new( registry_coordinator_addr: Address, bls_apk_registry_addr: Address, operator_state_retriever: Address, @@ -415,44 +415,59 @@ impl AvsRegistryChainReader { pub async fn query_existing_registered_operator_pub_keys( &self, start_block: u64, - stop_block: u64, + mut stop_block: u64, ) -> Result<(Vec
, Vec), Box> { let provider = ProviderBuilder::new() .with_recommended_fillers() .on_builtin(&self.provider) .await?; - - let filter = Filter::new() - .select(start_block..stop_block) - .event("NewPubkeyRegistration(address,(uint256,uint256),(uint256[2],uint256[2]))") - .address(self.bls_apk_registry_addr); - - let logs = provider.get_logs(&filter).await?; - - debug!(transactionLogs = ?logs, "avsRegistryChainReader.QueryExistingRegisteredOperatorPubKeys"); + let query_block_range = 1024; + let current_block_number = provider.get_block_number().await?; + if stop_block.is_zero() { + stop_block = current_block_number; + } + println!("start block :{}", start_block); + println!("stop block {}", stop_block); + let mut i = start_block; let mut operator_addresses: Vec
= vec![]; let mut operator_pub_keys: Vec = vec![]; + while i <= stop_block { + let mut to_block = i + (query_block_range - 1); + if to_block > stop_block { + to_block = stop_block; + } + println!("to block{}", to_block); + println!("bls apk address :{}", self.bls_apk_registry_addr); + let filter = Filter::new() + .select(i..to_block) + .event("NewPubkeyRegistration(address,(uint256,uint256),(uint256[2],uint256[2]))") + .address(self.bls_apk_registry_addr); + + let logs = provider.get_logs(&filter).await?; + println!("logs length {:?}", logs.len()); + debug!(transactionLogs = ?logs, "avsRegistryChainReader.QueryExistingRegisteredOperatorPubKeys"); - for (i, v_log) in logs.iter().enumerate() { - let pub_key_reg_option = v_log - .log_decode::() - .ok(); - if let Some(pub_key_reg) = pub_key_reg_option { - let data = pub_key_reg.data(); - let operator_addr = data.operator; - operator_addresses.push(operator_addr); - let g1_pub_key = data.pubkeyG1.clone(); - let g2_pub_key = data.pubkeyG2.clone(); - - let operator_pub_key = OperatorPubKeys { - g1_pub_key: g1_pub_key, - g2_pub_key: g2_pub_key, - }; - - operator_pub_keys.push(operator_pub_key); + for (_, v_log) in logs.iter().enumerate() { + let pub_key_reg_option = v_log + .log_decode::() + .ok(); + if let Some(pub_key_reg) = pub_key_reg_option { + let data = pub_key_reg.data(); + let operator_addr = data.operator; + operator_addresses.push(operator_addr); + let g1_pub_key = data.pubkeyG1.clone(); + let g2_pub_key = data.pubkeyG2.clone(); + + let operator_pub_key = OperatorPubKeys { + g1_pub_key: g1_pub_key, + g2_pub_key: g2_pub_key, + }; + + operator_pub_keys.push(operator_pub_key); + } } + i += 1024; } - Ok((operator_addresses, operator_pub_keys)) } @@ -470,9 +485,6 @@ impl AvsRegistryChainReader { let query_block_range = 10000; - let contract_registry_coordinator = - RegistryCoordinator::new(self.registry_coordinator_addr, &provider); - let mut i = start_block; while i <= stop_block { diff --git a/crates/chainio/clients/avsregistry/src/subscriber.rs b/crates/chainio/clients/avsregistry/src/subscriber.rs index 72870929..f3349f7d 100644 --- a/crates/chainio/clients/avsregistry/src/subscriber.rs +++ b/crates/chainio/clients/avsregistry/src/subscriber.rs @@ -20,22 +20,17 @@ use BLSApkRegistry::{BLSApkRegistryEvents, BLSApkRegistryInstance, NewPubkeyRegi /// AvsRegistry Chain Subscriber struct #[derive(Debug)] pub struct AvsRegistryChainSubscriber { - bls_apk_registry: BLSApkRegistryEvents, provider: String, } impl AvsRegistryChainSubscriber { - fn new(bls_apk_registry: BLSApkRegistryEvents, provider: String) -> Self { - return AvsRegistryChainSubscriber { - bls_apk_registry: bls_apk_registry, - provider, - }; + pub fn new(provider: String) -> Self { + return AvsRegistryChainSubscriber { provider }; } - async fn build_avs_registry_chain_reader( + pub async fn build( &self, bls_apk_registry_addr: Address, - provider: String, ) -> Result< BLSApkRegistryInstance< BoxTransport, @@ -60,6 +55,7 @@ impl AvsRegistryChainSubscriber { return Ok(bls_apk_reg); } + /// Utility function that returns new pubkey registration filter pub async fn get_new_pub_key_registration_filter<'a>( &self, ) -> Result> { diff --git a/crates/services/operatorsinfo/Cargo.toml b/crates/services/operatorsinfo/Cargo.toml index d4e9e34e..8719d7d9 100644 --- a/crates/services/operatorsinfo/Cargo.toml +++ b/crates/services/operatorsinfo/Cargo.toml @@ -24,4 +24,7 @@ alloy-primitives.workspace = true alloy-provider.workspace = true alloy-transport-ws.workspace = true futures-util.workspace = true -tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } \ No newline at end of file +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } +tracing.workspace = true +anyhow = "1.0.83" +alloy-rpc-types.workspace = true \ No newline at end of file diff --git a/crates/services/operatorsinfo/src/operatorsinfo_inmemory.rs b/crates/services/operatorsinfo/src/operatorsinfo_inmemory.rs index 5c886458..2ccbddbe 100644 --- a/crates/services/operatorsinfo/src/operatorsinfo_inmemory.rs +++ b/crates/services/operatorsinfo/src/operatorsinfo_inmemory.rs @@ -1,17 +1,16 @@ -use alloy_sol_types::sol; -use eigensdk_client_avsregistry::{ - reader::AvsRegistryChainReader, subscriber::AvsRegistryChainSubscriber, -}; - -// use eigensdk_types::{G1Point,G2Point}; use alloy_primitives::Address; use alloy_provider::{Provider, ProviderBuilder}; +use alloy_rpc_types::Filter; use alloy_transport_ws::WsConnect; +use anyhow::Result; +use eigensdk_client_avsregistry::{ + reader::AvsRegistryChainReader, subscriber::AvsRegistryChainSubscriber, +}; use eigensdk_types::operator::BLSApkRegistry::{self, G1Point, G2Point}; use eigensdk_types::operator::{operator_id_from_g1_pub_key, OperatorPubKeys}; -use eyre::Result; -use futures_util::{stream, StreamExt}; +use futures_util::StreamExt; use std::collections::HashMap; +use std::error::Error as StdError; use tokio::sync::{ mpsc, mpsc::UnboundedSender, @@ -32,6 +31,8 @@ enum OperatorsInfoMessage { Get(Address, Sender>), } +type BoxedError = Box; + impl OperatorInfoServiceInMemory { pub async fn new( avs_registry_subscriber: AvsRegistryChainSubscriber, @@ -39,7 +40,6 @@ impl OperatorInfoServiceInMemory { web_socket: String, ) -> Self { let (pubkeys_tx, mut pubkeys_rx) = mpsc::unbounded_channel(); - let mut operator_info_data = HashMap::new(); let mut operator_addr_to_id = HashMap::new(); @@ -71,53 +71,44 @@ impl OperatorInfoServiceInMemory { } } - #[tokio::main] - pub async fn start_service(&self) -> Result<()> { + pub async fn start_service(&self, start_block: u64, end_block: u64) -> Result<()> { // query past operator registrations - self.query_past_registered_operator_events_and_fill_db() + self.query_past_registered_operator_events_and_fill_db(start_block, end_block) .await; - let filter_result = self - .avs_registry_subscriber - .get_new_pub_key_registration_filter() - .await; - - match filter_result { - Ok(filter) => { - let ws = WsConnect::new(&self.ws); - let provider = ProviderBuilder::new().on_ws(ws).await?; - - let mut subcription_new_operator_registration_stream = - provider.subscribe_logs(&filter).await?; - let mut stream = subcription_new_operator_registration_stream.into_stream(); - while let Some(log) = stream.next().await { - let data = log - .log_decode::() - .ok(); - - if let Some(new_pub_key_event) = data { - let event_data = new_pub_key_event.data(); - let operator_pub_key = OperatorPubKeys { - g1_pub_key: G1Point { - X: event_data.pubkeyG1.X, - Y: event_data.pubkeyG1.Y, - }, - g2_pub_key: G2Point { - X: event_data.pubkeyG2.X, - Y: event_data.pubkeyG2.Y, - }, - }; - // send message - let _ = self.pub_keys.send(OperatorsInfoMessage::InsertOperatorInfo( - event_data.operator, - operator_pub_key, - )); - } - } + let ws = WsConnect::new(&self.ws); + let provider = ProviderBuilder::new().on_ws(ws).await?; + let current_block_number = provider.get_block_number().await?; + let filter = Filter::new() + .event("NewPubkeyRegistration(address,(uint256,uint256),(uint256[2],uint256[2]))") + .from_block(current_block_number); + + let subcription_new_operator_registration_stream = provider.subscribe_logs(&filter).await?; + let mut stream = subcription_new_operator_registration_stream.into_stream(); + while let Some(log) = stream.next().await { + let data = log + .log_decode::() + .ok(); + + if let Some(new_pub_key_event) = data { + let event_data = new_pub_key_event.data(); + let operator_pub_key = OperatorPubKeys { + g1_pub_key: G1Point { + X: event_data.pubkeyG1.X, + Y: event_data.pubkeyG1.Y, + }, + g2_pub_key: G2Point { + X: event_data.pubkeyG2.X, + Y: event_data.pubkeyG2.Y, + }, + }; + // send message + let _ = self.pub_keys.send(OperatorsInfoMessage::InsertOperatorInfo( + event_data.operator, + operator_pub_key, + )); } - Err(_) => {} } - Ok(()) } @@ -129,15 +120,17 @@ impl OperatorInfoServiceInMemory { responder_rx.await.unwrap_or(None) } - pub async fn query_past_registered_operator_events_and_fill_db(&self) { + pub async fn query_past_registered_operator_events_and_fill_db( + &self, + start_block: u64, + end_block: u64, + ) { let (operator_address, operator_pub_keys) = self .avs_registry_reader - .query_existing_registered_operator_pub_keys(0, 0) + .query_existing_registered_operator_pub_keys(start_block, end_block) .await .unwrap(); - for (i, address) in operator_address.iter().enumerate() { - // let mut pub_keys = map.lock().unwrap(); let message = OperatorsInfoMessage::InsertOperatorInfo(*address, operator_pub_keys[i].clone()); let _ = self.pub_keys.send(message); diff --git a/examples/README.md b/examples/README.md index 09f000c1..f60d1ff8 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,3 +1,8 @@ # Examples -Examples demonstrating how to interact with Eigen layer contracts using eigensdk-rs crates. \ No newline at end of file +Examples demonstrating how to interact with Eigen layer contracts using eigensdk-rs crates. + + +## OperatorsInfo + + diff --git a/examples/info-operator-service/Cargo.toml b/examples/info-operator-service/Cargo.toml new file mode 100644 index 00000000..99eb62a4 --- /dev/null +++ b/examples/info-operator-service/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "info-operator-service" +description = "Example demonstrating the operatorsinfo crate" + +version.workspace = true +edition.workspace = true +rust-version.workspace = true +repository.workspace = true +license-file.workspace = true + +[dependencies] +eigensdk-client-avsregistry.workspace = true +eigensdk-services-operatorsinfo.workspace = true +alloy-primitives.workspace = true +tokio = {workspace = true, features =["full"]} +tracing.workspace = true diff --git a/examples/info-operator-service/src/main.rs b/examples/info-operator-service/src/main.rs new file mode 100644 index 00000000..f03f0612 --- /dev/null +++ b/examples/info-operator-service/src/main.rs @@ -0,0 +1,60 @@ +use alloy_primitives::address; +use eigensdk_client_avsregistry::{ + reader::AvsRegistryChainReader, subscriber::AvsRegistryChainSubscriber, +}; +use eigensdk_services_operatorsinfo::operatorsinfo_inmemory::OperatorInfoServiceInMemory; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio::task; +use tokio::time::{self, Duration}; + +const HOLESKY_PROVIDER: &str = "https://ethereum-holesky.blockpi.network/v1/rpc/public"; +const WS_HOLESKY_PROIVIDER: &str = "wss://holesky.drpc.org"; + +#[tokio::main] +async fn main() { + let avs_registry_chain_reader = AvsRegistryChainReader::new( + address!("53012C69A189cfA2D9d29eb6F19B32e0A2EA3490"), + address!("066cF95c1bf0927124DFB8B02B401bc23A79730D"), + address!("B4baAfee917fb4449f5ec64804217bccE9f46C67"), + address!("BDACD5998989Eec814ac7A0f0f6596088AA2a270"), + HOLESKY_PROVIDER.to_string(), + ); + let avs_registry_subscriber = AvsRegistryChainSubscriber::new(WS_HOLESKY_PROIVIDER.to_string()); + + let operators_info = Arc::new(Mutex::new( + OperatorInfoServiceInMemory::new( + avs_registry_subscriber, + avs_registry_chain_reader, + WS_HOLESKY_PROIVIDER.to_string(), + ) + .await, + )); + let operators_info_clone = Arc::clone(&operators_info); + // start the service with a particular block range + // from block : 1536406 + // to block : 0 means current block , else normal + task::spawn(async move { + let operators_info = operators_info_clone.lock().await; + operators_info.start_service(1536406, 0).await + }); + + // Do whatever in this loop. We are getting the operator info , and re entering after 60 seconds + // indefinitely. You can always break or run it as per your preference. + loop { + println!("entered loop"); + let info = operators_info.lock().await; + // https://holesky.etherscan.io/tx/0xa5e239184bb8b3340a2ea2d73f6ef394663c76a7313e5b1e8d886f2ae0f71f1f + let res = info + .get_operator_info(address!("40d2c07fe83cf73df224f691cefd46257c4e5149")) + .await; + match res { + Some(operator_pub_keys) => { + println!("operator pub keys : {:?}", operator_pub_keys); + } + None => {} + }; + drop(info); // Explicitly drop the lock to release it + time::sleep(Duration::from_secs(60)).await; // Adjust the duration as needed + } +}