Hadoop enables us to implement and specify custom InputFormat implementations for our MapReduce computations. Implementing a custom InputFormat gives us more control over the input data and lets us support proprietary or application-specific input data file formats as inputs to Hadoop MapReduce computations. An InputFormat implementation should extend the org.apache.hadoop.mapreduce.InputFormat<K,V> abstract class, overriding the createRecordReader() and getSplits() methods.
In this recipe, we implement an InputFormat and a RecordReader for HTTP log files. This InputFormat will generate LongWritable instances as keys and LogWritable instances as values.
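This recipe assumes a LogWritable custom data type that carries the parsed log fields. The following is a minimal sketch of what such a class could look like, assuming String/int/long field types that match the constructor call used later in the LogFileRecordReader; the exact field set and the getter are illustrative assumptions, not the recipe's actual listing.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Illustrative sketch only: a Writable carrying the parsed log fields.
public class LogWritable implements Writable {

  private Text userIP = new Text();
  private Text timestamp = new Text();
  private Text request = new Text();
  private IntWritable status = new IntWritable();
  private LongWritable bytes = new LongWritable();

  public LogWritable() {
    // No-argument constructor required for Writable deserialization.
  }

  public LogWritable(String userIP, String timestamp, String request,
      int status, long bytes) {
    this.userIP.set(userIP);
    this.timestamp.set(timestamp);
    this.request.set(request);
    this.status.set(status);
    this.bytes.set(bytes);
  }

  public void write(DataOutput out) throws IOException {
    userIP.write(out);
    timestamp.write(out);
    request.write(out);
    status.write(out);
    bytes.write(out);
  }

  public void readFields(DataInput in) throws IOException {
    userIP.readFields(in);
    timestamp.readFields(in);
    request.readFields(in);
    status.readFields(in);
    bytes.readFields(in);
  }

  public IntWritable getStatus() {
    return status;
  }
  // ... similar getters for the remaining fields
}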
The following are the steps to implement a custom InputFormat for the HTTP server log files based on the FileInputFormat.
1. LogFileInputFormat operates on the data in HDFS files. Hence, we implement LogFileInputFormat by extending the FileInputFormat:

public class LogFileInputFormat
    extends FileInputFormat<LongWritable, LogWritable> {

  public RecordReader<LongWritable, LogWritable> createRecordReader(
      InputSplit split, TaskAttemptContext attempt)
      throws IOException, InterruptedException {
    return new LogFileRecordReader();
  }
}
2. Implement the LogFileRecordReader class:

public class LogFileRecordReader
    extends RecordReader<LongWritable, LogWritable> {

  private LineRecordReader lineReader;
  private LogWritable value;

  public void initialize(InputSplit inputSplit, TaskAttemptContext attempt)
      throws IOException, InterruptedException {
    lineReader = new LineRecordReader();
    lineReader.initialize(inputSplit, attempt);
  }

  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!lineReader.nextKeyValue()) {
      return false;
    }
    String line = lineReader.getCurrentValue().toString();
    // ...... extract the fields from 'line' using a regex ......
    value = new LogWritable(userIP, timestamp, request, status, bytes);
    return true;
  }

  public LongWritable getCurrentKey() throws IOException, InterruptedException {
    return lineReader.getCurrentKey();
  }

  public LogWritable getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  public float getProgress() throws IOException, InterruptedException {
    return lineReader.getProgress();
  }

  public void close() throws IOException {
    lineReader.close();
  }
}
3. Set LogFileInputFormat as the InputFormat for the MapReduce computation using the Job object as follows. Specify the input paths for the computation using the underlying FileInputFormat:

Configuration conf = new Configuration();
Job job = new Job(conf, "log-analysis");
// ……
job.setInputFormatClass(LogFileInputFormat.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));
4. Use LongWritable as the input key type and LogWritable as the input value type in the Mapper of your computation:

public class LogProcessorMap
    extends Mapper<LongWritable, LogWritable, Text, IntWritable> {

  public void map(LongWritable key, LogWritable value, Context context)
      throws IOException, InterruptedException {
    // ………
  }
}
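As a usage illustration only, the map() function could count requests per HTTP status code. The following members would go inside the LogProcessorMap class shown above; the getStatus() getter on LogWritable is a hypothetical accessor (see the LogWritable sketch earlier in this recipe):

private static final IntWritable ONE = new IntWritable(1);
private Text statusCode = new Text();

public void map(LongWritable key, LogWritable value, Context context)
    throws IOException, InterruptedException {
  // Emit a (status code, 1) pair for each log entry.
  statusCode.set(value.getStatus().toString());
  context.write(statusCode, ONE);
}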
LogFileInputFormat extends FileInputFormat, which provides a generic splitting mechanism for HDFS-file-based InputFormat implementations. We override the createRecordReader() method in LogFileInputFormat to provide an instance of our custom RecordReader implementation, LogFileRecordReader. Optionally, we can also override the isSplitable() method of FileInputFormat to control whether the input files are split up into logical partitions or used as whole files.

public RecordReader<LongWritable, LogWritable> createRecordReader(
    InputSplit split, TaskAttemptContext attempt)
    throws IOException, InterruptedException {
  return new LogFileRecordReader();
}
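As a minimal sketch, an isSplitable() override inside LogFileInputFormat that processes each log file as a single unsplit whole (useful, for example, if log records could span split boundaries) would look like this:

@Override
protected boolean isSplitable(JobContext context, Path file) {
  // Treat every input file as one logical partition (one Map task per file).
  return false;
}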
The LogFileRecordReader class extends the org.apache.hadoop.mapreduce.RecordReader<K,V> abstract class and uses LineRecordReader internally to perform the basic parsing of the input data. LineRecordReader reads lines of text from the input data.

lineReader = new LineRecordReader();
lineReader.initialize(inputSplit, attempt);
We perform the custom parsing of the log entries of the input data in the nextKeyValue() method. We use a regular expression to extract the fields out of the HTTP service log entry and populate an instance of the LogWritable class using those fields.

public boolean nextKeyValue() throws IOException, InterruptedException {
  if (!lineReader.nextKeyValue()) {
    return false;
  }
  String line = lineReader.getCurrentValue().toString();
  // ...... extract the fields from 'line' using a regex ......
  value = new LogWritable(userIP, timestamp, request, status, bytes);
  return true;
}
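The recipe elides the actual field-extraction code. Purely as an illustration, parsing an entry in the Apache Common Log Format could be sketched as follows; the pattern, the field order, and the field types are assumptions about the log layout, not the book's actual regular expression:

// A field on LogFileRecordReader; assumed layout:
// host identity user [timestamp] "request" status bytes
private static final Pattern LOG_PATTERN = Pattern.compile(
    "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"([^\"]*)\" (\\d{3}) (\\S+)");

// Inside nextKeyValue(), replacing the elided section:
Matcher matcher = LOG_PATTERN.matcher(line);
if (matcher.matches()) {
  String userIP = matcher.group(1);
  String timestamp = matcher.group(2);
  String request = matcher.group(3);
  int status = Integer.parseInt(matcher.group(4));
  // Common Log Format uses '-' when no bytes were returned.
  long bytes = "-".equals(matcher.group(5))
      ? 0L : Long.parseLong(matcher.group(5));
  value = new LogWritable(userIP, timestamp, request, status, bytes);
}
// Malformed lines could be skipped or counted here.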
We can perform custom splitting of input data by overriding the getSplits() method of the InputFormat class. The getSplits() method should return a list of InputSplit objects. An InputSplit object represents a logical partition of the input data and will be assigned to a single Map task. Custom InputSplit classes extend the InputSplit abstract class and should override the getLocations() and getLength() methods. The getLength() method should provide the length of the split, and the getLocations() method should provide a list of nodes where the data represented by this split is physically stored. Hadoop uses this list of data-local nodes for Map task scheduling. The FileInputFormat we use in the above example uses org.apache.hadoop.mapreduce.lib.input.FileSplit as the InputSplit implementation.
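For reference, a minimal custom InputSplit could be sketched as follows. The class name and fields are hypothetical, and this is not part of the recipe; split implementations are typically also made Writable so that the framework can serialize them to the tasks.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

// Hypothetical sketch of a custom InputSplit.
public class LogFileSplit extends InputSplit implements Writable {

  private String[] hosts;  // nodes where this split's data resides
  private long length;     // size of the split in bytes

  public LogFileSplit() {
    // No-argument constructor required for deserialization.
  }

  public LogFileSplit(String[] hosts, long length) {
    this.hosts = hosts;
    this.length = length;
  }

  @Override
  public long getLength() throws IOException, InterruptedException {
    return length;
  }

  @Override
  public String[] getLocations() throws IOException, InterruptedException {
    // Hadoop prefers these nodes when scheduling the Map task.
    return hosts;
  }

  public void write(DataOutput out) throws IOException {
    out.writeLong(length);
    out.writeInt(hosts.length);
    for (String host : hosts) {
      out.writeUTF(host);
    }
  }

  public void readFields(DataInput in) throws IOException {
    length = in.readLong();
    int count = in.readInt();
    hosts = new String[count];
    for (int i = 0; i < count; i++) {
      hosts[i] = in.readUTF();
    }
  }
}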
You can write InputFormat implementations for non-HDFS data as well. The org.apache.hadoop.mapreduce.lib.db.DBInputFormat is one example of such an InputFormat. DBInputFormat supports reading the input data from a SQL table.
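For instance, configuring a computation to read its input from a database table with DBInputFormat could be sketched as follows. The connection parameters, the 'employees' table, and the EmployeeRecord class (which would implement Writable and DBWritable) are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

Configuration conf = new Configuration();
Job job = new Job(conf, "db-input-example");
job.setInputFormatClass(DBInputFormat.class);

// JDBC connection parameters (assumed values).
DBConfiguration.configureDB(job.getConfiguration(),
    "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/logdb",
    "user", "password");

// Read the listed columns of the hypothetical 'employees' table;
// DBInputFormat generates LongWritable keys and EmployeeRecord values.
DBInputFormat.setInput(job, EmployeeRecord.class,
    "employees",            // table name
    null,                   // optional WHERE conditions
    "id",                   // ORDER BY column
    "id", "name", "dept");  // columns to read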