Skip to content

Commit

Permalink
[improvement](orc-reader) Implements ORC lazy materialization (apache…
Browse files Browse the repository at this point in the history
…#18615)

- Implements ORC lazy materialization, integrate with the implementation of apache/doris-thirdparty#56 and apache/doris-thirdparty#62.
- Refactor code: Move `execute_conjuncts()` and `execute_conjuncts_and_filter_block()` from `parquet_group_reader` to `VExprContext`, so that they can be used by both the parquet reader and the orc reader.
- Add session variables `enable_parquet_lazy_materialization` and `enable_orc_lazy_materialization` to control whether lazy materialization is enabled.
- Modify `build.sh` to update apache-orc submodule or download package every time.
  • Loading branch information
kaka11chen authored May 9, 2023
1 parent dfad7b6 commit 096aa25
Show file tree
Hide file tree
Showing 15 changed files with 716 additions and 264 deletions.
454 changes: 372 additions & 82 deletions be/src/vec/exec/format/orc/vorc_reader.cpp

Large diffs are not rendered by default.

163 changes: 138 additions & 25 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "common/config.h"
#include "common/status.h"
#include "exec/olap_common.h"
#include "exec/text_converter.h"
#include "io/file_factory.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_reader_writer_fwd.h"
Expand Down Expand Up @@ -81,6 +82,38 @@ namespace doris::vectorized {

class ORCFileInputStream;

// Plain description of a single predicate on one ORC column.
// NOTE(review): presumably consumed when building the ORC search argument
// (see _init_search_argument) — confirm in vorc_reader.cpp.
struct OrcPredicate {
// Name of the column the predicate applies to.
std::string col_name;
// ORC-side data type of the predicate operand(s).
orc::PredicateDataType data_type;
// Literal operand(s) of the predicate; a vector, so more than one may be held.
std::vector<orc::Literal> literals;
// SQL filter operator of the predicate.
SQLFilterOp op;
};

// Bookkeeping for ORC lazy materialization: groups the columns into those tied
// to predicates and those marked as lazy-read, together with the partition and
// missing columns belonging to each group.
// NOTE(review): how each field is populated is not visible in this header —
// confirm against set_fill_columns()/init_reader() in vorc_reader.cpp.
struct LazyReadContext {
// Conjunct expression context; raw pointer, nullptr by default.
// NOTE(review): presumably non-owning — confirm against the caller.
VExprContext* vconjunct_ctx = nullptr;
// Whether lazy reading can be used; off by default.
bool can_lazy_read = false;
// block->rows() returns the number of rows of the first column,
// so we should check and resize the first column
bool resize_first_column = true;
// Names of all columns to read.
std::list<std::string> all_read_columns;
// include predicate_partition_columns & predicate_missing_columns
std::vector<uint32_t> all_predicate_col_ids;
// save slot_id to find dict filter column name, because expr column name may
// differ from the orc column name
// std::pair<std::list<col_name>, std::vector<slot_id>>
std::pair<std::list<std::string>, std::vector<int>> predicate_columns;
// NOTE(review): presumably the ORC file column names of the predicate
// columns — confirm.
std::list<std::string> predicate_orc_columns;
// Names of the columns marked for lazy reading.
std::vector<std::string> lazy_read_columns;
// Partition columns referenced by predicates, keyed by a string
// (presumably the column name — confirm).
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
predicate_partition_columns;
// lazy read partition columns or all partition columns
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
partition_columns;
// Missing columns referenced by predicates, keyed by a string
// (presumably the column name — confirm).
std::unordered_map<std::string, VExprContext*> predicate_missing_columns;
// lazy read missing columns or all missing columns
std::unordered_map<std::string, VExprContext*> missing_columns;
};

class OrcReader : public GenericReader {
ENABLE_FACTORY_CREATOR(OrcReader);

Expand All @@ -98,16 +131,31 @@ class OrcReader : public GenericReader {

OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, const std::vector<std::string>& column_names,
size_t batch_size, const std::string& ctz, io::IOContext* io_ctx);
size_t batch_size, const std::string& ctz, io::IOContext* io_ctx,
bool enable_lazy_mat = true);

OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::vector<std::string>& column_names, const std::string& ctz,
io::IOContext* io_ctx);
io::IOContext* io_ctx, bool enable_lazy_mat = true);

~OrcReader() override;

Status init_reader(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
VExprContext* vconjunct_ctx);

Status set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns,
const std::unordered_map<std::string, VExprContext*>& missing_columns) override;

Status _fill_partition_columns(
Block* block, size_t rows,
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns);
Status _fill_missing_columns(
Block* block, size_t rows,
const std::unordered_map<std::string, VExprContext*>& missing_columns);

Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;

Expand All @@ -122,6 +170,8 @@ class OrcReader : public GenericReader {
Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) override;

Status filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg);

private:
struct OrcProfile {
RuntimeProfile::Counter* read_time;
Expand All @@ -134,6 +184,19 @@ class OrcReader : public GenericReader {
RuntimeProfile::Counter* decode_null_map_time;
};

// Adapter that implements the orc::ORCFilter callback interface by forwarding
// every call to OrcReader::filter().
//
// The adapter stores a raw pointer to the reader and never deletes it (the
// destructor is defaulted), so it does not own the reader.
// NOTE(review): the reader presumably has to outlive this object — confirm
// against how _orc_filter is created and destroyed.
class ORCFilterImpl : public orc::ORCFilter {
public:
    // `explicit` prevents an OrcReader* from silently converting into a
    // filter object; direct construction (new / make_unique) is unaffected.
    explicit ORCFilterImpl(OrcReader* orcReader) : orcReader(orcReader) {}
    ~ORCFilterImpl() override = default;

    // Forwards the batch, the selection vector `sel`, its `size` and the
    // opaque `arg` unchanged to OrcReader::filter().
    void filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size,
                void* arg) const override {
        // NOTE(review): OrcReader::filter() returns a Status, but this
        // override returns void, so a failure reported by the reader is
        // dropped here. Decide whether it should be logged or surfaced.
        orcReader->filter(data, sel, size, arg);
    }

private:
    // Reader that performs the actual filtering; not owned by this object.
    OrcReader* orcReader;
};

// Create inner orc file,
// return EOF if file is empty
// return ERROR if an error is encountered.
Expand All @@ -142,12 +205,13 @@ class OrcReader : public GenericReader {
void _init_profile();
Status _init_read_columns();
TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type);
void _init_search_argument(
bool _init_search_argument(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
void _init_bloom_filter(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
void _init_system_properties();
void _init_file_description();
template <bool is_filter = false>
Status _orc_column_to_doris_column(const std::string& col_name, const ColumnPtr& doris_column,
const DataTypePtr& data_type,
const orc::Type* orc_column_type,
Expand Down Expand Up @@ -194,7 +258,7 @@ class OrcReader : public GenericReader {
}
}

template <typename DecimalPrimitiveType, typename OrcColumnType>
template <typename DecimalPrimitiveType, typename OrcColumnType, bool is_filter>
Status _decode_explicit_decimal_column(const std::string& col_name,
const MutableColumnPtr& data_column,
const DataTypePtr& data_type,
Expand All @@ -218,42 +282,68 @@ class OrcReader : public GenericReader {
auto origin_size = column_data.size();
column_data.resize(origin_size + num_values);

for (int i = 0; i < num_values; ++i) {
int128_t value;
if constexpr (std::is_same_v<OrcColumnType, orc::Decimal64VectorBatch>) {
value = static_cast<int128_t>(cvb_data[i]);
} else {
uint64_t hi = data->values[i].getHighBits();
uint64_t lo = data->values[i].getLowBits();
value = (((int128_t)hi) << 64) | (int128_t)lo;
}
if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
for (int i = 0; i < num_values; ++i) {
int128_t value;
if constexpr (std::is_same_v<OrcColumnType, orc::Decimal64VectorBatch>) {
value = static_cast<int128_t>(cvb_data[i]);
} else {
uint64_t hi = data->values[i].getHighBits();
uint64_t lo = data->values[i].getLowBits();
value = (((int128_t)hi) << 64) | (int128_t)lo;
}
value *= scale_params.scale_factor;
} else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[origin_size + i]);
v = (DecimalPrimitiveType)value;
}
} else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
for (int i = 0; i < num_values; ++i) {
int128_t value;
if constexpr (std::is_same_v<OrcColumnType, orc::Decimal64VectorBatch>) {
value = static_cast<int128_t>(cvb_data[i]);
} else {
uint64_t hi = data->values[i].getHighBits();
uint64_t lo = data->values[i].getLowBits();
value = (((int128_t)hi) << 64) | (int128_t)lo;
}
value /= scale_params.scale_factor;
auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[origin_size + i]);
v = (DecimalPrimitiveType)value;
}
} else {
for (int i = 0; i < num_values; ++i) {
int128_t value;
if constexpr (std::is_same_v<OrcColumnType, orc::Decimal64VectorBatch>) {
value = static_cast<int128_t>(cvb_data[i]);
} else {
uint64_t hi = data->values[i].getHighBits();
uint64_t lo = data->values[i].getLowBits();
value = (((int128_t)hi) << 64) | (int128_t)lo;
}
auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[origin_size + i]);
v = (DecimalPrimitiveType)value;
}
auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[origin_size + i]);
v = (DecimalPrimitiveType)value;
}
return Status::OK();
}

template <typename DecimalPrimitiveType>
template <typename DecimalPrimitiveType, bool is_filter>
Status _decode_decimal_column(const std::string& col_name, const MutableColumnPtr& data_column,
const DataTypePtr& data_type, orc::ColumnVectorBatch* cvb,
size_t num_values) {
SCOPED_RAW_TIMER(&_statistics.decode_value_time);
if (dynamic_cast<orc::Decimal64VectorBatch*>(cvb) != nullptr) {
return _decode_explicit_decimal_column<DecimalPrimitiveType, orc::Decimal64VectorBatch>(
col_name, data_column, data_type, cvb, num_values);
return _decode_explicit_decimal_column<DecimalPrimitiveType, orc::Decimal64VectorBatch,
is_filter>(col_name, data_column, data_type, cvb,
num_values);
} else {
return _decode_explicit_decimal_column<DecimalPrimitiveType,
orc::Decimal128VectorBatch>(
col_name, data_column, data_type, cvb, num_values);
return _decode_explicit_decimal_column<DecimalPrimitiveType, orc::Decimal128VectorBatch,
is_filter>(col_name, data_column, data_type, cvb,
num_values);
}
}

template <typename CppType, typename DorisColumnType, typename OrcColumnType>
template <typename CppType, typename DorisColumnType, typename OrcColumnType, bool is_filter>
Status _decode_time_column(const std::string& col_name, const MutableColumnPtr& data_column,
orc::ColumnVectorBatch* cvb, size_t num_values) {
SCOPED_RAW_TIMER(&_statistics.decode_value_time);
Expand All @@ -264,16 +354,30 @@ class OrcReader : public GenericReader {
auto& column_data = static_cast<ColumnVector<DorisColumnType>&>(*data_column).get_data();
auto origin_size = column_data.size();
column_data.resize(origin_size + num_values);
UInt8* __restrict filter_data;
if constexpr (is_filter) {
filter_data = _filter->data();
}
for (int i = 0; i < num_values; ++i) {
auto& v = reinterpret_cast<CppType&>(column_data[origin_size + i]);
if constexpr (std::is_same_v<OrcColumnType, orc::LongVectorBatch>) { // date
if constexpr (is_filter) {
if (!filter_data[i]) {
continue;
}
}
int64_t& date_value = data->data[i];
v.from_unixtime(date_value * 24 * 60 * 60, _time_zone); // day to seconds
if constexpr (std::is_same_v<CppType, VecDateTimeValue>) {
// we should cast to date if using date v1.
v.cast_to_date();
}
} else { // timestamp
if constexpr (is_filter) {
if (!filter_data[i]) {
continue;
}
}
v.from_unixtime(data->data[i], _time_zone);
if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) {
// nanoseconds will lose precision. only keep microseconds.
Expand All @@ -284,6 +388,7 @@ class OrcReader : public GenericReader {
return Status::OK();
}

template <bool is_filter>
Status _decode_string_column(const std::string& col_name, const MutableColumnPtr& data_column,
const orc::TypeKind& type_kind, orc::ColumnVectorBatch* cvb,
size_t num_values);
Expand Down Expand Up @@ -321,6 +426,7 @@ class OrcReader : public GenericReader {
std::unordered_map<std::string, std::string> _file_col_to_schema_col;
// Flag for hive engine. True if the external table engine is Hive.
bool _is_hive = false;
std::unordered_map<std::string, std::string> _col_name_to_file_col_name;
std::vector<const orc::Type*> _col_orc_type;
std::unique_ptr<ORCFileInputStream> _file_input_stream;
Statistics _statistics;
Expand All @@ -330,15 +436,22 @@ class OrcReader : public GenericReader {
std::unique_ptr<orc::ColumnVectorBatch> _batch;
std::unique_ptr<orc::Reader> _reader;
std::unique_ptr<orc::RowReader> _row_reader;
std::unique_ptr<ORCFilterImpl> _orc_filter;
orc::ReaderOptions _reader_options;
orc::RowReaderOptions _row_reader_options;

std::shared_ptr<io::FileSystem> _file_system;

io::IOContext* _io_ctx;
bool _enable_lazy_mat = true;

std::vector<DecimalScaleParams> _decimal_scale_params;
size_t _decimal_scale_params_index;

std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
std::unique_ptr<IColumn::Filter> _filter = nullptr;
LazyReadContext _lazy_read_ctx;
std::unique_ptr<TextConverter> _text_converter = nullptr;
};

class ORCFileInputStream : public orc::InputStream {
Expand Down
Loading

0 comments on commit 096aa25

Please sign in to comment.