Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(webhooks): allow client registration through env #118

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/indexer-api/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ export async function Main(
const postgres = await connectToDatabase(postgresConfig, logger);
const redisConfig = Indexer.parseRedisConfig(env);
const redis = await initializeRedis(redisConfig, logger);
const webhooks = Webhooks.WebhookFactory(
const webhooks = await Webhooks.WebhookFactory(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually a factory is a class that has a builder function on it. Is this Factory a function?

{
enabledWebhooks: [Webhooks.WebhookTypes.DepositStatus],
enabledWebhookRequestWorkers: false,
// indexer will register clients
clients: [],
},
{ postgres, logger, redis },
);
Expand Down
1 change: 1 addition & 0 deletions packages/indexer-database/src/entities/WebhookClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Entity, Column, PrimaryGeneratedColumn, Unique } from "typeorm";

@Entity()
@Unique("UK_webhook_client_api_key", ["apiKey"])
@Unique("UK_webhook_client_name", ["name"])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wnated to make a unique constraint for name, i dont know if i did this right because i have no idea what UK_webhook means

export class WebhookClient {
@PrimaryGeneratedColumn()
id: number;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class Webhook1732730161339 implements MigrationInterface {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did the migration after adding uniquye name constraint, again dont know if this is correct

name = "Webhook1732730161339";

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "webhook_client" ADD CONSTRAINT "UQ_a08bea1c3eba7711301141ae001" UNIQUE ("name")`,
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "webhook_client" DROP CONSTRAINT "UQ_a08bea1c3eba7711301141ae001"`,
);
}
}
12 changes: 5 additions & 7 deletions packages/indexer/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,11 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
const redisCache = new RedisCache(redis);
const postgres = await connectToDatabase(postgresConfig, logger);
// Call write to kick off webhook calls
const { write } = WebhookFactory(
{
enabledWebhooks: [WebhookTypes.DepositStatus],
enabledWebhookRequestWorkers: true,
},
{ postgres, logger, redis },
);
const { write } = await WebhookFactory(config.webhookConfig, {
postgres,
logger,
redis,
});
// Retry providers factory
const retryProvidersFactory = new RetryProvidersFactory(
redisCache,
Expand Down
12 changes: 12 additions & 0 deletions packages/indexer/src/parseEnv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ import * as s from "superstruct";
import { DatabaseConfig } from "@repo/indexer-database";
import { getNoTtlBlockDistance } from "./web3/constants";
import { assert } from "@repo/error-handling";
import {
Config as WebhooksConfig,
WebhookTypes,
parseWebhookClientsFromString,
} from "@repo/webhooks";

export type Config = {
redisConfig: RedisConfig;
Expand All @@ -12,6 +17,7 @@ export type Config = {
enableBundleEventsProcessor: boolean;
enableBundleIncludedEventsService: boolean;
enableBundleBuilder: boolean;
webhookConfig: WebhooksConfig;
};
export type RedisConfig = {
host: string;
Expand Down Expand Up @@ -182,6 +188,11 @@ export function envToConfig(env: Env): Config {
`SPOKEPOOL_CHAINS_ENABLED=${chainId} but did not find any corresponding RPC_PROVIDER_URLS_${chainId}`,
);
});
const webhookConfig = {
enabledWebhooks: [WebhookTypes.DepositStatus],
enabledWebhookRequestWorkers: true,
clients: parseWebhookClientsFromString(env.WEBHOOK_CLIENTS ?? "[]"),
};
return {
redisConfig,
postgresConfig,
Expand All @@ -191,5 +202,6 @@ export function envToConfig(env: Env): Config {
enableBundleEventsProcessor,
enableBundleIncludedEventsService,
enableBundleBuilder,
webhookConfig,
};
}
43 changes: 34 additions & 9 deletions packages/webhooks/src/database/webhookClientRepository.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { entities, DataSource } from "@repo/indexer-database";
import { exists } from "../utils";
import assert from "assert";

// This class is intended to store integration clients allowed to use the webhook service.
Expand All @@ -9,14 +10,30 @@ export class WebhookClientRepository {
this.repository = this.dataSource.getRepository(entities.WebhookClient);
}

public async registerClient(client: entities.WebhookClient): Promise<void> {
const existingClient = await this.repository.findOne({
where: { id: client.id },
});
if (existingClient) {
throw new Error(`Client with id ${client.id} already exists.`);
public async registerClient(
client: Omit<entities.WebhookClient, "id">,
): Promise<entities.WebhookClient> {
assert(
!(await this.hasClientByName(client.name)),
"Client with that name already exists",
);
const result = await this.repository.insert(client);
return result.raw[0];
}
public async upsertClient(
client: Omit<entities.WebhookClient, "id">,
): Promise<entities.WebhookClient> {
if (await this.hasClientByName(client.name)) {
return this.updateClientByName(client);
} else {
return this.registerClient(client);
}
await this.repository.insert(client);
}
public async updateClientByName(
client: Omit<entities.WebhookClient, "id">,
): Promise<entities.WebhookClient> {
const result = await this.repository.update({ name: client.name }, client);
return result.raw[0];
}

public async unregisterClient(clientId: number): Promise<void> {
Expand All @@ -40,12 +57,20 @@ export class WebhookClientRepository {
public async listClients(): Promise<entities.WebhookClient[]> {
return this.repository.find();
}

public async hasClientByName(name: string): Promise<boolean> {
const result = await this.repository.findOne({ where: { name } });
return exists(result);
}
public async getClientByName(name: string): Promise<entities.WebhookClient> {
const result = await this.repository.findOne({ where: { name } });
assert(result, `Client by name: ${name} does not exist`);
return result;
}
public async getClientByApiKey(
apiKey: string,
): Promise<entities.WebhookClient> {
const result = await this.repository.findOne({ where: { apiKey } });
assert(result, "Invalid api key");
assert(result, `Client by apiKey: ${apiKey} does not exist`);
return result;
}

Expand Down
22 changes: 9 additions & 13 deletions packages/webhooks/src/eventProcessorManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export type Dependencies = {
postgres: DataSource;
logger: Logger;
webhooksQueuesService: WebhooksQueuesService;
clientRepository: WebhookClientRepository;
};
export class EventProcessorManager {
private logger: Logger;
Expand All @@ -38,8 +39,8 @@ export class EventProcessorManager {

constructor(deps: Dependencies) {
this.logger = deps.logger;
this.clientRepository = new WebhookClientRepository(deps.postgres); // Initialize the client manager
this.webhooksQueuesService = deps.webhooksQueuesService;
this.clientRepository = deps.clientRepository;
}

// Register a new type of webhook processor able to be written to
Expand Down Expand Up @@ -76,12 +77,13 @@ export class EventProcessorManager {
`Attempting to register webhook of type: ${params.type} with URL: ${params.url}`,
);
const client = await this.clientRepository.getClientByApiKey(apiKey);
const urlDomain = new URL(params.url).hostname;
const isDomainValid = client.domains.includes(urlDomain);
assert(
isDomainValid,
"The base URL of the provided webhook does not match any of the client domains",
);
// TODO: Reinable this potentially when we need it, but not great for testing
// const urlDomain = new URL(params.url).hostname;
// const isDomainValid = client.domains.includes(urlDomain);
// assert(
// isDomainValid,
// "The base URL of the provided webhook does not match any of the client domains",
// );
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed domain checking for now

assert((params.filter as any).depositTxHash, "depositTxHash is required");
assert((params.filter as any).originChainId, "originChainId is required");
const webhook = this.getEventProcessor(params.type as WebhookTypes);
Expand Down Expand Up @@ -119,10 +121,4 @@ export class EventProcessorManager {
`Successfully unregistered webhook with ID: ${params.id}`,
);
}

async registerClient(client: entities.WebhookClient) {
this.logger.debug(`Attempting to register client with ID: ${client.id}`);
await this.clientRepository.registerClient(client);
this.logger.debug(`Successfully registered client with ID: ${client.id}`);
}
}
19 changes: 17 additions & 2 deletions packages/webhooks/src/factory.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Logger } from "winston";
import { Redis } from "ioredis";

import { DataSource } from "@repo/indexer-database";
import { DataSource, entities } from "@repo/indexer-database";
import { assert } from "@repo/error-handling";

import { EventProcessorManager } from "./eventProcessorManager";
Expand All @@ -10,6 +10,8 @@ import { DepositStatusProcessor } from "./eventProcessors";
import { WebhookRouter } from "./router";
import { WebhooksQueuesService } from "./adapter/messaging/WebhooksQueuesService";
import { WebhookRequestWorker } from "./adapter/messaging/WebhookRequestWorker";
import { WebhookClientRepository } from "./database/webhookClientRepository";
import { PartialWebhookClients } from "./types";

export enum WebhookTypes {
DepositStatus = "DepositStatus",
Expand All @@ -18,25 +20,38 @@ export enum WebhookTypes {
export type Config = {
enabledWebhooks: WebhookTypes[];
enabledWebhookRequestWorkers: boolean;
clients: PartialWebhookClients;
};
type Dependencies = {
postgres: DataSource;
redis: Redis;
logger: Logger;
};

export function WebhookFactory(config: Config, deps: Dependencies) {
export async function WebhookFactory(config: Config, deps: Dependencies) {
const { logger, postgres, redis } = deps;
const notifier = new WebhookNotifier({ logger });
assert(
config.enabledWebhooks.length,
"No webhooks enabled, specify one in config",
);
const webhooksQueuesService = new WebhooksQueuesService(redis);
const clientRepository = new WebhookClientRepository(postgres);
const eventProcessorManager = new EventProcessorManager({
postgres,
logger,
webhooksQueuesService,
clientRepository,
});
const clientRegistrations = await Promise.all(
config.clients.map((client) => {
return clientRepository.upsertClient(client);
}),
);
logger.info({
message: "Registered webhook api clients",
at: "Webhooks package factory",
clientRegistrations,
});
config.enabledWebhooks.forEach((name) => {
switch (name) {
Expand Down
10 changes: 10 additions & 0 deletions packages/webhooks/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,13 @@ export const UnregisterParams = ss.object({
id: ss.string(),
});
export type UnregisterParams = ss.Infer<typeof UnregisterParams>;

export const PartialWebhookClient = ss.type({
name: ss.string(),
apiKey: ss.string(),
domains: ss.array(ss.string()),
});
export type PartialWebhookClient = ss.Infer<typeof PartialWebhookClient>;

export const PartialWebhookClients = ss.array(PartialWebhookClient);
export type PartialWebhookClients = ss.Infer<typeof PartialWebhookClients>;
10 changes: 9 additions & 1 deletion packages/webhooks/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { NotificationPayload } from "./types";
import { NotificationPayload, PartialWebhookClients } from "./types";
import * as ss from "superstruct";
export async function post(params: NotificationPayload): Promise<void> {
const { url, data, apiKey } = params;
const response = await fetch(url, {
Expand Down Expand Up @@ -42,3 +43,10 @@ export function exists<T>(val: T | null | undefined): val is T {
export function customId(...args: (string | number)[]): string {
return args.join("!");
}

export function parseWebhookClientsFromString(
envStr: string,
): PartialWebhookClients {
const clients = JSON.parse(envStr);
return ss.create(clients, PartialWebhookClients);
}