CNDB-9104: Port over chunk cache improvements from DSE #1495

Open · wants to merge 34 commits into base: main
Changes from 33 commits

Commits (34)
3968200
Remove unused BufferHolder methods
blambov Jan 8, 2025
1ad3ac4
DB-2904 port: Use same size buffers in chunk cache
blambov Jan 8, 2025
cfa0842
Always use off-heap memory for chunk cache.
blambov Jan 8, 2025
0f84fc2
Use networking buffer pool for compressed reads.
blambov Jan 8, 2025
31a5fda
Allow buffer pool to return one-buffer multi-page chunks
blambov Jan 8, 2025
6aa0130
Port over some chunk cache tests
blambov Jan 9, 2025
1a15fa5
Set up for on-heap memory usage test
blambov Jan 10, 2025
c4f14ee
Introduce fileID and invalidate file by dropping id.
blambov Jan 9, 2025
307aa7f
Store addresses and attachments to avoid a direct buffer per entry
blambov Jan 10, 2025
be317b2
Sleep for jmap
blambov Jan 10, 2025
1313839
Remove pre-computed key hash
blambov Jan 10, 2025
36be7e5
Revert "Sleep for jmap"
blambov Jan 10, 2025
9190348
Revert "Set up for on-heap memory usage test"
blambov Jan 10, 2025
d4e230e
Review changes and license fix
blambov Jan 10, 2025
d5a22c8
Drop ChunkReader reference from Key
blambov Jan 10, 2025
fa2551f
Revert unneeded change
blambov Jan 10, 2025
89817b4
Test improvements
blambov Jan 10, 2025
c0c4716
Use page splitting for large buffers too, to avoid having to store a …
blambov Jan 10, 2025
7ab70b0
Fix test.
blambov Jan 10, 2025
2539b18
Review comments
blambov Jan 13, 2025
c01dbb7
Move code unchanged in ChunkCache.java
blambov Jan 13, 2025
5262c59
Fix test compilation
blambov Jan 13, 2025
7f0d6ba
Change sizeOfFile to accept File
blambov Jan 13, 2025
6686f6d
Fix and test chunk cache retention after early open
blambov Jan 13, 2025
462fb34
Provide precise end position for early-open sstables
blambov Jan 13, 2025
d755d32
Move cache invalidation FileHandle.Builder creation
blambov Jan 14, 2025
9fc879e
Add comment and remove unused method
blambov Jan 14, 2025
09199b3
Test fix
blambov Jan 14, 2025
3f08d7f
Test fix
blambov Jan 14, 2025
ba1232f
Invalidate cache only on request by calling file handle builder's inv…
blambov Jan 14, 2025
226bc15
Revert "Invalidate cache only on request by calling file handle build…
blambov Jan 14, 2025
743e53a
Invalidate both on making SequentialWriter and on global SSTableReade…
blambov Jan 14, 2025
1bbc990
Remove invalidation in SequentialWriter and rely on invalidation duri…
blambov Jan 14, 2025
dac75ad
Change order of cache invalidation and obsoletion
blambov Jan 15, 2025
613 changes: 417 additions & 196 deletions src/java/org/apache/cassandra/cache/ChunkCache.java

Large diffs are not rendered by default.
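Since the ChunkCache.java diff itself is not rendered here, the following is a minimal, hypothetical sketch of the "Introduce fileID and invalidate file by dropping id" idea from the commit list: each file is mapped to a numeric id, cache keys embed that id rather than the file name, and invalidating a file simply drops its id so stale entries can never be matched again and age out of the cache. All class and member names below are illustrative, not the PR's actual code.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.cassandra.io.util.File;

final class FileIdRegistry
{
    private final ConcurrentHashMap<File, Long> ids = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    long idFor(File file)
    {
        return ids.computeIfAbsent(file, f -> nextId.incrementAndGet());
    }

    // Dropping the id makes every existing cache key for this file unmatchable;
    // the stale entries are then simply evicted over time.
    void invalidateFile(File file)
    {
        ids.remove(file);
    }

    // A cache key identifying a chunk by (fileId, position) instead of by file name.
    static final class Key
    {
        final long fileId;
        final long position;

        Key(long fileId, long position)
        {
            this.fileId = fileId;
            this.position = position;
        }

        @Override
        public boolean equals(Object o)
        {
            if (!(o instanceof Key))
                return false;
            Key k = (Key) o;
            return fileId == k.fileId && position == k.position;
        }

        @Override
        public int hashCode()
        {
            return Objects.hash(fileId, position);
        }
    }
}
```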

@@ -282,12 +282,12 @@ protected void checkKeyOrder(DecoratedKey decoratedKey)
throw new AssertionError("Last written key " + currentKey + " >= current key " + decoratedKey + " writing into " + getDataFile());
}

protected void invalidateCacheAtBoundary(FileHandle dfile)
protected void invalidateCacheAtPreviousBoundary(FileHandle dfile, long newBoundary)
{
if (lastEarlyOpenLength != 0 && dfile.dataLength() > lastEarlyOpenLength)
if (lastEarlyOpenLength != 0 && newBoundary > lastEarlyOpenLength)
dfile.invalidateIfCached(lastEarlyOpenLength);

lastEarlyOpenLength = dfile.dataLength();
lastEarlyOpenLength = newBoundary;
}

public long getFilePointer()

@@ -203,7 +203,7 @@ public boolean openEarly(Consumer<SSTableReader> callWhenReady)
dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(boundary.dataLength));
int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete(boundary.dataLength);
invalidateCacheAtBoundary(dfile);
invalidateCacheAtPreviousBoundary(dfile, boundary.dataLength);
SSTableReader sstable = BigTableReader.internalOpen(descriptor,
components(), metadata,
ifile, dfile,
@@ -246,7 +246,7 @@ protected SSTableReader openFinal(SSTableReader.OpenReason openReason)
if (compression)
dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(0));
FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete();
invalidateCacheAtBoundary(dfile);
invalidateCacheAtPreviousBoundary(dfile, Long.MAX_VALUE);
SSTableReader sstable = SSTableReader.internalOpen(descriptor,
components(),
metadata,

@@ -21,8 +21,8 @@
import java.util.function.Consumer;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.tries.IncrementalTrieWriter;
import org.apache.cassandra.io.tries.Walker;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -112,7 +112,14 @@ private void refreshReadableBoundary()

try (FileHandle fh = fhBuilder.complete(writer.getLastFlushOffset()))
{
PartitionIndex pi = new PartitionIndexEarly(fh, partialIndexTail.root(), partialIndexTail.count(), firstKey, partialIndexLastKey, partialIndexTail.cutoff(), partialIndexTail.tail(), version);
PartitionIndex pi = new PartitionIndexEarly(fh,
partialIndexTail.root(),
partialIndexTail.count(),
SSTable.getMinimalKey(firstKey),
SSTable.getMinimalKey(partialIndexLastKey),
partialIndexTail.cutoff(),
partialIndexTail.tail(),
version);
partialIndexConsumer.accept(pi);
partialIndexConsumer = null;
}

@@ -92,4 +92,9 @@ protected void addIndexBlock() throws IOException
firstClustering = null;
++rowIndexCount;
}

long partitionStart()
{
return initialPosition;
}
}

@@ -183,17 +183,19 @@ public RowIndexEntry endPartition() throws IOException
@SuppressWarnings("resource")
public boolean openEarly(Consumer<SSTableReader> callWhenReady)
{
long dataLength = dataFile.position();
// Because the partition index writer is one partition behind, we want the file to stop at the start of the
// last partition that was written.
long dataLength = partitionWriter.partitionStart();

return iwriter.buildPartial(dataLength, partitionIndex ->
{
StatsMetadata stats = statsMetadata();
FileHandle ifile = iwriter.rowIndexFHBuilder.complete(iwriter.rowIndexFile.getLastFlushOffset());
if (compression)
dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(dataFile.getLastFlushOffset()));
dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(dataLength));
int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete(dataFile.getLastFlushOffset());
invalidateCacheAtBoundary(dfile);
FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete(dataLength);
invalidateCacheAtPreviousBoundary(dfile, dataLength);
SSTableReader sstable = TrieIndexSSTableReader.internalOpen(descriptor,
components(), metadata,
ifile, dfile, partitionIndex, iwriter.bf.sharedCopy(),
@@ -231,9 +233,9 @@ protected SSTableReader openFinal(SSTableReader.OpenReason openReason)
FileHandle rowIndexFile = iwriter.rowIndexFHBuilder.complete();
int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
if (compression)
dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(dataFile.getLastFlushOffset()));
dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(0));
FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete();
invalidateCacheAtBoundary(dfile);
invalidateCacheAtPreviousBoundary(dfile, Long.MAX_VALUE);
SSTableReader sstable = TrieIndexSSTableReader.internalOpen(descriptor,
components(),
this.metadata,

8 changes: 5 additions & 3 deletions src/java/org/apache/cassandra/io/storage/StorageProvider.java
@@ -36,6 +36,7 @@
import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.metadata.ZeroCopyMetadata;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileHandle;
@@ -256,14 +257,15 @@ public File createDirectory(String dir, DirectoryType type)
public void invalidateFileSystemCache(File file)
{
INativeLibrary.instance.trySkipCache(file, 0, 0);
if (ChunkCache.instance != null)
ChunkCache.instance.invalidateFile(file);
}

@Override
public void invalidateFileSystemCache(Descriptor desc, boolean tidied)
{
StorageProvider.instance.invalidateFileSystemCache(desc.fileFor(Component.DATA));
StorageProvider.instance.invalidateFileSystemCache(desc.fileFor(Component.ROW_INDEX));
StorageProvider.instance.invalidateFileSystemCache(desc.fileFor(Component.PARTITION_INDEX));
for (Component component : SSTable.discoverComponentsFor(desc))
[Review thread]

Reviewer: In theory I like that option, but I'll note two things:

  • discoverComponentsFor only includes components whose files still exist, but this invalidateFileSystemCache method runs after obsoletion.commit() in SSTableReader.GlobalTidy.tidy(), and the latter deletes the files, so I wonder if this couldn't be a problem in some cases (but I'm only so familiar with the whole tidying code, so maybe the obsoletion only does something in cases where nothing can be in the caches?).
  • discoverComponentsFor also has the misleading behavior of only including "hard-coded" components, meaning no "custom" ones and so none of the SAI files. To include SAI files we'd probably have to call SSTable.readTOC(desc, false), though that implies the TOC is still there, so the previous point is also a question here. I'll note that C* proper never puts SAI files into the chunk cache, and that we can override this method in CNDB, so I'm OK if we prefer to stick to hard-coded components here and leave the concern for SAI files to CNDB, but I figured it was worth mentioning.

Author (blambov): Changed the test to catch this problem.

Changed the order of cache invalidation and obsoletion to fix it. Looking at the relevant code in CC and CNDB, there doesn't appear to be anything that depends on this order.

Kept the discovery's use of discoverComponentsFor for now, because that is what the obsoletion code does. How do SAI components get deleted?

Reviewer: "How do SAI components get deleted?" Honestly, I'm not sure. StorageAttachedIndexGroup.handleNotification has a bunch of code that runs when it's notified of the removal of an sstable, but looking at it right now, I'm not finding where it actually deletes the files in the case of, say, a compacted sstable. And SSTableIndex has a comment that says it happens in LogTransaction, but I'm not sure how, given the tidier...

@jasonstack do you know off the top of your head?

(See the sketch after this file's diff for the resulting invalidate-then-obsolete ordering.)
invalidateFileSystemCache(desc.fileFor(component));
}

protected Config.DiskAccessMode accessMode(Component component)
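As a companion to the review thread above, here is a minimal sketch, not the PR's actual GlobalTidy code, of the ordering the discussion settles on: invalidate the file-system and chunk caches while the component files are still on disk (so discoverComponentsFor can see them), and only then run the obsoletion that deletes them. The class name and the Runnable parameter are hypothetical; the invalidateFileSystemCache overload is the one added in the diff above.

```java
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.storage.StorageProvider;

final class CacheAwareTidyExample
{
    static void tidy(Descriptor desc, Runnable obsoletionCommit, boolean tidied)
    {
        // 1. Invalidate first: invalidateFileSystemCache discovers components via
        //    SSTable.discoverComponentsFor(desc), which only reports files that still exist.
        StorageProvider.instance.invalidateFileSystemCache(desc, tidied);

        // 2. Only then delete the component files.
        obsoletionCommit.run();
    }
}
```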
49 changes: 16 additions & 33 deletions src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
@@ -28,12 +28,14 @@

import org.apache.cassandra.utils.memory.BufferPools;

/**
* Buffer manager used for reading from a ChunkReader when cache is not in use. Instances of this class are
* reader-specific and thus do not need to be thread-safe since the reader itself isn't.
*
* The instances reuse themselves as the BufferHolder to avoid having to return a new object for each rebuffer call.
*/
/// Buffer manager used for reading from a [ChunkReader] when cache is not in use. They use a buffer produced by the
/// "networking" buffer pool, which is the one to be used for buffers that are not to be retained for a long time
/// (the lifetime of this object is contained by the lifetime of a [RandomAccessReader] which is contained in a read
/// operation's lifetime).
///
/// Instances of this class are reader-specific and thus do not need to be thread-safe since the reader itself isn't.
///
/// The instances reuse themselves as the BufferHolder to avoid having to return a new object for each rebuffer call.
public abstract class BufferManagingRebufferer implements Rebufferer, Rebufferer.BufferHolder
{
protected final ChunkReader source;
@@ -45,14 +47,20 @@ public abstract class BufferManagingRebufferer implements Rebufferer, Rebufferer.BufferHolder
protected BufferManagingRebufferer(ChunkReader wrapped)
{
this.source = wrapped;
buffer = BufferPools.forChunkCache().get(wrapped.chunkSize(), wrapped.preferredBufferType()).order(ByteOrder.BIG_ENDIAN);
// Note: This class uses the networking buffer pool which makes better sense for short-lifetime buffers.
// Because this is meant to be used when the chunk cache is disabled, it also makes sense to use any memory
// that may have been allocated for in-flight data by using the chunk-cache pool.
// However, if some new functionality decides to use this class in the presence of the chunk cache (e.g.
// cache-bypassing compaction), using the chunk-cache pool here will certainly cause hard-to-diagnose issues
// that we would prefer to avoid.
buffer = BufferPools.forNetworking().get(wrapped.chunkSize(), wrapped.preferredBufferType()).order(ByteOrder.BIG_ENDIAN);
pcmanus marked this conversation as resolved.
buffer.limit(0);
}

@Override
public void closeReader()
{
BufferPools.forChunkCache().put(buffer);
BufferPools.forNetworking().put(buffer);
offset = -1;
}

@@ -102,31 +110,6 @@ public ByteBuffer buffer()
return buffer.duplicate();
}

@Override
public ByteOrder order()
{
return buffer.order();
}

@Override
public FloatBuffer floatBuffer()
{
return buffer.asFloatBuffer();
}

@Override
public IntBuffer intBuffer()
{
return buffer.asIntBuffer();
}

@Override
public LongBuffer longBuffer()
{
return buffer.asLongBuffer();
}


public long offset()
{
return offset;
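To make the lifetime argument in the new class comment concrete, a small sketch of the acquire/release pattern against the networking pool that the rebufferer above uses. The BufferPools.forNetworking().get(...)/put(...) calls are the ones in the diff; the class and method names, the choice of BufferType.OFF_HEAP, and the caller-supplied chunk size are assumptions for illustration only.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.memory.BufferPools;

final class ShortLivedBufferExample
{
    static void withChunkBuffer(int chunkSize)
    {
        // Acquire from the "networking" pool: intended for buffers whose lifetime is
        // bounded by a single read operation, unlike the long-lived chunk-cache pool.
        ByteBuffer buffer = BufferPools.forNetworking()
                                       .get(chunkSize, BufferType.OFF_HEAP)
                                       .order(ByteOrder.BIG_ENDIAN);
        try
        {
            buffer.limit(0); // start out empty, as BufferManagingRebufferer does
            // ... rebuffer and consume data here ...
        }
        finally
        {
            // Return the buffer as soon as the read is done.
            BufferPools.forNetworking().put(buffer);
        }
    }
}
```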
13 changes: 13 additions & 0 deletions src/java/org/apache/cassandra/io/util/ChunkReader.java
@@ -52,4 +52,17 @@ public interface ChunkReader extends RebuffererFactory
* This is not guaranteed to be fulfilled.
*/
BufferType preferredBufferType();

/**
* In some cases we may end up with both compressed and uncompressed data for the same file in
* the cache. This type is used to distinguish between them.
*/
enum ReaderType
{
SIMPLE,
COMPRESSED;
/** The number of types. Declared as a constant to avoid allocating on values(). */
public static final int COUNT = ReaderType.values().length;
}
ReaderType type();
}
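A brief illustration of why COUNT is pre-computed: ReaderType.values() allocates a fresh array on every call, so hot paths that need a per-type slot can size an array once with COUNT and index it by ordinal(). The counters class below is hypothetical, not code from this PR.

```java
import org.apache.cassandra.io.util.ChunkReader.ReaderType;

final class PerReaderTypeCounters
{
    // One slot per reader type, sized once via the pre-computed constant.
    private final long[] hits = new long[ReaderType.COUNT];

    void recordHit(ReaderType type)
    {
        hits[type.ordinal()]++;
    }

    long hitCount(ReaderType type)
    {
        return hits[type.ordinal()];
    }
}
```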
88 changes: 43 additions & 45 deletions src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
@@ -30,6 +30,7 @@
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.storage.StorageProvider;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.memory.BufferPools;

public abstract class CompressedChunkReader extends AbstractReaderFileProxy implements ChunkReader
{
@@ -91,11 +92,14 @@ public Rebufferer instantiateRebufferer()
return new BufferManagingRebufferer.Aligned(this);
}

public ReaderType type()
{
return ReaderType.COMPRESSED;
}

public static class Standard extends CompressedChunkReader
{
// we read the raw compressed bytes into this buffer, then uncompressed them into the provided one.
private final ThreadLocalByteBufferHolder bufferHolder;

public Standard(ChannelProxy channel, CompressionMetadata metadata)
{
this(channel, metadata, 0);
@@ -104,7 +108,6 @@ public Standard(ChannelProxy channel, CompressionMetadata metadata)
public Standard(ChannelProxy channel, CompressionMetadata metadata, long startOffset)
{
super(channel, metadata, startOffset);
bufferHolder = new ThreadLocalByteBufferHolder(metadata.compressor().preferredBufferType());
}

@Override
@@ -122,57 +125,54 @@ public void readChunk(long position, ByteBuffer uncompressed)
: chunk.length;

long chunkOffset = chunk.offset - onDiskStartOffset;
if (chunk.length < maxCompressedLength)
boolean shouldDecompress = chunk.length < maxCompressedLength;
if (shouldDecompress || shouldCheckCrc) // when we need to read the CRC too, follow the decompression path to avoid a second channel read call
{
ByteBuffer compressed = bufferHolder.getBuffer(length);
ByteBuffer compressed = BufferPools.forNetworking().getAtLeast(length, metadata.compressor().preferredBufferType());

if (channel.read(compressed, chunkOffset) != length)
throw new CorruptBlockException(channel.filePath(), chunk);

compressed.flip();
compressed.limit(chunk.length);
uncompressed.clear();

if (shouldCheckCrc)
try
{
int checksum = (int) ChecksumType.CRC32.of(compressed);

compressed.limit(length);
int storedChecksum = compressed.getInt();
if (storedChecksum != checksum)
throw new CorruptBlockException(channel.filePath(), chunk, storedChecksum, checksum);
if (channel.read(compressed, chunkOffset) != length)
throw new CorruptBlockException(channel.filePath(), chunk);

compressed.position(0).limit(chunk.length);
}
if (shouldCheckCrc)
{
// compute checksum of the compressed data
compressed.position(0).limit(chunk.length);
int checksum = (int) ChecksumType.CRC32.of(compressed);
// the remaining bytes are the checksum
compressed.limit(length);
int storedChecksum = compressed.getInt();
if (storedChecksum != checksum)
throw new CorruptBlockException(channel.filePath(), chunk, storedChecksum, checksum);
}

try
{
metadata.compressor().uncompress(compressed, uncompressed);
compressed.position(0).limit(chunk.length);
uncompressed.clear();

try
{
if (shouldDecompress)
metadata.compressor().uncompress(compressed, uncompressed);
else
uncompressed.put(compressed);
}
catch (IOException e)
{
throw new CorruptBlockException(channel.filePath(), chunk, e);
}
}
catch (IOException e)
finally
{
throw new CorruptBlockException(channel.filePath(), chunk, e);
BufferPools.forNetworking().put(compressed);
}
}
else
{
uncompressed.position(0).limit(chunk.length);
if (channel.read(uncompressed, chunkOffset) != chunk.length)
throw new CorruptBlockException(channel.filePath(), chunk);

if (shouldCheckCrc)
{
uncompressed.flip();
int checksum = (int) ChecksumType.CRC32.of(uncompressed);

ByteBuffer scratch = bufferHolder.getBuffer(Integer.BYTES);

if (channel.read(scratch, chunkOffset + chunk.length) != Integer.BYTES)
throw new CorruptBlockException(channel.filePath(), chunk);
int storedChecksum = scratch.getInt(0);
if (storedChecksum != checksum)
throw new CorruptBlockException(channel.filePath(), chunk, storedChecksum, checksum);
}
}
uncompressed.flip();
}
@@ -223,24 +223,22 @@ public void readChunk(long position, ByteBuffer uncompressed)
int chunkOffsetInSegment = Ints.checkedCast(chunk.offset - segmentOffset);
ByteBuffer compressedChunk = region.buffer();

compressedChunk.position(chunkOffsetInSegment).limit(chunkOffsetInSegment + chunk.length);

uncompressed.clear();

try
{
if (shouldCheckCrc())
{
compressedChunk.position(chunkOffsetInSegment).limit(chunkOffsetInSegment + chunk.length);
int checksum = (int) ChecksumType.CRC32.of(compressedChunk);

compressedChunk.limit(compressedChunk.capacity());
int storedChecksum = compressedChunk.getInt();
if (storedChecksum != checksum)
throw new CorruptBlockException(channel.filePath(), chunk, storedChecksum, checksum);

compressedChunk.position(chunkOffsetInSegment).limit(chunkOffsetInSegment + chunk.length);
}

compressedChunk.position(chunkOffsetInSegment).limit(chunkOffsetInSegment + chunk.length);
uncompressed.clear();

if (chunk.length < maxCompressedLength)
metadata.compressor().uncompress(compressedChunk, uncompressed);
else
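For reference on the format the readers above rely on: each compressed chunk on disk is the compressed payload followed by a 4-byte CRC32 of that payload, which is what the shouldCheckCrc branches verify before decompressing. Below is a standalone sketch of that check using java.util.zip.CRC32; the real code goes through ChecksumType.CRC32 and the configured compressor, and the helper name here is made up.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

final class ChunkCrcCheckExample
{
    /** Verifies the trailing CRC32 and returns a view of the compressed payload. */
    static ByteBuffer verifiedPayload(ByteBuffer chunkWithCrc, int compressedLength)
    {
        // The CRC is computed over the compressed bytes only.
        ByteBuffer payload = chunkWithCrc.duplicate();
        payload.position(0).limit(compressedLength);

        CRC32 crc = new CRC32();
        crc.update(payload.duplicate());
        int computed = (int) crc.getValue();

        // The 4 bytes immediately after the payload hold the stored checksum.
        int stored = chunkWithCrc.getInt(compressedLength);
        if (stored != computed)
            throw new IllegalStateException("Chunk checksum mismatch: " + stored + " != " + computed);

        return payload;
    }
}
```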