New partition_offsets module metrics (#93)
* Using utility macros to create and register metrics for `consumer_groups` module

These were added in #91. No behavior changes: the code to wire them up is just a bit simpler/cleaner now (see the sketch after these notes).

* Adding metrics to `partition_offsets` emitter, and documenting them
  * `kmtd_partition_offsets_emitter_fetch_time_milliseconds`
  * `kmtd_partition_offsets_emitter_channel_capacity`

* Deps upgrade

* Cleaning up const and var names used to create the new metrics

Nothing major: just some renaming that I couldn't leave inconsistent.

* New metric `kmtd_partition_offsets_register_usage` to expose how many offsets we are tracking internally for each topic-partition

* Documenting the new metric `kmtd_partition_offsets_register_usage`

* No need to prefix each metric handled by the `prometheus` package with `{NAMESPACE}`

This is set already on the registry, so it gets added automatically.
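
A minimal sketch of the two points above, i.e. the macro-based creation/registration and the namespace living on the registry (the function, metric, and values here are illustrative, not the exact code in this commit):

```rust
use prometheus::{register_int_gauge_with_registry, IntGauge, Registry};

// Sketch only: names and values are made up for illustration.
fn sketch() -> IntGauge {
    // The namespace is set once on the registry (in this codebase it comes
    // from `crate::prometheus_metrics::NAMESPACE`), so individual metric
    // names no longer carry the `kmtd` prefix themselves.
    let registry = Registry::new_custom(Some("kmtd".to_string()), None)
        .expect("Failed to create registry");

    // The `register_*_with_registry!` macros create *and* register the
    // metric in one step, replacing the separate `IntGauge::new` +
    // `registry.register(...)` calls used before.
    let gauge = register_int_gauge_with_registry!(
        "consumer_groups_total",
        "Consumer groups currently in the cluster",
        registry
    )
    .expect("Failed to create metric");

    gauge.set(42); // gathered as `kmtd_consumer_groups_total`
    gauge
}
```

The histogram and gauge-vec variants (`register_histogram_with_registry!`, `register_int_gauge_vec_with_registry!`) follow the same pattern, as the `src/consumer_groups/emitter.rs` diff below shows.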
detro authored Nov 8, 2023
1 parent 32a5af6 commit e8df7b9
Showing 9 changed files with 193 additions and 84 deletions.
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -45,7 +45,7 @@ tokio = { version = "1.33.0", features = ["rt", "rt-multi-thread", "time", "sync
tokio-util = "0.7.10"

[target.'cfg(unix)'.dependencies]
rdkafka = { version = "0.34.0", features = ["ssl-vendored", "gssapi-vendored", "libz-static"] }
rdkafka = { version = "0.36.0", features = ["ssl-vendored", "gssapi-vendored", "libz-static"] }

[profile.release]
strip = true # Automatically strip symbols from the binary.
40 changes: 35 additions & 5 deletions METRICS.md
@@ -99,7 +99,7 @@ to produce the lag information for).
<b>Description:</b> <i>"Consumer groups currently in the cluster.</i><br/>
<b>Labels:</b> <code>cluster_id</code><br/>
<b>Type:</b> <code>gauge</code><br/>
<b>Timestamped:</b> <code>true</code>
<b>Timestamped:</b> <code>false</code>
</dd>
</dl>

@@ -109,7 +109,7 @@ to produce the lag information for).
<b>Description:</b> <i>Members of consumer groups currently in the cluster.</i><br/>
<b>Labels:</b> <code>cluster_id, group</code><br/>
<b>Type:</b> <code>gauge</code><br/>
<b>Timestamped:</b> <code>true</code>
<b>Timestamped:</b> <code>false</code>
</dd>
</dl>

@@ -118,10 +118,10 @@ to produce the lag information for).
<dl>
<dt><code>kmtd_consumer_groups_emitter_fetch_time_milliseconds</code></dt>
<dd>
<b>Description:</b> <i>Time (in milliseconds) taken to fetch information about all consumer groups in cluster.</i><br/>
<b>Description:</b> <i>Time (ms) taken to fetch information about all consumer groups in cluster.</i><br/>
<b>Labels:</b> <code>cluster_id</code><br/>
<b>Type:</b> <code>histogram</code><br/>
<b>Timestamped:</b> <code>true</code>
<b>Timestamped:</b> <code>false</code>
</dd>
</dl>

@@ -131,7 +131,37 @@ to produce the lag information for).
<b>Description:</b> <i>Capacity of internal channel used to send consumer groups metadata to rest of the service.</i><br/>
<b>Labels:</b> <code>cluster_id</code><br/>
<b>Type:</b> <code>gauge</code><br/>
<b>Timestamped:</b> <code>true</code>
<b>Timestamped:</b> <code>false</code>
</dd>
</dl>

<dl>
<dt><code>kmtd_partition_offsets_emitter_fetch_time_milliseconds</code></dt>
<dd>
<b>Description:</b> <i>Time (ms) taken to fetch earliest/latest (watermark) offsets of a specific topic partition in cluster.</i><br/>
<b>Labels:</b> <code>cluster_id, topic, partition</code><br/>
<b>Type:</b> <code>histogram</code><br/>
<b>Timestamped:</b> <code>false</code>
</dd>
</dl>

<dl>
<dt><code>kmtd_partition_offsets_emitter_channel_capacity</code></dt>
<dd>
<b>Description:</b> <i>Capacity of internal channel used to send partition watermark offsets to rest of the service.</i><br/>
<b>Labels:</b> <code>cluster_id</code><br/>
<b>Type:</b> <code>gauge</code><br/>
<b>Timestamped:</b> <code>false</code>
</dd>
</dl>

<dl>
<dt><code>kmtd_partition_offsets_register_usage</code></dt>
<dd>
<b>Description:</b> <i>Amount of offsets tracked per topic partition.</i><br/>
<b>Labels:</b> <code>cluster_id, topic, partition</code><br/>
<b>Type:</b> <code>gauge</code><br/>
<b>Timestamped:</b> <code>false</code>
</dd>
</dl>

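For reference, here is what a scrape of the new `partition_offsets` metrics could look like in the Prometheus exposition format (all label values and samples below are made up for illustration; histogram buckets are mostly omitted for brevity):

```
# HELP kmtd_partition_offsets_emitter_fetch_time_milliseconds Time (ms) taken to fetch earliest/latest (watermark) offsets of a specific topic partition in cluster.
# TYPE kmtd_partition_offsets_emitter_fetch_time_milliseconds histogram
kmtd_partition_offsets_emitter_fetch_time_milliseconds_bucket{cluster_id="my-cluster",topic="orders",partition="0",le="+Inf"} 42
kmtd_partition_offsets_emitter_fetch_time_milliseconds_sum{cluster_id="my-cluster",topic="orders",partition="0"} 128.4
kmtd_partition_offsets_emitter_fetch_time_milliseconds_count{cluster_id="my-cluster",topic="orders",partition="0"} 42
# HELP kmtd_partition_offsets_register_usage Amount of offsets tracked per topic partition.
# TYPE kmtd_partition_offsets_register_usage gauge
kmtd_partition_offsets_register_usage{cluster_id="my-cluster",topic="orders",partition="0"} 3600
```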
94 changes: 43 additions & 51 deletions src/consumer_groups/emitter.rs
@@ -4,9 +4,11 @@ use std::{
};

use async_trait::async_trait;
-use const_format::formatcp;
use konsumer_offsets::ConsumerProtocolAssignment;
-use prometheus::{Histogram, HistogramOpts, IntGauge, IntGaugeVec, Opts, Registry};
+use prometheus::{
+    register_histogram_with_registry, register_int_gauge_vec_with_registry,
+    register_int_gauge_with_registry, Histogram, IntGauge, IntGaugeVec, Registry,
+};
use rdkafka::{admin::AdminClient, client::DefaultClientContext, groups::GroupList, ClientConfig};
use tokio::{
sync::mpsc,
@@ -18,23 +20,22 @@ use tokio_util::sync::CancellationToken;
use crate::constants::KOMMITTED_CONSUMER_OFFSETS_CONSUMER;
use crate::internals::Emitter;
use crate::kafka_types::{Group, GroupWithMembers, Member, MemberWithAssignment, TopicPartition};
-use crate::prometheus_metrics::{LABEL_GROUP, NAMESPACE};
+use crate::prometheus_metrics::LABEL_GROUP;

const CHANNEL_SIZE: usize = 5;

const FETCH_TIMEOUT: Duration = Duration::from_secs(10);
const FETCH_INTERVAL: Duration = Duration::from_secs(60);

-const M_CG_NAME: &str = formatcp!("{NAMESPACE}_consumer_groups_total");
-const M_CG_HELP: &str = "Consumer groups currently in the cluster";
-const M_CG_MEMBERS_NAME: &str = formatcp!("{NAMESPACE}_consumer_groups_members_total");
-const M_CG_MEMBERS_HELP: &str = "Members of consumer groups currently in the cluster";
-const M_CG_FETCH_NAME: &str =
-    formatcp!("{NAMESPACE}_consumer_groups_emitter_fetch_time_milliseconds");
-const M_CG_FETCH_HELP: &str =
-    "Time (in milliseconds) taken to fetch information about all consumer groups in cluster";
-const M_CG_CH_CAP_NAME: &str = formatcp!("{NAMESPACE}_consumer_groups_emitter_channel_capacity");
-const M_CG_CH_CAP_HELP: &str =
+const MET_TOT_NAME: &str = "consumer_groups_total";
+const MET_TOT_HELP: &str = "Consumer groups currently in the cluster";
+const MET_MEMBERS_TOT_NAME: &str = "consumer_groups_members_total";
+const MET_MEMBERS_TOT_HELP: &str = "Members of consumer groups currently in the cluster";
+const MET_FETCH_NAME: &str = "consumer_groups_emitter_fetch_time_milliseconds";
+const MET_FETCH_HELP: &str =
+    "Time (ms) taken to fetch information about all consumer groups in cluster";
+const MET_CH_CAP_NAME: &str = "consumer_groups_emitter_channel_capacity";
+const MET_CH_CAP_HELP: &str =
"Capacity of internal channel used to send consumer groups metadata to rest of the service";

/// A map of all the known Consumer Groups, at a given point in time.
@@ -115,10 +116,10 @@ pub struct ConsumerGroupsEmitter {
admin_client_config: ClientConfig,

// Prometheus Metrics
-    metric_cg: IntGauge,
-    metric_cg_members: IntGaugeVec,
-    metric_cg_fetch: Histogram,
-    metric_cg_ch_cap: IntGauge,
+    metric_tot: IntGauge,
+    metric_members_tot: IntGaugeVec,
+    metric_fetch: Histogram,
+    metric_ch_cap: IntGauge,
}

impl ConsumerGroupsEmitter {
@@ -128,38 +129,29 @@ impl ConsumerGroupsEmitter {
///
/// * `admin_client_config` - Kafka admin client configuration, used to fetch Consumer Groups
pub fn new(admin_client_config: ClientConfig, metrics: Arc<Registry>) -> Self {
-        // Create metrics
-        let metric_cg = IntGauge::new(M_CG_NAME, M_CG_HELP)
-            .unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_NAME}"));
-        let metric_cg_members =
-            IntGaugeVec::new(Opts::new(M_CG_MEMBERS_NAME, M_CG_MEMBERS_HELP), &[LABEL_GROUP])
-                .unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_MEMBERS_NAME}"));
-        let metric_cg_fetch =
-            Histogram::with_opts(HistogramOpts::new(M_CG_FETCH_NAME, M_CG_FETCH_HELP))
-                .unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_FETCH_NAME}"));
-        let metric_cg_ch_cap = IntGauge::new(M_CG_CH_CAP_NAME, M_CG_CH_CAP_HELP)
-            .unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_CH_CAP_NAME}"));
-
-        // Register metrics
-        metrics
-            .register(Box::new(metric_cg.clone()))
-            .unwrap_or_else(|_| panic!("Failed to register metric: {M_CG_NAME}"));
-        metrics
-            .register(Box::new(metric_cg_members.clone()))
-            .unwrap_or_else(|_| panic!("Failed to register metric: {M_CG_MEMBERS_NAME}"));
-        metrics
-            .register(Box::new(metric_cg_fetch.clone()))
-            .unwrap_or_else(|_| panic!("Failed to register metric: {M_CG_FETCH_NAME}"));
-        metrics
-            .register(Box::new(metric_cg_ch_cap.clone()))
-            .unwrap_or_else(|_| panic!("Failed to register metric: {M_CG_CH_CAP_NAME}"));
-
Self {
admin_client_config,
-            metric_cg,
-            metric_cg_members,
-            metric_cg_fetch,
-            metric_cg_ch_cap,
+            metric_tot: register_int_gauge_with_registry!(MET_TOT_NAME, MET_TOT_HELP, metrics)
+                .unwrap_or_else(|_| panic!("Failed to create metric: {MET_TOT_NAME}")),
+            metric_members_tot: register_int_gauge_vec_with_registry!(
+                MET_MEMBERS_TOT_NAME,
+                MET_MEMBERS_TOT_HELP,
+                &[LABEL_GROUP],
+                metrics
+            )
+            .unwrap_or_else(|_| panic!("Failed to create metric: {MET_MEMBERS_TOT_NAME}")),
+            metric_fetch: register_histogram_with_registry!(
+                MET_FETCH_NAME,
+                MET_FETCH_HELP,
+                metrics
+            )
+            .unwrap_or_else(|_| panic!("Failed to create metric: {MET_FETCH_NAME}")),
+            metric_ch_cap: register_int_gauge_with_registry!(
+                MET_CH_CAP_NAME,
+                MET_CH_CAP_HELP,
+                metrics
+            )
+            .unwrap_or_else(|_| panic!("Failed to create metric: {MET_CH_CAP_NAME}")),
}
}
}
@@ -188,10 +180,10 @@ impl Emitter for ConsumerGroupsEmitter {
let (sx, rx) = mpsc::channel::<Self::Emitted>(CHANNEL_SIZE);

// Clone metrics so they can be used in the spawned future
-        let metric_cg = self.metric_cg.clone();
-        let metric_cg_members = self.metric_cg_members.clone();
-        let metric_cg_fetch = self.metric_cg_fetch.clone();
-        let metric_cg_ch_cap = self.metric_cg_ch_cap.clone();
+        let metric_cg = self.metric_tot.clone();
+        let metric_cg_members = self.metric_members_tot.clone();
+        let metric_cg_fetch = self.metric_fetch.clone();
+        let metric_cg_ch_cap = self.metric_ch_cap.clone();

let join_handle = tokio::spawn(async move {
let mut interval = interval(FETCH_INTERVAL);
1 change: 1 addition & 0 deletions src/main.rs
@@ -48,6 +48,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
cli.offsets_history_ready_at,
cs_reg_arc.clone(),
shutdown_token.clone(),
+        prom_reg_arc.clone(),
);
po_reg.await_ready(shutdown_token.clone()).await?;
let po_reg_arc = Arc::new(po_reg);
