JSON has become the most common text-based data representation format these days. In this recipe, we'll see how to load data represented as JSON into our DataFrame. To make it more interesting, let's have our JSON in HDFS instead of our local filesystem.
The Hadoop Distributed File System (HDFS) is a highly distributed filesystem that is both scalable and fault tolerant. It is a critical part of the Hadoop ecosystem and is inspired by the Google File System paper (http://research.google.com/archive/gfs.html). More details about the architecture and communication protocols on HDFS can be found at http://hadoop.apache.org/docs/r1.2.1/hdfs_design.html.
In this recipe, we'll see three subrecipes:

- Loading JSON into a DataFrame using sqlContext.jsonFile
- Loading JSON from an RDD[String] using sqlContext.jsonRDD
- Specifying our own schema while constructing the DataFrame
This recipe consists of three steps:

1. Upload the sample JSON file (profiles.json) to HDFS. A copy of the data file is added to our project repository, and it can be downloaded from https://github.com/arunma/ScalaDataAnalysisCookbook/blob/master/chapter3-data-loading/profiles.json:

```shell
hadoop fs -mkdir -p /data/scalada
hadoop fs -put profiles.json /data/scalada/profiles.json
hadoop fs -ls /data/scalada

-rw-r--r--   1 Gabriel supergroup     176948 2015-05-16 22:13 /data/scalada/profiles.json
```
The following screenshot shows the HDFS file explorer available at http://localhost:50070, which confirms that our upload is successful:
2. Create a SparkConf, a SparkContext, and then a SQLContext:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("DataFromJSON").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
```
3. Use the jsonFile function of SQLContext to create a DataFrame. This is very similar to the sqlContext.csvFile function that we used in Chapter 2, Getting Started with Apache Spark DataFrames. There's just one thing that we need to watch out for here: our .json file should be formatted as one line per record. It is unusual to store JSON as one line per record considering that it is a structured format, but the jsonFile function treats every single line as one record; if a record spans multiple lines, it throws a scala.MatchError while parsing:

```scala
val dFrame = sqlContext.jsonFile("hdfs://localhost:9000/data/scalada/profiles.json")
```
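To illustrate the one-line-per-record requirement, a valid input file looks like the following sketch. The field values here are invented for illustration; only the field names match the profiles schema used later in this recipe:

```json
{"_id": "1", "name": "Alice", "age": 27, "registered": "2014-08-27 10:15:00", "tags": ["scala", "spark"]}
{"_id": "2", "name": "Bob", "age": 31, "registered": "2014-03-02 08:00:00", "tags": ["hadoop"]}
```

Each line is a complete, self-contained JSON object; pretty-printing a record across multiple lines would break the parse.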
That's it! We are done! Let's just print the schema and sample the data:

```scala
dFrame.printSchema()
dFrame.show()
```
The following screenshot shows the schema that is inferred from the JSON file. Note that age is resolved as a long and tags as an array of strings, as you can see here:
In the previous section, we saw how we can directly import a text file containing JSON records as a DataFrame using sqlContext.jsonFile. Now, we'll see an alternate approach, wherein we construct an RDD[String] from the same profiles.json file and then convert it into a DataFrame. This has a distinct advantage over the previous approach: we can have more control over the schema instead of relying on the one that is inferred:
```scala
val strRDD = sc.textFile("hdfs://localhost:9000/data/scalada/profiles.json")
val jsonDf = sqlContext.jsonRDD(strRDD)
jsonDf.printSchema()
```
The following is the output of the preceding command:
Using jsonRDD and letting it resolve the schema by itself is clean and simple. However, it gives us less control over the types; say we want the age field to be an Integer and not a Long. Similarly, the registered column is inferred as a String while it is actually a Timestamp. In order to achieve this, let's go ahead and declare our own schema. The way we do this is by constructing a StructType composed of StructFields:
```scala
import org.apache.spark.sql.types._

val profilesSchema = StructType(
  Seq(
    StructField("_id", StringType, true),
    StructField("about", StringType, true),
    StructField("address", StringType, true),
    StructField("age", IntegerType, true),
    StructField("company", StringType, true),
    StructField("email", StringType, true),
    StructField("eyeColor", StringType, true),
    StructField("favoriteFruit", StringType, true),
    StructField("gender", StringType, true),
    StructField("name", StringType, true),
    StructField("phone", StringType, true),
    StructField("registered", TimestampType, true),
    StructField("tags", ArrayType(StringType), true)
  )
)

val jsonDfWithSchema = sqlContext.jsonRDD(strRDD, profilesSchema)
jsonDfWithSchema.printSchema() //Has timestamp
jsonDfWithSchema.show()
```
Another advantage of specifying our own schema is that not all the columns need to be specified in the StructType. We just need to specify the columns that we are interested in, and only those columns will be available in the target DataFrame. Also, any column that is declared in the schema but is not available in the dataset will be filled with null values.
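As a quick sketch of this, a partial schema that keeps only three columns would look like the following. The choice of fields here is just for illustration, and we assume the strRDD from the previous step and the org.apache.spark.sql.types._ import are in scope:

```scala
// Partial schema: only these three columns appear in the resulting DataFrame
val partialSchema = StructType(
  Seq(
    StructField("name", StringType, true),
    StructField("age", IntegerType, true),
    StructField("registered", TimestampType, true)
  )
)

val partialDf = sqlContext.jsonRDD(strRDD, partialSchema)
partialDf.printSchema() // only name, age, and registered
```

All the other attributes present in each JSON record are simply dropped during conversion.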
The following is the output. We can see that the registered field is now considered to have a timestamp data type and age an integer:
Finally, just for kicks, let's fire a filter query based on the timestamp. This involves three steps:

1. Register the DataFrame as a temporary table named profilesTable:

```scala
jsonDfWithSchema.registerTempTable("profilesTable")
```

2. Filter the records whose registered time is later than a given Timestamp:

```scala
val filterCount = sqlContext.sql("select * from profilesTable where registered > CAST('2014-08-26 00:00:00' AS TIMESTAMP)").count
```

3. Print the count:

```scala
println("Filtered based on timestamp count : " + filterCount) //106
```
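The same filter can also be expressed with the DataFrame API instead of SQL. This is a minimal sketch, assuming the jsonDfWithSchema DataFrame from the earlier step and Spark 1.x column syntax:

```scala
import java.sql.Timestamp

// Equivalent filter using the DataFrame API instead of a SQL string
val cutoff = Timestamp.valueOf("2014-08-26 00:00:00")
val apiFilterCount = jsonDfWithSchema.filter(jsonDfWithSchema("registered") > cutoff).count
println("Filtered using DataFrame API : " + apiFilterCount)
```

Both forms compile down to the same query plan; which one you pick is largely a matter of taste.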
If you aren't comfortable with having the schema in the code and would like to save the schema in a file, it's just a one-liner for you:
```scala
import scala.reflect.io.File
import scala.io.Source

//Writes schema as JSON to file
File("profileSchema.json").writeAll(profilesSchema.json)
```
Obviously, you would want to reconstruct the schema from JSON, and that's also a one-liner:
```scala
val loadedSchema = DataType.fromJson(Source.fromFile("profileSchema.json").mkString)
```
Let's check whether loadedSchema and profilesSchema encapsulate the same schema by doing an equality check on their json representations:

```scala
println("ProfileSchema == loadedSchema :" + (loadedSchema.json == profilesSchema.json))
```
The output is shown as follows:

```
ProfileSchema == loadedSchema :true
```
If we would like to eyeball the json, we have a nice method called prettyJson that formats it:

```scala
//Print loaded schema
println(loadedSchema.prettyJson)
```
The output is as follows:
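The original screenshot of the output is not reproduced here. As a rough sketch, assuming Spark 1.x's DataType JSON serialization, the pretty-printed schema looks something like the following, with one entry per field (the "..." elides the remaining columns):

```
{
  "type" : "struct",
  "fields" : [ {
    "name" : "_id",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  },
  ...
  {
    "name" : "tags",
    "type" : {
      "type" : "array",
      "elementType" : "string",
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  } ]
}
```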