Fence writer zombies (breaking change) #255

Draft · wants to merge 2 commits into base: main

115 changes: 108 additions & 7 deletions README.md
@@ -63,14 +63,16 @@ By default the connector will attempt to use Kafka client config from the worker
the control topic. If that config cannot be read for some reason, Kafka client settings
can be set explicitly using `iceberg.kafka.*` properties.

### Source topic offsets
### Consumer offsets

Source topic offsets are stored in two different consumer groups. The first is the sink-managed consumer
group defined by the `iceberg.control.group-id` property. The second is the Kafka Connect managed
consumer group which is named `connect-<connector name>` by default. The sink-managed consumer
group is used by the sink to achieve exactly-once processing. The Kafka Connect consumer group is
only used as a fallback if the sink-managed consumer group is missing. To reset the offsets,
both consumer groups need to be reset.
Source topic offsets are stored in the Kafka Connect consumer group (which is named `connect-<connector name>` by default).
To reset the source topic offsets of the connector, the Kafka Connect consumer group needs to be reset.

Control topic offsets are stored in a separate, sink-managed consumer group which we'll refer to as the Coordinator
consumer group. By default, this will be something like `cg-control-<connector-name>-coord` (unless you've configured
your connector with an explicit `iceberg.control.group-id` in which case it will be something like
`<iceberg.control.group-id>-coord`). To reset control topic offsets of the connector, the Coordinator consumer group
needs to be reset.
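
A minimal sketch of one way to reset the Coordinator consumer group using the Java `Admin` client, assuming the connector is stopped, the default control topic name `control-iceberg`, and the example group name `cg-control-my-connector-coord` (adjust all of these for your deployment; the `kafka-consumer-groups.sh` CLI works just as well):

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ResetCoordinatorGroup {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
      // The group must have no active members, i.e. the connector must be stopped.
      // This example resets partition 0 of the control topic to offset 0; pick the
      // offsets appropriate for your situation.
      admin.alterConsumerGroupOffsets(
              "cg-control-my-connector-coord",
              Map.of(new TopicPartition("control-iceberg", 0), new OffsetAndMetadata(0L)))
          .all()
          .get();
    }
  }
}
```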

### Message format

@@ -170,6 +172,105 @@ from the classpath are loaded. Next, if `iceberg.hadoop-conf-dir` is specified, configs
are loaded from that location. Finally, any `iceberg.hadoop.*` properties from the sink config are
applied. When merging these, the order of precedence is sink config > config dir > classpath.
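
As a small illustration of that precedence, a sketch with assumed property maps (not the connector's actual merging code):

```java
import java.util.HashMap;
import java.util.Map;

class HadoopConfMergeSketch {
  // Later puts win, so the effective precedence is: sink config > config dir > classpath.
  static Map<String, String> merge(
      Map<String, String> classpathProps,
      Map<String, String> confDirProps,
      Map<String, String> sinkProps) {
    Map<String, String> merged = new HashMap<>(classpathProps); // lowest precedence
    merged.putAll(confDirProps);                                // from iceberg.hadoop-conf-dir
    merged.putAll(sinkProps);                                   // iceberg.hadoop.* (highest)
    return merged;
  }
}
```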

# Upgrade
Contributor: Nice docs. Thanks for looking out for the users.


## Upgrading from 0.6.X to 0.7.0

Prior to version 0.7.0, the consumer offsets for the source topics were tracked by both the connect-group-id and the
control-group-id. It's important to note that the consumer offsets stored in the control-group-id were always considered
the "source-of-truth" and could be ahead of those tracked by the connect-group-id in exceptional, temporary situations.

Starting from version 0.7.0, consumer offsets for the source topics are tracked by the connect-group-id _exclusively_,
i.e. consumer offsets for the source topics will no longer be tracked by the control-group-id. This change is necessary
to eliminate duplicates from zombie tasks. This means that the new "source-of-truth" for source topic consumer offsets
is the connect-group-id.

Unfortunately, this is a breaking change, and the upgrade process itself introduces a risk of duplicate records being written to Iceberg.
If you can tolerate a small number of duplicates, you can upgrade to version 0.7.0 just like any other release.
However, if you want to avoid duplicates during the upgrade, please follow the general instructions below for upgrading connectors safely.
Please note that the following instructions are written assuming you are running Kafka Connect version 3.6.0.
You may need to adjust the approach depending on your version of Kafka Connect and your deployment process.

### Step 1
Stop all existing Iceberg Sink connectors running on the Kafka Connect cluster.
The connectors must be stopped because we may need to reset their consumer offsets later, and that is not possible while the connectors are running.

You can stop a connector via the Kafka Connect REST API e.g.
```bash
curl -X PUT http://localhost:8083/connectors/<connector-name>/stop
```

### Step 2
Fetch the current consumer offsets of the connect-group-id and the control-group-id.
The connect-group-id will be something like `connect-<connector-name>`.
By default, the control-group-id will be something like `cg-control-<connector-name>` unless you've configured your connector with an explicit `iceberg.control.group-id`.

Be careful not to confuse the control-group-id with the coordinator-consumer-group-id.
The coordinator-consumer-group-id looks very similar to the control-group-id but has a `-coord` suffix e.g. `cg-control-<connector-name>-coord`.
We are only interested in the **connect-group-id** and **control-group-id** for the purposes of this migration.
You should _not_ interact with the **coordinator-consumer-group-id** for the purposes of this migration.

You can retrieve the current consumer offsets for a given consumer-group-id using the `kafka-consumer-groups.sh` tool e.g.
```bash
./kafka-consumer-groups.sh \
--bootstrap-server <bootstrap-server-url> \
--describe \
--group <consumer-group-id>

# Consumer group 'connect-my-connector' has no active members.
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
# connect-my-connector my-topic-name 0 900 1000 100 - - -

# Consumer group 'cg-control-my-connector' has no active members.
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
# cg-control-my-connector my-topic-name 0 1000 1000 0 - - -
```
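
If you prefer to script this step with the Java `Admin` client instead of the CLI, a minimal sketch (the group names shown are the example ones above; substitute your own):

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class DescribeGroupOffsets {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
      // Print current committed offsets for both groups of interest.
      for (String groupId : new String[] {"connect-my-connector", "cg-control-my-connector"}) {
        Map<TopicPartition, OffsetAndMetadata> offsets =
            admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
        offsets.forEach((tp, om) -> System.out.println(groupId + " " + tp + " " + om.offset()));
      }
    }
  }
}
```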

### Step 3

Move the consumer offsets for the connect-group-id forward so that they match those of the control-group-id, if necessary.
If the consumer offsets for the connect-group-id and the control-group-id are already the same, no action is needed for
this step.
If however you see that the connect-group-id consumer offsets are behind those of the control-group-id, you will need to
move the consumer offsets of the connect-group-id forward to match those of the control-group-id.
Note: It is impossible for the consumer offsets of the connect-group-id to be ahead of those of the control-group-id for
connector versions < 0.7.0.

You can reset consumer offsets for the connect-group-id using the Kafka Connect REST API e.g.
```bash
curl -X PATCH \
--header "Content-Type: application/json" \
--data '{ "offsets": [ { "partition": { "kafka_topic": "my_topic_name", "kafka_partition": 0 }, "offset": { "kafka_offset": 1000 } } ] }' \
localhost:8083/connectors/<connector-name>/offsets
# {"message": "The Connect framework-managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."}
```

### Step 4

If you have successfully completed the above steps for all Iceberg Sink connectors running on the Kafka Connect cluster,
it is now safe to update the Iceberg Sink Connector version on all workers in the Kafka Connect cluster to version 0.7.0.

You can check the installed connector version using the Kafka Connect REST API e.g.
```bash
curl localhost:8083/connector-plugins
# [{"class": "io.tabular.iceberg.connect.IcebergSinkConnector", "type": "sink", "version": "1.5.2-kc-0.7.0"}]
```

### Step 5

Once the Iceberg Sink Connector version on the cluster has been updated to 0.7.0, it is safe to resume the connectors
that we stopped in step 1.

You can resume a connector via the Kafka Connect REST API e.g.
```bash
curl -X PUT http://localhost:8083/connectors/<connector-name>/resume
```

At this point, the upgrade process is complete.

Note: The now unused control-group-id will eventually be removed from Kafka automatically (by default after 7 days) so
no special action is necessary there.
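
If you would rather clean it up immediately, a hedged sketch of deleting the now unused group with the Java `Admin` client (assuming the example group name `cg-control-my-connector` and that the group has no active members):

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DeleteOldControlGroup {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
      // Deleting a consumer group only succeeds if it has no active members.
      admin.deleteConsumerGroups(List.of("cg-control-my-connector")).all().get();
    }
  }
}
```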

# Examples

## Initial setup
18 changes: 9 additions & 9 deletions docs/design.md
@@ -73,12 +73,10 @@ There are two sets of offsets to manage, the offsets for the source topic(s) and

#### Source topic

The offsets for the source topic are managed by the workers. A worker sends its data files events and also commits the source offsets to a sink-managed consumer group within a Kafka transaction. All control topic consumers have isolation set to read only committed events. This ensures that files sent to the coordinator correspond to the source topic offsets that were stored.
The offsets for the source topic are managed by the workers. A worker sends its data files events and also commits the source offsets to the Kafka Connect consumer group within a Kafka transaction. All control topic consumers have isolation set to read only committed events. This ensures that files sent to the coordinator correspond to the source topic offsets that were stored.

The Kafka Connect managed offsets are kept in sync during flushes. The reason behind having a second consumer group, rather than only using the Kafka Connect consumer group, is to ensure that the offsets are committed in a transaction with the sending of the data files events. The Kafka Connect consumer group cannot be directly updated as it has active consumers.

When a task starts up, the consumer offsets are initialized to those in the sink-managed consumer group rather than the Kafka Connect consumer group. The offsets in the Kafka Connect consumer group are only used if offsets in the sink-managed group are missing. The offsets in the sink-managed group are the source of truth.
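
To make the worker-side mechanism concrete, a minimal sketch of the transaction in the new design, where offsets are committed to the Kafka Connect consumer group (hypothetical names, not the connector's actual code). Because the offsets are committed together with the consumer group metadata obtained from the live consumer, the broker can also reject commits from zombie producers whose group generation is stale:

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

class WorkerCommitSketch {
  // dataFilesEvent is the serialized event payload; sourceOffsets are the next offsets
  // to consume for each assigned source topic partition.
  static void commitAtomically(
      KafkaProducer<String, byte[]> transactionalProducer,
      String controlTopic,
      byte[] dataFilesEvent,
      Map<TopicPartition, OffsetAndMetadata> sourceOffsets,
      ConsumerGroupMetadata connectGroupMetadata) {
    transactionalProducer.beginTransaction();
    // 1. Publish the data files event to the control topic.
    transactionalProducer.send(new ProducerRecord<>(controlTopic, dataFilesEvent));
    // 2. Commit source topic offsets to the Kafka Connect consumer group inside the
    //    same transaction, so the event and the offsets succeed or fail together.
    transactionalProducer.sendOffsetsToTransaction(sourceOffsets, connectGroupMetadata);
    transactionalProducer.commitTransaction();
  }
}
```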

#### Control topic

On coordinator startup, the control topic offsets are restored from the consumer group. Any data files events added after the offsets are processed during startup. If the consumer group had not yet been initialized, then the coordinator’s consumer starts reading from the latest.
@@ -103,14 +101,16 @@ An upsert mode is also supported for data that is not in change data capture for

The connector has exactly-once semantics. Workers ensure this by sending the data files events and committing offsets for the source topic within a Kafka transaction. The coordinator ensures this by setting the control topic consumer to only read committed events, and also by saving the offsets for the control topic as part of the Iceberg commit data.

* The offsets for the source topic in the sink-managed consumer group correspond to the data files events successfully written to the control topic.
* The offsets for the source topic in the Kafka Connect consumer group correspond to the data files events successfully written to the control topic.
* The offsets for the control topic correspond to the Iceberg snapshot, as the offsets are stored in the snapshot metadata.

### Zombie fencing

If a task encounters a very heavy GC cycle during a transaction that causes a pause longer than the consumer session timeout (45 seconds by default), a partition might be assigned to a different task even though the “zombie” is still alive (but in a degraded state).

In this circumstance, the new worker starts reading from the current committed offsets. When the zombie starts processing again, it completes the commit. This could lead to duplicates in this extreme case. Zombie fencing will be targeted for a future release.
If the task running the Coordinator process encounters a heavy GC cycle that causes a pause longer than the consumer session timeout (45 seconds by default), it may become a zombie.
In this scenario, Kafka Connect will replace that task with a new one even though the “zombie” is still alive (but in a degraded state).
A new Coordinator process will begin processing datafiles from the control topic.
When the zombie starts processing again later, it may commit a datafile that has already been committed by the new Coordinator process, leading to duplicates in this extreme case.
Coordinator zombie fencing will be targeted for a future release.

## Error Handling

@@ -120,7 +120,7 @@ All errors in the connector itself are non-retryable. This includes errors durin

### Worker fails during processing

If a failure occurs on a worker while processing messages or writing files, an exception is thrown and the task restarts from the last Kafka offsets committed to the sink-managed consumer group. Any data that had been written since the last commit is left in place, uncommitted. New data files are written from the offsets, and only these will be committed. Table maintenance should be performed regularly to clean up the orphaned files.
If a failure occurs on a worker while processing messages or writing files, an exception is thrown and the task restarts from the last Kafka offsets committed to the Kafka Connect managed consumer group. Any data that had been written since the last commit is left in place, uncommitted. New data files are written from the offsets, and only these will be committed. Table maintenance should be performed regularly to clean up the orphaned files.

### Worker fails to receive begin commit event

@@ -144,10 +144,10 @@ If the table is rolled back to an older snapshot, then that also rolls back to o

* Optionally commit as unpartitioned to avoid many small files
* More seamless snapshot rollback behavior
* Zombie fencing during offset commit
* Pluggable commit coordinator
* Allow a backend to handle instead of requiring a control topic
* Distribute commits across workers
* Coordinator zombie fencing

## Alternatives Considered

3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
@@ -43,6 +43,7 @@ kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafk
kafka-connect-api = { module = "org.apache.kafka:connect-api", version.ref = "kafka-ver" }
kafka-connect-json = { module = "org.apache.kafka:connect-json", version.ref = "kafka-ver" }
kafka-connect-transforms = { module = "org.apache.kafka:connect-transforms", version.ref = "kafka-ver" }
kafka-connect-runtime = { module = "org.apache.kafka:connect-runtime", version.ref = "kafka-ver" }
slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j-ver" }

# test dependencies
@@ -64,7 +65,7 @@ palantir-gradle = "com.palantir.baseline:gradle-baseline-java:4.42.0"
iceberg = ["iceberg-api", "iceberg-common", "iceberg-core", "iceberg-data", "iceberg-guava", "iceberg-orc", "iceberg-parquet", "iceberg-kafka-connect-events"]
iceberg-ext = ["iceberg-aws", "iceberg-aws-bundle", "iceberg-azure", "iceberg-azure-bundle", "iceberg-gcp","iceberg-gcp-bundle", "iceberg-nessie"]
jackson = ["jackson-core", "jackson-databind"]
kafka-connect = ["kafka-clients", "kafka-connect-api", "kafka-connect-json", "kafka-connect-transforms"]
kafka-connect = ["kafka-clients", "kafka-connect-api", "kafka-connect-json", "kafka-connect-transforms", "kafka-connect-runtime"]


[plugins]
CommitterImpl.java (io.tabular.iceberg.connect.channel)
@@ -19,7 +19,6 @@
package io.tabular.iceberg.connect.channel;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.data.Offset;
@@ -29,7 +28,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.connect.events.DataComplete;
import org.apache.iceberg.connect.events.DataWritten;
@@ -42,11 +40,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +52,7 @@ public class CommitterImpl extends Channel implements Committer, AutoCloseable {
  private final SinkTaskContext context;
  private final IcebergSinkConfig config;
  private final Optional<CoordinatorThread> maybeCoordinatorThread;
  private final ConsumerGroupMetadata consumerGroupMetadata;

  public CommitterImpl(SinkTaskContext context, IcebergSinkConfig config, Catalog catalog) {
    this(context, config, catalog, new KafkaClientFactory(config.kafkaProps()));
@@ -92,11 +88,14 @@ private CommitterImpl(

    this.maybeCoordinatorThread = coordinatorThreadFactory.create(context, config);

    // The source-of-truth for source-topic offsets is the control-group-id
    Map<TopicPartition, Long> stableConsumerOffsets =
        fetchStableConsumerOffsets(config.controlGroupId());
    // Rewind kafka connect consumer to avoid duplicates
    context.offset(stableConsumerOffsets);
    ConsumerGroupMetadata groupMetadata;
    try {
      groupMetadata = KafkaUtils.consumerGroupMetadata(context);
    } catch (IllegalArgumentException e) {
      LOG.warn("Could not extract ConsumerGroupMetadata from consumer inside Kafka Connect, falling back to simple ConsumerGroupMetadata which can result in duplicates from zombie tasks");
      groupMetadata = new ConsumerGroupMetadata(config.connectGroupId());
    }
Comment on lines +92 to +97

Contributor Author: We fetch the consumer-group-metadata via reflection from inside the Kafka Connect framework. This is technically unsafe as we are relying on private implementation details. Hence I also implemented falling back to a simple ConsumerGroupMetadata (which is basically what we were doing previously), which does not do zombie fencing.

Contributor: Why not just fail?
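
For reference, a fail-fast variant along the lines the reviewer suggests might look like the following (a hypothetical sketch, not part of this PR; `ConnectException` would need to remain imported):

```java
    ConsumerGroupMetadata groupMetadata;
    try {
      groupMetadata = KafkaUtils.consumerGroupMetadata(context);
    } catch (IllegalArgumentException e) {
      // Refuse to start rather than silently degrading to commits that are not zombie-fenced.
      throw new ConnectException(
          "Unable to extract ConsumerGroupMetadata from the Kafka Connect consumer", e);
    }
```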

    this.consumerGroupMetadata = groupMetadata;

    consumeAvailable(
        // initial poll with longer duration so the consumer will initialize...
@@ -108,20 +107,6 @@ private CommitterImpl(
        () -> new Committable(ImmutableMap.of(), ImmutableList.of())));
  }

  private Map<TopicPartition, Long> fetchStableConsumerOffsets(String groupId) {
    try {
      ListConsumerGroupOffsetsResult response =
          admin()
              .listConsumerGroupOffsets(
                  groupId, new ListConsumerGroupOffsetsOptions().requireStable(true));
      return response.partitionsToOffsetAndMetadata().get().entrySet().stream()
          .filter(entry -> context.assignment().contains(entry.getKey()))
          .collect(toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
    } catch (InterruptedException | ExecutionException e) {
      throw new ConnectException(e);
    }
  }

  private void throwExceptionIfCoordinatorIsTerminated() {
    if (maybeCoordinatorThread.map(CoordinatorThread::isTerminated).orElse(false)) {
      throw new IllegalStateException("Coordinator unexpectedly terminated");
@@ -183,8 +168,7 @@ private void sendCommitResponse(UUID commitId, CommittableSupplier committableSu
    events.add(commitReady);

    Map<TopicPartition, Offset> offsets = committable.offsetsByTopicPartition();
    send(events, offsets, new ConsumerGroupMetadata(config.controlGroupId()));
    send(ImmutableList.of(), offsets, new ConsumerGroupMetadata(config.connectGroupId()));
    send(events, offsets, consumerGroupMetadata);
Comment on lines -186 to +171

Contributor Author: Notice how we commit offsets against only one consumer group now: the connect-<connector-name> consumer group. We no longer commit source topic offsets to config.controlGroupId, and to be honest I don't understand why we ever did, since we could have always taken this approach (irrespective of zombie fencing). cc @bryanck if you can shed any light here as to why this was necessary in the past, or if it was just an oversight.

  }

  @Override
KafkaUtils.java (io.tabular.iceberg.connect.channel)
@@ -19,11 +19,17 @@
package io.tabular.iceberg.connect.channel;

import java.util.concurrent.ExecutionException;

import org.apache.iceberg.common.DynFields;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerSinkTaskContext;
import org.apache.kafka.connect.sink.SinkTaskContext;

public class KafkaUtils {

@@ -40,5 +46,18 @@ public static ConsumerGroupDescription consumerGroupDescription(
    }
  }

  private static final String WorkerSinkTaskContextClassName =
      WorkerSinkTaskContext.class.getName();

  @SuppressWarnings("unchecked")
Contributor: Worth a comment around using reflection to get at some very specific implementation-detail stuff here, but otherwise 👍

  /**
   * Extracts the ConsumerGroupMetadata from the consumer that Kafka Connect manages internally,
   * by reflectively reading the private "consumer" field of WorkerSinkTaskContext. This relies
   * on Kafka Connect implementation details and may fail if the context is not a
   * WorkerSinkTaskContext.
   */
  public static ConsumerGroupMetadata consumerGroupMetadata(SinkTaskContext sinkTaskContext) {
    return ((Consumer<byte[], byte[]>) DynFields
            .builder()
            .hiddenImpl(WorkerSinkTaskContextClassName, "consumer")
            .build(sinkTaskContext)
            .get())
        .groupMetadata();
  }

  private KafkaUtils() {}
}