Skip to content

Commit

Permalink
Incorporate ITs for Listener Tx Processing flows
Browse files Browse the repository at this point in the history
Signed-off-by: Nithin <nithin.pankaj@walmartlabs.com>
  • Loading branch information
Nithin committed Apr 1, 2024
1 parent 98c92e4 commit 0495cff
Show file tree
Hide file tree
Showing 5 changed files with 666 additions and 0 deletions.
118 changes: 118 additions & 0 deletions src/test/java/hlf/java/rest/client/IT/KafkaBaseIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package hlf.java.rest.client.IT;

import hlf.java.rest.client.util.FabricClientConstants;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

public abstract class KafkaBaseIT {

  private static final String IN_MEMORY_BROKER_ADDRESS = "PLAINTEXT://localhost:9092";
  private static final String INBOUND_TOPIC_NAME = "test-consumer-inbound-topic";
  private static final String OUTBOUND_TOPIC_NAME = "test-publisher-event-topic";

  protected static final String DEFAULT_CHANNEL_NAME = "test-channel";
  protected static final String DEFAULT_CONTRACT_NAME = "test-contract";
  protected static final String DEFAULT_FUNCTION_NAME = "test-function";

  private static EmbeddedKafkaBroker embeddedKafkaBroker;
  protected static Producer<Object, Object> testProducer;

  /** Boots the shared embedded broker (and its test producer) once for the whole test class. */
  @BeforeAll
  public static void setUpClass() {
    startEmbeddedKafkaBroker();
  }

  /**
   * Releases broker resources after the test class completes.
   *
   * <p>Fix: also closes {@code testProducer}, which was previously created in {@code setUpClass}
   * but never closed — leaking its network threads and buffers across test classes.
   */
  @AfterAll
  public static void tearDownClass() {
    if (testProducer != null) {
      testProducer.close();
      testProducer = null;
    }
    if (embeddedKafkaBroker != null) {
      embeddedKafkaBroker.destroy();
      embeddedKafkaBroker = null;
    }
  }

  /** Lazily starts a single-node embedded broker on port 9092 and wires the shared producer. */
  private static void startEmbeddedKafkaBroker() {
    if (embeddedKafkaBroker == null) {
      embeddedKafkaBroker =
          new EmbeddedKafkaBroker(1, false, getDefaultPartitionSize(), getTopicsToBootstrap())
              .brokerProperties(getBrokerProperties())
              .kafkaPorts(9092);
      embeddedKafkaBroker.afterPropertiesSet();

      testProducer = configureProducer();
    }
  }

  /** Builds a producer pointed at the embedded broker using spring-kafka's default test props. */
  private static Producer<Object, Object> configureProducer() {
    Map<String, Object> producerProps =
        new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
    return new DefaultKafkaProducerFactory<>(producerProps).createProducer();
  }

  private static Map<String, String> getBrokerProperties() {
    Map<String, String> brokerProperties = new HashMap<>();
    brokerProperties.put("listeners", IN_MEMORY_BROKER_ADDRESS);
    brokerProperties.put("port", "9092");
    return brokerProperties;
  }

  protected static String[] getTopicsToBootstrap() {
    return new String[] {INBOUND_TOPIC_NAME, OUTBOUND_TOPIC_NAME};
  }

  protected static int getDefaultPartitionSize() {
    return 12;
  }

  protected static String getBrokerAddress() {
    return IN_MEMORY_BROKER_ADDRESS;
  }

  /**
   * Publishes a stub transaction message to the inbound topic, carrying the Fabric routing
   * headers (channel, chaincode, function) the listener dispatches on.
   *
   * <p>Fix: flushes after sending so the record is guaranteed to be on the broker when this
   * method returns — the previous fire-and-forget {@code send} could race with the caller's
   * committed-offset assertions.
   */
  protected void publishValidTransactionToInboundTopic(
      String channelName, String contractName, String functionName) {

    ProducerRecord<Object, Object> producerRecord =
        new ProducerRecord<Object, Object>(INBOUND_TOPIC_NAME, "stringified-transaction-message");

    producerRecord.headers().add(getHeader(FabricClientConstants.CHANNEL_NAME, channelName));
    producerRecord.headers().add(getHeader(FabricClientConstants.CHAINCODE_NAME, contractName));
    producerRecord.headers().add(getHeader(FabricClientConstants.FUNCTION_NAME, functionName));

    testProducer.send(producerRecord);
    testProducer.flush();
  }

  /** Builds a Kafka record header carrying {@code headerValue} as UTF-8 bytes. */
  private Header getHeader(String headerName, String headerValue) {
    return new Header() {
      @Override
      public String key() {
        return headerName;
      }

      @Override
      public byte[] value() {
        return headerValue.getBytes(StandardCharsets.UTF_8);
      }
    };
  }

  /**
   * Sums the committed offsets of {@code groupId} across every partition of the inbound topic —
   * i.e. the total number of inbound messages the consumer group has processed and committed.
   */
  protected long getCurrentCommittedMessageCountForInboundTopic(String groupId) throws Exception {

    long currentOffset = 0;

    for (int i = 0; i < getDefaultPartitionSize(); i++) {
      currentOffset +=
          KafkaTestUtils.getCurrentOffset(getBrokerAddress(), groupId, INBOUND_TOPIC_NAME, i)
              .offset();
    }

    return currentOffset;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package hlf.java.rest.client.IT;

import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import hlf.java.rest.client.config.KafkaProperties;
import hlf.java.rest.client.listener.DynamicKafkaListener;
import hlf.java.rest.client.service.TransactionFulfillment;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.hyperledger.fabric.gateway.Contract;
import org.hyperledger.fabric.gateway.ContractException;
import org.hyperledger.fabric.gateway.Gateway;
import org.hyperledger.fabric.gateway.Network;
import org.hyperledger.fabric.gateway.Transaction;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.util.ReflectionTestUtils;

/**
* The 'per-partition-consumer' profile spins up a container listener and a publisher with a
* concurrency limit equal to the number of Partitions or 12 (whichever is lower).
*
* <p>Note : This test utilises Junit's 'OutputCaptureExtension' to verify the presence of consumer
* threads polling from a Topic.
*/
@Slf4j
@DirtiesContext
@SpringBootTest
@EmbeddedKafka
@ActiveProfiles("per-partition-consumer")
@ExtendWith(OutputCaptureExtension.class)
public class PerPartitionListenerIntegrationIT extends KafkaBaseIT {

  /** Number of transactions published — and expected to be consumed — by the happy-path test. */
  private static final int PUBLISHED_TX_COUNT = 50;

  private String singleListenerConsumerThreadNamePattern = "consumer-{{thread-count}}-C-1";

  @Autowired private DynamicKafkaListener dynamicKafkaListener;
  @Autowired private TransactionFulfillment transactionFulfillment;
  @Autowired private KafkaProperties kafkaProperties;
  @Autowired private MeterRegistry meterRegistry;

  /**
   * Test to validate a scenario where a list of valid incoming transactions was polled and
   * dispatched to {@link hlf.java.rest.client.service.impl.TransactionFulfillmentImpl} for
   * processing. Each partition is likely to be processed by a consumer thread. After successful
   * processing, all the offsets are committed.
   */
  @Test
  public void testHappyPathMultipleTxConsumption(CapturedOutput capturedLogOutput)
      throws Exception {

    // Given
    List<ConcurrentMessageListenerContainer> existingContainers =
        dynamicKafkaListener.getExistingContainers();
    Assertions.assertEquals(
        1, existingContainers.size()); // There should only be one Container at this point.

    KafkaProperties.Consumer configuredConsumerProp = kafkaProperties.getIntegrationPoints().get(0);

    long startingOffset =
        getCurrentCommittedMessageCountForInboundTopic(configuredConsumerProp.getGroupId());

    Transaction mockTransaction = setupMockNetworkInvocation(Optional.empty());

    // When

    // Publish multiple valid Transactions for consumption
    for (int publishCount = 0; publishCount < PUBLISHED_TX_COUNT; publishCount++) {
      publishValidTransactionToInboundTopic(
          DEFAULT_CHANNEL_NAME, DEFAULT_CONTRACT_NAME, DEFAULT_FUNCTION_NAME);
    }

    // Then

    // Fix: this was previously a fixed 500 ms pollDelay followed by a one-shot offset assertion,
    // which fails whenever consumption takes longer than the sleep. Instead, poll (bounded) until
    // every published message has been committed by the consumer group.
    Awaitility.await()
        .atMost(10, TimeUnit.SECONDS)
        .untilAsserted(
            () ->
                Assertions.assertEquals(
                    PUBLISHED_TX_COUNT,
                    getCurrentCommittedMessageCountForInboundTopic(
                            configuredConsumerProp.getGroupId())
                        - startingOffset));

    // There were no retries. Therefore, Transaction invocation was performed for all inbound
    // messages.
    verify(mockTransaction, times(PUBLISHED_TX_COUNT)).submit(Mockito.any());

    ConcurrentMessageListenerContainer<String, String> currentContainer = existingContainers.get(0);
    int containerConcurrencyLevel = currentContainer.getConcurrency();

    for (int threadCount = 0; threadCount < containerConcurrencyLevel; threadCount++) {
      String consumerThreadId =
          singleListenerConsumerThreadNamePattern.replace(
              "{{thread-count}}", String.valueOf(threadCount));
      // Does the logs talk about consumer thread processing the messages?
      Assertions.assertTrue(capturedLogOutput.getOut().contains(consumerThreadId));
    }
  }

  /**
   * Replaces the autowired {@code gateway} inside {@link TransactionFulfillment} with a mocked
   * Gateway → Network → Contract → Transaction chain so that transaction submission can be
   * stubbed without a live Fabric network.
   *
   * @param optionalTxSubmitException when present, {@code Transaction#submit} throws it;
   *     otherwise submit returns a random UUID payload
   * @return the mocked {@link Transaction}, for interaction verification
   */
  @NotNull
  private Transaction setupMockNetworkInvocation(Optional<Exception> optionalTxSubmitException)
      throws ContractException, TimeoutException, InterruptedException {
    Gateway gatewayMock = Mockito.mock(Gateway.class);
    ReflectionTestUtils.setField(transactionFulfillment, "gateway", gatewayMock);

    Network mockNetwork = Mockito.mock(Network.class);
    Contract mockContract = Mockito.mock(Contract.class);
    Transaction mockTransaction = Mockito.mock(Transaction.class);
    Mockito.when(gatewayMock.getNetwork(DEFAULT_CHANNEL_NAME)).thenReturn(mockNetwork);
    Mockito.when(mockNetwork.getContract(DEFAULT_CONTRACT_NAME)).thenReturn(mockContract);
    Mockito.when(mockContract.createTransaction(DEFAULT_FUNCTION_NAME)).thenReturn(mockTransaction);

    if (optionalTxSubmitException.isPresent()) {
      doThrow(optionalTxSubmitException.get()).when(mockTransaction).submit(Mockito.any());
    } else {
      Mockito.when(mockTransaction.submit(Mockito.any()))
          .thenReturn(UUID.randomUUID().toString().getBytes());
    }
    return mockTransaction;
  }
}
Loading

0 comments on commit 0495cff

Please sign in to comment.