refactor: add metrics registry for all components
Most components had their metric names hardcoded. This adds a metrics registry to each component that holds all metric name templates and exports them for documentation generation.
jeqo committed Oct 14, 2024
1 parent 4821be9 commit daec916
Showing 11 changed files with 1,010 additions and 129 deletions.
CaffeineMetricsRegistry.java (new file)
@@ -0,0 +1,102 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.metrics;

import java.util.List;

import org.apache.kafka.common.MetricNameTemplate;

public class CaffeineMetricsRegistry {

    static final String CACHE_HITS = "cache-hits";
    static final String CACHE_HITS_TOTAL = CACHE_HITS + "-total";
    static final String CACHE_MISSES = "cache-misses";
    static final String CACHE_MISSES_TOTAL = CACHE_MISSES + "-total";
    static final String CACHE_LOAD = "cache-load";
    static final String CACHE_LOAD_SUCCESS = CACHE_LOAD + "-success";
    static final String CACHE_LOAD_SUCCESS_TOTAL = CACHE_LOAD_SUCCESS + "-total";
    static final String CACHE_LOAD_SUCCESS_TIME = CACHE_LOAD_SUCCESS + "-time";
    static final String CACHE_LOAD_SUCCESS_TIME_TOTAL = CACHE_LOAD_SUCCESS_TIME + "-total";
    static final String CACHE_LOAD_FAILURE = CACHE_LOAD + "-failure";
    static final String CACHE_LOAD_FAILURE_TOTAL = CACHE_LOAD_FAILURE + "-total";
    static final String CACHE_LOAD_FAILURE_TIME = CACHE_LOAD_FAILURE + "-time";
    static final String CACHE_LOAD_FAILURE_TIME_TOTAL = CACHE_LOAD_FAILURE_TIME + "-total";

    static final String CACHE_EVICTION = "cache-eviction";
    static final String CACHE_EVICTION_TOTAL = CACHE_EVICTION + "-total";
    static final String CACHE_EVICTION_WEIGHT = CACHE_EVICTION + "-weight";
    static final String CACHE_EVICTION_WEIGHT_TOTAL = CACHE_EVICTION_WEIGHT + "-total";

    static final String CACHE_SIZE = "cache-size";
    static final String CACHE_SIZE_TOTAL = CACHE_SIZE + "-total";

    final String groupName;

    final MetricNameTemplate cacheHitsMetricName;
    final MetricNameTemplate cacheMissesMetricName;
    final MetricNameTemplate cacheLoadSuccessMetricName;
    final MetricNameTemplate cacheLoadSuccessTimeMetricName;
    final MetricNameTemplate cacheLoadFailureMetricName;
    final MetricNameTemplate cacheLoadFailureTimeMetricName;
    final MetricNameTemplate cacheEvictionMetricName;
    final MetricNameTemplate cacheEvictionByCauseMetricName;
    final MetricNameTemplate cacheEvictionWeightMetricName;
    final MetricNameTemplate cacheEvictionWeightByCauseMetricName;
    final MetricNameTemplate cacheSizeTotalMetricName;

    public CaffeineMetricsRegistry(final String groupName) {
        this.groupName = groupName;
        cacheHitsMetricName =
            new MetricNameTemplate(CACHE_HITS_TOTAL, groupName, "");
        cacheMissesMetricName =
            new MetricNameTemplate(CACHE_MISSES_TOTAL, groupName, "");
        cacheLoadSuccessMetricName =
            new MetricNameTemplate(CACHE_LOAD_SUCCESS_TOTAL, groupName, "");
        cacheLoadSuccessTimeMetricName =
            new MetricNameTemplate(CACHE_LOAD_SUCCESS_TIME_TOTAL, groupName, "");
        cacheLoadFailureMetricName =
            new MetricNameTemplate(CACHE_LOAD_FAILURE_TOTAL, groupName, "");
        cacheLoadFailureTimeMetricName =
            new MetricNameTemplate(CACHE_LOAD_FAILURE_TIME_TOTAL, groupName, "");
        cacheEvictionMetricName =
            new MetricNameTemplate(CACHE_EVICTION_TOTAL, groupName, "");
        cacheEvictionByCauseMetricName =
            new MetricNameTemplate(CACHE_EVICTION_TOTAL, groupName, "", "cause");
        cacheEvictionWeightMetricName =
            new MetricNameTemplate(CACHE_EVICTION_WEIGHT_TOTAL, groupName, "");
        cacheEvictionWeightByCauseMetricName =
            new MetricNameTemplate(CACHE_EVICTION_WEIGHT_TOTAL, groupName, "", "cause");
        cacheSizeTotalMetricName =
            new MetricNameTemplate(CACHE_SIZE_TOTAL, groupName, "");
    }

    public List<MetricNameTemplate> all() {
        return List.of(
            cacheHitsMetricName,
            cacheMissesMetricName,
            cacheLoadSuccessMetricName,
            cacheLoadSuccessTimeMetricName,
            cacheLoadFailureMetricName,
            cacheLoadFailureTimeMetricName,
            cacheEvictionMetricName,
            cacheEvictionByCauseMetricName,
            cacheEvictionWeightMetricName,
            cacheEvictionWeightByCauseMetricName,
            cacheSizeTotalMetricName
        );
    }
}
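For context on how these templates get used, here is a minimal sketch (not part of this commit) of resolving them into concrete metric names via the Kafka Metrics API. The group name "chunk-cache-metrics" and the "SIZE" cause value are illustrative assumptions, and since the template fields are package-private the snippet assumes it lives in the same package.

package io.aiven.kafka.tieredstorage.metrics;

import java.util.Map;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;

public class CaffeineMetricsRegistryExample {
    public static void main(final String[] args) throws Exception {
        // Group name is an assumption for illustration only.
        final CaffeineMetricsRegistry registry = new CaffeineMetricsRegistry("chunk-cache-metrics");

        try (Metrics metrics = new Metrics()) {
            // Untagged template: resolves directly to a MetricName.
            final MetricName hits = metrics.metricInstance(registry.cacheHitsMetricName);

            // Tagged template: the "cause" tag declared in the template must be bound here.
            final MetricName evictedBySize =
                metrics.metricInstance(registry.cacheEvictionByCauseMetricName, Map.of("cause", "SIZE"));

            System.out.println(hits);
            System.out.println(evictedBySize);
        }
    }
}

Note that cacheEvictionMetricName and cacheEvictionByCauseMetricName share the metric name cache-eviction-total; the by-cause variant only adds a "cause" tag, so the two resolve to distinct MetricName instances that differ by their tags.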
CaffeineStatsCounter.java
@@ -34,6 +34,16 @@
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.github.benmanes.caffeine.cache.stats.StatsCounter;

import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_EVICTION;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_EVICTION_WEIGHT;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_HITS;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_LOAD_FAILURE;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_LOAD_FAILURE_TIME;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_LOAD_SUCCESS;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_LOAD_SUCCESS_TIME;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_MISSES;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_SIZE;

/**
* Records cache metrics managed by Caffeine {@code Cache#stats}.
*
@@ -43,29 +53,6 @@
* <a href="https://github.com/micrometer-metrics/micrometer/blob/main/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/cache/CaffeineStatsCounter.java">Micrometer</a>
*/
public class CaffeineStatsCounter implements StatsCounter {

    static final String CACHE_HITS = "cache-hits";
    static final String CACHE_HITS_TOTAL = CACHE_HITS + "-total";
    static final String CACHE_MISSES = "cache-misses";
    static final String CACHE_MISSES_TOTAL = CACHE_MISSES + "-total";
    static final String CACHE_LOAD = "cache-load";
    static final String CACHE_LOAD_SUCCESS = CACHE_LOAD + "-success";
    static final String CACHE_LOAD_SUCCESS_TOTAL = CACHE_LOAD_SUCCESS + "-total";
    static final String CACHE_LOAD_SUCCESS_TIME = CACHE_LOAD_SUCCESS + "-time";
    static final String CACHE_LOAD_SUCCESS_TIME_TOTAL = CACHE_LOAD_SUCCESS_TIME + "-total";
    static final String CACHE_LOAD_FAILURE = CACHE_LOAD + "-failure";
    static final String CACHE_LOAD_FAILURE_TOTAL = CACHE_LOAD_FAILURE + "-total";
    static final String CACHE_LOAD_FAILURE_TIME = CACHE_LOAD_FAILURE + "-time";
    static final String CACHE_LOAD_FAILURE_TIME_TOTAL = CACHE_LOAD_FAILURE_TIME + "-total";

    static final String CACHE_EVICTION = "cache-eviction";
    static final String CACHE_EVICTION_TOTAL = CACHE_EVICTION + "-total";
    static final String CACHE_EVICTION_WEIGHT = CACHE_EVICTION + "-weight";
    static final String CACHE_EVICTION_WEIGHT_TOTAL = CACHE_EVICTION_WEIGHT + "-total";

    static final String CACHE_SIZE = "cache-size";
    static final String CACHE_SIZE_TOTAL = CACHE_SIZE + "-total";

    private final org.apache.kafka.common.metrics.Metrics metrics;

    private final LongAdder cacheHitCount;
@@ -78,7 +65,7 @@ public class CaffeineStatsCounter implements StatsCounter {
    private final LongAdder cacheEvictionWeightTotal;
    private final ConcurrentHashMap<RemovalCause, LongAdder> cacheEvictionCountByCause;
    private final ConcurrentHashMap<RemovalCause, LongAdder> cacheEvictionWeightByCause;
    private final String groupName;
    private final CaffeineMetricsRegistry metricsRegistry;

    public CaffeineStatsCounter(final String groupName) {
        cacheHitCount = new LongAdder();
@@ -90,8 +77,6 @@ public CaffeineStatsCounter(final String groupName) {
        cacheEvictionCountTotal = new LongAdder();
        cacheEvictionWeightTotal = new LongAdder();

        this.groupName = groupName;

        cacheEvictionCountByCause = new ConcurrentHashMap<>();
        Arrays.stream(RemovalCause.values()).forEach(cause -> cacheEvictionCountByCause.put(cause, new LongAdder()));

@@ -105,39 +90,81 @@ public CaffeineStatsCounter(final String groupName) {
            new KafkaMetricsContext("aiven.kafka.server.tieredstorage.cache")
        );

        initSensor(CACHE_HITS, CACHE_HITS_TOTAL, cacheHitCount);
        initSensor(CACHE_MISSES, CACHE_MISSES_TOTAL, cacheMissCount);
        initSensor(CACHE_LOAD_SUCCESS, CACHE_LOAD_SUCCESS_TOTAL, cacheLoadSuccessCount);
        initSensor(CACHE_LOAD_SUCCESS_TIME, CACHE_LOAD_SUCCESS_TIME_TOTAL, cacheLoadSuccessTimeTotal);
        initSensor(CACHE_LOAD_FAILURE, CACHE_LOAD_FAILURE_TOTAL, cacheLoadFailureCount);
        initSensor(CACHE_LOAD_FAILURE_TIME, CACHE_LOAD_FAILURE_TIME_TOTAL, cacheLoadFailureTimeTotal);
        initSensor(CACHE_EVICTION, CACHE_EVICTION_TOTAL, cacheEvictionCountTotal);
        metricsRegistry = new CaffeineMetricsRegistry(groupName);
        initSensor(
            metricsRegistry.cacheHitsMetricName,
            CACHE_HITS,
            cacheHitCount
        );
        initSensor(
            metricsRegistry.cacheMissesMetricName,
            CACHE_MISSES,
            cacheMissCount
        );
        initSensor(
            metricsRegistry.cacheLoadSuccessMetricName,
            CACHE_LOAD_SUCCESS,
            cacheLoadSuccessCount
        );
        initSensor(
            metricsRegistry.cacheLoadSuccessTimeMetricName,
            CACHE_LOAD_SUCCESS_TIME,
            cacheLoadSuccessTimeTotal
        );
        initSensor(
            metricsRegistry.cacheLoadFailureMetricName,
            CACHE_LOAD_FAILURE,
            cacheLoadFailureCount
        );
        initSensor(
            metricsRegistry.cacheLoadFailureTimeMetricName,
            CACHE_LOAD_FAILURE_TIME,
            cacheLoadFailureTimeTotal
        );
        initSensor(
            metricsRegistry.cacheEvictionMetricName,
            CACHE_EVICTION,
            cacheEvictionCountTotal
        );
        Arrays.stream(RemovalCause.values()).forEach(cause ->
            initSensor("cause." + cause.name() + "." + CACHE_EVICTION, CACHE_EVICTION_TOTAL,
                cacheEvictionCountByCause.get(cause), () -> Map.of("cause", cause.name()), "cause")
            initSensor(
                metricsRegistry.cacheEvictionByCauseMetricName,
                "cause." + cause.name() + "." + CACHE_EVICTION,
                cacheEvictionCountByCause.get(cause),
                () -> Map.of("cause", cause.name())
            )
        );

        initSensor(CACHE_EVICTION_WEIGHT, CACHE_EVICTION_WEIGHT_TOTAL, cacheEvictionWeightTotal);
        initSensor(
            metricsRegistry.cacheEvictionWeightMetricName,
            CACHE_EVICTION_WEIGHT,
            cacheEvictionWeightTotal
        );

        Arrays.stream(RemovalCause.values()).forEach(cause ->
            initSensor("cause." + cause.name() + "." + CACHE_EVICTION, CACHE_EVICTION_WEIGHT_TOTAL,
                cacheEvictionWeightByCause.get(cause), () -> Map.of("cause", cause.name()), "cause")
            initSensor(
                metricsRegistry.cacheEvictionWeightByCauseMetricName,
                "cause." + cause.name() + "." + CACHE_EVICTION,
                cacheEvictionWeightByCause.get(cause),
                () -> Map.of("cause", cause.name())
            )
        );
    }

    private void initSensor(final String sensorName,
                            final String metricName,
                            final LongAdder value,
                            final Supplier<Map<String, String>> tagsSupplier,
                            final String... tagNames) {
        final var name = new MetricNameTemplate(metricName, groupName, "", tagNames);
    private void initSensor(
        final MetricNameTemplate metricNameTemplate,
        final String sensorName,
        final LongAdder value,
        final Supplier<Map<String, String>> tagsSupplier
    ) {
        new SensorProvider(metrics, sensorName, tagsSupplier)
            .with(name, new MeasurableValue(value::sum))
            .with(metricNameTemplate, new MeasurableValue(value::sum))
            .get();
    }

    private void initSensor(final String sensorName, final String metricName, final LongAdder value) {
        initSensor(sensorName, metricName, value, Collections::emptyMap);
    private void initSensor(final MetricNameTemplate metricNameTemplate, final String sensorName,
                            final LongAdder value) {
        initSensor(metricNameTemplate, sensorName, value, Collections::emptyMap);
    }

    @Override
@@ -193,9 +220,8 @@ public void recordHit() {
     * @param sizeSupplier operation from cache to provide cache size value
     */
    public void registerSizeMetric(final Supplier<Long> sizeSupplier) {
        final var name = new MetricNameTemplate(CACHE_SIZE_TOTAL, groupName, "");
        new SensorProvider(metrics, CACHE_SIZE)
            .with(name, new MeasurableValue(sizeSupplier))
            .with(metricsRegistry.cacheSizeTotalMetricName, new MeasurableValue(sizeSupplier))
            .get();
    }

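For context, a sketch (not part of this commit) of how a CaffeineStatsCounter is typically wired into a Caffeine cache; the key/value types, cache size, and group name below are assumptions.

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;

public class ChunkCacheWiringExample {
    public static void main(final String[] args) {
        // Group name is illustrative; real callers pass their own component-specific group.
        final CaffeineStatsCounter statsCounter = new CaffeineStatsCounter("chunk-cache-metrics");

        final Cache<String, byte[]> cache = Caffeine.newBuilder()
            .maximumSize(1_000)
            // Caffeine reports hits, misses, loads, and evictions through this counter,
            // which in turn feeds the sensors created from CaffeineMetricsRegistry templates.
            .recordStats(() -> statsCounter)
            .build();

        // cache-size-total is not part of Caffeine's StatsCounter contract,
        // so it is registered separately from the cache's own size estimate.
        statsCounter.registerSizeMetric(cache::estimatedSize);

        cache.put("segment-0", new byte[]{1, 2, 3});
        System.out.println(cache.getIfPresent("segment-0") != null);
    }
}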
MetricsRegistry.java
@@ -16,6 +16,7 @@

package io.aiven.kafka.tieredstorage.metrics;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.MetricNameTemplate;
@@ -252,4 +253,70 @@ static Map<String, String> topicPartitionAndObjectTypeTags(final TopicPartition
    static Map<String, String> objectTypeTags(final ObjectKeyFactory.Suffix suffix) {
        return Map.of(TAG_NAME_OBJECT_TYPE, suffix.value);
    }

    public Iterable<MetricNameTemplate> all() {
        return List.of(
            // segment copy
            segmentCopyTimeAvg,
            segmentCopyTimeAvgByTopic,
            segmentCopyTimeAvgByTopicPartition,
            segmentCopyTimeMax,
            segmentCopyTimeMaxByTopic,
            segmentCopyTimeMaxByTopicPartition,
            // segment delete
            segmentDeleteRequestsRate,
            segmentDeleteRequestsRateByTopic,
            segmentDeleteRequestsRateByTopicPartition,
            segmentDeleteRequestsTotal,
            segmentDeleteRequestsTotalByTopic,
            segmentDeleteRequestsTotalByTopicPartition,
            segmentDeleteBytesTotal,
            segmentDeleteBytesTotalByTopic,
            segmentDeleteBytesTotalByTopicPartition,
            segmentDeleteTimeAvg,
            segmentDeleteTimeAvgByTopic,
            segmentDeleteTimeAvgByTopicPartition,
            segmentDeleteTimeMax,
            segmentDeleteTimeMaxByTopic,
            segmentDeleteTimeMaxByTopicPartition,
            segmentDeleteErrorsRate,
            segmentDeleteErrorsRateByTopic,
            segmentDeleteErrorsRateByTopicPartition,
            segmentDeleteErrorsTotal,
            segmentDeleteErrorsTotalByTopic,
            segmentDeleteErrorsTotalByTopicPartition,
            // segment fetch
            segmentFetchRequestedBytesRate,
            segmentFetchRequestedBytesRateByTopic,
            segmentFetchRequestedBytesRateByTopicPartition,
            segmentFetchRequestedBytesTotal,
            segmentFetchRequestedBytesTotalByTopic,
            segmentFetchRequestedBytesTotalByTopicPartition,
            // object upload
            objectUploadRequestsRate,
            objectUploadRequestsRateByTopic,
            objectUploadRequestsRateByTopicPartition,
            objectUploadRequestsRateByObjectType,
            objectUploadRequestsRateByTopicAndObjectType,
            objectUploadRequestsRateByTopicPartitionAndObjectType,
            objectUploadRequestsTotal,
            objectUploadRequestsTotalByTopic,
            objectUploadRequestsTotalByTopicPartition,
            objectUploadRequestsTotalByObjectType,
            objectUploadRequestsTotalByTopicAndObjectType,
            objectUploadRequestsTotalByTopicPartitionAndObjectType,
            objectUploadBytesRate,
            objectUploadBytesRateByTopic,
            objectUploadBytesRateByTopicPartition,
            objectUploadBytesRateByObjectType,
            objectUploadBytesRateByTopicAndObjectType,
            objectUploadBytesRateByTopicPartitionAndObjectType,
            objectUploadBytesTotal,
            objectUploadBytesTotalByTopic,
            objectUploadBytesTotalByTopicPartition,
            objectUploadBytesTotalByObjectType,
            objectUploadBytesTotalByTopicAndObjectType,
            objectUploadBytesTotalByTopicPartitionAndObjectType
        );
    }
}
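With each registry exposing an all() method, metric documentation can be rendered with Kafka's built-in Metrics.toHtmlTable helper, which is the kind of document generation the commit message refers to. A sketch (not part of this commit): the generator class, the core JMX domain string, the no-arg MetricsRegistry constructor, and the cache group name are assumptions; "aiven.kafka.server.tieredstorage.cache" matches the KafkaMetricsContext namespace used by CaffeineStatsCounter above.

package io.aiven.kafka.tieredstorage.metrics;

import org.apache.kafka.common.metrics.Metrics;

public class MetricsDocsGenerator {
    public static void main(final String[] args) {
        // Core tiered-storage metrics (segment copy/delete/fetch, object upload).
        // The domain string and no-arg constructor are assumptions for this sketch.
        System.out.println(Metrics.toHtmlTable("aiven.kafka.server.tieredstorage",
            new MetricsRegistry().all()));

        // Cache metrics; the group name is illustrative.
        System.out.println(Metrics.toHtmlTable("aiven.kafka.server.tieredstorage.cache",
            new CaffeineMetricsRegistry("chunk-cache-metrics").all()));
    }
}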