Skip to content

Commit

Permalink
[controller] Make to_be_stopped_instances in aggregatedHealthStatus A…
Browse files Browse the repository at this point in the history
…PI optional (#1274)

In Helix's aggregated stoppable check, the "to_be_stopped_instances" is optional and will be omitted in case there are no instances that have previously been approved. This commit makes our controller handle this.
  • Loading branch information
nisargthakkar authored Nov 4, 2024
1 parent 66bfa36 commit 9b6f027
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,35 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collections;
import java.util.List;


public class AggregatedHealthStatusRequest {
List<String> instances;
List<String> to_be_stopped_instances;
String cluster_id;
private final String cluster_id;
private final List<String> instances;
private final List<String> to_be_stopped_instances;

@JsonCreator
public AggregatedHealthStatusRequest(
@JsonProperty("cluster_id") String cluster_id,
@JsonProperty("instances") List<String> instances,
@JsonProperty("to_be_stopped_instances") List<String> to_be_stopped_instances,
@JsonProperty("cluster_id") String cluster_id) {
this.instances = instances;
this.to_be_stopped_instances = to_be_stopped_instances;
@JsonProperty("to_be_stopped_instances") List<String> to_be_stopped_instances) {
if (cluster_id == null) {
throw new IllegalArgumentException("'cluster_id' is required");
}
this.cluster_id = cluster_id;
}

@JsonProperty("cluster_id")
public void setClusterId(String cluster_id) {
this.cluster_id = cluster_id;
if (instances == null) {
throw new IllegalArgumentException("'instances' is required");
}
this.instances = instances;

if (to_be_stopped_instances == null) {
this.to_be_stopped_instances = Collections.emptyList();
} else {
this.to_be_stopped_instances = to_be_stopped_instances;
}
}

@JsonProperty("cluster_id")
Expand All @@ -35,18 +43,8 @@ public List<String> getInstances() {
return instances;
}

@JsonProperty("instances")
public void setInstances(List<String> instances) {
this.instances = instances;
}

@JsonProperty("to_be_stopped_instances")
public List<String> getToBeStoppedInstances() {
return to_be_stopped_instances;
}

@JsonProperty("to_be_stopped_instances")
public void setToBeStoppedInstances(List<String> to_be_stopped_instances) {
this.to_be_stopped_instances = to_be_stopped_instances;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,4 @@ public class ControllerApiConstants {

public static final String NEARLINE_PRODUCER_COMPRESSION_ENABLED = "nearline_producer_compression_enabled";
public static final String NEARLINE_PRODUCER_COUNT_PER_WRITER = "nearline_producer_count_per_writer";

public static final String AGGR_HEALTH_STATUS_URI = "/aggregatedHealthStatus";
}
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ public StoppableNodeStatusResponse getAggregatedHealthStatus(
List<String> toBeStoppedInstances) throws JsonProcessingException {

AggregatedHealthStatusRequest request =
new AggregatedHealthStatusRequest(instances, toBeStoppedInstances, clusterName);
new AggregatedHealthStatusRequest(clusterName, instances, toBeStoppedInstances);
String requestString = OBJECT_MAPPER.writeValueAsString(request);
return request(
ControllerRoute.AGGREGATED_HEALTH_STATUS,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.linkedin.venice.controllerapi;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
import com.linkedin.venice.utils.ObjectMapperFactory;
import org.testng.Assert;
import org.testng.annotations.Test;


public class TestAggregatedHealthStatusRequest {
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();

@Test
public void testDeserializationWithAllFields() throws JsonProcessingException {
String json =
"{\"cluster_id\":\"cluster1\",\"instances\":[\"instance1\",\"instance2\"],\"to_be_stopped_instances\":[\"instance3\",\"instance4\"]}";
AggregatedHealthStatusRequest request = OBJECT_MAPPER.readValue(json, AggregatedHealthStatusRequest.class);

assertEquals(request.getClusterId(), "cluster1");
assertEquals(request.getInstances().size(), 2);
assertEquals(request.getInstances().get(0), "instance1");
assertEquals(request.getInstances().get(1), "instance2");
assertEquals(request.getToBeStoppedInstances().size(), 2);
assertEquals(request.getToBeStoppedInstances().get(0), "instance3");
assertEquals(request.getToBeStoppedInstances().get(1), "instance4");
}

@Test
public void testDeserializationWithMandatoryFields() throws JsonProcessingException {
String json = "{\"cluster_id\":\"cluster1\",\"instances\":[\"instance1\",\"instance2\"]}";
AggregatedHealthStatusRequest request = OBJECT_MAPPER.readValue(json, AggregatedHealthStatusRequest.class);

assertEquals(request.getClusterId(), "cluster1");
assertEquals(request.getInstances().size(), 2);
assertEquals(request.getInstances().get(0), "instance1");
assertEquals(request.getInstances().get(1), "instance2");
assertNotNull(request.getToBeStoppedInstances());
assertTrue(request.getToBeStoppedInstances().isEmpty());
}

@Test
public void testDeserializationWithMissingMandatoryFields() {
String json = "{\"instances\":[\"instance1\",\"instance2\"]}";
ValueInstantiationException e = Assert.expectThrows(
ValueInstantiationException.class,
() -> OBJECT_MAPPER.readValue(json, AggregatedHealthStatusRequest.class));
assertTrue(e.getMessage().contains("'cluster_id' is required"), e.getMessage());

String json2 = "{\"cluster_id\":\"cluster1\"}";
ValueInstantiationException e2 = Assert.expectThrows(
ValueInstantiationException.class,
() -> OBJECT_MAPPER.readValue(json2, AggregatedHealthStatusRequest.class));
assertTrue(e2.getMessage().contains("'instances' is required"), e2.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.nio.client.HttpAsyncClient;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -218,10 +219,30 @@ public void testAggregatedHealthStatusCall() throws IOException, ExecutionExcept
httpClient.start();
int serverPort = venice.getChildRegions().get(0).getClusters().get(clusterName).getVeniceServers().get(0).getPort();
String server = Utils.getHelixNodeIdentifier(Utils.getHostName(), serverPort);

// API call with all fields
Map<String, Object> payloads = new HashMap<>();
payloads.put("cluster_id", clusterName);
payloads.put("instances", Arrays.asList(server));
payloads.put("instances", Collections.singletonList(server));
payloads.put("to_be_stopped_instances", Collections.emptyList());

InstanceRemovableStatuses statuses = makeAggregatedHealthStatusCall(httpClient, payloads);
Assert.assertTrue(statuses.getNonStoppableInstancesWithReasons().containsKey(server));

// API call without optional to_be_stopped_instances
Map<String, Object> payloads2 = new HashMap<>();
payloads2.put("cluster_id", clusterName);
payloads2.put("instances", Collections.singletonList(server));

InstanceRemovableStatuses statuses2 = makeAggregatedHealthStatusCall(httpClient, payloads2);
Assert.assertTrue(statuses2.getNonStoppableInstancesWithReasons().containsKey(server));

httpClient.close();
}

private InstanceRemovableStatuses makeAggregatedHealthStatusCall(
HttpAsyncClient httpClient,
Map<String, Object> payloads) throws IOException, ExecutionException, InterruptedException {
StringEntity entity = new StringEntity(OBJECT_MAPPER.writeValueAsString(payloads), ContentType.APPLICATION_JSON);

final HttpPost post = new HttpPost(
Expand All @@ -232,9 +253,7 @@ public void testAggregatedHealthStatusCall() throws IOException, ExecutionExcept

Assert.assertEquals(httpResponse.getStatusLine().getStatusCode(), 200);
String responseString = IOUtils.toString(httpResponse.getEntity().getContent());
InstanceRemovableStatuses statuses = OBJECT_MAPPER.readValue(responseString, InstanceRemovableStatuses.class);
Assert.assertTrue(statuses.getNonStoppableInstancesWithReasons().containsKey(server));
httpClient.close();
return OBJECT_MAPPER.readValue(responseString, InstanceRemovableStatuses.class);
}

@Test(timeOut = TEST_TIMEOUT)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.linkedin.venice.controller.server;

import static com.linkedin.venice.controllerapi.ControllerApiConstants.AGGR_HEALTH_STATUS_URI;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.KAFKA_TOPIC_LOG_COMPACTION_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.KAFKA_TOPIC_MIN_IN_SYNC_REPLICA;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.KAFKA_TOPIC_RETENTION_IN_MS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TOPIC;
import static com.linkedin.venice.controllerapi.ControllerRoute.AGGREGATED_HEALTH_STATUS;
import static com.linkedin.venice.controllerapi.ControllerRoute.LEADER_CONTROLLER;
import static com.linkedin.venice.controllerapi.ControllerRoute.LIST_CHILD_CLUSTERS;
import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_KAFKA_TOPIC_LOG_COMPACTION;
Expand Down Expand Up @@ -211,7 +211,9 @@ public Route getAggregatedHealthStatus(Admin admin) {
InstanceRemovableStatuses statuses =
admin.getAggregatedHealthStatus(cluster, instanceList, toBeStoppedInstanceList, isSslEnabled());
if (statuses.getRedirectUrl() != null) {
response.redirect(statuses.getRedirectUrl() + AGGR_HEALTH_STATUS_URI, HttpStatus.SC_MOVED_TEMPORARILY);
response.redirect(
statuses.getRedirectUrl() + AGGREGATED_HEALTH_STATUS.getPath(),
HttpStatus.SC_MOVED_TEMPORARILY);
return null;
} else {
responseObject.setNonStoppableInstancesWithReason(statuses.getNonStoppableInstancesWithReasons());
Expand Down

0 comments on commit 9b6f027

Please sign in to comment.