Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start adding farmer metrics. #2358

Merged
merged 6 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/subspace-farmer-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ async-lock = "2.8.0"
async-trait = "0.1.73"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
bitvec = "1.0.1"
event-listener-primitives = "2.0.1"
fs2 = "0.4.3"
futures = "0.3.29"
hex = "0.4.3"
Expand Down
6 changes: 4 additions & 2 deletions crates/subspace-farmer-components/src/auditing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ where
let public_key_hash = public_key.hash();

// Create auditing info for all sectors in parallel
sectors_metadata
let result = sectors_metadata
.par_iter()
.map(|sector_metadata| {
(
Expand Down Expand Up @@ -165,7 +165,9 @@ where
best_solution_distance,
})
})
.collect()
.collect();

result
}

struct SectorAuditingDetails {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod dsn;

use crate::commands::farm::dsn::configure_dsn;
use crate::utils::shutdown_signal;
use crate::utils::{shutdown_signal, FarmerMetrics};
use anyhow::anyhow;
use bytesize::ByteSize;
use clap::{Parser, ValueHint};
Expand Down Expand Up @@ -354,6 +354,7 @@ where

// Metrics
let mut prometheus_metrics_registry = Registry::default();
let farmer_metrics = FarmerMetrics::new(&mut prometheus_metrics_registry);
let metrics_endpoints_are_specified = !metrics_endpoints.is_empty();

let (node, mut node_runner) = {
Expand Down Expand Up @@ -604,10 +605,20 @@ where
}
};

// Register audit plot events
let farmer_metrics = farmer_metrics.clone();
let on_plot_audited_callback = move |audit_event: &_| {
farmer_metrics.observe_audit_event(audit_event);
};

single_disk_farm
.on_sector_plotted(Arc::new(on_plotted_sector_callback))
.detach();

single_disk_farm
.on_plot_audited(Arc::new(on_plot_audited_callback))
.detach();

single_disk_farm.run()
})
.collect::<FuturesUnordered<_>>();
Expand Down
30 changes: 30 additions & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,35 @@
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::{Registry, Unit};
use subspace_farmer::single_disk_farm::farming::AuditEvent;
use tokio::signal;

/// Prometheus metrics collected by the farmer.
#[derive(Debug, Clone)]
pub(crate) struct FarmerMetrics {
    // Histogram family of audit durations, registered as `audit` (unit: seconds) under the
    // `subspace_farmer` registry prefix.
    //
    // Type comment: the label set is `Vec<(String, String)>` rather than a plain `String` for
    // the farm ID because of the trait definitions within the `prometheus_client` library.
    audit: Family<Vec<(String, String)>, Histogram>,
}

impl FarmerMetrics {
    /// Creates the farmer metrics and registers them in `registry` under the
    /// `subspace_farmer` prefix.
    pub(crate) fn new(registry: &mut Registry) -> Self {
        let sub_registry = registry.sub_registry_with_prefix("subspace_farmer");

        // Audit durations are bucketed exponentially: 15 buckets starting at 0.0001 s and
        // doubling on every step (the last finite bucket is ~1.64 s).
        let audit: Family<_, _> =
            Family::new_with_constructor(|| Histogram::new(exponential_buckets(0.0001, 2.0, 15)));

        // The registry receives a clone of the family; the other handle is kept in `Self` and
        // used for recording observations.
        sub_registry.register_with_unit("audit", "Audit time", Unit::Seconds, audit.clone());

        Self { audit }
    }

    /// Records the duration of a single plot audit in the histogram of the farm that produced
    /// the event.
    pub(crate) fn observe_audit_event(&self, event: &AuditEvent) {
        // Use a fixed `farm_id` label name with the farm ID as the label value. Putting the ID
        // into the label name (with an empty value) would give every farm its own label name
        // and may produce names that are not valid Prometheus label identifiers (for example
        // an ID starting with a digit).
        self.audit
            .get_or_create(&vec![("farm_id".to_string(), event.farm_id.to_string())])
            .observe(event.duration);
    }
}

pub(crate) fn raise_fd_limit() {
match fdlimit::raise_fd_limit() {
Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
Expand Down
10 changes: 9 additions & 1 deletion crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::reward_signing::reward_signing;
use crate::single_disk_farm::farming::rayon_files::RayonFiles;
pub use crate::single_disk_farm::farming::FarmingError;
use crate::single_disk_farm::farming::{
farming, slot_notification_forwarder, FarmingOptions, PlotAudit,
farming, slot_notification_forwarder, AuditEvent, FarmingOptions, PlotAudit,
};
use crate::single_disk_farm::piece_cache::{DiskPieceCache, DiskPieceCacheError};
use crate::single_disk_farm::piece_reader::PieceReader;
Expand Down Expand Up @@ -558,6 +558,7 @@ struct Handlers {
sector_plotting: Handler<SectorPlottingDetails>,
sector_plotted: Handler<(PlottedSector, Option<PlottedSector>)>,
solution: Handler<SolutionResponse>,
plot_audited: Handler<AuditEvent>,
}

/// Single disk farm abstraction is a container for everything necessary to plot/farm with a single
Expand Down Expand Up @@ -706,6 +707,7 @@ impl SingleDiskFarm {
single_disk_farm_info
}
};
let farm_id = *single_disk_farm_info.id();

let single_disk_farm_info_lock = SingleDiskFarmInfo::try_lock(&directory)
.map_err(SingleDiskFarmError::LikelyAlreadyInUse)?;
Expand Down Expand Up @@ -1118,6 +1120,7 @@ impl SingleDiskFarm {
handlers,
modifying_sector_index,
slot_info_notifications: slot_info_forwarder_receiver,
farm_id,
};
farming::<PosTable, _, _>(farming_options).await
};
Expand Down Expand Up @@ -1376,6 +1379,11 @@ impl SingleDiskFarm {
self.handlers.sector_plotted.add(callback)
}

/// Subscribe to notifications about audited plots.
///
/// Adds `callback` to the `plot_audited` handler and returns the [`HandlerId`] of the new
/// subscription.
pub fn on_plot_audited(&self, callback: HandlerFn<AuditEvent>) -> HandlerId {
    self.handlers.plot_audited.add(callback)
}

/// Subscribe to new solution notification
pub fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> HandlerId {
self.handlers.solution.add(callback)
Expand Down
28 changes: 25 additions & 3 deletions crates/subspace-farmer/src/single_disk_farm/farming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub mod rayon_files;

use crate::node_client;
use crate::node_client::NodeClient;
use crate::single_disk_farm::Handlers;
use crate::single_disk_farm::{Handlers, SingleDiskFarmId};
use async_lock::RwLock;
use futures::channel::mpsc;
use futures::StreamExt;
Expand All @@ -23,6 +23,16 @@ use subspace_rpc_primitives::{SlotInfo, SolutionResponse};
use thiserror::Error;
use tracing::{debug, error, info, trace, warn};

/// Details of a single plot audit, passed to the `plot_audited` handlers once the audit call
/// has returned.
#[derive(Debug, Clone)]
pub struct AuditEvent {
    /// How long the audit took, in seconds
    pub duration: f64,
    /// ID of the farm
    pub farm_id: SingleDiskFarmId,
    /// Number of sectors for this audit
    pub sectors_number: usize,
}

/// Errors that happen during farming
#[derive(Debug, Error)]
pub enum FarmingError {
Expand Down Expand Up @@ -204,6 +214,7 @@ pub(super) struct FarmingOptions<NC, PlotAudit> {
pub(super) handlers: Arc<Handlers>,
pub(super) modifying_sector_index: Arc<RwLock<Option<SectorIndex>>>,
pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
pub(super) farm_id: SingleDiskFarmId,
}

/// Starts farming process.
Expand All @@ -229,6 +240,7 @@ where
handlers,
modifying_sector_index,
mut slot_info_notifications,
farm_id,
} = farming_options;

let farmer_app_info = node_client
Expand All @@ -252,7 +264,9 @@ where
let modifying_sector_guard = modifying_sector_index.read().await;
let maybe_sector_being_modified = modifying_sector_guard.as_ref().copied();

plot_audit.audit(PlotAuditOptions::<PosTable> {
let start = Instant::now();

let sectors_solutions = plot_audit.audit(PlotAuditOptions::<PosTable> {
public_key: &public_key,
reward_address: &reward_address,
slot_info,
Expand All @@ -261,7 +275,15 @@ where
erasure_coding: &erasure_coding,
maybe_sector_being_modified,
table_generator: &table_generator,
})
});

handlers.plot_audited.call_simple(&AuditEvent {
duration: start.elapsed().as_secs_f64(),
farm_id,
shamil-gadelshin marked this conversation as resolved.
Show resolved Hide resolved
sectors_number: sectors_metadata.len(),
});

sectors_solutions
};

sectors_solutions.sort_by(|a, b| {
Expand Down
Loading