Skip to content

Commit

Permalink
add downloader
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo committed Jan 13, 2025
1 parent 4e3810a commit 79066e8
Show file tree
Hide file tree
Showing 11 changed files with 705 additions and 0 deletions.
25 changes: 25 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ backon = { version = "1.2", default-features = false, features = [
] }
bincode = "1.3"
bitflags = "2.4"
blake3 = "1.5.5"
boyer-moore-magiclen = "0.2.16"
bytes = { version = "1.5", default-features = false }
cfg-if = "1.0"
Expand Down
8 changes: 8 additions & 0 deletions crates/stages/stages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ reth-db-api.workspace = true
reth-etl.workspace = true
reth-evm.workspace = true
reth-exex.workspace = true
reth-fs-util.workspace = true
reth-network-p2p.workspace = true
reth-primitives = { workspace = true, features = ["secp256k1"] }
reth-primitives-traits = { workspace = true, features = [
Expand Down Expand Up @@ -57,6 +58,12 @@ rayon.workspace = true
num-traits = "0.2.15"
tempfile = { workspace = true, optional = true }
bincode.workspace = true
blake3.workspace = true
reqwest = { workspace = true, default-features = false, features = [
"rustls-tls-native-roots",
"blocking"
] }
serde = { workspace = true, features = ["derive"] }

[dev-dependencies]
# reth
Expand All @@ -75,6 +82,7 @@ reth-testing-utils.workspace = true
reth-trie = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-network-peers.workspace = true
reth-tracing.workspace = true

alloy-rlp.workspace = true
itertools.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions crates/stages/stages/src/stages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ mod index_storage_history;
/// Stage for computing state root.
mod merkle;
mod prune;
/// The s3 download stage
mod s3;
/// The sender recovery stage.
mod sender_recovery;
/// The transaction lookup stage
Expand All @@ -32,6 +34,7 @@ pub use index_account_history::*;
pub use index_storage_history::*;
pub use merkle::*;
pub use prune::*;
pub use s3::*;
pub use sender_recovery::*;
pub use tx_lookup::*;

Expand Down
30 changes: 30 additions & 0 deletions crates/stages/stages/src/stages/s3/downloader/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use alloy_primitives::B256;
use reth_fs_util::FsPathError;

#[derive(Debug, thiserror::Error)]
pub enum DownloaderError {
/// Requires a valid `total_size` {0}
#[error("requires a valid total_size")]
InvalidMetadataTotalSize(Option<usize>),
#[error("tried to access chunk on index {0}, but there's only {1} chunks")]
/// Invalid chunk access
InvalidChunk(usize, usize),
// File hash mismatch.
#[error("file hash does not match the expected one {0} != {1} ")]
InvalidFileHash(B256, B256),
// Empty content length returned from the server.
#[error("metadata got an empty content length from server")]
EmptyContentLength,
/// Reqwest error
#[error(transparent)]
FsPath(#[from] FsPathError),
/// Reqwest error
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
/// Std Io error
#[error(transparent)]
StdIo(#[from] std::io::Error),
/// Bincode error
#[error(transparent)]
Bincode(#[from] bincode::Error),
}
190 changes: 190 additions & 0 deletions crates/stages/stages/src/stages/s3/downloader/fetch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use super::{
error::DownloaderError,
meta::Metadata,
worker::{worker_fetch, WorkerRequest, WorkerResponse},
};
use alloy_primitives::B256;
use reqwest::{blocking::Client, header::CONTENT_LENGTH};
use std::{
collections::HashMap,
fs::{File, OpenOptions},
io::BufReader,
path::Path,
sync::mpsc::channel,
};
use tracing::{debug, error};

/// Downloads file from url to data file path.
///
/// If a `file_hash` is passed, it will verify it at the end.
///
/// ## Details
///
/// 1) A [`Metadata`] file is created or opened in `{target_dir}/download/{filename}.metadata`. It
/// tracks the download progress including total file size, downloaded bytes, chunk sizes, and
/// ranges that still need downloading. Allows for resumability.
/// 2) The target file is preallocated with the total size of the file in
/// `{target_dir}/download/{filename}`.
/// 3) Multiple `workers` are spawned for downloading of specific chunks of the file.
/// 4) `Orchestrator` manages workers, distributes chunk ranges, and ensures the download progresses
/// efficiently by dynamically assigning tasks to workers as they become available.
/// 5) Once the file is downloaded:
/// * If `file_hash` is `Some`, verifies its blake3 hash.
/// * Deletes the metadata file
/// * Moves downloaded file to target directory.
pub fn fetch(
filename: &str,
target_dir: &Path,
url: &str,
mut concurrent: u64,
file_hash: Option<B256>,
) -> Result<(), DownloaderError> {
// Create a temporary directory to download files to, before moving them to target_dir.
let download_dir = target_dir.join("download");
reth_fs_util::create_dir_all(&download_dir)?;

let data_file = download_dir.join(filename);
let mut metadata = metadata(&data_file, url)?;
if metadata.is_done() {
return Ok(())
}

// Ensure the file is preallocated so we can download it concurrently
{
let file = OpenOptions::new()
.create(true)
.truncate(true)
.read(true)
.write(true)
.open(&data_file)?;

if file.metadata()?.len() as usize != metadata.total_size {
debug!(target: "sync::stages::s3::downloader", ?data_file, length = metadata.total_size, "Preallocating space.");
file.set_len(metadata.total_size as u64)?;
}
}

while !metadata.is_done() {
// Find the missing file chunks and the minimum number of workers required
let missing_chunks = metadata.needed_ranges();
concurrent = concurrent
.min(std::thread::available_parallelism()?.get() as u64)
.min(missing_chunks.len() as u64);

// Create channels for communication between workers and orchestrator
let (orchestrator_tx, orchestrator_rx) = channel();

// Initiate workers
for worker_id in 0..concurrent {
let orchestrator_tx = orchestrator_tx.clone();
let data_file = data_file.clone();
let url = url.to_string();
std::thread::spawn(move || {
if let Err(error) = worker_fetch(worker_id, &orchestrator_tx, data_file, url) {
let _ = orchestrator_tx.send(WorkerResponse::Err { worker_id, error });
}
});
}

// Drop the sender to allow the loop processing to exit once all workers are done
drop(orchestrator_tx);

let mut workers = HashMap::new();
let mut missing_chunks = missing_chunks.into_iter();

// Distribute chunk ranges to workers when they free up
while let Ok(worker_msg) = orchestrator_rx.recv() {
debug!(target: "sync::stages::s3::downloader", ?worker_msg, "received message from worker");

let available_worker = match worker_msg {
WorkerResponse::Ready { worker_id, tx } => {
workers.insert(worker_id, tx);
worker_id
}
WorkerResponse::DownloadedChunk { worker_id, chunk_index, written_bytes } => {
metadata.update_chunk(chunk_index, written_bytes)?;
worker_id
}
WorkerResponse::Err { worker_id, error } => {
error!(target: "sync::stages::s3::downloader", ?worker_id, "Worker found an error: {:?}", error);
return Err(error)
}
};

let worker = workers.get(&available_worker).expect("should exist");
match missing_chunks.next() {
Some((chunk_index, (start, end))) => {
let _ = worker.send(WorkerRequest::Download { chunk_index, start, end });
}
None => {
let _ = worker.send(WorkerRequest::Finish);
}
}
}
}

if let Some(file_hash) = file_hash {
check_file_hash(&data_file, &file_hash)?;
}

// Move downloaded file to desired directory.
metadata.delete()?;
reth_fs_util::rename(data_file, target_dir.join(filename))?;

Ok(())
}

/// Creates a metadata file used to keep track of the downloaded chunks. Useful on resuming after a
/// shutdown.
fn metadata(data_file: &Path, url: &str) -> Result<Metadata, DownloaderError> {
if Metadata::file_path(data_file).exists() {
debug!(target: "sync::stages::s3::downloader", ?data_file, "Loading metadata ");
return Metadata::load(data_file)
}

let client = Client::new();
let resp = client.head(url).send()?;
let total_length: usize = resp
.headers()
.get(CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse().ok())
.ok_or(DownloaderError::EmptyContentLength)?;

debug!(target: "sync::stages::s3::downloader", ?data_file, "Creating metadata ");

Metadata::builder(data_file).with_total_size(total_length).build()
}

/// Ensures the file on path has the expected blake3 hash.
fn check_file_hash(path: &Path, expected: &B256) -> Result<(), DownloaderError> {
let mut reader = BufReader::new(File::open(path)?);
let mut hasher = blake3::Hasher::new();
std::io::copy(&mut reader, &mut hasher)?;

let file_hash = hasher.finalize();
if file_hash.as_bytes() != expected {
return Err(DownloaderError::InvalidFileHash(file_hash.as_bytes().into(), *expected))
}

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::b256;

#[test]
fn test_download() {
reth_tracing::init_test_tracing();

let b3sum = b256!("81a7318f69fc1d6bb0a58a24af302f3b978bc75a435e4ae5d075f999cd060cfd");
let url = "https://link.testfile.org/500MB";

let file = tempfile::NamedTempFile::new().unwrap();
let filename = file.path().file_name().unwrap().to_str().unwrap();
let target_dir = file.path().parent().unwrap();
fetch(filename, target_dir, url, 4, Some(b3sum)).unwrap();
}
}
Loading

0 comments on commit 79066e8

Please sign in to comment.