Skip to content

Commit

Permalink
Merge branch 'master' into sss
Browse files Browse the repository at this point in the history
  • Loading branch information
Tech-Circle-48 authored Dec 20, 2024
2 parents 487f53a + b515f86 commit 7291a69
Show file tree
Hide file tree
Showing 170 changed files with 2,334 additions and 1,598 deletions.
23 changes: 15 additions & 8 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ namespace doris {
using namespace ErrorCode;

static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
static constexpr int LOAD_INITIATOR_ID = -1;

CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
: BaseTablet(std::move(tablet_meta)), _engine(engine) {}
Expand Down Expand Up @@ -504,13 +505,19 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
int64_t txn_expiration) {
if (rowset.rowset_meta()->rowset_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE) [[unlikely]] {
// May cause the segment files generated by the transient rowset writer unable to be
// recycled, see `CloudRowsetWriter::build` for detail.
LOG(WARNING) << "Wrong rowset state: " << rowset.rowset_meta()->rowset_state();
DCHECK(false) << rowset.rowset_meta()->rowset_state();
if (rowset.rowset_meta_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE &&
rowset.rowset_meta_state() != RowsetStatePB::COMMITTED) [[unlikely]] {
auto msg = fmt::format(
"wrong rowset state when create_transient_rowset_writer, rowset state should be "
"BEGIN_PARTIAL_UPDATE or COMMITTED, but found {}, rowset_id={}, tablet_id={}",
RowsetStatePB_Name(rowset.rowset_meta_state()), rowset.rowset_id().to_string(),
tablet_id());
// see `CloudRowsetWriter::build` for detail.
// if this is in a retry task, the rowset state may have been changed to RowsetStatePB::COMMITTED
// in `RowsetMeta::merge_rowset_meta()` in previous trials.
LOG(WARNING) << msg;
DCHECK(false) << msg;
}

RowsetWriterContext context;
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAPPING;
Expand Down Expand Up @@ -719,8 +726,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
}

auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
*this, ms_lock_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get()));
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, ms_lock_id, LOAD_INITIATOR_ID,
new_delete_bitmap.get()));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
Expand Down
94 changes: 56 additions & 38 deletions be/src/cloud/cloud_tablet_hotspot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,55 @@ TabletHotspot::~TabletHotspot() {
}
}

struct MapKeyHash {
int64_t operator()(const std::pair<int64_t, int64_t>& key) const {
return std::hash<int64_t> {}(key.first) + std::hash<int64_t> {}(key.second);
void get_return_partitions(
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
hot_partition,
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
last_hot_partition,
std::vector<THotTableMessage>* hot_tables, int& return_partitions, int N) {
for (const auto& [key, partition_to_value] : hot_partition) {
THotTableMessage msg;
msg.table_id = key.first;
msg.index_id = key.second;
for (const auto& [partition_id, value] : partition_to_value) {
if (return_partitions > N) {
return;
}
auto last_value_iter = last_hot_partition.find(key);
if (last_value_iter != last_hot_partition.end()) {
auto last_partition_iter = last_value_iter->second.find(partition_id);
if (last_partition_iter != last_value_iter->second.end()) {
const auto& last_value = last_partition_iter->second;
if (std::abs(static_cast<int64_t>(value.qpd) -
static_cast<int64_t>(last_value.qpd)) < 5 &&
std::abs(static_cast<int64_t>(value.qpw) -
static_cast<int64_t>(last_value.qpw)) < 10 &&
std::abs(static_cast<int64_t>(value.last_access_time) -
static_cast<int64_t>(last_value.last_access_time)) < 60) {
LOG(INFO) << "skip partition_id=" << partition_id << " qpd=" << value.qpd
<< " qpw=" << value.qpw
<< " last_access_time=" << value.last_access_time
<< " last_qpd=" << last_value.qpd
<< " last_qpw=" << last_value.qpw
<< " last_access_time=" << last_value.last_access_time;
continue;
}
}
}
THotPartition hot_partition;
hot_partition.__set_partition_id(partition_id);
hot_partition.__set_query_per_day(value.qpd);
hot_partition.__set_query_per_week(value.qpw);
hot_partition.__set_last_access_time(value.last_access_time);
msg.hot_partitions.push_back(hot_partition);
return_partitions++;
}
msg.__isset.hot_partitions = !msg.hot_partitions.empty();
hot_tables->push_back(std::move(msg));
}
};
struct TabletHotspotMapValue {
uint64_t qpd = 0; // query per day
uint64_t qpw = 0; // query per week
int64_t last_access_time;
};

using TabletHotspotMapKey = std::pair<int64_t, int64_t>;
}

void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_tables) {
// map<pair<table_id, index_id>, map<partition_id, value>> for day
Expand Down Expand Up @@ -108,33 +145,14 @@ void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_t
});
constexpr int N = 50;
int return_partitions = 0;
auto get_return_partitions =
[=, &return_partitions](
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>& hot_partition) {
for (const auto& [key, partition_to_value] : hot_partition) {
THotTableMessage msg;
msg.table_id = key.first;
msg.index_id = key.second;
for (const auto& [partition_id, value] : partition_to_value) {
if (return_partitions > N) {
return;
}
THotPartition hot_partition;
hot_partition.__set_partition_id(partition_id);
hot_partition.__set_query_per_day(value.qpd);
hot_partition.__set_query_per_week(value.qpw);
hot_partition.__set_last_access_time(value.last_access_time);
msg.hot_partitions.push_back(hot_partition);
return_partitions++;
}
msg.__isset.hot_partitions = !msg.hot_partitions.empty();
hot_tables->push_back(std::move(msg));
}
};
get_return_partitions(day_hot_partitions);
get_return_partitions(week_hot_partitions);

get_return_partitions(day_hot_partitions, _last_day_hot_partitions, hot_tables,
return_partitions, N);
get_return_partitions(week_hot_partitions, _last_week_hot_partitions, hot_tables,
return_partitions, N);

_last_day_hot_partitions = std::move(day_hot_partitions);
_last_week_hot_partitions = std::move(week_hot_partitions);
}

void HotspotCounter::make_dot_point() {
Expand Down
19 changes: 19 additions & 0 deletions be/src/cloud/cloud_tablet_hotspot.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ struct HotspotCounter {
};

using HotspotCounterPtr = std::shared_ptr<HotspotCounter>;
using TabletHotspotMapKey = std::pair<int64_t, int64_t>;

struct TabletHotspotMapValue {
uint64_t qpd = 0; // query per day
uint64_t qpw = 0; // query per week
int64_t last_access_time;
};

struct MapKeyHash {
int64_t operator()(const std::pair<int64_t, int64_t>& key) const {
return std::hash<int64_t> {}(key.first) + std::hash<int64_t> {}(key.second);
}
};

class TabletHotspot {
public:
Expand All @@ -71,6 +84,12 @@ class TabletHotspot {
bool _closed {false};
std::mutex _mtx;
std::condition_variable _cond;
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>
_last_day_hot_partitions;
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>
_last_week_hot_partitions;
};

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,8 @@ DEFINE_Bool(enable_table_size_correctness_check, "false");
DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false");

DEFINE_mInt32(compaction_num_per_round, "1");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,8 @@ DECLARE_Bool(enable_table_size_correctness_check);
// Enable sleep 5s between delete cumulative compaction.
DECLARE_mBool(enable_sleep_between_delete_cumu_compaction);

DECLARE_mInt32(compaction_num_per_round);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
7 changes: 7 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ void refresh_memory_state_after_memory_change() {
}

void refresh_cache_capacity() {
if (doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.load(
std::memory_order_relaxed)) {
// the last cache capacity adjustment has not been completed.
return;
}
if (refresh_cache_capacity_sleep_time_ms <= 0) {
auto cache_capacity_reduce_mem_limit = int64_t(
doris::MemInfo::soft_mem_limit() * config::cache_capacity_reduce_mem_limit_frac);
Expand All @@ -247,6 +252,8 @@ void refresh_cache_capacity() {
new_cache_capacity_adjust_weighted;
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
refresh_cache_capacity_sleep_time_ms = config::memory_gc_sleep_time_ms;
} else {
refresh_cache_capacity_sleep_time_ms = 0;
}
}
refresh_cache_capacity_sleep_time_ms -= config::memory_maintenance_sleep_time_ms;
Expand Down
11 changes: 10 additions & 1 deletion be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ namespace ErrorCode {
E(ENTRY_NOT_FOUND, -7002, false); \
E(INVALID_TABLET_STATE, -7211, false); \
E(ROWSETS_EXPIRED, -7311, false); \
E(CGROUP_ERROR, -7411, false);
E(CGROUP_ERROR, -7411, false); \
E(FATAL_ERROR, -7412, false);

// Define constexpr int error_code_name = error_code_value
#define M(NAME, ERRORCODE, ENABLESTACKTRACE) constexpr int NAME = ERRORCODE;
Expand Down Expand Up @@ -446,6 +447,14 @@ class [[nodiscard]] Status {

static Status OK() { return {}; }

template <bool stacktrace = true, typename... Args>
static Status FatalError(std::string_view msg, Args&&... args) {
#ifndef NDEBUG
LOG(FATAL) << fmt::format(msg, std::forward<Args>(args)...);
#endif
return Error<ErrorCode::FATAL_ERROR, stacktrace>(msg, std::forward<Args>(args)...);
}

// default have stacktrace. could disable manually.
#define ERROR_CTOR(name, code) \
template <bool stacktrace = true, typename... Args> \
Expand Down
5 changes: 4 additions & 1 deletion be/src/gutil/strings/escaping.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <limits>
#include <ostream>

#include "common/exception.h"

using std::numeric_limits;
#include <vector>

Expand Down Expand Up @@ -1084,7 +1086,8 @@ int Base64UnescapeInternal(const char* src, int szsrc, char* dest, int szdest,

default:
// state should have no other values at this point.
LOG(FATAL) << "This can't happen; base64 decoder state = " << state;
throw doris::Exception(
doris::Status::FatalError("This can't happen; base64 decoder state = {}", state));
}

// The remainder of the string should be all whitespace, mixed with
Expand Down
6 changes: 4 additions & 2 deletions be/src/gutil/strings/numbers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <limits>
#include <ostream>

#include "common/exception.h"

using std::numeric_limits;
#include <string>

Expand Down Expand Up @@ -772,8 +774,8 @@ uint64 atoi_kmgt(const char* s) {
scale = GG_ULONGLONG(1) << 40;
break;
default:
LOG(FATAL) << "Invalid mnemonic: `" << c << "';"
<< " should be one of `K', `M', `G', and `T'.";
throw doris::Exception(doris::Status::FatalError(
"Invalid mnemonic: `{}'; should be one of `K', `M', `G', and `T'.", c));
}
}
return n * scale;
Expand Down
5 changes: 3 additions & 2 deletions be/src/gutil/strings/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <mutex>
#include <ostream>

#include "common/exception.h"

using std::copy;
using std::max;
using std::min;
Expand Down Expand Up @@ -489,8 +491,7 @@ const char* strstr_delimited(const char* haystack, const char* needle, char deli
++haystack;
}
}
LOG(FATAL) << "Unreachable statement";
return nullptr;
throw doris::Exception(doris::Status::FatalError("Unreachable statement"));
}

// ----------------------------------------------------------------------
Expand Down
8 changes: 6 additions & 2 deletions be/src/gutil/threading/thread_collision_warner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

#include "gutil/threading/thread_collision_warner.h"

#include "common/exception.h"
#include "common/status.h"

#ifdef __linux__
#include <syscall.h>
#else
Expand All @@ -19,8 +22,9 @@
namespace base {

void DCheckAsserter::warn(int64_t previous_thread_id, int64_t current_thread_id) {
LOG(FATAL) << "Thread Collision! Previous thread id: " << previous_thread_id
<< ", current thread id: " << current_thread_id;
throw doris::Exception(doris::Status::FatalError(
"Thread Collision! Previous thread id: {}, current thread id: {}", previous_thread_id,
current_thread_id));
}

static subtle::Atomic64 CurrentThread() {
Expand Down
5 changes: 2 additions & 3 deletions be/src/io/file_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,9 @@ class FileFactory {
case TStorageBackendType::HDFS:
return TFileType::FILE_HDFS;
default:
LOG(FATAL) << "not match type to convert, from type:" << type;
throw Exception(Status::FatalError("not match type to convert, from type:{}", type));
}
LOG(FATAL) << "__builtin_unreachable";
__builtin_unreachable();
throw Exception(Status::FatalError("__builtin_unreachable"));
}
};

Expand Down
12 changes: 4 additions & 8 deletions be/src/olap/block_column_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,25 +74,21 @@ class BlockColumnPredicate {
}

virtual bool can_do_apply_safely(PrimitiveType input_type, bool is_null) const {
LOG(FATAL) << "should not reach here";
return true;
throw Exception(Status::FatalError("should not reach here"));
}

virtual bool support_zonemap() const { return true; }

virtual bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const {
LOG(FATAL) << "should not reach here";
return true;
throw Exception(Status::FatalError("should not reach here"));
}

virtual bool evaluate_and(const segment_v2::BloomFilter* bf) const {
LOG(FATAL) << "should not reach here";
return true;
throw Exception(Status::FatalError("should not reach here"));
}

virtual bool evaluate_and(const StringRef* dict_words, const size_t dict_num) const {
LOG(FATAL) << "should not reach here";
return true;
throw Exception(Status::FatalError("should not reach here"));
}

virtual bool can_do_bloom_filter(bool ngram) const { return false; }
Expand Down
Loading

0 comments on commit 7291a69

Please sign in to comment.