From 378ca6292f8d311ad42f089b76d2d452e873f46f Mon Sep 17 00:00:00 2001
From: pmadrigal
Date: Mon, 16 Nov 2015 15:27:48 +0100
Subject: [PATCH 1/4] partial Not filter

---
 CHANGELOG.md                                       | 2 +-
 .../datasource/mongodb/reader/MongodbReader.scala  | 9 ++++++---
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c375f0c..bbc052c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,7 +2,7 @@
 
 ## 0.9.1 (November 2015)
 
-* Bug fixed
+* Partial NOT filter working
 
 ## 0.9.0 (October 2015)
 
diff --git a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/reader/MongodbReader.scala b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/reader/MongodbReader.scala
index 921250f..bc066d0 100644
--- a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/reader/MongodbReader.scala
+++ b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/reader/MongodbReader.scala
@@ -24,7 +24,7 @@ import com.stratio.datasource.mongodb.partitioner.MongodbPartition
 import org.apache.spark.Partition
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.UTF8String
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 import java.util.regex.Pattern
 
 /**
@@ -78,10 +78,14 @@ class MongodbReader(
           MongoCredential.createCredential(user,database,password)},
         config.get[MongodbSSLOptions](MongodbConfig.SSLOptions),
         config.properties.filterKeys(_.contains(MongodbConfig.ListMongoClientOptions))))
+
+    val emptyFilter = MongoDBObject(List())
+    val filter = Try(queryPartition(filters)).getOrElse(emptyFilter)
+
     dbCursor = (for {
       client <- mongoClient
       collection <- Option(client(config(MongodbConfig.Database))(config(MongodbConfig.Collection)))
-      dbCursor <- Option(collection.find(queryPartition(filters), selectFields(requiredColumns)))
+      dbCursor <- Option(collection.find(filter, selectFields(requiredColumns)))
     } yield {
       mongoPartition.partitionRange.minKey.foreach(min => dbCursor.addSpecial("$min", min))
       mongoPartition.partitionRange.maxKey.foreach(max => dbCursor.addSpecial("$max", max))
@@ -138,7 +142,6 @@ class MongodbReader(
       queryBuilder.get
     }
-
     filtersToDBObject(filters)
   }

From 5bd74aa797742e3ff98a6a900c3d4e78cb1c809c Mon Sep 17 00:00:00 2001
From: pmadrigal
Date: Mon, 16 Nov 2015 15:50:15 +0100
Subject: [PATCH 2/4] remove some imports not needed

---
 .../com/stratio/datasource/mongodb/reader/MongodbReader.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/reader/MongodbReader.scala b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/reader/MongodbReader.scala
index bc066d0..dfb416d 100644
--- a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/reader/MongodbReader.scala
+++ b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/reader/MongodbReader.scala
@@ -24,7 +24,7 @@ import com.stratio.datasource.mongodb.partitioner.MongodbPartition
 import org.apache.spark.Partition
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.UTF8String
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
 import java.util.regex.Pattern
 
 /**
@@ -78,7 +78,6 @@ class MongodbReader(
           MongoCredential.createCredential(user,database,password)},
         config.get[MongodbSSLOptions](MongodbConfig.SSLOptions),
         config.properties.filterKeys(_.contains(MongodbConfig.ListMongoClientOptions))))
-
     val emptyFilter = MongoDBObject(List())
     val filter = Try(queryPartition(filters)).getOrElse(emptyFilter)

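Note on patches 1 and 2: the change wraps the translation of Spark's pushed-down filters in a `Try`, so a predicate the translator cannot yet express as a Mongo query (the partial `Not` support named in the changelog) falls back to an empty filter instead of failing the read, while Spark still evaluates the original predicates on the rows that come back. The following is a minimal, dependency-free sketch of that fallback pattern; `translate` and the small filter case classes are stand-ins for the datasource's `queryPartition` and Spark's `org.apache.spark.sql.sources` filters, not the real code.

```scala
import scala.util.Try

object FilterFallbackSketch extends App {

  // Stand-ins for Spark's pushed-down source filters (illustrative only; the
  // real reader receives org.apache.spark.sql.sources.Filter values).
  sealed trait Filter
  case class EqualTo(attribute: String, value: Any) extends Filter
  case class Not(child: Filter) extends Filter

  // Hypothetical translator in the spirit of MongodbReader.queryPartition:
  // builds a Mongo-style query document and throws on filters it cannot
  // handle, such as Not.
  def translate(filters: Seq[Filter]): Map[String, Any] =
    filters.map {
      case EqualTo(attribute, value) => attribute -> value
      case unsupported               => sys.error(s"Unsupported filter: $unsupported")
    }.toMap

  val emptyFilter = Map.empty[String, Any]

  // The fallback used in patches 1 and 2: if translation fails, query with an
  // empty filter; correctness is preserved because Spark re-applies the
  // pushed-down predicates to the rows returned by the datasource.
  def safeFilter(filters: Seq[Filter]): Map[String, Any] =
    Try(translate(filters)).getOrElse(emptyFilter)

  println(safeFilter(Seq(EqualTo("att1", 1))))      // Map(att1 -> 1)
  println(safeFilter(Seq(Not(EqualTo("att1", 1))))) // Map() (fallback)
}
```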
From 1f3e1e827eafaa023c7bb6f41cd5406e7796b4b7 Mon Sep 17 00:00:00 2001
From: pmadrigal
Date: Mon, 16 Nov 2015 16:41:11 +0100
Subject: [PATCH 3/4] change writer test

---
 .../mongodb/writer/MongodbWriterIT.scala | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/spark-mongodb/src/test/scala/com/stratio/datasource/mongodb/writer/MongodbWriterIT.scala b/spark-mongodb/src/test/scala/com/stratio/datasource/mongodb/writer/MongodbWriterIT.scala
index 69184fc..4f6b02f 100644
--- a/spark-mongodb/src/test/scala/com/stratio/datasource/mongodb/writer/MongodbWriterIT.scala
+++ b/spark-mongodb/src/test/scala/com/stratio/datasource/mongodb/writer/MongodbWriterIT.scala
@@ -210,7 +210,7 @@ with ScalaBinaryVersion{
     }
   }
 
-  it should "manage the search fields and the update query, it has to read the same value from the search fields in " +
+  it should "manage the update fields and the update query, it has to read the same value from the fields in " +
    "configuration" + scalaBinaryVersion in {
 
    withEmbedMongoFixture(List()) { mongodbProc =>
@@ -230,17 +230,17 @@ with ScalaBinaryVersion{
 
      import scala.collection.JavaConversions._
 
-      dbCursor.iterator().toList.forall { case obj: BasicDBObject =>
-        obj.getInt("att1") == 1
-      } should be (true)
+      dbCursor.iterator().toList.foreach{ case obj: BasicDBObject =>
+        obj.getInt("att1") should be (1)
+      }
 
      mongodbBatchWriter.saveWithPk(dbUpdateIterator)
 
      val dbCursor2 = dbCollection.find(MongoDBObject("att3" -> "holo"))
 
-      dbCursor2.iterator().toList.forall { case obj: BasicDBObject =>
-        obj.getInt("att1") == 2
-      } should be (true)
+      dbCursor2.iterator().toList.foreach { case obj: BasicDBObject =>
+        obj.getInt("att1") should be (2)
+      }
     }
   }

From 5c645a7b4aec2ae4695deaefa7c97a3cb1a055bd Mon Sep 17 00:00:00 2001
From: pmadrigal
Date: Tue, 17 Nov 2015 08:40:20 +0100
Subject: [PATCH 4/4] changelog updated

---
 CHANGELOG.md | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bbc052c..8580053 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,9 +1,13 @@
 # Changelog
 
-## 0.9.1 (November 2015)
+## 0.9.2 (November 2015)
 
 * Partial NOT filter working
 
+## 0.9.1 (November 2015)
+
+* Refactor update _id on writer(_idField property not necessary)
+
 ## 0.9.0 (October 2015)
 
 * Mapping config for client
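Note on patch 3: asserting inside `foreach` instead of collapsing the whole check into `forall ... should be (true)` means a mismatch is reported by the matcher with the offending value, rather than as a bare "false was not true". A minimal ScalaTest sketch of the two styles follows; the class name and data are illustrative only, assuming the FlatSpec/Matchers style the test file already uses.

```scala
import org.scalatest.{FlatSpec, Matchers}

class AssertionStyleSketch extends FlatSpec with Matchers {

  // Illustrative stand-in for the documents read back by the writer test.
  val docs = List(Map("att1" -> 1), Map("att1" -> 1))

  "the forall style" should "collapse the whole check into one boolean" in {
    // On failure this reports only "false was not true", with no hint of
    // which document or value was wrong.
    docs.forall(_("att1") == 1) should be (true)
  }

  "the per-element style" should "point at the offending value on failure" in {
    // On failure the matcher names the actual value (e.g. "2 was not equal to 1"),
    // which is the motivation for the change in MongodbWriterIT.
    docs.foreach { doc =>
      doc("att1") should be (1)
    }
  }
}
```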