Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MirrorMaker2: Check if bootstrap servers are the same for different cluster aliases #10632

Merged
merged 5 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ public static KafkaMirrorMaker2Connectors fromCrd(Reconciliation reconciliation,
errorMessages.add("Target cluster alias " + mirror.getTargetCluster() + " is used in a mirror definition, but cluster with this alias does not exist in cluster definitions");
}

if (!mirror.getTargetCluster().equals(connectCluster)) {
errorMessages.add("Connect cluster alias (currently set to " + connectCluster + ") has to be the same as the target cluster alias " + mirror.getTargetCluster());
if (!mirror.getTargetCluster().equals(connectCluster) && !hasMatchingBootstrapServers(kafkaMirrorMaker2.getSpec().getClusters(), connectCluster, mirror.getTargetCluster())) {
errorMessages.add("Connect cluster alias (currently set to " + connectCluster + ") must match the target cluster alias " + mirror.getTargetCluster() + " or both clusters must have the same bootstrap servers.");
}
}

Expand Down Expand Up @@ -326,4 +326,23 @@ private static String addTLSConfigToMirrorMaker2ConnectorConfig(Map<String, Obje
return "PLAINTEXT";
}
}

private static boolean hasMatchingBootstrapServers(List<KafkaMirrorMaker2ClusterSpec> clusterList, String connectClusterAlias, String targetClusterAlias) {
// Find the cluster for the connectClusterAlias
String connectClusterBootstrap = clusterList.stream()
.filter(cluster -> connectClusterAlias.equals(cluster.getAlias()))
.map(KafkaMirrorMaker2ClusterSpec::getBootstrapServers)
.findFirst()
.orElse(null);

// Find the cluster for the targetClusterAlias
String targetClusterBootstrap = clusterList.stream()
.filter(cluster -> targetClusterAlias.equals(cluster.getAlias()))
.map(KafkaMirrorMaker2ClusterSpec::getBootstrapServers)
.findFirst()
.orElse(null);

varada-sunanda-ibm marked this conversation as resolved.
Show resolved Hide resolved
// Return true if both are found and have matching bootstrap servers
return connectClusterBootstrap != null && connectClusterBootstrap.equals(targetClusterBootstrap);
}
}
varada-sunanda-ibm marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testFailingValidation() {
assertThat(ex.getMessage(), is("KafkaMirrorMaker2 resource validation failed: " +
"[Each MirrorMaker 2 mirror definition has to specify the source cluster alias, " +
"Target cluster alias wrong-target is used in a mirror definition, but cluster with this alias does not exist in cluster definitions, " +
"Connect cluster alias (currently set to target) has to be the same as the target cluster alias wrong-target]"));
"Connect cluster alias (currently set to target) must match the target cluster alias wrong-target or both clusters must have the same bootstrap servers.]"));
}

@Test
Expand All @@ -131,7 +131,7 @@ public void testMirrorTargetClusterNotSameAsConnectCluster() {
.build();
InvalidResourceException ex = assertThrows(InvalidResourceException.class, () -> KafkaMirrorMaker2Connectors.validateConnectors(kmm2));
assertThat(ex.getMessage(), is("KafkaMirrorMaker2 resource validation failed: " +
"[Connect cluster alias (currently set to source) has to be the same as the target cluster alias target]"));
"[Connect cluster alias (currently set to source) must match the target cluster alias target or both clusters must have the same bootstrap servers.]"));

// A case where one mirror has the correct target cluster, but the other does not
KafkaMirrorMaker2 kmm2CorrectAndIncorrectMirror = new KafkaMirrorMaker2Builder(KMM2)
Expand All @@ -147,7 +147,62 @@ public void testMirrorTargetClusterNotSameAsConnectCluster() {
.build();
ex = assertThrows(InvalidResourceException.class, () -> KafkaMirrorMaker2Connectors.validateConnectors(kmm2CorrectAndIncorrectMirror));
assertThat(ex.getMessage(), is("KafkaMirrorMaker2 resource validation failed: " +
"[Connect cluster alias (currently set to target) has to be the same as the target cluster alias third]"));
"[Connect cluster alias (currently set to target) must match the target cluster alias third or both clusters must have the same bootstrap servers.]"));
}

@Test
public void testClusterNotSameButBootstrapUrlSame() {
KafkaMirrorMaker2 kmm2 = new KafkaMirrorMaker2Builder(KMM2)
.editSpec()
.withConnectCluster("source")
.addToClusters(new KafkaMirrorMaker2ClusterSpecBuilder()
.withAlias("third")
.withBootstrapServers("source:9092")
.build())
.editMirror(0)
.withTargetCluster("third")
.endMirror()
.endSpec()
.build();

assertDoesNotThrow(() -> KafkaMirrorMaker2Connectors.validateConnectors(kmm2));
}

@Test
public void testSourceClusterNotConnectCluster() {
KafkaMirrorMaker2 kmm2 = new KafkaMirrorMaker2Builder(KMM2)
.editSpec()
.withConnectCluster("target")
.addToClusters(new KafkaMirrorMaker2ClusterSpecBuilder()
.withAlias("third")
.withBootstrapServers("source:9092")
.build())
.editMirror(0)
.withTargetCluster("third")
.endMirror()
.endSpec()
.build();

InvalidResourceException ex = assertThrows(InvalidResourceException.class, () -> KafkaMirrorMaker2Connectors.validateConnectors(kmm2));
assertThat(ex.getMessage(), is("KafkaMirrorMaker2 resource validation failed: " +
"[Connect cluster alias (currently set to target) must match the target cluster alias third or both clusters must have the same bootstrap servers.]"));
}

@Test
public void testMultipleMirrors() {
KafkaMirrorMaker2 kmm2CorrectAndIncorrectMirror = new KafkaMirrorMaker2Builder(KMM2)
.editSpec()
.addToClusters(new KafkaMirrorMaker2ClusterSpecBuilder()
.withAlias("fourth")
.withBootstrapServers("target:9092")
.build())
.addToMirrors(new KafkaMirrorMaker2MirrorSpecBuilder()
.withSourceCluster("source")
.withTargetCluster("fourth").build())
.endSpec()
.build();

assertDoesNotThrow(() -> KafkaMirrorMaker2Connectors.validateConnectors(kmm2CorrectAndIncorrectMirror));
}

@Test
Expand Down
Loading