diff --git a/README.md b/README.md
index 7ce5c79e..21895c77 100644
--- a/README.md
+++ b/README.md
@@ -63,14 +63,16 @@ By default the connector will attempt to use Kafka client config from the worker
 the control topic. If that config cannot be read for some reason, Kafka client settings can be set
 explicitly using `iceberg.kafka.*` properties.
 
-### Source topic offsets
+### Consumer offsets
 
-Source topic offsets are stored in two different consumer groups. The first is the sink-managed consumer
-group defined by the `iceberg.control.group-id` property. The second is the Kafka Connect managed
-consumer group which is named `connect-` by default. The sink-managed consumer
-group is used by the sink to achieve exactly-once processing. The Kafka Connect consumer group is
-only used as a fallback if the sink-managed consumer group is missing. To reset the offsets,
-both consumer groups need to be reset.
+Source topic offsets are stored in the Kafka Connect consumer group (which is named `connect-` by default).
+To reset the source topic offsets of the connector, the Kafka Connect consumer group needs to be reset.
+
+Control topic offsets are stored in a separate, sink-managed consumer group, which we'll refer to as the Coordinator
+consumer group. By default, this will be something like `cg-control--coord` (unless you've configured
+your connector with an explicit `iceberg.control.group-id`, in which case it will be something like
+`-coord`). To reset the control topic offsets of the connector, the Coordinator consumer group
+needs to be reset.
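+
+For example, assuming Kafka Connect 3.6 or later and a (hypothetical) connector named `my-connector` whose control topic
+is `my-control-topic`, the two groups could be reset roughly as follows once the connector has been stopped:
+```bash
+# Reset the source topic offsets tracked by the Kafka Connect consumer group
+# (the connector must be in the STOPPED state)
+curl -X DELETE http://localhost:8083/connectors/my-connector/offsets
+
+# Reset the Coordinator consumer group's offsets on the control topic
+# (use --to-earliest, --to-latest or --to-offset as appropriate for your situation)
+./kafka-consumer-groups.sh \
+  --bootstrap-server localhost:9092 \
+  --group cg-control-my-connector-coord \
+  --topic my-control-topic \
+  --reset-offsets --to-earliest --execute
+```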
 
 ### Message format
 
@@ -170,6 +172,105 @@ from the classpath are loaded. Next, if `iceberg.hadoop-conf-dir` is specified,
 are loaded from that location. Finally, any `iceberg.hadoop.*` properties from the sink config are
 applied. When merging these, the order of precedence is sink config > config dir > classpath.
 
+# Upgrade
+
+## Upgrading from 0.6.X to 0.7.0
+
+Prior to version 0.7.0, the consumer offsets for the source topics were tracked by both the connect-group-id and the
+control-group-id. It's important to note that the consumer offsets stored in the control-group-id were always considered
+the "source-of-truth" and could be ahead of those tracked by the connect-group-id in exceptional, temporary situations.
+
+Starting from version 0.7.0, consumer offsets for the source topics are tracked by the connect-group-id _exclusively_,
+i.e. consumer offsets for the source topics will no longer be tracked by the control-group-id. This change is necessary
+to eliminate duplicates from zombie tasks. This means that the new "source-of-truth" for source topic consumer offsets
+will be the connect-group-id.
+
+Unfortunately, this is a breaking change, and the upgrade process itself introduces a risk of duplicate records being written to Iceberg.
+If you don't mind a small number of duplicates, you can upgrade to version 0.7.0 just like any other patch release.
+However, if you want to avoid duplicates during the upgrade, please read the following general instructions for upgrading connectors safely.
+Please note that the following instructions are written assuming you are running Kafka Connect version 3.6.0.
+You may need to adjust the approach depending on your version of Kafka Connect and your deployment process.
+
+### Step 1
+
+Stop all existing Iceberg Sink connectors running on the Kafka Connect cluster.
+We need to stop the connectors because we will potentially be resetting their consumer offsets later, and that is not
+possible while the connectors are running.
+
+You can stop a connector via the Kafka Connect REST API e.g.
+```bash
+curl -X PUT http://localhost:8083/connectors//stop
+```
+
+### Step 2
+
+Fetch the current consumer offsets of the connect-group-id and the control-group-id.
+The connect-group-id will be something like `connect-`.
+By default, the control-group-id will be something like `cg-control-` unless you've configured your connector with an explicit `iceberg.control.group-id`.
+
+Be careful not to confuse the control-group-id with the coordinator-consumer-group-id.
+The coordinator-consumer-group-id looks very similar to the control-group-id but has a `-coord` suffix e.g. `cg-control--coord`.
+We are only interested in the **connect-group-id** and **control-group-id** for the purposes of this migration.
+You should _not_ interact with the **coordinator-consumer-group-id** during this migration.
+
+You can retrieve the current consumer offsets for a given consumer-group-id using the `kafka-consumer-groups.sh` tool e.g.
+```bash
+./kafka-consumer-groups.sh \
+--bootstrap-server \
+--describe \
+--group 
+
+# Consumer group 'connect-my-connector' has no active members.
+# GROUP                 TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
+# connect-my-connector  my-topic-name  0          900             1000            100  -            -     -
+
+# Consumer group 'cg-control-my-connector' has no active members.
+# GROUP                    TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
+# cg-control-my-connector  my-topic-name  0          1000            1000            0    -            -     -
+```
+
+### Step 3
+
+Move the consumer offsets for the connect-group-id forward so that they match those of the control-group-id, if necessary.
+If the consumer offsets for the connect-group-id and the control-group-id are already the same, no action is needed for
+this step.
+If, however, you see that the connect-group-id consumer offsets are behind those of the control-group-id, you will need to
+move the consumer offsets of the connect-group-id forward to match those of the control-group-id.
+Note: It is impossible for the consumer offsets of the connect-group-id to be ahead of those of the control-group-id for
+connector versions < 0.7.0.
+
+You can reset consumer offsets for the connect-group-id using the Kafka Connect REST API e.g.
+```bash
+curl -X PATCH \
+--header "Content-Type: application/json" \
+--data '{ "offsets": [ { "partition": { "kafka_topic": "my_topic_name", "kafka_partition": 0 }, "offset": { "kafka_offset": 1000 } } ] }' \
+localhost:8083/connectors//offsets
+# {"message": "The Connect framework-managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."}
+```
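+
+If you want to double-check the result, you can read back the offsets now recorded for the connector via the Kafka
+Connect REST API (the connector and topic names below are placeholders) e.g.
+```bash
+curl localhost:8083/connectors/<connector-name>/offsets
+# {"offsets": [{"partition": {"kafka_topic": "my_topic_name", "kafka_partition": 0}, "offset": {"kafka_offset": 1000}}]}
+```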
+
+### Step 4
+
+If you have successfully completed the above steps for all Iceberg Sink connectors running on the Kafka Connect cluster,
+it is now safe to update the Iceberg Sink Connector version on all workers in the Kafka Connect cluster to version 0.7.0.
+
+You can check the installed connector version using the Kafka Connect REST API e.g.
+```bash
+curl localhost:8083/connector-plugins
+# [{"class": "io.tabular.iceberg.connect.IcebergSinkConnector", "type": "sink", "version": "1.5.2-kc-0.7.0"}]
+```
+
+### Step 5
+
+Once the Iceberg Sink Connector version on the cluster has been updated to 0.7.0, it is safe to resume the connectors
+that we stopped in Step 1.
+
+You can resume a connector via the Kafka Connect REST API e.g.
+```bash
+curl -X PUT http://localhost:8083/connectors//resume
+```
+
+At this point, the upgrade process is complete.
+
+Note: The now unused control-group-id will eventually be removed from Kafka automatically (by default after 7 days), so
+no special action is necessary there.
+
 # Examples
 
 ## Initial setup
diff --git a/docs/design.md b/docs/design.md
index 6ddaaa2b..0e90c194 100644
--- a/docs/design.md
+++ b/docs/design.md
@@ -73,12 +73,10 @@ There are two sets of offsets to manage, the offsets for the source topic(s) and
 
 #### Source topic
 
-The offsets for the source topic are managed by the workers. A worker sends its data files events and also commits the source offsets to a sink-managed consumer group within a Kafka transaction. All control topic consumers have isolation set to read only committed events. This ensures that files sent to the coordinator correspond to the source topic offsets that were stored.
+The offsets for the source topic are managed by the workers. A worker sends its data files events and also commits the source offsets to the Kafka Connect consumer group within a Kafka transaction. All control topic consumers have isolation set to read only committed events. This ensures that files sent to the coordinator correspond to the source topic offsets that were stored.
 
 The Kafka Connect managed offsets are kept in sync during flushes. The reason behind having a second consumer group, rather than only using the Kafka Connect consumer group, is to ensure that the offsets are committed in a transaction with the sending of the data files events. The Kafka Connect consumer group cannot be directly updated as it has active consumers.
 
-When a task starts up, the consumer offsets are initialized to those in the sink-managed consumer group rather than the Kafka Connect consumer group. The offsets in the Kafka Connect consumer group are only be used if offsets in the sink-managed group are missing. The offsets in the sink-managed group are the source of truth.
-
 #### Control topic
 
 On coordinator startup, the control topic offsets are restored from the consumer group. Any data files events added after the offsets are processed during startup. If the consumer group had not yet been initialized, then the coordinator’s consumer starts reading from the latest.
@@ -103,14 +101,16 @@ An upsert mode is also supported for data that is not in change data capture for
 
 The connector has exactly-once semantics. Workers ensure this by sending the data files events and committing offsets for the source topic within a Kafka transaction. The coordinator ensures this by setting the control topic consumer to only read committed events, and also by saving the offsets for the control topic as part of the Iceberg commit data.
 
-* The offsets for the source topic in the sink-managed consumer group correspond to the data files events successfully written to the control topic.
+* The offsets for the source topic in the Kafka Connect consumer group correspond to the data files events successfully written to the control topic.
 * The offsets for the control topic correspond to the Iceberg snapshot, as the offsets are stored in the snapshot metadata.
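+
+The following is a minimal, illustrative sketch of the worker-side transactional pattern described above (it is not the
+actual connector code, and the names are made up): the data files events and the source topic offsets are committed
+atomically in a single Kafka transaction against the Kafka Connect consumer group.
+```java
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+class TransactionalCommitSketch {
+  // Assumes the producer was created with a transactional.id and initTransactions() was called at startup.
+  void commit(
+      KafkaProducer<byte[], byte[]> producer,
+      List<ProducerRecord<byte[], byte[]>> dataFilesEvents,  // serialized events for the control topic
+      Map<TopicPartition, OffsetAndMetadata> sourceOffsets,  // offsets consumed from the source topic(s)
+      ConsumerGroupMetadata connectGroupMetadata) {          // metadata of the Kafka Connect consumer group
+    producer.beginTransaction();
+    try {
+      // Write the data files events to the control topic...
+      dataFilesEvents.forEach(producer::send);
+      // ...and commit the source offsets to the Kafka Connect consumer group in the same transaction.
+      producer.sendOffsetsToTransaction(sourceOffsets, connectGroupMetadata);
+      producer.commitTransaction();
+    } catch (RuntimeException e) {
+      producer.abortTransaction();
+      throw e;
+    }
+  }
+}
+```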
 
 ### Zombie fencing
 
-If a task encounters a very heavy GC cycle during a transaction that causes a pause longer than the consumer session timeout (45 seconds by default), a partition might be assigned to a different task even though the “zombie” is still alive (but in a degraded state).
-
-In this circumstance, the new worker starts reading from the current committed offsets. When the zombie starts processing again, it complete the commit. This could lead to duplicates in this extreme case. Zombie fencing will be targeted for a future release.
+If the task running the Coordinator process encounters a heavy GC cycle that causes a pause longer than the consumer session timeout (45 seconds by default), it may become a zombie.
+In this scenario, Kafka Connect will replace that task with a new one even though the “zombie” is still alive (but in a degraded state).
+A new Coordinator process will begin processing data files from the control topic.
+When the zombie starts processing again later, it may commit a data file that has already been committed by the new Coordinator process, leading to duplicates in this extreme case.
+Coordinator zombie fencing will be targeted for a future release.
 
 ## Error Handling
 
@@ -120,7 +120,7 @@ All errors in the connector itself are non-retryable. This includes errors durin
 
 ### Worker fails during processing
 
-If a failure occurs on a worker while processing messages or writing files, an exception is thrown and the task restarts from the last Kafka offsets committed to the sink-managed consumer group. Any data that had been written since the last commit is left in place, uncommitted. New data files are written from the offsets, and only these will be committed. Table maintenance should be performed regularly to clean up the orphaned files.
+If a failure occurs on a worker while processing messages or writing files, an exception is thrown and the task restarts from the last Kafka offsets committed to the Kafka Connect managed consumer group. Any data that had been written since the last commit is left in place, uncommitted. New data files are written from the offsets, and only these will be committed. Table maintenance should be performed regularly to clean up the orphaned files.
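+
+As an illustration only (this is performed outside the connector), orphaned files can be removed with Iceberg's
+Spark-based `deleteOrphanFiles` action, for example:
+```java
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.spark.sql.SparkSession;
+
+class OrphanFileCleanup {
+  // Removes files in the table location that are not referenced by any snapshot and are older than three days.
+  DeleteOrphanFiles.Result clean(SparkSession spark, Table table) {
+    return SparkActions.get(spark)
+        .deleteOrphanFiles(table)
+        .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))
+        .execute();
+  }
+}
+```
+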
### Worker fails to receive begin commit event @@ -144,10 +144,10 @@ If the table is rolled back to an older snapshot, then that also rolls back to o * Optionally commit as unpartitioned to avoid many small files * More seamless snapshot rollback behavior -* Zombie fencing during offset commit * Pluggable commit coordinator * Allow a backend to handle instead of requiring a control topic * Distribute commits across workers +* Coordinator zombie fencing ## Alternatives Considered diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index ce279ad8..905c807f 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -43,6 +43,7 @@ kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafk kafka-connect-api = { module = "org.apache.kafka:connect-api", version.ref = "kafka-ver" } kafka-connect-json = { module = "org.apache.kafka:connect-json", version.ref = "kafka-ver" } kafka-connect-transforms = { module = "org.apache.kafka:connect-transforms", version.ref = "kafka-ver" } +kafka-connect-runtime = { module = "org.apache.kafka:connect-runtime", version.ref = "kafka-ver" } slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j-ver" } # test dependencies @@ -64,7 +65,7 @@ palantir-gradle = "com.palantir.baseline:gradle-baseline-java:4.42.0" iceberg = ["iceberg-api", "iceberg-common", "iceberg-core", "iceberg-data", "iceberg-guava", "iceberg-orc", "iceberg-parquet", "iceberg-kafka-connect-events"] iceberg-ext = ["iceberg-aws", "iceberg-aws-bundle", "iceberg-azure", "iceberg-azure-bundle", "iceberg-gcp","iceberg-gcp-bundle", "iceberg-nessie"] jackson = ["jackson-core", "jackson-databind"] -kafka-connect = ["kafka-clients", "kafka-connect-api", "kafka-connect-json", "kafka-connect-transforms"] +kafka-connect = ["kafka-clients", "kafka-connect-api", "kafka-connect-json", "kafka-connect-transforms", "kafka-connect-runtime"] [plugins] diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java index 10730d99..1534edac 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java @@ -19,7 +19,6 @@ package io.tabular.iceberg.connect.channel; import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toMap; import io.tabular.iceberg.connect.IcebergSinkConfig; import io.tabular.iceberg.connect.data.Offset; @@ -29,7 +28,6 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.ExecutionException; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.connect.events.DataComplete; import org.apache.iceberg.connect.events.DataWritten; @@ -42,11 +40,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; -import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +52,7 @@ public class 
CommitterImpl extends Channel implements Committer, AutoCloseable { private final SinkTaskContext context; private final IcebergSinkConfig config; private final Optional maybeCoordinatorThread; + private final ConsumerGroupMetadata consumerGroupMetadata; public CommitterImpl(SinkTaskContext context, IcebergSinkConfig config, Catalog catalog) { this(context, config, catalog, new KafkaClientFactory(config.kafkaProps())); @@ -92,11 +88,14 @@ private CommitterImpl( this.maybeCoordinatorThread = coordinatorThreadFactory.create(context, config); - // The source-of-truth for source-topic offsets is the control-group-id - Map stableConsumerOffsets = - fetchStableConsumerOffsets(config.controlGroupId()); - // Rewind kafka connect consumer to avoid duplicates - context.offset(stableConsumerOffsets); + ConsumerGroupMetadata groupMetadata; + try { + groupMetadata = KafkaUtils.consumerGroupMetadata(context); + } catch (IllegalArgumentException e) { + LOG.warn("Could not extract ConsumerGroupMetadata from consumer inside Kafka Connect, falling back to simple ConsumerGroupMetadata which can result in duplicates from zombie tasks"); + groupMetadata = new ConsumerGroupMetadata(config.connectGroupId()); + } + this.consumerGroupMetadata = groupMetadata; consumeAvailable( // initial poll with longer duration so the consumer will initialize... @@ -108,20 +107,6 @@ private CommitterImpl( () -> new Committable(ImmutableMap.of(), ImmutableList.of()))); } - private Map fetchStableConsumerOffsets(String groupId) { - try { - ListConsumerGroupOffsetsResult response = - admin() - .listConsumerGroupOffsets( - groupId, new ListConsumerGroupOffsetsOptions().requireStable(true)); - return response.partitionsToOffsetAndMetadata().get().entrySet().stream() - .filter(entry -> context.assignment().contains(entry.getKey())) - .collect(toMap(Map.Entry::getKey, entry -> entry.getValue().offset())); - } catch (InterruptedException | ExecutionException e) { - throw new ConnectException(e); - } - } - private void throwExceptionIfCoordinatorIsTerminated() { if (maybeCoordinatorThread.map(CoordinatorThread::isTerminated).orElse(false)) { throw new IllegalStateException("Coordinator unexpectedly terminated"); @@ -183,8 +168,7 @@ private void sendCommitResponse(UUID commitId, CommittableSupplier committableSu events.add(commitReady); Map offsets = committable.offsetsByTopicPartition(); - send(events, offsets, new ConsumerGroupMetadata(config.controlGroupId())); - send(ImmutableList.of(), offsets, new ConsumerGroupMetadata(config.connectGroupId())); + send(events, offsets, consumerGroupMetadata); } @Override diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/KafkaUtils.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/KafkaUtils.java index c3caaf68..8c9430f7 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/KafkaUtils.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/KafkaUtils.java @@ -19,11 +19,17 @@ package io.tabular.iceberg.connect.channel; import java.util.concurrent.ExecutionException; + +import org.apache.iceberg.common.DynFields; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import 
org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.WorkerSinkTaskContext; +import org.apache.kafka.connect.sink.SinkTaskContext; public class KafkaUtils { @@ -40,5 +46,18 @@ public static ConsumerGroupDescription consumerGroupDescription( } } + private static final String WorkerSinkTaskContextClassName = + WorkerSinkTaskContext.class.getName(); + + @SuppressWarnings("unchecked") + public static ConsumerGroupMetadata consumerGroupMetadata(SinkTaskContext sinkTaskContext) { + return ((Consumer) DynFields + .builder() + .hiddenImpl(WorkerSinkTaskContextClassName, "consumer") + .build(sinkTaskContext) + .get()) + .groupMetadata(); + } + private KafkaUtils() {} } diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitterImplTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitterImplTest.java index b3365be3..c3f94d80 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitterImplTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitterImplTest.java @@ -25,7 +25,6 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.tabular.iceberg.connect.IcebergSinkConfig; @@ -63,7 +62,9 @@ import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.admin.internals.CoordinatorKey; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -72,15 +73,19 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.connect.runtime.WorkerSinkTaskContext; import org.apache.kafka.connect.sink.SinkTaskContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import java.util.Collection; +import java.util.Properties; + class CommitterImplTest { private static final String CATALOG_NAME = "iceberg"; @@ -147,10 +152,14 @@ private static IcebergSinkConfig makeConfig(int taskId) { private static final IcebergSinkConfig CONFIG = makeConfig(1); - private SinkTaskContext mockContext() { - SinkTaskContext mockContext = mock(SinkTaskContext.class); - when(mockContext.assignment()).thenReturn(ASSIGNED_SOURCE_TOPIC_PARTITIONS); - return mockContext; + private WorkerSinkTaskContext workerSinkTaskContext(IcebergSinkConfig config, Collection assignedSourceTopicPartitions) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, config.connectGroupId()); + KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new 
ByteArrayDeserializer()); + kafkaConsumer.assign(assignedSourceTopicPartitions); + + return new WorkerSinkTaskContext(kafkaConsumer, null, null); } private static DynConstructors.Ctor ctorCoordinatorKey() { @@ -275,7 +284,7 @@ private void assertDataComplete( new TopicPartition(x.topic(), x.partition()), Pair.of(x.offset(), x.timestamp()))) .collect(Collectors.toList())) - .isEqualTo( + .containsExactlyInAnyOrderElementsOf( expectedAssignments.entrySet().stream() .map(e -> Pair.of(e.getKey(), e.getValue())) .collect(Collectors.toList())); @@ -285,42 +294,14 @@ private OffsetDateTime offsetDateTime(Long ms) { return OffsetDateTime.ofInstant(Instant.ofEpochMilli(ms), ZoneOffset.UTC); } - @Test - public void - testShouldRewindOffsetsToStableControlGroupConsumerOffsetsForAssignedPartitionsOnConstruction() - throws IOException { - SinkTaskContext mockContext = mockContext(); - - ArgumentCaptor> offsetArgumentCaptor = - ArgumentCaptor.forClass(Map.class); - - IcebergSinkConfig config = makeConfig(1); - - NoOpCoordinatorThreadFactory coordinatorThreadFactory = new NoOpCoordinatorThreadFactory(); - - whenAdminListConsumerGroupOffsetsThenReturn( - ImmutableMap.of( - config.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L), - config.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 90L, SOURCE_TP1, 80L))); - - try (CommitterImpl ignored = - new CommitterImpl(mockContext, config, kafkaClientFactory, coordinatorThreadFactory)) { - initConsumer(); - - verify(mockContext).offset(offsetArgumentCaptor.capture()); - assertThat(offsetArgumentCaptor.getAllValues()) - .isEqualTo(ImmutableList.of(ImmutableMap.of(SOURCE_TP0, 110L))); - } - } - @Test public void testCommitShouldThrowExceptionIfCoordinatorIsTerminated() throws IOException { - SinkTaskContext mockContext = mockContext(); IcebergSinkConfig config = makeConfig(0); + WorkerSinkTaskContext context = workerSinkTaskContext(config, ASSIGNED_SOURCE_TOPIC_PARTITIONS); whenAdminListConsumerGroupOffsetsThenReturn( ImmutableMap.of( - config.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); + config.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); TerminatedCoordinatorThreadFactory coordinatorThreadFactory = new TerminatedCoordinatorThreadFactory(); @@ -331,7 +312,7 @@ public void testCommitShouldThrowExceptionIfCoordinatorIsTerminated() throws IOE }; try (CommitterImpl committerImpl = - new CommitterImpl(mockContext, config, kafkaClientFactory, coordinatorThreadFactory)) { + new CommitterImpl(context, config, kafkaClientFactory, coordinatorThreadFactory)) { initConsumer(); Committer committer = committerImpl; @@ -346,13 +327,13 @@ public void testCommitShouldThrowExceptionIfCoordinatorIsTerminated() throws IOE @Test public void testCommitShouldDoNothingIfThereAreNoMessages() throws IOException { - SinkTaskContext mockContext = mockContext(); + WorkerSinkTaskContext context = workerSinkTaskContext(CONFIG, ASSIGNED_SOURCE_TOPIC_PARTITIONS); NoOpCoordinatorThreadFactory coordinatorThreadFactory = new NoOpCoordinatorThreadFactory(); whenAdminListConsumerGroupOffsetsThenReturn( ImmutableMap.of( - CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); + CONFIG.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); CommittableSupplier committableSupplier = () -> { @@ -360,7 +341,7 @@ public void testCommitShouldDoNothingIfThereAreNoMessages() throws IOException { }; try (CommitterImpl committerImpl = - new CommitterImpl(mockContext, CONFIG, kafkaClientFactory, 
coordinatorThreadFactory)) { + new CommitterImpl(context, CONFIG, kafkaClientFactory, coordinatorThreadFactory)) { initConsumer(); Committer committer = committerImpl; @@ -373,13 +354,13 @@ public void testCommitShouldDoNothingIfThereAreNoMessages() throws IOException { @Test public void testCommitShouldDoNothingIfThereIsNoCommitRequestMessage() throws IOException { - SinkTaskContext mockContext = mockContext(); + WorkerSinkTaskContext context = workerSinkTaskContext(CONFIG, ASSIGNED_SOURCE_TOPIC_PARTITIONS); NoOpCoordinatorThreadFactory coordinatorThreadFactory = new NoOpCoordinatorThreadFactory(); whenAdminListConsumerGroupOffsetsThenReturn( ImmutableMap.of( - CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); + CONFIG.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); CommittableSupplier committableSupplier = () -> { @@ -387,7 +368,7 @@ public void testCommitShouldDoNothingIfThereIsNoCommitRequestMessage() throws IO }; try (CommitterImpl committerImpl = - new CommitterImpl(mockContext, CONFIG, kafkaClientFactory, coordinatorThreadFactory)) { + new CommitterImpl(context, CONFIG, kafkaClientFactory, coordinatorThreadFactory)) { initConsumer(); Committer committer = committerImpl; @@ -411,14 +392,14 @@ public void testCommitShouldDoNothingIfThereIsNoCommitRequestMessage() throws IO @Test public void testCommitShouldRespondToCommitRequest() throws IOException { - SinkTaskContext mockContext = mockContext(); + WorkerSinkTaskContext context = workerSinkTaskContext(CONFIG, ASSIGNED_SOURCE_TOPIC_PARTITIONS); NoOpCoordinatorThreadFactory coordinatorThreadFactory = new NoOpCoordinatorThreadFactory(); UUID commitId = UUID.randomUUID(); whenAdminListConsumerGroupOffsetsThenReturn( ImmutableMap.of( - CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); + CONFIG.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); List dataFiles = ImmutableList.of(createDataFile()); List deleteFiles = ImmutableList.of(); @@ -432,7 +413,7 @@ public void testCommitShouldRespondToCommitRequest() throws IOException { new WriterResult(TABLE_1_IDENTIFIER, dataFiles, deleteFiles, partitionStruct))); try (CommitterImpl committerImpl = - new CommitterImpl(mockContext, CONFIG, kafkaClientFactory, coordinatorThreadFactory)) { + new CommitterImpl(context, CONFIG, kafkaClientFactory, coordinatorThreadFactory)) { initConsumer(); Committer committer = committerImpl; @@ -464,19 +445,17 @@ public void testCommitShouldRespondToCommitRequest() throws IOException { commitId, ImmutableMap.of(SOURCE_TP0, Pair.of(100L, offsetDateTime(200L)))); - assertThat(producer.consumerGroupOffsetsHistory()).hasSize(2); + assertThat(producer.consumerGroupOffsetsHistory()).hasSize(1); Map expectedConsumerOffset = ImmutableMap.of(SOURCE_TP0, new OffsetAndMetadata(100L)); assertThat(producer.consumerGroupOffsetsHistory().get(0)) - .isEqualTo(ImmutableMap.of(CONFIG.controlGroupId(), expectedConsumerOffset)); - assertThat(producer.consumerGroupOffsetsHistory().get(1)) .isEqualTo(ImmutableMap.of(CONFIG.connectGroupId(), expectedConsumerOffset)); } } @Test public void testCommitWhenCommittableIsEmpty() throws IOException { - SinkTaskContext mockContext = mockContext(); + WorkerSinkTaskContext context = workerSinkTaskContext(CONFIG, ASSIGNED_SOURCE_TOPIC_PARTITIONS); NoOpCoordinatorThreadFactory coordinatorThreadFactory = new NoOpCoordinatorThreadFactory(); @@ -484,13 +463,13 @@ public void testCommitWhenCommittableIsEmpty() throws IOException { 
whenAdminListConsumerGroupOffsetsThenReturn( ImmutableMap.of( - CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); + CONFIG.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); CommittableSupplier committableSupplier = () -> new Committable(ImmutableMap.of(), ImmutableList.of()); try (CommitterImpl committerImpl = - new CommitterImpl(mockContext, CONFIG, kafkaClientFactory, coordinatorThreadFactory)) { + new CommitterImpl(context, CONFIG, kafkaClientFactory, coordinatorThreadFactory)) { initConsumer(); Committer committer = committerImpl; @@ -522,21 +501,15 @@ public void testCommitWhenCommittableIsEmpty() throws IOException { @Test public void testCommitShouldCommitOffsetsOnlyForPartitionsWeMadeProgressOn() throws IOException { - SinkTaskContext mockContext = mockContext(); - NoOpCoordinatorThreadFactory coordinatorThreadFactory = new NoOpCoordinatorThreadFactory(); - TopicPartition sourceTp0 = new TopicPartition(SOURCE_TOPIC, 0); - TopicPartition sourceTp1 = new TopicPartition(SOURCE_TOPIC, 1); - Set sourceTopicPartitions = ImmutableSet.of(sourceTp0, sourceTp1); - - when(mockContext.assignment()).thenReturn(sourceTopicPartitions); + WorkerSinkTaskContext context = workerSinkTaskContext(CONFIG, ImmutableList.of(SOURCE_TP0, SOURCE_TP1)); UUID commitId = UUID.randomUUID(); whenAdminListConsumerGroupOffsetsThenReturn( ImmutableMap.of( - CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); + CONFIG.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); List dataFiles = ImmutableList.of(createDataFile()); List deleteFiles = ImmutableList.of(); @@ -544,12 +517,12 @@ public void testCommitShouldCommitOffsetsOnlyForPartitionsWeMadeProgressOn() thr CommittableSupplier committableSupplier = () -> new Committable( - ImmutableMap.of(sourceTp1, new Offset(100L, 200L)), + ImmutableMap.of(SOURCE_TP1, new Offset(100L, 200L)), ImmutableList.of( new WriterResult(TABLE_1_IDENTIFIER, dataFiles, deleteFiles, partitionStruct))); try (CommitterImpl committerImpl = - new CommitterImpl(mockContext, CONFIG, kafkaClientFactory, coordinatorThreadFactory)) { + new CommitterImpl(context, CONFIG, kafkaClientFactory, coordinatorThreadFactory)) { initConsumer(); Committer committer = committerImpl; @@ -580,15 +553,13 @@ public void testCommitShouldCommitOffsetsOnlyForPartitionsWeMadeProgressOn() thr producerId, commitId, ImmutableMap.of( - sourceTp0, Pair.of(null, null), - sourceTp1, Pair.of(100L, offsetDateTime(200L)))); + SOURCE_TP0, Pair.of(null, null), + SOURCE_TP1, Pair.of(100L, offsetDateTime(200L)))); - assertThat(producer.consumerGroupOffsetsHistory()).hasSize(2); + assertThat(producer.consumerGroupOffsetsHistory()).hasSize(1); Map expectedConsumerOffset = - ImmutableMap.of(sourceTp1, new OffsetAndMetadata(100L)); + ImmutableMap.of(SOURCE_TP1, new OffsetAndMetadata(100L)); assertThat(producer.consumerGroupOffsetsHistory().get(0)) - .isEqualTo(ImmutableMap.of(CONFIG.controlGroupId(), expectedConsumerOffset)); - assertThat(producer.consumerGroupOffsetsHistory().get(1)) .isEqualTo(ImmutableMap.of(CONFIG.connectGroupId(), expectedConsumerOffset)); } } diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/KafkaUtilsTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/KafkaUtilsTest.java new file mode 100644 index 00000000..396fe98e --- /dev/null +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/KafkaUtilsTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.tabular.iceberg.connect.channel; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.connect.runtime.WorkerSinkTaskContext; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.junit.jupiter.api.Test; + +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; + +class KafkaUtilsTest { + + @Test + public void testConsumerGroupMetadata() { + String connectGroupId = "connect-abc"; + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9192"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, connectGroupId); + KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + WorkerSinkTaskContext context = new WorkerSinkTaskContext(kafkaConsumer, null, null); + + ConsumerGroupMetadata consumerGroupMetadata = KafkaUtils.consumerGroupMetadata(context); + assertThat(consumerGroupMetadata).isEqualTo(kafkaConsumer.groupMetadata()); + } + + @Test + public void testConsumerGroupMetadataThrowsErrorWhenNotGivenAWorkerSinkTaskContext() { + SinkTaskContext context = mock(SinkTaskContext.class); + assertThatThrownBy(() -> KafkaUtils.consumerGroupMetadata(context)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage(String.format("Cannot bind field consumer to instance of class %s", context.getClass().getName())); + } + +}