Skip to content

Commit

Permalink
refactor: lambda-callbacks instead of fill columns
Browse files Browse the repository at this point in the history
instead of the columns pulling the next row from the file_reader, the file_reader
 instead pushes the value to the columns using callbacks.

This is in preparation for parallelizing the building of column functions
  • Loading branch information
Taepper committed Jun 13, 2024
1 parent 6dbe251 commit b0f4d6b
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 75 deletions.
24 changes: 16 additions & 8 deletions include/silo/common/table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@

namespace silo {

struct ColumnFunction {
class ColumnFunction {
friend class TableReader;
std::string column_name;
std::function<void(size_t, const duckdb::Value&)> function;

public:
ColumnFunction(
std::string column_name,
std::function<void(size_t, const duckdb::Value&)> function
);
};

class TableReader {
Expand All @@ -31,12 +38,6 @@ class TableReader {
size_t current_row;
size_t current_row_in_chunk;

std::optional<std::string> nextKey();

std::string getTableQuery();

void advanceRow();

public:
explicit TableReader(
duckdb::Connection& connection,
Expand All @@ -47,7 +48,14 @@ class TableReader {
std::string_view order_by_clause
);

void read();
size_t read();

private:
std::optional<std::string> nextKey();

std::string getTableQuery();

void advanceRow();

void loadTable();
};
Expand Down
7 changes: 0 additions & 7 deletions include/silo/storage/column_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,6 @@ class ColumnPartitionGroup {
std::map<std::string, storage::column::DateColumnPartition&> date_columns;
std::map<std::string, storage::column::PangoLineageColumnPartition&> pango_lineage_columns;

uint32_t fill(
duckdb::Connection& connection,
uint32_t partition_id,
const std::string& order_by_clause,
const silo::config::DatabaseConfig& database_config
);

void addValueToColumn(
const std::string& column_name,
config::ColumnType column_type,
Expand Down
10 changes: 9 additions & 1 deletion src/silo/common/table_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@

#include "silo/preprocessing/preprocessing_exception.h"

silo::ColumnFunction::ColumnFunction(
std::string column_name,
std::function<void(size_t, const duckdb::Value&)> function
)
: column_name(std::move(column_name)),
function(std::move(function)) {}

silo::TableReader::TableReader(
duckdb::Connection& connection,
std::string_view table_name,
Expand All @@ -34,7 +41,7 @@ std::optional<std::string> silo::TableReader::nextKey() {
return current_chunk->GetValue(0, current_row_in_chunk).GetValue<std::string>();
}

void silo::TableReader::read() {
size_t silo::TableReader::read() {
loadTable();
assert(query_result->ColumnCount() == column_functions.size() + 1);
while (nextKey()) {
Expand All @@ -44,6 +51,7 @@ void silo::TableReader::read() {
}
advanceRow();
}
return current_row;
}

std::string silo::TableReader::getTableQuery() {
Expand Down
31 changes: 23 additions & 8 deletions src/silo/preprocessing/preprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -651,15 +651,30 @@ void Preprocessor::buildMetadataStore(
) {
for (size_t partition_id = 0; partition_id < partition_descriptor.getPartitions().size();
++partition_id) {
const auto& part = partition_descriptor.getPartitions()[partition_id];
for (size_t chunk_index = 0; chunk_index < part.getPartitionChunks().size(); ++chunk_index) {
const uint32_t sequences_added =
database.partitions.at(partition_id)
.columns.fill(
preprocessing_db.getConnection(), partition_id, order_by_clause, database_config
);
database.partitions.at(partition_id).sequence_count += sequences_added;
auto& column_group = database.partitions.at(partition_id).columns;
std::vector<ColumnFunction> column_functions;
for (auto& item : database_config.schema.metadata) {
column_functions.emplace_back(
item.name,
[&](size_t row_idx, const duckdb::Value& value) {
if (value.IsNull()) {
column_group.addNullToColumn(item.name, item.getColumnType());
} else {
column_group.addValueToColumn(item.name, item.getColumnType(), value);
}
}
);
}
TableReader table_reader(
preprocessing_db.getConnection(),
"partitioned_metadata",
database_config.schema.primary_key,
column_functions,
fmt::format("partition_id = {}", partition_id),
order_by_clause
);
size_t number_of_rows = table_reader.read();
database.partitions.at(partition_id).sequence_count += number_of_rows;
SPDLOG_INFO("build - finished columns for partition {}", partition_id);
}
}
Expand Down
51 changes: 0 additions & 51 deletions src/silo/storage/column_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,57 +26,6 @@ using silo::config::ColumnType;

using silo::common::OptionalBool;

uint32_t ColumnPartitionGroup::fill(
duckdb::Connection& connection,
uint32_t partition_id,
const std::string& order_by_clause,
const silo::config::DatabaseConfig& database_config
) {
uint32_t sequence_count = 0;

std::vector<std::string> column_names;
column_names.reserve(database_config.schema.metadata.size());
for (const auto& item : database_config.schema.metadata) {
column_names.push_back("\"" + item.name + "\"");
}
std::string column_name_sql = boost::algorithm::join(column_names, ", ");

auto result = connection.Query(fmt::format(
"SELECT {} FROM partitioned_metadata WHERE partition_id = {} {}",
column_name_sql,
partition_id,
order_by_clause
));
if (result->HasError()) {
throw preprocessing::PreprocessingException(
"Error in the execution of the duckdb statement for partition key table "
"generation: " +
result->GetError()
);
}
const size_t row_count = result->RowCount();
for (const auto& item : database_config.schema.metadata) {
const auto column_type = item.getColumnType();
reserveSpaceInColumn(item.name, column_type, row_count);
}

for (auto it = result->begin(); it != result->end(); ++it) {
size_t column_index = 0;
for (const auto& item : database_config.schema.metadata) {
const auto column_type = item.getColumnType();
const auto value = it.current_row.GetValue<duckdb::Value>(column_index++);
addValueToColumn(item.name, column_type, value);
}
if (++sequence_count == UINT32_MAX) {
throw std::runtime_error(
"SILO is currently limited to UINT32_MAX=" + std::to_string(UINT32_MAX) + " sequences."
);
}
}

return sequence_count;
}

void ColumnPartitionGroup::addValueToColumn(
const std::string& column_name,
ColumnType column_type,
Expand Down

0 comments on commit b0f4d6b

Please sign in to comment.