For most of computing’s history, software developers have been spoiled by processor manufacturers that were constantly pushing the limits of their chips’ clock speeds. If you needed your software to run faster (to process larger data sets, or because users were complaining about the system freezing when it was really just busy), often all you had to do was upgrade to the latest chip. Over the past decade or so something changed: Processor manufacturers began improving processor performance not by increasing clock speeds but by adding processing cores.
Although processor architecture has changed, software architecture has largely remained static. Multicore processors have become the norm, yet many applications are still written as though only one core is available to them and thus are not taking full advantage of the underlying hardware. Long-running tasks are still being executed on the UI thread, and large data sets are often processed synchronously. A big reason for this is that, traditionally, asynchronous and parallel programming have been sufficiently complex and error prone that they were typically the domain of expert developers working on highly specialized software.
Fortunately, software is starting to catch up. Programmers are learning that the days of solving performance issues by throwing faster hardware at the problem have passed and that it’s increasingly important to consider concurrent processing needs at an architectural level.
Although they’re closely related, asynchronous and parallel programming have different goals. Asynchronous programming aims to separate processing and reduce blocking so that longer-running tasks don’t prevent the system from completing other tasks within the same process. By contrast, parallel processing aims to improve performance by partitioning work into chunks that can be distributed across processors and operated against independently.
Since its inception, the .NET Framework has supported both asynchronous and parallel programming through threads and a multitude of synchronization mechanisms such as monitors, mutexes, semaphores, and so on. The Asynchronous Programming Model (APM), where classes define BeginX and EndX methods for operations that should be run asynchronously (such as the BeginRead and EndRead methods on the System.IO.FileStream class), has long been the preferred approach to asynchronous programming in .NET.
In this chapter, we'll explore several ways that F# makes asynchronous and parallel programming more accessible, thereby freeing you to focus on creating correct solutions. We'll begin with a brief introduction to the Task Parallel Library. Next, we'll discuss an F#-specific construct: asynchronous workflows. Finally, we'll conclude with an introduction to the MailboxProcessor, F#'s agent-based model for asynchronous programming.
As its name implies, the Task Parallel Library (TPL) excels at handling parallel programming scenarios and is the preferred mechanism for CPU-bound operations. It abstracts much of the complexity of managing threads, locks, callbacks, cancellations, and exception handling behind a uniform interface. Although the TPL is not specific to F#, a basic understanding of it is helpful, especially if you need to interact with code from libraries that use it.
The TPL enables two types of parallelism: data parallelism and task parallelism.
Data parallelism. Involves performing a specific action against each value in a sequence by distributing the work effectively across available processing resources. Under the data parallelism model, you specify a sequence along with an action and the TPL determines how to partition the data and distributes the work accordingly.
Task parallelism. Focuses on executing independent tasks concurrently. With task parallelism, you are responsible for manually creating and managing tasks, but this model offers you more control. Through the various Task classes, you can easily initiate asynchronous processing, wait for tasks to complete, return values, set up continuations, or spawn additional tasks.
This section is not intended to be a comprehensive guide to the TPL. Thus, it won’t get into many of the intricacies of task creation, scheduling, management, or other associated topics. The intention here is to establish a baseline, providing you with enough information to make you immediately productive when writing code using the TPL.
One of the key differences between working directly with threads and using the TPL is that the TPL is task based rather than thread based. This difference is quite important in that the TPL tries to run tasks concurrently by pulling threads from the thread pool, but it does not guarantee parallelism. This is known as potential parallelism.
Whenever you create a thread directly, you incur the overhead of allocating and scheduling it. This overhead can be detrimental to overall system performance if there aren’t enough system resources available to support it. The basic concurrency mechanisms, like thread pooling, help reduce the impact by reusing existing threads, but the TPL goes a step further by taking available system resources into account. If there aren’t sufficient resources available or the TPL otherwise determines that running a task in parallel will be detrimental to performance, it will run the task synchronously. As resources fluctuate over time, the TPL’s task scheduling and work partitioning algorithms help rebalance work to use the available resources effectively.
Data parallelism is achieved primarily through the use of the static For and ForEach methods of the Parallel class located in the System.Threading.Tasks namespace. As their names imply, these methods are essentially parallel versions of the simple and enumerable for loops, respectively.
Data parallelism can also be achieved through PLINQ (Parallel LINQ) and its AsParallel extension method. To simplify working with parallel sequences in F#, the PSeq module in the F# PowerPack exposes many of the ParallelEnumerable methods using the same nomenclature as the Seq module.
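Here is a hedged sketch of what that looks like, assuming the F# PowerPack is referenced (which places the PSeq module in the Microsoft.FSharp.Collections namespace); the functions mirror their Seq counterparts:

open Microsoft.FSharp.Collections

// square and sum the numbers in parallel; PSeq mirrors the Seq functions
[| 1 .. 100 |]
|> PSeq.map (fun n -> n * n)
|> PSeq.sum
|> printfn "Sum of squares: %i"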
For normal usage, Parallel.For and Parallel.ForEach differ only by their input; Parallel.For accepts range boundaries, whereas Parallel.ForEach accepts a sequence. Both methods also accept a function that serves as the loop body, and they implicitly wait for all iterations to complete before returning control to the caller. Since the methods are so similar, the examples in this section will use Parallel.For for consistency.
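For comparison, here is a minimal Parallel.ForEach sketch; the only real difference from Parallel.For is that a sequence replaces the range boundaries. (Console.WriteLine is used rather than printfn to sidestep the interlacing issue discussed shortly.)

open System
open System.Threading.Tasks

// iterate a sequence in parallel rather than a numeric range
Parallel.ForEach(
    [| "apple"; "banana"; "cherry" |],
    fun (fruit : string) -> Console.WriteLine fruit) |> ignore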
The simplest form, the parallel for loop, simply invokes an action for each value in the range. Here, we use a parallel for loop to write the numbers 0 through 99.
open System
open System.Threading.Tasks

Parallel.For(0, 100, printfn "%i")
This snippet is pretty self-explanatory. The first argument passed to Parallel.For identifies the inclusive beginning of the range, while the second identifies the exclusive end of the range. The third argument is a function that writes a number to the console.
Now that we're dealing with concurrency, there's a subtle bug in the previous example. Internally, printfn incrementally sends its text to System.Console.Out as it parses the pattern. Hence, it's possible that as the parallel iterations execute, multiple calls to printfn will be invoked simultaneously, resulting in some items being interlaced.
The issue demonstrated here is less pronounced in F# 3.1, where printf and its related functions have been improved such that they run up to 40 times faster than in previous releases.
We can address this problem in a few ways. One approach is to control access to System.Console.Out with the lock function. The lock function serves the same purpose as the lock statement in C# (SyncLock in Visual Basic) in that it prevents additional threads from executing a block of code until the locked resource is freed. Here is the previous example reworked to use locking:
Parallel.For(0, 100, fun n -> lock Console.Out (fun () -> printfn "%i" n))
There are times when locking is appropriate, but using it like this is a horrible idea. By locking, we negate most of the benefits of parallelizing the loop because only one item can be written at a time! Instead, we want to try another approach that avoids locking and doesn’t interlace the results.
One of the easiest ways to achieve a satisfactory result is with function composition. Here, we use the sprintf function to format the number and send that result to Console.WriteLine:
Parallel.For(0, 100, (sprintf "%i") >> Console.WriteLine)
This approach works because each call to sprintf writes to an isolated StringBuilder rather than a shared TextWriter. This eliminates the need to lock, thereby eliminating a potential bottleneck in your application.
Unlike F#'s built-in for loops, parallel loops provide some short-circuiting mechanisms by means of the ParallelLoopState class's Break and Stop methods. The TPL handles creating and managing the loop state, so all you need to do to access either of these methods is use one of the overloads that exposes it. Consider the following shortCircuitExample function:
open System.Collections.Concurrent
open System.Threading.Tasks

let shortCircuitExample shortCircuit =
    let bag = ConcurrentBag<_>()
    Parallel.For(
        0,
        999999,
        ① fun i s ->
            if i < 10000 then bag.Add i
            else shortCircuit s) |> ignore
    (bag, bag.Count)
Like the previous examples, the shortCircuitExample function uses Parallel.For, but notice at ① that the supplied function accepts two parameters instead of one. The second parameter, s, is the loop state.
With shortCircuitExample in place, we can now invoke it, passing a function that accepts a ParallelLoopState instance and calls either Stop or Break, like this:
shortCircuitExample (fun s -> s.Stop()) |> printfn "%A"
shortCircuitExample (fun s -> s.Break()) |> printfn "%A"
Both of the preceding lines will force the parallel loop to terminate before all iterations complete, but they have very different effects. Stop causes the loop to terminate at its earliest convenience but allows any iterations already executing to continue. Break, on the other hand, causes the loop to terminate at its earliest convenience after the current iteration. You also need to take care not to call Stop and Break in succession, because doing so raises an InvalidOperationException.
The difference between these two methods can be drastic. For example, in one run on my desktop, the Break version processed 10,000 items, whereas the Stop version processed only 975.
Cancelling a parallel for loop is similar to short-circuiting, except that instead of using the Stop or Break methods to terminate the loop from within, you identify an external cancellation token that the loop monitors and responds to. Unlike the short-circuiting mechanism, cancellation forces all tasks configured with the same token to stop. Cancelling does raise an OperationCanceledException, so you'll want to handle that accordingly.
The following function demonstrates cancelling a parallel for loop:
open System
open System.Threading.Tasks

let parallelForWithCancellation (wait : int) =
    use tokenSource = new ① System.Threading.CancellationTokenSource(wait)
    try
        Parallel.For(
            0,
            Int32.MaxValue,
            ② ParallelOptions(③ CancellationToken = ④ tokenSource.Token),
            fun (i : int) -> Console.WriteLine i) |> ignore
    with
    | :? ⑤ OperationCanceledException -> printfn "Cancelled!"
    | ex -> printfn "%O" ex
In the preceding code, we create a CancellationTokenSource at ①. This object is initialized to automatically cancel after a provided number of milliseconds. Inside the try block, we use an overload of Parallel.For that accepts a ParallelOptions instance, as shown at ②. Through this ParallelOptions instance, we initialize the CancellationToken property ③ to the token exposed by the CancellationTokenSource ④. When the token source's internal timer expires, the parallel loop raises an exception, which is then caught and handled at ⑤. Although we relied on a CancellationTokenSource that automatically cancelled, you can manually force cancellation by calling the Cancel method, typically from another task or thread.
Task parallelism gives you the most control over executing code in parallel while still abstracting many of the implementation details from you.
Tasks can be created and started in several ways. The easiest, but least flexible, way is the Parallel.Invoke method, which accepts one or more functions to execute concurrently and implicitly waits for them to finish, like this:
open System
open System.Threading.Tasks

Parallel.Invoke(
    (fun () -> printfn "Task 1"),
    (fun () -> Task.Delay(100).Wait()
               printfn "Task 2"),
    (fun () -> printfn "Task 3"))
printfn "Done"
Here, Parallel.Invoke creates and starts three independent tasks. The first and third tasks simply print a message, while the second task waits 100 milliseconds before printing its message.
Parallel.Invoke limits what you can do because it doesn't expose any information about the individual tasks, nor does it provide any feedback about whether the tasks succeeded or failed. You can catch and handle exceptions raised by the tasks and cancel them by providing a cancellation token (similar to the approach used in Cancelling Parallel Loops and sketched below), but that's about it. When you want to do anything more advanced with tasks, you'll need to create them manually.
There are two ways to create tasks manually: directly via a constructor, or through a TaskFactory. For our purposes, the primary difference between the two approaches is that tasks created with the constructor must be started manually. Microsoft recommends favoring the TaskFactory when task creation and scheduling don't need to be separated.
To create a new task with the Task constructor, you need only provide a function that serves as the task's body, like this:
open System.Threading.Tasks

let t = new Task(fun () -> printfn "Manual Task")
This creates a new task that prints a string. To start the task, call its Start method:
t.Start()
Alternatively, you can combine the two steps into one with a TaskFactory. Conveniently, the Task class has a static Factory property that is preset to a default TaskFactory, so you don't need to create one on your own. Here, we create and start a task using the default factory's StartNew method:
open System.Threading.Tasks

let t = Task.Factory.StartNew(fun () -> printfn "Factory Task")
The tasks we've looked at so far simply invoke an action, but you also need to know how to return a value, a commonly needed but cumbersome process under traditional asynchronous models. The TPL makes returning values trivial through a generic Task<'T> class, where 'T represents the task's return type.
The random-number generation used in the following examples is sufficient for demonstration purposes, but be aware that the System.Random class is not thread-safe, and even creating a new instance per task may not be sufficient. Should your solution require a more robust approach to concurrently generating random numbers, I recommend reading Stephen Toub's article on the subject at http://blogs.msdn.com/b/pfxteam/archive/2009/02/19/9434171.aspx.
Creating tasks that return values is almost identical to the basic tasks we've already looked at. The Task<'T> class provides a set of constructor overloads comparable to those of the non-generic Task class, and the TaskFactory includes a generic overload of StartNew. To demonstrate, let's use StartNew<'T> to create and run a task that returns a random number:
let t = Task.Factory.StartNew(fun () -> System.Random().Next())
The only truly notable thing about this example is that the function passed to StartNew returns an integer, so the generic overload is inferred. Of course, returning a value doesn't do much good without a way to access that value, and that's why Task<'T> provides the Result property, which will contain the return value when the task completes. Here, we see how to access the return value:
t.Result |> printfn "Result: %i"
Because this is an asynchronous operation, there's no guarantee that the task has completed executing before the Result property is accessed. For this reason, Result's get accessor checks whether the task has completed and, if necessary, waits for it to complete before returning its result. It's more typical to access the result as part of a continuation (as shown a bit later in this chapter) than immediately after the task starts.
When your program depends on one or more tasks completing before it can continue processing, you can wait for those tasks using one of the wait mechanisms. For convenience, the examples in this section will use the following function, which returns a new function that sleeps for a random amount of time (simulating a long-running operation lasting up to delayMs milliseconds) before printing a message:
let randomWait (delayMs : int) (msg : string) =
    fun () ->
        (System.Random().Next delayMs |> Task.Delay).Wait()
        Console.WriteLine msg
We can use the TaskFactory to create a task and wait for it to complete with the task's Wait method, like this:
let waitTask = Task.Factory.StartNew(randomWait 1000 "Task Finished")
waitTask.Wait()
printfn "Done Waiting"
In this code, a new task is created and started, but due to the explicit wait, the message "Done Waiting" won't be written to the console until the task completes. This can be helpful when subsequent code is dependent upon the task's completion.
You'll often want to run a number of tasks in parallel and block until one completes. To do so, you can use the static WaitAny method from the Task class. The most basic WaitAny overload accepts a params array of tasks and will stop blocking as soon as one of the tasks in the array completes. Here, we pass three started tasks to WaitAny:
Task.WaitAny(
    Task.Factory.StartNew(randomWait 2000 "Task 0 Finished"),
    Task.Factory.StartNew(randomWait 2000 "Task 1 Finished"),
    Task.Factory.StartNew(randomWait 2000 "Task 2 Finished")) |> ignore
Console.WriteLine "Done Waiting"
When any of the three tasks completes, WaitAny will stop blocking, thus allowing execution to continue to the Console.WriteLine call. Note that WaitAny doesn't kill the remaining tasks when it unblocks, so they'll continue executing in parallel with the source thread.
Similar to WaitAny, the Task class provides a static WaitAll method. WaitAll also accepts a params array of tasks, but instead of allowing execution to continue when one task completes, WaitAll unblocks only when all of the tasks have completed. The code differs only by which method is called, as the sketch below shows; I encourage you to experiment with each, running each form several times and observing the differences.
Traditionally, whenever you wanted to execute some code as soon as some parallel or asynchronous code completed, you needed to pass a function, called a callback, to the asynchronous code. In .NET, callbacks have typically been implemented through the built-in AsyncCallback delegate type.
Using callbacks is effective, but they can complicate the code and be tricky to maintain. The TPL greatly simplifies this process with continuations, which are tasks configured to start when one or more tasks, called antecedents, complete.
The simplest continuations are created from individual tasks. Let’s start by creating a task that will serve as an antecedent:
let antecedent =
    new Task<string>(fun () ->
        Console.WriteLine("Started antecedent")
        System.Threading.Thread.Sleep(1000)
        Console.WriteLine("Completed antecedent")
        "Job's done")
Now that we have a task, we can set up a continuation by passing a function to the task's ContinueWith method, like so:
let continuation =
    antecedent.ContinueWith(fun ① (a : Task<string>) ->
        Console.WriteLine("Started continuation")
        Console.WriteLine("Antecedent status: {0}", a.Status)
        Console.WriteLine("Antecedent result: {0}", a.Result)
        Console.WriteLine("Completed continuation"))
As you can see, creating a continuation is very similar to creating a regular task, but notice at ① how the function passed to the ContinueWith method accepts a parameter of type Task<string>. This parameter represents the antecedent so that the continuation can branch according to the antecedent's status (for example, RanToCompletion, Faulted, or Canceled) or use its result if it has one.
At this point, neither task has been started, so we'll start antecedent. When it completes, the TPL will automatically start continuation. We can observe this behavior as follows:
antecedent.Start()
Console.WriteLine("Waiting for continuation")
continuation.Wait()
Console.WriteLine("Done")
which should print the following messages:
Waiting for continuation
Started antecedent
Completed antecedent
Started continuation
Antecedent status: RanToCompletion
Antecedent result: Job's done
Completed continuation
Done
The ContinueWith method is useful when you're dealing with only a single task. When you have multiple tasks, you can turn to the TaskFactory's ContinueWhenAny or ContinueWhenAll methods. Like their WaitAny and WaitAll counterparts, the ContinueWhenAny and ContinueWhenAll methods will start the continuation task when any or all of the tasks in an array complete, respectively. For brevity, we'll focus on the ContinueWhenAll method.
let antecedents =
    [| new Task(fun () ->
           Console.WriteLine("Started first antecedent")
           System.Threading.Thread.Sleep(1000)
           Console.WriteLine("Completed first antecedent"))
       new Task(fun () ->
           Console.WriteLine("Started second antecedent")
           System.Threading.Thread.Sleep(1250)
           Console.WriteLine("Completed second antecedent"))
       new Task(fun () ->
           Console.WriteLine("Started third antecedent")
           System.Threading.Thread.Sleep(1000)
           Console.WriteLine("Completed third antecedent")) |]

let continuation =
    ① Task.Factory.ContinueWhenAll(
        antecedents,
        fun ② (a : Task array) ->
            Console.WriteLine("Started continuation")
            for x in a do
                Console.WriteLine("Antecedent status: {0}", x.Status)
            Console.WriteLine("Completed continuation"))

for a in antecedents do a.Start()
Console.WriteLine("Waiting for continuation")
continuation.Wait()
Console.WriteLine("Done")
ContinueWhenAny follows the same pattern as WaitAny. Here we've defined three tasks, which we manually start after creating the continuation at ①. Notice the continuation task's parameter at ②. Instead of receiving a single antecedent task as you would with ContinueWith or ContinueWhenAny, continuations created with ContinueWhenAll accept an array of tasks. This array contains all of the tasks supplied to ContinueWhenAll instead of the individual task that caused the continuation to start. This allows you to inspect each antecedent and handle success and failure scenarios as granularly as you need.
Cancelling a task is fundamentally the same as cancelling a parallel for loop, but it requires a bit more work because the parallel for loops handle the cancellation details for you. The following function demonstrates cancelling a task and follows the typical pattern for handling the cancellation:
let taskWithCancellation (cancelDelay : int) (taskDelay : int) =
    ① use tokenSource = new System.Threading.CancellationTokenSource(cancelDelay)
    ② let token = tokenSource.Token
    try
        let t =
            Task.Factory.StartNew(
                (fun () ->
                    ③ token.ThrowIfCancellationRequested()
                    printfn "passed cancellation check; waiting"
                    System.Threading.Thread.Sleep taskDelay
                    ④ token.ThrowIfCancellationRequested()),
                token)
        ⑤ t.Wait()
    with
    | ex -> printfn "%O" ex
    printfn "Done"
As with cancelling parallel for loops, we start by creating a CancellationTokenSource at ①. For convenience, we then bind the token to a name at ② so we can reference it within the function the task is based upon. Within the task body, the first thing we do at ③ is call the token's ThrowIfCancellationRequested method, which interrogates the token's IsCancellationRequested property and throws an OperationCanceledException if that property returns true. We do this to ensure that no unnecessary work is performed if cancellation was requested before the task started. When no exception is thrown, execution continues. At ④ we check for cancellation again so that a cancelled task doesn't complete successfully. Finally, at ⑤ we wait for the task to complete so we can handle any exceptions it throws.
Exceptions can be raised by any number of executing tasks at any time. When this happens, we need a way to capture and handle them. In the previous section, we handled the exception in a general manner, matching any exception and writing it to the console. If you executed the taskWithCancellation function, you may have noticed that the exception we caught wasn't an OperationCanceledException but rather an AggregateException that included an OperationCanceledException. The base exception classes aren't well suited for parallel scenarios because they represent only a single failure. To compensate, a new exception type, AggregateException, was introduced to allow us to report one or more failures within a single construct.
Although you certainly could handle an AggregateException directly, you'll typically want to find a specific exception within it. For this, the AggregateException class provides the Handle method, which iterates over the exceptions contained within its InnerExceptions collection so you can find the exception you really care about and handle it accordingly.
try
    raise (AggregateException(
               NotSupportedException(),
               ArgumentException(),
               AggregateException(
                   ArgumentNullException(),
                   NotImplementedException())))
with
| :? AggregateException as ex ->
    ex.Handle(
        ① Func<_, _>(
            function
            ② | :? AggregateException as ex1 ->
                ③ ex1.Handle(
                      Func<_, _>(
                          function
                          | :? NotImplementedException as ex2 -> printfn "%O" ex2; true
                          | _ -> true))
                true
            | _ -> true))
Handling an AggregateException follows the familiar exception-handling pattern: We match against the AggregateException and bind it to the name ex as you might expect. Inside the handler, we invoke the Handle method ①, which accepts a Func<exn, bool>, indicating that the supplied function accepts an exception and returns a Boolean value. (To use pattern-matching functions as we've done here, we explicitly construct Func<_, _> instances and allow the compiler to infer the proper type arguments.) Inside the pattern-matching function ②, we detect whether we have a nested AggregateException and handle it at ③. At each level, we need to return a Boolean value indicating whether the particular exception was handled. If we return false for any exception, a new AggregateException containing the unhandled exceptions will be raised.
Handling AggregateExceptions like this can get quite cumbersome, complex, and tedious. Fortunately, AggregateException provides another method, Flatten, which simplifies error handling by iterating over the InnerExceptions collection and recursing over each nested AggregateException to construct a new AggregateException instance that directly contains all of the exceptions within the source exception's hierarchy. For example, we can revise the previous example to use Flatten to simplify the handler, like this:
try
raise (AggregateException(
NotSupportedException(),
ArgumentException(),
AggregateException(
ArgumentNullException(),
NotImplementedException())))
with
| :? AggregateException as ex ->
ex.Flatten().Handle(
Func<_, _>(
function
| :? NotImplementedException as ex2 -> printfn "%O" ex2; true
| _ -> true))
In this revised example, we call Handle against the flattened AggregateException. With only one level to process, we can omit the checks for nested AggregateExceptions and handle the NotImplementedException directly.
Despite the many improvements that the TPL brings to asynchronous and parallel programming, F# offers its own model, which better matches the functional paradigm emphasized by the language. While it's sometimes desirable to use the TPL in F# (particularly when working across language boundaries), you'll often turn to F#'s asynchronous workflows, which are best suited for I/O-based operations.
Asynchronous workflows provide a uniform and idiomatic way to compose and execute asynchronous code against the thread pool. Furthermore, their very nature often makes it difficult (if not impossible) to fall into some of the asynchronous traps present even in the TPL.
Like our TPL discussion, this section is intended to give you a basic working knowledge of asynchronous workflows rather than serving as a comprehensive guide.
Asynchronous workflows are based on the Async<'T> class that resides in the Microsoft.FSharp.Control namespace. This type represents a bit of code you want to run asynchronously, ultimately returning some value. Instead of creating Async<'T> instances directly, though, we compose them through async expressions, much like we compose sequences or queries.
Async expressions take the following form:
async { async-expressions }
Here, async-expressions represents one or more expressions that will participate in the asynchronous operation. In addition to the standard expressions we've seen throughout this book, asynchronous workflows allow you to easily invoke additional workflows and wait for results without blocking through specialized variants of some familiar keywords, such as let and use. For instance, the let! keyword invokes an asynchronous workflow and binds the result to a name. Similarly, the use! keyword invokes an asynchronous workflow that returns a disposable object, binds the result to a name, and disposes of the object when it goes out of scope. It's also possible to invoke an asynchronous workflow and immediately return the result with the return! keyword.
To demonstrate, we'll turn to the "hello world" example of asynchronous workflows: requesting multiple web pages. To begin, let's define some functions that encapsulate the logic needed to create an asynchronous page request (note that a similar function, Http.AsyncRequestString, is available in the FSharp.Data library):
open System
open System.IO
open System.Net

type StreamReader with
    member x.AsyncReadToEnd () =
        async { do! Async.SwitchToNewThread()
                let content = x.ReadToEnd()
                do! Async.SwitchToThreadPool()
                return content }

let getPage (uri : Uri) =
    async { let req = WebRequest.Create uri
            use! response = req.AsyncGetResponse()
            use stream = response.GetResponseStream()
            use reader = new StreamReader(stream)
            return! reader.AsyncReadToEnd() }
After opening the relevant namespaces, we extend the StreamReader class with a single AsyncReadToEnd method. This method, adapted from the F# PowerPack, is similar to the existing ReadToEndAsync method except that instead of using the TPL, it returns an asynchronous workflow that we can evaluate as the final step of the larger workflow in the getPage function, where we describe how to make the page request. The overall flow of the expression is pretty standard: Create a WebRequest, wait for the response, and then explicitly return the response stream's contents.
The AsyncGetResponse method is an extension method defined in the F# core library. It conveniently wraps the standard .NET code within another asynchronous workflow, which makes it possible to employ use! and greatly simplifies the code.
It's important to recognize that getPage doesn't actually execute the request; it merely creates an instance of Async<string> that represents the request. This allows us to define multiple requests or pass them around to other functions. We can even execute the request multiple times. To execute the request, we need to turn to the static Async class, which you can think of as a controller for asynchronous workflows.
There are a number of methods for starting an asynchronous workflow. Some common methods are listed in Table 11-1.
Method | Description
RunSynchronously | Starts an asynchronous workflow and waits for its result.
Start | Starts an asynchronous workflow but does not wait for a result.
StartImmediate | Starts an asynchronous workflow immediately using the current thread. Useful for UI updates.
StartWithContinuations | Immediately starts an asynchronous workflow using the current thread, invoking a success, exception, or cancellation continuation depending on how the operation completed.
The method you choose is largely dependent upon what the workflow does, but you'll typically use Start unless your application requires one of the others. The workflow created by the getPage function returns the result of a web request. Since we're making the request, we probably don't want to ignore the result, so we'll need to wire up a continuation to do something with it. The easiest way to do that is to wrap the call to getPage inside another asynchronous expression, passing the result to another function when it completes, and starting the entire workflow with Start. Here, we call getPage and print the result:
async { let! content = Uri "http://nostarch.com" |> getPage
        content.Substring(0, 50) |> printfn "%s" }
|> Async.Start
Alternatively, we can use the StartWithContinuations method, which accepts an asynchronous workflow and three functions to invoke when the workflow finishes successfully, raises an exception, or is cancelled, respectively. The following code shows such an approach:
Async.StartWithContinuations(
    ① getPage(Uri "http://nostarch.com"),
    ② (fun c -> c.Substring(0, 50) |> printfn "%s..."),
    ③ (printfn "Exception: %O"),
    ④ (fun _ -> printfn "Cancelled"))
When the asynchronous operation ① completes successfully, the success continuation ② is invoked and the first 50 characters from the page source will be printed. Should the operation throw an exception, the exception continuation ③ will execute and print the exception. Finally, if the operation is cancelled, as described in Cancelling Asynchronous Workflows, the cancellation continuation ④ will execute and display a note informing the user of the cancellation.
Instead of relying on continuations, we can use the RunSynchronously method and get the result directly, like this:
let html = Uri "http://nostarch.com" |> getPage |> Async.RunSynchronously
Of course, running a single asynchronous workflow like this defeats the purpose of running it asynchronously because RunSynchronously waits for the result. Instead, RunSynchronously is often used in conjunction with Async.Parallel to run multiple workflows in parallel and wait for all of them to complete. For instance, we can make multiple requests, starting with an array of asynchronous workflows, as follows:
open System.Text.RegularExpressions

[| getPage(Uri "http://nostarch.com")
   getPage(Uri "http://microsoft.com")
   getPage(Uri "http://fsharp.org") |]
|> Async.Parallel
|> Async.RunSynchronously
|> Seq.iter (fun c ->
       let sample = c.Substring(0, 50)
       Regex.Replace(sample, @"[\r\n]| {2,}", "") |> printfn "%s...")
Here, we employ the Parallel method to combine each of the asynchronous workflows into a single workflow that is then piped to the RunSynchronously method. When each of the requests has completed, we iterate over the resulting array, stripping a few characters from the content for readability and printing the result.
In the previous section I indicated that asynchronous workflows can be cancelled. Just as in the TPL, asynchronous workflows use cancellation tokens to control cancellation. It's possible, and sometimes even necessary, to manage tokens on your own, but in many cases you can rely on the Async class's default token.
For simple scenarios, such as when you're starting a single workflow via the Start or StartWithContinuations methods, you can use the CancelDefaultToken method to cancel the workflow, like this:
① Async.StartWithContinuations(
    getPage(Uri "http://nostarch.com"),
    (fun c -> c.Substring(0, 50) |> printfn "%s..."),
    (printfn "Exception: %O"),
    (fun _ -> printfn "Cancelled"))
② Async.CancelDefaultToken()
The StartWithContinuations method ① monitors the default token and cancels the workflow when the token is marked as cancelled via the CancelDefaultToken method ②. In this example, because the workflow is cancelled before it completes, the cancellation callback is invoked instead of the success callback, resulting in the cancellation message being displayed.
The TryCancelled method, which accepts a workflow and a function that will be invoked when cancellation is requested, is a nice alternative for workflows that don't return a value. Here, the displayPartialPage function wraps a call to getPage within another asynchronous workflow. The outer workflow waits for the response and writes out the first 50 characters when the message is received. Because TryCancelled returns yet another workflow and doesn't automatically start it, we need to explicitly do so with a call to Start.
let displayPartialPage uri =
    Async.TryCancelled(
        async { let! c = getPage uri
                Regex.Replace(c.Substring(0, 50), @"[\r\n]| {2,}", "")
                |> sprintf "[%O] %s..." uri
                |> Console.WriteLine },
        (sprintf "[%O] Cancelled: %O" uri >> Console.WriteLine))

Async.Start(displayPartialPage (Uri "http://nostarch.com"))
Async.CancelDefaultToken()
The default token is often sufficient for cancelling workflows. When you’re executing multiple workflows and want to coordinate cancellation or if you want more control over cancellation, you can supply your own. Consider what happens when you request three pages but request cancellation with the default token.
[| Uri "http://nostarch.com"
   Uri "http://microsoft.com"
   Uri "http://fsharp.org" |]
|> Array.iter (fun u -> Async.Start(displayPartialPage u))
Async.CancelDefaultToken()
Executing the preceding code usually results in all three workflows being cancelled. (Usually, but not always, because there's a chance that one or more workflows complete before the cancellation is handled.) To isolate each workflow's cancellation, we can use an overload of the Start method that accepts a user-specified token, like this:
open System.Threading

let tokens =
    [| Uri "http://nostarch.com"
       Uri "http://didacticcode.com"
       Uri "http://fsharp.org" |]
    |> Array.map (fun u ->
           ① let ts = new CancellationTokenSource()
           Async.Start(displayPartialPage u, ② ts.Token)
           ts)

③ tokens.[0].Cancel()
④ tokens.[1].Cancel()
In this revised version, we use Array.map to map each Uri to a workflow with its own CancellationTokenSource, created at ①. We then pass the associated token to Async.Start as the second argument ② before returning the CancellationTokenSource. Finally, at ③ and ④, respectively, we request cancellation of the first and second requests, allowing the third to proceed as normal.
What’s especially nice about cancelling asynchronous workflows is that, unlike the TPL, cancellation tokens are propagated through the entire workflow automatically. This means that you don’t have to manually ensure that each new workflow is given a token, leaving you with cleaner code.
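A minimal sketch illustrates this propagation: the token supplied to Async.Start flows into the nested getPage workflow without being threaded through by hand.

open System
open System.Threading

let ts = new CancellationTokenSource()
Async.Start(
    async { let! content = getPage (Uri "http://nostarch.com")    // inherits the token
            printfn "%s..." (content.Substring(0, 50)) },
    ts.Token)
ts.Cancel()    // cancels the outer workflow and the nested request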
Because exceptions can and do occur within asynchronous workflows, it’s important to know how to handle them properly. There are a few exception-handling options available, but their utility may vary depending on what you’re doing.
The most uniform way to handle exceptions in an asynchronous workflow is to wrap the potentially offending code inside a try...with block within the async expression. For instance, we can provide a version of our getPage function that handles exceptions raised during the page request and read, like this:
let getPageSafe uri =
    async { try
                let! content = getPage uri
                return Some content
            with
            | :? NotSupportedException as ex ->
                Console.WriteLine "Caught NotSupportedException"
                return None
            | :? OutOfMemoryException as ex ->
                Console.WriteLine "Caught OutOfMemoryException"
                return None
            | ex ->
                ex |> sprintf "Caught general exception: %O" |> Console.WriteLine
                return None }
There's nothing unusual about the try...with block in the preceding code; we simply wrap the asynchronous call to getPage in the try...with block and return a successful read as an option. Should the operation raise an exception, we match the exception type, print a message, and return None.
Another way to handle exceptions from asynchronous workflows is the Async.Catch method. Async.Catch takes a more functional approach than StartWithContinuations in that rather than accepting an exception-handling function, it returns Choice<'T, exn>, where 'T is the asynchronous workflow's return type and exn is the exception thrown by the workflow.
The Choice type is a discriminated union with two union cases: Choice1Of2 and Choice2Of2. For Async.Catch, Choice1Of2 represents successful completion of the workflow and contains the result, whereas Choice2Of2 represents failure and contains the first raised exception.
Handling exceptions with Async.Catch lets you structure your asynchronous code to create an idiomatic, pipelined data flow. For example, the following code shows how we can model an asynchronous operation as a series of function applications, beginning with a Uri:
Uri "http://nostarch.com"
|> getPage
|> Async.Catch
|> Async.RunSynchronously
|> function
   | Choice1Of2 result -> Some result
   | Choice2Of2 ex ->
       match ex with
       | :? NotSupportedException -> Console.WriteLine "Caught NotSupportedException"
       | :? OutOfMemoryException -> Console.WriteLine "Caught OutOfMemoryException"
       | ex -> ex.Message |> sprintf "Exception: %s" |> Console.WriteLine
       None
Here, a Uri is piped into the getPage function to create an asynchronous workflow. The resulting workflow is piped into Async.Catch to set up another workflow, which we then pipe to Async.RunSynchronously so we can wait for the result. Finally, we pipe the Choice into a pattern-matching function where we either return Some result or handle the exception before returning None.
In addition to the ThreadPool-based asynchronous operations we've seen so far, the Async class provides a few methods for working with TPL tasks. Most notable among them are StartAsTask and AwaitTask.
The StartAsTask method invokes an asynchronous workflow as a TPL task. You would typically use this for CPU-bound operations or to expose an asynchronous workflow to code using the TPL in C# or Visual Basic. For instance, we can treat the result of our getPage function as a TPL task like this:
Uri "http://nostarch.com"
|> getPage
|> Async.StartAsTask
|> (fun t -> ① t.Result.Substring(0, 50))
|> printfn "%s"
The presence of the Result property at ① indicates that the result of StartAsTask is indeed a Task. In a more real-world scenario, you likely wouldn't fire off a task and immediately block by waiting for the result, but this example is intended only to show how to start an asynchronous workflow as a TPL Task.
The StartAsTask method is handy when you need to create a new task, but what about when you need to handle an existing task? Consider the DownloadStringTaskAsync method added to the System.Net.WebClient class in .NET 4.5. This method serves the same purpose as our getPage function except that it encapsulates downloading a resource within a TPL task.
In C#, you can easily handle such methods with the async modifier and await operator, as shown here:
// C#
// using System.Threading.Tasks

private static ① async Task<string> GetPageAsync(string uri)
{
    using (var client = new System.Net.WebClient())
    {
        return ② await client.DownloadStringTaskAsync(uri);
    }
}

static void Main()
{
    var result = GetPageAsync("http://nostarch.com").Result;
    Console.WriteLine("{0}", result.Substring(0, 50));
    Console.ReadLine();
}
From a greatly simplified perspective, what happens in the preceding C# code is this: The async modifier ① is applied to the GetPageAsync method to signify that part of the method will run asynchronously. The await operator ② then signifies that execution should return to the caller and the remainder of the method should be treated as a continuation to be executed when the task completes.
Asynchronous workflows allow us to follow a similar pattern in F# using the AwaitTask method in combination with a TPL task and let!, use!, or return!. Here is the corresponding code in F#:
// F#
open System.Threading.Tasks

let getPageAsync (uri : string) =
    async { use client = new System.Net.WebClient()
            ① return! Async.AwaitTask (client.DownloadStringTaskAsync uri) }

async { ② let! result = getPageAsync "http://nostarch.com"
        result.Substring(0, 50) |> printfn "%s" }
|> Async.Start
Although they're not quite functionally equivalent (the C# version waits for the result in Main while the F# version passes the result to a continuation), the F# approach is similar to that of C#. In the F# version, the asynchronous workflow created by the getPageAsync function uses return! and Async.AwaitTask ① to wait for the task to complete before returning the result. Then, in the second asynchronous workflow, let! ② is used to evaluate getPageAsync, while printing the result is treated as a continuation.
As if the TPL and asynchronous workflows didn't make parallel and asynchronous programming accessible enough, F# has borrowed a message-processing mechanism from Erlang. The MailboxProcessor<'T> class implements a queue-based system for asynchronously routing messages (data items) to handlers using shared memory. This is especially useful in scenarios where multiple sources (clients) need to request something from a single target (server), the canonical example being a web server. Furthermore, because MailboxProcessor instances are extremely lightweight, an application can manage thousands of them without breaking a sweat. This fact enables mailbox processors to work independently or together by passing messages between instances.
MailboxProcessor instances are usually referred to as agents, and I'll follow this convention throughout this section. In that regard, a common practice in agent-based programming is to alias MailboxProcessor<'T> as Agent<'T>, as follows:
type Agent<'T> = MailboxProcessor<'T>
With the type aliased, we can create agents using the more convenient name.
I think the best way to understand agent-based programming is with an example. We'll start with a simple agent that prints whatever is sent to it.
type Message = | Message of obj

let echoAgent =
    ① Agent<Message>.Start(fun inbox ->
        ② let rec loop () =
            async { let! (Message(content)) = ③ inbox.Receive()
                    printfn "%O" content
                    ④ return! loop() }
        ⑤ loop())
In the preceding code, we create an agent called echoAgent by passing a function to the Start method, as shown at ①. By convention, the function's parameter is called inbox because it's the mailbox from which we'll receive new messages. At ② we define the recursive loop function, which we'll call continually to receive new messages.
It's certainly possible to loop imperatively using a while loop, but the recursive function is the more typical approach. Functional loops have the additional benefit of letting you swap in different looping logic when you need to manage multiple states. For instance, if your agent needs to behave differently in a paused state than in a running state, you can define a pair of mutually recursive functions, each returning a workflow that handles the corresponding state, as the sketch below shows.
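Here is a minimal sketch of that idea; the ControlMessage type and its cases are hypothetical:

type ControlMessage =
    | Pause
    | Resume
    | Data of string

let pausableAgent =
    Agent<ControlMessage>.Start(fun inbox ->
        // one workflow per state; each hands control to the other on demand
        let rec running () =
            async { let! msg = inbox.Receive()
                    match msg with
                    | Pause -> return! paused()
                    | Data d ->
                        printfn "%s" d
                        return! running()
                    | Resume -> return! running() }
        and paused () =
            async { let! msg = inbox.Receive()
                    match msg with
                    | Resume -> return! running()
                    | _ -> return! paused() }    // ignore other messages while paused
        running())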
Inside the loop, we create an asynchronous workflow that first asynchronously receives a message from inbox using the Receive method, as shown at ③. Next, the received message is printed before making an asynchronous recursive call to loop at ④. Finally, at ⑤ we initiate recursion by making a standard, synchronous call to loop.
With echoAgent actively listening, we can send it some messages via the Post method, like this:
> Message "nuqneH" |> echoAgent.Post;;
nuqneH
> Message 123 |> echoAgent.Post;;
123
> Message [ 1; 2; 3 ] |> echoAgent.Post;;
[1; 2; 3]
As you can see, when echoAgent receives a message, it is written to the console; echoAgent then waits for another message, and the process repeats ad infinitum.
In the echoAgent example, we used the Receive method to get messages from the underlying queue. In many cases, Receive is appropriate, but it makes it difficult to filter messages because it removes them from the queue. To selectively process messages, you might consider using the Scan method instead.
Scanning for messages follows a different pattern than receiving them directly. Rather than processing the messages inline and always returning an asynchronous workflow, the Scan method accepts a filtering function that accepts a message and returns an Async<'T> option. In other words, when the message is something you want to process, you return the workflow wrapped in Some; otherwise, you return None. To demonstrate, let's revise the echoAgent to process only strings and integers.
let echoAgent2 =
    Agent<Message>.Start(fun inbox ->
        let rec loop () =
            inbox.Scan(fun (Message(x)) ->
                match x with
                | ① :? string
                | ② :? int ->
                    Some (async { printfn "%O" x
                                  return! loop() })
                | _ -> printfn "<not handled>"; None)
        loop())
At ① and ② you can see standard dynamic type-test patterns used to filter incoming messages to strings and integers, respectively. When the message is one of those two types, we associate an asynchronous workflow with Some and return it. For all other messages, we return None. Scan then examines the returned value; when it is Some, the message is consumed (removed from the queue) and the workflow is invoked. When the returned value is None, Scan immediately waits for another message.
Passing messages to echoAgent2 works the same as before; we simply post the messages via the Post method:
> Message "nuqneH" |> echoAgent2.Post;;
nuqneH
> Message 123 |> echoAgent2.Post;;
123
> Message [ 1; 2; 3 ] |> echoAgent2.Post;;
<not handled>
Scanning for messages does offer some flexibility in how you process messages, but you need to be mindful of what you're posting to the agent because messages not processed by Scan remain in the queue. As the queue size increases, scans will take longer to complete, so you can quickly run into performance issues using this approach if you're not careful. You can see how many messages are in the queue at any time by inspecting the CurrentQueueLength property, as shown below. If you need to remove messages from the queue, you can do so by invoking Receive for each message in the queue, but needing to do so is probably indicative of a larger design problem that should be addressed.
The agents we've created so far have been self-contained: They receive a message, do something with it, and wait for another message. Agents don't have to work in isolation, though. One way you can make agents more interactive is by having them reply via an AsyncReplyChannel. To demonstrate, let's revise echoAgent again, but this time, instead of printing a message within the agent, we'll have it reply.
① type ReplyMessage = | ReplyMessage of obj * AsyncReplyChannel<obj>

let echoAgent3 =
    Agent.Start(fun inbox ->
        let rec loop () =
            async { let! ② (ReplyMessage(m, c)) = inbox.Receive()
                    ③ c.Reply m
                    return! loop() }
        loop())
The overall structure of echoAgent3 doesn't differ much from the previous versions. For convenience, we're using a discriminated union ① for our message type, as is typical in agent-based programming. In this case, the ReplyMessage union type has a single case with two associated values: an object and the reply channel. Inside the loop body, we use pattern matching ② to identify the union case and extract the message and channel. We then pass the message to the channel's Reply method ③ before repeating. Now all that's left is to send a message to the agent.
ReplyMessage's second value is an AsyncReplyChannel<obj>, as you've already seen. In theory we could manually construct a reply channel and send the ReplyMessage to the agent with the Post method, but then we'd have to handle waiting for the result manually. There are much better ways to get the reply channel: namely, the PostAndReply method and its variants.
The PostAndReply methods differ a bit from Post in that, instead of accepting the message directly, they are higher-order functions: each accepts a function that takes in a preconstructed reply channel and returns the fully constructed message. For our purposes, we'll simply create a ReplyMessage like this:
echoAgent3.PostAndReply(fun c -> ReplyMessage("hello", c))
|> printfn "Response: %O"
Internally, PostAndReply (and its variants) constructs a reply channel that it passes on to the supplied function, which then creates the message that is ultimately posted to the agent.
Now that you’ve seen a variety of ways to create and interact with agents, let’s look at a more interesting example that ties together several of the concepts for something a bit more useful than simply regurgitating its input: an agent-based calculator. We’ll begin by defining a discriminated union that represents the messages the calculator will support.
type Operation =
    | Add of float
    | Subtract of float
    | Multiply of float
    | Divide of float
    | Clear
    | Current of AsyncReplyChannel<float>
The Operation union type defines six cases. Of those, four represent basic mathematical operations and have an associated float that is used in the calculation. The Clear case allows us to clear the stored value. Finally, the Current case lets us interrogate the agent for its current value using its associated reply channel. From this definition, we can create a new agent that handles each case as follows:
let calcAgent =
    Agent.Start(fun inbox ->
        let rec loop total =
            async { let! msg = inbox.Receive()
                    let newValue =
                        match msg with
                        | Add x -> total + x
                        | Subtract x -> total - x
                        | Multiply x -> total * x
                        | Divide x -> total / x
                        | Clear -> 0.0
                        | Current channel ->
                            channel.Reply total
                            total
                    return! loop newValue }
        loop 0.0)
Even though calcAgent appears to keep a running total, that's a bit of an illusion: We keep state only by passing a value (total) to the recursive loop function. When calcAgent receives a message, it uses pattern matching to determine the appropriate action, binding the result to newValue. For instance, when it receives an Add, Subtract, Multiply, or Divide operation, it applies the corresponding mathematical operation to total. Similarly, when it receives a Clear operation, it simply returns 0.0, and Current returns total after replying.
To see calcAgent in action, we just need to send it some messages:
[ Add 10.0
  Subtract 5.0
  Multiply 10.0
  Divide 2.0 ]
|> List.iter (calcAgent.Post)

calcAgent.PostAndReply(Current) |> printfn "Result: %f"
calcAgent.Post(Clear)
calcAgent.PostAndReply(Current) |> printfn "Result: %f"
In the preceding snippet, we simply pass a list of Operations to List.iter, posting each message to calcAgent. When those have been processed, we query for the current value, clear it, and then query again to ensure that the total has been zeroed out. Invoking the preceding snippet results in the following:
Result: 25.000000
Result: 0.000000
Asynchronous and parallel programming have long been viewed as tools for specialized software and reserved for experienced developers. With processor manufacturers improving processor performance by adding cores instead of increasing clock speed, software developers can no longer solve performance issues solely by upgrading hardware, nor can they continue expecting users to wait for long-running operations to complete before returning control.
Languages such as F# make asynchronous and parallel programming more accessible by providing multiple, robust mechanisms. The TPL makes it easy for developers to efficiently handle CPU-bound operations such as processing large data sets while effectively using available system resources. Language features such as asynchronous workflows excel at keeping applications responsive during I/O-based operations such as web requests or file accesses. Finally, agent-based programming lets you easily coordinate complex systems by firing off individual asynchronous processes without having to directly manage the complexity of traditional thread-based models. Together, these approaches help you build scalable, responsive applications that meet the demands of modern computing while keeping you focused on the actual problems your software is trying to solve.