Kafka Connect SAP is a generic set of connectors that use the Apache Kafka Connect framework to reliably connect Kafka with SAP systems.
To install the connector from source, clone this repository to your local desktop, bring up a command prompt in the repository directory, and use the following command.
mvn clean install -DskipTests
This should produce the Kafka Connector jar file kafka-connector-hana_m-n.jar in the modules/scala_m/target folder, where m corresponds to the Scala binary version and n corresponds to the connector version.
Include the JDBC Jar
Please refer to the SAP Developer License Agreement for the use of the driver jar.
- Follow the steps in the SAP HANA Client Interface Programming Reference guide to access the SAP HANA JDBC jar.
- The Maven coordinate of the driver is com.sap.cloud.db.jdbc:ngdbc:x.x.x and the drivers are available at the central Maven repository https://search.maven.org/artifact/com.sap.cloud.db.jdbc/ngdbc.
There are some examples that can be executed by following the instructions. For details of these examples, refer to Examples.
The demo examples included in Examples use Kafka Connect running in different environments, such as standalone and distributed modes. For general information on how to run Kafka Connect, refer to the Kafka Connect documentation.
The Kafka connector for SAP Systems provides a wide set of configuration options for both source and sink.
The full list of configuration options for the Kafka connector for SAP Systems is as follows:
- Sink
  - topics - This setting can be used to specify a comma-separated list of topics. Must not have spaces.
  - auto.create - This setting allows the creation of a new table in SAP DBs if the table specified in {topic}.table.name does not exist. Should be a Boolean. Default is false.
  - auto.evolve - This setting allows the evolution of the table schema with some restrictions: when the record contains additional nullable fields that were not present previously, the corresponding columns will be added. In contrast, when the record contains fewer fields, the table schema will not be changed. Should be a Boolean. Default is false.
  - auto.schema.update - Whether to automatically update the schema from the database on every record flush. Default is false.
  - batch.size - This setting can be used to specify the number of records that can be pushed into a SAP DB table in a single flush. Should be an Integer. Default is 3000.
  - max.retries - (deprecated) This setting can be used to specify the maximum number of retries that can be made to re-establish the connection to SAP DB in case the connection is lost. Should be an Integer. Default is 10. This property is currently ignored, as the task will automatically retry when a connection error results in a RetriableException for both source and sink tasks.
  - {topic}.table.name - This setting allows specifying the SAP DB table name where the data needs to be written to. Should be a String. Must be compatible with SAP DB table names like "SCHEMA"."TABLE".
  - {topic}.table.type - This is a DB-specific configuration setting which allows the creation of row and column tables if auto.create is set to true. Default value is column. Supported values are column, row.
  - {topic}.insert.mode - This setting can be used to specify one of the available insertion modes, insert and upsert. Default is insert.
  - {topic}.delete.enabled - This setting can be used to allow the deletion of a record when its corresponding tombstone record is received by the connector. Default is false.
  - {topic}.pk.mode - This setting can be used to specify the primary key mode required when auto.create is set to true and the table specified in {topic}.table.name does not exist in SAP DB. Default is none. Supported values are record_key, record_value.
  - {topic}.pk.fields - This setting can be used to specify a comma-separated list of primary key fields when {topic}.pk.mode is set to record_key or record_value. Must not have spaces.
  - {topic}.table.partition.mode - This is a SapDB Sink specific configuration setting which determines the table partitioning in SAP DB. Default value is none. Supported values are none, hash, round_robin.
  - {topic}.table.partition.count - This is a SapDB Sink specific configuration setting which determines the number of partitions the table should have. Required when auto.create is set to true and the table specified in {topic}.table.name does not exist in SAP DBs. Should be an Integer. Default value is 0.
- Source
  - topics - This setting can be used to specify a comma-separated list of topics. Must not have spaces.
  - mode - This setting can be used to specify the mode in which data should be fetched from the SAP DB table. Default is bulk. Supported values are bulk, incrementing.
  - queryMode - This setting can be used to specify the query mode in which data should be fetched from the SAP DB table. Default is table. Supported values are table, query (to support SQL queries). When using queryMode: query, the query parameter must also be defined. This query parameter needs to be prefixed with the topic name. If the incrementing.column.name property is used together with it to constrain the result, then the column can be omitted from the query's where clause.
  - batch.max.rows - The maximum number of rows to include in a single batch call. Should be an Integer. Default is 100.
  - {topic}.table.name - This setting allows specifying the SAP DB table name where the data needs to be read from. Should be a String. Must be compatible with SAP DB table names like "SCHEMA"."TABLE".
  - {topic}.query - This setting allows specifying the query statement when queryMode is set to query. Should be a String.
  - {topic}.poll.interval.ms - This setting allows specifying the poll interval at which the data should be fetched from the SAP DB table. Should be an Integer. Default value is 60000.
  - {topic}.incrementing.column.name - In order to fetch data from a SAP DB table when mode is set to incrementing, an incremental (or auto-incremental) column needs to be provided. The type of the column can be a numeric type such as INTEGER, FLOAT, DECIMAL, a datetime type such as DATE, TIME, TIMESTAMP, or a character type such as VARCHAR, NVARCHAR containing alphanumeric characters. This also covers SAP DB Timeseries tables. Should be a valid column name (represented as a String) present in the table. See data types in SAP HANA.
  - {topic}.partition.count - This setting can be used to specify the number of topic partitions that the Source connector can use to publish the data. Should be an Integer. Default value is 1.
  - numeric.mapping - This setting can be used to control whether the DECIMAL column types are mapped to the default decimal type or one of the primitive types. The supported values are none, best_fit, and best_fit_eager_double. The default value is none.
- Transformations
  - EscapeFieldNameCharacters - This SMT translates the field names by escaping certain characters of the names to make them valid in the naming scheme of the target. Each escaped character is represented as a sequence of UTF-8 bytes, each in the form <esc><xx>, where <esc> is the specified escape character and <xx> is the hexadecimal value of the byte.
    - type - com.sap.kafka.connect.transforms.EscapeFieldNameCharacters$Key and com.sap.kafka.connect.transforms.EscapeFieldNameCharacters$Value
    - escape.char - The escape character to be used.
    - valid.chars.default - This value specifies the valid character set; characters outside of this set are escaped. When this value is not set, the names are unescaped.
    - valid.chars.first - This value optionally specifies the valid character set for the first character if this differs from the rest.
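The escaping scheme described for EscapeFieldNameCharacters can be illustrated with a small Python sketch. This is not the connector's implementation: the function names are hypothetical, the valid character sets are assumed to be given as regular-expression character classes, and lowercase hexadecimal digits are assumed. The reverse direction only round-trips cleanly when the escape character itself is outside the valid set, so that it is always escaped.

```python
import re


def escape_field_name(name, esc, valid_default, valid_first=None):
    """Escape every character of `name` that is outside the valid set.

    Each escaped character becomes its UTF-8 bytes, each written as
    <esc><xx> with <xx> the two-digit hex value of the byte.
    Assumption: valid sets are regex character classes such as "[A-Za-z0-9]".
    """
    default_re = re.compile(valid_default)
    # The first character may have its own valid set; otherwise reuse the default.
    first_re = re.compile(valid_first) if valid_first else default_re
    out = []
    for i, ch in enumerate(name):
        pattern = first_re if i == 0 else default_re
        if pattern.fullmatch(ch):
            out.append(ch)
        else:
            out.extend(f"{esc}{b:02x}" for b in ch.encode("utf-8"))
    return "".join(out)


def unescape_field_name(name, esc):
    """Reverse the escaping by decoding runs of <esc><xx> as UTF-8 bytes."""
    run = re.compile("(?:" + re.escape(esc) + "[0-9A-Fa-f]{2})+")

    def decode(match):
        hex_digits = match.group(0).replace(esc, "")
        return bytes.fromhex(hex_digits).decode("utf-8")

    return run.sub(decode, name)


if __name__ == "__main__":
    escaped = escape_field_name("order/id", "_", "[A-Za-z0-9]")
    print(escaped)
    print(unescape_field_name(escaped, "_"))
```

With `_` as the escape character and `[A-Za-z0-9]` as the valid set, the `/` in `order/id` is a single UTF-8 byte (0x2f), so it is written as `_2f`. A multi-byte character such as `é` produces one `<esc><xx>` pair per byte.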
The examples folder includes some example scenarios. In addition, the unit tests provide examples of every possible mode in which the connector can be configured.
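As a rough illustration of how the sink and source options listed above fit together, the following Python sketch assembles connector configurations as dictionaries and prints one as a JSON payload. Several names here are assumptions that do not appear in this document and should be checked against the examples folder: the connector class names, the connection.url, connection.user, and connection.password properties, and all placeholder values such as the orders topic and the "DEMO"."ORDERS" table.

```python
import json

# Assumed connector class names; confirm them against the jar you built.
SINK_CLASS = "com.sap.kafka.connect.sink.hana.HANASinkConnector"
SOURCE_CLASS = "com.sap.kafka.connect.source.hana.HANASourceConnector"


def connection_settings():
    # Assumed property names with placeholder values for the JDBC connection.
    return {
        "connection.url": "jdbc:sap://<host>:<port>/",
        "connection.user": "<user>",
        "connection.password": "<password>",
    }


def check_topic(topic):
    # The topics setting must not contain spaces.
    if " " in topic:
        raise ValueError("topics must not contain spaces")


def sink_config(topic, table, pk_fields):
    """Sink settings: create the table if missing and upsert by primary key."""
    check_topic(topic)
    config = {"connector.class": SINK_CLASS, "tasks.max": "1", "topics": topic}
    config.update(connection_settings())
    config.update({
        "auto.create": "true",
        "batch.size": "3000",
        f"{topic}.table.name": table,
        f"{topic}.table.type": "column",
        f"{topic}.insert.mode": "upsert",
        f"{topic}.pk.mode": "record_value",
        # Comma-separated, no spaces.
        f"{topic}.pk.fields": ",".join(pk_fields),
    })
    return config


def source_config(topic, table, incrementing_column):
    """Source settings: poll a table incrementally on one column."""
    check_topic(topic)
    config = {"connector.class": SOURCE_CLASS, "tasks.max": "1", "topics": topic}
    config.update(connection_settings())
    config.update({
        "mode": "incrementing",
        "queryMode": "table",
        "batch.max.rows": "100",
        f"{topic}.table.name": table,
        f"{topic}.incrementing.column.name": incrementing_column,
        f"{topic}.poll.interval.ms": "60000",
    })
    return config


if __name__ == "__main__":
    payload = {
        "name": "orders-sink",  # hypothetical connector name
        "config": sink_config("orders", '"DEMO"."ORDERS"', ["ID", "REGION"]),
    }
    print(json.dumps(payload, indent=2))
```

In distributed mode, a payload of this name/config shape is what the Kafka Connect REST API accepts when creating a connector. In standalone mode, the same keys and values would go into a properties file instead.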
We welcome comments, questions, and bug reports. Please create an issue to obtain support.
Contributions are accepted by sending Pull Requests to this repo.
Due to legal reasons, contributors will be asked to accept a DCO when they create the first pull request to this project. This happens in an automated fashion during the submission process. SAP uses the standard DCO text of the Linux Foundation.
Currently, only SAP HANA is supported.
Copyright (c) 2015-2022 SAP SE or an SAP affiliate company and kafka-connect-sap contributors. Please see our LICENSE for copyright and license information. Detailed information including third-party components and their licensing/copyright information is available via the REUSE tool.