Skip to content

Commit

Permalink
Merge pull request #622 from EspressoSystems/abdul/save-pruned-height
Browse files Browse the repository at this point in the history
Do batch delete and pruned height upsert in one transaction block
  • Loading branch information
imabdulbasit authored Jun 5, 2024
2 parents 84b17ec + e3a0a84 commit 9c11647
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 21 deletions.
15 changes: 6 additions & 9 deletions src/data_source/fetching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,25 +362,22 @@ where
BackgroundTask::spawn("pruner", async move {
for i in 1.. {
sleep(cfg.interval()).await;
tracing::info!("pruner woke up for the {i}th time",);
tracing::warn!("pruner woke up for the {i}th time");

{
let mut storage = fetcher.storage.write().await;

match storage.storage.prune().await {
Ok(Some(height)) => {
storage.pruned_height = Some(height);
if let Err(e) = storage.storage.save_pruned_height(height).await {
tracing::error!("failed to save pruned height: {e:?}");
continue;
}
if let Err(e) = storage.commit().await {
tracing::error!("failed to commit: {e:?}")
}
tracing::warn!(
"pruner run {i} succeeded. Pruned to height {height}"
);
}
Ok(None) => (),
Err(e) => {
tracing::error!("pruning failed: {e:?}");
tracing::error!("pruner run {i} failed: {e:?}");
storage.revert().await;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/data_source/storage/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ impl Default for PrunerCfg {
batch_size: 1000,
// 80%
max_usage: 8000,
// 1 hour
interval: Duration::from_secs(3600),
// 1.5 hour
interval: Duration::from_secs(5400),
}
}
}
33 changes: 23 additions & 10 deletions src/data_source/storage/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,17 @@ impl PrunerConfig for SqlStorage {
}

impl SqlStorage {
async fn delete_batch(&mut self, height: u64) -> QueryResult<()> {
let mut tx = self.transaction().await?;

tx.execute("DELETE FROM header WHERE height <= $1", &[&(height as i64)])
.await?;

self.save_pruned_height(height).await?;

Ok(())
}

async fn get_minimum_height(&self) -> QueryResult<Option<u64>> {
let row = self
.query_one_static("SELECT MIN(height) as height FROM header")
Expand Down Expand Up @@ -585,11 +596,13 @@ impl PruneStorage for SqlStorage {
if let Some(target_height) = target_height {
while height < target_height {
height = min(height + batch_size, target_height);
self.query_opt("DELETE FROM header WHERE height <= $1", &[&(height as i64)])
.await?;
self.delete_batch(height).await?;
self.commit().await.map_err(|e| QueryError::Error {
message: format!("failed to commit {e}"),
})?;
pruned_height = Some(height);

tracing::info!("Pruned data up to height {height}");
tracing::warn!("Pruned data up to height {height}");
}
}

Expand All @@ -616,14 +629,14 @@ impl PruneStorage for SqlStorage {
&& height < min_retention_height
{
height = min(height + batch_size, min_retention_height);
self.query_opt(
"DELETE FROM header WHERE height <= $1",
&[&(height as i64)],
)
.await?;
usage = self.get_disk_usage().await?;
self.delete_batch(height).await?;
self.commit().await.map_err(|e| QueryError::Error {
message: format!("failed to commit {e}"),
})?;
pruned_height = Some(height);
tracing::info!("Pruned data up to height {height}");
tracing::warn!("Pruned data up to height {height}");

usage = self.get_disk_usage().await?;
}
}
}
Expand Down

0 comments on commit 9c11647

Please sign in to comment.