Skip to content

Commit

Permalink
[controller] Add system store health check service in child controller (
Browse files Browse the repository at this point in the history
#606)

Add system store health check service in child controller, it is controlled by newly added configs:
"controller.system.store.health.check.enabled" and "controller.system.store.health.check.interval" by default 1h.
This PR is the first part of the change, the follow up change is to add parent controller support to query and repair system store.
  • Loading branch information
sixpluszero authored Aug 29, 2023
1 parent 53e1ce6 commit b65db20
Show file tree
Hide file tree
Showing 18 changed files with 751 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,25 @@ private ConfigKeys() {
public static final String CONTROLLER_PARENT_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED =
"controller.parent.external.superset.schema.generation.enabled";

/**
* Whether to check system store health in child controller. Default is false.
*/
public static final String CONTROLLER_SYSTEM_STORE_HEALTH_CHECK_ENABLED =
"controller.system.store.health.check.enabled";

/**
* Frequency to run system store health check in child controller. Default is 1h.
*/
public static final String CONTROLLER_SYSTEM_STORE_HEALTH_CHECK_INTERVAL_SECONDS =
"controller.system.store.health.check.interval.seconds";

/**
* The wait time before validating system store heartbeat during system store health check in child controller.
* Default is 1min.
*/
public static final String CONTROLLER_SYSTEM_STORE_HEALTH_CHECK_HEARTBEAT_WAIT_TIME_SECONDS =
"controller.system.store.health.check.heartbeat.wait.time.seconds";

/**
* Whether to initialize system schemas when controller starts. Default is true.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public enum PushStatusKeyType {

public static final String SERVER_INCREMENTAL_PUSH_PREFIX = "SERVER_SIDE_INCREMENTAL_PUSH_STATUS";
public static final String ONGOING_INCREMENTAL_PUSH_STATUSES_KEY = "ONGOING_INCREMENTAL_PUSHES";
// This constant is the heartbeat key name for controller to check system store ingestion health.
public static final String CONTROLLER_HEARTBEAT_INSTANCE_NAME = "CONTROLLER_HEARTBEAT";

public static PushStatusKey getHeartbeatKey(String instanceName) {
PushStatusKey pushStatusKey = new PushStatusKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@


/**
* PushStatusStoreReader is a helper class for Venice controller reading PushStatus/Heartbeat messages.
* One PushStatusStoreReader for one regular Venice store.
* Don't keep a map of [storeName->client] to minimize states kept by controller.
* This class is a helper class for Venice controller to read PushStatus / Heartbeat messages.
*/
public class PushStatusStoreReader implements Closeable {
private static final Logger LOGGER = LogManager.getLogger(PushStatusStoreReader.class);
private static final int DEFAULT_HEARTBEAT_READ_TIMEOUT_SECONDS = 3;
private final Map<String, AvroSpecificStoreClient<PushStatusKey, PushStatusValue>> veniceClients =
new VeniceConcurrentHashMap<>();
private final D2Client d2Client;
Expand Down Expand Up @@ -195,7 +194,8 @@ public long getHeartbeat(String storeName, String instanceName) {
AvroSpecificStoreClient<PushStatusKey, PushStatusValue> client = getVeniceClient(storeName);
PushStatusKey pushStatusKey = PushStatusStoreUtils.getHeartbeatKey(instanceName);
try {
PushStatusValue pushStatusValue = client.get(pushStatusKey).get();
PushStatusValue pushStatusValue =
client.get(pushStatusKey).get(DEFAULT_HEARTBEAT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (pushStatusValue == null) {
return 0;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,22 @@ public PushStatusStoreWriter(VeniceWriterFactory writerFactory, String instanceN
this.derivedSchemaId = derivedSchemaId;
}

public void writeHeartbeat(String storeName) {
public void writeHeartbeat(String storeName, long heartbeat) {
VeniceWriter writer = veniceWriterCache.prepareVeniceWriter(storeName);
PushStatusKey pushStatusKey = PushStatusStoreUtils.getHeartbeatKey(instanceName);
PushStatusValue pushStatusValue = new PushStatusValue();
pushStatusValue.reportTimestamp = System.currentTimeMillis();
pushStatusValue.reportTimestamp = heartbeat;
pushStatusValue.instances = Collections.emptyMap();
LOGGER.info("Sending heartbeat of {}", instanceName);
writer.put(
pushStatusKey,
pushStatusValue,
AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion());

}

public void writeHeartbeat(String storeName) {
writeHeartbeat(storeName, System.currentTimeMillis());
}

public void writePushStatus(String storeName, int version, int partitionId, ExecutionStatus status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public enum MetaStoreDataType implements VeniceEnumValue {
Arrays.asList(KEY_STRING_STORE_NAME, KEY_STRING_CLUSTER_NAME, KEY_STRING_VERSION_NUMBER, KEY_STRING_PARTITION_ID)
), STORE_CLUSTER_CONFIG(4, Collections.singletonList(KEY_STRING_STORE_NAME)),
STORE_VALUE_SCHEMA(5, Arrays.asList(KEY_STRING_STORE_NAME, KEY_STRING_SCHEMA_ID)),
VALUE_SCHEMAS_WRITTEN_PER_STORE_VERSION(6, Arrays.asList(KEY_STRING_STORE_NAME, KEY_STRING_VERSION_NUMBER));
VALUE_SCHEMAS_WRITTEN_PER_STORE_VERSION(6, Arrays.asList(KEY_STRING_STORE_NAME, KEY_STRING_VERSION_NUMBER)),
HEARTBEAT(7, Collections.singletonList(KEY_STRING_STORE_NAME));

private static final MetaStoreDataType[] TYPES_ARRAY = EnumUtils.getEnumValuesArray(MetaStoreDataType.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.linkedin.venice.system.store;

import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.io.Closeable;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class MetaStoreReader implements Closeable {
private static final Logger LOGGER = LogManager.getLogger(MetaStoreReader.class);
private static final int DEFAULT_HEARTBEAT_READ_TIMEOUT_SECONDS = 3;
private final Map<String, AvroSpecificStoreClient<StoreMetaKey, StoreMetaValue>> veniceClients =
new VeniceConcurrentHashMap<>();
private final D2Client d2Client;
private final String clusterDiscoveryD2ServiceName;

public MetaStoreReader(D2Client d2Client, String clusterDiscoveryD2ServiceName) {
this.d2Client = d2Client;
this.clusterDiscoveryD2ServiceName = clusterDiscoveryD2ServiceName;
}

public long getHeartbeat(String storeName) {
AvroSpecificStoreClient<StoreMetaKey, StoreMetaValue> client = getVeniceClient(storeName);
StoreMetaKey key =
MetaStoreDataType.HEARTBEAT.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName));
try {
StoreMetaValue value = client.get(key).get(DEFAULT_HEARTBEAT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (value == null) {
return 0;
} else {
return value.timestamp;
}
} catch (Exception e) {
throw new VeniceException(e);
}
}

AvroSpecificStoreClient<StoreMetaKey, StoreMetaValue> getVeniceClient(String storeName) {
return veniceClients.computeIfAbsent(storeName, (s) -> {
ClientConfig clientConfig =
ClientConfig.defaultGenericClientConfig(VeniceSystemStoreUtils.getMetaStoreName(storeName))
.setD2Client(d2Client)
.setD2ServiceName(clusterDiscoveryD2ServiceName)
.setSpecificValueClass(StoreMetaValue.class);
return ClientFactory.getAndStartSpecificAvroClient(clientConfig);
});
}

@Override
public void close() {
veniceClients.forEach((storeName, veniceClient) -> {
try {
veniceClient.close();
} catch (Exception e) {
LOGGER.error("Can not close VeniceClient.", e);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -146,6 +147,15 @@ public void writeStoreValueSchemas(String storeName, Collection<SchemaEntry> val
});
}

public void writeHeartbeat(String storeName, long heartbeatTimestamp) {
write(
storeName,
MetaStoreDataType.HEARTBEAT,
() -> Collections.singletonMap(KEY_STRING_STORE_NAME, storeName),
StoreMetaValue::new,
heartbeatTimestamp);
}

/**
* Improved version of writeStoreValueSchemas. Instead of writing all value schemas into one K/V pair we write it to
* a different key space where each K/V pair only represents one version of the value schema. This allows us to store
Expand Down Expand Up @@ -330,13 +340,22 @@ private void write(
MetaStoreDataType dataType,
Supplier<Map<String, String>> keyStringSupplier,
Supplier<StoreMetaValue> valueSupplier) {
write(storeName, dataType, keyStringSupplier, valueSupplier, System.currentTimeMillis());
}

private void write(
String storeName,
MetaStoreDataType dataType,
Supplier<Map<String, String>> keyStringSupplier,
Supplier<StoreMetaValue> valueSupplier,
long timestamp) {
String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName);
StoreMetaKey key = dataType.getStoreMetaKey(keyStringSupplier.get());
StoreMetaValue value = valueSupplier.get();
value.timestamp = System.currentTimeMillis();
writeMessageWithRetry(metaStoreName, vw -> {
vw.put(key, value, AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.currentProtocolVersion.get());
});
value.timestamp = timestamp;
writeMessageWithRetry(
metaStoreName,
vw -> vw.put(key, value, AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.currentProtocolVersion.get()));
}

private void update(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.linkedin.venice.system.store;

import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class MetaStoreReaderTest {
private static final String CLUSTER_DISCOVERY_D2_SERVICE_NAME =
ClientConfig.DEFAULT_CLUSTER_DISCOVERY_D2_SERVICE_NAME + "_test";

private D2Client d2ClientMock;
private AvroSpecificStoreClient<StoreMetaKey, StoreMetaValue> storeClientMock;
private final static String storeName = "venice-test-meta-store";

@BeforeMethod
public void setUp() {
d2ClientMock = mock(D2Client.class);
storeClientMock = mock(AvroSpecificStoreClient.class);
}

@Test()
public void testGetHeartbeatFromMetaStore() throws ExecutionException, InterruptedException, TimeoutException {
StoreMetaKey storeMetaKey =
MetaStoreDataType.HEARTBEAT.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName));
StoreMetaValue storeMetaValue = new StoreMetaValue();
storeMetaValue.timestamp = 123L;

MetaStoreReader storeReaderSpy = spy(new MetaStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME));
CompletableFuture<StoreMetaValue> completableFutureMock = mock(CompletableFuture.class);

doReturn(storeClientMock).when(storeReaderSpy).getVeniceClient(any());
when(storeClientMock.get(storeMetaKey)).thenReturn(completableFutureMock);
when(completableFutureMock.get(anyLong(), any())).thenReturn(storeMetaValue);

Assert.assertEquals(storeReaderSpy.getHeartbeat(storeName), 123L);
verify(completableFutureMock).get(anyLong(), any());
verify(storeClientMock).get(storeMetaKey);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.system.store;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
Expand All @@ -9,8 +10,13 @@
import static org.mockito.Mockito.when;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
import com.linkedin.venice.writer.VeniceWriter;
import java.util.Collections;
import java.util.concurrent.locks.ReentrantLock;
import org.mockito.ArgumentCaptor;
import org.testng.Assert;
import org.testng.annotations.Test;


Expand All @@ -31,4 +37,32 @@ public void testMetaStoreWriterWillRestartUponProduceFailure() {
verify(badWriter, times(1)).delete(any(), any());
verify(goodWriter, times(1)).delete(any(), any());
}

@Test
public void testMetaStoreWriterSendHeartbeatMessage() {
// Mock
MetaStoreWriter metaStoreWriter = mock(MetaStoreWriter.class);
ReentrantLock reentrantLock = new ReentrantLock();
String metaStoreName = "testStore";
when(metaStoreWriter.getOrCreateMetaStoreWriterLock(anyString())).thenReturn(reentrantLock);
VeniceWriter goodWriter = mock(VeniceWriter.class);
when(metaStoreWriter.getOrCreateMetaStoreWriter(anyString())).thenReturn(goodWriter);
doCallRealMethod().when(metaStoreWriter).writeHeartbeat(anyString(), anyLong());
doCallRealMethod().when(metaStoreWriter).writeMessageWithRetry(anyString(), any());
long timestamp = 123L;

// Action
metaStoreWriter.writeHeartbeat(metaStoreName, timestamp);
ArgumentCaptor<StoreMetaKey> keyArgumentCaptor = ArgumentCaptor.forClass(StoreMetaKey.class);
ArgumentCaptor<StoreMetaValue> valueArgumentCaptor = ArgumentCaptor.forClass(StoreMetaValue.class);
ArgumentCaptor<Integer> schemaArgumentCaptor = ArgumentCaptor.forClass(Integer.class);
verify(goodWriter).put(keyArgumentCaptor.capture(), valueArgumentCaptor.capture(), schemaArgumentCaptor.capture());

// Assertion
StoreMetaKey capturedKey = keyArgumentCaptor.getValue();
Assert.assertEquals(capturedKey.keyStrings, Collections.singletonList(metaStoreName));
Assert.assertEquals(capturedKey.metadataType, MetaStoreDataType.HEARTBEAT.getValue());
StoreMetaValue capturedValue = valueArgumentCaptor.getValue();
Assert.assertEquals(capturedValue.timestamp, timestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushstatushelper.PushStatusStoreReader;
import com.linkedin.venice.pushstatushelper.PushStatusStoreRecordDeleter;
import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter;
import com.linkedin.venice.schema.GeneratedSchemaID;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.avro.DirectionalSchemaCompatibilityType;
Expand All @@ -38,6 +39,7 @@
import com.linkedin.venice.status.protocol.BatchJobHeartbeatValue;
import com.linkedin.venice.status.protocol.PushJobDetails;
import com.linkedin.venice.status.protocol.PushJobStatusRecordKey;
import com.linkedin.venice.system.store.MetaStoreReader;
import com.linkedin.venice.system.store.MetaStoreWriter;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.VeniceProperties;
Expand Down Expand Up @@ -743,6 +745,8 @@ void updateRoutersClusterConfig(
*/
MetaStoreWriter getMetaStoreWriter();

MetaStoreReader getMetaStoreReader();

/**
* Return {@link PushStatusStoreRecordDeleter}.
*/
Expand Down Expand Up @@ -943,4 +947,6 @@ default void clearInstanceMonitor(String clusterName) {
}

Optional<PushStatusStoreReader> getPushStatusStoreReader();

Optional<PushStatusStoreWriter> getPushStatusStoreWriter();
}
Loading

0 comments on commit b65db20

Please sign in to comment.