Skip to content

Files

Latest commit

93eff19 · Dec 18, 2020

History

History
262 lines (225 loc) · 12 KB
·

File metadata and controls

262 lines (225 loc) · 12 KB
·

Twelve Days of Single Message Transforms - Day 10 - ReplaceField

The ReplaceField Single Message Transform has three modes of operation on fields of data passing through Kafka Connect, either in a Source connector or Sink connector.

  • Include only the fields specified in the list (include)

  • Include all fields except the ones specified (exclude)

  • Rename field(s) (renames)

(KIP-629 has started to be implemented in Apache Kafka 2.7. If you are using an earlier version then you will have to use blacklist and whitelist in place of exclude and include respectively)

🎥 Recording

maxresdefault

Setup

  1. Clone the repository

    git clone https://github.com/confluentinc/demo-scene.git
    cd demo-scene/kafka-connect-single-message-transforms
  2. Bring the stack up

    docker-compose up -d
  3. Wait for Kafka Connect to start up

    bash -c ' \
    echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
    while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
      echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
      sleep 5
    done
    echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
    '

Generate test data

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day10-00/config \
    -d '{
        "connector.class"                              : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day10-transactions.with"                : "#{Internet.uuid}",
        "genv.day10-transactions.cost.with"            : "#{Commerce.price}",
        "genv.day10-transactions.units.with"           : "#{number.number_between '\''1'\'','\''99'\''}",
        "genv.day10-transactions.txn_date.with"        : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day10-transactions.cc_num.with"          : "#{Business.creditCardNumber}",
        "genv.day10-transactions.cc_exp.with"          : "#{Business.creditCardExpiry}",
        "genv.day10-transactions.card_type.with"       : "#{Business.creditCardType}",
        "genv.day10-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
        "genv.day10-transactions.item.with"            : "#{Beer.name}",
        "topic.day10-transactions.throttle.ms"         : 1000
    }'

Set up test data sink

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-00/config \
    -d '{
          "connector.class"                       : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                        : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                       : "mysqluser",
          "connection.password"                   : "mysqlpw",
          "topics"                                : "day10-transactions",
          "tasks.max"                             : "4",
          "auto.create"                           : "true",
          "auto.evolve"                           : "true",
          "transforms"                            : "changeTableName",
          "transforms.changeTableName.type"       : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.changeTableName.regex"      : ".*",
          "transforms.changeTableName.replacement": "production_data"
        }'

Dropping fields in a sink connector

Imagine we have a source topic with multiple fields in:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day10-transactions -C -c1 -o-1 -u -q -J |  jq  '.payload'
{
  "Gen0": {
    "cost"            : { "string": "12.85" },
    "units"           : { "string": "2" },
    "card_type"       : { "string": "maestro" },
    "item"            : { "string": "Hercules Double IPA" },
    "customer_remarks": { "string": "Perfect! My experiment worked! They're all exactly 25 minutes slow!" },
    "cc_num"          : { "string": "1228-1221-1221-1431" },
    "cc_exp"          : { "string": "2013-9-12" },
    "txn_date"        : { "string": "Wed Dec 16 07:59:52 GMT 2020" }
  }
}

This data is in Kafka, but the system to which we’re going to stream it with a sink connector mustn’t hold sensitive information, such as credit card data. One option is to Mask it, but this retains the fields in the payload which is wasteful if we simply don’t want them in the target system. We can use ReplaceField to exclude a set of fields from passing through Kafka Connect:

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-01/config \
  -d '{
      "connector.class"            : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"             : "jdbc:mysql://mysql:3306/demo",
      "connection.user"            : "mysqluser",
      "connection.password"        : "mysqlpw",
      "topics"                     : "day10-transactions",
      "tasks.max"                  : "4",
      "auto.create"                : "true",
      "auto.evolve"                : "true",
      "transforms"                 : "dropCC",
      "transforms.dropCC.type"     : "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.dropCC.blacklist": "cc_num,cc_exp,card_type"
      }'

In the target system (a database, in this case) the credit card fields are not present, exactly as intended:

mysql> describe `day10-transactions`;
+------------------+------+------+-----+---------+-------+
| Field            | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| cost             | text | YES  |     | NULL    |       |
| units            | text | YES  |     | NULL    |       |
| item             | text | YES  |     | NULL    |       |
| customer_remarks | text | YES  |     | NULL    |       |
| txn_date         | text | YES  |     | NULL    |       |
+------------------+------+------+-----+---------+-------+
5 rows in set (0.00 sec)

Including only certain fields in a source connector

This time we have a source connector that’s ingesting data from a system that includes numerous fields that we don’t want to ingest into Kafka. Because a Single Message Transform applies to the pipeline before a message is written to Kafka, not after, we can deliberately ensure that certain data is never stored in Kafka if it’s not intended to be.

The source connector in this example is reading data from a database with a schema that looks like this:

mysql> describe production_data;
+------------------+------+------+-----+---------+-------+
| Field            | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| cost             | text | YES  |     | NULL    |       |
| units            | text | YES  |     | NULL    |       |
| card_type        | text | YES  |     | NULL    |       |
| item             | text | YES  |     | NULL    |       |
| customer_remarks | text | YES  |     | NULL    |       |
| cc_num           | text | YES  |     | NULL    |       |
| cc_exp           | text | YES  |     | NULL    |       |
| txn_date         | text | YES  |     | NULL    |       |
+------------------+------+------+-----+---------+-------+
8 rows in set (0.00 sec)

Unlike the scenario in the first section (in which we wanted to store credit card data in Kafka and just exclude it from a certain target system), this time we want to extract data from the source system but only certain fields that we need for our particular analytics pipeline. It may be that it’s inefficient to ingest such a large number of redundant fields, or that the data is sensitive and we’re not allowed to store it in our topic. In this case we specify just a list of fields to include:

curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-day10-00/config \
  -H "Content-Type: application/json" -d '{
    "connector.class"                  : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url"                   : "jdbc:mysql://mysql:3306/demo",
    "connection.user"                  : "mysqluser",
    "connection.password"              : "mysqlpw",
    "topic.prefix"                     : "day10-",
    "poll.interval.ms"                 : 10000,
    "tasks.max"                        : 1,
    "table.include"                  : "production_data",
    "mode"                             : "bulk",
    "transforms"                       : "selectFields",
    "transforms.selectFields.type"     : "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.selectFields.include": "item,cost,units,txn_date"
  }'

The resulting Kafka topic is populated with only the fields of interest:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day10-production_data -C -o-1 -u -q -J | jq  '.payload'
{
  "cost"    : { "string": "48.54" },
  "units"   : { "string": "41" },
  "item"    : { "string": "Oak Aged Yeti Imperial Stout" },
  "txn_date": { "string": "Mon Dec 14 11:43:56 GMT 2020" }
}

Renaming fields

Perhaps you want to keep all the fields in the payload - but you want to change the name of them. This could be for various reasons, including:

  • Standardise common naming for the same business measures as data is ingested into Kafka

  • Change a field to fit an existing name in a target object in a sink connector

Here’s an example renaming a field in a sink connector:

curl -i -X PUT -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day10-02/config \
  -d '{
      "connector.class"            : "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url"             : "jdbc:mysql://mysql:3306/demo",
      "connection.user"            : "mysqluser",
      "connection.password"        : "mysqlpw",
      "topics"                     : "day10-production_data",
      "tasks.max"                  : "4",
      "auto.create"                : "true",
      "auto.evolve"                : "true",
      "transforms"                 : "renameTS",
      "transforms.renameTS.type"   : "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.renameTS.renames": "txn_date:transaction_timestamp"
      }'

The resulting table in the database has the amended field name (transaction_timestamp):

mysql> describe `day10-production_data`;
+-----------------------+------+------+-----+---------+-------+
| Field                 | Type | Null | Key | Default | Extra |
+-----------------------+------+------+-----+---------+-------+
| cost                  | text | YES  |     | NULL    |       |
| units                 | text | YES  |     | NULL    |       |
| card_type             | text | YES  |     | NULL    |       |
| item                  | text | YES  |     | NULL    |       |
| customer_remarks      | text | YES  |     | NULL    |       |
| cc_num                | text | YES  |     | NULL    |       |
| cc_exp                | text | YES  |     | NULL    |       |
| transaction_timestamp | text | YES  |     | NULL    |       |
+-----------------------+------+------+-----+---------+-------+
8 rows in set (0.01 sec)