From 101ad9481d3a461d36b489719856f2bbf9db1fc7 Mon Sep 17 00:00:00 2001 From: Dee-Pac Date: Fri, 6 Nov 2020 01:27:08 -0800 Subject: [PATCH] [#247] [spark] Bump Spark Version to 2.4.7 | Bump Spark Version to 2.4.7 along with other stack to align with gcp_dataproc_1.5.x --- .travis.yml | 2 +- build/gimel | 2 +- build/install_dependencies | 2 +- gimel-dataapi/gimel-common/pom.xml | 27 +++---- .../gimel-elasticsearch-6.2/pom.xml | 18 ++++- .../gimel-connectors/gimel-jdbc/pom.xml | 2 +- .../gimel-connectors/gimel-sftp/pom.xml | 15 +++- gimel-dataapi/gimel-logger/pom.xml | 27 ------- gimel-parser/pom.xml | 6 +- .../generic-deserializers/pom.xml | 4 +- .../generic-serializers/pom.xml | 2 +- gimel-serde/pom.xml | 18 ++--- gimel-serde/serde-common/pom.xml | 18 +++-- .../gimel/serde/common/avro/AvroUtils.scala | 76 ++++++++++--------- pom.xml | 23 +++--- 15 files changed, 122 insertions(+), 120 deletions(-) diff --git a/.travis.yml b/.travis.yml index a776739e..52ae7e49 100644 --- a/.travis.yml +++ b/.travis.yml @@ -29,4 +29,4 @@ before_install: # Build install: -- build/gimel -Dmaven.test.skip=true -pl gimel-dataapi/gimel-tools -am | egrep -v "Download|Copy|Including|WARNING" +- sh build/gimel -Dmaven.test.skip=true -pl gimel-dataapi/gimel-tools -am | egrep -v "Download|Copy|Including|WARNING" diff --git a/build/gimel b/build/gimel index 0d4bf254..1ab899c8 100755 --- a/build/gimel +++ b/build/gimel @@ -17,7 +17,7 @@ this_dir=`dirname build/install_dependencies` # Source reusable functions -source ${this_dir}/gimel_functions +. ${this_dir}/gimel_functions #--------------------------------Begin execution of Steps------------------------------------------------# diff --git a/build/install_dependencies b/build/install_dependencies index 90159c1d..89fdf003 100755 --- a/build/install_dependencies +++ b/build/install_dependencies @@ -19,7 +19,7 @@ this_script=`pwd`/$BASH_SOURCE this_dir=`dirname $this_script` # Source reusable functions -source ${this_dir}/gimel_functions +. ${this_dir}/gimel_functions #--------------------------------Begin execution of Steps------------------------------------------------# diff --git a/gimel-dataapi/gimel-common/pom.xml b/gimel-dataapi/gimel-common/pom.xml index 52834328..39726a8c 100644 --- a/gimel-dataapi/gimel-common/pom.xml +++ b/gimel-dataapi/gimel-common/pom.xml @@ -55,9 +55,9 @@ under the License. ${packaging.scope} - org.scala-lang - scala-xml - 2.11.0-M4 + org.scala-lang.modules + scala-xml_${scala.binary.version} + ${scala.xml.version} ${scala.packaging.scope} @@ -82,12 +82,6 @@ under the License. com.paypal.gimel gimel-logger ${gimel.version}-SNAPSHOT - - - org.apache.kafka - kafka-clients - - org.apache.kafka @@ -97,7 +91,7 @@ under the License. com.paypal.gimel serde-common - 1.0-SNAPSHOT + 2.4.7-SNAPSHOT ${packaging.scope} @@ -106,12 +100,6 @@ under the License. ${kafka.version} ${packaging.scope} - - com.databricks - spark-avro_${scala.binary.version} - 4.0.0 - ${packaging.scope} - org.apache.curator curator-framework @@ -197,7 +185,7 @@ under the License. ${confluent.version} test - + io.netty netty @@ -211,7 +199,7 @@ under the License. test - net.jpountz.lz4 + net.jpountz.lz4 lz4 1.3.0 test @@ -220,16 +208,19 @@ under the License. com.fasterxml.jackson.core jackson-core ${jackson.version} + compile com.fasterxml.jackson.core jackson-annotations ${jackson.version} + compile com.fasterxml.jackson.core jackson-databind ${jackson.version} + compile diff --git a/gimel-dataapi/gimel-connectors/gimel-elasticsearch-6.2/pom.xml b/gimel-dataapi/gimel-connectors/gimel-elasticsearch-6.2/pom.xml index 41196140..063b3600 100644 --- a/gimel-dataapi/gimel-connectors/gimel-elasticsearch-6.2/pom.xml +++ b/gimel-dataapi/gimel-connectors/gimel-elasticsearch-6.2/pom.xml @@ -45,9 +45,23 @@ under the License. org.elasticsearch - elasticsearch-spark-20_${scala.binary.version} + elasticsearch-spark-20_${elastic.scala.binary.version} ${elasticsearch.version} ${packaging.scope} + + + org.apache.spark + * + + + org.slf4j + log4j-over-slf4j + + + org.apache.hive + * + + org.elasticsearch @@ -78,7 +92,7 @@ under the License. org.apache.maven.plugins maven-shade-plugin - 3.0.0 + ${maven.shade.plugin.version} diff --git a/gimel-dataapi/gimel-connectors/gimel-jdbc/pom.xml b/gimel-dataapi/gimel-connectors/gimel-jdbc/pom.xml index 26d7c30f..8c6088ea 100644 --- a/gimel-dataapi/gimel-connectors/gimel-jdbc/pom.xml +++ b/gimel-dataapi/gimel-connectors/gimel-jdbc/pom.xml @@ -83,7 +83,7 @@ under the License. org.apache.maven.plugins maven-shade-plugin - 3.0.0 + ${maven.shade.plugin.version} diff --git a/gimel-dataapi/gimel-connectors/gimel-sftp/pom.xml b/gimel-dataapi/gimel-connectors/gimel-sftp/pom.xml index 9d07b5cf..5c2c66b0 100644 --- a/gimel-dataapi/gimel-connectors/gimel-sftp/pom.xml +++ b/gimel-dataapi/gimel-connectors/gimel-sftp/pom.xml @@ -39,9 +39,19 @@ under the License. com.springml - spark-sftp_${scala.binary.version} + spark-sftp_${springml.scala.binary.version} ${spark.sftp.version} ${packaging.scope} + + + org.apache.spark + * + + + org.scala-lang + * + + org.scalatest @@ -53,12 +63,11 @@ under the License. src/main/scala - src/test/scala org.apache.maven.plugins maven-shade-plugin - 3.0.0 + ${maven.shade.plugin.version} diff --git a/gimel-dataapi/gimel-logger/pom.xml b/gimel-dataapi/gimel-logger/pom.xml index 924f466c..e0688787 100644 --- a/gimel-dataapi/gimel-logger/pom.xml +++ b/gimel-dataapi/gimel-logger/pom.xml @@ -50,33 +50,6 @@ under the License. ${scala.version} ${scala.packaging.scope} - - com.paypal.gimel - gimel-logging_${gimel.logging.spark.binary.version} - ${gimel.logging.version} - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - - org.apache.kafka - * - - - org.apache.kafka - kafka-log4j-appender - - - com.googlecode.protobuf-java-format - protobuf-java-format - - - src/main/scala diff --git a/gimel-parser/pom.xml b/gimel-parser/pom.xml index 85a1b352..9896b5e3 100644 --- a/gimel-parser/pom.xml +++ b/gimel-parser/pom.xml @@ -15,9 +15,9 @@ - org.scala-lang - scala-xml - 2.11.0-M4 + org.scala-lang.modules + scala-xml_${scala.binary.version} + ${scala.xml.version} ${scala.packaging.scope} diff --git a/gimel-serde/gimel-deserializers/generic-deserializers/pom.xml b/gimel-serde/gimel-deserializers/generic-deserializers/pom.xml index a9df87a5..5b3865d9 100644 --- a/gimel-serde/gimel-deserializers/generic-deserializers/pom.xml +++ b/gimel-serde/gimel-deserializers/generic-deserializers/pom.xml @@ -18,7 +18,7 @@ com.paypal.gimel serde-common - 1.0-SNAPSHOT + 2.4.7-SNAPSHOT org.scala-lang @@ -43,7 +43,7 @@ com.paypal.gimel serde-common - 1.0-SNAPSHOT + 2.4.7-SNAPSHOT test-jar test diff --git a/gimel-serde/gimel-serializers/generic-serializers/pom.xml b/gimel-serde/gimel-serializers/generic-serializers/pom.xml index b0c2c148..adf7925c 100644 --- a/gimel-serde/gimel-serializers/generic-serializers/pom.xml +++ b/gimel-serde/gimel-serializers/generic-serializers/pom.xml @@ -18,7 +18,7 @@ com.paypal.gimel serde-common - 1.0-SNAPSHOT + 2.4.7-SNAPSHOT org.scala-lang diff --git a/gimel-serde/pom.xml b/gimel-serde/pom.xml index c775d59e..50bf47b7 100644 --- a/gimel-serde/pom.xml +++ b/gimel-serde/pom.xml @@ -97,14 +97,14 @@ - - org.apache.avro - avro-maven-plugin - - - String - - + + + + + + + + org.apache.maven.plugins maven-surefire-plugin @@ -164,7 +164,7 @@ org.apache.maven.plugins maven-shade-plugin - 3.0.0 + ${maven.shade.plugin.version} diff --git a/gimel-serde/serde-common/pom.xml b/gimel-serde/serde-common/pom.xml index 9b5d1edc..de2de468 100644 --- a/gimel-serde/serde-common/pom.xml +++ b/gimel-serde/serde-common/pom.xml @@ -50,12 +50,18 @@ - - com.databricks - spark-avro_${scala.binary.version} - 4.0.0 - ${packaging.scope} - + + + + + + + + org.apache.spark + spark-avro_2.12 + ${spark.version} + ${packaging.scope} + io.spray spray-json_${scala.binary.version} diff --git a/gimel-serde/serde-common/src/main/scala/com/paypal/gimel/serde/common/avro/AvroUtils.scala b/gimel-serde/serde-common/src/main/scala/com/paypal/gimel/serde/common/avro/AvroUtils.scala index 9173ec9d..0d0bf0c1 100644 --- a/gimel-serde/serde-common/src/main/scala/com/paypal/gimel/serde/common/avro/AvroUtils.scala +++ b/gimel-serde/serde-common/src/main/scala/com/paypal/gimel/serde/common/avro/AvroUtils.scala @@ -25,16 +25,15 @@ import scala.collection.JavaConverters._ import scala.collection.immutable.Map import scala.collection.mutable -import com.databricks.spark.avro.SchemaConverters._ import org.apache.avro.Schema import org.apache.avro.generic.{GenericData, GenericRecord} import org.apache.avro.io.DecoderFactory import org.apache.avro.io.EncoderFactory import org.apache.avro.specific.SpecificDatumWriter import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Row, SparkSession} -import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.avro._ +import org.apache.spark.sql.functions._ import spray.json._ import spray.json.DefaultJsonProtocol._ import spray.json.JsValue @@ -109,6 +108,26 @@ object AvroUtils extends Serializable { newGenericRec } + /** + * Takes An Avro Schema String and Returns the list of field names in the "fields" list + * @param schemaString + * @return List(fieldNames) + */ + def getTopLevelFieldNamesFromAvro(schemaString: String): Seq[String] = { + // Parse as JsValue + val schemaAsJsVal = schemaString.parseJson + // Convert to JsObject + val schemaAsJsObject = schemaAsJsVal.asJsObject + // Get the Map of each element & Value + val schemaElementsMap: Map[String, JsValue] = schemaAsJsObject.fields + // These fields will be added with "to-add" fields + val schemaFields: Seq[JsValue] = schemaAsJsObject.getFields("fields").head.convertTo[Seq[JsValue]] + schemaFields.map{ x => + x.asJsObject.fields.head._2.toString().replace(""""""", "") + } + + } + /** * Adds additional fields to the Avro Schema * @@ -189,39 +208,24 @@ object AvroUtils extends Serializable { */ def getDeserializedDataFrame(dataframe: DataFrame, columnToDeserialize: String, avroSchemaString: String): DataFrame = { val originalFields: Array[String] = dataframe.columns.filter(field => field != columnToDeserialize) - val newAvroSchemaString = addAdditionalFieldsToSchema(originalFields.toList, avroSchemaString) - try { - dataframe.map { eachRow => - val recordToDeserialize: Array[Byte] = eachRow.getAs(columnToDeserialize).asInstanceOf[Array[Byte]] - val originalColumnsMap = originalFields.map { - field => { - val index = eachRow.fieldIndex(field) - if (eachRow.isNullAt(index)) { - (field -> "null") - } else { - (field -> eachRow.getAs(field).toString) - } - } - } - val deserializedGenericRecord: GenericRecord = bytesToGenericRecordWithSchemaRecon(recordToDeserialize, avroSchemaString, avroSchemaString) - val newDeserializedGenericRecord: GenericRecord = copyToGenericRecord(deserializedGenericRecord, avroSchemaString, newAvroSchemaString) - originalColumnsMap.foreach { kv => newDeserializedGenericRecord.put(kv._1, kv._2) } - val avroSchemaObj: Schema = (new Schema.Parser).parse(newAvroSchemaString) - val converter = AvroToSQLSchemaConverter.createConverterToSQL(avroSchemaObj) - converter(newDeserializedGenericRecord).asInstanceOf[Row] - } { - val avroSchema: Schema = (new Schema.Parser).parse(newAvroSchemaString) - val schemaType: SchemaType = toSqlType(avroSchema) - val encoder = RowEncoder(schemaType.dataType.asInstanceOf[StructType]) - encoder - }.toDF - } catch { - case ex: Throwable => { - ex.printStackTrace() - throw ex - } - } + logger.debug(s"Original Fields \n${originalFields}") + logger.debug(s"schema \n${avroSchemaString}") + val fieldsInAvro = getTopLevelFieldNamesFromAvro(avroSchemaString ) + logger.debug(s"Avro Fields \n${fieldsInAvro}") + logger.debug(s"**************** schema before deserialize ************************") + dataframe.printSchema() + val op = dataframe.withColumn("avro", from_avro(col(columnToDeserialize), avroSchemaString) ) + logger.debug(s"**************** schema after deserialize ************************") + op.printSchema() + op.show(2) + logger.debug(s"**************** Fields in avro that will be projected in dataFrame ************************") + logger.debug(fieldsInAvro.mkString(",")) + val colsToSelect: Seq[String] = fieldsInAvro.map{ x => s"avro.${x}"} + logger.debug(colsToSelect.mkString(",")) + val k = op.select(colsToSelect.head, colsToSelect.tail: _*) + k + } /** diff --git a/pom.xml b/pom.xml index 3635f3ea..1d5c6342 100644 --- a/pom.xml +++ b/pom.xml @@ -61,10 +61,10 @@ under the License. - general + dataproc_1.5x - general + dataproc_1.5x true @@ -75,18 +75,23 @@ under the License. provided provided 3.4.13 - 2.11 + 2.12 3.0.1 4.1.0 2.4.7 - 2.11.8 - 2.3 - 2.3.0 - 2.7.3 - 1.2.1 - 1.1.2 + 2.12.10 + 1.3.0 + 2.4 + 2.4.7 + 2.10.0 + 2.3.7 + 1.5.0 1.8 + 0.17.1 2.8.0 + 2.11 + 2.11 + 3.2.4