Loading more than 22 features into classes

Case classes have an inherent limitation: they can hold only 22 attributes (a catch-22, if you will). While plenty of datasets fit within that budget, a 22-feature cap is a serious constraint for the rest. In this recipe, we'll take a sample Student dataset (http://archive.ics.uci.edu/ml/datasets/Student+Performance), which has 33 features, and see how we can work around this limit.

Note

The 22-field limit on case classes was lifted in Scala 2.11. However, Spark 1.4 is built against Scala 2.10 by default.

Case classes in Scala cannot go beyond encapsulating 22 fields because the companion classes that are generated (during compilation) for these case classes cannot find the matching FunctionN and TupleN classes. Let's take the example of the Employee case class that we created in Chapter 2, Getting Started with Apache Spark DataFrames:

case class Employee(id:Int, name:String)

When we look at its decompiled companion object, we notice that for the two constructor parameters of the case class, the companion object extends AbstractFunction2, and its unapply method (the method that gets invoked when we pattern-match against a case class) returns an Option wrapping a Tuple2. The problem we face is that the Scala library provides TupleN and FunctionN classes only up to Tuple22 and Function22 (probably because, outside the data analysis world, an entity object with tens of fields is not a great idea). However, there is a simple yet powerful workaround, and we will be seeing it in this recipe.
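
To make this concrete, here is a rough, hand-written approximation of what the compiler synthesizes for the two-field Employee case class (simplified for illustration; the real generated code has more members):

class Employee(val id: Int, val name: String)

object Employee extends scala.runtime.AbstractFunction2[Int, String, Employee] {
  def apply(id: Int, name: String): Employee = new Employee(id, name)
  // unapply hands the fields back as a Tuple2; with 33 fields we would need a
  // Tuple33 and a Function33, which the Scala 2.10 library does not provide
  def unapply(e: Employee): Option[(Int, String)] = Some((e.id, e.name))
}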

How to do it...

We saw in Chapter 2, Getting Started with Apache Spark DataFrames (in the Creating a DataFrame from CSV recipe), that creating a DataFrame from a collection of objects using SQLContext.createDataFrame requires the class to extend scala.Product. So, what we intend to do is write our own class that extends scala.Product; the members we must implement are sketched below.
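
For reference, the parts of the contract we need to satisfy look roughly like this (paraphrased from the Scala standard library; see scala.Product and scala.Equals for the full definitions):

trait Equals {
  def canEqual(that: Any): Boolean   // guard used during equality checks
}

trait Product extends Equals {
  def productArity: Int              // number of fields
  def productElement(n: Int): Any    // the field at position n
}

Our Student class will implement exactly these three members.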

This recipe consists of four steps:

  1. Creating an SQLContext from a SparkContext and a SparkConf.
  2. Creating a Student class that extends Product and overrides the necessary functions.
  3. Constructing an RDD of the Student classes from the sample dataset (student-mat.csv).
  4. Creating a DataFrame from the RDD, followed by printing the schema and sampling the data.

    Note

    Refer to the How it works… section of this recipe for a basic introduction to RDD.

    The code for this recipe can be found at https://github.com/arunma/ScalaDataAnalysisCookbook/tree/master/chapter3-data-loading.

Let's now cover these steps in detail:

  1. Creating SQLContext: As with our recipes from the previous chapter, we construct a SparkContext from a SparkConf and then create an SQLContext from the SparkContext:
    val conf=new SparkConf().setAppName("DataWith33Atts").setMaster("local[2]")
    val sc=new SparkContext(conf)
    val sqlContext=new SQLContext(sc)
  2. Creating the Student class: Our next step is to create a simple Scala class that declares all 33 fields as constructor parameters and extends Product.

    Making a class extend Product requires us to override two functions from scala.Product and one function from scala.Equals (which scala.Product itself extends). The implementation of each of these functions is pretty straightforward.

    Firstly, let's make our Student class declare its fields and extend Product:

    class Student (school:String,
        sex:String,
        age:Int,
        address:String,
        famsize:String,
        pstatus:String,
        medu:Int,
        fedu:Int,
        mjob:String,
        fjob:String,
        reason:String,
        guardian:String,
        traveltime:Int,
        studytime:Int,
        failures:Int,
        schoolsup:String,
        famsup:String,
        paid:String,
        activities:String,
        nursery:String,
        higher:String,
        internet:String,
        romantic:String,
        famrel:Int,
        freetime:Int,
        goout:Int,
        dalc:Int,
        walc:Int,
        health:Int,
        absences:Int,
        g1:Int,
        g2:Int,
        g3:Int) extends Product{

    Next, let's implement these three functions after briefly looking at what they are expected to do:

    • productArity():Int: This returns the number of attributes. In our case, it's 33. So, our implementation looks like this:
      override def productArity: Int = 33
    • productElement(n:Int):Any: Given an index, this returns the attribute at that position. As protection, we also have a default case that throws an IndexOutOfBoundsException:
        @throws(classOf[IndexOutOfBoundsException])
        override def productElement(n: Int): Any = n match {
          case 0 => school
          case 1 => sex
          case 2 => age
          case 3 => address
          case 4 => famsize
          case 5 => pstatus
          case 6 => medu
          case 7 => fedu
          case 8 => mjob
          case 9 => fjob
          case 10 => reason
          case 11 => guardian
          case 12 => traveltime
          case 13 => studytime
          case 14 => failures
          case 15 => schoolsup
          case 16 => famsup
          case 17 => paid
          case 18 => activities
          case 19 => nursery
          case 20 => higher
          case 21 => internet
          case 22 => romantic
          case 23 => famrel
          case 24 => freetime
          case 25 => goout
          case 26 => dalc
          case 27 => walc
          case 28 => health
          case 29 => absences
          case 30 => g1
          case 31 => g2
          case 32 => g3
          case _ => throw new IndexOutOfBoundsException(n.toString())
        }
    • canEqual(that: Any): Boolean: This is the last of the three functions. It serves as a guard when an equality check is performed against this class (see the optional sketch after this list):
          override def canEqual(that: Any): Boolean = that.isInstanceOf[Student]
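
    The following is an optional, hand-rolled sketch (not required by Spark and not part of the recipe's code) of how canEqual typically participates in an equality check; productIterator comes for free from Product once productArity and productElement are in place:

      // Hypothetical additions to the Student class, shown only for illustration
      override def equals(other: Any): Boolean = other match {
        case that: Student => that.canEqual(this) &&
          this.productIterator.toSeq == that.productIterator.toSeq
        case _ => false
      }
      override def hashCode(): Int = productIterator.toSeq.hashCode()
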
  3. Constructing an RDD of students from the student-mat.csv file: Now that we have our Student class ready, let's convert the student-mat.csv input file into an RDD[Student]:
      val rddOfStudents=convertCSVToStudents("student-mat.csv", sc)
    
      def convertCSVToStudents(filePath: String, sc: SparkContext): RDD[Student] = {
        val rddOfStudents: RDD[Student] = sc.textFile(filePath).flatMap(eachLine => Student(eachLine))
        rddOfStudents
      }

    As you can see, we have an apply method on the Student companion object that accepts a String and returns an Option[Student]. We use flatMap to filter out the Nones, which leaves us with an RDD[Student].
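
    To see why flatMap does the filtering for us, here is a tiny, self-contained illustration with made-up values (Scala implicitly treats an Option as a collection of zero or one elements, so every None simply disappears from the flattened result):

      // Made-up lines; only those starting with "ok" are treated as parseable
      val lines = sc.parallelize(Seq("ok;1", "garbled", "ok;2"))
      val parsed = lines.flatMap { line =>           // parsed is an RDD[Int]
        if (line.startsWith("ok")) Some(line.split(";")(1).toInt) else None
      }
      parsed.collect().foreach(println)              // prints 1 and 2; the garbled line is dropped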

    Let's look at the Student companion object's apply function. It's a very simple function that takes a String, splits it on semicolons into an array, and passes the resulting parameters to the Student constructor, wrapping the construction in a Try (from scala.util). The method returns None if anything fails during parsing (we'll sanity-check it right after the listing):

          import scala.util.{Failure, Success, Try}

          object Student {

          def apply(str: String): Option[Student] = {
            val paramArray = str.split(";").map(param => param.replaceAll("\"", "")) // Some values have extra double quotes around them
            Try(
              new Student(paramArray(0),
                paramArray(1),
                paramArray(2).toInt,
                paramArray(3),
                paramArray(4),
                paramArray(5),
                paramArray(6).toInt,
                paramArray(7).toInt,
                paramArray(8),
                paramArray(9),
                paramArray(10),
                paramArray(11),
                paramArray(12).toInt,
                paramArray(13).toInt,
                paramArray(14).toInt,
                paramArray(15),
                paramArray(16),
                paramArray(17),
                paramArray(18),
                paramArray(19),
                paramArray(20),
                paramArray(21),
                paramArray(22),
                paramArray(23).toInt,
                paramArray(24).toInt,
                paramArray(25).toInt,
                paramArray(26).toInt,
                paramArray(27).toInt,
                paramArray(28).toInt,
                paramArray(29).toInt,
                paramArray(30).toInt,
                paramArray(31).toInt,
                paramArray(32).toInt)) match {
                case Success(student) => Some(student)
                case Failure(throwable) => {
                  println (throwable.getMessage())
                  None
                }
              }
            }
          }
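
    As a quick sanity check, we can feed the apply method a single semicolon-separated line. The line below is fabricated purely for illustration (it is not taken from student-mat.csv), but it has the 33 fields in the expected order and types:

      val sampleLine = "GP;F;18;U;GT3;A;4;4;at_home;teacher;course;mother;" +
        "2;2;0;yes;no;no;no;yes;yes;no;no;4;3;4;1;1;3;6;5;6;6"
      println(Student(sampleLine).isDefined)   // true: all 33 fields parsed
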
  4. Creating a DataFrame, printing the schema, and sampling: Finally, we create a DataFrame from the RDD[Student]. Converting an RDD[T] (where T extends Product) into a DataFrame is just a matter of calling the toDF() function, provided you import sqlContext.implicits._. Alternatively, you can use the createDataFrame method of sqlContext (shown after the output below).

    Note

    The toDF() function is overloaded so as to accept custom column names while converting to a DataFrame.
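
    For instance, once sqlContext.implicits._ is imported (as in the snippet that follows), a tuple RDD can be given column names on the fly. This is a made-up example, unrelated to the Student data:

      // Hypothetical snippet: the column names here are ours, not Spark defaults
      val empDF = sc.parallelize(Seq((1, "Arun"), (2, "Jason"))).toDF("empId", "empName")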

    We then print the schema using the DataFrame's printSchema() method and sample the data for confirmation using the show() method:

    import sqlContext.implicits._
    
    //Create DataFrame
    val studentDFrame = rddOfStudents.toDF()
    studentDFrame.printSchema()
    studentDFrame.show()

    The following is the output of the preceding code:

    root
     |-- school: string (nullable = true)
     |-- sex: string (nullable = true)
     |-- age: integer (nullable = false)
     |-- address: string (nullable = true)
     |-- famsize: string (nullable = true)
     |-- pstatus: string (nullable = true)
     |-- medu: integer (nullable = false)
     |-- fedu: integer (nullable = false)
     |-- mjob: string (nullable = true)
     |-- fjob: string (nullable = true)
     |-- reason: string (nullable = true)
     |-- guardian: string (nullable = true)
     |-- traveltime: integer (nullable = false)
     |-- studytime: integer (nullable = false)
     |-- failures: integer (nullable = false)
     |-- schoolsup: string (nullable = true)
     |-- famsup: string (nullable = true)
     |-- paid: string (nullable = true)
     |-- activities: string (nullable = true)
     |-- nursery: string (nullable = true)
     |-- higher: string (nullable = true)
     |-- internet: string (nullable = true)
     |-- romantic: string (nullable = true)
     |-- famrel: integer (nullable = false)
     |-- freetime: integer (nullable = false)
     |-- goout: integer (nullable = false)
     |-- dalc: integer (nullable = false)
     |-- walc: integer (nullable = false)
     |-- health: integer (nullable = false)
     |-- absences: integer (nullable = false)
     |-- g1: integer (nullable = false)
     |-- g2: integer (nullable = false)
     |-- g3: integer (nullable = false)
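
    Alternatively, as mentioned above, the same DataFrame can be built without the implicits import by calling createDataFrame directly (a minimal sketch, reusing the rddOfStudents from step 3):

      val studentDFrameAlt = sqlContext.createDataFrame(rddOfStudents)
      studentDFrameAlt.printSchema()   // same schema as above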

How it works...

The foundation of Spark is the Resilient Distributed Dataset (RDD). From a programmer's perspective, the fact that RDDs compose just like regular Scala collections is a huge advantage. An RDD wraps three vital (and two subsidiary) pieces of information that allow its data to be reconstructed, which is what gives Spark its fault tolerance. The other major advantage is that, while RDDs can be composed into hugely complex graphs using RDD operations, the entire flow of data is still not difficult to reason about.

Other than optional optimization attributes (such as preferred data locations), at its core, an RDD just wraps three vital pieces of information:

  • The dependent/parent RDDs (empty if there are none)
  • The number of partitions
  • The function that needs to be applied to each element of the RDD
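
As a small illustration (reusing the sc created earlier in this recipe), every transformation produces a new RDD that remembers its parent and the function to apply to it, which is precisely what makes recomputation after a failure possible:

val base = sc.parallelize(1 to 100, 4)   // an RDD with 4 partitions
val doubled = base.map(_ * 2)            // parent = base, function = _ * 2
println(doubled.partitions.length)       // 4 -- the partitioning is inherited
println(doubled.toDebugString)           // prints the lineage (parent chain)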

In simple words, RDDs are just collections of data elements that can exist in memory or on disk. These data elements must be serializable so that they can be moved across multiple machines (or spilled to disk). The number of partitions, or blocks of data, is primarily determined by the source of the input data (say, if the data is in HDFS, then each block translates to a single partition), but there are other ways to control the number of partitions.

So, the number of partitions could be any of these:

  • Dictated by the input data itself, for example, the number of blocks in the case of reading files from HDFS
  • The number set by the spark.default.parallelism parameter (set while starting the cluster)
  • The number set by calling repartition or coalesce on the RDD itself
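
The third option in the preceding list looks like this (a minimal sketch, again assuming the sc from this recipe):

val rdd = sc.textFile("student-mat.csv")   // partition count dictated by the input
val more = rdd.repartition(8)              // increases partitions (triggers a shuffle)
val fewer = more.coalesce(2)               // decreases partitions without a full shuffle
println(fewer.partitions.length)           // 2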

Note that currently, for all our recipes, we are running our Spark application in the self-contained single JVM mode. While the programs work just fine, we are not yet exploiting the distributed nature of the RDDs. In Chapter 6, Scaling Up, we'll explore how to bundle and deploy our Spark application on a variety of cluster managers: YARN, Spark standalone clusters, and Mesos.

There's more…

In the previous chapter, we created a DataFrame from a List of Employee case classes:

val listOfEmployees = List(Employee(1, "Arun"), Employee(2, "Jason"), Employee(3, "Abhi"))

val empFrame = sqlContext.createDataFrame(listOfEmployees)

However, in this recipe, we loaded a file, converted it into an RDD[String], transformed each line into a Student object, and finally converted the result into a DataFrame.

There are subtle, yet powerful, differences between these approaches. In the first approach (converting a List of case classes into a DataFrame), we hold the entire collection in the memory of the driver (we'll look at drivers and workers in Chapter 6, Scaling Up). Outside of experimenting with Spark, we rarely have our dataset available as an in-memory collection of case classes; we generally have it as a text file or read it from a database. Moreover, having to hold the entire collection on a single machine before converting it into a distributed dataset (RDD) quickly becomes a memory problem.

In this recipe, we loaded the file (which could just as well sit on a distributed filesystem such as HDFS) as an RDD[String] spread across the worker nodes, and then parsed each String into a Student, turning the RDD[String] into an RDD[Student]. So, each worker node that holds some partitions of the dataset performs the computation that transforms its Strings into Student objects, while the resulting dataset conforms to the fixed schema enforced by the class itself. Since both the computation and the data are distributed, no single machine needs enough memory to hold the entire dataset.
