Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: webhook-egress & response-rejection refactor #485

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 70 additions & 69 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion plugins/webhook-egress/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
"@flatfile/plugin-job-handler": "^0.5.2",
"@flatfile/util-common": "^1.3.2",
"@flatfile/util-response-rejection": "^1.3.3",
"cross-fetch": "^4.0.0"
"cross-fetch": "^4.0.0",
"modern-async": "^2.0.0"
},
"peerDependencies": {
"@flatfile/api": "^1.8.9",
Expand Down
32 changes: 32 additions & 0 deletions plugins/webhook-egress/src/fetch.and.process.sheets.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { Flatfile } from '@flatfile/api'
import api from '@flatfile/api'
import { asyncMap } from 'modern-async'

interface Part {
sheetId: string
pageNumber: number
pageSize: number
}

export async function prepareParts(
workbookId: string,
pageSize: number,
filter: Flatfile.Filter
): Promise<Array<Part>> {
const { data: sheets } = await api.sheets.list({ workbookId })

const partsArrays = await asyncMap(sheets, async (sheet) => {
const {
data: {
counts: { total },
},
} = await api.sheets.getRecordCounts(sheet.id, { filter })
return Array.from({ length: Math.ceil(total / pageSize) }, (_, index) => ({
sheetId: sheet.id,
pageNumber: index + 1,
pageSize,
}))
})

return partsArrays.flat()
}
35 changes: 35 additions & 0 deletions plugins/webhook-egress/src/post.to.webhook.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { getSecret, isValidUrl } from '@flatfile/util-common'
import { SheetExport } from './types'

export async function postToWebhook(
sheetExport: SheetExport,
url: string | URL,
urlParams: Array<{ key: string; value: unknown }>,
secretName: string,
environmentId: string,
spaceId: string
) {
const baseUrl = isValidUrl(url)
? url
: await getSecret(url as string, environmentId)
const queryParams = new URLSearchParams()
urlParams.forEach(({ key, value }) => {
Array.isArray(value)
? value.forEach((v) => queryParams.append(key, String(v)))
: queryParams.set(key, String(value))
})
const secret = secretName
? await getSecret(secretName, environmentId, spaceId)
: ''
const response = await fetch(`${baseUrl}?${queryParams}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(secret ? { Authorization: `Bearer ${secret}` } : {}),
},
body: JSON.stringify({ sheet: sheetExport }),
})
if (!response.ok)
throw new Error(`HTTP ${response.status} ${response.statusText}`)
return response.json()
}
13 changes: 13 additions & 0 deletions plugins/webhook-egress/src/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type { Flatfile } from '@flatfile/api'

export interface SheetExport extends Flatfile.Sheet {
records: Flatfile.Record_[]
}

export interface WebhookEgressOptions {
secretName?: string
urlParams?: Array<{ key: string; value: unknown }>
pageSize?: number
filter?: Flatfile.Filter
debug?: boolean
}
18 changes: 9 additions & 9 deletions plugins/webhook-egress/src/webhook.egress.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ fetchMock.dontMock()
describe('webhookEgress() e2e', () => {
const listener = setupListener()

let spaceId
let workbookId
let sheetId
let spaceId: string
let workbookId: string
let sheetId: string

beforeAll(async () => {
const space = await setupSpace()
Expand All @@ -30,7 +30,7 @@ describe('webhookEgress() e2e', () => {
'notes',
])
workbookId = workbook.id
sheetId = workbook.sheets[0].id
sheetId = workbook.sheets![0].id
await createRecords(sheetId, [
{
name: 'John Doe',
Expand Down Expand Up @@ -76,7 +76,7 @@ describe('webhookEgress() e2e', () => {
await listener.waitFor('job:ready', 1, 'workbook:egressTestSuccess')

const response = await api.jobs.get(successfulJobId)
expect(response.data.outcome.message).toEqual(
expect(response.data.outcome?.message).toEqual(
`Data was successfully submitted to the provided webhook. Go check it out at example.com.`
)
})
Expand Down Expand Up @@ -157,10 +157,10 @@ describe('webhookEgress() e2e', () => {
await listener.waitFor('job:ready', 1, 'workbook:egressTestSuccess')

const response = await api.jobs.get(successfulJobId)
expect(response.data.outcome.message).toEqual(
expect(response.data.outcome?.message).toEqual(
'The data has been successfully submitted without any rejections. This task is now complete.'
)
expect(response.data.outcome.heading).toEqual('Success!')
expect(response.data.outcome?.heading).toEqual('Success!')
})

it('returns rejections', async () => {
Expand Down Expand Up @@ -208,10 +208,10 @@ describe('webhookEgress() e2e', () => {
await listener.waitFor('job:ready', 1, 'workbook:egressTestSuccess')

const response = await api.jobs.get(successfulJobId)
expect(response.data.outcome.message).toEqual(
expect(response.data.outcome?.message).toEqual(
'During the data submission process, 1 records were rejected. Please review and correct these records before resubmitting.'
)
expect(response.data.outcome.heading).toEqual('Rejected Records')
expect(response.data.outcome?.heading).toEqual('Rejected Records')
})
})
})
165 changes: 104 additions & 61 deletions plugins/webhook-egress/src/webhook.egress.ts
Original file line number Diff line number Diff line change
@@ -1,83 +1,126 @@
import { FlatfileClient } from '@flatfile/api'
import { FlatfileListener } from '@flatfile/listener'
import { jobHandler } from '@flatfile/plugin-job-handler'
import { logError } from '@flatfile/util-common'
import {
RejectionResponse,
responseRejectionHandler,
} from '@flatfile/util-response-rejection'
import { Flatfile, FlatfileClient } from '@flatfile/api'
import type FlatfileListener from '@flatfile/listener'
import { FlatfileEvent } from '@flatfile/listener'
import { deleteRecords, getRecordsRaw, logError } from '@flatfile/util-common'
import { responseRejectionHandler } from '@flatfile/util-response-rejection'
import { prepareParts } from './fetch.and.process.sheets'
import { postToWebhook } from './post.to.webhook'
import type { SheetExport, WebhookEgressOptions } from './types'

const api = new FlatfileClient()

export function webhookEgress(job: string, webhookUrl?: string) {
return function (listener: FlatfileListener) {
listener.use(
jobHandler(job, async (event, tick) => {
const { workbookId } = event.context
const { data: workbook } = await api.workbooks.get(workbookId)
const { data: workbookSheets } = await api.sheets.list({ workbookId })
export function webhookEgress(
job: string,
url: string | URL,
options: WebhookEgressOptions = {}
) {
const {
secretName = 'WEBHOOK_TOKEN',
urlParams = [],
pageSize = 10_000,
filter = Flatfile.Filter.Valid,
debug = false,
} = options
let deleteSubmitted = false

await tick(30, 'Getting workbook data')
return (listener: FlatfileListener) => {
listener.on(
'job:ready',
{ job, isPart: false },
async (event: FlatfileEvent) => {
const { jobId, workbookId } = event.context

const sheets = []
for (const [_, element] of workbookSheets.entries()) {
const { data: records } = await api.records.get(element.id)
sheets.push({
...element,
...records,
await api.jobs.ack(jobId, { info: 'Splitting Job', progress: 10 })

const parts = await prepareParts(workbookId, pageSize, filter)
if (parts.length > 0) {
await api.jobs.split(jobId, { parts })
await api.jobs.ack(jobId, {
info: `Job Split into ${parts.length} parts.`,
progress: 20,
})
} else {
await api.jobs.complete(jobId, {
outcome: { message: 'nothing to do' },
})
}
}
)

await tick(60, 'Posting data to webhook')

listener.on(
'job:ready',
{ job, isPart: true },
async (event: FlatfileEvent) => {
const { jobId, environmentId, spaceId } = event.context
try {
const webhookReceiver = webhookUrl || process.env.WEBHOOK_SITE_URL
const response = await fetch(webhookReceiver, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
workbook: {
...workbook,
sheets,
},
}),
const job = await api.jobs.get(jobId)
const { sheetId, pageNumber } = job.data.partData!

const records = await getRecordsRaw(sheetId, {
pageNumber,
pageSize,
filter,
})

if (response.status === 200) {
const responseData = await response.json()
const rejections: RejectionResponse = responseData.rejections
if (records instanceof Error) {
throw new Error(`Error fetching records: ${records.message}`)
}

if (rejections) {
return await responseRejectionHandler(rejections)
}
const { data: sheet } = await api.sheets.get(sheetId)
const sheetExport: SheetExport = { ...sheet, records }
const responseData = await postToWebhook(
sheetExport,
url,
urlParams,
secretName,
environmentId,
spaceId
)

return {
outcome: {
message: `Data was successfully submitted to the provided webhook. Go check it out at ${webhookReceiver}.`,
},
}
const { rejections } = responseData
if (rejections) {
const response = await responseRejectionHandler(rejections)
deleteSubmitted = rejections.deleteSubmitted

await api.jobs.complete(jobId, response.jobCompleteDetails)
} else {
logError(
'@flatfile/plugin-webhook-egress',
`Failed to submit data to ${webhookReceiver}. Status: ${response.status} ${response.statusText}`
)
return {
await api.jobs.complete(jobId, {
outcome: {
message: `Data was not successfully submitted to the provided webhook. Status: ${response.status} ${response.statusText}`,
message: `Data was successfully submitted to the provided webhook. Check it out at ${url}.`,
},
}
})
}
} catch (error) {
logError(
'@flatfile/plugin-webhook-egress',
JSON.stringify(error, null, 2)
)
// Throw error to fail job
throw new Error(`Error posting data to webhook`)
if (debug) {
logError('@flatfile/plugin-webhook-egress', error.message)
}
await api.jobs.fail(jobId, { outcome: { message: error.message } })
}
})
}
)

listener.on(
'job:parts-completed',
{ job, isPart: false },
async (event: FlatfileEvent) => {
const { jobId, workbookId } = event.context
if (deleteSubmitted) {
const { data: sheets } = await api.sheets.list({ workbookId })
for (const sheet of sheets) {
const {
data: {
counts: { valid },
},
} = await api.sheets.getRecordCounts(sheet.id)
if (valid > 0) await deleteRecords(sheet.id, { filter: 'valid' })
}
}
await api.jobs.complete(jobId, {
outcome: { message: 'This job is now complete.' },
})
}
)
}
}

export * from './types'
15 changes: 15 additions & 0 deletions utils/common/src/get.secret.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import api from '@flatfile/api'
import { handleError } from './logging.helper'

export async function getSecret(
name: string,
environmentId: string,
spaceId?: string
): Promise<string | undefined> {
try {
const secrets = await api.secrets.list({ spaceId, environmentId })
return secrets.data.find((secret) => secret.name === name)?.value
} catch (e) {
handleError(e, `Error fetching secret ${name}`)
}
}
2 changes: 2 additions & 0 deletions utils/common/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
export * from './all.records'
export * from './async.batch'
export * from './delete.records'
export * from './get.secret'
export * from './logging.helper'
export * from './slugify'
export * from './valid.url'
5 changes: 5 additions & 0 deletions utils/common/src/logging.helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,8 @@ export const logWarn = (packageName: string, msg: string): void => {
export const logError = (packageName: string, msg: string): void => {
log(packageName, msg, 'error')
}

export function handleError(error: any, message: string) {
console.error(error)
throw new Error(`${message}: ${error.message}`)
}
11 changes: 11 additions & 0 deletions utils/common/src/valid.url.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export function isValidUrl(url: string | URL) {
if (url instanceof URL) {
return true
}
try {
new URL(url)
return true
} catch (error) {
return false
}
}
Loading
Loading