Skip to content

Commit

Permalink
Added a process to refresh the mqtt client for input adaptor
Browse files Browse the repository at this point in the history
  • Loading branch information
dulichan committed May 18, 2014
1 parent 3c2e7fb commit c10f25a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public final class MQTTEventAdaptorType extends AbstractInputEventAdaptor {
private static MQTTEventAdaptorType mqttEventAdaptorAdaptor = new MQTTEventAdaptorType();

public static MQTTEventAdaptorType getInstance() {

return mqttEventAdaptorAdaptor;
}

Expand All @@ -59,8 +58,10 @@ protected List<String> getSupportedInputMessageTypes() {
protected void init() {
this.resourceBundle = ResourceBundle.getBundle("org.wso2.event.adaptor.mqtt.i18n.Resources", Locale.getDefault());
log.info("MQTTEventAdaptorType init");

}


@Override
protected List<Property> getInputAdaptorProperties() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import org.wso2.carbon.event.input.adaptor.core.InputEventAdaptorListener;
import org.wso2.carbon.event.input.adaptor.core.exception.InputEventAdaptorEventProcessingException;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
* Created by deep on 4/22/14.
Expand Down Expand Up @@ -72,19 +75,46 @@ public MQTTListener(MQTTBrokerConnectionConfig mqttBrokerConnectionConfig, Strin
}

}
private void start(MqttClient client, String topicName, int qos) {
ScheduledExecutorService dhtReaderScheduler = Executors.newScheduledThreadPool(1);
dhtReaderScheduler.scheduleWithFixedDelay(new Reconnector(topicName, qos), 0, 10, TimeUnit.SECONDS);
}
public class Reconnector implements Runnable {

private final String topicName;
private final int qos;

public Reconnector(String topicName, int qos) {
this.topicName = topicName;
this.qos = qos;
}

@Override
public void run() {

try {
if(!mqttClient.isConnected()){
mqttClient.connect(connectionOptions);
System.out.println("Mqtt client reconnected");
mqttClient.subscribe(topicName, qos);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

public void subscribe(String topicName, int qos) throws MqttException {

// Connect to the MQTT server
mqttClient.connect(connectionOptions);
start(mqttClient, topicName, qos);

// Subscribe to the requested topic
// The QoS specified is the maximum level that messages will be sent to the client at.
// For instance if QoS 1 is specified, any messages originally published at QoS 2 will
// be downgraded to 1 when delivering to the client but messages published at 1 and 0
// will be received at the same level they were published at.

mqttClient.subscribe(topicName, qos);
//Will need to wait for the message delivery
// try {
// semaphore.acquire();
Expand Down Expand Up @@ -116,6 +146,7 @@ public void connectionLost(Throwable throwable) {
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
try {
log.info("message arrived " + mqttMessage.toString());
System.out.println("message arrived " + mqttMessage.toString());
String msgText = mqttMessage.toString();
eventAdaptorListener.onEventCall(msgText);
} catch (InputEventAdaptorEventProcessingException e) {
Expand Down

0 comments on commit c10f25a

Please sign in to comment.