Skip to content

Commit

Permalink
watcher: make near archive watcher work
Browse files Browse the repository at this point in the history
  • Loading branch information
panoel committed Nov 26, 2024
1 parent beb6118 commit 393b218
Show file tree
Hide file tree
Showing 25 changed files with 260 additions and 86 deletions.
2 changes: 1 addition & 1 deletion watcher/scripts/backfillArbitrum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { Chain, contracts } from '@wormhole-foundation/sdk-base';
const watcher = new ArbitrumWatcher('Mainnet');
for (const blockNumber of blockNumbers) {
log.text = `Fetching block ${blockNumber}`;
const vaasByBlock = await watcher.getMessagesForBlocks(blockNumber, blockNumber);
const { vaasByBlock } = await watcher.getMessagesForBlocks(blockNumber, blockNumber);
await db.storeVaasByBlock(chain, vaasByBlock);
}
log.succeed('Uploaded messages to db successfully');
Expand Down
4 changes: 3 additions & 1 deletion watcher/scripts/locateMessageGaps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ import { ChainId, Network, toChain, toChainId } from '@wormhole-foundation/sdk-b
while (fromBlock <= rangeEnd && !found) {
const toBlock = Math.min(fromBlock + watcher.maximumBatchSize - 1, rangeEnd);
const messages = await watcher.getMessagesForBlocks(fromBlock, toBlock);
for (const message of Object.entries(messages).filter(([key, value]) => value.length > 0)) {
for (const message of Object.entries(messages.vaasByBlock).filter(
([key, value]) => value.length > 0
)) {
const locatedMessages = message[1].filter((msgKey) => {
const [_transaction, vaaKey] = msgKey.split(':');
const [_chain, msgEmitter, msgSeq] = vaaKey.split('/');
Expand Down
2 changes: 1 addition & 1 deletion watcher/src/consts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Mode } from '@wormhole-foundation/wormhole-monitor-common';
import { AxiosRequestConfig } from 'axios';

export const TIMEOUT = 0.5 * 1000;
export const HB_INTERVAL = 5 * 60 * 1000; // 5 Minutes
export const HB_INTERVAL = 15 * 60 * 1000; // 15 Minutes
export type WorkerData = {
network: Network;
chain: Chain;
Expand Down
7 changes: 5 additions & 2 deletions watcher/src/watchers/AlgorandWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ export class AlgorandWatcher extends Watcher {
return messages;
}

async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromBlock: number,
toBlock: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
const txIds = await this.getApplicationLogTransactionIds(fromBlock, toBlock);
const transactions = [];
for (const txId of txIds) {
Expand All @@ -124,6 +127,6 @@ export class AlgorandWatcher extends Watcher {
if (!vaasByBlock[toBlockKey]) {
vaasByBlock[toBlockKey] = [];
}
return vaasByBlock;
return { vaasByBlock };
}
}
7 changes: 5 additions & 2 deletions watcher/src/watchers/AptosWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ export class AptosWatcher extends Watcher {
);
}

async getMessagesForBlocks(fromSequence: number, toSequence: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromSequence: number,
toSequence: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
const limit = toSequence - fromSequence + 1;
const events: AptosEvent[] = (await this.client.getEventsByEventHandle(
this.coreBridgeAddress,
Expand All @@ -63,7 +66,7 @@ export class AptosWatcher extends Watcher {
vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] ?? []), vaaKey];
})
);
return vaasByBlock;
return { vaasByBlock };
}

isValidBlockKey(key: string) {
Expand Down
7 changes: 5 additions & 2 deletions watcher/src/watchers/CosmwasmWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ export class CosmwasmWatcher extends Watcher {
throw new Error(`Unable to parse result of ${this.latestBlockTag} on ${this.rpc}`);
}

async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromBlock: number,
toBlock: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
const address = contracts.coreBridge.get(this.network, this.chain);
if (!address) {
throw new Error(`Core contract not defined for ${this.chain}`);
Expand Down Expand Up @@ -153,7 +156,7 @@ export class CosmwasmWatcher extends Watcher {
}
}
}
return vaasByBlock;
return { vaasByBlock };
}
}

Expand Down
12 changes: 10 additions & 2 deletions watcher/src/watchers/EVMWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export class EVMWatcher extends Watcher {
this.lastTimestamp = 0;
this.latestFinalizedBlockNumber = 0;
this.finalizedBlockTag = finalizedBlockTag;
// Special cases for batch size
if (chain === 'Acala' || chain === 'Karura' || chain === 'Berachain') {
this.maximumBatchSize = 50;
} else if (
Expand All @@ -55,6 +56,10 @@ export class EVMWatcher extends Watcher {
) {
this.maximumBatchSize = 10;
}
// Special cases for watch loop delay
if (chain === 'Berachain') {
this.watchLoopDelay = 1000;
}
}

async getBlock(blockNumberOrTag: number | BlockTag): Promise<Block> {
Expand Down Expand Up @@ -221,7 +226,10 @@ export class EVMWatcher extends Watcher {
return block.number;
}

async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromBlock: number,
toBlock: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
const address = contracts.coreBridge.get(this.network, this.chain);
if (!address) {
throw new Error(`Core contract not defined for ${this.chain} on ${this.network}!`);
Expand Down Expand Up @@ -252,6 +260,6 @@ export class EVMWatcher extends Watcher {
const blockKey = makeBlockKey(blockNumber.toString(), timestampsByBlock[blockNumber]);
vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
}
return vaasByBlock;
return { vaasByBlock };
}
}
7 changes: 5 additions & 2 deletions watcher/src/watchers/InjectiveExplorerWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ export class InjectiveExplorerWatcher extends Watcher {
// should be core, but the explorer doesn't support it yet
// use "to": as the pagination key
// compare block height ("block_number":) with what is passed in.
async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromBlock: number,
toBlock: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
const coreAddress = contracts.coreBridge.get(this.network, this.chain);
const address = contracts.tokenBridge.get(this.network, this.chain);
if (!coreAddress || !address) {
Expand Down Expand Up @@ -169,7 +172,7 @@ export class InjectiveExplorerWatcher extends Watcher {
);
vaasByBlock[blockKey] = [];
}
return vaasByBlock;
return { vaasByBlock };
}
}

Expand Down
120 changes: 107 additions & 13 deletions watcher/src/watchers/NearArchiveWatcher.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
import { decode } from 'bs58';
import { Provider } from 'near-api-js/lib/providers';
import { BlockResult } from 'near-api-js/lib/providers/provider';
import { BlockResult, ExecutionStatus } from 'near-api-js/lib/providers/provider';
import { z } from 'zod';
import { VaasByBlock } from '../databases/types';
import { makeBlockKey } from '../databases/utils';
import { makeBlockKey, makeVaaKey } from '../databases/utils';
import {
fetchBlockByBlockId,
getMessagesFromBlockResults,
getNearProvider,
getTimestampByBlock,
isWormholePublishEventLog,
} from '../utils/near';
import { Watcher } from './Watcher';
import { assertEnvironmentVariable, sleep } from '@wormhole-foundation/wormhole-monitor-common';
import { Network, contracts } from '@wormhole-foundation/sdk-base';
import axios from 'axios';
import { AXIOS_CONFIG_JSON } from '../consts';
import { AXIOS_CONFIG_JSON, HB_INTERVAL } from '../consts';
import { EventLog } from 'src/types/near';

export class NearArchiveWatcher extends Watcher {
provider: Provider | null = null;

constructor(network: Network) {
super(network, 'Near');
this.maximumBatchSize = 1000;
this.maximumBatchSize = 1_000_000;
this.watchLoopDelay = 60 * 60 * 1000; // 1 hour
}

async getFinalizedBlockNumber(): Promise<number> {
Expand All @@ -37,7 +39,11 @@ export class NearArchiveWatcher extends Watcher {
}
}

async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromBlock: number,
toBlock: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
const quittingTimestamp = Date.now() + HB_INTERVAL * 0.75;
const origFromBlock = fromBlock;
const origToBlock = toBlock;
this.logger.info(`fetching info for blocks ${origFromBlock} to ${origToBlock}`);
Expand Down Expand Up @@ -114,21 +120,26 @@ export class NearArchiveWatcher extends Watcher {
}

this.logger.info(`Fetched ${blocks.length} blocks`);
const vaasByBlock: VaasByBlock = await getMessagesFromBlockResults(
const response: ConstrainedResponse = await this.getMessagesFromBlockResultsConstrained(
this.network,
provider,
blocks,
true
quittingTimestamp
);
// This is the case where there are no transactions in the time window.
if (response.lastBlockHeight === 0) {
response.lastBlockHeight = toBlock;
}
const lastBlockInfo = await fetchBlockByBlockId(provider, response.lastBlockHeight);
// Make a block for the to_block, if it isn't already there
const blockKey = makeBlockKey(
toBlockInfo.header.height.toString(),
new Date(toBlockInfo.header.timestamp / 1_000_000).toISOString()
response.lastBlockHeight.toString(),
new Date(lastBlockInfo.header.timestamp / 1_000_000).toISOString()
);
if (!vaasByBlock[blockKey]) {
vaasByBlock[blockKey] = [];
if (!response.vaasByBlock[blockKey]) {
response.vaasByBlock[blockKey] = [];
}
return vaasByBlock;
return response;
}

async getProvider(): Promise<Provider> {
Expand Down Expand Up @@ -191,7 +202,90 @@ export class NearArchiveWatcher extends Watcher {
}
return txs.reverse();
}

async getMessagesFromBlockResultsConstrained(
network: Network,
provider: Provider,
blocks: BlockResult[],
quittingTime: number
): Promise<ConstrainedResponse> {
const vaasByBlock: VaasByBlock = {};
let lastBlockHeight = 0;
let prevLastBlockHeight = 0;
this.logger.debug(`Fetching messages from ${blocks.length} blocks...`);
try {
for (let i = 0; i < blocks.length; i++) {
this.logger.debug(`Fetching messages from block ${i + 1}/${blocks.length}...`);
const { height, timestamp } = blocks[i].header;
prevLastBlockHeight = lastBlockHeight;
lastBlockHeight = height;
const blockKey = makeBlockKey(
height.toString(),
new Date(timestamp / 1_000_000).toISOString()
);
let localVaasByBlock: VaasByBlock = {};
localVaasByBlock[blockKey] = [];

const chunks = [];
this.logger.debug('attempting to fetch chunks');
for (const chunk of blocks[i].chunks) {
chunks.push(await provider.chunk(chunk.chunk_hash));
}

const transactions = chunks.flatMap(({ transactions }) => transactions);
const coreBridge = contracts.coreBridge.get(network, 'Near');
if (!coreBridge) {
throw new Error('Unable to get contract address for Near');
}
this.logger.debug(`attempting to fetch ${transactions.length} transactions`);
const totTx = transactions.length;
let txCount = 1;
for (const tx of transactions) {
this.logger.debug(`fetching transaction ${txCount}/${totTx}`);
txCount++;
const outcome = await provider.txStatus(tx.hash, coreBridge);
const logs = outcome.receipts_outcome
.filter(
({ outcome }) =>
(outcome as any).executor_id === coreBridge &&
(outcome.status as ExecutionStatus).SuccessValue
)
.flatMap(({ outcome }) => outcome.logs)
.filter((log) => log.startsWith('EVENT_JSON:')) // https://nomicon.io/Standards/EventsFormat
.map((log) => JSON.parse(log.slice(11)) as EventLog)
.filter(isWormholePublishEventLog);
for (const log of logs) {
const vaaKey = makeVaaKey(tx.hash, 'Near', log.emitter, log.seq.toString());
localVaasByBlock[blockKey] = [...localVaasByBlock[blockKey], vaaKey];
}
}
this.logger.debug(
`Fetched ${localVaasByBlock[blockKey].length} messages from block ${blockKey}`
);
vaasByBlock[blockKey] = localVaasByBlock[blockKey];
if (Date.now() >= quittingTime) {
this.logger.warn(`Quitting early due to time constraint.`);
break;
}
}
} catch (e) {
this.logger.error(`Near block getMessagesFromBlockResultsConstrained error: ${e}`);
this.logger.warn(`Quitting early due to error.`);
lastBlockHeight = prevLastBlockHeight;
}

const numMessages = Object.values(vaasByBlock).flat().length;
this.logger.debug(`Fetched ${numMessages} messages from ${blocks.length} blocks`);

return { vaasByBlock, lastBlockHeight };
}
}

type ConstrainedResponse = {
vaasByBlock: VaasByBlock;
lastBlockHeight: number;
};

type GetTransactionsByAccountIdResponse = {
txns: NearTxn[];
};
Expand Down
9 changes: 7 additions & 2 deletions watcher/src/watchers/NearWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ export class NearWatcher extends Watcher {
return block.header.height;
}

async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromBlock: number,
toBlock: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
// assume toBlock was retrieved from getFinalizedBlockNumber and is finalized
this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
const provider = await this.getProvider();
Expand All @@ -48,7 +51,9 @@ export class NearWatcher extends Watcher {
}
}

return getMessagesFromBlockResults(this.network, provider, blocks);
return {
vaasByBlock: await getMessagesFromBlockResults(this.network, provider, blocks),
};
}

async getProvider(): Promise<Provider> {
Expand Down
7 changes: 5 additions & 2 deletions watcher/src/watchers/SeiExplorerWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ export class SeiExplorerWatcher extends CosmwasmWatcher {

// retrieve blocks for core contract
// compare block height with what is passed in
async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromBlock: number,
toBlock: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
const address = contracts.coreBridge.get(this.network, this.chain);
if (!address) {
throw new Error(`Core contract not defined for ${this.chain}`);
Expand Down Expand Up @@ -237,6 +240,6 @@ export class SeiExplorerWatcher extends CosmwasmWatcher {
}
// NOTE: this does not set an empty entry for the latest block since we don't know if the graphql response
// is synced with the block height. Therefore, the latest block will only update when a new transaction appears.
return vaasByBlock;
return { vaasByBlock };
}
}
7 changes: 5 additions & 2 deletions watcher/src/watchers/SolanaWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ export class SolanaWatcher extends Watcher {
return block;
}

async getMessagesForBlocks(fromSlot: number, toSlot: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromSlot: number,
toSlot: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
// in the rare case of maximumBatchSize skipped blocks in a row,
// you might hit this error due to the recursion below
if (fromSlot > toSlot) throw new Error('solana: invalid block range');
Expand Down Expand Up @@ -251,7 +254,7 @@ export class SolanaWatcher extends Watcher {
toSlot.toString(),
new Date(toBlock.blockTime! * 1000).toISOString()
);
return { [lastBlockKey]: [], ...vaasByBlock };
return { vaasByBlock: { [lastBlockKey]: [], ...vaasByBlock } };
}

isValidVaaKey(key: string) {
Expand Down
Loading

0 comments on commit 393b218

Please sign in to comment.