
Commit

Fix cache writes
nazar-pc authored and dariolina committed Oct 21, 2024
1 parent dded9af commit e3cb10e
Showing 1 changed file with 35 additions and 30 deletions.
crates/subspace-farmer/src/disk_piece_cache.rs
@@ -15,7 +15,6 @@ use futures::channel::mpsc;
 use futures::{stream, SinkExt, Stream, StreamExt};
 use parking_lot::Mutex;
 use prometheus_client::registry::Registry;
-use std::ops::Deref;
 use std::path::Path;
 use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::Arc;
@@ -66,15 +65,6 @@ struct FilePool {
     cursor: AtomicU8,
 }
 
-impl Deref for FilePool {
-    type Target = DirectIoFile;
-
-    fn deref(&self) -> &Self::Target {
-        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
-        &self.files[position % PIECES_READING_CONCURRENCY]
-    }
-}
-
 impl FilePool {
     fn open(path: &Path) -> io::Result<Self> {
         let files = (0..PIECES_READING_CONCURRENCY)
@@ -87,12 +77,23 @@ impl FilePool {
             cursor: AtomicU8::new(0),
         })
     }
+
+    fn read(&self) -> &DirectIoFile {
+        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
+        &self.files[position % PIECES_READING_CONCURRENCY]
+    }
+
+    fn write(&self) -> &DirectIoFile {
+        // Always the same file or else overlapping writes will be corrupted due to
+        // read/modify/write internals, which are in turn caused by alignment requirements
+        &self.files[0]
+    }
 }
 
 #[derive(Debug)]
 struct Inner {
     id: PieceCacheId,
-    file: FilePool,
+    files: FilePool,
     max_num_elements: u32,
     metrics: Option<DiskPieceCacheMetrics>,
 }
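This hunk is the heart of the fix: the removed Deref impl round-robined every access to the pool, so writes could land on different handles and, given the read/modify/write that the alignment requirements force, corrupt each other. Splitting out explicit read() and write() accessors keeps round-robin for reads only. A minimal self-contained sketch of the pattern, using plain std::fs::File in place of DirectIoFile (the crate's direct/aligned I/O wrapper); the pool size and names here are illustrative assumptions, not the crate's API:

use std::fs::{File, OpenOptions};
use std::io;
use std::path::Path;
use std::sync::atomic::{AtomicU8, Ordering};

const READ_CONCURRENCY: usize = 4; // stand-in for PIECES_READING_CONCURRENCY

struct FilePool {
    files: Vec<File>,
    cursor: AtomicU8,
}

impl FilePool {
    fn open(path: &Path) -> io::Result<Self> {
        // Several handles to the same file allow concurrent positioned reads
        let files = (0..READ_CONCURRENCY)
            .map(|_| OpenOptions::new().read(true).write(true).open(path))
            .collect::<io::Result<Vec<_>>>()?;
        Ok(Self {
            files,
            cursor: AtomicU8::new(0),
        })
    }

    fn read(&self) -> &File {
        // Round-robin across handles so parallel readers do not serialize on one descriptor
        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
        &self.files[position % READ_CONCURRENCY]
    }

    fn write(&self) -> &File {
        // All writes funnel through a single handle: an aligned write path does
        // read/modify/write on whole sectors, so writes through different handles
        // to overlapping sectors could clobber each other
        &self.files[0]
    }
}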
@@ -242,19 +243,22 @@ impl DiskPieceCache {
             return Err(DiskPieceCacheError::ZeroCapacity);
         }
 
-        let file = FilePool::open(&directory.join(Self::FILE_NAME))?;
+        let files = FilePool::open(&directory.join(Self::FILE_NAME))?;
 
         let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
         // Align plot file size for disk sector size
         let expected_size =
             expected_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
-        if file.size()? != expected_size {
-            // Allocating the whole file (`set_len` below can create a sparse file, which will cause
-            // writes to fail later)
-            file.preallocate(expected_size)
-                .map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
-            // Truncating file (if necessary)
-            file.set_len(expected_size)?;
+        {
+            let file = files.write();
+            if file.size()? != expected_size {
+                // Allocating the whole file (`set_len` below can create a sparse file, which will cause
+                // writes to fail later)
+                file.preallocate(expected_size)
+                    .map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
+                // Truncating file (if necessary)
+                file.set_len(expected_size)?;
+            }
         }
 
         // ID for cache is ephemeral unless provided explicitly
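The sector alignment above is plain round-up arithmetic. A tiny sketch with assumed numbers (the real element size and DISK_SECTOR_SIZE are defined in the crate and may differ):

fn main() {
    // Assumed values, for illustration only
    const DISK_SECTOR_SIZE: u64 = 4096;
    let element_size: u64 = 8 + 65_536 + 32; // index + piece + checksum, sizes assumed
    let capacity: u64 = 100;

    let expected_size = element_size * capacity; // 6_557_600, not sector-aligned
    // Round up to a whole number of disk sectors, as in the diff above
    let aligned = expected_size.div_ceil(DISK_SECTOR_SIZE) * DISK_SECTOR_SIZE;
    assert_eq!(aligned, 6_557_696);
    assert_eq!(aligned % DISK_SECTOR_SIZE, 0);
}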
@@ -264,7 +268,7 @@
         Ok(Self {
             inner: Arc::new(Inner {
                 id,
-                file,
+                files,
                 max_num_elements: capacity,
                 metrics,
             }),
@@ -353,16 +357,16 @@ impl DiskPieceCache {
         let element_offset = u64::from(offset) * u64::from(Self::element_size());
 
         let piece_index_bytes = piece_index.to_bytes();
+        // File writes are read/modify/write internally, so combine all data here for more
+        // efficient write
+        let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
+        bytes.extend_from_slice(&piece_index_bytes);
+        bytes.extend_from_slice(piece.as_ref());
+        bytes.extend_from_slice(blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref());
         self.inner
-            .file
-            .write_all_at(&piece_index_bytes, element_offset)?;
-        self.inner
-            .file
-            .write_all_at(piece.as_ref(), element_offset + PieceIndex::SIZE as u64)?;
-        self.inner.file.write_all_at(
-            blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
-            element_offset + PieceIndex::SIZE as u64 + Piece::SIZE as u64,
-        )?;
+            .files
+            .write()
+            .write_all_at(&bytes, element_offset)?;
 
         Ok(())
     }
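Previously each element took three positioned writes, and each one triggered its own read/modify/write cycle on the shared sectors; building the element in one buffer issues a single write instead. A minimal sketch of that layout, with assumed sizes and a plain std::fs::File (Unix-only here, for write_all_at) standing in for DirectIoFile:

use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt;

const INDEX_SIZE: usize = 8; // assumed size of the piece-index prefix
const HASH_SIZE: usize = 32; // BLAKE3 output size

// Element layout: [ piece index | piece bytes | BLAKE3 checksum ]
fn write_element(
    file: &File,
    offset: u64,
    index: &[u8; INDEX_SIZE],
    piece: &[u8],
    hash: &[u8; HASH_SIZE],
) -> io::Result<()> {
    // One combined buffer -> one positioned write, so no two writes touch
    // the same disk sector
    let mut bytes = Vec::with_capacity(INDEX_SIZE + piece.len() + HASH_SIZE);
    bytes.extend_from_slice(index);
    bytes.extend_from_slice(piece);
    bytes.extend_from_slice(hash);
    file.write_all_at(&bytes, offset)
}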
@@ -432,7 +436,8 @@ impl DiskPieceCache {
         element: &mut [u8],
     ) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
         self.inner
-            .file
+            .files
+            .read()
             .read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;
 
         let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
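On the read side each element comes back as one buffer and is split apart the same way. A hedged sketch of decoding and checksum verification, using the blake3 crate's Hasher as a stand-in for the crate's blake3_hash_list helper and assuming a little-endian 8-byte index:

const INDEX_SIZE: usize = 8; // assumed
const HASH_SIZE: usize = 32; // BLAKE3 output size

// Returns the piece index if the element's checksum is valid, None otherwise
fn decode_element(element: &[u8]) -> Option<u64> {
    let (index_bytes, remaining) = element.split_at(INDEX_SIZE);
    let (piece, stored_hash) = remaining.split_at(remaining.len() - HASH_SIZE);

    // Recompute the checksum over index + piece and compare with the stored one;
    // a mismatch means the slot is vacant or was corrupted
    let mut hasher = blake3::Hasher::new();
    hasher.update(index_bytes);
    hasher.update(piece);
    if hasher.finalize().as_bytes()[..] != *stored_hash {
        return None;
    }
    Some(u64::from_le_bytes(index_bytes.try_into().ok()?))
}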
