Sarama provides a low-level message-consumption API, but higher-level libraries add group coordination based on the load balancing described in the Apache Kafka deep-dive section. One such library is Sarama Cluster (https://github.com/bsm/sarama-cluster).
The main interface is a channel on which messages are received.
A sample program is shown here:
    package main

    import (
        "fmt"
        "log"
        "os"

        cluster "github.com/bsm/sarama-cluster"
    )

    func main() {
        // set up config, enable errors and notifications
        config := cluster.NewConfig()
        config.Consumer.Return.Errors = true
        config.Group.Return.Notifications = true

        // specify Broker coordinates and topics of interest
        brokers := []string{"localhost:9092"}
        topics := []string{"topic_a", "topic_b"}

        // connect, and register specifying the consumer group name
        consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
        if err != nil {
            panic(err)
        }
        defer consumer.Close()

        // process errors
        go func() {
            for err := range consumer.Errors() {
                log.Printf("Error: %s\n", err.Error())
            }
        }()

        // process notifications
        go func() {
            for ntf := range consumer.Notifications() {
                log.Printf("Rebalanced: %+v\n", ntf)
            }
        }()

        // process messages
        for msg := range consumer.Messages() {
            fmt.Fprintf(os.Stdout, "%s-%d-%d-%s-%s\n",
                msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) // <- Actually process message here
            consumer.MarkOffset(msg, "") // Mark the offset of this message for commit
        }
    }
Most of the code before the for loop is setup code. The main consumption happens in the iteration over the channel returned by consumer.Messages(). Within this loop, each new message is delivered as a msg object. After processing, the client calls consumer.MarkOffset() to mark the message as processed so that its offset is committed to Kafka, declaring that the consumer (group) has processed the message and does not want to see it again. Typically, a handler function processes the message, and the offset is marked only if the handler succeeds. For long-lived processing, we can ship the msg object itself to a handler function, and the message may be acknowledged later downstream. The second parameter to consumer.MarkOffset() is a metadata string that describes the consumer state at the time of commit. This can be used as a checkpoint to restore consumption from a specific point.
Sarama has tunables that let back-pressure be applied, so that consumers process data at a rate they are comfortable with. Some of them are as follows:
- Config.Consumer.MaxWaitTime: This defines the maximum time the Broker waits for the minimum fetch size (Consumer.Fetch.Min bytes) to become available before it responds to the consumer with whatever it has.
- Config.ChannelBufferSize: This defines the size of the buffered channels (input and output).
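- Config.Consumer.MaxProcessingTime: This defines the maximum time the consumer expects a single message to take to process. If the client takes longer than this to accept a message from the channel, fetching for that partition pauses until it can proceed again.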