Key-value stores are an efficient tool for storing large datasets. In MapReduce, we can use key-value stores to house large datasets that might not fit into the memory of a mapper or mappers (remember that multiple mappers can run on the same slave node), but can fit into the main memory of the server.
In this recipe, we will demonstrate how to use Redis to perform a map-side join using MapReduce.
First, download and install Redis. This book used Redis version 2.4.15. A quick start guide is available on the Redis website, http://redis.io/topics/quickstart. Once you have compiled and installed the Redis server, start the server by issuing the following command:
$ redis-server
Verify that the Redis server is working properly by using redis-cli:
$ redis-cli ping
Redis should respond with the message "PONG" if everything has been set up properly.
Next, you will need to download and compile Jedis from https://github.com/xetorthio/jedis. Jedis is a Redis Java client that we will use in our MapReduce application to communicate with Redis. This book used Jedis version 2.1.0.
Finally, download the apache_nobots_tsv.txt and nobots_ip_country_tsv.txt datasets from http://www.packtpub.com/support. Place the apache_nobots_tsv.txt file into HDFS, and leave the nobots_ip_country_tsv.txt file in the folder that you are working on.
Follow these steps to join data in MapReduce using Redis:
1. Create a method that reads the nobots_ip_country_tsv.txt file from the folder that you are working on, and loads its contents into Redis using the Jedis client:

    private void loadRedis(String ipCountryTable) throws IOException {
        FileReader freader = new FileReader(ipCountryTable);
        BufferedReader breader = new BufferedReader(freader);
        // Connect to the local Redis instance and start from an empty database 0.
        jedis = new Jedis("localhost");
        jedis.select(0);
        jedis.flushDB();
        String line = breader.readLine();
        while (line != null) {
            // Each line is a tab-separated ip/country pair.
            String[] tokens = line.split("\t");
            String ip = tokens[0];
            String country = tokens[1];
            jedis.set(ip, country);
            line = breader.readLine();
        }
        breader.close();
        System.err.println("db size = " + jedis.dbSize());
    }
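2. Create a class to set up a map-only job, and call the loadRedis() method we created in step 1:

    package com.packt.ch5.advjoin.redis;

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    import redis.clients.jedis.Jedis;

    public class MapSideJoinRedis extends Configured implements Tool {

        private Jedis jedis = null;

        // Loads the tab-separated ip/country table into Redis database 0.
        private void loadRedis(String ipCountryTable) throws IOException {
            FileReader freader = new FileReader(ipCountryTable);
            BufferedReader breader = new BufferedReader(freader);
            jedis = new Jedis("localhost");
            jedis.select(0);
            jedis.flushDB();
            String line = breader.readLine();
            while (line != null) {
                String[] tokens = line.split("\t");
                String ip = tokens[0];
                String country = tokens[1];
                jedis.set(ip, country);
                line = breader.readLine();
            }
            breader.close();
            System.err.println("db size = " + jedis.dbSize());
        }

        public int run(String[] args) throws Exception {
            Path inputPath = new Path(args[0]);
            String ipCountryTable = args[1];
            Path outputPath = new Path(args[2]);

            // Populate Redis before the job starts so that the mappers can query it.
            loadRedis(ipCountryTable);

            Configuration conf = getConf();
            Job weblogJob = new Job(conf);
            weblogJob.setJobName("MapSideJoinRedis");
            // Zero reducers makes this a map-only job.
            weblogJob.setNumReduceTasks(0);
            weblogJob.setJarByClass(getClass());
            weblogJob.setMapperClass(WeblogMapper.class);
            weblogJob.setMapOutputKeyClass(Text.class);
            weblogJob.setMapOutputValueClass(Text.class);
            weblogJob.setOutputKeyClass(Text.class);
            weblogJob.setOutputValueClass(Text.class);
            weblogJob.setInputFormatClass(TextInputFormat.class);
            weblogJob.setOutputFormatClass(TextOutputFormat.class);
            FileInputFormat.setInputPaths(weblogJob, inputPath);
            FileOutputFormat.setOutputPath(weblogJob, outputPath);

            if (weblogJob.waitForCompletion(true)) {
                return 0;
            }
            return 1;
        }

        public static void main(String[] args) throws Exception {
            int returnCode = ToolRunner.run(new MapSideJoinRedis(), args);
            System.exit(returnCode);
        }
    }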
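3. Create a mapper to join the apache_nobots_tsv.txt dataset with the nobots_ip_country_tsv.txt dataset that has been loaded to Redis:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import redis.clients.jedis.Jedis;

    public class WeblogMapper extends Mapper<Object, Text, Text, Text> {

        // Local cache of lookups already answered by Redis.
        private Map<String, String> ipCountryMap = new HashMap<String, String>();
        private Jedis jedis = null;
        private Text outputKey = new Text();
        private Text outputValue = new Text();

        private String getCountry(String ip) {
            String country = ipCountryMap.get(ip);
            if (country == null) {
                // Connect lazily, the first time a lookup misses the cache.
                if (jedis == null) {
                    jedis = new Jedis("localhost");
                    jedis.select(0);
                }
                // jedis.get() returns null for a missing key; this recipe
                // assumes every IP in the weblog has an entry in Redis.
                country = jedis.get(ip);
                ipCountryMap.put(ip, country);
            }
            return country;
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String row = value.toString();
            // The weblog rows are tab-separated; the IP is the first field.
            String[] tokens = row.split("\t");
            String ip = tokens[0];
            String country = getCountry(ip);
            outputKey.set(country);
            outputValue.set(row);
            context.write(outputKey, outputValue);
        }
    }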
4. Finally, launch the job:
$ hadoop jar AdvJoinChapter5-1.0-SNAPSHOT.jar com.packt.ch5.advjoin.redis.MapSideJoinRedis /user/hadoop/apache_nobots_tsv.txt ./nobots_ip_country_tsv.txt /user/hadoop/data_jnd
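The mapper in step 3 checks a local map before it asks Redis for a country. The following is a minimal sketch of that cache-then-lookup pattern in isolation, not the recipe's code. The CachedLookup class, its constructor arguments, and the "UNKNOWN" fallback value are hypothetical names introduced for illustration, and a plain in-memory Map stands in for Redis so that the example runs without a server.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not part of the recipe: wraps any remote lookup
// (for example, a call such as jedis.get(ip)) with a per-task local cache.
class CachedLookup {
    private final Map<String, String> cache = new HashMap<>();
    private final Function<String, String> remote;
    private final String fallback;
    private int remoteCalls = 0;

    CachedLookup(Function<String, String> remote, String fallback) {
        this.remote = remote;
        this.fallback = fallback;
    }

    String get(String key) {
        // Serve repeated keys from the local cache.
        if (cache.containsKey(key)) {
            return cache.get(key);
        }
        // Cache miss: ask the remote store once and remember the answer.
        remoteCalls++;
        String value = remote.apply(key);
        if (value == null) {
            // Assumption: missing keys map to a fallback instead of null.
            value = fallback;
        }
        cache.put(key, value);
        return value;
    }

    int remoteCalls() {
        return remoteCalls;
    }

    public static void main(String[] args) {
        // An in-memory map stands in for the Redis database in this sketch.
        Map<String, String> store = new HashMap<>();
        store.put("10.0.0.1", "US");

        CachedLookup lookup = new CachedLookup(store::get, "UNKNOWN");
        System.out.println(lookup.get("10.0.0.1"));
        System.out.println(lookup.get("10.0.0.1"));
        System.out.println(lookup.get("10.9.9.9"));
        System.out.println("remote calls = " + lookup.remoteCalls());
    }
}
```

Caching the fallback for missing keys means that an unknown IP costs one remote call rather than one call per log line. Whether a fallback is preferable to skipping or failing on such rows depends on the dataset.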
In steps 1 and 2, we created a class to set up a map-only job. This class looks very similar to the other map-only jobs we've created in past recipes, except for the loadRedis() method.
The loadRedis() method first connects to the local Redis instance using the Jedis constructor. Next, it uses the select() method to choose which Redis database to use. A single Redis instance can contain a number of databases, which are identified by a numeric index. Once connected to the desired database, it calls the flushDB() method, which deletes everything stored in the current database. Finally, it reads the nobots_ip_country_tsv.txt file from the folder in which you are currently working, and loads each ip/country key-value pair into Redis by using the set() method.
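The parsing half of loadRedis() can be sketched on its own, assuming each input line holds a tab-separated ip/country pair. The IpCountryLoader class and its load() method are hypothetical names, not part of the recipe. A Map stands in for the Redis database so that the example runs without a server, and skipping malformed lines is an added assumption that the recipe's code does not make.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the recipe: parses ip/country lines and
// writes them to a key-value store (a Map here, Redis in the recipe).
class IpCountryLoader {

    // Stores each well-formed "ip<TAB>country" line and returns how many
    // lines were stored. Lines with fewer than two fields are skipped.
    static int load(Iterable<String> lines, Map<String, String> store) {
        int stored = 0;
        for (String line : lines) {
            String[] tokens = line.split("\t");
            if (tokens.length < 2) {
                continue; // assumption: ignore malformed lines
            }
            // In the recipe, this is where jedis.set(ip, country) is called.
            store.put(tokens[0], tokens[1]);
            stored++;
        }
        return stored;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "10.0.0.1\tUS",
                "malformed-line",
                "10.0.0.2\tDE");
        Map<String, String> store = new HashMap<>();
        int stored = load(lines, store);
        System.out.println("stored = " + stored);
        System.out.println("10.0.0.2 -> " + store.get("10.0.0.2"));
    }
}
```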
This recipe used a very simple string data structure to store the ip/country key-value pairs. Redis supports many other data structures, including hashes, lists, and sorted sets. In addition, Redis supports transactions and a publish/subscribe mechanism. Visit the Redis website, http://redis.io/, to review all of this functionality in depth.