Consuming messages

Sarama provides a low-level message-consumption API, but there are higher-level libraries that build consumer group coordination and load balancing (as described in the Apache Kafka deep-dive section) on top of it. One such library is sarama-cluster (https://github.com/bsm/sarama-cluster).

This library requires Kafka v0.9+ and follows the protocol described at https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design.

The main interface is a channel on which messages are received.

A sample program is shown here:

package main

import (
   "fmt"
   "log"
   "os"

   cluster "github.com/bsm/sarama-cluster"
)

func main() {

   // setup config, enable errors and notifications
   config := cluster.NewConfig()
   config.Consumer.Return.Errors = true
   config.Group.Return.Notifications = true

   // specify Broker co-ordinates and topics of interest
   brokers := []string{"localhost:9092"}
   topics := []string{"topic_a", "topic_b"}

   // connect, and register specifying the consumer group name
   consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
   if err != nil {
      panic(err)
   }
   defer consumer.Close()

   // process errors
   go func() {
      for err := range consumer.Errors() {
         log.Printf("Error: %s\n", err.Error())
      }
   }()

   // process notifications
   go func() {
      for ntf := range consumer.Notifications() {
         log.Printf("Rebalanced: %+v\n", ntf)
      }
   }()

   // process messages
   for msg := range consumer.Messages() {
      fmt.Fprintf(os.Stdout, "%s-%d-%d-%s-%s\n",
         msg.Topic,
         msg.Partition,
         msg.Offset,
         msg.Key,
         msg.Value) // <- Actually process message here

      consumer.MarkOffset(msg, "") // Commit offset for this message
   }
}

Most of the code before the for loop is setup code. The main consumption happens in the iteration over the channel returned by consumer.Messages(). Within this loop, whenever a new message is available, it is delivered encapsulated in the msg object. After processing, the client can use the consumer.MarkOffset() call to commit the offset to Kafka, thereby declaring that the consumer (group) has processed the message and does not want to see it again. Typically, a handler function is used to process the message, and the offset is committed only if the handler succeeds. For long-lived processing, we can ship the msg object itself to a handler function, and the message may be acknowledged later downstream. The second parameter to consumer.MarkOffset() is a metadata string describing consumer state at the time of commit; it can be used as a checkpoint to restore consumption from a specific point.
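A minimal sketch of the commit-on-success pattern is shown here. It reuses the setup and imports from the listing above (the sarama.ConsumerMessage type comes from github.com/Shopify/sarama, which would also need to be imported), and processMessage is a hypothetical application handler, not part of sarama-cluster:

// processMessage is a hypothetical handler; replace with real business logic.
func processMessage(msg *sarama.ConsumerMessage) error {
   // ... use msg.Key and msg.Value here ...
   return nil
}

// consumeWithHandler drains the Messages() channel and commits an offset
// only after the handler has returned successfully.
func consumeWithHandler(consumer *cluster.Consumer) {
   for msg := range consumer.Messages() {
      if err := processMessage(msg); err != nil {
         // Offset not marked: the message will be seen again after a
         // rebalance or a restart of this consumer group member.
         log.Printf("processing failed: %v", err)
         continue
      }
      consumer.MarkOffset(msg, "") // commit only on success
   }
}

If the handler fails, the offset is simply not marked, so the message remains eligible for redelivery rather than being silently lost.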

Sarama has tunables to ensure that back-pressure can be applied, so that consumers can process data at the rate they are comfortable with. Some of them are as follows (a configuration sketch follows the list):

  • Config.Consumer.MaxWaitTime: This defines the maximum time the broker will wait for new data (at least Fetch.Min bytes) to become available before answering a fetch request.
  • Config.ChannelBufferSize: This defines the number of events to buffer in Sarama's internal and external channels.
  • Config.Consumer.MaxProcessingTime: This defines the maximum time the consumer is expected to take to process a single message; if it is exceeded, the affected partition stops fetching more messages until consumption catches up.
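
The following sketch shows how these knobs could be set on the cluster config from the sample above (it assumes the time package is imported); the values are illustrative, not recommendations:

   config := cluster.NewConfig()
   // Broker-side wait: answer a fetch request after at most 250ms,
   // even if Fetch.Min bytes are not yet available.
   config.Consumer.MaxWaitTime = 250 * time.Millisecond
   // Buffer up to 256 events in Sarama's internal and external channels.
   config.ChannelBufferSize = 256
   // If a single message takes longer than this to be drained from the
   // Messages() channel, its partition pauses fetching until it catches up.
   config.Consumer.MaxProcessingTime = 200 * time.Millisecond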