From f3d6c2b38581960cdda356a5ec0f3024a3b9c444 Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Tue, 17 Sep 2024 20:36:54 +0530 Subject: [PATCH 1/9] add metrics for async shard fetch Signed-off-by: Rahul Karajgikar --- .../TransportIndicesShardStoresAction.java | 27 +++++++--- .../cluster/ClusterManagerMetrics.java | 13 +++++ .../org/opensearch/cluster/ClusterModule.java | 3 ++ .../gateway/AsyncShardBatchFetch.java | 12 +++-- .../opensearch/gateway/AsyncShardFetch.java | 13 +++-- .../gateway/AsyncShardFetchCache.java | 12 ++++- .../opensearch/gateway/GatewayAllocator.java | 27 ++++++---- .../gateway/ShardsBatchGatewayAllocator.java | 52 ++++++++++++++----- .../gateway/AsyncShardFetchTests.java | 11 +++- .../gateway/ShardBatchCacheTests.java | 5 +- .../snapshots/SnapshotResiliencyTests.java | 3 +- .../fixtures/azure-fixture/docker-compose.yml | 1 - 12 files changed, 136 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java b/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java index a8b97d0f344ae..1a3c657f5b1b8 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java @@ -37,6 +37,7 @@ import org.opensearch.action.FailedNodeException; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; @@ -88,6 +89,7 @@ public class TransportIndicesShardStoresAction extends TransportClusterManagerNo private static final Logger logger = 
LogManager.getLogger(TransportIndicesShardStoresAction.class); private final TransportNodesListGatewayStartedShards listShardStoresInfo; + private final ClusterManagerMetrics clusterManagerMetrics; @Inject public TransportIndicesShardStoresAction( @@ -96,7 +98,8 @@ public TransportIndicesShardStoresAction( ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - TransportNodesListGatewayStartedShards listShardStoresInfo + TransportNodesListGatewayStartedShards listShardStoresInfo, + ClusterManagerMetrics clusterManagerMetrics ) { super( IndicesShardStoresAction.NAME, @@ -109,6 +112,7 @@ public TransportIndicesShardStoresAction( true ); this.listShardStoresInfo = listShardStoresInfo; + this.clusterManagerMetrics = clusterManagerMetrics; } @Override @@ -154,7 +158,7 @@ protected void clusterManagerOperation( // we could fetch all shard store info from every node once (nNodes requests) // we have to implement a TransportNodesAction instead of using TransportNodesListGatewayStartedShards // for fetching shard stores info, that operates on a list of shards instead of a single shard - new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardsToFetch, listener).start(); + new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardsToFetch, listener, clusterManagerMetrics).start(); } @Override @@ -175,12 +179,14 @@ private class AsyncShardStoresInfoFetches { private final ActionListener listener; private CountDown expectedOps; private final Queue fetchResponses; + private final ClusterManagerMetrics clusterManagerMetrics; AsyncShardStoresInfoFetches( DiscoveryNodes nodes, RoutingNodes routingNodes, Set> shards, - ActionListener listener + ActionListener listener, + ClusterManagerMetrics clusterManagerMetrics ) { this.nodes = nodes; this.routingNodes = routingNodes; @@ -188,6 +194,7 @@ private class AsyncShardStoresInfoFetches { this.listener = listener; this.fetchResponses = new 
ConcurrentLinkedQueue<>(); this.expectedOps = new CountDown(shards.size()); + this.clusterManagerMetrics = clusterManagerMetrics; } void start() { @@ -195,7 +202,14 @@ void start() { listener.onResponse(new IndicesShardStoresResponse()); } else { for (Tuple shard : shards) { - InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shard.v1(), shard.v2(), listShardStoresInfo); + InternalAsyncFetch fetch = new InternalAsyncFetch( + logger, + "shard_stores", + shard.v1(), + shard.v2(), + listShardStoresInfo, + clusterManagerMetrics + ); fetch.fetchData(nodes, Collections.emptyMap()); } } @@ -213,9 +227,10 @@ private class InternalAsyncFetch extends AsyncShardFetch deciderList; final ShardsAllocator shardsAllocator; + private final ClusterManagerMetrics clusterManagerMetrics; public ClusterModule( Settings settings, @@ -166,6 +167,7 @@ public ClusterModule( settings, clusterManagerMetrics ); + this.clusterManagerMetrics = clusterManagerMetrics; } public static List getNamedWriteables() { @@ -456,6 +458,7 @@ protected void configure() { bind(TaskResultsService.class).asEagerSingleton(); bind(AllocationDeciders.class).toInstance(allocationDeciders); bind(ShardsAllocator.class).toInstance(shardsAllocator); + bind(ClusterManagerMetrics.class).toInstance(clusterManagerMetrics); } public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, ShardsBatchGatewayAllocator shardsBatchGatewayAllocator) { diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java index df642a9f5a743..d86d41bb1a359 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.support.nodes.BaseNodeResponse; import org.opensearch.action.support.nodes.BaseNodesResponse; +import 
org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.logging.Loggers; import org.opensearch.core.index.shard.ShardId; @@ -48,7 +49,8 @@ public abstract class AsyncShardBatchFetch extend Class clazz, V emptyShardResponse, Predicate emptyShardResponsePredicate, - ShardBatchResponseFactory responseFactory + ShardBatchResponseFactory responseFactory, + ClusterManagerMetrics clusterManagerMetrics ) { super( logger, @@ -64,7 +66,8 @@ public abstract class AsyncShardBatchFetch extend clazz, emptyShardResponse, emptyShardResponsePredicate, - responseFactory + responseFactory, + clusterManagerMetrics ) ); } @@ -116,9 +119,10 @@ public ShardBatchCache( Class clazz, V emptyResponse, Predicate emptyShardResponsePredicate, - ShardBatchResponseFactory responseFactory + ShardBatchResponseFactory responseFactory, + ClusterManagerMetrics clusterManagerMetrics ) { - super(Loggers.getLogger(logger, "_" + logKey), type); + super(Loggers.getLogger(logger, "_" + logKey), type, clusterManagerMetrics); this.batchSize = shardAttributesMap.size(); this.emptyShardResponsePredicate = emptyShardResponsePredicate; cache = new HashMap<>(); diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index b664dd573ce67..5c0449fc3cc2e 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -35,6 +35,7 @@ import org.opensearch.action.FailedNodeException; import org.opensearch.action.support.nodes.BaseNodeResponse; import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.allocation.RoutingAllocation; @@ -94,7 +95,8 @@ protected AsyncShardFetch( String type, 
ShardId shardId, String customDataPath, - Lister, T> action + Lister, T> action, + ClusterManagerMetrics clusterManagerMetrics ) { this.logger = logger; this.type = type; @@ -102,7 +104,7 @@ protected AsyncShardFetch( shardAttributesMap.put(shardId, new ShardAttributes(customDataPath)); this.action = (Lister, T>) action; this.reroutingKey = "ShardId=[" + shardId.toString() + "]"; - cache = new ShardCache<>(logger, reroutingKey, type); + cache = new ShardCache<>(logger, reroutingKey, type, clusterManagerMetrics); } /** @@ -223,7 +225,7 @@ protected synchronized void processAsyncFetch(List responses, List response) { @Override public void onFailure(Exception e) { + // TODO: add failure metrics here List failures = new ArrayList<>(nodes.length); for (final DiscoveryNode node : nodes) { failures.add(new FailedNodeException(node.getId(), "total failure in fetching", e)); @@ -284,8 +287,8 @@ static class ShardCache extends AsyncShardFetchCache private final Map> cache; - public ShardCache(Logger logger, String logKey, String type) { - super(Loggers.getLogger(logger, "_" + logKey), type); + public ShardCache(Logger logger, String logKey, String type, ClusterManagerMetrics clusterManagerMetrics) { + super(Loggers.getLogger(logger, "_" + logKey), type, clusterManagerMetrics); cache = new HashMap<>(); } diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java index 2a4e6181467b0..5bc9910e6edd8 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java @@ -14,6 +14,7 @@ import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.FailedNodeException; import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; 
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; @@ -51,10 +52,12 @@ public abstract class AsyncShardFetchCache { private final Logger logger; private final String type; + private final ClusterManagerMetrics clusterManagerMetrics; - protected AsyncShardFetchCache(Logger logger, String type) { + protected AsyncShardFetchCache(Logger logger, String type, ClusterManagerMetrics clusterManagerMetrics) { this.logger = logger; this.type = type; + this.clusterManagerMetrics = clusterManagerMetrics; } abstract void initData(DiscoveryNode node); @@ -167,8 +170,10 @@ void processResponses(List responses, long fetchingRound) { if (nodeEntry != null) { if (validateNodeResponse(nodeEntry, fetchingRound)) { // if the entry is there, for the right fetching round and not marked as failed already, process it - logger.trace("marking {} as done for [{}], result is [{}]", nodeEntry.getNodeId(), type, response); + logger.info("marking {} as done for [{}], result is [{}]", nodeEntry.getNodeId(), type, response); putData(response.getNode(), response); + logger.info("incrementing async fetch total counter"); + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchTotalFetchesCounter, 1.0); } } } @@ -202,6 +207,9 @@ private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException fail nodeEntry.getFetchingRound(), fetchingRound ); + logger.info("incrementing async fetch total counter and async fetch failure counter"); + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchTotalFetchesCounter, 1.0); + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchFailureCounter, 1.0); } else if (nodeEntry.isFailed() == false) { // if the entry is there, for the right fetching round and not marked as failed already, process it Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause()); diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java 
b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java index c8ef9364ebba9..eaacb5dbfbd17 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java @@ -37,6 +37,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.support.nodes.BaseNodeResponse; import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; @@ -92,11 +93,12 @@ public class GatewayAllocator implements ExistingShardsAllocator { public GatewayAllocator( RerouteService rerouteService, TransportNodesListGatewayStartedShards startedAction, - TransportNodesListShardStoreMetadata storeAction + TransportNodesListShardStoreMetadata storeAction, + ClusterManagerMetrics clusterManagerMetrics ) { this.rerouteService = rerouteService; - this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction); - this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction); + this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction, clusterManagerMetrics); + this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction, clusterManagerMetrics); } @Override @@ -251,9 +253,10 @@ class InternalAsyncFetch extends AsyncShardFetch String type, ShardId shardId, String customDataPath, - Lister, T> action + Lister, T> action, + ClusterManagerMetrics clusterManagerMetrics ) { - super(logger, type, shardId, customDataPath, action); + super(logger, type, shardId, customDataPath, action, clusterManagerMetrics); } @Override @@ -274,9 +277,11 @@ protected void reroute(String reroutingKey, String reason) { class InternalPrimaryShardAllocator extends PrimaryShardAllocator { private final TransportNodesListGatewayStartedShards 
startedAction; + private final ClusterManagerMetrics clusterManagerMetrics; - InternalPrimaryShardAllocator(TransportNodesListGatewayStartedShards startedAction) { + InternalPrimaryShardAllocator(TransportNodesListGatewayStartedShards startedAction, ClusterManagerMetrics clusterManagerMetrics) { this.startedAction = startedAction; + this.clusterManagerMetrics = clusterManagerMetrics; } @Override @@ -291,7 +296,8 @@ protected AsyncShardFetch.FetchResult shardState = fetch.fetchData( @@ -313,9 +319,11 @@ protected AsyncShardFetch.FetchResult shardStores = fetch.fetchData( diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 5e2dcbcd70b40..c77838bf7726d 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.support.nodes.BaseNodeResponse; import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; @@ -44,6 +45,7 @@ import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata; import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import java.util.ArrayList; import java.util.Collections; @@ -81,6 +83,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { private TimeValue primaryShardsBatchGatewayAllocatorTimeout; private TimeValue 
replicaShardsBatchGatewayAllocatorTimeout; public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20); + private ClusterManagerMetrics clusterManagerMetrics; /** * Number of shards we send in one batch to data nodes for fetching metadata @@ -160,7 +163,8 @@ public ShardsBatchGatewayAllocator( TransportNodesListGatewayStartedShardsBatch batchStartedAction, TransportNodesListShardStoreMetadataBatch batchStoreAction, Settings settings, - ClusterSettings clusterSettings + ClusterSettings clusterSettings, + ClusterManagerMetrics clusterManagerMetrics ) { this.rerouteService = rerouteService; this.primaryShardBatchAllocator = new InternalPrimaryBatchShardAllocator(); @@ -172,6 +176,7 @@ public ShardsBatchGatewayAllocator( clusterSettings.addSettingsUpdateConsumer(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING, this::setPrimaryBatchAllocatorTimeout); this.replicaShardsBatchGatewayAllocatorTimeout = REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING, this::setReplicaBatchAllocatorTimeout); + this.clusterManagerMetrics = clusterManagerMetrics; } @Override @@ -187,6 +192,7 @@ protected ShardsBatchGatewayAllocator() { this(DEFAULT_SHARD_BATCH_SIZE, null); } + // for tests protected ShardsBatchGatewayAllocator(long batchSize, RerouteService rerouteService) { this.rerouteService = rerouteService; this.batchStartedAction = null; @@ -196,10 +202,9 @@ protected ShardsBatchGatewayAllocator(long batchSize, RerouteService rerouteServ this.maxBatchSize = batchSize; this.primaryShardsBatchGatewayAllocatorTimeout = null; this.replicaShardsBatchGatewayAllocatorTimeout = null; + this.clusterManagerMetrics = new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE); } - // for tests - @Override public int getNumberOfInFlightFetches() { int count = 0; @@ -413,7 +418,7 @@ else if (shardRouting.primary() == primary) { // add to batch if batch size full or last shard in unassigned list if 
(batchSize == 0 || iterator.hasNext() == false) { String batchUUId = UUIDs.base64UUID(); - ShardsBatch shardsBatch = new ShardsBatch(batchUUId, perBatchShards, primary); + ShardsBatch shardsBatch = new ShardsBatch(batchUUId, perBatchShards, primary, clusterManagerMetrics); // add the batch to list of current batches addBatch(shardsBatch, primary); batchesToBeAssigned.add(batchUUId); @@ -588,9 +593,21 @@ class InternalBatchAsyncFetch extends AsyncShardB Class clazz, V emptyShardResponse, Predicate emptyShardResponsePredicate, - ShardBatchResponseFactory responseFactory + ShardBatchResponseFactory responseFactory, + ClusterManagerMetrics clusterManagerMetrics ) { - super(logger, type, map, action, batchUUId, clazz, emptyShardResponse, emptyShardResponsePredicate, responseFactory); + super( + logger, + type, + map, + action, + batchUUId, + clazz, + emptyShardResponse, + emptyShardResponsePredicate, + responseFactory, + clusterManagerMetrics + ); } @Override @@ -650,16 +667,17 @@ protected boolean hasInitiatedFetching(ShardRouting shard) { * It should return false if there has never been a fetch for this batch. * This function is currently only used in the case of replica shards when all deciders returned NO/THROTTLE, and explain mode is ON. * Allocation explain and manual reroute APIs try to append shard store information (matching bytes) to the allocation decision. - * However, these APIs do not want to trigger a new asyncFetch for these ineligible shards, unless the data from nodes is already there. + * However, these APIs do not want to trigger a new asyncFetch for these ineligible shards + * They only want to use the data if it is already available. * This function is used to see if a fetch has happened to decide if it is possible to append shard store info without a new async fetch. * In the case when shard has a batch but no fetch has happened before, it would be because it is a new batch. 
* In the case when shard has a batch, and a fetch has happened before, and no fetch is ongoing, it would be because we have already completed fetch for all nodes. - * + *

* In order to check if a fetch has ever happened, we check 2 things: * 1. If the shard batch cache is empty, we know that fetch has never happened so we return false. * 2. If we see that the list of nodes to fetch from is empty, we know that all nodes have data or are ongoing a fetch. So we return true. * 3. Otherwise we return false. - * + *

* see {@link AsyncShardFetchCache#findNodesToFetch()} */ String batchId = getBatchId(shard, shard.primary()); @@ -669,7 +687,8 @@ protected boolean hasInitiatedFetching(ShardRouting shard) { logger.trace("Checking if fetching done for batch id {}", batchId); ShardsBatch shardsBatch = shard.primary() ? batchIdToStartedShardBatch.get(batchId) : batchIdToStoreShardBatch.get(batchId); // if fetchData has never been called, the per node cache will be empty and have no nodes - // this is because cache.fillShardCacheWithDataNodes(nodes) initialises this map and is called in AsyncShardFetch.fetchData + /// this is because {@link AsyncShardFetchCache#fillShardCacheWithDataNodes(DiscoveryNodes)} initialises this map + /// and is called in {@link AsyncShardFetch#fetchData(DiscoveryNodes, Map)} if (shardsBatch == null || shardsBatch.getAsyncFetcher().hasEmptyCache()) { logger.trace("Batch cache is empty for batch {} ", batchId); return false; @@ -739,7 +758,12 @@ public class ShardsBatch { private final Map batchInfo; - public ShardsBatch(String batchId, Map shardsWithInfo, boolean primary) { + public ShardsBatch( + String batchId, + Map shardsWithInfo, + boolean primary, + ClusterManagerMetrics clusterManagerMetrics + ) { this.batchId = batchId; this.batchInfo = new HashMap<>(shardsWithInfo); // create a ShardId -> customDataPath map for async fetch @@ -757,7 +781,8 @@ public ShardsBatch(String batchId, Map shardsWithInfo, bool GatewayStartedShard.class, new GatewayStartedShard(null, false, null, null), GatewayStartedShard::isEmpty, - new ShardBatchResponseFactory<>(true) + new ShardBatchResponseFactory<>(true), + clusterManagerMetrics ); } else { asyncBatch = new InternalBatchAsyncFetch<>( @@ -769,7 +794,8 @@ public ShardsBatch(String batchId, Map shardsWithInfo, bool NodeStoreFilesMetadata.class, new NodeStoreFilesMetadata(new StoreFilesMetadata(null, Store.MetadataSnapshot.EMPTY, Collections.emptyList()), null), NodeStoreFilesMetadata::isEmpty, - new 
ShardBatchResponseFactory<>(false) + new ShardBatchResponseFactory<>(false), + clusterManagerMetrics ); } } diff --git a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java index db97c3ece94ba..8157703c3e2e9 100644 --- a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java +++ b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java @@ -35,10 +35,12 @@ import org.opensearch.Version; import org.opensearch.action.FailedNodeException; import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -399,7 +401,14 @@ static class Entry { private AtomicInteger reroute = new AtomicInteger(); TestFetch(ThreadPool threadPool) { - super(LogManager.getLogger(TestFetch.class), "test", new ShardId("test", "_na_", 1), "", null); + super( + LogManager.getLogger(TestFetch.class), + "test", + new ShardId("test", "_na_", 1), + "", + null, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ); this.threadPool = threadPool; } diff --git a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java index 12030ad41d508..39c4ee8c8ca06 100644 --- a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java +++ b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java @@ -8,6 +8,7 @@ package org.opensearch.gateway; +import org.opensearch.cluster.ClusterManagerMetrics; import 
org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; @@ -19,6 +20,7 @@ import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; import org.opensearch.indices.store.ShardAttributes; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import java.util.ArrayList; import java.util.HashMap; @@ -52,7 +54,8 @@ public void setupShardBatchCache(String batchId, int numberOfShards) { GatewayStartedShard.class, new GatewayStartedShard(null, false, null, null), GatewayStartedShard::isEmpty, - new ShardBatchResponseFactory<>(true) + new ShardBatchResponseFactory<>(true), + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 440227436175d..d17e661615b0d 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2429,7 +2429,8 @@ public void onFailure(final Exception e) { nodeEnv, indicesService, namedXContentRegistry - ) + ), + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ) ); actions.put( diff --git a/test/fixtures/azure-fixture/docker-compose.yml b/test/fixtures/azure-fixture/docker-compose.yml index 85e073e1803c1..c68d4739d1fd9 100644 --- a/test/fixtures/azure-fixture/docker-compose.yml +++ b/test/fixtures/azure-fixture/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3' services: azure-fixture: build: From d5e7c8e2b5d764813157ff3ff202dfc0e6d5e435 Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Wed, 18 Sep 2024 13:58:35 +0530 Subject: [PATCH 2/9] add UT and remove stale logs Signed-off-by: Rahul Karajgikar --- 
.../opensearch/gateway/AsyncShardFetch.java | 1 - .../gateway/AsyncShardFetchCache.java | 10 +- .../gateway/ShardsBatchGatewayAllocator.java | 2 +- .../gateway/AsyncShardFetchTests.java | 174 +++++++++++++++++- .../fixtures/azure-fixture/docker-compose.yml | 1 + 5 files changed, 174 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 5c0449fc3cc2e..076368701e521 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -266,7 +266,6 @@ public void onResponse(BaseNodesResponse response) { @Override public void onFailure(Exception e) { - // TODO: add failure metrics here List failures = new ArrayList<>(nodes.length); for (final DiscoveryNode node : nodes) { failures.add(new FailedNodeException(node.getId(), "total failure in fetching", e)); diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java index 5bc9910e6edd8..c84634b56039b 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java @@ -167,13 +167,12 @@ Map getCacheData(DiscoveryNodes nodes, Set failedNodes void processResponses(List responses, long fetchingRound) { for (K response : responses) { BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId()); + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchTotalFetchesCounter, 1.0); if (nodeEntry != null) { if (validateNodeResponse(nodeEntry, fetchingRound)) { // if the entry is there, for the right fetching round and not marked as failed already, process it - logger.info("marking {} as done for [{}], result is [{}]", nodeEntry.getNodeId(), type, response); + logger.trace("marking {} as done for [{}], result is 
[{}]", nodeEntry.getNodeId(), type, response); putData(response.getNode(), response); - logger.info("incrementing async fetch total counter"); - clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchTotalFetchesCounter, 1.0); } } } @@ -198,6 +197,8 @@ private boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound } private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failure, long fetchingRound) { + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchTotalFetchesCounter, 1.0); + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchFailureCounter, 1.0); if (nodeEntry.getFetchingRound() != fetchingRound) { assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; logger.trace( @@ -207,9 +208,6 @@ private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException fail nodeEntry.getFetchingRound(), fetchingRound ); - logger.info("incrementing async fetch total counter and async fetch failure counter"); - clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchTotalFetchesCounter, 1.0); - clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchFailureCounter, 1.0); } else if (nodeEntry.isFailed() == false) { // if the entry is there, for the right fetching round and not marked as failed already, process it Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause()); diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index c77838bf7726d..d7c0a66ba3424 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -83,7 +83,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { private TimeValue 
primaryShardsBatchGatewayAllocatorTimeout; private TimeValue replicaShardsBatchGatewayAllocatorTimeout; public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20); - private ClusterManagerMetrics clusterManagerMetrics; + private final ClusterManagerMetrics clusterManagerMetrics; /** * Number of shards we send in one batch to data nodes for fetching metadata diff --git a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java index 8157703c3e2e9..92d5a7fdd6ba5 100644 --- a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java +++ b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java @@ -40,7 +40,8 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.core.index.shard.ShardId; -import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -56,6 +57,12 @@ import static java.util.Collections.emptyMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.anyDouble; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class AsyncShardFetchTests extends OpenSearchTestCase { private final DiscoveryNode node1 = new DiscoveryNode( @@ -80,13 +87,29 @@ public class AsyncShardFetchTests extends OpenSearchTestCase { private ThreadPool threadPool; private TestFetch test; + private Counter asyncFetchTotalCounter; + private Counter asyncFetchFailureCounter; + 
private Counter dummyCounter; @Override @Before public void setUp() throws Exception { super.setUp(); this.threadPool = new TestThreadPool(getTestName()); - this.test = new TestFetch(threadPool); + final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); + this.asyncFetchFailureCounter = mock(Counter.class); + this.asyncFetchTotalCounter = mock(Counter.class); + this.dummyCounter = mock(Counter.class); + when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer(invocationOnMock -> { + String counterName = (String) invocationOnMock.getArguments()[0]; + if (counterName.contains("allocation.reroute.async.fetch.total.count")) { + return asyncFetchTotalCounter; + } else if (counterName.contains("allocation.reroute.async.fetch.failure.count")) { + return asyncFetchFailureCounter; + } + return dummyCounter; + }); + this.test = new TestFetch(threadPool, metricsRegistry); } @After @@ -102,14 +125,24 @@ public void testClose() throws Exception { AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); + // counter remains 0 because fetch is ongoing + verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire a response, wait on reroute incrementing test.fireSimulationAndWait(node1.getId()); + // counter goes up because fetch completed + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); + // verify we get back the data node assertThat(test.reroute.get(), equalTo(1)); test.close(); try { test.fetchData(nodes, emptyMap()); + // counter should not go up when calling fetchData since fetch never completed + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); fail("fetch data should fail when closed"); } catch 
(IllegalStateException e) { // all is well @@ -127,12 +160,19 @@ public void testFullCircleSingleNodeSuccess() throws Exception { // fire a response, wait on reroute incrementing test.fireSimulationAndWait(node1.getId()); + // total counter goes up by 1 after success + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); + // verify we get back the data node assertThat(test.reroute.get(), equalTo(1)); fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); + // counter remains same because fetchData does not trigger new async fetch + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } public void testFullCircleSingleNodeFailure() throws Exception { @@ -147,24 +187,41 @@ public void testFullCircleSingleNodeFailure() throws Exception { // fire a response, wait on reroute incrementing test.fireSimulationAndWait(node1.getId()); + // Failure results in increased counter for failure and total count + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); + // failure, fetched data exists, but has no data assertThat(test.reroute.get(), equalTo(1)); fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(0)); + // counter remains same because fetchData does not trigger new async fetch + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); // on failure, we reset the failure on a successive call to fetchData, and try again afterwards test.addSimulation(node1.getId(), response1); fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), 
equalTo(false)); + // No additional failure, empty data so no change in counter + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); test.fireSimulationAndWait(node1.getId()); + // Success will increase total counter but not failure counter + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); + // 2 reroutes, cause we have a failure that we clear assertThat(test.reroute.get(), equalTo(3)); fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); + // counter remains same because fetchData does not trigger new async fetch + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); } public void testIgnoreResponseFromDifferentRound() throws Exception { @@ -175,20 +232,33 @@ public void testIgnoreResponseFromDifferentRound() throws Exception { AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); + // counter 0 because fetchData is not completed + verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // handle a response with incorrect round id, wait on reroute incrementing test.processAsyncFetch(Collections.singletonList(response1), Collections.emptyList(), 0); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(1)); + // counter increments to 1 because we called processAsyncFetch with a valid response, even though the round was incorrect + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire a response (with correct 
round id), wait on reroute incrementing test.fireSimulationAndWait(node1.getId()); + // total counter now goes up by 1 because fetchData completed + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); + // verify we get back the data node assertThat(test.reroute.get(), equalTo(2)); fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); + // total counter remains same because fetchdata does not trigger new async fetch + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } public void testIgnoreFailureFromDifferentRound() throws Exception { @@ -200,6 +270,9 @@ public void testIgnoreFailureFromDifferentRound() throws Exception { AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); + // counter 0 because fetchData still ongoing + verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // handle a failure with incorrect round id, wait on reroute incrementing test.processAsyncFetch( @@ -209,14 +282,23 @@ public void testIgnoreFailureFromDifferentRound() throws Exception { ); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(1)); + // total and failure counter go up by 1 because of the failure + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); // fire a response, wait on reroute incrementing test.fireSimulationAndWait(node1.getId()); + // total and failure counter go up by 1 because of the failure + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + 
verify(asyncFetchFailureCounter, times(2)).add(anyDouble()); // failure, fetched data exists, but has no data assertThat(test.reroute.get(), equalTo(2)); fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(0)); + // total and failure counter remain same because fetchData does not trigger new async fetch + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(2)).add(anyDouble()); } public void testTwoNodesOnSetup() throws Exception { @@ -228,16 +310,29 @@ public void testTwoNodesOnSetup() throws Exception { AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); + // counter 0 because fetch ongoing + verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the first response, it should trigger a reroute test.fireSimulationAndWait(node1.getId()); + // counter 1 because one fetch completed + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); + // there is still another on going request, so no data assertThat(test.getNumberOfInFlightFetches(), equalTo(1)); fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); + // counter still 1 because fetchData did not trigger new async fetch + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); + // counter 2 because 2 fetches completed + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // no more ongoing requests, we should fetch the data 
assertThat(test.reroute.get(), equalTo(2)); fetchData = test.fetchData(nodes, emptyMap()); @@ -245,6 +340,9 @@ public void testTwoNodesOnSetup() throws Exception { assertThat(fetchData.getData().size(), equalTo(2)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); assertThat(fetchData.getData().get(node2), sameInstance(response2)); + // counter still 2 because fetchData call did not trigger new async fetch + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } public void testTwoNodesOnSetupAndFailure() throws Exception { @@ -256,34 +354,53 @@ public void testTwoNodesOnSetupAndFailure() throws Exception { AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); + // counter 0 because both fetches ongoing + verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the first response, it should trigger a reroute test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(1)); fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); + // counter 1 because one fetch completed + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); + // failure counter up by 1 because one fetch failed + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); assertThat(test.reroute.get(), equalTo(2)); + // since one of those failed, we should only have one entry fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); 
assertThat(fetchData.getData().get(node1), sameInstance(response1)); + // total and failure counters same because fetchData did not trigger new async fetch + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); } public void testTwoNodesAddedInBetween() throws Exception { DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).build(); test.addSimulation(node1.getId(), response1); - // no fetched data, 2 requests still on going + // no fetched data, request still on going AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); + // counter 0 because both fetches ongoing + verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the first response, it should trigger a reroute test.fireSimulationAndWait(node1.getId()); + // counter 1 because fetch completed + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // now, add a second node to the nodes, it should add it to the ongoing requests nodes = DiscoveryNodes.builder(nodes).add(node2).build(); @@ -291,16 +408,25 @@ public void testTwoNodesAddedInBetween() throws Exception { // no fetch data, has a new node introduced fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); + // counter still 1 because second fetch ongoing + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); + // counter now 2 because 2 fetches completed + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); - // since one of those failed, we 
should only have one entry + // since both succeeded, we should have 2 entries fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(2)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); assertThat(fetchData.getData().get(node2), sameInstance(response2)); + // counter still 2 because fetchData did not trigger new async fetch + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } public void testClearCache() throws Exception { @@ -314,21 +440,33 @@ public void testClearCache() throws Exception { AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); + // counter 0 because fetch ongoing + verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(1)); + // counter 1 because 1 fetch completed + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // verify we get back right data from node fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); + // counter still 1 because a new fetch is not called + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // second fetch gets same data fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); + // counter still 1 because a new fetch is not called 
+ verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); test.clearCacheForNode(node1.getId()); @@ -338,15 +476,24 @@ public void testClearCache() throws Exception { // no fetched data, new request on going fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); + // counter still 1 because new fetch is still ongoing + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(2)); + // counter now 2 because second fetch completed + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // verify we get new data back fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1_2)); + // counter still 2 because fetchData did not trigger new async fetch + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } public void testConcurrentRequestAndClearCache() throws Exception { @@ -357,12 +504,18 @@ public void testConcurrentRequestAndClearCache() throws Exception { AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); + // counter 0 because fetch ongoing + verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // clear cache while request is still on going, before it is processed test.clearCacheForNode(node1.getId()); test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(1)); + // counter 1 because fetch completed, even though cache 
was wiped + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // prepare next request test.addSimulation(node1.getId(), response1_2); @@ -370,15 +523,24 @@ public void testConcurrentRequestAndClearCache() throws Exception { // verify still no fetched data, request still on going fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); + // counter unchanged because fetch ongoing + verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(2)); + // counter 2 because second fetch completed + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // verify we get new data back fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1_2)); + // counter unchanged because fetchData does not trigger new async fetch + verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } @@ -400,14 +562,14 @@ static class Entry { private final Map simulations = new ConcurrentHashMap<>(); private AtomicInteger reroute = new AtomicInteger(); - TestFetch(ThreadPool threadPool) { + TestFetch(ThreadPool threadPool, MetricsRegistry metricsRegistry) { super( LogManager.getLogger(TestFetch.class), "test", new ShardId("test", "_na_", 1), "", null, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(metricsRegistry) ); this.threadPool = threadPool; } diff --git a/test/fixtures/azure-fixture/docker-compose.yml b/test/fixtures/azure-fixture/docker-compose.yml index c68d4739d1fd9..85e073e1803c1 100644 --- 
a/test/fixtures/azure-fixture/docker-compose.yml +++ b/test/fixtures/azure-fixture/docker-compose.yml @@ -1,3 +1,4 @@ +version: '3' services: azure-fixture: build: From 6ed2e911260fd912d4920b9976bd699ae739927d Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Wed, 18 Sep 2024 14:11:26 +0530 Subject: [PATCH 3/9] rename metric Signed-off-by: Rahul Karajgikar --- .../org/opensearch/cluster/ClusterManagerMetrics.java | 8 ++++---- .../java/org/opensearch/gateway/AsyncShardFetchCache.java | 4 ++-- .../java/org/opensearch/gateway/AsyncShardFetchTests.java | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java b/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java index ab9a56e5de5d6..43902fb0f647d 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java @@ -35,7 +35,7 @@ public final class ClusterManagerMetrics { public final Counter leaderCheckFailureCounter; public final Counter followerChecksFailureCounter; public final Counter asyncFetchFailureCounter; - public final Counter asyncFetchTotalFetchesCounter; + public final Counter asyncFetchTotalCounter; public ClusterManagerMetrics(MetricsRegistry metricsRegistry) { clusterStateAppliersHistogram = metricsRegistry.createHistogram( @@ -74,12 +74,12 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) { COUNTER_METRICS_UNIT ); asyncFetchFailureCounter = metricsRegistry.createCounter( - "allocation.reroute.async.fetch.failure.count", + "async.fetch.failure.count", "Counter for number of failed async fetches", COUNTER_METRICS_UNIT ); - asyncFetchTotalFetchesCounter = metricsRegistry.createCounter( - "allocation.reroute.async.fetch.total.count", + asyncFetchTotalCounter = metricsRegistry.createCounter( + "async.fetch.total.count", "Counter for total number of async fetches", COUNTER_METRICS_UNIT ); diff 
--git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java index c84634b56039b..3a928444f7e23 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java @@ -167,7 +167,7 @@ Map getCacheData(DiscoveryNodes nodes, Set failedNodes void processResponses(List responses, long fetchingRound) { for (K response : responses) { BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId()); - clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchTotalFetchesCounter, 1.0); + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchTotalCounter, 1.0); if (nodeEntry != null) { if (validateNodeResponse(nodeEntry, fetchingRound)) { // if the entry is there, for the right fetching round and not marked as failed already, process it @@ -197,7 +197,7 @@ private boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound } private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failure, long fetchingRound) { - clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchTotalFetchesCounter, 1.0); + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchTotalCounter, 1.0); clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchFailureCounter, 1.0); if (nodeEntry.getFetchingRound() != fetchingRound) { assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; diff --git a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java index 92d5a7fdd6ba5..514eada23a5e6 100644 --- a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java +++ b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java @@ -102,9 +102,9 @@ public void setUp() throws 
Exception { this.dummyCounter = mock(Counter.class); when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer(invocationOnMock -> { String counterName = (String) invocationOnMock.getArguments()[0]; - if (counterName.contains("allocation.reroute.async.fetch.total.count")) { + if (counterName.contains("async.fetch.total.count")) { return asyncFetchTotalCounter; - } else if (counterName.contains("allocation.reroute.async.fetch.failure.count")) { + } else if (counterName.contains("async.fetch.failure.count")) { return asyncFetchFailureCounter; } return dummyCounter; From fcf6ac064434a9ba18d1719b334cf883adfe2da6 Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Mon, 23 Sep 2024 13:59:06 +0530 Subject: [PATCH 4/9] use success metric instead of total metric Signed-off-by: Rahul Karajgikar --- .../cluster/ClusterManagerMetrics.java | 6 +- .../gateway/AsyncShardFetchCache.java | 5 +- .../gateway/AsyncShardFetchTests.java | 107 +++++++++--------- 3 files changed, 58 insertions(+), 60 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java b/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java index 43902fb0f647d..f5ff6616bc559 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java @@ -35,7 +35,7 @@ public final class ClusterManagerMetrics { public final Counter leaderCheckFailureCounter; public final Counter followerChecksFailureCounter; public final Counter asyncFetchFailureCounter; - public final Counter asyncFetchTotalCounter; + public final Counter asyncFetchSuccessCounter; public ClusterManagerMetrics(MetricsRegistry metricsRegistry) { clusterStateAppliersHistogram = metricsRegistry.createHistogram( @@ -78,8 +78,8 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) { "Counter for number of failed async fetches", COUNTER_METRICS_UNIT ); - asyncFetchTotalCounter = 
metricsRegistry.createCounter( - "async.fetch.total.count", + asyncFetchSuccessCounter = metricsRegistry.createCounter( + "async.fetch.success.count", "Counter for total number of async fetches", COUNTER_METRICS_UNIT ); diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java index 3a928444f7e23..be7dc3a1b3690 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java @@ -167,7 +167,7 @@ Map getCacheData(DiscoveryNodes nodes, Set failedNodes void processResponses(List responses, long fetchingRound) { for (K response : responses) { BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId()); - clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchTotalCounter, 1.0); + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchSuccessCounter, 1.0); if (nodeEntry != null) { if (validateNodeResponse(nodeEntry, fetchingRound)) { // if the entry is there, for the right fetching round and not marked as failed already, process it @@ -197,8 +197,6 @@ private boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound } private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failure, long fetchingRound) { - clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchTotalCounter, 1.0); - clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchFailureCounter, 1.0); if (nodeEntry.getFetchingRound() != fetchingRound) { assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; logger.trace( @@ -230,6 +228,7 @@ boolean retryableException(Throwable unwrappedCause) { void processFailures(List failures, long fetchingRound) { for (FailedNodeException failure : failures) { logger.trace("processing failure {} for [{}]", failure, type); + 
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchFailureCounter, 1.0); BaseNodeEntry nodeEntry = getCache().get(failure.nodeId()); if (nodeEntry != null) { handleNodeFailure(nodeEntry, failure, fetchingRound); diff --git a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java index 514eada23a5e6..2981c4dc0834b 100644 --- a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java +++ b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java @@ -87,7 +87,7 @@ public class AsyncShardFetchTests extends OpenSearchTestCase { private ThreadPool threadPool; private TestFetch test; - private Counter asyncFetchTotalCounter; + private Counter asyncFetchSuccessCounter; private Counter asyncFetchFailureCounter; private Counter dummyCounter; @@ -98,12 +98,12 @@ public void setUp() throws Exception { this.threadPool = new TestThreadPool(getTestName()); final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); this.asyncFetchFailureCounter = mock(Counter.class); - this.asyncFetchTotalCounter = mock(Counter.class); + this.asyncFetchSuccessCounter = mock(Counter.class); this.dummyCounter = mock(Counter.class); when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenAnswer(invocationOnMock -> { String counterName = (String) invocationOnMock.getArguments()[0]; - if (counterName.contains("async.fetch.total.count")) { - return asyncFetchTotalCounter; + if (counterName.contains("async.fetch.success.count")) { + return asyncFetchSuccessCounter; } else if (counterName.contains("async.fetch.failure.count")) { return asyncFetchFailureCounter; } @@ -126,13 +126,13 @@ public void testClose() throws Exception { assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); // counter remains 0 because fetch is ongoing - verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + 
verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire a response, wait on reroute incrementing test.fireSimulationAndWait(node1.getId()); // counter goes up because fetch completed - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // verify we get back the data node @@ -141,7 +141,7 @@ public void testClose() throws Exception { try { test.fetchData(nodes, emptyMap()); // counter should not go up when calling fetchData since fetch never completed - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); fail("fetch data should fail when closed"); } catch (IllegalStateException e) { @@ -161,7 +161,7 @@ public void testFullCircleSingleNodeSuccess() throws Exception { // fire a response, wait on reroute incrementing test.fireSimulationAndWait(node1.getId()); // total counter goes up by 1 after success - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // verify we get back the data node @@ -171,7 +171,7 @@ public void testFullCircleSingleNodeSuccess() throws Exception { assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); // counter remains same because fetchData does not trigger new async fetch - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } @@ -187,8 +187,7 @@ public void testFullCircleSingleNodeFailure() throws Exception { // fire a response, wait on reroute incrementing 
test.fireSimulationAndWait(node1.getId()); - // Failure results in increased counter for failure and total count - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); // failure, fetched data exists, but has no data @@ -197,7 +196,7 @@ public void testFullCircleSingleNodeFailure() throws Exception { assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(0)); // counter remains same because fetchData does not trigger new async fetch - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); // on failure, we reset the failure on a successive call to fetchData, and try again afterwards @@ -205,12 +204,12 @@ public void testFullCircleSingleNodeFailure() throws Exception { fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // No additional failure, empty data so no change in counter - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); test.fireSimulationAndWait(node1.getId()); - // Success will increase total counter but not failure counter - verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + // Success counter will increase + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); // 2 reroutes, cause we have a failure that we clear @@ -220,7 +219,7 @@ public void testFullCircleSingleNodeFailure() throws Exception { assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); // counter remains same because fetchData does not trigger new async fetch - 
verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); } @@ -233,7 +232,7 @@ public void testIgnoreResponseFromDifferentRound() throws Exception { assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); // counter 0 because fetchData is not completed - verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // handle a response with incorrect round id, wait on reroute incrementing @@ -241,13 +240,13 @@ public void testIgnoreResponseFromDifferentRound() throws Exception { assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(1)); // counter increments to 1 because we called processAsyncFetch with a valid response, even though the round was incorrect - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire a response (with correct round id), wait on reroute incrementing test.fireSimulationAndWait(node1.getId()); // total counter now goes up by 1 because fetchData completed - verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // verify we get back the data node @@ -257,7 +256,7 @@ public void testIgnoreResponseFromDifferentRound() throws Exception { assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); // total counter remains same because fetchdata does not trigger new async fetch - verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); 
verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } @@ -271,7 +270,7 @@ public void testIgnoreFailureFromDifferentRound() throws Exception { assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); // counter 0 because fetchData still ongoing - verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // handle a failure with incorrect round id, wait on reroute incrementing @@ -282,22 +281,22 @@ public void testIgnoreFailureFromDifferentRound() throws Exception { ); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(1)); - // total and failure counter go up by 1 because of the failure - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + // failure counter goes up by 1 because of the failure + verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); // fire a response, wait on reroute incrementing test.fireSimulationAndWait(node1.getId()); - // total and failure counter go up by 1 because of the failure - verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + // failure counter goes up by 1 because of the failure + verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); verify(asyncFetchFailureCounter, times(2)).add(anyDouble()); // failure, fetched data exists, but has no data assertThat(test.reroute.get(), equalTo(2)); fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(0)); - // total and failure counter remain same because fetchData does not trigger new async fetch - verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + // counters remain same because fetchData does not trigger new async fetch + verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); 
verify(asyncFetchFailureCounter, times(2)).add(anyDouble()); } @@ -311,13 +310,13 @@ public void testTwoNodesOnSetup() throws Exception { assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); // counter 0 because fetch ongoing - verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the first response, it should trigger a reroute test.fireSimulationAndWait(node1.getId()); // counter 1 because one fetch completed - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // there is still another on going request, so no data @@ -325,13 +324,13 @@ public void testTwoNodesOnSetup() throws Exception { fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // counter still 1 because fetchData did not trigger new async fetch - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); // counter 2 because 2 fetches completed - verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // no more ongoing requests, we should fetch the data assertThat(test.reroute.get(), equalTo(2)); @@ -341,7 +340,7 @@ public void testTwoNodesOnSetup() throws Exception { assertThat(fetchData.getData().get(node1), sameInstance(response1)); assertThat(fetchData.getData().get(node2), sameInstance(response2)); // counter still 2 because fetchData call did not trigger new async fetch - 
verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } @@ -355,7 +354,7 @@ public void testTwoNodesOnSetupAndFailure() throws Exception { assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); // counter 0 because both fetches ongoing - verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the first response, it should trigger a reroute @@ -364,13 +363,13 @@ public void testTwoNodesOnSetupAndFailure() throws Exception { fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // counter 1 because one fetch completed - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); // failure counter up by 1 because one fetch failed - verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); assertThat(test.reroute.get(), equalTo(2)); @@ -379,8 +378,8 @@ public void testTwoNodesOnSetupAndFailure() throws Exception { assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); - // total and failure counters same because fetchData did not trigger new async fetch - verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + // success and failure counters same because fetchData did not trigger new async fetch + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); 
verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); } @@ -393,13 +392,13 @@ public void testTwoNodesAddedInBetween() throws Exception { assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); // counter 0 because both fetches ongoing - verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the first response, it should trigger a reroute test.fireSimulationAndWait(node1.getId()); // counter 1 because fetch completed - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // now, add a second node to the nodes, it should add it to the ongoing requests @@ -409,13 +408,13 @@ public void testTwoNodesAddedInBetween() throws Exception { fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // counter still 1 because second fetch ongoing - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); // counter now 2 because 2 fetches completed - verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // since both succeeded, we should have 2 entries @@ -425,7 +424,7 @@ public void testTwoNodesAddedInBetween() throws Exception { assertThat(fetchData.getData().get(node1), sameInstance(response1)); assertThat(fetchData.getData().get(node2), sameInstance(response2)); // counter still 2 because fetchData did not trigger new async fetch - verify(asyncFetchTotalCounter, 
times(2)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } @@ -441,13 +440,13 @@ public void testClearCache() throws Exception { assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); // counter 0 because fetch ongoing - verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(1)); // counter 1 because 1 fetch completed - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // verify we get back right data from node @@ -456,7 +455,7 @@ public void testClearCache() throws Exception { assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); // counter still 1 because a new fetch is not called - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // second fetch gets same data @@ -465,7 +464,7 @@ public void testClearCache() throws Exception { assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); // counter still 1 because a new fetch is not called - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); test.clearCacheForNode(node1.getId()); @@ -477,13 +476,13 @@ public void testClearCache() throws Exception { fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // counter still 1 
because new fetch is still ongoing - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(2)); // counter now 2 because second fetch completed - verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // verify we get new data back @@ -492,7 +491,7 @@ public void testClearCache() throws Exception { assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1_2)); // counter still 2 because fetchData did not trigger new async fetch - verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } @@ -505,7 +504,7 @@ public void testConcurrentRequestAndClearCache() throws Exception { assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); // counter 0 because fetch ongoing - verify(asyncFetchTotalCounter, times(0)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // clear cache while request is still on going, before it is processed @@ -514,7 +513,7 @@ public void testConcurrentRequestAndClearCache() throws Exception { test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(1)); // counter 1 because fetch completed, even though cache was wiped - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // prepare next request @@ -524,13 +523,13 @@ public void 
testConcurrentRequestAndClearCache() throws Exception { fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // counter unchanged because fetch ongoing - verify(asyncFetchTotalCounter, times(1)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(2)); // counter 2 because second fetch completed - verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // verify we get new data back @@ -539,7 +538,7 @@ public void testConcurrentRequestAndClearCache() throws Exception { assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1_2)); // counter unchanged because fetchData does not trigger new async fetch - verify(asyncFetchTotalCounter, times(2)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } From e02c51cd2ead60029a7ccef56951c7f41725a254 Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Mon, 23 Sep 2024 18:06:06 +0530 Subject: [PATCH 5/9] empty commit to rerun gradle check Signed-off-by: Rahul Karajgikar From 9deb4de44f63dd6e2423b1773e029132c431b269 Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Wed, 25 Sep 2024 16:53:05 +0530 Subject: [PATCH 6/9] calculate metrics based on list size, add stronger assertions to tests Signed-off-by: Rahul Karajgikar --- .../cluster/ClusterManagerMetrics.java | 2 +- .../gateway/AsyncShardFetchCache.java | 4 +- .../gateway/AsyncShardFetchTests.java | 64 ++++++++++++++++--- 3 files changed, 59 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java 
b/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java index f5ff6616bc559..ba63b3fd2d6d0 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java @@ -80,7 +80,7 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) { ); asyncFetchSuccessCounter = metricsRegistry.createCounter( "async.fetch.success.count", - "Counter for total number of async fetches", + "Counter for number of successful async fetches", COUNTER_METRICS_UNIT ); diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java index be7dc3a1b3690..9b0a95f611e0e 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java @@ -165,9 +165,9 @@ Map getCacheData(DiscoveryNodes nodes, Set failedNodes } void processResponses(List responses, long fetchingRound) { + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchSuccessCounter, Double.valueOf(responses.size())); for (K response : responses) { BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId()); - clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchSuccessCounter, 1.0); if (nodeEntry != null) { if (validateNodeResponse(nodeEntry, fetchingRound)) { // if the entry is there, for the right fetching round and not marked as failed already, process it @@ -226,9 +226,9 @@ boolean retryableException(Throwable unwrappedCause) { } void processFailures(List failures, long fetchingRound) { + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchFailureCounter, Double.valueOf(failures.size())); for (FailedNodeException failure : failures) { logger.trace("processing failure {} for [{}]", failure, type); - 
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchFailureCounter, 1.0); BaseNodeEntry nodeEntry = getCache().get(failure.nodeId()); if (nodeEntry != null) { handleNodeFailure(nodeEntry, failure, fetchingRound); diff --git a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java index 2981c4dc0834b..c25150873a1ce 100644 --- a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java +++ b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java @@ -132,6 +132,7 @@ public void testClose() throws Exception { // fire a response, wait on reroute incrementing test.fireSimulationAndWait(node1.getId()); // counter goes up because fetch completed + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); @@ -141,6 +142,7 @@ public void testClose() throws Exception { try { test.fetchData(nodes, emptyMap()); // counter should not go up when calling fetchData since fetch never completed + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); fail("fetch data should fail when closed"); @@ -161,6 +163,7 @@ public void testFullCircleSingleNodeSuccess() throws Exception { // fire a response, wait on reroute incrementing test.fireSimulationAndWait(node1.getId()); // total counter goes up by 1 after success + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); @@ -171,6 +174,7 @@ public void testFullCircleSingleNodeSuccess() throws Exception { assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); // counter remains same because 
fetchData does not trigger new async fetch + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } @@ -188,6 +192,7 @@ public void testFullCircleSingleNodeFailure() throws Exception { // fire a response, wait on reroute incrementing test.fireSimulationAndWait(node1.getId()); verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(1.0); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); // failure, fetched data exists, but has no data @@ -197,6 +202,7 @@ public void testFullCircleSingleNodeFailure() throws Exception { assertThat(fetchData.getData().size(), equalTo(0)); // counter remains same because fetchData does not trigger new async fetch verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(1.0); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); // on failure, we reset the failure on a successive call to fetchData, and try again afterwards @@ -205,11 +211,14 @@ public void testFullCircleSingleNodeFailure() throws Exception { assertThat(fetchData.hasData(), equalTo(false)); // No additional failure, empty data so no change in counter verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(1.0); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); test.fireSimulationAndWait(node1.getId()); // Success counter will increase + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(1.0); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); // 2 reroutes, cause we have a failure that we clear @@ -219,7 +228,9 @@ public void testFullCircleSingleNodeFailure() throws Exception { assertThat(fetchData.getData().size(), equalTo(1)); 
assertThat(fetchData.getData().get(node1), sameInstance(response1)); // counter remains same because fetchData does not trigger new async fetch + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(1.0); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); } @@ -239,15 +250,20 @@ public void testIgnoreResponseFromDifferentRound() throws Exception { test.processAsyncFetch(Collections.singletonList(response1), Collections.emptyList(), 0); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(1)); - // counter increments to 1 because we called processAsyncFetch with a valid response, even though the round was incorrect + // success counter increments to 1 because we called processAsyncFetch with a valid response, even though the round was incorrect + // failure counter's add() is also invoked once, with 0.0, because the failures list is empty (its value does not change) + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); - verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(0.0); + verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); // fire a response (with correct round id), wait on reroute incrementing test.fireSimulationAndWait(node1.getId()); - // total counter now goes up by 1 because fetchData completed + // success counter now goes up by 1 because fetchData completed + verify(asyncFetchSuccessCounter, times(2)).add(1.0); verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); - verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(0.0); + verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); // verify we get back the data node assertThat(test.reroute.get(), equalTo(2)); @@ -256,8 +272,10 @@ public void testIgnoreResponseFromDifferentRound() throws Exception {
assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); // total counter remains same because fetchdata does not trigger new async fetch + verify(asyncFetchSuccessCounter, times(2)).add(1.0); verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); - verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(0.0); + verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); } public void testIgnoreFailureFromDifferentRound() throws Exception { @@ -281,14 +299,19 @@ public void testIgnoreFailureFromDifferentRound() throws Exception { ); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(1)); + // success counter called with empty list // failure counter goes up by 1 because of the failure - verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(0.0); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(1.0); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); // fire a response, wait on reroute incrementing test.fireSimulationAndWait(node1.getId()); // failure counter goes up by 1 because of the failure - verify(asyncFetchSuccessCounter, times(0)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(0.0); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(2)).add(1.0); verify(asyncFetchFailureCounter, times(2)).add(anyDouble()); // failure, fetched data exists, but has no data assertThat(test.reroute.get(), equalTo(2)); @@ -296,7 +319,9 @@ public void testIgnoreFailureFromDifferentRound() throws Exception { assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(0)); // counters remain same because fetchData does not trigger new async fetch - verify(asyncFetchSuccessCounter, 
times(0)).add(anyDouble()); + verify(asyncFetchSuccessCounter, times(1)).add(0.0); + verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(2)).add(1.0); verify(asyncFetchFailureCounter, times(2)).add(anyDouble()); } @@ -316,6 +341,7 @@ public void testTwoNodesOnSetup() throws Exception { // fire the first response, it should trigger a reroute test.fireSimulationAndWait(node1.getId()); // counter 1 because one fetch completed + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); @@ -324,12 +350,14 @@ public void testTwoNodesOnSetup() throws Exception { fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // counter still 1 because fetchData did not trigger new async fetch + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); // counter 2 because 2 fetches completed + verify(asyncFetchSuccessCounter, times(2)).add(1.0); verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // no more ongoing requests, we should fetch the data @@ -340,6 +368,7 @@ public void testTwoNodesOnSetup() throws Exception { assertThat(fetchData.getData().get(node1), sameInstance(response1)); assertThat(fetchData.getData().get(node2), sameInstance(response2)); // counter still 2 because fetchData call did not trigger new async fetch + verify(asyncFetchSuccessCounter, times(2)).add(1.0); verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } @@ -363,13 +392,16 @@ public void testTwoNodesOnSetupAndFailure() throws 
Exception { fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // counter 1 because one fetch completed + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); // failure counter up by 1 because one fetch failed + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(1.0); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); assertThat(test.reroute.get(), equalTo(2)); @@ -379,7 +411,9 @@ public void testTwoNodesOnSetupAndFailure() throws Exception { assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); // success and failure counters same because fetchData did not trigger new async fetch + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); + verify(asyncFetchFailureCounter, times(1)).add(1.0); verify(asyncFetchFailureCounter, times(1)).add(anyDouble()); } @@ -398,6 +432,7 @@ public void testTwoNodesAddedInBetween() throws Exception { // fire the first response, it should trigger a reroute test.fireSimulationAndWait(node1.getId()); // counter 1 because fetch completed + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); @@ -408,12 +443,14 @@ public void testTwoNodesAddedInBetween() throws Exception { fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // counter still 1 because second fetch ongoing + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, 
times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); // counter now 2 because 2 fetches completed + verify(asyncFetchSuccessCounter, times(2)).add(1.0); verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); @@ -424,6 +461,7 @@ public void testTwoNodesAddedInBetween() throws Exception { assertThat(fetchData.getData().get(node1), sameInstance(response1)); assertThat(fetchData.getData().get(node2), sameInstance(response2)); // counter still 2 because fetchData did not trigger new async fetch + verify(asyncFetchSuccessCounter, times(2)).add(1.0); verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } @@ -446,6 +484,7 @@ public void testClearCache() throws Exception { test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(1)); // counter 1 because 1 fetch completed + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); @@ -455,6 +494,7 @@ public void testClearCache() throws Exception { assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); // counter still 1 because a new fetch is not called + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); @@ -464,6 +504,7 @@ public void testClearCache() throws Exception { assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); // counter still 1 because a new fetch is not called + verify(asyncFetchSuccessCounter, times(1)).add(1.0); 
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); @@ -476,12 +517,14 @@ public void testClearCache() throws Exception { fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // counter still 1 because new fetch is still ongoing + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(2)); // counter now 2 because second fetch completed + verify(asyncFetchSuccessCounter, times(2)).add(1.0); verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); @@ -491,6 +534,7 @@ public void testClearCache() throws Exception { assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1_2)); // counter still 2 because fetchData did not trigger new async fetch + verify(asyncFetchSuccessCounter, times(2)).add(1.0); verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); } @@ -513,6 +557,7 @@ public void testConcurrentRequestAndClearCache() throws Exception { test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(1)); // counter 1 because fetch completed, even though cache was wiped + verify(asyncFetchSuccessCounter, times(1)).add(1.0); verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); @@ -523,12 +568,14 @@ public void testConcurrentRequestAndClearCache() throws Exception { fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // counter unchanged because fetch ongoing + verify(asyncFetchSuccessCounter, times(1)).add(1.0); 
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(2)); // counter 2 because second fetch completed + verify(asyncFetchSuccessCounter, times(2)).add(1.0); verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); @@ -538,6 +585,7 @@ public void testConcurrentRequestAndClearCache() throws Exception { assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1_2)); // counter unchanged because fetchData does not trigger new async fetch + verify(asyncFetchSuccessCounter, times(2)).add(1.0); verify(asyncFetchSuccessCounter, times(2)).add(anyDouble()); verify(asyncFetchFailureCounter, times(0)).add(anyDouble()); From 83a4a2d78696cd13caaae157a83bf9bddb814f4a Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Wed, 25 Sep 2024 22:19:08 +0530 Subject: [PATCH 7/9] change info log to trace log Signed-off-by: Rahul Karajgikar --- .../src/main/java/org/opensearch/gateway/AsyncShardFetch.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 076368701e521..6017743ef2bd0 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -225,7 +225,7 @@ protected synchronized void processAsyncFetch(List responses, List Date: Mon, 30 Sep 2024 10:40:43 +0530 Subject: [PATCH 8/9] add changelog Signed-off-by: Rahul Karajgikar --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b628e9277959d..9a45bbaa879fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.0.0/), - Remove identity-related feature flagged code from the RestController ([#15430](https://github.com/opensearch-project/OpenSearch/pull/15430)) - Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923)) - Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718)) +- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976)) ### Dependencies - Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578)) From 22db7efe3e528dd8bdc753cf67a6c298485e16c7 Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Mon, 30 Sep 2024 11:41:58 +0530 Subject: [PATCH 9/9] empty commit Signed-off-by: Rahul Karajgikar