Skip to content

Commit

Permalink
feat(webhooks): allow client registration through env (#118)
Browse files Browse the repository at this point in the history
Signed-off-by: david <david@umaproject.org>
  • Loading branch information
daywiss authored Nov 27, 2024
1 parent df6c94d commit f0ca94a
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 33 deletions.
4 changes: 3 additions & 1 deletion packages/indexer-api/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ export async function Main(
const postgres = await connectToDatabase(postgresConfig, logger);
const redisConfig = Indexer.parseRedisConfig(env);
const redis = await initializeRedis(redisConfig, logger);
const webhooks = Webhooks.WebhookFactory(
const webhooks = await Webhooks.WebhookFactory(
{
enabledWebhooks: [Webhooks.WebhookTypes.DepositStatus],
enabledWebhookRequestWorkers: false,
// indexer will register clients
clients: [],
},
{ postgres, logger, redis },
);
Expand Down
1 change: 1 addition & 0 deletions packages/indexer-database/src/entities/WebhookClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Entity, Column, PrimaryGeneratedColumn, Unique } from "typeorm";

@Entity()
@Unique("UK_webhook_client_api_key", ["apiKey"])
@Unique("UK_webhook_client_name", ["name"])
export class WebhookClient {
@PrimaryGeneratedColumn()
id: number;
Expand Down
17 changes: 17 additions & 0 deletions packages/indexer-database/src/migrations/1732730161339-Webhook.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class Webhook1732730161339 implements MigrationInterface {
name = "Webhook1732730161339";

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "webhook_client" ADD CONSTRAINT "UQ_a08bea1c3eba7711301141ae001" UNIQUE ("name")`,
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "webhook_client" DROP CONSTRAINT "UQ_a08bea1c3eba7711301141ae001"`,
);
}
}
12 changes: 5 additions & 7 deletions packages/indexer/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,11 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
const redisCache = new RedisCache(redis);
const postgres = await connectToDatabase(postgresConfig, logger);
// Call write to kick off webhook calls
const { write } = WebhookFactory(
{
enabledWebhooks: [WebhookTypes.DepositStatus],
enabledWebhookRequestWorkers: true,
},
{ postgres, logger, redis },
);
const { write } = await WebhookFactory(config.webhookConfig, {
postgres,
logger,
redis,
});
// Retry providers factory
const retryProvidersFactory = new RetryProvidersFactory(
redisCache,
Expand Down
12 changes: 12 additions & 0 deletions packages/indexer/src/parseEnv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ import * as s from "superstruct";
import { DatabaseConfig } from "@repo/indexer-database";
import { getNoTtlBlockDistance } from "./web3/constants";
import { assert } from "@repo/error-handling";
import {
Config as WebhooksConfig,
WebhookTypes,
parseWebhookClientsFromString,
} from "@repo/webhooks";

export type Config = {
redisConfig: RedisConfig;
Expand All @@ -12,6 +17,7 @@ export type Config = {
enableBundleEventsProcessor: boolean;
enableBundleIncludedEventsService: boolean;
enableBundleBuilder: boolean;
webhookConfig: WebhooksConfig;
};
export type RedisConfig = {
host: string;
Expand Down Expand Up @@ -182,6 +188,11 @@ export function envToConfig(env: Env): Config {
`SPOKEPOOL_CHAINS_ENABLED=${chainId} but did not find any corresponding RPC_PROVIDER_URLS_${chainId}`,
);
});
const webhookConfig = {
enabledWebhooks: [WebhookTypes.DepositStatus],
enabledWebhookRequestWorkers: true,
clients: parseWebhookClientsFromString(env.WEBHOOK_CLIENTS ?? "[]"),
};
return {
redisConfig,
postgresConfig,
Expand All @@ -191,5 +202,6 @@ export function envToConfig(env: Env): Config {
enableBundleEventsProcessor,
enableBundleIncludedEventsService,
enableBundleBuilder,
webhookConfig,
};
}
43 changes: 34 additions & 9 deletions packages/webhooks/src/database/webhookClientRepository.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { entities, DataSource } from "@repo/indexer-database";
import { exists } from "../utils";
import assert from "assert";

// This class is intended to store integration clients allowed to use the webhook service.
Expand All @@ -9,14 +10,30 @@ export class WebhookClientRepository {
this.repository = this.dataSource.getRepository(entities.WebhookClient);
}

public async registerClient(client: entities.WebhookClient): Promise<void> {
const existingClient = await this.repository.findOne({
where: { id: client.id },
});
if (existingClient) {
throw new Error(`Client with id ${client.id} already exists.`);
public async registerClient(
client: Omit<entities.WebhookClient, "id">,
): Promise<entities.WebhookClient> {
assert(
!(await this.hasClientByName(client.name)),
"Client with that name already exists",
);
const result = await this.repository.insert(client);
return result.raw[0];
}
public async upsertClient(
client: Omit<entities.WebhookClient, "id">,
): Promise<entities.WebhookClient> {
if (await this.hasClientByName(client.name)) {
return this.updateClientByName(client);
} else {
return this.registerClient(client);
}
await this.repository.insert(client);
}
public async updateClientByName(
client: Omit<entities.WebhookClient, "id">,
): Promise<entities.WebhookClient> {
const result = await this.repository.update({ name: client.name }, client);
return result.raw[0];
}

public async unregisterClient(clientId: number): Promise<void> {
Expand All @@ -40,12 +57,20 @@ export class WebhookClientRepository {
public async listClients(): Promise<entities.WebhookClient[]> {
return this.repository.find();
}

public async hasClientByName(name: string): Promise<boolean> {
const result = await this.repository.findOne({ where: { name } });
return exists(result);
}
public async getClientByName(name: string): Promise<entities.WebhookClient> {
const result = await this.repository.findOne({ where: { name } });
assert(result, `Client by name: ${name} does not exist`);
return result;
}
public async getClientByApiKey(
apiKey: string,
): Promise<entities.WebhookClient> {
const result = await this.repository.findOne({ where: { apiKey } });
assert(result, "Invalid api key");
assert(result, `Client by apiKey: ${apiKey} does not exist`);
return result;
}

Expand Down
22 changes: 9 additions & 13 deletions packages/webhooks/src/eventProcessorManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export type Dependencies = {
postgres: DataSource;
logger: Logger;
webhooksQueuesService: WebhooksQueuesService;
clientRepository: WebhookClientRepository;
};
export class EventProcessorManager {
private logger: Logger;
Expand All @@ -38,8 +39,8 @@ export class EventProcessorManager {

constructor(deps: Dependencies) {
this.logger = deps.logger;
this.clientRepository = new WebhookClientRepository(deps.postgres); // Initialize the client manager
this.webhooksQueuesService = deps.webhooksQueuesService;
this.clientRepository = deps.clientRepository;
}

// Register a new type of webhook processor able to be written to
Expand Down Expand Up @@ -76,12 +77,13 @@ export class EventProcessorManager {
`Attempting to register webhook of type: ${params.type} with URL: ${params.url}`,
);
const client = await this.clientRepository.getClientByApiKey(apiKey);
const urlDomain = new URL(params.url).hostname;
const isDomainValid = client.domains.includes(urlDomain);
assert(
isDomainValid,
"The base URL of the provided webhook does not match any of the client domains",
);
// TODO: Reinable this potentially when we need it, but not great for testing
// const urlDomain = new URL(params.url).hostname;
// const isDomainValid = client.domains.includes(urlDomain);
// assert(
// isDomainValid,
// "The base URL of the provided webhook does not match any of the client domains",
// );
assert((params.filter as any).depositTxHash, "depositTxHash is required");
assert((params.filter as any).originChainId, "originChainId is required");
const webhook = this.getEventProcessor(params.type as WebhookTypes);
Expand Down Expand Up @@ -119,10 +121,4 @@ export class EventProcessorManager {
`Successfully unregistered webhook with ID: ${params.id}`,
);
}

async registerClient(client: entities.WebhookClient) {
this.logger.debug(`Attempting to register client with ID: ${client.id}`);
await this.clientRepository.registerClient(client);
this.logger.debug(`Successfully registered client with ID: ${client.id}`);
}
}
19 changes: 17 additions & 2 deletions packages/webhooks/src/factory.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Logger } from "winston";
import { Redis } from "ioredis";

import { DataSource } from "@repo/indexer-database";
import { DataSource, entities } from "@repo/indexer-database";
import { assert } from "@repo/error-handling";

import { EventProcessorManager } from "./eventProcessorManager";
Expand All @@ -10,6 +10,8 @@ import { DepositStatusProcessor } from "./eventProcessors";
import { WebhookRouter } from "./router";
import { WebhooksQueuesService } from "./adapter/messaging/WebhooksQueuesService";
import { WebhookRequestWorker } from "./adapter/messaging/WebhookRequestWorker";
import { WebhookClientRepository } from "./database/webhookClientRepository";
import { PartialWebhookClients } from "./types";

export enum WebhookTypes {
DepositStatus = "DepositStatus",
Expand All @@ -18,25 +20,38 @@ export enum WebhookTypes {
export type Config = {
enabledWebhooks: WebhookTypes[];
enabledWebhookRequestWorkers: boolean;
clients: PartialWebhookClients;
};
type Dependencies = {
postgres: DataSource;
redis: Redis;
logger: Logger;
};

export function WebhookFactory(config: Config, deps: Dependencies) {
export async function WebhookFactory(config: Config, deps: Dependencies) {
const { logger, postgres, redis } = deps;
const notifier = new WebhookNotifier({ logger });
assert(
config.enabledWebhooks.length,
"No webhooks enabled, specify one in config",
);
const webhooksQueuesService = new WebhooksQueuesService(redis);
const clientRepository = new WebhookClientRepository(postgres);
const eventProcessorManager = new EventProcessorManager({
postgres,
logger,
webhooksQueuesService,
clientRepository,
});
const clientRegistrations = await Promise.all(
config.clients.map((client) => {
return clientRepository.upsertClient(client);
}),
);
logger.info({
message: "Registered webhook api clients",
at: "Webhooks package factory",
clientRegistrations,
});
config.enabledWebhooks.forEach((name) => {
switch (name) {
Expand Down
10 changes: 10 additions & 0 deletions packages/webhooks/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,13 @@ export const UnregisterParams = ss.object({
id: ss.string(),
});
export type UnregisterParams = ss.Infer<typeof UnregisterParams>;

export const PartialWebhookClient = ss.type({
name: ss.string(),
apiKey: ss.string(),
domains: ss.array(ss.string()),
});
export type PartialWebhookClient = ss.Infer<typeof PartialWebhookClient>;

export const PartialWebhookClients = ss.array(PartialWebhookClient);
export type PartialWebhookClients = ss.Infer<typeof PartialWebhookClients>;
10 changes: 9 additions & 1 deletion packages/webhooks/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { NotificationPayload } from "./types";
import { NotificationPayload, PartialWebhookClients } from "./types";
import * as ss from "superstruct";
export async function post(params: NotificationPayload): Promise<void> {
const { url, data, apiKey } = params;
const response = await fetch(url, {
Expand Down Expand Up @@ -42,3 +43,10 @@ export function exists<T>(val: T | null | undefined): val is T {
export function customId(...args: (string | number)[]): string {
return args.join("!");
}

export function parseWebhookClientsFromString(
envStr: string,
): PartialWebhookClients {
const clients = JSON.parse(envStr);
return ss.create(clients, PartialWebhookClients);
}

0 comments on commit f0ca94a

Please sign in to comment.