Clone the repository:
git clone https://github.com/confluentinc/demo-scene.git
cd demo-scene/kafka-connect-single-message-transforms
Bring the stack up:
docker-compose up -d
Wait for Kafka Connect to start up:
bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
  echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'
You can get nested data on a Kafka topic from any number of places. Here I’m going to use ksqlDB to create some and write it as Avro. We’re using Avro because it’s going to get written to a database, and databases need schemas - so plain JSON wouldn’t cut it here. We could use Protobuf or JSON Schema too if we wanted.
Launch ksqlDB:
docker exec -it ksqldb bash -c 'echo -e "\n\n⏳ Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'
Create a stream:
CREATE STREAM CUSTOMERS (ID BIGINT KEY, FULL_NAME VARCHAR, ADDRESS STRUCT<STREET VARCHAR, CITY VARCHAR, COUNTY_OR_STATE VARCHAR, ZIP_OR_POSTCODE VARCHAR>)
WITH (KAFKA_TOPIC='day3-customers',
VALUE_FORMAT='AVRO',
REPLICAS=1,
PARTITIONS=4);
Populate the topic with some nested data:
INSERT INTO CUSTOMERS VALUES(1,'Opossum, american virginia',STRUCT(STREET:='20 Acker Terrace', CITY:='Lynchburg', COUNTY_OR_STATE:='Virginia', ZIP_OR_POSTCODE:='24515'));
INSERT INTO CUSTOMERS VALUES(2,'Skua, long-tailed',STRUCT(STREET:='7 Laurel Terrace', CITY:='Manassas', COUNTY_OR_STATE:='Virginia', ZIP_OR_POSTCODE:='22111'));
INSERT INTO CUSTOMERS VALUES(3,'Red deer',STRUCT(STREET:='53 Basil Terrace', CITY:='Lexington', COUNTY_OR_STATE:='Kentucky', ZIP_OR_POSTCODE:='40515'));
INSERT INTO CUSTOMERS VALUES(4,'Vervet monkey',STRUCT(STREET:='7615 Brown Park', CITY:='Chicago', COUNTY_OR_STATE:='Illinois', ZIP_OR_POSTCODE:='60681'));
INSERT INTO CUSTOMERS VALUES(5,'White spoonbill',STRUCT(STREET:='7 Fulton Parkway', CITY:='Asheville', COUNTY_OR_STATE:='North Carolina', ZIP_OR_POSTCODE:='28805'));
INSERT INTO CUSTOMERS VALUES(6,'Laughing kookaburra',STRUCT(STREET:='84 Monument Alley', CITY:='San Jose', COUNTY_OR_STATE:='California', ZIP_OR_POSTCODE:='95113'));
INSERT INTO CUSTOMERS VALUES(7,'Fox, bat-eared',STRUCT(STREET:='2946 Daystar Drive', CITY:='Jamaica', COUNTY_OR_STATE:='New York', ZIP_OR_POSTCODE:='11431'));
INSERT INTO CUSTOMERS VALUES(8,'Sun gazer',STRUCT(STREET:='61 Lakewood Gardens Parkway', CITY:='Pensacola', COUNTY_OR_STATE:='Florida', ZIP_OR_POSTCODE:='32590'));
INSERT INTO CUSTOMERS VALUES(9,'American bighorn sheep',STRUCT(STREET:='326 Sauthoff Crossing', CITY:='San Antonio', COUNTY_OR_STATE:='Texas', ZIP_OR_POSTCODE:='78296'));
INSERT INTO CUSTOMERS VALUES(10,'Greater rhea',STRUCT(STREET:='97 Morning Way', CITY:='Charleston', COUNTY_OR_STATE:='West Virginia', ZIP_OR_POSTCODE:='25331'));
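Before sinking it anywhere, you can inspect the nested data from the ksqlDB prompt (a quick check; ADDRESS->CITY is ksqlDB's struct field access syntax):
SET 'auto.offset.reset' = 'earliest';
SELECT ID, FULL_NAME, ADDRESS->CITY AS CITY FROM CUSTOMERS EMIT CHANGES LIMIT 3;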
See also 🎥 Kafka Connect in Action: JDBC Sink (👾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (👾 demo code).
First, try streaming the nested data straight to MySQL with the JDBC Sink connector:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day3-customers",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true"
}'
This fails! You’ll see the error:
…(STRUCT) type doesn't have a mapping to the SQL database column type
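You can pull the full stack trace from the Kafka Connect REST API (assuming jq is available for formatting):
curl -s http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-00/status | jq '.tasks[0].trace'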
Many databases don’t support nested fields, and although some have added support in recent releases, the JDBC Sink connector can’t map them to columns. Instead, we can use the Flatten Single Message Transform to collapse the nested ADDRESS struct into top-level fields.
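As a sketch of what Flatten does (record values rendered here as JSON for readability), Flatten$Value with delimiter _ turns this:
{"FULL_NAME": "Red deer", "ADDRESS": {"STREET": "53 Basil Terrace", "CITY": "Lexington"}}
into this:
{"FULL_NAME": "Red deer", "ADDRESS_STREET": "53 Basil Terrace", "ADDRESS_CITY": "Lexington"}
Update the connector configuration to add the transform: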
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day3-customers",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "flatten",
"transforms.flatten.type" : "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter" : "_"
}'
This will work, and you can now see the data in MySQL.
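To get a MySQL prompt, you can use the CLI inside the container (a sketch, assuming the service name and credentials from the Docker Compose stack):
docker exec -it mysql mysql -u mysqluser -pmysqlpw demo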
mysql> describe `day3-customers`;
+-------------------------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------------------+------+------+-----+---------+-------+
| FULL_NAME | text | YES | | NULL | |
| ADDRESS_STREET | text | YES | | NULL | |
| ADDRESS_CITY | text | YES | | NULL | |
| ADDRESS_COUNTY_OR_STATE | text | YES | | NULL | |
| ADDRESS_ZIP_OR_POSTCODE | text | YES | | NULL | |
+-------------------------+------+------+-----+---------+-------+
5 rows in set (0.00 sec)
mysql> select * from `day3-customers`;
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
| FULL_NAME | ADDRESS_STREET | ADDRESS_CITY | ADDRESS_COUNTY_OR_STATE | ADDRESS_ZIP_OR_POSTCODE |
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
| Opossum, american virginia | 20 Acker Terrace | Lynchburg | Virginia | 24515 |
| Red deer | 53 Basil Terrace | Lexington | Kentucky | 40515 |
| Laughing kookaburra | 84 Monument Alley | San Jose | California | 95113 |
| American bighorn sheep | 326 Sauthoff Crossing | San Antonio | Texas | 78296 |
| Skua, long-tailed | 7 Laurel Terrace | Manassas | Virginia | 22111 |
| Fox, bat-eared | 2946 Daystar Drive | Jamaica | New York | 11431 |
| Greater rhea | 97 Morning Way | Charleston | West Virginia | 25331 |
| Vervet monkey | 7615 Brown Park | Chicago | Illinois | 60681 |
| White spoonbill | 7 Fulton Parkway | Asheville | North Carolina | 28805 |
| Sun gazer | 61 Lakewood Gardens Parkway | Pensacola | Florida | 32590 |
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
10 rows in set (0.00 sec)
Here’s how to bring the Kafka message key (the customer ID) into the target table as its primary key:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-02/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day3-customers2",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "flatten",
"transforms.flatten.type" : "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter" : "_",
"pk.mode" : "record_key",
"pk.fields" : "id",
"key.converter" : "org.apache.kafka.connect.converters.LongConverter"
}'
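The message key here is the stream’s BIGINT ID, which ksqlDB serialises as a plain Kafka long, hence the LongConverter for the key; pk.mode record_key takes the primary key from the message key, and pk.fields names the column to put it in. To verify, you can describe the new table in MySQL; with auto.create enabled the id column should be declared as the primary key:
mysql> describe `day3-customers2`;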