Skip to content

Commit

Permalink
[server][da-vinci-client] Kill SIT if missing SOP for 1 hr (#622)
Browse files Browse the repository at this point in the history
For topic which are missing START_OF_PUSH control message, the store ingestion task keeps waiting on it thus blocking ST threads till it barfs out after 24hrs. This PR fails the push if it does not see SOP for 1 hour.

---------

Co-authored-by: Sourav Maji <[email protected]>
  • Loading branch information
majisourav99 and Sourav Maji authored Sep 8, 2023
1 parent 36638a3 commit 1989dac
Showing 1 changed file with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -136,7 +138,10 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {

private static final String CONSUMER_TASK_ID_FORMAT = StoreIngestionTask.class.getSimpleName() + " for [ Topic: %s ]";
public static long SCHEMA_POLLING_DELAY_MS = SECONDS.toMillis(5);
public static long STORE_VERSION_POLLING_DELAY_MS = MINUTES.toMillis(1);

private static final long SCHEMA_POLLING_TIMEOUT_MS = MINUTES.toMillis(5);
private static final long SOP_POLLING_TIMEOUT_MS = HOURS.toMillis(1);

private static final int MAX_CONSUMER_ACTION_ATTEMPTS = 5;
private static final int MAX_IDLE_COUNTER = 100;
Expand Down Expand Up @@ -200,6 +205,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected final BooleanSupplier isCurrentVersion;
protected final Optional<HybridStoreConfig> hybridStoreConfig;
protected final Consumer<DataValidationException> divErrorMetricCallback;
private final ExecutorService missingSOPCheckExecutor = Executors.newSingleThreadExecutor();

protected final long readCycleDelayMs;
protected final long emptyPollSleepMs;
Expand Down Expand Up @@ -274,6 +280,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected final StatusReportAdapter statusReportAdapter;

private final Optional<ObjectCacheBackend> cacheBackend;
private final Runnable runnableForKillIngestionTasksForMissingSOP;

protected final String localKafkaServer;
protected final int localKafkaClusterId;
Expand Down Expand Up @@ -406,6 +413,8 @@ public StoreIngestionTask(
new IngestionNotificationDispatcher(notifiers, kafkaVersionTopic, isCurrentVersion),
amplificationFactorAdapter);

this.runnableForKillIngestionTasksForMissingSOP = () -> waitForStateVersion(kafkaVersionTopic);
this.missingSOPCheckExecutor.execute(runnableForKillIngestionTasksForMissingSOP);
this.cacheBackend = cacheBackend;
this.localKafkaServer = this.kafkaProps.getProperty(KAFKA_BOOTSTRAP_SERVERS);
this.localKafkaServerSingletonSet = Collections.singleton(localKafkaServer);
Expand Down Expand Up @@ -472,6 +481,29 @@ protected int nextSeqNum() {
return consumerActionSequenceNumber.incrementAndGet();
}

private void waitForStateVersion(String kafkaTopic) {
long startTime = System.currentTimeMillis();

for (;;) {
StoreVersionState state = storageEngine.getStoreVersionState();
long elapsedTime = System.currentTimeMillis() - startTime;
if (state != null || !isRunning()) {
break;
}

if (elapsedTime > SOP_POLLING_TIMEOUT_MS) {
LOGGER.warn("Killing the ingestion as Version state is not available for {} after {}", kafkaTopic, elapsedTime);
kill();
}
try {
Thread.sleep(STORE_VERSION_POLLING_DELAY_MS);
} catch (InterruptedException e) {
LOGGER.info("Received interruptedException while waiting for store version.");
break;
}
}
}

/**
* Adds an asynchronous partition subscription request for the task.
*/
Expand Down

0 comments on commit 1989dac

Please sign in to comment.