Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the logic of RCA conditional execution tag system #423

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/rca_cluster_manager.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"max-flow-units-per-vertex-buffer": 200,

"tags": {
"locus": "cluster_manager-node"
"locus": "cluster_manager-node,data-node"
},

"remote-peers": ["ip1", "ip2", "ip3"],
Expand Down
2 changes: 1 addition & 1 deletion config/rca_idle_cluster_manager.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"max-flow-units-per-vertex-buffer": 200,

"tags": {
"locus": "idle-cluster_manager-node"
"locus": "idle-cluster_manager-node,data-node"
},

"remote-peers": ["ip1", "ip2", "ip3"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ private void start() {
return;
}

subscriptionManager.setCurrentLocus(rcaConf.getTagMap().get("locus"));
String currentLocus = RcaUtil.getPriorityLocus(rcaConf.getTagMap().get("locus"));
subscriptionManager.setCurrentLocus(currentLocus);
this.connectedComponents = getRcaGraphComponents(rcaConf);

// Mute the rca nodes after the graph creation and before the scheduler start
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ public final class Version {
* transferred packets should be dropped. Every increment here should be accompanied with a line
* describing the version bump.
*
* Note: The RCA version is agnostic of OpenSearch version.
* <p>Note: The RCA version is agnostic of OpenSearch version.
*/
static final class Major {
// Bumping this post the Commons Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2)
// and Service Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8) change
// Bumping this post the Commons
// Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2)
// and Service
// Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8)
// change
static final int RCA_MAJ_VERSION = 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,40 @@ public static List<ConnectedComponent> getAnalysisGraphComponents(AnalysisGraph
return Stats.getInstance().getConnectedComponents();
}

/**
* As there is possibility for host locus tags to be hybrid, in terms of rca subscription we
* still have to identify the host with single tag, the most priority one.
*/
public static String getPriorityLocus(String hostLocus) {
if (hostLocus == null || hostLocus.isEmpty()) {
return "";
}
List<String> hostLociStrings =
Arrays.asList(hostLocus.split(RcaConsts.RcaTagConstants.SEPARATOR));
// Non-empty string was split -> guaranteed to be of size at least one.
return hostLociStrings.get(0);
}

public static boolean containsAny(List<String> containerList, List<String> containedList) {
for (String elem : containedList) {
if (containerList.contains(elem)) {
return true;
}
}
return false;
}

public static boolean doTagsMatch(Node<?> node, RcaConf conf) {
Map<String, String> rcaTagMap = conf.getTagMap();
for (Map.Entry<String, String> tag : node.getTags().entrySet()) {
String rcaConfTagvalue = rcaTagMap.get(tag.getKey());
String rcaConfTag = rcaTagMap.get(tag.getKey());
if (rcaConfTag == null) {
return false;
}
List<String> rcaConfTagStrings = Arrays.asList(rcaConfTag.split(","));

return tag.getValue() != null
&& Arrays.asList(tag.getValue().split(",")).contains(rcaConfTagvalue);
&& containsAny(rcaConfTagStrings, Arrays.asList(tag.getValue().split(",")));
}
return true;
}
Expand All @@ -70,12 +98,14 @@ public static boolean shouldExecuteLocally(Node<?> node, RcaConf conf) {
final Map<String, String> nodeTagMap = node.getTags();

if (confTagMap != null && nodeTagMap != null) {
final String hostLocus = confTagMap.get(RcaConsts.RcaTagConstants.TAG_LOCUS);
final String hostLoci = confTagMap.get(RcaConsts.RcaTagConstants.TAG_LOCUS);
final String nodeLoci = nodeTagMap.get(RcaConsts.RcaTagConstants.TAG_LOCUS);
if (nodeLoci != null && !nodeLoci.isEmpty()) {
List<String> nodeLociStrings =
Arrays.asList(nodeLoci.split(RcaConsts.RcaTagConstants.SEPARATOR));
return nodeLociStrings.contains(hostLocus);
List<String> hostLociStrings =
Arrays.asList(hostLoci.split(RcaConsts.RcaTagConstants.SEPARATOR));
return containsAny(hostLociStrings, nodeLociStrings);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ public void construct() {
// Use EVALUATION_INTERVAL_SECONDS instead of RCA_PERIOD which resolved to 12 seconds.
// This is resulting in this RCA not getting executed in every 5 seconds.
Rca<ResourceFlowUnit<HotNodeSummary>> threadMetricsRca =
new ThreadMetricsRca(threadBlockedTime, threadWaitedTime, EVALUATION_INTERVAL_SECONDS);
new ThreadMetricsRca(
threadBlockedTime, threadWaitedTime, EVALUATION_INTERVAL_SECONDS);
threadMetricsRca.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE);
Expand Down Expand Up @@ -502,17 +503,15 @@ private void constructShardResourceUsageGraph() {
Metric cpuUtilization = new CPU_Utilization(EVALUATION_INTERVAL_SECONDS);

cpuUtilization.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE);
RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);

addLeaf(cpuUtilization);

// High CPU Utilization RCA
HotShardRca hotShardRca =
new HotShardRca(EVALUATION_INTERVAL_SECONDS, RCA_PERIOD, cpuUtilization);
hotShardRca.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE);
RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);
hotShardRca.addAllUpstreams(Arrays.asList(cpuUtilization));

// Hot Shard Cluster RCA which consumes the above
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.performanceanalyzer.rca.scheduler;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -27,6 +29,7 @@
import org.opensearch.performanceanalyzer.rca.framework.core.Node;
import org.opensearch.performanceanalyzer.rca.framework.core.Queryable;
import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf;
import org.opensearch.performanceanalyzer.rca.framework.util.RcaConsts;
import org.opensearch.performanceanalyzer.rca.framework.util.RcaUtil;
import org.opensearch.performanceanalyzer.rca.messages.DataMsg;
import org.opensearch.performanceanalyzer.rca.messages.IntentMsg;
Expand Down Expand Up @@ -398,4 +401,51 @@ public void mergeLists() {
AssertHelper.compareLists(l1.get(i), ret.get(i));
}
}

@Test
public void testHybridLocusTags() {
Node<MetricFlowUnit> cpuUtilization = new CPU_Utilization(5);
cpuUtilization.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);

Node<MetricFlowUnit> hotShardClusterRca = new CPU_Utilization(5);
hotShardClusterRca.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_CLUSTER_MANAGER_NODE);

RcaConf nonDedicatedClusterManagerConf =
new RcaConf() {
@Override
public Map<String, String> getTagMap() {
return new HashMap<String, String>() {
{
this.put(
RcaConsts.RcaTagConstants.TAG_LOCUS,
"cluster_manager-node,data-node");
}
};
}
};

assertTrue(RcaUtil.shouldExecuteLocally(cpuUtilization, nonDedicatedClusterManagerConf));
assertTrue(
RcaUtil.shouldExecuteLocally(hotShardClusterRca, nonDedicatedClusterManagerConf));

RcaConf dedicatedClusterManagerConf =
new RcaConf() {
@Override
public Map<String, String> getTagMap() {
return new HashMap<String, String>() {
{
this.put(
RcaConsts.RcaTagConstants.TAG_LOCUS,
"cluster_manager-node");
}
};
}
};

assertFalse(RcaUtil.shouldExecuteLocally(cpuUtilization, dedicatedClusterManagerConf));
assertTrue(RcaUtil.shouldExecuteLocally(hotShardClusterRca, dedicatedClusterManagerConf));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ private List<ConnectedComponent> createAndExecuteRcaGraph(AppContext appContext)

RcaConf rcaConf = new RcaConf(dataNodeRcaConf);
subscriptionManager = new SubscriptionManager(new GRPCConnectionManager(false));
subscriptionManager.setCurrentLocus(rcaConf.getTagMap().get("locus"));
String currentLocus = RcaUtil.getPriorityLocus(rcaConf.getTagMap().get("locus"));
subscriptionManager.setCurrentLocus(currentLocus);

WireHopper wireHopper =
new WireHopper(
Expand Down Expand Up @@ -664,7 +665,8 @@ public void testHotShardClusterApiResponse() throws Exception {
RcaConf rcaConf = new RcaConf(dataNodeRcaConf);
SubscriptionManager subscriptionManager =
new SubscriptionManager(new GRPCConnectionManager(false));
subscriptionManager.setCurrentLocus(rcaConf.getTagMap().get("locus"));
String currentLocus = RcaUtil.getPriorityLocus(rcaConf.getTagMap().get("locus"));
subscriptionManager.setCurrentLocus(currentLocus);

AppContext appContext = RcaTestHelper.setMyIp("192.168.0.1", AllMetrics.NodeRole.DATA);

Expand Down Expand Up @@ -697,7 +699,8 @@ public void testHotShardClusterApiResponse() throws Exception {
RcaConf rcaConf2 = new RcaConf(clusterManagerNodeRcaConf);
SubscriptionManager subscriptionManager2 =
new SubscriptionManager(new GRPCConnectionManager(false));
subscriptionManager2.setCurrentLocus(rcaConf2.getTagMap().get("locus"));
String currentLocus2 = RcaUtil.getPriorityLocus(rcaConf2.getTagMap().get("locus"));
subscriptionManager2.setCurrentLocus(currentLocus2);

AppContext appContextClusterManager =
RcaTestHelper.setMyIp("192.168.0.4", AllMetrics.NodeRole.ELECTED_CLUSTER_MANAGER);
Expand Down