From b40ccac5e827e0eb43bd36afd875c21cbd16c1fa Mon Sep 17 00:00:00 2001 From: bingyuyap Date: Mon, 2 Sep 2024 13:03:27 +0800 Subject: [PATCH] ft_watcher: add logic to persist token info Signed-off-by: bingyuyap --- database/fast-transfer-schema.sql | 11 ++ watcher/src/fastTransfer/swapLayer/parser.ts | 22 ++- watcher/src/fastTransfer/types.ts | 8 + watcher/src/utils/TokenInfoManager.ts | 145 +++++++++++++++++++ watcher/src/watchers/FTEVMWatcher.ts | 34 ++--- 5 files changed, 200 insertions(+), 20 deletions(-) create mode 100644 watcher/src/utils/TokenInfoManager.ts diff --git a/database/fast-transfer-schema.sql b/database/fast-transfer-schema.sql index 39460067..dee617a0 100644 --- a/database/fast-transfer-schema.sql +++ b/database/fast-transfer-schema.sql @@ -107,3 +107,14 @@ CREATE TABLE redeem_swaps ( relaying_fee BIGINT NOT NULL, redeem_time TIMESTAMP NOT NULL ); + +-- Token Infos table to store information about different tokens +-- A normalized table for us to reference token info for analytics purposes +CREATE TABLE token_infos ( + name VARCHAR(255) NOT NULL, + chain_id INTEGER NOT NULL, + decimals INTEGER NOT NULL, + symbol VARCHAR(50) NOT NULL, + token_address VARCHAR(255) NOT NULL, + PRIMARY KEY (token_address, chain_id) +); diff --git a/watcher/src/fastTransfer/swapLayer/parser.ts b/watcher/src/fastTransfer/swapLayer/parser.ts index a4b100bf..a0dabd23 100644 --- a/watcher/src/fastTransfer/swapLayer/parser.ts +++ b/watcher/src/fastTransfer/swapLayer/parser.ts @@ -1,18 +1,31 @@ import { ethers } from 'ethers'; import { TransferCompletion } from '../types'; import { parseVaa } from '@wormhole-foundation/wormhole-monitor-common'; +import { Knex } from 'knex'; +import { Chain, chainToChainId } from '@wormhole-foundation/sdk-base'; +import { TokenInfoManager } from '../../utils/TokenInfoManager'; class SwapLayerParser { private provider: ethers.providers.JsonRpcProvider; private swapLayerAddress: string; private swapLayerInterface: ethers.utils.Interface; - - constructor(provider: ethers.providers.JsonRpcProvider, swapLayerAddress: string) { + private tokenInfoManager: TokenInfoManager | null = null; + + constructor( + provider: ethers.providers.JsonRpcProvider, + swapLayerAddress: string, + db: Knex | null, + chain: Chain + ) { this.provider = provider; this.swapLayerAddress = swapLayerAddress; this.swapLayerInterface = new ethers.utils.Interface([ 'event Redeemed(address indexed recipient, address outputToken, uint256 outputAmount, uint256 relayingFee)', ]); + + if (db) { + this.tokenInfoManager = new TokenInfoManager(db, chainToChainId(chain), provider); + } } async parseSwapLayerTransaction( @@ -59,6 +72,11 @@ class SwapLayerParser { if (!swapEvent) return null; + // if we have the tokenInfoManager inited, persist the token info + // this ensures we have the token decimal and name for analytics purposes + if (this.tokenInfoManager) + await this.tokenInfoManager.saveTokenInfoIfNotExist(swapEvent.args.outputToken); + return { tx_hash: txHash, recipient: swapEvent.args.recipient, diff --git a/watcher/src/fastTransfer/types.ts b/watcher/src/fastTransfer/types.ts index 6a2412af..14e38cdd 100644 --- a/watcher/src/fastTransfer/types.ts +++ b/watcher/src/fastTransfer/types.ts @@ -156,3 +156,11 @@ export type TransferCompletion = { // on Solana Swap Layer, this acts as a link between complete_{transfer, swap}_payload and release_inbound staged_inbound?: string; }; + +export type TokenInfo = { + name: string; + chain_id: number; + decimals: number; + symbol: string; + token_address: string; +}; diff --git a/watcher/src/utils/TokenInfoManager.ts b/watcher/src/utils/TokenInfoManager.ts new file mode 100644 index 00000000..e73d5f05 --- /dev/null +++ b/watcher/src/utils/TokenInfoManager.ts @@ -0,0 +1,145 @@ +import { Knex } from 'knex'; +import { ethers, providers } from 'ethers'; +import { ChainId } from '@wormhole-foundation/sdk-base'; +import { Connection, PublicKey } from '@solana/web3.js'; +import { TokenInfo } from 'src/fastTransfer/types'; + +const minABI = [ + { + inputs: [], + name: 'name', + outputs: [{ internalType: 'string', name: '', type: 'string' }], + stateMutability: 'view', + type: 'function', + }, + { + inputs: [], + name: 'symbol', + outputs: [{ internalType: 'string', name: '', type: 'string' }], + stateMutability: 'view', + type: 'function', + }, + { + inputs: [], + name: 'decimals', + outputs: [{ internalType: 'uint8', name: '', type: 'uint8' }], + stateMutability: 'view', + type: 'function', + }, +]; +// TokenInfoManager class for managing token information across different chains +// This class is to ensure that token info (e.g. decimal, name) for tokens that we see on Swap Layer is persisted for analytics purposes +export class TokenInfoManager { + private tokenInfoMap: Map; + private db: Knex; + private chainId: ChainId; + private provider: providers.JsonRpcProvider | Connection; + + constructor(db: Knex, chainId: ChainId, provider: providers.JsonRpcProvider | Connection) { + this.tokenInfoMap = new Map(); + this.db = db; + this.chainId = chainId; + this.provider = provider; + } + + // Retrieve token information from the database + private async getTokenInfoFromDB(tokenAddress: string): Promise { + return await this.db('token_infos') + .select('token_address', 'name', 'symbol', 'decimals') + .where('token_address', tokenAddress) + .andWhere('chain_id', this.chainId) + .first(); + } + + private async saveTokenInfo(tokenAddress: string, tokenInfo: TokenInfo): Promise { + await this.db('token_infos').insert({ + token_address: tokenAddress, + name: tokenInfo.name, + symbol: tokenInfo.symbol, + decimals: tokenInfo.decimals, + chain_id: this.chainId, + }); + } + + // Save token information if it doesn't exist in the cache or database + public async saveTokenInfoIfNotExist(tokenAddress: string): Promise { + if (this.tokenInfoMap.has(tokenAddress)) { + return this.tokenInfoMap.get(tokenAddress) || null; + } + // Check if token info is in the database + const tokenInfo = await this.getTokenInfoFromDB(tokenAddress); + if (tokenInfo) { + this.tokenInfoMap.set(tokenAddress, tokenInfo); + return tokenInfo; + } + // If not in database, fetch from RPC + const fetchedTokenInfo = await this.fetchTokenInfoFromRPC(tokenAddress); + if (fetchedTokenInfo) { + await this.saveTokenInfo(tokenAddress, fetchedTokenInfo); + this.tokenInfoMap.set(tokenAddress, fetchedTokenInfo); + return fetchedTokenInfo; + } + return null; + } + + // Fetch token information from RPC based on the chain ID + private async fetchTokenInfoFromRPC(tokenAddress: string): Promise { + if (this.chainId === 1) { + return this.fetchSolanaTokenInfo(tokenAddress); + } + return this.fetchEVMTokenInfo(tokenAddress); + } + + // Fetch Solana token information + private async fetchSolanaTokenInfo(tokenAddress: string): Promise { + try { + const connection = this.provider as Connection; + const tokenPublicKey = new PublicKey(tokenAddress); + const accountInfo = await connection.getParsedAccountInfo(tokenPublicKey); + + if (accountInfo.value && accountInfo.value.data && 'parsed' in accountInfo.value.data) { + const parsedData = accountInfo.value.data.parsed; + if (parsedData.type === 'mint' && 'info' in parsedData) { + const { name, symbol, decimals } = parsedData.info; + if ( + typeof name === 'string' && + typeof symbol === 'string' && + typeof decimals === 'number' + ) { + return { name, symbol, decimals, chain_id: 1, token_address: tokenAddress }; + } + } + } + throw new Error('Invalid token account'); + } catch (error) { + console.error('Error fetching Solana token info:', error); + return null; + } + } + + // Fetch EVM token information + private async fetchEVMTokenInfo(tokenAddress: string): Promise { + // If it's null address, it's Ether. If this chain is not Ethereum then it's Wrapped Ether + if (tokenAddress.toLowerCase() === '0x0000000000000000000000000000000000000000') { + return { + name: this.chainId === 2 || this.chainId === 5 ? 'Ether' : 'Wrapped Ether', + symbol: this.chainId === 2 || this.chainId === 5 ? 'ETH' : 'WETH', + decimals: 18, + chain_id: this.chainId, + token_address: tokenAddress, + }; + } + + const provider = this.provider as providers.JsonRpcProvider; + const tokenContract = new ethers.Contract(tokenAddress, minABI, provider); + try { + const name = await tokenContract.name(); + const symbol = await tokenContract.symbol(); + const decimals = await tokenContract.decimals(); + return { name, symbol, decimals, chain_id: this.chainId, token_address: tokenAddress }; + } catch (error) { + console.error('Error fetching EVM token info:', error, tokenAddress); + return null; + } + } +} diff --git a/watcher/src/watchers/FTEVMWatcher.ts b/watcher/src/watchers/FTEVMWatcher.ts index 718b38dc..7989db69 100644 --- a/watcher/src/watchers/FTEVMWatcher.ts +++ b/watcher/src/watchers/FTEVMWatcher.ts @@ -42,27 +42,25 @@ export class FTEVMWatcher extends Watcher { this.provider = new ethers.providers.JsonRpcProvider(RPCS_BY_CHAIN[network][chain]); this.rpc = RPCS_BY_CHAIN[this.network][this.chain]!; this.tokenRouterParser = new TokenRouterParser(this.network, chain, this.provider); + + // Initialize database connection before creating swap layer parser + if (!isTest) { + this.pg = knex({ + client: 'pg', + connection: { + user: assertEnvironmentVariable('PG_FT_USER'), + password: assertEnvironmentVariable('PG_FT_PASSWORD'), + database: assertEnvironmentVariable('PG_FT_DATABASE'), + host: assertEnvironmentVariable('PG_FT_HOST'), + port: Number(assertEnvironmentVariable('PG_FT_PORT')), + }, + }); + } + this.swapLayerParser = this.swapLayerAddress - ? new SwapLayerParser(this.provider, this.swapLayerAddress) + ? new SwapLayerParser(this.provider, this.swapLayerAddress, this.pg, chain) : null; this.logger.debug('FTWatcher', network, chain, finalizedBlockTag); - // hacky way to not connect to the db in tests - // this is to allow ci to run without a db - if (isTest) { - // Components needed for testing is complete - return; - } - - this.pg = knex({ - client: 'pg', - connection: { - user: assertEnvironmentVariable('PG_FT_USER'), - password: assertEnvironmentVariable('PG_FT_PASSWORD'), - database: assertEnvironmentVariable('PG_FT_DATABASE'), - host: assertEnvironmentVariable('PG_FT_HOST'), - port: Number(assertEnvironmentVariable('PG_FT_PORT')), - }, - }); } async getBlock(blockNumberOrTag: number | BlockTag): Promise {