Skip to content

Commit

Permalink
Merge branch 'master' into nereids_refresh_catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
vinlee19 authored Nov 15, 2024
2 parents 8deae3f + fc5b87e commit 8251753
Show file tree
Hide file tree
Showing 27 changed files with 1,421 additions and 284 deletions.
6 changes: 4 additions & 2 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,9 +666,11 @@ Status Compaction::do_inverted_index_compaction() {
DORIS_TRY(inverted_index_file_readers[src_segment_id]->open(index_meta));
}
for (int dest_segment_id = 0; dest_segment_id < dest_segment_num; dest_segment_id++) {
auto* dest_dir =
auto dest_dir =
DORIS_TRY(inverted_index_file_writers[dest_segment_id]->open(index_meta));
dest_index_dirs[dest_segment_id] = dest_dir;
// Destination directories in dest_index_dirs do not need to be destroyed here,
// but their lifetime must be managed by inverted_index_file_writers.
dest_index_dirs[dest_segment_id] = dest_dir.get();
}
auto st = compact_column(index_meta->index_id(), src_idx_dirs, dest_index_dirs,
index_tmp_path.native(), trans_vec, dest_segment_num_rows);
Expand Down
7 changes: 0 additions & 7 deletions be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@ Status compact_column(int64_t index_id,
// When index_writer is destroyed, dir will be closed if closeDir is set.
// _CLDECDELETE(dir) will try to decrement ref_cnt; when it decreases to 1, dir will be destroyed.
_CLDECDELETE(dir)
for (auto* d : dest_index_dirs) {
if (d != nullptr) {
// NOTE: DO NOT close dest dir here, because it will be closed when dest index writer finalize.
//d->close();
//_CLDELETE(d);
}
}

// delete temporary segment_path, only when inverted_index_ram_dir_enable is false
if (!config::inverted_index_ram_dir_enable) {
Expand Down
521 changes: 295 additions & 226 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp

Large diffs are not rendered by default.

68 changes: 59 additions & 9 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@

#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "runtime/exec_env.h"

namespace doris {
class TabletIndex;

namespace segment_v2 {
class DorisFSDirectory;
using InvertedIndexDirectoryMap =
std::map<std::pair<int64_t, std::string>, std::unique_ptr<lucene::store::Directory>>;
std::map<std::pair<int64_t, std::string>, std::shared_ptr<lucene::store::Directory>>;

class InvertedIndexFileWriter;
using InvertedIndexFileWriterPtr = std::unique_ptr<InvertedIndexFileWriter>;
Expand All @@ -58,16 +60,19 @@ class InvertedIndexFileWriter {
_rowset_id(std::move(rowset_id)),
_seg_id(seg_id),
_storage_format(storage_format),
_idx_v2_writer(std::move(file_writer)) {}
_local_fs(io::global_local_filesystem()),
_idx_v2_writer(std::move(file_writer)) {
auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
_tmp_dir = tmp_file_dir.native();
}

Result<DorisFSDirectory*> open(const TabletIndex* index_meta);
Result<std::shared_ptr<DorisFSDirectory>> open(const TabletIndex* index_meta);
Status delete_index(const TabletIndex* index_meta);
Status initialize(InvertedIndexDirectoryMap& indices_dirs);
~InvertedIndexFileWriter() = default;
virtual ~InvertedIndexFileWriter() = default;
Status write_v2();
Status write_v1();
Status close();
int64_t headerLength();
const InvertedIndexFileInfo* get_index_file_info() const {
DCHECK(_closed) << debug_string();
return &_file_info;
Expand All @@ -77,11 +82,7 @@ class InvertedIndexFileWriter {
return _total_file_size;
}
// Accessor for the file system handle held in _fs; hands back a const reference,
// so no copy of the shared pointer is made by this call.
const io::FileSystemSPtr& get_fs() const {
    return this->_fs;
}
void sort_files(std::vector<FileInfo>& file_infos);
void copyFile(const char* fileName, lucene::store::Directory* dir,
lucene::store::IndexOutput* output, uint8_t* buffer, int64_t bufferLength);
// Accessor for _storage_format (set from the constructor's storage_format argument);
// the value is returned by copy.
InvertedIndexStorageFormatPB get_storage_format() const {
    return this->_storage_format;
}

// Overwrites the stored file writer options (_opts) with a copy of `opts`.
void set_file_writer_opts(const io::FileWriterOptions& opts) {
    this->_opts = opts;
}

std::string debug_string() const {
Expand All @@ -99,12 +100,61 @@ class InvertedIndexFileWriter {
}

private:
// Helper functions shared between write_v1 and write_v2
std::vector<FileInfo> prepare_sorted_files(lucene::store::Directory* directory);
void sort_files(std::vector<FileInfo>& file_infos);
void copyFile(const char* fileName, lucene::store::Directory* dir,
lucene::store::IndexOutput* output, uint8_t* buffer, int64_t bufferLength);
void finalize_output_dir(lucene::store::Directory* out_dir);
void add_index_info(int64_t index_id, const std::string& index_suffix,
int64_t compound_file_size);
int64_t headerLength();
// Helper functions specific to write_v1
std::pair<int64_t, int32_t> calculate_header_length(const std::vector<FileInfo>& sorted_files,
lucene::store::Directory* directory);
std::pair<lucene::store::Directory*, std::unique_ptr<lucene::store::IndexOutput>>
create_output_stream_v1(int64_t index_id, const std::string& index_suffix);
virtual void write_header_and_data_v1(lucene::store::IndexOutput* output,
const std::vector<FileInfo>& sorted_files,
lucene::store::Directory* directory,
int64_t header_length, int32_t header_file_count);
// Helper functions specific to write_v2
std::pair<lucene::store::Directory*, std::unique_ptr<lucene::store::IndexOutput>>
create_output_stream_v2();
void write_version_and_indices_count(lucene::store::IndexOutput* output);
struct FileMetadata {
int64_t index_id;
std::string index_suffix;
std::string filename;
int64_t offset;
int64_t length;
lucene::store::Directory* directory;

FileMetadata(int64_t id, const std::string& suffix, const std::string& file, int64_t off,
int64_t len, lucene::store::Directory* dir)
: index_id(id),
index_suffix(suffix),
filename(file),
offset(off),
length(len),
directory(dir) {}
};
std::vector<FileMetadata> prepare_file_metadata_v2(int64_t& current_offset);
virtual void write_index_headers_and_metadata(lucene::store::IndexOutput* output,
const std::vector<FileMetadata>& file_metadata);
void copy_files_data_v2(lucene::store::IndexOutput* output,
const std::vector<FileMetadata>& file_metadata);
Status _insert_directory_into_map(int64_t index_id, const std::string& index_suffix,
std::shared_ptr<DorisFSDirectory> dir);
// Member variables...
InvertedIndexDirectoryMap _indices_dirs;
const io::FileSystemSPtr _fs;
std::string _index_path_prefix;
std::string _rowset_id;
int64_t _seg_id;
InvertedIndexStorageFormatPB _storage_format;
std::string _tmp_dir;
const std::shared_ptr<io::LocalFileSystem>& _local_fs;

// write to disk or stream
io::FileWriterPtr _idx_v2_writer = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
bool create_index = true;
bool close_dir_on_shutdown = true;
auto index_writer = std::make_unique<lucene::index::IndexWriter>(
_dir, _analyzer.get(), create_index, close_dir_on_shutdown);
_dir.get(), _analyzer.get(), create_index, close_dir_on_shutdown);
DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setRAMBufferSizeMB_error",
{ index_writer->setRAMBufferSizeMB(-100); })
DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setMaxBufferedDocs_error",
Expand Down Expand Up @@ -708,7 +708,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
std::unique_ptr<lucene::util::Reader> _char_string_reader = nullptr;
std::shared_ptr<lucene::util::bkd::bkd_writer> _bkd_writer = nullptr;
InvertedIndexCtxSPtr _inverted_index_ctx = nullptr;
DorisFSDirectory* _dir = nullptr;
std::shared_ptr<DorisFSDirectory> _dir = nullptr;
const KeyCoder* _value_key_coder;
const TabletIndex* _index_meta;
InvertedIndexParserType _parser_type;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1198,7 +1198,7 @@ void DeleteBitmap::remove_stale_delete_bitmap_from_queue(const std::vector<std::
std::shared_lock l(stale_delete_bitmap_lock);
//<rowset_id, start_version, end_version>
std::vector<std::tuple<std::string, uint64_t, uint64_t>> to_delete;
auto tablet_id = -1;
int64_t tablet_id = -1;
for (auto& version_str : vector) {
auto it = _stale_delete_bitmap.find(version_str);
if (it != _stale_delete_bitmap.end()) {
Expand Down
17 changes: 7 additions & 10 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,23 +516,20 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
local_state._matched_rows_done
: local_state._matched_rows_done);

size_t join_block_column_size = local_state._join_block.columns();
{
vectorized::Block tmp_block = local_state._join_block;

// Here make _join_block release the columns' ptr
local_state._join_block.set_columns(local_state._join_block.clone_empty_columns());

local_state.add_tuple_is_null_column(&tmp_block);
local_state.add_tuple_is_null_column(&local_state._join_block);
{
SCOPED_TIMER(local_state._join_filter_timer);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
local_state._conjuncts, &tmp_block, tmp_block.columns()));
local_state._conjuncts, &local_state._join_block,
local_state._join_block.columns()));
}
RETURN_IF_ERROR(local_state._build_output_block(&tmp_block, block, false));
RETURN_IF_ERROR(
local_state._build_output_block(&local_state._join_block, block, false));
local_state._reset_tuple_is_null_column();
}
local_state._join_block.clear_column_data();

local_state._join_block.clear_column_data(join_block_column_size);
if (!(*eos) and !local_state._need_more_input_data) {
auto func = [&](auto&& join_op_variants, auto set_build_side_flag,
auto set_probe_side_flag) {
Expand Down
33 changes: 26 additions & 7 deletions be/src/vec/sink/autoinc_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/status.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "util/debug_points.h"
#include "util/runtime_profile.h"
#include "util/thrift_rpc_helper.h"

Expand All @@ -45,10 +46,18 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t batch_size) {
}

Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
LOG_INFO(
"[AutoIncIDBuffer::_fetch_ids_from_fe] begin to fetch auto-increment values from fe, "
"db_id={}, table_id={}, column_id={}, length={}",
_db_id, _table_id, _column_id, length);
constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
_rpc_status = Status::OK();
TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) {
DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", {
_rpc_status = Status::InternalError<false>("injected error");
break;
});
TAutoIncrementRangeRequest request;
TAutoIncrementRangeResult result;
request.__set_db_id(_db_id);
Expand All @@ -68,8 +77,9 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {

if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
LOG_WARNING(
"Failed to fetch auto-incremnt range, requested to non-master FE@{}:{}, change "
"to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, column_id={}",
"Failed to fetch auto-increment range, requested to non-master FE@{}:{}, "
"change to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, "
"column_id={}",
master_addr.hostname, master_addr.port, result.master_address.hostname,
result.master_address.port, retry_times, _db_id, _table_id, _column_id);
master_addr = result.master_address;
Expand All @@ -79,15 +89,15 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {

if (!_rpc_status.ok()) {
LOG_WARNING(
"Failed to fetch auto-incremnt range, encounter rpc failure. "
"Failed to fetch auto-increment range, encounter rpc failure. "
"errmsg={}, retry_time={}, db_id={}, table_id={}, column_id={}",
_rpc_status.to_string(), retry_times, _db_id, _table_id, _column_id);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
if (result.length != length) [[unlikely]] {
auto msg = fmt::format(
"Failed to fetch auto-incremnt range, request length={}, but get "
"Failed to fetch auto-increment range, request length={}, but get "
"result.length={}, retry_time={}, db_id={}, table_id={}, column_id={}",
length, result.length, retry_times, _db_id, _table_id, _column_id);
LOG(WARNING) << msg;
Expand All @@ -97,14 +107,14 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
}

LOG_INFO(
"get auto-incremnt range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
"get auto-increment range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
"retry_time={}, db_id={}, table_id={}, column_id={}",
master_addr.hostname, master_addr.port, result.start, result.length,
get_auto_inc_range_rpc_ns / 1000000, retry_times, _db_id, _table_id, _column_id);
return result.start;
}
CHECK(!_rpc_status.ok());
return _rpc_status;
return ResultError(_rpc_status);
}

void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers(
Expand Down Expand Up @@ -154,10 +164,19 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() {
auto&& res = _fetch_ids_from_fe(length);
if (!res.has_value()) [[unlikely]] {
auto&& err = res.error();
LOG_WARNING(
"[AutoIncIDBuffer::_launch_async_fetch_task] failed to fetch auto-increment "
"values from fe, db_id={}, table_id={}, column_id={}, status={}",
_db_id, _table_id, _column_id, err);
_is_fetching = false;
return;
}
int64_t start = res.value();
LOG_INFO(
"[AutoIncIDBuffer::_launch_async_fetch_task] successfully fetch auto-increment "
"values from fe, db_id={}, table_id={}, column_id={}, start={}, length={}",
_db_id, _table_id, _column_id, start, length);
{
std::lock_guard<std::mutex> lock {_latch};
_buffers.emplace_back(start, length);
Expand All @@ -168,4 +187,4 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
return Status::OK();
}

} // namespace doris::vectorized
} // namespace doris::vectorized
12 changes: 10 additions & 2 deletions be/test/common/status_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,16 @@ TEST_F(StatusTest, TStatusCodeWithStatus) {
continue;
}
EXPECT_TRUE(tstatus_st.first < ErrorCode::MAX_ERROR_CODE_DEFINE_NUM);
EXPECT_TRUE(ErrorCode::error_states[tstatus_st.first].description.compare(
tstatus_st.second) == 0);
bool ret = ErrorCode::error_states[tstatus_st.first].description.compare(
tstatus_st.second) == 0;
if (!ret) {
std::cout << "ErrorCode::error_states's " << tstatus_st.first << " compare to "
<< tstatus_st.second
<< " failed. you need check whether TStatusCode defined in thrift "
"is same with ERROR_CODES in status.h"
<< std::endl;
}
EXPECT_TRUE(ret);
}
}

Expand Down
Loading

0 comments on commit 8251753

Please sign in to comment.