From 169031a6bb7ddccfc4c490ffb2377651576d45f5 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 25 Jun 2024 22:26:59 +0100 Subject: [PATCH] Support for pg_notify (#102) * Support for pg_notify * Examples using pg_notify * Update submodule * Add tests * Docs and fix unsubscribe --- packages/pglite/README.md | 29 ++++++++ packages/pglite/examples/notify-worker.html | 22 ++++++ packages/pglite/examples/notify.html | 22 ++++++ packages/pglite/src/interface.ts | 12 ++++ packages/pglite/src/pglite.ts | 74 +++++++++++++++++++++ packages/pglite/src/worker/index.ts | 63 +++++++++++++++++- packages/pglite/src/worker/process.ts | 9 ++- packages/pglite/tests/notify.test.js | 43 ++++++++++++ postgres | 2 +- 9 files changed, 273 insertions(+), 3 deletions(-) create mode 100644 packages/pglite/examples/notify-worker.html create mode 100644 packages/pglite/examples/notify.html create mode 100644 packages/pglite/tests/notify.test.js diff --git a/packages/pglite/README.md b/packages/pglite/README.md index a927dd98..0164a375 100644 --- a/packages/pglite/README.md +++ b/packages/pglite/README.md @@ -265,6 +265,35 @@ await pg.transaction(async (tx) => { Close the database, ensuring it is shut down cleanly. +#### `.listen(channel: string, callback: (payload: string) => void): Promise` + +Subscribe to a [pg_notify](https://www.postgresql.org/docs/current/sql-notify.html) channel. The callback will receive the payload from the notification. + +Returns an unsubscribe function to unsubscribe from the channel. + +##### Example: + +```ts +const unsub = await pg.listen('test', (payload) => { + console.log('Received:', payload); +}); +await pg.query("NOTIFY test, 'Hello, world!'"); +``` + +#### `.unlisten(channel: string, callback?: (payload: string) => void): Promise` + +Unsubscribe from the channel. If a callback is provided it removes only that callback from the subscription, when no callback is provided is unsubscribes all callbacks for the channel. + +#### `onNotification(callback: (channel: string, payload: string) => void): () => void` + +Add an event handler for all notifications received from Postgres. + +**Note:** This does not subscribe to the notification, you will have to manually subscribe with `LISTEN channel_name`. + +#### `offNotification(callback: (channel: string, payload: string) => void): void` + +Remove an event handler for all notifications received from Postgres. + ### Properties: - `.ready` *boolean (read only)*: Whether the database is ready to accept queries. diff --git a/packages/pglite/examples/notify-worker.html b/packages/pglite/examples/notify-worker.html new file mode 100644 index 00000000..9b27de68 --- /dev/null +++ b/packages/pglite/examples/notify-worker.html @@ -0,0 +1,22 @@ + diff --git a/packages/pglite/examples/notify.html b/packages/pglite/examples/notify.html new file mode 100644 index 00000000..9a63e55c --- /dev/null +++ b/packages/pglite/examples/notify.html @@ -0,0 +1,22 @@ + diff --git a/packages/pglite/src/interface.ts b/packages/pglite/src/interface.ts index 4e5b46c1..fd5536f0 100644 --- a/packages/pglite/src/interface.ts +++ b/packages/pglite/src/interface.ts @@ -47,6 +47,18 @@ export interface PGliteInterface { message: Uint8Array, options?: ExecProtocolOptions, ): Promise>; + listen( + channel: string, + callback: (payload: string) => void, + ): Promise<() => Promise>; + unlisten( + channel: string, + callback?: (payload: string) => void, + ): Promise; + onNotification( + callback: (channel: string, payload: string) => void, + ): () => void; + offNotification(callback: (channel: string, payload: string) => void): void; } export type Row = T; diff --git a/packages/pglite/src/pglite.ts b/packages/pglite/src/pglite.ts index 392bc586..6dbaa068 100644 --- a/packages/pglite/src/pglite.ts +++ b/packages/pglite/src/pglite.ts @@ -24,6 +24,7 @@ import { DatabaseError, NoticeMessage, CommandCompleteMessage, + NotificationResponseMessage, } from "pg-protocol/dist/messages.js"; export class PGlite implements PGliteInterface { @@ -58,6 +59,11 @@ export class PGlite implements PGliteInterface { // during a query, such as COPY FROM or COPY TO. #queryReadBuffer?: ArrayBuffer; #queryWriteChunks?: Uint8Array[]; + + #notifyListeners = new Map void>>(); + #globalNotifyListeners = new Set< + (channel: string, payload: string) => void + >(); /** * Create a new PGlite instance @@ -545,6 +551,19 @@ export class PGlite implements PGliteInterface { this.#inTransaction = false; break; } + } else if (msg instanceof NotificationResponseMessage) { + // We've received a notification, call the listeners + const listeners = this.#notifyListeners.get(msg.channel); + if (listeners) { + listeners.forEach((cb) => { + // We use queueMicrotask so that the callback is called after any + // synchronous code has finished running. + queueMicrotask(() => cb(msg.payload)); + }); + } + this.#globalNotifyListeners.forEach((cb) => { + queueMicrotask(() => cb(msg.channel, msg.payload)); + }); } results.push([msg, data]); }); @@ -592,4 +611,59 @@ export class PGlite implements PGliteInterface { console.log(...args); } } + + /** + * Listen for a notification + * @param channel The channel to listen on + * @param callback The callback to call when a notification is received + */ + async listen(channel: string, callback: (payload: string) => void) { + if (!this.#notifyListeners.has(channel)) { + this.#notifyListeners.set(channel, new Set()); + } + this.#notifyListeners.get(channel)!.add(callback); + await this.exec(`LISTEN ${channel}`); + return async () => { + await this.unlisten(channel, callback); + }; + } + + /** + * Stop listening for a notification + * @param channel The channel to stop listening on + * @param callback The callback to remove + */ + async unlisten(channel: string, callback?: (payload: string) => void) { + if (callback) { + this.#notifyListeners.get(channel)?.delete(callback); + if (this.#notifyListeners.get(channel)!.size === 0) { + await this.exec(`UNLISTEN ${channel}`); + this.#notifyListeners.delete(channel); + } + } else { + await this.exec(`UNLISTEN ${channel}`); + this.#notifyListeners.delete(channel); + } + } + + /** + * Listen to notifications + * @param callback The callback to call when a notification is received + */ + onNotification( + callback: (channel: string, payload: string) => void, + ): () => void { + this.#globalNotifyListeners.add(callback); + return () => { + this.#globalNotifyListeners.delete(callback); + }; + } + + /** + * Stop listening to notifications + * @param callback The callback to remove + */ + offNotification(callback: (channel: string, payload: string) => void) { + this.#globalNotifyListeners.delete(callback); + } } diff --git a/packages/pglite/src/worker/index.ts b/packages/pglite/src/worker/index.ts index 038059a1..6cbcf600 100644 --- a/packages/pglite/src/worker/index.ts +++ b/packages/pglite/src/worker/index.ts @@ -23,6 +23,11 @@ export class PGliteWorker implements PGliteInterface { #worker: WorkerInterface; #options: PGliteOptions; + #notifyListeners = new Map void>>(); + #globalNotifyListeners = new Set< + (channel: string, payload: string) => void + >(); + constructor(dataDir: string, options?: PGliteOptions) { const { dataDir: dir, fsType } = parseDataDir(dataDir); this.dataDir = dir; @@ -42,7 +47,11 @@ export class PGliteWorker implements PGliteInterface { } async #init(dataDir: string) { - await this.#worker.init(dataDir, this.#options); + await this.#worker.init( + dataDir, + this.#options, + Comlink.proxy(this.receiveNotification.bind(this)), + ); this.#ready = true; } @@ -81,4 +90,56 @@ export class PGliteWorker implements PGliteInterface { ): Promise> { return this.#worker.execProtocol(message); } + + async listen( + channel: string, + callback: (payload: string) => void, + ): Promise<() => Promise> { + if (!this.#notifyListeners.has(channel)) { + this.#notifyListeners.set(channel, new Set()); + } + this.#notifyListeners.get(channel)?.add(callback); + await this.exec(`LISTEN ${channel}`); + return async () => { + await this.unlisten(channel, callback); + }; + } + + async unlisten( + channel: string, + callback?: (payload: string) => void, + ): Promise { + if (callback) { + this.#notifyListeners.get(channel)?.delete(callback); + } else { + this.#notifyListeners.delete(channel); + } + if (this.#notifyListeners.get(channel)?.size === 0) { + // As we currently have a dedicated worker we can just unlisten + await this.exec(`UNLISTEN ${channel}`); + } + } + + onNotification(callback: (channel: string, payload: string) => void) { + this.#globalNotifyListeners.add(callback); + return () => { + this.#globalNotifyListeners.delete(callback); + }; + } + + offNotification(callback: (channel: string, payload: string) => void) { + this.#globalNotifyListeners.delete(callback); + } + + receiveNotification(channel: string, payload: string) { + const listeners = this.#notifyListeners.get(channel); + if (listeners) { + for (const listener of listeners) { + queueMicrotask(() => listener(payload)); + } + } + for (const listener of this.#globalNotifyListeners) { + queueMicrotask(() => listener(channel, payload)); + } + } } diff --git a/packages/pglite/src/worker/process.ts b/packages/pglite/src/worker/process.ts index 7b048a02..d9695b94 100644 --- a/packages/pglite/src/worker/process.ts +++ b/packages/pglite/src/worker/process.ts @@ -5,9 +5,16 @@ import type { PGliteOptions, QueryOptions } from "../interface.js"; let db: PGlite; const worker = { - async init(dataDir?: string, options?: PGliteOptions) { + async init( + dataDir?: string, + options?: PGliteOptions, + onNotification?: (channel: string, payload: string) => void, + ) { db = new PGlite(dataDir, options); await db.waitReady; + if (onNotification) { + db.onNotification(onNotification); + } return true; }, async close() { diff --git a/packages/pglite/tests/notify.test.js b/packages/pglite/tests/notify.test.js new file mode 100644 index 00000000..5e3c1a74 --- /dev/null +++ b/packages/pglite/tests/notify.test.js @@ -0,0 +1,43 @@ +import test from "ava"; +import { PGlite } from "../dist/index.js"; + +test("notify", async (t) => { + const db = new PGlite(); + + await db.listen("test", (payload) => { + t.is(payload, '321'); + }); + + await db.query("NOTIFY test, '321'"); + + await new Promise((resolve) => setTimeout(resolve, 1000)); +}); + +test("unlisten", async (t) => { + const db = new PGlite(); + + const unsub = await db.listen("test", () => { + t.fail(); + }); + + await unsub(); + + await db.query("NOTIFY test"); + + await new Promise((resolve) => setTimeout(resolve, 1000)); + t.pass(); +}); + +test('onNotification', async (t) => { + const db = new PGlite(); + + db.onNotification((chan, payload) => { + t.is(chan, 'test'); + t.is(payload, '123'); + }); + + await db.query("LISTEN test"); + await db.query("NOTIFY test, '123'"); + + await new Promise((resolve) => setTimeout(resolve, 1000)); +}); diff --git a/postgres b/postgres index 6fa3ddda..1ef91d2a 160000 --- a/postgres +++ b/postgres @@ -1 +1 @@ -Subproject commit 6fa3dddad1a303416833b59abd58d45b46c1594b +Subproject commit 1ef91d2a9a370c463edaec0f98ac72cfd0b57647