From 88813665cb022786d1d9a475dfeea5fc92ed6827 Mon Sep 17 00:00:00 2001 From: Junkil Park Date: Thu, 8 Aug 2024 02:12:53 -0700 Subject: [PATCH] Add indexing support for delegation pool allowlist Add indexing support for the delegation pool allowlist. EnableDelegatorsAllowlisting { pool_address } DisableDelegatorsAllowlisting { pool_address } AllowlistDelegator { pool_address, delegator_address } RemoveDelegatorFromAllowlist { pool_address, delegator_address } --- .../down.sql | 4 + .../up.sql | 23 ++++ .../stake_models/delegator_allowlist.rs | 121 ++++++++++++++++++ .../models/stake_models/delegator_pools.rs | 41 +++++- .../src/db/common/models/stake_models/mod.rs | 1 + .../common/models/stake_models/stake_utils.rs | 21 +++ rust/processor/src/db/postgres/schema.rs | 27 ++++ .../src/processors/stake_processor.rs | 27 +++- 8 files changed, 261 insertions(+), 4 deletions(-) create mode 100644 rust/processor/migrations/2024-05-01-211023_delegator_allowlist/down.sql create mode 100644 rust/processor/migrations/2024-05-01-211023_delegator_allowlist/up.sql create mode 100644 rust/processor/src/db/common/models/stake_models/delegator_allowlist.rs diff --git a/rust/processor/migrations/2024-05-01-211023_delegator_allowlist/down.sql b/rust/processor/migrations/2024-05-01-211023_delegator_allowlist/down.sql new file mode 100644 index 000000000..713de05ce --- /dev/null +++ b/rust/processor/migrations/2024-05-01-211023_delegator_allowlist/down.sql @@ -0,0 +1,4 @@ +-- This file should undo anything in `up.sql` +DROP TABLE IF EXISTS delegated_staking_pool_allowlist; +DROP TABLE IF EXISTS current_delegated_staking_pool_allowlist; +ALTER TABLE delegated_staking_pools DROP COLUMN IF EXISTS allowlist_enabled; \ No newline at end of file diff --git a/rust/processor/migrations/2024-05-01-211023_delegator_allowlist/up.sql b/rust/processor/migrations/2024-05-01-211023_delegator_allowlist/up.sql new file mode 100644 index 000000000..9550ab938 --- /dev/null +++ b/rust/processor/migrations/2024-05-01-211023_delegator_allowlist/up.sql @@ -0,0 +1,23 @@ +-- Your SQL goes here +ALTER TABLE delegated_staking_pools +ADD COLUMN IF NOT EXISTS allowlist_enabled BOOLEAN NOT NULL DEFAULT FALSE; + +CREATE TABLE IF NOT EXISTS current_delegated_staking_pool_allowlist ( + staking_pool_address VARCHAR(66) NOT NULL, + delegator_address VARCHAR(66) NOT NULL, + -- Used for soft delete. On chain, it's a delete operation. + is_allowed BOOLEAN NOT NULL DEFAULT FALSE, + last_transaction_version BIGINT NOT NULL, + inserted_at TIMESTAMP NOT NULL DEFAULT NOW(), + PRIMARY KEY (delegator_address, staking_pool_address) +); + +CREATE TABLE IF NOT EXISTS delegated_staking_pool_allowlist ( + staking_pool_address VARCHAR(66) NOT NULL, + delegator_address VARCHAR(66) NOT NULL, + -- Used for soft delete. On chain, it's a delete operation. + is_allowed BOOLEAN NOT NULL DEFAULT FALSE, + transaction_version BIGINT NOT NULL, + inserted_at TIMESTAMP NOT NULL DEFAULT NOW(), + PRIMARY KEY (transaction_version, delegator_address, staking_pool_address) +); \ No newline at end of file diff --git a/rust/processor/src/db/common/models/stake_models/delegator_allowlist.rs b/rust/processor/src/db/common/models/stake_models/delegator_allowlist.rs new file mode 100644 index 000000000..3e57ee2a0 --- /dev/null +++ b/rust/processor/src/db/common/models/stake_models/delegator_allowlist.rs @@ -0,0 +1,121 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] + +use super::stake_utils::StakeEvent; +use crate::{ + schema::{current_delegated_staking_pool_allowlist, delegated_staking_pool_allowlist}, + utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address}, +}; +use ahash::AHashMap; +use aptos_protos::transaction::v1::{transaction::TxnData, Transaction}; +use field_count::FieldCount; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[diesel(primary_key(delegator_address, staking_pool_address))] +#[diesel(table_name = current_delegated_staking_pool_allowlist)] +pub struct CurrentDelegatedStakingPoolAllowlist { + pub staking_pool_address: String, + pub delegator_address: String, + pub is_allowed: bool, + last_transaction_version: i64, +} + +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[diesel(primary_key(transaction_version, delegator_address, staking_pool_address))] +#[diesel(table_name = delegated_staking_pool_allowlist)] +pub struct DelegatedStakingPoolAllowlist { + pub staking_pool_address: String, + pub delegator_address: String, + pub is_allowed: bool, + transaction_version: i64, +} + +impl CurrentDelegatedStakingPoolAllowlist { + pub fn from_transaction( + transaction: &Transaction, + ) -> anyhow::Result> { + let mut delegated_staking_pool_allowlist = AHashMap::new(); + let txn_data = match transaction.txn_data.as_ref() { + Some(data) => data, + None => { + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&["DelegatedStakingPoolAllowlist"]) + .inc(); + tracing::warn!( + transaction_version = transaction.version, + "Transaction data doesn't exist", + ); + return Ok(delegated_staking_pool_allowlist); + }, + }; + let txn_version = transaction.version as i64; + + if let TxnData::User(user_txn) = txn_data { + for event in &user_txn.events { + if let Some(StakeEvent::AllowlistDelegatorEvent(ev)) = + StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)? + { + let current_delegated_staking_pool_allowlist = + CurrentDelegatedStakingPoolAllowlist { + last_transaction_version: txn_version, + staking_pool_address: standardize_address(&ev.pool_address), + delegator_address: standardize_address(&ev.delegator_address), + is_allowed: ev.enabled, + }; + delegated_staking_pool_allowlist.insert( + ( + current_delegated_staking_pool_allowlist + .delegator_address + .clone(), + current_delegated_staking_pool_allowlist + .staking_pool_address + .clone(), + ), + current_delegated_staking_pool_allowlist, + ); + } + } + } + Ok(delegated_staking_pool_allowlist) + } +} + +impl DelegatedStakingPoolAllowlist { + pub fn from_transaction(transaction: &Transaction) -> anyhow::Result> { + let mut delegated_staking_pool_allowlist = vec![]; + let txn_data = match transaction.txn_data.as_ref() { + Some(data) => data, + None => { + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&["DelegatedStakingPoolAllowlist"]) + .inc(); + tracing::warn!( + transaction_version = transaction.version, + "Transaction data doesn't exist", + ); + return Ok(delegated_staking_pool_allowlist); + }, + }; + let txn_version = transaction.version as i64; + + if let TxnData::User(user_txn) = txn_data { + for event in &user_txn.events { + if let Some(StakeEvent::AllowlistDelegatorEvent(ev)) = + StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)? + { + delegated_staking_pool_allowlist.push(Self { + transaction_version: txn_version, + staking_pool_address: standardize_address(&ev.pool_address), + delegator_address: standardize_address(&ev.delegator_address), + is_allowed: ev.enabled, + }); + } + } + } + Ok(delegated_staking_pool_allowlist) + } +} diff --git a/rust/processor/src/db/common/models/stake_models/delegator_pools.rs b/rust/processor/src/db/common/models/stake_models/delegator_pools.rs index 6c5613990..dd9f7e4bf 100644 --- a/rust/processor/src/db/common/models/stake_models/delegator_pools.rs +++ b/rust/processor/src/db/common/models/stake_models/delegator_pools.rs @@ -4,7 +4,7 @@ // This is required because a diesel macro makes clippy sad #![allow(clippy::extra_unused_lifetimes)] -use super::stake_utils::{StakeResource, StakeTableItem}; +use super::stake_utils::{StakeEvent, StakeResource, StakeTableItem}; use crate::{ schema::{ current_delegated_staking_pool_balances, delegated_staking_pool_balances, @@ -30,7 +30,10 @@ pub type DelegatorPoolBalanceMap = AHashMap &txn.events, + TxnData::BlockMetadata(txn) => &txn.events, + _ => { + return Ok(( + delegator_pool_map, + delegator_pool_balances, + delegator_pool_balances_map, + )) + }, + }; + + for event in events { + if let Some(StakeEvent::AllowlistingEvent(inner)) = + StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)? + { + let staking_pool_address = standardize_address(&inner.pool_address); + let enabled = inner.enabled; + if delegator_pool_map.contains_key(&staking_pool_address) { + delegator_pool_map + .get_mut(&staking_pool_address) + .expect("Pool should exist") + .allowlist_enabled = enabled; + } else { + let pool = DelegatorPool { + staking_pool_address: staking_pool_address.clone(), + first_transaction_version: txn_version, + allowlist_enabled: enabled, + }; + delegator_pool_map.insert(staking_pool_address.clone(), pool); + } + } + } } Ok(( delegator_pool_map, @@ -211,6 +249,7 @@ impl DelegatorPool { Self { staking_pool_address: staking_pool_address.clone(), first_transaction_version: transaction_version, + allowlist_enabled: false, }, DelegatorPoolBalance { transaction_version, diff --git a/rust/processor/src/db/common/models/stake_models/mod.rs b/rust/processor/src/db/common/models/stake_models/mod.rs index 75db7e273..9324f7123 100644 --- a/rust/processor/src/db/common/models/stake_models/mod.rs +++ b/rust/processor/src/db/common/models/stake_models/mod.rs @@ -3,6 +3,7 @@ pub mod current_delegated_voter; pub mod delegator_activities; +pub mod delegator_allowlist; pub mod delegator_balances; pub mod delegator_pools; pub mod proposal_votes; diff --git a/rust/processor/src/db/common/models/stake_models/stake_utils.rs b/rust/processor/src/db/common/models/stake_models/stake_utils.rs index f623d3ffb..a9601c9ec 100644 --- a/rust/processor/src/db/common/models/stake_models/stake_utils.rs +++ b/rust/processor/src/db/common/models/stake_models/stake_utils.rs @@ -103,6 +103,18 @@ pub struct ReactivateStakeEvent { pub delegator_address: String, pub pool_address: String, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct EnableAllowlistingEvent { + pub pool_address: String, + pub enabled: bool, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct EnableDelegatorAllowlistingEvent { + pub pool_address: String, + pub delegator_address: String, + pub enabled: bool, +} #[derive(Serialize, Deserialize, Debug, Clone)] pub enum StakeTableItem { @@ -195,6 +207,8 @@ pub enum StakeEvent { UnlockStakeEvent(UnlockStakeEvent), WithdrawStakeEvent(WithdrawStakeEvent), ReactivateStakeEvent(ReactivateStakeEvent), + AllowlistingEvent(EnableAllowlistingEvent), + AllowlistDelegatorEvent(EnableDelegatorAllowlistingEvent), } impl StakeEvent { @@ -216,6 +230,13 @@ impl StakeEvent { }, "0x1::delegation_pool::ReactivateStakeEvent" => serde_json::from_str(data) .map(|inner| Some(StakeEvent::ReactivateStakeEvent(inner))), + "0x1::delegation_pool::EnableDelegatorsAllowlisting" + | "0x1::delegation_pool::DisableDelegatorsAllowlisting" => { + serde_json::from_str(data).map(|inner| Some(StakeEvent::AllowlistingEvent(inner))) + }, + "0x1::delegation_pool::AllowlistDelegator" + | "0x1::delegation_pool::RemoveDelegatorFromAllowlist" => serde_json::from_str(data) + .map(|inner| Some(StakeEvent::AllowlistDelegatorEvent(inner))), _ => Ok(None), } .context(format!( diff --git a/rust/processor/src/db/postgres/schema.rs b/rust/processor/src/db/postgres/schema.rs index 8c9508479..01d3d7bac 100644 --- a/rust/processor/src/db/postgres/schema.rs +++ b/rust/processor/src/db/postgres/schema.rs @@ -369,6 +369,18 @@ diesel::table! { } } +diesel::table! { + current_delegated_staking_pool_allowlist (delegator_address, staking_pool_address) { + #[max_length = 66] + staking_pool_address -> Varchar, + #[max_length = 66] + delegator_address -> Varchar, + is_allowed -> Bool, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + } +} + diesel::table! { current_delegated_staking_pool_balances (staking_pool_address) { #[max_length = 66] @@ -694,6 +706,18 @@ diesel::table! { } } +diesel::table! { + delegated_staking_pool_allowlist (transaction_version, delegator_address, staking_pool_address) { + #[max_length = 66] + staking_pool_address -> Varchar, + #[max_length = 66] + delegator_address -> Varchar, + is_allowed -> Bool, + transaction_version -> Int8, + inserted_at -> Timestamp, + } +} + diesel::table! { delegated_staking_pool_balances (transaction_version, staking_pool_address) { transaction_version -> Int8, @@ -716,6 +740,7 @@ diesel::table! { staking_pool_address -> Varchar, first_transaction_version -> Int8, inserted_at -> Timestamp, + allowlist_enabled -> Bool, } } @@ -1300,6 +1325,7 @@ diesel::allow_tables_to_appear_in_same_query!( current_coin_balances, current_collection_datas, current_collections_v2, + current_delegated_staking_pool_allowlist, current_delegated_staking_pool_balances, current_delegated_voter, current_delegator_balances, @@ -1316,6 +1342,7 @@ diesel::allow_tables_to_appear_in_same_query!( current_token_v2_metadata, current_unified_fungible_asset_balances_to_be_renamed, delegated_staking_activities, + delegated_staking_pool_allowlist, delegated_staking_pool_balances, delegated_staking_pools, delegator_balances, diff --git a/rust/processor/src/processors/stake_processor.rs b/rust/processor/src/processors/stake_processor.rs index d623704d1..cc88e6d3f 100644 --- a/rust/processor/src/processors/stake_processor.rs +++ b/rust/processor/src/processors/stake_processor.rs @@ -6,6 +6,9 @@ use crate::{ db::common::models::stake_models::{ current_delegated_voter::CurrentDelegatedVoter, delegator_activities::DelegatedStakingActivity, + delegator_allowlist::{ + CurrentDelegatedStakingPoolAllowlist, DelegatedStakingPoolAllowlist, + }, delegator_balances::{ CurrentDelegatorBalance, CurrentDelegatorBalanceMap, DelegatorBalance, }, @@ -303,8 +306,9 @@ fn insert_delegator_pools_query( .on_conflict(staking_pool_address) .do_update() .set(( - first_transaction_version.eq(excluded(first_transaction_version)), - inserted_at.eq(excluded(inserted_at)), + first_transaction_version.eq(excluded(first_transaction_version)), + allowlist_enabled.eq(excluded(allowlist_enabled)), + inserted_at.eq(excluded(inserted_at)), )), Some( " WHERE delegated_staking_pools.first_transaction_version >= EXCLUDED.first_transaction_version ", @@ -416,6 +420,9 @@ impl ProcessorTrait for StakeProcessor { let mut all_current_delegated_voter = AHashMap::new(); let mut all_vote_delegation_handle_to_pool_address = AHashMap::new(); + let mut all_delegator_allowlist = vec![]; + let mut all_current_delegator_allowlist = AHashMap::new(); + for txn in &transactions { // Add votes data let current_stake_pool_voter = CurrentStakingPoolVoter::from_transaction(txn).unwrap(); @@ -430,10 +437,24 @@ impl ProcessorTrait for StakeProcessor { // Add delegator pools let (delegator_pools, mut delegator_pool_balances, current_delegator_pool_balances) = DelegatorPool::from_transaction(txn).unwrap(); - all_delegator_pools.extend(delegator_pools); + for (pool_address, pool) in delegator_pools.iter() { + // We need to keep the first transaction version for each pool. + if let Some(existing_pool) = all_delegator_pools.get_mut(pool_address) { + existing_pool.allowlist_enabled = pool.allowlist_enabled; + } else { + all_delegator_pools.insert(pool_address.clone(), pool.clone()); + } + } all_delegator_pool_balances.append(&mut delegator_pool_balances); all_current_delegator_pool_balances.extend(current_delegator_pool_balances); + // Add delegator pool allowlist. + let delegator_allowlist = DelegatedStakingPoolAllowlist::from_transaction(txn).unwrap(); + all_delegator_allowlist.extend(delegator_allowlist); + let current_delegator_allowlist = + CurrentDelegatedStakingPoolAllowlist::from_transaction(txn).unwrap(); + all_current_delegator_allowlist.extend(current_delegator_allowlist); + // Moving the transaction code here is the new paradigm to avoid redoing a lot of the duplicate work // Currently only delegator voting follows this paradigm // TODO: refactor all the other staking code to follow this paradigm