Skip to content

Latest commit

 

History

History
202 lines (161 loc) · 5.23 KB

File metadata and controls

202 lines (161 loc) · 5.23 KB

RabbitMQ Client for Vert.x

A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)

This service is experimental and the APIs are likely to change before settling down.

Getting Started

Maven

Add the following dependency to your maven project

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-rabbitmq-client</artifactId>
  <version>3.5.0.Beta1</version>
</dependency>

Gradle

Add the following dependency to your gradle project

dependencies {
  compile 'io.vertx:vertx-rabbitmq-client:3.5.0.Beta1'
}

Create a client

You can create a client instance as follows using a full amqp uri:

RabbitMQOptions config = new RabbitMQOptions();
// full amqp uri
config.setUri("amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc");
RabbitMQClient client = RabbitMQClient.create(vertx, config);

Or you can also specify individual parameters manually:

RabbitMQOptions config = new RabbitMQOptions();
// Each parameter is optional
// The default parameter with be used if the parameter is not set
config.setUser("user1");
config.setPassword("password1");
config.setHost("localhost");
config.setPort(5672);
config.setVirtualHost("vhost1");
config.setConnectionTimeout(6000); // in milliseconds
config.setRequestedHeartbeat(60); // in seconds
config.setHandshakeTimeout(6000); // in milliseconds
config.setRequestedChannelMax(5);
config.setNetworkRecoveryInterval(500); // in milliseconds
config.setAutomaticRecoveryEnabled(true);

RabbitMQClient client = RabbitMQClient.create(vertx, config);

Declare exchange with additional config

You can pass additional config parameters to RabbitMQ’s exchangeDeclare method

Map<String, String> config = new HashMap<>();

config.put("x-dead-letter-exchange", "my.deadletter.exchange");
config.put("alternate-exchange", "my.alternate.exchange");
// ...
client.exchangeDeclare("my.exchange", "fanout", true, false, config, onResult -> {
  if (onResult.succeeded()) {
    System.out.println("Exchange successfully declared with config");
  } else {
    onResult.cause().printStackTrace();
  }
});

Operations

The following are some examples of the operations supported by the RabbitMQService API.

Consult the javadoc/documentation for detailed information on all API methods.

Publish

Publish a message to a queue

JsonObject message = new JsonObject().put("body", "Hello RabbitMQ, from Vert.x !");
client.basicPublish("", "my.queue", message, pubResult -> {
  if (pubResult.succeeded()) {
    System.out.println("Message published !");
  } else {
    pubResult.cause().printStackTrace();
  }
});

Publish with confirm

Publish a message to a queue and confirm the broker acknowledged it.

JsonObject message = new JsonObject().put("body", "Hello RabbitMQ, from Vert.x !");

// Put the channel in confirm mode. This can be done once at init.
client.confirmSelect(confirmResult -> {
  if(confirmResult.succeeded()) {
    client.basicPublish("", "my.queue", message, pubResult -> {
      if (pubResult.succeeded()) {
        // Check the message got confirmed by the broker.
        client.waitForConfirms(waitResult -> {
          if(waitResult.succeeded())
            System.out.println("Message published !");
          else
            waitResult.cause().printStackTrace();
        });
      } else {
        pubResult.cause().printStackTrace();
      }
    });
  } else {
    confirmResult.cause().printStackTrace();
  }
});

Consume

Consume messages from a queue

// Create the event bus handler which messages will be sent to
vertx.eventBus().consumer("my.address", msg -> {
  JsonObject json = (JsonObject) msg.body();
  System.out.println("Got message: " + json.getString("body"));
});

// Setup the link between rabbitmq consumer and event bus address
client.basicConsume("my.queue", "my.address", consumeResult -> {
  if (consumeResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
  } else {
    consumeResult.cause().printStackTrace();
  }
});

Get

Will get a message from a queue

client.basicGet("my.queue", true, getResult -> {
  if (getResult.succeeded()) {
    JsonObject msg = getResult.result();
    System.out.println("Got message: " + msg.getString("body"));
  } else {
    getResult.cause().printStackTrace();
  }
});

Consume messages without auto-ack

vertx.eventBus().consumer("my.address", msg -> {
  JsonObject json = (JsonObject) msg.body();
  System.out.println("Got message: " + json.getString("body"));
  // ack
  client.basicAck(json.getLong("deliveryTag"), false, asyncResult -> {
  });
});

// Setup the link between rabbitmq consumer and event bus address
client.basicConsume("my.queue", "my.address", false, consumeResult -> {
  if (consumeResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
  } else {
    consumeResult.cause().printStackTrace();
  }
});

Running the tests

You will need to have RabbitMQ installed and running with default ports on localhost for this to work. <a href="mailto:nscavell@redhat.com">Nick Scavelli</a>