From d85140c7226d531976983fcbe296cbbfc3dbf2d4 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Mon, 13 Jan 2025 17:49:22 -0600 Subject: [PATCH 1/9] CNDB-12407: lazily load token in PrimaryKeyWithSource Also, add sstable overlap check to avoid certain lookups. --- .../sai/disk/PostingListKeyRangeIterator.java | 3 +- .../index/sai/disk/PrimaryKeyWithSource.java | 70 ++++++++++++++----- .../index/sai/disk/v1/SegmentMetadata.java | 14 ++-- .../index/sai/disk/v1/V1SearchableIndex.java | 36 +--------- .../sai/disk/vector/VectorMemtableIndex.java | 9 +-- .../index/sai/utils/PrimaryKeyListUtil.java | 56 +++++++++++++++ .../sai/disk/v1/LegacyOnDiskFormatTest.java | 6 +- .../index/sai/disk/v1/SegmentFlushTest.java | 2 +- 8 files changed, 132 insertions(+), 64 deletions(-) create mode 100644 src/java/org/apache/cassandra/index/sai/utils/PrimaryKeyListUtil.java diff --git a/src/java/org/apache/cassandra/index/sai/disk/PostingListKeyRangeIterator.java b/src/java/org/apache/cassandra/index/sai/disk/PostingListKeyRangeIterator.java index d500225a6fd7..904880462bf9 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/PostingListKeyRangeIterator.java +++ b/src/java/org/apache/cassandra/index/sai/disk/PostingListKeyRangeIterator.java @@ -115,8 +115,7 @@ protected PrimaryKey computeNext() if (rowId == PostingList.END_OF_STREAM) return endOfData(); - var primaryKey = primaryKeyMap.primaryKeyFromRowId(rowId); - return new PrimaryKeyWithSource(primaryKey, primaryKeyMap.getSSTableId(), rowId); + return new PrimaryKeyWithSource(primaryKeyMap, rowId, searcherContext.minimumKey, searcherContext.maximumKey); } catch (Throwable t) { diff --git a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java index 7f9f7e89f51e..5710a49090cf 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java +++ b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java @@ -28,16 +28,38 @@ public class PrimaryKeyWithSource implements PrimaryKey { - private final PrimaryKey primaryKey; + private final PrimaryKeyMap primaryKeyMap; private final SSTableId sourceSstableId; private final long sourceRowId; + private PrimaryKey delegatePrimaryKey; + private final PrimaryKey sourceSstableMinKey; + private final PrimaryKey sourceSstableMaxKey; - public PrimaryKeyWithSource(PrimaryKey primaryKey, SSTableId sstableId, long sstableRowId) + public PrimaryKeyWithSource(PrimaryKeyMap primaryKeyMap, long sstableRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey) { - assert primaryKey != null : "Cannot construct a PrimaryKeyWithSource with a null primaryKey"; - this.primaryKey = primaryKey; - this.sourceSstableId = sstableId; + this.primaryKeyMap = primaryKeyMap; + this.sourceSstableId = primaryKeyMap.getSSTableId(); this.sourceRowId = sstableRowId; + this.sourceSstableMinKey = sourceSstableMinKey; + this.sourceSstableMaxKey = sourceSstableMaxKey; + } + + public PrimaryKeyWithSource(PrimaryKey primaryKey, SSTableId sourceSstableId, long sourceRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey) + { + this.delegatePrimaryKey = primaryKey; + this.primaryKeyMap = null; + this.sourceSstableId = sourceSstableId; + this.sourceRowId = sourceRowId; + this.sourceSstableMinKey = sourceSstableMinKey; + this.sourceSstableMaxKey = sourceSstableMaxKey; + } + + private PrimaryKey primaryKey() + { + if (delegatePrimaryKey == null) + delegatePrimaryKey = primaryKeyMap.primaryKeyFromRowId(sourceRowId); + + return delegatePrimaryKey; } public long getSourceRowId() @@ -53,43 +75,43 @@ public SSTableId getSourceSstableId() @Override public Token token() { - return primaryKey.token(); + return primaryKey().token(); } @Override public DecoratedKey partitionKey() { - return primaryKey.partitionKey(); + return primaryKey().partitionKey(); } @Override public Clustering clustering() { - return primaryKey.clustering(); + return primaryKey().clustering(); } @Override public PrimaryKey loadDeferred() { - return primaryKey.loadDeferred(); + return primaryKey().loadDeferred(); } @Override public ByteSource asComparableBytes(ByteComparable.Version version) { - return primaryKey.asComparableBytes(version); + return primaryKey().asComparableBytes(version); } @Override public ByteSource asComparableBytesMinPrefix(ByteComparable.Version version) { - return primaryKey.asComparableBytesMinPrefix(version); + return primaryKey().asComparableBytesMinPrefix(version); } @Override public ByteSource asComparableBytesMaxPrefix(ByteComparable.Version version) { - return primaryKey.asComparableBytesMaxPrefix(version); + return primaryKey().asComparableBytesMaxPrefix(version); } @Override @@ -101,7 +123,13 @@ public int compareTo(PrimaryKey o) if (sourceSstableId.equals(other.sourceSstableId)) return Long.compare(sourceRowId, other.sourceRowId); } - return primaryKey.compareTo(o); + + if (sourceSstableMinKey.compareTo(o) > 0) + return 1; + else if (sourceSstableMaxKey.compareTo(o) < 0) + return -1; + else + return primaryKey().compareTo(o); } @Override @@ -110,21 +138,31 @@ public boolean equals(Object o) if (o instanceof PrimaryKeyWithSource) { var other = (PrimaryKeyWithSource) o; + // If they are from the same source sstable, we can compare the row ids directly. if (sourceSstableId.equals(other.sourceSstableId)) return sourceRowId == other.sourceRowId; } - return primaryKey.equals(o); + + if (!(o instanceof PrimaryKey)) + return false; + + var other = (PrimaryKey) o; + // If the source sstable does not contain the PrimaryKey, the keys cannot be equal. + if (sourceSstableMinKey.compareTo(other) > 0 || sourceSstableMaxKey.compareTo(other) < 0) + return false; + + return primaryKey().equals(other); } @Override public int hashCode() { - return primaryKey.hashCode(); + return primaryKey().hashCode(); } @Override public String toString() { - return String.format("%s (source sstable: %s, %s)", primaryKey, sourceSstableId, sourceRowId); + return String.format("%s (source sstable: %s, %s)", primaryKey(), sourceSstableId, sourceRowId); } } diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java index 00361d6f27ab..141a3a3728b6 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java @@ -36,6 +36,7 @@ import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.disk.ModernResettableByteBuffersIndexOutput; import org.apache.cassandra.index.sai.disk.PostingList; +import org.apache.cassandra.index.sai.disk.PrimaryKeyWithSource; import org.apache.cassandra.index.sai.disk.format.IndexComponentType; import org.apache.cassandra.index.sai.disk.format.Version; import org.apache.cassandra.index.sai.disk.io.IndexInput; @@ -44,6 +45,7 @@ import org.apache.cassandra.index.sai.plan.Expression; import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.index.sai.utils.TypeUtil; +import org.apache.cassandra.io.sstable.SSTableId; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.bytecomparable.ByteComparable; @@ -134,7 +136,7 @@ public class SegmentMetadata implements Comparable private static final Logger logger = LoggerFactory.getLogger(SegmentMetadata.class); @SuppressWarnings("resource") - private SegmentMetadata(IndexInput input, IndexContext context, Version version) throws IOException + private SegmentMetadata(IndexInput input, IndexContext context, Version version, SSTableId sstableId) throws IOException { PrimaryKey.Factory primaryKeyFactory = context.keyFactory(); AbstractType termsType = context.getValidator(); @@ -144,8 +146,10 @@ private SegmentMetadata(IndexInput input, IndexContext context, Version version) this.numRows = input.readLong(); this.minSSTableRowId = input.readLong(); this.maxSSTableRowId = input.readLong(); - this.minKey = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input))); - this.maxKey = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input))); + PrimaryKey min = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input))); + PrimaryKey max = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input))); + this.minKey = new PrimaryKeyWithSource(min, sstableId, -1, min, max); + this.maxKey = new PrimaryKeyWithSource(max, sstableId, Long.MAX_VALUE, min, max); this.minTerm = readBytes(input); this.maxTerm = readBytes(input); TermsDistribution td = null; @@ -164,7 +168,7 @@ private SegmentMetadata(IndexInput input, IndexContext context, Version version) } @SuppressWarnings("resource") - public static List load(MetadataSource source, IndexContext context) throws IOException + public static List load(MetadataSource source, IndexContext context, SSTableId sstableId) throws IOException { IndexInput input = source.get(NAME); @@ -175,7 +179,7 @@ public static List load(MetadataSource source, IndexContext con for (int i = 0; i < segmentCount; i++) { - segmentMetadata.add(new SegmentMetadata(input, context, source.getVersion())); + segmentMetadata.add(new SegmentMetadata(input, context, source.getVersion(), sstableId)); } return segmentMetadata; diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java index b7b0ef0222f3..1eb72e25dcfa 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java @@ -41,6 +41,7 @@ import org.apache.cassandra.index.sai.plan.Expression; import org.apache.cassandra.index.sai.plan.Orderer; import org.apache.cassandra.index.sai.utils.PrimaryKey; +import org.apache.cassandra.index.sai.utils.PrimaryKeyListUtil; import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey; import org.apache.cassandra.index.sai.utils.TypeUtil; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -87,7 +88,7 @@ public V1SearchableIndex(SSTableContext sstableContext, IndexComponents.ForRead final MetadataSource source = MetadataSource.loadMetadata(perIndexComponents); - metadatas = SegmentMetadata.load(source, indexContext); + metadatas = SegmentMetadata.load(source, indexContext, sstableContext.sstable().getId()); for (SegmentMetadata metadata : metadatas) { @@ -230,7 +231,7 @@ public List> orderResultsBy(QueryContex for (Segment segment : segments) { // Only pass the primary keys in a segment's range to the segment index. - var segmentKeys = getKeysInRange(keys, segment); + var segmentKeys = PrimaryKeyListUtil.getKeysInRange(keys, segment.metadata.minKey, segment.metadata.maxKey); var segmentLimit = segment.proportionalAnnLimit(limit, totalRows); results.add(segment.orderResultsBy(context, segmentKeys, orderer, segmentLimit)); } @@ -287,37 +288,6 @@ public long estimateMatchingRowsCount(Expression predicate, AbstractBounds getKeysInRange(List keys, Segment segment) - { - int minIndex = findBoundaryIndex(keys, segment, true); - int maxIndex = findBoundaryIndex(keys, segment, false); - return keys.subList(minIndex, maxIndex); - } - - private int findBoundaryIndex(List keys, Segment segment, boolean findMin) - { - // The minKey and maxKey are sometimes just partition keys (not primary keys), so binarySearch - // may not return the index of the least/greatest match. - var key = findMin ? segment.metadata.minKey : segment.metadata.maxKey; - int index = Collections.binarySearch(keys, key); - if (index < 0) - return -index - 1; - if (findMin) - { - while (index > 0 && keys.get(index - 1).equals(key)) - index--; - } - else - { - while (index < keys.size() - 1 && keys.get(index + 1).equals(key)) - index++; - // We must include the PrimaryKey at the boundary - index++; - } - return index; - } - @Override public void close() throws IOException { diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/VectorMemtableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/vector/VectorMemtableIndex.java index 25231748149f..7cbbfa97bcb6 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/VectorMemtableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/VectorMemtableIndex.java @@ -58,6 +58,7 @@ import org.apache.cassandra.index.sai.plan.Expression; import org.apache.cassandra.index.sai.plan.Orderer; import org.apache.cassandra.index.sai.utils.PrimaryKey; +import org.apache.cassandra.index.sai.utils.PrimaryKeyListUtil; import org.apache.cassandra.index.sai.utils.PrimaryKeyWithScore; import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey; import org.apache.cassandra.index.sai.utils.RangeUtil; @@ -288,10 +289,10 @@ public CloseableIterator orderResultsBy(QueryContext cont // Compute the keys that exist in the current memtable and their corresponding graph ordinals var keysInGraph = new HashSet(); var relevantOrdinals = new IntHashSet(); - keys.stream() - .dropWhile(k -> k.compareTo(minimumKey) < 0) - .takeWhile(k -> k.compareTo(maximumKey) <= 0) - .forEach(k -> + + var keysInRange = PrimaryKeyListUtil.getKeysInRange(keys, minimumKey, maximumKey); + + keysInRange.forEach(k -> { var v = graph.vectorForKey(k); if (v == null) diff --git a/src/java/org/apache/cassandra/index/sai/utils/PrimaryKeyListUtil.java b/src/java/org/apache/cassandra/index/sai/utils/PrimaryKeyListUtil.java new file mode 100644 index 000000000000..3f304432d752 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sai/utils/PrimaryKeyListUtil.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index.sai.utils; + +import java.util.Collections; +import java.util.List; + +public class PrimaryKeyListUtil +{ + + /** Create a sublist of the keys within (inclusive) the segment's bounds */ + public static List getKeysInRange(List keys, PrimaryKey minKey, PrimaryKey maxKey) + { + int minIndex = PrimaryKeyListUtil.findBoundaryIndex(keys, minKey, true); + int maxIndex = PrimaryKeyListUtil.findBoundaryIndex(keys, maxKey, false); + return keys.subList(minIndex, maxIndex); + } + + private static int findBoundaryIndex(List keys, PrimaryKey key, boolean findMin) + { + // The minKey and maxKey are sometimes just partition keys (not primary keys), so binarySearch + // may not return the index of the least/greatest match. + int index = Collections.binarySearch(keys, key); + if (index < 0) + return -index - 1; + if (findMin) + { + while (index > 0 && keys.get(index - 1).equals(key)) + index--; + } + else + { + while (index < keys.size() - 1 && keys.get(index + 1).equals(key)) + index++; + // Because we use this for subList, we need to return the index of the next element + index++; + } + return index; + } +} diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/LegacyOnDiskFormatTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/LegacyOnDiskFormatTest.java index 322e485bb6f1..9ea0b54482fb 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/v1/LegacyOnDiskFormatTest.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/LegacyOnDiskFormatTest.java @@ -134,7 +134,7 @@ public void canReadPerIndexMetadata() throws Throwable IndexComponents.ForRead components = indexDescriptor.perIndexComponents(intContext); final MetadataSource source = MetadataSource.loadMetadata(components); - List metadatas = SegmentMetadata.load(source, intContext); + List metadatas = SegmentMetadata.load(source, intContext, null); assertEquals(1, metadatas.size()); assertEquals(100, metadatas.get(0).numRows); @@ -167,7 +167,7 @@ public void canSearchBDKIndex() throws Throwable final MetadataSource source = MetadataSource.loadMetadata(components); - List metadatas = SegmentMetadata.load(source, intContext); + List metadatas = SegmentMetadata.load(source, intContext, null); BKDReader bkdReader = new BKDReader(intContext, components.get(IndexComponentType.KD_TREE).createFileHandle(), @@ -188,7 +188,7 @@ public void canSearchTermsIndex() throws Throwable final MetadataSource source = MetadataSource.loadMetadata(components); - SegmentMetadata metadata = SegmentMetadata.load(source, textContext).get(0); + SegmentMetadata metadata = SegmentMetadata.load(source, textContext, null).get(0); long root = metadata.getIndexRoot(IndexComponentType.TERMS_DATA); Map map = metadata.componentMetadatas.get(IndexComponentType.TERMS_DATA).attributes; diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java index f6e9fbaaa95f..f22d7086e690 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java @@ -176,7 +176,7 @@ private void testFlushBetweenRowIds(long sstableRowId1, long sstableRowId2, int MetadataSource source = MetadataSource.loadMetadata(components); // verify segment count - List segmentMetadatas = SegmentMetadata.load(source, indexContext); + List segmentMetadatas = SegmentMetadata.load(source, indexContext, null); assertEquals(segments, segmentMetadatas.size()); // verify segment metadata From 2c69a27ceee30ed57d91374ccca566eb8087040a Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 14 Jan 2025 13:47:42 -0600 Subject: [PATCH 2/9] Fix PrimaryKeyWithSource for min/max row in segment --- .../apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java | 2 +- .../apache/cassandra/index/sai/disk/v1/SegmentMetadata.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java index 5710a49090cf..51873976a7f3 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java +++ b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java @@ -163,6 +163,6 @@ public int hashCode() @Override public String toString() { - return String.format("%s (source sstable: %s, %s)", primaryKey(), sourceSstableId, sourceRowId); + return String.format("%s (source sstable: %s, %s)", delegatePrimaryKey, sourceSstableId, sourceRowId); } } diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java index 141a3a3728b6..7188a48e8bf6 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java @@ -148,8 +148,8 @@ private SegmentMetadata(IndexInput input, IndexContext context, Version version, this.maxSSTableRowId = input.readLong(); PrimaryKey min = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input))); PrimaryKey max = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input))); - this.minKey = new PrimaryKeyWithSource(min, sstableId, -1, min, max); - this.maxKey = new PrimaryKeyWithSource(max, sstableId, Long.MAX_VALUE, min, max); + this.minKey = new PrimaryKeyWithSource(min, sstableId, minSSTableRowId, min, max); + this.maxKey = new PrimaryKeyWithSource(max, sstableId, maxSSTableRowId, min, max); this.minTerm = readBytes(input); this.maxTerm = readBytes(input); TermsDistribution td = null; From e5febf43e85c72027c3f19e16275b22dd2d52c5e Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 14 Jan 2025 21:43:32 -0600 Subject: [PATCH 3/9] Load fully qualified PrK for SegmentMetadata extrema The bug here was caught by testHybridSearchHoleInClusteringColumnOrdering. It failed because the PartitionKey-only primary key said it was equal to one key but not another key because of the order of operations in the PrimaryKeyListUtil logic. One solution could have been to update the way that findBoundaryIndex works, but I think a more robust solution is to just load a fully qualified min/max PrK since those are used for boundary checks anyway. --- .../index/sai/disk/v1/SegmentMetadata.java | 42 +++++++++++++++---- .../index/sai/disk/v1/V1SearchableIndex.java | 2 +- .../index/sai/cql/VectorHybridSearchTest.java | 1 + 3 files changed, 36 insertions(+), 9 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java index 7188a48e8bf6..d6492986b101 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java @@ -32,8 +32,10 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.index.sai.IndexContext; +import org.apache.cassandra.index.sai.SSTableContext; import org.apache.cassandra.index.sai.disk.ModernResettableByteBuffersIndexOutput; import org.apache.cassandra.index.sai.disk.PostingList; import org.apache.cassandra.index.sai.disk.PrimaryKeyWithSource; @@ -136,9 +138,8 @@ public class SegmentMetadata implements Comparable private static final Logger logger = LoggerFactory.getLogger(SegmentMetadata.class); @SuppressWarnings("resource") - private SegmentMetadata(IndexInput input, IndexContext context, Version version, SSTableId sstableId) throws IOException + private SegmentMetadata(IndexInput input, IndexContext context, Version version, SSTableContext sstableContext) throws IOException { - PrimaryKey.Factory primaryKeyFactory = context.keyFactory(); AbstractType termsType = context.getValidator(); this.version = version; @@ -146,10 +147,35 @@ private SegmentMetadata(IndexInput input, IndexContext context, Version version, this.numRows = input.readLong(); this.minSSTableRowId = input.readLong(); this.maxSSTableRowId = input.readLong(); - PrimaryKey min = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input))); - PrimaryKey max = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input))); - this.minKey = new PrimaryKeyWithSource(min, sstableId, minSSTableRowId, min, max); - this.maxKey = new PrimaryKeyWithSource(max, sstableId, maxSSTableRowId, min, max); + + // Read the real min and max partition from the segment metadata. + DecoratedKey minPartition = DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)); + DecoratedKey maxPartition = DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)); + + // Get the fully qualified PrimaryKey min and max objects to ensure that we skip several edge cases related + // to possibly confusing equality semantics where PrimaryKeyWithSource slightly diverges from PrimaryKey where + // PrimaryKey is just a partition key without a materializable clustering key. + PrimaryKey min, max; + if (sstableContext.sstable.metadata().comparator.size() > 0) + { + try (var pkm = sstableContext.primaryKeyMapFactory().newPerSSTablePrimaryKeyMap()) + { + min = pkm.primaryKeyFromRowId(minSSTableRowId); + max = pkm.primaryKeyFromRowId(maxSSTableRowId); + assert min.partitionKey().equals(minPartition) : String.format("Min partition key mismatch: %s != %s", min, minPartition); + assert max.partitionKey().equals(maxPartition) : String.format("Max partition key mismatch: %s != %s", max, maxPartition); + } + } + else + { + var primaryKeyFactory = context.keyFactory(); + min = primaryKeyFactory.createPartitionKeyOnly(minPartition); + max = primaryKeyFactory.createPartitionKeyOnly(maxPartition); + } + + this.minKey = new PrimaryKeyWithSource(min, sstableContext.sstable.getId(), minSSTableRowId, min, max); + this.maxKey = new PrimaryKeyWithSource(max, sstableContext.sstable.getId(), maxSSTableRowId, min, max); + this.minTerm = readBytes(input); this.maxTerm = readBytes(input); TermsDistribution td = null; @@ -168,7 +194,7 @@ private SegmentMetadata(IndexInput input, IndexContext context, Version version, } @SuppressWarnings("resource") - public static List load(MetadataSource source, IndexContext context, SSTableId sstableId) throws IOException + public static List load(MetadataSource source, IndexContext context, SSTableContext sstableContext) throws IOException { IndexInput input = source.get(NAME); @@ -179,7 +205,7 @@ public static List load(MetadataSource source, IndexContext con for (int i = 0; i < segmentCount; i++) { - segmentMetadata.add(new SegmentMetadata(input, context, source.getVersion(), sstableId)); + segmentMetadata.add(new SegmentMetadata(input, context, source.getVersion(), sstableContext)); } return segmentMetadata; diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java index 1eb72e25dcfa..d763358d60b6 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java @@ -88,7 +88,7 @@ public V1SearchableIndex(SSTableContext sstableContext, IndexComponents.ForRead final MetadataSource source = MetadataSource.loadMetadata(perIndexComponents); - metadatas = SegmentMetadata.load(source, indexContext, sstableContext.sstable().getId()); + metadatas = SegmentMetadata.load(source, indexContext, sstableContext); for (SegmentMetadata metadata : metadatas) { diff --git a/test/unit/org/apache/cassandra/index/sai/cql/VectorHybridSearchTest.java b/test/unit/org/apache/cassandra/index/sai/cql/VectorHybridSearchTest.java index b5952d8ce348..d1f7b6c766be 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/VectorHybridSearchTest.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/VectorHybridSearchTest.java @@ -105,6 +105,7 @@ public void testHybridSearchSequentialClusteringColumns() throws Throwable public void testHybridSearchHoleInClusteringColumnOrdering() throws Throwable { setMaxBruteForceRows(0); + QueryController.QUERY_OPT_LEVEL = 0; createTable(KEYSPACE, "CREATE TABLE %s (pk int, a int, val text, vec vector, PRIMARY KEY(pk, a))"); createIndex("CREATE CUSTOM INDEX ON %s(vec) USING 'StorageAttachedIndex'"); createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex'"); From 2ae0895823cfcff018457897978789f2ec10af97 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 14 Jan 2025 22:54:28 -0600 Subject: [PATCH 4/9] Remove unnecessary comparison logic Now that we have fully qualified PrK objects as boundaries, we can remove the check for the entry at the next index. --- .../index/sai/utils/PrimaryKeyListUtil.java | 26 +++++++------------ 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/utils/PrimaryKeyListUtil.java b/src/java/org/apache/cassandra/index/sai/utils/PrimaryKeyListUtil.java index 3f304432d752..d3e041c5f500 100644 --- a/src/java/org/apache/cassandra/index/sai/utils/PrimaryKeyListUtil.java +++ b/src/java/org/apache/cassandra/index/sai/utils/PrimaryKeyListUtil.java @@ -24,33 +24,25 @@ public class PrimaryKeyListUtil { - /** Create a sublist of the keys within (inclusive) the segment's bounds */ + /** Create a sublist of the keys within the provided bounds (inclusive) */ public static List getKeysInRange(List keys, PrimaryKey minKey, PrimaryKey maxKey) { - int minIndex = PrimaryKeyListUtil.findBoundaryIndex(keys, minKey, true); - int maxIndex = PrimaryKeyListUtil.findBoundaryIndex(keys, maxKey, false); + int minIndex = PrimaryKeyListUtil.findBoundaryIndex(keys, minKey, false); + int maxIndex = PrimaryKeyListUtil.findBoundaryIndex(keys, maxKey, true); return keys.subList(minIndex, maxIndex); } - private static int findBoundaryIndex(List keys, PrimaryKey key, boolean findMin) + private static int findBoundaryIndex(List keys, PrimaryKey key, boolean findMax) { - // The minKey and maxKey are sometimes just partition keys (not primary keys), so binarySearch - // may not return the index of the least/greatest match. int index = Collections.binarySearch(keys, key); + if (index < 0) return -index - 1; - if (findMin) - { - while (index > 0 && keys.get(index - 1).equals(key)) - index--; - } - else - { - while (index < keys.size() - 1 && keys.get(index + 1).equals(key)) - index++; - // Because we use this for subList, we need to return the index of the next element + + if (findMax) + // We use this value for the subList upper bound, which is exclusive, so increment by 1. index++; - } + return index; } } From 409afb0e7b8492e8465e054f27c0d5198bfefc58 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Thu, 16 Jan 2025 23:17:15 -0600 Subject: [PATCH 5/9] Ensure PrK is loaded to allow closure of PrKMap --- .../apache/cassandra/index/sai/disk/v1/SegmentMetadata.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java index d6492986b101..5f71fa592db4 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java @@ -160,8 +160,10 @@ private SegmentMetadata(IndexInput input, IndexContext context, Version version, { try (var pkm = sstableContext.primaryKeyMapFactory().newPerSSTablePrimaryKeyMap()) { - min = pkm.primaryKeyFromRowId(minSSTableRowId); - max = pkm.primaryKeyFromRowId(maxSSTableRowId); + // We need to load eagerly to allow us to close the partition key map. Otherwise, all tests will + // pass due to the side effect of calling partitionKey(), but it'll fail when you remove the -ea flag. + min = pkm.primaryKeyFromRowId(minSSTableRowId).loadDeferred(); + max = pkm.primaryKeyFromRowId(maxSSTableRowId).loadDeferred(); assert min.partitionKey().equals(minPartition) : String.format("Min partition key mismatch: %s != %s", min, minPartition); assert max.partitionKey().equals(maxPartition) : String.format("Max partition key mismatch: %s != %s", max, maxPartition); } From 312e1b3fec607dba6b577711a5eea6e8329a9f28 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 17 Jan 2025 11:59:25 -0600 Subject: [PATCH 6/9] Fix source sstable key comparison logic; skip skipTo call with min token --- .../index/sai/disk/PrimaryKeyWithSource.java | 14 ++++++++------ .../cassandra/index/sai/plan/QueryController.java | 6 ++++-- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java index 51873976a7f3..19640689d4eb 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java +++ b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java @@ -122,14 +122,16 @@ public int compareTo(PrimaryKey o) var other = (PrimaryKeyWithSource) o; if (sourceSstableId.equals(other.sourceSstableId)) return Long.compare(sourceRowId, other.sourceRowId); + // Compare to the other source sstable's min and max keys to determine if the keys are comparable. + // Note that these are already loaded into memory as part of the segment's metadata, so they comparison + // is cheaper than loading the actual keys. + if (sourceSstableMinKey.compareTo(other.sourceSstableMaxKey) > 0) + return 1; + if (sourceSstableMaxKey.compareTo(other.sourceSstableMinKey) < 0) + return -1; } - if (sourceSstableMinKey.compareTo(o) > 0) - return 1; - else if (sourceSstableMaxKey.compareTo(o) < 0) - return -1; - else - return primaryKey().compareTo(o); + return primaryKey().compareTo(o); } @Override diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java index 20191fb8d830..1f6f5c29514e 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java @@ -627,8 +627,10 @@ public CloseableIterator getTopKRows(KeyRangeIterator sou */ private List materializeKeys(KeyRangeIterator source) { - // Skip to the first key in the range - source.skipTo(primaryKeyFactory().createTokenOnly(mergeRange.left.getToken())); + // Skip to the first key in the range if it is not the minimum token + if (!mergeRange.left.isMinimum()) + source.skipTo(firstPrimaryKey); + if (!source.hasNext()) return List.of(); From 20043f224e15a8d1b928365d1a7cbd2dd1385a53 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 17 Jan 2025 13:58:29 -0600 Subject: [PATCH 7/9] Optimize sai union iterator for non-overlapping case --- .../sai/iterators/KeyRangeUnionIterator.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeUnionIterator.java b/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeUnionIterator.java index bdb27028a7fe..b5ed9bec0be2 100644 --- a/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeUnionIterator.java +++ b/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeUnionIterator.java @@ -163,8 +163,19 @@ protected KeyRangeIterator buildIterator() return rangeIterators.get(0); default: - //TODO Need to test whether an initial sort improves things - return new KeyRangeUnionIterator(statistics, rangeIterators); + rangeIterators.sort((a, b) -> a.getMinimum().compareTo(b.getMinimum())); + boolean isDisjoint = true; + for (int i = 0; i < rangeIterators.size() - 1; i++) + { + if (rangeIterators.get(i).getMaximum().compareTo(rangeIterators.get(i + 1).getMinimum()) > 0) + { + isDisjoint = false; + break; + } + } + // If the iterators are not overlapping, then we can use the concat iterator which is more efficient + return isDisjoint ? new KeyRangeConcatIterator(statistics, rangeIterators) + : new KeyRangeUnionIterator(statistics, rangeIterators); } } } From 0955f4a1ce5de2400a4b5d1b089607b1b2421770 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 17 Jan 2025 15:02:33 -0600 Subject: [PATCH 8/9] Temp fix to workaround SegmentMetadata inconsistencies The min/max partition key is not always a pointer to the segment metadata's min/max sstable row id. I thought it was. Removing that assertion. --- .../index/sai/disk/v1/SegmentMetadata.java | 36 +++++++++---------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java index 5f71fa592db4..146de5ea308e 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java @@ -148,31 +148,21 @@ private SegmentMetadata(IndexInput input, IndexContext context, Version version, this.minSSTableRowId = input.readLong(); this.maxSSTableRowId = input.readLong(); - // Read the real min and max partition from the segment metadata. - DecoratedKey minPartition = DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)); - DecoratedKey maxPartition = DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)); + // The next values are the min/max partition keys. As a tempory test, we are skipping them because they are + // not as useful as the min/max row ids, which are always correct for both flushed and compacted sstables. + skipBytes(input); + skipBytes(input); // Get the fully qualified PrimaryKey min and max objects to ensure that we skip several edge cases related // to possibly confusing equality semantics where PrimaryKeyWithSource slightly diverges from PrimaryKey where // PrimaryKey is just a partition key without a materializable clustering key. - PrimaryKey min, max; - if (sstableContext.sstable.metadata().comparator.size() > 0) + final PrimaryKey min, max; + try (var pkm = sstableContext.primaryKeyMapFactory().newPerSSTablePrimaryKeyMap()) { - try (var pkm = sstableContext.primaryKeyMapFactory().newPerSSTablePrimaryKeyMap()) - { - // We need to load eagerly to allow us to close the partition key map. Otherwise, all tests will - // pass due to the side effect of calling partitionKey(), but it'll fail when you remove the -ea flag. - min = pkm.primaryKeyFromRowId(minSSTableRowId).loadDeferred(); - max = pkm.primaryKeyFromRowId(maxSSTableRowId).loadDeferred(); - assert min.partitionKey().equals(minPartition) : String.format("Min partition key mismatch: %s != %s", min, minPartition); - assert max.partitionKey().equals(maxPartition) : String.format("Max partition key mismatch: %s != %s", max, maxPartition); - } - } - else - { - var primaryKeyFactory = context.keyFactory(); - min = primaryKeyFactory.createPartitionKeyOnly(minPartition); - max = primaryKeyFactory.createPartitionKeyOnly(maxPartition); + // We need to load eagerly to allow us to close the partition key map. Otherwise, all tests will + // pass due to the side effect of calling partitionKey(), but it'll fail when you remove the -ea flag. + min = pkm.primaryKeyFromRowId(minSSTableRowId).loadDeferred(); + max = pkm.primaryKeyFromRowId(maxSSTableRowId).loadDeferred(); } this.minKey = new PrimaryKeyWithSource(min, sstableContext.sstable.getId(), minSSTableRowId, min, max); @@ -332,6 +322,12 @@ private static ByteBuffer readBytes(IndexInput input) throws IOException return ByteBuffer.wrap(bytes); } + private static void skipBytes(IndexInput input) throws IOException + { + int len = input.readVInt(); + input.skipBytes(len); + } + static void writeBytes(ByteBuffer buf, IndexOutput out) { try From fbfc9e34b6bfd27826dc72b0bf8a828f2be574ec Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 21 Jan 2025 15:36:20 -0600 Subject: [PATCH 9/9] Improve PrimaryKeyWithSource#equals impl --- .../index/sai/disk/PrimaryKeyWithSource.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java index 19640689d4eb..cc1b4ee5e215 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java +++ b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java @@ -143,17 +143,14 @@ public boolean equals(Object o) // If they are from the same source sstable, we can compare the row ids directly. if (sourceSstableId.equals(other.sourceSstableId)) return sourceRowId == other.sourceRowId; - } - - if (!(o instanceof PrimaryKey)) - return false; - var other = (PrimaryKey) o; - // If the source sstable does not contain the PrimaryKey, the keys cannot be equal. - if (sourceSstableMinKey.compareTo(other) > 0 || sourceSstableMaxKey.compareTo(other) < 0) - return false; + // If the source sstable does not contain the PrimaryKey, the keys cannot be equal. + if (sourceSstableMinKey.compareTo(other.sourceSstableMaxKey) > 0 + || sourceSstableMaxKey.compareTo(other.sourceSstableMinKey) < 0) + return false; + } - return primaryKey().equals(other); + return primaryKey().equals(o); } @Override