Skip to content

Commit

Permalink
[Remote Segment Store] Use files from segmentInfosSnapshot in upload (o…
Browse files Browse the repository at this point in the history
…pensearch-project#8487)

* Use files from segmentInfosSnapshot in upload

---------

Signed-off-by: Sachin Kale <[email protected]>
Co-authored-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale and Sachin Kale authored Jul 18, 2023
1 parent 2ba1157 commit 230a06d
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
package org.opensearch.remotestore;

import org.junit.After;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
Expand Down Expand Up @@ -49,6 +51,13 @@ public Settings indexSettings() {
return defaultIndexSettings();
}

IndexResponse indexSingleDoc(String indexName) {
return client().prepareIndex(indexName)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}

private Settings defaultIndexSettings() {
return Settings.builder()
.put(super.indexSettings())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.Before;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 3)
public class RemoteStoreForceMergeIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";
private static final String TOTAL_OPERATIONS = "total-operations";
private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

@Before
public void setup() {
setupRepo();
}

@Override
public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}

private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush, boolean flushAfterMerge, long deletedDocs) {
long totalOperations = 0;
long maxSeqNo = -1;
List<IndexResponse> indexResponseList = new ArrayList<>();
for (int i = 0; i < numberOfIterations; i++) {
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
IndexResponse response = indexSingleDoc(INDEX_NAME);
maxSeqNo = response.getSeqNo();
indexResponseList.add(response);
}
totalOperations += numberOfOperations;
if (invokeFlush) {
flush(INDEX_NAME);
} else {
refresh(INDEX_NAME);
}
}
if (deletedDocs == -1) {
deletedDocs = totalOperations;
}
int length = indexResponseList.size();
for (int j = 0; j < deletedDocs; j++) {
maxSeqNo = client().prepareDelete().setIndex(INDEX_NAME).setId(indexResponseList.get(length - j - 1).getId()).get().getSeqNo();
}
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(flushAfterMerge).get();
refresh(INDEX_NAME);
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), totalOperations - deletedDocs);
Map<String, Long> indexingStats = new HashMap<>();
indexingStats.put(TOTAL_OPERATIONS, totalOperations);
indexingStats.put(MAX_SEQ_NO_TOTAL, maxSeqNo);
return indexingStats;
}

private void verifyRestoredData(Map<String, Long> indexStats, long deletedDocs) {
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) - deletedDocs);
IndexResponse response = indexSingleDoc(INDEX_NAME);
assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo());
refresh(INDEX_NAME);
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1 - deletedDocs);
}

private void testRestoreWithMergeFlow(int numberOfIterations, boolean invokeFlush, boolean flushAfterMerge, long deletedDocs)
throws IOException {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, flushAfterMerge, deletedDocs);

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));

client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
ensureGreen(INDEX_NAME);

if (deletedDocs == -1) {
verifyRestoredData(indexStats, indexStats.get(TOTAL_OPERATIONS));
} else {
verifyRestoredData(indexStats, deletedDocs);
}
}

// Following integ tests use randomBoolean to control the number of integ tests. If we use the separate
// values for each of the flags, number of integ tests become 16 in comparison to current 2.
// We have run all the 16 tests on local and they run fine.
public void testRestoreForceMergeSingleIteration() throws IOException {
boolean invokeFLush = randomBoolean();
boolean flushAfterMerge = randomBoolean();
testRestoreWithMergeFlow(1, invokeFLush, flushAfterMerge, randomIntBetween(0, 10));
}

public void testRestoreForceMergeMultipleIterations() throws IOException {
boolean invokeFLush = randomBoolean();
boolean flushAfterMerge = randomBoolean();
testRestoreWithMergeFlow(randomIntBetween(2, 5), invokeFLush, flushAfterMerge, randomIntBetween(0, 10));
}

public void testRestoreForceMergeMultipleIterationsDeleteAll() throws IOException {
boolean invokeFLush = randomBoolean();
boolean flushAfterMerge = randomBoolean();
testRestoreWithMergeFlow(randomIntBetween(2, 3), invokeFLush, flushAfterMerge, -1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequestBuilder;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.UUIDs;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand Down Expand Up @@ -128,7 +126,7 @@ private void indexDocs() {
}
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
indexSingleDoc(INDEX_NAME);
}
}
}
Expand All @@ -149,12 +147,4 @@ private void assertResponseStats(RemoteRefreshSegmentTracker.Stats stats) {
assertTrue(stats.uploadBytesPerSecMovingAverage > 0);
assertTrue(stats.uploadTimeMovingAverage > 0);
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
Expand Down Expand Up @@ -40,13 +39,10 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -216,77 +212,50 @@ private synchronized boolean syncSegments() {
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

List<String> segmentInfosFiles = localSegmentsPostRefresh.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.collect(Collectors.toList());
Optional<String> latestSegmentInfos = segmentInfosFiles.stream()
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));

if (latestSegmentInfos.isPresent()) {
// SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain
// all the segments from last commit if they are merged away but not yet committed.
// Each metadata file in the remote segment store represents a commit and the following
// statement keeps sure that each metadata will always contain all the segments from last commit + refreshed
// segments.
SegmentInfos segmentCommitInfos;
try {
segmentCommitInfos = SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get());
} catch (Exception e) {
// Seeing discrepancy in segment infos and files on disk. SegmentInfosSnapshot is returning
// a segment_N file which does not exist on local disk.
logger.error("Exception occurred while SegmentInfos.readCommit(..)", e);
logger.error("segmentInfosFiles={} diskFiles={}", localSegmentsPostRefresh, storeDirectory.listAll());
throw e;
}
localSegmentsPostRefresh.addAll(segmentCommitInfos.files(true));
segmentInfosFiles.stream()
.filter(file -> !file.equals(latestSegmentInfos.get()))
.forEach(localSegmentsPostRefresh::remove);

// Create a map of file name to size and update the refresh segment tracker
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
CountDownLatch latch = new CountDownLatch(1);
ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
try {
// Start metadata file upload
uploadMetadata(localSegmentsPostRefresh, segmentInfos);
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(
refreshTimeMs,
refreshClockTimeMs,
refreshSeqNo,
lastRefreshedCheckpoint,
checkpoint
);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
// along with marking minSeqNoToKeep, upload has succeeded completely.
successful.set(true);
} catch (Exception e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
// in the next refresh. This should not affect durability of the indexed data after remote trans-log
// integration.
logger.warn("Exception in post new segment upload actions", e);
}
// Create a map of file name to size and update the refresh segment tracker
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
CountDownLatch latch = new CountDownLatch(1);
ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
try {
// Start metadata file upload
uploadMetadata(localSegmentsPostRefresh, segmentInfos);
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(
refreshTimeMs,
refreshClockTimeMs,
refreshSeqNo,
lastRefreshedCheckpoint,
checkpoint
);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
// along with marking minSeqNoToKeep, upload has succeeded completely.
successful.set(true);
} catch (Exception e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
// as part of exponential back-off retry logic. This should not affect durability of the indexed data
// with remote trans-log integration.
logger.warn("Exception in post new segment upload actions", e);
}
}

@Override
public void onFailure(Exception e) {
logger.warn("Exception while uploading new segments to the remote segment store", e);
}
}, latch);
@Override
public void onFailure(Exception e) {
logger.warn("Exception while uploading new segments to the remote segment store", e);
}
}, latch);

// Start the segments files upload
uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener);
latch.await();
}
// Start the segments files upload
uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener);
latch.await();
} catch (EngineException e) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
}
} catch (IOException e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
// in the next refresh. This should not affect durability of the indexed data after remote trans-log integration.
// as part of exponential back-off retry logic. This should not affect durability of the indexed data
// with remote trans-log integration.
logger.warn("Exception while uploading new segments to the remote segment store", e);
}
} catch (Throwable t) {
Expand Down

0 comments on commit 230a06d

Please sign in to comment.