-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MirrorMaker2: Check if bootstrap servers are the same for different cluster aliases #10632
MirrorMaker2: Check if bootstrap servers are the same for different cluster aliases #10632
Conversation
Slack Discussion : https://cloud-native.slack.com/archives/CMH3Q3SNP/p1725430611080659 |
...er-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java
Outdated
Show resolved
Hide resolved
...perator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java
Outdated
Show resolved
Hide resolved
...er-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java
Outdated
Show resolved
Hide resolved
I had a pass with no additional comments to the ones already there. I will have another one when comments are addressed. |
4d90a71
to
ed65c33
Compare
ed65c33
to
6a70be1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
...er-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java
Outdated
Show resolved
Hide resolved
...er-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java
Outdated
Show resolved
Hide resolved
...er-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java
Outdated
Show resolved
Hide resolved
private static boolean hasMatchingBootstrapServers(KafkaMirrorMaker2 kafkaMirrorMaker2, String connectClusterAlias, String targetClusterAlias) { | ||
List<String> configs = Optional.ofNullable(kafkaMirrorMaker2) | ||
.map(KafkaMirrorMaker2::getSpec) | ||
.map(KafkaMirrorMaker2Spec::getClusters) | ||
.orElse(Collections.emptyList()) | ||
.stream() | ||
.filter(cluster -> connectClusterAlias.equals(cluster.getAlias()) || targetClusterAlias.equals(cluster.getAlias())) | ||
.map(KafkaMirrorMaker2ClusterSpec::getBootstrapServers) | ||
.toList(); | ||
|
||
return configs.size() == 2 && configs.get(0).equals(configs.get(1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks super complex and hard to read. Why don't you pass the list of clusters here, find the one for the first alias, find the one for the second alias and then compare them?
...er-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java
Show resolved
Hide resolved
...perator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java
Outdated
Show resolved
Hide resolved
...perator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java
Show resolved
Hide resolved
…luster aliases Signed-off-by: Varada Sunanda <[email protected]>
…luster aliases Signed-off-by: Varada Sunanda <[email protected]>
Signed-off-by: Varada Sunanda <[email protected]>
70aef0b
to
34acd9f
Compare
Signed-off-by: Varada Sunanda <[email protected]>
34acd9f
to
233cea5
Compare
.editSpec() | ||
.withConnectCluster("source") | ||
.addToClusters(new KafkaMirrorMaker2ClusterSpecBuilder() | ||
.withAlias("third") | ||
.withBootstrapServers("source:9092") | ||
.build()) | ||
.editMirror(0) | ||
.withTargetCluster("third") | ||
.endMirror() | ||
.endSpec() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be aligned according to the edit
/ end
methods. Same applies to other two tests below.
.editSpec() | |
.withConnectCluster("source") | |
.addToClusters(new KafkaMirrorMaker2ClusterSpecBuilder() | |
.withAlias("third") | |
.withBootstrapServers("source:9092") | |
.build()) | |
.editMirror(0) | |
.withTargetCluster("third") | |
.endMirror() | |
.endSpec() | |
.editSpec() | |
.withConnectCluster("source") | |
.addToClusters(new KafkaMirrorMaker2ClusterSpecBuilder() | |
.withAlias("third") | |
.withBootstrapServers("source:9092") | |
.build()) | |
.editMirror(0) | |
.withTargetCluster("third") | |
.endMirror() | |
.endSpec() |
...perator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java
Outdated
Show resolved
Hide resolved
List<String> configs = clusterList | ||
.stream() | ||
.filter(cluster -> connectClusterAlias.equals(cluster.getAlias()) || targetClusterAlias.equals(cluster.getAlias())) | ||
.map(KafkaMirrorMaker2ClusterSpec::getBootstrapServers) | ||
.toList(); | ||
|
||
return configs.size() == 2 && configs.get(0).equals(configs.get(1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you insist on doing it this comúplicated way despite my comments, maybe you can add at least some comment/JavaDoc to what it does and how.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies @scholzj, I missed that earlier comment. Did you mean something like this?
private static boolean hasMatchingBootstrapServers(List<KafkaMirrorMaker2ClusterSpec> clusterList, String connectClusterAlias, String targetClusterAlias) {
// Find the cluster for the connectClusterAlias
String connectClusterBootstrap = clusterList.stream()
.filter(cluster -> connectClusterAlias.equals(cluster.getAlias()))
.map(KafkaMirrorMaker2ClusterSpec::getBootstrapServers)
.findFirst()
.orElse(null);
// Find the cluster for the targetClusterAlias
String targetClusterBootstrap = clusterList.stream()
.filter(cluster -> targetClusterAlias.equals(cluster.getAlias()))
.map(KafkaMirrorMaker2ClusterSpec::getBootstrapServers)
.findFirst()
.orElse(null);
// Return true if both are found and have matching bootstrap servers
return connectClusterBootstrap != null && connectClusterBootstrap.equals(targetClusterBootstrap);
}
Let me know if this is what you were suggesting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is what I would have used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I have pushed in the latest changes. Please can you re-review.
Signed-off-by: Varada Sunanda <[email protected]>
/azp run regression |
Azure Pipelines successfully started running 1 pipeline(s). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks!
Type of change
Select the type of your PR
Description
After the changes we made in the PR #9923, the current condition prevents users from independently managing the connections between Kafka Connect and Kafka, as well as between the MM2 connectors and Kafka.
Specifically, we’re using two distinct KafkaUsers, each assigned with minimal set of ACLs that are required.
Our proposed solution is to modify the condition to permit scenarios where the connectCluster bootstrapServer and the mirror.targetCluster bootstrapServer are identical.
Checklist
Please go through this checklist and make sure all applicable tasks have been done