As the final recipe on loading, let's try to load data from an RDBMS data source, which is MySQL in our case. This recipe assumes that you have already installed MySQL on your machine.
Let's go through the prerequisite steps first. If you already have a MySQL table to play with, you can safely ignore this step. We are just going to create a new database and a table and load some sample data into it.
The prerequisite step (optional):

```sql
create database scalada;
use scalada;

CREATE TABLE student (
  id varchar(20),
  `name` varchar(200),
  phone varchar(50),
  email varchar(200),
  PRIMARY KEY (id)
);
```
You could also populate the table using the `load data infile` command if you have `local-infile=1` enabled on your server and the client. Refer to https://dev.mysql.com/doc/refman/5.1/en/load-data.html for details about this command.

As you can see, the program loads the StudentData.csv file that we saw in Chapter 2, Getting Started with Apache Spark DataFrames, when we saw how to use DataFrames with Spark using the databricks csv connector. Then, for each line, the data is inserted into the table using a plain old JDBC insert. As you might have already figured out, we need to add the mysql-connector-java dependency to our build.sbt too:
"mysql" % "mysql-connector-java" % "5.1.34" object LoadDataIntoMySQL extends App { val conf = new SparkConf().setAppName("LoadDataIntoMySQL").setMaster("local[2]") val config=ConfigFactory.load() val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val students = sqlContext.csvFile(filePath = "StudentData.csv", useHeader = true, delimiter = '|') students.foreachPartition { iter=> val conn = DriverManager.getConnection(config.getString("mysql.connection.url")) val statement = conn.prepareStatement("insert into scalada.student (id, name, phone, email) values (?,?,?,?) ") for (eachRow <- iter) { statement.setString(1, eachRow.getString(0)) statement.setString(2, eachRow.getString(1)) statement.setString(3, eachRow.getString(2)) statement.setString(4, eachRow.getString(3)) statement.addBatch() } statement.executeBatch() conn.close() println ("All rows inserted successfully") } }
A "select * from scalada.student"
on the MySQL client should confirm this, as shown here:
Steps for loading RDBMS data into a DataFrame:
The recommended approach to loading data from an RDBMS is to use the SQLContext's load method:

1. Initialize the SQLContext: You may have already become familiar with this step by looking at the previous recipes:

```scala
val conf = new SparkConf().setAppName("DataFromRDBMS").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
```
2. Construct the options map: We store the connection parameters in a Typesafe config file and load the values into our program. The Typesafe application.conf is located at src/main/resources, as per standard SBT/Maven conventions. Here is a screenshot that shows the contents of application.conf:
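Since the screenshot cannot be reproduced here, a minimal application.conf would look like the following sketch. The two keys (`mysql.driver` and `mysql.connection.url`) are the ones read by the code that follows; the hostname, database, and credentials shown are placeholders that you'd replace with your own:

```
mysql {
  driver = "com.mysql.jdbc.Driver"
  connection.url = "jdbc:mysql://localhost:3306/scalada?user=root&password=root"
}
```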
Now let's look at the code that constructs the map:
```scala
val config = ConfigFactory.load()
val options = Map(
  "driver" -> config.getString("mysql.driver"),
  "url" -> config.getString("mysql.connection.url"),
  "dbtable" -> "(select * from student) as student",
  "partitionColumn" -> "id",
  "lowerBound" -> "1",
  "upperBound" -> "100",
  "numPartitions" -> "2")
```
The first three parameters are straightforward. The numPartitions parameter specifies the number of partitions for this job, and partitionColumn specifies the column of the table on which the data is to be partitioned. The lowerBound and upperBound are the minimum and maximum values of the "id" field; together with the number of partitions, they determine the range of rows that each partition handles.
3. Load the data using SQLContext's load: The load function of SQLContext expects two parameters. The first one specifies that the source of the data is "jdbc", and the second parameter is the options map that we constructed in step 2. Let's now print the schema and show the first 20 rows, as we always do:

```scala
val dFrame = sqlContext.load("jdbc", options)
dFrame.printSchema()
dFrame.show()
```
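As a side note, SQLContext.load was deprecated in Spark 1.4 in favor of the DataFrameReader API; on a newer 1.x build, the same call can be written as the following fragment (assuming the same sqlContext and options map are in scope):

```scala
val dFrame = sqlContext.read.format("jdbc").options(options).load()
```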
This is the output:
```
root
 |-- id: string (nullable = false)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
```
We see that the schema of the DataFrame is derived from the MySQL table definition: the id field is marked as not nullable because it is the primary key of the table.
The output is as follows: