diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index fc9e9cb1..2095b43c 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -20,6 +20,6 @@ jobs: run: mvn -B package -DskipTests - uses: ncipollo/release-action@v1 with: - artifacts: "pulsar-jms-all/target/pulsar-jms-all-*.jar,resource-adapter/target/pulsarra-rar.rar,pulsar-jms-filters/target/pulsar-jms*.nar,pulsar-jms-cli/target/jms-cli.jar,pulsar-jms-admin-ext/target/pulsar-jms*.nar" + artifacts: "pulsar-jms-all/target/pulsar-jms-all-*.jar,resource-adapter/target/pulsarra-rar.rar,pulsar-jms-filters/target/pulsar-jms*.nar,pulsar-jms-cli/target/jms-cli.jar,pulsar-jms-admin-ext/target/pulsar-jms*.nar,pulsar-jms-tracing/target/pulsar-jms*.nar" token: ${{ secrets.GITHUB_TOKEN }} generateReleaseNotes: true diff --git a/pom.xml b/pom.xml index d53377e5..ff75640c 100644 --- a/pom.xml +++ b/pom.xml @@ -33,6 +33,7 @@ activemq-filters pulsar-jms-filters + pulsar-jms-tracing pulsar-jms-admin-api pulsar-jms resource-adapter @@ -64,6 +65,7 @@ 3.1.0 2.14.2 2.8.9 + 32.1.2-jre 1.21 4.0.3 3.1.0 @@ -106,6 +108,11 @@ slf4j-simple ${slf4j.version} + + com.google.guava + guava + ${guava.version} + org.junit.jupiter junit-jupiter-engine diff --git a/pulsar-jms-admin-ext/pom.xml b/pulsar-jms-admin-ext/pom.xml index ed3c04d1..94ecc0ae 100644 --- a/pulsar-jms-admin-ext/pom.xml +++ b/pulsar-jms-admin-ext/pom.xml @@ -66,7 +66,7 @@ org.apache.nifi nifi-nar-maven-plugin - 1.3.2 + 1.5.1 true pulsar-jms-admin-${project.version} diff --git a/pulsar-jms-filters/pom.xml b/pulsar-jms-filters/pom.xml index 02728d7c..58a39af5 100644 --- a/pulsar-jms-filters/pom.xml +++ b/pulsar-jms-filters/pom.xml @@ -85,7 +85,7 @@ org.apache.nifi nifi-nar-maven-plugin - 1.3.2 + 1.5.1 true pulsar-jms-${project.version} diff --git a/pulsar-jms-integration-tests/pom.xml b/pulsar-jms-integration-tests/pom.xml index 8ac6fce5..dff7fc1c 100644 --- a/pulsar-jms-integration-tests/pom.xml +++ b/pulsar-jms-integration-tests/pom.xml @@ -103,8 +103,8 @@ copy filters - - + + diff --git a/pulsar-jms-tracing/pom.xml b/pulsar-jms-tracing/pom.xml new file mode 100644 index 00000000..19b588df --- /dev/null +++ b/pulsar-jms-tracing/pom.xml @@ -0,0 +1,90 @@ + + + + + pulsar-jms-parent + com.datastax.oss + 4.1.3-SNAPSHOT + + 4.0.0 + pulsar-jms-tracing + jar + DataStax Starlight for JMS - Broker Side Filters + + ${project.build.directory} + + + + com.fasterxml.jackson.core + jackson-databind + + + com.google.guava + guava + + + org.projectlombok + lombok + provided + + + com.github.spotbugs + spotbugs-annotations + provided + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.junit.jupiter + junit-jupiter-params + test + + + ${pulsar.groupId} + pulsar-broker + provided + + + + + + org.apache.nifi + nifi-nar-maven-plugin + 1.5.1 + true + + pulsar-jms-tracing-${project.version} + nar + + + + default-nar + package + + nar + + + + + + + diff --git a/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/BrokerTracing.java b/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/BrokerTracing.java new file mode 100644 index 00000000..13999322 --- /dev/null +++ b/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/BrokerTracing.java @@ -0,0 +1,748 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms.tracing; + +import static com.datastax.oss.pulsar.jms.tracing.TracingUtils.EventCategory; +import static com.datastax.oss.pulsar.jms.tracing.TracingUtils.EventSubCategory; +import static com.datastax.oss.pulsar.jms.tracing.TracingUtils.TraceLevel; +import static com.datastax.oss.pulsar.jms.tracing.TracingUtils.getConnectionDetails; +import static com.datastax.oss.pulsar.jms.tracing.TracingUtils.getConsumerDetails; +import static com.datastax.oss.pulsar.jms.tracing.TracingUtils.getEntryDetails; +import static com.datastax.oss.pulsar.jms.tracing.TracingUtils.getProducerDetails; +import static com.datastax.oss.pulsar.jms.tracing.TracingUtils.getPublishContextDetails; +import static com.datastax.oss.pulsar.jms.tracing.TracingUtils.getSubscriptionDetails; +import static com.datastax.oss.pulsar.jms.tracing.TracingUtils.hostNameOf; +import static com.datastax.oss.pulsar.jms.tracing.TracingUtils.trace; +import static com.datastax.oss.pulsar.jms.tracing.TracingUtils.traceMetadataAndPayload; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.netty.buffer.ByteBuf; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.intercept.BrokerInterceptor; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Producer; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.common.api.proto.MessageIdData; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.api.proto.TxnAction; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; +import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl; +import org.apache.pulsar.common.util.DateFormatter; +import org.jetbrains.annotations.NotNull; + +@Slf4j +public class BrokerTracing implements BrokerInterceptor { + + private static final TraceLevel defaultTraceLevel = TraceLevel.OFF; + + private final Set jmsTracingEventCategory = new HashSet<>(); + private TraceLevel traceLevel = defaultTraceLevel; + private int maxPayloadLength = 256; + private int cacheTraceLevelsDurationSec = 10; + private boolean traceSystemTopics = false; + private boolean traceSchema = false; + + private static void loadEnabledEvents( + PulsarService pulsarService, Set enabledEvents, TraceLevel traceLevel) { + + if (traceLevel == TraceLevel.OFF) { + log.info("Tracing is disabled. jmsTracingEventCategory is ignored."); + enabledEvents.clear(); + return; + } + + Properties props = pulsarService.getConfiguration().getProperties(); + if (props.containsKey("jmsTracingEventCategory")) { + String events = props.getProperty("jmsTracingEventCategory", ""); + log.info("read jmsTracingEventCategory: {}", events); + + enabledEvents.clear(); + for (String event : events.split(",")) { + try { + enabledEvents.add(EventCategory.valueOf(event.trim().toUpperCase())); + } catch (IllegalArgumentException e) { + log.error("Invalid event: {}. Skipping", event); + } + } + } else { + log.warn("jmsTracingEventCategory is not set. Using ADMIN, CONN."); + enabledEvents.add(EventCategory.CONN); + } + } + + @NotNull + private static TraceLevel getTraceLevel(PulsarService pulsar) { + String level = + pulsar + .getConfiguration() + .getProperties() + .getProperty("jmsTracingLevel", defaultTraceLevel.toString()); + try { + TraceLevel traceLevel = TraceLevel.valueOf(level.trim().toUpperCase()); + log.info("Using tracing level: {}", traceLevel); + return traceLevel; + } catch (IllegalArgumentException e) { + log.warn("Invalid tracing level: {}. Using default: {}", level, defaultTraceLevel); + return defaultTraceLevel; + } + } + + private final LoadingCache traceLevelForSubscription = + CacheBuilder.newBuilder() + .maximumSize(10_000L) + .concurrencyLevel(Runtime.getRuntime().availableProcessors()) + .expireAfterWrite(10L * cacheTraceLevelsDurationSec, TimeUnit.SECONDS) + .refreshAfterWrite(cacheTraceLevelsDurationSec, TimeUnit.SECONDS) + .build( + new CacheLoader() { + public TraceLevel load(Subscription sub) { + log.info("Loading trace level for subscription {}", sub); + return BrokerTracing.readTraceLevelForSubscription(sub); + } + + public ListenableFuture reload(Subscription sub, TraceLevel oldValue) + throws Exception { + SettableFuture future = SettableFuture.create(); + BrokerTracing.readTraceLevelForSubscriptionAsync(sub) + .whenComplete( + (level, ex) -> { + if (ex != null) { + future.setException(ex); + } else { + future.set(level); + } + }); + return future; + } + }); + private final LoadingCache traceLevelForProducer = + CacheBuilder.newBuilder() + .maximumSize(10_000L) + .concurrencyLevel(Runtime.getRuntime().availableProcessors()) + .expireAfterWrite(10L * cacheTraceLevelsDurationSec, TimeUnit.SECONDS) + .refreshAfterWrite(cacheTraceLevelsDurationSec, TimeUnit.SECONDS) + .build( + new CacheLoader() { + public TraceLevel load(Producer producer) { + try { + log.info("Loading trace level for producer {}", producer); + PulsarAdmin admin = + producer.getCnx().getBrokerService().getPulsar().getAdminClient(); + Topic topic = producer.getTopic(); + + return BrokerTracing.readTraceLevelForTopic(admin, topic); + } catch (Throwable t) { + log.error("Error getting tracing level", t); + return TraceLevel.OFF; + } + } + + public ListenableFuture reload(Producer producer, TraceLevel oldValue) + throws Exception { + SettableFuture future = SettableFuture.create(); + + PulsarAdmin admin = + producer.getCnx().getBrokerService().getPulsar().getAdminClient(); + Topic topic = producer.getTopic(); + + BrokerTracing.readTraceLevelForTopicAsync(admin, topic) + .whenComplete( + (level, ex) -> { + if (ex != null) { + future.setException(ex); + } else { + future.set(level); + } + }); + + return future; + } + }); + + @NotNull + private static TraceLevel readTraceLevelForSubscription(Subscription sub) { + try { + return readTraceLevelForSubscriptionAsync(sub).get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Interrupted while getting subscription tracing level for {}", sub, e); + Thread.currentThread().interrupt(); + return TraceLevel.OFF; + } catch (Throwable t) { + log.error("Error getting subscription tracing level for {}", sub, t); + return TraceLevel.OFF; + } + } + + @NotNull + private static CompletableFuture readTraceLevelForSubscriptionAsync( + Subscription sub) { + Map subProps = sub.getSubscriptionProperties(); + try { + if (subProps == null || !subProps.containsKey("trace")) { + PulsarAdmin admin = sub.getTopic().getBrokerService().getPulsar().getAdminClient(); + return BrokerTracing.readTraceLevelForTopicAsync(admin, sub.getTopic()); + } + + return CompletableFuture.completedFuture( + TraceLevel.valueOf(subProps.get("trace").trim().toUpperCase())); + } catch (IllegalArgumentException e) { + log.warn( + "Invalid tracing level: {}. Setting to NONE for subscription {}", + subProps.get("trace"), + sub); + return CompletableFuture.completedFuture(TraceLevel.OFF); + } catch (Throwable t) { + log.error("Error getting tracing level. Setting to NONE for subscription {}", sub, t); + return CompletableFuture.completedFuture(TraceLevel.OFF); + } + } + + @NotNull + private static TraceLevel readTraceLevelForTopic(PulsarAdmin admin, Topic topic) { + try { + return readTraceLevelForTopicAsync(admin, topic).get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Interrupted while getting tracing level for topic {}", topic.getName(), e); + Thread.currentThread().interrupt(); + return TraceLevel.OFF; + } catch (Throwable t) { + log.error("Error getting tracing level for topic {}", topic.getName(), t); + return TraceLevel.OFF; + } + } + + @NotNull + private static CompletableFuture readTraceLevelForTopicAsync( + PulsarAdmin admin, Topic topic) { + CompletableFuture> propsFuture = + admin.topics().getPropertiesAsync(TopicName.get(topic.getName()).getPartitionedTopicName()); + return propsFuture.handle( + (props, ex) -> { + if (ex != null) { + log.error("Error getting tracing level for topic {}", topic.getName(), ex); + return TraceLevel.OFF; + } + + try { + if (props == null || !props.containsKey("trace")) { + return TraceLevel.OFF; + } + + return TraceLevel.valueOf(props.get("trace").trim().toUpperCase()); + } catch (IllegalArgumentException e) { + log.warn( + "Invalid tracing level for topic {}: {}. Setting to NONE", + topic.getName(), + props.get("trace")); + return TraceLevel.OFF; + } + }); + } + + @NotNull + private static String formatMessageId(MessageIdData x) { + String msgId = x.getLedgerId() + ":" + x.getEntryId(); + if (x.hasBatchIndex()) { + msgId += " (batchSize: " + x.getBatchSize() + "|ackSetCnt: " + x.getAckSetsCount() + ")"; + } else if (x.getAckSetsCount() > 0) { + msgId += " (ackSetCnt " + x.getAckSetsCount() + ")"; + } + return msgId; + } + + public void initialize(PulsarService pulsarService) { + log.info("Initializing BrokerTracing"); + + traceLevel = getTraceLevel(pulsarService); + + loadEnabledEvents(pulsarService, jmsTracingEventCategory, traceLevel); + + Properties props = pulsarService.getConfiguration().getProperties(); + if (props.containsKey("jmsTracingMaxPayloadLength")) { + maxPayloadLength = Integer.parseInt(props.getProperty("jmsTracingMaxPayloadLength")); + log.info("Setting maxPayloadLength to {}", maxPayloadLength); + } + if (props.containsKey("jmsTracingTraceSystemTopics")) { + traceSystemTopics = Boolean.parseBoolean(props.getProperty("jmsTracingTraceSystemTopics")); + log.info("Setting traceSystemTopics to {}", traceSystemTopics); + } + if (props.containsKey("jmsTracingTraceSchema")) { + traceSchema = Boolean.parseBoolean(props.getProperty("jmsTracingTraceSchema")); + log.info("Setting traceSchema to {}", traceSchema); + } + if (props.containsKey("jmsTracingCacheTraceLevelsDurationSec")) { + cacheTraceLevelsDurationSec = + Integer.parseInt(props.getProperty("jmsTracingCacheTraceLevelsDurationSec")); + if (cacheTraceLevelsDurationSec <= 0) { + log.warn( + "Invalid cache duration: {}. Setting to default: {}", cacheTraceLevelsDurationSec, 10); + cacheTraceLevelsDurationSec = 10; + } + log.info("Setting cacheTraceLevelsDurationSec to {}", cacheTraceLevelsDurationSec); + } + } + + @Override + public void close() { + log.info("Closing BrokerTracing"); + } + + private TraceLevel getTracingLevel(Consumer consumer) { + if (consumer == null) return TraceLevel.OFF; + + return getTracingLevel(consumer.getSubscription()); + } + + private TraceLevel getTracingLevel(Subscription sub) { + if (sub == null) return TraceLevel.OFF; + + if (!traceSystemTopics && sub.getTopic().isSystemTopic()) return TraceLevel.OFF; + + try { + return traceLevelForSubscription.get(sub); + } catch (ExecutionException e) { + log.error("Error getting tracing level", e); + return TraceLevel.OFF; + } + } + + private TraceLevel getTracingLevel(Producer producer) { + if (producer == null) return TraceLevel.OFF; + + if (!traceSystemTopics && producer.getTopic().isSystemTopic()) return TraceLevel.OFF; + + try { + return traceLevelForProducer.get(producer); + } catch (ExecutionException e) { + log.error("Error getting tracing level", e); + return TraceLevel.OFF; + } + } + + private static void addMinimumProducerDetails( + Producer producer, Map traceDetails) { + if (producer == null) return; + + traceDetails.put("producerId", producer.getProducerId()); + traceDetails.put("producerName", producer.getProducerName()); + if (producer.getAccessMode() != null) { + traceDetails.put("accessMode", producer.getAccessMode().name()); + } + traceDetails.put( + "clientHost", + TracingUtils.hostNameOf( + producer.getClientAddress(), producer.getCnx().clientSourceAddressAndPort())); + + if (producer.getTopic() != null) { + traceDetails.put( + "topicName", TopicName.get(producer.getTopic().getName()).getPartitionedTopicName()); + } + traceDetails.put("authRole", producer.getCnx().getAuthRole()); + } + + private static void addMinimumConsumerSubscriptionDetails( + Consumer consumer, Map traceDetails) { + if (consumer == null) return; + + addMinimumConsumerSubscriptionDetails(consumer, consumer.getSubscription(), traceDetails); + } + + private static void addMinimumConsumerSubscriptionDetails( + Consumer consumer, Subscription subscription, Map traceDetails) { + if (consumer != null) { + traceDetails.put("consumerName", consumer.consumerName()); + traceDetails.put("consumerId", consumer.consumerId()); + traceDetails.put( + "clientHost", + TracingUtils.hostNameOf( + consumer.getClientAddress(), consumer.cnx().clientSourceAddressAndPort())); + traceDetails.put("authRole", consumer.cnx().getAuthRole()); + } + + if (subscription != null) { + traceDetails.put("subscriptionName", subscription.getName()); + traceDetails.put( + "topicName", TopicName.get(subscription.getTopicName()).getPartitionedTopicName()); + traceDetails.put("subscriptionType", subscription.getType().name()); + } + } + + /* *************************** + ** Connection events + ******************************/ + + public void onConnectionCreated(ServerCnx cnx) { + if (!jmsTracingEventCategory.contains(EventCategory.CONN)) return; + + Map traceDetails = new TreeMap<>(); + traceDetails.put("serverCnx", getConnectionDetails(cnx)); + traceDetails.put("brokerUrl", cnx.getBrokerService().getPulsar().getBrokerServiceUrl()); + + trace(EventCategory.CONN, EventSubCategory.CREATED, traceDetails); + } + + public void onConnectionClosed(ServerCnx cnx) { + if (!jmsTracingEventCategory.contains(EventCategory.CONN)) return; + + Map traceDetails = new TreeMap<>(); + traceDetails.put("serverCnx", getConnectionDetails(cnx)); + traceDetails.put("brokerUrl", cnx.getBrokerService().getPulsar().getBrokerServiceUrl()); + + trace(EventCategory.CONN, EventSubCategory.CLOSED, traceDetails); + } + + /* ************************** + * Producer connection events + *****************************/ + + public void producerCreated(ServerCnx cnx, Producer producer, Map metadata) { + if (!jmsTracingEventCategory.contains(EventCategory.PROD)) return; + if (!traceSystemTopics && producer.getTopic().isSystemTopic()) return; + + Map traceDetails = new TreeMap<>(); + traceDetails.put("producer", getProducerDetails(producer, traceSchema)); + traceDetails.put("metadata", metadata); + traceDetails.put("brokerUrl", cnx.getBrokerService().getPulsar().getBrokerServiceUrl()); + + trace(EventCategory.PROD, EventSubCategory.CREATED, traceDetails); + } + + public void producerClosed(ServerCnx cnx, Producer producer, Map metadata) { + if (!jmsTracingEventCategory.contains(EventCategory.PROD)) return; + if (!traceSystemTopics && producer.getTopic().isSystemTopic()) return; + + Map traceDetails = new TreeMap<>(); + traceDetails.put("producer", getProducerDetails(producer, traceSchema)); + traceDetails.put("metadata", metadata); + traceDetails.put("brokerUrl", cnx.getBrokerService().getPulsar().getBrokerServiceUrl()); + + Map statsTrace = new TreeMap<>(); + PublisherStatsImpl stats = producer.getStats(); + + statsTrace.put("connectedSince", stats.getConnectedSince()); + statsTrace.put("closedAt", DateFormatter.now()); + statsTrace.put("averageMsgSize", stats.getAverageMsgSize()); + statsTrace.put("msgRateIn", stats.getMsgRateIn()); + statsTrace.put("msgThroughputIn", stats.getMsgThroughputIn()); + // no message count in stats? stats.getCount() is not it + traceDetails.put("stats", statsTrace); + + trace(EventCategory.PROD, EventSubCategory.CLOSED, traceDetails); + } + + /* ************************** + * Consumer connection events + *****************************/ + + public void consumerCreated(ServerCnx cnx, Consumer consumer, Map metadata) { + if (!jmsTracingEventCategory.contains(EventCategory.CONS)) return; + if (!traceSystemTopics && consumer.getSubscription().getTopic().isSystemTopic()) return; + + Map traceDetails = new TreeMap<>(); + traceDetails.put("consumer", getConsumerDetails(consumer)); + traceDetails.put("subscription", getSubscriptionDetails(consumer.getSubscription())); + traceDetails.put("metadata", metadata); + traceDetails.put("brokerUrl", cnx.getBrokerService().getPulsar().getBrokerServiceUrl()); + + trace(EventCategory.CONS, EventSubCategory.CREATED, traceDetails); + } + + public void consumerClosed(ServerCnx cnx, Consumer consumer, Map metadata) { + if (!jmsTracingEventCategory.contains(EventCategory.CONS)) return; + if (!traceSystemTopics && consumer.getSubscription().getTopic().isSystemTopic()) return; + + Map traceDetails = new TreeMap<>(); + traceDetails.put("consumer", getConsumerDetails(consumer)); + traceDetails.put("subscription", getSubscriptionDetails(consumer.getSubscription())); + traceDetails.put("metadata", metadata); + traceDetails.put("brokerUrl", cnx.getBrokerService().getPulsar().getBrokerServiceUrl()); + + ConsumerStatsImpl stats = consumer.getStats(); + Map statsTrace = new TreeMap<>(); + statsTrace.put("connectedSince", stats.getConnectedSince()); + statsTrace.put("closedAt", DateFormatter.now()); + statsTrace.put("averageMsgSize", stats.getAvgMessagesPerEntry()); + statsTrace.put("msgRateOut", stats.getMsgRateOut()); + statsTrace.put("msgThroughputOut", stats.getMsgThroughputOut()); + statsTrace.put("msgOutCounter", stats.getMsgOutCounter()); + statsTrace.put("bytesOutCounter", stats.getBytesOutCounter()); + statsTrace.put("unackedMessages", stats.getUnackedMessages()); + statsTrace.put("messageAckRate", stats.getMessageAckRate()); + statsTrace.put("msgRateRedeliver", stats.getMsgRateRedeliver()); + statsTrace.put("readPositionWhenJoining", stats.getReadPositionWhenJoining()); + Subscription sub = consumer.getSubscription(); + if (sub != null) { + statsTrace.put("subscriptionApproxBacklog", sub.getNumberOfEntriesInBacklog(false)); + statsTrace.put("subscriptionMsgRateExpired", sub.getExpiredMessageRate()); + } + traceDetails.put("stats", statsTrace); + + trace(EventCategory.CONS, EventSubCategory.CLOSED, traceDetails); + } + + /* *************************** + ** Message events + ******************************/ + + public void onMessagePublish( + Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) { + if (!jmsTracingEventCategory.contains(EventCategory.MSG)) return; + + TraceLevel level = getTracingLevel(producer); + if (level == TraceLevel.OFF) return; + + Map traceDetails = new TreeMap<>(); + + addMinimumProducerDetails(producer, traceDetails); + + traceDetails.put("publishContext", getPublishContextDetails(level, publishContext)); + + if (TraceLevel.PAYLOAD == level && headersAndPayload != null) { + Map headersAndPayloadDetails = new TreeMap<>(); + traceMetadataAndPayload( + "payload", + headersAndPayload.retainedDuplicate(), + headersAndPayloadDetails, + maxPayloadLength); + traceDetails.put("headersAndPayload", headersAndPayloadDetails); + } + + trace(EventCategory.MSG, EventSubCategory.PRODUCED, traceDetails); + } + + public void messageProduced( + ServerCnx cnx, + Producer producer, + long startTimeNs, + long ledgerId, + long entryId, + Topic.PublishContext publishContext) { + if (!jmsTracingEventCategory.contains(EventCategory.MSG)) return; + + TraceLevel level = getTracingLevel(producer); + if (level == TraceLevel.OFF) return; + + Map traceDetails = new TreeMap<>(); + addMinimumProducerDetails(producer, traceDetails); + + traceDetails.put("publishContext", getPublishContextDetails(level, publishContext)); + traceDetails.put("messageId", ledgerId + ":" + entryId); + trace(EventCategory.MSG, EventSubCategory.STORED, traceDetails); + } + + public void beforeSendMessage( + Subscription subscription, + Entry entry, + long[] ackSet, + MessageMetadata msgMetadata, + Consumer consumer) { + if (!jmsTracingEventCategory.contains(EventCategory.MSG)) return; + + TraceLevel level = getTracingLevel(subscription); + if (level == TraceLevel.OFF) return; + + Map traceDetails = new TreeMap<>(); + + addMinimumConsumerSubscriptionDetails(consumer, subscription, traceDetails); + + traceDetails.put("messageId", entry.getLedgerId() + ":" + entry.getEntryId()); + + traceDetails.put("headersAndPayload", getEntryDetails(level, entry, maxPayloadLength)); + + trace(EventCategory.MSG, EventSubCategory.READ, traceDetails); + } + + public void messageDispatched( + ServerCnx cnx, Consumer consumer, long ledgerId, long entryId, ByteBuf headersAndPayload) { + if (!jmsTracingEventCategory.contains(EventCategory.MSG)) return; + + TraceLevel level = getTracingLevel(consumer); + if (level == TraceLevel.OFF) return; + + Map traceDetails = new TreeMap<>(); + addMinimumConsumerSubscriptionDetails(consumer, traceDetails); + traceDetails.put("messageId", ledgerId + ":" + entryId); + + if (TraceLevel.PAYLOAD == level && headersAndPayload != null) { + Map headersAndPayloadDetails = new TreeMap<>(); + traceMetadataAndPayload( + "payload", + headersAndPayload.retainedDuplicate(), + headersAndPayloadDetails, + maxPayloadLength); + traceDetails.put("headersAndPayload", headersAndPayloadDetails); + } + + trace(EventCategory.MSG, EventSubCategory.DISPATCHED, traceDetails); + } + + public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd) { + if (!jmsTracingEventCategory.contains(EventCategory.MSG)) return; + + TraceLevel level = getTracingLevel(consumer); + if (consumer != null && level == TraceLevel.OFF) return; + + Map traceDetails = new TreeMap<>(); + + addMinimumConsumerSubscriptionDetails(consumer, traceDetails); + + Map ackDetails = new TreeMap<>(); + if (ackCmd.hasAckType()) { + ackDetails.put("type", ackCmd.getAckType().name()); + } + ackDetails.put("numAckedMessages", ackCmd.getMessageIdsCount()); + ackDetails.put( + "messageIds", + ackCmd + .getMessageIdsList() + .stream() + .map(BrokerTracing::formatMessageId) + .collect(Collectors.toList())); + + if (ackCmd.hasTxnidLeastBits() && ackCmd.hasTxnidMostBits()) { + ackDetails.put( + "txnID", "(" + ackCmd.getTxnidMostBits() + "," + ackCmd.getTxnidLeastBits() + ")"); + } + if (ackCmd.hasRequestId()) { + ackDetails.put("requestId", ackCmd.getRequestId()); + } + + traceDetails.put("ack", ackDetails); + + EventSubCategory subcategory = + consumer == null ? EventSubCategory.FILTERED : EventSubCategory.ACKED; + + trace(EventCategory.MSG, subcategory, traceDetails); + } + + /* *************************** + ** Transaction events + ******************************/ + + public void txnOpened(long tcId, String txnID) { + if (!jmsTracingEventCategory.contains(EventCategory.TX)) return; + + Map traceDetails = new TreeMap<>(); + traceDetails.put("tcId", tcId); // transaction coordinator id + traceDetails.put("txnID", txnID); + + trace(EventCategory.TX, EventSubCategory.OPENED, traceDetails); + } + + public void txnEnded(String txnID, long txnAction) { + if (!jmsTracingEventCategory.contains(EventCategory.TX)) return; + + final EventSubCategory subcategory; + Map traceDetails = new TreeMap<>(); + traceDetails.put("txnID", txnID); + + TxnAction action = TxnAction.valueOf((int) txnAction); + if (action == null) { + subcategory = EventSubCategory.CLOSED; + traceDetails.put("txnAction", "unknown action code " + txnAction); + } else { + traceDetails.put("txnAction", action.name()); + switch (action) { + case COMMIT: + subcategory = EventSubCategory.COMMITTED; + break; + case ABORT: + subcategory = EventSubCategory.ABORTED; + break; + default: + subcategory = EventSubCategory.CLOSED; + traceDetails.put("txnAction", "unknown action code " + txnAction + " " + action.name()); + break; + } + } + + trace(EventCategory.TX, subcategory, traceDetails); + } + + /* *************************** + ** Servlet events + ******************************/ + + public void onWebserviceResponse(ServletRequest request, ServletResponse response) { + if (!jmsTracingEventCategory.contains(EventCategory.REST)) return; + + Map traceDetails = new TreeMap<>(); + + traceDetails.put("host", hostNameOf(request.getRemoteHost(), request.getRemotePort())); + traceDetails.put("protocol", request.getProtocol()); + traceDetails.put("scheme", request.getScheme()); + + // todo: log POST payload? + try { + HttpServletRequest req = (HttpServletRequest) FieldUtils.readField(request, "request", true); + traceDetails.put("method", req.getMethod()); + traceDetails.put("uri", req.getRequestURI()); + if (req.getQueryString() != null) { + traceDetails.put("queryString", req.getQueryString()); + } + traceDetails.put("authType", req.getAuthType()); + traceDetails.put("remoteUser", req.getRemoteUser()); + + HttpServletResponse resp = (HttpServletResponse) response; + traceDetails.put("status", resp.getStatus()); + } catch (Throwable t) { + log.error("Error getting request details", t); + } + + trace(EventCategory.REST, EventSubCategory.CALLED, traceDetails); + } + + /* *************************** + ** Skipped + ******************************/ + + public void onPulsarCommand(BaseCommand command, ServerCnx cnx) { + // skipping, output is not useful + } + + public void onWebserviceRequest(ServletRequest request) { + // skipping, it is the same as onWebserviceResponse + // but without response status. + } + + // public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain) +} diff --git a/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/TracingUtils.java b/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/TracingUtils.java new file mode 100644 index 00000000..c6e6534b --- /dev/null +++ b/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/TracingUtils.java @@ -0,0 +1,523 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms.tracing; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import io.netty.buffer.ByteBuf; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.commons.codec.binary.Hex; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Producer; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.schema.SchemaVersion; + +@Slf4j +public class TracingUtils { + + public enum EventCategory { + CONN, // connection creation, closure, + PROD, // producer creation, closure, + CONS, // consumer creation, closure, + TX, // (transaction creation, commit, rollback,etc), + MSG, // (message level send,dispatch,ack,expire,acktimeout, negative ack, etc), + REST, // (rest api calls), + } + + public enum EventSubCategory { + CREATED, + CLOSED, + + PRODUCED, + STORED, + + READ, + DISPATCHED, + ACKED, + FILTERED, + + OPENED, + COMMITTED, + ABORTED, + + CALLED, + } + + @FunctionalInterface + public interface Tracer { + void trace(EventCategory category, String message); + } + + public static class Slf4jTracer implements Tracer { + private static final Map traceLoggers = new HashMap<>(); + + static { + for (EventCategory category : EventCategory.values()) { + traceLoggers.put( + category, + org.slf4j.LoggerFactory.getLogger("jms-tracing-" + category.name().toLowerCase())); + } + } + + @Override + public void trace(EventCategory category, String message) { + traceLoggers.get(category).info(message); + } + } + + public static final Tracer SLF4J_TRACER = new Slf4jTracer(); + + private static final ObjectMapper mapper = + new ObjectMapper() + .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS) + .enable(SerializationFeature.WRITE_NULL_MAP_VALUES); + + public enum TraceLevel { + OFF, // disabled + ON, // enabled without payload tracing + PAYLOAD, // enabled with payload tracing + } + + private static final LoadingCache ipResolverCache = + CacheBuilder.newBuilder() + .maximumSize(10_000L) + .expireAfterWrite(4, TimeUnit.HOURS) + .build( + new CacheLoader() { + public String load(String clientAddress) { + // Dn resolution can be slow in some cases + // and we do not want to create too many requests to DNS, + // so we cache the result + log.info("resolving DNS for {}", clientAddress); + try { + InetAddress address = InetAddress.getByName(clientAddress); + String hostName = address.getCanonicalHostName(); + if (log.isDebugEnabled()) { + log.debug("Resolved DNS for {} to {}", clientAddress, hostName); + } + return hostName; + } catch (UnknownHostException e) { + log.error("Failed to resolve DNS for {}", clientAddress, e); + return clientAddress; + } + } + }); + + public static String hostNameOf(String clientAddress, String clientSourceAddressAndPort) { + if (clientAddress == null + || clientAddress.isEmpty() + || clientSourceAddressAndPort == null + || !clientSourceAddressAndPort.contains(":")) { + return "unknown/null"; + } + + try { + String port = clientSourceAddressAndPort.split(":")[1]; + return ipResolverCache.get(clientAddress) + ":" + port; + } catch (Throwable t) { + log.error("Failed to resolve DNS for {}", clientAddress, t); + return clientAddress; + } + } + + public static String hostNameOf(String clientAddress, int remotePort) { + if (clientAddress == null || clientAddress.isEmpty()) { + return "unknown/null"; + } + + try { + return ipResolverCache.get(clientAddress) + ":" + remotePort; + } catch (Throwable t) { + log.error("Failed to resolve DNS for {}", clientAddress, t); + return clientAddress; + } + } + + public static void trace( + EventCategory category, EventSubCategory subCategory, Map traceDetails) { + trace(SLF4J_TRACER, category, subCategory, traceDetails); + } + + public static void trace( + Tracer tracer, + EventCategory category, + EventSubCategory subCategory, + Map traceDetails) { + Map trace = new TreeMap<>(); + trace.put("event", category + "_" + subCategory); + trace.put("traceDetails", traceDetails); + + try { + String loggableJsonString = mapper.writeValueAsString(trace); + tracer.trace(category, loggableJsonString); + } catch (JsonProcessingException e) { + log.error( + "Failed to serialize trace event '{}_{}' as json, traceDetails: {}", + category, + subCategory, + traceDetails, + e); + } + } + + public static Map getConnectionDetails(ServerCnx cnx) { + if (cnx == null) { + return null; + } + + Map details = new TreeMap<>(); + populateConnectionDetails(cnx, details); + return details; + } + + private static void populateConnectionDetails(ServerCnx cnx, Map traceDetails) { + if (cnx == null) { + return; + } + traceDetails.put( + "clientHost", hostNameOf(cnx.clientSourceAddress(), cnx.clientSourceAddressAndPort())); + traceDetails.put("authRole", cnx.getAuthRole()); + traceDetails.put("clientVersion", cnx.getClientVersion()); + traceDetails.put("clientSourceAddressAndPort", cnx.clientSourceAddressAndPort()); + traceDetails.put("authMethod", cnx.getAuthMethod()); + if (cnx.getAuthenticationProvider() != null) { + traceDetails.put("authMethodName", cnx.getAuthenticationProvider().getAuthMethodName()); + } + + traceDetails.put("state", cnx.getState()); + + AuthenticationDataSource authData = cnx.getAuthenticationData(); + if (authData != null) { + traceDetails.put("authData", getAuthDataDetails(authData)); + } + } + + private static Object getAuthDataDetails(AuthenticationDataSource authData) { + if (authData == null) { + return null; + } + + Map details = new TreeMap<>(); + populateAuthDataDetails(authData, details); + return details; + } + + private static void populateAuthDataDetails( + AuthenticationDataSource authData, Map details) { + if (authData == null) { + return; + } + + details.put("peerAddress", authData.getPeerAddress()); + details.put("commandData", authData.getCommandData()); + details.put("httpAuthType", authData.getHttpAuthType()); + details.put("subscription", authData.getSubscription()); + if (authData.getTlsCertificates() != null) { + details.put( + "tlsCertificates", + Arrays.stream(authData.getTlsCertificates()) + .map(Object::toString) + .collect(Collectors.toList())); + } + } + + public static Map getSubscriptionDetails(Subscription sub) { + if (sub == null) { + return null; + } + + Map details = new TreeMap<>(); + populateSubscriptionDetails(sub, details); + return details; + } + + private static void populateSubscriptionDetails( + Subscription sub, Map traceDetails) { + if (sub == null) { + return; + } + + traceDetails.put("name", sub.getName()); + traceDetails.put("topicName", TopicName.get(sub.getTopicName()).getPartitionedTopicName()); + traceDetails.put("type", sub.getType().name()); + + if (sub.getConsumers() != null) { + traceDetails.put("numberOfConsumers", sub.getConsumers().size()); + } + traceDetails.put("isReplicated", sub.isReplicated()); + traceDetails.put("numberOfEntriesDelayed", sub.getNumberOfEntriesDelayed()); + traceDetails.put("numberOfEntriesInBacklog", sub.getNumberOfEntriesInBacklog(false)); + + traceDetails.put("subscriptionProperties", sub.getSubscriptionProperties()); + } + + public static Map getConsumerDetails(Consumer consumer) { + if (consumer == null) { + return null; + } + + Map details = new TreeMap<>(); + populateConsumerDetails(consumer, details); + return details; + } + + private static void populateConsumerDetails(Consumer consumer, Map traceDetails) { + if (consumer == null) { + return; + } + + traceDetails.put("name", consumer.consumerName()); + traceDetails.put("consumerId", consumer.consumerId()); + + traceDetails.put("priorityLevel", consumer.getPriorityLevel()); + traceDetails.put("subType", consumer.subType() == null ? null : consumer.subType().name()); + traceDetails.put( + "clientHost", + hostNameOf(consumer.getClientAddress(), consumer.cnx().clientSourceAddressAndPort())); + + traceDetails.put("metadata", consumer.getMetadata()); + traceDetails.put("unackedMessages", consumer.getUnackedMessages()); + traceDetails.put("authRole", consumer.cnx().getAuthRole()); + } + + public static Map getProducerDetails(Producer producer, boolean traceSchema) { + if (producer == null) { + return null; + } + + Map details = new TreeMap<>(); + populateProducerDetails(producer, details, traceSchema); + return details; + } + + private static void populateProducerDetails( + Producer producer, Map traceDetails, boolean traceSchema) { + if (producer == null) { + return; + } + + traceDetails.put("producerId", producer.getProducerId()); + traceDetails.put("producerName", producer.getProducerName()); + traceDetails.put( + "accessMode", producer.getAccessMode() == null ? null : producer.getAccessMode().name()); + if (producer.getTopic() != null) { + traceDetails.put( + "topicName", TopicName.get(producer.getTopic().getName()).getPartitionedTopicName()); + } + + traceDetails.put( + "clientHost", + hostNameOf(producer.getClientAddress(), producer.getCnx().clientSourceAddressAndPort())); + + traceDetails.put("metadata", producer.getMetadata()); + + if (traceSchema && producer.getSchemaVersion() != null) { + final String schemaVersion; + if (producer.getSchemaVersion() == SchemaVersion.Empty) { + schemaVersion = "Empty"; + } else if (producer.getSchemaVersion() == SchemaVersion.Latest) { + schemaVersion = "Latest"; + } else { + schemaVersion = "0x" + Hex.encodeHexString(producer.getSchemaVersion().bytes()); + } + traceDetails.put("schemaVersion", schemaVersion); + } + traceDetails.put("remoteCluster", producer.getRemoteCluster()); + + traceDetails.put("authRole", producer.getCnx().getAuthRole()); + } + + public static Map getMessageMetadataDetails(MessageMetadata msgMetadata) { + if (msgMetadata == null) { + return null; + } + + Map details = new TreeMap<>(); + populateMessageMetadataDetails(msgMetadata, details); + return details; + } + + private static void populateMessageMetadataDetails( + MessageMetadata msgMetadata, Map traceDetails) { + if (msgMetadata == null) { + return; + } + + if (msgMetadata.hasPartitionKey()) { + traceDetails.put("partitionKey", msgMetadata.getPartitionKey()); + } + if (msgMetadata.hasSequenceId()) { + traceDetails.put("sequenceId", msgMetadata.getSequenceId()); + } + if (msgMetadata.hasUncompressedSize()) { + traceDetails.put("uncompressedSize", msgMetadata.getUncompressedSize()); + } + if (msgMetadata.hasNumMessagesInBatch()) { + traceDetails.put("numMessagesInBatch", msgMetadata.getNumMessagesInBatch()); + } + traceDetails.put("serializedSize", msgMetadata.getSerializedSize()); + + if (msgMetadata.hasPublishTime()) { + traceDetails.put("publishTime", msgMetadata.getPublishTime()); + } + if (msgMetadata.hasEventTime()) { + traceDetails.put("eventTime", msgMetadata.getEventTime()); + } + if (msgMetadata.hasReplicatedFrom()) { + traceDetails.put("replicatedFrom", msgMetadata.getReplicatedFrom()); + } + if (msgMetadata.hasUuid()) { + traceDetails.put("uuid", msgMetadata.getUuid()); + } + } + + public static Map getEntryDetails( + TraceLevel level, Entry entry, int maxBinaryDataLength) { + if (entry == null) { + return null; + } + + Map details = new TreeMap<>(); + populateEntryDetails(level, entry, details, maxBinaryDataLength); + return details; + } + + private static void populateEntryDetails( + TraceLevel level, Entry entry, Map traceDetails, int maxBinaryDataLength) { + if (entry == null) { + return; + } + + traceDetails.put("length", entry.getLength()); + + if (TraceLevel.PAYLOAD == level && entry.getDataBuffer() != null) { + traceMetadataAndPayload( + "payload", entry.getDataBuffer().retainedDuplicate(), traceDetails, maxBinaryDataLength); + } + } + + public static Map getPublishContextDetails( + TraceLevel level, Topic.PublishContext publishContext) { + if (publishContext == null) { + return null; + } + + Map details = new TreeMap<>(); + populatePublishContext(level, publishContext, details); + return details; + } + + private static void populatePublishContext( + TraceLevel level, Topic.PublishContext publishContext, Map traceDetails) { + traceDetails.put("sequenceId", publishContext.getSequenceId()); + traceDetails.put("entryTimestamp", publishContext.getEntryTimestamp()); + traceDetails.put("msgSize", publishContext.getMsgSize()); + + if (TraceLevel.PAYLOAD == level) { + traceDetails.put("numberOfMessages", publishContext.getNumberOfMessages()); + traceDetails.put("isMarkerMessage", publishContext.isMarkerMessage()); + traceDetails.put("isChunked", publishContext.isChunked()); + if (publishContext.getOriginalProducerName() != null) { + traceDetails.put("originalProducerName", publishContext.getOriginalProducerName()); + traceDetails.put("originalSequenceId", publishContext.getOriginalSequenceId()); + } + } + } + + /** this will release metadataAndPayload */ + public static void traceMetadataAndPayload( + String key, + ByteBuf metadataAndPayload, + Map traceDetails, + int maxPayloadLength) { + if (metadataAndPayload == null) return; + if (maxPayloadLength <= 0) { + metadataAndPayload.release(); + return; + } + try { + // advance readerIndex + MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); + + // todo: do we need to trace this metadata? + populateMessageMetadataDetails(metadata, traceDetails); + + // Decode if needed + CompressionCodec codec = + CompressionCodecProvider.getCompressionCodec(metadata.getCompression()); + ByteBuf uncompressedPayload = + codec.decode(metadataAndPayload, metadata.getUncompressedSize()); + traceByteBuf(key, uncompressedPayload, traceDetails, maxPayloadLength); + } catch (Throwable t) { + log.error("Failed to trace metadataAndPayload", t); + } finally { + metadataAndPayload.release(); + } + } + + /** this will release payload */ + public static void traceByteBuf( + String key, ByteBuf payload, Map traceDetails, int maxPayloadLength) { + if (payload == null) return; + + if (maxPayloadLength <= 0) { + payload.release(); + return; + } + + try { + // todo: does this require additional steps if messages are batched? + String dataAsString = payload.toString(StandardCharsets.UTF_8); + if (dataAsString.length() > maxPayloadLength + 3) { + dataAsString = dataAsString.substring(0, maxPayloadLength) + "..."; + } + traceDetails.put(key, dataAsString); + } catch (Throwable t) { + log.error("Failed to convert ByteBuf to string", t); + if (payload.readableBytes() < maxPayloadLength) { + traceDetails.put(key, "0x" + Hex.encodeHexString(payload.nioBuffer())); + } else { + ByteBuf buf = payload.slice(0, maxPayloadLength / 2); + traceDetails.put(key, "0x" + Hex.encodeHexString(buf.nioBuffer()) + "..."); + buf.release(); + } + } finally { + payload.release(); + } + } +} diff --git a/pulsar-jms-tracing/src/main/resources/META-INF/services/broker_interceptor.yml b/pulsar-jms-tracing/src/main/resources/META-INF/services/broker_interceptor.yml new file mode 100644 index 00000000..a47fa05f --- /dev/null +++ b/pulsar-jms-tracing/src/main/resources/META-INF/services/broker_interceptor.yml @@ -0,0 +1,3 @@ +interceptorClass: com.datastax.oss.pulsar.jms.tracing.BrokerTracing +name: jms-tracing +description: Starlight for JMS - support for server side tracing \ No newline at end of file diff --git a/pulsar-jms-tracing/src/test/java/com/datastax/oss/pulsar/jms/tracing/TracingUtilsTest.java b/pulsar-jms-tracing/src/test/java/com/datastax/oss/pulsar/jms/tracing/TracingUtilsTest.java new file mode 100644 index 00000000..3a87a5cf --- /dev/null +++ b/pulsar-jms-tracing/src/test/java/com/datastax/oss/pulsar/jms/tracing/TracingUtilsTest.java @@ -0,0 +1,138 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms.tracing; + +import static com.datastax.oss.pulsar.jms.tracing.TracingUtils.*; +import static org.junit.jupiter.api.Assertions.*; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import org.junit.jupiter.api.Test; + +class TracingUtilsTest { + + List traces = new ArrayList<>(); + private Tracer mockTracer = + new Tracer() { + @Override + public void trace(EventCategory reason, String msg) { + traces.add(msg); + } + }; + + private static void trace(Tracer mockTracer, Map traceDetails) { + TracingUtils.trace(mockTracer, EventCategory.MSG, EventSubCategory.PRODUCED, traceDetails); + } + + @Test + void traceTest() { + traces.clear(); + trace(mockTracer, null); + assertEquals(1, traces.size()); + assertEquals("{\"event\":\"MSG_PRODUCED\",\"traceDetails\":null}", traces.get(0)); + + Map map = new TreeMap<>(); + + traces.clear(); + trace(mockTracer, map); + assertEquals(1, traces.size()); + assertEquals("{\"event\":\"MSG_PRODUCED\",\"traceDetails\":{}}", traces.get(0)); + + map.put("key1", "value1"); + + traces.clear(); + trace(mockTracer, map); + assertEquals(1, traces.size()); + assertEquals( + "{\"event\":\"MSG_PRODUCED\",\"traceDetails\":{\"key1\":\"value1\"}}", traces.get(0)); + } + + // todo: + // @Test + // void getConnectionDetailsTest() { + // } + // + // @Test + // void getSubscriptionDetailsTest() { + // } + // + // @Test + // void getConsumerDetailsTest() { + // } + // + // @Test + // void getProducerDetails() { + // } + // + // @Test + // void getMessageMetadataDetailsTest() { + // } + // + // @Test + // void getEntryDetailsTest() { + // } + // + // @Test + // void getPublishContextDetailsTest() { + // } + + @Test + void traceByteBufTest() { + Map traceDetails = new TreeMap<>(); + + int maxBinaryDataLength = 100; + + traceByteBuf("key", null, traceDetails, maxBinaryDataLength); + assertEquals(0, traceDetails.size()); + + Random rand = new Random(); + + ByteBuf small = Unpooled.buffer(20); + for (int i = 0; i < 20; i++) { + char randomChar = (char) (rand.nextInt(26) + 'a'); + small.writeByte(randomChar); + } + String smallStr = small.toString(StandardCharsets.UTF_8); + assertEquals(1, small.refCnt()); + + traceByteBuf("key", small, traceDetails, maxBinaryDataLength); + assertEquals(1, traceDetails.size()); + assertEquals(20, ((String) traceDetails.get("key")).length()); + assertEquals(smallStr, traceDetails.get("key")); + assertEquals(0, small.refCnt()); + + ByteBuf big = Unpooled.buffer(maxBinaryDataLength + 100); + for (int i = 0; i < maxBinaryDataLength + 100; i++) { + char randomChar = (char) (rand.nextInt(26) + 'a'); + big.writeByte(randomChar); + } + assertEquals(1, big.refCnt()); + String bigStr = big.toString(StandardCharsets.UTF_8); + + traceDetails.clear(); + traceByteBuf("key", big, traceDetails, maxBinaryDataLength); + assertEquals(1, traceDetails.size()); + assertEquals(maxBinaryDataLength + 3, ((String) traceDetails.get("key")).length()); + assertEquals(bigStr.substring(0, maxBinaryDataLength) + "...", traceDetails.get("key")); + assertEquals(0, big.refCnt()); + } +} diff --git a/pulsar-jms-tracing/src/test/resources/log4j2.xml b/pulsar-jms-tracing/src/test/resources/log4j2.xml new file mode 100644 index 00000000..485f6705 --- /dev/null +++ b/pulsar-jms-tracing/src/test/resources/log4j2.xml @@ -0,0 +1,41 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/pulsar-jms/pom.xml b/pulsar-jms/pom.xml index 00136c60..d6f93692 100644 --- a/pulsar-jms/pom.xml +++ b/pulsar-jms/pom.xml @@ -128,10 +128,10 @@ copy filters - - - - + + + +