Chapter 6. Working with Unstructured Data

I am very excited to introduce you to this chapter. Unstructured data is what really makes big data different from the old data, and it is also what makes Scala a new paradigm for processing that data. At first sight, unstructured data may sound like a derogatory term. Yet every sentence in this book is unstructured data: it does not have the traditional record/row/column semantics. For most people, however, a book is far easier to read than the same content presented as a table or a spreadsheet.

In practice, unstructured data means nested and complex data. An XML document or a photograph are good examples of unstructured data that, in fact, have a very rich structure to them. My guess is that the originators of the term meant that the new data, the data that engineers at social interaction companies such as Google, Facebook, and Twitter saw, had a different structure from the traditional flat tables everyone was used to. The new data indeed did not fit the traditional RDBMS paradigm. Some of it could be flattened, but the underlying storage would be too inefficient, as RDBMSs were not optimized to handle it, and the result would be hard to parse not only for humans but for machines as well.

Many of the techniques introduced in this chapter were created as an emergency Band-Aid simply to cope with the need to process such data.

In this chapter, we will cover the following topics:

  • Learning about serialization, popular serialization frameworks, and the language in which machines talk to each other
  • Learning about Avro-Parquet encoding for nested data
  • Learning how RDBMSs incorporate nested structures in modern SQL-like languages and how to work with them
  • Learning how you can start working with nested structures in Scala
  • Seeing a practical example of sessionization—one of the most frequent use cases for unstructured data
  • Seeing how Scala traits and match/case statements can simplify path analysis
  • Learning where the nested structures can benefit your analysis

Nested data

You already saw unstructured data in the previous chapters: the data was an array of LabeledPoint, which is a tuple (label: Double, features: Vector). The label is just a number of type Double. Vector is a sealed trait with two subclasses: SparseVector and DenseVector. The class diagram is as follows:


Figure 1: The LabeledPoint class structure is a tuple of label and features, where features is a trait with two inherited subclasses, {Dense,Sparse}Vector. DenseVector is an array of doubles, while SparseVector stores only the size and the non-default elements, by index and value.

Each observation is a tuple of label and features, and the features can be sparse. Certainly, if there are no missing values, the whole row can be represented as a dense vector. A dense vector representation requires (8 x size + 8) bytes. If most of the elements are missing, or equal to some default value, we can store only the non-default elements; in this case, we require (12 x non_missing_size + 20) bytes, with small variations depending on the JVM implementation. So, from the storage point of view, the threshold for switching from one representation to the other is when size is greater than 1.5 x (non_missing_size + 1), or, equivalently, when roughly at least a third of the elements are missing (equal to the default). While programming languages are good at representing complex structures via pointers in memory, we need some convenient form to exchange these data between JVMs or between machines.
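To make the break-even point concrete, here is a quick sketch of this arithmetic in plain Scala (the byte counts are the approximations quoted above, not exact JVM measurements):

// Approximate storage cost of a dense vs. a sparse vector (formulas quoted in the text)
def denseBytes(size: Int): Long = 8L * size + 8
def sparseBytes(nonDefault: Int): Long = 12L * nonDefault + 20

val size = 100
val breakEven = (0 to size).find(nd => sparseBytes(nd) >= denseBytes(size)).get
println(s"dense($size) = ${denseBytes(size)} bytes; sparse is smaller while the non-default count stays below $breakEven")
// dense(100) = 808 bytes; sparse is smaller while the non-default count stays below 66

Now, let's see how Spark/Scala handles such data in practice, specifically by persisting it in the Parquet format: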

akozlov@Alexanders-MacBook-Pro$ bin/spark-shell 
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1-SNAPSHOT
      /_/

Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LabeledPoint

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors
scala> 

scala> val points = Array(
     |    LabeledPoint(0.0, Vectors.sparse(3, Array(1), Array(1.0))),
     |    LabeledPoint(1.0, Vectors.dense(0.0, 2.0, 0.0)),
     |    LabeledPoint(2.0, Vectors.sparse(3, Array((1, 3.0)))),
     |    LabeledPoint.parse("(3.0,[0.0,4.0,0.0])"));
points: Array[org.apache.spark.mllib.regression.LabeledPoint] = Array((0.0,(3,[1],[1.0])), (1.0,[0.0,2.0,0.0]), (2.0,(3,[1],[3.0])), (3.0,[0.0,4.0,0.0]))
scala> 

scala> val rdd = sc.parallelize(points)
rdd: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> 

scala> val df = rdd.repartition(1).toDF
df: org.apache.spark.sql.DataFrame = [label: double, features: vector]

scala> df.write.parquet("points")

What we did here was create a new RDD of LabeledPoint from the command line (alternatively, we could have used org.apache.spark.mllib.util.MLUtils to load a text file), convert it to a DataFrame, and create a serialized representation of it in the Parquet format under the points directory.
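If the labeled points were sitting in a text file instead, the MLUtils route would look roughly as follows; this is only a sketch, and the data/points.libsvm path is hypothetical:

import org.apache.spark.mllib.util.MLUtils

// Load a LIBSVM-formatted text file into an RDD[LabeledPoint] and persist it as Parquet
val loaded = MLUtils.loadLibSVMFile(sc, "data/points.libsvm")
loaded.repartition(1).toDF.write.parquet("points_from_text")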

Note

What does Parquet stand for?

Apache Parquet is a columnar storage format, jointly developed by Cloudera and Twitter for big data. Columnar storage allows for better compression of the values in a dataset and is more efficient if only a subset of the columns needs to be retrieved from disk. Parquet was built from the ground up with complex nested data structures in mind and uses the record shredding and assembly algorithm described in the Dremel paper (https://blog.twitter.com/2013/dremel-made-simple-with-parquet). Dremel/Parquet encoding uses definition/repetition fields to denote the level in the hierarchy the data is coming from, which covers most of the immediate encoding needs, as it is sufficient to store optional fields, nested arrays, and maps. Parquet stores the data in chunks, hence, probably, the name: parquet is flooring composed of wooden blocks arranged in a geometric pattern. Parquet can be optimized to read only a subset of the blocks from disk, depending on the subset of columns to be read and the index used (although this very much depends on whether the specific implementation is aware of these features). The values in the columns can use dictionary and Run-Length Encoding (RLE), which provides exceptionally good compression for columns with many duplicate entries, a frequent case in big data.

A Parquet file is in a binary format, but you can look at the information in it using parquet-tools, which can be downloaded from http://archive.cloudera.com/cdh5/cdh/5:

akozlov@Alexanders-MacBook-Pro$ wget -O - http://archive.cloudera.com/cdh5/cdh/5/parquet-1.5.0-cdh5.5.0.tar.gz | tar xzvf -

akozlov@Alexanders-MacBook-Pro$ cd parquet-1.5.0-cdh5.5.0/parquet-tools

akozlov@Alexanders-MacBook-Pro$ tar xvf parquet-1.5.0-cdh5.5.0/parquet-tools/target/parquet-tools-1.5.0-cdh5.5.0-bin.tar.gz

akozlov@Alexanders-MacBook-Pro$ cd parquet-tools-1.5.0-cdh5.5.0

akozlov@Alexanders-MacBook-Pro $ ./parquet-schema ~/points/*.parquet 
message spark_schema {
  optional double label;
  optional group features {
    required int32 type (INT_8);
    optional int32 size;
    optional group indices (LIST) {
      repeated group list {
        required int32 element;
      }
    }
    optional group values (LIST) {
      repeated group list {
        required double element;
      }
    }
  }
}
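The same schema is also visible from inside spark-shell, without parquet-tools; the output below is approximate and may differ slightly between Spark versions:

scala> sqlContext.read.parquet("points").printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)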

Let's look at the schema, which is very close to the structure depicted in Figure 1: the first member is the label of type double, and the second and last one is features, of a composite type. The keyword optional is another way of saying that the value can be null (absent) in the record for one reason or another. Lists or arrays are encoded as a repeated field. As the whole array may be absent (it is possible for all features to be absent), it is wrapped into the optional groups indices and values. Finally, the type field encodes whether this is a sparse or a dense representation:

akozlov@Alexanders-MacBook-Pro $ ./parquet-dump ~/points/*.parquet 
row group 0 
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
label:       DOUBLE GZIP DO:0 FPO:4 SZ:78/79/1.01 VC:4 ENC:BIT_PACKED,PLAIN,RLE
features:   
.type:       INT32 GZIP DO:0 FPO:82 SZ:101/63/0.62 VC:4 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
.size:       INT32 GZIP DO:0 FPO:183 SZ:97/59/0.61 VC:4 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
.indices:   
..list:     
...element:  INT32 GZIP DO:0 FPO:280 SZ:100/65/0.65 VC:4 ENC:PLAIN_DICTIONARY,RLE
.values:    
..list:     
...element:  DOUBLE GZIP DO:0 FPO:380 SZ:125/111/0.89 VC:8 ENC:PLAIN_DICTIONARY,RLE

    label TV=4 RL=0 DL=1
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------
    page 0:                                           DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:38 VC:4

    features.type TV=4 RL=0 DL=1 DS:                 2 DE:PLAIN_DICTIONARY
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------
    page 0:                                           DLE:RLE RLE:BIT_PACKED VLE:PLAIN_DICTIONARY SZ:9 VC:4

    features.size TV=4 RL=0 DL=2 DS:                 1 DE:PLAIN_DICTIONARY
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------
    page 0:                                           DLE:RLE RLE:BIT_PACKED VLE:PLAIN_DICTIONARY SZ:9 VC:4

    features.indices.list.element TV=4 RL=1 DL=3 DS: 1 DE:PLAIN_DICTIONARY
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------
    page 0:                                           DLE:RLE RLE:RLE VLE:PLAIN_DICTIONARY SZ:15 VC:4

    features.values.list.element TV=8 RL=1 DL=3 DS:  5 DE:PLAIN_DICTIONARY
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------
    page 0:                                           DLE:RLE RLE:RLE VLE:PLAIN_DICTIONARY SZ:17 VC:8

DOUBLE label 
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 4 *** 
value 1: R:0 D:1 V:0.0
value 2: R:0 D:1 V:1.0
value 3: R:0 D:1 V:2.0
value 4: R:0 D:1 V:3.0

INT32 features.type 
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 4 *** 
value 1: R:0 D:1 V:0
value 2: R:0 D:1 V:1
value 3: R:0 D:1 V:0
value 4: R:0 D:1 V:1

INT32 features.size 
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 4 *** 
value 1: R:0 D:2 V:3
value 2: R:0 D:1 V:<null>
value 3: R:0 D:2 V:3
value 4: R:0 D:1 V:<null>

INT32 features.indices.list.element 
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 4 *** 
value 1: R:0 D:3 V:1
value 2: R:0 D:1 V:<null>
value 3: R:0 D:3 V:1
value 4: R:0 D:1 V:<null>

DOUBLE features.values.list.element 
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 8 *** 
value 1: R:0 D:3 V:1.0
value 2: R:0 D:3 V:0.0
value 3: R:1 D:3 V:2.0
value 4: R:1 D:3 V:0.0
value 5: R:0 D:3 V:3.0
value 6: R:0 D:3 V:0.0
value 7: R:1 D:3 V:4.0
value 8: R:1 D:3 V:0.0

You are probably a bit confused about the R: and D: entries in the output. These are the repetition and definition levels described in the Dremel paper, and they are necessary to efficiently encode values in nested structures. Only repeated fields increment the repetition level, and only non-required fields increment the definition level; a drop in R signifies the end of a list (array), and every non-required level in the hierarchy tree requires a new definition level. For example, in the features.values.list.element dump above, value 3 (V:2.0) has R:1 because it continues the values list started by value 2 within the same record, while value 5 (V:3.0) has R:0 because it starts a new record. Repetition and definition level values are small by design and can be stored efficiently in serialized form.

Best of all, if there are many duplicate entries, they are all placed next to each other, which is exactly the case the compression algorithm (gzip by default) is optimized for. Parquet also implements other encodings that exploit repeated values, such as dictionary encoding and RLE compression.
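The codec applied to the column chunks is configurable through a standard Spark SQL setting; as a sketch, this is how you could trade some compression ratio for speed by switching from the default gzip to snappy (the points_snappy output directory is just an example name):

// Write the same DataFrame again, compressed with snappy instead of gzip
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
df.write.parquet("points_snappy")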

This is simple and efficient serialization out of the box. We have been able to write a set of complex objects to a file, with each column stored in a separate block that holds all the values of that column across the records and nested structures.
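Because each column lives in its own block, reading back only a subset of the columns allows the Parquet reader to skip the rest of the data entirely; a small sketch against the file we just wrote:

// Only the label column needs to be read from disk; the features blocks are skipped
sqlContext.read.parquet("points").select("label").show()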

Let's now read the file and recover the RDD. The Parquet format does not know anything about the LabeledPoint class, so we'll have to do some typecasting and trickery here. When we read the file, we'll see a collection of org.apache.spark.sql.Row:

akozlov@Alexanders-MacBook-Pro$ bin/spark-shell 
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1-SNAPSHOT
      /_/

Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val df = sqlContext.read.parquet("points")
df: org.apache.spark.sql.DataFrame = [label: double, features: vector]

scala> val df = sqlContext.read.parquet("points").collect
df: Array[org.apache.spark.sql.Row] = Array([0.0,(3,[1],[1.0])], [1.0,[0.0,2.0,0.0]], [2.0,(3,[1],[3.0])], [3.0,[0.0,4.0,0.0]])

scala> import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LabeledPoint

scala> val rdd = df.map(x => LabeledPoint(x(0).asInstanceOf[scala.Double], x(1).asInstanceOf[org.apache.spark.mllib.linalg.Vector]))
rdd: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[16] at map at <console>:25

scala> rdd.collect
res12: Array[org.apache.spark.mllib.regression.LabeledPoint] = Array((0.0,(3,[1],[1.0])), (1.0,[0.0,2.0,0.0]), (2.0,(3,[1],[3.0])), (3.0,[0.0,4.0,0.0]))

scala> rdd.filter(_.features(1) <= 2).collect
res13: Array[org.apache.spark.mllib.regression.LabeledPoint] = Array((0.0,(3,[1],[1.0])), (1.0,[0.0,2.0,0.0]))

Personally, I think this is pretty cool: without any compilation, we can encode and decode complex objects. One can easily create their own objects in the REPL. Let's say we want to track a user's behavior on the web:

akozlov@Alexanders-MacBook-Pro$ bin/spark-shell 
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1-SNAPSHOT
      /_/

Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> case class Person(id: String, visits: Array[String]) { override def toString: String = { val vsts = visits.mkString(","); s"($id -> $vsts)" } }
defined class Person

scala> val p1 = Person("Phil", Array("http://www.google.com", "http://www.facebook.com", "http://www.linkedin.com", "http://www.homedepot.com"))
p1: Person = (Phil -> http://www.google.com,http://www.facebook.com,http://www.linkedin.com,http://www.homedepot.com)

scala> val p2 = Person("Emily", Array("http://www.victoriassecret.com", "http://www.pacsun.com", "http://www.abercrombie.com/shop/us", "http://www.orvis.com"))
p2: Person = (Emily -> http://www.victoriassecret.com,http://www.pacsun.com,http://www.abercrombie.com/shop/us,http://www.orvis.com)

scala> sc.parallelize(Array(p1,p2)).repartition(1).toDF.write.parquet("history")

scala> import scala.collection.mutable.WrappedArray
import scala.collection.mutable.WrappedArray

scala> val df = sqlContext.read.parquet("history")
df: org.apache.spark.sql.DataFrame = [id: string, visits: array<string>]

scala> val rdd = df.map(x => Person(x(0).asInstanceOf[String], x(1).asInstanceOf[WrappedArray[String]].toArray[String]))
rdd: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[27] at map at <console>:28

scala> rdd.collect
res9: Array[Person] = Array((Phil -> http://www.google.com,http://www.facebook.com,http://www.linkedin.com,http://www.homedepot.com), (Emily -> http://www.victoriassecret.com,http://www.pacsun.com,http://www.abercrombie.com/shop/us,http://www.orvis.com))
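
The nested visits column can also be queried directly with the DataFrame API; here is a quick sketch that uses the standard explode function to produce one row per visited URL:

import org.apache.spark.sql.functions.explode

// Flatten the visits array into one (id, visit) row per page view
df.select(df("id"), explode(df("visits")).as("visit")).show(false)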

As a matter of good practice, we need to register the newly created classes with the Kryo serializer, as Spark uses a separate serialization mechanism to pass objects between tasks and executors. If a class is not registered, Spark falls back to the default Java serialization, which can be up to 10x slower:

scala> :paste
// Entering paste mode (ctrl-D to finish)

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Person])
  }
}

object MyKryoRegistrator {
  def register(conf: org.apache.spark.SparkConf) {
    conf.set("spark.serializer", classOf[KryoSerializer].getName)
    conf.set("spark.kryo.registrator", classOf[MyKryoRegistrator].getName)
  }
}
^D

// Exiting paste mode, now interpreting.

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}
defined class MyKryoRegistrator
defined module MyKryoRegistrator

scala>

If you are deploying the code on a cluster, the recommendation is to put this code in a jar on the classpath.
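For reference, here is a sketch of how the helper above might be wired in when you construct your own SparkContext in a packaged application (spark-shell creates its context for you, so there the two properties have to be passed as configuration options instead); the application name is arbitrary:

import org.apache.spark.{SparkConf, SparkContext}

// Enable Kryo and register our classes before the context is created
val conf = new SparkConf().setAppName("unstructured-data-example")
MyKryoRegistrator.register(conf)   // sets spark.serializer and spark.kryo.registrator
val sc = new SparkContext(conf)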

I've certainly seen examples of up to ten-level-deep nesting in production. Although this might be overkill for performance reasons, nesting is required in more and more production business use cases. Before we go into the specifics of constructing a nested object in the sessionization example, let's get an overview of serialization in general.
