diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 7e90a4e429..7694b33fff 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -337,6 +337,25 @@ private ConfigKeys() { public static final String CONTROLLER_PARENT_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED = "controller.parent.external.superset.schema.generation.enabled"; + /** + * Whether to check system store health in child controller. Default is false. + */ + public static final String CONTROLLER_SYSTEM_STORE_HEALTH_CHECK_ENABLED = + "controller.system.store.health.check.enabled"; + + /** + * Frequency to run system store health check in child controller. Default is 1h. + */ + public static final String CONTROLLER_SYSTEM_STORE_HEALTH_CHECK_INTERVAL_SECONDS = + "controller.system.store.health.check.interval.seconds"; + + /** + * The wait time before validating system store heartbeat during system store health check in child controller. + * Default is 1min. + */ + public static final String CONTROLLER_SYSTEM_STORE_HEALTH_CHECK_HEARTBEAT_WAIT_TIME_SECONDS = + "controller.system.store.health.check.heartbeat.wait.time.seconds"; + /** * Whether to initialize system schemas when controller starts. Default is true. */ diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/common/PushStatusStoreUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/common/PushStatusStoreUtils.java index a5b34f9942..e1d4049e5f 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/common/PushStatusStoreUtils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/common/PushStatusStoreUtils.java @@ -15,6 +15,8 @@ public enum PushStatusKeyType { public static final String SERVER_INCREMENTAL_PUSH_PREFIX = "SERVER_SIDE_INCREMENTAL_PUSH_STATUS"; public static final String ONGOING_INCREMENTAL_PUSH_STATUSES_KEY = "ONGOING_INCREMENTAL_PUSHES"; + // This constant is the heartbeat key name for controller to check system store ingestion health. + public static final String CONTROLLER_HEARTBEAT_INSTANCE_NAME = "CONTROLLER_HEARTBEAT"; public static PushStatusKey getHeartbeatKey(String instanceName) { PushStatusKey pushStatusKey = new PushStatusKey(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReader.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReader.java index ccc7d6794d..ef74783640 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReader.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReader.java @@ -30,12 +30,11 @@ /** - * PushStatusStoreReader is a helper class for Venice controller reading PushStatus/Heartbeat messages. - * One PushStatusStoreReader for one regular Venice store. - * Don't keep a map of [storeName->client] to minimize states kept by controller. + * This class is a helper class for Venice controller to read PushStatus / Heartbeat messages. */ public class PushStatusStoreReader implements Closeable { private static final Logger LOGGER = LogManager.getLogger(PushStatusStoreReader.class); + private static final int DEFAULT_HEARTBEAT_READ_TIMEOUT_SECONDS = 3; private final Map> veniceClients = new VeniceConcurrentHashMap<>(); private final D2Client d2Client; @@ -195,7 +194,8 @@ public long getHeartbeat(String storeName, String instanceName) { AvroSpecificStoreClient client = getVeniceClient(storeName); PushStatusKey pushStatusKey = PushStatusStoreUtils.getHeartbeatKey(instanceName); try { - PushStatusValue pushStatusValue = client.get(pushStatusKey).get(); + PushStatusValue pushStatusValue = + client.get(pushStatusKey).get(DEFAULT_HEARTBEAT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS); if (pushStatusValue == null) { return 0; } else { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java index 6a24f70798..2a8b2d00f5 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java @@ -43,17 +43,22 @@ public PushStatusStoreWriter(VeniceWriterFactory writerFactory, String instanceN this.derivedSchemaId = derivedSchemaId; } - public void writeHeartbeat(String storeName) { + public void writeHeartbeat(String storeName, long heartbeat) { VeniceWriter writer = veniceWriterCache.prepareVeniceWriter(storeName); PushStatusKey pushStatusKey = PushStatusStoreUtils.getHeartbeatKey(instanceName); PushStatusValue pushStatusValue = new PushStatusValue(); - pushStatusValue.reportTimestamp = System.currentTimeMillis(); + pushStatusValue.reportTimestamp = heartbeat; pushStatusValue.instances = Collections.emptyMap(); LOGGER.info("Sending heartbeat of {}", instanceName); writer.put( pushStatusKey, pushStatusValue, AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion()); + + } + + public void writeHeartbeat(String storeName) { + writeHeartbeat(storeName, System.currentTimeMillis()); } public void writePushStatus(String storeName, int version, int partitionId, ExecutionStatus status) { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreDataType.java b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreDataType.java index ada01709e8..ad13503130 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreDataType.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreDataType.java @@ -29,7 +29,8 @@ public enum MetaStoreDataType implements VeniceEnumValue { Arrays.asList(KEY_STRING_STORE_NAME, KEY_STRING_CLUSTER_NAME, KEY_STRING_VERSION_NUMBER, KEY_STRING_PARTITION_ID) ), STORE_CLUSTER_CONFIG(4, Collections.singletonList(KEY_STRING_STORE_NAME)), STORE_VALUE_SCHEMA(5, Arrays.asList(KEY_STRING_STORE_NAME, KEY_STRING_SCHEMA_ID)), - VALUE_SCHEMAS_WRITTEN_PER_STORE_VERSION(6, Arrays.asList(KEY_STRING_STORE_NAME, KEY_STRING_VERSION_NUMBER)); + VALUE_SCHEMAS_WRITTEN_PER_STORE_VERSION(6, Arrays.asList(KEY_STRING_STORE_NAME, KEY_STRING_VERSION_NUMBER)), + HEARTBEAT(7, Collections.singletonList(KEY_STRING_STORE_NAME)); private static final MetaStoreDataType[] TYPES_ARRAY = EnumUtils.getEnumValuesArray(MetaStoreDataType.class); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreReader.java b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreReader.java new file mode 100644 index 0000000000..8cd9199c96 --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreReader.java @@ -0,0 +1,73 @@ +package com.linkedin.venice.system.store; + +import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME; + +import com.linkedin.d2.balancer.D2Client; +import com.linkedin.venice.client.store.AvroSpecificStoreClient; +import com.linkedin.venice.client.store.ClientConfig; +import com.linkedin.venice.client.store.ClientFactory; +import com.linkedin.venice.common.VeniceSystemStoreUtils; +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.systemstore.schemas.StoreMetaKey; +import com.linkedin.venice.systemstore.schemas.StoreMetaValue; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import java.io.Closeable; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +public class MetaStoreReader implements Closeable { + private static final Logger LOGGER = LogManager.getLogger(MetaStoreReader.class); + private static final int DEFAULT_HEARTBEAT_READ_TIMEOUT_SECONDS = 3; + private final Map> veniceClients = + new VeniceConcurrentHashMap<>(); + private final D2Client d2Client; + private final String clusterDiscoveryD2ServiceName; + + public MetaStoreReader(D2Client d2Client, String clusterDiscoveryD2ServiceName) { + this.d2Client = d2Client; + this.clusterDiscoveryD2ServiceName = clusterDiscoveryD2ServiceName; + } + + public long getHeartbeat(String storeName) { + AvroSpecificStoreClient client = getVeniceClient(storeName); + StoreMetaKey key = + MetaStoreDataType.HEARTBEAT.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); + try { + StoreMetaValue value = client.get(key).get(DEFAULT_HEARTBEAT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (value == null) { + return 0; + } else { + return value.timestamp; + } + } catch (Exception e) { + throw new VeniceException(e); + } + } + + AvroSpecificStoreClient getVeniceClient(String storeName) { + return veniceClients.computeIfAbsent(storeName, (s) -> { + ClientConfig clientConfig = + ClientConfig.defaultGenericClientConfig(VeniceSystemStoreUtils.getMetaStoreName(storeName)) + .setD2Client(d2Client) + .setD2ServiceName(clusterDiscoveryD2ServiceName) + .setSpecificValueClass(StoreMetaValue.class); + return ClientFactory.getAndStartSpecificAvroClient(clientConfig); + }); + } + + @Override + public void close() { + veniceClients.forEach((storeName, veniceClient) -> { + try { + veniceClient.close(); + } catch (Exception e) { + LOGGER.error("Can not close VeniceClient.", e); + } + }); + } + +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java index 749dc2ca4d..61fc97859f 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -146,6 +147,15 @@ public void writeStoreValueSchemas(String storeName, Collection val }); } + public void writeHeartbeat(String storeName, long heartbeatTimestamp) { + write( + storeName, + MetaStoreDataType.HEARTBEAT, + () -> Collections.singletonMap(KEY_STRING_STORE_NAME, storeName), + StoreMetaValue::new, + heartbeatTimestamp); + } + /** * Improved version of writeStoreValueSchemas. Instead of writing all value schemas into one K/V pair we write it to * a different key space where each K/V pair only represents one version of the value schema. This allows us to store @@ -330,13 +340,22 @@ private void write( MetaStoreDataType dataType, Supplier> keyStringSupplier, Supplier valueSupplier) { + write(storeName, dataType, keyStringSupplier, valueSupplier, System.currentTimeMillis()); + } + + private void write( + String storeName, + MetaStoreDataType dataType, + Supplier> keyStringSupplier, + Supplier valueSupplier, + long timestamp) { String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName); StoreMetaKey key = dataType.getStoreMetaKey(keyStringSupplier.get()); StoreMetaValue value = valueSupplier.get(); - value.timestamp = System.currentTimeMillis(); - writeMessageWithRetry(metaStoreName, vw -> { - vw.put(key, value, AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.currentProtocolVersion.get()); - }); + value.timestamp = timestamp; + writeMessageWithRetry( + metaStoreName, + vw -> vw.put(key, value, AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.currentProtocolVersion.get())); } private void update( diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreReaderTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreReaderTest.java new file mode 100644 index 0000000000..1ba79b9764 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreReaderTest.java @@ -0,0 +1,58 @@ +package com.linkedin.venice.system.store; + +import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.linkedin.d2.balancer.D2Client; +import com.linkedin.venice.client.store.AvroSpecificStoreClient; +import com.linkedin.venice.client.store.ClientConfig; +import com.linkedin.venice.systemstore.schemas.StoreMetaKey; +import com.linkedin.venice.systemstore.schemas.StoreMetaValue; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class MetaStoreReaderTest { + private static final String CLUSTER_DISCOVERY_D2_SERVICE_NAME = + ClientConfig.DEFAULT_CLUSTER_DISCOVERY_D2_SERVICE_NAME + "_test"; + + private D2Client d2ClientMock; + private AvroSpecificStoreClient storeClientMock; + private final static String storeName = "venice-test-meta-store"; + + @BeforeMethod + public void setUp() { + d2ClientMock = mock(D2Client.class); + storeClientMock = mock(AvroSpecificStoreClient.class); + } + + @Test() + public void testGetHeartbeatFromMetaStore() throws ExecutionException, InterruptedException, TimeoutException { + StoreMetaKey storeMetaKey = + MetaStoreDataType.HEARTBEAT.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); + StoreMetaValue storeMetaValue = new StoreMetaValue(); + storeMetaValue.timestamp = 123L; + + MetaStoreReader storeReaderSpy = spy(new MetaStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME)); + CompletableFuture completableFutureMock = mock(CompletableFuture.class); + + doReturn(storeClientMock).when(storeReaderSpy).getVeniceClient(any()); + when(storeClientMock.get(storeMetaKey)).thenReturn(completableFutureMock); + when(completableFutureMock.get(anyLong(), any())).thenReturn(storeMetaValue); + + Assert.assertEquals(storeReaderSpy.getHeartbeat(storeName), 123L); + verify(completableFutureMock).get(anyLong(), any()); + verify(storeClientMock).get(storeMetaKey); + } +} diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java index 8f81ca144d..ec04beeeda 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java @@ -1,6 +1,7 @@ package com.linkedin.venice.system.store; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; @@ -9,8 +10,13 @@ import static org.mockito.Mockito.when; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.systemstore.schemas.StoreMetaKey; +import com.linkedin.venice.systemstore.schemas.StoreMetaValue; import com.linkedin.venice.writer.VeniceWriter; +import java.util.Collections; import java.util.concurrent.locks.ReentrantLock; +import org.mockito.ArgumentCaptor; +import org.testng.Assert; import org.testng.annotations.Test; @@ -31,4 +37,32 @@ public void testMetaStoreWriterWillRestartUponProduceFailure() { verify(badWriter, times(1)).delete(any(), any()); verify(goodWriter, times(1)).delete(any(), any()); } + + @Test + public void testMetaStoreWriterSendHeartbeatMessage() { + // Mock + MetaStoreWriter metaStoreWriter = mock(MetaStoreWriter.class); + ReentrantLock reentrantLock = new ReentrantLock(); + String metaStoreName = "testStore"; + when(metaStoreWriter.getOrCreateMetaStoreWriterLock(anyString())).thenReturn(reentrantLock); + VeniceWriter goodWriter = mock(VeniceWriter.class); + when(metaStoreWriter.getOrCreateMetaStoreWriter(anyString())).thenReturn(goodWriter); + doCallRealMethod().when(metaStoreWriter).writeHeartbeat(anyString(), anyLong()); + doCallRealMethod().when(metaStoreWriter).writeMessageWithRetry(anyString(), any()); + long timestamp = 123L; + + // Action + metaStoreWriter.writeHeartbeat(metaStoreName, timestamp); + ArgumentCaptor keyArgumentCaptor = ArgumentCaptor.forClass(StoreMetaKey.class); + ArgumentCaptor valueArgumentCaptor = ArgumentCaptor.forClass(StoreMetaValue.class); + ArgumentCaptor schemaArgumentCaptor = ArgumentCaptor.forClass(Integer.class); + verify(goodWriter).put(keyArgumentCaptor.capture(), valueArgumentCaptor.capture(), schemaArgumentCaptor.capture()); + + // Assertion + StoreMetaKey capturedKey = keyArgumentCaptor.getValue(); + Assert.assertEquals(capturedKey.keyStrings, Collections.singletonList(metaStoreName)); + Assert.assertEquals(capturedKey.metadataType, MetaStoreDataType.HEARTBEAT.getValue()); + StoreMetaValue capturedValue = valueArgumentCaptor.getValue(); + Assert.assertEquals(capturedValue.timestamp, timestamp); + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java index bc05e8275f..d4490681c4 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java @@ -29,6 +29,7 @@ import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; import com.linkedin.venice.pushstatushelper.PushStatusStoreRecordDeleter; +import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter; import com.linkedin.venice.schema.GeneratedSchemaID; import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.schema.avro.DirectionalSchemaCompatibilityType; @@ -38,6 +39,7 @@ import com.linkedin.venice.status.protocol.BatchJobHeartbeatValue; import com.linkedin.venice.status.protocol.PushJobDetails; import com.linkedin.venice.status.protocol.PushJobStatusRecordKey; +import com.linkedin.venice.system.store.MetaStoreReader; import com.linkedin.venice.system.store.MetaStoreWriter; import com.linkedin.venice.utils.Pair; import com.linkedin.venice.utils.VeniceProperties; @@ -743,6 +745,8 @@ void updateRoutersClusterConfig( */ MetaStoreWriter getMetaStoreWriter(); + MetaStoreReader getMetaStoreReader(); + /** * Return {@link PushStatusStoreRecordDeleter}. */ @@ -943,4 +947,6 @@ default void clearInstanceMonitor(String clusterName) { } Optional getPushStatusStoreReader(); + + Optional getPushStatusStoreWriter(); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java index f856c2ad75..5fd391d395 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java @@ -6,6 +6,7 @@ import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.controller.stats.AggPartitionHealthStats; import com.linkedin.venice.controller.stats.VeniceAdminStats; +import com.linkedin.venice.controller.systemstore.SystemStoreHealthCheckService; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.HelixAdapterSerializer; import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; @@ -73,6 +74,7 @@ public class HelixVeniceClusterResources implements VeniceResource { private final Optional accessController; private final ExecutorService errorPartitionResetExecutorService = Executors.newSingleThreadExecutor(); private final StoragePersonaRepository storagePersonaRepository; + private final SystemStoreHealthCheckService systemStoreHealthCheckService; private ErrorPartitionResetTask errorPartitionResetTask = null; private final Optional metaStoreWriter; @@ -206,6 +208,26 @@ public HelixVeniceClusterResources( veniceAdminStats = new VeniceAdminStats(metricsRepository, "venice-admin-" + clusterName); this.storagePersonaRepository = new StoragePersonaRepository(clusterName, this.storeMetadataRepository, adapterSerializer, zkClient); + if (!config.isParent() && config.isSystemStoreHealthCheckEnabled()) { + if (!admin.getPushStatusStoreReader().isPresent()) { + throw new VeniceException("Da Vinci push status reader is not enabled."); + } + if (!admin.getPushStatusStoreWriter().isPresent()) { + throw new VeniceException("Da Vinci push status writer is not enabled."); + } + this.systemStoreHealthCheckService = new SystemStoreHealthCheckService( + storeMetadataRepository, + metricsRepository, + clusterName, + admin.getMetaStoreReader(), + admin.getMetaStoreWriter(), + admin.getPushStatusStoreReader().get(), + admin.getPushStatusStoreWriter().get(), + config.getSystemStoreHealthCheckIntervalSeconds(), + config.getSystemStoreHealthCheckHeartbeatWaitTimeSeconds()); + } else { + this.systemStoreHealthCheckService = null; + } } private List getActiveActiveRealTimeSourceKafkaURLs(VeniceControllerConfig config) { @@ -337,6 +359,22 @@ public void stopLeakedPushStatusCleanUpService() { } } + public void startSystemStoreHealthCheckService() { + if (systemStoreHealthCheckService != null) { + systemStoreHealthCheckService.start(); + } + } + + public void stopSystemStoreHealthCheckService() { + if (systemStoreHealthCheckService != null) { + try { + systemStoreHealthCheckService.stop(); + } catch (Exception e) { + LOGGER.error("Error when stopping system store health check service for cluster: {}", clusterName); + } + } + } + public ReadWriteStoreRepository getStoreMetadataRepository() { return storeMetadataRepository; } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java index 7aacb42b17..c092466d9e 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java @@ -46,6 +46,9 @@ import static com.linkedin.venice.ConfigKeys.CONTROLLER_STORE_GRAVEYARD_CLEANUP_SLEEP_INTERVAL_BETWEEN_LIST_FETCH_MINUTES; import static com.linkedin.venice.ConfigKeys.CONTROLLER_SYSTEM_SCHEMA_CLUSTER_NAME; import static com.linkedin.venice.ConfigKeys.CONTROLLER_SYSTEM_STORE_ACL_SYNCHRONIZATION_DELAY_MS; +import static com.linkedin.venice.ConfigKeys.CONTROLLER_SYSTEM_STORE_HEALTH_CHECK_ENABLED; +import static com.linkedin.venice.ConfigKeys.CONTROLLER_SYSTEM_STORE_HEALTH_CHECK_HEARTBEAT_WAIT_TIME_SECONDS; +import static com.linkedin.venice.ConfigKeys.CONTROLLER_SYSTEM_STORE_HEALTH_CHECK_INTERVAL_SECONDS; import static com.linkedin.venice.ConfigKeys.CONTROLLER_ZK_SHARED_DAVINCI_PUSH_STATUS_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED; import static com.linkedin.venice.ConfigKeys.CONTROLLER_ZK_SHARED_META_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED; import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_ENABLED; @@ -277,6 +280,12 @@ public class VeniceControllerConfig extends VeniceControllerClusterConfig { private final int storeGraveyardCleanupSleepIntervalBetweenListFetchMinutes; + private final boolean systemStoreHealthCheckEnabled; + + private final int systemStoreHealthCheckIntervalSeconds; + + private final int systemStoreHealthCheckHeartbeatWaitTimeSeconds; + private final boolean parentExternalSupersetSchemaGenerationEnabled; private final boolean systemSchemaInitializationAtStartTimeEnabled; @@ -483,6 +492,11 @@ public VeniceControllerConfig(VeniceProperties props) { this.storeGraveyardCleanupDelayMinutes = props.getInt(CONTROLLER_STORE_GRAVEYARD_CLEANUP_DELAY_MINUTES, 0); this.storeGraveyardCleanupSleepIntervalBetweenListFetchMinutes = props.getInt(CONTROLLER_STORE_GRAVEYARD_CLEANUP_SLEEP_INTERVAL_BETWEEN_LIST_FETCH_MINUTES, 15); + this.systemStoreHealthCheckEnabled = props.getBoolean(CONTROLLER_SYSTEM_STORE_HEALTH_CHECK_ENABLED, false); + this.systemStoreHealthCheckIntervalSeconds = + props.getInt(CONTROLLER_SYSTEM_STORE_HEALTH_CHECK_INTERVAL_SECONDS, 3600); + this.systemStoreHealthCheckHeartbeatWaitTimeSeconds = + props.getInt(CONTROLLER_SYSTEM_STORE_HEALTH_CHECK_HEARTBEAT_WAIT_TIME_SECONDS, 60); this.clusterDiscoveryD2ServiceName = props.getString(CLUSTER_DISCOVERY_D2_SERVICE, ClientConfig.DEFAULT_CLUSTER_DISCOVERY_D2_SERVICE_NAME); this.parentExternalSupersetSchemaGenerationEnabled = @@ -913,6 +927,18 @@ public int getStoreGraveyardCleanupSleepIntervalBetweenListFetchMinutes() { return storeGraveyardCleanupSleepIntervalBetweenListFetchMinutes; } + public boolean isSystemStoreHealthCheckEnabled() { + return systemStoreHealthCheckEnabled; + } + + public int getSystemStoreHealthCheckIntervalSeconds() { + return systemStoreHealthCheckIntervalSeconds; + } + + public int getSystemStoreHealthCheckHeartbeatWaitTimeSeconds() { + return systemStoreHealthCheckHeartbeatWaitTimeSeconds; + } + public boolean isParentExternalSupersetSchemaGenerationEnabled() { return parentExternalSupersetSchemaGenerationEnabled; } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerStateModel.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerStateModel.java index ab9c05ee95..73ce81a27f 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerStateModel.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerStateModel.java @@ -195,6 +195,7 @@ private void initClusterResources() { clusterResources.refresh(); clusterResources.startErrorPartitionResetTask(); clusterResources.startLeakedPushStatusCleanUpService(); + clusterResources.startSystemStoreHealthCheckService(); } /** @@ -298,6 +299,7 @@ private synchronized void closeHelixManager() { /** synchronized because concurrent calls could cause a NPE */ private synchronized void clearResources() { if (clusterResources != null) { + clusterResources.stopSystemStoreHealthCheckService(); /** * Leaked push status clean up service depends on VeniceHelixAdmin, so VeniceHelixAdmin should be stopped after * its dependent service. diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 4a4681fad2..23a0b04d71 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -187,6 +187,7 @@ import com.linkedin.venice.status.protocol.BatchJobHeartbeatValue; import com.linkedin.venice.status.protocol.PushJobDetails; import com.linkedin.venice.status.protocol.PushJobStatusRecordKey; +import com.linkedin.venice.system.store.MetaStoreReader; import com.linkedin.venice.system.store.MetaStoreWriter; import com.linkedin.venice.utils.AvroSchemaUtils; import com.linkedin.venice.utils.EncodingUtils; @@ -353,6 +354,7 @@ public class VeniceHelixAdmin implements Admin, StoreCleaner { private final SharedHelixReadOnlyZKSharedSystemStoreRepository zkSharedSystemStoreRepository; private final SharedHelixReadOnlyZKSharedSchemaRepository zkSharedSchemaRepository; private final MetaStoreWriter metaStoreWriter; + private final MetaStoreReader metaStoreReader; private final D2Client d2Client; private final Map clusterToLiveClusterConfigRepo; private final boolean usePushStatusStoreToReadServerIncrementalPushStatus; @@ -523,6 +525,7 @@ public VeniceHelixAdmin( isControllerClusterHAAS = commonConfig.isControllerClusterLeaderHAAS(); coloLeaderClusterName = commonConfig.getClusterName(); pushJobStatusStoreClusterName = commonConfig.getPushJobStatusStoreClusterName(); + // TODO: We need to consider removing this config, as push status store is rolled out everywhere. if (commonConfig.isDaVinciPushStatusStoreEnabled()) { pushStatusStoreReader = Optional.of( new PushStatusStoreReader( @@ -558,6 +561,7 @@ public VeniceHelixAdmin( veniceWriterFactory, zkSharedSchemaRepository, pubSubTopicRepository); + metaStoreReader = new MetaStoreReader(d2Client, commonConfig.getClusterDiscoveryD2ServiceName()); clusterToLiveClusterConfigRepo = new VeniceConcurrentHashMap<>(); dataRecoveryManager = new DataRecoveryManager( @@ -7293,6 +7297,11 @@ public MetaStoreWriter getMetaStoreWriter() { return metaStoreWriter; } + @Override + public MetaStoreReader getMetaStoreReader() { + return metaStoreReader; + } + /** * @see Admin#getPushStatusStoreRecordDeleter() */ @@ -7810,6 +7819,11 @@ public Optional getPushStatusStoreReader() { return pushStatusStoreReader; } + @Override + public Optional getPushStatusStoreWriter() { + return pushStatusStoreWriter; + } + public Optional getSslFactory() { return sslFactory; } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index f475bfde05..7691adc684 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -185,6 +185,7 @@ import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; import com.linkedin.venice.pushstatushelper.PushStatusStoreRecordDeleter; +import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter; import com.linkedin.venice.schema.AvroSchemaParseUtils; import com.linkedin.venice.schema.GeneratedSchemaID; import com.linkedin.venice.schema.SchemaData; @@ -199,6 +200,7 @@ import com.linkedin.venice.status.protocol.BatchJobHeartbeatValue; import com.linkedin.venice.status.protocol.PushJobDetails; import com.linkedin.venice.status.protocol.PushJobStatusRecordKey; +import com.linkedin.venice.system.store.MetaStoreReader; import com.linkedin.venice.system.store.MetaStoreWriter; import com.linkedin.venice.utils.AvroSchemaUtils; import com.linkedin.venice.utils.CollectionUtils; @@ -4701,6 +4703,11 @@ public MetaStoreWriter getMetaStoreWriter() { return getVeniceHelixAdmin().getMetaStoreWriter(); } + @Override + public MetaStoreReader getMetaStoreReader() { + return getVeniceHelixAdmin().getMetaStoreReader(); + } + /** * @see Admin#getPushStatusStoreRecordDeleter() */ @@ -5183,4 +5190,9 @@ public void removeStoreFromGraveyard(String clusterName, String storeName) { public Optional getPushStatusStoreReader() { throw new VeniceUnsupportedOperationException("Parent controller does not have Da Vinci push status store reader"); } + + @Override + public Optional getPushStatusStoreWriter() { + throw new VeniceUnsupportedOperationException("Parent controller does not have Da Vinci push status store writer"); + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/SystemStoreCheckStats.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/SystemStoreCheckStats.java new file mode 100644 index 0000000000..f2fcbd209c --- /dev/null +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/SystemStoreCheckStats.java @@ -0,0 +1,25 @@ +package com.linkedin.venice.controller.stats; + +import com.linkedin.venice.stats.AbstractVeniceStats; +import com.linkedin.venice.stats.Gauge; +import io.tehuti.metrics.MetricsRepository; +import io.tehuti.metrics.Sensor; +import java.util.function.Supplier; + + +public class SystemStoreCheckStats extends AbstractVeniceStats { + private final Sensor badMetaSystemStoreCount; + private final Sensor badPushStatusSystemStoreCount; + + public SystemStoreCheckStats( + MetricsRepository metricsRepository, + String name, + Supplier badMetaSystemStoreValueSupplier, + Supplier badPushStatusStoreValueSupplier) { + super(metricsRepository, name); + badMetaSystemStoreCount = + registerSensorIfAbsent("bad_meta_system_store_count", new Gauge(badMetaSystemStoreValueSupplier.get())); + badPushStatusSystemStoreCount = + registerSensorIfAbsent("bad_push_status_system_store_count", new Gauge(badPushStatusStoreValueSupplier.get())); + } +} diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/systemstore/SystemStoreHealthCheckService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/systemstore/SystemStoreHealthCheckService.java new file mode 100644 index 0000000000..d959e47c07 --- /dev/null +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/systemstore/SystemStoreHealthCheckService.java @@ -0,0 +1,241 @@ +package com.linkedin.venice.controller.systemstore; + +import static java.lang.Thread.currentThread; + +import com.linkedin.venice.common.PushStatusStoreUtils; +import com.linkedin.venice.common.VeniceSystemStoreType; +import com.linkedin.venice.common.VeniceSystemStoreUtils; +import com.linkedin.venice.controller.stats.SystemStoreCheckStats; +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.meta.ReadWriteStoreRepository; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; +import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter; +import com.linkedin.venice.service.AbstractVeniceService; +import com.linkedin.venice.system.store.MetaStoreReader; +import com.linkedin.venice.system.store.MetaStoreWriter; +import com.linkedin.venice.utils.RetryUtils; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import io.tehuti.metrics.MetricsRepository; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * This class is used to schedule periodic check of user system stores in Venice cluster to make sure system stores has + * good version and is ingesting correctly. This service is expected to be setup in the leader controller of the cluster, + * and it will periodically scan all user system stores and collect bad system stores. Parent controller will hit the + * controller endpoint to collect information about bad system stores. + */ +public class SystemStoreHealthCheckService extends AbstractVeniceService { + public static final Logger LOGGER = LogManager.getLogger(SystemStoreHealthCheckService.class); + private final ReadWriteStoreRepository storeRepository; + private final PushStatusStoreReader pushStatusStoreReader; + private final PushStatusStoreWriter pushStatusStoreWriter; + private final MetaStoreReader metaStoreReader; + private final MetaStoreWriter metaStoreWriter; + private final AtomicLong badMetaStoreCount = new AtomicLong(0); + private final AtomicLong badPushStatusStoreCount = new AtomicLong(0); + + private final int checkPeriodInSeconds; + private final int heartbeatWaitTimeSeconds; + private final AtomicBoolean isRunning = new AtomicBoolean(false); + private final AtomicReference> unhealthySystemStoreSet = new AtomicReference<>(); + private final SystemStoreCheckStats systemStoreCheckStats; + private ScheduledExecutorService checkServiceExecutor; + + public SystemStoreHealthCheckService( + ReadWriteStoreRepository storeRepository, + MetricsRepository metricsRepository, + String clusterName, + MetaStoreReader metaStoreReader, + MetaStoreWriter metaStoreWriter, + PushStatusStoreReader pushStatusStoreReader, + PushStatusStoreWriter pushStatusStoreWriter, + int systemStoreCheckPeriodInSeconds, + int systemStoreHealthCheckHeartbeatWaitTimeSeconds) { + this.storeRepository = storeRepository; + this.metaStoreWriter = metaStoreWriter; + this.metaStoreReader = metaStoreReader; + this.pushStatusStoreWriter = pushStatusStoreWriter; + this.pushStatusStoreReader = pushStatusStoreReader; + this.checkPeriodInSeconds = systemStoreCheckPeriodInSeconds; + this.heartbeatWaitTimeSeconds = systemStoreHealthCheckHeartbeatWaitTimeSeconds; + this.unhealthySystemStoreSet.set(new HashSet<>()); + this.systemStoreCheckStats = + new SystemStoreCheckStats(metricsRepository, clusterName, badMetaStoreCount::get, badPushStatusStoreCount::get); + } + + /** + * Return unhealthy system store name set. This API is expected to be called by parent controller. + */ + public Set getUnhealthySystemStoreSet() { + return unhealthySystemStoreSet.get(); + } + + @Override + public boolean startInner() { + checkServiceExecutor = Executors.newScheduledThreadPool(1); + isRunning.set(true); + checkServiceExecutor.scheduleWithFixedDelay( + new SystemStoreHealthCheckTask(), + checkPeriodInSeconds, + checkPeriodInSeconds, + TimeUnit.SECONDS); + LOGGER.info("System store health check executor service is started."); + return true; + } + + @Override + public void stopInner() { + isRunning.set(false); + checkServiceExecutor.shutdownNow(); + try { + if (!checkServiceExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.warn("Current task in system store health check executor service is not terminated after 5 seconds."); + } + } catch (InterruptedException e) { + currentThread().interrupt(); + } + LOGGER.info("System store health check executor service is shutdown."); + } + + class SystemStoreHealthCheckTask implements Runnable { + @Override + public void run() { + if (!getIsRunning().get()) { + return; + } + Set newUnhealthySystemStoreSet = new HashSet<>(); + Map systemStoreToHeartbeatTimestampMap = new VeniceConcurrentHashMap<>(); + checkAndSendHeartbeatToSystemStores(newUnhealthySystemStoreSet, systemStoreToHeartbeatTimestampMap); + try { + // Sleep for enough time for system store to consume heartbeat messages. + Thread.sleep(TimeUnit.SECONDS.toMillis(heartbeatWaitTimeSeconds)); + } catch (InterruptedException e) { + LOGGER.info("Caught interrupted exception, will exit now."); + return; + } + + checkSystemStoreHeartbeat(newUnhealthySystemStoreSet, systemStoreToHeartbeatTimestampMap); + if (!getIsRunning().get()) { + return; + } + // Update the unhealthy system store set. + unhealthySystemStoreSet.set(newUnhealthySystemStoreSet); + long newBadMetaSystemStoreCount = newUnhealthySystemStoreSet.stream() + .filter(x -> VeniceSystemStoreType.getSystemStoreType(x).equals(VeniceSystemStoreType.META_STORE)) + .count(); + badMetaStoreCount.set(newBadMetaSystemStoreCount); + badPushStatusStoreCount.set(newUnhealthySystemStoreSet.size() - newBadMetaSystemStoreCount); + LOGGER.info("Collected unhealthy system stores: {}", newUnhealthySystemStoreSet.toString()); + } + } + + void checkAndSendHeartbeatToSystemStores( + Set newUnhealthySystemStoreSet, + Map systemStoreToHeartbeatTimestampMap) { + for (Store store: getStoreRepository().getAllStores()) { + if (!getIsRunning().get()) { + return; + } + // For user store, if corresponding system store flag is not true, it indicates system store is not created. + if (!VeniceSystemStoreUtils.isUserSystemStore(store.getName())) { + if (!store.isDaVinciPushStatusStoreEnabled()) { + newUnhealthySystemStoreSet + .add(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(store.getName())); + } + if (!store.isStoreMetaSystemStoreEnabled()) { + newUnhealthySystemStoreSet.add(VeniceSystemStoreType.META_STORE.getSystemStoreName(store.getName())); + } + continue; + } + /** + * System store does not have an online serving version. + */ + if (store.getCurrentVersion() == 0) { + newUnhealthySystemStoreSet.add(store.getName()); + continue; + } + VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(store.getName()); + String userStoreName = systemStoreType.extractRegularStoreName(store.getName()); + long currentTimestamp = System.currentTimeMillis(); + if (VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.equals(systemStoreType)) { + getPushStatusStoreWriter().writeHeartbeat(userStoreName, currentTimestamp); + } else { + getMetaStoreWriter().writeHeartbeat(userStoreName, currentTimestamp); + } + systemStoreToHeartbeatTimestampMap.put(store.getName(), currentTimestamp); + } + } + + void checkSystemStoreHeartbeat( + Set newUnhealthySystemStoreSet, + Map systemStoreToHeartbeatTimestampMap) { + for (Map.Entry entry: systemStoreToHeartbeatTimestampMap.entrySet()) { + if (!getIsRunning().get()) { + return; + } + if (!isSystemStoreIngesting(entry.getKey(), entry.getValue())) { + newUnhealthySystemStoreSet.add(entry.getKey()); + } + } + } + + boolean isSystemStoreIngesting(String systemStoreName, long heartbeatTimestamp) { + VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(systemStoreName); + String userStoreName = systemStoreType.extractRegularStoreName(systemStoreName); + try { + return RetryUtils.executeWithMaxRetriesAndFixedAttemptDuration(() -> { + long retrievedTimestamp; + if (systemStoreType == VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE) { + retrievedTimestamp = getPushStatusStoreReader() + .getHeartbeat(userStoreName, PushStatusStoreUtils.CONTROLLER_HEARTBEAT_INSTANCE_NAME); + } else { + retrievedTimestamp = getMetaStoreReader().getHeartbeat(userStoreName); + } + if (retrievedTimestamp < heartbeatTimestamp) { + throw new VeniceException("Heartbeat not refreshed."); + } + return true; + }, 3, Duration.ofSeconds(1), Collections.singletonList(VeniceException.class)); + } catch (VeniceException e) { + return false; + } + } + + MetaStoreReader getMetaStoreReader() { + return metaStoreReader; + } + + PushStatusStoreReader getPushStatusStoreReader() { + return pushStatusStoreReader; + } + + MetaStoreWriter getMetaStoreWriter() { + return metaStoreWriter; + } + + PushStatusStoreWriter getPushStatusStoreWriter() { + return pushStatusStoreWriter; + } + + AtomicBoolean getIsRunning() { + return isRunning; + } + + ReadWriteStoreRepository getStoreRepository() { + return storeRepository; + } +} diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/systemstore/TestSystemStoreHealthCheckService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/systemstore/TestSystemStoreHealthCheckService.java new file mode 100644 index 0000000000..65d07cdb35 --- /dev/null +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/systemstore/TestSystemStoreHealthCheckService.java @@ -0,0 +1,165 @@ +package com.linkedin.venice.controller.systemstore; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.linkedin.venice.common.VeniceSystemStoreType; +import com.linkedin.venice.meta.ReadWriteStoreRepository; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; +import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter; +import com.linkedin.venice.system.store.MetaStoreReader; +import com.linkedin.venice.system.store.MetaStoreWriter; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class TestSystemStoreHealthCheckService { + @Test + public void testCheckHeartbeat() { + MetaStoreReader metaStoreReader = mock(MetaStoreReader.class); + PushStatusStoreReader pushStatusStoreReader = mock(PushStatusStoreReader.class); + SystemStoreHealthCheckService systemStoreHealthCheckService = mock(SystemStoreHealthCheckService.class); + when(systemStoreHealthCheckService.getMetaStoreReader()).thenReturn(metaStoreReader); + when(systemStoreHealthCheckService.getPushStatusStoreReader()).thenReturn(pushStatusStoreReader); + when(systemStoreHealthCheckService.isSystemStoreIngesting(anyString(), anyLong())).thenCallRealMethod(); + + when(pushStatusStoreReader.getHeartbeat(anyString(), anyString())).thenReturn(1L, 1L, 10L); + when(metaStoreReader.getHeartbeat(anyString())).thenReturn(1L, 1L, 10L); + String userStoreName = "testStore"; + String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(userStoreName); + // Eventually should succeed. + Assert.assertTrue(systemStoreHealthCheckService.isSystemStoreIngesting(metaStoreName, 10L)); + // Eventually should fail. + Assert.assertFalse(systemStoreHealthCheckService.isSystemStoreIngesting(metaStoreName, 11L)); + + String pushStatusStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(userStoreName); + Assert.assertTrue(systemStoreHealthCheckService.isSystemStoreIngesting(pushStatusStoreName, 10L)); + Assert.assertFalse(systemStoreHealthCheckService.isSystemStoreIngesting(pushStatusStoreName, 11L)); + + } + + @Test + void testCheckSystemStoreHeartbeat() { + AtomicBoolean isRunning = new AtomicBoolean(false); + MetaStoreReader metaStoreReader = mock(MetaStoreReader.class); + PushStatusStoreReader pushStatusStoreReader = mock(PushStatusStoreReader.class); + SystemStoreHealthCheckService systemStoreHealthCheckService = mock(SystemStoreHealthCheckService.class); + when(systemStoreHealthCheckService.getMetaStoreReader()).thenReturn(metaStoreReader); + when(systemStoreHealthCheckService.getPushStatusStoreReader()).thenReturn(pushStatusStoreReader); + when(systemStoreHealthCheckService.isSystemStoreIngesting(anyString(), anyLong())).thenReturn(true, false); + when(systemStoreHealthCheckService.getIsRunning()).thenReturn(isRunning); + Set newUnhealthySystemStoreSet = new HashSet<>(); + Map systemStoreToHeartbeatTimestampMap = new VeniceConcurrentHashMap<>(); + systemStoreToHeartbeatTimestampMap.put(VeniceSystemStoreType.META_STORE.getSystemStoreName("test_store"), 1L); + systemStoreToHeartbeatTimestampMap + .put(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName("test_store"), 1L); + doCallRealMethod().when(systemStoreHealthCheckService) + .checkSystemStoreHeartbeat(newUnhealthySystemStoreSet, systemStoreToHeartbeatTimestampMap); + systemStoreHealthCheckService + .checkSystemStoreHeartbeat(newUnhealthySystemStoreSet, systemStoreToHeartbeatTimestampMap); + Assert.assertTrue(newUnhealthySystemStoreSet.isEmpty()); + verify(systemStoreHealthCheckService, times(0)).isSystemStoreIngesting(anyString(), anyLong()); + isRunning.set(true); + systemStoreHealthCheckService + .checkSystemStoreHeartbeat(newUnhealthySystemStoreSet, systemStoreToHeartbeatTimestampMap); + Assert.assertEquals(newUnhealthySystemStoreSet.size(), 1); + Assert.assertTrue( + newUnhealthySystemStoreSet + .contains(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName("test_store"))); + verify(systemStoreHealthCheckService, times(2)).isSystemStoreIngesting(anyString(), anyLong()); + + } + + @Test + public void testSendHeartbeat() { + MetaStoreWriter metaStoreWriter = mock(MetaStoreWriter.class); + PushStatusStoreWriter pushStatusStoreWriter = mock(PushStatusStoreWriter.class); + ReadWriteStoreRepository storeRepository = mock(ReadWriteStoreRepository.class); + String testStore1 = "test_store_1"; + Store userStore1 = mock(Store.class); + when(userStore1.getName()).thenReturn(testStore1); + when(userStore1.isStoreMetaSystemStoreEnabled()).thenReturn(true); + when(userStore1.isDaVinciPushStatusStoreEnabled()).thenReturn(true); + Store metaStore1 = mock(Store.class); + when(metaStore1.getName()).thenReturn(VeniceSystemStoreType.META_STORE.getSystemStoreName(testStore1)); + when(metaStore1.getCurrentVersion()).thenReturn(0); + Store pushStatusStore1 = mock(Store.class); + when(pushStatusStore1.getName()) + .thenReturn(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(testStore1)); + when(pushStatusStore1.getCurrentVersion()).thenReturn(1); + + String testStore2 = "test_store_2"; + Store userStore2 = mock(Store.class); + when(userStore2.getName()).thenReturn(testStore2); + when(userStore2.isStoreMetaSystemStoreEnabled()).thenReturn(true); + when(userStore2.isDaVinciPushStatusStoreEnabled()).thenReturn(true); + Store metaStore2 = mock(Store.class); + when(metaStore2.getName()).thenReturn(VeniceSystemStoreType.META_STORE.getSystemStoreName(testStore2)); + when(metaStore2.getCurrentVersion()).thenReturn(1); + Store pushStatusStore2 = mock(Store.class); + when(pushStatusStore2.getName()) + .thenReturn(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(testStore2)); + when(pushStatusStore2.getCurrentVersion()).thenReturn(0); + + String testStore3 = "test_store_3"; + Store userStore3 = mock(Store.class); + when(userStore3.getName()).thenReturn(testStore3); + when(userStore3.isStoreMetaSystemStoreEnabled()).thenReturn(false); + when(userStore3.isDaVinciPushStatusStoreEnabled()).thenReturn(false); + + when(storeRepository.getAllStores()).thenReturn( + Arrays.asList(metaStore1, pushStatusStore1, userStore1, metaStore2, pushStatusStore2, userStore2, userStore3)); + AtomicBoolean isRunning = new AtomicBoolean(false); + + SystemStoreHealthCheckService systemStoreHealthCheckService = mock(SystemStoreHealthCheckService.class); + when(systemStoreHealthCheckService.getMetaStoreWriter()).thenReturn(metaStoreWriter); + when(systemStoreHealthCheckService.getPushStatusStoreWriter()).thenReturn(pushStatusStoreWriter); + when(systemStoreHealthCheckService.getStoreRepository()).thenReturn(storeRepository); + doCallRealMethod().when(systemStoreHealthCheckService).checkAndSendHeartbeatToSystemStores(anySet(), anyMap()); + when(systemStoreHealthCheckService.getIsRunning()).thenReturn(isRunning); + + Set newUnhealthySystemStoreSet = new HashSet<>(); + Map systemStoreToHeartbeatTimestampMap = new VeniceConcurrentHashMap<>(); + + systemStoreHealthCheckService + .checkAndSendHeartbeatToSystemStores(newUnhealthySystemStoreSet, systemStoreToHeartbeatTimestampMap); + Assert.assertTrue(newUnhealthySystemStoreSet.isEmpty()); + Assert.assertTrue(systemStoreToHeartbeatTimestampMap.isEmpty()); + + isRunning.set(true); + systemStoreHealthCheckService + .checkAndSendHeartbeatToSystemStores(newUnhealthySystemStoreSet, systemStoreToHeartbeatTimestampMap); + Assert.assertEquals(newUnhealthySystemStoreSet.size(), 4); + Assert.assertTrue( + newUnhealthySystemStoreSet.contains(VeniceSystemStoreType.META_STORE.getSystemStoreName(testStore1))); + Assert.assertTrue( + newUnhealthySystemStoreSet + .contains(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(testStore2))); + Assert.assertTrue( + newUnhealthySystemStoreSet.contains(VeniceSystemStoreType.META_STORE.getSystemStoreName(testStore3))); + Assert.assertTrue( + newUnhealthySystemStoreSet + .contains(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(testStore3))); + + Assert.assertEquals(systemStoreToHeartbeatTimestampMap.size(), 2); + Assert.assertNotNull( + systemStoreToHeartbeatTimestampMap + .get(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(testStore1))); + Assert.assertNotNull( + systemStoreToHeartbeatTimestampMap.get(VeniceSystemStoreType.META_STORE.getSystemStoreName(testStore2))); + } +}