[controller][server][test] Support cluster by cluster pub sub clients for controller. #600
Conversation
@@ -186,7 +186,11 @@ public DaVinciBackend(
    new VeniceWriterFactory(backendProps.toProperties(), pubSubClientsFactory.getProducerAdapterFactory(), null);
String instanceName = Utils.getHostName() + "_" + Utils.getPid();
int derivedSchemaID = backendProps.getInt(PUSH_STATUS_STORE_DERIVED_SCHEMA_ID, 1);
pushStatusStoreWriter = new PushStatusStoreWriter(writerFactory, instanceName, derivedSchemaID);
Map<String, VeniceWriterFactory> writerFactoryMap = new HashMap<>(1);
String clusterName = backendConfig.getClusterName();
DVC is cluster agnostic and can discover the cluster using its NativeMetadataRepository, since a DVC can be subscribed to multiple stores belonging to different Venice clusters. You can use the ClusterInfoProvider interface to get the cluster name given a store name.
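For illustration, a minimal sketch of the lookup the reviewer describes: a provider-style interface that resolves a Venice cluster per store, rather than assuming one cluster per host. The interface, method name, and the static map here are hypothetical stand-ins, not Venice's actual API; the real repository would consult store metadata.

```java
import java.util.Map;
import java.util.Optional;

public class ClusterLookupSketch {
  // Hypothetical stand-in for a ClusterInfoProvider-style interface.
  interface ClusterInfoProvider {
    Optional<String> getClusterForStore(String storeName);
  }

  // Backed by a static map for the sketch; a real provider would consult the
  // NativeMetadataRepository, since one DVC can subscribe to stores in multiple clusters.
  static final Map<String, String> STORE_TO_CLUSTER =
      Map.of("storeA", "cluster-1", "storeB", "cluster-2");

  static final ClusterInfoProvider PROVIDER =
      store -> Optional.ofNullable(STORE_TO_CLUSTER.get(store));

  public static String resolve(String storeName) {
    return PROVIDER.getClusterForStore(storeName).orElse("unknown");
  }

  public static void main(String[] args) {
    System.out.println(resolve("storeA")); // cluster-1
    System.out.println(resolve("storeC")); // unknown
  }
}
```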
For the DVC and server side, we do not need clusterName discovery logic, as each host naturally belongs to a specific cluster.
That assumption is true for the server, but how is it true for DVC? What if the DVC user subscribes to multiple stores that span multiple Venice clusters?
That is a valid concern.
This comment is kind of old, but I think it's pretty valid. Are we going to address this?
Since this cluster-by-cluster support is mainly for the controller, I do not plan to do it in this PR, but the concern is valid; we can address it in the future.
Hmm, sorry, I am not sure I fully follow the intention of this PR. If this is only for the controller, could you skip DVC for now? As for seas-careers, their DVC use cases already span different clusters, so this might not be correct for them.
(Some of the config in the backend config does not apply to all DVC stores.)
Yeah, I don't see a good reason here to introduce a potential bug into a client library that, even if harmless, we'll have to force everyone to upgrade off of at some point.
Force-pushed from 95980a0 to 39192b8
if (VeniceSystemStoreUtils.isSystemStore(storeName)) {
  if (storeName.equals(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getSystemStoreName())
Can we combine these two if conditions for simplicity?
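As an illustration of the suggestion, the two nested checks can be collapsed into one condition joined with `&&` (valid as long as the outer if contains nothing besides the inner if). This is a standalone sketch; the store-name constant and the `isSystemStore` stand-in are illustrative, not Venice's real values.

```java
public class CombinedConditionSketch {
  // Illustrative constant; not the actual Venice system store name.
  static final String METADATA_SYSTEM_SCHEMA_STORE_NAME =
      "venice_system_store_METADATA_SYSTEM_SCHEMA_STORE";

  // Stand-in for VeniceSystemStoreUtils.isSystemStore.
  static boolean isSystemStore(String storeName) {
    return storeName.startsWith("venice_system_store_");
  }

  // Before: if (isSystemStore(storeName)) { if (storeName.equals(...)) { ... } }
  // After: a single condition joined with &&.
  static boolean isMetadataSystemSchemaStore(String storeName) {
    return isSystemStore(storeName) && storeName.equals(METADATA_SYSTEM_SCHEMA_STORE_NAME);
  }

  public static void main(String[] args) {
    System.out.println(isMetadataSystemSchemaStore(METADATA_SYSTEM_SCHEMA_STORE_NAME)); // true
    System.out.println(isMetadataSystemSchemaStore("regular_store")); // false
  }
}
```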
internal/venice-common/src/main/java/com/linkedin/venice/kafka/TopicManager.java (resolved)
internal/venice-common/src/main/java/com/linkedin/venice/kafka/TopicManager.java (resolved)
private final Lazy<PubSubAdminAdapter> pubSubReadOnlyAdminAdapter;
private final Lazy<PubSubAdminAdapter> pubSubDefaultAdminAdapter;
private final Lazy<PubSubConsumerAdapter> pubSubConsumerAdapter;
private final Map<String, PubSubAdminAdapter> pubSubAdminAdapterMap = new HashMap<>();
Reading the type declaration of pubSubDefaultAdminAdapter raises the question of whether it's necessary to use Lazy<PubSubAdminAdapter> as the value type in the map. Same for pubSubConsumerAdapterMap below.
I think if the value is optional, Lazy is good; otherwise there is no need for Lazy here.
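The point being made here is that Lazy only pays off when some entries may never be used: the underlying factory runs at most once, and only on first access. A minimal memoizing Lazy can sketch that behavior; this is an illustrative stand-in, not Venice's actual Lazy implementation.

```java
import java.util.function.Supplier;
import java.util.concurrent.atomic.AtomicInteger;

public class LazySketch<T> {
  private Supplier<T> supplier;
  private T value;

  public LazySketch(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  public synchronized T get() {
    if (supplier != null) {
      // First access: run the factory exactly once, then drop it so it can be GC'd.
      value = supplier.get();
      supplier = null;
    }
    return value;
  }

  public static void main(String[] args) {
    AtomicInteger constructions = new AtomicInteger();
    LazySketch<String> adminAdapter = new LazySketch<>(() -> {
      constructions.incrementAndGet();
      return "admin-adapter";
    });
    // Nothing is constructed until first use; repeated gets reuse the same instance.
    adminAdapter.get();
    adminAdapter.get();
    System.out.println(constructions.get()); // 1
  }
}
```

If every map entry is built eagerly and always used, the Lazy wrapper adds indirection without saving any work, which is the reviewer's point.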
Force-pushed from 0d3a363 to 0238883
services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java (resolved)
Force-pushed from c5a8616 to ada2cb4
@@ -186,7 +186,11 @@ public DaVinciBackend(
    new VeniceWriterFactory(backendProps.toProperties(), pubSubClientsFactory.getProducerAdapterFactory(), null);
String instanceName = Utils.getHostName() + "_" + Utils.getPid();
int derivedSchemaID = backendProps.getInt(PUSH_STATUS_STORE_DERIVED_SCHEMA_ID, 1);
pushStatusStoreWriter = new PushStatusStoreWriter(writerFactory, instanceName, derivedSchemaID);
Map<String, VeniceWriterFactory> writerFactoryMap = new HashMap<>(1);
Why (1)? It seems we don't do this in other similar spots in this PR, but why here?
Using (1) here is for having a fixed clusterName for all stores in this DaVinciBackend, as I changed the constructor of PushStatusStoreWriter to take a clusterName-to-VeniceWriterFactory map and a cluster discovery supplier function. The controller will use a proper cluster discovery supplier function. As we want to have one PubSubClientFactory for all stores regardless of which cluster a specific store belongs to, we just use a placeholder clusterName here as the only entry in this map, and each store's discovery logic will return that clusterName to help it find the specific VeniceWriterFactory for every store.
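A rough sketch of the wiring described above, assuming the single-entry map plus a discovery function that returns the same fixed cluster for every store; Strings stand in for VeniceWriterFactory and the names are illustrative, not Venice's actual ones.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SingleClusterMapSketch {
  public static String resolveFactory(String storeName) {
    // Placeholder cluster name: the only key in the map, hence new HashMap<>(1).
    String clusterName = "local-cluster";
    Map<String, String> writerFactoryMap = new HashMap<>(1);
    writerFactoryMap.put(clusterName, "shared-writer-factory");

    // The discovery function ignores the store and always returns the fixed
    // cluster, so every store lands on the same writer factory.
    Function<String, String> clusterDiscovery = store -> clusterName;
    return writerFactoryMap.get(clusterDiscovery.apply(storeName));
  }

  public static void main(String[] args) {
    System.out.println(resolveFactory("storeA")); // shared-writer-factory
    System.out.println(resolveFactory("storeB")); // shared-writer-factory
  }
}
```

In the controller, the discovery function would instead map each store to its real cluster, selecting among multiple map entries.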
@@ -300,6 +300,8 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
    private final String[] msgForLagMeasurement;
    private final Runnable runnableForKillIngestionTasksForNonCurrentVersions;

    protected final String clusterName;
Why is this needed?
@@ -3498,6 +3498,7 @@ private VeniceStoreVersionConfig getDefaultMockVeniceStoreVersionConfig(
    doReturn(databaseSyncBytesIntervalForDeferredWriteMode).when(storeConfig)
        .getDatabaseSyncBytesIntervalForDeferredWriteMode();
    doReturn(false).when(storeConfig).isReadOnlyForBatchOnlyStoreEnabled();
    // doReturn(clusterName).when(storeConfig).getClusterName();
Do we still need this?
.setPubSubAdminAdapterFactory(new ApacheKafkaAdminAdapterFactory())
.setPubSubConsumerAdapterFactory(new ApacheKafkaConsumerAdapterFactory())
.setDefaultPubSubClientsFactory(pubSubClientsFactory)
.setClusterNameSupplier(s -> Optional.empty())
Maybe have a default name supplier that's s->Optional.empty() in topicManagerRepository?
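A small sketch of the suggested default: a cluster-name supplier that always returns Optional.empty(), so call sites that don't configure one fall back to the default pub sub clients. Strings stand in for the admin adapters, and the method and field names are illustrative.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class DefaultSupplierSketch {
  // The proposed default: no cluster is ever resolved, so callers always fall back.
  static final Function<String, Optional<String>> DEFAULT_CLUSTER_NAME_SUPPLIER =
      topic -> Optional.empty();

  public static String pickAdapter(
      Function<String, Optional<String>> clusterNameSupplier,
      Map<String, String> perClusterAdapters,
      String defaultAdapter,
      String topic) {
    // Use the per-cluster adapter when the supplier resolves a known cluster,
    // otherwise fall back to the default adapter.
    return clusterNameSupplier.apply(topic).map(perClusterAdapters::get).orElse(defaultAdapter);
  }

  public static void main(String[] args) {
    Map<String, String> adapters = Map.of("cluster-1", "kafka-admin-1");
    System.out.println(pickAdapter(DEFAULT_CLUSTER_NAME_SUPPLIER, adapters, "default-admin", "t")); // default-admin
    System.out.println(pickAdapter(t -> Optional.of("cluster-1"), adapters, "default-admin", "t")); // kafka-admin-1
  }
}
```

Shipping this as the default inside TopicManagerRepository would spare test and server call sites from passing `s -> Optional.empty()` explicitly, which is the reviewer's suggestion.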
I think I need some explanation here. I understand that the intent is that we want to change the pubsub implementation under the hood cluster by cluster; that's just an artifact of how we usually roll out features. However, it seems unfortunate and potentially brittle looking forward to leak the cluster-to-store association into all these different components, particularly since up to this point the architecture supports materializing a given store in different locations (store move, davinci, etc.). Also, are there no gotchas here for store move? Like, if we move a store from one cluster with a certain pubsub implementation to another cluster, will it continue to work? I think I need to hear a compelling justification for why we couldn't wrap this up in a different way. One way that springs to mind is that instead of coupling this to clusters we couple it to stores? Maybe that's naive. At the very least, it seems most of the logic provided discerns the clusterName adjacently from the topic name. It seems like we could pull that logic into the adapter implementations and discern it centrally there, as opposed to breaching all these interfaces.
Currently clusterName is still not a property of Store, and this causes trouble discovering the cluster for a store. Regarding store move: as long as we have pub sub client implementations for both the source cluster and the destination cluster, this still works; it only depends on which cluster the store belongs to at the point in time we want to use the pub sub client to do something. I think letting the adapter implementations be aware of the cluster name is not good: clusterName is a Venice concept, and our cluster discovery logic should not be tied to the pub sub clients. Maybe we can put clusterName into Store or PubSubTopic; that is the question I am thinking about. I feel that would also be a bigger change than this.
Force-pushed from ada2cb4 to 04b6f50
Summary, imperative, start upper case, don't end with a period
Previously, different Venice clusters in the controller shared the same pub sub admin, consumer, and producer clients, especially in TopicManager. As Venice is going to support different pub sub clients, we need to add support for different pub sub clients for different clusters running on the same controller host.

In the controller's current pub sub client usage, common classes used by the controller (e.g. TopicManager) have no cluster name information in their APIs. To avoid changing the interface, a cluster discovery supplier function is added to TopicManagerRepository and other utility classes. For the controller, this cluster discovery logic relies on HelixReadOnlyStoreConfigRepository to resolve the cluster name from a topic. For the server, it just uses the default cluster name for that host.

It is possible that the HelixReadOnlyStoreConfigRepository-based logic cannot resolve a cluster name from a topic, and some pub sub client usage is not for a specific cluster (e.g. listing all topics inside TopicManager); in those cases we rely on the default pub sub clients factory.

How was this PR tested?

Does this PR introduce any user-facing changes?