diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java index 331687f5..535deb68 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java @@ -99,7 +99,6 @@ public void profile(String detectorId, ActionListener listener, listener.onFailure(new InvalidParameterException(CommonErrorMessages.EMPTY_PROFILES_COLLECT)); return; } - calculateTotalResponsesToWait(detectorId, profilesToCollect, listener); } @@ -118,10 +117,38 @@ private void calculateTotalResponsesToWait( ) { ensureExpectedToken(XContentParser.Token.START_OBJECT, xContentParser.nextToken(), xContentParser); AnomalyDetector detector = AnomalyDetector.parse(xContentParser, detectorId); + + prepareProfile(detector, listener, profilesToCollect); + } catch (Exception e) { + listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e)); + } + } else { + listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId)); + } + }, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception)))); + } + + private void prepareProfile( + AnomalyDetector detector, + ActionListener listener, + Set profilesToCollect + ) { + String detectorId = detector.getDetectorId(); + GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId); + client.get(getRequest, ActionListener.wrap(getResponse -> { + if (getResponse != null && getResponse.isExists()) { + try ( + XContentParser parser = XContentType.JSON + .xContent() + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString()) + ) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser); + long enabledTimeMs = job.getEnabledTime().toEpochMilli(); + boolean isMultiEntityDetector = detector.isMultientityDetector(); int totalResponsesToWait = 0; - if (profilesToCollect.contains(DetectorProfileName.ERROR)) { totalResponsesToWait++; } @@ -158,50 +185,20 @@ private void calculateTotalResponsesToWait( new MultiResponsesDelegateActionListener( listener, totalResponsesToWait, - "Fail to fetch profile for " + detectorId, + CommonErrorMessages.FAIL_FETCH_ERR_MSG + detectorId, false ); - prepareProfile(detector, delegateListener, profilesToCollect); - } catch (Exception e) { - listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e)); - } - } else { - listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId)); - } - }, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception)))); - } - - private void prepareProfile( - AnomalyDetector detector, - MultiResponsesDelegateActionListener listener, - Set profilesToCollect - ) { - String detectorId = detector.getDetectorId(); - GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId); - client.get(getRequest, ActionListener.wrap(getResponse -> { - if (getResponse != null && getResponse.isExists()) { - try ( - XContentParser parser = XContentType.JSON - .xContent() - .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString()) - ) { - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); - AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser); - long enabledTimeMs = job.getEnabledTime().toEpochMilli(); - if (profilesToCollect.contains(DetectorProfileName.ERROR)) { GetRequest getStateRequest = new GetRequest(DetectorInternalState.DETECTOR_STATE_INDEX, detectorId); - client.get(getStateRequest, onGetDetectorState(listener, detectorId, enabledTimeMs)); + client.get(getStateRequest, onGetDetectorState(delegateListener, detectorId, enabledTimeMs)); } - boolean isMultiEntityDetector = detector.isMultientityDetector(); - // total number of listeners we need to define. Needed by MultiResponsesDelegateActionListener to decide // when to consolidate results and return to users if (isMultiEntityDetector) { if (profilesToCollect.contains(DetectorProfileName.TOTAL_ENTITIES)) { - profileEntityStats(listener, detector); + profileEntityStats(delegateListener, detector); } if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE) || profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE) @@ -210,24 +207,24 @@ private void prepareProfile( || profilesToCollect.contains(DetectorProfileName.ACTIVE_ENTITIES) || profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS) || profilesToCollect.contains(DetectorProfileName.STATE)) { - profileModels(detector, profilesToCollect, job, true, listener); + profileModels(detector, profilesToCollect, job, true, delegateListener); } } else { if (profilesToCollect.contains(DetectorProfileName.STATE) || profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)) { - profileStateRelated(detector, listener, job.isEnabled(), profilesToCollect); + profileStateRelated(detector, delegateListener, job.isEnabled(), profilesToCollect); } if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE) || profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE) || profilesToCollect.contains(DetectorProfileName.TOTAL_SIZE_IN_BYTES) || profilesToCollect.contains(DetectorProfileName.MODELS)) { - profileModels(detector, profilesToCollect, job, false, listener); + profileModels(detector, profilesToCollect, job, false, delegateListener); } } - } catch (IOException | XContentParseException | NullPointerException e) { - logger.error(e); - listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e); + } catch (Exception e) { + logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e); + listener.onFailure(e); } } else { onGetDetectorForPrepare(listener, profilesToCollect); @@ -261,20 +258,19 @@ private void profileEntityStats(MultiResponsesDelegateActionListener { listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId()); }) - ); + }, searchException -> { + logger.warn(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId()); + listener.onFailure(searchException); + })); } } - private void onGetDetectorForPrepare( - MultiResponsesDelegateActionListener listener, - Set profiles - ) { + private void onGetDetectorForPrepare(ActionListener listener, Set profiles) { DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder(); if (profiles.contains(DetectorProfileName.STATE)) { profileBuilder.state(DetectorState.DISABLED); } - listener.respondImmediately(profileBuilder.build()); + listener.onResponse(profileBuilder.build()); } /** @@ -340,8 +336,8 @@ private ActionListener onGetDetectorState( listener.onResponse(profileBuilder.build()); } catch (IOException | XContentParseException | NullPointerException e) { - logger.error(e); - listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e); + logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e); + listener.onFailure(e); } } else { // detector state for this detector does not exist @@ -475,7 +471,7 @@ private ActionListener onInittedEver( "Fail to find any anomaly result with anomaly score larger than 0 after AD job enabled time for detector {}", detector.getDetectorId() ); - listener.failImmediately(new RuntimeException("Fail to find detector state: " + detector.getDetectorId(), exception)); + listener.onFailure(exception); } }); } @@ -523,7 +519,7 @@ private ActionListener onPollRCFUpdates( new ParameterizedMessage("Fail to get init progress through messaging for {}", detector.getDetectorId()), exception ); - listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG + detector.getDetectorId(), exception); + listener.onFailure(exception); } }); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/EntityProfileRunner.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/EntityProfileRunner.java index 97e82924..23ee639a 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/EntityProfileRunner.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/EntityProfileRunner.java @@ -20,7 +20,6 @@ import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.CATEGORY_FIELD_LIMIT; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; -import java.io.IOException; import java.security.InvalidParameterException; import java.util.List; import java.util.Optional; @@ -35,7 +34,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentParseException; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; @@ -113,25 +111,7 @@ public void profile( new InvalidParameterException(CommonErrorMessages.CATEGORICAL_FIELD_NUMBER_SURPASSED + CATEGORY_FIELD_LIMIT) ); } else { - int totalResponsesToWait = 0; - if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS) - || profilesToCollect.contains(EntityProfileName.STATE)) { - totalResponsesToWait++; - } - if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) { - totalResponsesToWait++; - } - if (profilesToCollect.contains(EntityProfileName.MODELS)) { - totalResponsesToWait++; - } - MultiResponsesDelegateActionListener delegateListener = - new MultiResponsesDelegateActionListener( - listener, - totalResponsesToWait, - "Fail to fetch profile for " + entityValue + " of detector " + detectorId, - false - ); - prepareEntityProfile(delegateListener, detectorId, entityValue, profilesToCollect, detector, categoryField.get(0)); + prepareEntityProfile(listener, detectorId, entityValue, profilesToCollect, detector, categoryField.get(0)); } } catch (Exception t) { listener.onFailure(t); @@ -143,7 +123,7 @@ public void profile( } private void prepareEntityProfile( - MultiResponsesDelegateActionListener delegateListener, + ActionListener listener, String detectorId, String entityValue, Set profilesToCollect, @@ -158,8 +138,8 @@ private void prepareEntityProfile( request, ActionListener .wrap( - r -> getJob(detectorId, categoryField, entityValue, profilesToCollect, detector, r, delegateListener), - delegateListener::failImmediately + r -> getJob(detectorId, categoryField, entityValue, profilesToCollect, detector, r, listener), + listener::onFailure ) ); } @@ -171,7 +151,7 @@ private void getJob( Set profilesToCollect, AnomalyDetector detector, EntityProfileResponse entityProfileResponse, - MultiResponsesDelegateActionListener delegateListener + ActionListener listener ) { GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId); client.get(getRequest, ActionListener.wrap(getResponse -> { @@ -184,6 +164,25 @@ private void getJob( ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser); + int totalResponsesToWait = 0; + if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS) + || profilesToCollect.contains(EntityProfileName.STATE)) { + totalResponsesToWait++; + } + if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) { + totalResponsesToWait++; + } + if (profilesToCollect.contains(EntityProfileName.MODELS)) { + totalResponsesToWait++; + } + MultiResponsesDelegateActionListener delegateListener = + new MultiResponsesDelegateActionListener( + listener, + totalResponsesToWait, + "Fail to fetch profile for " + entityValue + " of detector " + detectorId, + false + ); + if (profilesToCollect.contains(EntityProfileName.MODELS)) { EntityProfile.Builder builder = new EntityProfile.Builder(categoryField, entityValue); if (false == job.isEnabled()) { @@ -233,20 +232,20 @@ private void getJob( delegateListener.onResponse(builder.build()); })); } - } catch (IOException | XContentParseException | NullPointerException e) { - logger.error(e); - delegateListener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e); + } catch (Exception e) { + logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e); + listener.onFailure(e); } } else { - sendUnknownState(profilesToCollect, categoryField, entityValue, true, delegateListener); + sendUnknownState(profilesToCollect, categoryField, entityValue, true, listener); } }, exception -> { if (exception instanceof IndexNotFoundException) { logger.info(exception.getMessage()); - sendUnknownState(profilesToCollect, categoryField, entityValue, true, delegateListener); + sendUnknownState(profilesToCollect, categoryField, entityValue, true, listener); } else { logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG + detectorId, exception); - delegateListener.failImmediately(exception); + listener.onFailure(exception); } })); } @@ -285,14 +284,14 @@ private void sendUnknownState( String categoryField, String entityValue, boolean immediate, - MultiResponsesDelegateActionListener delegateListener + ActionListener delegateListener ) { EntityProfile.Builder builder = new EntityProfile.Builder(categoryField, entityValue); if (profilesToCollect.contains(EntityProfileName.STATE)) { builder.state(EntityState.UNKNOWN); } if (immediate) { - delegateListener.respondImmediately(builder.build()); + delegateListener.onResponse(builder.build()); } else { delegateListener.onResponse(builder.build()); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonErrorMessages.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonErrorMessages.java index e5446ac3..253c6b30 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonErrorMessages.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonErrorMessages.java @@ -34,4 +34,5 @@ public class CommonErrorMessages { public static String FAIL_TO_GET_TOTAL_ENTITIES = "Failed to get total entities for detector "; public static String CATEGORICAL_FIELD_NUMBER_SURPASSED = "We don't support categorical fields more than "; public static String EMPTY_PROFILES_COLLECT = "profiles to collect are missing or invalid"; + public static String FAIL_FETCH_ERR_MSG = "Fail to fetch profile for "; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfile.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfile.java index a74070be..3a7dbe58 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfile.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfile.java @@ -214,7 +214,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (modelProfile != null) { builder.field(CommonName.MODEL, modelProfile); } - if (state != null) { + if (state != EntityState.UNKNOWN) { builder.field(CommonName.STATE, state); } builder.endObject(); @@ -263,7 +263,7 @@ public String toString() { if (modelProfile != null) { builder.append(CommonName.MODELS, modelProfile); } - if (state != null) { + if (state != EntityState.UNKNOWN) { builder.append(CommonName.STATE, state); } return builder.toString(); @@ -330,7 +330,7 @@ public void merge(Mergeable other) { if (otherProfile.modelProfile != null) { this.modelProfile = otherProfile.modelProfile; } - if (otherProfile.getState() != null) { + if (otherProfile.getState() != EntityState.UNKNOWN) { this.state = otherProfile.getState(); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/MultiResponsesDelegateActionListener.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/MultiResponsesDelegateActionListener.java index 52c9380a..f580b264 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/MultiResponsesDelegateActionListener.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/MultiResponsesDelegateActionListener.java @@ -109,20 +109,4 @@ private void handleSavedResponses() { this.delegate.onResponse(response0); } } - - public void failImmediately(Exception e) { - this.delegate.onFailure(new RuntimeException(finalErrorMsg, e)); - } - - public void failImmediately(String errMsg) { - this.delegate.onFailure(new RuntimeException(errMsg)); - } - - public void failImmediately(String errMsg, Exception e) { - this.delegate.onFailure(new RuntimeException(errMsg, e)); - } - - public void respondImmediately(T o) { - this.delegate.onResponse(o); - } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AbstractProfileRunnerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AbstractProfileRunnerTests.java new file mode 100644 index 00000000..5f3b4900 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AbstractProfileRunnerTests.java @@ -0,0 +1,158 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.elasticsearch.Version; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.transport.TransportAddress; +import org.junit.Before; +import org.junit.BeforeClass; + +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.DetectorProfileName; +import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; + +public class AbstractProfileRunnerTests extends AbstractADTest { + protected enum DetectorStatus { + INDEX_NOT_EXIST, + NO_DOC, + EXIST + } + + protected enum JobStatus { + INDEX_NOT_EXIT, + DISABLED, + ENABLED + } + + protected enum ErrorResultStatus { + INDEX_NOT_EXIT, + NO_ERROR, + SHINGLE_ERROR, + STOPPED_ERROR, + NULL_POINTER_EXCEPTION + } + + protected AnomalyDetectorProfileRunner runner; + protected Client client; + protected DiscoveryNodeFilterer nodeFilter; + protected AnomalyDetector detector; + protected ClusterService clusterService; + + protected static Set stateOnly; + protected static Set stateNError; + protected static Set modelProfile; + protected static Set stateInitProgress; + protected static Set totalInitProgress; + protected static Set initProgressErrorProfile; + + protected static String noFullShingleError = "No full shingle in current detection window"; + protected static String stoppedError = + "Stopped detector as job failed consecutively for more than 3 times: Having trouble querying data." + + " Maybe all of your features have been disabled."; + + protected static String clusterName; + protected static DiscoveryNode discoveryNode1; + + protected int requiredSamples; + protected int neededSamples; + + // profile model related + protected String node1; + protected String nodeName1; + + protected String node2; + protected String nodeName2; + protected DiscoveryNode discoveryNode2; + + protected long modelSize; + protected String model1Id; + protected String model0Id; + + protected int shingleSize; + + protected int detectorIntervalMin; + protected GetResponse detectorGetReponse; + protected String messaingExceptionError = "blah"; + + @BeforeClass + public static void setUpOnce() { + stateOnly = new HashSet(); + stateOnly.add(DetectorProfileName.STATE); + stateNError = new HashSet(); + stateNError.add(DetectorProfileName.ERROR); + stateNError.add(DetectorProfileName.STATE); + stateInitProgress = new HashSet(); + stateInitProgress.add(DetectorProfileName.INIT_PROGRESS); + stateInitProgress.add(DetectorProfileName.STATE); + modelProfile = new HashSet( + Arrays + .asList( + DetectorProfileName.SHINGLE_SIZE, + DetectorProfileName.MODELS, + DetectorProfileName.COORDINATING_NODE, + DetectorProfileName.TOTAL_SIZE_IN_BYTES + ) + ); + totalInitProgress = new HashSet( + Arrays.asList(DetectorProfileName.TOTAL_ENTITIES, DetectorProfileName.INIT_PROGRESS) + ); + initProgressErrorProfile = new HashSet( + Arrays.asList(DetectorProfileName.INIT_PROGRESS, DetectorProfileName.ERROR) + ); + clusterName = "test-cluster-name"; + discoveryNode1 = new DiscoveryNode( + "nodeName1", + "node1", + new TransportAddress(TransportAddress.META_ADDRESS, 9300), + emptyMap(), + emptySet(), + Version.CURRENT + ); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + client = mock(Client.class); + nodeFilter = mock(DiscoveryNodeFilterer.class); + clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test cluster")).build()); + + requiredSamples = 128; + neededSamples = 5; + + runner = new AnomalyDetectorProfileRunner(client, xContentRegistry(), nodeFilter, requiredSamples); + + detectorIntervalMin = 3; + detectorGetReponse = mock(GetResponse.class); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java index fd92d61e..bc40ab6d 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java @@ -21,7 +21,6 @@ import static java.util.Collections.emptySet; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; @@ -30,7 +29,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -42,17 +40,12 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.transport.RemoteTransportException; -import org.junit.Before; -import org.junit.BeforeClass; import com.amazon.opendistroforelasticsearch.ad.common.exception.AnomalyDetectionException; import com.amazon.opendistroforelasticsearch.ad.common.exception.ResourceNotFoundException; @@ -72,96 +65,8 @@ import com.amazon.opendistroforelasticsearch.ad.transport.ProfileResponse; import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingAction; import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingResponse; -import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; - -public class AnomalyDetectorProfileRunnerTests extends AbstractADTest { - private AnomalyDetectorProfileRunner runner; - private Client client; - private DiscoveryNodeFilterer nodeFilter; - private AnomalyDetector detector; - private ClusterService clusterService; - - private static Set stateOnly; - private static Set stateNError; - private static Set modelProfile; - private static Set stateInitProgress; - private static String noFullShingleError = "No full shingle in current detection window"; - private static String stoppedError = "Stopped detector as job failed consecutively for more than 3 times: Having trouble querying data." - + " Maybe all of your features have been disabled."; - - private int requiredSamples; - private int neededSamples; - - // profile model related - private String node1; - private String nodeName1; - private DiscoveryNode discoveryNode1; - - private String node2; - private String nodeName2; - private DiscoveryNode discoveryNode2; - - private long modelSize; - private String model1Id; - private String model0Id; - - private int shingleSize; - - private int detectorIntervalMin; - private GetResponse detectorGetReponse; - private String messaingExceptionError = "blah"; - - @BeforeClass - public static void setUpOnce() { - stateOnly = new HashSet(); - stateOnly.add(DetectorProfileName.STATE); - stateNError = new HashSet(); - stateNError.add(DetectorProfileName.ERROR); - stateNError.add(DetectorProfileName.STATE); - stateInitProgress = new HashSet(); - stateInitProgress.add(DetectorProfileName.INIT_PROGRESS); - stateInitProgress.add(DetectorProfileName.STATE); - modelProfile = new HashSet( - Arrays - .asList( - DetectorProfileName.SHINGLE_SIZE, - DetectorProfileName.MODELS, - DetectorProfileName.COORDINATING_NODE, - DetectorProfileName.TOTAL_SIZE_IN_BYTES - ) - ); - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - client = mock(Client.class); - nodeFilter = mock(DiscoveryNodeFilterer.class); - clusterService = mock(ClusterService.class); - when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test cluster")).build()); - - requiredSamples = 128; - neededSamples = 5; - - runner = new AnomalyDetectorProfileRunner(client, xContentRegistry(), nodeFilter, requiredSamples); - - detectorIntervalMin = 3; - detectorGetReponse = mock(GetResponse.class); - } - - enum DetectorStatus { - INDEX_NOT_EXIST, - NO_DOC, - EXIST - } - - enum JobStatus { - INDEX_NOT_EXIT, - DISABLED, - ENABLED - } +public class AnomalyDetectorProfileRunnerTests extends AbstractProfileRunnerTests { enum RCFPollingStatus { INIT_NOT_EXIT, REMOTE_INIT_NOT_EXIT, @@ -173,13 +78,14 @@ enum RCFPollingStatus { INITTING } - enum ErrorResultStatus { - INDEX_NOT_EXIT, - NO_ERROR, - SHINGLE_ERROR, - STOPPED_ERROR - } - + /** + * Convenience methods for single-stream detector profile tests set up + * @param detectorStatus Detector config status + * @param jobStatus Detector job status + * @param rcfPollingStatus RCF polling result status + * @param errorResultStatus Error result status + * @throws IOException when failing the getting request + */ @SuppressWarnings("unchecked") private void setUpClientGet( DetectorStatus detectorStatus, @@ -188,6 +94,7 @@ private void setUpClientGet( ErrorResultStatus errorResultStatus ) throws IOException { detector = TestHelpers.randomAnomalyDetectorWithInterval(new IntervalTimeConfiguration(detectorIntervalMin, ChronoUnit.MINUTES)); + doAnswer(invocation -> { Object[] args = invocation.getArguments(); GetRequest request = (GetRequest) args[0]; @@ -639,9 +546,9 @@ public void testInitNoUpdateNoIndex() throws IOException, InterruptedException { assertEquals(expectedProfile, response); inProgressLatch.countDown(); }, exception -> { - logger.error(exception); + LOG.error(exception); for (StackTraceElement ste : exception.getStackTrace()) { - logger.info(ste); + LOG.info(ste); } assertTrue("Should not reach here ", false); inProgressLatch.countDown(); @@ -661,9 +568,9 @@ public void testInitNoIndex() throws IOException, InterruptedException { assertEquals(expectedProfile, response); inProgressLatch.countDown(); }, exception -> { - logger.error(exception); + LOG.error(exception); for (StackTraceElement ste : exception.getStackTrace()) { - logger.info(ste); + LOG.info(ste); } assertTrue("Should not reach here ", false); inProgressLatch.countDown(); @@ -674,4 +581,21 @@ public void testInitNoIndex() throws IOException, InterruptedException { public void testInvalidRequiredSamples() { expectThrows(IllegalArgumentException.class, () -> new AnomalyDetectorProfileRunner(client, xContentRegistry(), nodeFilter, 0)); } + + public void testFailRCFPolling() throws IOException, InterruptedException { + setUpClientGet(DetectorStatus.EXIST, JobStatus.ENABLED, RCFPollingStatus.EXCEPTION, ErrorResultStatus.NO_ERROR); + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> { + assertTrue("Should not reach here ", false); + inProgressLatch.countDown(); + }, exception -> { + assertTrue(exception instanceof RuntimeException); + // this means we don't exit with failImmediately. failImmediately can make we return early when there are other concurrent + // requests + assertTrue(exception.getMessage(), exception.getMessage().contains("Exceptions:")); + inProgressLatch.countDown(); + }), stateNError); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java index c084aa08..20909dab 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java @@ -28,8 +28,10 @@ import static org.powermock.api.mockito.PowerMockito.when; import java.io.IOException; +import java.nio.ByteBuffer; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -38,6 +40,7 @@ import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Consumer; +import java.util.stream.IntStream; import org.apache.http.Header; import org.apache.http.HttpEntity; @@ -357,7 +360,7 @@ public static AnomalyDetector randomAnomalyDetectorWithInterval(TimeConfiguratio null, randomInt(), Instant.now().truncatedTo(ChronoUnit.SECONDS), - null, + categoryField, randomUser() ); } @@ -666,6 +669,24 @@ public static GetResponse createGetResponse(ToXContentObject o, String id, Strin ); } + public static GetResponse createBrokenGetResponse(String id, String indexName) throws IOException { + ByteBuffer[] buffers = new ByteBuffer[0]; + return new GetResponse( + new GetResult( + indexName, + MapperService.SINGLE_MAPPING_NAME, + id, + UNASSIGNED_SEQ_NO, + 0, + -1, + true, + BytesReference.fromByteBuffers(buffers), + Collections.emptyMap(), + Collections.emptyMap() + ) + ); + } + public static SearchResponse createSearchResponse(ToXContentObject o) throws IOException { XContentBuilder content = o.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS); @@ -779,4 +800,11 @@ public static String toJsonString(ToXContentObject object) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder(); return TestHelpers.xContentBuilderToString(object.toXContent(builder, ToXContent.EMPTY_PARAMS)); } + + public static SearchHits createSearchHits(int totalHits) { + List hitList = new ArrayList<>(); + IntStream.range(0, totalHits).forEach(i -> hitList.add(new SearchHit(i))); + SearchHit[] hitArray = new SearchHit[hitList.size()]; + return new SearchHits(hitList.toArray(hitArray), new TotalHits(totalHits, TotalHits.Relation.EQUAL_TO), 1.0F); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfileTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfileTests.java new file mode 100644 index 00000000..5e195f7b --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfileTests.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.model; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +import java.io.IOException; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import test.com.amazon.opendistroforelasticsearch.ad.util.JsonDeserializer; + +import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.common.exception.JsonPathNotFoundException; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; + +public class EntityProfileTests extends AbstractADTest { + public void testMerge() { + EntityProfile profile1 = new EntityProfile(null, null, null, -1, -1, null, null, EntityState.INIT); + + EntityProfile profile2 = new EntityProfile(null, null, null, -1, -1, null, null, EntityState.UNKNOWN); + + profile1.merge(profile2); + assertEquals(profile1.getState(), EntityState.INIT); + } + + public void testToXContent() throws IOException, JsonPathNotFoundException { + EntityProfile profile1 = new EntityProfile(null, null, null, -1, -1, null, null, EntityState.INIT); + + XContentBuilder builder = jsonBuilder(); + profile1.toXContent(builder, ToXContent.EMPTY_PARAMS); + String json = Strings.toString(builder); + + assertEquals("INIT", JsonDeserializer.getTextValue(json, CommonName.STATE)); + + EntityProfile profile2 = new EntityProfile(null, null, null, -1, -1, null, null, EntityState.UNKNOWN); + + builder = jsonBuilder(); + profile2.toXContent(builder, ToXContent.EMPTY_PARAMS); + json = Strings.toString(builder); + + assertTrue(false == JsonDeserializer.hasChildNode(json, CommonName.STATE)); + } +} diff --git a/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java b/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java index 30949a86..f65f218b 100644 --- a/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java +++ b/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java @@ -27,13 +27,9 @@ import static org.mockito.Mockito.when; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.stream.IntStream; -import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -53,8 +49,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -182,13 +176,6 @@ public void setUp() throws Exception { ); } - private SearchHits createSearchHits(int totalHits) { - List hitList = new ArrayList<>(); - IntStream.range(0, totalHits).forEach(i -> hitList.add(new SearchHit(i))); - SearchHit[] hitArray = new SearchHit[hitList.size()]; - return new SearchHits(hitList.toArray(hitArray), new TotalHits(totalHits, TotalHits.Relation.EQUAL_TO), 1.0F); - } - public void testTwoCategoricalFields() throws IOException { expectThrows( IllegalArgumentException.class, @@ -200,7 +187,7 @@ public void testTwoCategoricalFields() throws IOException { public void testNoCategoricalField() throws IOException { SearchResponse mockResponse = mock(SearchResponse.class); int totalHits = 1001; - when(mockResponse.getHits()).thenReturn(createSearchHits(totalHits)); + when(mockResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); doAnswer(invocation -> { Object[] args = invocation.getArguments(); assertTrue(String.format("The size of args is %d. Its content is %s", args.length, Arrays.toString(args)), args.length == 2); @@ -250,7 +237,7 @@ public void testTextField() throws IOException { SearchResponse detectorResponse = mock(SearchResponse.class); int totalHits = 9; - when(detectorResponse.getHits()).thenReturn(createSearchHits(totalHits)); + when(detectorResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); // extend NodeClient since its execute method is final and mockito does not allow to mock final methods // we can also use spy to overstep the final methods @@ -313,11 +300,11 @@ private void testValidTypeTemplate(String filedTypeName) throws IOException { SearchResponse detectorResponse = mock(SearchResponse.class); int totalHits = 9; - when(detectorResponse.getHits()).thenReturn(createSearchHits(totalHits)); + when(detectorResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); SearchResponse userIndexResponse = mock(SearchResponse.class); int userIndexHits = 0; - when(userIndexResponse.getHits()).thenReturn(createSearchHits(userIndexHits)); + when(userIndexResponse.getHits()).thenReturn(TestHelpers.createSearchHits(userIndexHits)); // extend NodeClient since its execute method is final and mockito does not allow to mock final methods // we can also use spy to overstep the final methods @@ -397,14 +384,14 @@ private void testUpdateTemplate(String fieldTypeName) throws IOException { SearchResponse detectorResponse = mock(SearchResponse.class); int totalHits = 9; - when(detectorResponse.getHits()).thenReturn(createSearchHits(totalHits)); + when(detectorResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); GetResponse getDetectorResponse = TestHelpers .createGetResponse(detector, detector.getDetectorId(), AnomalyDetector.ANOMALY_DETECTORS_INDEX); SearchResponse userIndexResponse = mock(SearchResponse.class); int userIndexHits = 0; - when(userIndexResponse.getHits()).thenReturn(createSearchHits(userIndexHits)); + when(userIndexResponse.getHits()).thenReturn(TestHelpers.createSearchHits(userIndexHits)); // extend NodeClient since its execute method is final and mockito does not allow to mock final methods // we can also use spy to overstep the final methods @@ -496,7 +483,7 @@ public void testMoreThanTenMultiEntityDetectors() throws IOException { int totalHits = 11; - when(mockResponse.getHits()).thenReturn(createSearchHits(totalHits)); + when(mockResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); doAnswer(invocation -> { Object[] args = invocation.getArguments(); @@ -530,7 +517,7 @@ public void testTenMultiEntityDetectorsUpdateSingleEntityAdToMulti() throws IOEx .createGetResponse(existingDetector, existingDetector.getDetectorId(), AnomalyDetector.ANOMALY_DETECTORS_INDEX); SearchResponse searchResponse = mock(SearchResponse.class); - when(searchResponse.getHits()).thenReturn(createSearchHits(totalHits)); + when(searchResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); doAnswer(invocation -> { Object[] args = invocation.getArguments(); @@ -602,7 +589,7 @@ public void testTenMultiEntityDetectorsUpdateExistingMultiEntityAd() throws IOEx .createGetResponse(detector, detector.getDetectorId(), AnomalyDetector.ANOMALY_DETECTORS_INDEX); SearchResponse searchResponse = mock(SearchResponse.class); - when(searchResponse.getHits()).thenReturn(createSearchHits(totalHits)); + when(searchResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); doAnswer(invocation -> { Object[] args = invocation.getArguments(); diff --git a/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityProfileTests.java b/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityProfileTests.java new file mode 100644 index 00000000..0be67df3 --- /dev/null +++ b/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityProfileTests.java @@ -0,0 +1,261 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.elasticsearch.search.aggregations.metrics; + +import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; +import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; + +import com.amazon.opendistroforelasticsearch.ad.AbstractProfileRunnerTests; +import com.amazon.opendistroforelasticsearch.ad.TestHelpers; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; +import com.amazon.opendistroforelasticsearch.ad.model.DetectorInternalState; +import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; +import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction; +import com.amazon.opendistroforelasticsearch.ad.transport.ProfileNodeResponse; +import com.amazon.opendistroforelasticsearch.ad.transport.ProfileResponse; +import com.carrotsearch.hppc.BitMixer; + +/** + * Run tests in ES package since InternalCardinality has only package private constructors + * and we cannot mock it since it is a final class. + * + */ +public class CardinalityProfileTests extends AbstractProfileRunnerTests { + enum ADResultStatus { + NO_RESULT, + EXCEPTION + } + + enum CardinalityStatus { + EXCEPTION, + NORMAL + } + + @SuppressWarnings("unchecked") + private void setUpMultiEntityClientGet(DetectorStatus detectorStatus, JobStatus jobStatus, ErrorResultStatus errorResultStatus) + throws IOException { + detector = TestHelpers + .randomAnomalyDetectorWithInterval(new IntervalTimeConfiguration(detectorIntervalMin, ChronoUnit.MINUTES), true); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + GetRequest request = (GetRequest) args[0]; + ActionListener listener = (ActionListener) args[1]; + + if (request.index().equals(ANOMALY_DETECTORS_INDEX)) { + switch (detectorStatus) { + case EXIST: + listener + .onResponse( + TestHelpers.createGetResponse(detector, detector.getDetectorId(), AnomalyDetector.ANOMALY_DETECTORS_INDEX) + ); + break; + default: + assertTrue("should not reach here", false); + break; + } + } else if (request.index().equals(ANOMALY_DETECTOR_JOB_INDEX)) { + AnomalyDetectorJob job = null; + switch (jobStatus) { + case ENABLED: + job = TestHelpers.randomAnomalyDetectorJob(true); + listener + .onResponse( + TestHelpers.createGetResponse(job, detector.getDetectorId(), AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX) + ); + break; + default: + assertTrue("should not reach here", false); + break; + } + } else if (request.index().equals(DetectorInternalState.DETECTOR_STATE_INDEX)) { + switch (errorResultStatus) { + case NO_ERROR: + break; + case NULL_POINTER_EXCEPTION: + GetResponse response = mock(GetResponse.class); + when(response.isExists()).thenReturn(true); + doThrow(NullPointerException.class).when(response).getSourceAsString(); + listener.onResponse(response); + break; + default: + assertTrue("should not reach here", false); + break; + } + } + return null; + }).when(client).get(any(), any()); + } + + @SuppressWarnings("unchecked") + private void setUpMultiEntityClientSearch(ADResultStatus resultStatus, CardinalityStatus cardinalityStatus) { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[1]; + SearchRequest request = (SearchRequest) args[0]; + if (request.indices()[0].equals(CommonName.ANOMALY_RESULT_INDEX_ALIAS)) { + switch (resultStatus) { + case NO_RESULT: + SearchResponse mockResponse = mock(SearchResponse.class); + when(mockResponse.getHits()).thenReturn(TestHelpers.createSearchHits(0)); + listener.onResponse(mockResponse); + break; + case EXCEPTION: + listener.onFailure(new RuntimeException()); + break; + default: + assertTrue("should not reach here", false); + break; + } + } else { + switch (cardinalityStatus) { + case EXCEPTION: + listener.onFailure(new RuntimeException()); + break; + case NORMAL: + SearchResponse response = mock(SearchResponse.class); + List aggs = new ArrayList<>(1); + HyperLogLogPlusPlus hyperLogLog = new HyperLogLogPlusPlus( + AbstractHyperLogLog.MIN_PRECISION, + BigArrays.NON_RECYCLING_INSTANCE, + 0 + ); + for (int i = 0; i < 100; i++) { + hyperLogLog.collect(0, BitMixer.mix64(randomIntBetween(1, 100))); + } + aggs.add(new InternalCardinality(CommonName.TOTAL_ENTITIES, hyperLogLog, new HashMap<>())); + when(response.getAggregations()).thenReturn(InternalAggregations.from(aggs)); + listener.onResponse(response); + break; + default: + assertTrue("should not reach here", false); + break; + } + + } + + return null; + }).when(client).search(any(), any()); + } + + @SuppressWarnings("unchecked") + private void setUpProfileAction() { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + + ActionListener listener = (ActionListener) args[2]; + + ProfileNodeResponse profileNodeResponse1 = new ProfileNodeResponse(discoveryNode1, new HashMap<>(), shingleSize, 0, 0); + List profileNodeResponses = Arrays.asList(profileNodeResponse1); + listener.onResponse(new ProfileResponse(new ClusterName(clusterName), profileNodeResponses, Collections.emptyList())); + + return null; + }).when(client).execute(eq(ProfileAction.INSTANCE), any(), any()); + } + + public void testFailGetEntityStats() throws IOException, InterruptedException { + setUpMultiEntityClientGet(DetectorStatus.EXIST, JobStatus.ENABLED, ErrorResultStatus.NO_ERROR); + setUpMultiEntityClientSearch(ADResultStatus.NO_RESULT, CardinalityStatus.EXCEPTION); + setUpProfileAction(); + + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> { + assertTrue("Should not reach here ", false); + inProgressLatch.countDown(); + }, exception -> { + assertTrue(exception instanceof RuntimeException); + // this means we don't exit with failImmediately. failImmediately can make we return early when there are other concurrent + // requests + assertTrue(exception.getMessage(), exception.getMessage().contains("Exceptions:")); + inProgressLatch.countDown(); + + }), totalInitProgress); + + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + public void testFailGetState() throws IOException, InterruptedException { + setUpMultiEntityClientGet(DetectorStatus.EXIST, JobStatus.ENABLED, ErrorResultStatus.NULL_POINTER_EXCEPTION); + setUpMultiEntityClientSearch(ADResultStatus.NO_RESULT, CardinalityStatus.NORMAL); + setUpProfileAction(); + + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> { + assertTrue("Should not reach here ", false); + inProgressLatch.countDown(); + }, exception -> { + assertTrue(exception instanceof RuntimeException); + // this means we don't exit with failImmediately. failImmediately can make we return early when there are other concurrent + // requests + assertTrue(exception.getMessage(), exception.getMessage().contains("Exceptions:")); + inProgressLatch.countDown(); + + }), initProgressErrorProfile); + + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + public void testFaiConfirmInitted() throws IOException, InterruptedException { + setUpMultiEntityClientGet(DetectorStatus.EXIST, JobStatus.ENABLED, ErrorResultStatus.NO_ERROR); + setUpMultiEntityClientSearch(ADResultStatus.EXCEPTION, CardinalityStatus.NORMAL); + setUpProfileAction(); + + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> { + assertTrue("Should not reach here ", false); + inProgressLatch.countDown(); + }, exception -> { + assertTrue(exception instanceof RuntimeException); + // this means we don't exit with failImmediately. failImmediately can make we return early when there are other concurrent + // requests + assertTrue(exception.getMessage(), exception.getMessage().contains("Exceptions:")); + inProgressLatch.countDown(); + + }), totalInitProgress); + + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } +}