Skip to content

Commit

Permalink
refactor price lookups to use workers
Browse files Browse the repository at this point in the history
Signed-off-by: david <david@umaproject.org>
  • Loading branch information
daywiss committed Dec 24, 2024
1 parent 421d5e8 commit bc2bf3d
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 107 deletions.
3 changes: 3 additions & 0 deletions packages/indexer-database/src/entities/RelayHashInfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ export class RelayHashInfo {
@CreateDateColumn()
createdAt: Date;

@Column()
bridgeFeeUsd: string;

@UpdateDateColumn()
updatedAt: Date;
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ export class V3FundsDeposited {
@CreateDateColumn()
createdAt: Date;

// this has been converted from block time seconds
@Column({ nullable: true })
blockTimestamp?: Date;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import { SpokePoolRepository } from "../../database/SpokePoolRepository";
import { SpokePoolProcessor } from "../../services/spokePoolProcessor";
import { IndexerQueues, IndexerQueuesService } from "../../messaging/service";
import { IntegratorIdMessage } from "../../messaging/IntegratorIdWorker";
import { PriceMessage } from "../../messaging/priceWorker";
import { FillWithBlock } from "@across-protocol/sdk/dist/cjs/interfaces/SpokePool";

export type FetchEventsResult = {
v3FundsDepositedEvents: utils.V3FundsDepositedWithIntegradorId[];
Expand Down Expand Up @@ -114,6 +116,8 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler {
);
await this.updateNewDepositsWithIntegratorId(newInsertedDeposits);
await this.spokePoolProcessor.process(storedEvents);
// publish new relays to workers to fill in prices
await this.publishNewRelays(events.filledV3RelayEvents);
}
private async getBlockTime(blockNumber: number): Promise<number> {
const block = await this.provider.getBlock(blockNumber);
Expand Down Expand Up @@ -275,6 +279,19 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler {
});
}

private async publishNewRelays(relays: FillWithBlock[]) {
const messages: PriceMessage[] = relays.map((relay) => {
return {
depositId: relay.depositId,
originChainId: relay.originChainId,
};
});
await this.indexerQueuesService.publishMessagesBulk(
IndexerQueues.PriceQuery,
IndexerQueues.PriceQuery, // Use queue name as job name
messages,
);
}
private async publishIntegratorIdMessages(
deposits: entities.V3FundsDeposited[],
) {
Expand Down
11 changes: 3 additions & 8 deletions packages/indexer/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import {
import { BundleRepository } from "./database/BundleRepository";
import { IndexerQueuesService } from "./messaging/service";
import { IntegratorIdWorker } from "./messaging/IntegratorIdWorker";
import { PriceWorker } from "./messaging/priceWorker";
import { AcrossIndexerManager } from "./data-indexing/service/AcrossIndexerManager";
import { BundleServicesManager } from "./services/BundleServicesManager";
import { CoingeckoPriceProcessor } from "./services";

async function initializeRedis(
config: parseEnv.RedisConfig,
Expand Down Expand Up @@ -56,10 +56,6 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
const redis = await initializeRedis(redisConfig, logger);
const redisCache = new RedisCache(redis);
const postgres = await connectToDatabase(postgresConfig, logger);
const priceProcessor = new CoingeckoPriceProcessor(
{ symbols: config.coingeckoSymbols },
{ logger, postgres },
);
// Call write to kick off webhook calls
const { write } = await WebhookFactory(config.webhookConfig, {
postgres,
Expand Down Expand Up @@ -121,6 +117,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
logger,
retryProvidersFactory,
);
const priceWorker = new PriceWorker(redis, postgres, logger);

let exitRequested = false;
process.on("SIGINT", () => {
Expand All @@ -130,9 +127,9 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
message: "Wait for shutdown, or press Ctrl+C again to forcefully exit.",
});
integratorIdWorker.close();
priceWorker.close();
acrossIndexerManager.stopGracefully();
bundleServicesManager.stop();
priceProcessor.stop();
} else {
integratorIdWorker.close();
logger.info({ at: "Indexer#Main", message: "Forcing exit..." });
Expand All @@ -152,8 +149,6 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
await Promise.allSettled([
bundleServicesManager.start(),
acrossIndexerManager.start(),
// run prices call to check every minute or so. it will only cache once a day
priceProcessor.start(60),
]);

logger.info({
Expand Down
172 changes: 172 additions & 0 deletions packages/indexer/src/messaging/priceWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import Redis from "ioredis";
import winston from "winston";
import { Job, Worker } from "bullmq";
import { DataSource, entities } from "@repo/indexer-database";
import { IndexerQueues } from "./service";
import {
getIntegratorId,
yesterday,
CoingeckoClient,
findTokenByAddress,
} from "../utils";
import { RetryProvidersFactory } from "../web3/RetryProvidersFactory";

export type PriceMessage = {
depositId: number;
originChainId: number;
};

/**
* This worker listens to the `IntegratorId` queue and processes each job by:
* - Retrieving the deposit information from the database based on the provided relay hash.
* - Checking if the deposit record already has an integrator ID.
* - If the integrator ID is not set, the worker fetches it from the transaction data.
* - If found, the integrator ID is saved back into the deposit record in the database.
*/
export class PriceWorker {
public worker: Worker;
private coingeckoClient: CoingeckoClient;

constructor(
private redis: Redis,
private postgres: DataSource,
private logger: winston.Logger,
) {
this.coingeckoClient = new CoingeckoClient();
this.setWorker();
}

public setWorker() {
this.worker = new Worker(
IndexerQueues.PriceQuery,
async (job: Job<PriceMessage>) => {
try {
await this.run(job.data);
} catch (error) {
this.logger.error({
at: "PriceWorker",
message: `Error getting price for deposit ${job.data.depositId} on chain ${job.data.originChainId}`,
error,
});
throw error;
}
},
{ connection: this.redis, concurrency: 10 },
);
}
private async run(params: PriceMessage) {
const { depositId, originChainId } = params;
const relayHashInfoRepository = this.postgres.getRepository(
entities.RelayHashInfo,
);
const depositRepository = this.postgres.getRepository(
entities.V3FundsDeposited,
);
const historicPriceRepository = this.postgres.getRepository(
entities.HistoricPrice,
);

const relayHashInfo = await relayHashInfoRepository.findOne({
where: { depositId, originChainId },
});
const deposit = await depositRepository.findOne({
where: { depositId, originChainId },
});

if (!relayHashInfo || !deposit) {
this.logger.error({
at: "PriceWorker",
message: "Relay hash info not found",
...params,
});
return;
}

const blockTime = relayHashInfo?.depositEvent?.blockTimestamp;
if (!blockTime) {
this.logger.error({
at: "PriceWorker",
message: "Deposit block time not found for relay hash info",
...params,
});
return;
}
const priceTime = yesterday(blockTime);
const quoteCurrency = "usd";
const baseTokenInfo = findTokenByAddress(
relayHashInfo.fillEvent.outputToken,
relayHashInfo.destinationChainId,
);
const baseCurrency = baseTokenInfo?.coingeckoId;
let price: undefined | number;

if (!baseCurrency) {
this.logger.error({
at: "PriceWorker",
message: "Unable to find base currency to quote",
...params,
outputToken: relayHashInfo.fillEvent.outputToken,
destinationChainId: relayHashInfo.destinationChainId,
});
return;
}
const existingPrice = await historicPriceRepository.findOne({
where: {
date: priceTime,
baseCurrency,
quoteCurrency,
},
});
// fetch price if one hasnt been saved
if (!existingPrice) {
try {
const historicPriceData =
await this.coingeckoClient.getHistoricDailyPrice(
priceTime.getTime(),
baseCurrency,
);
price = historicPriceData.market_data?.current_price[quoteCurrency];
// wasnt able to get a price
if (price === undefined) {
this.logger.error(
`Unable to find ${quoteCurrency} for ${baseCurrency} at time ${priceTime}`,
);
return;
}
await historicPriceRepository.insert({
date: priceTime,
baseCurrency,
quoteCurrency,
price: price.toString(),
});
this.logger.info({
at: "PriceWorker",
...params,
message: `Fetched and inserted historic price for ${baseCurrency} on ${priceTime}`,
});
} catch (error) {
this.logger.error({
at: "PriceWorker",
...params,
message: `Failed to fetch or insert historic price for ${baseCurrency} on ${priceTime}`,
error: (error as Error).message,
});
}
} else {
price = Number(existingPrice.price);
}

if (price === undefined) {
this.logger.error({
at: "PriceWorker",
...params,
message: "Failed to get a valid price from cache or coingecko",
});
return;
}
// TODO: Compute bridge fee
}
public async close() {
return this.worker.close();
}
}
1 change: 1 addition & 0 deletions packages/indexer/src/messaging/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Queue, JobsOptions, BulkJobOptions } from "bullmq";

export enum IndexerQueues {
IntegratorId = "IntegratorId",
PriceQuery = "PriceQuery",
}

export class IndexerQueuesService {
Expand Down
1 change: 0 additions & 1 deletion packages/indexer/src/services/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
export * from "./bundles";
export * from "./spokePoolProcessor";
export * from "./BundleBuilderService";
export * from "./priceProcessor";
87 changes: 0 additions & 87 deletions packages/indexer/src/services/priceProcessor.ts

This file was deleted.

Loading

0 comments on commit bc2bf3d

Please sign in to comment.