Adding support for new input data formats – implementing a custom InputFormat

Hadoop allows us to implement and specify custom InputFormat implementations for our MapReduce computations. Implementing a custom InputFormat gives us more control over the input data and lets us support proprietary or application-specific input data file formats as inputs to Hadoop MapReduce computations. An InputFormat implementation should extend the org.apache.hadoop.mapreduce.InputFormat<K,V> abstract class, overriding the createRecordReader() and getSplits() methods.

In this recipe, we implement an InputFormat and a RecordReader for HTTP server log files. This InputFormat generates LongWritable instances as keys and LogWritable instances as values.

How to do it...

The following steps show how to implement a custom InputFormat for HTTP server log files based on the FileInputFormat.

  1. LogFileInputFormat operates on data stored in HDFS files. Hence, we implement LogFileInputFormat by extending FileInputFormat.
    public class LogFileInputFormat extends FileInputFormat<LongWritable, LogWritable> {
    
      public RecordReader<LongWritable, LogWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext attempt) throws IOException, InterruptedException {
        return new LogFileRecordReader();
      }
    
    }
  2. Implement the LogFileRecordReader class.
    public class LogFileRecordReader extends RecordReader<LongWritable, LogWritable> {
    
      LineRecordReader lineReader;
      LogWritable value;
      
      public void initialize(InputSplit inputSplit, TaskAttemptContext attempt) throws IOException, InterruptedException {
        lineReader = new LineRecordReader();
        lineReader.initialize(inputSplit, attempt);
      }
    
      public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!lineReader.nextKeyValue())
          return false;
        
        String line = lineReader.getCurrentValue().toString();
        // Extract the fields from 'line' using a regular expression
        
        value = new LogWritable(userIP, timestamp, request,
            status, bytes);
        return true;
      }
      
      public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return lineReader.getCurrentKey();
      }
    
      public LogWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
      }
    
      public float getProgress() throws IOException, InterruptedException {
        return lineReader.getProgress();
      }
      
      public void close() throws IOException {
        lineReader.close();
      }
    }
  3. Specify LogFileInputFormat as the InputFormat for the MapReduce computation using the Job object as follows. Specify the input paths for the computation using the underlying FileInputFormat.
    Configuration conf = new Configuration();
    Job job = new Job(conf, "log-analysis");
    ……
    job.setInputFormatClass(LogFileInputFormat.class);  
    FileInputFormat.setInputPaths(job, new Path(inputPath));
  4. Make sure the mappers of the computation use LongWritable as the input key type and LogWritable as the input value type. A minimal sketch of a possible LogWritable implementation is shown after this list.
    public class LogProcessorMap extends Mapper<LongWritable, LogWritable, Text, IntWritable> {
    
      public void map(LongWritable key, LogWritable value, Context context) throws IOException, InterruptedException {
        ………
      }
    }
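
The LogWritable class used above is a custom Hadoop Writable data type that holds the fields of a single HTTP log entry. The following is a minimal sketch of what such a class might look like; the field names and types (Text for userIP, timestamp, and request, and IntWritable for status and bytes) are assumptions made for this illustration, and your own LogWritable implementation may differ.

  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.Writable;

  // Minimal sketch: one HTTP log entry as a Hadoop value type.
  public class LogWritable implements Writable {

    private Text userIP = new Text();
    private Text timestamp = new Text();
    private Text request = new Text();
    private IntWritable status = new IntWritable();
    private IntWritable bytes = new IntWritable();

    public LogWritable() {
    }

    public LogWritable(Text userIP, Text timestamp, Text request,
        IntWritable status, IntWritable bytes) {
      this.userIP = userIP;
      this.timestamp = timestamp;
      this.request = request;
      this.status = status;
      this.bytes = bytes;
    }

    // Serialize the fields in a fixed order.
    public void write(DataOutput out) throws IOException {
      userIP.write(out);
      timestamp.write(out);
      request.write(out);
      status.write(out);
      bytes.write(out);
    }

    // Deserialize the fields in the same order they were written.
    public void readFields(DataInput in) throws IOException {
      userIP.readFields(in);
      timestamp.readFields(in);
      request.readFields(in);
      status.readFields(in);
      bytes.readFields(in);
    }

    public Text getUserIP() { return userIP; }
    public Text getTimestamp() { return timestamp; }
    public Text getRequest() { return request; }
    public IntWritable getStatus() { return status; }
    public IntWritable getBytes() { return bytes; }
  }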

How it works...

LogFileInputFormat extends FileInputFormat, which provides a generic splitting mechanism for HDFS file-based InputFormat implementations. We override the createRecordReader() method in LogFileInputFormat to provide an instance of our custom RecordReader implementation, LogFileRecordReader. Optionally, we can also override the isSplitable() method of FileInputFormat to control whether the input files are split up into logical partitions or used as whole files.

public RecordReader<LongWritable, LogWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext attempt) throws IOException, InterruptedException {
    return new LogFileRecordReader();
}
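
For example, the following is a minimal sketch of such an isSplitable() override added to LogFileInputFormat, returning false so that each log file is processed as a whole by a single Map task (useful when, for instance, the log files are compressed with a non-splittable codec). This is an optional illustration, not part of the recipe's implementation, and it requires importing org.apache.hadoop.fs.Path and org.apache.hadoop.mapreduce.JobContext.

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // Treat each input file as a single, unsplittable logical partition.
    return false;
  }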

The LogFileRecordReader class extends the org.apache.hadoop.mapreduce.RecordReader<K,V> abstract class and uses LineRecordReader internally to perform the basic parsing of the input data. LineRecordReader reads lines of text from the input data.

    lineReader = new LineRecordReader();
    lineReader.initialize(inputSplit, attempt);   

We perform the custom parsing of the log entries of the input data in the nextKeyValue() method. We use a regular expression to extract the fields out of the HTTP server log entry and populate an instance of the LogWritable class using those fields.

  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!lineReader.nextKeyValue())
      return false;
    
    String line = lineReader.getCurrentValue().toString();
    // Extract the fields from 'line' using a regular expression
    
    value = new LogWritable(userIP, timestamp, request, status, bytes);
    return true;
  }
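
As an illustration of the elided field extraction, the following sketch assumes the input follows the Apache common log format (for example, 127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326) and that java.util.regex.Pattern and Matcher are imported. The pattern, group indexes, and field types are assumptions for illustration and would need to be adapted to your actual log format and LogWritable implementation.

  // Hypothetical parsing of an Apache common log format entry.
  Pattern logPattern = Pattern.compile(
      "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"([^\"]*)\" (\\d{3}) (\\S+)");
  Matcher matcher = logPattern.matcher(line);
  if (!matcher.matches()) {
    // Skip malformed lines by moving on to the next record.
    return nextKeyValue();
  }
  Text userIP = new Text(matcher.group(1));
  Text timestamp = new Text(matcher.group(4));
  Text request = new Text(matcher.group(5));
  IntWritable status = new IntWritable(Integer.parseInt(matcher.group(6)));
  // The response size field may be "-" when no bytes were returned.
  String size = matcher.group(7);
  IntWritable bytes = new IntWritable("-".equals(size) ? 0 : Integer.parseInt(size));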

There's more...

We can perform custom splitting of the input data by overriding the getSplits() method of the InputFormat class. The getSplits() method should return a list of InputSplit objects. An InputSplit object represents a logical partition of the input data and will be assigned to a single Map task. Custom InputSplit classes extend the InputSplit abstract class and should override the getLocations() and getLength() methods. The getLength() method should provide the length of the split, and the getLocations() method should provide a list of nodes where the data represented by the split is physically stored. Hadoop uses this list of data-local nodes for Map task scheduling. The FileInputFormat we use in the above example uses org.apache.hadoop.mapreduce.lib.input.FileSplit as its InputSplit implementation.
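
The following is a minimal, purely illustrative sketch of such a custom InputSplit; the hypothetical LogFileSplit name and its fields are assumptions for this example. Concrete InputSplit classes are typically also made Writable so that the framework can serialize them and ship them to the Map tasks.

  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;

  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.Writable;
  import org.apache.hadoop.mapreduce.InputSplit;

  // Illustrative split: a logical partition identified by a name, its length
  // in bytes, and the hosts where the underlying data is stored.
  public class LogFileSplit extends InputSplit implements Writable {

    private String fileName = "";
    private long length = 0;
    private String[] hosts = new String[0];

    public LogFileSplit() {
    }

    public LogFileSplit(String fileName, long length, String[] hosts) {
      this.fileName = fileName;
      this.length = length;
      this.hosts = hosts;
    }

    // Length of the split; used by the framework when scheduling Map tasks.
    @Override
    public long getLength() throws IOException, InterruptedException {
      return length;
    }

    // Nodes where the split's data resides; used for data-local scheduling.
    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      return hosts;
    }

    public void write(DataOutput out) throws IOException {
      new Text(fileName).write(out);
      out.writeLong(length);
      out.writeInt(hosts.length);
      for (String host : hosts) {
        new Text(host).write(out);
      }
    }

    public void readFields(DataInput in) throws IOException {
      Text text = new Text();
      text.readFields(in);
      fileName = text.toString();
      length = in.readLong();
      hosts = new String[in.readInt()];
      for (int i = 0; i < hosts.length; i++) {
        text.readFields(in);
        hosts[i] = text.toString();
      }
    }
  }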

You can write InputFormat implementations for non-HDFS data as well. The org.apache.hadoop.mapreduce.lib.db.DBInputFormat is one such example: DBInputFormat supports reading the input data from a SQL table.
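
The following is a rough sketch of how DBInputFormat is typically configured; the JDBC driver, connection URL, credentials, the weblogs table, its column names, and the LogRecord value class (which would have to implement both Writable and DBWritable) are all placeholders for illustration and not part of this recipe.

  // Configure the JDBC connection (placeholder driver, URL, and credentials).
  DBConfiguration.configureDB(job.getConfiguration(),
      "com.mysql.jdbc.Driver", "jdbc:mysql://dbhost/logs", "user", "password");

  // Read rows of the hypothetical 'weblogs' table into LogRecord value objects.
  DBInputFormat.setInput(job, LogRecord.class, "weblogs",
      null /* conditions */, "timestamp" /* orderBy */,
      "userIP", "timestamp", "request", "status", "bytes");

  job.setInputFormatClass(DBInputFormat.class);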

See also

  • Choosing a suitable Hadoop InputFormat for your input data format