improve(indexer): refactor configuration parsing (#45)
Signed-off-by: david <david@umaproject.org>
daywiss authored Sep 26, 2024
1 parent a348302 commit 00e747f
Showing 10 changed files with 230 additions and 205 deletions.
2 changes: 1 addition & 1 deletion apps/node/src/app.ts
@@ -37,7 +37,7 @@ async function run() {
void (await Template.Main(process.env));
return "Example template app running";
case "indexer":
void (await Indexer.Main(process.env, logger));
void (await Indexer.Main(Indexer.envToConfig(process.env), logger));
break;
case "persistence-example":
void (await PersistenceExample.Main(process.env));
27 changes: 27 additions & 0 deletions packages/indexer/README.md
@@ -17,3 +17,30 @@ Without docker:

## Test
In this package run `pnpm test`

## ENV
```
DATABASE_HOST=localhost
DATABASE_PORT=5432
DATABASE_USER=user
DATABASE_PASSWORD=password
DATABASE_NAME=mydatabase
REDIS_HOST=localhost
REDIS_PORT=6380
RPC_PROVIDER_URLS_1=https://mainnet.infura.io/v3/xxx
RPC_PROVIDER_URLS_10=https://optimism-mainnet.infura.io/v3/xxx
RPC_PROVIDER_URLS_137=https://polygon-mainnet.infura.io/v3/xxx
HUBPOOL_CHAIN=1
SPOKEPOOL_CHAINS_ENABLED=1,2
PROVIDER_CACHE_NAMESPACE=indexer_provider_cache
MAX_CONCURRENCY=1
PCT_RPC_CALLS_LOGGED=100
STANDARD_TTL_BLOCK_DISTANCE=1
NO_TTL_BLOCK_DISTANCE=1000
PROVIDER_CACHE_TTL=100000
NODE_QUORUM_THRESHOLD=1
RETRIES=2
DELAY=1000
```
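These variables feed the new `parseEnv` module that `apps/node` now calls via `Indexer.envToConfig(process.env)` (see the `app.ts` change above). `parseEnv.ts` itself is not shown in this diff, so the snippet below is only a minimal sketch of how the database and Redis variables might be validated and coerced, assuming the module reuses the same `assert` plus superstruct-coercion pattern as the helpers removed from `main.ts`; the name `PostgresConfig` and the omission of the retry-provider, hub, and spoke settings are simplifications for illustration.

```ts
// Hypothetical sketch of packages/indexer/src/parseEnv.ts (not shown in this
// diff): assert required variables and coerce numeric strings.
import assert from "assert";
import * as s from "superstruct";

export type RedisConfig = { host: string; port: number };
export type PostgresConfig = {
  host: string;
  port: string;
  user: string;
  password: string;
  dbName: string;
};

// Superstruct coercion: accept a string, parse it, validate it as a number.
const stringToInt = s.coerce(s.number(), s.string(), (value) =>
  parseInt(value),
);

export function envToConfig(env: Record<string, string | undefined>) {
  assert(env.REDIS_HOST, "requires REDIS_HOST");
  assert(env.REDIS_PORT, "requires REDIS_PORT");
  assert(env.DATABASE_HOST, "requires DATABASE_HOST");
  assert(env.DATABASE_PORT, "requires DATABASE_PORT");
  assert(env.DATABASE_USER, "requires DATABASE_USER");
  assert(env.DATABASE_PASSWORD, "requires DATABASE_PASSWORD");
  assert(env.DATABASE_NAME, "requires DATABASE_NAME");
  const redisConfig: RedisConfig = {
    host: env.REDIS_HOST,
    port: s.create(env.REDIS_PORT, stringToInt),
  };
  const postgresConfig: PostgresConfig = {
    host: env.DATABASE_HOST,
    port: env.DATABASE_PORT,
    user: env.DATABASE_USER,
    password: env.DATABASE_PASSWORD,
    dbName: env.DATABASE_NAME,
  };
  // The real module presumably also assembles hubConfig and spokeConfigs from
  // HUBPOOL_CHAIN, SPOKEPOOL_CHAINS_ENABLED, and RPC_PROVIDER_URLS_<chainId>.
  return { redisConfig, postgresConfig };
}
```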
2 changes: 1 addition & 1 deletion packages/indexer/src/generics/BaseIndexer.ts
@@ -37,7 +37,7 @@ export abstract class BaseIndexer {
this.logger.error({
at: "BaseIndexer#start",
message: `Failed to initialize ${this.name}`,
error: e,
error: (e as unknown as Error).message,
});
return;
}
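The updated catch block logs only the message, asserting that the unknown caught value is an `Error`. Purely as an illustration, and not part of this commit, a defensive variant would narrow the type before reading `.message`:

```ts
// Hypothetical helper (not in this commit): pull a readable message out of an
// unknown caught value without an unchecked cast.
function errorMessage(e: unknown): string {
  return e instanceof Error ? e.message : String(e);
}

// The log field above would then read:
//   error: errorMessage(e),
```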
1 change: 1 addition & 0 deletions packages/indexer/src/index.ts
@@ -1 +1,2 @@
export * from "./main";
export * from "./parseEnv";
198 changes: 13 additions & 185 deletions packages/indexer/src/main.ts
@@ -1,19 +1,14 @@
import assert from "assert";
import * as services from "./services";
import winston from "winston";
import Redis from "ioredis";
import * as s from "superstruct";
import * as acrossConstants from "@across-protocol/constants";
import * as across from "@across-protocol/sdk";
import { connectToDatabase } from "./database/database.provider";
import { providers } from "ethers";
import { DatabaseConfig } from "@repo/indexer-database";
import * as parseEnv from "./parseEnv";

type RedisConfig = {
host: string;
port: number;
};
async function initializeRedis(config: RedisConfig, logger: winston.Logger) {
async function initializeRedis(
config: parseEnv.RedisConfig,
logger: winston.Logger,
) {
const redis = new Redis({
...config,
});
@@ -31,197 +26,30 @@ async function initializeRedis(config: RedisConfig, logger: winston.Logger) {
});
}

function getPostgresConfig(
env: Record<string, string | undefined>,
): DatabaseConfig {
assert(env.DATABASE_HOST, "requires DATABASE_HOST");
assert(env.DATABASE_PORT, "requires DATABASE_PORT");
assert(env.DATABASE_USER, "requires DATABASE_USER");
assert(env.DATABASE_PASSWORD, "requires DATABASE_PASSWORD");
assert(env.DATABASE_NAME, "requires DATABASE_NAME");
return {
host: env.DATABASE_HOST,
port: env.DATABASE_PORT,
user: env.DATABASE_USER,
password: env.DATABASE_PASSWORD,
dbName: env.DATABASE_NAME,
};
}

type RetryProviderConfig = {
providerCacheNamespace: string;
maxConcurrency: number;
pctRpcCallsLogged: number;
standardTtlBlockDistance: number;
noTtlBlockDistance: number;
providerCacheTtl: number;
nodeQuorumThreshold: number;
retries: number;
delay: number;
};
// superstruct coersion to turn string into an int and validate
const stringToInt = s.coerce(s.number(), s.string(), (value) =>
parseInt(value),
);
function getRetryProviderConfig(
env: Record<string, string | undefined>,
): RetryProviderConfig {
assert(env.PROVIDER_CACHE_NAMESPACE, "requires PROVIDER_CACHE_NAMESPACE");
assert(env.MAX_CONCURRENCY, "requires MAX_CONCURRENCY");
assert(env.PCT_RPC_CALLS_LOGGED, "requires PCT_RPC_CALLS_LOGGED");
assert(
env.STANDARD_TTL_BLOCK_DISTANCE,
"requires STANDARD_TTL_BLOCK_DISTANCE",
);
assert(env.NO_TTL_BLOCK_DISTANCE, "requires NO_TTL_BLOCK_DISTANCE");
assert(env.PROVIDER_CACHE_TTL, "requires PROVIDER_CACHE_TTL");
assert(env.NODE_QUORUM_THRESHOLD, "requires NODE_QUORUM_THRESHOLD");
assert(env.RETRIES, "requires RETRIES");
assert(env.DELAY, "requires DELAY");
return {
providerCacheNamespace: env.PROVIDER_CACHE_NAMESPACE,
maxConcurrency: s.create(env.MAX_CONCURRENCY, stringToInt),
pctRpcCallsLogged: s.create(env.PCT_RPC_CALLS_LOGGED, stringToInt),
standardTtlBlockDistance: s.create(
env.STANDARD_TTL_BLOCK_DISTANCE,
stringToInt,
),
noTtlBlockDistance: s.create(env.NO_TTL_BLOCK_DISTANCE, stringToInt),
providerCacheTtl: s.create(env.PROVIDER_CACHE_TTL, stringToInt),
nodeQuorumThreshold: s.create(env.NODE_QUORUM_THRESHOLD, stringToInt),
retries: s.create(env.RETRIES, stringToInt),
delay: s.create(env.DELAY, stringToInt),
};
}

// utility call to create the spoke pool event indexer config
async function getSpokePoolIndexerConfig(params: {
retryProviderConfig: RetryProviderConfig;
spokePoolProviderUrl: string;
hubPoolNetworkInfo: providers.Network;
hubPoolProviderUrl: string;
}) {
const {
retryProviderConfig,
spokePoolProviderUrl,
hubPoolProviderUrl,
hubPoolNetworkInfo,
} = params;
const tempSpokeProvider = new providers.JsonRpcProvider(spokePoolProviderUrl);
const spokePoolNetworkInfo = await tempSpokeProvider.getNetwork();
return {
retryProviderConfig,
configStoreConfig: {
chainId: hubPoolNetworkInfo.chainId,
providerUrl: hubPoolProviderUrl,
maxBlockLookBack: 10000,
},
hubConfig: {
chainId: hubPoolNetworkInfo.chainId,
providerUrl: hubPoolProviderUrl,
maxBlockLookBack: 10000,
},
spokeConfig: {
chainId: spokePoolNetworkInfo.chainId,
providerUrl: spokePoolProviderUrl,
// TODO: Set this per chain
maxBlockLookBack: 10000,
},
redisKeyPrefix: `spokePoolIndexer:${spokePoolNetworkInfo.chainId}`,
};
}
// utility call to create the hubpool event indexer config
async function getHubPoolIndexerConfig(params: {
retryProviderConfig: RetryProviderConfig;
hubPoolNetworkInfo: providers.Network;
hubPoolProviderUrl: string;
}) {
const { retryProviderConfig, hubPoolProviderUrl, hubPoolNetworkInfo } =
params;
return {
retryProviderConfig,
hubConfig: {
chainId: hubPoolNetworkInfo.chainId,
providerUrl: hubPoolProviderUrl,
maxBlockLookBack: 10000,
},
redisKeyPrefix: `hubPoolIndexer:${hubPoolNetworkInfo.chainId}`,
};
}

export async function Main(
env: Record<string, string | undefined>,
logger: winston.Logger,
) {
const spokePoolProviderUrls: string[] = Object.values(
acrossConstants.MAINNET_CHAIN_IDs,
)
.map((chainId) => env[`INDEXER_SPOKEPOOL_PROVIDER_URL_${chainId}`])
.filter((x): x is string => !!x);

assert(
spokePoolProviderUrls.length > 0,
"Must provide a url for at least one provider on one chain, for example: INDEXER_SPOKEPOOL_PROVIDER_URL_1",
);

assert(
env.INDEXER_HUBPOOL_PROVIDER_URL,
"requires INDEXER_HUBPOOL_PROVIDER_URL",
);
const hubPoolProviderUrl = env.INDEXER_HUBPOOL_PROVIDER_URL;
assert(env.INDEXER_REDIS_HOST, "requires INDEXER_REDIS_HOST");
assert(env.INDEXER_REDIS_PORT, "requires INDEXER_REDIS_PORT");
const redisConfig = {
host: env.INDEXER_REDIS_HOST,
port: Number(env.INDEXER_REDIS_PORT),
};
export async function Main(config: parseEnv.Config, logger: winston.Logger) {
const { redisConfig, postgresConfig, hubConfig, spokeConfigs } = config;

const redis = await initializeRedis(redisConfig, logger);

const postgresConfig = getPostgresConfig(env);
const postgres = await connectToDatabase(postgresConfig, logger);

const retryProviderConfig = getRetryProviderConfig(env);
const tempHubProvider = new providers.JsonRpcProvider(hubPoolProviderUrl);
const hubPoolNetworkInfo = await tempHubProvider.getNetwork();
const bundleProcessor = new services.bundles.Processor({
logger,
redis,
postgres,
});
const spokePoolIndexers: Array<services.spokePoolIndexer.Indexer> = [];
const hubPoolIndexerConfig = await getHubPoolIndexerConfig({
hubPoolNetworkInfo,
hubPoolProviderUrl,
retryProviderConfig,
});
// canonical hubpool indexer
const hubPoolIndexer = new services.hubPoolIndexer.Indexer({
logger,
redis,
postgres,
...hubPoolIndexerConfig,
...hubConfig,
});
// instanciate multiple spoke pool event indexers
for (const spokePoolProviderUrl of spokePoolProviderUrls) {
const config = await getSpokePoolIndexerConfig({
hubPoolNetworkInfo,
spokePoolProviderUrl,
hubPoolProviderUrl,
retryProviderConfig,
});
logger.info({
message: "Starting indexer",
...config,
});
const spokeIndexer = new services.spokePoolIndexer.Indexer({
const spokePoolIndexers = spokeConfigs.map((spokeConfig) => {
return new services.spokePoolIndexer.Indexer({
logger,
redis,
postgres,
...config,
...spokeConfig,
});
spokePoolIndexers.push(spokeIndexer);
}
});

let exitRequested = false;
process.on("SIGINT", () => {
@@ -260,7 +88,7 @@ export async function Main(
(r) => r.status === "fulfilled",
),
bundleProcessorRunSuccess: bundleResults.status === "fulfilled",
hubPoolRunSucccess: hubPoolResult.status === "fulfilled",
hubPoolRunSuccess: hubPoolResult.status === "fulfilled",
},
});

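With the helpers above deleted, `Main` receives its indexer settings pre-built on `parseEnv.Config`. The actual type lives in the new `parseEnv.ts`, which is not part of this diff; inferring only from what the removed `getHubPoolIndexerConfig` and `getSpokePoolIndexerConfig` helpers returned, the `hubConfig` and `spokeConfigs` entries are presumably shaped roughly like this sketch — an assumption, not the real definition:

```ts
// Presumed shapes, inferred from the return values of the removed
// getHubPoolIndexerConfig and getSpokePoolIndexerConfig helpers.
type ProviderConfig = {
  chainId: number;
  providerUrl: string;
  maxBlockLookBack: number;
};

type RetryProviderConfig = {
  providerCacheNamespace: string;
  maxConcurrency: number;
  pctRpcCallsLogged: number;
  standardTtlBlockDistance: number;
  noTtlBlockDistance: number;
  providerCacheTtl: number;
  nodeQuorumThreshold: number;
  retries: number;
  delay: number;
};

// One canonical hubpool indexer config.
type HubPoolIndexerConfig = {
  retryProviderConfig: RetryProviderConfig;
  hubConfig: ProviderConfig;
  redisKeyPrefix: string; // e.g. `hubPoolIndexer:${chainId}`
};

// One entry per enabled spoke chain (SPOKEPOOL_CHAINS_ENABLED).
type SpokePoolIndexerConfig = {
  retryProviderConfig: RetryProviderConfig;
  configStoreConfig: ProviderConfig;
  hubConfig: ProviderConfig;
  spokeConfig: ProviderConfig;
  redisKeyPrefix: string; // e.g. `spokePoolIndexer:${chainId}`
};
```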