diff --git a/core/src/main/java/org/apache/carbondata/core/catalog/BaseCatalog.java b/core/src/main/java/org/apache/carbondata/core/catalog/BaseCatalog.java
new file mode 100644
index 00000000000..e1eb8027421
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/catalog/BaseCatalog.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.catalog;
+
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.format.TableInfo;
+
+/**
+ * Engine-agnostic catalog abstraction used by carbon core to read and persist
+ * table schemas. The concrete implementation is chosen through the
+ * carbon.catalog.impl property (see CarbonCommonConstants.CARBON_CATALOG_IMPL).
+ */
+public interface BaseCatalog {
+
+  /**
+   * Returns the last modified time of the given schema file, or -1 if it does not exist.
+   */
+  long getLastSchemaModificationTime(String schemaFilePath);
+
+  /**
+   * Persists the thrift table schema for the given table and returns the new
+   * modification time of the schema file.
+   */
+  long saveSchema(AbsoluteTableIdentifier identifier, TableInfo thriftTableInfo);
+
+  /**
+   * Reads the thrift TableInfo from the given schema file path.
+   */
+  TableInfo getSchema(String schemaFilePath);
+
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/catalog/CatalogFactory.java b/core/src/main/java/org/apache/carbondata/core/catalog/CatalogFactory.java
new file mode 100644
index 00000000000..c91b7b07b8b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/catalog/CatalogFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.catalog;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.log4j.Logger;
+
+public class CatalogFactory {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CatalogFactory.class.getName());
+
+  private static final CatalogFactory INSTANCE = new CatalogFactory();
+
+  private Object catalog;
+
+  private CatalogFactory() {
+    String catalogClass = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_CATALOG_IMPL,
+            CarbonCommonConstants.CARBON_CATALOG_IMPL_DEFAULT);
+    try {
+      catalog = Class.forName(catalogClass).getDeclaredConstructor().newInstance();
+    } catch (Exception e) {
+      // fail fast: leaving the catalog null would only surface later as an NPE
+      LOGGER.error("Error while loading class: " + catalogClass, e);
+      throw new RuntimeException("Unable to load catalog implementation: " + catalogClass, e);
+    }
+  }
+
+  public static CatalogFactory getInstance() {
+    return INSTANCE;
+  }
+
+  public BaseCatalog getCatalog() {
+    return ((BaseCatalog) catalog);
+  }
+
+  public <T> T getCatalog(Class<T> type) {
+    return type.cast(catalog);
+  }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index fc53960a2df..fee5d181833 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2876,4 +2876,11 @@ private CarbonCommonConstants() {
 
   public static final String CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT = "true";
 
+  @CarbonProperty
+  public static final String CARBON_CATALOG_IMPL = "carbon.catalog.impl";
+
+  @CarbonProperty
+  public static final String CARBON_CATALOG_IMPL_DEFAULT =
+      "org.apache.spark.sql.catalog.CarbonCatalogImpl";
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
index f8a1d0bcaf8..a7f1d88b029 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
@@ -60,9 +60,6 @@ org.apache.carbondata.format.TableSchema fromWrapperToExternalTableSchema(
-   * @param dbName
-   * @param tableName
    * @return
    */
-  org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(TableInfo wrapperTableInfo,
-      String dbName, String tableName);
+  org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(TableInfo wrapperTableInfo);
 
   /**
    * @param externalSchemaEvolutionEntry
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 255fa6fe2f2..7495f026a51 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -306,7 +306,7 @@ private org.apache.carbondata.format.BucketingInfo fromWrapperToExternalBucketin
    */
   @Override
   public org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(
-      TableInfo wrapperTableInfo, String dbName, String tableName) {
+      TableInfo wrapperTableInfo) {
     org.apache.carbondata.format.TableSchema thriftFactTable =
fromWrapperToExternalTableSchema(wrapperTableInfo.getFactTable()); return new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<>()); diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 0f71c360d39..5ecee6729cb 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -50,6 +50,7 @@ import java.util.UUID; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.catalog.CatalogFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.datastore.block.AbstractIndex; @@ -101,7 +102,6 @@ import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.reader.CarbonHeaderReader; import org.apache.carbondata.core.reader.CarbonIndexFileReader; -import org.apache.carbondata.core.reader.ThriftReader; import org.apache.carbondata.core.reader.ThriftReader.TBaseCreator; import org.apache.carbondata.core.scan.model.ProjectionDimension; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; @@ -2015,13 +2015,7 @@ public static org.apache.carbondata.format.TableInfo readSchemaFile(String schem public static org.apache.carbondata.format.TableInfo readSchemaFile(String schemaFilePath, Configuration conf) throws IOException { - TBaseCreator createTBase = org.apache.carbondata.format.TableInfo::new; - ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, conf); - thriftReader.open(); - org.apache.carbondata.format.TableInfo tableInfo = - (org.apache.carbondata.format.TableInfo) thriftReader.read(); - thriftReader.close(); - return tableInfo; + return CatalogFactory.getInstance().getCatalog().getSchema(schemaFilePath); } public static ColumnSchema thriftColumnSchemaToWrapperColumnSchema( diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java index 236c3fc9763..f24d7484ed5 100644 --- a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java @@ -1425,7 +1425,7 @@ public List getSchema_evoluti org.apache.carbondata.format.TableSchema thriftFactTable = new org.apache.carbondata.format.TableSchema("tableId", thriftColumnSchemas, schemaEvol); org.apache.carbondata.format.TableInfo actualResult = thriftWrapperSchemaConverter - .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName); + .fromWrapperToExternalTableInfo(wrapperTableInfo); org.apache.carbondata.format.TableInfo expectedResult = new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList()); diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java index 24105799f83..6d36dfae084 100644 --- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java @@ -93,8 +93,7 @@ private TableInfo getTableInfo(String databaseName, String tableName) 
{
     CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
     ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
     org.apache.carbondata.format.TableInfo thriftTable = schemaConverter
-        .fromWrapperToExternalTableInfo(carbonTable.getTableInfo(), carbonTable.getDatabaseName(),
-            carbonTable.getTableName());
+        .fromWrapperToExternalTableInfo(carbonTable.getTableInfo());
     assertTrue(null != thriftTable);
   }
 
@@ -135,8 +134,7 @@ private TableInfo getTableInfo(String databaseName, String tableName) {
     CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
     ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
     org.apache.carbondata.format.TableInfo thriftTable = schemaConverter
-        .fromWrapperToExternalTableInfo(carbonTable.getTableInfo(), carbonTable.getDatabaseName(),
-            carbonTable.getTableName());
+        .fromWrapperToExternalTableInfo(carbonTable.getTableInfo());
     assertTrue(null != thriftTable);
   }
diff --git a/examples/spark/pom.xml b/examples/spark/pom.xml
index 64f0d9f20c4..a8787273b3c 100644
--- a/examples/spark/pom.xml
+++ b/examples/spark/pom.xml
@@ -198,15 +198,6 @@
         <activeByDefault>true</activeByDefault>
       </activation>
     </profile>
-    <profile>
-      <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
-      <properties>
-        <spark.binary.version>2.3</spark.binary.version>
-      </properties>
-    </profile>
     <profile>
       <id>spark-2.4</id>
       <activation>
@@ -215,6 +206,9 @@
     <profile>
       <id>spark-3.1</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
         <spark.binary.version>3.1</spark.binary.version>
         <dep.jackson.version>2.10.0</dep.jackson.version>
       </properties>
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index db4f30c7128..ef4a05e9746 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -228,10 +228,7 @@ public CarbonTable createTable(
     SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
     org.apache.carbondata.format.TableInfo thriftTableInfo =
-        schemaConverter.fromWrapperToExternalTableInfo(
-            tableInfo,
-            tableInfo.getDatabaseName(),
-            tableInfo.getFactTable().getTableName());
+        schemaConverter.fromWrapperToExternalTableInfo(tableInfo);
     org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
         new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());
     thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()
diff --git a/index/examples/pom.xml b/index/examples/pom.xml
index bccbae1bc39..c7b2a17390c 100644
--- a/index/examples/pom.xml
+++ b/index/examples/pom.xml
@@ -79,12 +79,6 @@
 
       </properties>
     </profile>
-    <profile>
-      <id>spark-2.3</id>
-      <properties>
-        <spark.binary.version>2.3</spark.binary.version>
-      </properties>
-    </profile>
     <profile>
       <id>spark-2.4</id>
       <properties>
diff --git a/index/secondary-index/pom.xml b/index/secondary-index/pom.xml
index b1a2414ffc3..20820a37756 100644
--- a/index/secondary-index/pom.xml
+++ b/index/secondary-index/pom.xml
@@ -156,15 +156,6 @@
         <activeByDefault>true</activeByDefault>
       </activation>
     </profile>
-    <profile>
-      <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
-      <properties>
-        <spark.binary.version>2.3</spark.binary.version>
-      </properties>
-    </profile>
     <profile>
       <id>spark-2.4</id>
       <activation>
@@ -173,6 +164,9 @@
     <profile>
       <id>spark-3.1</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
         <spark.binary.version>3.1</spark.binary.version>
       </properties>
     </profile>
diff --git a/integration/flink/pom.xml b/integration/flink/pom.xml
index 5eca6fbbdc0..41f5bd07580 100644
--- a/integration/flink/pom.xml
+++ b/integration/flink/pom.xml
@@ -218,29 +218,6 @@
 
       </properties>
     </profile>
-    <profile>
-      <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
-      <properties>
-        <spark.binary.version>2.3</spark.binary.version>
-      </properties>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.carbondata</groupId>
-          <artifactId>carbondata-spark_${spark.binary.version}</artifactId>
-          <version>${project.version}</version>
-          <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>org.apache.hive</groupId>
-              <artifactId>hive-exec</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-      </dependencies>
-    </profile>
     <profile>
       <id>spark-2.4</id>
       <activation>
@@ -263,6 +240,9 @@
     <profile>
       <id>spark-3.1</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
         <spark.binary.version>3.1</spark.binary.version>
       </properties>
     </profile>
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
index 19720347d96..43cf1694158 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
+++
b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java @@ -255,8 +255,7 @@ private static void writeSchemaFile(TableInfo tableInfo) throws IOException { ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false); thriftWriter.open(FileWriteOperation.OVERWRITE); thriftWriter.write(schemaConverter - .fromWrapperToExternalTableInfo(tableInfo, tableInfo.getDatabaseName(), - tableInfo.getFactTable().getTableName())); + .fromWrapperToExternalTableInfo(tableInfo)); thriftWriter.close(); } diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java index 19464068a93..c5450f079d1 100755 --- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java +++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java @@ -21,16 +21,16 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.catalog.BaseCatalog; +import org.apache.carbondata.core.catalog.CatalogFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -40,7 +40,6 @@ import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.CarbonMetadata; -import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.converter.SchemaConverter; import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; import org.apache.carbondata.core.metadata.index.IndexType; @@ -48,30 +47,23 @@ import org.apache.carbondata.core.metadata.schema.indextable.IndexTableInfo; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.reader.ThriftReader; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.statusmanager.FileFormat; -import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.carbondata.hadoop.api.CarbonInputFormat; import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; -import org.apache.carbondata.presto.PrestoFilterUtil; import com.google.gson.Gson; import com.google.inject.Inject; -import io.prestosql.plugin.hive.HiveColumnHandle; import io.prestosql.spi.connector.SchemaTableName; -import io.prestosql.spi.predicate.TupleDomain; -import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.log4j.Logger; -import 
org.apache.thrift.TBase;
 
 import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
@@ -192,20 +184,7 @@ private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String
       org.apache.carbondata.format.TableInfo tableInfo;
       long modifiedTime = System.currentTimeMillis();
       if (isTransactionalTable) {
-        //Step 2: read the metadata (tableInfo) of the table.
-        ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
-          // TBase is used to read and write thrift objects.
-          // TableInfo is a kind of TBase used to read and write table information.
-          // TableInfo is generated by thrift,
-          // see schema.thrift under format/src/main/thrift for details.
-          public TBase create() {
-            return new org.apache.carbondata.format.TableInfo();
-          }
-        };
-        ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
-        thriftReader.open();
-        tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
-        thriftReader.close();
+        tableInfo = CatalogFactory.getInstance().getCatalog().getSchema(schemaFilePath);
         modifiedTime = schemaFile.getLastModifiedTime();
       } else {
         tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 75ba584ca0f..5cafb6fe47e 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -153,10 +153,7 @@ object CarbonDataStoreCreator {
     val schemaConverter: SchemaConverter =
       new ThriftWrapperSchemaConverterImpl()
     val thriftTableInfo: TableInfo =
-      schemaConverter.fromWrapperToExternalTableInfo(
-        tableInfo,
-        tableInfo.getDatabaseName,
-        tableInfo.getFactTable.getTableName)
+      schemaConverter.fromWrapperToExternalTableInfo(tableInfo)
     val schemaEvolutionEntry: SchemaEvolutionEntry =
       new org.apache.carbondata.format.SchemaEvolutionEntry(
         tableInfo.getLastUpdatedTime)
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index 6d278fcec12..6b7b4a98e91 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -606,50 +606,6 @@
         <activeByDefault>true</activeByDefault>
       </activation>
     </profile>
-    <profile>
-      <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
-      <properties>
-        <spark.binary.version>2.3</spark.binary.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-compiler-plugin</artifactId>
-            <configuration>
-              <excludes>
-                <exclude>src/main/spark3.1</exclude>
-                <exclude>src/main/spark2.4</exclude>
-                <exclude>src/main/common2.4and3.1</exclude>
-              </excludes>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <version>3.0.0</version>
-            <executions>
-              <execution>
-                <id>add-source</id>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>src/main/spark2.3</source>
-                    <source>src/main/common2.3and2.4</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
     <profile>
       <id>spark-2.4</id>
       <activation>
@@ -693,6 +649,9 @@
     <profile>
       <id>spark-3.1</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
         <spark.binary.version>3.1</spark.binary.version>
diff --git a/integration/spark/src/main/common2.4and3.1/org/apache/spark/sql/catalog/CarbonCatalog.scala b/integration/spark/src/main/common2.4and3.1/org/apache/spark/sql/catalog/CarbonCatalog.scala
new file mode 100644
index 00000000000..84f45333508
--- /dev/null
+++ b/integration/spark/src/main/common2.4and3.1/org/apache/spark/sql/catalog/CarbonCatalog.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
+
+import org.apache.carbondata.core.catalog.BaseCatalog
+
+/**
+ * Spark-side catalog API: extends the engine-agnostic BaseCatalog with
+ * operations that need a SparkSession.
+ */
+trait CarbonCatalog extends BaseCatalog {
+
+  def createTable(tableDefinition: CatalogTable,
+      ignoreIfExists: Boolean,
+      validateLocation: Boolean = true)(sparkSession: SparkSession): Unit
+
+  def tableExists(name: TableIdentifier)(sparkSession: SparkSession): Boolean
+
+  def getDatabaseMetadata(name: String)(sparkSession: SparkSession): CatalogDatabase
+
+  def getTableMetadata(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): CatalogTable
+
+  def listDatabases()(sparkSession: SparkSession): Seq[String]
+
+  def listTables(dbName: String)(sparkSession: SparkSession): Seq[TableIdentifier]
+
+  def dropTable(name: TableIdentifier,
+      ignoreIfNotExists: Boolean,
+      purge: Boolean)(sparkSession: SparkSession): Unit
+
+  // def alterTable(wrapperTableInfo: TableInfo)(sparkSession: SparkSession): Unit
+}
diff --git a/integration/spark/src/main/common2.4and3.1/org/apache/spark/sql/catalog/CarbonCatalogImpl.scala b/integration/spark/src/main/common2.4and3.1/org/apache/spark/sql/catalog/CarbonCatalogImpl.scala
new file mode 100644
index 00000000000..7c8b548cc9f
--- /dev/null
+++ b/integration/spark/src/main/common2.4and3.1/org/apache/spark/sql/catalog/CarbonCatalogImpl.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import java.io.{FileNotFoundException, IOException}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
+
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.fileoperations.FileWriteOperation
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.reader.ThriftReader
+import org.apache.carbondata.core.reader.ThriftReader.TBaseCreator
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.writer.ThriftWriter
+import org.apache.carbondata.format.{TableInfo => ExternalTableInfo}
+
+// Instantiated reflectively through CatalogFactory (carbon.catalog.impl), so
+// this must be a public class with a no-arg constructor.
+class CarbonCatalogImpl extends CarbonCatalog {
+
+  override def createTable(tableDefinition: CatalogTable,
+      ignoreIfExists: Boolean,
+      validateLocation: Boolean = true)(sparkSession: SparkSession): Unit = {
+    sparkSession.sessionState.catalog.createTable(tableDefinition, ignoreIfExists, validateLocation)
+  }
+
+  override def tableExists(name: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+    sparkSession.sessionState.catalog.tableExists(name)
+  }
+
+  override def getDatabaseMetadata(name: String)(sparkSession: SparkSession): CatalogDatabase = {
+    sparkSession.sessionState.catalog.getDatabaseMetadata(name)
+  }
+
+  override def listDatabases()(sparkSession: SparkSession): Seq[String] = {
+    sparkSession.sessionState.catalog.listDatabases()
+  }
+
+  override def listTables(dbName: String)(sparkSession: SparkSession): Seq[TableIdentifier] = {
+    sparkSession.sessionState.catalog.listTables(dbName)
+  }
+
+  override def getTableMetadata(tableIdentifier: TableIdentifier)
+      (sparkSession: SparkSession): CatalogTable = {
+    sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+  }
+
+  override def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean,
+      purge: Boolean)(sparkSession: SparkSession): Unit = {
+    // honour the caller's flags instead of hard-coding them
+    sparkSession.sessionState.catalog.dropTable(name, ignoreIfNotExists, purge)
+  }
+
+  override def getSchema(schemaFilePath: String): ExternalTableInfo = {
+    if (FileFactory.isFileExist(schemaFilePath)) {
+      val createTBase: ThriftReader.TBaseCreator = new TBaseCreator {
+        override def create() = new ExternalTableInfo()
+      }
+      val thriftReader = new ThriftReader(schemaFilePath,
+        createTBase,
+        FileFactory.getConfiguration)
+      thriftReader.open()
+      val tableInfo = thriftReader.read.asInstanceOf[ExternalTableInfo]
+      thriftReader.close()
+      tableInfo
+    } else {
+      throw new FileNotFoundException("Schema file does not exist: " + schemaFilePath)
+    }
+  }
+
+  override def getLastSchemaModificationTime(schemaFilePath: String): Long = {
+    val schemaFile = FileFactory.getCarbonFile(schemaFilePath)
+    if (schemaFile.exists()) {
+      schemaFile.getLastModifiedTime
+    } else {
+      -1L
+    }
+  }
+
+  /**
+   * This method will write the schema thrift file in carbon store and load table metadata.
+   */
+  def saveSchema(identifier: AbsoluteTableIdentifier, thriftTableInfo: ExternalTableInfo): Long = {
+    val schemaMetadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath)
+    if (!FileFactory.isFileExist(schemaMetadataPath)) {
+      val isDirCreated = FileFactory
+        .mkdirs(schemaMetadataPath, FileFactory.getConfiguration)
+      if (!isDirCreated) {
+        throw new IOException(s"Failed to create the metadata directory $schemaMetadataPath")
+      }
+    }
+    val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
+    val thriftWriter = new ThriftWriter(schemaFilePath, false)
+    thriftWriter.open(FileWriteOperation.OVERWRITE)
+    thriftWriter.write(thriftTableInfo)
+    thriftWriter.close()
+    val modifiedTime = System.currentTimeMillis()
+    FileFactory.getCarbonFile(schemaFilePath).setLastModifiedTime(modifiedTime)
+    modifiedTime
+  }
+
+  // def alterTable(wrapperTableInfo: TableInfo)(sparkSession: SparkSession): Unit = {
+  //   val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
+  //   val hiveClient = sparkSession
+  //     .sessionState
+  //     .catalog
+  //     .externalCatalog.asInstanceOf[HiveExternalCatalog]
+  //     .client
+  //   hiveClient.runSqlHive(s"ALTER TABLE " +
+  //     s"`${wrapperTableInfo.getDatabaseName}`.`${wrapperTableInfo.getFactTable.getTableName}` " +
+  //     s"SET SERDEPROPERTIES($schemaParts)")
+  // }
+}
diff --git a/integration/spark/src/main/common2.4and3.1/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala b/integration/spark/src/main/common2.4and3.1/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala
index c1a7d7f2e64..bea0cd908a2 100644
--- a/integration/spark/src/main/common2.4and3.1/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala
+++ b/integration/spark/src/main/common2.4and3.1/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.execution
 
 import org.apache.log4j.Logger
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalog.CarbonCatalog
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.util.CreateTableCommonUtil.getCatalogTable
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.catalog.CatalogFactory
 
 case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
   extends RunnableCommand {
@@ -34,7 +36,8 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     assert(table.tableType != CatalogTableType.VIEW)
     assert(table.provider.isDefined)
     val sessionState = sparkSession.sessionState
-    if (sessionState.catalog.tableExists(table.identifier)) {
+    if (CatalogFactory.getInstance()
+      .getCatalog(classOf[CarbonCatalog]).tableExists(table.identifier)(sparkSession)) {
       if (ignoreIfExists) {
         return Seq.empty[Row]
       } else {
@@ -42,10 +45,12 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     }
     val newTable: CatalogTable = getCatalogTable(sparkSession, sessionState, table, LOGGER)
-
     // We will return Nil or throw exception at the beginning if the table already exists, so when
     // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
- sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false) + CatalogFactory.getInstance() + .getCatalog(classOf[CarbonCatalog]) + .createTable(newTable, ignoreIfExists = false, + validateLocation = false)(sparkSession) Seq.empty[Row] } } diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala index 1850b9a3972..f14fa512828 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala @@ -22,7 +22,9 @@ import java.util import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, SparkSession} +import org.apache.spark.sql.catalog.CarbonCatalog +import org.apache.carbondata.core.catalog.CatalogFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.view.{MVCatalog, MVCatalogFactory, MVManager, MVSchema, MVStatus} @@ -31,7 +33,8 @@ class MVManagerInSpark(session: SparkSession) extends MVManager { override def getDatabases: util.List[String] = { CarbonThreadUtil.threadSet(CarbonCommonConstants.CARBON_ENABLE_MV, "true") try { - session.sessionState.catalog.listDatabases().asJava + CatalogFactory.getInstance() + .getCatalog(classOf[CarbonCatalog]).listDatabases()(session).asJava } finally { CarbonThreadUtil.threadUnset(CarbonCommonConstants.CARBON_ENABLE_MV) } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index b73772f0975..9cfbca340a1 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -20,10 +20,9 @@ package org.apache.spark.sql import java.util.concurrent.ConcurrentHashMap import org.apache.hadoop.fs.Path -import org.apache.spark.internal.config.ConfigEntry +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException} -import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener} import org.apache.spark.sql.execution.command.CreateFunctionCommand import org.apache.spark.sql.execution.command.mutation.merge.udf.BlockPathsUDF @@ -34,6 +33,7 @@ import org.apache.spark.sql.secondaryindex.events._ import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.catalog.CatalogFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} @@ -342,8 +342,8 @@ object CarbonEnv { */ def getDatabaseLocation(dbName: String, sparkSession: SparkSession): String = { var databaseLocation = - sparkSession.sessionState.catalog.asInstanceOf[SessionCatalog].getDatabaseMetadata(dbName) - .locationUri.toString + CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]) + .getDatabaseMetadata(dbName)(sparkSession).locationUri.toString // for default database and db ends with .db // check whether the 
carbon store and hive store is same or different. if ((!EnvHelper.isLegacy(sparkSession)) && diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala index 0fd07425486..b58b8f7975b 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.command.cache import scala.collection.JavaConverters._ import scala.collection.mutable -import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.expressions.AttributeReference @@ -30,6 +30,7 @@ import org.apache.spark.sql.types.StringType import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.CacheProvider +import org.apache.carbondata.core.catalog.CatalogFactory import org.apache.carbondata.core.index.IndexStoreManager import org.apache.carbondata.core.indexstore.BlockletIndexWrapper import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexFactory @@ -158,7 +159,8 @@ case class CarbonShowCacheCommand(showExecutorCache: Boolean, } } var carbonTables = mutable.ArrayBuffer[CarbonTable]() - sparkSession.sessionState.catalog.listTables(currentDatabase).foreach { + CatalogFactory.getInstance() + .getCatalog(classOf[CarbonCatalog]).listTables(currentDatabase)(sparkSession).foreach { tableIdent => try { val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala index 6bcea43b815..6e7b64414ef 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala @@ -20,12 +20,14 @@ package org.apache.spark.sql.execution.command.index import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.DataCommand import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.index.CarbonIndexUtil import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.catalog.CatalogFactory import org.apache.carbondata.core.metadata.index.IndexType import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} @@ -51,7 +53,8 @@ extends DataCommand { triggerRepair(tableIdentifier.table, databaseName, indexnameOp, segments, sparkSession) } else { // repairing si for all index tables in the mentioned database in the repair command - sparkSession.sessionState.catalog.listTables(dbName).foreach { + CatalogFactory.getInstance() + 
.getCatalog(classOf[CarbonCatalog]).listTables(dbName)(sparkSession).foreach { tableIdent => triggerRepair(tableIdent.table, dbName, indexnameOp, segments, sparkSession) } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala index 160908d6177..802b1c775be 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql._ +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.CarbonParserUtil.initializeSpatialIndexInstance import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException @@ -30,6 +31,7 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand import org.apache.spark.util.{AlterTableUtil, SparkUtil} import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.catalog.CatalogFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.indexstore.PartitionSpec @@ -96,7 +98,8 @@ case class RefreshCarbonTableCommand( initializeSpatialIndexInstance(tableProperties.get(SPATIAL_INDEX_CLASS), indexName, tableProperties.asScala) val tableIdentifier = new TableIdentifier(tableName, Some(tableInfo.getDatabaseName)) - if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) { + if (CatalogFactory.getInstance() + .getCatalog(classOf[CarbonCatalog]).tableExists(tableIdentifier)(sparkSession)) { // In direct upgrade scenario, if spatial table already exists then on refresh command, // update the property in metadata and fail table creation. 
LOGGER.info(s"Updating $SPATIAL_INDEX_INSTANCE table property on $tableName") diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala index 6aea625871f..cd561b02d96 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala @@ -89,7 +89,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand( newCols = newCols.filter(x => !x.isComplexColumn) schemaEvolutionEntry.setAdded(newCols.toList.asJava) val thriftTable = schemaConverter - .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) + .fromWrapperToExternalTableInfo(wrapperTableInfo) // carbon columns based on schema order val carbonColumns = carbonTable.getCreateOrderColumn().asScala .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index a8995725ae2..7bdf1750ac3 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.execution.command.schema import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand} import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalogUtil, MockClassForAlterRevertTests} import org.apache.spark.sql.index.CarbonIndexUtil @@ -27,6 +27,7 @@ import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.catalog.CatalogFactory import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.features.TableOperation import org.apache.carbondata.core.index.IndexStoreManager @@ -58,7 +59,8 @@ private[sql] case class CarbonAlterTableRenameCommand( if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) { throw new MalformedCarbonCommandException("Database name should be same for both tables") } - val tableExists = sparkSession.catalog.tableExists(oldDatabaseName, newTableIdentifier.table) + val tableExists = CatalogFactory.getInstance() + .getCatalog(classOf[CarbonCatalog]).tableExists(newTableIdentifier)(sparkSession) if (tableExists) { throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " + s"already exists") diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala index 934812b4435..ddc108dd80c 100644 --- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala @@ -19,12 +19,14 @@ package org.apache.spark.sql.execution.command.table import org.apache.hadoop.hive.metastore.api.AlreadyExistsException import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonSource, Row, SparkSession} +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} -import org.apache.spark.sql.execution.command.{CreateDataSourceTableCommand, DropTableCommand, MetadataCommand} +import org.apache.spark.sql.execution.command.{DropTableCommand, MetadataCommand} import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.catalog.CatalogFactory /** * this command wrap schema generation and CreateDataSourceTableCommand @@ -41,7 +43,8 @@ case class CarbonCreateDataSourceTableCommand( assert(table.tableType != CatalogTableType.VIEW) assert(table.provider.isDefined) val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - if (sparkSession.sessionState.catalog.tableExists(table.identifier)) { + if (CatalogFactory.getInstance() + .getCatalog(classOf[CarbonCatalog]).tableExists(table.identifier)(sparkSession)) { if (ignoreIfExists) { return Seq.empty[Row] } else { diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala index 505c4810dac..523f91b65fe 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import scala.util.control.Breaks.{break, breakable} import org.apache.spark.sql.{CarbonEnv, EnvHelper, Row, SparkSession} +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} @@ -35,6 +36,7 @@ import org.apache.spark.sql.types.{ArrayType, MapType, MetadataBuilder, StringTy import org.apache.carbondata.common.Strings import org.apache.carbondata.common.exceptions.DeprecatedFeatureException import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.catalog.CatalogFactory import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.PartitionInfo @@ -119,10 +121,8 @@ private[sql] case class CarbonDescribeFormattedCommand( } else { "false" } - - val catalog = sparkSession.sessionState.catalog - val catalogTable = catalog.getTableMetadata(tblIdentifier) - + val catalogTable = CatalogFactory.getInstance() + .getCatalog(classOf[CarbonCatalog]).getTableMetadata(tblIdentifier)(sparkSession) val pageSizeInMb: String = if 
(tblProps.get(CarbonCommonConstants.TABLE_PAGE_SIZE_INMB) .isDefined) { tblProps(CarbonCommonConstants.TABLE_PAGE_SIZE_INMB) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala index 8c7e8251fa6..70a30281c7c 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.execution.strategy import org.apache.commons.lang3.StringUtils import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, CustomDeterministicExpression, InsertIntoCarbonTable, SparkSession, SQLContext} +import org.apache.spark.sql._ import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression, Rand} @@ -35,6 +36,7 @@ import org.apache.spark.sql.util.SparkSQLUtil import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil} import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.catalog.CatalogFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil} @@ -145,7 +147,8 @@ object CarbonPlanHelper { def isTableExists(tableIdent: TableIdentifier, sparkSession: SparkSession): Boolean = { val dbOption = tableIdent.database.map(_.toLowerCase) val tableIdentifier = TableIdentifier(tableIdent.table.toLowerCase(), dbOption) - sparkSession.sessionState.catalog.tableExists(tableIdentifier) + CatalogFactory.getInstance() + .getCatalog(classOf[CarbonCatalog]).tableExists(tableIdentifier)(sparkSession) } /** diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index f4f8c793175..c8637d6c1cf 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -18,13 +18,14 @@ package org.apache.spark.sql.execution.strategy import org.apache.spark.sql._ +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.index.{DropIndexCommand, ShowIndexesCommand} -import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand} +import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand import org.apache.spark.sql.execution.command.mutation.CarbonTruncateCommand import org.apache.spark.sql.execution.command.schema._ import org.apache.spark.sql.execution.command.table.{CarbonCreateTableLikeCommand, CarbonDropTableCommand, 
CarbonShowCreateTableCommand, CarbonShowTablesCommand} @@ -36,6 +37,7 @@ import org.apache.spark.sql.secondaryindex.command.CarbonCreateSecondaryIndexCom import org.apache.spark.sql.util.SparkSQLUtil import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.catalog.CatalogFactory /** * Carbon strategies for ddl commands @@ -200,7 +202,7 @@ object DDLStrategy extends SparkStrategy { indexModel, tableProperties, ifNotExists, isDeferredRefresh, isCreateSIndex)) :: Nil } else { sys.error(s"Operation not allowed because either table " + - s"${indexModel.tableName} doesn't exist or not a carbon table.") + s"${ indexModel.tableName } doesn't exist or not a carbon table.") } case showIndex@ShowIndexesCommand(_, _) => try { @@ -211,7 +213,8 @@ object DDLStrategy extends SparkStrategy { } case dropIndex@DropIndexCommand(ifExistsSet, databaseNameOp, parentTableName, tableName, _) => val tableIdentifier = TableIdentifier(parentTableName, databaseNameOp) - val isParentTableExists = sparkSession.sessionState.catalog.tableExists(tableIdentifier) + val isParentTableExists = CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]) + .tableExists(tableIdentifier)(sparkSession) if (!isParentTableExists) { if (!ifExistsSet) { sys.error(s"Table $tableIdentifier does not exist") diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 418640512b8..8ecdbd20b4a 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.hive -import java.io.IOException import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -25,11 +24,13 @@ import scala.collection.JavaConverters._ import net.jodah.expiringmap.{ExpirationPolicy, ExpiringMap} import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSource, EnvHelper, SparkSession} import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias} +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.hive.CarbonFileMetastore.updateTableSchemaModifiedTime import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.StructType @@ -37,29 +38,28 @@ import org.apache.spark.sql.util.SparkSQLUtil import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.catalog.CatalogFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.fileoperations.FileWriteOperation import org.apache.carbondata.core.index.IndexStoreManager -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} +import org.apache.carbondata.core.metadata.{schema, 
AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl -import org.apache.carbondata.core.metadata.schema -import org.apache.carbondata.core.metadata.schema.SchemaReader -import org.apache.carbondata.core.metadata.schema.table +import org.apache.carbondata.core.metadata.schema.{table, SchemaReader} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.core.writer.ThriftWriter import org.apache.carbondata.events.{CreateCarbonRelationPostEvent, LookupRelationPostEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} object MatchLogicalRelation { def unapply( logicalPlan: LogicalPlan - ): Option[(BaseRelation, Any, Option[CatalogTable])] = logicalPlan match { - case l: LogicalRelation => Some(l.relation, l.output, l.catalogTable) - case _ => None + ): Option[(BaseRelation, Any, Option[CatalogTable])] = { + logicalPlan match { + case l: LogicalRelation => Some(l.relation, l.output, l.catalogTable) + case _ => None + } } } @@ -72,50 +72,49 @@ private object CarbonFileMetastore { def checkIfRefreshIsNeeded(absoluteTableIdentifier: AbsoluteTableIdentifier, localTimeStamp: Long): Boolean = { val schemaFilePath = CarbonTablePath.getSchemaFilePath(absoluteTableIdentifier.getTablePath) - val schemaCarbonFile = FileFactory.getCarbonFile(schemaFilePath) - if (schemaCarbonFile.exists()) { - val oldTime = Option(CarbonFileMetastore.getTableModifiedTime(absoluteTableIdentifier - .getCarbonTableIdentifier - .getTableId)) - val newTime = schemaCarbonFile.getLastModifiedTime - val isSchemaModified = oldTime match { - case Some(cacheTime) => - cacheTime != newTime - case None => true - } - if (isSchemaModified) { - if (CarbonMetadata.getInstance() - .getCarbonTable(absoluteTableIdentifier - .getCarbonTableIdentifier - .getTableUniqueName) != null) { - synchronized { - if (CarbonMetadata.getInstance() - .getCarbonTable(absoluteTableIdentifier - .getCarbonTableIdentifier - .getTableUniqueName) != null) { - CarbonMetadata.getInstance().removeTable(absoluteTableIdentifier - .getCarbonTableIdentifier.getTableUniqueName) - IndexStoreManager.getInstance().clearIndex(absoluteTableIdentifier) - } + val oldTime = Option(getTableModifiedTime(absoluteTableIdentifier + .getCarbonTableIdentifier + .getTableId)) + val newTime = CatalogFactory.getInstance() + .getCatalog(classOf[CarbonCatalog]).getLastSchemaModificationTime(schemaFilePath) + if (newTime == -1L) { + return true + } + val isSchemaModified = oldTime match { + case Some(cacheTime) => + cacheTime != newTime + case None => true + } + if (isSchemaModified) { + if (CarbonMetadata.getInstance() + .getCarbonTable(absoluteTableIdentifier + .getCarbonTableIdentifier + .getTableUniqueName) != null) { + synchronized { + if (CarbonMetadata.getInstance() + .getCarbonTable(absoluteTableIdentifier + .getCarbonTableIdentifier + .getTableUniqueName) != null) { + CarbonMetadata.getInstance().removeTable(absoluteTableIdentifier + .getCarbonTableIdentifier.getTableUniqueName) + IndexStoreManager.getInstance().clearIndex(absoluteTableIdentifier) } } - true - } else { - localTimeStamp != newTime } - } else { true + } else { + localTimeStamp != newTime } } - def updateTableSchemaModifiedTime(tableUniqueId: String, timeStamp: Long): Unit = { - 
tableModifiedTimeStore.put(tableUniqueId, timeStamp) - } - def getTableModifiedTime(tableUniqueId: String): java.lang.Long = { tableModifiedTimeStore.get(tableUniqueId) } + def updateTableSchemaModifiedTime(tableUniqueId: String, timeStamp: Long): Unit = { + tableModifiedTimeStore.put(tableUniqueId, timeStamp) + } + def removeStaleEntries(invalidTableUniqueIds: List[String]) { for (invalidKey <- invalidTableUniqueIds) { tableModifiedTimeStore.remove(invalidKey) @@ -175,15 +174,83 @@ class CarbonFileMetastore extends CarbonMetaStore { relation } + private def readCarbonSchema(identifier: AbsoluteTableIdentifier, + parameters: Map[String, String], inferSchema: Boolean): Option[CarbonTable] = { + val schemaConverter = new ThriftWrapperSchemaConverterImpl + val dbName = identifier.getCarbonTableIdentifier.getDatabaseName + val tableName = identifier.getCarbonTableIdentifier.getTableName + val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName) + val tablePath = identifier.getTablePath + var schemaRefreshTime = System.currentTimeMillis() + val wrapperTableInfo = + if (inferSchema) { + val carbonTbl = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName) + val tblInfoFromCache = if (carbonTbl != null) { + carbonTbl.getTableInfo + } else { + CarbonUtil.convertGsonToTableInfo(parameters.asJava) + } + + val thriftTableInfo: TableInfo = if (tblInfoFromCache != null) { + // In case the TableInfo is present in the Carbon Metadata Cache + // then get the table info from the cache rather than inferring from + // the CarbonData file. + schemaConverter + .fromWrapperToExternalTableInfo(tblInfoFromCache) + } else { + schemaConverter + .fromWrapperToExternalTableInfo(SchemaReader + .inferSchema(identifier, false)) + } + val wrapperTableInfo = + schemaConverter + .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath) + wrapperTableInfo.getFactTable.getTableProperties.put("_external", "true") + wrapperTableInfo.setTransactionalTable(false) + Some(wrapperTableInfo) + } else { + val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath) + schemaRefreshTime = CatalogFactory.getInstance() + .getCatalog(classOf[CarbonCatalog]). + getLastSchemaModificationTime(tableMetadataFile) + val tableInfo = CatalogFactory.getInstance() + .getCatalog(classOf[CarbonCatalog]).getSchema(tableMetadataFile) + val wrapperTableInfo = + schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath) + Some(wrapperTableInfo) + } + wrapperTableInfo.map { tableInfo => + updateSchemasUpdatedTime(tableInfo.getFactTable.getTableId, schemaRefreshTime) + CarbonMetadata.getInstance().removeTable(tableUniqueName) + CarbonMetadata.getInstance().loadTableMetadata(tableInfo) + CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) + } + } + /** - * This method will overwrite the existing schema and update it with the given details. 
+ * This method will put the updated timestamp of schema file in the table modified time store map */ - def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, - oldTableIdentifier: CarbonTableIdentifier, - thriftTableInfo: org.apache.carbondata.format.TableInfo, - carbonStorePath: String)(sparkSession: SparkSession): String = { - updateTableSchemaForAlter(newTableIdentifier, - oldTableIdentifier, thriftTableInfo, null, carbonStorePath) (sparkSession) + private def updateSchemasUpdatedTime(tableUniqueId: String, timeStamp: Long) { + CarbonFileMetastore.updateTableSchemaModifiedTime(tableUniqueId, timeStamp) + } + + override def isSchemaRefreshed(absoluteTableIdentifier: AbsoluteTableIdentifier, + sparkSession: SparkSession): Boolean = { + val localTimeStamp = Option(CarbonFileMetastore.getTableModifiedTime(absoluteTableIdentifier + .getCarbonTableIdentifier + .getTableId)) + if (localTimeStamp.isDefined) { + if (CarbonFileMetastore.checkIfRefreshIsNeeded(absoluteTableIdentifier, localTimeStamp.get)) { + sparkSession.sessionState + .catalog.refreshTable(TableIdentifier(absoluteTableIdentifier.getTableName, + Some(absoluteTableIdentifier.getDatabaseName))) + true + } else { + false + } + } else { + true + } } def lookupRelation(dbName: Option[String], tableName: String) @@ -209,7 +276,7 @@ class CarbonFileMetastore extends CarbonMetaStore { throw new NoSuchTableException(database, tableIdentifier.table) } val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from( - catalogTable.location.toString, database, tableIdentifier.table) + catalogTable.location.toString, database, tableIdentifier.table) CarbonEnv.getInstance(sparkSession).carbonMetaStore. createCarbonRelation(catalogTable.storage.properties, identifier, sparkSession) case _ => @@ -259,7 +326,7 @@ class CarbonFileMetastore extends CarbonMetaStore { def tableExists( table: String, databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean = { - tableExists(TableIdentifier(table, databaseOp))(sparkSession) + tableExists(TableIdentifier(table, databaseOp))(sparkSession) } override def tableExists(tableIdentifier: TableIdentifier) @@ -268,7 +335,7 @@ class CarbonFileMetastore extends CarbonMetaStore { lookupRelation(tableIdentifier)(sparkSession) } catch { case _: NoSuchTableException => - LOGGER.debug(s"Table ${tableIdentifier.table} does not exist.") + LOGGER.debug(s"Table ${ tableIdentifier.table } does not exist.") return false case ex: Exception => throw ex @@ -276,62 +343,6 @@ class CarbonFileMetastore extends CarbonMetaStore { true } - private def readCarbonSchema(identifier: AbsoluteTableIdentifier, - parameters: Map[String, String], inferSchema: Boolean): Option[CarbonTable] = { - val schemaConverter = new ThriftWrapperSchemaConverterImpl - val dbName = identifier.getCarbonTableIdentifier.getDatabaseName - val tableName = identifier.getCarbonTableIdentifier.getTableName - val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName) - val tablePath = identifier.getTablePath - var schemaRefreshTime = System.currentTimeMillis() - val wrapperTableInfo = - if (inferSchema) { - val carbonTbl = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName) - val tblInfoFromCache = if (carbonTbl != null) { - carbonTbl.getTableInfo - } else { - CarbonUtil.convertGsonToTableInfo(parameters.asJava) - } - - val thriftTableInfo: TableInfo = if (tblInfoFromCache != null) { - // In case the TableInfo is present in the Carbon Metadata Cache - // then get the table info from the cache rather than 
inferring from - // the CarbonData file. - schemaConverter - .fromWrapperToExternalTableInfo(tblInfoFromCache, dbName, tableName) - } else { - schemaConverter - .fromWrapperToExternalTableInfo(SchemaReader - .inferSchema(identifier, false), - dbName, tableName) - } - val wrapperTableInfo = - schemaConverter - .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath) - wrapperTableInfo.getFactTable.getTableProperties.put("_external", "true") - wrapperTableInfo.setTransactionalTable(false) - Some(wrapperTableInfo) - } else { - val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath) - if (FileFactory.isFileExist(tableMetadataFile)) { - val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile) - val wrapperTableInfo = - schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath) - schemaRefreshTime = FileFactory - .getCarbonFile(tableMetadataFile).getLastModifiedTime - Some(wrapperTableInfo) - } else { - None - } - } - wrapperTableInfo.map { tableInfo => - updateSchemasUpdatedTime(tableInfo.getFactTable.getTableId, schemaRefreshTime) - CarbonMetadata.getInstance().removeTable(tableUniqueName) - CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) - } - } - /** * This method will overwrite the existing schema and update it with the given details */ @@ -340,7 +351,7 @@ class CarbonFileMetastore extends CarbonMetaStore { oldTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, schemaEvolutionEntry: SchemaEvolutionEntry, - tablePath: String) (sparkSession: SparkSession): String = { + tablePath: String)(sparkSession: SparkSession): Unit = { val identifier = AbsoluteTableIdentifier.from(tablePath, oldTableIdentifier) val schemaConverter = new ThriftWrapperSchemaConverterImpl if (schemaEvolutionEntry != null) { @@ -356,9 +367,26 @@ class CarbonFileMetastore extends CarbonMetaStore { newTableIdentifier.getDatabaseName, newTableIdentifier.getTableName, oldTableIdentifier.getTableId) - val path = createSchemaThriftFile(newAbsoluteTableIdentifier, thriftTableInfo) + val modifiedTime = CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]) + .saveSchema(newAbsoluteTableIdentifier, thriftTableInfo) + updateTableSchemaModifiedTime(newAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId, + modifiedTime) addCarbonTableToCache(wrapperTableInfo, newAbsoluteTableIdentifier) - path + } + + protected def addCarbonTableToCache( + tableInfo: table.TableInfo, + absoluteTableIdentifier: AbsoluteTableIdentifier): Unit = { + val identifier = absoluteTableIdentifier.getCarbonTableIdentifier + removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName) + CarbonMetadata.getInstance().loadTableMetadata(tableInfo) + } + + /** + * This method will remove the table meta from CarbonMetadata cache. 
+ */ + def removeTableFromMetadata(dbName: String, tableName: String): Unit = { + CarbonMetadata.getInstance.removeTable(dbName, tableName) } /** @@ -366,7 +394,7 @@ class CarbonFileMetastore extends CarbonMetaStore { */ def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, - absoluteTableIdentifier: AbsoluteTableIdentifier)(sparkSession: SparkSession): String = { + absoluteTableIdentifier: AbsoluteTableIdentifier)(sparkSession: SparkSession): Unit = { val schemaConverter = new ThriftWrapperSchemaConverterImpl val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( thriftTableInfo, @@ -375,9 +403,9 @@ class CarbonFileMetastore extends CarbonMetaStore { absoluteTableIdentifier.getTablePath) val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history evolutionEntries.remove(evolutionEntries.size() - 1) - val path = createSchemaThriftFile(absoluteTableIdentifier, thriftTableInfo) + CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]) + .saveSchema(absoluteTableIdentifier, thriftTableInfo) addCarbonTableToCache(wrapperTableInfo, absoluteTableIdentifier) - path } /** @@ -386,15 +414,16 @@ class CarbonFileMetastore extends CarbonMetaStore { * Load CarbonTable from wrapper tableInfo * */ - def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) { + def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String): Unit = { val schemaConverter = new ThriftWrapperSchemaConverterImpl val dbName = tableInfo.getDatabaseName val tableName = tableInfo.getFactTable.getTableName - val thriftTableInfo = schemaConverter.fromWrapperToExternalTableInfo( - tableInfo, dbName, tableName) + val thriftTableInfo = schemaConverter.fromWrapperToExternalTableInfo(tableInfo) val identifier = AbsoluteTableIdentifier .from(tablePath, dbName, tableName, thriftTableInfo.getFact_table.getTable_id) - createSchemaThriftFile(identifier, thriftTableInfo) + updateSchemasUpdatedTime(identifier.getCarbonTableIdentifier.getTableId, + CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]) + .saveSchema(identifier, thriftTableInfo)) LOGGER.info(s"Table $tableName for Database $dbName created successfully.") } @@ -413,46 +442,6 @@ class CarbonFileMetastore extends CarbonMetaStore { CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",") } - /** - * This method will write the schema thrift file in carbon store and load table metadata - */ - private def createSchemaThriftFile( - identifier: AbsoluteTableIdentifier, - thriftTableInfo: TableInfo): String = { - val schemaMetadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath) - if (!FileFactory.isFileExist(schemaMetadataPath)) { - val isDirCreated = FileFactory - .mkdirs(schemaMetadataPath, SparkSession.getActiveSession.get.sessionState.newHadoopConf()) - if (!isDirCreated) { - throw new IOException(s"Failed to create the metadata directory $schemaMetadataPath") - } - } - val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath) - val thriftWriter = new ThriftWriter(schemaFilePath, false) - thriftWriter.open(FileWriteOperation.OVERWRITE) - thriftWriter.write(thriftTableInfo) - thriftWriter.close() - val modifiedTime = System.currentTimeMillis() - FileFactory.getCarbonFile(schemaFilePath).setLastModifiedTime(modifiedTime) - updateSchemasUpdatedTime(identifier.getCarbonTableIdentifier.getTableId, modifiedTime) - identifier.getTablePath - } - - protected def addCarbonTableToCache( - 
tableInfo: table.TableInfo, - absoluteTableIdentifier: AbsoluteTableIdentifier): Unit = { - val identifier = absoluteTableIdentifier.getCarbonTableIdentifier - removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName) - CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - } - - /** - * This method will remove the table meta from CarbonMetadata cache. - */ - def removeTableFromMetadata(dbName: String, tableName: String): Unit = { - CarbonMetadata.getInstance.removeTable(dbName, tableName) - } - def updateMetadataByThriftTable(schemaFilePath: String, tableInfo: TableInfo, dbName: String, tableName: String, tablePath: String): Unit = { tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) @@ -464,18 +453,6 @@ class CarbonFileMetastore extends CarbonMetaStore { wrapperTableInfo.getOrCreateAbsoluteTableIdentifier()) } - - def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = { - try { - val tablePath = lookupRelation(tableIdentifier)(sparkSession) - .asInstanceOf[CarbonRelation].carbonTable.getTablePath - FileFactory.isFileExist(tablePath) - } catch { - case _: Exception => - false - } - } - def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)(sparkSession: SparkSession) { val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName @@ -498,42 +475,17 @@ class CarbonFileMetastore extends CarbonMetaStore { } } - /** - * This method will put the updated timestamp of schema file in the table modified time store map - */ - private def updateSchemasUpdatedTime(tableUniqueId: String, timeStamp: Long) { - CarbonFileMetastore.updateTableSchemaModifiedTime(tableUniqueId, timeStamp) - } - - override def isSchemaRefreshed(absoluteTableIdentifier: AbsoluteTableIdentifier, - sparkSession: SparkSession): Boolean = { - val localTimeStamp = Option(CarbonFileMetastore.getTableModifiedTime(absoluteTableIdentifier - .getCarbonTableIdentifier - .getTableId)) - if (localTimeStamp.isDefined) { - if (CarbonFileMetastore.checkIfRefreshIsNeeded(absoluteTableIdentifier, localTimeStamp.get)) { - sparkSession.sessionState - .catalog.refreshTable(TableIdentifier(absoluteTableIdentifier.getTableName, - Some(absoluteTableIdentifier.getDatabaseName))) - true - } else { - false - } - } else { - true - } - } - override def isReadFromHiveMetaStore: Boolean = false override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = { CarbonMetadata.getInstance().getAllTables.asScala } - override def getThriftTableInfo(carbonTable: CarbonTable): TableInfo = { - val tableMetadataFile = CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath) - CarbonUtil.readSchemaFile(tableMetadataFile) + val tableInfo = carbonTable.getTableInfo + // remove TableInfo from props to avoid it being written to schema file. 
+ tableInfo.getFactTable.getTableProperties.remove(carbonTable.getTableId) + new ThriftWrapperSchemaConverterImpl().fromWrapperToExternalTableInfo(tableInfo) } override def createCarbonDataSourceHadoopRelation( @@ -568,9 +520,10 @@ class CarbonFileMetastore extends CarbonMetaStore { } def removeStaleTimeStampEntries(sparkSession: SparkSession): Unit = { - val tablesList = sparkSession.sessionState.catalog.listDatabases().flatMap { - database => - sparkSession.sessionState.catalog.listTables(database) + val tablesList = CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]) + .listDatabases()(sparkSession).flatMap { database => + CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]) + .listTables(database)(sparkSession) .map(table => CarbonTable.buildUniqueName(database, table.table)) } val cachedTableList = if (EnvHelper.isLegacy(sparkSession)) { @@ -590,5 +543,4 @@ class CarbonFileMetastore extends CarbonMetaStore { } CarbonFileMetastore.removeStaleEntries(invalidTableIds.toList) } - } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveIndexMetadataUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveIndexMetadataUtil.scala index b6fb6ee52ed..70ca750cc4c 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveIndexMetadataUtil.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveIndexMetadataUtil.scala @@ -18,11 +18,13 @@ package org.apache.spark.sql.hive import org.apache.hadoop.hive.ql.exec.UDF import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.index.CarbonIndexUtil import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.catalog.CatalogFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.indextable.IndexTableInfo import org.apache.carbondata.core.metadata.schema.table.CarbonTable @@ -49,7 +51,8 @@ object CarbonHiveIndexMetadataUtil { sparkSession: SparkSession): Unit = { try { val tableIdentifier = TableIdentifier(tableName, Some(databaseName)) - sparkSession.sessionState.catalog.dropTable(tableIdentifier, true, false) + CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]).dropTable(tableIdentifier, + ignoreIfNotExists = true, purge = false)(sparkSession) } catch { case e: Exception => LOGGER.error( diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index 1f15f9086fd..ca17ae521b7 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -63,11 +63,6 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { override def isSchemaRefreshed(absoluteTableIdentifier: AbsoluteTableIdentifier, sparkSession: SparkSession): Boolean = true - override def isTablePathExists(tableIdentifier: TableIdentifier) - (sparkSession: SparkSession): Boolean = { - tableExists(tableIdentifier)(sparkSession) - } - override def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier) (sparkSession: SparkSession): Unit = { val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName @@ 
-88,9 +83,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { override def getThriftTableInfo(carbonTable: CarbonTable): format.TableInfo = { val schemaConverter = new ThriftWrapperSchemaConverterImpl - schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo, - carbonTable.getDatabaseName, - carbonTable.getTableName) + schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo) } @@ -107,7 +100,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { thriftTableInfo: format.TableInfo, schemaEvolutionEntry: SchemaEvolutionEntry, tablePath: String) - (sparkSession: SparkSession): String = { + (sparkSession: SparkSession): Unit = { val schemaConverter = new ThriftWrapperSchemaConverterImpl if (schemaEvolutionEntry != null) { thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry) @@ -120,28 +113,6 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { schemaConverter) } - /** - * This method will overwrite the existing schema and update it with the given details - * - * @param newTableIdentifier - * @param thriftTableInfo - * @param carbonTablePath - * @param sparkSession - */ - override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, - oldTableIdentifier: CarbonTableIdentifier, - thriftTableInfo: org.apache.carbondata.format.TableInfo, - carbonTablePath: String)(sparkSession: SparkSession): String = { - val schemaConverter = new ThriftWrapperSchemaConverterImpl - updateHiveMetaStoreForAlter( - newTableIdentifier, - oldTableIdentifier, - thriftTableInfo, - carbonTablePath, - sparkSession, - schemaConverter) - } - private def updateHiveMetaStoreForAlter(newTableIdentifier: CarbonTableIdentifier, oldTableIdentifier: CarbonTableIdentifier, thriftTableInfo: format.TableInfo, @@ -191,7 +162,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { override def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: format.TableInfo, identifier: AbsoluteTableIdentifier) - (sparkSession: SparkSession): String = { + (sparkSession: SparkSession): Unit = { val schemaConverter = new ThriftWrapperSchemaConverterImpl val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history evolutionEntries.remove(evolutionEntries.size() - 1) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala index 75265c296d3..d72db4e1121 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala @@ -75,20 +75,7 @@ trait CarbonMetaStore { oldTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, schemaEvolutionEntry: SchemaEvolutionEntry, - carbonStorePath: String)(sparkSession: SparkSession): String - - /** - * This method will overwrite the existing schema and update it with the given details - * - * @param newTableIdentifier - * @param thriftTableInfo - * @param carbonStorePath - * @param sparkSession - */ - def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, - oldTableIdentifier: CarbonTableIdentifier, - thriftTableInfo: org.apache.carbondata.format.TableInfo, - carbonStorePath: String)(sparkSession: SparkSession): String + carbonStorePath: String)(sparkSession: SparkSession): Unit /** * This method is used to remove the evolution entry in case of failure.
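The Spark-side CarbonCatalog that every call site in this patch resolves through CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]) is not defined in any hunk shown here. Judging only from those call sites, it plausibly extends the new core BaseCatalog interface with session-scoped catalog operations; the trait below is a sketch inferred from the call shapes, not the change set's actual definition, and every member signature in it should be read as an assumption.

package org.apache.spark.sql.catalog

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}

import org.apache.carbondata.core.catalog.BaseCatalog

// Inferred sketch only: the real trait ships elsewhere in this change set.
trait CarbonCatalog extends BaseCatalog {

  // Session-scoped lookups; the curried second parameter list matches the
  // ...(sparkSession) call shape used at every site in this diff.
  def tableExists(tableIdentifier: TableIdentifier)
    (sparkSession: SparkSession): Boolean

  def dropTable(tableIdentifier: TableIdentifier,
      ignoreIfNotExists: Boolean,
      purge: Boolean)(sparkSession: SparkSession): Unit

  def listDatabases()(sparkSession: SparkSession): Seq[String]

  def listTables(dbName: String)(sparkSession: SparkSession): Seq[TableIdentifier]

  def getTableMetadata(tableIdentifier: TableIdentifier)
    (sparkSession: SparkSession): CatalogTable

  def getDatabaseMetadata(dbName: String)
    (sparkSession: SparkSession): CatalogDatabase
}

Keeping SparkSession in a trailing parameter list lets a file-based implementation ignore it while a Hive-backed one can delegate to sparkSession.sessionState.catalog, which mirrors the substitution the surrounding hunks perform.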
@@ -101,13 +88,13 @@ trait CarbonMetaStore { def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, absoluteTableIdentifier: AbsoluteTableIdentifier) - (sparkSession: SparkSession): String + (sparkSession: SparkSession): Unit /** * Prepare Thrift Schema from wrapper TableInfo and write to disk */ - def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) + def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String): Unit /** * Generates schema string to save it in hive metastore @@ -130,8 +117,6 @@ trait CarbonMetaStore { tableInfo: org.apache.carbondata.format.TableInfo, dbName: String, tableName: String, tablePath: String): Unit - def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean - def dropTable(tableIdentifier: AbsoluteTableIdentifier) (sparkSession: SparkSession) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala index c9f4c2a33e4..f27955317ca 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala @@ -21,8 +21,9 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSource, SparkSession} +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTablePartition, CatalogTableType, ExternalCatalogUtils} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -31,7 +32,7 @@ import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter} import org.apache.spark.util.{CarbonReflectionUtils, PartitionCacheKey, PartitionCacheManager} import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.converter.SparkDataTypeConverterImpl +import org.apache.carbondata.core.catalog.CatalogFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema @@ -117,8 +118,9 @@ object CarbonSessionUtil { val allPartitions = PartitionCacheManager.get(PartitionCacheKey(carbonTable.getTableId, carbonTable.getTablePath, CarbonUtil.getExpiration_time(carbonTable))).asScala ExternalCatalogUtils.prunePartitionsByFilter( - sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier(carbonTable.getTableName, - Some(carbonTable.getDatabaseName))), + CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]) + .getTableMetadata(TableIdentifier(carbonTable.getTableName, + Some(carbonTable.getDatabaseName)))(sparkSession), allPartitions, partitionFilters, sparkSession.sessionState.conf.sessionLocalTimeZone @@ -163,6 +165,9 @@ object CarbonSessionUtil { colArray += structFiled } ) + + // TODO: Fix for spark3.0 + // Alter the data schema of a table identified by the provided database and table name. 
// updated schema should contain all the existing data columns. This alterTableDataSchema API // should be called after any alter with existing schema which updates the catalog table with diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala index 675b1b84195..c3bb4fa8772 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import java.net.URI import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession} +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, AtomicRunnableCommand} @@ -27,6 +28,8 @@ import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.util.CarbonReflectionUtils +import org.apache.carbondata.core.catalog.CatalogFactory + /** * Create table 'using carbondata' and insert the query result into it. * @@ -56,7 +59,8 @@ case class CreateCarbonSourceTableAsSelectCommand( val tableName = tableIdentWithDB.unquotedString setAuditTable(db, table.identifier.table) - if (sessionState.catalog.tableExists(tableIdentWithDB)) { + if (CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]) + .tableExists(tableIdentWithDB)(sparkSession)) { assert(mode != SaveMode.Overwrite, s"Expect the table $tableName has been dropped when the save mode is Overwrite") diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala index a3e3b41c109..42b6b30f1b7 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution.command import org.apache.hadoop.hive.metastore.api.InvalidOperationException import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException} import org.apache.spark.sql.catalyst.expressions.Attribute @@ -27,6 +28,7 @@ import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.catalog.CatalogFactory import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams} @@ -41,9 +43,10 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand) var rows: Seq[Row] = Seq() val dbName = command.databaseName var tablesInDB: Seq[TableIdentifier] = null - if 
(sparkSession.sessionState.catalog.listDatabases().exists(_.equalsIgnoreCase(dbName))) { - tablesInDB = sparkSession.sessionState.catalog.listTables(dbName) - .filterNot(table => try { + if (CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]) + .listDatabases()(sparkSession).exists(_.equalsIgnoreCase(dbName))) { + tablesInDB = CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]) + .listTables(dbName)(sparkSession).filterNot(table => try { CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession).isIndexTable } catch { case ex: NoSuchTableException => @@ -66,7 +69,8 @@ } val hiveDatabaseLocation = - sparkSession.sessionState.catalog.getDatabaseMetadata(dbName).locationUri.toString + CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]) + .getDatabaseMetadata(dbName)(sparkSession).locationUri.toString if (!carbonDatabaseLocation.equals(hiveDatabaseLocation)) { throw new InvalidOperationException("Drop database is prohibited when" + diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/RegisterIndexTableCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/RegisterIndexTableCommand.scala index 903506ab2b8..4b812ea18c0 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/RegisterIndexTableCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/RegisterIndexTableCommand.scala @@ -23,11 +23,13 @@ import scala.collection.JavaConverters._ import org.apache.log4j.Logger import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.DataCommand import org.apache.spark.sql.util.CarbonException import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.catalog.CatalogFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.SchemaReader @@ -52,15 +54,15 @@ case class RegisterIndexTableCommand(dbName: Option[String], indexTableName: Str setAuditTable(databaseName, indexTableName) setAuditInfo(Map("Parent TableName" -> parentTable)) // 1. check if the main and index table exist - if (!sparkSession.sessionState.catalog.tableExists( - TableIdentifier(parentTable, Some(databaseName)))) { + if (!CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]).tableExists( + TableIdentifier(parentTable, Some(databaseName)))(sparkSession)) { val message: String = s"Secondary Index Table registration for table [$indexTableName] with" + s" table [$databaseName.$parentTable] failed." + s"Table [$parentTable] does not exist under database [$databaseName]" CarbonException.analysisException(message) } - if (!sparkSession.sessionState.catalog.tableExists( - TableIdentifier(indexTableName, Some(databaseName)))) { + if (!CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]).tableExists( + TableIdentifier(indexTableName, Some(databaseName)))(sparkSession)) { val message: String = s"Secondary Index Table registration for table [$indexTableName] with" + s" table [$databaseName.$parentTable] failed."
+ s"Secondary Index Table [$indexTableName] does not exists under" + diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableColumnRenameEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableColumnRenameEventListener.scala index d673b05d587..b3326084738 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableColumnRenameEventListener.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableColumnRenameEventListener.scala @@ -33,6 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.index.IndexType import org.apache.carbondata.core.metadata.schema.indextable.IndexTableInfo +import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.events._ import org.apache.carbondata.events.exception.PostEventException import org.apache.carbondata.format.TableInfo diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index cfa6e310a84..3de707db9cd 100644 --- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -385,8 +385,7 @@ object AlterTableUtil { // read the latest schema file val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(thriftTableInfo, carbonTable.getDatabaseName, carbonTable.getTableName, carbonTable.getTablePath) - schemaConverter.fromWrapperToExternalTableInfo( - wrapperTableInfo, carbonTable.getDatabaseName, carbonTable.getTableName) + schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo) } /** @@ -399,8 +398,8 @@ object AlterTableUtil { * @param sparkSession */ def modifyTableProperties(tableIdentifier: TableIdentifier, properties: Map[String, String], - propKeys: Seq[String], set: Boolean) - (sparkSession: SparkSession, catalog: SessionCatalog): Unit = { + propKeys: Seq[String], set: Boolean) + (sparkSession: SparkSession, catalog: SessionCatalog): Unit = { val tableName = tableIdentifier.table val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) @@ -486,8 +485,8 @@ object AlterTableUtil { // direct property. 
lowerCasePropertiesMap.foreach { property => if (validateTableProperties(property._1) || - (property._1.startsWith(CarbonCommonConstants.SPATIAL_INDEX) && - property._1.endsWith("instance"))) { + (property._1.startsWith(CarbonCommonConstants.SPATIAL_INDEX) && + property._1.endsWith("instance"))) { tblPropertiesMap.put(property._1, property._2) } else { val errorMessage = "Error: Invalid option(s): " + property._1.toString() @@ -518,7 +517,7 @@ object AlterTableUtil { .put(propKey.toLowerCase, CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT) } else if (propKey.equalsIgnoreCase(CarbonCommonConstants.SORT_COLUMNS)) { val errorMessage = "Error: Invalid option(s): " + propKey + - ", please set SORT_COLUMNS as empty instead of unset" + ", please set SORT_COLUMNS as empty instead of unset" throw new MalformedCarbonCommandException(errorMessage) } else { tblPropertiesMap.remove(propKey.toLowerCase) diff --git a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonDataSourceScanHelper.scala b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonDataSourceScanHelper.scala index 20a8f5bf30a..17f11603309 100644 --- a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonDataSourceScanHelper.scala +++ b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonDataSourceScanHelper.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import org.apache.spark.CarbonInputMetrics import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalog.CarbonCatalog import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, ExternalCatalogUtils} import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression => SparkExpression, PlanExpression} @@ -30,6 +31,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.util.SparkSQLUtil import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.carbondata.core.catalog.CatalogFactory import org.apache.carbondata.core.index.IndexFilter import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.scan.expression.Expression @@ -116,7 +118,8 @@ abstract class CarbonDataSourceScanHelper(relation: CarbonDatasourceHadoopRelati // prune dynamic partitions based on filter val sparkExpression = SparkSQLUtil.getSparkSession val runtimePartitions = ExternalCatalogUtils.prunePartitionsByFilter( - sparkExpression.sessionState.catalog.getTableMetadata(tableIdentifier.get), + CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog]) + .getTableMetadata(tableIdentifier.get)(sparkExpression), selectedCatalogPartitions, dynamicFilter, sparkExpression.sessionState.conf.sessionLocalTimeZone diff --git a/mv/plan/pom.xml b/mv/plan/pom.xml index 9865ff781cd..d1bd06789b4 100644 --- a/mv/plan/pom.xml +++ b/mv/plan/pom.xml @@ -143,49 +143,6 @@ - - spark-2.3 - - true - - - 2.3 - - - - - org.apache.maven.plugins - maven-compiler-plugin - - - src/main/spark3.1 - src/main/spark2.4 - - - - - org.codehaus.mojo - build-helper-maven-plugin - 3.0.0 - - - add-source - generate-sources - - add-source - - - - src/main/spark2.3 - src/main/common2.3and2.4 - - - - - - - - spark-2.4 @@ -228,6 +185,9 @@ spark-3.1 + + true + 3.1 diff --git a/pom.xml b/pom.xml index 69dca97237d..2f53662887d 100644 --- a/pom.xml +++ b/pom.xml @@ -126,11 +126,11 @@ 2.7.2 4.3.4 4.3-alpha1 - 2.11 - 2.11.8 + 2.12 + 2.12.8 compile - 2.3.4 - 2.3 + 3.1.1 + 3.1 4.8 compile compile @@ -572,82 +572,6 @@ 4.5.2 - - spark-2.3 - - true - - - 2.3 - 2.3.4 - 2.11 - 
2.11.8 - - - - - org.codehaus.mojo - flatten-maven-plugin - 1.2.2 - - ${project.build.directory} - - - - - flatten - process-resources - - flatten - - - - - flatten.clean - clean - - clean - - - - - - org.eluder.coveralls - coveralls-maven-plugin - 4.3.0 - - opPwqWW41vYppv6KISea3u1TJvE1ugJ5Y - UTF-8 - - ${basedir}/target/carbondata-coverage-report/carbondata-coverage-report.xml - - - - ${basedir}/common/src/main/java - ${basedir}/core/src/main/java - ${basedir}/geo/src/main/java - ${basedir}/geo/src/main/scala - ${basedir}/processing/src/main/java - ${basedir}/hadoop/src/main/java - ${basedir}/integration/spark/src/main/scala - ${basedir}/integration/spark/src/main/spark2.3 - ${basedir}/integration/spark/src/main/common2.3and2.4 - ${basedir}/integration/spark/src/main/java - ${basedir}/integration/hive/src/main/scala - ${basedir}/integration/hive/src/main/java - ${basedir}/streaming/src/main/java - ${basedir}/streaming/src/main/scala - ${basedir}/sdk/sdk/src/main/java - ${basedir}/index/bloom/src/main/java - ${basedir}/index/lucene/src/main/java - ${basedir}/index/secondary-index/src/main/scala - ${basedir}/index/secondary-index/src/main/java - - - - - - spark-2.4 @@ -724,6 +648,9 @@ spark-3.1 + + true + 3.1 3.1.1 diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java index 5503924a062..4cc6bca1d05 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java @@ -249,11 +249,8 @@ public static boolean deleteCompactionRequiredFile(String metaFolderPath, + CarbonCommonConstants.majorCompactionRequiredFile; } try { - if (FileFactory - .isFileExist(compactionRequiredFile)) { - if (FileFactory - .getCarbonFile(compactionRequiredFile) - .delete()) { + if (FileFactory.isFileExist(compactionRequiredFile)) { + if (FileFactory.getCarbonFile(compactionRequiredFile).delete()) { LOGGER.info("Deleted the compaction request file " + compactionRequiredFile); return true; } else { diff --git a/streaming/pom.xml b/streaming/pom.xml index 3889bf1b80e..c727c881acd 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -132,47 +132,6 @@ true - - spark-2.3 - - true - - - 2.3 - - - - - org.apache.maven.plugins - maven-compiler-plugin - - - src/main/spark3.1 - - - - - org.codehaus.mojo - build-helper-maven-plugin - 3.0.0 - - - add-source - generate-sources - - add-source - - - - src/main/spark2.x - - - - - - - - spark-2.4 @@ -213,6 +172,9 @@ spark-3.1 + + true + 3.1.1 3.1
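With the spark-2.3 profiles removed and Spark 3.1.1 / Scala 2.12 promoted to the defaults, the whole patch hangs off one seam: the catalog implementation named by the carbon.catalog.impl property and resolved once through the CatalogFactory singleton. A minimal wiring sketch, assuming a hypothetical implementation class com.example.MyCarbonCatalog (the property key and factory calls come from this change set; the class name and schema path are illustrative only):

import org.apache.spark.sql.catalog.CarbonCatalog

import org.apache.carbondata.core.catalog.CatalogFactory
import org.apache.carbondata.core.util.CarbonProperties

object CatalogWiring {
  def main(args: Array[String]): Unit = {
    // Set the property before the factory singleton is first touched: the
    // implementation class is instantiated reflectively once, so changing
    // the property later has no effect on an already-built factory.
    CarbonProperties.getInstance()
      .addProperty("carbon.catalog.impl", "com.example.MyCarbonCatalog")

    // Typed lookup, as done at every call site in this patch.
    val catalog = CatalogFactory.getInstance().getCatalog(classOf[CarbonCatalog])

    // CarbonFileMetastore compares this value against its cached timestamp;
    // -1 signals a missing schema file and forces a refresh.
    val modifiedTime = catalog.getLastSchemaModificationTime(
      "/store/db/tbl/Metadata/schema")
    println(s"schema last modified at $modifiedTime")
  }
}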