From 22ddb90658da79f5d8d4358488c44a960a2d118b Mon Sep 17 00:00:00 2001 From: amateima <89395931+amateima@users.noreply.github.com> Date: Mon, 25 Nov 2024 21:35:04 +0200 Subject: [PATCH] feat: integrate webhooks in the indexer (#114) Signed-off-by: david Co-authored-by: david --- packages/indexer-api/src/main.ts | 4 +- .../src/entities/WebhookClient.ts | 11 +- .../src/entities/WebhookRequest.ts | 18 +- packages/indexer-database/src/main.ts | 3 + .../src/migrations/1732198003042-Webhook.ts | 97 ---------- .../src/migrations/1732293783614-Webhook.ts | 175 ------------------ .../migrations/1732297474910-WebhookClient.ts | 21 +++ .../1732297948190-WebhookRequest.ts | 22 +++ .../src/migrations/1732300892947-Webhook.ts | 175 ------------------ .../1732310112989-WebhookRequest.ts | 15 ++ .../service/AcrossIndexerManager.ts | 9 +- packages/indexer/src/main.ts | 5 +- .../src/services/spokePoolProcessor.ts | 65 ++++++- packages/webhooks/README.md | 5 +- packages/webhooks/package.json | 2 + .../adapter/messaging/WebhookRequestWorker.ts | 84 +++++++++ .../messaging/WebhooksQueuesService.ts | 53 ++++++ .../src/database/webhookClientRepository.ts | 11 +- .../src/database/webhookRequestRepository.ts | 11 +- .../webhooks/src/eventProcessorManager.ts | 88 +++++---- .../src/eventProcessors/depositStatus.ts | 72 ++++--- packages/webhooks/src/factory.ts | 35 ++-- packages/webhooks/src/notifier.ts | 4 +- packages/webhooks/src/router.ts | 12 +- packages/webhooks/src/types.ts | 3 +- packages/webhooks/src/utils.ts | 3 +- pnpm-lock.yaml | 6 + 27 files changed, 452 insertions(+), 557 deletions(-) delete mode 100644 packages/indexer-database/src/migrations/1732198003042-Webhook.ts delete mode 100644 packages/indexer-database/src/migrations/1732293783614-Webhook.ts create mode 100644 packages/indexer-database/src/migrations/1732297474910-WebhookClient.ts create mode 100644 packages/indexer-database/src/migrations/1732297948190-WebhookRequest.ts delete mode 100644 packages/indexer-database/src/migrations/1732300892947-Webhook.ts create mode 100644 packages/indexer-database/src/migrations/1732310112989-WebhookRequest.ts create mode 100644 packages/webhooks/src/adapter/messaging/WebhookRequestWorker.ts create mode 100644 packages/webhooks/src/adapter/messaging/WebhooksQueuesService.ts diff --git a/packages/indexer-api/src/main.ts b/packages/indexer-api/src/main.ts index d8e32dd..f1b5646 100644 --- a/packages/indexer-api/src/main.ts +++ b/packages/indexer-api/src/main.ts @@ -87,10 +87,10 @@ export async function Main( const redis = await initializeRedis(redisConfig, logger); const webhooks = Webhooks.WebhookFactory( { - requireApiKey: false, enabledWebhooks: [Webhooks.WebhookTypes.DepositStatus], + enabledWebhookRequestWorkers: false, }, - { postgres, logger }, + { postgres, logger, redis }, ); const allRouters: Record = { diff --git a/packages/indexer-database/src/entities/WebhookClient.ts b/packages/indexer-database/src/entities/WebhookClient.ts index e267717..ecae8fa 100644 --- a/packages/indexer-database/src/entities/WebhookClient.ts +++ b/packages/indexer-database/src/entities/WebhookClient.ts @@ -1,14 +1,15 @@ -import { Entity, PrimaryColumn, Column, PrimaryGeneratedColumn } from "typeorm"; +import { Entity, Column, PrimaryGeneratedColumn, Unique } from "typeorm"; @Entity() +@Unique("UK_webhook_client_api_key", ["apiKey"]) export class WebhookClient { + @PrimaryGeneratedColumn() + id: number; + @Column() name: string; - @PrimaryGeneratedColumn() - id: string; - - @Column({ unique: true }) + @Column() apiKey: string; @Column("jsonb") diff --git a/packages/indexer-database/src/entities/WebhookRequest.ts b/packages/indexer-database/src/entities/WebhookRequest.ts index 7022f5f..4f35ef3 100644 --- a/packages/indexer-database/src/entities/WebhookRequest.ts +++ b/packages/indexer-database/src/entities/WebhookRequest.ts @@ -1,16 +1,28 @@ -import { Entity, PrimaryColumn, Column } from "typeorm"; +import { + Entity, + PrimaryColumn, + Column, + Unique, + CreateDateColumn, + Index, +} from "typeorm"; @Entity() +@Unique("UK_webhook_request_clientId_filter", ["clientId", "filter"]) +@Index("IX_webhook_request_filter", ["filter"]) export class WebhookRequest { @PrimaryColumn() id: string; + @Column({ type: "integer" }) + clientId: number; + @Column() url: string; @Column() filter: string; - @Column({ type: "text", nullable: true, default: undefined }) - clientId?: string | undefined; + @CreateDateColumn() + createdAt: Date; } diff --git a/packages/indexer-database/src/main.ts b/packages/indexer-database/src/main.ts index 7ba56df..96c9bf4 100644 --- a/packages/indexer-database/src/main.ts +++ b/packages/indexer-database/src/main.ts @@ -36,6 +36,9 @@ export const createDataSource = (config: DatabaseConfig): DataSource => { entities.RootBundleExecutedJoinTable, // Others entities.RelayHashInfo, + // Webhooks + entities.WebhookRequest, + entities.WebhookClient, ], migrationsTableName: "_migrations", migrations: ["migrations/*.ts"], diff --git a/packages/indexer-database/src/migrations/1732198003042-Webhook.ts b/packages/indexer-database/src/migrations/1732198003042-Webhook.ts deleted file mode 100644 index 075409a..0000000 --- a/packages/indexer-database/src/migrations/1732198003042-Webhook.ts +++ /dev/null @@ -1,97 +0,0 @@ -import { MigrationInterface, QueryRunner } from "typeorm"; - -export class Webhook1732198003042 implements MigrationInterface { - name = "Webhook1732198003042"; - - public async up(queryRunner: QueryRunner): Promise { - await queryRunner.query( - `CREATE TABLE "webhook_request" ("id" character varying NOT NULL, "url" character varying NOT NULL, "filter" character varying NOT NULL, CONSTRAINT "PK_67a7784045de2d1b7139b611b93" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "webhook_client" ("name" character varying NOT NULL, "id" character varying NOT NULL, "apiKey" character varying NOT NULL, "domains" text NOT NULL, CONSTRAINT "PK_f7330fb3bdb2e19534eae691d44" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."root_bundle_canceled" ALTER COLUMN "finalised" DROP DEFAULT`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."root_bundle_executed" ALTER COLUMN "finalised" DROP DEFAULT`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."root_bundle_disputed" ALTER COLUMN "finalised" DROP DEFAULT`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."proposed_root_bundle" ALTER COLUMN "chainIds" DROP DEFAULT`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."proposed_root_bundle" ALTER COLUMN "finalised" DROP DEFAULT`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."set_pool_rebalance_route" ALTER COLUMN "finalised" DROP DEFAULT`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."v3_funds_deposited" ALTER COLUMN "finalised" DROP DEFAULT`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."filled_v3_relay" ALTER COLUMN "finalised" DROP DEFAULT`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."requested_v3_slow_fill" ALTER COLUMN "finalised" DROP DEFAULT`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."requested_speed_up_v3_deposit" ALTER COLUMN "finalised" DROP DEFAULT`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."relayed_root_bundle" ALTER COLUMN "finalised" DROP DEFAULT`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."executed_relayer_refund_root" ALTER COLUMN "finalised" DROP DEFAULT`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."tokens_bridged" ALTER COLUMN "finalised" DROP DEFAULT`, - ); - } - - public async down(queryRunner: QueryRunner): Promise { - await queryRunner.query( - `ALTER TABLE "evm"."tokens_bridged" ALTER COLUMN "finalised" SET DEFAULT false`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."executed_relayer_refund_root" ALTER COLUMN "finalised" SET DEFAULT false`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."relayed_root_bundle" ALTER COLUMN "finalised" SET DEFAULT false`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."requested_speed_up_v3_deposit" ALTER COLUMN "finalised" SET DEFAULT false`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."requested_v3_slow_fill" ALTER COLUMN "finalised" SET DEFAULT false`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."filled_v3_relay" ALTER COLUMN "finalised" SET DEFAULT false`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."v3_funds_deposited" ALTER COLUMN "finalised" SET DEFAULT false`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."set_pool_rebalance_route" ALTER COLUMN "finalised" SET DEFAULT false`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."proposed_root_bundle" ALTER COLUMN "finalised" SET DEFAULT false`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."proposed_root_bundle" ALTER COLUMN "chainIds" SET DEFAULT '[]'`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."root_bundle_disputed" ALTER COLUMN "finalised" SET DEFAULT false`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."root_bundle_executed" ALTER COLUMN "finalised" SET DEFAULT false`, - ); - await queryRunner.query( - `ALTER TABLE "evm"."root_bundle_canceled" ALTER COLUMN "finalised" SET DEFAULT false`, - ); - await queryRunner.query(`DROP TABLE "webhook_client"`); - await queryRunner.query(`DROP TABLE "webhook_request"`); - } -} diff --git a/packages/indexer-database/src/migrations/1732293783614-Webhook.ts b/packages/indexer-database/src/migrations/1732293783614-Webhook.ts deleted file mode 100644 index 693a6a8..0000000 --- a/packages/indexer-database/src/migrations/1732293783614-Webhook.ts +++ /dev/null @@ -1,175 +0,0 @@ -import { MigrationInterface, QueryRunner } from "typeorm"; - -export class Webhook1732293783614 implements MigrationInterface { - name = "Webhook1732293783614"; - - public async up(queryRunner: QueryRunner): Promise { - await queryRunner.query( - `CREATE TABLE "evm"."root_bundle_canceled" ("id" SERIAL NOT NULL, "caller" character varying NOT NULL, "requestTime" TIMESTAMP NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_rootBundleCanceled_txHash" UNIQUE ("transactionHash"), CONSTRAINT "PK_97a84a7224c26da0f0d5dc24b6a" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."root_bundle_executed" ("id" SERIAL NOT NULL, "leafId" integer NOT NULL, "groupIndex" integer NOT NULL, "chainId" integer NOT NULL, "l1Tokens" jsonb NOT NULL, "bundleLpFees" jsonb NOT NULL, "netSendAmounts" jsonb NOT NULL, "runningBalances" jsonb NOT NULL, "caller" character varying NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_rootBundleExecuted_chain_leaf_groupIdx_txHash" UNIQUE ("chainId", "leafId", "groupIndex", "transactionHash"), CONSTRAINT "PK_a3b0c39415b0b42afa7bd78075e" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."root_bundle_disputed" ("id" SERIAL NOT NULL, "disputer" character varying NOT NULL, "requestTime" TIMESTAMP NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_rootBundleDisputed_txHash" UNIQUE ("transactionHash"), CONSTRAINT "PK_93937e629b5c5c1471049bce3c4" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "bundle_block_range" ("id" SERIAL NOT NULL, "bundleId" integer NOT NULL, "chainId" integer NOT NULL, "startBlock" integer NOT NULL, "endBlock" integer NOT NULL, CONSTRAINT "UK_bundleBlockRange_bundleId_chainId" UNIQUE ("bundleId", "chainId"), CONSTRAINT "PK_903331c592ac44aaf237755fd8b" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TYPE "public"."bundle_event_type_enum" AS ENUM('deposit', 'expiredDeposit', 'fill', 'slowFill', 'unexecutableSlowFill')`, - ); - await queryRunner.query( - `CREATE TABLE "bundle_event" ("id" SERIAL NOT NULL, "bundleId" integer NOT NULL, "type" "public"."bundle_event_type_enum" NOT NULL, "relayHash" character varying NOT NULL, "repaymentChainId" integer, CONSTRAINT "UK_bundleEvent_eventType_relayHash" UNIQUE ("type", "relayHash"), CONSTRAINT "PK_d633122fa4b52768e1b588bddee" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TYPE "public"."bundle_status_enum" AS ENUM('Proposed', 'Canceled', 'Disputed', 'Executed')`, - ); - await queryRunner.query( - `CREATE TABLE "bundle" ("id" SERIAL NOT NULL, "poolRebalanceRoot" character varying NOT NULL, "relayerRefundRoot" character varying NOT NULL, "slowRelayRoot" character varying NOT NULL, "proposalId" integer NOT NULL, "cancelationId" integer, "disputeId" integer, "status" "public"."bundle_status_enum" NOT NULL DEFAULT 'Proposed', "eventsAssociated" boolean NOT NULL DEFAULT false, CONSTRAINT "REL_a8344aa79161a63b6397cc8006" UNIQUE ("proposalId"), CONSTRAINT "REL_d728c78130d07f0857ca9d08f4" UNIQUE ("cancelationId"), CONSTRAINT "REL_707430c410bc8a69af9432bedf" UNIQUE ("disputeId"), CONSTRAINT "PK_637e3f87e837d6532109c198dea" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."proposed_root_bundle" ("id" SERIAL NOT NULL, "challengePeriodEndTimestamp" TIMESTAMP NOT NULL, "poolRebalanceLeafCount" integer NOT NULL, "bundleEvaluationBlockNumbers" jsonb NOT NULL, "chainIds" jsonb NOT NULL, "poolRebalanceRoot" character varying NOT NULL, "relayerRefundRoot" character varying NOT NULL, "slowRelayRoot" character varying NOT NULL, "proposer" character varying NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_proposedRootBundle_txHash" UNIQUE ("transactionHash"), CONSTRAINT "PK_61f8cd3411bf1976fdb13dca607" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."set_pool_rebalance_route" ("id" SERIAL NOT NULL, "destinationChainId" integer NOT NULL, "l1Token" character varying NOT NULL, "destinationToken" character varying NOT NULL, "blockNumber" integer NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_spr_transactionHash_transactionIndex_logIndex" UNIQUE ("transactionHash", "transactionIndex", "logIndex"), CONSTRAINT "PK_93edcf0d94f29e5cd34513baf9d" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."v3_funds_deposited" ("id" SERIAL NOT NULL, "relayHash" character varying NOT NULL, "depositId" integer NOT NULL, "originChainId" integer NOT NULL, "destinationChainId" integer NOT NULL, "fromLiteChain" boolean NOT NULL, "toLiteChain" boolean NOT NULL, "depositor" character varying NOT NULL, "recipient" character varying NOT NULL, "inputToken" character varying NOT NULL, "inputAmount" character varying NOT NULL, "outputToken" character varying NOT NULL, "outputAmount" character varying NOT NULL, "message" character varying NOT NULL, "exclusiveRelayer" character varying NOT NULL, "exclusivityDeadline" TIMESTAMP, "fillDeadline" TIMESTAMP NOT NULL, "quoteTimestamp" TIMESTAMP NOT NULL, "quoteBlockNumber" integer NOT NULL, "integratorId" character varying, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_v3FundsDeposited_depositId_originChainId" UNIQUE ("depositId", "originChainId"), CONSTRAINT "PK_7fb4637d005c1caba823aefdbd1" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TYPE "evm"."filled_v3_relay_filltype_enum" AS ENUM('0', '1', '2')`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."filled_v3_relay" ("id" SERIAL NOT NULL, "relayHash" character varying NOT NULL, "depositId" integer NOT NULL, "originChainId" integer NOT NULL, "destinationChainId" integer NOT NULL, "depositor" character varying NOT NULL, "recipient" character varying NOT NULL, "inputToken" character varying NOT NULL, "inputAmount" character varying NOT NULL, "outputToken" character varying NOT NULL, "outputAmount" character varying NOT NULL, "message" character varying NOT NULL, "exclusiveRelayer" character varying NOT NULL, "exclusivityDeadline" TIMESTAMP, "fillDeadline" TIMESTAMP NOT NULL, "updatedRecipient" character varying NOT NULL, "updatedMessage" character varying NOT NULL, "updatedOutputAmount" character varying NOT NULL, "fillType" "evm"."filled_v3_relay_filltype_enum" NOT NULL, "relayer" character varying NOT NULL, "repaymentChainId" integer NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_filledV3Relay_relayHash" UNIQUE ("relayHash"), CONSTRAINT "PK_8f1cc6f89a5ed042e3ed258d400" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."requested_v3_slow_fill" ("id" SERIAL NOT NULL, "relayHash" character varying NOT NULL, "depositId" integer NOT NULL, "originChainId" integer NOT NULL, "destinationChainId" integer NOT NULL, "depositor" character varying NOT NULL, "recipient" character varying NOT NULL, "inputToken" character varying NOT NULL, "inputAmount" character varying NOT NULL, "outputToken" character varying NOT NULL, "outputAmount" character varying NOT NULL, "message" character varying NOT NULL, "exclusiveRelayer" character varying NOT NULL, "exclusivityDeadline" TIMESTAMP, "fillDeadline" TIMESTAMP NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_requestedV3SlowFill_relayHash" UNIQUE ("relayHash"), CONSTRAINT "PK_ef6d61ccd9e937b8a798ad82d3c" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."requested_speed_up_v3_deposit" ("id" SERIAL NOT NULL, "originChainId" integer NOT NULL, "depositId" integer NOT NULL, "depositor" character varying NOT NULL, "updatedRecipient" character varying NOT NULL, "updatedMessage" character varying NOT NULL, "updatedOutputAmount" character varying NOT NULL, "depositorSignature" character varying NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "finalised" boolean NOT NULL, "blockNumber" integer NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_speedUpV3_depositId_originChain_txHash_logIdx" UNIQUE ("depositId", "originChainId", "transactionHash", "logIndex"), CONSTRAINT "PK_92225be4f84268c26a66b4eaa17" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."relayed_root_bundle" ("id" SERIAL NOT NULL, "chainId" integer NOT NULL, "rootBundleId" integer NOT NULL, "relayerRefundRoot" character varying NOT NULL, "slowRelayRoot" character varying NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_relayedRootBundle_chainId_rootBundleId" UNIQUE ("chainId", "rootBundleId"), CONSTRAINT "PK_b95beeb64004ee791b2195aaa80" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."executed_relayer_refund_root" ("id" SERIAL NOT NULL, "chainId" integer NOT NULL, "rootBundleId" integer NOT NULL, "leafId" integer NOT NULL, "l2TokenAddress" character varying NOT NULL, "amountToReturn" character varying NOT NULL, "refundAmounts" jsonb NOT NULL, "refundAddresses" jsonb NOT NULL, "caller" character varying NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_executedRelayerRefundRoot_chain_rootBundle_leaf" UNIQUE ("chainId", "rootBundleId", "leafId"), CONSTRAINT "PK_9785720b5a11005f37d894fd412" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."tokens_bridged" ("id" SERIAL NOT NULL, "chainId" integer NOT NULL, "leafId" integer NOT NULL, "l2TokenAddress" character varying NOT NULL, "amountToReturn" character varying NOT NULL, "caller" character varying NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_tokensBridged_chain_leaf_l2Token_txHash" UNIQUE ("chainId", "leafId", "l2TokenAddress", "transactionHash"), CONSTRAINT "PK_ca5a436f7fabd6c700cb7327415" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TYPE "public"."relay_hash_info_status_enum" AS ENUM('unfilled', 'filled', 'slowFillRequested', 'slowFilled', 'expired', 'refunded')`, - ); - await queryRunner.query( - `CREATE TABLE "relay_hash_info" ("id" SERIAL NOT NULL, "relayHash" character varying NOT NULL, "depositId" integer NOT NULL, "originChainId" integer NOT NULL, "destinationChainId" integer NOT NULL, "depositTxHash" character varying, "depositEventId" integer, "fillTxHash" character varying, "fillEventId" integer, "slowFillRequestEventId" integer, "fillDeadline" TIMESTAMP NOT NULL, "status" "public"."relay_hash_info_status_enum" NOT NULL DEFAULT 'unfilled', "depositRefundTxHash" character varying, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_relayHashInfo_relayHash" UNIQUE ("relayHash"), CONSTRAINT "REL_4e5fd1998c43638a6e836a3636" UNIQUE ("depositEventId"), CONSTRAINT "REL_8aec45003aaa82a8550b9a1535" UNIQUE ("fillEventId"), CONSTRAINT "REL_37cf938a3a02547d23e967867a" UNIQUE ("slowFillRequestEventId"), CONSTRAINT "PK_cb69f68900aa0ce2756f103692f" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "webhook_request" ("id" character varying NOT NULL, "url" character varying NOT NULL, "filter" character varying NOT NULL, "clientId" text, CONSTRAINT "PK_67a7784045de2d1b7139b611b93" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "webhook_client" ("name" character varying NOT NULL, "id" SERIAL NOT NULL, "apiKey" character varying NOT NULL, "domains" jsonb NOT NULL, CONSTRAINT "UQ_242a96416f14915efcdecda3bd8" UNIQUE ("apiKey"), CONSTRAINT "PK_f7330fb3bdb2e19534eae691d44" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "bundle_executions" ("bundleId" integer NOT NULL, "executionId" integer NOT NULL, CONSTRAINT "PK_d781edd9ee5d58baab40ec27585" PRIMARY KEY ("bundleId", "executionId"))`, - ); - await queryRunner.query( - `CREATE INDEX "IDX_7ac73eb154127e8d68b3a881e7" ON "bundle_executions" ("bundleId") `, - ); - await queryRunner.query( - `CREATE INDEX "IDX_9551b3ed2ed4a9cf286637e51f" ON "bundle_executions" ("executionId") `, - ); - await queryRunner.query( - `ALTER TABLE "bundle_block_range" ADD CONSTRAINT "FK_f5c43af2e3e71193090d4f37285" FOREIGN KEY ("bundleId") REFERENCES "bundle"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "bundle_event" ADD CONSTRAINT "FK_62dcd4f6f0d1713fab0c8542dba" FOREIGN KEY ("bundleId") REFERENCES "bundle"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "bundle" ADD CONSTRAINT "FK_bundle_rootBundleProposeId" FOREIGN KEY ("proposalId") REFERENCES "evm"."proposed_root_bundle"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "bundle" ADD CONSTRAINT "FK_bundle_rootBundleCanceledId" FOREIGN KEY ("cancelationId") REFERENCES "evm"."root_bundle_canceled"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "bundle" ADD CONSTRAINT "FK_bundle_rootBundleDisputedId" FOREIGN KEY ("disputeId") REFERENCES "evm"."root_bundle_disputed"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "relay_hash_info" ADD CONSTRAINT "FK_relayHashInfo_depositEventId" FOREIGN KEY ("depositEventId") REFERENCES "evm"."v3_funds_deposited"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "relay_hash_info" ADD CONSTRAINT "FK_relayHashInfo_fillEventId" FOREIGN KEY ("fillEventId") REFERENCES "evm"."filled_v3_relay"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "relay_hash_info" ADD CONSTRAINT "FK_relayHashInfo_slowFillRequestEventId" FOREIGN KEY ("slowFillRequestEventId") REFERENCES "evm"."requested_v3_slow_fill"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "bundle_executions" ADD CONSTRAINT "FK_7ac73eb154127e8d68b3a881e7c" FOREIGN KEY ("bundleId") REFERENCES "bundle"("id") ON DELETE CASCADE ON UPDATE CASCADE`, - ); - await queryRunner.query( - `ALTER TABLE "bundle_executions" ADD CONSTRAINT "FK_9551b3ed2ed4a9cf286637e51fa" FOREIGN KEY ("executionId") REFERENCES "evm"."root_bundle_executed"("id") ON DELETE CASCADE ON UPDATE CASCADE`, - ); - } - - public async down(queryRunner: QueryRunner): Promise { - await queryRunner.query( - `ALTER TABLE "bundle_executions" DROP CONSTRAINT "FK_9551b3ed2ed4a9cf286637e51fa"`, - ); - await queryRunner.query( - `ALTER TABLE "bundle_executions" DROP CONSTRAINT "FK_7ac73eb154127e8d68b3a881e7c"`, - ); - await queryRunner.query( - `ALTER TABLE "relay_hash_info" DROP CONSTRAINT "FK_relayHashInfo_slowFillRequestEventId"`, - ); - await queryRunner.query( - `ALTER TABLE "relay_hash_info" DROP CONSTRAINT "FK_relayHashInfo_fillEventId"`, - ); - await queryRunner.query( - `ALTER TABLE "relay_hash_info" DROP CONSTRAINT "FK_relayHashInfo_depositEventId"`, - ); - await queryRunner.query( - `ALTER TABLE "bundle" DROP CONSTRAINT "FK_bundle_rootBundleDisputedId"`, - ); - await queryRunner.query( - `ALTER TABLE "bundle" DROP CONSTRAINT "FK_bundle_rootBundleCanceledId"`, - ); - await queryRunner.query( - `ALTER TABLE "bundle" DROP CONSTRAINT "FK_bundle_rootBundleProposeId"`, - ); - await queryRunner.query( - `ALTER TABLE "bundle_event" DROP CONSTRAINT "FK_62dcd4f6f0d1713fab0c8542dba"`, - ); - await queryRunner.query( - `ALTER TABLE "bundle_block_range" DROP CONSTRAINT "FK_f5c43af2e3e71193090d4f37285"`, - ); - await queryRunner.query( - `DROP INDEX "public"."IDX_9551b3ed2ed4a9cf286637e51f"`, - ); - await queryRunner.query( - `DROP INDEX "public"."IDX_7ac73eb154127e8d68b3a881e7"`, - ); - await queryRunner.query(`DROP TABLE "bundle_executions"`); - await queryRunner.query(`DROP TABLE "webhook_client"`); - await queryRunner.query(`DROP TABLE "webhook_request"`); - await queryRunner.query(`DROP TABLE "relay_hash_info"`); - await queryRunner.query(`DROP TYPE "public"."relay_hash_info_status_enum"`); - await queryRunner.query(`DROP TABLE "evm"."tokens_bridged"`); - await queryRunner.query(`DROP TABLE "evm"."executed_relayer_refund_root"`); - await queryRunner.query(`DROP TABLE "evm"."relayed_root_bundle"`); - await queryRunner.query(`DROP TABLE "evm"."requested_speed_up_v3_deposit"`); - await queryRunner.query(`DROP TABLE "evm"."requested_v3_slow_fill"`); - await queryRunner.query(`DROP TABLE "evm"."filled_v3_relay"`); - await queryRunner.query(`DROP TYPE "evm"."filled_v3_relay_filltype_enum"`); - await queryRunner.query(`DROP TABLE "evm"."v3_funds_deposited"`); - await queryRunner.query(`DROP TABLE "evm"."set_pool_rebalance_route"`); - await queryRunner.query(`DROP TABLE "evm"."proposed_root_bundle"`); - await queryRunner.query(`DROP TABLE "bundle"`); - await queryRunner.query(`DROP TYPE "public"."bundle_status_enum"`); - await queryRunner.query(`DROP TABLE "bundle_event"`); - await queryRunner.query(`DROP TYPE "public"."bundle_event_type_enum"`); - await queryRunner.query(`DROP TABLE "bundle_block_range"`); - await queryRunner.query(`DROP TABLE "evm"."root_bundle_disputed"`); - await queryRunner.query(`DROP TABLE "evm"."root_bundle_executed"`); - await queryRunner.query(`DROP TABLE "evm"."root_bundle_canceled"`); - } -} diff --git a/packages/indexer-database/src/migrations/1732297474910-WebhookClient.ts b/packages/indexer-database/src/migrations/1732297474910-WebhookClient.ts new file mode 100644 index 0000000..3cfaf4c --- /dev/null +++ b/packages/indexer-database/src/migrations/1732297474910-WebhookClient.ts @@ -0,0 +1,21 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class WebhookClient1732297474910 implements MigrationInterface { + name = "WebhookClient1732297474910"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + CREATE TABLE "webhook_client" ( + "id" SERIAL NOT NULL, + "name" character varying NOT NULL, + "apiKey" character varying NOT NULL, + "domains" jsonb NOT NULL, + CONSTRAINT "UK_webhook_client_api_key" UNIQUE ("apiKey"), + CONSTRAINT "PK_f7330fb3bdb2e19534eae691d44" PRIMARY KEY ("id") + )`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE "webhook_client"`); + } +} diff --git a/packages/indexer-database/src/migrations/1732297948190-WebhookRequest.ts b/packages/indexer-database/src/migrations/1732297948190-WebhookRequest.ts new file mode 100644 index 0000000..8c8190d --- /dev/null +++ b/packages/indexer-database/src/migrations/1732297948190-WebhookRequest.ts @@ -0,0 +1,22 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class WebhookRequest1732297948190 implements MigrationInterface { + name = "WebhookRequest1732297948190"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + CREATE TABLE "webhook_request" ( + "id" character varying NOT NULL, + "clientId" integer NOT NULL, + "url" character varying NOT NULL, + "filter" character varying NOT NULL, + "createdAt" TIMESTAMP NOT NULL DEFAULT now(), + CONSTRAINT "UK_webhook_request_clientId_filter" UNIQUE ("clientId", "filter"), + CONSTRAINT "PK_67a7784045de2d1b7139b611b93" PRIMARY KEY ("id") + )`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE "webhook_request"`); + } +} diff --git a/packages/indexer-database/src/migrations/1732300892947-Webhook.ts b/packages/indexer-database/src/migrations/1732300892947-Webhook.ts deleted file mode 100644 index 0cc6601..0000000 --- a/packages/indexer-database/src/migrations/1732300892947-Webhook.ts +++ /dev/null @@ -1,175 +0,0 @@ -import { MigrationInterface, QueryRunner } from "typeorm"; - -export class Webhook1732300892947 implements MigrationInterface { - name = "Webhook1732300892947"; - - public async up(queryRunner: QueryRunner): Promise { - await queryRunner.query( - `CREATE TABLE "evm"."root_bundle_canceled" ("id" SERIAL NOT NULL, "caller" character varying NOT NULL, "requestTime" TIMESTAMP NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_rootBundleCanceled_txHash" UNIQUE ("transactionHash"), CONSTRAINT "PK_97a84a7224c26da0f0d5dc24b6a" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."root_bundle_executed" ("id" SERIAL NOT NULL, "leafId" integer NOT NULL, "groupIndex" integer NOT NULL, "chainId" integer NOT NULL, "l1Tokens" jsonb NOT NULL, "bundleLpFees" jsonb NOT NULL, "netSendAmounts" jsonb NOT NULL, "runningBalances" jsonb NOT NULL, "caller" character varying NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_rootBundleExecuted_chain_leaf_groupIdx_txHash" UNIQUE ("chainId", "leafId", "groupIndex", "transactionHash"), CONSTRAINT "PK_a3b0c39415b0b42afa7bd78075e" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."root_bundle_disputed" ("id" SERIAL NOT NULL, "disputer" character varying NOT NULL, "requestTime" TIMESTAMP NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_rootBundleDisputed_txHash" UNIQUE ("transactionHash"), CONSTRAINT "PK_93937e629b5c5c1471049bce3c4" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "bundle_block_range" ("id" SERIAL NOT NULL, "bundleId" integer NOT NULL, "chainId" integer NOT NULL, "startBlock" integer NOT NULL, "endBlock" integer NOT NULL, CONSTRAINT "UK_bundleBlockRange_bundleId_chainId" UNIQUE ("bundleId", "chainId"), CONSTRAINT "PK_903331c592ac44aaf237755fd8b" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TYPE "public"."bundle_event_type_enum" AS ENUM('deposit', 'expiredDeposit', 'fill', 'slowFill', 'unexecutableSlowFill')`, - ); - await queryRunner.query( - `CREATE TABLE "bundle_event" ("id" SERIAL NOT NULL, "bundleId" integer NOT NULL, "type" "public"."bundle_event_type_enum" NOT NULL, "relayHash" character varying NOT NULL, "repaymentChainId" integer, CONSTRAINT "UK_bundleEvent_eventType_relayHash" UNIQUE ("type", "relayHash"), CONSTRAINT "PK_d633122fa4b52768e1b588bddee" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TYPE "public"."bundle_status_enum" AS ENUM('Proposed', 'Canceled', 'Disputed', 'Executed')`, - ); - await queryRunner.query( - `CREATE TABLE "bundle" ("id" SERIAL NOT NULL, "poolRebalanceRoot" character varying NOT NULL, "relayerRefundRoot" character varying NOT NULL, "slowRelayRoot" character varying NOT NULL, "proposalId" integer NOT NULL, "cancelationId" integer, "disputeId" integer, "status" "public"."bundle_status_enum" NOT NULL DEFAULT 'Proposed', "eventsAssociated" boolean NOT NULL DEFAULT false, CONSTRAINT "REL_a8344aa79161a63b6397cc8006" UNIQUE ("proposalId"), CONSTRAINT "REL_d728c78130d07f0857ca9d08f4" UNIQUE ("cancelationId"), CONSTRAINT "REL_707430c410bc8a69af9432bedf" UNIQUE ("disputeId"), CONSTRAINT "PK_637e3f87e837d6532109c198dea" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."proposed_root_bundle" ("id" SERIAL NOT NULL, "challengePeriodEndTimestamp" TIMESTAMP NOT NULL, "poolRebalanceLeafCount" integer NOT NULL, "bundleEvaluationBlockNumbers" jsonb NOT NULL, "chainIds" jsonb NOT NULL, "poolRebalanceRoot" character varying NOT NULL, "relayerRefundRoot" character varying NOT NULL, "slowRelayRoot" character varying NOT NULL, "proposer" character varying NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_proposedRootBundle_txHash" UNIQUE ("transactionHash"), CONSTRAINT "PK_61f8cd3411bf1976fdb13dca607" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."set_pool_rebalance_route" ("id" SERIAL NOT NULL, "destinationChainId" integer NOT NULL, "l1Token" character varying NOT NULL, "destinationToken" character varying NOT NULL, "blockNumber" integer NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_spr_transactionHash_transactionIndex_logIndex" UNIQUE ("transactionHash", "transactionIndex", "logIndex"), CONSTRAINT "PK_93edcf0d94f29e5cd34513baf9d" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."v3_funds_deposited" ("id" SERIAL NOT NULL, "relayHash" character varying NOT NULL, "depositId" integer NOT NULL, "originChainId" integer NOT NULL, "destinationChainId" integer NOT NULL, "fromLiteChain" boolean NOT NULL, "toLiteChain" boolean NOT NULL, "depositor" character varying NOT NULL, "recipient" character varying NOT NULL, "inputToken" character varying NOT NULL, "inputAmount" character varying NOT NULL, "outputToken" character varying NOT NULL, "outputAmount" character varying NOT NULL, "message" character varying NOT NULL, "exclusiveRelayer" character varying NOT NULL, "exclusivityDeadline" TIMESTAMP, "fillDeadline" TIMESTAMP NOT NULL, "quoteTimestamp" TIMESTAMP NOT NULL, "quoteBlockNumber" integer NOT NULL, "integratorId" character varying, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_v3FundsDeposited_depositId_originChainId" UNIQUE ("depositId", "originChainId"), CONSTRAINT "PK_7fb4637d005c1caba823aefdbd1" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TYPE "evm"."filled_v3_relay_filltype_enum" AS ENUM('0', '1', '2')`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."filled_v3_relay" ("id" SERIAL NOT NULL, "relayHash" character varying NOT NULL, "depositId" integer NOT NULL, "originChainId" integer NOT NULL, "destinationChainId" integer NOT NULL, "depositor" character varying NOT NULL, "recipient" character varying NOT NULL, "inputToken" character varying NOT NULL, "inputAmount" character varying NOT NULL, "outputToken" character varying NOT NULL, "outputAmount" character varying NOT NULL, "message" character varying NOT NULL, "exclusiveRelayer" character varying NOT NULL, "exclusivityDeadline" TIMESTAMP, "fillDeadline" TIMESTAMP NOT NULL, "updatedRecipient" character varying NOT NULL, "updatedMessage" character varying NOT NULL, "updatedOutputAmount" character varying NOT NULL, "fillType" "evm"."filled_v3_relay_filltype_enum" NOT NULL, "relayer" character varying NOT NULL, "repaymentChainId" integer NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_filledV3Relay_relayHash" UNIQUE ("relayHash"), CONSTRAINT "PK_8f1cc6f89a5ed042e3ed258d400" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."requested_v3_slow_fill" ("id" SERIAL NOT NULL, "relayHash" character varying NOT NULL, "depositId" integer NOT NULL, "originChainId" integer NOT NULL, "destinationChainId" integer NOT NULL, "depositor" character varying NOT NULL, "recipient" character varying NOT NULL, "inputToken" character varying NOT NULL, "inputAmount" character varying NOT NULL, "outputToken" character varying NOT NULL, "outputAmount" character varying NOT NULL, "message" character varying NOT NULL, "exclusiveRelayer" character varying NOT NULL, "exclusivityDeadline" TIMESTAMP, "fillDeadline" TIMESTAMP NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_requestedV3SlowFill_relayHash" UNIQUE ("relayHash"), CONSTRAINT "PK_ef6d61ccd9e937b8a798ad82d3c" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."requested_speed_up_v3_deposit" ("id" SERIAL NOT NULL, "originChainId" integer NOT NULL, "depositId" integer NOT NULL, "depositor" character varying NOT NULL, "updatedRecipient" character varying NOT NULL, "updatedMessage" character varying NOT NULL, "updatedOutputAmount" character varying NOT NULL, "depositorSignature" character varying NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "finalised" boolean NOT NULL, "blockNumber" integer NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_speedUpV3_depositId_originChain_txHash_logIdx" UNIQUE ("depositId", "originChainId", "transactionHash", "logIndex"), CONSTRAINT "PK_92225be4f84268c26a66b4eaa17" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."relayed_root_bundle" ("id" SERIAL NOT NULL, "chainId" integer NOT NULL, "rootBundleId" integer NOT NULL, "relayerRefundRoot" character varying NOT NULL, "slowRelayRoot" character varying NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_relayedRootBundle_chainId_rootBundleId" UNIQUE ("chainId", "rootBundleId"), CONSTRAINT "PK_b95beeb64004ee791b2195aaa80" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."executed_relayer_refund_root" ("id" SERIAL NOT NULL, "chainId" integer NOT NULL, "rootBundleId" integer NOT NULL, "leafId" integer NOT NULL, "l2TokenAddress" character varying NOT NULL, "amountToReturn" character varying NOT NULL, "refundAmounts" jsonb NOT NULL, "refundAddresses" jsonb NOT NULL, "caller" character varying NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_executedRelayerRefundRoot_chain_rootBundle_leaf" UNIQUE ("chainId", "rootBundleId", "leafId"), CONSTRAINT "PK_9785720b5a11005f37d894fd412" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "evm"."tokens_bridged" ("id" SERIAL NOT NULL, "chainId" integer NOT NULL, "leafId" integer NOT NULL, "l2TokenAddress" character varying NOT NULL, "amountToReturn" character varying NOT NULL, "caller" character varying NOT NULL, "transactionHash" character varying NOT NULL, "transactionIndex" integer NOT NULL, "logIndex" integer NOT NULL, "blockNumber" integer NOT NULL, "finalised" boolean NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_tokensBridged_chain_leaf_l2Token_txHash" UNIQUE ("chainId", "leafId", "l2TokenAddress", "transactionHash"), CONSTRAINT "PK_ca5a436f7fabd6c700cb7327415" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TYPE "public"."relay_hash_info_status_enum" AS ENUM('unfilled', 'filled', 'slowFillRequested', 'slowFilled', 'expired', 'refunded')`, - ); - await queryRunner.query( - `CREATE TABLE "relay_hash_info" ("id" SERIAL NOT NULL, "relayHash" character varying NOT NULL, "depositId" integer NOT NULL, "originChainId" integer NOT NULL, "destinationChainId" integer NOT NULL, "depositTxHash" character varying, "depositEventId" integer, "fillTxHash" character varying, "fillEventId" integer, "slowFillRequestEventId" integer, "fillDeadline" TIMESTAMP NOT NULL, "status" "public"."relay_hash_info_status_enum" NOT NULL DEFAULT 'unfilled', "depositRefundTxHash" character varying, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "UK_relayHashInfo_relayHash" UNIQUE ("relayHash"), CONSTRAINT "REL_4e5fd1998c43638a6e836a3636" UNIQUE ("depositEventId"), CONSTRAINT "REL_8aec45003aaa82a8550b9a1535" UNIQUE ("fillEventId"), CONSTRAINT "REL_37cf938a3a02547d23e967867a" UNIQUE ("slowFillRequestEventId"), CONSTRAINT "PK_cb69f68900aa0ce2756f103692f" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "webhook_request" ("id" character varying NOT NULL, "url" character varying NOT NULL, "filter" character varying NOT NULL, "clientId" text, CONSTRAINT "PK_67a7784045de2d1b7139b611b93" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "webhook_client" ("name" character varying NOT NULL, "id" SERIAL NOT NULL, "apiKey" character varying NOT NULL, "domains" jsonb NOT NULL, CONSTRAINT "UQ_242a96416f14915efcdecda3bd8" UNIQUE ("apiKey"), CONSTRAINT "PK_f7330fb3bdb2e19534eae691d44" PRIMARY KEY ("id"))`, - ); - await queryRunner.query( - `CREATE TABLE "bundle_executions" ("bundleId" integer NOT NULL, "executionId" integer NOT NULL, CONSTRAINT "PK_d781edd9ee5d58baab40ec27585" PRIMARY KEY ("bundleId", "executionId"))`, - ); - await queryRunner.query( - `CREATE INDEX "IDX_7ac73eb154127e8d68b3a881e7" ON "bundle_executions" ("bundleId") `, - ); - await queryRunner.query( - `CREATE INDEX "IDX_9551b3ed2ed4a9cf286637e51f" ON "bundle_executions" ("executionId") `, - ); - await queryRunner.query( - `ALTER TABLE "bundle_block_range" ADD CONSTRAINT "FK_f5c43af2e3e71193090d4f37285" FOREIGN KEY ("bundleId") REFERENCES "bundle"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "bundle_event" ADD CONSTRAINT "FK_62dcd4f6f0d1713fab0c8542dba" FOREIGN KEY ("bundleId") REFERENCES "bundle"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "bundle" ADD CONSTRAINT "FK_bundle_rootBundleProposeId" FOREIGN KEY ("proposalId") REFERENCES "evm"."proposed_root_bundle"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "bundle" ADD CONSTRAINT "FK_bundle_rootBundleCanceledId" FOREIGN KEY ("cancelationId") REFERENCES "evm"."root_bundle_canceled"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "bundle" ADD CONSTRAINT "FK_bundle_rootBundleDisputedId" FOREIGN KEY ("disputeId") REFERENCES "evm"."root_bundle_disputed"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "relay_hash_info" ADD CONSTRAINT "FK_relayHashInfo_depositEventId" FOREIGN KEY ("depositEventId") REFERENCES "evm"."v3_funds_deposited"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "relay_hash_info" ADD CONSTRAINT "FK_relayHashInfo_fillEventId" FOREIGN KEY ("fillEventId") REFERENCES "evm"."filled_v3_relay"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "relay_hash_info" ADD CONSTRAINT "FK_relayHashInfo_slowFillRequestEventId" FOREIGN KEY ("slowFillRequestEventId") REFERENCES "evm"."requested_v3_slow_fill"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, - ); - await queryRunner.query( - `ALTER TABLE "bundle_executions" ADD CONSTRAINT "FK_7ac73eb154127e8d68b3a881e7c" FOREIGN KEY ("bundleId") REFERENCES "bundle"("id") ON DELETE CASCADE ON UPDATE CASCADE`, - ); - await queryRunner.query( - `ALTER TABLE "bundle_executions" ADD CONSTRAINT "FK_9551b3ed2ed4a9cf286637e51fa" FOREIGN KEY ("executionId") REFERENCES "evm"."root_bundle_executed"("id") ON DELETE CASCADE ON UPDATE CASCADE`, - ); - } - - public async down(queryRunner: QueryRunner): Promise { - await queryRunner.query( - `ALTER TABLE "bundle_executions" DROP CONSTRAINT "FK_9551b3ed2ed4a9cf286637e51fa"`, - ); - await queryRunner.query( - `ALTER TABLE "bundle_executions" DROP CONSTRAINT "FK_7ac73eb154127e8d68b3a881e7c"`, - ); - await queryRunner.query( - `ALTER TABLE "relay_hash_info" DROP CONSTRAINT "FK_relayHashInfo_slowFillRequestEventId"`, - ); - await queryRunner.query( - `ALTER TABLE "relay_hash_info" DROP CONSTRAINT "FK_relayHashInfo_fillEventId"`, - ); - await queryRunner.query( - `ALTER TABLE "relay_hash_info" DROP CONSTRAINT "FK_relayHashInfo_depositEventId"`, - ); - await queryRunner.query( - `ALTER TABLE "bundle" DROP CONSTRAINT "FK_bundle_rootBundleDisputedId"`, - ); - await queryRunner.query( - `ALTER TABLE "bundle" DROP CONSTRAINT "FK_bundle_rootBundleCanceledId"`, - ); - await queryRunner.query( - `ALTER TABLE "bundle" DROP CONSTRAINT "FK_bundle_rootBundleProposeId"`, - ); - await queryRunner.query( - `ALTER TABLE "bundle_event" DROP CONSTRAINT "FK_62dcd4f6f0d1713fab0c8542dba"`, - ); - await queryRunner.query( - `ALTER TABLE "bundle_block_range" DROP CONSTRAINT "FK_f5c43af2e3e71193090d4f37285"`, - ); - await queryRunner.query( - `DROP INDEX "public"."IDX_9551b3ed2ed4a9cf286637e51f"`, - ); - await queryRunner.query( - `DROP INDEX "public"."IDX_7ac73eb154127e8d68b3a881e7"`, - ); - await queryRunner.query(`DROP TABLE "bundle_executions"`); - await queryRunner.query(`DROP TABLE "webhook_client"`); - await queryRunner.query(`DROP TABLE "webhook_request"`); - await queryRunner.query(`DROP TABLE "relay_hash_info"`); - await queryRunner.query(`DROP TYPE "public"."relay_hash_info_status_enum"`); - await queryRunner.query(`DROP TABLE "evm"."tokens_bridged"`); - await queryRunner.query(`DROP TABLE "evm"."executed_relayer_refund_root"`); - await queryRunner.query(`DROP TABLE "evm"."relayed_root_bundle"`); - await queryRunner.query(`DROP TABLE "evm"."requested_speed_up_v3_deposit"`); - await queryRunner.query(`DROP TABLE "evm"."requested_v3_slow_fill"`); - await queryRunner.query(`DROP TABLE "evm"."filled_v3_relay"`); - await queryRunner.query(`DROP TYPE "evm"."filled_v3_relay_filltype_enum"`); - await queryRunner.query(`DROP TABLE "evm"."v3_funds_deposited"`); - await queryRunner.query(`DROP TABLE "evm"."set_pool_rebalance_route"`); - await queryRunner.query(`DROP TABLE "evm"."proposed_root_bundle"`); - await queryRunner.query(`DROP TABLE "bundle"`); - await queryRunner.query(`DROP TYPE "public"."bundle_status_enum"`); - await queryRunner.query(`DROP TABLE "bundle_event"`); - await queryRunner.query(`DROP TYPE "public"."bundle_event_type_enum"`); - await queryRunner.query(`DROP TABLE "bundle_block_range"`); - await queryRunner.query(`DROP TABLE "evm"."root_bundle_disputed"`); - await queryRunner.query(`DROP TABLE "evm"."root_bundle_executed"`); - await queryRunner.query(`DROP TABLE "evm"."root_bundle_canceled"`); - } -} diff --git a/packages/indexer-database/src/migrations/1732310112989-WebhookRequest.ts b/packages/indexer-database/src/migrations/1732310112989-WebhookRequest.ts new file mode 100644 index 0000000..bbf1d23 --- /dev/null +++ b/packages/indexer-database/src/migrations/1732310112989-WebhookRequest.ts @@ -0,0 +1,15 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class WebhookRequest1732310112989 implements MigrationInterface { + name = "WebhookRequest1732310112989"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE INDEX "IX_webhook_request_filter" ON "webhook_request" ("filter") `, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX "public"."IX_webhook_request_filter"`); + } +} diff --git a/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts b/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts index 4a8c740..c5721c5 100644 --- a/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts +++ b/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts @@ -1,6 +1,7 @@ import { Logger } from "winston"; import { DataSource } from "@repo/indexer-database"; +import { eventProcessorManager } from "@repo/webhooks"; import { Config } from "../../parseEnv"; import { HubPoolRepository } from "../../database/HubPoolRepository"; @@ -39,6 +40,7 @@ export class AcrossIndexerManager { private spokePoolRepository: SpokePoolRepository, private redisCache: RedisCache, private indexerQueuesService: IndexerQueuesService, + private webhookWriteFn?: eventProcessorManager.WebhookWriteFn, ) {} public async start() { @@ -93,7 +95,12 @@ export class AcrossIndexerManager { this.hubPoolClientFactory, this.spokePoolClientFactory, this.spokePoolRepository, - new SpokePoolProcessor(this.postgres, this.logger, chainId), + new SpokePoolProcessor( + this.postgres, + this.logger, + chainId, + this.webhookWriteFn, + ), this.indexerQueuesService, ); const spokePoolIndexer = new Indexer( diff --git a/packages/indexer/src/main.ts b/packages/indexer/src/main.ts index 56847a5..3767271 100644 --- a/packages/indexer/src/main.ts +++ b/packages/indexer/src/main.ts @@ -57,10 +57,10 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { // Call write to kick off webhook calls const { write } = WebhookFactory( { - requireApiKey: false, enabledWebhooks: [WebhookTypes.DepositStatus], + enabledWebhookRequestWorkers: true, }, - { postgres, logger }, + { postgres, logger, redis }, ); // Retry providers factory const retryProvidersFactory = new RetryProvidersFactory( @@ -96,6 +96,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { new SpokePoolRepository(postgres, logger), redisCache, indexerQueuesService, + write, ); const bundleServicesManager = new BundleServicesManager( config, diff --git a/packages/indexer/src/services/spokePoolProcessor.ts b/packages/indexer/src/services/spokePoolProcessor.ts index b913d3b..9c9bcb5 100644 --- a/packages/indexer/src/services/spokePoolProcessor.ts +++ b/packages/indexer/src/services/spokePoolProcessor.ts @@ -1,11 +1,14 @@ import { utils } from "@across-protocol/sdk"; +import winston from "winston"; + import { DataSource, entities, utils as dbUtils, SaveQueryResultType, } from "@repo/indexer-database"; -import winston from "winston"; +import { WebhookTypes, eventProcessorManager } from "@repo/webhooks"; + import { RelayStatus } from "../../../indexer-database/dist/src/entities"; import { StoreEventsResult } from "../data-indexing/service/SpokePoolIndexerDataHandler"; @@ -22,6 +25,7 @@ export class SpokePoolProcessor { private readonly postgres: DataSource, private readonly logger: winston.Logger, private readonly chainId: number, + private readonly webhookWriteFn?: eventProcessorManager.WebhookWriteFn, ) {} public async process(events: StoreEventsResult) { @@ -37,9 +41,19 @@ export class SpokePoolProcessor { SpokePoolEvents.V3FundsDeposited, [...newDeposits, ...updatedDeposits], ); - // TODO: for new deposits, notify status change to unfilled - // here... + // Notify webhook of new deposits + newDeposits.forEach((deposit) => { + this.webhookWriteFn?.({ + type: WebhookTypes.DepositStatus, + event: { + depositId: deposit.id, + originChainId: deposit.originChainId, + depositTxHash: deposit.transactionHash, + status: RelayStatus.Unfilled, + }, + }); + }); const newSlowFillRequests = dbUtils.filterSaveQueryResults( events.slowFillRequests, SaveQueryResultType.Inserted, @@ -52,8 +66,19 @@ export class SpokePoolProcessor { SpokePoolEvents.RequestedV3SlowFill, [...newSlowFillRequests, ...updatedSlowFillRequests], ); - // TODO: for new slow fill requests, notify status change to slow fill requested - // here... + + // Notify webhook of new slow fill requests + newSlowFillRequests.forEach((deposit) => { + this.webhookWriteFn?.({ + type: WebhookTypes.DepositStatus, + event: { + depositId: deposit.id, + originChainId: deposit.originChainId, + depositTxHash: deposit.transactionHash, + status: RelayStatus.SlowFillRequested, + }, + }); + }); const newFills = dbUtils.filterSaveQueryResults( events.fills, @@ -67,16 +92,38 @@ export class SpokePoolProcessor { ...newFills, ...updatedFills, ]); - // TODO: for new fills, notify status change to filled - // here... + + // Notify webhook of new fills + newFills.forEach((fill) => { + this.webhookWriteFn?.({ + type: WebhookTypes.DepositStatus, + event: { + depositId: fill.depositId, + originChainId: fill.originChainId, + depositTxHash: fill.transactionHash, + status: RelayStatus.Filled, + }, + }); + }); const expiredDeposits = await this.updateExpiredRelays(); // TODO: for expired deposits, notify status change to expired // here... const refundedDeposits = await this.updateRefundedDepositsStatus(); - // TODO: for refunded deposits, notify status change to refunded - // here... + + // Notify webhook of refunded deposits + refundedDeposits.forEach((deposit) => { + this.webhookWriteFn?.({ + type: WebhookTypes.DepositStatus, + event: { + depositId: deposit.depositId, + originChainId: deposit.originChainId, + depositTxHash: deposit.depositTxHash, + status: RelayStatus.Refunded, + }, + }); + }); } /** diff --git a/packages/webhooks/README.md b/packages/webhooks/README.md index 189a516..0ec7ad7 100644 --- a/packages/webhooks/README.md +++ b/packages/webhooks/README.md @@ -11,7 +11,6 @@ The `factory.ts` file provides a `WebhookFactory` function that sets up the webh To use the `WebhookFactory`, you need to provide a configuration object and dependencies: - **Config**: This object should include: - - requireApiKey: boolean; - Should registration of new webhooks require an api key - enabledWebhooks: WebhookTypes[]; - What event processors should be enabled, like 'DepositStatus' - **Dependencies**: This object should include: @@ -27,8 +26,8 @@ import { DataSource } from "@repo/indexer-database"; const webhooks = WebhookFactory( { - requireApiKey: false, - enableWebhooks: [WebhookTypes.DepositStatus], + enabledWebhooks: [WebhookTypes.DepositStatus], + enabledWebhookRequestWorkers: false, }, { postgres, logger }, ); diff --git a/packages/webhooks/package.json b/packages/webhooks/package.json index 580c468..7ecb36e 100644 --- a/packages/webhooks/package.json +++ b/packages/webhooks/package.json @@ -22,8 +22,10 @@ "license": "ISC", "dependencies": { "@repo/indexer-database": "workspace:*", + "bullmq": "^5.12.12", "express": "^4.19.2", "express-bearer-token": "^3.0.0", + "ioredis": "^5.4.1", "redis": "^4.7.0", "superstruct": "2.0.3-1", "uuid": "^11.0.3" diff --git a/packages/webhooks/src/adapter/messaging/WebhookRequestWorker.ts b/packages/webhooks/src/adapter/messaging/WebhookRequestWorker.ts new file mode 100644 index 0000000..2956077 --- /dev/null +++ b/packages/webhooks/src/adapter/messaging/WebhookRequestWorker.ts @@ -0,0 +1,84 @@ +import Redis from "ioredis"; +import winston from "winston"; +import { Job, Worker } from "bullmq"; + +import { DataSource, entities } from "@repo/indexer-database"; + +import { WebhooksQueues } from "./WebhooksQueuesService"; +import { WebhookTypes } from "../../factory"; +import { WebhookWriteFn } from "../../eventProcessorManager"; + +export type WebhookRequestQueueJob = { + webhookRequestId: string; + depositTxHash: string; + originChainId: number; +}; + +export class WebhookRequestWorker { + public worker: Worker; + + constructor( + private redis: Redis, + private postgres: DataSource, + private logger: winston.Logger, + private webhookWriteFn: WebhookWriteFn, + ) { + this.setWorker(); + } + + public setWorker() { + this.worker = new Worker( + WebhooksQueues.WebhookRequest, + async (job: Job) => { + try { + this.logger.debug({ + at: "WebhookRequestWorker", + message: `Processing job for webhook request ${job.data.webhookRequestId}`, + }); + await this.run(job.data); + } catch (error) { + this.logger.error({ + at: "WebhookRequestWorker", + message: `Error processing job for webhook request ${job.data.webhookRequestId}`, + error, + }); + throw error; + } + }, + { connection: this.redis, concurrency: 10 }, + ); + } + + private async run(webhookRequestJob: WebhookRequestQueueJob) { + const { depositTxHash, originChainId } = webhookRequestJob; + const relayHashInfo = await this.postgres + .getRepository(entities.RelayHashInfo) + .findOne({ + where: { + depositTxHash, + originChainId, + }, + }); + if (!relayHashInfo) { + this.logger.warn({ + at: "WebhookRequestWorker", + message: `Relay hash info not found for webhook request ${webhookRequestJob.webhookRequestId}`, + webhookRequestJob, + }); + return; + } + this.webhookWriteFn({ + type: WebhookTypes.DepositStatus, + event: { + originChainId: relayHashInfo.originChainId, + depositTxHash: relayHashInfo.depositTxHash, + depositId: relayHashInfo.depositId, + status: relayHashInfo.status, + }, + }); + } + + public async close() { + return this.worker.close(); + } +} diff --git a/packages/webhooks/src/adapter/messaging/WebhooksQueuesService.ts b/packages/webhooks/src/adapter/messaging/WebhooksQueuesService.ts new file mode 100644 index 0000000..f25cd1d --- /dev/null +++ b/packages/webhooks/src/adapter/messaging/WebhooksQueuesService.ts @@ -0,0 +1,53 @@ +import Redis from "ioredis"; +import { Queue, JobsOptions, BulkJobOptions } from "bullmq"; + +export enum WebhooksQueues { + WebhookRequest = "WebhookRequest", +} + +export class WebhooksQueuesService { + private queues = {} as Record; + + constructor(private connection: Redis) { + this.initializeQueues(); + } + + private initializeQueues() { + const queueNames = Object.values(WebhooksQueues); + queueNames.forEach( + (queueName) => + (this.queues[queueName] = new Queue(queueName, { + connection: this.connection, + defaultJobOptions: { + attempts: Number.MAX_SAFE_INTEGER, + removeOnComplete: true, + }, + })), + ); + } + + public async publishMessage( + queue: WebhooksQueues, + message: T, + options: JobsOptions = {}, + ) { + const q = this.queues[queue]; + if (q) { + await q.add(queue, message, options); + } + } + + public async publishMessagesBulk( + queue: WebhooksQueues, + jobName: string, + messages: T[], + options: BulkJobOptions = {}, + ) { + const q = this.queues[queue]; + if (q) { + await q.addBulk( + messages.map((m) => ({ name: jobName, data: m, opts: options })), + ); + } + } +} diff --git a/packages/webhooks/src/database/webhookClientRepository.ts b/packages/webhooks/src/database/webhookClientRepository.ts index ec4c540..32035ae 100644 --- a/packages/webhooks/src/database/webhookClientRepository.ts +++ b/packages/webhooks/src/database/webhookClientRepository.ts @@ -19,7 +19,7 @@ export class WebhookClientRepository { await this.repository.insert(client); } - public async unregisterClient(clientId: string): Promise { + public async unregisterClient(clientId: number): Promise { const existingClient = await this.repository.findOne({ where: { id: clientId }, }); @@ -30,7 +30,7 @@ export class WebhookClientRepository { } public async getClient( - clientId: string, + clientId: number, ): Promise { return ( (await this.repository.findOne({ where: { id: clientId } })) ?? undefined @@ -48,4 +48,11 @@ export class WebhookClientRepository { assert(result, "Invalid api key"); return result; } + + public async getWebhookClientById( + id: number, + ): Promise { + const client = await this.repository.findOne({ where: { id } }); + return client ?? undefined; + } } diff --git a/packages/webhooks/src/database/webhookRequestRepository.ts b/packages/webhooks/src/database/webhookRequestRepository.ts index e6abc76..c632b33 100644 --- a/packages/webhooks/src/database/webhookRequestRepository.ts +++ b/packages/webhooks/src/database/webhookRequestRepository.ts @@ -9,7 +9,9 @@ export class WebhookRequestRepository { this.repository = this.dataSource.getRepository(entities.WebhookRequest); } - public async register(webhook: entities.WebhookRequest): Promise { + public async register( + webhook: Omit, + ): Promise { const existingWebhook = await this.repository.findOne({ where: { id: webhook.id }, }); @@ -47,6 +49,13 @@ export class WebhookRequestRepository { return this.repository.find({ where: { filter } }); } + public async findWebhookRequestsByFilterAndClient( + filter: string, + clientId: number, + ): Promise { + return this.repository.find({ where: { filter, clientId } }); + } + public async hasWebhookRequest(webhookId: string): Promise { const result = await this.repository.findOne({ where: { id: webhookId }, diff --git a/packages/webhooks/src/eventProcessorManager.ts b/packages/webhooks/src/eventProcessorManager.ts index 899f0ad..12aa53a 100644 --- a/packages/webhooks/src/eventProcessorManager.ts +++ b/packages/webhooks/src/eventProcessorManager.ts @@ -1,13 +1,23 @@ -import { WebhookClientRepository } from "./database/webhookClientRepository"; -import { DataSource, entities } from "@repo/indexer-database"; import { Logger } from "winston"; import assert from "assert"; + +import { DataSource, entities } from "@repo/indexer-database"; + +import { WebhookClientRepository } from "./database/webhookClientRepository"; import { JSONValue, IEventProcessor } from "./types"; +import { + WebhooksQueues, + WebhooksQueuesService, +} from "./adapter/messaging/WebhooksQueuesService"; +import { WebhookTypes } from "./factory"; +import { WebhookRequestQueueJob } from "./adapter/messaging/WebhookRequestWorker"; export type EventProcessorRecord = Record; -type EventType = { - type: string; +export type WebhookWriteFn = (event: EventType) => void; + +export type EventType = { + type: WebhookTypes; event: JSONValue; }; @@ -18,21 +28,22 @@ export type Config = { export type Dependencies = { postgres: DataSource; logger: Logger; + webhooksQueuesService: WebhooksQueuesService; }; export class EventProcessorManager { private logger: Logger; private clientRepository: WebhookClientRepository; - private processors = new Map(); + private processors = new Map(); + private webhooksQueuesService: WebhooksQueuesService; - constructor( - private config: Config, - deps: Dependencies, - ) { + constructor(deps: Dependencies) { this.logger = deps.logger; this.clientRepository = new WebhookClientRepository(deps.postgres); // Initialize the client manager + this.webhooksQueuesService = deps.webhooksQueuesService; } + // Register a new type of webhook processor able to be written to - public registerEventProcessor(name: string, webhook: IEventProcessor) { + public registerEventProcessor(name: WebhookTypes, webhook: IEventProcessor) { this.logger.debug( `Attempting to register event processor with name: ${name}`, ); @@ -46,15 +57,12 @@ export class EventProcessorManager { ); } - private getEventProcessor(name: string) { + private getEventProcessor(name: WebhookTypes) { const eventProcessor = this.processors.get(name); - assert( - eventProcessor, - "EventProcessor does not exist by type: ${event.type}", - ); + assert(eventProcessor, `EventProcessor does not exist by type: ${name}`); return eventProcessor; } - write = (event: EventType): void => { + write: WebhookWriteFn = (event: EventType): void => { const webhook = this.getEventProcessor(event.type); webhook.write(event.event); }; @@ -62,35 +70,39 @@ export class EventProcessorManager { async registerWebhook( id: string, params: { type: string; url: string; filter: JSONValue }, - apiKey?: string, + apiKey: string, ) { this.logger.debug( `Attempting to register webhook of type: ${params.type} with URL: ${params.url}`, ); - let client; - if (this.config.requireApiKey) { - if (apiKey === undefined) throw new Error("Api Key required"); - client = await this.clientRepository.getClientByApiKey(apiKey); - const urlDomain = new URL(params.url).hostname; - const isDevDomain = - urlDomain === "localhost" || urlDomain.startsWith("127."); - if (!isDevDomain) { - const isDomainValid = client.domains.includes(urlDomain); - assert( - isDomainValid, - "The base URL of the provided webhook does not match any of the client domains", - ); - } - } - const webhook = this.getEventProcessor(params.type); - const result = await webhook.register( + const client = await this.clientRepository.getClientByApiKey(apiKey); + const urlDomain = new URL(params.url).hostname; + const isDomainValid = client.domains.includes(urlDomain); + assert( + isDomainValid, + "The base URL of the provided webhook does not match any of the client domains", + ); + assert((params.filter as any).depositTxHash, "depositTxHash is required"); + assert((params.filter as any).originChainId, "originChainId is required"); + const webhook = this.getEventProcessor(params.type as WebhookTypes); + const webhookRequestId = await webhook.register( id, params.url, params.filter, - client?.id, + client.id, + ); + this.logger.debug( + `Successfully registered webhook with ID: ${webhookRequestId}`, + ); + this.webhooksQueuesService.publishMessage( + WebhooksQueues.WebhookRequest, + { + webhookRequestId, + depositTxHash: (params.filter as any).depositTxHash, + originChainId: (params.filter as any).originChainId, + }, ); - this.logger.debug(`Successfully registered webhook with ID: ${result}`); - return result; + return webhookRequestId; } // TODO: gaurd this with api key @@ -101,7 +113,7 @@ export class EventProcessorManager { this.logger.debug( `Attempting to unregister webhook of type: ${params.type} with ID: ${params.id}`, ); - const webhook = this.getEventProcessor(params.type); + const webhook = this.getEventProcessor(params.type as WebhookTypes); await webhook.unregister(params.id); this.logger.debug( `Successfully unregistered webhook with ID: ${params.id}`, diff --git a/packages/webhooks/src/eventProcessors/depositStatus.ts b/packages/webhooks/src/eventProcessors/depositStatus.ts index 9f0fd5d..889a119 100644 --- a/packages/webhooks/src/eventProcessors/depositStatus.ts +++ b/packages/webhooks/src/eventProcessors/depositStatus.ts @@ -1,11 +1,13 @@ import assert from "assert"; import * as ss from "superstruct"; +import { Logger } from "winston"; import { DataSource, entities } from "@repo/indexer-database"; import { WebhookRequestRepository } from "../database/webhookRequestRepository"; import { customId } from "../utils"; import { IEventProcessor, NotificationPayload } from "../types"; +import { WebhookClientRepository } from "../database/webhookClientRepository"; export const DepositStatusEvent = ss.object({ originChainId: ss.number(), @@ -24,18 +26,22 @@ export type DepositStatusFilter = ss.Infer; export type Dependencies = { notify: (params: NotificationPayload) => void; postgres: DataSource; + logger: Logger; }; export class DepositStatusProcessor implements IEventProcessor { private webhookRequests: WebhookRequestRepository; + private webhookClientsRepository: WebhookClientRepository; private notify: (params: NotificationPayload) => void; - private postgres: DataSource; + private logger: Logger; + constructor( deps: Dependencies, private type: string = "DepositStatus", ) { this.webhookRequests = new WebhookRequestRepository(deps.postgres); + this.webhookClientsRepository = new WebhookClientRepository(deps.postgres); this.notify = deps.notify; - this.postgres = deps.postgres; + this.logger = deps.logger; } private async _write(event: DepositStatusEvent): Promise { const filter = customId( @@ -43,37 +49,67 @@ export class DepositStatusProcessor implements IEventProcessor { event.originChainId, event.depositTxHash, ); - const hooks = + const webhookRequests = await this.webhookRequests.findWebhookRequestsByFilter(filter); + const uniqueClientIds = [ + ...new Set(webhookRequests.map((hook) => hook.clientId)), + ]; + const clients = await Promise.all( + uniqueClientIds.map((id) => + this.webhookClientsRepository.getWebhookClientById(id), + ), + ); + const clientsMap = clients + .filter((client) => client !== undefined) + .reduce( + (acc, client) => { + acc[client.id] = client; + return acc; + }, + {} as Record, + ); + //TODO: unregister any hooks where event has reached terminal state - await Promise.all( - hooks.map((hook) => { + webhookRequests.forEach((hook) => { + const client = clientsMap[hook.clientId]; + if (client) { this.notify({ url: hook.url, - data: event, + data: { ...event, webhookRequestId: hook.id }, + apiKey: client.apiKey, }); - }), - ); + } else { + this.logger.error({ + at: "DepositStatusProcessor::_write", + message: `Client not found for webhook request ${hook.id}`, + webhookRequest: hook, + }); + } + }); } + write(e: unknown) { this._write(ss.create(e, DepositStatusEvent)).catch((err) => console.error(err), ); } + private async _register( id: string, url: string, params: DepositStatusFilter, - clientId?: string, + clientId: number, ): Promise { const filter = customId( this.type, - clientId ?? "", params.originChainId, params.depositTxHash, ); const existingFilters = - await this.webhookRequests.findWebhookRequestsByFilter(filter); + await this.webhookRequests.findWebhookRequestsByFilterAndClient( + filter, + clientId, + ); assert( existingFilters.length === 0, "Webhook already exists for this filter", @@ -84,21 +120,9 @@ export class DepositStatusProcessor implements IEventProcessor { url, clientId, }); - const relayHashInfoRepository = this.postgres.getRepository( - entities.RelayHashInfo, - ); - const relayHashInfo = await relayHashInfoRepository.findOne({ - where: params, - }); - if (relayHashInfo) - this._write({ - depositId: relayHashInfo.depositId, - status: relayHashInfo.status, - ...params, - }); return id; } - async register(id: string, url: string, params: unknown, clientId?: string) { + async register(id: string, url: string, params: unknown, clientId: number) { return this._register( id, url, diff --git a/packages/webhooks/src/factory.ts b/packages/webhooks/src/factory.ts index 7a99fc0..1d5023e 100644 --- a/packages/webhooks/src/factory.ts +++ b/packages/webhooks/src/factory.ts @@ -1,38 +1,42 @@ import assert from "assert"; -import { EventProcessorManager } from "./eventProcessorManager"; -import { DataSource } from "@repo/indexer-database"; import { Logger } from "winston"; +import { Redis } from "ioredis"; + +import { DataSource } from "@repo/indexer-database"; +import { EventProcessorManager } from "./eventProcessorManager"; import { WebhookNotifier } from "./notifier"; import { DepositStatusProcessor } from "./eventProcessors"; import { WebhookRouter } from "./router"; +import { WebhooksQueuesService } from "./adapter/messaging/WebhooksQueuesService"; +import { WebhookRequestWorker } from "./adapter/messaging/WebhookRequestWorker"; export enum WebhookTypes { DepositStatus = "DepositStatus", } export type Config = { - requireApiKey: boolean; enabledWebhooks: WebhookTypes[]; + enabledWebhookRequestWorkers: boolean; }; type Dependencies = { postgres: DataSource; + redis: Redis; logger: Logger; }; export function WebhookFactory(config: Config, deps: Dependencies) { - const { logger, postgres } = deps; + const { logger, postgres, redis } = deps; const notifier = new WebhookNotifier({ logger }); assert( config.enabledWebhooks.length, "No webhooks enabled, specify one in config", ); - const eventProcessorManager = new EventProcessorManager( - config ?? { requireApiKey: false }, - { - postgres, - logger, - }, - ); + const webhooksQueuesService = new WebhooksQueuesService(redis); + const eventProcessorManager = new EventProcessorManager({ + postgres, + logger, + webhooksQueuesService, + }); config.enabledWebhooks.forEach((name) => { switch (name) { // add more webhook types here @@ -42,6 +46,7 @@ export function WebhookFactory(config: Config, deps: Dependencies) { new DepositStatusProcessor( { postgres, + logger, notify: notifier.notify, }, WebhookTypes.DepositStatus, @@ -54,6 +59,14 @@ export function WebhookFactory(config: Config, deps: Dependencies) { } } }); + if (config.enabledWebhookRequestWorkers) { + new WebhookRequestWorker( + redis, + postgres, + logger, + eventProcessorManager.write, + ); + } const router = WebhookRouter({ eventProcessorManager }); return { write: eventProcessorManager.write, diff --git a/packages/webhooks/src/notifier.ts b/packages/webhooks/src/notifier.ts index 810ed2b..7c89666 100644 --- a/packages/webhooks/src/notifier.ts +++ b/packages/webhooks/src/notifier.ts @@ -10,7 +10,9 @@ export type Dependencies = { export class BaseNotifier { private logger: Logger; - constructor(private deps: Dependencies) {} + constructor(private deps: Dependencies) { + this.logger = deps.logger; + } public notify = (payload: NotificationPayload): void => { this.deps.notify(payload).catch((error) => { diff --git a/packages/webhooks/src/router.ts b/packages/webhooks/src/router.ts index 60f677c..7e4aa4b 100644 --- a/packages/webhooks/src/router.ts +++ b/packages/webhooks/src/router.ts @@ -27,18 +27,18 @@ export function WebhookRouter(deps: Dependencies): express.Router { router.post( "/webhook", async ( - req: express.Request & { token?: string }, + req: express.Request, res: express.Response, next: express.NextFunction, ) => { try { const parsedBody = RegistrationParams.create(req.body); + const token = req.token; + if (!token) { + throw new Error("API Key required"); + } const id = uuidv4(); - await deps.eventProcessorManager.registerWebhook( - id, - parsedBody, - req.token, - ); + await deps.eventProcessorManager.registerWebhook(id, parsedBody, token); res.status(201).send(id); } catch (error) { next(error); diff --git a/packages/webhooks/src/types.ts b/packages/webhooks/src/types.ts index dedc0a5..cfb74bb 100644 --- a/packages/webhooks/src/types.ts +++ b/packages/webhooks/src/types.ts @@ -6,7 +6,7 @@ export interface IEventProcessor { id: string, url: string, params: JSONValue, - clientId?: string, + clientId: number, ): Promise; unregister(id: string): Promise; } @@ -22,6 +22,7 @@ export type JSONValue = export type NotificationPayload = { url: string; data: JSONValue; + apiKey: string; }; export const RegistrationParams = ss.object({ diff --git a/packages/webhooks/src/utils.ts b/packages/webhooks/src/utils.ts index dd6a0ca..2bde3ed 100644 --- a/packages/webhooks/src/utils.ts +++ b/packages/webhooks/src/utils.ts @@ -1,10 +1,11 @@ import { NotificationPayload } from "./types"; export async function post(params: NotificationPayload): Promise { - const { url, data } = params; + const { url, data, apiKey } = params; const response = await fetch(url, { method: "POST", headers: { "Content-Type": "application/json", + Authorization: `Bearer ${apiKey}`, }, body: JSON.stringify(data), }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1d1c6eb..3172ff3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -509,12 +509,18 @@ importers: '@repo/indexer-database': specifier: workspace:* version: link:../indexer-database + bullmq: + specifier: ^5.12.12 + version: 5.12.14 express: specifier: ^4.19.2 version: 4.21.1 express-bearer-token: specifier: ^3.0.0 version: 3.0.0 + ioredis: + specifier: ^5.4.1 + version: 5.4.1 redis: specifier: ^4.7.0 version: 4.7.0