Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify source piece & data handling during object retrieval #3301

Merged
merged 2 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 32 additions & 13 deletions crates/subspace-core-primitives/src/pieces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ impl PieceIndex {
/// Piece index 1.
pub const ONE: PieceIndex = PieceIndex(1);

/// Creates a new `PieceIndex` wrapping the given raw `u64` value.
///
/// `const` so it can be used by other `const fn`s such as
/// `SegmentIndex::first_piece_index`.
#[inline]
pub const fn new(n: u64) -> Self {
Self(n)
}

/// Create piece index from bytes.
#[inline]
pub const fn from_bytes(bytes: [u8; Self::SIZE]) -> Self {
Expand All @@ -114,8 +120,8 @@ impl PieceIndex {

/// Segment index piece index corresponds to
#[inline]
pub fn segment_index(&self) -> SegmentIndex {
SegmentIndex::from(self.0 / ArchivedHistorySegment::NUM_PIECES as u64)
pub const fn segment_index(&self) -> SegmentIndex {
SegmentIndex::new(self.0 / ArchivedHistorySegment::NUM_PIECES as u64)
}

/// Position of a piece in a segment
Expand All @@ -130,28 +136,41 @@ impl PieceIndex {
#[inline]
pub const fn source_position(&self) -> u32 {
assert!(self.is_source());
self.position() / (Self::source_ratio() as u32)

let source_start = self.position() / RecordedHistorySegment::ERASURE_CODING_RATE.1 as u32
* RecordedHistorySegment::ERASURE_CODING_RATE.0 as u32;
let source_offset = self.position() % RecordedHistorySegment::ERASURE_CODING_RATE.1 as u32;

source_start + source_offset
}

/// Returns the piece index for a source position and segment index.
/// Panics if the piece is not a source piece.
// NOTE(review): despite the doc line above, this body contains no
// assert/panic — an out-of-range `source_position` silently produces an
// index past this segment rather than panicking. Confirm the intended
// contract (compare `source_position()`, which does `assert!(self.is_source())`).
#[inline]
pub const fn from_source_position(
source_position: u32,
segment_index: SegmentIndex,
) -> PieceIndex {
// Inverse of `source_position()`: per `is_source()`, the first
// `ERASURE_CODING_RATE.0` indexes of every `ERASURE_CODING_RATE.1`-sized
// group of pieces are source pieces, so map the contiguous source
// numbering back onto that interleaved layout.
let source_position = source_position as u64;
// Piece-index offset of the start of the group containing this source piece.
let start = source_position / RecordedHistorySegment::ERASURE_CODING_RATE.0 as u64
* RecordedHistorySegment::ERASURE_CODING_RATE.1 as u64;
// Offset of the source piece within its group.
let offset = source_position % RecordedHistorySegment::ERASURE_CODING_RATE.0 as u64;

// Absolute piece index: start of the segment plus position within it.
PieceIndex(segment_index.first_piece_index().0 + start + offset)
}

/// Is this piece index a source piece?
#[inline]
pub const fn is_source(&self) -> bool {
// Source pieces are interleaved with parity pieces, source first
self.0 % Self::source_ratio() == 0
self.0 % (RecordedHistorySegment::ERASURE_CODING_RATE.1 as u64)
< (RecordedHistorySegment::ERASURE_CODING_RATE.0 as u64)
}

/// Returns the next source piece index
#[inline]
pub const fn next_source_index(&self) -> PieceIndex {
PieceIndex(self.0 + Self::source_ratio() - (self.0 % Self::source_ratio()))
}

/// The ratio of source pieces to all pieces
#[inline]
const fn source_ratio() -> u64 {
// Assumes the result is an integer
(RecordedHistorySegment::ERASURE_CODING_RATE.1
/ RecordedHistorySegment::ERASURE_CODING_RATE.0) as u64
PieceIndex::from_source_position(self.source_position() + 1, self.segment_index())
}
}

Expand Down
10 changes: 6 additions & 4 deletions crates/subspace-core-primitives/src/segments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,15 @@ impl SegmentIndex {
}

/// Get the first piece index in this segment.
pub fn first_piece_index(&self) -> PieceIndex {
PieceIndex::from(self.0 * ArchivedHistorySegment::NUM_PIECES as u64)
#[inline]
pub const fn first_piece_index(&self) -> PieceIndex {
PieceIndex::new(self.0 * ArchivedHistorySegment::NUM_PIECES as u64)
}

/// Get the last piece index in this segment.
pub fn last_piece_index(&self) -> PieceIndex {
PieceIndex::from((self.0 + 1) * ArchivedHistorySegment::NUM_PIECES as u64 - 1)
#[inline]
pub const fn last_piece_index(&self) -> PieceIndex {
PieceIndex::new((self.0 + 1) * ArchivedHistorySegment::NUM_PIECES as u64 - 1)
}

/// List of piece indexes that belong to this segment.
Expand Down
41 changes: 19 additions & 22 deletions shared/subspace-data-retrieval/src/object_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,18 +278,22 @@ where
.read_piece(next_source_piece_index, piece_index, piece_offset)
.await?;
next_source_piece_index = next_source_piece_index.next_source_index();
read_records_data.extend(piece.record().to_raw_record_chunks().flatten().copied());
// Discard piece data before the offset
read_records_data.extend(
piece
.record()
.to_raw_record_chunks()
.flatten()
.skip(piece_offset as usize)
.copied(),
);

if last_data_piece_in_segment {
// The last 2 bytes might contain segment padding, so we can't use them for object length or object data.
read_records_data.truncate(RawRecord::SIZE - 2);
read_records_data.truncate(read_records_data.len() - 2);
}

let data_length = self.decode_data_length(
&read_records_data[piece_offset as usize..],
piece_index,
piece_offset,
)?;
let data_length = self.decode_data_length(&read_records_data, piece_index, piece_offset)?;

let data_length = if let Some(data_length) = data_length {
data_length
Expand All @@ -311,12 +315,8 @@ where
next_source_piece_index = next_source_piece_index.next_source_index();
read_records_data.extend(piece.record().to_raw_record_chunks().flatten().copied());

self.decode_data_length(
&read_records_data[piece_offset as usize..],
piece_index,
piece_offset,
)?
.expect("Extra RawRecord is larger than the length encoding; qed")
self.decode_data_length(&read_records_data, piece_index, piece_offset)?
.expect("Extra RawRecord is larger than the length encoding; qed")
} else {
trace!(
piece_position_in_segment,
Expand Down Expand Up @@ -347,14 +347,10 @@ where
return Ok(None);
}

// Discard piece data before the offset
let mut data = read_records_data[piece_offset as usize..].to_vec();
drop(read_records_data);

// Read more pieces until we have enough data
if data_length as usize > data.len() {
if data_length as usize > read_records_data.len() {
let remaining_piece_count =
(data_length as usize - data.len()).div_ceil(RawRecord::SIZE);
(data_length as usize - read_records_data.len()).div_ceil(RawRecord::SIZE);
let remaining_piece_indexes = (next_source_piece_index..)
.filter(|i| i.is_source())
.take(remaining_piece_count)
Expand All @@ -363,14 +359,15 @@ where
.await?
.into_iter()
.for_each(|piece| {
data.extend(piece.record().to_raw_record_chunks().flatten().copied())
read_records_data
.extend(piece.record().to_raw_record_chunks().flatten().copied())
});
}

// Decode the data, and return it if it's valid
let data = Vec::<u8>::decode(&mut data.as_slice())?;
let read_records_data = Vec::<u8>::decode(&mut read_records_data.as_slice())?;

Ok(Some(data))
Ok(Some(read_records_data))
}

/// Fetch and assemble an object that can cross segment boundaries, which requires assembling
Expand Down
Loading