The fan-in pattern

There is a requirement to source messages from multiple sources and do some processing. It is not guaranteed which source will have a message ready at a given time. If we were to process all sources in a loop, the loop will block for a source that does not have a message. It is possible to check availability and add timeouts, but this causes extra complications in the code.

The ideal solution is to merge the messages into one fanIn channel, which can then be used for processing. The following code snippet demonstrates this pattern:

func main() { 
   c := fanIn(emitter("Source1"), emitter("Source2")) 
 
   for i := 0; i < 10; i++ { 
         fmt.Println(<-c) // Display the output of the FanIn channel. 
   } 
 
} 
 
// this combines the sources to a Fan-In channel 
func fanIn(input1, input2 <-chan string) <-chan string { 
   c := make(chan string) // The FanIn channel 
 
   // to avoid blocking, listen to the input channels in separate goroutines 
   go func() { 
         for { 
               c <- <-input1 // Write the message to the FanIn channel, Blocking Call. 
         } 
   }() 
 
   go func() { 
         for { 
               c <- <-input2 // Write the message to the FanIn channel, Blocking Call. 
         } 
   }() 
 
   return c 
} 
 
// dummy function for a source 
func emitter(name string) <-chan string { 
   c := make(chan string) 
 
   go func() { 
         for i := 0; ; i++ { 
               c <- fmt.Sprintf("[%s] says %d", name, i) 
               time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) // Sleep for some time 
         } 
   }() 
 
   return c 
} 

We could have also used the select keyword for doing the fanIn and, in fact, this is more idiomatic Go. The fanIn with select looks like this:

func fanInSelect(input1, input2 <-chan string) <-chan string { 
   out := make(chan string) 
   go func() { 
         for { 
               select { 
               case in := <-input1: 
                     out <- in 
               case in := <-input2: 
                     out <- in 
               } 
         } 
   }() 
   return out 
} 

The fanIn pattern can be combined with the request-reply pattern to enable sequencing between the input of the fanIn channel. After sending a message, each source would block on its own Boolean channel, which is passed as part of the message to the fanIn channel. The Source is then effectively stalled until the fanIn processor unblocks it by signaling on the channel.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset