With version 4.0 of the .NET Framework, Microsoft introduced a new model for writing applications that need to perform multiple simultaneous tasks—that model is known as parallel programming, and the implementation is called the Task Parallel Library. Unlike the traditional approach to multitasking, where you create and manage a set of threads in your code, the new parallel programming model lets you focus on the tasks you need to accomplish and allows the runtime to create and manage the threads on your behalf.
The key advantage of this approach is that your code is focused on the tasks you need to perform, not the way in which they will be performed. The main disadvantage is that you give up direct control of the behavior of your application. For many applications, the new parallel programming model will be ideal, but for those applications that require careful control and management (and for those programmers who cannot let go), we refer you to Chapter 4, which covers the traditional threading approach. The recipes in this chapter describe how to perform the following tasks:
- Performing simple parallel tasks (recipe 15-1)
- Writing more complex tasks (recipes 15-2, 15-6, and 15-7)
- Managing tasks (recipes 15-3, 15-5, and 15-8)
- Working in parallel with data (recipes 15-4 and 15-9)
Use the Invoke method of the System.Threading.Tasks.Parallel class, passing in an instance of the System.Action delegate for each method you wish to run.
The Invoke method of the Parallel class is the simplest way to add multitasking to your application. You simply provide a set of Action delegates, each of which wraps a method you wish to invoke. The .NET Framework takes care of the rest: threads are created and managed automatically on your behalf, and Invoke does not return until all of the supplied delegates have finished.
The Parallel.Invoke method can only be used to invoke methods that do not return a result. See the other recipes in this chapter for more complex examples.
The following example invokes three methods concurrently, each of which writes a series of messages to the console. To simulate a time-intensive task, these methods call Thread.Sleep to slow down the progress of the application, something that you would not do in a real application.
We have created the Action delegates explicitly to make the example as clear as possible, but a more elegant approach is to use lambda expressions, so that

```csharp
Parallel.Invoke( new Action(writeDays), new Action(writeMonths), new Action(writeCities) );
```

would be written as

```csharp
Parallel.Invoke( () => writeDays(), () => writeMonths(), () => writeCities() );
```

The remaining recipes in this chapter use lambda expressions.
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_01
{
    class Recipe15_01
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press enter to start");
            Console.ReadLine();

            // Invoke the methods we want to run.
            Parallel.Invoke(
                new Action(writeDays),
                new Action(writeMonths),
                new Action(writeCities));

            // Wait to continue.
            Console.WriteLine(" Main method complete. Press Enter");
            Console.ReadLine();
        }

        static void writeDays()
        {
            string[] daysArray = { "Monday", "Tuesday", "Wednesday", "Thursday",
                "Friday", "Saturday", "Sunday" };
            foreach (string day in daysArray)
            {
                Console.WriteLine("Day of the Week: {0}", day);
                Thread.Sleep(500);
            }
        }

        static void writeMonths()
        {
            string[] monthsArray = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
                "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
            foreach (string month in monthsArray)
            {
                Console.WriteLine("Month: {0}", month);
                Thread.Sleep(500);
            }
        }

        static void writeCities()
        {
            string[] citiesArray = { "London", "New York", "Paris", "Tokyo", "Sydney" };
            foreach (string city in citiesArray)
            {
                Console.WriteLine("City: {0}", city);
                Thread.Sleep(500);
            }
        }
    }
}
```
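Because Parallel.Invoke has no return value, one way to get results out of it is to have each lambda expression assign to a local variable captured from the enclosing method. The sketch below illustrates that idea under stated assumptions: the class and method names (InvokeSketch, ComputeInParallel) are hypothetical and not part of the recipe, and the pattern is safe here only because each captured variable is written by exactly one action and read only after Parallel.Invoke has returned.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class InvokeSketch
{
    // Runs three independent computations with Parallel.Invoke and
    // collects their results through captured local variables.
    public static int[] ComputeInParallel()
    {
        int sumTo100 = 0;
        int squareOf12 = 0;
        int lengthOfWord = 0;

        // Each action writes to its own variable, so no locking is needed.
        Parallel.Invoke(
            () => { for (int i = 1; i <= 100; i++) sumTo100 += i; },
            () => squareOf12 = 12 * 12,
            () => lengthOfWord = "parallel".Length);

        // Parallel.Invoke has returned, so all three actions have finished.
        return new[] { sumTo100, squareOf12, lengthOfWord };
    }
}
```

Calling InvokeSketch.ComputeInParallel() should return { 5050, 144, 8 }. For anything more involved than this, the Task-based approach in the next recipe is usually clearer.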
Create typed instances of the Task class by passing function delegates to the static System.Threading.Tasks.Task<>.Factory.StartNew method. Use the Task.Result property to obtain the result from your task.
For anything other than simple tasks, such as those in the previous recipe, you use the Task class to write parallel applications. New tasks are created (and automatically started) when you call the Task<>.Factory.StartNew method, passing in a function delegate as the argument. You obtain the result of your task through the Task.Result property.
The StartNew method creates and starts a new task in one step. If you need to create tasks and start them later, you can create instances of Task directly with the class constructors and start them running using the Start method.
The following example modifies the task methods from the previous recipe to return how many items have been printed out. We read the Result property of each task and write it to the console. Notice that when running the example, the results are intermingled with the output from the tasks themselves, as shown following:

```
. . .
Month: Jul
Day of the Week: Sunday
Month: Aug
7 days were written
Month: Sep
Month: Oct
Month: Nov
Month: Dec
12 months were written
5 cities were written
. . .
```
This happens because the Result property blocks until the task has completed. See the following recipes for different ways to wait for tasks to complete.
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_02
{
    class Recipe15_02
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press enter to start");
            Console.ReadLine();

            // Create the tasks.
            Task<int> task1 = Task<int>.Factory.StartNew(() => writeDays());
            Task<int> task2 = Task<int>.Factory.StartNew(() => writeMonths());
            Task<int> task3 = Task<int>.Factory.StartNew(() => writeCities());

            // Get the results and write them out.
            Console.WriteLine("{0} days were written", task1.Result);
            Console.WriteLine("{0} months were written", task2.Result);
            Console.WriteLine("{0} cities were written", task3.Result);

            // Wait to continue.
            Console.WriteLine(" Main method complete. Press Enter");
            Console.ReadLine();
        }

        static int writeDays()
        {
            string[] daysArray = { "Monday", "Tuesday", "Wednesday", "Thursday",
                "Friday", "Saturday", "Sunday" };
            foreach (string day in daysArray)
            {
                Console.WriteLine("Day of the Week: {0}", day);
                Thread.Sleep(500);
            }
            return daysArray.Length;
        }

        static int writeMonths()
        {
            string[] monthsArray = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
                "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
            foreach (string month in monthsArray)
            {
                Console.WriteLine("Month: {0}", month);
                Thread.Sleep(500);
            }
            return monthsArray.Length;
        }

        static int writeCities()
        {
            string[] citiesArray = { "London", "New York", "Paris", "Tokyo", "Sydney" };
            foreach (string city in citiesArray)
            {
                Console.WriteLine("City: {0}", city);
                Thread.Sleep(500);
            }
            return citiesArray.Length;
        }
    }
}
```
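The recipe uses StartNew, but the text above also mentions creating a task with a constructor and starting it later with Start. A minimal sketch of that alternative follows; DeferredStartSketch and CountLettersLater are hypothetical names, not part of the recipe. It records the task's Status before Start is called to show that a constructed task does not run until it is started.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class DeferredStartSketch
{
    // Creates a task without starting it, then starts it explicitly.
    // Returns the status observed before Start and the task's result.
    public static Tuple<TaskStatus, int> CountLettersLater(string word)
    {
        // The constructor only creates the task; the delegate has not run yet.
        Task<int> task = new Task<int>(() => word.Length);
        TaskStatus before = task.Status; // TaskStatus.Created

        // Start schedules the task to run on the default task scheduler.
        task.Start();

        // Result blocks until the task has completed, then returns its value.
        return Tuple.Create(before, task.Result);
    }
}
```

CountLettersLater("recipes") should return a status of Created and a result of 7. Separating creation from starting is mainly useful when you want to set up continuations (see recipe 15-5) before any work begins.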
The Wait method is called on a Task instance and blocks until the task is complete. The static WaitAll and WaitAny methods take an array of tasks as parameters (the parameter is declared with params, so you can also list the tasks individually, as the following example does). The WaitAll method blocks until all of the Tasks in the array have completed, and the WaitAny method blocks until any one of the Tasks is finished and returns the index of that task in the array. These methods also accept an int argument specifying a timeout in milliseconds; when the timeout expires, the method returns regardless of whether the task or tasks have completed (Wait and WaitAll return false, and WaitAny returns -1, if the timeout expired first). The IsCompleted property of the Task class is used to determine whether a task has finished.
This example changes the code from the previous recipe to wait for all of the tasks we created using the WaitAll method. In the previous example, the results of the tasks were reported as each requested result became available; this example waits for all of the tasks to complete before obtaining the results.
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_03
{
    class Recipe15_03
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press enter to start");
            Console.ReadLine();

            // Create the tasks.
            Task<int> task1 = Task<int>.Factory.StartNew(() => writeDays());
            Task<int> task2 = Task<int>.Factory.StartNew(() => writeMonths());
            Task<int> task3 = Task<int>.Factory.StartNew(() => writeCities());

            // Wait for all of the tasks to complete.
            Task.WaitAll(task1, task2, task3);

            // Get the results and write them out.
            Console.WriteLine("{0} days were written", task1.Result);
            Console.WriteLine("{0} months were written", task2.Result);
            Console.WriteLine("{0} cities were written", task3.Result);

            // Wait to continue.
            Console.WriteLine(" Main method complete. Press Enter");
            Console.ReadLine();
        }

        static int writeDays()
        {
            string[] daysArray = { "Monday", "Tuesday", "Wednesday", "Thursday",
                "Friday", "Saturday", "Sunday" };
            foreach (string day in daysArray)
            {
                Console.WriteLine("Day of the Week: {0}", day);
                Thread.Sleep(500);
            }
            return daysArray.Length;
        }

        static int writeMonths()
        {
            string[] monthsArray = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
                "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
            foreach (string month in monthsArray)
            {
                Console.WriteLine("Month: {0}", month);
                Thread.Sleep(500);
            }
            return monthsArray.Length;
        }

        static int writeCities()
        {
            string[] citiesArray = { "London", "New York", "Paris", "Tokyo", "Sydney" };
            foreach (string city in citiesArray)
            {
                Console.WriteLine("City: {0}", city);
                Thread.Sleep(500);
            }
            return citiesArray.Length;
        }
    }
}
```
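The recipe only uses WaitAll without a timeout. The following sketch exercises WaitAny, WaitAll with a timeout, and IsCompleted in one place. The names (WaitSketch, Run) are hypothetical; a ManualResetEventSlim is used as a gate so that one task cannot finish until the code releases it, which keeps the results independent of timing.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class WaitSketch
{
    // Returns (index reported by WaitAny, result of WaitAll with a timeout,
    // IsCompleted of the gated task after it has been released).
    public static Tuple<int, bool, bool> Run()
    {
        using (var gate = new ManualResetEventSlim(false))
        {
            // 'quick' finishes at once; 'blocked' cannot finish until the gate is set.
            Task quick = Task.Factory.StartNew(() => Console.WriteLine("quick task ran"));
            Task blocked = Task.Factory.StartNew(() => gate.Wait());

            // WaitAny returns the array index of the task that completed.
            int firstIndex = Task.WaitAny(quick, blocked);

            // With a timeout in milliseconds, WaitAll returns false if any task
            // is still running when the timeout expires.
            bool allDoneInTime = Task.WaitAll(new[] { quick, blocked }, 100);

            // Release the gated task and wait for both without a timeout.
            gate.Set();
            Task.WaitAll(quick, blocked);

            return Tuple.Create(firstIndex, allDoneInTime, blocked.IsCompleted);
        }
    }
}
```

Run() should return (0, False, True): WaitAny reports index 0 because only the first task can finish, the 100-millisecond WaitAll times out, and the released task is complete afterwards.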
Use the System.Threading.Tasks.Parallel.ForEach method to process each of the elements in a collection in parallel. Optionally, use System.Threading.Tasks.ParallelOptions to limit the degree of parallelism that will be used.
The static Parallel.ForEach method accepts a collection, a delegate, and an optional instance of ParallelOptions as arguments. The method referenced by the delegate is invoked once for each element in the collection, and the runtime uses tasks to process the elements concurrently (it does not necessarily create a separate task for every element). Parallel.ForEach does not return until every element has been processed. The number of concurrent operations is controlled by the ParallelOptions.MaxDegreeOfParallelism property: a value of -1 (the default) means that the degree of parallelism will be determined by the runtime, whereas a value of 1 or more limits the number of tasks that will run at the same time. Setting a value of 0 throws an ArgumentOutOfRangeException.
The following example creates tasks to process each element of a simple array using the printNumbers method. We have called Thread.Sleep in this method to slow down the processing so that the example is clearer. We use the MaxDegreeOfParallelism property of ParallelOptions to ensure that at most two tasks are performed simultaneously. When running the example, notice that the output from the first two tasks is intermingled and then followed by the output from the third task.
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_04
{
    class Recipe15_04
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press enter to start");
            Console.ReadLine();

            // Define the data we want to process.
            int[] numbersArray = { 100, 200, 300 };

            // Configure the options.
            ParallelOptions options = new ParallelOptions();
            options.MaxDegreeOfParallelism = 2;

            // Process each data element in parallel.
            Parallel.ForEach(numbersArray, options,
                baseNumber => printNumbers(baseNumber));

            Console.WriteLine("Tasks Completed. Press Enter");
            Console.ReadLine();
        }

        static void printNumbers(int baseNumber)
        {
            for (int i = baseNumber, j = baseNumber + 10; i < j; i++)
            {
                Console.WriteLine("Number: {0}", i);
                Thread.Sleep(100);
            }
        }
    }
}
```
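To check the effect of MaxDegreeOfParallelism without relying on the order of console output, the hypothetical DegreeSketch class below counts how many loop bodies are running at the same moment and records the peak. This is an illustrative sketch, not part of the recipe; it also confirms that a value of 0 is rejected.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class DegreeSketch
{
    // Runs Parallel.ForEach over eight items with a cap on parallelism and
    // returns the largest number of loop bodies observed running at once.
    public static int PeakConcurrency(int maxDegree)
    {
        int running = 0;
        int peak = 0;
        object peakLock = new object();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegree };

        Parallel.ForEach(new[] { 1, 2, 3, 4, 5, 6, 7, 8 }, options, item =>
        {
            int now = Interlocked.Increment(ref running);
            lock (peakLock)
            {
                if (now > peak) peak = now;
            }
            Thread.Sleep(50); // simulate work so that loop bodies overlap
            Interlocked.Decrement(ref running);
        });

        return peak;
    }

    // Returns true if ParallelOptions rejects a MaxDegreeOfParallelism of 0.
    public static bool ZeroIsRejected()
    {
        try
        {
            new ParallelOptions { MaxDegreeOfParallelism = 0 };
            return false;
        }
        catch (ArgumentOutOfRangeException)
        {
            return true;
        }
    }
}
```

PeakConcurrency(2) should never return more than 2; the exact value reported depends on how the runtime schedules the loop bodies.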
Create an instance of Task for the initial activity using the class constructors (as described in recipe 15-2), and then call the ContinueWith method to create a Task instance representing the next activity in the sequence. When you have created all of the Task instances you require, call the Start method on the first in the sequence.
The Task.ContinueWith and Task.ContinueWith<> methods create a new task that will run when the Task instance on which they are invoked has completed. The previous task (known as the antecedent) is provided as an input parameter to the lambda expression in the ContinueWith method; this can be used to check the state of the previous task or to get its result, as shown in the following example.
The example for this recipe chains three tasks together. The first task adds some integer values. The second obtains the result from the first and prints it out, and the third task simply writes a message without reference to the previous tasks at all.
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_05
{
    class Recipe15_05
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press enter to start");
            Console.ReadLine();

            // Create the set of tasks.
            Task<int> firstTask = new Task<int>(() => sumAndPrintNumbers(100));
            Task secondTask = firstTask.ContinueWith(parent => printTotal(parent));
            Task thirdTask = secondTask.ContinueWith(parent => printMessage());

            // Start the first task.
            firstTask.Start();

            // Read a line to keep the process alive.
            Console.WriteLine("Press enter to finish");
            Console.ReadLine();
        }

        static int sumAndPrintNumbers(int baseNumber)
        {
            Console.WriteLine("sum&print called for {0}", baseNumber);
            int total = 0;
            for (int i = baseNumber, j = baseNumber + 10; i < j; i++)
            {
                Console.WriteLine("Number: {0}", i);
                total += i;
            }
            return total;
        }

        static void printTotal(Task<int> parentTask)
        {
            Console.WriteLine("Total is {0}", parentTask.Result);
        }

        static void printMessage()
        {
            Console.WriteLine("Message from third task");
        }
    }
}
```
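The recipe's continuations return nothing. ContinueWith<> can also produce a value computed from the antecedent's result, which lets a chain pass data along from task to task. The sketch below is a minimal illustration with hypothetical names (ContinuationSketch, RunChain); it also reads the antecedent's Status property from inside a continuation.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class ContinuationSketch
{
    // Builds a chain of three tasks: sum, then double, then describe.
    public static string RunChain()
    {
        // Created with the constructor, so nothing runs until Start is called.
        Task<int> sum = new Task<int>(() => 1 + 2 + 3 + 4);

        // ContinueWith<TNewResult> passes the antecedent to the lambda,
        // and the lambda's return value becomes the new task's Result.
        Task<int> doubled = sum.ContinueWith<int>(antecedent => antecedent.Result * 2);
        Task<string> description = doubled.ContinueWith<string>(antecedent =>
            string.Format("Result: {0} (antecedent status: {1})",
                antecedent.Result, antecedent.Status));

        sum.Start();

        // Result blocks until the last task in the chain has completed.
        return description.Result;
    }
}
```

RunChain() should return "Result: 20 (antecedent status: RanToCompletion)".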
You need to write a parallel algorithm with multiple phases, each of which must be completed before the next can begin.
Create an instance of the System.Threading.Barrier class and call the SignalAndWait method from your Task code at the end of each algorithm phase.
The Barrier class allows you to wait for a set of tasks to complete one part of an algorithm before moving on to the next. This is useful when the overall results from one phase are required by all tasks in order to complete a subsequent phase. When creating an instance of Barrier, you specify the number of participating tasks as a constructor argument. In your Task code, you call the SignalAndWait method when you have reached the end of a phase; your Task will block until the specified number of Tasks are waiting, at which point the Barrier allows all of the waiting tasks to continue into the next phase. It is up to you to determine what constitutes each phase of your algorithm and to specify how many Tasks must reach the barrier before the next phase can begin.
You can also specify an action to be performed when each phase is completed (i.e., after the required number of tasks have called the SignalAndWait method, but before the tasks are allowed to continue to the next phase). The example for this recipe demonstrates how to do this with a lambda expression.
It is important to ensure that you set the Barrier instance to expect the correct number of tasks at each stage of your algorithm. If you tell the Barrier to expect too few tasks, one phase may not have completed before the next begins. If you tell the Barrier to expect too many tasks, the next phase will never start: your tasks will remain blocked in SignalAndWait even though all of them have completed the earlier phase. You can change the number of tasks a Barrier will wait for by using the AddParticipant, AddParticipants, RemoveParticipant, and RemoveParticipants methods.
The following example shows a simple two-phase cooperative algorithm, performed by three tasks. When all of the tasks reach the barrier at the end of each phase, the notifyPhaseEnd method is called.
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_06
{
    class Recipe15_06
    {
        static void Main(string[] args)
        {
            // Create the barrier.
            Barrier myBarrier = new Barrier(3, (barrier) => notifyPhaseEnd(barrier));

            Task task1 = Task.Factory.StartNew(
                () => cooperatingAlgorithm(1, myBarrier));
            Task task2 = Task.Factory.StartNew(
                () => cooperatingAlgorithm(2, myBarrier));
            Task task3 = Task.Factory.StartNew(
                () => cooperatingAlgorithm(3, myBarrier));

            // Wait for all of the tasks to complete.
            Task.WaitAll(task1, task2, task3);

            // Wait to continue.
            Console.WriteLine(" Main method complete. Press Enter");
            Console.ReadLine();
        }

        static void cooperatingAlgorithm(int taskid, Barrier barrier)
        {
            Console.WriteLine("Running algorithm for task {0}", taskid);

            // Perform phase one and wait at the barrier.
            performPhase1(taskid);
            barrier.SignalAndWait();

            // Perform phase two and wait at the barrier.
            performPhase2(taskid);
            barrier.SignalAndWait();
        }

        static void performPhase1(int taskid)
        {
            Console.WriteLine("Phase one performed for task {0}", taskid);
        }

        static void performPhase2(int taskid)
        {
            Console.WriteLine("Phase two performed for task {0}", taskid);
        }

        static void notifyPhaseEnd(Barrier barrier)
        {
            Console.WriteLine("Phase has concluded");
        }
    }
}
```
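In the recipe, the phases do not exchange any data. The hypothetical sketch below shows the situation the Barrier is designed for: the post-phase action combines the results of the first phase exactly once, and every participant then uses that combined value in the second phase. It reads Barrier.CurrentPhaseNumber inside the post-phase action, where the value is the number of the phase that has just finished (0 for the first phase). The names BarrierSketch and Run are illustrative only.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class BarrierSketch
{
    // Three participants run two phases:
    //   phase 0: each participant squares its own input;
    //   post-phase action: totals the squares once all have signaled;
    //   phase 1: each participant computes the total of the other squares.
    public static int[] Run()
    {
        int[] inputs = { 1, 2, 3 };
        int[] squares = new int[inputs.Length];
        int[] othersTotal = new int[inputs.Length];
        int total = 0;

        using (Barrier barrier = new Barrier(inputs.Length, b =>
        {
            // Runs on one thread while every participant is blocked.
            if (b.CurrentPhaseNumber == 0)
            {
                foreach (int square in squares) total += square;
            }
        }))
        {
            Task[] tasks = new Task[inputs.Length];
            for (int i = 0; i < inputs.Length; i++)
            {
                int id = i; // copy the loop variable so each closure gets its own
                tasks[id] = Task.Factory.StartNew(() =>
                {
                    squares[id] = inputs[id] * inputs[id];
                    barrier.SignalAndWait(); // end of phase 0
                    othersTotal[id] = total - squares[id];
                    barrier.SignalAndWait(); // end of phase 1
                });
            }
            Task.WaitAll(tasks);
        }
        return othersTotal;
    }
}
```

With inputs 1, 2, and 3, the squares total 14, so Run() should return { 13, 10, 5 }.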
Call the Task.Wait or Task.WaitAll methods within a try...catch block to catch the System.AggregateException exception. Call the Handle method of AggregateException with a function delegate; the delegate will receive each exception that has been thrown by the Tasks. Your function should return true if the exception can be handled, and false otherwise.
Catching AggregateException as it is thrown from Task.Wait or Task.WaitAll allows you to be notified of exceptions that are unhandled by your Tasks. If an error has occurred, then you will catch a single instance of System.AggregateException representing all of the exceptions that have been thrown; the individual exceptions are available through its InnerExceptions property.
You process each individual exception by calling the AggregateException.Handle method, which accepts a function delegate (usually specified using a lambda expression); the delegate will be called once for each exception that has been thrown by your task or tasks. Bear in mind that several tasks may have encountered the same problem, so you are likely to have to process the same exception type more than once. If you can handle the exception, your function delegate should return true. If it returns false for any exception, Handle throws a new AggregateException containing the unhandled exceptions, which will terminate your application unless you catch it.
If you do not catch exceptions from Wait or WaitAll, then any exception thrown by a Task will be considered unhandled and will terminate your application.
The following example demonstrates how to use the AggregateException.Handle method to implement a custom exception handler function:
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_07
{
    class Recipe15_07
    {
        static void Main(string[] args)
        {
            // Create two tasks; the "bad" one will throw an exception.
            Task goodTask = Task.Factory.StartNew(() => performTask("good"));
            Task badTask = Task.Factory.StartNew(() => performTask("bad"));

            try
            {
                Task.WaitAll(goodTask, badTask);
            }
            catch (AggregateException aggex)
            {
                aggex.Handle(ex => handleException(ex));
            }

            // Wait to continue.
            Console.WriteLine(" Main method complete. Press Enter");
            Console.ReadLine();
        }

        static bool handleException(Exception exception)
        {
            Console.WriteLine("Processed Exception");
            Console.WriteLine(exception);
            // Return true to indicate we have handled the exception.
            return true;
        }

        static void performTask(string label)
        {
            if (label == "bad")
            {
                Console.WriteLine("About to throw exception.");
                throw new ArgumentOutOfRangeException("label");
            }
            else
            {
                Console.WriteLine("performTask for label: {0}", label);
            }
        }
    }
}
```
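The recipe's handler returns true for every exception. The sketch below, with hypothetical names (HandleSketch, CountUnhandled, Fail), instead claims only ArgumentOutOfRangeException and shows what happens to the rest: after processing every exception, Handle throws a new AggregateException that contains only the exceptions for which the delegate returned false.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class HandleSketch
{
    // Throws the given exception; used as the body of a failing task.
    static void Fail(Exception exception)
    {
        throw exception;
    }

    // Returns how many exceptions were left unhandled by the Handle delegate.
    public static int CountUnhandled()
    {
        Task first = Task.Factory.StartNew(() => Fail(new ArgumentOutOfRangeException("label")));
        Task second = Task.Factory.StartNew(() => Fail(new InvalidOperationException("not handled here")));

        try
        {
            try
            {
                Task.WaitAll(first, second);
            }
            catch (AggregateException aggex)
            {
                // Claim only ArgumentOutOfRangeException; Handle rethrows the
                // others wrapped in a new AggregateException.
                aggex.Handle(ex => ex is ArgumentOutOfRangeException);
            }
        }
        catch (AggregateException remaining)
        {
            return remaining.InnerExceptions.Count;
        }
        return 0;
    }
}
```

CountUnhandled() should return 1: the ArgumentOutOfRangeException is claimed, and only the InvalidOperationException comes back in the new AggregateException.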
Create an instance of System.Threading.CancellationTokenSource and call the Token property to obtain a System.Threading.CancellationToken. Pass the token to your Task when you create it, and check it from within the task body. If you want to be notified when cancellation is requested, pass a delegate to the Register method of CancellationToken. Cancel your Task by calling the Cancel method of CancellationTokenSource.
The System.Threading.CancellationTokenSource class provides a mechanism to cancel one or more tasks. CancellationTokenSource is a factory for System.Threading.CancellationToken.
CancellationToken has the property IsCancellationRequested, which returns true when the Cancel method is called on the CancellationTokenSource that produced the token. You can also use the Register method to specify one or more functions to be called when the Cancel method is called. The sequence for handling cancellation is as follows:
1. Create an instance of CancellationTokenSource.
2. Obtain a CancellationToken by calling the Token property of the CancellationTokenSource created in step 1.
3. Create one or more Tasks to handle your work, passing the CancellationToken as a parameter (to Task.Factory.StartNew or to the Task constructor).
4. Check the IsCancellationRequested property of the token in your Task body; if the property returns true, then release any resources and throw an instance of OperationCanceledException.
5. When you are ready to cancel, call the Cancel method on the CancellationTokenSource from step 1.
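Note that you must throw an instance of OperationCanceledException to acknowledge the task cancellation request. Construct it with the same token that was passed when the task was created (or call the token's ThrowIfCancellationRequested method, which does this for you) so that the task ends in the Canceled state rather than the Faulted state.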
The following example creates a CancellationToken that is used to create an instance of Task. A method to be called when the CancellationTokenSource is canceled is registered with the Register method. When CancellationTokenSource.Cancel is called, the Task is stopped and a message is written to the console.
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_08
{
    class Recipe15_08
    {
        static void Main(string[] args)
        {
            // Create the token source.
            CancellationTokenSource tokenSource = new CancellationTokenSource();
            // Create the cancellation token.
            CancellationToken token = tokenSource.Token;

            // Create the task, passing the token to the task body and to StartNew.
            Task task = Task.Factory.StartNew(() => printNumbers(token), token);

            // Register a method to be called when cancellation is requested.
            token.Register(() => notifyTaskCanceled());

            // Wait for the user to request cancellation.
            Console.WriteLine("Press enter to cancel token");
            Console.ReadLine();

            // Canceling.
            tokenSource.Cancel();
        }

        static void notifyTaskCanceled()
        {
            Console.WriteLine("Task cancellation requested");
        }

        static void printNumbers(CancellationToken token)
        {
            int i = 0;
            while (!token.IsCancellationRequested)
            {
                Console.WriteLine("Number {0}", i++);
                Thread.Sleep(500);
            }
            throw new OperationCanceledException(token);
        }
    }
}
```
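The recipe checks IsCancellationRequested and throws OperationCanceledException itself. CancellationToken.ThrowIfCancellationRequested combines both steps. The sketch below, with hypothetical names (CancelSketch, Work, CancelAndObserve), uses it and then confirms that the task ends in the Canceled state; a ManualResetEventSlim ensures the loop has started before Cancel is called.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class CancelSketch
{
    // Loops until cancellation is requested.
    static void Work(CancellationToken token, ManualResetEventSlim started)
    {
        started.Set();
        while (true)
        {
            // Throws OperationCanceledException(token) once Cancel has been called.
            token.ThrowIfCancellationRequested();
            Thread.Sleep(10);
        }
    }

    // Starts the work, cancels it, and returns the task's final status.
    public static TaskStatus CancelAndObserve()
    {
        using (var source = new CancellationTokenSource())
        using (var started = new ManualResetEventSlim(false))
        {
            CancellationToken token = source.Token;

            // Passing the same token to StartNew lets the task end as Canceled.
            Task worker = Task.Factory.StartNew(() => Work(token, started), token);

            started.Wait(); // make sure the loop is running before cancelling
            source.Cancel();

            try
            {
                worker.Wait();
            }
            catch (AggregateException aggex)
            {
                // Waiting on a canceled task surfaces a TaskCanceledException,
                // which derives from OperationCanceledException.
                aggex.Handle(ex => ex is OperationCanceledException);
            }
            return worker.Status;
        }
    }
}
```

CancelAndObserve() should return TaskStatus.Canceled. If the token were not also passed to StartNew, the exception's token would not match the task's, and the task would end as Faulted instead.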
One of the biggest problems when writing parallel or threaded code is ensuring that data is shared safely. Microsoft has introduced new classes in .NET 4.0 that are designed to be more efficient than wrapping synchronization around the default collection classes, as we demonstrated in Chapter 4. The techniques demonstrated in Chapter 4 will work with the .NET parallel programming model, but the new collection classes may be more efficient for large-scale applications. Table 15-1 lists the most useful classes from the System.Collections.Concurrent namespace.
Table 15-1. Useful System.Collections.Concurrent Classes

| Class | Description |
|---|---|
| ConcurrentBag | A thread-safe, unordered collection of objects |
| ConcurrentDictionary | A thread-safe key/value pair collection |
| ConcurrentQueue | A thread-safe first in, first out (FIFO) queue |
| ConcurrentStack | A thread-safe last in, first out (LIFO) stack |
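These collections manage synchronization internally, so you do not have to use synchronization techniques in your code for individual operations such as adding or removing an item. A sequence of separate calls (for example, checking Count and then removing an item) is not atomic, however; use the combined methods the classes provide, such as TryPop, TryDequeue, TryTake, or AddOrUpdate, instead.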
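The concurrent collections also offer methods that combine a check and an update into one operation. As an illustration, the hypothetical ConcurrentCountSketch class below counts words from many loop bodies at once with ConcurrentDictionary.AddOrUpdate, so no explicit lock is needed. It is a sketch of the technique, not part of the recipe.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class ConcurrentCountSketch
{
    // Counts word occurrences from many tasks at once without an explicit lock.
    public static ConcurrentDictionary<string, int> CountWords(string[] words)
    {
        var counts = new ConcurrentDictionary<string, int>();

        Parallel.ForEach(words, word =>
        {
            // AddOrUpdate inserts 1 for a new key or applies the update delegate
            // to the current value; if another thread changes the entry first,
            // it retries, so no increments are lost.
            counts.AddOrUpdate(word, 1, (key, current) => current + 1);
        });

        return counts;
    }
}
```

CountWords(new[] { "a", "b", "a", "c", "a" }) should produce a count of 3 for "a" and 1 each for "b" and "c". Note that the update delegate may run more than once for the same call under contention, so it should not have side effects.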
The following example creates a ConcurrentStack, which is then used by three Tasks.
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;

namespace Recipe15_9
{
    class Recipe15_9
    {
        static void Main(string[] args)
        {
            // Create a concurrent collection.
            ConcurrentStack<int> cStack = new ConcurrentStack<int>();

            // Create tasks that will use the stack.
            Task task1 = Task.Factory.StartNew(
                () => addNumbersToCollection(cStack));
            Task task2 = Task.Factory.StartNew(
                () => addNumbersToCollection(cStack));
            Task task3 = Task.Factory.StartNew(
                () => addNumbersToCollection(cStack));

            // Wait for all of the tasks to complete.
            Task.WaitAll(task1, task2, task3);

            // Report how many items there are in the stack.
            Console.WriteLine("There are {0} items in the collection", cStack.Count);

            // Wait to continue.
            Console.WriteLine(" Main method complete. Press Enter");
            Console.ReadLine();
        }

        static void addNumbersToCollection(ConcurrentStack<int> stack)
        {
            for (int i = 0; i < 1000; i++)
            {
                stack.Push(i);
            }
        }
    }
}
```
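The recipe only pushes items onto the stack. The sketch below, with hypothetical names (DrainSketch, DrainAndSum), shows the consuming side: three tasks pop from a shared ConcurrentStack with TryPop, which tests for an item and removes it in a single atomic call, so no item is taken twice and no lock is required.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class DrainSketch
{
    // Fills a stack with 1..itemCount, lets three tasks pop from it until
    // it is empty, and returns the sum of everything that was popped.
    public static long DrainAndSum(int itemCount)
    {
        var stack = new ConcurrentStack<int>();
        for (int i = 1; i <= itemCount; i++) stack.Push(i);

        long total = 0;
        Task[] consumers = new Task[3];
        for (int t = 0; t < consumers.Length; t++)
        {
            consumers[t] = Task.Factory.StartNew(() =>
            {
                int item;
                // TryPop returns false once the stack is empty.
                while (stack.TryPop(out item))
                {
                    Interlocked.Add(ref total, item);
                }
            });
        }

        Task.WaitAll(consumers);
        return total;
    }
}
```

DrainAndSum(1000) should return 500500, the sum of 1 through 1000, regardless of how the items were divided among the tasks.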