Map-reduce is a model for processing large sets of data in a parallel, distributed manner. The model consists of a map method for filtering and sorting data and a reduce method for summarizing that data. The map-reduce framework is effective because it distributes the processing of a dataset across multiple servers, performing mapping and reduction simultaneously on smaller pieces of the data. Map-reduce provides significant performance improvements when implemented in a multi-threaded manner. In this section, we will demonstrate the technique using Apache's Hadoop implementation. In the Using Java 8 to perform map-reduce section, we will discuss techniques for performing map-reduce using Java 8 streams.
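As a quick preview of that later section, the following minimal, single-machine sketch of our own illustrates the idea: each record is mapped to its page count, and the counts are then reduced to an average. It uses the same tab-delimited record format we adopt below; the class name StreamAverage is simply our own choice:

import java.util.Arrays;
import java.util.List;

public class StreamAverage {
    public static void main(String[] args) {
        // Two sample records in the tab-delimited format used later in this section
        List<String> books = Arrays.asList(
                "Moby Dick\tHerman Melville\t822",
                "Charlotte's Web\tE.B. White\t189");

        // Map: extract the page-count field; Reduce: average the counts
        double average = books.stream()
                .map(line -> line.split("\t")[2])
                .mapToInt(Integer::parseInt)
                .average()
                .orElse(0);

        System.out.println("Average Page Count = " + average);
    }
}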
Hadoop is a software ecosystem providing support for parallel computing. Map-reduce jobs can be run on Hadoop servers, generally set up as clusters, to significantly improve processing speeds. Hadoop has trackers that run map-reduce operations on nodes within a Hadoop cluster. Each node operates independently and the trackers monitor the progress and integrate the output of each node to generate the final output. The following image can be found at http://www.developer.com/java/data/big-data-tool-map-reduce.html and demonstrates the basic map-reduce model with trackers.
We are going to show you a very simple example of a map-reduce application here. Before we can use Hadoop, we need to download and extract the Hadoop application files. The latest versions can be found at http://hadoop.apache.org/releases.html. We are using version 2.7.3 for this demonstration.
You will need to set your JAVA_HOME environment variable. Additionally, Hadoop is intolerant of long file paths and spaces within paths, so make sure you extract Hadoop to the simplest directory structure possible.
We will be working with a sample text file containing information about books. Each line of our tab-delimited file has the book title, author, and page count:
Moby Dick	Herman Melville	822
Charlotte's Web	E.B. White	189
The Grapes of Wrath	John Steinbeck	212
Jane Eyre	Charlotte Bronte	299
A Tale of Two Cities	Charles Dickens	673
War and Peace	Leo Tolstoy	1032
The Great Gatsby	F. Scott Fitzgerald	275
We are going to use a map function to extract the title and page count information and then a reduce function to calculate the average page count of the books in our dataset. To begin, create a new class, AveragePageCount. We will create two static classes within AveragePageCount, one to handle the map procedure and one to handle the reduction.
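Before filling in the two static classes, it is worth seeing the imports the finished class will need. The following skeleton assumes the standard Hadoop 2.x org.apache.hadoop.mapreduce API, which is what we use throughout this example:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AveragePageCount {
    // TextMapper, AverageReduce, and main are developed in the pages that follow
}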
First, we will create the TextMapper class, which will implement the map method. This class inherits from the Mapper class; its four type parameters specify the input key, input value, output key, and output value types, respectively. TextMapper has two private instance variables, pages and bookTitle. pages is an IntWritable object and bookTitle is a Text object. IntWritable and Text are used because these objects must be serialized to a byte stream before they can be transmitted to the servers for processing; these writable objects serialize to a compact form and transfer faster than comparable int or String values would:
public static class TextMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final IntWritable pages = new IntWritable();
    private final Text bookTitle = new Text();
}
Within our TextMapper class we create the map method. This method takes three parameters: the key object, a Text object, bookInfo, and the Context. The key allows the tracker to map each particular object back to the correct job. The bookInfo object contains the text or string data about each book. The Context holds information about the entire system and allows the method to report on progress and update values within the system.
Within the map method, we use the split method to break each piece of book information into an array of String objects. Because our file is tab-delimited, we split on the tab character. We set our bookTitle variable to position 0 of the array and set pages to the value stored in position 2, after parsing it as an integer. We can then write out our book title and page count information through the context and update our entire system:
public void map(Object key, Text bookInfo, Context context)
        throws IOException, InterruptedException {
    // Each line holds title, author, and page count, separated by tabs
    String[] book = bookInfo.toString().split("\t");
    bookTitle.set(book[0]);
    pages.set(Integer.parseInt(book[2]));
    context.write(bookTitle, pages);
}
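Our sample file is clean, but real-world input rarely is. As a purely optional variation of our own (not part of the example above), the same method could skip records that are missing fields or that contain a non-numeric page count:

public void map(Object key, Text bookInfo, Context context)
        throws IOException, InterruptedException {
    String[] book = bookInfo.toString().split("\t");
    // Skip lines that do not have title, author, and page count fields
    if (book.length < 3) {
        return;
    }
    try {
        pages.set(Integer.parseInt(book[2].trim()));
    } catch (NumberFormatException e) {
        return; // Skip lines whose page count is not a valid integer
    }
    bookTitle.set(book[0]);
    context.write(bookTitle, pages);
}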
Next, we will write our AverageReduce class. This class extends the Reducer class and will perform the reduction processing to calculate our average page count. We have created four variables for this class: a FloatWritable object, finalAvg, to store our average page count for output; a Float, average, to hold our running average; a Float, count, to count how many books have been processed; and an int, sum, to add up the page counts:
public static class AverageReduce extends Reducer<Text, IntWritable, Text, FloatWritable> {
    private final FloatWritable finalAvg = new FloatWritable();
    private Float average = 0f;
    private Float count = 0f;
    private int sum = 0;
}
Within our AverageReduce class we will create the reduce method. This method takes as input a Text key, an Iterable object holding writable integers representing the page counts, and the Context. We use our iterator to process the page counts and add each to our sum. We then calculate the average and set the value of finalAvg. This information is paired with a Text object label and written to the Context:
public void reduce(Text key, Iterable<IntWritable> pageCnts, Context context)
        throws IOException, InterruptedException {
    for (IntWritable cnt : pageCnts) {
        sum += cnt.get();
    }
    count += 1;
    average = sum / count;
    finalAvg.set(average);
    context.write(new Text("Average Page Count = "), finalAvg);
}
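Note that reduce is called once per key, so our running average is recomputed and written for every book title, as we will see in the output. If we only wanted a single final figure, one alternative (a sketch of our own, not the approach used in this example) is to accumulate silently in reduce and override the Reducer's cleanup method, which Hadoop calls once after the last key in a task has been processed:

public static class AverageReduce extends Reducer<Text, IntWritable, Text, FloatWritable> {
    private final FloatWritable finalAvg = new FloatWritable();
    private int sum = 0;
    private int count = 0;

    @Override
    public void reduce(Text key, Iterable<IntWritable> pageCnts, Context context) {
        // Accumulate across all keys; nothing is written yet
        for (IntWritable cnt : pageCnts) {
            sum += cnt.get();
            count++;
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Runs once per reduce task, after the final key has been processed
        finalAvg.set((float) sum / count);
        context.write(new Text("Average Page Count = "), finalAvg);
    }
}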
We are now ready to create our main method in the same class and execute our map-reduce process. To do this, we need to create a new Configuration object and a new Job. We then set up the significant classes used by our application:
public static void main(String[] args) throws Exception {
    Configuration con = new Configuration();
    Job bookJob = Job.getInstance(con, "Average Page Count");
    ...
}
We set our main class, AveragePageCount, in the setJarByClass method. We specify our TextMapper and AverageReduce classes using the setMapperClass and setReducerClass methods, respectively. We also declare the output types of each stage: the mapper emits a text key with a writable integer, which we register with the setMapOutputKeyClass and setMapOutputValueClass methods, while the job's final output pairs a text key with a writable float, which we register with the setOutputKeyClass and setOutputValueClass methods:
bookJob.setJarByClass(AveragePageCount.class);
bookJob.setMapperClass(TextMapper.class);
bookJob.setReducerClass(AverageReduce.class);
bookJob.setMapOutputKeyClass(Text.class);
bookJob.setMapOutputValueClass(IntWritable.class);
bookJob.setOutputKeyClass(Text.class);
bookJob.setOutputValueClass(FloatWritable.class);
Finally, we register our input and output locations using the addInputPath and setOutputPath methods. These static methods both take our Job object as the first parameter and a Path object, representing our input file or output directory, as the second parameter. We then call waitForCompletion. Our application exits once this call returns true:
FileInputFormat.addInputPath(bookJob, new Path("C:/Hadoop/books.txt"));
FileOutputFormat.setOutputPath(bookJob, new Path("C:/Hadoop/BookOutput"));
if (bookJob.waitForCompletion(true)) {
    System.exit(0);
}
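One caution: Hadoop will refuse to start a job whose output directory already exists, failing with a FileAlreadyExistsException. When re-running the example, either delete C:/Hadoop/BookOutput by hand or clear it programmatically before calling waitForCompletion. A small sketch of the programmatic approach, which additionally requires importing org.apache.hadoop.fs.FileSystem:

// Remove a leftover output directory so the job can be re-run
FileSystem fs = FileSystem.get(con);
Path outputDir = new Path("C:/Hadoop/BookOutput");
if (fs.exists(outputDir)) {
    fs.delete(outputDir, true); // true deletes the directory recursively
}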
To execute the application, open a command prompt and navigate to the directory containing our AveragePageCount.class file. We then use the following command to execute our sample application:
hadoop AveragePageCount
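This assumes the hadoop launcher can already find the compiled class on its classpath. If it cannot, the pattern used in the standard Hadoop tutorials is to package the classes into a JAR and launch it with hadoop jar; the JAR name apc.jar below is simply our own choice:

jar cf apc.jar AveragePageCount*.class
hadoop jar apc.jar AveragePageCount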
While our task is running, updated information about the job's progress is output to the screen. A sample of this output follows:
...
File System Counters
	FILE: Number of bytes read=1132
	FILE: Number of bytes written=569686
	FILE: Number of read operations=0
	FILE: Number of large read operations=0
	FILE: Number of write operations=0
Map-Reduce Framework
	Map input records=7
	Map output records=7
	Map output bytes=136
	Map output materialized bytes=156
	Input split bytes=90
	Combine input records=0
	Combine output records=0
	Reduce input groups=7
	Reduce shuffle bytes=156
	Reduce input records=7
	Reduce output records=7
	Spilled Records=14
	Shuffled Maps =1
	Failed Shuffles=0
	Merged Map outputs=1
	GC time elapsed (ms)=11
	Total committed heap usage (bytes)=536870912
Shuffle Errors
	BAD_ID=0
	CONNECTION=0
	IO_ERROR=0
	WRONG_LENGTH=0
	WRONG_MAP=0
	WRONG_REDUCE=0
File Input Format Counters
	Bytes Read=249
File Output Format Counters
	Bytes Written=216
If we open the BookOutput directory created on our local machine, we find four new files. Use a text editor to open part-r-00000. This file contains information about the average page count as it was calculated using parallel processes. A sample of this output follows:
Average Page Count = 673.0
Average Page Count = 431.0
Average Page Count = 387.0
Average Page Count = 495.75
Average Page Count = 439.0
Average Page Count = 411.66666
Average Page Count = 500.2857
Notice how the average changes as each reduce call is combined with the previous ones: the book titles arrive at the reducer sorted by key, and because our running sum and count live in instance variables, each output line reflects the average of all books processed so far. This has the same effect as calculating the average of the first two books, then adding in the third book, then the fourth, and so on. For example, the fourth line, 495.75, is (673 + 189 + 299 + 822) / 4, the average of the first four titles in key order. The advantage, of course, is that the averaging is done in a parallel manner; if we had a huge dataset, we should expect to see a noticeable advantage in execution time. The last line of BookOutput reflects the correct and final average of all seven page counts.