diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 37e00987fe..b443d10c7f 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -1,7 +1,7 @@ mod dsn; use crate::commands::farm::dsn::configure_dsn; -use crate::utils::shutdown_signal; +use crate::utils::{shutdown_signal, FarmerMetrics}; use anyhow::anyhow; use bytesize::ByteSize; use clap::{Parser, ValueHint}; @@ -354,6 +354,7 @@ where // Metrics let mut prometheus_metrics_registry = Registry::default(); + let farmer_metrics = FarmerMetrics::new(&mut prometheus_metrics_registry); let metrics_endpoints_are_specified = !metrics_endpoints.is_empty(); let (node, mut node_runner) = { @@ -604,10 +605,20 @@ where } }; + // Register audit plot events + let farmer_metrics = farmer_metrics.clone(); + let on_plot_audited_callback = move |audit_event: &_| { + farmer_metrics.observe_audit_event(audit_event); + }; + single_disk_farm .on_sector_plotted(Arc::new(on_plotted_sector_callback)) .detach(); + single_disk_farm + .on_plot_audited(Arc::new(on_plot_audited_callback)) + .detach(); + single_disk_farm.run() }) .collect::>(); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs b/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs index 709deff5a5..64357e2b51 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs @@ -1,5 +1,35 @@ +use prometheus_client::metrics::family::Family; +use prometheus_client::metrics::histogram::{exponential_buckets, Histogram}; +use prometheus_client::registry::{Registry, Unit}; +use subspace_farmer::single_disk_farm::farming::AuditEvent; use tokio::signal; +#[derive(Debug, Clone)] +pub(crate) struct FarmerMetrics { + // Type comment: we need (String, String) instead of just String for farm_id because of + // trait definition within prometheus_client library. + audit: Family, Histogram>, +} + +impl FarmerMetrics { + pub(crate) fn new(registry: &mut Registry) -> Self { + let sub_registry = registry.sub_registry_with_prefix("subspace_farmer"); + + let audit: Family<_, _> = + Family::new_with_constructor(|| Histogram::new(exponential_buckets(0.0001, 2.0, 15))); + + sub_registry.register_with_unit("audit", "Audit time", Unit::Seconds, audit.clone()); + + Self { audit } + } + + pub(crate) fn observe_audit_event(&self, event: &AuditEvent) { + self.audit + .get_or_create(&vec![(event.farm_id.to_string(), Default::default())]) + .observe(event.duration); + } +} + pub(crate) fn raise_fd_limit() { match fdlimit::raise_fd_limit() { Ok(fdlimit::Outcome::LimitRaised { from, to }) => { diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 6354c29bf8..ff71721300 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -9,7 +9,7 @@ use crate::reward_signing::reward_signing; use crate::single_disk_farm::farming::rayon_files::RayonFiles; pub use crate::single_disk_farm::farming::FarmingError; use crate::single_disk_farm::farming::{ - farming, slot_notification_forwarder, FarmingOptions, PlotAudit, + farming, slot_notification_forwarder, AuditEvent, FarmingOptions, PlotAudit, }; use crate::single_disk_farm::piece_cache::{DiskPieceCache, DiskPieceCacheError}; use crate::single_disk_farm::piece_reader::PieceReader; @@ -558,6 +558,7 @@ struct Handlers { sector_plotting: Handler, sector_plotted: Handler<(PlottedSector, Option)>, solution: Handler, + plot_audited: Handler, } /// Single disk farm abstraction is a container for everything necessary to plot/farm with a single @@ -706,6 +707,7 @@ impl SingleDiskFarm { single_disk_farm_info } }; + let farm_id = *single_disk_farm_info.id(); let single_disk_farm_info_lock = SingleDiskFarmInfo::try_lock(&directory) .map_err(SingleDiskFarmError::LikelyAlreadyInUse)?; @@ -1118,6 +1120,7 @@ impl SingleDiskFarm { handlers, modifying_sector_index, slot_info_notifications: slot_info_forwarder_receiver, + farm_id, }; farming::(farming_options).await }; @@ -1376,6 +1379,11 @@ impl SingleDiskFarm { self.handlers.sector_plotted.add(callback) } + /// Subscribe to notification about audited plots + pub fn on_plot_audited(&self, callback: HandlerFn) -> HandlerId { + self.handlers.plot_audited.add(callback) + } + /// Subscribe to new solution notification pub fn on_solution(&self, callback: HandlerFn) -> HandlerId { self.handlers.solution.add(callback) diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index d1ed5068d2..4b0c26bb3b 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -2,7 +2,7 @@ pub mod rayon_files; use crate::node_client; use crate::node_client::NodeClient; -use crate::single_disk_farm::Handlers; +use crate::single_disk_farm::{Handlers, SingleDiskFarmId}; use async_lock::RwLock; use futures::channel::mpsc; use futures::StreamExt; @@ -23,6 +23,16 @@ use subspace_rpc_primitives::{SlotInfo, SolutionResponse}; use thiserror::Error; use tracing::{debug, error, info, trace, warn}; +#[derive(Debug, Clone)] +pub struct AuditEvent { + /// Defines how much time took the audit in secs + pub duration: f64, + /// ID of the farm + pub farm_id: SingleDiskFarmId, + /// Number of sectors for this audit + pub sectors_number: usize, +} + /// Errors that happen during farming #[derive(Debug, Error)] pub enum FarmingError { @@ -204,6 +214,7 @@ pub(super) struct FarmingOptions { pub(super) handlers: Arc, pub(super) modifying_sector_index: Arc>>, pub(super) slot_info_notifications: mpsc::Receiver, + pub(super) farm_id: SingleDiskFarmId, } /// Starts farming process. @@ -229,6 +240,7 @@ where handlers, modifying_sector_index, mut slot_info_notifications, + farm_id, } = farming_options; let farmer_app_info = node_client @@ -252,7 +264,9 @@ where let modifying_sector_guard = modifying_sector_index.read().await; let maybe_sector_being_modified = modifying_sector_guard.as_ref().copied(); - plot_audit.audit(PlotAuditOptions:: { + let start = Instant::now(); + + let sectors_solutions = plot_audit.audit(PlotAuditOptions:: { public_key: &public_key, reward_address: &reward_address, slot_info, @@ -261,7 +275,15 @@ where erasure_coding: &erasure_coding, maybe_sector_being_modified, table_generator: &table_generator, - }) + }); + + handlers.plot_audited.call_simple(&AuditEvent { + duration: start.elapsed().as_secs_f64(), + farm_id, + sectors_number: sectors_metadata.len(), + }); + + sectors_solutions }; sectors_solutions.sort_by(|a, b| {