Parquet (https://parquet.apache.org/) is rapidly becoming the go-to data storage format in the world of big data because of the distinct advantages it offers:
As you can see in the preceding screenshot, Parquet stores data in chunks of rows, say 100 rows. In Parquet terms, these are called RowGroups. Each of these RowGroups contains chunks of columns (or column chunks). A column chunk can hold more than a single unit of data for a particular column (as represented by the blue box in the first column). For example, Jai, Suri, and Dhina form a single chunk even though they are three separate units of data for Name.
Another unique feature is that these column chunks (groups of a single column's information) can be read independently. Let's consider the following image:
We can see that the items of column data are stored next to each other in a sequence. Since our queries most of the time focus on just a few columns (a projection) rather than the entire table, this storage mechanism lets us retrieve data much faster than reading entire rows and then filtering out the unwanted columns. It also shrinks the amount of data that Spark's in-memory computations need to hold.
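To make this concrete, here is a minimal sketch of such a projection. It assumes a SQLContext named sqlContext and the studentPq.parquet output, both of which we set up later in this recipe:

//Sketch: a projection on a Parquet-backed DataFrame.
//Only the 'name' column chunks need to be read, not entire rows.
val namesOnly = sqlContext.parquetFile("studentPq.parquet").select("name")
namesOnly.show()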
Spark's Parquet support is built on top of the Parquet-MR project (https://github.com/Parquet/parquet-mr). In this recipe, we'll cover the following steps:
The entire code for this recipe can be found at https://github.com/arunma/ScalaDataAnalysisCookbook/tree/master/chapter3-data-loading-parquet.
Before we dive into the steps, let's briefly look at our build.sbt
file, specifically the library dependencies and Avro settings (which we'll talk about in the following sections):
organization := "com.packt" name := "chapter3-data-loading-parquet" scalaVersion := "2.10.4" val sparkVersion="1.4.1" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion, "org.apache.spark" %% "spark-sql" % sparkVersion, "org.apache.spark" %% "spark-mllib" % sparkVersion, "org.apache.spark" %% "spark-hive" % sparkVersion, "org.apache.avro" % "avro" % "1.7.7", "org.apache.parquet" % "parquet-avro" % "1.8.1", "com.twitter" %% "chill-avro" % "0.6.0" ) resolvers ++= Seq( "Apache HBase" at "https://repository.apache.org/content/repositories/releases", "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/", "Twitter" at "http://maven.twttr.com/" ) fork := true seq( sbtavro.SbtAvro.avroSettings : _*) (stringType in avroConfig) := "String" javaSource in sbtavro.SbtAvro.avroConfig <<= (sourceDirectory in Compile)(_ / "java")
Now that we have build.sbt
out of the way, let's go ahead and look at the code behind each of the listed steps.
We can actually create a DataFrame directly from CSV using the com.databricks:spark-csv package, as we saw in Chapter 2, Getting Started with Apache Spark DataFrames (a quick sketch of that route follows), but for this recipe, we'll just tokenize the CSV ourselves and create case class instances from it. The input CSV has a header row, so the conversion process involves skipping the first line.
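For comparison, the direct spark-csv route would look roughly like this. This is a sketch, not the approach used in this recipe; it assumes the com.databricks:spark-csv package is on the classpath and that the file is pipe-delimited, matching the tokenization shown later:

//Sketch only: loading the CSV directly with spark-csv (the Chapter 2 approach)
val csvDFrame = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")   //the input CSV has a header row
  .option("delimiter", "|")   //assumption: pipe-delimited, matching the split below
  .load("StudentData.csv")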
The class file that we will discuss in this section is available at https://github.com/arunma/ScalaDataAnalysisCookbook/blob/master/chapter3-data-loading-parquet/src/main/scala/com/packt/dataload/ParquetCaseClassMain.scala.
There are just two interesting things that you might notice in the code:
sqlContext.setConf("spark.sql.parquet.binaryAsString","true")
Some Parquet producing systems, such as Impala, binary encode the strings. In order to work around this issue, we set the following configuration, which says that if it sees binary data, it should be treated as a string:
Instead of using sqlContext.createDataFrame, we just call toDF() on the RDD[Student]. The sqlContext.implicits object has a number of implicit conversions that help us convert an RDD[T] to a DataFrame directly. The only requirement, as expected, is to import the implicits:
import sqlContext.implicits._
The rest of the code is the same as we saw earlier:
val conf = new SparkConf().setAppName("CaseClassToParquet").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

//Treat binary encoded values as Strings
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")

import sqlContext.implicits._

//Convert each line into Student
val rddOfStudents = convertCSVToStudents("StudentData.csv", sc)

//Convert RDD[Student] to a DataFrame using sqlContext.implicits
val studentDFrame = rddOfStudents.toDF()
The convertCSVToStudents
method, which converts each line into a Student
object, looks like this:
def convertCSVToStudents(filePath: String, sc: SparkContext): RDD[Student] = {
  val rddOfStudents: RDD[Student] = sc.textFile(filePath).flatMap { line =>
    val data = line.split("\\|")
    //Skip the header row; every other line becomes a Student
    if (data(0) == "id") None
    else Some(Student(data(0), data(1), data(2), data(3)))
  }
  rddOfStudents
}
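For reference, the Student case class that this method builds instances of is a simple four-field holder of strings, along the following lines (the actual definition lives in the repository linked earlier):

//Each field maps to one column of the pipe-delimited input
case class Student(id: String, name: String, phone: String, email: String)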
Saving the DataFrame as a Parquet file is just a one-liner. This can be done using either the saveAsParquetFile or the save method. If you wish to save it to a Hive table (https://hive.apache.org/), there is also a saveAsTable method for you (a brief sketch of that follows the next listing):
//Save DataFrame as Parquet using saveAsParquetFile
studentDFrame.saveAsParquetFile("studentPq.parquet")

//OR

//Save DataFrame as Parquet using the save method
studentDFrame.save("studentPq.parquet", "parquet", SaveMode.Overwrite)
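As for the Hive route mentioned above, a minimal sketch follows. Note that saveAsTable needs a Hive-enabled context (a HiveContext rather than a plain SQLContext), and the table name here is just an example:

//Sketch: persist the DataFrame as a Hive table (requires a HiveContext)
studentDFrame.saveAsTable("students")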
Both save methods create a directory at the location you specify (here, we simply store it in our project directory). The directory holds the part files that represent the serialized data. The contents are not entirely human readable, but you may notice that the data of a single column is stored together.
Just as we do for the rest of the recipes, let's read the file and sample the data for confirmation:
//Read data for confirmation
val pqDFrame = sqlContext.parquetFile("studentPq.parquet")
pqDFrame.show()
The following is the output:
Other than using the DataFrame's printSchema method to inspect the schema (shown below), we can use some interesting tools provided as part of the Parquet project to get a variety of other information.
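As a quick reference, printing the schema from within Spark is a one-liner on the pqDFrame we read back earlier:

pqDFrame.printSchema()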
The parquet-tools utility is a subproject of Parquet and is available at https://github.com/Parquet/parquet-mr/tree/master/parquet-tools.
Since Spark 1.4.1 uses Parquet 1.6.0rc3, we'll need to download that version of the tools from the Maven repository. The executables and the JARs can be downloaded as one bundle from https://repo1.maven.org/maven2/com/twitter/parquet-tools/1.6.0rc3/parquet-tools-1.6.0rc3-bin.tar.gz.
Let's put the tools into action. Specifically, we'll do three things in this step: print the schema of a Parquet file using the schema parameter, inspect the file's metadata using the meta parameter, and print the actual data using the head and cat parameters.

To print the schema, we invoke the parquet-tools command with schema and the Parquet file as the parameters. As an example, let's print the schema using one of the part files:

bash-3.2$ parquet-tools-1.6.0rc3/parquet-tools schema part-r-00000-20a8b58c-fe1d-43e7-b148-f874b78eb5ec.gz.parquet

message root {
  optional binary id (UTF8);
  optional binary name (UTF8);
  optional binary phone (UTF8);
  optional binary email (UTF8);
}
We see that the schema is indeed available in Parquet format and is derived from our case classes.
Next, let's view the file's metadata. The meta parameter of parquet-tools will help us achieve this:
bash-3.2$ parquet-tools-1.6.0rc3/parquet-tools meta part-r-00000-20a8b58c-fe1d-43e7-b148-f874b78eb5ec.gz.parquet

creator:     parquet-mr version 1.6.0rc3 (build d4d5a07ec9bd262ca1e93c309f1d7d4a74ebda4c)
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true, [more]...

file schema: root
--------------------------------------------------------------------------------
id:    OPTIONAL BINARY O:UTF8 R:0 D:1
name:  OPTIONAL BINARY O:UTF8 R:0 D:1
phone: OPTIONAL BINARY O:UTF8 R:0 D:1
email: OPTIONAL BINARY O:UTF8 R:0 D:1

row group 1: RC:50 TS:3516
--------------------------------------------------------------------------------
id:    BINARY GZIP DO:0 FPO:4   SZ:140/326/2.33  VC:50 ENC:RLE,BIT_PACKED,PLAIN
name:  BINARY GZIP DO:0 FPO:144 SZ:313/483/1.54  VC:50 ENC:RLE,BIT_PACKED,PLAIN
phone: BINARY GZIP DO:0 FPO:457 SZ:454/961/2.12  VC:50 ENC:RLE,BIT_PACKED,PLAIN
email: BINARY GZIP DO:0 FPO:911 SZ:929/1746/1.88 VC:50 ENC:RLE,BIT_PACKED,PLAIN

We see that the extra entry holds the schema that is specific to the data model we used. This information is used when the data is deserialized.
Finally, let's print the actual data. The head command will help us do that. It accepts an additional -n parameter, where you can specify the number of records to be displayed:

bash-3.2$ parquet-tools-1.6.0rc3/parquet-tools head -n 2 part-r-00001.parquet
The preceding command will display only two rows because of the additional -n 2
parameter.
The following is the output of this command:
id = 1
name = Burke
phone = 1-300-746-8446
email = [email protected]

id = 2
name = Kamal
phone = 1-668-571-5046
email = [email protected]
Optionally, if you wish to display all the records in the file, you can use the cat parameter with the parquet-tools command:
parquet-tools cat part-r-00001.parquet
As you can see from the meta information, the data is compressed with gzip by default. In order to use Snappy compression, all we need to do is set a configuration on our SQLContext (actually on the SQLConf of the SQLContext). There's just one catch with regard to enabling Lempel-Ziv-Oberhumer (LZO) compression: we are required to install native-lzo on all the machines where this data is stored. Otherwise, we get a "native-lzo library not available" error message.
Let's enable Snappy (http://google.github.io/snappy/) compression by passing the configuration parameter of Parquet compression to Snappy:
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
After running the program, let's use the parquet-tools meta
command to verify it:
parquet-tools meta part-r-00000-aee54b77-288e-44b2-8f36-53b38a489e8d.snappy.parquet
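If the switch worked, the column chunks in the meta output will now report SNAPPY instead of GZIP as the compression codec.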