From 5b10fe7cb57e962de097f6a6ff714ce4e7ffc425 Mon Sep 17 00:00:00 2001 From: Michael Kalish Date: Thu, 31 Oct 2024 07:51:30 -0400 Subject: [PATCH] fixup! 16148: send reports from the submissions directly to FHIR Convert --- .../event/ReportStreamEventBuilder.kt | 8 +- .../event/ReportStreamEventService.kt | 22 +- .../kotlin/fhirengine/azure/FHIRFunctions.kt | 12 + .../kotlin/fhirengine/engine/FHIRConverter.kt | 201 +++++++++------- .../kotlin/fhirengine/engine/FHIREngine.kt | 3 +- .../common/UniversalPipelineTestUtils.kt | 3 + .../azure/FHIRConverterIntegrationTests.kt | 225 ++++++++++++++++-- 7 files changed, 368 insertions(+), 106 deletions(-) diff --git a/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventBuilder.kt b/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventBuilder.kt index f7ed5a649e3..1f3898ac88d 100644 --- a/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventBuilder.kt +++ b/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventBuilder.kt @@ -167,6 +167,7 @@ open class ReportStreamItemEventBuilder( private var theParentItemIndex = 1 private var theChildIndex = 1 private var theTrackingId: String? = null + private var theSender: String? = null fun trackingId(bundle: Bundle) { theTrackingId = AzureEventUtils.getIdentifier(bundle).value @@ -180,6 +181,10 @@ open class ReportStreamItemEventBuilder( theChildIndex = childItemIndex } + fun sender(sender: String) { + theSender = sender + } + protected fun getItemEventData(): ItemEventData { if (theParentReportId == null) { throw IllegalStateException("Parent Report ID must be set to generate an ItemEvent") @@ -188,7 +193,8 @@ open class ReportStreamItemEventBuilder( theChildIndex, theParentReportId!!, theParentItemIndex, - theTrackingId + theTrackingId, + theSender ) } diff --git a/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventService.kt b/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventService.kt index 42509ce0748..25f4af95eb4 100644 --- a/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventService.kt +++ b/prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventService.kt @@ -202,6 +202,7 @@ interface IReportStreamEventService { parentReportId: UUID, parentItemIndex: Int, trackingId: String?, + senderString: String? = null, ): ItemEventData } @@ -400,11 +401,15 @@ class ReportStreamEventService( val submittedReportIds = if (parentReportId != null) { val rootReports = reportService.getRootReports(parentReportId) rootReports.ifEmpty { - listOf(dbAccess.fetchReportFile(parentReportId)) + try { + listOf(dbAccess.fetchReportFile(parentReportId)) + } catch (ex: IllegalStateException) { + emptyList() + } } } else { emptyList() - }.map { it.reportId } + }.map { it.reportId }.ifEmpty { if (parentReportId != null) listOf(parentReportId) else emptyList() } return ReportEventData( childReportId, @@ -423,16 +428,23 @@ class ReportStreamEventService( parentReportId: UUID, parentItemIndex: Int, trackingId: String?, + senderString: String?, ): ItemEventData { val submittedIndex = reportService.getRootItemIndex(parentReportId, parentItemIndex) ?: parentItemIndex - val rootReport = - reportService.getRootReports(parentReportId).firstOrNull() ?: dbAccess.fetchReportFile(parentReportId) + val sender = if (senderString != null) { + senderString + } else { + val rootReport = + reportService.getRootReports(parentReportId).firstOrNull() ?: dbAccess.fetchReportFile(parentReportId) + "${rootReport.sendingOrg}.${rootReport.sendingOrgClient}" + } + return ItemEventData( childItemIndex, parentItemIndex, submittedIndex, trackingId, - "${rootReport.sendingOrg}.${rootReport.sendingOrgClient}" + sender ) } } \ No newline at end of file diff --git a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt index 6adfb4922cc..88f053e06aa 100644 --- a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt +++ b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt @@ -25,6 +25,7 @@ import gov.cdc.prime.router.fhirengine.engine.FHIRTranslator import gov.cdc.prime.router.fhirengine.engine.FhirReceiveQueueMessage import gov.cdc.prime.router.fhirengine.engine.PrimeRouterQueueMessage import gov.cdc.prime.router.fhirengine.engine.ReportPipelineMessage +import gov.cdc.prime.router.fhirengine.engine.SubmissionSenderNotFound import org.apache.commons.lang3.StringUtils import org.apache.logging.log4j.kotlin.Logging import org.jooq.exception.DataAccessException @@ -166,6 +167,17 @@ class FHIRFunctions( // DB connectivity issues that are resolved without intervention logger.error(ex) throw ex + } catch (ex: SubmissionSenderNotFound) { + logger.error(ex) + val tableEntity = Submission( + ex.reportId.toString(), + "Rejected", + ex.blobURL, + actionLogger.errors.takeIf { it.isNotEmpty() }?.map { it.detail.message }?.toString() + ) + submissionTableService.insertSubmission(tableEntity) + queueAccess.sendMessage("${messageContent.messageQueueName}-poison", message) + return emptyList() } catch (ex: Exception) { // We're catching anything else that occurs because the most likely cause is a code or configuration error // that will not be resolved if the message is automatically retried diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt index e5e8e4fc447..b74c5fd5b24 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt @@ -13,7 +13,6 @@ import fhirengine.engine.ProcessedFHIRItem import fhirengine.engine.ProcessedHL7Item import gov.cdc.prime.reportstream.shared.BlobUtils import gov.cdc.prime.reportstream.shared.QueueMessage -import gov.cdc.prime.reportstream.shared.Submission import gov.cdc.prime.router.ActionLogDetail import gov.cdc.prime.router.ActionLogScope import gov.cdc.prime.router.ActionLogger @@ -24,6 +23,7 @@ import gov.cdc.prime.router.Metadata import gov.cdc.prime.router.MimeFormat import gov.cdc.prime.router.Options import gov.cdc.prime.router.Report +import gov.cdc.prime.router.Sender import gov.cdc.prime.router.SettingsProvider import gov.cdc.prime.router.Topic import gov.cdc.prime.router.UnmappableConditionMessage @@ -34,7 +34,6 @@ import gov.cdc.prime.router.azure.DatabaseAccess import gov.cdc.prime.router.azure.Event import gov.cdc.prime.router.azure.LookupTableConditionMapper import gov.cdc.prime.router.azure.ProcessEvent -import gov.cdc.prime.router.azure.SubmissionTableService import gov.cdc.prime.router.azure.db.Tables import gov.cdc.prime.router.azure.db.enums.TaskAction import gov.cdc.prime.router.azure.observability.bundleDigest.BundleDigestExtractor @@ -80,17 +79,101 @@ class FHIRConverter( blob: BlobAccess = BlobAccess(), azureEventService: AzureEventService = AzureEventServiceImpl(), reportService: ReportService = ReportService(), - private val submissionTableService: SubmissionTableService = SubmissionTableService.getInstance(), ) : FHIREngine(metadata, settings, db, blob, azureEventService, reportService) { override val finishedField: Field = Tables.TASK.PROCESSED_AT override val engineType: String = "Convert" - private val clientIdHeader = "client_id" - override val taskAction: TaskAction = TaskAction.convert + data class FHIRConvertInput( + val reportId: UUID, + val topic: Topic, + val schemaName: String, + val blobURL: String, + val blobDigest: String, + val blobSubFolderName: String, + val isExternalReport: Boolean, + val sender: Sender? = null, + ) { + fun getBlobInfo(): BlobAccess.BlobInfo { + val format = Report.getFormatFromBlobURL(blobURL) + return BlobAccess.BlobInfo(format, blobURL, blobDigest.toByteArray()) + } + + companion object { + + private val clientIdHeader = "client_id" + fun fromFhirConvertQueueMessage( + message: FhirConvertQueueMessage, + actionHistory: ActionHistory, + ): FHIRConvertInput { + val reportId = message.reportId + val topic = message.topic + val schemaName = message.schemaName + val blobUrl = message.blobURL + val blobDigest = message.digest + val blobSubFolderName = message.blobSubFolderName + actionHistory.trackExistingInputReport(reportId) + return FHIRConvertInput( + reportId, + topic, + schemaName, + blobUrl, + blobDigest, + blobSubFolderName, + isExternalReport = false + ) + } + + fun fromFHIRReceiveQueueMessage( + message: FhirReceiveQueueMessage, + actionHistory: ActionHistory, + settings: SettingsProvider, + ): FHIRConvertInput { + val reportId = message.reportId + val blobUrl = message.blobURL + val blobDigest = message.digest + val blobSubFolderName = message.blobSubFolderName + + val clientId = message.headers[clientIdHeader] + val sender = clientId?.takeIf { it.isNotBlank() }?.let { settings.findSender(it) } + if (sender == null) { + throw SubmissionSenderNotFound("No such sender $clientId", reportId, blobUrl) + } + val topic = sender.topic + val schemaName = sender.schemaName + + val format = Report.getFormatFromBlobURL(blobUrl) + val report = Report( + sender.format, + listOf(ClientSource(organization = sender.organizationName, client = sender.name)), + 1, + nextAction = TaskAction.convert, + topic = sender.topic, + id = reportId, + bodyURL = blobUrl + ) + actionHistory.trackExternalInputReport( + report, + BlobAccess.BlobInfo(format, blobUrl, blobDigest.toByteArray()) + ) + + return FHIRConvertInput( + reportId, + topic, + schemaName, + blobUrl, + blobDigest, + blobSubFolderName, + isExternalReport = true, + sender = sender + ) + } + } + } + /** * Accepts a [message] in either HL7 or FHIR format * HL7 messages will be converted into FHIR. @@ -105,64 +188,18 @@ class FHIRConverter( actionHistory: ActionHistory, ): List = when (message) { is FhirConvertQueueMessage -> { - val reportId = message.reportId - val topic = message.topic - val schemaName = message.schemaName - val blobUrl = message.blobURL - val blobDigest = message.digest - val blobSubFolderName = message.blobSubFolderName - actionHistory.trackExistingInputReport(reportId) + val input = FHIRConvertInput.fromFhirConvertQueueMessage(message, actionHistory) + fhirEngineRunResults( - reportId, - topic, - blobUrl, - blobDigest, - schemaName, - blobSubFolderName, + input, actionLogger, actionHistory ) } is FhirReceiveQueueMessage -> { - val clientId = message.headers[clientIdHeader] - val sender = clientId?.takeIf { it.isNotBlank() }?.let { settings.findSender(it) } - if (sender == null) { - val tableEntity = Submission( - message.reportId.toString(), - "Rejected", - message.blobURL, - actionLogger.errors.takeIf { it.isNotEmpty() }?.map { it.detail.message }?.toString() - ) - submissionTableService.insertSubmission(tableEntity) - throw RuntimeException("No such sender $clientId") - } - val reportId = message.reportId - - val blobUrl = message.blobURL - val blobDigest = message.digest - val blobSubFolderName = message.blobSubFolderName - - val topic = sender.topic - val schemaName = sender.schemaName - val report = Report( - sender.format, - listOf(ClientSource(organization = sender.organizationName, client = sender.name)), - 1, - metadata = metadata, - nextAction = TaskAction.destination_filter, - topic = sender.topic, - id = reportId, - bodyURL = blobUrl - ) - val blobInfo = BlobAccess.BlobInfo(MimeFormat.FHIR, blobUrl, blobDigest.toByteArray()) - actionHistory.trackExternalInputReport(report, blobInfo) + val input = FHIRConvertInput.fromFHIRReceiveQueueMessage(message, actionHistory, settings) fhirEngineRunResults( - reportId, - topic, - blobUrl, - blobDigest, - schemaName, - blobSubFolderName, + input, actionLogger, actionHistory ) @@ -175,24 +212,19 @@ class FHIRConverter( } private fun fhirEngineRunResults( - reportId: UUID, - topic: Topic, - blobURL: String, - blobDigest: String, - schemaName: String, - blobSubFolderName: String, + input: FHIRConvertInput, actionLogger: ActionLogger, actionHistory: ActionHistory, ): List { val contextMap = mapOf( MDCUtils.MDCProperty.ACTION_NAME to actionHistory.action.actionName.name, - MDCUtils.MDCProperty.REPORT_ID to reportId, - MDCUtils.MDCProperty.TOPIC to topic, - MDCUtils.MDCProperty.BLOB_URL to blobURL + MDCUtils.MDCProperty.REPORT_ID to input.reportId, + MDCUtils.MDCProperty.TOPIC to input.topic, + MDCUtils.MDCProperty.BLOB_URL to input.blobURL ) withLoggingContext(contextMap) { - actionLogger.setReportId(reportId) - val format = Report.getFormatFromBlobURL(blobURL) + actionLogger.setReportId(input.reportId) + val format = Report.getFormatFromBlobURL(input.blobURL) logger.info("Starting FHIR Convert step") // This line is a workaround for a defect in the hapi-fhir library @@ -206,7 +238,7 @@ class FHIRConverter( // TODO: https://github.com/CDCgov/prime-reportstream/issues/14287 FhirPathUtils - val processedItems = process(format, blobURL, blobDigest, topic, actionLogger) + val processedItems = process(format, input.blobURL, input.blobDigest, input.topic, actionLogger) // processedItems can be empty in three scenarios: // - the blob had no contents, i.e. an empty file was submitted @@ -217,7 +249,7 @@ class FHIRConverter( "Applied sender transform and routed" ) { val transformer = getTransformerFromSchema( - schemaName + input.schemaName ) maybeParallelize( @@ -233,10 +265,10 @@ class FHIRConverter( MimeFormat.FHIR, emptyList(), parentItemLineageData = listOf( - Report.ParentItemLineageData(reportId, itemIndex.toInt() + 1) + Report.ParentItemLineageData(input.reportId, itemIndex.toInt() + 1) ), metadata = this.metadata, - topic = topic, + topic = input.topic, nextAction = TaskAction.none ) val noneEvent = ProcessEvent( @@ -254,13 +286,13 @@ class FHIRConverter( TaskAction.convert, processedItem.validationError!!.message, ) { - parentReportId(reportId) + parentReportId(input.reportId) parentItemIndex(itemIndex.toInt() + 1) params( mapOf( ReportStreamEventProperties.ITEM_FORMAT to format, ReportStreamEventProperties.VALIDATION_PROFILE - to topic.validator.validatorProfileName + to input.topic.validator.validatorProfileName ) ) } @@ -276,10 +308,10 @@ class FHIRConverter( MimeFormat.FHIR, emptyList(), parentItemLineageData = listOf( - Report.ParentItemLineageData(reportId, itemIndex.toInt() + 1) + Report.ParentItemLineageData(input.reportId, itemIndex.toInt() + 1) ), metadata = this.metadata, - topic = topic, + topic = input.topic, nextAction = TaskAction.destination_filter ) @@ -298,7 +330,7 @@ class FHIRConverter( MimeFormat.FHIR, bodyBytes, report.id.toString(), - blobSubFolderName, + input.blobSubFolderName, routeEvent.eventAction ) report.bodyURL = blobInfo.blobUrl @@ -321,7 +353,10 @@ class FHIRConverter( report, TaskAction.convert ) { - parentReportId(reportId) + if (input.isExternalReport) { + sender(input.sender?.fullName ?: "") + } + parentReportId(input.reportId) parentItemIndex(itemIndex.toInt() + 1) trackingId(bundle) params( @@ -341,8 +376,8 @@ class FHIRConverter( report.id, blobInfo.blobUrl, BlobUtils.digestToString(blobInfo.digest), - blobSubFolderName, - topic + input.blobSubFolderName, + input.topic ) ) } @@ -354,7 +389,7 @@ class FHIRConverter( emptyList(), 0, metadata = this.metadata, - topic = topic, + topic = input.topic, nextAction = TaskAction.none ) actionHistory.trackEmptyReport(report) @@ -364,7 +399,7 @@ class FHIRConverter( TaskAction.convert, "Submitted report was either empty or could not be parsed into HL7" ) { - parentReportId(reportId) + parentReportId(input.reportId) params( mapOf( ReportStreamEventProperties.ITEM_FORMAT to format @@ -701,4 +736,8 @@ class FHIRConverter( } else { null } +} + +class SubmissionSenderNotFound(senderId: String, val reportId: UUID, val blobURL: String) : RuntimeException() { + override val message = "No sender was found for: $senderId" } \ No newline at end of file diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt index f731264c7b5..b45a1ce06cc 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt @@ -135,8 +135,7 @@ abstract class FHIREngine( databaseAccess ?: databaseAccessSingleton, blobAccess ?: BlobAccess(), azureEventService ?: AzureEventServiceImpl(), - reportService ?: ReportService(), - submissionTableService = submissionTableService ?: SubmissionTableService.getInstance() + reportService ?: ReportService() ) TaskAction.destination_filter -> FHIRDestinationFilter( metadata ?: Metadata.getInstance(), diff --git a/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt b/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt index ba0393617e2..45f03b9514e 100644 --- a/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt +++ b/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt @@ -284,6 +284,7 @@ object UniversalPipelineTestUtils { txn: DataAccessTransaction, expectedItems: Int? = null, expectedReports: Int = 1, + parentIsRoot: Boolean = false, ): List { val itemLineages = DSL .using(txn) @@ -299,6 +300,8 @@ object UniversalPipelineTestUtils { // itemCount is on the report created by the test. It will not be null. if (parent.itemCount > 1) { assertThat(itemLineages.map { it.parentIndex }).isEqualTo((1..expectedItems).toList()) + } else if (parentIsRoot) { + assertThat(itemLineages.map { it.parentIndex }).isEqualTo((1..expectedItems).toList()) } else { assertThat(itemLineages.map { it.parentIndex }).isEqualTo(MutableList(expectedItems) { 1 }) } diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt index d64426d6d25..8be72bf279c 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt @@ -9,6 +9,7 @@ import assertk.assertions.isEqualToIgnoringGivenProperties import assertk.assertions.matchesPredicate import gov.cdc.prime.reportstream.shared.BlobUtils import gov.cdc.prime.reportstream.shared.QueueMessage +import gov.cdc.prime.router.ClientSource import gov.cdc.prime.router.FileSettings import gov.cdc.prime.router.Metadata import gov.cdc.prime.router.MimeFormat @@ -72,6 +73,7 @@ import gov.cdc.prime.router.metadata.LookupTable import gov.cdc.prime.router.metadata.ObservationMappingConstants import gov.cdc.prime.router.unittest.UnitTestUtils import gov.cdc.prime.router.version.Version +import io.ktor.client.utils.EmptyContent.headers import io.mockk.every import io.mockk.mockk import io.mockk.mockkConstructor @@ -89,6 +91,7 @@ import tech.tablesaw.api.StringColumn import tech.tablesaw.api.Table import java.nio.charset.Charset import java.time.OffsetDateTime +import java.util.UUID @Testcontainers @ExtendWith(ReportStreamTestDatabaseSetupExtension::class) @@ -103,6 +106,7 @@ class FHIRConverterIntegrationTests { ) val azureEventService = InMemoryAzureEventService() + val mockSubmissionTableService = mockk() private fun createFHIRFunctionsInstance(): FHIRFunctions { val settings = FileSettings().loadOrganizations(universalPipelineOrganization) @@ -116,10 +120,11 @@ class FHIRConverterIntegrationTests { .settingsProvider(settings) .databaseAccess(ReportStreamTestDatabaseContainer.testDatabaseAccess) .build() + return FHIRFunctions( workflowEngine, databaseAccess = ReportStreamTestDatabaseContainer.testDatabaseAccess, - submissionTableService = mockk() + submissionTableService = mockSubmissionTableService ) } @@ -133,21 +138,15 @@ class FHIRConverterIntegrationTests { metadata, settings, ReportStreamTestDatabaseContainer.testDatabaseAccess, - azureEventService = azureEventService, - submissionTableService = mockk() + azureEventService = azureEventService ) } - private fun generateQueueMessage( + private fun generateFHIRConvertQueueMessage( report: Report, blobContents: String, sender: Sender, - headers: Map? = null, ): String { - val headersString = headers?.entries?.joinToString(separator = ",\n") { (key, value) -> - """"$key": "$value"""" - } ?: "" - return """ { "type": "convert", @@ -157,7 +156,29 @@ class FHIRConverterIntegrationTests { "blobSubFolderName": "${sender.fullName}", "topic": "${sender.topic.jsonVal}", "schemaName": "${sender.schemaName}" - ${if (headersString.isNotEmpty()) ",\n$headersString" else ""} + } + """.trimIndent() + } + + private fun generateFHIRReceiveQueueMessage( + report: Report, + blobContents: String, + sender: Sender, + ): String { + // TODO: something is wrong with the Jackson configuration as it should not require the type to parse this + val headers = mapOf("client_id" to sender.fullName) + val headersStringMap = headers.entries.joinToString(separator = ",\n") { (key, value) -> + """"$key": "$value"""" + } + val headersString = "[\"java.util.LinkedHashMap\",{$headersStringMap}]" + return """ + { + "type": "receive-fhir", + "reportId": "${report.id}", + "blobURL": "${report.bodyURL}", + "digest": "${BlobUtils.digestToString(BlobUtils.sha256Digest(blobContents.toByteArray()))}", + "blobSubFolderName": "${sender.fullName}", + "headers":$headersString } """.trimIndent() } @@ -171,6 +192,7 @@ class FHIRConverterIntegrationTests { mockkObject(BlobAccess.BlobContainerMetadata) every { BlobAccess.BlobContainerMetadata.build(any(), any()) } returns getBlobContainerMetadata() mockkConstructor(DatabaseLookupTableAccess::class) + every { mockSubmissionTableService.insertSubmission(any()) } returns Unit } @AfterEach @@ -246,6 +268,174 @@ class FHIRConverterIntegrationTests { } } + @Test + fun `should successfully process a FHIRReceiveQueueMessage`() { + val receivedReportContents = + listOf(cleanHL7Record, invalidHL7Record, unparseableHL7Record, badEncodingHL7Record) + .joinToString("\n") + val receiveBlobUrl = BlobAccess.uploadBlob( + "receive/happy-path.hl7", + receivedReportContents.toByteArray(), + getBlobContainerMetadata() + ) + + val receiveReport = Report( + hl7SenderWithNoTransform.format, + listOf( + ClientSource( + organization = hl7SenderWithNoTransform.organizationName, + client = hl7SenderWithNoTransform.name + ) + ), + 1, + nextAction = TaskAction.convert, + topic = hl7SenderWithNoTransform.topic, + id = UUID.randomUUID(), + bodyURL = receiveBlobUrl + ) + val queueMessage = + generateFHIRReceiveQueueMessage(receiveReport, receivedReportContents, hl7SenderWithNoTransform) + val fhirFunctions = createFHIRFunctionsInstance() + + fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) + + ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> + val (routedReports, unroutedReports) = fetchChildReports( + receiveReport, txn, 4, 4, parentIsRoot = true + ).partition { it.nextAction != TaskAction.none } + assertThat(routedReports).hasSize(2) + routedReports.forEach { + assertThat(it.nextAction).isEqualTo(TaskAction.destination_filter) + assertThat(it.receivingOrg).isEqualTo(null) + assertThat(it.receivingOrgSvc).isEqualTo(null) + assertThat(it.schemaName).isEqualTo("None") + assertThat(it.schemaTopic).isEqualTo(Topic.FULL_ELR) + assertThat(it.bodyFormat).isEqualTo("FHIR") + } + assertThat(unroutedReports).hasSize(2) + unroutedReports.forEach { + assertThat(it.nextAction).isEqualTo(TaskAction.none) + assertThat(it.receivingOrg).isEqualTo(null) + assertThat(it.receivingOrgSvc).isEqualTo(null) + assertThat(it.schemaName).isEqualTo("None") + assertThat(it.schemaTopic).isEqualTo(Topic.FULL_ELR) + assertThat(it.bodyFormat).isEqualTo("FHIR") + } + // Verify that the expected FHIR bundles were uploaded + val reportAndBundles = + routedReports.map { + Pair( + it, + BlobAccess.downloadBlobAsByteArray(it.bodyUrl, getBlobContainerMetadata()) + ) + } + + assertThat(reportAndBundles).transform { pairs -> pairs.map { it.second } }.each { + it.matchesPredicate { bytes -> + val invalidHL7Result = CompareData().compare( + cleanHL7RecordConverted.byteInputStream(), + bytes.inputStream(), + MimeFormat.FHIR, + null + ) + invalidHL7Result.passed + + val cleanHL7Result = CompareData().compare( + invalidHL7RecordConverted.byteInputStream(), + bytes.inputStream(), + MimeFormat.FHIR, + null + ) + invalidHL7Result.passed || cleanHL7Result.passed + } + } + + val expectedQueueMessages = reportAndBundles.map { (report, fhirBundle) -> + FhirDestinationFilterQueueMessage( + report.reportId, + report.bodyUrl, + BlobUtils.digestToString(BlobUtils.sha256Digest(fhirBundle)), + hl7SenderWithNoTransform.fullName, + hl7SenderWithNoTransform.topic + ) + }.map { it.serialize() } + + verify(exactly = 2) { + QueueAccess.sendMessage( + QueueMessage.elrDestinationFilterQueueName, + match { expectedQueueMessages.contains(it) } + ) + } + + val actionLogs = DSL.using(txn).select(Tables.ACTION_LOG.asterisk()).from(Tables.ACTION_LOG) + .where(Tables.ACTION_LOG.REPORT_ID.eq(receiveReport.id)) + .and(Tables.ACTION_LOG.TYPE.eq(ActionLogType.error)) + .fetchInto( + DetailedActionLog::class.java + ) + + assertThat(actionLogs).hasSize(2) + @Suppress("ktlint:standard:max-line-length") + assertThat(actionLogs).transform { logs -> logs.map { it.detail.message } } + .containsOnly( + "Item 3 in the report was not parseable. Reason: exception while parsing HL7: Determine encoding for message. The following is the first 50 chars of the message for reference, although this may not be where the issue is: MSH^~\\&|CDC PRIME - Atlanta, Georgia (Dekalb)^2.16", + "Item 4 in the report was not parseable. Reason: exception while parsing HL7: Invalid or incomplete encoding characters - MSH-2 is ^~\\&#!" + ) + assertThat(actionLogs).transform { + it.map { log -> + log.trackingId + } + }.containsOnly( + "", + "" + ) + + assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.ITEM_ACCEPTED]!!).hasSize(2) + val event = + azureEventService + .reportStreamEvents[ReportStreamEventName.ITEM_ACCEPTED]!!.last() as ReportStreamItemEvent + assertThat(event.reportEventData).isEqualToIgnoringGivenProperties( + ReportEventData( + routedReports[1].reportId, + receiveReport.id, + listOf(receiveReport.id), + Topic.FULL_ELR, + routedReports[1].bodyUrl, + TaskAction.convert, + OffsetDateTime.now(), + Version.commitId + ), + ReportEventData::timestamp + ) + assertThat(event.itemEventData).isEqualToIgnoringGivenProperties( + ItemEventData( + 1, + 2, + 2, + "371784", + "phd.hl7-elr-no-transform" + ) + ) + assertThat(event.params).isEqualTo( + mapOf( + ReportStreamEventProperties.ITEM_FORMAT to MimeFormat.HL7, + ReportStreamEventProperties.BUNDLE_DIGEST to BundleDigestLabResult( + observationSummaries = AzureEventUtils + .getObservationSummaries( + FhirTranscoder.decode( + reportAndBundles[1].second.toString(Charset.defaultCharset()) + ) + ), + patientState = listOf("TX"), + orderingFacilityState = listOf("FL"), + performerState = emptyList(), + eventType = "ORU^R01^ORU_R01" + ) + ) + ) + } + } + @Test fun `should successfully convert HL7 messages`() { val receivedReportContents = @@ -258,7 +448,8 @@ class FHIRConverterIntegrationTests { ) val receiveReport = setupConvertStep(MimeFormat.HL7, hl7SenderWithNoTransform, receiveBlobUrl, 4) - val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7SenderWithNoTransform) + val queueMessage = + generateFHIRConvertQueueMessage(receiveReport, receivedReportContents, hl7SenderWithNoTransform) val fhirFunctions = createFHIRFunctionsInstance() fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) @@ -439,7 +630,7 @@ class FHIRConverterIntegrationTests { MimeFormat.FHIR, fhirSenderWithNoTransform, receiveBlobUrl, 4 ) - val queueMessage = generateQueueMessage( + val queueMessage = generateFHIRConvertQueueMessage( receiveReport, receivedReportContents, fhirSenderWithNoTransform ) @@ -585,7 +776,7 @@ class FHIRConverterIntegrationTests { ) val receiveReport = setupConvertStep(MimeFormat.HL7, senderWithValidation, receiveBlobUrl, 2) - val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, senderWithValidation) + val queueMessage = generateFHIRConvertQueueMessage(receiveReport, receivedReportContents, senderWithValidation) val fhirFunctions = createFHIRFunctionsInstance() fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) @@ -713,7 +904,7 @@ class FHIRConverterIntegrationTests { ) val receiveReport = setupConvertStep(MimeFormat.HL7, hl7Sender, receiveBlobUrl, 2) - val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender) + val queueMessage = generateFHIRConvertQueueMessage(receiveReport, receivedReportContents, hl7Sender) val fhirFunctions = createFHIRFunctionsInstance() fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) @@ -794,7 +985,7 @@ class FHIRConverterIntegrationTests { ) val receiveReport = setupConvertStep(MimeFormat.HL7, hl7Sender, receiveBlobUrl, 1) - val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender) + val queueMessage = generateFHIRConvertQueueMessage(receiveReport, receivedReportContents, hl7Sender) val fhirFunctions = createFHIRFunctionsInstance() fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) @@ -823,7 +1014,7 @@ class FHIRConverterIntegrationTests { ) val receiveReport = setupConvertStep(MimeFormat.HL7, hl7Sender, receiveBlobUrl, 1) - val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender) + val queueMessage = generateFHIRConvertQueueMessage(receiveReport, receivedReportContents, hl7Sender) val fhirFunctions = createFHIRFunctionsInstance() fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) @@ -852,7 +1043,7 @@ class FHIRConverterIntegrationTests { ) val receiveReport = setupConvertStep(MimeFormat.HL7, hl7Sender, receiveBlobUrl, 1) - val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender) + val queueMessage = generateFHIRConvertQueueMessage(receiveReport, receivedReportContents, hl7Sender) val fhirFunctions = createFHIRFunctionsInstance() fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert))