Skip to content

Commit

Permalink
Merge pull request #599 from EspressoSystems/abdul/semaphore
Browse files Browse the repository at this point in the history
Add a semaphore to rate-limit the number of concurrent fetches
  • Loading branch information
jbearer authored May 29, 2024
2 parents 61c1592 + 78b0d4d commit 7e1a763
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ async-compatibility-layer = { version = "1.1", default-features = false, feature
"logging-utils",
] }
async-std = { version = "1.9.0", features = ["unstable", "attributes"] }
async-lock = "3.3.0"
async-trait = "0.1"
bincode = "1.3"
bit-vec = { version = "0.6.3", features = ["serde_std"] }
Expand Down
17 changes: 17 additions & 0 deletions src/data_source/fetching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ use crate::{
Header, Payload, QueryResult, Transaction, VidShare,
};
use anyhow::Context;
use async_lock::Semaphore;
use async_std::{
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
task::sleep,
Expand Down Expand Up @@ -136,6 +137,7 @@ pub struct Builder<Types, S, P> {
storage: S,
provider: P,
retry_delay: Option<Duration>,
rate_limit: Option<usize>,
range_chunk_size: usize,
minor_scan_interval: Duration,
major_scan_interval: usize,
Expand All @@ -152,6 +154,7 @@ impl<Types, S, P> Builder<Types, S, P> {
storage,
provider,
retry_delay: None,
rate_limit: None,
range_chunk_size: 25,
// By default, we run minor proactive scans fairly frequently: once every minute. These
// scans are cheap (moreso the more frequently they run) and can help us keep up with
Expand All @@ -176,6 +179,12 @@ impl<Types, S, P> Builder<Types, S, P> {
self
}

/// Set the maximum number of simultaneously in-flight fetch requests.
///
/// Fetches are rate-limited by a shared semaphore with this many permits: each fetch must
/// acquire a permit before contacting a provider, so at most `rate_limit` requests can be
/// outstanding at any time.
pub fn with_rate_limit(mut self, rate_limit: usize) -> Self {
    self.rate_limit = Some(rate_limit);
    self
}

/// Set the number of items to process at a time when loading a range or stream.
///
/// This determines:
Expand Down Expand Up @@ -900,6 +909,14 @@ where
vid_common_fetcher = vid_common_fetcher.with_retry_delay(delay);
}

if let Some(limit) = builder.rate_limit {
let permit = Arc::new(Semaphore::new(limit));

payload_fetcher = payload_fetcher.with_rate_limit(permit.clone());
leaf_fetcher = leaf_fetcher.with_rate_limit(permit.clone());
vid_common_fetcher = vid_common_fetcher.with_rate_limit(permit);
}

let height = builder.storage.block_height().await? as u64;
let pruned_height = builder.storage.load_pruned_height().await?;
Ok(Self {
Expand Down
4 changes: 1 addition & 3 deletions src/data_source/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,7 @@ where
/// [`build`](fetching::Builder::build). For a convenient constructor that uses the default
/// fetching options, see [`Config::connect`].
/// Connect to a remote database and build a fetching data source on top of it.
///
/// Opens the SQL storage described by `config` and wraps it in a [`Builder`] using the
/// default fetching options, with `provider` as the data availability provider.
///
/// # Errors
///
/// Fails if the underlying SQL connection cannot be established.
pub async fn connect(config: Config, provider: P) -> Result<Builder<Types, P>, Error> {
    Ok(Self::builder(SqlStorage::connect(config).await?, provider))
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/fetching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! implementations of [`Provider`] for various data availability sources.
//!
use async_lock::Semaphore;
use async_std::{
sync::{Arc, Mutex},
task::{sleep, spawn},
Expand Down Expand Up @@ -58,6 +59,7 @@ const BACKOFF_FACTOR: u32 = 4;
// spam our peers, and since backoff allows us to first try a few times with a faster delay, we can
// safely wait a long while before retrying failed requests.
const DEFAULT_RETRY_DELAY: Duration = Duration::from_secs(5 * 60);
const DEFAULT_RATE_LIMIT: usize = 32;

/// A callback to process the result of a request.
///
Expand All @@ -83,13 +85,15 @@ pub struct Fetcher<T, C> {
#[derivative(Debug = "ignore")]
in_progress: Arc<Mutex<HashMap<T, BTreeSet<C>>>>,
retry_delay: Duration,
permit: Arc<Semaphore>,
}

impl<T, C> Default for Fetcher<T, C> {
    /// A fetcher with no requests in flight, the default retry delay, and its own
    /// semaphore permitting `DEFAULT_RATE_LIMIT` concurrent fetches.
    fn default() -> Self {
        let permit = Arc::new(Semaphore::new(DEFAULT_RATE_LIMIT));
        Self {
            permit,
            retry_delay: DEFAULT_RETRY_DELAY,
            in_progress: Default::default(),
        }
    }
}
Expand All @@ -99,6 +103,11 @@ impl<T, C> Fetcher<T, C> {
self.retry_delay = retry_delay;
self
}

pub fn with_rate_limit(mut self, permit: Arc<Semaphore>) -> Self {
self.permit = permit;
self
}
}

impl<T, C> Fetcher<T, C> {
Expand Down Expand Up @@ -130,6 +139,7 @@ impl<T, C> Fetcher<T, C> {
C: Callback<T::Response> + 'static,
{
let in_progress = self.in_progress.clone();
let permit = self.permit.clone();
let max_retry_delay = self.retry_delay;

spawn(async move {
Expand Down Expand Up @@ -158,6 +168,8 @@ impl<T, C> Fetcher<T, C> {
// Now we are responsible for fetching the object, reach out to the provider.
let mut delay = min(MIN_RETRY_DELAY, max_retry_delay);
let res = loop {
// Acquire a permit from the semaphore to rate limit the number of concurrent fetch requests
let permit = permit.acquire().await;
if let Some(res) = provider.fetch(req).await {
break res;
}
Expand All @@ -174,6 +186,7 @@ impl<T, C> Fetcher<T, C> {
// accumulating because a peer which _should_ have the resource isn't providing it.
// In this case, we would require manual intervention on the peer anyways.
tracing::warn!("failed to fetch {req:?}, will retry in {delay:?}");
drop(permit);
sleep(delay).await;

// Try a few times with a short delay, on the off chance that the problem resolves
Expand Down

0 comments on commit 7e1a763

Please sign in to comment.