Producer-consumer model

A producer-consumer model is a design that divides processing into small components that communicate by sending messages: one component produces a message, and another consumes it and acts on it. This abstraction makes it easier to build an application that keeps all available processing resources busy.

A producer-consumer model starts with a queue of messages. Producers publish messages to this queue and consumers receive them. This queue differs from the queues we have studied so far in two ways. First, it must be thread-safe so that it works correctly in a multithreaded environment. Second, we do not need to worry about the exact order in which messages are dequeued; after all, the order matters little when the messages are received by different threads. Under these conditions, we are free to optimize the delivery of the messages.

Before implementing this queue, let's discuss a few thread synchronization techniques beyond the synchronized keyword we have used so far. These techniques allow more efficient use of resources while maintaining the correctness of the program.

Semaphore

A semaphore is a special variable that lets us limit the number of threads that can use a particular resource. The following code shows an example of a semaphore that gives us a thread-safe counter:

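import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    // Guarded by the semaphore, so only one thread at a time updates it
    volatile int threadSafeInt = 0;
    Semaphore semaphore = new Semaphore(1);
    // Returns the value held before the increment
    public int incrementAndGet() throws InterruptedException{
        semaphore.acquire();
        try {
            return threadSafeInt++;
        } finally {
            // Always release, or every other caller would block forever
            semaphore.release();
        }
    }
}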

Here, the semaphore has been initialized to 1, which means it allows only one thread to hold it at a time. No other thread can acquire it until it is released. Unlike a lock taken with the synchronized keyword, a semaphore does not require that the thread that releases it be the same thread that acquired it, which makes it particularly flexible.

A call to the acquire method of a semaphore blocks until the semaphore is acquired. This means that the calling thread is taken off the list of threads that the operating system's thread scheduler considers for running. Once the semaphore is ready to be acquired, the thread is put back on that list.
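A semaphore created with more than one permit limits how many threads can be inside a section at once, rather than enforcing strict mutual exclusion. The following is a minimal sketch of that idea; the class name SemaphoreLimitSketch, the method maxConcurrent, and the 5 ms sleep that stands in for real work are all assumptions made for illustration:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreLimitSketch {
    // Runs `threads` workers through a section guarded by `permits` permits and
    // returns the largest number of workers seen inside the section at once.
    static int maxConcurrent(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    semaphore.acquire();               // blocks while all permits are taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = inside.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    Thread.sleep(5);                   // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();               // hand the permit to a waiting thread
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        // With 3 permits, the reported maximum never exceeds 3
        System.out.println("max concurrent = " + maxConcurrent(3, 10));
    }
}
```

The release call sits in a finally block so that a failing worker cannot leak a permit and starve the others.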

Compare and set

Compare and set is an atomic operation that lets you update the value of a variable if and only if the existing value matches a specific value. This enables us to update a variable based on its previous value. The CAS operation returns a Boolean. If the comparison is a match, which means the set operation is successful, it returns true; otherwise, it returns false. The idea is to keep trying until this set operation is successful. The following diagram shows the basic strategy:

Figure 1: Atomic update using the compare and set operation

In Figure 1, we are trying to increment the value of the shared variable var. The operation requires us to read the value into a thread-specific temporary location, increment the temporary value, and assign it back to the shared variable. However, this can cause problems if multiple threads try to perform the update simultaneously. Two threads can read the value at the same time and so get the same temp value. Both threads then update the shared variable with the same incremented value. The value is incremented only once, although it should have been incremented twice. To avoid this, we check whether the value of var is still the same as the one we read and update it only if it is; otherwise, we read the value of var again and repeat the process. Since the compare and set operation is atomic, it guarantees that no increments are lost. The following Java code does exactly this:

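import java.util.concurrent.atomic.AtomicInteger;

public class ThreadSafeCounter {
    AtomicInteger counter = new AtomicInteger();
    // Returns the value held before the increment
    public int incrementAndGet(){
        while (true){
            int value = counter.get();
            // Succeeds only if no other thread changed counter since we read it
            if(counter.compareAndSet(value, value+1)){
                return value;
            }
        }
    }
}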

To use any atomic operation, we need to use classes from the java.util.concurrent.atomic package. AtomicInteger is a class that encapsulates an integer and enables the compareAndSet operation on it. There are other utility methods as well. In particular, it has methods to perform atomic increments and decrements, just like the one we have implemented here.
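As a small illustration of those utility methods, the sketch below calls the built-in getAndIncrement and incrementAndGet methods of AtomicInteger and then reuses the read, compare-and-set, retry loop for a different update, namely keeping a running maximum. The class name AtomicCounterSketch and the helper updateMax are hypothetical names chosen for this example:

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterSketch {
    // Raises `target` to `candidate` if candidate is larger, using the same
    // read / compare-and-set / retry loop as the hand-written counter.
    static void updateMax(AtomicInteger target, int candidate) {
        while (true) {
            int current = target.get();
            if (candidate <= current) {
                return;                                   // nothing to update
            }
            if (target.compareAndSet(current, candidate)) {
                return;                                   // our update won
            }
            // another thread changed the value in between; read again and retry
        }
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(5);
        System.out.println(counter.getAndIncrement());   // prints 5 (value before the increment)
        System.out.println(counter.incrementAndGet());   // prints 7 (value after the increment)
        updateMax(counter, 3);
        System.out.println(counter.get());               // prints 7 (3 is not larger)
        updateMax(counter, 10);
        System.out.println(counter.get());               // prints 10
    }
}
```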

Volatile field

Suppose we have a field that is written to and read from multiple threads. If all the threads run on the same single CPU, the writes can simply happen in the CPU cache; they need not be synced to the main memory often. This is not a problem because the value is also read from the same cache. However, multiple CPUs can have their own caches, and in that case, a write made in one CPU's cache may not be visible to a thread running on a different CPU. Most programs accept this and work accordingly. Java, for example, allows each thread to work with its own cached copy of a shared variable, which is only occasionally synced. If, however, we want to mandate that a write from one thread is visible to other threads, we need to declare the field volatile. Note that volatile only guarantees visibility; it does not make a compound operation such as ++ atomic. Classes such as AtomicInteger keep their value in a volatile field internally.
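A common use of a volatile field is a stop flag that one thread sets and another thread polls. The following is a minimal sketch under that assumption; the class name VolatileFlagSketch, the method stopsWorker, and the timing values are made up for illustration:

```java
class VolatileFlagSketch {
    // volatile guarantees that the worker sees the write made by the other thread
    private static volatile boolean running;

    // Starts a worker that spins on the flag, clears the flag from the calling
    // thread, and reports whether the worker stopped within two seconds.
    static boolean stopsWorker() {
        running = true;
        Thread worker = new Thread(() -> {
            while (running) {
                // busy loop: nothing to do except re-read the flag
            }
        });
        worker.setDaemon(true);    // never keep the JVM alive because of this sketch
        worker.start();
        try {
            Thread.sleep(50);      // give the worker time to enter its loop
            running = false;       // this write must become visible to the worker
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + stopsWorker());
    }
}
```

Without the volatile modifier, the Java memory model would not oblige the worker to ever observe the change, so the loop could in principle run forever.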

Thread-safe blocking queue

Now we are ready to implement our thread-safe blocking queue. Thread-safe means that multiple threads can share the same queue; blocking means that if a thread tries to dequeue an element and the queue is currently empty, the call to dequeue blocks until some other thread enqueues an element. Similarly, if a thread tries to enqueue a new element and the queue is full, the call to enqueue blocks until another thread dequeues an element and frees some space.

Our queue stores elements in a fixed-length array and maintains two counters that hold the next index for enqueuing and dequeuing. Two semaphores block threads when the queue is either empty or full. Along with this, each array position has two semaphores of its own, which ensure that enqueuing and dequeuing operations do not overwrite or repeat any elements. They do this by ensuring that once a new element is enqueued in a particular position, it is not overwritten before it is dequeued. Similarly, once a particular array index is dequeued, it is never dequeued again before another enqueue stores another element in it:

public class ThreadSafeFixedLengthBlockingQueue<E> {

The underflowSemaphore ensures that dequeues are blocked when the queue is empty, and overflowSemaphore ensures that enqueues are blocked when the queue is full:

    Semaphore underflowSemaphore;
    Semaphore overflowSemaphore;

    AtomicInteger nextEnqueueIndex;
    AtomicInteger nextDequeueIndex;

The array store is the space that holds the elements:

    E[] store;

Both enqueueLocks and dequeueLocks are individual position-based locks that allow only a dequeue after an enqueue and vice versa:

    Semaphore [] enqueueLocks;
    Semaphore [] dequeueLocks;

    int length;

The alive flag can be used by the dequeuing threads to know when they can stop running because no more elements are expected. This flag needs to be set by the enqueuing threads. It is declared volatile so that the write is visible to the dequeuing threads:

    volatile boolean alive = true;

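The constructor allocates the array and the semaphores. underflowSemaphore is drained right away so that it starts with no permits available (the queue is empty), and each dequeue lock starts out acquired because no position holds an element yet: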

    public ThreadSafeFixedLengthBlockingQueue(int length){
        this.length = length;
        store = (E[]) new Object[length];
        nextEnqueueIndex = new AtomicInteger();
        nextDequeueIndex = new AtomicInteger();
        underflowSemaphore = new Semaphore(length);
        overflowSemaphore = new Semaphore(length);
        underflowSemaphore.acquireUninterruptibly(length);
        enqueueLocks = new Semaphore[length];
        dequeueLocks = new Semaphore[length];
        for(int i=0;i<length;i++){
            enqueueLocks[i] = new Semaphore(1);
            dequeueLocks[i] = new Semaphore(1);
            dequeueLocks[i].acquireUninterruptibly();
        }
    }

The enqueue operation first makes sure that the queue is not full by acquiring overflowSemaphore:

    public void enqueue(E value) throws InterruptedException {
        overflowSemaphore.acquire();

The nextEnqueueIndex is then incremented and the previous value is returned, which is then used to compute the index in the array where the element would be stored. The seemingly complicated expression ensures that the index rolls over properly even after the nextEnqueueIndex integer rolls over, provided the length of the queue is an integer power of 2:

        int index = (length + nextEnqueueIndex.getAndIncrement() % length) 
          % length;

Once the index is selected, we must acquire an enqueue lock on the position, store the value, and then release the dequeue lock to mark this position as ready for dequeuing. At the end, we release one count on underflowSemaphore to mark the fact that there is one more element in the queue to be dequeued:

        enqueueLocks[index].acquire();
        store[index] = value;
        dequeueLocks[index].release();
        underflowSemaphore.release();
    }

The dequeue operation is very similar to the enqueue operation; only the roles of the semaphores are reversed. There is slightly more complicated code before the actual operation starts. This is to enable the dequeuing threads to quit when no more elements are available:

    public E dequeue() throws InterruptedException {

Instead of directly acquiring underflowSemaphore, we use tryAcquire, which wakes up the thread after 1 second if no elements are available to be dequeued. This gives us a chance to check the value of the alive Boolean flag and quit the dequeue operation if the queue is no longer alive. In that case, we interrupt the current thread, which makes the subsequent call to acquire throw an InterruptedException, and so the method exits. Otherwise, we compute the index and dequeue the element in a manner similar to the enqueue operation:

        while (alive && !underflowSemaphore.tryAcquire(1, TimeUnit.SECONDS));
        if(!alive){
            Thread.currentThread().interrupt();
        }
        int index = (length + nextDequeueIndex.getAndIncrement() % length) 
                 % length;
        dequeueLocks[index].acquire();
        E value = store[index];
        enqueueLocks[index].release();
        overflowSemaphore.release();
        return value;
    }

The following utility methods return the current number of elements in the queue and let a caller kill the queue (set the alive flag to false). The element count is useful for knowing when to kill the queue in a producer-consumer setup:

    public int currentElementCount(){
        return underflowSemaphore.availablePermits();
    }

    public void killDequeuers(){
        alive = false;
    }

}
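The index expression used in enqueue and dequeue can be checked in isolation. The sketch below copies the expression into a hypothetical helper named slot and tests what happens when the counter overflows from Integer.MAX_VALUE to Integer.MIN_VALUE; the class name IndexRolloverSketch and the method continuous are assumptions made for this example:

```java
class IndexRolloverSketch {
    // Same expression as in enqueue and dequeue. The % operator binds tighter
    // than +, so this is (length + (counter % length)) % length, which is
    // never negative even when the counter itself is negative.
    static int slot(int counter, int length) {
        return (length + counter % length) % length;
    }

    // True if the slot for counter + 1 is the next slot in circular order,
    // including the case where counter + 1 overflows.
    static boolean continuous(int counter, int length) {
        return slot(counter + 1, length) == (slot(counter, length) + 1) % length;
    }

    public static void main(String[] args) {
        System.out.println(slot(-3, 8));                          // prints 5
        System.out.println(continuous(Integer.MAX_VALUE, 4096));  // prints true
        System.out.println(continuous(Integer.MAX_VALUE, 4092));  // prints false
    }
}
```

With a length of 4096, the slot after 4095 is 0 even across the overflow. With a length of 4092, the sequence jumps from slot 2047 to slot 2044 at the overflow, which is why the length should be a power of 2.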

Producer-consumer implementation

We can now implement a producer-consumer setup using the queue we have created. Put simply, the producer-consumer queue is a queue of events that the producers produce and the consumers consume. There are three kinds of events. The INVOCATION type refers to the regular events that propagate processing. The ERROR type event is raised when an exception needs to be propagated. The COMPLETION event is produced when the dequeuing threads need to be terminated and the queue needs to be closed. The ProducerConsumerQueue class takes a Consumer as input:

public interface Consumer<E> {
    void onMessage(E message);
    default void onError(Exception error){
        error.printStackTrace();
    }
    default void onComplete(){

    }
}

public class ProducerConsumerQueue<E> {
    enum EventType{
        INVOCATION, ERROR, COMPLETION
    }

The Event class represents single events. Depending on the type, it can have a value or exception:

    class Event{
        E value;
        Exception error;
        EventType eventType;
    }
    ThreadSafeFixedLengthBlockingQueue<Event> queue;
    // volatile so that every consumer thread sees the change made on completion
    volatile boolean alive = true;
    Thread [] threads;

The constructor of ProducerConsumerQueue creates the consumer threads. It also takes the consumer code as input. The consumer must implement the Consumer interface:

    public ProducerConsumerQueue(int bufferSize, int threadCount, 
                 Consumer<E> consumer){
        queue = new ThreadSafeFixedLengthBlockingQueue<>(bufferSize);
        threads = new Thread[threadCount];

Each consumer thread runs code that dequeues events in a loop and calls the method on consumer that matches the event type. The loop ends when the completion event has been received and there are no more events in the queue to be processed:

        Runnable consumerCode = ()->{
            try{
                while(alive || queue.currentElementCount()>0){
                    Event e = queue.dequeue();
                    switch (e.eventType) {
                        case INVOCATION:
                            consumer.onMessage(e.value);
                            break;
                        case ERROR:
                            consumer.onError(e.error);
                            break;
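                        case COMPLETION:
                            alive = false;
                            // Let consumers still waiting in dequeue wake up and exit;
                            // otherwise joinThreads could wait on them forever
                            queue.killDequeuers();
                            consumer.onComplete();
                            break;
                    }
                }

            } catch (InterruptedException e) {
                // Thrown by dequeue once the queue has been killed; let the thread end
            }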
        };

Consumer threads are spawned:

        for(int i=0;i<threadCount;i++) {
            threads[i] = new Thread(consumerCode);
            threads[i].start();
        }
    }

The produce method is invoked from a producer thread. Notice that the queue does not manage producer threads; they need to be managed separately:

    public void produce(E value) throws InterruptedException {
        Event event = new Event();
        event.value = value;
        event.eventType = EventType.INVOCATION;
        queue.enqueue(event);
    }

Once a producer thread marks the stream of events as completed, no new events should be produced, and the dequeuing threads terminate after they have processed all the events:

    public void markCompleted() throws InterruptedException {
        Event event = new Event();
        event.eventType = EventType.COMPLETION;
        queue.enqueue(event);
    }

This is to propagate an exception:

    public void sendError(Exception ex) throws InterruptedException {
        Event event = new Event();
        event.error = ex;
        event.eventType = EventType.ERROR;
        queue.enqueue(event);
    }

If we need to wait for all the dequeuing threads to terminate, we use this:

    public void joinThreads() throws InterruptedException {
        for(Thread t: threads){
            t.join();
        }
    }
}
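For comparison, the same produce, consume, and complete cycle can be sketched with the JDK's own bounded blocking queue, java.util.concurrent.ArrayBlockingQueue, in place of the queue built in this chapter. This is only an illustrative sketch: the class name PoisonPillSketch, the method sumWithConsumers, and the DONE marker value are assumptions, and completion is signalled with one marker per consumer instead of a shared alive flag:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class PoisonPillSketch {
    // Hypothetical completion marker; assumes real messages are never negative
    private static final int DONE = -1;

    // Produces the messages 1..messages, lets `consumers` threads add them up,
    // and returns the total once every consumer has seen a completion marker.
    static long sumWithConsumers(int messages, int consumers) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(16);
        AtomicLong sum = new AtomicLong();
        Thread[] threads = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    while (true) {
                        int message = queue.take();    // blocks while the queue is empty
                        if (message == DONE) {
                            return;                    // plays the role of onComplete
                        }
                        sum.addAndGet(message);        // plays the role of onMessage
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        try {
            for (int m = 1; m <= messages; m++) {
                queue.put(m);                          // blocks while the queue is full
            }
            for (int i = 0; i < consumers; i++) {
                queue.put(DONE);                       // one marker per consumer
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(sumWithConsumers(1000, 3)); // prints 500500
    }
}
```

Because ArrayBlockingQueue is first-in, first-out, every marker is taken only after all regular messages have been taken, so no message is left behind when the consumers exit.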

To see how to use this producer-consumer queue to actually solve a problem, we will consider a dummy problem. We will work on a publicly available file, com-orkut.ungraph.txt, which contains all the friendships between users of Orkut, a former social networking site. The file can be downloaded from https://snap.stanford.edu/data/bigdata/communities/com-orkut.ungraph.txt.gz. To protect privacy, all the users are referenced only by an arbitrary ID, and the mapping to the actual users is not shared. We will also use another file called ulist, which contains the list of user IDs we are interested in. Our task is to find the number of friends that each user in the second file has. The following commands show what the two files look like:

$ head com-orkut.ungraph.txt 
1	2 
1	3 
1	4 
1	5 
1	6 
1	7 
1	8 
1	9 
1	10 
1	11 
$ head ulist 
2508972 
1081826 
2022585 
141678 
709419 
877187 
1592426 
1013109 
1490560 
623595 

Each line in com-orkut.ungraph.txt has two IDs separated by whitespace, meaning that these two users are friends. It is given that each friendship is mentioned only once in the file and is undirected. Note that this means each line should increase the friend count of both IDs. Each line in ulist has a single ID. All IDs are unique, and we must find the friend count of each of these IDs. Note that some of these users have no friends and thus are not mentioned in com-orkut.ungraph.txt.
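Before moving to the multithreaded version, the counting rule itself can be sketched on a tiny in-memory data set. The edges and user IDs below are invented for illustration and do not come from the Orkut file; the class name FriendCountSketch and the method countFriends are likewise hypothetical, and a HashMap stands in for the sorted array and binary search used later:

```java
import java.util.HashMap;
import java.util.Map;

class FriendCountSketch {
    // Counts, for each user of interest, how many edges mention that user.
    static Map<Integer, Integer> countFriends(int[][] edges, int[] users) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int user : users) {
            counts.put(user, 0);              // users with no friends keep a count of 0
        }
        for (int[] edge : edges) {
            // An undirected edge adds one friend to BOTH endpoints, but only
            // endpoints that appear in the user list are tracked.
            counts.computeIfPresent(edge[0], (id, count) -> count + 1);
            counts.computeIfPresent(edge[1], (id, count) -> count + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        int[][] edges = {{1, 2}, {1, 3}, {2, 3}, {3, 4}};   // made-up friendships
        int[] users = {1, 3, 9};                            // made-up user list
        Map<Integer, Integer> counts = countFriends(edges, users);
        System.out.println(counts.get(1));   // prints 2
        System.out.println(counts.get(3));   // prints 3
        System.out.println(counts.get(9));   // prints 0
    }
}
```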

We will first create a utility class that will let us read integer IDs from the files. The purpose of this class is to read integer values from any text file so that not too many objects are created in the process. This is just to reduce garbage collection to some extent. In this case, we used file-channel-based logic that uses ByteBuffer as a buffer:

public class FileReader {
    ByteBuffer buf= ByteBuffer.allocate(65536);
    FileChannel channel;

The readCount variable keeps track of how many bytes are left in the buffer:

    int readCount = 0;

    public FileReader(String filename) throws FileNotFoundException {
        channel = new FileInputStream(filename).getChannel();
        buf.clear();
    }

To read an int, keep reading the bytes in a loop until you hit a byte that is not a digit. In the meantime, keep computing the integer that the string of characters represents:

    public int readIntFromText() throws IOException {
        int value = 0;
        while(true){

First check whether the buffer is empty; if yes, refill it by reading from the file:

            if(readCount<=0){
                buf.clear();
                readCount = channel.read(buf);

If no more bytes are available in the file, there is nothing to flip, so we leave the loop:

                if(readCount<0){
                    break;
                }
                buf.flip();
            }

We read a byte and decrement readCount because now the buffer has one less byte:

            byte nextChar = buf.get();
            readCount--;

If the character is a digit, keep computing the integer; otherwise, break the loop and return the calculated integer value:

            if(nextChar>='0' && nextChar<='9') {
                value = value * 10 + (nextChar - '0');
            }else{
                break;
            }

        }
        return value;
    }
}
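The digit-accumulation step can be seen in isolation by running it over a byte array instead of a file channel. The following sketch is a simplified variant, not the FileReader class itself: the class name DigitParsingSketch and the method parseInts are hypothetical, and unlike readIntFromText it skips runs of separator bytes instead of returning 0 for an empty token:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class DigitParsingSketch {
    // Extracts every run of ASCII digits from the bytes as a non-negative int.
    static List<Integer> parseInts(byte[] text) {
        List<Integer> result = new ArrayList<>();
        int value = 0;
        boolean inNumber = false;
        for (byte b : text) {
            if (b >= '0' && b <= '9') {
                // Shift the digits read so far one place left and add the new digit
                value = value * 10 + (b - '0');
                inNumber = true;
            } else if (inNumber) {
                result.add(value);            // a separator ends the current number
                value = 0;
                inNumber = false;
            }
        }
        if (inNumber) {
            result.add(value);                // the input ended without a separator
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] sample = "1\t2\n1\t13\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(parseInts(sample)); // prints [1, 2, 1, 13]
    }
}
```

Working on raw bytes avoids creating a String for every line, which is the same motivation given for the FileReader class.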

With the help of this class, we will write a program that creates an output file containing the user IDs provided in ulist along with the corresponding friend counts. The idea is that reading the file runs asynchronously with computing the friend counts. Since the counting involves a binary search, we want two threads doing it instead of one:

public class FriendCountProblem {
    private static final String USER_LIST_FILE = "ulist";
    private static final String EDGES_PATH = "com-orkut.ungraph.txt";
    private static final String OUTPUT_FILE_PATH = "output";

    public static void main(String [] args)
      throws Exception {
        FileReader userListReader = new FileReader(USER_LIST_FILE);

First, we simply count the number of lines present in ulist. This will let us create the correct size array:

        int count = 0;

        while(true){

            int lineValue = userListReader.readIntFromText();
            if(lineValue==0){
                break;
            }
            count++;
        }

We create two arrays: one containing the keys and the other containing the friend count of each of the keys. The counts are stored in AtomicInteger objects so that they can be incremented from multiple threads:

        Integer [] keys = new Integer[count];
        AtomicInteger [] values = new AtomicInteger[count];

We read the user IDs from ulist into an array:

        userListReader = new FileReader(USER_LIST_FILE);


        int index = 0;

        while(true){

            int uid = userListReader.readIntFromText();
            if(uid==0){
                break;
            }
            keys[index] = uid;
            values[index] =  new AtomicInteger(0);
            index++;

        }

Now we sort the array of user IDs so that we can perform a binary search on it:

        ArraySorter.quicksort(keys,(a,b)->a-b);

The job of our consumers is to search for each user encountered in com-orkut.ungraph.txt and increment the corresponding count in the array values. Note that creating ProducerConsumerQueue does not start any processing; only consumer threads are created through this. Processing will start only when we produce events, which we will do after reading from com-orkut.ungraph.txt:

        ProducerConsumerQueue<Integer> queue 
                = new ProducerConsumerQueue<>(4096, 2, (v)->{
            int pos  = ArraySearcher.binarySearch(keys,v);
            if(pos<0){
                return;
            }
            values[pos].incrementAndGet();
        });

We use the main thread for producing the events. We use the same FileReader class to read each user ID separately. This works because every line in com-orkut.ungraph.txt gives each of its two users one friend (the other user on the same line), so each ID we read stands for exactly one friend of that user. We therefore simply read the IDs and post them as events so that the consumers can process them:

        FileReader edgeListFileReader = new FileReader(EDGES_PATH);
        while(true){
            int val = edgeListFileReader.readIntFromText();
            if(val == 0){
                break;
            }
            queue.produce(val);
        }

Once we are done processing the entire com-orkut.ungraph.txt file, we simply mark the queue as completed and wait for the consumer threads to be terminated:

        queue.markCompleted();
        queue.joinThreads();

By now, all the counts in the values array are up to date. So we simply read them one by one and write them to the output file:

        PrintStream out = new PrintStream(OUTPUT_FILE_PATH);
        for(int i=0;i<count;i++){
            out.println(keys[i] +" : "+values[i].get());
        }
        out.flush();
    }
}

The preceding example demonstrates how an actual problem can be solved using the reactive technique of producer-consumer. Now we will discuss another way of implementing our event queue; it does not involve blocking on semaphores.

Spinlock and busy wait

A semaphore normally blocks a thread until the thread can acquire it. The operating system achieves this blocking by removing the thread from the list of threads that are ready to be scheduled for processing time on the CPU. The threads on this list are called running threads. Every semaphore has a list of threads waiting on it, and these threads are removed from the list of running threads. Once the semaphore is released, threads are removed from the list attached to the semaphore and put back on the list of running threads. This operation is somewhat heavyweight and requires processing time.

Another way to stop a thread from accessing a shared resource is to use a spinlock. A spinlock is generally implemented using an atomic variable and the compare and set operation. A thread in a spinlock simply tries to perform compare and set on a variable in a loop until it succeeds. To the operating system, this thread is as good as a running thread and is scheduled just like any other thread. The thread itself, however, keeps trying the compare and set operation and consumes processor time. This is why it is called a busy wait. The thread can proceed to do something meaningful once the compare and set operation is successful. Spinlocks are useful when the resource is unavailable only for a short period of time. In that case, it does not make sense to do all the heavy lifting of removing the thread from the list of running threads and blocking it on a semaphore.
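As a minimal sketch of this idea, the following class builds a spinlock from a single AtomicBoolean and uses it to protect a plain counter. The names SpinLockSketch, lock, unlock, and countWithSpinLock are hypothetical and chosen only for this example:

```java
import java.util.concurrent.atomic.AtomicBoolean;

class SpinLockSketch {
    // false means the lock is free, true means some thread holds it
    private final AtomicBoolean locked = new AtomicBoolean(false);

    void lock() {
        // Busy wait: keep trying to flip false -> true until we are the one who succeeds
        while (!locked.compareAndSet(false, true)) {
            // the thread stays runnable and burns CPU time while it waits
        }
    }

    void unlock() {
        // Only the holder calls this, so a plain set is enough; no compare needed
        locked.set(false);
    }

    // Increments a plain (non-atomic) counter from several threads under the spinlock.
    static int countWithSpinLock(int threads, int incrementsPerThread) {
        SpinLockSketch spinLock = new SpinLockSketch();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    spinLock.lock();
                    try {
                        counter[0]++;          // very short critical section
                    } finally {
                        spinLock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithSpinLock(4, 10_000)); // prints 40000
    }
}
```

The critical section here is a single increment, which is the kind of very short hold time for which busy waiting is a reasonable choice.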

We can implement our thread-safe queue with spinlocks instead of semaphores as shown in the following code. Each array location for storing the queue elements is protected by two AtomicBoolean variables, stored in the enqueueLocks and dequeueLocks arrays. The only thing we want to make sure is that after each dequeue, there should only be a single enqueue, and after each enqueue, there should only be a single dequeue for a particular array location. Different array locations should be independent of one another:

public class ThreadSafeFixedLengthSpinlockQueue<E> {
    int nextEnqueueIndex;
    int nextDequeueIndex;
    E[] store;
    AtomicBoolean[] enqueueLocks;
    AtomicBoolean[] dequeueLocks;
    AtomicInteger currentElementCount = new AtomicInteger(0);
    int length;
    volatile boolean alive = true;
    public ThreadSafeFixedLengthSpinlockQueue(int length){
        this.length = length;
        store = (E[]) new Object[length];
        enqueueLocks = new AtomicBoolean[length];
        dequeueLocks = new AtomicBoolean[length];

When enqueueLocks[i] is false, it means there is no element stored at position i. When dequeueLocks[i] is true, it means the same thing. The reason we need both is for protection while an element is in the process of being enqueued or dequeued:

        for(int i=0;i<length;i++){
            enqueueLocks[i] = new AtomicBoolean(false);
            dequeueLocks[i] = new AtomicBoolean(true);
        }
    }

Here is the core of the lock. We simply take the next index to enqueue at and try to get the enqueue lock for it. If the lock is false, which means nothing is currently enqueued at that position, it is atomically set to true and the enqueue proceeds; otherwise, we keep doing the same thing in a busy loop until the compare and set operation is successful. Once the element is stored, we release the dequeue lock by simply setting it to false. A compare and set operation is not necessary here because the dequeue lock is guaranteed to be true at this point. The number of elements is maintained using another atomic variable:

    public void enqueue(E value) throws InterruptedException {

        while (true) {
            int index = nextEnqueueIndex;
            nextEnqueueIndex = (nextEnqueueIndex+1) % length;
            if(enqueueLocks[index].compareAndSet(false,true)){
                currentElementCount.incrementAndGet();
                store[index] = value;
                dequeueLocks[index].set(false);
                return;
            }
        }
    }

The dequeue operation is very similar, just that the enqueue and dequeue locks have switched places:

    public E dequeue() throws InterruptedException {
        while(alive) {
            int index = nextDequeueIndex;
            nextDequeueIndex = (nextDequeueIndex+1) % length;
            if(dequeueLocks[index].compareAndSet(false,true)){
                currentElementCount.decrementAndGet();
                E value = store[index];
                enqueueLocks[index].set(false);
                return value;
            }
        }
        throw new InterruptedException("");
    }

The rest of the code is self-evident:

    public int currentElementCount(){
        return currentElementCount.get();
    }

    public void killDequeuers(){
        alive = false;
    }
  
}

We can simply replace the queue in the ProducerConsumerQueue class to use this spinlock-based queue. In the case of our example problem, the spinlock version of the queue performs better.

Let's solve another problem using ProducerConsumerQueue. Our problem is to find all the perfect numbers between 2 and 500,000. What is a perfect number? A perfect number is a number that is equal to the sum of all its divisors, excluding itself. The first perfect number is 6. It has three divisors excluding itself, namely 1, 2, and 3, and 6=1+2+3. This is what makes 6 a perfect number.

To find all the perfect numbers between 2 and 500,000, we will check whether each number in the range is a perfect number. We can write the following code to figure out whether a given number x is a perfect number. For every candidate div, starting from 2, we check whether x is divisible by div; if so, we add div to the sum. In that case, dividing x by div gives another divisor of x, which is stored in the variable quotient. This must also be added to the sum, unless it is equal to div. We stop this process when we pass the square root of x, that is, when div is bigger than the quotient we get when x is divided by div. Since we start from 2 and so leave out the divisor 1 (its quotient would be the number itself, which must not be counted), we add 1 to the sum at the end and check whether the result is equal to x; if so, x is a perfect number:

public static boolean isPerfect(long x){
        long div = 2;
        long sum=0;
        while(true){
            long quotient = x/div;
            if(quotient<div){
                break;
            }
            if(x%div==0){
                sum+=div;
                if(quotient!=div){
                    sum+=quotient;
                }
            }
            div++;
        }
        return 1+sum==x;
    }
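To make the pairing of each divisor with its quotient concrete, the following sketch collects the divisors instead of only summing them. It is an illustrative variant of the same loop, not a replacement for isPerfect; the class name DivisorPairSketch and the methods properDivisors and sum are hypothetical, and the input is assumed to be at least 2:

```java
import java.util.ArrayList;
import java.util.List;

class DivisorPairSketch {
    // Lists the divisors of x other than x itself, in the order the loop finds
    // them: 1 first, then each divisor div followed by its partner x / div.
    static List<Long> properDivisors(long x) {
        List<Long> divisors = new ArrayList<>();
        divisors.add(1L);
        for (long div = 2; div <= x / div; div++) {   // stop once div passes sqrt(x)
            if (x % div == 0) {
                divisors.add(div);
                long quotient = x / div;
                if (quotient != div) {                // do not count a square root twice
                    divisors.add(quotient);
                }
            }
        }
        return divisors;
    }

    static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(properDivisors(28));        // prints [1, 2, 14, 4, 7]
        System.out.println(sum(properDivisors(28)));   // prints 28, so 28 is perfect
        System.out.println(sum(properDivisors(16)));   // prints 15, so 16 is not perfect
    }
}
```

For 28, the loop only needs to try the candidates 2 to 5, yet it finds all four non-trivial divisors because each hit also yields its partner.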

As you can see, checking whether a given number is a perfect number is a computationally expensive operation, which makes it desirable to use all the CPUs to compute it. We will use our producer-consumer framework to do this. The code is self-explanatory. Our consumer code simply checks whether a given number is a perfect number and then prints the number if it is so. The producer simply generates and queues all the numbers. Since the consumer is run in multiple threads and it is the part that is computationally intensive, it should work faster than the single-threaded version:

public static void findPerfectNumberWithProducerConsumer() throws InterruptedException{
        long start = System.currentTimeMillis();
        ProducerConsumerQueue<Long> queue 
                 = new ProducerConsumerQueue<>(4096, 4, (x)->{
            if(isPerfect(x)){
                System.out.println(x);
            }
        });

        for(long i=2;i<500_000;i++){
            queue.produce(i);
        }
        queue.markCompleted();
        queue.joinThreads();
        System.out.println("Time in ms: "+(System.currentTimeMillis()-start));
    }

Since my computer has four CPU cores, I used four threads to do the heavy lifting. On my computer, this program takes 1,596 milliseconds, compared to 4,002 milliseconds for the following single-threaded program:

public static void findPerfectNumberWithSingleThread(){
        long start = System.currentTimeMillis();
        for(long i=2;i<500_000;i++){
            if(isPerfect(i)){
                System.out.println(i);
            }
        }
        System.out.println("Time in ms: "+(System.currentTimeMillis()-start));
    }