Case classes have an inherent limitation: they can hold only 22 attributes (Catch-22, if you will). While a reasonable percentage of datasets fit within that budget, in many cases a 22-feature ceiling is a serious obstacle. In this recipe, we'll take a sample Student dataset (http://archive.ics.uci.edu/ml/datasets/Student+Performance), which has 33 features, and we'll see how we can work around this.
Case classes in Scala cannot encapsulate more than 22 fields because the companion objects generated for them during compilation cannot find the matching FunctionN and TupleN classes. Let's take the example of the Employee case class that we created in Chapter 2, Getting Started with Apache Spark DataFrames:

```scala
case class Employee(id: Int, name: String)
```

When we look at its decompiled companion object, we notice that for the two constructor parameters of the case class, the companion uses Tuple2 and AbstractFunction2 in its unapply method, the method that gets invoked when we pattern-match against a case class. The problem we face is that the Scala library provides these classes only up to Tuple22 and Function22 (probably because, outside the data analysis world, an entity object with even 10 fields is not a great idea). However, there is a simple yet powerful workaround, and we will see it in this recipe.
We saw in Chapter 2, Getting Started with Apache Spark DataFrames (in the Creating a DataFrame from CSV recipe), that the requirement for creating a DataFrame from a collection of classes using SQLContext.createDataFrame is that the class must extend scala.Product. So, what we intend to do is write our own class that extends scala.Product.
This recipe consists of four steps:

1. Create the SQLContext from the SparkContext and Config.
2. Create a Student class that extends Product and overrides the necessary functions.
3. Construct an RDD of Student objects from the sample dataset (student-mat.csv).
4. Convert the RDD[Student] into a DataFrame.

Refer to the How it works… section of this recipe for a basic introduction to RDDs.

The code for this recipe can be found at https://github.com/arunma/ScalaDataAnalysisCookbook/tree/master/chapter3-data-loading.
Let's now cover these steps in detail:

1. Creating the SQLContext: As with our recipes from the previous chapter, we construct the SparkContext from a SparkConf and then create an SQLContext from the SparkContext:

```scala
val conf = new SparkConf().setAppName("DataWith33Atts").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
```
2. Creating the Student class: Our next step is to create a simple Scala class that declares its constructor parameters and make it extend Product. Making a class extend Product requires us to override two functions from scala.Product and one function from scala.Equals (which scala.Product, in turn, extends). The implementation of each of these functions is pretty straightforward.

Refer to the API docs of Product (http://www.scala-lang.org/api/2.10.4/index.html#scala.Product) and Equals (http://www.scala-lang.org/api/2.10.4/index.html#scala.Equals) for more details.
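Before tackling all 33 fields, here is a minimal sketch of the same pattern on a hypothetical two-field Point class (not part of the recipe), so the three overrides are easy to see in isolation:

```scala
// Hypothetical two-field class extending Product, mirroring what Student will do.
class Point(val x: Int, val y: Int) extends Product {
  // Number of attributes
  override def productArity: Int = 2
  // Attribute lookup by index, with an out-of-bounds guard
  override def productElement(n: Int): Any = n match {
    case 0 => x
    case 1 => y
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }
  // Boundary condition for equality checks
  override def canEqual(that: Any): Boolean = that.isInstanceOf[Point]
}

val p = new Point(3, 4)
assert(p.productArity == 2)
assert(p.productElement(1) == 4)
assert(p.canEqual(new Point(0, 0)))
assert(!p.canEqual("not a point"))
```

The Student class below is the same pattern, just with 33 cases in productElement.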
Firstly, let's make our Student class declare its fields and extend Product:

```scala
class Student(school: String, sex: String, age: Int, address: String,
    famsize: String, pstatus: String, medu: Int, fedu: Int, mjob: String,
    fjob: String, reason: String, guardian: String, traveltime: Int,
    studytime: Int, failures: Int, schoolsup: String, famsup: String,
    paid: String, activities: String, nursery: String, higher: String,
    internet: String, romantic: String, famrel: Int, freetime: Int,
    goout: Int, dalc: Int, walc: Int, health: Int, absences: Int,
    g1: Int, g2: Int, g3: Int) extends Product {
```
Next, let's implement these three functions after briefly looking at what each is expected to do:

productArity: Int: This returns the number of attributes. In our case, it's 33. So, our implementation looks like this:

```scala
override def productArity: Int = 33
```

productElement(n: Int): Any: Given an index, this returns the attribute at that index. As protection, we also have a default case, which throws an IndexOutOfBoundsException:

```scala
@throws(classOf[IndexOutOfBoundsException])
override def productElement(n: Int): Any = n match {
  case 0 => school
  case 1 => sex
  case 2 => age
  case 3 => address
  case 4 => famsize
  case 5 => pstatus
  case 6 => medu
  case 7 => fedu
  case 8 => mjob
  case 9 => fjob
  case 10 => reason
  case 11 => guardian
  case 12 => traveltime
  case 13 => studytime
  case 14 => failures
  case 15 => schoolsup
  case 16 => famsup
  case 17 => paid
  case 18 => activities
  case 19 => nursery
  case 20 => higher
  case 21 => internet
  case 22 => romantic
  case 23 => famrel
  case 24 => freetime
  case 25 => goout
  case 26 => dalc
  case 27 => walc
  case 28 => health
  case 29 => absences
  case 30 => g1
  case 31 => g2
  case 32 => g3
  case _ => throw new IndexOutOfBoundsException(n.toString)
}
```

canEqual(that: Any): Boolean: This is the last of the three functions, and it serves as a boundary condition when an equality check is done against this class:

```scala
override def canEqual(that: Any): Boolean = that.isInstanceOf[Student]
```
3. Constructing an RDD of Student objects from the student-mat.csv file: Now that we have our Student class ready, let's convert the student-mat.csv input file into an RDD[Student]:

```scala
val rddOfStudents = convertCSVToStudents("student-mat.csv", sc)

def convertCSVToStudents(filePath: String, sc: SparkContext): RDD[Student] = {
  val rddOfStudents: RDD[Student] = sc.textFile(filePath).flatMap(eachLine => Student(eachLine))
  rddOfStudents
}
```
As you can see, we have an apply method on the Student companion object that accepts a String and returns an Option[Student]. We use flatMap to filter out the None values, resulting in an RDD[Student].
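The flatMap-to-drop-None idiom works the same way on a plain Scala collection as on an RDD, so here is a small standalone sketch (the parseAge helper is hypothetical, used only for illustration):

```scala
import scala.util.Try

// Hypothetical parser: returns None for malformed input instead of throwing.
def parseAge(s: String): Option[Int] = Try(s.trim.toInt).toOption

val lines = List("15", "16", "not-a-number", "17")

// flatMap unwraps every Some and silently drops every None --
// the same mechanics that turn our parsed lines into an RDD[Student].
val ages: List[Int] = lines.flatMap(parseAge)
assert(ages == List(15, 16, 17))
```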
Let's look at the Student companion object's apply function. It's a very simple function that takes a String, splits it on semicolons into an array, and then passes the resulting parameters to the Student constructor. The method returns None if there is an error:

```scala
object Student {

  def apply(str: String): Option[Student] = {
    //A few values have extra double quotes around them
    val paramArray = str.split(";").map(param => param.replaceAll("\"", ""))
    Try(
      new Student(paramArray(0), paramArray(1), paramArray(2).toInt, paramArray(3),
        paramArray(4), paramArray(5), paramArray(6).toInt, paramArray(7).toInt,
        paramArray(8), paramArray(9), paramArray(10), paramArray(11),
        paramArray(12).toInt, paramArray(13).toInt, paramArray(14).toInt,
        paramArray(15), paramArray(16), paramArray(17), paramArray(18),
        paramArray(19), paramArray(20), paramArray(21), paramArray(22),
        paramArray(23).toInt, paramArray(24).toInt, paramArray(25).toInt,
        paramArray(26).toInt, paramArray(27).toInt, paramArray(28).toInt,
        paramArray(29).toInt, paramArray(30).toInt, paramArray(31).toInt,
        paramArray(32).toInt)) match {
      case Success(student) => Some(student)
      case Failure(throwable) =>
        println(throwable.getMessage())
        None
    }
  }
}
```
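The Try-then-match shape of this apply is a general pattern worth internalizing. Here is a tiny standalone sketch of it (the safeDivide function is hypothetical, plain Scala):

```scala
import scala.util.{Try, Success, Failure}

// Hypothetical example of the same Try(...) match { Success/Failure } pattern.
def safeDivide(a: Int, b: Int): Option[Int] =
  Try(a / b) match {
    case Success(result) => Some(result)
    case Failure(throwable) =>
      println(throwable.getMessage) // log and move on, as Student.apply does
      None
  }

assert(safeDivide(10, 2) == Some(5))
assert(safeDivide(1, 0) == None) // division by zero is swallowed into a None
```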
4. Converting the RDD[Student] into a DataFrame: Converting an RDD[T] to a DataFrame of the same type is just a matter of calling the toDF() function, for which you are required to import sqlContext.implicits._. Optionally, you can use the createDataFrame method of sqlContext instead. We then print the schema using the DataFrame's printSchema() method and show some sample data for confirmation using the show() method:

```scala
import sqlContext.implicits._

//Create DataFrame
val studentDFrame = rddOfStudents.toDF()

studentDFrame.printSchema()
studentDFrame.show()
```
The following is the output of the preceding code:

```
root
 |-- school: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- address: string (nullable = true)
 |-- famsize: string (nullable = true)
 |-- pstatus: string (nullable = true)
 |-- medu: integer (nullable = false)
 |-- fedu: integer (nullable = false)
 |-- mjob: string (nullable = true)
 |-- fjob: string (nullable = true)
 |-- reason: string (nullable = true)
 |-- guardian: string (nullable = true)
 |-- traveltime: integer (nullable = false)
 |-- studytime: integer (nullable = false)
 |-- failures: integer (nullable = false)
 |-- schoolsup: string (nullable = true)
 |-- famsup: string (nullable = true)
 |-- paid: string (nullable = true)
 |-- activities: string (nullable = true)
 |-- nursery: string (nullable = true)
 |-- higher: string (nullable = true)
 |-- internet: string (nullable = true)
 |-- romantic: string (nullable = true)
 |-- famrel: integer (nullable = false)
 |-- freetime: integer (nullable = false)
 |-- goout: integer (nullable = false)
 |-- dalc: integer (nullable = false)
 |-- walc: integer (nullable = false)
 |-- health: integer (nullable = false)
 |-- absences: integer (nullable = false)
 |-- g1: integer (nullable = false)
 |-- g2: integer (nullable = false)
 |-- g3: integer (nullable = false)
```
The foundation of Spark is the Resilient Distributed Dataset (RDD). From a programmer's perspective, the composability of RDDs, just like that of a regular Scala collection, is a huge advantage. An RDD wraps three vital (and two subsidiary) pieces of information that help in reconstructing data, which is what enables fault tolerance. The other major advantage is that while RDDs can be composed into hugely complex graphs using RDD operations, the entire flow of data remains easy to reason about.

Other than optional optimization attributes (such as data location), at its core, an RDD wraps just three vital pieces of information:

- Its set of partitions (the atomic pieces of the dataset)
- Its dependencies on parent RDDs (the lineage)
- A function to compute each partition from its parents

In simple words, RDDs are just collections of data elements that can exist in memory or on disk. These data elements must be serializable so that they can be moved across multiple machines (or serialized to disk). The number of partitions, or blocks of data, is primarily determined by the source of the input data (say, if the data is in HDFS, then each block translates to a single partition), but there are also other ways of playing around with the number of partitions.
So, the number of partitions could be any of these:

- The number of blocks of the input data (for example, one partition per HDFS block)
- The spark.default.parallelism parameter (set while starting the cluster)
- The number passed to repartition or coalesce on the RDD itself

Note that currently, for all our recipes, we are running our Spark application in self-contained, single-JVM mode. While the programs work just fine, we are not yet exploiting the distributed nature of RDDs. In Chapter 6, Scaling Up, we'll explore how to bundle and deploy our Spark applications on a variety of cluster managers: YARN, Spark standalone clusters, and Mesos.
In the previous chapter, we created a DataFrame from a List of Employee case classes:

```scala
val listOfEmployees = List(Employee(1, "Arun"), Employee(2, "Jason"), Employee(3, "Abhi"))
val empFrame = sqlContext.createDataFrame(listOfEmployees)
```
However, in this recipe, we loaded a file, converted it to an RDD[String], transformed each line into a Student object, and finally converted the result into a DataFrame.

There are subtle yet powerful differences between these approaches. In the first approach (converting a List of case classes into a DataFrame), we have the entire collection in the memory of the driver (we'll look at drivers and workers in Chapter 6, Scaling Up). Except when playing around with Spark, for all practical purposes, we don't have our dataset available as a collection of case classes; we generally have it as a text file or read it from a database. Moreover, holding the entire collection on a single machine before converting it into a distributed dataset (RDD) will eventually reveal itself as a memory issue.
In this recipe, we loaded an HDFS-style distributed file as an RDD[String] that is spread across a cluster of worker nodes, and then parsed each String into a Student object, turning the RDD[String] into an RDD[Student]. So, each worker node that holds some partitions of the dataset handles the computation of transforming its RDD[String] partitions into Student objects, while the resulting dataset conforms to a fixed schema enforced by the class itself. Since the computation and the data are both distributed, we don't need to worry about a single machine requiring a lot of memory to hold the entire dataset.