CPUs are not getting any faster, so manufacturers are adding more cores to their processors. This means that a single-threaded application cannot leverage the parallelism offered by a multi-core processor. But how do you put those cores to work?
The concept of parallelization is based on the observation that large problems can often be divided into smaller ones that are solved "in parallel". The execution of the smaller tasks can be spread across several cores to complete the main task faster.
Concurrent programming is not easy, mostly because of synchronization issues and the pitfalls of shared data. Historically, Java has offered excellent support for multithreaded programming, partially shielding the developer from the complexity of writing code that runs many tasks in parallel.
One of the most useful algorithms for successfully leveraging multiple cores is "fork/join". A "fork/join" algorithm essentially divides a problem into many smaller subproblems and applies the same algorithm to each subproblem recursively. Once a subproblem becomes small enough, it is solved directly.
Hierarchical problems, such as sorting algorithms or file system/tree navigation, benefit greatly from "fork/join" (also known as divide and conquer).
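To make the idea concrete, here is a minimal single-threaded sketch of divide and conquer; the list-summing task and the threshold of 4 are invented for illustration and are not part of the recipe:

    def sumDC
    sumDC = { List<Integer> xs ->
        if (xs.size() <= 4) {
            return xs.sum() ?: 0    // small enough: solve directly
        }
        int mid = xs.size().intdiv(2)
        // Divide the problem, solve each half recursively, then combine.
        sumDC(xs[0..<mid]) + sumDC(xs[mid..<xs.size()])
    }
    assert sumDC((1..100).toList()) == 5050

The parallel version used in this recipe follows exactly this shape, except that the recursive calls are forked onto the threads of a pool.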
This recipe will show how to use the "fork/join" implementation of GPars to calculate the frequency of words in a large text.
Let's add a class that contains the logic to execute the frequency counting algorithm using "fork/join".
Create the class in the src/main/groovy/org/groovy/cookbook directory created in the Processing collections concurrently recipe:

    package org.groovy.cookbook

    import static groovyx.gpars.GParsPool.runForkJoin
    import static groovyx.gpars.GParsPool.withPool
    import static com.google.common.collect.Lists.*

    class WordAnalyzer {

        static final Integer THRESHOLD = 50000
        static final int MAX_THREAD = 8

        private Map calculateFrequency(List<String> words) {
            def frequencies = [:]
            words.each {
                Integer num = frequencies.get(it)
                frequencies.put(it, num ? num + 1 : 1)
            }
            frequencies
        }

        Map frequency(List<String> tokens) {
            def frequencyMap = [:]
            def maps
            withPool(MAX_THREAD) {
                maps = runForkJoin(tokens) { words ->
                    if (words.size() <= THRESHOLD) {
                        // No parallelism.
                        return calculateFrequency(words)
                    } else {
                        partition(words, THRESHOLD).each { sublist ->
                            forkOffChild(sublist)
                        }
                        // Collect all results.
                        return childrenResults
                    }
                }
            }
            // Merge the partial maps, summing the counts of words
            // that appear in more than one chunk.
            maps.each { partial ->
                partial.each { word, count ->
                    def current = frequencyMap[word]
                    frequencyMap[word] = current ? current + count : count
                }
            }
            // Reverse sort by frequency.
            frequencyMap.sort { a, b -> b.value <=> a.value }
        }
    }
Now let's create a test case to verify the algorithm, located in the src/test/groovy/org/groovy/cookbook folder:

    package org.groovy.cookbook

    import org.junit.*
    import edu.stanford.nlp.process.PTBTokenizer
    import edu.stanford.nlp.process.CoreLabelTokenFactory
    import edu.stanford.nlp.ling.CoreLabel

    class WordAnalyzerTest {

        @Test
        void testFrequency() {
            def bigText = 'http://norvig.com/big.txt'.toURL()
            def wa = new WordAnalyzer()
            def tokens = tokenize(bigText.text)
            long start = System.currentTimeMillis()
            def m = wa.frequency(tokens)
            def timeSpent = (System.currentTimeMillis() - start)
            println "Execution time: ${timeSpent}ms"
            println 'For calculating frequency over: '
            println "${tokens.size()} tokens"
            m.sort { -it.value }.each {
                if (it.value > 50) {
                    println it
                }
            }
        }

        def tokenize(String txt) {
            List<String> words = []
            PTBTokenizer ptbt = new PTBTokenizer(
                new StringReader(txt),
                new CoreLabelTokenFactory(),
                ''
            )
            ptbt.each { entry ->
                words << entry.value()
            }
            words
        }
    }
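For the class and the test to compile, GPars, Guava, Stanford CoreNLP, and JUnit must be on the classpath. If you are following along with Gradle, a plausible set of coordinates looks like the following; the versions are illustrative assumptions, not prescribed by the recipe:

    dependencies {
        compile 'org.codehaus.gpars:gpars:1.2.1'               // fork/join support
        compile 'com.google.guava:guava:18.0'                  // Lists.partition
        testCompile 'edu.stanford.nlp:stanford-corenlp:3.4.1'  // PTBTokenizer
        testCompile 'junit:junit:4.11'
    }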
Run the test; the output should be similar to the following:

    Execution time: 1007ms
    For calculating frequency over:
    1297801 tokens
    the=1799
    ,=1720
    of=1301
    .=967
    and=907
    to=745
    that=505
    a=456
    in=443
    is=436
    it=266
    not=234
    as=231
    or=204
    ...
The code in step 1 requires a bit of explanation. The core function is calculateFrequency. This function creates a Map containing the frequency of the words found in the List passed as an argument.
Once the Map is constructed from the frequency calculation, it looks as follows:

    John    | 10
    Michael | 8
    Jeff    | 4
The left column contains the words found in the analyzed document, and the right column contains the number of times each word appears.
The algorithm is not very sophisticated and can probably be improved, but this is not the focus of the recipe.
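To see the behavior in isolation, here is the same logic as a standalone closure applied to a tiny, made-up token list:

    def calculateFrequency = { List<String> words ->
        def frequencies = [:]
        words.each {
            Integer num = frequencies.get(it)
            frequencies.put(it, num ? num + 1 : 1)
        }
        frequencies
    }
    assert calculateFrequency(['to', 'be', 'or', 'not', 'to', 'be']) ==
           [to: 2, be: 2, or: 1, not: 1]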
The frequency function contains the call to the "fork/join" API. The code follows a simple pattern: if the job is small enough, execute it directly; otherwise, split the job and apply the calculateFrequency algorithm to the smaller chunks (in this case, sublists of words).
The smaller jobs are executed by concurrent threads. The number of threads can be tuned by modifying the value passed to the withPool method.
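For reference, withPool can also be called without an argument, in which case GPars sizes the pool itself based on the number of available processors. The snippet below is a small sketch showing both forms; the collectParallel call on sample data is just there to exercise the pool:

    import static groovyx.gpars.GParsPool.withPool

    // Explicit pool of eight threads, as in WordAnalyzer.
    withPool(8) {
        assert [1, 2, 3].collectParallel { it * it } == [1, 4, 9]
    }

    // No argument: GPars derives the pool size from the available processors.
    withPool {
        assert [1, 2, 3].collectParallel { it * it } == [1, 4, 9]
    }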
Things get interesting in the else branch of the condition that determines whether the job has to be divided into chunks.
The partition function is statically imported from the com.google.common.collect.Lists class, which belongs to the amazing Guava library. It slices up a list and returns consecutive sublists of the list, each of the same size (the last sublist may be smaller).
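A quick sketch of what partition returns, using made-up sample data:

    import static com.google.common.collect.Lists.partition

    def letters = ['a', 'b', 'c', 'd', 'e']
    // Consecutive chunks of size 2; the last chunk holds the remainder.
    assert partition(letters, 2) == [['a', 'b'], ['c', 'd'], ['e']]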
For each sublist, the forkOffChild method is invoked. This method returns immediately and schedules a task for execution (the task, in this case, is the calculateFrequency function).
When all the tasks have eventually executed, we can collect and return their results by calling childrenResults, which contains a list of whatever is returned by runForkJoin. The runForkJoin factory method executes the provided recursive code with the supplied values and builds a hierarchical "fork/join" calculation. The number of values passed to the runForkJoin method has to match the number of the closure's expected parameters, as well as the number of arguments passed to the forkOffChild method.
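To illustrate that contract outside the recipe, here is a minimal sketch that sums a list of numbers with the same pattern; the number range and the threshold of 1000 are arbitrary choices for the example:

    import static groovyx.gpars.GParsPool.runForkJoin
    import static groovyx.gpars.GParsPool.withPool
    import static com.google.common.collect.Lists.partition

    def total
    withPool {
        total = runForkJoin((1..10000).toList()) { list ->
            if (list.size() <= 1000) {
                return list.sum()    // small enough: compute directly
            }
            // One forkOffChild argument per closure parameter.
            partition(list, 1000).each { forkOffChild(it) }
            // Combine the partial sums produced by the children.
            return childrenResults.sum()
        }
    }
    assert total == 50005000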
The last segment of the frequency function iterates over the Maps returned by the parallel computation and merges them into a master Map, summing the counts of words that appear in more than one chunk. The result contains the full list of words, ordered by frequency.
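The merge step can be pictured with two invented partial maps:

    def merged = [:]
    def partials = [[the: 3, of: 1], [the: 2, and: 4]]
    partials.each { partial ->
        partial.each { word, count ->
            // Sum counts across chunks instead of overwriting them.
            merged[word] = (merged[word] ?: 0) + count
        }
    }
    assert merged == [the: 5, of: 1, and: 4]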
The test executes the following steps:

- Download a large text file (http://norvig.com/big.txt) and tokenize it using the Stanford NLP PTBTokenizer
- Invoke the frequency method and compute the time it took to run the frequency operation
- Print every word that appears more than 50 times in the text

The divide and conquer algorithm is approximately 25 to 50 percent faster than the equivalent single-threaded algorithm. It is necessary to fiddle around with the maximum number of threads and the optimal threshold (the size of each chunk) to achieve the best throughput. Results may vary depending on the hardware and the size of the token list.
This recipe should serve as a basis for a better understanding of when this algorithm can be put into practice.