Skip to content

Commit

Permalink
feat(webhooks): add postgres persistence to clients and requests
Browse files Browse the repository at this point in the history
Signed-off-by: david <david@umaproject.org>
  • Loading branch information
daywiss committed Nov 20, 2024
1 parent 2217ea3 commit 5c308e0
Show file tree
Hide file tree
Showing 15 changed files with 178 additions and 210 deletions.
16 changes: 16 additions & 0 deletions packages/indexer-database/src/entities/WebhookClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Entity, PrimaryColumn, Column } from "typeorm";

@Entity()
export class WebhookClient {
@Column()
name: string;

@PrimaryColumn()
id: string;

@Column()
apiKey: string;

@Column("simple-array")
domains: string[];
}
13 changes: 13 additions & 0 deletions packages/indexer-database/src/entities/WebhookRequest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Entity, PrimaryColumn, Column } from "typeorm";

@Entity()
export class WebhookRequest {
@PrimaryColumn()
id: string;

@Column()
url: string;

@Column()
filter: string;
}
3 changes: 3 additions & 0 deletions packages/indexer-database/src/entities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ export * from "./BundleEvent";
export * from "./BundleBlockRange";
export * from "./RootBundleExecutedJoinTable";
export * from "./RelayHashInfo";

export * from "./WebhookRequest";
export * from "./WebhookClient";
3 changes: 2 additions & 1 deletion packages/webhooks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
"express": "^4.19.2",
"express-bearer-token": "^3.0.0",
"redis": "^4.7.0",
"superstruct": "2.0.3-1"
"superstruct": "2.0.3-1",
"typeorm": "^0.3.20"
},
"exports": {
".": "./dist/index.js"
Expand Down
2 changes: 2 additions & 0 deletions packages/webhooks/src/database/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./webhookRequestRepository";
export * from "./webhookClientRepository";
60 changes: 30 additions & 30 deletions packages/webhooks/src/database/webhookClientRepository.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,49 @@
import { AsyncStore } from "../store";

export interface WebhookClient {
id: string;
apiKey: string;
url: string;
domains: string[];
}
import { DataSource } from "typeorm";
import { entities } from "@repo/indexer-database";

// This class is intended to store integration clients allowed to use the webhook service.
export class WebhookClientRepository {
constructor(private store: AsyncStore<WebhookClient>) {}
private repository;

constructor(private dataSource: DataSource) {
this.repository = this.dataSource.getRepository(entities.WebhookClient);
}

public async registerClient(client: WebhookClient): Promise<void> {
if (await this.store.has(client.id)) {
public async registerClient(client: entities.WebhookClient): Promise<void> {
const existingClient = await this.repository.findOne({
where: { id: client.id },
});
if (existingClient) {
throw new Error(`Client with id ${client.id} already exists.`);
}
await this.store.set(client.id, client);
await this.repository.save(client);
}

public async unregisterClient(clientId: string): Promise<void> {
if (!(await this.store.has(clientId))) {
const existingClient = await this.repository.findOne({
where: { id: clientId },
});
if (!existingClient) {
throw new Error(`Client with id ${clientId} does not exist.`);
}
await this.store.delete(clientId);
await this.repository.delete({ id: clientId });
}

public async getClient(clientId: string): Promise<WebhookClient | undefined> {
return this.store.get(clientId);
public async getClient(
clientId: string,
): Promise<entities.WebhookClient | undefined> {
return (
(await this.repository.findOne({ where: { id: clientId } })) ?? undefined
);
}

public async listClients(): Promise<WebhookClient[]> {
const clients: WebhookClient[] = [];
for await (const client of this.store.values()) {
clients.push(client);
}
return clients;
public async listClients(): Promise<entities.WebhookClient[]> {
return this.repository.find();
}

public async findClientsByApiKey(apiKey: string): Promise<WebhookClient[]> {
const clients: WebhookClient[] = [];
for await (const client of this.store.values()) {
if (client.apiKey === apiKey) {
clients.push(client);
}
}
return clients;
public async findClientsByApiKey(
apiKey: string,
): Promise<entities.WebhookClient[]> {
return this.repository.find({ where: { apiKey } });
}
}
44 changes: 24 additions & 20 deletions packages/webhooks/src/database/webhookRequestRepository.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,52 @@
import { AsyncStore } from "../store";
import { DataSource } from "typeorm";
import { WebhookRequest } from "../types";
import { entities } from "@repo/indexer-database";

export class WebhookRequestRepository {
constructor(private store: AsyncStore<WebhookRequest>) {}
private repository;

constructor(private dataSource: DataSource) {
this.repository = this.dataSource.getRepository(entities.WebhookRequest);
}

public async register(webhook: WebhookRequest): Promise<void> {
if (await this.store.has(webhook.id)) {
const existingWebhook = await this.repository.findOne({
where: { id: webhook.id },
});
if (existingWebhook) {
throw new Error(`Webhook with id ${webhook.id} already exists.`);
}
await this.store.set(webhook.id, webhook);
await this.repository.save(webhook);
}

public async unregister(webhookId: string): Promise<void> {
if (!(await this.store.has(webhookId))) {
const existingWebhook = await this.repository.findOne({
where: { id: webhookId },
});
if (!existingWebhook) {
throw new Error(`Webhook with id ${webhookId} does not exist.`);
}
await this.store.delete(webhookId);
await this.repository.delete({ id: webhookId });
}

public async getWebhook(
webhookId: string,
): Promise<WebhookRequest | undefined> {
return this.store.get(webhookId);
return (
(await this.repository.findOne({ where: { id: webhookId } })) ?? undefined
);
}

public async listWebhooks(): Promise<WebhookRequest[]> {
const webhooks: WebhookRequest[] = [];
for await (const webhook of this.store.values()) {
webhooks.push(webhook);
}
return webhooks;
return this.repository.find();
}

public async filterWebhooks(filter: string): Promise<WebhookRequest[]> {
const webhooks: WebhookRequest[] = [];
for await (const webhook of this.store.values()) {
if (webhook.filter === filter) {
webhooks.push(webhook);
}
}
return webhooks;
return this.repository.find({ where: { filter } });
}

public async hasWebhook(webhookId: string): Promise<boolean> {
return this.store.has(webhookId);
const count = await this.repository.count({ where: { id: webhookId } });
return count > 0;
}
}
34 changes: 23 additions & 11 deletions packages/webhooks/src/eventProcessorManager.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import { MemoryStore } from "./store";
import {
WebhookClientRepository,
WebhookClient,
} from "./database/webhookClientRepository";
import { WebhookClientRepository } from "./database/webhookClientRepository";
import { DataSource, entities } from "@repo/indexer-database";
import { Logger } from "winston";
import assert from "assert";
Expand Down Expand Up @@ -33,15 +29,21 @@ export class EventProcessorManager {
deps: Dependencies,
) {
this.logger = deps.logger;
this.clientRepository = new WebhookClientRepository(new MemoryStore()); // Initialize the client manager
this.clientRepository = new WebhookClientRepository(deps.postgres); // Initialize the client manager
}
// Register a new type of webhook processor able to be written to
public registerEventProcessor(name: string, webhook: IEventProcessor) {
this.logger.info(
`Attempting to register event processor with name: ${name}`,
);
assert(
!this.processors.has(name),
`Webhook with that name already exists: ${name}`,
);
this.processors.set(name, webhook);
this.logger.info(
`Successfully registered event processor with name: ${name}`,
);
}

private getEventProcessor(name: string) {
Expand All @@ -61,6 +63,9 @@ export class EventProcessorManager {
params: { type: string; url: string; filter: JSONValue },
apiKey?: string,
) {
this.logger.info(
`Attempting to register webhook of type: ${params.type} with URL: ${params.url}`,
);
if (this.config.requireApiKey) {
if (apiKey === undefined) throw new Error("Api Key required");
const clients = await this.clientRepository.findClientsByApiKey(apiKey);
Expand All @@ -79,20 +84,27 @@ export class EventProcessorManager {
}
}
const webhook = this.getEventProcessor(params.type);
return webhook.register(params.url, params.filter);
const result = await webhook.register(params.url, params.filter);
this.logger.info(`Successfully registered webhook with ID: ${result}`);
return result;
}

// TODO: gaurd this with api key
async unregisterWebhook(
params: { type: string; id: string },
apiKey?: string,
) {
// Assuming the IWebhook interface has an unregister method
this.logger.info(
`Attempting to unregister webhook of type: ${params.type} with ID: ${params.id}`,
);
const webhook = this.getEventProcessor(params.type);
return webhook.unregister(params.id);
await webhook.unregister(params.id);
this.logger.info(`Successfully unregistered webhook with ID: ${params.id}`);
}

async registerClient(client: WebhookClient) {
return this.clientRepository.registerClient(client);
async registerClient(client: entities.WebhookClient) {
this.logger.info(`Attempting to register client with ID: ${client.id}`);
await this.clientRepository.registerClient(client);
this.logger.info(`Successfully registered client with ID: ${client.id}`);
}
}
36 changes: 24 additions & 12 deletions packages/webhooks/src/eventProcessors/depositStatus.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import assert from "assert";
import * as ss from "superstruct";

import { DataSource, entities } from "@repo/indexer-database";
import { WebhookRequestRepository } from "../database/webhookRequestRepository";
import {
JSONValue,
IEventProcessor,
NotificationPayload,
WebhookRequest,
} from "../types";
import { customId } from "../utils";

import * as ss from "superstruct";
import { IEventProcessor, NotificationPayload } from "../types";

export const DepositStatusEvent = ss.object({
originChainId: ss.number(),
Expand All @@ -25,22 +22,28 @@ export const DepositStatusFilter = ss.object({
export type DepositStatusFilter = ss.Infer<typeof DepositStatusFilter>;

export type Dependencies = {
webhookRequests: WebhookRequestRepository;
notify: (params: NotificationPayload) => void;
postgres: DataSource;
};
export class DepositStatusProcessor implements IEventProcessor {
private webhookRequests: WebhookRequestRepository;
private notify: (params: NotificationPayload) => void;
private postgres: DataSource;
// Type shoudl be uniqe across all event processors, this is to avoid colliding with multiple
// processors writing to the same tables
public type = "DepositStatus";

constructor(deps: Dependencies) {
this.webhookRequests = deps.webhookRequests;
this.webhookRequests = new WebhookRequestRepository(deps.postgres);
this.notify = deps.notify;
this.postgres = deps.postgres;
}
private async _write(event: DepositStatusEvent): Promise<void> {
const filter = [event.originChainId, event.depositTxHash].join("!");
const filter = customId(
this.type,
event.originChainId,
event.depositTxHash,
);
const hooks = await this.webhookRequests.filterWebhooks(filter);
//TODO: unregister any hooks where event has reached terminal state
await Promise.all(
Expand All @@ -61,8 +64,17 @@ export class DepositStatusProcessor implements IEventProcessor {
url: string,
params: DepositStatusFilter,
): Promise<string> {
const id = [url, params.originChainId, params.depositTxHash].join("!");
const filter = [params.originChainId, params.depositTxHash].join("!");
const id = customId(
this.type,
url,
params.originChainId,
params.depositTxHash,
);
const filter = customId(
this.type,
params.originChainId,
params.depositTxHash,
);
assert(
!(await this.webhookRequests.hasWebhook(id)),
"This webhook already exists",
Expand Down
4 changes: 1 addition & 3 deletions packages/webhooks/src/factory.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import assert from "assert";
import { EventProcessorManager } from "./eventProcessorManager";
import { MemoryStore } from "./store";
import { DataSource } from "@repo/indexer-database";
import { Logger } from "winston";
import { WebhookNotifier } from "./notifier";
import { DepositStatusProcessor } from "./eventProcessors";
import { WebhookRequestRepository } from "./database/webhookRequestRepository";
import { WebhookRouter } from "./router";
import { entities } from "@repo/indexer-database";

export enum WebhookTypes {
DepositStatus = "DepositStatus",
Expand Down Expand Up @@ -36,15 +36,13 @@ export function WebhookFactory(config: Config, deps: Dependencies) {
},
);
config.enabledWebhooks.forEach((name) => {
const hooks = new WebhookRequestRepository(new MemoryStore());
switch (name) {
// add more webhook types here
case "DepositStatus": {
eventProcessorManager.registerEventProcessor(
name,
new DepositStatusProcessor({
postgres,
webhookRequests: hooks,
notify: notifier.notify,
}),
);
Expand Down
1 change: 0 additions & 1 deletion packages/webhooks/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@ export * from "./factory";
export * as eventProcessors from "./eventProcessors";
export * as eventProcessorManager from "./eventProcessorManager";
export * as router from "./router";
export * as store from "./store";
export * from "./types";
export * from "./utils";
1 change: 0 additions & 1 deletion packages/webhooks/src/notifier.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { post } from "./utils";
import { NotificationPayload } from "./types";
import { AsyncStore } from "./store";
import { Logger } from "winston";

export type Dependencies = {
Expand Down
Loading

0 comments on commit 5c308e0

Please sign in to comment.