From efd69064df72247f1ed557eacc032311f4a8b15a Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 13 Dec 2022 09:57:26 +0000 Subject: [PATCH] Enable proposer boost re-orging (#2860) With proposer boosting implemented (#2822) we have an opportunity to re-org out late blocks. This PR adds three flags to the BN to control this behaviour: * `--disable-proposer-reorgs`: turn aggressive re-orging off (it's on by default). * `--proposer-reorg-threshold N`: attempt to orphan blocks with less than N% of the committee vote. If this parameter isn't set then N defaults to 20% when the feature is enabled. * `--proposer-reorg-epochs-since-finalization N`: only attempt to re-org late blocks when the number of epochs since finalization is less than or equal to N. The default is 2 epochs, meaning re-orgs will only be attempted when the chain is finalizing optimally. For safety Lighthouse will only attempt a re-org under very specific conditions: 1. The block being proposed is 1 slot after the canonical head, and the canonical head is 1 slot after its parent. i.e. at slot `n + 1` rather than building on the block from slot `n` we build on the block from slot `n - 1`. 2. The current canonical head received less than N% of the committee vote. N should be set depending on the proposer boost fraction itself, the fraction of the network that is believed to be applying it, and the size of the largest entity that could be hoarding votes. 3. The current canonical head arrived after the attestation deadline from our perspective. This condition was only added to support suppression of forkchoiceUpdated messages, but makes intuitive sense. 4. The block is being proposed in the first 2 seconds of the slot. This gives it time to propagate and receive the proposer boost. For the initial idea and background, see: https://github.com/ethereum/consensus-specs/pull/2353#issuecomment-950238004 There is also a specification for this feature here: https://github.com/ethereum/consensus-specs/pull/3034 Co-authored-by: Michael Sproul Co-authored-by: pawan --- .github/custom/clippy.toml | 1 + Cargo.lock | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 643 ++++++++++++++---- .../src/beacon_fork_choice_store.rs | 53 +- beacon_node/beacon_chain/src/builder.rs | 26 +- .../beacon_chain/src/canonical_head.rs | 31 +- beacon_node/beacon_chain/src/chain_config.rs | 25 +- beacon_node/beacon_chain/src/fork_revert.rs | 3 +- beacon_node/beacon_chain/src/lib.rs | 4 +- beacon_node/beacon_chain/src/metrics.rs | 34 +- .../beacon_chain/src/proposer_prep_service.rs | 13 +- .../beacon_chain/src/snapshot_cache.rs | 7 +- .../beacon_chain/src/state_advance_timer.rs | 15 +- beacon_node/beacon_chain/src/test_utils.rs | 127 +++- beacon_node/beacon_chain/tests/merge.rs | 2 + .../tests/payload_invalidation.rs | 21 +- beacon_node/client/src/builder.rs | 6 +- beacon_node/execution_layer/src/lib.rs | 26 + .../test_utils/execution_block_generator.rs | 162 ++++- .../src/test_utils/handle_rpc.rs | 8 + .../execution_layer/src/test_utils/hook.rs | 27 + .../src/test_utils/mock_execution_layer.rs | 15 + .../execution_layer/src/test_utils/mod.rs | 7 +- beacon_node/http_api/tests/common.rs | 27 +- .../http_api/tests/interactive_tests.rs | 506 +++++++++++++- beacon_node/src/cli.rs | 32 + beacon_node/src/config.rs | 26 + book/src/SUMMARY.md | 1 + book/src/builders.md | 12 +- book/src/late-block-re-orgs.md | 60 ++ common/task_executor/src/test_utils.rs | 7 + consensus/fork_choice/src/fork_choice.rs | 83 ++- .../fork_choice/src/fork_choice_store.rs | 3 +- consensus/fork_choice/tests/tests.rs | 8 +- consensus/proto_array/Cargo.toml | 1 + consensus/proto_array/src/error.rs | 9 + .../src/fork_choice_test_definition.rs | 17 +- .../execution_status.rs | 2 +- .../proto_array/src/justified_balances.rs | 62 ++ consensus/proto_array/src/lib.rs | 11 +- consensus/proto_array/src/proto_array.rs | 43 +- .../src/proto_array_fork_choice.rs | 278 +++++++- consensus/proto_array/src/ssz_container.rs | 18 +- lcli/src/main.rs | 8 + lcli/src/new_testnet.rs | 4 + lighthouse/tests/beacon_node.rs | 72 ++ lighthouse/tests/validator_client.rs | 18 + scripts/local_testnet/vars.env | 3 + validator_client/src/cli.rs | 14 +- validator_client/src/config.rs | 13 + validator_client/src/lib.rs | 1 + 51 files changed, 2270 insertions(+), 326 deletions(-) create mode 100644 beacon_node/execution_layer/src/test_utils/hook.rs create mode 100644 book/src/late-block-re-orgs.md create mode 100644 consensus/proto_array/src/justified_balances.rs diff --git a/.github/custom/clippy.toml b/.github/custom/clippy.toml index f50e35bcdfd..3ccbeee44a7 100644 --- a/.github/custom/clippy.toml +++ b/.github/custom/clippy.toml @@ -16,6 +16,7 @@ async-wrapper-methods = [ "task_executor::TaskExecutor::spawn_blocking_handle", "warp_utils::task::blocking_task", "warp_utils::task::blocking_json_task", + "beacon_chain::beacon_chain::BeaconChain::spawn_blocking_handle", "validator_client::http_api::blocking_signed_json_task", "execution_layer::test_utils::MockServer::new", "execution_layer::test_utils::MockServer::new_with_config", diff --git a/Cargo.lock b/Cargo.lock index 86f239afdd2..52d2c14a666 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4998,6 +4998,7 @@ version = "0.2.0" dependencies = [ "eth2_ssz", "eth2_ssz_derive", + "safe_arith", "serde", "serde_derive", "serde_yaml", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 01e0ca0888c..6e3b70b29fb 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -46,9 +46,8 @@ use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; -use crate::proposer_prep_service::PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; -use crate::snapshot_cache::SnapshotCache; +use crate::snapshot_cache::{BlockProductionPreState, SnapshotCache}; use crate::sync_committee_verification::{ Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution, }; @@ -58,9 +57,7 @@ use crate::validator_monitor::{ HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS, }; use crate::validator_pubkey_cache::ValidatorPubkeyCache; -use crate::BeaconForkChoiceStore; -use crate::BeaconSnapshot; -use crate::{metrics, BeaconChainError}; +use crate::{metrics, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot, CachedHead}; use eth2::types::{EventKind, SseBlock, SyncDuty}; use execution_layer::{ BuilderParams, ChainHealth, ExecutionLayer, FailedCondition, PayloadAttributes, PayloadStatus, @@ -75,7 +72,7 @@ use itertools::process_results; use itertools::Itertools; use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool}; use parking_lot::{Mutex, RwLock}; -use proto_array::CountUnrealizedFull; +use proto_array::{CountUnrealizedFull, DoNotReOrg, ProposerHeadError}; use safe_arith::SafeArith; use slasher::Slasher; use slog::{crit, debug, error, info, trace, warn, Logger}; @@ -106,6 +103,7 @@ use store::{ use task_executor::{ShutdownReason, TaskExecutor}; use tree_hash::TreeHash; use types::beacon_state::CloneConfig; +use types::consts::merge::INTERVALS_PER_SLOT; use types::*; use types::signed_block_and_blobs::BlockMaybeBlobs; @@ -128,6 +126,12 @@ pub const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1) /// The timeout for the eth1 finalization cache pub const ETH1_FINALIZATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_millis(200); +/// The latest delay from the start of the slot at which to attempt a 1-slot re-org. +fn max_re_org_slot_delay(seconds_per_slot: u64) -> Duration { + // Allow at least half of the attestation deadline for the block to propagate. + Duration::from_secs(seconds_per_slot) / INTERVALS_PER_SLOT as u32 / 2 +} + // These keys are all zero because they get stored in different columns, see `DBColumn` type. pub const BEACON_CHAIN_DB_KEY: Hash256 = Hash256::zero(); pub const OP_POOL_DB_KEY: Hash256 = Hash256::zero(); @@ -189,6 +193,21 @@ pub enum ProduceBlockVerification { NoVerification, } +/// Payload attributes for which the `beacon_chain` crate is responsible. +pub struct PrePayloadAttributes { + pub proposer_index: u64, + pub prev_randao: Hash256, +} + +/// Define whether a forkchoiceUpdate needs to be checked for an override (`Yes`) or has already +/// been checked (`AlreadyApplied`). It is safe to specify `Yes` even if re-orgs are disabled. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub enum OverrideForkchoiceUpdate { + #[default] + Yes, + AlreadyApplied, +} + /// The accepted clock drift for nodes gossiping blocks and attestations. See: /// /// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/p2p-interface.md#configuration @@ -2903,6 +2922,7 @@ impl BeaconChain { if !payload_verification_status.is_optimistic() && block.slot() + EARLY_ATTESTER_CACHE_HISTORIC_SLOTS >= current_slot { + let fork_choice_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE); match fork_choice.get_head(current_slot, &self.spec) { // This block became the head, add it to the early attester cache. Ok(new_head_root) if new_head_root == block_root => { @@ -2936,6 +2956,7 @@ impl BeaconChain { "error" => ?e ), } + drop(fork_choice_timer); } // Register sync aggregate with validator monitor @@ -3310,6 +3331,7 @@ impl BeaconChain { // signed. If we miss the cache or we're producing a block that conflicts with the head, // fall back to getting the head from `slot - 1`. let state_load_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_STATE_LOAD_TIMES); + // Atomically read some values from the head whilst avoiding holding cached head `Arc` any // longer than necessary. let (head_slot, head_block_root) = { @@ -3317,8 +3339,19 @@ impl BeaconChain { (head.head_slot(), head.head_block_root()) }; let (state, state_root_opt) = if head_slot < slot { + // Attempt an aggressive re-org if configured and the conditions are right. + if let Some(re_org_state) = self.get_state_for_re_org(slot, head_slot, head_block_root) + { + info!( + self.log, + "Proposing block to re-org current head"; + "slot" => slot, + "head_to_reorg" => %head_block_root, + ); + (re_org_state.pre_state, re_org_state.state_root) + } // Normal case: proposing a block atop the current head. Use the snapshot cache. - if let Some(pre_state) = self + else if let Some(pre_state) = self .snapshot_cache .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) .and_then(|snapshot_cache| { @@ -3358,6 +3391,400 @@ impl BeaconChain { Ok((state, state_root_opt)) } + /// Fetch the beacon state to use for producing a block if a 1-slot proposer re-org is viable. + /// + /// This function will return `None` if proposer re-orgs are disabled. + fn get_state_for_re_org( + &self, + slot: Slot, + head_slot: Slot, + canonical_head: Hash256, + ) -> Option> { + let re_org_threshold = self.config.re_org_threshold?; + + if self.spec.proposer_score_boost.is_none() { + warn!( + self.log, + "Ignoring proposer re-org configuration"; + "reason" => "this network does not have proposer boost enabled" + ); + return None; + } + + let slot_delay = self + .slot_clock + .seconds_from_current_slot_start(self.spec.seconds_per_slot) + .or_else(|| { + warn!( + self.log, + "Not attempting re-org"; + "error" => "unable to read slot clock" + ); + None + })?; + + // Attempt a proposer re-org if: + // + // 1. It seems we have time to propagate and still receive the proposer boost. + // 2. The current head block was seen late. + // 3. The `get_proposer_head` conditions from fork choice pass. + let proposing_on_time = slot_delay < max_re_org_slot_delay(self.spec.seconds_per_slot); + if !proposing_on_time { + debug!( + self.log, + "Not attempting re-org"; + "reason" => "not proposing on time", + ); + return None; + } + + let head_late = self.block_observed_after_attestation_deadline(canonical_head, head_slot); + if !head_late { + debug!( + self.log, + "Not attempting re-org"; + "reason" => "head not late" + ); + return None; + } + + // Is the current head weak and appropriate for re-orging? + let proposer_head_timer = + metrics::start_timer(&metrics::BLOCK_PRODUCTION_GET_PROPOSER_HEAD_TIMES); + let proposer_head = self + .canonical_head + .fork_choice_read_lock() + .get_proposer_head( + slot, + canonical_head, + re_org_threshold, + self.config.re_org_max_epochs_since_finalization, + ) + .map_err(|e| match e { + ProposerHeadError::DoNotReOrg(reason) => { + debug!( + self.log, + "Not attempting re-org"; + "reason" => %reason, + ); + } + ProposerHeadError::Error(e) => { + warn!( + self.log, + "Not attempting re-org"; + "error" => ?e, + ); + } + }) + .ok()?; + drop(proposer_head_timer); + let re_org_parent_block = proposer_head.parent_node.root; + + // Only attempt a re-org if we hit the snapshot cache. + let pre_state = self + .snapshot_cache + .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .and_then(|snapshot_cache| { + snapshot_cache.get_state_for_block_production(re_org_parent_block) + }) + .or_else(|| { + debug!( + self.log, + "Not attempting re-org"; + "reason" => "missed snapshot cache", + "parent_block" => ?re_org_parent_block, + ); + None + })?; + + info!( + self.log, + "Attempting re-org due to weak head"; + "weak_head" => ?canonical_head, + "parent" => ?re_org_parent_block, + "head_weight" => proposer_head.head_node.weight, + "threshold_weight" => proposer_head.re_org_weight_threshold + ); + + Some(pre_state) + } + + /// Get the proposer index and `prev_randao` value for a proposal at slot `proposal_slot`. + /// + /// The `proposer_head` may be the head block of `cached_head` or its parent. An error will + /// be returned for any other value. + pub fn get_pre_payload_attributes( + &self, + proposal_slot: Slot, + proposer_head: Hash256, + cached_head: &CachedHead, + ) -> Result, Error> { + let proposal_epoch = proposal_slot.epoch(T::EthSpec::slots_per_epoch()); + + let head_block_root = cached_head.head_block_root(); + let parent_block_root = cached_head.parent_block_root(); + + // The proposer head must be equal to the canonical head or its parent. + if proposer_head != head_block_root && proposer_head != parent_block_root { + warn!( + self.log, + "Unable to compute payload attributes"; + "block_root" => ?proposer_head, + "head_block_root" => ?head_block_root, + ); + return Ok(None); + } + + // Compute the proposer index. + let head_epoch = cached_head.head_slot().epoch(T::EthSpec::slots_per_epoch()); + let shuffling_decision_root = if head_epoch == proposal_epoch { + cached_head + .snapshot + .beacon_state + .proposer_shuffling_decision_root(proposer_head)? + } else { + proposer_head + }; + let cached_proposer = self + .beacon_proposer_cache + .lock() + .get_slot::(shuffling_decision_root, proposal_slot); + let proposer_index = if let Some(proposer) = cached_proposer { + proposer.index as u64 + } else { + if head_epoch + 2 < proposal_epoch { + warn!( + self.log, + "Skipping proposer preparation"; + "msg" => "this is a non-critical issue that can happen on unhealthy nodes or \ + networks.", + "proposal_epoch" => proposal_epoch, + "head_epoch" => head_epoch, + ); + + // Don't skip the head forward more than two epochs. This avoids burdening an + // unhealthy node. + // + // Although this node might miss out on preparing for a proposal, they should still + // be able to propose. This will prioritise beacon chain health over efficient + // packing of execution blocks. + return Ok(None); + } + + let (proposers, decision_root, _, fork) = + compute_proposer_duties_from_head(proposal_epoch, self)?; + + let proposer_offset = (proposal_slot % T::EthSpec::slots_per_epoch()).as_usize(); + let proposer = *proposers + .get(proposer_offset) + .ok_or(BeaconChainError::NoProposerForSlot(proposal_slot))?; + + self.beacon_proposer_cache.lock().insert( + proposal_epoch, + decision_root, + proposers, + fork, + )?; + + // It's possible that the head changes whilst computing these duties. If so, abandon + // this routine since the change of head would have also spawned another instance of + // this routine. + // + // Exit now, after updating the cache. + if decision_root != shuffling_decision_root { + warn!( + self.log, + "Head changed during proposer preparation"; + ); + return Ok(None); + } + + proposer as u64 + }; + + // Get the `prev_randao` value. + let prev_randao = if proposer_head == parent_block_root { + cached_head.parent_random() + } else { + cached_head.head_random() + }?; + + Ok(Some(PrePayloadAttributes { + proposer_index, + prev_randao, + })) + } + + /// Determine whether a fork choice update to the execution layer should be overridden. + /// + /// This is *only* necessary when proposer re-orgs are enabled, because we have to prevent the + /// execution layer from enshrining the block we want to re-org as the head. + /// + /// This function uses heuristics that align quite closely but not exactly with the re-org + /// conditions set out in `get_state_for_re_org` and `get_proposer_head`. The differences are + /// documented below. + fn overridden_forkchoice_update_params( + &self, + canonical_forkchoice_params: ForkchoiceUpdateParameters, + ) -> Result { + self.overridden_forkchoice_update_params_or_failure_reason(&canonical_forkchoice_params) + .or_else(|e| match e { + ProposerHeadError::DoNotReOrg(reason) => { + trace!( + self.log, + "Not suppressing fork choice update"; + "reason" => %reason, + ); + Ok(canonical_forkchoice_params) + } + ProposerHeadError::Error(e) => Err(e), + }) + } + + fn overridden_forkchoice_update_params_or_failure_reason( + &self, + canonical_forkchoice_params: &ForkchoiceUpdateParameters, + ) -> Result> { + let _timer = metrics::start_timer(&metrics::FORK_CHOICE_OVERRIDE_FCU_TIMES); + + // Never override if proposer re-orgs are disabled. + let re_org_threshold = self + .config + .re_org_threshold + .ok_or(DoNotReOrg::ReOrgsDisabled)?; + + let head_block_root = canonical_forkchoice_params.head_root; + + // Perform initial checks and load the relevant info from fork choice. + let info = self + .canonical_head + .fork_choice_read_lock() + .get_preliminary_proposer_head( + head_block_root, + re_org_threshold, + self.config.re_org_max_epochs_since_finalization, + ) + .map_err(|e| e.map_inner_error(Error::ProposerHeadForkChoiceError))?; + + // The slot of our potential re-org block is always 1 greater than the head block because we + // only attempt single-slot re-orgs. + let head_slot = info.head_node.slot; + let re_org_block_slot = head_slot + 1; + let fork_choice_slot = info.current_slot; + + // If a re-orging proposal isn't made by the `max_re_org_slot_delay` then we give up + // and allow the fork choice update for the canonical head through so that we may attest + // correctly. + let current_slot_ok = if head_slot == fork_choice_slot { + true + } else if re_org_block_slot == fork_choice_slot { + self.slot_clock + .start_of(re_org_block_slot) + .and_then(|slot_start| { + let now = self.slot_clock.now_duration()?; + let slot_delay = now.saturating_sub(slot_start); + Some(slot_delay <= max_re_org_slot_delay(self.spec.seconds_per_slot)) + }) + .unwrap_or(false) + } else { + false + }; + if !current_slot_ok { + return Err(DoNotReOrg::HeadDistance.into()); + } + + // Only attempt a re-org if we have a proposer registered for the re-org slot. + let proposing_at_re_org_slot = { + // The proposer shuffling has the same decision root as the next epoch attestation + // shuffling. We know our re-org block is not on the epoch boundary, so it has the + // same proposer shuffling as the head (but not necessarily the parent which may lie + // in the previous epoch). + let shuffling_decision_root = info + .head_node + .next_epoch_shuffling_id + .shuffling_decision_block; + let proposer_index = self + .beacon_proposer_cache + .lock() + .get_slot::(shuffling_decision_root, re_org_block_slot) + .ok_or_else(|| { + debug!( + self.log, + "Fork choice override proposer shuffling miss"; + "slot" => re_org_block_slot, + "decision_root" => ?shuffling_decision_root, + ); + DoNotReOrg::NotProposing + })? + .index as u64; + + self.execution_layer + .as_ref() + .ok_or(ProposerHeadError::Error(Error::ExecutionLayerMissing))? + .has_proposer_preparation_data_blocking(proposer_index) + }; + if !proposing_at_re_org_slot { + return Err(DoNotReOrg::NotProposing.into()); + } + + // If the current slot is already equal to the proposal slot (or we are in the tail end of + // the prior slot), then check the actual weight of the head against the re-org threshold. + let head_weak = if fork_choice_slot == re_org_block_slot { + info.head_node.weight < info.re_org_weight_threshold + } else { + true + }; + if !head_weak { + return Err(DoNotReOrg::HeadNotWeak { + head_weight: info.head_node.weight, + re_org_weight_threshold: info.re_org_weight_threshold, + } + .into()); + } + + // Check that the head block arrived late and is vulnerable to a re-org. This check is only + // a heuristic compared to the proper weight check in `get_state_for_re_org`, the reason + // being that we may have only *just* received the block and not yet processed any + // attestations for it. We also can't dequeue attestations for the block during the + // current slot, which would be necessary for determining its weight. + let head_block_late = + self.block_observed_after_attestation_deadline(head_block_root, head_slot); + if !head_block_late { + return Err(DoNotReOrg::HeadNotLate.into()); + } + + let parent_head_hash = info.parent_node.execution_status.block_hash(); + let forkchoice_update_params = ForkchoiceUpdateParameters { + head_root: info.parent_node.root, + head_hash: parent_head_hash, + justified_hash: canonical_forkchoice_params.justified_hash, + finalized_hash: canonical_forkchoice_params.finalized_hash, + }; + + debug!( + self.log, + "Fork choice update overridden"; + "canonical_head" => ?head_block_root, + "override" => ?info.parent_node.root, + "slot" => fork_choice_slot, + ); + + Ok(forkchoice_update_params) + } + + /// Check if the block with `block_root` was observed after the attestation deadline of `slot`. + fn block_observed_after_attestation_deadline(&self, block_root: Hash256, slot: Slot) -> bool { + let block_delays = self.block_times_cache.read().get_block_delays( + block_root, + self.slot_clock + .start_of(slot) + .unwrap_or_else(|| Duration::from_secs(0)), + ); + block_delays.observed.map_or(false, |delay| { + delay > self.slot_clock.unagg_attestation_production_delay() + }) + } + /// Produce a block for some `slot` upon the given `state`. /// /// Typically the `self.produce_block()` function should be used, instead of calling this @@ -3943,17 +4370,13 @@ impl BeaconChain { /// The `PayloadAttributes` are used by the EL to give it a look-ahead for preparing an optimal /// set of transactions for a new `ExecutionPayload`. /// - /// This function will result in a call to `forkchoiceUpdated` on the EL if: - /// - /// 1. We're in the tail-end of the slot (as defined by PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR) - /// 2. The head block is one slot (or less) behind the prepare slot (e.g., we're preparing for - /// the next slot and the block at the current slot is already known). + /// This function will result in a call to `forkchoiceUpdated` on the EL if we're in the + /// tail-end of the slot (as defined by `self.config.prepare_payload_lookahead`). pub async fn prepare_beacon_proposer( self: &Arc, current_slot: Slot, ) -> Result<(), Error> { let prepare_slot = current_slot + 1; - let prepare_epoch = prepare_slot.epoch(T::EthSpec::slots_per_epoch()); // There's no need to run the proposer preparation routine before the bellatrix fork. if self.slot_is_prior_to_bellatrix(prepare_slot) { @@ -3971,158 +4394,99 @@ impl BeaconChain { return Ok(()); } - // Atomically read some values from the canonical head, whilst avoiding holding the cached - // head `Arc` any longer than necessary. + // Load the cached head and its forkchoice update parameters. // // Use a blocking task since blocking the core executor on the canonical head read lock can // block the core tokio executor. let chain = self.clone(); - let (head_slot, head_root, head_decision_root, head_random, forkchoice_update_params) = - self.spawn_blocking_handle( + let maybe_prep_data = self + .spawn_blocking_handle( move || { let cached_head = chain.canonical_head.cached_head(); - let head_block_root = cached_head.head_block_root(); - let decision_root = cached_head - .snapshot - .beacon_state - .proposer_shuffling_decision_root(head_block_root)?; - Ok::<_, Error>(( - cached_head.head_slot(), - head_block_root, - decision_root, - cached_head.head_random()?, - cached_head.forkchoice_update_parameters(), - )) + + // Don't bother with proposer prep if the head is more than + // `PREPARE_PROPOSER_HISTORIC_EPOCHS` prior to the current slot. + // + // This prevents the routine from running during sync. + let head_slot = cached_head.head_slot(); + if head_slot + T::EthSpec::slots_per_epoch() * PREPARE_PROPOSER_HISTORIC_EPOCHS + < current_slot + { + debug!( + chain.log, + "Head too old for proposer prep"; + "head_slot" => head_slot, + "current_slot" => current_slot, + ); + return Ok(None); + } + + let canonical_fcu_params = cached_head.forkchoice_update_parameters(); + let fcu_params = + chain.overridden_forkchoice_update_params(canonical_fcu_params)?; + let pre_payload_attributes = chain.get_pre_payload_attributes( + prepare_slot, + fcu_params.head_root, + &cached_head, + )?; + Ok::<_, Error>(Some((fcu_params, pre_payload_attributes))) }, - "prepare_beacon_proposer_fork_choice_read", + "prepare_beacon_proposer_head_read", ) .await??; - let head_epoch = head_slot.epoch(T::EthSpec::slots_per_epoch()); - - // Don't bother with proposer prep if the head is more than - // `PREPARE_PROPOSER_HISTORIC_EPOCHS` prior to the current slot. - // - // This prevents the routine from running during sync. - if head_slot + T::EthSpec::slots_per_epoch() * PREPARE_PROPOSER_HISTORIC_EPOCHS - < current_slot - { - debug!( - self.log, - "Head too old for proposer prep"; - "head_slot" => head_slot, - "current_slot" => current_slot, - ); - return Ok(()); - } - - // Ensure that the shuffling decision root is correct relative to the epoch we wish to - // query. - let shuffling_decision_root = if head_epoch == prepare_epoch { - head_decision_root - } else { - head_root - }; - - // Read the proposer from the proposer cache. - let cached_proposer = self - .beacon_proposer_cache - .lock() - .get_slot::(shuffling_decision_root, prepare_slot); - let proposer = if let Some(proposer) = cached_proposer { - proposer.index - } else { - if head_epoch + 2 < prepare_epoch { - warn!( - self.log, - "Skipping proposer preparation"; - "msg" => "this is a non-critical issue that can happen on unhealthy nodes or \ - networks.", - "prepare_epoch" => prepare_epoch, - "head_epoch" => head_epoch, - ); - - // Don't skip the head forward more than two epochs. This avoids burdening an - // unhealthy node. - // - // Although this node might miss out on preparing for a proposal, they should still - // be able to propose. This will prioritise beacon chain health over efficient - // packing of execution blocks. - return Ok(()); - } - - let (proposers, decision_root, _, fork) = - compute_proposer_duties_from_head(prepare_epoch, self)?; - let proposer_index = prepare_slot.as_usize() % (T::EthSpec::slots_per_epoch() as usize); - let proposer = *proposers - .get(proposer_index) - .ok_or(BeaconChainError::NoProposerForSlot(prepare_slot))?; - - self.beacon_proposer_cache.lock().insert( - prepare_epoch, - decision_root, - proposers, - fork, - )?; - - // It's possible that the head changes whilst computing these duties. If so, abandon - // this routine since the change of head would have also spawned another instance of - // this routine. - // - // Exit now, after updating the cache. - if decision_root != shuffling_decision_root { - warn!( - self.log, - "Head changed during proposer preparation"; - ); + let (forkchoice_update_params, pre_payload_attributes) = + if let Some((fcu, Some(pre_payload))) = maybe_prep_data { + (fcu, pre_payload) + } else { + // Appropriate log messages have already been logged above and in + // `get_pre_payload_attributes`. return Ok(()); - } - - proposer - }; + }; // If the execution layer doesn't have any proposer data for this validator then we assume // it's not connected to this BN and no action is required. + let proposer = pre_payload_attributes.proposer_index; if !execution_layer - .has_proposer_preparation_data(proposer as u64) + .has_proposer_preparation_data(proposer) .await { return Ok(()); } + let head_root = forkchoice_update_params.head_root; let payload_attributes = PayloadAttributes { timestamp: self .slot_clock .start_of(prepare_slot) .ok_or(Error::InvalidSlot(prepare_slot))? .as_secs(), - prev_randao: head_random, - suggested_fee_recipient: execution_layer - .get_suggested_fee_recipient(proposer as u64) - .await, + prev_randao: pre_payload_attributes.prev_randao, + suggested_fee_recipient: execution_layer.get_suggested_fee_recipient(proposer).await, }; debug!( self.log, "Preparing beacon proposer"; "payload_attributes" => ?payload_attributes, - "head_root" => ?head_root, "prepare_slot" => prepare_slot, "validator" => proposer, + "parent_root" => ?head_root, ); let already_known = execution_layer - .insert_proposer(prepare_slot, head_root, proposer as u64, payload_attributes) + .insert_proposer(prepare_slot, head_root, proposer, payload_attributes) .await; + // Only push a log to the user if this is the first time we've seen this proposer for this // slot. if !already_known { info!( self.log, "Prepared beacon proposer"; - "already_known" => already_known, "prepare_slot" => prepare_slot, "validator" => proposer, + "parent_root" => ?head_root, ); } @@ -4144,27 +4508,22 @@ impl BeaconChain { return Ok(()); }; - // If either of the following are true, send a fork-choice update message to the - // EL: - // - // 1. We're in the tail-end of the slot (as defined by - // PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR) - // 2. The head block is one slot (or less) behind the prepare slot (e.g., we're - // preparing for the next slot and the block at the current slot is already - // known). - if till_prepare_slot - <= self.slot_clock.slot_duration() / PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR - || head_slot + 1 >= prepare_slot - { + // If we are close enough to the proposal slot, send an fcU, which will have payload + // attributes filled in by the execution layer cache we just primed. + if till_prepare_slot <= self.config.prepare_payload_lookahead { debug!( self.log, - "Pushing update to prepare proposer"; + "Sending forkchoiceUpdate for proposer prep"; "till_prepare_slot" => ?till_prepare_slot, "prepare_slot" => prepare_slot ); - self.update_execution_engine_forkchoice(current_slot, forkchoice_update_params) - .await?; + self.update_execution_engine_forkchoice( + current_slot, + forkchoice_update_params, + OverrideForkchoiceUpdate::AlreadyApplied, + ) + .await?; } Ok(()) @@ -4173,7 +4532,8 @@ impl BeaconChain { pub async fn update_execution_engine_forkchoice( self: &Arc, current_slot: Slot, - params: ForkchoiceUpdateParameters, + input_params: ForkchoiceUpdateParameters, + override_forkchoice_update: OverrideForkchoiceUpdate, ) -> Result<(), Error> { let next_slot = current_slot + 1; @@ -4195,6 +4555,19 @@ impl BeaconChain { .as_ref() .ok_or(Error::ExecutionLayerMissing)?; + // Determine whether to override the forkchoiceUpdated message if we want to re-org + // the current head at the next slot. + let params = if override_forkchoice_update == OverrideForkchoiceUpdate::Yes { + let chain = self.clone(); + self.spawn_blocking_handle( + move || chain.overridden_forkchoice_update_params(input_params), + "update_execution_engine_forkchoice_override", + ) + .await?? + } else { + input_params + }; + // Take the global lock for updating the execution engine fork choice. // // Whilst holding this lock we must: diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index d3b17ba1ea0..b17613da0d3 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -7,6 +7,8 @@ use crate::{metrics, BeaconSnapshot}; use derivative::Derivative; use fork_choice::ForkChoiceStore; +use proto_array::JustifiedBalances; +use safe_arith::ArithError; use ssz_derive::{Decode, Encode}; use std::collections::BTreeSet; use std::marker::PhantomData; @@ -31,6 +33,7 @@ pub enum Error { MissingState(Hash256), InvalidPersistedBytes(ssz::DecodeError), BeaconStateError(BeaconStateError), + Arith(ArithError), } impl From for Error { @@ -39,27 +42,15 @@ impl From for Error { } } +impl From for Error { + fn from(e: ArithError) -> Self { + Error::Arith(e) + } +} + /// The number of validator balance sets that are cached within `BalancesCache`. const MAX_BALANCE_CACHE_SIZE: usize = 4; -/// Returns the effective balances for every validator in the given `state`. -/// -/// Any validator who is not active in the epoch of the given `state` is assigned a balance of -/// zero. -pub fn get_effective_balances(state: &BeaconState) -> Vec { - state - .validators() - .iter() - .map(|validator| { - if validator.is_active_at(state.current_epoch()) { - validator.effective_balance - } else { - 0 - } - }) - .collect() -} - #[superstruct( variants(V8), variant_attributes(derive(PartialEq, Clone, Debug, Encode, Decode)), @@ -113,7 +104,7 @@ impl BalancesCache { let item = CacheItem { block_root: epoch_boundary_root, epoch, - balances: get_effective_balances(state), + balances: JustifiedBalances::from_justified_state(state)?.effective_balances, }; if self.items.len() == MAX_BALANCE_CACHE_SIZE { @@ -152,7 +143,7 @@ pub struct BeaconForkChoiceStore, Cold: ItemStore< time: Slot, finalized_checkpoint: Checkpoint, justified_checkpoint: Checkpoint, - justified_balances: Vec, + justified_balances: JustifiedBalances, best_justified_checkpoint: Checkpoint, unrealized_justified_checkpoint: Checkpoint, unrealized_finalized_checkpoint: Checkpoint, @@ -181,7 +172,7 @@ where pub fn get_forkchoice_store( store: Arc>, anchor: &BeaconSnapshot, - ) -> Self { + ) -> Result { let anchor_state = &anchor.beacon_state; let mut anchor_block_header = anchor_state.latest_block_header().clone(); if anchor_block_header.state_root == Hash256::zero() { @@ -194,13 +185,14 @@ where root: anchor_root, }; let finalized_checkpoint = justified_checkpoint; + let justified_balances = JustifiedBalances::from_justified_state(anchor_state)?; - Self { + Ok(Self { store, balances_cache: <_>::default(), time: anchor_state.slot(), justified_checkpoint, - justified_balances: anchor_state.balances().clone().into(), + justified_balances, finalized_checkpoint, best_justified_checkpoint: justified_checkpoint, unrealized_justified_checkpoint: justified_checkpoint, @@ -208,7 +200,7 @@ where proposer_boost_root: Hash256::zero(), equivocating_indices: BTreeSet::new(), _phantom: PhantomData, - } + }) } /// Save the current state of `Self` to a `PersistedForkChoiceStore` which can be stored to the @@ -219,7 +211,7 @@ where time: self.time, finalized_checkpoint: self.finalized_checkpoint, justified_checkpoint: self.justified_checkpoint, - justified_balances: self.justified_balances.clone(), + justified_balances: self.justified_balances.effective_balances.clone(), best_justified_checkpoint: self.best_justified_checkpoint, unrealized_justified_checkpoint: self.unrealized_justified_checkpoint, unrealized_finalized_checkpoint: self.unrealized_finalized_checkpoint, @@ -233,13 +225,15 @@ where persisted: PersistedForkChoiceStore, store: Arc>, ) -> Result { + let justified_balances = + JustifiedBalances::from_effective_balances(persisted.justified_balances)?; Ok(Self { store, balances_cache: persisted.balances_cache, time: persisted.time, finalized_checkpoint: persisted.finalized_checkpoint, justified_checkpoint: persisted.justified_checkpoint, - justified_balances: persisted.justified_balances, + justified_balances, best_justified_checkpoint: persisted.best_justified_checkpoint, unrealized_justified_checkpoint: persisted.unrealized_justified_checkpoint, unrealized_finalized_checkpoint: persisted.unrealized_finalized_checkpoint, @@ -279,7 +273,7 @@ where &self.justified_checkpoint } - fn justified_balances(&self) -> &[u64] { + fn justified_balances(&self) -> &JustifiedBalances { &self.justified_balances } @@ -314,8 +308,9 @@ where self.justified_checkpoint.root, self.justified_checkpoint.epoch, ) { + // NOTE: could avoid this re-calculation by introducing a `PersistedCacheItem`. metrics::inc_counter(&metrics::BALANCES_CACHE_HITS); - self.justified_balances = balances; + self.justified_balances = JustifiedBalances::from_effective_balances(balances)?; } else { metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES); let justified_block = self @@ -332,7 +327,7 @@ where .map_err(Error::FailedToReadState)? .ok_or_else(|| Error::MissingState(justified_block.state_root()))?; - self.justified_balances = get_effective_balances(&state); + self.justified_balances = JustifiedBalances::from_justified_state(&state)?; } Ok(()) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 54ae834f9a9..aaf729c0252 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -24,6 +24,7 @@ use futures::channel::mpsc::Sender; use kzg::Kzg; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::RwLock; +use proto_array::ReOrgThreshold; use slasher::Slasher; use slog::{crit, error, info, Logger}; use slot_clock::{SlotClock, TestingSlotClock}; @@ -34,8 +35,8 @@ use std::time::Duration; use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; use task_executor::{ShutdownReason, TaskExecutor}; use types::{ - BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes, - Signature, SignedBeaconBlock, Slot, + BeaconBlock, BeaconState, ChainSpec, Checkpoint, Epoch, EthSpec, Graffiti, Hash256, + PublicKeyBytes, Signature, SignedBeaconBlock, Slot, }; /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing @@ -164,6 +165,21 @@ where self } + /// Sets the proposer re-org threshold. + pub fn proposer_re_org_threshold(mut self, threshold: Option) -> Self { + self.chain_config.re_org_threshold = threshold; + self + } + + /// Sets the proposer re-org max epochs since finalization. + pub fn proposer_re_org_max_epochs_since_finalization( + mut self, + epochs_since_finalization: Epoch, + ) -> Self { + self.chain_config.re_org_max_epochs_since_finalization = epochs_since_finalization; + self + } + /// Sets the store (database). /// /// Should generally be called early in the build chain. @@ -363,7 +379,8 @@ where let (genesis, updated_builder) = self.set_genesis_state(beacon_state)?; self = updated_builder; - let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis); + let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis) + .map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?; let current_slot = None; let fork_choice = ForkChoice::from_anchor( @@ -481,7 +498,8 @@ where beacon_state: weak_subj_state, }; - let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &snapshot); + let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &snapshot) + .map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?; let current_slot = Some(snapshot.beacon_block.slot()); let fork_choice = ForkChoice::from_anchor( diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 1aa8d8715fd..19eddf60263 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -34,7 +34,8 @@ use crate::persisted_fork_choice::PersistedForkChoice; use crate::{ beacon_chain::{ - BeaconForkChoice, BeaconStore, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, FORK_CHOICE_DB_KEY, + BeaconForkChoice, BeaconStore, OverrideForkchoiceUpdate, + BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, FORK_CHOICE_DB_KEY, }, block_times_cache::BlockTimesCache, events::ServerSentEventHandler, @@ -114,6 +115,11 @@ impl CachedHead { self.snapshot.beacon_block_root } + /// Returns the root of the parent of the head block. + pub fn parent_block_root(&self) -> Hash256 { + self.snapshot.beacon_block.parent_root() + } + /// Returns root of the `BeaconState` at the head of the beacon chain. /// /// ## Note @@ -146,6 +152,21 @@ impl CachedHead { Ok(root) } + /// Returns the randao mix for the parent of the block at the head of the chain. + /// + /// This is useful for re-orging the current head. The parent's RANDAO value is read from + /// the head's execution payload because it is unavailable in the beacon state's RANDAO mixes + /// array after being overwritten by the head block's RANDAO mix. + /// + /// This will error if the head block is not execution-enabled (post Bellatrix). + pub fn parent_random(&self) -> Result { + self.snapshot + .beacon_block + .message() + .execution_payload() + .map(|payload| payload.prev_randao()) + } + /// Returns the active validator count for the current epoch of the head state. /// /// Should only return `None` if the caches have not been built on the head state (this should @@ -765,6 +786,7 @@ impl BeaconChain { new_cached_head: &CachedHead, new_head_proto_block: ProtoBlock, ) -> Result<(), Error> { + let _timer = metrics::start_timer(&metrics::FORK_CHOICE_AFTER_NEW_HEAD_TIMES); let old_snapshot = &old_cached_head.snapshot; let new_snapshot = &new_cached_head.snapshot; let new_head_is_optimistic = new_head_proto_block @@ -902,6 +924,7 @@ impl BeaconChain { new_view: ForkChoiceView, finalized_proto_block: ProtoBlock, ) -> Result<(), Error> { + let _timer = metrics::start_timer(&metrics::FORK_CHOICE_AFTER_FINALIZATION_TIMES); let new_snapshot = &new_cached_head.snapshot; let finalized_block_is_optimistic = finalized_proto_block .execution_status @@ -1128,7 +1151,11 @@ fn spawn_execution_layer_updates( } if let Err(e) = chain - .update_execution_engine_forkchoice(current_slot, forkchoice_update_params) + .update_execution_engine_forkchoice( + current_slot, + forkchoice_update_params, + OverrideForkchoiceUpdate::Yes, + ) .await { crit!( diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 286cc17a963..c4c6966732d 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -1,9 +1,18 @@ -pub use proto_array::CountUnrealizedFull; +pub use proto_array::{CountUnrealizedFull, ReOrgThreshold}; use serde_derive::{Deserialize, Serialize}; -use types::Checkpoint; +use std::time::Duration; +use types::{Checkpoint, Epoch}; +pub const DEFAULT_RE_ORG_THRESHOLD: ReOrgThreshold = ReOrgThreshold(20); +pub const DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION: Epoch = Epoch::new(2); pub const DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT: u64 = 250; +/// Default fraction of a slot lookahead for payload preparation (12/3 = 4 seconds on mainnet). +pub const DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR: u32 = 3; + +/// Fraction of a slot lookahead for fork choice in the state advance timer (500ms on mainnet). +pub const FORK_CHOICE_LOOKAHEAD_FACTOR: u32 = 24; + #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)] pub struct ChainConfig { /// Maximum number of slots to skip when importing a consensus message (e.g., block, @@ -21,6 +30,10 @@ pub struct ChainConfig { pub enable_lock_timeouts: bool, /// The max size of a message that can be sent over the network. pub max_network_size: usize, + /// Maximum percentage of committee weight at which to attempt re-orging the canonical head. + pub re_org_threshold: Option, + /// Maximum number of epochs since finalization for attempting a proposer re-org. + pub re_org_max_epochs_since_finalization: Epoch, /// Number of milliseconds to wait for fork choice before proposing a block. /// /// If set to 0 then block proposal will not wait for fork choice at all. @@ -47,6 +60,11 @@ pub struct ChainConfig { pub count_unrealized_full: CountUnrealizedFull, /// Optionally set timeout for calls to checkpoint sync endpoint. pub checkpoint_sync_url_timeout: u64, + /// The offset before the start of a proposal slot at which payload attributes should be sent. + /// + /// Low values are useful for execution engines which don't improve their payload after the + /// first call, and high values are useful for ensuring the EL is given ample notice. + pub prepare_payload_lookahead: Duration, } impl Default for ChainConfig { @@ -57,6 +75,8 @@ impl Default for ChainConfig { reconstruct_historic_states: false, enable_lock_timeouts: true, max_network_size: 10 * 1_048_576, // 10M + re_org_threshold: Some(DEFAULT_RE_ORG_THRESHOLD), + re_org_max_epochs_since_finalization: DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, fork_choice_before_proposal_timeout_ms: DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT, // Builder fallback configs that are set in `clap` will override these. builder_fallback_skips: 3, @@ -68,6 +88,7 @@ impl Default for ChainConfig { paranoid_block_proposal: false, count_unrealized_full: CountUnrealizedFull::default(), checkpoint_sync_url_timeout: 60, + prepare_payload_lookahead: Duration::from_secs(4), } } } diff --git a/beacon_node/beacon_chain/src/fork_revert.rs b/beacon_node/beacon_chain/src/fork_revert.rs index 3d48dfd8f60..6d5b5ddc4ae 100644 --- a/beacon_node/beacon_chain/src/fork_revert.rs +++ b/beacon_node/beacon_chain/src/fork_revert.rs @@ -147,7 +147,8 @@ pub fn reset_fork_choice_to_finalization, Cold: It beacon_state: finalized_state, }; - let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store.clone(), &finalized_snapshot); + let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store.clone(), &finalized_snapshot) + .map_err(|e| format!("Unable to reset fork choice store for revert: {e:?}"))?; let mut fork_choice = ForkChoice::from_anchor( fc_store, diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 34307b25a6c..1c5017c4708 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -49,8 +49,8 @@ pub mod validator_pubkey_cache; pub use self::beacon_chain::{ AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, - CountUnrealized, ForkChoiceError, ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped, - INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, + CountUnrealized, ForkChoiceError, OverrideForkchoiceUpdate, ProduceBlockVerification, + StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; pub use self::beacon_snapshot::BeaconSnapshot; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 8a0c61c5734..315f869514b 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -77,6 +77,11 @@ lazy_static! { "beacon_block_processing_attestation_observation_seconds", "Time spent hashing and remembering all the attestations in the block" ); + pub static ref BLOCK_PROCESSING_FORK_CHOICE: Result = try_create_histogram_with_buckets( + "beacon_block_processing_fork_choice_seconds", + "Time spent running fork choice's `get_head` during block import", + exponential_buckets(1e-3, 2.0, 8) + ); pub static ref BLOCK_SYNC_AGGREGATE_SET_BITS: Result = try_create_int_gauge( "block_sync_aggregate_set_bits", "The number of true bits in the last sync aggregate in a block" @@ -99,6 +104,11 @@ lazy_static! { "beacon_block_production_fork_choice_seconds", "Time taken to run fork choice before block production" ); + pub static ref BLOCK_PRODUCTION_GET_PROPOSER_HEAD_TIMES: Result = try_create_histogram_with_buckets( + "beacon_block_production_get_proposer_head_times", + "Time taken for fork choice to compute the proposer head before block production", + exponential_buckets(1e-3, 2.0, 8) + ); pub static ref BLOCK_PRODUCTION_STATE_LOAD_TIMES: Result = try_create_histogram( "beacon_block_production_state_load_seconds", "Time taken to load the base state for block production" @@ -322,10 +332,26 @@ lazy_static! { "beacon_reorgs_total", "Count of occasions fork choice has switched to a different chain" ); - pub static ref FORK_CHOICE_TIMES: Result = - try_create_histogram("beacon_fork_choice_seconds", "Full runtime of fork choice"); - pub static ref FORK_CHOICE_FIND_HEAD_TIMES: Result = - try_create_histogram("beacon_fork_choice_find_head_seconds", "Full runtime of fork choice find_head function"); + pub static ref FORK_CHOICE_TIMES: Result = try_create_histogram_with_buckets( + "beacon_fork_choice_seconds", + "Full runtime of fork choice", + linear_buckets(10e-3, 20e-3, 10) + ); + pub static ref FORK_CHOICE_OVERRIDE_FCU_TIMES: Result = try_create_histogram_with_buckets( + "beacon_fork_choice_override_fcu_seconds", + "Time taken to compute the optional forkchoiceUpdated override", + exponential_buckets(1e-3, 2.0, 8) + ); + pub static ref FORK_CHOICE_AFTER_NEW_HEAD_TIMES: Result = try_create_histogram_with_buckets( + "beacon_fork_choice_after_new_head_seconds", + "Time taken to run `after_new_head`", + exponential_buckets(1e-3, 2.0, 10) + ); + pub static ref FORK_CHOICE_AFTER_FINALIZATION_TIMES: Result = try_create_histogram_with_buckets( + "beacon_fork_choice_after_finalization_seconds", + "Time taken to run `after_finalization`", + exponential_buckets(1e-3, 2.0, 10) + ); pub static ref FORK_CHOICE_PROCESS_BLOCK_TIMES: Result = try_create_histogram( "beacon_fork_choice_process_block_seconds", "Time taken to add a block and all attestations to fork choice" diff --git a/beacon_node/beacon_chain/src/proposer_prep_service.rs b/beacon_node/beacon_chain/src/proposer_prep_service.rs index 9cd177b3409..140a9659fce 100644 --- a/beacon_node/beacon_chain/src/proposer_prep_service.rs +++ b/beacon_node/beacon_chain/src/proposer_prep_service.rs @@ -5,13 +5,9 @@ use std::sync::Arc; use task_executor::TaskExecutor; use tokio::time::sleep; -/// At 12s slot times, the means that the payload preparation routine will run 4s before the start -/// of each slot (`12 / 3 = 4`). -pub const PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR: u32 = 3; - /// Spawns a routine which ensures the EL is provided advance notice of any block producers. /// -/// This routine will run once per slot, at `slot_duration / PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR` +/// This routine will run once per slot, at `chain.prepare_payload_lookahead()` /// before the start of each slot. /// /// The service will not be started if there is no `execution_layer` on the `chain`. @@ -38,8 +34,8 @@ async fn proposer_prep_service( loop { match chain.slot_clock.duration_to_next_slot() { Some(duration) => { - let additional_delay = slot_duration - - chain.slot_clock.slot_duration() / PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR; + let additional_delay = + slot_duration.saturating_sub(chain.config.prepare_payload_lookahead); sleep(duration + additional_delay).await; debug!( @@ -65,14 +61,11 @@ async fn proposer_prep_service( }, "proposer_prep_update", ); - - continue; } None => { error!(chain.log, "Failed to read slot clock"); // If we can't read the slot clock, just wait another slot. sleep(slot_duration).await; - continue; } }; } diff --git a/beacon_node/beacon_chain/src/snapshot_cache.rs b/beacon_node/beacon_chain/src/snapshot_cache.rs index 40b73451cb0..d2846c08569 100644 --- a/beacon_node/beacon_chain/src/snapshot_cache.rs +++ b/beacon_node/beacon_chain/src/snapshot_cache.rs @@ -13,7 +13,10 @@ pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4; /// The minimum block delay to clone the state in the cache instead of removing it. /// This helps keep block processing fast during re-orgs from late blocks. -const MINIMUM_BLOCK_DELAY_FOR_CLONE: Duration = Duration::from_secs(6); +fn minimum_block_delay_for_clone(seconds_per_slot: u64) -> Duration { + // If the block arrived at the attestation deadline or later, it might get re-orged. + Duration::from_secs(seconds_per_slot) / 3 +} /// This snapshot is to be used for verifying a child of `self.beacon_block`. #[derive(Debug)] @@ -256,7 +259,7 @@ impl SnapshotCache { if let Some(cache) = self.snapshots.get(i) { // Avoid cloning the block during sync (when the `block_delay` is `None`). if let Some(delay) = block_delay { - if delay >= MINIMUM_BLOCK_DELAY_FOR_CLONE + if delay >= minimum_block_delay_for_clone(spec.seconds_per_slot) && delay <= Duration::from_secs(spec.seconds_per_slot) * 4 || block_slot > cache.beacon_block.slot() + 1 { diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 72fc973e546..f73223fa540 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -16,6 +16,7 @@ use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::{ beacon_chain::{ATTESTATION_CACHE_LOCK_TIMEOUT, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT}, + chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR, snapshot_cache::StateAdvance, BeaconChain, BeaconChainError, BeaconChainTypes, }; @@ -133,7 +134,7 @@ async fn state_advance_timer( // Run fork choice 23/24s of the way through the slot (11.5s on mainnet). // We need to run after the state advance, so use the same condition as above. - let fork_choice_offset = slot_duration / 24; + let fork_choice_offset = slot_duration / FORK_CHOICE_LOOKAHEAD_FACTOR; let fork_choice_instant = if duration_to_next_slot > state_advance_offset { Instant::now() + duration_to_next_slot - fork_choice_offset } else { @@ -224,8 +225,20 @@ async fn state_advance_timer( return; } + // Re-compute the head, dequeuing attestations for the current slot early. beacon_chain.recompute_head_at_slot(next_slot).await; + // Prepare proposers so that the node can send payload attributes in the case where + // it decides to abandon a proposer boost re-org. + if let Err(e) = beacon_chain.prepare_beacon_proposer(current_slot).await { + warn!( + log, + "Unable to prepare proposer with lookahead"; + "error" => ?e, + "slot" => next_slot, + ); + } + // Use a blocking task to avoid blocking the core executor whilst waiting for locks // in `ForkChoiceSignalTx`. beacon_chain.task_executor.clone().spawn_blocking( diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index b88966b41a9..d6e8787f4e0 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -32,7 +32,7 @@ use rand::SeedableRng; use rayon::prelude::*; use sensitive_url::SensitiveUrl; use slog::Logger; -use slot_clock::TestingSlotClock; +use slot_clock::{SlotClock, TestingSlotClock}; use state_processing::per_block_processing::compute_timestamp_at_slot; use state_processing::{ state_advance::{complete_state_advance, partial_state_advance}, @@ -319,6 +319,12 @@ where self } + pub fn logger(mut self, log: Logger) -> Self { + self.log = log.clone(); + self.runtime.set_logger(log); + self + } + /// This mutator will be run before the `store_mutator`. pub fn initial_mutator(mut self, mutator: BoxedMutator) -> Self { assert!( @@ -524,10 +530,9 @@ pub struct BeaconChainHarness { pub rng: Mutex, } -pub type HarnessAttestations = Vec<( - Vec<(Attestation, SubnetId)>, - Option>, -)>; +pub type CommitteeAttestations = Vec<(Attestation, SubnetId)>; +pub type HarnessAttestations = + Vec<(CommitteeAttestations, Option>)>; pub type HarnessSyncContributions = Vec<( Vec<(SyncCommitteeMessage, usize)>, @@ -778,6 +783,21 @@ where sk.sign(message) } + /// Sign a beacon block using the proposer's key. + pub fn sign_beacon_block( + &self, + block: BeaconBlock, + state: &BeaconState, + ) -> SignedBeaconBlock { + let proposer_index = block.proposer_index() as usize; + block.sign( + &self.validator_keypairs[proposer_index].sk, + &state.fork(), + state.genesis_validators_root(), + &self.spec, + ) + } + /// Produces an "unaggregated" attestation for the given `slot` and `index` that attests to /// `beacon_block_root`. The provided `state` should match the `block.state_root` for the /// `block` identified by `beacon_block_root`. @@ -851,13 +871,35 @@ where state_root: Hash256, head_block_root: SignedBeaconBlockHash, attestation_slot: Slot, - ) -> Vec, SubnetId)>> { + ) -> Vec> { + self.make_unaggregated_attestations_with_limit( + attesting_validators, + state, + state_root, + head_block_root, + attestation_slot, + None, + ) + .0 + } + + pub fn make_unaggregated_attestations_with_limit( + &self, + attesting_validators: &[usize], + state: &BeaconState, + state_root: Hash256, + head_block_root: SignedBeaconBlockHash, + attestation_slot: Slot, + limit: Option, + ) -> (Vec>, Vec) { let committee_count = state.get_committee_count_at_slot(state.slot()).unwrap(); let fork = self .spec .fork_at_epoch(attestation_slot.epoch(E::slots_per_epoch())); - state + let attesters = Mutex::new(vec![]); + + let attestations = state .get_beacon_committees_at_slot(attestation_slot) .expect("should get committees") .iter() @@ -869,6 +911,15 @@ where if !attesting_validators.contains(validator_index) { return None; } + + let mut attesters = attesters.lock(); + if let Some(limit) = limit { + if attesters.len() >= limit { + return None; + } + } + attesters.push(*validator_index); + let mut attestation = self .produce_unaggregated_attestation_for_block( attestation_slot, @@ -909,9 +960,19 @@ where Some((attestation, subnet_id)) }) - .collect() + .collect::>() }) - .collect() + .collect::>(); + + let attesters = attesters.into_inner(); + if let Some(limit) = limit { + assert_eq!( + limit, + attesters.len(), + "failed to generate `limit` attestations" + ); + } + (attestations, attesters) } /// A list of sync messages for the given state. @@ -1004,13 +1065,38 @@ where block_hash: SignedBeaconBlockHash, slot: Slot, ) -> HarnessAttestations { - let unaggregated_attestations = self.make_unaggregated_attestations( + self.make_attestations_with_limit( attesting_validators, state, state_root, block_hash, slot, - ); + None, + ) + .0 + } + + /// Produce exactly `limit` attestations. + /// + /// Return attestations and vec of validator indices that attested. + pub fn make_attestations_with_limit( + &self, + attesting_validators: &[usize], + state: &BeaconState, + state_root: Hash256, + block_hash: SignedBeaconBlockHash, + slot: Slot, + limit: Option, + ) -> (HarnessAttestations, Vec) { + let (unaggregated_attestations, attesters) = self + .make_unaggregated_attestations_with_limit( + attesting_validators, + state, + state_root, + block_hash, + slot, + limit, + ); let fork = self.spec.fork_at_epoch(slot.epoch(E::slots_per_epoch())); let aggregated_attestations: Vec>> = @@ -1029,7 +1115,7 @@ where .committee .iter() .find(|&validator_index| { - if !attesting_validators.contains(validator_index) { + if !attesters.contains(validator_index) { return false; } @@ -1080,10 +1166,13 @@ where }) .collect(); - unaggregated_attestations - .into_iter() - .zip(aggregated_attestations) - .collect() + ( + unaggregated_attestations + .into_iter() + .zip(aggregated_attestations) + .collect(), + attesters, + ) } pub fn make_sync_contributions( @@ -1736,6 +1825,12 @@ where self.chain.slot_clock.advance_slot(); } + /// Advance the clock to `lookahead` before the start of `slot`. + pub fn advance_to_slot_lookahead(&self, slot: Slot, lookahead: Duration) { + let time = self.chain.slot_clock.start_of(slot).unwrap() - lookahead; + self.chain.slot_clock.set_current_time(time); + } + /// Deprecated: Use make_block() instead /// /// Returns a newly created block, signed by the proposer for the given slot. diff --git a/beacon_node/beacon_chain/tests/merge.rs b/beacon_node/beacon_chain/tests/merge.rs index ea4e7cb8816..80ffc57be11 100644 --- a/beacon_node/beacon_chain/tests/merge.rs +++ b/beacon_node/beacon_chain/tests/merge.rs @@ -53,6 +53,7 @@ async fn merge_with_terminal_block_hash_override() { let harness = BeaconChainHarness::builder(E::default()) .spec(spec) + .logger(logging::test_logger()) .deterministic_keypairs(VALIDATOR_COUNT) .fresh_ephemeral_store() .mock_execution_layer() @@ -109,6 +110,7 @@ async fn base_altair_merge_with_terminal_block_after_fork() { let harness = BeaconChainHarness::builder(E::default()) .spec(spec) + .logger(logging::test_logger()) .deterministic_keypairs(VALIDATOR_COUNT) .fresh_ephemeral_store() .mock_execution_layer() diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 35065a44225..2d8427e30e0 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -7,8 +7,9 @@ use beacon_chain::otb_verification_service::{ use beacon_chain::{ canonical_head::{CachedHead, CanonicalHead}, test_utils::{BeaconChainHarness, EphemeralHarnessType}, - BeaconChainError, BlockError, ExecutionPayloadError, NotifyExecutionLayer, StateSkipConfig, - WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, + BeaconChainError, BlockError, ExecutionPayloadError, NotifyExecutionLayer, + OverrideForkchoiceUpdate, StateSkipConfig, WhenSlotSkipped, + INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, }; use execution_layer::{ @@ -19,6 +20,7 @@ use execution_layer::{ use fork_choice::{ CountUnrealized, Error as ForkChoiceError, InvalidationOperation, PayloadVerificationStatus, }; +use logging::test_logger; use proto_array::{Error as ProtoArrayError, ExecutionStatus}; use slot_clock::SlotClock; use std::collections::HashMap; @@ -59,6 +61,7 @@ impl InvalidPayloadRig { let harness = BeaconChainHarness::builder(MainnetEthSpec) .spec(spec) + .logger(test_logger()) .deterministic_keypairs(VALIDATOR_COUNT) .mock_execution_layer() .fresh_ephemeral_store() @@ -386,7 +389,7 @@ impl InvalidPayloadRig { .fork_choice_write_lock() .get_head(self.harness.chain.slot().unwrap(), &self.harness.chain.spec) { - Err(ForkChoiceError::ProtoArrayError(e)) if e.contains(s) => (), + Err(ForkChoiceError::ProtoArrayStringError(e)) if e.contains(s) => (), other => panic!("expected {} error, got {:?}", s, other), }; } @@ -981,6 +984,10 @@ async fn payload_preparation() { ) .await; + rig.harness.advance_to_slot_lookahead( + next_slot, + rig.harness.chain.config.prepare_payload_lookahead, + ); rig.harness .chain .prepare_beacon_proposer(rig.harness.chain.slot().unwrap()) @@ -1059,7 +1066,7 @@ async fn invalid_parent() { &rig.harness.chain.spec, CountUnrealized::True, ), - Err(ForkChoiceError::ProtoArrayError(message)) + Err(ForkChoiceError::ProtoArrayStringError(message)) if message.contains(&format!( "{:?}", ProtoArrayError::ParentExecutionStatusIsInvalid { @@ -1126,7 +1133,11 @@ async fn payload_preparation_before_transition_block() { .get_forkchoice_update_parameters(); rig.harness .chain - .update_execution_engine_forkchoice(current_slot, forkchoice_update_params) + .update_execution_engine_forkchoice( + current_slot, + forkchoice_update_params, + OverrideForkchoiceUpdate::Yes, + ) .await .unwrap(); diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index edd2815f847..d82ef32781e 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -775,7 +775,11 @@ where runtime_context.executor.spawn( async move { let result = inner_chain - .update_execution_engine_forkchoice(current_slot, params) + .update_execution_engine_forkchoice( + current_slot, + params, + Default::default(), + ) .await; // No need to exit early if setting the head fails. It will be set again if/when the diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 518c0355afd..9a83ba149c0 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -573,6 +573,16 @@ impl ExecutionLayer { .contains_key(&proposer_index) } + /// Check if a proposer is registered as a local validator, *from a synchronous context*. + /// + /// This method MUST NOT be called from an async task. + pub fn has_proposer_preparation_data_blocking(&self, proposer_index: u64) -> bool { + self.inner + .proposer_preparation_data + .blocking_lock() + .contains_key(&proposer_index) + } + /// Returns the fee-recipient address that should be used to build a block pub async fn get_suggested_fee_recipient(&self, proposer_index: u64) -> Address { if let Some(preparation_data_entry) = @@ -1113,6 +1123,8 @@ impl ExecutionLayer { "finalized_block_hash" => ?finalized_block_hash, "justified_block_hash" => ?justified_block_hash, "head_block_hash" => ?head_block_hash, + "head_block_root" => ?head_block_root, + "current_slot" => current_slot, ); let next_slot = current_slot + 1; @@ -1614,6 +1626,20 @@ mod test { .await; } + #[tokio::test] + async fn test_forked_terminal_block() { + let runtime = TestRuntime::default(); + let (mock, block_hash) = MockExecutionLayer::default_params(runtime.task_executor.clone()) + .move_to_terminal_block() + .produce_forked_pow_block(); + assert!(mock + .el + .is_valid_terminal_pow_block_hash(block_hash, &mock.spec) + .await + .unwrap() + .unwrap()); + } + #[tokio::test] async fn finds_valid_terminal_block_hash() { let runtime = TestRuntime::default(); diff --git a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs index a7ec429e456..b7206cbf878 100644 --- a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs +++ b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs @@ -92,13 +92,15 @@ pub struct PoWBlock { pub timestamp: u64, } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct ExecutionBlockGenerator { /* * Common database */ + head_block: Option>, + finalized_block_hash: Option, blocks: HashMap>, - block_hashes: HashMap, + block_hashes: HashMap>, /* * PoW block parameters */ @@ -120,6 +122,8 @@ impl ExecutionBlockGenerator { terminal_block_hash: ExecutionBlockHash, ) -> Self { let mut gen = Self { + head_block: <_>::default(), + finalized_block_hash: <_>::default(), blocks: <_>::default(), block_hashes: <_>::default(), terminal_total_difficulty, @@ -136,13 +140,7 @@ impl ExecutionBlockGenerator { } pub fn latest_block(&self) -> Option> { - let hash = *self - .block_hashes - .iter() - .max_by_key(|(number, _)| *number) - .map(|(_, hash)| hash)?; - - self.block_by_hash(hash) + self.head_block.clone() } pub fn latest_execution_block(&self) -> Option { @@ -151,8 +149,18 @@ impl ExecutionBlockGenerator { } pub fn block_by_number(&self, number: u64) -> Option> { - let hash = *self.block_hashes.get(&number)?; - self.block_by_hash(hash) + // Get the latest canonical head block + let mut latest_block = self.latest_block()?; + loop { + let block_number = latest_block.block_number(); + if block_number < number { + return None; + } + if block_number == number { + return Some(latest_block); + } + latest_block = self.block_by_hash(latest_block.parent_hash())?; + } } pub fn execution_block_by_number(&self, number: u64) -> Option { @@ -213,10 +221,16 @@ impl ExecutionBlockGenerator { } pub fn insert_pow_block(&mut self, block_number: u64) -> Result<(), String> { + if let Some(finalized_block_hash) = self.finalized_block_hash { + return Err(format!( + "terminal block {} has been finalized. PoW chain has stopped building", + finalized_block_hash + )); + } let parent_hash = if block_number == 0 { ExecutionBlockHash::zero() - } else if let Some(hash) = self.block_hashes.get(&(block_number - 1)) { - *hash + } else if let Some(block) = self.block_by_number(block_number - 1) { + block.block_hash() } else { return Err(format!( "parent with block number {} not found", @@ -231,42 +245,102 @@ impl ExecutionBlockGenerator { parent_hash, )?; - self.insert_block(Block::PoW(block)) + // Insert block into block tree + self.insert_block(Block::PoW(block))?; + + // Set head + if let Some(head_total_difficulty) = + self.head_block.as_ref().and_then(|b| b.total_difficulty()) + { + if block.total_difficulty >= head_total_difficulty { + self.head_block = Some(Block::PoW(block)); + } + } else { + self.head_block = Some(Block::PoW(block)); + } + Ok(()) } - pub fn insert_block(&mut self, block: Block) -> Result<(), String> { + /// Insert a PoW block given the parent hash. + /// + /// Returns `Ok(hash)` of the inserted block. + /// Returns an error if the `parent_hash` does not exist in the block tree or + /// if the parent block is the terminal block. + pub fn insert_pow_block_by_hash( + &mut self, + parent_hash: ExecutionBlockHash, + unique_id: u64, + ) -> Result { + let parent_block = self.block_by_hash(parent_hash).ok_or_else(|| { + format!( + "Block corresponding to parent hash does not exist: {}", + parent_hash + ) + })?; + + let mut block = generate_pow_block( + self.terminal_total_difficulty, + self.terminal_block_number, + parent_block.block_number() + 1, + parent_hash, + )?; + + // Hack the block hash to make this block distinct from any other block with a different + // `unique_id` (the default is 0). + block.block_hash = ExecutionBlockHash::from_root(Hash256::from_low_u64_be(unique_id)); + block.block_hash = ExecutionBlockHash::from_root(block.tree_hash_root()); + + let hash = self.insert_block(Block::PoW(block))?; + + // Set head + if let Some(head_total_difficulty) = + self.head_block.as_ref().and_then(|b| b.total_difficulty()) + { + if block.total_difficulty >= head_total_difficulty { + self.head_block = Some(Block::PoW(block)); + } + } else { + self.head_block = Some(Block::PoW(block)); + } + Ok(hash) + } + + pub fn insert_block(&mut self, block: Block) -> Result { if self.blocks.contains_key(&block.block_hash()) { return Err(format!("{:?} is already known", block.block_hash())); - } else if self.block_hashes.contains_key(&block.block_number()) { - return Err(format!( - "block {} is already known, forking is not supported", - block.block_number() - )); - } else if block.block_number() != 0 && !self.blocks.contains_key(&block.parent_hash()) { + } else if block.parent_hash() != ExecutionBlockHash::zero() + && !self.blocks.contains_key(&block.parent_hash()) + { return Err(format!("parent block {:?} is unknown", block.parent_hash())); } - self.insert_block_without_checks(block) + Ok(self.insert_block_without_checks(block)) } - pub fn insert_block_without_checks(&mut self, block: Block) -> Result<(), String> { + pub fn insert_block_without_checks(&mut self, block: Block) -> ExecutionBlockHash { + let block_hash = block.block_hash(); self.block_hashes - .insert(block.block_number(), block.block_hash()); - self.blocks.insert(block.block_hash(), block); + .entry(block.block_number()) + .or_insert_with(Vec::new) + .push(block_hash); + self.blocks.insert(block_hash, block); - Ok(()) + block_hash } pub fn modify_last_block(&mut self, block_modifier: impl FnOnce(&mut Block)) { - if let Some((last_block_hash, block_number)) = - self.block_hashes.keys().max().and_then(|block_number| { - self.block_hashes - .get(block_number) - .map(|block| (block, *block_number)) + if let Some(last_block_hash) = self + .block_hashes + .iter_mut() + .max_by_key(|(block_number, _)| *block_number) + .and_then(|(_, block_hashes)| { + // Remove block hash, we will re-insert with the new block hash after modifying it. + block_hashes.pop() }) { - let mut block = self.blocks.remove(last_block_hash).unwrap(); + let mut block = self.blocks.remove(&last_block_hash).unwrap(); block_modifier(&mut block); + // Update the block hash after modifying the block match &mut block { Block::PoW(b) => b.block_hash = ExecutionBlockHash::from_root(b.tree_hash_root()), @@ -274,8 +348,17 @@ impl ExecutionBlockGenerator { *b.block_hash_mut() = ExecutionBlockHash::from_root(b.tree_hash_root()) } } - self.block_hashes.insert(block_number, block.block_hash()); - self.blocks.insert(block.block_hash(), block); + + // Update head. + if self + .head_block + .as_ref() + .map_or(true, |head| head.block_hash() == last_block_hash) + { + self.head_block = Some(block.clone()); + } + + self.insert_block_without_checks(block); } } @@ -415,6 +498,17 @@ impl ExecutionBlockGenerator { } }; + self.head_block = Some( + self.blocks + .get(&forkchoice_state.head_block_hash) + .unwrap() + .clone(), + ); + + if forkchoice_state.finalized_block_hash != ExecutionBlockHash::zero() { + self.finalized_block_hash = Some(forkchoice_state.finalized_block_hash); + } + Ok(JsonForkchoiceUpdatedV1Response { payload_status: JsonPayloadStatusV1 { status: JsonPayloadStatusV1Status::Valid, diff --git a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs index bd02cc2f872..9c3b32b1046 100644 --- a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs +++ b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs @@ -124,6 +124,14 @@ pub async fn handle_rpc( let forkchoice_state: JsonForkchoiceStateV1 = get_param(params, 0)?; let payload_attributes: Option = get_param(params, 1)?; + if let Some(hook_response) = ctx + .hook + .lock() + .on_forkchoice_updated(forkchoice_state.clone(), payload_attributes.clone()) + { + return Ok(serde_json::to_value(hook_response).unwrap()); + } + let head_block_hash = forkchoice_state.head_block_hash; // Canned responses set by block hash take priority. diff --git a/beacon_node/execution_layer/src/test_utils/hook.rs b/beacon_node/execution_layer/src/test_utils/hook.rs new file mode 100644 index 00000000000..a3748103e3e --- /dev/null +++ b/beacon_node/execution_layer/src/test_utils/hook.rs @@ -0,0 +1,27 @@ +use crate::json_structures::*; + +type ForkChoiceUpdatedHook = dyn Fn( + JsonForkChoiceStateV1, + Option, + ) -> Option + + Send + + Sync; + +#[derive(Default)] +pub struct Hook { + forkchoice_updated: Option>, +} + +impl Hook { + pub fn on_forkchoice_updated( + &self, + state: JsonForkChoiceStateV1, + payload_attributes: Option, + ) -> Option { + (self.forkchoice_updated.as_ref()?)(state, payload_attributes) + } + + pub fn set_forkchoice_updated_hook(&mut self, f: Box) { + self.forkchoice_updated = Some(f); + } +} diff --git a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs index ddddc2c9cea..f0f84491258 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs @@ -244,6 +244,21 @@ impl MockExecutionLayer { self } + pub fn produce_forked_pow_block(self) -> (Self, ExecutionBlockHash) { + let head_block = self + .server + .execution_block_generator() + .latest_block() + .unwrap(); + + let block_hash = self + .server + .execution_block_generator() + .insert_pow_block_by_hash(head_block.parent_hash(), 1) + .unwrap(); + (self, block_hash) + } + pub async fn with_terminal_block<'a, U, V>(self, func: U) -> Self where U: Fn(ChainSpec, ExecutionLayer, Option) -> V, diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index f5066879a78..f18ecbe6226 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -23,6 +23,7 @@ use types::{EthSpec, ExecutionBlockHash, Uint256}; use warp::{http::StatusCode, Filter, Rejection}; pub use execution_block_generator::{generate_pow_block, Block, ExecutionBlockGenerator}; +pub use hook::Hook; pub use mock_builder::{Context as MockBuilderContext, MockBuilder, Operation, TestingBuilder}; pub use mock_execution_layer::MockExecutionLayer; @@ -33,6 +34,7 @@ pub const DEFAULT_BUILDER_THRESHOLD_WEI: u128 = 1_000_000_000_000_000_000; mod execution_block_generator; mod handle_rpc; +mod hook; mod mock_builder; mod mock_execution_layer; @@ -99,6 +101,7 @@ impl MockServer { static_new_payload_response: <_>::default(), static_forkchoice_updated_response: <_>::default(), static_get_block_by_hash_response: <_>::default(), + hook: <_>::default(), new_payload_statuses: <_>::default(), fcu_payload_statuses: <_>::default(), _phantom: PhantomData, @@ -359,8 +362,7 @@ impl MockServer { .write() // The EF tests supply blocks out of order, so we must import them "without checks" and // trust they form valid chains. - .insert_block_without_checks(block) - .unwrap() + .insert_block_without_checks(block); } pub fn get_block(&self, block_hash: ExecutionBlockHash) -> Option> { @@ -441,6 +443,7 @@ pub struct Context { pub static_new_payload_response: Arc>>, pub static_forkchoice_updated_response: Arc>>, pub static_get_block_by_hash_response: Arc>>>, + pub hook: Arc>, // Canned responses by block hash. // diff --git a/beacon_node/http_api/tests/common.rs b/beacon_node/http_api/tests/common.rs index ec1448df7b3..9d6ad4050bf 100644 --- a/beacon_node/http_api/tests/common.rs +++ b/beacon_node/http_api/tests/common.rs @@ -1,5 +1,5 @@ use beacon_chain::{ - test_utils::{BeaconChainHarness, EphemeralHarnessType}, + test_utils::{BeaconChainHarness, BoxedMutator, EphemeralHarnessType}, BeaconChain, BeaconChainTypes, }; use directory::DEFAULT_ROOT_DIR; @@ -12,6 +12,7 @@ use lighthouse_network::{ types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield, SyncState}, ConnectedPoint, Enr, NetworkGlobals, PeerId, PeerManager, }; +use logging::test_logger; use network::{NetworkReceivers, NetworkSenders}; use sensitive_url::SensitiveUrl; use slog::Logger; @@ -19,6 +20,7 @@ use std::future::Future; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use std::time::Duration; +use store::MemoryStore; use tokio::sync::oneshot; use types::{ChainSpec, EthSpec}; @@ -47,13 +49,30 @@ pub struct ApiServer> { pub external_peer_id: PeerId, } +type Mutator = BoxedMutator, MemoryStore>; + impl InteractiveTester { pub async fn new(spec: Option, validator_count: usize) -> Self { - let harness = BeaconChainHarness::builder(E::default()) + Self::new_with_mutator(spec, validator_count, None).await + } + + pub async fn new_with_mutator( + spec: Option, + validator_count: usize, + mutator: Option>, + ) -> Self { + let mut harness_builder = BeaconChainHarness::builder(E::default()) .spec_or_default(spec) .deterministic_keypairs(validator_count) - .fresh_ephemeral_store() - .build(); + .logger(test_logger()) + .mock_execution_layer() + .fresh_ephemeral_store(); + + if let Some(mutator) = mutator { + harness_builder = harness_builder.initial_mutator(mutator); + } + + let harness = harness_builder.build(); let ApiServer { server, diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index b3227d7723a..17a3624afed 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -1,9 +1,22 @@ //! Generic tests that make use of the (newer) `InteractiveApiTester` use crate::common::*; -use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy}; +use beacon_chain::{ + chain_config::ReOrgThreshold, + test_utils::{AttestationStrategy, BlockStrategy}, +}; use eth2::types::DepositContractData; +use execution_layer::{ForkChoiceState, PayloadAttributes}; +use parking_lot::Mutex; +use slot_clock::SlotClock; +use state_processing::state_advance::complete_state_advance; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; use tree_hash::TreeHash; -use types::{EthSpec, FullPayload, MainnetEthSpec, Slot}; +use types::{ + Address, Epoch, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, FullPayload, + MainnetEthSpec, ProposerPreparationData, Slot, +}; type E = MainnetEthSpec; @@ -33,6 +46,495 @@ async fn deposit_contract_custom_network() { assert_eq!(result, expected); } +/// Data structure for tracking fork choice updates received by the mock execution layer. +#[derive(Debug, Default)] +struct ForkChoiceUpdates { + updates: HashMap>, +} + +#[derive(Debug, Clone)] +struct ForkChoiceUpdateMetadata { + received_at: Duration, + state: ForkChoiceState, + payload_attributes: Option, +} + +impl ForkChoiceUpdates { + fn insert(&mut self, update: ForkChoiceUpdateMetadata) { + self.updates + .entry(update.state.head_block_hash) + .or_insert_with(Vec::new) + .push(update); + } + + fn contains_update_for(&self, block_hash: ExecutionBlockHash) -> bool { + self.updates.contains_key(&block_hash) + } + + /// Find the first fork choice update for `head_block_hash` with payload attributes for a + /// block proposal at `proposal_timestamp`. + fn first_update_with_payload_attributes( + &self, + head_block_hash: ExecutionBlockHash, + proposal_timestamp: u64, + ) -> Option { + self.updates + .get(&head_block_hash)? + .iter() + .find(|update| { + update + .payload_attributes + .as_ref() + .map_or(false, |payload_attributes| { + payload_attributes.timestamp == proposal_timestamp + }) + }) + .cloned() + } +} + +pub struct ReOrgTest { + head_slot: Slot, + /// Number of slots between parent block and canonical head. + parent_distance: u64, + /// Number of slots between head block and block proposal slot. + head_distance: u64, + re_org_threshold: u64, + max_epochs_since_finalization: u64, + percent_parent_votes: usize, + percent_empty_votes: usize, + percent_head_votes: usize, + should_re_org: bool, + misprediction: bool, +} + +impl Default for ReOrgTest { + /// Default config represents a regular easy re-org. + fn default() -> Self { + Self { + head_slot: Slot::new(30), + parent_distance: 1, + head_distance: 1, + re_org_threshold: 20, + max_epochs_since_finalization: 2, + percent_parent_votes: 100, + percent_empty_votes: 100, + percent_head_votes: 0, + should_re_org: true, + misprediction: false, + } + } +} + +// Test that the beacon node will try to perform proposer boost re-orgs on late blocks when +// configured. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_zero_weight() { + proposer_boost_re_org_test(ReOrgTest::default()).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_epoch_boundary() { + proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(31), + should_re_org: false, + ..Default::default() + }) + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_slot_after_epoch_boundary() { + proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(33), + ..Default::default() + }) + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_bad_ffg() { + proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(64 + 22), + should_re_org: false, + ..Default::default() + }) + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_no_finality() { + proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(96), + percent_parent_votes: 100, + percent_empty_votes: 0, + percent_head_votes: 100, + should_re_org: false, + ..Default::default() + }) + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_finality() { + proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(129), + ..Default::default() + }) + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_parent_distance() { + proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(30), + parent_distance: 2, + should_re_org: false, + ..Default::default() + }) + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_head_distance() { + proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(29), + head_distance: 2, + should_re_org: false, + ..Default::default() + }) + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_very_unhealthy() { + proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(31), + parent_distance: 2, + head_distance: 2, + percent_parent_votes: 10, + percent_empty_votes: 10, + percent_head_votes: 10, + should_re_org: false, + ..Default::default() + }) + .await; +} + +/// The head block is late but still receives 30% of the committee vote, leading to a misprediction. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_weight_misprediction() { + proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(30), + percent_empty_votes: 70, + percent_head_votes: 30, + should_re_org: false, + misprediction: true, + ..Default::default() + }) + .await; +} + +/// Run a proposer boost re-org test. +/// +/// - `head_slot`: the slot of the canonical head to be reorged +/// - `reorg_threshold`: committee percentage value for reorging +/// - `num_empty_votes`: percentage of comm of attestations for the parent block +/// - `num_head_votes`: number of attestations for the head block +/// - `should_re_org`: whether the proposer should build on the parent rather than the head +pub async fn proposer_boost_re_org_test( + ReOrgTest { + head_slot, + parent_distance, + head_distance, + re_org_threshold, + max_epochs_since_finalization, + percent_parent_votes, + percent_empty_votes, + percent_head_votes, + should_re_org, + misprediction, + }: ReOrgTest, +) { + assert!(head_slot > 0); + + // We require a network with execution enabled so we can check EL message timings. + let mut spec = ForkName::Merge.make_genesis_spec(E::default_spec()); + spec.terminal_total_difficulty = 1.into(); + + // Ensure there are enough validators to have `attesters_per_slot`. + let attesters_per_slot = 10; + let validator_count = E::slots_per_epoch() as usize * attesters_per_slot; + let all_validators = (0..validator_count).collect::>(); + let num_initial = head_slot.as_u64().checked_sub(parent_distance + 1).unwrap(); + + // Check that the required vote percentages can be satisfied exactly using `attesters_per_slot`. + assert_eq!(100 % attesters_per_slot, 0); + let percent_per_attester = 100 / attesters_per_slot; + assert_eq!(percent_parent_votes % percent_per_attester, 0); + assert_eq!(percent_empty_votes % percent_per_attester, 0); + assert_eq!(percent_head_votes % percent_per_attester, 0); + let num_parent_votes = Some(attesters_per_slot * percent_parent_votes / 100); + let num_empty_votes = Some(attesters_per_slot * percent_empty_votes / 100); + let num_head_votes = Some(attesters_per_slot * percent_head_votes / 100); + + let tester = InteractiveTester::::new_with_mutator( + Some(spec), + validator_count, + Some(Box::new(move |builder| { + builder + .proposer_re_org_threshold(Some(ReOrgThreshold(re_org_threshold))) + .proposer_re_org_max_epochs_since_finalization(Epoch::new( + max_epochs_since_finalization, + )) + })), + ) + .await; + let harness = &tester.harness; + let mock_el = harness.mock_execution_layer.as_ref().unwrap(); + let execution_ctx = mock_el.server.ctx.clone(); + let slot_clock = &harness.chain.slot_clock; + + // Move to terminal block. + mock_el.server.all_payloads_valid(); + execution_ctx + .execution_block_generator + .write() + .move_to_terminal_block() + .unwrap(); + + // Send proposer preparation data for all validators. + let proposer_preparation_data = all_validators + .iter() + .map(|i| ProposerPreparationData { + validator_index: *i as u64, + fee_recipient: Address::from_low_u64_be(*i as u64), + }) + .collect::>(); + harness + .chain + .execution_layer + .as_ref() + .unwrap() + .update_proposer_preparation( + head_slot.epoch(E::slots_per_epoch()) + 1, + &proposer_preparation_data, + ) + .await; + + // Create some chain depth. + harness.advance_slot(); + harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Start collecting fork choice updates. + let forkchoice_updates = Arc::new(Mutex::new(ForkChoiceUpdates::default())); + let forkchoice_updates_inner = forkchoice_updates.clone(); + let chain_inner = harness.chain.clone(); + + execution_ctx + .hook + .lock() + .set_forkchoice_updated_hook(Box::new(move |state, payload_attributes| { + let received_at = chain_inner.slot_clock.now_duration().unwrap(); + let state = ForkChoiceState::from(state); + let payload_attributes = payload_attributes.map(Into::into); + let update = ForkChoiceUpdateMetadata { + received_at, + state, + payload_attributes, + }; + forkchoice_updates_inner.lock().insert(update); + None + })); + + // We set up the following block graph, where B is a block that arrives late and is re-orged + // by C. + // + // A | B | - | + // ^ | - | C | + + let slot_a = Slot::new(num_initial + 1); + let slot_b = slot_a + parent_distance; + let slot_c = slot_b + head_distance; + + harness.advance_slot(); + let (block_a_root, block_a, state_a) = harness + .add_block_at_slot(slot_a, harness.get_current_state()) + .await + .unwrap(); + + // Attest to block A during slot A. + let (block_a_parent_votes, _) = harness.make_attestations_with_limit( + &all_validators, + &state_a, + state_a.canonical_root(), + block_a_root, + slot_a, + num_parent_votes, + ); + harness.process_attestations(block_a_parent_votes); + + // Attest to block A during slot B. + for _ in 0..parent_distance { + harness.advance_slot(); + } + let (block_a_empty_votes, block_a_attesters) = harness.make_attestations_with_limit( + &all_validators, + &state_a, + state_a.canonical_root(), + block_a_root, + slot_b, + num_empty_votes, + ); + harness.process_attestations(block_a_empty_votes); + + let remaining_attesters = all_validators + .iter() + .copied() + .filter(|index| !block_a_attesters.contains(index)) + .collect::>(); + + // Produce block B and process it halfway through the slot. + let (block_b, mut state_b) = harness.make_block(state_a.clone(), slot_b).await; + let block_b_root = block_b.canonical_root(); + + let obs_time = slot_clock.start_of(slot_b).unwrap() + slot_clock.slot_duration() / 2; + slot_clock.set_current_time(obs_time); + harness.chain.block_times_cache.write().set_time_observed( + block_b_root, + slot_b, + obs_time, + None, + None, + ); + harness.process_block_result(block_b.clone()).await.unwrap(); + + // Add attestations to block B. + let (block_b_head_votes, _) = harness.make_attestations_with_limit( + &remaining_attesters, + &state_b, + state_b.canonical_root(), + block_b_root.into(), + slot_b, + num_head_votes, + ); + harness.process_attestations(block_b_head_votes); + + let payload_lookahead = harness.chain.config.prepare_payload_lookahead; + let fork_choice_lookahead = Duration::from_millis(500); + while harness.get_current_slot() != slot_c { + let current_slot = harness.get_current_slot(); + let next_slot = current_slot + 1; + + // Simulate the scheduled call to prepare proposers at 8 seconds into the slot. + harness.advance_to_slot_lookahead(next_slot, payload_lookahead); + harness + .chain + .prepare_beacon_proposer(current_slot) + .await + .unwrap(); + + // Simulate the scheduled call to fork choice + prepare proposers 500ms before the + // next slot. + harness.advance_to_slot_lookahead(next_slot, fork_choice_lookahead); + harness.chain.recompute_head_at_slot(next_slot).await; + harness + .chain + .prepare_beacon_proposer(current_slot) + .await + .unwrap(); + + harness.advance_slot(); + harness.chain.per_slot_task().await; + } + + // Produce block C. + // Advance state_b so we can get the proposer. + complete_state_advance(&mut state_b, None, slot_c, &harness.chain.spec).unwrap(); + + let proposer_index = state_b + .get_beacon_proposer_index(slot_c, &harness.chain.spec) + .unwrap(); + let randao_reveal = harness + .sign_randao_reveal(&state_b, proposer_index, slot_c) + .into(); + let unsigned_block_c = tester + .client + .get_validator_blocks(slot_c, &randao_reveal, None) + .await + .unwrap() + .data; + let block_c = harness.sign_beacon_block(unsigned_block_c, &state_b); + + if should_re_org { + // Block C should build on A. + assert_eq!(block_c.parent_root(), block_a_root.into()); + } else { + // Block C should build on B. + assert_eq!(block_c.parent_root(), block_b_root); + } + + // Applying block C should cause it to become head regardless (re-org or continuation). + let block_root_c = harness + .process_block_result(block_c.clone()) + .await + .unwrap() + .into(); + assert_eq!(harness.head_block_root(), block_root_c); + + // Check the fork choice updates that were sent. + let forkchoice_updates = forkchoice_updates.lock(); + let block_a_exec_hash = block_a.message().execution_payload().unwrap().block_hash(); + let block_b_exec_hash = block_b.message().execution_payload().unwrap().block_hash(); + + let block_c_timestamp = block_c.message().execution_payload().unwrap().timestamp(); + + // If we re-orged then no fork choice update for B should have been sent. + assert_eq!( + should_re_org, + !forkchoice_updates.contains_update_for(block_b_exec_hash), + "{block_b_exec_hash:?}" + ); + + // Check the timing of the first fork choice update with payload attributes for block C. + let c_parent_hash = if should_re_org { + block_a_exec_hash + } else { + block_b_exec_hash + }; + let first_update = forkchoice_updates + .first_update_with_payload_attributes(c_parent_hash, block_c_timestamp) + .unwrap(); + let payload_attribs = first_update.payload_attributes.as_ref().unwrap(); + + let lookahead = slot_clock + .start_of(slot_c) + .unwrap() + .checked_sub(first_update.received_at) + .unwrap(); + + if !misprediction { + assert_eq!( + lookahead, payload_lookahead, + "lookahead={lookahead:?}, timestamp={}, prev_randao={:?}", + payload_attribs.timestamp, payload_attribs.prev_randao, + ); + } else { + // On a misprediction we issue the first fcU 500ms before creating a block! + assert_eq!( + lookahead, fork_choice_lookahead, + "timestamp={}, prev_randao={:?}", + payload_attribs.timestamp, payload_attribs.prev_randao, + ); + } +} + // Test that running fork choice before proposing results in selection of the correct head. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn fork_choice_before_proposal() { diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index f1180e3f9db..129dc61d7ac 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -772,6 +772,38 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { experimental as it may obscure performance issues.") .takes_value(false) ) + .arg( + Arg::with_name("disable-proposer-reorgs") + .long("disable-proposer-reorgs") + .help("Do not attempt to reorg late blocks from other validators when proposing.") + .takes_value(false) + ) + .arg( + Arg::with_name("proposer-reorg-threshold") + .long("proposer-reorg-threshold") + .value_name("PERCENT") + .help("Percentage of vote weight below which to attempt a proposer reorg. \ + Default: 20%") + .conflicts_with("disable-proposer-reorgs") + ) + .arg( + Arg::with_name("proposer-reorg-epochs-since-finalization") + .long("proposer-reorg-epochs-since-finalization") + .value_name("EPOCHS") + .help("Maximum number of epochs since finalization at which proposer reorgs are \ + allowed. Default: 2") + .conflicts_with("disable-proposer-reorgs") + ) + .arg( + Arg::with_name("prepare-payload-lookahead") + .long("prepare-payload-lookahead") + .value_name("MILLISECONDS") + .help("The time before the start of a proposal slot at which payload attributes \ + should be sent. Low values are useful for execution nodes which don't \ + improve their payload after the first call, and high values are useful \ + for ensuring the EL is given ample notice. Default: 1/3 of a slot.") + .takes_value(true) + ) .arg( Arg::with_name("fork-choice-before-proposal-timeout") .long("fork-choice-before-proposal-timeout") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index ecf2879b954..f6bccede70e 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -1,3 +1,7 @@ +use beacon_chain::chain_config::{ + ReOrgThreshold, DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR, + DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, DEFAULT_RE_ORG_THRESHOLD, +}; use clap::ArgMatches; use clap_utils::flags::DISABLE_MALLOC_TUNING_FLAG; use client::{ClientConfig, ClientGenesis}; @@ -18,6 +22,7 @@ use std::net::Ipv6Addr; use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs}; use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::time::Duration; use types::{Checkpoint, Epoch, EthSpec, Hash256, PublicKeyBytes, GRAFFITI_BYTES_LEN}; use unused_port::{unused_tcp_port, unused_udp_port}; @@ -680,11 +685,32 @@ pub fn get_config( client_config.chain.enable_lock_timeouts = false; } + if cli_args.is_present("disable-proposer-reorgs") { + client_config.chain.re_org_threshold = None; + } else { + client_config.chain.re_org_threshold = Some( + clap_utils::parse_optional(cli_args, "proposer-reorg-threshold")? + .map(ReOrgThreshold) + .unwrap_or(DEFAULT_RE_ORG_THRESHOLD), + ); + client_config.chain.re_org_max_epochs_since_finalization = + clap_utils::parse_optional(cli_args, "proposer-reorg-epochs-since-finalization")? + .unwrap_or(DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION); + } + // Note: This overrides any previous flags that enable this option. if cli_args.is_present("disable-deposit-contract-sync") { client_config.sync_eth1_chain = false; } + client_config.chain.prepare_payload_lookahead = + clap_utils::parse_optional(cli_args, "prepare-payload-lookahead")? + .map(Duration::from_millis) + .unwrap_or_else(|| { + Duration::from_secs(spec.seconds_per_slot) + / DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR + }); + if let Some(timeout) = clap_utils::parse_optional(cli_args, "fork-choice-before-proposal-timeout")? { diff --git a/book/src/SUMMARY.md b/book/src/SUMMARY.md index a43fa10e649..470407ebee9 100644 --- a/book/src/SUMMARY.md +++ b/book/src/SUMMARY.md @@ -47,6 +47,7 @@ * [Release Candidates](./advanced-release-candidates.md) * [MEV and Lighthouse](./builders.md) * [Merge Migration](./merge-migration.md) + * [Late Block Re-orgs](./late-block-re-orgs.md) * [Contributing](./contributing.md) * [Development Environment](./setup.md) * [FAQs](./faq.md) diff --git a/book/src/builders.md b/book/src/builders.md index 99fae5b3e76..f2a4b3936a5 100644 --- a/book/src/builders.md +++ b/book/src/builders.md @@ -200,19 +200,23 @@ for `INFO` and `WARN` messages indicating why the builder was not used. Examples of messages indicating fallback to a locally produced block are: ``` -INFO No payload provided by connected builder. +INFO Builder did not return a payload ``` ``` -WARN Unable to retrieve a payload from a connected builder +WARN Builder error when requesting payload ``` ``` -INFO The value offered by the connected builder does not meet the configured profit threshold. +WARN Builder returned invalid payload ``` ``` -INFO Due to poor chain health the local execution engine will be used for payload construction. +INFO Builder payload ignored +``` + +``` +INFO Chain is unhealthy, using local payload ``` In case of fallback you should see a log indicating that the locally produced payload was diff --git a/book/src/late-block-re-orgs.md b/book/src/late-block-re-orgs.md new file mode 100644 index 00000000000..0014af8f152 --- /dev/null +++ b/book/src/late-block-re-orgs.md @@ -0,0 +1,60 @@ +# Late Block Re-orgs + +Since v3.4.0 Lighthouse will opportunistically re-org late blocks when proposing. + +This feature is intended to disincentivise late blocks and improve network health. Proposing a +re-orging block is also more profitable for the proposer because it increases the number of +attestations and transactions that can be included. + +## Command line flags + +There are three flags which control the re-orging behaviour: + +* `--disable-proposer-reorgs`: turn re-orging off (it's on by default). +* `--proposer-reorg-threshold N`: attempt to orphan blocks with less than N% of the committee vote. If this parameter isn't set then N defaults to 20% when the feature is enabled. +* `--proposer-reorg-epochs-since-finalization N`: only attempt to re-org late blocks when the number of epochs since finalization is less than or equal to N. The default is 2 epochs, + meaning re-orgs will only be attempted when the chain is finalizing optimally. + +All flags should be applied to `lighthouse bn`. The default configuration is recommended as it +balances the chance of the re-org succeeding against the chance of failure due to attestations +arriving late and making the re-org block non-viable. + +## Safeguards + +To prevent excessive re-orgs there are several safeguards in place that limit when a re-org +will be attempted. + +The full conditions are described in [the spec][] but the most important ones are: + +* Only single-slot re-orgs: Lighthouse will build a block at N + 1 to re-org N by building on the + parent N - 1. The result is a chain with exactly one skipped slot. +* No epoch boundaries: to ensure that the selected proposer does not change, Lighthouse will + not propose a re-orging block in the 0th slot of an epoch. + +## Logs + +You can track the reasons for re-orgs being attempted (or not) via Lighthouse's logs. + +A pair of messages at `INFO` level will be logged if a re-org opportunity is detected: + +> INFO Attempting re-org due to weak head threshold_weight: 45455983852725, head_weight: 0, parent: 0x09d953b69041f280758400c671130d174113bbf57c2d26553a77fb514cad4890, weak_head: 0xf64f8e5ed617dc18c1e759dab5d008369767c3678416dac2fe1d389562842b49 + +> INFO Proposing block to re-org current head head_to_reorg: 0xf64f…2b49, slot: 1105320 + +This should be followed shortly after by a `WARN` log indicating that a re-org occurred. This is +expected and normal: + +> WARN Beacon chain re-org reorg_distance: 1, new_slot: 1105320, new_head: 0x72791549e4ca792f91053bc7cf1e55c6fbe745f78ce7a16fc3acb6f09161becd, previous_slot: 1105319, previous_head: 0xf64f8e5ed617dc18c1e759dab5d008369767c3678416dac2fe1d389562842b49 + +In case a re-org is not viable (which should be most of the time), Lighthouse will just propose a +block as normal and log the reason the re-org was not attempted at debug level: + +> DEBG Not attempting re-org reason: head not late + +If you are interested in digging into the timing of `forkchoiceUpdated` messages sent to the +execution layer, there is also a debug log for the suppression of `forkchoiceUpdated` messages +when Lighthouse thinks that a re-org is likely: + +> DEBG Fork choice update overridden slot: 1105320, override: 0x09d953b69041f280758400c671130d174113bbf57c2d26553a77fb514cad4890, canonical_head: 0xf64f8e5ed617dc18c1e759dab5d008369767c3678416dac2fe1d389562842b49 + +[the spec]: https://github.com/ethereum/consensus-specs/pull/3034 diff --git a/common/task_executor/src/test_utils.rs b/common/task_executor/src/test_utils.rs index 7d59cdf022c..c6e5ad01e68 100644 --- a/common/task_executor/src/test_utils.rs +++ b/common/task_executor/src/test_utils.rs @@ -60,6 +60,13 @@ impl Drop for TestRuntime { } } +impl TestRuntime { + pub fn set_logger(&mut self, log: Logger) { + self.log = log.clone(); + self.task_executor.log = log; + } +} + pub fn null_logger() -> Result { let log_builder = NullLoggerBuilder; log_builder diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 5f4c9931f0e..f795b07907a 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -1,6 +1,7 @@ use crate::{ForkChoiceStore, InvalidationOperation}; use proto_array::{ - Block as ProtoBlock, CountUnrealizedFull, ExecutionStatus, ProtoArrayForkChoice, + Block as ProtoBlock, CountUnrealizedFull, ExecutionStatus, ProposerHeadError, ProposerHeadInfo, + ProtoArrayForkChoice, ReOrgThreshold, }; use slog::{crit, debug, warn, Logger}; use ssz_derive::{Decode, Encode}; @@ -23,7 +24,8 @@ pub enum Error { InvalidAttestation(InvalidAttestation), InvalidAttesterSlashing(AttesterSlashingValidationError), InvalidBlock(InvalidBlock), - ProtoArrayError(String), + ProtoArrayStringError(String), + ProtoArrayError(proto_array::Error), InvalidProtoArrayBytes(String), InvalidLegacyProtoArrayBytes(String), FailedToProcessInvalidExecutionPayload(String), @@ -45,6 +47,7 @@ pub enum Error { ForkChoiceStoreError(T), UnableToSetJustifiedCheckpoint(T), AfterBlockFailed(T), + ProposerHeadError(T), InvalidAnchor { block_slot: Slot, state_slot: Slot, @@ -60,6 +63,13 @@ pub enum Error { MissingFinalizedBlock { finalized_checkpoint: Checkpoint, }, + WrongSlotForGetProposerHead { + current_slot: Slot, + fc_store_slot: Slot, + }, + ProposerBoostNotExpiredForGetProposerHead { + proposer_boost_root: Hash256, + }, UnrealizedVoteProcessing(state_processing::EpochProcessingError), ParticipationCacheBuild(BeaconStateError), ValidatorStatuses(BeaconStateError), @@ -154,6 +164,12 @@ pub enum InvalidAttestation { impl From for Error { fn from(e: String) -> Self { + Error::ProtoArrayStringError(e) + } +} + +impl From for Error { + fn from(e: proto_array::Error) -> Self { Error::ProtoArrayError(e) } } @@ -555,6 +571,69 @@ where Ok(head_root) } + /// Get the block to build on as proposer, taking into account proposer re-orgs. + /// + /// You *must* call `get_head` for the proposal slot prior to calling this function and pass + /// in the result of `get_head` as `canonical_head`. + pub fn get_proposer_head( + &self, + current_slot: Slot, + canonical_head: Hash256, + re_org_threshold: ReOrgThreshold, + max_epochs_since_finalization: Epoch, + ) -> Result>> { + // Ensure that fork choice has already been updated for the current slot. This prevents + // us from having to take a write lock or do any dequeueing of attestations in this + // function. + let fc_store_slot = self.fc_store.get_current_slot(); + if current_slot != fc_store_slot { + return Err(ProposerHeadError::Error( + Error::WrongSlotForGetProposerHead { + current_slot, + fc_store_slot, + }, + )); + } + + // Similarly, the proposer boost for the previous head should already have expired. + let proposer_boost_root = self.fc_store.proposer_boost_root(); + if !proposer_boost_root.is_zero() { + return Err(ProposerHeadError::Error( + Error::ProposerBoostNotExpiredForGetProposerHead { + proposer_boost_root, + }, + )); + } + + self.proto_array + .get_proposer_head::( + current_slot, + canonical_head, + self.fc_store.justified_balances(), + re_org_threshold, + max_epochs_since_finalization, + ) + .map_err(ProposerHeadError::convert_inner_error) + } + + pub fn get_preliminary_proposer_head( + &self, + canonical_head: Hash256, + re_org_threshold: ReOrgThreshold, + max_epochs_since_finalization: Epoch, + ) -> Result>> { + let current_slot = self.fc_store.get_current_slot(); + self.proto_array + .get_proposer_head_info::( + current_slot, + canonical_head, + self.fc_store.justified_balances(), + re_org_threshold, + max_epochs_since_finalization, + ) + .map_err(ProposerHeadError::convert_inner_error) + } + /// Return information about: /// /// - The LMD head of the chain. diff --git a/consensus/fork_choice/src/fork_choice_store.rs b/consensus/fork_choice/src/fork_choice_store.rs index a9a32173b6a..9500b1c7da8 100644 --- a/consensus/fork_choice/src/fork_choice_store.rs +++ b/consensus/fork_choice/src/fork_choice_store.rs @@ -1,3 +1,4 @@ +use proto_array::JustifiedBalances; use std::collections::BTreeSet; use std::fmt::Debug; use types::{AbstractExecPayload, BeaconBlockRef, BeaconState, Checkpoint, EthSpec, Hash256, Slot}; @@ -44,7 +45,7 @@ pub trait ForkChoiceStore: Sized { fn justified_checkpoint(&self) -> &Checkpoint; /// Returns balances from the `state` identified by `justified_checkpoint.root`. - fn justified_balances(&self) -> &[u64]; + fn justified_balances(&self) -> &JustifiedBalances; /// Returns the `best_justified_checkpoint`. fn best_justified_checkpoint(&self) -> &Checkpoint; diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 850f7c4a120..00bd1f763dc 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -378,9 +378,13 @@ impl ForkChoiceTest { assert_eq!( &balances[..], - fc.fc_store().justified_balances(), + &fc.fc_store().justified_balances().effective_balances, "balances should match" - ) + ); + assert_eq!( + balances.iter().sum::(), + fc.fc_store().justified_balances().total_effective_balance + ); } /// Returns an attestation that is valid for some slot in the given `chain`. diff --git a/consensus/proto_array/Cargo.toml b/consensus/proto_array/Cargo.toml index 1c7b19bf1da..205ef8f5210 100644 --- a/consensus/proto_array/Cargo.toml +++ b/consensus/proto_array/Cargo.toml @@ -15,3 +15,4 @@ eth2_ssz_derive = "0.3.1" serde = "1.0.116" serde_derive = "1.0.116" serde_yaml = "0.8.13" +safe_arith = { path = "../safe_arith" } diff --git a/consensus/proto_array/src/error.rs b/consensus/proto_array/src/error.rs index 826bf6c3a79..c55739da792 100644 --- a/consensus/proto_array/src/error.rs +++ b/consensus/proto_array/src/error.rs @@ -1,3 +1,4 @@ +use safe_arith::ArithError; use types::{Checkpoint, Epoch, ExecutionBlockHash, Hash256, Slot}; #[derive(Clone, PartialEq, Debug)] @@ -15,6 +16,7 @@ pub enum Error { InvalidNodeDelta(usize), DeltaOverflow(usize), ProposerBoostOverflow(usize), + ReOrgThresholdOverflow, IndexOverflow(&'static str), InvalidExecutionDeltaOverflow(usize), InvalidDeltaLen { @@ -48,6 +50,13 @@ pub enum Error { block_root: Hash256, parent_root: Hash256, }, + Arith(ArithError), +} + +impl From for Error { + fn from(e: ArithError) -> Self { + Error::Arith(e) + } } #[derive(Clone, PartialEq, Debug)] diff --git a/consensus/proto_array/src/fork_choice_test_definition.rs b/consensus/proto_array/src/fork_choice_test_definition.rs index ba6f3170dc1..035fb799eea 100644 --- a/consensus/proto_array/src/fork_choice_test_definition.rs +++ b/consensus/proto_array/src/fork_choice_test_definition.rs @@ -5,7 +5,7 @@ mod votes; use crate::proto_array::CountUnrealizedFull; use crate::proto_array_fork_choice::{Block, ExecutionStatus, ProtoArrayForkChoice}; -use crate::InvalidationOperation; +use crate::{InvalidationOperation, JustifiedBalances}; use serde_derive::{Deserialize, Serialize}; use std::collections::BTreeSet; use types::{ @@ -101,11 +101,14 @@ impl ForkChoiceTestDefinition { justified_state_balances, expected_head, } => { + let justified_balances = + JustifiedBalances::from_effective_balances(justified_state_balances) + .unwrap(); let head = fork_choice .find_head::( justified_checkpoint, finalized_checkpoint, - &justified_state_balances, + &justified_balances, Hash256::zero(), &equivocating_indices, Slot::new(0), @@ -129,11 +132,14 @@ impl ForkChoiceTestDefinition { expected_head, proposer_boost_root, } => { + let justified_balances = + JustifiedBalances::from_effective_balances(justified_state_balances) + .unwrap(); let head = fork_choice .find_head::( justified_checkpoint, finalized_checkpoint, - &justified_state_balances, + &justified_balances, proposer_boost_root, &equivocating_indices, Slot::new(0), @@ -155,10 +161,13 @@ impl ForkChoiceTestDefinition { finalized_checkpoint, justified_state_balances, } => { + let justified_balances = + JustifiedBalances::from_effective_balances(justified_state_balances) + .unwrap(); let result = fork_choice.find_head::( justified_checkpoint, finalized_checkpoint, - &justified_state_balances, + &justified_balances, Hash256::zero(), &equivocating_indices, Slot::new(0), diff --git a/consensus/proto_array/src/fork_choice_test_definition/execution_status.rs b/consensus/proto_array/src/fork_choice_test_definition/execution_status.rs index f1b0e512d7d..ede5bb39481 100644 --- a/consensus/proto_array/src/fork_choice_test_definition/execution_status.rs +++ b/consensus/proto_array/src/fork_choice_test_definition/execution_status.rs @@ -999,7 +999,7 @@ pub fn get_execution_status_test_definition_03() -> ForkChoiceTestDefinition { }); ops.push(Operation::AssertWeight { block_root: get_root(3), - // This is a "magic number" generated from `calculate_proposer_boost`. + // This is a "magic number" generated from `calculate_committee_fraction`. weight: 31_000, }); diff --git a/consensus/proto_array/src/justified_balances.rs b/consensus/proto_array/src/justified_balances.rs new file mode 100644 index 00000000000..75f6c2f7c80 --- /dev/null +++ b/consensus/proto_array/src/justified_balances.rs @@ -0,0 +1,62 @@ +use safe_arith::{ArithError, SafeArith}; +use types::{BeaconState, EthSpec}; + +#[derive(Debug, PartialEq, Clone, Default)] +pub struct JustifiedBalances { + /// The effective balances for every validator in a given justified state. + /// + /// Any validator who is not active in the epoch of the justified state is assigned a balance of + /// zero. + pub effective_balances: Vec, + /// The sum of `self.effective_balances`. + pub total_effective_balance: u64, + /// The number of active validators included in `self.effective_balances`. + pub num_active_validators: u64, +} + +impl JustifiedBalances { + pub fn from_justified_state(state: &BeaconState) -> Result { + let current_epoch = state.current_epoch(); + let mut total_effective_balance = 0u64; + let mut num_active_validators = 0u64; + + let effective_balances = state + .validators() + .iter() + .map(|validator| { + if validator.is_active_at(current_epoch) { + total_effective_balance.safe_add_assign(validator.effective_balance)?; + num_active_validators.safe_add_assign(1)?; + + Ok(validator.effective_balance) + } else { + Ok(0) + } + }) + .collect::, _>>()?; + + Ok(Self { + effective_balances, + total_effective_balance, + num_active_validators, + }) + } + + pub fn from_effective_balances(effective_balances: Vec) -> Result { + let mut total_effective_balance = 0; + let mut num_active_validators = 0; + + for &balance in &effective_balances { + if balance != 0 { + total_effective_balance.safe_add_assign(balance)?; + num_active_validators.safe_add_assign(1)?; + } + } + + Ok(Self { + effective_balances, + total_effective_balance, + num_active_validators, + }) + } +} diff --git a/consensus/proto_array/src/lib.rs b/consensus/proto_array/src/lib.rs index e7bd9c0ed56..f2b29e1c7b2 100644 --- a/consensus/proto_array/src/lib.rs +++ b/consensus/proto_array/src/lib.rs @@ -1,11 +1,18 @@ mod error; pub mod fork_choice_test_definition; +mod justified_balances; mod proto_array; mod proto_array_fork_choice; mod ssz_container; -pub use crate::proto_array::{CountUnrealizedFull, InvalidationOperation}; -pub use crate::proto_array_fork_choice::{Block, ExecutionStatus, ProtoArrayForkChoice}; +pub use crate::justified_balances::JustifiedBalances; +pub use crate::proto_array::{ + calculate_committee_fraction, CountUnrealizedFull, InvalidationOperation, +}; +pub use crate::proto_array_fork_choice::{ + Block, DoNotReOrg, ExecutionStatus, ProposerHeadError, ProposerHeadInfo, ProtoArrayForkChoice, + ReOrgThreshold, +}; pub use error::Error; pub mod core { diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs index 590407d7eb8..add84f54787 100644 --- a/consensus/proto_array/src/proto_array.rs +++ b/consensus/proto_array/src/proto_array.rs @@ -1,5 +1,5 @@ use crate::error::InvalidBestNodeInfo; -use crate::{error::Error, Block, ExecutionStatus}; +use crate::{error::Error, Block, ExecutionStatus, JustifiedBalances}; use serde_derive::{Deserialize, Serialize}; use ssz::four_byte_option_impl; use ssz::Encode; @@ -169,7 +169,7 @@ impl ProtoArray { mut deltas: Vec, justified_checkpoint: Checkpoint, finalized_checkpoint: Checkpoint, - new_balances: &[u64], + new_justified_balances: &JustifiedBalances, proposer_boost_root: Hash256, current_slot: Slot, spec: &ChainSpec, @@ -241,9 +241,11 @@ impl ProtoArray { // Invalid nodes (or their ancestors) should not receive a proposer boost. && !execution_status_is_invalid { - proposer_score = - calculate_proposer_boost::(new_balances, proposer_score_boost) - .ok_or(Error::ProposerBoostOverflow(node_index))?; + proposer_score = calculate_committee_fraction::( + new_justified_balances, + proposer_score_boost, + ) + .ok_or(Error::ProposerBoostOverflow(node_index))?; node_delta = node_delta .checked_add(proposer_score as i64) .ok_or(Error::DeltaOverflow(node_index))?; @@ -1006,32 +1008,19 @@ impl ProtoArray { } } -/// A helper method to calculate the proposer boost based on the given `validator_balances`. -/// This does *not* do any verification about whether a boost should or should not be applied. -/// The `validator_balances` array used here is assumed to be structured like the one stored in -/// the `BalancesCache`, where *effective* balances are stored and inactive balances are defaulted -/// to zero. -/// -/// Returns `None` if there is an overflow or underflow when calculating the score. +/// A helper method to calculate the proposer boost based on the given `justified_balances`. /// /// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/fork-choice.md#get_latest_attesting_balance -pub fn calculate_proposer_boost( - validator_balances: &[u64], +pub fn calculate_committee_fraction( + justified_balances: &JustifiedBalances, proposer_score_boost: u64, ) -> Option { - let mut total_balance: u64 = 0; - let mut num_validators: u64 = 0; - for &balance in validator_balances { - // We need to filter zero balances here to get an accurate active validator count. - // This is because we default inactive validator balances to zero when creating - // this balances array. - if balance != 0 { - total_balance = total_balance.checked_add(balance)?; - num_validators = num_validators.checked_add(1)?; - } - } - let average_balance = total_balance.checked_div(num_validators)?; - let committee_size = num_validators.checked_div(E::slots_per_epoch())?; + let average_balance = justified_balances + .total_effective_balance + .checked_div(justified_balances.num_active_validators)?; + let committee_size = justified_balances + .num_active_validators + .checked_div(E::slots_per_epoch())?; let committee_weight = committee_size.checked_mul(average_balance)?; committee_weight .checked_mul(proposer_score_boost)? diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index 8f5d062ec6a..cbd369ae6ec 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -1,9 +1,12 @@ -use crate::error::Error; -use crate::proto_array::CountUnrealizedFull; -use crate::proto_array::{ - calculate_proposer_boost, InvalidationOperation, Iter, ProposerBoost, ProtoArray, ProtoNode, +use crate::{ + error::Error, + proto_array::{ + calculate_committee_fraction, CountUnrealizedFull, InvalidationOperation, Iter, + ProposerBoost, ProtoArray, ProtoNode, + }, + ssz_container::SszContainer, + JustifiedBalances, }; -use crate::ssz_container::SszContainer; use serde_derive::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; @@ -170,11 +173,128 @@ where } } +/// Information about the proposer head used for opportunistic re-orgs. +#[derive(Clone)] +pub struct ProposerHeadInfo { + /// Information about the *current* head block, which may be re-orged. + pub head_node: ProtoNode, + /// Information about the parent of the current head, which should be selected as the parent + /// for a new proposal *if* a re-org is decided on. + pub parent_node: ProtoNode, + /// The computed fraction of the active committee balance below which we can re-org. + pub re_org_weight_threshold: u64, + /// The current slot from fork choice's point of view, may lead the wall-clock slot by upto + /// 500ms. + pub current_slot: Slot, +} + +/// Error type to enable short-circuiting checks in `get_proposer_head`. +/// +/// This type intentionally does not implement `Debug` so that callers are forced to handle the +/// enum. +#[derive(Clone, PartialEq)] +pub enum ProposerHeadError { + DoNotReOrg(DoNotReOrg), + Error(E), +} + +impl From for ProposerHeadError { + fn from(e: DoNotReOrg) -> ProposerHeadError { + Self::DoNotReOrg(e) + } +} + +impl From for ProposerHeadError { + fn from(e: Error) -> Self { + Self::Error(e) + } +} + +impl ProposerHeadError { + pub fn convert_inner_error(self) -> ProposerHeadError + where + E2: From, + { + self.map_inner_error(E2::from) + } + + pub fn map_inner_error(self, f: impl FnOnce(E1) -> E2) -> ProposerHeadError { + match self { + ProposerHeadError::DoNotReOrg(reason) => ProposerHeadError::DoNotReOrg(reason), + ProposerHeadError::Error(error) => ProposerHeadError::Error(f(error)), + } + } +} + +/// Reasons why a re-org should not be attempted. +/// +/// This type intentionally does not implement `Debug` so that the `Display` impl must be used. +#[derive(Clone, PartialEq)] +pub enum DoNotReOrg { + MissingHeadOrParentNode, + MissingHeadFinalizedCheckpoint, + ParentDistance, + HeadDistance, + ShufflingUnstable, + JustificationAndFinalizationNotCompetitive, + ChainNotFinalizing { + epochs_since_finalization: u64, + }, + HeadNotWeak { + head_weight: u64, + re_org_weight_threshold: u64, + }, + HeadNotLate, + NotProposing, + ReOrgsDisabled, +} + +impl std::fmt::Display for DoNotReOrg { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Self::MissingHeadOrParentNode => write!(f, "unknown head or parent"), + Self::MissingHeadFinalizedCheckpoint => write!(f, "finalized checkpoint missing"), + Self::ParentDistance => write!(f, "parent too far from head"), + Self::HeadDistance => write!(f, "head too far from current slot"), + Self::ShufflingUnstable => write!(f, "shuffling unstable at epoch boundary"), + Self::JustificationAndFinalizationNotCompetitive => { + write!(f, "justification or finalization not competitive") + } + Self::ChainNotFinalizing { + epochs_since_finalization, + } => write!( + f, + "chain not finalizing ({epochs_since_finalization} epochs since finalization)" + ), + Self::HeadNotWeak { + head_weight, + re_org_weight_threshold, + } => { + write!(f, "head not weak ({head_weight}/{re_org_weight_threshold})") + } + Self::HeadNotLate => { + write!(f, "head arrived on time") + } + Self::NotProposing => { + write!(f, "not proposing at next slot") + } + Self::ReOrgsDisabled => { + write!(f, "re-orgs disabled in config") + } + } + } +} + +/// New-type for the re-org threshold percentage. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(transparent)] +pub struct ReOrgThreshold(pub u64); + #[derive(PartialEq)] pub struct ProtoArrayForkChoice { pub(crate) proto_array: ProtoArray, pub(crate) votes: ElasticList, - pub(crate) balances: Vec, + pub(crate) balances: JustifiedBalances, } impl ProtoArrayForkChoice { @@ -223,7 +343,7 @@ impl ProtoArrayForkChoice { Ok(Self { proto_array, votes: ElasticList::default(), - balances: vec![], + balances: JustifiedBalances::default(), }) } @@ -282,21 +402,20 @@ impl ProtoArrayForkChoice { &mut self, justified_checkpoint: Checkpoint, finalized_checkpoint: Checkpoint, - justified_state_balances: &[u64], + justified_state_balances: &JustifiedBalances, proposer_boost_root: Hash256, equivocating_indices: &BTreeSet, current_slot: Slot, spec: &ChainSpec, ) -> Result { let old_balances = &mut self.balances; - let new_balances = justified_state_balances; let deltas = compute_deltas( &self.proto_array.indices, &mut self.votes, - old_balances, - new_balances, + &old_balances.effective_balances, + &new_balances.effective_balances, equivocating_indices, ) .map_err(|e| format!("find_head compute_deltas failed: {:?}", e))?; @@ -313,13 +432,129 @@ impl ProtoArrayForkChoice { ) .map_err(|e| format!("find_head apply_score_changes failed: {:?}", e))?; - *old_balances = new_balances.to_vec(); + *old_balances = new_balances.clone(); self.proto_array .find_head::(&justified_checkpoint.root, current_slot) .map_err(|e| format!("find_head failed: {:?}", e)) } + /// Get the block to propose on during `current_slot`. + /// + /// This function returns a *definitive* result which should be acted on. + pub fn get_proposer_head( + &self, + current_slot: Slot, + canonical_head: Hash256, + justified_balances: &JustifiedBalances, + re_org_threshold: ReOrgThreshold, + max_epochs_since_finalization: Epoch, + ) -> Result> { + let info = self.get_proposer_head_info::( + current_slot, + canonical_head, + justified_balances, + re_org_threshold, + max_epochs_since_finalization, + )?; + + // Only re-org a single slot. This prevents cascading failures during asynchrony. + let head_slot_ok = info.head_node.slot + 1 == current_slot; + if !head_slot_ok { + return Err(DoNotReOrg::HeadDistance.into()); + } + + // Only re-org if the head's weight is less than the configured committee fraction. + let head_weight = info.head_node.weight; + let re_org_weight_threshold = info.re_org_weight_threshold; + let weak_head = head_weight < re_org_weight_threshold; + if !weak_head { + return Err(DoNotReOrg::HeadNotWeak { + head_weight, + re_org_weight_threshold, + } + .into()); + } + + // All checks have passed, build upon the parent to re-org the head. + Ok(info) + } + + /// Get information about the block to propose on during `current_slot`. + /// + /// This function returns a *partial* result which must be processed further. + pub fn get_proposer_head_info( + &self, + current_slot: Slot, + canonical_head: Hash256, + justified_balances: &JustifiedBalances, + re_org_threshold: ReOrgThreshold, + max_epochs_since_finalization: Epoch, + ) -> Result> { + let mut nodes = self + .proto_array + .iter_nodes(&canonical_head) + .take(2) + .cloned() + .collect::>(); + + let parent_node = nodes.pop().ok_or(DoNotReOrg::MissingHeadOrParentNode)?; + let head_node = nodes.pop().ok_or(DoNotReOrg::MissingHeadOrParentNode)?; + + let parent_slot = parent_node.slot; + let head_slot = head_node.slot; + let re_org_block_slot = head_slot + 1; + + // Check finalization distance. + let proposal_epoch = re_org_block_slot.epoch(E::slots_per_epoch()); + let finalized_epoch = head_node + .unrealized_finalized_checkpoint + .ok_or(DoNotReOrg::MissingHeadFinalizedCheckpoint)? + .epoch; + let epochs_since_finalization = proposal_epoch.saturating_sub(finalized_epoch).as_u64(); + if epochs_since_finalization > max_epochs_since_finalization.as_u64() { + return Err(DoNotReOrg::ChainNotFinalizing { + epochs_since_finalization, + } + .into()); + } + + // Check parent distance from head. + // Do not check head distance from current slot, as that condition needs to be + // late-evaluated and is elided when `current_slot == head_slot`. + let parent_slot_ok = parent_slot + 1 == head_slot; + if !parent_slot_ok { + return Err(DoNotReOrg::ParentDistance.into()); + } + + // Check shuffling stability. + let shuffling_stable = re_org_block_slot % E::slots_per_epoch() != 0; + if !shuffling_stable { + return Err(DoNotReOrg::ShufflingUnstable.into()); + } + + // Check FFG. + let ffg_competitive = parent_node.unrealized_justified_checkpoint + == head_node.unrealized_justified_checkpoint + && parent_node.unrealized_finalized_checkpoint + == head_node.unrealized_finalized_checkpoint; + if !ffg_competitive { + return Err(DoNotReOrg::JustificationAndFinalizationNotCompetitive.into()); + } + + // Compute re-org weight threshold. + let re_org_weight_threshold = + calculate_committee_fraction::(justified_balances, re_org_threshold.0) + .ok_or(Error::ReOrgThresholdOverflow)?; + + Ok(ProposerHeadInfo { + head_node, + parent_node, + re_org_weight_threshold, + current_slot, + }) + } + /// Returns `true` if there are any blocks in `self` with an `INVALID` execution payload status. /// /// This will operate on *all* blocks, even those that do not descend from the finalized @@ -368,7 +603,7 @@ impl ProtoArrayForkChoice { if vote.current_root == node.root { // Any voting validator that does not have a balance should be // ignored. This is consistent with `compute_deltas`. - self.balances.get(validator_index) + self.balances.effective_balances.get(validator_index) } else { None } @@ -382,9 +617,11 @@ impl ProtoArrayForkChoice { // Compute the score based upon the current balances. We can't rely on // the `previous_proposr_boost.score` since it is set to zero with an // invalid node. - let proposer_score = - calculate_proposer_boost::(&self.balances, proposer_score_boost) - .ok_or("Failed to compute proposer boost")?; + let proposer_score = calculate_committee_fraction::( + &self.balances, + proposer_score_boost, + ) + .ok_or("Failed to compute proposer boost")?; // Store the score we've applied here so it can be removed in // a later call to `apply_score_changes`. self.proto_array.previous_proposer_boost.score = proposer_score; @@ -538,10 +775,11 @@ impl ProtoArrayForkChoice { bytes: &[u8], count_unrealized_full: CountUnrealizedFull, ) -> Result { - SszContainer::from_ssz_bytes(bytes) - .map(|container| (container, count_unrealized_full)) - .map(Into::into) - .map_err(|e| format!("Failed to decode ProtoArrayForkChoice: {:?}", e)) + let container = SszContainer::from_ssz_bytes(bytes) + .map_err(|e| format!("Failed to decode ProtoArrayForkChoice: {:?}", e))?; + (container, count_unrealized_full) + .try_into() + .map_err(|e| format!("Failed to initialize ProtoArrayForkChoice: {e:?}")) } /// Returns a read-lock to core `ProtoArray` struct. diff --git a/consensus/proto_array/src/ssz_container.rs b/consensus/proto_array/src/ssz_container.rs index 63f75ed0a2f..1a20ef967ad 100644 --- a/consensus/proto_array/src/ssz_container.rs +++ b/consensus/proto_array/src/ssz_container.rs @@ -2,10 +2,12 @@ use crate::proto_array::ProposerBoost; use crate::{ proto_array::{CountUnrealizedFull, ProtoArray, ProtoNode}, proto_array_fork_choice::{ElasticList, ProtoArrayForkChoice, VoteTracker}, + Error, JustifiedBalances, }; use ssz::{four_byte_option_impl, Encode}; use ssz_derive::{Decode, Encode}; use std::collections::HashMap; +use std::convert::TryFrom; use types::{Checkpoint, Hash256}; // Define a "legacy" implementation of `Option` which uses four bytes for encoding the union @@ -30,7 +32,7 @@ impl From<&ProtoArrayForkChoice> for SszContainer { Self { votes: from.votes.0.clone(), - balances: from.balances.clone(), + balances: from.balances.effective_balances.clone(), prune_threshold: proto_array.prune_threshold, justified_checkpoint: proto_array.justified_checkpoint, finalized_checkpoint: proto_array.finalized_checkpoint, @@ -41,8 +43,12 @@ impl From<&ProtoArrayForkChoice> for SszContainer { } } -impl From<(SszContainer, CountUnrealizedFull)> for ProtoArrayForkChoice { - fn from((from, count_unrealized_full): (SszContainer, CountUnrealizedFull)) -> Self { +impl TryFrom<(SszContainer, CountUnrealizedFull)> for ProtoArrayForkChoice { + type Error = Error; + + fn try_from( + (from, count_unrealized_full): (SszContainer, CountUnrealizedFull), + ) -> Result { let proto_array = ProtoArray { prune_threshold: from.prune_threshold, justified_checkpoint: from.justified_checkpoint, @@ -53,10 +59,10 @@ impl From<(SszContainer, CountUnrealizedFull)> for ProtoArrayForkChoice { count_unrealized_full, }; - Self { + Ok(Self { proto_array, votes: ElasticList(from.votes), - balances: from.balances, - } + balances: JustifiedBalances::from_effective_balances(from.balances)?, + }) } } diff --git a/lcli/src/main.rs b/lcli/src/main.rs index 8d4fdfb1836..679203d9e7d 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -624,6 +624,14 @@ fn main() { .takes_value(true) .help("The genesis time when generating a genesis state."), ) + .arg( + Arg::with_name("proposer-score-boost") + .long("proposer-score-boost") + .value_name("INTEGER") + .takes_value(true) + .help("The proposer score boost to apply as a percentage, e.g. 70 = 70%"), + ) + ) .subcommand( SubCommand::with_name("check-deposit-data") diff --git a/lcli/src/new_testnet.rs b/lcli/src/new_testnet.rs index 415ac9bb6f6..d6e093c1733 100644 --- a/lcli/src/new_testnet.rs +++ b/lcli/src/new_testnet.rs @@ -67,6 +67,10 @@ pub fn run(testnet_dir_path: PathBuf, matches: &ArgMatches) -> Resul spec.genesis_fork_version = v; } + if let Some(proposer_score_boost) = parse_optional(matches, "proposer-score-boost")? { + spec.proposer_score_boost = Some(proposer_score_boost); + } + if let Some(fork_epoch) = parse_optional(matches, "altair-fork-epoch")? { spec.altair_fork_epoch = Some(fork_epoch); } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index d39235cb136..07c583da5cb 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1,6 +1,9 @@ use beacon_node::{beacon_chain::CountUnrealizedFull, ClientConfig as Config}; use crate::exec::{CommandLineTestExec, CompletedTest}; +use beacon_node::beacon_chain::chain_config::{ + DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, DEFAULT_RE_ORG_THRESHOLD, +}; use eth1::Eth1Endpoint; use lighthouse_network::PeerId; use std::fs::File; @@ -10,6 +13,7 @@ use std::path::PathBuf; use std::process::Command; use std::str::FromStr; use std::string::ToString; +use std::time::Duration; use tempfile::TempDir; use types::{Address, Checkpoint, Epoch, ExecutionBlockHash, ForkName, Hash256, MainnetEthSpec}; use unused_port::{unused_tcp_port, unused_udp_port}; @@ -153,6 +157,31 @@ fn checkpoint_sync_url_timeout_default() { }); } +#[test] +fn prepare_payload_lookahead_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.chain.prepare_payload_lookahead, + Duration::from_secs(4), + ) + }); +} + +#[test] +fn prepare_payload_lookahead_shorter() { + CommandLineTest::new() + .flag("prepare-payload-lookahead", Some("1500")) + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.chain.prepare_payload_lookahead, + Duration::from_millis(1500) + ) + }); +} + #[test] fn paranoid_block_proposal_default() { CommandLineTest::new() @@ -1500,6 +1529,49 @@ fn ensure_panic_on_failed_launch() { }); } +#[test] +fn enable_proposer_re_orgs_default() { + CommandLineTest::new().run().with_config(|config| { + assert_eq!( + config.chain.re_org_threshold, + Some(DEFAULT_RE_ORG_THRESHOLD) + ); + assert_eq!( + config.chain.re_org_max_epochs_since_finalization, + DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, + ); + }); +} + +#[test] +fn disable_proposer_re_orgs() { + CommandLineTest::new() + .flag("disable-proposer-reorgs", None) + .run() + .with_config(|config| assert_eq!(config.chain.re_org_threshold, None)); +} + +#[test] +fn proposer_re_org_threshold() { + CommandLineTest::new() + .flag("proposer-reorg-threshold", Some("90")) + .run() + .with_config(|config| assert_eq!(config.chain.re_org_threshold.unwrap().0, 90)); +} + +#[test] +fn proposer_re_org_max_epochs_since_finalization() { + CommandLineTest::new() + .flag("proposer-reorg-epochs-since-finalization", Some("8")) + .run() + .with_config(|config| { + assert_eq!( + config.chain.re_org_max_epochs_since_finalization.as_u64(), + 8 + ) + }); +} + #[test] fn monitoring_endpoint() { CommandLineTest::new() diff --git a/lighthouse/tests/validator_client.rs b/lighthouse/tests/validator_client.rs index 6608b7ca64f..f0ed4f737d4 100644 --- a/lighthouse/tests/validator_client.rs +++ b/lighthouse/tests/validator_client.rs @@ -389,6 +389,24 @@ fn no_doppelganger_protection_flag() { .with_config(|config| assert!(!config.enable_doppelganger_protection)); } #[test] +fn block_delay_ms() { + CommandLineTest::new() + .flag("block-delay-ms", Some("2000")) + .run() + .with_config(|config| { + assert_eq!( + config.block_delay, + Some(std::time::Duration::from_millis(2000)) + ) + }); +} +#[test] +fn no_block_delay_ms() { + CommandLineTest::new() + .run() + .with_config(|config| assert_eq!(config.block_delay, None)); +} +#[test] fn no_gas_limit_flag() { CommandLineTest::new() .run() diff --git a/scripts/local_testnet/vars.env b/scripts/local_testnet/vars.env index cde790dffc9..dbfc41e91f5 100644 --- a/scripts/local_testnet/vars.env +++ b/scripts/local_testnet/vars.env @@ -53,5 +53,8 @@ SECONDS_PER_SLOT=3 # Seconds per Eth1 block SECONDS_PER_ETH1_BLOCK=1 +# Proposer score boost percentage +PROPOSER_SCORE_BOOST=70 + # Command line arguments for validator client VC_ARGS="" diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index ef2e66676a5..c82a1a9d362 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -308,5 +308,17 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { by this validator client. Note this will not necessarily be used if the gas limit \ set here moves too far from the previous block's gas limit. [default: 30,000,000]") .requires("builder-proposals"), - ) + ) + /* + * Experimental/development options. + */ + .arg( + Arg::with_name("block-delay-ms") + .long("block-delay-ms") + .value_name("MILLIS") + .hidden(true) + .help("Time to delay block production from the start of the slot. Should only be \ + used for testing.") + .takes_value(true), + ) } diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index 277a4bd8ded..22741dabbd7 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -13,6 +13,7 @@ use slog::{info, warn, Logger}; use std::fs; use std::net::IpAddr; use std::path::PathBuf; +use std::time::Duration; use types::{Address, GRAFFITI_BYTES_LEN}; pub const DEFAULT_BEACON_NODE: &str = "http://localhost:5052/"; @@ -61,6 +62,10 @@ pub struct Config { /// A list of custom certificates that the validator client will additionally use when /// connecting to a beacon node over SSL/TLS. pub beacon_nodes_tls_certs: Option>, + /// Delay from the start of the slot to wait before publishing a block. + /// + /// This is *not* recommended in prod and should only be used for testing. + pub block_delay: Option, /// Disables publishing http api requests to all beacon nodes for select api calls. pub disable_run_on_all: bool, } @@ -95,6 +100,7 @@ impl Default for Config { monitoring_api: None, enable_doppelganger_protection: false, beacon_nodes_tls_certs: None, + block_delay: None, builder_proposals: false, builder_registration_timestamp_override: None, gas_limit: None, @@ -341,6 +347,13 @@ impl Config { ); } + /* + * Experimental + */ + if let Some(delay_ms) = parse_optional::(cli_args, "block-delay-ms")? { + config.block_delay = Some(Duration::from_millis(delay_ms)); + } + Ok(config) } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 819efec93c9..4db9804054a 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -427,6 +427,7 @@ impl ProductionValidatorClient { .runtime_context(context.service_context("block".into())) .graffiti(config.graffiti) .graffiti_file(config.graffiti_file.clone()) + .block_delay(config.block_delay) .build()?; let attestation_service = AttestationServiceBuilder::new()