diff --git a/src/test/java/hlf/java/rest/client/IT/KafkaBaseIT.java b/src/test/java/hlf/java/rest/client/IT/KafkaBaseIT.java new file mode 100644 index 0000000..1acea06 --- /dev/null +++ b/src/test/java/hlf/java/rest/client/IT/KafkaBaseIT.java @@ -0,0 +1,118 @@ +package hlf.java.rest.client.IT; + +import hlf.java.rest.client.util.FabricClientConstants; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.utils.KafkaTestUtils; + +public abstract class KafkaBaseIT { + + private static final String IN_MEMORY_BROKER_ADDRESS = "PLAINTEXT://localhost:9092"; + private static final String INBOUND_TOPIC_NAME = "test-consumer-inbound-topic"; + private static final String OUTBOUND_TOPIC_NAME = "test-publisher-event-topic"; + + protected static final String DEFAULT_CHANNEL_NAME = "test-channel"; + protected static final String DEFAULT_CONTRACT_NAME = "test-contract"; + protected static final String DEFAULT_FUNCTION_NAME = "test-function"; + + private static EmbeddedKafkaBroker embeddedKafkaBroker; + protected static Producer testProducer; + + @BeforeAll + public static void setUpClass() { + startEmbeddedKafkaBroker(); + } + + @AfterAll + public static void tearDownClass() { + if (embeddedKafkaBroker != null) { + embeddedKafkaBroker.destroy(); + embeddedKafkaBroker = null; + } + } + + private static void startEmbeddedKafkaBroker() { + if (embeddedKafkaBroker == null) { + embeddedKafkaBroker = + new EmbeddedKafkaBroker(1, false, getDefaultPartitionSize(), getTopicsToBootstrap()) + .brokerProperties(getBrokerProperties()) + .kafkaPorts(9092); + embeddedKafkaBroker.afterPropertiesSet(); + + testProducer = configureProducer(); + } + } + + private static Producer configureProducer() { + Map producerProps = + new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker)); + return new DefaultKafkaProducerFactory<>(producerProps).createProducer(); + } + + private static Map getBrokerProperties() { + Map brokerProperties = new HashMap<>(); + brokerProperties.put("listeners", IN_MEMORY_BROKER_ADDRESS); + brokerProperties.put("port", "9092"); + return brokerProperties; + } + + protected static String[] getTopicsToBootstrap() { + return new String[] {INBOUND_TOPIC_NAME, OUTBOUND_TOPIC_NAME}; + } + + protected static int getDefaultPartitionSize() { + return 12; + } + + protected static String getBrokerAddress() { + return IN_MEMORY_BROKER_ADDRESS; + } + + protected void publishValidTransactionToInboundTopic( + String channelName, String contractName, String functionName) { + + ProducerRecord producerRecord = + new ProducerRecord(INBOUND_TOPIC_NAME, "stringified-transaction-message"); + + producerRecord.headers().add(getHeader(FabricClientConstants.CHANNEL_NAME, channelName)); + producerRecord.headers().add(getHeader(FabricClientConstants.CHAINCODE_NAME, contractName)); + producerRecord.headers().add(getHeader(FabricClientConstants.FUNCTION_NAME, functionName)); + + testProducer.send(producerRecord); + } + + private Header getHeader(String headerName, String headerValue) { + return new Header() { + @Override + public String key() { + return headerName; + } + + @Override + public byte[] value() { + return headerValue.getBytes(StandardCharsets.UTF_8); + } + }; + } + + protected long getCurrentCommittedMessageCountForInboundTopic(String groupId) throws Exception { + + long currentOffset = 0; + + for (int i = 0; i < getDefaultPartitionSize(); i++) { + currentOffset += + KafkaTestUtils.getCurrentOffset(getBrokerAddress(), groupId, INBOUND_TOPIC_NAME, i) + .offset(); + } + + return currentOffset; + } +} diff --git a/src/test/java/hlf/java/rest/client/IT/PerPartitionListenerIntegrationIT.java b/src/test/java/hlf/java/rest/client/IT/PerPartitionListenerIntegrationIT.java new file mode 100644 index 0000000..0dd8fdf --- /dev/null +++ b/src/test/java/hlf/java/rest/client/IT/PerPartitionListenerIntegrationIT.java @@ -0,0 +1,143 @@ +package hlf.java.rest.client.IT; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import hlf.java.rest.client.config.KafkaProperties; +import hlf.java.rest.client.listener.DynamicKafkaListener; +import hlf.java.rest.client.service.TransactionFulfillment; +import io.micrometer.core.instrument.MeterRegistry; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; +import org.hyperledger.fabric.gateway.Contract; +import org.hyperledger.fabric.gateway.ContractException; +import org.hyperledger.fabric.gateway.Gateway; +import org.hyperledger.fabric.gateway.Network; +import org.hyperledger.fabric.gateway.Transaction; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.system.CapturedOutput; +import org.springframework.boot.test.system.OutputCaptureExtension; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.util.ReflectionTestUtils; + +/** + * The 'per-partition-consumer' profile spins up a container listener and a publisher with a + * concurrency limit equal to the number of Partitions or 12 (whichever is lower). + * + *

Note : This test utilises Junit's 'OutputCaptureExtension' to verify the presence of consumer + * threads polling from a Topic. + */ +@Slf4j +@DirtiesContext +@SpringBootTest +@EmbeddedKafka +@ActiveProfiles("per-partition-consumer") +@ExtendWith(OutputCaptureExtension.class) +public class PerPartitionListenerIntegrationIT extends KafkaBaseIT { + + private static final CountDownLatch LATCH = new CountDownLatch(1); + + private String singleListenerConsumerThreadNamePattern = "consumer-{{thread-count}}-C-1"; + + @Autowired private DynamicKafkaListener dynamicKafkaListener; + @Autowired private TransactionFulfillment transactionFulfillment; + @Autowired private KafkaProperties kafkaProperties; + @Autowired private MeterRegistry meterRegistry; + + /** + * Test to validate a scenario where a list of valid incoming transaction was polled and + * dispatched to {@link hlf.java.rest.client.service.impl.TransactionFulfillmentImpl} for + * processing. Each partition is likely to be processed by a consumer thread. After successful + * processing, all the offsets are committed. + */ + @Test + public void testHappyPathMultipleTxConsumption(CapturedOutput capturedLogOutput) + throws Exception { + + // Given + List existingContainers = + dynamicKafkaListener.getExistingContainers(); + Assertions.assertEquals( + 1, existingContainers.size()); // There should only be one Container at this point. + + KafkaProperties.Consumer configuredConsumerProp = kafkaProperties.getIntegrationPoints().get(0); + + long startingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + Transaction mockTransaction = setupMockNetworkInvocation(Optional.empty()); + + // When + + // Publish multiple valid Transactions for consumption + for (int publishCount = 0; publishCount < 50; publishCount++) { + publishValidTransactionToInboundTopic( + DEFAULT_CHANNEL_NAME, DEFAULT_CONTRACT_NAME, DEFAULT_FUNCTION_NAME); + } + + // Reducing the timeout since we have dedicated consumer threads per partition + Awaitility.await() + .pollDelay(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue(true)); + + // Then + long endingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + // Verify that all the published messages were polled and processed + Assertions.assertEquals(50, (endingOffset - startingOffset)); + + // There were no retries. Therefore, Transaction invocation was performed for all inbound + // messages. + verify(mockTransaction, times(50)).submit(Mockito.any()); + + ConcurrentMessageListenerContainer currentContainer = existingContainers.get(0); + int containerConcurrencyLevel = currentContainer.getConcurrency(); + + for (int threadCount = 0; threadCount < containerConcurrencyLevel; threadCount++) { + String consumerThreadId = + singleListenerConsumerThreadNamePattern.replace( + "{{thread-count}}", String.valueOf(threadCount)); + // Does the logs talk about consumer thread processing the messages? + Assertions.assertTrue(capturedLogOutput.getOut().contains(consumerThreadId)); + } + } + + @NotNull + private Transaction setupMockNetworkInvocation(Optional optionalTxSubmitException) + throws ContractException, TimeoutException, InterruptedException { + Gateway gatewayMock = Mockito.mock(Gateway.class); + ReflectionTestUtils.setField(transactionFulfillment, "gateway", gatewayMock); + + Network mockNetwork = Mockito.mock(Network.class); + Contract mockContract = Mockito.mock(Contract.class); + Transaction mockTransaction = Mockito.mock(Transaction.class); + Mockito.when(gatewayMock.getNetwork(DEFAULT_CHANNEL_NAME)).thenReturn(mockNetwork); + Mockito.when(mockNetwork.getContract(DEFAULT_CONTRACT_NAME)).thenReturn(mockContract); + Mockito.when(mockContract.createTransaction(DEFAULT_FUNCTION_NAME)).thenReturn(mockTransaction); + + if (optionalTxSubmitException.isPresent()) { + doThrow(optionalTxSubmitException.get()).when(mockTransaction).submit(Mockito.any()); + } else { + Mockito.when(mockTransaction.submit(Mockito.any())) + .thenReturn(UUID.randomUUID().toString().getBytes()); + } + return mockTransaction; + } +} diff --git a/src/test/java/hlf/java/rest/client/IT/SingleThreadedListenerIntegrationIT.java b/src/test/java/hlf/java/rest/client/IT/SingleThreadedListenerIntegrationIT.java new file mode 100644 index 0000000..3e7a203 --- /dev/null +++ b/src/test/java/hlf/java/rest/client/IT/SingleThreadedListenerIntegrationIT.java @@ -0,0 +1,272 @@ +package hlf.java.rest.client.IT; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import hlf.java.rest.client.config.KafkaProperties; +import hlf.java.rest.client.listener.DynamicKafkaListener; +import hlf.java.rest.client.service.TransactionFulfillment; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.awaitility.Awaitility; +import org.hyperledger.fabric.gateway.Contract; +import org.hyperledger.fabric.gateway.ContractException; +import org.hyperledger.fabric.gateway.Gateway; +import org.hyperledger.fabric.gateway.Network; +import org.hyperledger.fabric.gateway.Transaction; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.util.ReflectionTestUtils; + +/** + * The 'default-consumer' profile spins up a single container listener and a publisher with a + * concurrency limit of '1'. This is the default configuration for the Connector unless the + * partition-count and parallel-listener capabilities are adjusted accordingly. + */ +@Slf4j +@DirtiesContext +@SpringBootTest +@EmbeddedKafka +@ActiveProfiles("default-consumer") +public class SingleThreadedListenerIntegrationIT extends KafkaBaseIT { + + private static final CountDownLatch LATCH = new CountDownLatch(1); + + @Autowired private DynamicKafkaListener dynamicKafkaListener; + @Autowired private TransactionFulfillment transactionFulfillment; + @Autowired private KafkaProperties kafkaProperties; + @Autowired private MeterRegistry meterRegistry; + + /** + * This is a simple test to validate a scenario where a single, valid incoming transaction was + * polled and dispatched to {@link hlf.java.rest.client.service.impl.TransactionFulfillmentImpl} + * for processing. After successful processing, the record offset is committed. + */ + @Test + public void testHappyPathTxConsumption() throws Exception { + + // Given + List existingContainers = + dynamicKafkaListener.getExistingContainers(); + Assertions.assertEquals( + 1, existingContainers.size()); // There should only be one Container at this point. + + KafkaProperties.Consumer configuredConsumerProp = kafkaProperties.getIntegrationPoints().get(0); + + long startingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + Transaction mockTransaction = setupMockNetworkInvocation(Optional.empty()); + + // When + publishValidTransactionToInboundTopic( + DEFAULT_CHANNEL_NAME, DEFAULT_CONTRACT_NAME, DEFAULT_FUNCTION_NAME); + + Awaitility.await() + .pollDelay(1, TimeUnit.SECONDS) + .untilAsserted(() -> Assertions.assertTrue(true)); + + // Then + long endingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + // Exactly one message was polled and processed + Assertions.assertEquals(1, (endingOffset - startingOffset)); + + // There were no retries. Therefore, Transaction invocation was performed only once. + verify(mockTransaction, times(1)).submit(Mockito.any()); + } + + /** + * Test to validate a scenario where a list of valid incoming transaction was polled and + * dispatched to {@link hlf.java.rest.client.service.impl.TransactionFulfillmentImpl} for + * processing. After successful processing, all the offsets are committed. + */ + @Test + public void testHappyPathMultipleTxConsumption() throws Exception { + + // Given + List existingContainers = + dynamicKafkaListener.getExistingContainers(); + Assertions.assertEquals( + 1, existingContainers.size()); // There should only be one Container at this point. + + KafkaProperties.Consumer configuredConsumerProp = kafkaProperties.getIntegrationPoints().get(0); + + long startingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + Transaction mockTransaction = setupMockNetworkInvocation(Optional.empty()); + + // When + + // Publish multiple valid Transactions for consumption + for (int publishCount = 0; publishCount < 50; publishCount++) { + publishValidTransactionToInboundTopic( + DEFAULT_CHANNEL_NAME, DEFAULT_CONTRACT_NAME, DEFAULT_FUNCTION_NAME); + } + + Awaitility.await() + .pollDelay(2, TimeUnit.SECONDS) + .untilAsserted(() -> Assertions.assertTrue(true)); + + // Then + long endingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + // Verify that all the published messages were polled and processed + Assertions.assertEquals(50, (endingOffset - startingOffset)); + + // There were no retries. Therefore, Transaction invocation was performed for all inbound + // messages. + verify(mockTransaction, times(50)).submit(Mockito.any()); + } + + /** + * This test validates a scenario where the inbound message does not conform to the expected + * transaction payload format. It checks whether the transaction is rejected by tracking it via + * the metrics registry. Due to the non-transient nature of the error, it also verifies that the + * transaction is not retried. * + */ + @Test + public void testConsumerBehaviourOnInvalidTxPayload() throws Exception { + + // Given + List existingContainers = + dynamicKafkaListener.getExistingContainers(); + Assertions.assertEquals( + 1, existingContainers.size()); // There should only be one Container at this point. + + KafkaProperties.Consumer configuredConsumerProp = kafkaProperties.getIntegrationPoints().get(0); + + long startingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + Transaction mockTransaction = setupMockNetworkInvocation(Optional.empty()); + + // When + + // Passing 'functionName' as null since we want this payload be invalid. + publishValidTransactionToInboundTopic( + DEFAULT_CHANNEL_NAME, DEFAULT_CONTRACT_NAME, StringUtils.EMPTY); + + Awaitility.await() + .pollDelay(5, TimeUnit.SECONDS) + .untilAsserted(() -> Assertions.assertTrue(true)); + + // Then + long endingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + // Exactly one message was polled and processed + Assertions.assertEquals(1, (endingOffset - startingOffset)); + + /* + Invalid Transaction Payload errors are tracked via Metrics. Validate whether the metrics are captured. + Since Invalid Payload errors are non-transient in nature, it won't be retried. + */ + Counter invalidTxMessagesCounter = + meterRegistry.find("transaction.messages.unrecognized.failures").counter(); + Assertions.assertNotNull(invalidTxMessagesCounter); + // This check is important! + Assertions.assertEquals(1, invalidTxMessagesCounter.count()); + + // Since the payload was invalid, transactionFulfillment service won't be invoked + verify(mockTransaction, times(0)).submit(Mockito.any()); + } + + /** + * This test validates the Listener's retry capability. It ensures that a transaction which + * encounters a transient exception is retried for the configured number of attempts and verifies + * that metrics are tracked. * + */ + @Test + public void testRetryBehaviourOnNetworkTransientErrors() throws Exception { + + // Given + List existingContainers = + dynamicKafkaListener.getExistingContainers(); + Assertions.assertEquals( + 1, existingContainers.size()); // There should only be one Container at this point. + + KafkaProperties.Consumer configuredConsumerProp = kafkaProperties.getIntegrationPoints().get(0); + + long startingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + + // TimeoutException is a retryable Exception + Transaction mockTransaction = setupMockNetworkInvocation(Optional.of(new TimeoutException())); + + // When + + Counter invalidTxMessagesCounter = + meterRegistry.find("transaction.messages.process.failures").counter(); + long initialFailures = (long) invalidTxMessagesCounter.count(); + + publishValidTransactionToInboundTopic( + DEFAULT_CHANNEL_NAME, DEFAULT_CONTRACT_NAME, DEFAULT_FUNCTION_NAME); + + Awaitility.await() + .pollDelay(7, TimeUnit.SECONDS) + .untilAsserted(() -> Assertions.assertTrue(true)); + + // Then + + /* + Transaction errors are tracked via Metrics. Validate whether the metrics are captured and + since the error is transient in nature, it will be retried. + */ + long currentFailures = (long) invalidTxMessagesCounter.count(); + Assertions.assertTrue((currentFailures - initialFailures) > 5); + + // Initial attempts + Retried attempts. + // At the time of writing this test, retry count is statically configured as 5 for Retryable + // exceptions + // Expected invocations will then be 6 + verify(mockTransaction, times(6)).submit(Mockito.any()); + + // After retries, the offset is committed + long endingOffset = + getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId()); + Assertions.assertEquals(1, (endingOffset - startingOffset)); + } + + @NotNull + private Transaction setupMockNetworkInvocation(Optional optionalTxSubmitException) + throws ContractException, TimeoutException, InterruptedException { + Gateway gatewayMock = Mockito.mock(Gateway.class); + ReflectionTestUtils.setField(transactionFulfillment, "gateway", gatewayMock); + + Network mockNetwork = Mockito.mock(Network.class); + Contract mockContract = Mockito.mock(Contract.class); + Transaction mockTransaction = Mockito.mock(Transaction.class); + Mockito.when(gatewayMock.getNetwork(DEFAULT_CHANNEL_NAME)).thenReturn(mockNetwork); + Mockito.when(mockNetwork.getContract(DEFAULT_CONTRACT_NAME)).thenReturn(mockContract); + Mockito.when(mockContract.createTransaction(DEFAULT_FUNCTION_NAME)).thenReturn(mockTransaction); + + if (optionalTxSubmitException.isPresent()) { + doThrow(optionalTxSubmitException.get()).when(mockTransaction).submit(Mockito.any()); + } else { + Mockito.when(mockTransaction.submit(Mockito.any())) + .thenReturn(UUID.randomUUID().toString().getBytes()); + } + return mockTransaction; + } +} diff --git a/src/test/resources/application-default-consumer.yml b/src/test/resources/application-default-consumer.yml new file mode 100644 index 0000000..3f6b038 --- /dev/null +++ b/src/test/resources/application-default-consumer.yml @@ -0,0 +1,66 @@ +spring: + devtools.add-properties: false +fabric: + localhostReportAddress: false + wallet: + path: src/test/resources/testwallet + adminUser: + name: admin + clientUser: + name: clientUser + orgConnectionConfig: + path: src/test/resources/testconfig + filename: connection.yml + events: + enable: true + chaincode: [] + chaincodeDetails: [] + block: [] + client: + rest: + apikey: 6uoIAnR +kafka: + integration-points: + - groupId: test-group-publisher + brokerHost: localhost:9092 + topic: test-consumer-inbound-topic + config-id: 892019 + ssl-enabled: false + event-listener: + ssl-enabled: false + brokerHost: localhost:9092 + topic: test-publisher-event-topic +management: + endpoints: + web: + exposure: + include: "prometheus,metrics,health,refresh" + endpoint: + health: + group: + db: + include: db + show-components: always + metrics: + enable: + tomcat: true + jvm: true + process: true + hikaricp: true + system: true + jdbc: true + logback: true + distribution: + percentiles-histogram: + http.server.requests: true + maximum-expected-value: + http.server.requests: 1s + custom: + kafka: + enabled: true +server: + config: + location: /Users/n0p0328/work/hlf-connector/src/main/resources/config/ + servlet: + context-path: /hlf-rest-client + port: 8282 diff --git a/src/test/resources/application-per-partition-consumer.yml b/src/test/resources/application-per-partition-consumer.yml new file mode 100644 index 0000000..d57d474 --- /dev/null +++ b/src/test/resources/application-per-partition-consumer.yml @@ -0,0 +1,67 @@ +spring: + devtools.add-properties: false +fabric: + localhostReportAddress: false + wallet: + path: src/test/resources/testwallet + adminUser: + name: admin + clientUser: + name: clientUser + orgConnectionConfig: + path: src/test/resources/testconfig + filename: connection.yml + events: + enable: true + chaincode: [] + chaincodeDetails: [] + block: [] + client: + rest: + apikey: 6uoIAnR +kafka: + integration-points: + - groupId: test-group-publisher + brokerHost: localhost:9092 + topic: test-consumer-inbound-topic + config-id: 892019 + ssl-enabled: false + topicPartitions: 12 + event-listener: + ssl-enabled: false + brokerHost: localhost:9092 + topic: test-publisher-event-topic +management: + endpoints: + web: + exposure: + include: "prometheus,metrics,health,refresh" + endpoint: + health: + group: + db: + include: db + show-components: always + metrics: + enable: + tomcat: true + jvm: true + process: true + hikaricp: true + system: true + jdbc: true + logback: true + distribution: + percentiles-histogram: + http.server.requests: true + maximum-expected-value: + http.server.requests: 1s + custom: + kafka: + enabled: true +server: + config: + location: /Users/n0p0328/work/hlf-connector/src/main/resources/config/ + servlet: + context-path: /hlf-rest-client + port: 8282