Apache Thrift is a cross-language serialization and RPC services framework. Thrift uses an interface definition file to generate bindings in many languages, including Java.
This recipe demonstrates how to define a Thrift interface, generate the corresponding Java bindings, and use these bindings to serialize a Java object to HDFS using MapReduce.
You will need to download/compile/install the following:

- Apache Thrift
- Elephant Bird
- the weblog_entries.txt dataset, from http://www.packtpub.com/support

To compile and install Apache Thrift, first ensure that you have all the required dependencies using Yum:
# yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel openssl-devel
Next, build Elephant Bird:

$ cd /path/to/elephant-bird
$ ant
Copy the elephant-bird-X.X.X.jar file to the classpath of your development environment.
$ mkdir test-thrift
$ mkdir test-thrift/src
$ mkdir test-thrift/src/thrift
$ mkdir test-thrift/src/java
$ cd test-thrift/src/thrift
namespace java com.packt.hadoop.hdfs.ch2.thrift

struct WeblogRecord {
    1: optional string cookie,
    2: string page,
    3: i64 timestamp,
    4: string ip
}
Save the file as weblog_record.thrift in the test-thrift/src/thrift/ folder.
Next, compile the interface definition to generate the corresponding .java file:

# thrift --gen java -o src/java/ src/thrift/weblog_record.thrift
Thrift should have generated a file named WeblogRecord.java in the src/java/ folder.
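The generated WeblogRecord class is a plain Java bean with a getter, a setter, and an isSet check for each field in the struct. The following hand-written sketch illustrates those field semantics only; it is not the actual generated code, which additionally implements Thrift's serialization interfaces along with equals, hashCode, and compareTo:

```java
// Hand-written sketch of the field semantics of the Thrift-generated
// WeblogRecord bean (illustration only, not the generated source).
class WeblogRecordSketch {
    private String cookie;          // field 1, marked optional in the IDL
    private String page;            // field 2
    private long timestamp;         // field 3: Thrift i64 maps to Java long
    private String ip;              // field 4
    private boolean timestampIsSet; // primitive fields track set-ness with a flag

    public void setCookie(String cookie) { this.cookie = cookie; }
    public String getCookie() { return cookie; }
    // Object fields count as "set" once they are non-null.
    public boolean isSetCookie() { return cookie != null; }

    public void setPage(String page) { this.page = page; }
    public String getPage() { return page; }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
        this.timestampIsSet = true;
    }
    public long getTimestamp() { return timestamp; }
    public boolean isSetTimestamp() { return timestampIsSet; }

    public void setIp(String ip) { this.ip = ip; }
    public String getIp() { return ip; }

    public static void main(String[] args) {
        WeblogRecordSketch record = new WeblogRecordSketch();
        record.setPage("/index.html");
        record.setTimestamp(1336684744000L);
        System.out.println(record.isSetCookie());    // prints false: optional field unset
        System.out.println(record.isSetTimestamp()); // prints true
    }
}
```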
Next, write a mapper to read weblog_entries.txt from HDFS and use Elephant Bird's LzoThriftBlockOutputFormat class to serialize the WeblogRecord object to an LZO-compressed file:

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.twitter.elephantbird.mapreduce.io.ThriftWritable;

public class ThriftMapper
        extends Mapper<Object, Text, NullWritable, ThriftWritable<WeblogRecord>> {

    private ThriftWritable<WeblogRecord> thriftRecord =
            ThriftWritable.newInstance(WeblogRecord.class);
    private WeblogRecord record = new WeblogRecord();
    private SimpleDateFormat dateFormatter =
            new SimpleDateFormat("yyyy-MM-dd:HH:mm:ss");

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split(" ");
        String cookie = tokens[0];
        String page = tokens[1];
        String date = tokens[2];
        String time = tokens[3];
        String formattedDate = date + ":" + time;
        Date timestamp = null;
        try {
            timestamp = dateFormatter.parse(formattedDate);
        } catch (ParseException ex) {
            // Skip records with an unparseable date
            return;
        }
        String ip = tokens[4];
        record.setCookie(cookie);
        record.setPage(page);
        record.setTimestamp(timestamp.getTime());
        record.setIp(ip);
        thriftRecord.set(record);
        context.write(NullWritable.get(), thriftRecord);
    }
}
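The tokenizing and date parsing inside map() can be exercised outside of MapReduce. This standalone sketch (no Hadoop dependencies; the sample log line is fabricated to match the space-delimited cookie/page/date/time/ip format the mapper assumes) applies the same split and SimpleDateFormat logic:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Standalone version of the mapper's parsing logic. The input line is a
// made-up example of the weblog_entries.txt format:
// cookie page date time ip, separated by single spaces.
class WeblogLineParser {

    private static final SimpleDateFormat FORMATTER =
            new SimpleDateFormat("yyyy-MM-dd:HH:mm:ss");

    // Returns the epoch-millisecond timestamp for the line, or -1 if the
    // date cannot be parsed (the mapper simply skips such records).
    static long parseTimestamp(String line) {
        String[] tokens = line.split(" ");
        String formattedDate = tokens[2] + ":" + tokens[3];
        try {
            Date timestamp = FORMATTER.parse(formattedDate);
            return timestamp.getTime();
        } catch (ParseException ex) {
            return -1L;
        }
    }

    public static void main(String[] args) {
        String line = "OHLGPGCBBC /index.html 2012-05-10 21:25:44 10.1.1.1";
        System.out.println(parseTimestamp(line) > 0);                     // prints true
        System.out.println(parseTimestamp("bad line with no date") == -1L); // prints true
    }
}
```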
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
import com.twitter.elephantbird.mapreduce.output.LzoThriftBlockOutputFormat;

public class ThriftWriter extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Configuration conf = getConf();
        Job weblogJob = new Job(conf);
        weblogJob.setJobName("ThriftWriter");
        weblogJob.setJarByClass(getClass());
        weblogJob.setNumReduceTasks(0);
        weblogJob.setMapperClass(ThriftMapper.class);
        // This is a map-only job, so the output types must match what the
        // mapper emits: a NullWritable key and a ThriftWritable value.
        weblogJob.setOutputKeyClass(NullWritable.class);
        weblogJob.setOutputValueClass(ThriftWritable.class);
        weblogJob.setInputFormatClass(TextInputFormat.class);
        weblogJob.setOutputFormatClass(LzoThriftBlockOutputFormat.class);

        FileInputFormat.setInputPaths(weblogJob, inputPath);
        LzoThriftBlockOutputFormat.setClassConf(WeblogRecord.class,
                weblogJob.getConfiguration());
        LzoThriftBlockOutputFormat.setOutputPath(weblogJob, outputPath);

        if (weblogJob.waitForCompletion(true)) {
            return 0;
        }
        return 1;
    }

    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new ThriftWriter(), args);
        System.exit(returnCode);
    }
}
The first task required us to define and compile a Thrift interface definition. This definition file can be used to generate bindings in any language that Thrift supports.
Next, we used Elephant Bird to build a MapReduce application to serialize the WeblogRecord object that Thrift generated. To set up the MapReduce job, we set the input format to read a normal text file:
weblogJob.setInputFormatClass(TextInputFormat.class);
Then, the output format was configured to store the output records in Thrift block format with LZO compression:
LzoThriftBlockOutputFormat.setClassConf(WeblogRecord.class,
        weblogJob.getConfiguration());
LzoThriftBlockOutputFormat.setOutputPath(weblogJob, outputPath);
In the mapper, we use Elephant Bird's ThriftWritable class to wrap the WeblogRecord object. The ThriftWritable class implements Hadoop's WritableComparable interface, which every key emitted in MapReduce must implement. Whenever we generate a binding using Thrift, the ThriftWritable class saves us from having to write a custom WritableComparable implementation for it.
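To see the boilerplate that ThriftWritable saves, here is a sketch of the hand-written field serialization a custom WritableComparable for WeblogRecord would otherwise need. To stay self-contained it round-trips through plain java.io streams instead of implementing Hadoop's actual interface, but the write/readFields bodies are exactly the kind of per-type code you would have to maintain by hand for every generated record class:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the hand-rolled serialization a custom WritableComparable
// for WeblogRecord would contain; ThriftWritable provides this generically.
class HandRolledWeblogWritable {
    String cookie = "";
    String page = "";
    long timestamp;
    String ip = "";

    // Equivalent of Writable.write(DataOutput)
    void write(DataOutput out) throws IOException {
        out.writeUTF(cookie);
        out.writeUTF(page);
        out.writeLong(timestamp);
        out.writeUTF(ip);
    }

    // Equivalent of Writable.readFields(DataInput); must read the fields
    // back in exactly the order write() emitted them.
    void readFields(DataInput in) throws IOException {
        cookie = in.readUTF();
        page = in.readUTF();
        timestamp = in.readLong();
        ip = in.readUTF();
    }

    public static void main(String[] args) throws IOException {
        HandRolledWeblogWritable original = new HandRolledWeblogWritable();
        original.page = "/index.html";
        original.timestamp = 1336684744000L;
        original.ip = "10.1.1.1";

        // Round-trip through a byte stream, as Hadoop would between tasks.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        HandRolledWeblogWritable copy = new HandRolledWeblogWritable();
        copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.page.equals(original.page)
                && copy.timestamp == original.timestamp
                && copy.ip.equals(original.ip)); // prints true
    }
}
```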
In the mapper, we instantiate both ThriftWritable and WeblogRecord instances:

private ThriftWritable<WeblogRecord> thriftRecord =
        ThriftWritable.newInstance(WeblogRecord.class);
private WeblogRecord record = new WeblogRecord();
Then, we call the set method of the thriftRecord object with an instance of WeblogRecord. Finally, the mapper emits the thriftRecord object, which contains an instance of WeblogRecord:

thriftRecord.set(record);
context.write(NullWritable.get(), thriftRecord);