Skip to content

Commit

Permalink
[controller] Enable backend to do dual reads on DVC incremental push …
Browse files Browse the repository at this point in the history
…status (#1219)

* [controller] Enable backend to do dual reads on DVC incremental push status

The controller will first try to read from the version-level key for DVC
incremental push status; if the version-level key exists and the overall
status is END_OF_INCREMENTAL_PUSH, the controller will still check
the partition-level key and combine the results from both keys,
in case the DVC fleet is only partially upgraded.

During partition level push status analysis, the instances that are
reporting version level keys will be ignored.

Other changes:
Add a few more logs for each job status polling request to
help us further confirm that the version-level keys are working
well, in addition to the signal of successful push jobs.
  • Loading branch information
huangminchn authored Oct 18, 2024
1 parent 2bab676 commit 911f9be
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public static PushStatusKey getPushKey(int version) {
return getFullPushKey(version);
}

/**
 * Builds the version-level push status key for the given store version.
 *
 * @param version store version number
 * @param incrementalPushVersion if present, the key addresses the incremental push identified by
 *        this version string; if absent, the key addresses the full (batch) push
 * @return a version-level {@link PushStatusKey} for either the incremental or the full push
 */
public static PushStatusKey getPushKey(int version, Optional<String> incrementalPushVersion) {
  if (incrementalPushVersion.isPresent()) {
    return getIncrementalPushKey(version, incrementalPushVersion.get());
  }
  return getFullPushKey(version);
}

/**
 * Builds a partition-level push status key with no server-incremental-push flag.
 * Convenience overload that delegates to the four-argument variant with
 * {@code Optional.empty()} for the last parameter.
 *
 * @param version store version number
 * @param partitionId partition whose push status the key addresses
 * @param incrementalPushVersion if present, the key addresses that incremental push;
 *        if absent, the full (batch) push
 * @return a partition-level {@link PushStatusKey}
 */
public static PushStatusKey getPushKey(int version, int partitionId, Optional<String> incrementalPushVersion) {
return getPushKey(version, partitionId, incrementalPushVersion, Optional.empty());
}
Expand Down Expand Up @@ -66,6 +70,13 @@ private static PushStatusKey getFullPushKey(int version, int partitionId) {
return pushStatusKey;
}

/**
 * Builds the version-level key for an incremental push: the key strings carry the
 * store version and the incremental push version, and the message type is
 * {@link PushStatusKeyType#INCREMENTAL_PUSH}.
 *
 * @param version store version number
 * @param incrementalPushVersion identifier of the incremental push
 * @return the version-level incremental push {@link PushStatusKey}
 */
private static PushStatusKey getIncrementalPushKey(int version, String incrementalPushVersion) {
  final PushStatusKey key = new PushStatusKey();
  key.messageType = PushStatusKeyType.INCREMENTAL_PUSH.ordinal();
  key.keyStrings = Arrays.asList(version, incrementalPushVersion);
  return key;
}

private static PushStatusKey getIncrementalPushKey(int version, int partitionId, String incrementalPushVersion) {
PushStatusKey pushStatusKey = new PushStatusKey();
pushStatusKey.keyStrings = Arrays.asList(version, partitionId, incrementalPushVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,12 @@ public PushStatusStoreReader(
this.heartbeatExpirationTimeInSeconds = heartbeatExpirationTimeInSeconds;
}

public Map<CharSequence, Integer> getVersionStatus(String storeName, int version) {
public Map<CharSequence, Integer> getVersionStatus(
String storeName,
int version,
Optional<String> incrementalPushVersion) {
AvroSpecificStoreClient<PushStatusKey, PushStatusValue> client = getVeniceClient(storeName);
PushStatusKey pushStatusKey = PushStatusStoreUtils.getPushKey(version);
PushStatusKey pushStatusKey = PushStatusStoreUtils.getPushKey(version, incrementalPushVersion);
try {
PushStatusValue pushStatusValue = client.get(pushStatusKey).get(60, TimeUnit.SECONDS);
if (pushStatusValue == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -365,7 +366,7 @@ public void testNullResponseWhenVersionLevelKeyIsNotWritten()
when(completableFutureMock.get(anyLong(), any())).thenReturn(null);

// Test that push status store reader will also return null instead of empty map in this case
Assert.assertNull(storeReaderSpy.getVersionStatus(storeName, storeVersion));
Assert.assertNull(storeReaderSpy.getVersionStatus(storeName, storeVersion, Optional.empty()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushstatushelper.PushStatusStoreReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -60,10 +61,8 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
String storeName = Version.parseStoreFromKafkaTopicName(topicName);
int version = Version.parseVersionFromVersionTopicName(topicName);
Map<CharSequence, Integer> instances = null;
if (!incrementalPushVersion.isPresent()) {
// For batch pushes, try to read from version level status key first.
instances = reader.getVersionStatus(storeName, version);
}
// Try to read from version level status key first.
instances = reader.getVersionStatus(storeName, version, incrementalPushVersion);
if (instances == null) {
// Fallback to partition level status key if version level status key is not found.
return getDaVinciPartitionLevelPushStatusAndDetails(
Expand All @@ -73,13 +72,16 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
incrementalPushVersion,
maxOfflineInstanceCount,
maxOfflineInstanceRatio,
useDaVinciSpecificExecutionStatusForError);
useDaVinciSpecificExecutionStatusForError,
Collections.EMPTY_SET);
} else {
// DaVinci starts using new status key format, which contains status for all partitions in one key.
// Only batch pushes will use this key; incremental pushes will still use partition level status key.
LOGGER.info("Getting Da Vinci version level push status for topic: {}", topicName);
LOGGER.info("Got Da Vinci version level push status for topic: {}", topicName);
final int totalInstanceCount = instances.size();
ExecutionStatus completeStatus = ExecutionStatus.COMPLETED;
ExecutionStatus completeStatus = incrementalPushVersion.isPresent()
? ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED
: ExecutionStatus.COMPLETED;
int completedInstanceCount = 0;
boolean allInstancesCompleted = true;
int liveInstanceCount = 0;
Expand Down Expand Up @@ -170,16 +172,32 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
}
String statusDetail = statusDetailStringBuilder.toString();
if (allInstancesCompleted) {
LOGGER.info(
"All {} live Da Vinci instances are at {} state for topic: {}, and there are {} offline instances based on version level push status.",
liveInstanceCount,
completeStatus,
topicName,
offlineInstanceCount);
// To cover edge case that some instances are not upgraded to new config yet:
// In case Da Vinci instances are partially upgraded to the release that produces version level status key,
// we should always try to query the partition level status key for the old instances.
// To cover edge case that some instances finish upgrades recently:
// Besides, for instances that start reporting version level status key, they might have reported partition
// level status key before, and we should also ignore the partition level status key for them.
ExecutionStatusWithDetails partitionLevelStatus = getDaVinciPartitionLevelPushStatusAndDetails(
reader,
topicName,
partitionCount,
incrementalPushVersion,
maxOfflineInstanceCount,
maxOfflineInstanceRatio,
useDaVinciSpecificExecutionStatusForError);
useDaVinciSpecificExecutionStatusForError,
instances.keySet());
LOGGER.info(
"Always query partition level status for topic: {} after version level status key is found."
+ " Push status result from partition level key: {}",
topicName,
partitionLevelStatus.getStatus());
if (partitionLevelStatus.getStatus() != ExecutionStatus.COMPLETED) {
// Do not report COMPLETED, instead, report status from the partition level status key.
statusDetailStringBuilder.append(
Expand Down Expand Up @@ -215,7 +233,8 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe
Optional<String> incrementalPushVersion,
int maxOfflineInstanceCount,
double maxOfflineInstanceRatio,
boolean useDaVinciSpecificExecutionStatusForError) {
boolean useDaVinciSpecificExecutionStatusForError,
Set<CharSequence> instancesToIgnore) {
if (reader == null) {
throw new VeniceException("PushStatusStoreReader is null");
}
Expand Down Expand Up @@ -249,6 +268,17 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe
boolean allInstancesCompleted = true;
totalReplicaCount += instances.size();
for (Map.Entry<CharSequence, Integer> entry: instances.entrySet()) {
// Ignore the instance that are in the ignore set
if (instancesToIgnore.contains(entry.getKey())) {
totalReplicaCount--;
// Log about this decision
LOGGER.debug(
"Skipping ingestion status report from instance: {} for topic: {}, partition: {}",
entry.getKey().toString(),
topicName,
partitionId);
continue;
}
String instanceName = entry.getKey().toString();
PushStatusStoreReader.InstanceStatus instanceStatus = instanceLivenessCache
.computeIfAbsent(instanceName, ignored -> reader.getInstanceStatus(storeName, instanceName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public void testCompleteStatusCanBeReportedWithOfflineInstancesBelowFailFastThre
doReturn(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING).when(reader).getInstanceStatus(eq("store"), eq("h"));

Map<CharSequence, Integer> map = new HashMap<>();
/**
* 2 is STARTED state, 10 is COMPLETED state.
* Reference: {@link ExecutionStatus}
*/
map.put("a", 2);
map.put("b", 10);
map.put("c", 10);
Expand All @@ -49,17 +53,84 @@ public void testCompleteStatusCanBeReportedWithOfflineInstancesBelowFailFastThre
map.put("h", 2);

// Test partition level key first
doReturn(null).when(reader).getVersionStatus("store", 1);
doReturn(null).when(reader).getVersionStatus("store", 1, Optional.empty());
doReturn(map).when(reader).getPartitionStatus("store", 1, 0, Optional.empty());

// 1 offline instances is below the fail fast threshold, the overall DaVinci status can be COMPLETED.
validatePushStatus(reader, "store_v1", 2, 0.25, ExecutionStatus.COMPLETED);
validatePushStatus(reader, "store_v1", Optional.empty(), 2, 0.25, ExecutionStatus.COMPLETED);

// Test version level key
doReturn(map).when(reader).getVersionStatus("store", 1);
doReturn(map).when(reader).getVersionStatus("store", 1, Optional.empty());
doReturn(Collections.emptyMap()).when(reader).getPartitionStatus("store", 1, 0, Optional.empty());
// 1 offline instances is below the fail fast threshold, the overall DaVinci status can be COMPLETED.
validatePushStatus(reader, "store_v1", 2, 0.25, ExecutionStatus.COMPLETED);
validatePushStatus(reader, "store_v1", Optional.empty(), 2, 0.25, ExecutionStatus.COMPLETED);

// Test migration case: node "b" and "c" were reporting partition status at STARTED state, but later it completed
// and reported
// version level key status as COMPLETED.
Map<CharSequence, Integer> retiredStatusMap = new HashMap<>();
retiredStatusMap.put("b", 2);
retiredStatusMap.put("c", 2);
doReturn(retiredStatusMap).when(reader).getPartitionStatus("store", 1, 0, Optional.empty());
validatePushStatus(reader, "store_v1", Optional.empty(), 2, 0.25, ExecutionStatus.COMPLETED);

// Test partition level key for incremental push
String incrementalPushVersion = "incrementalPushVersion";
Map<CharSequence, Integer> incrementalPushStatusMap = new HashMap<>();
/**
* 7 is START_OF_INCREMENTAL_PUSH_RECEIVED state, 8 is END_OF_INCREMENTAL_PUSH_RECEIVED state.
* Reference: {@link ExecutionStatus}
*/
incrementalPushStatusMap.put("a", 7);
incrementalPushStatusMap.put("b", 8);
incrementalPushStatusMap.put("c", 8);
incrementalPushStatusMap.put("d", 8);
incrementalPushStatusMap.put("e", 7);
incrementalPushStatusMap.put("f", 7);
incrementalPushStatusMap.put("g", 7);
incrementalPushStatusMap.put("h", 7);

doReturn(null).when(reader).getVersionStatus("store", 1, Optional.of(incrementalPushVersion));
doReturn(incrementalPushStatusMap).when(reader)
.getPartitionStatus("store", 1, 0, Optional.of(incrementalPushVersion));
// 1 offline instances is below the fail fast threshold, the overall DaVinci status can be
// END_OF_INCREMENTAL_PUSH_RECEIVED.
validatePushStatus(
reader,
"store_v1",
Optional.of(incrementalPushVersion),
2,
0.25,
ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);

// Test version level key for incremental push
doReturn(incrementalPushStatusMap).when(reader).getVersionStatus("store", 1, Optional.of(incrementalPushVersion));
doReturn(Collections.emptyMap()).when(reader)
.getPartitionStatus("store", 1, 0, Optional.of(incrementalPushVersion));
// 1 offline instances is below the fail fast threshold, the overall DaVinci status can be
// END_OF_INCREMENTAL_PUSH_RECEIVED.
validatePushStatus(
reader,
"store_v1",
Optional.of(incrementalPushVersion),
2,
0.25,
ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);

// Test migration case: node "b" and "c" were reporting partition status at START_OF_INCREMENTAL_PUSH_RECEIVED
// state, but later it completed and reported version level key status as END_OF_INCREMENTAL_PUSH_RECEIVED.
Map<CharSequence, Integer> retiredIncrementalPushStatusMap = new HashMap<>();
retiredIncrementalPushStatusMap.put("b", 7);
retiredIncrementalPushStatusMap.put("c", 8);
doReturn(retiredIncrementalPushStatusMap).when(reader)
.getPartitionStatus("store", 1, 0, Optional.of(incrementalPushVersion));
validatePushStatus(
reader,
"store_v1",
Optional.of(incrementalPushVersion),
2,
0.25,
ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);
}

@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
Expand All @@ -76,8 +147,8 @@ public void testDaVinciPushStatusScan(boolean useDaVinciSpecificExecutionStatusF
map.put("b", 3);
map.put("c", 3);
map.put("d", 10);
doReturn(null).when(reader).getVersionStatus("store", 1);
doReturn(null).when(reader).getVersionStatus("store", 2);
doReturn(null).when(reader).getVersionStatus("store", 1, Optional.empty());
doReturn(null).when(reader).getVersionStatus("store", 2, Optional.empty());
doReturn(map).when(reader).getPartitionStatus("store", 1, 0, Optional.empty());
doReturn(map).when(reader).getPartitionStatus("store", 2, 0, Optional.empty());

Expand Down Expand Up @@ -174,14 +245,15 @@ private void validateOfflineReplicaInPushStatusWhenBreachingFailFastThreshold(
private void validatePushStatus(
PushStatusStoreReader reader,
String topicName,
Optional<String> incrementalPushVersion,
int maxOfflineInstanceCount,
double maxOfflineInstanceRatio,
ExecutionStatus expectedStatus) {
ExecutionStatusWithDetails executionStatusWithDetails = PushMonitorUtils.getDaVinciPushStatusAndDetails(
reader,
topicName,
1,
Optional.empty(),
incrementalPushVersion,
maxOfflineInstanceCount,
maxOfflineInstanceRatio,
true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.tehuti.MockTehutiReporter;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils;
import io.tehuti.metrics.MetricsRepository;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import java.util.ArrayList;
Expand All @@ -40,7 +41,7 @@ public class AggVersionedIngestionStatsTest {
*/
@Test
public void testIngestionOffsetRewind() {
MetricsRepository metricsRepo = new MetricsRepository();
MetricsRepository metricsRepo = MetricsRepositoryUtils.createSingleThreadedMetricsRepository();
MockTehutiReporter reporter = new MockTehutiReporter();
VeniceServerConfig mockVeniceServerConfig = Mockito.mock(VeniceServerConfig.class);

Expand Down Expand Up @@ -120,7 +121,7 @@ public void testIngestionOffsetRewind() {

@Test
public void testStatsCanUpdateVersionStatus() {
MetricsRepository metricsRepo = new MetricsRepository();
MetricsRepository metricsRepo = MetricsRepositoryUtils.createSingleThreadedMetricsRepository();
MockTehutiReporter reporter = new MockTehutiReporter();
VeniceServerConfig mockVeniceServerConfig = Mockito.mock(VeniceServerConfig.class);

Expand Down

0 comments on commit 911f9be

Please sign in to comment.