Skip to content

Commit

Permalink
feat(indexer): add coingecko price collection from tokens against usd
Browse files Browse the repository at this point in the history
Signed-off-by: david <david@umaproject.org>
  • Loading branch information
David authored and daywiss committed Dec 24, 2024
1 parent 0c88a91 commit 6aa16ce
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 28 deletions.
7 changes: 5 additions & 2 deletions packages/error-handling/src/utils/assert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import { AssertError } from "../errors/AssertError";
* @returns An assertion of `value`.
* @throws {@link AssertError} if assert's validity fails
*/
export function assert(value: unknown, message: string): asserts value {
export function assert(
value: unknown,
message: string,
): asserts value is NonNullable<unknown> {
try {
return assertModule.ok(value, message);
return assertModule.ok(value !== null && value !== undefined, message);
} catch (e: unknown) {
throw new AssertError(message);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class HistoricPrice1733941169940 implements MigrationInterface {
name = 'HistoricPrice1733941169940'

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`CREATE TABLE "historic_market_price" ("id" SERIAL NOT NULL, "baseCurrency" character varying NOT NULL, "quoteCurrency" character varying NOT NULL DEFAULT 'usd', "date" date NOT NULL, "price" numeric NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_historic_price_baseCurrency_quoteCurrency_date" UNIQUE ("baseCurrency", "quoteCurrency", "date"), CONSTRAINT "PK_b0a22436b47e742187aa7408561" PRIMARY KEY ("id"))`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE "historic_market_price"`);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class HistoricPrice1734549610006 implements MigrationInterface {
name = 'HistoricPrice1734549610006'

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`CREATE TABLE "historic_price" ("id" SERIAL NOT NULL, "baseCurrency" character varying NOT NULL, "quoteCurrency" character varying NOT NULL DEFAULT 'usd', "date" character varying NOT NULL, "price" numeric NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_historic_price_baseCurrency_quoteCurrency_date" UNIQUE ("baseCurrency", "quoteCurrency", "date"), CONSTRAINT "PK_77dc3f4978cdfb03f1bb3a7444b" PRIMARY KEY ("id"))`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE "historic_price"`);
}

}
35 changes: 35 additions & 0 deletions packages/indexer-database/src/entities/HistoricPrice.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import {
Column,
CreateDateColumn,
Entity,
PrimaryGeneratedColumn,
Unique,
} from "typeorm";

@Entity()
@Unique("UK_historic_price_baseCurrency_quoteCurrency_date", [
"baseCurrency",
"quoteCurrency",
"date",
])
export class HistoricPrice {
@PrimaryGeneratedColumn()
id: number;

// bear in mind we are using coingecko symbols directly here, for all intents and purposes this is coingecko historic market price
@Column()
baseCurrency: string;

@Column({ default: "usd" })
quoteCurrency: string;

// yyyy-LL-dd
@Column()
date: string;

@Column({ type: "decimal" })
price: string;

@CreateDateColumn()
createdAt: Date;
}
1 change: 1 addition & 0 deletions packages/indexer-database/src/entities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ export * from "./RelayHashInfo";

export * from "./WebhookRequest";
export * from "./WebhookClient";
export * from "./HistoricPrice";
2 changes: 2 additions & 0 deletions packages/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
"@repo/webhooks": "workspace:*",
"@types/express": "^4.17.21",
"@types/lodash": "^4.17.7",
"@types/luxon": "^3.4.2",
"bullmq": "^5.12.12",
"ethers": "^5.7.2",
"express": "^4.19.2",
"express-bearer-token": "^3.0.0",
"ioredis": "^5.4.1",
"lodash": "^4.17.21",
"luxon": "^3.5.0",
"redis": "^4.7.0",
"superstruct": "^2.0.3-1",
"winston": "^3.13.1"
Expand Down
8 changes: 8 additions & 0 deletions packages/indexer/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { IndexerQueuesService } from "./messaging/service";
import { IntegratorIdWorker } from "./messaging/IntegratorIdWorker";
import { AcrossIndexerManager } from "./data-indexing/service/AcrossIndexerManager";
import { BundleServicesManager } from "./services/BundleServicesManager";
import { CoingeckoPriceProcessor } from "./services";

async function initializeRedis(
config: parseEnv.RedisConfig,
Expand Down Expand Up @@ -55,6 +56,10 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
const redis = await initializeRedis(redisConfig, logger);
const redisCache = new RedisCache(redis);
const postgres = await connectToDatabase(postgresConfig, logger);
const priceProcessor = new CoingeckoPriceProcessor(
{ symbols: config.coingeckoSymbols },
{ logger, postgres },
);
// Call write to kick off webhook calls
const { write } = await WebhookFactory(config.webhookConfig, {
postgres,
Expand Down Expand Up @@ -127,6 +132,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
integratorIdWorker.close();
acrossIndexerManager.stopGracefully();
bundleServicesManager.stop();
priceProcessor.stop();
} else {
integratorIdWorker.close();
logger.info({ at: "Indexer#Main", message: "Forcing exit..." });
Expand All @@ -146,6 +152,8 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
await Promise.allSettled([
bundleServicesManager.start(),
acrossIndexerManager.start(),
// run prices call to check every minute or so. it will only cache once a day
priceProcessor.start(60),
]);

logger.info({
Expand Down
15 changes: 11 additions & 4 deletions packages/indexer/src/parseEnv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
WebhookTypes,
parseWebhookClientsFromString,
} from "@repo/webhooks";
import { CoingeckoSymbol } from "./utils/coingeckoClient";

export type Config = {
redisConfig: RedisConfig;
Expand All @@ -18,6 +19,7 @@ export type Config = {
enableBundleIncludedEventsService: boolean;
enableBundleBuilder: boolean;
webhookConfig: WebhooksConfig;
coingeckoSymbols: CoingeckoSymbol[];
};
export type RedisConfig = {
host: string;
Expand All @@ -29,11 +31,12 @@ export type ProviderConfig = [providerUrl: string, chainId: number];
export type Env = Record<string, string | undefined>;

export function parseRedisConfig(env: Env): RedisConfig {
assert(env.REDIS_HOST, "requires REDIS_HOST");
assert(env.REDIS_PORT, "requires REDIS_PORT");
const port = parseNumber(env.REDIS_PORT);
const { REDIS_HOST, REDIS_PORT } = env;
assert(REDIS_HOST, "requires REDIS_HOST");
assert(REDIS_PORT, "requires REDIS_PORT");
const port = parseNumber(REDIS_PORT);
return {
host: env.REDIS_HOST,
host: REDIS_HOST,
port,
// @dev: this retry config is needed for bullmq workers
maxRetriesPerRequest: null,
Expand Down Expand Up @@ -193,6 +196,9 @@ export function envToConfig(env: Env): Config {
enabledWebhookRequestWorkers: true,
clients: parseWebhookClientsFromString(env.WEBHOOK_CLIENTS ?? "[]"),
};
const coingeckoSymbols = parseArray(env.COINGECKO_SYMBOLS).map((symbol) =>
CoingeckoSymbol.create(symbol),
);
return {
redisConfig,
postgresConfig,
Expand All @@ -203,5 +209,6 @@ export function envToConfig(env: Env): Config {
enableBundleIncludedEventsService,
enableBundleBuilder,
webhookConfig,
coingeckoSymbols,
};
}
1 change: 1 addition & 0 deletions packages/indexer/src/services/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./bundles";
export * from "./spokePoolProcessor";
export * from "./BundleBuilderService";
export * from "./priceProcessor";
82 changes: 82 additions & 0 deletions packages/indexer/src/services/priceProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { CoingeckoSymbol, CoingeckoClient } from "../utils/coingeckoClient";
import { Logger } from "winston";
import { DataSource, entities } from "@repo/indexer-database";
import { BaseIndexer } from "../generics";
import { DateTime } from "luxon";

type Config = {
symbols: CoingeckoSymbol[];
// not used currently
quoteCurrency?: string;
};

type Deps = {
logger: Logger;
postgres: DataSource;
};

export class CoingeckoPriceProcessor extends BaseIndexer {
private coingeckoClient: CoingeckoClient;
constructor(
private config: Config,
private deps: Deps,
) {
super(deps.logger, "CoingeckoPriceProcessor");
this.coingeckoClient = new CoingeckoClient();
}

protected async indexerLogic(): Promise<void> {
const now = Date.now();
const dbFormattedDate = DateTime.fromMillis(now).toFormat("yyyy-LL-dd");
const quoteCurrency = this.config.quoteCurrency ?? "usd";
const historicPriceRepository = this.deps.postgres.getRepository(
entities.HistoricPrice,
);

for (const symbol of this.config.symbols) {
const existingPrice = await historicPriceRepository.findOne({
where: {
date: dbFormattedDate,
baseCurrency: symbol,
quoteCurrency,
},
});
// do nothing, we have a price for this day
if (existingPrice) return;

try {
const historicPriceData =
await this.coingeckoClient.getHistoricDailyPrice(now, symbol);
const price =
historicPriceData.market_data?.current_price[quoteCurrency];
// wasnt able to get a price
if (price === undefined) {
this.deps.logger.error(
`Unable to find ${quoteCurrency} for ${symbol}`,
);
return;
}
await historicPriceRepository.insert({
date: dbFormattedDate,
baseCurrency: symbol,
quoteCurrency,
price: price.toString(),
});
this.logger.info({
at: "CoingeckoPriceProcessor#indexerLogic",
message: `Inserted historic price for ${symbol} on ${dbFormattedDate}`,
});
} catch (error) {
this.logger.error({
at: "CoingeckoPriceProcessor#indexerLogic",
message: `Failed to fetch or insert historic price for ${symbol} on ${dbFormattedDate}`,
error: (error as Error).message,
});
}
}
}

protected async initialize(): Promise<void> {
// Initialization logic if needed
}
}
54 changes: 54 additions & 0 deletions packages/indexer/src/utils/coingeckoClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import * as s from "superstruct";
import { DateTime } from "luxon";

export const CoingeckoSymbol = s.enums([
"ethereum",
"matic-network",
"wrapped-bitcoin",
"usd-coin",
"uma",
"badger-dao",
"weth",
"boba-network",
"dai",
"balancer",
"tether",
"across-protocol",
"havven",
"pooltogether",
"bridged-usd-coin-base",
"optimism",
"usd-coin-ethereum-bridged",
]);
export type CoingeckoSymbol = s.Infer<typeof CoingeckoSymbol>;
export const CGHistoricPriceBase = s.object({
id: s.string(),
symbol: s.string(),
name: s.string(),
market_data: s.optional(
s.object({
current_price: s.record(s.string(), s.number()),
}),
),
});
export type CGHistoricPriceBase = s.Infer<typeof CGHistoricPriceBase>;

export class CoingeckoClient {
constructor(private baseUrl: string = "https://api.coingecko.com/api/v3") {}

// rounds timestamp to the current day
public async getHistoricDailyPrice(
timestamp: number,
symbol: CoingeckoSymbol,
): Promise<CGHistoricPriceBase> {
const cgFormattedDate =
DateTime.fromMillis(timestamp).toFormat("dd-LL-yyyy");
const response = await fetch(
`${this.baseUrl}/coins/${symbol}/history?date=${cgFormattedDate}&localization=false`,
);
if (!response.ok) {
throw new Error(`Error fetching historic price: ${response.statusText}`);
}
return s.create(await response.json(), CGHistoricPriceBase);
}
}
Loading

0 comments on commit 6aa16ce

Please sign in to comment.