When reading input or writing output in a MapReduce application, it is sometimes easier to work with data using abstract types rather than the primitive Hadoop Writable classes (for example, Text and IntWritable). This recipe demonstrates how to create custom Hadoop Writable and InputFormat classes that can be used by MapReduce applications.
You will need to download the Nigeria_ACLED_cleaned.tsv dataset from http://www.packtpub.com/support and place the file into HDFS.
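If you prefer to load the file programmatically, the following is a minimal sketch using the Hadoop FileSystem API. It assumes the cluster configuration is on the classpath, and the /input/acled target directory is a made-up example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyToHdfs {
    public static void main(String[] args) throws Exception {
        // Picks up fs.defaultFS from the cluster configuration on the classpath
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Copy the local file into HDFS; /input/acled is a hypothetical target directory
        fs.copyFromLocalFile(new Path("Nigeria_ACLED_cleaned.tsv"),
                new Path("/input/acled/Nigeria_ACLED_cleaned.tsv"));
        fs.close();
    }
}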
Follow these steps to create custom InputFormat and Writable classes:
1. First, define two custom WritableComparable classes. These classes represent the key-value pairs that are passed to the mapper, much like how TextInputFormat passes LongWritable and Text to the mapper. Write the key class:
public class GeoKey implements WritableComparable {
    private Text location;
    private FloatWritable latitude;
    private FloatWritable longitude;

    public GeoKey() {
        location = null;
        latitude = null;
        longitude = null;
    }

    public GeoKey(Text location, FloatWritable latitude, FloatWritable longitude) {
        this.location = location;
        this.latitude = latitude;
        this.longitude = longitude;
    }

    //...getters and setters

    public void readFields(DataInput di) throws IOException {
        if (location == null) {
            location = new Text();
        }
        if (latitude == null) {
            latitude = new FloatWritable();
        }
        if (longitude == null) {
            longitude = new FloatWritable();
        }
        location.readFields(di);
        latitude.readFields(di);
        longitude.readFields(di);
    }

    // Required by the Writable contract: serialize the fields in the
    // same order that readFields() consumes them
    public void write(DataOutput d) throws IOException {
        location.write(d);
        latitude.write(d);
        longitude.write(d);
    }

    public int compareTo(Object o) {
        GeoKey other = (GeoKey) o;
        int cmp = location.compareTo(other.location);
        if (cmp != 0) {
            return cmp;
        }
        cmp = latitude.compareTo(other.latitude);
        if (cmp != 0) {
            return cmp;
        }
        return longitude.compareTo(other.longitude);
    }
}

(A caveat about using GeoKey as a map output key follows these steps.)
2. Write the value class:

public class GeoValue implements WritableComparable {
    private Text eventDate;
    private Text eventType;
    private Text actor;
    private Text source;
    private IntWritable fatalities;

    public GeoValue() {
        eventDate = null;
        eventType = null;
        actor = null;
        source = null;
        fatalities = null;
    }

    //...getters and setters

    public void write(DataOutput d) throws IOException {
        eventDate.write(d);
        eventType.write(d);
        actor.write(d);
        source.write(d);
        fatalities.write(d);
    }

    public void readFields(DataInput di) throws IOException {
        if (eventDate == null) {
            eventDate = new Text();
        }
        if (eventType == null) {
            eventType = new Text();
        }
        if (actor == null) {
            actor = new Text();
        }
        if (source == null) {
            source = new Text();
        }
        if (fatalities == null) {
            fatalities = new IntWritable();
        }
        eventDate.readFields(di);
        eventType.readFields(di);
        actor.readFields(di);
        source.readFields(di);
        fatalities.readFields(di);
    }

    public int compareTo(Object o) {
        GeoValue other = (GeoValue) o;
        int cmp = eventDate.compareTo(other.eventDate);
        if (cmp != 0) {
            return cmp;
        }
        cmp = eventType.compareTo(other.eventType);
        if (cmp != 0) {
            return cmp;
        }
        cmp = actor.compareTo(other.actor);
        if (cmp != 0) {
            return cmp;
        }
        cmp = source.compareTo(other.source);
        if (cmp != 0) {
            return cmp;
        }
        return fatalities.compareTo(other.fatalities);
    }
}
3. Create an InputFormat to read GeoKey and GeoValue instances. This input format extends the Hadoop FileInputFormat class and returns our own implementation of a RecordReader:

public class GeoInputFormat extends FileInputFormat<GeoKey, GeoValue> {

    @Override
    public RecordReader<GeoKey, GeoValue> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new GeoRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Only split files that no compression codec applies to
        CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        return codec == null;
    }
}
4. Create a RecordReader to emit key-value pairs read from the Nigeria_ACLED_cleaned.tsv dataset:

public class GeoRecordReader extends RecordReader<GeoKey, GeoValue> {

    private GeoKey key;
    private GeoValue value;
    private LineRecordReader reader = new LineRecordReader();

    @Override
    public void initialize(InputSplit is, TaskAttemptContext tac) throws IOException, InterruptedException {
        reader.initialize(is, tac);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        boolean gotNextKeyValue = reader.nextKeyValue();
        if (gotNextKeyValue) {
            if (key == null) {
                key = new GeoKey();
            }
            if (value == null) {
                value = new GeoValue();
            }
            Text line = reader.getCurrentValue();
            // The dataset is tab-separated
            String[] tokens = line.toString().split("\t");
            key.setLocation(new Text(tokens[0]));
            key.setLatitude(new FloatWritable(Float.parseFloat(tokens[4])));
            key.setLongitude(new FloatWritable(Float.parseFloat(tokens[5])));
            value.setActor(new Text(tokens[3]));
            value.setEventDate(new Text(tokens[1]));
            value.setEventType(new Text(tokens[2]));
            try {
                value.setFatalities(new IntWritable(Integer.parseInt(tokens[7])));
            } catch (NumberFormatException ex) {
                // Default to zero fatalities when the field is missing or malformed
                value.setFatalities(new IntWritable(0));
            }
            value.setSource(new Text(tokens[6]));
        } else {
            key = null;
            value = null;
        }
        return gotNextKeyValue;
    }

    @Override
    public GeoKey getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public GeoValue getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return reader.getProgress();
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}
5. Finally, create a MapReduce application to test the new classes:

public class GeoFilter extends Configured implements Tool {

    public static class GeoFilterMapper extends Mapper<GeoKey, GeoValue, Text, IntWritable> {
        @Override
        protected void map(GeoKey key, GeoValue value, Context context) throws IOException, InterruptedException {
            String location = key.getLocation().toString();
            if (location.toLowerCase().equals("aba")) {
                context.write(value.getActor(), value.getFatalities());
            }
        }
    }

    public int run(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Configuration conf = getConf();
        Job geoJob = new Job(conf);
        geoJob.setNumReduceTasks(0);
        geoJob.setJobName("GeoFilter");
        geoJob.setJarByClass(getClass());
        geoJob.setMapperClass(GeoFilterMapper.class);
        geoJob.setMapOutputKeyClass(Text.class);
        geoJob.setMapOutputValueClass(IntWritable.class);
        geoJob.setInputFormatClass(GeoInputFormat.class);
        geoJob.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(geoJob, inputPath);
        FileOutputFormat.setOutputPath(geoJob, outputPath);

        if (geoJob.waitForCompletion(true)) {
            return 0;
        }
        return 1;
    }

    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new GeoFilter(), args);
        System.exit(returnCode);
    }
}
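As promised, a caveat on the key class from step 1: this job runs map-only, but if GeoKey were ever used as a map output key with reducers, Hadoop's default HashPartitioner assigns records to reducers by the key's hashCode(), which GeoKey does not override. The following is a minimal sketch of the overrides one might add inside the GeoKey class; they are our addition, not part of the recipe's code:

// Sketch: overrides to add inside GeoKey (not in the original recipe),
// so that equal keys hash to the same partition.
@Override
public int hashCode() {
    // Combine all three fields into one hash
    int result = location.hashCode();
    result = 31 * result + latitude.hashCode();
    result = 31 * result + longitude.hashCode();
    return result;
}

@Override
public boolean equals(Object o) {
    if (!(o instanceof GeoKey)) {
        return false;
    }
    GeoKey other = (GeoKey) o;
    return location.equals(other.location)
            && latitude.equals(other.latitude)
            && longitude.equals(other.longitude);
}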
The first task was to define our own Hadoop key and value representations by implementing the WritableComparable interface. The WritableComparable interface allows us to create our own abstract types, which can be used as keys or values by the MapReduce framework.
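A quick way to convince yourself that the Writable contract holds is a serialization round trip. The following sketch is illustrative only (the coordinate values are made up) and assumes the GeoKey class from step 1, including its write() method, is on the classpath:

import java.io.*;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;

// Round-trip check: write() and readFields() must be exact mirrors
public class GeoKeyRoundTrip {
    public static void main(String[] args) throws IOException {
        GeoKey original = new GeoKey(new Text("Aba"),
                new FloatWritable(5.11f), new FloatWritable(7.37f));

        // Serialize the key to an in-memory byte array
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize it back into a fresh instance
        GeoKey copy = new GeoKey();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // compareTo() should report the two keys as equal
        System.out.println(original.compareTo(copy) == 0); // prints true
    }
}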
Next, we created an InputFormat that inherits from the FileInputFormat class. The Hadoop FileInputFormat class is the base class for all file-based InputFormats. The InputFormat takes care of managing the input files for a MapReduce job. Since we do not want to change the way in which our input files are split and distributed across the cluster, we only need to override two methods: createRecordReader() and isSplitable().
The isSplitable() method tells the FileInputFormat class whether an input file may be split. Our implementation asks the CompressionCodecFactory whether a compression codec in the Hadoop environment matches the file: if a codec applies (that is, the file is compressed), the file is read as a single split, so only uncompressed files are splittable. The createRecordReader() method is used to create a Hadoop RecordReader that processes individual file splits and generates key-value pairs for the mappers to process.
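To see what isSplitable() keys off, the following sketch (not part of the recipe; the file names are illustrative) performs the same codec lookup directly. With the default codec configuration, getCodec() matches by file extension and returns null for plain files:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

// Demonstrates the codec lookup that drives the splittability decision
public class CodecCheck {
    public static void main(String[] args) {
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());

        // No codec matches a plain .tsv file, so getCodec() returns null -> splittable
        CompressionCodec plain = factory.getCodec(new Path("Nigeria_ACLED_cleaned.tsv"));
        System.out.println(plain == null); // true

        // A .gz extension matches GzipCodec, so the file would not be split
        CompressionCodec gzip = factory.getCodec(new Path("Nigeria_ACLED_cleaned.tsv.gz"));
        System.out.println(gzip.getClass().getSimpleName()); // GzipCodec
    }
}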
After the GeoInputFormat class was written, we wrote a RecordReader to process the individual input splits and create GeoKey and GeoValue instances for the mappers. The GeoRecordReader class reused the Hadoop LineRecordReader class to read from the input split. Each time the LineRecordReader class completed reading a record from the Nigeria_ACLED_cleaned.tsv dataset, we created two objects, a GeoKey and a GeoValue, which are sent to the mapper.
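To make that control flow concrete, the following is a minimal sketch of the read loop the framework runs against any RecordReader, driven here by hand over a local copy of the dataset. It is not part of the recipe: the file path is an assumption, and it presumes a Hadoop 2.x classpath where TaskAttemptContextImpl is available:

import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

// Drives GeoRecordReader manually, mimicking the framework's
// initialize / nextKeyValue / getCurrentKey / getCurrentValue / close cycle
public class GeoRecordReaderDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        File file = new File("Nigeria_ACLED_cleaned.tsv"); // assumed local copy
        FileSplit split = new FileSplit(
                new Path(file.getAbsolutePath()), 0, file.length(), null);

        GeoRecordReader reader = new GeoRecordReader();
        reader.initialize(split, new TaskAttemptContextImpl(conf, new TaskAttemptID()));
        while (reader.nextKeyValue()) {
            // In a real job, each pair would be passed to one map() call
            GeoKey key = reader.getCurrentKey();
            GeoValue value = reader.getCurrentValue();
            System.out.println(key.getLocation() + "\t" + value.getFatalities());
        }
        reader.close();
    }
}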