FSR-813 | add DTS Lambda (#50)
* https://eaflood.atlassian.net/browse/FSR-813

* Refactor dts-process.js and add dtsProcessEventRule.json

* feat: Refactor dts-process.js and add dtsProcessEventRule.json

* Update imtd-stations.js module exports

* feat: Add HTTP_NOT_FOUND constant and use it in dts-process.js

* moved delete station into helper folder

* Refactor dts-process.js and add dtsProcessEventRule.json

* feat: Add HTTP_NOT_FOUND constant and use it in dts-process.js

* refactored deleting a station

* feat: Retrieve rloi_ids with offset and limit in dts-process.js

* Refactor dts-process.js and add dtsProcessEventRule.json

* Refactor dts-process.js and add dtsProcessEventRule.json

* feat: Refactor dts-process.js and add getStationData function

* feat: Refactor dts-process.js and add validateStationData function

* Refactor imtd-process.js to use rloiids instead of stations

* Refactor dts-process.js and imtd-process.js to use rloiids instead of stations
nikiwycherley authored Jun 4, 2024
1 parent 78868de commit 334065f
Showing 11 changed files with 508 additions and 69 deletions.
7 changes: 7 additions & 0 deletions config/dtsProcessEventRule.json
@@ -0,0 +1,7 @@
{
"Description": "Event rule to schedule the dtsProcess lambda execution",
"Name": "{PLACEHOLDER}",
"RoleArn": "{PLACEHOLDER}",
"ScheduleExpression": "{PLACEHOLDER}",
"State": "ENABLED"
}
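The tooling that fills in the {PLACEHOLDER} values and registers this rule is not part of the diff. As a rough sketch of the intent only, assuming the rule is created with the AWS SDK v2 CloudWatchEvents client (an assumption, not something this commit shows), the file maps onto a putRule call roughly like this:

// Illustrative sketch only -- assumes AWS SDK v2 and that the {PLACEHOLDER}
// values have already been substituted by the deployment pipeline.
const AWS = require('aws-sdk')
const rule = require('./config/dtsProcessEventRule.json')

async function scheduleDtsProcess () {
  const events = new AWS.CloudWatchEvents()
  await events.putRule({
    Name: rule.Name,
    RoleArn: rule.RoleArn,
    ScheduleExpression: rule.ScheduleExpression, // e.g. 'rate(15 minutes)' -- illustrative value
    State: rule.State,
    Description: rule.Description
  }).promise()
}

module.exports = scheduleDtsProcess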
6 changes: 6 additions & 0 deletions lib/constants.js
@@ -0,0 +1,6 @@
module.exports = {
HTTP_BAD_REQUEST: 400,
HTTP_NOT_FOUND: 404,
HTTP_TOO_MANY_REQUESTS: 429,
INTERNAL_SERVER_ERROR: 500
}
71 changes: 71 additions & 0 deletions lib/functions/dts-process.js
@@ -0,0 +1,71 @@
const logger = require('../helpers/logging')
const pg = require('../helpers/db')
const invokeLambda = require('../helpers/invoke-lambda')
const { deleteStation, getRloiIds, getStationData, validateStationData } = require('../helpers/imtd-api')

async function insertStation (stationDataArray) {
try {
await pg.transaction(async trx => {
await Promise.all(stationDataArray.map(async (stationData) => {
const stationID = stationData.station_id
await trx('station_display_time_series').where({ station_id: stationID }).delete()
await trx('station_display_time_series').insert(stationData)
logger.info(`Processed displayTimeSeries for RLOI id ${stationID}`)
}))
})
} catch (error) {
logger.error('Database error processing stationData', error)
throw error
}
}

async function getData (stationId) {
try {
const stationData = await getStationData(stationId)
if (stationData.length === 0) {
logger.info(`Deleting station ${stationId}`)
const tableName = 'station_display_time_series'
await deleteStation(stationId, tableName)
return
}
await validateStationData(stationData)
await insertStation(stationData)
} catch (error) {
logger.error(`Could not process data for station ${stationId} (${error.message})`)
}
}

async function handler ({ offset = 0 } = {}) {
const BATCH_SIZE = parseInt(process.env.IMTD_BATCH_SIZE || '500')

logger.info(`Retrieving up to ${BATCH_SIZE} rloi_ids with an offset of ${offset}`)
const rloiids = await getRloiIds({
offset,
limit: BATCH_SIZE
})
logger.info(`Retrieved ${rloiids.length} rloi_ids`)

for (const rloiid of rloiids) {
await getData(rloiid.rloi_id)
}

if (rloiids.length >= BATCH_SIZE) {
const functionName = process.env.AWS_LAMBDA_FUNCTION_NAME
const newOffset = offset + BATCH_SIZE
logger.info(`Invoking ${functionName} with an offset of ${newOffset}`)

await invokeLambda(functionName, {
offset: newOffset
})
}
}

module.exports = {
handler,
validateStationData
}

process.on('SIGTERM', async () => {
logger.info('SIGTERM received, destroying DB connection')
await pg.destroy()
process.exit(0)
})
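For local testing the handler can be driven directly. A minimal sketch, assuming a reachable database and ignoring the follow-on Lambda invocation that fires when a full batch is returned (none of this is part of the commit):

// Minimal local run sketch -- not part of the commit.
// IMTD_BATCH_SIZE is read by the handler; it defaults to 500.
process.env.IMTD_BATCH_SIZE = '10'

const { handler } = require('./lib/functions/dts-process')

handler({ offset: 0 })
  .then(() => console.log('dts-process run complete'))
  .catch((err) => console.error('dts-process run failed', err))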
50 changes: 7 additions & 43 deletions lib/functions/imtd-process.js
@@ -1,18 +1,10 @@
const parseThresholds = require('../models/parse-thresholds')
const axios = require('axios')
const logger = require('../helpers/logging')
const pg = require('../helpers/db')
const invokeLambda = require('../helpers/invoke-lambda')

async function deleteThresholds (stationId) {
try {
await pg('station_imtd_threshold').where({ station_id: stationId }).delete()
logger.info(`Deleted thresholds for RLOI id ${stationId}`)
} catch (error) {
logger.error(`Error deleting thresholds for station ${stationId}`, error)
throw error
}
}
const deleteThresholds = require('../helpers/imtd-api').deleteStation
const { getRloiIds, getImtdApiResponse } = require('../helpers/imtd-api')
const tableName = 'station_imtd_threshold'

async function insertThresholds (stationId, thresholds) {
try {
@@ -37,21 +29,6 @@ async function insertThresholds (stationId, thresholds) {
}
}

async function getImtdApiResponse (stationId) {
const hostname = 'imfs-prd1-thresholds-api.azurewebsites.net'
try {
return await axios.get(`https://${hostname}/Location/${stationId}?version=2`)
} catch (error) {
if (error.response?.status === 404) {
logger.info(`Station ${stationId} not found (HTTP Status: 404)`)
} else {
const message = error.response?.status ? `HTTP Status: ${error.response.status}` : `Error: ${error.message}`
throw Error(`IMTD API request for station ${stationId} failed (${message})`)
}
return {}
}
}

async function getIMTDThresholds (stationId) {
const response = await getImtdApiResponse(stationId)
if (response.data) {
@@ -66,36 +43,23 @@ async function getData (stationId) {
if (thresholds.length > 0) {
await insertThresholds(stationId, thresholds)
} else {
await deleteThresholds(stationId)
await deleteThresholds(stationId, tableName)
logger.info(`Deleted data for RLOI id ${stationId}`)
}
} catch (error) {
logger.error(`Could not process data for station ${stationId} (${error.message})`)
}
}

async function getRloiIds ({ limit, offset } = {}) {
try {
logger.info(`Retrieving up to ${limit} rloi_ids with an offset of ${offset}`)
const result = await pg('rivers_mview')
.distinct('rloi_id')
.whereNotNull('rloi_id')
.orderBy('rloi_id', 'asc')
.limit(limit)
.offset(offset)
logger.info(`Retrieved ${result.length} rloi_ids`)
return result
} catch (error) {
throw Error(`Could not get list of id's from database (Error: ${error.message})`)
}
}

async function handler ({ offset = 0 } = {}) {
const BATCH_SIZE = parseInt(process.env.IMTD_BATCH_SIZE || '500')

logger.info(`Retrieving up to ${BATCH_SIZE} rloi_ids with an offset of ${offset}`)
const stations = await getRloiIds({
offset,
limit: BATCH_SIZE
})
logger.info(`Retrieved ${stations.length} rloi_ids`)

for (const station of stations) {
await getData(station.rloi_id)
66 changes: 66 additions & 0 deletions lib/helpers/imtd-api.js
@@ -0,0 +1,66 @@
const pg = require('./db')
const axios = require('axios')
const { HTTP_NOT_FOUND } = require('../constants')
const logger = require('./logging')
const parseStation = require('../models/parse-time-series')
const Joi = require('joi')

async function deleteStation (stationId, tableName) {
await pg(tableName).where({ station_id: stationId }).delete()
}

async function getRloiIds ({ limit, offset } = {}) {
try {
const result = await pg('rivers_mview')
.distinct('rloi_id')
.whereNotNull('rloi_id')
.orderBy('rloi_id', 'asc')
.limit(limit)
.offset(offset)
return result
} catch (error) {
throw Error(`Could not get list of id's from database (Error: ${error.message})`)
}
}

async function getImtdApiResponse (stationId) {
const hostname = 'imfs-prd1-thresholds-api.azurewebsites.net'
try {
return await axios.get(`https://${hostname}/Location/${stationId}?version=2`)
} catch (error) {
if (error.response?.status === HTTP_NOT_FOUND) {
logger.info(`Station ${stationId} not found (HTTP Status: 404)`)
} else {
const message = error.response?.status ? `HTTP Status: ${error.response.status}` : `Error: ${error.message}`
throw Error(`IMTD API request for station ${stationId} failed (${message})`)
}
return {}
}
}

async function getStationData (stationId) {
const response = await getImtdApiResponse(stationId)
if (response.data) {
return parseStation(response.data[0].TimeSeriesMetaData, stationId)
}
return []
}

async function validateStationData (stationDataArray) {
const schema = Joi.object({
station_id: Joi.number().required(),
direction: Joi.string().required(),
display_time_series: Joi.boolean().required()
})

try {
const validatedData = await Promise.all(
stationDataArray.map((stationData) => schema.validateAsync(stationData))
)
return validatedData
} catch (error) {
throw new Error(`Validation error: ${error.message}`)
}
}

module.exports = { deleteStation, getRloiIds, getImtdApiResponse, getStationData, validateStationData }
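Together these helpers give both Lambdas the same fetch/validate/delete pattern. A usage sketch (station id 9521 is borrowed from the test fixtures; calling the helpers standalone like this is illustrative, not how the Lambdas run in production):

// Illustrative use of the shared helpers -- needs a working DB connection
// and network access to the IMTD API.
const { getStationData, validateStationData, deleteStation } = require('./lib/helpers/imtd-api')

async function example () {
  const stationData = await getStationData(9521) // parsed display_time_series records, or []
  if (stationData.length === 0) {
    await deleteStation(9521, 'station_display_time_series')
    return
  }
  await validateStationData(stationData) // throws 'Validation error: ...' if a record is malformed
  console.log(stationData)
}

example()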
25 changes: 25 additions & 0 deletions lib/models/parse-time-series.js
@@ -0,0 +1,25 @@
/**
* @param {Array} data - The TimeSeriesMetaData array to be parsed.
* @param {number} stationId - The RLOI station id the records belong to.
* @returns {Array|Object} - Unique { station_id, direction, display_time_series } records, or an empty object if no data is supplied.
*/
function parseTimeSeries (data, stationId) {
if (!data) {
return {}
}

const processedData = data.map((item) => ({
station_id: stationId,
direction: item.qualifier === 'Downstream Stage' ? 'd' : 'u',
display_time_series: item.DisplayTimeSeries
}))

const uniqueProcessedData = processedData.filter((item, index, self) =>
index === self.findIndex((t) => (
t.station_id === item.station_id && t.direction === item.direction
))
)

return uniqueProcessedData
}

module.exports = parseTimeSeries
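A worked example of the mapping and de-duplication (the sample metadata and station id 1001 are invented for illustration; only the qualifier and DisplayTimeSeries field names come from the code above):

const parseTimeSeries = require('./lib/models/parse-time-series')

// Invented sample metadata: two directions for a hypothetical station 1001,
// plus a duplicate upstream entry that the filter drops.
const timeSeriesMetaData = [
  { qualifier: 'Stage', DisplayTimeSeries: true },
  { qualifier: 'Downstream Stage', DisplayTimeSeries: false },
  { qualifier: 'Stage', DisplayTimeSeries: true }
]

console.log(parseTimeSeries(timeSeriesMetaData, 1001))
// [
//   { station_id: 1001, direction: 'u', display_time_series: true },
//   { station_id: 1001, direction: 'd', display_time_series: false }
// ]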
2 changes: 1 addition & 1 deletion lib/models/rloi.js
@@ -24,7 +24,7 @@ function removePostfix (name) {
}

async function fetchStation (s3, bucket, key) {
return await s3.getObject({
return s3.getObject({
Bucket: bucket,
Key: key
})
4 changes: 4 additions & 0 deletions serverless.yml
@@ -63,3 +63,7 @@ functions:
name: ${env:LFW_DATA_TARGET_ENV_NAME}${self:service}-imtdProcess
handler: lib/functions/imtd-process.handler
timeout: 900
dtsProcess:
name: ${env:LFW_DATA_TARGET_ENV_NAME}${self:service}-dtsProcess
handler: lib/functions/dts-process.handler
timeout: 900
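Once deployed, dtsProcess can be kicked off the same way it pages itself: by invoking it with an offset payload. A hedged sketch using the repo's existing invoke-lambda helper (the function name below is a placeholder shaped like the serverless.yml naming; running it by hand like this is an assumption, not part of the commit):

// Illustrative one-off trigger -- replace the placeholder with the real deployed name.
const invokeLambda = require('./lib/helpers/invoke-lambda')

const functionName = `${process.env.LFW_DATA_TARGET_ENV_NAME}<service>-dtsProcess` // placeholder

invokeLambda(functionName, { offset: 0 })
  .then(() => console.log(`Invoked ${functionName}`))
  .catch((err) => console.error(err))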
23 changes: 21 additions & 2 deletions test/data/imtd-stations.js
@@ -136,6 +136,25 @@ const apiNoMatchingThresholdResponse = {
]
}

const flattenedData = [{ stationId: 9521, floodWarningArea: '113FWFEXE04', floodWarningType: 'W', direction: 'u', level: 2.4 }, { stationId: 9521, floodWarningArea: '113FWFEXE03', floodWarningType: 'W', direction: 'u', level: 2.5 }, { stationId: 9521, floodWarningArea: '113FWFEXE04', floodWarningType: 'W', direction: 'u', level: 2.7 }, { stationId: 9521, floodWarningArea: '113FWFEXE06', floodWarningType: 'W', direction: 'u', level: 2.9 }, { stationId: 9521, floodWarningArea: '113FWFEXE05', floodWarningType: 'W', direction: 'u', level: 3.6 }, { stationId: 9524, floodWarningArea: '121WAF910', floodWarningType: 'A', direction: 'u', level: 0.95 }, { stationId: 9524, floodWarningArea: '121FWF214', floodWarningType: 'W', direction: 'u', level: 1.2 }, { stationId: 9524, floodWarningArea: '121FWF214', floodWarningType: 'W', direction: 'u', level: 1.7 }, { stationId: 9525, floodWarningArea: '121WAF918', floodWarningType: 'A', direction: 'u', level: 0.7 }, { stationId: 9525, floodWarningArea: '121FWF121', floodWarningType: 'W', direction: 'u', level: 0.7 }, { stationId: 9525, floodWarningArea: '121FWF121', floodWarningType: 'W', direction: 'u', level: 1.25 }]
const api404 = {
type: 'https://tools.ietf.org/html/rfc7231#section-6.5.4',
title: 'Not Found',
status: 404,
traceId: '0HN38TFTTO070:00000003'
}

const flattenedData = [
{ stationId: 9521, floodWarningArea: '113FWFEXE04', floodWarningType: 'W', direction: 'u', level: 2.4 },
{ stationId: 9521, floodWarningArea: '113FWFEXE03', floodWarningType: 'W', direction: 'u', level: 2.5 },
{ stationId: 9521, floodWarningArea: '113FWFEXE04', floodWarningType: 'W', direction: 'u', level: 2.7 },
{ stationId: 9521, floodWarningArea: '113FWFEXE06', floodWarningType: 'W', direction: 'u', level: 2.9 },
{ stationId: 9521, floodWarningArea: '113FWFEXE05', floodWarningType: 'W', direction: 'u', level: 3.6 },
{ stationId: 9524, floodWarningArea: '121WAF910', floodWarningType: 'A', direction: 'u', level: 0.95 },
{ stationId: 9524, floodWarningArea: '121FWF214', floodWarningType: 'W', direction: 'u', level: 1.2 },
{ stationId: 9524, floodWarningArea: '121FWF214', floodWarningType: 'W', direction: 'u', level: 1.7 },
{ stationId: 9525, floodWarningArea: '121WAF918', floodWarningType: 'A', direction: 'u', level: 0.7 },
{ stationId: 9525, floodWarningArea: '121FWF121', floodWarningType: 'W', direction: 'u', level: 0.7 },
{ stationId: 9525, floodWarningArea: '121FWF121', floodWarningType: 'W', direction: 'u', level: 1.25 }
]

module.exports = { stations, apiResponse, apiNoMatchingThresholdResponse, flattenedData }
module.exports = { stations, apiResponse, apiNoMatchingThresholdResponse, flattenedData, api404 }