Parquet is a highly efficient columnar storage format, but it is also relatively new. Avro (https://avro.apache.org) is a widely used row-based storage format. This recipe showcases how we can retain the older, flexible Avro data model in our code but still use the Parquet format during storage.
The parquet-mr project (yes, the same one that has the Parquet tools we saw in the previous recipe) has converters for almost all the popular data formats. These converters take your data model and convert it into the Parquet format before persisting it.
In this recipe, we'll use the Avro data model and serialize the data in a Parquet file. The recipe involves the following steps:
The Avro schema is defined using JSON. In our case, we'll just use the same StudentData.csv as the input file. So, let's code the four fields (id, name, phone, and email) in the schema:
{"namespace": "studentavro.avro", "type": "record", "name": "StudentAvro", "fields": [ {"name": "id", "type": ["string", "null"]}, {"name": "name", "type": ["string", "null"]}, {"name": "phone", "type": ["string", "null"]}, {"name": "email", "type": ["string", "null"]} ] }
You are probably already familiar with Avro, or you may have understood the schema just by looking at it, but let me bore you with a short explanation of the schema anyway.
The namespace and name attributes in the JSON translate into the package name and the class name in our world, respectively. So, our generated class will have the fully qualified name studentavro.avro.StudentAvro. The "record" (the value of the type attribute) is one of the complex types in Avro (http://avro.apache.org/docs/1.7.6/spec.html#schema_complex). A record roughly translates to a class in Java/Scala and sits at the topmost level of the schema hierarchy. A record can have multiple fields encapsulated inside it, and these fields can be primitives (https://avro.apache.org/docs/1.7.7/spec.html#schema_primitive) or other complex types. The last bit, where type holds an array of types ("type": ["string", "null"]), is interesting. It simply means that the field can be of more than one type; in Avro terms, this is called a union.
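If you want to play with a union outside this recipe, here is a minimal, hypothetical sketch (the EmailOnly record is made up purely for illustration) that parses a single optional field with Avro's Schema.Parser. One detail worth knowing: if you also want the optional field to have a null default, Avro expects "null" to be the first branch of the union.

import org.apache.avro.Schema

// Illustrative only: a record with one optional field, declared with "null" first
// so that a null default is legal
val unionSchema = new Schema.Parser().parse(
  """{"type": "record", "name": "EmailOnly", "fields": [
    |  {"name": "email", "type": ["null", "string"], "default": null}
    |]}""".stripMargin)

println(unionSchema.getField("email").schema().getType)  // prints UNION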
Now that we are done with the schema, let's save this file with the .avsc extension. I have saved it as student.avsc in the src/main/avro directory.
The next step is to generate a class from the schema. The reason we stored the Avro schema file in the src/main/avro folder is that we'll be using the sbt-avro plugin (https://github.com/cavorite/sbt-avro) to generate a Java class from the schema. Configuring the plugin is as easy as configuring any other plugin for SBT:
Add the plugin to project/plugins.sbt:

addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2")

Then pull in the plugin's default settings in build.sbt:

seq(sbtavro.SbtAvro.avroSettings : _*)

Now run sbt avro:generate. You can see the generated Java file at target/scala-2.10/src_managed/main/compiled_avro/studentavro/avro/StudentAvro.java.

Finally, add the library dependencies we need for this recipe to build.sbt and compile the project:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.avro" % "avro" % "1.7.7",
  "org.apache.parquet" % "parquet-avro" % "1.8.1",
  "com.twitter" %% "chill-avro" % "0.6.0"
)

sbt compile
This step is very similar to the previous recipe in the sense that we use the convertCSVToStudents function to generate an RDD of StudentAvro objects. Also, since this isn't a Scala class and the generated Java class comes with a builder, we use the builder to construct instances fluently (http://en.wikipedia.org/wiki/Fluent_interface):
val conf = new SparkConf().setAppName("AvroModelToParquet").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")

val rddOfStudents = convertCSVToStudents("StudentData.csv", sc)

//The CSV has a header row. We skip it by checking whether the first column holds the literal "id"
def convertCSVToStudents(filePath: String, sc: SparkContext): RDD[StudentAvro] = {
  val rddOfStudents: RDD[StudentAvro] = sc.textFile(filePath).flatMap(eachLine => {
    val data = eachLine.split("\\|")
    if (data(0) == "id") None
    else Some(StudentAvro.newBuilder()
      .setId(data(0))
      .setName(data(1))
      .setPhone(data(2))
      .setEmail(data(3)).build())
  })
  rddOfStudents
}
This is a tricky step that involves multiple substeps, so let's decipher it backwards. We fall back to RDD[StudentAvro] in this example instead of a DataFrame because DataFrames can be constructed only from an RDD of case classes (or classes that extend Product, as we saw earlier in this chapter) or from an RDD[org.apache.spark.sql.Row]. If you prefer to use DataFrames, you can read the CSV as an array of values, use RowFactory.create for each array of values, and, once an RDD[Row] is available, use sqlContext.createDataFrame to convert it to a DataFrame, as sketched below:
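The following is only a minimal sketch of that DataFrame route and is not used in the rest of this recipe; it assumes the same pipe-delimited StudentData.csv, an all-string schema, and uses Scala's Row(...) factory, which is equivalent to RowFactory.create:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical DataFrame alternative: build an RDD[Row] and attach an explicit schema
val studentSchema = StructType(
  Seq("id", "name", "phone", "email").map(name => StructField(name, StringType, nullable = true)))

val rowRDD = sc.textFile("StudentData.csv")
  .map(_.split("\\|"))
  .filter(fields => fields(0) != "id")  // skip the header row
  .map(fields => Row(fields(0), fields(1), fields(2), fields(3)))

val studentDf = sqlContext.createDataFrame(rowRDD, studentSchema)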
To save the data in the same way as a Hadoop SequenceFile, we can use saveAsNewAPIHadoopFile. A sequence file is simply a flat file that holds key-value pairs. We could have chosen one of the Student attributes as a key, but for the sake of simplicity, let's have the key as a Void in this example. To represent a key-value pair in Spark, we use a PairRDD (https://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions). Not surprisingly, saveAsNewAPIHadoopFile is available only for PairRDDs. To convert the existing RDD[StudentAvro] to a PairRDD[Void, StudentAvro], we use the map function:
val pairRddOfStudentsWithNullKey = rddOfStudents.map(each => (null, each))
The generated StudentAvro class doesn't play well with Spark's default Java serialization, so we configure Kryo:

val conf = new SparkConf().setAppName("AvroModelToParquet").setMaster("local[2]")
conf.set("spark.kryo.registrator", classOf[StudentAvroRegistrator].getName)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
So, using the "spark.serializer" configuration, we say that we intend to use the KryoSerializer and that our registrator is StudentAvroRegistrator. As you may expect, what the registrator does is register our StudentAvro class as a candidate for Kryo serialization. The twitter-chill project (https://github.com/twitter/chill) provides a nice extension that lets the Kryo serializer delegate to Avro serialization:
class StudentAvroRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[StudentAvro],
      AvroSerializer.SpecificRecordBinarySerializer[StudentAvro])
  }
}
org.apache.hadoop.mapreduce.OutputFormat specifies the output format of the file that we are going to write, and as expected, we use ParquetOutputFormat (this is available in the parquet-hadoop subproject of the parquet-mr project). There are two things that an OutputFormat requires. The first is a WriteSupport class, which knows how to convert the Avro data model to the actual storage format. This is achieved with the following line:

ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])

The second is the Avro schema itself, which for StudentAvro is accessible using the getClassSchema function. This line of code achieves that:

AvroParquetOutputFormat.setSchema(job, StudentAvro.getClassSchema)
Now, what's that job parameter doing in these two lines of code? The job object is just an instance of org.apache.hadoop.mapreduce.Job:
val job = new Job()
When we call the setWriteSupportClass and setSchema methods of ParquetOutputFormat and AvroParquetOutputFormat, the resulting settings are captured in the Hadoop Configuration encapsulated inside the Job object. We'll be using this job configuration while saving the data to the Parquet file.
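If you are curious, you can confirm this by peeking into the Job's configuration. This is just an optional sanity check and not part of the recipe: parquet.avro.schema is the key we will later see in the file's extra metadata, while the write-support key name is an assumption based on parquet-mr's conventions.

// Optional sanity check: the two setter calls above simply write entries into the
// Hadoop Configuration wrapped by the Job object
println(job.getConfiguration().get("parquet.avro.schema"))         // the Avro schema as JSON
println(job.getConfiguration().get("parquet.write.support.class")) // assumed key; expected to name AvroWriteSupport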
The final step is to save the data using saveAsNewAPIHadoopFile. The save method requires a bunch of parameters, each of which we have already discussed: the first parameter is the filename, followed by the key and the value classes; the fourth is the OutputFormat of the file; and finally comes the job configuration itself:

pairRddOfStudentsWithNullKey.saveAsNewAPIHadoopFile("studentAvroPq",
  classOf[Void],
  classOf[StudentAvro],
  classOf[AvroParquetOutputFormat],
  job.getConfiguration())
We saw the entire program in bits and pieces, so for the sake of completeness, let's see it in full:
// Imports assume the org.apache.parquet and chill-avro artifacts declared in build.sbt
import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport}
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import studentavro.avro.StudentAvro

object ParquetAvroSchemaMain extends App {

  val conf = new SparkConf().setAppName("AvroModelToParquet").setMaster("local[2]")
  conf.set("spark.kryo.registrator", classOf[StudentAvroRegistrator].getName)
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  val job = new Job()
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")

  val rddOfStudents = convertCSVToStudents("StudentData.csv", sc)

  ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
  AvroParquetOutputFormat.setSchema(job, StudentAvro.getClassSchema)

  val pairRddOfStudentsWithNullKey = rddOfStudents.map(each => (null, each))

  pairRddOfStudentsWithNullKey.saveAsNewAPIHadoopFile("studentAvroPq",
    classOf[Void],
    classOf[StudentAvro],
    classOf[AvroParquetOutputFormat],
    job.getConfiguration())

  //The CSV has a header row. We skip it by checking whether the first column holds the literal "id"
  def convertCSVToStudents(filePath: String, sc: SparkContext): RDD[StudentAvro] = {
    val rddOfStudents: RDD[StudentAvro] = sc.textFile(filePath).flatMap(eachLine => {
      val data = eachLine.split("\\|")
      if (data(0) == "id") None
      else Some(StudentAvro.newBuilder()
        .setId(data(0))
        .setName(data(1))
        .setPhone(data(2))
        .setEmail(data(3)).build())
    })
    rddOfStudents
  }
}

class StudentAvroRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[StudentAvro],
      AvroSerializer.SpecificRecordBinarySerializer[StudentAvro])
  }
}
As always, let's read the file back for confirmation. The function to be called for this is newAPIHadoopFile, which accepts a similar set of parameters as saveAsNewAPIHadoopFile: the name of the file, the InputFormat, the key class, the value class, and finally the job configuration. Note that we are using newAPIHadoopFile instead of the previously used parquetFile method. This is because we are reading the file back through the Hadoop file API:
//Reading the file back for confirmation
ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[StudentAvro]])

val readStudentsPair = sc.newAPIHadoopFile("studentAvroPq",
  classOf[AvroParquetInputFormat[StudentAvro]],
  classOf[Void],
  classOf[StudentAvro],
  job.getConfiguration())

val justStudentRDD: RDD[StudentAvro] = readStudentsPair.map(_._2)
val studentsAsString = justStudentRDD.collect().take(5).mkString(" ")

println(studentsAsString)
This is the output:
We'll also use the Parquet tools to confirm that the schema stored in the Parquet file is indeed an Avro schema:
/Users/Gabriel/Dropbox/arun/ScalaDataAnalysis/git/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc3/parquet-tools meta /Users/Gabriel/Dropbox/arun/ScalaDataAnalysis/Code/scaladataanalysisCB-tower/chapter3-data-loading-parquet/studentAvroPq
Yup! Looks like it is! The extra section in the meta output does confirm that the Avro schema is stored:
creator:  parquet-mr
extra:    parquet.avro.schema = {"type":"record","name":"StudentAvro","namespace":"studentavro.avro","fields":[{"name":"id","type":[{"type":"string","avro.java.string":"Stri [more]...