In this section, we’ll look at two examples that use the synchronization techniques of the previous chapters to build more complex synchronization tools.
Of all the different types of thread synchronization tools, the barrier is probably the easiest to understand and the least used. When we think of synchronization, our first thought is of a group of threads executing part of an overall task followed by a point at which they must synchronize their results. The barrier is simply a waiting point where all the threads can synch up either to merge results or to move on to the next part of the task. The synchronization techniques that we have discussed up to now were concerned with more complicated issues like preventing race conditions, handling data transfer and delivery, or signaling between threads.
Given its simplicity, why has the barrier not been mentioned up to this point? We have actually used this technique; however, we have used the Thread class itself to synch up the threads. By using the join() method, we have waited for all of the threads to terminate before we merged the results or started new threads for the next task.
There are a few problems with using the join() method. First, we must constantly create and terminate threads. This means that the threads may lose any state that they have stored in their previous operation. Second, if we must always create new threads, logical operations cannot be placed together: since new threads have to be created for each subtask, the code for each subtask must be placed in separate run() methods. It may be easier to code all of the logic as one method—particularly if the subtasks are very small:
public class Barrier {
    private int threads2Wait4;
    private InterruptedException iex;

    public Barrier(int nThreads) {
        threads2Wait4 = nThreads;
    }

    public synchronized int waitForRest() throws InterruptedException {
        int threadNum = --threads2Wait4;
        if (iex != null) throw iex;
        if (threads2Wait4 <= 0) {
            // We're the last thread to arrive: release everyone.
            notifyAll();
            return threadNum;
        }
        while (threads2Wait4 > 0) {
            if (iex != null) throw iex;
            try {
                wait();
            } catch (InterruptedException ex) {
                // Propagate the interruption to every waiting thread.
                iex = ex;
                notifyAll();
            }
        }
        return threadNum;
    }

    public synchronized void freeAll() {
        iex = new InterruptedException("Barrier Released by freeAll");
        notifyAll();
    }
}
Implementation of the Barrier class with the techniques of the previous chapters is straightforward. We simply have each thread that arrives at the barrier call the wait() method, while the last thread to arrive at the barrier has the task of notifying all of the waiting threads. If any of the threads receives an interruption, all of the threads will receive the same interruption. Another method, freeAll(), is also provided to generate an interruption on all of the threads. As an added benefit, a thread number is assigned to the threads to help distinguish the waiting threads. The last thread to reach the barrier is assigned the value of zero, and any threads that reach the barrier after it has been released are assigned a negative value. This indicates an error: if you send five threads through a barrier that is designed to synchronize four threads, the fifth thread will receive this negative value.
This implementation of the barrier is also a single-use implementation. Once the barrier reaches the thread limit as specified by the constructor, or an error is generated, the barrier will no longer block any threads. Given the simplicity of this implementation, having single-use instances of this class should not be a problem.
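To see the numbering and single-use behavior concretely, here is a small test sketch of our own (not part of the original listing). A barrier sized for a single thread returns zero to its last (and only) arriver, and a negative value to any caller that arrives after the barrier has been released:

```java
// Sketch: exercising the Barrier numbering from a single thread.
// A condensed copy of the Barrier class from above is included so
// the example stands alone.
class Barrier {
    private int threads2Wait4;
    private InterruptedException iex;

    public Barrier(int nThreads) {
        threads2Wait4 = nThreads;
    }

    public synchronized int waitForRest() throws InterruptedException {
        int threadNum = --threads2Wait4;
        if (iex != null) throw iex;
        if (threads2Wait4 <= 0) {
            notifyAll();
            return threadNum;
        }
        while (threads2Wait4 > 0) {
            if (iex != null) throw iex;
            try {
                wait();
            } catch (InterruptedException ex) {
                iex = ex;
                notifyAll();
            }
        }
        return threadNum;
    }
}

public class BarrierDemo {
    public static void main(String[] args) throws InterruptedException {
        Barrier b = new Barrier(1);
        System.out.println(b.waitForRest());  // last thread: prints 0
        System.out.println(b.waitForRest());  // already released: prints -1
    }
}
```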
Here’s an example of how we might use the Barrier class:
public class ProcessIt implements Runnable {
    String is[];
    Barrier bpStart, bp1, bp2, bpEnd;

    public ProcessIt(String sources[]) {
        is = sources;
        bpStart = new Barrier(sources.length);
        bp1 = new Barrier(sources.length);
        bp2 = new Barrier(sources.length);
        bpEnd = new Barrier(sources.length);
        for (int i = 0; i < is.length; i++) {
            (new Thread(this)).start();
        }
    }

    public void run() {
        try {
            int i = bpStart.waitForRest();
            doPhaseOne(is[i]);
            bp1.waitForRest();
            doPhaseTwo(is[i]);
            bp2.waitForRest();
            doPhaseThree(is[i]);
            bpEnd.waitForRest();
        } catch (InterruptedException ex) {}
    }

    public void doPhaseOne(String ps) { }
    public void doPhaseTwo(String ps) { }
    public void doPhaseThree(String ps) { }

    static public void main(String args[]) {
        ProcessIt pi = new ProcessIt(args);
    }
}
Using the Barrier class does not mean that we no longer need to create threads. In the ProcessIt class, we still need to create threads and implement the run() method; however, we only need to implement it once. All three phases of the process are done in the same run() method. The thread simply waits for the other threads before starting the next phase. We are also using a barrier to allow the threads to start at the same time and to assign them a thread number.
The flow of execution of this example is shown in Figure 5.1, which also shows how execution would proceed if we were using the join() method and creating new threads. There are some subtle differences between using a barrier and creating new threads. The first is that the barrier technique should not task the threading system as much since it does not destroy and create as many threads, which is sometimes an advantage. The second is that using the Barrier class means the application will never be single threaded because all the threads are always alive.[7] Using the Barrier class is a one-phase process, whereas using the Thread class is a two-phase process that requires that we first join() the threads—that is, become single threaded—and then create new threads.
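For comparison, the join()-based two-phase technique looks roughly like this. This is a sketch of our own; the anonymous Runnable objects stand in for the per-phase subtask code:

```java
// Sketch of the two-phase join() technique: run one phase's threads,
// join them all (the application is briefly single threaded), then
// create fresh threads for the next phase.
public class JoinPhases {
    public static void runPhase(Runnable task, int nThreads)
            throws InterruptedException {
        Thread t[] = new Thread[nThreads];
        for (int i = 0; i < nThreads; i++) {
            t[i] = new Thread(task);
            t[i].start();
        }
        for (int i = 0; i < nThreads; i++) {
            t[i].join();          // wait for every thread to terminate
        }
        // At this point only the calling thread is running: setup for
        // the next phase can be done before new threads are created.
    }

    public static void main(String[] args) throws InterruptedException {
        runPhase(new Runnable() {
            public void run() { /* phase one subtask */ }
        }, 4);
        runPhase(new Runnable() {
            public void run() { /* phase two subtask */ }
        }, 4);
    }
}
```

Note that every phase pays the cost of creating and destroying its threads, and any per-thread state is lost between phases, which is exactly the drawback the Barrier class avoids.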
The two-phase process, in which a single thread exists between the phases, allows tasks to be executed before the new threads are created. This is not possible when we use the Barrier class, since the only requirement for the threads to be released is a thread count. Complicated situations in which setup and cleanup tasks need to be accomplished are therefore a problem. One way to solve this problem is to modify the Barrier class to allow the barrier to execute setup code for the next phase. Unfortunately, this removes one of the advantages of the Barrier class—the ability to have code in a single location. Instead of having the implementation of the phases in separate run() methods, we would have the setup implementation for the different phases also protected by the Barrier class.
Here’s how we code this solution without modifying the Barrier class:
public class ProcessIt implements Runnable {
    public void run() {
        try {
            int i = bpStart.waitForRest();
            doPhaseOne(is[i]);
            if (bp1.waitForRest() == 0)
                doPhaseTwoSetup();
            bp1b.waitForRest();
            doPhaseTwo(is[i]);
            if (bp2.waitForRest() == 0)
                doPhaseThreeSetup();
            bp2b.waitForRest();
            doPhaseThree(is[i]);
            bpEnd.waitForRest();
        } catch (InterruptedException ex) {}
    }
}
In this example, instead of having a single barrier between the phases, we now have two barriers. This is done to simulate the two-phase requirements of the cleanup and setup code. Effectively, since only one of the threads will execute the code, that portion of the code is single threaded. We are using the thread number returned by the barrier to determine which thread will execute code. In practice, there are many other techniques for choosing this thread, including making the determination once at the beginning or using a thread just to run the setup code. Furthermore, since we are now able to run cleanup and setup code, there is no need to declare all of the barriers at the beginning. Barrier definition and cleanup may be included in the setup code.
Barriers are useful for algorithms that have multiple passes. A compiler, for instance, often has passes that preprocess the code, parse the code, convert the code to intermediate format, optimize that code, and so on. Each of these passes may be implemented with several threads, each of which needs to wait between passes for all the other threads to complete their portions of the phase.
Condition variables are a type of synchronization provided by POSIX threads. A condition variable is very similar to the Java environment’s wait and notify mechanism—in fact, in most cases it is functionally identical.

The four basic operations of a condition variable—wait(), timed_wait(), signal(), and broadcast()—map directly to the methods provided by the Java environment—wait(), wait(long), notify(), and notifyAll(). The implementations are also logically identical. The wait() operation of a condition variable requires that a mutex lock be held. It will release the lock while waiting and reacquire the lock prior to returning to the caller. The signal() function wakes up one thread, whereas the broadcast() function wakes up all waiting threads. These functions also require that the mutex lock be held during the call. The race conditions of a condition variable are solved in the same way as those of the Java environment’s wait and notify mechanism.
There is a subtle difference. In the Java environment, the wait and notify mechanism is highly integrated with its associated lock. This makes the mechanism easier to use than its condition variable counterpart: calling the wait() and notify() methods from synchronized sections of code is just a natural part of their use. Using condition variables, however, requires that you create a separate mutex lock, store that mutex, and eventually destroy the mutex when it is no longer necessary.
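To illustrate that integration, here is a minimal one-shot gate (a sketch of our own, not part of the POSIX API): the same object serves as both the lock and the notification channel, so there is no separate mutex to create, store, or destroy.

```java
// Minimal sketch of Java's integrated mechanism: the Gate object itself
// is both the lock (via its synchronized methods) and the wait/notify
// channel.
public class Gate {
    private boolean open = false;

    public synchronized void waitForOpen() throws InterruptedException {
        while (!open) {     // guard against spurious or early wakeups
            wait();         // atomically releases this object's lock
        }
    }

    public synchronized void open() {
        open = true;
        notifyAll();        // must hold the lock, just as a condition
                            // variable requires its mutex to be held
    }
}
```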
Unfortunately, Java’s convenience comes with a small price. A condition variable and its associated mutex lock are separate synchronization entities: it is possible to use the same mutex with two different condition variables, to use two different mutexes with the same condition variable, or to use any combination of condition variables and mutexes. While the wait and notify mechanism is much easier to use and is sufficient for most cases of signal-based synchronization, it cannot pair an arbitrary synchronization lock with an arbitrary notification object. When you need to signal two different notification objects while using the same synchronization lock to protect common data, a condition variable is the tool you need.
Here is the implementation of the condition variable:
public class CondVar {
    private BusyFlag SyncVar;

    public CondVar() {
        this(new BusyFlag());
    }

    public CondVar(BusyFlag sv) {
        SyncVar = sv;
    }

    public void cvWait() throws InterruptedException {
        cvTimedWait(SyncVar, 0);
    }

    public void cvWait(BusyFlag sv) throws InterruptedException {
        cvTimedWait(sv, 0);
    }

    public void cvTimedWait(int millis) throws InterruptedException {
        cvTimedWait(SyncVar, millis);
    }

    public void cvTimedWait(BusyFlag sv, int millis)
                                    throws InterruptedException {
        int i = 0;
        InterruptedException errex = null;

        synchronized (this) {
            // You must own the lock in order to use this method.
            if (sv.getBusyFlagOwner() != Thread.currentThread()) {
                throw new IllegalMonitorStateException(
                                "current thread not owner");
            }

            // Release the lock (completely).
            while (sv.getBusyFlagOwner() == Thread.currentThread()) {
                i++;
                sv.freeBusyFlag();
            }

            // Use wait() method.
            try {
                if (millis == 0) {
                    wait();
                } else {
                    wait(millis);
                }
            } catch (InterruptedException iex) {
                errex = iex;
            }
        }

        // Obtain the lock (return to original state).
        for (; i > 0; i--) {
            sv.getBusyFlag();
        }
        if (errex != null) throw errex;
        return;
    }

    public void cvSignal() {
        cvSignal(SyncVar);
    }

    public synchronized void cvSignal(BusyFlag sv) {
        // You must own the lock in order to use this method.
        if (sv.getBusyFlagOwner() != Thread.currentThread()) {
            throw new IllegalMonitorStateException(
                            "current thread not owner");
        }
        notify();
    }

    public void cvBroadcast() {
        cvBroadcast(SyncVar);
    }

    public synchronized void cvBroadcast(BusyFlag sv) {
        // You must own the lock in order to use this method.
        if (sv.getBusyFlagOwner() != Thread.currentThread()) {
            throw new IllegalMonitorStateException(
                            "current thread not owner");
        }
        notifyAll();
    }
}
In this code, we simply reverse engineer the wait and notify mechanism, using the BusyFlag class as the synchronization lock instead of the lock that is bound to the object. Signaling between the threads is done with Java’s wait and notify mechanism. And in order to solve the race condition that exists between the BusyFlag class and the CondVar class, we use the standard synchronization mechanism and the wait and notify mechanism.
The cvWait() mechanism is implemented in three sections. First, we must free the BusyFlag lock. This is done with the freeBusyFlag() method. Since the BusyFlag class is nestable, we must remove all the locks on the BusyFlag; in order to reacquire the lock at a later time, we have to keep track of the number of locks removed. The second section simply calls the wait() method to map to Java’s internal system. The final task is to reacquire the locks that were released earlier. The race condition that exists between the first two sections of the cvWait() method is solved by using a synchronized block around both sections. There is no need to extend this synchronization to the third section because the signal has already been received; if we receive another signal at this point, that signal should just be ignored by this thread (this is analogous to what happens if the wait() method receives two simultaneous notifications).
The cvSignal() and cvBroadcast() methods simply map to the notify() and notifyAll() methods. These methods must also be synchronized in order to avoid a race condition with the cvWait() method.
Most of the time, when you use a condition variable instead of Java’s wait and notify mechanism, you will want to set up two signaling channels (i.e., two variables) that are controlled by a single lock. In these cases, you will construct a single BusyFlag and construct all your condition variables using that BusyFlag. You will then use the methods of the CondVar class that do not require a BusyFlag parameter. For more complex cases, when you need to use different locks for the same variable, you will construct the CondVar without a BusyFlag and then pass a BusyFlag to the wait and signal methods.
One common use of the CondVar class is in buffer management. When threads are sending data to a buffer, they must stop when the buffer is full. Other threads that are reading data from the buffer must wait until data is available in the buffer. Here we have a single lock (associated with the buffer) but two conditions: empty and full. Condition variables allow a much cleaner implementation of this situation than does the simple wait and notify technique. We’ll show an example of this later in this chapter.
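Here is a sketch of that buffer pattern. Note that it uses the ReentrantLock and Condition classes from the java.util.concurrent.locks package (a later, standard analogue of our BusyFlag and CondVar classes) rather than the classes developed in this chapter; the structure is the point: one lock guards the buffer, while "not full" and "not empty" are two separate signaling channels on that same lock.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// One-lock, two-conditions bounded buffer. Producers wait on notFull,
// consumers wait on notEmpty; both channels share the single lock that
// protects the buffer data.
public class BoundedBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull  = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Object[] items;
    private int putIdx, takeIdx, count;

    public BoundedBuffer(int size) {
        items = new Object[size];
    }

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();           // wait on the "not full" channel
            items[putIdx] = x;
            putIdx = (putIdx + 1) % items.length;
            count++;
            notEmpty.signal();             // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();          // wait on the "not empty" channel
            Object x = items[takeIdx];
            takeIdx = (takeIdx + 1) % items.length;
            count--;
            notFull.signal();              // wake one waiting producer
            return x;
        } finally {
            lock.unlock();
        }
    }
}
```

With the single wait and notify mechanism, a notifyAll() on the buffer would wake producers and consumers alike; with two channels, each signal wakes only the threads that can actually make progress.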
[7] It can be argued that since all but one of the threads is waiting, the system is effectively single threaded.