23
Multithreaded Programming with C++

INTRODUCTION

Multithreaded programming allows you to perform multiple calculations in parallel. As a result, you can take advantage of the multiple processor units inside virtually all systems today. Two decades ago, the processor market was racing for the highest frequency, which is perfect for single-threaded applications. Around 2005, this race stopped due to a combination of power and heat management problems. Since then, the processor market has been racing toward the most cores on a single processor chip. Dual- and quad-core processors are common, and processors with 12, 16, 18, or even more cores are available.

Similarly, if you look at the processors on graphics cards, called GPUs, you’ll see that they are massively parallel processors. Today, high-end graphics cards have more than 4,000 cores, a number that will increase rapidly! These graphics cards are used not only for gaming, but also to perform computationally intensive tasks. Examples are image and video manipulation, protein folding (useful for discovering new drugs), processing signals as part of the SETI (Search for Extraterrestrial Intelligence) project, and so on.

C++98/03 did not have support for multithreaded programming, and you had to resort to third-party libraries or to the multithreading APIs of your target operating system. Because C++11 included a standard multithreading library, it became easier to write cross-platform multithreaded applications. The current C++ standard targets only CPUs and not GPUs. This might change in the future.

There are two reasons to start writing multithreaded code. First, if you have a computational problem and you manage to separate it into small pieces that can be run in parallel independently from each other, you can expect a huge performance boost when running it on multiple processor units. Second, you can modularize computations along orthogonal axes. For example, you can do long computations in a thread instead of blocking the UI thread, so the user interface remains responsive while a long computation occurs in the background.

Figure 23-1 shows a situation that is perfectly suited for running in parallel. An example could be the processing of pixels of an image by an algorithm that does not require information about neighboring pixels. The algorithm could split the image into four parts. On a single-core processor, each part would be processed sequentially; on a dual-core processor, two parts would be processed in parallel; and on a quad-core processor, four parts would be processed in parallel, resulting in an almost linear scaling of the performance with the number of cores.

FIGURE 23-1: Illustration of a situation that is suited for running in parallel.

Of course, it’s not always possible to split a problem into parts that can be executed independently of each other in parallel. However, a problem can often be parallelized at least partially, still resulting in a performance increase. A difficult part of multithreaded programming is parallelizing your algorithm, and how to do that depends highly on the type of the algorithm. Other difficulties are preventing race conditions, deadlocks, tearing, and false sharing. These are discussed in the following sections and are usually solved using atomics or explicit synchronization mechanisms, as discussed later in this chapter.

Race Conditions

Race conditions can occur when multiple threads want to access any kind of shared resource. Race conditions in the context of shared memory are called data races. A data race can occur when multiple threads access shared memory and at least one of them writes to it. For example, suppose you have a shared variable and one thread increments its value while another thread decrements it. Incrementing and decrementing the value means that the current value needs to be retrieved from memory, incremented or decremented, and stored back in memory. On old architectures, such as the PDP-11 and VAX, this used to be implemented with an atomic INC processor instruction. On modern x86 processors, the INC instruction is not atomic anymore, meaning that other instructions can be executed in the middle of this operation, which might cause the code to retrieve an incorrect value.

The following table shows the result when the increment is finished before the decrement starts, and assumes that the initial value is 1.

THREAD 1 (INCREMENT)          THREAD 2 (DECREMENT)
load value (value = 1)
increment value (value = 2)
store value (value = 2)
                              load value (value = 2)
                              decrement value (value = 1)
                              store value (value = 1)

The final value stored in memory is 1. When the decrement thread is finished before the increment thread starts, the final value is also 1, as shown in the following table.

THREAD 1 (INCREMENT)          THREAD 2 (DECREMENT)
                              load value (value = 1)
                              decrement value (value = 0)
                              store value (value = 0)
load value (value = 0)
increment value (value = 1)
store value (value = 1)

However, when the instructions are interleaved, the result is different, as shown in the following table.

THREAD 1 (INCREMENT)          THREAD 2 (DECREMENT)
load value (value = 1)
increment value (value = 2)
                              load value (value = 1)
                              decrement value (value = 0)
store value (value = 2)
                              store value (value = 0)

The final result in this case is 0. In other words, the effect of the increment operation is lost. This is a data race.

Tearing

Tearing is a specific case or consequence of a data race. There are two kinds of tearing: torn reads and torn writes. If a thread has written part of its data to memory while another part hasn’t been written yet, any other thread reading that data at that exact moment sees inconsistent data: a torn read. If two threads write to the same data at the same time, one thread might have written part of the data while the other thread has written another part. The final result is inconsistent: a torn write.

Deadlocks

If you opt to solve a race condition by using a synchronization method, such as mutual exclusion, you might run into another common problem with multithreaded programming: deadlocks. Two threads are deadlocked if they are both waiting for the other thread to do something; this can be extended to more than two threads. For example, if two threads want to acquire access to a shared resource, they need to ask for permission to access this resource. If one of the threads currently holds the permission to access the resource, but is blocked indefinitely for some other reason, then the other thread blocks indefinitely as well when trying to acquire permission for the same resource. One mechanism for acquiring permission for a shared resource is a mutual exclusion object, discussed in detail later in this chapter. For example, suppose you have two threads and two resources protected with two mutual exclusion objects, A and B. Both threads acquire permission for both resources, but they acquire the permissions in a different order. The following table shows this situation in pseudo-code.

THREAD 1      THREAD 2
Acquire A     Acquire B
Acquire B     Acquire A
// compute    // compute
Release B     Release A
Release A     Release B

Now, imagine that the code in the two threads is executed in the following order.

  • Thread 1: Acquire A
  • Thread 2: Acquire B
  • Thread 1: Acquire B (waits/blocks, because B is held by thread 2)
  • Thread 2: Acquire A (waits/blocks, because A is held by thread 1)

Both threads are now waiting indefinitely in a deadlock situation. Figure 23-2 shows a graphical representation of the deadlock. Thread 1 has acquired permission for resource A and is waiting to acquire permission for resource B. Thread 2 has acquired permission for resource B and is waiting to acquire permission for resource A. In this graphical representation, you see a cycle that depicts the deadlock. Both threads will wait indefinitely.

FIGURE 23-2: Illustration of a deadlock situation.

It’s best to always acquire permissions in the same order to avoid these kinds of deadlocks. You can also include mechanisms in your program to break deadlocks. One possible solution is to try to acquire permission for a resource for a certain amount of time. If the permission cannot be obtained within that time interval, the thread stops waiting and possibly releases other permissions it currently holds. The thread might then sleep for a little while and try again later to acquire all the resources it needs. This method gives other threads the opportunity to acquire the necessary permissions and continue their execution. Whether this method works depends heavily on your specific deadlock case.

Instead of using a workaround as described in the previous paragraph, you should try to avoid any possible deadlock situation altogether. If you need to acquire permission to multiple resources protected with several mutual exclusion objects, instead of acquiring permission for each resource individually, the recommended way is to use the standard std::lock() or std::try_lock() functions described later in the section “Mutual Exclusion.” These functions obtain or try to obtain permission for several resources with one call.

False-Sharing

Most caches work with so-called cache lines. For modern CPUs, cache lines are usually 64 bytes. If something needs to be written to a cache line, the entire line needs to be locked. This can carry a serious performance penalty for multithreaded code if your data structures are not properly designed. For example, if two threads use two different pieces of data, but that data shares a cache line, then whenever one thread writes something, the other thread is blocked, because the entire cache line is locked. You can optimize your data structures by using explicit memory alignment to make sure data that is worked on by multiple threads does not share any cache lines. To do this in a portable manner, C++17 introduces a constant called hardware_destructive_interference_size, defined in <new>, which gives you the minimum recommended offset between two concurrently accessed objects to avoid cache line sharing. You can use that value in combination with the alignas keyword to properly align your data.
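
For example, the following sketch keeps two counters that are updated by two different threads on separate cache lines. It assumes a compiler and standard library that actually implement hardware_destructive_interference_size; the PaddedCounter name is purely illustrative:

#include <new>  // For std::hardware_destructive_interference_size

// Each PaddedCounter starts on its own cache line, so a thread writing to
// counters[0] does not invalidate the cache line holding counters[1].
struct alignas(std::hardware_destructive_interference_size) PaddedCounter
{
    int value = 0;
};

PaddedCounter counters[2];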

THREADS

The C++ threading library, defined in the <thread> header file, makes it very easy to launch new threads. You can specify what needs to be executed in the new thread in several ways. You can let the new thread execute a global function, the operator() of a function object, a lambda expression, or even a member function of an instance of some class. The following sections give small examples of all these methods.

Thread with Function Pointer

Functions such as CreateThread(), _beginthread(), and so on, on Windows, and pthread_create() with the pthreads library, require that the thread function has only one parameter. On the other hand, a function that you want to use with the standard C++ std::thread class can have as many parameters as you want.

Suppose you have a counter() function accepting two integers: the first representing an ID and the second representing the number of iterations that the function should loop. The body of the function is a single loop that loops the given number of iterations. On each iteration, a message is printed to standard output:

void counter(int id, int numIterations)
{
    for (int i = 0; i < numIterations; ++i) {
        cout << "Counter " << id << " has value " << i << endl;
    }
}

You can launch multiple threads executing this function using std::thread. You can create a thread t1, executing counter() with arguments 1 and 6 as follows:

thread t1(counter, 1, 6);

The constructor of the thread class is a variadic template, which means that it accepts any number of arguments. Variadic templates are discussed in detail in Chapter 22. The first argument is the name of the function to execute in the new thread. The subsequent variable number of arguments are passed to this function when execution of the thread starts.

A thread object is said to be joinable if it represents or represented an active thread in the system. Even when the thread has finished executing, the thread object remains joinable. A default constructed thread object is unjoinable. Before a joinable thread object is destroyed, you need to make sure to call either join() or detach() on it. A call to join() is a blocking call; it waits until the thread has finished its work. A call to detach() detaches the thread object from its underlying OS thread, in which case the OS thread keeps running independently. Both methods cause the thread object to become unjoinable. If a thread object that is still joinable is destroyed, the destructor calls std::terminate(), which abruptly terminates all threads and the application itself.

The following code launches two threads executing the counter() function. After launching the threads, main() calls join() on both threads.

thread t1(counter, 1, 6);
thread t2(counter, 2, 4);
t1.join();
t2.join();

A possible output of this example looks as follows:

Counter 2 has value 0
Counter 1 has value 0
Counter 1 has value 1
Counter 1 has value 2
Counter 1 has value 3
Counter 1 has value 4
Counter 1 has value 5
Counter 2 has value 1
Counter 2 has value 2
Counter 2 has value 3

The output on your system will be different and it will most likely be different every time you run it. This is because two threads are executing the counter() function at the same time, so the output depends on the number of processing cores in your machine and on the thread scheduling of the operating system.

By default, accessing cout from different threads is thread-safe and doesn’t cause any data races, unless you have called cout.sync_with_stdio(false) before the first output or input operation. However, even though there are no data races, output from different threads can still be interleaved! This means that the output of the previous example can be mixed together as follows:

Counter Counter 2 has value 0
1 has value 0
Counter 1 has value 1
Counter 1 has value 2

This can be fixed using synchronization methods, which are discussed later in this chapter.

Thread with Function Object

Instead of using function pointers, you can also use a function object to execute in a thread. With the function pointer technique of the previous section, the only way to pass information to the thread is by passing arguments to the function. With function objects, you can add member variables to your function object class, which you can initialize and use however you want. The following example first defines a class called Counter, which has two member variables: an ID and the number of iterations for the loop. Both variables are initialized with the constructor. To make the Counter class a function object, you need to implement operator(), as discussed in Chapter 18. The implementation of operator() is the same as the counter() function from the previous section. Here is the code:

class Counter
{
    public:
        Counter(int id, int numIterations)
            : mId(id), mNumIterations(numIterations)
        {
        }

        void operator()() const
        {
            for (int i = 0; i < mNumIterations; ++i) {
                cout << "Counter " << mId << " has value " << i << endl;
            }
        }
    private:
        int mId;
        int mNumIterations;
};

Three methods for initializing threads with a function object are demonstrated in the following code snippet. The first method uses the uniform initialization syntax. You create an instance of Counter with its constructor arguments and give it to the thread constructor between curly braces.

The second method defines a named instance of Counter and gives this named instance to the constructor of the thread class.

The third method looks similar to the first one; it creates an instance of Counter and gives it to the constructor of the thread class, but uses parentheses instead of curly braces. The ramifications of this are discussed after the code.

// Using uniform initialization syntax
thread t1{ Counter{ 1, 20 }};

// Using named variable
Counter c(2, 12);
thread t2(c);

// Using temporary
thread t3(Counter(3, 10));

// Wait for threads to finish
t1.join();
t2.join();
t3.join();

If you compare the creation of t1 with the creation of t3, the only difference seems to be that the first method uses curly braces while the third method uses parentheses. However, when your function object constructor doesn’t require any parameters, the third method as written earlier does not work. Here is an example:

class Counter
{
    public:
        Counter() {}
        void operator()() const { /* Omitted for brevity */ }
};

int main()
{
    thread t1(Counter());    // Error!
    t1.join();
}

This results in a compilation error because C++ interprets the first line in main() as a declaration of a function called t1, which returns a thread object and accepts a pointer to a function without parameters returning a Counter object. For this reason, it’s recommended to use the uniform initialization syntax:

thread t1{ Counter{} };  // OK

Thread with Lambda

Lambda expressions fit nicely with the standard C++ threading library. Here is an example that launches a thread to execute a given lambda expression:

int main()
{
    int id = 1;
    int numIterations = 5;
    thread t1([id, numIterations] {
        for (int i = 0; i < numIterations; ++i) {
            cout << "Counter " << id << " has value " << i << endl;
        }
    });
    t1.join();
}

Thread with Member Function

You can specify a member function of a class to be executed in a thread. The following example defines a basic Request class with a process() method. The main() function creates an instance of the Request class, and launches a new thread, which executes the process() method of the Request instance req.

class Request
{
    public:
        Request(int id) : mId(id) { }

        void process()
        {
            cout << "Processing request " << mId << endl;
        }
    private:
        int mId;
};

int main()
{
    Request req(100);
    thread t{ &Request::process, &req };
    t.join();
}

With this technique, you are executing a method on a specific object in a separate thread. If other threads are accessing the same object, you need to make sure this happens in a thread-safe way to avoid data races. Mutual exclusion, discussed later in this chapter, can be used as a synchronization mechanism to make it thread-safe.

Thread Local Storage

The C++ standard supports the concept of thread local storage. With a keyword called thread_local, you can mark any variable as thread local, which means that each thread will have its own unique copy of the variable and it will last for the entire duration of the thread. For each thread, the variable is initialized exactly once. For example, the following code defines two global variables. Every thread shares one—and only one—copy of k, while each thread has its own unique copy of n:

int k;
thread_local int n;

Note that if a thread_local variable is declared in the scope of a function, it behaves as if it were declared static, except that every thread has its own unique copy that is initialized exactly once per thread, no matter how many times that function is called in that thread.
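
For example, the following sketch (the function name is illustrative) keeps a per-thread call counter. Each thread gets its own copy of numCalls, initialized to 0 the first time that thread calls the function:

int countCallsInThisThread()
{
    // Behaves like a static, but there is one instance per thread.
    thread_local int numCalls = 0;
    return ++numCalls;
}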

Cancelling Threads

The C++ standard does not include any mechanism for cancelling a running thread from another thread. The best way to achieve this is to provide some communication mechanism that the two threads agree upon. The simplest mechanism is to have a shared variable, which the target thread checks periodically to determine if it should terminate. Other threads can set this shared variable to indirectly instruct the thread to shut down. You have to be careful here, because this shared variable is being accessed by multiple threads, of which at least one is writing to the shared variable. It’s recommended to use atomic variables or condition variables, both discussed later in this chapter.
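
A minimal sketch of this technique, using an atomic<bool> as the shared variable (atomics are discussed later in this chapter; all names are illustrative):

atomic<bool> shouldStop(false);  // Shared between threads.

void workerFunction()
{
    while (!shouldStop) {
        // ... Do a small chunk of work, then check the flag again.
    }
    // Perform any cleanup and return, which ends the thread.
}

int main()
{
    thread worker{ workerFunction };
    this_thread::sleep_for(100ms);
    shouldStop = true;  // Ask the worker to shut down.
    worker.join();      // Wait until it actually has.
}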

Retrieving Results from Threads

As you saw in the previous examples, launching a new thread is pretty easy. However, in most cases you are probably interested in results produced by the thread. For example, if your thread performs some mathematical calculations, you really would like to get the results out of the thread once the thread is finished. One way is to pass a pointer or reference to a result variable to the thread in which the thread stores the results. Another method is to store the results inside class member variables of a function object, which you can retrieve later once the thread has finished executing. This only works if you use std::ref() to pass your function object by reference to the thread constructor.
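
For example, the first method could look as follows; a minimal sketch with an illustrative function name. Note that std::ref() is required here as well; without it, the thread would write to a copy of the variable:

void calculateSum(int a, int b, int& result)
{
    result = a + b;
}

int main()
{
    int result = 0;
    thread t{ calculateSum, 1, 2, ref(result) };
    t.join();
    // Safe to read result here, because join() guarantees the thread is done.
    cout << "Sum = " << result << endl;
}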

However, there is another easier method to obtain a result from threads: futures. Futures also make it easier to handle errors that occur inside your threads. They are discussed later in this chapter.

Copying and Rethrowing Exceptions

The whole exception mechanism in C++ works perfectly fine, as long as it stays within one single thread. Every thread can throw its own exceptions, but they need to be caught within their own thread. If a thread throws an exception and it is not caught inside the thread, the C++ runtime calls std::terminate(), which terminates the whole application. Exceptions thrown in one thread cannot be caught in another thread. This introduces quite a few problems when you would like to use exception handling in combination with multithreaded programming.

Without the standard threading library, it’s very difficult, if not impossible, to gracefully handle exceptions across threads. The standard threading library solves this issue with the following exception-related functions. These functions work not only with exceptions derived from std::exception, but also with other kinds of exceptions: ints, strings, custom exceptions, and so on:

  • exception_ptr current_exception() noexcept;

    This is intended to be called from inside a catch block. It returns an exception_ptr object that refers to the exception currently being handled, or to a copy of the currently handled exception. A null exception_ptr object is returned if no exception is being handled. The referenced exception object remains valid for as long as there is an object of type exception_ptr that references it. exception_ptr meets the requirements of NullablePointer, which means it can easily be tested with a simple if statement, as the example later in this section demonstrates.

  • [[noreturn]] void rethrow_exception(exception_ptr p);

    This function rethrows the exception referenced by the exception_ptr parameter. Rethrowing the referenced exception does not have to be done in the same thread that generated the referenced exception in the first place, which makes this feature perfectly suited for handling exceptions across different threads. The [[noreturn]] attribute makes it clear that this function never returns normally. Attributes are introduced in Chapter 11.

  • template<class E> exception_ptr make_exception_ptr(E e) noexcept;

    This function creates an exception_ptr object that refers to a copy of the given exception object. This is basically a shorthand notation for the following code:

try {
    throw e;
} catch (...) {
    return current_exception();
}

Let’s see how handling exceptions across different threads can be implemented using these functions. The following code defines a function that does some work and throws an exception. This function will ultimately be running in a separate thread:

void doSomeWork()
{
    for (int i = 0; i < 5; ++i) {
        cout << i << endl;
    }
    cout << "Thread throwing a runtime_error exception…" << endl;
    throw runtime_error("Exception from thread");
}

The following threadFunc() function wraps the call to the preceding function in a try/catch block, catching all exceptions that doSomeWork() might throw. A single argument is supplied to threadFunc(), which is of type exception_ptr&. Once an exception is caught, the function current_exception() is used to get a reference to the exception being handled, which is then assigned to the exception_ptr parameter. After that, the thread exits normally:

void threadFunc(exception_ptr& err)
{
    try {
        doSomeWork();
    } catch (...) {
        cout << "Thread caught exception, returning exception..." << endl;
        err = current_exception();
    }
}

The following doWorkInThread() function is called from within the main thread. Its responsibility is to create a new thread and start executing threadFunc() in it. A reference to an object of type exception_ptr is given as an argument to threadFunc(). Once the thread is created, the doWorkInThread() function waits for the thread to finish by using the join() method, after which the error object is examined. Because exception_ptr meets the NullablePointer requirements, you can easily check it using an if statement. If it is non-null, the exception is rethrown in the current thread, which is the main thread in this example. By rethrowing the exception in the main thread, the exception has been transferred from one thread to another thread.

void doWorkInThread()
{
    exception_ptr error;
    // Launch thread
    thread t{ threadFunc, ref(error) };
    // Wait for thread to finish
    t.join();
    // See if thread has thrown any exception
    if (error) {
        cout << "Main thread received exception, rethrowing it…" << endl;
        rethrow_exception(error);
    } else {
        cout << "Main thread did not receive any exception." << endl;
    }
}

The main() function is pretty straightforward. It calls doWorkInThread() and wraps the call in a try/catch block to catch exceptions thrown by the thread spawned by doWorkInThread():

int main()
{
    try {
        doWorkInThread();
    } catch (const exception& e) {
        cout << "Main function caught: '" << e.what() << "'" << endl;
    }
}

The output is as follows:

0
1
2
3
4
Thread throwing a runtime_error exception...
Thread caught exception, returning exception...
Main thread received exception, rethrowing it...
Main function caught: 'Exception from thread'

To keep the examples in this chapter compact and to the point, their main() functions usually use join() to block the main thread and wait until threads have finished. Of course, in real-world applications you do not want to block your main thread. For example, in a GUI application, blocking the main thread means that the UI becomes unresponsive. In that case, you can use a messaging paradigm to communicate between threads. For example, the earlier threadFunc() function could send a message to the UI thread with a copy of the result of current_exception() as an argument. But even then, you need to make sure to call either join() or detach() on any spawned threads, as discussed earlier in this chapter.

ATOMIC OPERATIONS LIBRARY

Atomic types allow atomic access, which means that concurrent reading and writing without additional synchronization is allowed. Without atomic operations, incrementing a variable is not thread-safe, because the compiler first loads the value from memory into a register, increments it, and then stores the result back in memory. Another thread might touch the same memory during this increment operation, which is a data race. For example, the following code is not thread-safe and contains a data race. This type of data race is discussed at the beginning of this chapter.

int counter = 0;   // Global variable
++counter;         // Executed in multiple threads

You can use an std::atomic type to make this thread-safe without explicitly using any synchronization mechanism, such as mutual exclusion objects, which are discussed later in this chapter. Here is the same code using an atomic integer:

atomic<int> counter(0);   // Global variable
++counter;                // Executed in multiple threads

You need to include the <atomic> header to use these atomic types. The C++ standard defines named integral atomic types for all primitive types. The following table lists a few.

NAMED ATOMIC TYPE     EQUIVALENT STD::ATOMIC TYPE
atomic_bool           atomic<bool>
atomic_char           atomic<char>
atomic_uchar          atomic<unsigned char>
atomic_int            atomic<int>
atomic_uint           atomic<unsigned int>
atomic_long           atomic<long>
atomic_ulong          atomic<unsigned long>
atomic_llong          atomic<long long>
atomic_ullong         atomic<unsigned long long>
atomic_wchar_t        atomic<wchar_t>

You can use atomic types without explicitly using any synchronization mechanism. However, underneath, operations on atomics of a certain type might use a synchronization mechanism such as mutual exclusion objects. This might happen, for example, when the hardware you are targeting lacks the instructions to perform an operation atomically. You can use the is_lock_free() method on an atomic type to query whether it supports lock-free operations, that is, its operations run without any explicit synchronization mechanism underneath.

The std::atomic class template can be used with all kinds of types, not only integral types. For example, you can create an atomic<double>, or an atomic<MyType>, but only if MyType is trivially copyable. Depending on the size of the specified type, underneath these might require explicit synchronization mechanisms. In the following example, both Foo and Bar are trivially copyable, that is, std::is_trivially_copyable_v is true for both. However, atomic<Foo> is not lock free, while atomic<Bar> is.

class Foo { private: int mArray[123]; };
class Bar { private: int mInt; };

int main()
{
    atomic<Foo> f;
    // Outputs: 1 0
    cout << is_trivially_copyable_v<Foo> << " " << f.is_lock_free() << endl;
    atomic<Bar> b;
    // Outputs: 1 1
    cout << is_trivially_copyable_v<Bar> << " " << b.is_lock_free() << endl;
}

When accessing a piece of data from multiple threads, atomics also solve problems such as memory ordering, compiler optimizations, and so on. Basically, it’s virtually never safe to read and write to the same piece of data from multiple threads without using atomics or explicit synchronization mechanisms!

Atomic Type Example

This section explains in more detail why you should use atomic types. Suppose you have a function called increment() that increments an integer reference parameter in a loop. This code uses std::this_thread::sleep_for() to introduce a small delay in each loop. The argument to sleep_for() is an std::chrono::duration, which is explained in Chapter 20.

void increment(int& counter)
{
    for (int i = 0; i < 100; ++i) {
        ++counter;
        this_thread::sleep_for(1ms);
    }
}

Now, you would like to run several threads in parallel, all executing this increment() function on a shared counter variable. By implementing this naively without atomic types or without any kind of thread synchronization, you introduce data races. The following code launches ten threads, after which it waits for all threads to finish by calling join() on each thread:

int main()
{
    int counter = 0;
    vector<thread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.push_back(thread{ increment, ref(counter) });
    }

    for (auto& t : threads) {
        t.join();
    }
    cout << "Result = " << counter <<endl;
}

Because increment() increments its given integer 100 times, and ten threads are launched, each of which executes increment() on the same shared counter, the expected result is 1,000. If you execute this program several times, you might get output like the following, but with different values:

Result = 982
Result = 977
Result = 984

This code clearly shows a data race. In this example, you can use an atomic type to fix the code. The following code shows the required changes:

#include <atomic>

void increment(atomic<int>& counter)
{
    for (int i = 0; i < 100; ++i) {
        ++counter;
        this_thread::sleep_for(1ms);
    }
}

int main()
{
    atomic<int> counter(0);
    vector<thread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.push_back(thread{ increment, ref(counter) });
    }

    for (auto& t : threads) {
        t.join();
    }
    cout << "Result = " << counter << endl;
}

The changes include the <atomic> header file and change the type of the shared counter to std::atomic<int> instead of int. When you run this modified version, you always get 1,000 as the result:

Result = 1000
Result = 1000
Result = 1000

Without explicitly adding any synchronization mechanism to the code, it is now thread-safe and data-race-free because the ++counter operation on an atomic type loads, increments, and stores the value in one atomic transaction, which cannot be interrupted.

However, this modified code introduces a new problem: performance. You should try to minimize the amount of synchronization, whether atomic operations or explicit synchronization, because it lowers performance. For this simple example, the best and recommended solution is to let increment() calculate its result in a local variable and add it to the counter reference only after the loop. Note that it is still required to use an atomic type, because you are still writing to counter from multiple threads.

void increment(atomic<int>& counter)
{
    int result = 0;
    for (int i = 0; i < 100; ++i) {
        ++result;
        this_thread::sleep_for(1ms);
    }
    counter += result;
}

Atomic Operations

The C++ standard defines a number of atomic operations. This section describes a few of those operations. For a full list, consult a Standard Library Reference, see Appendix B.

A first example of an atomic operation is the following:

bool atomic<T>::compare_exchange_strong(T& expected, T desired);

The logic implemented atomically by this operation is as follows, in pseudo-code:

if (*this == expected) {
    *this = desired;
    return true;
} else {
    expected = *this;
    return false;
}

Although this logic might seem fairly strange at first sight, this operation is a key building block for writing lock-free concurrent data structures. Lock-free concurrent data structures allow operations on their data without requiring any synchronization mechanisms. However, implementing such data structures is an advanced topic, outside the scope of this book.
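
For a small taste of it, the following sketch uses compare_exchange_strong() in a loop to atomically triple a value, an operation for which atomics provide no dedicated method:

atomic<int> value(7);
int expected = value.load();
// If another thread modified value in the meantime, compare_exchange_strong()
// returns false and updates expected to the current value, so the loop retries
// with a freshly computed desired value.
while (!value.compare_exchange_strong(expected, expected * 3)) { }
// value has now been atomically multiplied by 3.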

A second example is atomic<T>::fetch_add(), which works for integral atomic types. It fetches the current value of the atomic type, adds the given increment to the atomic value, and returns the original non-incremented value. Here is an example:

atomic<int> value(10);
cout << "Value = " << value << endl;
int fetched = value.fetch_add(4);
cout << "Fetched = " << fetched << endl;
cout << "Value = " << value << endl;

If no other threads are touching the contents of the fetched and value variables, the output is as follows:

Value = 10
Fetched = 10
Value = 14

Atomic integral types support the following atomic operations: fetch_add(), fetch_sub(), fetch_and(), fetch_or(), fetch_xor(), ++, --, +=, -=, &=, ^=, and |=. Atomic pointer types support fetch_add(), fetch_sub(), ++, --, +=, and -=.

Most of the atomic operations can accept an extra parameter specifying the memory ordering that you would like. Here is an example:

T atomic<T>::fetch_add(T value, memory_order = memory_order_seq_cst);

You can change this default memory order. The C++ standard provides memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_release, memory_order_acq_rel, and memory_order_seq_cst, all of which are defined in the std namespace. However, you will rarely want to use them instead of the default. While another memory order might perform better than the default by some metric, using one in even a slightly incorrect way will again introduce data races or other difficult-to-track threading-related problems. If you do want to know more about memory ordering, consult one of the multithreading references in Appendix B.
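
To give an idea of what specifying a memory order looks like, the following sketch uses memory_order_relaxed for a pure event counter whose value is read only after all threads have been joined. This is one of the rare patterns where relaxed ordering is safe; when in any doubt, keep the default:

atomic<int> eventCount(0);

void recordEvent()
{
    // No other data is synchronized through this counter, so the cheaper
    // relaxed ordering is sufficient here.
    eventCount.fetch_add(1, memory_order_relaxed);
}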

MUTUAL EXCLUSION

If you are writing multithreaded applications, you have to be sensitive to sequencing of operations. If your threads read and write shared data, this can be a problem. There are many ways to avoid this problem, such as never actually sharing data between threads. However, if you can’t avoid sharing data, you must provide for synchronization so that only one thread at a time can change the data.

Scalars such as Booleans and integers can often be synchronized properly with atomic operations, as described earlier; however, when your data is more complex, and you need to use that data from multiple threads, you must provide explicit synchronization.

The Standard Library has support for mutual exclusion in the form of mutex and lock classes. These can be used to implement synchronization between threads and are discussed in the following sections.

Mutex Classes

Mutex stands for mutual exclusion. The basic mechanism of using a mutex is as follows:

  • A thread that wants to use (read/write) memory shared with other threads tries to lock a mutex object. If another thread is currently holding this lock, the new thread that wants to gain access blocks until the lock is released, or until a timeout interval expires.
  • Once the thread has obtained the lock, it is free to use the shared memory. Of course, this assumes that all threads that want to use the shared data correctly acquire a lock on the mutex.
  • After the thread is finished with reading/writing to the shared memory, it releases the lock to give some other thread an opportunity to obtain the lock to the shared memory. If two or more threads are waiting on the lock, there are no guarantees as to which thread will be granted the lock and thus allowed to proceed.

The C++ standard provides non-timed mutex and timed mutex classes.

Non-timed Mutex Classes

The Standard Library has three non-timed mutex classes: std::mutex, recursive_mutex, and shared_mutex (since C++17). The first two classes are defined in <mutex>, and the last one in <shared_mutex>. Each mutex supports the following methods.

  • lock(): The calling thread tries to obtain the lock and blocks until the lock has been acquired. It blocks indefinitely. If there is a desire to limit the amount of time the thread blocks, you should use a timed mutex, discussed in the next section.
  • try_lock(): The calling thread tries to obtain the lock. If the lock is currently held by another thread, the call returns immediately. If the lock has been obtained, try_lock() returns true; otherwise, it returns false.
  • unlock(): The calling thread releases the lock it currently holds, making it available for another thread.

std::mutex is a standard mutual exclusion class with exclusive ownership semantics. There can be only one thread owning the mutex. If another thread wants to obtain ownership of this mutex, it either blocks when using lock(), or fails when using try_lock(). A thread already having ownership of a mutex is not allowed to call lock() or try_lock() again on that mutex. This might lead to a deadlock!

std::recursive_mutex behaves almost identically to mutex, except that a thread already having ownership of a recursive mutex is allowed to call lock() or try_lock() again on the same recursive mutex. The calling thread should call the unlock() method as many times as it obtained a lock on the recursive mutex.
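
A recursive mutex is useful, for example, when one method of a class locks the mutex and then calls another method of the same class that locks it again. A minimal sketch with an illustrative class:

class Account
{
    public:
        void deposit(int amount)
        {
            mMutex.lock();      // Second lock when called from transferIn().
            mBalance += amount;
            mMutex.unlock();
        }

        void transferIn(int amount)
        {
            mMutex.lock();      // First lock.
            deposit(amount);    // Would deadlock with a plain std::mutex.
            mMutex.unlock();
        }
    private:
        recursive_mutex mMutex;
        int mBalance = 0;
};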

The shared_mutex class supports the concept of shared lock ownership, also known as a readers-writers lock. A thread can get either exclusive ownership or shared ownership of the lock. Exclusive ownership, also known as a write lock, can be acquired only when no other thread has exclusive or shared ownership. Shared ownership, also known as a read lock, can be acquired when no other thread has exclusive ownership, even when other threads have already acquired shared ownership. The shared_mutex class supports lock(), try_lock(), and unlock(). These methods acquire and release exclusive locks. Additionally, it has the following shared ownership-related methods: lock_shared(), try_lock_shared(), and unlock_shared(). These work similarly to the other set of methods, but try to acquire or release shared ownership.

A thread already having a lock on a shared_mutex is not allowed to try to acquire a second lock on that mutex. This might lead to a deadlock!

Timed Mutex Classes

The Standard Library provides three timed mutex classes: std::timed_mutex, recursive_timed_mutex, and shared_timed_mutex. The first two classes are defined in <mutex>, and the last one in <shared_mutex>. They all support the lock(), try_lock(), and unlock() methods; and shared_timed_mutex also supports lock_shared(), try_lock_shared(), and unlock_shared(). All these methods behave the same as described in the previous section. Additionally, they support the following methods.

  • try_lock_for(rel_time): The calling thread tries to obtain the lock for a certain relative time. If the lock could not be obtained after the given timeout, the call fails and returns false. If the lock could be obtained within the timeout, the call succeeds and returns true. The timeout is specified as an std::chrono::duration, see Chapter 20.
  • try_lock_until(abs_time): The calling thread tries to obtain the lock until the system time equals or exceeds the specified absolute time. If the lock could be obtained before this time, the call returns true. If the system time passes the given absolute time, the function stops trying to obtain the lock and returns false. The absolute time is specified as an std::chrono::time_point, see Chapter 20.

A shared_timed_mutex also supports try_lock_shared_for() and try_lock_shared_until().

A thread already having ownership of a timed_mutex or a shared_timed_mutex is not allowed to acquire the lock a second time on that mutex. This might lead to a deadlock!

A recursive_timed_mutex allows a thread to acquire a lock multiple times, just as with recursive_mutex.

Locks

A lock class is an RAII class that makes it easier to correctly obtain and release a lock on a mutex; the destructor of the lock class automatically releases the associated mutex. The C++ standard defines four types of locks: std::lock_guard, unique_lock, shared_lock, and scoped_lock. The last one was introduced with C++17.

lock_guard

lock_guard, defined in <mutex>, is a simple lock with two constructors:

  • explicit lock_guard(mutex_type& m);

    This is a constructor accepting a reference to a mutex. This constructor tries to obtain a lock on the mutex and blocks until the lock is obtained. The explicit keyword for constructors is discussed in Chapter 9.

  • lock_guard(mutex_type& m, adopt_lock_t);

    This is a constructor accepting a reference to a mutex and an instance of std::adopt_lock_t. There is a predefined adopt_lock_t instance provided, called std::adopt_lock. The lock assumes that the calling thread has already obtained a lock on the referenced mutex. The lock “adopts” the mutex, and automatically releases the mutex when the lock is destroyed.
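
For example, adopting a mutex that was locked manually looks as follows (a minimal sketch, using C++17 class template argument deduction as in the other examples in this chapter):

mutex mut;
mut.lock();                          // Lock obtained manually...
lock_guard lock(mut, adopt_lock);    // ...and adopted by the lock_guard.
// The mutex is automatically released when lock goes out of scope.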

unique_lock

std::unique_lock, defined in <mutex>, is a more sophisticated lock that allows you to defer lock acquisition until later in the execution, long after the declaration. You can use the owns_lock() method, or the unique_lock’s bool conversion operator, to see if the lock has been acquired. An example of using this conversion operator is given later in this chapter in the section “Using Timed Locks.” unique_lock has several constructors:

  • explicit unique_lock(mutex_type& m);

    This constructor accepts a reference to a mutex. It tries to obtain a lock on the mutex and blocks until the lock is obtained.

  • unique_lock(mutex_type& m, defer_lock_t) noexcept;

    This constructor accepts a reference to a mutex and an instance of std::defer_lock_t. There is a predefined defer_lock_t instance provided, called std::defer_lock. The unique_lock stores the reference to the mutex, but does not immediately try to obtain a lock. A lock can be obtained later.

  • unique_lock(mutex_type& m, try_to_lock_t);

    This constructor accepts a reference to a mutex and an instance of std::try_to_lock_t. There is a predefined try_to_lock_t instance provided, called std::try_to_lock. The lock tries to obtain a lock to the referenced mutex, but if it fails, it does not block, in which case, a lock can be obtained later.

  • unique_lock(mutex_type& m, adopt_lock_t);

    This constructor accepts a reference to a mutex and an instance of std::adopt_lock_t, for example std::adopt_lock. The lock assumes that the calling thread has already obtained a lock on the referenced mutex. The lock “adopts” the mutex, and automatically releases the mutex when the lock is destroyed.

  • unique_lock(mutex_type& m, const chrono::time_point<Clock, Duration>& abs_time);

    This constructor accepts a reference to a mutex and an absolute time. The constructor tries to obtain a lock until the system time passes the given absolute time. The chrono library is discussed in Chapter 20.

  • unique_lock(mutex_type& m, const chrono::duration<Rep, Period>& rel_time);

    This constructor accepts a reference to a mutex and a relative time. The constructor tries to get a lock on the mutex with the given relative timeout.

The unique_lock class also has the methods lock(), try_lock(), try_lock_for(), try_lock_until(), and unlock(), which behave as explained in the section “Timed Mutex Classes,” earlier in this chapter.

shared_lock

The shared_lock class, defined in <shared_mutex>, has the same type of constructors and the same methods as unique_lock. The difference is that the shared_lock class calls the shared ownership-related methods on the underlying shared mutex. Thus, the methods of shared_lock are called lock(), try_lock(), and so on, but on the underlying shared mutex they call lock_shared(), try_lock_shared(), and so on. This is done to give shared_lock the same interface as unique_lock, so it can be used as a stand-in replacement for unique_lock, but acquires a shared lock instead of an exclusive lock.
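
For example, a value that is read frequently but updated only rarely can be protected with a shared_mutex as follows. This is a sketch assuming C++17; the class is illustrative:

class SharedValue
{
    public:
        int get() const
        {
            shared_lock lock(mMutex);  // Shared (read) lock: readers can overlap.
            return mValue;
        }

        void set(int value)
        {
            unique_lock lock(mMutex);  // Exclusive (write) lock.
            mValue = value;
        }
    private:
        mutable shared_mutex mMutex;   // mutable, so get() can be const.
        int mValue = 0;
};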

Acquiring Multiple Locks at Once

C++ has two generic lock functions that you can use to obtain locks on multiple mutex objects at once without the risk of creating deadlocks. Both functions are defined in the std namespace, and both are variadic template functions, as discussed in Chapter 22.

The first function, lock(), locks all the given mutex objects in an unspecified order without the risk of deadlocks. If one of the mutex lock calls throws an exception, unlock() is called on all locks that have already been obtained. Its prototype is as follows:

template <class L1, class L2, class... L3> void lock(L1&, L2&, L3&...);

try_lock() has a similar prototype, but it tries to obtain a lock on all the given mutex objects by calling try_lock() on each of them in sequence. It returns -1 if all calls to try_lock() succeed. If any try_lock() fails, unlock() is called on all locks that have already been obtained, and the return value is the zero-based index of the parameter position of the mutex on which try_lock() failed.
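
For example (a minimal sketch), the return value of try_lock() can be inspected as follows:

mutex m1;
mutex m2;

void process()
{
    unique_lock lock1(m1, defer_lock);
    unique_lock lock2(m2, defer_lock);
    int failedIndex = try_lock(lock1, lock2);
    if (failedIndex == -1) {
        // Both locks acquired; they are released automatically when
        // lock1 and lock2 go out of scope.
    } else {
        // failedIndex is the zero-based index of the lock that could not
        // be acquired; no locks are held at this point.
    }
}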

The following example demonstrates how to use the generic lock() function. The process() function first creates two locks, one for each mutex, and gives an instance of std::defer_lock_t as a second argument to tell unique_lock not to acquire the lock during construction. The call to std::lock() then acquires both locks without the risk of deadlocks.

mutex mut1;
mutex mut2;

void process()
{
    unique_lock lock1(mut1, defer_lock);  // C++17
    unique_lock lock2(mut2, defer_lock);  // C++17
    //unique_lock<mutex> lock1(mut1, defer_lock);
    //unique_lock<mutex> lock2(mut2, defer_lock);
    lock(lock1, lock2);
    // Locks acquired
} // Locks automatically released

scoped_lock

std::scoped_lock, defined in <mutex>, is similar to lock_guard, except that it accepts a variable number of mutexes. This greatly simplifies acquiring multiple locks. For instance, the example with the process() function in the previous section can be written using a scoped_lock as follows:

mutex mut1;
mutex mut2;

void process()
{
    scoped_lock locks(mut1, mut2);
    // Locks acquired
} // Locks automatically released

This uses C++17’s template argument deduction for constructors. If your compiler does not support this feature yet, you have to write the following:

scoped_lock<mutex, mutex> locks(mut1, mut2);

std::call_once

You can use std::call_once() in combination with std::once_flag to make sure a certain function or method is called exactly one time, no matter how many threads try to call call_once() with the same once_flag. Only one call_once() invocation actually calls the given function or method. If the given function does not throw any exceptions, then this invocation is called the effective call_once() invocation. If the given function does throw an exception, the exception is propagated back to the caller, and another caller is selected to execute the function. The effective invocation on a specific once_flag instance finishes before all other call_once() invocations on the same once_flag. Other threads calling call_once() on the same once_flag block until the effective call is finished. Figure 23-3 illustrates this with three threads. Thread 1 performs the effective call_once() invocation, thread 2 blocks until the effective invocation is finished, and thread 3 doesn’t block because the effective invocation from thread 1 has already finished.

FIGURE 23-3: Illustration of std::call_once() with three threads.

The following example demonstrates the use of call_once(). The example launches three threads running processingFunction() that use some shared resources. These shared resources should be initialized only once by calling initializeSharedResources() once. To accomplish this, each thread calls call_once() with a global once_flag. The result is that only one thread effectively executes initializeSharedResources(), and exactly one time. While this call_once() call is in progress, other threads block until initializeSharedResources() returns.

once_flag gOnceFlag;

void initializeSharedResources()
{
    // ... Initialize shared resources to be used by multiple threads.
    cout << "Shared resources initialized." << endl;
}

void processingFunction()
{
    // Make sure the shared resources are initialized.
    call_once(gOnceFlag, initializeSharedResources);

    // ... Do some work, including using the shared resources
    cout << "Processing" << endl;
}

int main()
{
    // Launch 3 threads.
    vector<thread> threads(3);
    for (auto& t : threads) {
        t = thread{ processingFunction };
    }
    // Join on all threads
    for (auto& t : threads) {
        t.join();
    }
} 

The output of this code is as follows:

Shared resources initialized.
Processing
Processing
Processing

Of course, in this example, you could call initializeSharedResources() once in the beginning of the main() function before the threads are launched; however, that wouldn’t demonstrate the use of call_once().

Examples Using Mutual Exclusion Objects

The following sections give a couple of examples on how to use mutual exclusion objects to synchronize multiple threads.

Thread-Safe Writing to Streams

Earlier in this chapter, in the “Threads” section, there is an example with a class called Counter. That example mentions that C++ streams are data-race free by default, but that the output from multiple threads can be interleaved. To solve this interleaving issue, you can use a mutual exclusion object to make sure that only one thread at a time is reading/writing to the stream object.

The following example synchronizes all accesses to cout in the Counter class. For this, a static mutex object is added. It should be static, because all instances of the class should use the same mutex instance. lock_guard is used to obtain a lock on the mutex before writing to cout. The changes compared to the earlier version are the static sMutex member and the lock_guard inside operator().

class Counter
{
    public:
        Counter(int id, int numIterations)
            : mId(id), mNumIterations(numIterations)
        {
        }

        void operator()() const
        {
            for (int i = 0; i < mNumIterations; ++i) {
                lock_guard lock(sMutex);
                cout << "Counter " << mId << " has value " << i << endl;
            }
        }
    private:
        int mId;
        int mNumIterations;
        static mutex sMutex;
};

mutex Counter::sMutex;

This code creates a lock_guard instance on each iteration of the for loop. It is recommended to limit the time a lock is held as much as possible; otherwise, you are blocking other threads for too long. For example, if the lock_guard instance was created once right before the for loop, then you would basically lose all multithreading in this code because one thread would hold a lock for the entire duration of its for loop, and all other threads would wait for this lock to be released.

Using Timed Locks

The following example demonstrates how to use a timed mutex. It is the same Counter class as before, but this time it uses a timed_mutex in combination with a unique_lock. A relative time of 200 milliseconds is given to the unique_lock constructor, causing it to try to obtain a lock for 200 milliseconds. If the lock cannot be obtained within this timeout interval, the constructor returns. Afterward, you can check whether or not the lock has been acquired. You can do this with an if statement on the lock variable, because unique_lock defines a bool conversion operator. The timeout is specified using the chrono library, discussed in Chapter 20.

class Counter
{
    public:
        Counter(int id, int numIterations)
            : mId(id), mNumIterations(numIterations)
        {
        }

        void operator()() const
        {
            for (int i = 0; i < mNumIterations; ++i) {
                unique_lock lock(sTimedMutex, 200ms);
                if (lock) {
                    cout << "Counter " << mId << " has value " << i << endl;
                } else {
                    // Lock not acquired in 200ms, skip output.
                }
            }
        }
    private:
        int mId;
        int mNumIterations;
        static timed_mutex sTimedMutex;
};

timed_mutex Counter::sTimedMutex;

Double-Checked Locking

The double-checked locking pattern is actually an anti-pattern that you should avoid! It is shown here because you might come across it in existing code bases. The idea of the double-checked locking pattern is to try to avoid the use of mutual exclusion objects. It’s a half-baked attempt at writing more efficient code than code that uses a mutual exclusion object. It can really go wrong when you try to make it faster than demonstrated in the upcoming example, for instance, by using relaxed atomics (not further discussed), using a regular Boolean instead of an atomic<bool>, and so on. The pattern becomes sensitive to data races, and it is hard to get right. The irony is that using call_once() will actually be faster, and using a magic static (a function-local static, which C++ guarantees is initialized in a thread-safe fashion), if applicable, will be even faster than that.

Double-checked locking could, for example, be used to make sure that resources are initialized exactly once. The following example shows how you can implement this. It is called the double-checked locking pattern because it is checking the value of the gInitialized variable twice, once before acquiring the lock and once right after acquiring the lock. The first gInitialized check is used to prevent acquiring a lock when it is not needed. The second check is required to make sure that no other thread performed the initialization between the first gInitialized check and acquiring the lock.

void initializeSharedResources()
{
    // ... Initialize shared resources to be used by multiple threads.
    cout << "Shared resources initialized." << endl;
}

atomic<bool> gInitialized(false);
mutex gMutex;

void processingFunction()
{
    if (!gInitialized) {
        unique_lock lock(gMutex);
        if (!gInitialized) {
            initializeSharedResources();
            gInitialized = true;
        }
    }
    cout << "OK" << endl;
}

int main()
{
    vector<thread> threads;
    for (int i = 0; i < 5; ++i) {
        threads.push_back(thread{ processingFunction });
    }
    for (auto& t : threads) {
        t.join();
    }
}

The output clearly shows that only one thread initializes the shared resources:

Shared resources initialized.
OK
OK
OK
OK
OK

CONDITION VARIABLES

Condition variables allow a thread to block until a certain condition is set by another thread, or until the system time reaches a specified time. These variables allow for explicit inter-thread communication. If you are familiar with multithreaded programming using the Win32 API, you can compare condition variables with event objects in Windows.

Two kinds of condition variables are available, both of which are defined in the <condition_variable> header file.

  • std::condition_variable: A condition variable that can wait only on a unique_lock<mutex>, which, according to the C++ standard, allows for maximum efficiency on certain platforms.
  • std::condition_variable_any: A condition variable that can wait on any kind of object, including custom lock types.

A condition_variable supports the following methods:

  • notify_one();

    This method wakes up one of the threads waiting on this condition variable. This is similar to an auto-reset event in Windows.

  • notify_all();

    This method wakes up all threads waiting on this condition variable.

  • wait(unique_lock<mutex>& lk);

    The thread calling wait() should already have acquired a lock on lk. The effect of calling wait() is that it atomically calls lk.unlock() and then blocks the thread, waiting for a notification. When the thread is unblocked by a notify_one() or notify_all() call in another thread, the function calls lk.lock() again, possibly blocking until the lock has been acquired, and then returns.

  • wait_for(unique_lock<mutex>& lk, const chrono::duration<Rep, Period>& rel_time);

    This method is similar to wait(), except that the thread is unblocked by a notify_one() call, a notify_all() call, or when the given timeout has expired.

  • wait_until(unique_lock<mutex>& lk, const chrono::time_point<Clock, Duration>& abs_time);

    This method is similar to wait(), except that the thread is unblocked by a notify_one() call, a notify_all() call, or when the system time passes the given absolute time.

There are also versions of wait(), wait_for(), and wait_until() that accept an extra predicate parameter. For instance, the version of wait() accepting an extra predicate is equivalent to the following:

while (!predicate())
    wait(lk);

The condition_variable_any class supports the same methods as condition_variable, except that it accepts any kind of lock class instead of only a unique_lock<mutex>. The lock class used should have a lock() and unlock() method.

Spurious Wake-Ups

Threads waiting on a condition variable can wake up when another thread calls notify_one() or notify_all(), with a relative timeout, or when the system time reaches a certain time. However, they can also wake up spuriously. This means that a thread can wake up, even if no other thread has called any notify method, and no timeouts have been reached yet. Thus, when a thread waits on a condition variable and wakes up, it needs to check why it woke up. One way to check for this is by using one of the versions of wait() accepting a predicate, as shown in the following example.

Using Condition Variables

As an example, condition variables can be used for background threads processing items from a queue. You can define a queue in which you insert items to be processed. A background thread waits until there are items in the queue. When an item is inserted into the queue, the thread wakes up, processes the item, and goes back to sleep, waiting for the next item. Suppose you have the following queue:

queue<string> mQueue;

You need to make sure that only one thread is modifying this queue at any given time. You can do this with a mutex:

mutex mMutex;

To be able to notify a background thread when an item is added, you need a condition variable:

condition_variable mCondVar;

A thread that wants to add an item to the queue first acquires a lock on the mutex, then adds the item to the queue, and notifies the background thread. You can call notify_one() or notify_all() whether or not you currently have the lock; both work.

// Lock mutex and add entry to the queue.
unique_lock lock(mMutex);
mQueue.push(entry);
// Notify condition variable to wake up thread.
mCondVar.notify_all();

The background thread waits for notifications in an infinite loop, as follows. Note the use of wait() accepting a predicate to correctly handle spurious wake-ups. The predicate checks if there really is something in the queue. When the call to wait() returns, you are sure there is something in the queue.

unique_lock lock(mMutex);
while (true) {
    // Wait for a notification.
    mCondVar.wait(lock, [this]{ return !mQueue.empty(); });
    // Condition variable is notified, so something is in the queue.
    // Process queue item…
}

The section “Example: Multithreaded Logger Class,” toward the end of this chapter, provides a complete example on how to use condition variables to send notifications to other threads.

The C++ standard also defines a helper function called std::notify_all_at_thread_exit(cond, lk) where cond is a condition variable and lk is a unique_lock<mutex> instance. A thread calling this function should already have acquired the lock lk. When the thread exits, it automatically executes the following:

lk.unlock();
cond.notify_all();
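
This helper is useful when a waiting thread must not continue until another thread has completely finished, including the destruction of its thread-local storage. Here is a minimal sketch (all names are hypothetical) in which the main thread waits for a detached thread:

int main()
{
    mutex m;
    condition_variable cv;
    bool done = false;

    thread{ [&] {
        unique_lock lock(m);
        done = true;
        // Unlocks m and calls cv.notify_all() only once this thread has
        // fully exited and its thread-local storage has been destroyed.
        notify_all_at_thread_exit(cv, std::move(lock));
    } }.detach();

    unique_lock lock(m);
    // Returns only after the detached thread has completely exited.
    cv.wait(lock, [&] { return done; });
}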

FUTURES

As discussed earlier in this chapter, using std::thread to launch a thread that calculates a single result does not make it easy to get the computed result back once the thread has finished executing. Another problem with std::thread is in how it handles errors like exceptions. If a thread throws an exception and this exception is not caught by the thread itself, the C++ runtime calls std::terminate(), which usually terminates the entire application.

A future can be used to more easily get the result out of a thread, and to transport exceptions from one thread to another thread, which can then handle the exception however it wants. Of course, it’s still good practice to always try to handle exceptions in the actual threads as much as possible, in order to prevent them from leaving the thread.

A promise is something where a thread stores its result. A future is used to get access to the result stored in a promise. That is, a promise is the input side for a result, a future is the output side. Once a function, running in the same thread or in another thread, has calculated the value that it wants to return, it can put this value in a promise. This value can then be retrieved through a future. You can think of this mechanism as an inter-thread communication channel for a result.

C++ provides a standard future, called std::future. You can retrieve the result from an std::future as follows. T is the type of the calculated result.

future<T> myFuture = …;   // Is discussed later
T result = myFuture.get();

The call to get() retrieves the result and stores it in the variable result. If calculating the result is not finished yet, the call to get() blocks until the value becomes available. You can only call get() once on a future. The behavior of calling it a second time is undefined by the standard.

If you want to avoid blocking, you can first ask the future whether the result is available by calling wait_for() with a zero timeout and checking the returned future_status:

if (myFuture.wait_for(0s) == future_status::ready) {
    // Value is available.
    T result = myFuture.get();
} else {
    // Value is not yet available.
}

std::promise and std::future

C++ provides the std::promise class as one way to implement the concept of a promise. You can call set_value() on a promise to store a result, or you can call set_exception() on it to store an exception in the promise. Note that you can only call set_value() or set_exception() once on a specific promise. If you call it multiple times, an std::future_error exception will be thrown.

A thread A that launches another thread B to calculate something can create an std::promise and pass this to the launched thread. Note that a promise cannot be copied, but it can be moved into a thread! Thread B uses that promise to store the result. Before moving the promise into thread B, thread A calls get_future() on the created promise to be able to get access to the result once B has finished. Here is a simple example:

void DoWork(promise<int> thePromise)
{
    // … Do some work …
    // And ultimately store the result in the promise.
    thePromise.set_value(42);
}

int main()
{
    // Create a promise to pass to the thread.
    promise<int> myPromise;
    // Get the future of the promise.
    auto theFuture = myPromise.get_future();
    // Create a thread and move the promise into it.
    thread theThread{ DoWork, std::move(myPromise) };

    // Do some more work…

    // Get the result.
    int result = theFuture.get();
    cout << "Result: " << result << endl;

    // Make sure to join the thread.
    theThread.join();
}

std::packaged_task

An std::packaged_task makes it easier to work with promises than explicitly using std::promise, as in the previous section. The following code demonstrates this. It creates a packaged_task to execute CalculateSum(). The future is retrieved from the packaged_task by calling get_future(). A thread is launched, and the packaged_task is moved into it. A packaged_task cannot be copied! After the thread is launched, get() is called on the retrieved future to get the result. This blocks until the result is available.

Note that CalculateSum() does not need to store anything explicitly in any kind of promise. A packaged_task automatically creates a promise, automatically stores the result of the called function, CalculateSum() in this case, in the promise, and automatically stores any exceptions thrown from the function in the promise.

int CalculateSum(int a, int b) { return a + b; }

int main()
{
    // Create a packaged task to run CalculateSum.
    packaged_task<int(int, int)> task(CalculateSum);
    // Get the future for the result of the packaged task.
    auto theFuture = task.get_future();
    // Create a thread, move the packaged task into it, and
    // execute the packaged task with the given arguments.
    thread theThread{ std::move(task), 39, 3 };

    // Do some more work…

    // Get the result.
    int result = theFuture.get();
    cout << result << endl;

    // Make sure to join the thread.
    theThread.join();
}

std::async

If you want to give the C++ runtime more control over whether or not a thread is created to calculate something, you can use std::async(). It accepts a function to be executed, and returns a future that you can use to retrieve the result. There are two ways in which async() can run your function:

  • Run your function on a separate thread asynchronously
  • Run your function on the calling thread synchronously at the time you call get() on the returned future

If you call async() without additional arguments, the runtime automatically chooses one of the two methods depending on factors such as the number of CPU cores in your system and the amount of concurrency already taking place. You can influence the runtime’s behavior by specifying a policy argument.

  • launch::async: forces the runtime to execute the function asynchronously on a different thread.
  • launch::deferred: forces the runtime to execute the function synchronously on the calling thread when get() is called.
  • launch::async | launch::deferred: lets the runtime choose (= default behavior).

The following example demonstrates the use of async():

int calculate()
{
    return 123;
}

int main()
{
    auto myFuture = async(calculate);
    //auto myFuture = async(launch::async, calculate);
    //auto myFuture = async(launch::deferred, calculate);

    // Do some more work…

    // Get the result.
    int result = myFuture.get();
    cout << result << endl;
}

As you can see in this example, std::async() is one of the easiest methods to perform some calculations either asynchronously (on a different thread), or synchronously (on the same thread), and retrieve the result afterward.

Exception Handling

A big advantage of using futures is that they can transport exceptions between threads. Calling get() on a future either returns the calculated result, or rethrows any exception that has been stored in the promise linked to the future. When you use packaged_task or async(), any exception thrown from the launched function is automatically stored in the promise. If you use std::promise as your promise, you can call set_exception() to store an exception in it. Here is an example using async():

int calculate()
{
    throw runtime_error("Exception thrown from calculate().");
}

int main()
{
    // Use the launch::async policy to force asynchronous execution.
    auto myFuture = async(launch::async, calculate);

    // Do some more work…

    // Get the result.
    try {
        int result = myFuture.get();
        cout << result << endl;
    } catch (const exception& ex) {
        cout << "Caught exception: " << ex.what() << endl;
    }
}
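
When you manage an std::promise yourself, nothing is stored automatically; you have to call set_exception() explicitly. Here is a minimal sketch, reusing the DoWork() pattern from earlier:

void DoWork(promise<int> thePromise)
{
    try {
        // … Do some work that may throw …
        throw runtime_error("Exception thrown from DoWork().");
    } catch (...) {
        // Store the in-flight exception in the promise; a later call to
        // get() on the corresponding future rethrows it.
        thePromise.set_exception(current_exception());
    }
}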

std::shared_future

std::future<T> only requires T to be move-constructible. When you call get() on a future<T>, the result is moved out of the future and returned to you. This means you can call get() only once on a future<T>.

If you want to be able to call get() multiple times, even from multiple threads, then you need to use an std::shared_future<T>, in which case T needs to be copy-constructible. A shared_future can be created by using std::future::share(), or by passing a future to the shared_future constructor. Note that a future is not copyable, so you have to move it into the shared_future constructor.

shared_future can be used to wake up multiple threads at once. For example, the following piece of code defines two lambda expressions to be executed asynchronously on different threads. The first thing each lambda expression does is set a value on its respective promise to signal that it has started. Each then calls get() on signalFuture, which blocks until a parameter is made available through the future, after which execution continues. Each lambda expression captures its respective promise by reference and captures signalFuture by value, so both lambda expressions have their own copy of signalFuture. The main thread uses async() to execute both lambda expressions asynchronously on different threads, waits until both threads have started, and then sets the parameter in signalPromise, which wakes up both threads.

promise<void> thread1Started, thread2Started;

promise<int> signalPromise;
auto signalFuture = signalPromise.get_future().share();
//shared_future<int> signalFuture(signalPromise.get_future());

auto function1 = [&thread1Started, signalFuture] {
    thread1Started.set_value();
    // Wait until parameter is set.
    int parameter = signalFuture.get();
    // …
};

auto function2 = [&thread2Started, signalFuture] {
    thread2Started.set_value();
    // Wait until parameter is set.
    int parameter = signalFuture.get();
    // …
};

// Run both lambda expressions asynchronously.
// Remember to capture the future returned by async()!
auto result1 = async(launch::async, function1);
auto result2 = async(launch::async, function2);

// Wait until both threads have started.
thread1Started.get_future().wait();
thread2Started.get_future().wait();

// Both threads are now waiting for the parameter.
// Set the parameter to wake up both of them.
signalPromise.set_value(42);

EXAMPLE: MULTITHREADED LOGGER CLASS

This section demonstrates how to use threads, mutual exclusion objects, locks, and condition variables to write a multithreaded Logger class. The class allows log messages to be added to a queue from different threads. The Logger class itself processes this queue in a background thread that serially writes the log messages to a file. The class will be designed in two iterations to show you some examples of problems you will encounter when writing multithreaded code.

The C++ standard does not have a thread-safe queue, so it is obvious that you have to protect access to the queue with some synchronization mechanism to prevent multiple threads from reading/writing to the queue at the same time. This example uses a mutual exclusion object and a condition variable to provide the synchronization. Based on that, you might define the Logger class as follows:

class Logger
{
    public:
        // Starts a background thread writing log entries to a file.
        Logger();
        // Prevent copy construction and assignment.
        Logger(const Logger& src) = delete;
        Logger& operator=(const Logger& rhs) = delete;
        // Add log entry to the queue.
        void log(std::string_view entry);
    private:
        // The function running in the background thread.
        void processEntries();
        // Mutex and condition variable to protect access to the queue.
        std::mutex mMutex;
        std::condition_variable mCondVar;
        std::queue<std::string> mQueue;
        // The background thread.
        std::thread mThread;
};

The implementation is as follows. Note that this initial design has a couple of problems; when you try to run it, it might behave strangely or even crash. These problems are discussed and solved in the next iteration of the Logger class. The inner while loop in the processEntries() method is worth looking at: it processes all messages in the queue one at a time, acquiring and releasing the lock on each iteration. This ensures the loop doesn't hold the lock for too long, blocking other threads.

Logger::Logger()
{
    // Start background thread.
    mThread = thread{ &Logger::processEntries, this };
}

void Logger::log(string_view entry)
{
    // Lock mutex and add entry to the queue.
    unique_lock lock(mMutex);
    mQueue.push(string(entry));
    // Notify condition variable to wake up thread.
    mCondVar.notify_all();
}

void Logger::processEntries()
{
    // Open log file.
    ofstream logFile("log.txt");
    if (logFile.fail()) {
        cerr << "Failed to open logfile." << endl;
        return;
    }

    // Start processing loop.
    unique_lock lock(mMutex);
    while (true) {
        // Wait for a notification.
        mCondVar.wait(lock);

        // Condition variable notified, something might be in the queue.
        lock.unlock();
        while (true) {
            lock.lock();
            if (mQueue.empty()) {
                break;
            } else {
                logFile << mQueue.front() << endl;
                mQueue.pop();
            }
            lock.unlock();
        }
    }
}

This Logger class can be tested with the following test code. It launches a number of threads, all logging a few messages to the same Logger instance.

void logSomeMessages(int id, Logger& logger)
{
    for (int i = 0; i < 10; ++i) {
        stringstream ss;
        ss << "Log entry " << i << " from thread " << id;
        logger.log(ss.str());
    }
}

int main()
{
    Logger logger;
    vector<thread> threads;
    // Create a few threads all working with the same Logger instance.
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(logSomeMessages, i, ref(logger));
    }
    // Wait for all threads to finish.
    for (auto& t : threads) {
        t.join();
    }
}

If you build and run this naive initial version, you will notice that the application is terminated abruptly. The cause is that the application never calls join() or detach() on the background thread. Remember from earlier in this chapter that the destructor of a thread object that is still joinable, that is, on which neither join() nor detach() has been called, calls std::terminate() to terminate all running threads and the application itself. This means that messages still in the queue are not written to the file on disk. Some runtime libraries even issue an error or generate a crash dump when the application is terminated like this. You need to add a mechanism to gracefully shut down the background thread, and to wait until the background thread is completely shut down before terminating the application. You can do this by adding a destructor and a Boolean member variable to the class. The new definition of the class is as follows:

class Logger
{
    public:
        // Gracefully shut down background thread.
        virtual ~Logger();

        // Other public members omitted for brevity
    private:
        // Boolean telling the background thread to terminate.
        bool mExit = false;

        // Other members omitted for brevity
};

The destructor sets mExit to true, wakes up the background thread, and then waits until the thread has shut down. It acquires a lock on mMutex before setting mExit to true and before calling notify_all(). This prevents both a race condition and a deadlock with processEntries(): processEntries() could be at the top of its while loop, right after checking mExit and right before calling wait(). If the main thread ran the Logger destructor at that very moment without acquiring a lock on mMutex, the destructor would set mExit to true and call notify_all() after processEntries() had checked mExit but before it started waiting on the condition variable; processEntries() would then neither see the new value of mExit nor receive the notification. The application would be deadlocked: the destructor waiting on the join() call, and the background thread waiting on the condition variable. Note that the destructor must release the lock on mMutex before calling join(), which explains the extra code block using curly braces.

Logger::~Logger()
{
    {
        unique_lock lock(mMutex);
        // Gracefully shut down the thread by setting mExit
        // to true and notifying the thread.
        mExit = true;
        // Notify condition variable to wake up thread.
        mCondVar.notify_all();
    }
    // Wait until thread is shut down. This should be outside the above code
    // block because the lock must be released before calling join()!
    mThread.join();
}

The processEntries() method needs to check this Boolean variable and terminate the processing loop when it’s true:

void Logger::processEntries()
{
    // Open log file.
    ofstream logFile("log.txt");
    if (logFile.fail()) {
        cerr << "Failed to open logfile." << endl;
        return;
    }

    // Start processing loop.
    unique_lock lock(mMutex);
    while (true) {
        if (!mExit) { // Only wait for notifications if we don't have to exit.
            // Wait for a notification.
            mCondVar.wait(lock);
        }

        // Condition variable is notified, so something might be in the queue
        // and/or we need to shut down this thread.
        lock.unlock();
        while (true) {
            lock.lock();
            if (mQueue.empty()) {
                break;
            } else {
                logFile << mQueue.front() << endl;
                mQueue.pop();
            }
            lock.unlock();
        }
        if (mExit) {
            break;
        }
    }
}

Note that you cannot just check for mExit in the condition for the outer while loop because even when mExit is true, there might still be log entries in the queue that need to be written.

You can add artificial delays in specific places in your multithreaded code to trigger certain behavior. Note that such delays should only be added for testing, and should be removed from your final code! For example, to test that the race condition with the destructor is solved, you can remove any calls to log() from the main program, causing it to almost immediately call the destructor of the Logger class, and add the following delay:

void Logger::processEntries()
{
    // Omitted for brevity

    // Start processing loop.
    unique_lock lock(mMutex);
    while (true) {
        this_thread::sleep_for(1000ms);  // Needs #include <chrono>

        if (!mExit) { // Only wait for notifications if we don't have to exit.
            // Wait for a notification.
            mCondVar.wait(lock);
        }

        // Omitted for brevity
    }
}

THREAD POOLS

Instead of creating and deleting threads dynamically throughout your program’s lifetime, you can create a pool of threads that can be used as needed. This technique is often used in programs that want to handle some kind of event in a thread. In most environments, the ideal number of threads is equal to the number of processing cores. If there are more threads than cores, threads will have to be suspended to allow other threads to run, and this will ultimately add overhead. Note that while the ideal number of threads is equal to the number of cores, this applies only in the case where the threads are compute bound and cannot block for any other reason, including I/O. When threads can block, it is often appropriate to run more threads than there are cores. Determining the optimal number of threads in such cases is hard, and may involve throughput measurements.
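
The Standard Library exposes a portable hint for this number; a minimal sketch:

// Returns the number of concurrent threads supported by the
// implementation, or 0 if that number cannot be determined.
unsigned int numCores = thread::hardware_concurrency();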

Because not all processing is identical, it is not uncommon to have threads from a thread pool receive, as part of their input, a function object or lambda expression that represents the computation to be done.

Because threads from a thread pool are pre-existing, it is much more efficient for the operating system to schedule a thread from the pool to run than it is to create one in response to an input. Furthermore, the use of a thread pool allows you to manage the number of threads that are created, so, depending on the platform, you may have just one thread or thousands of threads.

Several libraries are available that implement thread pools, including Intel Threading Building Blocks (TBB), Microsoft Parallel Patterns Library (PPL), and so on. It’s recommended to use such a library for your thread pools instead of writing your own implementation. If you do want to implement a thread pool yourself, it can be done in a similar way to an object pool. Chapter 25 gives an example implementation of an object pool.
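
If you do decide to experiment with your own implementation, the following is a minimal sketch of a thread pool built from the building blocks discussed in this chapter; the ThreadPool name and its members are hypothetical, and this is not a production-quality implementation. A fixed set of worker threads takes function objects from a shared queue that is protected with a mutex and a condition variable, just like the queue in the Logger class:

class ThreadPool
{
    public:
        // Starts the given number of worker threads.
        explicit ThreadPool(size_t numThreads)
        {
            for (size_t i = 0; i < numThreads; ++i) {
                mThreads.emplace_back(&ThreadPool::workerLoop, this);
            }
        }
        // Gracefully shuts down all workers, draining the queue first.
        ~ThreadPool()
        {
            {
                std::unique_lock lock(mMutex);
                mExit = true;
            }
            mCondVar.notify_all();
            for (auto& t : mThreads) { t.join(); }
        }
        // Prevent copy construction and assignment.
        ThreadPool(const ThreadPool& src) = delete;
        ThreadPool& operator=(const ThreadPool& rhs) = delete;
        // Adds a task to the queue and wakes up one worker.
        void post(std::function<void()> task)
        {
            {
                std::unique_lock lock(mMutex);
                mTasks.push(std::move(task));
            }
            mCondVar.notify_one();
        }
    private:
        void workerLoop()
        {
            while (true) {
                std::function<void()> task;
                {
                    std::unique_lock lock(mMutex);
                    // Wait until there is work or shutdown is requested.
                    mCondVar.wait(lock,
                        [this] { return mExit || !mTasks.empty(); });
                    if (mExit && mTasks.empty()) { return; }
                    task = std::move(mTasks.front());
                    mTasks.pop();
                }
                task(); // Execute the task without holding the lock.
            }
        }
        std::mutex mMutex;
        std::condition_variable mCondVar;
        std::queue<std::function<void()>> mTasks;
        std::vector<std::thread> mThreads;
        bool mExit = false;
};

A hypothetical usage, sizing the pool with the hardware_concurrency() hint shown earlier, guarding against a result of 0:

ThreadPool pool{ max<size_t>(1, thread::hardware_concurrency()) };
pool.post([]{ cout << "Task executed by the pool." << endl; });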

THREADING DESIGN AND BEST PRACTICES

This section briefly lists a few best practices related to multithreaded programming.

  • Use parallel Standard Library algorithms: The Standard Library contains a large collection of algorithms. Since C++17, more than 60 of them support parallel execution! Whenever possible, use such parallel algorithms instead of writing your own multithreaded code. See Chapter 18 for details on how to specify parallelization options for algorithms; a short sketch follows this list.
  • Before closing the application, make sure all thread objects are unjoinable: Make sure that either join() or detach() has been called on all thread objects. Destructors of threads that are still joinable will call std::terminate() which abruptly terminates all threads and the application.
  • The best synchronization is no synchronization: Multithreaded programming becomes much easier if you manage to design your different threads in such a way that all threads working on shared data read only from that shared data and never write to it, or only write to parts never read by other threads. In that case, there is no need for any synchronization, and you cannot have problems like data races or deadlocks.
  • Try to use the single-thread ownership pattern: This means that a block of data is owned by no more than one thread at a time. Owning the data means that no other thread is allowed to read from or write to the data. When the thread is finished with the data, the data can be passed off to another thread, which now has sole and complete responsibility/ownership of the data. No synchronization is necessary in this case.
  • Use atomic types and operations when possible: Atomic types and atomic operations make it easier to write data-race and deadlock-free code, because they handle synchronization automatically. If atomic types and operations are not possible in your multithreaded design, and you need shared data, you have to use some synchronization mechanism, such as mutual exclusion, to ensure proper synchronization.
  • Use locks to protect mutable shared data: If you need mutable shared data to which multiple threads can write, and you cannot use atomic types and operations, you have to use a locking mechanism to make sure that reads and writes between different threads are synchronized.
  • Release locks as soon as possible: When you need to protect your shared data with a lock, make sure that you release the lock as soon as possible. While a thread is holding a lock, it is blocking other threads waiting for the same lock, possibly hurting performance.
  • Do not manually acquire multiple locks; instead, use std::lock() or std::try_lock(): If multiple threads need to acquire multiple locks, the locks must be acquired in the same order in all threads to prevent deadlocks. You should use the generic std::lock() or std::try_lock() functions to acquire multiple locks; see the sketch after this list.
  • Use RAII lock objects: Use the lock_guard, unique_lock, shared_lock, or scoped_lock RAII classes to automatically release locks at the right time.
  • Use a multithreading-aware profiler: This helps to find performance bottlenecks in your multithreaded applications, and to find out if your multiple threads are indeed utilizing all available processing power in your system. An example of a multithreading-aware profiler is the profiler in certain editions of Microsoft Visual Studio.
  • Understand the multithreading support features of your debugger: Most debuggers have at least basic support for debugging multithreaded applications. You should be able to get a list of all running threads in your application, and you should be able to switch to any one of those threads to inspect their call stack. You can use this, for example, to inspect deadlocks because you can see exactly what each thread is doing.
  • Use thread pools instead of creating and destroying a lot of threads dynamically: Your performance decreases if you dynamically create and destroy a lot of threads. In that case, it’s better to use a thread pool to reuse existing threads.
  • Use higher-level multithreading libraries: The C++ standard, at this moment, only provides basic building blocks for writing multithreaded code. Using those correctly is not trivial. Where possible, use higher-level multithreading libraries such as Intel Threading Building Blocks (TBB), Microsoft Parallel Patterns Library (PPL), and so on, rather than reinventing the wheel. Multithreaded programming is hard to get right, and is error prone. More often than not, your wheel may not be as round as you think.
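
Two of the preceding guidelines lend themselves to short illustrations. First, here is a minimal sketch of calling a Standard Library algorithm with a parallel execution policy (C++17; requires the <execution> header):

vector<int> values(1'000'000);
// … Fill values …
// Sort using the parallel execution policy; the library is allowed
// to use multiple threads under the hood.
sort(execution::par, begin(values), end(values));

Second, here is a minimal sketch of acquiring multiple locks with scoped_lock, which internally uses a deadlock-avoidance algorithm as if by calling std::lock():

mutex mutex1, mutex2;

void process()
{
    // Acquires both mutexes without risking deadlock and releases
    // them automatically at the end of the scope.
    scoped_lock locks(mutex1, mutex2);
    // … Access data protected by both mutexes …
}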

SUMMARY

This chapter gave a brief overview of multithreaded programming using the standard C++ threading support library. It explained how you can use atomic types and atomic operations to operate on shared data without having to use an explicit synchronization mechanism. In case you cannot use these atomic types and operations, you learned how to use mutual exclusion mechanisms to ensure proper synchronization between different threads that need read/write access to shared data. You also saw how promises and futures represent a simple inter-thread communication channel; you can use futures to more easily get a result back from a thread. The chapter finished with a number of best practices for multithreaded application design.

As mentioned in the introduction, this chapter tried to touch on all the basic multithreading building blocks provided by the Standard Library, but due to space constraints, it cannot go into all the details of multithreaded programming. There are books available that discuss nothing but multithreading. See Appendix B for a few references.
