From 325a0b92c08020487798d591b0197ee9dac5d8a4 Mon Sep 17 00:00:00 2001 From: keyvan Date: Thu, 26 Dec 2024 14:01:55 -0800 Subject: [PATCH 01/14] feat(cli): add close publisher caps and claim rewards instruction --- staking/cli/src/cli.rs | 9 +- staking/cli/src/instructions.rs | 201 +++++++++++++++++++++++++++++++- staking/cli/src/main.rs | 11 ++ 3 files changed, 216 insertions(+), 5 deletions(-) diff --git a/staking/cli/src/cli.rs b/staking/cli/src/cli.rs index 174572bd..4508d013 100644 --- a/staking/cli/src/cli.rs +++ b/staking/cli/src/cli.rs @@ -71,7 +71,7 @@ pub enum Action { )] hermes_url: String, - #[clap(long, default_value = "3u8hJUVTA4jH1wYAyUur7FFZVQ8H635K3tSHHF4ssjQ5")] + #[clap(long, default_value = "HDwcJBJXjL9FpJ7UBsYBtaDjsBUhuLCUYoz3zr8SWWaQ")] wormhole: Pubkey, }, InitializePoolRewardCustody {}, @@ -110,6 +110,13 @@ pub enum Action { publisher_caps: Pubkey, }, SaveStakeAccountsSnapshot {}, + ClaimRewards { + #[clap(long, help = "Minimum staked tokens")] + min_staked: u64, + #[clap(long, help = "Minimum reward tokens per publisher")] + min_reward: u64, + }, + CloseAllPublisherCaps {}, } pub enum SignerSource { diff --git a/staking/cli/src/instructions.rs b/staking/cli/src/instructions.rs index e6908dfe..07c3dc3b 100644 --- a/staking/cli/src/instructions.rs +++ b/staking/cli/src/instructions.rs @@ -12,7 +12,10 @@ use { TokenAccount, }, }, - base64::Engine, + base64::{ + prelude::BASE64_STANDARD, + Engine, + }, integration_tests::{ integrity_pool::pda::{ get_delegation_record_address, @@ -86,9 +89,11 @@ use { global_config::GlobalConfig, max_voter_weight_record::MAX_VOTER_WEIGHT, positions::{ + DynamicPositionArray, DynamicPositionArrayAccount, PositionData, PositionState, + Target, }, stake_account::StakeAccountMetadataV2, }, @@ -105,9 +110,12 @@ use { }, mem::size_of, }, - wormhole_core_bridge_solana::sdk::{ - WriteEncodedVaaArgs, - VAA_START, + wormhole_core_bridge_solana::{ + sdk::{ + WriteEncodedVaaArgs, + VAA_START, + }, + state::EncodedVaa, }, wormhole_sdk::vaa::{ Body, @@ -870,6 +878,191 @@ pub fn update_y(rpc_client: &RpcClient, signer: &dyn Signer, y: u64) { process_transaction(rpc_client, &[instruction], &[signer]).unwrap(); } +pub fn close_all_publisher_caps(rpc_client: &RpcClient, signer: &dyn Signer) { + let mut data = EncodedVaa::DISCRIMINATOR.to_vec(); + data.extend_from_slice(&[1]); + data.extend_from_slice(&signer.pubkey().to_bytes()); + + rpc_client + .get_program_accounts_with_config( + &publisher_caps::ID, + RpcProgramAccountsConfig { + filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new( + 0, + MemcmpEncodedBytes::Bytes(PublisherCaps::DISCRIMINATOR.to_vec()), + ))]), + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64Zstd), + data_slice: None, + commitment: None, + min_context_slot: None, + }, + with_context: None, + }, + ) + .unwrap() + .into_iter() + .for_each(|(pubkey, _account)| close_publisher_caps(rpc_client, signer, pubkey)); +} + +pub fn advance_delegation_record( + rpc_client: &RpcClient, + signer: &dyn Signer, + positions: &Pubkey, + min_reward: u64, +) { + let pool_config = get_pool_config_address(); + + let PoolConfig { + pool_data: pool_data_address, + pyth_token_mint, + .. + } = PoolConfig::try_deserialize( + &mut rpc_client + .get_account_data(&pool_config) + .unwrap() + .as_slice(), + ) + .unwrap(); + + let pool_data = PoolData::try_deserialize( + &mut rpc_client.get_account_data(&pool_data_address).unwrap()[..8 + size_of::()] + .as_ref(), + ) + .unwrap(); + + pool_data + .publishers + .iter() + .enumerate() + .for_each(|(publisher_index, publisher)| { + if *publisher == Pubkey::default() { + return; + } + let publisher_stake_account_positions = + if pool_data.publisher_stake_accounts[publisher_index] == Pubkey::default() { + None + } else { + Some(pool_data.publisher_stake_accounts[publisher_index]) + }; + + let publisher_stake_account_custody = + publisher_stake_account_positions.map(get_stake_account_custody_address); + + let accounts = integrity_pool::accounts::AdvanceDelegationRecord { + delegation_record: get_delegation_record_address(*publisher, *positions), + payer: signer.pubkey(), + pool_config, + pool_data: pool_data_address, + pool_reward_custody: get_pool_reward_custody_address(pyth_token_mint), + publisher: *publisher, + publisher_stake_account_positions, + publisher_stake_account_custody, + stake_account_positions: *positions, + stake_account_custody: get_stake_account_custody_address(*positions), + system_program: system_program::ID, + token_program: spl_token::ID, + }; + + let data = integrity_pool::instruction::AdvanceDelegationRecord {}; + + let ix = Instruction { + program_id: integrity_pool::ID, + accounts: accounts.to_account_metas(None), + data: data.data(), + }; + + + let mut transaction = + Transaction::new_with_payer(&[ix.clone()], Some(&signer.pubkey())); + transaction.sign( + &[signer], + rpc_client + .get_latest_blockhash_with_commitment(CommitmentConfig::finalized()) + .unwrap() + .0, + ); + let res = rpc_client.simulate_transaction(&transaction).unwrap(); + let reward_amount: u64 = u64::from_le_bytes( + BASE64_STANDARD + .decode(res.value.return_data.unwrap().data.0) + .unwrap()[..8] + .try_into() + .unwrap(), + ); + + if reward_amount < min_reward { + return; + } + + println!( + "Advance delegation record for pubkey: {:?} publisher: {:?} with reward: {:?}", + positions.to_string(), + publisher, + reward_amount, + ); + + process_transaction(rpc_client, &[ix], &[signer]).unwrap(); + }); +} + +pub fn claim_rewards( + rpc_client: &RpcClient, + signer: &dyn Signer, + min_staked: u64, + min_reward: u64, +) { + let mut data: Vec = rpc_client + .get_program_accounts_with_config( + &staking::ID, + RpcProgramAccountsConfig { + filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new( + 0, + MemcmpEncodedBytes::Bytes(PositionData::discriminator().to_vec()), + ))]), + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64Zstd), + data_slice: None, + commitment: None, + min_context_slot: None, + }, + with_context: None, + }, + ) + .unwrap() + .into_iter() + .map(|(pubkey, account)| DynamicPositionArrayAccount { + key: pubkey, + lamports: account.lamports, + data: account.data.clone(), + }) + .collect::>(); + + let current_epoch = get_current_epoch(rpc_client); + + let data: Vec = data + .iter_mut() + .filter_map(|positions| { + let acc = positions.to_dynamic_position_array(); + let valid = acc + .get_target_exposure(&Target::IntegrityPool, current_epoch) + .unwrap() + >= min_staked; + if valid { + Some(acc) + } else { + None + } + }) + .collect(); + + + data.iter().enumerate().for_each(|(i, positions)| { + println!("Claiming rewards for account {} / {}", i, data.len()); + advance_delegation_record(rpc_client, signer, positions.acc_info.key, min_reward); + }); +} + pub fn save_stake_accounts_snapshot(rpc_client: &RpcClient) { let data: Vec<(Pubkey, DynamicPositionArrayAccount, Pubkey, Pubkey, Pubkey)> = rpc_client .get_program_accounts_with_config( diff --git a/staking/cli/src/main.rs b/staking/cli/src/main.rs index a06c0b9f..27019b6c 100644 --- a/staking/cli/src/main.rs +++ b/staking/cli/src/main.rs @@ -8,6 +8,8 @@ use { Cli, }, instructions::{ + claim_rewards, + close_all_publisher_caps, close_publisher_caps, create_slash_event, fetch_publisher_caps_and_advance, @@ -97,5 +99,14 @@ fn main() { Action::SaveStakeAccountsSnapshot {} => { save_stake_accounts_snapshot(&rpc_client); } + Action::ClaimRewards { + min_staked, + min_reward, + } => { + claim_rewards(&rpc_client, keypair.as_ref(), min_staked, min_reward); + } + Action::CloseAllPublisherCaps {} => { + close_all_publisher_caps(&rpc_client, keypair.as_ref()); + } } } From 7b11b22a39dd49fba0af2b50d337d0faff034ccf Mon Sep 17 00:00:00 2001 From: keyvan Date: Mon, 30 Dec 2024 12:04:03 -0800 Subject: [PATCH 02/14] wip --- staking/cli/src/instructions.rs | 117 +++++++++++++++++++------------- 1 file changed, 70 insertions(+), 47 deletions(-) diff --git a/staking/cli/src/instructions.rs b/staking/cli/src/instructions.rs index 07c3dc3b..285013d3 100644 --- a/staking/cli/src/instructions.rs +++ b/staking/cli/src/instructions.rs @@ -12,10 +12,7 @@ use { TokenAccount, }, }, - base64::{ - prelude::BASE64_STANDARD, - Engine, - }, + base64::Engine, integration_tests::{ integrity_pool::pda::{ get_delegation_record_address, @@ -258,7 +255,7 @@ pub fn process_transaction( rpc_client: &RpcClient, instructions: &[Instruction], signers: &[&dyn Signer], -) -> Result { +) -> Result> { let mut transaction = Transaction::new_with_payer(instructions, Some(&signers[0].pubkey())); transaction.sign( signers, @@ -283,7 +280,7 @@ pub fn process_transaction( } Err(err) => { println!("transaction err: {err:?}"); - Err(err.get_transaction_error().unwrap()) + Err(err.get_transaction_error()) } } } @@ -905,12 +902,31 @@ pub fn close_all_publisher_caps(rpc_client: &RpcClient, signer: &dyn Signer) { .for_each(|(pubkey, _account)| close_publisher_caps(rpc_client, signer, pubkey)); } +pub struct FetchError {} + +pub fn fetch_delegation_record( + rpc_client: &RpcClient, + key: &Pubkey, +) -> Result { + let delegation_record = DelegationRecord::try_deserialize( + &mut (rpc_client + .get_account_data(key) + .map_err(|_| FetchError {})? + .as_slice()), + ) + .map_err(|_| FetchError {})?; + + Ok(delegation_record) +} + pub fn advance_delegation_record( rpc_client: &RpcClient, signer: &dyn Signer, - positions: &Pubkey, + positions: &DynamicPositionArray, min_reward: u64, + current_epoch: u64, ) { + let positions_pubkey = positions.acc_info.key; let pool_config = get_pool_config_address(); let PoolConfig { @@ -949,8 +965,24 @@ pub fn advance_delegation_record( let publisher_stake_account_custody = publisher_stake_account_positions.map(get_stake_account_custody_address); + let delegation_record_pubkey = + get_delegation_record_address(*publisher, *positions_pubkey); + + let delegation_record = fetch_delegation_record(rpc_client, &delegation_record_pubkey); + + match delegation_record { + Ok(delegation_record) => { + if delegation_record.last_epoch == current_epoch { + return; + } + } + Err(_) => { + return; + } + } + let accounts = integrity_pool::accounts::AdvanceDelegationRecord { - delegation_record: get_delegation_record_address(*publisher, *positions), + delegation_record: get_delegation_record_address(*publisher, *positions_pubkey), payer: signer.pubkey(), pool_config, pool_data: pool_data_address, @@ -958,8 +990,8 @@ pub fn advance_delegation_record( publisher: *publisher, publisher_stake_account_positions, publisher_stake_account_custody, - stake_account_positions: *positions, - stake_account_custody: get_stake_account_custody_address(*positions), + stake_account_positions: *positions_pubkey, + stake_account_custody: get_stake_account_custody_address(*positions_pubkey), system_program: system_program::ID, token_program: spl_token::ID, }; @@ -973,36 +1005,16 @@ pub fn advance_delegation_record( }; - let mut transaction = - Transaction::new_with_payer(&[ix.clone()], Some(&signer.pubkey())); - transaction.sign( - &[signer], - rpc_client - .get_latest_blockhash_with_commitment(CommitmentConfig::finalized()) - .unwrap() - .0, - ); - let res = rpc_client.simulate_transaction(&transaction).unwrap(); - let reward_amount: u64 = u64::from_le_bytes( - BASE64_STANDARD - .decode(res.value.return_data.unwrap().data.0) - .unwrap()[..8] - .try_into() - .unwrap(), - ); - - if reward_amount < min_reward { - return; - } - println!( - "Advance delegation record for pubkey: {:?} publisher: {:?} with reward: {:?}", - positions.to_string(), + "Advance delegation record for pubkey: {:?} publisher: {:?}", + positions_pubkey.to_string(), publisher, - reward_amount, ); - - process_transaction(rpc_client, &[ix], &[signer]).unwrap(); + for _ in 0..10 { + if process_transaction(rpc_client, &[ix.clone()], &[signer]).is_ok() { + break; + } + } }); } @@ -1040,27 +1052,38 @@ pub fn claim_rewards( let current_epoch = get_current_epoch(rpc_client); - let data: Vec = data + let mut data: Vec<(u64, DynamicPositionArray)> = data .iter_mut() .filter_map(|positions| { let acc = positions.to_dynamic_position_array(); - let valid = acc + let exposure = acc .get_target_exposure(&Target::IntegrityPool, current_epoch) - .unwrap() - >= min_staked; - if valid { - Some(acc) + .unwrap(); + if exposure >= min_staked { + Some((exposure, acc)) } else { None } }) .collect(); + data.sort_by_key(|(exposure, _)| *exposure); + data.reverse(); + - data.iter().enumerate().for_each(|(i, positions)| { - println!("Claiming rewards for account {} / {}", i, data.len()); - advance_delegation_record(rpc_client, signer, positions.acc_info.key, min_reward); - }); + data.iter() + .enumerate() + // .skip(120) + .for_each(|(i, (exposure, positions))| { + println!( + "Claiming rewards for account ({} / {}). exposure: {}. pubkey: {:?}", + i, + data.len(), + exposure, + positions.acc_info.key + ); + advance_delegation_record(rpc_client, signer, positions, min_reward, current_epoch); + }); } pub fn save_stake_accounts_snapshot(rpc_client: &RpcClient) { From 29b4a79672ac09770058b117e996ebb7076cc1dd Mon Sep 17 00:00:00 2001 From: Guillermo Bescos Date: Tue, 31 Dec 2024 00:08:56 +0000 Subject: [PATCH 03/14] no-verify --- staking/cli/src/instructions.rs | 28 +++++++++++++++++-- .../programs/staking/src/state/positions.rs | 4 +-- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/staking/cli/src/instructions.rs b/staking/cli/src/instructions.rs index 285013d3..9fac205a 100644 --- a/staking/cli/src/instructions.rs +++ b/staking/cli/src/instructions.rs @@ -91,6 +91,7 @@ use { PositionData, PositionState, Target, + TargetWithParameters, }, stake_account::StakeAccountMetadataV2, }, @@ -256,7 +257,9 @@ pub fn process_transaction( instructions: &[Instruction], signers: &[&dyn Signer], ) -> Result> { - let mut transaction = Transaction::new_with_payer(instructions, Some(&signers[0].pubkey())); + let mut instructions = instructions.to_vec(); + instructions.push(ComputeBudgetInstruction::set_compute_unit_price(1)); + let mut transaction = Transaction::new_with_payer(&instructions, Some(&signers[0].pubkey())); transaction.sign( signers, rpc_client @@ -267,7 +270,7 @@ pub fn process_transaction( let transaction_signature_res = rpc_client .send_and_confirm_transaction_with_spinner_and_config( &transaction, - CommitmentConfig::confirmed(), + CommitmentConfig::processed(), RpcSendTransactionConfig { skip_preflight: true, ..Default::default() @@ -955,6 +958,27 @@ pub fn advance_delegation_record( if *publisher == Pubkey::default() { return; } + + let publisher_exposure = { + let mut publisher_exposure = 0; + for i in 0..positions.get_position_capacity() { + if let Some(position) = positions.read_position(i).unwrap() { + if (position.target_with_parameters + == TargetWithParameters::IntegrityPool { + publisher: *publisher, + }) + { + publisher_exposure += position.amount; + } + } + } + publisher_exposure + }; + + if publisher_exposure == 0 { + return; + } + let publisher_stake_account_positions = if pool_data.publisher_stake_accounts[publisher_index] == Pubkey::default() { None diff --git a/staking/programs/staking/src/state/positions.rs b/staking/programs/staking/src/state/positions.rs index d6e9d15a..3a645798 100644 --- a/staking/programs/staking/src/state/positions.rs +++ b/staking/programs/staking/src/state/positions.rs @@ -216,9 +216,7 @@ impl<'a> DynamicPositionArray<'a> { let mut exposure: u64 = 0; for i in 0..self.get_position_capacity() { if let Some(position) = self.read_position(i)? { - if position.target_with_parameters.get_target() == *target - && position.get_current_position(current_epoch)? != PositionState::UNLOCKED - { + if position.target_with_parameters.get_target() == *target { exposure = exposure .checked_add(position.amount) .ok_or_else(|| error!(ErrorCode::GenericOverflow))?; From 83ad46aa3a798b02b1a032f1459fa8d5e5713973 Mon Sep 17 00:00:00 2001 From: Guillermo Bescos Date: Tue, 31 Dec 2024 00:20:25 +0000 Subject: [PATCH 04/14] checkpoint --- staking/cli/src/instructions.rs | 44 +++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/staking/cli/src/instructions.rs b/staking/cli/src/instructions.rs index 9fac205a..7857ba34 100644 --- a/staking/cli/src/instructions.rs +++ b/staking/cli/src/instructions.rs @@ -950,13 +950,14 @@ pub fn advance_delegation_record( ) .unwrap(); - pool_data + // Collect all valid instructions + let instructions: Vec = pool_data .publishers .iter() .enumerate() - .for_each(|(publisher_index, publisher)| { + .filter_map(|(publisher_index, publisher)| { if *publisher == Pubkey::default() { - return; + return None; } let publisher_exposure = { @@ -976,7 +977,7 @@ pub fn advance_delegation_record( }; if publisher_exposure == 0 { - return; + return None; } let publisher_stake_account_positions = @@ -997,11 +998,11 @@ pub fn advance_delegation_record( match delegation_record { Ok(delegation_record) => { if delegation_record.last_epoch == current_epoch { - return; + return None; } } Err(_) => { - return; + return None; } } @@ -1022,24 +1023,29 @@ pub fn advance_delegation_record( let data = integrity_pool::instruction::AdvanceDelegationRecord {}; - let ix = Instruction { + + Some(Instruction { program_id: integrity_pool::ID, accounts: accounts.to_account_metas(None), data: data.data(), - }; + }) + }) + .collect(); + // Process all instructions in one transaction if there are any + if !instructions.is_empty() { + println!( + "Advancing delegation record for pubkey: {:?}, number of instructions: {}", + positions_pubkey.to_string(), + instructions.len(), + ); - println!( - "Advance delegation record for pubkey: {:?} publisher: {:?}", - positions_pubkey.to_string(), - publisher, - ); - for _ in 0..10 { - if process_transaction(rpc_client, &[ix.clone()], &[signer]).is_ok() { - break; - } + for _ in 0..10 { + if process_transaction(rpc_client, &instructions, &[signer]).is_ok() { + break; } - }); + } + } } pub fn claim_rewards( @@ -1097,7 +1103,7 @@ pub fn claim_rewards( data.iter() .enumerate() - // .skip(120) + .skip(96) .for_each(|(i, (exposure, positions))| { println!( "Claiming rewards for account ({} / {}). exposure: {}. pubkey: {:?}", From 7f5f6902ff0af51473d8852bb43d524cb0d84908 Mon Sep 17 00:00:00 2001 From: Guillermo Bescos Date: Tue, 31 Dec 2024 00:23:42 +0000 Subject: [PATCH 05/14] checkpoitn --- staking/cli/src/instructions.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/staking/cli/src/instructions.rs b/staking/cli/src/instructions.rs index 7857ba34..45c88d2e 100644 --- a/staking/cli/src/instructions.rs +++ b/staking/cli/src/instructions.rs @@ -48,7 +48,8 @@ use { serde_wormhole::RawMessage, solana_account_decoder::UiAccountEncoding, solana_client::{ - rpc_client::RpcClient, + nonblocking::rpc_client::RpcClient, + // rpc_client::RpcClient, rpc_config::{ RpcAccountInfoConfig, RpcProgramAccountsConfig, @@ -125,6 +126,7 @@ use { }, }; + pub fn init_publisher_caps(rpc_client: &RpcClient, payer: &dyn Signer) -> Pubkey { let publisher_caps = Keypair::new(); let create_account_ix = create_account( @@ -252,7 +254,7 @@ pub fn deserialize_accumulator_update_data( } } -pub fn process_transaction( +pub async fn process_transaction( rpc_client: &RpcClient, instructions: &[Instruction], signers: &[&dyn Signer], @@ -264,6 +266,7 @@ pub fn process_transaction( signers, rpc_client .get_latest_blockhash_with_commitment(CommitmentConfig::finalized()) + .await .unwrap() .0, ); @@ -276,7 +279,7 @@ pub fn process_transaction( ..Default::default() }, ); - match transaction_signature_res { + match transaction_signature_res.await { Ok(signature) => { println!("Transaction successful : {signature:?}"); Ok(signature) From 78b070c2f15c022a20323c4e08c02b3ef1934a4a Mon Sep 17 00:00:00 2001 From: Guillermo Bescos Date: Tue, 31 Dec 2024 00:44:38 +0000 Subject: [PATCH 06/14] checkpoint --- staking/cli/src/cli.rs | 63 -- staking/cli/src/instructions.rs | 1012 +------------------------------ staking/cli/src/main.rs | 84 +-- 3 files changed, 27 insertions(+), 1132 deletions(-) diff --git a/staking/cli/src/cli.rs b/staking/cli/src/cli.rs index 4508d013..a5d811ab 100644 --- a/staking/cli/src/cli.rs +++ b/staking/cli/src/cli.rs @@ -48,75 +48,12 @@ fn get_keypair_from_file(path: &str) -> Result { #[allow(clippy::large_enum_variant)] #[derive(Subcommand, Debug)] pub enum Action { - #[clap(about = "Initialize pool")] - InitializePool { - #[clap( - long, - help = "Keypair pool data account", - parse(try_from_str = get_keypair_from_file) - )] - pool_data_keypair: Keypair, - #[clap(long, help = "Y parameter")] - y: u64, - #[clap(long, help = "Reward program authority parameter")] - reward_program_authority: Pubkey, - #[clap(long, help = "Slash custody parameter")] - slash_custody: Pubkey, - }, - Advance { - #[clap( - long, - help = "Url of hermes to fetch publisher caps", - default_value = "https://hermes-beta.pyth.network/" - )] - hermes_url: String, - - #[clap(long, default_value = "HDwcJBJXjL9FpJ7UBsYBtaDjsBUhuLCUYoz3zr8SWWaQ")] - wormhole: Pubkey, - }, - InitializePoolRewardCustody {}, - UpdateDelegationFee { - #[clap(long, help = "New fee")] - delegation_fee: u64, - }, - SetPublisherStakeAccount { - #[clap(long, help = "Publisher")] - publisher: Pubkey, - #[clap(long, help = "Stake account positions")] - stake_account_positions: Pubkey, - }, - CreateSlashEvent { - #[clap(long, help = "Publisher")] - publisher: Pubkey, - #[clap(long, help = "Amount")] - slash_ratio: u64, - }, - UpdateRewardProgramAuthority { - #[clap(long, help = "New reward program authority")] - new_reward_program_authority: Pubkey, - }, - Slash { - #[clap(long, help = "Publisher")] - publisher: Pubkey, - #[clap(long, help = "Stake account positions")] - stake_account_positions: Pubkey, - }, - UpdateY { - #[clap(long, help = "New Y")] - y: u64, - }, - ClosePublisherCaps { - #[clap(long, help = "Publisher caps")] - publisher_caps: Pubkey, - }, - SaveStakeAccountsSnapshot {}, ClaimRewards { #[clap(long, help = "Minimum staked tokens")] min_staked: u64, #[clap(long, help = "Minimum reward tokens per publisher")] min_reward: u64, }, - CloseAllPublisherCaps {}, } pub enum SignerSource { diff --git a/staking/cli/src/instructions.rs b/staking/cli/src/instructions.rs index 45c88d2e..070d6bea 100644 --- a/staking/cli/src/instructions.rs +++ b/staking/cli/src/instructions.rs @@ -127,133 +127,6 @@ use { }; -pub fn init_publisher_caps(rpc_client: &RpcClient, payer: &dyn Signer) -> Pubkey { - let publisher_caps = Keypair::new(); - let create_account_ix = create_account( - &payer.pubkey(), - &publisher_caps.pubkey(), - rpc_client - .get_minimum_balance_for_rent_exemption(PublisherCaps::LEN) - .unwrap(), - PublisherCaps::LEN.try_into().unwrap(), - &publisher_caps::ID, - ); - - let accounts = publisher_caps::accounts::InitPublisherCaps { - signer: payer.pubkey(), - publisher_caps: publisher_caps.pubkey(), - }; - - let instruction_data = publisher_caps::instruction::InitPublisherCaps {}; - - let instruction = Instruction { - program_id: publisher_caps::ID, - accounts: accounts.to_account_metas(None), - data: instruction_data.data(), - }; - - process_transaction( - rpc_client, - &[ - create_account_ix, - instruction, - ComputeBudgetInstruction::set_compute_unit_limit(1_400_000), - ], - &[payer, &publisher_caps], - ) - .unwrap(); - publisher_caps.pubkey() -} - -pub fn write_publisher_caps( - rpc_client: &RpcClient, - payer: &dyn Signer, - publisher_caps: Pubkey, - index: usize, - chunk: &[u8], -) { - let accounts = publisher_caps::accounts::WritePublisherCaps { - write_authority: payer.pubkey(), - publisher_caps, - }; - - let instruction_data = publisher_caps::instruction::WritePublisherCaps { - index: index.try_into().unwrap(), - data: chunk.to_vec(), - }; - - let instruction = Instruction { - program_id: publisher_caps::ID, - accounts: accounts.to_account_metas(None), - data: instruction_data.data(), - }; - - process_transaction(rpc_client, &[instruction], &[payer]).unwrap(); -} - -pub fn close_publisher_caps(rpc_client: &RpcClient, payer: &dyn Signer, publisher_caps: Pubkey) { - let accounts = publisher_caps::accounts::ClosePublisherCaps { - write_authority: payer.pubkey(), - publisher_caps, - }; - - let instruction_data = publisher_caps::instruction::ClosePublisherCaps {}; - - let instruction = Instruction { - program_id: publisher_caps::ID, - accounts: accounts.to_account_metas(None), - data: instruction_data.data(), - }; - - process_transaction(rpc_client, &[instruction], &[payer]).unwrap(); -} - - -pub fn verify_publisher_caps( - rpc_client: &RpcClient, - payer: &dyn Signer, - publisher_caps: Pubkey, - encoded_vaa: Pubkey, - merkle_proofs: Vec, -) { - let accounts = publisher_caps::accounts::VerifyPublisherCaps { - signer: payer.pubkey(), - publisher_caps, - encoded_vaa, - }; - - let instruction_data = publisher_caps::instruction::VerifyPublisherCaps { - proof: merkle_proofs[0].proof.to_vec(), - }; - - let instruction = Instruction { - program_id: publisher_caps::ID, - accounts: accounts.to_account_metas(None), - data: instruction_data.data(), - }; - - process_transaction( - rpc_client, - &[ - instruction, - ComputeBudgetInstruction::set_compute_unit_limit(1_400_000), - ], - &[payer], - ) - .unwrap(); -} - -pub fn deserialize_accumulator_update_data( - accumulator_message: Vec, -) -> (Vec, Vec) { - let accumulator_update_data = - AccumulatorUpdateData::try_from_slice(accumulator_message.as_slice()).unwrap(); - - match accumulator_update_data.proof { - Proof::WormholeMerkle { vaa, updates } => return (vaa.as_ref().to_vec(), updates), - } -} - pub async fn process_transaction( rpc_client: &RpcClient, instructions: &[Instruction], @@ -291,632 +164,27 @@ pub async fn process_transaction( } } -pub fn process_write_encoded_vaa( - rpc_client: &RpcClient, - vaa: &[u8], - wormhole: Pubkey, - payer: &dyn Signer, -) -> Pubkey { - let encoded_vaa_keypair = Keypair::new(); - let encoded_vaa_size: usize = vaa.len() + VAA_START; - - let create_encoded_vaa = system_instruction::create_account( - &payer.pubkey(), - &encoded_vaa_keypair.pubkey(), - Rent::default().minimum_balance(encoded_vaa_size), - encoded_vaa_size as u64, - &wormhole, - ); - let init_encoded_vaa_accounts = wormhole_core_bridge_solana::accounts::InitEncodedVaa { - write_authority: payer.pubkey(), - encoded_vaa: encoded_vaa_keypair.pubkey(), - } - .to_account_metas(None); - - let init_encoded_vaa_instruction = Instruction { - program_id: wormhole, - accounts: init_encoded_vaa_accounts, - data: wormhole_core_bridge_solana::instruction::InitEncodedVaa.data(), - }; - - process_transaction( - rpc_client, - &[create_encoded_vaa, init_encoded_vaa_instruction], - &[payer, &encoded_vaa_keypair], - ) - .unwrap(); - - for i in (0..vaa.len()).step_by(1000) { - let chunk = &vaa[i..min(i + 1000, vaa.len())]; - - write_encoded_vaa( - rpc_client, - payer, - &encoded_vaa_keypair.pubkey(), - &wormhole, - i, - chunk, - ); - } - - let (header, _): (Header, Body<&RawMessage>) = serde_wormhole::from_slice(vaa).unwrap(); - let guardian_set = GuardianSet::key(&wormhole, header.guardian_set_index); - - let request_compute_units_instruction: Instruction = - ComputeBudgetInstruction::set_compute_unit_limit(600_000); - - let verify_encoded_vaa_accounts = wormhole_core_bridge_solana::accounts::VerifyEncodedVaaV1 { - guardian_set, - write_authority: payer.pubkey(), - draft_vaa: encoded_vaa_keypair.pubkey(), - } - .to_account_metas(None); - - let verify_encoded_vaa_instruction = Instruction { - program_id: wormhole, - accounts: verify_encoded_vaa_accounts, - data: wormhole_core_bridge_solana::instruction::VerifyEncodedVaaV1 {}.data(), - }; - - process_transaction( - rpc_client, - &[ - verify_encoded_vaa_instruction, - request_compute_units_instruction, - ], - &[payer], - ) - .unwrap(); - - - encoded_vaa_keypair.pubkey() -} - -pub fn write_encoded_vaa( - rpc_client: &RpcClient, - payer: &dyn Signer, - encoded_vaa: &Pubkey, - wormhole: &Pubkey, - index: usize, - chunk: &[u8], -) { - let write_encoded_vaa_accounts = wormhole_core_bridge_solana::accounts::WriteEncodedVaa { - write_authority: payer.pubkey(), - draft_vaa: *encoded_vaa, - } - .to_account_metas(None); - - let write_encoded_vaa_accounts_instruction = Instruction { - program_id: *wormhole, - accounts: write_encoded_vaa_accounts.clone(), - data: wormhole_core_bridge_solana::instruction::WriteEncodedVaa { - args: WriteEncodedVaaArgs { - index: index as u32, - data: chunk.to_vec(), - }, - } - .data(), - }; - - process_transaction( - rpc_client, - &[write_encoded_vaa_accounts_instruction], - &[payer], - ) - .unwrap(); -} - -pub fn close_encoded_vaa( - rpc_client: &RpcClient, - payer: &dyn Signer, - encoded_vaa: Pubkey, - wormhole: &Pubkey, -) { - let close_encoded_vaa_accounts = wormhole_core_bridge_solana::accounts::CloseEncodedVaa { - write_authority: payer.pubkey(), - encoded_vaa, - } - .to_account_metas(None); - - let close_encoded_vaa_instruction = Instruction { - program_id: *wormhole, - accounts: close_encoded_vaa_accounts, - data: wormhole_core_bridge_solana::instruction::CloseEncodedVaa {}.data(), - }; - - process_transaction(rpc_client, &[close_encoded_vaa_instruction], &[payer]).unwrap(); -} - -pub fn initialize_reward_custody(rpc_client: &RpcClient, payer: &dyn Signer) { - let pool_config = get_pool_config_address(); - - let PoolConfig { - pyth_token_mint, .. - } = PoolConfig::try_deserialize( - &mut rpc_client - .get_account_data(&pool_config) - .unwrap() - .as_slice(), - ) - .unwrap(); - - let create_ata_ix = spl_associated_token_account::instruction::create_associated_token_account( - &payer.pubkey(), - &pool_config, - &pyth_token_mint, - &spl_token::ID, - ); - - process_transaction(rpc_client, &[create_ata_ix], &[payer]).unwrap(); -} - -pub fn advance(rpc_client: &RpcClient, payer: &dyn Signer, publisher_caps: Pubkey) { - let pool_config = get_pool_config_address(); - - let PoolConfig { - pool_data, - pyth_token_mint, - .. - } = PoolConfig::try_deserialize( - &mut rpc_client - .get_account_data(&pool_config) - .unwrap() - .as_slice(), - ) - .unwrap(); - - let pool_reward_custody = get_pool_reward_custody_address(pyth_token_mint); - - let accounts = integrity_pool::accounts::Advance { - signer: payer.pubkey(), - pool_config, - publisher_caps, - pool_data, - pool_reward_custody, - }; - - let instruction_data = integrity_pool::instruction::Advance {}; - - let instruction = Instruction { - program_id: integrity_pool::ID, - accounts: accounts.to_account_metas(None), - data: instruction_data.data(), - }; - - process_transaction( - rpc_client, - &[ - instruction, - ComputeBudgetInstruction::set_compute_unit_limit(1_400_000), - ], - &[payer], - ) - .unwrap(); -} - -pub fn initialize_pool( - rpc_client: &RpcClient, - payer: &dyn Signer, - pool_data_keypair: &Keypair, - reward_program_authority: Pubkey, - y: u64, - slash_custody: Pubkey, -) { - let pool_data_space: u64 = PoolData::LEN.try_into().unwrap(); - let config_address = get_config_address(); - - let rent = rpc_client - .get_minimum_balance_for_rent_exemption(pool_data_space.try_into().unwrap()) - .unwrap(); - - let create_pool_data_acc_ix = create_account( - &payer.pubkey(), - &pool_data_keypair.pubkey(), - rent, - pool_data_space, - &integrity_pool::ID, - ); - - let pool_config_pubkey = get_pool_config_address(); - - let initialize_pool_data = integrity_pool::instruction::InitializePool { - reward_program_authority, - y, - }; - - let initialize_pool_accs = integrity_pool::accounts::InitializePool { - payer: payer.pubkey(), - pool_data: pool_data_keypair.pubkey(), - pool_config: pool_config_pubkey, - config_account: config_address, - slash_custody, - system_program: system_program::ID, - }; - - let initialize_pool_ix = Instruction::new_with_bytes( - integrity_pool::ID, - &initialize_pool_data.data(), - initialize_pool_accs.to_account_metas(None), - ); - - - process_transaction( - rpc_client, - &[create_pool_data_acc_ix, initialize_pool_ix], - &[payer, pool_data_keypair], - ) - .unwrap(); -} - -pub fn get_current_time(rpc_client: &RpcClient) -> i64 { - let slot = rpc_client.get_slot().unwrap(); - rpc_client.get_block_time(slot).unwrap() +pub async fn get_current_time(rpc_client: &RpcClient) -> i64 { + let slot = rpc_client.get_slot().await.unwrap(); + rpc_client.get_block_time(slot).await.unwrap() } -pub fn get_current_epoch(rpc_client: &RpcClient) -> u64 { - let slot = rpc_client.get_slot().unwrap(); - let blocktime = rpc_client.get_block_time(slot).unwrap(); +pub async fn get_current_epoch(rpc_client: &RpcClient) -> u64 { + let slot = rpc_client.get_slot().await.unwrap(); + let blocktime = rpc_client.get_block_time(slot).await.unwrap(); blocktime as u64 / EPOCH_DURATION } -pub fn fetch_publisher_caps_and_advance( - rpc_client: &RpcClient, - payer: &dyn Signer, - wormhole: Pubkey, - hermes_url: String, -) { - let pool_config = get_pool_config_address(); - - let PoolConfig { - pool_data: pool_data_address, - .. - } = PoolConfig::try_deserialize( - &mut rpc_client - .get_account_data(&pool_config) - .unwrap() - .as_slice(), - ) - .unwrap(); - - let pool_data = PoolData::try_deserialize( - &mut rpc_client.get_account_data(&pool_data_address).unwrap()[..8 + size_of::()] - .as_ref(), - ) - .unwrap(); - - if pool_data.last_updated_epoch == get_current_epoch(rpc_client) { - println!("Pool data is already updated"); - return; - } - - let client = Client::new(); - let response = client - .get(format!( - "{}v2/updates/publisher_stake_caps/latest?encoding=base64", - hermes_url - )) - .send() - .unwrap(); - - let json: serde_json::Value = response.json().unwrap(); - let encoded_message = json["binary"]["data"][0].as_str().unwrap(); - - //decode tmp from base64 - let message = base64::prelude::BASE64_STANDARD - .decode(encoded_message) - .unwrap(); - - let (vaa, merkle_proofs) = deserialize_accumulator_update_data(message); - - - let encoded_vaa = process_write_encoded_vaa(rpc_client, vaa.as_slice(), wormhole, payer); - - - let publisher_caps = init_publisher_caps(rpc_client, payer); - - - let publisher_caps_message_bytes = - Vec::::from(merkle_proofs.first().unwrap().message.clone()); - - - for i in (0..publisher_caps_message_bytes.len()).step_by(1000) { - let chunk = - &publisher_caps_message_bytes[i..min(i + 1000, publisher_caps_message_bytes.len())]; - - write_publisher_caps(rpc_client, payer, publisher_caps, i, chunk); - } - - verify_publisher_caps( - rpc_client, - payer, - publisher_caps, - encoded_vaa, - merkle_proofs, - ); - - - println!( - "Initialized publisher caps with pubkey : {:?}", - publisher_caps - ); - - advance(rpc_client, payer, publisher_caps); - close_publisher_caps(rpc_client, payer, publisher_caps); - close_encoded_vaa(rpc_client, payer, encoded_vaa, &wormhole); -} - -pub fn update_delegation_fee(rpc_client: &RpcClient, payer: &dyn Signer, delegation_fee: u64) { - let pool_config = get_pool_config_address(); - - let PoolConfig { pool_data, .. } = PoolConfig::try_deserialize( - &mut rpc_client - .get_account_data(&pool_config) - .unwrap() - .as_slice(), - ) - .unwrap(); - - let accounts = integrity_pool::accounts::UpdateDelegationFee { - reward_program_authority: payer.pubkey(), - pool_config, - pool_data, - system_program: system_program::ID, - }; - - let instruction_data = integrity_pool::instruction::UpdateDelegationFee { delegation_fee }; - - let instruction = Instruction { - program_id: integrity_pool::ID, - accounts: accounts.to_account_metas(None), - data: instruction_data.data(), - }; - - process_transaction(rpc_client, &[instruction], &[payer]).unwrap(); -} - -pub fn set_publisher_stake_account( - rpc_client: &RpcClient, - signer: &dyn Signer, - publisher: &Pubkey, - stake_account_positions: &Pubkey, -) { - let pool_config = get_pool_config_address(); - - let PoolConfig { pool_data, .. } = PoolConfig::try_deserialize( - &mut rpc_client - .get_account_data(&pool_config) - .unwrap() - .as_slice(), - ) - .unwrap(); - - let accounts = integrity_pool::accounts::SetPublisherStakeAccount { - signer: signer.pubkey(), - publisher: *publisher, - current_stake_account_positions_option: None, - new_stake_account_positions_option: Some(*stake_account_positions), - pool_config, - pool_data, - }; - - let instruction_data = integrity_pool::instruction::SetPublisherStakeAccount {}; - - let instruction = Instruction { - program_id: integrity_pool::ID, - accounts: accounts.to_account_metas(None), - data: instruction_data.data(), - }; - - process_transaction(rpc_client, &[instruction], &[signer]).unwrap(); -} - -pub fn create_slash_event( - rpc_client: &RpcClient, - signer: &dyn Signer, - publisher: &Pubkey, - slash_ratio: u64, -) { - let pool_config = get_pool_config_address(); - - let PoolConfig { - pool_data: pool_data_address, - slash_custody, - .. - } = PoolConfig::try_deserialize( - &mut rpc_client - .get_account_data(&pool_config) - .unwrap() - .as_slice(), - ) - .unwrap(); - - let pool_data = PoolData::try_deserialize( - &mut rpc_client.get_account_data(&pool_data_address).unwrap()[..8 + size_of::()] - .as_ref(), - ) - .unwrap(); - - let publisher_index = pool_data.get_publisher_index(publisher).unwrap(); - let index = pool_data.num_slash_events[publisher_index]; - - let accounts = integrity_pool::accounts::CreateSlashEvent { - payer: signer.pubkey(), - reward_program_authority: signer.pubkey(), - publisher: *publisher, - slash_custody, - pool_config, - pool_data: pool_data_address, - slash_event: get_slash_event_address(index, *publisher), - system_program: system_program::ID, - }; - - let instruction_data = integrity_pool::instruction::CreateSlashEvent { index, slash_ratio }; - - let instruction = Instruction { - program_id: integrity_pool::ID, - accounts: accounts.to_account_metas(None), - data: instruction_data.data(), - }; - - process_transaction(rpc_client, &[instruction], &[signer]).unwrap(); -} - -pub fn update_reward_program_authority( - rpc_client: &RpcClient, - signer: &dyn Signer, - new_reward_program_authority: &Pubkey, -) { - let pool_config = get_pool_config_address(); - - let accounts = integrity_pool::accounts::UpdateRewardProgramAuthority { - reward_program_authority: signer.pubkey(), - pool_config, - system_program: system_program::ID, - }; - - let instruction_data = integrity_pool::instruction::UpdateRewardProgramAuthority { - reward_program_authority: *new_reward_program_authority, - }; - - let instruction = Instruction { - program_id: integrity_pool::ID, - accounts: accounts.to_account_metas(None), - data: instruction_data.data(), - }; - - process_transaction(rpc_client, &[instruction], &[signer]).unwrap(); -} - -pub fn slash( - rpc_client: &RpcClient, - signer: &dyn Signer, - publisher: &Pubkey, - stake_account_positions: &Pubkey, -) { - let pool_config = get_pool_config_address(); - let PoolConfig { - pool_data, - slash_custody, - .. - } = PoolConfig::try_deserialize( - &mut rpc_client - .get_account_data(&pool_config) - .unwrap() - .as_slice(), - ) - .unwrap(); - - let delegation_record = get_delegation_record_address(*publisher, *stake_account_positions); - let DelegationRecord { - next_slash_event_index, - .. - } = { - let delegation_record_account_data = rpc_client.get_account_data(&delegation_record); - if let Ok(data) = delegation_record_account_data { - DelegationRecord::try_deserialize(&mut data.as_slice()).unwrap() - } else { - DelegationRecord { - last_epoch: 0, - next_slash_event_index: 0, - } - } - }; - - - let stake_account_metadata = get_stake_account_metadata_address(*stake_account_positions); - let stake_account_custody = get_stake_account_custody_address(*stake_account_positions); - let custody_authority = get_stake_account_custody_authority_address(*stake_account_positions); - let config_account = get_config_address(); - let governance_target_account = get_target_address(); - - - let accounts = integrity_pool::accounts::Slash { - signer: signer.pubkey(), - pool_data, - pool_config, - slash_event: get_slash_event_address(next_slash_event_index, *publisher), - delegation_record, - publisher: *publisher, - stake_account_positions: *stake_account_positions, - stake_account_metadata, - stake_account_custody, - config_account, - governance_target_account, - slash_custody, - custody_authority, - staking_program: staking::ID, - token_program: spl_token::ID, - }; - - let instruction_data = integrity_pool::instruction::Slash { - index: next_slash_event_index, - }; - - let instruction = Instruction { - program_id: integrity_pool::ID, - accounts: accounts.to_account_metas(None), - data: instruction_data.data(), - }; - - process_transaction(rpc_client, &[instruction], &[signer]).unwrap(); -} - -pub fn update_y(rpc_client: &RpcClient, signer: &dyn Signer, y: u64) { - let pool_config = get_pool_config_address(); - - let accounts = integrity_pool::accounts::UpdateY { - reward_program_authority: signer.pubkey(), - pool_config, - system_program: system_program::ID, - }; - - let instruction_data = integrity_pool::instruction::UpdateY { y }; - - let instruction = Instruction { - program_id: integrity_pool::ID, - accounts: accounts.to_account_metas(None), - data: instruction_data.data(), - }; - - process_transaction(rpc_client, &[instruction], &[signer]).unwrap(); -} - -pub fn close_all_publisher_caps(rpc_client: &RpcClient, signer: &dyn Signer) { - let mut data = EncodedVaa::DISCRIMINATOR.to_vec(); - data.extend_from_slice(&[1]); - data.extend_from_slice(&signer.pubkey().to_bytes()); - - rpc_client - .get_program_accounts_with_config( - &publisher_caps::ID, - RpcProgramAccountsConfig { - filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new( - 0, - MemcmpEncodedBytes::Bytes(PublisherCaps::DISCRIMINATOR.to_vec()), - ))]), - account_config: RpcAccountInfoConfig { - encoding: Some(UiAccountEncoding::Base64Zstd), - data_slice: None, - commitment: None, - min_context_slot: None, - }, - with_context: None, - }, - ) - .unwrap() - .into_iter() - .for_each(|(pubkey, _account)| close_publisher_caps(rpc_client, signer, pubkey)); -} - pub struct FetchError {} -pub fn fetch_delegation_record( +pub async fn fetch_delegation_record( rpc_client: &RpcClient, key: &Pubkey, ) -> Result { let delegation_record = DelegationRecord::try_deserialize( &mut (rpc_client .get_account_data(key) + .await .map_err(|_| FetchError {})? .as_slice()), ) @@ -925,10 +193,10 @@ pub fn fetch_delegation_record( Ok(delegation_record) } -pub fn advance_delegation_record( +pub async fn advance_delegation_record<'a>( rpc_client: &RpcClient, signer: &dyn Signer, - positions: &DynamicPositionArray, + positions: &DynamicPositionArray<'a>, min_reward: u64, current_epoch: u64, ) { @@ -942,14 +210,18 @@ pub fn advance_delegation_record( } = PoolConfig::try_deserialize( &mut rpc_client .get_account_data(&pool_config) + .await .unwrap() .as_slice(), ) .unwrap(); let pool_data = PoolData::try_deserialize( - &mut rpc_client.get_account_data(&pool_data_address).unwrap()[..8 + size_of::()] - .as_ref(), + &mut &rpc_client + .get_account_data(&pool_data_address) + .await + .unwrap() + .as_slice()[..8 + size_of::()], ) .unwrap(); @@ -996,7 +268,8 @@ pub fn advance_delegation_record( let delegation_record_pubkey = get_delegation_record_address(*publisher, *positions_pubkey); - let delegation_record = fetch_delegation_record(rpc_client, &delegation_record_pubkey); + let delegation_record = + fetch_delegation_record(rpc_client, &delegation_record_pubkey).await; match delegation_record { Ok(delegation_record) => { @@ -1044,14 +317,17 @@ pub fn advance_delegation_record( ); for _ in 0..10 { - if process_transaction(rpc_client, &instructions, &[signer]).is_ok() { + if process_transaction(rpc_client, &instructions, &[signer]) + .await + .is_ok() + { break; } } } } -pub fn claim_rewards( +pub async fn claim_rewards( rpc_client: &RpcClient, signer: &dyn Signer, min_staked: u64, @@ -1074,6 +350,7 @@ pub fn claim_rewards( with_context: None, }, ) + .await .unwrap() .into_iter() .map(|(pubkey, account)| DynamicPositionArrayAccount { @@ -1083,7 +360,7 @@ pub fn claim_rewards( }) .collect::>(); - let current_epoch = get_current_epoch(rpc_client); + let current_epoch = get_current_epoch(rpc_client).await; let mut data: Vec<(u64, DynamicPositionArray)> = data .iter_mut() @@ -1118,242 +395,3 @@ pub fn claim_rewards( advance_delegation_record(rpc_client, signer, positions, min_reward, current_epoch); }); } - -pub fn save_stake_accounts_snapshot(rpc_client: &RpcClient) { - let data: Vec<(Pubkey, DynamicPositionArrayAccount, Pubkey, Pubkey, Pubkey)> = rpc_client - .get_program_accounts_with_config( - &staking::ID, - RpcProgramAccountsConfig { - filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new( - 0, - MemcmpEncodedBytes::Bytes(PositionData::discriminator().to_vec()), - ))]), - account_config: RpcAccountInfoConfig { - encoding: Some(UiAccountEncoding::Base64Zstd), - data_slice: None, - commitment: None, - min_context_slot: None, - }, - with_context: None, - }, - ) - .unwrap() - .into_iter() - .map(|(pubkey, account)| { - ( - pubkey, - DynamicPositionArrayAccount { - key: pubkey, - lamports: account.lamports, - data: account.data.clone(), - }, - get_stake_account_metadata_address(pubkey), - get_stake_account_custody_address(pubkey), - get_stake_account_custody_authority_address(pubkey), - ) - }) - .collect::>(); - - let metadata_accounts_data = rpc_client - .get_program_accounts_with_config( - &staking::ID, - RpcProgramAccountsConfig { - filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new( - 0, - MemcmpEncodedBytes::Bytes(StakeAccountMetadataV2::discriminator().to_vec()), - ))]), - account_config: RpcAccountInfoConfig { - encoding: Some(UiAccountEncoding::Base64Zstd), - data_slice: None, - commitment: None, - min_context_slot: None, - }, - with_context: None, - }, - ) - .unwrap() - .into_iter() - .map(|(pubkey, account)| { - ( - pubkey, - StakeAccountMetadataV2::try_deserialize(&mut account.data.as_slice()).unwrap(), - ) - }) - .collect::>(); - - let config = GlobalConfig::try_deserialize( - &mut rpc_client - .get_account_data(&get_config_address()) - .unwrap() - .as_slice(), - ) - .unwrap(); - let current_time = get_current_time(rpc_client); - - let metadata_account_data_locked: HashMap = metadata_accounts_data - .iter() - .map(|(pubkey, metadata)| { - ( - *pubkey, - metadata - .lock - .get_unvested_balance(current_time, config.pyth_token_list_time) - .unwrap(), - ) - }) - .collect::>(); - - let data = data - .into_iter() - .map( - |(pubkey, account, metadata_pubkey, custody_pubkey, custody_authority_pubkey)| { - ( - pubkey, - account, - metadata_pubkey, - custody_pubkey, - custody_authority_pubkey, - *metadata_account_data_locked.get(&metadata_pubkey).unwrap(), - ) - }, - ) - .collect::>(); - - // We need to check the actual tokens accounts, since you can initialize a stake account with an - // arbitrary vesting schedule but 0 tokens - let locked_token_accounts_pubkeys = data - .iter() - .filter(|(_, _, _, _, _, locked_amount)| *locked_amount > 0u64) - .map(|(_, _, _, token_account_pubkey, _, _)| *token_account_pubkey) - .collect::>(); - - let mut locked_token_accounts_actual_amounts: HashMap = HashMap::new(); - for chunk in locked_token_accounts_pubkeys.chunks(100) { - rpc_client - .get_multiple_accounts(chunk) - .unwrap() - .into_iter() - .enumerate() - .for_each(|(index, account)| { - locked_token_accounts_actual_amounts.insert( - chunk[index], - TokenAccount::try_deserialize(&mut account.unwrap().data.as_slice()) - .unwrap() - .amount, - ); - }); - } - - let data = data - .into_iter() - .map( - |( - pubkey, - account, - metadata_pubkey, - custody_pubkey, - custody_authority_pubkey, - locked_amount, - )| { - ( - pubkey, - account, - metadata_pubkey, - custody_pubkey, - custody_authority_pubkey, - min( - locked_amount, - *locked_token_accounts_actual_amounts - .get(&custody_pubkey) - .unwrap_or(&0u64), - ), - ) - }, - ) - .collect::>(); - - let current_epoch = get_current_epoch(rpc_client); - let data = data - .into_iter() - .map( - |( - pubkey, - mut account, - metadata_pubkey, - custody_pubkey, - custody_authority_pubkey, - locked_amount, - )| { - let dynamic_position_array = account.to_dynamic_position_array(); - let owner = dynamic_position_array.owner().unwrap(); - let staked_in_governance = compute_voter_weight( - &dynamic_position_array, - current_epoch, - MAX_VOTER_WEIGHT, - MAX_VOTER_WEIGHT, - ) - .unwrap(); - - let staked_in_ois = { - let mut amount = 0u64; - for i in 0..dynamic_position_array.get_position_capacity() { - if let Some(position) = dynamic_position_array.read_position(i).unwrap() { - match position.get_current_position(current_epoch).unwrap() { - PositionState::LOCKED | PositionState::PREUNLOCKING => { - if !position.is_voting() { - amount += position.amount; - } - } - _ => {} - } - } - } - amount - }; - ( - pubkey, - metadata_pubkey, - custody_pubkey, - custody_authority_pubkey, - owner, - locked_amount, - staked_in_governance, - staked_in_ois, - ) - }, - ) - .collect::>(); - - let timestamp = chrono::Utc::now().format("%Y-%m-%d_%H:%M:%S").to_string(); - let file = File::create(format!("snapshots/snapshot-{}.csv", timestamp)).unwrap(); - let mut writer = BufWriter::new(file); - - // Write the header - writeln!(writer, "positions_pubkey,metadata_pubkey,custody_pubkey,custody_authority_pubkey,owner,locked_amount,staked_in_governance,staked_in_ois").unwrap(); - // Write the data - for ( - pubkey, - metadata_pubkey, - custody_pubkey, - custody_authority_pubkey, - owner, - locked_amount, - staked_in_governance, - staked_in_ois, - ) in data - { - writeln!( - writer, - "{},{},{},{},{},{},{},{}", - pubkey, - metadata_pubkey, - custody_pubkey, - custody_authority_pubkey, - owner, - locked_amount, - staked_in_governance, - staked_in_ois - ) - .unwrap(); - } -} diff --git a/staking/cli/src/main.rs b/staking/cli/src/main.rs index 27019b6c..7193d7a1 100644 --- a/staking/cli/src/main.rs +++ b/staking/cli/src/main.rs @@ -7,22 +7,8 @@ use { Action, Cli, }, - instructions::{ - claim_rewards, - close_all_publisher_caps, - close_publisher_caps, - create_slash_event, - fetch_publisher_caps_and_advance, - initialize_pool, - initialize_reward_custody, - save_stake_accounts_snapshot, - set_publisher_stake_account, - slash, - update_delegation_fee, - update_reward_program_authority, - update_y, - }, - solana_client::rpc_client::RpcClient, + instructions::claim_rewards, + solana_client::nonblocking::rpc_client::RpcClient, solana_sdk::commitment_config::CommitmentConfig, }; @@ -36,77 +22,11 @@ fn main() { let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed()); match action { - Action::InitializePool { - pool_data_keypair, - reward_program_authority, - y, - slash_custody, - } => { - initialize_pool( - &rpc_client, - keypair.as_ref(), - &pool_data_keypair, - reward_program_authority, - y, - slash_custody, - ); - } - Action::Advance { - hermes_url, - wormhole, - } => { - fetch_publisher_caps_and_advance(&rpc_client, keypair.as_ref(), wormhole, hermes_url); - } - Action::InitializePoolRewardCustody {} => { - initialize_reward_custody(&rpc_client, keypair.as_ref()); - } - Action::UpdateDelegationFee { delegation_fee } => { - update_delegation_fee(&rpc_client, keypair.as_ref(), delegation_fee) - } - Action::SetPublisherStakeAccount { - publisher, - stake_account_positions, - } => set_publisher_stake_account( - &rpc_client, - keypair.as_ref(), - &publisher, - &stake_account_positions, - ), - Action::CreateSlashEvent { - publisher, - slash_ratio, - } => create_slash_event(&rpc_client, keypair.as_ref(), &publisher, slash_ratio), - Action::UpdateRewardProgramAuthority { - new_reward_program_authority, - } => update_reward_program_authority( - &rpc_client, - keypair.as_ref(), - &new_reward_program_authority, - ), - Action::Slash { - publisher, - stake_account_positions, - } => slash( - &rpc_client, - keypair.as_ref(), - &publisher, - &stake_account_positions, - ), - Action::UpdateY { y } => update_y(&rpc_client, keypair.as_ref(), y), - Action::ClosePublisherCaps { publisher_caps } => { - close_publisher_caps(&rpc_client, keypair.as_ref(), publisher_caps) - } - Action::SaveStakeAccountsSnapshot {} => { - save_stake_accounts_snapshot(&rpc_client); - } Action::ClaimRewards { min_staked, min_reward, } => { claim_rewards(&rpc_client, keypair.as_ref(), min_staked, min_reward); } - Action::CloseAllPublisherCaps {} => { - close_all_publisher_caps(&rpc_client, keypair.as_ref()); - } } } From 5f17f69c1f4b416c186a12738b700a14fed91726 Mon Sep 17 00:00:00 2001 From: Guillermo Bescos Date: Tue, 31 Dec 2024 00:46:17 +0000 Subject: [PATCH 07/14] checkpoint --- staking/Cargo.lock | 43 +++++++++++++++++++++-------------------- staking/cli/Cargo.toml | 1 + staking/cli/src/main.rs | 9 ++++++--- 3 files changed, 29 insertions(+), 24 deletions(-) diff --git a/staking/Cargo.lock b/staking/Cargo.lock index 97cef11a..0e4341b4 100644 --- a/staking/Cargo.lock +++ b/staking/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "Inflector" @@ -1643,9 +1643,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -1658,9 +1658,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1668,15 +1668,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -1685,15 +1685,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1702,21 +1702,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -5281,6 +5281,7 @@ dependencies = [ "byteorder", "chrono", "clap 3.2.25", + "futures", "integration-tests", "integrity-pool", "publisher-caps", @@ -6216,7 +6217,7 @@ dependencies = [ [[package]] name = "wormhole-core" version = "0.1.0" -source = "git+https://github.com/guibescos/wormhole?branch=reisen/sdk-solana#b0da20525fe68408ed2c8b331eb5f63101381936" +source = "git+https://github.com/guibescos/wormhole?branch=reisen%2Fsdk-solana#b0da20525fe68408ed2c8b331eb5f63101381936" dependencies = [ "borsh 0.9.3", "bstr 0.2.17", @@ -6268,7 +6269,7 @@ checksum = "045a3cc929189ffc0df110e4dc04421ed3226543f47a302088e6175b97b7d75f" [[package]] name = "wormhole-solana" version = "0.1.0" -source = "git+https://github.com/guibescos/wormhole?branch=reisen/sdk-solana#b0da20525fe68408ed2c8b331eb5f63101381936" +source = "git+https://github.com/guibescos/wormhole?branch=reisen%2Fsdk-solana#b0da20525fe68408ed2c8b331eb5f63101381936" dependencies = [ "borsh 0.9.3", "bstr 0.2.17", diff --git a/staking/cli/Cargo.toml b/staking/cli/Cargo.toml index 9a3e3cf8..f8aa0c9a 100644 --- a/staking/cli/Cargo.toml +++ b/staking/cli/Cargo.toml @@ -27,3 +27,4 @@ uriparse = "0.6.4" solana-remote-wallet = "1.18.16" solana-account-decoder = "1.18.16" chrono = "0.4.38" +futures = "0.3.31" diff --git a/staking/cli/src/main.rs b/staking/cli/src/main.rs index 7193d7a1..7aa816ab 100644 --- a/staking/cli/src/main.rs +++ b/staking/cli/src/main.rs @@ -25,8 +25,11 @@ fn main() { Action::ClaimRewards { min_staked, min_reward, - } => { - claim_rewards(&rpc_client, keypair.as_ref(), min_staked, min_reward); - } + } => futures::executor::block_on(claim_rewards( + &rpc_client, + keypair.as_ref(), + min_staked, + min_reward, + )), } } From 3634e9426671c18995881bfd1407570c2da04400 Mon Sep 17 00:00:00 2001 From: Guillermo Bescos Date: Tue, 31 Dec 2024 00:57:07 +0000 Subject: [PATCH 08/14] checkpoint --- staking/Cargo.lock | 5 +- staking/cli/Cargo.toml | 1 + staking/cli/src/instructions.rs | 138 +++++++++++++++++++------------- staking/cli/src/main.rs | 11 +-- 4 files changed, 89 insertions(+), 66 deletions(-) diff --git a/staking/Cargo.lock b/staking/Cargo.lock index 0e4341b4..25b94ffe 100644 --- a/staking/Cargo.lock +++ b/staking/Cargo.lock @@ -5295,6 +5295,7 @@ dependencies = [ "solana-client", "solana-remote-wallet", "solana-sdk", + "tokio", "uriparse", "wormhole-core-bridge-solana", "wormhole-solana", @@ -5528,9 +5529,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.40.0" +version = "1.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" +checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" dependencies = [ "backtrace", "bytes", diff --git a/staking/cli/Cargo.toml b/staking/cli/Cargo.toml index f8aa0c9a..2a113eec 100644 --- a/staking/cli/Cargo.toml +++ b/staking/cli/Cargo.toml @@ -28,3 +28,4 @@ solana-remote-wallet = "1.18.16" solana-account-decoder = "1.18.16" chrono = "0.4.38" futures = "0.3.31" +tokio = "1.42.0" diff --git a/staking/cli/src/instructions.rs b/staking/cli/src/instructions.rs index 070d6bea..3c750adc 100644 --- a/staking/cli/src/instructions.rs +++ b/staking/cli/src/instructions.rs @@ -13,6 +13,7 @@ use { }, }, base64::Engine, + futures::future::join_all, integration_tests::{ integrity_pool::pda::{ get_delegation_record_address, @@ -179,11 +180,11 @@ pub struct FetchError {} pub async fn fetch_delegation_record( rpc_client: &RpcClient, - key: &Pubkey, + key: Pubkey, ) -> Result { let delegation_record = DelegationRecord::try_deserialize( &mut (rpc_client - .get_account_data(key) + .get_account_data(&key) .await .map_err(|_| FetchError {})? .as_slice()), @@ -225,8 +226,8 @@ pub async fn advance_delegation_record<'a>( ) .unwrap(); - // Collect all valid instructions - let instructions: Vec = pool_data + // First collect all potential instruction data + let potential_instructions: Vec<_> = pool_data .publishers .iter() .enumerate() @@ -265,48 +266,63 @@ pub async fn advance_delegation_record<'a>( let publisher_stake_account_custody = publisher_stake_account_positions.map(get_stake_account_custody_address); - let delegation_record_pubkey = - get_delegation_record_address(*publisher, *positions_pubkey); + Some(( + *publisher, + publisher_stake_account_positions, + publisher_stake_account_custody, + )) + }) + .collect(); - let delegation_record = - fetch_delegation_record(rpc_client, &delegation_record_pubkey).await; + // Fetch all delegation records concurrently + let delegation_records = join_all(potential_instructions.iter().map(|(publisher, _, _)| { + let delegation_record_pubkey = get_delegation_record_address(*publisher, *positions_pubkey); + fetch_delegation_record(rpc_client, delegation_record_pubkey) + })) + .await; - match delegation_record { - Ok(delegation_record) => { - if delegation_record.last_epoch == current_epoch { - return None; - } - } - Err(_) => { - return None; + // Process results and create instructions + let mut instructions = Vec::new(); + for ( + (publisher, publisher_stake_account_positions, publisher_stake_account_custody), + delegation_record, + ) in potential_instructions.into_iter().zip(delegation_records) + { + // Skip if we couldn't fetch the record or if it's already processed for current epoch + match delegation_record { + Ok(delegation_record) => { + if delegation_record.last_epoch == current_epoch { + continue; } } + Err(_) => { + continue; + } + } - let accounts = integrity_pool::accounts::AdvanceDelegationRecord { - delegation_record: get_delegation_record_address(*publisher, *positions_pubkey), - payer: signer.pubkey(), - pool_config, - pool_data: pool_data_address, - pool_reward_custody: get_pool_reward_custody_address(pyth_token_mint), - publisher: *publisher, - publisher_stake_account_positions, - publisher_stake_account_custody, - stake_account_positions: *positions_pubkey, - stake_account_custody: get_stake_account_custody_address(*positions_pubkey), - system_program: system_program::ID, - token_program: spl_token::ID, - }; - - let data = integrity_pool::instruction::AdvanceDelegationRecord {}; + let accounts = integrity_pool::accounts::AdvanceDelegationRecord { + delegation_record: get_delegation_record_address(publisher, *positions_pubkey), + payer: signer.pubkey(), + pool_config, + pool_data: pool_data_address, + pool_reward_custody: get_pool_reward_custody_address(pyth_token_mint), + publisher, + publisher_stake_account_positions, + publisher_stake_account_custody, + stake_account_positions: *positions_pubkey, + stake_account_custody: get_stake_account_custody_address(*positions_pubkey), + system_program: system_program::ID, + token_program: spl_token::ID, + }; + let data = integrity_pool::instruction::AdvanceDelegationRecord {}; - Some(Instruction { - program_id: integrity_pool::ID, - accounts: accounts.to_account_metas(None), - data: data.data(), - }) - }) - .collect(); + instructions.push(Instruction { + program_id: integrity_pool::ID, + accounts: accounts.to_account_metas(None), + data: data.data(), + }); + } // Process all instructions in one transaction if there are any if !instructions.is_empty() { @@ -316,14 +332,14 @@ pub async fn advance_delegation_record<'a>( instructions.len(), ); - for _ in 0..10 { - if process_transaction(rpc_client, &instructions, &[signer]) - .await - .is_ok() - { - break; - } - } + // for _ in 0..10 { + // // if process_transaction(rpc_client, &instructions, &[signer]) + // // .await + // // .is_ok() + // // { + // // break; + // // } + // } } } @@ -384,14 +400,24 @@ pub async fn claim_rewards( data.iter() .enumerate() .skip(96) - .for_each(|(i, (exposure, positions))| { - println!( - "Claiming rewards for account ({} / {}). exposure: {}. pubkey: {:?}", - i, - data.len(), - exposure, - positions.acc_info.key - ); - advance_delegation_record(rpc_client, signer, positions, min_reward, current_epoch); + .collect::>() // Collect into Vec first + .chunks(10) // Process in chunks of 10 + .for_each(|chunk| { + println!("Processing batch of up to 10 accounts..."); + + // Create a future for each account in the chunk + let futures = chunk.iter().map(|(i, (exposure, positions))| { + println!( + "Claiming rewards for account ({} / {}). exposure: {}. pubkey: {:?}", + i, + data.len(), + exposure, + positions.acc_info.key + ); + advance_delegation_record(rpc_client, signer, positions, min_reward, current_epoch) + }); + + // Execute up to 10 futures concurrently + futures::executor::block_on(join_all(futures)); }); } diff --git a/staking/cli/src/main.rs b/staking/cli/src/main.rs index 7aa816ab..6b780a39 100644 --- a/staking/cli/src/main.rs +++ b/staking/cli/src/main.rs @@ -12,8 +12,8 @@ use { solana_sdk::commitment_config::CommitmentConfig, }; - -fn main() { +#[tokio::main] +async fn main() { let Cli { keypair, rpc_url, @@ -25,11 +25,6 @@ fn main() { Action::ClaimRewards { min_staked, min_reward, - } => futures::executor::block_on(claim_rewards( - &rpc_client, - keypair.as_ref(), - min_staked, - min_reward, - )), + } => claim_rewards(&rpc_client, keypair.as_ref(), min_staked, min_reward).await, } } From 61a23cce262749afada1e48ed97632b83216d808 Mon Sep 17 00:00:00 2001 From: Guillermo Bescos Date: Tue, 31 Dec 2024 01:11:26 +0000 Subject: [PATCH 09/14] start debugging --- staking/Cargo.lock | 5 +++-- staking/cli/Cargo.toml | 1 + staking/cli/src/instructions.rs | 40 ++++++++++++++++----------------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/staking/Cargo.lock b/staking/Cargo.lock index 25b94ffe..89782364 100644 --- a/staking/Cargo.lock +++ b/staking/Cargo.lock @@ -5296,6 +5296,7 @@ dependencies = [ "solana-remote-wallet", "solana-sdk", "tokio", + "tokio-stream", "uriparse", "wormhole-core-bridge-solana", "wormhole-solana", @@ -5578,9 +5579,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" dependencies = [ "futures-core", "pin-project-lite", diff --git a/staking/cli/Cargo.toml b/staking/cli/Cargo.toml index 2a113eec..6ef93f21 100644 --- a/staking/cli/Cargo.toml +++ b/staking/cli/Cargo.toml @@ -29,3 +29,4 @@ solana-account-decoder = "1.18.16" chrono = "0.4.38" futures = "0.3.31" tokio = "1.42.0" +tokio-stream = "0.1.17" diff --git a/staking/cli/src/instructions.rs b/staking/cli/src/instructions.rs index 3c750adc..d2a54f2c 100644 --- a/staking/cli/src/instructions.rs +++ b/staking/cli/src/instructions.rs @@ -13,7 +13,10 @@ use { }, }, base64::Engine, - futures::future::join_all, + futures::{ + future::join_all, + StreamExt, + }, integration_tests::{ integrity_pool::pda::{ get_delegation_record_address, @@ -274,6 +277,11 @@ pub async fn advance_delegation_record<'a>( }) .collect(); + println!( + "There are {} potential instructions", + potential_instructions.len() + ); + // Fetch all delegation records concurrently let delegation_records = join_all(potential_instructions.iter().map(|(publisher, _, _)| { let delegation_record_pubkey = get_delegation_record_address(*publisher, *positions_pubkey); @@ -397,27 +405,17 @@ pub async fn claim_rewards( data.reverse(); - data.iter() + let futures = data + .iter() .enumerate() - .skip(96) .collect::>() // Collect into Vec first - .chunks(10) // Process in chunks of 10 - .for_each(|chunk| { - println!("Processing batch of up to 10 accounts..."); - - // Create a future for each account in the chunk - let futures = chunk.iter().map(|(i, (exposure, positions))| { - println!( - "Claiming rewards for account ({} / {}). exposure: {}. pubkey: {:?}", - i, - data.len(), - exposure, - positions.acc_info.key - ); - advance_delegation_record(rpc_client, signer, positions, min_reward, current_epoch) - }); + .iter() + .map(|(i, (exposure, positions))| { + advance_delegation_record(rpc_client, signer, positions, min_reward, current_epoch) + }) + .collect::>(); - // Execute up to 10 futures concurrently - futures::executor::block_on(join_all(futures)); - }); + let futures = tokio_stream::iter(futures); + let result = futures.buffer_unordered(1).collect::>().await; + // println!("There are {} futures", futures.len()); } From 900a5fa35ee3f74acaca346552cafeab67e6509a Mon Sep 17 00:00:00 2001 From: Guillermo Bescos Date: Tue, 31 Dec 2024 01:40:49 +0000 Subject: [PATCH 10/14] works --- staking/cli/src/instructions.rs | 98 ++++++++++++++++++++------------- 1 file changed, 59 insertions(+), 39 deletions(-) diff --git a/staking/cli/src/instructions.rs b/staking/cli/src/instructions.rs index d2a54f2c..0b26969c 100644 --- a/staking/cli/src/instructions.rs +++ b/staking/cli/src/instructions.rs @@ -203,31 +203,13 @@ pub async fn advance_delegation_record<'a>( positions: &DynamicPositionArray<'a>, min_reward: u64, current_epoch: u64, + pool_data: &PoolData, + pool_data_address: &Pubkey, + pyth_token_mint: &Pubkey, + pool_config: &Pubkey, + index: usize, ) { let positions_pubkey = positions.acc_info.key; - let pool_config = get_pool_config_address(); - - let PoolConfig { - pool_data: pool_data_address, - pyth_token_mint, - .. - } = PoolConfig::try_deserialize( - &mut rpc_client - .get_account_data(&pool_config) - .await - .unwrap() - .as_slice(), - ) - .unwrap(); - - let pool_data = PoolData::try_deserialize( - &mut &rpc_client - .get_account_data(&pool_data_address) - .await - .unwrap() - .as_slice()[..8 + size_of::()], - ) - .unwrap(); // First collect all potential instruction data let potential_instructions: Vec<_> = pool_data @@ -278,7 +260,9 @@ pub async fn advance_delegation_record<'a>( .collect(); println!( - "There are {} potential instructions", + "Position {:?} with index {} has {} potential instructions", + positions_pubkey, + index, potential_instructions.len() ); @@ -311,9 +295,9 @@ pub async fn advance_delegation_record<'a>( let accounts = integrity_pool::accounts::AdvanceDelegationRecord { delegation_record: get_delegation_record_address(publisher, *positions_pubkey), payer: signer.pubkey(), - pool_config, - pool_data: pool_data_address, - pool_reward_custody: get_pool_reward_custody_address(pyth_token_mint), + pool_config: *pool_config, + pool_data: *pool_data_address, + pool_reward_custody: get_pool_reward_custody_address(*pyth_token_mint), publisher, publisher_stake_account_positions, publisher_stake_account_custody, @@ -340,14 +324,14 @@ pub async fn advance_delegation_record<'a>( instructions.len(), ); - // for _ in 0..10 { - // // if process_transaction(rpc_client, &instructions, &[signer]) - // // .await - // // .is_ok() - // // { - // // break; - // // } - // } + for _ in 0..10 { + if process_transaction(rpc_client, &instructions, &[signer]) + .await + .is_ok() + { + break; + } + } } } @@ -405,17 +389,53 @@ pub async fn claim_rewards( data.reverse(); + let pool_config = get_pool_config_address(); + + let PoolConfig { + pool_data: pool_data_address, + pyth_token_mint, + .. + } = PoolConfig::try_deserialize( + &mut rpc_client + .get_account_data(&pool_config) + .await + .unwrap() + .as_slice(), + ) + .unwrap(); + + let pool_data = PoolData::try_deserialize( + &mut &rpc_client + .get_account_data(&pool_data_address) + .await + .unwrap() + .as_slice()[..8 + size_of::()], + ) + .unwrap(); + + + // advance_delegation_record(rpc_client, signer, &data.first().unwrap().1, min_reward, + // current_epoch, &pool_data, &pool_data_address, &pyth_token_mint, &pool_config).await; let futures = data .iter() .enumerate() - .collect::>() // Collect into Vec first - .iter() .map(|(i, (exposure, positions))| { - advance_delegation_record(rpc_client, signer, positions, min_reward, current_epoch) + advance_delegation_record( + rpc_client, + signer, + positions, + min_reward, + current_epoch, + &pool_data, + &pool_data_address, + &pyth_token_mint, + &pool_config, + i, + ) }) .collect::>(); let futures = tokio_stream::iter(futures); - let result = futures.buffer_unordered(1).collect::>().await; + let result = futures.buffer_unordered(2).collect::>().await; // println!("There are {} futures", futures.len()); } From 841ff4bb2cc96299f4379211e7ff287fe03b46c9 Mon Sep 17 00:00:00 2001 From: Guillermo Bescos Date: Tue, 31 Dec 2024 02:11:37 +0000 Subject: [PATCH 11/14] works --- staking/cli/src/instructions.rs | 52 +++++++++++++++------------------ 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/staking/cli/src/instructions.rs b/staking/cli/src/instructions.rs index 0b26969c..a0c81046 100644 --- a/staking/cli/src/instructions.rs +++ b/staking/cli/src/instructions.rs @@ -135,9 +135,9 @@ pub async fn process_transaction( rpc_client: &RpcClient, instructions: &[Instruction], signers: &[&dyn Signer], -) -> Result> { +) -> Result<(), Option> { let mut instructions = instructions.to_vec(); - instructions.push(ComputeBudgetInstruction::set_compute_unit_price(1)); + instructions.push(ComputeBudgetInstruction::set_compute_unit_price(1000)); let mut transaction = Transaction::new_with_payer(&instructions, Some(&signers[0].pubkey())); transaction.sign( signers, @@ -147,25 +147,23 @@ pub async fn process_transaction( .unwrap() .0, ); - let transaction_signature_res = rpc_client - .send_and_confirm_transaction_with_spinner_and_config( - &transaction, - CommitmentConfig::processed(), - RpcSendTransactionConfig { - skip_preflight: true, - ..Default::default() - }, - ); - match transaction_signature_res.await { - Ok(signature) => { - println!("Transaction successful : {signature:?}"); - Ok(signature) - } - Err(err) => { - println!("transaction err: {err:?}"); - Err(err.get_transaction_error()) - } + for _ in 0..10 { + rpc_client + .send_transaction_with_config( + &transaction, + RpcSendTransactionConfig { + skip_preflight: true, + max_retries: Some(0), + ..Default::default() + }, + ) + .await + .unwrap(); + + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; } + + Ok(()) } pub async fn get_current_time(rpc_client: &RpcClient) -> i64 { @@ -316,7 +314,7 @@ pub async fn advance_delegation_record<'a>( }); } - // Process all instructions in one transaction if there are any + // Process instructions in chunks of 5 if !instructions.is_empty() { println!( "Advancing delegation record for pubkey: {:?}, number of instructions: {}", @@ -324,13 +322,10 @@ pub async fn advance_delegation_record<'a>( instructions.len(), ); - for _ in 0..10 { - if process_transaction(rpc_client, &instructions, &[signer]) + for chunk in instructions.chunks(5) { + process_transaction(rpc_client, chunk, &[signer]) .await - .is_ok() - { - break; - } + .unwrap(); } } } @@ -413,7 +408,6 @@ pub async fn claim_rewards( ) .unwrap(); - // advance_delegation_record(rpc_client, signer, &data.first().unwrap().1, min_reward, // current_epoch, &pool_data, &pool_data_address, &pyth_token_mint, &pool_config).await; let futures = data @@ -436,6 +430,6 @@ pub async fn claim_rewards( .collect::>(); let futures = tokio_stream::iter(futures); - let result = futures.buffer_unordered(2).collect::>().await; + let result = futures.buffer_unordered(20).collect::>().await; // println!("There are {} futures", futures.len()); } From 1b011a86f68b297a147aed85ee68ce63989712dc Mon Sep 17 00:00:00 2001 From: Guillermo Bescos Date: Tue, 31 Dec 2024 02:17:13 +0000 Subject: [PATCH 12/14] works --- staking/cli/src/instructions.rs | 58 +++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/staking/cli/src/instructions.rs b/staking/cli/src/instructions.rs index a0c81046..5eeebcd7 100644 --- a/staking/cli/src/instructions.rs +++ b/staking/cli/src/instructions.rs @@ -206,7 +206,7 @@ pub async fn advance_delegation_record<'a>( pyth_token_mint: &Pubkey, pool_config: &Pubkey, index: usize, -) { +) -> bool { let positions_pubkey = positions.acc_info.key; // First collect all potential instruction data @@ -327,7 +327,9 @@ pub async fn advance_delegation_record<'a>( .await .unwrap(); } + return true; // Instructions were processed } + false // No instructions were processed } pub async fn claim_rewards( @@ -408,28 +410,36 @@ pub async fn claim_rewards( ) .unwrap(); - // advance_delegation_record(rpc_client, signer, &data.first().unwrap().1, min_reward, - // current_epoch, &pool_data, &pool_data_address, &pyth_token_mint, &pool_config).await; - let futures = data - .iter() - .enumerate() - .map(|(i, (exposure, positions))| { - advance_delegation_record( - rpc_client, - signer, - positions, - min_reward, - current_epoch, - &pool_data, - &pool_data_address, - &pyth_token_mint, - &pool_config, - i, - ) - }) - .collect::>(); + loop { + let futures = data + .iter() + .enumerate() + .map(|(i, (exposure, positions))| { + advance_delegation_record( + rpc_client, + signer, + positions, + min_reward, + current_epoch, + &pool_data, + &pool_data_address, + &pyth_token_mint, + &pool_config, + i, + ) + }) + .collect::>(); + + let futures = tokio_stream::iter(futures); + let results = futures.buffer_unordered(20).collect::>().await; + + // Check if any delegations were advanced + if !results.iter().any(|&processed| processed) { + break; + } - let futures = tokio_stream::iter(futures); - let result = futures.buffer_unordered(20).collect::>().await; - // println!("There are {} futures", futures.len()); + + println!("We will retry after 5 seconds!"); + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } } From a0053add417e7936a66fbd83dc2de9be4c255418 Mon Sep 17 00:00:00 2001 From: Guillermo Bescos Date: Tue, 31 Dec 2024 04:23:10 +0000 Subject: [PATCH 13/14] go --- staking/cli/src/instructions.rs | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/staking/cli/src/instructions.rs b/staking/cli/src/instructions.rs index 5eeebcd7..48e8f87d 100644 --- a/staking/cli/src/instructions.rs +++ b/staking/cli/src/instructions.rs @@ -410,11 +410,16 @@ pub async fn claim_rewards( ) .unwrap(); + println!("Processing {} accounts", data.len()); + // Initialize results vector with true to process all indexes in first round + let mut active_positions = vec![true; data.len()]; + loop { let futures = data .iter() .enumerate() - .map(|(i, (exposure, positions))| { + .filter(|(i, _)| active_positions[*i]) + .map(|(i, (_, positions))| { advance_delegation_record( rpc_client, signer, @@ -431,15 +436,25 @@ pub async fn claim_rewards( .collect::>(); let futures = tokio_stream::iter(futures); - let results = futures.buffer_unordered(20).collect::>().await; + let results = futures.buffered(20).collect::>().await; + + println!("Finished processing {} accounts", results.len()); + // Update active_positions based on results + let mut result_index = 0; + for i in 0..active_positions.len() { + if active_positions[i] { + active_positions[i] = results[result_index]; + result_index += 1; + } + } - // Check if any delegations were advanced - if !results.iter().any(|&processed| processed) { + // If no delegations were advanced, we're done + if !results.iter().any(|&active| active) { break; } - println!("We will retry after 5 seconds!"); - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + println!("We will retry after 10 seconds!"); + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; } } From 118628b6dabfaa5617bbf3a7c9c099e68202123d Mon Sep 17 00:00:00 2001 From: Guillermo Bescos Date: Thu, 2 Jan 2025 22:36:21 +0000 Subject: [PATCH 14/14] checkpoint --- staking/cli/src/instructions.rs | 7 +++---- staking/programs/staking/src/state/positions.rs | 1 + 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/staking/cli/src/instructions.rs b/staking/cli/src/instructions.rs index 48e8f87d..116d83b0 100644 --- a/staking/cli/src/instructions.rs +++ b/staking/cli/src/instructions.rs @@ -1,5 +1,6 @@ use { anchor_lang::{ + pubkey, AccountDeserialize, Discriminator, InstructionData, @@ -137,7 +138,7 @@ pub async fn process_transaction( signers: &[&dyn Signer], ) -> Result<(), Option> { let mut instructions = instructions.to_vec(); - instructions.push(ComputeBudgetInstruction::set_compute_unit_price(1000)); + instructions.push(ComputeBudgetInstruction::set_compute_unit_price(10000)); let mut transaction = Transaction::new_with_payer(&instructions, Some(&signers[0].pubkey())); transaction.sign( signers, @@ -285,9 +286,7 @@ pub async fn advance_delegation_record<'a>( continue; } } - Err(_) => { - continue; - } + Err(_) => {} } let accounts = integrity_pool::accounts::AdvanceDelegationRecord { diff --git a/staking/programs/staking/src/state/positions.rs b/staking/programs/staking/src/state/positions.rs index 3a645798..46999a1b 100644 --- a/staking/programs/staking/src/state/positions.rs +++ b/staking/programs/staking/src/state/positions.rs @@ -226,6 +226,7 @@ impl<'a> DynamicPositionArray<'a> { Ok(exposure) } + /// This function is used to reduce the number of positions in the array by merging equivalent /// positions. Sometimes some positions have the same `target_with_parameters`, /// `activation_epoch` and `unlocking_start`. These can obviously be merged, but this is not