refactor(streaming_encrypt): Implement single-pass streaming encryption
Refactors streaming_encrypt_from_file to use a single pass through the file while
maintaining correct encryption. Key changes:

- Collects all source hashes while reading chunks
- Processes chunks 2+ immediately after computing their hashes, since each depends only on hashes already seen (see the sketch below)
- Stores the first two chunks and processes them once all hashes are available, since they depend on the final chunks' hashes
- Uses shrink_data_map to handle child map creation
- Returns shrunk data map for consistency with encrypt()

This maintains streaming efficiency while ensuring correct encryption by:
1. Only reading each chunk once from disk
2. Keeping minimal data in memory
3. Preserving the encryption requirements (all hashes needed)
4. Properly handling the data map hierarchy

The function now matches the behavior of encrypt() while being more memory
efficient for large files.
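
A minimal, self-contained sketch of that single-pass flow. The names here (hash_chunk, encrypt_stub) are toy stand-ins for the crate's XorName hashing and pad/key/IV derivation, illustrative only, not the crate's API:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Stand-in for the crate's content hash (XorName in the real code).
fn hash_chunk(data: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    data.hash(&mut h);
    h.finish()
}

// Stand-in for pad/key/IV derivation plus AES: each chunk is keyed off its
// own hash and the two hashes before it (wrapping at the ends).
fn encrypt_stub(data: &[u8], own: u64, prev1: u64, prev2: u64) -> Vec<u8> {
    let key = own ^ prev1.rotate_left(1) ^ prev2.rotate_left(2);
    data.iter().map(|b| b ^ (key as u8)).collect()
}

fn main() {
    // Chunks as they would arrive from a single pass over the file.
    let chunks: Vec<Vec<u8>> = (0u8..5).map(|i| vec![i; 8]).collect();

    let mut hashes: Vec<u64> = Vec::new();
    let mut first_two: Vec<(usize, Vec<u8>)> = Vec::new();
    let mut encrypted: Vec<Option<Vec<u8>>> = vec![None; chunks.len()];

    for (i, data) in chunks.iter().enumerate() {
        hashes.push(hash_chunk(data)); // collect every source hash
        if i < 2 {
            // Chunks 0 and 1 need the *last* hashes, so buffer them for later.
            first_two.push((i, data.clone()));
        } else {
            // Chunks 2+ need only their own and the two preceding hashes,
            // all of which are already known: encrypt immediately.
            encrypted[i] = Some(encrypt_stub(data, hashes[i], hashes[i - 1], hashes[i - 2]));
        }
    }

    // All hashes are now known; finish the two buffered chunks, which wrap
    // around to the final hashes.
    let n = hashes.len();
    for (i, data) in first_two {
        let (p1, p2) = (hashes[(i + n - 1) % n], hashes[(i + n - 2) % n]);
        encrypted[i] = Some(encrypt_stub(&data, hashes[i], p1, p2));
    }

    assert!(encrypted.iter().all(|c| c.is_some()));
    println!("encrypted {n} chunks in a single pass");
}

Buffering only the first two chunks is what keeps memory bounded: every other chunk can be encrypted, and its plaintext dropped, as soon as it is read.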
dirvine committed Dec 17, 2024
1 parent bf5c563 commit ba89ac4
Showing 3 changed files with 240 additions and 492 deletions.
57 changes: 5 additions & 52 deletions src/chunk.rs
@@ -6,58 +6,11 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{get_num_chunks, get_start_end_positions};
use bytes::Bytes;
use rayon::prelude::*;
use xor_name::XorName;

#[derive(Clone)]
pub(crate) struct EncryptionBatch {
    pub(crate) raw_chunks: Vec<RawChunk>,
}

/// The clear text bytes of a chunk
/// from a larger piece of data,
/// and its index in the set of chunks.
#[derive(Clone)]
pub struct RawChunk {
    /// The index of this chunk, in the set of chunks
    /// obtained from a larger piece of data.
    pub index: usize,
    /// The raw data.
    pub data: Bytes,
    /// The hash of the raw data in this chunk.
    pub hash: XorName,
}

/// Hash all the chunks.
/// Creates [num cores] batches.
pub(crate) fn batch_chunks(bytes: Bytes) -> (usize, Vec<EncryptionBatch>) {
    let data_size = bytes.len();
    let num_chunks = get_num_chunks(data_size);

    let raw_chunks: Vec<_> = (0..num_chunks)
        .map(|index| (index, bytes.clone()))
        .par_bridge()
        .map(|(index, bytes)| {
            let (start, end) = get_start_end_positions(data_size, index);
            let data = bytes.slice(start..end);
            let hash = XorName::from_content(data.as_ref());
            RawChunk { index, data, hash }
        })
        .collect();

    let mut raw_chunks = raw_chunks.into_iter().peekable();

    let cpus = num_cpus::get();
    let chunks_per_batch = usize::max(1, (num_chunks as f64 / cpus as f64).ceil() as usize);
    let mut batches = vec![];

    while raw_chunks.peek().is_some() {
        batches.push(EncryptionBatch {
            raw_chunks: raw_chunks.by_ref().take(chunks_per_batch).collect(),
        });
    }

    (num_chunks, batches)
/// The actual encrypted content of the chunk
#[derive(Clone, Debug)]
pub struct EncryptedChunk {
    /// The encrypted content of the chunk
    pub content: Bytes,
}
151 changes: 5 additions & 146 deletions src/encrypt.rs
@@ -7,90 +7,19 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{
    chunk::{EncryptionBatch, RawChunk},
    data_map::DataMap,
    aes::{self, Iv, Key, Pad},
    error::Error,
    utils::{get_pad_key_and_iv, xor},
    ChunkInfo, EncryptedChunk, Result, COMPRESSION_QUALITY,
    utils::xor,
    Result, COMPRESSION_QUALITY,
};

use brotli::enc::BrotliEncoderParams;
use bytes::Bytes;
use itertools::Itertools;
use rayon::prelude::*;
use std::{io::Cursor, sync::Arc};
use xor_name::XorName;

/// Encrypt the chunks
pub(crate) fn encrypt(batches: Vec<EncryptionBatch>) -> (DataMap, Vec<EncryptedChunk>) {
    let src_hashes = Arc::new(
        batches
            .iter()
            .flat_map(|b| &b.raw_chunks)
            .sorted_by_key(|c| c.index)
            .map(|d| &d.hash)
            .cloned()
            .collect_vec(),
    );

    let (keys, chunks) = batches
        .into_iter()
        .map(|batch| (batch, src_hashes.clone()))
        .par_bridge()
        .map(|(batch, src_hashes)| {
            batch
                .raw_chunks
                .par_iter()
                .map(|chunk| {
                    let RawChunk { index, data, hash } = chunk.clone();

                    let src_size = data.len();
                    let pki = get_pad_key_and_iv(index, src_hashes.as_ref());
                    let encrypted_content = encrypt_chunk(data, pki)?;
                    let dst_hash = XorName::from_content(encrypted_content.as_ref());

                    Ok((
                        ChunkInfo {
                            index,
                            dst_hash,
                            src_hash: hash,
                            src_size,
                        },
                        EncryptedChunk {
                            content: encrypted_content,
                        },
                    ))
                })
                .collect::<Vec<_>>()
        })
        .flatten()
        .fold(
            || (vec![], vec![]),
            |(mut keys, mut chunks),
             result: std::result::Result<(ChunkInfo, EncryptedChunk), Error>| {
                if let Ok((key, chunk)) = result {
                    keys.push(key);
                    chunks.push(chunk);
                }
                (keys, chunks)
            },
        )
        .reduce(
            || (vec![], vec![]),
            |(mut keys, mut chunks), (key_subset, chunk_subset)| {
                keys.extend(key_subset);
                chunks.extend(chunk_subset);
                (keys, chunks)
            },
        );
use std::io::Cursor;

    (DataMap::new(keys), chunks)
}

/// Encrypt the chunk
/// Encrypt a chunk
pub(crate) fn encrypt_chunk(content: Bytes, pki: (Pad, Key, Iv)) -> Result<Bytes> {
    let (pad, key, iv) = pki;

    let mut compressed = vec![];
    let enc_params = BrotliEncoderParams {
        quality: COMPRESSION_QUALITY,
@@ -105,73 +105,3 @@ pub(crate) fn encrypt_chunk(content: Bytes, pki: (Pad, Key, Iv)) -> Result<Bytes
    let encrypted = aes::encrypt(Bytes::from(compressed), &key, &iv)?;
    Ok(xor(&encrypted, &pad))
}
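
The middle of encrypt_chunk is collapsed by the hunk above; conceptually the pipeline is compress, then AES-encrypt, then XOR with the pad. A toy sketch of that three-stage shape, with stand-in primitives rather than the crate's brotli/aes calls:

// Toy stand-ins only -- not the crate's Brotli/AES implementations.
fn compress(content: &[u8]) -> Vec<u8> {
    // Real code: Brotli at COMPRESSION_QUALITY via BrotliEncoderParams.
    content.to_vec()
}

fn aes_encrypt(data: &[u8], key: u8, iv: u8) -> Vec<u8> {
    data.iter().map(|b| b ^ key ^ iv).collect()
}

fn xor(data: &[u8], pad: &[u8]) -> Vec<u8> {
    data.iter().zip(pad.iter().cycle()).map(|(d, p)| d ^ p).collect()
}

/// Same shape as encrypt_chunk: compress -> encrypt -> pad XOR.
fn encrypt_chunk_sketch(content: &[u8], pad: &[u8], key: u8, iv: u8) -> Vec<u8> {
    let compressed = compress(content);
    let encrypted = aes_encrypt(&compressed, key, iv);
    xor(&encrypted, pad)
}

fn main() {
    let out = encrypt_chunk_sketch(b"chunk bytes", b"some pad", 0x42, 0x17);
    assert_ne!(out, b"chunk bytes".to_vec());
}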

/// Encrypt chunks in a streaming fashion, processing them in the correct order to satisfy the
/// encryption requirements. Each chunk is encrypted using the hashes of two other chunks:
/// - For chunk 0: Uses hashes of the last two chunks
/// - For chunk 1: Uses hash of chunk 0 and the last chunk
/// - For chunks 2+: Uses hashes of the previous two chunks
pub(crate) fn encrypt_stream(chunks: Vec<RawChunk>) -> Result<DataMap> {
    // Create a sorted vector of all hashes - we still need this for encryption
    let src_hashes: Vec<_> = chunks.iter().map(|c| c.hash).collect();
    let mut keys = Vec::with_capacity(chunks.len());

    // First, process chunks 2 onwards in parallel since they only need their previous two hashes
    let later_chunks: Vec<_> = chunks.iter().skip(2).collect();
    let later_chunk_infos: Vec<ChunkInfo> = later_chunks
        .into_par_iter()
        .map(|chunk| {
            let RawChunk { index, data, hash } = chunk;
            let src_size = data.len();

            let pki = get_pad_key_and_iv(*index, &src_hashes);
            let encrypted_content = encrypt_chunk(data.clone(), pki)?;
            let dst_hash = XorName::from_content(encrypted_content.as_ref());

            Ok(ChunkInfo {
                index: *index,
                dst_hash,
                src_hash: *hash,
                src_size,
            })
        })
        .collect::<Result<Vec<_>>>()?;

    keys.extend(later_chunk_infos);

    // Process chunk 1 (needs hash 0 and last hash)
    let chunk = &chunks[1];
    let pki = get_pad_key_and_iv(1, &src_hashes);
    let encrypted_content = encrypt_chunk(chunk.data.clone(), pki)?;
    let dst_hash = XorName::from_content(encrypted_content.as_ref());

    // Insert at beginning since this is chunk 1
    keys.insert(
        0,
        ChunkInfo {
            index: 1,
            dst_hash,
            src_hash: chunk.hash,
            src_size: chunk.data.len(),
        },
    );

    // Process chunk 0 (needs last two hashes)
    let chunk = &chunks[0];
    let pki = get_pad_key_and_iv(0, &src_hashes);
    let encrypted_content = encrypt_chunk(chunk.data.clone(), pki)?;
    let dst_hash = XorName::from_content(encrypted_content.as_ref());

    // Insert at beginning since this is chunk 0
    keys.insert(
        0,
        ChunkInfo {
            index: 0,
            dst_hash,
            src_hash: chunk.hash,
            src_size: chunk.data.len(),
        },
    );

    Ok(DataMap::new(keys))
}
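
The three cases in the doc comment above are one wrap-around rule in disguise. A small illustrative helper (hypothetical name; the real derivation lives in get_pad_key_and_iv):

/// Indices of the two hashes chunk `i` of `n` depends on, wrapping at the ends.
fn dependency_indices(i: usize, n: usize) -> (usize, usize) {
    ((i + n - 1) % n, (i + n - 2) % n)
}

fn main() {
    let n = 5;
    assert_eq!(dependency_indices(0, n), (4, 3)); // chunk 0: last two chunks
    assert_eq!(dependency_indices(1, n), (0, 4)); // chunk 1: chunk 0 and the last
    assert_eq!(dependency_indices(2, n), (1, 0)); // chunks 2+: the previous two
}

For i >= 2 the modulo never wraps, which is why encrypt_stream can process those chunks in parallel straight away while chunks 0 and 1 wait for the final hashes.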