RabbitMQ Java client API

The RabbitMQ community and its main supporter company, Pivotal, provide an official client library for Java called RabbitMQ Java Client. Client library provides both the publishing of messages and receiving of messages. Moreover, Client library supports both synchronous receiving and asynchronous receiving. The details will be explained in the following topics.

If we look at the main packages of the RabbitMQ Java Client, we can see three packages as shown in the following screenshot:

RabbitMQ Java client API

RabbitMQ Java Client Packages

Let's take a look at the following explanation:

  • com.rabbitmq.client package provides classes and interfaces for AMQP connections, channels, and wire-protocol framing descriptions
  • com.rabbitmq.tools provides classes and methods for non-core utilities and administration tools
  • Lastly, com.rabbitmq.utility provides helper classes which are mostly used in the implementation of a library

As the most important package of the API is Client API, we cover the basics and the internals of the API in the following topics:

Client package in detail

You can find each AMQP element in the Client package such as Connection, Channel, Exchanges, Queues, and so on. You can find each functionality of AMQP in the Client package too. Therefore, we would like to introduce you to each element and its functions as follows:

Connection

Connection is an interface in the Client package. Connection interface directly refers to the Connection element of AMQP. So, Connection interface covers the functionalities of the Connection element of AMQP too. We can create a Connection instance through the ConnectionFactory class as shown in the following code:

//ConnectionFactory initialization
ConnectionFactory factory = new ConnectionFactory();
//Setting the hostname
factory.setHost("localhost");
//Setting the Username
factory.setUsername("guest");
//Setting the Password
factory.setPassword("guest");
//Creating the connection using factory instance
Connection conn = factory.newConnection();

The ConnectionFactory class has attributes that refer to hostname, port, username, password, and virtual host. We can set each of the mandatory attributes, and then we are ready to create our connection. Additionally, we can set each attribute using the URI standards as follows:

//ConnectionFactory initialization
ConnectionFactory factory = new ConnectionFactory();
//Setting the Attributes using Uri
factory.setUri("amqp://guest:guest@localhost")
//Creating the connection using factory instance
Connection conn = factory.newConnection();

As we recall from the database connections, we should close our connection after completing our tasks. In Connection class, we have the close() method to do the same job as the following code example demonstrates:

//Creating the connection using factory instance
Connection conn = factory.newConnection();
//Closing the connection
conn.close();

Channel

Channel is another interface in the Client package. As Connection interface refers to the Connection element of AMQP, Channel refers to the Channel element of AMQP. As we discussed in Chapter 3, Architecture and Messaging, Channel's main role is to serve as a logical connection inside of the network connection to the message broker. Channel instances are thread safe.

The Channel instance could be initialized through the Connection instance as shown in the following code example:

//Connection Init
Connection conn = factory.newConection();
//Initializing the Channel using Connection
Channel channel = conn.createChannel();

Because of the Channel's main responsibility, we can send message, receive message, make queue operations, and so on using the Channel. Channel won't be available if these operations fail. Code examples of channel could be presented in the following topics.

Exchanges

Exchanges are the main elements of AMQP that moderate the queues with given functionalities. Exchanges are also available within the RabbitMQ Java Client API. Exchanges' main responsibility is to receive the messages from producers and push them to the related queues that are expressed by the rules. Although they have such importance at AMQP 0.9.1, they don't exist in the AMQP 1.0 specification.

We are able to define each exchange type such as direct, fanout, headers, and so on using the Java API. In Java API, we can create the exchanges via Channel instances as shown in the following code example:

//Channel Initialization
Channel channel = conn.createChannel();
//Declaring Exchanges using the Channel
channel.exchangeDeclare("mastering.rabbitmq","fanout");

Queues

Message Brokers are nothing without queues. Queues are the most important part of the Message Brokers and AMQP. Whenever a new message consumer or subscriber is connected to the Exchange, RabbitMQ creates a queue for the related exchange with the provided name.

As we discussed earlier, Channels are responsible for common operations of the Queues. Therefore, we can declare, bind, unbind, purge, and delete queues with the methods of the Channels as provided in the RabbitMQ Java API. The following simple coding example shows how Queues are bound to given exchanges:

//Declare Exchange
channel.exchangeDeclare("mastering.rabbitmq", "fanout")
//Get the name of bound Queue
String queueName = channel.queueDeclare().getQueue()
//Bind the queue to the exchange without routing key
channel.queueBind(queueName, "mastering.rabbitmq","");

Publishing messages

Before talking about the details of sending messages within our case study, we should look at how we send messages through RabbitMQ using RabbitMQ Java Client API. Although we know that we have many methods such as pub-sub, routed messaging, and so on to publish our message, we'd just like to show the simple message sending.

As we'd like to show simple message sending, we should declare queue and publish message to the declared queue as shown in the following code example:

importcom.rabbitmq.client.ConnectionFactory;
importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.Channel;

publicclassSender {

  privatefinalstatic String QUEUE_NAME ="mastering.rabbitmq";

  publicstaticvoidmain(String[] argv)throws Exception {

    ConnectionFactory factory =new ConnectionFactory();
    factory.setHost("localhost"); //1
    Connection connection = factory.newConnection(); //2
    Channel channel = connection.createChannel(); //3

    channel.queueDeclare(QUEUE_NAME,false,false,false,null);//4 
    String message ="Hello Mastering RabbitMQ!";
    channel.basicPublish("",QUEUE_NAME,null,message.getBytes());//5
    System.out.println("Following Message Sent: "+ message);
 
    channel.close();
    connection.close();
  }
}

If we look at the details of the code that has numbered comments, we notice that:

  • Connection Factory expresses the hostname of RabbitMQ Server
  • The Connection instance created through ConnectionFactory instance
  • The Channel instance initialized through the Connection instance
  • Declaring a queue with a provided name
  • Publishing message directly to the provided queue

Now, we've got the basics of the sending message, we're ready to move on to how we receive a message that delivers from the connected queue.

Consuming messages

Consuming messages from RabbitMQ is similar, but not identical to publishing. Firstly, we initialize the connection through the ConnectionFactory instance and declare the queue that is related to our receiver and sender. Then, the difference from sender comes here, that is, the receiving message part. The receiving part could be implemented in a synchronous way or asynchronous way. In a synchronous way, we block the current thread to listen to message deliveries; however, in an asynchronous way, a thread can't be blocked, so whenever a message is delivered, the consumer method is called instantly in an event like manner.

Synchronously receiving messages

The consumer can receive messages synchronously, and we will go over an example regarding this. As we look at the following code example, we block our thread to listen to message deliveries by using the while loop. In a while loop, we fetch the next incoming message using the QueueinConsumer instance. Then we can convert the incoming message body to our custom object type:

importcom.rabbitmq.client.ConnectionFactory;
importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.Channel;
importcom.rabbitmq.client.QueueingConsumer;

publicclassReciever {

  privatefinalstaticStringQUEUE_NAME="mastering.rabbitmq";

  publicstaticvoidmain(String[]argv)throwsException {

    ConnectionFactoryfactory=newConnectionFactory();
    factory.setHost("localhost");
    Connectionconnection=factory.newConnection();
    Channelchannel=connection.createChannel();

    channel.queueDeclare(QUEUE_NAME,false,false,false,null);

    QueueingConsumerconsumer=newQueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME,true,consumer);

    while(true) {
      QueueingConsumer.Deliverydelivery=consumer.nextDelivery();
      Stringmsg=newString(delivery.getBody());
      System.out.println("Received Message:"+msg);
    }
  }
}

Asynchronously receiving messages

The main functional difference between synchronous receiving and asynchronous receiving is blocking. In asynchronous receiving, thread couldn't be blocked by the listening part, so you can do anything with the current thread.

Non-blocking and event driven software systems are very popular for their scalability. Therefore, it is good to use an asynchronous way in the receiving part. RabbitMQ Java API gives us a DefaultConsumer method to control the deliveries. In the following code example, you can find the inner class that implements the DefaultConsumer method called handleDelivery:

importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.Channel;
importcom.rabbitmq.client.QueueingConsumer;

publicclassReciever {

  privatefinalstaticStringQUEUE_NAME="mastering.rabbitmq";

  publicstaticvoidmain(String[]argv)throwsException {
    ConnectionFactoryfactory=newConnectionFactory();
    factory.setHost("localhost");
    Connectionconnection=factory.newConnection();
    Channelchannel=connection.createChannel();

    channel.queueDeclare(QUEUE_NAME,false,false,false,null);

    channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
      AMQP.BasicProperties properties, byte[] body)throws IOException {
        String msg = new String(body);
        System.out.println("Received Message: " + msg);
      }
    });
  }
}
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset