Chapter 10. Parallel Programming

Recently, parallel programming has moved from being a relatively obscure topic, practiced only by specialist developers, to a more mainstream endeavor. This is due to the increasing prevalence of multicore processors. At the time of writing, it is almost impossible to buy a PC with a single-core processor; the standard is dual core, and quad-core processors are beginning to arrive in the shops. It is fully expected that this trend will continue in the years to come.

To a certain extent, this interest in parallel programming has driven the renewed interest in functional programming. Functional programming is certainly not a silver bullet for all parallel programming problems, but it can help you design your software so it executes in parallel. In this chapter, you will learn about some of the simpler techniques to help your software execute in parallel, as well as how to take advantage of several processors.

It's often helpful to break down parallel programming into several smaller subtopics, all of which you'll learn about in this chapter:

Threads, Memory, Locking, and Blocking:

You'll learn about basic techniques for creating and controlling threads in .NET programming. You'll also take a quick look at how you can share resources (such as memory) between threads, as well as how you can control access to these shared resources.

Reactive programming:

It's often important to the user experience that programs remain reactive to input. To do this, it's important that you avoid doing too much processing on the thread responsible for reacting to user input. This is particularly relevant to GUI programming, but it can also apply to a server that needs to stay responsive to incoming requests.

Data parallelism:

This term refers to executing one piece of code concurrently on several processors with varying input data. This is a good way to parallelize the processing of large data structures such as collections. It's often possible to apply a transformation to several items in a collection in parallel, which will generally speed up the overall execution. The classic example of this is the parallel map, which provides one of the simplest ways to parallelize a functional program.

Asynchronous programming:

Some tasks, particularly I/O, need to happen asynchronously to make program execution efficient. It is important that threads are not blocked for long periods while I/O takes place.

Message passing:

This technique is more formally referred to as the actor model. You use it to coordinate tasks that execute in parallel. This is the most advanced parallel-programming topic covered in this chapter.

Parallel programming is a large topic, so this chapter won't be exhaustive, but it will provide some straightforward ways to help you get started with parallel programming in F#.

Threads, Memory, Locking, and Blocking

If you are serious about parallel programming, it's worth investing time to understand threads and memory. In this section, you'll take a look at explicitly creating threads and how to control their access to shared resources, such as memory. My advice is to avoid explicitly creating and managing threads like this; however, when using the other parallel programming techniques, it's useful to understand the underlying threading concepts.

When a program is executed, the operating system creates a process to execute it. The process represents the resources that are allocated to the program, most notably the memory allocated to it. A process has one or more threads, which are responsible for executing the program's instructions and which share the process's memory. In .NET, a program starts with one thread to execute the program's code. To create an extra thread in F#, you use the System.Threading.Thread class. The Thread class's constructor takes a delegate that represents the function the thread will start executing. Once a Thread has been constructed, it does not start executing automatically; you must call its Start method. The following example demonstrates how to create and start a new thread:

open System.Threading

let main() =
    // create a new thread passing it a lambda function
    let thread = new Thread(fun () ->
        // print a message on the newly created thread
        printfn "Created thread: %i" Thread.CurrentThread.ManagedThreadId)
    // start the new thread
    thread.Start()
    // print a message on the original thread
    printfn "Original thread: %i" Thread.CurrentThread.ManagedThreadId
    // wait for the created thread to exit
    thread.Join()

do main()

Compiling and executing the preceding program will output results similar to this:

Original thread: 1
Created thread: 3

You should look at a couple of important things in this example. First, notice that the original thread prints its message before the second thread does. This is because calling a thread's Start method does not immediately start the thread; rather, it schedules the new thread for execution, and the operating system chooses when to start it. Normally, the delay will be short, but because the original thread continues to execute, it's probable that the original thread will execute a few instructions before the new thread starts executing. Second, notice how you use the thread's Join method to wait for it to exit. If you did not do this, it is highly probable that the original thread would finish executing before the second thread had a chance to start. While the original thread waits for the created thread to do its work, you say that it is blocked. Threads can become blocked for a number of reasons; for example, they might be waiting on a lock, or waiting for I/O to complete. When a thread becomes blocked, the operating system switches to the next runnable thread; this is called a context switch. You'll learn about locking in the next section, and later in the chapter you'll look at how asynchronous programming avoids blocking threads on I/O operations.

Any resource that can be updated by two different threads at the same time is at risk of being corrupted. This is because a thread can be context switched at any time, leaving operations that should have been atomic half done. To avoid corruption, you need to use locks. A lock, sometimes referred to as a monitor, guards a section of code so that only one thread can pass through it at a time. The idea is that, as soon as the lock is taken, any other thread attempting to enter the section of code will be blocked until the lock is released by the thread that holds it. Code protected in this way is sometimes called a critical section. You achieve this by calling System.Threading.Monitor.Enter at the start of the code that you want to protect and System.Threading.Monitor.Exit at the end of that code. You must guarantee that Monitor.Exit is called, or threads could end up blocked forever. In F#, the lock function is a nice way to ensure that Monitor.Exit is always called if Monitor.Enter has been called. This function takes two parameters: the first is the object you want to lock on, while the second is a function that contains the code you want to protect. This function should take unit as its parameter, and it can return any value.
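To make the relationship between lock and the Monitor class concrete, here is a rough sketch of how a lock function could be implemented. The name myLock is hypothetical; the real lock function lives in the F# library, but it follows the same Enter/try/finally/Exit pattern:

```fsharp
open System.Threading

// a sketch of a lock function: take the monitor, run the protected
// function, and guarantee the monitor is released in a finally block,
// even if the protected function raises an exception
let myLock (lockObj: obj) (f: unit -> 'a) =
    Monitor.Enter(lockObj)
    try
        f()
    finally
        Monitor.Exit(lockObj)

// usage: protect an update to shared state
let sync = new obj()
let mutable counter = 0
myLock sync (fun () -> counter <- counter + 1)
```

The try/finally is the important part: it is what guarantees Monitor.Exit runs even when the protected code throws.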

The following example demonstrates the subtle issues involved in locking. The code to accomplish the lock needs to be quite long, and this example has been deliberately written to exaggerate the problem of context switching. The idea behind this code is this: two threads run at the same time, and both try to write to the console. The aim of the sample is to write the string "One ... Two ... Three ..." to the console atomically; that is, one thread should be able to finish writing its message before the next one starts. The example has a function, called makeUnsafeThread, that creates a thread that won't write to the console atomically, and a second function, makeSafeThread, that creates a thread that writes to the console atomically by using a lock:

open System
open System.Threading

// function to print to the console character by character
// this increases the chance of there being a context switch
// between threads.
let printSlowly (s : string) =
    s.ToCharArray()
    |> Array.iter (printf "%c")
    printfn ""

// create a thread that prints to the console in an unsafe way
let makeUnsafeThread() =
    new Thread(fun () ->
        for x in 1 .. 100 do
            printSlowly "One ... Two ... Three ... ")

// the object that will be used as a lock
let lockObj = new Object()
// create a thread that prints to the console in a safe way
let makeSafeThread() =
    new Thread(fun () ->
        for x in 1 .. 100 do
            // use lock to ensure operation is atomic
            lock lockObj (fun () ->
                printSlowly "One ... Two ... Three ... "))

// helper function to run the test
let runTest (f: unit -> Thread) message =
    printfn "%s" message
    let t1 = f()
    let t2 = f()
    t1.Start()
    t2.Start()
    t1.Join()
    t2.Join()

// runs the demonstrations
let main() =
    runTest
        makeUnsafeThread
        "Running test without locking ..."
    runTest
        makeSafeThread
        "Running test with locking ..."

do main()

The part of the example that uses the lock is repeated next to highlight the important points. First, note the declaration of lockObj, the object on which the critical section locks. Second, note how the use of the lock function is embedded in the makeSafeThread function. Most important of all, notice how the printing that must be atomic takes place inside the function passed to lock:

// the object that will be used as a lock
let lockObj = new Object()

// create a thread that prints to the console in a safe way
let makeSafeThread() =
    new Thread(fun () ->
        for x in 1 .. 100 do
            // use lock to ensure operation is atomic
            lock lockObj (fun () ->
                printSlowly "One ... Two ... Three ... "))

The results of the first part of the test will vary each time it runs because they depend on when a thread context switch takes place. They might also vary based on the number of processors: on a machine with two or more processors, multiple threads can run at the same time, so the garbled messages will be more tightly packed together. On a single-processor machine, the output will be less tightly packed, because printing a message goes wrong only when a context switch takes place mid-message. The results of the first part of the sample, run on a dual-processor machine, look like this:

Running test without locking ...
...
One ... Two ... Three ...
One One ... Two ... Three ...
One ... Two ... Three ...
...

The lock means that the results of the second half of the example will not vary at all, so they will always look like this:

Running test with locking ...
One ... Two ... Three ...
One ... Two ... Three ...
One ... Two ... Three ...
...

Locking is an important aspect of concurrency. You should lock any resource that you write to and share between threads. A resource is often a variable, but it can also be a file or even the console, as shown in this example. Although locks provide a solution to concurrency, they can also create problems of their own because they can cause a deadlock. A deadlock occurs when two or more threads each lock a resource that the other thread needs, so that neither can advance. The simplest solution to concurrency is often to avoid sharing a resource that different threads can write to. In the rest of this chapter, you'll look at solutions for creating parallel programs that do not rely on explicitly creating locks.
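The standard way to avoid deadlock when a thread must hold more than one lock is to always acquire the locks in the same order. The following sketch (makeWorker is a name introduced for this example) has both threads take lockA before lockB, so they cannot deadlock; if one thread took lockB first while the other held lockA, each could end up waiting forever for the lock the other holds:

```fsharp
open System.Threading

// two shared resources, each protected by its own lock
let lockA = new obj()
let lockB = new obj()
let counter = ref 0

// both threads acquire the locks in the SAME order: A, then B.
// acquiring them in opposite orders in two threads is the classic
// recipe for deadlock
let makeWorker() =
    new Thread(fun () ->
        lock lockA (fun () ->
            lock lockB (fun () ->
                counter.Value <- counter.Value + 1)))

let t1 = makeWorker()
let t2 = makeWorker()
t1.Start()
t2.Start()
t1.Join()
t2.Join()
printfn "Both workers finished; counter = %i" counter.Value
```

Because the lock order is consistent, both threads always complete and the shared counter ends up at 2.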

Note

This book provides an extremely brief introduction to threading. You will need to learn much more about threading if you want to become good at parallel programming. A good place to start is the MSDN section on managed threads: http://msdn.microsoft.com/en-us/library/hyz69czz.aspx. You might also find this tutorial useful: http://www.albahari.com/threading/.

Reactive Programming

Reactive programming refers to the practice of ensuring your programs react to events or input. In this section, you'll concentrate on reactive programming in terms of GUI programming; GUIs should always be reactive. However, other styles of programming also need to take reactive programming into account. For example, programs running on servers often need to stay reactive to input, even as they process other, longer-running tasks. You must also apply some of the techniques discussed here to server programming, as you'll see when you implement a chat server in Chapter 11.

Most GUI libraries use an event loop to handle drawing the GUI and the interactions with the user. This means that one thread takes care of drawing the GUI and raising all the events on it; you refer to this thread as the GUI thread. An important consideration is that you should update GUI objects only from the GUI thread; this avoids situations where other threads can corrupt the state of GUI objects. It also means that computations or I/O operations that take a significant amount of time should not take place on the GUI thread. If the GUI thread is busy with a long-running computation, it cannot process interactions from the user, nor can it draw the GUI. This is the number one cause of unresponsive GUIs.

You can see this in action in the following example that creates a GUI that could easily become unreactive because it tries to do too much computation on the GUI thread. This example also illustrates how to ensure your GUI remains reactive by making a few simple changes. You will look primarily at a useful abstraction called the BackgroundWorker class, which you find in the System.ComponentModel namespace. This useful class allows you to run some work, raising a notification event when this work is complete. This is especially useful for GUI programming because the completed notification is raised on the GUI thread. This helps you enforce the rule that GUI objects should only be altered from the thread that created them.

Specifically, the example creates a GUI for calculating the Fibonacci numbers using the simple Fibonacci calculation algorithm you saw in Chapter 7:

module Strangelights.Extensions
let fibs =
    (1I,1I) |> Seq.unfold
        (fun (n0, n1) ->
            Some(n0, (n1, n0 + n1)))

let fib n = Seq.nth n fibs
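As a quick sanity check of the sequence, you can evaluate a few elements; this snippet repeats the definition so it stands alone and uses Seq.item, the newer name for the Seq.nth used in the text:

```fsharp
// the Fibonacci sequence from the text
let fibs =
    (1I, 1I) |> Seq.unfold
        (fun (n0, n1) ->
            Some(n0, (n1, n0 + n1)))

// Seq.item is the newer name for Seq.nth
let fib n = Seq.item n fibs

printfn "%A" (fib 10)  // 89
```

The sequence begins 1, 1, 2, 3, 5, 8, ..., so the element at index 10 is 89.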

Creating a simple GUI for this calculation is straightforward; you can do this using the WinForms GUI toolkit you saw in Chapter 8:

open Strangelights.Extensions
open System
open System.Windows.Forms

let form =
    let form = new Form()
    // input text box
    let input = new TextBox()
    // button to launch processing
    let button = new Button(Left = input.Right + 10, Text = "Go")
    // label to display the result
    let output = new Label(Top = input.Bottom + 10, Width = form.Width,
                           Height = form.Height - input.Bottom + 10,
                           Anchor = (AnchorStyles.Top ||| AnchorStyles.Left |||
                                     AnchorStyles.Right ||| AnchorStyles.Bottom))
    // do all the work when the button is clicked
    button.Click.Add(fun _ ->
        output.Text <- Printf.sprintf "%A" (fib (Int32.Parse(input.Text))))
    // add the controls
    let dc c = c :> Control
    form.Controls.AddRange([| dc input; dc button; dc output |])
    // return the form
    form

// show the form
do Application.Run(form)

Executing this example creates the GUI you see in Figure 10-1.


Figure 10.1. A GUI for the Fibonacci numbers

This GUI lets you display the results of your calculation in a reasonable way; unfortunately, the GUI becomes unreactive as soon as the calculation starts to take a long time. The following code is responsible for the unresponsiveness:

// do all the work when the button is clicked
button.Click.Add(fun _ ->
    output.Text <- Printf.sprintf "%A" (fib (Int32.Parse(input.Text))))

This code means that you do all the calculation on the same thread that raised the click event: the GUI thread. The GUI thread is therefore responsible for performing the calculation, and it cannot process other events while the calculation runs.

It's fairly easy to fix this using the background worker:

open Strangelights.Extensions
open System
open System.ComponentModel
open System.Windows.Forms

let form =
    let form = new Form()
    // input text box
    let input = new TextBox()
    // button to launch processing
    let button = new Button(Left = input.Right + 10, Text = "Go")
    // label to display the result
    let output = new Label(Top = input.Bottom + 10, Width = form.Width,
                           Height = form.Height - input.Bottom + 10,
                           Anchor = (AnchorStyles.Top ||| AnchorStyles.Left |||
                                     AnchorStyles.Right ||| AnchorStyles.Bottom))

    // create and run a new background worker
    let runWorker() =
        let background = new BackgroundWorker()
        // parse the input to an int
        let input = Int32.Parse(input.Text)
        // add the "work" event handler
        background.DoWork.Add(fun ea ->
            ea.Result <- fib input)
        // add the work completed event handler
        background.RunWorkerCompleted.Add(fun ea ->
            output.Text <- Printf.sprintf "%A" ea.Result)
        // start the worker off
        background.RunWorkerAsync()

    // hook up creating and running the worker to the button
    button.Click.Add(fun _ -> runWorker())
    // add the controls
    let dc c = c :> Control
    form.Controls.AddRange([|dc input; dc button; dc output |])
    // return the form
    form

// show the form
do Application.Run(form)

Using the background worker imposes few changes on the code. You do need to split the work between the DoWork and the RunWorkerCompleted events, and this means you need to write slightly more code, but this will never require more than a few extra lines. Let's step through the required code changes; begin by creating a new instance of the BackgroundWorker class:

let background = new BackgroundWorker()

You place the code that needs to happen in the background, on a different thread, in the DoWork event. You also need to be careful to extract any data you need from controls outside of the DoWork event. Because this code runs on a different thread, letting it interact with the GUI objects would break the rule that they should be manipulated only by the GUI thread. You can see the code you use to read the integer and wire up the DoWork event here:

// parse the input to an int
let input = Int32.Parse(input.Text)
// add the "work" event handler
background.DoWork.Add(fun ea ->
        ea.Result <- fib input)

In the preceding example, you extract the input integer from the text box and parse it just before adding the event handler to the DoWork event; the lambda function added to the DoWork event then captures the resulting integer. You place the result that interests you in the Result property of the DoWork event's argument. You can then recover this value in the RunWorkerCompleted event, whose event argument also has a Result property, as you can see in the following code:

// add the work completed event handler
background.RunWorkerCompleted.Add(fun ea ->
        output.Text <- Printf.sprintf "%A" ea.Result)

You can be certain that the RunWorkerCompleted event runs on the GUI thread, so it is fine to interact with GUI objects there. You've wired up the events, but you have a couple of tasks remaining. First, you need to start the background worker:

// start the worker off
background.RunWorkerAsync()

Second, you need to add all this code to the button's Click event. You've wrapped the preceding code in a function called runWorker(), so it's a simple matter of calling this code in the event handler:

// hook up creating and running the worker to the button
button.Click.Add(fun _ -> runWorker())

Notice how this means you create a new background worker each time the button is clicked. This is necessary because a background worker cannot be asked to do more work while it is already busy.

Now the GUI remains reactive no matter how many times someone clicks the Go button. This does lead to some other problems; for example, it's fairly easy to set off two calculations that take some time to complete. If this happens, the results of both are placed in the same result label, so the user might have no idea which calculation finished first and is currently being displayed. Your GUI remains reactive, but it's not well adapted to this multithreaded style of programming. One option might be to disable all the controls while the calculation takes place. This might be appropriate in a few cases, but it's not a great option overall because it means the user can take little advantage of your reactive GUI. A better option is to create a system capable of displaying multiple results, along with their initial parameters, ensuring that the user knows what a given result means. This example uses a data grid view to display the results:

open Strangelights.Extensions
open System
open System.ComponentModel
open System.Windows.Forms
open System.Numerics

// define a type to hold the results
type Result =
    { Input: int;
      Fibonacci: BigInteger; }

let form =
    let form = new Form()
    // input text box
    let input = new TextBox()
    // button to launch processing
    let button = new Button(Left = input.Right + 10, Text = "Go")
    // list to hold the results
    let results = new BindingList<Result>()
    // data grid view to display multiple results
    let output = new DataGridView(Top = input.Bottom + 10, Width = form.Width,
                                  Height = form.Height - input.Bottom + 10,
                                  Anchor = (AnchorStyles.Top ||| AnchorStyles.Left |||
                                            AnchorStyles.Right ||| AnchorStyles.Bottom),
                                  DataSource = results)

    // create and run a new background worker
    let runWorker() =
        let background = new BackgroundWorker()
        // parse the input to an int
        let input = Int32.Parse(input.Text)
        // add the "work" event handler
        background.DoWork.Add(fun ea ->
            ea.Result <- (input, fib input))
        // add the work completed event handler
        background.RunWorkerCompleted.Add(fun ea ->
            let input, result = ea.Result :?> (int * BigInteger)
            results.Add({ Input = input; Fibonacci = result; }))
        // start the worker off
        background.RunWorkerAsync()

    // hook up creating and running the worker to the button
    button.Click.Add(fun _ -> runWorker())
    // add the controls
    let dc c = c :> Control
    form.Controls.AddRange([| dc input; dc button; dc output |])
    // return the form
    form

// show the form
do Application.Run(form)

You can see this new GUI in Figure 10-2.


Figure 10.2. A GUI that is better adapted to multi-threaded programming

Data Parallelism

Data parallelism relies on executing a single function in parallel with varying data inputs. This breaks work into discrete units, so it can be processed in parallel, on separate threads, ensuring that work can be partitioned between the available processors.

Typically this means processing a collection of data in parallel. This method takes advantage of the fact that the items in the collection provide a natural way to partition the work. In the simplest case, a parallel map function, you apply a transformation to each item in the collection, and the results form a new collection. This simple case generally works because each item in the collection can typically be processed independently and in any order. It's also possible to use this technique to handle more complex scenarios, such as summing all the items in a list; however, it can also prove tricky for some complex cases, and the processing order can take on added significance.
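To see why processing order can take on added significance, consider reducing a list with an operator. Addition is associative, so a parallel reduce can combine partial results in any order and still get the right answer; subtraction is not, so the grouping changes the result. This sketch demonstrates the difference sequentially:

```fsharp
// summing parallelizes safely because (+) is associative:
// ((1 + 2) + 3) gives the same result as (1 + (2 + 3))
let xs = [ 1; 2; 3; 4 ]
let total = List.fold (+) 0 xs        // 10, whatever the grouping

// subtraction is not associative, so the grouping matters:
let subLeft = List.fold (-) 0 xs      // ((((0 - 1) - 2) - 3) - 4) = -10
let subRight = List.foldBack (-) xs 0 // 1 - (2 - (3 - (4 - 0))) = -2
```

A parallel reduce partitions the collection and combines partial results, so it is only guaranteed to agree with the sequential version when the combining operator is associative.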

Data parallelism typically relies on libraries and frameworks to provide parallel processing. Although they use multiple threads or processes to provide the parallelism, these libraries don't typically require the user to create or control those threads explicitly; instead, it's the job of the library or framework to do this. Work units can be distributed between different physical machines that form a computing grid; however, for the sake of simplicity, and because multicore systems are becoming more common and powerful, this chapter concentrates on systems where work is distributed between the processors of a single physical machine. Microsoft is providing new parallel programming facilities in version 4.0 of the .NET Framework. A few other libraries exist that implement data and task parallelism on the .NET platform, but this chapter will concentrate on what's available in .NET 4.0.

You have two main ways to achieve data parallelism in .NET 4.0: you can use the System.Threading.Tasks.Parallel class available in mscorlib.dll, or you can use the System.Linq.ParallelEnumerable class available in System.Core.dll. The Parallel class is perfectly usable from F#, but the ParallelEnumerable class is probably the preferred way for F# programmers to achieve data parallelism because it is written much more in a functional style.

Let's start with a quick example that illustrates how to use the Parallel class's For method, and then discuss why you probably don't want to use it. Assume that you want to write out the integers from 0 to 99 in parallel, as in this program:

open System.Threading.Tasks

Parallel.For(0, 100, (fun i -> printfn "%i" i)) |> ignore

When executed on my dual-core machine, the preceding code produces the following output:

0
13

8
9
6
7
14
10
...

The numbers from the loop appear in a nondeterministic order. This is because the iterations were executed in parallel, and the threads assigned to execute them are scheduled unpredictably. The problem with using this kind of function in F# is that typically it will rely on a side effect at some point. It's easy to create functions with side effects in F#, but doing so can be undesirable when dealing with concurrency because it introduces problems. Even in this simple example, you face a risk that the numbers won't print atomically, but instead become mixed up as they print out. For example, you might see output like this:

...
6
174
10
...

Here, two numbers have combined to form a three-digit number that doesn't exist in the collection.

Bearing that in mind, the Parallel class can still be useful on some occasions. Imagine you need to send out a large number of emails, and you want to send them concurrently. You might choose to parallelize this with the Parallel class's ForEach method, because sending an email is I/O and therefore inherently a side effect:

open System.Threading.Tasks

let emails = [ "[email protected]"; "[email protected]";
               "[email protected]"; "[email protected]" ]

Parallel.ForEach(emails, (fun addr ->
    // code to create and send email goes here
    ())) |> ignore

Even in this simple example, you need to ensure that any function called inside Parallel.ForEach can safely be called from multiple threads.
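If the loop body must update shared state, you can make the update thread-safe with an atomic operation from the Interlocked class. In this sketch, the counter is stored in a one-element array purely so that its address can be passed to Interlocked.Increment:

```fsharp
open System.Threading
open System.Threading.Tasks

// shared state: a counter in a one-element array so we can take
// the address of an element to pass to Interlocked.Increment
let counter = Array.zeroCreate<int> 1

Parallel.For(0, 1000, (fun _ ->
    // the increment is atomic, so it is safe to call from
    // multiple threads at once
    Interlocked.Increment(&counter.[0]) |> ignore)) |> ignore

printfn "Final count: %i" counter.[0]  // always 1000
```

Without the Interlocked call, a plain `counter.[0] <- counter.[0] + 1` could lose increments when two threads read the same value before either writes back.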

The System.Linq.ParallelEnumerable class is much more promising for easily parallelizing F# programs. Essentially, it is a parallel implementation of the functions available in F#'s Seq module. Because there are plenty of name changes between the Seq module and the ParallelEnumerable class, it's common to create a thin wrapper over ParallelEnumerable to make it feel more like the Seq module, as shown in this code:

namespace Strangelights.Extensions
open System
open System.Linq

// Import a small number of functions from Parallel LINQ
module PSeq =
    // helper function to convert an ordinary seq (IEnumerable)
    // into a ParallelQuery
    let asParallel (list: seq<_>) : ParallelQuery<_> =
        ParallelEnumerable.AsParallel(list)
    // the parallel map function we're going to test
    let map f list =
        ParallelEnumerable.Select(asParallel list, new Func<_, _>(f))

    // other parallel functions you may consider using
    let reduce f list =
        ParallelEnumerable.Aggregate(asParallel list, new Func<_, _, _>(f))
    let fold f acc list =
        ParallelEnumerable.Aggregate(asParallel list, acc, new Func<_, _, _>(f))

You can use the finished version of this code to replace calls to functions from the Seq module with their equivalent calls from the PSeq wrapper module and, in most cases, expect your program to go faster; however, you will find some circumstances where it might be slower. For example, this code might execute more slowly for short lists where a relatively small amount of work is required for each item. You can see this at work in a micro benchmark that compares the parallel map function to the normal map function. To do this, you vary both the size of the input list and the amount of work performed for each item in the input list.

Note

Micro benchmarking is a useful tool for helping you to understand the performance consequences of a small section of code. Vance Morrison has a nice MSDN article on how to run micro benchmarks on the .NET platform at http://msdn.microsoft.com/en-us/magazine/cc500596.aspx.

The following example shows how you might do this:

open System.Diagnostics
open Strangelights.Extensions

// the number of samples to collect
let samples = 5
// the number of times to repeat each test within a sample
let runs = 100

// this function provides the "work", by enumerating over a
// collection of a given size
let addSlowly x =
    Seq.fold (fun acc _ -> acc + 1) 0 (seq { 1 .. x })

// tests the sequential map function by performing a map on
// a list with the given number of items and performing the given
// number of operations for each item.
// the map is then iterated, to force it to perform the work.
let testMap items ops =
    Seq.map (fun _ -> addSlowly ops) (seq { 1 .. items })
    |> Seq.iter (fun _ -> ())

// test the parallel map function, works as above
let testPMap items ops =
    PSeq.map (fun _ -> addSlowly ops) (seq { 1 .. items })
    |> Seq.iter (fun _ -> ())

// a test harness function; takes a function and passes it the
// given items and ops parameters
let harness f items ops =
    // run once to ensure everything is JITed
    f items ops
    // collect a list of results
    let res =
        [ for _ in 1 .. samples do
            let clock = new Stopwatch()
            clock.Start()
            for _ in 1 .. runs do
                f items ops
            clock.Stop()
            yield clock.ElapsedMilliseconds ]
    // calculate the average
    let avg = float (Seq.reduce (+) res) / (float samples)
    // output the results
    printf "Items %i, Ops %i," items ops
    Seq.iter (printf "%i,") res
    printfn "%f" avg

// the parameters to use
let itemsList = [ 10; 100; 200; 400; 800; 1000 ]
let opsList = [ 1; 10; 100; 200; 400; 800; 1000 ]

// test the sequential function
for items in itemsList do
    for ops in opsList do
        harness testMap items ops

// test the parallel function
for items in itemsList do
    for ops in opsList do
        harness testPMap items ops

Before you examine the results of the tests, it's probably worth looking at the micro benchmarking code in a bit more detail because it can help you understand the results better. Perhaps the most important function is harness, which has the job of running the test code. You need to keep a couple of things in mind when setting up the test. First, you always run each test 100 times when measuring a result. You do this because some of the tests on small lists can run exceptionally fast, so running them only once would make it hard to measure how long they take. Running the test repeatedly helps you avoid this problem. Second, you always collect a list of five results and take the average time of this list. This is because other background processes on the computer can affect some tests; running the test several times and taking an average helps you avoid this. You could also take this technique further and compute the standard deviation, which would highlight any tests that were significantly longer or shorter than the others.
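For instance, the standard deviation of a list of sample timings could be computed with a small helper like this (stdDev is a hypothetical helper, not part of the benchmark above):

```fsharp
// computes the standard deviation of a list of timings; a large
// value would indicate that some runs were unusually long or short
let stdDev (times: float list) =
    let avg = List.average times
    times
    |> List.map (fun t -> (t - avg) * (t - avg))
    |> List.average
    |> sqrt

printfn "%f" (stdDev [ 10.0; 12.0; 8.0; 11.0; 9.0 ])  // about 1.414
```

Applying this to each list of five sample times would tell you how much the individual runs varied around the average the harness already prints.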

The other interesting function is testMap, which has the job of providing the work for the map function to do. You want to vary two things: the number of items in each input list and the amount of processing each item takes. The testMap function achieves this through two parameters: items and ops. The items parameter is the number of items in the list that the map function must process, and the ops parameter is the number of operations that must be performed on each item. It's also worth noting that, because the Seq.map and PSeq.map functions are both lazy, you need to force them by iterating over the resulting sequence. Iterating over the sequence causes it to be created and evaluated. If you did not do this, you would see a small, constant time result, showing only the time it takes to create an object capable of generating the sequence, not the generation of the sequence itself. You force the generation of the sequence by iterating over it using the Seq.iter function.

Now you're ready to look at the results themselves (see Table 10-1).

Table 10.1. The results of the sequence micro benchmark

        Items: 10           Items: 100          Items: 200          Items: 400          Items: 800
Ops     Serial  Parallel    Serial  Parallel    Serial  Parallel    Serial  Parallel    Serial   Parallel
1          1.6      30.6        12      36.2      23.2      42.6        45      88.8      93.2        100
10           2      30.8      33.2      50.8      61.8      74.4     125.4     122.6     251.6        201
100       23.8      39.2     213.4     198.8     421.6       307     822.8     589.6      1660     1024.6
200       40.4      57.8     407.8     299.8     798.8     577.8      1634    1071.2    3262.4     1954.6
400       78.8        94       841     601.8    1676.8    1135.4    3237.4    2228.4    6424.2     3669.2
800      157.2     147.2    1591.4      1095    3174.6    2136.4    6388.4    4238.8   12747.6     7159.8
1000     196.8     181.4    1971.2    1329.6    3966.6    2630.6      7964    5279.6     16026     9111.6

It's difficult to interpret the raw numbers, but a good place to start is with some graphs of the numbers. Figures 10-3, 10-4, and 10-5 show the time it takes in milliseconds for a sequence to be processed by the Seq.map function versus the PSeq.map function. This is for a sequence of a fixed length (10 items, 100 items, and 1000 items respectively), but with varying numbers of operations on the sequence.

Figure 10.3. Time in milliseconds to process a list of ten items with a varying number of operations

Figure 10.4. Time in milliseconds to process a list of 100 items with a varying number of operations

Figure 10.5. Time in milliseconds to process a list of 1000 items with a varying number of operations

These three diagrams illustrate nicely the results of the experiment. For a sequence with a small number of items (the ten items in Figure 10-3), you can see that the parallel function (PSeq.map) is slower than the serial function (Seq.map) when you perform a small amount of work for each item. As the amount of work you need to perform for each item increases, the parallel processing becomes slightly quicker than the serial version, but the difference is never that great. For a sequence of 100 items (see Figure 10-4), you see a similar curve, but the point where the parallel version of the function becomes quicker than the serial version occurs much earlier, and the gains that the parallel function makes over the serial version are more pronounced. Finally, for a sequence of 1000 items (see Figure 10-5), you can see that the overhead incurred by the parallel function has been completely amortized. The parallel function is two times faster than the sequential version because the test was performed on a dual-core machine. You can conclude from this that it will often be worth using the parallel version of the map function, as long as you can ensure your input list is reasonably large.

Asynchronous Programming

Asynchronous programming is slightly different than the other forms of parallel programming you've seen so far. The other topics covered allow a number of threads to execute work in parallel, taking advantage of all available processors in a system. In asynchronous programming, you want to avoid blocking threads. You're already familiar with the concept of blocked threads from this chapter's first section, "Threads, Memory, Locking and Blocking." A blocked thread is one that can do no work because it is waiting for some task to finish; commonly the task a thread is waiting for is the operating system performing IO, but sometimes it might also be waiting for a lock so it can enter a critical section. Threads are relatively expensive resources; each thread is allocated a 1 MB stack, and there are other expenses concerning how the operating system kernel handles a large number of threads. In performance-critical code, it's important to keep the number of blocked threads low. Ideally, you will have only as many threads as you have processors, and you will have no blocked threads.

Note

For an overview of the kind of results that you can achieve using these techniques, see Amanda Laucher's 2009 Lang.NET talk in which she describes using F# asynchronous workflows to parallelize a C# program and achieves some impressive results: www.langnetsymposium.com/2009/speakers.aspx.

In this section, you will look at how you can use the .NET Framework's asynchronous programming model to avoid blocking threads during IO. The asynchronous programming model means using the Begin/End method pairs, such as BeginRead/EndRead on the Stream class. Typically you use these pairs of methods to perform some kind of IO task, such as reading from a file. This method of programming has acquired a reputation for being difficult, mainly because you need to find a good way to store state between the Begin/End calls. This section will not cover the asynchronous programming model directly; instead, you'll look at how to use a feature of F# called asynchronous workflows to avoid some of the work associated with the asynchronous programming model in other .NET languages. For a more detailed explanation of the asynchronous programming model and some of the difficulties in using it, please refer to Jeffrey Richter's MSDN article, "Asynchronous Device Operations" (http://msdn.microsoft.com/en-us/magazine/cc163415.aspx).

Asynchronous workflows are not exclusively for use with the .NET asynchronous programming model. In the next section, "Message Passing," you'll see how you can use these workflows with F#'s mailboxes to coordinate a number of different tasks. This will allow you to wait for tasks to complete without blocking threads.

The first step in understanding asynchronous workflows in F# is to understand the syntax itself. To create an asynchronous workflow, you use monadic syntax, similar to the sequence expressions you saw in Chapter 3. The basic syntax is the keyword async with the workflow expression surrounded by curly brackets: async { ... }. A simple program that uses a workflow looks like this:

open System.IO

// a function to read a text file asynchronously
let readFile file =
    async { let! stream = File.AsyncOpenText(file)
            let! fileContents = stream.AsyncReadToEnd()
            return fileContents }

// create an instance of the workflow
let readFileWorkflow = readFile "mytextfile.txt"

// invoke the workflow and get the contents
let fileContents = Async.RunSynchronously readFileWorkflow

To compile this program, you need to add a reference to the FSharp.PowerPack.dll. This program shows a function readFile that creates a workflow that reads a file asynchronously, then returns its contents. Next, you create an instance of the workflow called readFileWorkflow, and finally, you execute the workflow to get the file's contents. It's important to understand that simply calling the readFile function won't actually read the file. Instead, it creates a new instance of the workflow, and you can then execute the workflow to perform the task of reading the file. The Async.RunSynchronously function is actually responsible for executing the workflow. A workflow instance is a small data structure, rather like a small program, that can be interpreted to do some work.

The most important thing to notice about this example is the let followed by an exclamation mark (let!), often pronounced let bang. The workflow/monadic syntax allows library writers to give different meanings to let!. In the case of asynchronous workflows, it means that an asynchronous operation will take place. The workflow will stop executing while the asynchronous operation takes place. A callback will be placed in the thread pool, and it will be invoked when the operation has completed, possibly on a different thread if the thread making the original call is not free. After the async call, the original thread is free to carry on doing other work.

You've probably also noticed that the let! is used with some special methods prefixed with Async. These functions are defined as type augmentations, which are F#'s equivalent of C#'s extension methods, in FSharp.PowerPack.dll. These methods handle the calling of the Begin/End method pairs. If no Async method is available, it's fairly easy to create your own using the Async.Primitive function and the Begin/End method pairs.
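As a sketch of rolling your own (this example is an assumption, not code from the chapter: it uses Async.FromBeginEnd, the name this primitive goes by in later F# releases, and the asyncRead function is hypothetical), a Begin/End pair can be wrapped like this:

```fsharp
open System.IO

// a sketch of wrapping a Begin/End pair by hand; the wrapper passes
// the workflow's callback and state through to BeginRead, and EndRead
// retrieves the number of bytes read when the operation completes
let asyncRead (stream: Stream) (buffer: byte[]) =
    Async.FromBeginEnd(
        (fun (callback, state) ->
            stream.BeginRead(buffer, 0, buffer.Length, callback, state)),
        (fun asyncResult -> stream.EndRead(asyncResult)))
```

The resulting Async<int> suspends the workflow while the operating system performs the read and resumes it, possibly on another thread pool thread, with the number of bytes read.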

The flow of your simple example would look like this:

  • Step 1: The main program thread starts the process of opening the file stream, and a callback is placed in the thread pool to be used when this completes. The thread is now free to continue doing other work.

  • Step 2: A thread pool thread activates when the file stream has opened. It starts reading the contents of the file and places a callback in the thread pool to be used when this completes. Because it is a thread pool thread, it then returns to the thread pool.

  • Step 3: A thread pool thread activates when the file has been completely read. It returns the text data that has been read from the file and returns to the thread pool.

  • Step 4: Because you have used the Async.RunSynchronously function, the main program thread is waiting for the results of the workflow. It receives the file contents.

You will probably have spotted a flaw in this simple example. The main program thread is not blocked performing the IO itself, but because it waits synchronously for the asynchronous workflow to complete, it is still blocked until the IO has finished. To put this another way, there's little or no advantage to executing one asynchronous workflow on its own and waiting for the result. However, it's fairly simple to execute several workflows in parallel, and doing so does have a distinct advantage: the original thread is not blocked after it starts executing the first Async task, so it is free to go on and start more asynchronous tasks.

It's fairly easy to illustrate this using a slightly modified version of the original example where, instead of reading one file, you read three of them. You will also compare this to a synchronous version of the program, which will help demonstrate the differences. First, take a look at the synchronous version:

open System
open System.IO
open System.Threading

let print s =
    let tid = Thread.CurrentThread.ManagedThreadId
    Console.WriteLine(sprintf "Thread %i: %s" tid s)

let readFileSync file =
    print (sprintf "Beginning file %s" file)
    let stream = File.OpenText(file)
    let fileContents = stream.ReadToEnd()
    print (sprintf "Ending file %s" file)
    fileContents


// invoke the workflow and get the contents
let filesContents =
    [| readFileSync "text1.txt";
       readFileSync "text2.txt";
       readFileSync "text3.txt"; |]

This program is fairly straightforward. Note that it includes some debugging code to show when you begin and end processing each file. Now look at the asynchronous version:

open System
open System.IO
open System.Threading

let print s =
    let tid = Thread.CurrentThread.ManagedThreadId
    Console.WriteLine(sprintf "Thread %i: %s" tid s)

// a function to read a text file asynchronously
let readFileAsync file =
    async { do print (sprintf "Beginning file %s" file)
            let! stream = File.AsyncOpenText(file)
            let! fileContents = stream.AsyncReadToEnd()
            do print (sprintf "Ending file %s" file)
            return fileContents }

let filesContents =
    Async.RunSynchronously
        (Async.Parallel [ readFileAsync "text1.txt";
                          readFileAsync "text2.txt";
                          readFileAsync "text3.txt"; ])

Again, this version incorporates some debugging code, so you can see how the program executes. The biggest change is that you now use the Async.Parallel function to compose several workflows into a single workflow. This means that when the first thread finishes processing the first asynchronous call, it will be free to carry on processing the other workflows. This is probably easiest to see when you look at the results of the two programs:

Synchronous results

Thread 1: Beginning file text1.txt
Thread 1: Ending    file text1.txt
Thread 1: Beginning file text2.txt
Thread 1: Ending    file text2.txt
Thread 1: Beginning file text3.txt
Thread 1: Ending    file text3.txt

Asynchronous results

Thread 3: Beginning file text1.txt
Thread 4: Beginning file text2.txt
Thread 3: Beginning file text3.txt
Thread 4: Ending    file text2.txt
Thread 4: Ending    file text1.txt
Thread 4: Ending    file text3.txt

The two sets of results look quite different. In the synchronous results, each Beginning file message is immediately followed by the matching Ending file message, and everything happens on the same thread. In the asynchronous results, all the Beginning file messages appear first, spread across two different threads. This happens because once a thread reaches an asynchronous operation, it is free to carry on and start another operation. The Ending file messages appear later, once the IO has completed.

Message Passing

It's often useful to think of a parallel program as a series of independent components that send and receive messages. This is often referred to as the Actor Model—you can find a more formal description of the actor model on Wikipedia at http://en.wikipedia.org/wiki/Actor_model. Although the scenarios in which you would use message passing tend to be quite complex, the ideas behind it are relatively simple, as you'll see in a handful of simple examples.

The basic idea behind message passing is that a system is composed of agents, or actors. These can both send and receive messages. When an agent receives a message, the message is placed in a queue until the agent is ready to process it. When an agent processes a message, it makes a decision about what to do with it based on its internal state and the contents of the message. The agent has a number of possibilities open to it in response to an incoming message: it might send a reply to the agent that initiated the exchange, create a new message for a different agent, create a new agent, or perhaps update some internal data structure.

F# provides the generic MailboxProcessor class as its implementation of message passing and the actor model. When a MailboxProcessor is created, it has (as the name suggests) a message queue that it can use to receive messages. MailboxProcessor is responsible for deciding what it will do with the message once it receives it. The implementation of a MailboxProcessor tends to follow a few simple patterns; the next example illustrates the simplest pattern for a MailboxProcessor:

open System

let mailbox =
    MailboxProcessor.Start(fun mb ->
        let rec loop x =
            async { let! msg = mb.Receive()
                    let x = x + msg
                    printfn "Running total: %i - new value %i" x msg
                    return! loop x }
        loop 0)

mailbox.Post(1)
mailbox.Post(2)
mailbox.Post(3)

Console.ReadLine() |> ignore

Executing the preceding code produces the following results:

Running total: 1 - new value 1
Running total: 3 - new value 2
Running total: 6 - new value 3

In the first part of the example, you create a mailbox that receives messages of type int. When the mailbox receives a message, it adds it to a running total and then displays the running total, along with the value received. Let's take a closer look at how you achieve this. The MailboxProcessor class has a static Start method that takes a function as a parameter. This function receives the new MailboxProcessor instance and must return an asynchronous workflow; you use this workflow to read messages from the queue. It is an asynchronous workflow because messages need to be read asynchronously; this ensures that a mailbox is not tied to a single thread, which would cause scalability issues if you were using lots of mailboxes. You need to keep checking the queue for new messages as they arrive; typically, you do this with an infinite loop. In this case, you define a recursive function called loop, which reads from the queue by calling the Receive method, processes the message, and then calls itself to start the process again. This is an infinite recursion, but there's no danger of the stack overflowing because the function is tail recursive. The loop function takes a single parameter, which you use to store the mailbox's state: an integer representing the running total in this case.

It's also worth noting that the Console.ReadLine() at the end of this example is important. This is because the message queue is processed on a separate thread. Once you finish posting messages to the mailbox using the Post method, the main thread has no more work to do, so it exits, causing the process to exit. In this case, the process would probably exit before the mailbox has had a chance to process the messages in its queue. Calling Console.ReadLine() provides a simple way to block the main thread until the user has had a chance to see the results of the mailbox processing the messages.

One final detail about this example: the mailbox's Post member is safe to call from any thread, because the mailbox's message queue ensures that each message is processed in turn, atomically. The current example does not take advantage of this, but you will see it used in the next two examples.
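To see this thread safety in action, here is a small sketch (an illustration, not code from the chapter): a mailbox that keeps a running total receives Post calls from four threads at once, and a hypothetical Total message uses PostAndReply to read the result once all the posts are in.

```fsharp
open System.Threading

// messages: either a value to add, or a request to read the total
type CounterMsg =
    | Add of int
    | Total of AsyncReplyChannel<int>

let counter =
    MailboxProcessor.Start(fun mb ->
        let rec loop acc =
            async { let! msg = mb.Receive()
                    match msg with
                    | Add n -> return! loop (acc + n)
                    | Total reply ->
                        reply.Reply acc
                        return! loop acc }
        loop 0)

// four threads post concurrently; the message queue serializes them
let threads =
    [ for _ in 1 .. 4 ->
        new Thread(fun () -> for _ in 1 .. 1000 do counter.Post(Add 1)) ]
threads |> List.iter (fun t -> t.Start())
threads |> List.iter (fun t -> t.Join())

// all 4000 Add messages are queued before this one, so the reply is 4000
let total = counter.PostAndReply(Total)
```

Because the queue is processed in FIFO order, the Total request is guaranteed to see every Add that was posted before it.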

This particular mailbox isn't that useful; however, it does represent the simplest usage pattern of a mailbox: receive a message, update some internal state, and then react to the message. In this case, reacting to the message means writing to the console, which is probably too simplistic to be of much use. However, you can find more realistic scenarios for this usage pattern. A good example would be using a mailbox to gather up a number of values, then marshaling them to the GUI thread so they can be viewed. You'll learn more about this technique in the next pair of examples.

Begin by looking at the problem you're trying to solve in a bit more detail. If you have a simulation that generates data, you might want to be able to see this data in real time, as it is generated. When working with GUIs, you face two related constraints that make this quite challenging. First, the GUI must run on its own thread, and this thread must not be occupied for a long time, or the GUI will become unresponsive. This makes it impossible to execute a long-running simulation on the GUI thread. Second, you can only access GUI objects from the thread that created them: the GUI thread. If your simulation is running on another thread, then it cannot write directly to the GUI. Fortunately, GUI objects provide an Invoke method that allows you to invoke a function on the GUI thread and safely update the GUI with the generated data. Calling the Invoke method too often can have a negative impact on performance because marshaling data to the GUI thread is fairly expensive. If your simulation outputs a small amount of data frequently, it's often a good idea to batch up the results so you can print them to the screen 12 to 20 times a second, which is enough to get a smooth animation effect. You'll begin by learning how to use mailboxes to solve a specific instance of this problem; next, you'll see a second example where you tidy this up into a more generic component.

F#'s mailboxes can help here by providing an elegant way to buffer the data before you print it to the screen. The basics of the algorithm are fairly simple. The thread running the simulation posts messages to the mailbox; when the mailbox has received enough messages, it notifies the GUI of the new updates to be drawn. This programming style also provides a neat way of separating the logic for generating the data from the logic presenting the data in the UI. Let's look at the whole code example, then step through and examine how it all works. You'll need to add references to System.Drawing.dll and System.Windows.Forms.dll:

open System
open System.Threading
open System.Windows.Forms
open System.Drawing.Imaging
open System.Drawing
// the width & height for the simulation
let width, height = 500, 600

// the bitmap that will hold the output data
let bitmap = new Bitmap(width, height, PixelFormat.Format24bppRgb)

// a form to display the bitmap
let form = new Form(Width = width, Height = height,
                    BackgroundImage = bitmap)

// the function which receives the points to be plotted
// and marshals to the GUI thread to plot them
let printPoints points =
    form.Invoke(new Action(fun () ->
        List.iter bitmap.SetPixel points
        form.Invalidate()))
    |> ignore

// the mailbox that will be used to collect the data
let mailbox =
    MailboxProcessor.Start(fun mb ->
        // main loop to read from the message queue
        // the parameter "points" holds the working data
        let rec loop points =
            async { // read a message
                    let! msg = mb.Receive()
                    // if we have over 100 messages, write
                    // them to the GUI
                    if List.length points > 100 then
                        printPoints points
                        return! loop []
                    else
                        // otherwise append the message and loop
                        return! loop (msg :: points) }
        loop [])

// start a worker thread running our fake simulation
let startWorkerThread() =
    // function that loops infinitely generating random
    // "simulation" data
    let fakeSimulation() =
        let rand = new Random()
        let colors = [| Color.Red; Color.Green; Color.Blue |]
        while true do
            // post the random data to the mailbox
            // then sleep to simulate work being done
            mailbox.Post(rand.Next(width),
                rand.Next(height),
                colors.[rand.Next(colors.Length)])
            Thread.Sleep(rand.Next(100))
    // start the thread as a background thread, so it won't stop
    // the program exiting
    let thread = new Thread(fakeSimulation, IsBackground = true)
    thread.Start()

// start 6 instances of our simulation
for _ in 0 .. 5 do startWorkerThread()

// run the form
Application.Run form

This example has three key parts: how the simulation posts data to the mailbox, how the mailbox buffers points to be sent to the GUI, and how the GUI receives the points. Let's examine each of these in turn. Posting data to the mailbox remains simple; you continue to call the Post method on the mailbox. Two important differences exist between this example and the previous one. First, you pass a different data structure; however, the Post method is generic, so you remain strongly typed. Second, you call the Post method from six different threads; the message queue enables this to work just fine. The buffering technique is simple: you count the number of messages you receive, and when you have received 100, you send them to the GUI:

async { // read a message
        let! msg = mb.Receive()
        // if we have over 100 messages, write
        // them to the GUI
        if List.length points > 100 then
            printPoints points
            return! loop []
        else
            // otherwise append the message and loop
            return! loop (msg :: points) }

The number 100 is fairly arbitrary; it was chosen because it seemed to work well for this particular simulation. It's also worth noting that you count the number of messages you have received at each iteration by calling the List.length function. This is suboptimal from a performance point of view because List.length will traverse the list each time you call it. This won't matter much in the current example because it uses a fairly small list; however, if you increase the buffer size, this approach could become a bottleneck. A better approach might be to store a separate count parameter that you increment during each iteration of the function; this example avoids doing that for the sake of simplicity. Another alternative would be to store the time of the previous update, updating again only if the previous update was more than a twentieth of a second ago. This approach works well because it allows you to aim for the number of frames per second required to achieve a smooth animation effect; again, the examples here don't rely on it because it would add an unnecessary element of complexity. The example includes one more technique worth mentioning, which is how you write the data to the screen:

let printPoints points =
    form.Invoke(new Action(fun () ->
        List.iter bitmap.SetPixel points
        form.Invalidate()))
    |> ignore

This is fairly straightforward. The printPoints function takes a points parameter, then invokes a delegate in the context of the form, which allows you to write the points to the bitmap. Finally, you need to call the form's Invalidate method to ensure the points are displayed correctly.
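Returning to the time-based batching alternative mentioned above, here is a stand-alone sketch of the idea (an illustration under assumed names, not the chapter's code): a mailbox that flushes its buffer whenever roughly a twentieth of a second has passed since the last update, using a Stopwatch to track elapsed time. The flush function stands in for the GUI update.

```fsharp
open System.Diagnostics

// hypothetical sketch: a mailbox that batches messages by elapsed time
// rather than by count; "flush" is called with the buffered values
// roughly 20 times a second (every 50 ms) at most
let timedMailbox (flush: int list -> unit) =
    MailboxProcessor.Start(fun mb ->
        let clock = Stopwatch.StartNew()
        let rec loop points =
            async { let! msg = mb.Receive()
                    if clock.ElapsedMilliseconds > 50L then
                        // enough time has passed: flush and restart the clock
                        flush (msg :: points)
                        clock.Restart()
                        return! loop []
                    else
                        // otherwise just buffer the message
                        return! loop (msg :: points) }
        loop [])
```

The advantage over counting messages is that the update rate stays steady regardless of how fast the simulation produces data.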

The previous example provides a nice demonstration of how to use mailboxes, but its main problem is that the code is not reusable. It would be better if you could wrap your mailbox into a reusable component, and F#'s object-oriented features provide a great way of doing this. The following example also demonstrates a couple of other important concepts, such as how you can support messages of different types within the same mailbox, as well as how you can return messages to a client of the mailbox. Again, you'll need to add references to System.Drawing.dll and System.Windows.Forms.dll:

open System
open System.Threading
open System.ComponentModel
open System.Windows.Forms
open System.Drawing.Imaging
open System.Drawing

// type that defines the message types our updater can handle
type Updates<'a> =
    | AddValue of 'a
    | GetValues of AsyncReplyChannel<list<'a>>
    | Stop

// a generic collector that receives a number of posted items and,
// once a configurable limit is reached, fires the update event
type Collector<'a>(?updatesCount) =
    // the number of updates to count to before firing the update event
    let updatesCount = match updatesCount with Some x -> x | None -> 100

    // capture the synchronization context of the thread that creates this
    // object; this allows us to send messages back to the GUI thread
    // painlessly
    let context = AsyncOperationManager.SynchronizationContext
    let runInGuiContext f =
        context.Post(new SendOrPostCallback(fun _ -> f()), null)

    // this event is fired in the synchronization context of the GUI
    // (i.e., the thread that created this object)
    let event = new Event<list<'a>>()

    let mailboxWorkflow (inbox: MailboxProcessor<_>) =
        // main loop to read from the message queue
        // the parameter "curr" holds the working data
        // the parameter "master" holds all values received
        let rec loop curr master =
            async { // read a message
                    let! msg = inbox.Receive()
                    match msg with
                    | AddValue x ->
                        let curr, master = x :: curr, x :: master
                        // if we have received enough messages,
                        // write them to the GUI
                        if List.length curr > updatesCount then
                            do runInGuiContext(fun () -> event.Trigger(curr))
                            return! loop [] master
                        else
                            return! loop curr master
                    | GetValues channel ->
                        // send all data received back
                        channel.Reply master
                        return! loop curr master
                    | Stop -> () } // stop by not calling "loop"
        loop [] []

    // the mailbox that will be used to collect the data
    let mailbox = new MailboxProcessor<Updates<'a>>(mailboxWorkflow)

    // the API of the collector

    // add a value to the queue
    member w.AddValue (x) = mailbox.Post(AddValue(x))
    // get all the values the mailbox stores
    member w.GetValues() = mailbox.PostAndReply(fun x -> GetValues x)
    // publish the updates event
    [<CLIEvent>]
    member w.Updates = event.Publish
    // start the collector
    member w.Start() = mailbox.Start()
    // stop the collector
    member w.Stop() = mailbox.Post(Stop)

// create a new instance of the collector
let collector = new Collector<int*int*Color>()

// the width & height for the simulation
let width, height = 500, 600
// a form to display the updates
let form =
    // the bitmap that will hold the output data
    let bitmap = new Bitmap(width, height, PixelFormat.Format24bppRgb)
    let form = new Form(Width = width, Height = height, BackgroundImage = bitmap)
    // handle the collector's Updates event and use it to plot the points
    collector.Updates.Add(fun points ->
        List.iter bitmap.SetPixel points
        form.Invalidate())
    // start the collector when the form loads
    form.Load.Add(fun _ -> collector.Start())
    // when the form closes get all the values that were processed
    form.Closed.Add(fun _ ->
        let vals = collector.GetValues()
        MessageBox.Show(sprintf "Values processed: %i" (List.length vals))
        |> ignore
        collector.Stop())
    form

// start a worker thread running our fake simulation
let startWorkerThread() =
    // function that loops infinitely generating random
    // "simulation" data
    let fakeSimulation() =
        let rand = new Random()
        let colors = [| Color.Red; Color.Green; Color.Blue |]
        while true do
            // post the random data to the collector
            // then sleep to simulate work being done
            collector.AddValue(rand.Next(width),
                rand.Next(height),
                colors.[rand.Next(colors.Length)])
            Thread.Sleep(rand.Next(100))
    // start the thread as a background thread, so it won't stop
    // the program exiting
    let thread = new Thread(fakeSimulation, IsBackground = true)
    thread.Start()

// start 6 instances of our simulation
for _ in 0 .. 5 do startWorkerThread()

// run the form
Application.Run form

The output of this example is exactly the same as that of the previous example, and the code base follows largely the same pattern; however, you can see several important differences between the two examples. Perhaps the most noticeable one is that the mailbox is now wrapped in an object that provides a strongly typed interface. The class you have created is called a Collector<'a>; its interface looks like this:

type Collector<'a> =
  class
    new : ?updatesCount:int -> Collector<'a>
    member AddValue : x:'a -> unit
    member GetValues : unit -> 'a list
    member Start : unit -> unit
    member Stop : unit -> unit
    member Updates : IEvent<'a list>
  end

The class is generic in terms of the type of values that it collects. It has an AddValue method to post a value to the internal mailbox and a GetValues method to get all the messages that have been passed to the mailbox so far. The collector must now be explicitly started and stopped by its Start and Stop methods. Finally, the collector has an Updates event that is raised when enough messages have been collected. The number of messages collected is configurable by an optional integer that you can pass to the class constructor. The fact that you use an event is an important design detail. Using an event to notify clients that updates exist means that your Collector<'a> needs no knowledge of the clients that use it, which greatly improves its reusability.

You now use a union type to represent your messages; this gives you the flexibility to have different types of messages. Clients of the Collector<'a> don't deal with it directly, but instead use the member methods it provides. The member methods have the job of creating the different types of messages. In addition to providing a value to the message queue, you can also send a message to retrieve all the current messages, as well as a message to stop the mailbox from reading new messages:

type Updates<'a> =
    | AddValue of 'a
    | GetValues of AsyncReplyChannel<list<'a>>
    | Stop

Next, you implement these different types of messages by pattern matching over the received messages:

let! msg = inbox.Receive()
match msg with
| AddValue x ->
    let curr, master = x :: curr, x :: master
    // if we have collected enough messages, post
    // an update to the GUI
    if List.length curr > updatesCount then
        do runInGuiContext (fun () -> fireUpdates curr)
        return! loop [] master
    else
        return! loop curr master
| GetValues channel ->
    // send all the data received so far back to the caller
    channel.Reply master
    return! loop curr master
| Stop -> ()

The AddValue union case is basically what you did in the previous example, except that this time you add the values to both the curr and master lists. The curr list stores the current values you will pass to the GUI on the next update, while the master list provides a list of all the values that you've received. The master list enables you to accommodate any client that requests all the values.

For the union case GetValues, it's worth spending some time looking at how a client can return values. You start this process by calling the mailbox's PostAndReply method rather than its Post method; you can see this at work in the GetValues member method implementation:

// get all the values the mailbox stores
member w.GetValues() = mailbox.PostAndReply(fun x -> GetValues x)

The PostAndReply method accepts a function that is passed an AsyncReplyChannel<'a>. You can use this AsyncReplyChannel<'a> to send a message back to the caller via its Reply member; this is what you see in the GetValues case of your union. Users of this method should be careful because it blocks until the reply arrives, and the message won't be processed until it reaches the front of the queue, which can take a long time if the queue is long. The PostAndAsyncReply method is often preferable because it enables you to avoid blocking a thread while waiting for the reply; however, this example doesn't use it for the sake of simplicity.
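To see the two reply styles side by side, here is a minimal sketch using a standalone echo agent; the agent and its names are purely illustrative and are not part of the chapter's Collector<'a>:

```fsharp
// a minimal echo agent, used only to illustrate the two reply styles
let echo : MailboxProcessor<string * AsyncReplyChannel<string>> =
    MailboxProcessor.Start(fun inbox ->
        let rec loop () =
            async {
                let! msg, channel = inbox.Receive()
                channel.Reply(msg.ToUpper())
                return! loop () }
        loop ())

// PostAndReply blocks the calling thread until the agent replies
let blocked = echo.PostAndReply(fun ch -> ("hello", ch))

// PostAndAsyncReply returns an Async<string> that can be awaited inside
// a workflow without blocking a thread; here we simply run it to completion
let nonBlocked =
    echo.PostAndAsyncReply(fun ch -> ("world", ch))
    |> Async.RunSynchronously

printfn "%s %s" blocked nonBlocked
```

In real code, the Async<'a> returned by PostAndAsyncReply would typically be bound with let! inside another asynchronous workflow rather than run synchronously.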

The Stop union case is the simplest way to stop reading messages from the queue; all you need to do is avoid calling the loop method recursively. That's not an issue in this case, but you still need to return a value, which you do by returning the unit type, represented by empty parentheses (). The only subtlety you need to be careful of here is that calling the Stop method will not stop the mailbox immediately; it will stop the mailbox only when the stop message reaches the front of the queue.
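The following small sketch shows this queuing behavior in isolation; the agent and its message names are illustrative, not the chapter's Collector<'a>. Because the loop simply does not recurse when it sees Stop, messages queued ahead of Stop are still processed first:

```fsharp
type CounterMsg =
    | Add of int
    | Stop of AsyncReplyChannel<int>

let counter =
    MailboxProcessor.Start(fun inbox ->
        let rec loop total =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Add n -> return! loop (total + n)
                // no recursive call here, so the agent stops reading messages
                | Stop channel -> channel.Reply total }
        loop 0)

counter.Post(Add 1)
counter.Post(Add 2)
counter.Post(Add 3)
// Stop is queued behind the Adds, so all three are counted first
let total = counter.PostAndReply Stop
printfn "total = %d" total
```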

You've seen how our Collector<'a> type handles messages; now let's look at how the Collector<'a> raises the Updates event, so that it runs on the GUI thread. You create the Updates event using new Event, just as you create any other event in F#. You use the function runInGuiContext to make this event run in the context of the GUI:

let context = AsyncOperationManager.SynchronizationContext
let runInGuiContext f =
    context.Post(new SendOrPostCallback(fun _ -> f()), null)

First, you store the SynchronizationContext of the thread that created the object. You do this by using a static property on the AsyncOperationManager, available in the System.ComponentModel namespace. The SynchronizationContext enables you to marshal calls back to the thread that created it using its Post member method. The only thing you need to be careful about is that the thread that creates the collector object becomes the GUI thread; however, typically you'll use the main program thread to do both things, so this won't be a problem. This technique of capturing the synchronization context is also used by the BackgroundWorker class from the "Reactive Programming" section of this chapter.
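The capture-and-post technique can be sketched on its own, outside any GUI. Note that outside a Windows Forms thread the captured context is the free-threaded default, which runs the callback on a thread-pool thread; on a GUI thread it would marshal the callback back to that thread. The helper names here are illustrative:

```fsharp
open System.ComponentModel
open System.Threading

// capture the current thread's context at construction time
let context = AsyncOperationManager.SynchronizationContext

// post a function back through the captured context
let runInCapturedContext f =
    context.Post(SendOrPostCallback(fun _ -> f ()), null)

// a worker thread delivers its result via the captured context
let finished = new ManualResetEvent(false)
let worker =
    Thread(fun () ->
        runInCapturedContext (fun () ->
            printfn "update delivered via captured context"
            finished.Set() |> ignore))
worker.Start()
finished.WaitOne() |> ignore
```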

The definition of the form is now somewhat simpler because you no longer need to provide a function for the mailbox to call. You simply handle the Updates event instead:

// handle the collector's Updates event and use it to update the bitmap
collector.Updates.Add(fun points ->
    List.iter (fun (x, y, c) -> bitmap.SetPixel(x, y, c)) points
    form.Invalidate())

You can also now take advantage of the form's Closed event to stop the mailbox processor and obtain a list of all the messages processed when a user closes the form:

// when the form closes get all the values that were processed
form.Closed.Add(fun _ ->
    let vals = collector.GetValues()
    MessageBox.Show(sprintf "Values processed: %i" (List.length vals))
    |> ignore
    collector.Stop())

You haven't changed the behavior of your example, but these additions greatly improve the design of the code by decoupling the code for the mailbox from the GUI code, which improves the reusability of the Collector<'a> class tremendously.

Summary

In this chapter, you've covered quite a lot of ground. You've seen five different concurrency techniques, all of which have their place in certain kinds of applications.

In the next chapter, you'll see how some of these techniques, especially asynchronous workflows, can be used to make programming distributed applications easier.
