Simple Synchronization Examples

In this section, we’ll look at two examples that use the synchronization techniques discussed so far to build higher-level synchronization tools.

Barrier

Of all the different types of thread synchronization tools, the barrier is probably the easiest to understand and the least used. When we think of synchronization, our first thought is of a group of threads executing part of an overall task followed by a point at which they must synchronize their results. The barrier is simply a waiting point where all the threads can synch up either to merge results or to move on to the next part of the task. The synchronization techniques that we have discussed up to now were concerned with more complicated issues like preventing race conditions, handling data transfer and delivery, or signaling between threads.

Given its simplicity, why has the barrier not been mentioned up to this point? We have actually used this technique; however, we have used the Thread class itself to synch up the threads. By using the join() method, we have waited for all of the threads to terminate before we merged the results or started new threads for the next task.
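The join()-based approach described above can be sketched as follows. This is only an illustration: the class name JoinPhases, the runPhase() helper, and the two arithmetic phases are assumptions made for this sketch, not code from earlier chapters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntUnaryOperator;

// Hypothetical example: two phases separated by join(), with new threads per phase.
class JoinPhases {
    // Runs one phase: starts one new thread per element, then joins them all.
    static void runPhase(int[] data, IntUnaryOperator step) {
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < data.length; i++) {
            final int slot = i;
            Thread t = new Thread(() -> data[slot] = step.applyAsInt(data[slot]));
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();   // acts as the barrier: wait for this worker to terminate
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", ex);
            }
        }
    }

    static int[] process(int[] input) {
        int[] data = input.clone();
        runPhase(data, x -> x + 1);   // phase one
        // Only the calling thread is running at this point.
        runPhase(data, x -> x * 2);   // phase two, with brand-new threads
        return data;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(process(new int[] {1, 2, 3})));   // [4, 6, 8]
    }
}
```

Note that each phase needs its own set of threads, and nothing a worker computed in phase one survives unless it is stored in shared data.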

There are a few problems with using the join() method. First, we must constantly create and terminate threads. This means that the threads may lose any state that they have stored in their previous operation. Second, if we must always create new threads, logical operations cannot be placed together: since new threads have to be created for each subtask, the code for each subtask must be placed in separate run() methods. It may be easier to code all of the logic as one method—particularly if the subtasks are very small:

public class Barrier {
    private int threads2Wait4;
    private InterruptedException iex;

    public Barrier (int nThreads) {
        threads2Wait4 = nThreads;
    }

    public synchronized int waitForRest()
        throws InterruptedException {
        int threadNum = --threads2Wait4;

        if (iex != null) throw iex;
        if (threads2Wait4 <= 0) {
            notifyAll();
            return threadNum;
        }
        while (threads2Wait4 > 0) {
            if (iex != null) throw iex;
            try {
                wait();
            } catch (InterruptedException ex) {
                iex = ex;
                notifyAll();
            }
        }
        return threadNum;
    }

    public synchronized void freeAll() {
        iex = new InterruptedException("Barrier Released by freeAll");
        notifyAll();
    }
}

Implementation of the Barrier class with the techniques of the previous chapters is straightforward. We simply have each thread that arrives at the barrier call the wait() method, while the last thread to arrive at the barrier has the task of notifying all of the waiting threads. If any of the threads receives an interruption, all of the threads will receive the same interruption. Another method, freeAll(), is also provided to generate an interruption on all of the threads. As an added benefit, a thread number is assigned to the threads to help distinguish the waiting threads. The last thread to reach the barrier is assigned the value of zero, and any threads that reach the barrier after it has been released are assigned a negative value. This indicates an error: if you send five threads through a barrier that is designed to synchronize four threads, the fifth thread will receive this negative value.
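The numbering rule is easy to check with a small experiment. The sketch below repeats a condensed copy of the barrier (renamed NumberedBarrier, without freeAll(), so that it compiles on its own) and sends five threads through a barrier built for four. The class BarrierNumbersDemo and its collectNumbers() helper are assumptions made for this illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

// Condensed copy of the Barrier listing, repeated so this sketch compiles alone.
class NumberedBarrier {
    private int threads2Wait4;
    private InterruptedException iex;

    NumberedBarrier(int nThreads) { threads2Wait4 = nThreads; }

    synchronized int waitForRest() throws InterruptedException {
        int threadNum = --threads2Wait4;
        if (iex != null) throw iex;
        if (threads2Wait4 <= 0) {
            notifyAll();
            return threadNum;
        }
        while (threads2Wait4 > 0) {
            if (iex != null) throw iex;
            try {
                wait();
            } catch (InterruptedException ex) {
                iex = ex;
                notifyAll();
            }
        }
        return threadNum;
    }
}

class BarrierNumbersDemo {
    // Sends nThreads threads through the barrier and collects the returned numbers.
    static Set<Integer> collectNumbers(NumberedBarrier barrier, int nThreads) {
        Set<Integer> seen = new ConcurrentSkipListSet<>();
        Thread[] workers = new Thread[nThreads];
        for (int i = 0; i < nThreads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    seen.add(barrier.waitForRest());
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // Five threads, barrier sized for four: one of them receives -1.
        System.out.println(collectNumbers(new NumberedBarrier(4), 5));   // [-1, 0, 1, 2, 3]
    }
}
```

Which thread receives which number depends on arrival order, but the set of numbers handed out is always the same.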

This implementation of the barrier is also a single-use implementation. Once the barrier reaches the thread limit as specified by the constructor, or an error is generated, the barrier will no longer block any threads. Given the simplicity of this implementation, having single-use instances of this class should not be a problem.

Here’s an example of how we might use the Barrier class:

public class ProcessIt implements Runnable {
    String is[];
    Barrier bpStart, bp1, bp2, bpEnd;

    public ProcessIt (String sources[]) {
        is = sources;
        bpStart = new Barrier(sources.length);
        bp1 = new Barrier(sources.length);
        bp2 = new Barrier(sources.length);
        bpEnd = new Barrier(sources.length);

        for (int i=0; i < is.length; i++) {
            (new Thread(this)).start();
        }
    }

    public void run() {
        try {
            int i = bpStart.waitForRest();
            doPhaseOne(is[i]);
            bp1.waitForRest();
            doPhaseTwo(is[i]);
            bp2.waitForRest();
            doPhaseThree(is[i]);
            bpEnd.waitForRest();
        } catch (InterruptedException ex) {
            // Interrupted or released by freeAll(); abandon the remaining phases.
        }
    }

    public void doPhaseOne(String ps) {
    }

    public void doPhaseTwo(String ps) {
    }

    public void doPhaseThree(String ps) {
    }

    static public void main(String args[]) {
        ProcessIt pi = new ProcessIt(args);
    }
}

Using the Barrier class does not mean that we no longer need to create threads. In the ProcessIt class, we still need to create threads and implement the run() method; however, we only need to implement it once. All three phases of the process are done in the same run() method. The thread simply waits for the other threads before starting the next phase. We are also using a barrier to allow the threads to start at the same time and to assign them a thread number.

The flow of execution of this example is shown in Figure 5-1, which also shows how execution would proceed if we were using the join() method and creating new threads. There are some subtle differences between using a barrier and creating new threads. The first is that the barrier technique should not tax the threading system as much, since it does not destroy and create as many threads; this is sometimes an advantage. The second is that using the Barrier class means the application will never be single threaded because all the threads are always alive.[7] Using the Barrier class is a one-phase process, whereas using the Thread class is a two-phase process that requires that we first join() the threads (that is, become single threaded) and then create new threads.

Figure 5-1. Comparison of the Barrier class with joining threads

The two-phase process, in which a single thread exists between the phases, allows tasks to be executed before the new threads are created. This is not possible when we use the Barrier class, since the only requirement for the threads to be released is a thread count. Complicated situations where setup and cleanup tasks need to be accomplished are a problem. A way to solve this problem is to modify the Barrier class to allow the barrier to execute setup code for the next phase. Unfortunately, this removes one of the advantages of the Barrier class—the ability to have code in a single location. Instead of having the implementation of the phases in separate run() methods, we will have the setup implementation for the different phases also protected by the Barrier class.

Here’s how we code this solution without modifying the Barrier class:

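public class ProcessIt implements Runnable {
    // Fields and constructor as before; bp1b and bp2b are assumed to be
    // additional barriers sized like the others.
    public void run() {
        try {
            int i = bpStart.waitForRest();
            doPhaseOne(is[i]);
            if (bp1.waitForRest() == 0)
                doPhaseTwoSetup();
            bp1b.waitForRest();
            doPhaseTwo(is[i]);
            if (bp2.waitForRest() == 0)
                doPhaseThreeSetup();
            bp2b.waitForRest();
            doPhaseThree(is[i]);
            bpEnd.waitForRest();
        } catch (InterruptedException ex) {
            // Interrupted or released by freeAll(); abandon the remaining phases.
        }
    }
}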

In this example, instead of having a single barrier between the phases, we now have two barriers. This is done to simulate the two-phase requirements of the cleanup and setup code. Effectively, since only one of the threads will execute the code, that portion of the code is single threaded. We are using the thread number returned by the barrier to determine which thread will execute code. In practice, there are many other techniques for choosing this thread, including making the determination once at the beginning or using a thread just to run the setup code. Furthermore, since we are now able to run cleanup and setup code, there is no need to declare all of the barriers at the beginning. Barrier definition and cleanup may be included in the setup code.

Barriers are useful for algorithms that have multiple passes. A compiler, for instance, often has passes that preprocess the code, parse the code, convert the code to intermediate format, optimize that code, and so on. Each of these passes may be implemented with several threads, each of which needs to wait between passes for all the other threads to complete their portions of the phase.
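For comparison, later versions of the Java platform ship a reusable barrier, java.util.concurrent.CyclicBarrier, which can also run an action between passes. The sketch below uses that standard class rather than the Barrier class developed in this section; the class name MultiPassSketch, the runPasses() helper, and the trivial per-pass work are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class MultiPassSketch {
    // Runs `passes` passes over `data` with one thread per element (data must be
    // non-empty) and returns how many times the between-pass action ran.
    static int runPasses(int[] data, int passes) {
        AtomicInteger setupRuns = new AtomicInteger();
        // The action runs once per barrier trip, in the last thread to arrive,
        // before any of the waiting threads is released.
        CyclicBarrier barrier = new CyclicBarrier(data.length, setupRuns::incrementAndGet);
        Thread[] workers = new Thread[data.length];
        for (int i = 0; i < data.length; i++) {
            final int slot = i;
            workers[i] = new Thread(() -> {
                try {
                    for (int p = 0; p < passes; p++) {
                        data[slot] += 1;    // this thread's share of the pass
                        barrier.await();    // wait for the rest; the barrier then resets
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException ex) {
                    // Another thread failed at the barrier; give up on later passes.
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }
        return setupRuns.get();
    }

    public static void main(String[] args) {
        int[] data = new int[4];
        int setups = runPasses(data, 3);
        System.out.println(setups + " " + Arrays.toString(data));   // 3 [3, 3, 3, 3]
    }
}
```

Because the barrier action runs while every other thread is still blocked, it plays the same role as the single-threaded setup code placed between the paired barriers in the example above.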

Condition Variables

Condition variables are a type of synchronization provided by POSIX threads. A condition variable is very similar to the Java environment’s wait and notify mechanism; in most cases it is functionally identical. The four basic operations of a condition variable, wait(), timed_wait(), signal(), and broadcast(), map directly to the methods provided by the Java environment: wait(), wait(long), notify(), and notifyAll(). The implementations are also logically identical. The wait() operation of a condition variable requires that a mutex lock be held. It will release the lock while waiting and reacquire the lock prior to returning to the caller. The signal() function wakes up one thread, whereas the broadcast() function wakes up all waiting threads. POSIX does not strictly require that the mutex be held when these two functions are called, but holding it is the usual practice, and the implementation in this section requires it. The race conditions of a condition variable are solved in the same way as those of the Java environment’s wait and notify mechanism.

There is a subtle difference. In the Java environment, the wait and notify mechanism is highly integrated with its associated lock. This makes the mechanism easier to use than its condition variable counterpart. Calling the wait() and notify() methods from synchronized sections of code is just a natural part of their use. Using condition variables, however, requires that you create a separate mutex lock, store that mutex, and eventually destroy the mutex when it is no longer necessary.

Unfortunately, Java’s convenience comes with a small price. A condition variable and its associated mutex lock are separate synchronization entities. It is possible to use the same mutex with two different condition variables, to use two different mutexes with the same condition variable, or to use any combination of condition variables and mutexes. While the wait and notify mechanism is much easier to use and solves the problem for most cases of signal-based synchronization, it is not capable of assigning any synchronization lock to any notification object. When you need to signal two different notification objects while requiring the same synchronization lock to protect common data, a condition variable is more efficient.

Here is the implementation of the condition variable:

public class CondVar {
    private BusyFlag SyncVar;

    public CondVar() {
        this(new BusyFlag());
    }

    public CondVar(BusyFlag sv) {
        SyncVar = sv;
    }

    public void cvWait() throws InterruptedException {
        cvTimedWait(SyncVar, 0);
    }

    public void cvWait(BusyFlag sv) throws InterruptedException {
        cvTimedWait(sv, 0);
    }

    public void cvTimedWait(int millis) throws InterruptedException {
        cvTimedWait(SyncVar, millis);
    }

    public void cvTimedWait(BusyFlag sv, int millis)
                                throws InterruptedException {
        int i = 0;
        InterruptedException errex = null;

        synchronized (this) {
            // You must own the lock in order to use this method.
            if (sv.getBusyFlagOwner() != Thread.currentThread()) {
                throw new IllegalMonitorStateException(
                                    "current thread not owner");
            }

            // Release the lock (completely).
            while (sv.getBusyFlagOwner() == Thread.currentThread()) {
                i++;
                sv.freeBusyFlag();
            }
        
            // Use wait() method.    
            try {
                if (millis == 0) {
                    wait();
                } else {
                    wait(millis);
                }
            } catch (InterruptedException iex) {
                errex = iex;
            }
        }
     
        // Obtain the lock (return to original state).
        for (; i>0; i--) {
            sv.getBusyFlag();
        }

        if (errex != null) throw errex;
        return;
    }

    public void cvSignal() {
        cvSignal(SyncVar);
    }

    public synchronized void cvSignal(BusyFlag sv) {
        // You must own the lock in order to use this method.
        if (sv.getBusyFlagOwner() != Thread.currentThread()) {
            throw new IllegalMonitorStateException(
                                "current thread not owner");
        }
        notify();
    }

    public void cvBroadcast() {
        cvBroadcast(SyncVar);
    }

    public synchronized void cvBroadcast(BusyFlag sv) {
        // You must own the lock in order to use this method.
        if (sv.getBusyFlagOwner() != Thread.currentThread()) {
            throw new IllegalMonitorStateException(
                                "current thread not owner");
        }
        notifyAll();
    }
}

In this code, we simply reverse engineer the wait and notify mechanism, using the BusyFlag class as the synchronization lock instead of the lock that is bound to the object. Signaling between the threads is done with Java’s wait and notify mechanism. And in order to solve the race condition that exists between the BusyFlag class and the CondVar class, we use the standard synchronization mechanism and the wait and notify mechanism.

The cvWait() mechanism is implemented in three sections. First, we must free the BusyFlag lock. This is done with the freeBusyFlag() method. Since the BusyFlag class is nestable, we must remove all the locks held on the BusyFlag. In order to reacquire the lock at a later time, we have to keep track of the number of locks removed.

The second section simply calls the wait() method to map to Java’s internal system. The final task is to reacquire the locks that were released earlier. The race condition that exists between the first two sections of the cvWait() method is solved by using a synchronized block around both sections. There is no need to extend this synchronization to the third section because the signal has already been received, and if we receive another signal at this point, that signal should just be ignored by this thread (this is analogous to what happens if the wait() method receives two simultaneous notifications).
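The same release-everything-then-restore behavior can be observed with the standard java.util.concurrent.locks classes, which are used here as a stand-in because the BusyFlag class is not repeated in this section. In the sketch below, the class name NestedWaitSketch and its helper method are assumptions made for illustration: a thread acquires a ReentrantLock several times, waits on a Condition, and checks its hold count afterward.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class NestedWaitSketch {
    // Acquires the lock `depth` times, waits for a signal, and returns the
    // hold count observed once the wait has returned.
    static int holdCountAfterWait(int depth) {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] signalled = {false};   // guarded by lock

        for (int i = 0; i < depth; i++) {
            lock.lock();                 // nested acquisition
        }
        Thread signaller = new Thread(() -> {
            lock.lock();                 // succeeds only after the waiter frees every hold
            try {
                signalled[0] = true;
                cond.signal();
            } finally {
                lock.unlock();
            }
        });
        try {
            signaller.start();
            while (!signalled[0]) {
                cond.await();            // releases all holds, reacquires them on return
            }
            return lock.getHoldCount();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            while (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(holdCountAfterWait(3));   // 3
    }
}
```

If the wait released only one level of the nested lock, the signalling thread could never acquire it and the program would hang, which is why cvTimedWait() counts and frees every nested lock before waiting.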

The cvSignal() and cvBroadcast() methods simply map to the notify() and notifyAll() methods. These methods must also be synchronized in order to avoid a race condition with the cvWait() method.

Most of the time, when you use a condition variable instead of Java’s wait and notify mechanism, you will want to set up two signaling channels (i.e., two variables) that are controlled by a single lock. In these cases, you will construct a single BusyFlag and construct all your condition variables using that BusyFlag. You will then use the methods of the CondVar class that do not require a BusyFlag parameter. For more complex cases, when you need to use different locks for the same variable, you will construct the CondVar without a BusyFlag and then pass a BusyFlag to the wait and signal methods.

One common use of the CondVar class is in buffer management. When threads are sending data to a buffer, they must stop when the buffer is full. Other threads that are reading data from the buffer must wait until data is available in the buffer. Here we have a single lock (associated with the buffer) but two conditions: empty and full. Condition variables allow a much cleaner implementation of this situation than does the simple wait and notify technique. We’ll show an example of this later in this chapter.
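As a rough illustration of the one-lock, two-condition pattern, here is a minimal sketch written with the standard java.util.concurrent.locks classes instead of the CondVar and BusyFlag classes, since BusyFlag is not repeated in this section. The class name BoundedBufferSketch, its int-only storage, and the transfer() helper are assumptions made for this sketch.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBufferSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();    // writers wait here
    private final Condition notEmpty = lock.newCondition();   // readers wait here
    private final int[] items;
    private int head, tail, count;

    BoundedBufferSketch(int capacity) { items = new int[capacity]; }

    void put(int value) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) notFull.await();    // buffer full: wait
            items[tail] = value;
            tail = (tail + 1) % items.length;
            count++;
            notEmpty.signal();                                // wake one reader only
        } finally {
            lock.unlock();
        }
    }

    int take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) notEmpty.await();              // buffer empty: wait
            int value = items[head];
            head = (head + 1) % items.length;
            count--;
            notFull.signal();                                 // wake one writer only
            return value;
        } finally {
            lock.unlock();
        }
    }

    // Moves the numbers 1..n through a buffer of the given capacity; returns their sum.
    static long transfer(int capacity, int n) {
        BoundedBufferSketch buf = new BoundedBufferSketch(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) buf.put(i);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += buf.take();
            producer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(2, 100));   // 5050
    }
}
```

With a single wait and notify channel, readers and writers would share one wait set, so the code would need notifyAll() and every awakened thread would have to recheck its own condition. Two conditions on one lock let each side wake only the threads that can make progress.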



[7] It can be argued that since all but one of the threads are waiting, the system is effectively single threaded.
