Let's see the following service class that is responsible for writing to Kafka in our application:
@Servicepublic class NotificationService {
private final NotificationStreams notificationStreams;
public NotificationService(NotificationStreams notificationStreams) {
super();
this.notificationStreams = notificationStreams;
}
public void sendNotification(final Notification notification) {
MessageChannel messageChannel = notificationStreams.notifyTo();
messageChannel.send(MessageBuilder.withPayload(notification)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build());
}
}
In the preceding service class, the sentNotification() method uses an injected NotificationStreams object to send message represented by the Notification object in our application. Let's see the following Controller class that will trigger sending the message to Kafka.