- Clone the repository
git clone https://github.com/confluentinc/demo-scene.git
cd demo-scene/kafka-connect-single-message-transforms
- Bring the stack up
docker-compose up -d
- Wait for Kafka Connect to start up
bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
  echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day7-00/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day7-transactions.with" : "#{Internet.uuid}",
"genv.day7-transactions.cost.with" : "#{Commerce.price}",
"genv.day7-transactions.txn_date.with" : "#{date.past '\''10'\'','\''DAYS'\''}",
"genv.day7-transactions.card_type.with" : "#{Business.creditCardType}",
"genv.day7-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
"genv.day7-transactions.item.with" : "#{Beer.name}",
"topic.day7-transactions.throttle.ms" : 1000
}'
Just like the RegexRouter, the TimestampRouter can be used to modify the topic name of messages as they pass through Kafka Connect. Since the topic name is usually the basis for naming the object to which a sink connector writes messages, this is a great way to achieve time-based partitioning of those objects if required. For example, instead of streaming messages from Kafka to a single Elasticsearch index called cars, they can be routed to monthly indices, e.g. cars_2020-10, cars_2020-11, cars_2020-12, etc.
Note that the TimestampRouter uses the timestamp of the Kafka message itself. The message timestamp can be set explicitly through the producer API, or left to default according to the setting on the broker (log.message.timestamp.type) or topic (message.timestamp.type). The default for that setting is CreateTime, which is the time at which the producer created the message; the alternative, LogAppendTime, is the time at which the broker wrote it to the log. Message timestamps were added in Apache Kafka 0.10 in KIP-32.
The TimestampRouter takes two arguments: the format of the final topic name to generate (topic.format), and the format of the timestamp to put in the topic name (timestamp.format, based on SimpleDateFormat).
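To make those two settings concrete, here is a rough shell sketch of the renaming that the transform performs. It is only an illustration, not the transform's actual code: route_topic is a made-up helper, the timestamps are illustrative, the strftime pattern passed to date stands in for a SimpleDateFormat pattern, and it assumes GNU date and UTC formatting.

```shell
# Sketch only: mimics the effect of topic.format = ${topic}_${timestamp}.
# route_topic is a hypothetical helper, not part of Kafka Connect.
# Assumes GNU date (for `-d @<seconds>`) and UTC formatting.
route_topic() {
  local topic="$1"      # original topic name
  local ts_ms="$2"      # Kafka message timestamp, in ms since the Unix epoch
  local date_fmt="$3"   # strftime pattern standing in for timestamp.format
  local stamp
  # Drop the milliseconds, then format the timestamp.
  stamp=$(date -u -d "@$((ts_ms / 1000))" "+${date_fmt}")
  printf '%s_%s\n' "$topic" "$stamp"
}

route_topic cars 1603000000000 %Y-%m                  # prints cars_2020-10
route_topic day7-transactions 1607681956641 %Y-%m-%d  # prints day7-transactions_2020-12-11
```

A coarser pattern gives fewer, larger target objects (one per month), while a finer one gives one per day.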
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day7-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "addTimestampToTopic",
"transforms.addTimestampToTopic.type" : "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.addTimestampToTopic.topic.format" : "${topic}_${timestamp}",
"transforms.addTimestampToTopic.timestamp.format": "yyyy-MM-dd"
}'
The data is read from the source topic (day7-transactions) and passes through the transformation, which takes the message timestamp and appends it to the topic name, resulting in a table in the target database:
mysql> show tables;
+------------------------------+
| Tables_in_demo               |
+------------------------------+
| day7-transactions_2020-12-16 |
+------------------------------+
See also 🎥 Kafka Connect in Action: JDBC Sink (👾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (👾 demo code).
In the payload of the data itself you may well have a date field that holds the business date by which you want to partition the data. In the data that we streamed to the database above, for example, you can see txn_date:
mysql> SELECT txn_date, item, cost FROM `day7-transactions_2020-12-11` LIMIT 5;
+------------------------------+------------------------------+-------+
| txn_date                     | item                         | cost  |
+------------------------------+------------------------------+-------+
| Thu Dec 03 02:37:03 GMT 2020 | Hop Rod Rye                  | 34.52 |
| Sat Dec 05 18:53:49 GMT 2020 | Duvel                        | 97.81 |
| Fri Dec 04 22:45:54 GMT 2020 | Trois Pistoles               | 41.45 |
| Fri Dec 04 16:37:25 GMT 2020 | Ten FIDY                     | 63.56 |
| Fri Dec 04 17:16:59 GMT 2020 | Stone Imperial Russian Stout | 94.66 |
+------------------------------+------------------------------+-------+
5 rows in set (0.00 sec)
This is the difference between 'system date' (when the data was loaded into Kafka), and 'business date' (when the actual event took place).
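The effect of that difference on the target table name can be sketched in shell. This is an illustration only: table_for is a made-up helper, it assumes GNU date (which can parse both an @epoch value and a date string in the txn_date layout), and the two inputs are the Kafka timestamp and the txn_date of a single record from this demo.

```shell
# Sketch only: table_for is a hypothetical helper, not part of Kafka Connect.
# Assumes GNU date, formatting in UTC.
table_for() {
  local topic="$1"   # source topic name
  local when="$2"    # any date expression that GNU date can parse
  printf '%s_%s\n' "$topic" "$(date -u -d "$when" +%Y-%m-%d)"
}

# System date: the Kafka message timestamp (epoch seconds).
table_for day7-transactions "@1607681956"                   # day7-transactions_2020-12-11
# Business date: the txn_date field carried in the message value.
table_for day7-transactions "Thu Dec 03 02:25:24 GMT 2020"  # day7-transactions_2020-12-03
```

The same record lands in a different table depending on which of the two dates drives the routing.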
There is a Single Message Transform called MessageTimestampRouter, which is part of Confluent Platform and can be used to route data based on a time field in the message value itself.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day7-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "addTimestampToTopicFromField",
"transforms.addTimestampToTopicFromField.type" : "io.confluent.connect.transforms.MessageTimestampRouter",
"transforms.addTimestampToTopicFromField.message.timestamp.keys" : "txn_date",
"transforms.addTimestampToTopicFromField.message.timestamp.format": "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.addTimestampToTopicFromField.topic.format" : "${topic}_${timestamp}",
"transforms.addTimestampToTopicFromField.topic.timestamp.format" : "yyyy-MM-dd"
}'
Currently this fails with the following error:
org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [appending message's timestamp field to topic], found: org.apache.kafka.connect.data.Struct
    at io.confluent.connect.transforms.util.Requirements.requireMap(Requirements.java:30)
    at io.confluent.connect.transforms.MessageTimestampRouter.apply(MessageTimestampRouter.java:132)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
    ... 14 more
TRACE [sink-jdbc-mysql-day7-00|task-2] Applying transformation io.confluent.connect.transforms.MessageTimestampRouter to SinkRecord{kafkaOffset=2300, timestampType=CreateTime} ConnectRecord{topic='day7-transactions', kafkaPartition=0, key=013e350e-ac03-44cd-bc2b-7b348ec4df6b, keySchema=Schema{STRING}, value=Struct{txn_date=Thu Dec 03 02:25:24 GMT 2020,cost=73.58,item=Delirium Noctorum,card_type=mastercard,customer_remarks=He laid out Biff in one punch. I didn't know he had it in him. He's never stood up to Biff in his life!}, valueSchema=Schema{io.mdrogalis.Gen0:STRUCT}, timestamp=1607681956641, headers=ConnectHeaders(headers=)} (org.apache.kafka.connect.runtime.TransformationChain:47)
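The cause of this is that the Single Message Transform currently expects schemaless records, which Kafka Connect represents as a Map (as with raw JSON). Here the record carries a schema, so the value arrives as a Struct, which is what you get with Avro, Protobuf, or JSON Schema.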
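A task that hits an exception like this will usually end up in the FAILED state (assuming default error handling), and one way to confirm that is the Kafka Connect REST API's status endpoint. The following is a minimal sketch: extract_states and show_connector_states are hypothetical helper names, and the grep/sed parsing is a deliberately crude stand-in for a proper JSON tool.

```shell
# Sketch only: both functions are hypothetical helpers.

# Print every "state" value found in a Kafka Connect status document on stdin.
extract_states() {
  grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Fetch the status of the named connector from the Connect REST API
# and print the connector and task states, one per line.
show_connector_states() {
  curl -s "http://localhost:8083/connectors/$1/status" | extract_states
}

# Usage, once the stack is running:
#   show_connector_states sink-jdbc-mysql-day7-00
```

For a failed task, the same status document also carries a trace field holding the stack trace.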