From ce860d0431f41e0594cee848149cdc344f728dd3 Mon Sep 17 00:00:00 2001 From: Vikram Ahuja Date: Fri, 17 Sep 2021 12:54:01 +0530 Subject: [PATCH] [CARBONDATA-4288][CARBONDATA-4289] Fix various issues with Index Server caching mechanism. Why is this PR needed? There are 2 issues in the Index Server flow: In case when there is a main table with a SI table with prepriming disabled and index serve enabled, new load to main table and SI table put the cache for the main table in the index server. Cache is also getting again when a select query is fired. This issue happens because during load to SI table, getSplits is called on the main table segment which is in Insert In Progress state. Index server considers this segment as a legacy segment because it's index size = 0 and does not put it's entry in the tableToExecutor mapping. In the getsplits method isRefreshneeded is false the first time getSplits is called. During the select query, in getSplits method isRefreshNeeded is true and the previous loaded entry is removed from the driver but since there is no entry for that table in tableToExecutor mapping, the previous cache value becomes dead cache and always stays in the index server. The newly loaded cache is loaded to a new executor and 2 copies of cache for the same segment is being mantained. Concurrent select queries to the index server shows wrong cache values in the Index server. What changes were proposed in this PR? The following changes are proposed to the index server code: Removing cache object from the index server in case the segment is INSERT IN PROGRESS and in the case of legacy segment adding the value in tabeToExecutor mappping so that the cache is also removed from the executor side. Concurrent queries were able adding duplicate cache values to other executors. Changed logic of assign executors method so that concurrent queries are not able to add cache for same segment in other executors This closes #4219 --- .../indexserver/DistributedPruneRDD.scala | 11 +++ .../indexserver/DistributedRDDUtils.scala | 76 +++++++++++++------ .../indexserver/DistributedRDDUtilsTest.scala | 26 ++++++- 3 files changed, 88 insertions(+), 25 deletions(-) diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala index dac47ffad01..bd0b5c9b798 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala @@ -115,6 +115,17 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS // scalastyle:on service.shutdownNow() val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD].getName) + // remove the cache of Inprogress segments, this case is required during the case of + // loading to SI. We do get splits of main table in case of Inprogress segment. No + // need to load it to the cache. + val inProgressSegments = indexInputFormat.getValidSegments.asScala.collect { + case seg if DistributedRDDUtils + .isSegmentInProgress(indexInputFormat, seg.getSegmentNo) => seg.getSegmentNo + } + if (inProgressSegments.nonEmpty) { + IndexStoreManager.getInstance().clearInvalidSegments(indexInputFormat.getCarbonTable, + inProgressSegments.asJava) + } LOGGER.info(s"Time taken to collect ${ inputSplits.size } blocklets : " + (System.currentTimeMillis() - startTime)) val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) { diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala index 1994e15b96c..41db8f19359 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala @@ -32,7 +32,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlockletWrapper import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.readcommitter.{LatestFilesReadCommittedScope, TableStatusReadCommittedScope} -import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager +import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager} import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} @@ -108,7 +108,25 @@ object DistributedRDDUtils { val wrapper: IndexInputSplit = legacySegment .asInstanceOf[IndexInputSplitWrapper].getDistributable val executor = validExecutorIds(index % validExecutorIds.length) - wrapper.setLocations(Array("executor_" + executor)) + // Below code is used to support concurrent queries on same segment going to + // the same executor for caching. Putting a new HashMap in tableToExecutorMapping + // and whichever query is able to put entry in existingSegmentMapping decides the + // executor which will be used later on. For one thread oldMapping will be null and + // for the second it will be the executor returned which will be used. + tableToExecutorMapping.putIfAbsent(tableUniqueName, + new ConcurrentHashMap[String, String]()) + val existingSegmentMapping = tableToExecutorMapping.get(tableUniqueName) + val oldMapping = existingSegmentMapping.putIfAbsent(wrapper.getSegment.getSegmentNo, + s"${executor}") + if (oldMapping == null) { + val newSegmentMapping = new ConcurrentHashMap[String, String]() + newSegmentMapping.put(wrapper.getSegment.getSegmentNo, s"${executor}") + tableToExecutorMapping.putIfAbsent(tableUniqueName, newSegmentMapping) + wrapper.setLocations(Array("executor_" + executor)) + } else { + wrapper.setLocations(Array("executor_" + existingSegmentMapping + .get(wrapper.getSegment.getSegmentNo))) + } legacySegment } } else { Seq() } @@ -164,6 +182,15 @@ object DistributedRDDUtils { } } + def isSegmentInProgress(request: IndexInputFormat, segment: String): Boolean = { + request.getReadCommittedScope.getSegmentList.find(_.getLoadName + .equalsIgnoreCase(segment)) match { + case Some(value) => value.getSegmentStatus.equals(SegmentStatus.INSERT_IN_PROGRESS) || value + .getSegmentStatus.equals(SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) + case None => false + } + } + /** * Remove the table mapping from index server when the table is dropped. */ @@ -313,31 +340,32 @@ object DistributedRDDUtils { case None => throw new RuntimeException("Could not find any alive executors.") } } - val existingExecutorMapping = executorToCacheSizeMapping.get(newHost) - if (existingExecutorMapping != null) { - val existingSize = existingExecutorMapping.get(newExecutor) - if (existingSize != null) { - existingExecutorMapping.put(newExecutor, existingSize + segment.getIndexSize - .toInt) - } else { - existingExecutorMapping.put(newExecutor, segment.getIndexSize - .toInt) - } - } else { - val newExecutorMapping = new ConcurrentHashMap[String, Long]() - newExecutorMapping.put(newExecutor, segment.getIndexSize) - executorToCacheSizeMapping.put(newHost, newExecutorMapping) - } + tableToExecutorMapping.putIfAbsent(tableUniqueName, new ConcurrentHashMap[String, String]()) val existingSegmentMapping = tableToExecutorMapping.get(tableUniqueName) - if (existingSegmentMapping == null) { - val newSegmentMapping = new ConcurrentHashMap[String, String]() - newSegmentMapping.put(segment.getSegmentNo, s"${newHost}_$newExecutor") - tableToExecutorMapping.putIfAbsent(tableUniqueName, newSegmentMapping) + val oldMapping = existingSegmentMapping.putIfAbsent(segment.getSegmentNo, + s"${ newHost }_$newExecutor") + if (oldMapping == null) { + updateCacheSize(newHost, newExecutor, segment) + s"executor_${newHost}_$newExecutor" } else { - existingSegmentMapping.putIfAbsent(segment.getSegmentNo, s"${newHost}_$newExecutor") - tableToExecutorMapping.putIfAbsent(tableUniqueName, existingSegmentMapping) + s"executor_$oldMapping" + } + } + } + + private def updateCacheSize(host: String, executor: String, segment: Segment) = { + val existingExecutorMapping = executorToCacheSizeMapping.get(host) + if (existingExecutorMapping != null) { + val existingSize = existingExecutorMapping.get(executor) + var totalSize = segment.getIndexSize + if (existingSize != null) { + totalSize += existingSize } - s"executor_${newHost}_$newExecutor" + existingExecutorMapping.put(executor, totalSize.toInt) + } else { + val newExecutorMapping = new ConcurrentHashMap[String, Long]() + newExecutorMapping.put(executor, segment.getIndexSize) + executorToCacheSizeMapping.put(host, newExecutorMapping) } } diff --git a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala index ec75441fc1c..bc7ba13a23e 100644 --- a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala +++ b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala @@ -17,7 +17,7 @@ package org.apache.indexserver -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{ConcurrentHashMap, Executors} import scala.collection.JavaConverters._ @@ -226,4 +226,28 @@ class DistributedRDDUtilsTest extends FunSuite with BeforeAndAfterEach { assert(FileFactory.isFileExist(indexServerTempFolder)) assert(FileFactory.isFileExist(tmpPathAnother)) } + + test("test concurrent assigning of executors") { + executorCache.clear() + tableCache.clear() + // val executorsList: Map[String, Seq[String]] + val executorList = Map("EX1" -> Seq("1"), "EX2" -> Seq("2")) + val seg = new Segment("5") + seg.setIndexSize(10) + val executorService = Executors.newFixedThreadPool(8) + for (num <- 1 to 8) { + executorService.submit( + new Runnable { + override def run(): Unit = { + DistributedRDDUtils.assignExecutor("tablename", seg, executorList) + } + }).get() + } + executorService.shutdownNow() + assert(executorCache.size() == 1) + assert(executorCache.entrySet().iterator().next().getValue + .entrySet().iterator().next().getValue == 10) + executorCache.clear() + tableCache.clear() + } }