Skip to content

Commit

Permalink
tweaks to make overriding behavior of ACTUAL writes stick
Browse files Browse the repository at this point in the history
  • Loading branch information
ZacAttack committed Nov 14, 2024
1 parent f24976e commit 70650b2
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -51,8 +49,8 @@ public class HeartbeatMonitoringService extends AbstractVeniceService {
private final String localRegionName;

// store -> version -> partition -> region -> (timestamp, RTS)
private final Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> followerHeartbeatTimeStamps;
private final Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> leaderHeartbeatTimeStamps;
private final Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> followerHeartbeatTimeStamps;
private final Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> leaderHeartbeatTimeStamps;
HeartbeatVersionedStats versionStatsReporter;

public HeartbeatMonitoringService(
Expand All @@ -76,7 +74,7 @@ public HeartbeatMonitoringService(
}

private synchronized void initializeEntry(
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps,
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestamps,
Version version,
int partition,
boolean isFollower) {
Expand All @@ -87,20 +85,21 @@ private synchronized void initializeEntry(
heartbeatTimestamps.computeIfAbsent(version.getStoreName(), storeKey -> new VeniceConcurrentHashMap<>())
.computeIfAbsent(version.getNumber(), versionKey -> new VeniceConcurrentHashMap<>())
.computeIfAbsent(partition, partitionKey -> {
Map<String, Pair<Long, Boolean>> regionTimestamps = new VeniceConcurrentHashMap<>();
Map<String, HeartbeatTimeStampEntry> regionTimestamps = new VeniceConcurrentHashMap<>();
if (version.isActiveActiveReplicationEnabled() && !isFollower) {
for (String region: regionNames) {
regionTimestamps.put(region, new MutablePair<>(System.currentTimeMillis(), false));
regionTimestamps.put(region, new HeartbeatTimeStampEntry(System.currentTimeMillis(), false, false));
}
} else {
regionTimestamps.put(localRegionName, new MutablePair<>(System.currentTimeMillis(), false));
regionTimestamps
.put(localRegionName, new HeartbeatTimeStampEntry(System.currentTimeMillis(), false, false));
}
return regionTimestamps;
});
}

private synchronized void removeEntry(
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps,
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestamps,
Version version,
int partition) {
heartbeatTimestamps.computeIfPresent(version.getStoreName(), (storeKey, versionMap) -> {
Expand Down Expand Up @@ -173,21 +172,21 @@ public Map<String, ReplicaHeartbeatInfo> getHeartbeatInfo(
}

Map<String, ReplicaHeartbeatInfo> getHeartbeatInfoFromMap(
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestampMap,
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestampMap,
String leaderState,
long currentTimestamp,
String versionTopicName,
int partitionFilter,
boolean filterLagReplica) {
Map<String, ReplicaHeartbeatInfo> result = new VeniceConcurrentHashMap<>();
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> storeName: heartbeatTimestampMap
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> storeName: heartbeatTimestampMap
.entrySet()) {
for (Map.Entry<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>> version: storeName.getValue()
for (Map.Entry<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>> version: storeName.getValue()
.entrySet()) {
for (Map.Entry<Integer, Map<String, Pair<Long, Boolean>>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, Pair<Long, Boolean>> region: partition.getValue().entrySet()) {
for (Map.Entry<Integer, Map<String, HeartbeatTimeStampEntry>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, HeartbeatTimeStampEntry> region: partition.getValue().entrySet()) {
String topicName = Version.composeKafkaTopic(storeName.getKey(), version.getKey());
long heartbeatTs = region.getValue().getLeft();
long heartbeatTs = region.getValue().timestamp;
long lag = currentTimestamp - heartbeatTs;
if (!versionTopicName.equals(topicName)) {
continue;
Expand All @@ -204,7 +203,7 @@ Map<String, ReplicaHeartbeatInfo> getHeartbeatInfoFromMap(
replicaId,
region.getKey(),
leaderState,
region.getValue().getRight(),
region.getValue().readyToServe,
heartbeatTs,
lag);
result.put(replicaId + "-" + region.getKey(), replicaHeartbeatInfo);
Expand Down Expand Up @@ -274,20 +273,23 @@ private void recordHeartbeat(
int partition,
String region,
Long timestamp,
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps,
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestamps,
boolean isReadyToServe,
boolean retainHighestTimeStamp) {
if (region != null) {
heartbeatTimestamps.computeIfPresent(store, (storeKey, perVersionMap) -> {
perVersionMap.computeIfPresent(version, (versionKey, perPartitionMap) -> {
perPartitionMap.computeIfPresent(partition, (partitionKey, perRegionMap) -> {
// If we are retaining only the highest timestamp for a given heartbeat, d
// If we are retaining only the highest timestamp for a given heartbeat, if the current held heartbeat
// is of a higher value AND was an entry was consumed (not a place holder value by the process) then
// we will No-Op in favor of retaining that higher timestamp. This behavior is specific to follower
// nodes because the intent of this metric is to only show the lag of the follower relative to the leader
if (retainHighestTimeStamp && perRegionMap.get(region) != null
&& perRegionMap.get(region).getLeft() > timestamp) {
&& perRegionMap.get(region).timestamp > timestamp && !perRegionMap.get(region).consumedFromUpstream) {
// No-Op
} else {
// record the heartbeat time stamp
perRegionMap.put(region, new MutablePair<>(timestamp, isReadyToServe));
perRegionMap.put(region, new HeartbeatTimeStampEntry(timestamp, isReadyToServe, true));
}
return perRegionMap;
});
Expand All @@ -298,29 +300,29 @@ private void recordHeartbeat(
}
}

protected Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> getLeaderHeartbeatTimeStamps() {
protected Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> getLeaderHeartbeatTimeStamps() {
return leaderHeartbeatTimeStamps;
}

protected Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> getFollowerHeartbeatTimeStamps() {
protected Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> getFollowerHeartbeatTimeStamps() {
return followerHeartbeatTimeStamps;
}

protected void recordLags(
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps,
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestamps,
ReportLagFunction lagFunction) {
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> storeName: heartbeatTimestamps
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> storeName: heartbeatTimestamps
.entrySet()) {
for (Map.Entry<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>> version: storeName.getValue()
for (Map.Entry<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>> version: storeName.getValue()
.entrySet()) {
for (Map.Entry<Integer, Map<String, Pair<Long, Boolean>>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, Pair<Long, Boolean>> region: partition.getValue().entrySet()) {
for (Map.Entry<Integer, Map<String, HeartbeatTimeStampEntry>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, HeartbeatTimeStampEntry> region: partition.getValue().entrySet()) {
lagFunction.apply(
storeName.getKey(),
version.getKey(),
region.getKey(),
region.getValue().getLeft(),
region.getValue().getRight());
region.getValue().timestamp,
region.getValue().readyToServe);
}
}
}
Expand All @@ -339,17 +341,17 @@ protected void record() {
}

protected void checkAndMaybeLogHeartbeatDelayMap(
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps) {
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestamps) {
long currentTimestamp = System.currentTimeMillis();
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> storeName: heartbeatTimestamps
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> storeName: heartbeatTimestamps
.entrySet()) {
for (Map.Entry<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>> version: storeName.getValue()
for (Map.Entry<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>> version: storeName.getValue()
.entrySet()) {
for (Map.Entry<Integer, Map<String, Pair<Long, Boolean>>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, Pair<Long, Boolean>> region: partition.getValue().entrySet()) {
long heartbeatTs = region.getValue().getLeft();
for (Map.Entry<Integer, Map<String, HeartbeatTimeStampEntry>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, HeartbeatTimeStampEntry> region: partition.getValue().entrySet()) {
long heartbeatTs = region.getValue().timestamp;
long lag = currentTimestamp - heartbeatTs;
if (lag > DEFAULT_STALE_HEARTBEAT_LOG_THRESHOLD_MILLIS && region.getValue().getRight()) {
if (lag > DEFAULT_STALE_HEARTBEAT_LOG_THRESHOLD_MILLIS && region.getValue().readyToServe) {
String replicaId = Utils
.getReplicaId(Version.composeKafkaTopic(storeName.getKey(), version.getKey()), partition.getKey());
LOGGER.warn(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.linkedin.davinci.stats.ingestion.heartbeat;

public class HeartbeatTimeStampEntry {
/**
* Whether this heartbeat entry is for a partition which is ready to serve
*/
public final boolean readyToServe;

/**
* Whether this heartbeat entry was consumed from input or if the system initialized it as a default entry
*/
public final boolean consumedFromUpstream;

/**
* The timestamp associated with this entry
*/
public final long timestamp;

public HeartbeatTimeStampEntry(long timestamp, boolean readyToServe, boolean consumedFromUpstream) {
this.readyToServe = readyToServe;
this.consumedFromUpstream = consumedFromUpstream;
this.timestamp = timestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@
import io.tehuti.metrics.MetricsRepository;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.Pair;


public class HeartbeatVersionedStats extends AbstractVeniceAggVersionedStats<HeartbeatStat, HeartbeatStatReporter> {
private final Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> leaderMonitors;
private final Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> followerMonitors;
private final Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> leaderMonitors;
private final Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> followerMonitors;

public HeartbeatVersionedStats(
MetricsRepository metricsRepository,
ReadOnlyStoreRepository metadataRepository,
Supplier<HeartbeatStat> statsInitiator,
StatsSupplier<HeartbeatStatReporter> reporterSupplier,
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> leaderMonitors,
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> followerMonitors) {
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> leaderMonitors,
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> followerMonitors) {
super(metricsRepository, metadataRepository, statsInitiator, reporterSupplier, true);
this.leaderMonitors = leaderMonitors;
this.followerMonitors = followerMonitors;
Expand All @@ -36,7 +35,7 @@ public void recordFollowerLag(
String region,
long heartbeatTs,
boolean isReadyToServe) {
// If the partition is ready to serve, report its lag to the main lag metric. Otherwise, report it
// If the partition is ready to serve, report it's lage to the main lag metric. Otherwise, report it
// to the catch up metric.
// The metric which isn't updated is squelched by reporting the currentTime (so as to appear caught up and mute
// alerts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand All @@ -51,7 +49,7 @@ public void testGetHeartbeatInfoFromMap() {
HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class);
doCallRealMethod().when(heartbeatMonitoringService)
.getHeartbeatInfoFromMap(anyMap(), anyString(), anyLong(), anyString(), anyInt(), anyBoolean());
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> leaderMap =
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> leaderMap =
new VeniceConcurrentHashMap<>();
String store = "testStore";
int version = 1;
Expand All @@ -61,7 +59,7 @@ public void testGetHeartbeatInfoFromMap() {
leaderMap.put(store, new VeniceConcurrentHashMap<>());
leaderMap.get(store).put(version, new VeniceConcurrentHashMap<>());
leaderMap.get(store).get(version).put(partition, new VeniceConcurrentHashMap<>());
leaderMap.get(store).get(version).get(partition).put(region, new MutablePair<>(timestamp, true));
leaderMap.get(store).get(version).get(partition).put(region, new HeartbeatTimeStampEntry(timestamp, true, true));
Assert.assertEquals(
heartbeatMonitoringService
.getHeartbeatInfoFromMap(
Expand Down Expand Up @@ -247,16 +245,14 @@ public void testAddLeaderLagMonitor() {
.get(TEST_STORE)
.get(futureVersion.getNumber())
.get(1)
.get(LOCAL_FABRIC)
.getLeft();
.get(LOCAL_FABRIC).timestamp;
Assert.assertTrue(value >= baseTimeStamp + 1001L);

value = heartbeatMonitoringService.getFollowerHeartbeatTimeStamps()
.get(TEST_STORE)
.get(futureVersion.getNumber())
.get(1)
.get(REMOTE_FABRIC)
.getLeft();
.get(REMOTE_FABRIC).timestamp;
Assert.assertTrue(value >= baseTimeStamp + 1001L);

// Leader state transitions
Expand Down Expand Up @@ -329,15 +325,13 @@ public void testAddLeaderLagMonitor() {
.get(TEST_STORE)
.get(futureVersion.getNumber())
.get(1)
.get(REMOTE_FABRIC)
.getLeft();
.get(REMOTE_FABRIC).timestamp;
Assert.assertEquals((long) value, baseTimeStamp + 1003L);
value = heartbeatMonitoringService.getFollowerHeartbeatTimeStamps()
.get(TEST_STORE)
.get(currentVersion.getNumber())
.get(1)
.get(REMOTE_FABRIC)
.getLeft();
.get(REMOTE_FABRIC).timestamp;
Assert.assertEquals((long) value, baseTimeStamp + 1003L);

// Drop/Error some
Expand Down

0 comments on commit 70650b2

Please sign in to comment.