From f41ccdfd905aed7c84596f787386e8801bda50ac Mon Sep 17 00:00:00 2001 From: Mikhail-iontsev Date: Thu, 16 Nov 2023 08:24:49 +0400 Subject: [PATCH] feat: added support for duckdb --- NAMESPACE | 1 + R/AugmentConceptFiles.R | 85 ++++++++++++++++++++--------- R/ExportToParquet.R | 80 +++++++++++++++++++++++++++ man/augmentConceptFiles.Rd | 17 ------ man/buildNetworkPerformanceIndex.Rd | 3 +- 5 files changed, 141 insertions(+), 45 deletions(-) create mode 100644 R/ExportToParquet.R delete mode 100644 man/augmentConceptFiles.Rd diff --git a/NAMESPACE b/NAMESPACE index 41b9a69..b48645c 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -9,6 +9,7 @@ export(buildNetworkIndex) export(buildNetworkPerformanceIndex) export(buildNetworkUnmappedSourceCodeIndex) export(computeCharacterizationDifference) +export(exportToParquet) export(getSourceReleaseKey) import(data.table) import(dplyr) diff --git a/R/AugmentConceptFiles.R b/R/AugmentConceptFiles.R index f28b8e3..6acbda0 100644 --- a/R/AugmentConceptFiles.R +++ b/R/AugmentConceptFiles.R @@ -17,10 +17,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-#' Augment Concept Files +#' @name Augment Concept Files #' -#' @details -#' Adds Data Quality and Temporal details to concept data files +#' @details Adds Data Quality and Temporal details to concept data files #' #' @param releaseFolder Folder containing a specific release of a data source #' @@ -30,8 +29,16 @@ #' #' @export -augmentConceptFiles <- function(releaseFolder) { +augmentConceptFiles <- function(releaseFolder, format) { + library("DBI") dataQualityResultsFile <- file.path(releaseFolder, "dq-result.json") + duckdbCon <- NULL + if (format == "duckdb") { + duckdbCon <- DBI::dbConnect(duckdb::duckdb(), dbdir = file.path(releaseFolder, "concepts", 'data.duckdb'), read_only = FALSE) + table <- tbl(duckdbCon, DBI::Id(schema = "concepts", table = "concept_metadata")) + table <- as.data.frame(table) + table <- table %>% mutate(COUNT_FAILED = NA, IS_STATIONARY = NA, SEASONALITY_SCORE = NA) + } if (file.exists(dataQualityResultsFile)) { writeLines("updating concept files with data quality results") @@ -39,48 +46,72 @@ augmentConceptFiles <- function(releaseFolder) { results <- dataQualityResults$CheckResults # augment achilles concept files with data quality failure count for relevant concept checks - conceptAggregates <- results %>% filter(!is.na(results$conceptId) & results$failed==1) %>% count(conceptId,tolower(cdmTableName)) - names(conceptAggregates) <- c("concept_id","cdm_table_name", "count_failed") + conceptAggregates <- results %>% + filter(!is.na(results$conceptId) & results$failed == 1) %>% + count(conceptId, tolower(cdmTableName)) + names(conceptAggregates) <- c("concept_id", "cdm_table_name", "count_failed") writeLines(paste0(nrow(conceptAggregates), " concept level data quality issues found.")) if (nrow(conceptAggregates) > 0) { for (row in 1:nrow(conceptAggregates)) { + print(trimws(conceptAggregates[row, "concept_id"])) + concept_id <- trimws(conceptAggregates[row, "concept_id"]) + count_failed <- conceptAggregates[row, "count_failed"] + 
writeLines(paste0(row, "/", nrow(conceptAggregates), " - inserting data quality results")) - conceptFileName <- paste0("concept_", trimws(conceptAggregates[row, "concept_id"]), ".json") - conceptFile <- file.path(releaseFolder, "concepts", trimws(conceptAggregates[row, "cdm_table_name"]), conceptFileName) - if (file.exists(conceptFile)) { - conceptContent <- readLines(conceptFile) - conceptData <- jsonlite::fromJSON(conceptContent) - conceptData$COUNT_FAILED <- conceptAggregates[row, "count_failed"] - conceptJson <- jsonlite::toJSON(conceptData) - write(conceptJson, conceptFile) + + if (format == "duckdb") { + table <- table %>% mutate(COUNT_FAILED = ifelse(CONCEPT_ID == concept_id, count_failed, COUNT_FAILED)) + DBI::dbWriteTable(duckdbCon, DBI::Id(schema = "concepts", table = "concept_metadata"), table, overwrite = TRUE) + } + if (format == "json") { + conceptFileName <- paste0("concept_", trimws(conceptAggregates[row, "concept_id"]), ".json") + conceptFile <- file.path(releaseFolder, "concepts", trimws(conceptAggregates[row, "cdm_table_name"]), conceptFileName) + + if (file.exists(conceptFile)) { + conceptContent <- readLines(conceptFile) + conceptData <- jsonlite::fromJSON(conceptContent) + conceptData$COUNT_FAILED <- conceptAggregates[row, "count_failed"] + conceptJson <- jsonlite::toJSON(conceptData) + write(conceptJson, conceptFile) + } } } } } else { - writeLines(paste("missing data quality result file ",dataQualityResultsFile)) + writeLines(paste("missing data quality result file ", dataQualityResultsFile)) } - temporalCharacterizationFile <- file.path(releaseFolder,"temporal-characterization.csv") + temporalCharacterizationFile <- file.path(releaseFolder, "temporal-characterization.csv") if (file.exists(temporalCharacterizationFile)) { - temporalCharacterization <- read.csv(temporalCharacterizationFile,header = T) + temporalCharacterization <- read.csv(temporalCharacterizationFile, header = T) writeLines(paste0(nrow(temporalCharacterization), " temporal 
characterization insights found.")) # augment achilles concept files with temporal characterization check results if (nrow(temporalCharacterization) > 0) { for (row in 1:nrow(temporalCharacterization)) { writeLines(paste0(row, "/", nrow(temporalCharacterization), " - inserting temporal characterization details")) - conceptFileName <- paste0("concept_",trimws(temporalCharacterization[row,"CONCEPT_ID"]),".json") - conceptFile <- file.path(releaseFolder,"concepts", trimws(tolower(temporalCharacterization[row,"CDM_TABLE_NAME"])), conceptFileName) - if (file.exists(conceptFile)) { - conceptContent <- readLines(conceptFile) - conceptData <- jsonlite::fromJSON(conceptContent) - conceptData$IS_STATIONARY <- temporalCharacterization[row,"IS_STATIONARY"] - conceptData$SEASONALITY_SCORE <- temporalCharacterization[row,"SEASONALITY_SCORE"] - conceptJson <- jsonlite::toJSON(conceptData) - write(conceptJson, conceptFile) + concept_id <- temporalCharacterization[row, "CONCEPT_ID"] + is_stationary <- temporalCharacterization[row, "IS_STATIONARY"] + seasonality_score <- temporalCharacterization[row, "SEASONALITY_SCORE"] + if (format == "duckdb") { + table <- table %>% mutate(IS_STATIONARY = ifelse(CONCEPT_ID == concept_id, is_stationary, IS_STATIONARY)) + table <- table %>% mutate(SEASONALITY_SCORE = ifelse(CONCEPT_ID == concept_id, seasonality_score, SEASONALITY_SCORE)) + DBI::dbWriteTable(duckdbCon, DBI::Id(schema = "concepts", table = "concept_metadata"), table, overwrite = TRUE) + } + if (format == "json") { + conceptFileName <- paste0("concept_", trimws(concept_id), ".json") + conceptFile <- file.path(releaseFolder, "concepts", trimws(tolower(temporalCharacterization[row, "CDM_TABLE_NAME"])), conceptFileName) + if (file.exists(conceptFile)) { + conceptContent <- readLines(conceptFile) + conceptData <- jsonlite::fromJSON(conceptContent) + conceptData$IS_STATIONARY <- is_stationary + conceptData$SEASONALITY_SCORE <- seasonality_score + conceptJson <- 
jsonlite::toJSON(conceptData) + write(conceptJson, conceptFile) + } } } } } else { - writeLines(paste("missing temporal characterization data ",temporalCharacterizationFile)) + writeLines(paste("missing temporal characterization data ", temporalCharacterizationFile)) } } diff --git a/R/ExportToParquet.R b/R/ExportToParquet.R new file mode 100644 index 0000000..3f741f1 --- /dev/null +++ b/R/ExportToParquet.R @@ -0,0 +1,80 @@ +# @file ExportToParquet.R +# +# +# Copyright 2021 Observational Health Data Sciences and Informatics +# +# This file is part of AresIndexer +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+library("DBI")
+library("duckdb")
+library("dplyr")
+
+#' Export to Parquet
+#' @name exportToParquet
+#'
+#' @details Exports tables from DuckDB database generated by the Achilles package.
+#' Release folders are the immediate sub-directories of sourceFolders. For each
+#' one, every table listed for concepts/data.duckdb is read from the concepts
+#' schema and written to concepts/<table>.parquet in that release folder. Tables
+#' that have a CONCEPT_ID column are sorted by it (as integer) before writing.
+#'
+#' @param sourceFolders A vector of folder locations that contain the files
+#' exported from Achilles in the ARES Option format (Achilles::exportAO)
+#'
+#' @return NULL, invisibly. Called for the parquet files it writes.
+#'
+#' @importFrom data.table data.table
+#'
+#' @export
+exportToParquet <- function(sourceFolders) {
+  releaseFolders <- list.dirs(sourceFolders, recursive = FALSE)
+  sort_column <- "CONCEPT_ID"
+  for (releaseFolder in releaseFolders) {
+    print(paste0("Processing release folder ", releaseFolder))
+    databaseLocation <- file.path(releaseFolder, "concepts", "data.duckdb")
+    duckdbCon <- duckdb::dbConnect(duckdb::duckdb(), dbdir = databaseLocation, read_only = TRUE)
+    # finally= closes the connection even when reading or writing a table fails
+    tryCatch({
+      tableNames <- duckdb::dbListTables(duckdbCon, schema = "concepts")
+      for (tableName in tableNames) {
+        table <- tbl(duckdbCon, DBI::Id(schema = "concepts", table = tableName))
+        table <- as.data.frame(table)
+
+        if (sort_column %in% colnames(table)) {
+          # drop = FALSE keeps a one-column table a data.frame after reordering
+          table[[sort_column]] <- as.integer(table[[sort_column]])
+          table <- table[order(table[[sort_column]]), , drop = FALSE]
+        } else {
+          warning(paste0("Sorting column '", sort_column, "' not found in the table. Skipping sorting."))
+        }
+
+        parquetPath <- file.path(releaseFolder, "concepts", paste0(tableName, ".parquet"))
+        arrow::write_parquet(
+          table,
+          parquetPath,
+          version = "2.6",
+          compression = "gzip",
+          compression_level = 5,
+          use_dictionary = TRUE,
+          write_statistics = TRUE,
+          chunk_size = 8 * 1024
+        )
+
+        cat("Processed:", tableName, "\n")
+      }
+    }, finally = duckdb::dbDisconnect(duckdbCon, shutdown = TRUE))
+  }
+}
+
diff --git a/man/augmentConceptFiles.Rd b/man/augmentConceptFiles.Rd
deleted file mode 100644
index 7b67d45..0000000
--- a/man/augmentConceptFiles.Rd
+++ /dev/null
@@ -1,17 +0,0 @@
-% Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/AugmentConceptFiles.R
-\name{augmentConceptFiles}
-\alias{augmentConceptFiles}
-\title{Augment Concept Files}
-\usage{
-augmentConceptFiles(releaseFolder)
-}
-\arguments{
-\item{releaseFolder}{Folder containing a specific release of a data source}
-}
-\description{
-Augment Concept Files
-}
-\details{
-Adds Data Quality and Temporal details to concept data files
-}
diff --git a/man/buildNetworkPerformanceIndex.Rd b/man/buildNetworkPerformanceIndex.Rd
index 622196b..36b0843 100644
--- a/man/buildNetworkPerformanceIndex.Rd
+++ b/man/buildNetworkPerformanceIndex.Rd
@@ -13,5 +13,6 @@ Network performance results object.
 Build Network Performance Index
 }
 \details{
-Builds an index with network performance results for Achilles and DQD execution across all source folders.
+Builds an index with network performance results for Achilles
+and DQD execution across all source folders.
 }