Skip to content

Latest commit

 

History

History
186 lines (154 loc) · 7.37 KB

File metadata and controls

186 lines (154 loc) · 7.37 KB

Twelve Days of Single Message Transforms - Day 4 - RegExRouter

🎥 Recording

maxresdefault

Setup

  1. Clone the repository

    git clone https://github.com/confluentinc/demo-scene.git
    cd demo-scene/kafka-connect-single-message-transforms
  2. Bring the stack up

    docker-compose up -d
  3. Wait for Kafka Connect to start up

bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
  echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'

Create the data generator

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-00/config \
    -d '{
        "connector.class"                             : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day4-transactions.with"                : "#{Internet.uuid}",
        "genv.day4-transactions.cost.with"            : "#{Commerce.price}",
        "genv.day4-transactions.card_type.with"       : "#{Business.creditCardType}",
        "genv.day4-transactions.item.with"            : "#{Beer.name}",
        "topic.day4-transactions.throttle.ms"         : 500
    }'

Stream the data to MySQL

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day4-transactions-00/config \
    -d '{
          "connector.class"               : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                : "jdbc:mysql://mysql:3306/demo",
          "connection.user"               : "mysqluser",
          "connection.password"           : "mysqlpw",
          "topics"                        : "day4-transactions",
          "tasks.max"                     : "4",
          "auto.create"                   : "true",
          "auto.evolve"                   : "true"
        }'

This works; you get a table created in MySQL:

mysql> show tables;
+-------------------+
| Tables_in_demo    |
+-------------------+
| day4-transactions |
+-------------------+
1 row in set (0.00 sec)

What data’s in the table?

mysql> select * from day4-transactions;
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '-transactions' at line 1

Turns out a hyphen in the table name does not make your life easy in MySQL. You can quote is with a backtick, but it is not ideal

mysql> select * from `day4-transactions` LIMIT 1;
+-----------+-------+---------------------------+
| card_type | cost  | item                      |
+-----------+-------+---------------------------+
| switch    | 98.77 | Westmalle Trappist Tripel |
+-----------+-------+---------------------------+
1 row in set (0.00 sec)

By default the JDBC Sink connector takes the topic name as the name of the table to create. Similar patterns are seen with other connectors, such as the Elasticsearch sink connector which uses the topic name as the name of the target index to populate.

Using the RegExRouter override the topic name can be modified either as data is streamed into Kafka from a source connector, or as it leaves Kafka in a sink connector.

Let’s modify the above connector to route data to a table called transactions instead, and drop the day4- prefix.

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day4-transactions-00/config \
    -d '{
          "connector.class"                       : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                        : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                       : "mysqluser",
          "connection.password"                   : "mysqlpw",
          "topics"                                : "day4-transactions",
          "tasks.max"                             : "4",
          "auto.create"                           : "true",
          "auto.evolve"                           : "true",
          "transforms"                            : "dropTopicPrefix",
          "transforms.dropTopicPrefix.type"       : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.dropTopicPrefix.regex"      : "day4-(.*)",
          "transforms.dropTopicPrefix.replacement": "$1"
        }'

Since we’ve PUT the above configuration it updates the existing connector, and now we have a table in MySQL without the day4- prefix that’s much easier to work with:

mysql> show tables;
+-------------------+
| Tables_in_demo    |
+-------------------+
| day4-transactions |
| transactions      |
+-------------------+
2 rows in set (0.00 sec)

mysql> select * from transactions limit 1;
+-----------+-------+-----------------+
| card_type | cost  | item            |
+-----------+-------+-----------------+
| dankort   | 27.12 | Sapporo Premium |
+-----------+-------+-----------------+
1 row in set (0.00 sec)

You could also take it a step further with the regex and if the day4 was important handle it with a second capture group - see https://regexr.com/5i7eb for details.

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day4-transactions-00/config \
    -d '{
          "connector.class"                       : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                        : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                       : "mysqluser",
          "connection.password"                   : "mysqlpw",
          "topics"                                : "day4-transactions",
          "tasks.max"                             : "4",
          "auto.create"                           : "true",
          "auto.evolve"                           : "true",
          "transforms"                            : "dropTopicPrefix",
          "transforms.dropTopicPrefix.type"       : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.dropTopicPrefix.regex"      : "day(\\d+)-(.*)",
          "transforms.dropTopicPrefix.replacement": "$2_day$1"
        }'
Note
You need to escape the \ when passing it through curl, so \ becomes \\.