Skip to content

Commit

Permalink
[TASK] Expose More Metrics from the DKG System for benchmarking (#488)
Browse files Browse the repository at this point in the history
* add more dkg metrics

* fix typo

* fix clippy error
  • Loading branch information
1xstj authored Feb 6, 2023
1 parent 3496e99 commit 7392a32
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 5 deletions.
7 changes: 7 additions & 0 deletions dkg-gadget/src/async_protocols/blockchain_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
async_protocols::BatchKey,
gossip_engine::GossipEngineIface,
gossip_messages::{dkg_message::sign_and_send_messages, public_key_gossip::gossip_public_key},
metrics::Metrics,
proposal::get_signed_proposal,
storage::proposals::save_signed_proposals_in_storage,
worker::{DKGWorker, HasLatestHeader, KeystoreExt},
Expand Down Expand Up @@ -88,6 +89,7 @@ pub struct DKGProtocolEngine<B: Block, BE, C, GE> {
pub is_genesis: bool,
pub current_validator_set: Arc<RwLock<AuthoritySet<Public>>>,
pub local_keystore: Arc<RwLock<Option<Arc<LocalKeystore>>>>,
pub metrics: Arc<Option<Metrics>>,
pub _pd: PhantomData<BE>,
}

Expand Down Expand Up @@ -172,6 +174,11 @@ where
dkg_logging::info!(target: "dkg", "All proposals have resolved for batch {:?}", batch_key);
let proposals = lock.remove(&batch_key).unwrap(); // safe unwrap since lock is held
std::mem::drop(lock);

if let Some(metrics) = self.metrics.as_ref() {
metrics.dkg_signed_proposal_counter.inc_by(proposals.len() as u64);
}

save_signed_proposals_in_storage::<B, C, BE>(
&self.get_authority_public_key(),
&self.current_validator_set,
Expand Down
11 changes: 11 additions & 0 deletions dkg-gadget/src/gossip_engine/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,11 @@ impl<B: Block + 'static> GossipHandler<B> {
// Check behavior of the peer.
let now = self.get_latest_block_number();
debug!(target: "dkg", "{:?} session {:?} | Received a signed DKG messages from {} @ block {:?}, ", message.msg.status, message.msg.session_id, who, now);

if let Some(metrics) = self.metrics.as_ref() {
metrics.dkg_signed_messages.inc();
}

if let Some(ref mut peer) = self.peers.write().get_mut(&who) {
peer.known_messages.insert(message.message_hash::<B>());
let mut pending_messages_peers = self.pending_messages_peers.write();
Expand All @@ -572,6 +577,9 @@ impl<B: Block + 'static> GossipHandler<B> {
match pending_messages_peers.entry(message.message_hash::<B>()) {
Entry::Vacant(entry) => {
dkg_logging::debug!(target: "dkg_gadget::gossip_engine::network", "NEW DKG MESSAGE FROM {}", who);
if let Some(metrics) = self.metrics.as_ref() {
metrics.dkg_new_signed_messages.inc();
}
enqueue_the_message();
entry.insert(HashSet::from([who]));
// This good, this peer is good, they sent us a message we didn't know about.
Expand All @@ -580,6 +588,9 @@ impl<B: Block + 'static> GossipHandler<B> {
},
Entry::Occupied(mut entry) => {
dkg_logging::debug!(target: "dkg_gadget::gossip_engine::network", "OLD DKG MESSAGE FROM {}", who);
if let Some(metrics) = self.metrics.as_ref() {
metrics.dkg_old_signed_messages.inc();
}
// if we are here, that means this peer sent us a message we already know.
let inserted = entry.get_mut().insert(who);
// and if inserted is `false` that means this peer was already in the set
Expand Down
75 changes: 74 additions & 1 deletion dkg-gadget/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use prometheus::{register, Counter, Gauge, PrometheusError, Registry, U64};

/// DKG metrics exposed through Prometheus
#[derive(Clone)]
pub(crate) struct Metrics {
pub struct Metrics {
/// Count to total propogated messages
pub dkg_propagated_messages: Counter<U64>,
/// Current active validator set id
Expand All @@ -34,6 +34,22 @@ pub(crate) struct Metrics {
pub dkg_latest_block_height: Gauge<U64>,
/// The signing sets for dkg worker
pub dkg_signing_sets: Gauge<U64>,
/// The total number of unsigned proposals seen
pub dkg_unsigned_proposal_counter: Counter<U64>,
/// The total number of signed proposals
pub dkg_signed_proposal_counter: Counter<U64>,
/// The total number of signed DKG messages seen
pub dkg_signed_messages: Counter<U64>,
/// The total number of signed DKG messages seen marked as old
pub dkg_old_signed_messages: Counter<U64>,
/// The total number of signed DKG messages seen marked as new
pub dkg_new_signed_messages: Counter<U64>,
/// The total number of keygen misbehaviour errors seen
pub dkg_keygen_misbehaviour_error: Counter<U64>,
/// The total number of keygen timeout errors seen
pub dkg_keygen_timeout_error: Counter<U64>,
/// The total number of sign misbehaviour errors seen
pub dkg_sign_misbehaviour_error: Counter<U64>,
}

impl Metrics {
Expand Down Expand Up @@ -77,8 +93,65 @@ impl Metrics {
Gauge::new("dkg_signing_sets", "The number of signing sets created")?,
registry,
)?,
dkg_unsigned_proposal_counter: register(
Counter::new("dkg_unsigned_proposal_counter", "Number of Unsigned proposals seen")?,
registry,
)?,
dkg_signed_proposal_counter: register(
Counter::new("dkg_signed_proposal_counter", "Number of signed proposals")?,
registry,
)?,
dkg_signed_messages: register(
Counter::new("dkg_signed_messages", "Number of signed DKG messages received")?,
registry,
)?,
dkg_old_signed_messages: register(
Counter::new(
"dkg_old_signed_messages",
"Number of OLD signed DKG messages received",
)?,
registry,
)?,
dkg_new_signed_messages: register(
Counter::new(
"dkg_new_signed_messages",
"Number of NEW signed DKG messages received",
)?,
registry,
)?,
dkg_keygen_misbehaviour_error: register(
Counter::new(
"dkg_keygen_misbehaviour_error",
"Number of KeygenMisbehaviour reports",
)?,
registry,
)?,
dkg_keygen_timeout_error: register(
Counter::new("dkg_keygen_timeout_error", "Number of Keygentimeout reports")?,
registry,
)?,
dkg_sign_misbehaviour_error: register(
Counter::new("dkg_sign_misbehaviour_error", "Number of SignMisbehaviour reports")?,
registry,
)?,
})
}

pub(crate) fn reset_session_metrics(&self) {
// reset all counters that have to be reset per session
self.dkg_propagated_messages.reset();
self.dkg_inbound_messages.reset();
self.dkg_error_counter.reset();
self.dkg_keygen_retry_counter.reset();
self.dkg_unsigned_proposal_counter.reset();
self.dkg_signed_proposal_counter.reset();
self.dkg_signed_messages.reset();
self.dkg_old_signed_messages.reset();
self.dkg_new_signed_messages.reset();
self.dkg_keygen_misbehaviour_error.reset();
self.dkg_keygen_timeout_error.reset();
self.dkg_sign_misbehaviour_error.reset();
}
}

// Note: we use the `format` macro to convert an expr into a `u64`. This will fail,
Expand Down
23 changes: 19 additions & 4 deletions dkg-gadget/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ where
local_keystore: self.local_keystore.clone(),
vote_results: Arc::new(Default::default()),
is_genesis: stage == ProtoStageType::Genesis,
metrics: self.metrics.clone(),
_pd: Default::default(),
}),
session_id,
Expand Down Expand Up @@ -1011,6 +1012,10 @@ where
}
*v = None;
});
// Reset per session metrics
if let Some(metrics) = self.metrics.as_ref() {
metrics.reset_session_metrics();
}
}
}

Expand Down Expand Up @@ -1094,11 +1099,19 @@ where
self.best_authorities.read().iter().map(|x| x.1.clone()).collect();

let (bad_actors, session_id) = match dkg_error {
DKGError::KeygenMisbehaviour { ref bad_actors, .. } => (bad_actors.clone(), 0),
DKGError::KeygenTimeout { ref bad_actors, session_id, .. } =>
(bad_actors.clone(), session_id),
DKGError::KeygenMisbehaviour { ref bad_actors, .. } => {
metric_inc!(self, dkg_keygen_misbehaviour_error);
(bad_actors.clone(), 0)
},
DKGError::KeygenTimeout { ref bad_actors, session_id, .. } => {
metric_inc!(self, dkg_keygen_timeout_error);
(bad_actors.clone(), session_id)
},
// Todo: Handle Signing Timeout as a separate case
DKGError::SignMisbehaviour { ref bad_actors, .. } => (bad_actors.clone(), 0),
DKGError::SignMisbehaviour { ref bad_actors, .. } => {
metric_inc!(self, dkg_sign_misbehaviour_error);
(bad_actors.clone(), 0)
},
_ => Default::default(),
};

Expand Down Expand Up @@ -1331,6 +1344,8 @@ where
for proposal in res {
if let Some(hash) = proposal.hash() {
if !self.currently_signing_proposals.read().contains(&hash) {
// update unsigned proposal counter
metric_inc!(self, dkg_unsigned_proposal_counter);
filtered_unsigned_proposals.push(proposal);
}
}
Expand Down

0 comments on commit 7392a32

Please sign in to comment.