Skip to content

Commit

Permalink
[fc][test]make streamingBatchGet return partial response (#597)
Browse files Browse the repository at this point in the history
* Make streamingBatchGet(keys) return VeniceResponseCompletableFuture to return partial response
* Add unit tests
  • Loading branch information
m-nagarajan authored Aug 24, 2023
1 parent 5d43de4 commit f74942c
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ protected CompletableFuture<VeniceResponseMap<K, V>> streamingBatchGet(
BatchGetRequestContext<K, V> requestContext,
Set<K> keys) {
long startTimeInNS = System.nanoTime();
CompletableFuture<VeniceResponseMap<K, V>> innerFuture = super.streamingBatchGet(requestContext, keys);
return recordMetrics(requestContext, keys.size(), innerFuture, startTimeInNS, clientStatsForBatchGet);
CompletableFuture<VeniceResponseMap<K, V>> streamingBatchGetFuture = super.streamingBatchGet(requestContext, keys);
recordMetrics(requestContext, keys.size(), streamingBatchGetFuture, startTimeInNS, clientStatsForBatchGet);
return streamingBatchGetFuture;
}

private <R> CompletableFuture<R> recordMetrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import com.linkedin.r2.transport.common.Client;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.streaming.VeniceResponseMap;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.compression.CompressionStrategy;
Expand All @@ -40,6 +41,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Schema;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -78,25 +80,37 @@ private void setUpClient() {
}

private void setUpClient(boolean useStreamingBatchGetAsDefault) {
setUpClient(useStreamingBatchGetAsDefault, false, false);
setUpClient(useStreamingBatchGetAsDefault, false, false, false);
}

private void setUpClient(
boolean useStreamingBatchGetAsDefault,
boolean transportClientThrowsException,
boolean transportClientThrowsPartialException) {
boolean transportClientThrowsPartialException,
boolean transportClientPartialIncomplete) {
setUpClient(
useStreamingBatchGetAsDefault,
transportClientThrowsException,
transportClientThrowsPartialException,
transportClientPartialIncomplete,
true,
TimeUnit.SECONDS.toMillis(30));
}

/**
* @param useStreamingBatchGetAsDefault use streaming batch get or single get based batch get
* @param transportClientThrowsException throws exception for both the keys
* @param transportClientThrowsPartialException responds correct value for the 1st key and throws exception for the 2nd key
* @param transportClientPartialIncomplete responds correct value for the 1st key and not do anything for 2nd key
* @param mockTransportClient mock the transport client to be able to respond with the actual values or exception.
* If not, the hosts won't be reachable as it's not setup to be reachable.
* @param routingLeakedRequestCleanupThresholdMS time to set routingLeakedRequestCleanupThresholdMS client config.
*/
private void setUpClient(
boolean useStreamingBatchGetAsDefault,
boolean transportClientThrowsException,
boolean transportClientThrowsPartialException, // only applicable for useStreamingBatchGetAsDefault
boolean transportClientPartialIncomplete, // only applicable for useStreamingBatchGetAsDefault
boolean mockTransportClient,
long routingLeakedRequestCleanupThresholdMS) {
clientConfigBuilder = new ClientConfig.ClientConfigBuilder<>().setStoreName(STORE_NAME)
Expand Down Expand Up @@ -161,7 +175,7 @@ private void setUpClient(
if (transportClientThrowsException) {
doReturn(batchGetValueFuture0).when(mockedTransportClient).post(any(), any(), any());
batchGetValueFuture0.completeExceptionally(new VeniceClientException("Exception for client to return 503"));
} else if (transportClientThrowsPartialException) {
} else if (transportClientThrowsPartialException || transportClientPartialIncomplete) {
// return valid response for 1 route(1 key) and exception for the other
batchGetResponse0 = new TransportClientResponseForRoute(
"0",
Expand All @@ -174,7 +188,9 @@ private void setUpClient(
batchGetValueFuture0.complete(batchGetResponse0);
doReturn(batchGetValueFuture1).when(mockedTransportClient)
.post(eq(REPLICA2_NAME + "/storage/test_store_v1"), any(), any());
batchGetValueFuture1.completeExceptionally(new VeniceClientException("Exception for client to return 503"));
if (!transportClientPartialIncomplete) {
batchGetValueFuture1.completeExceptionally(new VeniceClientException("Exception for client to return 503"));
}
} else {
batchGetResponse0 = new TransportClientResponseForRoute(
"0",
Expand Down Expand Up @@ -390,7 +406,7 @@ public void testGet() throws ExecutionException, InterruptedException, IOExcepti
@Test(timeOut = TEST_TIMEOUT)
public void testGetWithExceptionFromTransportLayer() throws IOException {
try {
setUpClient(false, true, false);
setUpClient(false, true, false, false);
getRequestContext = new GetRequestContext();
statsAvroGenericStoreClient.get(getRequestContext, "test_key").get().toString();
fail();
Expand All @@ -406,7 +422,7 @@ public void testGetWithExceptionFromTransportLayer() throws IOException {
@Test(timeOut = TEST_TIMEOUT)
public void testGetToUnreachableClient() throws IOException {
try {
setUpClient(false, false, false, false, 2 * Time.MS_PER_SECOND);
setUpClient(false, false, false, false, false, 2 * Time.MS_PER_SECOND);
getRequestContext = new GetRequestContext();
statsAvroGenericStoreClient.get(getRequestContext, "test_key").get();
fail();
Expand Down Expand Up @@ -448,7 +464,7 @@ public void testBatchGet(boolean useStreamingBatchGetAsDefault)
@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT)
public void testBatchGetWithExceptionFromTransportLayer(boolean useStreamingBatchGetAsDefault) throws IOException {
try {
setUpClient(useStreamingBatchGetAsDefault, true, false);
setUpClient(useStreamingBatchGetAsDefault, true, false, false);
batchGetRequestContext = new BatchGetRequestContext<>();
statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get();
fail();
Expand All @@ -468,7 +484,7 @@ public void testBatchGetWithExceptionFromTransportLayer(boolean useStreamingBatc
public void testBatchGetWithExceptionFromTransportLayerForOneRoute(boolean useStreamingBatchGetAsDefault)
throws IOException {
try {
setUpClient(useStreamingBatchGetAsDefault, false, true);
setUpClient(useStreamingBatchGetAsDefault, false, true, false);
batchGetRequestContext = new BatchGetRequestContext<>();
Map<String, String> value =
(Map<String, String>) statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get();
Expand All @@ -494,10 +510,175 @@ public void testBatchGetWithExceptionFromTransportLayerForOneRoute(boolean useSt
}
}

/**
* Condition to test: batchGet API either returns full results or exception, but no partial results.
* In this test:
* setup: 1 key returns valid value and the other key doesn't return anything.
* Behavior: this test calls batchGet().get() without timeout, so waits till routingLeakedRequestCleanupThresholdMS
* times out and returns exception with "At least one route did not complete".
*/
@Test(timeOut = TEST_TIMEOUT, expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*VeniceClientException: At least one route did not complete")
public void testBatchGetWithTimeoutV1() throws IOException, ExecutionException, InterruptedException {
long routingLeakedRequestCleanupThresholdMS = TimeUnit.SECONDS.toMillis(1);
try {
setUpClient(true, false, false, true, true, routingLeakedRequestCleanupThresholdMS);
batchGetRequestContext = new BatchGetRequestContext<>();
statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get();
fail();
} finally {
// wait for routingLeakedRequestCleanupThresholdMS for the metrics to be increased
metrics = getStats(clientConfig);
TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> {
assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0);
});
validateMultiGetMetrics(false, true, true, false);
tearDown();
}
}

/**
* Condition to test: batchGet API either returns full results or exception, but no partial results.
* In this test:
* setup: 1 key returns valid value and the other key doesn't return anything.
* Behavior: routingLeakedRequestCleanupThresholdMS times out before batchGet().get(timeout),
* so returns exception with "At least one route did not complete".
*/
@Test(timeOut = TEST_TIMEOUT, expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*VeniceClientException: At least one route did not complete")
public void testBatchGetWithTimeoutV2()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
long routingLeakedRequestCleanupThresholdMS = TimeUnit.SECONDS.toMillis(1);
try {
setUpClient(true, false, false, true, true, routingLeakedRequestCleanupThresholdMS);
batchGetRequestContext = new BatchGetRequestContext<>();
statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get(2, TimeUnit.SECONDS);
fail();
} finally {
// wait for routingLeakedRequestCleanupThresholdMS for the metrics to be increased
metrics = getStats(clientConfig);
TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> {
assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0);
});
validateMultiGetMetrics(false, true, true, false);
tearDown();
}
}

/**
* Condition to test: batchGet API either returns full results or exception, but no partial results.
* In this test:
* setup: 1 key returns valid value and the other key doesn't return anything.
* Behavior: batchGet().get(timeout) times out before routingLeakedRequestCleanupThresholdMS,
* so AppTimeOutTrackingCompletableFuture returns TimeoutException confirming no partial returns.
*/
@Test(timeOut = TEST_TIMEOUT, expectedExceptions = TimeoutException.class)
public void testBatchGetWithTimeoutV3()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
long routingLeakedRequestCleanupThresholdMS = TimeUnit.SECONDS.toMillis(2);
try {
setUpClient(true, false, false, true, true, routingLeakedRequestCleanupThresholdMS);
batchGetRequestContext = new BatchGetRequestContext<>();
statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get(1, TimeUnit.SECONDS);
fail();
} finally {
// wait for routingLeakedRequestCleanupThresholdMS for the metrics to be increased
metrics = getStats(clientConfig);
TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> {
assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0);
});
validateMultiGetMetrics(false, true, true, false);
tearDown();
}
}

/**
* Condition to test: streamingBatchGet(keys) API returns partial results in case of future.get(timeout)
* setup: 1 key returns valid value and the other key doesn't return anything.
* Behavior: this test calls streamingBatchGet().get() without timeout, so waits till routingLeakedRequestCleanupThresholdMS
* times out and returns exception with "At least one route did not complete".
*/
@Test(timeOut = TEST_TIMEOUT, expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*VeniceClientException: At least one route did not complete")
public void testStreamingBatchGetWithTimeoutV1() throws IOException, ExecutionException, InterruptedException {
long routingLeakedRequestCleanupThresholdMS = TimeUnit.SECONDS.toMillis(1);
try {
setUpClient(true, false, false, true, true, routingLeakedRequestCleanupThresholdMS);
batchGetRequestContext = new BatchGetRequestContext<>();
CompletableFuture<VeniceResponseMap<String, String>> future =
statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS);
future.get();
fail();
} finally {
// wait for routingLeakedRequestCleanupThresholdMS for the metrics to be increased
metrics = getStats(clientConfig);
TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> {
assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0);
});
validateMultiGetMetrics(false, true, true, false);
tearDown();
}
}

/**
* Condition to test: streamingBatchGet(keys) API returns partial results in case of future.get(timeout)
* setup: 1 key returns valid value and the other key doesn't return anything.
* Behavior: routingLeakedRequestCleanupThresholdMS times out before streamingBatchGet().get(timeout),
* so returns exception with "At least one route did not complete".
*/
@Test(timeOut = TEST_TIMEOUT, expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*VeniceClientException: At least one route did not complete")
public void testStreamingBatchGetWithTimeoutV2()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
long routingLeakedRequestCleanupThresholdMS = TimeUnit.SECONDS.toMillis(1);
try {
setUpClient(true, false, false, true, true, routingLeakedRequestCleanupThresholdMS);
batchGetRequestContext = new BatchGetRequestContext<>();
CompletableFuture<VeniceResponseMap<String, String>> future =
statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS);
future.get(2, TimeUnit.SECONDS);
fail();
} finally {
// wait for routingLeakedRequestCleanupThresholdMS for the metrics to be increased
metrics = getStats(clientConfig);
TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> {
assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0);
});
validateMultiGetMetrics(false, true, true, false);
tearDown();
}
}

/**
* Condition to test: streamingBatchGet(keys) API returns partial results in case of future.get(timeout)
* setup: 1 key returns valid value and the other key doesn't return anything.
* Behavior: streamingBatchGet().get(timeout) times out before routingLeakedRequestCleanupThresholdMS,
* so returns partial response.
*/
@Test(timeOut = TEST_TIMEOUT)
public void testStreamingBatchGetWithTimeoutV3()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
try {
long routingLeakedRequestCleanupThresholdMS = TimeUnit.SECONDS.toMillis(2);
setUpClient(true, false, false, true, true, routingLeakedRequestCleanupThresholdMS);
batchGetRequestContext = new BatchGetRequestContext<>();
CompletableFuture<VeniceResponseMap<String, String>> future =
statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS);
VeniceResponseMap<String, String> value = future.get(1, TimeUnit.SECONDS);
assertEquals(value.size(), 1);
assertFalse(value.isFullResponse());
assertTrue(BATCH_GET_VALUE_RESPONSE.get("test_key_1").contentEquals(value.get("test_key_1")));
// wait for routingLeakedRequestCleanupThresholdMS for the metrics to be increased
metrics = getStats(clientConfig);
TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> {
assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0);
});
validateMultiGetMetrics(false, true, true, false);
} finally {
tearDown();
}
}

@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT)
public void testBatchGetToUnreachableClient(boolean useStreamingBatchGetAsDefault) throws IOException {
try {
setUpClient(useStreamingBatchGetAsDefault, false, false, false, 6 * Time.MS_PER_SECOND);
setUpClient(useStreamingBatchGetAsDefault, false, false, false, false, TimeUnit.SECONDS.toMillis(1));
batchGetRequestContext = new BatchGetRequestContext<>();
statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get();
fail();
Expand Down
Loading

0 comments on commit f74942c

Please sign in to comment.