Sqoop is an Apache project that is part of the broader Hadoop ecosystem. In many ways Sqoop is similar to distcp (see the Moving data efficiently between clusters using Distributed Copy recipe of this chapter). Both are built on top of MapReduce and take advantage of its parallelism and fault tolerance. Instead of moving data between clusters, Sqoop was designed to move data from and into relational databases, using a JDBC driver to connect.
Its functionality is extensive. This recipe will show how to use Sqoop to export data from HDFS to MySQL, using the weblog entries as an example.
This example uses Sqoop v1.3.0.
If you are using CDH3, you already have Sqoop installed. If you are not running CDH3, you can find installation instructions for your distro at https://ccp.cloudera.com/display/CDHDOC/Sqoop+Installation.
This recipe assumes that you have a MySQL instance up and running that can reach your Hadoop cluster. The mysql.user
table is configured to accept a user connecting from the machine where you will be running Sqoop. Visit http://dev.mysql.com/doc/refman/5.5/en/installing.html for more information on installing and configuring MySQL.
The MySQL JDBC driver JAR file has been copied to $SQOOP_HOME/lib. The driver can be downloaded from http://dev.mysql.com/downloads/connector/j/.
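If the driver JAR is not in place yet, copying it there by hand is enough. The connector version in the following commands is only an illustration; substitute whatever version you downloaded:

tar -xzf mysql-connector-java-5.1.22.tar.gz
cp mysql-connector-java-5.1.22/mysql-connector-java-5.1.22-bin.jar $SQOOP_HOME/lib/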
Follow the Importing and exporting data into HDFS using the Hadoop shell commands recipe of this chapter to load the weblog_entries.txt file into HDFS.
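If you need to reload the file, the following commands are a minimal sketch; they assume weblog_entries.txt sits in your current local directory and use /data/weblogs/05102012 as the target directory so that it matches the --export-dir used later in this recipe:

hadoop fs -mkdir /data/weblogs/05102012
hadoop fs -put weblog_entries.txt /data/weblogs/05102012/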
Complete the following steps to transfer data from HDFS to a MySQL table:
1. Create a new database in the MySQL instance:
CREATE DATABASE logs;
2. Create and define the weblogs_from_hdfs table:
USE logs;
CREATE TABLE weblogs_from_hdfs (
    md5 VARCHAR(32),
    url VARCHAR(64),
    request_date DATE,
    request_time TIME,
    ip VARCHAR(15)
);
3. Export the weblog_entries.txt file from HDFS to MySQL:
sqoop export -m 1 --connect jdbc:mysql://<HOST>:<PORT>/logs --username hdp_usr --password test1 --table weblogs_from_hdfs --export-dir /data/weblogs/05102012 --input-fields-terminated-by '\t' --mysql-delimiters
The output is as follows:
INFO mapreduce.ExportJobBase: Beginning export of weblogs_from_hdfs
input.FileInputFormat: Total input paths to process : 1
input.FileInputFormat: Total input paths to process : 1
mapred.JobClient: Running job: job_201206222224_9010
INFO mapred.JobClient: Map-Reduce Framework
INFO mapred.JobClient: Map input records=3000
INFO mapred.JobClient: Spilled Records=0
INFO mapred.JobClient: Total committed heap usage (bytes)=85000192
INFO mapred.JobClient: Map output records=3000
INFO mapred.JobClient: SPLIT_RAW_BYTES=133
INFO mapreduce.ExportJobBase: Transferred 248.3086 KB in 12.2398 seconds (20.287 KB/sec)
INFO mapreduce.ExportJobBase: Exported 3000 records.
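A quick sanity check, assuming the job finished as shown above, is to count the rows that landed in MySQL; the count should match the Exported 3000 records figure reported by Sqoop:

mysql> USE logs;
mysql> SELECT COUNT(*) FROM weblogs_from_hdfs;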
Sqoop loads the JDBC driver defined in the --connect statement from $SQOOP_HOME/lib, where $SQOOP_HOME is the full path to the location where Sqoop is installed. The --username and --password options are used to authenticate the user issuing the command against the MySQL instance. The mysql.user table must have an entry for the --username value and the host of each node in the Hadoop cluster; otherwise, Sqoop will throw an exception indicating that the host is not allowed to connect to the MySQL server.
mysql> USE mysql;
mysql> select host, user from user;
+-----------+---------+
| host      | user    |
+-----------+---------+
| hdp01     | hdp_usr |
| hdp02     | hdp_usr |
| hdp03     | hdp_usr |
| hdp04     | hdp_usr |
| 127.0.0.1 | root    |
| ::1       | root    |
| localhost | root    |
+-----------+---------+
7 rows in set (1.04 sec)
In this example, we connected to the MySQL server using hdp_usr. Our cluster has four machines: hdp01, hdp02, hdp03, and hdp04.
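One way those entries could have been created, shown here only as a sketch (the granted privileges and the password are placeholders to adapt to your own security policy), is to grant the Hadoop user access from each cluster host:

mysql> GRANT ALL PRIVILEGES ON logs.* TO 'hdp_usr'@'hdp01' IDENTIFIED BY 'test1';
mysql> GRANT ALL PRIVILEGES ON logs.* TO 'hdp_usr'@'hdp02' IDENTIFIED BY 'test1';
mysql> GRANT ALL PRIVILEGES ON logs.* TO 'hdp_usr'@'hdp03' IDENTIFIED BY 'test1';
mysql> GRANT ALL PRIVILEGES ON logs.* TO 'hdp_usr'@'hdp04' IDENTIFIED BY 'test1';
mysql> FLUSH PRIVILEGES;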
The --table argument identifies the MySQL table that will receive the data from HDFS. This table must be created before running the Sqoop export command. Sqoop uses the table's metadata, that is, the number of columns and their types, to validate the data coming from the HDFS folder and to create INSERT statements. For example, the export job can be thought of as reading each line of the weblog_entries.txt file in HDFS and producing the following output:
INSERT INTO weblogs_from_hdfs VALUES('aabba15edcd0c8042a14bf216c5', '/jcwbtvnkkujo.html', '2012-05-10', '21:25:44', '148.113.13.214');
INSERT INTO weblogs_from_hdfs VALUES('e7d3f242f111c1b522137481d8508ab7', '/ckyhatbpxu.html', '2012-05-10', '21:11:20', '4.175.198.160');
INSERT INTO weblogs_from_hdfs VALUES('b8bd62a5c4ede37b9e77893e043fc1', '/rr.html', '2012-05-10', '21:32:08', '24.146.153.181');
...
By default, Sqoop export creates INSERT statements. If the --update-key argument is specified, UPDATE statements will be created instead. If the preceding example had used the argument --update-key md5, the generated statements would have run like the following:
UPDATE weblogs_from_hdfs SET url='/jcwbtvnkkujo.html', request_date='2012-05-10', request_time='21:25:44', ip='148.113.13.214' WHERE md5='aabba15edcd0c8042a14bf216c5';
UPDATE weblogs_from_hdfs SET url='/ckyhatbpxu.html', request_date='2012-05-10', request_time='21:11:20', ip='4.175.198.160' WHERE md5='e7d3f242f111c1b522137481d8508ab7';
UPDATE weblogs_from_hdfs SET url='/rr.html', request_date='2012-05-10', request_time='21:32:08', ip='24.146.153.181' WHERE md5='b8bd62a5c4ede37b9e77893e043fc1';
In the case where the --update-key value is not found, setting --update-mode to allowinsert will insert the row.
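Putting those two arguments together, a variation of the earlier export command (same connection details and paths as before, shown only as a sketch) would update existing rows by their md5 value and insert the rows it cannot match:

sqoop export -m 1 --connect jdbc:mysql://<HOST>:<PORT>/logs --username hdp_usr --password test1 --table weblogs_from_hdfs --export-dir /data/weblogs/05102012 --input-fields-terminated-by '\t' --update-key md5 --update-mode allowinsert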
The -m argument sets the number of map tasks reading the file splits from HDFS. Each mapper will have its own connection to the MySQL server. It will insert up to 100 records per statement. After it has completed 100 INSERT statements, that is, 10,000 records in total, it will commit the current transaction. It is possible that a map task failure could cause data inconsistency, resulting in possible insert collisions or duplicated data. These issues can be overcome with the use of the --staging-table argument. This causes the job to insert into a staging table and then, in one transaction, move the data from the staging table to the table specified by the --table argument. The table named by --staging-table must have the same structure as the table named by --table. The staging table must be empty before the export runs, or else the --clear-staging-table argument must be used.
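As a sketch of that pattern (the weblogs_from_hdfs_staging name is made up for this example), you would first create an identically structured staging table and then point the export at it:

mysql> CREATE TABLE weblogs_from_hdfs_staging LIKE weblogs_from_hdfs;

sqoop export -m 1 --connect jdbc:mysql://<HOST>:<PORT>/logs --username hdp_usr --password test1 --table weblogs_from_hdfs --staging-table weblogs_from_hdfs_staging --clear-staging-table --export-dir /data/weblogs/05102012 --input-fields-terminated-by '\t' --mysql-delimiters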