Skip to content

Commit

Permalink
Metrics changes for code review
Browse files Browse the repository at this point in the history
  • Loading branch information
snorochevskiy committed Dec 8, 2024
1 parent 8eba2fb commit 955a3bb
Show file tree
Hide file tree
Showing 3 changed files with 353 additions and 318 deletions.
96 changes: 36 additions & 60 deletions grpc/src/consistencyapi_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ use interface::checksums_storage::{
BbgmChangeRecord, BbgmChecksumServiceApi, BbgmEpochCksm, BbgmGrandEpochCksm,
BbgmGrandEpochCksmWithNumber,
};
use metrics_utils::{
AccoountConsistencyGrpcClientMetricsConfig, BubblegumConsistencyGrpcClientMetricsConfig,
};
use metrics_utils::{AccoountConsistencyGrpcClientMetrics, BubblegumConsistencyGrpcClientMetrics};
use solana_sdk::pubkey::Pubkey;
use tokio::time::Instant;
use tonic::async_trait;
Expand Down Expand Up @@ -399,21 +397,19 @@ impl Into<BbgmChangePos> for BbgmChangePosition {

pub struct BbgmConsistencyApiClientImpl {
client: tokio::sync::Mutex<BbgmConsistencyServiceClient<Channel>>,
peer: String,
metrics: Option<Arc<BubblegumConsistencyGrpcClientMetricsConfig>>,
metrics: Option<Arc<dyn BubblegumConsistencyGrpcClientMetrics + Send + Sync>>,
}

impl BbgmConsistencyApiClientImpl {
pub async fn new(
peer: &str,
metrics: Option<Arc<BubblegumConsistencyGrpcClientMetricsConfig>>,
metrics: Option<Arc<dyn BubblegumConsistencyGrpcClientMetrics + Send + Sync>>,
) -> Result<BbgmConsistencyApiClientImpl, GrpcError> {
let url = Uri::from_str(peer).map_err(|e| GrpcError::UriCreate(e.to_string()))?;
let channel = Channel::builder(url).connect().await?;

Ok(BbgmConsistencyApiClientImpl {
client: tokio::sync::Mutex::new(BbgmConsistencyServiceClient::new(channel)),
peer: peer.to_string(),
metrics,
})
}
Expand All @@ -439,16 +435,16 @@ impl BbgmChecksumServiceApi for BbgmConsistencyApiClientImpl {
let mut client = self.client.lock().await;
let start = Instant::now();
let call_result = client.get_bbgm_grand_epochs_for_tree(grpc_request).await;
if call_result.is_err() && self.metrics.is_some() {
self.metrics
.as_ref()
.unwrap()
.track_get_grand_epochs_for_tree_call_error(&self.peer);

if let Some(m) = self.metrics.as_ref() {
m.track_get_grand_epochs_for_tree_call(call_result.is_ok());
}

let grpc_response = call_result?;
if let Some(m) = self.metrics.as_ref() {
m.peers_bubblegum_get_grand_epochs_for_tree_latency
.observe(start.elapsed().as_secs_f64());
m.track_peers_bubblegum_get_grand_epochs_for_tree_latency(
start.elapsed().as_secs_f64(),
);
}
let list = grpc_response.into_inner().list;
let mut result = Vec::with_capacity(list.len());
Expand Down Expand Up @@ -484,16 +480,13 @@ impl BbgmChecksumServiceApi for BbgmConsistencyApiClientImpl {

let start = Instant::now();
let call_result = client.get_bbgm_grand_epoch_checksums(grpc_request).await;
if call_result.is_err() && self.metrics.is_some() {
self.metrics
.as_ref()
.unwrap()
.track_get_grand_epochs_call_error(&self.peer);
if let Some(m) = self.metrics.as_ref() {
m.track_get_grand_epochs_call(call_result.is_ok());
}

let grpc_response = call_result?;
if let Some(m) = self.metrics.as_ref() {
m.peers_bubblegum_get_grand_epochs_latency
.observe(start.elapsed().as_secs_f64());
m.track_peers_bubblegum_get_grand_epochs_latency(start.elapsed().as_secs_f64());
}

let list = grpc_response.into_inner().list;
Expand Down Expand Up @@ -533,16 +526,13 @@ impl BbgmChecksumServiceApi for BbgmConsistencyApiClientImpl {
let call_result = client
.get_bbgm_epoch_checksums_in_grand_epoch(grpc_request)
.await;
if call_result.is_err() && self.metrics.is_some() {
self.metrics
.as_ref()
.unwrap()
.track_get_epochs_call_error(&self.peer);
if let Some(m) = self.metrics.as_ref() {
m.track_get_epochs_call(call_result.is_ok());
}

let grpc_response = call_result?;
if let Some(m) = self.metrics.as_ref() {
m.peers_bubblegum_get_epochs_latency
.observe(start.elapsed().as_secs_f64());
m.track_peers_bubblegum_get_epochs_latency(start.elapsed().as_secs_f64());
}

let list = grpc_response.into_inner().list;
Expand Down Expand Up @@ -585,16 +575,13 @@ impl BbgmChecksumServiceApi for BbgmConsistencyApiClientImpl {

let start = Instant::now();
let call_result = client.get_bbgm_changes_in_epoch(grpc_request).await;
if call_result.is_err() && self.metrics.is_some() {
self.metrics
.as_ref()
.unwrap()
.track_get_changes_call_error(&self.peer);
if let Some(m) = self.metrics.as_ref() {
m.track_get_changes_call_error(call_result.is_ok());
}

let grpc_response = call_result?;
if let Some(m) = self.metrics.as_ref() {
m.peers_bubblegum_get_changes_latency
.observe(start.elapsed().as_secs_f64());
m.track_peers_bubblegum_get_changes_latency(start.elapsed().as_secs_f64());
}

let list = grpc_response.into_inner().list;
Expand Down Expand Up @@ -643,21 +630,19 @@ impl BbgmChecksumServiceApi for BbgmConsistencyApiClientImpl {

pub struct AccConsistencyApiClientImpl {
client: tokio::sync::Mutex<AccConsistencyServiceClient<Channel>>,
peer: String,
metrics: Option<Arc<AccoountConsistencyGrpcClientMetricsConfig>>,
metrics: Option<Arc<dyn AccoountConsistencyGrpcClientMetrics + Send + Sync>>,
}

impl AccConsistencyApiClientImpl {
pub async fn new(
peer: &str,
metrics: Option<Arc<AccoountConsistencyGrpcClientMetricsConfig>>,
metrics: Option<Arc<dyn AccoountConsistencyGrpcClientMetrics + Send + Sync>>,
) -> Result<AccConsistencyApiClientImpl, GrpcError> {
let url = Uri::from_str(peer).map_err(|e| GrpcError::UriCreate(e.to_string()))?;
let channel = Channel::builder(url).connect().await?;

Ok(AccConsistencyApiClientImpl {
client: tokio::sync::Mutex::new(AccConsistencyServiceClient::new(channel)),
peer: peer.to_string(),
metrics,
})
}
Expand All @@ -671,16 +656,13 @@ impl AccChecksumServiceApi for AccConsistencyApiClientImpl {

let start = Instant::now();
let call_result = client.get_acc_grand_bucket_checksums(grpc_request).await;
if call_result.is_err() && self.metrics.is_some() {
self.metrics
.as_ref()
.unwrap()
.track_get_grand_buckets_call_error(&self.peer);
if let Some(m) = self.metrics.as_ref() {
m.track_get_grand_buckets_call(call_result.is_ok());
}

let grpc_response = call_result?;
if let Some(m) = self.metrics.as_ref() {
m.peers_account_get_grand_buckets_latency
.observe(start.elapsed().as_secs_f64());
m.track_peers_account_get_grand_buckets_latency(start.elapsed().as_secs_f64());
}

let list = grpc_response.into_inner().list;
Expand Down Expand Up @@ -712,16 +694,13 @@ impl AccChecksumServiceApi for AccConsistencyApiClientImpl {
let call_result = client
.get_acc_bucket_checksums_in_grand_bucket(grpc_request)
.await;
if call_result.is_err() && self.metrics.is_some() {
self.metrics
.as_ref()
.unwrap()
.track_get_buckets_call_error(&self.peer);
if let Some(m) = self.metrics.as_ref() {
m.track_get_buckets_call(call_result.is_ok());
}

let grpc_response = call_result?;
if let Some(m) = self.metrics.as_ref() {
m.peers_account_get_buckets_latency
.observe(start.elapsed().as_secs_f64());
m.track_peers_account_get_buckets_latency(start.elapsed().as_secs_f64());
}

let list = grpc_response.into_inner().list;
Expand Down Expand Up @@ -754,16 +733,13 @@ impl AccChecksumServiceApi for AccConsistencyApiClientImpl {

let start = Instant::now();
let call_result = client.get_accs_in_bucket(grpc_request).await;
if call_result.is_err() && self.metrics.is_some() {
self.metrics
.as_ref()
.unwrap()
.track_get_latests_call_error(&self.peer);
if let Some(m) = self.metrics.as_ref() {
m.track_get_latests_call(call_result.is_ok());
}

let grpc_response = call_result?;
if let Some(m) = self.metrics.as_ref() {
m.peers_account_get_latests_latency
.observe(start.elapsed().as_secs_f64());
m.track_peers_account_get_latests_latency(start.elapsed().as_secs_f64());
}

let list = grpc_response.into_inner().list;
Expand Down
Loading

0 comments on commit 955a3bb

Please sign in to comment.