From 00e747f562465bd6f5cc9fd8cdcf5bd065733f42 Mon Sep 17 00:00:00 2001 From: David A <4429761+daywiss@users.noreply.github.com> Date: Thu, 26 Sep 2024 11:20:09 -0400 Subject: [PATCH] improve(indexer): refactor configuration parsing (#45) Signed-off-by: david --- apps/node/src/app.ts | 2 +- packages/indexer/README.md | 27 +++ packages/indexer/src/generics/BaseIndexer.ts | 2 +- packages/indexer/src/index.ts | 1 + packages/indexer/src/main.ts | 198 ++---------------- packages/indexer/src/parseEnv.ts | 178 ++++++++++++++++ packages/indexer/src/services/bundles.ts | 2 +- .../indexer/src/services/hubPoolIndexer.ts | 3 +- .../indexer/src/services/spokePoolIndexer.ts | 16 +- packages/indexer/src/utils/contractUtils.ts | 6 +- 10 files changed, 230 insertions(+), 205 deletions(-) create mode 100644 packages/indexer/src/parseEnv.ts diff --git a/apps/node/src/app.ts b/apps/node/src/app.ts index 09ea294..175c842 100644 --- a/apps/node/src/app.ts +++ b/apps/node/src/app.ts @@ -37,7 +37,7 @@ async function run() { void (await Template.Main(process.env)); return "Example template app running"; case "indexer": - void (await Indexer.Main(process.env, logger)); + void (await Indexer.Main(Indexer.envToConfig(process.env), logger)); break; case "persistence-example": void (await PersistenceExample.Main(process.env)); diff --git a/packages/indexer/README.md b/packages/indexer/README.md index 767bb18..ce71a89 100644 --- a/packages/indexer/README.md +++ b/packages/indexer/README.md @@ -17,3 +17,30 @@ Without docker: ## Test In this package run `pnpm test` + +## ENV +``` +DATABASE_HOST=localhost +DATABASE_PORT=5432 +DATABASE_USER=user +DATABASE_PASSWORD=password +DATABASE_NAME=mydatabase + +REDIS_HOST=localhost +REDIS_PORT=6380 +RPC_PROVIDER_URLS_1=https://mainnet.infura.io/v3/xxx +RPC_PROVIDER_URLS_10=https://optimism-mainnet.infura.io/v3/xxx +RPC_PROVIDER_URLS_137=https://polygon-mainnet.infura.io/v3/xxx +HUBPOOL_CHAIN=1 +SPOKEPOOL_CHAINS_ENABLED=1,2 + +PROVIDER_CACHE_NAMESPACE=indexer_provider_cache +MAX_CONCURRENCY=1 +PCT_RPC_CALLS_LOGGED=100 +STANDARD_TTL_BLOCK_DISTANCE=1 +NO_TTL_BLOCK_DISTANCE=1000 +PROVIDER_CACHE_TTL=100000 +NODE_QUORUM_THRESHOLD=1 +RETRIES=2 +DELAY=1000 +``` diff --git a/packages/indexer/src/generics/BaseIndexer.ts b/packages/indexer/src/generics/BaseIndexer.ts index 204651a..7f88305 100644 --- a/packages/indexer/src/generics/BaseIndexer.ts +++ b/packages/indexer/src/generics/BaseIndexer.ts @@ -37,7 +37,7 @@ export abstract class BaseIndexer { this.logger.error({ at: "BaseIndexer#start", message: `Failed to initialize ${this.name}`, - error: e, + error: (e as unknown as Error).message, }); return; } diff --git a/packages/indexer/src/index.ts b/packages/indexer/src/index.ts index d66d2cd..025a49c 100644 --- a/packages/indexer/src/index.ts +++ b/packages/indexer/src/index.ts @@ -1 +1,2 @@ export * from "./main"; +export * from "./parseEnv"; diff --git a/packages/indexer/src/main.ts b/packages/indexer/src/main.ts index 05ddda2..8367400 100644 --- a/packages/indexer/src/main.ts +++ b/packages/indexer/src/main.ts @@ -1,19 +1,14 @@ -import assert from "assert"; import * as services from "./services"; import winston from "winston"; import Redis from "ioredis"; -import * as s from "superstruct"; -import * as acrossConstants from "@across-protocol/constants"; import * as across from "@across-protocol/sdk"; import { connectToDatabase } from "./database/database.provider"; -import { providers } from "ethers"; -import { DatabaseConfig } from "@repo/indexer-database"; +import * as parseEnv from "./parseEnv"; -type RedisConfig = { - host: string; - port: number; -}; -async function initializeRedis(config: RedisConfig, logger: winston.Logger) { +async function initializeRedis( + config: parseEnv.RedisConfig, + logger: winston.Logger, +) { const redis = new Redis({ ...config, }); @@ -31,197 +26,30 @@ async function initializeRedis(config: RedisConfig, logger: winston.Logger) { }); } -function getPostgresConfig( - env: Record, -): DatabaseConfig { - assert(env.DATABASE_HOST, "requires DATABASE_HOST"); - assert(env.DATABASE_PORT, "requires DATABASE_PORT"); - assert(env.DATABASE_USER, "requires DATABASE_USER"); - assert(env.DATABASE_PASSWORD, "requires DATABASE_PASSWORD"); - assert(env.DATABASE_NAME, "requires DATABASE_NAME"); - return { - host: env.DATABASE_HOST, - port: env.DATABASE_PORT, - user: env.DATABASE_USER, - password: env.DATABASE_PASSWORD, - dbName: env.DATABASE_NAME, - }; -} - -type RetryProviderConfig = { - providerCacheNamespace: string; - maxConcurrency: number; - pctRpcCallsLogged: number; - standardTtlBlockDistance: number; - noTtlBlockDistance: number; - providerCacheTtl: number; - nodeQuorumThreshold: number; - retries: number; - delay: number; -}; -// superstruct coersion to turn string into an int and validate -const stringToInt = s.coerce(s.number(), s.string(), (value) => - parseInt(value), -); -function getRetryProviderConfig( - env: Record, -): RetryProviderConfig { - assert(env.PROVIDER_CACHE_NAMESPACE, "requires PROVIDER_CACHE_NAMESPACE"); - assert(env.MAX_CONCURRENCY, "requires MAX_CONCURRENCY"); - assert(env.PCT_RPC_CALLS_LOGGED, "requires PCT_RPC_CALLS_LOGGED"); - assert( - env.STANDARD_TTL_BLOCK_DISTANCE, - "requires STANDARD_TTL_BLOCK_DISTANCE", - ); - assert(env.NO_TTL_BLOCK_DISTANCE, "requires NO_TTL_BLOCK_DISTANCE"); - assert(env.PROVIDER_CACHE_TTL, "requires PROVIDER_CACHE_TTL"); - assert(env.NODE_QUORUM_THRESHOLD, "requires NODE_QUORUM_THRESHOLD"); - assert(env.RETRIES, "requires RETRIES"); - assert(env.DELAY, "requires DELAY"); - return { - providerCacheNamespace: env.PROVIDER_CACHE_NAMESPACE, - maxConcurrency: s.create(env.MAX_CONCURRENCY, stringToInt), - pctRpcCallsLogged: s.create(env.PCT_RPC_CALLS_LOGGED, stringToInt), - standardTtlBlockDistance: s.create( - env.STANDARD_TTL_BLOCK_DISTANCE, - stringToInt, - ), - noTtlBlockDistance: s.create(env.NO_TTL_BLOCK_DISTANCE, stringToInt), - providerCacheTtl: s.create(env.PROVIDER_CACHE_TTL, stringToInt), - nodeQuorumThreshold: s.create(env.NODE_QUORUM_THRESHOLD, stringToInt), - retries: s.create(env.RETRIES, stringToInt), - delay: s.create(env.DELAY, stringToInt), - }; -} - -// utility call to create the spoke pool event indexer config -async function getSpokePoolIndexerConfig(params: { - retryProviderConfig: RetryProviderConfig; - spokePoolProviderUrl: string; - hubPoolNetworkInfo: providers.Network; - hubPoolProviderUrl: string; -}) { - const { - retryProviderConfig, - spokePoolProviderUrl, - hubPoolProviderUrl, - hubPoolNetworkInfo, - } = params; - const tempSpokeProvider = new providers.JsonRpcProvider(spokePoolProviderUrl); - const spokePoolNetworkInfo = await tempSpokeProvider.getNetwork(); - return { - retryProviderConfig, - configStoreConfig: { - chainId: hubPoolNetworkInfo.chainId, - providerUrl: hubPoolProviderUrl, - maxBlockLookBack: 10000, - }, - hubConfig: { - chainId: hubPoolNetworkInfo.chainId, - providerUrl: hubPoolProviderUrl, - maxBlockLookBack: 10000, - }, - spokeConfig: { - chainId: spokePoolNetworkInfo.chainId, - providerUrl: spokePoolProviderUrl, - // TODO: Set this per chain - maxBlockLookBack: 10000, - }, - redisKeyPrefix: `spokePoolIndexer:${spokePoolNetworkInfo.chainId}`, - }; -} -// utility call to create the hubpool event indexer config -async function getHubPoolIndexerConfig(params: { - retryProviderConfig: RetryProviderConfig; - hubPoolNetworkInfo: providers.Network; - hubPoolProviderUrl: string; -}) { - const { retryProviderConfig, hubPoolProviderUrl, hubPoolNetworkInfo } = - params; - return { - retryProviderConfig, - hubConfig: { - chainId: hubPoolNetworkInfo.chainId, - providerUrl: hubPoolProviderUrl, - maxBlockLookBack: 10000, - }, - redisKeyPrefix: `hubPoolIndexer:${hubPoolNetworkInfo.chainId}`, - }; -} - -export async function Main( - env: Record, - logger: winston.Logger, -) { - const spokePoolProviderUrls: string[] = Object.values( - acrossConstants.MAINNET_CHAIN_IDs, - ) - .map((chainId) => env[`INDEXER_SPOKEPOOL_PROVIDER_URL_${chainId}`]) - .filter((x): x is string => !!x); - - assert( - spokePoolProviderUrls.length > 0, - "Must provide a url for at least one provider on one chain, for example: INDEXER_SPOKEPOOL_PROVIDER_URL_1", - ); - - assert( - env.INDEXER_HUBPOOL_PROVIDER_URL, - "requires INDEXER_HUBPOOL_PROVIDER_URL", - ); - const hubPoolProviderUrl = env.INDEXER_HUBPOOL_PROVIDER_URL; - assert(env.INDEXER_REDIS_HOST, "requires INDEXER_REDIS_HOST"); - assert(env.INDEXER_REDIS_PORT, "requires INDEXER_REDIS_PORT"); - const redisConfig = { - host: env.INDEXER_REDIS_HOST, - port: Number(env.INDEXER_REDIS_PORT), - }; +export async function Main(config: parseEnv.Config, logger: winston.Logger) { + const { redisConfig, postgresConfig, hubConfig, spokeConfigs } = config; const redis = await initializeRedis(redisConfig, logger); - - const postgresConfig = getPostgresConfig(env); const postgres = await connectToDatabase(postgresConfig, logger); - - const retryProviderConfig = getRetryProviderConfig(env); - const tempHubProvider = new providers.JsonRpcProvider(hubPoolProviderUrl); - const hubPoolNetworkInfo = await tempHubProvider.getNetwork(); const bundleProcessor = new services.bundles.Processor({ logger, redis, postgres, }); - const spokePoolIndexers: Array = []; - const hubPoolIndexerConfig = await getHubPoolIndexerConfig({ - hubPoolNetworkInfo, - hubPoolProviderUrl, - retryProviderConfig, - }); - // canonical hubpool indexer const hubPoolIndexer = new services.hubPoolIndexer.Indexer({ logger, redis, postgres, - ...hubPoolIndexerConfig, + ...hubConfig, }); - // instanciate multiple spoke pool event indexers - for (const spokePoolProviderUrl of spokePoolProviderUrls) { - const config = await getSpokePoolIndexerConfig({ - hubPoolNetworkInfo, - spokePoolProviderUrl, - hubPoolProviderUrl, - retryProviderConfig, - }); - logger.info({ - message: "Starting indexer", - ...config, - }); - const spokeIndexer = new services.spokePoolIndexer.Indexer({ + const spokePoolIndexers = spokeConfigs.map((spokeConfig) => { + return new services.spokePoolIndexer.Indexer({ logger, redis, postgres, - ...config, + ...spokeConfig, }); - spokePoolIndexers.push(spokeIndexer); - } + }); let exitRequested = false; process.on("SIGINT", () => { @@ -260,7 +88,7 @@ export async function Main( (r) => r.status === "fulfilled", ), bundleProcessorRunSuccess: bundleResults.status === "fulfilled", - hubPoolRunSucccess: hubPoolResult.status === "fulfilled", + hubPoolRunSuccess: hubPoolResult.status === "fulfilled", }, }); diff --git a/packages/indexer/src/parseEnv.ts b/packages/indexer/src/parseEnv.ts new file mode 100644 index 0000000..c385156 --- /dev/null +++ b/packages/indexer/src/parseEnv.ts @@ -0,0 +1,178 @@ +import assert from "assert"; +import { DatabaseConfig } from "@repo/indexer-database"; +import * as s from "superstruct"; +import { RetryProviderConfig } from "./utils/contractUtils"; +import * as services from "./services"; + +export type Config = { + redisConfig: RedisConfig; + postgresConfig: DatabaseConfig; + spokeConfigs: Omit< + services.spokePoolIndexer.Config, + "logger" | "redis" | "postgres" + >[]; + hubConfig: Omit< + services.hubPoolIndexer.Config, + "logger" | "redis" | "postgres" + >; +}; +export type RedisConfig = { + host: string; + port: number; +}; +export type ProviderConfig = [providerUrl: string, chainId: number]; + +export type Env = Record; + +function parseRedisConfig(env: Env): RedisConfig { + assert(env.REDIS_HOST, "requires REDIS_HOST"); + assert(env.REDIS_PORT, "requires REDIS_PORT"); + const port = parseNumber(env.REDIS_PORT); + return { + host: env.REDIS_HOST, + port, + }; +} + +function parseArray(value: string | undefined): string[] { + if (value === undefined) return []; + return value + .split(",") + .map((x) => x.trim()) + .filter((x) => x.length); +} + +// superstruct coersion to turn string into an int and validate +const stringToInt = s.coerce(s.number(), s.string(), (value) => + parseInt(value), +); +function parseNumber(value: string): number { + return s.create(value, stringToInt); +} + +function parsePostgresConfig( + env: Record, +): DatabaseConfig { + assert(env.DATABASE_HOST, "requires DATABASE_HOST"); + assert(env.DATABASE_PORT, "requires DATABASE_PORT"); + assert(env.DATABASE_USER, "requires DATABASE_USER"); + assert(env.DATABASE_PASSWORD, "requires DATABASE_PASSWORD"); + assert(env.DATABASE_NAME, "requires DATABASE_NAME"); + return { + host: env.DATABASE_HOST, + port: env.DATABASE_PORT, + user: env.DATABASE_USER, + password: env.DATABASE_PASSWORD, + dbName: env.DATABASE_NAME, + }; +} +function parseProviderConfigs(env: Env): ProviderConfig[] { + const results: ProviderConfig[] = []; + for (const [key, value] of Object.entries(process.env)) { + const match = key.match(/^RPC_PROVIDER_URLS_(\d+)$/); + if (match) { + const chainId = match[1] ? parseNumber(match[1]) : undefined; + if (chainId && value) { + const providerUrls = parseArray(value); + providerUrls.forEach((url) => { + results.push([url, chainId]); + }); + } + } + } + return results; +} + +function parseRetryProviderConfig( + env: Record, +): Omit { + assert(env.PROVIDER_CACHE_NAMESPACE, "requires PROVIDER_CACHE_NAMESPACE"); + assert(env.MAX_CONCURRENCY, "requires MAX_CONCURRENCY"); + assert(env.PCT_RPC_CALLS_LOGGED, "requires PCT_RPC_CALLS_LOGGED"); + assert( + env.STANDARD_TTL_BLOCK_DISTANCE, + "requires STANDARD_TTL_BLOCK_DISTANCE", + ); + assert(env.NO_TTL_BLOCK_DISTANCE, "requires NO_TTL_BLOCK_DISTANCE"); + assert(env.PROVIDER_CACHE_TTL, "requires PROVIDER_CACHE_TTL"); + assert(env.NODE_QUORUM_THRESHOLD, "requires NODE_QUORUM_THRESHOLD"); + assert(env.RETRIES, "requires RETRIES"); + assert(env.DELAY, "requires DELAY"); + + return { + providerCacheNamespace: env.PROVIDER_CACHE_NAMESPACE, + maxConcurrency: s.create(env.MAX_CONCURRENCY, stringToInt), + pctRpcCallsLogged: s.create(env.PCT_RPC_CALLS_LOGGED, stringToInt), + standardTtlBlockDistance: s.create( + env.STANDARD_TTL_BLOCK_DISTANCE, + stringToInt, + ), + noTtlBlockDistance: s.create(env.NO_TTL_BLOCK_DISTANCE, stringToInt), + providerCacheTtl: s.create(env.PROVIDER_CACHE_TTL, stringToInt), + nodeQuorumThreshold: s.create(env.NODE_QUORUM_THRESHOLD, stringToInt), + retries: s.create(env.RETRIES, stringToInt), + delay: s.create(env.DELAY, stringToInt), + }; +} + +export function envToConfig(env: Env): Config { + assert(env.HUBPOOL_CHAIN, "Requires HUBPOOL_CHAIN"); + const redisConfig = parseRedisConfig(env); + const postgresConfig = parsePostgresConfig(env); + const allProviderConfigs = parseProviderConfigs(env); + const retryProviderConfig = parseRetryProviderConfig(env); + const hubPoolChain = parseNumber(env.HUBPOOL_CHAIN); + const spokePoolChainsEnabled = parseArray(env.SPOKEPOOL_CHAINS_ENABLED).map( + parseNumber, + ); + const providerConfigs = allProviderConfigs.filter( + (provider) => provider[1] === hubPoolChain, + ); + assert( + allProviderConfigs.length > 0, + `Requires at least one RPC_PROVIDER_URLS_CHAINID`, + ); + + const hubConfig = { + retryProviderConfig: { + ...retryProviderConfig, + chainId: hubPoolChain, + providerConfigs, + }, + hubConfig: { + chainId: hubPoolChain, + maxBlockLookBack: 10000, + }, + redisKeyPrefix: `hubPoolIndexer:${hubPoolChain}`, + }; + + const spokeConfigs = spokePoolChainsEnabled.map((chainId) => { + const providerConfigs = allProviderConfigs.filter( + (provider) => provider[1] == chainId, + ); + assert( + providerConfigs.length > 0, + `SPOKEPOOL_CHAINS_ENABLED=${chainId} but did not find any corresponding RPC_PROVIDER_URLS_${chainId}`, + ); + return { + retryProviderConfig: { + ...retryProviderConfig, + chainId, + providerConfigs, + }, + spokeConfig: { + chainId, + maxBlockLookBack: 10000, + }, + hubConfig: hubConfig.hubConfig, + redisKeyPrefix: `spokePoolIndexer:${chainId}`, + }; + }); + + return { + redisConfig, + postgresConfig, + hubConfig, + spokeConfigs, + }; +} diff --git a/packages/indexer/src/services/bundles.ts b/packages/indexer/src/services/bundles.ts index d31fe11..39442b8 100644 --- a/packages/indexer/src/services/bundles.ts +++ b/packages/indexer/src/services/bundles.ts @@ -13,7 +13,7 @@ const BLOCKS_PER_BUNDLE = Math.floor( BUNDLE_LIVENESS_SECONDS / AVERAGE_SECONDS_PER_BLOCK, ); -type BundleConfig = { +export type BundleConfig = { logger: winston.Logger; redis: Redis | undefined; postgres: DataSource | undefined; diff --git a/packages/indexer/src/services/hubPoolIndexer.ts b/packages/indexer/src/services/hubPoolIndexer.ts index 924340a..cb003d0 100644 --- a/packages/indexer/src/services/hubPoolIndexer.ts +++ b/packages/indexer/src/services/hubPoolIndexer.ts @@ -10,14 +10,13 @@ import { HubPoolRepository } from "../database/HubPoolRepository"; import { getDeployedBlockNumber } from "@across-protocol/contracts"; import { differenceWith, isEqual } from "lodash"; -type Config = { +export type Config = { logger: winston.Logger; redis: Redis; postgres: DataSource; retryProviderConfig: utils.RetryProviderConfig; hubConfig: { chainId: number; - providerUrl: string; maxBlockLookBack: number; }; redisKeyPrefix: string; diff --git a/packages/indexer/src/services/spokePoolIndexer.ts b/packages/indexer/src/services/spokePoolIndexer.ts index 20e2600..2c5740d 100644 --- a/packages/indexer/src/services/spokePoolIndexer.ts +++ b/packages/indexer/src/services/spokePoolIndexer.ts @@ -13,24 +13,17 @@ import { BaseIndexer } from "../generics"; import { providers } from "ethers"; import { Processor } from "./spokePoolProcessor"; -type Config = { +export type Config = { logger: winston.Logger; redis: Redis; postgres: DataSource; retryProviderConfig: utils.RetryProviderConfig; - configStoreConfig: { - chainId: number; - providerUrl: string; - maxBlockLookBack: number; - }; hubConfig: { chainId: number; - providerUrl: string; maxBlockLookBack: number; }; spokeConfig: { chainId: number; - providerUrl: string; maxBlockLookBack: number; }; redisKeyPrefix: string; @@ -57,7 +50,6 @@ export class Indexer extends BaseIndexer { redisKeyPrefix, hubConfig, spokeConfig, - configStoreConfig, } = this.config; this.resolvedRangeStore = new RangeQueryStore({ @@ -76,7 +68,7 @@ export class Indexer extends BaseIndexer { ...retryProviderConfig, cache: redisCache, logger, - ...configStoreConfig, + ...hubConfig, }); this.spokePoolProvider = utils.getRetryProvider({ ...retryProviderConfig, @@ -100,8 +92,8 @@ export class Indexer extends BaseIndexer { this.configStoreClient = await utils.getConfigStoreClient({ logger, provider: configStoreProvider, - maxBlockLookBack: configStoreConfig.maxBlockLookBack, - chainId: configStoreConfig.chainId, + maxBlockLookBack: hubConfig.maxBlockLookBack, + chainId: hubConfig.chainId, }); this.hubPoolClient = await utils.getHubPoolClient({ configStoreClient: this.configStoreClient, diff --git a/packages/indexer/src/utils/contractUtils.ts b/packages/indexer/src/utils/contractUtils.ts index 7418e91..c3e7a51 100644 --- a/packages/indexer/src/utils/contractUtils.ts +++ b/packages/indexer/src/utils/contractUtils.ts @@ -143,18 +143,18 @@ export type RetryProviderConfig = { nodeQuorumThreshold: number; retries: number; delay: number; + providerConfigs: [providerUrls: string, chainId: number][]; + chainId: number; }; export type RetryProviderDeps = { cache: across.interfaces.CachingMechanismInterface; logger: winston.Logger; - providerUrl: string; - chainId: number; }; export function getRetryProvider( params: RetryProviderConfig & RetryProviderDeps, ) { return new across.providers.RetryProvider( - [[params.providerUrl, params.chainId]], + params.providerConfigs, params.chainId, params.nodeQuorumThreshold, params.retries,