I'm trying to get this plugin working (thanks very much for it, by the way :) ).
My s3-sink connector works when I comment out the lines, but when I uncomment them it fails with an error I can't understand. Here's the stack trace:
[2021-07-16 11:10:16,067] ERROR Failed to create job for /home/ec2-user/kafka/config/s3-sink.properties (org.apache.kafka.connect.cli.ConnectStandalone:110)
[2021-07-16 11:10:16,068] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121)
java.util.concurrent.ExecutionException: java.lang.NullPointerException
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:115)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:99)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118)
Caused by: java.lang.NullPointerException
at io.confluent.connect.storage.partitioner.PartitionerConfig.classNameEquals(PartitionerConfig.java:304)
at io.confluent.connect.storage.partitioner.PartitionerConfig.access$000(PartitionerConfig.java:33)
at io.confluent.connect.storage.partitioner.PartitionerConfig$PartitionerClassDependentsRecommender.visible(PartitionerConfig.java:249)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:615)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:623)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:531)
at org.apache.kafka.common.config.ConfigDef.validateAll(ConfigDef.java:514)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:496)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:144)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:375)
at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Here's my sink config:
tasks.max=1
topics=events
topics.dir=es_tests
s3.region=eu-west-2
s3.bucket.name=msk-ingestion
s3.part.size=5242880
flush.size=10
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
partitioner.class=com.canelmas.kafka.connect.FieldAndTimeBasedPartitioner
partition.field.name=officeBid
partition.duration.ms=86400000
path.format="'year'=YYYY/'month'=MM/'day'=dd"
timestamp.extractor=RecordField
timestamp.field=modified
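One detail in the config above can be checked independently of the stack trace: `java.util.Properties` does not strip quotes, so a value written as `"..."` keeps the quote characters as part of the value. The sketch below loads the partitioner-related lines and lists any keys whose values are wrapped in double quotes. The class name `QuotedValueCheck` and its helper methods are made up for illustration, and it assumes the connector file is read as a standard Java properties file. Whether the quoting has anything to do with the NullPointerException is not established here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Kafka Connect or the partitioner plugin.
public class QuotedValueCheck {

    // Parse properties text the same way java.util.Properties parses a file.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Return, in sorted order, the keys whose values start and end with a literal double quote.
    static List<String> quotedKeys(Properties props) {
        List<String> keys = new ArrayList<>();
        for (String name : props.stringPropertyNames()) {
            String value = props.getProperty(name).trim();
            if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
                keys.add(name);
            }
        }
        Collections.sort(keys);
        return keys;
    }

    public static void main(String[] args) {
        // The partitioner-related lines from the sink config, copied verbatim.
        String sample = String.join("\n",
            "partitioner.class=com.canelmas.kafka.connect.FieldAndTimeBasedPartitioner",
            "partition.field.name=officeBid",
            "partition.duration.ms=86400000",
            "path.format=\"'year'=YYYY/'month'=MM/'day'=dd\"",
            "timestamp.extractor=RecordField",
            "timestamp.field=modified");
        for (String key : quotedKeys(parse(sample))) {
            System.out.println("value includes literal quotes: " + key);
        }
    }
}
```

Run against the lines above, this flags `path.format`, meaning the value handed to the connector would begin and end with a `"` character.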
The plugin is definitely being loaded because I can see it loading further up in the logs.
Any idea about what I'm doing wrong?
Thanks in advance!