Skip to content

Commit

Permalink
Add support for Kafka 3.9.0
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj committed Oct 11, 2024
1 parent b56b770 commit 2fc58b3
Show file tree
Hide file tree
Showing 54 changed files with 548 additions and 96 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 0.44.0

* Add support for Kafka 3.9.0
* Add the "Unmanaged" KafkaTopic status update.
* The `ContinueReconciliationOnManualRollingUpdateFailure` feature gate moves to beta stage and is enabled by default.
If needed, `ContinueReconciliationOnManualRollingUpdateFailure` can be disabled in the feature gates configuration in the Cluster Operator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,25 @@ public class KafkaVersionTestUtils {

private static final Set<String> SUPPORTED_VERSIONS = new KafkaVersion.Lookup(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()).supportedVersions();

public static final String LATEST_KAFKA_VERSION = "3.8.0";
public static final String LATEST_FORMAT_VERSION = "3.8";
public static final String LATEST_PROTOCOL_VERSION = "3.8";
public static final String LATEST_METADATA_VERSION = "3.8-IV0";
public static final String LATEST_KAFKA_VERSION = "3.9.0";
public static final String LATEST_FORMAT_VERSION = "3.9";
public static final String LATEST_PROTOCOL_VERSION = "3.9";
public static final String LATEST_METADATA_VERSION = "3.9-IV0";
public static final String LATEST_ZOOKEEPER_VERSION = "3.8.4";
public static final String LATEST_CHECKSUM = "ABCD1234";
public static final String LATEST_THIRD_PARTY_VERSION = "3.8.x";
public static final String KAFKA_390_VERSION = "3.9.0";
public static final String LATEST_THIRD_PARTY_VERSION = "3.9.x";
public static final String LATEST_KAFKA_IMAGE = KAFKA_IMAGE_STR + LATEST_KAFKA_VERSION;
public static final String LATEST_KAFKA_CONNECT_IMAGE = KAFKA_CONNECT_IMAGE_STR + LATEST_KAFKA_VERSION;
public static final String LATEST_KAFKA_MIRROR_MAKER_IMAGE = KAFKA_MIRROR_MAKER_IMAGE_STR + LATEST_KAFKA_VERSION;
public static final String LATEST_KAFKA_MIRROR_MAKER_2_IMAGE = KAFKA_MIRROR_MAKER_2_IMAGE_STR + LATEST_KAFKA_VERSION;

public static final String PREVIOUS_KAFKA_VERSION = "3.7.1";
public static final String PREVIOUS_FORMAT_VERSION = "3.7";
public static final String PREVIOUS_PROTOCOL_VERSION = "3.7";
public static final String PREVIOUS_METADATA_VERSION = "3.7-IV4";
public static final String PREVIOUS_KAFKA_VERSION = "3.8.0";
public static final String PREVIOUS_FORMAT_VERSION = "3.8";
public static final String PREVIOUS_PROTOCOL_VERSION = "3.8";
public static final String PREVIOUS_METADATA_VERSION = "3.8-IV0";
public static final String PREVIOUS_ZOOKEEPER_VERSION = "3.8.4";
public static final String PREVIOUS_CHECKSUM = "ABCD1234";
public static final String PREVIOUS_THIRD_PARTY_VERSION = "3.7.x";
public static final String PREVIOUS_THIRD_PARTY_VERSION = "3.8.x";
public static final String PREVIOUS_KAFKA_IMAGE = KAFKA_IMAGE_STR + PREVIOUS_KAFKA_VERSION;
public static final String PREVIOUS_KAFKA_CONNECT_IMAGE = KAFKA_CONNECT_IMAGE_STR + PREVIOUS_KAFKA_VERSION;
public static final String PREVIOUS_KAFKA_MIRROR_MAKER_IMAGE = KAFKA_MIRROR_MAKER_IMAGE_STR + PREVIOUS_KAFKA_VERSION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@
public class KafkaBrokerConfigurationBuilderTest {
private final static NodeRef NODE_REF = new NodeRef("my-cluster-kafka-2", 2, "kafka", false, true);

private final static KafkaVersion KAFKA_3_8_0 = new KafkaVersion(KafkaVersionTestUtils.LATEST_KAFKA_VERSION, "", "", "", "", false, false, "");
private final static KafkaVersion KAFKA_3_9_0 = new KafkaVersion(KafkaVersionTestUtils.KAFKA_390_VERSION, "", "", "", "", false, false, "");
private final static KafkaVersion KAFKA_3_8_0 = new KafkaVersion(KafkaVersionTestUtils.PREVIOUS_KAFKA_VERSION, "", "", "", "", false, false, "");
private final static KafkaVersion KAFKA_3_9_0 = new KafkaVersion(KafkaVersionTestUtils.LATEST_KAFKA_VERSION, "", "", "", "", false, false, "");

@ParallelTest
public void testBrokerId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void testControllerNodeConfigurationOnMigration() {
assertThat(configuration, containsString("listener.name.controlplane-9090"));
assertThat(configuration, containsString("listeners=CONTROLPLANE-9090://0.0.0.0:9090"));
// controllers never advertises listeners
assertThat(configuration, not(containsString("advertised.listeners")));
assertThat(configuration, containsString("advertised.listeners=CONTROLPLANE-9090://my-cluster-controllers-3.my-cluster-kafka-brokers.my-namespace.svc:9090"));

assertThat(configuration, containsString("controller.listener.names=CONTROLPLANE-9090"));
assertThat(configuration, containsString("controller.quorum.voters=3@my-cluster-controllers-3.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,4@my-cluster-controllers-4.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,5@my-cluster-controllers-5.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1118,15 +1118,15 @@ public void testPerBrokerConfiguration() {
assertThat(config, CoreMatchers.containsString("node.id=1"));
assertThat(config, CoreMatchers.containsString("log.dirs=/var/lib/kafka/data-0/kafka-log1"));
assertThat(config, CoreMatchers.containsString("\nlisteners=CONTROLPLANE-9090://0.0.0.0:9090\n"));
assertThat(config, not(CoreMatchers.containsString("advertised.listeners")));
assertThat(config, CoreMatchers.containsString("advertised.listeners=CONTROLPLANE-9090://foo-controllers-1.foo-kafka-brokers.test.svc:9090\n"));
assertThat(config, CoreMatchers.containsString("process.roles=controller\n"));
assertThat(config, CoreMatchers.containsString("controller.quorum.voters=0@foo-controllers-0.foo-kafka-brokers.test.svc.cluster.local:9090,[email protected]:9090,[email protected]:9090,[email protected]:9090,[email protected]:9090\n"));

config = KC.generatePerBrokerConfiguration(4, ADVERTISED_HOSTNAMES, ADVERTISED_PORTS);
assertThat(config, CoreMatchers.containsString("node.id=4"));
assertThat(config, CoreMatchers.containsString("log.dirs=/var/lib/kafka/data-0/kafka-log4"));
assertThat(config, CoreMatchers.containsString("\nlisteners=CONTROLPLANE-9090://0.0.0.0:9090,REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092,TLS-9093://0.0.0.0:9093\n"));
assertThat(config, CoreMatchers.containsString("advertised.listeners=REPLICATION-9091://foo-mixed-4.foo-kafka-brokers.test.svc:9091,PLAIN-9092://mixed-4:9092,TLS-9093://mixed-4:10004\n"));
assertThat(config, CoreMatchers.containsString("advertised.listeners=CONTROLPLANE-9090://foo-mixed-4.foo-kafka-brokers.test.svc:9090,REPLICATION-9091://foo-mixed-4.foo-kafka-brokers.test.svc:9091,PLAIN-9092://mixed-4:9092,TLS-9093://mixed-4:10004\n"));
assertThat(config, CoreMatchers.containsString("process.roles=broker,controller\n"));
assertThat(config, CoreMatchers.containsString("controller.quorum.voters=0@foo-controllers-0.foo-kafka-brokers.test.svc.cluster.local:9090,[email protected]:9090,[email protected]:9090,[email protected]:9090,[email protected]:9090\n"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void passwordType() {
@ParallelTest
public void invalidVersion() {
assertConfigError("inter.broker.protocol.version", "dclncswn",
"inter.broker.protocol.version has value 'dclncswn' which does not match the required pattern: \\Q0.8.0\\E(\\.[0-9]+)*|\\Q0.8.0\\E|\\Q0.8.1\\E(\\.[0-9]+)*|\\Q0.8.1\\E|\\Q0.8.2\\E(\\.[0-9]+)*|\\Q0.8.2\\E|\\Q0.9.0\\E(\\.[0-9]+)*|\\Q0.9.0\\E|\\Q0.10.0\\E(\\.[0-9]+)*|\\Q0.10.0-IV0\\E|\\Q0.10.0-IV1\\E|\\Q0.10.1\\E(\\.[0-9]+)*|\\Q0.10.1-IV0\\E|\\Q0.10.1-IV1\\E|\\Q0.10.1-IV2\\E|\\Q0.10.2\\E(\\.[0-9]+)*|\\Q0.10.2-IV0\\E|\\Q0.11.0\\E(\\.[0-9]+)*|\\Q0.11.0-IV0\\E|\\Q0.11.0-IV1\\E|\\Q0.11.0-IV2\\E|\\Q1.0\\E(\\.[0-9]+)*|\\Q1.0-IV0\\E|\\Q1.1\\E(\\.[0-9]+)*|\\Q1.1-IV0\\E|\\Q2.0\\E(\\.[0-9]+)*|\\Q2.0-IV0\\E|\\Q2.0-IV1\\E|\\Q2.1\\E(\\.[0-9]+)*|\\Q2.1-IV0\\E|\\Q2.1-IV1\\E|\\Q2.1-IV2\\E|\\Q2.2\\E(\\.[0-9]+)*|\\Q2.2-IV0\\E|\\Q2.2-IV1\\E|\\Q2.3\\E(\\.[0-9]+)*|\\Q2.3-IV0\\E|\\Q2.3-IV1\\E|\\Q2.4\\E(\\.[0-9]+)*|\\Q2.4-IV0\\E|\\Q2.4-IV1\\E|\\Q2.5\\E(\\.[0-9]+)*|\\Q2.5-IV0\\E|\\Q2.6\\E(\\.[0-9]+)*|\\Q2.6-IV0\\E|\\Q2.7\\E(\\.[0-9]+)*|\\Q2.7-IV0\\E|\\Q2.7-IV1\\E|\\Q2.7-IV2\\E|\\Q2.8\\E(\\.[0-9]+)*|\\Q2.8-IV0\\E|\\Q2.8-IV1\\E|\\Q3.0\\E(\\.[0-9]+)*|\\Q3.0-IV0\\E|\\Q3.0-IV1\\E|\\Q3.1\\E(\\.[0-9]+)*|\\Q3.1-IV0\\E|\\Q3.2\\E(\\.[0-9]+)*|\\Q3.2-IV0\\E|\\Q3.3\\E(\\.[0-9]+)*|\\Q3.3-IV0\\E|\\Q3.3-IV1\\E|\\Q3.3-IV2\\E|\\Q3.3-IV3\\E|\\Q3.4\\E(\\.[0-9]+)*|\\Q3.4-IV0\\E|\\Q3.5\\E(\\.[0-9]+)*|\\Q3.5-IV0\\E|\\Q3.5-IV1\\E|\\Q3.5-IV2\\E|\\Q3.6\\E(\\.[0-9]+)*|\\Q3.6-IV0\\E|\\Q3.6-IV1\\E|\\Q3.6-IV2\\E|\\Q3.7\\E(\\.[0-9]+)*|\\Q3.7-IV0\\E|\\Q3.7-IV1\\E|\\Q3.7-IV2\\E|\\Q3.7-IV3\\E|\\Q3.7-IV4\\E|\\Q3.8\\E(\\.[0-9]+)*|\\Q3.8-IV0\\E|\\Q3.9\\E(\\.[0-9]+)*|\\Q3.9-IV0\\E");
"inter.broker.protocol.version has value 'dclncswn' which does not match the required pattern: \\Q0.8.0\\E(\\.[0-9]+)*|\\Q0.8.0\\E|\\Q0.8.1\\E(\\.[0-9]+)*|\\Q0.8.1\\E|\\Q0.8.2\\E(\\.[0-9]+)*|\\Q0.8.2\\E|\\Q0.9.0\\E(\\.[0-9]+)*|\\Q0.9.0\\E|\\Q0.10.0\\E(\\.[0-9]+)*|\\Q0.10.0-IV0\\E|\\Q0.10.0-IV1\\E|\\Q0.10.1\\E(\\.[0-9]+)*|\\Q0.10.1-IV0\\E|\\Q0.10.1-IV1\\E|\\Q0.10.1-IV2\\E|\\Q0.10.2\\E(\\.[0-9]+)*|\\Q0.10.2-IV0\\E|\\Q0.11.0\\E(\\.[0-9]+)*|\\Q0.11.0-IV0\\E|\\Q0.11.0-IV1\\E|\\Q0.11.0-IV2\\E|\\Q1.0\\E(\\.[0-9]+)*|\\Q1.0-IV0\\E|\\Q1.1\\E(\\.[0-9]+)*|\\Q1.1-IV0\\E|\\Q2.0\\E(\\.[0-9]+)*|\\Q2.0-IV0\\E|\\Q2.0-IV1\\E|\\Q2.1\\E(\\.[0-9]+)*|\\Q2.1-IV0\\E|\\Q2.1-IV1\\E|\\Q2.1-IV2\\E|\\Q2.2\\E(\\.[0-9]+)*|\\Q2.2-IV0\\E|\\Q2.2-IV1\\E|\\Q2.3\\E(\\.[0-9]+)*|\\Q2.3-IV0\\E|\\Q2.3-IV1\\E|\\Q2.4\\E(\\.[0-9]+)*|\\Q2.4-IV0\\E|\\Q2.4-IV1\\E|\\Q2.5\\E(\\.[0-9]+)*|\\Q2.5-IV0\\E|\\Q2.6\\E(\\.[0-9]+)*|\\Q2.6-IV0\\E|\\Q2.7\\E(\\.[0-9]+)*|\\Q2.7-IV0\\E|\\Q2.7-IV1\\E|\\Q2.7-IV2\\E|\\Q2.8\\E(\\.[0-9]+)*|\\Q2.8-IV0\\E|\\Q2.8-IV1\\E|\\Q3.0\\E(\\.[0-9]+)*|\\Q3.0-IV0\\E|\\Q3.0-IV1\\E|\\Q3.1\\E(\\.[0-9]+)*|\\Q3.1-IV0\\E|\\Q3.2\\E(\\.[0-9]+)*|\\Q3.2-IV0\\E|\\Q3.3\\E(\\.[0-9]+)*|\\Q3.3-IV0\\E|\\Q3.3-IV1\\E|\\Q3.3-IV2\\E|\\Q3.3-IV3\\E|\\Q3.4\\E(\\.[0-9]+)*|\\Q3.4-IV0\\E|\\Q3.5\\E(\\.[0-9]+)*|\\Q3.5-IV0\\E|\\Q3.5-IV1\\E|\\Q3.5-IV2\\E|\\Q3.6\\E(\\.[0-9]+)*|\\Q3.6-IV0\\E|\\Q3.6-IV1\\E|\\Q3.6-IV2\\E|\\Q3.7\\E(\\.[0-9]+)*|\\Q3.7-IV0\\E|\\Q3.7-IV1\\E|\\Q3.7-IV2\\E|\\Q3.7-IV3\\E|\\Q3.7-IV4\\E|\\Q3.8\\E(\\.[0-9]+)*|\\Q3.8-IV0\\E|\\Q3.9\\E(\\.[0-9]+)*|\\Q3.9-IV0\\E|\\Q4.0\\E(\\.[0-9]+)*|\\Q4.0-IV0\\E|\\Q4.0-IV1\\E");
}

@ParallelTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ public void startup() throws InterruptedException {
}

private static void waitForAllServicesToStart(Connect connect, int seconds) {
while (!connect.isRunning() && seconds-- > 0) {
while (!connect.herder().isReady() && seconds-- > 0) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new ConnectException(e);
}
}
if (!connect.isRunning()) {
if (!connect.herder().isReady()) {
throw new ConnectException(format("Connect failed to start."));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,12 @@ private static Map<String, ConfigModel> configs(String version) throws NoSuchMet
continue;
} else if (key.validator != null && "class org.apache.kafka.raft.QuorumConfig$ControllerQuorumBootstrapServersValidator".equals(key.validator.getClass().toString())) { // we compare the class names because of changes done between Kafka version 3.7 and 3.8 => this is for Kafka 3.8 and newer
continue;
} else if (key.validator != null && "class org.apache.kafka.common.compress.GzipCompression$LevelValidator".equals(key.validator.getClass().toString())) { // we compare the class names because of changes done between Kafka version 3.7 and 3.8 => this is for Kafka 3.8 and newer
} else if (key.validator != null && "class org.apache.kafka.common.compress.GzipCompression$LevelValidator".equals(key.validator.getClass().toString())) { // we compare the class names because of changes done between Kafka version 3.7 and 3.8 => this is for Kafka 3.8
descriptor.setPattern("[1-9]{1}|-1");
} else if (key.validator != null && "class org.apache.kafka.common.record.CompressionType$1$1".equals(key.validator.getClass().toString()) && configName.equals("compression.gzip.level")) { // we compare the class names because of changes done between Kafka version 3.8 and 3.9 => this is for Kafka 3.9 and newer. Given it is an anonymous class, we also check the field name to protect against some changes
descriptor.setPattern("[1-9]{1}|-1");
} else if (key.validator != null) {
throw new IllegalStateException("Invalid validator class " + key.validator.getClass() + " for option " + configName);
throw new IllegalStateException("Invalid validator '" + key.validator.getClass() + "' for option '" + configName + "'");
}

result.put(configName, descriptor);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
jmx_prometheus_javaagent-1.0.1.jar
snakeyaml-2.2.jar
Loading

0 comments on commit 2fc58b3

Please sign in to comment.