Skip to content

Commit

Permalink
Add configs to specify keepAlive and shutdownTimeout for GrpcQueryCli…
Browse files Browse the repository at this point in the history
…ent.

- Added `channelShutdownTimeoutSecond` config for `GrpcQueryClient` with default of 10s.
- Added configs for keep-alive:
  - `channelKeepAliveEnabled` to enable/disable the feature, default false.
  - `channelKeepAliveTimeiSeconds` to configures the interval for sending keep-alive pings,
     default 300 seconds (5 minutes).
  - `channelKeepAliveTimeoutSeconds` configures the timeout for waiting for a ping acknowledgment,
     default 300 seconds (5 minutes).
  - `channelKeepAliveWithoutCalls` ensures pings are sent even when there are no active calls,
     keeping the connection alive during idle period, default true.
  • Loading branch information
mayankshriv committed Jul 4, 2024
1 parent cf1a0f6 commit 565f80e
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@ public class GrpcConfig {
public static final String GRPC_TLS_PREFIX = "tls";
public static final String CONFIG_USE_PLAIN_TEXT = "usePlainText";
public static final String CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE = "maxInboundMessageSizeBytes";

private static final String CONFIG_CHANNEL_SHUTDOWN_TIMEOUT_SECOND = "channelShutdownTimeoutSecond";
private static final int DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND = 10;

// KeepAlive configs
private static final String CONFIG_CHANNEL_KEEP_ALIVE_ENABLED = "channelKeepAliveEnabled"; // To control keep alive on no usage
private static final boolean DEFAULT_CHANNEL_KEEP_ALIVE_ENABLED = false;

private static final String CONFIG_CHANNEL_KEEP_ALIVE_TIME_SECONDS = "channelKeepAliveTimeSeconds";
private static final int DEFAULT_CHANNEL_KEEP_ALIVE_TIME_SECONDS = 300; // 5 minutes

private static final String CONFIG_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS = "channelKeepAliveTimeoutSeconds";
private static final int DEFAULT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS = 300; // 5 minutes

private static final String CONFIG_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS = "channelKeepAliveWithoutCalls";
private static final boolean DEFAULT_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS = true;

// Default max message size to 128MB
public static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 * 1024 * 1024;
// Default use plain text for transport
Expand Down Expand Up @@ -69,6 +86,27 @@ public boolean isUsePlainText() {
return Boolean.parseBoolean(_pinotConfig.getProperty(CONFIG_USE_PLAIN_TEXT, DEFAULT_IS_USE_PLAIN_TEXT));
}

public int getChannelShutdownTimeoutSecond() {
return _pinotConfig.getProperty(CONFIG_CHANNEL_SHUTDOWN_TIMEOUT_SECOND, DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND);
}

public boolean getChannelKeepAliveEnabled() {
return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_ENABLED, DEFAULT_CHANNEL_KEEP_ALIVE_ENABLED);
}

public int getChannelKeepAliveTimeSeconds() {
return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_TIME_SECONDS, DEFAULT_CHANNEL_KEEP_ALIVE_TIME_SECONDS);
}

public int getChannelKeepAliveTimeoutSeconds() {
return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS,
DEFAULT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS);
}

public boolean getChannelKeepAliveWithoutCalls() {
return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS, DEFAULT_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS);
}

public TlsConfig getTlsConfig() {
return _tlsConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@

public class GrpcQueryClient implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryClient.class);
private static final int DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND = 10;

// the key is the hashCode of the TlsConfig, the value is the SslContext
// We don't use TlsConfig as the map key because the TlsConfig is mutable, which means the hashCode can change. If the
// hashCode changes and the map is resized, the SslContext of the old hashCode will be lost.
private static final Map<Integer, SslContext> CLIENT_SSL_CONTEXTS_CACHE = new ConcurrentHashMap<>();

private final ManagedChannel _managedChannel;
private final PinotQueryServerGrpc.PinotQueryServerBlockingStub _blockingStub;
private final int _channelShutdownTimeoutSeconds;

public GrpcQueryClient(String host, int port) {
this(host, port, new GrpcConfig(Collections.emptyMap()));
Expand All @@ -64,11 +65,20 @@ public GrpcQueryClient(String host, int port, GrpcConfig config) {
ManagedChannelBuilder.forAddress(host, port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
.usePlaintext().build();
} else {
_managedChannel =
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(host, port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
.sslContext(buildSslContext(config.getTlsConfig())).build();
.sslContext(buildSslContext(config.getTlsConfig()));

if (config.getChannelKeepAliveEnabled()) {
channelBuilder.keepAliveTime(config.getChannelKeepAliveTimeSeconds(), TimeUnit.SECONDS)
.keepAliveTimeout(config.getChannelKeepAliveTimeoutSeconds(), TimeUnit.SECONDS)
.keepAliveWithoutCalls(config.getChannelKeepAliveWithoutCalls());
}

_managedChannel = channelBuilder.build();
}
_blockingStub = PinotQueryServerGrpc.newBlockingStub(_managedChannel);
_channelShutdownTimeoutSeconds = config.getChannelShutdownTimeoutSecond();
}

private SslContext buildSslContext(TlsConfig tlsConfig) {
Expand Down Expand Up @@ -103,7 +113,7 @@ public void close() {
if (!_managedChannel.isShutdown()) {
try {
_managedChannel.shutdownNow();
if (!_managedChannel.awaitTermination(DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND, TimeUnit.SECONDS)) {
if (!_managedChannel.awaitTermination(_channelShutdownTimeoutSeconds, TimeUnit.SECONDS)) {
LOGGER.warn("Timed out forcefully shutting down connection: {}. ", _managedChannel);
}
} catch (Exception e) {
Expand Down

0 comments on commit 565f80e

Please sign in to comment.