Skip to content

Commit

Permalink
Support multiple outbound topics for publishing blockchain events
Browse files Browse the repository at this point in the history
Signed-off-by: Abhay Kishore <abhay.kishore@walmart.com>
  • Loading branch information
abhakish committed Aug 27, 2024
1 parent 41078ec commit add13c0
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,21 @@ public static class Events {
// preferred for providing Chaincode details for Event subscription
private List<String> chaincode;
private boolean standardCCEventEnabled;
private List<String> block;
private List<BlockDetails> blockDetails;
private List<ChaincodeDetails> chaincodeDetails;
}

@Data
public static class BlockDetails {
private String channelName;
private List<String> listenerTopics;
}

@Data
public static class ChaincodeDetails {
private String channelName;
private String chaincodeId;
private List<String> listenerTopics;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ public void onRefresh(RefreshScopeRefreshedEvent event) {
startEventListener();
}

public void startEventListener() {
private void startEventListener() {

try {
List<String> blockChannelNames = fabricProperties.getEvents().getBlock();
if (!CollectionUtils.isEmpty(blockChannelNames)) {
List<FabricProperties.BlockDetails> blockDetailsList =
fabricProperties.getEvents().getBlockDetails();
if (!CollectionUtils.isEmpty(blockDetailsList)) {

for (String channelName : blockChannelNames) {
for (FabricProperties.BlockDetails blockDetails : blockDetailsList) {
String channelName = blockDetails.getChannelName();
log.info("channel names {}", channelName);
Network network = gateway.getNetwork(channelName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ boolean sendMessage(
* @param eventName String chaincode event-name
* @param channelName String Name of the channel where the event was generated.
* @param messageKey associated key for the payload.
* @return status boolean status of msg sent
*/
boolean publishChaincodeEvents(
void publishChaincodeEvents(
final String payload,
String chaincodeName,
String fabricTxId,
Expand All @@ -42,9 +41,8 @@ boolean publishChaincodeEvents(
* @param channelName String Name of the channel where the event was generated.
* @param functionName String Name of the function name.
* @param isPrivateDataPresent boolean flag to check if privateData present in payload
* @return status boolean status of msg sent
*/
boolean publishBlockEvents(
void publishBlockEvents(
final String payload,
String fabricTxId,
String channelName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import hlf.java.rest.client.config.KafkaProperties;
import hlf.java.rest.client.service.EventPublishService;
import hlf.java.rest.client.util.FabricClientConstants;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
Expand All @@ -12,6 +13,7 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

Expand Down Expand Up @@ -82,18 +84,51 @@ public void onFailure(Throwable ex) {
}

@Override
public boolean publishChaincodeEvents(
public void publishChaincodeEvents(
String payload,
String chaincodeName,
String fabricTxId,
String eventName,
String channelName,
String messageKey) {
boolean status = true;

Optional<FabricProperties.ChaincodeDetails> optionalChaincodeDetails =
fabricProperties.getEvents().getChaincodeDetails().stream()
.filter(
chaincodeDetail ->
chaincodeDetail.getChannelName().equals(channelName)
&& chaincodeDetail.getChaincodeId().equals(chaincodeName))
.findAny();

if (!optionalChaincodeDetails.isPresent()
|| CollectionUtils.isEmpty(optionalChaincodeDetails.get().getListenerTopics())) {
sendMessage(
kafkaProperties.getEventListener().getTopic(),
payload,
chaincodeName,
fabricTxId,
eventName,
channelName,
messageKey);
return;
}

for (String topic : optionalChaincodeDetails.get().getListenerTopics()) {
sendMessage(topic, payload, chaincodeName, fabricTxId, eventName, channelName, messageKey);
}
}

private void sendMessage(
String topic,
String payload,
String chaincodeName,
String fabricTxId,
String eventName,
String channelName,
String messageKey) {
try {
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(kafkaProperties.getEventListener().getTopic(), messageKey, payload);
new ProducerRecord<>(topic, messageKey, payload);

producerRecord
.headers()
Expand All @@ -118,9 +153,7 @@ public boolean publishChaincodeEvents(
FabricClientConstants.FABRIC_EVENT_TYPE,
FabricClientConstants.FABRIC_EVENT_TYPE_CHAINCODE.getBytes()));

log.info(
"Publishing Chaincode event to outbound topic {}",
kafkaProperties.getEventListener().getTopic());
log.info("Publishing Chaincode event to outbound topic {}", topic);

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);

Expand All @@ -146,30 +179,61 @@ public void onFailure(Throwable ex) {
});

} catch (Exception ex) {
status = false;
log.error("Error sending message - " + ex.getMessage());
}

return status;
}

@Override
public boolean publishBlockEvents(
public void publishBlockEvents(
String payload,
String fabricTxId,
String channelName,
String chaincodeName,
String functionName,
Boolean isPrivateDataPresent) {
boolean status = true;

Optional<FabricProperties.BlockDetails> optionalBlockDetails =
fabricProperties.getEvents().getBlockDetails().stream()
.filter(blockDetails -> blockDetails.getChannelName().equals(channelName))
.findAny();

if (!optionalBlockDetails.isPresent()
|| CollectionUtils.isEmpty(optionalBlockDetails.get().getListenerTopics())) {
sendMessage(
kafkaProperties.getEventListener().getTopic(),
payload,
fabricTxId,
channelName,
chaincodeName,
functionName,
isPrivateDataPresent);
return;
}

for (String topic : optionalBlockDetails.get().getListenerTopics()) {
sendMessage(
topic,
payload,
fabricTxId,
channelName,
chaincodeName,
functionName,
isPrivateDataPresent);
}
}

private void sendMessage(
String topic,
String payload,
String fabricTxId,
String channelName,
String chaincodeName,
String functionName,
Boolean isPrivateDataPresent) {
try {

ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(
kafkaProperties.getEventListener().getTopic(),
String.valueOf(payload.hashCode()),
payload);
new ProducerRecord<>(topic, String.valueOf(payload.hashCode()), payload);

producerRecord
.headers()
Expand Down Expand Up @@ -204,9 +268,7 @@ public boolean publishBlockEvents(
FabricClientConstants.IS_PRIVATE_DATA_PRESENT,
isPrivateDataPresent.toString().getBytes()));

log.info(
"Publishing Block event to outbound topic {}",
kafkaProperties.getEventListener().getTopic());
log.info("Publishing Block event to outbound topic {}", topic);

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);

Expand All @@ -232,10 +294,7 @@ public void onFailure(Throwable ex) {
});

} catch (Exception ex) {
status = false;
log.error("Error sending message - " + ex.getMessage());
}

return status;
}
}
13 changes: 8 additions & 5 deletions src/main/resources/application.template
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ fabric:
enable: true
standardCCEventEnabled: boolean (if set to true then the chaincode event is attempted at deserializing in the connector)
chaincode: <comma separated list of channels> (Note : Will soon be deprecated / removed)
block: <comma separated list of channels>
blockDetails:
- channelName: Name of the Channel
listenerTopics: topics to which event messages will be sent
chaincodeDetails:
-
channelName: Name of the Channel
chaincodeId: chaincode-id of the deployed chaincode in this Channel
-
channelName: Name of the Channel
chaincodeId: chaincode-id of the deployed chaincode in this Channel
listenerTopics: topics to which event messages will be sent
kafka:
integration-points: <Note : This is a list and Multiple intgration points can be configured)
- groupId: test_group_id
Expand All @@ -51,7 +54,7 @@ kafka:
offsetResetPolicy: <possible values are earliest / latest> if not provided, default will be latest.
event-listener:
brokerHost: <Comma separated list of boostrap servers>
topic: <topic to publish Block or Chaincode Events>
topic: <default topic to publish Block or Chaincode Events if no topic configured at individual level of event>
ssl-enabled: boolean
security-protocol: <Only supports SSL>
listenToFailedMessages: boolean <set as true if you wish to recieve errored Transaction records back to this topic>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private static class TestConfiguration {
static String FABRIC_PROPERTIES_CLIENT =
"FabricProperties.Client(rest=FabricProperties.Client.Rest(apikey=expected-key))";
static String FABRIC_PROPERTIES_EVENTS =
"FabricProperties.Events(enable=true, chaincode=[chaincode12, chaincode2], standardCCEventEnabled=false, block=[block111, block2], chaincodeDetails=null)";
"FabricProperties.Events(enable=true, chaincode=[chaincode12, chaincode2], standardCCEventEnabled=false, blockDetails=[FabricProperties.BlockDetails(channelName=block111, listenerTopics=[topic-1])], chaincodeDetails=null)";
static String KAFKA_PROPERTIES_PRODUCER =
"Producer{brokerHost='localhost:8087', topic='hlf-offchain-topic1', saslJaasConfig='null'}";
static String KAFKA_CONSUMER_PROPERTIES =
Expand Down
4 changes: 3 additions & 1 deletion src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ fabric:
events:
enable: false
chaincode: channel1
block: channel1
blockDetails:
- channelName: channel1
listenerTopics: topic-1
client:
rest:
apikey: abc
4 changes: 3 additions & 1 deletion src/test/resources/integration/sample-application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ fabric:
enable: true
standardCCEventEnabled: false
chaincode: chaincode12, chaincode2
block: block111, block2
blockDetails:
- channelName: block111
listenerTopics: topic-1
kafka:
integration-points:
-
Expand Down

0 comments on commit add13c0

Please sign in to comment.