Skip to content

Commit

Permalink
rare reroute with npe
Browse files Browse the repository at this point in the history
  • Loading branch information
rajiv-kv committed Oct 24, 2024
1 parent 0bded88 commit 96c0766
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,36 +32,49 @@

package org.opensearch.cluster.coordination;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opensearch.OpenSearchParseException;
import org.opensearch.Version;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestBuilder;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingChangesObserver;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.index.Index;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.discovery.Discovery;
import org.opensearch.index.IndexService;
import org.opensearch.index.mapper.DocumentMapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.indices.IndicesService;
import org.opensearch.plugins.Plugin;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.BlockClusterStateProcessing;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;

import java.util.List;
Expand All @@ -71,6 +84,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.cluster.action.shard.ShardStateAction.SHARD_STARTED_ACTION_NAME;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -409,4 +423,145 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED));
}


public void testDisassociateNodesWhileShardInit() throws InterruptedException {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s")
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build());
internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build());
internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build());
String node2 = internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build());

final ClusterService clusterService = internalCluster().clusterService(clusterManagerName);
blockShardStartedResponse(clusterManagerName, clusterService);

final String index = "index";

//create index with 3 primary and 1 replica each
prepareCreate(index).setSettings(
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
//.put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")
).get();
ensureGreen(index);

// close to have some unassigned started shards shards..
client().admin().indices().prepareClose(index).get();

//block so that replicas are always in init and not started
blockReplicaStart.set(true);
final AllocationService allocationService = internalCluster().getInstance(AllocationService.class, clusterManagerName);
clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
ClusterState.Builder builder = ClusterState.builder(currentState);
// open index
final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index))
.state(IndexMetadata.State.OPEN)
.build();

builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true));
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
ClusterState updatedState = builder.build();
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
routingTable.addAsRecovery(updatedState.metadata().index(index));
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
return allocationService.reroute(updatedState, "reroute");
}

@Override
public void onFailure(String source, Exception e) {
logger.error(e.getMessage(), e);
}
});

ensureYellow(index);
waitUntil(() -> clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3);

logger.info("Initializing shards");
logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));

//trigger 2nd reroute after shard in initialized
clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return allocationService.reroute(currentState, "reroute");
}

@Override
public void onFailure(String source, Exception e) {}
});
ensureYellow(index);
waitUntil(() -> clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3);
clusterService.submitStateUpdateTask("test-remove-injected-node", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
//remove the primary node of replica shard which is in init
ShardRouting next = currentState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0);
ShardRouting primaryShard = currentState.getRoutingNodes().activePrimary(next.shardId());

ClusterState.Builder builder = ClusterState.builder(currentState);
builder.nodes(DiscoveryNodes.builder(currentState.nodes()).remove(primaryShard.currentNodeId()));
currentState = builder.build();
logger.info("removed the node {}", primaryShard.currentNodeId());
logger.info("shard {}", next);
return allocationService.disassociateDeadNodes(currentState, true, "reroute");
}

@Override
public void onFailure(String source, Exception e) {}
});
//sleep for reroute to get triggeredn
Thread.sleep(60 * 1000);

waitUntil(() -> clusterService.state().nodes().getSize() == 3);
logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));
blockReplicaStart.set(false);

clusterService.submitStateUpdateTask("test-inject-node-and-reroute", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
ClusterState.Builder builder = ClusterState.builder(currentState);
final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index))
.state(IndexMetadata.State.OPEN)
.build();
builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true));
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
ClusterState updatedState = builder.build();
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
routingTable.addAsRecovery(updatedState.metadata().index(index));
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();

return allocationService.reroute(updatedState, "reroute");
}

@Override
public void onFailure(String source, Exception e) {}
});

ensureGreen(index);
}

AtomicBoolean blockReplicaStart = new AtomicBoolean(false);
private void blockShardStartedResponse(String master, ClusterService service) {
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master);
primaryService.addRequestHandlingBehavior(SHARD_STARTED_ACTION_NAME, (handler, request, channel, task) -> {

if (blockReplicaStart.get()) {
ShardStateAction.StartedShardEntry req = (ShardStateAction.StartedShardEntry) request;
final ShardRouting matched = service.state().getRoutingTable().getByAllocationId(req.getShardId(), req.getAllocationId());

if (matched != null && matched.primary() == false) {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} else {
handler.messageReceived(request, channel, task);
}
} else {
handler.messageReceived(request, channel, task);
}
});
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(MockTransportService.TestPlugin.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
* @opensearch.internal
*/
public static class StartedShardEntry extends TransportRequest {

final ShardId shardId;
final String allocationId;
final long primaryTerm;
Expand Down Expand Up @@ -883,6 +884,15 @@ public String toString() {
message
);
}

public ShardId getShardId() {
return shardId;
}

public String getAllocationId() {
return allocationId;
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterSt
throw new AssertionError("update task submitted to ClusterManagerService cannot remove cluster-manager");
}
} catch (Exception e) {
logger.trace(
logger.info(
() -> new ParameterizedMessage(
"failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}",
previousClusterState.version(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protected Runnable cancelExistingRecoveryForBetterMatch(
RoutingNodes routingNodes = allocation.routingNodes();
ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId());
if (primaryShard == null) {
logger.trace("{}: no active primary shard found or allocated, letting actual allocation figure it out", shard);
logger.info("{}: no active primary shard found or allocated, letting actual allocation figure it out", shard);
return null;
}
assert primaryShard.currentNodeId() != null;
Expand All @@ -111,7 +111,7 @@ protected Runnable cancelExistingRecoveryForBetterMatch(
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// just let the recovery find it out, no need to do anything about it for the initializing shard
logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
logger.info("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
return null;
}

Expand Down

0 comments on commit 96c0766

Please sign in to comment.