Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add success and failure count OTel metrics for async shard fetch #15976

Merged
merged 9 commits into the base branch from the contributor's branch on
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove identity-related feature flagged code from the RestController ([#15430](https://github.com/opensearch-project/OpenSearch/pull/15430))
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -88,6 +89,7 @@
private static final Logger logger = LogManager.getLogger(TransportIndicesShardStoresAction.class);

private final TransportNodesListGatewayStartedShards listShardStoresInfo;
private final ClusterManagerMetrics clusterManagerMetrics;

@Inject
public TransportIndicesShardStoresAction(
Expand All @@ -96,7 +98,8 @@
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
TransportNodesListGatewayStartedShards listShardStoresInfo
TransportNodesListGatewayStartedShards listShardStoresInfo,
ClusterManagerMetrics clusterManagerMetrics
) {
super(
IndicesShardStoresAction.NAME,
Expand All @@ -109,6 +112,7 @@
true
);
this.listShardStoresInfo = listShardStoresInfo;
this.clusterManagerMetrics = clusterManagerMetrics;
}

@Override
Expand Down Expand Up @@ -154,7 +158,7 @@
// we could fetch all shard store info from every node once (nNodes requests)
// we have to implement a TransportNodesAction instead of using TransportNodesListGatewayStartedShards
// for fetching shard stores info, that operates on a list of shards instead of a single shard
new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardsToFetch, listener).start();
new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardsToFetch, listener, clusterManagerMetrics).start();

Check warning on line 161 in server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java#L161

Added line #L161 was not covered by tests
}

@Override
Expand All @@ -175,27 +179,37 @@
private final ActionListener<IndicesShardStoresResponse> listener;
private CountDown expectedOps;
private final Queue<InternalAsyncFetch.Response> fetchResponses;
private final ClusterManagerMetrics clusterManagerMetrics;

AsyncShardStoresInfoFetches(
DiscoveryNodes nodes,
RoutingNodes routingNodes,
Set<Tuple<ShardId, String>> shards,
ActionListener<IndicesShardStoresResponse> listener
ActionListener<IndicesShardStoresResponse> listener,
ClusterManagerMetrics clusterManagerMetrics
) {
this.nodes = nodes;
this.routingNodes = routingNodes;
this.shards = shards;
this.listener = listener;
this.fetchResponses = new ConcurrentLinkedQueue<>();
this.expectedOps = new CountDown(shards.size());
this.clusterManagerMetrics = clusterManagerMetrics;

Check warning on line 197 in server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java#L197

Added line #L197 was not covered by tests
}

void start() {
if (shards.isEmpty()) {
listener.onResponse(new IndicesShardStoresResponse());
} else {
for (Tuple<ShardId, String> shard : shards) {
InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shard.v1(), shard.v2(), listShardStoresInfo);
InternalAsyncFetch fetch = new InternalAsyncFetch(

Check warning on line 205 in server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java#L205

Added line #L205 was not covered by tests
logger,
"shard_stores",
shard.v1(),
shard.v2(),

Check warning on line 209 in server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java#L208-L209

Added lines #L208 - L209 were not covered by tests
listShardStoresInfo,
clusterManagerMetrics
);
fetch.fetchData(nodes, Collections.emptyMap());
}
}
Expand All @@ -213,9 +227,10 @@
String type,
ShardId shardId,
String customDataPath,
TransportNodesListGatewayStartedShards action
TransportNodesListGatewayStartedShards action,
ClusterManagerMetrics clusterManagerMetrics
) {
super(logger, type, shardId, customDataPath, action);
super(logger, type, shardId, customDataPath, action, clusterManagerMetrics);

Check warning on line 233 in server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java#L233

Added line #L233 was not covered by tests
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public final class ClusterManagerMetrics {

public final Counter leaderCheckFailureCounter;
public final Counter followerChecksFailureCounter;
public final Counter asyncFetchFailureCounter;
public final Counter asyncFetchSuccessCounter;

public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
clusterStateAppliersHistogram = metricsRegistry.createHistogram(
Expand Down Expand Up @@ -71,6 +73,17 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
"Counter for number of failed leader checks",
COUNTER_METRICS_UNIT
);
asyncFetchFailureCounter = metricsRegistry.createCounter(
"async.fetch.failure.count",
"Counter for number of failed async fetches",
COUNTER_METRICS_UNIT
);
asyncFetchSuccessCounter = metricsRegistry.createCounter(
"async.fetch.success.count",
"Counter for number of successful async fetches",
COUNTER_METRICS_UNIT
);

}

public void recordLatency(Histogram histogram, Double value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public class ClusterModule extends AbstractModule {
// pkg private for tests
final Collection<AllocationDecider> deciderList;
final ShardsAllocator shardsAllocator;
private final ClusterManagerMetrics clusterManagerMetrics;

public ClusterModule(
Settings settings,
Expand All @@ -166,6 +167,7 @@ public ClusterModule(
settings,
clusterManagerMetrics
);
this.clusterManagerMetrics = clusterManagerMetrics;
}

public static List<Entry> getNamedWriteables() {
Expand Down Expand Up @@ -456,6 +458,7 @@ protected void configure() {
bind(TaskResultsService.class).asEagerSingleton();
bind(AllocationDeciders.class).toInstance(allocationDeciders);
bind(ShardsAllocator.class).toInstance(shardsAllocator);
bind(ClusterManagerMetrics.class).toInstance(clusterManagerMetrics);
}

public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, ShardsBatchGatewayAllocator shardsBatchGatewayAllocator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.logging.Loggers;
import org.opensearch.core.index.shard.ShardId;
Expand Down Expand Up @@ -48,7 +49,8 @@ public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V> extend
Class<V> clazz,
V emptyShardResponse,
Predicate<V> emptyShardResponsePredicate,
ShardBatchResponseFactory<T, V> responseFactory
ShardBatchResponseFactory<T, V> responseFactory,
ClusterManagerMetrics clusterManagerMetrics
) {
super(
logger,
Expand All @@ -64,7 +66,8 @@ public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V> extend
clazz,
emptyShardResponse,
emptyShardResponsePredicate,
responseFactory
responseFactory,
clusterManagerMetrics
)
);
}
Expand Down Expand Up @@ -116,9 +119,10 @@ public ShardBatchCache(
Class<V> clazz,
V emptyResponse,
Predicate<V> emptyShardResponsePredicate,
ShardBatchResponseFactory<T, V> responseFactory
ShardBatchResponseFactory<T, V> responseFactory,
ClusterManagerMetrics clusterManagerMetrics
) {
super(Loggers.getLogger(logger, "_" + logKey), type);
super(Loggers.getLogger(logger, "_" + logKey), type, clusterManagerMetrics);
this.batchSize = shardAttributesMap.size();
this.emptyShardResponsePredicate = emptyShardResponsePredicate;
cache = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
Expand Down Expand Up @@ -94,15 +95,16 @@ protected AsyncShardFetch(
String type,
ShardId shardId,
String customDataPath,
Lister<? extends BaseNodesResponse<T>, T> action
Lister<? extends BaseNodesResponse<T>, T> action,
ClusterManagerMetrics clusterManagerMetrics
) {
this.logger = logger;
this.type = type;
shardAttributesMap = new HashMap<>();
shardAttributesMap.put(shardId, new ShardAttributes(customDataPath));
this.action = (Lister<BaseNodesResponse<T>, T>) action;
this.reroutingKey = "ShardId=[" + shardId.toString() + "]";
cache = new ShardCache<>(logger, reroutingKey, type);
cache = new ShardCache<>(logger, reroutingKey, type, clusterManagerMetrics);
}

/**
Expand Down Expand Up @@ -284,8 +286,8 @@ static class ShardCache<K extends BaseNodeResponse> extends AsyncShardFetchCache

private final Map<String, NodeEntry<K>> cache;

public ShardCache(Logger logger, String logKey, String type) {
super(Loggers.getLogger(logger, "_" + logKey), type);
public ShardCache(Logger logger, String logKey, String type, ClusterManagerMetrics clusterManagerMetrics) {
super(Loggers.getLogger(logger, "_" + logKey), type, clusterManagerMetrics);
cache = new HashMap<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
Expand Down Expand Up @@ -51,10 +52,12 @@ public abstract class AsyncShardFetchCache<K extends BaseNodeResponse> {

private final Logger logger;
private final String type;
private final ClusterManagerMetrics clusterManagerMetrics;

protected AsyncShardFetchCache(Logger logger, String type) {
protected AsyncShardFetchCache(Logger logger, String type, ClusterManagerMetrics clusterManagerMetrics) {
this.logger = logger;
this.type = type;
this.clusterManagerMetrics = clusterManagerMetrics;
}

abstract void initData(DiscoveryNode node);
Expand Down Expand Up @@ -162,6 +165,7 @@ Map<DiscoveryNode, K> getCacheData(DiscoveryNodes nodes, Set<String> failedNodes
}

void processResponses(List<K> responses, long fetchingRound) {
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchSuccessCounter, Double.valueOf(responses.size()));
for (K response : responses) {
BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId());
if (nodeEntry != null) {
Expand Down Expand Up @@ -222,6 +226,7 @@ boolean retryableException(Throwable unwrappedCause) {
}

void processFailures(List<FailedNodeException> failures, long fetchingRound) {
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchFailureCounter, Double.valueOf(failures.size()));
for (FailedNodeException failure : failures) {
logger.trace("processing failure {} for [{}]", failure, type);
BaseNodeEntry nodeEntry = getCache().get(failure.nodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -92,11 +93,12 @@ public class GatewayAllocator implements ExistingShardsAllocator {
public GatewayAllocator(
RerouteService rerouteService,
TransportNodesListGatewayStartedShards startedAction,
TransportNodesListShardStoreMetadata storeAction
TransportNodesListShardStoreMetadata storeAction,
ClusterManagerMetrics clusterManagerMetrics
) {
this.rerouteService = rerouteService;
this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction);
this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction, clusterManagerMetrics);
this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction, clusterManagerMetrics);
}

@Override
Expand Down Expand Up @@ -251,9 +253,10 @@ class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T>
String type,
ShardId shardId,
String customDataPath,
Lister<? extends BaseNodesResponse<T>, T> action
Lister<? extends BaseNodesResponse<T>, T> action,
ClusterManagerMetrics clusterManagerMetrics
) {
super(logger, type, shardId, customDataPath, action);
super(logger, type, shardId, customDataPath, action, clusterManagerMetrics);
}

@Override
Expand All @@ -274,9 +277,11 @@ protected void reroute(String reroutingKey, String reason) {
class InternalPrimaryShardAllocator extends PrimaryShardAllocator {

private final TransportNodesListGatewayStartedShards startedAction;
private final ClusterManagerMetrics clusterManagerMetrics;

InternalPrimaryShardAllocator(TransportNodesListGatewayStartedShards startedAction) {
InternalPrimaryShardAllocator(TransportNodesListGatewayStartedShards startedAction, ClusterManagerMetrics clusterManagerMetrics) {
this.startedAction = startedAction;
this.clusterManagerMetrics = clusterManagerMetrics;
}

@Override
Expand All @@ -291,7 +296,8 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
"shard_started",
shardId,
IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
startedAction
startedAction,
clusterManagerMetrics
)
);
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetch.fetchData(
Expand All @@ -313,9 +319,11 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
class InternalReplicaShardAllocator extends ReplicaShardAllocator {

private final TransportNodesListShardStoreMetadata storeAction;
private final ClusterManagerMetrics clusterManagerMetrics;

InternalReplicaShardAllocator(TransportNodesListShardStoreMetadata storeAction) {
InternalReplicaShardAllocator(TransportNodesListShardStoreMetadata storeAction, ClusterManagerMetrics clusterManagerMetrics) {
this.storeAction = storeAction;
this.clusterManagerMetrics = clusterManagerMetrics;
}

@Override
Expand All @@ -330,7 +338,8 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeS
"shard_store",
shard.shardId(),
IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
storeAction
storeAction,
clusterManagerMetrics
)
);
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> shardStores = fetch.fetchData(
Expand Down
Loading
Loading