delete segment expect remaining number
W1thOut committed Aug 14, 2021
1 parent d8f7df9 commit 7fa6471
Showing 4 changed files with 193 additions and 2 deletions.
CarbonDDLSqlParser.scala
@@ -180,6 +180,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val REGISTER = carbonKeyWord("REGISTER")
   protected val PROPERTIES = carbonKeyWord("PROPERTIES")
   protected val REFRESH = carbonKeyWord("REFRESH")
+  protected val EXPECT = carbonKeyWord("EXPECT")
+  protected val REMAIN_NUMBER = carbonKeyWord("REMAIN_NUMBER")
 
   // For materialized view
   // Keywords used in this parser
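The two new keywords back the EXPECT ... SEGMENT.REMAIN_NUMBER clause that this commit adds to CarbonSpark2SqlParser below. Keyword matching is case-insensitive: the keywords are declared in upper case here, while the test suite later in this commit issues the statement in lower case, e.g.

  sql("delete from table deleteSegmentTable expect segment.remain_number = 1")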
CarbonDeleteLoadByRemainNumberCommand.scala (new file)
@@ -0,0 +1,79 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command.management

import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}

import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, withEvents}

/**
 * A command that deletes segments until only the expected number of the most
 * recent segments remains; in other words, the latest segments are kept.
 *
 * @param remaining expected number of segments remaining after deletion
 */
case class CarbonDeleteLoadByRemainNumberCommand(
    remaining: String,
    databaseNameOp: Option[String],
    tableName: String)
  extends DataCommand {

  override def processData(sparkSession: SparkSession): Seq[Row] = {
    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
    setAuditTable(carbonTable)
    setAuditInfo(Map("remaining number" -> remaining))
    if (!carbonTable.getTableInfo.isTransactionalTable) {
      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
    }

    // If an insert overwrite is in progress, deleting segments is not allowed.
    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
    }

    val segments = CarbonStore.readSegments(carbonTable.getTablePath, showHistory = false, None)
    if (segments.length == remaining.toInt) {
      return Seq.empty
    }

    // Sort the live segments by load start time, keep the newest `remaining`
    // of them, and collect the ids of the rest for deletion.
    val deleteSegmentIds = segments
      .filter(_.getSegmentStatus != SegmentStatus.MARKED_FOR_DELETE)
      .sortBy(_.getLoadStartTime)
      .map(_.getLoadName)
      .reverse
      .drop(remaining.toInt)

    withEvents(DeleteSegmentByIdPreEvent(carbonTable, deleteSegmentIds, sparkSession),
      DeleteSegmentByIdPostEvent(carbonTable, deleteSegmentIds, sparkSession)) {
      CarbonStore.deleteLoadById(
        deleteSegmentIds,
        CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
        tableName,
        carbonTable)
    }
    Seq.empty
  }

  override protected def opName: String = "DELETE SEGMENT BY REMAIN_NUMBER"
}
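A minimal, self-contained sketch (not part of the commit; load names and timestamps are invented) of the selection rule processData applies: sort by load start time, keep the newest "remaining" segments, and delete the rest.

  // (loadName, loadStartTime) pairs for three segments, with remaining = 1
  val segs = Seq(("0", 100L), ("1", 200L), ("2", 300L))
  val remaining = 1
  val toDelete = segs
    .sortBy(_._2)      // oldest first
    .map(_._1)
    .reverse           // newest first
    .drop(remaining)   // keep the newest `remaining` segments
  assert(toDelete == Seq("1", "0")) // segments 1 and 0 would be marked for delete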
CarbonSpark2SqlParser.scala
@@ -78,8 +78,8 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     cacheManagement | insertStageData | indexCommands | mvCommands | describeCommands
 
   protected lazy val segmentManagement: Parser[LogicalPlan] =
-    deleteSegmentByID | deleteSegmentByLoadDate | deleteStage | cleanFiles | addSegment |
-    showSegments
+    deleteSegmentByID | deleteSegmentByLoadDate | deleteSegmentByRemainNumber |
+    deleteStage | cleanFiles | addSegment | showSegments
 
   protected lazy val restructure: Parser[LogicalPlan] = alterTableDropColumn
 
@@ -508,6 +508,14 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
       CarbonDeleteLoadByIdCommand(loadIds, dbName, tableName.toLowerCase())
     }
 
+  protected lazy val deleteSegmentByRemainNumber: Parser[LogicalPlan] =
+    DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~
+    (EXPECT ~> (SEGMENT ~ "." ~ REMAIN_NUMBER) ~> "=" ~> segmentId) <~
+    opt(";") ^^ {
+      case dbName ~ tableName ~ remaining =>
+        CarbonDeleteLoadByRemainNumberCommand(remaining, dbName, tableName.toLowerCase())
+    }
+
   protected lazy val deleteSegmentByLoadDate: Parser[LogicalPlan] =
     DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
     (WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~
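As a usage sketch of the new deleteSegmentByRemainNumber rule (table names are illustrative), the database qualifier and the trailing semicolon are both optional:

  DELETE FROM TABLE my_table EXPECT SEGMENT.REMAIN_NUMBER = 2
  DELETE FROM TABLE my_db.my_table EXPECT SEGMENT.REMAIN_NUMBER = 2;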
DeleteSegmentByRemainNumberTestCase.scala (new file)
@@ -0,0 +1,102 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.spark.testsuite.deletesegment

import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

/**
 * Test cases for deleting segments while keeping an expected remaining number
 * of the most recent segments.
 */
class DeleteSegmentByRemainNumberTestCase extends QueryTest
  with BeforeAndAfterAll with BeforeAndAfterEach {

  val DELETED_STATUS = "Marked for Delete"

  val SUCCESSFUL_STATUS = "Success"

  override def beforeAll {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
  }

  override def beforeEach(): Unit = {
    initTestTable
  }

  test("delete segment, remain_number = 1") {
    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
    // "show segments" lists segments newest first, so only the newest
    // segment (row 0) should remain successful.
    val rows = sql("show segments on deleteSegmentTable").collect()
    assertResult(SUCCESSFUL_STATUS)(rows(0).get(1))
    assertResult(DELETED_STATUS)(rows(1).get(1))
    assertResult(DELETED_STATUS)(rows(2).get(1))
  }

  test("delete segment, remain nothing") {
    sql("delete from table deleteSegmentTable expect segment.remain_number = 0")
    val rows = sql("show segments on deleteSegmentTable").collect()
    rows.foreach(row => assertResult(DELETED_STATUS)(row.get(1)))
  }

  test("delete segment, remain all") {
    sql("delete from table deleteSegmentTable expect segment.remain_number = 3")
    val rows = sql("show segments on deleteSegmentTable").collect()
    rows.foreach(row => assertResult(SUCCESSFUL_STATUS)(row.get(1)))
  }

  test("delete segment, remain_number = -1") {
    val ex = intercept[Exception] {
      sql("delete from table deleteSegmentTable expect segment.remain_number = -1")
    }
    assert(ex.getMessage.contains("not found in database"))
  }

  test("delete segment after update") {
    sql("update deleteSegmentTable d set (d.country) = ('fr') where d.country = 'aus'")
    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
    val rows = sql("select * from deleteSegmentTable").collect()
    rows.foreach(row => assertResult("fr")(row(2)))
  }

  test("delete segment after delete newest segment by segmentId") {
    // Segment 2 is already marked for delete, so segment 1 becomes the
    // newest live segment and is the one that remains.
    sql("delete from table deleteSegmentTable where segment.id in (2)")
    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
    val rows = sql("show segments on deleteSegmentTable").collect()
    assertResult(DELETED_STATUS)(rows(0).get(1))
    assertResult(SUCCESSFUL_STATUS)(rows(1).get(1))
    assertResult(DELETED_STATUS)(rows(2).get(1))
  }

  private def initTestTable = {
    sql("drop table if exists deleteSegmentTable")
    sql(
      "CREATE table deleteSegmentTable (ID int, date String, country String, name " +
      "String, phonetype String, serialname String, salary String) STORED AS carbondata"
    )
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention1.csv'
         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention3.csv'
         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
  }
}
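A hedged sketch (not part of the commit) of how a test could assert the remaining live-segment count directly through the CarbonStore API that the command itself uses, rather than inspecting "show segments" output; tablePath would come from carbonTable.getTablePath as in the command:

  import org.apache.carbondata.api.CarbonStore
  import org.apache.carbondata.core.statusmanager.SegmentStatus

  def liveSegmentCount(tablePath: String): Int =
    CarbonStore.readSegments(tablePath, showHistory = false, None)
      .count(_.getSegmentStatus != SegmentStatus.MARKED_FOR_DELETE)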
