CNDB-9104: Port over chunk cache improvements from DSE #1495

Status: Open. Wants to merge 34 commits into base branch main. The diff below shows changes from 20 of the 34 commits.

Commits:
3968200
Remove unused BufferHolder methods
blambov Jan 8, 2025
1ad3ac4
DB-2904 port: Use same size buffers in chunk cache
blambov Jan 8, 2025
cfa0842
Always use off-heap memory for chunk cache.
blambov Jan 8, 2025
0f84fc2
Use networking buffer pool for compressed reads.
blambov Jan 8, 2025
31a5fda
Allow buffer pool to return one-buffer multi-page chunks
blambov Jan 8, 2025
6aa0130
Port over some chunk cache tests
blambov Jan 9, 2025
1a15fa5
Set up for on-heap memory usage test
blambov Jan 10, 2025
c4f14ee
Introduce fileID and invalidate file by dropping id.
blambov Jan 9, 2025
307aa7f
Store addresses and attachments to avoid a direct buffer per entry
blambov Jan 10, 2025
be317b2
Sleep for jmap
blambov Jan 10, 2025
1313839
Remove pre-computed key hash
blambov Jan 10, 2025
36be7e5
Revert "Sleep for jmap"
blambov Jan 10, 2025
9190348
Revert "Set up for on-heap memory usage test"
blambov Jan 10, 2025
d4e230e
Review changes and license fix
blambov Jan 10, 2025
d5a22c8
Drop ChunkReader reference from Key
blambov Jan 10, 2025
fa2551f
Revert unneeded change
blambov Jan 10, 2025
89817b4
Test improvements
blambov Jan 10, 2025
c0c4716
Use page splitting for large buffers too, to avoid having to store a …
blambov Jan 10, 2025
7ab70b0
Fix test.
blambov Jan 10, 2025
2539b18
Review comments
blambov Jan 13, 2025
c01dbb7
Move code unchanged in ChunkCache.java
blambov Jan 13, 2025
5262c59
Fix test compilation
blambov Jan 13, 2025
7f0d6ba
Change sizeOfFile to accept File
blambov Jan 13, 2025
6686f6d
Fix and test chunk cache retention after early open
blambov Jan 13, 2025
462fb34
Provide precise end position for early-open sstables
blambov Jan 13, 2025
d755d32
Move cache invalidation FileHandle.Builder creation
blambov Jan 14, 2025
9fc879e
Add comment and remove unused method
blambov Jan 14, 2025
09199b3
Test fix
blambov Jan 14, 2025
3f08d7f
Test fix
blambov Jan 14, 2025
ba1232f
Invalidate cache only on request by calling file handle builder's inv…
blambov Jan 14, 2025
226bc15
Revert "Invalidate cache only on request by calling file handle build…
blambov Jan 14, 2025
743e53a
Invalidate both on making SequentialWriter and on global SSTableReade…
blambov Jan 14, 2025
1bbc990
Remove invalidation in SequentialWriter and rely on invalidation duri…
blambov Jan 14, 2025
dac75ad
Change order of cache invalidation and obsoletion
blambov Jan 15, 2025
483 changes: 358 additions & 125 deletions src/java/org/apache/cassandra/cache/ChunkCache.java

Large diffs are not rendered by default.

49 changes: 16 additions & 33 deletions src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
@@ -28,12 +28,14 @@

import org.apache.cassandra.utils.memory.BufferPools;

/**
* Buffer manager used for reading from a ChunkReader when cache is not in use. Instances of this class are
* reader-specific and thus do not need to be thread-safe since the reader itself isn't.
*
* The instances reuse themselves as the BufferHolder to avoid having to return a new object for each rebuffer call.
*/
/// Buffer manager used for reading from a [ChunkReader] when cache is not in use. They use a buffer produced by the
/// "networking" buffer pool, which is the one to be used for buffers that are not to be retained for a long time
/// (the lifetime of this object is contained by the lifetime of a [RandomAccessReader] which is contained in a read
/// operation's lifetime).
///
/// Instances of this class are reader-specific and thus do not need to be thread-safe since the reader itself isn't.
///
/// The instances reuse themselves as the BufferHolder to avoid having to return a new object for each rebuffer call.
public abstract class BufferManagingRebufferer implements Rebufferer, Rebufferer.BufferHolder
{
protected final ChunkReader source;
@@ -45,14 +47,20 @@ public abstract class BufferManagingRebufferer implements Rebufferer, Rebufferer
protected BufferManagingRebufferer(ChunkReader wrapped)
{
this.source = wrapped;
buffer = BufferPools.forChunkCache().get(wrapped.chunkSize(), wrapped.preferredBufferType()).order(ByteOrder.BIG_ENDIAN);
// Note: This class uses the networking buffer pool which makes better sense for short-lifetime buffers.
// Because this is meant to be used when the chunk cache is disabled, it also makes sense to use any memory
// that may have been allocated for in-flight data by using the chunk-cache pool.
// However, if some new functionality decides to use this class in the presence of the chunk cache (e.g.
// cache-bypassing compaction), using the chunk-cache pool here will certainly cause hard-to-diagnose issues
// that we would prefer to avoid.
buffer = BufferPools.forNetworking().get(wrapped.chunkSize(), wrapped.preferredBufferType()).order(ByteOrder.BIG_ENDIAN);
buffer.limit(0);
}

@Override
public void closeReader()
{
BufferPools.forChunkCache().put(buffer);
BufferPools.forNetworking().put(buffer);
offset = -1;
}
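
The constructor/closeReader pair above follows a strict acquire-release discipline against the short-lifetime "networking" pool: take the buffer when the rebufferer is created, return it exactly once when the reader closes. A minimal single-threaded sketch of such a recycling pool (the class and method names are hypothetical; the real BufferPools.forNetworking() implementation is considerably more elaborate):

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical sketch of a recycling pool for short-lived buffers;
// illustrates only the get/put discipline used by the rebufferer.
public class ShortLivedPool
{
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    private final int bufferSize;

    public ShortLivedPool(int bufferSize)
    {
        this.bufferSize = bufferSize;
    }

    // acquire: reuse a returned buffer if one is available, else allocate
    public ByteBuffer get()
    {
        ByteBuffer b = free.poll();
        if (b == null)
            b = ByteBuffer.allocateDirect(bufferSize);
        b.clear();
        return b;
    }

    // release: make the buffer available to the next short-lived reader
    public void put(ByteBuffer b)
    {
        free.push(b);
    }
}
```

Releasing exactly once matters: a double put would hand the same buffer to two readers, which is the kind of hard-to-diagnose issue the code comment above warns about when pools are mixed.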

@@ -102,31 +110,6 @@ public ByteBuffer buffer()
return buffer.duplicate();
}

@Override
public ByteOrder order()
{
return buffer.order();
}

@Override
public FloatBuffer floatBuffer()
{
return buffer.asFloatBuffer();
}

@Override
public IntBuffer intBuffer()
{
return buffer.asIntBuffer();
}

@Override
public LongBuffer longBuffer()
{
return buffer.asLongBuffer();
}


public long offset()
{
return offset;
13 changes: 13 additions & 0 deletions src/java/org/apache/cassandra/io/util/ChunkReader.java
@@ -52,4 +52,17 @@ public interface ChunkReader extends RebuffererFactory
* This is not guaranteed to be fulfilled.
*/
BufferType preferredBufferType();

/**
* In some cases we may end up with both compressed and uncompressed data for the same file in
* the cache. This type is used to distinguish between them.
*/
enum ReaderType
{
SIMPLE,
COMPRESSED;
/** The number of types. Declared as a constant to avoid allocating on values(). */
public static final int COUNT = ReaderType.values().length;
}
ReaderType type();
}
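
The new ReaderType enum above keys cache entries by whether they hold plain or compressed data, and its COUNT constant lets callers size per-type arrays without the fresh array allocation that every ReaderType.values() call incurs. A standalone sketch of that pattern (the enum mirrors the diff; ReaderTypeStats and its methods are hypothetical illustrations, not project code):

```java
// Mirrors the enum added in the diff above.
enum ReaderType
{
    SIMPLE,
    COMPRESSED;

    // values() allocates a new array on every call; computing the length
    // once at class-load time avoids that in hot paths.
    public static final int COUNT = ReaderType.values().length;
}

// Hypothetical consumer: one counter slot per reader type.
class ReaderTypeStats
{
    private final long[] hits = new long[ReaderType.COUNT];

    void recordHit(ReaderType type)
    {
        hits[type.ordinal()]++;
    }

    long hitsFor(ReaderType type)
    {
        return hits[type.ordinal()];
    }
}
```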
88 changes: 43 additions & 45 deletions src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
@@ -30,6 +30,7 @@
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.storage.StorageProvider;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.memory.BufferPools;

public abstract class CompressedChunkReader extends AbstractReaderFileProxy implements ChunkReader
{
@@ -91,11 +92,14 @@ public Rebufferer instantiateRebufferer()
return new BufferManagingRebufferer.Aligned(this);
}

public ReaderType type()
{
return ReaderType.COMPRESSED;
}

public static class Standard extends CompressedChunkReader
{
// we read the raw compressed bytes into this buffer, then uncompressed them into the provided one.
private final ThreadLocalByteBufferHolder bufferHolder;

public Standard(ChannelProxy channel, CompressionMetadata metadata)
{
this(channel, metadata, 0);
@@ -104,7 +108,6 @@ public Standard(ChannelProxy channel, CompressionMetadata metadata)
public Standard(ChannelProxy channel, CompressionMetadata metadata, long startOffset)
{
super(channel, metadata, startOffset);
bufferHolder = new ThreadLocalByteBufferHolder(metadata.compressor().preferredBufferType());
}

@Override
@@ -122,57 +125,54 @@ public void readChunk(long position, ByteBuffer uncompressed)
: chunk.length;

long chunkOffset = chunk.offset - onDiskStartOffset;
if (chunk.length < maxCompressedLength)
boolean shouldDecompress = chunk.length < maxCompressedLength;
if (shouldDecompress || shouldCheckCrc) // when we need to read the CRC too, follow the decompression path to avoid a second channel read call
{
ByteBuffer compressed = bufferHolder.getBuffer(length);
ByteBuffer compressed = BufferPools.forNetworking().getAtLeast(length, metadata.compressor().preferredBufferType());

if (channel.read(compressed, chunkOffset) != length)
throw new CorruptBlockException(channel.filePath(), chunk);

compressed.flip();
compressed.limit(chunk.length);
uncompressed.clear();

if (shouldCheckCrc)
try
{
int checksum = (int) ChecksumType.CRC32.of(compressed);

compressed.limit(length);
int storedChecksum = compressed.getInt();
if (storedChecksum != checksum)
throw new CorruptBlockException(channel.filePath(), chunk, storedChecksum, checksum);
if (channel.read(compressed, chunkOffset) != length)
throw new CorruptBlockException(channel.filePath(), chunk);

compressed.position(0).limit(chunk.length);
}
if (shouldCheckCrc)
{
// compute checksum of the compressed data
compressed.position(0).limit(chunk.length);
int checksum = (int) ChecksumType.CRC32.of(compressed);
// the remaining bytes are the checksum
compressed.limit(length);
int storedChecksum = compressed.getInt();
if (storedChecksum != checksum)
throw new CorruptBlockException(channel.filePath(), chunk, storedChecksum, checksum);
}

try
{
metadata.compressor().uncompress(compressed, uncompressed);
compressed.position(0).limit(chunk.length);
uncompressed.clear();

try
{
if (shouldDecompress)
metadata.compressor().uncompress(compressed, uncompressed);
else
uncompressed.put(compressed);
}
catch (IOException e)
{
throw new CorruptBlockException(channel.filePath(), chunk, e);
}
}
catch (IOException e)
finally
{
throw new CorruptBlockException(channel.filePath(), chunk, e);
BufferPools.forNetworking().put(compressed);
}
}
else
{
uncompressed.position(0).limit(chunk.length);
if (channel.read(uncompressed, chunkOffset) != chunk.length)
throw new CorruptBlockException(channel.filePath(), chunk);

if (shouldCheckCrc)
{
uncompressed.flip();
int checksum = (int) ChecksumType.CRC32.of(uncompressed);

ByteBuffer scratch = bufferHolder.getBuffer(Integer.BYTES);

if (channel.read(scratch, chunkOffset + chunk.length) != Integer.BYTES)
throw new CorruptBlockException(channel.filePath(), chunk);
int storedChecksum = scratch.getInt(0);
if (storedChecksum != checksum)
throw new CorruptBlockException(channel.filePath(), chunk, storedChecksum, checksum);
}
}
uncompressed.flip();
}
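
The rewritten readChunk above reads the compressed bytes and their trailing CRC in a single channel call whenever checksumming is enabled, then verifies before decompressing. The on-disk layout it checks, [chunk bytes][4-byte CRC32 of those bytes], can be sketched in isolation (this uses java.util.zip.CRC32 rather than the project's ChecksumType, and the class and method names are illustrative only):

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Illustrative sketch of the CRC layout verified by the diff's
// shouldCheckCrc branch: data bytes followed by a 4-byte CRC32.
public class ChunkCrcCheck
{
    static boolean verify(ByteBuffer chunk, int dataLength)
    {
        CRC32 crc = new CRC32();
        ByteBuffer data = chunk.duplicate();
        data.position(0).limit(dataLength);
        crc.update(data);                       // checksum of the payload bytes
        int stored = chunk.getInt(dataLength);  // trailing 4 bytes hold the stored CRC
        return stored == (int) crc.getValue();
    }

    public static void main(String[] args)
    {
        byte[] payload = { 1, 2, 3, 4 };
        CRC32 crc = new CRC32();
        crc.update(payload);
        ByteBuffer chunk = ByteBuffer.allocate(payload.length + Integer.BYTES);
        chunk.put(payload).putInt((int) crc.getValue()).flip();
        System.out.println(verify(chunk, payload.length)); // prints true
    }
}
```

Computing the checksum over a duplicate (or via absolute positioning, as the diff does with position(0).limit(chunk.length)) is what lets the same buffer be rewound and handed to the decompressor afterwards.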
@@ -223,24 +223,22 @@ public void readChunk(long position, ByteBuffer uncompressed)
int chunkOffsetInSegment = Ints.checkedCast(chunk.offset - segmentOffset);
ByteBuffer compressedChunk = region.buffer();

compressedChunk.position(chunkOffsetInSegment).limit(chunkOffsetInSegment + chunk.length);

uncompressed.clear();

try
{
if (shouldCheckCrc())
{
compressedChunk.position(chunkOffsetInSegment).limit(chunkOffsetInSegment + chunk.length);
int checksum = (int) ChecksumType.CRC32.of(compressedChunk);

compressedChunk.limit(compressedChunk.capacity());
int storedChecksum = compressedChunk.getInt();
if (storedChecksum != checksum)
throw new CorruptBlockException(channel.filePath(), chunk, storedChecksum, checksum);

compressedChunk.position(chunkOffsetInSegment).limit(chunkOffsetInSegment + chunk.length);
}

compressedChunk.position(chunkOffsetInSegment).limit(chunkOffsetInSegment + chunk.length);
uncompressed.clear();

if (chunk.length < maxCompressedLength)
metadata.compressor().uncompress(compressedChunk, uncompressed);
else
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/io/util/FileHandle.java
@@ -238,7 +238,7 @@ public String name()

public void tidy()
{
chunkCache.ifPresent(cache -> cache.invalidateFile(name()));
ChunkCache.removeFileIdFromCache(channel.getFile());
Review comment:
I'd argue that not using the local chunkCache here is a regression, for 2 reasons:

  1. it assumes chunkCache.get() == ChunkCache.instance. But currently in CNDB we sometimes use a different chunk cache instance for some files, where this would be incorrect. Admittedly, the benefits of that separate instance are debatable, but it is used currently.
  2. it also kind of assumes that if the chunk cache is used, then this instance of FileHandle is meant to use it, which is technically not guaranteed. In theory, nothing quite forbids the same file from being opened in one place with use of the chunk cache but also in another where it doesn't. Given that FileHandle is used a lot, including in CNDB, I think it's better not to rely on this never being a legit use case.

Author's reply:
This whole code is now dropped, because it is obliterating the effect of early open. Whatever solution we choose, it cannot be done during handle cleanup.

try
{
if (compressionMetadata != null)
24 changes: 0 additions & 24 deletions src/java/org/apache/cassandra/io/util/MmappedRegions.java
@@ -254,30 +254,6 @@ public ByteBuffer buffer()
return buffer.duplicate();
}

@Override
public ByteOrder order()
{
return buffer.order();
}

public FloatBuffer floatBuffer()
{
// this does an implicit duplicate(), so we need to expose it directly to avoid doing it twice unnecessarily
return buffer.asFloatBuffer();
}

public IntBuffer intBuffer()
{
// this does an implicit duplicate(), so we need to expose it directly to avoid doing it twice unnecessarily
return buffer.asIntBuffer();
}

public LongBuffer longBuffer()
{
// this does an implicit duplicate(), so we need to expose it directly to avoid doing it twice unnecessarily
return buffer.asLongBuffer();
}

public long offset()
{
return offset;
54 changes: 0 additions & 54 deletions src/java/org/apache/cassandra/io/util/Rebufferer.java
@@ -51,30 +51,6 @@ interface BufferHolder
*/
ByteBuffer buffer();

/**
* Return the order of the underlying {@link ByteBuffer} held by this class. This is only relevant for the
* {@link #floatBuffer()}, {@link #intBuffer()} and {@link #longBuffer()} methods because the caller cannot
* change the order of those returned buffer objects. Further, it is not generally relevant for calls to
* {@link #buffer()} since the call to {@link ByteBuffer#duplicate()} sets the byte order to
* {@link ByteOrder#BIG_ENDIAN} and the caller can change the order of the returned buffer.
*/
ByteOrder order();

default FloatBuffer floatBuffer()
{
throw new UnsupportedOperationException("not implemented in " + this.getClass());
}

default IntBuffer intBuffer()
{
throw new UnsupportedOperationException("not implemented in " + this.getClass());
}

default LongBuffer longBuffer()
{
throw new UnsupportedOperationException("not implemented in " + this.getClass());
}

/**
* Position in the file of the start of the buffer.
*/
@@ -97,30 +73,6 @@ public ByteBuffer buffer()
return EMPTY_BUFFER;
}

@Override
public ByteOrder order()
{
return EMPTY_BUFFER.order();
}

@Override
public FloatBuffer floatBuffer()
{
return EMPTY_BUFFER.asFloatBuffer();
}

@Override
public IntBuffer intBuffer()
{
return EMPTY_BUFFER.asIntBuffer();
}

@Override
public LongBuffer longBuffer()
{
return EMPTY_BUFFER.asLongBuffer();
}

@Override
public long offset()
{
@@ -144,12 +96,6 @@ public ByteBuffer buffer()
return EMPTY.buffer();
}

@Override
public ByteOrder order()
{
return EMPTY.order();
}

@Override
public long offset()
{
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -22,6 +22,7 @@
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

import org.apache.cassandra.cache.ChunkCache;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.utils.PageAware;
@@ -124,6 +125,9 @@ private static FileChannel openChannel(File file)
try { channel.close(); }
catch (Throwable t2) { t.addSuppressed(t2); }
}

// Invalidate any cache entries that may exist for a previous file with the same name.
ChunkCache.removeFileIdFromCache(file);

Review comment:
In CNDB we use the ChunkCache also for SAI; should we add some specific handling?

cc @pcmanus

Author's reply:

Moved to the file handle builder to use the supplied cache instance.

return channel;
}
}
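
The invalidation added above guards against stale entries when a new file reuses an old name. It builds on the fileID scheme introduced by commit c4f14ee ("Introduce fileID and invalidate file by dropping id"): cache keys embed a per-file id, so invalidating a file is just dropping its id, which makes every existing entry unreachable without scanning the cache. A hypothetical standalone sketch of the idea (class and method names are illustrative, not the actual ChunkCache API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of id-based invalidation: entries keyed with an old
// id simply become unreachable once the id is dropped.
public class FileIdRegistry
{
    private static final AtomicLong nextId = new AtomicLong();
    private static final Map<String, Long> idByName = new ConcurrentHashMap<>();

    // id used when building cache keys for reads of this file
    static long idFor(String fileName)
    {
        return idByName.computeIfAbsent(fileName, n -> nextId.incrementAndGet());
    }

    // dropping the id orphans every cache entry created under the old id
    static void invalidate(String fileName)
    {
        idByName.remove(fileName);
    }
}
```

The orphaned entries are then reclaimed by the cache's normal eviction rather than by an eager scan, which is what makes the invalidation cheap enough to run on every writer open.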
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
@@ -78,6 +78,11 @@ public void invalidateIfCached(long position)
{
}

public ReaderType type()
{
return ReaderType.SIMPLE;
}

@Override
public String toString()
{