From 5289b356aad9041b57f286708ad572d28150c458 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Thu, 30 Nov 2023 13:59:22 +0100 Subject: [PATCH] fix: use solana-client pubsub lib instead of solana-shadow Solana-shadow is not maintained anymore and doesn't work properly. It drops many updates and after a certain amount of latency drops all messages. We have been using Solana PubSub client in Hermes without any problem and there is no reason to stick with solana-shadow. --- Cargo.lock | 200 +----------------------------- Cargo.toml | 10 +- config/config.sample.pythnet.toml | 13 -- src/agent/solana/exporter.rs | 28 ++--- src/agent/solana/oracle.rs | 115 +++++++++-------- 5 files changed, 90 insertions(+), 276 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5b07862..e60e26d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -881,19 +881,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "dashmap" -version = "5.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" -dependencies = [ - "cfg-if", - "hashbrown 0.12.3", - "lock_api", - "once_cell", - "parking_lot_core", -] - [[package]] name = "data-encoding" version = "2.3.2" @@ -1272,21 +1259,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "form_urlencoded" version = "1.0.1" @@ -1707,19 +1679,6 @@ dependencies = [ "tokio-rustls", ] -[[package]] -name = "hyper-tls" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" -dependencies = [ - "bytes", - "hyper", - "native-tls", - "tokio", - "tokio-native-tls", -] - [[package]] name = "iana-time-zone" version = "0.1.47" @@ -2178,24 +2137,6 @@ dependencies = [ "twoway", ] -[[package]] -name = "native-tls" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" -dependencies = [ - "lazy_static", - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "new_debug_unreachable" version = "1.0.4" @@ -2396,51 +2337,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" -[[package]] -name = "openssl" -version = "0.10.42" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12fc0523e3bd51a692c8850d075d74dc062ccf251c0110668cbd921917118a13" -dependencies = [ - "bitflags", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" -dependencies = [ - "proc-macro2 1.0.43", - "quote 1.0.21", - "syn 1.0.99", -] - [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "openssl-sys" -version = "0.9.77" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b03b84c3b2d099b81f0953422b4d4ad58761589d0229b5506356afca05a3670a" -dependencies = [ - "autocfg 1.1.0", - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "ordered-multimap" version = "0.4.3" @@ -2697,12 +2599,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "pkg-config" -version = "0.3.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" - [[package]] name = "polyval" version = "0.5.3" @@ -2835,7 +2731,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.4.0" +version = "2.4.1" dependencies = [ "anyhow", "async-trait", @@ -2866,9 +2762,9 @@ dependencies = [ "slog-extlog", "slog-term", "soketto", + "solana-account-decoder", "solana-client", "solana-sdk", - "solana-shadow", "thiserror", "tokio", "tokio-retry", @@ -4066,8 +3962,8 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tokio-tungstenite 0.17.2", - "tungstenite 0.17.3", + "tokio-tungstenite", + "tungstenite", "url", ] @@ -4408,30 +4304,6 @@ dependencies = [ "syn 1.0.99", ] -[[package]] -name = "solana-shadow" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e5407b52971e4d643f2e4751d6a115414e46c30836d1ca9d57ce61720536e01" -dependencies = [ - "base64 0.13.0", - "borsh", - "borsh-derive", - "bs58", - "dashmap", - "futures", - "hyper-tls", - "serde", - "serde_json", - "solana-client", - "solana-sdk", - "thiserror", - "tokio", - "tokio-tungstenite 0.16.1", - "tracing", - "tracing-futures", -] - [[package]] name = "solana-streamer" version = "1.14.16" @@ -4931,16 +4803,6 @@ dependencies = [ "syn 1.0.99", ] -[[package]] -name = "tokio-native-tls" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" -dependencies = [ - "native-tls", - "tokio", -] - [[package]] name = "tokio-retry" version = "0.3.0" @@ -4974,20 +4836,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-tungstenite" -version = "0.16.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e80b39df6afcc12cdf752398ade96a6b9e99c903dfdc36e53ad10b9c366bca72" -dependencies = [ - "futures-util", - "log", - "native-tls", - "tokio", - "tokio-native-tls", - "tungstenite 0.16.0", -] - [[package]] name = "tokio-tungstenite" version = "0.17.2" @@ -4999,7 +4847,7 @@ dependencies = [ "rustls", "tokio", "tokio-rustls", - "tungstenite 0.17.3", + "tungstenite", "webpki", "webpki-roots", ] @@ -5087,42 +4935,12 @@ dependencies = [ "once_cell", ] -[[package]] -name = "tracing-futures" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" -dependencies = [ - "pin-project", - "tracing", -] - [[package]] name = "try-lock" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" -[[package]] -name = "tungstenite" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ad3713a14ae247f22a728a0456a545df14acf3867f905adff84be99e23b3ad1" -dependencies = [ - "base64 0.13.0", - "byteorder", - "bytes", - "http", - "httparse", - "log", - "native-tls", - "rand 0.8.5", - "sha-1 0.9.8", - "thiserror", - "url", - "utf-8", -] - [[package]] name = "tungstenite" version = "0.17.3" @@ -5294,12 +5112,6 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "vec_map" version = "0.8.2" @@ -5347,7 +5159,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-stream", - "tokio-tungstenite 0.17.2", + "tokio-tungstenite", "tokio-util", "tower-service", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 2c98625..1109dd7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.4.0" +version = "2.4.1" edition = "2021" [[bin]] @@ -28,9 +28,10 @@ chrono = "0.4.31" chrono-tz = "0.8.4" parking_lot = "0.12.1" pyth-sdk = "0.7.0" -pyth-sdk-solana = "0.7.1" -solana-client = "1.13.6" -solana-sdk = "1.13.6" +pyth-sdk-solana = "0.7.2" +solana-account-decoder = "1.14.16" +solana-client = "1.14.16" +solana-sdk = "1.14.16" bincode = "1.3.3" slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_trace"] } slog-term = "2.9.0" @@ -38,7 +39,6 @@ rand = "0.8.5" slog-async = "2.7.0" config = "0.13.3" thiserror = "1.0.32" -solana-shadow = "0.2.4" clap = { version = "4.0.32", features = ["derive"] } humantime-serde = "1.1.1" slog-envlogger = "2.2.0" diff --git a/config/config.sample.pythnet.toml b/config/config.sample.pythnet.toml index e30edd9..abf5543 100644 --- a/config/config.sample.pythnet.toml +++ b/config/config.sample.pythnet.toml @@ -48,14 +48,6 @@ key_store.program_key = "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH" # Oracle mapping pubkey key_store.mapping_key = "AHtgzX45WTKfkPG53L6WYhGEXwQkN1BVknET3sVsLL8J" -# Duration of the interval at which to publish updates. Default interval is 1 seconds. -# exporter.publish_interval_duration = "1s" - -# Price per compute unit offered for update_price transactions. -# This is needed for solana to be able to land transactions on the network -# during periods of high network congestion. -exporter.compute_unit_price_micro_lamports = 40000 - # Compute unit limit requested per instruction for transactions that update the price. # This should be an upper bound of the compute units a single upd_price instruction might consume. # For solana mainnet, this should be set to 20000 instead of the default 40000 since there is no accumulator. @@ -65,11 +57,6 @@ exporter.compute_unit_limit = 20000 # This is needed for solana to be able to land transactions on the network # during periods of high network congestion. exporter.dynamic_compute_unit_pricing_enabled = true -exporter.maximum_slot_gap_for_dynamic_compute_unit_price = 50 -exporter.maximum_total_compute_fee_micro_lamports = 10000000000 - -# The interval with which to poll account information. -oracle.poll_interval_duration = "5s" # Configuration for the JRPC API [pythd_adapter] diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index f50cee2..e2ce62e 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -481,25 +481,25 @@ impl Exporter { if let Some(weekly_schedule) = self.our_prices.get(&key_from_id) { let ret = weekly_schedule.can_publish_at(&now); - if !ret { - debug!(self.logger, "Exporter: Attempted to publish price outside market hours"; - "price_account" => key_from_id.to_string(), - "weekly_schedule" => format!("{:?}", weekly_schedule), - "utc_time" => now.format("%c").to_string(), - ); - } - - ret + if !ret { + debug!(self.logger, "Exporter: Attempted to publish price outside market hours"; + "price_account" => key_from_id.to_string(), + "weekly_schedule" => format!("{:?}", weekly_schedule), + "utc_time" => now.format("%c").to_string(), + ); + } + + ret } else { // Note: This message is not an error. Some // publishers have different permissions on // primary/secondary networks debug!( - self.logger, - "Exporter: Attempted to publish a price without permission, skipping"; - "unpermissioned_price_account" => key_from_id.to_string(), - "permissioned_accounts" => format!("{:?}", self.our_prices) - ); + self.logger, + "Exporter: Attempted to publish a price without permission, skipping"; + "unpermissioned_price_account" => key_from_id.to_string(), + "permissioned_accounts" => format!("{:?}", self.our_prices) + ); false } }) diff --git a/src/agent/solana/oracle.rs b/src/agent/solana/oracle.rs index c4df8b4..723a2bb 100644 --- a/src/agent/solana/oracle.rs +++ b/src/agent/solana/oracle.rs @@ -632,19 +632,26 @@ mod subscriber { Result, }, slog::Logger, + solana_account_decoder::UiAccountEncoding, + solana_client::{ + nonblocking::pubsub_client::PubsubClient, + rpc_config::{ + RpcAccountInfoConfig, + RpcProgramAccountsConfig, + }, + }, solana_sdk::{ account::Account, - commitment_config::CommitmentLevel, + commitment_config::{ + CommitmentConfig, + CommitmentLevel, + }, pubkey::Pubkey, }, - solana_shadow::{ - BlockchainShadow, - SyncOptions, - }, std::time::Duration, - tokio::sync::{ - broadcast, - mpsc, + tokio::{ + sync::mpsc, + time::Instant, }, }; @@ -662,9 +669,9 @@ mod subscriber { /// Commitment level used to read account data commitment: CommitmentLevel, - /// Public key of the root account to monitor. Note that all + /// Public key of the root program account to monitor. Note that all /// accounts owned by this account are also monitored. - account_key: Pubkey, + program_key: Pubkey, /// Channel on which updates are sent updates_tx: mpsc::Sender<(Pubkey, solana_sdk::account::Account)>, @@ -678,7 +685,7 @@ mod subscriber { wss_url: String, rpc_timeout: Duration, commitment: CommitmentLevel, - account_key: Pubkey, + program_key: Pubkey, updates_tx: mpsc::Sender<(Pubkey, solana_sdk::account::Account)>, logger: Logger, ) -> Self { @@ -687,62 +694,70 @@ mod subscriber { wss_url, rpc_timeout, commitment, - account_key, + program_key, updates_tx, logger, } } pub async fn run(&self) { - match self.start_shadow().await { - Ok(mut shadow_rx) => self.forward_updates(&mut shadow_rx).await, - Err(err) => { - error!(self.logger, "{}", err); - debug!(self.logger, "error context"; "context" => format!("{:?}", err)); - } - } - } - - async fn forward_updates(&self, shadow_rx: &mut broadcast::Receiver<(Pubkey, Account)>) { loop { - if let Err(err) = self.forward_update(shadow_rx).await { + let current_time = Instant::now(); + if let Err(ref err) = self.start().await { error!(self.logger, "{}", err); debug!(self.logger, "error context"; "context" => format!("{:?}", err)); + if current_time.elapsed() < Duration::from_secs(30) { + tracing::error!( + "Subscriber restarting too quickly. Sleeping for 1 second." + ); + tokio::time::sleep(Duration::from_secs(1)).await; + } } } } - async fn forward_update( - &self, - shadow_rx: &mut broadcast::Receiver<(Pubkey, Account)>, - ) -> Result<()> { - self.updates_tx - .send(shadow_rx.recv().await?) + pub async fn start(&self) -> Result<()> { + debug!(self.logger, "subscribed to program account updates"; "program_key" => self.program_key.to_string()); + + let client = PubsubClient::new(self.wss_url.as_str()) .await - .map_err(|_| anyhow!("failed to forward update")) - } + .expect("failed to create pubsub client"); - pub async fn start_shadow( - &self, - ) -> Result> { - debug!(self.logger, "subscribed to account updates"; "account" => self.account_key.to_string()); - - let shadow = BlockchainShadow::new_for_program( - &self.account_key, - SyncOptions { - network: solana_shadow::Network::Custom( - self.rpc_url.clone(), - self.wss_url.clone(), - ), - commitment: self.commitment, - rpc_timeout: self.rpc_timeout, - max_lag: Some(10000), - ..SyncOptions::default() + let config = RpcProgramAccountsConfig { + account_config: RpcAccountInfoConfig { + commitment: Some(CommitmentConfig::confirmed()), + encoding: Some(UiAccountEncoding::Base64Zstd), + ..Default::default() }, - ) - .await?; + filters: None, + with_context: Some(true), + }; + + let (mut notif, _unsub) = client + .program_subscribe(&self.program_key, Some(config)) + .await?; - Ok(shadow.updates_channel()) + loop { + match tokio_stream::StreamExt::next(&mut notif).await { + Some(update) => { + let account: Account = match update.value.account.decode() { + Some(account) => account, + None => { + tracing::error!(?update, "Failed to decode account from update."); + continue; + } + }; + + self.updates_tx + .send((update.value.pubkey.as_str().try_into()?, account)) + .await + .map_err(|_| anyhow!("failed to send update to oracle"))?; + } + None => { + return Err(anyhow!("Subscriber closed connection")); + } + } + } } } }