Skip to content

Commit

Permalink
[server][fc] Integrate server side handlers with gRPC service + mTLS (#…
Browse files Browse the repository at this point in the history
…565)

- Adapts Netty Channel pipeline and corresponding handler classes to be compatible with gRPC requests
- Can collect server side statistics and enforce read quota for gRPC requests
- Introduce mutual TLS support on gRPC enabled servers and fast client, when an SSLFactory is present
- Update unit tests and integration tests
  • Loading branch information
rkunds authored Aug 18, 2023
1 parent d05c24e commit d2ca1aa
Show file tree
Hide file tree
Showing 57 changed files with 2,987 additions and 657 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ subprojects {
avroCompiler libraries.avroCompiler
avroCompiler libraries.avroUtilBuilder
avroCompiler 'org.slf4j:slf4j-simple:1.7.32'
runtimeOnly libraries.grpcNettyShaded
implementation libraries.grpcNettyShaded
implementation libraries.grpcProtobuf
implementation libraries.grpcStub
compileOnly libraries.tomcatAnnotations
Expand Down Expand Up @@ -728,6 +728,7 @@ ext.createDiffFile = { ->

// venice-common
':!internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java',
':!internal/venice-common/src/main/java/com/linkedin/venice/acl/handler/StoreAclHandler.java',

// venice-test-common
':!internal/venice-test-common/*',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static com.linkedin.venice.ConfigKeys.FAST_AVRO_FIELD_LIMIT_PER_METHOD;
import static com.linkedin.venice.ConfigKeys.FREEZE_INGESTION_IF_READY_TO_SERVE_OR_LOCAL_DATA_EXISTS;
import static com.linkedin.venice.ConfigKeys.GRPC_READ_SERVER_PORT;
import static com.linkedin.venice.ConfigKeys.GRPC_SERVER_WORKER_THREAD_COUNT;
import static com.linkedin.venice.ConfigKeys.HELIX_HYBRID_STORE_QUOTA_ENABLED;
import static com.linkedin.venice.ConfigKeys.HYBRID_QUOTA_ENFORCEMENT_ENABLED;
import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT;
Expand Down Expand Up @@ -266,6 +267,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
* number of worker threads for the netty listener. If not specified, netty uses twice cpu count.
*/
private final int nettyWorkerThreadCount;
private final int grpcWorkerThreadCount;

private final long databaseSyncBytesIntervalForTransactionalMode;

Expand Down Expand Up @@ -479,6 +481,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getInt(SERVER_SOURCE_TOPIC_OFFSET_CHECK_INTERVAL_MS, (int) TimeUnit.SECONDS.toMillis(60));
nettyGracefulShutdownPeriodSeconds = serverProperties.getInt(SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS, 30);
nettyWorkerThreadCount = serverProperties.getInt(SERVER_NETTY_WORKER_THREADS, 0);
grpcWorkerThreadCount =
serverProperties.getInt(GRPC_SERVER_WORKER_THREAD_COUNT, Runtime.getRuntime().availableProcessors());

remoteIngestionRepairSleepInterval = serverProperties.getInt(
SERVER_REMOTE_INGESTION_REPAIR_SLEEP_INTERVAL_SECONDS,
Expand Down Expand Up @@ -870,6 +874,10 @@ public int getNettyWorkerThreadCount() {
return nettyWorkerThreadCount;
}

public int getGrpcWorkerThreadCount() {
return grpcWorkerThreadCount;
}

public long getDatabaseSyncBytesIntervalForTransactionalMode() {
return databaseSyncBytesIntervalForTransactionalMode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class ClientConfig<K, V, T extends SpecificRecord> {
* gRPC servers when we make a request to receive Metadata from a server to obtain information in order to successfully
* route requests to the correct server/partition
*/
private final Map<String, String> nettyServerToGrpcAddressMap;
private final GrpcClientConfig grpcClientConfig;

private ClientConfig(
String storeName,
Expand Down Expand Up @@ -117,17 +117,18 @@ private ClientConfig(
String clusterDiscoveryD2Service,
boolean useStreamingBatchGetAsDefault,
boolean useGrpc,
Map<String, String> nettyServerToGrpcAddressMap) {
GrpcClientConfig grpcClientConfig) {
if (storeName == null || storeName.isEmpty()) {
throw new VeniceClientException("storeName param shouldn't be empty");
}
if (r2Client == null) {
if (r2Client == null && !useGrpc) {
throw new VeniceClientException("r2Client param shouldn't be null");
}
if (useGrpc && nettyServerToGrpcAddressMap == null) {
if (useGrpc && grpcClientConfig == null) {
throw new UnsupportedOperationException(
"we require a mapping of netty server addresses to grpc server addresses to use a gRPC enabled client");
"we require additional gRPC related configs when we create a gRPC enabled client");
}

this.r2Client = r2Client;
this.storeName = storeName;
this.statsPrefix = (statsPrefix == null ? "" : statsPrefix);
Expand Down Expand Up @@ -239,11 +240,7 @@ private ClientConfig(
}

this.useGrpc = useGrpc;
if (this.useGrpc) {
LOGGER.info("Using gRPC for Venice Fast Client");
}

this.nettyServerToGrpcAddressMap = this.useGrpc ? nettyServerToGrpcAddressMap : null;
this.grpcClientConfig = grpcClientConfig;
}

public String getStoreName() {
Expand Down Expand Up @@ -367,8 +364,8 @@ public boolean useGrpc() {
return useGrpc;
}

public Map<String, String> getNettyServerToGrpcAddressMap() {
return nettyServerToGrpcAddressMap;
public GrpcClientConfig getGrpcClientConfig() {
return grpcClientConfig;
}

public static class ClientConfigBuilder<K, V, T extends SpecificRecord> {
Expand Down Expand Up @@ -419,7 +416,7 @@ public static class ClientConfigBuilder<K, V, T extends SpecificRecord> {
private String clusterDiscoveryD2Service;
private boolean useStreamingBatchGetAsDefault = false;
private boolean useGrpc = false;
private Map<String, String> nettyServerToGrpcAddressMap = null;
private GrpcClientConfig grpcClientConfig = null;

public ClientConfigBuilder<K, V, T> setStoreName(String storeName) {
this.storeName = storeName;
Expand Down Expand Up @@ -582,9 +579,8 @@ public ClientConfigBuilder<K, V, T> setUseGrpc(boolean useGrpc) {
return this;
}

public ClientConfigBuilder<K, V, T> setNettyServerToGrpcAddressMap(
Map<String, String> nettyServerToGrpcAddressMap) {
this.nettyServerToGrpcAddressMap = nettyServerToGrpcAddressMap;
public ClientConfigBuilder<K, V, T> setGrpcClientConfig(GrpcClientConfig grpcClientConfig) {
this.grpcClientConfig = grpcClientConfig;
return this;
}

Expand Down Expand Up @@ -619,7 +615,7 @@ public ClientConfigBuilder<K, V, T> clone() {
.setClusterDiscoveryD2Service(clusterDiscoveryD2Service)
.setUseStreamingBatchGetAsDefault(useStreamingBatchGetAsDefault)
.setUseGrpc(useGrpc)
.setNettyServerToGrpcAddressMap(nettyServerToGrpcAddressMap);
.setGrpcClientConfig(grpcClientConfig);
}

public ClientConfig<K, V, T> build() {
Expand Down Expand Up @@ -654,7 +650,7 @@ public ClientConfig<K, V, T> build() {
clusterDiscoveryD2Service,
useStreamingBatchGetAsDefault,
useGrpc,
nettyServerToGrpcAddressMap);
grpcClientConfig);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ public DispatchingAvroGenericStoreClient(StoreMetadata metadata, ClientConfig co
this(
metadata,
config,
config.useGrpc() ? new GrpcTransportClient(config) : new R2TransportClient(config.getR2Client()));
config.useGrpc()
? new GrpcTransportClient(config.getGrpcClientConfig())
: new R2TransportClient(config.getR2Client()));
}

// Visible for testing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.linkedin.venice.fastclient;

import com.linkedin.r2.transport.common.Client;
import com.linkedin.venice.security.SSLFactory;
import java.util.Map;


public class GrpcClientConfig {
// Use r2Client for non-storage related requests (not implemented in gRPC yet)
private final Client r2Client;
// require a map from netty server to grpc address due to lack of gRPC service discovery
private final Map<String, String> nettyServerToGrpcAddressMap;
// SSL Factory required if using SSL
private final SSLFactory sslFactory;

public GrpcClientConfig(Builder builder) {
this.r2Client = builder.r2Client;
this.nettyServerToGrpcAddressMap = builder.nettyServerToGrpcAddressMap;
this.sslFactory = builder.sslFactory;
}

public Client getR2Client() {
return r2Client;
}

public Map<String, String> getNettyServerToGrpcAddressMap() {
return nettyServerToGrpcAddressMap;
}

public SSLFactory getSslFactory() {
return sslFactory;
}

public static class Builder {
private Client r2Client = null;
private Map<String, String> nettyServerToGrpcAddressMap = null;
private SSLFactory sslFactory = null;

public Builder setR2Client(Client r2Client) {
this.r2Client = r2Client;
return this;
}

public Builder setNettyServerToGrpcAddressMap(Map<String, String> nettyServerToGrpcAddressMap) {
this.nettyServerToGrpcAddressMap = nettyServerToGrpcAddressMap;
return this;
}

public Builder setSSLFactory(SSLFactory sslFactory) {
this.sslFactory = sslFactory;
return this;
}

public GrpcClientConfig build() {
verify();
return new GrpcClientConfig(this);
}

private void verify() {
if (r2Client == null) {
throw new IllegalArgumentException("R2 client must be set when enabling gRPC on FC");
}
if (nettyServerToGrpcAddressMap == null) {
throw new IllegalArgumentException("Netty server to grpc address map must be set");
}
if (nettyServerToGrpcAddressMap.size() == 0) {
throw new IllegalArgumentException("Netty server to grpc address map must not be empty");
}
}
}
}
Loading

0 comments on commit d2ca1aa

Please sign in to comment.