From ae91be7a2951cfe253dfe0c5f79f6c7bcc533854 Mon Sep 17 00:00:00 2001 From: shiyuhang <1136742008@qq.com> Date: Thu, 16 Jan 2025 18:19:09 +0800 Subject: [PATCH] remove --- .../CacheInvalidateAccumulator.scala | 65 -------------- .../handler/CacheInvalidateEventHandler.scala | 87 ------------------- .../listener/CacheInvalidateListener.scala | 14 +-- .../listener/PDCacheInvalidateListener.scala | 45 ---------- .../pingcap/tikv/region/RegionManager.java | 7 +- .../tikv/region/RegionStoreClient.java | 2 +- 6 files changed, 6 insertions(+), 214 deletions(-) delete mode 100644 core/src/main/scala/com/pingcap/tispark/accumulator/CacheInvalidateAccumulator.scala delete mode 100644 core/src/main/scala/com/pingcap/tispark/handler/CacheInvalidateEventHandler.scala delete mode 100644 core/src/main/scala/com/pingcap/tispark/listener/PDCacheInvalidateListener.scala diff --git a/core/src/main/scala/com/pingcap/tispark/accumulator/CacheInvalidateAccumulator.scala b/core/src/main/scala/com/pingcap/tispark/accumulator/CacheInvalidateAccumulator.scala deleted file mode 100644 index a90fb37663..0000000000 --- a/core/src/main/scala/com/pingcap/tispark/accumulator/CacheInvalidateAccumulator.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2017 PingCAP, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.pingcap.tispark.accumulator - -import java.util - -import com.pingcap.tikv.event.CacheInvalidateEvent -import org.apache.spark.util.AccumulatorV2 - -import scala.collection.JavaConversions._ - -/** - * A cache invalidate request collector. - * - * In common execution of a spark job, executor nodes may receive cache invalidate information - * and flush executor's own cache store in that node, without explicitly notifying driver node - * to invalidate cache information. This class is used for accumulating cache invalidate event - * and make driver node easier to decide when to update it's PD-cache. - */ -class CacheInvalidateAccumulator - extends AccumulatorV2[CacheInvalidateEvent, Seq[CacheInvalidateEvent]] { - private final val eventSet: util.Set[CacheInvalidateEvent] = - new util.HashSet[CacheInvalidateEvent] - - override def isZero: Boolean = eventSet.isEmpty - - override def reset(): Unit = eventSet.clear() - - override def add(v: CacheInvalidateEvent): Unit = - eventSet.synchronized { - eventSet.add(v) - } - - override def copy(): AccumulatorV2[CacheInvalidateEvent, Seq[CacheInvalidateEvent]] = { - val accumulator = new CacheInvalidateAccumulator - eventSet.synchronized { - accumulator.eventSet.addAll(eventSet) - } - accumulator - } - - override def merge( - other: AccumulatorV2[CacheInvalidateEvent, Seq[CacheInvalidateEvent]]): Unit = - eventSet.addAll(other.value) - - override def value: Seq[CacheInvalidateEvent] = eventSet.toList - - def remove(event: CacheInvalidateEvent): Boolean = - eventSet.synchronized { - eventSet.remove(event) - } -} diff --git a/core/src/main/scala/com/pingcap/tispark/handler/CacheInvalidateEventHandler.scala b/core/src/main/scala/com/pingcap/tispark/handler/CacheInvalidateEventHandler.scala deleted file mode 100644 index 3284d125bf..0000000000 --- a/core/src/main/scala/com/pingcap/tispark/handler/CacheInvalidateEventHandler.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * - * Copyright 2017 PingCAP, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package com.pingcap.tispark.handler - -import com.pingcap.tikv.event.CacheInvalidateEvent -import com.pingcap.tikv.event.CacheInvalidateEvent.CacheType -import com.pingcap.tikv.region.RegionManager -import com.pingcap.tispark.listener.CacheInvalidateListener -import org.slf4j.LoggerFactory - -/** - * A CacheInvalidateEventHandler as it's name indicates what this class will do. - * - * Since there's only one event and one event handler currently in the project, we - * don't need to over-design our event handler to support millions of other events. - * - * Refactor this if we need to support tons of events. - * - * @param regionManager Region manager used for sending invalidating cache. Usually - * it's Spark driver's regionManager - */ -class CacheInvalidateEventHandler(regionManager: RegionManager) { - private final val logger = LoggerFactory.getLogger(getClass.getName) - - def handle(event: CacheInvalidateEvent): Unit = { - try { - event.getCacheType match { - case CacheType.REGION_STORE => - // Used for updating region/store cache in the given regionManager - if (event.shouldUpdateRegion()) { - logger.info(s"Invalidating region ${event.getRegion.getId} cache at driver.") - val region = regionManager.getRegionByKey(event.getRegion.getStartKey) - if (region != null) { - regionManager.invalidateRegion(region) - } - } - - if (event.shouldUpdateStore()) { - logger.info(s"Invalidating store ${event.getStoreId} cache at driver.") - regionManager.invalidateStore(event.getStoreId) - } - case CacheType.LEADER => - // Used for updating leader information cached in the given regionManager - logger.info( - s"Invalidating leader of region:${event.getRegion.getId} store:${event.getStoreId} cache at driver.") - val region = regionManager.getRegionByKey(event.getRegion.getStartKey) - if (region != null) { - regionManager.updateLeader(region, event.getStoreId) - regionManager.invalidateRegion(region) - } - - case CacheType.REQ_FAILED => - logger.info(s"Request failed cache invalidation for region ${event.getRegion.getId}") - val region = regionManager.getRegionByKey(event.getRegion.getStartKey) - if (region != null) { - regionManager.onRequestFail(region) - } - case _ => throw new IllegalArgumentException("Unsupported cache invalidate type.") - } - } catch { - case e: Exception => - logger.error(s"Updating cache failed:${e.getMessage}") - return - } - CacheInvalidateListener.getInstance().CACHE_INVALIDATE_ACCUMULATOR.remove(event) - } -} - -object CacheInvalidateEventHandler { - def apply(regionManager: RegionManager): CacheInvalidateEventHandler = - new CacheInvalidateEventHandler(regionManager) -} diff --git a/core/src/main/scala/com/pingcap/tispark/listener/CacheInvalidateListener.scala b/core/src/main/scala/com/pingcap/tispark/listener/CacheInvalidateListener.scala index 6976e38541..c00d7dad8b 100644 --- a/core/src/main/scala/com/pingcap/tispark/listener/CacheInvalidateListener.scala +++ b/core/src/main/scala/com/pingcap/tispark/listener/CacheInvalidateListener.scala @@ -19,20 +19,15 @@ package com.pingcap.tispark.listener import com.pingcap.tikv.event.CacheInvalidateEvent import com.pingcap.tikv.region.RegionManager -import com.pingcap.tispark.accumulator.CacheInvalidateAccumulator -import com.pingcap.tispark.handler.CacheInvalidateEventHandler import org.apache.spark.SparkContext import org.slf4j.LoggerFactory class CacheInvalidateListener() extends Serializable with java.util.function.Function[CacheInvalidateEvent, Void] { - final val CACHE_ACCUMULATOR_NAME = "CacheInvalidateAccumulator" - final val CACHE_INVALIDATE_ACCUMULATOR = new CacheInvalidateAccumulator override def apply(t: CacheInvalidateEvent): Void = { // this operation shall be executed in executor nodes - CACHE_INVALIDATE_ACCUMULATOR.add(t) null } } @@ -71,12 +66,5 @@ object CacheInvalidateListener { def init( sc: SparkContext, regionManager: RegionManager, - manager: CacheInvalidateListener): Unit = - if (sc != null && regionManager != null) { - sc.register(manager.CACHE_INVALIDATE_ACCUMULATOR, manager.CACHE_ACCUMULATOR_NAME) - sc.addSparkListener( - new PDCacheInvalidateListener( - manager.CACHE_INVALIDATE_ACCUMULATOR, - CacheInvalidateEventHandler(regionManager))) - } + manager: CacheInvalidateListener): Unit = {} } diff --git a/core/src/main/scala/com/pingcap/tispark/listener/PDCacheInvalidateListener.scala b/core/src/main/scala/com/pingcap/tispark/listener/PDCacheInvalidateListener.scala deleted file mode 100644 index 87354447f3..0000000000 --- a/core/src/main/scala/com/pingcap/tispark/listener/PDCacheInvalidateListener.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Copyright 2017 PingCAP, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package com.pingcap.tispark.listener - -import java.util.logging.Logger - -import com.pingcap.tispark.accumulator.CacheInvalidateAccumulator -import com.pingcap.tispark.handler.CacheInvalidateEventHandler -import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd} - -class PDCacheInvalidateListener( - accumulator: CacheInvalidateAccumulator, - handler: CacheInvalidateEventHandler) - extends SparkListener { - private final val logger: Logger = Logger.getLogger(getClass.getName) - - override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = - if (accumulator != null && !accumulator.isZero && handler != null) { - synchronized { - if (!accumulator.isZero) { - val events = accumulator.value - logger.info( - s"Receiving ${events.size} cache invalidation request(s) from job ${jobEnd.jobId} at driver. " + - s"This indicates that there's exception(s) thrown in executor node when communicating with " + - s"TiKV, checkout executors' log for more information.") - events.foreach(handler.handle) - } - } - } -} diff --git a/tikv-client/src/main/java/com/pingcap/tikv/region/RegionManager.java b/tikv-client/src/main/java/com/pingcap/tikv/region/RegionManager.java index 086e9e448f..81ac22c472 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/region/RegionManager.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/region/RegionManager.java @@ -67,7 +67,7 @@ public RegionManager(ReadOnlyPDClient pdClient) { } public synchronized void setCacheInvalidateCallback( - Function cacheInvalidateCallback) { + Function cacheInvalidateCallback) { this.cacheInvalidateCallback = cacheInvalidateCallback; } @@ -195,7 +195,7 @@ public void invalidateRegion(TiRegion region) { } public void invalidateRange(ByteString startKey, ByteString endKey) { - cache.invalidateRange(startKey,endKey); + cache.invalidateRange(startKey, endKey); } public static class RegionCache { @@ -263,7 +263,8 @@ private synchronized TiRegion getRegionFromCache(Key key) { private synchronized void invalidateRange(ByteString startKey, ByteString endKey) { regionCache.remove(makeRange(startKey, endKey)); if (logger.isDebugEnabled()) { - logger.debug(String.format("invalidateRange success, startKey[%s], endKey[%s]", startKey, endKey)); + logger.debug( + String.format("invalidateRange success, startKey[%s], endKey[%s]", startKey, endKey)); } } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java b/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java index 238cc6f918..ba26da9a3b 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java @@ -708,7 +708,7 @@ private List handleCopResponse( // we need to invalidate cache when region not find if (regionError.hasRegionNotFound()) { logger.info("invalidateRange when Re-splitting region task because of region not find."); - this.regionManager.invalidateRange(region.getStartKey(),region.getEndKey()); + this.regionManager.invalidateRange(region.getStartKey(), region.getEndKey()); } // Split ranges return RangeSplitter.newSplitter(this.regionManager).splitRangeByRegion(ranges, storeType);