[CARBONDATA-4211] Fix - from xx Insert into select fails if an SQL statement contains multiple inserts

Why is this PR needed?
When multiple inserts are issued in a single query, execution fails in SparkPlan with java.lang.ClassCastException:
GenericInternalRow cannot be cast to UnsafeRow.
For every successful insert/load we return the segment ID as a row. For multiple inserts we also return
a row containing the segment ID, but Spark throws a ClassCastException while processing it.
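For reference, a minimal sketch of the failing shape (table and column names are illustrative, not taken from the PR; sql(...) stands for SparkSession.sql as in the test harness). A single SQL statement with two inserts against a carbon table hit this cast error before the fix:

// Hypothetical tables; only the statement shape matters here.
sql("create table src(key int, value int) stored as carbondata")
sql("create table t1(key int, value int) stored as carbondata")
sql("create table t2(key int, value int) stored as carbondata")
// One statement, two inserts: the plan gets a Union node and, before this fix,
// failed with java.lang.ClassCastException: GenericInternalRow cannot be cast to UnsafeRow.
sql("from src " +
  "insert into table t1 select key, value where key is not null " +
  "insert into table t2 select key, value where key is null")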

What changes were proposed in this PR?
When a query contains multiple inserts, its plan has a Union node. Based on its presence, the
containsMultipleInserts flag is set so that planning uses the new class UnionCommandExec, which implements a
custom sideEffectResult that converts the GenericInternalRow to an UnsafeRow and returns it.
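The conversion itself relies on Spark's UnsafeProjection. A standalone sketch of the same idea (not code from the PR; it only uses the catalyst APIs that the new sideEffectResult also uses):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// InternalRow(...) builds a GenericInternalRow; here it carries a segment ID value.
val genericRow: InternalRow = InternalRow(UTF8String.fromString("0"))
// Build an UnsafeProjection from the command's output data types and project the row.
val toUnsafe = UnsafeProjection.create(Array[DataType](StringType))
val unsafeRow = toUnsafe(genericRow) // now an UnsafeRow, safe to cast downstream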

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #4151
ShreelekhyaG authored and akashrn5 committed Jun 19, 2021
1 parent fdd00ab commit d8f7df9
Showing 4 changed files with 68 additions and 10 deletions.
@@ -67,7 +67,8 @@ case class InsertIntoCarbonTable (table: CarbonDatasourceHadoopRelation,
partition: Map[String, Option[String]],
child: LogicalPlan,
overwrite: Boolean,
ifNotExists: Boolean)
ifNotExists: Boolean,
containsMultipleInserts: Boolean)
extends Command {

override def output: Seq[Attribute] = {
@@ -22,15 +22,16 @@ import java.util.Locale
import scala.collection.mutable

import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{CarbonCountStar, CarbonDatasourceHadoopRelation, CarbonToSparkAdapter, CountStarPlan, InsertIntoCarbonTable, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, AttributeReference, Cast, Descending, Expression, IntegerLiteral, Literal, NamedExpression, ScalaUDF, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, Cast, Descending, Expression, IntegerLiteral, Literal, NamedExpression, ScalaUDF, SortOrder, UnsafeProjection}
import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, PhysicalOperation}
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftSemi}
import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, Limit, LogicalPlan, Project, ReturnAnswer, Sort}
import org.apache.spark.sql.execution.{CarbonTakeOrderedAndProjectExec, FilterExec, PlanLater, ProjectExec, SparkPlan, SparkStrategy}
import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec, LoadDataCommand}
import org.apache.spark.sql.execution.{CarbonTakeOrderedAndProjectExec, FilterExec, LeafExecNode, PlanLater, ProjectExec, SparkPlan, SparkStrategy}
import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec, LoadDataCommand, RunnableCommand}
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.execution.joins.{BroadCastPolygonFilterPushJoin, BuildLeft, BuildRight}
import org.apache.spark.sql.execution.strategy.CarbonPlanHelper.isCarbonTable
@@ -57,7 +58,16 @@ object DMLStrategy extends SparkStrategy {
case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
case insert: InsertIntoCarbonTable =>
ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
if (insert.containsMultipleInserts) {
// A successful insert into carbon returns the segment ID in a row.
// In this multiple-inserts scenario the Union node executes in the physical plan
// phase of the command, so the rows it consumes must be UnsafeRow objects.
// Hence we override sideEffectResult to prepare the content of the command's
// corresponding RDD from the physical plan of the insert into command.
UnionCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
} else {
ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
}
case insert: InsertIntoHadoopFsRelationCommand
if insert.catalogTable.isDefined && isCarbonTable(insert.catalogTable.get.identifier) =>
DataWritingCommandExec(DMLHelper.insertInto(insert), planLater(insert.query)) :: Nil
@@ -365,3 +375,26 @@ }
}
}

/**
* This class is used when a Union node with multiple inserts is present in the plan.
* It is a physical operator that executes the run method of a RunnableCommand and
* saves the result to prevent multiple executions.
*/
case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {

protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
val converter = CatalystTypeConverters.createToCatalystConverter(schema)
val internalRow = cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
// Convert the GenericInternalRow to an UnsafeRow
val row = unsafeProjection(internalRow.head)
Seq(row)
}

override def output: Seq[Attribute] = cmd.output

protected override def doExecute(): RDD[InternalRow] = {
sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
}
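For orientation, a rough sketch (comments only; operator arguments elided, table names illustrative) of where UnionCommandExec sits for a multi-insert statement:

// Logical plan for "from src insert into table t1 select ... insert into table t2 select ...":
//   Union
//   :- InsertIntoCarbonTable(t1, ..., containsMultipleInserts = true)
//   +- InsertIntoCarbonTable(t2, ..., containsMultipleInserts = true)
// Physical plan produced by DMLStrategy, roughly:
//   Union
//   :- UnionCommandExec(CarbonPlanHelper.insertInto(...))  // sideEffectResult yields the segment ID as an UnsafeRow
//   +- UnionCommandExec(CarbonPlanHelper.insertInto(...))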

@@ -250,13 +250,15 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi

case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
castChildOutput(p, relation, child)
// when the plan contains a Union node, it can have multiple insert statements as its children
castChildOutput(p, relation, child, plan.isInstanceOf[Union])
}
}

def castChildOutput(p: InsertIntoTable,
relation: LogicalRelation,
child: LogicalPlan): LogicalPlan = {
child: LogicalPlan,
containsMultipleInserts: Boolean): LogicalPlan = {
val carbonDSRelation = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
if (carbonDSRelation.carbonRelation.output.size > CarbonCommonConstants
.DEFAULT_MAX_NUMBER_OF_COLUMNS) {
@@ -300,7 +302,8 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi

val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)

InsertIntoCarbonTable(carbonDSRelation, p.partition, newChild, overwrite, true)
InsertIntoCarbonTable(carbonDSRelation, p.partition, newChild, overwrite, true,
containsMultipleInserts)
} else {
CarbonException.analysisException(
"Cannot insert into target table because number of columns mismatch")
@@ -394,6 +394,27 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
assert(df.exists(_.get(0).toString.contains("`a`bc`!!d`")))
}

test("test load with multiple inserts") {
sql("drop table if exists catalog_returns_5")
sql("drop table if exists catalog_returns_6")
sql("create table catalog_returns_5(cr_returned_date_sk int,cr_returned_time_sk int," +
"cr_item_sk int)ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\\n'")
sql("insert into catalog_returns_5 values(1,2,3)")
sql("create table catalog_returns_6(cr_returned_time_sk int,cr_item_sk int) partitioned by" +
" (cr_returned_date_sk int) stored as carbondata")
val df = sql(
"from catalog_returns_5 insert overwrite table catalog_returns_6 partition " +
"(cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, cr_returned_date_sk where " +
"cr_returned_date_sk is not null distribute by cr_returned_date_sk insert overwrite table " +
"catalog_returns_6 partition (cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, " +
"cr_returned_date_sk where cr_returned_date_sk is null distribute by cr_returned_date_sk")
assert(df.collect().size == 2)
checkAnswer(df, Seq(Row("0"), Row("1")))
checkAnswer(sql("select * from catalog_returns_6"), Seq(Row(2, 3, 1)))
sql("drop table if exists catalog_returns_5")
sql("drop table if exists catalog_returns_6")
}

override def afterEach {
sql("DROP TABLE if exists loadtest")
sql("drop table if exists invalidMeasures")
