This recipe will use the MongoOutputFormat class to load data from an HDFS instance into a MongoDB collection.
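At its core, the export hinges on a handful of job settings. The following minimal sketch, assembled from the full listing later in this recipe, previews how a job is pointed at a MongoDB collection; the class name is illustrative and <HOST> and <PORT> are placeholders for your own MongoDB instance:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.bson.BSONObject;
import org.bson.types.ObjectId;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MongoOutputPreview {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The output URI names the target database (test) and collection (weblogs).
        MongoConfigUtil.setOutputURI(conf, "mongodb://<HOST>:<PORT>/test.weblogs");

        Job job = new Job(conf, "Export to Mongo");
        // The mapper emits ObjectId keys and BSONObject values, which
        // MongoOutputFormat writes to the collection named in the output URI.
        job.setOutputKeyClass(ObjectId.class);
        job.setOutputValueClass(BSONObject.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
    }
}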
The easiest way to get started with the Mongo Hadoop Adaptor is to clone the Mongo-Hadoop project from GitHub and build the project configured for a specific version of Hadoop. A Git client must be installed to clone this project. This recipe assumes that you are using the CDH3 distribution of Hadoop.
The official Git client can be found at http://git-scm.com/downloads.
GitHub for Windows can be found at http://windows.github.com/.
GitHub for Mac can be found at http://mac.github.com/.
The Mongo Hadoop Adaptor can be found on GitHub at https://github.com/mongodb/mongo-hadoop. This project needs to be built for a specific version of Hadoop. The resulting JAR file must be installed on each node in the $HADOOP_HOME/lib folder.
The Mongo Java Driver must also be installed on each node in the $HADOOP_HOME/lib folder. It can be found at https://github.com/mongodb/mongo-java-driver/downloads.
Complete the following steps to copy data from HDFS into MongoDB:
1. Clone the mongo-hadoop repository with the following command line:
git clone https://github.com/mongodb/mongo-hadoop.git
2. Switch to the stable release 1.0 branch:
git checkout release-1.0
3. Set the Hadoop version which mongo-hadoop should target. In the folder that mongo-hadoop was cloned to, open the build.sbt file with a text editor. Change the following line:
hadoopRelease in ThisBuild := "default"
to
hadoopRelease in ThisBuild := "cdh3"
4. Build mongo-hadoop:
./sbt package
This will create a file named mongo-hadoop-core_cdh3u3-1.0.0.jar in the core/target folder.
5. Copy mongo-hadoop and the MongoDB Java Driver to $HADOOP_HOME/lib on each node:
cp mongo-hadoop-core_cdh3u3-1.0.0.jar mongo-2.8.0.jar $HADOOP_HOME/lib
6. Create a Java MapReduce program that will read the weblog_entries.txt file from HDFS and write its records to MongoDB using the MongoOutputFormat class (the first sketch after these steps illustrates the document each map call emits):

import java.io.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.*;
import org.bson.*;
import org.bson.types.ObjectId;
import com.mongodb.hadoop.*;
import com.mongodb.hadoop.util.*;

public class ExportToMongoDBFromHDFS {

    private static final Log log = LogFactory.getLog(ExportToMongoDBFromHDFS.class);

    public static class ReadWeblogs extends Mapper<LongWritable, Text, ObjectId, BSONObject> {

        // TextInputFormat supplies the byte offset of each line as the key
        // and the line itself as the value.
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            System.out.println("Key: " + key);
            System.out.println("Value: " + value);

            // Each line of weblog_entries.txt is whitespace-delimited:
            // md5, url, date, time, and ip.
            String[] fields = value.toString().split(" ");
            String md5 = fields[0];
            String url = fields[1];
            String date = fields[2];
            String time = fields[3];
            String ip = fields[4];

            BSONObject b = new BasicBSONObject();
            b.put("md5", md5);
            b.put("url", url);
            b.put("date", date);
            b.put("time", time);
            b.put("ip", ip);

            // Emit a new ObjectId as the key and the BSON document as the value.
            context.write(new ObjectId(), b);
        }
    }

    public static void main(String[] args) throws Exception {

        final Configuration conf = new Configuration();
        MongoConfigUtil.setOutputURI(conf, "mongodb://<HOST>:<PORT>/test.weblogs");
        System.out.println("Configuration: " + conf);

        final Job job = new Job(conf, "Export to Mongo");

        Path in = new Path("/data/weblogs/weblog_entries.txt");
        FileInputFormat.setInputPaths(job, in);

        job.setJarByClass(ExportToMongoDBFromHDFS.class);
        job.setMapperClass(ReadWeblogs.class);

        job.setOutputKeyClass(ObjectId.class);
        job.setOutputValueClass(BSONObject.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);

        // Map-only job: no reducers are needed to write the documents.
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
7. Export the class as a runnable JAR file and run the job:
hadoop jar ExportToMongoDBFromHDFS.jar
8. Verify that the weblogs collection was populated from the Mongo shell (an equivalent check using the MongoDB Java Driver is sketched after these steps):
db.weblogs.find();
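The following standalone sketch shows the document that the mapper from step 6 emits for a single record. The input line and its field values are invented for illustration; the real records come from weblog_entries.txt in HDFS, and only the bson classes from the MongoDB Java Driver JAR are needed on the classpath:

import org.bson.BSONObject;
import org.bson.BasicBSONObject;

public class WeblogEntryDemo {
    public static void main(String[] args) {
        // A made-up, whitespace-delimited line in the same shape as weblog_entries.txt.
        String value = "a7c46ef3 /index.html 2012-05-10 21:12:34 127.0.0.1";

        // The same parsing the ReadWeblogs mapper performs.
        String[] fields = value.split(" ");
        BSONObject b = new BasicBSONObject();
        b.put("md5", fields[0]);
        b.put("url", fields[1]);
        b.put("date", fields[2]);
        b.put("time", fields[3]);
        b.put("ip", fields[4]);

        // Prints the document that would be written to the weblogs collection.
        System.out.println(b);
    }
}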
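As an alternative to the Mongo shell check in step 8, the collection can also be inspected with the MongoDB Java Driver. This is a minimal sketch that assumes MongoDB is listening on localhost:27017; substitute the host and port of your own instance, and note the class name is illustrative:

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.Mongo;

public class VerifyWeblogsCollection {
    public static void main(String[] args) throws Exception {
        // Assumed connection details; replace with your MongoDB host and port.
        Mongo mongo = new Mongo("localhost", 27017);
        DB db = mongo.getDB("test");
        DBCollection weblogs = db.getCollection("weblogs");

        // Print how many documents the job wrote, plus a few sample documents.
        System.out.println("Documents in test.weblogs: " + weblogs.count());
        DBCursor cursor = weblogs.find().limit(5);
        while (cursor.hasNext()) {
            System.out.println(cursor.next());
        }
        mongo.close();
    }
}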