The RabbitMQ community and its main supporter company, Pivotal, provide an official client library for Java called RabbitMQ Java Client. Client library provides both the publishing of messages and receiving of messages. Moreover, Client library supports both synchronous receiving and asynchronous receiving. The details will be explained in the following topics.
If we look at the main packages of the RabbitMQ Java Client, we can see three packages as shown in the following screenshot:
Let's take a look at the following explanation:
As the most important package of the API is Client API, we cover the basics and the internals of the API in the following topics:
You can find each AMQP element in the Client package such as Connection, Channel, Exchanges, Queues, and so on. You can find each functionality of AMQP in the Client package too. Therefore, we would like to introduce you to each element and its functions as follows:
Connection is an interface in the Client package. Connection
interface directly refers to the Connection
element of AMQP. So, Connection
interface covers the functionalities of the Connection
element of AMQP too. We can create a Connection
instance through the ConnectionFactory
class as shown in the following code:
//ConnectionFactory initialization ConnectionFactory factory = new ConnectionFactory(); //Setting the hostname factory.setHost("localhost"); //Setting the Username factory.setUsername("guest"); //Setting the Password factory.setPassword("guest"); //Creating the connection using factory instance Connection conn = factory.newConnection();
The ConnectionFactory
class has attributes that refer to hostname
, port
, username
, password
, and virtual host
. We can set each of the mandatory attributes, and then we are ready to create our connection. Additionally, we can set each attribute using the URI standards as follows:
//ConnectionFactory initialization ConnectionFactory factory = new ConnectionFactory(); //Setting the Attributes using Uri factory.setUri("amqp://guest:guest@localhost") //Creating the connection using factory instance Connection conn = factory.newConnection();
As we recall from the database connections, we should close our connection after completing our tasks. In Connection
class, we have the close()
method to do the same job as the following code example demonstrates:
//Creating the connection using factory instance Connection conn = factory.newConnection(); //Closing the connection conn.close();
Channel is another interface in the Client package. As Connection interface refers to the Connection
element of AMQP, Channel refers to the Channel
element of AMQP. As we discussed in Chapter 3, Architecture and Messaging, Channel's main role is to serve as a logical connection inside of the network connection to the message broker. Channel
instances are thread safe.
The Channel
instance could be initialized through the Connection
instance as shown in the following code example:
//Connection Init Connection conn = factory.newConection(); //Initializing the Channel using Connection Channel channel = conn.createChannel();
Because of the Channel's main responsibility, we can send message, receive message, make queue operations, and so on using the Channel. Channel won't be available if these operations fail. Code examples of channel could be presented in the following topics.
Exchanges are the main elements of AMQP that moderate the queues with given functionalities. Exchanges are also available within the RabbitMQ Java Client API. Exchanges' main responsibility is to receive the messages from producers and push them to the related queues that are expressed by the rules. Although they have such importance at AMQP 0.9.1, they don't exist in the AMQP 1.0 specification.
We are able to define each exchange type such as direct, fanout, headers, and so on using the Java API. In Java API, we can create the exchanges via Channel
instances as shown in the following code example:
//Channel Initialization Channel channel = conn.createChannel(); //Declaring Exchanges using the Channel channel.exchangeDeclare("mastering.rabbitmq","fanout");
Message Brokers are nothing without queues. Queues are the most important part of the Message Brokers and AMQP. Whenever a new message consumer or subscriber is connected to the Exchange, RabbitMQ creates a queue for the related exchange with the provided name.
As we discussed earlier, Channels are responsible for common operations of the Queues. Therefore, we can declare, bind, unbind, purge, and delete queues with the methods of the Channels as provided in the RabbitMQ Java API. The following simple coding example shows how Queues are bound to given exchanges:
//Declare Exchange channel.exchangeDeclare("mastering.rabbitmq", "fanout") //Get the name of bound Queue String queueName = channel.queueDeclare().getQueue() //Bind the queue to the exchange without routing key channel.queueBind(queueName, "mastering.rabbitmq","");
Before talking about the details of sending messages within our case study, we should look at how we send messages through RabbitMQ using RabbitMQ Java Client API. Although we know that we have many methods such as pub-sub, routed messaging, and so on to publish our message, we'd just like to show the simple message sending.
As we'd like to show simple message sending, we should declare queue and publish message to the declared queue as shown in the following code example:
importcom.rabbitmq.client.ConnectionFactory; importcom.rabbitmq.client.Connection; importcom.rabbitmq.client.Channel; publicclassSender { privatefinalstatic String QUEUE_NAME ="mastering.rabbitmq"; publicstaticvoidmain(String[] argv)throws Exception { ConnectionFactory factory =new ConnectionFactory(); factory.setHost("localhost"); //1 Connection connection = factory.newConnection(); //2 Channel channel = connection.createChannel(); //3 channel.queueDeclare(QUEUE_NAME,false,false,false,null);//4 String message ="Hello Mastering RabbitMQ!"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes());//5 System.out.println("Following Message Sent: "+ message); channel.close(); connection.close(); } }
If we look at the details of the code that has numbered comments, we notice that:
Connection
instance created through ConnectionFactory
instanceChannel
instance initialized through the Connection
instanceNow, we've got the basics of the sending message, we're ready to move on to how we receive a message that delivers from the connected queue.
Consuming messages from RabbitMQ is similar, but not identical to publishing. Firstly, we initialize the connection through the ConnectionFactory
instance and declare the queue that is related to our receiver and sender. Then, the difference from sender comes here, that is, the receiving message part. The receiving part could be implemented in a synchronous way or asynchronous way. In a synchronous way, we block the current thread to listen to message deliveries; however, in an asynchronous way, a thread can't be blocked, so whenever a message is delivered, the consumer method is called instantly in an event like manner.
The consumer can receive messages synchronously, and we will go over an example regarding this. As we look at the following code example, we block our thread to listen to message deliveries by using the while
loop. In a while
loop, we fetch the next incoming message using the QueueinConsumer
instance. Then we can convert the incoming message body to our custom object type:
importcom.rabbitmq.client.ConnectionFactory; importcom.rabbitmq.client.Connection; importcom.rabbitmq.client.Channel; importcom.rabbitmq.client.QueueingConsumer; publicclassReciever { privatefinalstaticStringQUEUE_NAME="mastering.rabbitmq"; publicstaticvoidmain(String[]argv)throwsException { ConnectionFactoryfactory=newConnectionFactory(); factory.setHost("localhost"); Connectionconnection=factory.newConnection(); Channelchannel=connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); QueueingConsumerconsumer=newQueueingConsumer(channel); channel.basicConsume(QUEUE_NAME,true,consumer); while(true) { QueueingConsumer.Deliverydelivery=consumer.nextDelivery(); Stringmsg=newString(delivery.getBody()); System.out.println("Received Message:"+msg); } } }
The main functional difference between synchronous receiving and asynchronous receiving is blocking. In asynchronous receiving, thread couldn't be blocked by the listening part, so you can do anything with the current thread.
Non-blocking and event driven software systems are very popular for their scalability. Therefore, it is good to use an asynchronous way in the receiving part. RabbitMQ Java API gives us a DefaultConsumer
method to control the deliveries. In the following code example, you can find the inner class that implements the DefaultConsumer
method called handleDelivery
:
importcom.rabbitmq.client.Connection; importcom.rabbitmq.client.Channel; importcom.rabbitmq.client.QueueingConsumer; publicclassReciever { privatefinalstaticStringQUEUE_NAME="mastering.rabbitmq"; publicstaticvoidmain(String[]argv)throwsException { ConnectionFactoryfactory=newConnectionFactory(); factory.setHost("localhost"); Connectionconnection=factory.newConnection(); Channelchannel=connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException { String msg = new String(body); System.out.println("Received Message: " + msg); } }); } }