Skip to content

Commit

Permalink
Support for pg_notify (#102)
Browse files Browse the repository at this point in the history
* Support for pg_notify

* Examples using pg_notify

* Update submodule

* Add tests

* Docs and fix unsubscribe
  • Loading branch information
samwillis authored Jun 25, 2024
1 parent 5b91370 commit 169031a
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 3 deletions.
29 changes: 29 additions & 0 deletions packages/pglite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,35 @@ await pg.transaction(async (tx) => {

Close the database, ensuring it is shut down cleanly.

#### `.listen(channel: string, callback: (payload: string) => void): Promise<void>`

Subscribe to a [pg_notify](https://www.postgresql.org/docs/current/sql-notify.html) channel. The callback will receive the payload from the notification.

Returns an unsubscribe function to unsubscribe from the channel.

##### Example:

```ts
const unsub = await pg.listen('test', (payload) => {
console.log('Received:', payload);
});
await pg.query("NOTIFY test, 'Hello, world!'");
```

#### `.unlisten(channel: string, callback?: (payload: string) => void): Promise<void>`

Unsubscribe from the channel. If a callback is provided it removes only that callback from the subscription, when no callback is provided is unsubscribes all callbacks for the channel.

#### `onNotification(callback: (channel: string, payload: string) => void): () => void`

Add an event handler for all notifications received from Postgres.

**Note:** This does not subscribe to the notification, you will have to manually subscribe with `LISTEN channel_name`.

#### `offNotification(callback: (channel: string, payload: string) => void): void`

Remove an event handler for all notifications received from Postgres.

### Properties:

- `.ready` *boolean (read only)*: Whether the database is ready to accept queries.
Expand Down
22 changes: 22 additions & 0 deletions packages/pglite/examples/notify-worker.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<script type="module">
import { PGliteWorker } from "../dist/worker/index.js";

const pg = new PGliteWorker();

const unsub = await pg.listen('test', (payload) => {
console.log('Received:', payload);
});

await new Promise((resolve) => setTimeout(resolve, 500));

await pg.query('NOTIFY test, \'Hello, world!\'');

await new Promise((resolve) => setTimeout(resolve, 500));

await pg.query('NOTIFY test, \'Hello, world again!\'');

await unsub();

await pg.query('NOTIFY test, \'Will not be received!\'');

</script>
22 changes: 22 additions & 0 deletions packages/pglite/examples/notify.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<script type="module">
import { PGlite } from "../dist/index.js";

const pg = new PGlite();

const unsub = await pg.listen('test', (payload) => {
console.log('Received:', payload);
});

await new Promise((resolve) => setTimeout(resolve, 500));

await pg.query('NOTIFY test, \'Hello, world!\'');

await new Promise((resolve) => setTimeout(resolve, 500));

await pg.query('NOTIFY test, \'Hello, world again!\'');

await unsub();

await pg.query('NOTIFY test, \'Will not be received!\'');

</script>
12 changes: 12 additions & 0 deletions packages/pglite/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ export interface PGliteInterface {
message: Uint8Array,
options?: ExecProtocolOptions,
): Promise<Array<[BackendMessage, Uint8Array]>>;
listen(
channel: string,
callback: (payload: string) => void,
): Promise<() => Promise<void>>;
unlisten(
channel: string,
callback?: (payload: string) => void,
): Promise<void>;
onNotification(
callback: (channel: string, payload: string) => void,
): () => void;
offNotification(callback: (channel: string, payload: string) => void): void;
}

export type Row<T = { [key: string]: any }> = T;
Expand Down
74 changes: 74 additions & 0 deletions packages/pglite/src/pglite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
DatabaseError,
NoticeMessage,
CommandCompleteMessage,
NotificationResponseMessage,
} from "pg-protocol/dist/messages.js";

export class PGlite implements PGliteInterface {
Expand Down Expand Up @@ -58,6 +59,11 @@ export class PGlite implements PGliteInterface {
// during a query, such as COPY FROM or COPY TO.
#queryReadBuffer?: ArrayBuffer;
#queryWriteChunks?: Uint8Array[];

#notifyListeners = new Map<string, Set<(payload: string) => void>>();
#globalNotifyListeners = new Set<
(channel: string, payload: string) => void
>();

/**
* Create a new PGlite instance
Expand Down Expand Up @@ -545,6 +551,19 @@ export class PGlite implements PGliteInterface {
this.#inTransaction = false;
break;
}
} else if (msg instanceof NotificationResponseMessage) {
// We've received a notification, call the listeners
const listeners = this.#notifyListeners.get(msg.channel);
if (listeners) {
listeners.forEach((cb) => {
// We use queueMicrotask so that the callback is called after any
// synchronous code has finished running.
queueMicrotask(() => cb(msg.payload));
});
}
this.#globalNotifyListeners.forEach((cb) => {
queueMicrotask(() => cb(msg.channel, msg.payload));
});
}
results.push([msg, data]);
});
Expand Down Expand Up @@ -592,4 +611,59 @@ export class PGlite implements PGliteInterface {
console.log(...args);
}
}

/**
* Listen for a notification
* @param channel The channel to listen on
* @param callback The callback to call when a notification is received
*/
async listen(channel: string, callback: (payload: string) => void) {
if (!this.#notifyListeners.has(channel)) {
this.#notifyListeners.set(channel, new Set());
}
this.#notifyListeners.get(channel)!.add(callback);
await this.exec(`LISTEN ${channel}`);
return async () => {
await this.unlisten(channel, callback);
};
}

/**
* Stop listening for a notification
* @param channel The channel to stop listening on
* @param callback The callback to remove
*/
async unlisten(channel: string, callback?: (payload: string) => void) {
if (callback) {
this.#notifyListeners.get(channel)?.delete(callback);
if (this.#notifyListeners.get(channel)!.size === 0) {
await this.exec(`UNLISTEN ${channel}`);
this.#notifyListeners.delete(channel);
}
} else {
await this.exec(`UNLISTEN ${channel}`);
this.#notifyListeners.delete(channel);
}
}

/**
* Listen to notifications
* @param callback The callback to call when a notification is received
*/
onNotification(
callback: (channel: string, payload: string) => void,
): () => void {
this.#globalNotifyListeners.add(callback);
return () => {
this.#globalNotifyListeners.delete(callback);
};
}

/**
* Stop listening to notifications
* @param callback The callback to remove
*/
offNotification(callback: (channel: string, payload: string) => void) {
this.#globalNotifyListeners.delete(callback);
}
}
63 changes: 62 additions & 1 deletion packages/pglite/src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ export class PGliteWorker implements PGliteInterface {
#worker: WorkerInterface;
#options: PGliteOptions;

#notifyListeners = new Map<string, Set<(payload: string) => void>>();
#globalNotifyListeners = new Set<
(channel: string, payload: string) => void
>();

constructor(dataDir: string, options?: PGliteOptions) {
const { dataDir: dir, fsType } = parseDataDir(dataDir);
this.dataDir = dir;
Expand All @@ -42,7 +47,11 @@ export class PGliteWorker implements PGliteInterface {
}

async #init(dataDir: string) {
await this.#worker.init(dataDir, this.#options);
await this.#worker.init(
dataDir,
this.#options,
Comlink.proxy(this.receiveNotification.bind(this)),
);
this.#ready = true;
}

Expand Down Expand Up @@ -81,4 +90,56 @@ export class PGliteWorker implements PGliteInterface {
): Promise<Array<[BackendMessage, Uint8Array]>> {
return this.#worker.execProtocol(message);
}

async listen(
channel: string,
callback: (payload: string) => void,
): Promise<() => Promise<void>> {
if (!this.#notifyListeners.has(channel)) {
this.#notifyListeners.set(channel, new Set());
}
this.#notifyListeners.get(channel)?.add(callback);
await this.exec(`LISTEN ${channel}`);
return async () => {
await this.unlisten(channel, callback);
};
}

async unlisten(
channel: string,
callback?: (payload: string) => void,
): Promise<void> {
if (callback) {
this.#notifyListeners.get(channel)?.delete(callback);
} else {
this.#notifyListeners.delete(channel);
}
if (this.#notifyListeners.get(channel)?.size === 0) {
// As we currently have a dedicated worker we can just unlisten
await this.exec(`UNLISTEN ${channel}`);
}
}

onNotification(callback: (channel: string, payload: string) => void) {
this.#globalNotifyListeners.add(callback);
return () => {
this.#globalNotifyListeners.delete(callback);
};
}

offNotification(callback: (channel: string, payload: string) => void) {
this.#globalNotifyListeners.delete(callback);
}

receiveNotification(channel: string, payload: string) {
const listeners = this.#notifyListeners.get(channel);
if (listeners) {
for (const listener of listeners) {
queueMicrotask(() => listener(payload));
}
}
for (const listener of this.#globalNotifyListeners) {
queueMicrotask(() => listener(channel, payload));
}
}
}
9 changes: 8 additions & 1 deletion packages/pglite/src/worker/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,16 @@ import type { PGliteOptions, QueryOptions } from "../interface.js";
let db: PGlite;

const worker = {
async init(dataDir?: string, options?: PGliteOptions) {
async init(
dataDir?: string,
options?: PGliteOptions,
onNotification?: (channel: string, payload: string) => void,
) {
db = new PGlite(dataDir, options);
await db.waitReady;
if (onNotification) {
db.onNotification(onNotification);
}
return true;
},
async close() {
Expand Down
43 changes: 43 additions & 0 deletions packages/pglite/tests/notify.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import test from "ava";
import { PGlite } from "../dist/index.js";

test("notify", async (t) => {
const db = new PGlite();

await db.listen("test", (payload) => {
t.is(payload, '321');
});

await db.query("NOTIFY test, '321'");

await new Promise((resolve) => setTimeout(resolve, 1000));
});

test("unlisten", async (t) => {
const db = new PGlite();

const unsub = await db.listen("test", () => {
t.fail();
});

await unsub();

await db.query("NOTIFY test");

await new Promise((resolve) => setTimeout(resolve, 1000));
t.pass();
});

test('onNotification', async (t) => {
const db = new PGlite();

db.onNotification((chan, payload) => {
t.is(chan, 'test');
t.is(payload, '123');
});

await db.query("LISTEN test");
await db.query("NOTIFY test, '123'");

await new Promise((resolve) => setTimeout(resolve, 1000));
});
2 changes: 1 addition & 1 deletion postgres

0 comments on commit 169031a

Please sign in to comment.