diff --git a/src/java/org/apache/cassandra/index/sai/IndexContext.java b/src/java/org/apache/cassandra/index/sai/IndexContext.java index 94b8cd1afed1..44afc8abbae0 100644 --- a/src/java/org/apache/cassandra/index/sai/IndexContext.java +++ b/src/java/org/apache/cassandra/index/sai/IndexContext.java @@ -19,11 +19,9 @@ package org.apache.cassandra.index.sai; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; -import java.util.List; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -77,9 +75,7 @@ import org.apache.cassandra.index.sai.metrics.ColumnQueryMetrics; import org.apache.cassandra.index.sai.metrics.IndexMetrics; import org.apache.cassandra.index.sai.plan.Expression; -import org.apache.cassandra.index.sai.plan.Orderer; import org.apache.cassandra.index.sai.utils.PrimaryKey; -import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey; import org.apache.cassandra.index.sai.utils.TypeUtil; import org.apache.cassandra.index.sai.view.IndexViewManager; import org.apache.cassandra.index.sai.view.View; @@ -88,7 +84,6 @@ import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.Pair; @@ -442,9 +437,9 @@ public MemtableIndex getPendingMemtableIndex(LifecycleNewTracker tracker) // but they are not a problem as post-filtering would get rid of them. // The keys matched in other indexes cannot be safely subtracted // as indexes may contain false positives caused by deletes and updates. 
- private KeyRangeIterator getNonEqIterator(QueryContext context, Expression expression, AbstractBounds keyRange) + private KeyRangeIterator getNonEqIterator(QueryContext context, Expression expression, AbstractBounds keyRange, boolean isNonReducing) { - KeyRangeIterator allKeys = scanMemtable(keyRange); + KeyRangeIterator allKeys = scanMemtable(keyRange, isNonReducing); if (TypeUtil.supportsRounding(expression.validator)) { return allKeys; @@ -452,16 +447,16 @@ private KeyRangeIterator getNonEqIterator(QueryContext context, Expression expre else { Expression negExpression = expression.negated(); - KeyRangeIterator matchedKeys = searchMemtable(context, negExpression, keyRange, Integer.MAX_VALUE); + KeyRangeIterator matchedKeys = searchMemtable(context, negExpression, keyRange, isNonReducing, Integer.MAX_VALUE); return KeyRangeAntiJoinIterator.create(allKeys, matchedKeys); } } - public KeyRangeIterator searchMemtable(QueryContext context, Expression expression, AbstractBounds keyRange, int limit) + public KeyRangeIterator searchMemtable(QueryContext context, Expression expression, AbstractBounds keyRange, boolean isNonReducing, int limit) { if (expression.getOp().isNonEquality()) { - return getNonEqIterator(context, expression, keyRange); + return getNonEqIterator(context, expression, keyRange, isNonReducing); } Collection memtables = liveMemtables.values(); @@ -471,7 +466,7 @@ public KeyRangeIterator searchMemtable(QueryContext context, Expression expressi return KeyRangeIterator.empty(); } - KeyRangeUnionIterator.Builder builder = KeyRangeUnionIterator.builder(); + KeyRangeUnionIterator.Builder builder = KeyRangeUnionIterator.builder(isNonReducing); try { @@ -489,7 +484,7 @@ public KeyRangeIterator searchMemtable(QueryContext context, Expression expressi } } - private KeyRangeIterator scanMemtable(AbstractBounds keyRange) + private KeyRangeIterator scanMemtable(AbstractBounds keyRange, boolean isNonReducing) { Collection memtables = liveMemtables.keySet(); if 
(memtables.isEmpty()) @@ -497,7 +492,7 @@ private KeyRangeIterator scanMemtable(AbstractBounds keyRange return KeyRangeIterator.empty(); } - KeyRangeIterator.Builder builder = KeyRangeUnionIterator.builder(memtables.size()); + KeyRangeIterator.Builder builder = KeyRangeUnionIterator.builder(memtables.size(), isNonReducing); try { @@ -516,29 +511,6 @@ private KeyRangeIterator scanMemtable(AbstractBounds keyRange } } - // Search all memtables for all PrimaryKeys in list. - public List> orderResultsBy(QueryContext context, List source, Orderer orderer, int limit) - { - Collection memtables = liveMemtables.values(); - - if (memtables.isEmpty()) - return List.of(); - - List> result = new ArrayList<>(memtables.size()); - try - { - for (MemtableIndex index : memtables) - result.add(index.orderResultsBy(context, source, orderer, limit)); - - return result; - } - catch (Exception ex) - { - FileUtils.closeQuietly(result); - throw ex; - } - } - public long liveMemtableWriteCount() { return liveMemtables.values().stream().mapToLong(MemtableIndex::writeCount).sum(); diff --git a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java index c885f672bf2c..aaae1da90ecb 100644 --- a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java @@ -93,7 +93,7 @@ private static SearchableIndex createSearchableIndex(SSTableContext sstableConte if (CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.getBoolean()) { logger.info("Creating dummy (empty) index searcher for sstable {} as SAI index reads are disabled", sstableContext.sstable.descriptor); - return new EmptyIndex(); + return new EmptyIndex(null); } return perIndexComponents.onDiskFormat().newSearchableIndex(sstableContext, perIndexComponents); @@ -230,11 +230,11 @@ public KeyRangeIterator search(Expression expression, int limit) throws IOException { if (expression.getOp().isNonEquality()) - { return 
getNonEqIterator(expression, keyRange, context, defer); - } - - return searchableIndex.search(expression, keyRange, context, defer, limit); + else if (expression.getOp() == Expression.Op.IS_NULL) + return searchableIndex.searchNulls(keyRange, context); + else + return searchableIndex.search(expression, keyRange, context, defer, limit); } public List> orderBy(Orderer orderer, @@ -332,9 +332,9 @@ public int hashCode() return Objects.hashCode(sstableContext, indexContext); } - public List> orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit, long totalRows) throws IOException + public List> orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit, long totalRows, boolean canSkipOutOfWindowPKs) throws IOException { - return searchableIndex.orderResultsBy(context, keys, orderer, limit, totalRows); + return searchableIndex.orderResultsBy(context, keys, orderer, limit, totalRows, canSkipOutOfWindowPKs); } public String toString() diff --git a/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java b/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java index 501d2c9833ed..97c6e2f1aa84 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java @@ -27,6 +27,7 @@ import org.apache.cassandra.db.virtual.SimpleDataSet; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.index.sai.QueryContext; +import org.apache.cassandra.index.sai.SSTableContext; import org.apache.cassandra.index.sai.disk.v1.Segment; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.plan.Expression; @@ -38,6 +39,13 @@ public class EmptyIndex implements SearchableIndex { + private final SSTableContext sstableContext; + + public EmptyIndex(SSTableContext sstableContext) + { + this.sstableContext = sstableContext; + } + @Override public long indexFileCacheSize() { @@ -96,6 +104,15 @@ public 
KeyRangeIterator search(Expression expression, return KeyRangeIterator.empty(); } + @Override + public KeyRangeIterator searchNulls(AbstractBounds keyRange, QueryContext context) throws IOException + { + // If an index is empty, then all the rows were null. + // TODO test the pathological case where we have a large sstable with an empty column. + // In that case, we might have a performance regression here. + return PrimaryKeyMapIterator.create(sstableContext, keyRange); + } + @Override public List> orderBy(Orderer orderer, Expression slice, @@ -136,7 +153,7 @@ public void close() throws IOException } @Override - public List> orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit, long totalRows) throws IOException + public List> orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit, long totalRows, boolean canSkipOutOfWindowPKs) throws IOException { return List.of(); } diff --git a/src/java/org/apache/cassandra/index/sai/disk/PostingListKeyRangeIterator.java b/src/java/org/apache/cassandra/index/sai/disk/PostingListKeyRangeIterator.java index d500225a6fd7..8cbc323f1a8c 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/PostingListKeyRangeIterator.java +++ b/src/java/org/apache/cassandra/index/sai/disk/PostingListKeyRangeIterator.java @@ -116,7 +116,7 @@ protected PrimaryKey computeNext() return endOfData(); var primaryKey = primaryKeyMap.primaryKeyFromRowId(rowId); - return new PrimaryKeyWithSource(primaryKey, primaryKeyMap.getSSTableId(), rowId); + return new PrimaryKeyWithSource(primaryKey, primaryKeyMap.getSSTableId(), rowId, primaryKeyMap.getMinTimestamp(), primaryKeyMap.getMaxTimestamp()); } catch (Throwable t) { @@ -163,7 +163,7 @@ private long getNextRowId() throws IOException { long targetSstableRowId; if (skipToToken instanceof PrimaryKeyWithSource - && ((PrimaryKeyWithSource) skipToToken).getSourceSstableId().equals(primaryKeyMap.getSSTableId())) + && ((PrimaryKeyWithSource) 
skipToToken).matchesSource(primaryKeyMap.getSSTableId())) { targetSstableRowId = ((PrimaryKeyWithSource) skipToToken).getSourceRowId(); } diff --git a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java index 2b56c8dc3d6a..a0a1f7394804 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java +++ b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java @@ -76,6 +76,18 @@ default void close() throws IOException */ SSTableId getSSTableId(); + /** + * Returns the minimum timestamp for the sstable associated with this {@link PrimaryKeyMap} + * @return the minimum timestamp + */ + long getMinTimestamp(); + + /** + * Returns the maximum timestamp for the sstable associated with this {@link PrimaryKeyMap} + * @return the maximum timestamp + */ + long getMaxTimestamp(); + /** * Returns a {@link PrimaryKey} for a row Id * diff --git a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMapIterator.java b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMapIterator.java index b8f7e87d58d3..2cc3ff5eba72 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMapIterator.java +++ b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMapIterator.java @@ -54,15 +54,19 @@ private enum KeyFilter private final PrimaryKeyMap keys; private final KeyFilter filter; + private final long minTimestamp; + private final long maxTimestamp; private long currentRowId; - private PrimaryKeyMapIterator(PrimaryKeyMap keys, PrimaryKey min, PrimaryKey max, long startRowId, KeyFilter filter) + private PrimaryKeyMapIterator(PrimaryKeyMap keys, PrimaryKey min, PrimaryKey max, long startRowId, KeyFilter filter, long minTimestamp, long maxTimestamp) { super(min, max, keys.count()); this.keys = keys; this.filter = filter; this.currentRowId = startRowId; + this.minTimestamp = minTimestamp; + this.maxTimestamp = maxTimestamp; } public static KeyRangeIterator 
create(SSTableContext ctx, AbstractBounds keyRange) throws IOException @@ -76,16 +80,12 @@ public static KeyRangeIterator create(SSTableContext ctx, AbstractBounds= 0 && currentRowId < keys.count()) { - PrimaryKey key = keys.primaryKeyFromRowId(currentRowId++); + long rowId = currentRowId++; + PrimaryKey key = keys.primaryKeyFromRowId(rowId); if (filter == KeyFilter.KEYS_WITH_CLUSTERING && key.hasEmptyClustering()) continue; - return key; + return new PrimaryKeyWithSource(key, keys.getSSTableId(), rowId, minTimestamp, maxTimestamp); } return endOfData(); } diff --git a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java index 7f9f7e89f51e..7375e6f833ec 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java +++ b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java @@ -20,6 +20,7 @@ import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.memtable.Memtable; import org.apache.cassandra.dht.Token; import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.io.sstable.SSTableId; @@ -29,25 +30,47 @@ public class PrimaryKeyWithSource implements PrimaryKey { private final PrimaryKey primaryKey; - private final SSTableId sourceSstableId; + private final SSTableId source; private final long sourceRowId; + private final long ssTableMinTimestamp; + private final long ssTableMaxTimestamp; - public PrimaryKeyWithSource(PrimaryKey primaryKey, SSTableId sstableId, long sstableRowId) + public PrimaryKeyWithSource(PrimaryKey primaryKey, Memtable memtable) { assert primaryKey != null : "Cannot construct a PrimaryKeyWithSource with a null primaryKey"; this.primaryKey = primaryKey; - this.sourceSstableId = sstableId; + // We don't need to store the source for memtables because their lookups are generally cheap. 
+ // We use the memtable only to obtain the relevant min timestamp. + this.source = null; + this.sourceRowId = -1; // memtables don't have row ids + this.ssTableMinTimestamp = memtable.getMinTimestamp(); + this.ssTableMaxTimestamp = Long.MAX_VALUE; // memtables don't have max timestamps + } + + public PrimaryKeyWithSource(PrimaryKey primaryKey, SSTableId sstableId, long sstableRowId, long sstableMinTimestamp, long sstableMaxTimestamp) + { + assert primaryKey != null : "Cannot construct a PrimaryKeyWithSource with a null primaryKey"; + this.primaryKey = primaryKey; + this.source = sstableId; this.sourceRowId = sstableRowId; + this.ssTableMinTimestamp = sstableMinTimestamp; + this.ssTableMaxTimestamp = sstableMaxTimestamp; } public long getSourceRowId() { + assert sourceRowId >= 0 : "Source row id is not set"; return sourceRowId; } - public SSTableId getSourceSstableId() + public boolean matchesSource(SSTableId sstableId) + { + return sstableId.equals(source); + } + + public boolean isInTimestampWindow(long minTimestamp, long maxTimestamp) { - return sourceSstableId; + return ssTableMinTimestamp <= maxTimestamp && ssTableMaxTimestamp >= minTimestamp; } @Override @@ -95,10 +118,10 @@ public ByteSource asComparableBytesMaxPrefix(ByteComparable.Version version) @Override public int compareTo(PrimaryKey o) { - if (o instanceof PrimaryKeyWithSource) + if (source != null && o instanceof PrimaryKeyWithSource) { var other = (PrimaryKeyWithSource) o; - if (sourceSstableId.equals(other.sourceSstableId)) + if (source.equals(other.source)) return Long.compare(sourceRowId, other.sourceRowId); } return primaryKey.compareTo(o); } @@ -107,10 +130,10 @@ public int compareTo(PrimaryKey o) @Override public boolean equals(Object o) { - if (o instanceof PrimaryKeyWithSource) + if (source != null && o instanceof PrimaryKeyWithSource) { var other = (PrimaryKeyWithSource) o; - if (sourceSstableId.equals(other.sourceSstableId)) + if (source.equals(other.source)) return 
sourceRowId == other.sourceRowId; } return primaryKey.equals(o); @@ -125,6 +148,6 @@ public int hashCode() @Override public String toString() { - return String.format("%s (source sstable: %s, %s)", primaryKey, sourceSstableId, sourceRowId); + return String.format("%s (source sstable: %s, %s)", primaryKey, source, sourceRowId); } } diff --git a/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java index 7aea633376ee..d6845bcc8f75 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java @@ -67,7 +67,10 @@ public interface SearchableIndex extends Closeable public KeyRangeIterator search(Expression expression, AbstractBounds keyRange, QueryContext context, - boolean defer, int limit) throws IOException; + boolean defer, + int limit) throws IOException; + + public KeyRangeIterator searchNulls(AbstractBounds keyRange, QueryContext context) throws IOException; public List> orderBy(Orderer orderer, Expression slice, @@ -80,7 +83,8 @@ public List> orderResultsBy(QueryContex List keys, Orderer orderer, int limit, - long totalRows) throws IOException; + long totalRows, + boolean canSkipOutOfWindowPKs) throws IOException; List getSegments(); diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponentType.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponentType.java index 165f484eae66..7137cfa8fa4c 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponentType.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponentType.java @@ -111,7 +111,12 @@ public enum IndexComponentType * * V1 V2 */ - GROUP_COMPLETION_MARKER("GroupComplete"); + GROUP_COMPLETION_MARKER("GroupComplete"), + + /** + * Stores the list of row ids for null values + */ + NULL_POSTING_LIST("NullPostingList"),; public final String representation; diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/format/IndexFeatureSet.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexFeatureSet.java index 46df62e39728..a7f847b5d7aa 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexFeatureSet.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexFeatureSet.java @@ -47,6 +47,11 @@ public interface IndexFeatureSet */ boolean hasTermsHistogram(); + /** + * @return true if the index has an index of null valued primary keys + */ + boolean hasNullIndex(); + /** * The {@code Accumulator} is used to accumulate the {@link IndexFeatureSet} responses from * multiple sources. This will include all the SSTables included in a query and all the indexes @@ -66,6 +71,7 @@ class Accumulator boolean isRowAware = true; boolean hasVectorIndexChecksum = true; boolean hasTermsHistogram = true; + boolean hasNullIndex = true; boolean complete = false; public Accumulator() @@ -87,6 +93,8 @@ public void accumulate(IndexFeatureSet indexFeatureSet) hasVectorIndexChecksum = false; if (!indexFeatureSet.hasTermsHistogram()) hasTermsHistogram = false; + if (!indexFeatureSet.hasNullIndex()) + hasNullIndex = false; } /** @@ -117,6 +125,12 @@ public boolean hasTermsHistogram() { return hasTermsHistogram; } + + @Override + public boolean hasNullIndex() + { + return hasNullIndex; + } }; } } diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/Version.java b/src/java/org/apache/cassandra/index/sai/disk/format/Version.java index 6411072486b8..a06ee2ff0e88 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/Version.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/Version.java @@ -34,6 +34,7 @@ import org.apache.cassandra.index.sai.disk.v4.V4OnDiskFormat; import org.apache.cassandra.index.sai.disk.v5.V5OnDiskFormat; import org.apache.cassandra.index.sai.disk.v6.V6OnDiskFormat; +import org.apache.cassandra.index.sai.disk.v7.V7OnDiskFormat; import 
org.apache.cassandra.index.sai.utils.TypeUtil; import org.apache.cassandra.utils.bytecomparable.ByteComparable; @@ -58,17 +59,18 @@ public class Version public static final Version DC = new Version("dc", V5OnDiskFormat.instance, (c, i, g) -> stargazerFileNameFormat(c, i, g, "dc")); // histograms in index metadata public static final Version EB = new Version("eb", V6OnDiskFormat.instance, (c, i, g) -> stargazerFileNameFormat(c, i, g, "eb")); + public static final Version FA = new Version("fa", V7OnDiskFormat.instance, (c, i, g) -> stargazerFileNameFormat(c, i, g, "fa")); // These are in reverse-chronological order so that the latest version is first. Version matching tests // are more likely to match the latest version so we want to test that one first. - public static final List ALL = Lists.newArrayList(EB, DC, DB, CA, BA, AA); + public static final List ALL = Lists.newArrayList(FA, EB, DC, DB, CA, BA, AA); public static final Version EARLIEST = AA; public static final Version VECTOR_EARLIEST = BA; // The latest version can be configured to be an earlier version to support partial upgrades that don't // write newer versions of the on-disk formats. This is volatile rather than final so that tests may // use reflection to change it and safely publish across threads. 
- private static volatile Version LATEST = parse(System.getProperty("cassandra.sai.latest.version", DC.version)); + private static volatile Version LATEST = parse(System.getProperty("cassandra.sai.latest.version", FA.version)); private static final Pattern GENERATION_PATTERN = Pattern.compile("\\d+"); diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/IndexSearcher.java b/src/java/org/apache/cassandra/index/sai/disk/v1/IndexSearcher.java index 52988a685cd5..646703033d20 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/IndexSearcher.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/IndexSearcher.java @@ -37,10 +37,15 @@ import org.apache.cassandra.index.sai.disk.PostingList; import org.apache.cassandra.index.sai.disk.PostingListKeyRangeIterator; import org.apache.cassandra.index.sai.disk.PrimaryKeyMap; +import org.apache.cassandra.index.sai.disk.format.IndexComponentType; +import org.apache.cassandra.index.sai.disk.io.IndexInput; +import org.apache.cassandra.index.sai.disk.v1.postings.PostingsReader; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.iterators.RowIdToPrimaryKeyWithSortKeyIterator; +import org.apache.cassandra.index.sai.metrics.QueryEventListener; import org.apache.cassandra.index.sai.plan.Expression; import org.apache.cassandra.index.sai.plan.Orderer; +import org.apache.cassandra.index.sai.utils.IndexFileUtils; import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.index.sai.utils.PrimaryKeyWithByteComparable; import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey; @@ -101,6 +106,27 @@ protected IndexSearcher(PrimaryKeyMap.Factory primaryKeyMapFactory, */ public abstract KeyRangeIterator search(Expression expression, AbstractBounds keyRange, QueryContext queryContext, boolean defer, int limit) throws IOException; + + public KeyRangeIterator searchNulls(QueryContext queryContext) throws IOException + { + var nullPl = 
metadata.componentMetadatas.get(IndexComponentType.NULL_POSTING_LIST); + if (nullPl.root < 0) + return KeyRangeIterator.empty(); + + var postingsFile = indexFiles.getFile(IndexComponentType.NULL_POSTING_LIST); + IndexInput postingsSummaryInput = IndexFileUtils.instance.openInput(postingsFile); + var blocksSummary = new PostingsReader.BlocksSummary(postingsSummaryInput, + nullPl.root, + PostingsReader.InputCloser.NOOP); + + IndexInput postingsInput = IndexFileUtils.instance.openInput(postingsFile); + var postingList = new PostingsReader(postingsInput, + blocksSummary, + QueryEventListener.PostingListEventListener.NO_OP, + PostingsReader.InputCloser.NOOP); + return toPrimaryKeyIterator(postingList, queryContext); + } + /** * Order the on-disk index synchronously and produce an iterator in score order * @@ -115,7 +141,7 @@ protected IndexSearcher(PrimaryKeyMap.Factory primaryKeyMapFactory, @Override - public CloseableIterator orderResultsBy(SSTableReader reader, QueryContext context, List keys, Orderer orderer, int limit) throws IOException + public CloseableIterator orderResultsBy(SSTableReader reader, QueryContext context, List keys, Orderer orderer, int limit, boolean canSkipOutOfWindowPKs) throws IOException { return SortingIterator.createCloseable( orderer.getComparator(), @@ -152,7 +178,7 @@ private ByteComparable encode(ByteBuffer input) protected KeyRangeIterator toPrimaryKeyIterator(PostingList postingList, QueryContext queryContext) throws IOException { - if (postingList == null || postingList.size() == 0) + if (postingList == null || postingList.isEmpty()) return KeyRangeIterator.empty(); IndexSearcherContext searcherContext = new IndexSearcherContext(metadata.minKey, diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java index 102699ca0288..ec36dcba0069 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java +++ 
b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java @@ -34,14 +34,17 @@ import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.disk.MemtableTermsIterator; import org.apache.cassandra.index.sai.disk.PerIndexWriter; +import org.apache.cassandra.index.sai.disk.format.IndexComponentType; import org.apache.cassandra.index.sai.disk.format.IndexComponents; -import org.apache.cassandra.index.sai.disk.format.Version; +import org.apache.cassandra.index.sai.disk.io.IndexOutput; +import org.apache.cassandra.index.sai.disk.v1.postings.PostingsWriter; import org.apache.cassandra.index.sai.disk.vector.VectorMemtableIndex; import org.apache.cassandra.index.sai.disk.v1.kdtree.ImmutableOneDimPointValues; import org.apache.cassandra.index.sai.disk.v1.kdtree.NumericIndexWriter; import org.apache.cassandra.index.sai.disk.v1.trie.InvertedIndexWriter; import org.apache.cassandra.index.sai.memory.MemtableIndex; import org.apache.cassandra.index.sai.memory.RowMapping; +import org.apache.cassandra.index.sai.postings.IntArrayPostingList; import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.index.sai.utils.TypeUtil; import org.apache.cassandra.utils.ByteBufferUtil; @@ -180,10 +183,20 @@ private long flush(DecoratedKey minKey, DecoratedKey maxKey, AbstractType ter return 0; } + // Write the row ids that had null values. This is done after the vector index is written because we implicitly + // rely on the fact that all rows are null valued if the index is empty. + IntArrayList nullRowIds = indexNullValuedPrimaryKeys(indexMetas); + + // We include the row ids of the null valued rows in the min/max row id range because those are necessary + // for proper ORDER BY queries. Practically speaking, this means that the whole segment's range is covered. + long minSSTableRowId = Math.min(terms.getMinSSTableRowId(), nullRowIds.isEmpty() ? 
Long.MAX_VALUE : nullRowIds.get(0)); + long maxSSTableRowId = Math.max(terms.getMaxSSTableRowId(), nullRowIds.isEmpty() ? Long.MIN_VALUE : nullRowIds.get(nullRowIds.size() - 1)); + metadataBuilder.setKeyRange(pkFactory.createPartitionKeyOnly(minKey), pkFactory.createPartitionKeyOnly(maxKey)); - metadataBuilder.setRowIdRange(terms.getMinSSTableRowId(), terms.getMaxSSTableRowId()); + metadataBuilder.setRowIdRange(minSSTableRowId, maxSSTableRowId); metadataBuilder.setTermRange(terms.getMinTerm(), terms.getMaxTerm()); metadataBuilder.setComponentsMetadata(indexMetas); + metadataBuilder.setNullValuedRows(nullRowIds.size()); SegmentMetadata metadata = metadataBuilder.build(); try (MetadataWriter writer = new MetadataWriter(perIndexComponents)) @@ -207,10 +220,15 @@ private void flushVectorIndex(DecoratedKey minKey, DecoratedKey maxKey, long sta SegmentMetadata.ComponentMetadataMap metadataMap = vectorIndex.writeData(perIndexComponents); + // Write the row ids that had null values. This is done after the vector index is written because we implicitly + // rely on the fact that all rows are null valued if the index is empty. + IntArrayList nullValuedRows = indexNullValuedPrimaryKeys(metadataMap); + SegmentMetadata metadata = new SegmentMetadata(0, rowMapping.size(), // TODO this isn't the right size metric. 0, rowMapping.maxSegmentRowId, + nullValuedRows.size(), pkFactory.createPartitionKeyOnly(minKey), pkFactory.createPartitionKeyOnly(maxKey), ByteBufferUtil.bytes(0), // VSTODO by pass min max terms for vectors @@ -226,6 +244,27 @@ private void flushVectorIndex(DecoratedKey minKey, DecoratedKey maxKey, long sta completeIndexFlush(rowMapping.size(), startTime, stopwatch); } + private IntArrayList indexNullValuedPrimaryKeys(SegmentMetadata.ComponentMetadataMap metadataMap) throws IOException + { + // Write the row ids that had null values. 
+ SegmentMetadata.ComponentMetadataMap componentMetadataMap = new SegmentMetadata.ComponentMetadataMap(); + IntArrayList nullRowIds = rowMapping.convertToPostings(memtableIndex.nullValuedPrimaryKeys()); + try (IndexOutput output = perIndexComponents.addOrGet(IndexComponentType.NULL_POSTING_LIST).openOutput(); + PostingsWriter postingsWriter = new PostingsWriter(output)) + { + // Because we have a single posting list, we let -1 indicate an empty list. + long root = -1; + if (!nullRowIds.isEmpty()) + root = postingsWriter.write(new IntArrayPostingList(nullRowIds.toArray())); + postingsWriter.complete(); + long offset = postingsWriter.getStartOffset(); + long length = postingsWriter.getFilePointer() - offset; + componentMetadataMap.put(IndexComponentType.NULL_POSTING_LIST, root, offset, length); + metadataMap.addAll(componentMetadataMap); + return nullRowIds; + } + } + private void completeIndexFlush(long cellCount, long startTime, Stopwatch stopwatch) throws IOException { perIndexComponents.markComplete(); diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/PartitionAwarePrimaryKeyMap.java b/src/java/org/apache/cassandra/index/sai/disk/v1/PartitionAwarePrimaryKeyMap.java index 1e50f90fe195..1c1384da8993 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/PartitionAwarePrimaryKeyMap.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/PartitionAwarePrimaryKeyMap.java @@ -65,9 +65,8 @@ public static class PartitionAwarePrimaryKeyMapFactory implements Factory private final LongArray.Factory offsetReaderFactory; private final MetadataSource metadata; private final KeyFetcher keyFetcher; - private final IPartitioner partitioner; private final PrimaryKey.Factory primaryKeyFactory; - private final SSTableId sstableId; + private final SSTableReader sstableReader; private final long count; private FileHandle token = null; @@ -92,10 +91,9 @@ public PartitionAwarePrimaryKeyMapFactory(IndexComponents.ForRead perSSTableComp this.tokenReaderFactory = new 
BlockPackedReader(token, tokensMeta); this.offsetReaderFactory = new MonotonicBlockPackedReader(offset, offsetsMeta); - this.partitioner = sstable.metadata().partitioner; this.keyFetcher = new KeyFetcher(sstable); this.primaryKeyFactory = primaryKeyFactory; - this.sstableId = sstable.getId(); + this.sstableReader = sstable; } catch (Throwable t) { @@ -109,7 +107,7 @@ public PrimaryKeyMap newPerSSTablePrimaryKeyMap() final LongArray rowIdToToken = new LongArray.DeferredLongArray(() -> tokenReaderFactory.open()); final LongArray rowIdToOffset = new LongArray.DeferredLongArray(() -> offsetReaderFactory.open()); - return new PartitionAwarePrimaryKeyMap(rowIdToToken, rowIdToOffset, partitioner, keyFetcher, primaryKeyFactory, sstableId); + return new PartitionAwarePrimaryKeyMap(rowIdToToken, rowIdToOffset, keyFetcher, primaryKeyFactory, sstableReader); } @Override @@ -131,29 +129,40 @@ public void close() throws IOException private final KeyFetcher keyFetcher; private final RandomAccessReader reader; private final PrimaryKey.Factory primaryKeyFactory; - private final SSTableId sstableId; + private final SSTableReader ssTableReader; private final ByteBuffer tokenBuffer = ByteBuffer.allocate(Long.BYTES); private PartitionAwarePrimaryKeyMap(LongArray rowIdToToken, LongArray rowIdToOffset, - IPartitioner partitioner, KeyFetcher keyFetcher, PrimaryKey.Factory primaryKeyFactory, - SSTableId sstableId) + SSTableReader ssTableReader) { this.rowIdToToken = rowIdToToken; this.rowIdToOffset = rowIdToOffset; - this.partitioner = partitioner; + this.partitioner = ssTableReader.metadata().partitioner; this.keyFetcher = keyFetcher; this.reader = keyFetcher.createReader(); this.primaryKeyFactory = primaryKeyFactory; - this.sstableId = sstableId; + this.ssTableReader = ssTableReader; } @Override public SSTableId getSSTableId() { - return sstableId; + return ssTableReader.getId(); + } + + @Override + public long getMinTimestamp() + { + return ssTableReader.getMinTimestamp(); + } + + 
@Override + public long getMaxTimestamp() + { + return ssTableReader.getMaxTimestamp(); } @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java index 20beac1b6f2a..b50fb4221963 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java @@ -218,9 +218,6 @@ private boolean maybeAbort() private void addTerm(ByteBuffer term, PrimaryKey key, long sstableRowId, AbstractType type) throws IOException { - if (!indexContext.validateMaxTermSize(key.partitionKey(), term)) - return; - if (currentBuilder == null) { currentBuilder = newSegmentBuilder(sstableRowId); @@ -231,10 +228,12 @@ else if (shouldFlush(sstableRowId)) currentBuilder = newSegmentBuilder(sstableRowId); } - if (term.remaining() == 0 && !indexContext.getValidator().allowsEmpty()) - return; + long allocated; + if ((term.remaining() == 0 && !indexContext.getValidator().allowsEmpty()) || !indexContext.validateMaxTermSize(key.partitionKey(), term)) + allocated = currentBuilder.addNullValuedSSTableRowId(key, sstableRowId); + else + allocated = currentBuilder.addAll(term, type, key, sstableRowId); - long allocated = currentBuilder.addAll(term, type, key, sstableRowId); limiter.increment(allocated); } diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/Segment.java b/src/java/org/apache/cassandra/index/sai/disk/v1/Segment.java index 87fe0a998e5f..ec9d4f3e711f 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/Segment.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/Segment.java @@ -147,6 +147,11 @@ public KeyRangeIterator search(Expression expression, AbstractBounds orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit) throws IOException + public CloseableIterator orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit, boolean 
canSkipOutOfWindowPKs) throws IOException { - return index.orderResultsBy(sstableContext.sstable, context, keys, orderer, limit); + return index.orderResultsBy(sstableContext.sstable, context, keys, orderer, limit, canSkipOutOfWindowPKs); } @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java index 504df79c924a..520414b48075 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.carrotsearch.hppc.IntArrayList; import io.github.jbellis.jvector.pq.ProductQuantization; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; @@ -48,12 +49,15 @@ import org.apache.cassandra.index.sai.disk.format.IndexComponentType; import org.apache.cassandra.index.sai.disk.format.Version; import org.apache.cassandra.index.sai.disk.format.IndexComponents; +import org.apache.cassandra.index.sai.disk.io.IndexOutput; import org.apache.cassandra.index.sai.disk.v1.kdtree.BKDTreeRamBuffer; import org.apache.cassandra.index.sai.disk.v1.kdtree.MutableOneDimPointValues; import org.apache.cassandra.index.sai.disk.v1.kdtree.NumericIndexWriter; +import org.apache.cassandra.index.sai.disk.v1.postings.PostingsWriter; import org.apache.cassandra.index.sai.disk.v1.trie.InvertedIndexWriter; import org.apache.cassandra.index.sai.disk.vector.CassandraOnHeapGraph; import org.apache.cassandra.index.sai.disk.vector.CompactionGraph; +import org.apache.cassandra.index.sai.postings.IntArrayPostingList; import org.apache.cassandra.index.sai.utils.NamedMemoryLimiter; import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.index.sai.utils.TypeUtil; @@ -114,6 +118,7 @@ public abstract class SegmentBuilder private long 
maxSSTableRowId = -1; private long segmentRowIdOffset = 0; int rowCount = 0; + IntArrayList nullValuedRows = new IntArrayList(); int maxSegmentRowId = -1; // in token order private PrimaryKey minKey; @@ -443,11 +448,39 @@ public SegmentMetadata flush() throws IOException metadataBuilder.setKeyRange(minKey, maxKey); metadataBuilder.setRowIdRange(minSSTableRowId, maxSSTableRowId); metadataBuilder.setTermRange(minTerm, maxTerm); + metadataBuilder.setNullValuedRows(nullValuedRows.size()); flushInternal(metadataBuilder); + + indexNullValuedPrimaryKeys(metadataBuilder); + return metadataBuilder.build(); } + private void indexNullValuedPrimaryKeys(SegmentMetadataBuilder metadataBuilder) throws IOException + { + // Write the row ids that had null values. + SegmentMetadata.ComponentMetadataMap componentMetadataMap = new SegmentMetadata.ComponentMetadataMap(); + try (IndexOutput output = components.addOrGet(IndexComponentType.NULL_POSTING_LIST).openOutput(); + PostingsWriter postingsWriter = new PostingsWriter(output)) + { + // Because we have a single posting list, we let -1 indicate an empty list. + long root = -1; + if (!nullValuedRows.isEmpty()) + root = postingsWriter.write(new IntArrayPostingList(nullValuedRows.toArray())); + postingsWriter.complete(); + long offset = postingsWriter.getStartOffset(); + long length = postingsWriter.getFilePointer() - offset; + componentMetadataMap.put(IndexComponentType.NULL_POSTING_LIST, root, offset, length); + metadataBuilder.addComponentsMetadata(componentMetadataMap); + } + } + + public long addNullValuedSSTableRowId(PrimaryKey key, long sstableRowId) + { + return add(null, key, sstableRowId); + } + public long addAll(ByteBuffer term, AbstractType type, PrimaryKey key, long sstableRowId) { long totalSize = 0; @@ -475,12 +508,6 @@ private long add(ByteBuffer term, PrimaryKey key, long sstableRowId) minKey = minKey == null ? key : minKey; maxKey = key; - // Note that the min and max terms are not encoded. 
- minTerm = TypeUtil.min(term, minTerm, termComparator, Version.latest()); - maxTerm = TypeUtil.max(term, maxTerm, termComparator, Version.latest()); - - rowCount++; - // segmentRowIdOffset should encode sstableRowId into Integer int segmentRowId = Math.toIntExact(sstableRowId - segmentRowIdOffset); @@ -489,9 +516,25 @@ private long add(ByteBuffer term, PrimaryKey key, long sstableRowId) maxSegmentRowId = Math.max(maxSegmentRowId, segmentRowId); - long bytesAllocated = supportsAsyncAdd() - ? addInternalAsync(term, segmentRowId) - : addInternal(term, segmentRowId); + long bytesAllocated; + if (term == null) + { + nullValuedRows.add(segmentRowId); + bytesAllocated = Integer.BYTES; + } + else + { + // Note that the min and max terms are not encoded. + minTerm = TypeUtil.min(term, minTerm, termComparator, Version.latest()); + maxTerm = TypeUtil.max(term, maxTerm, termComparator, Version.latest()); + + rowCount++; + + bytesAllocated = supportsAsyncAdd() + ? addInternalAsync(term, segmentRowId) + : addInternal(term, segmentRowId); + } + totalBytesAllocated += bytesAllocated; return bytesAllocated; diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java index 00361d6f27ab..c90779f981cc 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java @@ -76,6 +76,11 @@ public class SegmentMetadata implements Comparable */ public final long numRows; + /** + * number of rows with null values in current segment. -1 indicates that this information is not available. 
+ */ + public final int nullValuedRows; + /** * Ordered by their token position in current segment */ @@ -105,6 +110,7 @@ public class SegmentMetadata implements Comparable long numRows, long minSSTableRowId, long maxSSTableRowId, + int nullValuedRows, PrimaryKey minKey, PrimaryKey maxKey, ByteBuffer minTerm, @@ -123,6 +129,7 @@ public class SegmentMetadata implements Comparable this.minSSTableRowId = minSSTableRowId; this.maxSSTableRowId = maxSSTableRowId; this.numRows = numRows; + this.nullValuedRows = nullValuedRows; this.minKey = minKey; this.maxKey = maxKey; this.minTerm = minTerm; @@ -160,6 +167,7 @@ private SegmentMetadata(IndexInput input, IndexContext context, Version version) } } this.termsDistribution = td; + this.nullValuedRows = version.onOrAfter(Version.FA) ? input.readInt() : -1; this.componentMetadatas = new SegmentMetadata.ComponentMetadataMap(input); } @@ -218,6 +226,12 @@ public static void write(MetadataWriter writer, List segments) } } + if (writer.version().onOrAfter(Version.FA)) + { + assert metadata.nullValuedRows >= 0; + output.writeInt(metadata.nullValuedRows); + } + metadata.componentMetadatas.write(output); } } @@ -394,6 +408,11 @@ public Map> asMap() return metaAttributes; } + public void addAll(ComponentMetadataMap other) + { + metas.putAll(other.metas); + } + @Override public String toString() { diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadataBuilder.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadataBuilder.java index bfc14c2f3125..9d85649dc60f 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadataBuilder.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadataBuilder.java @@ -70,6 +70,7 @@ public class SegmentMetadataBuilder private ByteBuffer maxTerm; private long numRows; + private int nullValuedRows; private final TermsDistribution.Builder termsDistributionBuilder; @@ -99,6 +100,11 @@ public void setRowIdRange(long minRowId, long maxRowId) 
this.maxRowId = maxRowId; } + public void setNullValuedRows(int nullValuedRows) + { + this.nullValuedRows = nullValuedRows; + } + /** * Sets the term range of the data indexed by this segment. * We need this method because we cannot automatically record min and max term. We need exact @@ -117,6 +123,14 @@ public void setComponentsMetadata(SegmentMetadata.ComponentMetadataMap metadataM this.metadataMap = metadataMap; } + public void addComponentsMetadata(SegmentMetadata.ComponentMetadataMap metadataMap) + { + if (this.metadataMap == null) + this.metadataMap = metadataMap; + else + this.metadataMap.addAll(metadataMap); + } + /** * Should be called whenever a point is added to the index. * Points must be added in the index term order. @@ -148,6 +162,7 @@ void add(ByteComparable term, int rowCount) numRows, minRowId, maxRowId, + nullValuedRows, minKey, maxKey, minTerm, diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java index 5dbd732f4676..53923f52e632 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java @@ -141,6 +141,12 @@ public boolean hasTermsHistogram() { return false; } + + @Override + public boolean hasNullIndex() + { + return false; + } }; protected V1OnDiskFormat() @@ -168,7 +174,7 @@ public PrimaryKeyMap.Factory newPrimaryKeyMapFactory(IndexComponents.ForRead per public SearchableIndex newSearchableIndex(SSTableContext sstableContext, IndexComponents.ForRead perIndexComponents) { return perIndexComponents.isEmpty() - ? new EmptyIndex() + ? 
new EmptyIndex(sstableContext) : new V1SearchableIndex(sstableContext, perIndexComponents); } diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java index b7b0ef0222f3..1df36e00fcf0 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java @@ -193,6 +193,30 @@ public KeyRangeIterator search(Expression expression, } } + @Override + public KeyRangeIterator searchNulls(AbstractBounds keyRange, QueryContext context) throws IOException + { + KeyRangeConcatIterator.Builder rangeConcatIteratorBuilder = KeyRangeConcatIterator.builder(segments.size()); + + try + { + for (Segment segment : segments) + { + if (segment.intersects(keyRange)) + { + rangeConcatIteratorBuilder.add(segment.searchNulls(context)); + } + } + + return rangeConcatIteratorBuilder.build(); + } + catch (Throwable t) + { + FileUtils.closeQuietly(rangeConcatIteratorBuilder.ranges()); + throw t; + } + } + @Override public List> orderBy(Orderer orderer, Expression slice, AbstractBounds keyRange, @@ -222,7 +246,7 @@ public List> orderBy(Orderer orderer, E } @Override - public List> orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit, long totalRows) throws IOException + public List> orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit, long totalRows, boolean canSkipOutOfWindowPKs) throws IOException { var results = new ArrayList>(segments.size()); try @@ -232,7 +256,7 @@ public List> orderResultsBy(QueryContex // Only pass the primary keys in a segment's range to the segment index. 
var segmentKeys = getKeysInRange(keys, segment); var segmentLimit = segment.proportionalAnnLimit(limit, totalRows); - results.add(segment.orderResultsBy(context, segmentKeys, orderer, segmentLimit)); + results.add(segment.orderResultsBy(context, segmentKeys, orderer, segmentLimit, canSkipOutOfWindowPKs)); } return results; diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/postings/MergePostingList.java b/src/java/org/apache/cassandra/index/sai/disk/v1/postings/MergePostingList.java index a4aaa916368b..540995c733fe 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/postings/MergePostingList.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/postings/MergePostingList.java @@ -59,6 +59,11 @@ public static PostingList merge(List postings) return new MergePostingList(postings); } + public static PostingList merge(PostingList... postings) + { + return merge(List.of(postings)); + } + @Override public int nextPosting() throws IOException { diff --git a/src/java/org/apache/cassandra/index/sai/disk/v2/RowAwarePrimaryKeyMap.java b/src/java/org/apache/cassandra/index/sai/disk/v2/RowAwarePrimaryKeyMap.java index 3bff4e85084e..340ae69e69fa 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v2/RowAwarePrimaryKeyMap.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v2/RowAwarePrimaryKeyMap.java @@ -71,7 +71,6 @@ public class RowAwarePrimaryKeyMap implements PrimaryKeyMap @ThreadSafe public static class RowAwarePrimaryKeyMapFactory implements Factory { - private final IndexComponents.ForRead perSSTableComponents; private final LongArray.Factory tokenReaderFactory; private final SortedTermsReader sortedTermsReader; private final long count; @@ -79,16 +78,13 @@ public static class RowAwarePrimaryKeyMapFactory implements Factory private FileHandle termsDataBlockOffsets = null; private FileHandle termsData = null; private FileHandle termsTrie = null; - private final IPartitioner partitioner; - private final ClusteringComparator 
clusteringComparator; private final PrimaryKey.Factory primaryKeyFactory; - private final SSTableId sstableId; + private final SSTableReader ssTableReader; public RowAwarePrimaryKeyMapFactory(IndexComponents.ForRead perSSTableComponents, PrimaryKey.Factory primaryKeyFactory, SSTableReader sstable) { try { - this.perSSTableComponents = perSSTableComponents; MetadataSource metadataSource = MetadataSource.loadMetadata(perSSTableComponents); NumericValuesMeta tokensMeta = new NumericValuesMeta(metadataSource.get(perSSTableComponents.get(IndexComponentType.TOKEN_VALUES))); count = tokensMeta.valueCount; @@ -101,10 +97,8 @@ public RowAwarePrimaryKeyMapFactory(IndexComponents.ForRead perSSTableComponents this.termsData = perSSTableComponents.get(IndexComponentType.PRIMARY_KEY_BLOCKS).createFileHandle(); this.termsTrie = perSSTableComponents.get(IndexComponentType.PRIMARY_KEY_TRIE).createFileHandle(); this.sortedTermsReader = new SortedTermsReader(termsData, termsDataBlockOffsets, termsTrie, sortedTermsMeta, blockOffsetsMeta); - this.partitioner = sstable.metadata().partitioner; this.primaryKeyFactory = primaryKeyFactory; - this.clusteringComparator = sstable.metadata().comparator; - this.sstableId = sstable.getId(); + this.ssTableReader = sstable; } catch (Throwable t) { @@ -115,16 +109,13 @@ public RowAwarePrimaryKeyMapFactory(IndexComponents.ForRead perSSTableComponents @Override public PrimaryKeyMap newPerSSTablePrimaryKeyMap() { - final LongArray rowIdToToken = new LongArray.DeferredLongArray(() -> tokenReaderFactory.open()); + final LongArray rowIdToToken = new LongArray.DeferredLongArray(tokenReaderFactory::open); try { return new RowAwarePrimaryKeyMap(rowIdToToken, - sortedTermsReader, sortedTermsReader.openCursor(), - partitioner, primaryKeyFactory, - clusteringComparator, - sstableId); + ssTableReader); } catch (IOException e) { @@ -146,35 +137,42 @@ public void close() throws IOException } private final LongArray rowIdToToken; - private final SortedTermsReader 
sortedTermsReader; private final SortedTermsReader.Cursor cursor; private final IPartitioner partitioner; private final PrimaryKey.Factory primaryKeyFactory; private final ClusteringComparator clusteringComparator; - private final SSTableId sstableId; + private final SSTableReader ssTableReader; private final ByteBuffer tokenBuffer = ByteBuffer.allocate(Long.BYTES); private RowAwarePrimaryKeyMap(LongArray rowIdToToken, - SortedTermsReader sortedTermsReader, SortedTermsReader.Cursor cursor, - IPartitioner partitioner, PrimaryKey.Factory primaryKeyFactory, - ClusteringComparator clusteringComparator, - SSTableId sstableId) + SSTableReader ssTableReader) { this.rowIdToToken = rowIdToToken; - this.sortedTermsReader = sortedTermsReader; this.cursor = cursor; - this.partitioner = partitioner; this.primaryKeyFactory = primaryKeyFactory; - this.clusteringComparator = clusteringComparator; - this.sstableId = sstableId; + this.partitioner = ssTableReader.metadata().partitioner; + this.clusteringComparator = ssTableReader.metadata().comparator; + this.ssTableReader = ssTableReader; } @Override public SSTableId getSSTableId() { - return sstableId; + return ssTableReader.getId(); + } + + @Override + public long getMinTimestamp() + { + return ssTableReader.getMinTimestamp(); + } + + @Override + public long getMaxTimestamp() + { + return ssTableReader.getMaxTimestamp(); } public long count() diff --git a/src/java/org/apache/cassandra/index/sai/disk/v2/V2OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v2/V2OnDiskFormat.java index 8ad67fcfd9e1..832f8ef007ed 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v2/V2OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v2/V2OnDiskFormat.java @@ -85,6 +85,12 @@ public boolean hasTermsHistogram() { return false; } + + @Override + public boolean hasNullIndex() + { + return false; + } }; protected V2OnDiskFormat() diff --git a/src/java/org/apache/cassandra/index/sai/disk/v2/V2VectorIndexSearcher.java 
b/src/java/org/apache/cassandra/index/sai/disk/v2/V2VectorIndexSearcher.java index 62748e3332c4..f4e93ff00434 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v2/V2VectorIndexSearcher.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v2/V2VectorIndexSearcher.java @@ -464,17 +464,18 @@ private SparseBits bitSetForSearch() @Override public CloseableIterator orderResultsBy(SSTableReader reader, - QueryContext context, - List keys, - Orderer orderer, - int limit) throws IOException + QueryContext context, + List keys, + Orderer orderer, + int limit, + boolean canSkipOutOfWindowPKs) throws IOException { if (keys.isEmpty()) return CloseableIterator.emptyIterator(); int rerankK = indexContext.getIndexWriterConfig().getSourceModel().rerankKFor(limit, graph.getCompression()); // Convert PKs to segment row ids and map to ordinals, skipping any that don't exist in this segment - var segmentOrdinalPairs = flatmapPrimaryKeysToBitsAndRows(keys); + var segmentOrdinalPairs = flatmapPrimaryKeysToBitsAndRows(keys, canSkipOutOfWindowPKs); var numRows = segmentOrdinalPairs.size(); final CostEstimate cost = estimateCost(rerankK, numRows); Tracing.logAndTrace(logger, "{} relevant rows out of {} in range in index of {} nodes; estimate for LIMIT {} is {}", @@ -502,10 +503,11 @@ public CloseableIterator orderResultsBy(SSTableReader rea * Build a mapping of segment row id to ordinal for the given primary keys, skipping any that don't exist in this * segment. 
* @param keysInRange the primary keys to map + * @param canSkipOutOfWindowPKs whether to skip primary keys that are out of the timestamp window * @return a mapping of segment row id to ordinal * @throws IOException */ - private IntIntPairArray flatmapPrimaryKeysToBitsAndRows(List keysInRange) throws IOException + private IntIntPairArray flatmapPrimaryKeysToBitsAndRows(List keysInRange, boolean canSkipOutOfWindowPKs) throws IOException { var segmentOrdinalPairs = new IntIntPairArray(keysInRange.size()); int lastSegmentRowId = -1; @@ -516,17 +518,37 @@ private IntIntPairArray flatmapPrimaryKeysToBitsAndRows(List keysInR // (if most of the keys belong to this sstable, bsearch will actually be slower) var comparisonsSavedByBsearch = new QuickSlidingWindowReservoir(10); boolean preferSeqScanToBsearch = false; + long minTimestamp = primaryKeyMap.getMinTimestamp(); + long maxTimestamp = primaryKeyMap.getMaxTimestamp(); for (int i = 0; i < keysInRange.size();) { // turn the pk back into a row id, with a fast path for the case where the pk is from this sstable var primaryKey = keysInRange.get(i); + var isPrimaryKeyWithSource = primaryKey instanceof PrimaryKeyWithSource; long sstableRowId; - if (primaryKey instanceof PrimaryKeyWithSource - && ((PrimaryKeyWithSource) primaryKey).getSourceSstableId().equals(primaryKeyMap.getSSTableId())) - sstableRowId = ((PrimaryKeyWithSource) primaryKey).getSourceRowId(); - else + if (!isPrimaryKeyWithSource) + { sstableRowId = primaryKeyMap.exactRowIdOrInvertedCeiling(primaryKey); + } + else + { + var pkms = (PrimaryKeyWithSource) primaryKey; + if (pkms.matchesSource(primaryKeyMap.getSSTableId())) + { + sstableRowId = pkms.getSourceRowId(); + } + else if (!canSkipOutOfWindowPKs || pkms.isInTimestampWindow(minTimestamp, maxTimestamp)) + { + sstableRowId = primaryKeyMap.exactRowIdOrInvertedCeiling(primaryKey); + } + else + { + // The key is not in the timestamp window, so we can skip it. 
+ i++; + continue; + } + } if (sstableRowId < 0) { @@ -595,9 +617,11 @@ private IntIntPairArray flatmapPrimaryKeysToBitsAndRows(List keysInR // This requirement is required by the ordinals view. There are cases where we have broken this // requirement, and in order to make future debugging easier, we check here and throw an exception // with additional detail. - if (segmentRowId <= lastSegmentRowId) + if (segmentRowId < lastSegmentRowId) throw new IllegalStateException("Row ids must ascend monotonically. Got " + segmentRowId + " after " + lastSegmentRowId + " for " + primaryKey + " on sstable " + primaryKeyMap.getSSTableId()); + if (segmentRowId == lastSegmentRowId) + continue; lastSegmentRowId = segmentRowId; int ordinal = ordinalsView.getOrdinalForRowId(segmentRowId); if (ordinal >= 0) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v3/V3OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v3/V3OnDiskFormat.java index 1f70cd2031f1..99b697ed4541 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v3/V3OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v3/V3OnDiskFormat.java @@ -81,6 +81,12 @@ public boolean hasTermsHistogram() { return false; } + + @Override + public boolean hasNullIndex() + { + return false; + } }; @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v6/V6OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v6/V6OnDiskFormat.java index 98a4eb356a5c..941954f1e3bf 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v6/V6OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v6/V6OnDiskFormat.java @@ -44,6 +44,12 @@ public boolean hasTermsHistogram() { return true; } + + @Override + public boolean hasNullIndex() + { + return false; + } }; @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v7/V7OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v7/V7OnDiskFormat.java new file mode 100644 index 000000000000..ec413724fd70 --- 
/dev/null +++ b/src/java/org/apache/cassandra/index/sai/disk/v7/V7OnDiskFormat.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index.sai.disk.v7; + +import java.util.EnumSet; +import java.util.Set; + +import org.apache.cassandra.index.sai.IndexContext; +import org.apache.cassandra.index.sai.disk.format.IndexComponentType; +import org.apache.cassandra.index.sai.disk.format.IndexFeatureSet; +import org.apache.cassandra.index.sai.disk.v6.V6OnDiskFormat; + +public class V7OnDiskFormat extends V6OnDiskFormat +{ + public static final V7OnDiskFormat instance = new V7OnDiskFormat(); + + private static final IndexFeatureSet v7IndexFeatureSet = new IndexFeatureSet() + { + @Override + public boolean isRowAware() + { + return true; + } + + @Override + public boolean hasVectorIndexChecksum() + { + return false; + } + + @Override + public boolean hasTermsHistogram() + { + return true; + } + + @Override + public boolean hasNullIndex() + { + return true; + } + }; + + + private static final Set LITERAL_COMPONENTS = EnumSet.of(IndexComponentType.COLUMN_COMPLETION_MARKER, + IndexComponentType.META, + IndexComponentType.TERMS_DATA, + IndexComponentType.POSTING_LISTS, + 
IndexComponentType.NULL_POSTING_LIST); + + private static final Set NUMERIC_COMPONENTS = EnumSet.of(IndexComponentType.COLUMN_COMPLETION_MARKER, + IndexComponentType.META, + IndexComponentType.KD_TREE, + IndexComponentType.KD_TREE_POSTING_LISTS, + IndexComponentType.NULL_POSTING_LIST); + + private static final Set VECTOR_COMPONENTS = EnumSet.of(IndexComponentType.COLUMN_COMPLETION_MARKER, + IndexComponentType.META, + IndexComponentType.PQ, + IndexComponentType.TERMS_DATA, + IndexComponentType.POSTING_LISTS, + IndexComponentType.NULL_POSTING_LIST); + + @Override + public Set perIndexComponentTypes(IndexContext context) + { + return context.isLiteral() ? LITERAL_COMPONENTS : context.isVector() ? VECTOR_COMPONENTS : NUMERIC_COMPONENTS; + } + + @Override + public IndexFeatureSet indexFeatureSet() + { + return v7IndexFeatureSet; + } +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/VectorMemtableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/vector/VectorMemtableIndex.java index 25231748149f..06541134fa08 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/VectorMemtableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/VectorMemtableIndex.java @@ -51,15 +51,19 @@ import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.QueryContext; +import org.apache.cassandra.index.sai.disk.PrimaryKeyWithSource; import org.apache.cassandra.index.sai.disk.format.IndexComponents; +import org.apache.cassandra.index.sai.disk.format.Version; import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.memory.MemtableIndex; +import org.apache.cassandra.index.sai.memory.TrieMemoryIndex; import org.apache.cassandra.index.sai.plan.Expression; import org.apache.cassandra.index.sai.plan.Orderer; import 
org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.index.sai.utils.PrimaryKeyWithScore; import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey; +import org.apache.cassandra.index.sai.utils.PrimaryKeys; import org.apache.cassandra.index.sai.utils.RangeUtil; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.tracing.Tracing; @@ -91,12 +95,14 @@ public class VectorMemtableIndex implements MemtableIndex private PrimaryKey maximumKey; private final NavigableSet primaryKeys = new ConcurrentSkipListSet<>(); + private final PrimaryKeys EMPTY_PRIMARY_KEYS; private final Memtable mt; public VectorMemtableIndex(IndexContext indexContext, Memtable mt) { this.indexContext = indexContext; this.graph = new CassandraOnHeapGraph<>(indexContext, true, mt); + this.EMPTY_PRIMARY_KEYS = Version.latest().onDiskFormat().indexFeatureSet().hasNullIndex() ? new PrimaryKeys() : null; this.mt = mt; } @@ -110,7 +116,14 @@ public Memtable getMemtable() public void index(DecoratedKey key, Clustering clustering, ByteBuffer value, Memtable memtable, OpOrder.Group opGroup) { if (value == null || value.remaining() == 0) + { + if (EMPTY_PRIMARY_KEYS != null) + { + long bytes = EMPTY_PRIMARY_KEYS.add(indexContext.keyFactory().create(key, clustering)); + memtable.markExtraOnHeapUsed(bytes, opGroup); + } return; + } var primaryKey = indexContext.keyFactory().create(key, clustering); long allocatedBytes = index(primaryKey, value); @@ -132,6 +145,7 @@ private long index(PrimaryKey primaryKey, ByteBuffer value) @Override public void update(DecoratedKey key, Clustering clustering, ByteBuffer oldValue, ByteBuffer newValue, Memtable memtable, OpOrder.Group opGroup) { + //todo handle EMPTY_PRIMARY_KEYS int oldRemaining = oldValue == null ? 0 : oldValue.remaining(); int newRemaining = newValue == null ? 
0 : newValue.remaining(); if (oldRemaining == 0 && newRemaining == 0) @@ -184,9 +198,15 @@ else if (primaryKey.compareTo(maximumKey) > 0) maximumKey = primaryKey; } + // todo test bounded ann for the latest change too @Override public KeyRangeIterator search(QueryContext context, Expression expr, AbstractBounds keyRange, int limit) { + if (expr.getOp() == Expression.Op.IS_NULL) + return EMPTY_PRIMARY_KEYS.isEmpty() + ? KeyRangeIterator.empty() + : new TrieMemoryIndex.SortedSetKeyRangeIterator(EMPTY_PRIMARY_KEYS.keys(), mt); + if (expr.getOp() != Expression.Op.BOUNDED_ANN) throw new IllegalArgumentException(indexContext.logMessage("Only BOUNDED_ANN is supported, received: " + expr)); var qv = vts.createFloatVector(expr.lower.value.vector); @@ -278,7 +298,7 @@ private CloseableIterator searchInternal(QueryContext con @Override - public CloseableIterator orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit) + public CloseableIterator orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit, boolean canSkipOutOfWindowPKs) { if (minimumKey == null) // This case implies maximumKey is empty too. @@ -293,6 +313,13 @@ public CloseableIterator orderResultsBy(QueryContext cont .takeWhile(k -> k.compareTo(maximumKey) <= 0) .forEach(k -> { + // We use max value for the upper bound of the timestamp window because memtables are constantly written to. + // TODO do we have multiple memtables? Is it valid to check the source here? The graph lookup should + // be pretty cheap, so leaving it as is for now. + if (canSkipOutOfWindowPKs + && k instanceof PrimaryKeyWithSource + && !((PrimaryKeyWithSource) k).isInTimestampWindow(mt.getMinTimestamp(), Long.MAX_VALUE)) + return; var v = graph.vectorForKey(k); if (v == null) return; @@ -429,6 +456,12 @@ public Iterator>> iterator(DecoratedKe throw new UnsupportedOperationException(); } + @Override + public Iterator nullValuedPrimaryKeys() + { + return EMPTY_PRIMARY_KEYS != null ? 
EMPTY_PRIMARY_KEYS.iterator() : Collections.emptyIterator(); + } + /** returns true if the index is non-empty and should be flushed */ public boolean preFlush(ToIntFunction ordinalMapper) { @@ -470,7 +503,7 @@ public long estimatedOffHeapMemoryUsed() @Override public boolean isEmpty() { - return graph.isEmpty(); + return graph.isEmpty() && (EMPTY_PRIMARY_KEYS == null || EMPTY_PRIMARY_KEYS.isEmpty()); } @Nullable diff --git a/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIterator.java b/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIterator.java index 800ca7bccdb4..7c1330b8d4d8 100644 --- a/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIterator.java +++ b/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIterator.java @@ -113,7 +113,7 @@ public static class Builder extends KeyRangeIterator.Builder private final List rangeIterators; public Builder(int size) { - super(IteratorType.CONCAT); + super(IteratorType.CONCAT, true); this.rangeIterators = new ArrayList<>(size); } diff --git a/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeIntersectionIterator.java b/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeIntersectionIterator.java index f4a464aa8ff3..87dded1a7fcf 100644 --- a/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeIntersectionIterator.java +++ b/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeIntersectionIterator.java @@ -19,11 +19,13 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Queue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,16 +58,23 @@ public static boolean shouldDefer(int numberOfExpressions) public final List ranges; private final int[] rangeStats; + private final boolean isNonReducing; + private final Queue queuedPrimaryKeys; - private 
KeyRangeIntersectionIterator(Builder.Statistics statistics, List ranges) + private KeyRangeIntersectionIterator(Builder.Statistics statistics, List ranges, boolean isNonReducing) { super(statistics); this.ranges = ranges; this.rangeStats = new int[ranges.size()]; + this.isNonReducing = isNonReducing; + this.queuedPrimaryKeys = isNonReducing ? new ArrayDeque<>() : null; } protected PrimaryKey computeNext() { + if (isNonReducing && !queuedPrimaryKeys.isEmpty()) + return queuedPrimaryKeys.poll(); + // The highest primary key seen on any range iterator so far. // It can become null when we reach the end of the iterator. PrimaryKey highestKey = ranges.get(0).hasNext() ? ranges.get(0).next() : null; @@ -90,6 +99,8 @@ protected PrimaryKey computeNext() { // We jumped over the highest key seen so far, so make it the new highest key. highestKey = nextKey; + if (isNonReducing) + queuedPrimaryKeys.clear(); // Remember this iterator to avoid advancing it again, because it is already at the highest key alreadyAdvanced = index; // This iterator jumped over, so the other iterators are lagging behind now, @@ -98,6 +109,8 @@ protected PrimaryKey computeNext() // the other iterators except this one to match the new highest key. continue outer; } + if (isNonReducing) + queuedPrimaryKeys.add(nextKey); assert comparisonResult == 0 : String.format("skipTo skipped to an item smaller than the target; " + "iterator: %s, target key: %s, returned key: %s", range, highestKey, nextKey); @@ -122,6 +135,16 @@ protected PrimaryKey computeNext() rangeStats[idxOfSmallest] = a; } + if (isNonReducing) + { + // In the non-reducing case, we need to fetch all the keys that satisfy the intersection. 
+ for (KeyRangeIterator source : ranges) + { + while (source.hasNext() && source.peek().compareTo(highestKey) == 0) + queuedPrimaryKeys.add(source.next()); + } + } + return highestKey; } return endOfData(); @@ -154,7 +177,7 @@ protected void performSkipTo(PrimaryKey nextToken) private PrimaryKey nextOrNull(KeyRangeIterator iterator, PrimaryKey minKey) { iterator.skipTo(minKey); - return iterator.hasNext() ? iterator.next() : null; + return iterator.hasNext() ? (isNonReducing ? iterator.peek() : iterator.next()) : null; } public void close() throws IOException @@ -162,32 +185,36 @@ public void close() throws IOException ranges.forEach(FileUtils::closeQuietly); } - public static Builder builder(List ranges) + public static Builder builder(int size, boolean isNonReducing) { - var builder = new Builder(ranges.size()); - for (var range : ranges) - builder.add(range); - return builder; + return new Builder(size, isNonReducing); } public static Builder builder(int size) { - return new Builder(size); + return new Builder(size, false); + } + + + public static Builder builder(boolean isNonReducing) + { + return builder(4, isNonReducing); } public static Builder builder() { - return builder(4); + return builder(4, false); } public static class Builder extends KeyRangeIterator.Builder { protected List rangeIterators; - private Builder(int size) + private Builder(int size, boolean isNonReducing) { - super(IteratorType.INTERSECTION); + super(IteratorType.INTERSECTION, isNonReducing); rangeIterators = new ArrayList<>(size); + } public KeyRangeIterator.Builder add(KeyRangeIterator range) @@ -238,7 +265,7 @@ protected KeyRangeIterator buildIterator() if (rangeCount() == 1) return rangeIterators.get(0); - return new KeyRangeIntersectionIterator(statistics, rangeIterators); + return new KeyRangeIntersectionIterator(statistics, rangeIterators, isNonReducing); } } } diff --git a/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeIterator.java 
b/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeIterator.java index 51b83875c669..3dcde70971f4 100644 --- a/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeIterator.java +++ b/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeIterator.java @@ -126,10 +126,15 @@ public enum IteratorType @VisibleForTesting protected final Statistics statistics; + protected final boolean isNonReducing; - public Builder(IteratorType type) + public Builder(IteratorType type, boolean isNonReducing) { + if (type == IteratorType.CONCAT && !isNonReducing) + throw new IllegalArgumentException("Concat is always non-reducing"); + statistics = new Statistics(type); + this.isNonReducing = isNonReducing; } public PrimaryKey getMinimum() diff --git a/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeNonReducingLeftJoinIterator.java b/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeNonReducingLeftJoinIterator.java new file mode 100644 index 000000000000..25476fca65ac --- /dev/null +++ b/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeNonReducingLeftJoinIterator.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.index.sai.iterators; + +import java.io.IOException; + +import org.apache.cassandra.index.sai.utils.PrimaryKey; +import org.apache.cassandra.io.util.FileUtils; + +/** + * A {@link KeyRangeIterator} that performs a non-reducing left join between two iterators so that all of the + * primary keys from the left iterator are returned, and only the primary keys from the right iterator that match + * the primary keys from the left iterator are returned. + */ +public class KeyRangeNonReducingLeftJoinIterator extends KeyRangeIterator +{ + private final KeyRangeIterator left; + private final KeyRangeIterator right; + + public KeyRangeNonReducingLeftJoinIterator(KeyRangeIterator left, KeyRangeIterator right) + { + super(left); + this.left = left; + this.right = right; + } + + @Override + protected void performSkipTo(PrimaryKey nextKey) + { + left.skipTo(nextKey); + } + + @Override + protected PrimaryKey computeNext() + { + if (!left.hasNext()) + return endOfData(); + + // TODO is this skipTo performant enough? What about on large sstables with an empty column? + right.skipTo(left.peek()); + + return (right.hasNext() && right.peek().equals(left.peek())) + ? 
right.next() + : left.next(); + } + + @Override + public void close() throws IOException + { + FileUtils.closeQuietly(left); + FileUtils.closeQuietly(right); + } +} diff --git a/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeTermIterator.java b/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeTermIterator.java index 83e695eef2f4..de4a0eb0c053 100644 --- a/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeTermIterator.java +++ b/src/java/org/apache/cassandra/index/sai/iterators/KeyRangeTermIterator.java @@ -65,17 +65,24 @@ private KeyRangeTermIterator(KeyRangeIterator union, Set reference @SuppressWarnings("resource") - public static KeyRangeTermIterator build(final Expression e, Set perSSTableIndexes, AbstractBounds keyRange, QueryContext queryContext, boolean defer, int limit) + public static KeyRangeTermIterator build(final Expression e, Set perSSTableIndexes, AbstractBounds keyRange, QueryContext queryContext, boolean defer, int limit, boolean isNonReducing) { - KeyRangeIterator rangeIterator = buildRangeIterator(e, perSSTableIndexes, keyRange, queryContext, defer, limit); - return new KeyRangeTermIterator(rangeIterator, perSSTableIndexes, queryContext); + KeyRangeIterator rangeIterator = buildRangeIterator(e, perSSTableIndexes, keyRange, queryContext, defer, limit, isNonReducing); + if (!isNonReducing) + return new KeyRangeTermIterator(rangeIterator, perSSTableIndexes, queryContext); + + // In this code path, we get the nulls iterator and merge it with the range iterator + var isNullExpression = new Expression(e.context, Expression.Op.IS_NULL); + var nullsIterator = buildRangeIterator(isNullExpression, perSSTableIndexes, keyRange, queryContext, defer, limit, true); + var merged = new KeyRangeNonReducingLeftJoinIterator(rangeIterator, nullsIterator); + return new KeyRangeTermIterator(merged, perSSTableIndexes, queryContext); } - private static KeyRangeIterator buildRangeIterator(final Expression e, Set perSSTableIndexes, 
AbstractBounds keyRange, QueryContext queryContext, boolean defer, int limit) + private static KeyRangeIterator buildRangeIterator(final Expression e, Set perSSTableIndexes, AbstractBounds keyRange, QueryContext queryContext, boolean defer, int limit, boolean isNonReducing) { final List tokens = new ArrayList<>(1 + perSSTableIndexes.size()); - KeyRangeIterator memtableIterator = e.context.searchMemtable(queryContext, e, keyRange, limit); + KeyRangeIterator memtableIterator = e.context.searchMemtable(queryContext, e, keyRange, isNonReducing, limit); if (memtableIterator != null) tokens.add(memtableIterator); @@ -106,7 +113,7 @@ private static KeyRangeIterator buildRangeIterator(final Expression e, Set ranges; + private final boolean isNonReducing; - private KeyRangeUnionIterator(Builder.Statistics statistics, List ranges) + private KeyRangeUnionIterator(Builder.Statistics statistics, List ranges, boolean isNonReducing) { super(statistics); - this.ranges = new ArrayList<>(ranges); + this.ranges = ranges; + this.isNonReducing = isNonReducing; } public PrimaryKey computeNext() @@ -58,10 +60,10 @@ public PrimaryKey computeNext() else { int cmp = candidate.peek().compareTo(range.peek()); - if (cmp == 0) - range.next(); - else if (cmp > 0) + if (cmp > 0) candidate = range; + else if (!isNonReducing && cmp == 0) + range.next(); } } if (candidate == null) @@ -84,28 +86,34 @@ public void close() throws IOException ranges.forEach(FileUtils::closeQuietly); } - public static Builder builder(int size) + public static Builder builder(int size, boolean isNonReducing) { - return new Builder(size); + return new Builder(size, isNonReducing); } public static Builder builder() { - return builder(10); + return builder(false); + } + + public static Builder builder(boolean isNonReducing) + { + return builder(10, isNonReducing); } - public static KeyRangeIterator build(Iterable tokens) + + public static KeyRangeIterator build(Iterable tokens, boolean isNonReducing) { - return 
KeyRangeUnionIterator.builder(Iterables.size(tokens)).add(tokens).build(); + return KeyRangeUnionIterator.builder(Iterables.size(tokens), isNonReducing).add(tokens).build(); } public static class Builder extends KeyRangeIterator.Builder { protected List rangeIterators; - public Builder(int size) + public Builder(int size, boolean isNonReducing) { - super(IteratorType.UNION); + super(IteratorType.UNION, isNonReducing); this.rangeIterators = new ArrayList<>(size); } @@ -164,7 +172,7 @@ protected KeyRangeIterator buildIterator() default: //TODO Need to test whether an initial sort improves things - return new KeyRangeUnionIterator(statistics, rangeIterators); + return new KeyRangeUnionIterator(statistics, rangeIterators, isNonReducing); } } } diff --git a/src/java/org/apache/cassandra/index/sai/memory/MemtableIndex.java b/src/java/org/apache/cassandra/index/sai/memory/MemtableIndex.java index 22bdc284bf3f..75a139d36111 100644 --- a/src/java/org/apache/cassandra/index/sai/memory/MemtableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/memory/MemtableIndex.java @@ -81,6 +81,8 @@ default void update(DecoratedKey key, Clustering clustering, ByteBuffer oldValue Iterator>> iterator(DecoratedKey min, DecoratedKey max); + Iterator nullValuedPrimaryKeys(); + static MemtableIndex createIndex(IndexContext indexContext, Memtable mt) { return indexContext.isVector() ? new VectorMemtableIndex(indexContext, mt) : new TrieMemtableIndex(indexContext, mt); diff --git a/src/java/org/apache/cassandra/index/sai/memory/RowMapping.java b/src/java/org/apache/cassandra/index/sai/memory/RowMapping.java index a435a6e47169..03959eccd6d5 100644 --- a/src/java/org/apache/cassandra/index/sai/memory/RowMapping.java +++ b/src/java/org/apache/cassandra/index/sai/memory/RowMapping.java @@ -134,6 +134,26 @@ protected Pair computeNext() }; } + // todo minkey/maxkey not used here, should it be? 
+ public IntArrayList convertToPostings(Iterator primaryKeyIterator) + { + if (!primaryKeyIterator.hasNext()) + return new IntArrayList(0); + + IntArrayList postings = new IntArrayList(); + do + { + PrimaryKey primaryKey = primaryKeyIterator.next(); + Integer segmentRowId = rowMapping.get(primaryKey::asComparableBytes); + + if (segmentRowId != null) + postings.add(segmentRowId); + } + while (primaryKeyIterator.hasNext()); + + return postings; + } + /** * Complete building in memory RowMapping, mark it as immutable. */ diff --git a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java index 57517d9192d0..0b534659028e 100644 --- a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java +++ b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java @@ -57,6 +57,7 @@ import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.analyzer.AbstractAnalyzer; +import org.apache.cassandra.index.sai.disk.PrimaryKeyWithSource; import org.apache.cassandra.index.sai.disk.format.Version; import org.apache.cassandra.index.sai.disk.v6.TermsDistribution; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; @@ -220,7 +221,7 @@ public KeyRangeIterator exactMatch(Expression expression, AbstractBounds keyRange) @@ -228,7 +229,7 @@ private KeyRangeIterator rangeMatch(Expression expression, AbstractBounds subtrie = getSubtrie(expression); var capacity = Math.max(MINIMUM_QUEUE_SIZE, lastQueueSize.get()); - var mergingIteratorBuilder = MergingKeyRangeIterator.builder(keyBounds, indexContext.keyFactory(), capacity); + var mergingIteratorBuilder = MergingKeyRangeIterator.builder(keyBounds, memtable, indexContext.keyFactory(), capacity); lastQueueSize.set(mergingIteratorBuilder.size()); if (!Version.latest().onOrAfter(Version.DB) && TypeUtil.isComposite(expression.validator)) @@ -542,9 +543,9 @@ 
static class MergingKeyRangeIterator extends KeyRangeIterator this.keySets = new SortingSingletonOrSetIterator(keySets); } - static Builder builder(AbstractBounds keyRange, PrimaryKey.Factory factory, int capacity) + static Builder builder(AbstractBounds keyRange, Memtable mt, PrimaryKey.Factory factory, int capacity) { - return new Builder(keyRange, factory, capacity); + return new Builder(keyRange, mt, factory, capacity); } @Override @@ -575,13 +576,14 @@ static class Builder private final PrimaryKey min; private final PrimaryKey max; private long count; + private final Memtable mt; - - Builder(AbstractBounds keyRange, PrimaryKey.Factory factory, int capacity) + Builder(AbstractBounds keyRange, Memtable mt, PrimaryKey.Factory factory, int capacity) { this.min = factory.createTokenOnly(keyRange.left.getToken()); this.max = factory.createTokenOnly(keyRange.right.getToken()); this.keySets = new ArrayList<>(capacity); + this.mt = mt; } public void add(PrimaryKeys primaryKeys) @@ -592,9 +594,9 @@ public void add(PrimaryKeys primaryKeys) int size = primaryKeys.size(); SortedSet keys = primaryKeys.keys(); if (size == 1) - keySets.add(keys.first()); + keySets.add(new PrimaryKeyWithSource(keys.first(), mt)); else - keySets.add(new SortedSetKeyRangeIterator(keys, min, max, size)); + keySets.add(new SortedSetKeyRangeIterator(keys, mt, min, max, size)); count += size; } @@ -616,22 +618,25 @@ public MergingKeyRangeIterator build() } } - static class SortedSetKeyRangeIterator extends KeyRangeIterator + public static class SortedSetKeyRangeIterator extends KeyRangeIterator { private SortedSet primaryKeySet; private Iterator iterator; private PrimaryKey lastComputedKey; + private final Memtable mt; - public SortedSetKeyRangeIterator(SortedSet source) + public SortedSetKeyRangeIterator(SortedSet source, Memtable mt) { super(source.first(), source.last(), source.size()); this.primaryKeySet = source; + this.mt = mt; } - private SortedSetKeyRangeIterator(SortedSet source, PrimaryKey 
min, PrimaryKey max, long count) + private SortedSetKeyRangeIterator(SortedSet source, Memtable mt, PrimaryKey min, PrimaryKey max, long count) { super(min, max, count); this.primaryKeySet = source; + this.mt = mt; } @@ -642,7 +647,7 @@ protected PrimaryKey computeNext() if (iterator == null) iterator = primaryKeySet.iterator(); lastComputedKey = iterator.hasNext() ? iterator.next() : endOfData(); - return lastComputedKey; + return lastComputedKey != null ? new PrimaryKeyWithSource(lastComputedKey, mt) : null; } @Override diff --git a/src/java/org/apache/cassandra/index/sai/memory/TrieMemtableIndex.java b/src/java/org/apache/cassandra/index/sai/memory/TrieMemtableIndex.java index d01715023dbc..9919698d667c 100644 --- a/src/java/org/apache/cassandra/index/sai/memory/TrieMemtableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/memory/TrieMemtableIndex.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -46,6 +47,7 @@ import org.apache.cassandra.index.sai.iterators.KeyRangeLazyIterator; import org.apache.cassandra.index.sai.iterators.KeyRangeConcatIterator; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; +import org.apache.cassandra.index.sai.iterators.KeyRangeUnionIterator; import org.apache.cassandra.index.sai.plan.Expression; import org.apache.cassandra.index.sai.plan.Orderer; import org.apache.cassandra.index.sai.utils.PrimaryKey; @@ -75,6 +77,7 @@ public class TrieMemtableIndex implements MemtableIndex private final LongAdder writeCount = new LongAdder(); private final LongAdder estimatedOnHeapMemoryUsed = new LongAdder(); private final LongAdder estimatedOffHeapMemoryUsed = new LongAdder(); + private final PrimaryKeys EMPTY_PRIMARY_KEYS; private final Memtable memtable; private final Context sensorContext; @@ -93,6 +96,7 @@ public TrieMemtableIndex(IndexContext indexContext, 
Memtable memtable) } this.sensorContext = Context.from(indexContext); this.requestTracker = RequestTracker.instance; + this.EMPTY_PRIMARY_KEYS = Version.latest().onDiskFormat().indexFeatureSet().hasNullIndex() ? new PrimaryKeys() : null; } @Override @@ -128,7 +132,7 @@ public long estimatedOffHeapMemoryUsed() @Override public boolean isEmpty() { - return getMinTerm() == null; + return getMinTerm() == null && (EMPTY_PRIMARY_KEYS == null || EMPTY_PRIMARY_KEYS.isEmpty()); } // Returns the minimum indexed term in the combined memory indexes. @@ -168,12 +172,25 @@ public ByteBuffer getMaxTerm() @Override public void index(DecoratedKey key, Clustering clustering, ByteBuffer value, Memtable memtable, OpOrder.Group opGroup) { - if (value == null || (value.remaining() == 0 && !validator.allowsEmpty())) - return; - + writeCount.increment(); RequestSensors sensors = requestTracker.get(); if (sensors != null) sensors.registerSensor(sensorContext, Type.INDEX_WRITE_BYTES); + + if (value == null || (value.remaining() == 0 && !validator.allowsEmpty())) + { + if (EMPTY_PRIMARY_KEYS == null) + return; + // TODO handle updates?? 
or at least test them since we don't do them here + var bytes = EMPTY_PRIMARY_KEYS.add(indexContext.keyFactory().create(key, clustering)); + memtable.markExtraOnHeapUsed(bytes, opGroup); + estimatedOnHeapMemoryUsed.add(bytes); + if (sensors != null) + sensors.incrementSensor(sensorContext, Type.INDEX_WRITE_BYTES, bytes); + return; + } + // todo handle column tombstones + rangeIndexes[boundaries.getShardForKey(key)].add(key, clustering, value, @@ -189,12 +206,16 @@ public void index(DecoratedKey key, Clustering clustering, ByteBuffer value, Mem if (sensors != null) sensors.incrementSensor(sensorContext, Type.INDEX_WRITE_BYTES, allocatedBytes); }); - writeCount.increment(); } @Override public KeyRangeIterator search(QueryContext queryContext, Expression expression, AbstractBounds keyRange, int limit) { + if (expression.getOp() == Expression.Op.IS_NULL) + return EMPTY_PRIMARY_KEYS.isEmpty() + ? KeyRangeIterator.empty() + : new TrieMemoryIndex.SortedSetKeyRangeIterator(EMPTY_PRIMARY_KEYS.keys(), memtable); + int startShard = boundaries.getShardForToken(keyRange.left.getToken()); int endShard = keyRange.right.isMinimum() ? boundaries.shardCount() - 1 : boundaries.getShardForToken(keyRange.right.getToken()); @@ -257,7 +278,7 @@ public long estimateMatchingRowsCount(Expression expression, AbstractBounds orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit) + public CloseableIterator orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit, boolean canSkipOutOfWindowPKs) { if (keys.isEmpty()) return CloseableIterator.emptyIterator(); @@ -316,6 +337,12 @@ public Iterator>> iterator(DecoratedKe new PrimaryKeysMergeReducer(rangeIterators.size())); } + @Override + public Iterator nullValuedPrimaryKeys() + { + return EMPTY_PRIMARY_KEYS != null ? 
EMPTY_PRIMARY_KEYS.iterator() : Collections.emptyIterator(); + } + // The PrimaryKeysMergeReducer receives the range iterators from each of the range indexes selected based on the // min and max keys passed to the iterator method. It doesn't strictly do any reduction because the terms in each // range index are unique. It will receive at most one range index entry per selected range index before getReduced diff --git a/src/java/org/apache/cassandra/index/sai/plan/Expression.java b/src/java/org/apache/cassandra/index/sai/plan/Expression.java index a1bd9acddc4b..4f3f52de8211 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/Expression.java +++ b/src/java/org/apache/cassandra/index/sai/plan/Expression.java @@ -60,7 +60,7 @@ public enum Op EQ, MATCH, PREFIX, NOT_EQ, RANGE, CONTAINS_KEY, CONTAINS_VALUE, NOT_CONTAINS_VALUE, NOT_CONTAINS_KEY, - IN, ORDER_BY, BOUNDED_ANN; + IN, ORDER_BY, BOUNDED_ANN, IS_NULL; public static Op valueOf(Operator operator) { @@ -148,7 +148,6 @@ public boolean isContains() public Bound lower, upper; private float boundedAnnEuclideanDistanceThreshold = 0; private float searchRadiusMeters = 0; - private float searchRadiusDegreesSquared = 0; public int topK; // These variables are only meant to be used for final validation of the range search. They are not // meant to be used when searching the index. See the 'add' method below for additional explanation. 
@@ -156,6 +155,14 @@ public boolean isContains() final List exclusions = new ArrayList<>(); + public Expression(IndexContext indexContext, Op operation) + { + this.context = indexContext; + this.analyzerFactory = indexContext.getAnalyzerFactory(); + this.validator = indexContext.getValidator(); + this.operation = operation; + } + public Expression(IndexContext indexContext) { this.context = indexContext; diff --git a/src/java/org/apache/cassandra/index/sai/plan/Plan.java b/src/java/org/apache/cassandra/index/sai/plan/Plan.java index 2bb379a16f14..31c485d61758 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/Plan.java +++ b/src/java/org/apache/cassandra/index/sai/plan/Plan.java @@ -381,6 +381,13 @@ final boolean contains(Function condition) return res == ControlFlow.Break; } + /** Returns true if all nodes in the plan match the condition */ + final boolean allMatch(Function condition) + { + ControlFlow res = forEach(node -> (condition.apply(node)) ? ControlFlow.Continue : ControlFlow.Break); + return res == ControlFlow.Continue; + } + /** * Returns a new plan with the given node filtering restriction removed. * Searches for the subplan to remove recursively down the tree. @@ -602,9 +609,22 @@ protected final KeysIterationCost cost() * The node itself isn't supposed for doing the actual work, but rather serves as a director which * delegates the work to the query controller through the passed Executor. * - * @param executor does all the hard work like fetching keys from the indexes or ANN sort + * @param executor does all the hard work like fetching keys from the indexes or ANN sort + */ + public final Iterator execute(Executor executor) + { + return execute(executor, false); + } + + /** + * Executes the operation represented by this node. + * The node itself isn't supposed for doing the actual work, but rather serves as a director which + * delegates the work to the query controller through the passed Executor. 
+ * + * @param executor does all the hard work like fetching keys from the indexes or ANN sort + * @param isNonReducing if true, the executor should not reduce the number of keys returned */ - protected abstract Iterator execute(Executor executor); + protected abstract Iterator execute(Executor executor, boolean isNonReducing); protected abstract KeysIteration withAccess(Access patterns); @@ -643,6 +663,8 @@ protected final Plan withUpdatedSubplans(Function updater) // There are no subplans so it is a noop return this; } + + protected abstract Expression predicate(); } /** @@ -669,6 +691,13 @@ protected Orderer ordering() return null; } + @Nullable + @Override + protected Expression predicate() + { + return null; + } + @Override protected double estimateSelectivity() { @@ -676,7 +705,7 @@ protected double estimateSelectivity() } @Override - protected KeyRangeIterator execute(Executor executor) + protected KeyRangeIterator execute(Executor executor, boolean isNonReducing) { return KeyRangeIterator.empty(); } @@ -721,6 +750,13 @@ protected Orderer ordering() return null; } + @Nullable + @Override + protected Expression predicate() + { + return null; + } + @Override protected double estimateSelectivity() { @@ -728,7 +764,7 @@ protected double estimateSelectivity() } @Override - protected KeyRangeIterator execute(Executor executor) + protected KeyRangeIterator execute(Executor executor, boolean isNonReducing) { // Not supported because it doesn't make a lot of sense. // A direct scan of table data would be certainly faster. 
@@ -803,6 +839,13 @@ protected final Orderer ordering() return ordering; } + @Nullable + @Override + protected Expression predicate() + { + return predicate; + } + @Override protected final KeysIterationCost estimateCost() { @@ -875,11 +918,11 @@ private double estimateCostPerSkip(double step) } @Override - protected Iterator execute(Executor executor) + protected Iterator execute(Executor executor, boolean isNonReducing) { return (ordering != null) ? executor.getTopKRows(predicate, max(1, round((float) access.expectedAccessCount(factory.tableMetrics.rows)))) - : executor.getKeysFromIndex(predicate); + : executor.getKeysFromIndex(predicate, isNonReducing); } public String getIndexName() @@ -1041,13 +1084,13 @@ protected Orderer ordering() } @Override - protected KeyRangeIterator execute(Executor executor) + protected KeyRangeIterator execute(Executor executor, boolean isNonReducing) { - KeyRangeIterator.Builder builder = KeyRangeUnionIterator.builder(); + KeyRangeIterator.Builder builder = KeyRangeUnionIterator.builder(isNonReducing); try { for (KeysIteration plan : subplansSupplier.get()) - builder.add((KeyRangeIterator) plan.execute(executor)); + builder.add((KeyRangeIterator) plan.execute(executor, isNonReducing)); return builder.build(); } catch (Throwable t) @@ -1177,13 +1220,13 @@ protected KeysIterationCost estimateCost() } @Override - protected KeyRangeIterator execute(Executor executor) + protected KeyRangeIterator execute(Executor executor, boolean isNonReducing) { KeyRangeIterator.Builder builder = KeyRangeIntersectionIterator.builder(); try { for (KeysIteration plan : subplansSupplier.get()) - builder.add((KeyRangeIterator) plan.execute(executor)); + builder.add((KeyRangeIterator) plan.execute(executor, isNonReducing)); return builder.build(); } @@ -1279,11 +1322,21 @@ protected Orderer ordering() } @Override - protected Iterator execute(Executor executor) + protected Iterator execute(Executor executor, boolean isNonReducing) { - KeyRangeIterator 
sourceIterator = (KeyRangeIterator) source.execute(executor); + assert !isNonReducing : "KeysSort does not support non-reducing execution"; + // If any of the leaf nodes belong to plans that do not support non-reducing execution, we must fall + // back to reducing (deduplicating) execution. + boolean canBeNonReducing = allMatch(node -> { + if (!(node instanceof Plan.Leaf)) + return true; + Expression expression = ((Plan.Leaf) node).predicate(); + return !expression.context.isNonFrozenCollection() && expression.context.indexFeatureSet().hasNullIndex(); + }); + // When safe, execute the source non-reducing so that no keys are dropped before the top-k sort + KeyRangeIterator sourceIterator = (KeyRangeIterator) source.execute(executor, canBeNonReducing); int softLimit = max(1, round((float) access.expectedAccessCount(factory.tableMetrics.rows))); - return executor.getTopKRows(sourceIterator, softLimit); + return executor.getTopKRows(sourceIterator, softLimit, canBeNonReducing); } @Override @@ -1328,9 +1381,17 @@ protected Orderer ordering() return ordering; } + @Nullable + @Override + protected Expression predicate() + { + return null; + } + @Override - protected Iterator execute(Executor executor) + protected Iterator execute(Executor executor, boolean isNonReducing) { + assert !isNonReducing : "AnnIndexScan does not support non-reducing execution"; int softLimit = max(1, round((float) access.expectedAccessCount(factory.tableMetrics.rows))); return executor.getTopKRows((Expression) null, softLimit); } @@ -1834,9 +1895,9 @@ public TableMetrics(long rows, double avgCellsPerRow, double avgBytesPerRow, int */ public interface Executor { - Iterator getKeysFromIndex(Expression predicate); + Iterator getKeysFromIndex(Expression predicate, boolean isNonReducing); Iterator getTopKRows(Expression predicate, int softLimit); - Iterator getTopKRows(KeyRangeIterator keys, int softLimit); + Iterator getTopKRows(KeyRangeIterator keys, int softLimit, boolean isNonReducing); } /** diff --git 
a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java index 85285359bd04..9cc505703ac0 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java @@ -429,10 +429,10 @@ public Iterator buildIterator(Plan plan) * Creates an iterator over keys of rows that match given WHERE predicate. * Does not cache the iterator! */ - private KeyRangeIterator buildIterator(Expression predicate) + private KeyRangeIterator buildIterator(Expression predicate, boolean isNonReducing) { QueryView view = getQueryView(predicate.context); - return KeyRangeTermIterator.build(predicate, view.referencedIndexes, mergeRange, queryContext, false, Integer.MAX_VALUE); + return KeyRangeTermIterator.build(predicate, view.referencedIndexes, mergeRange, queryContext, false, Integer.MAX_VALUE, isNonReducing); } /** @@ -511,7 +511,7 @@ public void buildPlanForExpressions(Plan.Builder builder, Collection } @Override - public Iterator getKeysFromIndex(Expression predicate) + public Iterator getKeysFromIndex(Expression predicate, boolean isNonReducing) { Collection rangeIterators = keyIterators.get(predicate); // This will be non-empty only if we created the iterator as part of the query planning process. @@ -522,7 +522,7 @@ public Iterator getKeysFromIndex(Expression predicate) return iterator; } - return buildIterator(predicate); + return buildIterator(predicate, isNonReducing); } /** @@ -554,7 +554,7 @@ public CloseableIterator getTopKRows(Expression predicate /** * Use the configured {@link Orderer} to sort the rows from the given source iterator. 
*/ - public CloseableIterator getTopKRows(KeyRangeIterator source, int softLimit) + public CloseableIterator getTopKRows(KeyRangeIterator source, int softLimit, boolean canSkipOutOfWindowPKs) { try { @@ -564,7 +564,7 @@ public CloseableIterator getTopKRows(KeyRangeIterator sou FileUtils.closeQuietly(source); return CloseableIterator.emptyIterator(); } - var result = getTopKRows(primaryKeys, softLimit); + var result = getTopKRows(primaryKeys, softLimit, canSkipOutOfWindowPKs); // We cannot close the source iterator eagerly because it produces partially loaded PrimaryKeys // that might not be needed until a deeper search into the ordering index, which happens after // we exit this block. @@ -603,16 +603,17 @@ private List materializeKeys(KeyRangeIterator source) return primaryKeys; } - private CloseableIterator getTopKRows(List sourceKeys, int softLimit) + private CloseableIterator getTopKRows(List sourceKeys, int softLimit, boolean canSkipOutOfWindowPKs) { Tracing.logAndTrace(logger, "SAI predicates produced {} keys", sourceKeys.size()); QueryView view = getQueryView(orderer.context); var memtableResults = view.memtableIndexes.stream() - .map(index -> index.orderResultsBy(queryContext, - sourceKeys, - orderer, - softLimit)) - .collect(Collectors.toList()); + .map(index -> index.orderResultsBy(queryContext, + sourceKeys, + orderer, + softLimit, + canSkipOutOfWindowPKs)) + .collect(Collectors.toList()); try { var totalRows = view.getTotalSStableRows(); @@ -620,7 +621,8 @@ private CloseableIterator getTopKRows(List so sourceKeys, orderer, softLimit, - totalRows); + totalRows, + canSkipOutOfWindowPKs); var sstableScoredPrimaryKeyIterators = searchSSTables(view, ssTableSearcher); sstableScoredPrimaryKeyIterators.addAll(memtableResults); return MergeIterator.getNonReducingCloseable(sstableScoredPrimaryKeyIterators, orderer.getComparator()); @@ -861,7 +863,7 @@ private long estimateMatchingRowCountUsingIndex(Expression predicate) { // For older indexes we don't have 
histograms, so we need to construct the iterator // and ask for the posting list size. - KeyRangeIterator iterator = buildIterator(predicate); + KeyRangeIterator iterator = buildIterator(predicate, false); // We're not going to consume the iterator here, so memorize it for future uses. // It can be used when executing the plan. diff --git a/src/java/org/apache/cassandra/index/sai/utils/MemtableOrdering.java b/src/java/org/apache/cassandra/index/sai/utils/MemtableOrdering.java index bbc9398c46c4..96d1ee6a023a 100644 --- a/src/java/org/apache/cassandra/index/sai/utils/MemtableOrdering.java +++ b/src/java/org/apache/cassandra/index/sai/utils/MemtableOrdering.java @@ -57,5 +57,5 @@ List> orderBy(QueryContext queryContext * * Assumes that the given spans the same rows as the implementing index's segment. */ - CloseableIterator orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit); + CloseableIterator orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit, boolean canSkipOutOfWindowPKs); } diff --git a/src/java/org/apache/cassandra/index/sai/utils/SegmentOrdering.java b/src/java/org/apache/cassandra/index/sai/utils/SegmentOrdering.java index 3fe522d679c3..b1f1cce3cda2 100644 --- a/src/java/org/apache/cassandra/index/sai/utils/SegmentOrdering.java +++ b/src/java/org/apache/cassandra/index/sai/utils/SegmentOrdering.java @@ -57,5 +57,5 @@ public interface SegmentOrdering * results the query requested. The keys passed to the method will already be limited to keys in the segment's * Primary Key range. 
*/ - CloseableIterator orderResultsBy(SSTableReader reader, QueryContext context, List keys, Orderer orderer, int limit) throws IOException; + CloseableIterator orderResultsBy(SSTableReader reader, QueryContext context, List keys, Orderer orderer, int limit, boolean canSkipOutOfWindowPKs) throws IOException; } diff --git a/test/unit/org/apache/cassandra/index/sai/cql/VectorTester.java b/test/unit/org/apache/cassandra/index/sai/cql/VectorTester.java index bcb0577b4a76..63ad26475770 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/VectorTester.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/VectorTester.java @@ -185,7 +185,7 @@ abstract static class Versioned extends VectorTester @Parameterized.Parameters(name = "{0}") public static Collection data() { - return Stream.of(Version.CA, Version.DC).map(v -> new Object[]{ v }).collect(Collectors.toList()); + return Stream.of(Version.CA, Version.DC, Version.FA).map(v -> new Object[]{ v }).collect(Collectors.toList()); } @Before diff --git a/test/unit/org/apache/cassandra/index/sai/cql/VectorUpdateDeleteTest.java b/test/unit/org/apache/cassandra/index/sai/cql/VectorUpdateDeleteTest.java index aeac6d77b9da..3d778d0fe39f 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/VectorUpdateDeleteTest.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/VectorUpdateDeleteTest.java @@ -993,4 +993,77 @@ public void testSameRowInMultipleSSTablesWithSameTimestamp() throws Throwable assertRows(execute("SELECT ck FROM %s ORDER BY val ANN OF [1.0, 2.0, 3.0] LIMIT 2"), row(1)); }); } + + @Test + public void testSplitRowSameTimestampNoQueryOptimizer() throws Throwable + { + QueryController.QUERY_OPT_LEVEL = 0; + createTable("CREATE TABLE %s (pk int primary key, str1 text, str2 text, val vector)"); + createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(str1) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(str2) USING 
'StorageAttachedIndex'"); + disableCompaction(); + + // Insert two apparently complete rows. They don't technically count because the timestamps are the same. + // The real row is (0, 'B', 'B', [1.0, 2.0, 3.0]) because the tiebreaker for two cells with the same + // timestamp is the value itself (we take the higher value). + execute("INSERT INTO %s (pk, str1, str2, val) VALUES (0, 'A', 'B', [1.0, 2.0, 3.0]) USING TIMESTAMP 1"); + flush(); + execute("INSERT INTO %s (pk, str1, str2, val) VALUES (0, 'B', 'A', [1.0, 2.0, 3.0]) USING TIMESTAMP 1"); + + beforeAndAfterFlush(() -> { + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'A' ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10")); + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'B' ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(0)); + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'A' OR str2 = 'A' ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10")); + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'B' OR str2 = 'A' ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(0)); + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'A' OR str2 = 'B' ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(0)); + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'B' OR str2 = 'B' ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(0)); + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'A' AND str2 = 'A' ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10")); + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'A' AND str2 = 'B' ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10")); + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'B' AND str2 = 'B' ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(0)); + }); + } + + @Test + public void testSplitRowWithNoQueryOptimizer() throws Throwable + { + QueryController.QUERY_OPT_LEVEL = 0; + createTable("CREATE TABLE %s (pk int primary key, str1 text, str2 text, int1 int, int2 int, val vector)"); + createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex'"); + 
createIndex("CREATE CUSTOM INDEX ON %s(str1) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(str2) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(int1) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(int2) USING 'StorageAttachedIndex'"); + disableCompaction(); + + execute("INSERT INTO %s (pk, str1, int1) VALUES (0, 'A', 10)"); + execute("INSERT INTO %s (pk, str1, str2, int1, int2, val) VALUES (1, 'B', 'C', 20, 30, [1.0, 2.0, 3.0])"); + flush(); + compact(); + execute("INSERT INTO %s (pk, str2, int2) VALUES (0, 'B', 40)"); + flush(); + // Write the vector by itself + execute("INSERT INTO %s (pk, val) VALUES (0, [1.0, 2.0, 3.0])"); + + beforeAndAfterFlush(() -> { + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'A' ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(0)); + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'A' OR str2 = 'A' ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(0)); + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'A' AND str2 = 'A' ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10")); + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'A' AND str2 = 'B' ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(0)); + + // Additional assertions for numeric columns + assertRows(execute("SELECT pk FROM %s WHERE int1 = 10 ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(0)); + assertRows(execute("SELECT pk FROM %s WHERE int1 = 20 OR int2 = 30 ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(1)); + assertRows(execute("SELECT pk FROM %s WHERE int1 = 10 AND int2 = 40 ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(0)); + assertRows(execute("SELECT pk FROM %s WHERE int1 = 10 AND int2 = 40 ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(0)); + + // Mixed queries involving text and integer columns + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'A' AND int1 = 10 ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(0)); + assertRows(execute("SELECT pk 
FROM %s WHERE str2 = 'B' AND int2 = 40 ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(0)); + assertRows(execute("SELECT pk FROM %s WHERE str1 = 'B' AND int1 = 20 AND int2 = 30 ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(1)); + assertRows(execute("SELECT pk FROM %s WHERE (str1 = 'A' OR str2 = 'B') AND int1 = 10 ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(0)); + assertRows(execute("SELECT pk FROM %s WHERE (int1 = 20 OR int2 = 40) AND str1 = 'B' ORDER BY val ann of [1.0, 2.0, 3.0] LIMIT 10"), row(1)); + }); + } + } diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/kdtree/KDTreeIndexBuilder.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/kdtree/KDTreeIndexBuilder.java index fdbd1f9d91f8..8e1526327c77 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/v1/kdtree/KDTreeIndexBuilder.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/kdtree/KDTreeIndexBuilder.java @@ -83,6 +83,18 @@ public SSTableId getSSTableId() return new SequenceBasedSSTableId(0); } + @Override + public long getMinTimestamp() + { + return 0; + } + + @Override + public long getMaxTimestamp() + { + return Long.MAX_VALUE; + } + @Override public PrimaryKey primaryKeyFromRowId(long sstableRowId) { diff --git a/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeUnionIteratorTest.java b/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeUnionIteratorTest.java index e2a92ee0dd05..925507285d16 100644 --- a/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeUnionIteratorTest.java +++ b/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeUnionIteratorTest.java @@ -199,7 +199,7 @@ public void testBuilder() {{ add(new LongIterator(new long[]{1L, 2L, 4L})); add(new LongIterator(new long[]{3L, 5L, 6L})); - }}); + }}, false); Assert.assertEquals(convert(1L, 2L, 3L, 4L, 5L, 6L), convert(tokens)); @@ -253,7 +253,7 @@ public void testMergingMultipleIterators() builderB.add(new LongIterator(new long[] { 7L, 9L, 11L })); 
builderB.add(new LongIterator(new long[] { 2L, 4L, 6L })); - KeyRangeIterator union = KeyRangeUnionIterator.build(Arrays.asList(builderA.build(), builderB.build())); + KeyRangeIterator union = KeyRangeUnionIterator.build(Arrays.asList(builderA.build(), builderB.build()), false); Assert.assertEquals(convert(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L), convert(union)); } diff --git a/test/unit/org/apache/cassandra/index/sai/plan/PlanTest.java b/test/unit/org/apache/cassandra/index/sai/plan/PlanTest.java index bd1179ac6886..b32fa9593558 100644 --- a/test/unit/org/apache/cassandra/index/sai/plan/PlanTest.java +++ b/test/unit/org/apache/cassandra/index/sai/plan/PlanTest.java @@ -356,7 +356,7 @@ public void annSortFilterLimit() // because we're getting top of the rows already prefiltered by the index: Plan.Executor executor = Mockito.mock(Plan.Executor.class); Objects.requireNonNull(plan.firstNodeOfType(Plan.KeysIteration.class)).execute(executor); - Mockito.verify(executor, Mockito.times(1)).getTopKRows((KeyRangeIterator) Mockito.any(), Mockito.eq(limit)); + Mockito.verify(executor, Mockito.times(1)).getTopKRows((KeyRangeIterator) Mockito.any(), Mockito.eq(limit), Mockito.eq(true)); } @Test @@ -508,7 +508,7 @@ public void rangeIterator() Plan.Executor executor = new Plan.Executor() { @Override - public Iterator getKeysFromIndex(Expression predicate) + public Iterator getKeysFromIndex(Expression predicate, boolean isNonReducing) { return iterators.get(predicate); } @@ -520,7 +520,7 @@ public Iterator getTopKRows(Expression predicate, int soft } @Override - public Iterator getTopKRows(KeyRangeIterator keys, int softLimit) + public Iterator getTopKRows(KeyRangeIterator keys, int softLimit, boolean isNonReducing) { throw new UnsupportedOperationException(); }