Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: expose storage backends metric names for doc generation #612

Merged
merged 3 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,38 @@
import com.azure.core.http.policy.HttpPipelinePolicy;
import reactor.core.publisher.Mono;

public class MetricCollector {
private final org.apache.kafka.common.metrics.Metrics metrics;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOB_DELETE;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOB_DELETE_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOB_DELETE_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOB_GET;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOB_GET_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOB_GET_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOB_UPLOAD;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOB_UPLOAD_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOB_UPLOAD_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOCK_LIST_UPLOAD;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOCK_LIST_UPLOAD_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOCK_LIST_UPLOAD_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOCK_UPLOAD;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOCK_UPLOAD_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.BLOCK_UPLOAD_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.azure.MetricRegistry.METRIC_CONTEXT;

private static final String METRIC_GROUP = "azure-blob-storage-client-metrics";
public class MetricCollector {

final AzureBlobStorageConfig config;
final MetricsPolicy policy;

MetricCollector(final AzureBlobStorageConfig config) {
public MetricCollector(final AzureBlobStorageConfig config) {
this.config = config;

final JmxReporter reporter = new JmxReporter();

metrics = new org.apache.kafka.common.metrics.Metrics(
final Metrics metrics = new Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext("aiven.kafka.server.tieredstorage.azure")
new KafkaMetricsContext(METRIC_CONTEXT)
);
policy = new MetricsPolicy(metrics, pathPattern());
}

Pattern pathPattern() {
Expand All @@ -64,7 +80,7 @@ Pattern pathPattern() {
}

MetricsPolicy policy() {
return new MetricsPolicy(metrics, pathPattern());
return policy;
}

static class MetricsPolicy implements HttpPipelinePolicy {
Expand All @@ -83,17 +99,41 @@ static class MetricsPolicy implements HttpPipelinePolicy {
MetricsPolicy(final Metrics metrics, final Pattern pathPattern) {
this.metrics = metrics;
this.pathPattern = pathPattern;
this.deleteBlobRequests = createSensor("blob-delete");
this.uploadBlobRequests = createSensor("blob-upload");
this.uploadBlockRequests = createSensor("block-upload");
this.uploadBlockListRequests = createSensor("block-list-upload");
this.getBlobRequests = createSensor("blob-get");
this.deleteBlobRequests = createSensor(
BLOB_DELETE,
BLOB_DELETE_RATE_METRIC_NAME,
BLOB_DELETE_TOTAL_METRIC_NAME
);
this.uploadBlobRequests = createSensor(
BLOB_UPLOAD,
BLOB_UPLOAD_RATE_METRIC_NAME,
BLOB_UPLOAD_TOTAL_METRIC_NAME
);
this.uploadBlockRequests = createSensor(
BLOCK_UPLOAD,
BLOCK_UPLOAD_RATE_METRIC_NAME,
BLOCK_UPLOAD_TOTAL_METRIC_NAME
);
this.uploadBlockListRequests = createSensor(
BLOCK_LIST_UPLOAD,
BLOCK_LIST_UPLOAD_RATE_METRIC_NAME,
BLOCK_LIST_UPLOAD_TOTAL_METRIC_NAME
);
this.getBlobRequests = createSensor(
BLOB_GET,
BLOB_GET_RATE_METRIC_NAME,
BLOB_GET_TOTAL_METRIC_NAME
);
}

private Sensor createSensor(final String name) {
private Sensor createSensor(
final String name,
final MetricNameTemplate rateMetricName,
final MetricNameTemplate totalMetricName
) {
return new SensorProvider(metrics, name)
.with(new MetricNameTemplate(name + "-rate", METRIC_GROUP, ""), new Rate())
.with(new MetricNameTemplate(name + "-total", METRIC_GROUP, ""), new CumulativeCount())
.with(rateMetricName, new Rate())
.with(totalMetricName, new CumulativeCount())
.get();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.storage.azure;

import java.util.List;

import org.apache.kafka.common.MetricNameTemplate;

public class MetricRegistry {
public static final String METRIC_CONTEXT = "aiven.kafka.server.tieredstorage.azure";

static final String METRIC_GROUP = "azure-blob-storage-client-metrics";
static final String BLOB_DELETE = "blob-delete";
static final String BLOB_DELETE_DOC = "object delete operations";
static final String BLOB_DELETE_RATE = BLOB_DELETE + "-rate";
static final String BLOB_DELETE_TOTAL = BLOB_DELETE + "-total";
static final String BLOB_UPLOAD = "blob-upload";
static final String BLOB_UPLOAD_DOC = "object upload operations";
static final String BLOB_UPLOAD_RATE = BLOB_UPLOAD + "-rate";
static final String BLOB_UPLOAD_TOTAL = BLOB_UPLOAD + "-total";
static final String BLOCK_UPLOAD = "block-upload";
static final String BLOCK_UPLOAD_RATE = BLOCK_UPLOAD + "-rate";
static final String BLOCK_UPLOAD_TOTAL = BLOCK_UPLOAD + "-total";
static final String BLOCK_UPLOAD_DOC = "block (blob part) upload operations";
static final String BLOCK_LIST_UPLOAD = "block-list-upload";
static final String BLOCK_LIST_UPLOAD_RATE = BLOCK_LIST_UPLOAD + "-rate";
static final String BLOCK_LIST_UPLOAD_TOTAL = BLOCK_LIST_UPLOAD + "-total";
static final String BLOCK_LIST_UPLOAD_DOC = "block list (making a blob) upload operations";
static final String BLOB_GET = "blob-get";
static final String BLOB_GET_RATE = BLOB_GET + "-rate";
static final String BLOB_GET_TOTAL = BLOB_GET + "-total";
static final String BLOB_GET_DOC = "get object operations";

private static final String RATE_DOC_PREFIX = "Rate of ";
private static final String TOTAL_DOC_PREFIX = "Total number of ";

static final MetricNameTemplate BLOB_DELETE_RATE_METRIC_NAME = new MetricNameTemplate(
BLOB_DELETE_RATE,
METRIC_GROUP,
RATE_DOC_PREFIX + BLOB_DELETE_DOC
);
static final MetricNameTemplate BLOB_DELETE_TOTAL_METRIC_NAME = new MetricNameTemplate(
BLOB_DELETE_TOTAL,
METRIC_GROUP,
TOTAL_DOC_PREFIX + BLOB_DELETE_DOC
);
static final MetricNameTemplate BLOB_UPLOAD_RATE_METRIC_NAME = new MetricNameTemplate(
BLOB_UPLOAD_RATE,
METRIC_GROUP,
RATE_DOC_PREFIX + BLOB_UPLOAD_DOC
);
static final MetricNameTemplate BLOB_UPLOAD_TOTAL_METRIC_NAME = new MetricNameTemplate(
BLOB_UPLOAD_TOTAL,
METRIC_GROUP,
TOTAL_DOC_PREFIX + BLOB_UPLOAD_DOC
);
static final MetricNameTemplate BLOCK_UPLOAD_RATE_METRIC_NAME = new MetricNameTemplate(
BLOCK_UPLOAD_RATE,
METRIC_GROUP,
RATE_DOC_PREFIX + BLOCK_UPLOAD_DOC
);
static final MetricNameTemplate BLOCK_UPLOAD_TOTAL_METRIC_NAME = new MetricNameTemplate(
BLOCK_UPLOAD_TOTAL,
METRIC_GROUP,
TOTAL_DOC_PREFIX + BLOCK_UPLOAD_DOC
);
static final MetricNameTemplate BLOCK_LIST_UPLOAD_RATE_METRIC_NAME = new MetricNameTemplate(
BLOCK_LIST_UPLOAD_RATE,
METRIC_GROUP,
RATE_DOC_PREFIX + BLOCK_LIST_UPLOAD_DOC
);
static final MetricNameTemplate BLOCK_LIST_UPLOAD_TOTAL_METRIC_NAME = new MetricNameTemplate(
BLOCK_LIST_UPLOAD_TOTAL,
METRIC_GROUP,
TOTAL_DOC_PREFIX + BLOCK_LIST_UPLOAD_DOC
);
static final MetricNameTemplate BLOB_GET_RATE_METRIC_NAME = new MetricNameTemplate(
BLOB_GET_RATE,
METRIC_GROUP,
RATE_DOC_PREFIX + BLOB_GET_DOC
);
static final MetricNameTemplate BLOB_GET_TOTAL_METRIC_NAME = new MetricNameTemplate(
BLOB_GET_TOTAL,
METRIC_GROUP,
TOTAL_DOC_PREFIX + BLOB_GET_DOC
);

public List<MetricNameTemplate> all() {
return List.of(
BLOB_DELETE_RATE_METRIC_NAME,
BLOB_DELETE_TOTAL_METRIC_NAME,
BLOB_UPLOAD_RATE_METRIC_NAME,
BLOB_UPLOAD_TOTAL_METRIC_NAME,
BLOCK_UPLOAD_RATE_METRIC_NAME,
BLOCK_UPLOAD_TOTAL_METRIC_NAME,
BLOCK_LIST_UPLOAD_RATE_METRIC_NAME,
BLOCK_LIST_UPLOAD_TOTAL_METRIC_NAME,
BLOB_GET_RATE_METRIC_NAME,
BLOB_GET_TOTAL_METRIC_NAME
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.regex.Pattern;

import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
Expand All @@ -37,7 +38,24 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.http.HttpTransportOptions;

class MetricCollector {
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.METRIC_CONTEXT;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.OBJECT_DELETE;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.OBJECT_DELETE_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.OBJECT_DELETE_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.OBJECT_GET;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.OBJECT_GET_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.OBJECT_GET_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.OBJECT_METADATA_GET;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.OBJECT_METADATA_GET_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.OBJECT_METADATA_GET_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.RESUMABLE_CHUNK_UPLOAD;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.RESUMABLE_CHUNK_UPLOAD_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.RESUMABLE_CHUNK_UPLOAD_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.RESUMABLE_UPLOAD_INITIATE;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.RESUMABLE_UPLOAD_INITIATE_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.gcs.MetricRegistry.RESUMABLE_UPLOAD_INITIATE_TOTAL_METRIC_NAME;

public class MetricCollector {
private final org.apache.kafka.common.metrics.Metrics metrics;

/**
Expand All @@ -64,33 +82,55 @@ class MetricCollector {
static final Pattern OBJECT_UPLOAD_PATH_PATTERN =
Pattern.compile("^/upload/storage/v1/b/([^/]+)/o/?$");

private static final String METRIC_GROUP = "gcs-client-metrics";

private final Sensor getObjectMetadataRequests;
private final Sensor deleteObjectRequests;
private final Sensor resumableUploadInitiateRequests;
private final Sensor resumableChunkUploadRequests;
private final Sensor getObjectRequests;

MetricCollector() {
public MetricCollector() {
final JmxReporter reporter = new JmxReporter();

metrics = new org.apache.kafka.common.metrics.Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext("aiven.kafka.server.tieredstorage.gcs")
new KafkaMetricsContext(METRIC_CONTEXT)
);

getObjectMetadataRequests = createSensor("object-metadata-get");
getObjectRequests = createSensor("object-get");
deleteObjectRequests = createSensor("object-delete");
resumableUploadInitiateRequests = createSensor("resumable-upload-initiate");
resumableChunkUploadRequests = createSensor("resumable-chunk-upload");
getObjectMetadataRequests = createSensor(
OBJECT_METADATA_GET,
OBJECT_METADATA_GET_RATE_METRIC_NAME,
OBJECT_METADATA_GET_TOTAL_METRIC_NAME
);
getObjectRequests = createSensor(
OBJECT_GET,
OBJECT_GET_RATE_METRIC_NAME,
OBJECT_GET_TOTAL_METRIC_NAME
);
deleteObjectRequests = createSensor(
OBJECT_DELETE,
OBJECT_DELETE_RATE_METRIC_NAME,
OBJECT_DELETE_TOTAL_METRIC_NAME
);
resumableUploadInitiateRequests = createSensor(
RESUMABLE_UPLOAD_INITIATE,
RESUMABLE_UPLOAD_INITIATE_RATE_METRIC_NAME,
RESUMABLE_UPLOAD_INITIATE_TOTAL_METRIC_NAME
);
resumableChunkUploadRequests = createSensor(
RESUMABLE_CHUNK_UPLOAD,
RESUMABLE_CHUNK_UPLOAD_RATE_METRIC_NAME,
RESUMABLE_CHUNK_UPLOAD_TOTAL_METRIC_NAME
);
}

private Sensor createSensor(final String name) {
private Sensor createSensor(
final String name,
final MetricNameTemplate rateMetricName,
final MetricNameTemplate totalMetricName
) {
final Sensor sensor = metrics.sensor(name);
sensor.add(metrics.metricName(name + "-rate", METRIC_GROUP), new Rate());
sensor.add(metrics.metricName(name + "-total", METRIC_GROUP), new CumulativeCount());
sensor.add(metrics.metricInstance(rateMetricName), new Rate());
sensor.add(metrics.metricInstance(totalMetricName), new CumulativeCount());
return sensor;
}

Expand Down
Loading
Loading