Recently, parallel programming has moved from being a relatively obscure topic, practiced only by specialist developers, to a more mainstream endeavor. This is due to the increasing prevalence of multicore processors. At the time of writing, it is almost impossible to buy a PC with a single-core processor—the standard is dual core—and quad-core processors are beginning to arrive in the shops. It is fully expected that this trend will continue in the years to come.
To a certain extent, this interest in parallel programming has driven the renewed interest in functional programming. Functional programming is certainly not a silver bullet for all parallel programming problems, but it can help you design your software so it executes in parallel. In this chapter, you will learn about some of the simpler techniques to help your software execute in parallel, as well as how to take advantage of several processors.
It's often helpful to break down parallel programming into several smaller subtopics, all of which you'll learn about in this chapter:
You'll learn about basic techniques for creating and controlling threads in .NET programming. You'll also take a quick look at how you can share resources (such as memory) between threads, as well as how you can control access to these shared resources.
It's often important to the user experience that programs remain reactive to input. To do this, it's important that you avoid doing too much processing on the thread responsible for reacting to user input. This is particularly relevant to GUI programming, but it can also apply to a server that needs to stay responsive to incoming requests.
Data parallelism refers to executing one piece of code concurrently on several processors with varying input data. This is a good way to parallelize the processing of large data structures such as collections. It's often possible to apply a transformation to several items in a collection in parallel, which will generally speed up the overall execution time. The classic example of this is the parallel map, which provides one of the simplest ways to parallelize a functional program.
Some tasks, particularly I/O, need to happen asynchronously to make program execution efficient. It is important that threads are not blocked for long periods while I/O takes place.
Message passing, a technique more formally referred to as the actor model, is used to coordinate tasks that execute in parallel. This is the most advanced parallel-programming topic covered in this chapter.
Parallel programming is a large topic, so this chapter won't be exhaustive, but it will provide some straightforward ways to help you get started with parallel programming in F#.
If you are serious about parallel programming, it's worth investing time to understand threads and memory. In this section, you'll take a look at explicitly creating threads and how to control their access to shared resources, such as memory. My advice is to avoid explicitly creating and managing threads like this; however, when using the other parallel programming techniques, it's useful to understand the underlying threading concepts.
When a program is executed, the operating system creates a process to execute it. The process represents the resources that are allocated to the program, most notably the memory allocated to it. A process has one or more threads, which are responsible for executing the program's instructions and share the process's memory. In .NET, a program starts with one thread to execute the program's code. To create an extra thread in F#, you use the System.Threading.Thread class. The Thread class's constructor takes a delegate that represents the function the thread will start executing. Once a Thread object has been constructed, it does not start executing automatically; you must call its Start method. The following example demonstrates how to create and start a new thread:
open System.Threading

let main() =
    // create a new thread passing it a lambda function
    let thread =
        new Thread(fun () ->
            // print a message on the newly created thread
            printfn "Created thread: %i" Thread.CurrentThread.ManagedThreadId)
    // start the new thread
    thread.Start()
    // print a message on the original thread
    printfn "Original thread: %i" Thread.CurrentThread.ManagedThreadId
    // wait for the created thread to exit
    thread.Join()

do main()
Compiling and executing the preceding program will output results similar to this:
Original thread: 1
Created thread: 3
You should note a couple of important things in this example. First, notice that the original thread prints its message before the second thread does. This is because calling a thread's Start method does not immediately start the thread; rather, it schedules a new thread for execution, and the operating system chooses when to start it. Normally, the delay will be short, but because the original thread continues to execute, it will probably execute a few instructions before the new thread starts executing. Second, notice how you use the thread's Join method to wait for it to exit. If you did not do this, it is highly probable that the original thread would finish executing before the second thread had a chance to start. While the original thread is waiting for the created thread to do its work, you say that it is blocked. Threads can become blocked for a number of reasons; for example, they might be waiting on a lock, or they might be waiting for I/O to complete. When a thread becomes blocked, the operating system switches to the next runnable thread; this is called a context switch. You'll learn about locking in the next section, and you'll look at avoiding blocked threads during I/O operations when you learn about asynchronous programming later in the chapter.
Any resource that can be updated by two different threads at the same time is at risk of being corrupted. This is because a thread can be context switched at any time, leaving operations that should have been atomic half done. To avoid corruption, you need to use locks. A lock, sometimes referred to as a monitor, ensures that only one thread at a time can pass through a given section of code. In F#, you use the lock function to create and control locking. You do this by locking on an object; the idea is that, as soon as the lock is taken, any other thread attempting to enter the section of code will be blocked until the lock is released by the thread that holds it. Code protected in this way is sometimes called a critical section. You achieve this by calling System.Threading.Monitor.Enter at the start of the code that you want to protect and System.Threading.Monitor.Exit at the end of that code. You must guarantee that Monitor.Exit is called, or you could end up with threads being blocked forever. The lock function is a nice way to ensure that Monitor.Exit is always called if Monitor.Enter has been called. This function takes two parameters: the first is the object you want to lock on, while the second is a function that contains the code you want to protect. This function should take unit as its parameter, and it can return any value.
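To make the relationship between lock and the Monitor class concrete, here is a sketch of how you could write a lock-like function yourself. This is an illustration of the idea, not the actual library implementation; the names myLock, counterLock, and counter are invented for this example:

```fsharp
open System.Threading

// a sketch of what the lock function does: Monitor.Enter, then the
// protected code inside try/finally so Monitor.Exit always runs,
// even if the protected code raises an exception
let myLock (lockObj: obj) (f: unit -> 'a) =
    Monitor.Enter(lockObj)
    try
        f()
    finally
        Monitor.Exit(lockObj)

// a quick demonstration: several threads increment a shared counter
let counterLock = new obj()
let counter = ref 0

let threads =
    [ for _ in 1 .. 4 ->
        new Thread(fun () ->
            for _ in 1 .. 1000 do
                // the increment is read-modify-write, so it must be protected
                myLock counterLock (fun () -> counter := !counter + 1)) ]

threads |> List.iter (fun t -> t.Start())
threads |> List.iter (fun t -> t.Join())
printfn "Final count: %i" !counter  // always 4000
```

Because the increment is protected, the final count is deterministic; without the lock, lost updates would make it vary from run to run.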
The following example demonstrates the subtle issues involved in locking. The code to accomplish the lock needs to be quite long, and this example has been deliberately written to exaggerate the problem of context switching. The idea behind this code is this: two threads run at the same time, and both try to write to the console. The aim of the sample is to write the string "One ... Two ... Three ... " to the console atomically; that is, one thread should be able to finish writing its message before the next one starts. The example has a function, called makeUnsafeThread, that creates a thread that won't be able to write to the console atomically, and a second function, makeSafeThread, that creates a thread that writes to the console atomically by using a lock:
open System
open System.Threading

// function to print to the console character by character
// this increases the chance of there being a context switch
// between threads
let printSlowly (s: string) =
    s.ToCharArray() |> Array.iter (printf "%c")
    printfn ""

// create a thread that prints to the console in an unsafe way
let makeUnsafeThread() =
    new Thread(fun () ->
        for x in 1 .. 100 do
            printSlowly "One ... Two ... Three ... ")

// the object that will be used as a lock
let lockObj = new Object()

// create a thread that prints to the console in a safe way
let makeSafeThread() =
    new Thread(fun () ->
        for x in 1 .. 100 do
            // use lock to ensure the operation is atomic
            lock lockObj (fun () ->
                printSlowly "One ... Two ... Three ... "))

// helper function to run the test
let runTest (f: unit -> Thread) message =
    printfn "%s" message
    let t1 = f()
    let t2 = f()
    t1.Start()
    t2.Start()
    t1.Join()
    t2.Join()

// run the demonstrations
let main() =
    runTest makeUnsafeThread "Running test without locking ..."
    runTest makeSafeThread "Running test with locking ..."

do main()
The part of the example that uses the lock is repeated next to highlight the important points. You should note a couple of important factors. First, you declare lockObj, the object on which the critical section will lock. Second, you embed the use of the lock function in the makeSafeThread function. The most important thing to notice is how you place the printing operations you want to be atomic inside the function you pass to lock:
// the object that will be used as a lock
let lockObj = new Object()

// create a thread that prints to the console in a safe way
let makeSafeThread() =
    new Thread(fun () ->
        for x in 1 .. 100 do
            // use lock to ensure the operation is atomic
            lock lockObj (fun () ->
                printSlowly "One ... Two ... Three ... "))
The results of the first part of the test will vary each time it runs because they depend on when a thread context switches. They might also vary based on the number of processors: on a machine with two or more processors, multiple threads can run at the same time, so the garbled messages will be more tightly packed together. On a single-processor machine, the output will be less tightly packed, because printing a message will go wrong only when a context switch takes place. The results of the first part of the sample, run on a dual-processor machine, look like this:
Running test without locking ...
...
One ... Two ... Three ...
One
One ... Two ... Three ...
One ... Two ... Three ...
...
The lock means that the results of the second half of the example will not vary at all, so they will always look like this:
Running test with locking ...
One ... Two ... Three ...
One ... Two ... Three ...
One ... Two ... Three ...
...
Locking is an important aspect of concurrency. You should lock any resource that you write to and share between threads. A resource is often a variable, but it can also be a file or even the console, as shown in this example. Although locks can provide a solution to concurrency, they can also create problems of their own, because they can lead to deadlock. A deadlock occurs when two or more threads each hold a lock on a resource that another thread needs, so none of them can advance. The simplest solution to concurrency problems is often to avoid sharing a resource that different threads can write to. In the rest of this chapter, you'll look at solutions for creating parallel programs that do not rely on explicitly creating locks.
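The standard way to avoid deadlock when more than one lock is involved is to make every thread acquire the locks in the same order. The following sketch (the names lockA, lockB, and makeWorker are invented for this illustration) shows two threads that both take lockA before lockB; if one thread took them in the opposite order, each thread could end up holding one lock while waiting forever for the other:

```fsharp
open System.Threading

// two resources, each protected by its own lock object
let lockA = new obj()
let lockB = new obj()

let resourceA = ref 0
let resourceB = ref 0

// both threads acquire the locks in the SAME order: A, then B.
// if one thread took A-then-B and the other B-then-A, the two
// threads could deadlock, each waiting for the lock the other holds
let makeWorker() =
    new Thread(fun () ->
        for _ in 1 .. 1000 do
            lock lockA (fun () ->
                lock lockB (fun () ->
                    resourceA := !resourceA + 1
                    resourceB := !resourceB + 1)))

let t1 = makeWorker()
let t2 = makeWorker()
t1.Start()
t2.Start()
t1.Join()
t2.Join()
printfn "A = %i, B = %i" !resourceA !resourceB  // A = 2000, B = 2000
```

Consistent lock ordering is a convention rather than something the compiler can enforce, which is one more reason to prefer designs that avoid shared mutable state altogether.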
This book provides an extremely brief introduction to threading. You will need to learn much more about threading if you want to become good at parallel programming. A good place to start is the MSDN section on managed threads: http://msdn.microsoft.com/en-us/library/hyz69czz.aspx. You might also find this tutorial useful: http://www.albahari.com/threading/.
Reactive programming refers to the practice of ensuring that your programs react to events or input. In this section, you'll concentrate on reactive programming in terms of GUI programming; GUIs should always be reactive. However, other styles of programming also need to take reactive programming into account. For example, programs running on servers often need to stay reactive to input, even as they process other, longer-running tasks. You must also apply some of the techniques discussed here to server programming, as you'll see when you implement a chat server in Chapter 11.
Most GUI libraries use an event loop to handle drawing the GUI and the interactions with the user. This means that one thread takes care of drawing the GUI and raising all the events on it. This thread is referred to as the GUI thread. Another consideration: you should update GUI objects only from the GUI thread; this avoids situations where other threads can corrupt the state of GUI objects. It also means that computations or I/O operations that take a significant amount of time should not take place on the GUI thread. If the GUI thread is tied up with a long-running computation, it cannot process interactions from the user, nor can it draw the GUI. This is the number-one cause of unresponsive GUIs.
You can see this in action in the following example, which creates a GUI that could easily become unreactive because it tries to do too much computation on the GUI thread. This example also illustrates how to ensure your GUI remains reactive by making a few simple changes. You will look primarily at a useful abstraction called the BackgroundWorker class, which you'll find in the System.ComponentModel namespace. This class allows you to run some work, raising a notification event when the work is complete. It is especially useful for GUI programming because the completion notification is raised on the GUI thread, which helps you enforce the rule that GUI objects should only be altered from the thread that created them.
Specifically, the example creates a GUI for calculating the Fibonacci numbers using the simple Fibonacci calculation algorithm you saw in Chapter 7:
module Strangelights.Extensions

let fibs =
    (1I, 1I) |> Seq.unfold (fun (n0, n1) -> Some(n0, (n1, n0 + n1)))

let fib n = Seq.nth n fibs
Creating a simple GUI for this calculation is straightforward; you can do this using the WinForms GUI toolkit you saw in Chapter 8:
open Strangelights.Extensions
open System
open System.Windows.Forms

let form =
    let form = new Form()
    // input text box
    let input = new TextBox()
    // button to launch processing
    let button = new Button(Left = input.Right + 10, Text = "Go")
    // label to display the result
    let output =
        new Label(Top = input.Bottom + 10,
                  Width = form.Width,
                  Height = form.Height - input.Bottom + 10,
                  Anchor = (AnchorStyles.Top ||| AnchorStyles.Left |||
                            AnchorStyles.Right ||| AnchorStyles.Bottom))
    // do all the work when the button is clicked
    button.Click.Add(fun _ ->
        output.Text <- Printf.sprintf "%A" (fib (Int32.Parse(input.Text))))
    // add the controls
    let dc c = c :> Control
    form.Controls.AddRange([| dc input; dc button; dc output |])
    // return the form
    form

// show the form
do Application.Run(form)
Executing this example creates the GUI you see in Figure 10-1.
This GUI lets you display the results of your calculation in a reasonable way; unfortunately, it becomes unreactive as soon as the calculation starts to take a long time. The following code is responsible for the unresponsiveness:
// do all the work when the button is clicked
button.Click.Add(fun _ ->
    output.Text <- Printf.sprintf "%A" (fib (Int32.Parse(input.Text))))
This code means that you do all the calculation on the same thread that raised the click event: the GUI thread. The GUI thread is therefore responsible for making the calculation, and it cannot process other events while it performs it.
It's fairly easy to fix this using the background worker:
open Strangelights.Extensions
open System
open System.ComponentModel
open System.Windows.Forms

let form =
    let form = new Form()
    // input text box
    let input = new TextBox()
    // button to launch processing
    let button = new Button(Left = input.Right + 10, Text = "Go")
    // label to display the result
    let output =
        new Label(Top = input.Bottom + 10,
                  Width = form.Width,
                  Height = form.Height - input.Bottom + 10,
                  Anchor = (AnchorStyles.Top ||| AnchorStyles.Left |||
                            AnchorStyles.Right ||| AnchorStyles.Bottom))
    // create and run a new background worker
    let runWorker() =
        let background = new BackgroundWorker()
        // parse the input to an int
        let input = Int32.Parse(input.Text)
        // add the "work" event handler
        background.DoWork.Add(fun ea -> ea.Result <- fib input)
        // add the work completed event handler
        background.RunWorkerCompleted.Add(fun ea ->
            output.Text <- Printf.sprintf "%A" ea.Result)
        // start the worker off
        background.RunWorkerAsync()
    // hook up creating and running the worker to the button
    button.Click.Add(fun _ -> runWorker())
    // add the controls
    let dc c = c :> Control
    form.Controls.AddRange([| dc input; dc button; dc output |])
    // return the form
    form

// show the form
do Application.Run(form)
Using the background worker imposes few changes on the code. You do need to split the work between the DoWork and RunWorkerCompleted events, which means writing slightly more code, but this will never require more than a few extra lines. Let's step through the required code changes; begin by creating a new instance of the BackgroundWorker class:
let background = new BackgroundWorker()
You place the code that needs to happen in the background, on a different thread, in the DoWork event. You also need to be careful to extract any data you need from controls outside of the DoWork event. Because this code runs on a different thread, letting it interact with the GUI objects would break the rule that they should only be manipulated by the GUI thread. You can see the code you use to read the integer and wire up the DoWork event here:
// parse the input to an int
let input = Int32.Parse(input.Text)
// add the "work" event handler
background.DoWork.Add(fun ea -> ea.Result <- fib input)
In the preceding example, you extract the input integer from the text box and parse it just before adding the event handler to the DoWork event. The lambda function you add to the DoWork event then captures the resulting integer. You should place the result that interests you in the Result property of the DoWork event's argument. You can then recover the value of this property in the RunWorkerCompleted event, whose argument also has a Result property, as you can see in the following code:
// add the work completed event handler
background.RunWorkerCompleted.Add(fun ea ->
    output.Text <- Printf.sprintf "%A" ea.Result)
You can be certain that the RunWorkerCompleted event runs on the GUI thread, so it is fine to interact with GUI objects there. You've wired up the events, but you have a couple of tasks remaining. First, you need to start the background worker:
// start the worker off background.RunWorkerAsync()
Second, you need to add all this code to the button's Click event. You've wrapped the preceding code in a function called runWorker, so it's a simple matter of calling this function in the event handler:
// hook up creating and running the worker to the button
button.Click.Add(fun _ -> runWorker())
Notice how this means you create a new background worker each time the button is clicked. This is necessary because a background worker cannot be reused while it is still busy with a previous piece of work.
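If you did want to keep a single worker around, you could guard against reuse by checking its IsBusy property before starting it; calling RunWorkerAsync on a busy worker throws an InvalidOperationException. The following console sketch (the names result, finished, and worker are invented for this illustration, and the result value is an arbitrary stand-in for real work) shows the idea; note that without a GUI message loop, RunWorkerCompleted fires on a thread-pool thread rather than a GUI thread:

```fsharp
open System.ComponentModel
open System.Threading

let result = ref 0
let finished = new ManualResetEvent(false)

let worker = new BackgroundWorker()
// the "work": stands in for a long-running calculation
worker.DoWork.Add(fun ea -> ea.Result <- 21 * 2)
worker.RunWorkerCompleted.Add(fun ea ->
    // in a console program this runs on a thread-pool thread
    result := ea.Result :?> int
    finished.Set() |> ignore)

// RunWorkerAsync throws InvalidOperationException if IsBusy is true
if not worker.IsBusy then
    worker.RunWorkerAsync()

// wait for the completion handler to run
finished.WaitOne() |> ignore
printfn "Result: %i" !result  // Result: 42
```

In the GUI example, creating a fresh worker per click is the simpler design, because it sidesteps the IsBusy check entirely.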
Now the GUI remains reactive no matter how many times someone clicks the Go button. This does lead to some other problems; for example, it's fairly easy to set off two calculations that both take some time to complete. If this happens, the results of both are placed in the same result label, so the user might have no idea which calculation finished first and is being displayed. Your GUI remains reactive, but it's not well adapted to this multithreaded style of programming. One option might be to disable all the controls while the calculation takes place. This might be appropriate for a few cases, but it's not a great option overall because it means the user can take little advantage of your reactive GUI. A better option is to create a system capable of displaying multiple results, along with their initial parameters, ensuring that the user knows what a given result means. This example uses a data grid view to display the results:
open Strangelights.Extensions
open System
open System.ComponentModel
open System.Windows.Forms
open System.Numerics

// define a type to hold the results
type Result =
    { Input: int;
      Fibonacci: BigInteger; }

let form =
    let form = new Form()
    // input text box
    let input = new TextBox()
    // button to launch processing
    let button = new Button(Left = input.Right + 10, Text = "Go")
    // list to hold the results
    let results = new BindingList<Result>()
    // data grid view to display multiple results
    let output =
        new DataGridView(Top = input.Bottom + 10,
                         Width = form.Width,
                         Height = form.Height - input.Bottom + 10,
                         Anchor = (AnchorStyles.Top ||| AnchorStyles.Left |||
                                   AnchorStyles.Right ||| AnchorStyles.Bottom),
                         DataSource = results)
    // create and run a new background worker
    let runWorker() =
        let background = new BackgroundWorker()
        // parse the input to an int
        let input = Int32.Parse(input.Text)
        // add the "work" event handler
        background.DoWork.Add(fun ea -> ea.Result <- (input, fib input))
        // add the work completed event handler
        background.RunWorkerCompleted.Add(fun ea ->
            let input, result = ea.Result :?> (int * BigInteger)
            results.Add({ Input = input; Fibonacci = result; }))
        // start the worker off
        background.RunWorkerAsync()
    // hook up creating and running the worker to the button
    button.Click.Add(fun _ -> runWorker())
    // add the controls
    let dc c = c :> Control
    form.Controls.AddRange([| dc input; dc button; dc output |])
    // return the form
    form

// show the form
do Application.Run(form)
You can see this new GUI in Figure 10-2.
Data parallelism relies on executing a single function in parallel with varying data inputs. This breaks work into discrete units, so it can be processed in parallel, on separate threads, ensuring that work can be partitioned between the available processors.
Typically this means processing a collection of data in parallel. This method takes advantage of the fact that the items in the collection provide a natural way to partition the work. In the simplest case, a parallel map function, you apply a transformation to each item in the collection, and the results form a new collection. This simple case generally works because each item in the collection can typically be processed independently and in any order. It's also possible to use this technique to handle more complex scenarios, such as summing all the items in a list; however, it can also prove tricky for some complex cases, and the processing order can take on added significance.
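To make the idea of partitioning concrete before looking at library support, here is a hand-rolled sketch of data parallelism: summing an array by splitting it into one chunk per worker thread. The names data, workers, chunk, and partials are invented for this illustration; in practice you would let a library such as Parallel LINQ do the partitioning for you:

```fsharp
open System.Threading

let data = [| 1 .. 10000 |]
let workers = 2
let chunk = data.Length / workers
// one result slot per worker
let partials = Array.create workers 0

let threads =
    [ for i in 0 .. workers - 1 ->
        new Thread(fun () ->
            let start = i * chunk
            // the last worker takes any leftover items
            let finish =
                if i = workers - 1 then data.Length - 1
                else start + chunk - 1
            let mutable acc = 0
            for j in start .. finish do
                acc <- acc + data.[j]
            // each thread writes to its own slot, so no lock is needed
            partials.[i] <- acc) ]

threads |> List.iter (fun t -> t.Start())
threads |> List.iter (fun t -> t.Join())
printfn "Total: %i" (Array.sum partials)  // Total: 50005000
```

The key design point is that each worker owns a disjoint slice of the input and a private output slot, so the threads never contend for shared state until the cheap final combine step.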
Data parallelism typically relies on libraries and frameworks to provide parallel processing. Although they use multiple threads or processes to provide the parallelism, these libraries don't typically require the user to create or control those threads explicitly; instead, it's the job of the library or framework to do this. Work units can be distributed between different physical machines that form a computing grid; for the sake of simplicity, and because multicore systems are becoming more common and powerful, this chapter will concentrate on systems where work is distributed between the multiple processors of a single physical machine. Microsoft is working on providing new parallel programming facilities in the .NET Framework, which will be available in version 4.0 of the .NET Framework. A few other libraries exist that implement data and task parallelism on the .NET platform, but this chapter will concentrate on what's available in .NET 4.0.
You have two main ways to achieve data parallelism in .NET 4.0: you can use the System.Threading.Parallel class available in mscorlib.dll, or you can use the System.Linq.ParallelEnumerable class available in System.Core.dll. The System.Threading.Parallel class is perfectly usable from F#, but the System.Linq.ParallelEnumerable class is probably the preferred way for F# programmers to achieve data parallelism, because that library is written much more in a functional style.
Let's start with a quick example that illustrates how to use the System.Threading.Parallel class's parallel For method, and then discuss why you probably don't want to use it. Assume that you want to write out the integers from 0 to 99 in parallel, as in this program:
open System.Threading

Parallel.For(0, 100, (printfn "%i"))
When executed on my dual-core machine, the preceding code produces the following output:
0
13
8
9
6
7
14
10
...
The numbers from the loop appear in a nondeterministic order. This is because they were executed in parallel, and the threads assigned to execute each function call are scheduled unpredictably. The problem with using this kind of function in F# is that it will typically rely on a side effect at some point. It's easy to create functions with side effects in F#, but doing so can be undesirable when dealing with concurrency because it introduces problems. Even in this simple example, you face a risk that the numbers won't print atomically, but instead become mixed up as they print out. For example, you might see output like this:
...
6
174
10
...
Here, two numbers have combined to form a three-digit number that didn't exist in the collection.
Bearing that in mind, the System.Threading.Parallel class can still be useful on some occasions. Imagine you need to send out a large number of emails, and you want to send these emails concurrently. You might choose to parallelize this with a parallel ForEach from the Parallel class, because sending an email is I/O and therefore inherently a side effect:
open System.Threading

let emails =
    [ "[email protected]"; "[email protected]";
      "[email protected]"; "[email protected]" ]

Parallel.ForEach(emails, (fun addr ->
    // code to create and send email goes here
    ()))
Even in this simple example, you need to ensure that any function called inside Parallel.ForEach is safe to call from multiple threads.
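For instance, if the sending code updated a shared counter, that update would need a lock, exactly as in the threading section earlier. Here is a sketch of that pattern; the names sendLock and sent are invented for this illustration, and note that in the final .NET 4.0 release the Parallel class lives in the System.Threading.Tasks namespace rather than directly in System.Threading:

```fsharp
open System.Threading
open System.Threading.Tasks

let emails =
    [ "[email protected]"; "[email protected]";
      "[email protected]"; "[email protected]" ]

let sendLock = new obj()
let sent = ref 0

Parallel.ForEach(emails, (fun (addr: string) ->
    // imagine the email being created and sent here; the shared
    // counter must be updated under a lock because this lambda
    // runs concurrently on several threads
    lock sendLock (fun () -> sent := !sent + 1)))
|> ignore

printfn "Sent %i emails" !sent  // Sent 4 emails
```

The lock is only needed because the counter is shared; any state that is private to a single invocation of the lambda needs no protection.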
The System.Linq.ParallelEnumerable class is much more promising for easily parallelizing F# programs. Essentially, it is a parallel implementation of the functions available in F#'s Seq module. Because there are plenty of name changes between the Seq module and the ParallelEnumerable class, it's common to create a thin wrapper over ParallelEnumerable to make it feel more like the Seq module, as shown in this code:
namespace Strangelights.Extensions

open System
open System.Linq

// import a small number of functions from Parallel LINQ
module PSeq =
    // helper function to convert an ordinary seq (IEnumerable)
    // into an IParallelEnumerable
    let asParallel list : IParallelEnumerable<_> =
        ParallelQuery.AsParallel(list)

    // the parallel map function we're going to test
    let map f list =
        ParallelEnumerable.Select(asParallel list, new Func<_, _>(f))

    // other parallel functions you might consider using
    let reduce f list =
        ParallelEnumerable.Aggregate(asParallel list, new Func<_, _, _>(f))
    let fold f acc list =
        ParallelEnumerable.Aggregate(asParallel list, acc, new Func<_, _, _>(f))
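Here is a small usage sketch of the same idea written against the API names that shipped in the final .NET 4.0 release, where AsParallel is an extension method on any sequence (the wrapper above targets a pre-release build, so the exact names differ slightly). The function name pmap is invented for this example. Note that PLINQ may deliver results in any order, so the example sorts before comparing:

```fsharp
open System
open System.Linq

// a parallel map built directly on ParallelEnumerable.Select
let pmap (f: 'a -> 'b) (xs: seq<'a>) =
    ParallelEnumerable.Select(xs.AsParallel(), new Func<'a, 'b>(f))

let squares =
    [ 1 .. 10 ]
    |> pmap (fun x -> x * x)
    // results can arrive in any order, so sort before using them
    |> Seq.sort
    |> Seq.toList

printfn "%A" squares  // [1; 4; 9; 16; 25; 36; 49; 64; 81; 100]
```

The ordering point is worth remembering: a parallel map is only a drop-in replacement for Seq.map when your code doesn't depend on the order of the results (or when you restore an ordering afterward, as here).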
You can use the finished version of this code to replace calls to functions from the Seq module with their equivalent calls from the PSeq wrapper module and expect your program to go faster in most cases, though you will find some circumstances where it might be slower. For example, the code might execute more slowly for short lists where a relatively small amount of work is required for each item. You can see this at work in a micro benchmark for a parallel map function by comparing your parallel map function to the normal map function. To do this, you vary both the size of the input list and the amount of work performed for each item in the input list.
Micro benchmarking is a useful tool for helping you to understand the performance consequences of a small section of code. Vance Morrison has a nice MSDN article on how to run micro benchmarks on the .NET platform at http://msdn.microsoft.com/en-us/magazine/cc500596.aspx.
The following example shows how you might do this:
open System.Diagnostics
open Strangelights.Extensions

// the number of samples to collect
let samples = 5
// the number of times to repeat each test within a sample
let runs = 100

// this function provides the "work", by enumerating over a
// collection of a given size
let addSlowly x =
    Seq.fold (fun acc _ -> acc + 1) 0 (seq { 1 .. x })

// tests the sequential map function by performing a map on a
// list with the given number of items, performing the given
// number of operations for each item.
// the map is then iterated, to force it to perform the work.
let testMap items ops =
    Seq.map (fun _ -> addSlowly ops) (seq { 1 .. items })
    |> Seq.iter (fun _ -> ())

// tests the parallel map function; works as above
let testPMap items ops =
    PSeq.map (fun _ -> addSlowly ops) (seq { 1 .. items })
    |> Seq.iter (fun _ -> ())

// a test harness function; takes a test function and passes it
// the given number of items and operations
let harness f items ops =
    // run once to ensure everything is JITed
    f items ops
    // collect a list of results
    let res =
        [ for _ in 1 .. samples do
            let clock = new Stopwatch()
            clock.Start()
            for _ in 1 .. runs do
                f items ops
            clock.Stop()
            yield clock.ElapsedMilliseconds ]
    // calculate the average
    let avg = float (Seq.reduce (+) res) / (float samples)
    // output the results
    printf "Items %i, Ops %i," items ops
    Seq.iter (printf "%i,") res
    printfn "%f" avg

// the parameters to use
let itemsList = [ 10; 100; 200; 400; 800; 1000 ]
let opsList = [ 1; 10; 100; 200; 400; 800; 1000 ]

// test the sequential function
for items in itemsList do
    for ops in opsList do
        harness testMap items ops

// test the parallel function
for items in itemsList do
    for ops in opsList do
        harness testPMap items ops
Before you examine the results of the tests, it's probably worth looking at the micro benchmarking code in a bit more detail, because it can help you understand the results better. Perhaps the most important function is harness, which has the job of running the test code. You need to keep a couple of things in mind when setting up the test. First, you always run each test 100 times when measuring a test result. You do this because some of the tests on small lists can run exceptionally fast, so running them only once might make it hard to measure how long they take. Running each test repeatedly helps you avoid this problem. Second, you always collect a list of five results and then take the average time of this list. This is because other background processes on the computer can affect some tests; running each test several times and taking an average helps you avoid this. You could also take this technique further and compute the standard deviation, which would highlight any tests that took significantly longer or shorter than the others.
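The suggested standard-deviation extension is straightforward to add; here is one possible sketch (the function name stdDev is invented for this illustration), computing the population standard deviation of a list of timing samples like the ElapsedMilliseconds values the harness collects:

```fsharp
// computes the population standard deviation of a list of timings,
// which highlights samples that were unusually long or short
let stdDev (samples: int64 list) =
    let n = float samples.Length
    let mean = (samples |> List.sumBy float) / n
    let variance =
        (samples |> List.sumBy (fun x -> (float x - mean) ** 2.0)) / n
    sqrt variance

printfn "%f" (stdDev [ 2L; 4L; 4L; 4L; 5L; 5L; 7L; 9L ])  // 2.000000
```

A large standard deviation relative to the mean is a hint that background activity disturbed the run and the benchmark should be repeated.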
The other interesting function is testMap, which has the job of providing the work for the map function to do. You want to vary two things: the number of items in each input list and the amount of processing each item in the list will take. The testMap function achieves this through two parameters: items and ops. The items parameter is the number of items in the list that the map function must process, and the ops parameter is the number of operations that must be performed on each item. It's also worth noting that, because your Seq.map and PSeq.map functions are both lazy, you need to force them by iterating over the resulting sequence. Iterating over the sequence causes the lazy sequence to be created and evaluated. If you did not do this, you would see a small, constant time result: it would show only the time it takes to create an object capable of generating the sequence, not the generation of the sequence itself. You force the generation of the sequence by iterating over it using the Seq.iter function.
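The laziness point is easy to demonstrate directly. In this sketch (the names evaluated, mapped, and before are invented for the illustration), a counter records how many times the mapping function actually runs; nothing happens until the sequence is iterated:

```fsharp
// a counter recording how many items the mapping function has processed
let evaluated = ref 0

let mapped =
    seq { 1 .. 5 }
    |> Seq.map (fun x ->
        evaluated := !evaluated + 1
        x * 2)

// defining the sequence does no work at all
let before = !evaluated
printfn "Before iterating: %i items evaluated" before       // 0

// iterating forces the sequence to be generated and evaluated
mapped |> Seq.iter (fun _ -> ())
printfn "After iterating: %i items evaluated" !evaluated    // 5
```

This is exactly why the benchmark pipes each map through Seq.iter: without it, the timings would measure only the cost of constructing the sequence object, not the mapping work itself.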
Now you're ready to look at the results themselves (see Table 10-1).
Table 10.1. The results of the sequence micro benchmark
| Ops | Items=10, Serial | Items=10, Parallel | Items=100, Serial | Items=100, Parallel | Items=200, Serial | Items=200, Parallel | Items=400, Serial | Items=400, Parallel | Items=800, Serial | Items=800, Parallel |
|---|---|---|---|---|---|---|---|---|---|---|
| 1 | 1.6 | 30.6 | 12 | 36.2 | 23.2 | 42.6 | 45 | 88.8 | 93.2 | 100 |
| 10 | 2 | 30.8 | 33.2 | 50.8 | 61.8 | 74.4 | 125.4 | 122.6 | 251.6 | 201 |
| 100 | 23.8 | 39.2 | 213.4 | 198.8 | 421.6 | 307 | 822.8 | 589.6 | 1660 | 1024.6 |
| 200 | 40.4 | 57.8 | 407.8 | 299.8 | 798.8 | 577.8 | 1634 | 1071.2 | 3262.4 | 1954.6 |
| 400 | 78.8 | 94 | 841 | 601.8 | 1676.8 | 1135.4 | 3237.4 | 2228.4 | 6424.2 | 3669.2 |
| 800 | 157.2 | 147.2 | 1591.4 | 1095 | 3174.6 | 2136.4 | 6388.4 | 4238.8 | 12747.6 | 7159.8 |
| 1000 | 196.8 | 181.4 | 1971.2 | 1329.6 | 3966.6 | 2630.6 | 7964 | 5279.6 | 16026 | 9111.6 |
It's difficult to interpret the raw numbers, but a good place to start is with some graphs of the numbers. Figures 10-3, 10-4, and 10-5 show the time it takes in milliseconds for a sequence to be processed by the Seq.map
function versus the PSeq.map
function. This is for a sequence of a fixed length (10 items, 100 items, and 1000 items respectively), but with varying numbers of operations on the sequence.
Figure 10-3. Time in milliseconds to process a list of ten items with a varying number of operations
Figure 10-4. Time in milliseconds to process a list of 100 items with a varying number of operations
Figure 10-5. Time in milliseconds to process a list of 1000 items with a varying number of operations
These three diagrams illustrate nicely the results of the experiment. For a sequence with a small number of items (the ten items in Figure 10-3), you can see that the parallel function (PSeq.map
) is slower than the serial function (Seq.map
) when you perform a small amount of work for each item. As the amount of work you need to perform for each item increases, the parallel processing becomes slightly quicker than the serial version, but the difference is never that great. For a sequence of 100 items (see Figure 10-4), you see a similar curve, but the point where the parallel version of the function becomes quicker than the serial version occurs much earlier, and the gains the parallel function makes over the serial version are more pronounced. Finally, for a sequence of 1000 items (see Figure 10-5), you can see that the overhead incurred by the parallel function has been completely amortized. The parallel function is roughly two times faster than the sequential version, because the test was performed on a machine with two processors. You can conclude from this that it will often be worth using the parallel version of the map function, as long as you can ensure your input list is reasonably large.
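This rule of thumb translates directly into code. PSeq.map comes from the F# PowerPack; the sketch below uses Array.Parallel.map from FSharp.Core instead, so that it runs without extra references, and expensive is a made-up stand-in for real per-item work:

```fsharp
// an artificially expensive per-item function, standing in for
// real work such as parsing or numerical processing
let expensive x =
    let mutable acc = x
    for _ in 1 .. 10000 do
        acc <- (acc * 17 + 3) % 1000003
    acc

let input = [| 1 .. 10000 |]

// serial map: usually faster for small inputs or cheap operations
let serialResults = Array.map expensive input

// parallel map: worthwhile once the input is reasonably large
let parallelResults = Array.Parallel.map expensive input
```

Array.Parallel.map preserves the order of the input, so the two results are identical; only the elapsed time differs.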
Asynchronous programming is slightly different from the other forms of parallel programming you've seen so far. The other topics covered allow a number of threads to execute work in parallel, taking advantage of all available processors in a system. In asynchronous programming, you want to avoid blocking threads. You're already familiar with the concept of blocked threads from this chapter's first section, "Threads, Memory, Locking and Blocking." A blocked thread is one that can do no work because it is waiting for some task to finish; commonly, the task a thread is waiting for is the operating system performing IO, but sometimes it might also be waiting for a lock so it can enter a critical section. Threads are relatively expensive resources; each thread is allocated a 1 MB stack, and there are other costs in how the operating system kernel handles a large number of threads. In performance critical code, it's important to keep the number of blocked threads low. Ideally, you will have only as many threads as you have processors, and you will have no blocked threads.
For an overview of the kind of results that you can achieve using these techniques, see Amanda Laucher's 2009 Lang.NET talk in which she describes using F# asynchronous workflows to parallelize a C# program and achieves some impressive results: www.langnetsymposium.com/2009/speakers.aspx
.
In this section, you will look at how you can use the .NET Framework's asynchronous programming model to avoid blocking threads during IO. The asynchronous programming model means using pairs of Begin/End
, such as the BeginRead/EndRead
, on the Stream
class. Typically, you use these pairs of methods to perform some kind of IO task, such as reading from a file. This method of programming has acquired a reputation for being difficult, mainly because you need to find a good way to store state between the Begin/End
calls. This section will not cover the asynchronous programming model directly; instead, you'll look at how to use a feature of F# called asynchronous workflows to avoid some of the work associated with the asynchronous programming model in other .NET languages. For a more detailed explanation of the asynchronous programming model and some of the difficulties in using it, please refer to Jeffrey Richter's MSDN article, "Asynchronous Device Operations" (http://msdn.microsoft.com/en-us/magazine/cc163415.aspx
).
Asynchronous workflows are not exclusively for use with the .NET asynchronous programming model. In the next section, "Message Passing," you'll see how you can use these workflows with F#'s mailboxes to coordinate a number of different tasks. This will allow you to wait for tasks to complete without blocking threads.
The first step in understanding asynchronous workflows in F# is to understand the syntax itself. To create an asynchronous workflow, you use monadic syntax, similar to the sequence expressions you saw in Chapter 3. The basic syntax is the keyword async
with the workflow expression surrounded by curly brackets: async
{ ... }. A simple program that uses a workflow looks like this:
open System.IO

// a function to read a text file asynchronously
let readFile file =
    async {
        let! stream = File.AsyncOpenText(file)
        let! fileContents = stream.AsyncReadToEnd()
        return fileContents }

// create an instance of the workflow
let readFileWorkflow = readFile "mytextfile.txt"

// invoke the workflow and get the contents
let fileContents = Async.RunSynchronously readFileWorkflow
To compile this program, you need to add a reference to the FSharp.PowerPack.dll
. This program shows a function readFile
that creates a workflow that reads a file asynchronously, then returns its contents. Next, you create an instance of the workflow called readFileWorkflow
, and finally, you execute the workflow to get the file's contents. It's important to understand that simply calling the readFile
function won't actually read the file. Instead, it creates a new instance of the workflow, and you can then execute the workflow to perform the task of reading the file. The Async.RunSynchronously
function is actually responsible for executing the workflow. A workflow instance is a small data structure, rather like a small program, that can be interpreted to do some work.
The most important thing to notice about this example is the let
followed by an exclamation mark (let
!), often pronounced let bang. The workflows/monadic syntax allows library writers to give different meanings to let!
. In the case of asynchronous workflows, it means that an asynchronous operation will take place. The workflow will stop executing while the asynchronous operation takes place. A callback will be placed in the thread pool, and it will be invoked when the operation has completed, possibly on a different thread if the thread making the original call is not free. After the async call, the original thread is free to carry on doing other work.
You've probably also noticed that the let!
is used with some special methods prefixed with Async
. These functions are defined as type augmentations, which are F#'s equivalent of C#'s extension methods, in FSharp.PowerPack.dll
. These methods handle the calling of the Begin/End
method pairs. If no Async
method is available, it's fairly easy to create your own using the Async.Primitive
function and the Begin/End
method pairs.
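For example, if Stream had no asynchronous read helper, you could wrap its BeginRead/EndRead pair yourself. The sketch below uses Async.FromBeginEnd, the name this primitive carries in current FSharp.Core; the chapter's Async.Primitive is the PowerPack-era equivalent:

```fsharp
open System
open System.IO

// wrap the BeginRead/EndRead pair on Stream as an async value;
// the read itself happens when the workflow is run
let asyncRead (stream: Stream) (buffer: byte[]) =
    Async.FromBeginEnd(
        (fun (callback, state) ->
            stream.BeginRead(buffer, 0, buffer.Length, callback, state)),
        stream.EndRead)

// usage: read from an in-memory stream
let demo () =
    let stream = new MemoryStream([| 1uy; 2uy; 3uy |]) :> Stream
    let buffer : byte[] = Array.zeroCreate 3
    let bytesRead = Async.RunSynchronously (asyncRead stream buffer)
    bytesRead, buffer
```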
The flow of your simple example would look like this:
Step 1: The main program thread starts the process of opening the file stream, and a callback is placed in the thread pool to be invoked when this completes. This thread is now free to continue doing other work.
Step 2: A thread pool thread activates when the file stream has opened. It starts reading the contents of the file and places a callback in the thread pool to be invoked when this completes. Because it is a thread pool thread, it then returns to the thread pool.
Step 3: A thread pool thread activates when the file has been completely read. It returns the text data that has been read from the file and returns to the thread pool.
Step 4: Because you have used the Async.RunSynchronously
function, the main program thread is waiting for the results of the workflow. It will receive the file contents.
You have probably spotted a flaw in this simple example: you do not block the main program thread waiting for the IO itself, but because you wait synchronously for the workflow to complete, the main thread is still blocked until the IO has completed. To put this another way, there's little or no advantage to executing one asynchronous workflow on its own and waiting for the result. However, it's fairly simple to execute several workflows in parallel. Executing several workflows at once does have a distinct advantage, because the original thread is not blocked after it starts executing the first "Async
" task; this means it is free to go on and execute more asynchronous tasks.
It's fairly easy to illustrate this using a slightly modified version of the original example where, instead of reading one file, you read three of them. You will also compare this to a synchronous version of the program, which will help demonstrate the differences. First, take a look at the synchronous version:
open System
open System.IO
open System.Threading

let print s =
    let tid = Thread.CurrentThread.ManagedThreadId
    Console.WriteLine(sprintf "Thread %i: %s" tid s)

let readFileSync file =
    print (sprintf "Beginning file %s" file)
    let stream = File.OpenText(file)
    let fileContents = stream.ReadToEnd()
    print (sprintf "Ending file %s" file)
    fileContents

// read the three files synchronously and collect the contents
let filesContents =
    [| readFileSync "text1.txt";
       readFileSync "text2.txt";
       readFileSync "text3.txt"; |]
This program is fairly straightforward. Note that the preceding code includes some debugging output to show when you begin and end processing a file. Now look at the asynchronous version:
open System
open System.IO
open System.Threading

let print s =
    let tid = Thread.CurrentThread.ManagedThreadId
    Console.WriteLine(sprintf "Thread %i: %s" tid s)
// a function to read a text file asynchronously
let readFileAsync file =
    async {
        do print (sprintf "Beginning file %s" file)
        let! stream = File.AsyncOpenText(file)
        let! fileContents = stream.AsyncReadToEnd()
        do print (sprintf "Ending file %s" file)
        return fileContents }

// compose the three workflows into one and run them in parallel
let filesContents =
    Async.RunSynchronously
        (Async.Parallel
            [ readFileAsync "text1.txt";
              readFileAsync "text2.txt";
              readFileAsync "text3.txt"; ])
Again, this version incorporates some debugging code, so you can see how the program executes. The biggest change is that you now use the Async.Parallel
function to compose several workflows into a single workflow. This means that when the first thread finishes processing the first asynchronous call, it will be free to carry on processing the other workflows. This is probably easiest to see when you look at the results of the two programs:
Synchronous results
Thread 1: Beginning file text1.txt
Thread 1: Ending file text1.txt
Thread 1: Beginning file text2.txt
Thread 1: Ending file text2.txt
Thread 1: Beginning file text3.txt
Thread 1: Ending file text3.txt
Asynchronous results
Thread 3: Beginning file text1.txt
Thread 4: Beginning file text2.txt
Thread 3: Beginning file text3.txt
Thread 4: Ending file text2.txt
Thread 4: Ending file text1.txt
Thread 4: Ending file text3.txt
The two sets of results look quite different. In the synchronous results, you see that each "Beginning file" is followed by its "Ending file," and they all occur on the same thread. In the asynchronous case, you can see that all the "Beginning file" messages occur at once, on two different threads. This happens because once a thread reaches an asynchronous operation, it is free to carry on and start another operation. The "Ending file" messages occur later, once the IO has completed.
It's often useful to think of a parallel program as a series of independent components that send and receive messages. This is often referred to as the Actor Model—you can find a more formal description of the actor model on Wikipedia at http://en.wikipedia.org/wiki/Actor_model
. Although the scenarios in which you would use message passing tend to be quite complex, the ideas behind it are relatively simple, as you'll see in a handful of simple examples.
The basic idea behind message passing is that a system is composed of agents, or actors, which can both send and receive messages. When an agent receives a message, the message is placed in a queue until the agent is ready to process it. When an agent processes a message, it decides what to do with it based on its internal state and the contents of the message. The agent has a number of possibilities open to it in response to an incoming message: it might send a reply to the agent that initiated the exchange, create a new message for a different agent, create a new agent, or update some internal data structure.
F# provides the generic MailboxProcessor
class as its implementation of message passing and the actor model. When a MailboxProcessor
is created, it has (as the name suggests) a message queue that it can use to receive messages. MailboxProcessor
is responsible for deciding what it will do with the message once it receives it. The implementation of a MailboxProcessor
tends to follow a few simple patterns; the next example illustrates the simplest pattern for a MailboxProcessor
:
open System

let mailbox =
    MailboxProcessor.Start(fun mb ->
        let rec loop x =
            async {
                let! msg = mb.Receive()
                let x = x + msg
                printfn "Running total: %i - new value %i" x msg
                return! loop x }
        loop 0)

mailbox.Post(1)
mailbox.Post(2)
mailbox.Post(3)

Console.ReadLine() |> ignore
Executing the preceding code produces the following results:
Running total: 1 - new value 1
Running total: 3 - new value 2
Running total: 6 - new value 3
In the first part of the example, you create a mailbox that receives messages of type int
. When the mailbox receives a message, it adds it to a running total and then displays the running total, along with the value received. Let's take a closer look at how you achieve this. The MailboxProcessor
has a static Start method that takes a function as a parameter. This function is passed the newly created instance of the MailboxProcessor
, and it must return an asynchronous workflow. You use this workflow to read messages from the queue. You make it an asynchronous workflow because messages need to be read asynchronously; this ensures that a mailbox is not tied to a single thread, which would cause scalability issues if you were using lots of mailboxes. You need to keep checking the queue for new messages as they arrive, typically by using an infinite loop. In this case, you define a recursive function called loop
, which reads from the queue by calling the Receive
function, processes the message, and then calls itself to start the process again. This is an infinite recursion, but there's no danger of the stack overflowing because the function is tail recursive. The loop
function takes a single parameter; you use this to store the mailbox's state—an integer that represents the running total in this case.
It's also worth noting that Console.ReadLine()
at the end of this example is important. This is because the message queue is processed on a separate thread. Once you finish posting messages to the mailbox using the Post
method, the main thread has no more work to do, so it exits, causing the process to exit. In this case, the process would probably exit before the mailbox has had a chance to process the messages in its queue. Calling Console.ReadLine()
provides a simple way to block the main thread until the user has had a chance to see the results of the mailbox processing the messages.
One final detail about this example: The mailbox's Post
member function is safe to call from any thread, because the mailbox's message queue ensures each message is processed in turn, atomically. The current example does not take advantage of this, but you will see it used in the next two examples.
This particular asynchronous workflow isn't that useful; however, it does represent the simplest usage pattern for a mailbox: receive a message, update some internal state, and then react to the message. In this case, reacting to the message means writing to the console, which is probably too simplistic to be of much use. However, there are more realistic scenarios for this usage pattern. A good example would be using a mailbox to gather up a number of values, then marshaling to the GUI thread so the values can be viewed. You'll learn more about this technique in the next pair of examples.
Begin by looking at the problem you're trying to solve in a bit more detail. If you have a simulation that generates data, you might want to see this data in real time, as it is generated. When working with GUIs, you face two related constraints that make this quite challenging. First, the GUI must run on its own thread, and this thread must not be occupied for a long time, or the GUI will become unresponsive. This makes it impossible to execute a long-running simulation on the GUI thread. Second, you can only access GUI objects from the thread that created them: the GUI thread. If your simulation is running on another thread, it cannot write directly to the GUI. Fortunately, GUI objects provide an Invoke
method that allows you to invoke a function on the GUI thread and safely update the GUI with the generated data. Calling the Invoke method too often can have a negative impact on performance, because marshaling data to the GUI thread is fairly expensive. If your simulation outputs a small amount of data frequently, it's often a good idea to batch up the results so you can print them to the screen 12 to 20 times a second, which is enough to produce a smooth animation effect. You'll begin by learning how to use mailboxes to solve a specific instance of this problem; then you'll see a second example that tidies this up into a more generic component.
F#'s mailboxes can help here by providing an elegant way to buffer the data before you print it to the screen. The basics of the algorithm are fairly simple: the thread running the simulation posts messages to the mailbox, and when the mailbox has received enough messages, it notifies the GUI of the new updates to be drawn. This programming style also provides a neat way of separating the logic that generates the data from the logic that presents it in the UI. Let's look at the whole code example, then step through and examine how it all works. You'll need to add references to System.Drawing.dll
and System.Windows.Forms.dll
:
open System
open System.Threading
open System.Windows.Forms
open System.Drawing.Imaging
open System.Drawing
// the width & height for the simulation
let width, height = 500, 600

// the bitmap that will hold the output data
let bitmap = new Bitmap(width, height, PixelFormat.Format24bppRgb)

// a form to display the bitmap
let form = new Form(Width = width, Height = height, BackgroundImage = bitmap)

// the function which receives the points to be plotted
// and marshals to the GUI thread to plot them
let printPoints points =
    form.Invoke(new Action(fun () ->
        List.iter bitmap.SetPixel points
        form.Invalidate())) |> ignore

// the mailbox that will be used to collect the data
let mailbox =
    MailboxProcessor.Start(fun mb ->
        // main loop to read from the message queue
        // the parameter "points" holds the working data
        let rec loop points =
            async {
                // read a message
                let! msg = mb.Receive()
                // if we have over 100 messages, write
                // them to the GUI
                if List.length points > 100 then
                    printPoints points
                    return! loop []
                // otherwise append the message and loop
                return! loop (msg :: points) }
        loop [])

// start a worker thread running our fake simulation
let startWorkerThread() =
    // function that loops infinitely generating random
    // "simulation" data
    let fakeSimulation() =
        let rand = new Random()
        let colors = [| Color.Red; Color.Green; Color.Blue |]
        while true do
            // post the random data to the mailbox
            // then sleep to simulate work being done
            mailbox.Post(rand.Next(width), rand.Next(height),
                         colors.[rand.Next(colors.Length)])
            Thread.Sleep(rand.Next(100))
    // start the thread as a background thread, so it won't stop
    // the program exiting
    let thread = new Thread(fakeSimulation, IsBackground = true)
    thread.Start()

// start 6 instances of our simulation
for _ in 0 .. 5 do startWorkerThread()

// run the form
Application.Run form
This example has three key parts: how the simulation posts data to the mailbox, how the mailbox buffers points to be sent to the GUI, and how the GUI receives the points. Let's examine each of these in turn. Posting data to the mailbox remains simple; you continue to call the Post
method on the mailbox. Two important differences exist between this example and the previous one. First, you pass a different data structure; however, the Post method is generic, so you remain strongly typed. Second, you call the Post method from six different threads; the message queue enables this to work just fine. You use a simple technique to buffer the data: you count the number of messages you receive, and when you have received 100 of them, you send them to the GUI:
async {
    // read a message
    let! msg = mb.Receive()
    // if we have over 100 messages, write
    // them to the GUI
    if List.length points > 100 then
        printPoints points
        return! loop []
    // otherwise append the message and loop
    return! loop (msg :: points) }
The number 100 is fairly arbitrary; it was chosen because it seemed to work well for this particular simulation. It's also worth noting that you count the number of messages you receive at each iteration by calling the List.length
function. This is suboptimal from a performance point of view because the List.length
function will traverse the list each time you call it. This won't matter much in the current example because the list is fairly small; however, if you increase the buffer size, this approach could become a bottleneck. A better approach might be to store a separate count parameter that you increment on each iteration of the function; this example avoids doing that for the sake of simplicity. Another alternative would be to store the time of the previous update, updating again only if the previous update was more than a twentieth of a second ago. This approach works well because it lets you aim for the number of frames per second required to achieve a smooth animation effect. Again, this book's examples don't use it because it would add unnecessary complexity. The example includes one more technique worth mentioning, which is how you write the data to the screen:
let printPoints points =
    form.Invoke(new Action(fun () ->
        List.iter bitmap.SetPixel points
        form.Invalidate())) |> ignore
This is fairly straightforward. The printPoints
function takes a points
parameter, then invokes a delegate in the context of the form, which allows you to write the points to the bitmap. Finally, you need to call the form's Invalidate
function to ensure the points are displayed correctly.
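The counter-based alternative mentioned above might look roughly like this. It is a sketch rather than the chapter's listing, and printPoints is replaced by a console stand-in so the code is self-contained:

```fsharp
// stand-in for the GUI call in the real example
let printPoints (points: (int * int) list) =
    printfn "flushing %d points" (List.length points)

// a variant of the mailbox loop that threads the buffer size
// through as a count parameter, avoiding a full List.length
// traversal on every message
let mailbox =
    MailboxProcessor.Start(fun mb ->
        let rec loop count points =
            async {
                let! msg = mb.Receive()
                if count >= 100 then
                    // flush the buffered points and reset
                    printPoints (msg :: points)
                    return! loop 0 []
                // otherwise count the message and keep buffering
                return! loop (count + 1) (msg :: points) }
        loop 0 [])
```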
The previous example provides a nice demonstration of how to use mailboxes, but its main problem is that the code is not reusable. It would be better if you could wrap your mailbox in a reusable component, and F#'s object-oriented features provide a great way of doing this. The following example also demonstrates a couple of other important concepts, such as how you can support messages of different types within the same mailbox and how you can return messages to a client of the mailbox. Again, you'll need to add references to System.Drawing.dll
and System.Windows.Forms.dll
:
open System
open System.Threading
open System.ComponentModel
open System.Windows.Forms
open System.Drawing.Imaging
open System.Drawing

// type that defines the message types our updater can handle
type Updates<'a> =
    | AddValue of 'a
    | GetValues of AsyncReplyChannel<list<'a>>
    | Stop

// a generic collector that receives a number of posted items and,
// once a configurable limit is reached, fires the update event
type Collector<'a>(?updatesCount) =
    // the number of updates to count to before firing the update event
    let updatesCount = match updatesCount with Some x -> x | None -> 100

    // Capture the synchronization context of the thread that creates this
    // object. This allows us to send messages back to the GUI thread painlessly.
    let context = AsyncOperationManager.SynchronizationContext
    let runInGuiContext f =
        context.Post(new SendOrPostCallback(fun _ -> f()), null)

    // This event is fired in the synchronization context of the GUI
    // (i.e., the thread that created this object)
    let event = new Event<list<'a>>()
    let mailboxWorkflow (inbox: MailboxProcessor<_>) =
        // main loop to read from the message queue
        // the parameter "curr" holds the working data
        // the parameter "master" holds all values received
        let rec loop curr master =
            async {
                // read a message
                let! msg = inbox.Receive()
                match msg with
                | AddValue x ->
                    let curr, master = x :: curr, x :: master
                    // if we have more than updatesCount messages,
                    // raise the event on the GUI thread
                    if List.length curr > updatesCount then
                        do runInGuiContext(fun () -> event.Trigger(curr))
                        return! loop [] master
                    return! loop curr master
                | GetValues channel ->
                    // send all data received back
                    channel.Reply master
                    return! loop curr master
                | Stop -> () } // stop by not calling "loop"
        loop [] []

    // the mailbox that will be used to collect the data
    let mailbox = new MailboxProcessor<Updates<'a>>(mailboxWorkflow)

    // the API of the collector
    // add a value to the queue
    member w.AddValue (x) = mailbox.Post(AddValue(x))
    // get all the values the mailbox stores
    member w.GetValues() = mailbox.PostAndReply(fun x -> GetValues x)
    // publish the updates event
    [<CLIEvent>]
    member w.Updates = event.Publish
    // start the collector
    member w.Start() = mailbox.Start()
    // stop the collector
    member w.Stop() = mailbox.Post(Stop)

// create a new instance of the collector
let collector = new Collector<int*int*Color>()

// the width & height for the simulation
let width, height = 500, 600
// a form to display the updates
let form =
    // the bitmap that will hold the output data
    let bitmap = new Bitmap(width, height, PixelFormat.Format24bppRgb)
    let form = new Form(Width = width, Height = height, BackgroundImage = bitmap)
    // handle the collector's Updates event and use it to plot the points
    collector.Updates.Add(fun points ->
        List.iter bitmap.SetPixel points
        form.Invalidate())
    // start the collector when the form loads
    form.Load.Add(fun _ -> collector.Start())
    // when the form closes, get all the values that were processed
    form.Closed.Add(fun _ ->
        let vals = collector.GetValues()
        MessageBox.Show(sprintf "Values processed: %i" (List.length vals))
        |> ignore
        collector.Stop())
    form

// start a worker thread running our fake simulation
let startWorkerThread() =
    // function that loops infinitely generating random
    // "simulation" data
    let fakeSimulation() =
        let rand = new Random()
        let colors = [| Color.Red; Color.Green; Color.Blue |]
        while true do
            // post the random data to the collector
            // then sleep to simulate work being done
            collector.AddValue(rand.Next(width), rand.Next(height),
                               colors.[rand.Next(colors.Length)])
            Thread.Sleep(rand.Next(100))
    // start the thread as a background thread, so it won't stop
    // the program exiting
    let thread = new Thread(fakeSimulation, IsBackground = true)
    thread.Start()

// start 6 instances of our simulation
for _ in 0 .. 5 do startWorkerThread()

// run the form
Application.Run form
The output of this example is exactly the same as that of the previous example, and the code follows largely the same pattern; however, you can see several important differences between the two examples. Perhaps the most noticeable is that the mailbox is now wrapped in an object that provides a strongly typed interface. The class you have created is called Collector<'a>
; its interface looks like this:
type Collector<'a> =
  class
    new : ?updatesCount:int -> Collector<'a>
    member AddValue : x:'a -> unit
    member GetValues : unit -> 'a list
    member Start : unit -> unit
    member Stop : unit -> unit
    member Updates : IEvent<'a list>
  end
The class is generic in terms of the type of values that it collects. It has an AddValue
method to post a value to the internal mailbox and a GetValues
method to get all the messages that have been passed to the mailbox so far. The collector must now be explicitly started and stopped by its Start
and Stop
methods. Finally, the collector has an Update
event that is raised when enough messages have been collected. The number of messages collected is configurable by an optional integer that you can pass to the class constructor. The fact you use an event is an important design detail. Using an event to notify clients that updates exist means that your Collector<'a>
needs no knowledge of the clients it uses, which greatly improves its reusability.
You now use a union type to represent your messages; this gives you the flexibility to have different types of messages. Clients of the Collector<'a>
don't deal with it directly, but instead use the member methods it provides. The member methods have the job of creating the different types of messages. In addition to providing a value to the message queue, you can also send a message to retrieve all the current messages, as well as a message to stop the mailbox from reading new messages:
type Updates<'a> =
    | AddValue of 'a
    | GetValues of AsyncReplyChannel<list<'a>>
    | Stop
Next, you implement these different types of messages by pattern matching over the received messages:
let! msg = inbox.Receive()
match msg with
| AddValue x ->
    let curr, master = x :: curr, x :: master
    // if we have more than updatesCount messages,
    // raise the event on the GUI thread
    if List.length curr > updatesCount then
        do runInGuiContext(fun () -> event.Trigger(curr))
        return! loop [] master
    return! loop curr master
| GetValues channel ->
    // send all data received back
    channel.Reply master
    return! loop curr master
| Stop -> ()
The AddValue
union case is basically what you did in the previous example, except that this time you add the values to both the curr
and master
lists. The curr list stores the current values you will pass to the GUI on the next update, while the master list holds all the values you've received. The master list enables you to accommodate any client that requests all the values.
For the union case GetValues
, it's worth spending some time looking at how values are returned to the client. You start this process by calling the mailbox's PostAndReply
method rather than its Post
method; you can see this at work in the GetValues
member method implementation:
// get all the values the mailbox stores
member w.GetValues() = mailbox.PostAndReply(fun x -> GetValues x)
The PostAndReply
method accepts a function that is passed a AsyncReplyChannel< 'a>
type. You can use this AsyncReplyChannel< 'a>
type to send a message back to the call via its Reply
member. This is what you see in the GetValues
case of your union. Users of this method should be careful, because it blocks until the reply is received, and the message won't be processed until it reaches the front of the queue; this can take a long time if the queue is long. Users tend to prefer the AsyncPostAndReply
approach because it enables you to avoid blocking a thread while waiting for the reply; however, this example doesn't do this for the sake of keeping the example simple.
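As a sketch of that non-blocking alternative, the request could be issued with PostAndAsyncReply instead. The type and mailbox below are simplified stand-ins for the chapter's Collector<'a>, repeated here so the fragment is self-contained:

```fsharp
// A simplified stand-in for the chapter's mailbox; only the reply
// machinery matters here.
type Updates<'a> =
    | AddValue of 'a
    | GetValues of AsyncReplyChannel<list<'a>>
    | Stop

let mailbox =
    MailboxProcessor.Start(fun inbox ->
        let rec loop acc =
            async {
                let! msg = inbox.Receive()
                match msg with
                | AddValue x -> return! loop (x :: acc)
                | GetValues channel ->
                    channel.Reply acc
                    return! loop acc
                | Stop -> () }
        loop [])

// PostAndAsyncReply returns an Async<_> that completes when the reply
// arrives, so no thread sits blocked while the request waits its turn
// in the queue.
let getValuesAsync () =
    mailbox.PostAndAsyncReply(fun channel -> GetValues channel)

mailbox.Post(AddValue 1)
mailbox.Post(AddValue 2)
let values = Async.RunSynchronously (getValuesAsync ())
printfn "%A" values // prints [2; 1]
```

Inside another asynchronous workflow you would write `let! values = getValuesAsync ()` rather than running the request synchronously.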
The Stop union case is the simplest way to stop reading messages from the queue; all you need to do is avoid calling the loop function recursively. You still need to return a value, which you do by returning the unit type, represented by empty parentheses (). The only subtlety you need to be careful of here is that calling the Stop method will not stop the mailbox immediately; the mailbox stops only when the stop message reaches the front of the queue.
You've seen how the Collector<'a> type handles messages; now let's look at how the Collector<'a> raises the Updates event so that it runs on the GUI thread. You create the Updates event using new Event, just as you create any other event in F#. You use the function runInGuiContext to make this event run in the context of the GUI:
let context = AsyncOperationManager.SynchronizationContext
let runInGuiContext f =
    context.Post(new SendOrPostCallback(fun _ -> f()), null)
First, you store the SynchronizationContext of the thread that created the object. You do this by using a static property on the AsyncOperationManager class, available in the System.ComponentModel namespace. The SynchronizationContext enables you to marshal calls onto the thread that created it using its Post member method. The only thing you need to be careful about is that the thread that creates the collector object must be the GUI thread; however, typically you'll use the main program thread for both, so this won't be a problem. This technique of capturing the synchronization context is also used by the BackgroundWorker class from the "Reactive Programming" section of this chapter.
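The capture-and-post idea can be sketched outside a GUI as well. The fragment below is an illustrative, self-contained sketch rather than the chapter's code; note that in a console program the default context simply posts work to the thread pool, whereas in a Windows Forms program the same call marshals the function onto the GUI thread:

```fsharp
open System.ComponentModel
open System.Threading

// capture the synchronization context of the creating thread...
let context = AsyncOperationManager.SynchronizationContext

// ...and provide a helper that marshals a function back onto it
let runInGuiContext (f: unit -> unit) =
    context.Post(new SendOrPostCallback(fun _ -> f ()), null)

let finished = new ManualResetEvent(false)

Async.Start(async {
    // simulate background work, then post an "update" back through
    // the captured context
    do! Async.Sleep 100
    runInGuiContext (fun () ->
        printfn "posted from thread %i" Thread.CurrentThread.ManagedThreadId
        finished.Set() |> ignore) })

// wait for the posted callback to run before the program exits
finished.WaitOne() |> ignore
```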
The definition of the form is now somewhat simpler because you no longer need to provide a function for the mailbox to call. You simply handle the Updates event instead:
// handle the collector's Updates event and use it to post the
// collected points to the bitmap, then redraw the form
collector.Updates.Add(fun points ->
    points |> List.iter (fun (x, y, color) -> bitmap.SetPixel(x, y, color))
    form.Invalidate())
You can also now take advantage of the form's Closed event to stop the mailbox processor and obtain a list of all the messages processed when a user closes the form:
// when the form closes, get all the values that were processed
form.Closed.Add(fun _ ->
    let vals = collector.GetValues()
    MessageBox.Show(sprintf "Values processed: %i" (List.length vals))
    |> ignore
    collector.Stop())
You haven't changed the behavior of your example, but these additions greatly improve the design of the code by decoupling the mailbox code from the GUI code, which improves the reusability of the Collector<'a> class tremendously.
In this chapter, you've covered quite a lot of ground, looking at five different concurrency techniques, all of which have their place in certain kinds of applications.
In the next chapter, you'll see how some of these techniques, especially asynchronous workflows, can be used to make programming distributed applications easier.