Loading JSON into DataFrames

JSON has become one of the most common text-based data representation formats. In this recipe, we'll see how to load data represented as JSON into a DataFrame. To make it more interesting, let's have our JSON reside in HDFS instead of our local filesystem.

The Hadoop Distributed File System (HDFS) is a distributed filesystem that is both scalable and fault tolerant. It is a critical part of the Hadoop ecosystem and is inspired by the Google File System paper (http://research.google.com/archive/gfs.html). More details about the architecture and communication protocols of HDFS can be found at http://hadoop.apache.org/docs/r1.2.1/hdfs_design.html.

How to do it…

In this recipe, we'll see three subrecipes:

  • How to create a schema-inferred DataFrame from JSON using sqlContext.jsonFile
  • Alternatively, if we prefer to preprocess the input file before parsing it as JSON, we'll load the input file as text and convert the resulting RDD[String] into a DataFrame using sqlContext.jsonRDD
  • Finally, we'll take a look at declaring an explicit schema and using it to create a DataFrame

Reading a JSON file using SQLContext.jsonFile

This recipe consists of three steps:

  1. Storing our JSON file (profiles.json) in HDFS: A copy of the data file is available in our project repository and can be downloaded from https://github.com/arunma/ScalaDataAnalysisCookbook/blob/master/chapter3-data-loading/profiles.json:
      hadoop fs -mkdir -p /data/scalada
      hadoop fs -put profiles.json /data/scalada/profiles.json
      hadoop fs -ls /data/scalada
      -rw-r--r--   1 Gabriel supergroup     176948 2015-05-16 22:13 /data/scalada/profiles.json
    

    The following screenshot shows the HDFS file explorer available at http://localhost:50070, which confirms that our upload is successful:

    [Screenshot: the HDFS file explorer at http://localhost:50070 listing /data/scalada/profiles.json]
  2. Creating contexts: We do the regular stuff: create the SparkConf, the SparkContext, and then the SQLContext:
      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.SQLContext

      val conf = new SparkConf().setAppName("DataFromJSON").setMaster("local[2]")
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
    
  3. Creating a DataFrame from JSON: In this step, we use the jsonFile function of SQLContext to create a DataFrame. This is very similar to the sqlContext.csvFile function that we used in Chapter 2, Getting Started with Apache Spark DataFrames. There's just one thing that we need to watch out for here: our .json file should be formatted with one record per line. It is unusual to store JSON as one record per line considering that it is a structured format, but the jsonFile function treats every single line as one record, and if the file isn't laid out this way, it throws a scala.MatchError while parsing (a tiny sketch of such one-record-per-line JSON follows these steps):
      val dFrame=sqlContext.jsonFile("hdfs://localhost:9000/data/scalada/profiles.json")
    
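To make the one-record-per-line requirement concrete, here is a minimal, self-contained sketch (the values are made up; the field names just mirror a few columns of profiles.json). Each string is one complete JSON document on a single line, and sqlContext.jsonRDD, which we'll cover in the next subrecipe, accepts an RDD of such strings:

    val sampleLines = Seq(
      """{"name":"Alice Sample","age":29,"tags":["scala","spark"]}""",
      """{"name":"Bob Sample","age":35,"tags":["hdfs"]}""")

    //Each element of the RDD is one JSON record on a single line
    val sampleDf = sqlContext.jsonRDD(sc.parallelize(sampleLines))
    sampleDf.printSchema()
    sampleDf.show()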

That's it! We are done! Let's just print the schema and sample the data:

    dFrame.printSchema()
    dFrame.show()

The following screenshot shows the schema that is inferred from the JSON file. Note that age is resolved as a long and tags as an array of strings, as you can see here:

[Screenshot: the schema inferred from profiles.json]

The next screenshot shows you a sample of the dataset:

[Screenshot: a sample of the profiles dataset]
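Beyond printSchema() and show(), the regular DataFrame operations work on this DataFrame as well. As a minimal sketch (the name and age columns are part of the profiles data, as we'll see in the explicit schema later):

    //Project a couple of columns and count the records
    dFrame.select("name", "age").show(5)
    println("Total number of profiles : " + dFrame.count())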

Reading a text file and converting it to JSON RDD

In the previous section, we saw how we can directly load a text file containing JSON records as a DataFrame using sqlContext.jsonFile. Now we'll see an alternative approach, wherein we construct an RDD[String] from the same profiles.json file and then convert it into a DataFrame. This has a distinct advantage over the previous approach: we get more control over the schema instead of relying on the inferred one:

    val strRDD=sc.textFile("hdfs://localhost:9000/data/scalada/profiles.json")
    val jsonDf=sqlContext.jsonRDD(strRDD)
    jsonDf.printSchema()

The following is the output of the preceding command:

[Screenshot: the schema inferred by jsonRDD from profiles.json]
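Since we now hold the raw lines as an RDD[String], we are free to preprocess them before parsing. As a minimal sketch (assuming, purely for illustration, that the file might contain empty lines we'd like to skip):

    //Clean up the raw lines before handing them to jsonRDD
    val cleanedRDD = strRDD.map(_.trim).filter(_.nonEmpty)
    val cleanedDf = sqlContext.jsonRDD(cleanedRDD)
    cleanedDf.printSchema()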

Explicitly specifying your schema

Using jsonRDD and letting it resolve the schema by itself is clean and simple. However, it gives us less control over the types; for example, we may want the age field to be an integer rather than a long. Similarly, the `registered` column is inferred as a string while it is actually a timestamp. To get this control, let's go ahead and declare our own schema. We do this by constructing a StructType made up of StructFields:

  import org.apache.spark.sql.types._

  val profilesSchema = StructType(
    Seq(
      StructField("_id", StringType, true),
      StructField("about", StringType, true),
      StructField("address", StringType, true),
      StructField("age", IntegerType, true),
      StructField("company", StringType, true),
      StructField("email", StringType, true),
      StructField("eyeColor", StringType, true),
      StructField("favoriteFruit", StringType, true),
      StructField("gender", StringType, true),
      StructField("name", StringType, true),
      StructField("phone", StringType, true),
      StructField("registered", TimestampType, true),
      StructField("tags", ArrayType(StringType), true)
    )
  )

  val jsonDfWithSchema=sqlContext.jsonRDD(strRDD, profilesSchema)

  jsonDfWithSchema.printSchema() //Has timestamp
  jsonDfWithSchema.show()

Note

Another advantage of specifying our own schema is that not all the columns need to be specified in the StructType. We just need to specify the columns that we are interested in, and only those columns will be available in the target DataFrame. Also, any column that is declared in the schema but is not available in the dataset will be filled with null values.

The following is the output of the preceding printSchema() call. We can see that the registered field now has the timestamp data type and age is an integer:

[Screenshot: the schema with registered as timestamp and age as integer]
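Coming back to the preceding note, here is a minimal sketch of a partial schema (the field names are taken from the full schema above). Only these three columns will be available in the resulting DataFrame:

  //A partial schema - only the columns we care about
  val partialSchema = StructType(Seq(
    StructField("name", StringType, true),
    StructField("age", IntegerType, true),
    StructField("registered", TimestampType, true)))

  val partialDf = sqlContext.jsonRDD(strRDD, partialSchema)
  partialDf.printSchema() //Only name, age, and registered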

Finally, just for kicks, let's fire a filter query based on the timestamp. This involves three steps:

  1. Register the DataFrame as a temporary table for querying, as we have done several times in previous recipes. The following line of code registers a table by the name of profilesTable:
      jsonDfWithSchema.registerTempTable("profilesTable")
    
  2. Let's fire away our filter query. The following query returns all the profiles that were registered after August 26, 2014. Since the registered field is a timestamp, we need an additional minor step of casting the parameter to a timestamp (an equivalent using the DataFrame API is sketched after these steps):
      val filterCount = sqlContext.sql("select * from profilesTable where registered > CAST('2014-08-26 00:00:00' AS TIMESTAMP)").count
    
  3. Let's print the count:
      println("Filtered based on timestamp count : " + filterCount) //106
    
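As mentioned in step 2, the same filter can also be expressed with the DataFrame API instead of SQL. A minimal sketch:

  //Same predicate, expressed as a DataFrame filter expression
  val filterCountDf = jsonDfWithSchema
    .filter("registered > CAST('2014-08-26 00:00:00' AS TIMESTAMP)")
    .count()

  println("Filtered using the DataFrame API : " + filterCountDf)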

There's more…

If you aren't comfortable with having the schema in the code and would like to save the schema in a file, it's just a one-liner for you:

  import scala.reflect.io.File
  import scala.io.Source
  //Writes schema as JSON to file
  File("profileSchema.json").writeAll(profilesSchema.json)

Obviously, you would want to reconstruct the schema from JSON, and that's also a one-liner:

  val loadedSchema = DataType.fromJson(Source.fromFile("profileSchema.json").mkString)

Let's check whether loadedSchema and profilesSchema encapsulate the same schema by doing an equality check on their json:

println ("ProfileSchema == loadedSchema :"+(loadedSchema.json==profilesSchema.json))

The output is shown as follows:

ProfileSchema == loadedSchema :true
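Note that DataType.fromJson gives us back a generic DataType. If we'd like to reuse the loaded schema with jsonRDD, one straightforward (assumed) approach is to cast it back to a StructType:

  //Cast the reloaded DataType back to a StructType and reuse it as a schema
  val reloadedStructType = loadedSchema.asInstanceOf[StructType]
  val reloadedDf = sqlContext.jsonRDD(strRDD, reloadedStructType)
  reloadedDf.printSchema()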

If we would like to eyeball the JSON, there is a handy method called prettyJson that formats it:

  //Print loaded schema
  println(loadedSchema.prettyJson)

The output is the pretty-printed JSON representation of the schema.
