The AsyncReadSocket class we previously developed had a few problems:
This class is specific to the network socket. We could also use an asynchronous I/O class for files, pipes, or any data stream. Ideally, we should have the ability to allow any data source to be asynchronous, not just network sockets.
There is already a class structure for input from a stream. The top of this hierarchy is the InputStream class. Ideally, we should subclass the InputStream class. We can also benefit from the nested support of the FilterInputStream class and its subclasses.
Unlike the TCPServer class, the AsyncReadSocket class does not do a good job of hiding the threading details.
Do we need to develop a new class for this? Doesn’t the InputStream class have a method that supports asynchronous I/O?
Although barely mentioned during the development of the AsyncReadSocket class, the InputStream class has the available() method, which returns the number of bytes that can be read from the stream without blocking. Although this method sounds useful, it does not always suit our purposes, because it returns the number of bytes that have already been read and are available for processing. On some operating systems, this may include data that has been received at the machine and is being held by the operating system, but that’s not universally true (though it is true on most common operating systems, including those from Microsoft, Apple, Sun, and other Unix vendors). Hence, just because the available() method returns 0 does not indicate that a call to the read() method will block. Since avoiding calls that block is our primary purpose in developing this class, the available() method may not be suitable for our purpose.
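The gap between what available() reports and what read() will do can be seen in a small sketch. The OpaqueStream class below is hypothetical and exists only for illustration: it holds data in memory but does not override available(), so it inherits the default from InputStream, which always returns 0.

```java
import java.io.IOException;
import java.io.InputStream;

public class AvailableDemo {
    // Hypothetical stream for illustration: it has data ready in memory but
    // inherits InputStream.available(), which always returns 0.
    public static class OpaqueStream extends InputStream {
        private final byte[] data;
        private int pos = 0;

        public OpaqueStream(byte[] data) {
            this.data = data;
        }

        @Override
        public int read() {
            // Return the next byte as 0..255, or -1 at end of data.
            return pos < data.length ? (data[pos++] & 0xff) : -1;
        }
    }

    // Returns { value reported by available(), first byte returned by read() }.
    public static int[] probe(byte[] data) throws IOException {
        try (InputStream in = new OpaqueStream(data)) {
            int reported = in.available();
            int first = in.read();   // Returns at once despite reported == 0.
            return new int[] { reported, first };
        }
    }

    public static void main(String[] args) throws IOException {
        int[] r = probe(new byte[] { 'A' });
        System.out.println("available() = " + r[0] + ", read() = " + r[1]);
    }
}
```

Here available() reports 0 even though read() returns a byte immediately, so a caller that polls available() before reading would never consume the data.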
In addition, we can usually benefit somewhat by buffering data within our program rather than relying on the data being buffered by the operating system. If we read this data from the operating system into our program while the program is otherwise unoccupied (when the user is thinking, for example), then the data will be available slightly faster to the program when it attempts to read the input stream, since the data has already been moved from the operating system into the program.
So what we need is an InputStream class whose available() method reports the correct number of bytes that can actually be read without blocking, and that also buffers data within the program itself. This new class, the AsyncInputStream class, will be implemented just like our AsyncReadSocket class. It creates another thread that reads from the input stream. Since reading is done in another thread, the read() call made by that thread is free to block if data is not available. Users of our AsyncInputStream class simply believe that we are an InputStream object. As shown in Figure 5.3, we are actually deriving from the FilterInputStream class, which is the base class for InputStream classes that contain InputStream instances.
The fact that we start another thread to read the data is an implementation detail. Before we examine the policies and other details of our AsyncInputStream class, let’s examine the AsyncInputStream class itself:
    import java.net.*;
    import java.io.*;

    public class AsyncInputStream extends FilterInputStream
                                  implements Runnable {
        private Thread runner;                  // Async reader thread
        private volatile byte result[];         // Buffer
        private volatile int reslen;            // Buffer length
        private volatile boolean EOF;           // End-of-file indicator
        private volatile IOException IOError;   // I/O exceptions

        BusyFlag lock;                          // Data lock
        CondVar empty, full;                    // Signal variables

        protected AsyncInputStream(InputStream in, int bufsize) {
            super(in);

            lock = new BusyFlag();              // Allocate sync variables.
            empty = new CondVar(lock);
            full = new CondVar(lock);

            result = new byte[bufsize];         // Allocate storage area
            reslen = 0;                         // and initialize variables.
            EOF = false;
            IOError = null;
            runner = new Thread(this);          // Start reader thread.
            runner.start();
        }

        protected AsyncInputStream(InputStream in) {
            this(in, 1024);
        }

        public int read() throws IOException {
            try {
                lock.getBusyFlag();
                while (reslen == 0) {
                    try {
                        if (EOF) return -1;
                        if (IOError != null) throw IOError;
                        empty.cvWait();
                    } catch (InterruptedException e) {}
                }
                // Mask to 0..255 so that a byte of 0xff is not
                // mistaken for the EOF value of -1.
                return getChar() & 0xff;
            } finally {
                lock.freeBusyFlag();
            }
        }

        public int read(byte b[]) throws IOException {
            return read(b, 0, b.length);
        }

        public int read(byte b[], int off, int len) throws IOException {
            try {
                lock.getBusyFlag();
                while (reslen == 0) {
                    try {
                        if (EOF) return -1;
                        if (IOError != null) throw IOError;
                        empty.cvWait();
                    } catch (InterruptedException e) {}
                }

                int sizeread = Math.min(reslen, len);
                byte c[] = getChars(sizeread);
                System.arraycopy(c, 0, b, off, sizeread);
                return sizeread;
            } finally {
                lock.freeBusyFlag();
            }
        }

        public long skip(long n) throws IOException {
            try {
                lock.getBusyFlag();
                // Compare as longs so that a large n is not truncated.
                int sizeskip = (int) Math.min((long) reslen, n);
                if (sizeskip > 0) {
                    getChars(sizeskip);         // Discard the skipped bytes.
                }
                return (long) sizeskip;
            } finally {
                lock.freeBusyFlag();
            }
        }

        public int available() throws IOException {
            return reslen;
        }

        public void close() throws IOException {
            try {
                lock.getBusyFlag();
                reslen = 0;                     // Clear buffer.
                EOF = true;                     // Mark end of file.
                empty.cvBroadcast();            // Alert all threads.
                full.cvBroadcast();
            } finally {
                lock.freeBusyFlag();
            }
        }

        public void mark(int readlimit) {
        }

        public void reset() throws IOException {
        }

        public boolean markSupported() {
            return false;
        }

        public void run() {
            try {
                while (true) {
                    int c = in.read();
                    try {
                        lock.getBusyFlag();
                        if ((c == -1) || (EOF)) {
                            EOF = true;         // Mark end of file.
                            in.close();         // Close input source.
                            return;             // End I/O thread.
                        } else {
                            putChar((byte)c);   // Store the byte read.
                        }
                        if (EOF) {
                            in.close();         // Close input source.
                            return;             // End I/O thread.
                        }
                    } finally {
                        lock.freeBusyFlag();
                    }
                }
            } catch (IOException e) {
                IOError = e;                    // Store exception.
                return;
            } finally {
                try {
                    lock.getBusyFlag();
                    empty.cvBroadcast();        // Alert all threads.
                } finally {
                    lock.freeBusyFlag();
                }
            }
        }

        private void putChar(byte c) {
            try {
                lock.getBusyFlag();
                while ((reslen == result.length) && (!EOF)) {
                    try {
                        full.cvWait();
                    } catch (InterruptedException ie) {}
                }
                if (!EOF) {
                    result[reslen++] = c;
                    empty.cvSignal();
                }
            } finally {
                lock.freeBusyFlag();
            }
        }

        private byte getChar() {
            try {
                lock.getBusyFlag();
                byte c = result[0];
                System.arraycopy(result, 1, result, 0, --reslen);
                full.cvSignal();
                return c;
            } finally {
                lock.freeBusyFlag();
            }
        }

        private byte[] getChars(int chars) {
            try {
                lock.getBusyFlag();
                byte c[] = new byte[chars];
                System.arraycopy(result, 0, c, 0, chars);
                reslen -= chars;
                System.arraycopy(result, chars, result, 0, reslen);
                full.cvSignal();
                return c;
            } finally {
                lock.freeBusyFlag();
            }
        }
    }
For our purposes, we aren’t interested in the details of threading the I/O itself; there is no threading code in this class that we have not already seen in the AsyncReadSocket class. The new thread simply does a blocking read on the InputStream, and methods are provided so that the original thread can get the data in a nonblocking manner. The InputStream aspect of this class is interesting, but learning the Java data input system is not within the scope of this book.
Why is the discussion of this class important? And how is this class different from the AsyncReadSocket class? Although this class accomplishes the asynchronous read in the same fashion as the AsyncReadSocket class, it is also a FilterInputStream, and it is the relationship between the threaded I/O and the InputStream class that we are concerned with here. Since this class must behave as an InputStream, we cannot design the behavior of the class as optimally as if all we had been concerned with was communicating with the I/O thread. This is the sort of real-world trade-off that must be made when implementing threaded classes.
In order for the class to function correctly, we need to use practically every synchronization technique that we know. Let’s start with a look at the instance variables and constructors of the AsyncInputStream class:
    public class AsyncInputStream extends FilterInputStream
                                  implements Runnable {
        private Thread runner;                  // Async reader thread
        private volatile byte result[];         // Buffer
        private volatile int reslen;            // Buffer length
        private volatile boolean EOF;           // End-of-file indicator
        private volatile IOException IOError;   // I/O exceptions

        BusyFlag lock;                          // Data lock
        CondVar empty, full;                    // Signal variables

        protected AsyncInputStream(InputStream in, int bufsize) {
            super(in);

            lock = new BusyFlag();              // Allocate sync variables.
            empty = new CondVar(lock);
            full = new CondVar(lock);

            result = new byte[bufsize];         // Allocate storage area
            reslen = 0;                         // and initialize variables.
            EOF = false;
            IOError = null;
            runner = new Thread(this);          // Start reader thread.
            runner.start();
        }

        protected AsyncInputStream(InputStream in) {
            this(in, 1024);
        }
The first three instance variables, runner, result, and reslen, are the important data of the class. runner is the reference to the I/O thread that is started by this class, and result and reslen are the data storage and the length that is being passed back from the I/O thread. This is an important difference from the AsyncReadSocket class, which did not support the concept of data size: the getResult() method of the AsyncReadSocket class did not allow the caller to specify the amount to read. Since an InputStream class can read any amount of data, we must keep track of available data in the buffers.
The EOF and IOError instance variables are also used for communication. In order to behave as an InputStream class, we must report end-of-file (EOF) conditions and throw exceptions on I/O errors. These EOF conditions and I/O exceptions are generated from the InputStream object contained in the AsyncInputStream class. We must save the EOF condition and catch the I/O exception in the I/O thread, and later indicate the EOF condition or throw the exception in the calling thread. If the AsyncInputStream class did not have to behave like an InputStream class, we could have designed a simpler error reporting system.
Data in the result buffer is protected by the lock instance variable, and we have associated two condition variables with the lock: the empty and full condition variables. This is an instance of the buffer management that we discussed with the CondVar class: we can have threads waiting on a single lock for two different conditions.
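The idea of one lock guarding a buffer with two separate wait conditions can be sketched with the standard java.util.concurrent.locks classes. This is an illustration only, not the BusyFlag and CondVar code used in this chapter: ReentrantLock and Condition stand in for those classes here, and the TwoConditionBuffer name and its methods are hypothetical.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TwoConditionBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // Readers wait here.
    private final Condition notFull = lock.newCondition();  // Writers wait here.
    private final byte[] buf;
    private int len = 0;

    public TwoConditionBuffer(int capacity) {
        buf = new byte[capacity];
    }

    public void put(byte b) throws InterruptedException {
        lock.lock();
        try {
            while (len == buf.length) {
                notFull.await();        // Buffer full: wait for a reader.
            }
            buf[len++] = b;
            notEmpty.signal();          // Wake one waiting reader only.
        } finally {
            lock.unlock();
        }
    }

    public byte take() throws InterruptedException {
        lock.lock();
        try {
            while (len == 0) {
                notEmpty.await();       // Buffer empty: wait for a writer.
            }
            byte b = buf[0];
            System.arraycopy(buf, 1, buf, 0, --len);  // Shift remaining bytes.
            notFull.signal();           // Wake one waiting writer only.
            return b;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return len;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TwoConditionBuffer buffer = new TwoConditionBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) buffer.put((byte) i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        for (int i = 0; i < 5; i++) System.out.println(buffer.take());
        producer.join();
    }
}
```

Because readers and writers wait on different conditions, a put() wakes only a reader and a take() wakes only a writer, even though both share the same lock.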
The first constructor of the AsyncInputStream class is straightforward. First, we allocate and initialize the buffer and variables we will use to communicate with the I/O thread. Second, we instantiate and start() the I/O thread. The other constructor has the same signature as the constructor of the FilterInputStream class, from which we inherit, and uses a default buffer size. By providing this constructor, we are behaving like all FilterInputStreams.
Let’s start to look into the details of how data is passed back to the user:
    public int read() throws IOException {
        try {
            lock.getBusyFlag();
            while (reslen == 0) {
                try {
                    if (EOF) return -1;
                    if (IOError != null) throw IOError;
                    empty.cvWait();
                } catch (InterruptedException e) {}
            }
            // Mask to 0..255 so that a byte of 0xff is not
            // mistaken for the EOF value of -1.
            return getChar() & 0xff;
        } finally {
            lock.freeBusyFlag();
        }
    }

    private byte getChar() {
        try {
            lock.getBusyFlag();
            byte c = result[0];
            System.arraycopy(result, 1, result, 0, --reslen);
            full.cvSignal();
            return c;
        } finally {
            lock.freeBusyFlag();
        }
    }
In the InputStream class, the read() method reads a single byte from the input data stream. If an EOF is detected or an IOException is caught by the I/O thread, it is recorded in the EOF or IOError instance variable, respectively. The read() method returns -1 to report an EOF or throws the IOException on behalf of the I/O thread.
Also, we check for the EOF and the I/O exception only when there is no more data in the buffer. Since the I/O thread is reading ahead, we must delay the EOF indicator or the exception in the read() method until the user has drained the input from the buffer: the user should see the EOF or exception at the same point in the data at which it actually occurred. The I/O thread stops reading when it receives either an EOF or an IOException, so we can safely assume all data in the buffer occurred before either condition happened.
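The ordering rule described above (buffered data first, then EOF or the saved exception) can be sketched without any threads. The DeferredErrorBuffer class and its method names are hypothetical and serve only to illustrate the rule; unlike a real stream, this sketch does not block when nothing is pending.

```java
import java.io.IOException;
import java.util.ArrayDeque;

public class DeferredErrorBuffer {
    private final ArrayDeque<Byte> data = new ArrayDeque<>();
    private boolean eof = false;
    private IOException error = null;

    // These three methods play the role of the I/O thread in this sketch.
    public synchronized void store(byte b) {
        data.addLast(b);
    }

    public synchronized void markEof() {
        eof = true;
    }

    public synchronized void markError(IOException e) {
        error = e;
    }

    // Plays the role of read(): buffered bytes are always delivered before
    // a saved EOF or exception is reported.
    public synchronized int read() throws IOException {
        if (!data.isEmpty()) {
            return data.removeFirst() & 0xff;   // Deliver as 0..255.
        }
        if (eof) return -1;
        if (error != null) throw error;
        // Assumption of this sketch: fail instead of blocking.
        throw new IllegalStateException("no data yet; a real stream would block");
    }

    public static void main(String[] args) {
        DeferredErrorBuffer buffer = new DeferredErrorBuffer();
        buffer.store((byte) 'h');
        buffer.store((byte) 'i');
        buffer.markError(new IOException("connection reset"));
        try {
            System.out.println((char) buffer.read());
            System.out.println((char) buffer.read());
            buffer.read();                      // Only now is the error thrown.
        } catch (IOException e) {
            System.out.println("error after data: " + e.getMessage());
        }
    }
}
```

The caller sees both buffered bytes before the exception, which matches the point in the data stream at which the error actually occurred.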
Finally, in order to protect the result data buffer and the reslen length indicator, we use the lock BusyFlag. The getChar() method, which returns the next character, also uses this BusyFlag. You might ask why we are using only a single lock to protect four different instance variables. This is a design issue; the result and reslen variables are related, and it is unlikely that we would be examining or changing one without the other. The EOF and IOError variables are accessed only once during the lifetime of the I/O thread. It is wasteful to create a new BusyFlag for this purpose when a suitable lock is already available.
What happens when we do not have data available when a read is requested?
The read() method must behave correctly if the application calls the method when data is not available. This means that the read() method must block under such conditions. In other words, the read() method must do what it was designed to avoid in the first place:
    public int read() throws IOException {
        try {
            lock.getBusyFlag();
            while (reslen == 0) {
                try {
                    if (EOF) return -1;
                    if (IOError != null) throw IOError;
                    empty.cvWait();
                } catch (InterruptedException e) {}
            }
            // Mask to 0..255 so that a byte of 0xff is not
            // mistaken for the EOF value of -1.
            return getChar() & 0xff;
        } finally {
            lock.freeBusyFlag();
        }
    }

    private void putChar(byte c) {
        try {
            lock.getBusyFlag();
            while ((reslen == result.length) && (!EOF)) {
                try {
                    full.cvWait();
                } catch (InterruptedException ie) {}
            }
            if (!EOF) {
                result[reslen++] = c;
                empty.cvSignal();
            }
        } finally {
            lock.freeBusyFlag();
        }
    }
Obviously, the read() method cannot block by reading from the InputStream; the InputStream is under the control of the I/O thread and should not be accessed directly by the read() method. In order to simulate this blocking, we use the empty condition variable.
The read() method simply waits for more data to arrive. When data arrives in the I/O thread, a signal is generated when the data is placed in the buffer. This is done by calling the cvSignal() method in the putChar() method. As can be seen by examining the run() method, the putChar() method is called by the I/O thread to place the data it receives in the data buffer:
    public void run() {
        try {
            while (true) {
                int c = in.read();
                try {
                    lock.getBusyFlag();
                    if ((c == -1) || (EOF)) {
                        EOF = true;         // Mark end of file.
                        in.close();         // Close input source.
                        return;             // End I/O thread.
                    } else {
                        putChar((byte)c);   // Store the byte read.
                    }
                    if (EOF) {
                        in.close();         // Close input source.
                        return;             // End I/O thread.
                    }
                } finally {
                    lock.freeBusyFlag();
                }
            }
        } catch (IOException e) {
            IOError = e;                    // Store exception.
            return;
        } finally {
            try {
                lock.getBusyFlag();
                empty.cvBroadcast();        // Alert all threads.
            } finally {
                lock.freeBusyFlag();
            }
        }
    }
The code for the I/O thread is similar to the code in our AsyncReadSocket class. We simply read from the InputStream, blocking if necessary. When we receive data, we place it in the buffer using the putChar() method. Additionally, if we receive an EOF indicator or catch an IOException, we place that information into the appropriate instance variables. To allow all of these actions to take place safely with the other threads, we grab the same lock that is used by the read thread: the lock BusyFlag.
What will happen to all the blocking read threads when an EOF or IOException condition occurs?
As we mentioned, we are using a condition variable to cause the read() method to behave in a blocking manner. However, when an EOF or IOException condition occurs, there can be no further notifications, since no more data will be arriving. To solve this, we must use the cvBroadcast() method when these conditions occur. The threads can just wake up in turn, taking the available data from the buffer:
    public void run() {
        try {
            while (true) {
                int c = in.read();
                try {
                    lock.getBusyFlag();
                    if ((c == -1) || (EOF)) {
                        EOF = true;         // Mark end of file.
                        in.close();         // Close input source.
                        return;             // End I/O thread.
                    } else {
                        putChar((byte)c);   // Store the byte read.
                    }
                    if (EOF) {
                        in.close();         // Close input source.
                        return;             // End I/O thread.
                    }
                } finally {
                    lock.freeBusyFlag();
                }
            }
        } catch (IOException e) {
            IOError = e;                    // Store exception.
            return;
        } finally {
            try {
                lock.getBusyFlag();
                empty.cvBroadcast();        // Alert all threads.
            } finally {
                lock.freeBusyFlag();
            }
        }
    }

    public void close() throws IOException {
        try {
            lock.getBusyFlag();
            reslen = 0;                     // Clear buffer.
            EOF = true;                     // Mark end of file.
            empty.cvBroadcast();            // Alert all threads.
            full.cvBroadcast();
        } finally {
            lock.freeBusyFlag();
        }
    }
When no more data is available from the buffer, the remaining threads reading the InputStream return the EOF or IOError condition from their read() methods. We also do not have to worry about future read() method calls; they simply return the EOF or IOError condition that occurred.
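Why a broadcast is needed at end of file can be shown with several readers blocked on the same condition. The sketch below is an illustration under assumptions: it uses the standard ReentrantLock and Condition classes in place of BusyFlag and CondVar, the BroadcastOnEof name is hypothetical, and the stream never holds data, so every reader simply blocks until EOF is marked.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BroadcastOnEof {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition empty = lock.newCondition();
    private boolean eof = false;

    // This sketch never holds data, so a reader blocks until EOF is marked
    // and then reports it with -1.
    public int read() throws InterruptedException {
        lock.lock();
        try {
            while (!eof) {
                empty.await();
            }
            return -1;
        } finally {
            lock.unlock();
        }
    }

    public void markEof() {
        lock.lock();
        try {
            eof = true;
            // No further data will arrive, so there will be no later signal.
            // Wake every waiting reader now; signal() would wake only one.
            empty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Starts the given number of readers, marks EOF, and returns how many
    // readers observed it.
    public static int demo(int readers) throws InterruptedException {
        BroadcastOnEof stream = new BroadcastOnEof();
        AtomicInteger sawEof = new AtomicInteger();
        Thread[] threads = new Thread[readers];
        for (int i = 0; i < readers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    if (stream.read() == -1) sawEof.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        Thread.sleep(100);      // Best effort: give the readers time to block.
        stream.markEof();
        for (Thread t : threads) t.join();
        return sawEof.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo(3) + " readers saw EOF");
    }
}
```

A reader that arrives after EOF has been marked finds the flag already set and returns without waiting, which mirrors the behavior of later read() calls described above.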
The implementation of the available() method that works as desired (the method that was the reason for our AsyncInputStream class) is actually anticlimactic:
public int available() throws IOException { return reslen; }
We simply return the number of bytes we have available in the buffer. Since the I/O thread is actually reading the InputStream, blocking if necessary, we know that there are usually no more bytes sitting on the network that are unaccounted for. There is, however, a maximum amount of data that is held by the buffer (which is user configurable), so that it’s possible that the buffer could be full and data could be held in the network buffers as well.
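A caller can rely on an accurate available() to read without blocking. The following is a minimal sketch of that polling pattern; PollingReader and drainAvailable() are hypothetical names, and a ByteArrayInputStream stands in for an AsyncInputStream because its available() also reports exactly the bytes that are buffered.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

public class PollingReader {
    // Reads only what the stream reports as available, so the call does not
    // block as long as available() is accurate.
    public static byte[] drainAvailable(InputStream in) throws IOException {
        int n = in.available();
        if (n <= 0) {
            return new byte[0];             // Nothing buffered: do other work.
        }
        byte[] chunk = new byte[n];
        int got = in.read(chunk, 0, n);
        return got <= 0 ? new byte[0] : Arrays.copyOf(chunk, got);
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[] { 1, 2, 3 });
        System.out.println(drainAvailable(in).length + " bytes read");
        System.out.println(drainAvailable(in).length + " bytes read");
    }
}
```

An empty result here means only that nothing is buffered yet; it does not distinguish "no data so far" from end of file, so a real caller would still need an occasional read() to detect EOF.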
Finally, we made three additional design decisions during the development of the AsyncInputStream class. While these decisions are important to the AsyncInputStream class, they will not be examined here, because they don’t pertain to our discussion of threading issues. But to be complete, here is a brief overview:
The read(byte[]) method, just like the read() method, blocks if data is not available. However, if data is available, but not enough to fill the byte array, the read(byte[]) method simply reads fewer bytes than requested, returning the number of bytes actually read. We have chosen this implementation due to the design of the AsyncInputStream class: it works asynchronously, and this implementation best fulfills that design spirit.
The skip() method skips only the number of bytes possible without blocking. This means that if the skip() method is called to skip more bytes than are available, it simply skips what is available and returns the number of bytes actually skipped. Again, this implementation best fulfills the design spirit of the AsyncInputStream class.
The mark and reset feature of the AsyncInputStream class is not supported, even if this feature is supported in the InputStream class that we contain. There’s no real reason why an asynchronous stream would support this, and if users really require this feature, they can always instantiate a BufferedInputStream object containing our AsyncInputStream object.
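Because read(byte[]) may return fewer bytes than requested, a caller that needs an exact count has to loop. The sketch below shows one way to do that; the Trickle stream is a hypothetical stand-in that hands back at most three bytes per call, mimicking a stream that returns only what is currently buffered.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ShortReads {
    // Hypothetical stream that returns at most 3 bytes per call.
    public static class Trickle extends FilterInputStream {
        public Trickle(InputStream in) {
            super(in);
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, 3));
        }
    }

    // Keeps reading until the array is full, tolerating short reads.
    public static void readFully(InputStream in, byte[] b) throws IOException {
        int off = 0;
        while (off < b.length) {
            int n = in.read(b, off, b.length - off);
            if (n < 0) {
                throw new EOFException("stream ended after " + off + " bytes");
            }
            off += n;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] source = { 1, 2, 3, 4, 5, 6, 7, 8 };
        byte[] target = new byte[8];

        InputStream once = new Trickle(new ByteArrayInputStream(source));
        System.out.println("single read returned " + once.read(target));

        InputStream looped = new Trickle(new ByteArrayInputStream(source));
        readFully(looped, target);
        System.out.println("loop filled all " + target.length + " bytes");
    }
}
```

The standard java.io.DataInputStream class provides a readFully() method with the same looping behavior, so wrapping the stream in a DataInputStream is an alternative to writing the loop by hand.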
Why did we use two condition variables rather than the wait and notify mechanism?
We did this for efficiency. Here is a case where we have a single data source (the result buffer) that can have two conditions: it can be empty, in which case threads attempting to read must wait for data, or it can be full, in which case threads attempting to store data into the buffer must wait for it to be partially emptied. If we had relied on the wait and notify mechanism, then whenever either condition occurred we would have had to call the notifyAll() method, which would have woken up too many threads. This would have worked, since all threads recheck the condition when they wake up, but it is not as efficient as using the condition variables.
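For contrast, here is a minimal sketch of the alternative that was rejected: the same kind of bounded buffer written with only the wait and notify mechanism. The MonitorBuffer class is hypothetical and is not part of the AsyncInputStream code.

```java
public class MonitorBuffer {
    private final byte[] buf;
    private int len = 0;

    public MonitorBuffer(int capacity) {
        buf = new byte[capacity];
    }

    public synchronized void put(byte b) throws InterruptedException {
        while (len == buf.length) {
            wait();                 // Buffer full: wait on the one monitor.
        }
        buf[len++] = b;
        // Readers and writers share a single wait set, so every waiter must
        // be woken; those whose condition still fails go back to waiting.
        notifyAll();
    }

    public synchronized byte take() throws InterruptedException {
        while (len == 0) {
            wait();                 // Buffer empty: wait on the same monitor.
        }
        byte b = buf[0];
        System.arraycopy(buf, 1, buf, 0, --len);  // Shift remaining bytes.
        notifyAll();
        return b;
    }

    public synchronized int size() {
        return len;
    }

    public static void main(String[] args) throws InterruptedException {
        MonitorBuffer buffer = new MonitorBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) buffer.put((byte) i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        for (int i = 0; i < 5; i++) System.out.println(buffer.take());
        producer.join();
    }
}
```

The code is correct because each waiter rechecks its condition in a loop, but a put() can wake blocked writers that cannot proceed, and a take() can wake blocked readers, which is the extra work that separate condition variables avoid.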
Instances of the AsyncInputStream class behave like any InputStream object. They can be used in cases where an InputStream object is normally used with no changes. While the AsyncInputStream class is also a Runnable type, that is just an implementation detail. Users of the AsyncInputStream class should not even know that a new thread has been started on their behalf when an AsyncInputStream object is instantiated.