diff --git a/crates/stages/stages/src/stages/s3/mod.rs b/crates/stages/stages/src/stages/s3/mod.rs index 6b92f8d3a73f..75fcf073aaae 100644 --- a/crates/stages/stages/src/stages/s3/mod.rs +++ b/crates/stages/stages/src/stages/s3/mod.rs @@ -36,8 +36,6 @@ pub struct S3Stage { max_concurrent_requests: u64, /// Channel to receive the downloaded ranges from the fetch task. fetch_rx: Option>>, - /// Downloaded ranges by the fetch task. - downloaded_ranges: Vec>, } impl Stage for S3Stage @@ -58,13 +56,13 @@ where ) -> Poll> { // We are currently fetching and may have downloaded ranges that we can process. if let Some(rx) = &mut self.fetch_rx { + // Whether we have downloaded all the required files. let mut is_done = false; let response = match ready!(rx.poll_recv(cx)) { Some(Ok(response)) => { is_done = response.is_done; - self.downloaded_ranges.push(response.range); Ok(()) } Some(Err(_)) => todo!(), // TODO: DownloaderError -> StageError @@ -78,10 +76,13 @@ where return Poll::Ready(response) } - // Spawns the downloader task - self.spawn_fetch(input); + // Spawns the downloader task if there are any missing files + if let Some(fetch_rx) = self.maybe_spawn_fetch(input) { + self.fetch_rx = Some(fetch_rx); + return Poll::Pending + } - Poll::Pending + Poll::Ready(Ok(())) } fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result @@ -98,23 +99,8 @@ where // Re-initializes the provider to detect the new additions static_file_provider.initialize_index()?; - let highest_block = self - .downloaded_ranges - .last() - .map(|r| *r.end()) - .unwrap_or_else(|| input.checkpoint().block_number); - - for block_range in self.downloaded_ranges.drain(..) { - // Populate TransactionBlock table - for block_number in block_range { - // TODO: should be error if none since we always expect them to exist here - if let Some(indice) = static_file_provider.block_body_indices(block_number)? { - if indice.tx_count() > 0 { - tx_block_cursor.append(indice.last_tx_num(), &block_number)?; - } - } - } - } + // TODO logic for appending tx_block + // tx_block_cursor.append(indice.last_tx_num(), &block_number)?; let checkpoint = StageCheckpoint { block_number: highest_block, stage_checkpoint: None }; provider.save_stage_checkpoint(StageId::Bodies, checkpoint)?; @@ -137,12 +123,12 @@ where } impl S3Stage { - /// Spawns a task that will fetch all the missing static files from the remote server. + /// It will only spawn a task to fetch files from the remote server, it there are any missing static files. /// /// Every time a block range is ready with all the necessary files, it sends a /// [`S3DownloaderResponse`] to `self.fetch_rx`. If it's the last requested block range, the /// response will have `is_done` set to true. - fn spawn_fetch(&mut self, input: ExecInput) { + fn maybe_spawn_fetch(&mut self, input: ExecInput) -> Option>> { let checkpoint = input.checkpoint(); // TODO: input target can only be certain numbers. eg. 499_999 , 999_999 etc. @@ -171,6 +157,11 @@ impl S3Stage { requests.push((block_range, block_range_requests)); } + // Return None, if we have downloaded all the files that are required. + if requests.is_empty() { + return None + } + let static_file_directory = self.static_file_directory.clone(); let url = self.url.clone(); let max_concurrent_requests = self.max_concurrent_requests; @@ -204,7 +195,7 @@ impl S3Stage { } }); - self.fetch_rx = Some(fetch_rx); + Some(fetch_rx) } }