From e5f6e9b88d07b87fb4c893bac25e56868c5ebbfa Mon Sep 17 00:00:00 2001 From: Ivan De Marino Date: Sun, 19 Nov 2023 19:01:38 +0000 Subject: [PATCH] Export "Cluster Metadata" and other internal `cluster_status` module metrics (#99) * Reworked the initialization of the `prometheus` module, so it procures the `cluster_id` by itself, instead of depending on the `cluster_status` module. * Why? Because we want the `prometheus` metrics `Register` ready BEFORE any of the other modules are launched, so each can add their own metrics. * Passing in Prometheus Registry to the `cluster_status` module * Exposing metrics for the `ClusterStatusEmitter` * `fetch_time` * `channel_capacity` * Exposing metrics for `ClusterStatusRegister` * `brokers_total` * `topics_total` * `partitions_total` * `topic_partitions_total` * Documenting all the new `cluster_status` metrics * Deps upgrade * It includes a major update of `hyper` to `v1.0.1` --- Cargo.lock | 121 ++++++++++++++++++++---------- Cargo.toml | 2 +- METRICS.md | 68 ++++++++++++++++- src/cluster_status/emitter.rs | 53 +++++++++++-- src/cluster_status/mod.rs | 9 ++- src/cluster_status/register.rs | 73 +++++++++++++++++- src/constants.rs | 3 + src/consumer_groups/emitter.rs | 7 +- src/main.rs | 9 ++- src/partition_offsets/emitter.rs | 13 ++-- src/partition_offsets/mod.rs | 2 - src/partition_offsets/register.rs | 2 +- src/prometheus_metrics/mod.rs | 22 +++++- 13 files changed, 314 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 90a13d5..f2d8ec8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -117,9 +117,9 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 0.2.11", + "http-body 0.4.5", + "hyper 0.14.27", "itoa", "matchit", "memchr", @@ -147,8 +147,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.5", "mime", "rustversion", "tower-layer", @@ -205,9 +205,9 @@ dependencies = [ [[package]] name 
= "cc" -version = "1.0.84" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f8e7c90afad890484a21653d08b6e209ae34770fb5ee298f9c699fcc1e5c856" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" dependencies = [ "libc", ] @@ -348,9 +348,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c18ee0ed65a5f1f81cac6b1d213b69c35fa47d4252ad41f1486dbd8226fe36e" +checksum = "f258a7194e7f7c2a7837a8913aeab7fd8c383457034fa20ce4dd3dcb813e8eb8" dependencies = [ "libc", "windows-sys", @@ -424,17 +424,17 @@ checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" [[package]] name = "h2" -version = "0.3.21" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833" +checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" dependencies = [ "bytes", "fnv", "futures-core", "futures-sink", "futures-util", - "http", - "indexmap 1.9.3", + "http 0.2.11", + "indexmap", "slab", "tokio", "tokio-util", @@ -442,10 +442,23 @@ dependencies = [ ] [[package]] -name = "hashbrown" -version = "0.12.3" +name = "h2" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +checksum = "e1d308f63daf4181410c242d34c11f928dcb3aa105852019e043c9d1f4e4368a" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] [[package]] name = "hashbrown" @@ -467,9 +480,20 @@ checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" [[package]] name = "http" -version = "0.2.10" +version = 
"0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f95b9abcae896730d42b78e09c155ed4ddf82c07b4de772c64aee5b2d8b7c150" +checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" dependencies = [ "bytes", "fnv", @@ -483,10 +507,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http", + "http 0.2.11", "pin-project-lite", ] +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.0.0", +] + [[package]] name = "httparse" version = "1.8.0" @@ -515,9 +549,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.22", + "http 0.2.11", + "http-body 0.4.5", "httparse", "httpdate", "itoa", @@ -529,6 +563,25 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "403f9214f3e703236b221f1a9cd88ec8b4adfa5296de01ab96216361f4692f56" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.0", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", +] + [[package]] name = "iana-time-zone" version = "0.1.58" @@ -552,16 +605,6 @@ dependencies = [ "cc", ] -[[package]] -name = "indexmap" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" -dependencies = 
[ - "autocfg", - "hashbrown 0.12.3", -] - [[package]] name = "indexmap" version = "2.1.0" @@ -569,7 +612,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", - "hashbrown 0.14.2", + "hashbrown", ] [[package]] @@ -610,7 +653,7 @@ dependencies = [ "ctrlc", "env_logger", "exit-code", - "hyper", + "hyper 1.0.1", "konsumer_offsets", "log", "prometheus", @@ -1017,9 +1060,9 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.21" +version = "0.38.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b426b0506e5d50a7d8dafcf2e81471400deb602392c7dd110815afb4eaf02a3" +checksum = "dc99bc2d4f1fed22595588a013687477aedf3cdcfb26558c559edb67b4d9b22e" dependencies = [ "bitflags 2.4.1", "errno", @@ -1193,9 +1236,9 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" [[package]] name = "termcolor" -version = "1.3.0" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6093bad37da69aab9d123a8091e4be0aa4a03e4d601ec641c327398315f62b64" +checksum = "ff1bc3d3f05aff0403e8ac0d92ced918ec05b666a43f83297ccef5bea8a3d449" dependencies = [ "winapi-util", ] @@ -1284,7 +1327,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.1.0", + "indexmap", "toml_datetime", "winnow", ] diff --git a/Cargo.toml b/Cargo.toml index 222277d..e9df13b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ const_format = "0.2.32" ctrlc = { version = "3.4.1", features = ["termination"] } env_logger = "0.10.1" exit-code = "1.0.0" -hyper = { version = "0.14.27", features = ["http1", "http2", "server", "runtime", "tcp"] } +hyper = { version = "1.0.1", features = ["http1", "http2", 
"server"] } konsumer_offsets = { version = "0.3.0", default-features = false, features = ["ts_chrono"] } log = "0.4.20" prometheus = "0.13.3" diff --git a/METRICS.md b/METRICS.md index 3225ac7..baf292f 100644 --- a/METRICS.md +++ b/METRICS.md @@ -96,7 +96,7 @@ to produce the lag information for).
kmtd_consumer_groups_total
- Description: "Consumer groups currently in the cluster.
+ Description: Consumer groups currently in the cluster.
Labels: cluster_id
Type: gauge
Timestamped: false @@ -113,8 +113,72 @@ to produce the lag information for).
+
+
kmtd_cluster_brokers_total
+
+ Description: Brokers currently in cluster.
+ Labels: cluster_id
+ Type: gauge
+ Timestamped: false +
+
+ +
+
kmtd_cluster_topics_total
+
+ Description: Topics currently in cluster.
+ Labels: cluster_id
+ Type: gauge
+ Timestamped: false +
+
+ +
+
kmtd_cluster_partitions_total
+
+ Description: Partitions currently in cluster.
+ Labels: cluster_id
+ Type: gauge
+ Timestamped: false +
+
+ +
+
kmtd_cluster_topic_partitions_total
+
+ Description: Topic's Partitions currently in cluster.
+ Labels: cluster_id, topic
+ Type: gauge
+ Timestamped: false +
+
+ ### Kommitted (internal) Metrics +#### `cluster_status` module + +
+
kmtd_cluster_status_emitter_fetch_time_milliseconds
+
+ Description: Time (ms) taken to fetch cluster status metadata.
+ Labels: cluster_id
+ Type: histogram
+ Timestamped: false +
+
+ +
+
kmtd_cluster_status_emitter_channel_capacity
+
+ Description: Capacity of internal channel used to send cluster status metadata to rest of the service.
+ Labels: cluster_id
+ Type: gauge
+ Timestamped: false +
+
+ +#### `consumer_groups` module +
kmtd_consumer_groups_emitter_fetch_time_milliseconds
@@ -135,6 +199,8 @@ to produce the lag information for).
+#### `partition_offsets` module +
kmtd_partition_offsets_emitter_fetch_time_milliseconds
diff --git a/src/cluster_status/emitter.rs b/src/cluster_status/emitter.rs index cb47791..114ced1 100644 --- a/src/cluster_status/emitter.rs +++ b/src/cluster_status/emitter.rs @@ -1,4 +1,10 @@ +use std::sync::Arc; + use async_trait::async_trait; +use prometheus::{ + register_histogram_with_registry, register_int_gauge_with_registry, Histogram, IntGauge, + Registry, +}; use rdkafka::{admin::AdminClient, client::DefaultClientContext, metadata::Metadata, ClientConfig}; use tokio::{ sync::mpsc, @@ -7,7 +13,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; -use crate::constants::KONSUMER_OFFSETS_DATA_TOPIC; +use crate::constants::{DEFAULT_CLUSTER_ID, KONSUMER_OFFSETS_DATA_TOPIC}; use crate::internals::Emitter; use crate::kafka_types::{Broker, TopicPartitionsStatus}; @@ -16,7 +22,11 @@ const CHANNEL_SIZE: usize = 5; const FETCH_TIMEOUT: Duration = Duration::from_secs(10); const FETCH_INTERVAL: Duration = Duration::from_secs(60); -pub const CLUSTER_ID_NONE: &str = "__none__"; +const MET_FETCH_NAME: &str = "cluster_status_emitter_fetch_time_milliseconds"; +const MET_FETCH_HELP: &str = "Time (ms) taken to fetch cluster status metadata"; +const MET_CH_CAP_NAME: &str = "cluster_status_emitter_channel_capacity"; +const MET_CH_CAP_HELP: &str = + "Capacity of internal channel used to send cluster status metadata to rest of the service"; /// This is a `Send`-able struct to carry Kafka Cluster status across thread boundaries. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Default, Hash)] @@ -39,7 +49,7 @@ pub struct ClusterStatus { impl ClusterStatus { fn from(id: Option, m: Metadata) -> Self { Self { - id: id.unwrap_or_else(|| CLUSTER_ID_NONE.to_string()), + id: id.unwrap_or_else(|| DEFAULT_CLUSTER_ID.to_string()), topics: m .topics() .iter() @@ -60,6 +70,10 @@ impl ClusterStatus { /// It shuts down when the provided [`CancellationToken`] is cancelled. 
pub struct ClusterStatusEmitter { admin_client_config: ClientConfig, + + // Prometheus Metrics + metric_fetch: Histogram, + metric_ch_cap: IntGauge, } impl ClusterStatusEmitter { @@ -68,9 +82,21 @@ impl ClusterStatusEmitter { /// # Arguments /// /// * `client_config` - Kafka admin client configuration, used to fetch the Cluster current status - pub fn new(client_config: ClientConfig) -> Self { + pub fn new(client_config: ClientConfig, metrics: Arc) -> Self { Self { admin_client_config: client_config, + metric_fetch: register_histogram_with_registry!( + MET_FETCH_NAME, + MET_FETCH_HELP, + metrics + ) + .unwrap_or_else(|_| panic!("Failed to create metric: {MET_FETCH_NAME}")), + metric_ch_cap: register_int_gauge_with_registry!( + MET_CH_CAP_NAME, + MET_CH_CAP_HELP, + metrics + ) + .unwrap_or_else(|_| panic!("Failed to create metric: {MET_CH_CAP_NAME}")), } } } @@ -98,14 +124,27 @@ impl Emitter for ClusterStatusEmitter { let (sx, rx) = mpsc::channel::(CHANNEL_SIZE); + // Clone metrics so they can be used in the spawned future + let metric_fetch = self.metric_fetch.clone(); + let metric_ch_cap = self.metric_ch_cap.clone(); + let join_handle = tokio::spawn(async move { let mut interval = interval(FETCH_INTERVAL); loop { - match admin_client.inner().fetch_metadata(None, FETCH_TIMEOUT).map(|m| { - Self::Emitted::from(admin_client.inner().fetch_cluster_id(FETCH_TIMEOUT), m) - }) { + // Fetch metadata and update timer metric + let timer = metric_fetch.start_timer(); + let res_status = + admin_client.inner().fetch_metadata(None, FETCH_TIMEOUT).map(|m| { + Self::Emitted::from(admin_client.inner().fetch_cluster_id(FETCH_TIMEOUT), m) + }); + timer.observe_duration(); + + match res_status { Ok(status) => { + // Update channel capacity metric + metric_ch_cap.set(sx.capacity() as i64); + tokio::select! 
{ res = Self::emit_with_interval(&sx, status, &mut interval) => { if let Err(e) = res { diff --git a/src/cluster_status/mod.rs b/src/cluster_status/mod.rs index b8354d9..ebf1435 100644 --- a/src/cluster_status/mod.rs +++ b/src/cluster_status/mod.rs @@ -2,11 +2,14 @@ mod emitter; mod register; +use std::sync::Arc; + // Exports pub use emitter::ClusterStatusEmitter; pub use register::ClusterStatusRegister; // Imports +use prometheus::Registry; use rdkafka::ClientConfig; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; @@ -17,10 +20,12 @@ pub fn init( admin_client_config: ClientConfig, cluster_id_override: Option, shutdown_token: CancellationToken, + metrics: Arc, ) -> (ClusterStatusRegister, JoinHandle<()>) { // Cluster Status: emitter and register - let (cs_rx, cse_join) = ClusterStatusEmitter::new(admin_client_config).spawn(shutdown_token); - let cs_reg = ClusterStatusRegister::new(cluster_id_override, cs_rx); + let (cs_rx, cse_join) = + ClusterStatusEmitter::new(admin_client_config, metrics.clone()).spawn(shutdown_token); + let cs_reg = ClusterStatusRegister::new(cluster_id_override, cs_rx, metrics); debug!("Initialized"); (cs_reg, cse_join) diff --git a/src/cluster_status/register.rs b/src/cluster_status/register.rs index 4aa50bf..75384b8 100644 --- a/src/cluster_status/register.rs +++ b/src/cluster_status/register.rs @@ -1,12 +1,27 @@ use std::sync::Arc; use async_trait::async_trait; +use prometheus::{ + register_int_gauge_vec_with_registry, register_int_gauge_with_registry, IntGauge, IntGaugeVec, + Registry, +}; use tokio::sync::{mpsc::Receiver, RwLock}; use super::emitter::ClusterStatus; +use crate::constants::DEFAULT_CLUSTER_ID; use crate::internals::Awaitable; use crate::kafka_types::{Broker, TopicPartition}; +use crate::prometheus_metrics::LABEL_TOPIC; + +const MET_BROKERS_TOT_NAME: &str = "cluster_brokers_total"; +const MET_BROKERS_TOT_HELP: &str = "Brokers currently in cluster"; +const MET_TOPICS_TOT_NAME: &str = 
"cluster_topics_total"; +const MET_TOPICS_TOT_HELP: &str = "Topics currently in cluster"; +const MET_PARTITIONS_TOT_NAME: &str = "cluster_partitions_total"; +const MET_PARTITIONS_TOT_HELP: &str = "Partitions currently in cluster"; +const MET_TOPIC_PARTITIONS_TOT_NAME: &str = "cluster_topic_partitions_total"; +const MET_TOPIC_PARTITIONS_TOT_HELP: &str = "Topic's Partitions currently in cluster"; /// Registers and exposes the latest [`ClusterStatus`]. /// @@ -15,18 +30,59 @@ use crate::kafka_types::{Broker, TopicPartition}; #[derive(Debug)] pub struct ClusterStatusRegister { latest_status: Arc>>, + + // Prometheus Metrics + metric_brokers: IntGauge, + metric_topics: IntGauge, + metric_partitions: IntGauge, + metric_topic_partitions: IntGaugeVec, } impl ClusterStatusRegister { - pub fn new(cluster_id_override: Option, mut rx: Receiver) -> Self { + pub fn new( + cluster_id_override: Option, + mut rx: Receiver, + metrics: Arc, + ) -> Self { let csr = Self { latest_status: Arc::new(RwLock::new(None)), + metric_brokers: register_int_gauge_with_registry!( + MET_BROKERS_TOT_NAME, + MET_BROKERS_TOT_HELP, + metrics + ) + .unwrap_or_else(|_| panic!("Failed to create metric: {MET_BROKERS_TOT_NAME}")), + metric_topics: register_int_gauge_with_registry!( + MET_TOPICS_TOT_NAME, + MET_TOPICS_TOT_HELP, + metrics + ) + .unwrap_or_else(|_| panic!("Failed to create metric: {MET_TOPICS_TOT_NAME}")), + metric_partitions: register_int_gauge_with_registry!( + MET_PARTITIONS_TOT_NAME, + MET_PARTITIONS_TOT_HELP, + metrics + ) + .unwrap_or_else(|_| panic!("Failed to create metric: {MET_PARTITIONS_TOT_NAME}")), + metric_topic_partitions: register_int_gauge_vec_with_registry!( + MET_TOPIC_PARTITIONS_TOT_NAME, + MET_TOPIC_PARTITIONS_TOT_HELP, + &[LABEL_TOPIC], + metrics + ) + .unwrap_or_else(|_| panic!("Failed to create metric: {MET_TOPIC_PARTITIONS_TOT_NAME}")), }; // A clone of the `csr.latest_status` will be moved into the async task // that updates the register. 
let latest_status_arc_clone = csr.latest_status.clone(); + // Clone metrics so they can be used in the spawned future + let metric_brokers = csr.metric_brokers.clone(); + let metric_topics = csr.metric_topics.clone(); + let metric_partitions = csr.metric_partitions.clone(); + let metric_topic_partitions = csr.metric_topic_partitions.clone(); + // The Register is essentially "self updating" its data, by listening // on a channel for updates. // @@ -51,6 +107,19 @@ impl ClusterStatusRegister { cs.id, cs.topics.len(), cs.brokers.len() ); + // Update cluster status metrics (broker, topics, partitions) + metric_brokers.set(cs.brokers.len() as i64); + metric_topics.set(cs.topics.len() as i64); + let mut partitions_total = 0; + for t in cs.topics.iter() { + metric_topic_partitions + .with_label_values(&[&t.name]) + .set(t.partitions.len() as i64); + partitions_total += t.partitions.len(); + } + metric_partitions.set(partitions_total as i64); + + // Set the latest cluster status *(latest_status_arc_clone.write().await) = Some(cs); }, else => { @@ -67,7 +136,7 @@ impl ClusterStatusRegister { /// Current identifier of the Kafka cluster. pub async fn get_cluster_id(&self) -> String { match &*(self.latest_status.read().await) { - None => super::emitter::CLUSTER_ID_NONE.to_string(), + None => DEFAULT_CLUSTER_ID.to_string(), Some(cs) => cs.id.clone(), } } diff --git a/src/constants.rs b/src/constants.rs index e2c7e08..f7b8b2a 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -24,3 +24,6 @@ pub(crate) const DEFAULT_OFFSETS_HISTORY: &str = "3600"; //< `usize` after parsi /// /// See [`crate::Cli`]'s `offsets_history_ready_at`. pub(crate) const DEFAULT_OFFSETS_HISTORY_READY_AT: &str = "0.3"; //< `f64` after parsing + +/// The default `cluster_id` value, if none is provided (either via CLI override or Cluster configuration).
+pub(crate) const DEFAULT_CLUSTER_ID: &str = "__not-set__"; diff --git a/src/consumer_groups/emitter.rs b/src/consumer_groups/emitter.rs index 53fc9be..cfac30f 100644 --- a/src/consumer_groups/emitter.rs +++ b/src/consumer_groups/emitter.rs @@ -189,16 +189,15 @@ impl Emitter for ConsumerGroupsEmitter { let mut interval = interval(FETCH_INTERVAL); loop { + // Fetch Consumer Groups and update timer metrics let timer = metric_cg_fetch.start_timer(); - let res_groups = admin_client + let res_cg = admin_client .inner() .fetch_group_list(None, FETCH_TIMEOUT) .map(Self::Emitted::from); - - // Update fetching time metric timer.observe_duration(); - match res_groups { + match res_cg { Ok(cg) => { // Update group and group member metrics metric_cg.set(cg.groups.len() as i64); diff --git a/src/main.rs b/src/main.rs index 3e2fcb9..2804caf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,19 +28,20 @@ async fn main() -> Result<(), Box> { let admin_client_config = cli.build_client_config(); let shutdown_token = build_shutdown_token(); + // Init `prometheus_metrics` module + let prom_reg = prometheus_metrics::init(admin_client_config.clone(), cli.cluster_id.clone()); + let prom_reg_arc = Arc::new(prom_reg); + // Init `cluster_status` module, and await registry to be ready let (cs_reg, cs_join) = cluster_status::init( admin_client_config.clone(), cli.cluster_id.clone(), shutdown_token.clone(), + prom_reg_arc.clone(), ); cs_reg.await_ready(shutdown_token.clone()).await?; let cs_reg_arc = Arc::new(cs_reg); - // Init `prometheus_metrics` module - let prom_reg = prometheus_metrics::init(cs_reg_arc.get_cluster_id().await); - let prom_reg_arc = Arc::new(prom_reg); - // Init `partition_offsets` module, and await registry to be ready let (po_reg, po_join) = partition_offsets::init( admin_client_config.clone(), diff --git a/src/partition_offsets/emitter.rs b/src/partition_offsets/emitter.rs index 27c1d68..8253c65 100644 --- a/src/partition_offsets/emitter.rs +++ 
b/src/partition_offsets/emitter.rs @@ -23,10 +23,10 @@ const CHANNEL_SIZE: usize = 10_000; const FETCH_TIMEOUT: Duration = Duration::from_secs(10); const FETCH_INTERVAL: Duration = Duration::from_millis(10); -const MET_FETCH_NAME: &str = "_partition_offsets_emitter_fetch_time_milliseconds"; +const MET_FETCH_NAME: &str = "partition_offsets_emitter_fetch_time_milliseconds"; const MET_FETCH_HELP: &str = "Time (ms) taken to fetch earliest/latest (watermark) offsets of a specific topic partition in cluster"; -const MET_CH_CAP_NAME: &str = "_partition_offsets_emitter_channel_capacity"; +const MET_CH_CAP_NAME: &str = "partition_offsets_emitter_channel_capacity"; const MET_CH_CAP_HELP: &str = "Capacity of internal channel used to send partition watermark offsets to rest of the service"; @@ -128,14 +128,15 @@ impl Emitter for PartitionOffsetsEmitter { trace!("Fetching earliest/latest offset for Partitions of Topic '{}'", t); for p in csr.get_partitions_for_topic(&t).await.unwrap_or_default() { + // Fetch Partition Watermarks and update timer metrics let timer = metric_cg_fetch.with_label_values(&[&t, &p.to_string()]).start_timer(); + let res_watermarks = + admin_client.inner().fetch_watermarks(&t, p as i32, FETCH_TIMEOUT); + timer.observe_duration(); - match admin_client.inner().fetch_watermarks(&t, p as i32, FETCH_TIMEOUT) { + match res_watermarks { Ok((earliest, latest)) => { - // Update fetching time metric for (topic,partition) tuple - timer.observe_duration(); - let po = PartitionOffset { topic: t.clone(), partition: p, diff --git a/src/partition_offsets/mod.rs b/src/partition_offsets/mod.rs index 395056d..c4e61c2 100644 --- a/src/partition_offsets/mod.rs +++ b/src/partition_offsets/mod.rs @@ -7,9 +7,7 @@ mod tracked_offset; // Exports pub use emitter::PartitionOffsetsEmitter; -pub use errors::PartitionOffsetsError; pub use register::PartitionOffsetsRegister; -pub use tracked_offset::TrackedOffset; // Imports use prometheus::Registry; diff --git 
a/src/partition_offsets/register.rs b/src/partition_offsets/register.rs index 4223322..96c800d 100644 --- a/src/partition_offsets/register.rs +++ b/src/partition_offsets/register.rs @@ -108,7 +108,7 @@ impl PartitionOffsetsRegister { .await .update(po.earliest_offset, po.latest_offset, po.read_datetime); - // Update metric + // Update usage metrics metric_usage .with_label_values(&[&k.topic, &k.partition.to_string()]) .set(estimator_rwlock.read().await.usage() as i64); diff --git a/src/prometheus_metrics/mod.rs b/src/prometheus_metrics/mod.rs index b02f595..49fa8a6 100644 --- a/src/prometheus_metrics/mod.rs +++ b/src/prometheus_metrics/mod.rs @@ -3,6 +3,12 @@ pub mod bespoke; use std::collections::HashMap; use prometheus::Registry; +use rdkafka::admin::AdminClient; +use rdkafka::client::DefaultClientContext; +use rdkafka::ClientConfig; +use tokio::time::Duration; + +use crate::constants::DEFAULT_CLUSTER_ID; pub const NAMESPACE: &str = "kmtd"; @@ -16,9 +22,23 @@ pub const LABEL_MEMBER_CLIENT_ID: &str = "member_client_id"; pub const UNKNOWN_VAL: &str = "UNKNOWN"; -pub fn init(cluster_id: String) -> Registry { +const FETCH_TIMEOUT: Duration = Duration::from_secs(10); + +pub fn init(client_config: ClientConfig, cluster_id_override: Option) -> Registry { + let cluster_id = match cluster_id_override { + Some(cid) => cid, + None => client_config + .create::>() + .expect("Failed to allocate Admin Client") + .inner() + .fetch_cluster_id(FETCH_TIMEOUT) + .unwrap_or_else(|| DEFAULT_CLUSTER_ID.to_string()), + }; + let prom_def_labels = HashMap::from([(LABEL_CLUSTER_ID.to_string(), cluster_id)]); + info!("Prometheus Metrics default labels:\n{:#?}", prom_def_labels); + Registry::new_custom(Some(NAMESPACE.to_string()), Some(prom_def_labels)) .expect("Unable to create a Prometheus Metrics Registry") }