
[enhancement](cloud) reconnect after the RPC request to the meta service fails #45668

Merged (8 commits) on Jan 2, 2025
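At a glance, the change makes each MetaServiceProxy remember its recent reconnects and rebuild its brpc channel after an RPC failure, instead of reusing a stub against a possibly dead connection. Below is a condensed, standalone sketch of that policy (simplified field and config names; the actual logic is in be/src/cloud/cloud_meta_mgr.cpp in the diff that follows):

// Condensed sketch of the reconnect policy added in this PR (standalone,
// simplified; not the literal Doris code).
#include <deque>
#include <queue>

struct ReconnectPolicy {
    // Timestamps (ms) of the last three reconnects, oldest at the front.
    std::queue<long> last_reconn_time_ms {std::deque<long> {0, 0, 0}};
    bool maybe_unhealthy = false;
    long reconnect_interval_ms = 5000;  // stands in for meta_service_rpc_reconnect_interval_ms

    // An RPC failure only marks the proxy; the channel is rebuilt lazily.
    void set_unhealthy() { maybe_unhealthy = true; }

    // Reconnect only if marked unhealthy and the oldest of the last three
    // reconnects is older than the configured interval (rate-limits churn).
    bool need_reconn(long now_ms) const {
        return maybe_unhealthy &&
               (now_ms - last_reconn_time_ms.front()) > reconnect_interval_ms;
    }

    // After a channel is rebuilt: slide the reconnect window and clear the flag.
    void on_reconnected(long now_ms) {
        last_reconn_time_ms.push(now_ms);
        last_reconn_time_ms.pop();
        maybe_unhealthy = false;
    }
};

In the real class the flag and queue are guarded by the proxy's shared_mutex, and the interval comes from config::meta_service_rpc_reconnect_interval_ms.
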
191 changes: 132 additions & 59 deletions be/src/cloud/cloud_meta_mgr.cpp
@@ -139,15 +139,89 @@ class MetaServiceProxy {
public:
static Status get_client(std::shared_ptr<MetaService_Stub>* stub) {
TEST_SYNC_POINT_RETURN_WITH_VALUE("MetaServiceProxy::get_client", Status::OK(), stub);
return get_pooled_client(stub);
return get_pooled_client(stub, nullptr);
}

static Status get_proxy(MetaServiceProxy** proxy) {
// The 'stub' is a placeholder parameter, needed only to reuse the `get_pooled_client` function.
std::shared_ptr<MetaService_Stub> stub;
Contributor: add a comment for this function, and note that the stub is a placeholder
Contributor Author: done

return get_pooled_client(&stub, proxy);
}

void set_unhealthy() {
std::unique_lock lock(_mutex);
maybe_unhealthy = true;
}

bool need_reconn(long now) {
return maybe_unhealthy && ((now - last_reconn_time_ms.front()) >
config::meta_service_rpc_reconnect_interval_ms);
}

Status get(std::shared_ptr<MetaService_Stub>* stub) {
using namespace std::chrono;

auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
{
std::shared_lock lock(_mutex);
if (_deadline_ms >= now && !is_idle_timeout(now) && !need_reconn(now)) {
_last_access_at_ms.store(now, std::memory_order_relaxed);
*stub = _stub;
return Status::OK();
}
}

auto channel = std::make_unique<brpc::Channel>();
Status s = init_channel(channel.get());
if (!s.ok()) [[unlikely]] {
return s;
}

*stub = std::make_shared<MetaService_Stub>(channel.release(),
google::protobuf::Service::STUB_OWNS_CHANNEL);

long deadline = now;
// connection age only works without list endpoint.
if (!is_meta_service_endpoint_list() &&
config::meta_service_connection_age_base_seconds > 0) {
std::default_random_engine rng(static_cast<uint32_t>(now));
std::uniform_int_distribution<> uni(
config::meta_service_connection_age_base_seconds,
config::meta_service_connection_age_base_seconds * 2);
deadline = now + duration_cast<milliseconds>(seconds(uni(rng))).count();
} else {
deadline = LONG_MAX;
}

// Last one WIN
std::unique_lock lock(_mutex);
_last_access_at_ms.store(now, std::memory_order_relaxed);
_deadline_ms = deadline;
_stub = *stub;

last_reconn_time_ms.push(now);
last_reconn_time_ms.pop();
maybe_unhealthy = false;

return Status::OK();
}

private:
static bool is_meta_service_endpoint_list() {
return config::meta_service_endpoint.find(',') != std::string::npos;
}

static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub) {
/**
* This function initializes a pool of `MetaServiceProxy` objects and selects one using
* round-robin. It returns a client stub via the selected proxy.
*
* @param stub A pointer to a shared pointer of `MetaService_Stub` to be retrieved.
* @param proxy (Optional) A pointer to store the selected `MetaServiceProxy`.
*
* @return Status Returns `Status::OK()` on success or an error status on failure.
*/
static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub,
Contributor: add a comment for this method, including its behavior and params
Contributor Author: done

MetaServiceProxy** proxy) {
Contributor: try not to use double stars; is it possible to use a pointer to a unique_ptr?

static std::once_flag proxies_flag;
static size_t num_proxies = 1;
static std::atomic<size_t> index(0);
@@ -164,10 +238,16 @@ class MetaServiceProxy {
for (size_t i = 0; i + 1 < num_proxies; ++i) {
size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
Status s = proxies[next_index].get(stub);
if (proxy != nullptr) {
*proxy = &(proxies[next_index]);
}
if (s.ok()) return Status::OK();
}

size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
if (proxy != nullptr) {
*proxy = &(proxies[next_index]);
}
return proxies[next_index].get(stub);
}
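
As a side note, here is a minimal standalone sketch of the round-robin selection described in the comment above; the hypothetical PooledProxy and healthy_check() are stand-ins for MetaServiceProxy and a successful get() call, and the real function returns a Status rather than a pointer:

#include <atomic>
#include <cstddef>
#include <vector>

// Stand-in for a pooled proxy; healthy_check() models a successful get().
struct PooledProxy {
    bool healthy = true;
    bool healthy_check() const { return healthy; }
};

// Walk the pool from a shared atomic cursor and return the first proxy whose
// check succeeds; like get_pooled_client, the final candidate is returned
// even if its check fails, so the caller always gets some proxy back.
PooledProxy* pick_round_robin(std::vector<PooledProxy>& pool) {
    static std::atomic<size_t> index{0};
    for (size_t i = 0; i + 1 < pool.size(); ++i) {
        size_t next = index.fetch_add(1, std::memory_order_relaxed) % pool.size();
        if (pool[next].healthy_check()) {
            return &pool[next];
        }
    }
    size_t next = index.fetch_add(1, std::memory_order_relaxed) % pool.size();
    return &pool[next];
}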

@@ -220,53 +300,13 @@ class MetaServiceProxy {
_last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now;
}

Status get(std::shared_ptr<MetaService_Stub>* stub) {
using namespace std::chrono;

auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
{
std::shared_lock lock(_mutex);
if (_deadline_ms >= now && !is_idle_timeout(now)) {
_last_access_at_ms.store(now, std::memory_order_relaxed);
*stub = _stub;
return Status::OK();
}
}

auto channel = std::make_unique<brpc::Channel>();
Status s = init_channel(channel.get());
if (!s.ok()) [[unlikely]] {
return s;
}

*stub = std::make_shared<MetaService_Stub>(channel.release(),
google::protobuf::Service::STUB_OWNS_CHANNEL);

long deadline = now;
// connection age only works without list endpoint.
if (!is_meta_service_endpoint_list() &&
config::meta_service_connection_age_base_seconds > 0) {
std::default_random_engine rng(static_cast<uint32_t>(now));
std::uniform_int_distribution<> uni(
config::meta_service_connection_age_base_seconds,
config::meta_service_connection_age_base_seconds * 2);
deadline = now + duration_cast<milliseconds>(seconds(uni(rng))).count();
} else {
deadline = LONG_MAX;
}

// Last one WIN
std::unique_lock lock(_mutex);
_last_access_at_ms.store(now, std::memory_order_relaxed);
_deadline_ms = deadline;
_stub = *stub;
return Status::OK();
}

std::shared_mutex _mutex;
std::atomic<long> _last_access_at_ms {0};
long _deadline_ms {0};
std::shared_ptr<MetaService_Stub> _stub;

std::queue<long> last_reconn_time_ms {std::deque<long> {0, 0, 0}};
bool maybe_unhealthy = false;
};

template <typename T, typename... Ts>
@@ -323,16 +363,19 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
std::default_random_engine rng = make_random_engine();
std::uniform_int_distribution<uint32_t> u(20, 200);
std::uniform_int_distribution<uint32_t> u2(500, 1000);
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
MetaServiceProxy* proxy;
RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
Contributor: why not use a new proxy every time we retry?

while (true) {
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(proxy->get(&stub));
brpc::Controller cntl;
cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
cntl.set_max_retry(kBrpcRetryTimes);
res->Clear();
(stub.get()->*method)(&cntl, &req, res, nullptr);
if (cntl.Failed()) [[unlikely]] {
error_msg = cntl.ErrorText();
proxy->set_unhealthy();
} else if (res->status().code() == MetaServiceCode::OK) {
return Status::OK();
} else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
@@ -388,11 +431,12 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_

TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet);

std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));

MetaServiceProxy* proxy;
RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
int tried = 0;
while (true) {
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(proxy->get(&stub));
brpc::Controller cntl;
cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
GetRowsetRequest req;
@@ -430,6 +474,7 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
_get_rowset_latency << latency;
int retry_times = config::meta_service_rpc_retry_times;
if (cntl.Failed()) {
proxy->set_unhealthy();
if (tried++ < retry_times) {
auto rng = make_random_engine();
std::uniform_int_distribution<uint32_t> u(20, 200);
@@ -633,15 +678,10 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
*delete_bitmap = *new_delete_bitmap;
}

std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));

int64_t new_max_version = std::max(old_max_version, rs_metas.rbegin()->end_version());
brpc::Controller cntl;
// When there are many delete bitmaps that need to be synchronized, it
// may take a longer time, especially when loading the tablet for the
// first time, so set a relatively long timeout time.
cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
GetDeleteBitmapRequest req;
GetDeleteBitmapResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
@@ -669,10 +709,43 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
}

VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString();
stub->get_delete_bitmap(&cntl, &req, &res, nullptr);
if (cntl.Failed()) {
return Status::RpcError("failed to get delete bitmap: {}", cntl.ErrorText());

int retry_times = 0;
MetaServiceProxy* proxy;
RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
auto start = std::chrono::high_resolution_clock::now();
while (true) {
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(proxy->get(&stub));
// When there are many delete bitmaps that need to be synchronized, it
// may take a longer time, especially when loading the tablet for the
// first time, so set a relatively long timeout time.
brpc::Controller cntl;
cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
cntl.set_max_retry(kBrpcRetryTimes);
res.Clear();
stub->get_delete_bitmap(&cntl, &req, &res, nullptr);
if (cntl.Failed()) [[unlikely]] {
LOG_INFO("failed to get delete bitmap")
.tag("reason", cntl.ErrorText())
.tag("tablet_id", tablet->tablet_id())
.tag("partition_id", tablet->partition_id())
.tag("tried", retry_times);
proxy->set_unhealthy();
} else {
break;
}

if (++retry_times > config::delete_bitmap_rpc_retry_times) {
if (cntl.Failed()) {
return Status::RpcError("failed to get delete bitmap, tablet={} err={}",
tablet->tablet_id(), cntl.ErrorText());
}
break;
}
}
auto end = std::chrono::high_resolution_clock::now();

if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
return Status::NotFound("failed to get delete bitmap: {}", res.status().msg());
}
@@ -722,7 +795,7 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
{rst_id, segment_ids[i], vers[i]},
roaring::Roaring::readSafe(delete_bitmaps[i].data(), delete_bitmaps[i].length()));
}
int64_t latency = cntl.latency_us();
int64_t latency = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
if (latency > 100 * 1000) { // 100ms
LOG(INFO) << "finish get_delete_bitmap rpc. rowset_ids.size()=" << rowset_ids.size()
<< ", delete_bitmaps.size()=" << delete_bitmaps.size() << ", latency=" << latency
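One consequence of moving the brpc::Controller inside the retry loop is that cntl.latency_us() would only cover the final attempt, which is why the code above times the whole loop with std::chrono instead. A small standalone illustration of that measurement (hypothetical helper, not part of the PR):

#include <chrono>
#include <cstdint>
#include <functional>

// Measure the wall-clock time of an entire retry loop in microseconds,
// mirroring the start/end timestamps taken around the get_delete_bitmap loop.
int64_t total_latency_us(const std::function<void()>& rpc_with_retries) {
    auto start = std::chrono::high_resolution_clock::now();
    rpc_with_retries();  // may issue several attempts internally
    auto end = std::chrono::high_resolution_clock::now();
    return std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
}
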
4 changes: 4 additions & 0 deletions be/src/cloud/config.cpp
@@ -77,5 +77,9 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");
DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true");

DEFINE_mBool(enable_cloud_tablet_report, "true");

DEFINE_mInt32(delete_bitmap_rpc_retry_times, "25");

DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "5000");
#include "common/compile_check_end.h"
} // namespace doris::config
4 changes: 4 additions & 0 deletions be/src/cloud/config.h
@@ -111,5 +111,9 @@ DECLARE_mBool(enable_use_cloud_unique_id_from_fe);

DECLARE_Bool(enable_cloud_tablet_report);

DECLARE_mInt32(delete_bitmap_rpc_retry_times);

DECLARE_mInt64(meta_service_rpc_reconnect_interval_ms);

#include "common/compile_check_end.h"
} // namespace doris::config
@@ -3362,4 +3362,8 @@ public static int metaServiceRpcRetryTimes() {
"For disabling certain SQL queries, the configuration item is a list of simple class names of AST"
+ "(for example CreateRepositoryStmt, CreatePolicyCommand), separated by commas."})
public static String block_sql_ast_names = "";

public static long meta_service_rpc_reconnect_interval_ms = 5000;

public static long meta_service_rpc_retry_cnt = 10;
}