Skip to content

Commit

Permalink
Wiping system indices, moving ml config check to individual tests tha…
Browse files Browse the repository at this point in the history
…t create a connector

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Jan 21, 2024
1 parent 42e4cd4 commit d09c12c
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 59 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,6 @@ jobs:
run: |
./gradlew integTest yamlRestTest
- name: Multi Nodes Integration Testing
if: matrix.java == 21
if: matrix.java == '21.0.1'
run: |
./gradlew integTest -PnumNodes=3
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.test.rest.OpenSearchRestTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -75,64 +76,57 @@ public abstract class FlowFrameworkRestTestCase extends OpenSearchRestTestCase {
@Before
protected void setUpSettings() throws Exception {

if (!indexExistsWithAdminClient(".plugins-ml-config")) {
// Initial cluster set up

// Enable Flow Framework Plugin Rest APIs
Response response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"transient\":{\"plugins.flow_framework.enabled\":true}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Enable ML Commons to run on non-ml nodes
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.only_run_on_ml_node\":false}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Enable local model registration via URL
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.allow_registering_model_via_url\":true}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Set ML jvm heap memory threshold to 100 to avoid opening the circuit breaker during tests
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.jvm_heap_memory_threshold\":100}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());
// Enable Flow Framework Plugin Rest APIs
Response response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"transient\":{\"plugins.flow_framework.enabled\":true}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Ensure .plugins-ml-config is created before proceeding with integration tests
assertBusy(() -> { assertTrue(indexExistsWithAdminClient(".plugins-ml-config")); }, 60, TimeUnit.SECONDS);
}
// Enable ML Commons to run on non-ml nodes
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.only_run_on_ml_node\":false}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Enable local model registration via URL
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.allow_registering_model_via_url\":true}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Set ML jvm heap memory threshold to 100 to avoid opening the circuit breaker during tests
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.jvm_heap_memory_threshold\":100}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Set up clients if running in security enabled cluster
if (isHttps()) {
String fullAccessUserPassword = generatePassword(FULL_ACCESS_USER);
String readAccessUserPassword = generatePassword(READ_ACCESS_USER);

// Configure full access user and client, needs ML Full Access role as well
Response response = createUser(
response = createUser(
FULL_ACCESS_USER,
fullAccessUserPassword,
List.of(FLOW_FRAMEWORK_FULL_ACCESS_ROLE, ML_COMMONS_FULL_ACCESS_ROLE)
Expand Down Expand Up @@ -187,7 +181,7 @@ protected static void deleteIndexWithAdminClient(String name) throws IOException

// Utility fn for checking if an index exists. Should only be used when not allowed in a regular context
// (e.g., checking existence of system indices)
protected static boolean indexExistsWithAdminClient(String indexName) throws IOException {
public static boolean indexExistsWithAdminClient(String indexName) throws IOException {
Request request = new Request("HEAD", "/" + indexName);
Response response = adminClient().performRequest(request);
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
Expand Down Expand Up @@ -253,6 +247,37 @@ protected static void configureHttpsClient(RestClientBuilder builder, Settings s
}
}

@SuppressWarnings("unchecked")
@After
protected void wipeAllODFEIndices() throws IOException {
Response response = adminClient().performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all"));
MediaType xContentType = MediaType.fromMediaType(response.getEntity().getContentType());
try (
XContentParser parser = xContentType.xContent()
.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.getEntity().getContent()
)
) {
XContentParser.Token token = parser.nextToken();
List<Map<String, Object>> parserList = null;
if (token == XContentParser.Token.START_ARRAY) {
parserList = parser.listOrderedMap().stream().map(obj -> (Map<String, Object>) obj).collect(Collectors.toList());
} else {
parserList = Collections.singletonList(parser.mapOrdered());
}

for (Map<String, Object> index : parserList) {
String indexName = (String) index.get("index");
// Do not reset ML encryption index as this is needed to encrypt connector credentials
if (indexName != null && !".opendistro_security".equals(indexName) && !".plugins-ml-config".equals(indexName)) {
adminClient().performRequest(new Request("DELETE", "/" + indexName));
}
}
}
}

/**
* wipeAllIndices won't work since it cannot delete security index. Use wipeAllSystemIndices instead.
*/
Expand Down Expand Up @@ -290,12 +315,13 @@ protected Response createWorkflow(RestClient client, Template template) throws E

/**
* Helper method to invoke the Create Workflow Rest Action with provision
* @param client the rest client
* @param template the template to create
* @throws Exception if the request fails
* @return a rest response
*/
protected Response createWorkflowWithProvision(Template template) throws Exception {
return TestHelpers.makeRequest(client(), "POST", WORKFLOW_URI + "?provision=true", Collections.emptyMap(), template.toJson(), null);
protected Response createWorkflowWithProvision(RestClient client, Template template) throws Exception {
return TestHelpers.makeRequest(client, "POST", WORKFLOW_URI + "?provision=true", Collections.emptyMap(), template.toJson(), null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testFailedUpdateWorkflow() throws Exception {
assertEquals(RestStatus.CREATED, TestHelpers.restStatus(responseCreate));

Template template = TestHelpers.createTemplateFromFile("createconnector-registerremotemodel-deploymodel.json");
Thread.sleep(1000);

ResponseException exception = expectThrows(ResponseException.class, () -> updateWorkflow(client(), "123", template));
assertTrue(exception.getMessage().contains("Failed to get template: 123"));

Expand Down Expand Up @@ -215,8 +215,14 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {
String workflowId = (String) responseMap.get(WORKFLOW_ID);
getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED);

// Hit Provision API and assert status
response = provisionWorkflow(client(), workflowId);
// Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status
if(!indexExistsWithAdminClient(".plugins-ml-config")) {
assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 25, TimeUnit.SECONDS);
response = provisionWorkflow(client(), workflowId);
} else {
response = provisionWorkflow(client(), workflowId);
}

assertEquals(RestStatus.OK, TestHelpers.restStatus(response));
getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS);

Expand All @@ -243,7 +249,13 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
Template template = TestHelpers.createTemplateFromFile("agent-framework.json");

// Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter
Response response = createWorkflowWithProvision(template);
Response response;
if(!indexExistsWithAdminClient(".plugins-ml-config")) {
assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 25, TimeUnit.SECONDS);
response = createWorkflowWithProvision(client(), template);
} else {
response = createWorkflowWithProvision(client(), template);
}
assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response));
Map<String, Object> responseMap = entityAsMap(response);
String workflowId = (String) responseMap.get(WORKFLOW_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;

Expand Down Expand Up @@ -116,7 +117,12 @@ public void testCreateProvisionDeprovisionWorkflowWithFullAccess() throws Except
assertEquals(RestStatus.OK, searchResponse.status());

// Invoke provision API
response = provisionWorkflow(fullAccessClient(), workflowId);
if(!indexExistsWithAdminClient(".plugins-ml-config")) {
assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 25, TimeUnit.SECONDS);
response = provisionWorkflow(fullAccessClient(), workflowId);
} else {
response = provisionWorkflow(fullAccessClient(), workflowId);
}
assertEquals(RestStatus.OK, TestHelpers.restStatus(response));

// Invoke status API
Expand Down

0 comments on commit d09c12c

Please sign in to comment.