diff --git a/src/main/java/hlf/java/rest/client/config/FabricProperties.java b/src/main/java/hlf/java/rest/client/config/FabricProperties.java index cb34d65..0b0f5d4 100644 --- a/src/main/java/hlf/java/rest/client/config/FabricProperties.java +++ b/src/main/java/hlf/java/rest/client/config/FabricProperties.java @@ -68,14 +68,21 @@ public static class Events { // preferred for providing Chaincode details for Event subscription private List chaincode; private boolean standardCCEventEnabled; - private List block; + private List blockDetails; private List chaincodeDetails; } + @Data + public static class BlockDetails { + private String channelName; + private List listenerTopics; + } + @Data public static class ChaincodeDetails { private String channelName; private String chaincodeId; + private List listenerTopics; } /** diff --git a/src/main/java/hlf/java/rest/client/listener/FabricEventListener.java b/src/main/java/hlf/java/rest/client/listener/FabricEventListener.java index c62296e..b01f1d0 100644 --- a/src/main/java/hlf/java/rest/client/listener/FabricEventListener.java +++ b/src/main/java/hlf/java/rest/client/listener/FabricEventListener.java @@ -47,13 +47,15 @@ public void onRefresh(RefreshScopeRefreshedEvent event) { startEventListener(); } - public void startEventListener() { + private void startEventListener() { try { - List blockChannelNames = fabricProperties.getEvents().getBlock(); - if (!CollectionUtils.isEmpty(blockChannelNames)) { + List blockDetailsList = + fabricProperties.getEvents().getBlockDetails(); + if (!CollectionUtils.isEmpty(blockDetailsList)) { - for (String channelName : blockChannelNames) { + for (FabricProperties.BlockDetails blockDetails : blockDetailsList) { + String channelName = blockDetails.getChannelName(); log.info("channel names {}", channelName); Network network = gateway.getNetwork(channelName); diff --git a/src/main/java/hlf/java/rest/client/service/EventPublishService.java b/src/main/java/hlf/java/rest/client/service/EventPublishService.java index 554b807..a3a12e6 100644 --- a/src/main/java/hlf/java/rest/client/service/EventPublishService.java +++ b/src/main/java/hlf/java/rest/client/service/EventPublishService.java @@ -25,9 +25,8 @@ boolean sendMessage( * @param eventName String chaincode event-name * @param channelName String Name of the channel where the event was generated. * @param messageKey associated key for the payload. - * @return status boolean status of msg sent */ - boolean publishChaincodeEvents( + void publishChaincodeEvents( final String payload, String chaincodeName, String fabricTxId, @@ -42,9 +41,8 @@ boolean publishChaincodeEvents( * @param channelName String Name of the channel where the event was generated. * @param functionName String Name of the function name. * @param isPrivateDataPresent boolean flag to check if privateData present in payload - * @return status boolean status of msg sent */ - boolean publishBlockEvents( + void publishBlockEvents( final String payload, String fabricTxId, String channelName, diff --git a/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java b/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java index 3639069..de5fb18 100644 --- a/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java +++ b/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java @@ -4,6 +4,7 @@ import hlf.java.rest.client.config.KafkaProperties; import hlf.java.rest.client.service.EventPublishService; import hlf.java.rest.client.util.FabricClientConstants; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.internals.RecordHeader; @@ -12,6 +13,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @@ -82,18 +84,51 @@ public void onFailure(Throwable ex) { } @Override - public boolean publishChaincodeEvents( + public void publishChaincodeEvents( String payload, String chaincodeName, String fabricTxId, String eventName, String channelName, String messageKey) { - boolean status = true; + Optional optionalChaincodeDetails = + fabricProperties.getEvents().getChaincodeDetails().stream() + .filter( + chaincodeDetail -> + chaincodeDetail.getChannelName().equals(channelName) + && chaincodeDetail.getChaincodeId().equals(chaincodeName)) + .findAny(); + + if (!optionalChaincodeDetails.isPresent() + || CollectionUtils.isEmpty(optionalChaincodeDetails.get().getListenerTopics())) { + sendMessage( + kafkaProperties.getEventListener().getTopic(), + payload, + chaincodeName, + fabricTxId, + eventName, + channelName, + messageKey); + return; + } + + for (String topic : optionalChaincodeDetails.get().getListenerTopics()) { + sendMessage(topic, payload, chaincodeName, fabricTxId, eventName, channelName, messageKey); + } + } + + private void sendMessage( + String topic, + String payload, + String chaincodeName, + String fabricTxId, + String eventName, + String channelName, + String messageKey) { try { ProducerRecord producerRecord = - new ProducerRecord<>(kafkaProperties.getEventListener().getTopic(), messageKey, payload); + new ProducerRecord<>(topic, messageKey, payload); producerRecord .headers() @@ -118,9 +153,7 @@ public boolean publishChaincodeEvents( FabricClientConstants.FABRIC_EVENT_TYPE, FabricClientConstants.FABRIC_EVENT_TYPE_CHAINCODE.getBytes())); - log.info( - "Publishing Chaincode event to outbound topic {}", - kafkaProperties.getEventListener().getTopic()); + log.info("Publishing Chaincode event to outbound topic {}", topic); ListenableFuture> future = kafkaTemplate.send(producerRecord); @@ -146,30 +179,61 @@ public void onFailure(Throwable ex) { }); } catch (Exception ex) { - status = false; log.error("Error sending message - " + ex.getMessage()); } - - return status; } @Override - public boolean publishBlockEvents( + public void publishBlockEvents( String payload, String fabricTxId, String channelName, String chaincodeName, String functionName, Boolean isPrivateDataPresent) { - boolean status = true; + Optional optionalBlockDetails = + fabricProperties.getEvents().getBlockDetails().stream() + .filter(blockDetails -> blockDetails.getChannelName().equals(channelName)) + .findAny(); + + if (!optionalBlockDetails.isPresent() + || CollectionUtils.isEmpty(optionalBlockDetails.get().getListenerTopics())) { + sendMessage( + kafkaProperties.getEventListener().getTopic(), + payload, + fabricTxId, + channelName, + chaincodeName, + functionName, + isPrivateDataPresent); + return; + } + + for (String topic : optionalBlockDetails.get().getListenerTopics()) { + sendMessage( + topic, + payload, + fabricTxId, + channelName, + chaincodeName, + functionName, + isPrivateDataPresent); + } + } + + private void sendMessage( + String topic, + String payload, + String fabricTxId, + String channelName, + String chaincodeName, + String functionName, + Boolean isPrivateDataPresent) { try { ProducerRecord producerRecord = - new ProducerRecord<>( - kafkaProperties.getEventListener().getTopic(), - String.valueOf(payload.hashCode()), - payload); + new ProducerRecord<>(topic, String.valueOf(payload.hashCode()), payload); producerRecord .headers() @@ -204,9 +268,7 @@ public boolean publishBlockEvents( FabricClientConstants.IS_PRIVATE_DATA_PRESENT, isPrivateDataPresent.toString().getBytes())); - log.info( - "Publishing Block event to outbound topic {}", - kafkaProperties.getEventListener().getTopic()); + log.info("Publishing Block event to outbound topic {}", topic); ListenableFuture> future = kafkaTemplate.send(producerRecord); @@ -232,10 +294,7 @@ public void onFailure(Throwable ex) { }); } catch (Exception ex) { - status = false; log.error("Error sending message - " + ex.getMessage()); } - - return status; } } diff --git a/src/main/resources/application.template b/src/main/resources/application.template index 7f70104..e1e3423 100644 --- a/src/main/resources/application.template +++ b/src/main/resources/application.template @@ -32,11 +32,14 @@ fabric: enable: true standardCCEventEnabled: boolean (if set to true then the chaincode event is attempted at deserializing in the connector) chaincode: (Note : Will soon be deprecated / removed) - block: + blockDetails: + - channelName: Name of the Channel + listenerTopics: topics to which event messages will be sent chaincodeDetails: - - - channelName: Name of the Channel - chaincodeId: chaincode-id of the deployed chaincode in this Channel + - + channelName: Name of the Channel + chaincodeId: chaincode-id of the deployed chaincode in this Channel + listenerTopics: topics to which event messages will be sent kafka: integration-points: if not provided, default will be latest. event-listener: brokerHost: - topic: + topic: ssl-enabled: boolean security-protocol: listenToFailedMessages: boolean diff --git a/src/test/java/hlf/java/rest/client/integration/ConfigHandlerControllerIntegrationTest.java b/src/test/java/hlf/java/rest/client/integration/ConfigHandlerControllerIntegrationTest.java index dac61a3..f4f751c 100644 --- a/src/test/java/hlf/java/rest/client/integration/ConfigHandlerControllerIntegrationTest.java +++ b/src/test/java/hlf/java/rest/client/integration/ConfigHandlerControllerIntegrationTest.java @@ -81,7 +81,7 @@ private static class TestConfiguration { static String FABRIC_PROPERTIES_CLIENT = "FabricProperties.Client(rest=FabricProperties.Client.Rest(apikey=expected-key))"; static String FABRIC_PROPERTIES_EVENTS = - "FabricProperties.Events(enable=true, chaincode=[chaincode12, chaincode2], standardCCEventEnabled=false, block=[block111, block2], chaincodeDetails=null)"; + "FabricProperties.Events(enable=true, chaincode=[chaincode12, chaincode2], standardCCEventEnabled=false, blockDetails=[FabricProperties.BlockDetails(channelName=block111, listenerTopics=[topic-1])], chaincodeDetails=null)"; static String KAFKA_PROPERTIES_PRODUCER = "Producer{brokerHost='localhost:8087', topic='hlf-offchain-topic1', saslJaasConfig='null'}"; static String KAFKA_CONSUMER_PROPERTIES = diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index 92d67c1..fcbe2a9 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -67,7 +67,9 @@ fabric: events: enable: false chaincode: channel1 - block: channel1 + blockDetails: + - channelName: channel1 + listenerTopics: topic-1 client: rest: apikey: abc diff --git a/src/test/resources/integration/sample-application.yml b/src/test/resources/integration/sample-application.yml index 7544a1e..f3a4f3a 100644 --- a/src/test/resources/integration/sample-application.yml +++ b/src/test/resources/integration/sample-application.yml @@ -38,7 +38,9 @@ fabric: enable: true standardCCEventEnabled: false chaincode: chaincode12, chaincode2 - block: block111, block2 + blockDetails: + - channelName: block111 + listenerTopics: topic-1 kafka: integration-points: -