perf(tree): add cross-block caching #13769

Draft · wants to merge 4 commits into base: main

100 changes: 99 additions & 1 deletion crates/engine/tree/src/tree/cached_state.rs
@@ -1,14 +1,15 @@
//! Implements a state provider that has a shared cache in front of it.
use alloy_primitives::{map::B256HashMap, Address, StorageKey, StorageValue, B256};
use metrics::Gauge;
use moka::sync::CacheBuilder;
use moka::{sync::CacheBuilder, PredicateError};
use reth_errors::ProviderResult;
use reth_metrics::Metrics;
use reth_primitives::{Account, Bytecode};
use reth_provider::{
AccountReader, BlockHashReader, HashedPostStateProvider, StateProofProvider, StateProvider,
StateRootProvider, StorageRootProvider,
};
use reth_revm::db::BundleState;
use reth_trie::{
updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
@@ -44,6 +45,74 @@ where
}
}

impl<S> CachedStateProvider<S> {
/// Creates a new [`SavedCache`] from the given state updates and executed block hash.
///
/// This does not update the code cache, because no changes are required to the code cache on
/// state change.
///
/// NOTE: Consumers should ensure that these caches are not in use by a state provider for a
/// previous block - otherwise, this update will cause that state provider to contain future
/// state, which would be incorrect.
pub(crate) fn save_cache(
self,
executed_block_hash: B256,
state_updates: &BundleState,
) -> Result<SavedCache, PredicateError> {
let Self { caches, metrics, state_provider: _ } = self;

for (addr, account) in &state_updates.state {
// If the account was not modified, as in not changed and not destroyed, then we have
// nothing to do w.r.t. this particular account and can move on
if account.status.is_not_modified() {
continue
}

// if the account was destroyed, invalidate from the account / storage caches
if account.was_destroyed() {
// invalidate the account cache entry if destroyed
caches.account_cache.invalidate(addr);

// have to dereference here or else the closure moves the state update's lifetime
// into the closure / out of the method body
let addr = *addr;

// we also do not need to keep track of the returned PredicateId string
caches
.storage_cache
.invalidate_entries_if(move |(account_addr, _), _| addr == *account_addr)?;
continue
}

// if an account was modified but has `None` account info, something has gone badly wrong:
// this state should be unrepresentable, because an account whose current info is `None`
// should have been destroyed.
let Some(ref account_info) = account.info else {
todo!("error handling - a modified account has None info")
};

// insert will update if present, so we just use the new account info as the new value
// for the account cache
caches.account_cache.insert(*addr, Some(Account::from(account_info)));

// now we iterate over all storage and make updates to the cached storage values
for (storage_key, slot) in &account.storage {
// we convert the storage key from U256 to B256 because that is how it's represented
// in the cache
caches
.storage_cache
.insert((*addr, (*storage_key).into()), Some(slot.present_value));
}
}

// create a saved cache with the executed block hash, same metrics, and updated caches
let saved_cache = SavedCache { hash: executed_block_hash, caches, metrics };

Ok(saved_cache)
}
}

/// Metrics for the cached state provider, showing hits / misses for each cache
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.caching")]
@@ -269,7 +338,10 @@ impl ProviderCacheBuilder {
ProviderCaches {
code_cache: CacheBuilder::new(self.code_cache_size)
.build_with_hasher(DefaultHashBuilder::default()),
// we build the storage cache with closure invalidation so we can use
// `invalidate_entries_if` for storage invalidation
storage_cache: CacheBuilder::new(self.storage_cache_size)
.support_invalidation_closures()
.build_with_hasher(DefaultHashBuilder::default()),
account_cache: CacheBuilder::new(self.account_cache_size)
.build_with_hasher(DefaultHashBuilder::default()),
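
For context on the new builder call: moka only allows `invalidate_entries_if` on caches built with `support_invalidation_closures()`. The snippet below is a minimal, standalone sketch of that pattern (assuming moka's `sync` feature is enabled; the capacity and values are illustrative), mirroring how `save_cache` above purges every storage slot of a destroyed account from a cache keyed by `(Address, B256)`:

```rust
// Standalone sketch of moka's predicate-based invalidation (illustrative capacity and values).
use alloy_primitives::{Address, B256, U256};
use moka::{
    sync::{Cache, CacheBuilder},
    PredicateError,
};

fn purge_destroyed_account() -> Result<(), PredicateError> {
    // Keyed by (account address, storage slot), like the storage cache above.
    let storage_cache: Cache<(Address, B256), Option<U256>> = CacheBuilder::new(1_000)
        .support_invalidation_closures() // required before invalidate_entries_if may be called
        .build();

    let destroyed = Address::ZERO;
    storage_cache.insert((destroyed, B256::ZERO), Some(U256::from(1)));

    // Registers a predicate that lazily removes every cached slot of the destroyed account.
    storage_cache.invalidate_entries_if(move |(addr, _slot), _value| *addr == destroyed)?;
    Ok(())
}
```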
@@ -286,3 +358,29 @@ impl Default for ProviderCacheBuilder {
Self { code_cache_size: 1000000, storage_cache_size: 1000000, account_cache_size: 1000000 }
}
}

/// A saved cache that was used to execute a specific block and has been updated with that
/// block's state changes.
#[derive(Debug)]
pub(crate) struct SavedCache {
/// The hash of the block these caches were used to execute.
hash: B256,

/// The caches used for the provider.
caches: ProviderCaches,

/// Metrics for the cached state provider
metrics: CachedStateMetrics,
}

impl SavedCache {
/// Returns the hash for this cache
pub(crate) const fn executed_block_hash(&self) -> B256 {
self.hash
}

/// Splits the cache into its caches and metrics, consuming it.
pub(crate) fn split(self) -> (ProviderCaches, CachedStateMetrics) {
(self.caches, self.metrics)
}
}
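
Taken together, these additions give cached_state.rs a single-slot, cross-block lifecycle: reuse the caches only if they were produced by executing the parent block, serve execution reads through them, and after execution fold the block's `BundleState` back in via `save_cache`. The sketch below condenses that flow; it is hypothetical and will not compile on its own (`state_provider_for` and `execute_block` are stand-ins), and the real wiring is in the mod.rs changes that follow:

```rust
// Hypothetical wiring sketch; `state_provider_for` and `execute_block` are stand-ins and only
// the cache-related calls mirror this diff.
fn run_block(
    most_recent_cache: &mut Option<SavedCache>,
    parent_hash: B256,
    block_hash: B256,
) -> ProviderResult<()> {
    // Reuse the saved caches only if they came from executing this block's parent.
    let (caches, metrics) = match most_recent_cache
        .take_if(|cache| cache.executed_block_hash() == parent_hash)
    {
        Some(cache) => cache.split(),
        None => (ProviderCacheBuilder::default().build_caches(), CachedStateMetrics::zeroed()),
    };

    // Serve execution reads through the shared caches.
    let provider =
        CachedStateProvider::new_with_caches(state_provider_for(parent_hash)?, caches, metrics);

    // Execute the block, producing a BundleState of changes (stand-in helper).
    let bundle_state = execute_block(&provider, block_hash)?;

    // Fold the block's changes back into the caches and keep them for the next child block.
    if let Ok(saved) = provider.save_cache(block_hash, &bundle_state) {
        *most_recent_cache = Some(saved);
    }
    Ok(())
}
```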
32 changes: 30 additions & 2 deletions crates/engine/tree/src/tree/mod.rs
@@ -19,6 +19,7 @@ use alloy_rpc_types_engine::{
PayloadValidationError,
};
use block_buffer::BlockBuffer;
use cached_state::SavedCache;
use error::{InsertBlockError, InsertBlockErrorKind, InsertBlockFatalError};
use reth_chain_state::{
CanonicalInMemoryState, ExecutedBlock, MemoryOverlayStateProvider, NewCanonicalChain,
@@ -538,6 +539,8 @@ where
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
/// The engine API variant of this handler
engine_kind: EngineApiKind,
/// The most recent cache used for execution.
most_recent_cache: Option<SavedCache>,
/// state root task thread pool
state_root_task_pool: Arc<rayon::ThreadPool>,
}
@@ -632,6 +635,7 @@ where
incoming_tx,
invalid_block_hook: Box::new(NoopInvalidBlockHook),
engine_kind,
most_recent_cache: None,
state_root_task_pool,
}
}
@@ -2192,6 +2196,19 @@ where
Ok(None)
}

/// This fetches the most recent saved cache, using the hash of the block we are trying to
/// execute on top of.
///
/// If the hash does not match the saved cache's hash, the saved cache does not contain state
/// useful for this block's execution, so we return `None`.
///
/// If there is no cache saved, this returns `None`.
///
/// This `take`s the cache to avoid cloning it.
fn take_latest_cache(&mut self, parent_hash: B256) -> Option<SavedCache> {
self.most_recent_cache.take_if(|cache| cache.executed_block_hash() == parent_hash)
}

fn insert_block_without_senders(
&mut self,
block: SealedBlockFor<N::Block>,
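
The `take_latest_cache` helper added above relies on `Option::take_if` from the standard library (stabilized in Rust 1.80): the value is moved out only when the predicate matches, and otherwise stays in place for a later attempt. A minimal, standalone illustration with made-up values:

```rust
// Minimal illustration of the Option::take_if pattern used by take_latest_cache.
fn main() {
    let mut most_recent: Option<u64> = Some(7);

    // Predicate does not match: nothing is taken and the value stays for a later caller.
    assert_eq!(most_recent.take_if(|v| *v == 9), None);
    assert_eq!(most_recent, Some(7));

    // Predicate matches: the value is moved out and the slot becomes None.
    assert_eq!(most_recent.take_if(|v| *v == 7), Some(7));
    assert_eq!(most_recent, None);
}
```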
@@ -2260,8 +2277,13 @@ where

// Use cached state provider before executing, this does nothing currently, will be used in
// prewarming
let caches = ProviderCacheBuilder::default().build_caches();
let cache_metrics = CachedStateMetrics::zeroed();
let (caches, cache_metrics) =
if let Some(cache) = self.take_latest_cache(block.parent_hash()) {
cache.split()
} else {
(ProviderCacheBuilder::default().build_caches(), CachedStateMetrics::zeroed())
};

let state_provider =
CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);

@@ -2464,6 +2486,12 @@ where
self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
debug!(target: "engine::tree", ?root_elapsed, block=?sealed_block.num_hash(), "Calculated state root");

// apply state updates to cache and save it
let Ok(saved_cache) = state_provider.save_cache(sealed_block.hash(), &output.state) else {
todo!("error bubbling for save_cache errors")
};
self.most_recent_cache = Some(saved_cache);
Comment on lines +2490 to +2493

Collaborator: should we even do error handling here? we could just reset to None?

Member (Author): true, yeah let's just reset in this case
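
A minimal sketch of the direction the thread above settles on (hypothetical, not part of this diff): instead of bubbling the `PredicateError`, discard the cache on failure so the next block simply starts with fresh caches, for example by replacing the `let Ok(..) else { .. }` block with:

```rust
// Hypothetical follow-up from the review: on failure, reset the cache to None so the next
// block rebuilds fresh caches instead of erroring out.
self.most_recent_cache = state_provider.save_cache(sealed_block.hash(), &output.state).ok();
```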


let executed: ExecutedBlock<N> = ExecutedBlock {
block: sealed_block.clone(),
senders: Arc::new(block.senders().to_vec()),