-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
8665ae8
commit 16f73ae
Showing
16 changed files
with
600 additions
and
286 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
import type { Flatfile } from '@flatfile/api' | ||
import api from '@flatfile/api' | ||
import { asyncMap } from 'modern-async' | ||
|
||
interface Part { | ||
sheetId: string | ||
pageNumber: number | ||
pageSize: number | ||
} | ||
|
||
export async function prepareParts( | ||
workbookId: string, | ||
pageSize: number, | ||
filter: Flatfile.Filter | ||
): Promise<Array<Part>> { | ||
const { data: sheets } = await api.sheets.list({ workbookId }) | ||
|
||
const partsArrays = await asyncMap(sheets, async (sheet) => { | ||
const { | ||
data: { | ||
counts: { total }, | ||
}, | ||
} = await api.sheets.getRecordCounts(sheet.id, { filter }) | ||
return Array.from({ length: Math.ceil(total / pageSize) }, (_, index) => ({ | ||
sheetId: sheet.id, | ||
pageNumber: index + 1, | ||
pageSize, | ||
})) | ||
}) | ||
|
||
return partsArrays.flat() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import { getSecret, isValidUrl } from '@flatfile/util-common' | ||
import { SheetExport } from './types' | ||
|
||
export async function postToWebhook( | ||
sheetExport: SheetExport, | ||
url: string | URL, | ||
urlParams: Array<{ key: string; value: unknown }>, | ||
secretName: string, | ||
environmentId: string, | ||
spaceId: string | ||
) { | ||
const baseUrl = isValidUrl(url) | ||
? url | ||
: await getSecret(url as string, environmentId) | ||
const queryParams = new URLSearchParams() | ||
urlParams.forEach(({ key, value }) => { | ||
Array.isArray(value) | ||
? value.forEach((v) => queryParams.append(key, String(v))) | ||
: queryParams.set(key, String(value)) | ||
}) | ||
const secret = secretName | ||
? await getSecret(secretName, environmentId, spaceId) | ||
: '' | ||
const response = await fetch(`${baseUrl}?${queryParams}`, { | ||
method: 'POST', | ||
headers: { | ||
'Content-Type': 'application/json', | ||
...(secret ? { Authorization: `Bearer ${secret}` } : {}), | ||
}, | ||
body: JSON.stringify({ sheet: sheetExport }), | ||
}) | ||
if (!response.ok) | ||
throw new Error(`HTTP ${response.status} ${response.statusText}`) | ||
return response.json() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
import type { Flatfile } from '@flatfile/api' | ||
|
||
export interface SheetExport extends Flatfile.Sheet { | ||
records: Flatfile.Record_[] | ||
} | ||
|
||
export interface WebhookEgressOptions { | ||
secretName?: string | ||
urlParams?: Array<{ key: string; value: unknown }> | ||
pageSize?: number | ||
filter?: Flatfile.Filter | ||
debug?: boolean | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,81 +1,124 @@ | ||
import api from '@flatfile/api' | ||
import { FlatfileListener } from '@flatfile/listener' | ||
import { jobHandler } from '@flatfile/plugin-job-handler' | ||
import { logError } from '@flatfile/util-common' | ||
import { | ||
RejectionResponse, | ||
responseRejectionHandler, | ||
} from '@flatfile/util-response-rejection' | ||
import api, { Flatfile } from '@flatfile/api' | ||
import type FlatfileListener from '@flatfile/listener' | ||
import { FlatfileEvent } from '@flatfile/listener' | ||
import { deleteRecords, getRecordsRaw, logError } from '@flatfile/util-common' | ||
import { responseRejectionHandler } from '@flatfile/util-response-rejection' | ||
import { prepareParts } from './fetch.and.process.sheets' | ||
import { postToWebhook } from './post.to.webhook' | ||
import type { SheetExport, WebhookEgressOptions } from './types' | ||
|
||
export function webhookEgress(job: string, webhookUrl?: string) { | ||
return function (listener: FlatfileListener) { | ||
listener.use( | ||
jobHandler(job, async (event, tick) => { | ||
const { workbookId } = event.context | ||
const { data: workbook } = await api.workbooks.get(workbookId) | ||
const { data: workbookSheets } = await api.sheets.list({ workbookId }) | ||
export function webhookEgress( | ||
job: string, | ||
url: string | URL, | ||
options: WebhookEgressOptions = {} | ||
) { | ||
const { | ||
secretName = 'WEBHOOK_TOKEN', | ||
urlParams = [], | ||
pageSize = 10_000, | ||
filter = Flatfile.Filter.Valid, | ||
debug = false, | ||
} = options | ||
let deleteSubmitted = false | ||
|
||
await tick(30, 'Getting workbook data') | ||
return (listener: FlatfileListener) => { | ||
listener.on( | ||
'job:ready', | ||
{ job, isPart: false }, | ||
async (event: FlatfileEvent) => { | ||
const { jobId, workbookId } = event.context | ||
|
||
const sheets = [] | ||
for (const [_, element] of workbookSheets.entries()) { | ||
const { data: records } = await api.records.get(element.id) | ||
sheets.push({ | ||
...element, | ||
...records, | ||
await api.jobs.ack(jobId, { info: 'Splitting Job', progress: 10 }) | ||
|
||
const parts = await prepareParts(workbookId, pageSize, filter) | ||
if (parts.length > 0) { | ||
await api.jobs.split(jobId, { parts }) | ||
await api.jobs.ack(jobId, { | ||
info: `Job Split into ${parts.length} parts.`, | ||
progress: 20, | ||
}) | ||
} else { | ||
await api.jobs.complete(jobId, { | ||
outcome: { message: 'nothing to do' }, | ||
}) | ||
} | ||
} | ||
) | ||
|
||
await tick(60, 'Posting data to webhook') | ||
|
||
listener.on( | ||
'job:ready', | ||
{ job, isPart: true }, | ||
async (event: FlatfileEvent) => { | ||
const { jobId, environmentId, spaceId } = event.context | ||
try { | ||
const webhookReceiver = webhookUrl || process.env.WEBHOOK_SITE_URL | ||
const response = await fetch(webhookReceiver, { | ||
method: 'POST', | ||
headers: { | ||
'Content-Type': 'application/json', | ||
}, | ||
body: JSON.stringify({ | ||
workbook: { | ||
...workbook, | ||
sheets, | ||
}, | ||
}), | ||
const job = await api.jobs.get(jobId) | ||
const { sheetId, pageNumber } = job.data.partData! | ||
|
||
const records = await getRecordsRaw(sheetId, { | ||
pageNumber, | ||
pageSize, | ||
filter, | ||
}) | ||
|
||
if (response.status === 200) { | ||
const responseData = await response.json() | ||
const rejections: RejectionResponse = responseData.rejections | ||
if (records instanceof Error) { | ||
throw new Error(`Error fetching records: ${records.message}`) | ||
} | ||
|
||
if (rejections) { | ||
return await responseRejectionHandler(rejections) | ||
} | ||
const { data: sheet } = await api.sheets.get(sheetId) | ||
const sheetExport: SheetExport = { ...sheet, records } | ||
const responseData = await postToWebhook( | ||
sheetExport, | ||
url, | ||
urlParams, | ||
secretName, | ||
environmentId, | ||
spaceId | ||
) | ||
|
||
return { | ||
outcome: { | ||
message: `Data was successfully submitted to the provided webhook. Go check it out at ${webhookReceiver}.`, | ||
}, | ||
} | ||
const { rejections } = responseData | ||
if (rejections) { | ||
const response = await responseRejectionHandler(rejections) | ||
deleteSubmitted = rejections.deleteSubmitted | ||
|
||
await api.jobs.complete(jobId, response.jobCompleteDetails) | ||
} else { | ||
logError( | ||
'@flatfile/plugin-webhook-egress', | ||
`Failed to submit data to ${webhookReceiver}. Status: ${response.status} ${response.statusText}` | ||
) | ||
return { | ||
await api.jobs.complete(jobId, { | ||
outcome: { | ||
message: `Data was not successfully submitted to the provided webhook. Status: ${response.status} ${response.statusText}`, | ||
message: `Data was successfully submitted to the provided webhook. Check it out at ${url}.`, | ||
}, | ||
} | ||
}) | ||
} | ||
} catch (error) { | ||
logError( | ||
'@flatfile/plugin-webhook-egress', | ||
JSON.stringify(error, null, 2) | ||
) | ||
// Throw error to fail job | ||
throw new Error(`Error posting data to webhook`) | ||
if (debug) { | ||
logError('@flatfile/plugin-webhook-egress', error.message) | ||
} | ||
await api.jobs.fail(jobId, { outcome: { message: error.message } }) | ||
} | ||
}) | ||
} | ||
) | ||
|
||
listener.on( | ||
'job:parts-completed', | ||
{ job, isPart: false }, | ||
async (event: FlatfileEvent) => { | ||
const { jobId, workbookId } = event.context | ||
if (deleteSubmitted) { | ||
const { data: sheets } = await api.sheets.list({ workbookId }) | ||
for (const sheet of sheets) { | ||
const { | ||
data: { | ||
counts: { valid }, | ||
}, | ||
} = await api.sheets.getRecordCounts(sheet.id) | ||
if (valid > 0) await deleteRecords(sheet.id, { filter: 'valid' }) | ||
} | ||
} | ||
await api.jobs.complete(jobId, { | ||
outcome: { message: 'This job is now complete.' }, | ||
}) | ||
} | ||
) | ||
} | ||
} | ||
|
||
export * from './types' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
import api from '@flatfile/api' | ||
import { handleError } from './logging.helper' | ||
|
||
export async function getSecret( | ||
name: string, | ||
environmentId: string, | ||
spaceId?: string | ||
): Promise<string | undefined> { | ||
try { | ||
const secrets = await api.secrets.list({ spaceId, environmentId }) | ||
return secrets.data.find((secret) => secret.name === name)?.value | ||
} catch (e) { | ||
handleError(e, `Error fetching secret ${name}`) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,6 @@ | ||
export * from './all.records' | ||
export * from './async.batch' | ||
export * from './delete.records' | ||
export * from './get.secret' | ||
export * from './logging.helper' | ||
export * from './valid.url' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.