diff --git a/.travis.yml b/.travis.yml
index a776739e..52ae7e49 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -29,4 +29,4 @@ before_install:
# Build
install:
-- build/gimel -Dmaven.test.skip=true -pl gimel-dataapi/gimel-tools -am | egrep -v "Download|Copy|Including|WARNING"
+- sh build/gimel -Dmaven.test.skip=true -pl gimel-dataapi/gimel-tools -am | egrep -v "Download|Copy|Including|WARNING"
diff --git a/build/gimel b/build/gimel
index 0d4bf254..1ab899c8 100755
--- a/build/gimel
+++ b/build/gimel
@@ -17,7 +17,7 @@
this_dir=`dirname build/install_dependencies`
# Source reusable functions
-source ${this_dir}/gimel_functions
+. ${this_dir}/gimel_functions
#--------------------------------Begin execution of Steps------------------------------------------------#
diff --git a/build/install_dependencies b/build/install_dependencies
index 90159c1d..89fdf003 100755
--- a/build/install_dependencies
+++ b/build/install_dependencies
@@ -19,7 +19,7 @@
this_script=`pwd`/$BASH_SOURCE
this_dir=`dirname $this_script`
# Source reusable functions
-source ${this_dir}/gimel_functions
+. ${this_dir}/gimel_functions
#--------------------------------Begin execution of Steps------------------------------------------------#
diff --git a/gimel-dataapi/gimel-common/pom.xml b/gimel-dataapi/gimel-common/pom.xml
index 52834328..39726a8c 100644
--- a/gimel-dataapi/gimel-common/pom.xml
+++ b/gimel-dataapi/gimel-common/pom.xml
@@ -55,9 +55,9 @@ under the License.
${packaging.scope}
- org.scala-lang
- scala-xml
- 2.11.0-M4
+ org.scala-lang.modules
+ scala-xml_${scala.binary.version}
+ ${scala.xml.version}
${scala.packaging.scope}
@@ -82,12 +82,6 @@ under the License.
com.paypal.gimel
gimel-logger
${gimel.version}-SNAPSHOT
-
-
- org.apache.kafka
- kafka-clients
-
-
org.apache.kafka
@@ -97,7 +91,7 @@ under the License.
com.paypal.gimel
serde-common
- 1.0-SNAPSHOT
+ 2.4.7-SNAPSHOT
${packaging.scope}
@@ -106,12 +100,6 @@ under the License.
${kafka.version}
${packaging.scope}
-
- com.databricks
- spark-avro_${scala.binary.version}
- 4.0.0
- ${packaging.scope}
-
org.apache.curator
curator-framework
@@ -197,7 +185,7 @@ under the License.
${confluent.version}
test
-
+
io.netty
netty
@@ -211,7 +199,7 @@ under the License.
test
- net.jpountz.lz4
+ net.jpountz.lz4
lz4
1.3.0
test
@@ -220,16 +208,19 @@ under the License.
com.fasterxml.jackson.core
jackson-core
${jackson.version}
+ compile
com.fasterxml.jackson.core
jackson-annotations
${jackson.version}
+ compile
com.fasterxml.jackson.core
jackson-databind
${jackson.version}
+ compile
diff --git a/gimel-dataapi/gimel-connectors/gimel-elasticsearch-6.2/pom.xml b/gimel-dataapi/gimel-connectors/gimel-elasticsearch-6.2/pom.xml
index 41196140..063b3600 100644
--- a/gimel-dataapi/gimel-connectors/gimel-elasticsearch-6.2/pom.xml
+++ b/gimel-dataapi/gimel-connectors/gimel-elasticsearch-6.2/pom.xml
@@ -45,9 +45,23 @@ under the License.
org.elasticsearch
- elasticsearch-spark-20_${scala.binary.version}
+ elasticsearch-spark-20_${elastic.scala.binary.version}
${elasticsearch.version}
${packaging.scope}
+
+
+ org.apache.spark
+ *
+
+
+ org.slf4j
+ log4j-over-slf4j
+
+
+ org.apache.hive
+ *
+
+
org.elasticsearch
@@ -78,7 +92,7 @@ under the License.
org.apache.maven.plugins
maven-shade-plugin
- 3.0.0
+ ${maven.shade.plugin.version}
diff --git a/gimel-dataapi/gimel-connectors/gimel-jdbc/pom.xml b/gimel-dataapi/gimel-connectors/gimel-jdbc/pom.xml
index 26d7c30f..8c6088ea 100644
--- a/gimel-dataapi/gimel-connectors/gimel-jdbc/pom.xml
+++ b/gimel-dataapi/gimel-connectors/gimel-jdbc/pom.xml
@@ -83,7 +83,7 @@ under the License.
org.apache.maven.plugins
maven-shade-plugin
- 3.0.0
+ ${maven.shade.plugin.version}
diff --git a/gimel-dataapi/gimel-connectors/gimel-sftp/pom.xml b/gimel-dataapi/gimel-connectors/gimel-sftp/pom.xml
index 9d07b5cf..5c2c66b0 100644
--- a/gimel-dataapi/gimel-connectors/gimel-sftp/pom.xml
+++ b/gimel-dataapi/gimel-connectors/gimel-sftp/pom.xml
@@ -39,9 +39,19 @@ under the License.
com.springml
- spark-sftp_${scala.binary.version}
+ spark-sftp_${springml.scala.binary.version}
${spark.sftp.version}
${packaging.scope}
+
+
+ org.apache.spark
+ *
+
+
+ org.scala-lang
+ *
+
+
org.scalatest
@@ -53,12 +63,11 @@ under the License.
src/main/scala
- src/test/scala
org.apache.maven.plugins
maven-shade-plugin
- 3.0.0
+ ${maven.shade.plugin.version}
diff --git a/gimel-dataapi/gimel-logger/pom.xml b/gimel-dataapi/gimel-logger/pom.xml
index 924f466c..e0688787 100644
--- a/gimel-dataapi/gimel-logger/pom.xml
+++ b/gimel-dataapi/gimel-logger/pom.xml
@@ -50,33 +50,6 @@ under the License.
${scala.version}
${scala.packaging.scope}
-
- com.paypal.gimel
- gimel-logging_${gimel.logging.spark.binary.version}
- ${gimel.logging.version}
-
-
- org.slf4j
- slf4j-log4j12
-
-
- log4j
- log4j
-
-
- org.apache.kafka
- *
-
-
- org.apache.kafka
- kafka-log4j-appender
-
-
- com.googlecode.protobuf-java-format
- protobuf-java-format
-
-
-
src/main/scala
diff --git a/gimel-parser/pom.xml b/gimel-parser/pom.xml
index 85a1b352..9896b5e3 100644
--- a/gimel-parser/pom.xml
+++ b/gimel-parser/pom.xml
@@ -15,9 +15,9 @@
- org.scala-lang
- scala-xml
- 2.11.0-M4
+ org.scala-lang.modules
+ scala-xml_${scala.binary.version}
+ ${scala.xml.version}
${scala.packaging.scope}
diff --git a/gimel-serde/gimel-deserializers/generic-deserializers/pom.xml b/gimel-serde/gimel-deserializers/generic-deserializers/pom.xml
index a9df87a5..5b3865d9 100644
--- a/gimel-serde/gimel-deserializers/generic-deserializers/pom.xml
+++ b/gimel-serde/gimel-deserializers/generic-deserializers/pom.xml
@@ -18,7 +18,7 @@
com.paypal.gimel
serde-common
- 1.0-SNAPSHOT
+ 2.4.7-SNAPSHOT
org.scala-lang
@@ -43,7 +43,7 @@
com.paypal.gimel
serde-common
- 1.0-SNAPSHOT
+ 2.4.7-SNAPSHOT
test-jar
test
diff --git a/gimel-serde/gimel-serializers/generic-serializers/pom.xml b/gimel-serde/gimel-serializers/generic-serializers/pom.xml
index b0c2c148..adf7925c 100644
--- a/gimel-serde/gimel-serializers/generic-serializers/pom.xml
+++ b/gimel-serde/gimel-serializers/generic-serializers/pom.xml
@@ -18,7 +18,7 @@
com.paypal.gimel
serde-common
- 1.0-SNAPSHOT
+ 2.4.7-SNAPSHOT
org.scala-lang
diff --git a/gimel-serde/pom.xml b/gimel-serde/pom.xml
index c775d59e..50bf47b7 100644
--- a/gimel-serde/pom.xml
+++ b/gimel-serde/pom.xml
@@ -97,14 +97,14 @@
-
- org.apache.avro
- avro-maven-plugin
-
-
- String
-
-
+
+
+
+
+
+
+
+
org.apache.maven.plugins
maven-surefire-plugin
@@ -164,7 +164,7 @@
org.apache.maven.plugins
maven-shade-plugin
- 3.0.0
+ ${maven.shade.plugin.version}
diff --git a/gimel-serde/serde-common/pom.xml b/gimel-serde/serde-common/pom.xml
index 9b5d1edc..de2de468 100644
--- a/gimel-serde/serde-common/pom.xml
+++ b/gimel-serde/serde-common/pom.xml
@@ -50,12 +50,18 @@
-
- com.databricks
- spark-avro_${scala.binary.version}
- 4.0.0
- ${packaging.scope}
-
+
+
+
+
+
+
+
+ org.apache.spark
+ spark-avro_2.12
+ ${spark.version}
+ ${packaging.scope}
+
io.spray
spray-json_${scala.binary.version}
diff --git a/gimel-serde/serde-common/src/main/scala/com/paypal/gimel/serde/common/avro/AvroUtils.scala b/gimel-serde/serde-common/src/main/scala/com/paypal/gimel/serde/common/avro/AvroUtils.scala
index 9173ec9d..0d0bf0c1 100644
--- a/gimel-serde/serde-common/src/main/scala/com/paypal/gimel/serde/common/avro/AvroUtils.scala
+++ b/gimel-serde/serde-common/src/main/scala/com/paypal/gimel/serde/common/avro/AvroUtils.scala
@@ -25,16 +25,15 @@ import scala.collection.JavaConverters._
import scala.collection.immutable.Map
import scala.collection.mutable
-import com.databricks.spark.avro.SchemaConverters._
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.avro._
+import org.apache.spark.sql.functions._
import spray.json._
import spray.json.DefaultJsonProtocol._
import spray.json.JsValue
@@ -109,6 +108,26 @@ object AvroUtils extends Serializable {
newGenericRec
}
+ /**
+ * Takes An Avro Schema String and Returns the list of field names in the "fields" list
+ * @param schemaString
+ * @return List(fieldNames)
+ */
+ def getTopLevelFieldNamesFromAvro(schemaString: String): Seq[String] = {
+ // Parse as JsValue
+ val schemaAsJsVal = schemaString.parseJson
+ // Convert to JsObject
+ val schemaAsJsObject = schemaAsJsVal.asJsObject
+ // Get the Map of each element & Value
+ val schemaElementsMap: Map[String, JsValue] = schemaAsJsObject.fields
+ // These fields will be added with "to-add" fields
+ val schemaFields: Seq[JsValue] = schemaAsJsObject.getFields("fields").head.convertTo[Seq[JsValue]]
+ schemaFields.map{ x =>
+ x.asJsObject.fields.head._2.toString().replace(""""""", "")
+ }
+
+ }
+
/**
* Adds additional fields to the Avro Schema
*
@@ -189,39 +208,24 @@ object AvroUtils extends Serializable {
*/
def getDeserializedDataFrame(dataframe: DataFrame, columnToDeserialize: String, avroSchemaString: String): DataFrame = {
val originalFields: Array[String] = dataframe.columns.filter(field => field != columnToDeserialize)
- val newAvroSchemaString = addAdditionalFieldsToSchema(originalFields.toList, avroSchemaString)
- try {
- dataframe.map { eachRow =>
- val recordToDeserialize: Array[Byte] = eachRow.getAs(columnToDeserialize).asInstanceOf[Array[Byte]]
- val originalColumnsMap = originalFields.map {
- field => {
- val index = eachRow.fieldIndex(field)
- if (eachRow.isNullAt(index)) {
- (field -> "null")
- } else {
- (field -> eachRow.getAs(field).toString)
- }
- }
- }
- val deserializedGenericRecord: GenericRecord = bytesToGenericRecordWithSchemaRecon(recordToDeserialize, avroSchemaString, avroSchemaString)
- val newDeserializedGenericRecord: GenericRecord = copyToGenericRecord(deserializedGenericRecord, avroSchemaString, newAvroSchemaString)
- originalColumnsMap.foreach { kv => newDeserializedGenericRecord.put(kv._1, kv._2) }
- val avroSchemaObj: Schema = (new Schema.Parser).parse(newAvroSchemaString)
- val converter = AvroToSQLSchemaConverter.createConverterToSQL(avroSchemaObj)
- converter(newDeserializedGenericRecord).asInstanceOf[Row]
- } {
- val avroSchema: Schema = (new Schema.Parser).parse(newAvroSchemaString)
- val schemaType: SchemaType = toSqlType(avroSchema)
- val encoder = RowEncoder(schemaType.dataType.asInstanceOf[StructType])
- encoder
- }.toDF
- } catch {
- case ex: Throwable => {
- ex.printStackTrace()
- throw ex
- }
- }
+ logger.debug(s"Original Fields \n${originalFields}")
+ logger.debug(s"schema \n${avroSchemaString}")
+ val fieldsInAvro = getTopLevelFieldNamesFromAvro(avroSchemaString )
+ logger.debug(s"Avro Fields \n${fieldsInAvro}")
+ logger.debug(s"**************** schema before deserialize ************************")
+ dataframe.printSchema()
+ val op = dataframe.withColumn("avro", from_avro(col(columnToDeserialize), avroSchemaString) )
+ logger.debug(s"**************** schema after deserialize ************************")
+ op.printSchema()
+ op.show(2)
+ logger.debug(s"**************** Fields in avro that will be projected in dataFrame ************************")
+ logger.debug(fieldsInAvro.mkString(","))
+ val colsToSelect: Seq[String] = fieldsInAvro.map{ x => s"avro.${x}"}
+ logger.debug(colsToSelect.mkString(","))
+ val k = op.select(colsToSelect.head, colsToSelect.tail: _*)
+ k
+
}
/**
diff --git a/pom.xml b/pom.xml
index 3635f3ea..1d5c6342 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,10 +61,10 @@ under the License.
- general
+ dataproc_1.5x
- general
+ dataproc_1.5x
true
@@ -75,18 +75,23 @@ under the License.
provided
provided
3.4.13
- 2.11
+ 2.12
3.0.1
4.1.0
2.4.7
- 2.11.8
- 2.3
- 2.3.0
- 2.7.3
- 1.2.1
- 1.1.2
+ 2.12.10
+ 1.3.0
+ 2.4
+ 2.4.7
+ 2.10.0
+ 2.3.7
+ 1.5.0
1.8
+ 0.17.1
2.8.0
+ 2.11
+ 2.11
+ 3.2.4