diff --git a/include/silo/common/table_reader.h b/include/silo/common/table_reader.h index 8b6d525a4..12763b78a 100644 --- a/include/silo/common/table_reader.h +++ b/include/silo/common/table_reader.h @@ -13,9 +13,16 @@ namespace silo { -struct ColumnFunction { +class ColumnFunction { + friend class TableReader; std::string column_name; std::function function; + + public: + ColumnFunction( + std::string column_name, + std::function function + ); }; class TableReader { @@ -31,12 +38,6 @@ class TableReader { size_t current_row; size_t current_row_in_chunk; - std::optional nextKey(); - - std::string getTableQuery(); - - void advanceRow(); - public: explicit TableReader( duckdb::Connection& connection, @@ -47,7 +48,14 @@ class TableReader { std::string_view order_by_clause ); - void read(); + size_t read(); + + private: + std::optional nextKey(); + + std::string getTableQuery(); + + void advanceRow(); void loadTable(); }; diff --git a/include/silo/storage/column_group.h b/include/silo/storage/column_group.h index 41e547152..dc3c17640 100644 --- a/include/silo/storage/column_group.h +++ b/include/silo/storage/column_group.h @@ -81,13 +81,6 @@ class ColumnPartitionGroup { std::map date_columns; std::map pango_lineage_columns; - uint32_t fill( - duckdb::Connection& connection, - uint32_t partition_id, - const std::string& order_by_clause, - const silo::config::DatabaseConfig& database_config - ); - void addValueToColumn( const std::string& column_name, config::ColumnType column_type, diff --git a/src/silo/common/table_reader.cpp b/src/silo/common/table_reader.cpp index 9e73c88eb..9c4848947 100644 --- a/src/silo/common/table_reader.cpp +++ b/src/silo/common/table_reader.cpp @@ -11,6 +11,13 @@ #include "silo/preprocessing/preprocessing_exception.h" +silo::ColumnFunction::ColumnFunction( + std::string column_name, + std::function function +) + : column_name(std::move(column_name)), + function(std::move(function)) {} + silo::TableReader::TableReader( duckdb::Connection& connection, std::string_view table_name, @@ -34,7 +41,7 @@ std::optional silo::TableReader::nextKey() { return current_chunk->GetValue(0, current_row_in_chunk).GetValue(); } -void silo::TableReader::read() { +size_t silo::TableReader::read() { loadTable(); assert(query_result->ColumnCount() == column_functions.size() + 1); while (nextKey()) { @@ -44,6 +51,7 @@ void silo::TableReader::read() { } advanceRow(); } + return current_row; } std::string silo::TableReader::getTableQuery() { diff --git a/src/silo/preprocessing/preprocessor.cpp b/src/silo/preprocessing/preprocessor.cpp index 953286ce1..99a5ed40f 100644 --- a/src/silo/preprocessing/preprocessor.cpp +++ b/src/silo/preprocessing/preprocessor.cpp @@ -651,15 +651,31 @@ void Preprocessor::buildMetadataStore( ) { for (size_t partition_id = 0; partition_id < partition_descriptor.getPartitions().size(); ++partition_id) { - const auto& part = partition_descriptor.getPartitions()[partition_id]; - for (size_t chunk_index = 0; chunk_index < part.getPartitionChunks().size(); ++chunk_index) { - const uint32_t sequences_added = - database.partitions.at(partition_id) - .columns.fill( - preprocessing_db.getConnection(), partition_id, order_by_clause, database_config - ); - database.partitions.at(partition_id).sequence_count += sequences_added; + auto& column_group = database.partitions.at(partition_id).columns; + std::vector column_functions; + column_functions.reserve(database_config.schema.metadata.size()); + for (auto& item : database_config.schema.metadata) { + column_functions.emplace_back( + item.name, + [&](size_t /*row_idx*/, const duckdb::Value& value) { + if (value.IsNull()) { + column_group.addNullToColumn(item.name, item.getColumnType()); + } else { + column_group.addValueToColumn(item.name, item.getColumnType(), value); + } + } + ); } + TableReader table_reader( + preprocessing_db.getConnection(), + "partitioned_metadata", + database_config.schema.primary_key, + column_functions, + fmt::format("partition_id = {}", partition_id), + order_by_clause + ); + const size_t number_of_rows = table_reader.read(); + database.partitions.at(partition_id).sequence_count += number_of_rows; SPDLOG_INFO("build - finished columns for partition {}", partition_id); } } diff --git a/src/silo/storage/column_group.cpp b/src/silo/storage/column_group.cpp index d377db5a3..9bb08e424 100644 --- a/src/silo/storage/column_group.cpp +++ b/src/silo/storage/column_group.cpp @@ -26,57 +26,6 @@ using silo::config::ColumnType; using silo::common::OptionalBool; -uint32_t ColumnPartitionGroup::fill( - duckdb::Connection& connection, - uint32_t partition_id, - const std::string& order_by_clause, - const silo::config::DatabaseConfig& database_config -) { - uint32_t sequence_count = 0; - - std::vector column_names; - column_names.reserve(database_config.schema.metadata.size()); - for (const auto& item : database_config.schema.metadata) { - column_names.push_back("\"" + item.name + "\""); - } - std::string column_name_sql = boost::algorithm::join(column_names, ", "); - - auto result = connection.Query(fmt::format( - "SELECT {} FROM partitioned_metadata WHERE partition_id = {} {}", - column_name_sql, - partition_id, - order_by_clause - )); - if (result->HasError()) { - throw preprocessing::PreprocessingException( - "Error in the execution of the duckdb statement for partition key table " - "generation: " + - result->GetError() - ); - } - const size_t row_count = result->RowCount(); - for (const auto& item : database_config.schema.metadata) { - const auto column_type = item.getColumnType(); - reserveSpaceInColumn(item.name, column_type, row_count); - } - - for (auto it = result->begin(); it != result->end(); ++it) { - size_t column_index = 0; - for (const auto& item : database_config.schema.metadata) { - const auto column_type = item.getColumnType(); - const auto value = it.current_row.GetValue(column_index++); - addValueToColumn(item.name, column_type, value); - } - if (++sequence_count == UINT32_MAX) { - throw std::runtime_error( - "SILO is currently limited to UINT32_MAX=" + std::to_string(UINT32_MAX) + " sequences." - ); - } - } - - return sequence_count; -} - void ColumnPartitionGroup::addValueToColumn( const std::string& column_name, ColumnType column_type,