Skip to content

Commit

Permalink
ft_watcher: pr comments
Browse files Browse the repository at this point in the history
Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
  • Loading branch information
bingyuyap committed Jun 13, 2024
1 parent d6f86bd commit 1c827a8
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 56 deletions.
34 changes: 34 additions & 0 deletions watcher/src/fastTransfer/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { PublicKey } from '@solana/web3.js';
import BN from 'bn.js'; // Imported since FT codebase uses BN

// Type definitions are snake_case to match the database schema
Expand Down Expand Up @@ -108,3 +109,36 @@ export type FastTransferId = {
fast_vaa_id?: string;
auction_pubkey?: string;
};

// these can be found in the matchingEngineProgram, but we are making custom snake cased
// types to match the events in the logs parsed. Somehow anchor does not automatically convert
// the logs to the correct types
export type AuctionUpdated = {
config_id: number;
auction: PublicKey;
vaa: PublicKey | null;
source_chain: number;
target_protocol: MessageProtocol;
redeemer_message_len: number;
end_slot: BN;
best_offer_token: PublicKey;
token_balance_before: BN;
amount_in: BN;
total_deposit: BN;
max_offer_price_allowed: BN;
};

export type MessageProtocol = {
Local?: {
program_id: PublicKey;
};
Cctp?: {
domain: number;
};
None?: {};
};

export type AuctionUpdatedEvent = {
name: 'AuctionUpdated';
data: AuctionUpdated;
};
147 changes: 94 additions & 53 deletions watcher/src/watchers/FTSolanaWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
} from '@solana/web3.js';
import { findFromSignatureAndToSignature } from '../utils/solana';
import { makeBlockKey } from '../databases/utils';
import { BorshCoder, EventParser, Instruction } from '@coral-xyz/anchor';
import { BorshCoder, Event, EventParser, Instruction } from '@coral-xyz/anchor';
import { decodeTransferInstruction } from '@solana/spl-token';

import MATCHING_ENGINE_IDL from '../idls/matching_engine.json';
Expand All @@ -37,6 +37,8 @@ import {
FastTransferStatus,
ParsedLogs,
isOfferArgs,
AuctionUpdated,
AuctionUpdatedEvent,
} from '../fastTransfer/types';
import knex, { Knex } from 'knex';
import { assertEnvironmentVariable } from '@wormhole-foundation/wormhole-monitor-common';
Expand Down Expand Up @@ -79,7 +81,7 @@ export class FastTransferSolanaWatcher extends SolanaWatcher {
this.MATCHING_ENGINE_PROGRAM_ID,
new PublicKey(this.USDC_MINT)
);
this.getSignaturesLimit = 100;
this.connection = new Connection(this.rpc);
this.logger = getLogger(`fast_transfer_solana_${network.toLowerCase()}`);
this.eventParser = new EventParser(
new PublicKey(this.MATCHING_ENGINE_PROGRAM_ID),
Expand Down Expand Up @@ -327,20 +329,9 @@ export class FastTransferSolanaWatcher extends SolanaWatcher {
);
const fastVaaMessage = LiquidityLayerMessage.decode(fastVaaAccount.payload());

let message_protocol: FastTransferProtocol = FastTransferProtocol.NONE;
let cctp_domain: number | undefined;
let local_program_id: string | undefined;

if (res.meta?.logMessages) {
const auctionUpdate = await this.fetchEventFromLogs('AuctionUpdated', res.meta.logMessages);
if (auctionUpdate.target_protocol.Cctp) {
message_protocol = FastTransferProtocol.CCTP;
cctp_domain = auctionUpdate.target_protocol.Cctp.domain;
} else if (auctionUpdate.target_protocol.Local) {
message_protocol = FastTransferProtocol.LOCAL;
local_program_id = auctionUpdate.target_protocol.Local.program_id.toBase58();
}
}
const { message_protocol, cctp_domain, local_program_id } = this.checkMessageProtocols(
res.meta?.logMessages || []
);

if (!fastVaaMessage.fastMarketOrder) {
throw new Error(
Expand Down Expand Up @@ -393,6 +384,40 @@ export class FastTransferSolanaWatcher extends SolanaWatcher {
return { auction: fast_transfer, auction_offer };
}

checkMessageProtocols(logs: string[]): {
message_protocol: FastTransferProtocol;
cctp_domain: number | undefined;
local_program_id: string | undefined;
} {
const auctionUpdate = this.getAuctionUpdatedFromLogs(logs);
if (!auctionUpdate) {
return {
message_protocol: FastTransferProtocol.NONE,
cctp_domain: undefined,
local_program_id: undefined,
};
}

let message_protocol: FastTransferProtocol = FastTransferProtocol.NONE;
let cctp_domain: number | undefined;
let local_program_id: string | undefined;

const { target_protocol } = auctionUpdate;
if (target_protocol.Cctp) {
message_protocol = FastTransferProtocol.CCTP;
cctp_domain = target_protocol.Cctp.domain;
} else if (target_protocol.Local) {
message_protocol = FastTransferProtocol.LOCAL;
local_program_id = target_protocol.Local.program_id.toBase58();
}

return {
message_protocol,
cctp_domain,
local_program_id,
};
}

/**
* This function parses the `improve_offer` instruction
* We can safely assume that the offer price here is better than the previous offer price since
Expand Down Expand Up @@ -648,7 +673,7 @@ export class FastTransferSolanaWatcher extends SolanaWatcher {
ix: MessageCompiledInstruction
) {
// Slow relay is not done yet, will implement after
throw new Error('[parseSettleAuctionNoneCctp] not implemented');
throw new Error('[parseSettleAuctionNoneLocal] not implemented');
}

async parseSettleAuctionNoneCctp(
Expand All @@ -661,59 +686,76 @@ export class FastTransferSolanaWatcher extends SolanaWatcher {

/*
* `fetchAuction` fetches the auction from the chain first
* if the auction is closed, `matchingEngineProgram.fetchAuction` will throw an error
* we can catch this error and fetch the auction from the auction history
* it's more suitable to use try-catch here because it accounts for when the auction account
* is used to store other data.
* if `auctionAccount` is not null, decode it using borsh program and return
* otherwise, fetch the auction from the auction history
* if no auction is found even from history, return null
*/
async fetchAuction(pubkey: string): Promise<{
vaaHash: string;
info: AuctionInfo;
info: AuctionInfo | null;
} | null> {
try {
const auction = await this.matchingEngineProgram.fetchAuction({
address: new PublicKey(pubkey),
});

if (!auction.info) {
throw new Error('Auction info not found');
}
const auctionAccount = await this.connection?.getAccountInfo(new PublicKey(pubkey));

if (auctionAccount) {
const auctionInfo = this.matchingEngineBorshCoder.accounts.decode(
'Auction',
auctionAccount.data
);
// We need to do this manually because the account info given is in snake_case
return {
vaaHash: Buffer.from(auction.vaaHash).toString('hex'),
info: auction.info,
vaaHash: Buffer.from(auctionInfo.vaa_hash).toString('hex'),
info: {
configId: auctionInfo.info.config_id,
custodyTokenBump: auctionInfo.info.custody_token_bump,
vaaSequence: auctionInfo.info.vaa_sequence,
sourceChain: auctionInfo.info.source_chain,
bestOfferToken: auctionInfo.info.best_offer_token,
initialOfferToken: auctionInfo.info.initial_offer_token,
startSlot: auctionInfo.info.start_slot,
amountIn: auctionInfo.info.amount_in,
securityDeposit: auctionInfo.info.security_deposit,
offerPrice: auctionInfo.info.offer_price,
redeemerMessageLen: auctionInfo.info.redeemer_message_len,
destinationAssetInfo: auctionInfo.info.destination_asset_info,
},
};
} catch (e) {
try {
const auction = await this.fetchAuctionFromHistory(pubkey);
}

if (!auction) {
throw new Error('Auction not found');
}
const auction = await this.fetchAuctionFromHistory(pubkey);

return {
vaaHash: Buffer.from(auction.vaaHash).toString('hex'),
info: auction.info,
};
} catch (e) {
this.logger.error('Failed to fetch auction from history:', e);
}
if (!auction) {
this.logger.error(`[fetchAuction] no auction found for ${pubkey}`);
return null;
}

return null;
return {
vaaHash: Buffer.from(auction.vaaHash).toString('hex'),
info: auction.info,
};
}

async fetchEventFromLogs(name: string, logs: string[]) {
/*
* `getAuctionUpdatedFromLogs` fetches the auction updated event from the logs
* it's used to get the auction info from the auction updated event
* We only need `AuctionUpdated` event for now. If we need more events in the future, we can add them here
*/
getAuctionUpdatedFromLogs(logs: string[]): AuctionUpdated | null {
const parsedLogs = this.eventParser.parseLogs(logs);
for (let event of parsedLogs) {
if (event.name === name) {
if (this.isAuctionUpdatedEvent(event)) {
return event.data;
}
}

return null;
}

/*
* `isAuctionUpdatedEvent` is a type guard that checks if the event is an `AuctionUpdated` event
*/
isAuctionUpdatedEvent(event: Event): event is AuctionUpdatedEvent {
return event.name === 'AuctionUpdated' && event.data !== null && typeof event.data === 'object';
}

/*
* `fetchAuctionFromHistory` fetches the auction from the auction history
* if there is a mapping in the db, we fetch the auction from the auction history using the mapping
Expand Down Expand Up @@ -745,7 +787,8 @@ export class FastTransferSolanaWatcher extends SolanaWatcher {
let latestAuctionHistoryIndex = await this.getDbLatestAuctionHistoryIndex();
const auctionHistories = [];

while (true) {
let foundAllAuctionHistory = false;
while (!foundAllAuctionHistory) {
try {
const auctionHistory = await this.matchingEngineProgram.fetchAuctionHistory(
latestAuctionHistoryIndex
Expand All @@ -754,8 +797,7 @@ export class FastTransferSolanaWatcher extends SolanaWatcher {
latestAuctionHistoryIndex++;
} catch (error) {
// if no more auction history records to fetch or an error occurred, break the loop
this.logger.error('No more auction history records to fetch or an error occurred:', error);
break;
foundAllAuctionHistory = true;
}
}

Expand Down Expand Up @@ -790,7 +832,6 @@ export class FastTransferSolanaWatcher extends SolanaWatcher {

try {
const result = await this.pg('auction_history_mapping').max('index as maxIndex').first();
console.log(result);
return result && result.maxIndex !== null ? BigInt(result.maxIndex) : 0n;
} catch (error) {
this.logger.error('Failed to fetch the largest index from auction_history_mapping:', error);
Expand Down
8 changes: 5 additions & 3 deletions watcher/src/watchers/__tests__/FTSolanaWatcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ jest.setTimeout(60_000);

// This test is working, but testing it is not very useful since the return value is just the lastBlockKey.
// It is just an entrypoint to test the whole thing with a local postgres database.
// Skipping because it requires db
test.skip('getMessagesByBlock', async () => {
const watcher = new FastTransferSolanaWatcher('Testnet');
await watcher.getMessagesByBlock(301864980, 302864980);
Expand Down Expand Up @@ -197,7 +198,7 @@ test.skip('should fetch closed Auction', async () => {
const watcher = new FastTransferSolanaWatcher('Testnet');
const auction = await watcher.fetchAuction('FS4EAzWA2WuMKyGBy2C7EBvHL9W63NDX9JR4CPveAiDK');

if (!auction) {
if (!auction || !auction.info) {
throw new Error('Auction not found');
}

Expand Down Expand Up @@ -252,7 +253,7 @@ test('should fetch auction update from logs', async () => {
if (!tx.meta?.logMessages) {
throw new Error('No log messages');
}
const auctionUpdate = await watcher.fetchEventFromLogs('AuctionUpdated', tx.meta.logMessages);
const auctionUpdate = await watcher.getAuctionUpdatedFromLogs(tx.meta.logMessages);

if (!auctionUpdate) {
throw new Error('Auction update not found');
Expand All @@ -261,7 +262,7 @@ test('should fetch auction update from logs', async () => {
expect({
config_id: 2,
auction: auctionUpdate.auction.toString(),
vaa: auctionUpdate.vaa.toString(),
vaa: auctionUpdate.vaa?.toString(),
source_chain: auctionUpdate.source_chain,
target_protocol: {
Local: {
Expand Down Expand Up @@ -295,6 +296,7 @@ test('should fetch auction update from logs', async () => {
});
});

// Skipped because it requires database
test.skip('should index all auction history', async () => {
const watcher = new FastTransferSolanaWatcher('Testnet');
await watcher.indexAuctionHistory('77W4Votv6bK1tyq4xcvyo2V9gXYknXBwcZ53XErgcEs9');
Expand Down

0 comments on commit 1c827a8

Please sign in to comment.