From 571146ea81334b103c3699512c8a4bcc0a62964c Mon Sep 17 00:00:00 2001 From: Kuldeep Ved <46494719+kuldeepved@users.noreply.github.com> Date: Tue, 5 Dec 2023 08:16:55 +0530 Subject: [PATCH] [Issue-170] Upgrade Spark Version to 3.5.0 (#174) Signed-off-by: Ved, Kuldeep --- gradle.properties | 2 +- .../pravega/connectors/spark/NonTransactionPravegaWriter.scala | 2 ++ .../io/pravega/connectors/spark/TransactionPravegaWriter.scala | 1 + 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 1f8906d..9c10b8a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,7 +23,7 @@ scalaArmVersion=2.0 scalaVersion=2.12.13 shadowGradlePlugin=6.1.0 slf4jApiVersion=1.7.25 -sparkVersion=3.4.0 +sparkVersion=3.5.0 # Version and base tags can be overridden at build time. connectorVersion=0.14.0-SNAPSHOT diff --git a/src/main/scala/io/pravega/connectors/spark/NonTransactionPravegaWriter.scala b/src/main/scala/io/pravega/connectors/spark/NonTransactionPravegaWriter.scala index 8639357..09add0f 100755 --- a/src/main/scala/io/pravega/connectors/spark/NonTransactionPravegaWriter.scala +++ b/src/main/scala/io/pravega/connectors/spark/NonTransactionPravegaWriter.scala @@ -47,6 +47,8 @@ class NonTransactionPravegaWriter( schema: StructType) extends StreamingWrite with BatchWrite with Logging { + override def useCommitCoordinator() = true + override def createBatchWriterFactory(info: PhysicalWriteInfo): NonTransactionPravegaWriterFactory = NonTransactionPravegaWriterFactory(scopeName, streamName, clientConfig, schema) diff --git a/src/main/scala/io/pravega/connectors/spark/TransactionPravegaWriter.scala b/src/main/scala/io/pravega/connectors/spark/TransactionPravegaWriter.scala index a5b0b94..130b3b2 100755 --- a/src/main/scala/io/pravega/connectors/spark/TransactionPravegaWriter.scala +++ b/src/main/scala/io/pravega/connectors/spark/TransactionPravegaWriter.scala @@ -141,6 +141,7 @@ class TransactionPravegaWriter( log.debug(s"abort: END: epochId=$epochId, messages=${messages.mkString(",")}") } + override def useCommitCoordinator() = true override def createBatchWriterFactory(info: PhysicalWriteInfo): TransactionPravegaWriterFactory = TransactionPravegaWriterFactory(scopeName, streamName, clientConfig, transactionTimeoutMs, schema)