Skip to content

Commit

Permalink
[enhancement](cloud) reconnect after the RPC request to the meta service fails (#45668)
Browse files Browse the repository at this point in the history

Co-authored-by: Gavin Chou <[email protected]>
  • Loading branch information
luwei16 and gavinchou authored Jan 2, 2025
1 parent 81e8182 commit 45f440d
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 64 deletions.
191 changes: 132 additions & 59 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,89 @@ class MetaServiceProxy {
public:
static Status get_client(std::shared_ptr<MetaService_Stub>* stub) {
TEST_SYNC_POINT_RETURN_WITH_VALUE("MetaServiceProxy::get_client", Status::OK(), stub);
return get_pooled_client(stub);
return get_pooled_client(stub, nullptr);
}

// Selects a `MetaServiceProxy` from the pool (round-robin) and returns it via
// `proxy`, so the caller can re-fetch a stub and mark the proxy unhealthy on
// RPC failure.
static Status get_proxy(MetaServiceProxy** proxy) {
    // `get_pooled_client` insists on a stub out-param; we only need the
    // proxy itself, so hand it a throwaway stub and discard it.
    std::shared_ptr<MetaService_Stub> discarded_stub;
    return get_pooled_client(&discarded_stub, proxy);
}

// Flags this proxy's cached connection as possibly broken; the next call to
// get() will then rebuild the channel once the reconnect interval allows it.
void set_unhealthy() {
    // Take the exclusive side of the shared_mutex so readers in get()
    // observe the flag consistently.
    std::unique_lock write_lock(_mutex);
    maybe_unhealthy = true;
}

// Decides whether get() should rebuild the channel at time `now` (epoch ms).
// Caller must hold _mutex (shared or exclusive): reads the unhealthy flag and
// the reconnect-time history queue.
bool need_reconn(long now) {
    // A healthy proxy never forces a reconnect.
    if (!maybe_unhealthy) {
        return false;
    }
    // Rate-limit reconnects: only reconnect when the oldest entry in the
    // recent-reconnect history is older than the configured interval.
    long since_oldest_reconn_ms = now - last_reconn_time_ms.front();
    return since_oldest_reconn_ms > config::meta_service_rpc_reconnect_interval_ms;
}

// Returns a usable stub, transparently (re)building the brpc channel when the
// cached one has passed its deadline, idled out, or been flagged unhealthy.
// Thread-safe: fast path under a shared lock, publish under a unique lock.
Status get(std::shared_ptr<MetaService_Stub>* stub) {
using namespace std::chrono;

auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
{
// Fast path: hand out the cached stub while it is still considered healthy.
std::shared_lock lock(_mutex);
if (_deadline_ms >= now && !is_idle_timeout(now) && !need_reconn(now)) {
_last_access_at_ms.store(now, std::memory_order_relaxed);
*stub = _stub;
return Status::OK();
}
}

// Slow path: build a fresh channel outside the lock so channel init does
// not block other readers.
auto channel = std::make_unique<brpc::Channel>();
Status s = init_channel(channel.get());
if (!s.ok()) [[unlikely]] {
return s;
}

// The stub takes ownership of the raw channel pointer.
*stub = std::make_shared<MetaService_Stub>(channel.release(),
google::protobuf::Service::STUB_OWNS_CHANNEL);

long deadline = now;
// connection age only works without list endpoint.
// Randomize the lifetime in [base, 2*base] seconds so connections from many
// clients do not all expire (and reconnect) at the same moment.
if (!is_meta_service_endpoint_list() &&
config::meta_service_connection_age_base_seconds > 0) {
std::default_random_engine rng(static_cast<uint32_t>(now));
std::uniform_int_distribution<> uni(
config::meta_service_connection_age_base_seconds,
config::meta_service_connection_age_base_seconds * 2);
deadline = now + duration_cast<milliseconds>(seconds(uni(rng))).count();
} else {
deadline = LONG_MAX;
}

// Last one WIN
std::unique_lock lock(_mutex);
_last_access_at_ms.store(now, std::memory_order_relaxed);
_deadline_ms = deadline;
_stub = *stub;

// Record this reconnect in the fixed-size history (push+pop keeps the queue
// length constant) and clear the unhealthy flag for the fresh connection.
last_reconn_time_ms.push(now);
last_reconn_time_ms.pop();
maybe_unhealthy = false;

return Status::OK();
}

private:
// True when the configured meta-service endpoint string is a comma-separated
// list of addresses rather than a single endpoint.
static bool is_meta_service_endpoint_list() {
    const std::string& endpoint = config::meta_service_endpoint;
    return endpoint.find(',') != std::string::npos;
}

static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub) {
/**
* This function initializes a pool of `MetaServiceProxy` objects and selects one using
* round-robin. It returns a client stub via the selected proxy.
*
* @param stub A pointer to a shared pointer of `MetaService_Stub` to be retrieved.
* @param proxy (Optional) A pointer to store the selected `MetaServiceProxy`.
*
* @return Status Returns `Status::OK()` on success or an error status on failure.
*/
static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub,
MetaServiceProxy** proxy) {
static std::once_flag proxies_flag;
static size_t num_proxies = 1;
static std::atomic<size_t> index(0);
Expand All @@ -164,10 +238,16 @@ class MetaServiceProxy {
for (size_t i = 0; i + 1 < num_proxies; ++i) {
size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
Status s = proxies[next_index].get(stub);
if (proxy != nullptr) {
*proxy = &(proxies[next_index]);
}
if (s.ok()) return Status::OK();
}

size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
if (proxy != nullptr) {
*proxy = &(proxies[next_index]);
}
return proxies[next_index].get(stub);
}

Expand Down Expand Up @@ -220,53 +300,13 @@ class MetaServiceProxy {
_last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now;
}

// NOTE(review): this is the pre-change (deleted-side) version of get() shown
// by the diff; the replacement adds the need_reconn() check and reconnect
// bookkeeping.
// Returns a usable stub, rebuilding the brpc channel when the cached one has
// passed its deadline or idled out.
Status get(std::shared_ptr<MetaService_Stub>* stub) {
using namespace std::chrono;

auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
{
// Fast path: reuse the cached stub while it is still fresh.
std::shared_lock lock(_mutex);
if (_deadline_ms >= now && !is_idle_timeout(now)) {
_last_access_at_ms.store(now, std::memory_order_relaxed);
*stub = _stub;
return Status::OK();
}
}

// Slow path: build a fresh channel outside the lock.
auto channel = std::make_unique<brpc::Channel>();
Status s = init_channel(channel.get());
if (!s.ok()) [[unlikely]] {
return s;
}

// The stub takes ownership of the raw channel pointer.
*stub = std::make_shared<MetaService_Stub>(channel.release(),
google::protobuf::Service::STUB_OWNS_CHANNEL);

long deadline = now;
// connection age only works without list endpoint.
// Randomized lifetime in [base, 2*base] seconds staggers reconnects.
if (!is_meta_service_endpoint_list() &&
config::meta_service_connection_age_base_seconds > 0) {
std::default_random_engine rng(static_cast<uint32_t>(now));
std::uniform_int_distribution<> uni(
config::meta_service_connection_age_base_seconds,
config::meta_service_connection_age_base_seconds * 2);
deadline = now + duration_cast<milliseconds>(seconds(uni(rng))).count();
} else {
deadline = LONG_MAX;
}

// Last one WIN
std::unique_lock lock(_mutex);
_last_access_at_ms.store(now, std::memory_order_relaxed);
_deadline_ms = deadline;
_stub = *stub;
return Status::OK();
}

std::shared_mutex _mutex;
std::atomic<long> _last_access_at_ms {0};
long _deadline_ms {0};
std::shared_ptr<MetaService_Stub> _stub;

std::queue<long> last_reconn_time_ms {std::deque<long> {0, 0, 0}};
bool maybe_unhealthy = false;
};

template <typename T, typename... Ts>
Expand Down Expand Up @@ -323,16 +363,19 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
std::default_random_engine rng = make_random_engine();
std::uniform_int_distribution<uint32_t> u(20, 200);
std::uniform_int_distribution<uint32_t> u2(500, 1000);
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
MetaServiceProxy* proxy;
RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
while (true) {
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(proxy->get(&stub));
brpc::Controller cntl;
cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
cntl.set_max_retry(kBrpcRetryTimes);
res->Clear();
(stub.get()->*method)(&cntl, &req, res, nullptr);
if (cntl.Failed()) [[unlikely]] {
error_msg = cntl.ErrorText();
proxy->set_unhealthy();
} else if (res->status().code() == MetaServiceCode::OK) {
return Status::OK();
} else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
Expand Down Expand Up @@ -388,11 +431,12 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_

TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet);

std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));

MetaServiceProxy* proxy;
RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
int tried = 0;
while (true) {
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(proxy->get(&stub));
brpc::Controller cntl;
cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
GetRowsetRequest req;
Expand Down Expand Up @@ -430,6 +474,7 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
_get_rowset_latency << latency;
int retry_times = config::meta_service_rpc_retry_times;
if (cntl.Failed()) {
proxy->set_unhealthy();
if (tried++ < retry_times) {
auto rng = make_random_engine();
std::uniform_int_distribution<uint32_t> u(20, 200);
Expand Down Expand Up @@ -633,15 +678,10 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
*delete_bitmap = *new_delete_bitmap;
}

std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));

int64_t new_max_version = std::max(old_max_version, rs_metas.rbegin()->end_version());
brpc::Controller cntl;
// When there are many delete bitmaps that need to be synchronized, it
// may take a longer time, especially when loading the tablet for the
// first time, so set a relatively long timeout time.
cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
GetDeleteBitmapRequest req;
GetDeleteBitmapResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand Down Expand Up @@ -669,10 +709,43 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
}

VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString();
stub->get_delete_bitmap(&cntl, &req, &res, nullptr);
if (cntl.Failed()) {
return Status::RpcError("failed to get delete bitmap: {}", cntl.ErrorText());

int retry_times = 0;
MetaServiceProxy* proxy;
RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
auto start = std::chrono::high_resolution_clock::now();
while (true) {
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(proxy->get(&stub));
// When there are many delete bitmaps that need to be synchronized, it
// may take a longer time, especially when loading the tablet for the
// first time, so set a relatively long timeout time.
brpc::Controller cntl;
cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
cntl.set_max_retry(kBrpcRetryTimes);
res.Clear();
stub->get_delete_bitmap(&cntl, &req, &res, nullptr);
if (cntl.Failed()) [[unlikely]] {
LOG_INFO("failed to get delete bitmap")
.tag("reason", cntl.ErrorText())
.tag("tablet_id", tablet->tablet_id())
.tag("partition_id", tablet->partition_id())
.tag("tried", retry_times);
proxy->set_unhealthy();
} else {
break;
}

if (++retry_times > config::delete_bitmap_rpc_retry_times) {
if (cntl.Failed()) {
return Status::RpcError("failed to get delete bitmap, tablet={} err={}",
tablet->tablet_id(), cntl.ErrorText());
}
break;
}
}
auto end = std::chrono::high_resolution_clock::now();

if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
return Status::NotFound("failed to get delete bitmap: {}", res.status().msg());
}
Expand Down Expand Up @@ -722,7 +795,7 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
{rst_id, segment_ids[i], vers[i]},
roaring::Roaring::readSafe(delete_bitmaps[i].data(), delete_bitmaps[i].length()));
}
int64_t latency = cntl.latency_us();
int64_t latency = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
if (latency > 100 * 1000) { // 100ms
LOG(INFO) << "finish get_delete_bitmap rpc. rowset_ids.size()=" << rowset_ids.size()
<< ", delete_bitmaps.size()=" << delete_bitmaps.size() << ", latency=" << latency
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,9 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");
DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true");

DEFINE_mBool(enable_cloud_tablet_report, "true");

DEFINE_mInt32(delete_bitmap_rpc_retry_times, "25");

DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "5000");
#include "common/compile_check_end.h"
} // namespace doris::config
4 changes: 4 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,9 @@ DECLARE_mBool(enable_use_cloud_unique_id_from_fe);

DECLARE_Bool(enable_cloud_tablet_report);

DECLARE_mInt32(delete_bitmap_rpc_retry_times);

DECLARE_mInt64(meta_service_rpc_reconnect_interval_ms);

#include "common/compile_check_end.h"
} // namespace doris::config
Original file line number Diff line number Diff line change
Expand Up @@ -3362,4 +3362,8 @@ public static int metaServiceRpcRetryTimes() {
"For disabling certain SQL queries, the configuration item is a list of simple class names of AST"
+ "(for example CreateRepositoryStmt, CreatePolicyCommand), separated by commas."})
public static String block_sql_ast_names = "";

public static long meta_service_rpc_reconnect_interval_ms = 5000;

public static long meta_service_rpc_retry_cnt = 10;
}
Loading

0 comments on commit 45f440d

Please sign in to comment.