The Apache Avro site describes Avro as a "data serialization system". Avro provides a language-independent file format along with serialization and RPC mechanisms. One of its most convenient features is that you do not need to compile any interface or protocol definition files in order to use its serialization facilities.
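To illustrate the "no code generation" point, Avro's reflect API can derive a schema from an ordinary Java class at runtime. The sketch below is a minimal example, assuming the avro JAR is on the classpath; the Point class is a hypothetical stand-in, not part of this recipe.

```java
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

public class ReflectSchemaDemo {
    // A plain Java class; no Avro annotations, .avsc file, or generated code required.
    static class Point {
        int x;
        int y;
    }

    public static void main(String[] args) {
        // Derive an Avro record schema from the class via reflection.
        Schema schema = ReflectData.get().getSchema(Point.class);
        // Print the schema's JSON representation.
        System.out.println(schema.toString(true));
    }
}
```

The same ReflectData call is what this recipe uses later to build the job's output schema.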
In this recipe, we will use Avro to serialize and write Java objects to a file in HDFS using MapReduce.
You will need to download/compile/install the following:
- The avro and avro-mapred JAR files, from http://avro.apache.org
- The weblog_entries.txt dataset, from http://www.packtpub.com/support

The WeblogRecord class models a single record from the weblog_entries.txt dataset:

public class WeblogRecord {
    private String cookie;
    private String page;
    private Date date;
    private String ip;

    public WeblogRecord() {
    }

    public WeblogRecord(String cookie, String page, Date date, String ip) {
        this.cookie = cookie;
        this.page = page;
        this.date = date;
        this.ip = ip;
    }

    //getters and setters

    @Override
    public String toString() {
        return cookie + " " + page + " " + date.toString() + " " + ip;
    }
}
Instead of using the IdentityMapper, we will write a mapper that reads a row from weblog_entries.txt and creates an instance of WeblogRecord:

public class WeblogMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, AvroWrapper<WeblogRecord>, NullWritable> {

    private AvroWrapper<WeblogRecord> outputRecord = new AvroWrapper<WeblogRecord>();
    private WeblogRecord weblogRecord = new WeblogRecord();
    private SimpleDateFormat dateFormatter =
            new SimpleDateFormat("yyyy-MM-dd:HH:mm:ss");

    public void map(LongWritable key, Text value,
            OutputCollector<AvroWrapper<WeblogRecord>, NullWritable> oc,
            Reporter rprtr) throws IOException {
        String[] tokens = value.toString().split(" ");
        String cookie = tokens[0];
        String page = tokens[1];
        String date = tokens[2];
        String time = tokens[3];
        String formattedDate = date + ":" + time;
        Date timestamp = null;
        try {
            timestamp = dateFormatter.parse(formattedDate);
        } catch (ParseException ex) {
            // ignore records with invalid dates
            return;
        }
        String ip = tokens[4];
        weblogRecord.setCookie(cookie);
        weblogRecord.setDate(timestamp);
        weblogRecord.setIp(ip);
        weblogRecord.setPage(page);
        outputRecord.datum(weblogRecord);
        oc.collect(outputRecord, NullWritable.get());
    }
}
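The tokenizing and date-handling logic inside map() can be exercised on its own, outside Hadoop. The sketch below extracts that logic into a plain helper; the sample line and its values are invented for illustration, in the weblog_entries.txt layout (cookie, page, date, time, IP):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class WeblogLineParser {
    // Same pattern the mapper uses to combine the date and time tokens.
    private static final SimpleDateFormat FORMATTER =
            new SimpleDateFormat("yyyy-MM-dd:HH:mm:ss");

    // Returns the parsed timestamp, or null for malformed input
    // (mirroring the mapper, which silently skips such records).
    public static Date parseTimestamp(String line) {
        String[] tokens = line.split(" ");
        if (tokens.length < 5) {
            return null;
        }
        try {
            return FORMATTER.parse(tokens[2] + ":" + tokens[3]);
        } catch (ParseException ex) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Hypothetical record: cookie page date time ip
        String line = "CKE123 /index.html 2012-05-10 21:25:44 127.0.0.1";
        System.out.println(parseTimestamp(line));
    }
}
```

Because SimpleDateFormat is not thread-safe, reusing one instance per mapper (as the recipe does) is safe only because each map task runs single-threaded.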
Next, write a MapReduce job to serialize the WeblogRecord object:

public class AvroWriter extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        Schema schema = ReflectData.get().getSchema(WeblogRecord.class);
        Configuration conf = getConf();
        JobConf weblogJob = new JobConf(conf, getClass());
        weblogJob.setJobName("Avro Writer");
        weblogJob.setNumReduceTasks(0);
        weblogJob.setMapperClass(WeblogMapper.class);
        weblogJob.setMapOutputKeyClass(AvroWrapper.class);
        weblogJob.setMapOutputValueClass(NullWritable.class);
        weblogJob.setInputFormat(TextInputFormat.class);
        AvroJob.setOutputSchema(weblogJob, schema);
        FileInputFormat.setInputPaths(weblogJob, inputPath);
        FileOutputFormat.setOutputPath(weblogJob, outputPath);
        RunningJob job = JobClient.runJob(weblogJob);
        if (job.isSuccessful()) {
            return 0;
        }
        return 1;
    }

    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new AvroWriter(), args);
        System.exit(returnCode);
    }
}
The AvroWriter MapReduce job reads a plain text file and serializes WeblogRecord instances into an Avro file. The first step is to set up a MapReduce job that reads the text file and writes its output using the Avro file format.
Set the input format to read a text file:
weblogJob.setInputFormat(TextInputFormat.class);
Build an Avro schema based on the WeblogRecord class, and then set the output schema:

Schema schema = ReflectData.get().getSchema(WeblogRecord.class);
AvroJob.setOutputSchema(weblogJob, schema);
Next, we use the old Hadoop MapReduce API to write the mapper and emit the WeblogRecord object by using the AvroWrapper class.
The members of the WeblogMapper class used to emit output are:

private AvroWrapper<WeblogRecord> outputRecord = new AvroWrapper<WeblogRecord>();
private WeblogRecord weblogRecord = new WeblogRecord();
The data emitted from the WeblogMapper class's map() method is:

outputRecord.datum(weblogRecord);
oc.collect(outputRecord, NullWritable.get());
The output of this map-only job is stored in the Avro file format.
To read the Avro file format produced by the AvroWriter job, we just need to change the input format and the mapper class. First, set the input format and the input schema:

JobConf weblogJob = new JobConf(conf, getClass());
Schema schema = ReflectData.get().getSchema(WeblogRecord.class);
AvroJob.setReflect(weblogJob);
Next, create a mapper class with the following definition:
public class WeblogMapperAvro extends MapReduceBase
        implements Mapper<AvroWrapper<WeblogRecord>, NullWritable, Text, NullWritable> {

    public void map(AvroWrapper<WeblogRecord> key, NullWritable value,
            OutputCollector<Text, NullWritable> oc,
            Reporter rprtr) throws IOException {
        WeblogRecord weblogRecord = key.datum();
        //process the web log record
    }
}
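Outside MapReduce, the same Avro output can be inspected with a plain DataFileReader. The following is a minimal sketch, assuming the avro JAR and the recipe's WeblogRecord class are on the classpath; the file path is passed as an argument and is illustrative, not a fixed output name:

```java
import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.reflect.ReflectDatumReader;

public class AvroDump {
    public static void main(String[] args) throws Exception {
        // Reflect-based reader deserializes each datum back into a WeblogRecord.
        ReflectDatumReader<WeblogRecord> datumReader =
                new ReflectDatumReader<WeblogRecord>(WeblogRecord.class);
        // Open one of the job's output files, e.g. a part file copied out of HDFS.
        DataFileReader<WeblogRecord> fileReader =
                new DataFileReader<WeblogRecord>(new File(args[0]), datumReader);
        try {
            for (WeblogRecord record : fileReader) {
                System.out.println(record); // uses WeblogRecord.toString()
            }
        } finally {
            fileReader.close();
        }
    }
}
```

Because the schema is embedded in every Avro data file, the reader needs no separate schema file; reflection maps the record fields back onto WeblogRecord.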