From 7fa6471fba5a2786c1c5101d8d26927a2a2006da Mon Sep 17 00:00:00 2001
From: W1thOut <860290239@qq.com>
Date: Sat, 14 Aug 2021 22:53:06 +0800
Subject: [PATCH] delete segment expect remaining number

---
 .../sql/catalyst/CarbonDDLSqlParser.scala     |   2 +
 ...arbonDeleteLoadByRemainNumberCommand.scala |  79 ++++++++++++++
 .../sql/parser/CarbonSpark2SqlParser.scala    |  12 ++-
 .../DeleteSegmentByRemainNumberTestCase.scala | 102 ++++++++++++++++++
 4 files changed, 193 insertions(+), 2 deletions(-)
 create mode 100644 integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByRemainNumberCommand.scala
 create mode 100644 integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/deletesegment/DeleteSegmentByRemainNumberTestCase.scala

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 29ce9fb6b38..95d9b3f7d11 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -180,6 +180,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val REGISTER = carbonKeyWord("REGISTER")
   protected val PROPERTIES = carbonKeyWord("PROPERTIES")
   protected val REFRESH = carbonKeyWord("REFRESH")
+  protected val EXPECT = carbonKeyWord("EXPECT")
+  protected val REMAIN_NUMBER = carbonKeyWord("REMAIN_NUMBER")
 
   // For materialized view
   // Keywords used in this parser
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByRemainNumberCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByRemainNumberCommand.scala
new file mode 100644
index 00000000000..96fc8a5ee95
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByRemainNumberCommand.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, withEvents}
+
+/**
+ * A command that deletes segments so that only `remaining` segments are left.
+ * The newest segments (by load start time) are the ones kept.
+ *
+ * @param remaining the number of segments expected to remain after deletion
+ */
+case class CarbonDeleteLoadByRemainNumberCommand(
+    remaining: String,
+    databaseNameOp: Option[String],
+    tableName: String)
+  extends DataCommand {
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    setAuditTable(carbonTable)
+    setAuditInfo(Map("remaining number" -> remaining))
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
+    }
+
+    // if insert overwrite is in progress, do not allow delete segment
+    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
+    }
+
+    val segments = CarbonStore.readSegments(carbonTable.getTablePath, showHistory = false, None)
+    if (segments.length == remaining.toInt) {
+      return Seq.empty
+    }
+
+    // keep the newest `remaining` segments and collect the ids of the rest for deletion
+    val deleteSegmentIds = segments.filter(segment => segment.getSegmentStatus != SegmentStatus.MARKED_FOR_DELETE)
+      .sortBy(_.getLoadStartTime)
+      .map(_.getLoadName)
+      .reverse
+      .drop(remaining.toInt)
+
+    withEvents(DeleteSegmentByIdPreEvent(carbonTable, deleteSegmentIds, sparkSession),
+      DeleteSegmentByIdPostEvent(carbonTable, deleteSegmentIds, sparkSession)) {
+      CarbonStore.deleteLoadById(
+        deleteSegmentIds,
+        CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
+        tableName,
+        carbonTable
+      )
+    }
+    Seq.empty
+  }
+
+  override protected def opName: String = "DELETE SEGMENT BY REMAIN_NUMBER"
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 56edba0a181..34f6fa086ef 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -78,8 +78,8 @@
     cacheManagement | insertStageData | indexCommands | mvCommands | describeCommands
 
   protected lazy val segmentManagement: Parser[LogicalPlan] =
-    deleteSegmentByID | deleteSegmentByLoadDate | deleteStage | cleanFiles | addSegment |
-    showSegments
+    deleteSegmentByID | deleteSegmentByLoadDate | deleteSegmentByRemainNumber |
+    deleteStage | cleanFiles | addSegment | showSegments
 
   protected lazy val restructure: Parser[LogicalPlan] =
     alterTableDropColumn
@@ -508,6 +508,14 @@
       CarbonDeleteLoadByIdCommand(loadIds, dbName, tableName.toLowerCase())
     }
 
+  protected lazy val deleteSegmentByRemainNumber: Parser[LogicalPlan] =
+    DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~
+    (EXPECT ~> (SEGMENT ~ "." ~ REMAIN_NUMBER) ~> "=" ~> segmentId) <~
+    opt(";") ^^ {
+      case dbName ~ tableName ~ loadIds =>
+        CarbonDeleteLoadByRemainNumberCommand(loadIds, dbName, tableName.toLowerCase())
+    }
+
   protected lazy val deleteSegmentByLoadDate: Parser[LogicalPlan] =
     DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
     (WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/deletesegment/DeleteSegmentByRemainNumberTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/deletesegment/DeleteSegmentByRemainNumberTestCase.scala
new file mode 100644
index 00000000000..d59ee805233
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/deletesegment/DeleteSegmentByRemainNumberTestCase.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.deletesegment
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Test cases for deleting segments while keeping a given number of the newest segments.
+ */
+class DeleteSegmentByRemainNumberTestCase extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
+  val DELETED_STATUS = "Marked for Delete"
+
+  val SUCCESSFUL_STATUS = "Success"
+
+  override def beforeAll {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
+  }
+
+  override def beforeEach(): Unit = {
+    initTestTable
+  }
+
+  test("delete segment, remain_number = 1") {
+    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
+    val rows = sql("show segments on deleteSegmentTable").collect()
+    assertResult(SUCCESSFUL_STATUS)(rows(0).get(1))
+    assertResult(DELETED_STATUS)(rows(1).get(1))
+    assertResult(DELETED_STATUS)(rows(2).get(1))
+  }
+
+  test("delete segment, remain nothing") {
+    sql("delete from table deleteSegmentTable expect segment.remain_number = 0")
+    val rows = sql("show segments on deleteSegmentTable").collect()
+    rows.foreach(row => assertResult(DELETED_STATUS)(row.get(1)))
+  }
+
+  test("delete segment, remain all") {
+    sql("delete from table deleteSegmentTable expect segment.remain_number = 3")
+    val rows = sql("show segments on deleteSegmentTable").collect()
+    rows.foreach(row => assertResult(SUCCESSFUL_STATUS)(row.get(1)))
+  }
+
+  test("delete segment, remain_number = -1") {
+    val ex = intercept[Exception] {
+      sql("delete from table deleteSegmentTable expect segment.remain_number = -1")
+    }
+    assert(ex.getMessage.contains("not found in database"))
+  }
+
+  test("delete segment after update") {
+    sql("update deleteSegmentTable d set (d.country) = ('fr') where d.country = 'aus'")
+    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
+    val rows = sql("select * from deleteSegmentTable").collect()
+    rows.foreach(row => assertResult("fr")(row(2)))
+  }
+
+  test("delete segment after delete newest segment by segmentId") {
+    sql("delete from table deleteSegmentTable where segment.id in (2)")
+    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
+    val rows = sql("show segments on deleteSegmentTable").collect()
+    assertResult(DELETED_STATUS)(rows(0).get(1))
+    assertResult(SUCCESSFUL_STATUS)(rows(1).get(1))
+    assertResult(DELETED_STATUS)(rows(2).get(1))
+  }
+
+  private def initTestTable = {
+    sql("drop table if exists deleteSegmentTable")
+    sql(
+      "CREATE table deleteSegmentTable (ID int, date String, country String, name " +
+      "String, phonetype String, serialname String, salary String) STORED AS carbondata"
+    )
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/dataretention1.csv'
+         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
+         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/dataretention3.csv'
+         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+  }
+}
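
Usage sketch (reviewer note, not part of the patch): the new statement keeps the N newest
segments, ordered by load start time, and marks everything older as "Marked for Delete".
The snippet below is a minimal, hypothetical example, not the project's documented API:
the app name, the table name `sales`, and wiring the parser in via the
`org.apache.spark.sql.CarbonExtensions` session extension are assumptions for illustration.

    import org.apache.spark.sql.SparkSession

    object RemainNumberExample {
      def main(args: Array[String]): Unit = {
        // Build a session with CarbonData's SQL extensions so the new
        // DELETE ... EXPECT SEGMENT.REMAIN_NUMBER syntax is parsed
        // (assumed setup; adjust to however your deployment enables CarbonData).
        val spark = SparkSession.builder()
          .appName("RemainNumberExample")
          .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
          .getOrCreate()

        // Keep only the 2 newest segments of the hypothetical table `sales`;
        // all older segments are marked for delete, mirroring the behaviour
        // exercised by DeleteSegmentByRemainNumberTestCase above.
        spark.sql("DELETE FROM TABLE sales EXPECT SEGMENT.REMAIN_NUMBER = 2")

        // Verify: kept segments show "Success", deleted ones "Marked for Delete".
        spark.sql("SHOW SEGMENTS ON sales").show(truncate = false)
      }
    }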