RabbitMQ connector

RabbitMQ is a widely used, distributed, high-performance message queuing system, often chosen as the message delivery layer for high-throughput operations. It lets you create distributed message queues to which publishers send messages and from which subscribers receive them. For more information about RabbitMQ, visit https://www.rabbitmq.com/.

Flink supports both fetching data from and publishing data to RabbitMQ. It provides a connector that can act as a data source for data streams.

For the RabbitMQ connector to work, you must provide the following information:

  • RabbitMQ: Configurations such as host, port, user credentials, and so on.
  • Queue: The RabbitMQ queue name that you wish to subscribe to.
  • Correlation IDs: This is a RabbitMQ feature used for correlating the request and response by a unique ID in a distributed world. The Flink RabbitMQ connector provides an interface to set this to true or false depending on whether you are using it or not.
  • Deserialization schema: RabbitMQ stores and transports data in serialized form to reduce network traffic, so the subscriber must know how to deserialize each message it receives. The Flink connector provides some default deserializers, such as the string deserializer.
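To make the role of the deserialization schema concrete, here is a minimal plain-Java sketch of turning a received message body back into a string. It does not use Flink; the names BytesDeserializer and Utf8StringDeserializer are hypothetical, and UTF-8 is an assumed encoding chosen for the illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a deserialization schema; this is not a Flink interface.
interface BytesDeserializer<T> {
    T deserialize(byte[] message);
}

// Decodes a message body as UTF-8 text (assumed encoding), roughly the job a
// string deserializer performs for each message pulled from the queue.
final class Utf8StringDeserializer implements BytesDeserializer<String> {
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}
```

A subscriber would call deserialize on every message body before handing the result to the rest of the pipeline. For any other payload format, the same interface would wrap the matching decoder.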

The RabbitMQ source provides us with the following options on stream deliveries:

  • Exactly once: Using RabbitMQ correlation IDs and the Flink checkpointing mechanism with RabbitMQ transactions
  • At least once: When Flink checkpointing is enabled but RabbitMQ correlation IDs are not set

There are no strong delivery guarantees with the RabbitMQ auto-commit mode.
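The difference between the two guarantees can be pictured with a small plain-Java sketch. When messages are replayed after a failure, a unique correlation ID lets the consumer recognize and drop the repeats; without such an ID, the same message may be processed again. This only illustrates the idea and is not the connector's implementation; the class CorrelationIdDeduplicator and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of deduplication by correlation ID.
final class CorrelationIdDeduplicator {
    private final Set<String> seenIds = new HashSet<>();
    private final List<String> delivered = new ArrayList<>();

    // Returns true if the message is new and was delivered,
    // false if its correlation ID was seen before (a replayed duplicate).
    boolean accept(String correlationId, String payload) {
        if (!seenIds.add(correlationId)) {
            return false;
        }
        delivered.add(payload);
        return true;
    }

    // Payloads delivered so far, each at most once per correlation ID.
    List<String> delivered() {
        return delivered;
    }
}
```

If the message with ID "id-1" arrives a second time after a restart, accept returns false and the payload is not delivered again, which is the effect the exactly-once option aims for.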

Now let's write some code to get this connector working. As with other connectors, first add the Maven dependency to the project:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.11</artifactId>
<version>1.1.4</version>
</dependency>

The following snippet shows how to use the RabbitMQ connector in Java:

// Connection configuration
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost(<host>).setPort(<port>).setUserName(..)
    .setPassword(..).setVirtualHost("/").build();

// Get a data stream without correlation IDs
DataStream<String> streamWO = env.addSource(
    new RMQSource<String>(connectionConfig, "my-queue", new SimpleStringSchema()));
streamWO.print();

// Get a data stream with correlation IDs
DataStream<String> streamW = env.addSource(
    new RMQSource<String>(connectionConfig, "my-queue", true, new SimpleStringSchema()));
streamW.print();

Similarly, in Scala the code can be written as follows:

val connectionConfig = new RMQConnectionConfig.Builder()
  .setHost(<host>).setPort(<port>).setUserName(..)
  .setPassword(..).setVirtualHost("/").build()

// Data stream without correlation IDs
val streamsWOIds = env.addSource(
  new RMQSource[String](connectionConfig, "my-queue", new SimpleStringSchema))
streamsWOIds.print()

// Data stream with correlation IDs
val streamsWIds = env.addSource(
  new RMQSource[String](connectionConfig, "my-queue", true, new SimpleStringSchema))
streamsWIds.print()

You may also use the RabbitMQ connector as a Flink sink.

To send processed data back to a different RabbitMQ queue, provide three important configurations:

  • RabbitMQ configurations
  • Queue name – where to send back the processed data
  • Serialization schema – schema for RabbitMQ to convert the data into bytes
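The three pieces above fit together roughly as in the following plain-Java sketch: a sink holds a target queue name and a serializer, and converts each element to bytes before publishing it. Nothing here touches Flink or RabbitMQ; BytesSerializer and InMemoryQueueSink are hypothetical names, and an in-memory list stands in for the real queue.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a serialization schema; this is not a Flink interface.
interface BytesSerializer<T> {
    byte[] serialize(T element);
}

// Hypothetical sink that records serialized messages in memory instead of
// publishing them to a real RabbitMQ queue.
final class InMemoryQueueSink<T> {
    private final String queueName;
    private final BytesSerializer<T> serializer;
    private final List<byte[]> published = new ArrayList<>();

    InMemoryQueueSink(String queueName, BytesSerializer<T> serializer) {
        this.queueName = queueName;
        this.serializer = serializer;
    }

    // Serializes the element and appends the bytes to the simulated queue.
    void invoke(T element) {
        published.add(serializer.serialize(element));
    }

    String queueName() {
        return queueName;
    }

    List<byte[]> published() {
        return published;
    }
}

final class SinkSketchDemo {
    public static void main(String[] args) {
        InMemoryQueueSink<String> sink = new InMemoryQueueSink<>(
            "target-queue", s -> s.getBytes(StandardCharsets.UTF_8));
        sink.invoke("processed-record");
        System.out.println(sink.queueName() + " holds " + sink.published().size() + " message(s)");
    }
}
```

Keeping the serializer separate from the sink means the same sink logic can publish any element type, as long as a matching serializer is supplied.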

The following is sample code in Java to show how to use this connector as a Flink sink:

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost(<host>).setPort(<port>).setUserName(..)
.setPassword(..).setVirtualHost("/").build();
stream.addSink(new RMQSink<String>(connectionConfig, "target-queue", new StringToByteSerializer()));

The same can be done in Scala:

val connectionConfig = new RMQConnectionConfig.Builder()
.setHost(<host>).setPort(<port>).setUserName(..)
.setPassword(..).setVirtualHost("/").build()
stream.addSink(new RMQSink[String](connectionConfig, "target-queue", new StringToByteSerializer))