Skip to content

Commit

Permalink
[enhancement](cloud) reconnect after the RPC request to the meta serv…
Browse files Browse the repository at this point in the history
…ice fails
  • Loading branch information
luwei16 committed Dec 19, 2024
1 parent 088d2fe commit efb1c77
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 59 deletions.
177 changes: 119 additions & 58 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,75 @@ class MetaServiceProxy {
public:
static Status get_client(std::shared_ptr<MetaService_Stub>* stub) {
TEST_SYNC_POINT_RETURN_WITH_VALUE("MetaServiceProxy::get_client", Status::OK(), stub);
return get_pooled_client(stub);
return get_pooled_client(stub, nullptr);
}

/// Selects a pooled MetaServiceProxy and returns a pointer to it through `proxy`.
///
/// Selection is delegated to get_pooled_client(); the stub it produces along the way is
/// discarded here, so callers are expected to fetch a stub from the returned proxy via get().
///
/// @param proxy  out-parameter receiving the address of the selected proxy.
/// @return the status reported by get_pooled_client().
static Status get_proxy(MetaServiceProxy** proxy) {
    std::shared_ptr<MetaService_Stub> discarded_stub;
    Status selection_status = get_pooled_client(&discarded_stub, proxy);
    return selection_status;
}

/// Flags this proxy's connection as possibly broken.
///
/// The flag is written under the exclusive lock on `_mutex`. It is consulted by
/// need_reconn() and cleared again by get() once a new stub has been installed.
void set_unhealthy() {
    std::unique_lock<std::shared_mutex> exclusive_guard(_mutex);
    maybe_unhealthy = true;
}

/// Tells whether the cached connection should be replaced because of a reported RPC failure.
///
/// A reconnect is due only when the proxy has been flagged via set_unhealthy() AND the
/// oldest entry of `last_reconn_time_ms` is more than `config::ms_rpc_reconn_interval_ms`
/// milliseconds older than `now`.
///
/// This function takes no lock itself; get() calls it while holding the shared lock on
/// `_mutex`.
///
/// @param now  current time in milliseconds (same clock as the values stored by get()).
bool need_reconn(long now) {
    if (!maybe_unhealthy) {
        return false;
    }
    const auto since_oldest_reconn_ms = now - last_reconn_time_ms.front();
    return since_oldest_reconn_ms > config::ms_rpc_reconn_interval_ms;
}

/// Returns, through `stub`, a MetaService stub backed by this proxy's channel.
///
/// The cached stub is handed out as long as its deadline has not passed, the proxy has not
/// exceeded its idle timeout, and need_reconn() does not ask for a reconnect. Otherwise a
/// fresh channel and stub are created, returned to the caller and published as the new
/// cached stub.
///
/// @param stub  out-parameter receiving the stub to use.
/// @return Status::OK() on success, or the error reported by init_channel().
Status get(std::shared_ptr<MetaService_Stub>* stub) {
    using namespace std::chrono;

    const auto now_ms =
            duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();

    // Fast path: reuse the cached stub while only holding the shared lock.
    {
        std::shared_lock read_guard(_mutex);
        const bool reusable =
                _deadline_ms >= now_ms && !is_idle_timeout(now_ms) && !need_reconn(now_ms);
        if (reusable) {
            _last_access_at_ms.store(now_ms, std::memory_order_relaxed);
            *stub = _stub;
            return Status::OK();
        }
    }

    // Slow path: the new channel is built without holding any lock.
    auto fresh_channel = std::make_unique<brpc::Channel>();
    if (Status init_status = init_channel(fresh_channel.get()); !init_status.ok()) [[unlikely]] {
        return init_status;
    }

    // The stub takes over ownership of the channel released from the unique_ptr.
    *stub = std::make_shared<MetaService_Stub>(fresh_channel.release(),
                                               google::protobuf::Service::STUB_OWNS_CHANNEL);

    // Connection age only works without list endpoint; in every other case the stub never
    // expires by age. The age is drawn uniformly from [base, 2 * base] seconds.
    long expire_at_ms = LONG_MAX;
    if (!is_meta_service_endpoint_list &&
        config::meta_service_connection_age_base_seconds > 0) {
        std::default_random_engine rng(static_cast<uint32_t>(now_ms));
        std::uniform_int_distribution<> age_seconds(
                config::meta_service_connection_age_base_seconds,
                config::meta_service_connection_age_base_seconds * 2);
        expire_at_ms = now_ms + duration_cast<milliseconds>(seconds(age_seconds(rng))).count();
    }

    // Concurrent rebuilders are not serialized against each other: the last one to take the
    // exclusive lock wins and its stub becomes the cached one.
    std::unique_lock write_guard(_mutex);
    _stub = *stub;
    _deadline_ms = expire_at_ms;
    _last_access_at_ms.store(now_ms, std::memory_order_relaxed);

    // Record this (re)connect and drop the oldest record so the history keeps its size,
    // then clear the failure flag for the new connection.
    last_reconn_time_ms.push(now_ms);
    last_reconn_time_ms.pop();
    maybe_unhealthy = false;

    return Status::OK();
}

private:
static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub) {
static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub,
MetaServiceProxy** proxy) {
static std::once_flag proxies_flag;
static size_t num_proxies = 1;
static std::atomic<size_t> index(0);
Expand All @@ -163,10 +227,16 @@ class MetaServiceProxy {
for (size_t i = 0; i + 1 < num_proxies; ++i) {
size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
Status s = proxies[next_index].get(stub);
if (proxy != nullptr) {
*proxy = &(proxies[next_index]);
}
if (s.ok()) return Status::OK();
}

size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
if (proxy != nullptr) {
*proxy = &(proxies[next_index]);
}
return proxies[next_index].get(stub);
}

Expand Down Expand Up @@ -219,55 +289,15 @@ class MetaServiceProxy {
_last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now;
}

// Returns, through `stub`, a MetaService stub for this proxy.
//
// The cached `_stub` is reused while `_deadline_ms` has not passed and the proxy is not
// idle-timed-out; otherwise a new channel and stub are created and cached.
//
// Returns Status::OK() on success, or the error reported by init_channel().
Status get(std::shared_ptr<MetaService_Stub>* stub) {
using namespace std::chrono;

// Current wall-clock time in milliseconds since the epoch.
auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
{
// Fast path: only a shared lock is needed to hand out the cached stub.
std::shared_lock lock(_mutex);
if (_deadline_ms >= now && !is_idle_timeout(now)) {
_last_access_at_ms.store(now, std::memory_order_relaxed);
*stub = _stub;
return Status::OK();
}
}

// Slow path: build a new channel without holding the lock.
auto channel = std::make_unique<brpc::Channel>();
Status s = init_channel(channel.get());
if (!s.ok()) [[unlikely]] {
return s;
}

// The stub takes ownership of the channel released from the unique_ptr.
*stub = std::make_shared<MetaService_Stub>(channel.release(),
google::protobuf::Service::STUB_OWNS_CHANNEL);

long deadline = now;
// Connection age only works without list endpoint: the age is drawn uniformly from
// [base, 2 * base] seconds; otherwise the stub never expires by age.
if (!is_meta_service_endpoint_list &&
config::meta_service_connection_age_base_seconds > 0) {
std::default_random_engine rng(static_cast<uint32_t>(now));
std::uniform_int_distribution<> uni(
config::meta_service_connection_age_base_seconds,
config::meta_service_connection_age_base_seconds * 2);
deadline = now + duration_cast<milliseconds>(seconds(uni(rng))).count();
} else {
deadline = LONG_MAX;
}

// Last one wins: concurrent callers may each build a stub; whichever takes the exclusive
// lock last has its stub cached.
std::unique_lock lock(_mutex);
_last_access_at_ms.store(now, std::memory_order_relaxed);
_deadline_ms = deadline;
_stub = *stub;
return Status::OK();
}

static std::atomic_bool is_meta_service_endpoint_list;

std::shared_mutex _mutex;
std::atomic<long> _last_access_at_ms {0};
long _deadline_ms {0};
std::shared_ptr<MetaService_Stub> _stub;

std::queue<long> last_reconn_time_ms {std::deque<long> {0, 0, 0}};
bool maybe_unhealthy = false;
};

std::atomic_bool MetaServiceProxy::is_meta_service_endpoint_list = false;
Expand Down Expand Up @@ -326,16 +356,19 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
std::default_random_engine rng = make_random_engine();
std::uniform_int_distribution<uint32_t> u(20, 200);
std::uniform_int_distribution<uint32_t> u2(500, 1000);
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
MetaServiceProxy* proxy;
RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
while (true) {
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(proxy->get(&stub));
brpc::Controller cntl;
cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
cntl.set_max_retry(kBrpcRetryTimes);
res->Clear();
(stub.get()->*method)(&cntl, &req, res, nullptr);
if (cntl.Failed()) [[unlikely]] {
error_msg = cntl.ErrorText();
proxy->set_unhealthy();
} else if (res->status().code() == MetaServiceCode::OK) {
return Status::OK();
} else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
Expand Down Expand Up @@ -391,11 +424,12 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_

TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet);

std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));

MetaServiceProxy* proxy;
RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
int tried = 0;
while (true) {
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(proxy->get(&stub));
brpc::Controller cntl;
cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
GetRowsetRequest req;
Expand Down Expand Up @@ -447,6 +481,7 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
.tag("partition_id", tablet->partition_id())
.tag("tried", tried)
.tag("sleep", duration_ms);
proxy->set_unhealthy();
continue;
}
return Status::RpcError("failed to get rowset meta: {}", cntl.ErrorText());
Expand Down Expand Up @@ -636,15 +671,10 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
*delete_bitmap = *new_delete_bitmap;
}

std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));

int64_t new_max_version = std::max(old_max_version, rs_metas.rbegin()->end_version());
brpc::Controller cntl;
// When there are many delete bitmaps that need to be synchronized, it
// may take a longer time, especially when loading the tablet for the
// first time, so set a relatively long timeout time.
cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
GetDeleteBitmapRequest req;
GetDeleteBitmapResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand Down Expand Up @@ -672,10 +702,41 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
}

VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString();
stub->get_delete_bitmap(&cntl, &req, &res, nullptr);
if (cntl.Failed()) {
return Status::RpcError("failed to get delete bitmap: {}", cntl.ErrorText());

// Send GetDeleteBitmapRequest, retrying on transport-level failures up to
// config::delete_bitmap_rpc_retry_times additional attempts.
int retry_times = 0;
// Kept outside the loop so the controller of the last attempt stays visible afterwards.
brpc::Controller cntl;
MetaServiceProxy* proxy = nullptr;
RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
while (true) {
    // Fetch the stub again on every attempt: after set_unhealthy() the proxy may hand out
    // a stub backed by a new connection.
    std::shared_ptr<MetaService_Stub> stub;
    RETURN_IF_ERROR(proxy->get(&stub));
    // A brpc::Controller carries the state of a single RPC (failure flag, error text,
    // retry count, ...). It must be reset before it is used for another call, otherwise
    // the state of the previous failed attempt leaks into this one.
    cntl.Reset();
    // When there are many delete bitmaps that need to be synchronized, it
    // may take a longer time, especially when loading the tablet for the
    // first time, so set a relatively long timeout time.
    cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
    cntl.set_max_retry(kBrpcRetryTimes);
    res.Clear();
    stub->get_delete_bitmap(&cntl, &req, &res, nullptr);
    if (!cntl.Failed()) [[likely]] {
        // The RPC went through; the response status is checked after the loop.
        break;
    }

    LOG_INFO("failed to get delete bitmap")
            .tag("reason", cntl.ErrorText())
            .tag("tablet_id", tablet->tablet_id())
            .tag("partition_id", tablet->partition_id())
            .tag("tried", retry_times);
    // Let the proxy know so that a later get() may reconnect.
    proxy->set_unhealthy();

    if (++retry_times > config::delete_bitmap_rpc_retry_times) {
        return Status::RpcError("failed to get delete bitmap, tablet={} err={}",
                                tablet->tablet_id(), cntl.ErrorText());
    }
}

if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
return Status::NotFound("failed to get delete bitmap: {}", res.status().msg());
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,9 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");
DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true");

DEFINE_mBool(enable_cloud_tablet_report, "true");

DEFINE_mInt32(delete_bitmap_rpc_retry_times, "25");

DEFINE_mInt64(ms_rpc_reconn_interval_ms, "20000");
#include "common/compile_check_end.h"
} // namespace doris::config
4 changes: 4 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,9 @@ DECLARE_mBool(enable_use_cloud_unique_id_from_fe);

DECLARE_Bool(enable_cloud_tablet_report);

DECLARE_mInt32(delete_bitmap_rpc_retry_times);

DECLARE_mInt64(ms_rpc_reconn_interval_ms);

#include "common/compile_check_end.h"
} // namespace doris::config
Original file line number Diff line number Diff line change
Expand Up @@ -3307,4 +3307,6 @@ public static int metaServiceRpcRetryTimes() {
"For disabling certain SQL queries, the configuration item is a list of simple class names of AST"
+ "(for example CreateRepositoryStmt, CreatePolicyCommand), separated by commas."})
public static String block_sql_ast_names = "";

public static long ms_rpc_reconn_interval_ms = 20000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -41,6 +43,7 @@ public class MetaServiceProxy {
// use concurrent map to allow access serviceMap in multi thread.
private ReentrantLock lock = new ReentrantLock();
private final Map<String, MetaServiceClient> serviceMap;
private Queue<Long> lastConnTimeMs = new LinkedList<>();

static {
if (Config.isCloudMode() && (Config.meta_service_endpoint == null || Config.meta_service_endpoint.isEmpty())) {
Expand All @@ -50,6 +53,9 @@ public class MetaServiceProxy {

public MetaServiceProxy() {
this.serviceMap = Maps.newConcurrentMap();
for (int i = 0; i < 3; ++i) {
lastConnTimeMs.add(0L);
}
}

private static class SingletonHolder {
Expand Down Expand Up @@ -77,6 +83,16 @@ public static MetaServiceProxy getInstance() {
return MetaServiceProxy.SingletonHolder.get();
}

/**
 * Reports whether more than {@code Config.ms_rpc_reconn_interval_ms} milliseconds have
 * passed since the oldest timestamp held in {@code lastConnTimeMs}.
 *
 * <p>The queue is read while holding {@code lock}.
 *
 * @return {@code true} if the oldest recorded connect time is older than the interval
 */
public boolean needReconn() {
    lock.lock();
    try {
        final long nowMs = System.currentTimeMillis();
        final long oldestConnMs = lastConnTimeMs.element();
        final long elapsedMs = nowMs - oldestConnMs;
        return elapsedMs > Config.ms_rpc_reconn_interval_ms;
    } finally {
        lock.unlock();
    }
}

public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest request)
throws RpcException {
try {
Expand Down Expand Up @@ -138,6 +154,8 @@ private MetaServiceClient getProxy() {
if (service == null) {
service = new MetaServiceClient(address);
serviceMap.put(address, service);
lastConnTimeMs.add(System.currentTimeMillis());
lastConnTimeMs.remove();
}
return service;
} finally {
Expand All @@ -158,16 +176,26 @@ public MetaServiceClientWrapper(MetaServiceProxy proxy) {
public <Response> Response executeRequest(Function<MetaServiceClient, Response> function) throws RpcException {
int tried = 0;
while (tried++ < 3) {
MetaServiceClient client = null;
try {
MetaServiceClient client = proxy.getProxy();
client = proxy.getProxy();
return function.apply(client);
} catch (StatusRuntimeException sre) {
if (sre.getStatus().getCode() == Status.Code.UNAVAILABLE || tried == 3) {
if (proxy.needReconn() && client != null) {
client.shutdown(true);
}
throw new RpcException("", sre.getMessage(), sre);
}
} catch (Exception e) {
if (proxy.needReconn() && client != null) {
client.shutdown(true);
}
throw new RpcException("", e.getMessage(), e);
} catch (Throwable t) {
if (proxy.needReconn() && client != null) {
client.shutdown(true);
}
throw new RpcException("", t.getMessage());
}
}
Expand Down

0 comments on commit efb1c77

Please sign in to comment.