Skip to content

Commit

Permalink
[opt](cloud) reduce cache hotspot table write amplification (#45557)
Browse files Browse the repository at this point in the history
1. batch insert cloud_cache_hotspot in FE
2. enlarge polling interval in FE
3. shrink bucket num to 1 for cloud_cache_hotspot table
4. ignore stable statistics only catch the dynamic in BE

Signed-off-by: zhengyu <[email protected]>
  • Loading branch information
freemandealer authored Dec 19, 2024
1 parent 6c57c3c commit b5249a9
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 42 deletions.
94 changes: 56 additions & 38 deletions be/src/cloud/cloud_tablet_hotspot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,55 @@ TabletHotspot::~TabletHotspot() {
}
}

struct MapKeyHash {
int64_t operator()(const std::pair<int64_t, int64_t>& key) const {
return std::hash<int64_t> {}(key.first) + std::hash<int64_t> {}(key.second);
void get_return_partitions(
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
hot_partition,
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
last_hot_partition,
std::vector<THotTableMessage>* hot_tables, int& return_partitions, int N) {
for (const auto& [key, partition_to_value] : hot_partition) {
THotTableMessage msg;
msg.table_id = key.first;
msg.index_id = key.second;
for (const auto& [partition_id, value] : partition_to_value) {
if (return_partitions > N) {
return;
}
auto last_value_iter = last_hot_partition.find(key);
if (last_value_iter != last_hot_partition.end()) {
auto last_partition_iter = last_value_iter->second.find(partition_id);
if (last_partition_iter != last_value_iter->second.end()) {
const auto& last_value = last_partition_iter->second;
if (std::abs(static_cast<int64_t>(value.qpd) -
static_cast<int64_t>(last_value.qpd)) < 5 &&
std::abs(static_cast<int64_t>(value.qpw) -
static_cast<int64_t>(last_value.qpw)) < 10 &&
std::abs(static_cast<int64_t>(value.last_access_time) -
static_cast<int64_t>(last_value.last_access_time)) < 60) {
LOG(INFO) << "skip partition_id=" << partition_id << " qpd=" << value.qpd
<< " qpw=" << value.qpw
<< " last_access_time=" << value.last_access_time
<< " last_qpd=" << last_value.qpd
<< " last_qpw=" << last_value.qpw
<< " last_access_time=" << last_value.last_access_time;
continue;
}
}
}
THotPartition hot_partition;
hot_partition.__set_partition_id(partition_id);
hot_partition.__set_query_per_day(value.qpd);
hot_partition.__set_query_per_week(value.qpw);
hot_partition.__set_last_access_time(value.last_access_time);
msg.hot_partitions.push_back(hot_partition);
return_partitions++;
}
msg.__isset.hot_partitions = !msg.hot_partitions.empty();
hot_tables->push_back(std::move(msg));
}
};
struct TabletHotspotMapValue {
uint64_t qpd = 0; // query per day
uint64_t qpw = 0; // query per week
int64_t last_access_time;
};

using TabletHotspotMapKey = std::pair<int64_t, int64_t>;
}

void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_tables) {
// map<pair<table_id, index_id>, map<partition_id, value>> for day
Expand Down Expand Up @@ -108,33 +145,14 @@ void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_t
});
constexpr int N = 50;
int return_partitions = 0;
auto get_return_partitions =
[=, &return_partitions](
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>& hot_partition) {
for (const auto& [key, partition_to_value] : hot_partition) {
THotTableMessage msg;
msg.table_id = key.first;
msg.index_id = key.second;
for (const auto& [partition_id, value] : partition_to_value) {
if (return_partitions > N) {
return;
}
THotPartition hot_partition;
hot_partition.__set_partition_id(partition_id);
hot_partition.__set_query_per_day(value.qpd);
hot_partition.__set_query_per_week(value.qpw);
hot_partition.__set_last_access_time(value.last_access_time);
msg.hot_partitions.push_back(hot_partition);
return_partitions++;
}
msg.__isset.hot_partitions = !msg.hot_partitions.empty();
hot_tables->push_back(std::move(msg));
}
};
get_return_partitions(day_hot_partitions);
get_return_partitions(week_hot_partitions);

get_return_partitions(day_hot_partitions, _last_day_hot_partitions, hot_tables,
return_partitions, N);
get_return_partitions(week_hot_partitions, _last_week_hot_partitions, hot_tables,
return_partitions, N);

_last_day_hot_partitions = std::move(day_hot_partitions);
_last_week_hot_partitions = std::move(week_hot_partitions);
}

void HotspotCounter::make_dot_point() {
Expand Down
19 changes: 19 additions & 0 deletions be/src/cloud/cloud_tablet_hotspot.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ struct HotspotCounter {
};

using HotspotCounterPtr = std::shared_ptr<HotspotCounter>;
using TabletHotspotMapKey = std::pair<int64_t, int64_t>;

struct TabletHotspotMapValue {
uint64_t qpd = 0; // query per day
uint64_t qpw = 0; // query per week
int64_t last_access_time;
};

struct MapKeyHash {
int64_t operator()(const std::pair<int64_t, int64_t>& key) const {
return std::hash<int64_t> {}(key.first) + std::hash<int64_t> {}(key.second);
}
};

class TabletHotspot {
public:
Expand All @@ -71,6 +84,12 @@ class TabletHotspot {
bool _closed {false};
std::mutex _mtx;
std::condition_variable _cond;
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>
_last_day_hot_partitions;
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>
_last_week_hot_partitions;
};

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -3190,11 +3190,11 @@ public static int metaServiceRpcRetryTimes() {
public static boolean enable_fetch_cluster_cache_hotspot = true;

@ConfField(mutable = true)
public static long fetch_cluster_cache_hotspot_interval_ms = 600000;
public static long fetch_cluster_cache_hotspot_interval_ms = 3600000;
// to control the max num of values inserted into cache hotspot internal table
// insert into cache table when the size of batch values reaches this limit
@ConfField(mutable = true)
public static long batch_insert_cluster_cache_hotspot_num = 1000;
public static long batch_insert_cluster_cache_hotspot_num = 5000;

/**
* intervals between be status checks for CloudUpgradeMgr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ public void runAfterCatalogReady() {
}
});
}
triggerBatchInsert();
});
});
triggerBatchInsert();
idToTable.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ public class CacheHotspotManagerUtils {
+ " last_access_time DATETIMEV2)\n"
+ " UNIQUE KEY(cluster_id, backend_id, table_id, index_id, partition_id, insert_day)\n"
+ " PARTITION BY RANGE (insert_day) ()\n"
+ " DISTRIBUTED BY HASH (cluster_id)\n"
+ " DISTRIBUTED BY HASH (cluster_id) BUCKETS 1\n"
+ " PROPERTIES (\n"
+ " \"dynamic_partition.enable\" = \"true\",\n"
+ " \"dynamic_partition.buckets\" = \"1\",\n"
+ " \"dynamic_partition.time_unit\" = \"DAY\",\n"
+ " \"dynamic_partition.start\" = \"-7\",\n"
+ " \"dynamic_partition.end\" = \"3\",\n"
Expand Down

0 comments on commit b5249a9

Please sign in to comment.