Merge pull request #603 from EspressoSystems/jb/limit-proactive-scan
Rate limit the proactive fetching task
imabdulbasit authored Jun 2, 2024
2 parents 05743e5 + 2c501f0 commit b295b3d
Showing 4 changed files with 93 additions and 98 deletions.
94 changes: 10 additions & 84 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -12,7 +12,7 @@

[package]
name = "hotshot-query-service"
version = "0.1.27"
version = "0.1.28"
authors = ["Espresso Systems <hello@espressosys.com>"]
edition = "2021"
license = "GPL-3.0-or-later"
@@ -85,9 +85,9 @@ prometheus = "0.13"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = "0.8"
surf-disco = "0.7"
surf-disco = "0.6"
tagged-base64 = "0.4"
tide-disco = "0.7"
tide-disco = "0.6"
time = "0.3"
toml = "0.8"
tracing = "0.1"
77 changes: 67 additions & 10 deletions src/data_source/fetching.rs
@@ -143,6 +143,8 @@ pub struct Builder<Types, S, P> {
major_scan_interval: usize,
major_scan_offset: usize,
proactive_range_chunk_size: Option<usize>,
active_fetch_delay: Duration,
chunk_fetch_delay: Duration,
proactive_fetching: bool,
_types: PhantomData<Types>,
}
@@ -168,6 +170,8 @@ impl<Types, S, P> Builder<Types, S, P> {
// don't all pause for a major scan together.
major_scan_offset: 0,
proactive_range_chunk_size: None,
active_fetch_delay: Duration::from_millis(50),
chunk_fetch_delay: Duration::from_millis(100),
proactive_fetching: true,
_types: Default::default(),
}
Expand Down Expand Up @@ -240,6 +244,32 @@ impl<Types, S, P> Builder<Types, S, P> {
self
}

/// Add a delay between active fetches in proactive scans.
///
/// This can be used to limit the rate at which this query service makes requests to other query
/// services during proactive scans. This is useful if the query service has a lot of blocks to
/// catch up on, as without a delay, scanning can be extremely burdensome on the peer.
pub fn with_active_fetch_delay(mut self, active_fetch_delay: Duration) -> Self {
self.active_fetch_delay = active_fetch_delay;
self
}
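As an illustration of the `active_fetch_delay` knob (not part of this diff), here is a minimal sketch of the rate-limiting pattern: sleeping for a fixed interval after each outbound fetch caps the request rate at roughly one request per delay period (about 20 requests per second at 50 ms). The `fetch_from_peer` helper is hypothetical, and async-std is assumed as the runtime.

```rust
use std::time::Duration;

use async_std::task::sleep;

// Hypothetical stand-in for an active fetch from a peer query service; not an API
// of this crate.
async fn fetch_from_peer(height: usize) -> String {
    format!("block {height}")
}

fn main() {
    async_std::task::block_on(async {
        let active_fetch_delay = Duration::from_millis(50);
        for height in 0..5 {
            let block = fetch_from_peer(height).await;
            println!("fetched {block}");
            // Sleep after each fetch so a long catch-up scan cannot flood the peer:
            // at most ~1000 / 50 = 20 requests per second.
            sleep(active_fetch_delay).await;
        }
    });
}
```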

/// Add a delay between chunk fetches during proactive scans.
///
/// In a proactive scan, we retrieve a range of objects from a provider or local storage (e.g., a database).
/// Without a delay between fetching these chunks, the process can become very CPU-intensive, especially
/// when chunks are retrieved from local storage. While there is already a delay for active fetches
/// (`active_fetch_delay`), that delay does not apply when a subscription to an old stream is served
/// mostly from local storage.
///
/// This additional delay keeps CPU usage bounded and ensures that local storage
/// remains accessible to all processes, not just the proactive scanner.
pub fn with_chunk_fetch_delay(mut self, chunk_fetch_delay: Duration) -> Self {
self.chunk_fetch_delay = chunk_fetch_delay;
self
}
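Similarly, a sketch of the `chunk_fetch_delay` behaviour during a chunked scan, written against plain futures streams. The `range_chunks` helper below is a simplified stand-in for the crate's function of the same name, and async-std is again an assumption.

```rust
use std::ops::Range;
use std::time::Duration;

use async_std::task::sleep;
use futures::stream::{self, StreamExt};

// Simplified stand-in for the crate's `range_chunks` helper: split a range into
// fixed-size chunks.
fn range_chunks(range: Range<usize>, chunk_size: usize) -> Vec<Range<usize>> {
    range
        .clone()
        .step_by(chunk_size)
        .map(|start| start..(start + chunk_size).min(range.end))
        .collect()
}

fn main() {
    async_std::task::block_on(async {
        let chunk_fetch_delay = Duration::from_millis(100);
        stream::iter(range_chunks(0..10, 3))
            .then(move |chunk| async move {
                // A real scan would load the chunk here (from a peer or from local
                // storage); the delay afterwards keeps the scanner from pinning a CPU
                // core or monopolizing the database.
                sleep(chunk_fetch_delay).await;
                chunk
            })
            .for_each(|chunk| async move { println!("scanned chunk {chunk:?}") })
            .await;
    });
}
```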

/// Run without [proactive fetching](self#proactive-fetching).
///
/// This can reduce load on the CPU and the database, but increases the probability that
@@ -826,6 +856,10 @@
leaf_fetcher: Arc<LeafFetcher<Types, S, P>>,
vid_common_fetcher: Arc<VidCommonFetcher<Types, S, P>>,
range_chunk_size: usize,
// Duration to sleep after each active fetch.
active_fetch_delay: Duration,
// Duration to sleep after each chunk is fetched.
chunk_fetch_delay: Duration,
}

#[derive(Debug)]
@@ -934,6 +968,8 @@
leaf_fetcher: Arc::new(leaf_fetcher),
vid_common_fetcher: Arc::new(vid_common_fetcher),
range_chunk_size: builder.range_chunk_size,
active_fetch_delay: builder.active_fetch_delay,
chunk_fetch_delay: builder.chunk_fetch_delay,
})
}
}
@@ -1001,9 +1037,34 @@
R: RangeBounds<usize> + Send + 'static,
T: RangedFetchable<Types>,
{
let chunk_fetch_delay = self.chunk_fetch_delay;
let active_fetch_delay = self.active_fetch_delay;

stream::iter(range_chunks(range, chunk_size))
-.then(move |chunk| self.clone().get_chunk(chunk))
.then(move |chunk| {
let self_clone = self.clone();
async move {
{
let chunk = self_clone.get_chunk(chunk).await;

// Introduce a delay (`chunk_fetch_delay`) between fetching chunks.
// This helps to limit constant high CPU usage when fetching a long range of data,
// especially for older streams that fetch most of the data from local storage.
sleep(chunk_fetch_delay).await;
chunk
}
}
})
.flatten()
.then(move |f| async move {
match f {
// Introduce a delay (`active_fetch_delay`) for active fetches to reduce load on the catchup provider.
// The delay applies between pending fetches, not between chunks.
Fetch::Pending(_) => sleep(active_fetch_delay).await,
Fetch::Ready(_) => (),
};
f
})
.boxed()
}

@@ -1232,15 +1293,11 @@
};
prev_height = block_height;

-// Iterate over all blocks that we should have. Merely iterating over the `Fetch`es
-// without awaiting them is enough to trigger active fetches of missing blocks,
-// since we always trigger an active fetch when fetching by block number. Moreover,
-// fetching the block is enough to trigger an active fetch of the corresponding leaf
-// if it too is missing.
-//
-// The chunking behavior of `get_range` automatically ensures that, no matter how
-// big the range is, we will release the read lock on storage every `chunk_size`
-// items, so we don't starve out would-be writers.
// Iterate over all blocks that we should have. Fetching the block is enough to
// trigger an active fetch of the corresponding leaf if it too is missing. The
// chunking behavior of `get_range` automatically ensures that, no matter how big
// the range is, we will release the read lock on storage every `chunk_size` items,
// so we don't starve out would-be writers.
let mut blocks = self
.clone()
.get_range_with_chunk_size::<_, BlockQueryData<Types>>(
14 changes: 13 additions & 1 deletion src/data_source/storage/no_storage.rs
@@ -250,7 +250,15 @@ pub mod testing {
async fn connect(db: &Self::Storage) -> Self {
match db {
Storage::Sql(db) => {
-Self::Sql(db.config().connect(Default::default()).await.unwrap())
let cfg = db.config();
let builder = cfg
.builder(Default::default())
.await
.unwrap()
.with_active_fetch_delay(Duration::from_millis(1))
.with_chunk_fetch_delay(Duration::from_millis(1));

Self::Sql(builder.build().await.unwrap())
}
Storage::NoStorage { fetch_from_port } => {
tracing::info!("creating NoStorage node, fetching missing data from port {fetch_from_port}");
@@ -272,6 +280,10 @@
// don't have storage) and the test frequently goes back and looks up
// old objects.
.with_major_scan_interval(2)
// Add a small delay after each active fetch
.with_active_fetch_delay(Duration::from_millis(1))
// Add a small delay between chunks during proactive scans
.with_chunk_fetch_delay(Duration::from_millis(1))
.build()
.await
.unwrap(),
