From 9e47beda06d51d7a636426b9fadef43ada6f6f18 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 6 Apr 2023 15:04:46 +1000 Subject: [PATCH 1/7] Reduce bandwidth over the VC<>BN API using dependent roots --- validator_client/src/duties_service.rs | 212 +++++++++++++++++-------- 1 file changed, 143 insertions(+), 69 deletions(-) diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index c335c67ab16..2aed95705b8 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -16,12 +16,15 @@ use crate::{ validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore}, }; use environment::RuntimeContext; -use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId}; +use eth2::types::{ + AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId, +}; use futures::{stream, StreamExt}; use parking_lot::RwLock; use safe_arith::ArithError; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; +use std::cmp::min; use std::collections::{hash_map, BTreeMap, HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; @@ -54,6 +57,11 @@ const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2; /// flag in the cli to enable collection of per validator metrics. const VALIDATOR_METRICS_MIN_COUNT: usize = 64; +/// The number of validators to request duty information for in the initial request. +/// The initial request is used to determine if further requests are required, so that it +/// reduces the amount of data that needs to be transferred. 
+const INITIAL_DUTIES_QUERY_SIZE: usize = 1; + #[derive(Debug)] pub enum Error { UnableToReadSlotClock, @@ -674,84 +682,77 @@ async fn poll_beacon_attesters_for_epoch( &[metrics::UPDATE_ATTESTERS_FETCH], ); - let response = duties_service - .beacon_nodes - .first_success( - duties_service.require_synced, - OfflineOnFailure::Yes, - |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::ATTESTER_DUTIES_HTTP_POST], - ); - beacon_node - .post_validator_duties_attester(epoch, local_indices) - .await - }, - ) - .await - .map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))?; - - drop(fetch_timer); - let _store_timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::UPDATE_ATTESTERS_STORE], - ); + // Request duties for either `INITIAL_DUTIES_QUERY_SIZE` validators or the count of validators for which we + // don't already know their duties for that epoch, whichever subset is larger. We use the `dependent_root` + // in the response to determine whether validator duties need to be updated. This is to ensure that we don't + // request for extra data unless necessary in order to save on network bandwidth. + let uninitialized_validators = + get_uninitialized_validators(duties_service, &epoch, local_pubkeys); + let indices_to_request = if uninitialized_validators.len() > INITIAL_DUTIES_QUERY_SIZE { + uninitialized_validators.as_slice() + } else { + &local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())] + }; + let response = + post_validator_duties_attester(duties_service, epoch, indices_to_request).await?; let dependent_root = response.dependent_root; - // Filter any duties that are not relevant or already known. - let new_duties = { + // Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch. + let validators_to_update: Vec<_> = { // Avoid holding the read-lock for any longer than required. 
let attesters = duties_service.attesters.read(); - response + // Only request duties if + // - There were no known duties for this epoch. + // - The dependent root has changed, signalling a re-org. + local_pubkeys + .iter() + .filter(|pubkey| { + attesters.get(pubkey).map_or(true, |duties| { + duties + .get(&epoch) + .map_or(true, |(prior, _)| *prior != dependent_root) + }) + }) + .collect::>() + }; + + if validators_to_update.is_empty() { + // No validators have conflicting (epoch, dependent_root) values or missing duties for the epoch. + drop(fetch_timer); + return Ok(()); + } + + // Filter out validators which have already been requested. + let attesters_in_response: Vec<_> = response.data.iter().map(|duty| duty.pubkey).collect(); + let indices_to_request = validators_to_update + .iter() + .filter(|pubkey| !attesters_in_response.contains(pubkey)) + .filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey)) + .collect::>(); + + let new_duties = if !indices_to_request.is_empty() { + post_validator_duties_attester(duties_service, epoch, indices_to_request.as_slice()) + .await? .data .into_iter() - .filter(|duty| { - if duties_service.per_validator_metrics() { - let validator_index = duty.validator_index; - let duty_slot = duty.slot; - if let Some(existing_slot_gauge) = - get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()]) - { - let existing_slot = Slot::new(existing_slot_gauge.get() as u64); - let existing_epoch = existing_slot.epoch(E::slots_per_epoch()); - - // First condition ensures that we switch to the next epoch duty slot - // once the current epoch duty slot passes. - // Second condition is to ensure that next epoch duties don't override - // current epoch duties. 
- if existing_slot < current_slot - || (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch - && duty_slot > current_slot - && duty_slot != existing_slot) - { - existing_slot_gauge.set(duty_slot.as_u64() as i64); - } - } else { - set_int_gauge( - &ATTESTATION_DUTY, - &[&validator_index.to_string()], - duty_slot.as_u64() as i64, - ); - } - } - - local_pubkeys.contains(&duty.pubkey) && { - // Only update the duties if either is true: - // - // - There were no known duties for this epoch. - // - The dependent root has changed, signalling a re-org. - attesters.get(&duty.pubkey).map_or(true, |duties| { - duties - .get(&epoch) - .map_or(true, |(prior, _)| *prior != dependent_root) - }) - } - }) + .chain(response.data) .collect::>() + } else { + response.data }; + drop(fetch_timer); + + let _store_timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::UPDATE_ATTESTERS_STORE], + ); + + if duties_service.per_validator_metrics() { + update_per_validator_duty_metrics::(current_slot, &new_duties); + } + debug!( log, "Downloaded attester duties"; @@ -799,6 +800,79 @@ async fn poll_beacon_attesters_for_epoch( Ok(()) } +/// Get a filtered list of local validators for which we don't already know their duties for that epoch +fn get_uninitialized_validators( + duties_service: &Arc>, + epoch: &Epoch, + local_pubkeys: &HashSet, +) -> Vec { + let attesters = duties_service.attesters.read(); + local_pubkeys + .iter() + .filter(|pubkey| { + attesters + .get(pubkey) + .map_or(true, |duties| !duties.contains_key(epoch)) + }) + .filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey)) + .collect::>() +} + +fn update_per_validator_duty_metrics(current_slot: Slot, new_duties: &[AttesterData]) { + new_duties.iter().for_each(|duty| { + let validator_index = duty.validator_index; + let duty_slot = duty.slot; + if let Some(existing_slot_gauge) = + get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()]) + { + let existing_slot = 
Slot::new(existing_slot_gauge.get() as u64); + let existing_epoch = existing_slot.epoch(E::slots_per_epoch()); + + // First condition ensures that we switch to the next epoch duty slot + // once the current epoch duty slot passes. + // Second condition is to ensure that next epoch duties don't override + // current epoch duties. + if existing_slot < current_slot + || (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch + && duty_slot > current_slot + && duty_slot != existing_slot) + { + existing_slot_gauge.set(duty_slot.as_u64() as i64); + } + } else { + set_int_gauge( + &ATTESTATION_DUTY, + &[&validator_index.to_string()], + duty_slot.as_u64() as i64, + ); + } + }); +} + +async fn post_validator_duties_attester( + duties_service: &Arc>, + epoch: Epoch, + validator_indices: &[u64], +) -> Result>, Error> { + duties_service + .beacon_nodes + .first_success( + duties_service.require_synced, + OfflineOnFailure::Yes, + |beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::ATTESTER_DUTIES_HTTP_POST], + ); + beacon_node + .post_validator_duties_attester(epoch, validator_indices) + .await + }, + ) + .await + .map_err(|e| Error::FailedToDownloadAttesters(e.to_string())) +} + /// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map. /// /// Duties are computed in batches each slot. 
If a re-org is detected then the process will From d905e8b97a95fdd5ff0eb6cbb7cb625d07291fb8 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 6 Apr 2023 15:24:09 +1000 Subject: [PATCH 2/7] Remove extra comments --- validator_client/src/duties_service.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 2aed95705b8..fa78b79d30d 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -702,9 +702,6 @@ async fn poll_beacon_attesters_for_epoch( let validators_to_update: Vec<_> = { // Avoid holding the read-lock for any longer than required. let attesters = duties_service.attesters.read(); - // Only request duties if - // - There were no known duties for this epoch. - // - The dependent root has changed, signalling a re-org. local_pubkeys .iter() .filter(|pubkey| { From 9cc8783ea845f522b102a495819eee89bf974b63 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 6 Apr 2023 15:26:35 +1000 Subject: [PATCH 3/7] Update attester duties request indices condition --- validator_client/src/duties_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index fa78b79d30d..78f5eff22b4 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -688,7 +688,7 @@ async fn poll_beacon_attesters_for_epoch( // request for extra data unless necessary in order to save on network bandwidth. 
let uninitialized_validators = get_uninitialized_validators(duties_service, &epoch, local_pubkeys); - let indices_to_request = if uninitialized_validators.len() > INITIAL_DUTIES_QUERY_SIZE { + let indices_to_request = if uninitialized_validators.len() >= INITIAL_DUTIES_QUERY_SIZE { uninitialized_validators.as_slice() } else { &local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())] From c120108540694f824358528db28aaa075b814f74 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 9 May 2023 16:08:33 +1000 Subject: [PATCH 4/7] Optimize filtering by avoiding extra heap allocation, and apply other suggestions Co-authored-by: Paul Hauner --- validator_client/src/duties_service.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 78f5eff22b4..fa5a558ad41 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -716,15 +716,14 @@ async fn poll_beacon_attesters_for_epoch( if validators_to_update.is_empty() { // No validators have conflicting (epoch, dependent_root) values or missing duties for the epoch. - drop(fetch_timer); return Ok(()); } // Filter out validators which have already been requested. - let attesters_in_response: Vec<_> = response.data.iter().map(|duty| duty.pubkey).collect(); + let initial_duties = &response.data; let indices_to_request = validators_to_update .iter() - .filter(|pubkey| !attesters_in_response.contains(pubkey)) + .filter(|&&&pubkey| !initial_duties.iter().any(|duty| duty.pubkey == pubkey)) .filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey)) .collect::>(); From 7d6c0424c32eddf3424edf535a91a8e087864256 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 9 May 2023 17:46:12 +1000 Subject: [PATCH 5/7] Update metrics for all duties rather than just the new ones. 
--- validator_client/src/duties_service.rs | 66 +++++++++++++++----------- 1 file changed, 37 insertions(+), 29 deletions(-) diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index fa5a558ad41..1246770ac67 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -745,10 +745,6 @@ async fn poll_beacon_attesters_for_epoch( &[metrics::UPDATE_ATTESTERS_STORE], ); - if duties_service.per_validator_metrics() { - update_per_validator_duty_metrics::(current_slot, &new_duties); - } - debug!( log, "Downloaded attester duties"; @@ -784,6 +780,10 @@ async fn poll_beacon_attesters_for_epoch( } drop(attesters); + if duties_service.per_validator_metrics() { + update_per_validator_duty_metrics::(duties_service, epoch, current_slot); + } + // Spawn the background task to compute selection proofs. let subservice = duties_service.clone(); duties_service.context.executor.spawn( @@ -814,33 +814,41 @@ fn get_uninitialized_validators( .collect::>() } -fn update_per_validator_duty_metrics(current_slot: Slot, new_duties: &[AttesterData]) { - new_duties.iter().for_each(|duty| { - let validator_index = duty.validator_index; - let duty_slot = duty.slot; - if let Some(existing_slot_gauge) = - get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()]) - { - let existing_slot = Slot::new(existing_slot_gauge.get() as u64); - let existing_epoch = existing_slot.epoch(E::slots_per_epoch()); - - // First condition ensures that we switch to the next epoch duty slot - // once the current epoch duty slot passes. - // Second condition is to ensure that next epoch duties don't override - // current epoch duties. 
- if existing_slot < current_slot - || (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch - && duty_slot > current_slot - && duty_slot != existing_slot) +fn update_per_validator_duty_metrics( + duties_service: &Arc>, + epoch: Epoch, + current_slot: Slot, +) { + let attesters = duties_service.attesters.read(); + attesters.values().for_each(|attester_duties_by_epoch| { + if let Some((_, duty_and_proof)) = attester_duties_by_epoch.get(&epoch) { + let duty = &duty_and_proof.duty; + let validator_index = duty.validator_index; + let duty_slot = duty.slot; + if let Some(existing_slot_gauge) = + get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()]) { - existing_slot_gauge.set(duty_slot.as_u64() as i64); + let existing_slot = Slot::new(existing_slot_gauge.get() as u64); + let existing_epoch = existing_slot.epoch(E::slots_per_epoch()); + + // First condition ensures that we switch to the next epoch duty slot + // once the current epoch duty slot passes. + // Second condition is to ensure that next epoch duties don't override + // current epoch duties. + if existing_slot < current_slot + || (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch + && duty_slot > current_slot + && duty_slot != existing_slot) + { + existing_slot_gauge.set(duty_slot.as_u64() as i64); + } + } else { + set_int_gauge( + &ATTESTATION_DUTY, + &[&validator_index.to_string()], + duty_slot.as_u64() as i64, + ); } - } else { - set_int_gauge( - &ATTESTATION_DUTY, - &[&validator_index.to_string()], - duty_slot.as_u64() as i64, - ); } }); } From cfaab2dcb93fe9790bf448bc0f253b51eebdd457 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 9 May 2023 17:48:38 +1000 Subject: [PATCH 6/7] Ensure that we always query for all uninitialized validators. 
--- validator_client/src/duties_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 1246770ac67..ebf75bda71c 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -688,7 +688,7 @@ async fn poll_beacon_attesters_for_epoch( // request for extra data unless necessary in order to save on network bandwidth. let uninitialized_validators = get_uninitialized_validators(duties_service, &epoch, local_pubkeys); - let indices_to_request = if uninitialized_validators.len() >= INITIAL_DUTIES_QUERY_SIZE { + let indices_to_request = if !uninitialized_validators.is_empty() { uninitialized_validators.as_slice() } else { &local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())] From ab2245817153c7df5f2a2c4594277df548daf6b4 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 15 May 2023 11:17:18 +1000 Subject: [PATCH 7/7] Ensure duty metrics are always updated and update code comments --- validator_client/src/duties_service.rs | 87 ++++++++++++-------------- 1 file changed, 41 insertions(+), 46 deletions(-) diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index ebf75bda71c..3cab6e7821c 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -539,7 +539,6 @@ async fn poll_beacon_attesters( current_epoch, &local_indices, &local_pubkeys, - current_slot, ) .await { @@ -552,6 +551,8 @@ async fn poll_beacon_attesters( ) } + update_per_validator_duty_metrics::(duties_service, current_epoch, current_slot); + drop(current_epoch_timer); let next_epoch_timer = metrics::start_timer_vec( &metrics::DUTIES_SERVICE_TIMES, @@ -559,14 +560,9 @@ async fn poll_beacon_attesters( ); // Download the duties and update the duties for the next epoch. 
- if let Err(e) = poll_beacon_attesters_for_epoch( - duties_service, - next_epoch, - &local_indices, - &local_pubkeys, - current_slot, - ) - .await + if let Err(e) = + poll_beacon_attesters_for_epoch(duties_service, next_epoch, &local_indices, &local_pubkeys) + .await { error!( log, @@ -577,6 +573,8 @@ async fn poll_beacon_attesters( ) } + update_per_validator_duty_metrics::(duties_service, next_epoch, current_slot); + drop(next_epoch_timer); let subscriptions_timer = metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::SUBSCRIPTIONS]); @@ -663,7 +661,6 @@ async fn poll_beacon_attesters_for_epoch( epoch: Epoch, local_indices: &[u64], local_pubkeys: &HashSet, - current_slot: Slot, ) -> Result<(), Error> { let log = duties_service.context.log(); @@ -682,9 +679,9 @@ async fn poll_beacon_attesters_for_epoch( &[metrics::UPDATE_ATTESTERS_FETCH], ); - // Request duties for either `INITIAL_DUTIES_QUERY_SIZE` validators or the count of validators for which we - // don't already know their duties for that epoch, whichever subset is larger. We use the `dependent_root` - // in the response to determine whether validator duties need to be updated. This is to ensure that we don't + // Request duties for all uninitialized validators. If there isn't any, we will just request for + // `INITIAL_DUTIES_QUERY_SIZE` validators. We use the `dependent_root` in the response to + // determine whether validator duties need to be updated. This is to ensure that we don't // request for extra data unless necessary in order to save on network bandwidth. let uninitialized_validators = get_uninitialized_validators(duties_service, &epoch, local_pubkeys); @@ -780,10 +777,6 @@ async fn poll_beacon_attesters_for_epoch( } drop(attesters); - if duties_service.per_validator_metrics() { - update_per_validator_duty_metrics::(duties_service, epoch, current_slot); - } - // Spawn the background task to compute selection proofs. 
let subservice = duties_service.clone(); duties_service.context.executor.spawn( @@ -819,38 +812,40 @@ fn update_per_validator_duty_metrics( epoch: Epoch, current_slot: Slot, ) { - let attesters = duties_service.attesters.read(); - attesters.values().for_each(|attester_duties_by_epoch| { - if let Some((_, duty_and_proof)) = attester_duties_by_epoch.get(&epoch) { - let duty = &duty_and_proof.duty; - let validator_index = duty.validator_index; - let duty_slot = duty.slot; - if let Some(existing_slot_gauge) = - get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()]) - { - let existing_slot = Slot::new(existing_slot_gauge.get() as u64); - let existing_epoch = existing_slot.epoch(E::slots_per_epoch()); - - // First condition ensures that we switch to the next epoch duty slot - // once the current epoch duty slot passes. - // Second condition is to ensure that next epoch duties don't override - // current epoch duties. - if existing_slot < current_slot - || (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch - && duty_slot > current_slot - && duty_slot != existing_slot) + if duties_service.per_validator_metrics() { + let attesters = duties_service.attesters.read(); + attesters.values().for_each(|attester_duties_by_epoch| { + if let Some((_, duty_and_proof)) = attester_duties_by_epoch.get(&epoch) { + let duty = &duty_and_proof.duty; + let validator_index = duty.validator_index; + let duty_slot = duty.slot; + if let Some(existing_slot_gauge) = + get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()]) { - existing_slot_gauge.set(duty_slot.as_u64() as i64); + let existing_slot = Slot::new(existing_slot_gauge.get() as u64); + let existing_epoch = existing_slot.epoch(E::slots_per_epoch()); + + // First condition ensures that we switch to the next epoch duty slot + // once the current epoch duty slot passes. + // Second condition is to ensure that next epoch duties don't override + // current epoch duties. 
+ if existing_slot < current_slot + || (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch + && duty_slot > current_slot + && duty_slot != existing_slot) + { + existing_slot_gauge.set(duty_slot.as_u64() as i64); + } + } else { + set_int_gauge( + &ATTESTATION_DUTY, + &[&validator_index.to_string()], + duty_slot.as_u64() as i64, + ); } - } else { - set_int_gauge( - &ATTESTATION_DUTY, - &[&validator_index.to_string()], - duty_slot.as_u64() as i64, - ); } - } - }); + }); + } } async fn post_validator_duties_attester(