From b179d19c50690ec06daec7cfe04dbd998e639b28 Mon Sep 17 00:00:00 2001 From: Jaren Brownlee Date: Mon, 21 Oct 2024 15:03:10 -0500 Subject: [PATCH 1/2] fix short uuid and status race issues --- .../data_warehouse/data/file_mapper.ts | 5 ++- .../data/report_query_repository.ts | 15 ++++--- .../services/blob_storage/azure_blob_impl.ts | 31 +++++++------ .../services/blob_storage/filesystem_impl.ts | 43 +++++++++++-------- 4 files changed, 53 insertions(+), 41 deletions(-) diff --git a/server/src/data_access_layer/mappers/data_warehouse/data/file_mapper.ts b/server/src/data_access_layer/mappers/data_warehouse/data/file_mapper.ts index 4454708b..845c0f66 100644 --- a/server/src/data_access_layer/mappers/data_warehouse/data/file_mapper.ts +++ b/server/src/data_access_layer/mappers/data_warehouse/data/file_mapper.ts @@ -355,10 +355,11 @@ export default class FileMapper extends Mapper { }; } - // fetch the fully qualified file path complete with file name and short uuid + // fetch the fully qualified file path complete with file name and short uuid (if present) private filePathMetadataStatement(fileIDs: string[]): QueryConfig { const text = `SELECT id, - short_uuid || file_name AS file_name, + CASE WHEN short_uuid IS NULL THEN file_name + ELSE short_uuid || file_name END AS file_name, TRIM('/\\' FROM adapter_file_path) AS access_path FROM files WHERE id IN (%L)`; diff --git a/server/src/data_access_layer/repositories/data_warehouse/data/report_query_repository.ts b/server/src/data_access_layer/repositories/data_warehouse/data/report_query_repository.ts index 18a211a2..022b3218 100644 --- a/server/src/data_access_layer/repositories/data_warehouse/data/report_query_repository.ts +++ b/server/src/data_access_layer/repositories/data_warehouse/data/report_query_repository.ts @@ -141,6 +141,14 @@ export default class ReportQueryRepository extends Repository implements Reposit return Promise.resolve(Result.Failure(`error: unsupported or unimplemented file storage method being used`)); } + // set report status to "processing" + let statusSet = await this.#reportRepo.setStatus( + reportID, 'processing', + `executing query ${queryID}: "${reportQuery.query}" as part of report ${reportID}` + ); + if (statusSet.isError) {return Promise.resolve(Result.Failure(`unable to set report status`))} + + // kick off the describe or query process if (describe) { this.processDescribe(reportID, request.query!, storageConnection, files as FileMetadata[]); } else { @@ -154,13 +162,6 @@ export default class ReportQueryRepository extends Repository implements Reposit ); } - // set report status to "processing" - let statusSet = await this.#reportRepo.setStatus( - reportID, 'processing', - `executing query ${queryID}: "${reportQuery.query}" as part of report ${reportID}` - ); - if (statusSet.isError) {return Promise.resolve(Result.Failure(`unable to set report status`))} - // return report ID to the user so they can poll for results return Promise.resolve(Result.Success(reportID)); } diff --git a/server/src/services/blob_storage/azure_blob_impl.ts b/server/src/services/blob_storage/azure_blob_impl.ts index a63dc3b3..f6680aca 100644 --- a/server/src/services/blob_storage/azure_blob_impl.ts +++ b/server/src/services/blob_storage/azure_blob_impl.ts @@ -214,25 +214,30 @@ export default class AzureBlobImpl implements BlobStorage { } async renameFile(f: File): Promise> { - const newFileClient = this._ContainerClient.getBlockBlobClient(`${f.adapter_file_path}${f.short_uuid}${f.file_name}`); - const oldFileClient = this._ContainerClient.getBlockBlobClient(`${f.adapter_file_path}${f.file_name}${f.short_uuid}`); + // only perform the rename if uuid is present; otherwise, file should already be accessible + if (!f.short_uuid) { + return Promise.resolve(Result.Success(true)); + } else { + const newFileClient = this._ContainerClient.getBlockBlobClient(`${f.adapter_file_path}${f.short_uuid}${f.file_name}`); + const oldFileClient = this._ContainerClient.getBlockBlobClient(`${f.adapter_file_path}${f.file_name}${f.short_uuid}`); - try { - const copyPoller = await newFileClient.beginCopyFromURL(oldFileClient.url); - const copy_res = await copyPoller.pollUntilDone(); + try { + const copyPoller = await newFileClient.beginCopyFromURL(oldFileClient.url); + const copy_res = await copyPoller.pollUntilDone(); - if (copy_res._response.status === 201 || copy_res._response.status === 202) { - const delete_res = await oldFileClient.delete(); + if (copy_res._response.status === 201 || copy_res._response.status === 202) { + const delete_res = await oldFileClient.delete(); - if (delete_res._response.status === 201 || delete_res._response.status === 202) { - return Promise.resolve(Result.Success(true)); + if (delete_res._response.status === 201 || delete_res._response.status === 202) { + return Promise.resolve(Result.Success(true)); + } } + } catch (e) { + Logger.error(`azure rename blob error: ${e}`); + return Promise.resolve(Result.Success(false)); } - } catch (e) { - Logger.error(`azure rename blob error: ${e}`); + return Promise.resolve(Result.Success(false)); } - - return Promise.resolve(Result.Success(false)); } } diff --git a/server/src/services/blob_storage/filesystem_impl.ts b/server/src/services/blob_storage/filesystem_impl.ts index 1de735d4..7651d0b6 100644 --- a/server/src/services/blob_storage/filesystem_impl.ts +++ b/server/src/services/blob_storage/filesystem_impl.ts @@ -172,27 +172,32 @@ export default class Filesystem implements BlobStorage { } renameFile(f: File): Promise> { - try { - /* eslint-disable-next-line security/detect-non-literal-fs-filename -- - * TypeScript wants to guard against malicious file renaming, - * but since the rename is generated server-side and not by the end user, - * there is no security risk - **/ - const rename_res = fs.rename( - `${f.adapter_file_path}${f.file_name}${f.short_uuid}`, - `${f.adapter_file_path}${f.short_uuid}${f.file_name}`, - (err) => { - if (err) throw err; - }); - - if (rename_res === null || rename_res === undefined) { - return Promise.resolve(Result.Success(true)); + // only rename file if short uuid is present; otherwise, file should already be accessible + if (!f.short_uuid) { + return Promise.resolve(Result.Success(true)); + } else { + try { + /* eslint-disable-next-line security/detect-non-literal-fs-filename -- + * TypeScript wants to guard against malicious file renaming, + * but since the rename is generated server-side and not by the end user, + * there is no security risk + **/ + const rename_res = fs.rename( + `${f.adapter_file_path}${f.file_name}${f.short_uuid}`, + `${f.adapter_file_path}${f.short_uuid}${f.file_name}`, + (err) => { + if (err) throw err; + }); + + if (rename_res === null || rename_res === undefined) { + return Promise.resolve(Result.Success(true)); + } + } catch (e) { + Logger.error(`filesystem rename error: ${e}`); + return Promise.resolve(Result.Success(false)); } - } catch (e) { - Logger.error(`filesystem rename error: ${e}`); + return Promise.resolve(Result.Success(false)); } - - return Promise.resolve(Result.Success(false)); } } From 0935e528f6ec4d227b19fc287c33865238f25fca Mon Sep 17 00:00:00 2001 From: Jaren Brownlee Date: Thu, 24 Oct 2024 06:31:23 -0500 Subject: [PATCH 2/2] query optimizations --- .../data/report_query_mapper.ts | 22 +++- .../data/report_query_repository.ts | 107 ++++++++++++++---- .../etl/type_transformation_repository.ts | 6 - .../data_warehouse/data/report_query.ts | 21 ++++ 4 files changed, 128 insertions(+), 28 deletions(-) diff --git a/server/src/data_access_layer/mappers/data_warehouse/data/report_query_mapper.ts b/server/src/data_access_layer/mappers/data_warehouse/data/report_query_mapper.ts index e7f8a84b..a881f2ed 100644 --- a/server/src/data_access_layer/mappers/data_warehouse/data/report_query_mapper.ts +++ b/server/src/data_access_layer/mappers/data_warehouse/data/report_query_mapper.ts @@ -1,7 +1,7 @@ import Result from '../../../../common_classes/result'; import Mapper from '../../mapper'; import { PoolClient, QueryConfig } from 'pg'; -import ReportQuery from '../../../../domain_objects/data_warehouse/data/report_query'; +import ReportQuery, { CompletedQueryMatch } from '../../../../domain_objects/data_warehouse/data/report_query'; const format = require('pg-format'); @@ -88,6 +88,12 @@ export default class ReportQueryMapper extends Mapper { return super.runStatement(this.deleteStatement(id), { transaction }); } + CheckQueryExists(query: string): Promise> { + return super.retrieve(this.checkQueryExistsStatement(query), { + resultClass: CompletedQueryMatch + }); + } + // Below are a set of query building functions. So far they're very simple // and the return value is something that the postgres driver can understand. // The hope is that this method will allow us to be more flexible and create @@ -187,4 +193,18 @@ export default class ReportQueryMapper extends Mapper { values: [queryID, fileID], }; } + + private checkQueryExistsStatement(query: string): QueryConfig { + // compare a query with those in the database, sanitizing queries + // of extraneous whitespace and casing before comparison + // return { + const text = `SELECT rq.id, r.status_message + FROM report_queries rq JOIN reports r ON rq.report_id = r.id + WHERE r.status = 'completed' + AND LOWER(REGEXP_REPLACE(TRIM(rq.query), '\\s+', ' ', 'g')) = LOWER(REGEXP_REPLACE(TRIM($1), '\\s+', ' ', 'g')) + ORDER BY rq.created_at DESC LIMIT 1`; + const values = [query]; + // } + return {text, values} + } } diff --git a/server/src/data_access_layer/repositories/data_warehouse/data/report_query_repository.ts b/server/src/data_access_layer/repositories/data_warehouse/data/report_query_repository.ts index 022b3218..a9962aaf 100644 --- a/server/src/data_access_layer/repositories/data_warehouse/data/report_query_repository.ts +++ b/server/src/data_access_layer/repositories/data_warehouse/data/report_query_repository.ts @@ -1,5 +1,5 @@ import RepositoryInterface, {QueryOptions, Repository} from '../../repository'; -import ReportQuery, { TimeseriesInitialRequest } from '../../../../domain_objects/data_warehouse/data/report_query'; +import ReportQuery, { ReportQueryMetadata, TimeseriesInitialRequest } from '../../../../domain_objects/data_warehouse/data/report_query'; import Result from '../../../../common_classes/result'; import ReportQueryMapper from '../../../mappers/data_warehouse/data/report_query_mapper'; import {PoolClient} from 'pg'; @@ -72,17 +72,13 @@ export default class ReportQueryRepository extends Repository implements Reposit return Promise.resolve(Result.Success(true)); } + // perform all necessary checks before kicking off the query including verifying + // all files are timeseries and checking for previously executed/select * queries async initiateQuery(containerID: string, dataSourceID: string, request: TimeseriesInitialRequest, user: User, describe: boolean): Promise> { // check that all files exist and are timeseries, return an error if not const isTimeseries = await this.#fileRepo.checkTimeseries(request.file_ids!); if (isTimeseries.isError) {return Promise.resolve(Result.Pass(isTimeseries))} - // create a new report object - const report = new Report({container_id: containerID}); - const reportSaved = await this.#reportMapper.Create(user.id!, report); - if (reportSaved.isError) {return Promise.resolve(Result.Pass(reportSaved))} - const reportID = reportSaved.value.id! - // formulate query if describe, check for presence of table name if regular query if (describe) { const describeQueries: string[] = []; @@ -98,18 +94,70 @@ export default class ReportQueryRepository extends Repository implements Reposit } } - // create a report query based on the timeseries rust module query request + // create a new report object to return the ID if a SELECT * or repeated query is found + const reportSaved = await this.#reportMapper.Create(user.id!, new Report({container_id: containerID})); + if (reportSaved.isError) {return Promise.resolve(Result.Pass(reportSaved))} + const reportID = reportSaved.value.id! + + // check if the query text was already successfully used in a previous query + // if so return the result file from that original query + const previousQueryResults = await this.#mapper.CheckQueryExists(request.query!); + // if an error is found, simply log and move on + if (previousQueryResults.isError) { + Logger.error(previousQueryResults.error.error); + } + + if (previousQueryResults.value) { + // grab and use the previous status message for this report + void this.#reportRepo.setStatus(reportID, 'completed', previousQueryResults.value.status_message); + + return Promise.resolve(Result.Success(reportID)); + } + + // create a query object if a previous query was not found const reportQuery = new ReportQuery({query: request.query!, report_id: reportID}); const querySaved = await this.#mapper.Create(user.id!, reportQuery); if (querySaved.isError) { return Promise.resolve(Result.Pass(querySaved))} const queryID = querySaved.value.id! - // fetch file metadata - const fileInfo = await this.#fileRepo.listPathMetadata(...request.file_ids!); - if (fileInfo.isError) {return Promise.resolve(Result.Failure('unable to find file information'))} - const files = fileInfo.value; + // check if the query is a SELECT * query; if so return original file instead of querying + // verify there's only one file being queried + if (request.file_ids!.length === 1) { + const fileID = request.file_ids![0]; + // trim and case densensitize query to eliminate any syntax variance + const normalizedQuery = request.query?.trim().replace(/\s+/g, ' ').replace(';', '').toLowerCase(); + if (normalizedQuery === `select * from table_${fileID}`) { + // set the original file as the report file and return report ID + const resultSet = await this.setResultFile(reportID, queryID, fileID); + if (resultSet.isError) { + const errorMessage = `error attaching record to report ${reportID}: ${resultSet.error.error}`; + void this.#reportRepo.setStatus(reportID, 'error', errorMessage); + Logger.error(errorMessage); + } + + // if everything was successful, set the report status to completed + const successMessage = `results now available. Download them at "/containers/${containerID}/files/${fileID}/download"`; + void this.#reportRepo.setStatus(reportID, 'completed', successMessage); + return Promise.resolve(Result.Success(reportID)); + } + } + + const queryMetadata: ReportQueryMetadata = { + container_id: containerID, + data_source_id: dataSourceID, + request: request, + user: user, + report_id: reportID, + query: reportQuery, + query_id: queryID + } + + // kickoff the query itself if there are no early return scenarios + return this.kickoffQuery(queryMetadata, describe); + } - // create a connection string based on the type of storage being used + // create a connection string based on the type of storage being used + async createConnectionString(containerID: string, dataSourceID: string): Promise> { const uploadPath = `containers/${containerID}/datasources/${dataSourceID}`; let storageConnection: string; if (Config.file_storage_method === 'filesystem') { @@ -141,29 +189,46 @@ export default class ReportQueryRepository extends Repository implements Reposit return Promise.resolve(Result.Failure(`error: unsupported or unimplemented file storage method being used`)); } + return Promise.resolve(Result.Success(storageConnection)); + } + + async kickoffQuery(queryMetadata: ReportQueryMetadata, describe: boolean): Promise> { + // fetch file metadata + const fileInfo = await this.#fileRepo.listPathMetadata(...queryMetadata.request.file_ids!); + if (fileInfo.isError) {return Promise.resolve(Result.Pass(fileInfo))} + const files = fileInfo.value; + + const getConnString = await this.createConnectionString(queryMetadata.container_id, queryMetadata.data_source_id); + if (getConnString.isError) {return Promise.resolve(Result.Pass(getConnString))} + const storageConnection = getConnString.value; + // set report status to "processing" let statusSet = await this.#reportRepo.setStatus( - reportID, 'processing', - `executing query ${queryID}: "${reportQuery.query}" as part of report ${reportID}` + queryMetadata.report_id, 'processing', + `executing query ${queryMetadata.query_id}: "${queryMetadata.query.query}" as part of report ${queryMetadata.report_id}` ); if (statusSet.isError) {return Promise.resolve(Result.Failure(`unable to set report status`))} // kick off the describe or query process if (describe) { - this.processDescribe(reportID, request.query!, storageConnection, files as FileMetadata[]); + this.processDescribe( + queryMetadata.report_id, + queryMetadata.request.query!, + storageConnection, + files as FileMetadata[]); } else { this.processQuery( - reportID, - request.query!, + queryMetadata.report_id, + queryMetadata.request.query!, storageConnection, files as FileMetadata[], - queryID, - user + queryMetadata.query_id, + queryMetadata.user ); } // return report ID to the user so they can poll for results - return Promise.resolve(Result.Success(reportID)); + return Promise.resolve(Result.Success(queryMetadata.report_id)); } async processQuery( diff --git a/server/src/data_access_layer/repositories/data_warehouse/etl/type_transformation_repository.ts b/server/src/data_access_layer/repositories/data_warehouse/etl/type_transformation_repository.ts index 57cf8591..e95fa8f2 100644 --- a/server/src/data_access_layer/repositories/data_warehouse/etl/type_transformation_repository.ts +++ b/server/src/data_access_layer/repositories/data_warehouse/etl/type_transformation_repository.ts @@ -392,18 +392,12 @@ export default class TypeTransformationRepository extends Repository implements // if the data_source value for any given param is an old ID for an existing data source, replace it with the actual data source ID const backfillDataSources = (params: EdgeConnectionParameter[]) => { params.forEach(param => { - console.log('old param val', param.value); if (param.value) { // backfill old ID with new ID if present const matchedSource = dataSources.find(src => src?.DataSourceRecord?.old_id === param.value); - console.log('old id', matchedSource?.DataSourceRecord?.old_id); - console.log('new id', matchedSource?.DataSourceRecord?.id); if (matchedSource) param.value = matchedSource!.DataSourceRecord!.id!; - console.log('new param val', param.value); } else { - console.log('dsID', dataSourceID); param.value = dataSourceID; - console.log('new param val', param.value); } }) } diff --git a/server/src/domain_objects/data_warehouse/data/report_query.ts b/server/src/domain_objects/data_warehouse/data/report_query.ts index f4171549..80c6b88e 100644 --- a/server/src/domain_objects/data_warehouse/data/report_query.ts +++ b/server/src/domain_objects/data_warehouse/data/report_query.ts @@ -1,6 +1,7 @@ import { BaseDomainClass } from "../../../common_classes/base_domain_class"; import {IsArray, IsOptional, IsString} from 'class-validator'; import Report from './report'; +import { User } from "../../access_management/user"; /* ReportQuery represents a query and its execution status. @@ -39,6 +40,26 @@ export default class ReportQuery extends BaseDomainClass{ } } +// input type for passing info between query methods +export type ReportQueryMetadata = { + container_id: string; + data_source_id: string; + request: TimeseriesInitialRequest; + user: User; + report_id: string; + query: ReportQuery; + query_id: string; +} + +// domain object for the results of checkQueryExists query +export class CompletedQueryMatch { + @IsString() + query_id?: string; + + @IsString() + status_message?: string; +} + // initial object used to create request for the timeseries rust module export class TimeseriesInitialRequest { @IsOptional()