Skip to content

Commit

Permalink
refactor: expose caching metric names for doc generation
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Oct 17, 2024
1 parent 6f9fad6 commit 7c09a41
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
import com.github.benmanes.caffeine.cache.Weigher;

public abstract class ChunkCache<T> implements ChunkManager, Configurable {
private static final String METRIC_GROUP = "chunk-cache-metrics";
private static final String THREAD_POOL_METRIC_GROUP = "chunk-cache-thread-pool-metrics";
public static final String METRIC_GROUP = "chunk-cache-metrics";
public static final String THREAD_POOL_METRIC_GROUP = "chunk-cache-thread-pool-metrics";

private final ChunkManager chunkManager;
private ExecutorService executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@
public class MemorySegmentIndexesCache implements SegmentIndexesCache {
private static final Logger log = LoggerFactory.getLogger(MemorySegmentIndexesCache.class);

public static final String METRIC_GROUP = "segment-indexes-cache-metrics";
public static final String THREAD_POOL_METRIC_GROUP = "segment-indexes-cache-thread-pool-metrics";

private static final long DEFAULT_MAX_SIZE_BYTES = 10 * 1024 * 1024;
private static final String METRIC_GROUP = "segment-indexes-cache-metrics";
private static final String THREAD_POOL_METRIC_GROUP = "segment-indexes-cache-thread-pool-metrics";

private final CaffeineStatsCounter statsCounter = new CaffeineStatsCounter(METRIC_GROUP);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@

public class MemorySegmentManifestCache implements SegmentManifestCache {
private static final Logger log = LoggerFactory.getLogger(MemorySegmentManifestCache.class);
private static final String METRIC_GROUP = "segment-manifest-cache-metrics";
private static final String THREAD_POOL_METRIC_GROUP = "segment-manifest-cache-thread-pool-metrics";
public static final String METRIC_GROUP = "segment-manifest-cache-metrics";
public static final String THREAD_POOL_METRIC_GROUP = "segment-manifest-cache-thread-pool-metrics";
private static final long DEFAULT_MAX_SIZE = 1000L;
private static final long DEFAULT_RETENTION_MS = 3_600_000;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.metrics;

import java.util.List;

import org.apache.kafka.common.MetricNameTemplate;

/**
 * Catalogue of the {@link MetricNameTemplate}s published for a single cache metric group.
 *
 * <p>Each instance is bound to one metric group name; every template it holds is created
 * with that group. The string constants hold the base names and their {@code -total}
 * metric-name variants, so that the names are defined in one place.
 */
public class CaffeineMetricsRegistry {
    public static final String METRIC_CONTEXT = "aiven.kafka.server.tieredstorage.cache";

    // Shared building blocks for the names below (compile-time constants, private to this class).
    private static final String TOTAL_SUFFIX = "-total";
    private static final String TIME_SUFFIX = "-time";
    private static final String CAUSE_TAG = "cause";

    // Hits and misses.
    static final String CACHE_HITS = "cache-hits";
    static final String CACHE_HITS_TOTAL = CACHE_HITS + TOTAL_SUFFIX;
    static final String CACHE_MISSES = "cache-misses";
    static final String CACHE_MISSES_TOTAL = CACHE_MISSES + TOTAL_SUFFIX;

    // Loads: successful and failed, each with a count and an accumulated time.
    static final String CACHE_LOAD = "cache-load";
    static final String CACHE_LOAD_SUCCESS = CACHE_LOAD + "-success";
    static final String CACHE_LOAD_SUCCESS_TOTAL = CACHE_LOAD_SUCCESS + TOTAL_SUFFIX;
    static final String CACHE_LOAD_SUCCESS_TIME = CACHE_LOAD_SUCCESS + TIME_SUFFIX;
    static final String CACHE_LOAD_SUCCESS_TIME_TOTAL = CACHE_LOAD_SUCCESS_TIME + TOTAL_SUFFIX;
    static final String CACHE_LOAD_FAILURE = CACHE_LOAD + "-failure";
    static final String CACHE_LOAD_FAILURE_TOTAL = CACHE_LOAD_FAILURE + TOTAL_SUFFIX;
    static final String CACHE_LOAD_FAILURE_TIME = CACHE_LOAD_FAILURE + TIME_SUFFIX;
    static final String CACHE_LOAD_FAILURE_TIME_TOTAL = CACHE_LOAD_FAILURE_TIME + TOTAL_SUFFIX;

    // Evictions: count and weight.
    static final String CACHE_EVICTION = "cache-eviction";
    static final String CACHE_EVICTION_TOTAL = CACHE_EVICTION + TOTAL_SUFFIX;
    static final String CACHE_EVICTION_WEIGHT = CACHE_EVICTION + "-weight";
    static final String CACHE_EVICTION_WEIGHT_TOTAL = CACHE_EVICTION_WEIGHT + TOTAL_SUFFIX;

    // Cache size.
    static final String CACHE_SIZE = "cache-size";
    static final String CACHE_SIZE_TOTAL = CACHE_SIZE + TOTAL_SUFFIX;

    /** Metric group that every template of this registry belongs to. */
    final String groupName;

    final MetricNameTemplate cacheHitsMetricName;
    final MetricNameTemplate cacheMissesMetricName;
    final MetricNameTemplate cacheLoadSuccessMetricName;
    final MetricNameTemplate cacheLoadSuccessTimeMetricName;
    final MetricNameTemplate cacheLoadFailureMetricName;
    final MetricNameTemplate cacheLoadFailureTimeMetricName;
    final MetricNameTemplate cacheEvictionMetricName;
    final MetricNameTemplate cacheEvictionByCauseMetricName;
    final MetricNameTemplate cacheEvictionWeightMetricName;
    final MetricNameTemplate cacheEvictionWeightByCauseMetricName;
    final MetricNameTemplate cacheSizeTotalMetricName;

    /**
     * Builds all metric name templates for the given metric group.
     *
     * @param groupName metric group assigned to every template created here
     */
    public CaffeineMetricsRegistry(final String groupName) {
        this.groupName = groupName;

        cacheHitsMetricName = template(groupName, CACHE_HITS_TOTAL, "Cache hits");
        cacheMissesMetricName = template(groupName, CACHE_MISSES_TOTAL, "Cache misses");

        cacheLoadSuccessMetricName =
            template(groupName, CACHE_LOAD_SUCCESS_TOTAL, "Successful load of a new entry");
        cacheLoadSuccessTimeMetricName =
            template(groupName, CACHE_LOAD_SUCCESS_TIME_TOTAL, "Time to load a new entry");
        cacheLoadFailureMetricName =
            template(groupName, CACHE_LOAD_FAILURE_TOTAL, "Failures to load a new entry");
        cacheLoadFailureTimeMetricName =
            template(groupName, CACHE_LOAD_FAILURE_TIME_TOTAL, "Time when failing to load a new entry");

        // The "by cause" variants share the metric name of their untagged counterpart
        // and differ only by carrying the "cause" tag.
        cacheEvictionMetricName =
            template(groupName, CACHE_EVICTION_TOTAL, "Eviction of an entry from the cache");
        cacheEvictionByCauseMetricName =
            template(
                groupName,
                CACHE_EVICTION_TOTAL,
                "Eviction of an entry from the cache tagged by cause",
                CAUSE_TAG);
        cacheEvictionWeightMetricName =
            template(groupName, CACHE_EVICTION_WEIGHT_TOTAL, "Weight of evicted entry");
        cacheEvictionWeightByCauseMetricName =
            template(
                groupName,
                CACHE_EVICTION_WEIGHT_TOTAL,
                "Weight of evicted entry tagged by cause",
                CAUSE_TAG);

        cacheSizeTotalMetricName =
            template(groupName, CACHE_SIZE_TOTAL, "Estimated number of entries in the cache");
    }

    /**
     * Creates a template with the given name, group, description and tag names.
     * Static so that it does not depend on partially initialised instance state.
     */
    private static MetricNameTemplate template(
        final String group,
        final String name,
        final String description,
        final String... tagNames
    ) {
        return new MetricNameTemplate(name, group, description, tagNames);
    }

    /**
     * Returns every template held by this registry, in declaration order.
     *
     * @return an unmodifiable list of the eleven metric name templates
     */
    public List<MetricNameTemplate> all() {
        return List.of(
            cacheHitsMetricName,
            cacheMissesMetricName,
            cacheLoadSuccessMetricName,
            cacheLoadSuccessTimeMetricName,
            cacheLoadFailureMetricName,
            cacheLoadFailureTimeMetricName,
            cacheEvictionMetricName,
            cacheEvictionByCauseMetricName,
            cacheEvictionWeightMetricName,
            cacheEvictionWeightByCauseMetricName,
            cacheSizeTotalMetricName
        );
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.github.benmanes.caffeine.cache.stats.StatsCounter;

import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_EVICTION;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_EVICTION_WEIGHT;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_HITS;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_LOAD_FAILURE;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_LOAD_FAILURE_TIME;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_LOAD_SUCCESS;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_LOAD_SUCCESS_TIME;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_MISSES;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.CACHE_SIZE;
import static io.aiven.kafka.tieredstorage.metrics.CaffeineMetricsRegistry.METRIC_CONTEXT;

/**
* Records cache metrics managed by Caffeine {@code Cache#stats}.
*
Expand All @@ -43,29 +54,6 @@
* <a href="https://github.com/micrometer-metrics/micrometer/blob/main/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/cache/CaffeineStatsCounter.java">Micrometer</a>
*/
public class CaffeineStatsCounter implements StatsCounter {

static final String CACHE_HITS = "cache-hits";
static final String CACHE_HITS_TOTAL = CACHE_HITS + "-total";
static final String CACHE_MISSES = "cache-misses";
static final String CACHE_MISSES_TOTAL = CACHE_MISSES + "-total";
static final String CACHE_LOAD = "cache-load";
static final String CACHE_LOAD_SUCCESS = CACHE_LOAD + "-success";
static final String CACHE_LOAD_SUCCESS_TOTAL = CACHE_LOAD_SUCCESS + "-total";
static final String CACHE_LOAD_SUCCESS_TIME = CACHE_LOAD_SUCCESS + "-time";
static final String CACHE_LOAD_SUCCESS_TIME_TOTAL = CACHE_LOAD_SUCCESS_TIME + "-total";
static final String CACHE_LOAD_FAILURE = CACHE_LOAD + "-failure";
static final String CACHE_LOAD_FAILURE_TOTAL = CACHE_LOAD_FAILURE + "-total";
static final String CACHE_LOAD_FAILURE_TIME = CACHE_LOAD_FAILURE + "-time";
static final String CACHE_LOAD_FAILURE_TIME_TOTAL = CACHE_LOAD_FAILURE_TIME + "-total";

static final String CACHE_EVICTION = "cache-eviction";
static final String CACHE_EVICTION_TOTAL = CACHE_EVICTION + "-total";
static final String CACHE_EVICTION_WEIGHT = CACHE_EVICTION + "-weight";
static final String CACHE_EVICTION_WEIGHT_TOTAL = CACHE_EVICTION_WEIGHT + "-total";

static final String CACHE_SIZE = "cache-size";
static final String CACHE_SIZE_TOTAL = CACHE_SIZE + "-total";

private final org.apache.kafka.common.metrics.Metrics metrics;

private final LongAdder cacheHitCount;
Expand All @@ -78,7 +66,7 @@ public class CaffeineStatsCounter implements StatsCounter {
private final LongAdder cacheEvictionWeightTotal;
private final ConcurrentHashMap<RemovalCause, LongAdder> cacheEvictionCountByCause;
private final ConcurrentHashMap<RemovalCause, LongAdder> cacheEvictionWeightByCause;
private final String groupName;
private final CaffeineMetricsRegistry metricsRegistry;

public CaffeineStatsCounter(final String groupName) {
cacheHitCount = new LongAdder();
Expand All @@ -90,8 +78,6 @@ public CaffeineStatsCounter(final String groupName) {
cacheEvictionCountTotal = new LongAdder();
cacheEvictionWeightTotal = new LongAdder();

this.groupName = groupName;

cacheEvictionCountByCause = new ConcurrentHashMap<>();
Arrays.stream(RemovalCause.values()).forEach(cause -> cacheEvictionCountByCause.put(cause, new LongAdder()));

Expand All @@ -102,42 +88,84 @@ public CaffeineStatsCounter(final String groupName) {

metrics = new org.apache.kafka.common.metrics.Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext("aiven.kafka.server.tieredstorage.cache")
new KafkaMetricsContext(METRIC_CONTEXT)
);

initSensor(CACHE_HITS, CACHE_HITS_TOTAL, cacheHitCount);
initSensor(CACHE_MISSES, CACHE_MISSES_TOTAL, cacheMissCount);
initSensor(CACHE_LOAD_SUCCESS, CACHE_LOAD_SUCCESS_TOTAL, cacheLoadSuccessCount);
initSensor(CACHE_LOAD_SUCCESS_TIME, CACHE_LOAD_SUCCESS_TIME_TOTAL, cacheLoadSuccessTimeTotal);
initSensor(CACHE_LOAD_FAILURE, CACHE_LOAD_FAILURE_TOTAL, cacheLoadFailureCount);
initSensor(CACHE_LOAD_FAILURE_TIME, CACHE_LOAD_FAILURE_TIME_TOTAL, cacheLoadFailureTimeTotal);
initSensor(CACHE_EVICTION, CACHE_EVICTION_TOTAL, cacheEvictionCountTotal);
metricsRegistry = new CaffeineMetricsRegistry(groupName);
initSensor(
metricsRegistry.cacheHitsMetricName,
CACHE_HITS,
cacheHitCount
);
initSensor(
metricsRegistry.cacheMissesMetricName,
CACHE_MISSES,
cacheMissCount
);
initSensor(
metricsRegistry.cacheLoadSuccessMetricName,
CACHE_LOAD_SUCCESS,
cacheLoadSuccessCount
);
initSensor(
metricsRegistry.cacheLoadSuccessTimeMetricName,
CACHE_LOAD_SUCCESS_TIME,
cacheLoadSuccessTimeTotal
);
initSensor(
metricsRegistry.cacheLoadFailureMetricName,
CACHE_LOAD_FAILURE,
cacheLoadFailureCount
);
initSensor(
metricsRegistry.cacheLoadFailureTimeMetricName,
CACHE_LOAD_FAILURE_TIME,
cacheLoadFailureTimeTotal
);
initSensor(
metricsRegistry.cacheEvictionMetricName,
CACHE_EVICTION,
cacheEvictionCountTotal
);
Arrays.stream(RemovalCause.values()).forEach(cause ->
initSensor("cause." + cause.name() + "." + CACHE_EVICTION, CACHE_EVICTION_TOTAL,
cacheEvictionCountByCause.get(cause), () -> Map.of("cause", cause.name()), "cause")
initSensor(
metricsRegistry.cacheEvictionByCauseMetricName,
"cause." + cause.name() + "." + CACHE_EVICTION,
cacheEvictionCountByCause.get(cause),
() -> Map.of("cause", cause.name())
)
);

initSensor(CACHE_EVICTION_WEIGHT, CACHE_EVICTION_WEIGHT_TOTAL, cacheEvictionWeightTotal);
initSensor(
metricsRegistry.cacheEvictionWeightMetricName,
CACHE_EVICTION_WEIGHT,
cacheEvictionWeightTotal
);

Arrays.stream(RemovalCause.values()).forEach(cause ->
initSensor("cause." + cause.name() + "." + CACHE_EVICTION, CACHE_EVICTION_WEIGHT_TOTAL,
cacheEvictionWeightByCause.get(cause), () -> Map.of("cause", cause.name()), "cause")
initSensor(
metricsRegistry.cacheEvictionWeightByCauseMetricName,
"cause." + cause.name() + "." + CACHE_EVICTION,
cacheEvictionWeightByCause.get(cause),
() -> Map.of("cause", cause.name())
)
);
}

private void initSensor(final String sensorName,
final String metricName,
final LongAdder value,
final Supplier<Map<String, String>> tagsSupplier,
final String... tagNames) {
final var name = new MetricNameTemplate(metricName, groupName, "", tagNames);
private void initSensor(
final MetricNameTemplate metricNameTemplate,
final String sensorName,
final LongAdder value,
final Supplier<Map<String, String>> tagsSupplier
) {
new SensorProvider(metrics, sensorName, tagsSupplier)
.with(name, new MeasurableValue(value::sum))
.with(metricNameTemplate, new MeasurableValue(value::sum))
.get();
}

private void initSensor(final String sensorName, final String metricName, final LongAdder value) {
initSensor(sensorName, metricName, value, Collections::emptyMap);
private void initSensor(final MetricNameTemplate metricNameTemplate, final String sensorName,
final LongAdder value) {
initSensor(metricNameTemplate, sensorName, value, Collections::emptyMap);
}

@Override
Expand Down Expand Up @@ -193,9 +221,8 @@ public void recordHit() {
* @param sizeSupplier operation from cache to provide cache size value
*/
public void registerSizeMetric(final Supplier<Long> sizeSupplier) {
final var name = new MetricNameTemplate(CACHE_SIZE_TOTAL, groupName, "");
new SensorProvider(metrics, CACHE_SIZE)
.with(name, new MeasurableValue(sizeSupplier))
.with(metricsRegistry.cacheSizeTotalMetricName, new MeasurableValue(sizeSupplier))
.get();
}

Expand Down

0 comments on commit 7c09a41

Please sign in to comment.