diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/StatsAvroGenericStoreClient.java b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/StatsAvroGenericStoreClient.java index a177d9672d..b7f887529a 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/StatsAvroGenericStoreClient.java +++ b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/StatsAvroGenericStoreClient.java @@ -126,8 +126,9 @@ protected CompletableFuture> streamingBatchGet( BatchGetRequestContext requestContext, Set keys) { long startTimeInNS = System.nanoTime(); - CompletableFuture> innerFuture = super.streamingBatchGet(requestContext, keys); - return recordMetrics(requestContext, keys.size(), innerFuture, startTimeInNS, clientStatsForBatchGet); + CompletableFuture> streamingBatchGetFuture = super.streamingBatchGet(requestContext, keys); + recordMetrics(requestContext, keys.size(), streamingBatchGetFuture, startTimeInNS, clientStatsForBatchGet); /* the future returned by recordMetrics is intentionally unused; callers receive the future produced by super.streamingBatchGet. NOTE(review): presumably so that future's own get(timeout) behavior is preserved - confirm */ + return streamingBatchGetFuture; } private CompletableFuture recordMetrics( diff --git a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java index 5a4a6eaf03..131d961b3b 100644 --- a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java +++ b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java @@ -14,6 +14,7 @@ import com.linkedin.r2.transport.common.Client; import com.linkedin.venice.client.exceptions.VeniceClientException; +import com.linkedin.venice.client.store.streaming.VeniceResponseMap; import com.linkedin.venice.client.store.transport.TransportClient; import com.linkedin.venice.client.store.transport.TransportClientResponse; import com.linkedin.venice.compression.CompressionStrategy; 
@@ -40,6 +41,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.avro.Schema; import org.testng.annotations.BeforeClass; @@ -78,25 +80,37 @@ private void setUpClient() { } private void setUpClient(boolean useStreamingBatchGetAsDefault) { - setUpClient(useStreamingBatchGetAsDefault, false, false); + setUpClient(useStreamingBatchGetAsDefault, false, false, false); } private void setUpClient( boolean useStreamingBatchGetAsDefault, boolean transportClientThrowsException, - boolean transportClientThrowsPartialException) { + boolean transportClientThrowsPartialException, + boolean transportClientPartialIncomplete) { setUpClient( useStreamingBatchGetAsDefault, transportClientThrowsException, transportClientThrowsPartialException, + transportClientPartialIncomplete, true, TimeUnit.SECONDS.toMillis(30)); } + /** + * @param useStreamingBatchGetAsDefault whether to use streaming batch get instead of single-get based batch get + * @param transportClientThrowsException the transport client throws an exception for both keys + * @param transportClientThrowsPartialException the transport client responds with the correct value for the 1st key and throws an exception for the 2nd key + * @param transportClientPartialIncomplete the transport client responds with the correct value for the 1st key and never completes the response for the 2nd key + * @param mockTransportClient whether to mock the transport client so that it can respond with the actual values or an exception. + * If not mocked, the hosts won't be reachable as they are not set up to be reachable. + * @param routingLeakedRequestCleanupThresholdMS value to set for the routingLeakedRequestCleanupThresholdMS client config. 
+ */ private void setUpClient( boolean useStreamingBatchGetAsDefault, boolean transportClientThrowsException, boolean transportClientThrowsPartialException, // only applicable for useStreamingBatchGetAsDefault + boolean transportClientPartialIncomplete, // only applicable for useStreamingBatchGetAsDefault boolean mockTransportClient, long routingLeakedRequestCleanupThresholdMS) { clientConfigBuilder = new ClientConfig.ClientConfigBuilder<>().setStoreName(STORE_NAME) @@ -161,7 +175,7 @@ private void setUpClient( if (transportClientThrowsException) { doReturn(batchGetValueFuture0).when(mockedTransportClient).post(any(), any(), any()); batchGetValueFuture0.completeExceptionally(new VeniceClientException("Exception for client to return 503")); - } else if (transportClientThrowsPartialException) { + } else if (transportClientThrowsPartialException || transportClientPartialIncomplete) { // return valid response for 1 route(1 key) and exception for the other batchGetResponse0 = new TransportClientResponseForRoute( "0", @@ -174,7 +188,9 @@ private void setUpClient( batchGetValueFuture0.complete(batchGetResponse0); doReturn(batchGetValueFuture1).when(mockedTransportClient) .post(eq(REPLICA2_NAME + "/storage/test_store_v1"), any(), any()); - batchGetValueFuture1.completeExceptionally(new VeniceClientException("Exception for client to return 503")); + if (!transportClientPartialIncomplete) { /* in the partial-incomplete case batchGetValueFuture1 is left uncompleted here */ + batchGetValueFuture1.completeExceptionally(new VeniceClientException("Exception for client to return 503")); + } } else { batchGetResponse0 = new TransportClientResponseForRoute( "0", @@ -390,7 +406,7 @@ public void testGet() throws ExecutionException, InterruptedException, IOExcepti @Test(timeOut = TEST_TIMEOUT) public void testGetWithExceptionFromTransportLayer() throws IOException { try { - setUpClient(false, true, false); + setUpClient(false, true, false, false); getRequestContext = new GetRequestContext(); statsAvroGenericStoreClient.get(getRequestContext, 
"test_key").get().toString(); fail(); @@ -406,7 +422,7 @@ public void testGetWithExceptionFromTransportLayer() throws IOException { @Test(timeOut = TEST_TIMEOUT) public void testGetToUnreachableClient() throws IOException { try { - setUpClient(false, false, false, false, 2 * Time.MS_PER_SECOND); + setUpClient(false, false, false, false, false, 2 * Time.MS_PER_SECOND); getRequestContext = new GetRequestContext(); statsAvroGenericStoreClient.get(getRequestContext, "test_key").get(); fail(); @@ -448,7 +464,7 @@ public void testBatchGet(boolean useStreamingBatchGetAsDefault) @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) public void testBatchGetWithExceptionFromTransportLayer(boolean useStreamingBatchGetAsDefault) throws IOException { try { - setUpClient(useStreamingBatchGetAsDefault, true, false); + setUpClient(useStreamingBatchGetAsDefault, true, false, false); batchGetRequestContext = new BatchGetRequestContext<>(); statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); fail(); @@ -468,7 +484,7 @@ public void testBatchGetWithExceptionFromTransportLayer(boolean useStreamingBatc public void testBatchGetWithExceptionFromTransportLayerForOneRoute(boolean useStreamingBatchGetAsDefault) throws IOException { try { - setUpClient(useStreamingBatchGetAsDefault, false, true); + setUpClient(useStreamingBatchGetAsDefault, false, true, false); batchGetRequestContext = new BatchGetRequestContext<>(); Map value = (Map) statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); @@ -494,10 +510,175 @@ public void testBatchGetWithExceptionFromTransportLayerForOneRoute(boolean useSt } } + /** + * Condition to test: batchGet API either returns full results or exception, but no partial results. + * In this test: + * setup: 1 key returns valid value and the other key doesn't return anything. 
+ * Behavior: this test calls batchGet().get() without timeout, so waits till routingLeakedRequestCleanupThresholdMS + * times out and returns exception with "At least one route did not complete". + */ + @Test(timeOut = TEST_TIMEOUT, expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*VeniceClientException: At least one route did not complete") + public void testBatchGetWithTimeoutV1() throws IOException, ExecutionException, InterruptedException { + long routingLeakedRequestCleanupThresholdMS = TimeUnit.SECONDS.toMillis(1); + try { + setUpClient(true, false, false, true, true, routingLeakedRequestCleanupThresholdMS); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); + fail(); + } finally { + // wait up to routingLeakedRequestCleanupThresholdMS plus a 1 second buffer (both in milliseconds) for the metrics to be increased + metrics = getStats(clientConfig); + TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS, () -> { + assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); + }); + validateMultiGetMetrics(false, true, true, false); + tearDown(); + } + } + + /** + * Condition to test: batchGet API either returns full results or exception, but no partial results. + * In this test: + * setup: 1 key returns valid value and the other key doesn't return anything. + * Behavior: routingLeakedRequestCleanupThresholdMS times out before batchGet().get(timeout), + * so returns exception with "At least one route did not complete". 
+ */ + @Test(timeOut = TEST_TIMEOUT, expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*VeniceClientException: At least one route did not complete") + public void testBatchGetWithTimeoutV2() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + long routingLeakedRequestCleanupThresholdMS = TimeUnit.SECONDS.toMillis(1); + try { + setUpClient(true, false, false, true, true, routingLeakedRequestCleanupThresholdMS); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get(2, TimeUnit.SECONDS); + fail(); + } finally { + // wait up to routingLeakedRequestCleanupThresholdMS plus a 1 second buffer (both in milliseconds) for the metrics to be increased + metrics = getStats(clientConfig); + TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS, () -> { + assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); + }); + validateMultiGetMetrics(false, true, true, false); + tearDown(); + } + } + + /** + * Condition to test: batchGet API either returns full results or exception, but no partial results. + * In this test: + * setup: 1 key returns valid value and the other key doesn't return anything. + * Behavior: batchGet().get(timeout) times out before routingLeakedRequestCleanupThresholdMS, + * so AppTimeOutTrackingCompletableFuture returns TimeoutException confirming no partial returns. 
+ */ + @Test(timeOut = TEST_TIMEOUT, expectedExceptions = TimeoutException.class) + public void testBatchGetWithTimeoutV3() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + long routingLeakedRequestCleanupThresholdMS = TimeUnit.SECONDS.toMillis(2); + try { + setUpClient(true, false, false, true, true, routingLeakedRequestCleanupThresholdMS); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get(1, TimeUnit.SECONDS); + fail(); + } finally { + // wait up to routingLeakedRequestCleanupThresholdMS plus a 1 second buffer (both in milliseconds) for the metrics to be increased + metrics = getStats(clientConfig); + TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS, () -> { + assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); + }); + validateMultiGetMetrics(false, true, true, false); + tearDown(); + } + } + + /** + * Condition to test: streamingBatchGet(keys) API returns partial results in case of future.get(timeout) + * setup: 1 key returns valid value and the other key doesn't return anything. + * Behavior: this test calls streamingBatchGet().get() without timeout, so waits till routingLeakedRequestCleanupThresholdMS + * times out and returns exception with "At least one route did not complete". 
+ */ + @Test(timeOut = TEST_TIMEOUT, expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*VeniceClientException: At least one route did not complete") + public void testStreamingBatchGetWithTimeoutV1() throws IOException, ExecutionException, InterruptedException { + long routingLeakedRequestCleanupThresholdMS = TimeUnit.SECONDS.toMillis(1); + try { + setUpClient(true, false, false, true, true, routingLeakedRequestCleanupThresholdMS); + batchGetRequestContext = new BatchGetRequestContext<>(); + CompletableFuture> future = + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS); + future.get(); + fail(); + } finally { + // wait up to routingLeakedRequestCleanupThresholdMS plus a 1 second buffer (both in milliseconds) for the metrics to be increased + metrics = getStats(clientConfig); + TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS, () -> { + assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); + }); + validateMultiGetMetrics(false, true, true, false); + tearDown(); + } + } + + /** + * Condition to test: streamingBatchGet(keys) API returns partial results in case of future.get(timeout) + * setup: 1 key returns valid value and the other key doesn't return anything. + * Behavior: routingLeakedRequestCleanupThresholdMS times out before streamingBatchGet().get(timeout), + * so returns exception with "At least one route did not complete". 
+ */ + @Test(timeOut = TEST_TIMEOUT, expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*VeniceClientException: At least one route did not complete") + public void testStreamingBatchGetWithTimeoutV2() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + long routingLeakedRequestCleanupThresholdMS = TimeUnit.SECONDS.toMillis(1); + try { + setUpClient(true, false, false, true, true, routingLeakedRequestCleanupThresholdMS); + batchGetRequestContext = new BatchGetRequestContext<>(); + CompletableFuture> future = + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS); + future.get(2, TimeUnit.SECONDS); + fail(); + } finally { + // wait up to routingLeakedRequestCleanupThresholdMS plus a 1 second buffer (both in milliseconds) for the metrics to be increased + metrics = getStats(clientConfig); + TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS, () -> { + assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); + }); + validateMultiGetMetrics(false, true, true, false); + tearDown(); + } + } + + /** + * Condition to test: streamingBatchGet(keys) API returns partial results in case of future.get(timeout) + * setup: 1 key returns valid value and the other key doesn't return anything. + * Behavior: streamingBatchGet().get(timeout) times out before routingLeakedRequestCleanupThresholdMS, + * so returns partial response. 
+ */ + @Test(timeOut = TEST_TIMEOUT) + public void testStreamingBatchGetWithTimeoutV3() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + try { + long routingLeakedRequestCleanupThresholdMS = TimeUnit.SECONDS.toMillis(2); + setUpClient(true, false, false, true, true, routingLeakedRequestCleanupThresholdMS); + batchGetRequestContext = new BatchGetRequestContext<>(); + CompletableFuture> future = + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS); + VeniceResponseMap value = future.get(1, TimeUnit.SECONDS); + assertEquals(value.size(), 1); + assertFalse(value.isFullResponse()); + assertTrue(BATCH_GET_VALUE_RESPONSE.get("test_key_1").contentEquals(value.get("test_key_1"))); + // wait up to routingLeakedRequestCleanupThresholdMS plus a 1 second buffer (both in milliseconds) for the metrics to be increased + metrics = getStats(clientConfig); + TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS, () -> { + assertTrue(metrics.get("." 
+ STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); + }); + validateMultiGetMetrics(false, true, true, false); + } finally { + tearDown(); + } + } + @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) public void testBatchGetToUnreachableClient(boolean useStreamingBatchGetAsDefault) throws IOException { try { - setUpClient(useStreamingBatchGetAsDefault, false, false, false, 6 * Time.MS_PER_SECOND); + setUpClient(useStreamingBatchGetAsDefault, false, false, false, false, TimeUnit.SECONDS.toMillis(1)); batchGetRequestContext = new BatchGetRequestContext<>(); statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); fail(); diff --git a/clients/venice-thin-client/src/test/java/com/linkedin/venice/client/store/StatTrackingStoreClientTest.java b/clients/venice-thin-client/src/test/java/com/linkedin/venice/client/store/StatTrackingStoreClientTest.java index 5935890273..d6f61c901e 100644 --- a/clients/venice-thin-client/src/test/java/com/linkedin/venice/client/store/StatTrackingStoreClientTest.java +++ b/clients/venice-thin-client/src/test/java/com/linkedin/venice/client/store/StatTrackingStoreClientTest.java @@ -4,6 +4,7 @@ import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.testng.Assert.fail; import com.linkedin.venice.client.exceptions.VeniceClientException; import com.linkedin.venice.client.exceptions.VeniceClientHttpException; @@ -110,6 +111,7 @@ public Schema getLatestValueSchema() { private static class StoreClientForMultiGetStreamTest extends SimpleStoreClient { private final Map resultMap; private final boolean fullResponse; + private final boolean noCompletion; public StoreClientForMultiGetStreamTest( TransportClient transportClient, @@ -117,10 +119,12 @@ public StoreClientForMultiGetStreamTest( boolean needSchemaReader, Executor deserializationExecutor, Map resultMap, - 
boolean fullResponse) { + boolean fullResponse, + boolean noCompletion) { super(transportClient, storeName, needSchemaReader, deserializationExecutor); this.resultMap = resultMap; this.fullResponse = fullResponse; + this.noCompletion = noCompletion; } @Override @@ -140,11 +144,14 @@ public void streamingBatchGet(final Set keys, StreamingCallback callbac } else { if (keys.size() > 1) { // Only return one result - trackingStreamingCallback.onRecordReceived(keys.iterator().next(), null); + K key = keys.iterator().next(); + trackingStreamingCallback.onRecordReceived(key, resultMap.get(key)); } } - trackingStreamingCallback.onDeserializationCompletion(Optional.empty(), 10, 5); - trackingStreamingCallback.onCompletion(Optional.empty()); + if (!noCompletion) { + trackingStreamingCallback.onDeserializationCompletion(Optional.empty(), 10, 5); + trackingStreamingCallback.onCompletion(Optional.empty()); + } } } } @@ -237,7 +244,8 @@ public void testMultiGet() throws ExecutionException, InterruptedException { true, AbstractAvroStoreClient.getDefaultDeserializationExecutor(), result, - true); + true, + false); StatTrackingStoreClient statTrackingStoreClient = new StatTrackingStoreClient<>( innerClient, @@ -279,6 +287,7 @@ public void testMultiGetWithPartialResponse() throws ExecutionException, Interru true, AbstractAvroStoreClient.getDefaultDeserializationExecutor(), Collections.emptyMap(), + false, false); StatTrackingStoreClient statTrackingStoreClient = new StatTrackingStoreClient<>( @@ -287,6 +296,66 @@ public void testMultiGetWithPartialResponse() throws ExecutionException, Interru statTrackingStoreClient.batchGet(keySet).get(); } + @Test(expectedExceptions = TimeoutException.class) + public void testMultiGetWithPartialResponseAfterTimeout() + throws ExecutionException, InterruptedException, TimeoutException { + Set keySet = new HashSet<>(); + String keyPrefix = "key_"; + for (int i = 0; i < 10; ++i) { + keySet.add(keyPrefix + i); + } + + MetricsRepository repository = new 
MetricsRepository(); + + InternalAvroStoreClient innerClient = new StoreClientForMultiGetStreamTest( + mock(TransportClient.class), + storeName, + true, + AbstractAvroStoreClient.getDefaultDeserializationExecutor(), + Collections.emptyMap(), + false, + true); + + StatTrackingStoreClient statTrackingStoreClient = new StatTrackingStoreClient<>( + innerClient, + ClientConfig.defaultGenericClientConfig(storeName).setMetricsRepository(repository)); + statTrackingStoreClient.batchGet(keySet).get(2000, TimeUnit.MILLISECONDS); + fail(); + } + + @Test + public void testStreamingMultiGetWithPartialResponseAfterTimeout() + throws ExecutionException, InterruptedException, TimeoutException { + Map result = new HashMap<>(); + Set keySet = new HashSet<>(); + String keyPrefix = "key_"; + for (int i = 0; i < 5; ++i) { + result.put(keyPrefix + i, "value_" + i); + } + for (int i = 0; i < 10; ++i) { + keySet.add(keyPrefix + i); + } + + MetricsRepository repository = new MetricsRepository(); + + InternalAvroStoreClient innerClient = new StoreClientForMultiGetStreamTest( + mock(TransportClient.class), + storeName, + true, + AbstractAvroStoreClient.getDefaultDeserializationExecutor(), + result, + false, + true); + + StatTrackingStoreClient statTrackingStoreClient = new StatTrackingStoreClient<>( + innerClient, + ClientConfig.defaultGenericClientConfig(storeName).setMetricsRepository(repository)); + CompletableFuture> future = statTrackingStoreClient.streamingBatchGet(keySet); + VeniceResponseMap batchGetResult = future.get(2000, TimeUnit.MILLISECONDS); /* the inner client is built with noCompletion = true, so the test expects get(timeout) to hand back only what was streamed so far */ + Assert.assertEquals(batchGetResult.size(), 1); + Assert.assertFalse(batchGetResult.isFullResponse()); + } + @Test public void testGetWithException() throws InterruptedException { CompletableFuture mockInnerFuture = new CompletableFuture(); @@ -301,7 +370,7 @@ public void testGetWithException() throws InterruptedException { ClientConfig.defaultGenericClientConfig(mockStoreClient.getStoreName()).setMetricsRepository(repository)); try { 
statTrackingStoreClient.get("key").get(); - Assert.fail("ExecutionException should be thrown"); + fail("ExecutionException should be thrown"); } catch (ExecutionException e) { // expected } @@ -342,7 +411,7 @@ public void testMultiGetWithException() throws InterruptedException { keySet.add("key"); try { statTrackingStoreClient.batchGet(keySet).get(); - Assert.fail("ExecutionException should be thrown"); + fail("ExecutionException should be thrown"); } catch (ExecutionException e) { // expected } @@ -472,7 +541,8 @@ public void multiGetStreamTest() { true, AbstractAvroStoreClient.getDefaultDeserializationExecutor(), Collections.emptyMap(), - true); + true, + false); MetricsRepository repository = new MetricsRepository(); StatTrackingStoreClient statTrackingStoreClient = new StatTrackingStoreClient<>( innerClient,