Skip to content

Commit

Permalink
remove config, include bridge fee calculation, input and output token…
Browse files Browse the repository at this point in the history
… prices

Signed-off-by: david <david@umaproject.org>
  • Loading branch information
daywiss committed Dec 30, 2024
1 parent a3283e9 commit ae571cf
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 135 deletions.
2 changes: 1 addition & 1 deletion packages/error-handling/src/utils/assert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export function assert(
message: string,
): asserts value is NonNullable<unknown> {
try {
return assertModule.ok(value !== null && value !== undefined, message);
return assertModule.ok(value, message);
} catch (e: unknown) {
throw new AssertError(message);
}
Expand Down
4 changes: 0 additions & 4 deletions packages/indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,4 @@ ENABLE_HUBPOOL_INDEXER=true
ENABLE_BUNDLE_EVENTS_PROCESSOR=true
ENABLE_BUNDLE_INCLUDED_EVENTS_SERVICE=true
ENABLE_BUNDLE_BUILDER=true
# use symbols defined in /home/dev/src/risklabs/indexer/packages/indexer/src/utils/coingeckoClient.ts
# separate them by comma, no spaces
COINGECKO_SYMBOLS=ethereum,optimism,across-protocol
```
205 changes: 119 additions & 86 deletions packages/indexer/src/messaging/priceWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ import winston from "winston";
import { Job, Worker } from "bullmq";
import { DataSource, entities } from "@repo/indexer-database";
import { IndexerQueues } from "./service";
import { ethers } from "ethers";
import {
getIntegratorId,
yesterday,
CoingeckoClient,
findTokenByAddress,
} from "../utils";
import { RetryProvidersFactory } from "../web3/RetryProvidersFactory";
import { assert } from "@repo/error-handling";

export type PriceMessage = {
depositId: number;
Expand All @@ -27,17 +29,67 @@ export type PriceMessage = {
* - Logging errors and information at various stages of the process.
*/
export class PriceWorker {
public worker: Worker;
private worker: Worker;
private coingeckoClient: CoingeckoClient;
private relayHashInfoRepository;
private depositRepository;
private historicPriceRepository;

constructor(
private redis: Redis,
private postgres: DataSource,
private logger: winston.Logger,
) {
this.coingeckoClient = new CoingeckoClient();
this.relayHashInfoRepository = this.postgres.getRepository(
entities.RelayHashInfo,
);
this.depositRepository = this.postgres.getRepository(
entities.V3FundsDeposited,
);
this.historicPriceRepository = this.postgres.getRepository(
entities.HistoricPrice,
);
this.setWorker();
}
private async getPrice(
address: string,
chainId: number,
time: Date,
quoteCurrency = "usd",
): Promise<number> {
const priceTime = yesterday(time);
const tokenInfo = findTokenByAddress(address, chainId);
const baseCurrency = tokenInfo.coingeckoId;

const cachedPrice = await this.historicPriceRepository.findOne({
where: {
date: priceTime,
baseCurrency,
quoteCurrency,
},
});
// we have this price at this time in the db
if (cachedPrice) return Number(cachedPrice.price);

const fetchedPrice = await this.coingeckoClient.getHistoricDailyPrice(
priceTime.getTime(),
baseCurrency,
);
const price = fetchedPrice.market_data?.current_price[quoteCurrency];
assert(
price,
`Unable to fetch price for ${quoteCurrency} in ${baseCurrency} at ${priceTime}`,
);
await this.historicPriceRepository.insert({
date: priceTime,
baseCurrency,
quoteCurrency,
price: price.toString(),
});

return Number(price);
}

public setWorker() {
this.worker = new Worker(
Expand All @@ -57,117 +109,98 @@ export class PriceWorker {
{ connection: this.redis, concurrency: 10 },
);
}
private async run(params: PriceMessage) {
const { depositId, originChainId } = params;
const relayHashInfoRepository = this.postgres.getRepository(
entities.RelayHashInfo,
// price is assumed to be a float, amount is assumed in wei and decimals is the conversion for that amount
// this outputs the difference between input and output normalized to the price which is typically usd
private calculateBridgeFee(
inputToken: { amount: string; price: number; decimals: number },
outputToken: { amount: string; price: number; decimals: number },
): bigint {
const inputAmountBigInt = BigInt(inputToken.amount);
const outputAmountBigInt = BigInt(outputToken.amount);

const inputPriceBigInt = BigInt(
Math.round(inputToken.price * Math.pow(10, inputToken.decimals)),
);
const depositRepository = this.postgres.getRepository(
entities.V3FundsDeposited,
);
const historicPriceRepository = this.postgres.getRepository(
entities.HistoricPrice,
const outputPriceBigInt = BigInt(
Math.round(outputToken.price * Math.pow(10, outputToken.decimals)),
);

const relayHashInfo = await relayHashInfoRepository.findOne({
const normalizedInputAmount =
(inputAmountBigInt * inputPriceBigInt) /
BigInt(Math.pow(10, inputToken.decimals));
const normalizedOutputAmount =
(outputAmountBigInt * outputPriceBigInt) /
BigInt(Math.pow(10, outputToken.decimals));

return normalizedInputAmount - normalizedOutputAmount;
}
private async run(params: PriceMessage) {
const { depositId, originChainId } = params;

const relayHashInfo = await this.relayHashInfoRepository.findOne({
where: { depositId, originChainId },
});
const deposit = await depositRepository.findOne({
const deposit = await this.depositRepository.findOne({
where: { depositId, originChainId },
});

// This is catastrophic, we dont want worker retrying if we cannot find this data
if (!relayHashInfo || !deposit) {
this.logger.error({
at: "PriceWorker",
message: "Relay hash info not found",
message:
"Failed to retrieve relay hash information or deposit record from the database.",
...params,
});
return;
}

// if blockTimestamp doesnt exist, maybe we keep retrying till it does
const blockTime = relayHashInfo?.depositEvent?.blockTimestamp;
if (!blockTime) {
const errorMessage = "Deposit block time not found for relay hash info.";
this.logger.error({
at: "PriceWorker",
message: "Deposit block time not found for relay hash info",
message: errorMessage,
...params,
});
return;
throw new Error(errorMessage);
}
const priceTime = yesterday(blockTime);
const quoteCurrency = "usd";
const baseTokenInfo = findTokenByAddress(
relayHashInfo.fillEvent.outputToken,
relayHashInfo.destinationChainId,
const inputTokenAddress = relayHashInfo.fillEvent.inputToken;
const outputTokenAddress = relayHashInfo.fillEvent.outputToken;
const destinationChainId = relayHashInfo.destinationChainId;
const inputTokenInfo = findTokenByAddress(inputTokenAddress, originChainId);
const outputTokenInfo = findTokenByAddress(
outputTokenAddress,
destinationChainId,
);
const baseCurrency = baseTokenInfo?.coingeckoId;
let price: undefined | number;

if (!baseCurrency) {
this.logger.error({
at: "PriceWorker",
message: "Unable to find base currency to quote",
...params,
outputToken: relayHashInfo.fillEvent.outputToken,
destinationChainId: relayHashInfo.destinationChainId,
});
return;
}
const existingPrice = await historicPriceRepository.findOne({
where: {
date: priceTime,
baseCurrency,
quoteCurrency,
},
});
// fetch price if one hasnt been saved
if (!existingPrice) {
try {
const historicPriceData =
await this.coingeckoClient.getHistoricDailyPrice(
priceTime.getTime(),
baseCurrency,
);
price = historicPriceData.market_data?.current_price[quoteCurrency];
// wasnt able to get a price
if (price === undefined) {
this.logger.error(
`Unable to find ${quoteCurrency} for ${baseCurrency} at time ${priceTime}`,
);
return;
}
await historicPriceRepository.insert({
date: priceTime,
baseCurrency,
quoteCurrency,
price: price.toString(),
});
this.logger.info({
at: "PriceWorker",
...params,
message: `Fetched and inserted historic price for ${baseCurrency} on ${priceTime}`,
});
} catch (error) {
this.logger.error({
at: "PriceWorker",
...params,
message: `Failed to fetch or insert historic price for ${baseCurrency} on ${priceTime}`,
error: (error as Error).message,
});
}
} else {
price = Number(existingPrice.price);
}
const inputTokenPrice = await this.getPrice(
inputTokenAddress,
originChainId,
blockTime,
);
const outputTokenPrice = await this.getPrice(
outputTokenAddress,
destinationChainId,
blockTime,
);

if (price === undefined) {
this.logger.error({
at: "PriceWorker",
...params,
message: "Failed to get a valid price from cache or coingecko",
});
return;
}
// TODO: Compute bridge fee
const inputToken = {
amount: relayHashInfo.fillEvent.inputAmount,
price: inputTokenPrice,
decimals: inputTokenInfo.decimals,
};

const outputToken = {
amount: relayHashInfo.fillEvent.outputAmount,
price: outputTokenPrice,
decimals: outputTokenInfo.decimals,
};

const bridgeFee = this.calculateBridgeFee(inputToken, outputToken);
relayHashInfo.bridgeFeeUsd = bridgeFee.toString();
await this.relayHashInfoRepository.save(relayHashInfo);
}
public async close() {
return this.worker.close();
Expand Down
6 changes: 0 additions & 6 deletions packages/indexer/src/parseEnv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
WebhookTypes,
parseWebhookClientsFromString,
} from "@repo/webhooks";
import { CoingeckoSymbol } from "./utils/coingeckoClient";

export type Config = {
redisConfig: RedisConfig;
Expand All @@ -19,7 +18,6 @@ export type Config = {
enableBundleIncludedEventsService: boolean;
enableBundleBuilder: boolean;
webhookConfig: WebhooksConfig;
coingeckoSymbols: CoingeckoSymbol[];
};
export type RedisConfig = {
host: string;
Expand Down Expand Up @@ -196,9 +194,6 @@ export function envToConfig(env: Env): Config {
enabledWebhookRequestWorkers: true,
clients: parseWebhookClientsFromString(env.WEBHOOK_CLIENTS ?? "[]"),
};
const coingeckoSymbols = parseArray(env.COINGECKO_SYMBOLS).map((symbol) =>
CoingeckoSymbol.create(symbol),
);
return {
redisConfig,
postgresConfig,
Expand All @@ -209,6 +204,5 @@ export function envToConfig(env: Env): Config {
enableBundleIncludedEventsService,
enableBundleBuilder,
webhookConfig,
coingeckoSymbols,
};
}
31 changes: 1 addition & 30 deletions packages/indexer/src/utils/coingeckoClient.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,6 @@
import * as s from "superstruct";
import { DateTime } from "luxon";

// tken from scraper and adapted from https://github.com/across-protocol/constants/blob/master/src/tokens.ts
export const CoingeckoSymbol = s.enums([
"across-protocol",
"aleph-zero",
"arbitrum",
"badger-dao",
"balancer",
"boba-network",
"bridged-usd-coin-base",
"dai",
"ethereum",
"gho",
"havven",
"lisk",
"matic-network",
"optimism",
"pooltogether",
"tether",
"uma",
"usd-coin",
"usd-coin-ethereum-bridged",
"usdb",
"weth",
"wmatic",
"wrapped-bitcoin",
]);
export type CoingeckoSymbol = s.Infer<typeof CoingeckoSymbol>;
export const CGHistoricPriceBase = s.object({
id: s.string(),
symbol: s.string(),
Expand All @@ -38,8 +11,6 @@ export const CGHistoricPriceBase = s.object({
}),
),
});
export const isCoingeckoSymbol = (symbol: string) =>
s.is(symbol, CoingeckoSymbol);

export type CGHistoricPriceBase = s.Infer<typeof CGHistoricPriceBase>;

Expand All @@ -57,7 +28,7 @@ export class CoingeckoClient {
// rounds timestamp to the current day
public async getHistoricDailyPrice(
timestamp: number,
symbol: CoingeckoSymbol,
symbol: string,
): Promise<CGHistoricPriceBase> {
const cgFormattedDate =
DateTime.fromMillis(timestamp).toFormat("dd-LL-yyyy");
Expand Down
Loading

0 comments on commit ae571cf

Please sign in to comment.