Skip to content

Commit

Permalink
Marks end criteria reached for the segment if the Index cannot consum…
Browse files Browse the repository at this point in the history
…e more rows (apache#14479)
  • Loading branch information
noob-se7en authored and davecromberge committed Dec 9, 2024
1 parent 20de168 commit da9d2cb
Show file tree
Hide file tree
Showing 9 changed files with 310 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ public enum ControllerResponseStatus {
public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup";
// Stop reason sent by server as force commit message received
public static final String REASON_FORCE_COMMIT_MESSAGE_RECEIVED = "forceCommitMessageReceived";
// Stop reason sent by the server when the mutable index cannot consume more rows
// (e.g. its size is close to the limit, or the number of values for a column is about to overflow int max)
public static final String REASON_INDEX_CAPACITY_THRESHOLD_BREACHED = "indexCapacityThresholdBreached";

// Canned responses
public static final Response RESP_NOT_LEADER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,13 @@ private boolean endCriteriaReached() {
_numRowsConsumed, _numRowsIndexed);
_stopReason = SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED;
return true;
} else if (!canAddMore()) {
_segmentLogger.info(
"Stopping consumption as mutable index cannot consume more rows - numRowsConsumed={} "
+ "numRowsIndexed={}",
_numRowsConsumed, _numRowsIndexed);
_stopReason = SegmentCompletionProtocol.REASON_INDEX_CAPACITY_THRESHOLD_BREACHED;
return true;
}
return false;

Expand Down Expand Up @@ -697,6 +704,11 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee
return prematureExit;
}

/**
 * Reports whether the realtime segment being consumed into can still accept rows.
 *
 * <p>Delegates to {@code _realtimeSegment.canAddMore()}. {@code endCriteriaReached()} treats a {@code false}
 * result as an end criterion and records
 * {@code SegmentCompletionProtocol.REASON_INDEX_CAPACITY_THRESHOLD_BREACHED} as the stop reason.
 *
 * <p>Kept non-private so that test subclasses can override it.
 *
 * @return whatever {@code _realtimeSegment.canAddMore()} returns
 */
@VisibleForTesting
boolean canAddMore() {
return _realtimeSegment.canAddMore();
}

public class PartitionConsumer implements Runnable {
public void run() {
long initialConsumptionEnd = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,19 @@ public void testEndCriteriaChecking()
segmentDataManager._timeSupplier.set(endTime);
Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
}

// test end criteria reached if any of the index cannot take more rows
try (FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(false, new TimeSupplier(), null,
null, null)) {
segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.INITIAL_CONSUMING);
Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());

segmentDataManager.setIndexCapacityThresholdBreached(true);

Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
Assert.assertEquals(segmentDataManager.getStopReason(),
SegmentCompletionProtocol.REASON_INDEX_CAPACITY_THRESHOLD_BREACHED);
}
}

private void setHasMessagesFetched(FakeRealtimeSegmentDataManager segmentDataManager, boolean hasMessagesFetched)
Expand Down Expand Up @@ -907,6 +920,7 @@ public static class FakeRealtimeSegmentDataManager extends RealtimeSegmentDataMa
public Map<Integer, Semaphore> _semaphoreMap;
public boolean _stubConsumeLoop = true;
private TimeSupplier _timeSupplier;
private boolean _indexCapacityThresholdBreached;

private static InstanceDataManagerConfig makeInstanceDataManagerConfig() {
InstanceDataManagerConfig dataManagerConfig = mock(InstanceDataManagerConfig.class);
Expand Down Expand Up @@ -1087,6 +1101,15 @@ public void setFinalOffset(long offset) {
setOffset(offset, "_finalOffset");
}

/**
 * Test override: instead of consulting the segment, answers from the {@code _indexCapacityThresholdBreached}
 * flag controlled via {@link #setIndexCapacityThresholdBreached(boolean)}.
 *
 * @return {@code true} while the flag is unset, {@code false} once it has been set to {@code true}
 */
@Override
protected boolean canAddMore() {
return !_indexCapacityThresholdBreached;
}

/**
 * Sets the flag that {@code canAddMore()} of this fake reads, letting a test simulate an index that has hit
 * its capacity threshold.
 *
 * @param indexCapacityThresholdBreached {@code true} to make {@code canAddMore()} return {@code false}
 */
public void setIndexCapacityThresholdBreached(boolean indexCapacityThresholdBreached) {
_indexCapacityThresholdBreached = indexCapacityThresholdBreached;
}

public boolean invokeEndCriteriaReached() {
Method endCriteriaReached = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public class MutableSegmentImpl implements MutableSegment {
private final File _consumerDir;

private final Map<String, IndexContainer> _indexContainerMap = new HashMap<>();
private boolean _indexCapacityThresholdBreached;

private final IdMap<FixedIntArray> _recordIdMap;

Expand Down Expand Up @@ -828,7 +829,20 @@ private void addNewRow(int docId, GenericRow row) {
Object[] values = (Object[]) value;
for (Map.Entry<IndexType, MutableIndex> indexEntry : indexContainer._mutableIndexes.entrySet()) {
try {
indexEntry.getValue().add(values, dictIds, docId);
MutableIndex mutableIndex = indexEntry.getValue();
mutableIndex.add(values, dictIds, docId);
// Some immutable counterparts of the mutable indexes are bounded by size, like FixedBitMVForwardIndex.
// If the number of values overflows or the size is above the limit, the mutable index cannot be converted
// to an immutable index, so the segment build fails and realtime consumption stops.
// Hence, the check below is a temporary measure to avoid such scenarios until the immutable index
// implementations are changed.
if (!_indexCapacityThresholdBreached && !mutableIndex.canAddMore()) {
_logger.info(
"Index: {} for column: {} cannot consume more rows, marking _indexCapacityThresholdBreached as true",
indexEntry.getKey(), column
);
_indexCapacityThresholdBreached = true;
}
} catch (Exception e) {
recordIndexingError(indexEntry.getKey(), e);
}
Expand Down Expand Up @@ -1265,6 +1279,10 @@ private boolean isAggregateMetricsEnabled() {
return _recordIdMap != null;
}

/**
 * Reports whether this segment can still take more rows.
 *
 * <p>Backed by {@code _indexCapacityThresholdBreached}, which {@code addNewRow()} sets to {@code true} as soon
 * as one of the mutable indexes answers {@code false} from its own {@code canAddMore()} after a value was added.
 *
 * @return {@code false} once an index capacity threshold breach has been recorded, {@code true} otherwise
 */
public boolean canAddMore() {
return !_indexCapacityThresholdBreached;
}

// NOTE: Okay for single-writer
@SuppressWarnings("NonAtomicOperationOnVolatileField")
private static class ValuesInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
private static final int INCREMENT_PERCENTAGE = 100;
// Increases the size by 100% of the initial capacity every time we run out of capacity

// Conservative figure to not breach 2GB size limit for immutable index
private final static int DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN = 450_000_000;

// For single writer multiple readers setup, use ArrayList for writer and CopyOnWriteArrayList for reader
private final List<FixedByteSingleValueMultiColWriter> _headerWriters = new ArrayList<>();
private final List<FixedByteSingleValueMultiColReader> _headerReaders = new CopyOnWriteArrayList<>();
Expand All @@ -124,6 +127,7 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
private int _currentCapacity = 0;
private int _prevRowStartIndex = 0; // Offset in the data-buffer for the last row added.
private int _prevRowLength = 0; // Number of values in the column for the last row added.
private int _numValues = 0;

public FixedByteMVMutableForwardIndex(int maxNumberOfMultiValuesPerRow, int avgMultiValueCount, int rowCountPerChunk,
int columnSizeInBytes, PinotDataBufferMemoryManager memoryManager, String context, boolean isDictionaryEncoded,
Expand Down Expand Up @@ -200,6 +204,7 @@ private int getRowInCurrentHeader(int row) {

private int updateHeader(int row, int numValues) {
assert (numValues <= _maxNumberOfMultiValuesPerRow);
_numValues += numValues;
int newStartIndex = _prevRowStartIndex + _prevRowLength;
if (newStartIndex + numValues > _currentCapacity) {
addDataBuffer(_incrementalCapacity);
Expand Down Expand Up @@ -414,6 +419,11 @@ public void setDoubleMV(int docId, double[] values) {
}
}

/**
 * Reports whether this forward index is still below its value-count threshold.
 *
 * <p>{@code _numValues} is the running total of values accumulated in {@code updateHeader()}; it is compared
 * against {@code DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN} (450,000,000).
 *
 * @return {@code true} while the total number of values is strictly below the threshold
 */
@Override
public boolean canAddMore() {
return _numValues < DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN;
}

@Override
public void close()
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.indexsegment.mutable;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Collections;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin;
import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
import org.apache.pinot.segment.spi.index.mutable.MutableIndex;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.testng.Assert;
import org.testng.annotations.Test;


/**
 * Tests that {@link MutableSegmentImpl#canAddMore()} flips to {@code false} once one of its mutable indexes
 * reports that it cannot take more values, and stays {@code true} otherwise.
 *
 * <p>Verification uses TestNG {@link Assert} rather than the Java {@code assert} keyword, so the checks are
 * executed regardless of whether the JVM runs with assertions enabled.
 */
public class MutableSegmentEntriesAboveThresholdTest {
  private static final File TEMP_DIR =
      new File(FileUtils.getTempDirectory(), MutableSegmentEntriesAboveThresholdTest.class.getSimpleName());
  private static final String AVRO_FILE = "data/test_data-mv.avro";

  // Schema of the segment created by the most recent getMutableSegment() call.
  private Schema _schema;

  /**
   * Forward index decorator that delegates to a real index but reports itself as full once
   * {@link #THRESHOLD} dictionary ids have been written through {@link #setDictIdMV(int, int[])}.
   */
  private static class FakeMutableForwardIndex implements MutableForwardIndex {
    // Number of multi-value entries after which the fake claims it cannot take more.
    private static final int THRESHOLD = 2;

    private final MutableForwardIndex _mutableForwardIndex;
    private int _numValues;

    FakeMutableForwardIndex(MutableForwardIndex mutableForwardIndex) {
      _mutableForwardIndex = mutableForwardIndex;
    }

    @Override
    public boolean canAddMore() {
      return _numValues < THRESHOLD;
    }

    @Override
    public void setDictIdMV(int docId, int[] dictIds) {
      _numValues += dictIds.length;
      _mutableForwardIndex.setDictIdMV(docId, dictIds);
    }

    @Override
    public int getLengthOfShortestElement() {
      return _mutableForwardIndex.getLengthOfShortestElement();
    }

    @Override
    public int getLengthOfLongestElement() {
      return _mutableForwardIndex.getLengthOfLongestElement();
    }

    @Override
    public void setDictId(int docId, int dictId) {
      _mutableForwardIndex.setDictId(docId, dictId);
    }

    @Override
    public boolean isDictionaryEncoded() {
      return _mutableForwardIndex.isDictionaryEncoded();
    }

    @Override
    public boolean isSingleValue() {
      return _mutableForwardIndex.isSingleValue();
    }

    @Override
    public FieldSpec.DataType getStoredType() {
      return _mutableForwardIndex.getStoredType();
    }

    @Override
    public void close()
        throws IOException {
      _mutableForwardIndex.close();
    }
  }

  /** Resolves the multi-value Avro test file from the test classpath. */
  private File getAvroFile() {
    URL resourceUrl = MutableSegmentEntriesAboveThresholdTest.class.getClassLoader().getResource(AVRO_FILE);
    Assert.assertNotNull(resourceUrl, "Test resource not found on classpath: " + AVRO_FILE);
    return new File(resourceUrl.getFile());
  }

  /**
   * Builds an immutable segment from the Avro file (to derive the schema) and returns a fresh mutable segment
   * for that schema. Also stores the schema in {@link #_schema}.
   */
  private MutableSegmentImpl getMutableSegment(File avroFile)
      throws Exception {
    FileUtils.deleteQuietly(TEMP_DIR);

    SegmentGeneratorConfig config =
        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, TEMP_DIR, "testTable");
    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
    driver.init(config);
    driver.build();

    _schema = config.getSchema();
    VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(_schema, "testSegment");
    return MutableSegmentImplTestUtils
        .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
            Collections.emptyMap(),
            false, false, null, null, null, null, null, null, Collections.emptyList());
  }

  /** Indexes every record of the Avro file into the given mutable segment. */
  private void indexAllRows(MutableSegmentImpl mutableSegment, File avroFile)
      throws Exception {
    StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow());
    try (RecordReader recordReader = RecordReaderFactory
        .getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) {
      GenericRow reuse = new GenericRow();
      while (recordReader.hasNext()) {
        mutableSegment.index(recordReader.next(reuse), defaultMetadata);
      }
    }
  }

  /**
   * Replaces, via reflection, the forward index of every column of the segment with a
   * {@link FakeMutableForwardIndex} wrapping the original one.
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  private void wrapForwardIndexes(MutableSegmentImpl mutableSegment)
      throws ReflectiveOperationException {
    Field indexContainerMapField = MutableSegmentImpl.class.getDeclaredField("_indexContainerMap");
    indexContainerMapField.setAccessible(true);
    Map<String, Object> colVsIndexContainer = (Map<String, Object>) indexContainerMapField.get(mutableSegment);

    for (Map.Entry<String, Object> entry : colVsIndexContainer.entrySet()) {
      Object indexContainer = entry.getValue();
      Field mutableIndexesField = indexContainer.getClass().getDeclaredField("_mutableIndexes");
      mutableIndexesField.setAccessible(true);
      Map<IndexType, MutableIndex> indexTypeVsMutableIndex =
          (Map<IndexType, MutableIndex>) mutableIndexesField.get(indexContainer);

      MutableForwardIndex mutableForwardIndex = null;
      for (Map.Entry<IndexType, MutableIndex> indexEntry : indexTypeVsMutableIndex.entrySet()) {
        if (indexEntry.getKey().getId().equals(StandardIndexes.FORWARD_ID)) {
          mutableForwardIndex = (MutableForwardIndex) indexEntry.getValue();
        }
      }
      Assert.assertNotNull(mutableForwardIndex, "No forward index found for column: " + entry.getKey());

      indexTypeVsMutableIndex.put(new ForwardIndexPlugin().getIndexType(),
          new FakeMutableForwardIndex(mutableForwardIndex));
    }
  }

  /** Releases the resources held by the mutable segment and removes the temporary segment directory. */
  private void releaseResources(MutableSegmentImpl mutableSegment) {
    try {
      mutableSegment.destroy();
    } finally {
      FileUtils.deleteQuietly(TEMP_DIR);
    }
  }

  @Test
  public void testNoLimitBreached()
      throws Exception {
    File avroFile = getAvroFile();
    MutableSegmentImpl mutableSegment = getMutableSegment(avroFile);
    try {
      indexAllRows(mutableSegment, avroFile);
      Assert.assertTrue(mutableSegment.canAddMore(),
          "Segment with regular indexes should still be able to take more rows");
    } finally {
      releaseResources(mutableSegment);
    }
  }

  @Test
  public void testLimitBreached()
      throws Exception {
    File avroFile = getAvroFile();
    MutableSegmentImpl mutableSegment = getMutableSegment(avroFile);
    try {
      wrapForwardIndexes(mutableSegment);
      indexAllRows(mutableSegment, avroFile);
      Assert.assertFalse(mutableSegment.canAddMore(),
          "Segment should report it cannot take more rows once a forward index breaches its threshold");
    } finally {
      releaseResources(mutableSegment);
    }
  }
}
Loading

0 comments on commit da9d2cb

Please sign in to comment.