Skip to content

Commit

Permalink
Add indexing support for delegation pool allowlist
Browse files Browse the repository at this point in the history
Add indexing support for the delegation pool allowlist.

EnableDelegatorsAllowlisting { pool_address }
DisableDelegatorsAllowlisting { pool_address }
AllowlistDelegator { pool_address, delegator_address }
RemoveDelegatorFromAllowlist { pool_address, delegator_address }
  • Loading branch information
junkil-park committed Aug 8, 2024
1 parent 89844fb commit 8881366
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This file should undo anything in `up.sql`
DROP TABLE IF EXISTS delegated_staking_pool_allowlist;
DROP TABLE IF EXISTS current_delegated_staking_pool_allowlist;
ALTER TABLE delegated_staking_pools DROP COLUMN IF EXISTS allowlist_enabled;
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- Your SQL goes here
ALTER TABLE delegated_staking_pools
ADD COLUMN IF NOT EXISTS allowlist_enabled BOOLEAN NOT NULL DEFAULT FALSE;

CREATE TABLE IF NOT EXISTS current_delegated_staking_pool_allowlist (
staking_pool_address VARCHAR(66) NOT NULL,
delegator_address VARCHAR(66) NOT NULL,
-- Used for soft delete. On chain, it's a delete operation.
is_allowed BOOLEAN NOT NULL DEFAULT FALSE,
last_transaction_version BIGINT NOT NULL,
inserted_at TIMESTAMP NOT NULL DEFAULT NOW(),
PRIMARY KEY (delegator_address, staking_pool_address)
);

CREATE TABLE IF NOT EXISTS delegated_staking_pool_allowlist (
staking_pool_address VARCHAR(66) NOT NULL,
delegator_address VARCHAR(66) NOT NULL,
-- Used for soft delete. On chain, it's a delete operation.
is_allowed BOOLEAN NOT NULL DEFAULT FALSE,
transaction_version BIGINT NOT NULL,
inserted_at TIMESTAMP NOT NULL DEFAULT NOW(),
PRIMARY KEY (transaction_version, delegator_address, staking_pool_address)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]

use super::stake_utils::StakeEvent;
use crate::{
schema::{current_delegated_staking_pool_allowlist, delegated_staking_pool_allowlist},
utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address},
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::{transaction::TxnData, Transaction};
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(delegator_address, staking_pool_address))]
#[diesel(table_name = current_delegated_staking_pool_allowlist)]
pub struct CurrentDelegatedStakingPoolAllowlist {
pub staking_pool_address: String,
pub delegator_address: String,
pub is_allowed: bool,
last_transaction_version: i64,
}

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(transaction_version, delegator_address, staking_pool_address))]
#[diesel(table_name = delegated_staking_pool_allowlist)]
pub struct DelegatedStakingPoolAllowlist {
pub staking_pool_address: String,
pub delegator_address: String,
pub is_allowed: bool,
transaction_version: i64,
}

impl CurrentDelegatedStakingPoolAllowlist {
pub fn from_transaction(
transaction: &Transaction,
) -> anyhow::Result<AHashMap<(String, String), Self>> {
let mut delegated_staking_pool_allowlist = AHashMap::new();
let txn_data = match transaction.txn_data.as_ref() {
Some(data) => data,
None => {
PROCESSOR_UNKNOWN_TYPE_COUNT
.with_label_values(&["DelegatedStakingPoolAllowlist"])
.inc();
tracing::warn!(
transaction_version = transaction.version,
"Transaction data doesn't exist",
);
return Ok(delegated_staking_pool_allowlist);
},
};
let txn_version = transaction.version as i64;

if let TxnData::User(user_txn) = txn_data {
for event in &user_txn.events {
if let Some(StakeEvent::AllowlistDelegatorEvent(ev)) =
StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
{
let current_delegated_staking_pool_allowlist =
CurrentDelegatedStakingPoolAllowlist {
last_transaction_version: txn_version,
staking_pool_address: standardize_address(&ev.pool_address),
delegator_address: standardize_address(&ev.delegator_address),
is_allowed: ev.enabled,
};
delegated_staking_pool_allowlist.insert(
(
current_delegated_staking_pool_allowlist
.delegator_address
.clone(),
current_delegated_staking_pool_allowlist
.staking_pool_address
.clone(),
),
current_delegated_staking_pool_allowlist,
);
}
}
}
Ok(delegated_staking_pool_allowlist)
}
}

impl DelegatedStakingPoolAllowlist {
pub fn from_transaction(transaction: &Transaction) -> anyhow::Result<Vec<Self>> {
let mut delegated_staking_pool_allowlist = vec![];
let txn_data = match transaction.txn_data.as_ref() {
Some(data) => data,
None => {
PROCESSOR_UNKNOWN_TYPE_COUNT
.with_label_values(&["DelegatedStakingPoolAllowlist"])
.inc();
tracing::warn!(
transaction_version = transaction.version,
"Transaction data doesn't exist",
);
return Ok(delegated_staking_pool_allowlist);
},
};
let txn_version = transaction.version as i64;

if let TxnData::User(user_txn) = txn_data {
for event in &user_txn.events {
if let Some(StakeEvent::AllowlistDelegatorEvent(ev)) =
StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
{
delegated_staking_pool_allowlist.push(Self {
transaction_version: txn_version,
staking_pool_address: standardize_address(&ev.pool_address),
delegator_address: standardize_address(&ev.delegator_address),
is_allowed: ev.enabled,
});
}
}
}
Ok(delegated_staking_pool_allowlist)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]

use super::stake_utils::{StakeResource, StakeTableItem};
use super::stake_utils::{StakeEvent, StakeResource, StakeTableItem};
use crate::{
schema::{
current_delegated_staking_pool_balances, delegated_staking_pool_balances,
Expand All @@ -30,7 +30,10 @@ pub type DelegatorPoolBalanceMap = AHashMap<StakingPoolAddress, CurrentDelegator
#[diesel(table_name = delegated_staking_pools)]
pub struct DelegatorPool {
pub staking_pool_address: String,
// We should add a new field like `last_transaction_version` to track the last transaction version
// that updated the pool
pub first_transaction_version: i64,
pub allowlist_enabled: bool,
}

// Metadata to fill pool balances and delegator balance
Expand Down Expand Up @@ -136,6 +139,41 @@ impl DelegatorPool {
}
}
}
let txn_version = transaction.version as i64;

let events = match txn_data {
TxnData::User(txn) => &txn.events,
TxnData::BlockMetadata(txn) => &txn.events,
_ => {
return Ok((
delegator_pool_map,
delegator_pool_balances,
delegator_pool_balances_map,
))
},
};

for event in events {
if let Some(StakeEvent::AllowlistingEvent(inner)) =
StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
{
let staking_pool_address = standardize_address(&inner.pool_address);
let enabled = inner.enabled;
if delegator_pool_map.contains_key(&staking_pool_address) {
delegator_pool_map
.get_mut(&staking_pool_address)
.expect("Pool should exist")
.allowlist_enabled = enabled;
} else {
let pool = DelegatorPool {
staking_pool_address: staking_pool_address.clone(),
first_transaction_version: txn_version,
allowlist_enabled: enabled,
};
delegator_pool_map.insert(staking_pool_address.clone(), pool);
}
}
}
}
Ok((
delegator_pool_map,
Expand Down Expand Up @@ -211,6 +249,7 @@ impl DelegatorPool {
Self {
staking_pool_address: staking_pool_address.clone(),
first_transaction_version: transaction_version,
allowlist_enabled: false,
},
DelegatorPoolBalance {
transaction_version,
Expand Down
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/stake_models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

pub mod current_delegated_voter;
pub mod delegator_activities;
pub mod delegator_allowlist;
pub mod delegator_balances;
pub mod delegator_pools;
pub mod proposal_votes;
Expand Down
21 changes: 21 additions & 0 deletions rust/processor/src/db/common/models/stake_models/stake_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ pub struct ReactivateStakeEvent {
pub delegator_address: String,
pub pool_address: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EnableAllowlistingEvent {
pub pool_address: String,
pub enabled: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EnableDelegatorAllowlistingEvent {
pub pool_address: String,
pub delegator_address: String,
pub enabled: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum StakeTableItem {
Expand Down Expand Up @@ -195,6 +207,8 @@ pub enum StakeEvent {
UnlockStakeEvent(UnlockStakeEvent),
WithdrawStakeEvent(WithdrawStakeEvent),
ReactivateStakeEvent(ReactivateStakeEvent),
AllowlistingEvent(EnableAllowlistingEvent),
AllowlistDelegatorEvent(EnableDelegatorAllowlistingEvent),
}

impl StakeEvent {
Expand All @@ -216,6 +230,13 @@ impl StakeEvent {
},
"0x1::delegation_pool::ReactivateStakeEvent" => serde_json::from_str(data)
.map(|inner| Some(StakeEvent::ReactivateStakeEvent(inner))),
"0x1::delegation_pool::EnableDelegatorsAllowlisting"
| "0x1::delegation_pool::DisableDelegatorsAllowlisting" => {
serde_json::from_str(data).map(|inner| Some(StakeEvent::AllowlistingEvent(inner)))
},
"0x1::delegation_pool::AllowlistDelegator"
| "0x1::delegation_pool::RemoveDelegatorFromAllowlist" => serde_json::from_str(data)
.map(|inner| Some(StakeEvent::AllowlistDelegatorEvent(inner))),
_ => Ok(None),
}
.context(format!(
Expand Down
27 changes: 27 additions & 0 deletions rust/processor/src/db/postgres/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,18 @@ diesel::table! {
}
}

diesel::table! {
current_delegated_staking_pool_allowlist (delegator_address, staking_pool_address) {
#[max_length = 66]
staking_pool_address -> Varchar,
#[max_length = 66]
delegator_address -> Varchar,
is_allowed -> Bool,
last_transaction_version -> Int8,
inserted_at -> Timestamp,
}
}

diesel::table! {
current_delegated_staking_pool_balances (staking_pool_address) {
#[max_length = 66]
Expand Down Expand Up @@ -694,6 +706,18 @@ diesel::table! {
}
}

diesel::table! {
delegated_staking_pool_allowlist (transaction_version, delegator_address, staking_pool_address) {
#[max_length = 66]
staking_pool_address -> Varchar,
#[max_length = 66]
delegator_address -> Varchar,
is_allowed -> Bool,
transaction_version -> Int8,
inserted_at -> Timestamp,
}
}

diesel::table! {
delegated_staking_pool_balances (transaction_version, staking_pool_address) {
transaction_version -> Int8,
Expand All @@ -716,6 +740,7 @@ diesel::table! {
staking_pool_address -> Varchar,
first_transaction_version -> Int8,
inserted_at -> Timestamp,
allowlist_enabled -> Bool,
}
}

Expand Down Expand Up @@ -1300,6 +1325,7 @@ diesel::allow_tables_to_appear_in_same_query!(
current_coin_balances,
current_collection_datas,
current_collections_v2,
current_delegated_staking_pool_allowlist,
current_delegated_staking_pool_balances,
current_delegated_voter,
current_delegator_balances,
Expand All @@ -1316,6 +1342,7 @@ diesel::allow_tables_to_appear_in_same_query!(
current_token_v2_metadata,
current_unified_fungible_asset_balances_to_be_renamed,
delegated_staking_activities,
delegated_staking_pool_allowlist,
delegated_staking_pool_balances,
delegated_staking_pools,
delegator_balances,
Expand Down
27 changes: 24 additions & 3 deletions rust/processor/src/processors/stake_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use crate::{
db::common::models::stake_models::{
current_delegated_voter::CurrentDelegatedVoter,
delegator_activities::DelegatedStakingActivity,
delegator_allowlist::{
CurrentDelegatedStakingPoolAllowlist, DelegatedStakingPoolAllowlist,
},
delegator_balances::{
CurrentDelegatorBalance, CurrentDelegatorBalanceMap, DelegatorBalance,
},
Expand Down Expand Up @@ -303,8 +306,9 @@ fn insert_delegator_pools_query(
.on_conflict(staking_pool_address)
.do_update()
.set((
first_transaction_version.eq(excluded(first_transaction_version)),
inserted_at.eq(excluded(inserted_at)),
first_transaction_version.eq(excluded(first_transaction_version)),
allowlist_enabled.eq(excluded(allowlist_enabled)),
inserted_at.eq(excluded(inserted_at)),
)),
Some(
" WHERE delegated_staking_pools.first_transaction_version >= EXCLUDED.first_transaction_version ",
Expand Down Expand Up @@ -416,6 +420,9 @@ impl ProcessorTrait for StakeProcessor {
let mut all_current_delegated_voter = AHashMap::new();
let mut all_vote_delegation_handle_to_pool_address = AHashMap::new();

let mut all_delegator_allowlist = vec![];
let mut all_current_delegator_allowlist = AHashMap::new();

for txn in &transactions {
// Add votes data
let current_stake_pool_voter = CurrentStakingPoolVoter::from_transaction(txn).unwrap();
Expand All @@ -430,10 +437,24 @@ impl ProcessorTrait for StakeProcessor {
// Add delegator pools
let (delegator_pools, mut delegator_pool_balances, current_delegator_pool_balances) =
DelegatorPool::from_transaction(txn).unwrap();
all_delegator_pools.extend(delegator_pools);
for (pool_address, pool) in delegator_pools.iter() {
// We need to keep the first transaction version for each pool.
if let Some(existing_pool) = all_delegator_pools.get_mut(pool_address) {
existing_pool.allowlist_enabled = pool.allowlist_enabled;
} else {
all_delegator_pools.insert(pool_address.clone(), pool.clone());
}
}
all_delegator_pool_balances.append(&mut delegator_pool_balances);
all_current_delegator_pool_balances.extend(current_delegator_pool_balances);

// Add delegator pool allowlist.
let delegator_allowlist = DelegatedStakingPoolAllowlist::from_transaction(txn).unwrap();
all_delegator_allowlist.extend(delegator_allowlist);
let current_delegator_allowlist =
CurrentDelegatedStakingPoolAllowlist::from_transaction(txn).unwrap();
all_current_delegator_allowlist.extend(current_delegator_allowlist);

// Moving the transaction code here is the new paradigm to avoid redoing a lot of the duplicate work
// Currently only delegator voting follows this paradigm
// TODO: refactor all the other staking code to follow this paradigm
Expand Down

0 comments on commit 8881366

Please sign in to comment.