Spring Cloud Stream provides an abstraction over the messaging infrastructure. The underlying messaging implementation can be RabbitMQ, Redis, or Kafka. Spring Cloud Stream provides a declarative approach for sending and receiving messages:
As shown in the preceding diagram, Cloud Stream works on the concept of a source and a sink. The source represents the sender perspective of the messaging, and sink represents the receiver perspective of the messaging.
In the example shown in the diagram, the sender defines a logical queue called Source.OUTPUT
to which the sender sends messages. The receiver defines a logical queue called Sink.INPUT
from which the receiver retrieves messages. The physical binding of OUTPUT
to INPUT
is managed through the configuration. In this case, both link to the same physical queue—MyQueue
on RabbitMQ. So, while at one end, Source.OUTPUT
points to MyQueue
, on the other end, Sink.INPUT
points to the same MyQueue
.
Spring Cloud offers the flexibility to use multiple messaging providers in one application such as connecting an input stream from Kafka to a Redis output stream, without managing the complexities. Spring Cloud Stream is the basis for message-based integration. The Cloud Stream Modules subproject is another Spring Cloud library that provides many endpoint implementations.
As the next step, rebuild the inter-microservice messaging communication with the Cloud Streams. As shown in the next diagram, we will define a SearchSink
connected to InventoryQ
under the Search microservice. Booking will define a BookingSource
for sending inventory change messages connected to InventoryQ
. Similarly, Check-in defines a CheckinSource
for sending the check-in messages. Booking defines a sink, BookingSink
, for receiving messages, both bound to the CheckinQ
queue on the RabbitMQ:
In this example, we will use RabbitMQ as the message broker:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
booking-service.properties
. These properties bind the logical queue inventoryQ
to physical inventoryQ
, and the logical checkinQ
to the physical checkinQ
:spring.cloud.stream.bindings.inventoryQ.destination=inventoryQ spring.cloud.stream.bindings.checkInQ.destination=checkInQ
search-service.properties
. This property binds the logical queue inventoryQ
to the physical inventoryQ
:spring.cloud.stream.bindings.inventoryQ.destination=inventoryQ
checkin-service.properties
. This property binds the logical queue checkinQ
to the physical checkinQ
:spring.cloud.stream.bindings.checkInQ.destination=checkInQ
Add @EnableBinding
to the Sender
class of the Booking service. This enables the Cloud Stream to work on autoconfigurations based on the message broker library available in the class path. In our case, it is RabbitMQ. The parameter BookingSource
defines the logical channels to be used for this configuration:
@EnableBinding(BookingSource.class) public class Sender {
BookingSource
defines a message channel called inventoryQ
, which is physically bound to RabbitMQ's inventoryQ
, as configured in the configuration. BookingSource
uses an annotation, @Output
, to indicate that this is of the output type—a message that is outgoing from a module. This information will be used for autoconfiguration of the message channel:interface BookingSource { public static String InventoryQ="inventoryQ"; @Output("inventoryQ") public MessageChannel inventoryQ(); }
Source
class that comes with Spring Cloud Stream if the service has only one source and sink:public interface Source { @Output("output") MessageChannel output(); }
BookingSource
. The following code will inject an output message channel with the name inventory
, which is already configured in BookingSource
:@Output (BookingSource.InventoryQ) @Autowired private MessageChannel;
send
message method in BookingSender
:public void send(Object message){ messageChannel. send(MessageBuilder.withPayload(message). build()); }
SearchReceiver
class the same way we did for the Booking service:@EnableBinding(SearchSink.class) public class Receiver {
SearchSink
interface will look like the following. This will define the logical sink queue it is connected with. The message channel in this case is defined as @Input
to indicate that this message channel is to accept messages:interface SearchSink { public static String INVENTORYQ="inventoryQ"; @Input("inventoryQ") public MessageChannel inventoryQ(); }
@ServiceActivator(inputChannel = SearchSink.INVENTORYQ) public void accept(Map<String,Object> fare){ searchComponent.updateInventory((String)fare. get("FLIGHT_NUMBER"),(String)fare. get("FLIGHT_DATE"),(int)fare. get("NEW_INVENTORY")); }
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest server.port=8090
http://localhost:8001
.