Skip to content

Commit

Permalink
feat: add latency metrics to batcher (#1578)
Browse files Browse the repository at this point in the history
Co-authored-by: Marcos Nicolau <marcosnicolau@lambdaclass.com>
  • Loading branch information
2 people authored and PatStiles committed Jan 10, 2025
1 parent 3d10a40 commit 6ac6c8a
Show file tree
Hide file tree
Showing 3 changed files with 298 additions and 13 deletions.
31 changes: 23 additions & 8 deletions batcher/aligned-batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use retry::batcher_retryables::{
user_balance_is_unlocked_retryable,
};
use retry::{retry_function, RetryError};
use tokio::time::timeout;
use tokio::time::{timeout, Instant};
use types::batch_state::BatchState;
use types::user_state::UserState;

Expand Down Expand Up @@ -1524,6 +1524,7 @@ impl Batcher {
proof_submitters: Vec<Address>,
fee_params: CreateNewTaskFeeParams,
) -> Result<TransactionReceipt, BatcherError> {
let start = Instant::now();
let result = retry_function(
|| {
create_new_task_retryable(
Expand All @@ -1542,6 +1543,11 @@ impl Batcher {
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await;
self.metrics
.create_new_task_duration
.set(start.elapsed().as_millis() as i64);
// Set to zero since it is not always executed
self.metrics.cancel_create_new_task_duration.set(0);
match result {
Ok(receipt) => {
if let Err(e) = self
Expand Down Expand Up @@ -1600,10 +1606,11 @@ impl Batcher {
/// After 2 hours (attempt 13), retries occur hourly for 1 day (33 retries).
pub async fn cancel_create_new_task_tx(&self, old_tx_gas_price: U256) {
info!("Cancelling createNewTask transaction...");
let start = Instant::now();
let iteration = Arc::new(Mutex::new(0));
let previous_gas_price = Arc::new(Mutex::new(old_tx_gas_price));

if let Err(e) = retry_function(
match retry_function(
|| async {
let mut iteration = iteration.lock().await;
let mut previous_gas_price = previous_gas_price.lock().await;
Expand Down Expand Up @@ -1639,11 +1646,12 @@ impl Batcher {
)
.await
{
error!("Could not cancel createNewTask transaction: {e}");
return;
Ok(_) => info!("createNewTask transaction successfully canceled"),
Err(e) => error!("Could not cancel createNewTask transaction: {e}"),
};

info!("createNewTask transaction successfully canceled");
self.metrics
.cancel_create_new_task_duration
.set(start.elapsed().as_millis() as i64);
}

/// Only relevant for testing and for users to easily use Aligned
Expand Down Expand Up @@ -1785,7 +1793,8 @@ impl Batcher {
batch_bytes: &[u8],
file_name: &str,
) -> Result<(), BatcherError> {
retry_function(
let start = Instant::now();
let result = retry_function(
|| {
Self::upload_batch_to_s3_retryable(
batch_bytes,
Expand All @@ -1800,7 +1809,13 @@ impl Batcher {
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
.map_err(|e| BatcherError::BatchUploadError(e.to_string()))
.map_err(|e| BatcherError::BatchUploadError(e.to_string()));

self.metrics
.s3_duration
.set(start.elapsed().as_micros() as i64);

result
}

async fn upload_batch_to_s3_retryable(
Expand Down
18 changes: 18 additions & 0 deletions batcher/aligned-batcher/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub struct BatcherMetrics {
pub batcher_started: IntCounter,
pub gas_price_used_on_latest_batch: IntGauge,
pub broken_ws_connections: IntCounter,
pub s3_duration: IntGauge,
pub create_new_task_duration: IntGauge,
pub cancel_create_new_task_duration: IntGauge,
}

impl BatcherMetrics {
Expand Down Expand Up @@ -46,6 +49,15 @@ impl BatcherMetrics {
"broken_ws_connections_count",
"Broken websocket connections"
))?;
let s3_duration = register_int_gauge!(opts!("s3_duration", "S3 Duration"))?;
let create_new_task_duration = register_int_gauge!(opts!(
"create_new_task_duration",
"Create New Task Duration"
))?;
let cancel_create_new_task_duration = register_int_gauge!(opts!(
"cancel_create_new_task_duration",
"Cancel create New Task Duration"
))?;

registry.register(Box::new(open_connections.clone()))?;
registry.register(Box::new(received_proofs.clone()))?;
Expand All @@ -56,6 +68,9 @@ impl BatcherMetrics {
registry.register(Box::new(gas_price_used_on_latest_batch.clone()))?;
registry.register(Box::new(batcher_started.clone()))?;
registry.register(Box::new(broken_ws_connections.clone()))?;
registry.register(Box::new(s3_duration.clone()))?;
registry.register(Box::new(create_new_task_duration.clone()))?;
registry.register(Box::new(cancel_create_new_task_duration.clone()))?;

let metrics_route = warp::path!("metrics")
.and(warp::any().map(move || registry.clone()))
Expand All @@ -77,6 +92,9 @@ impl BatcherMetrics {
batcher_started,
gas_price_used_on_latest_batch,
broken_ws_connections,
s3_duration,
create_new_task_duration,
cancel_create_new_task_duration,
})
}

Expand Down
Loading

0 comments on commit 6ac6c8a

Please sign in to comment.