Skip to content

Commit

Permalink
[CARBONDATA-4288][CARBONDATA-4289] Fix various issues with Index Serv…
Browse files Browse the repository at this point in the history
…er caching mechanism.

Why is this PR needed?
There are 2 issues in the Index Server flow:
In case when there is a main table with a SI table with prepriming disabled and index serve
enabled, new load to main table and SI table put the cache for the main table in the index
server. Cache is also getting again when a select query is fired. This issue happens because
during load to SI table, getSplits is called on the main table segment which is in Insert In
Progress state. Index server considers this segment as a legacy segment because it's index
size = 0 and does not put it's entry in the tableToExecutor mapping. In the getsplits method
isRefreshneeded is false the first time getSplits is called. During the select query, in
getSplits method isRefreshNeeded is true and the previous loaded entry is removed from the
driver but since there is no entry for that table in tableToExecutor mapping, the previous
cache value becomes dead cache and always stays in the index server. The newly loaded cache
is loaded to a new executor and 2 copies of cache for the same segment is being mantained.
Concurrent select queries to the index server shows wrong cache values in the Index server.

What changes were proposed in this PR?
The following changes are proposed to the index server code:
Removing cache object from the index server in case the segment is INSERT IN PROGRESS and
in the case of legacy segment adding the value in tabeToExecutor mappping so that the cache
is also removed from the executor side.
Concurrent queries were able adding duplicate cache values to other executors. Changed logic
of assign executors method so that concurrent queries are not able to add cache for same segment
in other executors

This closes #4219
  • Loading branch information
vikramahuja1001 authored and kunal642 committed Sep 20, 2021
1 parent 22342f8 commit ce860d0
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
// scalastyle:on
service.shutdownNow()
val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD].getName)
// remove the cache of Inprogress segments, this case is required during the case of
// loading to SI. We do get splits of main table in case of Inprogress segment. No
// need to load it to the cache.
val inProgressSegments = indexInputFormat.getValidSegments.asScala.collect {
case seg if DistributedRDDUtils
.isSegmentInProgress(indexInputFormat, seg.getSegmentNo) => seg.getSegmentNo
}
if (inProgressSegments.nonEmpty) {
IndexStoreManager.getInstance().clearInvalidSegments(indexInputFormat.getCarbonTable,
inProgressSegments.asJava)
}
LOGGER.info(s"Time taken to collect ${ inputSplits.size } blocklets : " +
(System.currentTimeMillis() - startTime))
val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlockletWrapper
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.readcommitter.{LatestFilesReadCommittedScope, TableStatusReadCommittedScope}
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
Expand Down Expand Up @@ -108,7 +108,25 @@ object DistributedRDDUtils {
val wrapper: IndexInputSplit = legacySegment
.asInstanceOf[IndexInputSplitWrapper].getDistributable
val executor = validExecutorIds(index % validExecutorIds.length)
wrapper.setLocations(Array("executor_" + executor))
// Below code is used to support concurrent queries on same segment going to
// the same executor for caching. Putting a new HashMap in tableToExecutorMapping
// and whichever query is able to put entry in existingSegmentMapping decides the
// executor which will be used later on. For one thread oldMapping will be null and
// for the second it will be the executor returned which will be used.
tableToExecutorMapping.putIfAbsent(tableUniqueName,
new ConcurrentHashMap[String, String]())
val existingSegmentMapping = tableToExecutorMapping.get(tableUniqueName)
val oldMapping = existingSegmentMapping.putIfAbsent(wrapper.getSegment.getSegmentNo,
s"${executor}")
if (oldMapping == null) {
val newSegmentMapping = new ConcurrentHashMap[String, String]()
newSegmentMapping.put(wrapper.getSegment.getSegmentNo, s"${executor}")
tableToExecutorMapping.putIfAbsent(tableUniqueName, newSegmentMapping)
wrapper.setLocations(Array("executor_" + executor))
} else {
wrapper.setLocations(Array("executor_" + existingSegmentMapping
.get(wrapper.getSegment.getSegmentNo)))
}
legacySegment
}
} else { Seq() }
Expand Down Expand Up @@ -164,6 +182,15 @@ object DistributedRDDUtils {
}
}

def isSegmentInProgress(request: IndexInputFormat, segment: String): Boolean = {
request.getReadCommittedScope.getSegmentList.find(_.getLoadName
.equalsIgnoreCase(segment)) match {
case Some(value) => value.getSegmentStatus.equals(SegmentStatus.INSERT_IN_PROGRESS) || value
.getSegmentStatus.equals(SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)
case None => false
}
}

/**
* Remove the table mapping from index server when the table is dropped.
*/
Expand Down Expand Up @@ -313,31 +340,32 @@ object DistributedRDDUtils {
case None => throw new RuntimeException("Could not find any alive executors.")
}
}
val existingExecutorMapping = executorToCacheSizeMapping.get(newHost)
if (existingExecutorMapping != null) {
val existingSize = existingExecutorMapping.get(newExecutor)
if (existingSize != null) {
existingExecutorMapping.put(newExecutor, existingSize + segment.getIndexSize
.toInt)
} else {
existingExecutorMapping.put(newExecutor, segment.getIndexSize
.toInt)
}
} else {
val newExecutorMapping = new ConcurrentHashMap[String, Long]()
newExecutorMapping.put(newExecutor, segment.getIndexSize)
executorToCacheSizeMapping.put(newHost, newExecutorMapping)
}
tableToExecutorMapping.putIfAbsent(tableUniqueName, new ConcurrentHashMap[String, String]())
val existingSegmentMapping = tableToExecutorMapping.get(tableUniqueName)
if (existingSegmentMapping == null) {
val newSegmentMapping = new ConcurrentHashMap[String, String]()
newSegmentMapping.put(segment.getSegmentNo, s"${newHost}_$newExecutor")
tableToExecutorMapping.putIfAbsent(tableUniqueName, newSegmentMapping)
val oldMapping = existingSegmentMapping.putIfAbsent(segment.getSegmentNo,
s"${ newHost }_$newExecutor")
if (oldMapping == null) {
updateCacheSize(newHost, newExecutor, segment)
s"executor_${newHost}_$newExecutor"
} else {
existingSegmentMapping.putIfAbsent(segment.getSegmentNo, s"${newHost}_$newExecutor")
tableToExecutorMapping.putIfAbsent(tableUniqueName, existingSegmentMapping)
s"executor_$oldMapping"
}
}
}

private def updateCacheSize(host: String, executor: String, segment: Segment) = {
val existingExecutorMapping = executorToCacheSizeMapping.get(host)
if (existingExecutorMapping != null) {
val existingSize = existingExecutorMapping.get(executor)
var totalSize = segment.getIndexSize
if (existingSize != null) {
totalSize += existingSize
}
s"executor_${newHost}_$newExecutor"
existingExecutorMapping.put(executor, totalSize.toInt)
} else {
val newExecutorMapping = new ConcurrentHashMap[String, Long]()
newExecutorMapping.put(executor, segment.getIndexSize)
executorToCacheSizeMapping.put(host, newExecutorMapping)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.indexserver

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{ConcurrentHashMap, Executors}

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -226,4 +226,28 @@ class DistributedRDDUtilsTest extends FunSuite with BeforeAndAfterEach {
assert(FileFactory.isFileExist(indexServerTempFolder))
assert(FileFactory.isFileExist(tmpPathAnother))
}

test("test concurrent assigning of executors") {
executorCache.clear()
tableCache.clear()
// val executorsList: Map[String, Seq[String]]
val executorList = Map("EX1" -> Seq("1"), "EX2" -> Seq("2"))
val seg = new Segment("5")
seg.setIndexSize(10)
val executorService = Executors.newFixedThreadPool(8)
for (num <- 1 to 8) {
executorService.submit(
new Runnable {
override def run(): Unit = {
DistributedRDDUtils.assignExecutor("tablename", seg, executorList)
}
}).get()
}
executorService.shutdownNow()
assert(executorCache.size() == 1)
assert(executorCache.entrySet().iterator().next().getValue
.entrySet().iterator().next().getValue == 10)
executorCache.clear()
tableCache.clear()
}
}

0 comments on commit ce860d0

Please sign in to comment.