diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java index 68a6b790a39d..ed2d32a7360e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java @@ -28,6 +28,20 @@ public class GrpcConfig { public static final String GRPC_TLS_PREFIX = "tls"; public static final String CONFIG_USE_PLAIN_TEXT = "usePlainText"; public static final String CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE = "maxInboundMessageSizeBytes"; + + private static final String CONFIG_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS = "channelShutdownTimeoutSeconds"; + private static final int DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS = 10; + + // KeepAlive configs + private static final String CONFIG_CHANNEL_KEEP_ALIVE_TIME_SECONDS = "channelKeepAliveTimeSeconds"; + private static final int DEFAULT_CHANNEL_KEEP_ALIVE_TIME_SECONDS = -1; // Set value > 0 to enable keep alive + + private static final String CONFIG_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS = "channelKeepAliveTimeoutSeconds"; + private static final int DEFAULT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS = 20; // 20 seconds + + private static final String CONFIG_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS = "channelKeepAliveWithoutCalls"; + private static final boolean DEFAULT_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS = true; + // Default max message size to 128MB public static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 * 1024 * 1024; // Default use plain text for transport @@ -69,6 +83,23 @@ public boolean isUsePlainText() { return Boolean.parseBoolean(_pinotConfig.getProperty(CONFIG_USE_PLAIN_TEXT, DEFAULT_IS_USE_PLAIN_TEXT)); } + public int getChannelShutdownTimeoutSecond() { + return _pinotConfig.getProperty(CONFIG_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS, DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS); + } + + public int getChannelKeepAliveTimeSeconds() { + return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_TIME_SECONDS, DEFAULT_CHANNEL_KEEP_ALIVE_TIME_SECONDS); + } + + public int getChannelKeepAliveTimeoutSeconds() { + return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS, + DEFAULT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS); + } + + public boolean isChannelKeepAliveWithoutCalls() { + return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS, DEFAULT_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS); + } + public TlsConfig getTlsConfig() { return _tlsConfig; } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java index a41a30c5d419..9987876b9580 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java @@ -45,7 +45,7 @@ public class GrpcQueryClient implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryClient.class); - private static final int DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND = 10; + // the key is the hashCode of the TlsConfig, the value is the SslContext // We don't use TlsConfig as the map key because the TlsConfig is mutable, which means the hashCode can change. If the // hashCode changes and the map is resized, the SslContext of the old hashCode will be lost. @@ -53,22 +53,35 @@ public class GrpcQueryClient implements Closeable { private final ManagedChannel _managedChannel; private final PinotQueryServerGrpc.PinotQueryServerBlockingStub _blockingStub; + private final int _channelShutdownTimeoutSeconds; public GrpcQueryClient(String host, int port) { this(host, port, new GrpcConfig(Collections.emptyMap())); } public GrpcQueryClient(String host, int port, GrpcConfig config) { + ManagedChannelBuilder channelBuilder; if (config.isUsePlainText()) { - _managedChannel = + channelBuilder = ManagedChannelBuilder.forAddress(host, port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()) - .usePlaintext().build(); + .usePlaintext(); } else { - _managedChannel = + channelBuilder = NettyChannelBuilder.forAddress(host, port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()) - .sslContext(buildSslContext(config.getTlsConfig())).build(); + .sslContext(buildSslContext(config.getTlsConfig())); + } + + // Set keep alive configs, if enabled + int channelKeepAliveTimeSeconds = config.getChannelKeepAliveTimeSeconds(); + if (channelKeepAliveTimeSeconds > 0) { + channelBuilder.keepAliveTime(channelKeepAliveTimeSeconds, TimeUnit.SECONDS) + .keepAliveTimeout(config.getChannelKeepAliveTimeoutSeconds(), TimeUnit.SECONDS) + .keepAliveWithoutCalls(config.isChannelKeepAliveWithoutCalls()); } + + _managedChannel = channelBuilder.build(); _blockingStub = PinotQueryServerGrpc.newBlockingStub(_managedChannel); + _channelShutdownTimeoutSeconds = config.getChannelShutdownTimeoutSecond(); } private SslContext buildSslContext(TlsConfig tlsConfig) { @@ -103,7 +116,7 @@ public void close() { if (!_managedChannel.isShutdown()) { try { _managedChannel.shutdownNow(); - if (!_managedChannel.awaitTermination(DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND, TimeUnit.SECONDS)) { + if (!_managedChannel.awaitTermination(_channelShutdownTimeoutSeconds, TimeUnit.SECONDS)) { LOGGER.warn("Timed out forcefully shutting down connection: {}. ", _managedChannel); } } catch (Exception e) {