docs(connectors): adds procedures to manage connector offsets #10661

Merged 3 commits on Oct 14, 2024 (changes shown from 2 commits)
6 changes: 6 additions & 0 deletions documentation/assemblies/configuring/assembly-config.adoc
@@ -135,6 +135,12 @@ include::../../modules/configuring/proc-manual-stop-pause-connector.adoc[levelof
include::../../modules/configuring/proc-manual-restart-connector.adoc[leveloffset=+2]
//Procedure to manually restart a Kafka connector task
include::../../modules/configuring/proc-manual-restart-connector-task.adoc[leveloffset=+2]
//procedure to list offsets
include::../../modules/configuring/proc-listing-connector-offsets.adoc[leveloffset=+2]
//procedure to alter offsets
include::../../modules/configuring/proc-altering-connector-offsets.adoc[leveloffset=+2]
//procedure to reset offsets
include::../../modules/configuring/proc-resetting-connector-offsets.adoc[leveloffset=+2]

//`KafkaMirrorMaker2` resource config
include::../../modules/configuring/con-config-mirrormaker2.adoc[leveloffset=+1]
@@ -242,4 +242,35 @@ Specifically, ensure that the following properties have the same value across al
* `topic.filter.class`

For example, the value for `replication.policy.class` must be the same for the source, checkpoint, and heartbeat connectors.
Mismatched or missing settings cause issues with data replication or offset syncing, so it's essential to keep all relevant connectors configured with the same settings.

== Listing the offsets of MirrorMaker 2 connectors

To list the offset positions of the internal MirrorMaker 2 connectors, use the same configuration that's used to manage Kafka Connect connectors.

In this example, the `sourceConnector` configuration is updated to return the connector offset position.
The offset information is written to a specified config map.

.Example configuration to list offsets for a MirrorMaker 2 source connector
[source,yaml,subs="+quotes,attributes"]
----
apiVersion: {KafkaMirrorMaker2ApiVersion}
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: {DefaultKafkaVersion}
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      listOffsets:
        toConfigMap:
          name: my-connector-offsets
      # ...
----

For more information on setting up the configuration, see xref:proc-listing-connector-offsets-{context}[].

NOTE: It is possible to use configuration to xref:proc-altering-connector-offsets-{context}[alter] or xref:proc-resetting-connector-offsets-{context}[reset] connector offsets, though this is rarely necessary.
@@ -0,0 +1,97 @@
// Module included in the following assemblies:
//
// assembly-config.adoc

[id='proc-altering-connector-offsets-{context}']
= Altering connector offsets

[role="_abstract"]
To alter connector offsets using `KafkaConnector` resources, configure the resource to stop the connector and add `alterOffsets` configuration to specify the offset changes in a config map.
You can reuse the same config map used to xref:proc-listing-connector-offsets-{context}[list offsets].

After the connector is stopped and the configuration is in place, annotate the `KafkaConnector` resource to apply the offset alteration, then restart the connector.

Altering connector offsets can be useful, for example, to skip a _poison_ record or replay a record.

In this procedure, we alter the offset position for a source connector named `my-source-connector`.

.Prerequisites

* The Cluster Operator is running.

.Procedure

. Edit the `KafkaConnector` resource to stop the connector and include the `alterOffsets` configuration.
+
.Example configuration to stop a connector and alter offsets
[source,yaml,subs="+attributes"]
----
apiVersion: {KafkaConnectorApiVersion}
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  state: stopped # <1>
  alterOffsets:
    fromConfigMap: # <2>
      name: my-connector-offsets # <3>
  # ...
----
<1> Changes the state of the connector to `stopped`. The default state for the connector when this property is not set is `running`.
<2> The reference to the config map that provides the update.
<3> The name of the config map, which is named `my-connector-offsets` in this example.

. Edit the config map to make the alteration.
+
In this example, we're setting the offset position for a source connector to 15000.
+
.Example source connector offset list configuration
[source,yaml,subs="+attributes"]
----
apiVersion: v1
kind: ConfigMap
metadata:
  # ...
data:
  offsets.json: |-
    {
      "offsets" : [ {
        "partition" : {
          "filename" : "/data/myfile.txt"
        },
        "offset" : {
          "position" : 15000 # <1>
        }
      } ]
    }
----
<1> The updated offset position in the source partition.

. Run the command to update the offset position by annotating the `KafkaConnector` resource:
+
[source,shell]
----
kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=alter -n <namespace>
----
+
The annotation remains until either the update operation succeeds, or it is manually removed from the resource.

. Check the changes by using the procedure to xref:proc-listing-connector-offsets-{context}[list connector offsets].

. Restart the connector by changing the state to `running`.
+
.Example configuration to start a connector
[source,yaml,subs="+attributes"]
----
apiVersion: {KafkaConnectorApiVersion}
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  state: running
  # ...
----
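
The `kubectl`-driven steps of this procedure can also be scripted. The following is a minimal sketch rather than part of the documented procedure: the function names are hypothetical, and using `kubectl patch --type merge` in place of editing the `KafkaConnector` resource by hand is an assumption. It also assumes the config map has already been edited with the new offsets before `apply_offset_alteration` is called.

```shell
#!/usr/bin/env bash
# Hypothetical helpers (not part of Strimzi) that mirror the procedure steps.

# Step 1: stop the connector and point alterOffsets at the config map.
# Arguments: <connector-name> <namespace> <config-map-name>
stop_and_configure_alter() {
  local connector="$1" namespace="$2" config_map="$3"
  kubectl patch kafkaconnector "$connector" -n "$namespace" --type merge \
    -p "{\"spec\":{\"state\":\"stopped\",\"alterOffsets\":{\"fromConfigMap\":{\"name\":\"$config_map\"}}}}"
}

# Step 3: apply the offsets from the (already edited) config map.
# Arguments: <connector-name> <namespace>
apply_offset_alteration() {
  local connector="$1" namespace="$2"
  kubectl annotate kafkaconnector "$connector" \
    strimzi.io/connector-offsets=alter -n "$namespace"
}

# Step 5: restart the connector by setting the state back to running.
# Arguments: <connector-name> <namespace>
resume_connector() {
  local connector="$1" namespace="$2"
  kubectl patch kafkaconnector "$connector" -n "$namespace" --type merge \
    -p '{"spec":{"state":"running"}}'
}
```

For example, `stop_and_configure_alter my-source-connector <namespace> my-connector-offsets` covers the first step. Keeping the steps as separate functions leaves room to edit the config map and to check the result by listing the offsets before calling `resume_connector`.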
145 changes: 145 additions & 0 deletions documentation/modules/configuring/proc-listing-connector-offsets.adoc
@@ -0,0 +1,145 @@
// Module included in the following assemblies:
//
// assembly-config.adoc

[id='proc-listing-connector-offsets-{context}']
= Listing connector offsets

[role="_abstract"]
To track connector offsets using `KafkaConnector` resources, add the `listOffsets` configuration.
The offsets, which keep track of the flow of data, are written to a config map specified in the configuration.
If the config map does not exist, Strimzi creates it.

After the configuration is in place, annotate the `KafkaConnector` resource to write the list to the config map.

Sink connectors use Kafka's standard consumer offset mechanism, while source connectors store offsets in a custom format within a Kafka topic.

* For sink connectors, the list shows Kafka topic partitions and the last committed offset for each partition.
* For source connectors, the list shows the source system’s partition and the last offset processed.

.Prerequisites

* The Cluster Operator is running.

.Procedure

. Edit the `KafkaConnector` resource for the connector to include the `listOffsets` configuration.
+
.Example configuration to list offsets
[source,yaml,subs="+attributes"]
----
apiVersion: {KafkaConnectorApiVersion}
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  listOffsets:
    toConfigMap: # <1>
      name: my-connector-offsets # <2>
  # ...
----
<1> The reference to the config map to which the list of offsets is written.
<2> The name of the config map, which is named `my-connector-offsets` in this example.

. Run the command to write the list to the config map by annotating the `KafkaConnector` resource:
+
[source,shell]
----
kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=list -n <namespace>
----
+
The annotation remains until either the list operation succeeds, or it is manually removed from the resource.

. After the `KafkaConnector` resource is updated, use the following command to check if the config map with the offsets was created:
+
[source,shell]
----
kubectl get configmap my-connector-offsets -n <namespace>
----

. Inspect the contents of the config map to verify the offsets are being listed:
+
[source,shell]
----
kubectl describe configmap my-connector-offsets -n <namespace>
----
+
Strimzi puts the offsets into the `offsets.json` property.
It doesn't overwrite any other properties when updating an existing config map.
+
--
.Example source connector offset list
[source,yaml,subs="+attributes"]
----
apiVersion: v1
kind: ConfigMap
metadata:
  # ...
  ownerReferences: # <1>
  - apiVersion: {KafkaConnectApiVersion}
    blockOwnerDeletion: false
    controller: false
    kind: KafkaConnector
    name: my-source-connector
    uid: 637e3be7-bd96-43ab-abde-c55b4c4550e0
  resourceVersion: "66951"
  uid: 641d60a9-36eb-4f29-9895-8f2c1eb9638e
data:
  offsets.json: |-
    {
      "offsets" : [ {
        "partition" : {
          "filename" : "/data/myfile.txt" # <2>
        },
        "offset" : {
          "position" : 15295 # <3>
        }
      } ]
    }
----
<1> The owner reference pointing to the `KafkaConnector` resource for the source connector.
To provide a custom owner reference, create the config map in advance and set the owner reference.
<2> The source partition, represented by the filename `/data/myfile.txt` in this example for a file-based connector.
<3> The last processed offset position in the source partition.
--
+
--
.Example sink connector offset list
[source,yaml,subs="+attributes"]
----
apiVersion: v1
kind: ConfigMap
metadata:
  # ...
  ownerReferences: # <1>
  - apiVersion: {KafkaConnectApiVersion}
    blockOwnerDeletion: false
    controller: false
    kind: KafkaConnector
    name: my-sink-connector
    uid: 84a29d7f-77e6-43ac-bfbb-719f9b9a4b3b
  resourceVersion: "79241"
  uid: 721e30bc-23df-41a2-9b48-fb2b7d9b042c
data:
  offsets.json: |-
    {
      "offsets": [
        {
          "partition": {
            "kafka_topic": "my-topic", # <2>
            "kafka_partition": 2 # <3>
          },
          "offset": {
            "kafka_offset": 4 # <4>
          }
        }
      ]
    }
----
<1> The owner reference pointing to the `KafkaConnector` resource for the sink connector.
<2> The Kafka topic that the sink connector is consuming from.
<3> The partition of the Kafka topic.
<4> The last committed Kafka offset for this topic and partition.
--
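
The annotate and inspect steps above can be combined in a small script. This is a minimal sketch, not part of the documented procedure: `list_connector_offsets` is a hypothetical helper name, and reading the `offsets.json` property with a `jsonpath` query (escaping the dot in the key name) is an assumption about how you might extract it. The sketch also assumes the operator has already processed the annotation by the time the config map is read, so a real script would likely wait or retry.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Strimzi): requests an offsets listing for a
# KafkaConnector and prints the offsets.json property from the config map.
# Arguments: <connector-name> <config-map-name> <namespace>
list_connector_offsets() {
  local connector="$1" config_map="$2" namespace="$3"

  # Ask the operator to write the offsets list (same annotation as in the procedure).
  kubectl annotate kafkaconnector "$connector" \
    strimzi.io/connector-offsets=list -n "$namespace" || return 1

  # Print only the offsets.json property; the dot in the key name is escaped.
  kubectl get configmap "$config_map" -n "$namespace" \
    -o jsonpath='{.data.offsets\.json}'
}
```

For example, `list_connector_offsets my-source-connector my-connector-offsets <namespace>` would annotate the connector from this procedure and then print the JSON stored in the config map, if it has been written.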
@@ -1,6 +1,6 @@
// Module included in the following assemblies:
//
// assembly-deploy-kafka-connect-with-plugins.adoc
// assembly-config.adoc

[id='proc-manual-restart-connector-task-{context}']
= Manually restarting Kafka Connect connector tasks
@@ -1,6 +1,6 @@
// Module included in the following assemblies:
//
// assembly-deploy-kafka-connect-with-plugins.adoc
// assembly-config.adoc

[id='proc-manual-restart-connector-{context}']
= Manually restarting Kafka Connect connectors
@@ -1,6 +1,6 @@
// Module included in the following assemblies:
//
// assembly-deploy-kafka-connect-with-plugins.adoc
// assembly-config.adoc

[id='proc-manual-stop-pause-connector-{context}']
= Manually stopping or pausing Kafka Connect connectors