Skip to content

Commit

Permalink
Platform/angela/14116/read hl7 (#16859)
Browse files Browse the repository at this point in the history
* [WIP] Still working on tests. Fixed HL7AckUtilsTest and moved isBatch() into the companion object in HL7Reader.

* [WIP] Tests still broken, fixed FHIRBundleHelpersTests by using default parsing to get the message type instead of RS specific parsing

* [WIP] Fixed HL7Reader tests and marked a couple things in HL7Reader as deprecated because they are only being used in the CLI and should probably not be used elsewhere. Removing useages from CLI was out of scope for this ticket.

* [WIP] Added error handling to HL7MessageHelpers for batching files

* Dealing with merge conflicts

* More effort to deal with merge conflicts

* One more little leftover from merge conflicts

* Fixed some more references to getMessages() that came in from main and added some HL7Reader tests back in with modifications

* Minor fix to HL7MessageHelpers

* added requested helper function

* removed unnecessary test

* Removed MessageType validation from SubmissionReceiver

* Removed HL7MessageParseAndConvertConfiguration

* Modified ProcessFhirCommands to remove things from HL7Reader that were only otherwise part of the getMessages() saga

* removed deprecation mark from ProcessFhirCommands
  • Loading branch information
adegolier authored Jan 8, 2025
1 parent e0509fe commit aeb20c7
Show file tree
Hide file tree
Showing 19 changed files with 316 additions and 699 deletions.
2 changes: 0 additions & 2 deletions prime-router/src/main/kotlin/SettingsProvider.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.fasterxml.jackson.annotation.JsonValue
import gov.cdc.prime.router.CustomerStatus.ACTIVE
import gov.cdc.prime.router.CustomerStatus.INACTIVE
import gov.cdc.prime.router.CustomerStatus.TESTING
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import gov.cdc.prime.router.validation.IItemValidator
import gov.cdc.prime.router.validation.MarsOtcElrOnboardingValidator
import gov.cdc.prime.router.validation.MarsOtcElrValidator
Expand Down Expand Up @@ -55,7 +54,6 @@ enum class Topic(
val isUniversalPipeline: Boolean = true,
val isSendOriginal: Boolean = false,
val validator: IItemValidator = NoopItemValidator(),
val hl7ParseConfiguration: HL7Reader.Companion.HL7MessageParseAndConvertConfiguration? = null,
) {
FULL_ELR("full-elr", true, false),
ETOR_TI("etor-ti", true, false),
Expand Down
17 changes: 7 additions & 10 deletions prime-router/src/main/kotlin/SubmissionReceiver.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import gov.cdc.prime.router.azure.ReportWriter
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.fhirengine.engine.FhirConvertQueueMessage
import gov.cdc.prime.router.fhirengine.engine.MessageType
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.fhirengine.utils.HL7MessageHelpers
import gov.cdc.prime.router.fhirengine.utils.HL7Reader

/**
Expand Down Expand Up @@ -268,14 +268,14 @@ class UniversalPipelineReceiver : SubmissionReceiver {

when (sender.format) {
MimeFormat.HL7 -> {
val messages = HL7Reader(actionLogs).getMessages(content)
val isBatch = HL7Reader(actionLogs).isBatch(content, messages.size)
// create a Report for this incoming HL7 message to use for tracking in the database
val messageCount = HL7MessageHelpers.messageCount(content)
val isBatch = HL7Reader.isBatch(content, messageCount)

// create a Report for this incoming HL7 message to use for tracking in the database
report = Report(
if (isBatch) MimeFormat.HL7_BATCH else MimeFormat.HL7,
sources,
messages.size,
messageCount,
metadata = metadata,
nextAction = TaskAction.convert,
topic = sender.topic,
Expand All @@ -290,11 +290,8 @@ class UniversalPipelineReceiver : SubmissionReceiver {
// actionLogs
// )
// }

// check for valid message type
messages.forEachIndexed {
idx, element ->
MessageType.validateMessageType(element, actionLogs, idx + 1)
if (messageCount == 0 && !actionLogs.hasErrors()) {
actionLogs.error(InvalidReportMessage("Unable to find HL7 messages in provided data."))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import com.google.common.net.HttpHeaders
import com.microsoft.azure.functions.HttpRequestMessage
import com.microsoft.azure.functions.HttpResponseMessage
import com.microsoft.azure.functions.HttpStatus
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.Sender
import gov.cdc.prime.router.azure.HttpUtilities
import gov.cdc.prime.router.azure.HttpUtilities.Companion.isSuccessful
import gov.cdc.prime.router.common.JacksonMapperUtilities
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.HL7ACKUtils
import gov.cdc.prime.router.fhirengine.utils.HL7MessageHelpers
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import gov.cdc.prime.router.history.DetailedSubmissionHistory
import org.apache.logging.log4j.kotlin.Logging
Expand Down Expand Up @@ -106,12 +106,11 @@ class SubmissionResponseBuilder(
contentType == HttpUtilities.hl7V2MediaType &&
requestBody != null
) {
val hl7Reader = HL7Reader(ActionLogger())
val messages = hl7Reader.getMessages(requestBody)
val isBatch = hl7Reader.isBatch(requestBody, messages.size)
val messageCount = HL7MessageHelpers.messageCount(requestBody)
val isBatch = HL7Reader.isBatch(requestBody, messageCount)

if (!isBatch && messages.size == 1) {
val message = messages.first()
if (!isBatch && messageCount == 1) {
val message = HL7Reader.parseHL7Message(requestBody)
val acceptAcknowledgementType = HL7Reader.getAcceptAcknowledgmentType(message)
val ackResponseRequired = acceptAcknowledgmentTypeRespondValues.contains(acceptAcknowledgementType)
if (ackResponseRequired) {
Expand Down
18 changes: 6 additions & 12 deletions prime-router/src/main/kotlin/cli/ProcessFhirCommands.kt
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ class ProcessFhirCommands : CliktCommand(
(isCli && outputFormat == MimeFormat.HL7.toString()) ||
(
receiver != null &&
(receiver.format == MimeFormat.HL7 || receiver.format == MimeFormat.HL7_BATCH)
)
(receiver.format == MimeFormat.HL7 || receiver.format == MimeFormat.HL7_BATCH)
)
) -> {
val (bundle2, inputMessage) = convertHl7ToFhir(contents, receiver)

Expand Down Expand Up @@ -297,7 +297,7 @@ class ProcessFhirCommands : CliktCommand(
}
}

private fun evaluateReceiverFilters(receiver: Receiver?, messageOrBundle: MessageOrBundle, isCli: Boolean) {
private fun evaluateReceiverFilters(receiver: Receiver?, messageOrBundle: MessageOrBundle, isCli: Boolean) {
if (receiver != null && messageOrBundle.bundle != null) {
val reportStreamFilters = mutableListOf<Pair<String, ReportStreamFilter>>()
reportStreamFilters.add(Pair("Jurisdictional Filter", receiver.jurisdictionalFilter))
Expand Down Expand Up @@ -522,10 +522,8 @@ class ProcessFhirCommands : CliktCommand(
// However, the library used to encode the HL7 message throws an error it there are more than 4 encoding
// characters, so this work around exists for that scenario
val stringToEncode = hl7String.replace("MSH|^~\\&#|", "MSH|^~\\&|")
val hl7message = HL7Reader.parseHL7Message(
stringToEncode,
null
)
val hl7message = HL7Reader.parseHL7Message(stringToEncode)

// if a hl7 parsing failure happens, throw error and show the message
if (hl7message.toString().lowercase().contains("failed")) {
throw CliktError("HL7 parser failure. $hl7message")
Expand All @@ -534,12 +532,8 @@ class ProcessFhirCommands : CliktCommand(
val msh = hl7message.get("MSH") as Segment
Terser.set(msh, 2, 0, 1, 1, "^~\\&#")
}
val hl7profile = HL7Reader.getMessageProfile(hl7message.toString())
// search hl7 profile map and create translator with config path if found
var fhirMessage = when (val configPath = HL7Reader.profileDirectoryMap[hl7profile]) {
null -> HL7toFhirTranslator(inputSchema).translate(hl7message)
else -> HL7toFhirTranslator(configPath).translate(hl7message)
}
var fhirMessage = HL7toFhirTranslator(inputSchema).translate(hl7message)

val stamper = ConditionStamper(LookupTableConditionMapper(Metadata.getInstance()))
fhirMessage.getObservations().forEach { observation ->
Expand Down
13 changes: 9 additions & 4 deletions prime-router/src/main/kotlin/cli/ProcessHl7Commands.kt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package gov.cdc.prime.router.cli

import ca.uhn.hl7v2.util.Hl7InputStreamMessageStringIterator
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.core.CliktError
import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.options.required
import com.github.ajalt.clikt.parameters.types.file
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.cli.helpers.HL7DiffHelper
import gov.cdc.prime.router.fhirengine.utils.HL7Reader

Expand Down Expand Up @@ -45,9 +45,14 @@ class ProcessHl7Commands : CliktCommand(
val comparisonFile = comparisonFile.inputStream().readBytes().toString(Charsets.UTF_8)
if (comparisonFile.isBlank()) throw CliktError("File ${this.comparisonFile.absolutePath} is empty.")

val actionLogger = ActionLogger()
val starterMessages = HL7Reader(actionLogger).getMessages(starterFile)
val comparisonMessages = HL7Reader(actionLogger).getMessages(comparisonFile)
val starterMessages = Hl7InputStreamMessageStringIterator(starterFile.byteInputStream()).asSequence()
.map { rawItem ->
HL7Reader.parseHL7Message(rawItem)
}.toList()
val comparisonMessages = Hl7InputStreamMessageStringIterator(comparisonFile.byteInputStream()).asSequence()
.map { rawItem ->
HL7Reader.parseHL7Message(rawItem)
}.toList()

starterMessages.forEachIndexed { counter, message ->
val differences = hl7DiffHelper.diffHl7(message, comparisonMessages[counter])
Expand Down
39 changes: 13 additions & 26 deletions prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@ import gov.cdc.prime.router.fhirengine.translation.hl7.FhirTransformer
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.FhirPathUtils
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import gov.cdc.prime.router.fhirengine.utils.HL7Reader.Companion.parseHL7Message
import gov.cdc.prime.router.fhirengine.utils.getObservations
import gov.cdc.prime.router.fhirengine.utils.getRSMessageType
import gov.cdc.prime.router.fhirengine.utils.isElr
import gov.cdc.prime.router.logging.LogMeasuredTime
import gov.cdc.prime.router.report.ReportService
import gov.cdc.prime.router.validation.IItemValidator
Expand Down Expand Up @@ -397,7 +395,7 @@ class FHIRConverter(
)
}

FHIREngineRunResult(
FHIREngineRunResult(
routeEvent,
report,
blobInfo.blobUrl,
Expand Down Expand Up @@ -427,7 +425,7 @@ class FHIRConverter(
report,
TaskAction.convert,
"Submitted report was either empty or could not be parsed into HL7"
) {
) {
parentReportId(input.reportId)
params(
mapOf(
Expand Down Expand Up @@ -479,7 +477,7 @@ class FHIRConverter(
"format" to format.name
)
) {
getBundlesFromRawHL7(rawReport, validator, input.topic.hl7ParseConfiguration)
getBundlesFromRawHL7(rawReport, validator)
}
} catch (ex: ParseFailureError) {
actionLogger.error(
Expand Down Expand Up @@ -571,7 +569,6 @@ class FHIRConverter(
private fun getBundlesFromRawHL7(
rawReport: String,
validator: IItemValidator,
hL7MessageParseAndConvertConfiguration: HL7Reader.Companion.HL7MessageParseAndConvertConfiguration?,
): List<IProcessedItem<Message>> {
val itemStream =
Hl7InputStreamMessageStringIterator(rawReport.byteInputStream()).asSequence()
Expand All @@ -580,17 +577,16 @@ class FHIRConverter(
}.toList()

return maybeParallelize(itemStream.size, itemStream.stream(), "Generating FHIR bundles in").map { item ->
parseHL7Item(item, hL7MessageParseAndConvertConfiguration)
parseHL7Item(item)
}.map { item ->
validateAndConvertHL7Item(item, validator, hL7MessageParseAndConvertConfiguration)
validateAndConvertHL7Item(item, validator)
}.collect(Collectors.toList())
}

private fun parseHL7Item(
item: ProcessedHL7Item,
hL7MessageParseAndConvertConfiguration: HL7Reader.Companion.HL7MessageParseAndConvertConfiguration?,
) = try {
val message = parseHL7Message(item.rawItem, hL7MessageParseAndConvertConfiguration)
val message = parseHL7Message(item.rawItem)
item.updateParsed(message)
} catch (e: HL7Exception) {
item.updateParsed(
Expand All @@ -605,20 +601,11 @@ class FHIRConverter(
private fun validateAndConvertHL7Item(
item: ProcessedHL7Item,
validator: IItemValidator,
hL7MessageParseAndConvertConfiguration: HL7Reader.Companion.HL7MessageParseAndConvertConfiguration?,
): ProcessedHL7Item = if (item.parsedItem != null) {
val validationResult = validator.validate(item.parsedItem)
if (validationResult.isValid()) {
try {
val bundle = when (hL7MessageParseAndConvertConfiguration) {
null -> HL7toFhirTranslator.getHL7ToFhirTranslatorInstance().translate(item.parsedItem)
else ->
HL7toFhirTranslator
.getHL7ToFhirTranslatorInstance(
hL7MessageParseAndConvertConfiguration.hl7toFHIRMappingLocation
)
.translate(item.parsedItem)
}
val bundle = HL7toFhirTranslator.getHL7ToFhirTranslatorInstance().translate(item.parsedItem)
item.setBundle(bundle)
} catch (ex: Exception) {
item.setConversionError(
Expand Down Expand Up @@ -759,13 +746,13 @@ class FHIRConverter(
* transformer in tests.
*/
fun getTransformerFromSchema(schemaName: String): FhirTransformer? = if (schemaName.isNotBlank()) {
withLoggingContext(mapOf("schemaName" to schemaName)) {
logger.info("Apply a sender transform to the items in the report")
}
FhirTransformer(schemaName)
} else {
null
withLoggingContext(mapOf("schemaName" to schemaName)) {
logger.info("Apply a sender transform to the items in the report")
}
FhirTransformer(schemaName)
} else {
null
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package gov.cdc.prime.router.fhirengine.translation

import ca.uhn.hl7v2.util.Hl7InputStreamMessageStringIterator
import com.azure.storage.blob.models.BlobItem
import fhirengine.engine.CustomFhirPathFunctions
import fhirengine.engine.CustomTranslationFunctions
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.Hl7Configuration
import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.fhirengine.config.HL7TranslationConfig
Expand Down Expand Up @@ -41,7 +41,6 @@ class TranslationSchemaManager : Logging {
Regex("/$previousValidBlobName-$timestampRegex")
private val previousPreviousValidBlobNameRegex =
Regex("/$previousPreviousValidBlobName-$timestampRegex")
private val hL7Reader = HL7Reader(ActionLogger())

/**
* Container class that holds the current state for a schema type in a particular azure store.
Expand Down Expand Up @@ -440,7 +439,11 @@ class TranslationSchemaManager : Logging {
)
).validate(
inputBundle,
hL7Reader.getMessages(rawValidationInput.output)[0]
HL7Reader.parseHL7Message(
Hl7InputStreamMessageStringIterator(rawValidationInput.output.byteInputStream())
.asSequence()
.first()
),
)
}
}
Expand Down
21 changes: 17 additions & 4 deletions prime-router/src/main/kotlin/fhirengine/utils/HL7MessageHelpers.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gov.cdc.prime.router.fhirengine.utils

import ca.uhn.hl7v2.AbstractHL7Exception
import ca.uhn.hl7v2.model.v251.datatype.DTM
import ca.uhn.hl7v2.util.Hl7InputStreamMessageStringIterator
import ca.uhn.hl7v2.util.Terser
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.Hl7Configuration
Expand All @@ -24,6 +26,8 @@ object HL7MessageHelpers : Logging {
*/
const val hl7SegmentDelimiter = "\r"

val actionLogger = ActionLogger()

/**
* Generate a HL7 Batch file from the list of [hl7RawMsgs] for the given [receiver]. The [hl7RawMsgs] are expected
* to be real HL7 messages at this point, so we will not validate their contents here for performance reasons.
Expand All @@ -34,12 +38,17 @@ object HL7MessageHelpers : Logging {
val useBatchHeaders = receiver.translation.useBatchHeaders
// Grab the first message to extract some data if not set in the settings
val firstMessage = if (hl7RawMsgs.isNotEmpty()) {
val messages = HL7Reader(ActionLogger()).getMessages(hl7RawMsgs[0])
if (messages.isEmpty()) {
try {
val message = HL7Reader.parseHL7Message(hl7RawMsgs[0])
Terser(message)
} catch (exception: Hl7InputStreamMessageStringIterator.ParseFailureError) {
logger.warn("Unable to extract batch header values from HL7: ${hl7RawMsgs[0].take(80)} ...")
HL7Reader.logHL7ParseFailure(exception, actionLogger)
null
} catch (exception: AbstractHL7Exception) {
logger.warn("Unable to extract batch header values from HL7: ${hl7RawMsgs[0].take(80)} ...")
HL7Reader.recordError(exception, actionLogger)
null
} else {
Terser(messages[0])
}
} else {
null
Expand Down Expand Up @@ -94,4 +103,8 @@ object HL7MessageHelpers : Logging {

return builder.toString()
}

fun messageCount(rawHl7: String): Int {
return Hl7InputStreamMessageStringIterator(rawHl7.byteInputStream()).asSequence().count()
}
}
Loading

0 comments on commit aeb20c7

Please sign in to comment.