diff --git a/Cargo.lock b/Cargo.lock
index 90a13d5..f2d8ec8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -117,9 +117,9 @@ dependencies = [
"bitflags 1.3.2",
"bytes",
"futures-util",
- "http",
- "http-body",
- "hyper",
+ "http 0.2.11",
+ "http-body 0.4.5",
+ "hyper 0.14.27",
"itoa",
"matchit",
"memchr",
@@ -147,8 +147,8 @@ dependencies = [
"async-trait",
"bytes",
"futures-util",
- "http",
- "http-body",
+ "http 0.2.11",
+ "http-body 0.4.5",
"mime",
"rustversion",
"tower-layer",
@@ -205,9 +205,9 @@ dependencies = [
[[package]]
name = "cc"
-version = "1.0.84"
+version = "1.0.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0f8e7c90afad890484a21653d08b6e209ae34770fb5ee298f9c699fcc1e5c856"
+checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0"
dependencies = [
"libc",
]
@@ -348,9 +348,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
-version = "0.3.6"
+version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7c18ee0ed65a5f1f81cac6b1d213b69c35fa47d4252ad41f1486dbd8226fe36e"
+checksum = "f258a7194e7f7c2a7837a8913aeab7fd8c383457034fa20ce4dd3dcb813e8eb8"
dependencies = [
"libc",
"windows-sys",
@@ -424,17 +424,17 @@ checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0"
[[package]]
name = "h2"
-version = "0.3.21"
+version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833"
+checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
- "http",
- "indexmap 1.9.3",
+ "http 0.2.11",
+ "indexmap",
"slab",
"tokio",
"tokio-util",
@@ -442,10 +442,23 @@ dependencies = [
]
[[package]]
-name = "hashbrown"
-version = "0.12.3"
+name = "h2"
+version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
+checksum = "e1d308f63daf4181410c242d34c11f928dcb3aa105852019e043c9d1f4e4368a"
+dependencies = [
+ "bytes",
+ "fnv",
+ "futures-core",
+ "futures-sink",
+ "futures-util",
+ "http 1.0.0",
+ "indexmap",
+ "slab",
+ "tokio",
+ "tokio-util",
+ "tracing",
+]
[[package]]
name = "hashbrown"
@@ -467,9 +480,20 @@ checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7"
[[package]]
name = "http"
-version = "0.2.10"
+version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f95b9abcae896730d42b78e09c155ed4ddf82c07b4de772c64aee5b2d8b7c150"
+checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa",
+]
+
+[[package]]
+name = "http"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea"
dependencies = [
"bytes",
"fnv",
@@ -483,10 +507,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [
"bytes",
- "http",
+ "http 0.2.11",
"pin-project-lite",
]
+[[package]]
+name = "http-body"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643"
+dependencies = [
+ "bytes",
+ "http 1.0.0",
+]
+
[[package]]
name = "httparse"
version = "1.8.0"
@@ -515,9 +549,9 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
- "h2",
- "http",
- "http-body",
+ "h2 0.3.22",
+ "http 0.2.11",
+ "http-body 0.4.5",
"httparse",
"httpdate",
"itoa",
@@ -529,6 +563,25 @@ dependencies = [
"want",
]
+[[package]]
+name = "hyper"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "403f9214f3e703236b221f1a9cd88ec8b4adfa5296de01ab96216361f4692f56"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-util",
+ "h2 0.4.0",
+ "http 1.0.0",
+ "http-body 1.0.0",
+ "httparse",
+ "httpdate",
+ "itoa",
+ "pin-project-lite",
+ "tokio",
+]
+
[[package]]
name = "iana-time-zone"
version = "0.1.58"
@@ -552,16 +605,6 @@ dependencies = [
"cc",
]
-[[package]]
-name = "indexmap"
-version = "1.9.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
-dependencies = [
- "autocfg",
- "hashbrown 0.12.3",
-]
-
[[package]]
name = "indexmap"
version = "2.1.0"
@@ -569,7 +612,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f"
dependencies = [
"equivalent",
- "hashbrown 0.14.2",
+ "hashbrown",
]
[[package]]
@@ -610,7 +653,7 @@ dependencies = [
"ctrlc",
"env_logger",
"exit-code",
- "hyper",
+ "hyper 1.0.1",
"konsumer_offsets",
"log",
"prometheus",
@@ -1017,9 +1060,9 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustix"
-version = "0.38.21"
+version = "0.38.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b426b0506e5d50a7d8dafcf2e81471400deb602392c7dd110815afb4eaf02a3"
+checksum = "dc99bc2d4f1fed22595588a013687477aedf3cdcfb26558c559edb67b4d9b22e"
dependencies = [
"bitflags 2.4.1",
"errno",
@@ -1193,9 +1236,9 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "termcolor"
-version = "1.3.0"
+version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6093bad37da69aab9d123a8091e4be0aa4a03e4d601ec641c327398315f62b64"
+checksum = "ff1bc3d3f05aff0403e8ac0d92ced918ec05b666a43f83297ccef5bea8a3d449"
dependencies = [
"winapi-util",
]
@@ -1284,7 +1327,7 @@ version = "0.19.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
dependencies = [
- "indexmap 2.1.0",
+ "indexmap",
"toml_datetime",
"winnow",
]
diff --git a/Cargo.toml b/Cargo.toml
index 222277d..e9df13b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,7 +35,7 @@ const_format = "0.2.32"
ctrlc = { version = "3.4.1", features = ["termination"] }
env_logger = "0.10.1"
exit-code = "1.0.0"
-hyper = { version = "0.14.27", features = ["http1", "http2", "server", "runtime", "tcp"] }
+hyper = { version = "1.0.1", features = ["http1", "http2", "server"] }
konsumer_offsets = { version = "0.3.0", default-features = false, features = ["ts_chrono"] }
log = "0.4.20"
prometheus = "0.13.3"
diff --git a/METRICS.md b/METRICS.md
index 3225ac7..baf292f 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -96,7 +96,7 @@ to produce the lag information for).
kmtd_consumer_groups_total
-
- Description: "Consumer groups currently in the cluster.
+ Description: Consumer groups currently in the cluster.
Labels: cluster_id
Type: gauge
Timestamped: false
@@ -113,8 +113,72 @@ to produce the lag information for).
+
+ kmtd_cluster_brokers_total
+ -
+ Description: Brokers currently in cluster.
+ Labels: cluster_id
+ Type: gauge
+ Timestamped: false
+
+
+
+
+ kmtd_cluster_topics_total
+ -
+ Description: Topics currently in cluster.
+ Labels: cluster_id
+ Type: gauge
+ Timestamped: false
+
+
+
+
+ kmtd_cluster_partitions_total
+ -
+ Description: Partitions currently in cluster.
+ Labels: cluster_id
+ Type: gauge
+ Timestamped: false
+
+
+
+
+ kmtd_cluster_topic_partitions_total
+ -
+ Description: Topic's Partitions currently in cluster.
+ Labels: cluster_id, topic
+ Type: gauge
+ Timestamped: false
+
+
+
### Kommitted (internal) Metrics
+#### `cluster_status` module
+
+
+ kmtd_cluster_status_emitter_fetch_time_milliseconds
+ -
+ Description: Time (ms) taken to fetch cluster status metadata.
+ Labels: cluster_id
+ Type: histogram
+ Timestamped: false
+
+
+
+
+ kmtd_cluster_status_emitter_channel_capacity
+ -
+ Description: Capacity of internal channel used to send cluster status metadata to rest of the service.
+ Labels: cluster_id
+ Type: gauge
+ Timestamped: false
+
+
+
+#### `consumer_groups` module
+
kmtd_consumer_groups_emitter_fetch_time_milliseconds
-
@@ -135,6 +199,8 @@ to produce the lag information for).
+#### `partition_offsets` module
+
kmtd_partition_offsets_emitter_fetch_time_milliseconds
-
diff --git a/src/cluster_status/emitter.rs b/src/cluster_status/emitter.rs
index cb47791..114ced1 100644
--- a/src/cluster_status/emitter.rs
+++ b/src/cluster_status/emitter.rs
@@ -1,4 +1,10 @@
+use std::sync::Arc;
+
use async_trait::async_trait;
+use prometheus::{
+ register_histogram_with_registry, register_int_gauge_with_registry, Histogram, IntGauge,
+ Registry,
+};
use rdkafka::{admin::AdminClient, client::DefaultClientContext, metadata::Metadata, ClientConfig};
use tokio::{
sync::mpsc,
@@ -7,7 +13,7 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;
-use crate::constants::KONSUMER_OFFSETS_DATA_TOPIC;
+use crate::constants::{DEFAULT_CLUSTER_ID, KONSUMER_OFFSETS_DATA_TOPIC};
use crate::internals::Emitter;
use crate::kafka_types::{Broker, TopicPartitionsStatus};
@@ -16,7 +22,11 @@ const CHANNEL_SIZE: usize = 5;
const FETCH_TIMEOUT: Duration = Duration::from_secs(10);
const FETCH_INTERVAL: Duration = Duration::from_secs(60);
-pub const CLUSTER_ID_NONE: &str = "__none__";
+const MET_FETCH_NAME: &str = "cluster_status_emitter_fetch_time_milliseconds";
+const MET_FETCH_HELP: &str = "Time (ms) taken to fetch cluster status metadata";
+const MET_CH_CAP_NAME: &str = "cluster_status_emitter_channel_capacity";
+const MET_CH_CAP_HELP: &str =
+ "Capacity of internal channel used to send cluster status metadata to rest of the service";
/// This is a `Send`-able struct to carry Kafka Cluster status across thread boundaries.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Default, Hash)]
@@ -39,7 +49,7 @@ pub struct ClusterStatus {
impl ClusterStatus {
fn from(id: Option<String>, m: Metadata) -> Self {
Self {
- id: id.unwrap_or_else(|| CLUSTER_ID_NONE.to_string()),
+ id: id.unwrap_or_else(|| DEFAULT_CLUSTER_ID.to_string()),
topics: m
.topics()
.iter()
@@ -60,6 +70,10 @@ impl ClusterStatus {
/// It shuts down when the provided [`CancellationToken`] is cancelled.
pub struct ClusterStatusEmitter {
admin_client_config: ClientConfig,
+
+ // Prometheus Metrics
+ metric_fetch: Histogram,
+ metric_ch_cap: IntGauge,
}
impl ClusterStatusEmitter {
@@ -68,9 +82,21 @@ impl ClusterStatusEmitter {
/// # Arguments
///
/// * `client_config` - Kafka admin client configuration, used to fetch the Cluster current status
- pub fn new(client_config: ClientConfig) -> Self {
+ pub fn new(client_config: ClientConfig, metrics: Arc<Registry>) -> Self {
Self {
admin_client_config: client_config,
+ metric_fetch: register_histogram_with_registry!(
+ MET_FETCH_NAME,
+ MET_FETCH_HELP,
+ metrics
+ )
+ .unwrap_or_else(|_| panic!("Failed to create metric: {MET_FETCH_NAME}")),
+ metric_ch_cap: register_int_gauge_with_registry!(
+ MET_CH_CAP_NAME,
+ MET_CH_CAP_HELP,
+ metrics
+ )
+ .unwrap_or_else(|_| panic!("Failed to create metric: {MET_CH_CAP_NAME}")),
}
}
}
@@ -98,14 +124,27 @@ impl Emitter for ClusterStatusEmitter {
let (sx, rx) = mpsc::channel::<Self::Emitted>(CHANNEL_SIZE);
+ // Clone metrics so they can be used in the spawned future
+ let metric_fetch = self.metric_fetch.clone();
+ let metric_ch_cap = self.metric_ch_cap.clone();
+
let join_handle = tokio::spawn(async move {
let mut interval = interval(FETCH_INTERVAL);
loop {
- match admin_client.inner().fetch_metadata(None, FETCH_TIMEOUT).map(|m| {
- Self::Emitted::from(admin_client.inner().fetch_cluster_id(FETCH_TIMEOUT), m)
- }) {
+ // Fetch metadata and update timer metric
+ let timer = metric_fetch.start_timer();
+ let res_status =
+ admin_client.inner().fetch_metadata(None, FETCH_TIMEOUT).map(|m| {
+ Self::Emitted::from(admin_client.inner().fetch_cluster_id(FETCH_TIMEOUT), m)
+ });
+ timer.observe_duration();
+
+ match res_status {
Ok(status) => {
+ // Update channel capacity metric
+ metric_ch_cap.set(sx.capacity() as i64);
+
tokio::select! {
res = Self::emit_with_interval(&sx, status, &mut interval) => {
if let Err(e) = res {
diff --git a/src/cluster_status/mod.rs b/src/cluster_status/mod.rs
index b8354d9..ebf1435 100644
--- a/src/cluster_status/mod.rs
+++ b/src/cluster_status/mod.rs
@@ -2,11 +2,14 @@
mod emitter;
mod register;
+use std::sync::Arc;
+
// Exports
pub use emitter::ClusterStatusEmitter;
pub use register::ClusterStatusRegister;
// Imports
+use prometheus::Registry;
use rdkafka::ClientConfig;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
@@ -17,10 +20,12 @@ pub fn init(
admin_client_config: ClientConfig,
cluster_id_override: Option<String>,
shutdown_token: CancellationToken,
+ metrics: Arc<Registry>,
) -> (ClusterStatusRegister, JoinHandle<()>) {
// Cluster Status: emitter and register
- let (cs_rx, cse_join) = ClusterStatusEmitter::new(admin_client_config).spawn(shutdown_token);
- let cs_reg = ClusterStatusRegister::new(cluster_id_override, cs_rx);
+ let (cs_rx, cse_join) =
+ ClusterStatusEmitter::new(admin_client_config, metrics.clone()).spawn(shutdown_token);
+ let cs_reg = ClusterStatusRegister::new(cluster_id_override, cs_rx, metrics);
debug!("Initialized");
(cs_reg, cse_join)
diff --git a/src/cluster_status/register.rs b/src/cluster_status/register.rs
index 4aa50bf..75384b8 100644
--- a/src/cluster_status/register.rs
+++ b/src/cluster_status/register.rs
@@ -1,12 +1,27 @@
use std::sync::Arc;
use async_trait::async_trait;
+use prometheus::{
+ register_int_gauge_vec_with_registry, register_int_gauge_with_registry, IntGauge, IntGaugeVec,
+ Registry,
+};
use tokio::sync::{mpsc::Receiver, RwLock};
use super::emitter::ClusterStatus;
+use crate::constants::DEFAULT_CLUSTER_ID;
use crate::internals::Awaitable;
use crate::kafka_types::{Broker, TopicPartition};
+use crate::prometheus_metrics::LABEL_TOPIC;
+
+const MET_BROKERS_TOT_NAME: &str = "cluster_brokers_total";
+const MET_BROKERS_TOT_HELP: &str = "Brokers currently in cluster";
+const MET_TOPICS_TOT_NAME: &str = "cluster_topics_total";
+const MET_TOPICS_TOT_HELP: &str = "Topics currently in cluster";
+const MET_PARTITIONS_TOT_NAME: &str = "cluster_partitions_total";
+const MET_PARTITIONS_TOT_HELP: &str = "Partitions currently in cluster";
+const MET_TOPIC_PARTITIONS_TOT_NAME: &str = "cluster_topic_partitions_total";
+const MET_TOPIC_PARTITIONS_TOT_HELP: &str = "Topic's Partitions currently in cluster";
/// Registers and exposes the latest [`ClusterStatus`].
///
@@ -15,18 +30,59 @@ use crate::kafka_types::{Broker, TopicPartition};
#[derive(Debug)]
pub struct ClusterStatusRegister {
latest_status: Arc<RwLock<Option<ClusterStatus>>>,
+
+ // Prometheus Metrics
+ metric_brokers: IntGauge,
+ metric_topics: IntGauge,
+ metric_partitions: IntGauge,
+ metric_topic_partitions: IntGaugeVec,
}
impl ClusterStatusRegister {
- pub fn new(cluster_id_override: Option<String>, mut rx: Receiver<ClusterStatus>) -> Self {
+ pub fn new(
+ cluster_id_override: Option<String>,
+ mut rx: Receiver<ClusterStatus>,
+ metrics: Arc<Registry>,
+ ) -> Self {
let csr = Self {
latest_status: Arc::new(RwLock::new(None)),
+ metric_brokers: register_int_gauge_with_registry!(
+ MET_BROKERS_TOT_NAME,
+ MET_BROKERS_TOT_HELP,
+ metrics
+ )
+ .unwrap_or_else(|_| panic!("Failed to create metric: {MET_BROKERS_TOT_NAME}")),
+ metric_topics: register_int_gauge_with_registry!(
+ MET_TOPICS_TOT_NAME,
+ MET_TOPICS_TOT_HELP,
+ metrics
+ )
+ .unwrap_or_else(|_| panic!("Failed to create metric: {MET_TOPICS_TOT_NAME}")),
+ metric_partitions: register_int_gauge_with_registry!(
+ MET_PARTITIONS_TOT_NAME,
+ MET_PARTITIONS_TOT_HELP,
+ metrics
+ )
+ .unwrap_or_else(|_| panic!("Failed to create metric: {MET_PARTITIONS_TOT_NAME}")),
+ metric_topic_partitions: register_int_gauge_vec_with_registry!(
+ MET_TOPIC_PARTITIONS_TOT_NAME,
+ MET_TOPIC_PARTITIONS_TOT_HELP,
+ &[LABEL_TOPIC],
+ metrics
+ )
+ .unwrap_or_else(|_| panic!("Failed to create metric: {MET_TOPIC_PARTITIONS_TOT_NAME}")),
};
// A clone of the `csr.latest_status` will be moved into the async task
// that updates the register.
let latest_status_arc_clone = csr.latest_status.clone();
+ // Clone metrics so they can be used in the spawned future
+ let metric_brokers = csr.metric_brokers.clone();
+ let metric_topics = csr.metric_topics.clone();
+ let metric_partitions = csr.metric_partitions.clone();
+ let metric_topic_partitions = csr.metric_topic_partitions.clone();
+
// The Register is essentially "self updating" its data, by listening
// on a channel for updates.
//
@@ -51,6 +107,19 @@ impl ClusterStatusRegister {
cs.id, cs.topics.len(), cs.brokers.len()
);
+ // Update cluster status metrics (broker, topics, partitions)
+ metric_brokers.set(cs.brokers.len() as i64);
+ metric_topics.set(cs.topics.len() as i64);
+ let mut partitions_total = 0;
+ for t in cs.topics.iter() {
+ metric_topic_partitions
+ .with_label_values(&[&t.name])
+ .set(t.partitions.len() as i64);
+ partitions_total += t.partitions.len();
+ }
+ metric_partitions.set(partitions_total as i64);
+
+ // Set the latest cluster status
*(latest_status_arc_clone.write().await) = Some(cs);
},
else => {
@@ -67,7 +136,7 @@ impl ClusterStatusRegister {
/// Current identifier of the Kafka cluster.
pub async fn get_cluster_id(&self) -> String {
match &*(self.latest_status.read().await) {
- None => super::emitter::CLUSTER_ID_NONE.to_string(),
+ None => DEFAULT_CLUSTER_ID.to_string(),
Some(cs) => cs.id.clone(),
}
}
diff --git a/src/constants.rs b/src/constants.rs
index e2c7e08..f7b8b2a 100644
--- a/src/constants.rs
+++ b/src/constants.rs
@@ -24,3 +24,6 @@ pub(crate) const DEFAULT_OFFSETS_HISTORY: &str = "3600"; //< `usize` after parsi
///
/// See [`crate::Cli`]'s `offsets_history_ready_at`.
pub(crate) const DEFAULT_OFFSETS_HISTORY_READY_AT: &str = "0.3"; //< `f64` after parsing
+
+/// The default `cluster_id` value, if none is provided (either via CLI override, nor Cluster configuration).
+pub(crate) const DEFAULT_CLUSTER_ID: &str = "__not-set__";
diff --git a/src/consumer_groups/emitter.rs b/src/consumer_groups/emitter.rs
index 53fc9be..cfac30f 100644
--- a/src/consumer_groups/emitter.rs
+++ b/src/consumer_groups/emitter.rs
@@ -189,16 +189,15 @@ impl Emitter for ConsumerGroupsEmitter {
let mut interval = interval(FETCH_INTERVAL);
loop {
+ // Fetch Consumer Groups and update timer metrics
let timer = metric_cg_fetch.start_timer();
- let res_groups = admin_client
+ let res_cg = admin_client
.inner()
.fetch_group_list(None, FETCH_TIMEOUT)
.map(Self::Emitted::from);
-
- // Update fetching time metric
timer.observe_duration();
- match res_groups {
+ match res_cg {
Ok(cg) => {
// Update group and group member metrics
metric_cg.set(cg.groups.len() as i64);
diff --git a/src/main.rs b/src/main.rs
index 3e2fcb9..2804caf 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -28,19 +28,20 @@ async fn main() -> Result<(), Box<dyn Error>> {
let admin_client_config = cli.build_client_config();
let shutdown_token = build_shutdown_token();
+ // Init `prometheus_metrics` module
+ let prom_reg = prometheus_metrics::init(admin_client_config.clone(), cli.cluster_id.clone());
+ let prom_reg_arc = Arc::new(prom_reg);
+
// Init `cluster_status` module, and await registry to be ready
let (cs_reg, cs_join) = cluster_status::init(
admin_client_config.clone(),
cli.cluster_id.clone(),
shutdown_token.clone(),
+ prom_reg_arc.clone(),
);
cs_reg.await_ready(shutdown_token.clone()).await?;
let cs_reg_arc = Arc::new(cs_reg);
- // Init `prometheus_metrics` module
- let prom_reg = prometheus_metrics::init(cs_reg_arc.get_cluster_id().await);
- let prom_reg_arc = Arc::new(prom_reg);
-
// Init `partition_offsets` module, and await registry to be ready
let (po_reg, po_join) = partition_offsets::init(
admin_client_config.clone(),
diff --git a/src/partition_offsets/emitter.rs b/src/partition_offsets/emitter.rs
index 27c1d68..8253c65 100644
--- a/src/partition_offsets/emitter.rs
+++ b/src/partition_offsets/emitter.rs
@@ -23,10 +23,10 @@ const CHANNEL_SIZE: usize = 10_000;
const FETCH_TIMEOUT: Duration = Duration::from_secs(10);
const FETCH_INTERVAL: Duration = Duration::from_millis(10);
-const MET_FETCH_NAME: &str = "_partition_offsets_emitter_fetch_time_milliseconds";
+const MET_FETCH_NAME: &str = "partition_offsets_emitter_fetch_time_milliseconds";
const MET_FETCH_HELP: &str =
"Time (ms) taken to fetch earliest/latest (watermark) offsets of a specific topic partition in cluster";
-const MET_CH_CAP_NAME: &str = "_partition_offsets_emitter_channel_capacity";
+const MET_CH_CAP_NAME: &str = "partition_offsets_emitter_channel_capacity";
const MET_CH_CAP_HELP: &str =
"Capacity of internal channel used to send partition watermark offsets to rest of the service";
@@ -128,14 +128,15 @@ impl Emitter for PartitionOffsetsEmitter {
trace!("Fetching earliest/latest offset for Partitions of Topic '{}'", t);
for p in csr.get_partitions_for_topic(&t).await.unwrap_or_default() {
+ // Fetch Partition Watermarks and update timer metrics
let timer =
metric_cg_fetch.with_label_values(&[&t, &p.to_string()]).start_timer();
+ let res_watermarks =
+ admin_client.inner().fetch_watermarks(&t, p as i32, FETCH_TIMEOUT);
+ timer.observe_duration();
- match admin_client.inner().fetch_watermarks(&t, p as i32, FETCH_TIMEOUT) {
+ match res_watermarks {
Ok((earliest, latest)) => {
- // Update fetching time metric for (topic,partition) tuple
- timer.observe_duration();
-
let po = PartitionOffset {
topic: t.clone(),
partition: p,
diff --git a/src/partition_offsets/mod.rs b/src/partition_offsets/mod.rs
index 395056d..c4e61c2 100644
--- a/src/partition_offsets/mod.rs
+++ b/src/partition_offsets/mod.rs
@@ -7,9 +7,7 @@ mod tracked_offset;
// Exports
pub use emitter::PartitionOffsetsEmitter;
-pub use errors::PartitionOffsetsError;
pub use register::PartitionOffsetsRegister;
-pub use tracked_offset::TrackedOffset;
// Imports
use prometheus::Registry;
diff --git a/src/partition_offsets/register.rs b/src/partition_offsets/register.rs
index 4223322..96c800d 100644
--- a/src/partition_offsets/register.rs
+++ b/src/partition_offsets/register.rs
@@ -108,7 +108,7 @@ impl PartitionOffsetsRegister {
.await
.update(po.earliest_offset, po.latest_offset, po.read_datetime);
- // Update metric
+ // Update usage metrics
metric_usage
.with_label_values(&[&k.topic, &k.partition.to_string()])
.set(estimator_rwlock.read().await.usage() as i64);
diff --git a/src/prometheus_metrics/mod.rs b/src/prometheus_metrics/mod.rs
index b02f595..49fa8a6 100644
--- a/src/prometheus_metrics/mod.rs
+++ b/src/prometheus_metrics/mod.rs
@@ -3,6 +3,12 @@ pub mod bespoke;
use std::collections::HashMap;
use prometheus::Registry;
+use rdkafka::admin::AdminClient;
+use rdkafka::client::DefaultClientContext;
+use rdkafka::ClientConfig;
+use tokio::time::Duration;
+
+use crate::constants::DEFAULT_CLUSTER_ID;
pub const NAMESPACE: &str = "kmtd";
@@ -16,9 +22,23 @@ pub const LABEL_MEMBER_CLIENT_ID: &str = "member_client_id";
pub const UNKNOWN_VAL: &str = "UNKNOWN";
-pub fn init(cluster_id: String) -> Registry {
+const FETCH_TIMEOUT: Duration = Duration::from_secs(10);
+
+pub fn init(client_config: ClientConfig, cluster_id_override: Option<String>) -> Registry {
+ let cluster_id = match cluster_id_override {
+ Some(cid) => cid,
+ None => client_config
+ .create::>()
+ .expect("Failed to allocate Admin Client")
+ .inner()
+ .fetch_cluster_id(FETCH_TIMEOUT)
+ .unwrap_or_else(|| DEFAULT_CLUSTER_ID.to_string()),
+ };
+
let prom_def_labels = HashMap::from([(LABEL_CLUSTER_ID.to_string(), cluster_id)]);
+ info!("Prometheus Metrics default labels:\n{:#?}", prom_def_labels);
+
Registry::new_custom(Some(NAMESPACE.to_string()), Some(prom_def_labels))
.expect("Unable to create a Prometheus Metrics Registry")
}