Skip to content

Commit

Permalink
GH-3416: Upgrading to kafka-client 3.8.0
Browse files Browse the repository at this point in the history
Fixes: #3416
  • Loading branch information
sobychacko authored Aug 12, 2024
1 parent 5120260 commit 6874ea3
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 18 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ ext {
jaywayJsonPathVersion = '2.9.0'
junit4Version = '4.13.2'
junitJupiterVersion = '5.11.0-RC1'
kafkaVersion = '3.7.1'
kafkaVersion = '3.8.0'
kotlinCoroutinesVersion = '1.8.1'
log4jVersion = '2.23.1'
micrometerDocsVersion = '1.0.3'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
* @author Nakul Mishra
* @author Pawel Lozinski
* @author Adrian Chlebosz
* @author Soby Chacko
*
* @since 3.1
*/
Expand Down Expand Up @@ -252,15 +253,15 @@ public void destroy() {
}

private void addDefaultBrokerPropsIfAbsent() {
this.brokerProperties.putIfAbsent(KafkaConfig.DeleteTopicEnableProp(), "true");
this.brokerProperties.putIfAbsent(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
this.brokerProperties.putIfAbsent(KafkaConfig.OffsetsTopicReplicationFactorProp(), "" + this.count);
this.brokerProperties.putIfAbsent(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic);
this.brokerProperties.putIfAbsent("delete.topic.enable", "true");
this.brokerProperties.putIfAbsent("group.initial.rebalance.delay.ms", "0");
this.brokerProperties.putIfAbsent("offsets.topic.replication.factor", "" + this.count);
this.brokerProperties.putIfAbsent("num.partitions", "" + this.partitionsPerTopic);
}

private void logDir(Properties brokerConfigProperties) {
try {
brokerConfigProperties.put(KafkaConfig.LogDirProp(),
brokerConfigProperties.put("log.dir",
Files.createTempDirectory("spring.kafka." + UUID.randomUUID()).toString());
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -87,6 +87,7 @@
* @author Nakul Mishra
* @author Pawel Lozinski
* @author Adrian Chlebosz
* @author Soby Chacko
*
* @since 2.2
*/
Expand Down Expand Up @@ -302,17 +303,17 @@ public void afterPropertiesSet() {
}
this.zkConnect = LOOPBACK + ":" + this.zookeeper.getPort();
this.kafkaServers.clear();
boolean userLogDir = this.brokerProperties.get(KafkaConfig.LogDirProp()) != null && this.count == 1;
boolean userLogDir = this.brokerProperties.get("log.dir") != null && this.count == 1;
for (int i = 0; i < this.count; i++) {
Properties brokerConfigProperties = createBrokerProperties(i);
brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
brokerConfigProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(),
brokerConfigProperties.setProperty("replica.socket.timeout.ms", "1000");
brokerConfigProperties.setProperty("controller.socket.timeout.ms", "1000");
brokerConfigProperties.setProperty("offsets.topic.replication.factor", "1");
brokerConfigProperties.setProperty("replica.high.watermark.checkpoint.interval.ms",
String.valueOf(Long.MAX_VALUE));
this.brokerProperties.forEach(brokerConfigProperties::put);
if (!this.brokerProperties.containsKey(KafkaConfig.NumPartitionsProp())) {
brokerConfigProperties.setProperty(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic);
if (!this.brokerProperties.containsKey("num.partitions")) {
brokerConfigProperties.setProperty("num.partitions", "" + this.partitionsPerTopic);
}
if (!userLogDir) {
logDir(brokerConfigProperties);
Expand All @@ -337,7 +338,7 @@ public void afterPropertiesSet() {

private void logDir(Properties brokerConfigProperties) {
try {
brokerConfigProperties.put(KafkaConfig.LogDirProp(),
brokerConfigProperties.put("log.dir",
Files.createTempDirectory("spring.kafka." + UUID.randomUUID()).toString());
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

Expand All @@ -28,6 +29,7 @@

/**
* @author Gary Russell
* @author Soby Chacko
* @since 2.3
*
*/
Expand All @@ -45,10 +47,10 @@ void test() {
Bytes bytes = Bytes.wrap("baz".getBytes());
out = serializer.serialize("x", bytes);
assertThat(out).isEqualTo("baz".getBytes());
assertThat(KafkaTestUtils.getPropertyValue(serializer, "stringSerializer.encoding")).isEqualTo("UTF-8");
assertThat(KafkaTestUtils.getPropertyValue(serializer, "stringSerializer.encoding")).isEqualTo(StandardCharsets.UTF_8);
Map<String, Object> configs = Collections.singletonMap("serializer.encoding", "UTF-16");
serializer.configure(configs, false);
assertThat(KafkaTestUtils.getPropertyValue(serializer, "stringSerializer.encoding")).isEqualTo("UTF-16");
assertThat(KafkaTestUtils.getPropertyValue(serializer, "stringSerializer.encoding")).isEqualTo(StandardCharsets.UTF_16);
}

}

0 comments on commit 6874ea3

Please sign in to comment.