diff --git a/mqtt-input-event-adaptor/src/main/java/org/wso2/event/adaptor/mqtt/MQTTEventAdaptorType.java b/mqtt-input-event-adaptor/src/main/java/org/wso2/event/adaptor/mqtt/MQTTEventAdaptorType.java index af338b0..7365fe6 100644 --- a/mqtt-input-event-adaptor/src/main/java/org/wso2/event/adaptor/mqtt/MQTTEventAdaptorType.java +++ b/mqtt-input-event-adaptor/src/main/java/org/wso2/event/adaptor/mqtt/MQTTEventAdaptorType.java @@ -39,7 +39,6 @@ public final class MQTTEventAdaptorType extends AbstractInputEventAdaptor { private static MQTTEventAdaptorType mqttEventAdaptorAdaptor = new MQTTEventAdaptorType(); public static MQTTEventAdaptorType getInstance() { - return mqttEventAdaptorAdaptor; } @@ -59,8 +58,10 @@ protected List getSupportedInputMessageTypes() { protected void init() { this.resourceBundle = ResourceBundle.getBundle("org.wso2.event.adaptor.mqtt.i18n.Resources", Locale.getDefault()); log.info("MQTTEventAdaptorType init"); + } + @Override protected List getInputAdaptorProperties() { diff --git a/mqtt-input-event-adaptor/src/main/java/org/wso2/event/adaptor/mqtt/internal/util/MQTTListener.java b/mqtt-input-event-adaptor/src/main/java/org/wso2/event/adaptor/mqtt/internal/util/MQTTListener.java index 4ee83f0..a8df10e 100644 --- a/mqtt-input-event-adaptor/src/main/java/org/wso2/event/adaptor/mqtt/internal/util/MQTTListener.java +++ b/mqtt-input-event-adaptor/src/main/java/org/wso2/event/adaptor/mqtt/internal/util/MQTTListener.java @@ -7,7 +7,10 @@ import org.wso2.carbon.event.input.adaptor.core.InputEventAdaptorListener; import org.wso2.carbon.event.input.adaptor.core.exception.InputEventAdaptorEventProcessingException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; /** * Created by deep on 4/22/14. @@ -72,11 +75,39 @@ public MQTTListener(MQTTBrokerConnectionConfig mqttBrokerConnectionConfig, Strin } } + private void start(MqttClient client, String topicName, int qos) { + ScheduledExecutorService dhtReaderScheduler = Executors.newScheduledThreadPool(1); + dhtReaderScheduler.scheduleWithFixedDelay(new Reconnector(topicName, qos), 0, 10, TimeUnit.SECONDS); + } + public class Reconnector implements Runnable { + + private final String topicName; + private final int qos; + + public Reconnector(String topicName, int qos) { + this.topicName = topicName; + this.qos = qos; + } + + @Override + public void run() { + + try { + if(!mqttClient.isConnected()){ + mqttClient.connect(connectionOptions); + System.out.println("Mqtt client reconnected"); + mqttClient.subscribe(topicName, qos); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } public void subscribe(String topicName, int qos) throws MqttException { // Connect to the MQTT server - mqttClient.connect(connectionOptions); + start(mqttClient, topicName, qos); // Subscribe to the requested topic // The QoS specified is the maximum level that messages will be sent to the client at. @@ -84,7 +115,6 @@ public void subscribe(String topicName, int qos) throws MqttException { // be downgraded to 1 when delivering to the client but messages published at 1 and 0 // will be received at the same level they were published at. - mqttClient.subscribe(topicName, qos); //Will need to wait for the message delivery // try { // semaphore.acquire(); @@ -116,6 +146,7 @@ public void connectionLost(Throwable throwable) { public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { try { log.info("message arrived " + mqttMessage.toString()); + System.out.println("message arrived " + mqttMessage.toString()); String msgText = mqttMessage.toString(); eventAdaptorListener.onEventCall(msgText); } catch (InputEventAdaptorEventProcessingException e) {