Skip to content

Commit

Permalink
Merge branch 'develop' into ignore_convention
Browse files Browse the repository at this point in the history
  • Loading branch information
fdefalco authored Dec 20, 2023
2 parents 2eb9a79 + e953a7e commit 19104a6
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 44 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export(buildNetworkUnmappedSourceCodeIndex)
export(computeCharacterizationDifference)
export(getIgnoredReleases)
export(getIgnoredSources)
export(exportToParquet)
export(getSourceReleaseKey)
import(data.table)
import(dplyr)
Expand Down
85 changes: 58 additions & 27 deletions R/AugmentConceptFiles.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

#' Augment Concept Files
#' @name Augment Concept Files
#'
#' @details
#' Adds Data Quality and Temporal details to concept data files
#' @details Adds Data Quality and Temporal details to concept data files
#'
#' @param releaseFolder Folder containing a specific release of a data source
#'
Expand All @@ -30,57 +29,89 @@
#'

#' @export
augmentConceptFiles <- function(releaseFolder) {
augmentConceptFiles <- function(releaseFolder, format) {
library("DBI")
dataQualityResultsFile <- file.path(releaseFolder, "dq-result.json")
duckdbCon <- NULL
if (format == "duckdb") {
duckdbCon <- DBI::dbConnect(duckdb::duckdb(), dbdir = file.path(releaseFolder, "concepts", 'data.duckdb'), read_only = FALSE)
table <- tbl(duckdbCon, DBI::Id(schema = "concepts", table = "concept_metadata"))
table <- as.data.frame(table)
table <- table %>% mutate(COUNT_FAILED = NA, IS_STATIONARY = NA, SEASONALITY_SCORE = NA)
}

if (file.exists(dataQualityResultsFile)) {
writeLines("updating concept files with data quality results")
dataQualityResults <- jsonlite::fromJSON(dataQualityResultsFile)
results <- dataQualityResults$CheckResults

# augment achilles concept files with data quality failure count for relevant concept checks
conceptAggregates <- results %>% filter(!is.na(results$conceptId) & results$failed==1) %>% count(conceptId,tolower(cdmTableName))
names(conceptAggregates) <- c("concept_id","cdm_table_name", "count_failed")
conceptAggregates <- results %>%
filter(!is.na(results$conceptId) & results$failed == 1) %>%
count(conceptId, tolower(cdmTableName))
names(conceptAggregates) <- c("concept_id", "cdm_table_name", "count_failed")
writeLines(paste0(nrow(conceptAggregates), " concept level data quality issues found."))
if (nrow(conceptAggregates) > 0) {
for (row in 1:nrow(conceptAggregates)) {
print(trimws(conceptAggregates[row, "concept_id"]))
concept_id <- trimws(conceptAggregates[row, "concept_id"])
count_failed <- conceptAggregates[row, "count_failed"]

writeLines(paste0(row, "/", nrow(conceptAggregates), " - inserting data quality results"))
conceptFileName <- paste0("concept_", trimws(conceptAggregates[row, "concept_id"]), ".json")
conceptFile <- file.path(releaseFolder, "concepts", trimws(conceptAggregates[row, "cdm_table_name"]), conceptFileName)
if (file.exists(conceptFile)) {
conceptContent <- readLines(conceptFile)
conceptData <- jsonlite::fromJSON(conceptContent)
conceptData$COUNT_FAILED <- conceptAggregates[row, "count_failed"]
conceptJson <- jsonlite::toJSON(conceptData)
write(conceptJson, conceptFile)

if (format == "duckdb") {
table <- table %>% mutate(COUNT_FAILED = ifelse(CONCEPT_ID == concept_id, count_failed, COUNT_FAILED))
DBI::dbWriteTable(duckdbCon, DBI::Id(schema = "concepts", table = "concept_metadata"), table, overwrite = TRUE)
}
if (format == "json") {
conceptFileName <- paste0("concept_", trimws(conceptAggregates[row, "concept_id"]), ".json")
conceptFile <- file.path(releaseFolder, "concepts", trimws(conceptAggregates[row, "cdm_table_name"]), conceptFileName)

if (file.exists(conceptFile)) {
conceptContent <- readLines(conceptFile)
conceptData <- jsonlite::fromJSON(conceptContent)
conceptData$COUNT_FAILED <- conceptAggregates[row, "count_failed"]
conceptJson <- jsonlite::toJSON(conceptData)
write(conceptJson, conceptFile)
}
}
}
}
} else {
writeLines(paste("missing data quality result file ",dataQualityResultsFile))
writeLines(paste("missing data quality result file ", dataQualityResultsFile))
}

temporalCharacterizationFile <- file.path(releaseFolder,"temporal-characterization.csv")
temporalCharacterizationFile <- file.path(releaseFolder, "temporal-characterization.csv")
if (file.exists(temporalCharacterizationFile)) {
temporalCharacterization <- read.csv(temporalCharacterizationFile,header = T)
temporalCharacterization <- read.csv(temporalCharacterizationFile, header = T)
writeLines(paste0(nrow(temporalCharacterization), " temporal characterization insights found."))
# augment achilles concept files with temporal characterization check results
if (nrow(temporalCharacterization) > 0) {
for (row in 1:nrow(temporalCharacterization)) {
writeLines(paste0(row, "/", nrow(temporalCharacterization), " - inserting temporal characterization details"))
conceptFileName <- paste0("concept_",trimws(temporalCharacterization[row,"CONCEPT_ID"]),".json")
conceptFile <- file.path(releaseFolder,"concepts", trimws(tolower(temporalCharacterization[row,"CDM_TABLE_NAME"])), conceptFileName)
if (file.exists(conceptFile)) {
conceptContent <- readLines(conceptFile)
conceptData <- jsonlite::fromJSON(conceptContent)
conceptData$IS_STATIONARY <- temporalCharacterization[row,"IS_STATIONARY"]
conceptData$SEASONALITY_SCORE <- temporalCharacterization[row,"SEASONALITY_SCORE"]
conceptJson <- jsonlite::toJSON(conceptData)
write(conceptJson, conceptFile)
concept_id <- temporalCharacterization[row, "CONCEPT_ID"]
is_stationary <- temporalCharacterization[row, "IS_STATIONARY"]
seasonality_score <- temporalCharacterization[row, "SEASONALITY_SCORE"]
if (format == "duckdb") {
table <- table %>% mutate(IS_STATIONARY = ifelse(CONCEPT_ID == concept_id, is_stationary, IS_STATIONARY))
table <- table %>% mutate(SEASONALITY_SCORE = ifelse(CONCEPT_ID == concept_id, seasonality_score, SEASONALITY_SCORE))
DBI::dbWriteTable(duckdbCon, DBI::Id(schema = "concepts", table = "concept_metadata"), table, overwrite = TRUE)
}
if (format == "json") {
conceptFileName <- paste0("concept_", trimws(concept_id), ".json")
conceptFile <- file.path(releaseFolder, "concepts", trimws(tolower(temporalCharacterization[row, "CDM_TABLE_NAME"])), conceptFileName)
if (file.exists(conceptFile)) {
conceptContent <- readLines(conceptFile)
conceptData <- jsonlite::fromJSON(conceptContent)
conceptData$IS_STATIONARY <- is_stationary
conceptData$SEASONALITY_SCORE <- seasonality_score
conceptJson <- jsonlite::toJSON(conceptData)
write(conceptJson, conceptFile)
}
}
}
}
} else {
writeLines(paste("missing temporal characterization data ",temporalCharacterizationFile))
writeLines(paste("missing temporal characterization data ", temporalCharacterizationFile))
}
}
80 changes: 80 additions & 0 deletions R/ExportToParquet.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# @file ExportToParquet.R
#
#
# Copyright 2021 Observational Health Data Sciences and Informatics
#
# This file is part of AresIndexer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#' Export to Parquet
#' @name exportToParquet
#'
#' @details Exports tables from DuckDB database generated by the Achilles package
#'
#' @param sourceFolders A vector of folder locations that contain the files
#' exported from Achilles in the ARES Option format (Achilles::exportAO)
#'
#'
#'
#'
#'
#'
#' @importFrom data.table data.table
#'
#' @export

library("DBI")
library("duckdb")
library("dplyr")

exportToParquet <- function(sourceFolders) {
releaseFolders <- list.dirs(sourceFolders, recursive = F)
for (releaseFolder in releaseFolders) {
print(paste0("Processing release folder ", releaseFolder))
databaseLocation <- file.path(releaseFolder, "concepts", "data.duckdb")
duckdbCon <- duckdb::dbConnect(duckdb::duckdb(), dbdir = databaseLocation, read_only = TRUE)
tableNames <- duckdb::dbListTables(duckdbCon, schema = "concepts")
sort_column <- "CONCEPT_ID"

for (tableName in tableNames) {
table <- tbl(duckdbCon, DBI::Id(schema = "concepts", table = tableName))
table <- as.data.frame(table)

if (sort_column %in% colnames(table)) {
sort_col_index <- which(colnames(table) == sort_column)
table[[sort_col_index]] <- as.integer(table[[sort_col_index]])
table <- table[order(table[[sort_col_index]]),]
} else {
warning(paste("Sorting column '", sort_column, "' not found in the table. Skipping sorting.", sep = ""))
}

filename <- paste0(tableName, ".parquet")
parquetPath <- file.path(releaseFolder, "concepts", filename)
arrow::write_parquet(
table,
version = "2.6",
parquetPath,
compression = "gzip",
compression_level = 5,
use_dictionary = TRUE,
write_statistics = TRUE,
chunk_size = 8 * 1024
)

cat("Processed:", tableName, "\n")
}
duckdb::dbDisconnect(duckdbCon, shutdown = TRUE)
}
}

17 changes: 0 additions & 17 deletions man/augmentConceptFiles.Rd

This file was deleted.

0 comments on commit 19104a6

Please sign in to comment.