The Sync producer

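Sarama also has a blocking producer interface, SyncProducer, which waits until the message is reliably delivered. Its usage is similar to AsyncProducer, but instead of reading from and writing to channels, you call a blocking SendMessage() method that returns the partition and offset of the delivered message, or an error.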

The following code snippet illustrates the usage:

package main

import (
    "fmt"
    "strconv"
    "time"

    "github.com/Shopify/sarama" // newer releases are published as github.com/IBM/sarama
)

func main() {
    // Config
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max = 5
    config.Producer.Return.Errors = true    // For sync producer this needs to be true
    config.Producer.Return.Successes = true // For sync producer this needs to be true

    // Connect to a Kafka broker running locally
    brokers := []string{"localhost:9092"}
    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        panic(err)
    }

    // cleanup
    defer func() {
        if err := producer.Close(); err != nil {
            panic(err)
        }
    }()

    // The message value is the current Unix time, encoded as a string
    msg := &sarama.ProducerMessage{
        Topic: "currentTime",
        Value: sarama.StringEncoder(strconv.Itoa(int(time.Now().Unix()))),
    }

    // SendMessage blocks until the broker acknowledges the message or delivery fails
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        fmt.Printf("FAILED to publish message: %s\n", err)
    } else {
        fmt.Printf("message sent | partition(%d)/offset(%d)\n", partition, offset)
    }
}
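
To make the contrast with the channel-based AsyncProducer more concrete, the following is a minimal, dependency-free sketch of how a blocking SendMessage() call can be layered on top of a channel-based producer. It only illustrates the idea and should not be read as Sarama's actual implementation: message, result, asyncProducer, and syncProducer are hypothetical stand-ins invented for this example, and the "delivery" is simulated by a goroutine that assigns increasing offsets on partition 0.

```go
package main

import (
	"errors"
	"fmt"
)

// result and message are hypothetical stand-ins, not Sarama types.
type result struct {
	partition int32
	offset    int64
	err       error
}

type message struct {
	topic, value string
	done         chan result // per-message reply channel
}

// asyncProducer mimics a channel-based producer: callers push messages
// onto input and a background goroutine "delivers" them.
type asyncProducer struct {
	input chan *message
	next  int64 // next offset to assign; touched only by run()
}

func newAsyncProducer() *asyncProducer {
	p := &asyncProducer{input: make(chan *message)}
	go p.run()
	return p
}

// run simulates delivery: an empty topic fails, anything else is
// assigned the next offset on partition 0.
func (p *asyncProducer) run() {
	for m := range p.input {
		if m.topic == "" {
			m.done <- result{err: errors.New("empty topic")}
			continue
		}
		m.done <- result{partition: 0, offset: p.next}
		p.next++
	}
}

// syncProducer hides the channels behind a blocking call.
type syncProducer struct{ async *asyncProducer }

// SendMessage blocks until the background goroutine reports the outcome.
func (s *syncProducer) SendMessage(topic, value string) (int32, int64, error) {
	m := &message{topic: topic, value: value, done: make(chan result, 1)}
	s.async.input <- m
	r := <-m.done
	return r.partition, r.offset, r.err
}

// Close stops the background goroutine.
func (s *syncProducer) Close() { close(s.async.input) }

func main() {
	p := &syncProducer{async: newAsyncProducer()}
	defer p.Close()
	for _, v := range []string{"a", "b"} {
		partition, offset, err := p.SendMessage("currentTime", v)
		fmt.Println(partition, offset, err)
	}
}
```

The caller never sees a channel: each call to SendMessage() returns only after the outcome of that particular message is known, which is the same calling pattern the Sarama snippet above relies on. The trade-off is throughput, since a caller that sends one message at a time waits for every acknowledgement before sending the next.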
