From 1989dac8ebf2c27298bacb58a5a6cab68a32b550 Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Fri, 8 Sep 2023 11:27:22 -0700 Subject: [PATCH] [server][da-vinci-client] Kill SIT if missing SOP for 1 hr (#622) For topics that are missing the START_OF_PUSH (SOP) control message, the store ingestion task keeps waiting for it, thus blocking ST threads until it errors out after 24 hours. This PR fails the push if the SOP is not seen within 1 hour. --------- Co-authored-by: Sourav Maji --- .../kafka/consumer/StoreIngestionTask.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 5a85dab65a..af1707656d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -111,6 +111,8 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -136,7 +138,10 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { private static final String CONSUMER_TASK_ID_FORMAT = StoreIngestionTask.class.getSimpleName() + " for [ Topic: %s ]"; public static long SCHEMA_POLLING_DELAY_MS = SECONDS.toMillis(5); + public static long STORE_VERSION_POLLING_DELAY_MS = MINUTES.toMillis(1); + private static final long SCHEMA_POLLING_TIMEOUT_MS = MINUTES.toMillis(5); + private static final long SOP_POLLING_TIMEOUT_MS = HOURS.toMillis(1); private static final int MAX_CONSUMER_ACTION_ATTEMPTS = 5; private static final int MAX_IDLE_COUNTER = 100; @@ -200,6 +205,7 @@ public 
abstract class StoreIngestionTask implements Runnable, Closeable { protected final BooleanSupplier isCurrentVersion; protected final Optional hybridStoreConfig; protected final Consumer divErrorMetricCallback; + private final ExecutorService missingSOPCheckExecutor = Executors.newSingleThreadExecutor(); protected final long readCycleDelayMs; protected final long emptyPollSleepMs; @@ -274,6 +280,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final StatusReportAdapter statusReportAdapter; private final Optional cacheBackend; + private final Runnable runnableForKillIngestionTasksForMissingSOP; protected final String localKafkaServer; protected final int localKafkaClusterId; @@ -406,6 +413,8 @@ public StoreIngestionTask( new IngestionNotificationDispatcher(notifiers, kafkaVersionTopic, isCurrentVersion), amplificationFactorAdapter); + this.runnableForKillIngestionTasksForMissingSOP = () -> waitForStateVersion(kafkaVersionTopic); + this.missingSOPCheckExecutor.execute(runnableForKillIngestionTasksForMissingSOP); this.cacheBackend = cacheBackend; this.localKafkaServer = this.kafkaProps.getProperty(KAFKA_BOOTSTRAP_SERVERS); this.localKafkaServerSingletonSet = Collections.singleton(localKafkaServer);
@@ -472,6 +481,40 @@ protected int nextSeqNum() {
     return consumerActionSequenceNumber.incrementAndGet();
   }
 
+  /**
+   * Polls the storage engine until a {@link StoreVersionState} is available and kills this ingestion task
+   * if none shows up within {@link #SOP_POLLING_TIMEOUT_MS}.
+   *
+   * <p>Polling stops as soon as the state is present, the task is no longer running, the task has been
+   * killed because of the timeout, or the polling thread is interrupted.
+   *
+   * <p>NOTE(review): this method is submitted to {@code missingSOPCheckExecutor} from the constructor; no
+   * shutdown of that executor is visible in this patch - confirm it is shut down when the task closes.
+   *
+   * @param kafkaTopic topic name, used only in the log message emitted when the task is killed
+   */
+  private void waitForStateVersion(String kafkaTopic) {
+    long startTime = System.currentTimeMillis();
+
+    while (storageEngine.getStoreVersionState() == null && isRunning()) {
+      long elapsedTime = System.currentTimeMillis() - startTime;
+      if (elapsedTime > SOP_POLLING_TIMEOUT_MS) {
+        LOGGER.warn("Killing the ingestion as Version state is not available for {} after {}", kafkaTopic, elapsedTime);
+        kill();
+        // kill() has been requested; stop polling instead of re-issuing it on every polling interval.
+        return;
+      }
+      try {
+        Thread.sleep(STORE_VERSION_POLLING_DELAY_MS);
+      } catch (InterruptedException e) {
+        LOGGER.info("Received interruptedException while waiting for store version.");
+        // Restore the interrupt flag so the thread running this check can still observe the interruption.
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
   /**
    * Adds an asynchronous partition subscription request for the task.
    */