Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDB-12407: lazily load token in PrimaryKeyWithSource #1500

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ protected PrimaryKey computeNext()
if (rowId == PostingList.END_OF_STREAM)
return endOfData();

var primaryKey = primaryKeyMap.primaryKeyFromRowId(rowId);
return new PrimaryKeyWithSource(primaryKey, primaryKeyMap.getSSTableId(), rowId);
return new PrimaryKeyWithSource(primaryKeyMap, rowId, searcherContext.minimumKey, searcherContext.maximumKey);
}
catch (Throwable t)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,38 @@

public class PrimaryKeyWithSource implements PrimaryKey
{
private final PrimaryKey primaryKey;
private final PrimaryKeyMap primaryKeyMap;
private final SSTableId<?> sourceSstableId;
private final long sourceRowId;
private PrimaryKey delegatePrimaryKey;
private final PrimaryKey sourceSstableMinKey;
private final PrimaryKey sourceSstableMaxKey;

public PrimaryKeyWithSource(PrimaryKey primaryKey, SSTableId<?> sstableId, long sstableRowId)
public PrimaryKeyWithSource(PrimaryKeyMap primaryKeyMap, long sstableRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey)
{
assert primaryKey != null : "Cannot construct a PrimaryKeyWithSource with a null primaryKey";
this.primaryKey = primaryKey;
this.sourceSstableId = sstableId;
this.primaryKeyMap = primaryKeyMap;
this.sourceSstableId = primaryKeyMap.getSSTableId();
this.sourceRowId = sstableRowId;
this.sourceSstableMinKey = sourceSstableMinKey;
this.sourceSstableMaxKey = sourceSstableMaxKey;
}

public PrimaryKeyWithSource(PrimaryKey primaryKey, SSTableId<?> sourceSstableId, long sourceRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey)
{
this.delegatePrimaryKey = primaryKey;
this.primaryKeyMap = null;
this.sourceSstableId = sourceSstableId;
this.sourceRowId = sourceRowId;
this.sourceSstableMinKey = sourceSstableMinKey;
this.sourceSstableMaxKey = sourceSstableMaxKey;
}

private PrimaryKey primaryKey()
{
if (delegatePrimaryKey == null)
delegatePrimaryKey = primaryKeyMap.primaryKeyFromRowId(sourceRowId);

return delegatePrimaryKey;
}

public long getSourceRowId()
Expand All @@ -53,43 +75,43 @@ public SSTableId<?> getSourceSstableId()
@Override
public Token token()
{
return primaryKey.token();
return primaryKey().token();
}

@Override
public DecoratedKey partitionKey()
{
return primaryKey.partitionKey();
return primaryKey().partitionKey();
}

@Override
public Clustering clustering()
{
return primaryKey.clustering();
return primaryKey().clustering();
}

@Override
public PrimaryKey loadDeferred()
{
return primaryKey.loadDeferred();
return primaryKey().loadDeferred();
}

@Override
public ByteSource asComparableBytes(ByteComparable.Version version)
{
return primaryKey.asComparableBytes(version);
return primaryKey().asComparableBytes(version);
}

@Override
public ByteSource asComparableBytesMinPrefix(ByteComparable.Version version)
{
return primaryKey.asComparableBytesMinPrefix(version);
return primaryKey().asComparableBytesMinPrefix(version);
}

@Override
public ByteSource asComparableBytesMaxPrefix(ByteComparable.Version version)
{
return primaryKey.asComparableBytesMaxPrefix(version);
return primaryKey().asComparableBytesMaxPrefix(version);
}

@Override
Expand All @@ -101,7 +123,13 @@ public int compareTo(PrimaryKey o)
if (sourceSstableId.equals(other.sourceSstableId))
return Long.compare(sourceRowId, other.sourceRowId);
}
return primaryKey.compareTo(o);

if (sourceSstableMinKey.compareTo(o) > 0)
return 1;
else if (sourceSstableMaxKey.compareTo(o) < 0)
return -1;
else
return primaryKey().compareTo(o);
}

@Override
Expand All @@ -110,21 +138,31 @@ public boolean equals(Object o)
if (o instanceof PrimaryKeyWithSource)
{
var other = (PrimaryKeyWithSource) o;
// If they are from the same source sstable, we can compare the row ids directly.
if (sourceSstableId.equals(other.sourceSstableId))
return sourceRowId == other.sourceRowId;
}
return primaryKey.equals(o);

if (!(o instanceof PrimaryKey))
return false;

var other = (PrimaryKey) o;
// If the source sstable does not contain the PrimaryKey, the keys cannot be equal.
if (sourceSstableMinKey.compareTo(other) > 0 || sourceSstableMaxKey.compareTo(other) < 0)
return false;

return primaryKey().equals(other);
}

@Override
public int hashCode()
{
return primaryKey.hashCode();
return primaryKey().hashCode();
}

@Override
public String toString()
{
return String.format("%s (source sstable: %s, %s)", primaryKey, sourceSstableId, sourceRowId);
return String.format("%s (source sstable: %s, %s)", primaryKey(), sourceSstableId, sourceRowId);
michaeljmarshall marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.disk.ModernResettableByteBuffersIndexOutput;
import org.apache.cassandra.index.sai.disk.PostingList;
import org.apache.cassandra.index.sai.disk.PrimaryKeyWithSource;
import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.io.IndexInput;
Expand All @@ -44,6 +45,7 @@
import org.apache.cassandra.index.sai.plan.Expression;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.index.sai.utils.TypeUtil;
import org.apache.cassandra.io.sstable.SSTableId;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;

Expand Down Expand Up @@ -134,7 +136,7 @@ public class SegmentMetadata implements Comparable<SegmentMetadata>
private static final Logger logger = LoggerFactory.getLogger(SegmentMetadata.class);

@SuppressWarnings("resource")
private SegmentMetadata(IndexInput input, IndexContext context, Version version) throws IOException
private SegmentMetadata(IndexInput input, IndexContext context, Version version, SSTableId<?> sstableId) throws IOException
{
PrimaryKey.Factory primaryKeyFactory = context.keyFactory();
AbstractType<?> termsType = context.getValidator();
Expand All @@ -144,8 +146,10 @@ private SegmentMetadata(IndexInput input, IndexContext context, Version version)
this.numRows = input.readLong();
this.minSSTableRowId = input.readLong();
this.maxSSTableRowId = input.readLong();
this.minKey = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)));
this.maxKey = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)));
PrimaryKey min = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)));
PrimaryKey max = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)));
this.minKey = new PrimaryKeyWithSource(min, sstableId, -1, min, max);
this.maxKey = new PrimaryKeyWithSource(max, sstableId, Long.MAX_VALUE, min, max);
this.minTerm = readBytes(input);
this.maxTerm = readBytes(input);
TermsDistribution td = null;
Expand All @@ -164,7 +168,7 @@ private SegmentMetadata(IndexInput input, IndexContext context, Version version)
}

@SuppressWarnings("resource")
public static List<SegmentMetadata> load(MetadataSource source, IndexContext context) throws IOException
public static List<SegmentMetadata> load(MetadataSource source, IndexContext context, SSTableId<?> sstableId) throws IOException
{

IndexInput input = source.get(NAME);
Expand All @@ -175,7 +179,7 @@ public static List<SegmentMetadata> load(MetadataSource source, IndexContext con

for (int i = 0; i < segmentCount; i++)
{
segmentMetadata.add(new SegmentMetadata(input, context, source.getVersion()));
segmentMetadata.add(new SegmentMetadata(input, context, source.getVersion(), sstableId));
}

return segmentMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.cassandra.index.sai.plan.Expression;
import org.apache.cassandra.index.sai.plan.Orderer;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.index.sai.utils.PrimaryKeyListUtil;
import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey;
import org.apache.cassandra.index.sai.utils.TypeUtil;
import org.apache.cassandra.io.sstable.format.SSTableReader;
Expand Down Expand Up @@ -87,7 +88,7 @@ public V1SearchableIndex(SSTableContext sstableContext, IndexComponents.ForRead

final MetadataSource source = MetadataSource.loadMetadata(perIndexComponents);

metadatas = SegmentMetadata.load(source, indexContext);
metadatas = SegmentMetadata.load(source, indexContext, sstableContext.sstable().getId());

for (SegmentMetadata metadata : metadatas)
{
Expand Down Expand Up @@ -230,7 +231,7 @@ public List<CloseableIterator<PrimaryKeyWithSortKey>> orderResultsBy(QueryContex
for (Segment segment : segments)
{
// Only pass the primary keys in a segment's range to the segment index.
var segmentKeys = getKeysInRange(keys, segment);
var segmentKeys = PrimaryKeyListUtil.getKeysInRange(keys, segment.metadata.minKey, segment.metadata.maxKey);
var segmentLimit = segment.proportionalAnnLimit(limit, totalRows);
results.add(segment.orderResultsBy(context, segmentKeys, orderer, segmentLimit));
}
Expand Down Expand Up @@ -287,37 +288,6 @@ public long estimateMatchingRowsCount(Expression predicate, AbstractBounds<Parti
return rowCount;
}

/** Create a sublist of the keys within (inclusive) the segment's bounds */
/** Returns the sublist of {@code keys} that falls within (inclusive) the segment's min/max bounds. */
protected List<PrimaryKey> getKeysInRange(List<PrimaryKey> keys, Segment segment)
{
    int lower = findBoundaryIndex(keys, segment, true);
    int upper = findBoundaryIndex(keys, segment, false);
    return keys.subList(lower, upper);
}

/**
 * Finds the index of the segment's min (or max) boundary key within {@code keys},
 * suitable for use as a {@code subList} bound: the least matching index when
 * {@code findMin}, otherwise one past the greatest matching index.
 */
private int findBoundaryIndex(List<PrimaryKey> keys, Segment segment, boolean findMin)
{
    // The minKey and maxKey are sometimes just partition keys (not primary keys), so binarySearch
    // may not return the index of the least/greatest match.
    var key = findMin ? segment.metadata.minKey : segment.metadata.maxKey;
    int index = Collections.binarySearch(keys, key);
    if (index < 0)
        return -index - 1;
    // Walk outward using compareTo == 0 rather than equals: binarySearch ties are defined by
    // compareTo, and a partition-key-only bound can compare equal to a full primary key while
    // equals() is false, which would stop the walk immediately and drop in-range keys.
    if (findMin)
    {
        while (index > 0 && keys.get(index - 1).compareTo(key) == 0)
            index--;
    }
    else
    {
        while (index < keys.size() - 1 && keys.get(index + 1).compareTo(key) == 0)
            index++;
        // We must include the PrimaryKey at the boundary
        index++;
    }
    return index;
}

@Override
public void close() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.cassandra.index.sai.plan.Expression;
import org.apache.cassandra.index.sai.plan.Orderer;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.index.sai.utils.PrimaryKeyListUtil;
import org.apache.cassandra.index.sai.utils.PrimaryKeyWithScore;
import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey;
import org.apache.cassandra.index.sai.utils.RangeUtil;
Expand Down Expand Up @@ -288,10 +289,10 @@ public CloseableIterator<PrimaryKeyWithSortKey> orderResultsBy(QueryContext cont
// Compute the keys that exist in the current memtable and their corresponding graph ordinals
var keysInGraph = new HashSet<PrimaryKey>();
var relevantOrdinals = new IntHashSet();
keys.stream()
.dropWhile(k -> k.compareTo(minimumKey) < 0)
.takeWhile(k -> k.compareTo(maximumKey) <= 0)
.forEach(k ->

var keysInRange = PrimaryKeyListUtil.getKeysInRange(keys, minimumKey, maximumKey);

keysInRange.forEach(k ->
{
var v = graph.vectorForKey(k);
if (v == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.index.sai.utils;

import java.util.Collections;
import java.util.List;

/**
 * Static helpers for working with sorted lists of {@link PrimaryKey}s.
 */
public class PrimaryKeyListUtil
{
    private PrimaryKeyListUtil()
    {
        // utility class; not instantiable
    }

    /** Create a sublist of the keys within (inclusive) the given min/max bounds */
    public static List<PrimaryKey> getKeysInRange(List<PrimaryKey> keys, PrimaryKey minKey, PrimaryKey maxKey)
    {
        int minIndex = findBoundaryIndex(keys, minKey, true);
        int maxIndex = findBoundaryIndex(keys, maxKey, false);
        return keys.subList(minIndex, maxIndex);
    }

    /**
     * Returns the least index matching {@code key} when {@code findMin}, otherwise one past
     * the greatest matching index, so the pair can be used directly as subList bounds.
     */
    private static int findBoundaryIndex(List<PrimaryKey> keys, PrimaryKey key, boolean findMin)
    {
        // The minKey and maxKey are sometimes just partition keys (not primary keys), so binarySearch
        // may not return the index of the least/greatest match.
        int index = Collections.binarySearch(keys, key);
        if (index < 0)
            return -index - 1;
        // Walk outward using compareTo == 0 rather than equals: binarySearch ties are defined by
        // compareTo, and a partition-key-only bound can compare equal to a full primary key while
        // equals() returns false, which would stop the walk immediately and drop in-range keys.
        if (findMin)
        {
            while (index > 0 && keys.get(index - 1).compareTo(key) == 0)
                index--;
        }
        else
        {
            while (index < keys.size() - 1 && keys.get(index + 1).compareTo(key) == 0)
                index++;
            // Because we use this for subList, we need to return the (exclusive) index after the boundary
            index++;
        }
        return index;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void canReadPerIndexMetadata() throws Throwable
IndexComponents.ForRead components = indexDescriptor.perIndexComponents(intContext);
final MetadataSource source = MetadataSource.loadMetadata(components);

List<SegmentMetadata> metadatas = SegmentMetadata.load(source, intContext);
List<SegmentMetadata> metadatas = SegmentMetadata.load(source, intContext, null);

assertEquals(1, metadatas.size());
assertEquals(100, metadatas.get(0).numRows);
Expand Down Expand Up @@ -167,7 +167,7 @@ public void canSearchBDKIndex() throws Throwable

final MetadataSource source = MetadataSource.loadMetadata(components);

List<SegmentMetadata> metadatas = SegmentMetadata.load(source, intContext);
List<SegmentMetadata> metadatas = SegmentMetadata.load(source, intContext, null);

BKDReader bkdReader = new BKDReader(intContext,
components.get(IndexComponentType.KD_TREE).createFileHandle(),
Expand All @@ -188,7 +188,7 @@ public void canSearchTermsIndex() throws Throwable

final MetadataSource source = MetadataSource.loadMetadata(components);

SegmentMetadata metadata = SegmentMetadata.load(source, textContext).get(0);
SegmentMetadata metadata = SegmentMetadata.load(source, textContext, null).get(0);

long root = metadata.getIndexRoot(IndexComponentType.TERMS_DATA);
Map<String,String> map = metadata.componentMetadatas.get(IndexComponentType.TERMS_DATA).attributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private void testFlushBetweenRowIds(long sstableRowId1, long sstableRowId2, int
MetadataSource source = MetadataSource.loadMetadata(components);

// verify segment count
List<SegmentMetadata> segmentMetadatas = SegmentMetadata.load(source, indexContext);
List<SegmentMetadata> segmentMetadatas = SegmentMetadata.load(source, indexContext, null);
assertEquals(segments, segmentMetadatas.size());

// verify segment metadata
Expand Down