From e3cb10e2938400692c755cf5009c503568a2078e Mon Sep 17 00:00:00 2001
From: Nazar Mokrynskyi
Date: Mon, 21 Oct 2024 18:19:26 +0300
Subject: [PATCH] Fix cache writes

Piece cache writes used to go through `FilePool`'s `Deref` impl, which rotates
across all handles in the pool, so the several writes that make up a single
element could each land on a different `DirectIoFile`. Since those files do
read/modify/write of whole aligned sectors internally, overlapping writes
through different handles could corrupt each other. Writes now always go
through one dedicated handle, and each element is combined into a single
buffer written with one `write_all_at` call.
---
 .../subspace-farmer/src/disk_piece_cache.rs   | 65 ++++++++++---------
 1 file changed, 35 insertions(+), 30 deletions(-)

diff --git a/crates/subspace-farmer/src/disk_piece_cache.rs b/crates/subspace-farmer/src/disk_piece_cache.rs
index 529908e3a0a..811a170d4e8 100644
--- a/crates/subspace-farmer/src/disk_piece_cache.rs
+++ b/crates/subspace-farmer/src/disk_piece_cache.rs
@@ -15,7 +15,6 @@ use futures::channel::mpsc;
 use futures::{stream, SinkExt, Stream, StreamExt};
 use parking_lot::Mutex;
 use prometheus_client::registry::Registry;
-use std::ops::Deref;
 use std::path::Path;
 use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::Arc;
@@ -66,15 +65,6 @@ struct FilePool {
     cursor: AtomicU8,
 }
 
-impl Deref for FilePool {
-    type Target = DirectIoFile;
-
-    fn deref(&self) -> &Self::Target {
-        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
-        &self.files[position % PIECES_READING_CONCURRENCY]
-    }
-}
-
 impl FilePool {
     fn open(path: &Path) -> io::Result<Self> {
         let files = (0..PIECES_READING_CONCURRENCY)
@@ -87,12 +77,23 @@ impl FilePool {
             cursor: AtomicU8::new(0),
         })
     }
+
+    fn read(&self) -> &DirectIoFile {
+        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
+        &self.files[position % PIECES_READING_CONCURRENCY]
+    }
+
+    fn write(&self) -> &DirectIoFile {
+        // Always use the same file, or else overlapping writes will be corrupted due to
+        // read/modify/write internals, which are in turn caused by alignment requirements
+        &self.files[0]
+    }
 }
 
 #[derive(Debug)]
 struct Inner {
     id: PieceCacheId,
-    file: FilePool,
+    files: FilePool,
     max_num_elements: u32,
     metrics: Option<DiskPieceCacheMetrics>,
 }
@@ -242,19 +243,22 @@ impl DiskPieceCache {
             return Err(DiskPieceCacheError::ZeroCapacity);
         }
 
-        let file = FilePool::open(&directory.join(Self::FILE_NAME))?;
+        let files = FilePool::open(&directory.join(Self::FILE_NAME))?;
 
         let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
         // Align plot file size for disk sector size
         let expected_size =
             expected_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
-        if file.size()? != expected_size {
-            // Allocating the whole file (`set_len` below can create a sparse file, which will cause
-            // writes to fail later)
-            file.preallocate(expected_size)
-                .map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
-            // Truncating file (if necessary)
-            file.set_len(expected_size)?;
+        {
+            let file = files.write();
+            if file.size()? != expected_size {
+                // Allocating the whole file (`set_len` below can create a sparse file, which
+                // will cause writes to fail later)
+                file.preallocate(expected_size)
+                    .map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
+                // Truncating file (if necessary)
+                file.set_len(expected_size)?;
+            }
         }
 
         // ID for cache is ephemeral unless provided explicitly
@@ -264,7 +268,7 @@ impl DiskPieceCache {
         Ok(Self {
             inner: Arc::new(Inner {
                 id,
-                file,
+                files,
                 max_num_elements: capacity,
                 metrics,
             }),
@@ -353,16 +357,16 @@ impl DiskPieceCache {
         let element_offset = u64::from(offset) * u64::from(Self::element_size());
 
         let piece_index_bytes = piece_index.to_bytes();
+        // File writes are read/modify/write internally, so combine all data here for a more
+        // efficient write
+        let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
+        bytes.extend_from_slice(&piece_index_bytes);
+        bytes.extend_from_slice(piece.as_ref());
+        bytes.extend_from_slice(blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref());
         self.inner
-            .file
-            .write_all_at(&piece_index_bytes, element_offset)?;
-        self.inner
-            .file
-            .write_all_at(piece.as_ref(), element_offset + PieceIndex::SIZE as u64)?;
-        self.inner.file.write_all_at(
-            blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
-            element_offset + PieceIndex::SIZE as u64 + Piece::SIZE as u64,
-        )?;
+            .files
+            .write()
+            .write_all_at(&bytes, element_offset)?;
 
         Ok(())
     }
@@ -432,7 +436,8 @@ impl DiskPieceCache {
         element: &mut [u8],
     ) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
         self.inner
-            .file
+            .files
+            .read()
             .read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;
 
         let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
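
For reference, a minimal, self-contained sketch of the pooled-reads /
pinned-writes pattern this patch introduces, using plain `std::fs::File` in
place of the crate's `DirectIoFile`. `POOL_SIZE` stands in for
`PIECES_READING_CONCURRENCY` and is illustrative, not the farmer's API; note
that a plain `File` does not do aligned read/modify/write, so this only
demonstrates the handle-selection logic (Unix-only because of `FileExt`):

use std::fs::{File, OpenOptions};
use std::io;
use std::os::unix::fs::FileExt;
use std::path::Path;
use std::sync::atomic::{AtomicU8, Ordering};

const POOL_SIZE: usize = 4; // stands in for PIECES_READING_CONCURRENCY

struct FilePool {
    files: Vec<File>,
    cursor: AtomicU8,
}

impl FilePool {
    fn open(path: &Path) -> io::Result<Self> {
        // Multiple handles to one file let concurrent positional reads proceed in parallel
        let files = (0..POOL_SIZE)
            .map(|_| OpenOptions::new().read(true).write(true).open(path))
            .collect::<io::Result<_>>()?;

        Ok(Self {
            files,
            cursor: AtomicU8::new(0),
        })
    }

    fn read(&self) -> &File {
        // Rotate across handles so concurrent readers do not pile up on one descriptor
        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
        &self.files[position % POOL_SIZE]
    }

    fn write(&self) -> &File {
        // Pin every write to a single handle; with an aligned direct-I/O file each write
        // is a read/modify/write of whole sectors, so writes touching the same sector
        // through different handles could clobber each other
        &self.files[0]
    }
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("file_pool_demo.bin");
    File::create(&path)?.set_len(4096)?;

    let pool = FilePool::open(&path)?;
    pool.write().write_all_at(b"hello", 0)?; // all writes share files[0]
    let mut buf = [0u8; 5];
    pool.read().read_exact_at(&mut buf, 0)?; // reads rotate across the pool
    assert_eq!(&buf, b"hello");
    Ok(())
}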
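And a sketch of the combined element write, assuming the `blake3` crate stands
in for the crate's `blake3_hash_list` helper and the hypothetical `INDEX_SIZE`
and `HASH_SIZE` constants stand in for `PieceIndex::SIZE` and
`Blake3Hash::SIZE` (none of these names are the farmer's API):

use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt;

const INDEX_SIZE: usize = 8;  // stands in for PieceIndex::SIZE
const HASH_SIZE: usize = 32;  // stands in for Blake3Hash::SIZE

fn write_element(file: &File, element_offset: u64, index: u64, piece: &[u8]) -> io::Result<()> {
    let index_bytes = index.to_le_bytes();

    // Hash of index then piece, mirroring blake3_hash_list(&[&piece_index_bytes, piece])
    let mut hasher = blake3::Hasher::new();
    hasher.update(&index_bytes);
    hasher.update(piece);
    let hash = hasher.finalize();

    // One contiguous buffer (index, then piece, then hash) and one positional write,
    // so a direct-I/O backend performs a single read/modify/write cycle per element
    // instead of three overlapping ones
    let mut bytes = Vec::with_capacity(INDEX_SIZE + piece.len() + HASH_SIZE);
    bytes.extend_from_slice(&index_bytes);
    bytes.extend_from_slice(piece);
    bytes.extend_from_slice(hash.as_bytes());
    file.write_all_at(&bytes, element_offset)
}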