Skip to content

Commit

Permalink
feat: webhook module v1
Browse files Browse the repository at this point in the history
Signed-off-by: david <david@umaproject.org>
  • Loading branch information
daywiss committed Nov 14, 2024
1 parent 91c3305 commit 7c77181
Show file tree
Hide file tree
Showing 14 changed files with 889 additions and 188 deletions.
3 changes: 3 additions & 0 deletions packages/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
"@across-protocol/constants": "^3.1.20",
"@across-protocol/contracts": "^3.0.16",
"@across-protocol/sdk": "^3.2.2",
"@types/express": "^4.17.21",
"@types/lodash": "^4.17.7",
"bullmq": "^5.12.12",
"ethers": "^5.7.2",
"express": "^4.19.2",
"express-bearer-token": "^3.0.0",
"ioredis": "^5.4.1",
"lodash": "^4.17.21",
"redis": "^4.7.0",
Expand Down
131 changes: 131 additions & 0 deletions packages/indexer/src/webhooks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Webhooks Module

This module provides a comprehensive system for managing webhooks within the indexer package. It includes components for creating, registering, and notifying webhooks, as well as handling notifications and retries.

# Indexer Usage

The `factory.ts` file provides a `WebhookFactory` function that sets up the webhooks system. This function initializes the necessary components and returns an object containing the webhooks manager, express application, and notifier. Here's how you can use it:

### Configuration

To use the `WebhookFactory`, you need to provide a configuration object and dependencies:

- **Config**: This object should include:

- `express`: Configuration for the express server, such as the port number.
- `webhooks`: Configuration for the webhooks, including whether an API key is required.
- `enable`: An array of webhook types to enable, e.g., `["DepositStatus"]`.

- **Dependencies**: This object should include:
- `postgres`: An instance of `DataSource` for database interactions.
- `logger`: An instance of `Logger` for logging purposes.

### Adding an event Example

```js
import { WebhookFactory } from "./webhooks";
import { Logger } from "winston";
import { DataSource } from "@repo/indexer-database";

const config = {
// you need to specify a webhook to enable
enable: ["DepositStatus"],
express: {
port: 3000,
},
webhooks: {
requireApiKey: false,
},
};
const dependencies = {
postgres: Datasource,
logger: Logger,
};
const webhookLib = WebhookFactory(config, dependencies);

// respond to some event in the form:
// type EventType = {
// type:string,
// payload:JSONValue
// }
webhookLib.webhooks.write({
type: "DepositStatus",
payload: {
originChainId,
depositTxHash,
depositId,
status,
},
});
```

# Webhooks API Documentation

This document provides an overview of how to interact with the Webhooks API provided by the indexer package.

## Base URL

The base URL for the webhooks API is determined by the express server configuration. For example, if the server is running on port 3000, the base URL would be:

```
http://localhost:3000
```

## Endpoints

### Register a New Webhook

**Endpoint:** `/webhook`
**Method:** `POST`
**Description:** Register a new webhook to receive notifications.

**Request Body:**

- `type` (string): The type of webhook to register, e.g., `DepositStatus`.
- `url` (string): The URL where notifications should be sent.
- `filters` (object, optional): Any filters to apply to the notifications.
- `access_token` (string, optional): A valid API key if authentication is required.

**Example Request:**

```js
{
"type": "DepositStatus",
"url": "https://example.com/webhook",
"filters": {
originChainId: number,
depositTxHash: string,
},
}
// Example using fetch with API key as Bearer token in header
fetch('http://localhost:3000/webhook', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer your-api-key'
},
body: JSON.stringify({
type: 'DepositStatus',
url: 'https://example.com/webhook',
filters: {
originChainId: 1,
depositTxHash: '0x123...'
}
})
})
.then(response => {
if (!response.ok) {
throw new Error('Network response was not ok');
}
return response.json();
})
.then(data => console.log('Webhook registered successfully:', data))
.catch(error => console.error('There was a problem with the fetch operation:', error));

```

**Response:**

- `200 OK`: Webhook registered successfully.
- `400 Bad Request`: Missing or invalid parameters.
- `401 Unauthorized`: Invalid API key.
49 changes: 49 additions & 0 deletions packages/indexer/src/webhooks/clients.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { AsyncStore } from "./store";

export interface WebhookClient {
id: string;
apiKey: string;
url: string;
domains: string[];
}

// This class is intended to store integration clients allowed to use the webhook service.
export class WebhookClientManager {
constructor(private store: AsyncStore<WebhookClient>) {}

public async registerClient(client: WebhookClient): Promise<void> {
if (await this.store.has(client.id)) {
throw new Error(`Client with id ${client.id} already exists.`);
}
await this.store.set(client.id, client);
}

public async unregisterClient(clientId: string): Promise<void> {
if (!(await this.store.has(clientId))) {
throw new Error(`Client with id ${clientId} does not exist.`);
}
await this.store.delete(clientId);
}

public async getClient(clientId: string): Promise<WebhookClient | undefined> {
return this.store.get(clientId);
}

public async listClients(): Promise<WebhookClient[]> {
const clients: WebhookClient[] = [];
for await (const client of this.store.values()) {
clients.push(client);
}
return clients;
}

public async findClientsByApiKey(apiKey: string): Promise<WebhookClient[]> {
const clients: WebhookClient[] = [];
for await (const client of this.store.values()) {
if (client.apiKey === apiKey) {
clients.push(client);
}
}
return clients;
}
}
83 changes: 83 additions & 0 deletions packages/indexer/src/webhooks/express.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import assert from "assert";
import express from "express";
import { Webhooks } from "./webhooks";
import * as ss from "superstruct";
import bearerToken from "express-bearer-token";

type Config = {
port?: number;
};

type Dependencies = {
webhooks: Webhooks;
};

const RegistrationParams = ss.object({
type: ss.string(),
url: ss.string(),
filter: ss.record(ss.string(), ss.any()),
});
const UnregisterParams = ss.object({
type: ss.string(),
id: ss.string(),
});

export function ExpressApp(
config: Config,
deps: Dependencies,
): express.Application {
const app = express();
const port = config.port ?? 3000;

app.use(express.json());
app.use(bearerToken());

app.post(
"/webhook",
async (
req: express.Request & { token?: string },
res: express.Response,
) => {
try {
const parsedBody = RegistrationParams.create(req.body);
const id = await deps.webhooks.registerWebhook(parsedBody, req.token);
res.status(201).send(id);
} catch (error) {
res.status(400).send((error as Error).message);
}
},
);

app.delete(
"/webhook/:id",
async (
req: express.Request & { token?: string },
res: express.Response,
) => {
try {
const parsedBody = UnregisterParams.create(req.body);
await deps.webhooks.unregisterWebhook(parsedBody, req.token);
res.status(204).send();
} catch (error) {
res.status(400).send((error as Error).message);
}
},
);

app.get("/", (req, res) => {
res.send("Webhook server running");
});

app.use(
(
err: Error,
req: express.Request,
res: express.Response,
next: express.NextFunction,
) => {
res.status(500).send("Something went wrong!");
},
);

return app;
}
59 changes: 59 additions & 0 deletions packages/indexer/src/webhooks/factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import assert from "assert";
import { Webhooks } from "./webhooks";
import { MemoryStore } from "./store";
import { ExpressApp } from "./express";
import { DataSource } from "@repo/indexer-database";
import { Logger } from "winston";
import { WebhookNotifier } from "./notifier";
import { DepositStatusWebhook } from "./webhook";
import { WebhookRequests } from "./webhookRequests";

type Config = {
express?: {
port: number;
};
webhooks?: {
requireApiKey: boolean;
};
enable: string[];
};
type Dependencies = {
postgres: DataSource;
logger: Logger;
};

export function WebhookFactory(config: Config, deps: Dependencies) {
const { logger, postgres } = deps;
const notifier = new WebhookNotifier({
logger,
pending: new MemoryStore(),
completed: new MemoryStore(),
});
assert(config.enable.length, "No webhooks enabled, specify one in config");
const webhooks = new Webhooks(config?.webhooks ?? { requireApiKey: false }, {
postgres,
logger,
});
config.enable.forEach((name) => {
const hooks = new WebhookRequests(new MemoryStore());
switch (name) {
// add more webhook types here
case "DepositStatus": {
webhooks.registerWebhookProcessor(
name,
new DepositStatusWebhook({
postgres,
hooks,
notify: notifier.notify,
}),
);
break;
}
default: {
throw new Error(`Unhandled webhook type: ${name}`);
}
}
});
const express = ExpressApp(config?.express ?? { port: 3000 }, { webhooks });
return { webhooks, express, notifier };
}
6 changes: 6 additions & 0 deletions packages/indexer/src/webhooks/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export * from "./factory";
export * as webhooks from "./webhooks";
export * as webhook from "./webhook";
export * as express from "./express";
export * from "./types";
export * from "./utils";
Loading

0 comments on commit 7c77181

Please sign in to comment.