From 565f80eabf9f7b4bc4deca367ed4f06529151f9e Mon Sep 17 00:00:00 2001 From: Mayank Shrivastava Date: Thu, 4 Jul 2024 15:10:30 -0700 Subject: [PATCH] Add configs to specify keepAlive and shutdownTimeout for GrpcQueryClient. - Added `channelShutdownTimeoutSecond` config for `GrpcQueryClient` with default of 10s. - Added configs for keep-alive: - `channelKeepAliveEnabled` to enable/disable the feature, default false. - `channelKeepAliveTimeiSeconds` to configures the interval for sending keep-alive pings, default 300 seconds (5 minutes). - `channelKeepAliveTimeoutSeconds` configures the timeout for waiting for a ping acknowledgment, default 300 seconds (5 minutes). - `channelKeepAliveWithoutCalls` ensures pings are sent even when there are no active calls, keeping the connection alive during idle period, default true. --- .../pinot/common/config/GrpcConfig.java | 38 +++++++++++++++++++ .../common/utils/grpc/GrpcQueryClient.java | 18 +++++++-- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java index 68a6b790a39d..7dd83f277e8b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java @@ -28,6 +28,23 @@ public class GrpcConfig { public static final String GRPC_TLS_PREFIX = "tls"; public static final String CONFIG_USE_PLAIN_TEXT = "usePlainText"; public static final String CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE = "maxInboundMessageSizeBytes"; + + private static final String CONFIG_CHANNEL_SHUTDOWN_TIMEOUT_SECOND = "channelShutdownTimeoutSecond"; + private static final int DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND = 10; + + // KeepAlive configs + private static final String CONFIG_CHANNEL_KEEP_ALIVE_ENABLED = "channelKeepAliveEnabled"; // To control keep alive on no usage + private static final boolean DEFAULT_CHANNEL_KEEP_ALIVE_ENABLED = false; + + private static final String CONFIG_CHANNEL_KEEP_ALIVE_TIME_SECONDS = "channelKeepAliveTimeSeconds"; + private static final int DEFAULT_CHANNEL_KEEP_ALIVE_TIME_SECONDS = 300; // 5 minutes + + private static final String CONFIG_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS = "channelKeepAliveTimeoutSeconds"; + private static final int DEFAULT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS = 300; // 5 minutes + + private static final String CONFIG_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS = "channelKeepAliveWithoutCalls"; + private static final boolean DEFAULT_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS = true; + // Default max message size to 128MB public static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 * 1024 * 1024; // Default use plain text for transport @@ -69,6 +86,27 @@ public boolean isUsePlainText() { return Boolean.parseBoolean(_pinotConfig.getProperty(CONFIG_USE_PLAIN_TEXT, DEFAULT_IS_USE_PLAIN_TEXT)); } + public int getChannelShutdownTimeoutSecond() { + return _pinotConfig.getProperty(CONFIG_CHANNEL_SHUTDOWN_TIMEOUT_SECOND, DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND); + } + + public boolean getChannelKeepAliveEnabled() { + return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_ENABLED, DEFAULT_CHANNEL_KEEP_ALIVE_ENABLED); + } + + public int getChannelKeepAliveTimeSeconds() { + return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_TIME_SECONDS, DEFAULT_CHANNEL_KEEP_ALIVE_TIME_SECONDS); + } + + public int getChannelKeepAliveTimeoutSeconds() { + return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS, + DEFAULT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS); + } + + public boolean getChannelKeepAliveWithoutCalls() { + return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS, DEFAULT_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS); + } + public TlsConfig getTlsConfig() { return _tlsConfig; } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java index a41a30c5d419..54761e26115b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java @@ -45,7 +45,7 @@ public class GrpcQueryClient implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryClient.class); - private static final int DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND = 10; + // the key is the hashCode of the TlsConfig, the value is the SslContext // We don't use TlsConfig as the map key because the TlsConfig is mutable, which means the hashCode can change. If the // hashCode changes and the map is resized, the SslContext of the old hashCode will be lost. @@ -53,6 +53,7 @@ public class GrpcQueryClient implements Closeable { private final ManagedChannel _managedChannel; private final PinotQueryServerGrpc.PinotQueryServerBlockingStub _blockingStub; + private final int _channelShutdownTimeoutSeconds; public GrpcQueryClient(String host, int port) { this(host, port, new GrpcConfig(Collections.emptyMap())); @@ -64,11 +65,20 @@ public GrpcQueryClient(String host, int port, GrpcConfig config) { ManagedChannelBuilder.forAddress(host, port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()) .usePlaintext().build(); } else { - _managedChannel = + NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()) - .sslContext(buildSslContext(config.getTlsConfig())).build(); + .sslContext(buildSslContext(config.getTlsConfig())); + + if (config.getChannelKeepAliveEnabled()) { + channelBuilder.keepAliveTime(config.getChannelKeepAliveTimeSeconds(), TimeUnit.SECONDS) + .keepAliveTimeout(config.getChannelKeepAliveTimeoutSeconds(), TimeUnit.SECONDS) + .keepAliveWithoutCalls(config.getChannelKeepAliveWithoutCalls()); + } + + _managedChannel = channelBuilder.build(); } _blockingStub = PinotQueryServerGrpc.newBlockingStub(_managedChannel); + _channelShutdownTimeoutSeconds = config.getChannelShutdownTimeoutSecond(); } private SslContext buildSslContext(TlsConfig tlsConfig) { @@ -103,7 +113,7 @@ public void close() { if (!_managedChannel.isShutdown()) { try { _managedChannel.shutdownNow(); - if (!_managedChannel.awaitTermination(DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND, TimeUnit.SECONDS)) { + if (!_managedChannel.awaitTermination(_channelShutdownTimeoutSeconds, TimeUnit.SECONDS)) { LOGGER.warn("Timed out forcefully shutting down connection: {}. ", _managedChannel); } } catch (Exception e) {