feat: reading of column metadata from duckdb in parallel
David Gichev authored and Taepper committed Jun 17, 2024
1 parent 3a9e43f commit dda828f
Showing 4 changed files with 45 additions and 46 deletions.
6 changes: 2 additions & 4 deletions .run/silo --api.run.xml
@@ -1,10 +1,8 @@
 <component name="ProjectRunConfigurationManager">
-  <configuration default="false" name="silo --api" type="CMakeRunConfiguration" factoryName="Application" PROGRAM_PARAMS="--api --dataDirectory output/ --estimatedStartupTimeInMinutes 2" REDIRECT_INPUT="false" ELEVATE="false" USE_EXTERNAL_CONSOLE="false" EMULATE_TERMINAL="false" WORKING_DIR="file://.." PASS_PARENT_ENVS_2="true" PROJECT_NAME="SILO" TARGET_NAME="siloApi" CONFIG_NAME="Debug" RUN_TARGET_PROJECT_NAME="SILO" RUN_TARGET_NAME="siloApi">
+  <configuration default="false" name="silo --api" type="CMakeRunConfiguration" factoryName="Application" PROGRAM_PARAMS="--api --dataDirectory output/ --estimatedStartupTimeInMinutes 2" REDIRECT_INPUT="false" ELEVATE="false" USE_EXTERNAL_CONSOLE="false" EMULATE_TERMINAL="false" WORKING_DIR="file://$PROJECT_DIR$/testBaseData/exampleDataset" PASS_PARENT_ENVS_2="true" PROJECT_NAME="SILO" TARGET_NAME="siloApi" CONFIG_NAME="Debug" RUN_TARGET_PROJECT_NAME="SILO" RUN_TARGET_NAME="siloApi">
     <envs>
       <env name="SPDLOG_LEVEL" value="debug" />
     </envs>
-    <method v="2">
-      <option name="com.jetbrains.cidr.execution.CidrBuildBeforeRunTaskProvider$BuildBeforeRunTask" enabled="true" />
-    </method>
+    <method v="2" />
   </configuration>
 </component>
8 changes: 2 additions & 6 deletions include/silo/common/table_reader.h
@@ -35,8 +35,8 @@ class TableReader {
    std::string order_by_clause;
    std::unique_ptr<duckdb::MaterializedQueryResult> query_result;
    std::unique_ptr<duckdb::DataChunk> current_chunk;
-   size_t current_row;
-   size_t current_row_in_chunk;
+   size_t current_start_of_chunk = 0;
+   size_t current_chunk_size = 0;

   public:
    explicit TableReader(
@@ -51,12 +51,8 @@ class TableReader {
    size_t read();

   private:
-   std::optional<std::string> nextKey();
-
    std::string getTableQuery();

-   void advanceRow();
-
    void loadTable();
 };
 }  // namespace silo
63 changes: 29 additions & 34 deletions src/silo/common/table_reader.cpp
@@ -6,6 +6,7 @@
 #include <vector>

 #include <fmt/format.h>
+#include <oneapi/tbb/parallel_for.h>
 #include <spdlog/spdlog.h>
 #include <duckdb.hpp>

@@ -33,25 +34,38 @@ silo::TableReader::TableReader(
    where_clause(where_clause),
    order_by_clause(order_by_clause) {}

-std::optional<std::string> silo::TableReader::nextKey() {
-   if (!current_chunk) {
-      return std::nullopt;
-   }
-
-   return current_chunk->GetValue(0, current_row_in_chunk).GetValue<std::string>();
-}
-
 size_t silo::TableReader::read() {
    loadTable();
    assert(query_result->ColumnCount() == column_functions.size() + 1);
-   while (nextKey()) {
-      for (size_t column_idx = 0; column_idx < column_functions.size(); column_idx++) {
-         column_functions.at(column_idx)
-            .function(current_row, current_chunk->GetValue(column_idx + 1, current_row_in_chunk));
+   while (true) {
+      current_chunk = query_result->Fetch();
+      if (!current_chunk) {
+         break;
       }
-      advanceRow();
-   }
-   return current_row;
+      if (current_chunk->size() == 0) {
+         continue;
+      }
+      current_chunk_size = current_chunk->size();
+
+      tbb::parallel_for(
+         tbb::blocked_range<uint32_t>(0, column_functions.size()),
+         [&](const auto& local) {
+            for (size_t column_idx = local.begin(); column_idx != local.end(); column_idx++) {
+               auto& column = current_chunk->data[column_idx + 1];
+               for (size_t row_in_chunk = 0; row_in_chunk < current_chunk_size; row_in_chunk++) {
+                  column_functions.at(column_idx)
+                     .function(
+                        current_start_of_chunk + row_in_chunk,
+                        column.GetValue(current_start_of_chunk + row_in_chunk)
+                     );
+               }
+            }
+         }
+      );
+      current_start_of_chunk += current_chunk_size;
+   };
+
+   return current_start_of_chunk;
 }

 std::string silo::TableReader::getTableQuery() {
@@ -87,23 +101,4 @@ void silo::TableReader::loadTable() {
          "Error when executing SQL " + query_result->GetError()
       );
    }
-   current_chunk = query_result->Fetch();
-   current_row = 0;
-   current_row_in_chunk = 0;
-
-   while (current_chunk && current_chunk->size() == 0) {
-      current_chunk = query_result->Fetch();
-   }
 }
-
-void silo::TableReader::advanceRow() {
-   current_row++;
-   current_row_in_chunk++;
-   if (current_row_in_chunk == current_chunk->size()) {
-      current_row_in_chunk = 0;
-      current_chunk = query_result->Fetch();
-      while (current_chunk && current_chunk->size() == 0) {
-         current_chunk = query_result->Fetch();
-      }
-   }
-}
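
The rewritten read() above no longer walks rows one at a time through nextKey()/advanceRow(); it fetches one DuckDB DataChunk at a time and hands the columns of each chunk to oneTBB, so the per-column work runs in parallel while the chunks themselves are still consumed in order. Below is a minimal, self-contained sketch of that pattern, not SILO's actual code: the in-memory table, its column names, and the plain string buffers are invented for illustration and stand in for SILO's per-column callback functions.

// Sketch only: parallel per-column materialization of DuckDB result chunks,
// in the spirit of the TableReader change above. Table and columns are made up.
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

#include <oneapi/tbb/blocked_range.h>
#include <oneapi/tbb/parallel_for.h>
#include <duckdb.hpp>

int main() {
   duckdb::DuckDB db(nullptr);  // in-memory database
   duckdb::Connection connection(db);
   connection.Query("CREATE TABLE metadata (key VARCHAR, country VARCHAR, age INTEGER)");
   connection.Query("INSERT INTO metadata VALUES ('k1', 'CH', 41), ('k2', 'DE', 7)");

   auto result = connection.Query("SELECT key, country, age FROM metadata");
   if (result->HasError()) {
      std::cerr << result->GetError() << '\n';
      return 1;
   }

   // One output buffer per non-key column; every buffer is written by exactly
   // one TBB task per chunk, so the tasks never share mutable state.
   const size_t column_count = result->ColumnCount() - 1;
   std::vector<std::vector<std::string>> columns_as_text(column_count);

   size_t start_of_chunk = 0;
   while (auto chunk = result->Fetch()) {  // chunks are consumed sequentially
      const size_t chunk_size = chunk->size();
      if (chunk_size == 0) {
         continue;
      }
      tbb::parallel_for(
         tbb::blocked_range<size_t>(0, column_count),
         [&](const tbb::blocked_range<size_t>& range) {
            for (size_t column_idx = range.begin(); column_idx != range.end(); ++column_idx) {
               auto& column = chunk->data[column_idx + 1];  // +1 skips the key column
               auto& target = columns_as_text[column_idx];
               target.resize(start_of_chunk + chunk_size);
               for (size_t row = 0; row < chunk_size; ++row) {
                  // Vector::GetValue takes the row offset within the current chunk.
                  target[start_of_chunk + row] = column.GetValue(row).ToString();
               }
            }
         }
      );
      start_of_chunk += chunk_size;
   }
   std::cout << "read " << start_of_chunk << " rows\n";
   return 0;
}

Because the work is split across columns rather than rows, each output buffer is written by exactly one task per chunk and no locking is needed; the running start_of_chunk offset preserves the global row numbering that the commit tracks in current_start_of_chunk.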
14 changes: 12 additions & 2 deletions src/silo/preprocessing/preprocessor.cpp
@@ -674,8 +674,18 @@ void Preprocessor::buildMetadataStore(
          fmt::format("partition_id = {}", partition_id),
          order_by_clause
       );
-      const size_t number_of_rows = table_reader.read();
-      database.partitions.at(partition_id).sequence_count += number_of_rows;
+
+      int64_t fill_time;
+      {
+         const silo::common::BlockTimer timer(fill_time);
+         const size_t number_of_rows = table_reader.read();
+         database.partitions.at(partition_id).sequence_count += number_of_rows;
+      }
+      SPDLOG_DEBUG(
+         "build - finished fill columns for partition {} in {} microseconds",
+         partition_id,
+         fill_time
+      );
       SPDLOG_INFO("build - finished columns for partition {}", partition_id);
    }
 }
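
The timing added here relies on silo::common::BlockTimer, whose definition is not part of this diff. Judging from the usage alone, it appears to measure the lifetime of the enclosing scope and write the elapsed microseconds into the int64_t it is handed, so the value can be logged by SPDLOG_DEBUG once the block ends. A hypothetical stand-alone equivalent (the class name and interface are assumptions, not SILO's API) might look like this:

// Hypothetical stand-in for silo::common::BlockTimer, inferred from its usage
// above: on destruction it stores the elapsed microseconds of the enclosing
// scope into the referenced variable, which can then be logged.
#include <chrono>
#include <cstdint>
#include <iostream>

class ScopedMicrosecondTimer {
  public:
   explicit ScopedMicrosecondTimer(int64_t& result)
       : result_(result),
         start_(std::chrono::steady_clock::now()) {}

   ~ScopedMicrosecondTimer() {
      const auto end = std::chrono::steady_clock::now();
      result_ = std::chrono::duration_cast<std::chrono::microseconds>(end - start_).count();
   }

  private:
   int64_t& result_;
   std::chrono::steady_clock::time_point start_;
};

int main() {
   int64_t fill_time = 0;
   {
      const ScopedMicrosecondTimer timer(fill_time);
      // ... work to be measured, e.g. table_reader.read() ...
   }
   std::cout << "finished in " << fill_time << " microseconds\n";
   return 0;
}

Putting the timer and table_reader.read() in their own block means the measurement stops exactly when the reader finishes, before fill_time is read by the log statement.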