The AsyncInputStream Class

The AsyncReadSocket class we previously developed had a few problems:

  • This class is specific to the network socket. We could also use an asynchronous I/O class for files, pipes, or any data stream. Ideally, we should have the ability to allow any data source to be asynchronous, not just network sockets.

    There is already a class structure for input from a stream. The top of this hierarchy is the InputStream class. Ideally, we should subclass the InputStream class. We can also benefit from the nested support of the FilterInputStream class and its subclasses.

  • Unlike the TCPServer class, the AsyncReadSocket class does not do a good job of hiding the threading details.

Do we need to develop a new class for this? Doesn’t the InputStream class have a method that supports asynchronous I/O? Although barely mentioned during the development of the AsyncReadSocket class, the InputStream class has the available() method that returns the number of bytes that can be read from the stream without blocking. Although this method sounds useful, it does not always suit our purposes because this method returns the number of bytes that have already been read and are available for processing. On some operating systems, this may include data that has been received at the machine and is being held by the operating system, but that’s not universally true (though it is true on most common operating systems, including those from Microsoft, Apple, Sun, and other Unix vendors).

Hence, just because the available() method returns 0 does not mean that a call to the read() method will block. Since avoiding calls that block is our primary purpose in developing this class, the available() method may not be suitable for our purpose.
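The point can be seen with a small sketch. The ReadyStream class below is a hypothetical stream invented for illustration: its read() method never blocks, but it inherits available() from the InputStream base class, whose implementation always returns 0:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class AvailableZeroDemo {
    /** Hypothetical stream: data is always ready, but available() is inherited. */
    static final class ReadyStream extends InputStream {
        private int next = 0;

        @Override
        public int read() {
            return next < 3 ? next++ : -1;    // Never blocks.
        }
    }

    /** Returns { value reported by available(), first byte returned by read() }. */
    static int[] probe() {
        try (InputStream in = new ReadyStream()) {
            int reported = in.available();    // Base-class implementation: always 0.
            int first = in.read();            // Returns immediately all the same.
            return new int[] { reported, first };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = probe();
        System.out.println("available() = " + r[0] + ", read() = " + r[1]);
    }
}
```

A caller that treated a result of 0 as "read() would block" would never read from this stream, even though every byte is ready.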

In addition, we can usually benefit somewhat by buffering data within our program rather than relying on the data being buffered by the operating system. If we read this data from the operating system into our program while the program is otherwise unoccupied (when the user is thinking, for example), then the data will be available slightly faster to the program when it attempts to read the input stream, since the data has already been moved from the operating system into the program.

So what we need is an InputStream class whose available() method reports the correct number of bytes that can actually be read without blocking, and that buffers data within the program itself. This new class, the AsyncInputStream class, will be implemented just like our AsyncReadSocket class. It creates another thread that reads from the input stream. Since reading is done in another thread, the underlying read() call is free to block if data is not available. Users of our AsyncInputStream class simply believe that we are an InputStream object. As shown in Figure 5-3, we are actually deriving from the FilterInputStream class, which is the base class for InputStream classes that contain InputStream instances.

Figure 5-3. The Java InputStream class hierarchy

The fact that we start another thread to read the data is an implementation detail. Before we examine the policies and other details of our AsyncInputStream class, let’s examine the AsyncInputStream class itself:

import java.net.*;
import java.io.*;

public class AsyncInputStream extends FilterInputStream
                                implements Runnable {
    private Thread runner;                // Async reader thread
    private volatile byte result[];       // Buffer
    private volatile int reslen;          // Buffer length
    private volatile boolean EOF;         // End-of-file indicator
    private volatile IOException IOError; // I/O exceptions

    BusyFlag lock;                    // Data lock
    CondVar empty, full;              // Signal variables

    protected AsyncInputStream(InputStream in, int bufsize) {
        super(in);

        lock = new BusyFlag();        // Allocate sync variables.
        empty = new CondVar(lock);
        full = new CondVar(lock);

        result = new byte[bufsize];   // Allocate storage area
        reslen = 0;                   // and initialize variables.
        EOF = false;
        IOError = null;
        runner = new Thread(this);    // Start reader thread.
        runner.start();
    }

    protected AsyncInputStream(InputStream in) {
        this(in, 1024);
    }

    public int read() throws IOException {
        try {
            lock.getBusyFlag();
            while (reslen == 0) {
                try {
                    if (EOF) return(-1);
                    if (IOError != null) throw IOError;
                    empty.cvWait();
                } catch (InterruptedException e) {}
            }
            return getChar() & 0xFF;      // Mask to 0-255 so byte 0xFF is not mistaken for EOF.
        } finally {
            lock.freeBusyFlag();
        }
    }

    public int read(byte b[]) throws IOException {
        return read(b, 0, b.length);
    }

    public int read(byte b[], int off, int len) throws IOException {
        try {
            lock.getBusyFlag();
            while (reslen == 0) {
                try {
                    if (EOF) return(-1);
                    if (IOError != null) throw IOError;
                    empty.cvWait();
                } catch (InterruptedException e) {}
            }

            int sizeread = Math.min(reslen, len);
            byte c[] = getChars(sizeread);
            System.arraycopy(c, 0, b, off, sizeread);
            return(sizeread);
        } finally {
            lock.freeBusyFlag();
        }
    }

    public long skip(long n) throws IOException {
        try {
            lock.getBusyFlag();
            // Clamp in long arithmetic: casting n to int first could overflow.
            int sizeskip = (int) Math.max(0L, Math.min((long) reslen, n));
            if (sizeskip > 0) {
                byte c[] = getChars(sizeskip);
            }
            return((long)sizeskip);
        } finally {
            lock.freeBusyFlag();
        }
    }

    public int available() throws IOException {
        return reslen;
    }

    public void close() throws IOException {
        try {
            lock.getBusyFlag();
            reslen = 0;                        // Clear buffer.
            EOF = true;                        // Mark end of file.
            empty.cvBroadcast();               // Alert all threads.
            full.cvBroadcast();
        } finally {
            lock.freeBusyFlag();
        }
    }

    public void mark(int readlimit) {
    }

    public void reset() throws IOException {
    }

    public boolean markSupported() {
        return false;
    }

    public void run() {
        try {
            while (true) {
                int c = in.read();
                try {
                    lock.getBusyFlag();
                    if ((c == -1) || (EOF)) {
                        EOF = true;            // Mark end of file.
                        in.close();            // Close input source.
                        return;                // End I/O thread.
                    } else {
                        putChar((byte)c);      // Store the byte read.
                    }
                    if (EOF) {
                        in.close();            // Close input source.
                        return;                // End I/O thread.
                    }
                } finally {
                    lock.freeBusyFlag();
                }
            }

        } catch (IOException e) {
            IOError = e;                       // Store exception.
            return;
        } finally {
            try {
                lock.getBusyFlag();
                empty.cvBroadcast();           // Alert all threads.
            } finally {
                lock.freeBusyFlag();
            }
        }
    }

    private void putChar(byte c) {
        try {
            lock.getBusyFlag();
            while ((reslen == result.length) && (!EOF)) {
                try {
                    full.cvWait();
                } catch (InterruptedException ie) {}
            }
            if (!EOF) {
                result[reslen++] = c;
                empty.cvSignal();
            }
        } finally {
            lock.freeBusyFlag();
        }
    }

    private byte getChar() {
        try {
            lock.getBusyFlag();
            byte c = result[0];
            System.arraycopy(result, 1, result, 0, --reslen);
            full.cvSignal();
            return c;
        } finally {
            lock.freeBusyFlag();
        }
    }

    private byte[] getChars(int chars) {
        try {
            lock.getBusyFlag();
            byte c[] = new byte[chars];
            System.arraycopy(result, 0, c, 0, chars);
            reslen -= chars;
            System.arraycopy(result, chars, result, 0, reslen);
            full.cvSignal();
            return c;
        } finally {
            lock.freeBusyFlag();
        }
    }
}

For our purposes, we aren’t interested in the details of threading the I/O itself; there is no threading code in this class that we have not already seen in the AsyncReadSocket class. The new thread simply does a blocking read on the InputStream, and methods are provided so that the original thread can get the data in a nonblocking manner. The InputStream aspect of this class is interesting, but learning the Java data input system is not within the scope of this book.

Why is the discussion of this class important? And how is this class different from the AsyncReadSocket class? Although this class accomplishes the asynchronous read in the same fashion as the AsyncReadSocket class, it is also a FilterInputStream, and it is the relationship between the threaded I/O and the InputStream class that we are concerned with here. Since this class must behave as an InputStream, we cannot design the behavior of the class as optimally as if all we had been concerned with was communicating with the I/O thread. This is the sort of real-world trade-off that must be made when implementing threaded classes.

In order for the class to function correctly, we need to use practically every synchronization technique that we know. Let’s start with a look at the instance variables and constructors of the AsyncInputStream class:

public class AsyncInputStream extends FilterInputStream
                                implements Runnable {
    private Thread runner;                     // Async reader thread
    private volatile byte result[];            // Buffer
    private volatile int reslen;               // Buffer length
    private volatile boolean EOF;              // End-of-file indicator
    private volatile IOException IOError;      // I/O Exceptions

    BusyFlag lock;                    // Data lock
    CondVar empty, full;              // Signal variables

    protected AsyncInputStream(InputStream in, int bufsize) {
        super(in);

        lock = new BusyFlag();        // Allocate sync variables.
        empty = new CondVar(lock);
        full = new CondVar(lock);

        result = new byte[bufsize];   // Allocate storage area
        reslen = 0;                   // and initialize variables.
        EOF = false;
        IOError = null;
        runner = new Thread(this);    // Start reader thread.
        runner.start();
    }

    protected AsyncInputStream(InputStream in) {
        this(in, 1024);
    }

The first three instance variables, runner, result, and reslen, are the important data of the class. runner is the reference to the I/O thread that is started by this class, and result and reslen are the data storage and the length that is being passed back from the I/O thread. This is an important difference from the AsyncReadSocket class, which did not support the concept of data size: the getResult() method of the AsyncReadSocket class did not allow the caller to specify the amount to read. Since an InputStream class can read any amount of data, we must keep track of available data in the buffers.

The EOF and IOError instance variables are also used for communication. In order to behave as an InputStream class, we must report end-of-file (EOF) conditions and throw exceptions on I/O errors. These EOF conditions and I/O exceptions are generated from the InputStream object contained in the AsyncInputStream class. We must save the EOF condition and catch the I/O exception in the I/O thread, and later indicate the EOF condition or throw the exception in the calling thread. If the AsyncInputStream class did not have to behave like an InputStream class, we could have designed a simpler error reporting system.

Data in the result buffer is protected by the lock instance variable, and we have associated two condition variables with the lock: the empty and full condition variables. This is an instance of the buffer management that we discussed with the CondVar class: we can have threads waiting on a single lock for two different conditions.
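To make the idea concrete without depending on our BusyFlag and CondVar classes, here is a minimal sketch that uses the standard ReentrantLock and Condition classes from java.util.concurrent.locks as stand-ins. The class name TwoConditionBuffer and its methods are assumptions made for this illustration; this is not the AsyncInputStream implementation itself:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class TwoConditionBuffer {
    private final ReentrantLock lock = new ReentrantLock();   // Stand-in for the BusyFlag.
    private final Condition notEmpty = lock.newCondition();   // Readers wait here.
    private final Condition notFull = lock.newCondition();    // The writer waits here.
    private final byte[] data;
    private int len = 0;                                      // Guarded by lock.

    TwoConditionBuffer(int capacity) {
        data = new byte[capacity];
    }

    void put(byte b) throws InterruptedException {
        lock.lock();
        try {
            while (len == data.length) notFull.await();       // Buffer full: wait.
            data[len++] = b;
            notEmpty.signal();                                // Wake one reader only.
        } finally {
            lock.unlock();
        }
    }

    byte take() throws InterruptedException {
        lock.lock();
        try {
            while (len == 0) notEmpty.await();                // Buffer empty: wait.
            byte b = data[0];
            System.arraycopy(data, 1, data, 0, --len);        // Shift remaining bytes down.
            notFull.signal();                                 // Wake the writer only.
            return b;
        } finally {
            lock.unlock();
        }
    }

    /** Pushes the bytes 1..10 through a 2-byte buffer and returns their sum. */
    static int demo() {
        TwoConditionBuffer buf = new TwoConditionBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) buf.put((byte) i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < 10; i++) sum += buf.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum = " + demo());
    }
}
```

Because each waiter sits on the condition that matches what it is waiting for, a signal from put() can only wake a reader, and a signal from take() can only wake the writer.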

The first constructor of the AsyncInputStream class is straightforward. First, we just allocate and initialize the buffer and variables we will use to communicate with the I/O thread. Second, we instantiate and start() the I/O thread. The other constructor has the same signature as the constructor of the FilterInputStream class, from which we inherit, and uses a default buffer size. By providing this constructor, we are behaving like all FilterInputStreams.

Let’s start to look into the details of how data is passed back to the user:

    public int read() throws IOException {
        try {
            lock.getBusyFlag();
            while (reslen == 0) {
                try {
                    if (EOF) return(-1);
                    if (IOError != null) throw IOError;
                    empty.cvWait();
                } catch (InterruptedException e) {}
            }
            return getChar() & 0xFF;      // Mask to 0-255 so byte 0xFF is not mistaken for EOF.
        } finally {
            lock.freeBusyFlag();
        }
    }
    
    private byte getChar() {
        try {
            lock.getBusyFlag();
            byte c = result[0];
            System.arraycopy(result, 1, result, 0, --reslen);
            full.cvSignal();
            return c;
        } finally {
            lock.freeBusyFlag();
        }
    }

In the InputStream class, the read() method reads a single byte from the input data stream. If an EOF is detected or an IOException is caught by the I/O thread, it is recorded in the EOF or IOError instance variable, respectively. The read() method then returns -1 to report the EOF or throws the IOException on behalf of the I/O thread.

Also, we check for the EOF and the I/O exception only when there is no more data in the buffer. Since the I/O thread is reading ahead, we must delay returning the EOF indicator or throwing the exception until the user has drained the input from the buffer: the user should see the EOF or exception at the same point in the data where it actually occurred. The I/O thread stops reading when it receives either an EOF or an IOException, so we can safely assume all data in the buffer occurred before either condition happened.
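The ordering rule can be illustrated in isolation. The following single-threaded sketch is an assumption-laden simplification: the class DeferredFailureDemo and its deliver() and fail() methods are hypothetical and merely simulate what the I/O thread would do, with no locking at all. It shows only that buffered bytes are handed out before the stored exception is thrown:

```java
import java.io.IOException;
import java.util.ArrayDeque;

final class DeferredFailureDemo {
    private final ArrayDeque<Byte> buffered = new ArrayDeque<>();
    private IOException failure;                 // Recorded by the (simulated) I/O thread.

    void deliver(byte b) { buffered.add(b); }    // Simulates data read ahead.
    void fail(IOException e) { failure = e; }    // Simulates a failed read.

    int read() throws IOException {
        if (!buffered.isEmpty()) {
            return buffered.poll() & 0xFF;       // Drain buffered data first.
        }
        if (failure != null) throw failure;      // Only then surface the error.
        throw new IllegalStateException("a real stream would block here");
    }

    /** Reads until the stored exception surfaces and records what was seen. */
    static String demo() {
        DeferredFailureDemo s = new DeferredFailureDemo();
        s.deliver((byte) 10);
        s.deliver((byte) 20);
        s.fail(new IOException("connection reset"));   // Error arrives after the data.
        StringBuilder seen = new StringBuilder();
        try {
            while (true) seen.append(s.read()).append(' ');
        } catch (IOException e) {
            seen.append("IOException: ").append(e.getMessage());
        }
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller sees both bytes and then the exception, exactly as it would have if it had read the underlying stream directly.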

Finally, in order to protect the result data buffer and the reslen length indicator, we use the lock BusyFlag. The getChar() method, which returns the next byte, also uses this BusyFlag. You might ask why we are using only a single lock to protect four different instance variables. This is a design issue; the result and reslen variables are related, and it is unlikely that we would be examining or changing one without the other. The EOF and IOError variables change rarely: each is set at most once by the I/O thread (and EOF possibly once more by the close() method). It is wasteful to create a new BusyFlag for this purpose when a suitable lock is already available.

What happens when we do not have data available when a read is requested? The read() method must behave correctly if the application calls the method when data is not available. This means that the read() method must block under such conditions. In other words, the read() method must do what it was designed to avoid in the first place:

    public int read() throws IOException {
        try {
            lock.getBusyFlag();
            while (reslen == 0) {
                try {
                    if (EOF) return(-1);
                    if (IOError != null) throw IOError;
                    empty.cvWait();
                } catch (InterruptedException e) {}
            }
            return getChar() & 0xFF;      // Mask to 0-255 so byte 0xFF is not mistaken for EOF.
        } finally {
            lock.freeBusyFlag();
        }
    }

    private void putChar(byte c) {
        try {
            lock.getBusyFlag();
            while ((reslen == result.length) && (!EOF)) {
                try {
                    full.cvWait();
                } catch (InterruptedException ie) {}
            }
            if (!EOF) {
                result[reslen++] = c;
                empty.cvSignal();
            }
        } finally {
            lock.freeBusyFlag();
        }
    }

Obviously, the read() method cannot block by reading from the InputStream; the InputStream is under the control of the I/O thread and should not be accessed directly by the read() method. In order to simulate this blocking, we use the empty condition variable. The read() method simply waits for more data to arrive. When data arrives in the I/O thread, a signal is generated when the data is placed in the buffer. This is done by calling the cvSignal() method in the putChar() method. As can be seen by examining the run() method, the putChar() method is called by the I/O thread to place the data it receives in the data buffer:

    public void run() {
        try {
            while (true) {
                int c = in.read();
                try {
                    lock.getBusyFlag();
                    if ((c == -1) || (EOF)) {
                        EOF = true;           // Mark end of file.
                        in.close();           // Close input source.
                        return;               // End I/O thread.
                    } else {
                        putChar((byte)c);     // Store the byte read.
                    }
                    if (EOF) {
                        in.close();           // Close input source.
                        return;               // End I/O thread.
                    }
                } finally {
                    lock.freeBusyFlag();
                }
            }
        } catch (IOException e) {
            IOError = e;                      // Store exception.
            return;
        } finally {
            try {
                lock.getBusyFlag();
                empty.cvBroadcast();          // Alert all threads.
            } finally {
                lock.freeBusyFlag();
            }
        }
    }

The code for the I/O thread is similar to the code in our AsyncReadSocket class. We simply read from the InputStream, blocking if necessary. When we receive data, we place it in the buffer using the putChar() method. Additionally, if we receive an EOF indicator or catch an IOException, we place that information into the appropriate instance variables. To allow all of these actions to take place safely with the other threads, we grab the same lock that is used by the read thread: the lock BusyFlag.

What will happen to all the blocking read threads when an EOF or IOException condition occurs? As we mentioned, we are using a condition variable to cause the read() method to behave in a blocking manner. However, when an EOF or IOException condition occurs, there will be no further notifications, since no more data will be arriving. To solve this, we must use the cvBroadcast() method when these conditions occur. The threads can just wake up in turn, taking the available data from the buffer:

    public void run() {
        try {
            while (true) {
                int c = in.read();
                try {
                    lock.getBusyFlag();
                    if ((c == -1) || (EOF)) {
                        EOF = true;           // Mark end of file.
                        in.close();           // Close input source.
                        return;               // End I/O thread.
                    } else {
                        putChar((byte)c);     // Store the byte read.
                    }
                    if (EOF) {
                        in.close();           // Close input source.
                        return;               // End I/O thread.
                    }
                } finally {
                    lock.freeBusyFlag();
                }
            }

        } catch (IOException e) {
            IOError = e;                      // Store exception.
            return;
        } finally {
            try {
                lock.getBusyFlag();
                empty.cvBroadcast();          // Alert all threads.
            } finally {
                lock.freeBusyFlag();
            }
        }
    }
    
    public void close() throws IOException {
        try {
            lock.getBusyFlag();
            reslen = 0;                       // Clear buffer.
            EOF = true;                       // Mark end of file.
            empty.cvBroadcast();              // Alert all threads.
            full.cvBroadcast();
        } finally {
            lock.freeBusyFlag();
        }
    }

When no more data is available from the buffer, the remaining threads reading the InputStream return the EOF or IOError condition from their read() methods. We also do not have to worry about future read() method calls; they simply return the EOF or IOError condition that occurred.
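Here is a small sketch of why a broadcast is needed at end of stream. It again substitutes the standard ReentrantLock and Condition classes for BusyFlag and CondVar, and the EofBroadcastDemo class is a hypothetical reduction in which the buffer is always empty, so every reader can only wait for the EOF flag:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class EofBroadcastDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private boolean eof = false;                 // Guarded by lock.

    /** Blocks until end of stream is flagged, then reports it as -1. */
    int read() throws InterruptedException {
        lock.lock();
        try {
            while (!eof) notEmpty.await();       // No data will ever arrive in this sketch.
            return -1;
        } finally {
            lock.unlock();
        }
    }

    /** Called once by the I/O side; nothing will be signalled afterwards. */
    void markEof() {
        lock.lock();
        try {
            eof = true;
            notEmpty.signalAll();                // signal() would release only one waiter.
        } finally {
            lock.unlock();
        }
    }

    /** Starts the given number of readers and counts how many observe EOF. */
    static int demo(int readers) {
        EofBroadcastDemo stream = new EofBroadcastDemo();
        AtomicInteger sawEof = new AtomicInteger();
        Thread[] threads = new Thread[readers];
        for (int i = 0; i < readers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    if (stream.read() == -1) sawEof.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        stream.markEof();
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sawEof.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(3) + " readers saw EOF");
    }
}
```

Readers that call read() after markEof() never wait at all, because the flag is checked before waiting; this mirrors the way later read() calls on our class simply return the recorded condition.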

The implementation of the available() method that works as desired—the method that was the reason for our AsyncInputStream class—is actually anticlimactic:

public int available() throws IOException {
    return reslen;
}

We simply return the number of bytes we have available in the buffer. Since the I/O thread is actually reading the InputStream, blocking if necessary, we know that there are usually no more bytes sitting on the network that are unaccounted for. There is, however, a maximum amount of data that is held by the buffer (which is user configurable), so that it’s possible that the buffer could be full and data could be held in the network buffers as well.
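With an available() method that can be trusted, a caller can poll a stream without ever blocking. The sketch below shows one way that might look; the helper name readWithoutBlocking is hypothetical, and a ByteArrayInputStream (whose available() method is also exact) stands in for an AsyncInputStream so that the example is self-contained:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class PollingReadDemo {
    /**
     * Copies at most what available() reports into dst and returns the count.
     * A result of 0 means "nothing buffered right now"; it does not by itself
     * distinguish an idle stream from one that has reached end of file.
     */
    static int readWithoutBlocking(InputStream in, byte[] dst) throws IOException {
        int ready = Math.min(in.available(), dst.length);
        if (ready == 0) {
            return 0;                          // Skip read(): it might block.
        }
        return in.read(dst, 0, ready);
    }

    /** Polls a 5-byte stream three times with a 3-byte destination array. */
    static int[] demo() {
        InputStream in = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5});
        byte[] dst = new byte[3];
        try {
            return new int[] {
                readWithoutBlocking(in, dst),
                readWithoutBlocking(in, dst),
                readWithoutBlocking(in, dst)
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println(counts[0] + " " + counts[1] + " " + counts[2]);
    }
}
```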

Finally, we made three additional design decisions during the development of the AsyncInputStream class. While these decisions are important to the AsyncInputStream class, they will not be examined here, because they don’t pertain to our discussion of threading issues. But to be complete, here is a brief overview:

  • The read(byte[]) method, just like the read() method, blocks if data is not available. However, if data is available, but not enough to fill the byte array, the read(byte[]) method simply reads less than requested, returning the number of bytes actually read. We have chosen this implementation due to the design of the AsyncInputStream class: it works asynchronously, and this implementation best fulfills that design spirit.

  • The skip() method skips only the number of bytes possible without blocking. This means that if the skip() method is called to skip more bytes than are available, it simply skips what is available and returns the number of bytes actually skipped. Again, this implementation best fulfills the design spirit of the AsyncInputStream class.

  • The mark and reset feature of the AsyncInputStream class is not supported, even if this feature is supported in the InputStream class that we contain. There’s no real reason why an asynchronous stream would support this, and if users really require this feature, they can always instantiate a BufferedInputStream object containing our AsyncInputStream object.
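Two of these decisions affect how callers write their code, and both can be sketched with standard classes. In the example below, ShortReadStream is a hypothetical stand-in that mimics the relevant behavior (short bulk reads and no mark/reset support); it is not the AsyncInputStream class. The readFully() helper loops until the array is full or EOF is reached, and a BufferedInputStream wrapper supplies mark and reset:

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class PartialReadAndMarkDemo {
    /** Hypothetical stand-in: at most 2 bytes per bulk read, no mark/reset. */
    static final class ShortReadStream extends FilterInputStream {
        ShortReadStream(InputStream in) { super(in); }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, 2));
        }

        @Override public boolean markSupported() { return false; }
        @Override public void mark(int readlimit) { }
        @Override public void reset() throws IOException {
            throw new IOException("mark/reset not supported");
        }
    }

    /** Keeps reading until dst is full or EOF; returns the bytes stored. */
    static int readFully(InputStream in, byte[] dst) throws IOException {
        int total = 0;
        while (total < dst.length) {
            int n = in.read(dst, total, dst.length - total);
            if (n == -1) break;                    // End of file before dst was full.
            total += n;
        }
        return total;
    }

    static InputStream source() {
        return new ShortReadStream(new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5}));
    }

    static String demo() {
        try {
            int single = source().read(new byte[5]);            // One call: short read.
            int looped = readFully(source(), new byte[5]);      // Loop: whole array.
            InputStream marked = new BufferedInputStream(source());
            marked.mark(16);                                    // Supplied by the wrapper.
            int before = marked.read();
            marked.reset();
            int after = marked.read();                          // Same byte again.
            return single + "," + looped + "," + before + "," + after;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```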

Why did we use two condition variables rather than the wait and notify mechanism? We did this for efficiency. Here is a case where we have a single data source (the result buffer) that can have two conditions: it can be empty, in which case threads attempting to read must wait for data, or it can be full, in which case threads attempting to store data into the buffer must wait for it to be partially emptied. If we had relied on the wait and notify mechanism, then whenever either condition occurred we would have had to call the notifyAll() method, which would have woken up too many threads. This would have worked, since all threads recheck the condition when they wake up, but it is not as efficient as using the condition variables.
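For comparison, here is a sketch of the alternative we rejected: the same kind of bounded buffer built on a single monitor with wait() and notifyAll(). The MonitorBuffer class is a hypothetical illustration, not code from the AsyncInputStream class:

```java
final class MonitorBuffer {
    private final byte[] data;
    private int len = 0;                           // Guarded by this object's monitor.

    MonitorBuffer(int capacity) {
        data = new byte[capacity];
    }

    synchronized void put(byte b) throws InterruptedException {
        while (len == data.length) wait();         // Full: wait on the only monitor.
        data[len++] = b;
        notifyAll();                               // Wakes readers and writers alike.
    }

    synchronized byte take() throws InterruptedException {
        while (len == 0) wait();                   // Empty: wait on the same monitor.
        byte b = data[0];
        System.arraycopy(data, 1, data, 0, --len); // Shift remaining bytes down.
        notifyAll();                               // Again wakes every waiter.
        return b;
    }

    /** Pushes the bytes 1..10 through a 2-byte buffer and returns their sum. */
    static int demo() {
        MonitorBuffer buf = new MonitorBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) buf.put((byte) i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < 10; i++) sum += buf.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum = " + demo());
    }
}
```

This version is correct because every waiter rechecks its condition in a loop, but each notifyAll() call wakes threads that are waiting for the opposite condition and must immediately go back to waiting.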

Instances of the AsyncInputStream class behave like any InputStream object. They can be used in cases where an InputStream object is normally used with no changes. While the AsyncInputStream class is also a Runnable type, that is just an implementation detail. Users of the AsyncInputStream class should not even know that a new thread has been started on their behalf when an AsyncInputStream object is instantiated.
