Skip to content

Commit

Permalink
Fix the profile API returns prematurely.
Browse files Browse the repository at this point in the history
MultiResponsesDelegateActionListener helps send multiple requests asynchronously and return one final response altogether. While waiting for all inflight requests, the method respondImmediately and failImmediately can stop waiting and return immediately. While these two methods are convenient, it is easy to misuse them and cause bugs (see opendistro-for-elasticsearch#339 for example). This PR removes the method respondImmediately and failImmediately and refactor profile runner to avoid using them.

This PR also stops printing out the unknown entity state since it is not useful.

Testing done:
1. Added unit tests to verify the bug fix.
2. Manual tests to run profile calls for single-stream and multi-entity detectors for different phases of the detector lifecycle (disabled, init, running). Verified profile results make sense.
  • Loading branch information
kaituo committed Dec 23, 2020
1 parent 2ae77ed commit 4fb17b0
Show file tree
Hide file tree
Showing 11 changed files with 631 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public void profile(String detectorId, ActionListener<DetectorProfile> listener,
listener.onFailure(new InvalidParameterException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
return;
}

calculateTotalResponsesToWait(detectorId, profilesToCollect, listener);
}

Expand All @@ -118,10 +117,38 @@ private void calculateTotalResponsesToWait(
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, xContentParser.nextToken(), xContentParser);
AnomalyDetector detector = AnomalyDetector.parse(xContentParser, detectorId);

prepareProfile(detector, listener, profilesToCollect);
} catch (Exception e) {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
}
} else {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
}
}, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception))));
}

private void prepareProfile(
AnomalyDetector detector,
ActionListener<DetectorProfile> listener,
Set<DetectorProfileName> profilesToCollect
) {
String detectorId = detector.getDetectorId();
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
XContentParser parser = XContentType.JSON
.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();

boolean isMultiEntityDetector = detector.isMultientityDetector();

int totalResponsesToWait = 0;

if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
totalResponsesToWait++;
}
Expand Down Expand Up @@ -158,50 +185,20 @@ private void calculateTotalResponsesToWait(
new MultiResponsesDelegateActionListener<DetectorProfile>(
listener,
totalResponsesToWait,
"Fail to fetch profile for " + detectorId,
CommonErrorMessages.FAIL_FETCH_ERR_MSG + detectorId,
false
);

prepareProfile(detector, delegateListener, profilesToCollect);
} catch (Exception e) {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
}
} else {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
}
}, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception))));
}

private void prepareProfile(
AnomalyDetector detector,
MultiResponsesDelegateActionListener<DetectorProfile> listener,
Set<DetectorProfileName> profilesToCollect
) {
String detectorId = detector.getDetectorId();
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
XContentParser parser = XContentType.JSON
.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();

if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
GetRequest getStateRequest = new GetRequest(DetectorInternalState.DETECTOR_STATE_INDEX, detectorId);
client.get(getStateRequest, onGetDetectorState(listener, detectorId, enabledTimeMs));
client.get(getStateRequest, onGetDetectorState(delegateListener, detectorId, enabledTimeMs));
}

boolean isMultiEntityDetector = detector.isMultientityDetector();

// total number of listeners we need to define. Needed by MultiResponsesDelegateActionListener to decide
// when to consolidate results and return to users
if (isMultiEntityDetector) {
if (profilesToCollect.contains(DetectorProfileName.TOTAL_ENTITIES)) {
profileEntityStats(listener, detector);
profileEntityStats(delegateListener, detector);
}
if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE)
|| profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE)
Expand All @@ -210,24 +207,24 @@ private void prepareProfile(
|| profilesToCollect.contains(DetectorProfileName.ACTIVE_ENTITIES)
|| profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)
|| profilesToCollect.contains(DetectorProfileName.STATE)) {
profileModels(detector, profilesToCollect, job, true, listener);
profileModels(detector, profilesToCollect, job, true, delegateListener);
}
} else {
if (profilesToCollect.contains(DetectorProfileName.STATE)
|| profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)) {
profileStateRelated(detector, listener, job.isEnabled(), profilesToCollect);
profileStateRelated(detector, delegateListener, job.isEnabled(), profilesToCollect);
}
if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE)
|| profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE)
|| profilesToCollect.contains(DetectorProfileName.TOTAL_SIZE_IN_BYTES)
|| profilesToCollect.contains(DetectorProfileName.MODELS)) {
profileModels(detector, profilesToCollect, job, false, listener);
profileModels(detector, profilesToCollect, job, false, delegateListener);
}
}

} catch (IOException | XContentParseException | NullPointerException e) {
logger.error(e);
listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
} catch (Exception e) {
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
listener.onFailure(e);
}
} else {
onGetDetectorForPrepare(listener, profilesToCollect);
Expand Down Expand Up @@ -261,20 +258,19 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
DetectorProfile profile = profileBuilder.totalEntities(value).build();
listener.onResponse(profile);
}, searchException -> { listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId()); })
);
}, searchException -> {
logger.warn(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId());
listener.onFailure(searchException);
}));
}
}

private void onGetDetectorForPrepare(
MultiResponsesDelegateActionListener<DetectorProfile> listener,
Set<DetectorProfileName> profiles
) {
private void onGetDetectorForPrepare(ActionListener<DetectorProfile> listener, Set<DetectorProfileName> profiles) {
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
if (profiles.contains(DetectorProfileName.STATE)) {
profileBuilder.state(DetectorState.DISABLED);
}
listener.respondImmediately(profileBuilder.build());
listener.onResponse(profileBuilder.build());
}

/**
Expand Down Expand Up @@ -340,8 +336,8 @@ private ActionListener<GetResponse> onGetDetectorState(
listener.onResponse(profileBuilder.build());

} catch (IOException | XContentParseException | NullPointerException e) {
logger.error(e);
listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
listener.onFailure(e);
}
} else {
// detector state for this detector does not exist
Expand Down Expand Up @@ -475,7 +471,7 @@ private ActionListener<SearchResponse> onInittedEver(
"Fail to find any anomaly result with anomaly score larger than 0 after AD job enabled time for detector {}",
detector.getDetectorId()
);
listener.failImmediately(new RuntimeException("Fail to find detector state: " + detector.getDetectorId(), exception));
listener.onFailure(exception);
}
});
}
Expand Down Expand Up @@ -523,7 +519,7 @@ private ActionListener<RCFPollingResponse> onPollRCFUpdates(
new ParameterizedMessage("Fail to get init progress through messaging for {}", detector.getDetectorId()),
exception
);
listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG + detector.getDetectorId(), exception);
listener.onFailure(exception);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.CATEGORY_FIELD_LIMIT;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
import java.security.InvalidParameterException;
import java.util.List;
import java.util.Optional;
Expand All @@ -35,7 +34,6 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -113,25 +111,7 @@ public void profile(
new InvalidParameterException(CommonErrorMessages.CATEGORICAL_FIELD_NUMBER_SURPASSED + CATEGORY_FIELD_LIMIT)
);
} else {
int totalResponsesToWait = 0;
if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)
|| profilesToCollect.contains(EntityProfileName.STATE)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(EntityProfileName.MODELS)) {
totalResponsesToWait++;
}
MultiResponsesDelegateActionListener<EntityProfile> delegateListener =
new MultiResponsesDelegateActionListener<EntityProfile>(
listener,
totalResponsesToWait,
"Fail to fetch profile for " + entityValue + " of detector " + detectorId,
false
);
prepareEntityProfile(delegateListener, detectorId, entityValue, profilesToCollect, detector, categoryField.get(0));
prepareEntityProfile(listener, detectorId, entityValue, profilesToCollect, detector, categoryField.get(0));
}
} catch (Exception t) {
listener.onFailure(t);
Expand All @@ -143,7 +123,7 @@ public void profile(
}

private void prepareEntityProfile(
MultiResponsesDelegateActionListener<EntityProfile> delegateListener,
ActionListener<EntityProfile> listener,
String detectorId,
String entityValue,
Set<EntityProfileName> profilesToCollect,
Expand All @@ -158,8 +138,8 @@ private void prepareEntityProfile(
request,
ActionListener
.wrap(
r -> getJob(detectorId, categoryField, entityValue, profilesToCollect, detector, r, delegateListener),
delegateListener::failImmediately
r -> getJob(detectorId, categoryField, entityValue, profilesToCollect, detector, r, listener),
listener::onFailure
)
);
}
Expand All @@ -171,7 +151,7 @@ private void getJob(
Set<EntityProfileName> profilesToCollect,
AnomalyDetector detector,
EntityProfileResponse entityProfileResponse,
MultiResponsesDelegateActionListener<EntityProfile> delegateListener
ActionListener<EntityProfile> listener
) {
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
Expand All @@ -184,6 +164,25 @@ private void getJob(
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);

int totalResponsesToWait = 0;
if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)
|| profilesToCollect.contains(EntityProfileName.STATE)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(EntityProfileName.MODELS)) {
totalResponsesToWait++;
}
MultiResponsesDelegateActionListener<EntityProfile> delegateListener =
new MultiResponsesDelegateActionListener<EntityProfile>(
listener,
totalResponsesToWait,
"Fail to fetch profile for " + entityValue + " of detector " + detectorId,
false
);

if (profilesToCollect.contains(EntityProfileName.MODELS)) {
EntityProfile.Builder builder = new EntityProfile.Builder(categoryField, entityValue);
if (false == job.isEnabled()) {
Expand Down Expand Up @@ -233,20 +232,20 @@ private void getJob(
delegateListener.onResponse(builder.build());
}));
}
} catch (IOException | XContentParseException | NullPointerException e) {
logger.error(e);
delegateListener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
} catch (Exception e) {
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
listener.onFailure(e);
}
} else {
sendUnknownState(profilesToCollect, categoryField, entityValue, true, delegateListener);
sendUnknownState(profilesToCollect, categoryField, entityValue, true, listener);
}
}, exception -> {
if (exception instanceof IndexNotFoundException) {
logger.info(exception.getMessage());
sendUnknownState(profilesToCollect, categoryField, entityValue, true, delegateListener);
sendUnknownState(profilesToCollect, categoryField, entityValue, true, listener);
} else {
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG + detectorId, exception);
delegateListener.failImmediately(exception);
listener.onFailure(exception);
}
}));
}
Expand Down Expand Up @@ -285,14 +284,14 @@ private void sendUnknownState(
String categoryField,
String entityValue,
boolean immediate,
MultiResponsesDelegateActionListener<EntityProfile> delegateListener
ActionListener<EntityProfile> delegateListener
) {
EntityProfile.Builder builder = new EntityProfile.Builder(categoryField, entityValue);
if (profilesToCollect.contains(EntityProfileName.STATE)) {
builder.state(EntityState.UNKNOWN);
}
if (immediate) {
delegateListener.respondImmediately(builder.build());
delegateListener.onResponse(builder.build());
} else {
delegateListener.onResponse(builder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ public class CommonErrorMessages {
public static String FAIL_TO_GET_TOTAL_ENTITIES = "Failed to get total entities for detector ";
public static String CATEGORICAL_FIELD_NUMBER_SURPASSED = "We don't support categorical fields more than ";
public static String EMPTY_PROFILES_COLLECT = "profiles to collect are missing or invalid";
public static String FAIL_FETCH_ERR_MSG = "Fail to fetch profile for ";
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (modelProfile != null) {
builder.field(CommonName.MODEL, modelProfile);
}
if (state != null) {
if (state != EntityState.UNKNOWN) {
builder.field(CommonName.STATE, state);
}
builder.endObject();
Expand Down Expand Up @@ -263,7 +263,7 @@ public String toString() {
if (modelProfile != null) {
builder.append(CommonName.MODELS, modelProfile);
}
if (state != null) {
if (state != EntityState.UNKNOWN) {
builder.append(CommonName.STATE, state);
}
return builder.toString();
Expand Down Expand Up @@ -330,7 +330,7 @@ public void merge(Mergeable other) {
if (otherProfile.modelProfile != null) {
this.modelProfile = otherProfile.modelProfile;
}
if (otherProfile.getState() != null) {
if (otherProfile.getState() != EntityState.UNKNOWN) {
this.state = otherProfile.getState();
}
}
Expand Down
Loading

0 comments on commit 4fb17b0

Please sign in to comment.