Loading from RDBMS

As the final recipe on loading, let's load data from an RDBMS data source, which is MySQL in our case. This recipe assumes that you have already installed MySQL on your machine.

How to do it…

Let's go through the prerequisite steps first. If you already have a MySQL table to play with, you can safely skip them. We are just going to create a new database and a table, and load some sample data into the table.

The prerequisite steps (optional):

  1. Creating a database and a table: This is achieved in MySQL by using the create database and create table DDL statements:
    create database scalada;

    use scalada;

    CREATE TABLE student (
      id varchar(20),
      `name` varchar(200),
      phone varchar(50),
      email varchar(200),
      PRIMARY KEY (id)
    );
  2. Loading data into the table: Let's dump some data into the table. I wrote a very simple app to do this. Alternatively, you can use the load data infile command if you have local-infile=1 enabled on both the server and the client; a brief sketch of that command appears after the listing below. Refer to https://dev.mysql.com/doc/refman/5.1/en/load-data.html for the full details.

    As you can see, the program loads the StudentData.csv file that we used in Chapter 2, Getting Started with Apache Spark DataFrames, where we created DataFrames with Spark using the Databricks spark-csv connector. Then, for each row, the data is inserted into the table using a plain old JDBC batch insert. As you might have already figured out, we also need to add the MySQL connector Java dependency to our build.sbt:

    "mysql" % "mysql-connector-java" % "5.1.34"
    
    import java.sql.DriverManager

    import com.databricks.spark.csv._
    import com.typesafe.config.ConfigFactory
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}

    object LoadDataIntoMySQL extends App {

      val conf = new SparkConf().setAppName("LoadDataIntoMySQL").setMaster("local[2]")
      val config = ConfigFactory.load()
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)

      // Load the pipe-delimited StudentData.csv into a DataFrame using the spark-csv connector
      val students = sqlContext.csvFile(filePath = "StudentData.csv", useHeader = true, delimiter = '|')

      // Open one connection per partition and insert that partition's rows as a single JDBC batch
      students.foreachPartition { iter =>
          val conn = DriverManager.getConnection(config.getString("mysql.connection.url"))
          val statement = conn.prepareStatement("insert into scalada.student (id, name, phone, email) values (?, ?, ?, ?)")

          for (eachRow <- iter) {
            statement.setString(1, eachRow.getString(0))
            statement.setString(2, eachRow.getString(1))
            statement.setString(3, eachRow.getString(2))
            statement.setString(4, eachRow.getString(3))
            statement.addBatch()
          }

          statement.executeBatch()
          conn.close()
          println("All rows of this partition were inserted successfully")
      }

    }
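
    As mentioned above, the same CSV can alternatively be loaded with a single load data statement, provided local-infile is enabled. The following is only a sketch of that alternative; it assumes that the file is readable by the MySQL client, that it uses | as the field delimiter, and that it has a header line, just as our program does:

      load data local infile 'StudentData.csv'
      into table scalada.student
      fields terminated by '|'
      ignore 1 lines
      (id, name, phone, email);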

A "select * from scalada.student" on the MySQL client should confirm this, as shown here:


Now, let's go through the steps for loading the RDBMS data into a DataFrame.

The recommended approach for loading data from an RDBMS is to use SQLContext's load method:

  1. Creating the SparkContext and the SQLContext: You should already be familiar with this step from the previous recipes:
      val conf = new SparkConf().setAppName("DataFromRDBMS").setMaster("local[2]")
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
  2. Constructing a map of options: This map is expected to hold not only the driver and the connection URL, but also the table (or query) from which the data is to be loaded and the partitioning parameters. In this example, we'll store the connection parameters in an external Typesafe config file and load the values into our program.

    The Typesafe application.conf file is located at src/main/resources, as per standard SBT/Maven conventions. It holds the JDBC driver class name and the connection URL under the keys mysql.driver and mysql.connection.url, which are the keys read by the code below.

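
    The exact contents depend on your environment; here is a minimal sketch of what the file might look like (the host, port, and credentials are placeholders that you will have to adapt to your MySQL setup):

      mysql.driver="com.mysql.jdbc.Driver"
      mysql.connection.url="jdbc:mysql://localhost:3306/scalada?user=root&password=yourpassword"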

    Now let's look at the code that constructs the map:

      val config = ConfigFactory.load()
    
      val options = Map(
        "driver" -> config.getString("mysql.driver"),
        "url" -> config.getString("mysql.connection.url"),
        "dbtable" -> "(select * from student) as student",
        "partitionColumn" -> "id",
        "lowerBound" -> "1",
        "upperBound" -> "100",
        "numPartitions"-> "2")

    The first three parameters are straightforward. The numPartitions parameter specifies the number of partitions for this job, and partitionColumn names the column of the table on which the data is to be partitioned. The lowerBound and upperBound are values of the id field; together with the number of partitions, they determine how much data a single partition handles. For example, with a lower bound of 1, an upper bound of 100, and two partitions, Spark splits the id range roughly in half and issues one query per partition.

  3. Loading data from the table: The load function of SQLContext expects two parameters: the first specifies that the source of the data is "jdbc", and the second is the options map that we constructed in step 2. Let's now print the schema and show the first 20 rows, as we always do:
      val dFrame = sqlContext.load("jdbc", options)
      dFrame.printSchema()
      dFrame.show()
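
    As an aside, SQLContext.load is deprecated from Spark 1.4 onwards in favor of the DataFrameReader API. If you are on one of those versions, the equivalent call (reusing the same options map) would look roughly like this:

      // The same data source and options, expressed through the DataFrameReader API (Spark 1.4+)
      val dFrame = sqlContext.read.format("jdbc").options(options).load()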
    

This is the output of printSchema():

root
 |-- id: string (nullable = false)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)

We can see that the schema of the DataFrame is derived from the MySQL table definition: since id is the primary key of the student table, Spark reports it as not nullable.
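
If you want to verify this programmatically rather than by reading the printed tree, the corresponding StructField can be inspected directly; here is a small sketch:

    // id is the primary key in MySQL, so Spark marks the corresponding field as not nullable
    println(dFrame.schema("id").nullable) // expected to print false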

Finally, the show() call displays the first 20 rows of the student table that we just loaded.
