Creating and Managing Tasks

TPL introduced a new task-based programming model that translates multicore power into application performance without requiring you to work with low-level, more complex, and heavyweight threads. It is very important to understand that tasks aren't threads: tasks run using threads, so they don't replace them. In fact, all the parallel loops used in the previous examples run by creating tasks, and their parallel and concurrent execution is supported by underlying threads, as shown in Figure 19.8.

Figure 19.8 Tasks and their relationship with threads


When you work with tasks, they run their code using underlying threads (software threads, scheduled on certain hardware threads, or logical cores). However, there isn't a one-to-one relationship between tasks and threads. This means you aren't creating a new thread each time you create a new task. The CLR creates the necessary threads to support the tasks' execution needs. Of course, this is a simplified view of what goes on when creating tasks.
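To see that tasks and threads aren't one-to-one, the following minimal sketch (a hypothetical TaskThreadDemo module, not part of the chapter's listings) starts many short tasks and records which managed thread runs each one. On typical hardware it reports far fewer threads than tasks, but the exact number depends on the number of cores and on the scheduler's decisions, so treat any particular value as illustrative.

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module TaskThreadDemo
    ' Runs taskCount short tasks and returns how many distinct managed threads
    ' executed them. The number depends on the hardware and on the decisions
    ' the scheduler makes at run time.
    Public Function CountThreadsUsed(taskCount As Integer) As Integer
        Dim threadIds As New ConcurrentDictionary(Of Integer, Boolean)()
        Dim tasks(taskCount - 1) As Task
        For i As Integer = 0 To taskCount - 1
            tasks(i) = Task.Factory.StartNew(
                Sub()
                    ' Record the ID of the thread that runs this task
                    threadIds.TryAdd(Thread.CurrentThread.ManagedThreadId, True)
                    ' Simulate a small amount of CPU work
                    Thread.SpinWait(100000)
                End Sub)
        Next
        Task.WaitAll(tasks)
        Return threadIds.Count
    End Function

    Sub Main()
        Console.WriteLine("Distinct threads used by 100 tasks: " &
                          CountThreadsUsed(100).ToString())
    End Sub
End Module
```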

Synchronizing code running in multiple threads is indeed complex. A task-based alternative offers an excellent opportunity to leave some synchronization problems behind, especially those related to work scheduling mechanisms. The CLR uses work-stealing queues to reduce lock contention and to schedule small chunks of work without adding significant overhead. Creating a new thread introduces a big overhead, whereas creating a new task usually just queues a work item that an existing thread will pick up; an idle thread can even “steal” queued work from a busier one. Therefore, tasks offer a lightweight mechanism for the parts of your code that can take advantage of multiple cores.

The default task scheduler relies on the underlying thread pool engine. When you create a new task, the scheduler places it in a work-stealing queue from which the most appropriate thread will pick it up: idle threads can steal work queued for busier ones, and the pool creates a new thread when necessary. The code in a task runs on one thread, but this happens under the hood, and the overhead is smaller than manually creating a new thread.

System.Threading.Tasks.Task

So far, TPL has been creating instances of System.Threading.Tasks.Task under the hood to support the parallel execution of the loop iterations. Likewise, calling Parallel.Invoke creates Task instances to run the delegates it receives.

A Task represents an asynchronous operation. It offers many methods and properties that enable you to control its execution and get information about its status. The creation of a Task is independent of its execution. This means that you have complete control over the execution of the associated operation.


Note
When you launch many asynchronous operations as Task instances, the task scheduler will try to run them in parallel to load-balance all the available logical cores at run time. However, it isn't always convenient to run existing code in tasks, because tasks add overhead. Although this overhead is smaller than that of a thread, it still has to be considered. For example, it doesn't make sense to run two lines of code that solve very simple calculations as two independent asynchronous tasks. Remember to measure the speedup of the parallel version over the sequential one to decide whether parallelism is appropriate.

Table 19.2 describes the read-only properties of a Task instance.

Table 19.2 Task Read-only Properties

Property Description
AsyncState A state Object supplied when you created the Task instance.
CreationOptions The TaskCreationOptions enum value used to provide hints to the task scheduler in order to help it make the best scheduling decisions.
CurrentId The ID of the Task that is currently executing (a Shared property), or Nothing if no task is executing. It is not equivalent to a thread ID.
Exception The AggregateException that caused the Task to end prematurely. It is Nothing if the Task hasn't thrown any exception.
Factory Provides access to the factory methods that create Task instances with and without results.
Id The unique ID for the Task instance.
IsCanceled A Boolean value indicating whether the Task instance was canceled.
IsCompleted A Boolean value indicating whether the Task has reached any of its final states (RanToCompletion, Faulted, or Canceled).
IsFaulted A Boolean value indicating whether the Task ended because of an unhandled exception.
Status The TaskStatus value indicating the current stage in the life cycle of a Task instance
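The following sketch reads several of these properties for a task created with a state object. The module name and the "sample state" string are hypothetical; the values noted in the comments assume the delegate runs without throwing.

```vb
Imports System
Imports System.Threading.Tasks

Module TaskPropertiesDemo
    ' Creates a task with a state object, runs it, and returns it for inspection
    Public Function CreateAndRun() As Task(Of Integer?)
        ' "sample state" is an arbitrary state object; AsyncState returns it later.
        ' Inside the delegate, the Shared Task.CurrentId property returns the Id
        ' of the task that is running the code.
        Dim t As New Task(Of Integer?)(Function(state) Task.CurrentId, "sample state")
        Console.WriteLine("Status before Start: " & t.Status.ToString())  ' Created
        t.Start()
        t.Wait()
        Return t
    End Function

    Sub Main()
        Dim t = CreateAndRun()
        Console.WriteLine("Id: " & t.Id.ToString())
        Console.WriteLine("CurrentId inside the task: " & t.Result.ToString())
        Console.WriteLine("AsyncState: " & CStr(t.AsyncState))
        Console.WriteLine("Status: " & t.Status.ToString())            ' RanToCompletion
        Console.WriteLine("IsCompleted: " & t.IsCompleted.ToString())  ' True
        Console.WriteLine("IsFaulted: " & t.IsFaulted.ToString())      ' False
        Console.WriteLine("Exception is Nothing: " & (t.Exception Is Nothing).ToString())
        ' Outside any task, CurrentId has no value
        Console.WriteLine("CurrentId in Main has a value: " & Task.CurrentId.HasValue.ToString())
    End Sub
End Module
```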

Understanding a Task's Life Cycle

It is very important to understand that each Task instance has a life cycle. However, because a task represents concurrent code, potentially running in parallel according to the possibilities offered by the underlying hardware and the availability of resources at run time, any information about a Task instance can change as soon as you retrieve it: its state changes concurrently.

A Task instance completes its life cycle just once. After it reaches one of its three possible final states, it doesn't go back to any previous state, as shown in the state diagram in Figure 19.9.

Figure 19.9 Status diagram for a Task instance


A Task instance has three possible initial states, depending on how it was created, as described in Table 19.3.

Table 19.3 Initial States for a Task Instance

Value Description
TaskStatus.Created A Task instance created using the Task constructor has this initial state. It will change once there is a call to either Start or RunSynchronously, or if the task is canceled.
TaskStatus.WaitingForActivation This is the initial state for tasks created through methods that define continuations, that is, tasks that aren't scheduled until the tasks they depend on finish their execution.
TaskStatus.WaitingToRun This is the initial state for a task created through TaskFactory.StartNew. It is waiting for the specified scheduler to pick it up and run it.
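A minimal sketch that creates one task in each of the three ways described in Table 19.3 and captures its Status right after creation. The module name and the Thread.Sleep calls standing in for real work are assumptions for illustration. The status of the StartNew task is only guaranteed not to be Created: depending on timing, a thread may already have picked it up and moved it past WaitingToRun.

```vb
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module InitialStatesDemo
    ' Returns the Status of three tasks, captured right after their creation
    Public Function GetInitialStates() As TaskStatus()
        ' Task constructor: Created until Start or RunSynchronously is called
        Dim fromConstructor As New Task(Sub() Thread.Sleep(100))

        ' Continuation: WaitingForActivation until its antecedent finishes
        ' (here the antecedent hasn't even been started yet)
        Dim continuation = fromConstructor.ContinueWith(
            Sub(antecedent) Console.WriteLine("Antecedent status: " & antecedent.Status.ToString()))

        ' TaskFactory.StartNew: already scheduled when StartNew returns, so the
        ' status is WaitingToRun, or a later one if a thread picked it up already
        Dim fromStartNew = Task.Factory.StartNew(Sub() Thread.Sleep(100))

        Dim states = {fromConstructor.Status, continuation.Status, fromStartNew.Status}

        ' Let all three tasks finish before returning
        fromConstructor.Start()
        Task.WaitAll(fromConstructor, continuation, fromStartNew)
        Return states
    End Function

    Sub Main()
        For Each state In GetInitialStates()
            Console.WriteLine(state.ToString())
        Next
    End Sub
End Module
```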

Next, the task status can transition to the TaskStatus.Running state, and finally to a final state. However, if the task has attached children, it isn't considered complete when its own delegate finishes; instead, it transitions to the TaskStatus.WaitingForChildrenToComplete state. Once its child tasks complete, the task moves to one of the three possible final states shown in Table 19.4.

Table 19.4 Final States for a Task Instance

Value Description
TaskStatus.Canceled A cancellation request arrived before the task started its execution, or the task acknowledged a cancellation request during its execution by throwing an OperationCanceledException with its associated token. The IsCanceled property will be True.
TaskStatus.Faulted An unhandled exception in its body or the bodies of its children made the task end. The IsFaulted property will be True and the Exception property will be non-null and will hold the AggregateException that caused the task or its children to end prematurely.
TaskStatus.RanToCompletion The task completed its execution. It ran to the end of its body without being canceled or throwing an unhandled exception. The IsCompleted property will be True (as it is in the other two final states), and IsCanceled and IsFaulted will both be False.
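The following sketch drives one task into each final state: a delegate that returns normally, one that throws a hypothetical InvalidOperationException, and one whose CancellationToken was canceled before it could start. Note that IsCompleted is True for all three, because it only indicates that the task has reached a final state.

```vb
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module FinalStatesDemo
    ' Returns three finished tasks, one in each final state
    Public Function GetFinishedTasks() As Task()
        ' RanToCompletion: the delegate returns normally
        Dim completed = Task.Factory.StartNew(Sub() Thread.Sleep(50))

        ' Faulted: the delegate throws an unhandled exception
        Dim faulted = Task.Factory.StartNew(
            Sub() Throw New InvalidOperationException("Simulated failure"))

        ' Canceled: cancellation was requested before the task could start
        Dim cts As New CancellationTokenSource()
        cts.Cancel()
        Dim canceled = Task.Factory.StartNew(Sub() Thread.Sleep(50), cts.Token)

        Try
            Task.WaitAll(completed, faulted, canceled)
        Catch ex As AggregateException
            ' Expected: holds the InvalidOperationException and a TaskCanceledException
        End Try
        Return {completed, faulted, canceled}
    End Function

    Sub Main()
        For Each t In GetFinishedTasks()
            Console.WriteLine(t.Status.ToString() &
                              " IsCompleted=" & t.IsCompleted.ToString() &
                              " IsCanceled=" & t.IsCanceled.ToString() &
                              " IsFaulted=" & t.IsFaulted.ToString())
        Next
    End Sub
End Module
```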

Using Tasks to Parallelize Code

In a previous example, you used Parallel.Invoke to launch two subroutines in parallel:

Parallel.Invoke(Sub() GenerateAESKeys(), Sub() GenerateMD5Hashes())

It is possible to do the same job using two instances of Task, as shown in the following code (code file: Listing13.sln). Working with Task instances offers more flexibility to schedule and start independent and chained tasks that can take advantage of multiple cores.

' Create the tasks
Dim t1 = New Task(Sub() GenerateAESKeys())
Dim t2 = New Task(Sub() GenerateMD5Hashes())
' Start the tasks
t1.Start()
t2.Start()
' Wait for all the tasks to finish
Task.WaitAll(t1, t2)

The first two statements create two instances of Task, using lambda expressions to create delegates for GenerateAESKeys and GenerateMD5Hashes: t1 is associated with the first subroutine, and t2 with the second. You can also use multiline lambda expression syntax to define the action that the Task constructor receives as a parameter. At this point, the Status of both Task instances is TaskStatus.Created. The subroutines aren't running yet, and the code continues with the next line.
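As a sketch of the multiline lambda syntax, the following hypothetical module builds a task whose action spans several statements and records its Status before Start and after Wait. The summing loop simply stands in for real work such as GenerateAESKeys.

```vb
Imports System
Imports System.Threading.Tasks

Module MultilineLambdaDemo
    ' Value computed by the task's delegate, stored so it can be checked later
    Public LastTotal As Long

    ' Returns the task's Status before Start and after it finishes
    Public Function RunMultilineTask() As TaskStatus()
        ' Multiline lambda syntax: the whole Sub ... End Sub block is the task's action
        Dim t1 = New Task(Sub()
                              Dim total As Long = 0
                              For i As Integer = 1 To 1000
                                  total += i
                              Next
                              LastTotal = total
                          End Sub)
        ' Nothing has run yet, so the status is Created
        Dim beforeStart = t1.Status
        t1.Start()
        t1.Wait()
        Return {beforeStart, t1.Status}
    End Function

    Sub Main()
        Dim states = RunMultilineTask()
        Console.WriteLine(states(0).ToString() & " -> " & states(1).ToString())
        Console.WriteLine("Sum: " & LastTotal.ToString())
    End Sub
End Module
```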

Starting Tasks

Then, the following line starts the asynchronous execution of t1:

t1.Start()

The Start method schedules the delegate for asynchronous execution, and the program flow continues with the instruction after this method, even though the delegate hasn't finished its execution. The code in the delegate associated with the task runs concurrently, and potentially in parallel, with the main program flow (the main thread). This means that at this point, there is a main thread and another thread or threads supporting the execution of this new task.

The execution of the main program flow, the main thread, is synchronous. This means that it will continue with the next instruction, the line that starts the asynchronous execution of t2:

t2.Start()

Now the Start method schedules the second delegate for asynchronous execution, and the program flow continues with the instruction after this method, even though this other delegate hasn't finished its execution. The code in the delegate associated with t2 runs concurrently, and potentially in parallel, with the main thread and with the code inside GenerateAESKeys that is already running. This means that at this point, there is a main thread and other threads supporting the execution of the two tasks.


Note
It is indeed easy to run asynchronous code using Task instances and the latest language improvements added to Visual Basic. With just a few lines, you can create code that runs asynchronously, control its execution flow, and take advantage of multicore microprocessors or multiple processors. Visual Basic 2012 and Microsoft .NET Framework 4.5 went a step further with the addition of the Async modifier and the Await operator, which simplify the use of tasks in asynchronous operations.

The sequence diagram in Figure 19.10 shows the parallel and asynchronous execution flow for the main thread and the two tasks.

Figure 19.10 Parallel and asynchronous execution flow for a main thread and two tasks


Visualizing Tasks Using Parallel Tasks and Parallel Stacks

The Visual Basic 2012 IDE improves the two debugging windows introduced in Visual Basic 2010: Parallel Tasks and Parallel Stacks. These windows offer information about the tasks that are running, including their status and their relationship with the underlying threads. When you debug simple parallelized code step-by-step with these windows, which let you monitor what is going on under the hood, it is very easy to understand the relationship between the tasks and the underlying threads in .NET Framework 4.5. Running the code step-by-step also shows the differences between synchronous and asynchronous execution.

For example, if you insert a breakpoint on the line Task.WaitAll(t1, t2) and your microprocessor has at least two cores, you will be able to see two tasks running in parallel while debugging. To do so, select Debug ⇒ Windows ⇒ Parallel Tasks (Ctrl + Shift + D, K) while debugging the application. The IDE will display the Parallel Tasks window shown in Figure 19.11, which includes a list of all the tasks and their status (Active).

Figure 19.11 Parallel Tasks dialogue with two active tasks


There are two tasks:

  • Task ID 1: <lambda10>() — Assigned to Worker thread ID 60
  • Task ID 2: <lambda11>() — Assigned to Worker thread ID 5464

Each of the two tasks is assigned to a different thread. The status for both tasks is Running, and they are identified by an auto-generated lambda name and number, <lambda10>() and <lambda11>(). This happens because the code uses lambda expressions to generate the delegates associated with each task.

If you double-click on a task name, the IDE will display the next statement that is going to run for the selected task. Remember that the threads assigned to these tasks and the main thread are running concurrently and potentially in different logical cores, according to the available hardware resources and the decisions taken by the schedulers.


Note
The CLR task scheduler tries to run each task on the most appropriate underlying thread, letting idle threads steal queued work from busier ones. It can also decide to create a new thread to support the task's execution. Meanwhile, the operating system scheduler distributes the time on the available cores among the dozens or hundreds of threads scheduled to receive processor time. This is why the same code can run with different parallelism levels and different execution times on the same hardware configuration.

You can check what is going on with each different concurrent task. You have similar options to those offered by previous Visual Basic versions with threads, but the information is better because you can check whether a task is scheduled or waiting-deadlocked. You can also order and group the information shown in the windows, as you can with any other Visual Basic IDE feature.

The Parallel Tasks grid includes a column named Thread Assignment. This number is the ID shown in the Threads window. Thus, you know which managed thread is supporting the execution of a certain task while debugging. You can also check the next statement and additional detailed information for each thread. To do so, select Debug ⇒ Windows ⇒ Threads (Ctrl + Alt + H) while debugging the application. The IDE will display the Threads window shown in Figure 19.12, which includes a list of all the threads, their category, and their locations.

Figure 19.12 Threads dialog with 5 worker threads and the main thread


Note
There is a simpler way to visualize the relationship between tasks and threads. You can select Debug ⇒ Windows ⇒ Parallel Stacks (Ctrl + Shift + D, S). The IDE will display the Parallel Stacks window shown in Figure 19.13, which includes a diagram with all the tasks or threads, their status, and their relationships. The default view is Threads.

Figure 19.13 Parallel Stacks window displaying the default Threads view


The two threads on the right side of the diagram are running the code scheduled by the two tasks. Each thread shows its call stack. The thread that supports Module1.<lambda10> is running the GenerateAESKeys subroutine, specifically code inside the call to the ConvertToHexString subroutine. The thread that supports Module1.<lambda11> is running the GenerateMD5Hashes subroutine. This diagram shows what each thread is doing in great detail.

You can change the value for the combo box in the upper-left corner from Threads to Tasks, and the IDE will display a diagram with all the tasks, including their status, relationships, and the call stack, as shown in Figure 19.14.

Figure 19.14 Parallel Stacks window displaying the Tasks view


Waiting for Tasks to Finish

At some point, you need to wait for certain tasks that you started asynchronously to finish. The following line calls the Task.WaitAll method, which waits for the Task instances received as a ParamArray, separated by commas. This method is synchronous, which means that the main thread won't continue with the next statement until the Task instances received as parameters finish their execution.

Task.WaitAll(t1, t2)

Here, t1 and t2 have to finish their execution. The current thread (in this case, the main thread) will wait until both tasks finish. However, it is important to note that this waiting isn't a loop that continuously checks a status and consumes a lot of CPU cycles. The WaitAll method uses a lightweight mechanism to reduce the need for CPU cycles as much as possible. Once both tasks finish their execution, the next statement runs.

Because the WaitAll method is synchronous, if the tasks take one minute to run, the thread where this method was called (in this case, the main thread) will wait for that long. Therefore, sometimes you want to limit the number of milliseconds to wait for the tasks to finish. You can use another definition of the Task.WaitAll method that accepts an array of Task instances and the number of milliseconds to wait. The method returns a Boolean value indicating whether the tasks were able to finish within the specified timeout. The following code waits for t1 and t2 to finish their execution with a three-second timeout (code file: Snippet03.sln):

If Task.WaitAll(New Task() {t1, t2}, 3000) = False Then
    Console.WriteLine(
        "GenerateAESKeys and GenerateMD5Hashes are taking more than 3 seconds to complete.")
    Console.WriteLine(t1.Status.ToString())
    Console.WriteLine(t2.Status.ToString())
End If

If t1 and t2 don't finish in three seconds, the code displays a message and the status for both tasks. If no exceptions occurred in the code for these tasks, they could still be running. The Task.WaitAll method with a timeout doesn't cancel the tasks when they take longer to run; it just returns from its synchronous execution with the Boolean result.

It is also possible to call the Wait method for a Task instance. In this case, the current thread will wait until that task finishes its execution. Of course, there is no need to send the task instance as a parameter because the Wait method is an instance method. The Task.Wait method also supports a timeout in one of its definitions. The following code waits for t1 to finish, and if it doesn't complete its work in three seconds, it displays a message and its status (code file: Snippet04.sln):

If t1.Wait(3000) = False Then
    Console.WriteLine("GenerateAESKeys is taking more than 3 seconds to complete.")
    Console.WriteLine(t1.Status.ToString())
End If
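The following sketch shows that a timeout only limits how long the calling thread waits; it doesn't stop the task. The module name and the two-second Thread.Sleep, a stand-in for a slow operation such as GenerateAESKeys, are hypothetical.

```vb
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module WaitTimeoutDemo
    ' Returns {finished within the timeout, still running after the timeout,
    '          ran to completion afterward}
    Public Function WaitWithTimeout() As Boolean()
        ' Hypothetical slow operation that takes about two seconds
        Dim slow = Task.Factory.StartNew(Sub() Thread.Sleep(2000))

        ' Wait at most 500 ms; WaitAll returns False because the task is still running
        Dim finishedInTime = Task.WaitAll(New Task() {slow}, 500)
        Dim stillRunning = Not slow.IsCompleted

        ' The timeout didn't cancel the task, so waiting again without a
        ' timeout lets it run to completion
        slow.Wait()
        Return {finishedInTime, stillRunning, slow.Status = TaskStatus.RanToCompletion}
    End Function

    Sub Main()
        Dim results = WaitWithTimeout()
        Console.WriteLine("Finished within 500 ms: " & results(0).ToString())
        Console.WriteLine("Still running after the timeout: " & results(1).ToString())
        Console.WriteLine("Ran to completion afterward: " & results(2).ToString())
    End Sub
End Module
```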

Cancelling Tasks Using Tokens

You can interrupt the execution of Task instances through the use of cancellation tokens. To do so, it is necessary to add some code in the delegate, in order to create a cancelable operation that is capable of terminating in a timely manner.

The following code (code file: Listing14.sln) shows two new versions of the AES keys and MD5 hash generators. The new GenerateAESKeysCancel, replacing the old GenerateAESKeys, receives a System.Threading.CancellationToken instance and, when cancellation has been requested, throws an OperationCanceledException by calling the ThrowIfCancellationRequested method. This way, the Task instance transitions to the TaskStatus.Canceled state and the IsCanceled property will be True. GenerateMD5HashesCancel follows the same pattern.

Private Sub GenerateAESKeysCancel(ByVal ct As System.Threading.CancellationToken)
    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim aesM As New AesManaged()
    For i As Integer = 1 To NUM_AES_KEYS
        aesM.GenerateKey()
        Dim result = aesM.Key
        Dim hexString = ConvertToHexString(result)
        ' Console.WriteLine("AES: " + ConvertToHexString(result))
        If ct.IsCancellationRequested Then
            ct.ThrowIfCancellationRequested()
        End If
    Next
    Console.WriteLine("AES: " + sw.Elapsed.ToString())
End Sub

Private Sub GenerateMD5HashesCancel(ByVal ct As System.Threading.CancellationToken)
    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim md5M As MD5 = MD5.Create()
    For i As Integer = 1 To NUM_MD5_HASHES
        Dim data = Encoding.Unicode.GetBytes(i.ToString())
        Dim result = md5M.ComputeHash(data)
        Dim hexString = ConvertToHexString(result)
        ' Console.WriteLine(ConvertToHexString(result))
        If ct.IsCancellationRequested Then
            ct.ThrowIfCancellationRequested()
        End If
    Next
    Debug.WriteLine("MD5: " + sw.Elapsed.ToString())
End Sub

Sub Main()
    Dim cts As New System.Threading.CancellationTokenSource()
    Dim ct As System.Threading.CancellationToken = cts.Token

    Dim t1 = Task.Factory.StartNew(Sub() GenerateAESKeysCancel(ct), ct)
    Dim t2 = Task.Factory.StartNew(Sub() GenerateMD5HashesCancel(ct), ct)

    ' Sleep the main thread for 1 second
    Threading.Thread.Sleep(1000)

    cts.Cancel()

    Try
        If Task.WaitAll(New Task() {t1, t2}, 1000) = False Then
            Console.WriteLine(
                "GenerateAESKeys and GenerateMD5Hashes are taking more than 1 second to complete.")
            Console.WriteLine(t1.Status.ToString())
            Console.WriteLine(t2.Status.ToString())
        End If
    Catch ex As AggregateException
        For Each innerEx As Exception In ex.InnerExceptions
            Debug.WriteLine(innerEx.ToString())
            ' Do something else considering the innerEx Exception
        Next
    End Try
    If t1.IsCanceled Then
        Console.WriteLine("The task running GenerateAESKeysCancel was canceled.")
    End If
    If t2.IsCanceled Then
        Console.WriteLine(
"The task running GenerateMD5HashesCancel was canceled.")
    End If
    ' Display the results and wait for the user to press a key
    Console.ReadLine()
End Sub

The first line of GenerateAESKeysCancel throws the aforementioned exception if cancellation has already been requested. This way, the loop doesn't start when it is unnecessary.

ct.ThrowIfCancellationRequested()

In addition, at the end of each iteration of the loop, new code checks the token's IsCancellationRequested property. If it is True, the code calls the ThrowIfCancellationRequested method. Before calling this method, you can add cleanup code when necessary:

If ct.IsCancellationRequested Then
    ' It is important to add cleanup code here when necessary
    ct.ThrowIfCancellationRequested()
End If

Doing this adds a small amount of overhead to each iteration of the loop, but it allows the task to observe the cancellation. When the delegate throws an OperationCanceledException, the Task instance compares the exception's token with its own associated token. If they are the same and the token's IsCancellationRequested property is True, the Task instance understands that there is a request for cancellation and makes the transition to the Canceled state, interrupting its execution. Code waiting for the canceled Task instance receives an automatic TaskCanceledException, which is wrapped in an AggregateException.

In this case, the main subroutine creates a CancellationTokenSource, cts, and a CancellationToken, ct:

    Dim cts As New System.Threading.CancellationTokenSource()
    Dim ct As System.Threading.CancellationToken = cts.Token

CancellationTokenSource initiates cancellation requests, and CancellationToken communicates them to asynchronous operations.

The CancellationToken has to reach each task in two ways: each task delegate receives it as a parameter so that its code can observe cancellation requests, and one of the definitions of the TaskFactory.StartNew method receives it so that the task is associated with the token. The following lines create and start two Task instances with associated actions and the same CancellationToken instance (ct) as parameters:

    Dim t1 = Task.Factory.StartNew(Sub() GenerateAESKeysCancel(ct), ct)
    Dim t2 = Task.Factory.StartNew(Sub() GenerateMD5HashesCancel(ct), ct)

The preceding lines use the Task class Factory property to retrieve a TaskFactory instance that can be used to create tasks with more options than those offered by direct instantiation of the Task class. In this case, it uses the StartNew method, which is functionally equivalent to creating a Task using one of its constructors and then calling Start to schedule it for execution.

Then, the code calls the Sleep method to make the main thread sleep for one second. This method suspends the current thread for the indicated time—in this case, specified as an Integer in milliseconds:

Threading.Thread.Sleep(1000)

Note
The main thread remains suspended for one second, but the threads that support the tasks' execution aren't suspended. Therefore, the tasks can be scheduled and run during that second.

One second later, the main thread communicates a request for cancellation for both tasks through the CancellationTokenSource instance's Cancel method:

cts.Cancel()

The cancellation token is evaluated in the two delegates launched by the Task instances, as previously explained.

With a few added lines, it is indeed easy to cancel asynchronous actions. However, it is very important to add the necessary cleanup code.

A Try...Catch...End Try block encloses the call to Task.WaitAll. Because there was a request for cancellation for both tasks, the AggregateException thrown by Task.WaitAll will contain two benign exceptions of type TaskCanceledException, which derives from OperationCanceledException.

The IsCanceled property for both tasks is going to be True. Checking this property, you can add code whenever a task was canceled.
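To illustrate the token comparison described earlier, the following sketch runs the same hypothetical loop, WorkUntilCanceled, in two tasks. Only the first task receives the token through StartNew, so only that task ends up Canceled. The second one observes the same token inside its delegate, but because the exception's token doesn't match the task's associated token, the task ends up Faulted.

```vb
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module TokenMatchDemo
    ' Hypothetical loop that runs until cancellation is requested through ct
    Private Sub WorkUntilCanceled(ct As CancellationToken)
        Do
            ct.ThrowIfCancellationRequested()
            Thread.Sleep(10)
        Loop
    End Sub

    Public Function CompareTokenAssociation() As TaskStatus()
        Dim cts As New CancellationTokenSource()
        Dim ct = cts.Token

        ' The same token is passed to the delegate and to StartNew
        Dim associated = Task.Factory.StartNew(Sub() WorkUntilCanceled(ct), ct)
        ' The delegate observes ct, but StartNew didn't receive it
        Dim notAssociated = Task.Factory.StartNew(Sub() WorkUntilCanceled(ct))

        Thread.Sleep(100)   ' give both tasks a chance to start
        cts.Cancel()

        Try
            Task.WaitAll(associated, notAssociated)
        Catch ex As AggregateException
            ' Expected: a TaskCanceledException and an OperationCanceledException
        End Try
        Return {associated.Status, notAssociated.Status}
    End Function

    Sub Main()
        Dim states = CompareTokenAssociation()
        Console.WriteLine("Token passed to StartNew: " & states(0).ToString())
        Console.WriteLine("Token only in the delegate: " & states(1).ToString())
    End Sub
End Module
```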

Handling Exceptions Thrown by Tasks

Because many tasks can run in parallel, multiple exceptions can also occur in parallel. Task instances therefore work with a set of exceptions, wrapped in the previously explained System.AggregateException class.

The following code (code file: Listing15.sln) adds an unhandled exception to the GenerateAESKeysCancel subroutine: it throws a TimeoutException when the loop runs for more than 0.5 seconds.

First, comment out the line in Main that requests cancellation for both tasks:

' cts.Cancel()

Then, replace GenerateAESKeysCancel with the following version:

Private Sub GenerateAESKeysCancel(ByVal ct As System.Threading.CancellationToken)
    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim aesM As New AesManaged()
    For i As Integer = 1 To NUM_AES_KEYS
        aesM.GenerateKey()
        Dim result = aesM.Key
        Dim hexString = ConvertToHexString(result)
        ' Console.WriteLine("AES: " + ConvertToHexString(result))
        ' TotalSeconds includes the fraction; Elapsed.Seconds is a whole-second component
        If sw.Elapsed.TotalSeconds > 0.5 Then
            Throw New TimeoutException(
                "GenerateAESKeysCancel is taking more than 0.5 seconds to complete.")
        End If
        If ct.IsCancellationRequested Then
            ct.ThrowIfCancellationRequested()
        End If
    Next
    Console.WriteLine("AES: " + sw.Elapsed.ToString())
End Sub

Then, add the following lines to the Main subroutine, after the Try...Catch...End Try block that encloses the call to Task.WaitAll:

If t1.IsFaulted Then
    For Each innerEx As Exception In t1.Exception.InnerExceptions
        Debug.WriteLine(innerEx.ToString())
        ' Do something else considering the innerEx Exception
    Next
End If

Because there is an unhandled exception in t1, its IsFaulted property is True. Therefore, t1.Exception, an AggregateException, contains one or more exceptions that occurred during the execution of its associated delegate. After checking the IsFaulted property, it is possible to iterate through each individual exception contained in the InnerExceptions read-only collection of Exception. You can make decisions according to the problems that made it impossible to complete the task. The following lines show the information about the unhandled exception converted to a string and sent to the Debug output.

System.TimeoutException: GenerateAESKeysCancel is taking more than 0.5 seconds to complete.
   at ConsoleApplication3.Module1.GenerateAESKeysCancel(CancellationToken ct) in C:\Wrox\Professional_VB_2012\ConsoleApplication3\ConsoleApplication3\Module1.vb:line 427
   at ConsoleApplication3.Module1._Closure$____3._Lambda$____11() in C:\Wrox\Professional_VB_2012\ConsoleApplication3\ConsoleApplication3\Module1.vb:line 337
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.Execute()

Note
Unhandled exceptions inside asynchronous operations are usually complex problems, because sometimes you need to perform important cleanup operations. For example, when an exception occurs, you can have partial results, and you might have to remove these values if the job doesn't complete. Thus, you have to consider cleanup operations when working with tasks.
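The following self-contained sketch reproduces the pattern with a hypothetical FailingWork subroutine that always throws a TimeoutException, and shows one way to make decisions according to the type of each inner exception. The module name and message strings are arbitrary.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Threading.Tasks

Module FaultedTaskDemo
    ' Hypothetical work that fails the way the modified GenerateAESKeysCancel does
    Private Sub FailingWork()
        Throw New TimeoutException("Simulated: the work is taking too long.")
    End Sub

    ' Runs FailingWork in a task and describes each exception it threw
    Public Function DescribeFailures() As List(Of String)
        Dim messages As New List(Of String)()
        Dim t1 = Task.Factory.StartNew(AddressOf FailingWork)
        Try
            t1.Wait()
        Catch ex As AggregateException
            ' Wait rethrows the task's exceptions wrapped in an AggregateException;
            ' they are inspected below through t1.Exception instead
        End Try

        If t1.IsFaulted Then
            For Each innerEx As Exception In t1.Exception.InnerExceptions
                If TypeOf innerEx Is TimeoutException Then
                    ' A decision based on the kind of problem, such as retrying later
                    messages.Add("Timeout: " & innerEx.Message)
                Else
                    messages.Add("Other: " & innerEx.Message)
                End If
            Next
        End If
        Return messages
    End Function

    Sub Main()
        For Each message In DescribeFailures()
            Console.WriteLine(message)
        Next
    End Sub
End Module
```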

Returning Values from Tasks

So far, the Task instances haven't returned values; their delegates ran subroutines. However, tasks can also return values by invoking functions and using Task(Of TResult) instances, where TResult is replaced by the returned type.

The following code (code file: Listing17.sln) shows the code for a new function that generates the well-known AES keys and then returns a list of the ones that begin with the character prefix received as one of the parameters (prefix). GenerateAESKeysWithCharPrefix returns a List of String.

The Main subroutine uses the TaskFactory.StartNew method, but this time it calls it through the Factory property of the Task(Of TResult) class instead of the Task class. Specifically, it creates a Task(Of List(Of String)) instance, sending a CancellationToken as a parameter to the task delegate:

Dim t1 = Task(Of List(Of String)).Factory.StartNew(
    Function() GenerateAESKeysWithCharPrefix(ct, "A"), ct)

The delegate is a function that returns a List(Of String), which is available through the Result property of the Task(Of TResult) instance (t1) after the associated delegate completes its execution and the function returns a value.

The main thread waits for t1 to finish and then checks whether it completed its execution, using the previously explained Task instance properties. Then, it iterates through each string in the list returned by the function called in t1 and displays the results on the console. It does this job by running a new asynchronous task, t2 (code file: Listing17.sln).

Private Function GenerateAESKeysWithCharPrefix(
    ByVal ct As System.Threading.CancellationToken, 
    ByVal prefix As Char) As List(Of String)

    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim aesM As New AesManaged()
    Dim keysList As New List(Of String)
    For i As Integer = 1 To NUM_AES_KEYS
        aesM.GenerateKey()
        Dim result = aesM.Key
        Dim hexString = ConvertToHexString(result)
        If Left(hexString, 1) = prefix Then
            keysList.Add(hexString)
        End If
        If ct.IsCancellationRequested Then
            ' It is important to add cleanup code here
            ct.ThrowIfCancellationRequested()
        End If
    Next
    ' Display the elapsed time before returning; code after Return never runs
    Console.WriteLine("AES: " + sw.Elapsed.ToString())
    Return keysList
End Function

Sub Main()
    Dim sw = Stopwatch.StartNew()
    Dim cts As New System.Threading.CancellationTokenSource()
    Dim ct As System.Threading.CancellationToken = cts.Token

    Dim t1 = Task(Of List(Of String)).Factory.StartNew(
        Function() GenerateAESKeysWithCharPrefix(ct, "A"), ct)

    Try
        t1.Wait()
    Catch ex As AggregateException
        For Each innerEx As Exception In ex.InnerExceptions
            Debug.WriteLine(innerEx.ToString())
            ' Do something else considering the innerEx Exception
        Next
    End Try
    If t1.IsCanceled Then
        Console.WriteLine(
"The task running GenerateAESKeysWithCharPrefix was canceled.")
        Exit Sub
    End If
    If t1.IsFaulted Then
        For Each innerEx As Exception In t1.Exception.InnerExceptions
            Debug.WriteLine(innerEx.ToString())
            ' Do something else considering the innerEx Exception
        Next
        Exit Sub
    End If

    Dim t2 = Task.Factory.StartNew(Sub()
         ' Do something with the result
         ' returned by the task's delegate
         For i As Integer = 0 To t1.Result.Count - 1
             Console.WriteLine(t1.Result(i))
         Next
     End Sub, TaskCreationOptions.LongRunning)

    ' Wait for the user to press a key while t2 is displaying the results
    Console.ReadLine()
End Sub
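The same Task(Of TResult) mechanism can be tried without the AES helpers. In the following sketch, the hypothetical HexValuesWithPrefix function stands in for GenerateAESKeysWithCharPrefix: instead of random keys, it returns the hexadecimal representations of 0 to 255 that begin with the prefix, which makes the result predictable (A, then A0 through AF).

```vb
Imports System
Imports System.Collections.Generic
Imports System.Threading.Tasks

Module TaskResultDemo
    ' Hypothetical stand-in for GenerateAESKeysWithCharPrefix: returns the
    ' hexadecimal representations of 0 to count - 1 that begin with prefix
    Private Function HexValuesWithPrefix(count As Integer, prefix As Char) As List(Of String)
        Dim matches As New List(Of String)()
        For i As Integer = 0 To count - 1
            Dim hexString = i.ToString("X")
            If hexString(0) = prefix Then
                matches.Add(hexString)
            End If
        Next
        Return matches
    End Function

    Public Function GetPrefixedValues() As List(Of String)
        ' Task(Of TResult): the delegate is a Function, and its return value
        ' becomes available through the Result property
        Dim t1 = Task(Of List(Of String)).Factory.StartNew(
            Function() HexValuesWithPrefix(256, "A"c))
        ' Reading Result blocks until the task has finished
        Return t1.Result
    End Function

    Sub Main()
        For Each value In GetPrefixedValues()
            Console.WriteLine(value)
        Next
    End Sub
End Module
```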

TaskCreationOptions

The code creates and starts the second task, t2, using the StartNew method and multiline lambda expression syntax. However, in this case, it uses a different definition that receives a TaskCreationOptions parameter, which specifies flags that control optional behavior for the creation, scheduling, and execution of tasks.

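Table 19.5 describes the four members that the TaskCreationOptions enumeration had in .NET Framework 4. .NET Framework 4.5 added two more members, DenyChildAttach and HideScheduler.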

Table 19.5 Optional Behaviors for Tasks

Value Description
TaskCreationOptions.AttachedToParent The task is attached to a parent task. You can create tasks inside other tasks.
TaskCreationOptions.None The task uses the default behavior.
TaskCreationOptions.LongRunning The task will take a long time to run, so it is a coarse-grained operation. This option provides a hint to the scheduler that oversubscription may be warranted, that is, that it can create more threads than the available number of hardware threads. You can use this option if the task is likely to take many seconds to run. It is not advisable to use this option when a task takes less than one second to run.
TaskCreationOptions.PreferFairness This option tells the scheduler that tasks scheduled sooner should be run sooner and tasks scheduled later should be run later.

Note
You can combine multiple TaskCreationOptions values using the bitwise Or operator.
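As a minimal sketch (the module and function names are illustrative, not taken from the original listings), the following console module combines LongRunning and PreferFairness with Or and then reads the flags back through the task's CreationOptions property:

```vb
Imports System
Imports System.Threading.Tasks

Module CombinedOptionsDemo
    ' Starts a task with two combined flags and returns the options the task recorded
    Public Function StartWithCombinedOptions() As TaskCreationOptions
        ' Or combines the enumeration values bitwise
        Dim options As TaskCreationOptions =
            TaskCreationOptions.LongRunning Or TaskCreationOptions.PreferFairness

        Dim t = Task.Factory.StartNew(
            Sub() Console.WriteLine("Running with combined options"), options)
        t.Wait()
        Return t.CreationOptions
    End Function

    Sub Main()
        Dim opts = StartWithCombinedOptions()
        ' HasFlag tests whether an individual flag is set
        Console.WriteLine("LongRunning: {0}", opts.HasFlag(TaskCreationOptions.LongRunning))
        Console.WriteLine("PreferFairness: {0}", opts.HasFlag(TaskCreationOptions.PreferFairness))
    End Sub
End Module
```

HasFlag is one way to test a single flag; an equivalent check is (opts And TaskCreationOptions.LongRunning) = TaskCreationOptions.LongRunning.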

Chaining Two Tasks Using Continuations

The previous example shows a case of chained tasks: task t1 produces a result, and t2 needs it as input to start processing it. In these cases, instead of adding many lines that check for the successful completion of an antecedent task and then schedule a new task, TPL enables you to chain tasks using continuations.

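You can call the ContinueWith method on any task instance to create a continuation that executes when that task (the antecedent) completes. By default, the continuation runs whether the antecedent ran to completion, faulted, or was canceled; you can pass a TaskContinuationOptions value, such as OnlyOnRanToCompletion, to restrict when it runs. ContinueWith has many overloads. The simplest one receives an action, as when creating Task instances, and that action receives the antecedent task as its parameter.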

The following lines show a simplified version of the code used in the previous example to display the results generated by t1 (code file: Snippet05.sln):

Dim t1 = Task(Of List(Of String)).Factory.StartNew(
    Function() GenerateAESKeysWithCharPrefix(ct, "A"), ct)

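' OnlyOnRanToCompletion skips the continuation if t1 faulted or was canceled,
' so reading t.Result inside it is safe
Dim t2 = t1.ContinueWith(Sub(t)
                             ' Do something with the result returned
                             ' by the task's delegate
                             For i As Integer = 0 To t.Result.Count - 1
                                 Console.WriteLine(t.Result(i))
                             Next
                         End Sub, TaskContinuationOptions.OnlyOnRanToCompletion)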

You can chain many tasks and then wait for the last one to finish. However, because the state of each task changes asynchronously, you have to be careful when checking the status of all these operations. In addition, it is very important to consider every exception that any task in the chain could throw.
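The following sketch, which uses illustrative names and trivial integer work instead of the AES key generation, chains three tasks with ContinueWith and waits only on the last one. It assumes each step should run only when the previous one ran to completion, so it passes TaskContinuationOptions.OnlyOnRanToCompletion:

```vb
Imports System
Imports System.Threading.Tasks

Module ContinuationChainDemo
    ' Builds a three-step chain; each continuation consumes the antecedent's result
    Public Function RunChain() As Integer
        Dim first = Task.Factory.StartNew(Function() 10)

        ' Runs only if first ran to completion, so t.Result is safe to read
        Dim second = first.ContinueWith(
            Function(t As Task(Of Integer)) t.Result * 2,
            TaskContinuationOptions.OnlyOnRanToCompletion)

        Dim third = second.ContinueWith(
            Function(t As Task(Of Integer)) t.Result + 1,
            TaskContinuationOptions.OnlyOnRanToCompletion)

        ' Reading Result on the last task waits for the whole chain.
        ' If a continuation was skipped, third is canceled and Result
        ' throws an AggregateException
        Return third.Result
    End Function

    Sub Main()
        Console.WriteLine("Chain result: {0}", RunChain())
    End Sub
End Module
```

If the first task threw an exception, the continuations would be canceled instead of run, and reading third.Result would throw an AggregateException, one of the exceptions you must plan for when chaining tasks.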

Preparing the Code for Parallelism

Parallel programming applied to certain complex algorithms is not as simple as the previous examples suggest. A reliable, bug-free parallelized version can turn out to be unexpectedly more complex than its sequential counterpart, even when taking advantage of the new features offered by TPL. In fact, a complex sequential algorithm is likely to become an even more complex parallel algorithm. Therefore, TPL offers many new data structures for parallel programming that simplify many complex synchronization problems:

  • Concurrent collection classes
  • Lightweight synchronization primitives
  • Types for lazy initialization

The aforementioned data structures were designed to avoid locks wherever possible and to use fine-grained locking on their shared resources when locks are necessary. Locks can introduce many potential bugs and can significantly reduce scalability. However, they are sometimes necessary because writing lock-free code isn't always possible.

These new data structures enable you to forget about complex lock mechanisms in certain situations because they already include all the necessary lightweight synchronization under the hood. Therefore, it is a good idea to use these data structures whenever possible.

Synchronization Primitives

Furthermore, .NET Framework 4.5 offers synchronization primitives for managing and controlling the interactions between different tasks and their underlying threads, including the following operations:

  • Locking: As with relational databases, sometimes you need to ensure that only one piece of code works with a variable at a time. Unfortunately, the same problems that appear when working with concurrent access in a relational database are also present in concurrent and parallel code.
  • Signaling: Provides a waiting and signaling mechanism that simplifies communication between different tasks and their underlying threads. The previously explained cancellation token is a clear example of signaling among many tasks. The mechanisms to wait for certain tasks to complete, and continuations, are also examples of signaling implementations.
  • Lock-free atomic operations (interlocked operations): These provide atomic operations that don't require a lock, such as addition, increment, decrement, exchange, conditional exchange based on the result of a comparison, and reading a 64-bit value.
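To make the locking and interlocked categories concrete, here is a minimal sketch with illustrative names: both functions increment a shared counter from a Parallel.For loop, one protecting it with a SyncLock block and the other with Interlocked.Increment. Without either mechanism, total += 1 from many tasks could lose updates, and the final count could come out lower than expected.

```vb
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module SyncPrimitivesDemo
    Private ReadOnly counterLock As New Object()

    ' Locking: SyncLock lets only one thread at a time run the protected block
    Public Function CountWithSyncLock(iterations As Integer) As Integer
        Dim total As Integer = 0
        Parallel.For(0, iterations,
            Sub(i)
                SyncLock counterLock
                    total += 1
                End SyncLock
            End Sub)
        Return total
    End Function

    ' Interlocked: the increment is a single atomic operation, so no lock is needed
    Public Function CountWithInterlocked(iterations As Integer) As Integer
        Dim total As Integer = 0
        Parallel.For(0, iterations,
            Sub(i) Interlocked.Increment(total))
        Return total
    End Function

    Sub Main()
        Console.WriteLine("SyncLock: {0}", CountWithSyncLock(100000))
        Console.WriteLine("Interlocked: {0}", CountWithInterlocked(100000))
    End Sub
End Module
```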

Synchronization Problems

The aforementioned synchronization primitives are advanced topics that require in-depth analysis to determine the most convenient primitive for a given situation. It is important to use the right synchronization primitive to avoid the potential pitfalls explained in the following list while still keeping the code scalable.

Many techniques and new debugging tools can help you diagnose the most complex of these problems, such as the following:

  • Deadlock: At least two tasks are waiting for each other, and the wait never ends. Neither task continues with other instructions until the other releases the protection it holds over certain resources, and each is waiting for resources held by its counterpart. As no task is willing to release its protection, none of them makes any progress, and the tasks wait for each other forever. Consider the following situation: task t1 holds a protection over resource A and is waiting to gain exclusive access to resource B. At the same time, task t2 holds a protection over resource B and is waiting to gain exclusive access to resource A. Deadlocks are among the hardest bugs to diagnose.
  • Race conditions: Many tasks read from and write to the same variable without the appropriate synchronization mechanism. This is a correctness problem: erroneous parallelized code can generate wrong results under certain concurrent or parallel execution scenarios, yet generate the expected results in other runs because the race happens to finish in a harmless order. Consider the following situation: task t1 writes a value to public variable A. Then, task t2 writes another value to public variable A. When task t1 reads the value of public variable A, it holds a different value than the one t1 originally wrote to it.
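One common way to prevent the deadlock scenario described above, a technique the text does not cover, is to make every task acquire locks in the same order. The sketch below, with hypothetical resource objects, has both tasks lock resourceA before resourceB, so the circular wait between t1 and t2 cannot form. The 10-second timeout passed to Task.WaitAll is an arbitrary safety limit for the demo.

```vb
Imports System
Imports System.Threading.Tasks

Module LockOrderingDemo
    Private ReadOnly resourceA As New Object()
    Private ReadOnly resourceB As New Object()
    Private sharedValue As Integer = 0

    ' Every caller acquires A before B. Because no task ever holds B while
    ' waiting for A, two tasks cannot end up waiting for each other forever
    Private Sub UpdateBoth(amount As Integer)
        SyncLock resourceA
            SyncLock resourceB
                sharedValue += amount
            End SyncLock
        End SyncLock
    End Sub

    ' Returns True if both tasks finished within the timeout
    Public Function RunWithConsistentOrder() As Boolean
        Dim t1 = Task.Factory.StartNew(
            Sub()
                For i As Integer = 1 To 1000
                    UpdateBoth(1)
                Next
            End Sub)
        Dim t2 = Task.Factory.StartNew(
            Sub()
                For i As Integer = 1 To 1000
                    UpdateBoth(-1)
                Next
            End Sub)
        Return Task.WaitAll({t1, t2}, TimeSpan.FromSeconds(10))
    End Function

    Sub Main()
        Console.WriteLine("Completed without deadlock: {0}", RunWithConsistentOrder())
        Console.WriteLine("Final value: {0}", sharedValue)
    End Sub
End Module
```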

Understanding Concurrent Collection Features

Lists, collections, and arrays are excellent examples of data structures that need complex synchronization management when they are accessed concurrently and in parallel. If you have to write a parallel loop that adds elements in an unordered way to a shared collection, you have to add a synchronization mechanism to make that collection thread-safe. The classic lists, collections, and arrays are not thread-safe because they aren't prepared to receive concurrent instructions to add or remove elements, and creating your own thread-safe collection is a very complex job.

System.Collections.Concurrent

Luckily, TPL offers a new namespace, System.Collections.Concurrent, for dealing with thread-safety issues. As previously explained, this namespace provides access to the custom partitioners for parallelized loops. It also offers the following collections, which are prepared for concurrency:

  • BlockingCollection(Of T): Similar to the classic blocking queue data structure, in this case prepared for producer-consumer scenarios in which many tasks add and remove data. It is a wrapper of an IProducerConsumerCollection(Of T) instance, providing blocking and bounding capabilities.
  • ConcurrentBag(Of T): Offers an unordered collection of objects. It is useful when ordering doesn't matter.
  • ConcurrentDictionary(Of TKey, TValue): Similar to a classic dictionary, with key-value pairs that can be accessed concurrently.
  • ConcurrentQueue(Of T): A FIFO (First In, First Out) collection whereby many tasks can enqueue and dequeue elements concurrently.
  • ConcurrentStack(Of T): A LIFO (Last In, First Out) collection whereby many tasks can push and pop elements concurrently.
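As a minimal sketch of the producer-consumer scenario that BlockingCollection(Of T) targets (the names and the capacity of 10 are arbitrary choices for illustration), one producer task adds integers while two consumer tasks remove them through GetConsumingEnumerable:

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module ProducerConsumerDemo
    ' One producer adds 1..itemCount; two consumers remove items until the
    ' collection is marked complete and empty. Returns the sum consumed.
    Public Function ProduceAndConsume(itemCount As Integer) As Long
        ' Bounded capacity of 10: Add blocks while the collection is full
        Dim buffer As New BlockingCollection(Of Integer)(10)
        Dim consumedTotal As Long = 0

        Dim producer = Task.Factory.StartNew(
            Sub()
                For i As Integer = 1 To itemCount
                    buffer.Add(i)
                Next
                ' Tells the consumers that no more items will arrive
                buffer.CompleteAdding()
            End Sub)

        Dim consumerWork As Action =
            Sub()
                ' Blocks while empty; ends after CompleteAdding and the last item
                For Each item As Integer In buffer.GetConsumingEnumerable()
                    Interlocked.Add(consumedTotal, CLng(item))
                Next
            End Sub
        Dim consumer1 = Task.Factory.StartNew(consumerWork)
        Dim consumer2 = Task.Factory.StartNew(consumerWork)

        Task.WaitAll(producer, consumer1, consumer2)
        buffer.Dispose()
        Return consumedTotal
    End Function

    Sub Main()
        Console.WriteLine("Sum consumed: {0}", ProduceAndConsume(1000))
    End Sub
End Module
```

Each item is handed to exactly one consumer, which is why the consumed sum equals the sum of the produced items.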

Note
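You don't have to worry about locks and synchronization primitives when calling the methods of the aforementioned collections from many tasks, because each of these methods is already prepared to receive concurrent and parallel calls. This prevents race conditions inside the collections and makes it easier to work with parallelized code in many advanced scenarios. However, a sequence of separate calls, such as checking Count and then dequeuing an element, is not atomic as a whole, so prefer the Try methods (TryDequeue, TryPop, and so on), which check and remove an element in a single operation.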

ConcurrentQueue

It would be difficult to use a classic shared list to add elements from the many independent tasks created by the Parallel.ForEach method. You would need to add synchronization code, which would be a great challenge without restricting the overall scalability. However, you can add strings to (enqueue strings in) a shared ConcurrentQueue inside the parallelized code, because it is prepared for adding elements concurrently.

The following code (code file: Listing18.sln) uses a shared ConcurrentQueue(Of String), Keys, to hold the strings that contain the AES keys that begin with a certain prefix, generated in a parallelized loop with the custom partitioner. All the tasks created automatically by Parallel.ForEach call the Enqueue method to add the elements that comply with the condition.

Keys.Enqueue(hexString)

Working with a ConcurrentQueue is indeed simple. There is no need to worry about synchronization problems, because Enqueue is thread-safe and handles them under the hood.

Private Keys As ConcurrentQueue(Of String)

Private Sub ParallelPartitionGenerateAESKeysWCP(
    ByVal ct As System.Threading.CancellationToken,
    ByVal prefix As Char)
    ct.ThrowIfCancellationRequested()
    Dim sw = Stopwatch.StartNew()
    Dim parallelOptions As New ParallelOptions()
    ' Set the CancellationToken for the ParallelOptions instance
    parallelOptions.CancellationToken = ct
    ' Partitioner.Create splits 1..NUM_AES_KEYS into ranges; each task processes whole ranges
    Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), parallelOptions,
        Sub(range)
            ' AesManaged is IDisposable, so release it when the range is done
            Using aesM As New AesManaged()
                'Debug.WriteLine("Range ({0}, {1}). Time: {2}",
                '                range.Item1, range.Item2, Now().TimeOfDay)
                For i As Integer = range.Item1 To range.Item2 - 1
                    aesM.GenerateKey()
                    Dim result = aesM.Key
                    Dim hexString = ConvertToHexString(result)
                    ' Console.WriteLine("AES: " + ConvertToHexString(result))
                    If Left(hexString, 1) = prefix Then
                        ' Enqueue is thread-safe, so no lock is needed
                        Keys.Enqueue(hexString)
                    End If
                    parallelOptions.CancellationToken.ThrowIfCancellationRequested()
                Next
            End Using
        End Sub)
    Console.WriteLine("AES: " + sw.Elapsed.ToString())
End Sub

Sub Main()
    Dim cts As New System.Threading.CancellationTokenSource()
    Dim ct As System.Threading.CancellationToken = cts.Token
    Keys = New ConcurrentQueue(Of String)

    Dim tAsync = New Task(Sub() ParallelPartitionGenerateAESKeysWCP(ct, "A"c))
    tAsync.Start()

    ' Do something else
    ' Wait for tAsync to finish
    tAsync.Wait()

    Console.ReadLine()
End Sub

For example, it is possible to run many LINQ queries to display partial statistics while running the task that is adding elements to the ConcurrentQueue (Keys). The following code (code file: Listing19.sln) shows a new Main subroutine that checks whether the task (tAsync) is running or waiting to run, and while this happens it runs a LINQ query to show the number of keys that contain an F in the shared ConcurrentQueue (Keys).

Sub Main()
    Dim cts As New System.Threading.CancellationTokenSource()
    Dim ct As System.Threading.CancellationToken = cts.Token

    Keys = New ConcurrentQueue(Of String)
    Dim tAsync = Task.Factory.StartNew(
        Sub() ParallelPartitionGenerateAESKeysWCP(ct, "A"c))

    Do While (tAsync.Status = TaskStatus.Running) OrElse
             (tAsync.Status = TaskStatus.WaitingToRun)
        ' Display partial results; enumerating a ConcurrentQueue works on a
        ' snapshot, so it is safe while other tasks keep enqueuing
        Dim countQuery = Aggregate key In Keys
                         Where key.Contains("F")
                         Into Count()

        Console.WriteLine("So far, the number of keys that contain an F is: {0}", countQuery)
        ' Sleep the main thread for 0.5 seconds
        Threading.Thread.Sleep(500)
    Loop

    tAsync.Wait()

    ' Do something else

    Console.ReadLine()
End Sub

Another useful feature is the capability to remove an element at the beginning of the queue in a safe way using its TryDequeue method:

Dim firstKey As String
If Keys.TryDequeue(firstKey) Then
    ' firstKey has the first key added to the ConcurrentQueue
Else
    ' It wasn't possible to remove an element from the ConcurrentQueue
End If

TryDequeue returns a Boolean value indicating whether the operation was successful. It returns the element through an output parameter, in this case a String passed by reference (firstKey).

Because these methods are thread-safe, you can add elements in some tasks and remove them in others.
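The following sketch, which uses integers instead of AES keys and illustrative names, fills a ConcurrentQueue and then drains it from four tasks with TryDequeue. Because each successful TryDequeue removes a different element, the sum of the removed elements matches the sum of the enqueued ones:

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module QueueDrainDemo
    ' Enqueues 1..count, then removes every element from four concurrent tasks
    Public Function DrainInParallel(count As Integer) As Long
        Dim queue As New ConcurrentQueue(Of Integer)()
        For i As Integer = 1 To count
            queue.Enqueue(i)
        Next

        Dim total As Long = 0
        Dim workers(3) As Task
        For w As Integer = 0 To workers.Length - 1
            workers(w) = Task.Factory.StartNew(
                Sub()
                    Dim item As Integer
                    ' TryDequeue returns False once the queue is empty
                    While queue.TryDequeue(item)
                        Interlocked.Add(total, CLng(item))
                    End While
                End Sub)
        Next
        Task.WaitAll(workers)
        Return total
    End Function

    Sub Main()
        Console.WriteLine("Sum of dequeued elements: {0}", DrainInParallel(1000))
    End Sub
End Module
```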

ConcurrentStack

ConcurrentStack is very similar to the previously explained ConcurrentQueue, but it uses different method names to better represent a stack (a LIFO collection). Its most important methods are Push and TryPop.

Push inserts an element at the top of the ConcurrentStack. If Keys were a ConcurrentStack(Of String), the following lines would add hexString at the top of the stack:

If Left(hexString, 1) = prefix Then
    Keys.Push(hexString)
End If

You can remove an element at the top of the stack in a safe way using its TryPop method. However, in this case, the method will return the last element added because it is a stack and not a queue:

Dim lastKey As String = Nothing
If Keys.TryPop(lastKey) Then
    ' lastKey has the last key added to the ConcurrentStack
Else
    ' It wasn't possible to remove an element from the ConcurrentStack
End If

TryPop also returns a Boolean value indicating whether the operation was successful.
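A minimal sketch of the LIFO behavior, with hypothetical key strings: pushing three elements and then calling TryPop until it returns False yields them in reverse order of insertion.

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic

Module StackOrderDemo
    ' Pushes three keys and pops them back; TryPop returns them in LIFO order
    Public Function PopAll() As List(Of String)
        Dim stack As New ConcurrentStack(Of String)()
        stack.Push("key1")
        stack.Push("key2")
        stack.Push("key3")

        Dim popped As New List(Of String)()
        Dim key As String = Nothing
        ' TryPop returns False once the stack is empty
        While stack.TryPop(key)
            popped.Add(key)
        End While
        Return popped
    End Function

    Sub Main()
        ' Prints key3, key2, key1 (one per line)
        For Each k As String In PopAll()
            Console.WriteLine(k)
        Next
    End Sub
End Module
```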

Transforming LINQ into PLINQ

You already learned that LINQ is very useful to query and process different data sources. If you are using LINQ to Objects, it is possible to take advantage of parallelism using its parallel implementation, Parallel LINQ (PLINQ).


Note
PLINQ implements the full set of LINQ query operators and adds operators for parallel execution. PLINQ can achieve significant speedups over its LINQ counterpart but, as always with parallelism, it depends on the scenario. If the query involves an appreciable number of calculations and memory-intensive operations and ordering doesn't matter, the speedups could be significant. However, when ordering matters, the speedups could be reduced.

As you might have expected, LINQ and PLINQ can work with the previously explained concurrent collections. The following code defines a simple but intensive function to count and return the number of letters in a string received as a parameter (code file: Snippet06.sln):

Function CountLetters(ByVal key As String) As Integer
    Dim letters As Integer = 0
    For i As Integer = 0 To key.Length() - 1
        If Char.IsLetter(key, i) Then letters += 1
    Next
    Return letters
End Function

A simple LINQ expression to return all the AES keys with at least 10 letters containing an A, an F, a 9, and not a B, would look like the following:

Dim keysWith10Letters = From key In Keys
                        Where CountLetters(key) >= 10 AndAlso
                              key.Contains("A") AndAlso
                              key.Contains("F") AndAlso
                              key.Contains("9") AndAlso
                              Not key.Contains("B")

In order to transform the aforementioned LINQ expression into a PLINQ expression that can take advantage of parallelism, it is necessary to use the AsParallel method, as shown here:

Dim keysWith10Letters = From key In Keys.AsParallel()
                        Where CountLetters(key) >= 10 AndAlso
                              key.Contains("A") AndAlso
                              key.Contains("F") AndAlso
                              key.Contains("9") AndAlso
                              Not key.Contains("B")

This way, the query will try to take advantage of all the available logical cores at run time in order to run faster than its sequential version.
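Because Keys is filled by the earlier listing, the following self-contained sketch substitutes hypothetical data (eight-digit hexadecimal strings generated from integers) and a simpler filter, then runs the same query with and without AsParallel. Both versions should report the same count; only the execution strategy differs:

```vb
Imports System
Imports System.Linq

Module PlinqComparisonDemo
    ' Same letter-counting logic as CountLetters in the text
    Public Function CountLetters(ByVal key As String) As Integer
        Dim letters As Integer = 0
        For i As Integer = 0 To key.Length - 1
            If Char.IsLetter(key, i) Then letters += 1
        Next
        Return letters
    End Function

    ' Returns the counts produced by the LINQ and PLINQ versions of one query
    Public Function CompareCounts() As Tuple(Of Integer, Integer)
        ' Hypothetical input: uppercase hexadecimal strings instead of real AES keys
        Dim sampleKeys = Enumerable.Range(0, 200000).Select(Function(i) i.ToString("X8")).ToList()

        Dim sequentialQuery = From key In sampleKeys
                              Where CountLetters(key) >= 2 AndAlso
                                    key.Contains("A")

        ' AsParallel is the only change
        Dim parallelQuery = From key In sampleKeys.AsParallel()
                            Where CountLetters(key) >= 2 AndAlso
                                  key.Contains("A")

        Return Tuple.Create(sequentialQuery.Count(), parallelQuery.Count())
    End Function

    Sub Main()
        Dim counts = CompareCounts()
        Console.WriteLine("LINQ: {0}, PLINQ: {1}", counts.Item1, counts.Item2)
    End Sub
End Module
```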

It is possible to add code at the end of the Main subroutine to return some results according to the PLINQ query (code file: Snippet06.sln):

Dim sw = Stopwatch.StartNew()

Dim keysWith10Letters = From key In Keys.AsParallel()
                        Where CountLetters(key) >= 10 AndAlso
                              key.Contains("A") AndAlso
                              key.Contains("F") AndAlso
                              key.Contains("9") AndAlso
                              Not key.Contains("B")

' Run the query once and keep the results; calling Count() and indexing
' the query directly would execute it again each time
Dim results = keysWith10Letters.ToList()

Console.WriteLine("The code generated {0} keys with at least ten letters, " &
                  "A, F and 9 but no B in the hexadecimal code.", results.Count)
If results.Count > 0 Then
    Console.WriteLine("First key: {0}", results(0))
    Console.WriteLine("Last key: {0}", results(results.Count - 1))
End If
Debug.WriteLine(sw.Elapsed.ToString())

Console.ReadLine()

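This code shows the number of keys that comply with the conditions, and the first and last keys in the results of the PLINQ query that ran against the ConcurrentQueue(Of String). Because the query doesn't request ordered results, which keys appear first and last can vary from one run to the next.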

ParallelEnumerable and Its AsParallel Method

The System.Linq.ParallelEnumerable class is responsible for exposing most of PLINQ's additional functionality, including its most important one: the AsParallel method. Table 19.6 summarizes the PLINQ-specific methods.

Table 19.6 PLINQ Operators Exposed by ParallelEnumerable

Value Description
AsOrdered() PLINQ must preserve the ordering of the source sequence for the rest of the query or until it changes using an Order By clause.
AsParallel() The rest of the query should be parallelized, whenever possible.
AsSequential() The rest of the query should run sequentially, as traditional LINQ.
AsUnordered() PLINQ doesn't have to preserve the ordering of the source sequence.
ForAll() An enumeration method that enables the results to be processed in parallel, using multiple tasks.
WithCancellation Enables working with a cancellation token to permit cancellation of the query execution as previously explained with tasks.
WithDegreeOfParallelism PLINQ will be optimized as if the total number of available cores were equal to the degree of parallelism specified as a parameter for this method.
WithExecutionMode This can force parallel execution when the default behavior would be to run it sequentially as traditional LINQ.
WithMergeOptions This can provide hints about the way PLINQ should merge the parallel pieces of the result on the thread that is consuming the query.
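As an illustrative sketch of WithCancellation (module and function names are hypothetical), the following code cancels a CancellationTokenSource and then runs a PLINQ query that uses its token. When the token passed to WithCancellation is canceled, PLINQ reports it with an OperationCanceledException. In a real application the cancellation would typically arrive from another thread while the query runs; canceling up front only keeps the example deterministic.

```vb
Imports System
Imports System.Linq
Imports System.Threading

Module PlinqCancellationDemo
    ' Returns True if the query observed the cancellation request
    Public Function RunCanceledQuery() As Boolean
        Using cts As New CancellationTokenSource()
            Dim source = Enumerable.Range(1, 1000000).AsParallel()
            Dim query = From n In source.WithCancellation(cts.Token)
                        Where n Mod 3 = 0
                        Select CLng(n) * 2

            ' Request cancellation before the query executes
            cts.Cancel()
            Try
                Dim itemCount = query.Count()
                Return False
            Catch ex As OperationCanceledException
                Return True
            End Try
        End Using
    End Function

    Sub Main()
        Console.WriteLine("Query canceled: {0}", RunCanceledQuery())
    End Sub
End Module
```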

In addition, ParallelEnumerable offers an Aggregate overload that enables the implementation of parallel reduction algorithms. It enables intermediate aggregation on each parallelized part of the query and a final aggregation function that is capable of providing the logic to combine the results of all the generated partitions.
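A minimal sketch of such a parallel reduction computes the sum of the squares from 1 to n (the module and function names are illustrative). The seed factory creates one accumulator per partition, the update function adds each element's square to its partition's accumulator, the combine function merges the partial sums, and the result selector returns the final total. The delegates are declared with explicit Func types so that the compiler picks the seed-factory overload of Aggregate.

```vb
Imports System
Imports System.Linq

Module ParallelAggregateDemo
    Public Function SumOfSquares(n As Integer) As Long
        ' Creates a fresh accumulator for each partition
        Dim seedFactory As Func(Of Long) = Function() 0L
        ' Intermediate aggregation inside one partition
        Dim update As Func(Of Long, Integer, Long) = Function(acc, x) acc + CLng(x) * x
        ' Final aggregation: merges two partial results
        Dim combine As Func(Of Long, Long, Long) = Function(a, b) a + b
        ' Converts the combined accumulator into the query result
        Dim finish As Func(Of Long, Long) = Function(total) total

        Return Enumerable.Range(1, n).AsParallel().Aggregate(seedFactory, update, combine, finish)
    End Function

    Sub Main()
        Console.WriteLine("Sum of squares 1..1000: {0}", SumOfSquares(1000))
    End Sub
End Module
```

The result matches the closed form n(n + 1)(2n + 1) / 6, which is 333,833,500 for n = 1000.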

Sometimes it is useful to run a PLINQ query with different degrees of parallelism to measure its scalability. For example, the following lines run the previously shown PLINQ query using no more than three cores:

Dim keysWith10Letters = From key In Keys.AsParallel().WithDegreeOfParallelism(3)
                        Where CountLetters(key) >= 10 AndAlso
                              key.Contains("A") AndAlso
                              key.Contains("F") AndAlso
                              key.Contains("9") AndAlso
                              Not key.Contains("B")

AsOrdered and Order By

Because using AsOrdered and the Order By clause in PLINQ queries can reduce any speed gains, it is very important to compare the speedup achieved against the sequential version before requesting ordered results.
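To see what AsOrdered changes, the following sketch (illustrative names and data) squares the even numbers from 1 to 10 in a PLINQ query. With AsOrdered, the results come back in source order; without it, PLINQ is free to return them in any order.

```vb
Imports System
Imports System.Linq

Module AsOrderedDemo
    ' AsOrdered makes PLINQ preserve the source order in the results
    Public Function EvenSquaresInOrder() As Integer()
        Dim query = From n In Enumerable.Range(1, 10).AsParallel().AsOrdered()
                    Where n Mod 2 = 0
                    Select n * n
        Return query.ToArray()
    End Function

    Sub Main()
        ' Prints 4, 16, 36, 64, 100 (one per line)
        For Each value As Integer In EvenSquaresInOrder()
            Console.WriteLine(value)
        Next
    End Sub
End Module
```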

If a PLINQ query doesn't achieve significant performance improvements, you have another interesting option to take advantage of parallelism: running many LINQ queries in independent tasks or using Parallel.Invoke.
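The following sketch, with illustrative names and data, shows the Parallel.Invoke option: two independent LINQ queries run concurrently, and each lambda writes to its own local variable, so no additional synchronization is needed. Parallel.Invoke returns only after both actions complete.

```vb
Imports System
Imports System.Linq
Imports System.Threading.Tasks

Module ParallelInvokeQueriesDemo
    ' Runs two sequential LINQ queries at the same time
    Public Function RunQueries() As Tuple(Of Integer, Integer)
        Dim numbers = Enumerable.Range(1, 100000).ToList()
        Dim evenCount As Integer = 0
        Dim multiplesOf7 As Integer = 0

        ' Each action only reads numbers and writes its own variable
        Parallel.Invoke(
            Sub() evenCount = (From n In numbers Where n Mod 2 = 0).Count(),
            Sub() multiplesOf7 = (From n In numbers Where n Mod 7 = 0).Count())

        Return Tuple.Create(evenCount, multiplesOf7)
    End Function

    Sub Main()
        Dim counts = RunQueries()
        Console.WriteLine("Even: {0}, multiples of 7: {1}", counts.Item1, counts.Item2)
    End Sub
End Module
```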

Working with ForAll and a ConcurrentBag

The ForAll extension method is very useful to process the results of a query in parallel without having to write a parallel loop. It receives an Action(Of T) delegate that it invokes for each element in the results, so, as with the Task constructors, you can pass it a lambda expression. This lets you combine parallelized processing actions with the results of a PLINQ query. The following lines add elements in parallel to a new ConcurrentBag (keysBag), an unordered collection of Integer, counting the letters for each of the keys in the results of the previous PLINQ query (code file: Snippet07.sln):

Dim keysWith10Letters = From key In Keys.AsParallel()
                        Where CountLetters(key) >= 10 AndAlso
                              key.Contains("A") AndAlso
                              key.Contains("F") AndAlso
                              key.Contains("9") AndAlso
                              Not key.Contains("B")

Dim keysBag As New ConcurrentBag(Of Integer)
keysWith10Letters.ForAll(Sub(i) keysBag.Add(CountLetters(i)))

Note
This parallel processing is possible because ConcurrentBag is one of the concurrent collections that allows many elements to be added by multiple tasks running in parallel.
