Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: lambda-callbacks instead of fill columns #482

Merged
merged 1 commit into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions include/silo/common/table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@

namespace silo {

struct ColumnFunction {
class ColumnFunction {
friend class TableReader;
std::string column_name;
std::function<void(size_t, const duckdb::Value&)> function;

public:
ColumnFunction(
std::string column_name,
std::function<void(size_t, const duckdb::Value&)> function
);
};

class TableReader {
Expand All @@ -31,12 +38,6 @@ class TableReader {
size_t current_row;
size_t current_row_in_chunk;

std::optional<std::string> nextKey();

std::string getTableQuery();

void advanceRow();

public:
explicit TableReader(
duckdb::Connection& connection,
Expand All @@ -47,7 +48,14 @@ class TableReader {
std::string_view order_by_clause
);

void read();
size_t read();

private:
std::optional<std::string> nextKey();

std::string getTableQuery();

void advanceRow();

void loadTable();
};
Expand Down
7 changes: 0 additions & 7 deletions include/silo/storage/column_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,6 @@ class ColumnPartitionGroup {
std::map<std::string, storage::column::DateColumnPartition&> date_columns;
std::map<std::string, storage::column::PangoLineageColumnPartition&> pango_lineage_columns;

uint32_t fill(
duckdb::Connection& connection,
uint32_t partition_id,
const std::string& order_by_clause,
const silo::config::DatabaseConfig& database_config
);

void addValueToColumn(
const std::string& column_name,
config::ColumnType column_type,
Expand Down
10 changes: 9 additions & 1 deletion src/silo/common/table_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@

#include "silo/preprocessing/preprocessing_exception.h"

silo::ColumnFunction::ColumnFunction(
std::string column_name,
std::function<void(size_t, const duckdb::Value&)> function
)
: column_name(std::move(column_name)),
function(std::move(function)) {}

silo::TableReader::TableReader(
duckdb::Connection& connection,
std::string_view table_name,
Expand All @@ -34,7 +41,7 @@ std::optional<std::string> silo::TableReader::nextKey() {
return current_chunk->GetValue(0, current_row_in_chunk).GetValue<std::string>();
}

void silo::TableReader::read() {
size_t silo::TableReader::read() {
loadTable();
assert(query_result->ColumnCount() == column_functions.size() + 1);
while (nextKey()) {
Expand All @@ -44,6 +51,7 @@ void silo::TableReader::read() {
}
advanceRow();
}
return current_row;
}

std::string silo::TableReader::getTableQuery() {
Expand Down
32 changes: 24 additions & 8 deletions src/silo/preprocessing/preprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -651,15 +651,31 @@ void Preprocessor::buildMetadataStore(
) {
for (size_t partition_id = 0; partition_id < partition_descriptor.getPartitions().size();
++partition_id) {
const auto& part = partition_descriptor.getPartitions()[partition_id];
for (size_t chunk_index = 0; chunk_index < part.getPartitionChunks().size(); ++chunk_index) {
const uint32_t sequences_added =
database.partitions.at(partition_id)
.columns.fill(
preprocessing_db.getConnection(), partition_id, order_by_clause, database_config
);
database.partitions.at(partition_id).sequence_count += sequences_added;
auto& column_group = database.partitions.at(partition_id).columns;
std::vector<ColumnFunction> column_functions;
column_functions.reserve(database_config.schema.metadata.size());
for (auto& item : database_config.schema.metadata) {
column_functions.emplace_back(
item.name,
[&](size_t /*row_idx*/, const duckdb::Value& value) {
if (value.IsNull()) {
column_group.addNullToColumn(item.name, item.getColumnType());
} else {
column_group.addValueToColumn(item.name, item.getColumnType(), value);
}
}
);
}
TableReader table_reader(
preprocessing_db.getConnection(),
"partitioned_metadata",
database_config.schema.primary_key,
column_functions,
fmt::format("partition_id = {}", partition_id),
order_by_clause
);
const size_t number_of_rows = table_reader.read();
database.partitions.at(partition_id).sequence_count += number_of_rows;
SPDLOG_INFO("build - finished columns for partition {}", partition_id);
}
}
Expand Down
51 changes: 0 additions & 51 deletions src/silo/storage/column_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,57 +26,6 @@ using silo::config::ColumnType;

using silo::common::OptionalBool;

uint32_t ColumnPartitionGroup::fill(
duckdb::Connection& connection,
uint32_t partition_id,
const std::string& order_by_clause,
const silo::config::DatabaseConfig& database_config
) {
uint32_t sequence_count = 0;

std::vector<std::string> column_names;
column_names.reserve(database_config.schema.metadata.size());
for (const auto& item : database_config.schema.metadata) {
column_names.push_back("\"" + item.name + "\"");
}
std::string column_name_sql = boost::algorithm::join(column_names, ", ");

auto result = connection.Query(fmt::format(
"SELECT {} FROM partitioned_metadata WHERE partition_id = {} {}",
column_name_sql,
partition_id,
order_by_clause
));
if (result->HasError()) {
throw preprocessing::PreprocessingException(
"Error in the execution of the duckdb statement for partition key table "
"generation: " +
result->GetError()
);
}
const size_t row_count = result->RowCount();
for (const auto& item : database_config.schema.metadata) {
const auto column_type = item.getColumnType();
reserveSpaceInColumn(item.name, column_type, row_count);
}

for (auto it = result->begin(); it != result->end(); ++it) {
size_t column_index = 0;
for (const auto& item : database_config.schema.metadata) {
const auto column_type = item.getColumnType();
const auto value = it.current_row.GetValue<duckdb::Value>(column_index++);
addValueToColumn(item.name, column_type, value);
}
if (++sequence_count == UINT32_MAX) {
throw std::runtime_error(
"SILO is currently limited to UINT32_MAX=" + std::to_string(UINT32_MAX) + " sequences."
);
}
}

return sequence_count;
}

void ColumnPartitionGroup::addValueToColumn(
const std::string& column_name,
ColumnType column_type,
Expand Down