Protocol Buffers is a cross-language data format that uses an interface definition file to generate bindings in many languages, including Java.
This recipe will demonstrate how to define a Protocol Buffers message, generate the corresponding Java bindings, and use these bindings to serialize a Java object to HDFS using MapReduce.
You will need to download/compile/install the following:
- The weblog_entries.txt dataset, from http://www.packtpub.com/support

To install GNU C/C++ using Yum, run the following command as the root user from a bash shell:
# yum install gcc gcc-c++ autoconf automake
To compile and install Protocol Buffers, type the following lines of code:
$ cd /path/to/protobuf
$ ./configure
$ make
$ make check
# make install
# ldconfig
Next, create the project folder structure and change to the proto folder:
$ mkdir test-protobufs
$ mkdir test-protobufs/src
$ mkdir test-protobufs/src/proto
$ mkdir test-protobufs/src/java
$ cd test-protobufs/src/proto
Create the following Protocol Buffers message definition:
package example;

option java_package = "com.packt.hadoop.hdfs.ch2";
option java_outer_classname = "WeblogRecord";

message Record {
    optional string cookie = 1;
    required string page = 2;
    required int64 timestamp = 3;
    required string ip = 4;
}
Save the file as weblog_record.proto in the test-protobufs/src/proto/ folder.
Compile the message definition from the test-protobufs folder. WeblogRecord.java is generated in src/java/ by protoc:
$ cd ../../
$ protoc --proto_path=src/proto/ --java_out=src/java/ src/proto/weblog_record.proto
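Before moving on to the MapReduce code, it can be worth sanity-checking the generated bindings with a small standalone program. The following sketch is not part of the recipe; the class name WeblogRecordCheck and the sample field values are placeholders, but the builder methods, toByteArray(), and parseFrom() are standard parts of the API protoc generates:
import com.packt.hadoop.hdfs.ch2.WeblogRecord;

public class WeblogRecordCheck {
    public static void main(String[] args) throws Exception {
        // Build a record with the generated builder API.
        WeblogRecord.Record record = WeblogRecord.Record.newBuilder()
                .setCookie("OX7CA")
                .setPage("/index.html")
                .setTimestamp(System.currentTimeMillis())
                .setIp("127.0.0.1")
                .build();

        // Round-trip the record through the Protocol Buffers wire format.
        byte[] bytes = record.toByteArray();
        WeblogRecord.Record parsed = WeblogRecord.Record.parseFrom(bytes);
        System.out.println(parsed.getPage() + " " + parsed.getIp());
    }
}
Compile it against the generated WeblogRecord.java and the Protocol Buffers runtime JAR.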
Next, we will write a MapReduce application to read weblog_entries.txt from HDFS and use Elephant Bird's LzoProtobufBlockOutputFormat class to serialize the WeblogRecord object to an LZO-compressed file:
public class ProtobufMapper extends Mapper<Object, Text, NullWritable, ProtobufWritable<WeblogRecord.Record>> {

    private ProtobufWritable<WeblogRecord.Record> protobufRecord =
            ProtobufWritable.newInstance(WeblogRecord.Record.class);
    private SimpleDateFormat dateFormatter =
            new SimpleDateFormat("yyyy-MM-dd:HH:mm:ss");

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split(" ");
        String cookie = tokens[0];
        String page = tokens[1];
        String date = tokens[2];
        String time = tokens[3];
        String formattedDate = date + ":" + time;
        Date timestamp = null;
        try {
            timestamp = dateFormatter.parse(formattedDate);
        } catch (ParseException ex) {
            return;
        }
        String ip = tokens[4];
        protobufRecord.set(WeblogRecord.Record.newBuilder()
                .setCookie(cookie)
                .setPage(page)
                .setTimestamp(timestamp.getTime())
                .setIp(ip)
                .build());
        context.write(NullWritable.get(), protobufRecord);
    }
}
The MapReduce job is configured and launched in the ProtobufWriter class:
public class ProtobufWriter extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Configuration conf = getConf();
        Job weblogJob = new Job(conf);
        weblogJob.setJobName("ProtobufWriter");
        weblogJob.setJarByClass(getClass());
        weblogJob.setNumReduceTasks(0);

        weblogJob.setMapperClass(ProtobufMapper.class);
        weblogJob.setMapOutputKeyClass(NullWritable.class);
        weblogJob.setMapOutputValueClass(ProtobufWritable.class);
        weblogJob.setOutputKeyClass(NullWritable.class);
        weblogJob.setOutputValueClass(ProtobufWritable.class);
        weblogJob.setInputFormatClass(TextInputFormat.class);
        weblogJob.setOutputFormatClass(LzoProtobufBlockOutputFormat.class);

        FileInputFormat.setInputPaths(weblogJob, inputPath);
        LzoProtobufBlockOutputFormat.setClassConf(WeblogRecord.Record.class, weblogJob.getConfiguration());
        LzoProtobufBlockOutputFormat.setOutputPath(weblogJob, outputPath);

        if (weblogJob.waitForCompletion(true)) {
            return 0;
        }
        return 1;
    }

    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new ProtobufWriter(), args);
        System.exit(returnCode);
    }
}
The first task is to define and compile a Protocol Buffers message definition. This definition file can be used to generate bindings in any language the Protocol Buffers compiler supports. There are a few things to note about the format of the message.
First, the package definition package example; is not related to Java packages; it is the namespace of the message defined in the *.proto file. Second, the option java_package declaration specifies the Java package for the generated code. Finally, the option java_outer_classname declaration is the name of the generated outer class, within which the Record class will be defined.
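As an illustration of how these options map to Java names (the NamingCheck class below is purely for demonstration), the generated type is referenced as com.packt.hadoop.hdfs.ch2.WeblogRecord.Record, while the proto-level package example only appears in the message descriptor:
import com.packt.hadoop.hdfs.ch2.WeblogRecord;

public class NamingCheck {
    public static void main(String[] args) {
        // java_package + java_outer_classname + message name form the Java type.
        WeblogRecord.Record record = WeblogRecord.Record.getDefaultInstance();
        System.out.println(record.getClass().getName());
        // prints: com.packt.hadoop.hdfs.ch2.WeblogRecord$Record

        // The "package example;" namespace is only visible in the descriptor.
        System.out.println(WeblogRecord.Record.getDescriptor().getFullName());
        // prints: example.Record
    }
}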
Next, we wrote a MapReduce application to serialize the WeblogRecord object generated by the Protocol Buffers compiler. To set up the MapReduce job, we set the input format to read a normal text file.
weblogJob.setInputFormatClass(TextInputFormat.class);
Then, the output format was set to store the records produced from the job in the Protocol Buffers block format, compressed using LZO.
LzoProtobufBlockOutputFormat.setClassConf(WeblogRecord.Record.class, weblogJob.getConfiguration());
LzoProtobufBlockOutputFormat.setOutputPath(weblogJob, outputPath);
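Elephant Bird also ships an input format for reading these block-encoded files back. The sketch below assumes an LzoProtobufBlockInputFormat class whose setClassConf call mirrors the output-format call above, and <LongWritable, ProtobufWritable<Record>> record types; the exact input-side API differs between Elephant Bird versions, so treat this as an assumption rather than the library's definitive interface:
// Assumed driver configuration, mirroring the output-format calls above:
//   weblogJob.setInputFormatClass(LzoProtobufBlockInputFormat.class);
//   LzoProtobufBlockInputFormat.setClassConf(WeblogRecord.Record.class, weblogJob.getConfiguration());
public class ProtobufReaderMapper
        extends Mapper<LongWritable, ProtobufWritable<WeblogRecord.Record>, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, ProtobufWritable<WeblogRecord.Record> value,
            Context context) throws IOException, InterruptedException {
        // The input format wraps each deserialized message in a ProtobufWritable.
        WeblogRecord.Record record = value.get();
        context.write(new Text(record.getPage()), NullWritable.get());
    }
}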
In the mapper, we use the ProtobufWritable class of Elephant Bird to wrap the WeblogRecord.Record object. The ProtobufWritable class implements Hadoop's WritableComparable interface, which all keys emitted in MapReduce must implement. ProtobufWritable saves us from having to write a custom WritableComparable implementation for every binding we generate with protoc.
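To see what ProtobufWritable saves us from writing, here is a minimal hand-rolled wrapper for WeblogRecord.Record. The class name WeblogRecordWritable is hypothetical, and the sketch implements only Writable, which is enough for values; a key type would additionally need the compareTo method from WritableComparable:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

import com.packt.hadoop.hdfs.ch2.WeblogRecord;

public class WeblogRecordWritable implements Writable {

    private WeblogRecord.Record record;

    public WeblogRecordWritable() { }

    public void set(WeblogRecord.Record record) {
        this.record = record;
    }

    public WeblogRecord.Record get() {
        return record;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Length-prefix the serialized message so readFields knows how much to read.
        byte[] bytes = record.toByteArray();
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        record = WeblogRecord.Record.parseFrom(bytes);
    }
}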
In the mapper, we create a ProtobufWritable instance:
private ProtobufWritable<WeblogRecord.Record> protobufRecord = ProtobufWritable.newInstance(WeblogRecord.Record.class);
Then, we call the set method of the protobufRecord object with a new instance of WeblogRecord.Record. Finally, the mapper emits the protobufRecord object:
protobufRecord.set(WeblogRecord.Record.newBuilder()
        .setCookie(cookie)
        .setPage(page)
        .setTimestamp(timestamp.getTime())
        .setIp(ip)
        .build());
context.write(NullWritable.get(), protobufRecord);