diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 2cd6b58c57b5f6..bac9e3c99965a2 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -139,7 +139,71 @@ class MetaServiceProxy { public: static Status get_client(std::shared_ptr* stub) { TEST_SYNC_POINT_RETURN_WITH_VALUE("MetaServiceProxy::get_client", Status::OK(), stub); - return get_pooled_client(stub); + return get_pooled_client(stub, nullptr); + } + + static Status get_proxy(MetaServiceProxy** proxy) { + // The 'stub' is a useless parameter, added only to reuse the `get_pooled_client` function. + std::shared_ptr stub; + return get_pooled_client(&stub, proxy); + } + + void set_unhealthy() { + std::unique_lock lock(_mutex); + maybe_unhealthy = true; + } + + bool need_reconn(long now) { + return maybe_unhealthy && ((now - last_reconn_time_ms.front()) > + config::meta_service_rpc_reconnect_interval_ms); + } + + Status get(std::shared_ptr* stub) { + using namespace std::chrono; + + auto now = duration_cast(system_clock::now().time_since_epoch()).count(); + { + std::shared_lock lock(_mutex); + if (_deadline_ms >= now && !is_idle_timeout(now) && !need_reconn(now)) { + _last_access_at_ms.store(now, std::memory_order_relaxed); + *stub = _stub; + return Status::OK(); + } + } + + auto channel = std::make_unique(); + Status s = init_channel(channel.get()); + if (!s.ok()) [[unlikely]] { + return s; + } + + *stub = std::make_shared(channel.release(), + google::protobuf::Service::STUB_OWNS_CHANNEL); + + long deadline = now; + // connection age only works without list endpoint. + if (!is_meta_service_endpoint_list() && + config::meta_service_connection_age_base_seconds > 0) { + std::default_random_engine rng(static_cast(now)); + std::uniform_int_distribution<> uni( + config::meta_service_connection_age_base_seconds, + config::meta_service_connection_age_base_seconds * 2); + deadline = now + duration_cast(seconds(uni(rng))).count(); + } else { + deadline = LONG_MAX; + } + + // Last one WIN + std::unique_lock lock(_mutex); + _last_access_at_ms.store(now, std::memory_order_relaxed); + _deadline_ms = deadline; + _stub = *stub; + + last_reconn_time_ms.push(now); + last_reconn_time_ms.pop(); + maybe_unhealthy = false; + + return Status::OK(); } private: @@ -147,7 +211,17 @@ class MetaServiceProxy { return config::meta_service_endpoint.find(',') != std::string::npos; } - static Status get_pooled_client(std::shared_ptr* stub) { + /** + * This function initializes a pool of `MetaServiceProxy` objects and selects one using + * round-robin. It returns a client stub via the selected proxy. + * + * @param stub A pointer to a shared pointer of `MetaService_Stub` to be retrieved. + * @param proxy (Optional) A pointer to store the selected `MetaServiceProxy`. + * + * @return Status Returns `Status::OK()` on success or an error status on failure. + */ + static Status get_pooled_client(std::shared_ptr* stub, + MetaServiceProxy** proxy) { static std::once_flag proxies_flag; static size_t num_proxies = 1; static std::atomic index(0); @@ -164,10 +238,16 @@ class MetaServiceProxy { for (size_t i = 0; i + 1 < num_proxies; ++i) { size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies; Status s = proxies[next_index].get(stub); + if (proxy != nullptr) { + *proxy = &(proxies[next_index]); + } if (s.ok()) return Status::OK(); } size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies; + if (proxy != nullptr) { + *proxy = &(proxies[next_index]); + } return proxies[next_index].get(stub); } @@ -220,53 +300,13 @@ class MetaServiceProxy { _last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now; } - Status get(std::shared_ptr* stub) { - using namespace std::chrono; - - auto now = duration_cast(system_clock::now().time_since_epoch()).count(); - { - std::shared_lock lock(_mutex); - if (_deadline_ms >= now && !is_idle_timeout(now)) { - _last_access_at_ms.store(now, std::memory_order_relaxed); - *stub = _stub; - return Status::OK(); - } - } - - auto channel = std::make_unique(); - Status s = init_channel(channel.get()); - if (!s.ok()) [[unlikely]] { - return s; - } - - *stub = std::make_shared(channel.release(), - google::protobuf::Service::STUB_OWNS_CHANNEL); - - long deadline = now; - // connection age only works without list endpoint. - if (!is_meta_service_endpoint_list() && - config::meta_service_connection_age_base_seconds > 0) { - std::default_random_engine rng(static_cast(now)); - std::uniform_int_distribution<> uni( - config::meta_service_connection_age_base_seconds, - config::meta_service_connection_age_base_seconds * 2); - deadline = now + duration_cast(seconds(uni(rng))).count(); - } else { - deadline = LONG_MAX; - } - - // Last one WIN - std::unique_lock lock(_mutex); - _last_access_at_ms.store(now, std::memory_order_relaxed); - _deadline_ms = deadline; - _stub = *stub; - return Status::OK(); - } - std::shared_mutex _mutex; std::atomic _last_access_at_ms {0}; long _deadline_ms {0}; std::shared_ptr _stub; + + std::queue last_reconn_time_ms {std::deque {0, 0, 0}}; + bool maybe_unhealthy = false; }; template @@ -323,9 +363,11 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res, std::default_random_engine rng = make_random_engine(); std::uniform_int_distribution u(20, 200); std::uniform_int_distribution u2(500, 1000); - std::shared_ptr stub; - RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub)); + MetaServiceProxy* proxy; + RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); while (true) { + std::shared_ptr stub; + RETURN_IF_ERROR(proxy->get(&stub)); brpc::Controller cntl; cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); cntl.set_max_retry(kBrpcRetryTimes); @@ -333,6 +375,7 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res, (stub.get()->*method)(&cntl, &req, res, nullptr); if (cntl.Failed()) [[unlikely]] { error_msg = cntl.ErrorText(); + proxy->set_unhealthy(); } else if (res->status().code() == MetaServiceCode::OK) { return Status::OK(); } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { @@ -388,11 +431,12 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_ TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet); - std::shared_ptr stub; - RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub)); - + MetaServiceProxy* proxy; + RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); int tried = 0; while (true) { + std::shared_ptr stub; + RETURN_IF_ERROR(proxy->get(&stub)); brpc::Controller cntl; cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); GetRowsetRequest req; @@ -430,6 +474,7 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_ _get_rowset_latency << latency; int retry_times = config::meta_service_rpc_retry_times; if (cntl.Failed()) { + proxy->set_unhealthy(); if (tried++ < retry_times) { auto rng = make_random_engine(); std::uniform_int_distribution u(20, 200); @@ -633,15 +678,10 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_ *delete_bitmap = *new_delete_bitmap; } - std::shared_ptr stub; - RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub)); - int64_t new_max_version = std::max(old_max_version, rs_metas.rbegin()->end_version()); - brpc::Controller cntl; // When there are many delete bitmaps that need to be synchronized, it // may take a longer time, especially when loading the tablet for the // first time, so set a relatively long timeout time. - cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); GetDeleteBitmapRequest req; GetDeleteBitmapResponse res; req.set_cloud_unique_id(config::cloud_unique_id); @@ -669,10 +709,43 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_ } VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString(); - stub->get_delete_bitmap(&cntl, &req, &res, nullptr); - if (cntl.Failed()) { - return Status::RpcError("failed to get delete bitmap: {}", cntl.ErrorText()); + + int retry_times = 0; + MetaServiceProxy* proxy; + RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); + auto start = std::chrono::high_resolution_clock::now(); + while (true) { + std::shared_ptr stub; + RETURN_IF_ERROR(proxy->get(&stub)); + // When there are many delete bitmaps that need to be synchronized, it + // may take a longer time, especially when loading the tablet for the + // first time, so set a relatively long timeout time. + brpc::Controller cntl; + cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); + cntl.set_max_retry(kBrpcRetryTimes); + res.Clear(); + stub->get_delete_bitmap(&cntl, &req, &res, nullptr); + if (cntl.Failed()) [[unlikely]] { + LOG_INFO("failed to get delete bitmap") + .tag("reason", cntl.ErrorText()) + .tag("tablet_id", tablet->tablet_id()) + .tag("partition_id", tablet->partition_id()) + .tag("tried", retry_times); + proxy->set_unhealthy(); + } else { + break; + } + + if (++retry_times > config::delete_bitmap_rpc_retry_times) { + if (cntl.Failed()) { + return Status::RpcError("failed to get delete bitmap, tablet={} err={}", + tablet->tablet_id(), cntl.ErrorText()); + } + break; + } } + auto end = std::chrono::high_resolution_clock::now(); + if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) { return Status::NotFound("failed to get delete bitmap: {}", res.status().msg()); } @@ -722,7 +795,7 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_ {rst_id, segment_ids[i], vers[i]}, roaring::Roaring::readSafe(delete_bitmaps[i].data(), delete_bitmaps[i].length())); } - int64_t latency = cntl.latency_us(); + int64_t latency = std::chrono::duration_cast(end - start).count(); if (latency > 100 * 1000) { // 100ms LOG(INFO) << "finish get_delete_bitmap rpc. rowset_ids.size()=" << rowset_ids.size() << ", delete_bitmaps.size()=" << delete_bitmaps.size() << ", latency=" << latency diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index f90bf536f63018..66e584905afc98 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -77,5 +77,9 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120"); DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true"); DEFINE_mBool(enable_cloud_tablet_report, "true"); + +DEFINE_mInt32(delete_bitmap_rpc_retry_times, "25"); + +DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "5000"); #include "common/compile_check_end.h" } // namespace doris::config diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index a8a7c0c48ec91f..50f058bf8b0c79 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -111,5 +111,9 @@ DECLARE_mBool(enable_use_cloud_unique_id_from_fe); DECLARE_Bool(enable_cloud_tablet_report); +DECLARE_mInt32(delete_bitmap_rpc_retry_times); + +DECLARE_mInt64(meta_service_rpc_reconnect_interval_ms); + #include "common/compile_check_end.h" } // namespace doris::config diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index c796a59453750c..765fca5274a7a1 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3362,4 +3362,8 @@ public static int metaServiceRpcRetryTimes() { "For disabling certain SQL queries, the configuration item is a list of simple class names of AST" + "(for example CreateRepositoryStmt, CreatePolicyCommand), separated by commas."}) public static String block_sql_ast_names = ""; + + public static long meta_service_rpc_reconnect_interval_ms = 5000; + + public static long meta_service_rpc_retry_cnt = 10; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index d7f718e3ca46bf..2a69132e44bcd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -27,7 +27,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.LinkedList; import java.util.Map; +import java.util.Queue; +import java.util.Random; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -41,6 +44,7 @@ public class MetaServiceProxy { // use concurrent map to allow access serviceMap in multi thread. private ReentrantLock lock = new ReentrantLock(); private final Map serviceMap; + private Queue lastConnTimeMs = new LinkedList<>(); static { if (Config.isCloudMode() && (Config.meta_service_endpoint == null || Config.meta_service_endpoint.isEmpty())) { @@ -50,6 +54,9 @@ public class MetaServiceProxy { public MetaServiceProxy() { this.serviceMap = Maps.newConcurrentMap(); + for (int i = 0; i < 3; ++i) { + lastConnTimeMs.add(0L); + } } private static class SingletonHolder { @@ -77,6 +84,16 @@ public static MetaServiceProxy getInstance() { return MetaServiceProxy.SingletonHolder.get(); } + public boolean needReconn() { + lock.lock(); + try { + long now = System.currentTimeMillis(); + return (now - lastConnTimeMs.element() > Config.meta_service_rpc_reconnect_interval_ms); + } finally { + lock.unlock(); + } + } + public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest request) throws RpcException { try { @@ -138,6 +155,8 @@ private MetaServiceClient getProxy() { if (service == null) { service = new MetaServiceClient(address); serviceMap.put(address, service); + lastConnTimeMs.add(System.currentTimeMillis()); + lastConnTimeMs.remove(); } return service; } finally { @@ -150,6 +169,7 @@ private MetaServiceClient getProxy() { public static class MetaServiceClientWrapper { private final MetaServiceProxy proxy; + private Random random = new Random(); public MetaServiceClientWrapper(MetaServiceProxy proxy) { this.proxy = proxy; @@ -157,18 +177,40 @@ public MetaServiceClientWrapper(MetaServiceProxy proxy) { public Response executeRequest(Function function) throws RpcException { int tried = 0; - while (tried++ < 3) { + while (tried++ < Config.meta_service_rpc_retry_cnt) { + MetaServiceClient client = null; try { - MetaServiceClient client = proxy.getProxy(); + client = proxy.getProxy(); return function.apply(client); } catch (StatusRuntimeException sre) { - if (sre.getStatus().getCode() == Status.Code.UNAVAILABLE || tried == 3) { + LOG.info("failed to request meta servive code {}, msg {}, trycnt {}", sre.getStatus().getCode(), + sre.getMessage(), tried); + if (tried >= Config.meta_service_rpc_retry_cnt + || (sre.getStatus().getCode() != Status.Code.UNAVAILABLE + && sre.getStatus().getCode() != Status.Code.UNKNOWN)) { throw new RpcException("", sre.getMessage(), sre); } } catch (Exception e) { - throw new RpcException("", e.getMessage(), e); + LOG.info("failed to request meta servive trycnt {}", tried, e); + if (tried >= Config.meta_service_rpc_retry_cnt) { + throw new RpcException("", e.getMessage(), e); + } } catch (Throwable t) { - throw new RpcException("", t.getMessage()); + LOG.info("failed to request meta servive trycnt {}", tried, t); + if (tried >= Config.meta_service_rpc_retry_cnt) { + throw new RpcException("", t.getMessage()); + } + } + + if (proxy.needReconn() && client != null) { + client.shutdown(true); + } + + int delay = 20 + random.nextInt(200 - 20 + 1); + try { + Thread.sleep(delay); + } catch (InterruptedException interruptedException) { + // ignore } } return null; // impossible and unreachable, just make the compiler happy