Support for adding or receiving a blob for use with COPY FROM/TO (#106)
* Support for adding or receiving a blob for a COPY

* Add tests

* Docs

* Fix submodule
samwillis authored Jun 25, 2024
1 parent 7df54d9 commit 5b91370
Showing 9 changed files with 241 additions and 15 deletions.
25 changes: 24 additions & 1 deletion packages/pglite/README.md
@@ -196,6 +196,9 @@ The `query` and `exec` methods take an optional `options` object with the following parameters:
});
```

- `blob: Blob | File`
Attach a `Blob` or `File` object to the query that can be used with a `COPY FROM` command via the virtual `/dev/blob` device; see [importing and exporting](#importing-and-exporting-with-copy-tofrom).

#### `.exec(query: string, options?: QueryOptions): Promise<Array<Results>>`

Execute one or more statements. *(note that parameters are not supported)*
@@ -274,7 +277,8 @@ Result objects have the following properties:

- `rows: Row<T>[]` - The rows returned by the query
- `affectedRows?: number` - Count of the rows affected by the query. Note this is *not* the count of rows returned; it is the number of rows in the database changed by the query.
- `fields: { name: string; dataTypeID: number }[]` - Field name and Postgres data type ID for each field returned.
- `blob: Blob` - A `Blob` containing the data written to the virtual `/dev/blob/` device by a `COPY TO` command. See [importing and exporting](#importing-and-exporting-with-copy-tofrom).


### Row<T> Objects:
@@ -301,6 +305,25 @@ await pg.exec(`

*Work in progress: We plan to expand this API to allow sharing of the worker PGlite across browser tabs.*

### Importing and exporting with `COPY TO/FROM`

PGlite supports importing and exporting via `COPY TO/FROM` using a virtual `/dev/blob` device.

To import a file, pass the `File` or `Blob` in the query options as `blob` and copy from the `/dev/blob` device.

```ts
await pg.query("COPY my_table FROM '/dev/blob';", [], {
blob: MyBlob
})
```
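
If the data comes from a file picker, the `File` can be passed directly; a minimal sketch (the `fileInput` element and the CSV format are assumptions, not defaults):

```ts
const file = fileInput.files?.[0];
if (file) {
  await pg.query("COPY my_table FROM '/dev/blob' WITH (FORMAT csv, HEADER);", [], {
    blob: file,
  });
}
```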

To export a table or query, write to the `/dev/blob` device; the file is returned as `blob` on the query results:

```ts
const ret = await pg.query("COPY my_table TO '/dev/blob';")
// ret.blob is a `Blob` object with the data from the copy.
```
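
Since the returned `blob` is a standard `Blob`, the usual Blob APIs apply to it. For example, a sketch that reads CSV output as text (the CSV format here is an assumption, not a default):

```ts
const res = await pg.query("COPY my_table TO '/dev/blob' WITH (FORMAT csv, HEADER);");
const csv = await res.blob?.text(); // Blob#text() resolves to a string
console.log(csv?.split("\n")[0]);  // the header row
```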

## Extensions

PGlite supports the pl/pgsql procedural language extension; it is included and enabled by default.
48 changes: 48 additions & 0 deletions packages/pglite/examples/copy.html
@@ -0,0 +1,48 @@
<script type="module">
import { PGlite } from "../dist/index.js";

const pg = new PGlite();

await pg.exec(`
CREATE TABLE IF NOT EXISTS test (
id SERIAL PRIMARY KEY,
name TEXT
);
`);

// add 1000 rows:
await pg.exec("INSERT INTO test (name) SELECT 'name' || i FROM generate_series(1, 1000) i;");

// Copy the data to a blob:
console.log('Copying data to blob...');
const ret = await pg.query("COPY test TO '/dev/blob' WITH (FORMAT binary);");

console.log('Data copied to blob:')
const blob = ret.blob;
console.log(blob);

// Download the file:
const a = document.createElement('a');
a.href = URL.createObjectURL(new File([blob], 'test.bin', { type: 'application/octet-stream' }));
a.download = 'test.bin';
a.click();

// Other table
await pg.exec(`
CREATE TABLE IF NOT EXISTS test2 (
id SERIAL PRIMARY KEY,
name TEXT
);
`);

// Import the data from the blob:
console.log('Importing data from blob...');
await pg.query("COPY test2 FROM '/dev/blob' WITH (FORMAT binary);", [], {
blob: blob
});

console.log('Data imported from blob:');
const ret3 = await pg.query("SELECT * FROM test2;", []);
console.log(ret3.rows);

</script>
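
For reference, a hedged sketch of the same export in Node (not part of this example; it assumes the published `@electric-sql/pglite` package and Node 18+):

```ts
import { PGlite } from "@electric-sql/pglite";
import { writeFile } from "node:fs/promises";

const pg = new PGlite();
await pg.exec("CREATE TABLE t (id SERIAL PRIMARY KEY, name TEXT);");
await pg.exec("INSERT INTO t (name) VALUES ('a'), ('b');");

const ret = await pg.query("COPY t TO '/dev/blob' WITH (FORMAT csv);");
if (ret.blob) {
  await writeFile("t.csv", Buffer.from(await ret.blob.arrayBuffer()));
}
```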
1 change: 1 addition & 0 deletions packages/pglite/src/fs/idbfs.ts
@@ -45,6 +45,7 @@ export class IdbFs extends FilesystemBase {
const options: Partial<EmPostgres> = {
...opts,
preRun: [
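// Keep any preRun hooks passed in via opts (such as the /dev/blob device registration) instead of overwriting them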
...(opts.preRun || []),
(mod: any) => {
const idbfs = mod.FS.filesystems.IDBFS;
// Mount the idbfs to the users dataDir
1 change: 1 addition & 0 deletions packages/pglite/src/fs/memoryfs.ts
@@ -19,6 +19,7 @@ export class MemoryFS extends FilesystemBase {
const options: Partial<EmPostgres> = {
...opts,
preRun: [
...(opts.preRun || []),
(mod: any) => {
/**
* There is an issue with just mounting the filesystem, Postgres stalls...
1 change: 1 addition & 0 deletions packages/pglite/src/fs/nodefs.ts
@@ -32,6 +32,7 @@ export class NodeFS extends FilesystemBase {
const options: Partial<EmPostgres> = {
...opts,
preRun: [
...(opts.preRun || []),
(mod: any) => {
const nodefs = mod.FS.filesystems.NODEFS;
mod.FS.mkdir(PGDATA);
2 changes: 2 additions & 0 deletions packages/pglite/src/interface.ts
@@ -13,6 +13,7 @@ export interface ParserOptions {
export interface QueryOptions {
rowMode?: RowMode;
parsers?: ParserOptions;
blob?: Blob | File;
}
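
A `Blob` can also be constructed in memory and attached through this new option; a brief sketch (the table and its columns are hypothetical):

```ts
const csv = new Blob(["1,Alice\n2,Bob\n"], { type: "text/csv" });
await pg.query("COPY people FROM '/dev/blob' WITH (FORMAT csv);", [], { blob: csv });
```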

export interface ExecProtocolOptions {
@@ -54,6 +55,7 @@ export type Results<T = { [key: string]: any }> = {
rows: Row<T>[];
affectedRows?: number;
fields: { name: string; dataTypeID: number }[];
blob?: Blob; // Only set when a file is returned, such as from a COPY command
};

export interface Transaction {
21 changes: 13 additions & 8 deletions packages/pglite/src/parse.ts
@@ -14,6 +14,7 @@ import { parseType } from "./types.js";
export function parseResults(
messages: Array<BackendMessage>,
options?: QueryOptions,
blob?: Blob
): Array<Results> {
const resultSets: Results[] = [];
let currentResultSet: Results = { rows: [], fields: [] };
@@ -23,7 +24,7 @@
(msg) =>
msg instanceof RowDescriptionMessage ||
msg instanceof DataRowMessage ||
msg instanceof CommandCompleteMessage,
msg instanceof CommandCompleteMessage
);

filteredMessages.forEach((msg, index) => {
@@ -39,9 +40,9 @@
parseType(
field,
currentResultSet!.fields[i].dataTypeID,
options?.parsers,
),
),
options?.parsers
)
)
);
} else {
// rowMode === "object"
@@ -52,17 +53,21 @@
parseType(
field,
currentResultSet!.fields[i].dataTypeID,
options?.parsers,
options?.parsers
),
]),
),
])
)
);
}
} else if (msg instanceof CommandCompleteMessage) {
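// Tally affected rows; the blob from a COPY TO (if any) is attached to the last result set below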
affectedRows += retrieveRowCount(msg);

if (index === filteredMessages.length - 1)
resultSets.push({ ...currentResultSet, affectedRows });
resultSets.push({
...currentResultSet,
affectedRows,
...(blob ? { blob } : {}),
});
else resultSets.push(currentResultSet);

currentResultSet = { rows: [], fields: [] };
105 changes: 99 additions & 6 deletions packages/pglite/src/pglite.ts
@@ -54,6 +54,11 @@ export class PGlite implements PGliteInterface {

#parser = new Parser();

// The ArrayBuffer currently being read and the chunks currently being
// written during a query, such as COPY FROM or COPY TO.
#queryReadBuffer?: ArrayBuffer;
#queryWriteChunks?: Uint8Array[];

/**
* Create a new PGlite instance
* @param dataDir The directory to store the database files
@@ -130,6 +135,58 @@
...(this.debug > 0
? { print: console.info, printErr: console.error }
: { print: () => {}, printErr: () => {} }),
preRun: [
(mod: any) => {
// Register /dev/blob device
// This is used to read and write blobs when used in COPY TO/FROM
// e.g. COPY mytable TO '/dev/blob' WITH (FORMAT binary)
// The data is returned by the query as a `blob` property in the results
const devId = mod.FS.makedev(64, 0);
const devOpt = {
open: (stream: any) => {},
close: (stream: any) => {},
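// read() backs COPY FROM '/dev/blob': data comes from the ArrayBuffer of the blob attached to the current query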
read: (
stream: any,
buffer: Uint8Array,
offset: number,
length: number,
position: number,
) => {
const buf = this.#queryReadBuffer;
if (!buf) {
throw new Error("No File or Blob provided to read from");
}
const contents = new Uint8Array(buf);
if (position >= contents.length) return 0;
const size = Math.min(contents.length - position, length);
for (let i = 0; i < size; i++) {
buffer[offset + i] = contents[position + i];
}
return size;
},
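// write() backs COPY TO '/dev/blob': chunks are collected and later surfaced as `blob` on the results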
write: (
stream: any,
buffer: Uint8Array,
offset: number,
length: number,
position: number,
) => {
this.#queryWriteChunks ??= [];
this.#queryWriteChunks.push(
buffer.slice(offset, offset + length),
);
return length;
},
llseek: (stream: any, offset: number, whence: number) => {
throw new Error("Cannot seek /dev/blob");
},
};
mod.FS.registerDevice(devId, devOpt);
mod.FS.mkdev("/dev/blob", devId);
},
],
onRuntimeInitialized: async (Module: EmPostgres) => {
await this.fs!.initialSyncFs(Module.FS);
this.#ready = true;
@@ -269,9 +326,8 @@
): Promise<Results<T>> {
return await this.#queryMutex.runExclusive(async () => {
// We need to parse, bind and execute a query with parameters
if (this.debug > 1) {
console.log("runQuery", query, params, options);
}
this.#log("runQuery", query, params, options);
await this.#handleBlob(options?.blob);
const parsedParams = params?.map((p) => serializeType(p)) || [];
let results;
try {
@@ -295,12 +351,19 @@
} finally {
await this.#execProtocolNoSync(serialize.sync());
}
this.#cleanupBlob();
if (!this.#inTransaction) {
await this.#syncToFs();
}
let blob: Blob | undefined;
if (this.#queryWriteChunks) {
blob = new Blob(this.#queryWriteChunks);
this.#queryWriteChunks = undefined;
}
return parseResults(
results.map(([msg]) => msg),
options,
blob,
)[0] as Results<T>;
});
}
@@ -318,21 +381,27 @@
): Promise<Array<Results>> {
return await this.#queryMutex.runExclusive(async () => {
// No params so we can just send the query
if (this.debug > 1) {
console.log("runExec", query, options);
}
this.#log("runExec", query, options);
await this.#handleBlob(options?.blob);
let results;
try {
results = await this.#execProtocolNoSync(serialize.query(query));
} finally {
await this.#execProtocolNoSync(serialize.sync());
}
this.#cleanupBlob();
if (!this.#inTransaction) {
await this.#syncToFs();
}
let blob: Blob | undefined;
if (this.#queryWriteChunks) {
blob = new Blob(this.#queryWriteChunks);
this.#queryWriteChunks = undefined;
}
return parseResults(
results.map(([msg]) => msg),
options,
blob,
) as Array<Results>;
});
}
@@ -397,6 +466,21 @@
});
}

/**
* Handle a `File` or `Blob` attached to the current query
* @param blob The `File` or `Blob` to read from via the /dev/blob device
*/
async #handleBlob(blob?: File | Blob) {
this.#queryReadBuffer = blob ? await blob.arrayBuffer() : undefined;
}

/**
* Cleanup the current file
*/
#cleanupBlob() {
this.#queryReadBuffer = undefined;
}

/**
* Wait for the database to be ready
*/
@@ -499,4 +583,13 @@
await doSync();
}
}

/**
* Internal log function
*/
#log(...args: any[]) {
if (this.debug > 0) {
console.log(...args);
}
}
}