The pipes and filters pattern

Often, a single event must trigger a workflow (a sequence of processing steps) as part of the required response to that event. For example, on a ticket payment, we might have to validate the payment, confirm the booking, and email an invoice. Each individual action is independent and sometimes forms part of multiple workflows. We need a way to stitch these processors together to enable different workflows.

The pipes and filters architectural pattern addresses this by dividing a workflow into a sequence of smaller, independent processors, called filters. Filters are connected by messaging channels, called pipes.

Each filter has a very simple interface: it reads input messages from one inbound pipe and writes its output to one outbound pipe. It encapsulates the processing of the message internally, and can take additional data as context during initialization. To compose a workflow, the outbound pipe of one filter is connected to the inbound pipe of the next.

An example of a workflow that computes the values of y = x² + c is given in the following code.

The emitter filter is the start of the chain, and generates numbers from 0 up to, but not including, a given value. These numbers flow to an xSquare filter, which squares each value and writes it to another pipe. That pipe is the input to the addC filter, which performs the last part of the equation:

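package main

import "fmt"

// emitter is the source filter: it writes 0, 1, ..., till-1 to its
// output pipe and then closes the pipe.
func emitter(till int) <-chan int {
	out := make(chan int)
	go func() {
		for i := 0; i < till; i++ {
			out <- i
		}
		close(out)
	}()
	return out
}

// xSquare squares every value read from in.
func xSquare(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for x := range in {
			out <- x * x
		}
		close(out) // close forward so the next filter's range loop ends
	}()
	return out
}

// addC adds the constant c to every value read from in.
func addC(in <-chan int, c int) <-chan int {
	out := make(chan int)
	go func() {
		for x := range in {
			out <- x + c
		}
		close(out) // close forward so the next filter's range loop ends
	}()
	return out
}

func main() {
	// y = x*x + c
	out := addC(
		xSquare(emitter(3)),
		5)

	for y := range out {
		fmt.Println(y)
	}
}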

Once this is coded, it is easy to extend the workflow to compute, say, y = x⁴ + c, as the following code shows:

	// y = x*x*x*x + c
	out1 := addC(
		xSquare(xSquare(emitter(3))),
		5)

	for y := range out1 {
		fmt.Println(y)
	}

This will print the following:

5
6
21 