diff --git a/gradle.properties b/gradle.properties index 1f8906d..9c10b8a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,7 +23,7 @@ scalaArmVersion=2.0 scalaVersion=2.12.13 shadowGradlePlugin=6.1.0 slf4jApiVersion=1.7.25 -sparkVersion=3.4.0 +sparkVersion=3.5.0 # Version and base tags can be overridden at build time. connectorVersion=0.14.0-SNAPSHOT diff --git a/src/main/scala/io/pravega/connectors/spark/NonTransactionPravegaWriter.scala b/src/main/scala/io/pravega/connectors/spark/NonTransactionPravegaWriter.scala index 8639357..09add0f 100755 --- a/src/main/scala/io/pravega/connectors/spark/NonTransactionPravegaWriter.scala +++ b/src/main/scala/io/pravega/connectors/spark/NonTransactionPravegaWriter.scala @@ -47,6 +47,8 @@ class NonTransactionPravegaWriter( schema: StructType) extends StreamingWrite with BatchWrite with Logging { + override def useCommitCoordinator() = true + override def createBatchWriterFactory(info: PhysicalWriteInfo): NonTransactionPravegaWriterFactory = NonTransactionPravegaWriterFactory(scopeName, streamName, clientConfig, schema) diff --git a/src/main/scala/io/pravega/connectors/spark/TransactionPravegaWriter.scala b/src/main/scala/io/pravega/connectors/spark/TransactionPravegaWriter.scala index a5b0b94..130b3b2 100755 --- a/src/main/scala/io/pravega/connectors/spark/TransactionPravegaWriter.scala +++ b/src/main/scala/io/pravega/connectors/spark/TransactionPravegaWriter.scala @@ -141,6 +141,7 @@ class TransactionPravegaWriter( log.debug(s"abort: END: epochId=$epochId, messages=${messages.mkString(",")}") } + override def useCommitCoordinator() = true override def createBatchWriterFactory(info: PhysicalWriteInfo): TransactionPravegaWriterFactory = TransactionPravegaWriterFactory(scopeName, streamName, clientConfig, transactionTimeoutMs, schema)