Skip to content

Commit

Permalink
Merge branch 'apache:master' into sss
Browse files Browse the repository at this point in the history
  • Loading branch information
Tech-Circle-48 authored Dec 21, 2024
2 parents e2ffdf2 + 0ab4eae commit 804640e
Show file tree
Hide file tree
Showing 236 changed files with 3,280 additions and 1,896 deletions.
1 change: 1 addition & 0 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ github:
- COMPILE (DORIS_COMPILE)
- Need_2_Approval
- Cloud UT (Doris Cloud UT)
- performance (Doris Performance)

required_pull_request_reviews:
dismiss_stale_reviews: true
Expand Down
7 changes: 2 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,9 @@ Apache Doris is an easy-to-use, high-performance and real-time analytical databa

All this makes Apache Doris an ideal tool for scenarios including report analysis, ad-hoc query, unified data warehouse, and data lake query acceleration. On Apache Doris, users can build various applications, such as user behavior analysis, AB test platform, log retrieval analysis, user portrait analysis, and order analysis.

🎉 Version 2.1.4 is now released. Check out the 🔗[Release Notes](https://doris.apache.org/docs/releasenotes/release-2.1.4) here. The 2.1 version delivers exceptional performance with 100% higher out-of-the-box queries proven by TPC-DS 1TB tests, enhanced data lake analytics that are 4-6 times speedier than Trino and Spark, solid support for semi-structured data analysis with new Variant types and suite of analytical functions, asynchronous materialized views for query acceleration, optimized real-time writing at scale, and better workload management with stability and runtime SQL resource tracking.
🎉 Check out the 🔗[All releases](https://doris.apache.org/docs/releasenotes/all-release), where you'll find a chronological summary of Apache Doris versions released over the past year.


🎉 Version 2.0.12 is now released! This fully evolved and stable release is ready for all users to upgrade. Check out the 🔗[Release Notes](https://doris.apache.org/docs/2.0/releasenotes/release-2.0.12) here.

👀 Have a look at the 🔗[Official Website](https://doris.apache.org/) for a comprehensive list of Apache Doris's core features, blogs and user cases.
👀 Explore the 🔗[Official Website](https://doris.apache.org/) to discover Apache Doris's core features, blogs, and user cases in detail.

## 📈 Usage Scenarios

Expand Down
23 changes: 15 additions & 8 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ namespace doris {
using namespace ErrorCode;

static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
static constexpr int LOAD_INITIATOR_ID = -1;

CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
: BaseTablet(std::move(tablet_meta)), _engine(engine) {}
Expand Down Expand Up @@ -504,13 +505,19 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
int64_t txn_expiration) {
if (rowset.rowset_meta()->rowset_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE) [[unlikely]] {
// May cause the segment files generated by the transient rowset writer unable to be
// recycled, see `CloudRowsetWriter::build` for detail.
LOG(WARNING) << "Wrong rowset state: " << rowset.rowset_meta()->rowset_state();
DCHECK(false) << rowset.rowset_meta()->rowset_state();
if (rowset.rowset_meta_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE &&
rowset.rowset_meta_state() != RowsetStatePB::COMMITTED) [[unlikely]] {
auto msg = fmt::format(
"wrong rowset state when create_transient_rowset_writer, rowset state should be "
"BEGIN_PARTIAL_UPDATE or COMMITTED, but found {}, rowset_id={}, tablet_id={}",
RowsetStatePB_Name(rowset.rowset_meta_state()), rowset.rowset_id().to_string(),
tablet_id());
// see `CloudRowsetWriter::build` for detail.
// if this is in a retry task, the rowset state may have been changed to RowsetStatePB::COMMITTED
// in `RowsetMeta::merge_rowset_meta()` in previous trials.
LOG(WARNING) << msg;
DCHECK(false) << msg;
}

RowsetWriterContext context;
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAPPING;
Expand Down Expand Up @@ -719,8 +726,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
}

auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
*this, ms_lock_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get()));
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, ms_lock_id, LOAD_INITIATOR_ID,
new_delete_bitmap.get()));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
Expand Down
94 changes: 56 additions & 38 deletions be/src/cloud/cloud_tablet_hotspot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,55 @@ TabletHotspot::~TabletHotspot() {
}
}

struct MapKeyHash {
int64_t operator()(const std::pair<int64_t, int64_t>& key) const {
return std::hash<int64_t> {}(key.first) + std::hash<int64_t> {}(key.second);
void get_return_partitions(
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
hot_partition,
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
last_hot_partition,
std::vector<THotTableMessage>* hot_tables, int& return_partitions, int N) {
for (const auto& [key, partition_to_value] : hot_partition) {
THotTableMessage msg;
msg.table_id = key.first;
msg.index_id = key.second;
for (const auto& [partition_id, value] : partition_to_value) {
if (return_partitions > N) {
return;
}
auto last_value_iter = last_hot_partition.find(key);
if (last_value_iter != last_hot_partition.end()) {
auto last_partition_iter = last_value_iter->second.find(partition_id);
if (last_partition_iter != last_value_iter->second.end()) {
const auto& last_value = last_partition_iter->second;
if (std::abs(static_cast<int64_t>(value.qpd) -
static_cast<int64_t>(last_value.qpd)) < 5 &&
std::abs(static_cast<int64_t>(value.qpw) -
static_cast<int64_t>(last_value.qpw)) < 10 &&
std::abs(static_cast<int64_t>(value.last_access_time) -
static_cast<int64_t>(last_value.last_access_time)) < 60) {
LOG(INFO) << "skip partition_id=" << partition_id << " qpd=" << value.qpd
<< " qpw=" << value.qpw
<< " last_access_time=" << value.last_access_time
<< " last_qpd=" << last_value.qpd
<< " last_qpw=" << last_value.qpw
<< " last_access_time=" << last_value.last_access_time;
continue;
}
}
}
THotPartition hot_partition;
hot_partition.__set_partition_id(partition_id);
hot_partition.__set_query_per_day(value.qpd);
hot_partition.__set_query_per_week(value.qpw);
hot_partition.__set_last_access_time(value.last_access_time);
msg.hot_partitions.push_back(hot_partition);
return_partitions++;
}
msg.__isset.hot_partitions = !msg.hot_partitions.empty();
hot_tables->push_back(std::move(msg));
}
};
struct TabletHotspotMapValue {
uint64_t qpd = 0; // query per day
uint64_t qpw = 0; // query per week
int64_t last_access_time;
};

using TabletHotspotMapKey = std::pair<int64_t, int64_t>;
}

void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_tables) {
// map<pair<table_id, index_id>, map<partition_id, value>> for day
Expand Down Expand Up @@ -108,33 +145,14 @@ void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_t
});
constexpr int N = 50;
int return_partitions = 0;
auto get_return_partitions =
[=, &return_partitions](
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>& hot_partition) {
for (const auto& [key, partition_to_value] : hot_partition) {
THotTableMessage msg;
msg.table_id = key.first;
msg.index_id = key.second;
for (const auto& [partition_id, value] : partition_to_value) {
if (return_partitions > N) {
return;
}
THotPartition hot_partition;
hot_partition.__set_partition_id(partition_id);
hot_partition.__set_query_per_day(value.qpd);
hot_partition.__set_query_per_week(value.qpw);
hot_partition.__set_last_access_time(value.last_access_time);
msg.hot_partitions.push_back(hot_partition);
return_partitions++;
}
msg.__isset.hot_partitions = !msg.hot_partitions.empty();
hot_tables->push_back(std::move(msg));
}
};
get_return_partitions(day_hot_partitions);
get_return_partitions(week_hot_partitions);

get_return_partitions(day_hot_partitions, _last_day_hot_partitions, hot_tables,
return_partitions, N);
get_return_partitions(week_hot_partitions, _last_week_hot_partitions, hot_tables,
return_partitions, N);

_last_day_hot_partitions = std::move(day_hot_partitions);
_last_week_hot_partitions = std::move(week_hot_partitions);
}

void HotspotCounter::make_dot_point() {
Expand Down
19 changes: 19 additions & 0 deletions be/src/cloud/cloud_tablet_hotspot.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ struct HotspotCounter {
};

using HotspotCounterPtr = std::shared_ptr<HotspotCounter>;
using TabletHotspotMapKey = std::pair<int64_t, int64_t>;

// Per-partition access statistics stored as the mapped value of the hotspot maps.
struct TabletHotspotMapValue {
    uint64_t qpd = 0; // query per day
    uint64_t qpw = 0; // query per week
    // Given a default like the counters above, so a default-initialized value never
    // carries an indeterminate timestamp into later comparisons.
    // NOTE(review): the time unit is not visible here -- confirm against the writer.
    int64_t last_access_time = 0;
};

// Hash functor for std::pair<int64_t, int64_t> keys used by the hotspot unordered_maps.
struct MapKeyHash {
    // Returns std::size_t, the result type the standard Hash requirements expect.
    // The two halves are mixed asymmetrically: a plain sum is commutative, so every
    // key (a, b) would collide with its swapped counterpart (b, a).
    std::size_t operator()(const std::pair<int64_t, int64_t>& key) const noexcept {
        std::size_t seed = std::hash<int64_t> {}(key.first);
        seed ^= std::hash<int64_t> {}(key.second) +
                static_cast<std::size_t>(0x9e3779b97f4a7c15ULL) + (seed << 6) + (seed >> 2);
        return seed;
    }
};

class TabletHotspot {
public:
Expand All @@ -71,6 +84,12 @@ class TabletHotspot {
bool _closed {false};
std::mutex _mtx;
std::condition_variable _cond;
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>
_last_day_hot_partitions;
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>
_last_week_hot_partitions;
};

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,8 @@ DEFINE_Bool(enable_table_size_correctness_check, "false");
DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false");

DEFINE_mInt32(compaction_num_per_round, "1");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,8 @@ DECLARE_Bool(enable_table_size_correctness_check);
// Enable sleep 5s between delete cumulative compaction.
DECLARE_mBool(enable_sleep_between_delete_cumu_compaction);

DECLARE_mInt32(compaction_num_per_round);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
7 changes: 7 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ void refresh_memory_state_after_memory_change() {
}

void refresh_cache_capacity() {
if (doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.load(
std::memory_order_relaxed)) {
// the last cache capacity adjustment has not been completed.
return;
}
if (refresh_cache_capacity_sleep_time_ms <= 0) {
auto cache_capacity_reduce_mem_limit = int64_t(
doris::MemInfo::soft_mem_limit() * config::cache_capacity_reduce_mem_limit_frac);
Expand All @@ -247,6 +252,8 @@ void refresh_cache_capacity() {
new_cache_capacity_adjust_weighted;
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
refresh_cache_capacity_sleep_time_ms = config::memory_gc_sleep_time_ms;
} else {
refresh_cache_capacity_sleep_time_ms = 0;
}
}
refresh_cache_capacity_sleep_time_ms -= config::memory_maintenance_sleep_time_ms;
Expand Down
11 changes: 10 additions & 1 deletion be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ namespace ErrorCode {
E(ENTRY_NOT_FOUND, -7002, false); \
E(INVALID_TABLET_STATE, -7211, false); \
E(ROWSETS_EXPIRED, -7311, false); \
E(CGROUP_ERROR, -7411, false);
E(CGROUP_ERROR, -7411, false); \
E(FATAL_ERROR, -7412, false);

// Define constexpr int error_code_name = error_code_value
#define M(NAME, ERRORCODE, ENABLESTACKTRACE) constexpr int NAME = ERRORCODE;
Expand Down Expand Up @@ -446,6 +447,14 @@ class [[nodiscard]] Status {

static Status OK() { return {}; }

// Builds a Status carrying ErrorCode::FATAL_ERROR from a format string and its arguments.
// `stacktrace` is passed through as the second template argument of Error<>.
// When NDEBUG is not defined, the fmt-formatted message is first written via LOG(FATAL).
// NOTE(review): LOG(FATAL) presumably terminates the process (glog semantics), in which
// case the return below is only reached in NDEBUG builds -- confirm against the logging setup.
template <bool stacktrace = true, typename... Args>
static Status FatalError(std::string_view msg, Args&&... args) {
#ifndef NDEBUG
LOG(FATAL) << fmt::format(msg, std::forward<Args>(args)...);
#endif
// Same message and arguments are handed to Error<>, which produces the returned Status.
return Error<ErrorCode::FATAL_ERROR, stacktrace>(msg, std::forward<Args>(args)...);
}

// default have stacktrace. could disable manually.
#define ERROR_CTOR(name, code) \
template <bool stacktrace = true, typename... Args> \
Expand Down
5 changes: 4 additions & 1 deletion be/src/gutil/strings/escaping.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <limits>
#include <ostream>

#include "common/exception.h"

using std::numeric_limits;
#include <vector>

Expand Down Expand Up @@ -1084,7 +1086,8 @@ int Base64UnescapeInternal(const char* src, int szsrc, char* dest, int szdest,

default:
// state should have no other values at this point.
LOG(FATAL) << "This can't happen; base64 decoder state = " << state;
throw doris::Exception(
doris::Status::FatalError("This can't happen; base64 decoder state = {}", state));
}

// The remainder of the string should be all whitespace, mixed with
Expand Down
6 changes: 4 additions & 2 deletions be/src/gutil/strings/numbers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <limits>
#include <ostream>

#include "common/exception.h"

using std::numeric_limits;
#include <string>

Expand Down Expand Up @@ -772,8 +774,8 @@ uint64 atoi_kmgt(const char* s) {
scale = GG_ULONGLONG(1) << 40;
break;
default:
LOG(FATAL) << "Invalid mnemonic: `" << c << "';"
<< " should be one of `K', `M', `G', and `T'.";
throw doris::Exception(doris::Status::FatalError(
"Invalid mnemonic: `{}'; should be one of `K', `M', `G', and `T'.", c));
}
}
return n * scale;
Expand Down
5 changes: 3 additions & 2 deletions be/src/gutil/strings/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <mutex>
#include <ostream>

#include "common/exception.h"

using std::copy;
using std::max;
using std::min;
Expand Down Expand Up @@ -489,8 +491,7 @@ const char* strstr_delimited(const char* haystack, const char* needle, char deli
++haystack;
}
}
LOG(FATAL) << "Unreachable statement";
return nullptr;
throw doris::Exception(doris::Status::FatalError("Unreachable statement"));
}

// ----------------------------------------------------------------------
Expand Down
8 changes: 6 additions & 2 deletions be/src/gutil/threading/thread_collision_warner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

#include "gutil/threading/thread_collision_warner.h"

#include "common/exception.h"
#include "common/status.h"

#ifdef __linux__
#include <syscall.h>
#else
Expand All @@ -19,8 +22,9 @@
namespace base {

void DCheckAsserter::warn(int64_t previous_thread_id, int64_t current_thread_id) {
LOG(FATAL) << "Thread Collision! Previous thread id: " << previous_thread_id
<< ", current thread id: " << current_thread_id;
throw doris::Exception(doris::Status::FatalError(
"Thread Collision! Previous thread id: {}, current thread id: {}", previous_thread_id,
current_thread_id));
}

static subtle::Atomic64 CurrentThread() {
Expand Down
5 changes: 2 additions & 3 deletions be/src/io/file_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,9 @@ class FileFactory {
case TStorageBackendType::HDFS:
return TFileType::FILE_HDFS;
default:
LOG(FATAL) << "not match type to convert, from type:" << type;
throw Exception(Status::FatalError("not match type to convert, from type:{}", type));
}
LOG(FATAL) << "__builtin_unreachable";
__builtin_unreachable();
throw Exception(Status::FatalError("__builtin_unreachable"));
}
};

Expand Down
Loading

0 comments on commit 804640e

Please sign in to comment.