The Spliterator interface

A new interface called Spliterator is added to the java.util package. As the name implies, it is a splittable iterator: a new type of iterator that can traverse the elements of a Collection object. The Collection interface is updated to include a new spliterator() method, which returns a Spliterator upon invocation. The strength of a Spliterator lies in its ability to split its source, partitioning off some of the elements as another Spliterator. This makes it possible to process the items of a Collection in parallel; however, a Spliterator alone does not run anything in parallel. Its role is to support parallel processing by handing out separate portions of the elements. Compared to a plain Iterator, the added ability is this split operation, after which each portion can be processed in parallel.
The fork/join framework of Java is one of the implementations that can be combined with a Spliterator to parallelize a unit of work. This is achieved by breaking the work into a set of smaller subtasks, running them separately, and aggregating the outcomes of these subtasks to obtain the final output.
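To make the combination with fork/join more concrete, the following is a minimal sketch, not part of the sample developed later in this section. The class name SpliteratorSumTask, the parallelSum() helper, and the THRESHOLD value are assumptions chosen for illustration; only Spliterator, RecursiveTask, and ForkJoinPool come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums integers by splitting a Spliterator across fork/join subtasks.
class SpliteratorSumTask extends RecursiveTask<Long> {
    // Assumed cut-off below which a portion is summed sequentially.
    private static final long THRESHOLD = 1_000;

    private final Spliterator<Integer> source;

    SpliteratorSumTask(Spliterator<Integer> source) {
        this.source = source;
    }

    @Override
    protected Long compute() {
        // Split only while the remaining portion is still large.
        Spliterator<Integer> prefix =
                source.estimateSize() > THRESHOLD ? source.trySplit() : null;
        if (prefix == null) {
            // Small (or unsplittable) portion: sum it in the current thread.
            long[] total = {0};
            source.forEachRemaining(value -> total[0] += value);
            return total[0];
        }
        // Fork the split-off portion and keep working on the rest.
        SpliteratorSumTask forked = new SpliteratorSumTask(prefix);
        forked.fork();
        long rest = new SpliteratorSumTask(source).compute();
        // Aggregate the outcomes of the subtasks.
        return forked.join() + rest;
    }

    // Hypothetical convenience method that runs the task in the common pool.
    static long parallelSum(List<Integer> numbers) {
        return ForkJoinPool.commonPool().invoke(new SpliteratorSumTask(numbers.spliterator()));
    }

    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 1; i <= 10_000; i++) {
            numbers.add(i);
        }
        System.out.println(parallelSum(numbers)); // prints 50005000
    }
}
```

The Spliterator only decides where the data is cut; the fork/join pool decides which thread processes each portion.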

The methods that can perform the split and iteration are shown in the following code snippet:

Spliterator<T> trySplit();
boolean tryAdvance(Consumer<? super T> action);

The trySplit() method attempts to split off a portion of the remaining elements and returns that portion as a new Spliterator. It returns null if the elements cannot be split any further.
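As a small illustration of trySplit(), the sketch below splits the Spliterator of an ArrayList once and reports how many elements each part covers. The class TrySplitDemo and its splitSizes() method are hypothetical names used only for this example; how evenly a source splits depends on the implementation of its Spliterator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class TrySplitDemo {
    // Returns {size of the split-off part, size left in the original Spliterator}.
    static <T> long[] splitSizes(List<T> items) {
        Spliterator<T> rest = items.spliterator();
        Spliterator<T> prefix = rest.trySplit();
        // trySplit() may return null when the source cannot be split.
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] {prefixSize, rest.estimateSize()};
    }

    public static void main(String[] args) {
        List<String> letters =
                new ArrayList<>(Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h"));
        long[] sizes = splitSizes(letters);
        System.out.println("split-off part: " + sizes[0] + ", remaining: " + sizes[1]);
    }
}
```

After the split, the two Spliterator objects cover disjoint portions of the list, so they can be handed to different threads.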

tryAdvance() performs the iteration: if an element remains, it passes that element to the supplied action and returns true; otherwise, it returns false. Hence, the actual processing behavior needs to be supplied to this method as a Consumer.
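A short sketch of tryAdvance() used as a loop condition may help here. The class TryAdvanceDemo and the upperCaseAll() method are made-up names for illustration; the action passed to tryAdvance() is an ordinary lambda.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Spliterator;

class TryAdvanceDemo {
    // Consumes the elements one at a time and collects their upper-case form.
    static List<String> upperCaseAll(List<String> words) {
        List<String> result = new ArrayList<>();
        Spliterator<String> spliterator = words.spliterator();
        // tryAdvance() returns false as soon as no element is left.
        while (spliterator.tryAdvance(word -> result.add(word.toUpperCase(Locale.ROOT)))) {
            // the work happens inside the action, so the loop body stays empty
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(upperCaseAll(Arrays.asList("a", "b", "c"))); // prints [A, B, C]
    }
}
```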

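forEachRemaining() applies the action to all the remaining elements in a single call. Its default implementation repeatedly invokes tryAdvance(), and it is usually the most efficient way to consume everything that is left.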
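The two traversal methods can be mixed on the same Spliterator. The following sketch, with the hypothetical names ForEachRemainingDemo and sumAfterFirst(), skips the first element with tryAdvance() and then hands the rest to forEachRemaining().

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class ForEachRemainingDemo {
    // Skips the first element, then sums everything that is left.
    static int sumAfterFirst(List<Integer> values) {
        Spliterator<Integer> spliterator = values.spliterator();
        // Consume (and ignore) a single element, if there is one.
        spliterator.tryAdvance(first -> { });
        int[] sum = {0};
        // Process all remaining elements in one bulk call.
        spliterator.forEachRemaining(value -> sum[0] += value);
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(sumAfterFirst(Arrays.asList(10, 1, 2, 3))); // prints 6
    }
}
```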

Now, let's review a sample implementation of Spliterator that reads a large input file and measures the processing time.

The first step is to write a base class that implements the Spliterator interface, as shown in the following code snippet:

 

package threads;

import static java.util.Spliterators.spliterator;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

public abstract class BaseFixedBatchSpliterator<T> implements Spliterator<T> {
    private final int groupCount;
    private final int properties;
    private long estimatedSize;

    // constructor with no parameters
    public BaseFixedBatchSpliterator() {
        // set the behaviour of this component
        this(IMMUTABLE | ORDERED | NONNULL);
    }

    // constructor with single parameter properties
    public BaseFixedBatchSpliterator(int properties) {
        // default batch size of 128; Long.MAX_VALUE means the size is unknown
        this(properties, 128, Long.MAX_VALUE);
    }

    // constructor with two parameters, properties and groupCount
    public BaseFixedBatchSpliterator(int properties, int groupCount) {
        // Long.MAX_VALUE means the size is unknown
        this(properties, groupCount, Long.MAX_VALUE);
    }

    // constructor with all three parameters
    public BaseFixedBatchSpliterator(int properties, int groupCount, long estimatedSize) {
        this.properties = properties | SUBSIZED;
        this.groupCount = groupCount;
        this.estimatedSize = estimatedSize;
    }

    // holds the most recent element handed over by tryAdvance()
    static final class HandlingConsumer<T> implements Consumer<T> {
        Object obj;

        @Override
        public void accept(T obj) {
            this.obj = obj;
        }
    }

    @Override
    public int characteristics() {
        return properties;
    }

    @Override
    public long estimateSize() {
        return estimatedSize;
    }

    @Override
    public Comparator<? super T> getComparator() {
        // a null comparator means the source is SORTED in natural order
        if (hasCharacteristics(SORTED)) {
            return null;
        }
        // an unsorted source has no comparator to report
        throw new IllegalStateException();
    }

    @Override
    public Spliterator<T> trySplit() {
        final HandlingConsumer<T> holdingConsumer = new HandlingConsumer<>();
        // nothing is left to split off
        if (!tryAdvance(holdingConsumer)) {
            return null;
        }
        // copy up to groupCount elements into a fixed-size batch
        final Object[] consumerBatch = new Object[groupCount];
        int iterator = 0;
        do {
            consumerBatch[iterator] = holdingConsumer.obj;
        } while (++iterator < groupCount && tryAdvance(holdingConsumer));
        if (estimatedSize != Long.MAX_VALUE) {
            estimatedSize -= iterator;
        }
        // the batch is backed by an array, so its size is known exactly
        return spliterator(consumerBatch, 0, iterator, characteristics() | SIZED);
    }
}
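The characteristics() and getComparator() methods overridden above follow the general Spliterator contract, which can be observed on the standard collections. The sketch below uses the hypothetical class CharacteristicsDemo with two helper methods; it assumes nothing beyond java.util.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Spliterator;
import java.util.TreeSet;

class CharacteristicsDemo {
    // True if the Spliterator reports the SORTED characteristic.
    static boolean isSorted(Spliterator<?> spliterator) {
        return spliterator.hasCharacteristics(Spliterator.SORTED);
    }

    // True if getComparator() rejects the call, as it must for an unsorted source.
    static boolean comparatorThrows(Spliterator<?> spliterator) {
        try {
            spliterator.getComparator();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        TreeSet<Integer> sorted = new TreeSet<>(Arrays.asList(3, 1, 2));
        ArrayList<Integer> unsorted = new ArrayList<>(Arrays.asList(3, 1, 2));
        // A TreeSet with natural ordering is SORTED and reports a null comparator.
        System.out.println(isSorted(sorted.spliterator()));
        System.out.println(sorted.spliterator().getComparator());
        // An ArrayList is not SORTED, so asking for its comparator fails.
        System.out.println(comparatorThrows(unsorted.spliterator()));
    }
}
```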

The next step is to extend this base class with a concrete component that wraps an existing Spliterator and provides the withBatchSize() helper, as shown in the following code snippet:

package threads;

import static java.util.stream.StreamSupport.stream;

import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class SplitFixedBatchSpliterator<T> extends BaseFixedBatchSpliterator<T> {
    // the wrapped Spliterator that supplies the elements
    private final Spliterator<T> iterSpl;

    public static <T> SplitFixedBatchSpliterator<T> batchedSpliterator(
            Spliterator<T> toBundle, int groupCount) {
        return new SplitFixedBatchSpliterator<>(toBundle, groupCount);
    }

    public SplitFixedBatchSpliterator(Spliterator<T> toBundle, int groupCount) {
        super(toBundle.characteristics(), groupCount, toBundle.estimateSize());
        this.iterSpl = toBundle;
    }

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        iterSpl.forEachRemaining(action);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        return iterSpl.tryAdvance(action);
    }

    // wraps the given stream in a parallel stream that splits in fixed-size batches
    public static <T> Stream<T> withBatchSize(Stream<T> in, int groupCount) {
        // closing the returned stream also closes the source stream
        return stream(batchedSpliterator(in.spliterator(), groupCount), true)
                .onClose(in::close);
    }

}

Now it's time to write the main component that reads through the file and reviews the performance, as follows:

package threads;

import static java.util.concurrent.TimeUnit.SECONDS;
import static threads.SplitFixedBatchSpliterator.withBatchSize;

import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class SplitIteratorSample {

    // accumulates the results so that the computation is not optimized away
    static double drop;
    private static final int NUMBER_THREE = 3, NUMBER_TEN = 10;

    public static void main(String[] args) throws IOException {
        final Path inputFilePath = generateInput();
        for (int i = 0; i < NUMBER_THREE; i++) {
            System.out.println("Begin executing the jdk streams");
            monitoringExecution(Files.lines(inputFilePath));
            System.out.println("Begin executing the fixed batch streams");
            monitoringExecution(withBatchSize(Files.lines(inputFilePath), NUMBER_TEN));
        }
    }

    // writes 6000 lines, each holding the same timestamp repeated 15 times
    private static Path generateInput() throws IOException {
        final Path filePath = Paths.get("fileToRead.txt");
        try (PrintWriter pw = new PrintWriter(Files.newBufferedWriter(filePath))) {
            for (int iterC = 0; iterC < 6000; iterC++) {
                final String lineText = String.valueOf(System.nanoTime());
                for (int j = 0; j < 15; j++) {
                    pw.print(lineText);
                }
                pw.println();
            }
        }
        return filePath;
    }

    // performs CPU-heavy work on one line and returns the time spent in nanoseconds
    private static long executeStream(String stream) {
        final long localBeginTimeOfExecution = System.nanoTime();
        double dbl = 0;
        for (int iterA = 0; iterA < stream.length(); iterA++) {
            for (int iterB = 0; iterB < stream.length(); iterB++) {
                dbl += Math.pow(stream.charAt(iterA), stream.charAt(iterB) / 32.0);
            }
        }
        drop += dbl;
        return System.nanoTime() - localBeginTimeOfExecution;
    }

    private static void monitoringExecution(Stream<String> streamInput) throws IOException {
        final long beginOfExecution = System.nanoTime();
        try (Stream<String> lineOfExecutions = streamInput) {
            // total time spent in executeStream() across all threads
            final long overallTimeOfExecution = lineOfExecutions.parallel()
                    .mapToLong(SplitIteratorSample::executeStream).sum();
            final double cpuExecutionTime = overallTimeOfExecution;
            final double actualTime = System.nanoTime() - beginOfExecution;
            final int processors = Runtime.getRuntime().availableProcessors();
            System.out.println(" Processors : " + processors);
            System.out.format(" CPU Execution time : %.2f s%n",
                    cpuExecutionTime / SECONDS.toNanos(1));
            System.out.format(" Actual Execution time : %.2f s%n",
                    actualTime / SECONDS.toNanos(1));
            System.out.format("CPU utilization percentage : %.2f%%%n%n",
                    100.0 * cpuExecutionTime / actualTime / processors);
        }
    }

}

As you can see, with the fixed-batch Spliterator implementation, a roughly linear speedup of about 4x can be achieved compared to single-threaded processing. The exact factor depends on the number of available processors.
