Functional way of reactive programming

Most reactive programming frameworks provide functional APIs for reactive programming, which make them even easier to work with. In this section, we will build a functional reactive API and solve a problem with it. The idea is to use the concept of a stream. A stream is a data generator or source that can provide input when requested. Functional APIs provide map, filter, and consume operations on the stream. The map and filter operations create a new stream, and the consume operation returns an EventConsumer instance. The idea is that when the EventConsumer is asked to start processing, it spawns its own producer and consumer threads and treats each map, filter, or consume operation as a separately scheduled operation in a producer-consumer queue. This is just to highlight what we are really trying to achieve.

As an example, the following code uses the functional API to solve the same perfect number problem. We will replace the pseudo-method someWayCreateAStream with the actual code to create a stream later. The point is to show how an event stream can be manipulated using the map, filter, and consume methods. The processing really starts only when the process method is called; each map, filter, and consume step is a decoupled processing step that potentially runs in a different thread:

public static void findPerfectNumbersWithFunctionalAPI(){
    EventStream<Long> stream = someWayCreateAStream();
    stream.filter((x)->x>1)
          .filter(EventStream::isPerfect)
          .consume((x)->{System.out.println(x);})
          .onError((x)->System.out.println(x))
          .process(4096,1,4);
}

When we create an instance of EventStream or EventConsumer, no processing happens; only metadata is created. It is only when the process method is invoked that the processing starts. The process method spawns the producer and consumer threads. The producer threads create and enqueue events that contain the initial value and the processing code (a map, filter, or consume operation). A dequeuer runs the first piece of processing and, for map and filter operations, enqueues another event for the next level of processing. A consume operation is the end of the processing chain and does not return any value, so no further event is scheduled for it.

This requires that a dequeuer thread must also perform some enqueue operations. What can be a problem with this? There are two kinds of threads that enqueue, and one of them is responsible for dequeuing too. These threads may get blocked while trying to perform enqueue operations when the queue is full. But then they would not be able to perform any dequeue operation either, so the queue would never have free space again. This situation is a deadlock: all the threads are blocked and are expecting other threads to do something to unblock them.

To see why this deadlock happens, let's imagine a queue of length 4. Suppose there are two dequeuing threads that, in some cases, also perform an enqueue operation after dequeuing. Let's have another thread that only enqueues. Since the threads can run in any order, it is possible that the enqueuer runs first and enqueues four new elements, making the queue full. Now say the two dequeuers run, each dequeuing one element. Before these threads get a chance to enqueue, the enqueuer thread runs again and enqueues two new elements, filling the queue. Now the dequeuer threads run, but they are blocked because the queue is full. They cannot dequeue any more elements either, because they are themselves blocked trying to enqueue. This is a deadlock situation. Figure 2 shows this situation:

[Figure 2: The enqueuer and both dequeuing threads blocked on a full queue]
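
To make the failure mode concrete, the following is a minimal sketch of the same pattern using the standard ArrayBlockingQueue class instead of our own queue (this demo is mine, not part of the library we are building). Depending on thread scheduling, it can reach a state where the producer and both dequeuers are all blocked inside put, after which no progress is ever made:

import java.util.concurrent.ArrayBlockingQueue;

public class DeadlockDemo {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(4);
        Runnable dequeuerThatAlsoEnqueues = () -> {
            try {
                while (true) {
                    int value = queue.take(); // dequeue one event
                    // Enqueue the follow-up event; this blocks if the
                    // producer has refilled the queue in the meantime.
                    queue.put(value + 1);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        new Thread(dequeuerThatAlsoEnqueues).start();
        new Thread(dequeuerThatAlsoEnqueues).start();
        try {
            while (true) {
                queue.put(0); // the external producer keeps the queue full
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}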

What we really want is threads that not only perform the enqueue operation, but also block before the queue is completely full. This way, the dequeuing threads can use the remaining space to keep dequeuing and enqueuing until they reach a point where they do not have to enqueue anymore (because they have reached the last step of the processing chain). Eventually, the queue gets empty and the enqueuer threads can be unblocked again. To do this, we need two different kinds of enqueue operations: one that blocks only when the queue is completely full, and another that blocks once the queue is half full or more. We can implement the second type using the following code in the ThreadSafeFixedLengthSpinlockQueue class. The enqueueProducerOnly method is just like the enqueue method, except that it performs an atomic check of the currentElementCount variable instead of just incrementing it. If, while enqueuing, it sees that the queue is already half full, it releases the enqueue lock and restarts the loop. A thread that only performs enqueue operations and never dequeues must use this method instead of the regular enqueue method:

public void enqueueProducerOnly(E value) throws InterruptedException {
    int halfLength = length / 2;
    while (true) {
        int index = nextEnqueueIndex;
        nextEnqueueIndex = (nextEnqueueIndex + 1) % length;
        if (enqueueLocks[index].compareAndSet(false, true)) {
            // Atomically check-and-increment the element count; back off
            // if the queue is already half full or the count changed
            // concurrently.
            int numberOfElements = currentElementCount.get();
            if (numberOfElements >= halfLength
                    || !currentElementCount.compareAndSet(numberOfElements, numberOfElements + 1)) {
                enqueueLocks[index].set(false);
                continue;
            }
            store[index] = value;
            // Release the dequeue lock so the slot becomes visible to dequeuers.
            dequeueLocks[index].set(false);
            return;
        }
    }
}

We can now use this method to implement a corresponding method in the ProducerConsumerQueue class. This method is exactly the same as the produce method, except that here, the call to enqueue has been replaced by a call to the enqueueProducerOnly method:

public void produceExternal(E value) throws InterruptedException {
    Event event = new Event();
    event.value = value;
    event.eventType = EventType.INVOCATION;
    queue.enqueueProducerOnly(event);
}

Now let's see the EventStream class. The whole point of the EventStream class is to create metadata in a functional way. It is an abstract class with only one abstract method called read(). A call to the read method should return the next object that needs to be processed. The class maintains a pointer to the previous EventStream on which this EventStream operates. This means that the operation represented by an EventStream works on the data obtained after all the previous EventStream instances have been processed; it is really a linked list of EventStream instances. Depending on the kind of operation the current EventStream represents, it has either a mapper, a filter, or nothing. The read method is applicable only to the first EventStream, which generates the data. Both the map and filter methods return another EventStream that represents the corresponding processing. After all the map and filter calls, the linked list of EventStream instances stores all the operations from the last to the first:

public abstract class EventStream<E> {
    EventStream previous;
    OneArgumentExpressionWithException mapper;
    OneArgumentExpressionWithException filter;

    public <R> EventStream<R> map(OneArgumentExpressionWithException<E,R> mapper){
        // The stream returned is pure metadata; its read method is never
        // called, so it simply returns null.
        EventStream<R> mapped = new EventStream<R>() {
            @Override
            public R read() {
                return null;
            }
        };
        mapped.mapper = mapper;
        mapped.previous = this;
        return mapped;
    }

    public EventStream<E> filter(OneArgumentExpressionWithException<E, Boolean> filter){
        EventStream<E> filtered = new EventStream<E>() {
            @Override
            public E read() {
                return null;
            }
        };
        filtered.filter = filter;
        filtered.previous = this;
        return filtered;
    }

The consume method, however, returns an instance of EventConsumer. This is the terminal processing step in any chain, and it does not compute a new value. The EventConsumer class, as will be shown a little later, contains all the logic to actually start the processing:

    public EventConsumer<E> consume(
            OneArgumentStatementWithException<E> consumer){
        EventConsumer eventConsumer = new EventConsumer(consumer, this) {
        };
        return eventConsumer;
    }

    public abstract E read();
}
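
Here is a small sketch (with a hypothetical counting stream of my own, assuming java.util.concurrent.atomic.AtomicLong is imported) showing that map and filter merely build a backward-linked list of metadata; nothing is read or processed at this point:

EventStream<Long> source = new EventStream<Long>() {
    AtomicLong next = new AtomicLong(0L);
    @Override
    public Long read() {
        long value = next.incrementAndGet();
        if (value <= 10) {
            return value;
        }
        return null; // null marks the end of the stream
    }
};
EventStream<Long> doubled = source.map((x) -> x * 2);    // doubled.previous == source
EventStream<Long> small = doubled.filter((x) -> x < 10); // small.previous == doubled
// No read() call has happened yet; we only have a three-node linked list.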

Since we need to store the details of the processing inside an EventConsumer instance, we will first make a few classes to store this information. The first one is a Task interface that represents any of the map, filter, or consume operations:

public interface Task {
    // A marker interface; the implementing classes carry the actual code.
}

This interface is implemented by three classes, one for each kind of operation. To store the code, we need two additional functional interfaces representing an expression and a statement that are allowed to throw exceptions:

@FunctionalInterface
public interface OneArgumentExpressionWithException<A,R> {
    R compute(A a) throws Exception;
}
@FunctionalInterface
public interface OneArgumentStatementWithException<E> {
    void doSomething(E input) throws Exception;
}
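
The throws Exception clause is the reason for defining these instead of reusing java.util.function.Function and Consumer: it lets the lambdas passed to map, filter, and consume contain code that throws checked exceptions. A small illustration (the lambdas here are my own examples, not part of the library):

// Legal even though readAllBytes throws the checked IOException:
OneArgumentExpressionWithException<String, byte[]> readFile =
        (path) -> java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(path));

OneArgumentStatementWithException<String> printer =
        (text) -> System.out.println(text);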

The following classes implement the Task interface:

public class MapperTask implements Task {
    OneArgumentExpressionWithException mapper;
    Task nextTask;

    public MapperTask(
            OneArgumentExpressionWithException mapper,
            Task nextTask) {
        this.mapper = mapper;
        this.nextTask = nextTask;
    }

}

public class FilterTask implements Task{
    OneArgumentExpressionWithException filter;
    Task nextTask;

    public FilterTask(
            OneArgumentExpressionWithException filter,
            Task nextTask) {
        this.filter = filter;
        this.nextTask = nextTask;
    }
}

Both MapperTask and FilterTask have a pointer to the next task because they are intermediate operations. They also store the piece of code associated with the processing. The ProcessorTask represents the terminal operation, so it does not have a pointer to the next task:

public class ProcessorTask<E> implements Task{
    OneArgumentStatementWithException<E> processor;

    public ProcessorTask(
            OneArgumentStatementWithException<E> processor) {
        this.processor = processor;
    }
}

We will now create the EventConsumer class that will create a task chain and run it:

public abstract class EventConsumer<E> {
    OneArgumentStatementWithException consumptionCode;
    EventStream<E> eventStream;
    Task taskList = null;
    private ProducerConsumerQueue<StreamEvent> queue;
    private OneArgumentStatement<Exception> errorHandler = (ex)->ex.printStackTrace();

A StreamEvent is a processing request; it is an element in the producer-consumer queue. It stores the value as an Object, along with the task to be performed on it. The task can have further tasks pointed to by its next reference:

    class StreamEvent{
        Object value;
        Task task;
    }

An EventStream stores a pointer to its previous operation, so if we read from the head of the list, we get the last operation first. Of course, we need to arrange the operations in the order of execution, not in reverse order. This is what the eventStreamToTask method does: a MapperTask or FilterTask stores the next operation, so the head of the task list it builds is the first operation to be carried out:

    private Task eventStreamToTask(EventStream stream){
        // Start with the terminal operation and wrap it while walking
        // backwards through the stream list; the resulting task list is
        // in execution order.
        Task t = new ProcessorTask(consumptionCode);
        EventStream s = stream;
        while(s.previous != null){
            if(s.mapper != null){
                t = new MapperTask(s.mapper, t);
            } else if(s.filter != null){
                t = new FilterTask(s.filter, t);
            }
            s = s.previous;
        }
        return t;
    }

The constructor is package-accessible; an EventConsumer is intended to be created only from inside the consume method of an EventStream:

    EventConsumer(
            OneArgumentStatementWithException consumptionCode,
            EventStream<E> eventStream) {
        this.consumptionCode = consumptionCode;
        this.eventStream = eventStream;
        taskList = eventStreamToTask(eventStream);
    }

The following is the piece of code responsible for actually carrying out the operations. The ConsumerCodeContainer class implements Consumer and acts as the consumer of the producer-consumer queue for processing events:

    class ConsumerCodeContainer implements Consumer<StreamEvent>{
        @Override
        public void onError(Exception error) {
            errorHandler.doSomething(error);
        }

The onMessage method is invoked for every event in the producer-consumer queue. Based on the actual task, it takes the corresponding action. Notice that for MapperTask and FilterTask, a new event is enqueued with the next operation:

        @Override
        public void onMessage(StreamEvent evt) {

The ProcessorTask is always the end of a processing chain. The operation is simply invoked on the value and no new event is queued:

            if(evt.task instanceof ProcessorTask){
                try {
                    ((ProcessorTask) evt.task).processor
                            .doSomething(evt.value);
                } catch (Exception e) {
                    queue.sendError(e);
                }
            }

For a FilterTask, the event with the next task is enqueued only if the condition is satisfied:

            else if(evt.task instanceof FilterTask){
                StreamEvent nextEvent = new StreamEvent();
                try {
                    if((Boolean)((FilterTask) evt.task).filter.compute(evt.value)) {
                        nextEvent.task =
                                ((FilterTask) evt.task).nextTask;
                        nextEvent.value = evt.value;
                        queue.produce(nextEvent);
                    }
                } catch (Exception e) {
                    queue.sendError(e);
                }
            }

For a MapperTask, the next task is enqueued with the value computed by the current map operation:

            else if(evt.task instanceof MapperTask){
                StreamEvent nextEvent = new StreamEvent();
                try {
                    nextEvent.value = ((MapperTask) evt.task).mapper.compute(evt.value);
                    nextEvent.task = ((MapperTask) evt.task).nextTask;
                    queue.produce(nextEvent);
                } catch (Exception e) {
                    queue.sendError(e);
                }
            }
        }
    }

The process method is responsible for kicking off the actual processing of the tasks. It uses a ProducerConsumerQueue to schedule events that are processed by the consumer discussed previously:

    public void process(int bufferSize, int numberOfProducerThreads,
            int numberOfConsumerThreads) {
        queue = new ProducerConsumerQueue<>(bufferSize,
                numberOfConsumerThreads, new ConsumerCodeContainer());

Only the original EventStream on which map and filter were called has the read method implemented. So we simply get a reference to the original EventStream:

        EventStream s = eventStream;
        while(s.previous !=null){
            s = s.previous;
        }

The startingStream variable points to the original EventStream:

        EventStream startingStream = s;

The producer code also runs in separate threads. The Runnable producerRunnable contains the producer code. It simply keeps calling the read method of the EventStream until null is returned (which marks the end of the stream), enqueuing a StreamEvent with each value and the task chain we created with the help of the eventStreamToTask method:

        Runnable producerRunnable = ()->{
            while(true){
                Object value = startingStream.read();
                if(value==null){
                    break;
                }
                StreamEvent nextEvent = new StreamEvent();
                try {
                    nextEvent.value = value;
                    nextEvent.task = taskList;
                    queue.produceExternal(nextEvent);
                } catch (Exception e) {
                    queue.sendError(e);
                }
            }
            try {
                queue.markCompleted();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

Now we spawn the producer threads and wait for them to finish with the join calls:

        Thread [] producerThreads = new Thread[numberOfProducerThreads];
        for(int i=0;i<numberOfProducerThreads;i++){
            producerThreads[i] = new Thread(producerRunnable);
            producerThreads[i].start();
        }
        for(int i=0;i<numberOfProducerThreads;i++){
            try {
                producerThreads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

This is a method to register a custom error handler and return a new EventConsumer:

    public EventConsumer<E> onError(
            OneArgumentStatement<Exception> errorHandler){
        EventConsumer<E> consumer
                = new EventConsumer<E>(consumptionCode, eventStream) {};
        consumer.taskList = taskList;
        consumer.errorHandler = errorHandler;
        return consumer;
    }
}

Going back to our original problem of perfect numbers, all we have to do now is define an EventStream with a read method that generates all the numbers, and then apply maps and filters on it as follows. Notice that the EventStream.read() method may be invoked by multiple threads simultaneously if we use more than one producer thread, so it should be thread-safe.

The read method simply increments an AtomicLong and returns the incremented value, unless that value is greater than 5_00_000L (500,000); in that case, it returns null, marking the end of the stream. We have already seen the rest of the code:

public static void findPerfectNumbersWithFunctionalAPI(){
    long start = System.currentTimeMillis();
    EventStream<Long> stream = new EventStream<Long>() {
        AtomicLong next = new AtomicLong(0L);
        @Override
        public Long read() {
            Long ret = next.incrementAndGet();
            if(ret<=5_00_000L){
                return ret;
            }
            return null;
        }
    };
    stream.filter((x)->x>1)
          .filter(EventStream::isPerfect)
          .consume((x)->{System.out.println(x);})
          .onError((x)->System.out.println(x))
          .process(4096,1,4);

    System.out.println("Time in ms: "+(System.currentTimeMillis()-start));
}

This code runs for almost the same time as the previous reactive version without a functional API. I will leave it up to you to use the functional API to implement the friend count solution; it is fairly simple once one gets the hang of it. All you need to think about is how to implement the read method to return the integers from the file.
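
As a starting point, here is a minimal sketch of such a read method (the file name numbers.txt and the one-integer-per-line format are my assumptions, and java.io.BufferedReader and java.io.FileReader need to be imported). Making read synchronized keeps it safe when more than one producer thread is used:

EventStream<Integer> fileStream = new EventStream<Integer>() {
    BufferedReader reader; // opened lazily on the first read

    @Override
    public synchronized Integer read() {
        try {
            if (reader == null) {
                reader = new BufferedReader(new FileReader("numbers.txt"));
            }
            String line = reader.readLine();
            if (line == null) {
                return null; // end of file marks the end of the stream
            }
            return Integer.parseInt(line.trim());
        } catch (Exception e) {
            return null; // treat any failure as the end of the stream
        }
    }
};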
