Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-13864: provide the construct interceptor for KafkaProducer and KafkaConsumer #12125

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -663,12 +663,38 @@ public KafkaConsumer(Properties properties,
public KafkaConsumer(Map<String, Object> configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(configs, keyDeserializer, valueDeserializer, null);
}

/**
* A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value {@link Deserializer}.
lqjack marked this conversation as resolved.
Show resolved Hide resolved
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}.
* <p>
* Note: after creating a {@code KafkaConsumer} you must always {@link #close()} it to avoid resource leaks.
*
* @param configs The consumer configs
* @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
* @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
* @param interceptors The list interceptors for consumer that implements {$link ConsumerInterceptor}.
lqjack marked this conversation as resolved.
Show resolved Hide resolved
*/
public KafkaConsumer(Map<String, Object> configs,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid touching to the existing and offer more flexibility, I would have created a new constructor with a ProducerConfig to allow overriding the method ProducerConfig#getConfiguredInstances to complete the list of interceptors with instances or any other type of instances such as the MetricsReporter of the partitioner. See the Spring issue for a concrete example: spring-projects/spring-kafka#2244
This approach can work as well.

Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
List<ConsumerInterceptor<K, V>> interceptors) {
this(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
keyDeserializer, valueDeserializer);
keyDeserializer, valueDeserializer, interceptors);
}

@SuppressWarnings("unchecked")
KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this(config, keyDeserializer, valueDeserializer, null);
}

@SuppressWarnings("unchecked")
KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer, List<ConsumerInterceptor<K, V>> interceptors) {
try {
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
GroupRebalanceConfig.ProtocolType.CONSUMER);
Expand Down Expand Up @@ -705,6 +731,9 @@ public KafkaConsumer(Map<String, Object> configs,
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptor.class,
Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
if (interceptors != null && !interceptors.isEmpty()) {
interceptorList.addAll(interceptors);
}
this.interceptors = new ConsumerInterceptors<>(interceptorList);
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,27 @@ public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, S
keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
}

/**
* A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.
lqjack marked this conversation as resolved.
Show resolved Hide resolved
* Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
* Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept
* either the string "42" or the integer 42).
* <p>
* Note: after creating a {@code KafkaProducer} you must always {@link #close()} it to avoid resource leaks.
* @param configs The producer configs
* @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be
* called in the producer when the serializer is passed in directly.
* @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't
* be called in the producer when the serializer is passed in directly.
* @param interceptors The list interceptors for producer that implements {$link ProducerInterceptor}.
*/
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer,
List<ProducerInterceptor<K, V>> interceptors) {
this(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)),
keySerializer, valueSerializer, null, null,
new ProducerInterceptors<>(interceptors), Time.SYSTEM);
}

/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
* are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,17 @@ public void testInvalidSocketReceiveBufferSize() {
() -> new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
}

@Test
public void testConstructConsumerWithInterceptor() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, -2);
List<ConsumerInterceptor<String, String>> interceptors = new ArrayList<>();
interceptors.add(new MockConsumerInterceptor());
assertThrows(KafkaException.class,
() -> new KafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), interceptors));
}

@Test
public void shouldIgnoreGroupInstanceIdForEmptyGroupId() {
Map<String, Object> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,31 @@ public void testSerializerClose() {
assertEquals(oldCloseCount + 2, MockSerializer.CLOSE_COUNT.get());
}

@Test
public void testConstructKafkaProducerWithInterceptor() {
try {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "testConstructKafkaProducerWithInterceptor");
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");

List<ProducerInterceptor<String, String>> interceptors = new ArrayList<>();
interceptors.add(new MockProducerInterceptor());
KafkaProducer<String, String> producer = new KafkaProducer<>(
configs, new StringSerializer(), new StringSerializer(), interceptors);
assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());

assertNull(MockProducerInterceptor.CLUSTER_META.get());

producer.close();
assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
} finally {
// cleanup since we are using mutable static variables in MockProducerInterceptor
MockProducerInterceptor.resetCounters();
}
}

@Test
public void testInterceptorConstructClose() {
try {
Expand Down