From 73612b6bdd7937a85fbfa7f6211ae301a32cf592 Mon Sep 17 00:00:00 2001 From: mozzy11 Date: Sun, 5 Jan 2025 11:37:57 +0300 Subject: [PATCH] Update PR and adress review comments --- cloudbuild.yaml | 49 ++++++----- .../compose-controller-spark-sql-single.yaml | 7 ++ docker/config/application.yaml | 3 +- docker/config_fhir_sink/application.yaml | 59 ------------- docker/config_fhir_sink/flink-conf.yaml | 31 ------- .../hapi-postgres-config_local.json | 9 -- .../controller_spark_sql_validation.sh | 88 ++++++------------- .../fhir/analytics/ConvertResourceFn.java | 2 +- .../fhir/analytics/FetchSearchPageFn.java | 9 +- .../google/fhir/analytics/FhirEtlOptions.java | 10 ++- pipelines/controller/config/application.yaml | 6 +- .../google/fhir/analytics/DataProperties.java | 23 ++--- 12 files changed, 85 insertions(+), 211 deletions(-) delete mode 100644 docker/config_fhir_sink/application.yaml delete mode 100644 docker/config_fhir_sink/flink-conf.yaml delete mode 100644 docker/config_fhir_sink/hapi-postgres-config_local.json diff --git a/cloudbuild.yaml b/cloudbuild.yaml index b89899fcb..9b91474a3 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -189,23 +189,20 @@ steps: '-c', 'CREATE DATABASE views;'] waitFor: ['Turn down FHIR Sink Server Search'] -- name: 'docker/compose' - id: 'Launch HAPI FHIR Sink Server Controller' - args: [ '-f', './docker/sink-compose.yml', '-p', 'sink-server-controller', 'up','--force-recreate', '-d' ] - env: - - SINK_SERVER_NAME=sink-server-controller - - SINK_SERVER_PORT=9001 - waitFor: ['Create views database'] - - name: 'docker/compose' id: 'Bring up controller and Spark containers' env: - PIPELINE_CONFIG=/workspace/docker/config - DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh - - FHIRDATA_SINKFHIRSERVERURL=http://sink-server-controller:8080/fhir + - FHIRDATA_SINKFHIRSERVERURL= + - FHIRDATA_GENERATEPARQUETFILES=true + - FHIRDATA_NUMTHREADS=-1 + - FHIRDATA_CREATEHIVERESOURCETABLES=true + - FHIRDATA_CREATEPARQUETVIEWS=true + - FHIRDATA_SINKDBCONFIGPATH=config/hapi-postgres-config_local_views.json args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up', '--force-recreate', '-d' ] - waitFor: ['Launch HAPI FHIR Sink Server Controller'] + waitFor: ['Create views database'] - name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}' id: 'Run E2E Test for Dockerized Controller and Spark Thriftserver' @@ -224,39 +221,47 @@ steps: - name: 'docker/compose' id: 'Bring down controller and Spark containers for FHIR server to FHIR server sync' args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v'] - -# Resetting Sink FHIR server -- name: 'docker/compose' - id: 'Turn down HAPI Sink FHIR Server for FHIR server to FHIR server sync' - args: [ '-f', './docker/sink-compose.yml', 'down' ,'-v'] + waitFor: ['Run E2E Test for Dockerized Controller and Spark Thriftserver'] - name: 'docker/compose' - id: 'Launch Sink FHIR Server for FHIR server to FHIR server sync' - args: ['-f', './docker/sink-compose.yml', 'up', '--force-recreate', '-d'] + id: 'Launch HAPI FHIR Sink Server Controller' + args: [ '-f', './docker/sink-compose.yml', '-p', 'sink-server-controller', 'up','--force-recreate', '-d' ] + env: + - SINK_SERVER_NAME=sink-server-controller + - SINK_SERVER_PORT=9001 + waitFor: ['Bring down controller and Spark containers for FHIR server to FHIR server sync'] # Spinning up only the pipeline controller for FHIR server to FHIR server sync - name: 'docker/compose' - id: 'Bring up only the pipeline controller for FHIR server to FHIR server sync' 
+ id: 'Bring up the pipeline controller' env: - - PIPELINE_CONFIG=/workspace/docker/config_fhir_sink + - PIPELINE_CONFIG=/workspace/docker/config - DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh + - FHIRDATA_SINKFHIRSERVERURL=http://sink-server-controller:8080/fhir + - FHIRDATA_GENERATEPARQUETFILES=false + - FHIRDATA_NUMTHREADS=1 + - FHIRDATA_CREATEHIVERESOURCETABLES=false + - FHIRDATA_CREATEPARQUETVIEWS=false + - FHIRDATA_SINKDBCONFIGPATH= args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up', '--force-recreate', '--no-deps' , '-d' ,'pipeline-controller' ] + waitFor: ['Launch HAPI FHIR Sink Server Controller'] - name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}' id: 'Run E2E Test for Dockerized Controller for FHIR server to FHIR server sync' + waitFor: ['Bring up the pipeline controller'] env: - DWH_TYPE="FHIR" - name: 'docker/compose' - id: 'Bring down controller and Spark containers' + id: 'Bring down controller' args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v'] - waitFor: ['Run E2E Test for Dockerized Controller and Spark Thriftserver'] + waitFor: ['Run E2E Test for Dockerized Controller for FHIR server to FHIR server sync'] - name: 'docker/compose' id: 'Turn down HAPI Source Server' args: [ '-f', './docker/hapi-compose.yml', 'down' ] - waitFor: ['Bring down controller and Spark containers'] + waitFor: ['Bring down controller'] - name: 'docker/compose' id: 'Turn down FHIR Sink Server Controller for e2e tests' diff --git a/docker/compose-controller-spark-sql-single.yaml b/docker/compose-controller-spark-sql-single.yaml index 83896375e..bb769ed0b 100644 --- a/docker/compose-controller-spark-sql-single.yaml +++ b/docker/compose-controller-spark-sql-single.yaml @@ -62,6 +62,13 @@ services: - ${DWH_ROOT}:/dwh environment: - JAVA_OPTS=$JAVA_OPTS + # This is to turn this on in e2e but leave it off in the default config. + - FHIRDATA_SINKFHIRSERVERURL=$FHIRDATA_SINKFHIRSERVERURL + - FHIRDATA_GENERATEPARQUETFILES=$FHIRDATA_GENERATEPARQUETFILES + - FHIRDATA_NUMTHREADS=$FHIRDATA_NUMTHREADS + - FHIRDATA_CREATEHIVERESOURCETABLES=$FHIRDATA_CREATEHIVERESOURCETABLES + - FHIRDATA_CREATEPARQUETVIEWS=$FHIRDATA_CREATEPARQUETVIEWS + - FHIRDATA_SINKDBCONFIGPATH=$FHIRDATA_SINKDBCONFIGPATH ports: - '8090:8080' networks: diff --git a/docker/config/application.yaml b/docker/config/application.yaml index 77eab9270..23e5e0f53 100644 --- a/docker/config/application.yaml +++ b/docker/config/application.yaml @@ -26,8 +26,7 @@ fhirdata: # fhirServerUrl: "http://hapi-server:8080/fhir" dbConfig: "config/hapi-postgres-config_local.json" dwhRootPrefix: "/dwh/controller_DWH" - #Whether to create a Parquet DWH or not - createParquetDwh: true + generateParquetFiles: true incrementalSchedule: "0 0 * * * *" purgeSchedule: "0 30 * * * *" numOfDwhSnapshotsToRetain: 2 diff --git a/docker/config_fhir_sink/application.yaml b/docker/config_fhir_sink/application.yaml deleted file mode 100644 index 942a6c325..000000000 --- a/docker/config_fhir_sink/application.yaml +++ /dev/null @@ -1,59 +0,0 @@ -# -# Copyright 2020-2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -# - -# See `pipelines/controller/config/application.yaml` for full documentation -# of these options. -# This config is meant to be used by `compose-controller-spark-sql.yaml`. -fhirdata: - # 172.17.0.1 is an example docker network interface ip address; - # `hapi-server` is another docker example where a container with that name is - # running on the same docker network. - # fhirServerUrl: "http://172.17.0.1:8091/fhir" - # fhirServerUrl: "http://hapi-server:8080/fhir" - dbConfig: "config/hapi-postgres-config_local.json" - dwhRootPrefix: "/dwh/controller_DWH" - #Whether to create a Parquet DWH or not - createParquetDwh: false - incrementalSchedule: "0 0 * * * *" - purgeSchedule: "0 30 * * * *" - numOfDwhSnapshotsToRetain: 2 - # There is no Questionnaire in our test FHIR server, but it is added to - # prevent regression of https://github.com/google/fhir-data-pipes/issues/785. - # TODO: add resource table creation to e2e tests. - resourceList: "Patient,Encounter,Observation,Questionnaire,Condition,Practitioner,Location,Organization,DiagnosticReport,Immunization,MedicationRequest,PractitionerRole,Procedure" - numThreads: 1 - autoGenerateFlinkConfiguration: true - createHiveResourceTables: false - #thriftserverHiveConfig: "config/thriftserver-hive-config_local.json" - #hiveResourceViewsDir: "config/views" - # structureDefinitionsPath: "config/profile-definitions" - structureDefinitionsPath: "classpath:/r4-us-core-definitions" - fhirVersion: "R4" - rowGroupSizeForParquetFiles: 33554432 # 32mb - #viewDefinitionsDir: "config/views" - #sinkDbConfigPath: "config/hapi-postgres-config_local_views.json" - sinkFhirServerUrl: "http://sink-server:8080/fhir" - #sinkUserName: "hapi" - #sinkPassword: "hapi123" - recursiveDepth: 1 - -# Enable spring boot actuator end points, use "*" to expose all endpoints, or a comma-separated -# list to expose selected ones -management: - endpoints: - web: - exposure: - include: health,info,metrics,prometheus,pipeline-metrics diff --git a/docker/config_fhir_sink/flink-conf.yaml b/docker/config_fhir_sink/flink-conf.yaml deleted file mode 100644 index 109de5192..000000000 --- a/docker/config_fhir_sink/flink-conf.yaml +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# To use this config, FLINK_CONF_DIR env. var should be set to the parent dir. - -# This is needed to prevent an "Insufficient number of network buffers" -# exceptions when running the merger on large input with many workers. -taskmanager.memory.network.max: 256mb - -# This is needed to be able to process large resources, otherwise in JDBC -# mode we may get the following exception: -# "The record exceeds the maximum size of a sort buffer ..." 
-taskmanager.memory.managed.size: 256mb - -# This is to make pipeline.run() non-blocking with FlinkRunner; unfortunately -# this is overwritten in `local` mode: https://stackoverflow.com/a/74416240 -execution.attached: false - -# This is required to track the pipeline metrics when FlinkRunner is used. -execution.job-listeners: com.google.fhir.analytics.metrics.FlinkJobListener diff --git a/docker/config_fhir_sink/hapi-postgres-config_local.json b/docker/config_fhir_sink/hapi-postgres-config_local.json deleted file mode 100644 index 743efc6bb..000000000 --- a/docker/config_fhir_sink/hapi-postgres-config_local.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "jdbcDriverClass": "org.postgresql.Driver", - "databaseService" : "postgresql", - "databaseHostName" : "hapi-fhir-db", - "databasePort" : "5432", - "databaseUser" : "admin", - "databasePassword" : "admin", - "databaseName" : "hapi" -} diff --git a/e2e-tests/controller-spark/controller_spark_sql_validation.sh b/e2e-tests/controller-spark/controller_spark_sql_validation.sh index 52780c782..485da97b8 100755 --- a/e2e-tests/controller-spark/controller_spark_sql_validation.sh +++ b/e2e-tests/controller-spark/controller_spark_sql_validation.sh @@ -222,8 +222,6 @@ function wait_for_completion() { ####################################################################### function check_parquet() { local isIncremental=$1 - local runtime="5 minute" - local end_time=$(date -ud "$runtime" +%s) local output="${HOME_PATH}/${PARQUET_SUBDIR}" TOTAL_VIEW_PATIENTS=106 @@ -237,40 +235,8 @@ function check_parquet() { TOTAL_TEST_OBS=$((2*TOTAL_TEST_OBS)) fi - - while [[ $(date -u +%s) -le $end_time ]] - do - # check whether output directory has started receiving parquet files. - if [[ "$(ls -A $output)" ]] - then - local total_patients=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \ - "${output}/*/Patient/" | awk '{print $3}') - local total_encounters=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \ - "${output}/*/Encounter/" | awk '{print $3}') - local total_observations=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \ - "${output}/*/Observation/" | awk '{print $3}') - - print_message "Total patients: $total_patients" - print_message "Total encounters: $total_encounters" - print_message "Total observations: $total_observations" - - if [[ "${total_patients}" == "${TOTAL_TEST_PATIENTS}" && "${total_encounters}" \ - == "${TOTAL_TEST_ENCOUNTERS}" && "${total_observations}" == "${TOTAL_TEST_OBS}" ]] \ - ; then - print_message "Pipeline transformation successfully completed." - timeout=false - break - else - sleep 10 - fi - fi - done - - if [[ "${timeout}" == "true" ]] - # check whether output directory has received parquet files. 
if [[ "$(ls -A $output)" ]] - then local total_patients=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \ "${output}/*/Patient/" | awk '{print $3}') @@ -451,6 +417,26 @@ function validate_updated_resource() { } +function validate_updated_resource_in_fhir_sink() { + local fhir_username="hapi" + local fhir_password="hapi" + local fhir_url_extension="/fhir" + + # Fetch the patient resource using the Patient ID + local updated_family_name=$(curl -X GET -H "Content-Type: application/json; charset=utf-8" -u $fhir_username:$fhir_password \ + --connect-timeout 5 --max-time 20 "${SINK_FHIR_SERVER_URL}${fhir_url_extension}/Patient/${PATIENT_ID}" \ + | jq -r '.name[0].family') + + if [[ "${updated_family_name}" == "Anderson" ]] + then + print_message "Updated Patient data for ${PATIENT_ID} in FHIR sink verified successfully." + else + print_message "Updated Patient data verification for ${PATIENT_ID} in FHIR sink failed." + exit 6 + fi +} + + ################################################# # Function that counts resources in FHIR server and compares output to what is # in the source FHIR server @@ -493,7 +479,7 @@ setup "$@" fhir_source_query sleep 30 run_pipeline "FULL" - +wait_for_completion if [[ "${DWH_TYPE}" == "PARQUET" ]] then check_parquet false @@ -503,30 +489,17 @@ else test_fhir_sink "FULL" fi -wait_for_completion -check_parquet false -test_fhir_sink "FULL" - - clear add_resource update_resource # Incremental run. run_pipeline "INCREMENTAL" - +wait_for_completion if [[ "${DWH_TYPE}" == "PARQUET" ]] then check_parquet true -else - fhir_source_query - # Provide enough Buffer time for FULL pipeline to completely run before testing the sink FHIR server - sleep 300 - test_fhir_sink "INCREMENTAL" -fi -if [[ "${DWH_TYPE}" == "PARQUET" ]] -then validate_resource_tables validate_resource_tables_data validate_updated_resource @@ -534,18 +507,13 @@ then # View recreation run # TODO add validation for the views as well run_pipeline "VIEWS" +else + # Provide enough Buffer time for INCREMENTAL pipeline to completely run before testing the sink FHIR server + sleep 200 + fhir_source_query + test_fhir_sink "INCREMENTAL" + validate_updated_resource_in_fhir_sink fi -wait_for_completion -check_parquet true -fhir_source_query -test_fhir_sink "INCREMENTAL" - -validate_resource_tables -validate_resource_tables_data -validate_updated_resource - - - print_message "END!!" \ No newline at end of file diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java index a416c6ecc..553a8b363 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Google LLC + * Copyright 2020-2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
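Reviewer note (not part of the patch): the new FHIR-sink e2e path above can also be spot-checked by hand against the sink server that cloudbuild brings up. This is only a sketch; it assumes SINK_SERVER_PORT=9001 is published on localhost (inside the docker network the script talks to sink-server-controller:8080 instead), and reuses the hapi/hapi credentials from validate_updated_resource_in_fhir_sink. Substitute the id produced by update_resource for PATIENT_ID.

  SINK_URL=http://localhost:9001/fhir
  # Total Patient count in the sink after a pipeline run.
  curl -s -u hapi:hapi "${SINK_URL}/Patient?_summary=count" | jq '.total'
  # Family name of the patient modified by update_resource; should read "Anderson".
  curl -s -u hapi:hapi "${SINK_URL}/Patient/${PATIENT_ID}" | jq -r '.name[0].family'
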
diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java index 2dde5f08e..1423194cb 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Google LLC + * Copyright 2020-2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import ca.uhn.fhir.parser.IParser; import com.cerner.bunsen.exception.ProfileException; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import com.google.fhir.analytics.JdbcConnectionPools.DataSourceConfig; import com.google.fhir.analytics.model.DatabaseConfiguration; import com.google.fhir.analytics.view.ViewApplicationException; @@ -87,7 +88,7 @@ abstract class FetchSearchPageFn extends DoFn> { protected final String parquetFile; - protected final Boolean createParquetDwh; + protected final Boolean generateParquetFiles; private final int secondsToFlush; @@ -136,7 +137,7 @@ abstract class FetchSearchPageFn extends DoFn> { this.oAuthClientSecret = options.getFhirServerOAuthClientSecret(); this.stageIdentifier = stageIdentifier; this.parquetFile = options.getOutputParquetPath(); - this.createParquetDwh = options.isCreateParquetDwh(); + this.generateParquetFiles = options.isGenerateParquetFiles(); this.secondsToFlush = options.getSecondsToFlushParquetFiles(); this.rowGroupSize = options.getRowGroupSizeForParquetFiles(); if (DATAFLOW_RUNNER.equals(options.getRunner().getSimpleName())) { @@ -212,7 +213,7 @@ public void setup() throws SQLException, ProfileException { oAuthClientSecret, fhirContext); fhirSearchUtil = new FhirSearchUtil(fetchUtil); - if (createParquetDwh) { + if (generateParquetFiles && !Strings.isNullOrEmpty(parquetFile)) { parquetUtil = new ParquetUtil( fhirContext.getVersion().getVersion(), diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java index 8b1a386cc..b1f6a306f 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Google LLC + * Copyright 2020-2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -264,11 +264,13 @@ public interface FhirEtlOptions extends BasePipelineOptions { @Default.String("") String getSourceNdjsonFilePatternList(); - @Description("Flag to switch off/on creation of parquet files; can be turned off when syncing from a FHIR server to another.") + @Description( + "Flag to switch off/on generation of parquet files; can be turned off when syncing from a" + + " FHIR server to another.") @Default.Boolean(true) - Boolean isCreateParquetDwh(); + Boolean isGenerateParquetFiles(); - void setCreateParquetDwh(Boolean value); + void setGenerateParquetFiles(Boolean value); void setSourceNdjsonFilePatternList(String value); } diff --git a/pipelines/controller/config/application.yaml b/pipelines/controller/config/application.yaml index 52cb5bb53..edfd55471 100644 --- a/pipelines/controller/config/application.yaml +++ b/pipelines/controller/config/application.yaml @@ -77,9 +77,9 @@ fhirdata: # that directory too, such that files created by the pipelines are readable by # the Thrift Server, e.g., `setfacl -d -m o::rx dwh/`. dwhRootPrefix: "dwh/controller_DEV_DWH" - # Whether to create a Parquet DWH or not. In case of syncing from a FHIR server to another, if Parquet files are not needed, their generation can be switched off by this flag. - # generation of parquet DWH could be switched off/on - createParquetDwh: true + # Whether to generate Parquet Files or not. In case of syncing from a FHIR server to another, + # if Parquet files are not needed, their generation can be switched off by this flag. + generateParquetFiles: true # The schedule for automatic incremental pipeline runs. # Uses the Spring CronExpression format, i.e., diff --git a/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java b/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java index 5102cdcb1..1a24f33b0 100644 --- a/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java +++ b/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Google LLC + * Copyright 2020-2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -119,7 +119,7 @@ public class DataProperties { private int recursiveDepth; - private boolean createParquetDwh; + private boolean generateParquetFiles; @PostConstruct void validateProperties() { @@ -137,20 +137,10 @@ void validateProperties() { Preconditions.checkArgument( !Strings.isNullOrEmpty(dwhRootPrefix), "dwhRootPrefix is required!"); - - if (!Strings.isNullOrEmpty(dbConfig)) { - if (!Strings.isNullOrEmpty(fhirServerUrl)) { - logger.warn("Both fhirServerUrl and dbConfig are set; ignoring fhirServerUrl!"); - } - logger.info("Using JDBC mode since dbConfig is set."); - } else { - // This should always be true because of the first Precondition. 
- Preconditions.checkArgument(!Strings.isNullOrEmpty(fhirServerUrl)); - logger.info("Using FHIR-search mode since dbConfig is not set."); - } Preconditions.checkState(fhirVersion != null, "FhirVersion cannot be empty"); Preconditions.checkState(!createHiveResourceTables || !thriftserverHiveConfig.isEmpty()); - Preconditions.checkState(!createHiveResourceTables || createParquetDwh); + Preconditions.checkState(!createHiveResourceTables || generateParquetFiles); + Preconditions.checkState(!createParquetViews || generateParquetFiles); } private PipelineConfig.PipelineConfigBuilder addFlinkOptions(FhirEtlOptions options) { @@ -228,7 +218,7 @@ PipelineConfig createBatchOptions() { String timestampSuffix = DwhFiles.safeTimestampSuffix(); options.setOutputParquetPath(dwhRootPrefix + DwhFiles.TIMESTAMP_PREFIX + timestampSuffix); - options.setCreateParquetDwh(createParquetDwh); + options.setGenerateParquetFiles(generateParquetFiles); PipelineConfig.PipelineConfigBuilder pipelineConfigBuilder = addFlinkOptions(options); @@ -249,7 +239,8 @@ List getConfigParams() { "fhirdata.fhirFetchMode", fhirFetchMode != null ? fhirFetchMode.name() : "", "", ""), new ConfigFields("fhirdata.fhirServerUrl", fhirServerUrl, "", ""), new ConfigFields("fhirdata.dwhRootPrefix", dwhRootPrefix, "", ""), - new ConfigFields("fhirdata.createParquetDwh", String.valueOf(createParquetDwh), "", ""), + new ConfigFields( + "fhirdata.generateParquetFiles", String.valueOf(generateParquetFiles), "", ""), new ConfigFields("fhirdata.incrementalSchedule", incrementalSchedule, "", ""), new ConfigFields("fhirdata.purgeSchedule", purgeSchedule, "", ""), new ConfigFields(
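Reviewer note (not part of the patch): after the rename, the Parquet switch is exposed as fhirdata.generateParquetFiles everywhere. A brief sketch of the ways it can be set; the environment-variable form relies on Spring Boot relaxed binding as already used in cloudbuild.yaml above, while the direct batch-pipeline flag name is an assumption derived from the isGenerateParquetFiles getter and the jar path is purely illustrative.

  # 1. Controller config (pipelines/controller/config/application.yaml):
  #      fhirdata:
  #        generateParquetFiles: true
  # 2. Environment override for the dockerized controller, as in the e2e setup:
  export FHIRDATA_GENERATEPARQUETFILES=false
  # 3. Hypothetical direct batch run via Beam PipelineOptions:
  # java -jar pipelines/batch/target/batch-bundled.jar --generateParquetFiles=false ...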