diff --git a/packages/pglite/README.md b/packages/pglite/README.md index 6f30fae7..a927dd98 100644 --- a/packages/pglite/README.md +++ b/packages/pglite/README.md @@ -196,6 +196,9 @@ The `query` and `exec` methods take an optional `options` objects with the follo }); ``` +- `blob: Blob | File` + Attach a `Blob` or `File` object to the query that can be used with a `COPY FROM` command by using the virtual `/dev/blob` device, see [importing and exporting](#importing-and-exporting-with-copy-tofrom). + #### `.exec(query: string, options?: QueryOptions): Promise>` Execute one or more statements. *(note that parameters are not supported)* @@ -274,7 +277,8 @@ Result objects have the following properties: - `rows: Row[]` - The rows retuned by the query - `affectedRows?: number` - Count of the rows affected by the query. Note this is *not* the count of rows returned, it is the number or rows in the database changed by the query. -- `fields: { name: string; dataTypeID: number }[]` - Field name and Postgres data type ID for each field returned. +- `fields: { name: string; dataTypeID: number }[]` - Field name and Postgres data type ID for each field returned. +- `blob: Blob` - A `Blob` containing the data written to the virtual `/dev/blob/` device by a `COPY TO` command. See [importing and exporting](#importing-and-exporting-with-copy-tofrom). ### Row Objects: @@ -301,6 +305,25 @@ await pg.exec(` *Work in progress: We plan to expand this API to allow sharing of the worker PGlite across browser tabs.* +### Importing and exporting with `COPY TO/FROM` + +PGlite has support for importing and exporting via `COPY TO/FROM` by using a virtual `/dev/blob` device. + +To import a file pass the `File` or `Blob` in the query options as `blob`, and copy from the `/dev/blob` device. 
+ +```ts +await pg.query("COPY my_table FROM '/dev/blob';", [], { +  blob: MyBlob +}) +``` + +To export a table or query to a file you just have to write to the `/dev/blob` device, the file will be retrieved as `blob` on the query results: + +```ts +const ret = await pg.query("COPY my_table TO '/dev/blob';") +// ret.blob is a `Blob` object with the data from the copy. +``` + ## Extensions PGlite supports the pl/pgsql procedural language extension, this is included and enabled by default. diff --git a/packages/pglite/examples/copy.html b/packages/pglite/examples/copy.html new file mode 100644 index 00000000..8ae427d2 --- /dev/null +++ b/packages/pglite/examples/copy.html @@ -0,0 +1,48 @@ + diff --git a/packages/pglite/src/fs/idbfs.ts b/packages/pglite/src/fs/idbfs.ts index 5eb62880..e8beef45 100644 --- a/packages/pglite/src/fs/idbfs.ts +++ b/packages/pglite/src/fs/idbfs.ts @@ -45,6 +45,7 @@ export class IdbFs extends FilesystemBase { const options: Partial = { ...opts, preRun: [ + ...(opts.preRun || []), (mod: any) => { const idbfs = mod.FS.filesystems.IDBFS; // Mount the idbfs to the users dataDir diff --git a/packages/pglite/src/fs/memoryfs.ts b/packages/pglite/src/fs/memoryfs.ts index 7a052a58..07332a00 100644 --- a/packages/pglite/src/fs/memoryfs.ts +++ b/packages/pglite/src/fs/memoryfs.ts @@ -19,6 +19,7 @@ export class MemoryFS extends FilesystemBase { const options: Partial = { ...opts, preRun: [ + ...(opts.preRun || []), (mod: any) => { /** * There is an issue with just mounting the filesystem, Postgres stalls... 
diff --git a/packages/pglite/src/fs/nodefs.ts b/packages/pglite/src/fs/nodefs.ts index d2cda56e..a4118350 100644 --- a/packages/pglite/src/fs/nodefs.ts +++ b/packages/pglite/src/fs/nodefs.ts @@ -32,6 +32,7 @@ export class NodeFS extends FilesystemBase { const options: Partial = { ...opts, preRun: [ + ...(opts.preRun || []), (mod: any) => { const nodefs = mod.FS.filesystems.NODEFS; mod.FS.mkdir(PGDATA); diff --git a/packages/pglite/src/interface.ts b/packages/pglite/src/interface.ts index c0d836ed..4e5b46c1 100644 --- a/packages/pglite/src/interface.ts +++ b/packages/pglite/src/interface.ts @@ -13,6 +13,7 @@ export interface ParserOptions { export interface QueryOptions { rowMode?: RowMode; parsers?: ParserOptions; + blob?: Blob | File; } export interface ExecProtocolOptions { @@ -54,6 +55,7 @@ export type Results = { rows: Row[]; affectedRows?: number; fields: { name: string; dataTypeID: number }[]; + blob?: Blob; // Only set when a file is returned, such as from a COPY command }; export interface Transaction { diff --git a/packages/pglite/src/parse.ts b/packages/pglite/src/parse.ts index bcc99c1a..de988f97 100644 --- a/packages/pglite/src/parse.ts +++ b/packages/pglite/src/parse.ts @@ -14,6 +14,7 @@ import { parseType } from "./types.js"; export function parseResults( messages: Array, options?: QueryOptions, + blob?: Blob ): Array { const resultSets: Results[] = []; let currentResultSet: Results = { rows: [], fields: [] }; @@ -23,7 +24,7 @@ export function parseResults( (msg) => msg instanceof RowDescriptionMessage || msg instanceof DataRowMessage || - msg instanceof CommandCompleteMessage, + msg instanceof CommandCompleteMessage ); filteredMessages.forEach((msg, index) => { @@ -39,9 +40,9 @@ export function parseResults( parseType( field, currentResultSet!.fields[i].dataTypeID, - options?.parsers, - ), - ), + options?.parsers + ) + ) ); } else { // rowMode === "object" @@ -52,17 +53,21 @@ export function parseResults( parseType( field, 
currentResultSet!.fields[i].dataTypeID, - options?.parsers, + options?.parsers ), - ]), - ), + ]) + ) ); } } else if (msg instanceof CommandCompleteMessage) { affectedRows += retrieveRowCount(msg); if (index === filteredMessages.length - 1) - resultSets.push({ ...currentResultSet, affectedRows }); + resultSets.push({ + ...currentResultSet, + affectedRows, + ...(blob ? { blob } : {}), + }); else resultSets.push(currentResultSet); currentResultSet = { rows: [], fields: [] }; diff --git a/packages/pglite/src/pglite.ts b/packages/pglite/src/pglite.ts index a34e6b94..392bc586 100644 --- a/packages/pglite/src/pglite.ts +++ b/packages/pglite/src/pglite.ts @@ -54,6 +54,11 @@ export class PGlite implements PGliteInterface { #parser = new Parser(); + // These are the current ArrayBuffer that is being read or written to + // during a query, such as COPY FROM or COPY TO. + #queryReadBuffer?: ArrayBuffer; + #queryWriteChunks?: Uint8Array[]; + /** * Create a new PGlite instance * @param dataDir The directory to store the database files @@ -130,6 +135,58 @@ export class PGlite implements PGliteInterface { ...(this.debug > 0 ? { print: console.info, printErr: console.error } : { print: () => {}, printErr: () => {} }), + preRun: [ + (mod: any) => { + // Register /dev/blob device + // This is used to read and write blobs when used in COPY TO/FROM + // e.g. 
COPY mytable TO '/dev/blob' WITH (FORMAT binary) + // The data is returned by the query as a `blob` property in the results + const devId = mod.FS.makedev(64, 0); + let callCounter = 0; + const devOpt = { + open: (stream: any) => {}, + close: (stream: any) => {}, + read: ( + stream: any, + buffer: Uint8Array, + offset: number, + length: number, + position: number, + ) => { + const buf = this.#queryReadBuffer; + if (!buf) { + throw new Error("No File or Blob provided to read from"); + } + const contents = new Uint8Array(buf); + if (position >= contents.length) return 0; + const size = Math.min(contents.length - position, length); + for (let i = 0; i < size; i++) { + buffer[offset + i] = contents[position + i]; + } + return size; + }, + write: ( + stream: any, + buffer: Uint8Array, + offset: number, + length: number, + position: number, + ) => { + callCounter++; + this.#queryWriteChunks ??= []; + this.#queryWriteChunks.push( + buffer.slice(offset, offset + length), + ); + return length; + }, + llseek: (stream: any, offset: number, whence: number) => { + throw new Error("Cannot seek /dev/blob"); + }, + }; + mod.FS.registerDevice(devId, devOpt); + mod.FS.mkdev("/dev/blob", devId); + }, + ], onRuntimeInitialized: async (Module: EmPostgres) => { await this.fs!.initialSyncFs(Module.FS); this.#ready = true; @@ -269,9 +326,8 @@ export class PGlite implements PGliteInterface { ): Promise> { return await this.#queryMutex.runExclusive(async () => { // We need to parse, bind and execute a query with parameters - if (this.debug > 1) { - console.log("runQuery", query, params, options); - } + this.#log("runQuery", query, params, options); + await this.#handleBlob(options?.blob); const parsedParams = params?.map((p) => serializeType(p)) || []; let results; try { @@ -295,12 +351,19 @@ export class PGlite implements PGliteInterface { } finally { await this.#execProtocolNoSync(serialize.sync()); } + this.#cleanupBlob(); if (!this.#inTransaction) { await this.#syncToFs(); } + let blob: 
Blob | undefined; + if (this.#queryWriteChunks) { + blob = new Blob(this.#queryWriteChunks); + this.#queryWriteChunks = undefined; + } return parseResults( results.map(([msg]) => msg), options, + blob, )[0] as Results; }); } @@ -318,21 +381,27 @@ ): Promise> { return await this.#queryMutex.runExclusive(async () => { // No params so we can just send the query - if (this.debug > 1) { - console.log("runExec", query, options); - } + this.#log("runExec", query, options); + await this.#handleBlob(options?.blob); let results; try { results = await this.#execProtocolNoSync(serialize.query(query)); } finally { await this.#execProtocolNoSync(serialize.sync()); } + this.#cleanupBlob(); if (!this.#inTransaction) { await this.#syncToFs(); } + let blob: Blob | undefined; + if (this.#queryWriteChunks) { + blob = new Blob(this.#queryWriteChunks); + this.#queryWriteChunks = undefined; + } return parseResults( results.map(([msg]) => msg), options, + blob, ) as Array; }); } @@ -397,6 +466,21 @@ }); } + /** + * Handle a file attached to the current query + * @param blob The `Blob` or `File` attached to the query, if any + */ + async #handleBlob(blob?: File | Blob) { + this.#queryReadBuffer = blob ? 
await blob.arrayBuffer() : undefined; + } + + /** + * Cleanup the current file + */ + #cleanupBlob() { + this.#queryReadBuffer = undefined; + } + /** * Wait for the database to be ready */ @@ -499,4 +583,13 @@ export class PGlite implements PGliteInterface { await doSync(); } } + + /** + * Internal log function + */ + #log(...args: any[]) { + if (this.debug > 0) { + console.log(...args); + } + } } diff --git a/packages/pglite/tests/basic.test.js b/packages/pglite/tests/basic.test.js index c19e8642..af2709f4 100644 --- a/packages/pglite/tests/basic.test.js +++ b/packages/pglite/tests/basic.test.js @@ -319,3 +319,55 @@ test("basic transaction", async (t) => { affectedRows: 0, }); }); + +test("basic copy to/from blob", async (t) => { + const db = new PGlite(); + await db.exec(` + CREATE TABLE IF NOT EXISTS test ( + id SERIAL PRIMARY KEY, + test TEXT + ); + INSERT INTO test (test) VALUES ('test'), ('test2'); + `); + + // copy to + const ret = await db.query("COPY test TO '/dev/blob' WITH (FORMAT csv);"); + const csv = await ret.blob.text(); + t.is(csv, "1,test\n2,test2\n"); + + // copy from + const blob2 = new Blob([csv]); + await db.exec(` + CREATE TABLE IF NOT EXISTS test2 ( + id SERIAL PRIMARY KEY, + test TEXT + ); + `); + await db.query("COPY test2 FROM '/dev/blob' WITH (FORMAT csv);", [], { blob: blob2 }); + const res = await db.query(` + SELECT * FROM test2; + `); + t.deepEqual(res, { + rows: [ + { + id: 1, + test: "test", + }, + { + id: 2, + test: "test2", + }, + ], + fields: [ + { + name: "id", + dataTypeID: 23, + }, + { + name: "test", + dataTypeID: 25, + }, + ], + affectedRows: 0, + }); +});