Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-enabled multi-node integration tests and Fix integration test set up #432

Merged
merged 12 commits into from
Jan 22, 2024
Merged
20 changes: 8 additions & 12 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -238,18 +238,6 @@ ext{
cluster.setting("plugins.security.authcz.admin_dn", "\n- CN=kirk,OU=client,O=client,L=test, C=de")
cluster.setting('plugins.security.restapi.roles_enabled', '["all_access", "security_rest_api_access"]')
cluster.setting('plugins.security.system_indices.enabled', "true")
cluster.setting('plugins.security.system_indices.indices', '[' +
'".plugins-ml-config", ' +
'".plugins-ml-connector", ' +
'".plugins-ml-model-group", ' +
'".plugins-ml-model", ".plugins-ml-task", ' +
'".plugins-ml-conversation-meta", ' +
'".plugins-ml-conversation-interactions", ' +
'".plugins-flow-framework-config", ' +
'".plugins-flow-framework-templates", ' +
'".plugins-flow-framework-state"' +
']'
)
cluster.setSecure(true)
}
}
Expand Down Expand Up @@ -424,6 +412,14 @@ task integTestRemote(type: RestIntegTestTask) {

// Automatically sets up the integration test cluster locally
run {
doFirst {
// There seems to be an issue when running multi node run or integ tasks with unicast_hosts
// not being written, the waitForAllConditions ensures it's written
getClusters().forEach { cluster ->
cluster.waitForAllConditions()
}
}

useCluster testClusters.integTest
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.test.rest.OpenSearchRestTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -75,64 +76,79 @@ public abstract class FlowFrameworkRestTestCase extends OpenSearchRestTestCase {
@Before
protected void setUpSettings() throws Exception {

if (!indexExistsWithAdminClient(".plugins-ml-config")) {
// Initial cluster set up

// Enable Flow Framework Plugin Rest APIs
Response response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"transient\":{\"plugins.flow_framework.enabled\":true}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Enable ML Commons to run on non-ml nodes
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.only_run_on_ml_node\":false}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Enable local model registration via URL
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.allow_registering_model_via_url\":true}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Set ML jvm heap memory threshold to 100 to avoid opening the circuit breaker during tests
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.jvm_heap_memory_threshold\":100}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());
// Enable Flow Framework Plugin Rest APIs
Response response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"transient\":{\"plugins.flow_framework.enabled\":true}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Ensure .plugins-ml-config is created before proceeding with integration tests
assertBusy(() -> { assertTrue(indexExistsWithAdminClient(".plugins-ml-config")); }, 60, TimeUnit.SECONDS);
}
// Enable ML Commons to run on non-ml nodes
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.only_run_on_ml_node\":false}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Enable local model registration via URL
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.allow_registering_model_via_url\":true}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Set ML jvm heap memory threshold to 100 to avoid opening the circuit breaker during tests
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.jvm_heap_memory_threshold\":100}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Set ML auto redeploy to false
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.model_auto_redeploy.enable\":false}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Set ML auto redeploy retries to 0
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.model_auto_redeploy.lifetime_retry_times\":0}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Set up clients if running in security enabled cluster
if (isHttps()) {
String fullAccessUserPassword = generatePassword(FULL_ACCESS_USER);
String readAccessUserPassword = generatePassword(READ_ACCESS_USER);

// Configure full access user and client, needs ML Full Access role as well
Response response = createUser(
response = createUser(
FULL_ACCESS_USER,
fullAccessUserPassword,
List.of(FLOW_FRAMEWORK_FULL_ACCESS_ROLE, ML_COMMONS_FULL_ACCESS_ROLE)
Expand Down Expand Up @@ -187,7 +203,7 @@ protected static void deleteIndexWithAdminClient(String name) throws IOException

// Utility fn for checking if an index exists. Should only be used when not allowed in a regular context
// (e.g., checking existence of system indices)
protected static boolean indexExistsWithAdminClient(String indexName) throws IOException {
public static boolean indexExistsWithAdminClient(String indexName) throws IOException {
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
Request request = new Request("HEAD", "/" + indexName);
Response response = adminClient().performRequest(request);
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
Expand Down Expand Up @@ -253,6 +269,40 @@ protected static void configureHttpsClient(RestClientBuilder builder, Settings s
}
}

@SuppressWarnings("unchecked")
@After
protected void wipeAllODFEIndices() throws IOException {
Response response = adminClient().performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all"));
MediaType xContentType = MediaType.fromMediaType(response.getEntity().getContentType());
try (
XContentParser parser = xContentType.xContent()
.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.getEntity().getContent()
)
) {
XContentParser.Token token = parser.nextToken();
List<Map<String, Object>> parserList = null;
if (token == XContentParser.Token.START_ARRAY) {
parserList = parser.listOrderedMap().stream().map(obj -> (Map<String, Object>) obj).collect(Collectors.toList());
} else {
parserList = Collections.singletonList(parser.mapOrdered());
}

for (Map<String, Object> index : parserList) {
String indexName = (String) index.get("index");
// Do not reset ML/Flow Framework encryption index as this is needed to encrypt connector credentials
if (indexName != null
&& !".opendistro_security".equals(indexName)
&& !".plugins-ml-config".equals(indexName)
&& !".plugins-flow-framework-config".equals(indexName)) {
adminClient().performRequest(new Request("DELETE", "/" + indexName));
}
}
}
}

/**
* wipeAllIndices won't work since it cannot delete security index. Use wipeAllSystemIndices instead.
*/
Expand Down Expand Up @@ -290,12 +340,13 @@ protected Response createWorkflow(RestClient client, Template template) throws E

/**
* Helper method to invoke the Create Workflow Rest Action with provision
* @param client the rest client
* @param template the template to create
* @throws Exception if the request fails
* @return a rest response
*/
protected Response createWorkflowWithProvision(Template template) throws Exception {
return TestHelpers.makeRequest(client(), "POST", WORKFLOW_URI + "?provision=true", Collections.emptyMap(), template.toJson(), null);
protected Response createWorkflowWithProvision(RestClient client, Template template) throws Exception {
return TestHelpers.makeRequest(client, "POST", WORKFLOW_URI + "?provision=true", Collections.emptyMap(), template.toJson(), null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testFailedUpdateWorkflow() throws Exception {
assertEquals(RestStatus.CREATED, TestHelpers.restStatus(responseCreate));

Template template = TestHelpers.createTemplateFromFile("createconnector-registerremotemodel-deploymodel.json");
Thread.sleep(1000);

ResponseException exception = expectThrows(ResponseException.class, () -> updateWorkflow(client(), "123", template));
assertTrue(exception.getMessage().contains("Failed to get template: 123"));

Expand All @@ -84,8 +84,14 @@ public void testFailedUpdateWorkflow() throws Exception {
Map<String, Object> responseMap = entityAsMap(response);
String workflowId = (String) responseMap.get(WORKFLOW_ID);

// Provision Template
Response provisionResponse = provisionWorkflow(client(), workflowId);
// Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status
Response provisionResponse;
if (!indexExistsWithAdminClient(".plugins-ml-config")) {
assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS);
provisionResponse = provisionWorkflow(client(), workflowId);
} else {
provisionResponse = provisionWorkflow(client(), workflowId);
}
assertEquals(RestStatus.OK, TestHelpers.restStatus(provisionResponse));
getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS);

Expand Down Expand Up @@ -163,12 +169,6 @@ public void testCreateAndProvisionLocalModelWorkflow() throws Exception {
assertNotNull(resourcesCreated.get(0).resourceId());
assertEquals("deploy_model", resourcesCreated.get(1).workflowStepName());
assertNotNull(resourcesCreated.get(1).resourceId());

// Deprovision the workflow to avoid opening circut breaker when running additional tests
Response deprovisionResponse = deprovisionWorkflow(client(), workflowId);

// wait for deprovision to complete
Thread.sleep(5000);
}

public void testCreateAndProvisionCyclicalTemplate() throws Exception {
Expand Down Expand Up @@ -215,8 +215,14 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {
String workflowId = (String) responseMap.get(WORKFLOW_ID);
getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED);

// Hit Provision API and assert status
response = provisionWorkflow(client(), workflowId);
// Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status
if (!indexExistsWithAdminClient(".plugins-ml-config")) {
assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS);
response = provisionWorkflow(client(), workflowId);
} else {
response = provisionWorkflow(client(), workflowId);
}

assertEquals(RestStatus.OK, TestHelpers.restStatus(response));
getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS);

Expand All @@ -231,19 +237,19 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {
assertNotNull(resourcesCreated.get(1).resourceId());
assertEquals("deploy_model", resourcesCreated.get(2).workflowStepName());
assertNotNull(resourcesCreated.get(2).resourceId());

// Deprovision the workflow to avoid opening circuit breaker when running additional tests
Response deprovisionResponse = deprovisionWorkflow(client(), workflowId);

// wait for deprovision to complete
Thread.sleep(5000);
}

public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
Template template = TestHelpers.createTemplateFromFile("agent-framework.json");

// Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter
Response response = createWorkflowWithProvision(template);
Response response;
if (!indexExistsWithAdminClient(".plugins-ml-config")) {
assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS);
response = createWorkflowWithProvision(client(), template);
} else {
response = createWorkflowWithProvision(client(), template);
}
assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response));
Map<String, Object> responseMap = entityAsMap(response);
String workflowId = (String) responseMap.get(WORKFLOW_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;

Expand Down Expand Up @@ -116,7 +117,12 @@ public void testCreateProvisionDeprovisionWorkflowWithFullAccess() throws Except
assertEquals(RestStatus.OK, searchResponse.status());

// Invoke provision API
response = provisionWorkflow(fullAccessClient(), workflowId);
if (!indexExistsWithAdminClient(".plugins-ml-config")) {
assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS);
response = provisionWorkflow(fullAccessClient(), workflowId);
} else {
response = provisionWorkflow(fullAccessClient(), workflowId);
}
assertEquals(RestStatus.OK, TestHelpers.restStatus(response));

// Invoke status API
Expand Down
Loading