diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AggregatedHealthStatusRequest.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AggregatedHealthStatusRequest.java index 2e82f8d596..0eff306ead 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AggregatedHealthStatusRequest.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AggregatedHealthStatusRequest.java @@ -2,27 +2,35 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Collections; import java.util.List; public class AggregatedHealthStatusRequest { - List instances; - List to_be_stopped_instances; - String cluster_id; + private final String cluster_id; + private final List instances; + private final List to_be_stopped_instances; @JsonCreator public AggregatedHealthStatusRequest( + @JsonProperty("cluster_id") String cluster_id, @JsonProperty("instances") List instances, - @JsonProperty("to_be_stopped_instances") List to_be_stopped_instances, - @JsonProperty("cluster_id") String cluster_id) { - this.instances = instances; - this.to_be_stopped_instances = to_be_stopped_instances; + @JsonProperty("to_be_stopped_instances") List to_be_stopped_instances) { + if (cluster_id == null) { + throw new IllegalArgumentException("'cluster_id' is required"); + } this.cluster_id = cluster_id; - } - @JsonProperty("cluster_id") - public void setClusterId(String cluster_id) { - this.cluster_id = cluster_id; + if (instances == null) { + throw new IllegalArgumentException("'instances' is required"); + } + this.instances = instances; + + if (to_be_stopped_instances == null) { + this.to_be_stopped_instances = Collections.emptyList(); + } else { + this.to_be_stopped_instances = to_be_stopped_instances; + } } @JsonProperty("cluster_id") @@ -35,18 +43,8 @@ public List getInstances() { return instances; } - @JsonProperty("instances") - public void setInstances(List instances) { - this.instances = instances; - } - @JsonProperty("to_be_stopped_instances") public List getToBeStoppedInstances() { return to_be_stopped_instances; } - - @JsonProperty("to_be_stopped_instances") - public void setToBeStoppedInstances(List to_be_stopped_instances) { - this.to_be_stopped_instances = to_be_stopped_instances; - } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java index 09953dbe50..bc1bf148c9 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java @@ -244,6 +244,4 @@ public class ControllerApiConstants { public static final String NEARLINE_PRODUCER_COMPRESSION_ENABLED = "nearline_producer_compression_enabled"; public static final String NEARLINE_PRODUCER_COUNT_PER_WRITER = "nearline_producer_count_per_writer"; - - public static final String AGGR_HEALTH_STATUS_URI = "/aggregatedHealthStatus"; } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java index e122e8b372..fcd695c69d 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java @@ -923,7 +923,7 @@ public StoppableNodeStatusResponse getAggregatedHealthStatus( List toBeStoppedInstances) throws JsonProcessingException { AggregatedHealthStatusRequest request = - new AggregatedHealthStatusRequest(instances, toBeStoppedInstances, clusterName); + new AggregatedHealthStatusRequest(clusterName, instances, toBeStoppedInstances); String requestString = OBJECT_MAPPER.writeValueAsString(request); return request( ControllerRoute.AGGREGATED_HEALTH_STATUS, diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/controllerapi/TestAggregatedHealthStatusRequest.java b/internal/venice-common/src/test/java/com/linkedin/venice/controllerapi/TestAggregatedHealthStatusRequest.java new file mode 100644 index 0000000000..4620498b1e --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/controllerapi/TestAggregatedHealthStatusRequest.java @@ -0,0 +1,60 @@ +package com.linkedin.venice.controllerapi; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.ValueInstantiationException; +import com.linkedin.venice.utils.ObjectMapperFactory; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class TestAggregatedHealthStatusRequest { + private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance(); + + @Test + public void testDeserializationWithAllFields() throws JsonProcessingException { + String json = + "{\"cluster_id\":\"cluster1\",\"instances\":[\"instance1\",\"instance2\"],\"to_be_stopped_instances\":[\"instance3\",\"instance4\"]}"; + AggregatedHealthStatusRequest request = OBJECT_MAPPER.readValue(json, AggregatedHealthStatusRequest.class); + + assertEquals(request.getClusterId(), "cluster1"); + assertEquals(request.getInstances().size(), 2); + assertEquals(request.getInstances().get(0), "instance1"); + assertEquals(request.getInstances().get(1), "instance2"); + assertEquals(request.getToBeStoppedInstances().size(), 2); + assertEquals(request.getToBeStoppedInstances().get(0), "instance3"); + assertEquals(request.getToBeStoppedInstances().get(1), "instance4"); + } + + @Test + public void testDeserializationWithMandatoryFields() throws JsonProcessingException { + String json = "{\"cluster_id\":\"cluster1\",\"instances\":[\"instance1\",\"instance2\"]}"; + AggregatedHealthStatusRequest request = OBJECT_MAPPER.readValue(json, AggregatedHealthStatusRequest.class); + + assertEquals(request.getClusterId(), "cluster1"); + assertEquals(request.getInstances().size(), 2); + assertEquals(request.getInstances().get(0), "instance1"); + assertEquals(request.getInstances().get(1), "instance2"); + assertNotNull(request.getToBeStoppedInstances()); + assertTrue(request.getToBeStoppedInstances().isEmpty()); + } + + @Test + public void testDeserializationWithMissingMandatoryFields() { + String json = "{\"instances\":[\"instance1\",\"instance2\"]}"; + ValueInstantiationException e = Assert.expectThrows( + ValueInstantiationException.class, + () -> OBJECT_MAPPER.readValue(json, AggregatedHealthStatusRequest.class)); + assertTrue(e.getMessage().contains("'cluster_id' is required"), e.getMessage()); + + String json2 = "{\"cluster_id\":\"cluster1\"}"; + ValueInstantiationException e2 = Assert.expectThrows( + ValueInstantiationException.class, + () -> OBJECT_MAPPER.readValue(json2, AggregatedHealthStatusRequest.class)); + assertTrue(e2.getMessage().contains("'instances' is required"), e2.getMessage()); + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java index a3779ebeb3..82fd3b9605 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java @@ -73,6 +73,7 @@ import org.apache.http.entity.StringEntity; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.message.BasicNameValuePair; +import org.apache.http.nio.client.HttpAsyncClient; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -218,10 +219,30 @@ public void testAggregatedHealthStatusCall() throws IOException, ExecutionExcept httpClient.start(); int serverPort = venice.getChildRegions().get(0).getClusters().get(clusterName).getVeniceServers().get(0).getPort(); String server = Utils.getHelixNodeIdentifier(Utils.getHostName(), serverPort); + + // API call with all fields Map payloads = new HashMap<>(); payloads.put("cluster_id", clusterName); - payloads.put("instances", Arrays.asList(server)); + payloads.put("instances", Collections.singletonList(server)); payloads.put("to_be_stopped_instances", Collections.emptyList()); + + InstanceRemovableStatuses statuses = makeAggregatedHealthStatusCall(httpClient, payloads); + Assert.assertTrue(statuses.getNonStoppableInstancesWithReasons().containsKey(server)); + + // API call without optional to_be_stopped_instances + Map payloads2 = new HashMap<>(); + payloads2.put("cluster_id", clusterName); + payloads2.put("instances", Collections.singletonList(server)); + + InstanceRemovableStatuses statuses2 = makeAggregatedHealthStatusCall(httpClient, payloads2); + Assert.assertTrue(statuses2.getNonStoppableInstancesWithReasons().containsKey(server)); + + httpClient.close(); + } + + private InstanceRemovableStatuses makeAggregatedHealthStatusCall( + HttpAsyncClient httpClient, + Map payloads) throws IOException, ExecutionException, InterruptedException { StringEntity entity = new StringEntity(OBJECT_MAPPER.writeValueAsString(payloads), ContentType.APPLICATION_JSON); final HttpPost post = new HttpPost( @@ -232,9 +253,7 @@ public void testAggregatedHealthStatusCall() throws IOException, ExecutionExcept Assert.assertEquals(httpResponse.getStatusLine().getStatusCode(), 200); String responseString = IOUtils.toString(httpResponse.getEntity().getContent()); - InstanceRemovableStatuses statuses = OBJECT_MAPPER.readValue(responseString, InstanceRemovableStatuses.class); - Assert.assertTrue(statuses.getNonStoppableInstancesWithReasons().containsKey(server)); - httpClient.close(); + return OBJECT_MAPPER.readValue(responseString, InstanceRemovableStatuses.class); } @Test(timeOut = TEST_TIMEOUT) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRoutes.java index c2580b5b11..629da787c3 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRoutes.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRoutes.java @@ -1,11 +1,11 @@ package com.linkedin.venice.controller.server; -import static com.linkedin.venice.controllerapi.ControllerApiConstants.AGGR_HEALTH_STATUS_URI; import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER; import static com.linkedin.venice.controllerapi.ControllerApiConstants.KAFKA_TOPIC_LOG_COMPACTION_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.KAFKA_TOPIC_MIN_IN_SYNC_REPLICA; import static com.linkedin.venice.controllerapi.ControllerApiConstants.KAFKA_TOPIC_RETENTION_IN_MS; import static com.linkedin.venice.controllerapi.ControllerApiConstants.TOPIC; +import static com.linkedin.venice.controllerapi.ControllerRoute.AGGREGATED_HEALTH_STATUS; import static com.linkedin.venice.controllerapi.ControllerRoute.LEADER_CONTROLLER; import static com.linkedin.venice.controllerapi.ControllerRoute.LIST_CHILD_CLUSTERS; import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_KAFKA_TOPIC_LOG_COMPACTION; @@ -211,7 +211,9 @@ public Route getAggregatedHealthStatus(Admin admin) { InstanceRemovableStatuses statuses = admin.getAggregatedHealthStatus(cluster, instanceList, toBeStoppedInstanceList, isSslEnabled()); if (statuses.getRedirectUrl() != null) { - response.redirect(statuses.getRedirectUrl() + AGGR_HEALTH_STATUS_URI, HttpStatus.SC_MOVED_TEMPORARILY); + response.redirect( + statuses.getRedirectUrl() + AGGREGATED_HEALTH_STATUS.getPath(), + HttpStatus.SC_MOVED_TEMPORARILY); return null; } else { responseObject.setNonStoppableInstancesWithReason(statuses.getNonStoppableInstancesWithReasons());