From 0fda44cd3a80ff13f340d395561e9a27a8d78f3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CNithin?= Date: Mon, 1 Apr 2024 10:54:43 +0530 Subject: [PATCH 1/5] Improve console logs by adding date details MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Nithin --- src/main/resources/logback.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index a11b4746..2fbb87cd 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -2,7 +2,7 @@ - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + %d{yyyy-MM-dd | HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n From 5d48654f2e51374dd77671d29bfcf786e55c53d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CNithin?= Date: Mon, 1 Apr 2024 10:55:21 +0530 Subject: [PATCH 2/5] Include dependencies for improving Listener ITs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Nithin --- pom.xml | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index bda026f0..89d55b74 100644 --- a/pom.xml +++ b/pom.xml @@ -202,7 +202,22 @@ micrometer-registry-prometheus runtime - + + org.awaitility + awaitility + test + + + org.springframework.kafka + spring-kafka-test + test + + + org.mockito + mockito-core + test + + From 88ea8d77aa741137baf83e73c6452afdf54e1743 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CNithin?= Date: Mon, 1 Apr 2024 10:58:02 +0530 Subject: [PATCH 3/5] Include Non-Retryable exceptions in Listener Error Handler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Nithin --- .../rest/client/config/KafkaConsumerErrorHandler.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/main/java/hlf/java/rest/client/config/KafkaConsumerErrorHandler.java b/src/main/java/hlf/java/rest/client/config/KafkaConsumerErrorHandler.java index bb464160..83ca30ae 100644 --- a/src/main/java/hlf/java/rest/client/config/KafkaConsumerErrorHandler.java +++ b/src/main/java/hlf/java/rest/client/config/KafkaConsumerErrorHandler.java @@ -2,6 +2,8 @@ import hlf.java.rest.client.exception.FabricTransactionException; import hlf.java.rest.client.exception.RetryableServiceException; +import hlf.java.rest.client.exception.ServiceException; +import hlf.java.rest.client.exception.UnrecognizedTransactionPayloadException; import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -33,6 +35,9 @@ public class KafkaConsumerErrorHandler { private static final List> connectorRetryableExceptions = Arrays.asList(RetryableServiceException.class, FabricTransactionException.class); + private static final List> connectorNonRetryableExceptions = + Arrays.asList(UnrecognizedTransactionPayloadException.class, ServiceException.class); + @Autowired private KafkaProperties kafkaProperties; @Autowired private KafkaProducerConfig kafkaProducerConfig; @@ -88,6 +93,10 @@ public void accept( defaultErrorHandler.addRetryableExceptions(retryableExceptionClass); } + for (Class nonRetryableExceptionClass : connectorNonRetryableExceptions) { + defaultErrorHandler.addNotRetryableExceptions(nonRetryableExceptionClass); + } + return defaultErrorHandler; } From 98c92e455ceed15316dca77dd4d7d155f2ef8be5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CNithin?= Date: Mon, 1 Apr 2024 10:59:58 +0530 Subject: [PATCH 4/5] Error handling & throughput improvements in Listener MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Nithin --- .../client/listener/DynamicKafkaListener.java | 26 +++++++++---------- .../client/listener/TransactionConsumer.java | 11 +++----- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/src/main/java/hlf/java/rest/client/listener/DynamicKafkaListener.java b/src/main/java/hlf/java/rest/client/listener/DynamicKafkaListener.java index 128070f7..8daf9f1a 100644 --- a/src/main/java/hlf/java/rest/client/listener/DynamicKafkaListener.java +++ b/src/main/java/hlf/java/rest/client/listener/DynamicKafkaListener.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -36,9 +37,9 @@ @ConditionalOnProperty("kafka.integration-points[0].brokerHost") public class DynamicKafkaListener { - private static final int MAX_CONCURRENT_LISTENERS_PER_CONSUMER = 6; + private static final int MAX_CONCURRENT_LISTENERS_PER_CONSUMER = 12; - private List existingContainers = new ArrayList<>(); + @Getter private List existingContainers = new ArrayList<>(); @Autowired private KafkaProperties kafkaProperties; @@ -92,7 +93,7 @@ public void generateAndStartConsumerGroup(KafkaProperties.Consumer consumer) { int consumerListenerConcurrency = 1; // Kafka default if no concurrency is set. - if (consumer.isEnableParallelListenerCapabilities() && consumer.getTopicPartitions() > 1) { + if (consumer.getTopicPartitions() > 1) { consumerListenerConcurrency = Math.min(consumer.getTopicPartitions(), MAX_CONCURRENT_LISTENERS_PER_CONSUMER); } @@ -102,6 +103,7 @@ public void generateAndStartConsumerGroup(KafkaProperties.Consumer consumer) { container.start(); existingContainers.add(container); + log.debug( "Created kafka message listener container" + container.metrics().keySet().iterator().next()); @@ -117,9 +119,9 @@ private Object determineMessageListenerForTransactions(KafkaProperties.Consumer /** * A Message listener, where each Consumer container would get the list of Records fetched as part * of poll() to process. The records are then supplied to an Async Task pool so that multiple - * individual Records can be processed in Parallel aynchronously. In case if one of the + * individual Records can be processed in Parallel asynchronously. In case if one of the * tasks/record fails with an Exception, we perform a partial Batch commit, in which the next - * poll() from the server would contain the non committed records of the previous Batch to + * poll() from the server would contain the non-committed records of the previous Batch to * process. * * @return @@ -173,13 +175,11 @@ public void onMessage( private Object getPerRecordAcknowledgingListener() { - return new AcknowledgingMessageListener() { - @Override - public void onMessage(ConsumerRecord message, Acknowledgment acknowledgment) { - transactionConsumer.listen(message); - // Manually ack the single Record - acknowledgment.acknowledge(); - } - }; + return (AcknowledgingMessageListener) + (message, acknowledgment) -> { + transactionConsumer.listen(message); + // Manually ack the single Record + acknowledgment.acknowledge(); + }; } } diff --git a/src/main/java/hlf/java/rest/client/listener/TransactionConsumer.java b/src/main/java/hlf/java/rest/client/listener/TransactionConsumer.java index 85c54063..bd83ea11 100644 --- a/src/main/java/hlf/java/rest/client/listener/TransactionConsumer.java +++ b/src/main/java/hlf/java/rest/client/listener/TransactionConsumer.java @@ -2,8 +2,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import hlf.java.rest.client.exception.ErrorCode; -import hlf.java.rest.client.exception.FabricTransactionException; -import hlf.java.rest.client.exception.ServiceException; import hlf.java.rest.client.exception.UnrecognizedTransactionPayloadException; import hlf.java.rest.client.metrics.EmitCustomTransactionListenerMetrics; import hlf.java.rest.client.model.MultiDataTransactionPayload; @@ -169,16 +167,13 @@ public void listen(ConsumerRecord message) { "Inbound transaction format is incorrect or doesn't contain valid parameters."); } - } catch (FabricTransactionException fte) { - log.error("Error in Submitting Transaction - Exception - " + fte.getMessage()); + } catch (Exception exception) { + log.error("Error in Submitting Transaction - Exception - " + exception.getMessage()); /* If the error handler has dead letter publish enabled, the errored Record header will be enriched by extracting the error cause and message from the thrown exception. */ - throw fte; - } catch (Exception ex) { - log.error("Error in Kafka Listener - Message Format exception - " + ex.getMessage()); - throw new ServiceException(ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, ex.getMessage()); + throw exception; } } From 391bf440b8ae7862c1e64b7db483162444697c9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CNithin?= Date: Mon, 1 Apr 2024 11:01:05 +0530 Subject: [PATCH 5/5] Incorporate ITs for Listener Tx Processing flows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Nithin --- .../hlf/java/rest/client/IT/KafkaBaseIT.java | 145 ++++++++++ .../IT/PerPartitionListenerIntegrationIT.java | 205 +++++++++++++ .../SingleThreadedListenerIntegrationIT.java | 272 ++++++++++++++++++ .../application-default-consumer.yml | 66 +++++ .../application-per-partition-consumer.yml | 71 +++++ 5 files changed, 759 insertions(+) create mode 100644 src/test/java/hlf/java/rest/client/IT/KafkaBaseIT.java create mode 100644 src/test/java/hlf/java/rest/client/IT/PerPartitionListenerIntegrationIT.java create mode 100644 src/test/java/hlf/java/rest/client/IT/SingleThreadedListenerIntegrationIT.java create mode 100644 src/test/resources/application-default-consumer.yml create mode 100644 src/test/resources/application-per-partition-consumer.yml diff --git a/src/test/java/hlf/java/rest/client/IT/KafkaBaseIT.java b/src/test/java/hlf/java/rest/client/IT/KafkaBaseIT.java new file mode 100644 index 00000000..43f38781 --- /dev/null +++ b/src/test/java/hlf/java/rest/client/IT/KafkaBaseIT.java @@ -0,0 +1,145 @@ +package hlf.java.rest.client.IT; + +import hlf.java.rest.client.util.FabricClientConstants; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.utils.KafkaTestUtils; + +public abstract class KafkaBaseIT { + + private static final String IN_MEMORY_BROKER_ADDRESS = "PLAINTEXT://localhost:9092"; + protected static final String INBOUND_TOPIC_NAME = "test-consumer-inbound-topic"; + protected static final String OUTBOUND_TOPIC_NAME = "test-publisher-event-topic"; + + protected static final String OUTBOUND_DLT_NAME = "test-consumer-dlt"; + + protected static final String DEFAULT_CHANNEL_NAME = "test-channel"; + protected static final String DEFAULT_CONTRACT_NAME = "test-contract"; + protected static final String DEFAULT_FUNCTION_NAME = "test-function"; + + protected static final String DEFAULT_TRANSACTION_BODY = "stringified-transaction-message"; + + private static EmbeddedKafkaBroker embeddedKafkaBroker; + protected static Producer testProducer; + + protected static Consumer testDltConsumer; + + @BeforeAll + public static void setUpClass() { + startEmbeddedKafkaBroker(); + } + + @AfterAll + public static void tearDownClass() { + if (embeddedKafkaBroker != null) { + embeddedKafkaBroker.destroy(); + embeddedKafkaBroker = null; + } + } + + private static void startEmbeddedKafkaBroker() { + if (embeddedKafkaBroker == null) { + embeddedKafkaBroker = + new EmbeddedKafkaBroker(1, false, getDefaultPartitionSize(), getTopicsToBootstrap()) + .brokerProperties(getBrokerProperties()) + .kafkaPorts(9092); + embeddedKafkaBroker.afterPropertiesSet(); + + testProducer = configureProducer(); + testDltConsumer = configureDltConsumer(); + } + } + + private static Producer configureProducer() { + Map producerProps = + new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker)); + return new DefaultKafkaProducerFactory<>(producerProps).createProducer(); + } + + private static Consumer configureDltConsumer() { + Map consumerProps = + new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker)); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "dlt_group"); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + Consumer dltConsumer = + new DefaultKafkaConsumerFactory<>(consumerProps).createConsumer(); + + dltConsumer.subscribe(Collections.singleton(OUTBOUND_DLT_NAME)); + + return dltConsumer; + } + + private static Map getBrokerProperties() { + Map brokerProperties = new HashMap<>(); + brokerProperties.put("listeners", IN_MEMORY_BROKER_ADDRESS); + brokerProperties.put("port", "9092"); + return brokerProperties; + } + + protected static String[] getTopicsToBootstrap() { + return new String[] {INBOUND_TOPIC_NAME, OUTBOUND_TOPIC_NAME, OUTBOUND_DLT_NAME}; + } + + protected static int getDefaultPartitionSize() { + return 12; + } + + protected static String getBrokerAddress() { + return IN_MEMORY_BROKER_ADDRESS; + } + + protected void publishValidTransactionToInboundTopic( + String channelName, String contractName, String functionName) { + + ProducerRecord producerRecord = + new ProducerRecord(INBOUND_TOPIC_NAME, "stringified-transaction-message"); + + producerRecord.headers().add(getHeader(FabricClientConstants.CHANNEL_NAME, channelName)); + producerRecord.headers().add(getHeader(FabricClientConstants.CHAINCODE_NAME, contractName)); + producerRecord.headers().add(getHeader(FabricClientConstants.FUNCTION_NAME, functionName)); + + testProducer.send(producerRecord); + } + + private Header getHeader(String headerName, String headerValue) { + return new Header() { + @Override + public String key() { + return headerName; + } + + @Override + public byte[] value() { + return headerValue.getBytes(StandardCharsets.UTF_8); + } + }; + } + + protected long getCurrentCommittedMessageCountForInboundTopic(String groupId) throws Exception { + + long currentOffset = 0; + + for (int i = 0; i < getDefaultPartitionSize(); i++) { + currentOffset += + KafkaTestUtils.getCurrentOffset(getBrokerAddress(), groupId, INBOUND_TOPIC_NAME, i) + .offset(); + } + + return currentOffset; + } +} diff --git a/src/test/java/hlf/java/rest/client/IT/PerPartitionListenerIntegrationIT.java b/src/test/java/hlf/java/rest/client/IT/PerPartitionListenerIntegrationIT.java new file mode 100644 index 00000000..762b22dc --- /dev/null +++ b/src/test/java/hlf/java/rest/client/IT/PerPartitionListenerIntegrationIT.java @@ -0,0 +1,205 @@ +package hlf.java.rest.client.IT; + +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import hlf.java.rest.client.config.KafkaProperties; +import hlf.java.rest.client.listener.DynamicKafkaListener; +import hlf.java.rest.client.service.TransactionFulfillment; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.awaitility.Awaitility; +import org.hyperledger.fabric.gateway.Contract; +import org.hyperledger.fabric.gateway.ContractException; +import org.hyperledger.fabric.gateway.Gateway; +import org.hyperledger.fabric.gateway.Network; +import org.hyperledger.fabric.gateway.Transaction; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.system.CapturedOutput; +import org.springframework.boot.test.system.OutputCaptureExtension; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.util.ReflectionTestUtils; + +/** + * The 'per-partition-consumer' profile spins up a container listener and a publisher with a + * concurrency limit equal to the number of Partitions or 12 (whichever is lower). + * + *

Note : This test utilises Junit's 'OutputCaptureExtension' to verify the presence of consumer + * threads polling from a Topic. + */ +@Slf4j +@DirtiesContext +@SpringBootTest +@EmbeddedKafka +@ActiveProfiles("per-partition-consumer") +@ExtendWith(OutputCaptureExtension.class) +public class PerPartitionListenerIntegrationIT extends KafkaBaseIT { + + private static final CountDownLatch LATCH = new CountDownLatch(1); + + private String singleListenerConsumerThreadNamePattern = "consumer-{{thread-count}}-C-1"; + + @Autowired private DynamicKafkaListener dynamicKafkaListener; + @Autowired private TransactionFulfillment transactionFulfillment; + @Autowired private KafkaProperties kafkaProperties; + @Autowired private MeterRegistry meterRegistry; + + /** + * Test to validate a scenario where a list of valid incoming transaction was polled and + * dispatched to {@link hlf.java.rest.client.service.impl.TransactionFulfillmentImpl} for + * processing. Each partition is likely to be processed by a consumer thread. After successful + * processing, all the offsets are committed. + */ + @Test + public void testHappyPathMultipleTxConsumption(CapturedOutput capturedLogOutput) + throws Exception { + + // Given + List existingContainers = + dynamicKafkaListener.getExistingContainers(); + Assertions.assertEquals( + 1, existingContainers.size()); // There should only be one Container at this point. + + KafkaProperties.Consumer configuredConsumerProp = kafkaProperties.getIntegrationPoints().get(0); + + long startingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + Transaction mockTransaction = setupMockNetworkInvocation(Optional.empty()); + + // When + + // Publish multiple valid Transactions for consumption + for (int publishCount = 0; publishCount < 50; publishCount++) { + publishValidTransactionToInboundTopic( + DEFAULT_CHANNEL_NAME, DEFAULT_CONTRACT_NAME, DEFAULT_FUNCTION_NAME); + } + + // Reducing the timeout since we have dedicated consumer threads per partition + Awaitility.await() + .pollDelay(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue(true)); + + // Then + long endingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + // Verify that all the published messages were polled and processed + Assertions.assertEquals(50, (endingOffset - startingOffset)); + + // There were no retries. Therefore, Transaction invocation was performed for all inbound + // messages. + verify(mockTransaction, times(50)).submit(Mockito.any()); + + ConcurrentMessageListenerContainer currentContainer = existingContainers.get(0); + int containerConcurrencyLevel = currentContainer.getConcurrency(); + + for (int threadCount = 0; threadCount < containerConcurrencyLevel; threadCount++) { + String consumerThreadId = + singleListenerConsumerThreadNamePattern.replace( + "{{thread-count}}", String.valueOf(threadCount)); + // Does the logs talk about consumer thread processing the messages? + Assertions.assertTrue(capturedLogOutput.getOut().contains(consumerThreadId)); + } + } + + @Test + public void testDLTBehaviourForFailedTxs(CapturedOutput capturedLogOutput) throws Exception { + + // Given + List existingContainers = + dynamicKafkaListener.getExistingContainers(); + Assertions.assertEquals( + 1, existingContainers.size()); // There should only be one Container at this point. + + KafkaProperties.Consumer configuredConsumerProp = kafkaProperties.getIntegrationPoints().get(0); + + long startingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + // TimeoutException is a retryable Exception + Transaction mockTransaction = setupMockNetworkInvocation(Optional.of(new TimeoutException())); + + // When + + Counter invalidTxMessagesCounter = + meterRegistry.find("transaction.messages.process.failures").counter(); + long initialFailures = (long) invalidTxMessagesCounter.count(); + + publishValidTransactionToInboundTopic( + DEFAULT_CHANNEL_NAME, DEFAULT_CONTRACT_NAME, DEFAULT_FUNCTION_NAME); + + Awaitility.await() + .pollDelay(7, TimeUnit.SECONDS) + .untilAsserted(() -> Assertions.assertTrue(true)); + + // Then + + /* + Transaction errors are tracked via Metrics. Validate whether the metrics are captured and + since the error is transient in nature, it will be retried. + */ + long currentFailures = (long) invalidTxMessagesCounter.count(); + Assertions.assertTrue((currentFailures - initialFailures) > 5); + + // Initial attempts + Retried attempts. + verify(mockTransaction, atLeast(5)).submit(Mockito.any()); + + // After retries, the offset is committed + long endingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + Assertions.assertEquals(1, (endingOffset - startingOffset)); + + // Fetch Latest DLT Record + ConsumerRecord dltRecord = + KafkaTestUtils.getSingleRecord(testDltConsumer, OUTBOUND_DLT_NAME); + + Assertions.assertNotNull(dltRecord); + Assertions.assertNotNull(dltRecord.headers()); + + // Verify if the DLT Contains the original, failed Tx msg + Assertions.assertEquals(DEFAULT_TRANSACTION_BODY, dltRecord.value()); + } + + @NotNull + private Transaction setupMockNetworkInvocation(Optional optionalTxSubmitException) + throws ContractException, TimeoutException, InterruptedException { + Gateway gatewayMock = Mockito.mock(Gateway.class); + ReflectionTestUtils.setField(transactionFulfillment, "gateway", gatewayMock); + + Network mockNetwork = Mockito.mock(Network.class); + Contract mockContract = Mockito.mock(Contract.class); + Transaction mockTransaction = Mockito.mock(Transaction.class); + Mockito.when(gatewayMock.getNetwork(DEFAULT_CHANNEL_NAME)).thenReturn(mockNetwork); + Mockito.when(mockNetwork.getContract(DEFAULT_CONTRACT_NAME)).thenReturn(mockContract); + Mockito.when(mockContract.createTransaction(DEFAULT_FUNCTION_NAME)).thenReturn(mockTransaction); + + if (optionalTxSubmitException.isPresent()) { + doThrow(optionalTxSubmitException.get()).when(mockTransaction).submit(Mockito.any()); + } else { + Mockito.when(mockTransaction.submit(Mockito.any())) + .thenReturn(UUID.randomUUID().toString().getBytes()); + } + return mockTransaction; + } +} diff --git a/src/test/java/hlf/java/rest/client/IT/SingleThreadedListenerIntegrationIT.java b/src/test/java/hlf/java/rest/client/IT/SingleThreadedListenerIntegrationIT.java new file mode 100644 index 00000000..3e7a2035 --- /dev/null +++ b/src/test/java/hlf/java/rest/client/IT/SingleThreadedListenerIntegrationIT.java @@ -0,0 +1,272 @@ +package hlf.java.rest.client.IT; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import hlf.java.rest.client.config.KafkaProperties; +import hlf.java.rest.client.listener.DynamicKafkaListener; +import hlf.java.rest.client.service.TransactionFulfillment; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.awaitility.Awaitility; +import org.hyperledger.fabric.gateway.Contract; +import org.hyperledger.fabric.gateway.ContractException; +import org.hyperledger.fabric.gateway.Gateway; +import org.hyperledger.fabric.gateway.Network; +import org.hyperledger.fabric.gateway.Transaction; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.util.ReflectionTestUtils; + +/** + * The 'default-consumer' profile spins up a single container listener and a publisher with a + * concurrency limit of '1'. This is the default configuration for the Connector unless the + * partition-count and parallel-listener capabilities are adjusted accordingly. + */ +@Slf4j +@DirtiesContext +@SpringBootTest +@EmbeddedKafka +@ActiveProfiles("default-consumer") +public class SingleThreadedListenerIntegrationIT extends KafkaBaseIT { + + private static final CountDownLatch LATCH = new CountDownLatch(1); + + @Autowired private DynamicKafkaListener dynamicKafkaListener; + @Autowired private TransactionFulfillment transactionFulfillment; + @Autowired private KafkaProperties kafkaProperties; + @Autowired private MeterRegistry meterRegistry; + + /** + * This is a simple test to validate a scenario where a single, valid incoming transaction was + * polled and dispatched to {@link hlf.java.rest.client.service.impl.TransactionFulfillmentImpl} + * for processing. After successful processing, the record offset is committed. + */ + @Test + public void testHappyPathTxConsumption() throws Exception { + + // Given + List existingContainers = + dynamicKafkaListener.getExistingContainers(); + Assertions.assertEquals( + 1, existingContainers.size()); // There should only be one Container at this point. + + KafkaProperties.Consumer configuredConsumerProp = kafkaProperties.getIntegrationPoints().get(0); + + long startingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + Transaction mockTransaction = setupMockNetworkInvocation(Optional.empty()); + + // When + publishValidTransactionToInboundTopic( + DEFAULT_CHANNEL_NAME, DEFAULT_CONTRACT_NAME, DEFAULT_FUNCTION_NAME); + + Awaitility.await() + .pollDelay(1, TimeUnit.SECONDS) + .untilAsserted(() -> Assertions.assertTrue(true)); + + // Then + long endingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + // Exactly one message was polled and processed + Assertions.assertEquals(1, (endingOffset - startingOffset)); + + // There were no retries. Therefore, Transaction invocation was performed only once. + verify(mockTransaction, times(1)).submit(Mockito.any()); + } + + /** + * Test to validate a scenario where a list of valid incoming transaction was polled and + * dispatched to {@link hlf.java.rest.client.service.impl.TransactionFulfillmentImpl} for + * processing. After successful processing, all the offsets are committed. + */ + @Test + public void testHappyPathMultipleTxConsumption() throws Exception { + + // Given + List existingContainers = + dynamicKafkaListener.getExistingContainers(); + Assertions.assertEquals( + 1, existingContainers.size()); // There should only be one Container at this point. + + KafkaProperties.Consumer configuredConsumerProp = kafkaProperties.getIntegrationPoints().get(0); + + long startingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + Transaction mockTransaction = setupMockNetworkInvocation(Optional.empty()); + + // When + + // Publish multiple valid Transactions for consumption + for (int publishCount = 0; publishCount < 50; publishCount++) { + publishValidTransactionToInboundTopic( + DEFAULT_CHANNEL_NAME, DEFAULT_CONTRACT_NAME, DEFAULT_FUNCTION_NAME); + } + + Awaitility.await() + .pollDelay(2, TimeUnit.SECONDS) + .untilAsserted(() -> Assertions.assertTrue(true)); + + // Then + long endingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + // Verify that all the published messages were polled and processed + Assertions.assertEquals(50, (endingOffset - startingOffset)); + + // There were no retries. Therefore, Transaction invocation was performed for all inbound + // messages. + verify(mockTransaction, times(50)).submit(Mockito.any()); + } + + /** + * This test validates a scenario where the inbound message does not conform to the expected + * transaction payload format. It checks whether the transaction is rejected by tracking it via + * the metrics registry. Due to the non-transient nature of the error, it also verifies that the + * transaction is not retried. * + */ + @Test + public void testConsumerBehaviourOnInvalidTxPayload() throws Exception { + + // Given + List existingContainers = + dynamicKafkaListener.getExistingContainers(); + Assertions.assertEquals( + 1, existingContainers.size()); // There should only be one Container at this point. + + KafkaProperties.Consumer configuredConsumerProp = kafkaProperties.getIntegrationPoints().get(0); + + long startingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + Transaction mockTransaction = setupMockNetworkInvocation(Optional.empty()); + + // When + + // Passing 'functionName' as null since we want this payload be invalid. + publishValidTransactionToInboundTopic( + DEFAULT_CHANNEL_NAME, DEFAULT_CONTRACT_NAME, StringUtils.EMPTY); + + Awaitility.await() + .pollDelay(5, TimeUnit.SECONDS) + .untilAsserted(() -> Assertions.assertTrue(true)); + + // Then + long endingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + // Exactly one message was polled and processed + Assertions.assertEquals(1, (endingOffset - startingOffset)); + + /* + Invalid Transaction Payload errors are tracked via Metrics. Validate whether the metrics are captured. + Since Invalid Payload errors are non-transient in nature, it won't be retried. + */ + Counter invalidTxMessagesCounter = + meterRegistry.find("transaction.messages.unrecognized.failures").counter(); + Assertions.assertNotNull(invalidTxMessagesCounter); + // This check is important! + Assertions.assertEquals(1, invalidTxMessagesCounter.count()); + + // Since the payload was invalid, transactionFulfillment service won't be invoked + verify(mockTransaction, times(0)).submit(Mockito.any()); + } + + /** + * This test validates the Listener's retry capability. It ensures that a transaction which + * encounters a transient exception is retried for the configured number of attempts and verifies + * that metrics are tracked. * + */ + @Test + public void testRetryBehaviourOnNetworkTransientErrors() throws Exception { + + // Given + List existingContainers = + dynamicKafkaListener.getExistingContainers(); + Assertions.assertEquals( + 1, existingContainers.size()); // There should only be one Container at this point. + + KafkaProperties.Consumer configuredConsumerProp = kafkaProperties.getIntegrationPoints().get(0); + + long startingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + // TimeoutException is a retryable Exception + Transaction mockTransaction = setupMockNetworkInvocation(Optional.of(new TimeoutException())); + + // When + + Counter invalidTxMessagesCounter = + meterRegistry.find("transaction.messages.process.failures").counter(); + long initialFailures = (long) invalidTxMessagesCounter.count(); + + publishValidTransactionToInboundTopic( + DEFAULT_CHANNEL_NAME, DEFAULT_CONTRACT_NAME, DEFAULT_FUNCTION_NAME); + + Awaitility.await() + .pollDelay(7, TimeUnit.SECONDS) + .untilAsserted(() -> Assertions.assertTrue(true)); + + // Then + + /* + Transaction errors are tracked via Metrics. Validate whether the metrics are captured and + since the error is transient in nature, it will be retried. + */ + long currentFailures = (long) invalidTxMessagesCounter.count(); + Assertions.assertTrue((currentFailures - initialFailures) > 5); + + // Initial attempts + Retried attempts. + // At the time of writing this test, retry count is statically configured as 5 for Retryable + // exceptions + // Expected invocations will then be 6 + verify(mockTransaction, times(6)).submit(Mockito.any()); + + // After retries, the offset is committed + long endingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + Assertions.assertEquals(1, (endingOffset - startingOffset)); + } + + @NotNull + private Transaction setupMockNetworkInvocation(Optional optionalTxSubmitException) + throws ContractException, TimeoutException, InterruptedException { + Gateway gatewayMock = Mockito.mock(Gateway.class); + ReflectionTestUtils.setField(transactionFulfillment, "gateway", gatewayMock); + + Network mockNetwork = Mockito.mock(Network.class); + Contract mockContract = Mockito.mock(Contract.class); + Transaction mockTransaction = Mockito.mock(Transaction.class); + Mockito.when(gatewayMock.getNetwork(DEFAULT_CHANNEL_NAME)).thenReturn(mockNetwork); + Mockito.when(mockNetwork.getContract(DEFAULT_CONTRACT_NAME)).thenReturn(mockContract); + Mockito.when(mockContract.createTransaction(DEFAULT_FUNCTION_NAME)).thenReturn(mockTransaction); + + if (optionalTxSubmitException.isPresent()) { + doThrow(optionalTxSubmitException.get()).when(mockTransaction).submit(Mockito.any()); + } else { + Mockito.when(mockTransaction.submit(Mockito.any())) + .thenReturn(UUID.randomUUID().toString().getBytes()); + } + return mockTransaction; + } +} diff --git a/src/test/resources/application-default-consumer.yml b/src/test/resources/application-default-consumer.yml new file mode 100644 index 00000000..3f6b0381 --- /dev/null +++ b/src/test/resources/application-default-consumer.yml @@ -0,0 +1,66 @@ +spring: + devtools.add-properties: false +fabric: + localhostReportAddress: false + wallet: + path: src/test/resources/testwallet + adminUser: + name: admin + clientUser: + name: clientUser + orgConnectionConfig: + path: src/test/resources/testconfig + filename: connection.yml + events: + enable: true + chaincode: [] + chaincodeDetails: [] + block: [] + client: + rest: + apikey: 6uoIAnR +kafka: + integration-points: + - groupId: test-group-publisher + brokerHost: localhost:9092 + topic: test-consumer-inbound-topic + config-id: 892019 + ssl-enabled: false + event-listener: + ssl-enabled: false + brokerHost: localhost:9092 + topic: test-publisher-event-topic +management: + endpoints: + web: + exposure: + include: "prometheus,metrics,health,refresh" + endpoint: + health: + group: + db: + include: db + show-components: always + metrics: + enable: + tomcat: true + jvm: true + process: true + hikaricp: true + system: true + jdbc: true + logback: true + distribution: + percentiles-histogram: + http.server.requests: true + maximum-expected-value: + http.server.requests: 1s + custom: + kafka: + enabled: true +server: + config: + location: /Users/n0p0328/work/hlf-connector/src/main/resources/config/ + servlet: + context-path: /hlf-rest-client + port: 8282 diff --git a/src/test/resources/application-per-partition-consumer.yml b/src/test/resources/application-per-partition-consumer.yml new file mode 100644 index 00000000..8fa5cd72 --- /dev/null +++ b/src/test/resources/application-per-partition-consumer.yml @@ -0,0 +1,71 @@ +spring: + devtools.add-properties: false +fabric: + localhostReportAddress: false + wallet: + path: src/test/resources/testwallet + adminUser: + name: admin + clientUser: + name: clientUser + orgConnectionConfig: + path: src/test/resources/testconfig + filename: connection.yml + events: + enable: true + chaincode: [] + chaincodeDetails: [] + block: [] + client: + rest: + apikey: 6uoIAnR +kafka: + integration-points: + - groupId: test-group-publisher + brokerHost: localhost:9092 + topic: test-consumer-inbound-topic + config-id: 892019 + ssl-enabled: false + topicPartitions: 12 + event-listener: + ssl-enabled: false + brokerHost: localhost:9092 + topic: test-publisher-event-topic + failed-message-listener: + brokerHost: localhost:9092 + topic: test-consumer-dlt + ssl-enabled: false +management: + endpoints: + web: + exposure: + include: "prometheus,metrics,health,refresh" + endpoint: + health: + group: + db: + include: db + show-components: always + metrics: + enable: + tomcat: true + jvm: true + process: true + hikaricp: true + system: true + jdbc: true + logback: true + distribution: + percentiles-histogram: + http.server.requests: true + maximum-expected-value: + http.server.requests: 1s + custom: + kafka: + enabled: true +server: + config: + location: /Users/n0p0328/work/hlf-connector/src/main/resources/config/ + servlet: + context-path: /hlf-rest-client + port: 8282