Skip to content

Commit

Permalink
maybe_spawn_fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo committed Jan 15, 2025
1 parent 61811df commit f481341
Showing 1 changed file with 17 additions and 26 deletions.
43 changes: 17 additions & 26 deletions crates/stages/stages/src/stages/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ pub struct S3Stage {
max_concurrent_requests: u64,
/// Channel to receive the downloaded ranges from the fetch task.
fetch_rx: Option<UnboundedReceiver<Result<S3DownloaderResponse, DownloaderError>>>,
/// Downloaded ranges by the fetch task.
downloaded_ranges: Vec<RangeInclusive<u64>>,
}

impl<Provider> Stage<Provider> for S3Stage
Expand All @@ -58,13 +56,13 @@ where
) -> Poll<Result<(), StageError>> {
// We are currently fetching and may have downloaded ranges that we can process.
if let Some(rx) = &mut self.fetch_rx {

// Whether we have downloaded all the required files.
let mut is_done = false;

let response = match ready!(rx.poll_recv(cx)) {
Some(Ok(response)) => {
is_done = response.is_done;
self.downloaded_ranges.push(response.range);
Ok(())
}
Some(Err(_)) => todo!(), // TODO: DownloaderError -> StageError
Expand All @@ -78,10 +76,13 @@ where
return Poll::Ready(response)
}

// Spawns the downloader task
self.spawn_fetch(input);
// Spawns the downloader task if there are any missing files
if let Some(fetch_rx) = self.maybe_spawn_fetch(input) {
self.fetch_rx = Some(fetch_rx);
return Poll::Pending
}

Poll::Pending
Poll::Ready(Ok(()))
}

fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError>
Expand All @@ -98,23 +99,8 @@ where
// Re-initializes the provider to detect the new additions
static_file_provider.initialize_index()?;

let highest_block = self
.downloaded_ranges
.last()
.map(|r| *r.end())
.unwrap_or_else(|| input.checkpoint().block_number);

for block_range in self.downloaded_ranges.drain(..) {
// Populate TransactionBlock table
for block_number in block_range {
// TODO: should be error if none since we always expect them to exist here
if let Some(indice) = static_file_provider.block_body_indices(block_number)? {
if indice.tx_count() > 0 {
tx_block_cursor.append(indice.last_tx_num(), &block_number)?;
}
}
}
}
// TODO logic for appending tx_block
// tx_block_cursor.append(indice.last_tx_num(), &block_number)?;

let checkpoint = StageCheckpoint { block_number: highest_block, stage_checkpoint: None };
provider.save_stage_checkpoint(StageId::Bodies, checkpoint)?;
Expand All @@ -137,12 +123,12 @@ where
}

impl S3Stage {
/// Spawns a task that will fetch all the missing static files from the remote server.
/// It will only spawn a task to fetch files from the remote server, it there are any missing static files.
///
/// Every time a block range is ready with all the necessary files, it sends a
/// [`S3DownloaderResponse`] to `self.fetch_rx`. If it's the last requested block range, the
/// response will have `is_done` set to true.
fn spawn_fetch(&mut self, input: ExecInput) {
fn maybe_spawn_fetch(&mut self, input: ExecInput) -> Option<UnboundedReceiver<Result<S3DownloaderResponse, DownloaderError>>> {
let checkpoint = input.checkpoint();
// TODO: input target can only be certain numbers. eg. 499_999 , 999_999 etc.

Expand Down Expand Up @@ -171,6 +157,11 @@ impl S3Stage {
requests.push((block_range, block_range_requests));
}

// Return None, if we have downloaded all the files that are required.
if requests.is_empty() {
return None
}

let static_file_directory = self.static_file_directory.clone();
let url = self.url.clone();
let max_concurrent_requests = self.max_concurrent_requests;
Expand Down Expand Up @@ -204,7 +195,7 @@ impl S3Stage {
}
});

self.fetch_rx = Some(fetch_rx);
Some(fetch_rx)
}
}

Expand Down

0 comments on commit f481341

Please sign in to comment.