Fix merge conflicts

mozzy11 committed Jan 5, 2025
1 parent 104990a commit 84121c5
Showing 11 changed files with 44 additions and 190 deletions.
16 changes: 13 additions & 3 deletions cloudbuild.yaml
@@ -202,7 +202,11 @@ steps:
env:
- PIPELINE_CONFIG=/workspace/docker/config
- DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh
- FHIRDATA_SINKFHIRSERVERURL=http://sink-server-controller:8080/fhir
- FHIRDATA_SINKFHIRSERVERURL=
- FHIRDATA_GENERATEPARQUETFILES=true
- FHIRDATA_NUMTHREADS=-1
- FHIRDATA_CREATEHIVERESOURCETABLES=true
- FHIRDATA_SINKDBCONFIGPATH=config/hapi-postgres-config_local_views.json
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up',
'--force-recreate', '-d' ]
waitFor: ['Launch HAPI FHIR Sink Server Controller']
@@ -236,15 +240,21 @@ steps:

# Spinning up only the pipeline controller for FHIR server to FHIR server sync
- name: 'docker/compose'
id: 'Bring up only the pipeline controller for FHIR server to FHIR server sync'
id: 'Bring up the pipeline controller'
env:
- PIPELINE_CONFIG=/workspace/docker/config_fhir_sink
- PIPELINE_CONFIG=/workspace/docker/config
- DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh
- FHIRDATA_SINKFHIRSERVERURL=http://sink-server-controller:8080/fhir
- FHIRDATA_GENERATEPARQUETFILES=false
- FHIRDATA_NUMTHREADS=1
- FHIRDATA_CREATEHIVERESOURCETABLES=false
- FHIRDATA_SINKDBCONFIGPATH=
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up',
'--force-recreate', '--no-deps' , '-d' ,'pipeline-controller' ]

- name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}'
id: 'Run E2E Test for Dockerized Controller for FHIR server to FHIR server sync'
waitFor: ['Bring up the pipeline controller']
env:
- DWH_TYPE="FHIR"

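Note on the `waitFor` line added above: Cloud Build starts a step once the steps listed in its `waitFor` have finished, instead of waiting for every previously declared step. A minimal sketch of the pattern (the test image name here is illustrative, not taken from this file):

steps:
  - name: 'docker/compose'
    id: 'Bring up the pipeline controller'
    args: ['-f', './docker/compose-controller-spark-sql-single.yaml', 'up', '-d']

  # Without waitFor, this step would wait for all previously declared steps;
  # with it, it waits only for the controller step named above.
  - name: 'gcr.io/my-project/e2e-runner'   # hypothetical image
    id: 'Run E2E Test'
    waitFor: ['Bring up the pipeline controller']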
6 changes: 6 additions & 0 deletions docker/compose-controller-spark-sql-single.yaml
@@ -62,6 +62,12 @@ services:
- ${DWH_ROOT}:/dwh
environment:
- JAVA_OPTS=$JAVA_OPTS
# These variables are set by the e2e tests but left unset in the default config.
- FHIRDATA_SINKFHIRSERVERURL=$FHIRDATA_SINKFHIRSERVERURL
- FHIRDATA_GENERATEPARQUETFILES=$FHIRDATA_GENERATEPARQUETFILES
- FHIRDATA_NUMTHREADS=$FHIRDATA_NUMTHREADS
- FHIRDATA_CREATEHIVERESOURCETABLES=$FHIRDATA_CREATEHIVERESOURCETABLES
- FHIRDATA_SINKDBCONFIGPATH=$FHIRDATA_SINKDBCONFIGPATH
ports:
- '8090:8080'
networks:
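Note on the pass-through variables above: assuming the pipeline controller is a Spring Boot application (as the `application.yaml` files in this commit suggest), relaxed binding maps an environment variable such as `FHIRDATA_GENERATEPARQUETFILES` onto the `fhirdata.generateParquetFiles` property. A minimal sketch:

services:
  pipeline-controller:
    environment:
      # FHIRDATA_GENERATEPARQUETFILES=true on the host becomes
      # fhirdata.generateParquetFiles=true inside the container;
      # an empty host variable passes through as an empty value,
      # which the controller treats as unset.
      - FHIRDATA_GENERATEPARQUETFILES=$FHIRDATA_GENERATEPARQUETFILES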
3 changes: 1 addition & 2 deletions docker/config/application.yaml
@@ -26,8 +26,7 @@ fhirdata:
# fhirServerUrl: "http://hapi-server:8080/fhir"
dbConfig: "config/hapi-postgres-config_local.json"
dwhRootPrefix: "/dwh/controller_DWH"
#Whether to create a Parquet DWH or not
createParquetDwh: true
generateParquetFiles: true
incrementalSchedule: "0 0 * * * *"
purgeSchedule: "0 30 * * * *"
numOfDwhSnapshotsToRetain: 2
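With the `config_fhir_sink` directory removed below, a FHIR-server-to-FHIR-server sync is expressed through the same config file. A minimal sketch, assuming the property names implied by the `FHIRDATA_*` variables above (`sinkFhirServerUrl` in particular is inferred from `FHIRDATA_SINKFHIRSERVERURL`, not confirmed by this diff):

fhirdata:
  fhirServerUrl: "http://hapi-server:8080/fhir"
  sinkFhirServerUrl: "http://sink-server-controller:8080/fhir"
  # Parquet output is not needed when only copying resources between servers.
  generateParquetFiles: false
  # Hive tables are built on the Parquet output, so this must stay off too.
  createHiveResourceTables: false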
59 changes: 0 additions & 59 deletions docker/config_fhir_sink/application.yaml

This file was deleted.

31 changes: 0 additions & 31 deletions docker/config_fhir_sink/flink-conf.yaml

This file was deleted.

9 changes: 0 additions & 9 deletions docker/config_fhir_sink/hapi-postgres-config_local.json

This file was deleted.

70 changes: 8 additions & 62 deletions e2e-tests/controller-spark/controller_spark_sql_validation.sh
@@ -187,7 +187,7 @@ function run_pipeline() {
}

function wait_for_completion() {
local runtime="15 minute"
local runtime="5 minute"
local end_time=$(date -ud "$runtime" +%s)

while [[ $(date -u +%s) -le $end_time ]]
@@ -222,8 +222,6 @@ function wait_for_completion() {
#######################################################################
function check_parquet() {
local isIncremental=$1
local runtime="5 minute"
local end_time=$(date -ud "$runtime" +%s)
local output="${HOME_PATH}/${PARQUET_SUBDIR}"
TOTAL_VIEW_PATIENTS=106

@@ -237,40 +235,8 @@ function check_parquet() {
TOTAL_TEST_OBS=$((2*TOTAL_TEST_OBS))
fi


while [[ $(date -u +%s) -le $end_time ]]
do
# check whether output directory has started receiving parquet files.
if [[ "$(ls -A $output)" ]]
then
local total_patients=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${output}/*/Patient/" | awk '{print $3}')
local total_encounters=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${output}/*/Encounter/" | awk '{print $3}')
local total_observations=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${output}/*/Observation/" | awk '{print $3}')

print_message "Total patients: $total_patients"
print_message "Total encounters: $total_encounters"
print_message "Total observations: $total_observations"

if [[ "${total_patients}" == "${TOTAL_TEST_PATIENTS}" && "${total_encounters}" \
== "${TOTAL_TEST_ENCOUNTERS}" && "${total_observations}" == "${TOTAL_TEST_OBS}" ]] \
; then
print_message "Pipeline transformation successfully completed."
timeout=false
break
else
sleep 10
fi
fi
done

if [[ "${timeout}" == "true" ]]

# check whether output directory has received parquet files.
if [[ "$(ls -A $output)" ]]

then
local total_patients=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
"${output}/*/Patient/" | awk '{print $3}')
@@ -493,7 +459,7 @@ setup "$@"
fhir_source_query
sleep 30
run_pipeline "FULL"

wait_for_completion
if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
check_parquet false
@@ -503,49 +469,29 @@ else
test_fhir_sink "FULL"
fi

wait_for_completion
check_parquet false
test_fhir_sink "FULL"


clear

add_resource
update_resource
# Incremental run.
run_pipeline "INCREMENTAL"

wait_for_completion
if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
check_parquet true
else
fhir_source_query
# Provide enough buffer time for the pipeline to completely run before testing the sink FHIR server
sleep 300
test_fhir_sink "INCREMENTAL"
fi

if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
validate_resource_tables
validate_resource_tables_data
validate_updated_resource

# View recreation run
# TODO add validation for the views as well
run_pipeline "VIEWS"

else
fhir_source_query
# Provide enough buffer time for the pipeline to completely run before testing the sink FHIR server
sleep 300
test_fhir_sink "INCREMENTAL"
fi

wait_for_completion
check_parquet true
fhir_source_query
test_fhir_sink "INCREMENTAL"

validate_resource_tables
validate_resource_tables_data
validate_updated_resource



print_message "END!!"
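The refactor above moves all polling into `wait_for_completion`, so `check_parquet` now inspects the output once instead of looping. The underlying deadline idiom, sketched as a standalone helper (the function name is hypothetical):

# Compute an absolute deadline once, then poll until the condition
# holds or the deadline passes.
wait_for_dir_content() {
  local output="$1"
  local end_time=$(date -ud "5 minute" +%s)
  while [[ $(date -u +%s) -le $end_time ]]; do
    if [[ "$(ls -A "$output")" ]]; then
      return 0    # files have appeared
    fi
    sleep 10
  done
  return 1        # timed out
}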
FetchSearchPageFn.java
@@ -87,7 +87,7 @@ abstract class FetchSearchPageFn<T> extends DoFn<T, KV<String, Integer>> {

protected final String parquetFile;

protected final Boolean createParquetDwh;
protected final Boolean generateParquetFiles;

private final int secondsToFlush;

@@ -136,7 +136,7 @@ abstract class FetchSearchPageFn<T> extends DoFn<T, KV<String, Integer>> {
this.oAuthClientSecret = options.getFhirServerOAuthClientSecret();
this.stageIdentifier = stageIdentifier;
this.parquetFile = options.getOutputParquetPath();
this.createParquetDwh = options.isCreateParquetDwh();
this.generateParquetFiles = options.isGenerateParquetFiles();
this.secondsToFlush = options.getSecondsToFlushParquetFiles();
this.rowGroupSize = options.getRowGroupSizeForParquetFiles();
if (DATAFLOW_RUNNER.equals(options.getRunner().getSimpleName())) {
@@ -212,7 +212,7 @@ public void setup() throws SQLException, ProfileException {
oAuthClientSecret,
fhirContext);
fhirSearchUtil = new FhirSearchUtil(fetchUtil);
if (createParquetDwh) {
if (generateParquetFiles) {
parquetUtil =
new ParquetUtil(
fhirContext.getVersion().getVersion(),
FhirEtlOptions.java
@@ -264,11 +264,13 @@ public interface FhirEtlOptions extends BasePipelineOptions {
@Default.String("")
String getSourceNdjsonFilePatternList();

@Description("Flag to switch off/on creation of parquet files; can be turned off when syncing from a FHIR server to another.")
@Description(
"Flag to switch off/on generation of parquet files; can be turned off when syncing from a"
+ " FHIR server to another.")
@Default.Boolean(true)
Boolean isCreateParquetDwh();
Boolean isGenerateParquetFiles();

void setCreateParquetDwh(Boolean value);
void setGenerateParquetFiles(Boolean value);

void setSourceNdjsonFilePatternList(String value);
}
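Note on the renamed option above: Beam's `PipelineOptionsFactory` derives one logical property from each getter/setter pair, so the two method names must agree exactly. A minimal self-contained sketch (the interface name is illustrative, not from this commit):

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public interface SketchOptions extends PipelineOptions {
  @Description("Flag to switch off/on generation of Parquet files.")
  @Default.Boolean(true)
  Boolean isGenerateParquetFiles();

  void setGenerateParquetFiles(Boolean value);
}

// Usage: the command-line flag name is derived from the pair above.
// SketchOptions opts = PipelineOptionsFactory
//     .fromArgs("--generateParquetFiles=false").as(SketchOptions.class);
// opts.isGenerateParquetFiles();  // false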
6 changes: 3 additions & 3 deletions pipelines/controller/config/application.yaml
@@ -77,9 +77,9 @@ fhirdata:
# that directory too, such that files created by the pipelines are readable by
# the Thrift Server, e.g., `setfacl -d -m o::rx dwh/`.
dwhRootPrefix: "dwh/controller_DEV_DWH"
# Whether to create a Parquet DWH or not. In case of syncing from a FHIR server to another, if Parquet files are not needed, their generation can be switched off by this flag.
# generation of parquet DWH could be switched off/on
createParquetDwh: true
# Whether to generate Parquet Files or not. In case of syncing from a FHIR server to another,
# if Parquet files are not needed, their generation can be switched off by this flag.
generateParquetFiles: true

# The schedule for automatic incremental pipeline runs.
# Uses the Spring CronExpression format, i.e.,
DataProperties.java
@@ -119,7 +119,7 @@ public class DataProperties {

private int recursiveDepth;

private boolean createParquetDwh;
private boolean generateParquetFiles;

@PostConstruct
void validateProperties() {
@@ -137,20 +137,9 @@ void validateProperties() {

Preconditions.checkArgument(
!Strings.isNullOrEmpty(dwhRootPrefix), "dwhRootPrefix is required!");

if (!Strings.isNullOrEmpty(dbConfig)) {
if (!Strings.isNullOrEmpty(fhirServerUrl)) {
logger.warn("Both fhirServerUrl and dbConfig are set; ignoring fhirServerUrl!");
}
logger.info("Using JDBC mode since dbConfig is set.");
} else {
// This should always be true because of the first Precondition.
Preconditions.checkArgument(!Strings.isNullOrEmpty(fhirServerUrl));
logger.info("Using FHIR-search mode since dbConfig is not set.");
}
Preconditions.checkState(fhirVersion != null, "FhirVersion cannot be empty");
Preconditions.checkState(!createHiveResourceTables || !thriftserverHiveConfig.isEmpty());
Preconditions.checkState(!createHiveResourceTables || createParquetDwh);
Preconditions.checkState(!createHiveResourceTables || generateParquetFiles);
}

private PipelineConfig.PipelineConfigBuilder addFlinkOptions(FhirEtlOptions options) {
@@ -228,7 +217,7 @@ PipelineConfig createBatchOptions() {
String timestampSuffix = DwhFiles.safeTimestampSuffix();
options.setOutputParquetPath(dwhRootPrefix + DwhFiles.TIMESTAMP_PREFIX + timestampSuffix);

options.setCreateParquetDwh(createParquetDwh);
options.setGenerateParquetFiles(generateParquetFiles);

PipelineConfig.PipelineConfigBuilder pipelineConfigBuilder = addFlinkOptions(options);

@@ -249,7 +238,8 @@ List<ConfigFields> getConfigParams() {
"fhirdata.fhirFetchMode", fhirFetchMode != null ? fhirFetchMode.name() : "", "", ""),
new ConfigFields("fhirdata.fhirServerUrl", fhirServerUrl, "", ""),
new ConfigFields("fhirdata.dwhRootPrefix", dwhRootPrefix, "", ""),
new ConfigFields("fhirdata.createParquetDwh", String.valueOf(createParquetDwh), "", ""),
new ConfigFields(
"fhirdata.generateParquetFiles", String.valueOf(generateParquetFiles), "", ""),
new ConfigFields("fhirdata.incrementalSchedule", incrementalSchedule, "", ""),
new ConfigFields("fhirdata.purgeSchedule", purgeSchedule, "", ""),
new ConfigFields(
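The `checkState` above encodes an implication: creating Hive resource tables requires Parquet generation, since the tables are defined over the Parquet output. A standalone sketch of the same check (class and method names are illustrative):

import com.google.common.base.Preconditions;

final class ConfigCheckSketch {
  // "!a || b" is boolean implication: a => b.
  static void validate(boolean createHiveResourceTables, boolean generateParquetFiles) {
    Preconditions.checkState(
        !createHiveResourceTables || generateParquetFiles,
        "createHiveResourceTables requires generateParquetFiles to be true");
  }

  public static void main(String[] args) {
    validate(false, false); // fine: no Hive tables requested
    validate(true, true);   // fine: Parquet files back the Hive tables
    validate(true, false);  // throws IllegalStateException
  }
}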
