diff --git a/packages/indexer/README.md b/packages/indexer/README.md
index ce71a89..dba901d 100644
--- a/packages/indexer/README.md
+++ b/packages/indexer/README.md
@@ -28,19 +28,22 @@
 DATABASE_NAME=mydatabase
 REDIS_HOST=localhost
 REDIS_PORT=6380
+
 RPC_PROVIDER_URLS_1=https://mainnet.infura.io/v3/xxx
 RPC_PROVIDER_URLS_10=https://optimism-mainnet.infura.io/v3/xxx
 RPC_PROVIDER_URLS_137=https://polygon-mainnet.infura.io/v3/xxx
 
 HUBPOOL_CHAIN=1
 SPOKEPOOL_CHAINS_ENABLED=1,2
+PROVIDER_CACHE_TTL=3600
+
+# optional
 PROVIDER_CACHE_NAMESPACE=indexer_provider_cache
-MAX_CONCURRENCY=1
-PCT_RPC_CALLS_LOGGED=100
+NODE_MAX_CONCURRENCY=1
+NODE_PCT_RPC_CALLS_LOGGED=100
 STANDARD_TTL_BLOCK_DISTANCE=1
 NO_TTL_BLOCK_DISTANCE=1000
-PROVIDER_CACHE_TTL=100000
-NODE_QUORUM_THRESHOLD=1
-RETRIES=2
-DELAY=1000
+NODE_QUORUM=1
+NODE_RETRIES=2
+NODE_RETRY_DELAY=1000
 ```
diff --git a/packages/indexer/src/main.ts b/packages/indexer/src/main.ts
index 8367400..4ba95e4 100644
--- a/packages/indexer/src/main.ts
+++ b/packages/indexer/src/main.ts
@@ -4,6 +4,8 @@ import Redis from "ioredis";
 import * as across from "@across-protocol/sdk";
 import { connectToDatabase } from "./database/database.provider";
 import * as parseEnv from "./parseEnv";
+import { RetryProvidersFactory } from "./web3/RetryProvidersFactory";
+import { RedisCache } from "./redis/redisCache";
 
 async function initializeRedis(
   config: parseEnv.RedisConfig,
@@ -28,8 +30,9 @@
 export async function Main(config: parseEnv.Config, logger: winston.Logger) {
   const { redisConfig, postgresConfig, hubConfig, spokeConfigs } = config;
-
   const redis = await initializeRedis(redisConfig, logger);
+  const redisCache = new RedisCache(redis);
+  const retryProvidersFactory = new RetryProvidersFactory(redisCache, logger);
 
   const postgres = await connectToDatabase(postgresConfig, logger);
   const bundleProcessor = new services.bundles.Processor({
     logger,
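The `Main` hunk constructs the factory but does not show where `initializeProviders()` is invoked or how a provider reaches the downstream services. A minimal sketch of the intended flow, assuming the factory is initialized right after construction; the `buildProviders` helper and the chain id literal are illustrative, not part of this change:

```ts
import Redis from "ioredis";
import winston from "winston";

import { RedisCache } from "./redis/redisCache";
import { RetryProvidersFactory } from "./web3/RetryProvidersFactory";

// Illustrative helper: wire an already-connected ioredis client into the factory.
async function buildProviders(redis: Redis, logger: winston.Logger) {
  const redisCache = new RedisCache(redis);
  const retryProvidersFactory = new RetryProvidersFactory(redisCache, logger);

  // Reads RPC_PROVIDER_URLS_* and the NODE_* overrides from process.env.
  // Must run before any getProviderForChainId() call, otherwise the internal
  // map is empty and the factory throws.
  retryProvidersFactory.initializeProviders();

  // e.g. the hub pool chain from the README example (HUBPOOL_CHAIN=1).
  const hubPoolProvider = retryProvidersFactory.getProviderForChainId(1);
  return { retryProvidersFactory, hubPoolProvider };
}
```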
diff --git a/packages/indexer/src/parseEnv.ts b/packages/indexer/src/parseEnv.ts
index c385156..bd91d3e 100644
--- a/packages/indexer/src/parseEnv.ts
+++ b/packages/indexer/src/parseEnv.ts
@@ -1,8 +1,9 @@
 import assert from "assert";
-import { DatabaseConfig } from "@repo/indexer-database";
 import * as s from "superstruct";
-import { RetryProviderConfig } from "./utils/contractUtils";
+import { DatabaseConfig } from "@repo/indexer-database";
 import * as services from "./services";
+import { DEFAULT_NO_TTL_DISTANCE } from "./web3/constants";
+import { RetryProviderConfig } from "./utils";
 
 export type Config = {
   redisConfig: RedisConfig;
@@ -66,6 +67,7 @@
     dbName: env.DATABASE_NAME,
   };
 }
+
 function parseProviderConfigs(env: Env): ProviderConfig[] {
   const results: ProviderConfig[] = [];
   for (const [key, value] of Object.entries(process.env)) {
@@ -115,6 +117,68 @@ function parseRetryProviderConfig(
   };
 }
 
+export function parseProvidersUrls() {
+  const results: Map<number, string[]> = new Map();
+  for (const [key, value] of Object.entries(process.env)) {
+    const match = key.match(/^RPC_PROVIDER_URLS_(\d+)$/);
+    if (match) {
+      const chainId = match[1] ? parseNumber(match[1]) : undefined;
+      if (chainId && value) {
+        const providerUrls = parseArray(value);
+        results.set(chainId, providerUrls);
+      }
+    }
+  }
+  return results;
+}
+
+export function parseRetryProviderEnvs(chainId: number) {
+  const providerCacheNamespace =
+    process.env.PROVIDER_CACHE_NAMESPACE || "indexer_provider_cache";
+  const maxConcurrency = Number(
+    process.env[`NODE_MAX_CONCURRENCY_${chainId}`] ||
+      process.env.NODE_MAX_CONCURRENCY ||
+      "25",
+  );
+  const pctRpcCallsLogged = Number(
+    process.env[`NODE_PCT_RPC_CALLS_LOGGED_${chainId}`] ||
+      process.env.NODE_PCT_RPC_CALLS_LOGGED ||
+      "0",
+  );
+  const providerCacheTtl = process.env.PROVIDER_CACHE_TTL
+    ? Number(process.env.PROVIDER_CACHE_TTL)
+    : undefined;
+  const nodeQuorumThreshold = Number(
+    process.env[`NODE_QUORUM_${chainId}`] || process.env.NODE_QUORUM || "1",
+  );
+  const retries = Number(
+    process.env[`NODE_RETRIES_${chainId}`] || process.env.NODE_RETRIES || "0",
+  );
+  const retryDelay = Number(
+    process.env[`NODE_RETRY_DELAY_${chainId}`] ||
+      process.env.NODE_RETRY_DELAY ||
+      "1",
+  );
+  // Note: if there is no env var override _and_ no default, this will remain undefined and
+  // effectively disable indefinite caching of old blocks/keys.
+  const noTtlBlockDistance: number | undefined = process.env[
+    `NO_TTL_BLOCK_DISTANCE_${chainId}`
+  ]
+    ? Number(process.env[`NO_TTL_BLOCK_DISTANCE_${chainId}`])
+    : DEFAULT_NO_TTL_DISTANCE[chainId];
+
+  return {
+    providerCacheNamespace,
+    maxConcurrency,
+    pctRpcCallsLogged,
+    providerCacheTtl,
+    nodeQuorumThreshold,
+    retries,
+    retryDelay,
+    noTtlBlockDistance,
+  };
+}
+
 export function envToConfig(env: Env): Config {
   assert(env.HUBPOOL_CHAIN, "Requires HUBPOOL_CHAIN");
   const redisConfig = parseRedisConfig(env);
diff --git a/packages/indexer/src/utils/contractUtils.ts b/packages/indexer/src/utils/contractUtils.ts
index c3e7a51..ccc3b94 100644
--- a/packages/indexer/src/utils/contractUtils.ts
+++ b/packages/indexer/src/utils/contractUtils.ts
@@ -150,6 +150,7 @@ export type RetryProviderDeps = {
   cache: across.interfaces.CachingMechanismInterface;
   logger: winston.Logger;
 };
+
 export function getRetryProvider(
   params: RetryProviderConfig & RetryProviderDeps,
 ) {
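Every per-chain setting in `parseRetryProviderEnvs` resolves the same way: a chain-suffixed variable (e.g. `NODE_RETRIES_10`) wins over the global one (`NODE_RETRIES`), which in turn falls back to a hard-coded default. A small illustration of the expected resolution for chain 10, assuming `parseArray` splits a comma-separated string (that helper lives elsewhere in `parseEnv.ts` and is not shown in this diff); the URLs and values are examples only:

```ts
import { parseProvidersUrls, parseRetryProviderEnvs } from "./parseEnv";

// Example environment (illustrative values).
process.env.RPC_PROVIDER_URLS_10 =
  "https://optimism-mainnet.infura.io/v3/xxx,https://mainnet.optimism.io";
process.env.NODE_RETRIES = "2"; // global fallback for every chain
process.env.NODE_RETRIES_10 = "5"; // chain-specific override wins for chain 10

const urls = parseProvidersUrls();
// Map { 10 => ["https://optimism-mainnet.infura.io/v3/xxx", "https://mainnet.optimism.io"] }

const envs = parseRetryProviderEnvs(10);
// envs.retries === 5             (chain override beats NODE_RETRIES)
// envs.nodeQuorumThreshold === 1 (no env var set, so the "1" default applies)
// envs.noTtlBlockDistance === 86400 (DEFAULT_NO_TTL_DISTANCE for Optimism)
```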
diff --git a/packages/indexer/src/web3/RetryProvidersFactory.ts b/packages/indexer/src/web3/RetryProvidersFactory.ts
new file mode 100644
index 0000000..f890fa2
--- /dev/null
+++ b/packages/indexer/src/web3/RetryProvidersFactory.ts
@@ -0,0 +1,53 @@
+import { Logger } from "winston";
+import { providers } from "@across-protocol/sdk";
+
+import { parseRetryProviderEnvs, parseProvidersUrls } from "../parseEnv";
+import { RedisCache } from "../redis/redisCache";
+import { getChainCacheFollowDistance } from "./constants";
+
+export class RetryProvidersFactory {
+  private retryProviders: Map<number, providers.RetryProvider> = new Map();
+
+  constructor(
+    private redisCache: RedisCache,
+    private logger: Logger,
+  ) {}
+
+  public initializeProviders() {
+    const providersUrls = parseProvidersUrls();
+
+    for (const [chainId, providerUrls] of providersUrls.entries()) {
+      const retryProviderEnvs = parseRetryProviderEnvs(chainId);
+      if (!providerUrls || providerUrls.length === 0) {
+        throw new Error(`No provider urls found for chainId: ${chainId}`);
+      }
+      const standardTtlBlockDistance = getChainCacheFollowDistance(chainId);
+      const provider = new providers.RetryProvider(
+        providerUrls.map((url) => [url, chainId]),
+        chainId,
+        retryProviderEnvs.nodeQuorumThreshold,
+        retryProviderEnvs.retries,
+        retryProviderEnvs.retryDelay,
+        retryProviderEnvs.maxConcurrency,
+        retryProviderEnvs.providerCacheNamespace,
+        retryProviderEnvs.pctRpcCallsLogged,
+        this.redisCache,
+        standardTtlBlockDistance,
+        retryProviderEnvs.noTtlBlockDistance,
+        retryProviderEnvs.providerCacheTtl,
+        this.logger,
+      );
+      this.retryProviders.set(chainId, provider);
+    }
+  }
+
+  public getProviderForChainId(chainId: number) {
+    const retryProvider = this.retryProviders.get(chainId);
+
+    if (!retryProvider) {
+      throw new Error(`No retry provider found for chainId: ${chainId}`);
+    }
+
+    return retryProvider;
+  }
+}
diff --git a/packages/indexer/src/web3/constants.ts b/packages/indexer/src/web3/constants.ts
new file mode 100644
index 0000000..42c2d19
--- /dev/null
+++ b/packages/indexer/src/web3/constants.ts
@@ -0,0 +1,61 @@
+import { CHAIN_IDs } from "@across-protocol/constants";
+
+// This is the block distance at which the bot, by default, stores in redis with no TTL.
+// These are all intended to be roughly 2 days of blocks for each chain.
+// blocks = 172800 / avg_block_time
+export const DEFAULT_NO_TTL_DISTANCE: { [chainId: number]: number } = {
+  [CHAIN_IDs.ARBITRUM]: 691200,
+  [CHAIN_IDs.BASE]: 86400,
+  [CHAIN_IDs.BLAST]: 86400,
+  [CHAIN_IDs.BOBA]: 86400,
+  [CHAIN_IDs.LINEA]: 57600,
+  [CHAIN_IDs.LISK]: 86400,
+  [CHAIN_IDs.MAINNET]: 14400,
+  [CHAIN_IDs.MODE]: 86400,
+  [CHAIN_IDs.OPTIMISM]: 86400,
+  [CHAIN_IDs.POLYGON]: 86400,
+  [CHAIN_IDs.REDSTONE]: 86400,
+  [CHAIN_IDs.SCROLL]: 57600,
+  [CHAIN_IDs.ZK_SYNC]: 172800,
+  [CHAIN_IDs.ZORA]: 86400,
+};
+
+// This is the max anticipated distance on each chain before RPC data is likely to be consistent amongst providers.
+// This distance should consider re-orgs, but also the time needed for various RPC providers to agree on chain state.
+// Provider caching will not be allowed for queries whose responses depend on blocks closer than this many blocks.
+// This is intended to be conservative.
+export const CHAIN_CACHE_FOLLOW_DISTANCE: { [chainId: number]: number } = {
+  [CHAIN_IDs.ARBITRUM]: 32,
+  [CHAIN_IDs.BASE]: 120,
+  [CHAIN_IDs.BLAST]: 120,
+  [CHAIN_IDs.BOBA]: 0,
+  [CHAIN_IDs.LISK]: 120,
+  [CHAIN_IDs.LINEA]: 100, // Linea has a soft-finality of 1 block. This value is padded - but at 3s/block the padding is 5 minutes
+  [CHAIN_IDs.MAINNET]: 128,
+  [CHAIN_IDs.MODE]: 120,
+  [CHAIN_IDs.OPTIMISM]: 120,
+  [CHAIN_IDs.POLYGON]: 256,
+  [CHAIN_IDs.REDSTONE]: 120,
+  [CHAIN_IDs.SCROLL]: 100,
+  [CHAIN_IDs.ZK_SYNC]: 512,
+  [CHAIN_IDs.ZORA]: 120,
+  // Testnets:
+  [CHAIN_IDs.ARBITRUM_SEPOLIA]: 0,
+  [CHAIN_IDs.BASE_SEPOLIA]: 0,
+  [CHAIN_IDs.BLAST_SEPOLIA]: 0,
+  [CHAIN_IDs.LISK_SEPOLIA]: 0,
+  [CHAIN_IDs.MODE_SEPOLIA]: 0,
+  [CHAIN_IDs.OPTIMISM_SEPOLIA]: 0,
+  [CHAIN_IDs.POLYGON_AMOY]: 0,
+  [CHAIN_IDs.SEPOLIA]: 0,
+};
+
+export const getChainCacheFollowDistance = (chainId: number) => {
+  const chainCacheFollowDistance = CHAIN_CACHE_FOLLOW_DISTANCE[chainId];
+
+  if (chainCacheFollowDistance === undefined) {
+    throw new Error(`Invalid chain cache distance for chain id ${chainId}`);
+  }
+
+  return chainCacheFollowDistance;
+};
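The `blocks = 172800 / avg_block_time` rule in the comment can be sanity-checked against the `DEFAULT_NO_TTL_DISTANCE` table. The block times below are rough assumptions used only for this check, not values taken from the diff:

```ts
// Two days of blocks at an approximate average block time per chain.
const SECONDS_PER_TWO_DAYS = 172_800;

const approxBlockTimeSeconds = {
  MAINNET: 12, //    172800 / 12   = 14400
  OPTIMISM: 2, //    172800 / 2    = 86400
  SCROLL: 3, //      172800 / 3    = 57600
  ARBITRUM: 0.25, // 172800 / 0.25 = 691200
} as const;

for (const [chain, blockTime] of Object.entries(approxBlockTimeSeconds)) {
  console.log(`${chain}: ${SECONDS_PER_TWO_DAYS / blockTime} blocks`);
}
```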