diff --git a/build.gradle b/build.gradle index a7eb23f7c..6f0817bc8 100644 --- a/build.gradle +++ b/build.gradle @@ -238,18 +238,6 @@ ext{ cluster.setting("plugins.security.authcz.admin_dn", "\n- CN=kirk,OU=client,O=client,L=test, C=de") cluster.setting('plugins.security.restapi.roles_enabled', '["all_access", "security_rest_api_access"]') cluster.setting('plugins.security.system_indices.enabled', "true") - cluster.setting('plugins.security.system_indices.indices', '[' + - '".plugins-ml-config", ' + - '".plugins-ml-connector", ' + - '".plugins-ml-model-group", ' + - '".plugins-ml-model", ".plugins-ml-task", ' + - '".plugins-ml-conversation-meta", ' + - '".plugins-ml-conversation-interactions", ' + - '".plugins-flow-framework-config", ' + - '".plugins-flow-framework-templates", ' + - '".plugins-flow-framework-state"' + - ']' - ) cluster.setSecure(true) } } @@ -424,6 +412,14 @@ task integTestRemote(type: RestIntegTestTask) { // Automatically sets up the integration test cluster locally run { + doFirst { + // There seems to be an issue when running multi node run or integ tasks with unicast_hosts + // not being written, the waitForAllConditions ensures it's written + getClusters().forEach { cluster -> + cluster.waitForAllConditions() + } + } + useCluster testClusters.integTest } diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java index 1d450cd26..666a1541d 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java @@ -45,6 +45,7 @@ import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.model.WorkflowState; import org.opensearch.test.rest.OpenSearchRestTestCase; +import org.junit.After; import org.junit.Before; import java.io.IOException; @@ -75,56 +76,71 @@ public abstract class FlowFrameworkRestTestCase extends OpenSearchRestTestCase { @Before protected void setUpSettings() throws Exception { - if (!indexExistsWithAdminClient(".plugins-ml-config")) { - // Initial cluster set up - - // Enable Flow Framework Plugin Rest APIs - Response response = TestHelpers.makeRequest( - client(), - "PUT", - "_cluster/settings", - null, - "{\"transient\":{\"plugins.flow_framework.enabled\":true}}", - List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) - ); - assertEquals(200, response.getStatusLine().getStatusCode()); - - // Enable ML Commons to run on non-ml nodes - response = TestHelpers.makeRequest( - client(), - "PUT", - "_cluster/settings", - null, - "{\"persistent\":{\"plugins.ml_commons.only_run_on_ml_node\":false}}", - List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) - ); - assertEquals(200, response.getStatusLine().getStatusCode()); - - // Enable local model registration via URL - response = TestHelpers.makeRequest( - client(), - "PUT", - "_cluster/settings", - null, - "{\"persistent\":{\"plugins.ml_commons.allow_registering_model_via_url\":true}}", - List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) - ); - assertEquals(200, response.getStatusLine().getStatusCode()); - - // Set ML jvm heap memory threshold to 100 to avoid opening the circuit breaker during tests - response = TestHelpers.makeRequest( - client(), - "PUT", - "_cluster/settings", - null, - "{\"persistent\":{\"plugins.ml_commons.jvm_heap_memory_threshold\":100}}", - List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) - ); - assertEquals(200, response.getStatusLine().getStatusCode()); + // Enable Flow Framework Plugin Rest APIs + Response response = TestHelpers.makeRequest( + client(), + "PUT", + "_cluster/settings", + null, + "{\"transient\":{\"plugins.flow_framework.enabled\":true}}", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(200, response.getStatusLine().getStatusCode()); - // Ensure .plugins-ml-config is created before proceeding with integration tests - assertBusy(() -> { assertTrue(indexExistsWithAdminClient(".plugins-ml-config")); }, 60, TimeUnit.SECONDS); - } + // Enable ML Commons to run on non-ml nodes + response = TestHelpers.makeRequest( + client(), + "PUT", + "_cluster/settings", + null, + "{\"persistent\":{\"plugins.ml_commons.only_run_on_ml_node\":false}}", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(200, response.getStatusLine().getStatusCode()); + + // Enable local model registration via URL + response = TestHelpers.makeRequest( + client(), + "PUT", + "_cluster/settings", + null, + "{\"persistent\":{\"plugins.ml_commons.allow_registering_model_via_url\":true}}", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(200, response.getStatusLine().getStatusCode()); + + // Set ML jvm heap memory threshold to 100 to avoid opening the circuit breaker during tests + response = TestHelpers.makeRequest( + client(), + "PUT", + "_cluster/settings", + null, + "{\"persistent\":{\"plugins.ml_commons.jvm_heap_memory_threshold\":100}}", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(200, response.getStatusLine().getStatusCode()); + + // Set ML auto redeploy to false + response = TestHelpers.makeRequest( + client(), + "PUT", + "_cluster/settings", + null, + "{\"persistent\":{\"plugins.ml_commons.model_auto_redeploy.enable\":false}}", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(200, response.getStatusLine().getStatusCode()); + + // Set ML auto redeploy retries to 0 + response = TestHelpers.makeRequest( + client(), + "PUT", + "_cluster/settings", + null, + "{\"persistent\":{\"plugins.ml_commons.model_auto_redeploy.lifetime_retry_times\":0}}", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(200, response.getStatusLine().getStatusCode()); // Set up clients if running in security enabled cluster if (isHttps()) { @@ -132,7 +148,7 @@ protected void setUpSettings() throws Exception { String readAccessUserPassword = generatePassword(READ_ACCESS_USER); // Configure full access user and client, needs ML Full Access role as well - Response response = createUser( + response = createUser( FULL_ACCESS_USER, fullAccessUserPassword, List.of(FLOW_FRAMEWORK_FULL_ACCESS_ROLE, ML_COMMONS_FULL_ACCESS_ROLE) @@ -187,7 +203,7 @@ protected static void deleteIndexWithAdminClient(String name) throws IOException // Utility fn for checking if an index exists. Should only be used when not allowed in a regular context // (e.g., checking existence of system indices) - protected static boolean indexExistsWithAdminClient(String indexName) throws IOException { + public static boolean indexExistsWithAdminClient(String indexName) throws IOException { Request request = new Request("HEAD", "/" + indexName); Response response = adminClient().performRequest(request); return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode(); @@ -253,6 +269,40 @@ protected static void configureHttpsClient(RestClientBuilder builder, Settings s } } + @SuppressWarnings("unchecked") + @After + protected void wipeAllODFEIndices() throws IOException { + Response response = adminClient().performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all")); + MediaType xContentType = MediaType.fromMediaType(response.getEntity().getContentType()); + try ( + XContentParser parser = xContentType.xContent() + .createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + response.getEntity().getContent() + ) + ) { + XContentParser.Token token = parser.nextToken(); + List> parserList = null; + if (token == XContentParser.Token.START_ARRAY) { + parserList = parser.listOrderedMap().stream().map(obj -> (Map) obj).collect(Collectors.toList()); + } else { + parserList = Collections.singletonList(parser.mapOrdered()); + } + + for (Map index : parserList) { + String indexName = (String) index.get("index"); + // Do not reset ML/Flow Framework encryption index as this is needed to encrypt connector credentials + if (indexName != null + && !".opendistro_security".equals(indexName) + && !".plugins-ml-config".equals(indexName) + && !".plugins-flow-framework-config".equals(indexName)) { + adminClient().performRequest(new Request("DELETE", "/" + indexName)); + } + } + } + } + /** * wipeAllIndices won't work since it cannot delete security index. Use wipeAllSystemIndices instead. */ @@ -290,12 +340,13 @@ protected Response createWorkflow(RestClient client, Template template) throws E /** * Helper method to invoke the Create Workflow Rest Action with provision + * @param client the rest client * @param template the template to create * @throws Exception if the request fails * @return a rest response */ - protected Response createWorkflowWithProvision(Template template) throws Exception { - return TestHelpers.makeRequest(client(), "POST", WORKFLOW_URI + "?provision=true", Collections.emptyMap(), template.toJson(), null); + protected Response createWorkflowWithProvision(RestClient client, Template template) throws Exception { + return TestHelpers.makeRequest(client, "POST", WORKFLOW_URI + "?provision=true", Collections.emptyMap(), template.toJson(), null); } /** diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index c097fce6b..d580794f3 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -74,7 +74,7 @@ public void testFailedUpdateWorkflow() throws Exception { assertEquals(RestStatus.CREATED, TestHelpers.restStatus(responseCreate)); Template template = TestHelpers.createTemplateFromFile("createconnector-registerremotemodel-deploymodel.json"); - Thread.sleep(1000); + ResponseException exception = expectThrows(ResponseException.class, () -> updateWorkflow(client(), "123", template)); assertTrue(exception.getMessage().contains("Failed to get template: 123")); @@ -84,8 +84,14 @@ public void testFailedUpdateWorkflow() throws Exception { Map responseMap = entityAsMap(response); String workflowId = (String) responseMap.get(WORKFLOW_ID); - // Provision Template - Response provisionResponse = provisionWorkflow(client(), workflowId); + // Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status + Response provisionResponse; + if (!indexExistsWithAdminClient(".plugins-ml-config")) { + assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); + provisionResponse = provisionWorkflow(client(), workflowId); + } else { + provisionResponse = provisionWorkflow(client(), workflowId); + } assertEquals(RestStatus.OK, TestHelpers.restStatus(provisionResponse)); getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS); @@ -163,12 +169,6 @@ public void testCreateAndProvisionLocalModelWorkflow() throws Exception { assertNotNull(resourcesCreated.get(0).resourceId()); assertEquals("deploy_model", resourcesCreated.get(1).workflowStepName()); assertNotNull(resourcesCreated.get(1).resourceId()); - - // Deprovision the workflow to avoid opening circut breaker when running additional tests - Response deprovisionResponse = deprovisionWorkflow(client(), workflowId); - - // wait for deprovision to complete - Thread.sleep(5000); } public void testCreateAndProvisionCyclicalTemplate() throws Exception { @@ -215,8 +215,14 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception { String workflowId = (String) responseMap.get(WORKFLOW_ID); getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); - // Hit Provision API and assert status - response = provisionWorkflow(client(), workflowId); + // Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status + if (!indexExistsWithAdminClient(".plugins-ml-config")) { + assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); + response = provisionWorkflow(client(), workflowId); + } else { + response = provisionWorkflow(client(), workflowId); + } + assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS); @@ -231,19 +237,19 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception { assertNotNull(resourcesCreated.get(1).resourceId()); assertEquals("deploy_model", resourcesCreated.get(2).workflowStepName()); assertNotNull(resourcesCreated.get(2).resourceId()); - - // Deprovision the workflow to avoid opening circuit breaker when running additional tests - Response deprovisionResponse = deprovisionWorkflow(client(), workflowId); - - // wait for deprovision to complete - Thread.sleep(5000); } public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception { Template template = TestHelpers.createTemplateFromFile("agent-framework.json"); // Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter - Response response = createWorkflowWithProvision(template); + Response response; + if (!indexExistsWithAdminClient(".plugins-ml-config")) { + assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); + response = createWorkflowWithProvision(client(), template); + } else { + response = createWorkflowWithProvision(client(), template); + } assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); Map responseMap = entityAsMap(response); String workflowId = (String) responseMap.get(WORKFLOW_ID); diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkSecureRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkSecureRestApiIT.java index e83e7f08e..5985e928c 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkSecureRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkSecureRestApiIT.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; @@ -116,7 +117,12 @@ public void testCreateProvisionDeprovisionWorkflowWithFullAccess() throws Except assertEquals(RestStatus.OK, searchResponse.status()); // Invoke provision API - response = provisionWorkflow(fullAccessClient(), workflowId); + if (!indexExistsWithAdminClient(".plugins-ml-config")) { + assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); + response = provisionWorkflow(fullAccessClient(), workflowId); + } else { + response = provisionWorkflow(fullAccessClient(), workflowId); + } assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); // Invoke status API